Implemented multiprocess logging

This commit is contained in:
Anuj Mathur 2013-11-05 18:24:20 +05:30
parent 8909e698a5
commit baaba5d3e7
12 changed files with 320 additions and 207 deletions

View File

@ -20,8 +20,13 @@
import logging
import logging.handlers
import multiprocessing
import threading
import traceback
import sys
LOGGERS = {}
LOGGER_QUEUE_MAP = {}
default_logger_location = '/var/log/stacktach/%s.log'
default_logger_name = 'stacktach-default'
@ -36,34 +41,46 @@ def set_default_logger_name(name):
default_logger_name = name
def _logger_factory(exchange, name):
if exchange:
return ExchangeLogger(exchange, name)
class ParentLoggerDoesNotExist(Exception):
    """Raised when a child (queue-backed) logger is requested before the
    parent logger with the same name has been created."""

    def __init__(self, parent_logger_name):
        # Fix: the original implicit string concatenation was missing a
        # space ("...with the" "name %s..." rendered as "with thename").
        self.reason = ("Cannot create child logger as parent logger with the "
                       "name %s does not exist." % parent_logger_name)
        # Pass the message up so str(exc) / logging of the exception is
        # meaningful (the original left Exception.args empty).
        super(ParentLoggerDoesNotExist, self).__init__(self.reason)
def _create_parent_logger(parent_logger_name):
    """Return the parent (file-writing) logger for *parent_logger_name*.

    On first use this builds the timed-rotating logger and registers the
    inter-process queue that child loggers will feed; later calls return
    the cached instance.
    """
    cached = LOGGERS.get(parent_logger_name)
    if cached is not None:
        return cached
    new_logger = _create_timed_rotating_logger(parent_logger_name)
    LOGGERS[parent_logger_name] = new_logger
    # Unbounded queue (-1) shared with child loggers in worker processes.
    LOGGER_QUEUE_MAP[parent_logger_name] = multiprocessing.Queue(-1)
    return new_logger
def _create_child_logger(parent_logger_name):
child_logger_name = "child_%s" % parent_logger_name
if child_logger_name in LOGGERS:
return LOGGERS[child_logger_name]
if parent_logger_name in LOGGERS:
queue = LOGGER_QUEUE_MAP[parent_logger_name]
logger = _create_queue_logger(child_logger_name, queue)
LOGGERS[child_logger_name] = logger
else:
logger = logging.getLogger(__name__)
_configure(logger, name)
return logger
raise ParentLoggerDoesNotExist(parent_logger_name)
return LOGGERS[child_logger_name]
def _make_logger(name, exchange=None):
log = _logger_factory(exchange, name)
return log
def _logger_factory(parent_logger_name, is_parent):
    """Dispatch logger creation to the parent or child constructor.

    A ``None`` name falls back to the module-level default logger name.
    """
    name = parent_logger_name
    if name is None:
        name = default_logger_name
    creator = _create_parent_logger if is_parent else _create_child_logger
    return creator(name)
def init_logger(name=None, exchange=None):
global LOGGERS
if name is None:
name = default_logger_name
if name not in LOGGERS:
LOGGERS[name] = _make_logger(name, exchange)
def get_logger(name=None, exchange=None):
global LOGGERS
if name is None:
name = default_logger_name
init_logger(name=name, exchange=exchange)
return LOGGERS[name]
def get_logger(name=None, is_parent=True):
    # Public entry point. Parent loggers write to the rotating log file;
    # child loggers (is_parent=False) forward records over a
    # multiprocessing queue drained by a LogListener in the parent process.
    return _logger_factory(name, is_parent)
def warn(msg, name=None):
@ -84,36 +101,87 @@ def info(msg, name=None):
get_logger(name=name).info(msg)
def _configure(logger, name):
logger.setLevel(logging.DEBUG)
handler = logging.handlers.TimedRotatingFileHandler(
default_logger_location % name,
when='midnight', interval=1, backupCount=3)
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler.setFormatter(formatter)
logger.addHandler(handler)
logger.handlers[0].doRollover()
def _create_timed_rotating_logger(name):
    """Build a DEBUG-level logger whose file rotates at midnight.

    The file path is ``default_logger_location % name``; three rotated
    backups are kept, and a rollover is forced immediately on creation.
    """
    rotating_logger = logging.getLogger(name)
    rotating_logger.setLevel(logging.DEBUG)
    file_handler = logging.handlers.TimedRotatingFileHandler(
        default_logger_location % name,
        when='midnight', interval=1, backupCount=3)
    file_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    rotating_logger.addHandler(file_handler)
    # Roll over right away so the active file starts empty for this run.
    rotating_logger.handlers[0].doRollover()
    return rotating_logger
class ExchangeLogger():
def __init__(self, exchange, name='stacktach-default'):
self.logger = logging.getLogger(__name__)
_configure(self.logger, name)
self.exchange = exchange
def _create_queue_logger(name, queue):
    """Build a DEBUG-level logger that puts its records onto *queue*.

    Used for child loggers in worker processes; the records are written to
    the real file handler by the parent process.
    """
    queue_logger = logging.getLogger(name)
    queue_logger.setLevel(logging.DEBUG)
    queue_handler = QueueHandler(queue)
    queue_handler.setFormatter(logging.Formatter(
        '%(asctime)s - %(name)s - %(levelname)s - %(message)s'))
    queue_logger.addHandler(queue_handler)
    return queue_logger
def info(self, msg, *args, **kwargs):
msg = self.exchange + ': ' + msg
self.logger.info(msg, *args, **kwargs)
def warn(self, msg, *args, **kwargs):
msg = self.exchange + ': ' + msg
self.logger.warn(msg, *args, **kwargs)
class QueueHandler(logging.Handler):
def __init__(self, queue):
logging.Handler.__init__(self)
self.queue = queue
def error(self, msg, *args, **kwargs):
msg = self.exchange + ': ' + msg
self.logger.error(msg, *args, **kwargs)
def emit(self, record):
try:
# ensure that exc_info and args
# have been stringified. Removes any chance of
# unpickleable things inside and possibly reduces
# message size sent over the pipe
if record.exc_info:
# just to get traceback text into record.exc_text
self.format(record)
# remove exception info as it's not needed any more
record.exc_info = None
if record.args:
record.msg = record.msg % record.args
record.args = None
self.queue.put_nowait(record)
except (KeyboardInterrupt, SystemExit):
raise
except:
self.handleError(record)
def exception(self, msg, *args, **kwargs):
msg = self.exchange + ': ' + msg
self.logger.error(msg, *args, **kwargs)
class LogListener:
    """Drains a logger's multiprocessing queue in a background thread.

    Child (worker-process) loggers enqueue LogRecords; this listener runs
    in the parent process and hands each record to the real, file-backed
    logger.
    """

    def __init__(self, logger):
        self.logger = logger
        self.queue = get_queue(logger.name)

    def start(self):
        """Start the daemon drain thread.

        Returns ``self`` so callers can write
        ``listener = LogListener(logger).start()`` (the original returned
        None, which left such callers holding None).
        """
        self.thread = threading.Thread(target=self._receive)
        self.thread.daemon = True
        self.thread.start()
        return self

    def _receive(self):
        # Loop until the sentinel (None) or queue EOF; log-and-continue on
        # any other error so one bad record does not kill the listener.
        while True:
            try:
                record = self.queue.get()
                # None is sent as a sentinel to tell the listener to quit
                if record is None:
                    break
                self.logger.handle(record)
            except (KeyboardInterrupt, SystemExit):
                raise
            except EOFError:
                break
            except Exception:
                traceback.print_exc(file=sys.stderr)

    def end(self):
        """Stop the drain thread and close the logger's handlers."""
        self.queue.put_nowait(None)
        self.thread.join()
        # Fix: logging.Logger has no ``handler`` attribute — the original
        # ``self.logger.handler.close()`` raised AttributeError. Close every
        # attached handler instead.
        for handler in self.logger.handlers:
            handler.close()
def get_queue(logger_name):
    # Look up the multiprocessing queue registered when the parent logger
    # of this name was created; raises KeyError if no parent exists yet.
    return LOGGER_QUEUE_MAP[logger_name]

View File

@ -3,6 +3,7 @@ import time
from django.db import transaction
import mox
from stacktach import message_service
from stacktach import stacklog
from tests.unit import StacktachBaseTestCase
from tests.unit.utils import HOST, PORT, VIRTUAL_HOST, USERID, PASSWORD, TICK_TIME, SETTLE_TIME, SETTLE_UNITS
from tests.unit.utils import make_verifier_config
@ -139,6 +140,8 @@ class BaseVerifierTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def test_run_notifications(self):
mock_logger = self._create_mock_logger()
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
self._mock_exchange_create_and_connect(self.verifier_with_notifications)
self.mox.StubOutWithMock(self.verifier_with_notifications, '_run')
self.verifier_with_notifications._run(callback=mox.Not(mox.Is(None)))
@ -147,6 +150,8 @@ class BaseVerifierTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def test_run_notifications_with_routing_keys(self):
mock_logger = self._create_mock_logger()
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
self._mock_exchange_create_and_connect(self.verifier_with_notifications)
self.mox.StubOutWithMock(self.verifier_with_notifications, '_run')
self.verifier_with_notifications._run(callback=mox.Not(mox.Is(None)))
@ -155,6 +160,8 @@ class BaseVerifierTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def test_run_no_notifications(self):
mock_logger = self._create_mock_logger()
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
self.mox.StubOutWithMock(self.verifier_without_notifications, '_run')
self.verifier_without_notifications._run()
self.mox.ReplayAll()
@ -162,6 +169,11 @@ class BaseVerifierTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def test_run_full_no_notifications(self):
mock_logger = self._create_mock_logger()
mock_logger.info('None: N: None, P: 0, S: 2, E: 0')
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
self.mox.StubOutWithMock(transaction, 'commit_on_success')
tran = self.mox.CreateMockAnything()
tran.__enter__().AndReturn(tran)
@ -196,7 +208,17 @@ class BaseVerifierTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def _create_mock_logger(self):
mock_logger = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(stacklog, 'get_logger')
return mock_logger
def test_run_full(self):
mock_logger = self._create_mock_logger()
mock_logger.info('exchange: N: None, P: 0, S: 2, E: 0')
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
self.mox.StubOutWithMock(transaction, 'commit_on_success')
tran = self.mox.CreateMockAnything()
tran.__enter__().AndReturn(tran)

View File

@ -21,12 +21,14 @@ from datetime import datetime
import decimal
import json
import logging
import uuid
import kombu
import mox
from stacktach import datetime_to_decimal as dt
from stacktach import stacklog
from stacktach import models
from tests.unit import StacktachBaseTestCase
from utils import IMAGE_UUID_1
@ -60,6 +62,12 @@ class GlanceVerifierTestCase(StacktachBaseTestCase):
self.mox.UnsetStubs()
self.verifier = None
def _setup_mock_logger(self):
mock_logger = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(stacklog, 'get_logger')
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
return mock_logger
def test_verify_usage_should_not_raise_exception_on_success(self):
exist = self.mox.CreateMockAnything()
exist.created_at = decimal.Decimal('1.1')
@ -427,6 +435,11 @@ class GlanceVerifierTestCase(StacktachBaseTestCase):
self.assertTrue(verified)
def test_verify_exist_marks_exist_failed_if_field_mismatch_exception(self):
mock_logger = self._setup_mock_logger()
self.mox.StubOutWithMock(mock_logger, 'info')
mock_logger.exception("glance: Expected field to be 'expected' "
"got 'actual'")
exist1 = self.mox.CreateMockAnything()
exist2 = self.mox.CreateMockAnything()
@ -450,6 +463,10 @@ class GlanceVerifierTestCase(StacktachBaseTestCase):
self.assertFalse(verified)
def test_verify_for_range_without_callback(self):
mock_logger = self._setup_mock_logger()
self.mox.StubOutWithMock(mock_logger, 'info')
mock_logger.info('glance: Adding 2 per-owner exists to queue.')
when_max = datetime.utcnow()
models.ImageExists.VERIFYING = 'verifying'
models.ImageExists.PENDING = 'pending'
@ -477,6 +494,10 @@ class GlanceVerifierTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def test_verify_for_range_with_callback(self):
mock_logger = self._setup_mock_logger()
self.mox.StubOutWithMock(mock_logger, 'info')
mock_logger.info('glance: Adding 2 per-owner exists to queue.')
callback = self.mox.CreateMockAnything()
when_max = datetime.utcnow()
models.ImageExists.PENDING = 'pending'

View File

@ -29,6 +29,7 @@ import kombu.pools
import mox
from stacktach import datetime_to_decimal as dt
from stacktach import stacklog
from stacktach import models
from tests.unit import StacktachBaseTestCase
from utils import make_verifier_config
@ -632,6 +633,11 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase):
self.verifier = nova_verifier.NovaVerifier(config,
pool=self.pool, reconciler=self.reconciler)
def _create_mock_logger(self):
mock_logger = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(stacklog, 'get_logger')
return mock_logger
def tearDown(self):
self.mox.UnsetStubs()
self.verifier = None
@ -719,6 +725,10 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def test_verify_fail_with_reconciled_data_exception(self):
mock_logger = self._create_mock_logger()
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
mock_logger.exception("nova: message")
exist = self.mox.CreateMockAnything()
exist.launched_at = decimal.Decimal('1.1')
self.mox.StubOutWithMock(nova_verifier, '_verify_for_launch')
@ -728,7 +738,7 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase):
nova_verifier._verify_for_launch(exist).AndRaise(verify_exception)
self.mox.StubOutWithMock(nova_verifier, '_verify_with_reconciled_data')
nova_verifier._verify_with_reconciled_data(exist)\
.AndRaise(Exception())
.AndRaise(Exception("message"))
exist.mark_failed(reason='Exception')
self.mox.ReplayAll()
result, exists = nova_verifier._verify(exist, 'none')
@ -754,12 +764,16 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def test_verify_exception_during_launch(self):
mock_logger = self._create_mock_logger()
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
mock_logger.exception("nova: message")
exist = self.mox.CreateMockAnything()
exist.launched_at = decimal.Decimal('1.1')
self.mox.StubOutWithMock(nova_verifier, '_verify_for_launch')
self.mox.StubOutWithMock(nova_verifier, '_verify_for_delete')
self.mox.StubOutWithMock(exist, 'mark_failed')
nova_verifier._verify_for_launch(exist).AndRaise(Exception())
nova_verifier._verify_for_launch(exist).AndRaise(Exception("message"))
exist.mark_failed(reason='Exception')
self.mox.ReplayAll()
result, exists = nova_verifier._verify(exist, 'none')
@ -767,44 +781,29 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def test_verify_exception_during_delete(self):
mock_logger = self._create_mock_logger()
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
mock_logger.exception("nova: message")
exist = self.mox.CreateMockAnything()
exist.launched_at = decimal.Decimal('1.1')
self.mox.StubOutWithMock(nova_verifier, '_verify_for_launch')
self.mox.StubOutWithMock(nova_verifier, '_verify_for_delete')
self.mox.StubOutWithMock(exist, 'mark_failed')
nova_verifier._verify_for_launch(exist)
nova_verifier._verify_for_delete(exist).AndRaise(Exception())
nova_verifier._verify_for_delete(exist).AndRaise(Exception("message"))
exist.mark_failed(reason='Exception')
self.mox.ReplayAll()
result, exists = nova_verifier._verify(exist, 'none')
self.assertFalse(result)
self.mox.VerifyAll()
def test_verify_for_range_without_callback(self):
when_max = datetime.datetime.utcnow()
results = self.mox.CreateMockAnything()
models.InstanceExists.PENDING = 'pending'
models.InstanceExists.VERIFYING = 'verifying'
models.InstanceExists.find(
ending_max=when_max, status='pending').AndReturn(results)
results.count().AndReturn(2)
exist1 = self.mox.CreateMockAnything()
exist2 = self.mox.CreateMockAnything()
results.__getslice__(0, 1000).AndReturn(results)
results.__iter__().AndReturn([exist1, exist2].__iter__())
exist1.update_status('verifying')
exist2.update_status('verifying')
exist1.save()
exist2.save()
self.pool.apply_async(nova_verifier._verify, args=(exist1, 'all'),
callback=None)
self.pool.apply_async(nova_verifier._verify, args=(exist2, 'all'),
callback=None)
self.mox.ReplayAll()
self.verifier.verify_for_range(when_max)
self.mox.VerifyAll()
def test_verify_for_range_without_callback(self):
mock_logger = self._create_mock_logger()
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
mock_logger.info('nova: Adding 2 exists to queue.')
when_max = datetime.datetime.utcnow()
results = self.mox.CreateMockAnything()
models.InstanceExists.PENDING = 'pending'
@ -829,6 +828,10 @@ class NovaVerifierVerifyTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def test_verify_for_range_with_callback(self):
mock_logger = self._create_mock_logger()
stacklog.get_logger('verifier', is_parent=False).AndReturn(mock_logger)
mock_logger.info("nova: Adding 2 exists to queue.")
callback = self.mox.CreateMockAnything()
when_max = datetime.datetime.utcnow()
results = self.mox.CreateMockAnything()

View File

@ -1,11 +1,9 @@
import glob
import logging
import os
import mox
from stacktach import stacklog
from stacktach.stacklog import ExchangeLogger
from tests.unit import StacktachBaseTestCase
class StacklogTestCase(StacktachBaseTestCase):
def setUp(self):
self.mox = mox.Mox()
@ -13,88 +11,47 @@ class StacklogTestCase(StacktachBaseTestCase):
def tearDown(self):
self.mox.UnsetStubs()
def test_get_logger_should_get_exchange_logger_if_exchange_provided(self):
filename = 'filename'
logger = stacklog.get_logger(filename, 'nova')
self.assertIsInstance(logger, ExchangeLogger)
for file in glob.glob('{0}.log*'.format(filename)):
os.remove(file)
def test_get_logger_should_create_timed_rotating_logger_for_parent(self):
logger_name = 'logger'
logger = stacklog.get_logger(logger_name, is_parent=True)
self.assertIsInstance(
logger.handlers[0], logging.handlers.TimedRotatingFileHandler)
self.assertEquals(logger.handlers[0].when, 'MIDNIGHT')
self.assertEquals(logger.handlers[0].interval, 86400)
self.assertEquals(logger.handlers[0].backupCount, 3)
self.assertEqual(logger.name, 'logger')
self.assertEquals(logger.level, logging.DEBUG)
def test_get_logger_should_get_default_logger_if_exchange_not_provided(self):
filename = 'default_logger'
logger = stacklog.get_logger(filename)
self.assertIsInstance(logger, logging.Logger)
for file in glob.glob('{0}.log*'.format(filename)):
os.remove(file)
def test_get_logger_should_create_queue_logger_for_child(self):
logger_name = 'logger'
stacklog.get_logger(logger_name, is_parent=True)
child_logger = stacklog.get_logger(logger_name, is_parent=False)
self.assertIsInstance(
child_logger.handlers[0], stacklog.QueueHandler)
self.assertEqual(child_logger.name, 'child_logger')
self.assertEquals(child_logger.level, logging.DEBUG)
def test_get_logger_should_use_default_name_when_name_not_specified(self):
logger = stacklog.get_logger(None, is_parent=True)
self.assertEquals(logger.name, 'stacktach-default')
class ExchangeLoggerTestCase(StacktachBaseTestCase):
def setUp(self):
self.mox = mox.Mox()
stacklog.set_default_logger_name('default')
logger = stacklog.get_logger(None, is_parent=True)
self.assertEquals(logger.name, 'default')
def tearDown(self):
self.mox.UnsetStubs()
def test_get_logger_raise_exception_when_child_created_before_parent(self):
with self.assertRaises(stacklog.ParentLoggerDoesNotExist):
stacklog.get_logger('logger', is_parent=False)
def _setup_logger_mocks(self, name='name'):
mock_logger = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(logging, 'getLogger')
logging.getLogger(stacklog.__name__).AndReturn(mock_logger)
mock_logger.setLevel(logging.DEBUG)
self.mox.StubOutClassWithMocks(logging.handlers,
'TimedRotatingFileHandler')
filename = "/tmp/{0}.log".format(name)
handler = logging.handlers.TimedRotatingFileHandler(
filename, backupCount=3, interval=1, when='midnight')
self.mox.StubOutClassWithMocks(logging, 'Formatter')
mock_formatter = logging.Formatter(
"%(asctime)s - %(name)s - %(levelname)s - %(message)s")
handler.setFormatter(mock_formatter)
mock_logger.addHandler(handler)
mock_logger.handlers = [handler]
handler.doRollover()
return mock_logger
def test_get_logger_should_return_existing_parent_logger_if_present(self):
logger_1 = stacklog.get_logger('logger', is_parent=True)
logger_2 = stacklog.get_logger('logger', is_parent=True)
def test_exchange_logger_should_append_exchange_name_to_info(self):
mock_logger = self._setup_logger_mocks()
mock_logger.info('exchange: Log %s', 'args', xyz='xyz')
self.mox.ReplayAll()
log = ExchangeLogger('exchange', 'name')
log.info("Log %s", 'args', xyz='xyz')
self.mox.VerifyAll()
def test_exchange_logger_should_append_exchange_name_to_warn(self):
mock_logger = self._setup_logger_mocks()
mock_logger.warn('exchange: Log %s', 'args', xyz='xyz')
self.mox.ReplayAll()
logger = ExchangeLogger('exchange', 'name')
logger.warn("Log %s", 'args', xyz='xyz')
self.mox.VerifyAll()
def test_exchange_logger_should_append_exchange_name_to_error(self):
mock_logger = self._setup_logger_mocks()
mock_logger.error('exchange: Log %s', 'args', xyz='xyz')
self.mox.ReplayAll()
logger = ExchangeLogger('exchange', 'name')
logger.error("Log %s", 'args', xyz='xyz')
self.mox.VerifyAll()
def test_exchange_logger_should_append_exchange_name_to_exception(self):
mock_logger = self._setup_logger_mocks()
mock_logger.error('exchange: Log %s', 'args', xyz='xyz')
self.mox.ReplayAll()
logger = ExchangeLogger('exchange', 'name')
logger.exception("Log %s", 'args', xyz='xyz')
self.mox.VerifyAll()
def test_exchange_logger_should_use_default_name_if_not_provided(self):
self._setup_logger_mocks('stacktach-default')
self.mox.ReplayAll()
ExchangeLogger('exchange')
self.mox.VerifyAll()
self.assertIs(logger_1, logger_2)
def test_get_logger_should_return_existing_child_logger_if_present(self):
stacklog.get_logger('logger', is_parent=True)
child_logger_1 = stacklog.get_logger('logger', is_parent=False)
child_logger_2 = stacklog.get_logger('logger', is_parent=False)
self.assertIs(child_logger_1, child_logger_2)

View File

@ -23,7 +23,7 @@ import json
import kombu
import mox
from stacktach import db
from stacktach import db, stacklog
from stacktach import views
import worker.worker as worker
from tests.unit import StacktachBaseTestCase
@ -36,6 +36,12 @@ class ConsumerTestCase(StacktachBaseTestCase):
def tearDown(self):
self.mox.UnsetStubs()
def _setup_mock_logger(self):
mock_logger = self.mox.CreateMockAnything()
self.mox.StubOutWithMock(stacklog, 'get_logger')
stacklog.get_logger('worker', is_parent=False).AndReturn(mock_logger)
return mock_logger
def _test_topics(self):
return [
dict(queue="queue1", routing_key="monitor.info"),
@ -103,7 +109,6 @@ class ConsumerTestCase(StacktachBaseTestCase):
self.assertEqual(actual_queue, queue)
self.mox.VerifyAll()
def test_create_queue_with_queue_args(self):
self.mox.StubOutClassWithMocks(kombu, 'Queue')
exchange = self.mox.CreateMockAnything()
@ -157,6 +162,14 @@ class ConsumerTestCase(StacktachBaseTestCase):
worker.POST_PROCESS_METHODS["RawData"] = old_handler
def test_run(self):
mock_logger = self._setup_mock_logger()
self.mox.StubOutWithMock(mock_logger, 'info')
mock_logger.info('east_coast.prod.global: nova 10.0.0.1 5672 rabbit /')
self.mox.StubOutWithMock(mock_logger, 'debug')
mock_logger.debug("Processing on 'east_coast.prod.global nova'")
mock_logger.debug("Completed processing on "
"'east_coast.prod.global nova'")
config = {
'name': 'east_coast.prod.global',
'durable_queue': False,
@ -197,6 +210,14 @@ class ConsumerTestCase(StacktachBaseTestCase):
self.mox.VerifyAll()
def test_run_queue_args(self):
mock_logger = self._setup_mock_logger()
self.mox.StubOutWithMock(mock_logger, 'info')
mock_logger.info("east_coast.prod.global: nova 10.0.0.1 5672 rabbit /")
self.mox.StubOutWithMock(mock_logger, 'debug')
mock_logger.debug("Processing on 'east_coast.prod.global nova'")
mock_logger.debug("Completed processing on "
"'east_coast.prod.global nova'")
config = {
'name': 'east_coast.prod.global',
'durable_queue': False,

View File

@ -27,6 +27,7 @@ import time
import multiprocessing
from django.db import transaction
from stacktach import message_service
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
@ -39,9 +40,11 @@ from django.db import reset_queries
from django.core import exceptions
from verifier import WrongTypeException
from stacktach import stacklog
from stacktach import stacklog, message_service
LOG = stacklog.get_logger('verifier')
def _get_child_logger():
    # Queue-backed logger for this verifier worker process; the parent
    # process's LogListener writes the records to the actual log file.
    return stacklog.get_logger('verifier', is_parent=False)
def _has_field(d1, d2, field1, field2=None):
@ -155,10 +158,11 @@ class Verifier(object):
if self.reconciler:
self.reconcile_failed()
msg = "%s: N: %s, P: %s, S: %s, E: %s" % values
LOG.info(msg)
_get_child_logger().info(msg)
time.sleep(tick_time)
def run(self):
logger = _get_child_logger()
if self.enable_notifications:
exchange_name = self.exchange()
exchange = message_service.create_exchange(
@ -182,18 +186,18 @@ class Verifier(object):
break
except exceptions.ObjectDoesNotExist:
if attempt < 1:
LOG.warn("ObjectDoesNotExist in callback, "
logger.warn("ObjectDoesNotExist in callback, "
"attempting to reconnect and try "
"again.")
close_connection()
reset_queries()
else:
LOG.error("ObjectDoesNotExist in callback "
logger.error("ObjectDoesNotExist in callback "
"again, giving up.")
except Exception, e:
msg = "ERROR in Callback %s: %s" % (exchange_name,
e)
LOG.exception(msg)
logger.exception(msg)
break
attempt += 1
try:

View File

@ -37,9 +37,13 @@ from verifier import base_verifier
from verifier import NullFieldException
from verifier import NotFound
from stacktach import datetime_to_decimal as dt
from stacktach import stacklog
from stacktach import message_service
import datetime
from stacktach import stacklog, message_service
LOG = stacklog.get_logger('verifier')
def _get_child_logger():
return stacklog.get_logger('verifier', is_parent=False)
def _verify_field_mismatch(exists, usage):
@ -133,7 +137,7 @@ def _verify(exists):
except Exception, e:
verified = False
exist.mark_failed(reason=e.__class__.__name__)
LOG.exception("glance: %s" % e)
_get_child_logger().exception("glance: %s" % e)
return verified, exists[0]
@ -151,7 +155,7 @@ class GlanceVerifier(Verifier):
added = 0
update_interval = datetime.timedelta(seconds=30)
next_update = datetime.datetime.utcnow() + update_interval
LOG.info("glance: Adding %s per-owner exists to queue." % count)
_get_child_logger().info("glance: Adding %s per-owner exists to queue." % count)
while added < count:
for exists in exists_grouped_by_owner_and_rawid.values():
for exist in exists:
@ -164,7 +168,7 @@ class GlanceVerifier(Verifier):
if datetime.datetime.utcnow() > next_update:
values = ((added,) + self.clean_results())
msg = "glance: N: %s, P: %s, S: %s, E: %s" % values
LOG.info(msg)
_get_child_logger().info(msg)
next_update = datetime.datetime.utcnow() + update_interval
return count

View File

@ -34,13 +34,17 @@ from verifier import base_verifier
from verifier import config
from verifier import NullFieldException
from stacktach import models
from stacktach import stacklog
from stacktach import datetime_to_decimal as dt
from verifier import FieldMismatch
from verifier import AmbiguousResults
from verifier import NotFound
from verifier import VerificationException
from stacktach import stacklog, message_service
LOG = stacklog.get_logger('verifier')
from stacktach import message_service
def _get_child_logger():
return stacklog.get_logger('verifier', is_parent=False)
def _verify_field_mismatch(exists, launch):
@ -178,13 +182,6 @@ def _verify_optional_validity(exist):
base_verifier._is_alphanumeric('os_distro', exist.os_distro, exist.id)
base_verifier._is_alphanumeric('os_version', exist.os_version, exist.id)
def verify_fields_not_null(exist_id, null_value, fields):
for (field_value, field_name) in fields.items():
print "value: %s, name = %s" % (field_value, field_name)
if field_value == null_value:
raise NullFieldException(field_name, exist_id)
def _verify_validity(exist, validation_level):
if validation_level == 'none':
@ -240,7 +237,7 @@ def _attempt_reconciled_verify(exist, orig_e):
exist.mark_failed(reason=str(rec_e))
except Exception, rec_e:
exist.mark_failed(reason=rec_e.__class__.__name__)
LOG.exception("nova: %s" % rec_e)
_get_child_logger().exception("nova: %s" % rec_e)
return verified
@ -260,7 +257,7 @@ def _verify(exist, validation_level):
verified = _attempt_reconciled_verify(exist, orig_e)
except Exception, e:
exist.mark_failed(reason=e.__class__.__name__)
LOG.exception("nova: %s" % e)
_get_child_logger().exception("nova: %s" % e)
return verified, exist
@ -298,7 +295,7 @@ class NovaVerifier(base_verifier.Verifier):
added = 0
update_interval = datetime.timedelta(seconds=30)
next_update = datetime.datetime.utcnow() + update_interval
LOG.info("nova: Adding %s exists to queue." % count)
_get_child_logger().info("nova: Adding %s exists to queue." % count)
while added < count:
for exist in exists[0:1000]:
exist.update_status(models.InstanceExists.VERIFYING)
@ -312,7 +309,7 @@ class NovaVerifier(base_verifier.Verifier):
if datetime.datetime.utcnow() > next_update:
values = ((added,) + self.clean_results())
msg = "nova: N: %s, P: %s, S: %s, E: %s" % values
LOG.info(msg)
_get_child_logger().info(msg)
next_update = datetime.datetime.utcnow() + update_interval
return count

View File

@ -29,7 +29,7 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
os.pardir, os.pardir))
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')):
sys.path.insert(0, POSSIBLE_TOPDIR)
from stacktach import stacklog
from stacktach import reconciler
from verifier import nova_verifier
from verifier import glance_verifier
@ -42,11 +42,16 @@ except ImportError:
pass
process = None
log_listener = None
processes = []
def _get_parent_logger():
    # File-writing parent logger; also registers the queue that the child
    # loggers in the verifier worker processes will feed.
    return stacklog.get_logger('verifier', is_parent=True)
def kill_time(signal, frame):
log_listener.end()
print "dying ..."
for process in processes:
process.terminate()
@ -81,6 +86,7 @@ if __name__ == '__main__':
verifier.run()
verifier_config.load()
log_listener = stacklog.LogListener(_get_parent_logger()).start()
for exchange in verifier_config.topics().keys():
process = Process(target=make_and_start_verifier, args=(exchange,))
process.start()

View File

@ -9,16 +9,22 @@ POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'stacktach')):
sys.path.insert(0, POSSIBLE_TOPDIR)
from stacktach import db
from stacktach import db, stacklog
from django.db import close_connection
import worker.worker as worker
from worker import config
processes = []
log_listener = None
def _get_parent_logger():
return stacklog.get_logger('worker', is_parent=True)
def kill_time(signal, frame):
log_listener.end()
print "dying ..."
for process in processes:
process.terminate()
@ -30,7 +36,7 @@ def kill_time(signal, frame):
if __name__ == '__main__':
log_listener = stacklog.LogListener(_get_parent_logger()).start()
for deployment in config.deployments():
if deployment.get('enabled', True):
db_deployment, new = db.get_or_create_deployment(deployment['name'])

View File

@ -34,12 +34,14 @@ except ImportError:
from pympler.process import ProcessMemoryInfo
from stacktach import db, message_service
from stacktach import db
from stacktach import message_service
from stacktach import stacklog
from stacktach import views
stacklog.set_default_logger_name('worker')
LOG = stacklog.get_logger()
def _get_child_logger():
    # Queue-backed logger for this worker process; records are drained and
    # written to the log file by the parent process's LogListener.
    return stacklog.get_logger('worker', is_parent=False)
class Consumer(kombu.mixins.ConsumerMixin):
@ -58,9 +60,10 @@ class Consumer(kombu.mixins.ConsumerMixin):
self.exchange = exchange
def _create_exchange(self, name, type, exclusive=False, auto_delete=False):
return message_service.create_exchange(name, exchange_type=type, exclusive=exclusive,
durable=self.durable,
auto_delete=auto_delete)
return message_service.create_exchange(name, exchange_type=type,
exclusive=exclusive,
durable=self.durable,
auto_delete=auto_delete)
def _create_queue(self, name, nova_exchange, routing_key, exclusive=False,
auto_delete=False):
@ -115,7 +118,7 @@ class Consumer(kombu.mixins.ConsumerMixin):
per_message = 0
if self.total_processed:
per_message = idiff / self.total_processed
LOG.debug("%20s %20s %6dk/%6dk ram, "
_get_child_logger().debug("%20s %20s %6dk/%6dk ram, "
"%3d/%4d msgs @ %6dk/msg" %
(self.name, self.exchange, diff, idiff, self.processed,
self.total_processed, per_message))
@ -126,9 +129,8 @@ class Consumer(kombu.mixins.ConsumerMixin):
try:
self._process(message)
except Exception, e:
LOG.debug("Problem: %s\nFailed message body:\n%s" %
(e, json.loads(str(message.body)))
)
_get_child_logger().debug("Problem: %s\nFailed message body:\n%s" %
(e, json.loads(str(message.body))))
raise
@ -153,12 +155,13 @@ def run(deployment_config, deployment_id, exchange):
queue_arguments = deployment_config.get('queue_arguments', {})
exit_on_exception = deployment_config.get('exit_on_exception', False)
topics = deployment_config.get('topics', {})
logger = _get_child_logger()
deployment = db.get_deployment(deployment_id)
print "Starting worker for '%s %s'" % (name, exchange)
LOG.info("%s: %s %s %s %s %s" % (name, exchange, host, port, user_id,
virtual_host))
logger.info("%s: %s %s %s %s %s" %
(name, exchange, host, port, user_id, virtual_host))
params = dict(hostname=host,
port=port,
@ -170,7 +173,7 @@ def run(deployment_config, deployment_id, exchange):
# continue_running() is used for testing
while continue_running():
try:
LOG.debug("Processing on '%s %s'" % (name, exchange))
logger.debug("Processing on '%s %s'" % (name, exchange))
with kombu.connection.BrokerConnection(**params) as conn:
try:
consumer = Consumer(name, conn, deployment, durable,
@ -178,18 +181,19 @@ def run(deployment_config, deployment_id, exchange):
topics[exchange])
consumer.run()
except Exception as e:
LOG.error("!!!!Exception!!!!")
LOG.exception("name=%s, exchange=%s, exception=%s. "
"Reconnecting in 5s" %
(name, exchange, e))
logger.error("!!!!Exception!!!!")
logger.exception(
"name=%s, exchange=%s, exception=%s. "
"Reconnecting in 5s" % (name, exchange, e))
exit_or_sleep(exit_on_exception)
LOG.debug("Completed processing on '%s %s'" % (name, exchange))
except:
LOG.error("!!!!Exception!!!!")
logger.debug("Completed processing on '%s %s'" %
(name, exchange))
except Exception:
logger.error("!!!!Exception!!!!")
e = sys.exc_info()[0]
msg = "Uncaught exception: deployment=%s, exchange=%s, " \
"exception=%s. Retrying in 5s"
LOG.exception(msg % (name, exchange, e))
logger.exception(msg % (name, exchange, e))
exit_or_sleep(exit_on_exception)
POST_PROCESS_METHODS = {