From baaba5d3e7f7c9220e7d9c5577ed0dd709df1254 Mon Sep 17 00:00:00 2001 From: Anuj Mathur Date: Tue, 5 Nov 2013 18:24:20 +0530 Subject: [PATCH] Implemented multiprocess logging --- stacktach/stacklog.py | 168 ++++++++++++++++++++--------- tests/unit/test_base_verifier.py | 22 ++++ tests/unit/test_glance_verifier.py | 21 ++++ tests/unit/test_nova_verifier.py | 55 +++++----- tests/unit/test_stacklog.py | 117 +++++++------------- tests/unit/test_worker.py | 25 ++++- verifier/base_verifier.py | 16 +-- verifier/glance_verifier.py | 14 ++- verifier/nova_verifier.py | 23 ++-- verifier/start_verifier.py | 10 +- worker/start_workers.py | 10 +- worker/worker.py | 46 ++++---- 12 files changed, 320 insertions(+), 207 deletions(-) diff --git a/stacktach/stacklog.py b/stacktach/stacklog.py index ab469e9..25505d5 100644 --- a/stacktach/stacklog.py +++ b/stacktach/stacklog.py @@ -20,8 +20,13 @@ import logging import logging.handlers +import multiprocessing +import threading +import traceback +import sys LOGGERS = {} +LOGGER_QUEUE_MAP = {} default_logger_location = '/var/log/stacktach/%s.log' default_logger_name = 'stacktach-default' @@ -36,34 +41,46 @@ def set_default_logger_name(name): default_logger_name = name -def _logger_factory(exchange, name): - if exchange: - return ExchangeLogger(exchange, name) +class ParentLoggerDoesNotExist(Exception): + def __init__(self, parent_logger_name): + self.reason = "Cannot create child logger as parent logger with the" \ + "name %s does not exist." % parent_logger_name + + +def _create_parent_logger(parent_logger_name): + if parent_logger_name not in LOGGERS: + logger = _create_timed_rotating_logger(parent_logger_name) + LOGGERS[parent_logger_name] = logger + LOGGER_QUEUE_MAP[parent_logger_name] = multiprocessing.Queue(-1) + + return LOGGERS[parent_logger_name] + + +def _create_child_logger(parent_logger_name): + child_logger_name = "child_%s" % parent_logger_name + if child_logger_name in LOGGERS: + return LOGGERS[child_logger_name] + if parent_logger_name in LOGGERS: + queue = LOGGER_QUEUE_MAP[parent_logger_name] + logger = _create_queue_logger(child_logger_name, queue) + LOGGERS[child_logger_name] = logger else: - logger = logging.getLogger(__name__) - _configure(logger, name) - return logger + raise ParentLoggerDoesNotExist(parent_logger_name) + + return LOGGERS[child_logger_name] -def _make_logger(name, exchange=None): - log = _logger_factory(exchange, name) - return log +def _logger_factory(parent_logger_name, is_parent): + if parent_logger_name is None: + parent_logger_name = default_logger_name + if is_parent: + return _create_parent_logger(parent_logger_name) + else: + return _create_child_logger(parent_logger_name) -def init_logger(name=None, exchange=None): - global LOGGERS - if name is None: - name = default_logger_name - if name not in LOGGERS: - LOGGERS[name] = _make_logger(name, exchange) - - -def get_logger(name=None, exchange=None): - global LOGGERS - if name is None: - name = default_logger_name - init_logger(name=name, exchange=exchange) - return LOGGERS[name] +def get_logger(name=None, is_parent=True): + return _logger_factory(name, is_parent) def warn(msg, name=None): @@ -84,36 +101,87 @@ def info(msg, name=None): get_logger(name=name).info(msg) -def _configure(logger, name): - logger.setLevel(logging.DEBUG) - handler = logging.handlers.TimedRotatingFileHandler( - default_logger_location % name, - when='midnight', interval=1, backupCount=3) - formatter = logging.Formatter( - '%(asctime)s - %(name)s - %(levelname)s - %(message)s') - handler.setFormatter(formatter) - logger.addHandler(handler) - logger.handlers[0].doRollover() +def _create_timed_rotating_logger(name): + logger = logging.getLogger(name) + logger.setLevel(logging.DEBUG) + handler = logging.handlers.TimedRotatingFileHandler( + default_logger_location % name, + when='midnight', interval=1, backupCount=3) + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + logger.handlers[0].doRollover() + return logger -class ExchangeLogger(): - def __init__(self, exchange, name='stacktach-default'): - self.logger = logging.getLogger(__name__) - _configure(self.logger, name) - self.exchange = exchange +def _create_queue_logger(name, queue): + logger = logging.getLogger(name) + logger.setLevel(logging.DEBUG) + handler = QueueHandler(queue) + formatter = logging.Formatter( + '%(asctime)s - %(name)s - %(levelname)s - %(message)s') + handler.setFormatter(formatter) + logger.addHandler(handler) + return logger - def info(self, msg, *args, **kwargs): - msg = self.exchange + ': ' + msg - self.logger.info(msg, *args, **kwargs) - def warn(self, msg, *args, **kwargs): - msg = self.exchange + ': ' + msg - self.logger.warn(msg, *args, **kwargs) +class QueueHandler(logging.Handler): + def __init__(self, queue): + logging.Handler.__init__(self) + self.queue = queue - def error(self, msg, *args, **kwargs): - msg = self.exchange + ': ' + msg - self.logger.error(msg, *args, **kwargs) + def emit(self, record): + try: + # ensure that exc_info and args + # have been stringified. Removes any chance of + # unpickleable things inside and possibly reduces + # message size sent over the pipe + if record.exc_info: + # just to get traceback text into record.exc_text + self.format(record) + # remove exception info as it's not needed any more + record.exc_info = None + if record.args: + record.msg = record.msg % record.args + record.args = None + self.queue.put_nowait(record) + except (KeyboardInterrupt, SystemExit): + raise + except: + self.handleError(record) - def exception(self, msg, *args, **kwargs): - msg = self.exchange + ': ' + msg - self.logger.error(msg, *args, **kwargs) \ No newline at end of file + +class LogListener: + def __init__(self, logger): + self.logger = logger + self.queue = get_queue(logger.name) + + def start(self): + self.thread = threading.Thread(target=self._receive) + self.thread.daemon = True + self.thread.start() + + def _receive(self): + while True: + try: + record = self.queue.get() + # None is sent as a sentinel to tell the listener to quit + if record is None: + break + self.logger.handle(record) + except (KeyboardInterrupt, SystemExit): + raise + except EOFError: + break + except: + traceback.print_exc(file=sys.stderr) + + def end(self): + self.queue.put_nowait(None) + self.thread.join() + self.logger.handler.close() + + +def get_queue(logger_name): + return LOGGER_QUEUE_MAP[logger_name] diff --git a/tests/unit/test_base_verifier.py b/tests/unit/test_base_verifier.py index a17837a..0038595 100644 --- a/tests/unit/test_base_verifier.py +++ b/tests/unit/test_base_verifier.py @@ -3,6 +3,7 @@ import time from django.db import transaction import mox from stacktach import message_service +from stacktach import stacklog from tests.unit import StacktachBaseTestCase from tests.unit.utils import HOST, PORT, VIRTUAL_HOST, USERID, PASSWORD, TICK_TIME, SETTLE_TIME, SETTLE_UNITS from tests.unit.utils import make_verifier_config @@ -139,6 +140,8 @@ class BaseVerifierTestCase(StacktachBaseTestCase): self.mox.VerifyAll() def test_run_notifications(self): + mock_logger = self._create_mock_logger() + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) self._mock_exchange_create_and_connect(self.verifier_with_notifications) self.mox.StubOutWithMock(self.verifier_with_notifications, '_run') self.verifier_with_notifications._run(callback=mox.Not(mox.Is(None))) @@ -147,6 +150,8 @@ class BaseVerifierTestCase(StacktachBaseTestCase): self.mox.VerifyAll() def test_run_notifications_with_routing_keys(self): + mock_logger = self._create_mock_logger() + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) self._mock_exchange_create_and_connect(self.verifier_with_notifications) self.mox.StubOutWithMock(self.verifier_with_notifications, '_run') self.verifier_with_notifications._run(callback=mox.Not(mox.Is(None))) @@ -155,6 +160,8 @@ class BaseVerifierTestCase(StacktachBaseTestCase): self.mox.VerifyAll() def test_run_no_notifications(self): + mock_logger = self._create_mock_logger() + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) self.mox.StubOutWithMock(self.verifier_without_notifications, '_run') self.verifier_without_notifications._run() self.mox.ReplayAll() @@ -162,6 +169,11 @@ class BaseVerifierTestCase(StacktachBaseTestCase): self.mox.VerifyAll() def test_run_full_no_notifications(self): + mock_logger = self._create_mock_logger() + mock_logger.info('None: N: None, P: 0, S: 2, E: 0') + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) + self.mox.StubOutWithMock(transaction, 'commit_on_success') tran = self.mox.CreateMockAnything() tran.__enter__().AndReturn(tran) @@ -196,7 +208,17 @@ class BaseVerifierTestCase(StacktachBaseTestCase): self.mox.VerifyAll() + def _create_mock_logger(self): + mock_logger = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(stacklog, 'get_logger') + return mock_logger + def test_run_full(self): + mock_logger = self._create_mock_logger() + mock_logger.info('exchange: N: None, P: 0, S: 2, E: 0') + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) + self.mox.StubOutWithMock(transaction, 'commit_on_success') tran = self.mox.CreateMockAnything() tran.__enter__().AndReturn(tran) diff --git a/tests/unit/test_glance_verifier.py b/tests/unit/test_glance_verifier.py index c4c2f12..e783038 100644 --- a/tests/unit/test_glance_verifier.py +++ b/tests/unit/test_glance_verifier.py @@ -21,12 +21,14 @@ from datetime import datetime import decimal import json +import logging import uuid import kombu import mox from stacktach import datetime_to_decimal as dt +from stacktach import stacklog from stacktach import models from tests.unit import StacktachBaseTestCase from utils import IMAGE_UUID_1 @@ -60,6 +62,12 @@ class GlanceVerifierTestCase(StacktachBaseTestCase): self.mox.UnsetStubs() self.verifier = None + def _setup_mock_logger(self): + mock_logger = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(stacklog, 'get_logger') + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) + return mock_logger + def test_verify_usage_should_not_raise_exception_on_success(self): exist = self.mox.CreateMockAnything() exist.created_at = decimal.Decimal('1.1') @@ -427,6 +435,11 @@ class GlanceVerifierTestCase(StacktachBaseTestCase): self.assertTrue(verified) def test_verify_exist_marks_exist_failed_if_field_mismatch_exception(self): + mock_logger = self._setup_mock_logger() + self.mox.StubOutWithMock(mock_logger, 'info') + mock_logger.exception("glance: Expected field to be 'expected' " + "got 'actual'") + exist1 = self.mox.CreateMockAnything() exist2 = self.mox.CreateMockAnything() @@ -450,6 +463,10 @@ class GlanceVerifierTestCase(StacktachBaseTestCase): self.assertFalse(verified) def test_verify_for_range_without_callback(self): + mock_logger = self._setup_mock_logger() + self.mox.StubOutWithMock(mock_logger, 'info') + mock_logger.info('glance: Adding 2 per-owner exists to queue.') + when_max = datetime.utcnow() models.ImageExists.VERIFYING = 'verifying' models.ImageExists.PENDING = 'pending' @@ -477,6 +494,10 @@ class GlanceVerifierTestCase(StacktachBaseTestCase): self.mox.VerifyAll() def test_verify_for_range_with_callback(self): + mock_logger = self._setup_mock_logger() + self.mox.StubOutWithMock(mock_logger, 'info') + mock_logger.info('glance: Adding 2 per-owner exists to queue.') + callback = self.mox.CreateMockAnything() when_max = datetime.utcnow() models.ImageExists.PENDING = 'pending' diff --git a/tests/unit/test_nova_verifier.py b/tests/unit/test_nova_verifier.py index cb97daa..eb3490d 100644 --- a/tests/unit/test_nova_verifier.py +++ b/tests/unit/test_nova_verifier.py @@ -29,6 +29,7 @@ import kombu.pools import mox from stacktach import datetime_to_decimal as dt +from stacktach import stacklog from stacktach import models from tests.unit import StacktachBaseTestCase from utils import make_verifier_config @@ -632,6 +633,11 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase): self.verifier = nova_verifier.NovaVerifier(config, pool=self.pool, reconciler=self.reconciler) + def _create_mock_logger(self): + mock_logger = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(stacklog, 'get_logger') + return mock_logger + def tearDown(self): self.mox.UnsetStubs() self.verifier = None @@ -719,6 +725,10 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase): self.mox.VerifyAll() def test_verify_fail_with_reconciled_data_exception(self): + mock_logger = self._create_mock_logger() + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) + mock_logger.exception("nova: message") + exist = self.mox.CreateMockAnything() exist.launched_at = decimal.Decimal('1.1') self.mox.StubOutWithMock(nova_verifier, '_verify_for_launch') @@ -728,7 +738,7 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase): nova_verifier._verify_for_launch(exist).AndRaise(verify_exception) self.mox.StubOutWithMock(nova_verifier, '_verify_with_reconciled_data') nova_verifier._verify_with_reconciled_data(exist)\ - .AndRaise(Exception()) + .AndRaise(Exception("message")) exist.mark_failed(reason='Exception') self.mox.ReplayAll() result, exists = nova_verifier._verify(exist, 'none') @@ -754,12 +764,16 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase): self.mox.VerifyAll() def test_verify_exception_during_launch(self): + mock_logger = self._create_mock_logger() + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) + mock_logger.exception("nova: message") + exist = self.mox.CreateMockAnything() exist.launched_at = decimal.Decimal('1.1') self.mox.StubOutWithMock(nova_verifier, '_verify_for_launch') self.mox.StubOutWithMock(nova_verifier, '_verify_for_delete') self.mox.StubOutWithMock(exist, 'mark_failed') - nova_verifier._verify_for_launch(exist).AndRaise(Exception()) + nova_verifier._verify_for_launch(exist).AndRaise(Exception("message")) exist.mark_failed(reason='Exception') self.mox.ReplayAll() result, exists = nova_verifier._verify(exist, 'none') @@ -767,44 +781,29 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase): self.mox.VerifyAll() def test_verify_exception_during_delete(self): + mock_logger = self._create_mock_logger() + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) + mock_logger.exception("nova: message") + exist = self.mox.CreateMockAnything() exist.launched_at = decimal.Decimal('1.1') self.mox.StubOutWithMock(nova_verifier, '_verify_for_launch') self.mox.StubOutWithMock(nova_verifier, '_verify_for_delete') self.mox.StubOutWithMock(exist, 'mark_failed') nova_verifier._verify_for_launch(exist) - nova_verifier._verify_for_delete(exist).AndRaise(Exception()) + nova_verifier._verify_for_delete(exist).AndRaise(Exception("message")) exist.mark_failed(reason='Exception') self.mox.ReplayAll() result, exists = nova_verifier._verify(exist, 'none') self.assertFalse(result) self.mox.VerifyAll() - def test_verify_for_range_without_callback(self): - when_max = datetime.datetime.utcnow() - results = self.mox.CreateMockAnything() - models.InstanceExists.PENDING = 'pending' - models.InstanceExists.VERIFYING = 'verifying' - models.InstanceExists.find( - ending_max=when_max, status='pending').AndReturn(results) - results.count().AndReturn(2) - exist1 = self.mox.CreateMockAnything() - exist2 = self.mox.CreateMockAnything() - results.__getslice__(0, 1000).AndReturn(results) - results.__iter__().AndReturn([exist1, exist2].__iter__()) - exist1.update_status('verifying') - exist2.update_status('verifying') - exist1.save() - exist2.save() - self.pool.apply_async(nova_verifier._verify, args=(exist1, 'all'), - callback=None) - self.pool.apply_async(nova_verifier._verify, args=(exist2, 'all'), - callback=None) - self.mox.ReplayAll() - self.verifier.verify_for_range(when_max) - self.mox.VerifyAll() def test_verify_for_range_without_callback(self): + mock_logger = self._create_mock_logger() + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) + mock_logger.info('nova: Adding 2 exists to queue.') + when_max = datetime.datetime.utcnow() results = self.mox.CreateMockAnything() models.InstanceExists.PENDING = 'pending' @@ -829,6 +828,10 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase): self.mox.VerifyAll() def test_verify_for_range_with_callback(self): + mock_logger = self._create_mock_logger() + stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger) + mock_logger.info("nova: Adding 2 exists to queue.") + callback = self.mox.CreateMockAnything() when_max = datetime.datetime.utcnow() results = self.mox.CreateMockAnything() diff --git a/tests/unit/test_stacklog.py b/tests/unit/test_stacklog.py index f2e730d..644a140 100644 --- a/tests/unit/test_stacklog.py +++ b/tests/unit/test_stacklog.py @@ -1,11 +1,9 @@ -import glob import logging -import os import mox from stacktach import stacklog -from stacktach.stacklog import ExchangeLogger from tests.unit import StacktachBaseTestCase + class StacklogTestCase(StacktachBaseTestCase): def setUp(self): self.mox = mox.Mox() @@ -13,88 +11,47 @@ class StacklogTestCase(StacktachBaseTestCase): def tearDown(self): self.mox.UnsetStubs() - def test_get_logger_should_get_exchange_logger_if_exchange_provided(self): - filename = 'filename' - logger = stacklog.get_logger(filename, 'nova') - self.assertIsInstance(logger, ExchangeLogger) - for file in glob.glob('{0}.log*'.format(filename)): - os.remove(file) + def test_get_logger_should_create_timed_rotating_logger_for_parent(self): + logger_name = 'logger' + logger = stacklog.get_logger(logger_name, is_parent=True) + self.assertIsInstance( + logger.handlers[0], logging.handlers.TimedRotatingFileHandler) + self.assertEquals(logger.handlers[0].when, 'MIDNIGHT') + self.assertEquals(logger.handlers[0].interval, 86400) + self.assertEquals(logger.handlers[0].backupCount, 3) + self.assertEqual(logger.name, 'logger') + self.assertEquals(logger.level, logging.DEBUG) - def test_get_logger_should_get_default_logger_if_exchange_not_provided(self): - filename = 'default_logger' - logger = stacklog.get_logger(filename) - self.assertIsInstance(logger, logging.Logger) - for file in glob.glob('{0}.log*'.format(filename)): - os.remove(file) + def test_get_logger_should_create_queue_logger_for_child(self): + logger_name = 'logger' + stacklog.get_logger(logger_name, is_parent=True) + child_logger = stacklog.get_logger(logger_name, is_parent=False) + self.assertIsInstance( + child_logger.handlers[0], stacklog.QueueHandler) + self.assertEqual(child_logger.name, 'child_logger') + self.assertEquals(child_logger.level, logging.DEBUG) + def test_get_logger_should_use_default_name_when_name_not_specified(self): + logger = stacklog.get_logger(None, is_parent=True) + self.assertEquals(logger.name, 'stacktach-default') -class ExchangeLoggerTestCase(StacktachBaseTestCase): - def setUp(self): - self.mox = mox.Mox() + stacklog.set_default_logger_name('default') + logger = stacklog.get_logger(None, is_parent=True) + self.assertEquals(logger.name, 'default') - def tearDown(self): - self.mox.UnsetStubs() + def test_get_logger_raise_exception_when_child_created_before_parent(self): + with self.assertRaises(stacklog.ParentLoggerDoesNotExist): + stacklog.get_logger('logger', is_parent=False) - def _setup_logger_mocks(self, name='name'): - mock_logger = self.mox.CreateMockAnything() - self.mox.StubOutWithMock(logging, 'getLogger') - logging.getLogger(stacklog.__name__).AndReturn(mock_logger) - mock_logger.setLevel(logging.DEBUG) - self.mox.StubOutClassWithMocks(logging.handlers, - 'TimedRotatingFileHandler') - filename = "/tmp/{0}.log".format(name) - handler = logging.handlers.TimedRotatingFileHandler( - filename, backupCount=3, interval=1, when='midnight') - self.mox.StubOutClassWithMocks(logging, 'Formatter') - mock_formatter = logging.Formatter( - "%(asctime)s - %(name)s - %(levelname)s - %(message)s") - handler.setFormatter(mock_formatter) - mock_logger.addHandler(handler) - mock_logger.handlers = [handler] - handler.doRollover() - return mock_logger + def test_get_logger_should_return_existing_parent_logger_if_present(self): + logger_1 = stacklog.get_logger('logger', is_parent=True) + logger_2 = stacklog.get_logger('logger', is_parent=True) - def test_exchange_logger_should_append_exchange_name_to_info(self): - mock_logger = self._setup_logger_mocks() - mock_logger.info('exchange: Log %s', 'args', xyz='xyz') - self.mox.ReplayAll() - - log = ExchangeLogger('exchange', 'name') - log.info("Log %s", 'args', xyz='xyz') - self.mox.VerifyAll() - - def test_exchange_logger_should_append_exchange_name_to_warn(self): - mock_logger = self._setup_logger_mocks() - mock_logger.warn('exchange: Log %s', 'args', xyz='xyz') - self.mox.ReplayAll() - - logger = ExchangeLogger('exchange', 'name') - logger.warn("Log %s", 'args', xyz='xyz') - self.mox.VerifyAll() - - def test_exchange_logger_should_append_exchange_name_to_error(self): - mock_logger = self._setup_logger_mocks() - mock_logger.error('exchange: Log %s', 'args', xyz='xyz') - self.mox.ReplayAll() - - logger = ExchangeLogger('exchange', 'name') - logger.error("Log %s", 'args', xyz='xyz') - self.mox.VerifyAll() - - def test_exchange_logger_should_append_exchange_name_to_exception(self): - mock_logger = self._setup_logger_mocks() - mock_logger.error('exchange: Log %s', 'args', xyz='xyz') - self.mox.ReplayAll() - - logger = ExchangeLogger('exchange', 'name') - logger.exception("Log %s", 'args', xyz='xyz') - self.mox.VerifyAll() - - def test_exchange_logger_should_use_default_name_if_not_provided(self): - self._setup_logger_mocks('stacktach-default') - self.mox.ReplayAll() - - ExchangeLogger('exchange') - self.mox.VerifyAll() + self.assertIs(logger_1, logger_2) + def test_get_logger_should_return_existing_child_logger_if_present(self): + stacklog.get_logger('logger', is_parent=True) + child_logger_1 = stacklog.get_logger('logger', is_parent=False) + child_logger_2 = stacklog.get_logger('logger', is_parent=False) + self.assertIs(child_logger_1, child_logger_2) diff --git a/tests/unit/test_worker.py b/tests/unit/test_worker.py index 4e594ee..01d70cc 100644 --- a/tests/unit/test_worker.py +++ b/tests/unit/test_worker.py @@ -23,7 +23,7 @@ import json import kombu import mox -from stacktach import db +from stacktach import db, stacklog from stacktach import views import worker.worker as worker from tests.unit import StacktachBaseTestCase @@ -36,6 +36,12 @@ class ConsumerTestCase(StacktachBaseTestCase): def tearDown(self): self.mox.UnsetStubs() + def _setup_mock_logger(self): + mock_logger = self.mox.CreateMockAnything() + self.mox.StubOutWithMock(stacklog, 'get_logger') + stacklog.get_logger('worker', is_parent=False).AndReturn(mock_logger) + return mock_logger + def _test_topics(self): return [ dict(queue="queue1", routing_key="monitor.info"), @@ -103,7 +109,6 @@ class ConsumerTestCase(StacktachBaseTestCase): self.assertEqual(actual_queue, queue) self.mox.VerifyAll() - def test_create_queue_with_queue_args(self): self.mox.StubOutClassWithMocks(kombu, 'Queue') exchange = self.mox.CreateMockAnything() @@ -157,6 +162,14 @@ class ConsumerTestCase(StacktachBaseTestCase): worker.POST_PROCESS_METHODS["RawData"] = old_handler def test_run(self): + mock_logger = self._setup_mock_logger() + self.mox.StubOutWithMock(mock_logger, 'info') + mock_logger.info('east_coast.prod.global: nova 10.0.0.1 5672 rabbit /') + self.mox.StubOutWithMock(mock_logger, 'debug') + mock_logger.debug("Processing on 'east_coast.prod.global nova'") + mock_logger.debug("Completed processing on " + "'east_coast.prod.global nova'") + config = { 'name': 'east_coast.prod.global', 'durable_queue': False, @@ -197,6 +210,14 @@ class ConsumerTestCase(StacktachBaseTestCase): self.mox.VerifyAll() def test_run_queue_args(self): + mock_logger = self._setup_mock_logger() + self.mox.StubOutWithMock(mock_logger, 'info') + mock_logger.info("east_coast.prod.global: nova 10.0.0.1 5672 rabbit /") + self.mox.StubOutWithMock(mock_logger, 'debug') + mock_logger.debug("Processing on 'east_coast.prod.global nova'") + mock_logger.debug("Completed processing on " + "'east_coast.prod.global nova'") + config = { 'name': 'east_coast.prod.global', 'durable_queue': False, diff --git a/verifier/base_verifier.py b/verifier/base_verifier.py index e2228cc..cf06042 100644 --- a/verifier/base_verifier.py +++ b/verifier/base_verifier.py @@ -27,6 +27,7 @@ import time import multiprocessing from django.db import transaction +from stacktach import message_service POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), @@ -39,9 +40,11 @@ from django.db import reset_queries from django.core import exceptions from verifier import WrongTypeException +from stacktach import stacklog -from stacktach import stacklog, message_service -LOG = stacklog.get_logger('verifier') + +def _get_child_logger(): + return stacklog.get_logger('verifier', is_parent=False) def _has_field(d1, d2, field1, field2=None): @@ -155,10 +158,11 @@ class Verifier(object): if self.reconciler: self.reconcile_failed() msg = "%s: N: %s, P: %s, S: %s, E: %s" % values - LOG.info(msg) + _get_child_logger().info(msg) time.sleep(tick_time) def run(self): + logger = _get_child_logger() if self.enable_notifications: exchange_name = self.exchange() exchange = message_service.create_exchange( @@ -182,18 +186,18 @@ class Verifier(object): break except exceptions.ObjectDoesNotExist: if attempt < 1: - LOG.warn("ObjectDoesNotExist in callback, " + logger.warn("ObjectDoesNotExist in callback, " "attempting to reconnect and try " "again.") close_connection() reset_queries() else: - LOG.error("ObjectDoesNotExist in callback " + logger.error("ObjectDoesNotExist in callback " "again, giving up.") except Exception, e: msg = "ERROR in Callback %s: %s" % (exchange_name, e) - LOG.exception(msg) + logger.exception(msg) break attempt += 1 try: diff --git a/verifier/glance_verifier.py b/verifier/glance_verifier.py index 4601552..69ffbf0 100644 --- a/verifier/glance_verifier.py +++ b/verifier/glance_verifier.py @@ -37,9 +37,13 @@ from verifier import base_verifier from verifier import NullFieldException from verifier import NotFound from stacktach import datetime_to_decimal as dt +from stacktach import stacklog +from stacktach import message_service import datetime -from stacktach import stacklog, message_service -LOG = stacklog.get_logger('verifier') + + +def _get_child_logger(): + return stacklog.get_logger('verifier', is_parent=False) def _verify_field_mismatch(exists, usage): @@ -133,7 +137,7 @@ def _verify(exists): except Exception, e: verified = False exist.mark_failed(reason=e.__class__.__name__) - LOG.exception("glance: %s" % e) + _get_child_logger().exception("glance: %s" % e) return verified, exists[0] @@ -151,7 +155,7 @@ class GlanceVerifier(Verifier): added = 0 update_interval = datetime.timedelta(seconds=30) next_update = datetime.datetime.utcnow() + update_interval - LOG.info("glance: Adding %s per-owner exists to queue." % count) + _get_child_logger().info("glance: Adding %s per-owner exists to queue." % count) while added < count: for exists in exists_grouped_by_owner_and_rawid.values(): for exist in exists: @@ -164,7 +168,7 @@ class GlanceVerifier(Verifier): if datetime.datetime.utcnow() > next_update: values = ((added,) + self.clean_results()) msg = "glance: N: %s, P: %s, S: %s, E: %s" % values - LOG.info(msg) + _get_child_logger().info(msg) next_update = datetime.datetime.utcnow() + update_interval return count diff --git a/verifier/nova_verifier.py b/verifier/nova_verifier.py index e642dfc..faebde9 100644 --- a/verifier/nova_verifier.py +++ b/verifier/nova_verifier.py @@ -34,13 +34,17 @@ from verifier import base_verifier from verifier import config from verifier import NullFieldException from stacktach import models +from stacktach import stacklog from stacktach import datetime_to_decimal as dt from verifier import FieldMismatch from verifier import AmbiguousResults from verifier import NotFound from verifier import VerificationException -from stacktach import stacklog, message_service -LOG = stacklog.get_logger('verifier') +from stacktach import message_service + + +def _get_child_logger(): + return stacklog.get_logger('verifier', is_parent=False) def _verify_field_mismatch(exists, launch): @@ -178,13 +182,6 @@ def _verify_optional_validity(exist): base_verifier._is_alphanumeric('os_distro', exist.os_distro, exist.id) base_verifier._is_alphanumeric('os_version', exist.os_version, exist.id) -def verify_fields_not_null(exist_id, null_value, fields): - - for (field_value, field_name) in fields.items(): - print "value: %s, name = %s" % (field_value, field_name) - if field_value == null_value: - raise NullFieldException(field_name, exist_id) - def _verify_validity(exist, validation_level): if validation_level == 'none': @@ -240,7 +237,7 @@ def _attempt_reconciled_verify(exist, orig_e): exist.mark_failed(reason=str(rec_e)) except Exception, rec_e: exist.mark_failed(reason=rec_e.__class__.__name__) - LOG.exception("nova: %s" % rec_e) + _get_child_logger().exception("nova: %s" % rec_e) return verified @@ -260,7 +257,7 @@ def _verify(exist, validation_level): verified = _attempt_reconciled_verify(exist, orig_e) except Exception, e: exist.mark_failed(reason=e.__class__.__name__) - LOG.exception("nova: %s" % e) + _get_child_logger().exception("nova: %s" % e) return verified, exist @@ -298,7 +295,7 @@ class NovaVerifier(base_verifier.Verifier): added = 0 update_interval = datetime.timedelta(seconds=30) next_update = datetime.datetime.utcnow() + update_interval - LOG.info("nova: Adding %s exists to queue." % count) + _get_child_logger().info("nova: Adding %s exists to queue." % count) while added < count: for exist in exists[0:1000]: exist.update_status(models.InstanceExists.VERIFYING) @@ -312,7 +309,7 @@ class NovaVerifier(base_verifier.Verifier): if datetime.datetime.utcnow() > next_update: values = ((added,) + self.clean_results()) msg = "nova: N: %s, P: %s, S: %s, E: %s" % values - LOG.info(msg) + _get_child_logger().info(msg) next_update = datetime.datetime.utcnow() + update_interval return count diff --git a/verifier/start_verifier.py b/verifier/start_verifier.py index adbe69a..5fd2869 100644 --- a/verifier/start_verifier.py +++ b/verifier/start_verifier.py @@ -29,7 +29,7 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), os.pardir, os.pardir)) if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')): sys.path.insert(0, POSSIBLE_TOPDIR) - +from stacktach import stacklog from stacktach import reconciler from verifier import nova_verifier from verifier import glance_verifier @@ -42,11 +42,16 @@ except ImportError: pass process = None - +log_listener = None processes = [] +def _get_parent_logger(): + return stacklog.get_logger('verifier', is_parent=True) + + def kill_time(signal, frame): + log_listener.end() print "dying ..." for process in processes: process.terminate() @@ -81,6 +86,7 @@ if __name__ == '__main__': verifier.run() verifier_config.load() + log_listener = stacklog.LogListener(_get_parent_logger()).start() for exchange in verifier_config.topics().keys(): process = Process(target=make_and_start_verifier, args=(exchange,)) process.start() diff --git a/worker/start_workers.py b/worker/start_workers.py index a6a5e20..74beaac 100644 --- a/worker/start_workers.py +++ b/worker/start_workers.py @@ -9,16 +9,22 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')): sys.path.insert(0, POSSIBLE_TOPDIR) -from stacktach import db +from stacktach import db, stacklog from django.db import close_connection import worker.worker as worker from worker import config processes = [] +log_listener = None + + +def _get_parent_logger(): + return stacklog.get_logger('worker', is_parent=True) def kill_time(signal, frame): + log_listener.end() print "dying ..." for process in processes: process.terminate() @@ -30,7 +36,7 @@ def kill_time(signal, frame): if __name__ == '__main__': - + log_listener = stacklog.LogListener(_get_parent_logger()).start() for deployment in config.deployments(): if deployment.get('enabled', True): db_deployment, new = db.get_or_create_deployment(deployment['name']) diff --git a/worker/worker.py b/worker/worker.py index e646788..19c5b0f 100644 --- a/worker/worker.py +++ b/worker/worker.py @@ -34,12 +34,14 @@ except ImportError: from pympler.process import ProcessMemoryInfo -from stacktach import db, message_service +from stacktach import db +from stacktach import message_service from stacktach import stacklog from stacktach import views -stacklog.set_default_logger_name('worker') -LOG = stacklog.get_logger() + +def _get_child_logger(): + return stacklog.get_logger('worker', is_parent=False) class Consumer(kombu.mixins.ConsumerMixin): @@ -58,9 +60,10 @@ class Consumer(kombu.mixins.ConsumerMixin): self.exchange = exchange def _create_exchange(self, name, type, exclusive=False, auto_delete=False): - return message_service.create_exchange(name, exchange_type=type, exclusive=exclusive, - durable=self.durable, - auto_delete=auto_delete) + return message_service.create_exchange(name, exchange_type=type, + exclusive=exclusive, + durable=self.durable, + auto_delete=auto_delete) def _create_queue(self, name, nova_exchange, routing_key, exclusive=False, auto_delete=False): @@ -115,7 +118,7 @@ class Consumer(kombu.mixins.ConsumerMixin): per_message = 0 if self.total_processed: per_message = idiff / self.total_processed - LOG.debug("%20s %20s %6dk/%6dk ram, " + _get_child_logger().debug("%20s %20s %6dk/%6dk ram, " "%3d/%4d msgs @ %6dk/msg" % (self.name, self.exchange, diff, idiff, self.processed, self.total_processed, per_message)) @@ -126,9 +129,8 @@ class Consumer(kombu.mixins.ConsumerMixin): try: self._process(message) except Exception, e: - LOG.debug("Problem: %s\nFailed message body:\n%s" % - (e, json.loads(str(message.body))) - ) + _get_child_logger().debug("Problem: %s\nFailed message body:\n%s" % + (e, json.loads(str(message.body)))) raise @@ -153,12 +155,13 @@ def run(deployment_config, deployment_id, exchange): queue_arguments = deployment_config.get('queue_arguments', {}) exit_on_exception = deployment_config.get('exit_on_exception', False) topics = deployment_config.get('topics', {}) + logger = _get_child_logger() deployment = db.get_deployment(deployment_id) print "Starting worker for '%s %s'" % (name, exchange) - LOG.info("%s: %s %s %s %s %s" % (name, exchange, host, port, user_id, - virtual_host)) + logger.info("%s: %s %s %s %s %s" % + (name, exchange, host, port, user_id, virtual_host)) params = dict(hostname=host, port=port, @@ -170,7 +173,7 @@ def run(deployment_config, deployment_id, exchange): # continue_running() is used for testing while continue_running(): try: - LOG.debug("Processing on '%s %s'" % (name, exchange)) + logger.debug("Processing on '%s %s'" % (name, exchange)) with kombu.connection.BrokerConnection(**params) as conn: try: consumer = Consumer(name, conn, deployment, durable, @@ -178,18 +181,19 @@ def run(deployment_config, deployment_id, exchange): topics[exchange]) consumer.run() except Exception as e: - LOG.error("!!!!Exception!!!!") - LOG.exception("name=%s, exchange=%s, exception=%s. " - "Reconnecting in 5s" % - (name, exchange, e)) + logger.error("!!!!Exception!!!!") + logger.exception( + "name=%s, exchange=%s, exception=%s. " + "Reconnecting in 5s" % (name, exchange, e)) exit_or_sleep(exit_on_exception) - LOG.debug("Completed processing on '%s %s'" % (name, exchange)) - except: - LOG.error("!!!!Exception!!!!") + logger.debug("Completed processing on '%s %s'" % + (name, exchange)) + except Exception: + logger.error("!!!!Exception!!!!") e = sys.exc_info()[0] msg = "Uncaught exception: deployment=%s, exchange=%s, " \ "exception=%s. Retrying in 5s" - LOG.exception(msg % (name, exchange, e)) + logger.exception(msg % (name, exchange, e)) exit_or_sleep(exit_on_exception) POST_PROCESS_METHODS = {