From 3cff1e7b9723a9a02d3e46cb56d7906134e407d4 Mon Sep 17 00:00:00 2001 From: Yong Sheng Gong Date: Sun, 15 Jul 2012 08:17:34 +0800 Subject: [PATCH] Introduce files from openstack common. Because the openstack common project does not stick to latest pep8 rules, I have to exclude some bad-ruled files in the tox.ini and run_tests.sh. However pep8 does not support exclude option in the format quantum/openstack/common, so I have to exclude some of openstack common files one by one. Also, I have changed the Qunatum Context to base on the common context. I does not update the setup.py in our openstack common dir and the one under quantum top dir, since it should be maintained in a consistent way across all of openstack projects. After this introduction, we are ready for notification feature. Change-Id: I2729c2dc3958835374c88d704e842e613785ec14 --- etc/quantum.conf | 2 +- openstack-common.conf | 4 +- quantum/context.py | 8 +- quantum/openstack/common/cfg.py | 4 +- quantum/openstack/common/importutils.py | 4 +- quantum/openstack/common/jsonutils.py | 10 +- quantum/openstack/common/log.py | 459 ++++++++++++++++++ quantum/openstack/common/notifier/__init__.py | 14 + quantum/openstack/common/notifier/api.py | 142 ++++++ .../common/notifier/list_notifier.py | 117 +++++ .../openstack/common/notifier/log_notifier.py | 35 ++ .../common/notifier/no_op_notifier.py | 19 + .../common/notifier/rabbit_notifier.py | 46 ++ .../common/notifier/test_notifier.py | 22 + quantum/openstack/common/policy.py | 1 + quantum/openstack/common/rpc/__init__.py | 3 +- quantum/openstack/common/rpc/amqp.py | 1 + quantum/openstack/common/rpc/common.py | 2 +- quantum/openstack/common/rpc/dispatcher.py | 39 ++ quantum/openstack/common/rpc/impl_zmq.py | 31 +- quantum/openstack/common/rpc/matchmaker.py | 1 + quantum/openstack/common/timeutils.py | 109 +++++ run_tests.sh | 6 + tools/pip-requires | 1 + tox.ini | 2 +- 25 files changed, 1056 insertions(+), 26 deletions(-) create mode 100644 quantum/openstack/common/log.py create mode 100644 quantum/openstack/common/notifier/__init__.py create mode 100644 quantum/openstack/common/notifier/api.py create mode 100644 quantum/openstack/common/notifier/list_notifier.py create mode 100644 quantum/openstack/common/notifier/log_notifier.py create mode 100644 quantum/openstack/common/notifier/no_op_notifier.py create mode 100644 quantum/openstack/common/notifier/rabbit_notifier.py create mode 100644 quantum/openstack/common/notifier/test_notifier.py create mode 100644 quantum/openstack/common/timeutils.py diff --git a/etc/quantum.conf b/etc/quantum.conf index 643b0be9e2..2b3c47bdd1 100644 --- a/etc/quantum.conf +++ b/etc/quantum.conf @@ -16,7 +16,7 @@ bind_port = 9696 # api_extensions_path = extensions:/path/to/more/extensions:/even/more/extensions # The __path__ of quantum.extensions is appended to this, so if your # extensions are in there you don't need to specify them here -api_extensions_path = +# api_extensions_path = # Quantum plugin provider module core_plugin = quantum.plugins.sample.SamplePlugin.FakePlugin diff --git a/openstack-common.conf b/openstack-common.conf index 4674d6e22b..e85fe9b02c 100644 --- a/openstack-common.conf +++ b/openstack-common.conf @@ -1,7 +1,5 @@ [DEFAULT] - # The list of modules to copy from openstack-common -modules=cfg,context,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,policy,rpc,setup - +modules=cfg,exception,importutils,iniparser,jsonutils,policy,setup,notifier,timeutils,log,context,local,rpc,gettextutils,excutils # The base module to hold the copy of openstack.common base=quantum diff --git a/quantum/context.py b/quantum/context.py index 7e9d4a341b..c2555f5232 100644 --- a/quantum/context.py +++ b/quantum/context.py @@ -23,11 +23,11 @@ import logging from datetime import datetime from quantum.db import api as db_api - +from quantum.openstack.common import context as common_context LOG = logging.getLogger(__name__) -class Context(object): +class Context(common_context.RequestContext): """Security context and request information. Represents the user taking a given action within the system. @@ -44,11 +44,11 @@ class Context(object): if kwargs: LOG.warn(_('Arguments dropped when creating ' 'context: %s') % str(kwargs)) - + super(Context, self).__init__(user=user_id, tenant=tenant_id, + is_admin=is_admin) self.user_id = user_id self.tenant_id = tenant_id self.roles = roles or [] - self.is_admin = is_admin if self.is_admin is None: self.is_admin = 'admin' in [x.lower() for x in self.roles] elif self.is_admin and 'admin' not in [x.lower() for x in self.roles]: diff --git a/quantum/openstack/common/cfg.py b/quantum/openstack/common/cfg.py index b1493a31c0..9ad115f0f5 100644 --- a/quantum/openstack/common/cfg.py +++ b/quantum/openstack/common/cfg.py @@ -42,8 +42,8 @@ Options can be strings, integers, floats, booleans, lists or 'multi strings':: osapi_compute_extension_opt = cfg.MultiStrOpt('osapi_compute_extension', default=DEFAULT_EXTENSIONS) -Option schemas are registered with with the config manager at runtime, but -before the option is referenced:: +Option schemas are registered with the config manager at runtime, but before +the option is referenced:: class ExtensionManager(object): diff --git a/quantum/openstack/common/importutils.py b/quantum/openstack/common/importutils.py index 67d94ad5f9..2fbb0291a0 100644 --- a/quantum/openstack/common/importutils.py +++ b/quantum/openstack/common/importutils.py @@ -20,6 +20,7 @@ Import related utilities and helper functions. """ import sys +import traceback def import_class(import_str): @@ -30,7 +31,8 @@ def import_class(import_str): return getattr(sys.modules[mod_str], class_str) except (ImportError, ValueError, AttributeError), exc: raise ImportError('Class %s cannot be found (%s)' % - (class_str, str(exc))) + (class_str, + traceback.format_exception(*sys.exc_info()))) def import_object(import_str, *args, **kwargs): diff --git a/quantum/openstack/common/jsonutils.py b/quantum/openstack/common/jsonutils.py index 7522669811..c78d5f0aee 100644 --- a/quantum/openstack/common/jsonutils.py +++ b/quantum/openstack/common/jsonutils.py @@ -39,6 +39,8 @@ import itertools import json import xmlrpclib +from quantum.openstack.common import timeutils + def to_primitive(value, convert_instances=False, level=0): """Convert a complex object into primitives. @@ -101,7 +103,7 @@ def to_primitive(value, convert_instances=False, level=0): level=level) return o elif isinstance(value, datetime.datetime): - return str(value) + return timeutils.strtime(value) elif hasattr(value, 'iteritems'): return to_primitive(dict(value.iteritems()), convert_instances=convert_instances, @@ -130,11 +132,15 @@ def loads(s): return json.loads(s) +def load(s): + return json.load(s) + + try: import anyjson except ImportError: pass else: anyjson._modules.append((__name__, 'dumps', TypeError, - 'loads', ValueError)) + 'loads', ValueError, 'load')) anyjson.force_implementation(__name__) diff --git a/quantum/openstack/common/log.py b/quantum/openstack/common/log.py new file mode 100644 index 0000000000..7165fe2253 --- /dev/null +++ b/quantum/openstack/common/log.py @@ -0,0 +1,459 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Openstack logging handler. + +This module adds to logging functionality by adding the option to specify +a context object when calling the various log methods. If the context object +is not specified, default formatting is used. Additionally, an instance uuid +may be passed as part of the log message, which is intended to make it easier +for admins to find messages related to a specific instance. + +It also allows setting of formatting information through conf. + +""" + +import cStringIO +import inspect +import itertools +import logging +import logging.config +import logging.handlers +import os +import stat +import sys +import traceback + +from quantum.openstack.common import cfg +from quantum.openstack.common.gettextutils import _ +from quantum.openstack.common import jsonutils +from quantum.openstack.common import local +from quantum.openstack.common import notifier + + +log_opts = [ + cfg.StrOpt('logging_context_format_string', + default='%(asctime)s %(levelname)s %(name)s [%(request_id)s ' + '%(user_id)s %(project_id)s] %(instance)s' + '%(message)s', + help='format string to use for log messages with context'), + cfg.StrOpt('logging_default_format_string', + default='%(asctime)s %(levelname)s %(name)s [-] %(instance)s' + '%(message)s', + help='format string to use for log messages without context'), + cfg.StrOpt('logging_debug_format_suffix', + default='from (pid=%(process)d) %(funcName)s ' + '%(pathname)s:%(lineno)d', + help='data to append to log format when level is DEBUG'), + cfg.StrOpt('logging_exception_prefix', + default='%(asctime)s TRACE %(name)s %(instance)s', + help='prefix each line of exception output with this format'), + cfg.ListOpt('default_log_levels', + default=[ + 'amqplib=WARN', + 'sqlalchemy=WARN', + 'boto=WARN', + 'suds=INFO', + 'keystone=INFO', + 'eventlet.wsgi.server=WARN' + ], + help='list of logger=LEVEL pairs'), + cfg.BoolOpt('publish_errors', + default=False, + help='publish error events'), + + # NOTE(mikal): there are two options here because sometimes we are handed + # a full instance (and could include more information), and other times we + # are just handed a UUID for the instance. + cfg.StrOpt('instance_format', + default='[instance: %(uuid)s] ', + help='If an instance is passed with the log message, format ' + 'it like this'), + cfg.StrOpt('instance_uuid_format', + default='[instance: %(uuid)s] ', + help='If an instance UUID is passed with the log message, ' + 'format it like this'), + ] + + +generic_log_opts = [ + cfg.StrOpt('logdir', + default=None, + help='Log output to a per-service log file in named directory'), + cfg.StrOpt('logfile', + default=None, + help='Log output to a named file'), + cfg.BoolOpt('use_stderr', + default=True, + help='Log output to standard error'), + cfg.StrOpt('logfile_mode', + default='0644', + help='Default file mode used when creating log files'), + ] + + +CONF = cfg.CONF +CONF.register_opts(generic_log_opts) +CONF.register_opts(log_opts) + +# our new audit level +# NOTE(jkoelker) Since we synthesized an audit level, make the logging +# module aware of it so it acts like other levels. +logging.AUDIT = logging.INFO + 1 +logging.addLevelName(logging.AUDIT, 'AUDIT') + + +try: + NullHandler = logging.NullHandler +except AttributeError: # NOTE(jkoelker) NullHandler added in Python 2.7 + class NullHandler(logging.Handler): + def handle(self, record): + pass + + def emit(self, record): + pass + + def createLock(self): + self.lock = None + + +def _dictify_context(context): + if context is None: + return None + if not isinstance(context, dict) and getattr(context, 'to_dict', None): + context = context.to_dict() + return context + + +def _get_binary_name(): + return os.path.basename(inspect.stack()[-1][1]) + + +def _get_log_file_path(binary=None): + logfile = CONF.log_file or CONF.logfile + logdir = CONF.log_dir or CONF.logdir + + if logfile and not logdir: + return logfile + + if logfile and logdir: + return os.path.join(logdir, logfile) + + if logdir: + binary = binary or _get_binary_name() + return '%s.log' % (os.path.join(logdir, binary),) + + +class ContextAdapter(logging.LoggerAdapter): + warn = logging.LoggerAdapter.warning + + def __init__(self, logger, project_name, version_string): + self.logger = logger + self.project = project_name + self.version = version_string + + def audit(self, msg, *args, **kwargs): + self.log(logging.AUDIT, msg, *args, **kwargs) + + def process(self, msg, kwargs): + if 'extra' not in kwargs: + kwargs['extra'] = {} + extra = kwargs['extra'] + + context = kwargs.pop('context', None) + if not context: + context = getattr(local.store, 'context', None) + if context: + extra.update(_dictify_context(context)) + + instance = kwargs.pop('instance', None) + instance_extra = '' + if instance: + instance_extra = CONF.instance_format % instance + else: + instance_uuid = kwargs.pop('instance_uuid', None) + if instance_uuid: + instance_extra = (CONF.instance_uuid_format + % {'uuid': instance_uuid}) + extra.update({'instance': instance_extra}) + + extra.update({"project": self.project}) + extra.update({"version": self.version}) + extra['extra'] = extra.copy() + return msg, kwargs + + +class JSONFormatter(logging.Formatter): + def __init__(self, fmt=None, datefmt=None): + # NOTE(jkoelker) we ignore the fmt argument, but its still there + # since logging.config.fileConfig passes it. + self.datefmt = datefmt + + def formatException(self, ei, strip_newlines=True): + lines = traceback.format_exception(*ei) + if strip_newlines: + lines = [itertools.ifilter(lambda x: x, + line.rstrip().splitlines()) + for line in lines] + lines = list(itertools.chain(*lines)) + return lines + + def format(self, record): + message = {'message': record.getMessage(), + 'asctime': self.formatTime(record, self.datefmt), + 'name': record.name, + 'msg': record.msg, + 'args': record.args, + 'levelname': record.levelname, + 'levelno': record.levelno, + 'pathname': record.pathname, + 'filename': record.filename, + 'module': record.module, + 'lineno': record.lineno, + 'funcname': record.funcName, + 'created': record.created, + 'msecs': record.msecs, + 'relative_created': record.relativeCreated, + 'thread': record.thread, + 'thread_name': record.threadName, + 'process_name': record.processName, + 'process': record.process, + 'traceback': None} + + if hasattr(record, 'extra'): + message['extra'] = record.extra + + if record.exc_info: + message['traceback'] = self.formatException(record.exc_info) + + return jsonutils.dumps(message) + + +class PublishErrorsHandler(logging.Handler): + def emit(self, record): + if 'list_notifier_drivers' in CONF: + if ('quantum.openstack.common.notifier.log_notifier' in + CONF.list_notifier_drivers): + return + notifier.api.notify(None, 'error.publisher', + 'error_notification', + notifier.api.ERROR, + dict(error=record.msg)) + + +def handle_exception(type, value, tb): + extra = {} + if CONF.verbose: + extra['exc_info'] = (type, value, tb) + getLogger().critical(str(value), **extra) + + +def setup(product_name): + """Setup logging.""" + sys.excepthook = handle_exception + + if CONF.log_config: + try: + logging.config.fileConfig(CONF.log_config) + except Exception: + traceback.print_exc() + raise + else: + _setup_logging_from_conf(product_name) + + +def _find_facility_from_conf(): + facility_names = logging.handlers.SysLogHandler.facility_names + facility = getattr(logging.handlers.SysLogHandler, + CONF.syslog_log_facility, + None) + + if facility is None and CONF.syslog_log_facility in facility_names: + facility = facility_names.get(CONF.syslog_log_facility) + + if facility is None: + valid_facilities = facility_names.keys() + consts = ['LOG_AUTH', 'LOG_AUTHPRIV', 'LOG_CRON', 'LOG_DAEMON', + 'LOG_FTP', 'LOG_KERN', 'LOG_LPR', 'LOG_MAIL', 'LOG_NEWS', + 'LOG_AUTH', 'LOG_SYSLOG', 'LOG_USER', 'LOG_UUCP', + 'LOG_LOCAL0', 'LOG_LOCAL1', 'LOG_LOCAL2', 'LOG_LOCAL3', + 'LOG_LOCAL4', 'LOG_LOCAL5', 'LOG_LOCAL6', 'LOG_LOCAL7'] + valid_facilities.extend(consts) + raise TypeError(_('syslog facility must be one of: %s') % + ', '.join("'%s'" % fac + for fac in valid_facilities)) + + return facility + + +def _setup_logging_from_conf(product_name): + log_root = getLogger(product_name).logger + for handler in log_root.handlers: + log_root.removeHandler(handler) + + if CONF.use_syslog: + facility = _find_facility_from_conf() + syslog = logging.handlers.SysLogHandler(address='/dev/log', + facility=facility) + log_root.addHandler(syslog) + + logpath = _get_log_file_path() + if logpath: + filelog = logging.handlers.WatchedFileHandler(logpath) + log_root.addHandler(filelog) + + mode = int(CONF.logfile_mode, 8) + st = os.stat(logpath) + if st.st_mode != (stat.S_IFREG | mode): + os.chmod(logpath, mode) + + if CONF.use_stderr: + streamlog = ColorHandler() + log_root.addHandler(streamlog) + + elif not CONF.log_file: + # pass sys.stdout as a positional argument + # python2.6 calls the argument strm, in 2.7 it's stream + streamlog = logging.StreamHandler(sys.stdout) + log_root.addHandler(streamlog) + + if CONF.publish_errors: + log_root.addHandler(PublishErrorsHandler(logging.ERROR)) + + for handler in log_root.handlers: + datefmt = CONF.log_date_format + if CONF.log_format: + handler.setFormatter(logging.Formatter(fmt=CONF.log_format, + datefmt=datefmt)) + handler.setFormatter(LegacyFormatter(datefmt=datefmt)) + + if CONF.verbose or CONF.debug: + log_root.setLevel(logging.DEBUG) + else: + log_root.setLevel(logging.INFO) + + level = logging.NOTSET + for pair in CONF.default_log_levels: + mod, _sep, level_name = pair.partition('=') + level = logging.getLevelName(level_name) + logger = logging.getLogger(mod) + logger.setLevel(level) + for handler in log_root.handlers: + logger.addHandler(handler) + + # NOTE(jkoelker) Clear the handlers for the root logger that was setup + # by basicConfig in nova/__init__.py and install the + # NullHandler. + root = logging.getLogger() + for handler in root.handlers: + root.removeHandler(handler) + handler = NullHandler() + handler.setFormatter(logging.Formatter()) + root.addHandler(handler) + + +_loggers = {} + + +def getLogger(name='unknown', version='unknown'): + if name not in _loggers: + _loggers[name] = ContextAdapter(logging.getLogger(name), + name, + version) + return _loggers[name] + + +class WritableLogger(object): + """A thin wrapper that responds to `write` and logs.""" + + def __init__(self, logger, level=logging.INFO): + self.logger = logger + self.level = level + + def write(self, msg): + self.logger.log(self.level, msg) + + +class LegacyFormatter(logging.Formatter): + """A context.RequestContext aware formatter configured through flags. + + The flags used to set format strings are: logging_context_format_string + and logging_default_format_string. You can also specify + logging_debug_format_suffix to append extra formatting if the log level is + debug. + + For information about what variables are available for the formatter see: + http://docs.python.org/library/logging.html#formatter + + """ + + def format(self, record): + """Uses contextstring if request_id is set, otherwise default.""" + if 'instance' not in record.__dict__: + record.__dict__['instance'] = '' + + if record.__dict__.get('request_id', None): + self._fmt = CONF.logging_context_format_string + else: + self._fmt = CONF.logging_default_format_string + + if (record.levelno == logging.DEBUG and + CONF.logging_debug_format_suffix): + self._fmt += " " + CONF.logging_debug_format_suffix + + # Cache this on the record, Logger will respect our formated copy + if record.exc_info: + record.exc_text = self.formatException(record.exc_info, record) + return logging.Formatter.format(self, record) + + def formatException(self, exc_info, record=None): + """Format exception output with CONF.logging_exception_prefix.""" + if not record: + return logging.Formatter.formatException(self, exc_info) + + stringbuffer = cStringIO.StringIO() + traceback.print_exception(exc_info[0], exc_info[1], exc_info[2], + None, stringbuffer) + lines = stringbuffer.getvalue().split('\n') + stringbuffer.close() + + if CONF.logging_exception_prefix.find('%(asctime)') != -1: + record.asctime = self.formatTime(record, self.datefmt) + + formatted_lines = [] + for line in lines: + pl = CONF.logging_exception_prefix % record.__dict__ + fl = '%s%s' % (pl, line) + formatted_lines.append(fl) + return '\n'.join(formatted_lines) + + +class ColorHandler(logging.StreamHandler): + LEVEL_COLORS = { + logging.DEBUG: '\033[00;32m', # GREEN + logging.INFO: '\033[00;36m', # CYAN + logging.AUDIT: '\033[01;36m', # BOLD CYAN + logging.WARN: '\033[01;33m', # BOLD YELLOW + logging.ERROR: '\033[01;31m', # BOLD RED + logging.CRITICAL: '\033[01;31m', # BOLD RED + } + + def format(self, record): + record.color = self.LEVEL_COLORS[record.levelno] + return logging.StreamHandler.format(self, record) diff --git a/quantum/openstack/common/notifier/__init__.py b/quantum/openstack/common/notifier/__init__.py new file mode 100644 index 0000000000..482d54e4fd --- /dev/null +++ b/quantum/openstack/common/notifier/__init__.py @@ -0,0 +1,14 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. diff --git a/quantum/openstack/common/notifier/api.py b/quantum/openstack/common/notifier/api.py new file mode 100644 index 0000000000..b3e7371edf --- /dev/null +++ b/quantum/openstack/common/notifier/api.py @@ -0,0 +1,142 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import inspect +import uuid + +from quantum.openstack.common import cfg +from quantum.openstack.common import context +from quantum.openstack.common.gettextutils import _ +from quantum.openstack.common import importutils +from quantum.openstack.common import jsonutils +from quantum.openstack.common import log as logging +from quantum.openstack.common import timeutils + + +LOG = logging.getLogger(__name__) + +notifier_opts = [ + cfg.StrOpt('notification_driver', + default='quantum.openstack.common.notifier.no_op_notifier', + help='Default driver for sending notifications'), + cfg.StrOpt('default_notification_level', + default='INFO', + help='Default notification level for outgoing notifications'), + cfg.StrOpt('default_publisher_id', + default='$host', + help='Default publisher_id for outgoing notifications'), + ] + +CONF = cfg.CONF +CONF.register_opts(notifier_opts) + +WARN = 'WARN' +INFO = 'INFO' +ERROR = 'ERROR' +CRITICAL = 'CRITICAL' +DEBUG = 'DEBUG' + +log_levels = (DEBUG, WARN, INFO, ERROR, CRITICAL) + + +class BadPriorityException(Exception): + pass + + +def notify_decorator(name, fn): + """ decorator for notify which is used from utils.monkey_patch() + + :param name: name of the function + :param function: - object of the function + :returns: function -- decorated function + + """ + def wrapped_func(*args, **kwarg): + body = {} + body['args'] = [] + body['kwarg'] = {} + for arg in args: + body['args'].append(arg) + for key in kwarg: + body['kwarg'][key] = kwarg[key] + + ctxt = context.get_context_from_function_and_args(fn, args, kwarg) + notify(ctxt, + CONF.default_publisher_id, + name, + CONF.default_notification_level, + body) + return fn(*args, **kwarg) + return wrapped_func + + +def publisher_id(service, host=None): + if not host: + host = CONF.host + return "%s.%s" % (service, host) + + +def notify(context, publisher_id, event_type, priority, payload): + """Sends a notification using the specified driver + + :param publisher_id: the source worker_type.host of the message + :param event_type: the literal type of event (ex. Instance Creation) + :param priority: patterned after the enumeration of Python logging + levels in the set (DEBUG, WARN, INFO, ERROR, CRITICAL) + :param payload: A python dictionary of attributes + + Outgoing message format includes the above parameters, and appends the + following: + + message_id + a UUID representing the id for this notification + + timestamp + the GMT timestamp the notification was sent at + + The composite message will be constructed as a dictionary of the above + attributes, which will then be sent via the transport mechanism defined + by the driver. + + Message example:: + + {'message_id': str(uuid.uuid4()), + 'publisher_id': 'compute.host1', + 'timestamp': timeutils.utcnow(), + 'priority': 'WARN', + 'event_type': 'compute.create_instance', + 'payload': {'instance_id': 12, ... }} + + """ + if priority not in log_levels: + raise BadPriorityException( + _('%s not in valid priorities') % priority) + + # Ensure everything is JSON serializable. + payload = jsonutils.to_primitive(payload, convert_instances=True) + + driver = importutils.import_module(CONF.notification_driver) + msg = dict(message_id=str(uuid.uuid4()), + publisher_id=publisher_id, + event_type=event_type, + priority=priority, + payload=payload, + timestamp=str(timeutils.utcnow())) + try: + driver.notify(context, msg) + except Exception, e: + LOG.exception(_("Problem '%(e)s' attempting to " + "send to notification system. Payload=%(payload)s") % + locals()) diff --git a/quantum/openstack/common/notifier/list_notifier.py b/quantum/openstack/common/notifier/list_notifier.py new file mode 100644 index 0000000000..b2f3afa282 --- /dev/null +++ b/quantum/openstack/common/notifier/list_notifier.py @@ -0,0 +1,117 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +from quantum.openstack.common import cfg +from quantum.openstack.common.gettextutils import _ +from quantum.openstack.common import importutils +from quantum.openstack.common import log as logging + + +list_notifier_drivers_opt = cfg.MultiStrOpt('list_notifier_drivers', + default=['quantum.openstack.common.notifier.no_op_notifier'], + help='List of drivers to send notifications') + +CONF = cfg.CONF +CONF.register_opt(list_notifier_drivers_opt) + +LOG = logging.getLogger(__name__) + +drivers = None + + +class ImportFailureNotifier(object): + """Noisily re-raises some exception over-and-over when notify is called.""" + + def __init__(self, exception): + self.exception = exception + + def notify(self, context, message): + raise self.exception + + +def _get_drivers(): + """Instantiates and returns drivers based on the flag values.""" + global drivers + if drivers is None: + drivers = [] + for notification_driver in CONF.list_notifier_drivers: + try: + drivers.append(importutils.import_module(notification_driver)) + except ImportError as e: + drivers.append(ImportFailureNotifier(e)) + return drivers + + +def add_driver(notification_driver): + """Add a notification driver at runtime.""" + # Make sure the driver list is initialized. + _get_drivers() + if isinstance(notification_driver, basestring): + # Load and add + try: + drivers.append(importutils.import_module(notification_driver)) + except ImportError as e: + drivers.append(ImportFailureNotifier(e)) + else: + # Driver is already loaded; just add the object. + drivers.append(notification_driver) + + +def _object_name(obj): + name = [] + if hasattr(obj, '__module__'): + name.append(obj.__module__) + if hasattr(obj, '__name__'): + name.append(obj.__name__) + else: + name.append(obj.__class__.__name__) + return '.'.join(name) + + +def remove_driver(notification_driver): + """Remove a notification driver at runtime.""" + # Make sure the driver list is initialized. + _get_drivers() + removed = False + if notification_driver in drivers: + # We're removing an object. Easy. + drivers.remove(notification_driver) + removed = True + else: + # We're removing a driver by name. Search for it. + for driver in drivers: + if _object_name(driver) == notification_driver: + drivers.remove(driver) + removed = True + + if not removed: + raise ValueError("Cannot remove; %s is not in list" % + notification_driver) + + +def notify(context, message): + """Passes notification to multiple notifiers in a list.""" + for driver in _get_drivers(): + try: + driver.notify(context, message) + except Exception as e: + LOG.exception(_("Problem '%(e)s' attempting to send to " + "notification driver %(driver)s."), locals()) + + +def _reset_drivers(): + """Used by unit tests to reset the drivers.""" + global drivers + drivers = None diff --git a/quantum/openstack/common/notifier/log_notifier.py b/quantum/openstack/common/notifier/log_notifier.py new file mode 100644 index 0000000000..ea77908200 --- /dev/null +++ b/quantum/openstack/common/notifier/log_notifier.py @@ -0,0 +1,35 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from quantum.openstack.common import cfg +from quantum.openstack.common import jsonutils +from quantum.openstack.common import log as logging + + +CONF = cfg.CONF + + +def notify(_context, message): + """Notifies the recipient of the desired event given the model. + Log notifications using openstack's default logging system""" + + priority = message.get('priority', + CONF.default_notification_level) + priority = priority.lower() + logger = logging.getLogger( + 'quantum.openstack.common.notification.%s' % + message['event_type']) + getattr(logger, priority)(jsonutils.dumps(message)) diff --git a/quantum/openstack/common/notifier/no_op_notifier.py b/quantum/openstack/common/notifier/no_op_notifier.py new file mode 100644 index 0000000000..ee1ddbdcac --- /dev/null +++ b/quantum/openstack/common/notifier/no_op_notifier.py @@ -0,0 +1,19 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +def notify(_context, message): + """Notifies the recipient of the desired event given the model""" + pass diff --git a/quantum/openstack/common/notifier/rabbit_notifier.py b/quantum/openstack/common/notifier/rabbit_notifier.py new file mode 100644 index 0000000000..1f737da18d --- /dev/null +++ b/quantum/openstack/common/notifier/rabbit_notifier.py @@ -0,0 +1,46 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +from quantum.openstack.common import cfg +from quantum.openstack.common import context as req_context +from quantum.openstack.common.gettextutils import _ +from quantum.openstack.common import log as logging +from quantum.openstack.common import rpc + +LOG = logging.getLogger(__name__) + +notification_topic_opt = cfg.ListOpt('notification_topics', + default=['notifications', ], + help='AMQP topic used for openstack notifications') + +CONF = cfg.CONF +CONF.register_opt(notification_topic_opt) + + +def notify(context, message): + """Sends a notification to the RabbitMQ""" + if not context: + context = req_context.get_admin_context() + priority = message.get('priority', + CONF.default_notification_level) + priority = priority.lower() + for topic in CONF.notification_topics: + topic = '%s.%s' % (topic, priority) + try: + rpc.notify(context, topic, message) + except Exception, e: + LOG.exception(_("Could not send notification to %(topic)s. " + "Payload=%(message)s"), locals()) diff --git a/quantum/openstack/common/notifier/test_notifier.py b/quantum/openstack/common/notifier/test_notifier.py new file mode 100644 index 0000000000..5e348803dc --- /dev/null +++ b/quantum/openstack/common/notifier/test_notifier.py @@ -0,0 +1,22 @@ +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + + +NOTIFICATIONS = [] + + +def notify(_context, message): + """Test notifier, stores notifications in memory for unittests.""" + NOTIFICATIONS.append(message) diff --git a/quantum/openstack/common/policy.py b/quantum/openstack/common/policy.py index 744871ca78..8a1225b48f 100644 --- a/quantum/openstack/common/policy.py +++ b/quantum/openstack/common/policy.py @@ -21,6 +21,7 @@ import logging import urllib import urllib2 +from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import jsonutils diff --git a/quantum/openstack/common/rpc/__init__.py b/quantum/openstack/common/rpc/__init__.py index 0d14c29e3c..6442f1b2cc 100644 --- a/quantum/openstack/common/rpc/__init__.py +++ b/quantum/openstack/common/rpc/__init__.py @@ -48,7 +48,8 @@ rpc_opts = [ 'Only supported by impl_zmq.'), cfg.ListOpt('allowed_rpc_exception_modules', default=['quantum.openstack.common.exception', - 'nova.exception'], + 'nova.exception', + ], help='Modules of exceptions that are permitted to be recreated' 'upon receiving exception data from an rpc call.'), cfg.StrOpt('control_exchange', diff --git a/quantum/openstack/common/rpc/amqp.py b/quantum/openstack/common/rpc/amqp.py index 9da903d391..b6d2751cb0 100644 --- a/quantum/openstack/common/rpc/amqp.py +++ b/quantum/openstack/common/rpc/amqp.py @@ -35,6 +35,7 @@ from eventlet import pools from eventlet import semaphore from quantum.openstack.common import excutils +from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import local from quantum.openstack.common.rpc import common as rpc_common diff --git a/quantum/openstack/common/rpc/common.py b/quantum/openstack/common/rpc/common.py index e8f8173b41..9ef1fc1d9c 100644 --- a/quantum/openstack/common/rpc/common.py +++ b/quantum/openstack/common/rpc/common.py @@ -23,10 +23,10 @@ import sys import traceback from quantum.openstack.common import cfg +from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import importutils from quantum.openstack.common import jsonutils from quantum.openstack.common import local -from quantum.openstack.common.gettextutils import _ LOG = logging.getLogger(__name__) diff --git a/quantum/openstack/common/rpc/dispatcher.py b/quantum/openstack/common/rpc/dispatcher.py index f8aedcc073..14dd180b68 100644 --- a/quantum/openstack/common/rpc/dispatcher.py +++ b/quantum/openstack/common/rpc/dispatcher.py @@ -40,6 +40,45 @@ The conversion over to a versioned API must be done on both the client side and server side of the API at the same time. However, as the code stands today, there can be both versioned and unversioned APIs implemented in the same code base. + + +EXAMPLES: + +Nova was the first project to use versioned rpc APIs. Consider the compute rpc +API as an example. The client side is in nova/compute/rpcapi.py and the server +side is in nova/compute/manager.py. + + +Example 1) Adding a new method. + +Adding a new method is a backwards compatible change. It should be added to +nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to +X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should +have a specific version specified to indicate the minimum API version that must +be implemented for the method to be supported. For example: + + def get_host_uptime(self, ctxt, host): + topic = _compute_topic(self.topic, ctxt, host, None) + return self.call(ctxt, self.make_msg('get_host_uptime'), topic, + version='1.1') + +In this case, version '1.1' is the first version that supported the +get_host_uptime() method. + + +Example 2) Adding a new parameter. + +Adding a new parameter to an rpc method can be made backwards compatible. The +RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. +The implementation of the method must not expect the parameter to be present. + + def some_remote_method(self, arg1, arg2, newarg=None): + # The code needs to deal with newarg=None for cases + # where an older client sends a message without it. + pass + +On the client side, the same changes should be made as in example 1. The +minimum version that supports the new parameter should be specified. """ from quantum.openstack.common.rpc import common as rpc_common diff --git a/quantum/openstack/common/rpc/impl_zmq.py b/quantum/openstack/common/rpc/impl_zmq.py index b7d0ae05c8..0b1336e3af 100644 --- a/quantum/openstack/common/rpc/impl_zmq.py +++ b/quantum/openstack/common/rpc/impl_zmq.py @@ -15,6 +15,7 @@ # under the License. import pprint +import socket import string import sys import types @@ -46,9 +47,12 @@ zmq_opts = [ 'address.'), # The module.Class to use for matchmaking. - cfg.StrOpt('rpc_zmq_matchmaker', - default='quantum.openstack.common.rpc.matchmaker.' - 'MatchMakerLocalhost', help='MatchMaker driver'), + cfg.StrOpt( + 'rpc_zmq_matchmaker', + default=('quantum.openstack.common.rpc.' + 'matchmaker.MatchMakerLocalhost'), + help='MatchMaker driver', + ), # The following port is unassigned by IANA as of 2012-05-21 cfg.IntOpt('rpc_zmq_port', default=9501, @@ -59,6 +63,10 @@ zmq_opts = [ cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', help='Directory for holding IPC sockets'), + + cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(), + help='Name of this node. Must be a valid hostname, FQDN, or ' + 'IP address. Must match "host" option, if running Nova.') ] @@ -119,11 +127,12 @@ class ZmqSocket(object): for f in do_sub: self.subscribe(f) - LOG.debug(_("Connecting to %{addr}s with %{type}s" - "\n-> Subscribed to %{subscribe}s" - "\n-> bind: %{bind}s"), - {'addr': addr, 'type': self.socket_s(), - 'subscribe': subscribe, 'bind': bind}) + str_data = {'addr': addr, 'type': self.socket_s(), + 'subscribe': subscribe, 'bind': bind} + + LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data) + LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data) + LOG.debug(_("-> bind: %(bind)s"), str_data) try: if bind: @@ -542,8 +551,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None): msg_id = str(uuid.uuid4().hex) # Replies always come into the reply service. - # We require that FLAGS.host is a FQDN, IP, or resolvable hostname. - reply_topic = "zmq_replies.%s" % FLAGS.host + reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host LOG.debug(_("Creating payload")) # Curry the original request into a reply method. @@ -712,3 +720,6 @@ def register_opts(conf): mm_impl = importutils.import_module(mm_module) mm_constructor = getattr(mm_impl, mm_class) matchmaker = mm_constructor() + + +register_opts(cfg.CONF) diff --git a/quantum/openstack/common/rpc/matchmaker.py b/quantum/openstack/common/rpc/matchmaker.py index 0c300c84a5..ecb54eb8ce 100644 --- a/quantum/openstack/common/rpc/matchmaker.py +++ b/quantum/openstack/common/rpc/matchmaker.py @@ -24,6 +24,7 @@ import json import logging from quantum.openstack.common import cfg +from quantum.openstack.common.gettextutils import _ matchmaker_opts = [ diff --git a/quantum/openstack/common/timeutils.py b/quantum/openstack/common/timeutils.py new file mode 100644 index 0000000000..5eeaf70aa4 --- /dev/null +++ b/quantum/openstack/common/timeutils.py @@ -0,0 +1,109 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Time related utilities and helper functions. +""" + +import calendar +import datetime +import time + +import iso8601 + + +TIME_FORMAT = "%Y-%m-%dT%H:%M:%S" +PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" + + +def isotime(at=None): + """Stringify time in ISO 8601 format""" + if not at: + at = utcnow() + str = at.strftime(TIME_FORMAT) + tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC' + str += ('Z' if tz == 'UTC' else tz) + return str + + +def parse_isotime(timestr): + """Parse time from ISO 8601 format""" + try: + return iso8601.parse_date(timestr) + except iso8601.ParseError as e: + raise ValueError(e.message) + except TypeError as e: + raise ValueError(e.message) + + +def strtime(at=None, fmt=PERFECT_TIME_FORMAT): + """Returns formatted utcnow.""" + if not at: + at = utcnow() + return at.strftime(fmt) + + +def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT): + """Turn a formatted time back into a datetime.""" + return datetime.datetime.strptime(timestr, fmt) + + +def normalize_time(timestamp): + """Normalize time in arbitrary timezone to UTC""" + offset = timestamp.utcoffset() + return timestamp.replace(tzinfo=None) - offset if offset else timestamp + + +def is_older_than(before, seconds): + """Return True if before is older than seconds.""" + return utcnow() - before > datetime.timedelta(seconds=seconds) + + +def utcnow_ts(): + """Timestamp version of our utcnow function.""" + return calendar.timegm(utcnow().timetuple()) + + +def utcnow(): + """Overridable version of utils.utcnow.""" + if utcnow.override_time: + return utcnow.override_time + return datetime.datetime.utcnow() + + +utcnow.override_time = None + + +def set_time_override(override_time=datetime.datetime.utcnow()): + """Override utils.utcnow to return a constant time.""" + utcnow.override_time = override_time + + +def advance_time_delta(timedelta): + """Advance overriden time using a datetime.timedelta.""" + assert(not utcnow.override_time is None) + utcnow.override_time += timedelta + + +def advance_time_seconds(seconds): + """Advance overriden time by seconds.""" + advance_time_delta(datetime.timedelta(0, seconds)) + + +def clear_time_override(): + """Remove the overridden time.""" + utcnow.override_time = None diff --git a/run_tests.sh b/run_tests.sh index 6c62b5432f..a2e3772d96 100755 --- a/run_tests.sh +++ b/run_tests.sh @@ -98,6 +98,12 @@ function run_pep8 { echo "Running pep8 ..." PEP8_EXCLUDE="vcsversion.py,*.pyc" + # TODO(gongysh) we should pep8 check openstack common files. But that project does + # not stick to latest pep8 version. Therefore we exclude these common files here. + # Pep8 does not support exclude in the format quantum/openstack/common, + # so I have to exclude some of openstack common files one by one. This also applies + # to tox.ini. + PEP8_EXCLUDE="$PEP8_EXCLUDE,log.py,notifier,rpc" PEP8_OPTIONS="--exclude=$PEP8_EXCLUDE --repeat --show-source" PEP8_INCLUDE="bin/* quantum run_tests.py setup*.py" ${wrapper} pep8 $PEP8_OPTIONS $PEP8_INCLUDE diff --git a/tools/pip-requires b/tools/pip-requires index adfa8c614b..5e3a1f2608 100644 --- a/tools/pip-requires +++ b/tools/pip-requires @@ -3,6 +3,7 @@ PasteDeploy==1.5.0 Routes>=1.12.3 eventlet>=0.9.12 httplib2 +iso8601>=0.1.4 lxml netaddr python-gflags==1.3 diff --git a/tox.ini b/tox.ini index 3a5cf4573e..402b6539bd 100644 --- a/tox.ini +++ b/tox.ini @@ -21,7 +21,7 @@ downloadcache = ~/cache/pip [testenv:pep8] deps = pep8 setuptools_git>=0.4 -commands = pep8 --repeat --show-source --exclude=.venv,.tox,dist,doc,*egg . +commands = pep8 --repeat --show-source --exclude=.venv,.tox,dist,doc,*egg,log.py,notifier,rpc . [testenv:cover] setenv = NOSE_WITH_COVERAGE=1