diff --git a/etc/sample_stacktach_worker_config.json b/etc/sample_stacktach_worker_config.json index 5fea92a..e33ad9e 100644 --- a/etc/sample_stacktach_worker_config.json +++ b/etc/sample_stacktach_worker_config.json @@ -6,7 +6,8 @@ "rabbit_port": 5672, "rabbit_userid": "rabbit", "rabbit_password": "rabbit", - "rabbit_virtual_host": "/" + "rabbit_virtual_host": "/", + "exit_on_exception": true }, { "name": "east_coast.prod.cell1", @@ -15,6 +16,7 @@ "rabbit_port": 5672, "rabbit_userid": "rabbit", "rabbit_password": "rabbit", - "rabbit_virtual_host": "/" + "rabbit_virtual_host": "/", + "exit_on_exception": false }] } diff --git a/migrations/012_shrink_fail_reason.sql b/migrations/012_shrink_fail_reason.sql new file mode 100644 index 0000000..e97c29b --- /dev/null +++ b/migrations/012_shrink_fail_reason.sql @@ -0,0 +1 @@ +ALTER TABLE stacktach_instanceexists MODIFY `fail_reason` varchar(300); diff --git a/reports/error_details.py b/reports/error_details.py index f1bf891..1fd883d 100644 --- a/reports/error_details.py +++ b/reports/error_details.py @@ -3,6 +3,7 @@ import json import sys import time import os +import re sys.path.append(os.environ.get('STACKTACH_INSTALL_DIR', '/stacktach')) from stacktach import datetime_to_decimal as dt @@ -13,215 +14,315 @@ from stacktach import models if __name__ != '__main__': sys.exit(1) -yesterday = datetime.datetime.utcnow().date() - datetime.timedelta(days=1) -if len(sys.argv) == 2: - try: - t = time.strptime(sys.argv[1], "%Y-%m-%d") - yesterday = datetime.datetime(*t[:6]) - except Exception, e: - print e - print "Usage: python requests.py YYYY-MM-DD (the end date)" - sys.exit(1) -hours = 0 -length = 24 +# To mask unique identifiers for categorizing notifications +def mask_msg(text): + masking_regex = ( + (1, 'REQ_ID', + r"req-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}" + ), + (2, 'UUID', + r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}" + ), + (3, 'HOST_ADDRESS', + r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b" + ), + (4, 'LG_NUM', + r"\b\d{3}\d+\b" + ) + ) + masked = str(text) + for config in masking_regex: + masked = re.sub(config[2], "$%s" % str(config[1]), masked) + return masked -start = datetime.datetime(year=yesterday.year, month=yesterday.month, - day=yesterday.day) -end = start + datetime.timedelta(hours=length-1, minutes=59, seconds=59) -instance_map = {} # { uuid : [request_id, request_id, ...] } -metadata = {'report_format': 'json', 'instances': instance_map} -report = [metadata] # Tell Stacky to format as JSON +# Assemble message from exception object +def build_exc_msg(exc=None, separator=", "): -dstart = dt.dt_to_decimal(start) -dend = dt.dt_to_decimal(end) + """ + White-list exception components we're aware of, and leave a catch all; + because of freeform exception objects from notifications. + """ -codes = {} + if exc is None: + return exc -deployments = {} -for deploy in models.Deployment.objects.all(): - deployments[deploy.id] = deploy.name + message = [] + if exc.get('kwargs', False): + kwargs = exc['kwargs'] + if kwargs.get('value', False): + value = kwargs['value'] + trcbk_index = value.rfind("Traceback") + if trcbk_index > 0: + value = str(value[:trcbk_index]) + "$TRACEBACK" + message.append("value: %s" % value) -# Get all the instances that have changed in the last N hours ... 
-updates = models.RawData.objects.filter(event='compute.instance.update', - when__gt=dstart, when__lte=dend)\ - .values('instance').distinct() + # kwargs: generic message components that don't require more filter + misc_list = ['reason', 'method', 'topic', 'exc_type', + 'actual', 'code'] + for key in misc_list: + if kwargs.get(key, False): + message.append("%s: %s" % (key, kwargs[key])) + # END generic message components in kwargs -expiry = 60 * 60 # 1 hour -cmds = ['create', 'rebuild', 'rescue', 'resize', 'snapshot'] + if kwargs.get('expected', False): + message.append("expected: %s" % kwargs['expected'][0]) -failures = {} -causes = {} -durations = {} -error_messages = {} -successes = {} -tenant_issues = {} + if exc.get('details', False): + details = exc['details'] + if type(details) is list: + for item in details: + message.append(str(item)) + elif type(details) is dict: + for k, v in details.iteritems(): + message.append("%s: %s" % (k, v)) + elif type(details) is str: + message.append(details) -for uuid_dict in updates: - uuid = uuid_dict['instance'] + # exc: generic messages that don't require more filter + misc_list = ['message', 'cmd', 'stderr', 'exit_code', + 'code', 'description'] + for key in misc_list: + if exc.get(key, False): + message.append("%s: %s" % (key, exc[key])) - # All the unique Request ID's for this instance during that timespan. - reqs = models.RawData.objects.filter(instance=uuid, - when__gt=dstart, when__lte=dend)\ - .values('request_id').distinct() + if exc.get('stdout', False): + if exc['stdout'] != "": + message.append("stdout: %s" % exc['stdout']) + #END generic message components in exc - req_list = [] - for req_dict in reqs: - req = req_dict['request_id'] + if len(message) == 0: + for k, v in exc.iteritems(): + message.append("%s: %s" % (k, v)) + return separator.join(message) - raws = list(models.RawData.objects.filter(request_id=req) - .exclude(event='compute.instance.exists') - .values("id", "when", "routing_key", "old_state", - "state", "tenant", "event", "image_type", - "deployment") - .order_by('when')) +if __name__ == '__main__': - _start = None - err_id = None - failure_type = None + # Start report + yesterday = datetime.datetime.utcnow().date() - datetime.timedelta(days=1) + if len(sys.argv) == 2: + try: + t = time.strptime(sys.argv[1], "%Y-%m-%d") + yesterday = datetime.datetime(*t[:6]) + except Exception, e: + print e + print "Usage: python error_details.py YYYY-MM-DD (the end date)" + sys.exit(1) - operation = "n/a" - platform = 0 - tenant = 0 - cell = "n/a" - image_type_num = 0 + hours = 0 + length = 24 - _when = None + start = datetime.datetime(year=yesterday.year, month=yesterday.month, + day=yesterday.day) + end = start + datetime.timedelta(hours=length-1, minutes=59, seconds=59) - for raw in raws: - _when = raw['when'] - _routing_key = raw['routing_key'] - _old_state = raw['old_state'] - _state = raw['state'] - _tenant = raw['tenant'] - _event = raw['event'] - _image_type = raw['image_type'] - _name = raw['deployment'] - _id = raw['id'] + instance_map = {} # { uuid : [request_id, request_id, ...] 
} + exception_counts = {} # { exception_message : count } + event_counts = {} # { event_name : count } + metadata = {'report_format': 'json', + 'instances': instance_map, + 'exception_counts': exception_counts, + 'event_counts': event_counts + } + + # Tell Stacky to format as JSON and set placeholders for various summaries + report = [metadata] + + dstart = dt.dt_to_decimal(start) + dend = dt.dt_to_decimal(end) + + codes = {} + deployments = {} + for deploy in models.Deployment.objects.all(): + deployments[deploy.id] = deploy.name + + # Get all the instances that have changed in the last N hours ... + updates = models.RawData.objects.filter(event='compute.instance.update', + when__gt=dstart, when__lte=dend)\ + .values('instance').distinct() + + expiry = 60 * 60 # 1 hour + cmds = ['create', 'rebuild', 'rescue', 'resize', 'snapshot'] + + failures = {} + causes = {} + durations = {} + successes = {} + tenant_issues = {} + + for uuid_dict in updates: + uuid = uuid_dict['instance'] + + # All the unique Request ID's for this instance during that timespan. + reqs = models.RawData.objects.filter(instance=uuid, + when__gt=dstart, when__lte=dend)\ + .values('request_id').distinct() + + req_list = [] + for req_dict in reqs: + req = req_dict['request_id'] + + raws = list(models.RawData.objects.filter(request_id=req) + .exclude(event='compute.instance.exists') + .values("id", "when", "routing_key", "old_state", + "state", "tenant", "event", "image_type", + "deployment") + .order_by('when')) + + _start = None + _when = None + + err_id = None + failure_type = None + operation = "n/a" + platform = 0 + tenant = 0 + cell = "n/a" + image_type_num = 0 + + for raw in raws: + _when = raw['when'] + _routing_key = raw['routing_key'] + _old_state = raw['old_state'] + _state = raw['state'] + _tenant = raw['tenant'] + _event = raw['event'] + _image_type = raw['image_type'] + _name = raw['deployment'] + _id = raw['id'] + + if not _start: + _start = _when + + if 'error' in _routing_key: + err_id = _id + failure_type = 'http' + + if failure_type != 'state' and _old_state != 'error' and\ + _state == 'error': + failure_type = 'state' + err_id = _id + + if _old_state == 'error' and \ + (not _state in ['deleted', 'error']): + failure_type = None + err_id = None + + if _tenant: + tenant = _tenant + + for cmd in cmds: + if cmd in _event: + operation = cmd + cell = deployments.get(_name, "n/a") + break + + if _image_type: + image_type_num |= _image_type if not _start: - _start = _when + continue - if 'error' in _routing_key: - err_id = _id - failure_type = 'http' + image = "?" + if image_type.isset(image_type_num, image_type.BASE_IMAGE): + image = "base" + if image_type.isset(image_type_num, image_type.SNAPSHOT_IMAGE): + image = "snap" - if _old_state != 'error' and _state == 'error': - failure_type = 'state' - err_id = _id + _end = _when + diff = _end - _start - if _old_state == 'error' and \ - (not _state in ['deleted', 'error']): - failure_type = None - err_id = None + if diff > 3600 and failure_type is None: + failure_type = ">60" - if _tenant: - tenant = _tenant + key = (operation, image_type_num, cell) - for cmd in cmds: - if cmd in _event: - operation = cmd - cell = deployments.get(_name, "n/a") - break + # Track durations for all attempts, good and bad ... 
+ duration_min, duration_max, duration_count, duration_total = \ + durations.get(key, (9999999, 0, 0, 0)) + duration_min = min(duration_min, diff) + duration_max = max(duration_max, diff) + duration_count += 1 + duration_total += diff + durations[key] = (duration_min, duration_max, duration_count, + duration_total) - if _image_type: - image_type_num |= _image_type + if not failure_type: + successes[key] = successes.get(key, 0) + 1 + else: + failed_request = {} + message = [] # For exception message masking + req_list.append(req) + instance_map[uuid] = req_list + failed_request['req'] = req + failed_request['duration'] = "%.2f minutes" % (diff/60) + failed_request['operation'] = operation + failed_request['platform'] = image_type.readable(image_type_num) + failures[key] = failures.get(key, 0) + 1 + tenant_issues[tenant] = tenant_issues.get(tenant, 0) + 1 - if not _start: - continue + if err_id: + err = models.RawData.objects.get(id=err_id) + queue, body = json.loads(err.json) + payload = body['payload'] - image = "?" - if image_type.isset(image_type_num, image_type.BASE_IMAGE): - image = "base" - if image_type.isset(image_type_num, image_type.SNAPSHOT_IMAGE): - image = "snap" + # Add error information to failed request report + failed_request['event_id'] = err.id + failed_request['tenant'] = err.tenant + failed_request['service'] = err.service + failed_request['host'] = err.host + failed_request['deployment'] = err.deployment.name + failed_request['event'] = err.event + failed_request['when'] = str(dt.dt_from_decimal(err.when)) - _end = _when - diff = _end - _start + # Track failed event counts + event_counts[err.event] = event_counts.get(err.event, 0) + 1 - if diff > 3600 and failure_type is None: - failure_type = ">60" + exc = payload.get('exception') + if exc: + # group the messages ... + failed_request['exception'] = exc - key = (operation, image_type_num, cell) + # assemble message from exception and generalize + message_str = mask_msg(build_exc_msg(exc)) + # count exception messages + exception_counts[message_str] = exception_counts.get( + message_str, 0) + 1 - # Track durations for all attempts, good and bad ... - duration_min, duration_max, duration_count, duration_total = \ - durations.get(key, (9999999, 0, 0, 0)) - duration_min = min(duration_min, diff) - duration_max = max(duration_max, diff) - duration_count += 1 - duration_total += diff - durations[key] = (duration_min, duration_max, duration_count, - duration_total) + # extract the code, if any ... 
+ code = exc.get('kwargs', {}).get('code') + if code: + codes[code] = codes.get(code, 0) + 1 + failure_type = code + failed_request['failure_type'] = failure_type + raws = models.RawData.objects.filter(request_id=req)\ + .exclude(event='compute.instance.exists')\ + .order_by('when') - if not failure_type: - successes[key] = successes.get(key, 0) + 1 - else: - failed_request = {} - req_list.append(req) - instance_map[uuid] = req_list - failed_request['req'] = req - failed_request['duration'] = "%.2f minutes" % (diff/60) - failed_request['operation'] = operation - failed_request['platform'] = image_type.readable(image_type_num) - failures[key] = failures.get(key, 0) + 1 - tenant_issues[tenant] = tenant_issues.get(tenant, 0) + 1 + failed_request['details'] = [] - if err_id: - err = models.RawData.objects.get(id=err_id) - queue, body = json.loads(err.json) - payload = body['payload'] + for raw in raws: + failure_detail = {} + failure_detail['host'] = raw.host + failure_detail['event'] = raw.event + failure_detail['old_state'] = raw.old_state + failure_detail['state'] = raw.state + failure_detail['old_task'] = raw.old_task + failure_detail['task'] = raw.task + failed_request['details'].append(failure_detail) - # Add error information to failed request report - failed_request['event_id'] = err.id - failed_request['tenant'] = err.tenant - failed_request['service'] = err.service - failed_request['host'] = err.host - failed_request['deployment'] = err.deployment.name - failed_request['event'] = err.event - failed_request['when'] = str(dt.dt_from_decimal(err.when)) + report.append(failed_request) - exc = payload.get('exception') - if exc: - # group the messages ... - failed_request['exception'] = exc + cause_key = (key, failure_type) + causes[cause_key] = causes.get(cause_key, 0) + 1 - exc_str = str(exc) - error_messages[exc_str] = \ - error_messages.get(exc_str, 0) + 1 - - # extract the code, if any ... 
- code = exc.get('kwargs', {}).get('code') - if code: - codes[code] = codes.get(code, 0) + 1 - failure_type = code - failed_request['failure_type'] = failure_type - raws = models.RawData.objects.filter(request_id=req)\ - .exclude(event='compute.instance.exists')\ - .order_by('when') - failed_request['details'] = [] - - for raw in raws: - failure_detail = {} - failure_detail['host'] = raw.host - failure_detail['event'] = raw.event - failure_detail['old_state'] = raw.old_state - failure_detail['state'] = raw.state - failure_detail['old_task'] = raw.old_task - failure_detail['task'] = raw.task - failed_request['details'].append(failure_detail) - - report.append(failed_request) - - cause_key = (key, failure_type) - causes[cause_key] = causes.get(cause_key, 0) + 1 - -values = {'json': json.dumps(report), - 'created': dt.dt_to_decimal(datetime.datetime.utcnow()), - 'period_start': start, - 'period_end': end, - 'version': 1, - 'name': 'Error detail report'} -report = models.JsonReport(**values) -report.save() + # Assign values to store in DB + values = {'json': json.dumps(report), + 'created': dt.dt_to_decimal(datetime.datetime.utcnow()), + 'period_start': start, + 'period_end': end, + 'version': 1, + 'name': 'Error detail report'} + json_report = models.JsonReport(**values) + json_report.save() diff --git a/stacktach/db.py b/stacktach/db.py index a55b4df..52aa6f4 100644 --- a/stacktach/db.py +++ b/stacktach/db.py @@ -1,4 +1,19 @@ -import models +from stacktach import stacklog +from stacktach import models + + +def _safe_get(Model, **kwargs): + object = None + query = Model.objects.filter(**kwargs) + count = query.count() + if count > 1: + stacklog.warn('Multiple records found for %s get.' % Model.__name__) + object = query[0] + elif count < 1: + stacklog.warn('No records found for %s get.' 
% Model.__name__) + else: + object = query[0] + return object def get_or_create_deployment(name): @@ -41,13 +56,12 @@ def get_or_create_instance_usage(**kwargs): return models.InstanceUsage.objects.get_or_create(**kwargs) +def get_or_create_instance_delete(**kwargs): + return models.InstanceDeletes.objects.get_or_create(**kwargs) + + def get_instance_usage(**kwargs): - usage = None - try: - usage = models.InstanceUsage.objects.get(**kwargs) - except models.InstanceUsage.DoesNotExist: - pass - return usage + return _safe_get(models.InstanceUsage, **kwargs) def create_instance_delete(**kwargs): @@ -55,12 +69,7 @@ def create_instance_delete(**kwargs): def get_instance_delete(**kwargs): - delete = None - try: - delete = models.InstanceDeletes.objects.get(**kwargs) - except models.InstanceDeletes.DoesNotExist: - pass - return delete + return _safe_get(models.InstanceDeletes, **kwargs) def create_instance_exists(**kwargs): diff --git a/stacktach/dbapi.py b/stacktach/dbapi.py index 85734d5..1a548ea 100644 --- a/stacktach/dbapi.py +++ b/stacktach/dbapi.py @@ -148,7 +148,7 @@ def get_usage_exist(request, exist_id): @api_call def exists_send_status(request, message_id): - if request.method != 'PUT': + if request.method not in ['PUT', 'POST']: raise BadRequestException(message="Invalid method") if request.body is None or request.body == '': diff --git a/stacktach/models.py b/stacktach/models.py index eabad54..6150d6f 100644 --- a/stacktach/models.py +++ b/stacktach/models.py @@ -130,7 +130,7 @@ class InstanceExists(models.Model): status = models.CharField(max_length=50, db_index=True, choices=STATUS_CHOICES, default=PENDING) - fail_reason = models.CharField(max_length=500, null=True, + fail_reason = models.CharField(max_length=300, null=True, blank=True, db_index=True) raw = models.ForeignKey(RawData, related_name='+', null=True) usage = models.ForeignKey(InstanceUsage, related_name='+', null=True) diff --git a/stacktach/stacklog.py b/stacktach/stacklog.py new file mode 100644 index 0000000..b700c8c --- /dev/null +++ b/stacktach/stacklog.py @@ -0,0 +1,69 @@ +# Copyright (c) 2013 - Rackspace Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. 
+
+import logging.handlers
+
+LOGGERS = {}
+default_logger_name = 'stacktach-default'
+
+
+def set_default_logger_name(name):
+    global default_logger_name
+    default_logger_name = name
+
+
+def _make_logger(name):
+    log = logging.getLogger(name)
+    log.setLevel(logging.DEBUG)
+    handler = logging.handlers.TimedRotatingFileHandler('%s.log' % name,
+        when='h', interval=6, backupCount=4)
+    formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
+    handler.setFormatter(formatter)
+    log.addHandler(handler)
+    log.handlers[0].doRollover()
+    return log
+
+
+def init_logger(name=None):
+    global LOGGERS
+    if name is None:
+        name = default_logger_name
+    if name not in LOGGERS:
+        LOGGERS[name] = _make_logger(name)
+
+
+def get_logger(name=None):
+    global LOGGERS
+    if name is None:
+        name = default_logger_name
+    init_logger(name=name)
+    return LOGGERS[name]
+
+
+def warn(msg, name=None):
+    if name is None:
+        name = default_logger_name
+    get_logger(name=name).warn(msg)
+
+
+def error(msg, name=None):
+    if name is None:
+        name = default_logger_name
+    get_logger(name=name).error(msg)
\ No newline at end of file
diff --git a/stacktach/views.py b/stacktach/views.py
index 3fbeaa6..6b6bfc4 100644
--- a/stacktach/views.py
+++ b/stacktach/views.py
@@ -9,14 +9,23 @@
 from django.shortcuts import render_to_response
 
 from stacktach import datetime_to_decimal as dt
 from stacktach import db as stackdb
-from stacktach import models
 from stacktach import image_type
+from stacktach import models
+from stacktach import stacklog
 from stacktach import utils
 
 STACKDB = stackdb
 
 
+# Fetch the logger lazily so importing views never forces log file creation.
+def log_warn(msg):
+    # stacklog caches loggers by name, so repeated lookups are cheap
+    log = stacklog.get_logger()
+    if log is not None:
+        log.warn(msg)
+
+
 def _extract_states(payload):
     return {
         'state' : payload.get('state', ""),
@@ -246,6 +255,11 @@ def _process_usage_for_new_launch(raw, body):
 
 def _process_usage_for_updates(raw, body):
     payload = body['payload']
+
+    if raw.event == INSTANCE_EVENT['create_end']:
+        if 'message' in payload and payload['message'] != 'Success':
+            return
+
     instance_id = payload['instance_id']
     request_id = body['_context_request_id']
     (usage, new) = STACKDB.get_or_create_instance_usage(instance=instance_id,
@@ -273,53 +287,58 @@ def _process_delete(raw, body):
     values = {
         'instance': instance_id,
         'deleted_at': deleted_at,
-        'raw': raw
     }
+    (delete, new) = STACKDB.get_or_create_instance_delete(**values)
+    delete.raw = raw
 
     launched_at = payload.get('launched_at')
     if launched_at and launched_at != '':
         launched_at = utils.str_time_to_unix(launched_at)
-        values['launched_at'] = launched_at
+        delete.launched_at = launched_at
 
-    delete = STACKDB.create_instance_delete(**values)
     STACKDB.save(delete)
 
 
 def _process_exists(raw, body):
     payload = body['payload']
     instance_id = payload['instance_id']
-    launched_at = utils.str_time_to_unix(payload['launched_at'])
-    launched_range = (launched_at, launched_at+1)
-    usage = STACKDB.get_instance_usage(instance=instance_id,
-                                       launched_at__range=launched_range)
-    values = {}
-    values['message_id'] = body['message_id']
-    values['instance'] = instance_id
-    values['launched_at'] = launched_at
-    beginning = utils.str_time_to_unix(payload['audit_period_beginning'])
-    values['audit_period_beginning'] = beginning
-    ending = utils.str_time_to_unix(payload['audit_period_ending'])
-    values['audit_period_ending'] = ending
-    values['instance_type_id'] = payload['instance_type_id']
-    if usage:
-        values['usage'] = usage
-    values['raw'] = raw
-    values['tenant'] = payload['tenant_id']
+    launched_at_str = payload.get('launched_at')
+ if launched_at_str is not None and launched_at_str != '': + launched_at = utils.str_time_to_unix(payload['launched_at']) + launched_range = (launched_at, launched_at+1) + usage = STACKDB.get_instance_usage(instance=instance_id, + launched_at__range=launched_range) + values = {} + values['message_id'] = body['message_id'] + values['instance'] = instance_id + values['launched_at'] = launched_at + beginning = utils.str_time_to_unix(payload['audit_period_beginning']) + values['audit_period_beginning'] = beginning + ending = utils.str_time_to_unix(payload['audit_period_ending']) + values['audit_period_ending'] = ending + values['instance_type_id'] = payload['instance_type_id'] + if usage: + values['usage'] = usage + values['raw'] = raw + values['tenant'] = payload['tenant_id'] - deleted_at = payload.get('deleted_at') - if deleted_at and deleted_at != '': - # We only want to pre-populate the 'delete' if we know this is in fact - # an exist event for a deleted instance. Otherwise, there is a - # chance we may populate it for a previous period's exist. - delete = STACKDB.get_instance_delete(instance=instance_id, - launched_at__range=launched_range) - deleted_at = utils.str_time_to_unix(deleted_at) - values['deleted_at'] = deleted_at - if delete: - values['delete'] = delete + deleted_at = payload.get('deleted_at') + if deleted_at and deleted_at != '': + # We only want to pre-populate the 'delete' if we know this is in + # fact an exist event for a deleted instance. Otherwise, there + # is a chance we may populate it for a previous period's exist. + filter = {'instance': instance_id, + 'launched_at__range': launched_range} + delete = STACKDB.get_instance_delete(**filter) + deleted_at = utils.str_time_to_unix(deleted_at) + values['deleted_at'] = deleted_at + if delete: + values['delete'] = delete - exists = STACKDB.create_instance_exists(**values) - STACKDB.save(exists) + exists = STACKDB.create_instance_exists(**values) + STACKDB.save(exists) + else: + stacklog.warn("Ignoring exists without launched_at. 
RawData(%s)" % raw.id) USAGE_PROCESS_MAPPING = { @@ -367,12 +386,14 @@ def process_raw_data(deployment, args, json_args): values['json'] = json_args record = STACKDB.create_rawdata(**values) STACKDB.save(record) - - aggregate_lifecycle(record) - aggregate_usage(record, body) return record +def post_process(raw, body): + aggregate_lifecycle(raw) + aggregate_usage(raw, body) + + def _post_process_raw_data(rows, highlight=None): for row in rows: if "error" in row.routing_key: diff --git a/tests/unit/test_dbapi.py b/tests/unit/test_dbapi.py index 8af5d01..fdc9c44 100644 --- a/tests/unit/test_dbapi.py +++ b/tests/unit/test_dbapi.py @@ -404,6 +404,24 @@ class DBAPITestCase(unittest.TestCase): self.assertEqual(exists.send_status, 200) self.mox.VerifyAll() + def test_send_status_accepts_post(self): + fake_request = self.mox.CreateMockAnything() + fake_request.method = 'POST' + body_dict = {'send_status': 200} + body = json.dumps(body_dict) + fake_request.body = body + exists = self.mox.CreateMockAnything() + result = self.mox.CreateMockAnything() + models.InstanceExists.objects.select_for_update().AndReturn(result) + result.get(message_id=MESSAGE_ID_1).AndReturn(exists) + exists.save() + self.mox.ReplayAll() + + dbapi.exists_send_status(fake_request, MESSAGE_ID_1) + + self.assertEqual(exists.send_status, 200) + self.mox.VerifyAll() + def test_send_status_not_found(self): fake_request = self.mox.CreateMockAnything() fake_request.method = 'PUT' @@ -517,6 +535,33 @@ class DBAPITestCase(unittest.TestCase): trans_obj.__exit__(None, None, None) self.mox.ReplayAll() + def test_send_status_batch_accepts_post(self): + fake_request = self.mox.CreateMockAnything() + fake_request.method = 'POST' + messages = { + MESSAGE_ID_1: 200, + MESSAGE_ID_2: 400 + } + body_dict = {'messages': messages} + body = json.dumps(body_dict) + fake_request.body = body + self.mox.StubOutWithMock(transaction, 'commit_on_success') + trans_obj = self.mox.CreateMockAnything() + transaction.commit_on_success().AndReturn(trans_obj) + trans_obj.__enter__() + results1 = self.mox.CreateMockAnything() + models.InstanceExists.objects.select_for_update().AndReturn(results1) + exists1 = self.mox.CreateMockAnything() + results1.get(message_id=MESSAGE_ID_2).AndReturn(exists1) + exists1.save() + results2 = self.mox.CreateMockAnything() + models.InstanceExists.objects.select_for_update().AndReturn(results2) + exists2 = self.mox.CreateMockAnything() + results2.get(message_id=MESSAGE_ID_1).AndReturn(exists2) + exists2.save() + trans_obj.__exit__(None, None, None) + self.mox.ReplayAll() + resp = dbapi.exists_send_status(fake_request, 'batch') self.assertEqual(resp.status_code, 200) exists1.send_status = 200 diff --git a/tests/unit/test_stacktach.py b/tests/unit/test_stacktach.py index 3a89cf7..5fe2229 100644 --- a/tests/unit/test_stacktach.py +++ b/tests/unit/test_stacktach.py @@ -32,6 +32,7 @@ from utils import TENANT_ID_1 from utils import INSTANCE_TYPE_ID_1 from utils import DUMMY_TIME from utils import INSTANCE_TYPE_ID_2 +from stacktach import stacklog from stacktach import views @@ -186,10 +187,6 @@ class StacktachRawParsingTestCase(unittest.TestCase): raw = self.mox.CreateMockAnything() views.STACKDB.create_rawdata(**raw_values).AndReturn(raw) views.STACKDB.save(raw) - self.mox.StubOutWithMock(views, "aggregate_lifecycle") - views.aggregate_lifecycle(raw) - self.mox.StubOutWithMock(views, "aggregate_usage") - views.aggregate_usage(raw, dict) self.mox.ReplayAll() views.process_raw_data(deployment, args, json_args) self.mox.VerifyAll() @@ -215,10 
+212,6 @@ class StacktachRawParsingTestCase(unittest.TestCase): raw = self.mox.CreateMockAnything() views.STACKDB.create_rawdata(**raw_values).AndReturn(raw) views.STACKDB.save(raw) - self.mox.StubOutWithMock(views, "aggregate_lifecycle") - views.aggregate_lifecycle(raw) - self.mox.StubOutWithMock(views, "aggregate_usage") - views.aggregate_usage(raw, dict) self.mox.ReplayAll() views.process_raw_data(deployment, args, json_args) self.mox.VerifyAll() @@ -388,7 +381,6 @@ class StacktachLifecycleTestCase(unittest.TestCase): self.mox.VerifyAll() - def test_aggregate_lifecycle_update(self): event = 'compute.instance.update' when = datetime.datetime.utcnow() @@ -416,12 +408,20 @@ class StacktachUsageParsingTestCase(unittest.TestCase): def setUp(self): self.mox = mox.Mox() views.STACKDB = self.mox.CreateMockAnything() + self.log = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(stacklog, 'get_logger') def tearDown(self): self.mox.UnsetStubs() + def setup_mock_log(self, name=None): + if name is None: + stacklog.get_logger(name=mox.IgnoreArg()).AndReturn(self.log) + else: + stacklog.get_logger(name=name).AndReturn(self.log) + def test_process_usage_for_new_launch_create_start(self): - kwargs = {'launched': str(DUMMY_TIME), 'tenant_id': TENANT_ID_1 } + kwargs = {'launched': str(DUMMY_TIME), 'tenant_id': TENANT_ID_1} notification = utils.create_nova_notif(request_id=REQUEST_ID_1, **kwargs) event = 'compute.instance.create.start' raw, usage = self._setup_process_usage_mocks(event, notification) @@ -516,6 +516,36 @@ class StacktachUsageParsingTestCase(unittest.TestCase): self.mox.VerifyAll() + def test_process_usage_for_updates_create_end_success_message(self): + kwargs = {'launched': str(DUMMY_TIME), 'tenant_id': TENANT_ID_1} + notification = utils.create_nova_notif(request_id=REQUEST_ID_1, **kwargs) + notification[1]['payload']['message'] = "Success" + event = 'compute.instance.create.end' + raw, usage = self._setup_process_usage_mocks(event, notification) + + views._process_usage_for_updates(raw, notification[1]) + + self.assertEqual(usage.launched_at, utils.decimal_utc(DUMMY_TIME)) + self.assertEqual(usage.tenant, TENANT_ID_1) + + self.mox.VerifyAll() + + def test_process_usage_for_updates_create_end_error_message(self): + kwargs = {'launched': str(DUMMY_TIME), 'tenant_id': TENANT_ID_1} + notification = utils.create_nova_notif(request_id=REQUEST_ID_1, **kwargs) + notification[1]['payload']['message'] = "Error" + event = 'compute.instance.create.end' + when_time = DUMMY_TIME + when_decimal = utils.decimal_utc(when_time) + json_str = json.dumps(notification) + raw = utils.create_raw(self.mox, when_decimal, event=event, + json_str=json_str) + self.mox.ReplayAll() + + views._process_usage_for_updates(raw, notification[1]) + + self.mox.VerifyAll() + def test_process_usage_for_updates_revert_end(self): kwargs = {'launched': str(DUMMY_TIME), 'type_id': INSTANCE_TYPE_ID_1, 'tenant_id': TENANT_ID_1} notification = utils.create_nova_notif(request_id=REQUEST_ID_1, **kwargs) @@ -573,11 +603,9 @@ class StacktachUsageParsingTestCase(unittest.TestCase): delete.instance = INSTANCE_ID_1 delete.launched_at = launch_decimal delete.deleted_at = delete_decimal - views.STACKDB.create_instance_delete(instance=INSTANCE_ID_1, - launched_at=launch_decimal, - deleted_at=delete_decimal, - raw=raw)\ - .AndReturn(delete) + views.STACKDB.get_or_create_instance_delete(instance=INSTANCE_ID_1, + deleted_at=delete_decimal)\ + .AndReturn((delete, True)) views.STACKDB.save(delete) self.mox.ReplayAll() @@ -599,10 +627,9 @@ 
class StacktachUsageParsingTestCase(unittest.TestCase): delete = self.mox.CreateMockAnything() delete.instance = INSTANCE_ID_1 delete.deleted_at = delete_decimal - views.STACKDB.create_instance_delete(instance=INSTANCE_ID_1, - deleted_at=delete_decimal, - raw=raw) \ - .AndReturn(delete) + views.STACKDB.get_or_create_instance_delete(instance=INSTANCE_ID_1, + deleted_at=delete_decimal)\ + .AndReturn((delete, True)) views.STACKDB.save(delete) self.mox.ReplayAll() @@ -650,6 +677,24 @@ class StacktachUsageParsingTestCase(unittest.TestCase): views._process_exists(raw, notif[1]) self.mox.VerifyAll() + def test_process_exists_no_launched_at(self): + current_time = datetime.datetime.utcnow() + current_decimal = utils.decimal_utc(current_time) + audit_beginning = current_time - datetime.timedelta(hours=20) + notif = utils.create_nova_notif(audit_period_beginning=str(audit_beginning), + audit_period_ending=str(current_time), + tenant_id=TENANT_ID_1) + json_str = json.dumps(notif) + event = 'compute.instance.exists' + raw = utils.create_raw(self.mox, current_decimal, event=event, + json_str=json_str) + raw.id = 1 + self.setup_mock_log() + self.log.warn('Ignoring exists without launched_at. RawData(1)') + self.mox.ReplayAll() + views._process_exists(raw, notif[1]) + self.mox.VerifyAll() + def test_process_exists_with_deleted_at(self): current_time = datetime.datetime.utcnow() launch_time = current_time - datetime.timedelta(hours=23) @@ -664,7 +709,7 @@ class StacktachUsageParsingTestCase(unittest.TestCase): deleted=str(deleted_time), audit_period_beginning=str(audit_beginning), audit_period_ending=str(current_time), - tenant_id= TENANT_ID_1) + tenant_id=TENANT_ID_1) json_str = json.dumps(notif) event = 'compute.instance.exists' raw = utils.create_raw(self.mox, current_decimal, event=event, diff --git a/tests/unit/test_stacktach_db.py b/tests/unit/test_stacktach_db.py new file mode 100644 index 0000000..dc71546 --- /dev/null +++ b/tests/unit/test_stacktach_db.py @@ -0,0 +1,227 @@ +# Copyright (c) 2013 - Rackspace Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. 
+ +import datetime +import unittest + +import mox + +from stacktach import db +from stacktach import stacklog +from stacktach import models + + +class StacktachDBTestCase(unittest.TestCase): + def setUp(self): + self.mox = mox.Mox() + self.log = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(stacklog, 'get_logger') + self.mox.StubOutWithMock(models, 'RawData', use_mock_anything=True) + models.RawData.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'Deployment', use_mock_anything=True) + models.Deployment.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'Lifecycle', use_mock_anything=True) + models.Lifecycle.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'Timing', use_mock_anything=True) + models.Timing.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'RequestTracker', + use_mock_anything=True) + models.RequestTracker.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'InstanceUsage', + use_mock_anything=True) + models.InstanceUsage.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'InstanceDeletes', + use_mock_anything=True) + models.InstanceDeletes.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'InstanceExists', + use_mock_anything=True) + models.InstanceExists.objects = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(models, 'JsonReport', use_mock_anything=True) + models.JsonReport.objects = self.mox.CreateMockAnything() + + def tearDown(self): + self.mox.UnsetStubs() + + def setup_mock_log(self, name=None): + if name is None: + stacklog.get_logger(name=mox.IgnoreArg()).AndReturn(self.log) + else: + stacklog.get_logger(name=name).AndReturn(self.log) + + def test_safe_get(self): + Model = self.mox.CreateMockAnything() + Model.objects = self.mox.CreateMockAnything() + filters = {'field1': 'value1', 'field2': 'value2'} + results = self.mox.CreateMockAnything() + Model.objects.filter(**filters).AndReturn(results) + results.count().AndReturn(1) + object = self.mox.CreateMockAnything() + results[0].AndReturn(object) + self.mox.ReplayAll() + returned = db._safe_get(Model, **filters) + self.assertEqual(returned, object) + self.mox.VerifyAll() + + def test_safe_get_no_results(self): + Model = self.mox.CreateMockAnything() + Model.__name__ = 'Model' + Model.objects = self.mox.CreateMockAnything() + filters = {'field1': 'value1', 'field2': 'value2'} + results = self.mox.CreateMockAnything() + Model.objects.filter(**filters).AndReturn(results) + results.count().AndReturn(0) + log = self.mox.CreateMockAnything() + self.setup_mock_log() + self.log.warn('No records found for Model get.') + self.mox.ReplayAll() + returned = db._safe_get(Model, **filters) + self.assertEqual(returned, None) + self.mox.VerifyAll() + + def test_safe_get_multiple_results(self): + Model = self.mox.CreateMockAnything() + Model.__name__ = 'Model' + Model.objects = self.mox.CreateMockAnything() + filters = {'field1': 'value1', 'field2': 'value2'} + results = self.mox.CreateMockAnything() + Model.objects.filter(**filters).AndReturn(results) + results.count().AndReturn(2) + self.setup_mock_log() + self.log.warn('Multiple records found for Model get.') + object = self.mox.CreateMockAnything() + results[0].AndReturn(object) + self.mox.ReplayAll() + returned = db._safe_get(Model, **filters) + self.assertEqual(returned, object) + self.mox.VerifyAll() + + def test_get_or_create_deployment(self): + deployment = 
self.mox.CreateMockAnything() + models.Deployment.objects.get_or_create(name='test').AndReturn(deployment) + self.mox.ReplayAll() + returned = db.get_or_create_deployment('test') + self.assertEqual(returned, deployment) + self.mox.VerifyAll() + + def _test_db_create_func(self, Model, func): + params = {'field1': 'value1', 'field2': 'value2'} + object = self.mox.CreateMockAnything() + Model(**params).AndReturn(object) + self.mox.ReplayAll() + returned = func(**params) + self.assertEqual(returned, object) + self.mox.VerifyAll() + + def test_create_rawdata(self): + self._test_db_create_func(models.RawData, db.create_rawdata) + + def test_create_lifecycle(self): + self._test_db_create_func(models.Lifecycle, db.create_lifecycle) + + def test_create_timing(self): + self._test_db_create_func(models.Timing, db.create_timing) + + def test_create_request_tracker(self): + self._test_db_create_func(models.RequestTracker, + db.create_request_tracker) + + def test_create_instance_usage(self): + self._test_db_create_func(models.InstanceUsage, + db.create_instance_usage) + + def test_create_instance_delete(self): + self._test_db_create_func(models.InstanceDeletes, + db.create_instance_delete) + + def test_create_instance_exists(self): + self._test_db_create_func(models.InstanceExists, + db.create_instance_exists) + + def _test_db_find_func(self, Model, func, select_related=True): + params = {'field1': 'value1', 'field2': 'value2'} + results = self.mox.CreateMockAnything() + if select_related: + Model.objects.select_related().AndReturn(results) + results.filter(**params).AndReturn(results) + else: + Model.objects.filter(**params).AndReturn(results) + self.mox.ReplayAll() + returned = func(**params) + self.assertEqual(returned, results) + self.mox.VerifyAll() + + def test_find_lifecycles(self): + self._test_db_find_func(models.Lifecycle, db.find_lifecycles) + + def test_find_timings(self): + self._test_db_find_func(models.Timing, db.find_timings) + + def test_find_request_trackers(self): + self._test_db_find_func(models.RequestTracker, + db.find_request_trackers, + select_related=False) + + def _test_db_get_or_create_func(self, Model, func): + params = {'field1': 'value1', 'field2': 'value2'} + object = self.mox.CreateMockAnything() + Model.objects.get_or_create(**params).AndReturn(object) + self.mox.ReplayAll() + returned = func(**params) + self.assertEqual(returned, object) + self.mox.VerifyAll() + + def test_get_or_create_instance_usage(self): + self._test_db_get_or_create_func(models.InstanceUsage, + db.get_or_create_instance_usage) + + def test_get_or_create_instance_delete(self): + self._test_db_get_or_create_func(models.InstanceDeletes, + db.get_or_create_instance_delete) + + def test_get_instance_usage(self): + filters = {'field1': 'value1', 'field2': 'value2'} + results = self.mox.CreateMockAnything() + models.InstanceUsage.objects.filter(**filters).AndReturn(results) + results.count().AndReturn(1) + usage = self.mox.CreateMockAnything() + results[0].AndReturn(usage) + self.mox.ReplayAll() + returned = db.get_instance_usage(**filters) + self.assertEqual(returned, usage) + self.mox.VerifyAll() + + def test_get_instance_delete(self): + filters = {'field1': 'value1', 'field2': 'value2'} + results = self.mox.CreateMockAnything() + models.InstanceDeletes.objects.filter(**filters).AndReturn(results) + results.count().AndReturn(1) + usage = self.mox.CreateMockAnything() + results[0].AndReturn(usage) + self.mox.ReplayAll() + returned = db.get_instance_delete(**filters) + self.assertEqual(returned, usage) + 
self.mox.VerifyAll() + + def test_save(self): + o = self.mox.CreateMockAnything() + o.save() + self.mox.ReplayAll() + db.save(o) + self.mox.VerifyAll() diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index 199f0ad..442b967 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -39,7 +39,7 @@ class NovaConsumerTestCase(unittest.TestCase): def test_get_consumers(self): created_queues = [] - created_callbacks = [] + created_callbacks = [] created_consumers = [] def Consumer(queues=None, callbacks=None): created_queues.extend(queues) @@ -127,6 +127,8 @@ class NovaConsumerTestCase(unittest.TestCase): views.process_raw_data(deployment, args, json.dumps(args))\ .AndReturn(raw) message.ack() + self.mox.StubOutWithMock(views, 'post_process') + views.post_process(raw, body_dict) self.mox.StubOutWithMock(consumer, '_check_memory', use_mock_anything=True) consumer._check_memory() diff --git a/tests/unit/utils.py b/tests/unit/utils.py index 2353525..c1612f1 100644 --- a/tests/unit/utils.py +++ b/tests/unit/utils.py @@ -90,6 +90,7 @@ def create_raw(mox, when, event, instance=INSTANCE_ID_1, raw.json = json_str return raw + def create_lifecycle(mox, instance, last_state, last_task_state, last_raw): lifecycle = mox.CreateMockAnything() lifecycle.instance = instance @@ -98,6 +99,7 @@ def create_lifecycle(mox, instance, last_state, last_task_state, last_raw): lifecycle.last_raw = last_raw return lifecycle + def create_timing(mox, name, lifecycle, start_raw=None, start_when=None, end_raw=None, end_when=None, diff=None): timing = mox.CreateMockAnything() @@ -110,6 +112,7 @@ def create_timing(mox, name, lifecycle, start_raw=None, start_when=None, timing.diff = diff return timing + def create_tracker(mox, request_id, lifecycle, start, last_timing=None, duration=str(0.0)): tracker = mox.CreateMockAnything() diff --git a/util/usage_seed.py b/util/usage_seed.py new file mode 100644 index 0000000..e2dbbf8 --- /dev/null +++ b/util/usage_seed.py @@ -0,0 +1,268 @@ +# Copyright (c) 2013 - Rackspace Inc. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to +# deal in the Software without restriction, including without limitation the +# rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +# sell copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in +# all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS +# IN THE SOFTWARE. + +""" + Usage: python usage_seed.py [period_length] [sql_connection] + python usage_seed hour mysql://user:password@nova-db.example.com/nova?charset=utf8 + + The idea behind usage seeding is to take the current state of all + active instances on active compute hosts and insert that data into + Stacktach's usage tables. This script should be run against the + nova database in each cell which has active compute nodes. 
The + reason for that is because the global cell does not have information + on active compute hosts. +""" + +import __builtin__ +setattr(__builtin__, '_', lambda x: x) +import datetime +import os +import sys + +from oslo.config import cfg +CONF = cfg.CONF +CONF.config_file = "/etc/nova/nova.conf" + +if __name__ == '__main__': + if len(sys.argv) != 3: + print "Proper Usage: usage_seed.py [period_length] [sql_connection]" + sys.exit(1) + CONF.sql_connection = sys.argv[2] + +from nova.compute import task_states +from nova.context import RequestContext +from nova.db import api as novadb +from nova.db.sqlalchemy import api +from nova.db.sqlalchemy import models as novamodels + +POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), + os.pardir, os.pardir)) +if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')): + sys.path.insert(0, POSSIBLE_TOPDIR) + +from stacktach import datetime_to_decimal as dt +from stacktach import models + + +# start yanked from reports/nova_usage_audit.py +def get_previous_period(time, period_length): + if period_length == 'day': + last_period = time - datetime.timedelta(days=1) + start = datetime.datetime(year=last_period.year, + month=last_period.month, + day=last_period.day) + end = datetime.datetime(year=time.year, + month=time.month, + day=time.day) + return start, end + elif period_length == 'hour': + last_period = time - datetime.timedelta(hours=1) + start = datetime.datetime(year=last_period.year, + month=last_period.month, + day=last_period.day, + hour=last_period.hour) + end = datetime.datetime(year=time.year, + month=time.month, + day=time.day, + hour=time.hour) + return start, end +# end yanked from reports/nova_usage_audit.py + + +def _usage_for_instance(instance, task=None): + usage = { + 'instance': instance['uuid'], + 'tenant': instance['project_id'], + 'instance_type_id': instance.get('instance_type_id'), + } + + launched_at = instance.get('launched_at') + if launched_at is not None: + usage['launched_at'] = dt.dt_to_decimal(launched_at) + + if task is not None: + usage['task'] = task + + return usage + + +def _delete_for_instance(instance): + delete = { + 'instance': instance['uuid'], + 'deleted_at': dt.dt_to_decimal(instance.get('terminated_at')), + } + + launched_at = instance.get('launched_at') + if launched_at is not None: + delete['launched_at'] = dt.dt_to_decimal(launched_at) + return delete + + +def get_active_instances(period_length): + context = RequestContext('1', '1', is_admin=True) + start, end = get_previous_period(datetime.datetime.utcnow(), period_length) + session = api.get_session() + computes = novadb.service_get_all_by_topic(context, 'compute') + active_instances = [] + yesterday = datetime.datetime.utcnow() - datetime.timedelta(days=1) + for compute in computes: + if compute.updated_at > yesterday: + query = session.query(novamodels.Instance) + + active_filter = api.or_(novamodels.Instance.terminated_at == None, + novamodels.Instance.terminated_at > start) + query = query.filter(active_filter) + query = query.filter_by(host=compute.host) + + for instance in query.all(): + active_instances.append(instance) + return active_instances + + +def get_action_for_instance(context, instance_uuid, action_name): + actions = novadb.actions_get(context, instance_uuid) + for action in actions: + if action['action'] == action_name: + return action + + +rebuild_tasks = [task_states.REBUILDING, + task_states.REBUILD_BLOCK_DEVICE_MAPPING, + task_states.REBUILD_SPAWNING] + +resize_tasks = [task_states.RESIZE_PREP, + 
task_states.RESIZE_MIGRATING, + task_states.RESIZE_MIGRATED, + task_states.RESIZE_FINISH] + +resize_revert_tasks = [task_states.RESIZE_REVERTING] + +rescue_tasks = [task_states.RESCUING] + +in_flight_tasks = (rebuild_tasks + resize_tasks + + resize_revert_tasks + rescue_tasks) + + +def seed(period_length): + usages = [] + building_usages = [] + in_flight_usages = [] + deletes = [] + + + start, end = get_previous_period(datetime.datetime.utcnow(), period_length) + + context = RequestContext(1, 1, is_admin=True) + + print "Selecting all active instances" + active_instances = get_active_instances(period_length) + print "Selected all active instances" + + print "Populating active usages, preparing for in-flight" + for instance in active_instances: + vm_state = instance['vm_state'] + task_state = instance['task_state'] + + if vm_state == 'building': + if instance['deleted'] != 0 and instance['deleted_at'] >= start: + building_usages.append(_usage_for_instance(instance)) + deletes.append(_delete_for_instance(instance)) + elif instance['deleted'] == 0: + building_usages.append(_usage_for_instance(instance)) + else: + if task_state in in_flight_tasks: + if (instance['deleted'] != 0 and + instance['deleted_at'] >= start): + # Just in case... + deletes.append(_delete_for_instance(instance)) + in_flight_usages.append(_usage_for_instance(instance, + task=task_state)) + elif instance['deleted'] == 0: + in_flight_usages.append(_usage_for_instance(instance, + task=task_state)) + else: + if (instance['deleted'] != 0 and + instance['deleted_at'] >= start): + deletes.append(_delete_for_instance(instance)) + usages.append(_usage_for_instance(instance)) + elif instance['deleted'] == 0: + usages.append(_usage_for_instance(instance)) + + print "Populated active instances, processing building" + for usage in building_usages: + action = get_action_for_instance(context, usage['instance'], 'create') + if action is not None: + usage['request_id'] = action['request_id'] + + print "Populated building, processing in-flight" + for usage in in_flight_usages: + instance = usage['instance'] + action = None + if usage['task'] in rebuild_tasks: + action = get_action_for_instance(context, instance, 'rebuild') + elif usage['task'] in resize_tasks: + action = get_action_for_instance(context, instance, 'resize') + elif usage['task'] in resize_revert_tasks: + action = get_action_for_instance(context, instance, 'resizeRevert') + elif usage['task'] in rescue_tasks: + action = get_action_for_instance(context, instance, 'rescue') + + if action is not None: + usage['request_id'] = action['request_id'] + del usage['task'] + + print "Done cataloging usage" + + + print "Saving active instances" + active_InstanceUsages = map(lambda x: models.InstanceUsage(**x), + usages) + models.InstanceUsage.objects.bulk_create(active_InstanceUsages, + batch_size=100) + + print "Saving building instances" + building_InstanceUsages = map(lambda x: models.InstanceUsage(**x), + building_usages) + models.InstanceUsage.objects.bulk_create(building_InstanceUsages, + batch_size=100) + + print "Saving in-flight instances" + in_flight_InstanceUsages = map(lambda x: models.InstanceUsage(**x), + in_flight_usages) + models.InstanceUsage.objects.bulk_create(in_flight_InstanceUsages, + batch_size=100) + + print "Saving deletes" + all_InstanceDeletes = map(lambda x: models.InstanceDeletes(**x), + deletes) + models.InstanceDeletes.objects.bulk_create(all_InstanceDeletes, + batch_size=100) + + return (len(usages), len(building_usages), + len(in_flight_usages), 
len(deletes)) + +if __name__ == '__main__': + msg = ("Seeded system with: \n" + "%s Active Instances \n" + "%s Building Instances \n" + "%s In Flight Instances \n" + "%s Deleted Instances \n") + print msg % seed(sys.argv[1]) + diff --git a/verifier/dbverifier.py b/verifier/dbverifier.py index f54f6a8..7a6f1f8 100644 --- a/verifier/dbverifier.py +++ b/verifier/dbverifier.py @@ -21,7 +21,6 @@ import argparse import datetime import json -import logging import os import sys from time import sleep @@ -38,6 +37,11 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')): sys.path.insert(0, POSSIBLE_TOPDIR) +from stacktach import stacklog + +stacklog.set_default_logger_name('verifier') +LOG = stacklog.get_logger() + from stacktach import models from stacktach import datetime_to_decimal as dt from verifier import AmbiguousResults @@ -45,15 +49,6 @@ from verifier import FieldMismatch from verifier import NotFound from verifier import VerificationException -LOG = logging.getLogger(__name__) -LOG.setLevel(logging.DEBUG) -handler = logging.handlers.TimedRotatingFileHandler('verifier.log', - when='h', interval=6, backupCount=4) -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -handler.setFormatter(formatter) -LOG.addHandler(handler) -LOG.handlers[0].doRollover() - def _list_exists(ending_max=None, status=None): params = {} diff --git a/worker/worker.py b/worker/worker.py index f62626f..60f0d7a 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -20,7 +20,6 @@ import datetime import kombu import kombu.entity import kombu.mixins -import logging import sys import time @@ -34,17 +33,12 @@ except ImportError: from pympler.process import ProcessMemoryInfo -from stacktach import db, views +from stacktach import db +from stacktach import stacklog +from stacktach import views - -LOG = logging.getLogger(__name__) -LOG.setLevel(logging.DEBUG) -handler = logging.handlers.TimedRotatingFileHandler('worker.log', - when='h', interval=6, backupCount=4) -formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') -handler.setFormatter(formatter) -LOG.addHandler(handler) -LOG.handlers[0].doRollover() +stacklog.set_default_logger_name('worker') +LOG = stacklog.get_logger() class NovaConsumer(kombu.mixins.ConsumerMixin): @@ -87,10 +81,13 @@ class NovaConsumer(kombu.mixins.ConsumerMixin): args = (routing_key, json.loads(body)) asJson = json.dumps(args) + # save raw and ack the message raw = views.process_raw_data(self.deployment, args, asJson) + if raw: self.processed += 1 message.ack() + views.post_process(raw, args[1]) self._check_memory() @@ -126,13 +123,22 @@ class NovaConsumer(kombu.mixins.ConsumerMixin): try: self._process(message) except Exception, e: - LOG.exception("Problem %s" % e) + LOG.debug("Problem: %s\nFailed message body:\n%s" % + (e, json.loads(str(message.body))) + ) + raise def continue_running(): return True +def exit_or_sleep(exit=False): + if exit: + sys.exit(1) + time.sleep(5) + + def run(deployment_config): name = deployment_config['name'] host = deployment_config.get('rabbit_host', 'localhost') @@ -142,6 +148,7 @@ def run(deployment_config): virtual_host = deployment_config.get('rabbit_virtual_host', '/') durable = deployment_config.get('durable_queue', True) queue_arguments = deployment_config.get('queue_arguments', {}) + exit_on_exception = deployment_config.get('exit_on_exception', False) deployment, new = db.get_or_create_deployment(name) @@ 
-168,11 +175,11 @@ def run(deployment_config): LOG.error("!!!!Exception!!!!") LOG.exception("name=%s, exception=%s. Reconnecting in 5s" % (name, e)) - time.sleep(5) + exit_or_sleep(exit_on_exception) LOG.debug("Completed processing on '%s'" % name) except: LOG.error("!!!!Exception!!!!") e = sys.exc_info()[0] msg = "Uncaught exception: deployment=%s, exception=%s. Retrying in 5s" LOG.exception(msg % (name, e)) - time.sleep(5) + exit_or_sleep(exit_on_exception)
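
Notes on the changes above. The usage sketches are Python 2, matching the codebase; all example values are made up.

The masking rules in reports/error_details.py are order-sensitive: the REQ_ID pattern must run before the UUID pattern, otherwise the UUID portion of a request id gets rewritten first and a stray "req-$UUID" is left behind. Note also that LG_NUM only catches numbers of four or more digits, so short codes like 500 survive masking. A quick sanity check, with the helper copied from the report and a made-up message:

    import re

    masking_regex = (
        (1, 'REQ_ID',
         r"req-[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"),
        (2, 'UUID',
         r"[a-f0-9]{8}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{4}-[a-f0-9]{12}"),
        (3, 'HOST_ADDRESS', r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b"),
        (4, 'LG_NUM', r"\b\d{3}\d+\b"),
    )

    def mask_msg(text):
        masked = str(text)
        for config in masking_regex:
            masked = re.sub(config[2], "$%s" % config[1], masked)
        return masked

    msg = ("Instance aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee on 10.1.2.3 "
           "failed (req-aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee, code 50013)")
    print mask_msg(msg)
    # Instance $UUID on $HOST_ADDRESS failed ($REQ_ID, code $LG_NUM)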
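
build_exc_msg() flattens a free-form exception dict into a single line, truncating anything after a "Traceback" marker, and falls back to dumping every key when none of the white-listed fields are present; mask_msg() then generalizes the result so identical failures collapse into one exception_counts bucket. A worked example with a hypothetical payload (this assumes it runs in the context of reports/error_details.py, where both helpers are defined):

    exc = {'kwargs': {'value': 'Build failed: Traceback (most recent call last)',
                      'code': 500},
           'message': 'Remote error'}
    message_str = mask_msg(build_exc_msg(exc))
    print message_str
    # value: Build failed: $TRACEBACK, code: 500, message: Remote error
    # (masking leaves this particular string unchanged: no UUIDs, addresses,
    # or 4+ digit numbers appear in it)
    exception_counts[message_str] = exception_counts.get(message_str, 0) + 1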
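
get_instance_usage() and get_instance_delete() previously used Model.objects.get(), which raises DoesNotExist for zero rows and MultipleObjectsReturned for more than one. The new _safe_get() in stacktach/db.py converts both cases into a logged warning: None for zero matches, the first row when there are several. A behavioral sketch, with a plain list standing in for the Django queryset and print standing in for stacklog:

    def safe_get_demo(rows):
        count = len(rows)              # the real code calls queryset.count()
        if count > 1:
            print 'warn: multiple records found'
            return rows[0]             # first match wins instead of raising
        elif count < 1:
            print 'warn: no records found'
            return None                # None instead of Model.DoesNotExist
        return rows[0]

    assert safe_get_demo([]) is None
    assert safe_get_demo(['a', 'b']) == 'a'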
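
The new stacktach/stacklog module replaces the copy-pasted logging setup that worker.py and dbverifier.py each carried. Loggers are created lazily, cached by name, and write to <name>.log through a TimedRotatingFileHandler that rotates every six hours and keeps four backups. Minimal usage, following the worker and verifier pattern above:

    from stacktach import stacklog

    stacklog.set_default_logger_name('worker')  # log file becomes worker.log
    LOG = stacklog.get_logger()                 # created and cached on first use
    LOG.debug("Completed processing on '%s'" % 'east_coast.prod.cell1')

    # module-level helpers route through the same cached default logger
    stacklog.warn('Multiple records found for InstanceUsage get.')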
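
Two worker behavior changes ride together. First, the consumer now acks a message as soon as its RawData record is saved and only then runs views.post_process() (the lifecycle and usage aggregation split out of process_raw_data), so an aggregation failure no longer causes redelivery of a message whose raw record is already persisted. Second, the per-deployment "exit_on_exception" flag in the sample worker config decides what happens when an exception escapes the consumer loop. A sketch of that decision, as wired into run() above (the external supervisor is assumed, not part of this patch):

    import sys
    import time

    def exit_or_sleep(exit=False):
        if exit:
            sys.exit(1)  # assumed: an external supervisor restarts the worker
        time.sleep(5)    # default: back off, then reconnect in-process

    # per deployment, from the sample config above:
    #   "exit_on_exception": true   -> die fast and rely on a supervisor
    #   "exit_on_exception": false  -> sleep 5s and reconnect (the default)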
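
util/usage_seed.py (like the audit report it borrows get_previous_period from) snaps "the previous period" to calendar boundaries rather than using a sliding window: 'hour' yields the last full clock hour, 'day' the last full calendar day. A worked example using the function from the diff above:

    import datetime

    def get_previous_period(time, period_length):
        if period_length == 'day':
            last_period = time - datetime.timedelta(days=1)
            start = datetime.datetime(year=last_period.year,
                                      month=last_period.month,
                                      day=last_period.day)
            end = datetime.datetime(year=time.year, month=time.month,
                                    day=time.day)
            return start, end
        elif period_length == 'hour':
            last_period = time - datetime.timedelta(hours=1)
            start = datetime.datetime(year=last_period.year,
                                      month=last_period.month,
                                      day=last_period.day,
                                      hour=last_period.hour)
            end = datetime.datetime(year=time.year, month=time.month,
                                    day=time.day, hour=time.hour)
            return start, end

    now = datetime.datetime(2013, 2, 26, 14, 42, 7)
    print get_previous_period(now, 'hour')
    # (datetime.datetime(2013, 2, 26, 13, 0), datetime.datetime(2013, 2, 26, 14, 0))
    print get_previous_period(now, 'day')
    # (datetime.datetime(2013, 2, 25, 0, 0), datetime.datetime(2013, 2, 26, 0, 0))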