diff --git a/bin/pump_from_stv2.py b/bin/pump_from_stv2.py new file mode 100644 index 0000000..c860756 --- /dev/null +++ b/bin/pump_from_stv2.py @@ -0,0 +1,87 @@ +"""Pump simulated OpenStack notificationss into RabbitMQ. + +You need to install rabbitqm-server and +pip install librabbitmq +pip install --pre notabene +pip install --pre notification_utils +""" + + +import datetime +import json +import sys +import time + +import mysql.connector +from notabene import kombu_driver as driver +import notification_utils as nu + + +connection = driver.create_connection("localhost", 5672, 'guest', 'guest', + "librabbitmq", "/") +exchange = driver.create_exchange("monitor", "topic") +queue_name = "monitor.info" +queue = driver.create_queue(queue_name, exchange, queue_name, + channel=connection.channel()) +queue.declare() + +cnx = mysql.connector.connect(user='root', password='password', + host='127.0.0.1', + database='stacktach') + +cursor = cnx.cursor() +query = ("SELECT min(stacktach_rawdata.when) AS mindate, " + "MAX(stacktach_rawdata.when) AS maxdate " + "FROM stacktach_rawdata") +cursor.execute(query) +mindate, maxdate = list(cursor)[0] +cursor.close() + +oldest = nu.dt_from_decimal(mindate) +newest = nu.dt_from_decimal(maxdate) + +# Move to the next day so we get all the events for the day. +start = oldest.replace(hour=0,minute=0,second=0,microsecond=0) \ + + datetime.timedelta(days=1) +end = start + datetime.timedelta(days=1, hours=4) + +print "Events from %s to %s" % (oldest, newest) +print "Extracting events from %s to %s" % (start, end) + +time.sleep(5) + +dstart = nu.dt_to_decimal(start) +dend = nu.dt_to_decimal(end) + +cursor = cnx.cursor() +query = ("SELECT stacktach_rawdata.when AS d, " + "stacktach_rawdata.json AS rawjson " + "FROM stacktach_rawdata " + "WHERE (stacktach_rawdata.when BETWEEN %f AND %f) " + "AND stacktach_rawdata.event!='compute.instance.updates' " + "AND stacktach_rawdata.event!='compute.instance.exists.verified' " + "ORDER BY stacktach_rawdata.when LIMIT 100000" % (dstart, dend)) +cursor.execute(query) + +start = None +end = None +num = 0 +for when, rawjson in cursor: + when = nu.dt_from_decimal(when) + if not start: + start = when + end = start + datetime.timedelta(days=1) + if when > end: + break + queue, event = json.loads(rawjson) + # Skip the noise ... + if event['event_type'] in ['compute.instance.update', 'compute.instance.exists.verified']: + continue + print when, event['event_type'] + driver.send_notification(event, queue_name, connection, exchange) + num+=1 + +print "Published %d events" % num + +cursor.close() +cnx.close() diff --git a/bin/time_sync.py b/bin/time_sync.py new file mode 100644 index 0000000..75a363d --- /dev/null +++ b/bin/time_sync.py @@ -0,0 +1,34 @@ +import datetime + +import falcon + + +# gunicorn --log-file=- 'time_sync:get_api()' +# +# To get the current time +# curl localhost:8000/time +# +# To set the current time +# curl --data "2014-10-09 22:55:33.111111" localhost:8000/time + +class TimeResource(object): + def __init__(self): + self.last_time = None + + def on_get(self, req, resp): + resp.body = self.last_time + print "GET", self.last_time + + def on_post(self, req, resp): + chunk = req.stream.read(4096) + self.last_time = chunk + print "POST", self.last_time + + +api = falcon.API() +time_resource = TimeResource() +api.add_route('/time', time_resource) + + +def get_api(): + return api