better logging, stem the worker memory leaks and fix up the start script
This commit is contained in:
parent
9d810d4436
commit
fa4448174f
@ -1,7 +1,8 @@
|
||||
# Copyright 2012 - Dark Secret Software Inc.
|
||||
|
||||
from django.shortcuts import render_to_response
|
||||
from django import db
|
||||
from django import http
|
||||
from django.shortcuts import render_to_response
|
||||
from django import template
|
||||
|
||||
from stacktach import models
|
||||
@ -30,7 +31,6 @@ def _monitor_message(routing_key, body):
|
||||
host = ".".join(parts[1:])
|
||||
else:
|
||||
host = None
|
||||
#logging.error("publisher=%s, host=%s" % (publisher, host))
|
||||
payload = body['payload']
|
||||
request_spec = payload.get('request_spec', None)
|
||||
|
||||
@ -87,7 +87,8 @@ def aggregate(raw):
|
||||
# While we hope only one lifecycle ever exists it's quite
|
||||
# likely we get multiple due to the workers and threads.
|
||||
lifecycle = None
|
||||
lifecycles = models.Lifecycle.objects.filter(instance=raw.instance)
|
||||
lifecycles = models.Lifecycle.objects.select_related().\
|
||||
filter(instance=raw.instance)
|
||||
if len(lifecycles) > 0:
|
||||
lifecycle = lifecycles[0]
|
||||
if not lifecycle:
|
||||
@ -115,7 +116,8 @@ def aggregate(raw):
|
||||
# *shouldn't* happen).
|
||||
start = step == 'start'
|
||||
timing = None
|
||||
timings = models.Timing.objects.filter(name=name, lifecycle=lifecycle)
|
||||
timings = models.Timing.objects.select_related().\
|
||||
filter(name=name, lifecycle=lifecycle)
|
||||
if not start:
|
||||
for t in timings:
|
||||
try:
|
||||
@ -154,18 +156,21 @@ def aggregate(raw):
|
||||
|
||||
def process_raw_data(deployment, args, json_args):
|
||||
"""This is called directly by the worker to add the event to the db."""
|
||||
db.reset_queries()
|
||||
|
||||
routing_key, body = args
|
||||
record = None
|
||||
handler = HANDLERS.get(routing_key, None)
|
||||
if handler:
|
||||
values = handler(routing_key, body)
|
||||
if not values:
|
||||
return {}
|
||||
return record
|
||||
|
||||
values['deployment'] = deployment
|
||||
try:
|
||||
when = body['timestamp']
|
||||
except KeyError:
|
||||
when = body['_context_timestamp'] # Old way of doing it
|
||||
when = body['_context_timestamp'] # Old way of doing it
|
||||
try:
|
||||
try:
|
||||
when = datetime.datetime.strptime(when, "%Y-%m-%d %H:%M:%S.%f")
|
||||
@ -181,8 +186,7 @@ def process_raw_data(deployment, args, json_args):
|
||||
record.save()
|
||||
|
||||
aggregate(record)
|
||||
return record
|
||||
return None
|
||||
return record
|
||||
|
||||
|
||||
def _post_process_raw_data(rows, highlight=None):
|
||||
|
@ -1,16 +1,15 @@
|
||||
#!/bin/bash
|
||||
|
||||
WORKDIR=/srv/www/stacktach/django/stproject
|
||||
WORKDIR=/srv/www/stacktach/django/stproject/
|
||||
DAEMON=/usr/bin/python
|
||||
ARGS=$WORKDIR/start_workers.py
|
||||
ARGS=$WORKDIR/worker/start_workers.py
|
||||
PIDFILE=/var/run/stacktach.pid
|
||||
|
||||
export DJANGO_SETTINGS_MODULE=settings
|
||||
|
||||
case "$1" in
|
||||
start)
|
||||
echo "Starting server"
|
||||
cd $WORKDIR
|
||||
source etc/stacktach_config.sh
|
||||
/sbin/start-stop-daemon --start --pidfile $PIDFILE --make-pidfile -b --exec $DAEMON $ARGS
|
||||
;;
|
||||
stop)
|
||||
|
@ -16,6 +16,7 @@
|
||||
# This is the worker you run in your OpenStack environment. You need
|
||||
# to set TENANT_ID and URL to point to your StackTach web server.
|
||||
|
||||
import datetime
|
||||
import json
|
||||
import kombu
|
||||
import kombu.connection
|
||||
@ -24,6 +25,8 @@ import kombu.mixins
|
||||
import logging
|
||||
import time
|
||||
|
||||
from pympler.process import ProcessMemoryInfo
|
||||
|
||||
from stacktach import models, views
|
||||
from stacktach import datetime_to_decimal as dt
|
||||
|
||||
@ -40,12 +43,15 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
|
||||
self.connection = connection
|
||||
self.deployment = deployment
|
||||
self.name = name
|
||||
self.last_time = None
|
||||
self.pmi = None
|
||||
self.processed = 0
|
||||
self.total_processed = 0
|
||||
|
||||
def get_consumers(self, Consumer, channel):
|
||||
durable = self.deployment_config.get('durable_queue', True)
|
||||
nova_exchange = kombu.entity.Exchange("nova", type="topic",
|
||||
exclusive=False, durable=durable, auto_delete=False)
|
||||
|
||||
exclusive=False, durable=durable, auto_delete=False)
|
||||
|
||||
nova_queues = [
|
||||
kombu.Queue("monitor.info", nova_exchange, durable=durable,
|
||||
@ -63,20 +69,49 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
|
||||
payload = (routing_key, body)
|
||||
jvalues = json.dumps(payload)
|
||||
|
||||
args = (routing_key, json.loads(message.body))
|
||||
body = str(message.body)
|
||||
args = (routing_key, json.loads(body))
|
||||
asJson = json.dumps(args)
|
||||
|
||||
raw = views.process_raw_data(self.deployment, args, asJson)
|
||||
if not raw:
|
||||
LOG.debug("No record from %s", routing_key)
|
||||
else:
|
||||
LOG.debug("Recorded rec# %d from %s/%s at %s (%.6f)" %
|
||||
(raw.id, self.name, routing_key,
|
||||
str(dt.dt_from_decimal(raw.when)),
|
||||
float(raw.when)))
|
||||
if raw:
|
||||
self.processed += 1
|
||||
|
||||
self._check_memory()
|
||||
|
||||
def _check_memory(self):
|
||||
if not self.pmi:
|
||||
self.pmi = ProcessMemoryInfo()
|
||||
self.last_vsz = self.pmi.vsz
|
||||
self.initial_vsz = self.pmi.vsz
|
||||
|
||||
utc = datetime.datetime.utcnow()
|
||||
check = self.last_time is None
|
||||
if self.last_time:
|
||||
diff = utc - self.last_time
|
||||
if diff.seconds > 30:
|
||||
check = True
|
||||
if check:
|
||||
self.last_time = utc
|
||||
self.pmi.update()
|
||||
diff = (self.pmi.vsz - self.last_vsz) / 1000
|
||||
idiff = (self.pmi.vsz - self.initial_vsz) / 1000
|
||||
self.total_processed += self.processed
|
||||
per_message = 0
|
||||
if self.total_processed:
|
||||
per_message = idiff / self.total_processed
|
||||
LOG.debug("%20s %6dk/%6dk ram, "
|
||||
"%3d/%4d msgs @ %6dk/msg" %
|
||||
(self.name, diff, idiff, self.processed,
|
||||
self.total_processed, per_message))
|
||||
self.last_vsz = self.pmi.vsz
|
||||
self.processed = 0
|
||||
|
||||
def on_nova(self, body, message):
|
||||
self._process(body, message)
|
||||
try:
|
||||
self._process(body, message)
|
||||
except Exception, e:
|
||||
LOG.exception("Problem %s" % e)
|
||||
message.ack()
|
||||
|
||||
|
||||
@ -97,9 +132,11 @@ def run(deployment_config):
|
||||
port=port,
|
||||
userid=user_id,
|
||||
password=password,
|
||||
transport="librabbitmq",
|
||||
virtual_host=virtual_host)
|
||||
|
||||
while True:
|
||||
LOG.debug("Processing on '%s'" % name)
|
||||
with kombu.connection.BrokerConnection(**params) as conn:
|
||||
try:
|
||||
consumer = NovaConsumer(name, conn, deployment)
|
||||
@ -108,4 +145,4 @@ def run(deployment_config):
|
||||
LOG.exception("name=%s, exception=%s. Reconnecting in 5s" %
|
||||
(name, e))
|
||||
time.sleep(5)
|
||||
|
||||
LOG.debug("Completed processing on '%s'" % name)
|
||||
|
Loading…
x
Reference in New Issue
Block a user