Merge branch 'kpi' of git://github.com/rackspace/stacktach
This commit is contained in:
commit
28d3a74f58
@ -57,6 +57,12 @@ class RawData(models.Model):
|
|||||||
|
|
||||||
|
|
||||||
class Lifecycle(models.Model):
|
class Lifecycle(models.Model):
|
||||||
|
"""The Lifecycle table is the Master for a group of
|
||||||
|
Timing detail records. There is one Lifecycle row for
|
||||||
|
each instance seen in the event stream. The Timings
|
||||||
|
relate to the execution time for each .start/.end event
|
||||||
|
pair for this instance. These pairs are over the entire
|
||||||
|
lifespan of the instance, even across multiple api requests."""
|
||||||
instance = models.CharField(max_length=50, null=True,
|
instance = models.CharField(max_length=50, null=True,
|
||||||
blank=True, db_index=True)
|
blank=True, db_index=True)
|
||||||
last_state = models.CharField(max_length=50, null=True,
|
last_state = models.CharField(max_length=50, null=True,
|
||||||
@ -67,6 +73,9 @@ class Lifecycle(models.Model):
|
|||||||
|
|
||||||
|
|
||||||
class Timing(models.Model):
|
class Timing(models.Model):
|
||||||
|
"""Each Timing record corresponds to a .start/.end event pair
|
||||||
|
for an instance. It tracks how long it took this operation
|
||||||
|
to execute."""
|
||||||
name = models.CharField(max_length=50, db_index=True)
|
name = models.CharField(max_length=50, db_index=True)
|
||||||
lifecycle = models.ForeignKey(Lifecycle)
|
lifecycle = models.ForeignKey(Lifecycle)
|
||||||
start_raw = models.ForeignKey(RawData, related_name='+', null=True)
|
start_raw = models.ForeignKey(RawData, related_name='+', null=True)
|
||||||
@ -76,3 +85,17 @@ class Timing(models.Model):
|
|||||||
end_when = models.DecimalField(null=True, max_digits=20, decimal_places=6)
|
end_when = models.DecimalField(null=True, max_digits=20, decimal_places=6)
|
||||||
|
|
||||||
diff = models.DecimalField(null=True, max_digits=20, decimal_places=6)
|
diff = models.DecimalField(null=True, max_digits=20, decimal_places=6)
|
||||||
|
|
||||||
|
|
||||||
|
class RequestTracker(models.Model):
    """The RequestTracker table tracks the elapsed time of a user
    request from the time it hits the API node to the time of the
    final .end event (with the same Request ID)."""
    # Request ID shared by every event belonging to this user request.
    request_id = models.CharField(max_length=50, db_index=True)
    # The Lifecycle row this request is tracked against.
    lifecycle = models.ForeignKey(Lifecycle)
    # Most recent Timing linked to this request. Nullable, so a tracker
    # can be stored before any Timing has been associated with it.
    # (Was `null=true`, an undefined name that raised NameError on import.)
    last_timing = models.ForeignKey(Timing, null=True)
    # Decimal timestamp with microsecond precision (6 decimal places).
    # NOTE(review): presumably seconds since the epoch -- confirm.
    start = models.DecimalField(max_digits=20, decimal_places=6)
    # Elapsed time for the request, same precision as `start`.
    duration = models.DecimalField(max_digits=20, decimal_places=6)

    # Not used ... but soon hopefully.
    completed = models.BooleanField(default=False)
|
||||||
|
@ -280,58 +280,18 @@ def do_watch(request, deployment_id):
|
|||||||
|
|
||||||
|
|
||||||
def do_kpi(request):
|
def do_kpi(request):
|
||||||
yesterday = datetime.datetime.now() - datetime.timedelta(days=1)
|
yesterday = datetime.datetime.utcnow() - datetime.timedelta(days=1)
|
||||||
|
trackers = models.RequestTracker.objects.select_related() \
|
||||||
events = models.RawData.objects.exclude(instance=None) \
|
.exclude(last_timing=None) \
|
||||||
.exclude(when__lt=yesterday) \
|
.exclude(start__lt=yesterday) \
|
||||||
.filter(Q(event__endswith='.end') |
|
.order_by('duration')
|
||||||
Q(event="compute.instance.update")) \
|
|
||||||
.only('event', 'host', 'request_id',
|
|
||||||
'instance', 'deployment') \
|
|
||||||
.order_by('when')
|
|
||||||
|
|
||||||
events = list(events)
|
|
||||||
instance_map = {} # { uuid: [(request_id, start_event, end_event), ...] }
|
|
||||||
|
|
||||||
for e in events:
|
|
||||||
if e.event == "compute.instance.update":
|
|
||||||
if "api" in e.host:
|
|
||||||
activities = instance_map.get(e.instance, [])
|
|
||||||
activities.append((e.request_id, e, None))
|
|
||||||
instance_map[e.instance] = activities
|
|
||||||
continue
|
|
||||||
|
|
||||||
if not e.event.endswith(".end"):
|
|
||||||
continue
|
|
||||||
|
|
||||||
activities = instance_map.get(e.instance)
|
|
||||||
if not activities:
|
|
||||||
# We missed the api start, skip it
|
|
||||||
continue
|
|
||||||
|
|
||||||
found = False
|
|
||||||
for index, a in enumerate(activities):
|
|
||||||
request_id, start_event, end_event = a
|
|
||||||
#if end_event is not None:
|
|
||||||
# continue
|
|
||||||
|
|
||||||
if request_id == e.request_id:
|
|
||||||
end_event = e
|
|
||||||
activities[index] = (request_id, start_event, e)
|
|
||||||
found = True
|
|
||||||
break
|
|
||||||
|
|
||||||
results = []
|
results = []
|
||||||
results.append(["Event", "Time", "UUID", "Deployment"])
|
results.append(["Event", "Time", "UUID", "Deployment"])
|
||||||
for uuid, activities in instance_map.iteritems():
|
for track in trackers:
|
||||||
for request_id, start_event, end_event in activities:
|
end_event = track.last_timing.end_raw
|
||||||
if not end_event:
|
|
||||||
continue
|
|
||||||
event = end_event.event[:-len(".end")]
|
event = end_event.event[:-len(".end")]
|
||||||
start = dt.dt_from_decimal(start_event.when)
|
uuid = track.lifecycle.instance
|
||||||
end = dt.dt_from_decimal(end_event.when)
|
results.append([event, sec_to_time(track.duration),
|
||||||
diff = end - start
|
uuid, end_event.deployment.name])
|
||||||
results.append([event, sec_to_time(seconds_from_timedelta(
|
|
||||||
diff.days, diff.seconds, diff.microseconds)), uuid,
|
|
||||||
end_event.deployment.name])
|
|
||||||
return rsp(results)
|
return rsp(results)
|
||||||
|
@ -74,11 +74,52 @@ HANDLERS = {'monitor.info':_monitor_message,
|
|||||||
'':_compute_update_message}
|
'':_compute_update_message}
|
||||||
|
|
||||||
|
|
||||||
|
def start_kpi_tracking(lifecycle, raw):
    """Start the clock for kpi timings when we see an instance.update
    coming in from an api node.

    `lifecycle` is the Lifecycle the new tracker is attached to and
    `raw` is the raw event being processed. Nothing is stored (and
    None is returned) unless `raw` is a "compute.instance.update"
    event whose host name contains "api".
    """
    if raw.event != "compute.instance.update":
        return

    # A missing host cannot be an api node; guarding here also avoids
    # the TypeError that `"api" not in None` would raise.
    if not raw.host or "api" not in raw.host:
        return

    # No Timing is known yet, so the tracker starts with no last_timing
    # and a zero duration.
    tracker = models.RequestTracker(request_id=raw.request_id,
                                    start=raw.when,
                                    lifecycle=lifecycle,
                                    last_timing=None,
                                    duration=0.0)
    tracker.save()
|
||||||
|
|
||||||
|
|
||||||
|
def update_kpi(lifecycle, timing, raw):
    """Whenever we get a .end event, use the Timing object to
    compute our current end-to-end duration.

    Looks up the RequestTracker rows matching the request ID of `raw`;
    if there are none, nothing happens. Otherwise the first match gets
    `timing` as its last_timing and its duration recomputed as
    `timing.end_when - tracker.start`, and is saved. `lifecycle` is
    accepted but not used here.

    Note: it may not be completely accurate if the operation is
    still in-process, but we have no way of knowing it's still
    in-process without mapping the original command with the
    expected .end event (that's a whole other thing)

    Until then, we'll take the lazy route and be aware of these
    potential fence-post issues."""
    # The raw event carries the ID as `request_id`; the previous
    # `raw.request.id` referred to an attribute that does not exist.
    trackers = models.RequestTracker.objects.filter(
        request_id=raw.request_id)
    if not trackers:
        # No tracker was opened for this request, so nothing to update.
        return

    tracker = trackers[0]
    tracker.last_timing = timing
    tracker.duration = timing.end_when - tracker.start
    tracker.save()
|
||||||
|
|
||||||
|
|
||||||
def aggregate(raw):
|
def aggregate(raw):
|
||||||
"""Roll up the raw event into a Lifecycle object
|
"""Roll up the raw event into a Lifecycle object
|
||||||
and a bunch of Timing objects.
|
and a bunch of Timing objects.
|
||||||
|
|
||||||
We can use this for summarized timing reports.
|
We can use this for summarized timing reports.
|
||||||
|
|
||||||
|
Additionally, we can use this processing to give
|
||||||
|
us end-to-end user request timings for kpi reports.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
if not raw.instance:
|
if not raw.instance:
|
||||||
@ -104,6 +145,8 @@ def aggregate(raw):
|
|||||||
name = '.'.join(parts[:-1])
|
name = '.'.join(parts[:-1])
|
||||||
|
|
||||||
if not step in ['start', 'end']:
|
if not step in ['start', 'end']:
|
||||||
|
# Perhaps it's an operation initiated in the API?
|
||||||
|
start_kpi_tracking(lifecyle, raw)
|
||||||
return
|
return
|
||||||
|
|
||||||
# We are going to try to track every event pair that comes
|
# We are going to try to track every event pair that comes
|
||||||
@ -151,9 +194,10 @@ def aggregate(raw):
|
|||||||
# We could have missed start so watch out ...
|
# We could have missed start so watch out ...
|
||||||
if timing.start_when:
|
if timing.start_when:
|
||||||
timing.diff = timing.end_when - timing.start_when
|
timing.diff = timing.end_when - timing.start_when
|
||||||
|
# Looks like a valid pair ...
|
||||||
|
update_kpi(lifecycle, timing, raw)
|
||||||
timing.save()
|
timing.save()
|
||||||
|
|
||||||
|
|
||||||
def process_raw_data(deployment, args, json_args):
|
def process_raw_data(deployment, args, json_args):
|
||||||
"""This is called directly by the worker to add the event to the db."""
|
"""This is called directly by the worker to add the event to the db."""
|
||||||
db.reset_queries()
|
db.reset_queries()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user