Introduce files from openstack common.

Because the openstack common project does not stick to latest pep8 rules,
I have to exclude some bad-ruled files in the tox.ini and run_tests.sh.
However pep8 does not support exclude option in the format quantum/openstack/common,
so I have to exclude some of openstack common files one by one.

Also, I have changed the Qunatum Context to base on the common context.

I does not update the setup.py in our openstack common dir and the one
under quantum top dir, since it should be maintained in a consistent way
across all of openstack projects.

After this introduction, we are ready for notification feature.

Change-Id: I2729c2dc3958835374c88d704e842e613785ec14
This commit is contained in:
Yong Sheng Gong 2012-07-15 08:17:34 +08:00
parent b5db77a4bc
commit 3cff1e7b97
25 changed files with 1056 additions and 26 deletions

View File

@ -16,7 +16,7 @@ bind_port = 9696
# api_extensions_path = extensions:/path/to/more/extensions:/even/more/extensions # api_extensions_path = extensions:/path/to/more/extensions:/even/more/extensions
# The __path__ of quantum.extensions is appended to this, so if your # The __path__ of quantum.extensions is appended to this, so if your
# extensions are in there you don't need to specify them here # extensions are in there you don't need to specify them here
api_extensions_path = # api_extensions_path =
# Quantum plugin provider module # Quantum plugin provider module
core_plugin = quantum.plugins.sample.SamplePlugin.FakePlugin core_plugin = quantum.plugins.sample.SamplePlugin.FakePlugin

View File

@ -1,7 +1,5 @@
[DEFAULT] [DEFAULT]
# The list of modules to copy from openstack-common # The list of modules to copy from openstack-common
modules=cfg,context,exception,excutils,gettextutils,importutils,iniparser,jsonutils,local,policy,rpc,setup modules=cfg,exception,importutils,iniparser,jsonutils,policy,setup,notifier,timeutils,log,context,local,rpc,gettextutils,excutils
# The base module to hold the copy of openstack.common # The base module to hold the copy of openstack.common
base=quantum base=quantum

View File

@ -23,11 +23,11 @@ import logging
from datetime import datetime from datetime import datetime
from quantum.db import api as db_api from quantum.db import api as db_api
from quantum.openstack.common import context as common_context
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class Context(object): class Context(common_context.RequestContext):
"""Security context and request information. """Security context and request information.
Represents the user taking a given action within the system. Represents the user taking a given action within the system.
@ -44,11 +44,11 @@ class Context(object):
if kwargs: if kwargs:
LOG.warn(_('Arguments dropped when creating ' LOG.warn(_('Arguments dropped when creating '
'context: %s') % str(kwargs)) 'context: %s') % str(kwargs))
super(Context, self).__init__(user=user_id, tenant=tenant_id,
is_admin=is_admin)
self.user_id = user_id self.user_id = user_id
self.tenant_id = tenant_id self.tenant_id = tenant_id
self.roles = roles or [] self.roles = roles or []
self.is_admin = is_admin
if self.is_admin is None: if self.is_admin is None:
self.is_admin = 'admin' in [x.lower() for x in self.roles] self.is_admin = 'admin' in [x.lower() for x in self.roles]
elif self.is_admin and 'admin' not in [x.lower() for x in self.roles]: elif self.is_admin and 'admin' not in [x.lower() for x in self.roles]:

View File

@ -42,8 +42,8 @@ Options can be strings, integers, floats, booleans, lists or 'multi strings'::
osapi_compute_extension_opt = cfg.MultiStrOpt('osapi_compute_extension', osapi_compute_extension_opt = cfg.MultiStrOpt('osapi_compute_extension',
default=DEFAULT_EXTENSIONS) default=DEFAULT_EXTENSIONS)
Option schemas are registered with with the config manager at runtime, but Option schemas are registered with the config manager at runtime, but before
before the option is referenced:: the option is referenced::
class ExtensionManager(object): class ExtensionManager(object):

View File

@ -20,6 +20,7 @@ Import related utilities and helper functions.
""" """
import sys import sys
import traceback
def import_class(import_str): def import_class(import_str):
@ -30,7 +31,8 @@ def import_class(import_str):
return getattr(sys.modules[mod_str], class_str) return getattr(sys.modules[mod_str], class_str)
except (ImportError, ValueError, AttributeError), exc: except (ImportError, ValueError, AttributeError), exc:
raise ImportError('Class %s cannot be found (%s)' % raise ImportError('Class %s cannot be found (%s)' %
(class_str, str(exc))) (class_str,
traceback.format_exception(*sys.exc_info())))
def import_object(import_str, *args, **kwargs): def import_object(import_str, *args, **kwargs):

View File

@ -39,6 +39,8 @@ import itertools
import json import json
import xmlrpclib import xmlrpclib
from quantum.openstack.common import timeutils
def to_primitive(value, convert_instances=False, level=0): def to_primitive(value, convert_instances=False, level=0):
"""Convert a complex object into primitives. """Convert a complex object into primitives.
@ -101,7 +103,7 @@ def to_primitive(value, convert_instances=False, level=0):
level=level) level=level)
return o return o
elif isinstance(value, datetime.datetime): elif isinstance(value, datetime.datetime):
return str(value) return timeutils.strtime(value)
elif hasattr(value, 'iteritems'): elif hasattr(value, 'iteritems'):
return to_primitive(dict(value.iteritems()), return to_primitive(dict(value.iteritems()),
convert_instances=convert_instances, convert_instances=convert_instances,
@ -130,11 +132,15 @@ def loads(s):
return json.loads(s) return json.loads(s)
def load(s):
return json.load(s)
try: try:
import anyjson import anyjson
except ImportError: except ImportError:
pass pass
else: else:
anyjson._modules.append((__name__, 'dumps', TypeError, anyjson._modules.append((__name__, 'dumps', TypeError,
'loads', ValueError)) 'loads', ValueError, 'load'))
anyjson.force_implementation(__name__) anyjson.force_implementation(__name__)

View File

@ -0,0 +1,459 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# Copyright 2010 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Openstack logging handler.
This module adds to logging functionality by adding the option to specify
a context object when calling the various log methods. If the context object
is not specified, default formatting is used. Additionally, an instance uuid
may be passed as part of the log message, which is intended to make it easier
for admins to find messages related to a specific instance.
It also allows setting of formatting information through conf.
"""
import cStringIO
import inspect
import itertools
import logging
import logging.config
import logging.handlers
import os
import stat
import sys
import traceback
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import jsonutils
from quantum.openstack.common import local
from quantum.openstack.common import notifier
log_opts = [
cfg.StrOpt('logging_context_format_string',
default='%(asctime)s %(levelname)s %(name)s [%(request_id)s '
'%(user_id)s %(project_id)s] %(instance)s'
'%(message)s',
help='format string to use for log messages with context'),
cfg.StrOpt('logging_default_format_string',
default='%(asctime)s %(levelname)s %(name)s [-] %(instance)s'
'%(message)s',
help='format string to use for log messages without context'),
cfg.StrOpt('logging_debug_format_suffix',
default='from (pid=%(process)d) %(funcName)s '
'%(pathname)s:%(lineno)d',
help='data to append to log format when level is DEBUG'),
cfg.StrOpt('logging_exception_prefix',
default='%(asctime)s TRACE %(name)s %(instance)s',
help='prefix each line of exception output with this format'),
cfg.ListOpt('default_log_levels',
default=[
'amqplib=WARN',
'sqlalchemy=WARN',
'boto=WARN',
'suds=INFO',
'keystone=INFO',
'eventlet.wsgi.server=WARN'
],
help='list of logger=LEVEL pairs'),
cfg.BoolOpt('publish_errors',
default=False,
help='publish error events'),
# NOTE(mikal): there are two options here because sometimes we are handed
# a full instance (and could include more information), and other times we
# are just handed a UUID for the instance.
cfg.StrOpt('instance_format',
default='[instance: %(uuid)s] ',
help='If an instance is passed with the log message, format '
'it like this'),
cfg.StrOpt('instance_uuid_format',
default='[instance: %(uuid)s] ',
help='If an instance UUID is passed with the log message, '
'format it like this'),
]
generic_log_opts = [
cfg.StrOpt('logdir',
default=None,
help='Log output to a per-service log file in named directory'),
cfg.StrOpt('logfile',
default=None,
help='Log output to a named file'),
cfg.BoolOpt('use_stderr',
default=True,
help='Log output to standard error'),
cfg.StrOpt('logfile_mode',
default='0644',
help='Default file mode used when creating log files'),
]
CONF = cfg.CONF
CONF.register_opts(generic_log_opts)
CONF.register_opts(log_opts)
# our new audit level
# NOTE(jkoelker) Since we synthesized an audit level, make the logging
# module aware of it so it acts like other levels.
logging.AUDIT = logging.INFO + 1
logging.addLevelName(logging.AUDIT, 'AUDIT')
try:
NullHandler = logging.NullHandler
except AttributeError: # NOTE(jkoelker) NullHandler added in Python 2.7
class NullHandler(logging.Handler):
def handle(self, record):
pass
def emit(self, record):
pass
def createLock(self):
self.lock = None
def _dictify_context(context):
if context is None:
return None
if not isinstance(context, dict) and getattr(context, 'to_dict', None):
context = context.to_dict()
return context
def _get_binary_name():
return os.path.basename(inspect.stack()[-1][1])
def _get_log_file_path(binary=None):
logfile = CONF.log_file or CONF.logfile
logdir = CONF.log_dir or CONF.logdir
if logfile and not logdir:
return logfile
if logfile and logdir:
return os.path.join(logdir, logfile)
if logdir:
binary = binary or _get_binary_name()
return '%s.log' % (os.path.join(logdir, binary),)
class ContextAdapter(logging.LoggerAdapter):
warn = logging.LoggerAdapter.warning
def __init__(self, logger, project_name, version_string):
self.logger = logger
self.project = project_name
self.version = version_string
def audit(self, msg, *args, **kwargs):
self.log(logging.AUDIT, msg, *args, **kwargs)
def process(self, msg, kwargs):
if 'extra' not in kwargs:
kwargs['extra'] = {}
extra = kwargs['extra']
context = kwargs.pop('context', None)
if not context:
context = getattr(local.store, 'context', None)
if context:
extra.update(_dictify_context(context))
instance = kwargs.pop('instance', None)
instance_extra = ''
if instance:
instance_extra = CONF.instance_format % instance
else:
instance_uuid = kwargs.pop('instance_uuid', None)
if instance_uuid:
instance_extra = (CONF.instance_uuid_format
% {'uuid': instance_uuid})
extra.update({'instance': instance_extra})
extra.update({"project": self.project})
extra.update({"version": self.version})
extra['extra'] = extra.copy()
return msg, kwargs
class JSONFormatter(logging.Formatter):
def __init__(self, fmt=None, datefmt=None):
# NOTE(jkoelker) we ignore the fmt argument, but its still there
# since logging.config.fileConfig passes it.
self.datefmt = datefmt
def formatException(self, ei, strip_newlines=True):
lines = traceback.format_exception(*ei)
if strip_newlines:
lines = [itertools.ifilter(lambda x: x,
line.rstrip().splitlines())
for line in lines]
lines = list(itertools.chain(*lines))
return lines
def format(self, record):
message = {'message': record.getMessage(),
'asctime': self.formatTime(record, self.datefmt),
'name': record.name,
'msg': record.msg,
'args': record.args,
'levelname': record.levelname,
'levelno': record.levelno,
'pathname': record.pathname,
'filename': record.filename,
'module': record.module,
'lineno': record.lineno,
'funcname': record.funcName,
'created': record.created,
'msecs': record.msecs,
'relative_created': record.relativeCreated,
'thread': record.thread,
'thread_name': record.threadName,
'process_name': record.processName,
'process': record.process,
'traceback': None}
if hasattr(record, 'extra'):
message['extra'] = record.extra
if record.exc_info:
message['traceback'] = self.formatException(record.exc_info)
return jsonutils.dumps(message)
class PublishErrorsHandler(logging.Handler):
def emit(self, record):
if 'list_notifier_drivers' in CONF:
if ('quantum.openstack.common.notifier.log_notifier' in
CONF.list_notifier_drivers):
return
notifier.api.notify(None, 'error.publisher',
'error_notification',
notifier.api.ERROR,
dict(error=record.msg))
def handle_exception(type, value, tb):
extra = {}
if CONF.verbose:
extra['exc_info'] = (type, value, tb)
getLogger().critical(str(value), **extra)
def setup(product_name):
"""Setup logging."""
sys.excepthook = handle_exception
if CONF.log_config:
try:
logging.config.fileConfig(CONF.log_config)
except Exception:
traceback.print_exc()
raise
else:
_setup_logging_from_conf(product_name)
def _find_facility_from_conf():
facility_names = logging.handlers.SysLogHandler.facility_names
facility = getattr(logging.handlers.SysLogHandler,
CONF.syslog_log_facility,
None)
if facility is None and CONF.syslog_log_facility in facility_names:
facility = facility_names.get(CONF.syslog_log_facility)
if facility is None:
valid_facilities = facility_names.keys()
consts = ['LOG_AUTH', 'LOG_AUTHPRIV', 'LOG_CRON', 'LOG_DAEMON',
'LOG_FTP', 'LOG_KERN', 'LOG_LPR', 'LOG_MAIL', 'LOG_NEWS',
'LOG_AUTH', 'LOG_SYSLOG', 'LOG_USER', 'LOG_UUCP',
'LOG_LOCAL0', 'LOG_LOCAL1', 'LOG_LOCAL2', 'LOG_LOCAL3',
'LOG_LOCAL4', 'LOG_LOCAL5', 'LOG_LOCAL6', 'LOG_LOCAL7']
valid_facilities.extend(consts)
raise TypeError(_('syslog facility must be one of: %s') %
', '.join("'%s'" % fac
for fac in valid_facilities))
return facility
def _setup_logging_from_conf(product_name):
log_root = getLogger(product_name).logger
for handler in log_root.handlers:
log_root.removeHandler(handler)
if CONF.use_syslog:
facility = _find_facility_from_conf()
syslog = logging.handlers.SysLogHandler(address='/dev/log',
facility=facility)
log_root.addHandler(syslog)
logpath = _get_log_file_path()
if logpath:
filelog = logging.handlers.WatchedFileHandler(logpath)
log_root.addHandler(filelog)
mode = int(CONF.logfile_mode, 8)
st = os.stat(logpath)
if st.st_mode != (stat.S_IFREG | mode):
os.chmod(logpath, mode)
if CONF.use_stderr:
streamlog = ColorHandler()
log_root.addHandler(streamlog)
elif not CONF.log_file:
# pass sys.stdout as a positional argument
# python2.6 calls the argument strm, in 2.7 it's stream
streamlog = logging.StreamHandler(sys.stdout)
log_root.addHandler(streamlog)
if CONF.publish_errors:
log_root.addHandler(PublishErrorsHandler(logging.ERROR))
for handler in log_root.handlers:
datefmt = CONF.log_date_format
if CONF.log_format:
handler.setFormatter(logging.Formatter(fmt=CONF.log_format,
datefmt=datefmt))
handler.setFormatter(LegacyFormatter(datefmt=datefmt))
if CONF.verbose or CONF.debug:
log_root.setLevel(logging.DEBUG)
else:
log_root.setLevel(logging.INFO)
level = logging.NOTSET
for pair in CONF.default_log_levels:
mod, _sep, level_name = pair.partition('=')
level = logging.getLevelName(level_name)
logger = logging.getLogger(mod)
logger.setLevel(level)
for handler in log_root.handlers:
logger.addHandler(handler)
# NOTE(jkoelker) Clear the handlers for the root logger that was setup
# by basicConfig in nova/__init__.py and install the
# NullHandler.
root = logging.getLogger()
for handler in root.handlers:
root.removeHandler(handler)
handler = NullHandler()
handler.setFormatter(logging.Formatter())
root.addHandler(handler)
_loggers = {}
def getLogger(name='unknown', version='unknown'):
if name not in _loggers:
_loggers[name] = ContextAdapter(logging.getLogger(name),
name,
version)
return _loggers[name]
class WritableLogger(object):
"""A thin wrapper that responds to `write` and logs."""
def __init__(self, logger, level=logging.INFO):
self.logger = logger
self.level = level
def write(self, msg):
self.logger.log(self.level, msg)
class LegacyFormatter(logging.Formatter):
"""A context.RequestContext aware formatter configured through flags.
The flags used to set format strings are: logging_context_format_string
and logging_default_format_string. You can also specify
logging_debug_format_suffix to append extra formatting if the log level is
debug.
For information about what variables are available for the formatter see:
http://docs.python.org/library/logging.html#formatter
"""
def format(self, record):
"""Uses contextstring if request_id is set, otherwise default."""
if 'instance' not in record.__dict__:
record.__dict__['instance'] = ''
if record.__dict__.get('request_id', None):
self._fmt = CONF.logging_context_format_string
else:
self._fmt = CONF.logging_default_format_string
if (record.levelno == logging.DEBUG and
CONF.logging_debug_format_suffix):
self._fmt += " " + CONF.logging_debug_format_suffix
# Cache this on the record, Logger will respect our formated copy
if record.exc_info:
record.exc_text = self.formatException(record.exc_info, record)
return logging.Formatter.format(self, record)
def formatException(self, exc_info, record=None):
"""Format exception output with CONF.logging_exception_prefix."""
if not record:
return logging.Formatter.formatException(self, exc_info)
stringbuffer = cStringIO.StringIO()
traceback.print_exception(exc_info[0], exc_info[1], exc_info[2],
None, stringbuffer)
lines = stringbuffer.getvalue().split('\n')
stringbuffer.close()
if CONF.logging_exception_prefix.find('%(asctime)') != -1:
record.asctime = self.formatTime(record, self.datefmt)
formatted_lines = []
for line in lines:
pl = CONF.logging_exception_prefix % record.__dict__
fl = '%s%s' % (pl, line)
formatted_lines.append(fl)
return '\n'.join(formatted_lines)
class ColorHandler(logging.StreamHandler):
LEVEL_COLORS = {
logging.DEBUG: '\033[00;32m', # GREEN
logging.INFO: '\033[00;36m', # CYAN
logging.AUDIT: '\033[01;36m', # BOLD CYAN
logging.WARN: '\033[01;33m', # BOLD YELLOW
logging.ERROR: '\033[01;31m', # BOLD RED
logging.CRITICAL: '\033[01;31m', # BOLD RED
}
def format(self, record):
record.color = self.LEVEL_COLORS[record.levelno]
return logging.StreamHandler.format(self, record)

View File

@ -0,0 +1,14 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

View File

@ -0,0 +1,142 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import inspect
import uuid
from quantum.openstack.common import cfg
from quantum.openstack.common import context
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging
from quantum.openstack.common import timeutils
LOG = logging.getLogger(__name__)
notifier_opts = [
cfg.StrOpt('notification_driver',
default='quantum.openstack.common.notifier.no_op_notifier',
help='Default driver for sending notifications'),
cfg.StrOpt('default_notification_level',
default='INFO',
help='Default notification level for outgoing notifications'),
cfg.StrOpt('default_publisher_id',
default='$host',
help='Default publisher_id for outgoing notifications'),
]
CONF = cfg.CONF
CONF.register_opts(notifier_opts)
WARN = 'WARN'
INFO = 'INFO'
ERROR = 'ERROR'
CRITICAL = 'CRITICAL'
DEBUG = 'DEBUG'
log_levels = (DEBUG, WARN, INFO, ERROR, CRITICAL)
class BadPriorityException(Exception):
pass
def notify_decorator(name, fn):
""" decorator for notify which is used from utils.monkey_patch()
:param name: name of the function
:param function: - object of the function
:returns: function -- decorated function
"""
def wrapped_func(*args, **kwarg):
body = {}
body['args'] = []
body['kwarg'] = {}
for arg in args:
body['args'].append(arg)
for key in kwarg:
body['kwarg'][key] = kwarg[key]
ctxt = context.get_context_from_function_and_args(fn, args, kwarg)
notify(ctxt,
CONF.default_publisher_id,
name,
CONF.default_notification_level,
body)
return fn(*args, **kwarg)
return wrapped_func
def publisher_id(service, host=None):
if not host:
host = CONF.host
return "%s.%s" % (service, host)
def notify(context, publisher_id, event_type, priority, payload):
"""Sends a notification using the specified driver
:param publisher_id: the source worker_type.host of the message
:param event_type: the literal type of event (ex. Instance Creation)
:param priority: patterned after the enumeration of Python logging
levels in the set (DEBUG, WARN, INFO, ERROR, CRITICAL)
:param payload: A python dictionary of attributes
Outgoing message format includes the above parameters, and appends the
following:
message_id
a UUID representing the id for this notification
timestamp
the GMT timestamp the notification was sent at
The composite message will be constructed as a dictionary of the above
attributes, which will then be sent via the transport mechanism defined
by the driver.
Message example::
{'message_id': str(uuid.uuid4()),
'publisher_id': 'compute.host1',
'timestamp': timeutils.utcnow(),
'priority': 'WARN',
'event_type': 'compute.create_instance',
'payload': {'instance_id': 12, ... }}
"""
if priority not in log_levels:
raise BadPriorityException(
_('%s not in valid priorities') % priority)
# Ensure everything is JSON serializable.
payload = jsonutils.to_primitive(payload, convert_instances=True)
driver = importutils.import_module(CONF.notification_driver)
msg = dict(message_id=str(uuid.uuid4()),
publisher_id=publisher_id,
event_type=event_type,
priority=priority,
payload=payload,
timestamp=str(timeutils.utcnow()))
try:
driver.notify(context, msg)
except Exception, e:
LOG.exception(_("Problem '%(e)s' attempting to "
"send to notification system. Payload=%(payload)s") %
locals())

View File

@ -0,0 +1,117 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils
from quantum.openstack.common import log as logging
list_notifier_drivers_opt = cfg.MultiStrOpt('list_notifier_drivers',
default=['quantum.openstack.common.notifier.no_op_notifier'],
help='List of drivers to send notifications')
CONF = cfg.CONF
CONF.register_opt(list_notifier_drivers_opt)
LOG = logging.getLogger(__name__)
drivers = None
class ImportFailureNotifier(object):
"""Noisily re-raises some exception over-and-over when notify is called."""
def __init__(self, exception):
self.exception = exception
def notify(self, context, message):
raise self.exception
def _get_drivers():
"""Instantiates and returns drivers based on the flag values."""
global drivers
if drivers is None:
drivers = []
for notification_driver in CONF.list_notifier_drivers:
try:
drivers.append(importutils.import_module(notification_driver))
except ImportError as e:
drivers.append(ImportFailureNotifier(e))
return drivers
def add_driver(notification_driver):
"""Add a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
if isinstance(notification_driver, basestring):
# Load and add
try:
drivers.append(importutils.import_module(notification_driver))
except ImportError as e:
drivers.append(ImportFailureNotifier(e))
else:
# Driver is already loaded; just add the object.
drivers.append(notification_driver)
def _object_name(obj):
name = []
if hasattr(obj, '__module__'):
name.append(obj.__module__)
if hasattr(obj, '__name__'):
name.append(obj.__name__)
else:
name.append(obj.__class__.__name__)
return '.'.join(name)
def remove_driver(notification_driver):
"""Remove a notification driver at runtime."""
# Make sure the driver list is initialized.
_get_drivers()
removed = False
if notification_driver in drivers:
# We're removing an object. Easy.
drivers.remove(notification_driver)
removed = True
else:
# We're removing a driver by name. Search for it.
for driver in drivers:
if _object_name(driver) == notification_driver:
drivers.remove(driver)
removed = True
if not removed:
raise ValueError("Cannot remove; %s is not in list" %
notification_driver)
def notify(context, message):
"""Passes notification to multiple notifiers in a list."""
for driver in _get_drivers():
try:
driver.notify(context, message)
except Exception as e:
LOG.exception(_("Problem '%(e)s' attempting to send to "
"notification driver %(driver)s."), locals())
def _reset_drivers():
"""Used by unit tests to reset the drivers."""
global drivers
drivers = None

View File

@ -0,0 +1,35 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from quantum.openstack.common import cfg
from quantum.openstack.common import jsonutils
from quantum.openstack.common import log as logging
CONF = cfg.CONF
def notify(_context, message):
"""Notifies the recipient of the desired event given the model.
Log notifications using openstack's default logging system"""
priority = message.get('priority',
CONF.default_notification_level)
priority = priority.lower()
logger = logging.getLogger(
'quantum.openstack.common.notification.%s' %
message['event_type'])
getattr(logger, priority)(jsonutils.dumps(message))

View File

@ -0,0 +1,19 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
def notify(_context, message):
"""Notifies the recipient of the desired event given the model"""
pass

View File

@ -0,0 +1,46 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
from quantum.openstack.common import cfg
from quantum.openstack.common import context as req_context
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import log as logging
from quantum.openstack.common import rpc
LOG = logging.getLogger(__name__)
notification_topic_opt = cfg.ListOpt('notification_topics',
default=['notifications', ],
help='AMQP topic used for openstack notifications')
CONF = cfg.CONF
CONF.register_opt(notification_topic_opt)
def notify(context, message):
"""Sends a notification to the RabbitMQ"""
if not context:
context = req_context.get_admin_context()
priority = message.get('priority',
CONF.default_notification_level)
priority = priority.lower()
for topic in CONF.notification_topics:
topic = '%s.%s' % (topic, priority)
try:
rpc.notify(context, topic, message)
except Exception, e:
LOG.exception(_("Could not send notification to %(topic)s. "
"Payload=%(message)s"), locals())

View File

@ -0,0 +1,22 @@
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
NOTIFICATIONS = []
def notify(_context, message):
"""Test notifier, stores notifications in memory for unittests."""
NOTIFICATIONS.append(message)

View File

@ -21,6 +21,7 @@ import logging
import urllib import urllib
import urllib2 import urllib2
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import jsonutils from quantum.openstack.common import jsonutils

View File

@ -48,7 +48,8 @@ rpc_opts = [
'Only supported by impl_zmq.'), 'Only supported by impl_zmq.'),
cfg.ListOpt('allowed_rpc_exception_modules', cfg.ListOpt('allowed_rpc_exception_modules',
default=['quantum.openstack.common.exception', default=['quantum.openstack.common.exception',
'nova.exception'], 'nova.exception',
],
help='Modules of exceptions that are permitted to be recreated' help='Modules of exceptions that are permitted to be recreated'
'upon receiving exception data from an rpc call.'), 'upon receiving exception data from an rpc call.'),
cfg.StrOpt('control_exchange', cfg.StrOpt('control_exchange',

View File

@ -35,6 +35,7 @@ from eventlet import pools
from eventlet import semaphore from eventlet import semaphore
from quantum.openstack.common import excutils from quantum.openstack.common import excutils
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import local from quantum.openstack.common import local
from quantum.openstack.common.rpc import common as rpc_common from quantum.openstack.common.rpc import common as rpc_common

View File

@ -23,10 +23,10 @@ import sys
import traceback import traceback
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
from quantum.openstack.common import importutils from quantum.openstack.common import importutils
from quantum.openstack.common import jsonutils from quantum.openstack.common import jsonutils
from quantum.openstack.common import local from quantum.openstack.common import local
from quantum.openstack.common.gettextutils import _
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)

View File

@ -40,6 +40,45 @@ The conversion over to a versioned API must be done on both the client side and
server side of the API at the same time. However, as the code stands today, server side of the API at the same time. However, as the code stands today,
there can be both versioned and unversioned APIs implemented in the same code there can be both versioned and unversioned APIs implemented in the same code
base. base.
EXAMPLES:
Nova was the first project to use versioned rpc APIs. Consider the compute rpc
API as an example. The client side is in nova/compute/rpcapi.py and the server
side is in nova/compute/manager.py.
Example 1) Adding a new method.
Adding a new method is a backwards compatible change. It should be added to
nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to
X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should
have a specific version specified to indicate the minimum API version that must
be implemented for the method to be supported. For example:
def get_host_uptime(self, ctxt, host):
topic = _compute_topic(self.topic, ctxt, host, None)
return self.call(ctxt, self.make_msg('get_host_uptime'), topic,
version='1.1')
In this case, version '1.1' is the first version that supported the
get_host_uptime() method.
Example 2) Adding a new parameter.
Adding a new parameter to an rpc method can be made backwards compatible. The
RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped.
The implementation of the method must not expect the parameter to be present.
def some_remote_method(self, arg1, arg2, newarg=None):
# The code needs to deal with newarg=None for cases
# where an older client sends a message without it.
pass
On the client side, the same changes should be made as in example 1. The
minimum version that supports the new parameter should be specified.
""" """
from quantum.openstack.common.rpc import common as rpc_common from quantum.openstack.common.rpc import common as rpc_common

View File

@ -15,6 +15,7 @@
# under the License. # under the License.
import pprint import pprint
import socket
import string import string
import sys import sys
import types import types
@ -46,9 +47,12 @@ zmq_opts = [
'address.'), 'address.'),
# The module.Class to use for matchmaking. # The module.Class to use for matchmaking.
cfg.StrOpt('rpc_zmq_matchmaker', cfg.StrOpt(
default='quantum.openstack.common.rpc.matchmaker.' 'rpc_zmq_matchmaker',
'MatchMakerLocalhost', help='MatchMaker driver'), default=('quantum.openstack.common.rpc.'
'matchmaker.MatchMakerLocalhost'),
help='MatchMaker driver',
),
# The following port is unassigned by IANA as of 2012-05-21 # The following port is unassigned by IANA as of 2012-05-21
cfg.IntOpt('rpc_zmq_port', default=9501, cfg.IntOpt('rpc_zmq_port', default=9501,
@ -59,6 +63,10 @@ zmq_opts = [
cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack',
help='Directory for holding IPC sockets'), help='Directory for holding IPC sockets'),
cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(),
help='Name of this node. Must be a valid hostname, FQDN, or '
'IP address. Must match "host" option, if running Nova.')
] ]
@ -119,11 +127,12 @@ class ZmqSocket(object):
for f in do_sub: for f in do_sub:
self.subscribe(f) self.subscribe(f)
LOG.debug(_("Connecting to %{addr}s with %{type}s" str_data = {'addr': addr, 'type': self.socket_s(),
"\n-> Subscribed to %{subscribe}s" 'subscribe': subscribe, 'bind': bind}
"\n-> bind: %{bind}s"),
{'addr': addr, 'type': self.socket_s(), LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data)
'subscribe': subscribe, 'bind': bind}) LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data)
LOG.debug(_("-> bind: %(bind)s"), str_data)
try: try:
if bind: if bind:
@ -542,8 +551,7 @@ def _call(addr, context, msg_id, topic, msg, timeout=None):
msg_id = str(uuid.uuid4().hex) msg_id = str(uuid.uuid4().hex)
# Replies always come into the reply service. # Replies always come into the reply service.
# We require that FLAGS.host is a FQDN, IP, or resolvable hostname. reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host
reply_topic = "zmq_replies.%s" % FLAGS.host
LOG.debug(_("Creating payload")) LOG.debug(_("Creating payload"))
# Curry the original request into a reply method. # Curry the original request into a reply method.
@ -712,3 +720,6 @@ def register_opts(conf):
mm_impl = importutils.import_module(mm_module) mm_impl = importutils.import_module(mm_module)
mm_constructor = getattr(mm_impl, mm_class) mm_constructor = getattr(mm_impl, mm_class)
matchmaker = mm_constructor() matchmaker = mm_constructor()
register_opts(cfg.CONF)

View File

@ -24,6 +24,7 @@ import json
import logging import logging
from quantum.openstack.common import cfg from quantum.openstack.common import cfg
from quantum.openstack.common.gettextutils import _
matchmaker_opts = [ matchmaker_opts = [

View File

@ -0,0 +1,109 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2011 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Time related utilities and helper functions.
"""
import calendar
import datetime
import time
import iso8601
TIME_FORMAT = "%Y-%m-%dT%H:%M:%S"
PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f"
def isotime(at=None):
"""Stringify time in ISO 8601 format"""
if not at:
at = utcnow()
str = at.strftime(TIME_FORMAT)
tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC'
str += ('Z' if tz == 'UTC' else tz)
return str
def parse_isotime(timestr):
"""Parse time from ISO 8601 format"""
try:
return iso8601.parse_date(timestr)
except iso8601.ParseError as e:
raise ValueError(e.message)
except TypeError as e:
raise ValueError(e.message)
def strtime(at=None, fmt=PERFECT_TIME_FORMAT):
"""Returns formatted utcnow."""
if not at:
at = utcnow()
return at.strftime(fmt)
def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT):
"""Turn a formatted time back into a datetime."""
return datetime.datetime.strptime(timestr, fmt)
def normalize_time(timestamp):
"""Normalize time in arbitrary timezone to UTC"""
offset = timestamp.utcoffset()
return timestamp.replace(tzinfo=None) - offset if offset else timestamp
def is_older_than(before, seconds):
"""Return True if before is older than seconds."""
return utcnow() - before > datetime.timedelta(seconds=seconds)
def utcnow_ts():
"""Timestamp version of our utcnow function."""
return calendar.timegm(utcnow().timetuple())
def utcnow():
"""Overridable version of utils.utcnow."""
if utcnow.override_time:
return utcnow.override_time
return datetime.datetime.utcnow()
utcnow.override_time = None
def set_time_override(override_time=datetime.datetime.utcnow()):
"""Override utils.utcnow to return a constant time."""
utcnow.override_time = override_time
def advance_time_delta(timedelta):
"""Advance overriden time using a datetime.timedelta."""
assert(not utcnow.override_time is None)
utcnow.override_time += timedelta
def advance_time_seconds(seconds):
"""Advance overriden time by seconds."""
advance_time_delta(datetime.timedelta(0, seconds))
def clear_time_override():
"""Remove the overridden time."""
utcnow.override_time = None

View File

@ -98,6 +98,12 @@ function run_pep8 {
echo "Running pep8 ..." echo "Running pep8 ..."
PEP8_EXCLUDE="vcsversion.py,*.pyc" PEP8_EXCLUDE="vcsversion.py,*.pyc"
# TODO(gongysh) we should pep8 check openstack common files. But that project does
# not stick to latest pep8 version. Therefore we exclude these common files here.
# Pep8 does not support exclude in the format quantum/openstack/common,
# so I have to exclude some of openstack common files one by one. This also applies
# to tox.ini.
PEP8_EXCLUDE="$PEP8_EXCLUDE,log.py,notifier,rpc"
PEP8_OPTIONS="--exclude=$PEP8_EXCLUDE --repeat --show-source" PEP8_OPTIONS="--exclude=$PEP8_EXCLUDE --repeat --show-source"
PEP8_INCLUDE="bin/* quantum run_tests.py setup*.py" PEP8_INCLUDE="bin/* quantum run_tests.py setup*.py"
${wrapper} pep8 $PEP8_OPTIONS $PEP8_INCLUDE ${wrapper} pep8 $PEP8_OPTIONS $PEP8_INCLUDE

View File

@ -3,6 +3,7 @@ PasteDeploy==1.5.0
Routes>=1.12.3 Routes>=1.12.3
eventlet>=0.9.12 eventlet>=0.9.12
httplib2 httplib2
iso8601>=0.1.4
lxml lxml
netaddr netaddr
python-gflags==1.3 python-gflags==1.3

View File

@ -21,7 +21,7 @@ downloadcache = ~/cache/pip
[testenv:pep8] [testenv:pep8]
deps = pep8 deps = pep8
setuptools_git>=0.4 setuptools_git>=0.4
commands = pep8 --repeat --show-source --exclude=.venv,.tox,dist,doc,*egg . commands = pep8 --repeat --show-source --exclude=.venv,.tox,dist,doc,*egg,log.py,notifier,rpc .
[testenv:cover] [testenv:cover]
setenv = NOSE_WITH_COVERAGE=1 setenv = NOSE_WITH_COVERAGE=1