From 71895341fe0d2e7897807857d8234055aab77394 Mon Sep 17 00:00:00 2001 From: Sandy Walsh Date: Wed, 31 Oct 2012 10:18:56 -0300 Subject: [PATCH] switched to Decimal based time. Much cleaner. Stacky Watch command working --- stacktach/datetime_to_decimal.py | 28 +++++++++ stacktach/models.py | 9 +-- stacktach/stacky_server.py | 104 ++++++++++++++++++------------- stacktach/urls.py | 8 +-- stacktach/views.py | 43 +++++-------- templates/rows.html | 4 +- worker/worker.py | 16 +++-- 7 files changed, 122 insertions(+), 90 deletions(-) create mode 100644 stacktach/datetime_to_decimal.py diff --git a/stacktach/datetime_to_decimal.py b/stacktach/datetime_to_decimal.py new file mode 100644 index 0000000..5f72a3e --- /dev/null +++ b/stacktach/datetime_to_decimal.py @@ -0,0 +1,28 @@ +import calendar +import datetime +import decimal +import time + + +def dt_to_decimal(utc): + decimal.getcontext().prec = 6 + return decimal.Decimal(calendar.timegm(utc.utctimetuple()) + + utc.microsecond/float(1e6)) + + +def dt_from_decimal(dec): + integer = int(dec) + micro = (dec - decimal.Decimal(integer)) * decimal.Decimal(1000000) + + daittyme = datetime.datetime.utcfromtimestamp(integer) + return daittyme.replace(microsecond=micro) + + +if __name__ == '__main__': + now = datetime.datetime.utcnow() + d = dt_to_decimal(now) + daittyme = dt_from_decimal(d) + print repr(now) + print repr(d) + print repr(daittyme) + assert(now == daittyme) diff --git a/stacktach/models.py b/stacktach/models.py index 124cc78..82e5c56 100644 --- a/stacktach/models.py +++ b/stacktach/models.py @@ -38,8 +38,7 @@ class RawData(models.Model): blank=True, db_index=True) old_task = models.CharField(max_length=30, null=True, blank=True, db_index=True) - when = models.DateTimeField(db_index=True) - microseconds = models.IntegerField(default=0, db_index=True) + when = models.DecimalField(max_digits=20, decimal_places=6) publisher = models.CharField(max_length=100, null=True, blank=True, db_index=True) event = models.CharField(max_length=50, null=True, @@ -73,10 +72,8 @@ class Timing(models.Model): start_raw = models.ForeignKey(RawData, related_name='+', null=True) end_raw = models.ForeignKey(RawData, related_name='+', null=True) - start_when = models.DateTimeField(db_index=True, null=True) - start_ms = models.IntegerField(default=0) - end_when = models.DateTimeField(db_index=True, null=True) - end_ms = models.IntegerField(default=3) + start_when = models.DecimalField(null=True, max_digits=20, decimal_places=6) + end_when = models.DecimalField(null=True, max_digits=20, decimal_places=6) diff_days = models.IntegerField(default=0) diff_seconds = models.IntegerField(default=0) diff --git a/stacktach/stacky_server.py b/stacktach/stacky_server.py index c3037eb..bdbbb1f 100644 --- a/stacktach/stacky_server.py +++ b/stacktach/stacky_server.py @@ -1,9 +1,11 @@ +import decimal import datetime import json from django.db.models import Q from django.http import HttpResponse +import datetime_to_decimal as dt import models import views @@ -112,25 +114,27 @@ def do_hosts(request): def do_uuid(request): uuid = str(request.GET['uuid']) related = models.RawData.objects.select_related(). \ - filter(instance=uuid).order_by('when', 'microseconds') + filter(instance=uuid).order_by('when') results = [] results.append(["#", "?", "When", "Deployment", "Event", "Host", "State", "State'", "Task'"]) for e in related: - results.append([e.id, routing_key_type(e.routing_key), str(e.when), + when = dt.dt_from_decimal(e.when) + results.append([e.id, routing_key_type(e.routing_key), str(when), e.deployment.name, e.event, e.host, e.state, e.old_state, e.old_task]) return rsp(results) -def do_timings(request, name): +def do_timings(request): + name = request.GET['name'] results = [] results.append([name, "Time"]) timings = models.Timing.objects.select_related().filter(name=name) \ - .exclude(Q(start_raw=None) | Q(end_raw=None)) \ - .order_by('diff_days', 'diff_seconds', - 'diff_usecs') + .exclude(Q(start_raw=None) | Q(end_raw=None)) \ + .order_by('diff_days', 'diff_seconds', + 'diff_usecs') for t in timings: seconds = seconds_from_timing(t) @@ -175,12 +179,13 @@ def do_summary(request): def do_request(request): request_id = request.GET['request_id'] events = models.RawData.objects.filter(request_id=request_id) \ - .order_by('when', 'microseconds') + .order_by('when') results = [] results.append(["#", "?", "When", "Deployment", "Event", "Host", "State", "State'", "Task'"]) for e in events: - results.append([e.id, routing_key_type(e.routing_key), str(e.when), + when = dt.dt_from_decimal(e.when) + results.append([e.id, routing_key_type(e.routing_key), str(when), e.deployment.name, e.event, e.host, e.state, e.old_state, e.old_task]) @@ -198,6 +203,8 @@ def do_show(request, event_id): results.append(["Key", "Value"]) results.append(["#", event.id]) + when = dt.dt_from_decimal(event.when) + results.append(["When", str(when)]) results.append(["Deployment", event.deployment.name]) results.append(["Category", event.routing_key]) results.append(["Publisher", event.publisher]) @@ -216,7 +223,11 @@ def do_show(request, event_id): return rsp(final) -def do_watch(request, deployment_id=None, event_name="", since=None): +def do_watch(request, deployment_id): + deployment_id = int(deployment_id) + since = request.GET.get('since') + event_name = request.GET.get('event_name') + deployment_map = {} for d in get_deployments(): deployment_map[d.id] = d @@ -225,47 +236,52 @@ def do_watch(request, deployment_id=None, event_name="", since=None): hosts = get_host_names() max_host_width = max([len(host['host']) for host in hosts]) - deployment = None - - if deployment_id: - deployment = models.Deployment.objects.get(id=deployment_id) - - base_events = models.RawData.objects.order_by('-when', '-microseconds') - if tenant: + base_events = models.RawData.objects.order_by('when') + if deployment_id > 0: base_events = base_events.filter(deployment=deployment_id) + if event_name: base_events = base_events.filter(event=event_name) - if since: - since = datetime.datetime.strptime(since, "%Y-%m-%d %H:%M:%S.%f") - events = events.filter(when__gt=since) - events = events[:20] - c = [10, 1, 10, 20, max_event_width, 36] + # Ok, this may seem a little wonky, but I'm clamping the + # query time to the closest second. The implication is we + # may not return the absolute latest data (which will have + # to wait for the next query). The upside of doing this + # is we can effectively cache the responses. So, with a + # caching proxy server we can service a lot more clients + # without having to worry about microsecond differences + # causing cache misses. + + now = datetime.datetime.utcnow() + now = now.replace(microsecond=0) # clamp it down. + dec_now = dt.dt_to_decimal(now) + if since: + since = decimal.Decimal(since) + else: + since = now - datetime.timedelta(seconds=2) + since = dt.dt_to_decimal(since) + base_events = base_events.filter(when__gt=since) + events = base_events.filter(when__lte=dec_now) + + c = [10, 1, 15, 20, max_event_width, 36] header = ("+%s" * len(c)) + "+" splat = header.replace("+", "|") results = [] - results.append([''.center(col, '-') for col in c]) - results.append(['#'.center(c[0]), '?', - str(event.when.date()).center(c[2]), - 'Deployment'.center(c[3]), - 'Event'.center(c[4]), - 'UUID'.center(c[5])]) - results.append([''.center(col, '-') for col in c]) - last = None - for event in events: - uuid = event.instance + + for raw in events: + uuid = raw.instance if not uuid: uuid = "-" - typ = routing_key_type(event.routing_key) - results.append([str(event.id).center(c[0]), - typ, - str(event.when.time()).center(c[2]), - deployment_map[event.deployment.id].name.center(c[3]), - event.event.center(c[4]), - uuid.center(c[5])]) - last = event.when - return rsp([results, last]) + typ = routing_key_type(raw.routing_key) + when = dt.dt_from_decimal(raw.when) + results.append([raw.id, typ, + str(when.date()), str(when.time()), + deployment_map[raw.deployment.id].name, + raw.event, + uuid]) + + return rsp([c, results, str(dec_now)]) def do_kpi(request): @@ -277,7 +293,7 @@ def do_kpi(request): Q(event="compute.instance.update")) \ .only('event', 'host', 'request_id', 'instance', 'deployment') \ - .order_by('when', 'microseconds') + .order_by('when') events = list(events) instance_map = {} # { uuid: [(request_id, start_event, end_event), ...] } @@ -317,10 +333,8 @@ def do_kpi(request): if not end_event: continue event = end_event.event[:-len(".end")] - start = views._make_datetime_from_raw(start_event.when, - start_event.microseconds) - end = views._make_datetime_from_raw(end_event.when, - end_event.microseconds) + start = dt.dt_from_decimal(start_event.when) + end = dt.dt_from_decimal(end_event.when) diff = end - start results.append([event, sec_to_time(seconds_from_timedelta( diff.days, diff.seconds, diff.microseconds)), uuid, diff --git a/stacktach/urls.py b/stacktach/urls.py index a2f1766..5948d94 100644 --- a/stacktach/urls.py +++ b/stacktach/urls.py @@ -8,19 +8,13 @@ urlpatterns = patterns('', url(r'stacky/events', 'stacktach.stacky_server.do_events'), url(r'stacky/hosts', 'stacktach.stacky_server.do_hosts'), url(r'stacky/uuid', 'stacktach.stacky_server.do_uuid'), - url(r'stacky/timings/(?P\w+)', 'stacktach.stacky_server.do_timings'), + url(r'stacky/timings', 'stacktach.stacky_server.do_timings'), url(r'stacky/summary', 'stacktach.stacky_server.do_summary'), url(r'stacky/request', 'stacktach.stacky_server.do_request'), url(r'stacky/show/(?P\d+)', 'stacktach.stacky_server.do_show'), - url(r'stacky/watch', 'stacktach.stacky_server.do_watch'), url(r'stacky/watch/(?P\d+)', 'stacktach.stacky_server.do_watch'), - url(r'stacky/watch/(?P\d+)/(?P\w+)', - 'stacktach.stacky_server.do_watch'), - url(r'stacky/watch/(?P\d+)/(?P\w+)/' - '(?P\w+)', - 'stacktach.stacky_server.do_watch'), url(r'stacky/kpi', 'stacktach.stacky_server.do_kpi'), url(r'^(?P\d+)/$', 'stacktach.views.home', name='home'), diff --git a/stacktach/views.py b/stacktach/views.py index 57bf1d2..89a2d4c 100644 --- a/stacktach/views.py +++ b/stacktach/views.py @@ -5,6 +5,7 @@ from django import http from django import template from stacktach import models +from stacktach import datetime_to_decimal as dt import datetime import json @@ -73,14 +74,6 @@ HANDLERS = {'monitor.info':_monitor_message, '':_compute_update_message} -def _make_datetime_from_raw(dt, ms): - if dt is None: - return None - return datetime.datetime(day=dt.day, month=dt.month, year=dt.year, - hour=dt.hour, minute=dt.minute, second=dt.second, - microsecond=ms) - - def aggregate(raw): """Roll up the raw event into a Lifecycle object and a bunch of Timing objects. @@ -136,7 +129,6 @@ def aggregate(raw): if start: timing.start_raw = raw timing.start_when = raw.when - timing.start_ms = raw.microseconds # Erase all the other fields which may have been set # the first time this operation was performed. @@ -144,19 +136,17 @@ def aggregate(raw): # We'll only record the last one, but track that 3 were done. timing.end_raw = None timing.end_when = None - timing.end_ms = 0 timing.diff_when = None timing.diff_ms = 0 else: timing.end_raw = raw timing.end_when = raw.when - timing.end_ms = raw.microseconds - start = _make_datetime_from_raw(timing.start_when, timing.start_ms) - end = _make_datetime_from_raw(timing.end_when, timing.end_ms) + end = dt.dt_from_decimal(timing.end_when) # We could have missed start so watch out ... - if start and end: + if timing.start_when and end: + start = dt.dt_from_decimal(timing.start_when) diff = end - start timing.diff_days = diff.days timing.diff_seconds = diff.seconds @@ -184,18 +174,17 @@ def process_raw_data(deployment, args, json_args): except ValueError: # Old way of doing it when = datetime.datetime.strptime(when, "%Y-%m-%dT%H:%M:%S.%f") - values['microseconds'] = when.microsecond except Exception, e: pass - values['when'] = when + values['when'] = dt.dt_to_decimal(when) values['routing_key'] = routing_key values['json'] = json_args record = models.RawData(**values) record.save() aggregate(record) - return values - return {} + return record + return None def _post_process_raw_data(rows, highlight=None): @@ -204,7 +193,7 @@ def _post_process_raw_data(rows, highlight=None): row.is_error = True if highlight and row.id == int(highlight): row.highlight = True - row.when += datetime.timedelta(microseconds=row.microseconds) + row.fwhen = dt.dt_from_decimal(row.when) def _default_context(request, deployment_id=0): @@ -250,12 +239,14 @@ def details(request, deployment_id, column, row_id): if column != 'when': rows = rows.filter(**{column:value}) else: - value += datetime.timedelta(microseconds=row.microseconds) - from_time = value - datetime.timedelta(minutes=1) - to_time = value + datetime.timedelta(minutes=1) - rows = rows.filter(when__range=(from_time, to_time)) + when = dt.dt_from_decimal(value) + from_time = when - datetime.timedelta(minutes=1) + to_time = when + datetime.timedelta(minutes=1) + from_time_dec = dt.dt_to_decimal(from_time) + to_time_dec = dt.dt_to_decimal(to_time) + rows = rows.filter(when__range=(from_time_dec, to_time_dec)) - rows = rows.order_by('-when', '-microseconds')[:200] + rows = rows.order_by('-when')[:200] _post_process_raw_data(rows, highlight=row_id) c['rows'] = rows c['allow_expansion'] = True @@ -279,7 +270,7 @@ def latest_raw(request, deployment_id): query = models.RawData.objects.select_related() if deployment_id > 0: query = query.filter(deployment=deployment_id) - rows = query.order_by('-when', '-microseconds')[:20] + rows = query.order_by('-when')[:20] _post_process_raw_data(rows) c['rows'] = rows return render_to_response('host_status.html', c) @@ -295,7 +286,7 @@ def search(request, deployment_id): if deployment_id: row = rows.filter(deployment=deployment_id) rows = rows.filter(**{column:value}). \ - order_by('-when', '-microseconds')[:22] + order_by('-when')[:22] _post_process_raw_data(rows) c['rows'] = rows c['allow_expansion'] = True diff --git a/templates/rows.html b/templates/rows.html index ec46f92..8c888d7 100644 --- a/templates/rows.html +++ b/templates/rows.html @@ -65,8 +65,8 @@ - {% if show_absolute_time %}{{row.when}} (+{{row.when.microsecond}}) - {%else%}{{row.when|timesince:utc}} ago{%endif%} + {% if show_absolute_time %}{{row.fwhen}} + {%else%}{{row.fwhen|timesince:utc}} ago{%endif%} {% if allow_expansion %} diff --git a/worker/worker.py b/worker/worker.py index 61ae14d..421ae84 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -25,6 +25,7 @@ import logging import time from stacktach import models, views +from stacktach import datetime_to_decimal as dt LOG = logging.getLogger(__name__) @@ -47,9 +48,10 @@ nova_queues = [ class NovaConsumer(kombu.mixins.ConsumerMixin): - def __init__(self, connection, deployment): + def __init__(self, name, connection, deployment): self.connection = connection self.deployment = deployment + self.name = name def get_consumers(self, Consumer, channel): return [Consumer(queues=nova_queues, callbacks=[self.on_nova])] @@ -62,8 +64,14 @@ class NovaConsumer(kombu.mixins.ConsumerMixin): args = (routing_key, json.loads(message.body)) asJson = json.dumps(args) - views.process_raw_data(self.deployment, args, asJson) - self.logger.debug("Recorded %s ", routing_key) + raw = views.process_raw_data(self.deployment, args, asJson) + if not raw: + LOG.debug("No record from %s", routing_key) + else: + LOG.debug("Recorded rec# %d from %s/%s at %s (%.6f)" % + (raw.id, self.name, routing_key, + str(dt.dt_from_decimal(raw.when)), + float(raw.when))) def on_nova(self, body, message): self._process(body, message) @@ -92,7 +100,7 @@ def run(deployment_config): while True: with kombu.connection.BrokerConnection(**params) as conn: try: - consumer = NovaConsumer(conn, deployment) + consumer = NovaConsumer(name, conn, deployment) consumer.run() except Exception as e: LOG.exception("name=%s, exception=%s. Reconnecting in 5s" %