switched to Decimal based time. Much cleaner. Stacky Watch command working
This commit is contained in:
parent
cbc96b6b52
commit
71895341fe
28
stacktach/datetime_to_decimal.py
Normal file
28
stacktach/datetime_to_decimal.py
Normal file
@ -0,0 +1,28 @@
|
||||
import calendar
|
||||
import datetime
|
||||
import decimal
|
||||
import time
|
||||
|
||||
|
||||
def dt_to_decimal(utc):
|
||||
decimal.getcontext().prec = 6
|
||||
return decimal.Decimal(calendar.timegm(utc.utctimetuple()) +
|
||||
utc.microsecond/float(1e6))
|
||||
|
||||
|
||||
def dt_from_decimal(dec):
|
||||
integer = int(dec)
|
||||
micro = (dec - decimal.Decimal(integer)) * decimal.Decimal(1000000)
|
||||
|
||||
daittyme = datetime.datetime.utcfromtimestamp(integer)
|
||||
return daittyme.replace(microsecond=micro)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
now = datetime.datetime.utcnow()
|
||||
d = dt_to_decimal(now)
|
||||
daittyme = dt_from_decimal(d)
|
||||
print repr(now)
|
||||
print repr(d)
|
||||
print repr(daittyme)
|
||||
assert(now == daittyme)
|
@ -38,8 +38,7 @@ class RawData(models.Model):
|
||||
blank=True, db_index=True)
|
||||
old_task = models.CharField(max_length=30, null=True,
|
||||
blank=True, db_index=True)
|
||||
when = models.DateTimeField(db_index=True)
|
||||
microseconds = models.IntegerField(default=0, db_index=True)
|
||||
when = models.DecimalField(max_digits=20, decimal_places=6)
|
||||
publisher = models.CharField(max_length=100, null=True,
|
||||
blank=True, db_index=True)
|
||||
event = models.CharField(max_length=50, null=True,
|
||||
@ -73,10 +72,8 @@ class Timing(models.Model):
|
||||
start_raw = models.ForeignKey(RawData, related_name='+', null=True)
|
||||
end_raw = models.ForeignKey(RawData, related_name='+', null=True)
|
||||
|
||||
start_when = models.DateTimeField(db_index=True, null=True)
|
||||
start_ms = models.IntegerField(default=0)
|
||||
end_when = models.DateTimeField(db_index=True, null=True)
|
||||
end_ms = models.IntegerField(default=3)
|
||||
start_when = models.DecimalField(null=True, max_digits=20, decimal_places=6)
|
||||
end_when = models.DecimalField(null=True, max_digits=20, decimal_places=6)
|
||||
|
||||
diff_days = models.IntegerField(default=0)
|
||||
diff_seconds = models.IntegerField(default=0)
|
||||
|
@ -1,9 +1,11 @@
|
||||
import decimal
|
||||
import datetime
|
||||
import json
|
||||
|
||||
from django.db.models import Q
|
||||
from django.http import HttpResponse
|
||||
|
||||
import datetime_to_decimal as dt
|
||||
import models
|
||||
import views
|
||||
|
||||
@ -112,25 +114,27 @@ def do_hosts(request):
|
||||
def do_uuid(request):
|
||||
uuid = str(request.GET['uuid'])
|
||||
related = models.RawData.objects.select_related(). \
|
||||
filter(instance=uuid).order_by('when', 'microseconds')
|
||||
filter(instance=uuid).order_by('when')
|
||||
results = []
|
||||
results.append(["#", "?", "When", "Deployment", "Event", "Host",
|
||||
"State", "State'", "Task'"])
|
||||
for e in related:
|
||||
results.append([e.id, routing_key_type(e.routing_key), str(e.when),
|
||||
when = dt.dt_from_decimal(e.when)
|
||||
results.append([e.id, routing_key_type(e.routing_key), str(when),
|
||||
e.deployment.name, e.event,
|
||||
e.host,
|
||||
e.state, e.old_state, e.old_task])
|
||||
return rsp(results)
|
||||
|
||||
|
||||
def do_timings(request, name):
|
||||
def do_timings(request):
|
||||
name = request.GET['name']
|
||||
results = []
|
||||
results.append([name, "Time"])
|
||||
timings = models.Timing.objects.select_related().filter(name=name) \
|
||||
.exclude(Q(start_raw=None) | Q(end_raw=None)) \
|
||||
.order_by('diff_days', 'diff_seconds',
|
||||
'diff_usecs')
|
||||
.exclude(Q(start_raw=None) | Q(end_raw=None)) \
|
||||
.order_by('diff_days', 'diff_seconds',
|
||||
'diff_usecs')
|
||||
|
||||
for t in timings:
|
||||
seconds = seconds_from_timing(t)
|
||||
@ -175,12 +179,13 @@ def do_summary(request):
|
||||
def do_request(request):
|
||||
request_id = request.GET['request_id']
|
||||
events = models.RawData.objects.filter(request_id=request_id) \
|
||||
.order_by('when', 'microseconds')
|
||||
.order_by('when')
|
||||
results = []
|
||||
results.append(["#", "?", "When", "Deployment", "Event", "Host",
|
||||
"State", "State'", "Task'"])
|
||||
for e in events:
|
||||
results.append([e.id, routing_key_type(e.routing_key), str(e.when),
|
||||
when = dt.dt_from_decimal(e.when)
|
||||
results.append([e.id, routing_key_type(e.routing_key), str(when),
|
||||
e.deployment.name, e.event,
|
||||
e.host, e.state,
|
||||
e.old_state, e.old_task])
|
||||
@ -198,6 +203,8 @@ def do_show(request, event_id):
|
||||
|
||||
results.append(["Key", "Value"])
|
||||
results.append(["#", event.id])
|
||||
when = dt.dt_from_decimal(event.when)
|
||||
results.append(["When", str(when)])
|
||||
results.append(["Deployment", event.deployment.name])
|
||||
results.append(["Category", event.routing_key])
|
||||
results.append(["Publisher", event.publisher])
|
||||
@ -216,7 +223,11 @@ def do_show(request, event_id):
|
||||
return rsp(final)
|
||||
|
||||
|
||||
def do_watch(request, deployment_id=None, event_name="", since=None):
|
||||
def do_watch(request, deployment_id):
|
||||
deployment_id = int(deployment_id)
|
||||
since = request.GET.get('since')
|
||||
event_name = request.GET.get('event_name')
|
||||
|
||||
deployment_map = {}
|
||||
for d in get_deployments():
|
||||
deployment_map[d.id] = d
|
||||
@ -225,47 +236,52 @@ def do_watch(request, deployment_id=None, event_name="", since=None):
|
||||
hosts = get_host_names()
|
||||
max_host_width = max([len(host['host']) for host in hosts])
|
||||
|
||||
deployment = None
|
||||
|
||||
if deployment_id:
|
||||
deployment = models.Deployment.objects.get(id=deployment_id)
|
||||
|
||||
base_events = models.RawData.objects.order_by('-when', '-microseconds')
|
||||
if tenant:
|
||||
base_events = models.RawData.objects.order_by('when')
|
||||
if deployment_id > 0:
|
||||
base_events = base_events.filter(deployment=deployment_id)
|
||||
|
||||
if event_name:
|
||||
base_events = base_events.filter(event=event_name)
|
||||
if since:
|
||||
since = datetime.datetime.strptime(since, "%Y-%m-%d %H:%M:%S.%f")
|
||||
events = events.filter(when__gt=since)
|
||||
events = events[:20]
|
||||
|
||||
c = [10, 1, 10, 20, max_event_width, 36]
|
||||
# Ok, this may seem a little wonky, but I'm clamping the
|
||||
# query time to the closest second. The implication is we
|
||||
# may not return the absolute latest data (which will have
|
||||
# to wait for the next query). The upside of doing this
|
||||
# is we can effectively cache the responses. So, with a
|
||||
# caching proxy server we can service a lot more clients
|
||||
# without having to worry about microsecond differences
|
||||
# causing cache misses.
|
||||
|
||||
now = datetime.datetime.utcnow()
|
||||
now = now.replace(microsecond=0) # clamp it down.
|
||||
dec_now = dt.dt_to_decimal(now)
|
||||
if since:
|
||||
since = decimal.Decimal(since)
|
||||
else:
|
||||
since = now - datetime.timedelta(seconds=2)
|
||||
since = dt.dt_to_decimal(since)
|
||||
base_events = base_events.filter(when__gt=since)
|
||||
events = base_events.filter(when__lte=dec_now)
|
||||
|
||||
c = [10, 1, 15, 20, max_event_width, 36]
|
||||
header = ("+%s" * len(c)) + "+"
|
||||
splat = header.replace("+", "|")
|
||||
|
||||
results = []
|
||||
results.append([''.center(col, '-') for col in c])
|
||||
results.append(['#'.center(c[0]), '?',
|
||||
str(event.when.date()).center(c[2]),
|
||||
'Deployment'.center(c[3]),
|
||||
'Event'.center(c[4]),
|
||||
'UUID'.center(c[5])])
|
||||
results.append([''.center(col, '-') for col in c])
|
||||
last = None
|
||||
for event in events:
|
||||
uuid = event.instance
|
||||
|
||||
for raw in events:
|
||||
uuid = raw.instance
|
||||
if not uuid:
|
||||
uuid = "-"
|
||||
typ = routing_key_type(event.routing_key)
|
||||
results.append([str(event.id).center(c[0]),
|
||||
typ,
|
||||
str(event.when.time()).center(c[2]),
|
||||
deployment_map[event.deployment.id].name.center(c[3]),
|
||||
event.event.center(c[4]),
|
||||
uuid.center(c[5])])
|
||||
last = event.when
|
||||
return rsp([results, last])
|
||||
typ = routing_key_type(raw.routing_key)
|
||||
when = dt.dt_from_decimal(raw.when)
|
||||
results.append([raw.id, typ,
|
||||
str(when.date()), str(when.time()),
|
||||
deployment_map[raw.deployment.id].name,
|
||||
raw.event,
|
||||
uuid])
|
||||
|
||||
return rsp([c, results, str(dec_now)])
|
||||
|
||||
|
||||
def do_kpi(request):
|
||||
@ -277,7 +293,7 @@ def do_kpi(request):
|
||||
Q(event="compute.instance.update")) \
|
||||
.only('event', 'host', 'request_id',
|
||||
'instance', 'deployment') \
|
||||
.order_by('when', 'microseconds')
|
||||
.order_by('when')
|
||||
|
||||
events = list(events)
|
||||
instance_map = {} # { uuid: [(request_id, start_event, end_event), ...] }
|
||||
@ -317,10 +333,8 @@ def do_kpi(request):
|
||||
if not end_event:
|
||||
continue
|
||||
event = end_event.event[:-len(".end")]
|
||||
start = views._make_datetime_from_raw(start_event.when,
|
||||
start_event.microseconds)
|
||||
end = views._make_datetime_from_raw(end_event.when,
|
||||
end_event.microseconds)
|
||||
start = dt.dt_from_decimal(start_event.when)
|
||||
end = dt.dt_from_decimal(end_event.when)
|
||||
diff = end - start
|
||||
results.append([event, sec_to_time(seconds_from_timedelta(
|
||||
diff.days, diff.seconds, diff.microseconds)), uuid,
|
||||
|
@ -8,19 +8,13 @@ urlpatterns = patterns('',
|
||||
url(r'stacky/events', 'stacktach.stacky_server.do_events'),
|
||||
url(r'stacky/hosts', 'stacktach.stacky_server.do_hosts'),
|
||||
url(r'stacky/uuid', 'stacktach.stacky_server.do_uuid'),
|
||||
url(r'stacky/timings/(?P<name>\w+)', 'stacktach.stacky_server.do_timings'),
|
||||
url(r'stacky/timings', 'stacktach.stacky_server.do_timings'),
|
||||
url(r'stacky/summary', 'stacktach.stacky_server.do_summary'),
|
||||
url(r'stacky/request', 'stacktach.stacky_server.do_request'),
|
||||
url(r'stacky/show/(?P<event_id>\d+)',
|
||||
'stacktach.stacky_server.do_show'),
|
||||
url(r'stacky/watch', 'stacktach.stacky_server.do_watch'),
|
||||
url(r'stacky/watch/(?P<deployment_id>\d+)',
|
||||
'stacktach.stacky_server.do_watch'),
|
||||
url(r'stacky/watch/(?P<deployment_id>\d+)/(?P<event_name>\w+)',
|
||||
'stacktach.stacky_server.do_watch'),
|
||||
url(r'stacky/watch/(?P<deployment_id>\d+)/(?P<event_name>\w+)/'
|
||||
'(?P<since>\w+)',
|
||||
'stacktach.stacky_server.do_watch'),
|
||||
url(r'stacky/kpi', 'stacktach.stacky_server.do_kpi'),
|
||||
|
||||
url(r'^(?P<deployment_id>\d+)/$', 'stacktach.views.home', name='home'),
|
||||
|
@ -5,6 +5,7 @@ from django import http
|
||||
from django import template
|
||||
|
||||
from stacktach import models
|
||||
from stacktach import datetime_to_decimal as dt
|
||||
|
||||
import datetime
|
||||
import json
|
||||
@ -73,14 +74,6 @@ HANDLERS = {'monitor.info':_monitor_message,
|
||||
'':_compute_update_message}
|
||||
|
||||
|
||||
def _make_datetime_from_raw(dt, ms):
|
||||
if dt is None:
|
||||
return None
|
||||
return datetime.datetime(day=dt.day, month=dt.month, year=dt.year,
|
||||
hour=dt.hour, minute=dt.minute, second=dt.second,
|
||||
microsecond=ms)
|
||||
|
||||
|
||||
def aggregate(raw):
|
||||
"""Roll up the raw event into a Lifecycle object
|
||||
and a bunch of Timing objects.
|
||||
@ -136,7 +129,6 @@ def aggregate(raw):
|
||||
if start:
|
||||
timing.start_raw = raw
|
||||
timing.start_when = raw.when
|
||||
timing.start_ms = raw.microseconds
|
||||
|
||||
# Erase all the other fields which may have been set
|
||||
# the first time this operation was performed.
|
||||
@ -144,19 +136,17 @@ def aggregate(raw):
|
||||
# We'll only record the last one, but track that 3 were done.
|
||||
timing.end_raw = None
|
||||
timing.end_when = None
|
||||
timing.end_ms = 0
|
||||
|
||||
timing.diff_when = None
|
||||
timing.diff_ms = 0
|
||||
else:
|
||||
timing.end_raw = raw
|
||||
timing.end_when = raw.when
|
||||
timing.end_ms = raw.microseconds
|
||||
start = _make_datetime_from_raw(timing.start_when, timing.start_ms)
|
||||
end = _make_datetime_from_raw(timing.end_when, timing.end_ms)
|
||||
end = dt.dt_from_decimal(timing.end_when)
|
||||
|
||||
# We could have missed start so watch out ...
|
||||
if start and end:
|
||||
if timing.start_when and end:
|
||||
start = dt.dt_from_decimal(timing.start_when)
|
||||
diff = end - start
|
||||
timing.diff_days = diff.days
|
||||
timing.diff_seconds = diff.seconds
|
||||
@ -184,18 +174,17 @@ def process_raw_data(deployment, args, json_args):
|
||||
except ValueError:
|
||||
# Old way of doing it
|
||||
when = datetime.datetime.strptime(when, "%Y-%m-%dT%H:%M:%S.%f")
|
||||
values['microseconds'] = when.microsecond
|
||||
except Exception, e:
|
||||
pass
|
||||
values['when'] = when
|
||||
values['when'] = dt.dt_to_decimal(when)
|
||||
values['routing_key'] = routing_key
|
||||
values['json'] = json_args
|
||||
record = models.RawData(**values)
|
||||
record.save()
|
||||
|
||||
aggregate(record)
|
||||
return values
|
||||
return {}
|
||||
return record
|
||||
return None
|
||||
|
||||
|
||||
def _post_process_raw_data(rows, highlight=None):
|
||||
@ -204,7 +193,7 @@ def _post_process_raw_data(rows, highlight=None):
|
||||
row.is_error = True
|
||||
if highlight and row.id == int(highlight):
|
||||
row.highlight = True
|
||||
row.when += datetime.timedelta(microseconds=row.microseconds)
|
||||
row.fwhen = dt.dt_from_decimal(row.when)
|
||||
|
||||
|
||||
def _default_context(request, deployment_id=0):
|
||||
@ -250,12 +239,14 @@ def details(request, deployment_id, column, row_id):
|
||||
if column != 'when':
|
||||
rows = rows.filter(**{column:value})
|
||||
else:
|
||||
value += datetime.timedelta(microseconds=row.microseconds)
|
||||
from_time = value - datetime.timedelta(minutes=1)
|
||||
to_time = value + datetime.timedelta(minutes=1)
|
||||
rows = rows.filter(when__range=(from_time, to_time))
|
||||
when = dt.dt_from_decimal(value)
|
||||
from_time = when - datetime.timedelta(minutes=1)
|
||||
to_time = when + datetime.timedelta(minutes=1)
|
||||
from_time_dec = dt.dt_to_decimal(from_time)
|
||||
to_time_dec = dt.dt_to_decimal(to_time)
|
||||
rows = rows.filter(when__range=(from_time_dec, to_time_dec))
|
||||
|
||||
rows = rows.order_by('-when', '-microseconds')[:200]
|
||||
rows = rows.order_by('-when')[:200]
|
||||
_post_process_raw_data(rows, highlight=row_id)
|
||||
c['rows'] = rows
|
||||
c['allow_expansion'] = True
|
||||
@ -279,7 +270,7 @@ def latest_raw(request, deployment_id):
|
||||
query = models.RawData.objects.select_related()
|
||||
if deployment_id > 0:
|
||||
query = query.filter(deployment=deployment_id)
|
||||
rows = query.order_by('-when', '-microseconds')[:20]
|
||||
rows = query.order_by('-when')[:20]
|
||||
_post_process_raw_data(rows)
|
||||
c['rows'] = rows
|
||||
return render_to_response('host_status.html', c)
|
||||
@ -295,7 +286,7 @@ def search(request, deployment_id):
|
||||
if deployment_id:
|
||||
row = rows.filter(deployment=deployment_id)
|
||||
rows = rows.filter(**{column:value}). \
|
||||
order_by('-when', '-microseconds')[:22]
|
||||
order_by('-when')[:22]
|
||||
_post_process_raw_data(rows)
|
||||
c['rows'] = rows
|
||||
c['allow_expansion'] = True
|
||||
|
@ -65,8 +65,8 @@
|
||||
|
||||
<td class='cell-border'>
|
||||
<a href='#' onclick='details({{deployment_id}}, "when", {{row.id}});'>
|
||||
{% if show_absolute_time %}{{row.when}} (+{{row.when.microsecond}})
|
||||
{%else%}{{row.when|timesince:utc}} ago{%endif%}</a>
|
||||
{% if show_absolute_time %}{{row.fwhen}}
|
||||
{%else%}{{row.fwhen|timesince:utc}} ago{%endif%}</a>
|
||||
</td>
|
||||
</tr>
|
||||
{% if allow_expansion %}
|
||||
|
@ -25,6 +25,7 @@ import logging
|
||||
import time
|
||||
|
||||
from stacktach import models, views
|
||||
from stacktach import datetime_to_decimal as dt
|
||||
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -47,9 +48,10 @@ nova_queues = [
|
||||
|
||||
|
||||
class NovaConsumer(kombu.mixins.ConsumerMixin):
|
||||
def __init__(self, connection, deployment):
|
||||
def __init__(self, name, connection, deployment):
|
||||
self.connection = connection
|
||||
self.deployment = deployment
|
||||
self.name = name
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
return [Consumer(queues=nova_queues, callbacks=[self.on_nova])]
|
||||
@ -62,8 +64,14 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
|
||||
args = (routing_key, json.loads(message.body))
|
||||
asJson = json.dumps(args)
|
||||
|
||||
views.process_raw_data(self.deployment, args, asJson)
|
||||
self.logger.debug("Recorded %s ", routing_key)
|
||||
raw = views.process_raw_data(self.deployment, args, asJson)
|
||||
if not raw:
|
||||
LOG.debug("No record from %s", routing_key)
|
||||
else:
|
||||
LOG.debug("Recorded rec# %d from %s/%s at %s (%.6f)" %
|
||||
(raw.id, self.name, routing_key,
|
||||
str(dt.dt_from_decimal(raw.when)),
|
||||
float(raw.when)))
|
||||
|
||||
def on_nova(self, body, message):
|
||||
self._process(body, message)
|
||||
@ -92,7 +100,7 @@ def run(deployment_config):
|
||||
while True:
|
||||
with kombu.connection.BrokerConnection(**params) as conn:
|
||||
try:
|
||||
consumer = NovaConsumer(conn, deployment)
|
||||
consumer = NovaConsumer(name, conn, deployment)
|
||||
consumer.run()
|
||||
except Exception as e:
|
||||
LOG.exception("name=%s, exception=%s. Reconnecting in 5s" %
|
||||
|
Loading…
x
Reference in New Issue
Block a user