Merge pull request #109 from rackerlabs/master
Promoting Master to Stable
This commit is contained in:
commit 6565793f91
@@ -6,7 +6,8 @@
    "rabbit_port": 5672,
    "rabbit_userid": "rabbit",
    "rabbit_password": "rabbit",
-   "rabbit_virtual_host": "/"
+   "rabbit_virtual_host": "/",
+   "exit_on_exception": true
},
{
    "name": "east_coast.prod.cell1",
@@ -15,6 +16,7 @@
    "rabbit_port": 5672,
    "rabbit_userid": "rabbit",
    "rabbit_password": "rabbit",
-   "rabbit_virtual_host": "/"
+   "rabbit_virtual_host": "/",
+   "exit_on_exception": false
}]
}
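Note on the new flag: exit_on_exception is set per worker deployment, true for the first cell and false for east_coast.prod.cell1. Below is a minimal sketch of how a consumer could honor it; the "deployments" wrapper key and the print behavior are assumptions, and only the keys shown in the hunk come from this commit.

import json

config_json = """{"deployments": [
    {"name": "east_coast.prod.cell1",
     "rabbit_port": 5672,
     "exit_on_exception": false}]}"""

for deployment in json.loads(config_json)["deployments"]:
    if deployment.get("exit_on_exception", False):
        print "%s: re-raise consumer errors and exit" % deployment["name"]
    else:
        print "%s: log consumer errors and keep running" % deployment["name"]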
migrations/012_shrink_fail_reason.sql (new file, 1 line)
@@ -0,0 +1 @@
+ALTER TABLE stacktach_instanceexists MODIFY `fail_reason` varchar(300);
@@ -3,6 +3,7 @@ import json
import sys
import time
import os
+import re

sys.path.append(os.environ.get('STACKTACH_INSTALL_DIR', '/stacktach'))
from stacktach import datetime_to_decimal as dt
@@ -13,215 +14,315 @@ from stacktach import models
if __name__ != '__main__':
    sys.exit(1)

yesterday = datetime.datetime.utcnow().date() - datetime.timedelta(days=1)
if len(sys.argv) == 2:
    try:
        t = time.strptime(sys.argv[1], "%Y-%m-%d")
        yesterday = datetime.datetime(*t[:6])
    except Exception, e:
        print e
        print "Usage: python requests.py YYYY-MM-DD (the end date)"
        sys.exit(1)

hours = 0
length = 24
# To mask unique identifiers for categorizing notifications
def mask_msg(text):
    masking_regex = (
        (1, 'REQ_ID',
         r"req-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"
        ),
        (2, 'UUID',
         r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"
        ),
        (3, 'HOST_ADDRESS',
         r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b"
        ),
        (4, 'LG_NUM',
         r"\b\d{3}\d+\b"
        )
    )
    masked = str(text)
    for config in masking_regex:
        masked = re.sub(config[2], "$%s" % str(config[1]), masked)
    return masked

start = datetime.datetime(year=yesterday.year, month=yesterday.month,
                          day=yesterday.day)
end = start + datetime.timedelta(hours=length-1, minutes=59, seconds=59)

instance_map = {}  # { uuid : [request_id, request_id, ...] }
metadata = {'report_format': 'json', 'instances': instance_map}
report = [metadata]  # Tell Stacky to format as JSON
# Assemble message from exception object
def build_exc_msg(exc=None, separator=", "):

dstart = dt.dt_to_decimal(start)
dend = dt.dt_to_decimal(end)
    """
    White-list exception components we're aware of, and leave a catch all;
    because of freeform exception objects from notifications.
    """

codes = {}
    if exc is None:
        return exc

deployments = {}
for deploy in models.Deployment.objects.all():
    deployments[deploy.id] = deploy.name
    message = []
    if exc.get('kwargs', False):
        kwargs = exc['kwargs']
        if kwargs.get('value', False):
            value = kwargs['value']
            trcbk_index = value.rfind("Traceback")
            if trcbk_index > 0:
                value = str(value[:trcbk_index]) + "$TRACEBACK"
            message.append("value: %s" % value)

# Get all the instances that have changed in the last N hours ...
updates = models.RawData.objects.filter(event='compute.instance.update',
                                        when__gt=dstart, when__lte=dend)\
                                .values('instance').distinct()
        # kwargs: generic message components that don't require more filter
        misc_list = ['reason', 'method', 'topic', 'exc_type',
                     'actual', 'code']
        for key in misc_list:
            if kwargs.get(key, False):
                message.append("%s: %s" % (key, kwargs[key]))
        # END generic message components in kwargs

expiry = 60 * 60  # 1 hour
cmds = ['create', 'rebuild', 'rescue', 'resize', 'snapshot']
        if kwargs.get('expected', False):
            message.append("expected: %s" % kwargs['expected'][0])

failures = {}
causes = {}
durations = {}
error_messages = {}
successes = {}
tenant_issues = {}
    if exc.get('details', False):
        details = exc['details']
        if type(details) is list:
            for item in details:
                message.append(str(item))
        elif type(details) is dict:
            for k, v in details.iteritems():
                message.append("%s: %s" % (k, v))
        elif type(details) is str:
            message.append(details)

for uuid_dict in updates:
    uuid = uuid_dict['instance']
    # exc: generic messages that don't require more filter
    misc_list = ['message', 'cmd', 'stderr', 'exit_code',
                 'code', 'description']
    for key in misc_list:
        if exc.get(key, False):
            message.append("%s: %s" % (key, exc[key]))

    # All the unique Request ID's for this instance during that timespan.
    reqs = models.RawData.objects.filter(instance=uuid,
                                         when__gt=dstart, when__lte=dend)\
                                 .values('request_id').distinct()
    if exc.get('stdout', False):
        if exc['stdout'] != "":
            message.append("stdout: %s" % exc['stdout'])
    #END generic message components in exc

    req_list = []
    for req_dict in reqs:
        req = req_dict['request_id']
    if len(message) == 0:
        for k, v in exc.iteritems():
            message.append("%s: %s" % (k, v))
    return separator.join(message)

        raws = list(models.RawData.objects.filter(request_id=req)
                                          .exclude(event='compute.instance.exists')
                                          .values("id", "when", "routing_key", "old_state",
                                                  "state", "tenant", "event", "image_type",
                                                  "deployment")
                                          .order_by('when'))
if __name__ == '__main__':

        _start = None
        err_id = None
        failure_type = None
    # Start report
    yesterday = datetime.datetime.utcnow().date() - datetime.timedelta(days=1)
    if len(sys.argv) == 2:
        try:
            t = time.strptime(sys.argv[1], "%Y-%m-%d")
            yesterday = datetime.datetime(*t[:6])
        except Exception, e:
            print e
            print "Usage: python error_details.py YYYY-MM-DD (the end date)"
            sys.exit(1)

        operation = "n/a"
        platform = 0
        tenant = 0
        cell = "n/a"
        image_type_num = 0
    hours = 0
    length = 24

        _when = None
    start = datetime.datetime(year=yesterday.year, month=yesterday.month,
                              day=yesterday.day)
    end = start + datetime.timedelta(hours=length-1, minutes=59, seconds=59)

        for raw in raws:
            _when = raw['when']
            _routing_key = raw['routing_key']
            _old_state = raw['old_state']
            _state = raw['state']
            _tenant = raw['tenant']
            _event = raw['event']
            _image_type = raw['image_type']
            _name = raw['deployment']
            _id = raw['id']
    instance_map = {}  # { uuid : [request_id, request_id, ...] }
    exception_counts = {}  # { exception_message : count }
    event_counts = {}  # { event_name : count }
    metadata = {'report_format': 'json',
                'instances': instance_map,
                'exception_counts': exception_counts,
                'event_counts': event_counts
                }

    # Tell Stacky to format as JSON and set placeholders for various summaries
    report = [metadata]

    dstart = dt.dt_to_decimal(start)
    dend = dt.dt_to_decimal(end)

    codes = {}
    deployments = {}
    for deploy in models.Deployment.objects.all():
        deployments[deploy.id] = deploy.name

    # Get all the instances that have changed in the last N hours ...
    updates = models.RawData.objects.filter(event='compute.instance.update',
                                            when__gt=dstart, when__lte=dend)\
                                    .values('instance').distinct()

    expiry = 60 * 60  # 1 hour
    cmds = ['create', 'rebuild', 'rescue', 'resize', 'snapshot']

    failures = {}
    causes = {}
    durations = {}
    successes = {}
    tenant_issues = {}

    for uuid_dict in updates:
        uuid = uuid_dict['instance']

        # All the unique Request ID's for this instance during that timespan.
        reqs = models.RawData.objects.filter(instance=uuid,
                                             when__gt=dstart, when__lte=dend)\
                                     .values('request_id').distinct()

        req_list = []
        for req_dict in reqs:
            req = req_dict['request_id']

            raws = list(models.RawData.objects.filter(request_id=req)
                                              .exclude(event='compute.instance.exists')
                                              .values("id", "when", "routing_key", "old_state",
                                                      "state", "tenant", "event", "image_type",
                                                      "deployment")
                                              .order_by('when'))

            _start = None
            _when = None

            err_id = None
            failure_type = None
            operation = "n/a"
            platform = 0
            tenant = 0
            cell = "n/a"
            image_type_num = 0

            for raw in raws:
                _when = raw['when']
                _routing_key = raw['routing_key']
                _old_state = raw['old_state']
                _state = raw['state']
                _tenant = raw['tenant']
                _event = raw['event']
                _image_type = raw['image_type']
                _name = raw['deployment']
                _id = raw['id']

                if not _start:
                    _start = _when

                if 'error' in _routing_key:
                    err_id = _id
                    failure_type = 'http'

                if failure_type != 'state' and _old_state != 'error' and\
                        _state == 'error':
                    failure_type = 'state'
                    err_id = _id

                if _old_state == 'error' and \
                        (not _state in ['deleted', 'error']):
                    failure_type = None
                    err_id = None

                if _tenant:
                    tenant = _tenant

                for cmd in cmds:
                    if cmd in _event:
                        operation = cmd
                        cell = deployments.get(_name, "n/a")
                        break

                if _image_type:
                    image_type_num |= _image_type

            if not _start:
                _start = _when
                continue

            if 'error' in _routing_key:
                err_id = _id
                failure_type = 'http'
            image = "?"
            if image_type.isset(image_type_num, image_type.BASE_IMAGE):
                image = "base"
            if image_type.isset(image_type_num, image_type.SNAPSHOT_IMAGE):
                image = "snap"

            if _old_state != 'error' and _state == 'error':
                failure_type = 'state'
                err_id = _id
            _end = _when
            diff = _end - _start

            if _old_state == 'error' and \
                    (not _state in ['deleted', 'error']):
                failure_type = None
                err_id = None
            if diff > 3600 and failure_type is None:
                failure_type = ">60"

            if _tenant:
                tenant = _tenant
            key = (operation, image_type_num, cell)

            for cmd in cmds:
                if cmd in _event:
                    operation = cmd
                    cell = deployments.get(_name, "n/a")
                    break
            # Track durations for all attempts, good and bad ...
            duration_min, duration_max, duration_count, duration_total = \
                durations.get(key, (9999999, 0, 0, 0))
            duration_min = min(duration_min, diff)
            duration_max = max(duration_max, diff)
            duration_count += 1
            duration_total += diff
            durations[key] = (duration_min, duration_max, duration_count,
                              duration_total)

            if _image_type:
                image_type_num |= _image_type
            if not failure_type:
                successes[key] = successes.get(key, 0) + 1
            else:
                failed_request = {}
                message = []  # For exception message masking
                req_list.append(req)
                instance_map[uuid] = req_list
                failed_request['req'] = req
                failed_request['duration'] = "%.2f minutes" % (diff/60)
                failed_request['operation'] = operation
                failed_request['platform'] = image_type.readable(image_type_num)
                failures[key] = failures.get(key, 0) + 1
                tenant_issues[tenant] = tenant_issues.get(tenant, 0) + 1

        if not _start:
            continue
                if err_id:
                    err = models.RawData.objects.get(id=err_id)
                    queue, body = json.loads(err.json)
                    payload = body['payload']

        image = "?"
        if image_type.isset(image_type_num, image_type.BASE_IMAGE):
            image = "base"
        if image_type.isset(image_type_num, image_type.SNAPSHOT_IMAGE):
            image = "snap"
                    # Add error information to failed request report
                    failed_request['event_id'] = err.id
                    failed_request['tenant'] = err.tenant
                    failed_request['service'] = err.service
                    failed_request['host'] = err.host
                    failed_request['deployment'] = err.deployment.name
                    failed_request['event'] = err.event
                    failed_request['when'] = str(dt.dt_from_decimal(err.when))

        _end = _when
        diff = _end - _start
                    # Track failed event counts
                    event_counts[err.event] = event_counts.get(err.event, 0) + 1

        if diff > 3600 and failure_type is None:
            failure_type = ">60"
                    exc = payload.get('exception')
                    if exc:
                        # group the messages ...
                        failed_request['exception'] = exc

        key = (operation, image_type_num, cell)
                        # assemble message from exception and generalize
                        message_str = mask_msg(build_exc_msg(exc))
                        # count exception messages
                        exception_counts[message_str] = exception_counts.get(
                            message_str, 0) + 1

        # Track durations for all attempts, good and bad ...
        duration_min, duration_max, duration_count, duration_total = \
            durations.get(key, (9999999, 0, 0, 0))
        duration_min = min(duration_min, diff)
        duration_max = max(duration_max, diff)
        duration_count += 1
        duration_total += diff
        durations[key] = (duration_min, duration_max, duration_count,
                          duration_total)
                        # extract the code, if any ...
                        code = exc.get('kwargs', {}).get('code')
                        if code:
                            codes[code] = codes.get(code, 0) + 1
                            failure_type = code
                failed_request['failure_type'] = failure_type
                raws = models.RawData.objects.filter(request_id=req)\
                                             .exclude(event='compute.instance.exists')\
                                             .order_by('when')

        if not failure_type:
            successes[key] = successes.get(key, 0) + 1
        else:
            failed_request = {}
            req_list.append(req)
            instance_map[uuid] = req_list
            failed_request['req'] = req
            failed_request['duration'] = "%.2f minutes" % (diff/60)
            failed_request['operation'] = operation
            failed_request['platform'] = image_type.readable(image_type_num)
            failures[key] = failures.get(key, 0) + 1
            tenant_issues[tenant] = tenant_issues.get(tenant, 0) + 1
                failed_request['details'] = []

            if err_id:
                err = models.RawData.objects.get(id=err_id)
                queue, body = json.loads(err.json)
                payload = body['payload']
                for raw in raws:
                    failure_detail = {}
                    failure_detail['host'] = raw.host
                    failure_detail['event'] = raw.event
                    failure_detail['old_state'] = raw.old_state
                    failure_detail['state'] = raw.state
                    failure_detail['old_task'] = raw.old_task
                    failure_detail['task'] = raw.task
                    failed_request['details'].append(failure_detail)

                # Add error information to failed request report
                failed_request['event_id'] = err.id
                failed_request['tenant'] = err.tenant
                failed_request['service'] = err.service
                failed_request['host'] = err.host
                failed_request['deployment'] = err.deployment.name
                failed_request['event'] = err.event
                failed_request['when'] = str(dt.dt_from_decimal(err.when))
                report.append(failed_request)

                exc = payload.get('exception')
                if exc:
                    # group the messages ...
                    failed_request['exception'] = exc
                cause_key = (key, failure_type)
                causes[cause_key] = causes.get(cause_key, 0) + 1

                    exc_str = str(exc)
                    error_messages[exc_str] = \
                        error_messages.get(exc_str, 0) + 1

                    # extract the code, if any ...
                    code = exc.get('kwargs', {}).get('code')
                    if code:
                        codes[code] = codes.get(code, 0) + 1
                        failure_type = code
            failed_request['failure_type'] = failure_type
            raws = models.RawData.objects.filter(request_id=req)\
                                         .exclude(event='compute.instance.exists')\
                                         .order_by('when')
            failed_request['details'] = []

            for raw in raws:
                failure_detail = {}
                failure_detail['host'] = raw.host
                failure_detail['event'] = raw.event
                failure_detail['old_state'] = raw.old_state
                failure_detail['state'] = raw.state
                failure_detail['old_task'] = raw.old_task
                failure_detail['task'] = raw.task
                failed_request['details'].append(failure_detail)

            report.append(failed_request)

            cause_key = (key, failure_type)
            causes[cause_key] = causes.get(cause_key, 0) + 1

values = {'json': json.dumps(report),
          'created': dt.dt_to_decimal(datetime.datetime.utcnow()),
          'period_start': start,
          'period_end': end,
          'version': 1,
          'name': 'Error detail report'}
report = models.JsonReport(**values)
report.save()
    # Assign values to store in DB
    values = {'json': json.dumps(report),
              'created': dt.dt_to_decimal(datetime.datetime.utcnow()),
              'period_start': start,
              'period_end': end,
              'version': 1,
              'name': 'Error detail report'}
    json_report = models.JsonReport(**values)
    json_report.save()
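A standalone sketch of what the new masking buys, using the same regex table as mask_msg above (order matters: REQ_ID must be substituted before the bare UUID pattern, or the request id's embedded UUID would be masked first). The sample message is invented:

import re

masking_regex = (
    (1, 'REQ_ID',
     r"req-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"),
    (2, 'UUID',
     r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"),
    (3, 'HOST_ADDRESS', r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b"),
    (4, 'LG_NUM', r"\b\d{3}\d+\b"),
)

def mask_msg(text):
    masked = str(text)
    for config in masking_regex:
        # config[2] is the pattern, config[1] the placeholder label
        masked = re.sub(config[2], "$%s" % str(config[1]), masked)
    return masked

msg = ("req-deadbeef-0000-4000-8000-00000000abcd: instance "
       "0a1b2c3d-1111-2222-3333-444455556666 on 10.20.30.40 exited 52704")
print mask_msg(msg)
# -> $REQ_ID: instance $UUID on $HOST_ADDRESS exited $LG_NUM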
@@ -1,4 +1,19 @@
-import models
+from stacktach import stacklog
+from stacktach import models


+def _safe_get(Model, **kwargs):
+    object = None
+    query = Model.objects.filter(**kwargs)
+    count = query.count()
+    if count > 1:
+        stacklog.warn('Multiple records found for %s get.' % Model.__name__)
+        object = query[0]
+    elif count < 1:
+        stacklog.warn('No records found for %s get.' % Model.__name__)
+    else:
+        object = query[0]
+    return object
+
+
def get_or_create_deployment(name):
@@ -41,13 +56,12 @@ def get_or_create_instance_usage(**kwargs):
    return models.InstanceUsage.objects.get_or_create(**kwargs)


def get_or_create_instance_delete(**kwargs):
    return models.InstanceDeletes.objects.get_or_create(**kwargs)


def get_instance_usage(**kwargs):
-    usage = None
-    try:
-        usage = models.InstanceUsage.objects.get(**kwargs)
-    except models.InstanceUsage.DoesNotExist:
-        pass
-    return usage
+    return _safe_get(models.InstanceUsage, **kwargs)


def create_instance_delete(**kwargs):
@@ -55,12 +69,7 @@ def create_instance_delete(**kwargs):


def get_instance_delete(**kwargs):
-    delete = None
-    try:
-        delete = models.InstanceDeletes.objects.get(**kwargs)
-    except models.InstanceDeletes.DoesNotExist:
-        pass
-    return delete
+    return _safe_get(models.InstanceDeletes, **kwargs)


def create_instance_exists(**kwargs):
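_safe_get changes the lookup contract for get_instance_usage and get_instance_delete: a miss now returns None instead of raising DoesNotExist, and duplicate rows log a warning and return the first match rather than raising MultipleObjectsReturned. Callers therefore need a None check; a small sketch with illustrative filter values:

from stacktach import db

usage = db.get_instance_usage(instance='0a1b2c3d-1111-2222-3333-444455556666')
if usage is None:
    # zero rows matched; _safe_get already logged the warning
    print "no usage record yet for this instance"
else:
    print "launched_at=%s" % usage.launched_at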
@@ -148,7 +148,7 @@ def get_usage_exist(request, exist_id):

@api_call
def exists_send_status(request, message_id):
-    if request.method != 'PUT':
+    if request.method not in ['PUT', 'POST']:
        raise BadRequestException(message="Invalid method")

    if request.body is None or request.body == '':
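With the relaxed method check, exists_send_status accepts POST as well as PUT. A hedged sketch using the requests library; the route below is hypothetical (only the view function appears in this diff), while the payload shapes for single and batch updates come from the tests further down:

import json
import requests  # third-party HTTP client, used only for this sketch

BASE = 'http://stacktach.example.com/db/exists/send_status'  # hypothetical route

# Single message: PUT and POST are now both accepted.
requests.post('%s/%s' % (BASE, 'some-message-id'),
              data=json.dumps({'send_status': 200}))

# Batch form, keyed by message_id; the body shape matches the batch tests.
requests.put('%s/batch' % BASE,
             data=json.dumps({'messages': {'some-message-id': 200,
                                           'another-message-id': 400}}))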
@@ -130,7 +130,7 @@ class InstanceExists(models.Model):
    status = models.CharField(max_length=50, db_index=True,
                              choices=STATUS_CHOICES,
                              default=PENDING)
-    fail_reason = models.CharField(max_length=500, null=True,
+    fail_reason = models.CharField(max_length=300, null=True,
                                   blank=True, db_index=True)
    raw = models.ForeignKey(RawData, related_name='+', null=True)
    usage = models.ForeignKey(InstanceUsage, related_name='+', null=True)
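This max_length change pairs with migration 012 above: fail_reason shrinks from varchar(500) to varchar(300), so anything writing fail_reason should clip the message first, since MySQL will truncate or reject longer values depending on sql_mode. A defensive sketch; the helper is hypothetical:

MAX_FAIL_REASON = 300  # keep in sync with InstanceExists.fail_reason

def clipped_fail_reason(exception):
    # str() first: exception payloads from notifications are freeform objects
    return str(exception)[:MAX_FAIL_REASON]

print len(clipped_fail_reason(ValueError('x' * 1000)))  # -> 300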
stacktach/stacklog.py (new file, 69 lines)
@@ -0,0 +1,69 @@
# Copyright (c) 2013 - Rackspace Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

import logging

LOGGERS = {}
default_logger_name = 'stacktach-default'


def set_default_logger_name(name):
    global default_logger_name
    default_logger_name = name


def _make_logger(name):
    log = logging.getLogger(__name__)
    log.setLevel(logging.DEBUG)
    handler = logging.handlers.TimedRotatingFileHandler('%s.log' % name,
                                                        when='h', interval=6,
                                                        backupCount=4)
    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
    handler.setFormatter(formatter)
    log.addHandler(handler)
    log.handlers[0].doRollover()
    return log


def init_logger(name=None):
    global LOGGERS
    if name is None:
        name = default_logger_name
    if name not in LOGGERS:
        LOGGERS[name] = _make_logger(name)


def get_logger(name=None):
    global LOGGERS
    if name is None:
        name = default_logger_name
    init_logger(name=name)
    return LOGGERS[name]


def warn(msg, name=None):
    if name is None:
        name = default_logger_name
    get_logger(name=name).warn(msg)


def error(msg, name=None):
    if name is None:
        name = default_logger_name
    get_logger(name=name).warn(msg)
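A minimal sketch of the new stacklog API as committed; the logger name is illustrative, and log files land in the working directory as '<name>.log':

import logging.handlers  # _make_logger references logging.handlers directly

from stacktach import stacklog

stacklog.set_default_logger_name('worker')  # unnamed calls now log to worker.log
stacklog.warn('rabbit connection dropped')  # lazily creates and caches the logger

log = stacklog.get_logger()                 # same cached Logger instance
log.error('giving up after 3 retries')
# Note: as committed, stacklog.error() delegates to .warn(), so both module
# helpers record at WARNING level.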
@@ -9,14 +9,23 @@ from django.shortcuts import render_to_response

from stacktach import datetime_to_decimal as dt
from stacktach import db as stackdb
-from stacktach import models
from stacktach import image_type
+from stacktach import models
+from stacktach import stacklog
from stacktach import utils


STACKDB = stackdb


+def log_warn(msg):
+    global LOG
+    if LOG is None:
+        LOG = stacklog.get_logger()
+    if LOG is not None:
+        LOG.warn(msg)
+
+
def _extract_states(payload):
    return {
        'state' : payload.get('state', ""),
@@ -246,6 +255,11 @@ def _process_usage_for_new_launch(raw, body):

def _process_usage_for_updates(raw, body):
    payload = body['payload']
+
+    if raw.event == INSTANCE_EVENT['create_end']:
+        if 'message' in payload and payload['message'] != 'Success':
+            return
+
    instance_id = payload['instance_id']
    request_id = body['_context_request_id']
    (usage, new) = STACKDB.get_or_create_instance_usage(instance=instance_id,
@@ -273,53 +287,58 @@ def _process_delete(raw, body):
    values = {
        'instance': instance_id,
        'deleted_at': deleted_at,
-        'raw': raw
    }
+    (delete, new) = STACKDB.get_or_create_instance_delete(**values)
+    delete.raw = raw

    launched_at = payload.get('launched_at')
    if launched_at and launched_at != '':
        launched_at = utils.str_time_to_unix(launched_at)
-        values['launched_at'] = launched_at
+        delete.launched_at = launched_at

-    delete = STACKDB.create_instance_delete(**values)
    STACKDB.save(delete)


def _process_exists(raw, body):
    payload = body['payload']
    instance_id = payload['instance_id']
-    launched_at = utils.str_time_to_unix(payload['launched_at'])
-    launched_range = (launched_at, launched_at+1)
-    usage = STACKDB.get_instance_usage(instance=instance_id,
-                                       launched_at__range=launched_range)
-    values = {}
-    values['message_id'] = body['message_id']
-    values['instance'] = instance_id
-    values['launched_at'] = launched_at
-    beginning = utils.str_time_to_unix(payload['audit_period_beginning'])
-    values['audit_period_beginning'] = beginning
-    ending = utils.str_time_to_unix(payload['audit_period_ending'])
-    values['audit_period_ending'] = ending
-    values['instance_type_id'] = payload['instance_type_id']
-    if usage:
-        values['usage'] = usage
-    values['raw'] = raw
-    values['tenant'] = payload['tenant_id']
+    launched_at_str = payload.get('launched_at')
+    if launched_at_str is not None and launched_at_str != '':
+        launched_at = utils.str_time_to_unix(payload['launched_at'])
+        launched_range = (launched_at, launched_at+1)
+        usage = STACKDB.get_instance_usage(instance=instance_id,
+                                           launched_at__range=launched_range)
+        values = {}
+        values['message_id'] = body['message_id']
+        values['instance'] = instance_id
+        values['launched_at'] = launched_at
+        beginning = utils.str_time_to_unix(payload['audit_period_beginning'])
+        values['audit_period_beginning'] = beginning
+        ending = utils.str_time_to_unix(payload['audit_period_ending'])
+        values['audit_period_ending'] = ending
+        values['instance_type_id'] = payload['instance_type_id']
+        if usage:
+            values['usage'] = usage
+        values['raw'] = raw
+        values['tenant'] = payload['tenant_id']

-    deleted_at = payload.get('deleted_at')
-    if deleted_at and deleted_at != '':
-        # We only want to pre-populate the 'delete' if we know this is in fact
-        # an exist event for a deleted instance. Otherwise, there is a
-        # chance we may populate it for a previous period's exist.
-        delete = STACKDB.get_instance_delete(instance=instance_id,
-                                             launched_at__range=launched_range)
-        deleted_at = utils.str_time_to_unix(deleted_at)
-        values['deleted_at'] = deleted_at
-        if delete:
-            values['delete'] = delete
+        deleted_at = payload.get('deleted_at')
+        if deleted_at and deleted_at != '':
+            # We only want to pre-populate the 'delete' if we know this is in
+            # fact an exist event for a deleted instance. Otherwise, there
+            # is a chance we may populate it for a previous period's exist.
+            filter = {'instance': instance_id,
+                      'launched_at__range': launched_range}
+            delete = STACKDB.get_instance_delete(**filter)
+            deleted_at = utils.str_time_to_unix(deleted_at)
+            values['deleted_at'] = deleted_at
+            if delete:
+                values['delete'] = delete

-    exists = STACKDB.create_instance_exists(**values)
-    STACKDB.save(exists)
+        exists = STACKDB.create_instance_exists(**values)
+        STACKDB.save(exists)
+    else:
+        stacklog.warn("Ignoring exists without launched_at. RawData(%s)" % raw.id)


USAGE_PROCESS_MAPPING = {
@@ -367,12 +386,14 @@ def process_raw_data(deployment, args, json_args):
    values['json'] = json_args
    record = STACKDB.create_rawdata(**values)
    STACKDB.save(record)

-    aggregate_lifecycle(record)
-    aggregate_usage(record, body)
    return record


+def post_process(raw, body):
+    aggregate_lifecycle(raw)
+    aggregate_usage(raw, body)
+
+
def _post_process_raw_data(rows, highlight=None):
    for row in rows:
        if "error" in row.routing_key:
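The views changes split persistence from aggregation: process_raw_data now only saves the RawData record, and the new post_process runs aggregate_lifecycle and aggregate_usage afterwards, which is exactly what the worker test below expects. A sketch of the implied consumer flow; the callback signature and the AMQP message object are illustrative:

from stacktach import views

def on_notification(deployment, routing_key, body_dict, json_str, message):
    args = (routing_key, body_dict)
    raw = views.process_raw_data(deployment, args, json_str)  # save RawData only
    message.ack()                       # ack as soon as the raw is durable
    views.post_process(raw, body_dict)  # lifecycle + usage aggregation after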
@@ -404,6 +404,24 @@ class DBAPITestCase(unittest.TestCase):
        self.assertEqual(exists.send_status, 200)
        self.mox.VerifyAll()

+    def test_send_status_accepts_post(self):
+        fake_request = self.mox.CreateMockAnything()
+        fake_request.method = 'POST'
+        body_dict = {'send_status': 200}
+        body = json.dumps(body_dict)
+        fake_request.body = body
+        exists = self.mox.CreateMockAnything()
+        result = self.mox.CreateMockAnything()
+        models.InstanceExists.objects.select_for_update().AndReturn(result)
+        result.get(message_id=MESSAGE_ID_1).AndReturn(exists)
+        exists.save()
+        self.mox.ReplayAll()
+
+        dbapi.exists_send_status(fake_request, MESSAGE_ID_1)
+
+        self.assertEqual(exists.send_status, 200)
+        self.mox.VerifyAll()
+
    def test_send_status_not_found(self):
        fake_request = self.mox.CreateMockAnything()
        fake_request.method = 'PUT'
@@ -517,6 +535,33 @@ class DBAPITestCase(unittest.TestCase):
        trans_obj.__exit__(None, None, None)
        self.mox.ReplayAll()

+    def test_send_status_batch_accepts_post(self):
+        fake_request = self.mox.CreateMockAnything()
+        fake_request.method = 'POST'
+        messages = {
+            MESSAGE_ID_1: 200,
+            MESSAGE_ID_2: 400
+        }
+        body_dict = {'messages': messages}
+        body = json.dumps(body_dict)
+        fake_request.body = body
+        self.mox.StubOutWithMock(transaction, 'commit_on_success')
+        trans_obj = self.mox.CreateMockAnything()
+        transaction.commit_on_success().AndReturn(trans_obj)
+        trans_obj.__enter__()
+        results1 = self.mox.CreateMockAnything()
+        models.InstanceExists.objects.select_for_update().AndReturn(results1)
+        exists1 = self.mox.CreateMockAnything()
+        results1.get(message_id=MESSAGE_ID_2).AndReturn(exists1)
+        exists1.save()
+        results2 = self.mox.CreateMockAnything()
+        models.InstanceExists.objects.select_for_update().AndReturn(results2)
+        exists2 = self.mox.CreateMockAnything()
+        results2.get(message_id=MESSAGE_ID_1).AndReturn(exists2)
+        exists2.save()
+        trans_obj.__exit__(None, None, None)
+        self.mox.ReplayAll()
+
        resp = dbapi.exists_send_status(fake_request, 'batch')
        self.assertEqual(resp.status_code, 200)
        exists1.send_status = 200
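These tests follow mox's record/replay/verify cycle, the same pattern used throughout this commit. A self-contained sketch with a hypothetical collaborator:

import mox

m = mox.Mox()
collaborator = m.CreateMockAnything()
collaborator.fetch(42).AndReturn('row')  # record the expected call
m.ReplayAll()                            # switch from record to replay mode

assert collaborator.fetch(42) == 'row'   # exercise the code under test
m.VerifyAll()                            # fails if expectations went unmet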
@@ -32,6 +32,7 @@ from utils import TENANT_ID_1
from utils import INSTANCE_TYPE_ID_1
from utils import DUMMY_TIME
from utils import INSTANCE_TYPE_ID_2
+from stacktach import stacklog
from stacktach import views


@@ -186,10 +187,6 @@ class StacktachRawParsingTestCase(unittest.TestCase):
        raw = self.mox.CreateMockAnything()
        views.STACKDB.create_rawdata(**raw_values).AndReturn(raw)
        views.STACKDB.save(raw)
-        self.mox.StubOutWithMock(views, "aggregate_lifecycle")
-        views.aggregate_lifecycle(raw)
-        self.mox.StubOutWithMock(views, "aggregate_usage")
-        views.aggregate_usage(raw, dict)
        self.mox.ReplayAll()
        views.process_raw_data(deployment, args, json_args)
        self.mox.VerifyAll()
@@ -215,10 +212,6 @@ class StacktachRawParsingTestCase(unittest.TestCase):
        raw = self.mox.CreateMockAnything()
        views.STACKDB.create_rawdata(**raw_values).AndReturn(raw)
        views.STACKDB.save(raw)
-        self.mox.StubOutWithMock(views, "aggregate_lifecycle")
-        views.aggregate_lifecycle(raw)
-        self.mox.StubOutWithMock(views, "aggregate_usage")
-        views.aggregate_usage(raw, dict)
        self.mox.ReplayAll()
        views.process_raw_data(deployment, args, json_args)
        self.mox.VerifyAll()
@@ -388,7 +381,6 @@ class StacktachLifecycleTestCase(unittest.TestCase):

        self.mox.VerifyAll()

-
    def test_aggregate_lifecycle_update(self):
        event = 'compute.instance.update'
        when = datetime.datetime.utcnow()
@@ -416,12 +408,20 @@ class StacktachUsageParsingTestCase(unittest.TestCase):
    def setUp(self):
        self.mox = mox.Mox()
        views.STACKDB = self.mox.CreateMockAnything()
+        self.log = self.mox.CreateMockAnything()
+        self.mox.StubOutWithMock(stacklog, 'get_logger')

    def tearDown(self):
        self.mox.UnsetStubs()

+    def setup_mock_log(self, name=None):
+        if name is None:
+            stacklog.get_logger(name=mox.IgnoreArg()).AndReturn(self.log)
+        else:
+            stacklog.get_logger(name=name).AndReturn(self.log)
+
    def test_process_usage_for_new_launch_create_start(self):
-        kwargs = {'launched': str(DUMMY_TIME), 'tenant_id': TENANT_ID_1 }
+        kwargs = {'launched': str(DUMMY_TIME), 'tenant_id': TENANT_ID_1}
        notification = utils.create_nova_notif(request_id=REQUEST_ID_1, **kwargs)
        event = 'compute.instance.create.start'
        raw, usage = self._setup_process_usage_mocks(event, notification)
@@ -516,6 +516,36 @@ class StacktachUsageParsingTestCase(unittest.TestCase):

        self.mox.VerifyAll()

+    def test_process_usage_for_updates_create_end_success_message(self):
+        kwargs = {'launched': str(DUMMY_TIME), 'tenant_id': TENANT_ID_1}
+        notification = utils.create_nova_notif(request_id=REQUEST_ID_1, **kwargs)
+        notification[1]['payload']['message'] = "Success"
+        event = 'compute.instance.create.end'
+        raw, usage = self._setup_process_usage_mocks(event, notification)
+
+        views._process_usage_for_updates(raw, notification[1])
+
+        self.assertEqual(usage.launched_at, utils.decimal_utc(DUMMY_TIME))
+        self.assertEqual(usage.tenant, TENANT_ID_1)
+
+        self.mox.VerifyAll()
+
+    def test_process_usage_for_updates_create_end_error_message(self):
+        kwargs = {'launched': str(DUMMY_TIME), 'tenant_id': TENANT_ID_1}
+        notification = utils.create_nova_notif(request_id=REQUEST_ID_1, **kwargs)
+        notification[1]['payload']['message'] = "Error"
+        event = 'compute.instance.create.end'
+        when_time = DUMMY_TIME
+        when_decimal = utils.decimal_utc(when_time)
+        json_str = json.dumps(notification)
+        raw = utils.create_raw(self.mox, when_decimal, event=event,
+                               json_str=json_str)
+        self.mox.ReplayAll()
+
+        views._process_usage_for_updates(raw, notification[1])
+
+        self.mox.VerifyAll()
+
    def test_process_usage_for_updates_revert_end(self):
        kwargs = {'launched': str(DUMMY_TIME), 'type_id': INSTANCE_TYPE_ID_1, 'tenant_id': TENANT_ID_1}
        notification = utils.create_nova_notif(request_id=REQUEST_ID_1, **kwargs)
@@ -573,11 +603,9 @@ class StacktachUsageParsingTestCase(unittest.TestCase):
        delete.instance = INSTANCE_ID_1
        delete.launched_at = launch_decimal
        delete.deleted_at = delete_decimal
-        views.STACKDB.create_instance_delete(instance=INSTANCE_ID_1,
-                                             launched_at=launch_decimal,
-                                             deleted_at=delete_decimal,
-                                             raw=raw)\
-            .AndReturn(delete)
+        views.STACKDB.get_or_create_instance_delete(instance=INSTANCE_ID_1,
+                                                    deleted_at=delete_decimal)\
+            .AndReturn((delete, True))
        views.STACKDB.save(delete)
        self.mox.ReplayAll()

@@ -599,10 +627,9 @@ class StacktachUsageParsingTestCase(unittest.TestCase):
        delete = self.mox.CreateMockAnything()
        delete.instance = INSTANCE_ID_1
        delete.deleted_at = delete_decimal
-        views.STACKDB.create_instance_delete(instance=INSTANCE_ID_1,
-                                             deleted_at=delete_decimal,
-                                             raw=raw) \
-            .AndReturn(delete)
+        views.STACKDB.get_or_create_instance_delete(instance=INSTANCE_ID_1,
+                                                    deleted_at=delete_decimal)\
+            .AndReturn((delete, True))
        views.STACKDB.save(delete)
        self.mox.ReplayAll()

@@ -650,6 +677,24 @@ class StacktachUsageParsingTestCase(unittest.TestCase):
        views._process_exists(raw, notif[1])
        self.mox.VerifyAll()

+    def test_process_exists_no_launched_at(self):
+        current_time = datetime.datetime.utcnow()
+        current_decimal = utils.decimal_utc(current_time)
+        audit_beginning = current_time - datetime.timedelta(hours=20)
+        notif = utils.create_nova_notif(audit_period_beginning=str(audit_beginning),
+                                        audit_period_ending=str(current_time),
+                                        tenant_id=TENANT_ID_1)
+        json_str = json.dumps(notif)
+        event = 'compute.instance.exists'
+        raw = utils.create_raw(self.mox, current_decimal, event=event,
+                               json_str=json_str)
+        raw.id = 1
+        self.setup_mock_log()
+        self.log.warn('Ignoring exists without launched_at. RawData(1)')
+        self.mox.ReplayAll()
+        views._process_exists(raw, notif[1])
+        self.mox.VerifyAll()
+
    def test_process_exists_with_deleted_at(self):
        current_time = datetime.datetime.utcnow()
        launch_time = current_time - datetime.timedelta(hours=23)
@@ -664,7 +709,7 @@ class StacktachUsageParsingTestCase(unittest.TestCase):
                                        deleted=str(deleted_time),
                                        audit_period_beginning=str(audit_beginning),
                                        audit_period_ending=str(current_time),
-                                        tenant_id= TENANT_ID_1)
+                                        tenant_id=TENANT_ID_1)
        json_str = json.dumps(notif)
        event = 'compute.instance.exists'
        raw = utils.create_raw(self.mox, current_decimal, event=event,
tests/unit/test_stacktach_db.py (new file, 227 lines)
@@ -0,0 +1,227 @@
# Copyright (c) 2013 - Rackspace Inc.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to
# deal in the Software without restriction, including without limitation the
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
# sell copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
# IN THE SOFTWARE.

import datetime
import unittest

import mox

from stacktach import db
from stacktach import stacklog
from stacktach import models


class StacktachDBTestCase(unittest.TestCase):
    def setUp(self):
        self.mox = mox.Mox()
        self.log = self.mox.CreateMockAnything()
        self.mox.StubOutWithMock(stacklog, 'get_logger')
        self.mox.StubOutWithMock(models, 'RawData', use_mock_anything=True)
        models.RawData.objects = self.mox.CreateMockAnything()
        self.mox.StubOutWithMock(models, 'Deployment', use_mock_anything=True)
        models.Deployment.objects = self.mox.CreateMockAnything()
        self.mox.StubOutWithMock(models, 'Lifecycle', use_mock_anything=True)
        models.Lifecycle.objects = self.mox.CreateMockAnything()
        self.mox.StubOutWithMock(models, 'Timing', use_mock_anything=True)
        models.Timing.objects = self.mox.CreateMockAnything()
        self.mox.StubOutWithMock(models, 'RequestTracker',
                                 use_mock_anything=True)
        models.RequestTracker.objects = self.mox.CreateMockAnything()
        self.mox.StubOutWithMock(models, 'InstanceUsage',
                                 use_mock_anything=True)
        models.InstanceUsage.objects = self.mox.CreateMockAnything()
        self.mox.StubOutWithMock(models, 'InstanceDeletes',
                                 use_mock_anything=True)
        models.InstanceDeletes.objects = self.mox.CreateMockAnything()
        self.mox.StubOutWithMock(models, 'InstanceExists',
                                 use_mock_anything=True)
        models.InstanceExists.objects = self.mox.CreateMockAnything()
        self.mox.StubOutWithMock(models, 'JsonReport', use_mock_anything=True)
        models.JsonReport.objects = self.mox.CreateMockAnything()

    def tearDown(self):
        self.mox.UnsetStubs()

    def setup_mock_log(self, name=None):
        if name is None:
            stacklog.get_logger(name=mox.IgnoreArg()).AndReturn(self.log)
        else:
            stacklog.get_logger(name=name).AndReturn(self.log)

    def test_safe_get(self):
        Model = self.mox.CreateMockAnything()
        Model.objects = self.mox.CreateMockAnything()
        filters = {'field1': 'value1', 'field2': 'value2'}
        results = self.mox.CreateMockAnything()
        Model.objects.filter(**filters).AndReturn(results)
        results.count().AndReturn(1)
        object = self.mox.CreateMockAnything()
        results[0].AndReturn(object)
        self.mox.ReplayAll()
        returned = db._safe_get(Model, **filters)
        self.assertEqual(returned, object)
        self.mox.VerifyAll()

    def test_safe_get_no_results(self):
        Model = self.mox.CreateMockAnything()
        Model.__name__ = 'Model'
        Model.objects = self.mox.CreateMockAnything()
        filters = {'field1': 'value1', 'field2': 'value2'}
        results = self.mox.CreateMockAnything()
        Model.objects.filter(**filters).AndReturn(results)
        results.count().AndReturn(0)
        log = self.mox.CreateMockAnything()
        self.setup_mock_log()
        self.log.warn('No records found for Model get.')
        self.mox.ReplayAll()
        returned = db._safe_get(Model, **filters)
        self.assertEqual(returned, None)
        self.mox.VerifyAll()

    def test_safe_get_multiple_results(self):
        Model = self.mox.CreateMockAnything()
        Model.__name__ = 'Model'
        Model.objects = self.mox.CreateMockAnything()
        filters = {'field1': 'value1', 'field2': 'value2'}
        results = self.mox.CreateMockAnything()
        Model.objects.filter(**filters).AndReturn(results)
        results.count().AndReturn(2)
        self.setup_mock_log()
        self.log.warn('Multiple records found for Model get.')
        object = self.mox.CreateMockAnything()
        results[0].AndReturn(object)
        self.mox.ReplayAll()
        returned = db._safe_get(Model, **filters)
        self.assertEqual(returned, object)
        self.mox.VerifyAll()

    def test_get_or_create_deployment(self):
        deployment = self.mox.CreateMockAnything()
        models.Deployment.objects.get_or_create(name='test').AndReturn(deployment)
        self.mox.ReplayAll()
        returned = db.get_or_create_deployment('test')
        self.assertEqual(returned, deployment)
        self.mox.VerifyAll()

    def _test_db_create_func(self, Model, func):
        params = {'field1': 'value1', 'field2': 'value2'}
        object = self.mox.CreateMockAnything()
        Model(**params).AndReturn(object)
        self.mox.ReplayAll()
        returned = func(**params)
        self.assertEqual(returned, object)
        self.mox.VerifyAll()

    def test_create_rawdata(self):
        self._test_db_create_func(models.RawData, db.create_rawdata)

    def test_create_lifecycle(self):
        self._test_db_create_func(models.Lifecycle, db.create_lifecycle)

    def test_create_timing(self):
        self._test_db_create_func(models.Timing, db.create_timing)

    def test_create_request_tracker(self):
        self._test_db_create_func(models.RequestTracker,
                                  db.create_request_tracker)

    def test_create_instance_usage(self):
        self._test_db_create_func(models.InstanceUsage,
                                  db.create_instance_usage)

    def test_create_instance_delete(self):
        self._test_db_create_func(models.InstanceDeletes,
                                  db.create_instance_delete)

    def test_create_instance_exists(self):
        self._test_db_create_func(models.InstanceExists,
                                  db.create_instance_exists)

    def _test_db_find_func(self, Model, func, select_related=True):
        params = {'field1': 'value1', 'field2': 'value2'}
        results = self.mox.CreateMockAnything()
        if select_related:
            Model.objects.select_related().AndReturn(results)
            results.filter(**params).AndReturn(results)
        else:
            Model.objects.filter(**params).AndReturn(results)
        self.mox.ReplayAll()
        returned = func(**params)
        self.assertEqual(returned, results)
        self.mox.VerifyAll()

    def test_find_lifecycles(self):
        self._test_db_find_func(models.Lifecycle, db.find_lifecycles)

    def test_find_timings(self):
        self._test_db_find_func(models.Timing, db.find_timings)

    def test_find_request_trackers(self):
        self._test_db_find_func(models.RequestTracker,
                                db.find_request_trackers,
                                select_related=False)

    def _test_db_get_or_create_func(self, Model, func):
        params = {'field1': 'value1', 'field2': 'value2'}
        object = self.mox.CreateMockAnything()
        Model.objects.get_or_create(**params).AndReturn(object)
        self.mox.ReplayAll()
        returned = func(**params)
        self.assertEqual(returned, object)
        self.mox.VerifyAll()

    def test_get_or_create_instance_usage(self):
        self._test_db_get_or_create_func(models.InstanceUsage,
                                         db.get_or_create_instance_usage)

    def test_get_or_create_instance_delete(self):
        self._test_db_get_or_create_func(models.InstanceDeletes,
                                         db.get_or_create_instance_delete)

    def test_get_instance_usage(self):
        filters = {'field1': 'value1', 'field2': 'value2'}
        results = self.mox.CreateMockAnything()
        models.InstanceUsage.objects.filter(**filters).AndReturn(results)
        results.count().AndReturn(1)
        usage = self.mox.CreateMockAnything()
        results[0].AndReturn(usage)
        self.mox.ReplayAll()
        returned = db.get_instance_usage(**filters)
        self.assertEqual(returned, usage)
        self.mox.VerifyAll()

    def test_get_instance_delete(self):
        filters = {'field1': 'value1', 'field2': 'value2'}
        results = self.mox.CreateMockAnything()
        models.InstanceDeletes.objects.filter(**filters).AndReturn(results)
        results.count().AndReturn(1)
        usage = self.mox.CreateMockAnything()
        results[0].AndReturn(usage)
        self.mox.ReplayAll()
        returned = db.get_instance_delete(**filters)
        self.assertEqual(returned, usage)
        self.mox.VerifyAll()

    def test_save(self):
        o = self.mox.CreateMockAnything()
        o.save()
        self.mox.ReplayAll()
        db.save(o)
        self.mox.VerifyAll()
@@ -39,7 +39,7 @@ class NovaConsumerTestCase(unittest.TestCase):

    def test_get_consumers(self):
        created_queues = []
-        created_callbacks = []
+        created_callbacks = []
        created_consumers = []
        def Consumer(queues=None, callbacks=None):
            created_queues.extend(queues)
@@ -127,6 +127,8 @@ class NovaConsumerTestCase(unittest.TestCase):
        views.process_raw_data(deployment, args, json.dumps(args))\
            .AndReturn(raw)
        message.ack()
+        self.mox.StubOutWithMock(views, 'post_process')
+        views.post_process(raw, body_dict)
        self.mox.StubOutWithMock(consumer, '_check_memory',
                                 use_mock_anything=True)
        consumer._check_memory()
@@ -90,6 +90,7 @@ def create_raw(mox, when, event, instance=INSTANCE_ID_1,
    raw.json = json_str
    return raw

+
def create_lifecycle(mox, instance, last_state, last_task_state, last_raw):
    lifecycle = mox.CreateMockAnything()
    lifecycle.instance = instance
@@ -98,6 +99,7 @@ def create_lifecycle(mox, instance, last_state, last_task_state, last_raw):
    lifecycle.last_raw = last_raw
    return lifecycle

+
def create_timing(mox, name, lifecycle, start_raw=None, start_when=None,
                  end_raw=None, end_when=None, diff=None):
    timing = mox.CreateMockAnything()
@@ -110,6 +112,7 @@ def create_timing(mox, name, lifecycle, start_raw=None, start_when=None,
    timing.diff = diff
    return timing

+
def create_tracker(mox, request_id, lifecycle, start, last_timing=None,
                   duration=str(0.0)):
    tracker = mox.CreateMockAnything()
268
util/usage_seed.py
Normal file
268
util/usage_seed.py
Normal file
@ -0,0 +1,268 @@
|
||||
# Copyright (c) 2013 - Rackspace Inc.
|
||||
#
|
||||
# Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
# of this software and associated documentation files (the "Software"), to
|
||||
# deal in the Software without restriction, including without limitation the
|
||||
# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
|
||||
# sell copies of the Software, and to permit persons to whom the Software is
|
||||
# furnished to do so, subject to the following conditions:
|
||||
#
|
||||
# The above copyright notice and this permission notice shall be included in
|
||||
# all copies or substantial portions of the Software.
|
||||
#
|
||||
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
|
||||
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
|
||||
# IN THE SOFTWARE.
|
||||
|
||||
"""
|
||||
Usage: python usage_seed.py [period_length] [sql_connection]
|
||||
python usage_seed hour mysql://user:password@nova-db.example.com/nova?charset=utf8
|
||||
|
||||
The idea behind usage seeding is to take the current state of all
|
||||
active instances on active compute hosts and insert that data into
|
||||
Stacktach's usage tables. This script should be run against the
|
||||
nova database in each cell which has active compute nodes. The
|
||||
reason for that is because the global cell does not have information
|
||||
on active compute hosts.
|
||||
"""
|
||||
|
||||
import __builtin__
|
||||
setattr(__builtin__, '_', lambda x: x)
|
||||
import datetime
|
||||
import os
|
||||
import sys
|
||||
|
||||
from oslo.config import cfg
|
||||
CONF = cfg.CONF
|
||||
CONF.config_file = "/etc/nova/nova.conf"
|
||||
|
||||
if __name__ == '__main__':
|
||||
if len(sys.argv) != 3:
|
||||
print "Proper Usage: usage_seed.py [period_length] [sql_connection]"
|
||||
sys.exit(1)
|
||||
CONF.sql_connection = sys.argv[2]
|
||||
|
||||
from nova.compute import task_states
|
||||
from nova.context import RequestContext
|
||||
from nova.db import api as novadb
|
||||
from nova.db.sqlalchemy import api
|
||||
from nova.db.sqlalchemy import models as novamodels
|
||||
|
||||
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
os.pardir, os.pardir))
|
||||
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')):
|
||||
sys.path.insert(0, POSSIBLE_TOPDIR)
|
||||
|
||||
from stacktach import datetime_to_decimal as dt
|
||||
from stacktach import models
|
||||
|
||||
|
||||
# start yanked from reports/nova_usage_audit.py
|
||||
def get_previous_period(time, period_length):
|
||||
if period_length == 'day':
|
||||
last_period = time - datetime.timedelta(days=1)
|
||||
start = datetime.datetime(year=last_period.year,
|
||||
month=last_period.month,
|
||||
day=last_period.day)
|
||||
end = datetime.datetime(year=time.year,
|
||||
month=time.month,
|
||||
day=time.day)
|
||||
return start, end
|
||||
elif period_length == 'hour':
|
||||
last_period = time - datetime.timedelta(hours=1)
|
||||
start = datetime.datetime(year=last_period.year,
|
||||
month=last_period.month,
|
||||
day=last_period.day,
|
||||
hour=last_period.hour)
|
||||
end = datetime.datetime(year=time.year,
|
||||
month=time.month,
|
||||
day=time.day,
|
||||
hour=time.hour)
|
||||
return start, end
|
||||
# end yanked from reports/nova_usage_audit.py
|
||||
|
||||
|
||||
def _usage_for_instance(instance, task=None):
|
||||
usage = {
|
||||
'instance': instance['uuid'],
|
||||
'tenant': instance['project_id'],
|
||||
'instance_type_id': instance.get('instance_type_id'),
|
||||
}
|
||||
|
||||
launched_at = instance.get('launched_at')
|
||||
if launched_at is not None:
|
||||
usage['launched_at'] = dt.dt_to_decimal(launched_at)
|
||||
|
||||
if task is not None:
|
||||
usage['task'] = task
|
||||
|
||||
return usage
|
||||
|
||||
|
||||
def _delete_for_instance(instance):
|
||||
delete = {
|
||||
'instance': instance['uuid'],
|
||||
'deleted_at': dt.dt_to_decimal(instance.get('terminated_at')),
|
||||
}
|
||||
|
||||
launched_at = instance.get('launched_at')
|
||||
if launched_at is not None:
|
||||
delete['launched_at'] = dt.dt_to_decimal(launched_at)
|
||||
return delete
|
||||
|
||||
|
||||
def get_active_instances(period_length):
|
||||
context = RequestContext('1', '1', is_admin=True)
|
||||
start, end = get_previous_period(datetime.datetime.utcnow(), period_length)
|
||||
session = api.get_session()
|
||||
computes = novadb.service_get_all_by_topic(context, 'compute')
|
||||
active_instances = []
|
||||
yesterday = datetime.datetime.utcnow() - datetime.timedelta(days=1)
|
||||
for compute in computes:
|
||||
if compute.updated_at > yesterday:
|
||||
query = session.query(novamodels.Instance)
|
||||
|
||||
active_filter = api.or_(novamodels.Instance.terminated_at == None,
|
||||
novamodels.Instance.terminated_at > start)
|
||||
query = query.filter(active_filter)
|
||||
query = query.filter_by(host=compute.host)
|
||||
|
||||
for instance in query.all():
|
||||
active_instances.append(instance)
|
||||
return active_instances
|
||||
|
||||
|
||||
def get_action_for_instance(context, instance_uuid, action_name):
|
||||
actions = novadb.actions_get(context, instance_uuid)
|
||||
for action in actions:
|
||||
if action['action'] == action_name:
|
||||
return action
|
||||
|
||||
|
||||
rebuild_tasks = [task_states.REBUILDING,
|
||||
task_states.REBUILD_BLOCK_DEVICE_MAPPING,
|
||||
task_states.REBUILD_SPAWNING]
|
||||
|
||||
resize_tasks = [task_states.RESIZE_PREP,
|
||||
task_states.RESIZE_MIGRATING,
|
||||
task_states.RESIZE_MIGRATED,
|
||||
task_states.RESIZE_FINISH]
|
||||
|
||||
resize_revert_tasks = [task_states.RESIZE_REVERTING]
|
||||
|
||||
rescue_tasks = [task_states.RESCUING]
|
||||
|
||||
in_flight_tasks = (rebuild_tasks + resize_tasks +
|
||||
resize_revert_tasks + rescue_tasks)
|
||||
|
||||
|
||||
def seed(period_length):
    usages = []
    building_usages = []
    in_flight_usages = []
    deletes = []


    start, end = get_previous_period(datetime.datetime.utcnow(), period_length)

    context = RequestContext(1, 1, is_admin=True)

    print "Selecting all active instances"
    active_instances = get_active_instances(period_length)
    print "Selected all active instances"

    print "Populating active usages, preparing for in-flight"
    for instance in active_instances:
        vm_state = instance['vm_state']
        task_state = instance['task_state']

        # Builds get their request_id from the 'create' action later on.
        if vm_state == 'building':
            if instance['deleted'] != 0 and instance['deleted_at'] >= start:
                building_usages.append(_usage_for_instance(instance))
                deletes.append(_delete_for_instance(instance))
            elif instance['deleted'] == 0:
                building_usages.append(_usage_for_instance(instance))
        else:
            if task_state in in_flight_tasks:
                if (instance['deleted'] != 0 and
                        instance['deleted_at'] >= start):
                    # Just in case...
                    deletes.append(_delete_for_instance(instance))
                    in_flight_usages.append(_usage_for_instance(instance,
                                                                task=task_state))
                elif instance['deleted'] == 0:
                    in_flight_usages.append(_usage_for_instance(instance,
                                                                task=task_state))
            else:
                if (instance['deleted'] != 0 and
                        instance['deleted_at'] >= start):
                    deletes.append(_delete_for_instance(instance))
                    usages.append(_usage_for_instance(instance))
                elif instance['deleted'] == 0:
                    usages.append(_usage_for_instance(instance))

    print "Populated active instances, processing building"
    for usage in building_usages:
        action = get_action_for_instance(context, usage['instance'], 'create')
        if action is not None:
            usage['request_id'] = action['request_id']

    print "Populated building, processing in-flight"
    for usage in in_flight_usages:
        instance = usage['instance']
        action = None
        if usage['task'] in rebuild_tasks:
            action = get_action_for_instance(context, instance, 'rebuild')
        elif usage['task'] in resize_tasks:
            action = get_action_for_instance(context, instance, 'resize')
        elif usage['task'] in resize_revert_tasks:
            action = get_action_for_instance(context, instance, 'resizeRevert')
        elif usage['task'] in rescue_tasks:
            action = get_action_for_instance(context, instance, 'rescue')

        if action is not None:
            usage['request_id'] = action['request_id']
        # 'task' was only needed to pick the action above; it is not a
        # field on InstanceUsage, so drop it before the bulk insert.
        del usage['task']

    print "Done cataloging usage"


    print "Saving active instances"
    active_InstanceUsages = map(lambda x: models.InstanceUsage(**x),
                                usages)
    models.InstanceUsage.objects.bulk_create(active_InstanceUsages,
                                             batch_size=100)

    print "Saving building instances"
    building_InstanceUsages = map(lambda x: models.InstanceUsage(**x),
                                  building_usages)
    models.InstanceUsage.objects.bulk_create(building_InstanceUsages,
                                             batch_size=100)

    print "Saving in-flight instances"
    in_flight_InstanceUsages = map(lambda x: models.InstanceUsage(**x),
                                   in_flight_usages)
    models.InstanceUsage.objects.bulk_create(in_flight_InstanceUsages,
                                             batch_size=100)

    print "Saving deletes"
    all_InstanceDeletes = map(lambda x: models.InstanceDeletes(**x),
                              deletes)
    models.InstanceDeletes.objects.bulk_create(all_InstanceDeletes,
                                               batch_size=100)

    return (len(usages), len(building_usages),
            len(in_flight_usages), len(deletes))


if __name__ == '__main__':
    msg = ("Seeded system with: \n"
           "%s Active Instances \n"
           "%s Building Instances \n"
           "%s In Flight Instances \n"
           "%s Deleted Instances \n")
    print msg % seed(sys.argv[1])
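
# Illustrative usage, not part of the original script (the filename and the
# 'day' period value are assumptions; get_previous_period() is defined
# elsewhere and its accepted period strings are not shown in this diff).
# The counts here are made up:
#
#   $ python seed_usage.py day
#   Seeded system with:
#   42 Active Instances
#   3 Building Instances
#   1 In Flight Instances
#   7 Deleted Instances
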
@ -21,7 +21,6 @@
import argparse
import datetime
import json
import logging
import os
import sys
from time import sleep
@ -38,6 +37,11 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')):
    sys.path.insert(0, POSSIBLE_TOPDIR)

from stacktach import stacklog

stacklog.set_default_logger_name('verifier')
LOG = stacklog.get_logger()

from stacktach import models
from stacktach import datetime_to_decimal as dt
from verifier import AmbiguousResults
@ -45,15 +49,6 @@ from verifier import FieldMismatch
from verifier import NotFound
from verifier import VerificationException

LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
handler = logging.handlers.TimedRotatingFileHandler('verifier.log',
                                        when='h', interval=6, backupCount=4)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
LOG.addHandler(handler)
LOG.handlers[0].doRollover()


def _list_exists(ending_max=None, status=None):
    params = {}
@ -20,7 +20,6 @@ import datetime
import kombu
import kombu.entity
import kombu.mixins
import logging
import sys
import time

@ -34,17 +33,12 @@ except ImportError:

from pympler.process import ProcessMemoryInfo

from stacktach import db, views
from stacktach import db
from stacktach import stacklog
from stacktach import views


LOG = logging.getLogger(__name__)
LOG.setLevel(logging.DEBUG)
handler = logging.handlers.TimedRotatingFileHandler('worker.log',
                                        when='h', interval=6, backupCount=4)
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
LOG.addHandler(handler)
LOG.handlers[0].doRollover()
stacklog.set_default_logger_name('worker')
LOG = stacklog.get_logger()


class NovaConsumer(kombu.mixins.ConsumerMixin):
@ -87,10 +81,13 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
        args = (routing_key, json.loads(body))
        asJson = json.dumps(args)

        # save raw and ack the message
        raw = views.process_raw_data(self.deployment, args, asJson)

        if raw:
            self.processed += 1
        message.ack()
        views.post_process(raw, args[1])

        self._check_memory()

@ -126,13 +123,22 @@ class NovaConsumer(kombu.mixins.ConsumerMixin):
        try:
            self._process(message)
        except Exception, e:
            LOG.exception("Problem %s" % e)
            LOG.debug("Problem: %s\nFailed message body:\n%s" %
                      (e, json.loads(str(message.body)))
                      )
            raise


def continue_running():
    return True


def exit_or_sleep(exit=False):
    if exit:
        sys.exit(1)
    time.sleep(5)

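
# Illustrative only, not part of this diff: exit_or_sleep() is the
# retry-or-bail switch for the per-deployment 'exit_on_exception' flag
# read in run() below.
#
#   >>> exit_or_sleep()            # default: sleep 5s, caller loops and
#   ...                            # reconnects
#   >>> exit_or_sleep(exit=True)   # fail fast: sys.exit(1), letting a
#   ...                            # supervisor restart the worker

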
def run(deployment_config):
    name = deployment_config['name']
    host = deployment_config.get('rabbit_host', 'localhost')
@ -142,6 +148,7 @@ def run(deployment_config):
    virtual_host = deployment_config.get('rabbit_virtual_host', '/')
    durable = deployment_config.get('durable_queue', True)
    queue_arguments = deployment_config.get('queue_arguments', {})
    exit_on_exception = deployment_config.get('exit_on_exception', False)

    deployment, new = db.get_or_create_deployment(name)

@ -168,11 +175,11 @@ def run(deployment_config):
            LOG.error("!!!!Exception!!!!")
            LOG.exception("name=%s, exception=%s. Reconnecting in 5s" %
                          (name, e))
            time.sleep(5)
            exit_or_sleep(exit_on_exception)
        LOG.debug("Completed processing on '%s'" % name)
    except:
        LOG.error("!!!!Exception!!!!")
        e = sys.exc_info()[0]
        msg = "Uncaught exception: deployment=%s, exception=%s. Retrying in 5s"
        LOG.exception(msg % (name, e))
        time.sleep(5)
        exit_or_sleep(exit_on_exception)
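
A minimal driver sketch, not part of this commit, showing how run() is
typically fed one deployment dict at a time from the worker config; the
'deployments' key, the config filename, and the import path are assumptions,
not taken from this diff:

    import json
    from multiprocessing import Process

    import worker  # the module patched above (assumed import path)

    with open('stacktach_worker_config.json') as f:  # assumed filename
        config = json.load(f)

    processes = []
    for deployment in config['deployments']:  # key name assumed
        # One process per deployment; if that deployment sets
        # 'exit_on_exception', exit_or_sleep() calls sys.exit(1) and the
        # process dies instead of retrying every 5 seconds.
        p = Process(target=worker.run, args=(deployment,))
        p.daemon = True
        p.start()
        processes.append(p)

    for p in processes:
        p.join()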