diff --git a/reddwarf/common/api.py b/reddwarf/common/api.py index 708c91ffa6..cfbf5becd1 100644 --- a/reddwarf/common/api.py +++ b/reddwarf/common/api.py @@ -14,6 +14,7 @@ import routes +from reddwarf.openstack.common import rpc from reddwarf.common import wsgi from reddwarf.versions import VersionsController from reddwarf.flavor.service import FlavorController diff --git a/reddwarf/common/config.py b/reddwarf/common/config.py index dd4b1de208..17693f87c1 100644 --- a/reddwarf/common/config.py +++ b/reddwarf/common/config.py @@ -91,7 +91,6 @@ class Config(object): return get_option(cls.instance, key, **kwargs) - def create_type_func(type): @classmethod def get(cls, key, default=None, **kwargs): @@ -105,3 +104,51 @@ Config.get_float = create_type_func('float') Config.get_int = create_type_func('int') Config.get_list = create_type_func('list') Config.get_str = create_type_func('str') +del create_type_func + + +class ConfigFacade(object): + """This class presents an interface usable by OpenStack Common modules. + + OpenStack common uses a new config interface where the values are + accessed as attributes directly. This presents the same interface + so we can interface with OS common modules while we change our config + stuff. + + """ + + value_info = {} + + def __init__(self, conf): + self.conf = conf + + def __getattr__(self, name): + if name == "register_opts": + def f(*args, **kwargs): + pass + return f + if name in self.value_info: + v = self.value_info[name] + return self.conf.get(name, **v) + return self.conf.get(name) + + +class OsCommonModule(object): + """Emulates the OpenStack Common cfg module.""" + + @property + def CONF(self): + return ConfigFacade(Config()) + + +def create_type_func(type): + @classmethod + def func(cls, name, default, help): + ConfigFacade.value_info[name] = {'default': default, 'type': type} + return func + +OsCommonModule.BoolOpt = create_type_func('bool') +OsCommonModule.IntOpt = create_type_func('int') +OsCommonModule.ListOpt = create_type_func('list') +OsCommonModule.StrOpt = create_type_func('str') +del create_type_func diff --git a/reddwarf/common/service.py b/reddwarf/common/service.py index 64920e8c4d..38f0f98bb2 100644 --- a/reddwarf/common/service.py +++ b/reddwarf/common/service.py @@ -22,12 +22,15 @@ import inspect import os import logging import socket +import traceback +import weakref import eventlet import greenlet +from eventlet import greenthread from reddwarf.common import config -from reddwarf import rpc +from reddwarf.openstack.common import rpc from reddwarf.common import utils from reddwarf import version @@ -87,6 +90,19 @@ class Service(object): self.saved_args, self.saved_kwargs = args, kwargs self.timers = [] + def dispatch(self, ctxt, version, method, **kwargs): + """Handles incoming RPC messages.""" + #TODO(tim.simpson): Maybe in the future actually account for the + # version somehow with multiple managers or by + # sending the manager in or something. 
+ if not version: + version = '1.0' + + if version != self.manager.RPC_API_VERSION: + raise UnsupportedRpcVersion(version=version) + + return self.manager.wrapper(method, ctxt, **kwargs) + def periodic_tasks(self, raise_on_error=False): """Tasks to be run at a periodic interval.""" self.manager.periodic_tasks(raise_on_error=raise_on_error) @@ -94,11 +110,6 @@ class Service(object): def report_state(self): pass - def __getattr__(self, key): - """This method proxy's the calls to the manager implementation""" - manager = self.__dict__.get('manager', None) - return functools.partial(manager._wrapper, key) - def start(self): vcs_string = version.version_string_with_vcs() LOG.info(_('Starting %(topic)s node (version %(vcs_string)s)'), @@ -169,11 +180,12 @@ class Service(object): class Manager(object): def __init__(self, host=None): self.host = host + self.tasks = weakref.WeakKeyDictionary() super(Manager, self).__init__() def periodic_tasks(self, raise_on_error=False): """Tasks to be run at a periodic interval.""" - pass + LOG.debug("No. of running tasks: %d" % len(self.tasks)) def init_host(self): """Handle initialization if this is a standalone service. @@ -183,10 +195,29 @@ class Manager(object): """ pass - def _wrapper(self, method, context, *args, **kwargs): - """Wraps the called functions with additional information.""" - func = getattr(self, method) - return func(context, *args, **kwargs) + #TODO(tim.simpson): Rename this to "execute" or something clearer. + def wrapper(self, method, context, *args, **kwargs): + """Maps the respective manager method with a task counter.""" + # TODO(rnirmal): Just adding a basic counter. Will revist and + # re-implement when we have actual tasks. + self.tasks[greenthread.getcurrent()] = context + try: + if not hasattr(self, method): + raise AttributeError("No such RPC function '%s'" % method) + func = getattr(self, method) + LOG.info(str('*' * 80)) + LOG.info("Running method %s..." % method) + LOG.info(str('*' * 80)) + result = func(context, *args, **kwargs) + LOG.info("Finished method %s." % method) + return result + except Exception as e: + LOG.error("Got an error running %s!" % method) + LOG.error(traceback.format_exc()) + finally: + LOG.info(str('-' * 80)) + del self.tasks[greenthread.getcurrent()] + _launcher = None diff --git a/reddwarf/guestagent/api.py b/reddwarf/guestagent/api.py index 11a932d4a6..18dd19d203 100644 --- a/reddwarf/guestagent/api.py +++ b/reddwarf/guestagent/api.py @@ -23,7 +23,7 @@ import logging from eventlet import Timeout -from reddwarf import rpc +from reddwarf.openstack.common import rpc from reddwarf.common import config from reddwarf.common import exception from reddwarf.common import utils diff --git a/reddwarf/openstack/common/excutils.py b/reddwarf/openstack/common/excutils.py new file mode 100644 index 0000000000..67c9fa9511 --- /dev/null +++ b/reddwarf/openstack/common/excutils.py @@ -0,0 +1,49 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# Copyright 2012, Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Exception related utilities. +""" + +import contextlib +import logging +import sys +import traceback + + +@contextlib.contextmanager +def save_and_reraise_exception(): + """Save current exception, run some code and then re-raise. + + In some cases the exception context can be cleared, resulting in None + being attempted to be reraised after an exception handler is run. This + can happen when eventlet switches greenthreads or when running an + exception handler, code raises and catches an exception. In both + cases the exception context will be cleared. + + To work around this, we save the exception state, run handler code, and + then re-raise the original exception. If another exception occurs, the + saved exception is logged and the new exception is reraised. + """ + type_, value, tb = sys.exc_info() + try: + yield + except Exception: + logging.error('Original exception being dropped: %s' % + (traceback.format_exception(type_, value, tb))) + raise + raise type_, value, tb diff --git a/reddwarf/openstack/common/importutils.py b/reddwarf/openstack/common/importutils.py new file mode 100644 index 0000000000..2fbb0291a0 --- /dev/null +++ b/reddwarf/openstack/common/importutils.py @@ -0,0 +1,59 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Import related utilities and helper functions. +""" + +import sys +import traceback + + +def import_class(import_str): + """Returns a class from a string including module and class""" + mod_str, _sep, class_str = import_str.rpartition('.') + try: + __import__(mod_str) + return getattr(sys.modules[mod_str], class_str) + except (ImportError, ValueError, AttributeError), exc: + raise ImportError('Class %s cannot be found (%s)' % + (class_str, + traceback.format_exception(*sys.exc_info()))) + + +def import_object(import_str, *args, **kwargs): + """Import a class and return an instance of it.""" + return import_class(import_str)(*args, **kwargs) + + +def import_object_ns(name_space, import_str, *args, **kwargs): + """ + Import a class and return an instance of it, first by trying + to find the class in a default namespace, then failing back to + a full path if not found in the default namespace. 
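    For example (an illustrative sketch using standard-library names, which
    are not part of this module):

        import_class('json.JSONEncoder')          # returns the class itself
        import_object('Queue.Queue')              # returns a new Queue instance
        import_object_ns('json', 'JSONEncoder')   # resolves 'json.JSONEncoder'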
+ """ + import_value = "%s.%s" % (name_space, import_str) + try: + return import_class(import_value)(*args, **kwargs) + except ImportError: + return import_class(import_str)(*args, **kwargs) + + +def import_module(import_str): + """Import a module.""" + __import__(import_str) + return sys.modules[import_str] diff --git a/reddwarf/openstack/common/jsonutils.py b/reddwarf/openstack/common/jsonutils.py new file mode 100644 index 0000000000..7379d4d4a3 --- /dev/null +++ b/reddwarf/openstack/common/jsonutils.py @@ -0,0 +1,148 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# Copyright 2011 Justin Santa Barbara +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +''' +JSON related utilities. + +This module provides a few things: + + 1) A handy function for getting an object down to something that can be + JSON serialized. See to_primitive(). + + 2) Wrappers around loads() and dumps(). The dumps() wrapper will + automatically use to_primitive() for you if needed. + + 3) This sets up anyjson to use the loads() and dumps() wrappers if anyjson + is available. +''' + + +import datetime +import inspect +import itertools +import json +import xmlrpclib + +from reddwarf.openstack.common import timeutils + + +def to_primitive(value, convert_instances=False, level=0): + """Convert a complex object into primitives. + + Handy for JSON serialization. We can optionally handle instances, + but since this is a recursive function, we could have cyclical + data structures. + + To handle cyclical data structures we could track the actual objects + visited in a set, but not all objects are hashable. Instead we just + track the depth of the object inspections and don't go too deep. + + Therefore, convert_instances=True is lossy ... be aware. + + """ + nasty = [inspect.ismodule, inspect.isclass, inspect.ismethod, + inspect.isfunction, inspect.isgeneratorfunction, + inspect.isgenerator, inspect.istraceback, inspect.isframe, + inspect.iscode, inspect.isbuiltin, inspect.isroutine, + inspect.isabstract] + for test in nasty: + if test(value): + return unicode(value) + + # value of itertools.count doesn't get caught by inspects + # above and results in infinite loop when list(value) is called. + if type(value) == itertools.count: + return unicode(value) + + # FIXME(vish): Workaround for LP bug 852095. Without this workaround, + # tests that raise an exception in a mocked method that + # has a @wrap_exception with a notifier will fail. If + # we up the dependency to 0.5.4 (when it is released) we + # can remove this workaround. + if getattr(value, '__module__', None) == 'mox': + return 'mock' + + if level > 3: + return '?' + + # The try block may not be necessary after the class check above, + # but just in case ... 
+ try: + # It's not clear why xmlrpclib created their own DateTime type, but + # for our purposes, make it a datetime type which is explicitly + # handled + if isinstance(value, xmlrpclib.DateTime): + value = datetime.datetime(*tuple(value.timetuple())[:6]) + + if isinstance(value, (list, tuple)): + o = [] + for v in value: + o.append(to_primitive(v, convert_instances=convert_instances, + level=level)) + return o + elif isinstance(value, dict): + o = {} + for k, v in value.iteritems(): + o[k] = to_primitive(v, convert_instances=convert_instances, + level=level) + return o + elif isinstance(value, datetime.datetime): + return timeutils.strtime(value) + elif hasattr(value, 'iteritems'): + return to_primitive(dict(value.iteritems()), + convert_instances=convert_instances, + level=level + 1) + elif hasattr(value, '__iter__'): + return to_primitive(list(value), + convert_instances=convert_instances, + level=level) + elif convert_instances and hasattr(value, '__dict__'): + # Likely an instance of something. Watch for cycles. + # Ignore class member vars. + return to_primitive(value.__dict__, + convert_instances=convert_instances, + level=level + 1) + else: + return value + except TypeError, e: + # Class objects are tricky since they may define something like + # __iter__ defined but it isn't callable as list(). + return unicode(value) + + +def dumps(value, default=to_primitive, **kwargs): + return json.dumps(value, default=default, **kwargs) + + +def loads(s): + return json.loads(s) + + +def load(s): + return json.load(s) + + +try: + import anyjson +except ImportError: + pass +else: + anyjson._modules.append((__name__, 'dumps', TypeError, + 'loads', ValueError, 'load')) + anyjson.force_implementation(__name__) diff --git a/reddwarf/openstack/common/local.py b/reddwarf/openstack/common/local.py new file mode 100644 index 0000000000..19d962732c --- /dev/null +++ b/reddwarf/openstack/common/local.py @@ -0,0 +1,37 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +"""Greenthread local storage of variables using weak references""" + +import weakref + +from eventlet import corolocal + + +class WeakLocal(corolocal.local): + def __getattribute__(self, attr): + rval = corolocal.local.__getattribute__(self, attr) + if rval: + rval = rval() + return rval + + def __setattr__(self, attr, value): + value = weakref.ref(value) + return corolocal.local.__setattr__(self, attr, value) + + +store = WeakLocal() diff --git a/reddwarf/rpc/__init__.py b/reddwarf/openstack/common/rpc/__init__.py similarity index 59% rename from reddwarf/rpc/__init__.py rename to reddwarf/openstack/common/rpc/__init__.py index 4faf49f2aa..fd2e3279e4 100644 --- a/reddwarf/rpc/__init__.py +++ b/reddwarf/openstack/common/rpc/__init__.py @@ -17,14 +17,55 @@ # License for the specific language governing permissions and limitations # under the License. -import logging +""" +A remote procedure call (rpc) abstraction. 
-from reddwarf.common import utils -from reddwarf.common import config +For some wrappers that add message versioning to rpc, see: + rpc.dispatcher + rpc.proxy +""" -LOG = logging.getLogger(__name__) +#TODO(tim.simpson): Doing this as we aren't yet using the real cfg module. +from reddwarf.common.config import OsCommonModule +cfg = OsCommonModule() -rpc_backend_opt = config.Config.get('rpc_backend', 'reddwarf.rpc.impl_kombu') + +#from openstack.common import cfg +from reddwarf.openstack.common import importutils + + +rpc_opts = [ + cfg.StrOpt('rpc_backend', + default='%s.impl_kombu' % __package__, + help="The messaging module to use, defaults to kombu."), + cfg.IntOpt('rpc_thread_pool_size', + default=64, + help='Size of RPC thread pool'), + cfg.IntOpt('rpc_conn_pool_size', + default=30, + help='Size of RPC connection pool'), + cfg.IntOpt('rpc_response_timeout', + default=60, + help='Seconds to wait for a response from call or multicall'), + cfg.IntOpt('rpc_cast_timeout', + default=30, + help='Seconds to wait before a cast expires (TTL). ' + 'Only supported by impl_zmq.'), + cfg.ListOpt('allowed_rpc_exception_modules', + default=['openstack.common.exception', + 'nova.exception', + ], + help='Modules of exceptions that are permitted to be recreated' + 'upon receiving exception data from an rpc call.'), + cfg.StrOpt('control_exchange', + default='nova', + help='AMQP exchange to connect to if using RabbitMQ or Qpid'), + cfg.BoolOpt('fake_rabbit', + default=False, + help='If passed, use a fake RabbitMQ provider'), +] + +cfg.CONF.register_opts(rpc_opts) def create_connection(new=True): @@ -38,9 +79,9 @@ def create_connection(new=True): implementation is free to return an existing connection from a pool. - :returns: An instance of nova.rpc.common.Connection + :returns: An instance of openstack.common.rpc.common.Connection """ - return _get_impl().create_connection(new=new) + return _get_impl().create_connection(cfg.CONF, new=new) def call(context, topic, msg, timeout=None): @@ -50,8 +91,9 @@ def call(context, topic, msg, timeout=None): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - nova.rpc.common.Connection.create_consumer() and only applies - when the consumer was created with fanout=False. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. @@ -59,10 +101,10 @@ def call(context, topic, msg, timeout=None): :returns: A dict from the remote method. - :raises: nova.rpc.common.Timeout if a complete response is not received - before the timeout is reached. + :raises: openstack.common.rpc.common.Timeout if a complete response + is not received before the timeout is reached. """ - return _get_impl().call(context, topic, msg, timeout) + return _get_impl().call(cfg.CONF, context, topic, msg, timeout) def cast(context, topic, msg): @@ -72,14 +114,15 @@ def cast(context, topic, msg): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - nova.rpc.common.Connection.create_consumer() and only applies - when the consumer was created with fanout=False. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. 
:param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } :returns: None """ - return _get_impl().cast(context, topic, msg) + return _get_impl().cast(cfg.CONF, context, topic, msg) def cast_with_consumer(context, topic, msg): @@ -96,12 +139,12 @@ def cast_with_consumer(context, topic, msg): :returns: None """ - return _get_impl().cast_with_consumer(context, topic, msg) + return _get_impl().cast_with_consumer(cfg.CONF, context, topic, msg) def delete_queue(context, topic): """Deletes the queue.""" - return _get_impl().delete_queue(context, topic) + return _get_impl().delete_queue(cfg.CONF, context, topic) def fanout_cast(context, topic, msg): @@ -114,14 +157,15 @@ def fanout_cast(context, topic, msg): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - nova.rpc.common.Connection.create_consumer() and only applies - when the consumer was created with fanout=True. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=True. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } :returns: None """ - return _get_impl().fanout_cast(context, topic, msg) + return _get_impl().fanout_cast(cfg.CONF, context, topic, msg) def multicall(context, topic, msg, timeout=None): @@ -135,8 +179,9 @@ def multicall(context, topic, msg, timeout=None): request. :param topic: The topic to send the rpc message to. This correlates to the topic argument of - nova.rpc.common.Connection.create_consumer() and only applies - when the consumer was created with fanout=False. + openstack.common.rpc.common.Connection.create_consumer() + and only applies when the consumer was created with + fanout=False. :param msg: This is a dict in the form { "method" : "method_to_invoke", "args" : dict_of_kwargs } :param timeout: int, number of seconds to use for a response timeout. @@ -147,10 +192,10 @@ def multicall(context, topic, msg, timeout=None): returned and X is the Nth value that was returned by the remote method. - :raises: nova.rpc.common.Timeout if a complete response is not received - before the timeout is reached. + :raises: openstack.common.rpc.common.Timeout if a complete response + is not received before the timeout is reached. """ - return _get_impl().multicall(context, topic, msg, timeout) + return _get_impl().multicall(cfg.CONF, context, topic, msg, timeout) def notify(context, topic, msg): @@ -163,7 +208,7 @@ def notify(context, topic, msg): :returns: None """ - return _get_impl().notify(context, topic, msg) + return _get_impl().notify(cfg.CONF, context, topic, msg) def cleanup(): @@ -191,7 +236,8 @@ def cast_to_server(context, server_params, topic, msg): :returns: None """ - return _get_impl().cast_to_server(context, server_params, topic, msg) + return _get_impl().cast_to_server(cfg.CONF, context, server_params, topic, + msg) def fanout_cast_to_server(context, server_params, topic, msg): @@ -206,16 +252,40 @@ def fanout_cast_to_server(context, server_params, topic, msg): :returns: None """ - return _get_impl().fanout_cast_to_server(context, server_params, topic, - msg) + return _get_impl().fanout_cast_to_server(cfg.CONF, context, server_params, + topic, msg) + + +def queue_get_for(context, topic, host): + """Get a queue name for a given topic + host. + + This function only works if this naming convention is followed on the + consumer side, as well. 
For example, in nova, every instance of the + nova-foo service calls create_consumer() for two topics: + + foo + foo. + + Messages sent to the 'foo' topic are distributed to exactly one instance of + the nova-foo service. The services are chosen in a round-robin fashion. + Messages sent to the 'foo.' topic are sent to the nova-foo service on + . + """ + return '%s.%s' % (topic, host) _RPCIMPL = None def _get_impl(): - """Delay import of rpc_backend until FLAGS are loaded.""" + """Delay import of rpc_backend until configuration is loaded.""" global _RPCIMPL if _RPCIMPL is None: - _RPCIMPL = utils.import_object(rpc_backend_opt) + try: + _RPCIMPL = importutils.import_module(cfg.CONF.rpc_backend) + except ImportError: + # For backwards compatibility with older nova config. + impl = cfg.CONF.rpc_backend.replace('nova.rpc', + 'nova.openstack.common.rpc') + _RPCIMPL = importutils.import_module(impl) return _RPCIMPL diff --git a/reddwarf/rpc/amqp.py b/reddwarf/openstack/common/rpc/amqp.py similarity index 63% rename from reddwarf/rpc/amqp.py rename to reddwarf/openstack/common/rpc/amqp.py index 0b3ee5a3de..590c071d9f 100644 --- a/reddwarf/rpc/amqp.py +++ b/reddwarf/openstack/common/rpc/amqp.py @@ -18,7 +18,7 @@ # under the License. """ -Shared code between AMQP based nova.rpc implementations. +Shared code between AMQP based openstack.common.rpc implementations. The code in this module is shared between the rpc implemenations based on AMQP. Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses @@ -28,17 +28,18 @@ AMQP, but is deprecated and predates this code. import inspect import logging import sys -import traceback import uuid from eventlet import greenpool from eventlet import pools +from eventlet import semaphore -from reddwarf.common import config -from reddwarf.common import exception -from reddwarf.common import local -import reddwarf.rpc.common as rpc_common -from reddwarf.common import context +from reddwarf.openstack.common import excutils +#TODO(tim.simpson): Import the true version of Mr. Underscore. +#from reddwarf.openstack.common.gettextutils import _ + +from reddwarf.openstack.common import local +from reddwarf.openstack.common.rpc import common as rpc_common LOG = logging.getLogger(__name__) @@ -46,43 +47,57 @@ LOG = logging.getLogger(__name__) class Pool(pools.Pool): """Class that implements a Pool of Connections.""" - def __init__(self, *args, **kwargs): - self.connection_cls = kwargs.pop("connection_cls", None) - kwargs.setdefault("max_size", - config.Config.get('rpc_conn_pool_size', 30)) + def __init__(self, conf, connection_cls, *args, **kwargs): + self.connection_cls = connection_cls + self.conf = conf + kwargs.setdefault("max_size", self.conf.rpc_conn_pool_size) kwargs.setdefault("order_as_stack", True) super(Pool, self).__init__(*args, **kwargs) # TODO(comstud): Timeout connections not used in a while def create(self): - LOG.debug(_('Pool creating new connection')) - return self.connection_cls() + LOG.debug('Pool creating new connection') + return self.connection_cls(self.conf) def empty(self): while self.free_items: self.get().close() +_pool_create_sem = semaphore.Semaphore() + + +def get_connection_pool(conf, connection_cls): + with _pool_create_sem: + # Make sure only one thread tries to create the connection pool. 
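        # (doing the check while the semaphore is held means two greenthreads
        # racing on a cold pool cannot both create a Pool instance)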
+ if not connection_cls.pool: + connection_cls.pool = Pool(conf, connection_cls) + return connection_cls.pool + + class ConnectionContext(rpc_common.Connection): """The class that is actually returned to the caller of - create_connection(). This is a essentially a wrapper around - Connection that supports 'with' and can return a new Connection or - one from a pool. It will also catch when an instance of this class - is to be deleted so that we can return Connections to the pool on - exceptions and so forth without making the caller be responsible for - catching all exceptions and making sure to return a connection to - the pool. + create_connection(). This is essentially a wrapper around + Connection that supports 'with'. It can also return a new + Connection, or one from a pool. The function will also catch + when an instance of this class is to be deleted. With that + we can return Connections to the pool on exceptions and so + forth without making the caller be responsible for catching + them. If possible the function makes sure to return a + connection to the pool. """ - def __init__(self, connection_pool, pooled=True, server_params=None): + def __init__(self, conf, connection_pool, pooled=True, server_params=None): """Create a new connection, or get one from the pool""" self.connection = None + self.conf = conf self.connection_pool = connection_pool if pooled: self.connection = connection_pool.get() else: self.connection = connection_pool.connection_cls( - server_params=server_params) + conf, + server_params=server_params) self.pooled = pooled def __enter__(self): @@ -121,6 +136,9 @@ class ConnectionContext(rpc_common.Connection): def create_consumer(self, topic, proxy, fanout=False): self.connection.create_consumer(topic, proxy, fanout) + def create_worker(self, topic, proxy, pool_name): + self.connection.create_worker(topic, proxy, pool_name) + def consume_in_thread(self): self.connection.consume_in_thread() @@ -129,50 +147,54 @@ class ConnectionContext(rpc_common.Connection): if self.connection: return getattr(self.connection, key) else: - raise exception.InvalidRPCConnectionReuse() + raise rpc_common.InvalidRPCConnectionReuse() -def msg_reply(msg_id, connection_pool, reply=None, failure=None, ending=False): +def msg_reply(conf, msg_id, connection_pool, reply=None, failure=None, + ending=False): """Sends a reply or an error on the channel signified by msg_id. Failure should be a sys.exc_info() tuple. 
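    For example, a consumer-side sketch (msg_id, pool and do_work() here are
    assumed to come from the incoming message and the running service):

        try:
            result = do_work()
            msg_reply(conf, msg_id, pool, reply=result, ending=True)
        except Exception:
            msg_reply(conf, msg_id, pool, failure=sys.exc_info(), ending=True)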
""" - with ConnectionContext(connection_pool) as conn: + with ConnectionContext(conf, connection_pool) as conn: if failure: - message = str(failure[1]) - tb = traceback.format_exception(*failure) - LOG.error(_("Returning exception %s to caller"), message) - LOG.error(tb) - failure = (failure[0].__name__, str(failure[1]), tb) + failure = rpc_common.serialize_remote_exception(failure) try: msg = {'result': reply, 'failure': failure} except TypeError: msg = {'result': dict((k, repr(v)) - for k, v in reply.__dict__.iteritems()), - 'failure': failure} + for k, v in reply.__dict__.iteritems()), + 'failure': failure} if ending: msg['ending'] = True conn.direct_send(msg_id, msg) -class RpcContext(context.ReddwarfContext): +class RpcContext(rpc_common.CommonRpcContext): """Context that supports replying to a rpc.call""" - def __init__(self, *args, **kwargs): + def __init__(self, **kwargs): self.msg_id = kwargs.pop('msg_id', None) - super(RpcContext, self).__init__(*args, **kwargs) + self.conf = kwargs.pop('conf') + super(RpcContext, self).__init__(**kwargs) + + def deepcopy(self): + values = self.to_dict() + values['conf'] = self.conf + values['msg_id'] = self.msg_id + return self.__class__(**values) def reply(self, reply=None, failure=None, ending=False, connection_pool=None): if self.msg_id: - msg_reply(self.msg_id, connection_pool, reply, failure, + msg_reply(self.conf, self.msg_id, connection_pool, reply, failure, ending) if ending: self.msg_id = None -def unpack_context(msg): +def unpack_context(conf, msg): """Unpack context from msg.""" context_dict = {} for key in list(msg.keys()): @@ -183,8 +205,9 @@ def unpack_context(msg): value = msg.pop(key) context_dict[key[9:]] = value context_dict['msg_id'] = msg.pop('_msg_id', None) + context_dict['conf'] = conf ctx = RpcContext.from_dict(context_dict) - LOG.debug(_('unpacked context: %s'), ctx.to_dict()) + rpc_common._safe_log(LOG.debug, _('unpacked context: %s'), ctx.to_dict()) return ctx @@ -205,11 +228,11 @@ def pack_context(msg, context): class ProxyCallback(object): """Calls methods on a proxy object based on method and args.""" - def __init__(self, proxy, connection_pool): + def __init__(self, conf, proxy, connection_pool): self.proxy = proxy - self.pool = greenpool.GreenPool( - config.Config.get('rpc_thread_pool_size', 1024)) + self.pool = greenpool.GreenPool(conf.rpc_thread_pool_size) self.connection_pool = connection_pool + self.conf = conf def __call__(self, message_data): """Consumer callback to call a method on a proxy object. @@ -229,27 +252,29 @@ class ProxyCallback(object): if hasattr(local.store, 'context'): del local.store.context rpc_common._safe_log(LOG.debug, _('received %s'), message_data) - ctxt = unpack_context(message_data) + ctxt = unpack_context(self.conf, message_data) method = message_data.get('method') args = message_data.get('args', {}) + version = message_data.get('version', None) if not method: LOG.warn(_('no method for message: %s') % message_data) ctxt.reply(_('No method for message: %s') % message_data, connection_pool=self.connection_pool) return - self.pool.spawn_n(self._process_data, ctxt, method, args) + self.pool.spawn_n(self._process_data, ctxt, version, method, args) - @exception.wrap_exception - def _process_data(self, ctxt, method, args): - """Thread that magically looks for a method on the proxy - object and calls it. + def _process_data(self, ctxt, version, method, args): + """Process a message in a new thread. 
+ + If the proxy object we have has a dispatch method + (see rpc.dispatcher.RpcDispatcher), pass it the version, + method, and args and let it dispatch as appropriate. If not, use + the old behavior of magically calling the specified method on the + proxy we have here. """ - + ctxt.update_store() try: - node_func = getattr(self.proxy, str(method)) - node_args = dict((str(k), v) for k, v in args.iteritems()) - # NOTE(vish): magic is fun! - rval = node_func(context=ctxt, **node_args) + rval = self.proxy.dispatch(ctxt, version, method, **args) # Check if the result was a generator if inspect.isgenerator(rval): for x in rval: @@ -259,20 +284,20 @@ class ProxyCallback(object): # This final None tells multicall that it is done. ctxt.reply(ending=True, connection_pool=self.connection_pool) except Exception as e: - LOG.exception(_('Exception during message handling')) + LOG.exception('Exception during message handling') ctxt.reply(None, sys.exc_info(), connection_pool=self.connection_pool) - return class MulticallWaiter(object): - def __init__(self, connection, timeout): + def __init__(self, conf, connection, timeout): self._connection = connection - timeout = timeout or config.Config.get('rpc_response_timeout', 3600) - self._iterator = connection.iterconsume(timeout) + self._iterator = connection.iterconsume(timeout=timeout or + conf.rpc_response_timeout) self._result = None self._done = False self._got_ending = False + self._conf = conf def done(self): if self._done: @@ -285,7 +310,10 @@ class MulticallWaiter(object): def __call__(self, data): """The consume() callback will call this. Store the result.""" if data['failure']: - self._result = rpc_common.RemoteError(*data['failure']) + failure = data['failure'] + self._result = rpc_common.deserialize_remote_exception(self._conf, + failure) + elif data.get('ending', False): self._got_ending = True else: @@ -296,7 +324,11 @@ class MulticallWaiter(object): if self._done: raise StopIteration while True: - self._iterator.next() + try: + self._iterator.next() + except Exception: + with excutils.save_and_reraise_exception(): + self.done() if self._got_ending: self.done() raise StopIteration @@ -307,12 +339,12 @@ class MulticallWaiter(object): yield result -def create_connection(new, connection_pool): +def create_connection(conf, new, connection_pool): """Create a connection""" - return ConnectionContext(connection_pool, pooled=not new) + return ConnectionContext(conf, connection_pool, pooled=not new) -def multicall(context, topic, msg, timeout, connection_pool): +def multicall(conf, context, topic, msg, timeout, connection_pool): """Make a call that returns multiple times.""" # Can't use 'with' for multicall, as it returns an iterator # that will continue to use the connection. 
When it's done, @@ -324,16 +356,16 @@ def multicall(context, topic, msg, timeout, connection_pool): LOG.debug(_('MSG_ID is %s') % (msg_id)) pack_context(msg, context) - conn = ConnectionContext(connection_pool) - wait_msg = MulticallWaiter(conn, timeout) + conn = ConnectionContext(conf, connection_pool) + wait_msg = MulticallWaiter(conf, conn, timeout) conn.declare_direct_consumer(msg_id, wait_msg) conn.topic_send(topic, msg) return wait_msg -def call(context, topic, msg, timeout, connection_pool): +def call(conf, context, topic, msg, timeout, connection_pool): """Sends a message on a topic and wait for a response.""" - rv = multicall(context, topic, msg, timeout, connection_pool) + rv = multicall(conf, context, topic, msg, timeout, connection_pool) # NOTE(vish): return the last result from the multicall rv = list(rv) if not rv: @@ -341,55 +373,57 @@ def call(context, topic, msg, timeout, connection_pool): return rv[-1] -def cast(context, topic, msg, connection_pool): +def cast(conf, context, topic, msg, connection_pool): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) pack_context(msg, context) - with ConnectionContext(connection_pool) as conn: + with ConnectionContext(conf, connection_pool) as conn: conn.topic_send(topic, msg) -def cast_with_consumer(context, topic, msg, connection_pool): +def cast_with_consumer(conf, context, topic, msg, connection_pool): """Sends a message on a topic without waiting for a response.""" LOG.debug(_('Making asynchronous cast on %s...'), topic) pack_context(msg, context) - with ConnectionContext(connection_pool) as conn: + with ConnectionContext(conf, connection_pool) as conn: consumer = conn.declare_topic_consumer(topic=topic) conn.topic_send(topic, msg) -def fanout_cast(context, topic, msg, connection_pool): +def fanout_cast(conf, context, topic, msg, connection_pool): """Sends a message on a fanout exchange without waiting for a response.""" LOG.debug(_('Making asynchronous fanout cast...')) pack_context(msg, context) - with ConnectionContext(connection_pool) as conn: + with ConnectionContext(conf, connection_pool) as conn: conn.fanout_send(topic, msg) -def cast_to_server(context, server_params, topic, msg, connection_pool): +def cast_to_server(conf, context, server_params, topic, msg, connection_pool): """Sends a message on a topic to a specific server.""" pack_context(msg, context) - with ConnectionContext(connection_pool, pooled=False, - server_params=server_params) as conn: + with ConnectionContext(conf, connection_pool, pooled=False, + server_params=server_params) as conn: conn.topic_send(topic, msg) -def fanout_cast_to_server(context, server_params, topic, msg, - connection_pool): +def fanout_cast_to_server(conf, context, server_params, topic, msg, + connection_pool): """Sends a message on a fanout exchange to a specific server.""" pack_context(msg, context) - with ConnectionContext(connection_pool, pooled=False, - server_params=server_params) as conn: + with ConnectionContext(conf, connection_pool, pooled=False, + server_params=server_params) as conn: conn.fanout_send(topic, msg) -def notify(context, topic, msg, connection_pool): +def notify(conf, context, topic, msg, connection_pool): """Sends a notification event on a topic.""" - LOG.debug(_('Sending notification on %s...'), topic) + event_type = msg.get('event_type') + LOG.debug(_('Sending %(event_type)s on %(topic)s'), locals()) pack_context(msg, context) - with ConnectionContext(connection_pool) as conn: + with 
ConnectionContext(conf, connection_pool) as conn: conn.notify_send(topic, msg) def cleanup(connection_pool): - connection_pool.empty() + if connection_pool: + connection_pool.empty() diff --git a/reddwarf/openstack/common/rpc/common.py b/reddwarf/openstack/common/rpc/common.py new file mode 100644 index 0000000000..5fde45afb7 --- /dev/null +++ b/reddwarf/openstack/common/rpc/common.py @@ -0,0 +1,321 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2010 United States Government as represented by the +# Administrator of the National Aeronautics and Space Administration. +# All Rights Reserved. +# Copyright 2011 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import copy +import logging +import sys +import traceback + +#from reddwarf.openstack.common import cfg +#TODO(tim.simpson): Doing this as we aren't yet using the real cfg module. +from reddwarf.common.config import OsCommonModule +cfg = OsCommonModule() + + +#TODO(tim.simpson): Import the true version of Mr. Underscore. +#from reddwarf.openstack.common.gettextutils import _ +from reddwarf.openstack.common import importutils +from reddwarf.openstack.common import jsonutils +from reddwarf.openstack.common import local + + +LOG = logging.getLogger(__name__) + + +class RPCException(Exception): + message = _("An unknown RPC related exception occurred.") + + def __init__(self, message=None, **kwargs): + self.kwargs = kwargs + + if not message: + try: + message = self.message % kwargs + + except Exception as e: + # kwargs doesn't match a variable in the message + # log the issue and the kwargs + LOG.exception(_('Exception in string format operation')) + for name, value in kwargs.iteritems(): + LOG.error("%s: %s" % (name, value)) + # at least get the core message out if something happened + message = self.message + + super(RPCException, self).__init__(message) + + +class RemoteError(RPCException): + """Signifies that a remote class has raised an exception. + + Contains a string representation of the type of the original exception, + the value of the original exception, and the traceback. These are + sent to the parent as a joined string so printing the exception + contains all of the relevant info. + + """ + message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") + + def __init__(self, exc_type=None, value=None, traceback=None): + self.exc_type = exc_type + self.value = value + self.traceback = traceback + super(RemoteError, self).__init__(exc_type=exc_type, + value=value, + traceback=traceback) + + +class Timeout(RPCException): + """Signifies that a timeout has occurred. + + This exception is raised if the rpc_response_timeout is reached while + waiting for a response from the remote side. 
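    For example, a caller can guard a call like this (rpc, rpc_common, LOG and
    the topic below are the caller's own names, shown only as a sketch):

        try:
            result = rpc.call(context, 'guestagent.some-host', msg, timeout=10)
        except rpc_common.Timeout:
            LOG.error('No RPC response received within 10 seconds')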
+ """ + message = _("Timeout while waiting on RPC response.") + + +class InvalidRPCConnectionReuse(RPCException): + message = _("Invalid reuse of an RPC connection.") + + +class UnsupportedRpcVersion(RPCException): + message = _("Specified RPC version, %(version)s, not supported by " + "this endpoint.") + + +class Connection(object): + """A connection, returned by rpc.create_connection(). + + This class represents a connection to the message bus used for rpc. + An instance of this class should never be created by users of the rpc API. + Use rpc.create_connection() instead. + """ + def close(self): + """Close the connection. + + This method must be called when the connection will no longer be used. + It will ensure that any resources associated with the connection, such + as a network connection, and cleaned up. + """ + raise NotImplementedError() + + def create_consumer(self, conf, topic, proxy, fanout=False): + """Create a consumer on this connection. + + A consumer is associated with a message queue on the backend message + bus. The consumer will read messages from the queue, unpack them, and + dispatch them to the proxy object. The contents of the message pulled + off of the queue will determine which method gets called on the proxy + object. + + :param conf: An openstack.common.cfg configuration object. + :param topic: This is a name associated with what to consume from. + Multiple instances of a service may consume from the same + topic. For example, all instances of nova-compute consume + from a queue called "compute". In that case, the + messages will get distributed amongst the consumers in a + round-robin fashion if fanout=False. If fanout=True, + every consumer associated with this topic will get a + copy of every message. + :param proxy: The object that will handle all incoming messages. + :param fanout: Whether or not this is a fanout topic. See the + documentation for the topic parameter for some + additional comments on this. + """ + raise NotImplementedError() + + def create_worker(self, conf, topic, proxy, pool_name): + """Create a worker on this connection. + + A worker is like a regular consumer of messages directed to a + topic, except that it is part of a set of such consumers (the + "pool") which may run in parallel. Every pool of workers will + receive a given message, but only one worker in the pool will + be asked to process it. Load is distributed across the members + of the pool in round-robin fashion. + + :param conf: An openstack.common.cfg configuration object. + :param topic: This is a name associated with what to consume from. + Multiple instances of a service may consume from the same + topic. + :param proxy: The object that will handle all incoming messages. + :param pool_name: String containing the name of the pool of workers + """ + raise NotImplementedError() + + def consume_in_thread(self): + """Spawn a thread to handle incoming messages. + + Spawn a thread that will be responsible for handling all incoming + messages for consumers that were set up on this connection. + + Message dispatching inside of this is expected to be implemented in a + non-blocking manner. An example implementation would be having this + thread pull messages in for all of the consumers, but utilize a thread + pool for dispatching the messages to the proxy objects. 
+ """ + raise NotImplementedError() + + +def _safe_log(log_func, msg, msg_data): + """Sanitizes the msg_data field before logging.""" + SANITIZE = {'set_admin_password': ('new_pass',), + 'run_instance': ('admin_password',), } + + has_method = 'method' in msg_data and msg_data['method'] in SANITIZE + has_context_token = '_context_auth_token' in msg_data + has_token = 'auth_token' in msg_data + + if not any([has_method, has_context_token, has_token]): + return log_func(msg, msg_data) + + msg_data = copy.deepcopy(msg_data) + + if has_method: + method = msg_data['method'] + if method in SANITIZE: + args_to_sanitize = SANITIZE[method] + for arg in args_to_sanitize: + try: + msg_data['args'][arg] = "" + except KeyError: + pass + + if has_context_token: + msg_data['_context_auth_token'] = '' + + if has_token: + msg_data['auth_token'] = '' + + return log_func(msg, msg_data) + + +def serialize_remote_exception(failure_info): + """Prepares exception data to be sent over rpc. + + Failure_info should be a sys.exc_info() tuple. + + """ + tb = traceback.format_exception(*failure_info) + failure = failure_info[1] + LOG.error(_("Returning exception %s to caller"), unicode(failure)) + LOG.error(tb) + + kwargs = {} + if hasattr(failure, 'kwargs'): + kwargs = failure.kwargs + + data = { + 'class': str(failure.__class__.__name__), + 'module': str(failure.__class__.__module__), + 'message': unicode(failure), + 'tb': tb, + 'args': failure.args, + 'kwargs': kwargs + } + + json_data = jsonutils.dumps(data) + + return json_data + + +def deserialize_remote_exception(conf, data): + failure = jsonutils.loads(str(data)) + + trace = failure.get('tb', []) + message = failure.get('message', "") + "\n" + "\n".join(trace) + name = failure.get('class') + module = failure.get('module') + + # NOTE(ameade): We DO NOT want to allow just any module to be imported, in + # order to prevent arbitrary code execution. + if not module in conf.allowed_rpc_exception_modules: + return RemoteError(name, failure.get('message'), trace) + + try: + mod = importutils.import_module(module) + klass = getattr(mod, name) + if not issubclass(klass, Exception): + raise TypeError("Can only deserialize Exceptions") + + failure = klass(**failure.get('kwargs', {})) + except (AttributeError, TypeError, ImportError): + return RemoteError(name, failure.get('message'), trace) + + ex_type = type(failure) + str_override = lambda self: message + new_ex_type = type(ex_type.__name__ + "_Remote", (ex_type,), + {'__str__': str_override, '__unicode__': str_override}) + try: + # NOTE(ameade): Dynamically create a new exception type and swap it in + # as the new type for the exception. This only works on user defined + # Exceptions and not core python exceptions. This is important because + # we cannot necessarily change an exception message so we must override + # the __str__ method. + failure.__class__ = new_ex_type + except TypeError as e: + # NOTE(ameade): If a core exception then just add the traceback to the + # first exception argument. 
+ failure.args = (message,) + failure.args[1:] + return failure + + +class CommonRpcContext(object): + def __init__(self, **kwargs): + self.values = kwargs + + def __getattr__(self, key): + try: + return self.values[key] + except KeyError: + raise AttributeError(key) + + def to_dict(self): + return copy.deepcopy(self.values) + + @classmethod + def from_dict(cls, values): + return cls(**values) + + def deepcopy(self): + return self.from_dict(self.to_dict()) + + def update_store(self): + local.store.context = self + + def elevated(self, read_deleted=None, overwrite=False): + """Return a version of this context with admin flag set.""" + # TODO(russellb) This method is a bit of a nova-ism. It makes + # some assumptions about the data in the request context sent + # across rpc, while the rest of this class does not. We could get + # rid of this if we changed the nova code that uses this to + # convert the RpcContext back to its native RequestContext doing + # something like nova.context.RequestContext.from_dict(ctxt.to_dict()) + + context = self.deepcopy() + context.values['is_admin'] = True + + context.values.setdefault('roles', []) + + if 'admin' not in context.values['roles']: + context.values['roles'].append('admin') + + if read_deleted is not None: + context.values['read_deleted'] = read_deleted + + return context diff --git a/reddwarf/openstack/common/rpc/dispatcher.py b/reddwarf/openstack/common/rpc/dispatcher.py new file mode 100644 index 0000000000..6c7c34c807 --- /dev/null +++ b/reddwarf/openstack/common/rpc/dispatcher.py @@ -0,0 +1,150 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2012 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Code for rpc message dispatching. + +Messages that come in have a version number associated with them. RPC API +version numbers are in the form: + + Major.Minor + +For a given message with version X.Y, the receiver must be marked as able to +handle messages of version A.B, where: + + A = X + + B >= Y + +The Major version number would be incremented for an almost completely new API. +The Minor version number would be incremented for backwards compatible changes +to an existing API. A backwards compatible change could be something like +adding a new method, adding an argument to an existing method (but not +requiring it), or changing the type for an existing argument (but still +handling the old type as well). + +The conversion over to a versioned API must be done on both the client side and +server side of the API at the same time. However, as the code stands today, +there can be both versioned and unversioned APIs implemented in the same code +base. + + +EXAMPLES: + +Nova was the first project to use versioned rpc APIs. Consider the compute rpc +API as an example. The client side is in nova/compute/rpcapi.py and the server +side is in nova/compute/manager.py. + + +Example 1) Adding a new method. + +Adding a new method is a backwards compatible change. 
It should be added to +nova/compute/manager.py, and RPC_API_VERSION should be bumped from X.Y to +X.Y+1. On the client side, the new method in nova/compute/rpcapi.py should +have a specific version specified to indicate the minimum API version that must +be implemented for the method to be supported. For example: + + def get_host_uptime(self, ctxt, host): + topic = _compute_topic(self.topic, ctxt, host, None) + return self.call(ctxt, self.make_msg('get_host_uptime'), topic, + version='1.1') + +In this case, version '1.1' is the first version that supported the +get_host_uptime() method. + + +Example 2) Adding a new parameter. + +Adding a new parameter to an rpc method can be made backwards compatible. The +RPC_API_VERSION on the server side (nova/compute/manager.py) should be bumped. +The implementation of the method must not expect the parameter to be present. + + def some_remote_method(self, arg1, arg2, newarg=None): + # The code needs to deal with newarg=None for cases + # where an older client sends a message without it. + pass + +On the client side, the same changes should be made as in example 1. The +minimum version that supports the new parameter should be specified. +""" + +from reddwarf.openstack.common.rpc import common as rpc_common + + +class RpcDispatcher(object): + """Dispatch rpc messages according to the requested API version. + + This class can be used as the top level 'manager' for a service. It + contains a list of underlying managers that have an API_VERSION attribute. + """ + + def __init__(self, callbacks): + """Initialize the rpc dispatcher. + + :param callbacks: List of proxy objects that are an instance + of a class with rpc methods exposed. Each proxy + object should have an RPC_API_VERSION attribute. + """ + self.callbacks = callbacks + super(RpcDispatcher, self).__init__() + + @staticmethod + def _is_compatible(mversion, version): + """Determine whether versions are compatible. + + :param mversion: The API version implemented by a callback. + :param version: The API version requested by an incoming message. + """ + version_parts = version.split('.') + mversion_parts = mversion.split('.') + if int(version_parts[0]) != int(mversion_parts[0]): # Major + return False + if int(version_parts[1]) > int(mversion_parts[1]): # Minor + return False + return True + + def dispatch(self, ctxt, version, method, **kwargs): + """Dispatch a message based on a requested version. + + :param ctxt: The request context + :param version: The requested API version from the incoming message + :param method: The method requested to be called by the incoming + message. + :param kwargs: A dict of keyword arguments to be passed to the method. + + :returns: Whatever is returned by the underlying method that gets + called. 
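        For example, a service-side sketch (the manager class below is
        illustrative, not part of this module):

            class GuestManager(object):
                RPC_API_VERSION = '1.0'

                def create_database(self, context, name):
                    return {'created': name}

            dispatcher = RpcDispatcher([GuestManager()])
            # a consumer routes an incoming message roughly like this:
            dispatcher.dispatch(ctxt, '1.0', 'create_database', name='db1')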
+ """ + if not version: + version = '1.0' + + had_compatible = False + for proxyobj in self.callbacks: + if hasattr(proxyobj, 'RPC_API_VERSION'): + rpc_api_version = proxyobj.RPC_API_VERSION + else: + rpc_api_version = '1.0' + is_compatible = self._is_compatible(rpc_api_version, version) + had_compatible = had_compatible or is_compatible + if not hasattr(proxyobj, method): + continue + if is_compatible: + return getattr(proxyobj, method)(ctxt, **kwargs) + + if had_compatible: + raise AttributeError("No such RPC function '%s'" % method) + else: + raise rpc_common.UnsupportedRpcVersion(version=version) diff --git a/reddwarf/openstack/common/rpc/impl_fake.py b/reddwarf/openstack/common/rpc/impl_fake.py new file mode 100644 index 0000000000..ef76d2310f --- /dev/null +++ b/reddwarf/openstack/common/rpc/impl_fake.py @@ -0,0 +1,184 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +"""Fake RPC implementation which calls proxy methods directly with no +queues. Casts will block, but this is very useful for tests. +""" + +import inspect +import time + +import eventlet + +from reddwarf.openstack.common import jsonutils +from reddwarf.openstack.common.rpc import common as rpc_common + +CONSUMERS = {} + + +class RpcContext(rpc_common.CommonRpcContext): + def __init__(self, **kwargs): + super(RpcContext, self).__init__(**kwargs) + self._response = [] + self._done = False + + def deepcopy(self): + values = self.to_dict() + new_inst = self.__class__(**values) + new_inst._response = self._response + new_inst._done = self._done + return new_inst + + def reply(self, reply=None, failure=None, ending=False): + if ending: + self._done = True + if not self._done: + self._response.append((reply, failure)) + + +class Consumer(object): + def __init__(self, topic, proxy): + self.topic = topic + self.proxy = proxy + + def call(self, context, version, method, args, timeout): + done = eventlet.event.Event() + + def _inner(): + ctxt = RpcContext.from_dict(context.to_dict()) + try: + rval = self.proxy.dispatch(context, version, method, **args) + res = [] + # Caller might have called ctxt.reply() manually + for (reply, failure) in ctxt._response: + if failure: + raise failure[0], failure[1], failure[2] + res.append(reply) + # if ending not 'sent'...we might have more data to + # return from the function itself + if not ctxt._done: + if inspect.isgenerator(rval): + for val in rval: + res.append(val) + else: + res.append(rval) + done.send(res) + except Exception as e: + done.send_exception(e) + + thread = eventlet.greenthread.spawn(_inner) + + if timeout: + start_time = time.time() + while not done.ready(): + eventlet.greenthread.sleep(1) + cur_time = time.time() + if (cur_time - start_time) > timeout: + thread.kill() + raise rpc_common.Timeout() + + return done.wait() + + +class Connection(object): + """Connection object.""" + + def __init__(self): + self.consumers = [] + + def create_consumer(self, topic, proxy, fanout=False): + consumer = 
Consumer(topic, proxy) + self.consumers.append(consumer) + if topic not in CONSUMERS: + CONSUMERS[topic] = [] + CONSUMERS[topic].append(consumer) + + def close(self): + for consumer in self.consumers: + CONSUMERS[consumer.topic].remove(consumer) + self.consumers = [] + + def consume_in_thread(self): + pass + + +def create_connection(conf, new=True): + """Create a connection""" + return Connection() + + +def check_serialize(msg): + """Make sure a message intended for rpc can be serialized.""" + jsonutils.dumps(msg) + + +def multicall(conf, context, topic, msg, timeout=None): + """Make a call that returns multiple times.""" + + check_serialize(msg) + + method = msg.get('method') + if not method: + return + args = msg.get('args', {}) + version = msg.get('version', None) + + try: + consumer = CONSUMERS[topic][0] + except (KeyError, IndexError): + return iter([None]) + else: + return consumer.call(context, version, method, args, timeout) + + +def call(conf, context, topic, msg, timeout=None): + """Sends a message on a topic and wait for a response.""" + rv = multicall(conf, context, topic, msg, timeout) + # NOTE(vish): return the last result from the multicall + rv = list(rv) + if not rv: + return + return rv[-1] + + +def cast(conf, context, topic, msg): + try: + call(conf, context, topic, msg) + except Exception: + pass + + +def notify(conf, context, topic, msg): + check_serialize(msg) + + +def cleanup(): + pass + + +def fanout_cast(conf, context, topic, msg): + """Cast to all consumers of a topic""" + check_serialize(msg) + method = msg.get('method') + if not method: + return + args = msg.get('args', {}) + version = msg.get('version', None) + + for consumer in CONSUMERS.get(topic, []): + try: + consumer.call(context, version, method, args, None) + except Exception: + pass diff --git a/reddwarf/rpc/impl_kombu.py b/reddwarf/openstack/common/rpc/impl_kombu.py similarity index 60% rename from reddwarf/rpc/impl_kombu.py rename to reddwarf/openstack/common/rpc/impl_kombu.py index 0f4fa7b6a5..f210a5d7cd 100644 --- a/reddwarf/rpc/impl_kombu.py +++ b/reddwarf/openstack/common/rpc/impl_kombu.py @@ -14,27 +14,84 @@ # License for the specific language governing permissions and limitations # under the License. +import functools import itertools import socket import ssl import sys import time -import logging import uuid import eventlet import greenlet import kombu +import kombu.connection import kombu.entity import kombu.messaging -import kombu.connection -from reddwarf.common import config -from reddwarf.rpc import amqp as rpc_amqp -from reddwarf.rpc import common as rpc_common +#from reddwarf.openstack.common import cfg +#TODO(tim.simpson): Doing this as we aren't yet using the real cfg module. +from reddwarf.common.config import OsCommonModule +cfg = OsCommonModule() -LOG = logging.getLogger(__name__) -SSL_VERSION = "SSLv2" +#TODO(tim.simpson): Import the true version of Mr. Underscore. 
+#from reddwarf.openstack.common.gettextutils import _ + +from reddwarf.openstack.common.rpc import amqp as rpc_amqp +from reddwarf.openstack.common.rpc import common as rpc_common + +kombu_opts = [ + cfg.StrOpt('kombu_ssl_version', + default='', + help='SSL version to use (valid only if SSL enabled)'), + cfg.StrOpt('kombu_ssl_keyfile', + default='', + help='SSL key file (valid only if SSL enabled)'), + cfg.StrOpt('kombu_ssl_certfile', + default='', + help='SSL cert file (valid only if SSL enabled)'), + cfg.StrOpt('kombu_ssl_ca_certs', + default='', + help=('SSL certification authority file ' + '(valid only if SSL enabled)')), + cfg.StrOpt('rabbit_host', + default='localhost', + help='the RabbitMQ host'), + cfg.IntOpt('rabbit_port', + default=5672, + help='the RabbitMQ port'), + cfg.BoolOpt('rabbit_use_ssl', + default=False, + help='connect over SSL for RabbitMQ'), + cfg.StrOpt('rabbit_userid', + default='guest', + help='the RabbitMQ userid'), + cfg.StrOpt('rabbit_password', + default='guest', + help='the RabbitMQ password'), + cfg.StrOpt('rabbit_virtual_host', + default='/', + help='the RabbitMQ virtual host'), + cfg.IntOpt('rabbit_retry_interval', + default=1, + help='how frequently to retry connecting with RabbitMQ'), + cfg.IntOpt('rabbit_retry_backoff', + default=2, + help='how long to backoff for between retries when connecting ' + 'to RabbitMQ'), + cfg.IntOpt('rabbit_max_retries', + default=0, + help='maximum retries with trying to connect to RabbitMQ ' + '(the default of 0 implies an infinite retry count)'), + cfg.BoolOpt('rabbit_durable_queues', + default=False, + help='use durable queues in RabbitMQ'), + +] + +cfg.CONF.register_opts(kombu_opts) + +LOG = rpc_common.LOG class ConsumerBase(object): @@ -87,8 +144,11 @@ class ConsumerBase(object): def _callback(raw_message): message = self.channel.message_to_python(raw_message) - callback(message.payload) - message.ack() + try: + callback(message.payload) + message.ack() + except Exception: + LOG.exception(_("Failed to process message... skipping it.")) self.queue.consume(*args, callback=_callback, **options) @@ -106,7 +166,7 @@ class ConsumerBase(object): class DirectConsumer(ConsumerBase): """Queue/consumer class for 'direct'""" - def __init__(self, channel, msg_id, callback, tag, **kwargs): + def __init__(self, conf, channel, msg_id, callback, tag, **kwargs): """Init a 'direct' queue. 'channel' is the amqp channel to use @@ -118,62 +178,61 @@ class DirectConsumer(ConsumerBase): """ # Default options options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=msg_id, - type='direct', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(DirectConsumer, self).__init__( - channel, - callback, - tag, - name=msg_id, - exchange=exchange, - routing_key=msg_id, - **options) + exchange = kombu.entity.Exchange(name=msg_id, + type='direct', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(DirectConsumer, self).__init__(channel, + callback, + tag, + name=msg_id, + exchange=exchange, + routing_key=msg_id, + **options) class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" - def __init__(self, channel, topic, callback, tag, **kwargs): + def __init__(self, conf, channel, topic, callback, tag, name=None, + **kwargs): """Init a 'topic' queue. 
- 'channel' is the amqp channel to use - 'topic' is the topic to listen on - 'callback' is the callback to call when messages are received - 'tag' is a unique ID for the consumer on the channel + :param channel: the amqp channel to use + :param topic: the topic to listen on + :paramtype topic: str + :param callback: the callback to call when messages are received + :param tag: a unique ID for the consumer on the channel + :param name: optional queue name, defaults to topic + :paramtype name: str - Other kombu options may be passed + Other kombu options may be passed as keyword arguments """ # Default options - options = {'durable': config.Config.get('rabbit_durable_queues', - False), - 'auto_delete': False, - 'exclusive': False} + options = {'durable': conf.rabbit_durable_queues, + 'auto_delete': False, + 'exclusive': False} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=config.Config.get('control_exchange', 'reddwarf'), - type='topic', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(TopicConsumer, self).__init__( - channel, - callback, - tag, - name=topic, - exchange=exchange, - routing_key=topic, - **options) + exchange = kombu.entity.Exchange(name=conf.control_exchange, + type='topic', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(TopicConsumer, self).__init__(channel, + callback, + tag, + name=name or topic, + exchange=exchange, + routing_key=topic, + **options) class FanoutConsumer(ConsumerBase): """Consumer class for 'fanout'""" - def __init__(self, channel, topic, callback, tag, **kwargs): + def __init__(self, conf, channel, topic, callback, tag, **kwargs): """Init a 'fanout' queue. 'channel' is the amqp channel to use @@ -189,22 +248,17 @@ class FanoutConsumer(ConsumerBase): # Default options options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - exchange = kombu.entity.Exchange( - name=exchange_name, - type='fanout', - durable=options['durable'], - auto_delete=options['auto_delete']) - super(FanoutConsumer, self).__init__( - channel, - callback, - tag, - name=queue_name, - exchange=exchange, - routing_key=topic, - **options) + exchange = kombu.entity.Exchange(name=exchange_name, type='fanout', + durable=options['durable'], + auto_delete=options['auto_delete']) + super(FanoutConsumer, self).__init__(channel, callback, tag, + name=queue_name, + exchange=exchange, + routing_key=topic, + **options) class Publisher(object): @@ -222,9 +276,10 @@ class Publisher(object): def reconnect(self, channel): """Re-establish the Producer after a rabbit reconnection""" self.exchange = kombu.entity.Exchange(name=self.exchange_name, - **self.kwargs) + **self.kwargs) self.producer = kombu.messaging.Producer(exchange=self.exchange, - channel=channel, routing_key=self.routing_key) + channel=channel, + routing_key=self.routing_key) def send(self, msg): """Send a message""" @@ -233,67 +288,56 @@ class Publisher(object): class DirectPublisher(Publisher): """Publisher class for 'direct'""" - def __init__(self, channel, msg_id, **kwargs): + def __init__(self, conf, channel, msg_id, **kwargs): """init a 'direct' publisher. 
Kombu options may be passed as keyword args to override defaults """ options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - super(DirectPublisher, self).__init__(channel, - msg_id, - msg_id, - type='direct', - **options) + super(DirectPublisher, self).__init__(channel, msg_id, msg_id, + type='direct', **options) class TopicPublisher(Publisher): """Publisher class for 'topic'""" - def __init__(self, channel, topic, **kwargs): + def __init__(self, conf, channel, topic, **kwargs): """init a 'topic' publisher. Kombu options may be passed as keyword args to override defaults """ - options = {'durable': config.Config.get('rabbit_durable_queues', - False), + options = {'durable': conf.rabbit_durable_queues, 'auto_delete': False, 'exclusive': False} options.update(kwargs) - super(TopicPublisher, self).__init__(channel, - config.Config.get('control_exchange', 'reddwarf'), - topic, - type='topic', - **options) + super(TopicPublisher, self).__init__(channel, conf.control_exchange, + topic, type='topic', **options) class FanoutPublisher(Publisher): """Publisher class for 'fanout'""" - def __init__(self, channel, topic, **kwargs): + def __init__(self, conf, channel, topic, **kwargs): """init a 'fanout' publisher. Kombu options may be passed as keyword args to override defaults """ options = {'durable': False, - 'auto_delete': True, - 'exclusive': True} + 'auto_delete': True, + 'exclusive': True} options.update(kwargs) - super(FanoutPublisher, self).__init__(channel, - '%s_fanout' % topic, - None, - type='fanout', - **options) + super(FanoutPublisher, self).__init__(channel, '%s_fanout' % topic, + None, type='fanout', **options) class NotifyPublisher(TopicPublisher): """Publisher class for 'notify'""" - def __init__(self, *args, **kwargs): - default = config.Config.get('rabbit_durable_queues', False) - self.durable = kwargs.pop('durable', default) - super(NotifyPublisher, self).__init__(*args, **kwargs) + def __init__(self, conf, channel, topic, **kwargs): + self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) + super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) def reconnect(self, channel): super(NotifyPublisher, self).reconnect(channel) @@ -302,25 +346,28 @@ class NotifyPublisher(TopicPublisher): # we do this to ensure that messages don't get dropped if the # consumer is started after we do queue = kombu.entity.Queue(channel=channel, - exchange=self.exchange, - durable=self.durable, - name=self.routing_key, - routing_key=self.routing_key) + exchange=self.exchange, + durable=self.durable, + name=self.routing_key, + routing_key=self.routing_key) queue.declare() class Connection(object): """Connection object.""" - def __init__(self, server_params=None): + pool = None + + def __init__(self, conf, server_params=None): self.consumers = [] self.consumer_thread = None - self.max_retries = config.Config.get('rabbit_max_retries', 0) + self.conf = conf + self.max_retries = self.conf.rabbit_max_retries # Try forever? 
if self.max_retries <= 0: self.max_retries = None - self.interval_start = config.Config.get('rabbit_retry_interval', 1) - self.interval_stepping = config.Config.get('rabbit_retry_backoff', 2) + self.interval_start = self.conf.rabbit_retry_interval + self.interval_stepping = self.conf.rabbit_retry_backoff # max retry-interval = 30 seconds self.interval_max = 30 self.memory_transport = False @@ -336,25 +383,21 @@ class Connection(object): p_key = server_params_to_kombu_params.get(sp_key, sp_key) params[p_key] = value - params.setdefault('hostname', config.Config.get('rabbit_host', - '127.0.0.1')) - params.setdefault('port', config.Config.get('rabbit_port', 5672)) - params.setdefault('userid', - config.Config.get('rabbit_userid', 'guest')) - params.setdefault('password', - config.Config.get('rabbit_password', 'guest')) - params.setdefault('virtual_host', - config.Config.get('rabbit_virtual_host', '/')) + params.setdefault('hostname', self.conf.rabbit_host) + params.setdefault('port', self.conf.rabbit_port) + params.setdefault('userid', self.conf.rabbit_userid) + params.setdefault('password', self.conf.rabbit_password) + params.setdefault('virtual_host', self.conf.rabbit_virtual_host) self.params = params - if config.Config.get('fake_rabbit', False): + if self.conf.fake_rabbit: self.params['transport'] = 'memory' self.memory_transport = True else: self.memory_transport = False - if config.Config.get('rabbit_use_ssl', False): + if self.conf.rabbit_use_ssl: self.params['ssl'] = self._fetch_ssl_params() self.connection = None @@ -366,14 +409,14 @@ class Connection(object): ssl_params = dict() # http://docs.python.org/library/ssl.html - ssl.wrap_socket - if config.Config.get('kombu_ssl_version'): - ssl_params['ssl_version'] = config.Config.get('kombu_ssl_version') - if config.Config.get('kombu_ssl_keyfile'): - ssl_params['keyfile'] = config.Config.get('kombu_ssl_keyfile') - if config.Config.get('kombu_ssl_certfile'): - ssl_params['certfile'] = config.Config.get('kombu_ssl_certfile') - if config.Config.get('kombu_ssl_ca_certs'): - ssl_params['ca_certs'] = config.Config.get('kombu_ssl_ca_certs') + if self.conf.kombu_ssl_version: + ssl_params['ssl_version'] = self.conf.kombu_ssl_version + if self.conf.kombu_ssl_keyfile: + ssl_params['keyfile'] = self.conf.kombu_ssl_keyfile + if self.conf.kombu_ssl_certfile: + ssl_params['certfile'] = self.conf.kombu_ssl_certfile + if self.conf.kombu_ssl_ca_certs: + ssl_params['ca_certs'] = self.conf.kombu_ssl_ca_certs # We might want to allow variations in the # future with this? ssl_params['cert_reqs'] = ssl.CERT_REQUIRED @@ -392,7 +435,7 @@ class Connection(object): """ if self.connection: LOG.info(_("Reconnecting to AMQP server on " - "%(hostname)s:%(port)d") % self.params) + "%(hostname)s:%(port)d") % self.params) try: self.connection.close() except self.connection_errors: @@ -400,8 +443,7 @@ class Connection(object): # Setting this in case the next statement fails, though # it shouldn't be doing any network operations, yet. self.connection = None - self.connection = kombu.connection.BrokerConnection( - **self.params) + self.connection = kombu.connection.BrokerConnection(**self.params) self.connection_errors = self.connection.connection_errors if self.memory_transport: # Kludge to speed up tests. 
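The retry options wired up above amount to a capped linear backoff: start at rabbit_retry_interval, add rabbit_retry_backoff after each failed attempt, and never sleep longer than interval_max (30 seconds). A self-contained sketch of the resulting schedule, assuming the usual "add the step, cap at the max" reconnect loop (illustrative only, not part of the patch):

    def backoff_schedule(start=1, step=2, maximum=30, attempts=6):
        delays = []
        sleep_time = start
        for _ in range(attempts):
            delays.append(sleep_time)
            sleep_time = min(sleep_time + step, maximum)
        return delays

    backoff_schedule()   # -> [1, 3, 5, 7, 9, 11]; later attempts are capped at 30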
@@ -414,8 +456,8 @@ class Connection(object): self.channel._new_queue('ae.undeliver') for consumer in self.consumers: consumer.reconnect(self.channel) - LOG.info(_('Connected to AMQP server on ' - '%(hostname)s:%(port)d') % self.params) + LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), + self.params) def reconnect(self): """Handles reconnecting and re-establishing queues. @@ -432,7 +474,7 @@ class Connection(object): try: self._connect() return - except self.connection_errors, e: + except (self.connection_errors, IOError), e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -451,8 +493,8 @@ class Connection(object): if self.max_retries and attempt == self.max_retries: LOG.exception(_('Unable to connect to AMQP server on ' - '%(hostname)s:%(port)d after %(max_retries)d ' - 'tries: %(err_str)s') % log_info) + '%(hostname)s:%(port)d after %(max_retries)d ' + 'tries: %(err_str)s') % log_info) # NOTE(comstud): Copied from original code. There's # really no better recourse because if this was a queue we # need to consume on, we have no way to consume anymore. @@ -467,15 +509,15 @@ class Connection(object): log_info['sleep_time'] = sleep_time LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' - ' unreachable: %(err_str)s. Trying again in ' - '%(sleep_time)d seconds.') % log_info) + ' unreachable: %(err_str)s. Trying again in ' + '%(sleep_time)d seconds.') % log_info) time.sleep(sleep_time) def ensure(self, error_callback, method, *args, **kwargs): while True: try: return method(*args, **kwargs) - except (self.connection_errors, socket.timeout), e: + except (self.connection_errors, socket.timeout, IOError), e: pass except Exception, e: # NOTE(comstud): Unfortunately it's possible for amqplib @@ -518,11 +560,11 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s") % log_info) + "%(err_str)s") % log_info) def _declare_consumer(): - consumer = consumer_cls(self.channel, topic, callback, - self.consumer_num.next()) + consumer = consumer_cls(self.conf, self.channel, topic, callback, + self.consumer_num.next()) self.consumers.append(consumer) return consumer @@ -536,11 +578,11 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, socket.timeout): LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + str(exc)) info['do_consume'] = True def _consume(): @@ -574,12 +616,10 @@ class Connection(object): def _error_callback(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.exception(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s") % log_info) + "'%(topic)s': %(err_str)s") % log_info) def _publish(): - publisher = cls(self.channel, topic, **kwargs) - LOG.info(_("_publish info%s %s %s %s") % (self.channel, topic, - kwargs, publisher)) + publisher = cls(self.conf, self.channel, topic, **kwargs) publisher.send(msg) self.ensure(_error_callback, _publish) @@ -591,9 +631,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - self.declare_consumer(TopicConsumer, topic, callback) + self.declare_consumer(functools.partial(TopicConsumer, + 
name=queue_name, + ), + topic, callback) def declare_fanout_consumer(self, topic, callback): """Create a 'fanout' consumer""" @@ -637,73 +680,94 @@ class Connection(object): def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) + if fanout: - self.declare_fanout_consumer(topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + self.declare_fanout_consumer(topic, proxy_cb) else: - self.declare_topic_consumer(topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + self.declare_topic_consumer(topic, proxy_cb) + + def create_worker(self, topic, proxy, pool_name): + """Create a worker that calls a method in a proxy object""" + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) + self.declare_topic_consumer(topic, proxy_cb, pool_name) -Connection.pool = rpc_amqp.Pool(connection_cls=Connection) - - -def create_connection(new=True): +def create_connection(conf, new=True): """Create a connection""" - return rpc_amqp.create_connection(new, Connection.pool) + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) -def multicall(context, topic, msg, timeout=None): +def multicall(conf, context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool) + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) -def call(context, topic, msg, timeout=None): +def call(conf, context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(context, topic, msg, timeout, Connection.pool) + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) -def cast(context, topic, msg): +def cast(conf, context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(context, topic, msg, Connection.pool) + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) -def cast_with_consumer(context, topic, msg): +def cast_with_consumer(conf, context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast_with_consumer(context, topic, msg, Connection.pool) + return rpc_amqp.cast_with_consumer(conf, context, topic, msg, + Connection.pool) -def delete_queue(context, topic): +def delete_queue(conf, context, topic): LOG.debug("Deleting queue with name %s." 
% topic) - with rpc_amqp.ConnectionContext(Connection.pool) as conn: + with rpc_amqp.ConnectionContext(conf, Connection.pool) as conn: channel = conn.channel - durable = config.Config.get('rabbit_durable_queues', False) + durable = conf.rabbit_durable_queues queue = kombu.entity.Queue(name=topic, channel=channel, auto_delete=False, exclusive=False, durable=durable) queue.delete() -def fanout_cast(context, topic, msg): +def fanout_cast(conf, context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) -def cast_to_server(context, server_params, topic, msg): +def cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a topic to a specific server.""" - return rpc_amqp.cast_to_server(context, server_params, topic, msg, - Connection.pool) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) -def fanout_cast_to_server(context, server_params, topic, msg): +def fanout_cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a fanout exchange to a specific server.""" - return rpc_amqp.cast_to_server(context, server_params, topic, msg, - Connection.pool) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) -def notify(context, topic, msg): +def notify(conf, context, topic, msg): """Sends a notification event on a topic.""" - return rpc_amqp.notify(context, topic, msg, Connection.pool) + return rpc_amqp.notify( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cleanup(): diff --git a/reddwarf/rpc/impl_qpid.py b/reddwarf/openstack/common/rpc/impl_qpid.py similarity index 67% rename from reddwarf/rpc/impl_qpid.py rename to reddwarf/openstack/common/rpc/impl_qpid.py index 9366c2dd4a..78d1ceffca 100644 --- a/reddwarf/rpc/impl_qpid.py +++ b/reddwarf/openstack/common/rpc/impl_qpid.py @@ -15,22 +15,24 @@ # License for the specific language governing permissions and limitations # under the License. 
+import functools import itertools +import logging import time import uuid -import json import eventlet import greenlet import qpid.messaging import qpid.messaging.exceptions -from nova import flags -from nova.openstack.common import cfg -from nova.rpc import amqp as rpc_amqp -from nova.rpc import common as rpc_common -from nova.rpc.common import LOG +from reddwarf.openstack.common import cfg +from reddwarf.openstack.common.gettextutils import _ +from reddwarf.openstack.common import jsonutils +from reddwarf.openstack.common.rpc import amqp as rpc_amqp +from reddwarf.openstack.common.rpc import common as rpc_common +LOG = logging.getLogger(__name__) qpid_opts = [ cfg.StrOpt('qpid_hostname', @@ -75,10 +77,9 @@ qpid_opts = [ cfg.BoolOpt('qpid_tcp_nodelay', default=True, help='Disable Nagle algorithm'), - ] +] -FLAGS = flags.FLAGS -FLAGS.register_opts(qpid_opts) +cfg.CONF.register_opts(qpid_opts) class ConsumerBase(object): @@ -124,7 +125,7 @@ class ConsumerBase(object): addr_opts["node"]["x-declare"].update(node_opts) addr_opts["link"]["x-declare"].update(link_opts) - self.address = "%s ; %s" % (node_name, json.dumps(addr_opts)) + self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.reconnect(session) @@ -137,7 +138,12 @@ class ConsumerBase(object): def consume(self): """Fetch the message and pass it to the callback object""" message = self.receiver.fetch() - self.callback(message.content) + try: + self.callback(message.content) + except Exception: + LOG.exception(_("Failed to process message... skipping it.")) + finally: + self.session.acknowledge(message) def get_receiver(self): return self.receiver @@ -146,7 +152,7 @@ class ConsumerBase(object): class DirectConsumer(ConsumerBase): """Queue/consumer class for 'direct'""" - def __init__(self, session, msg_id, callback): + def __init__(self, conf, session, msg_id, callback): """Init a 'direct' queue. 'session' is the amqp session to use @@ -155,32 +161,35 @@ class DirectConsumer(ConsumerBase): """ super(DirectConsumer, self).__init__(session, callback, - "%s/%s" % (msg_id, msg_id), - {"type": "direct"}, - msg_id, - {"exclusive": True}) + "%s/%s" % (msg_id, msg_id), + {"type": "direct"}, + msg_id, + {"exclusive": True}) class TopicConsumer(ConsumerBase): """Consumer class for 'topic'""" - def __init__(self, session, topic, callback): + def __init__(self, conf, session, topic, callback, name=None): """Init a 'topic' queue. - 'session' is the amqp session to use - 'topic' is the topic to listen on - 'callback' is the callback to call when messages are received + :param session: the amqp session to use + :param topic: is the topic to listen on + :paramtype topic: str + :param callback: the callback to call when messages are received + :param name: optional queue name, defaults to topic """ super(TopicConsumer, self).__init__(session, callback, - "%s/%s" % (FLAGS.control_exchange, topic), {}, - topic, {}) + "%s/%s" % (conf.control_exchange, + topic), + {}, name or topic, {}) class FanoutConsumer(ConsumerBase): """Consumer class for 'fanout'""" - def __init__(self, session, topic, callback): + def __init__(self, conf, session, topic, callback): """Init a 'fanout' queue. 
'session' is the amqp session to use @@ -188,11 +197,12 @@ class FanoutConsumer(ConsumerBase): 'callback' is the callback to call when messages are received """ - super(FanoutConsumer, self).__init__(session, callback, - "%s_fanout" % topic, - {"durable": False, "type": "fanout"}, - "%s_fanout_%s" % (topic, uuid.uuid4().hex), - {"exclusive": True}) + super(FanoutConsumer, self).__init__( + session, callback, + "%s_fanout" % topic, + {"durable": False, "type": "fanout"}, + "%s_fanout_%s" % (topic, uuid.uuid4().hex), + {"exclusive": True}) class Publisher(object): @@ -220,7 +230,7 @@ class Publisher(object): if node_opts: addr_opts["node"]["x-declare"].update(node_opts) - self.address = "%s ; %s" % (node_name, json.dumps(addr_opts)) + self.address = "%s ; %s" % (node_name, jsonutils.dumps(addr_opts)) self.reconnect(session) @@ -235,7 +245,7 @@ class Publisher(object): class DirectPublisher(Publisher): """Publisher class for 'direct'""" - def __init__(self, session, msg_id): + def __init__(self, conf, session, msg_id): """Init a 'direct' publisher.""" super(DirectPublisher, self).__init__(session, msg_id, {"type": "Direct"}) @@ -243,47 +253,53 @@ class DirectPublisher(Publisher): class TopicPublisher(Publisher): """Publisher class for 'topic'""" - def __init__(self, session, topic): + def __init__(self, conf, session, topic): """init a 'topic' publisher. """ - super(TopicPublisher, self).__init__(session, - "%s/%s" % (FLAGS.control_exchange, topic)) + super(TopicPublisher, self).__init__( + session, + "%s/%s" % (conf.control_exchange, topic)) class FanoutPublisher(Publisher): """Publisher class for 'fanout'""" - def __init__(self, session, topic): + def __init__(self, conf, session, topic): """init a 'fanout' publisher. """ - super(FanoutPublisher, self).__init__(session, - "%s_fanout" % topic, {"type": "fanout"}) + super(FanoutPublisher, self).__init__( + session, + "%s_fanout" % topic, {"type": "fanout"}) class NotifyPublisher(Publisher): """Publisher class for notifications""" - def __init__(self, session, topic): + def __init__(self, conf, session, topic): """init a 'topic' publisher. 
""" - super(NotifyPublisher, self).__init__(session, - "%s/%s" % (FLAGS.control_exchange, topic), - {"durable": True}) + super(NotifyPublisher, self).__init__( + session, + "%s/%s" % (conf.control_exchange, topic), + {"durable": True}) class Connection(object): """Connection object.""" - def __init__(self, server_params=None): + pool = None + + def __init__(self, conf, server_params=None): self.session = None self.consumers = {} self.consumer_thread = None + self.conf = conf if server_params is None: server_params = {} - default_params = dict(hostname=FLAGS.qpid_hostname, - port=FLAGS.qpid_port, - username=FLAGS.qpid_username, - password=FLAGS.qpid_password) + default_params = dict(hostname=self.conf.qpid_hostname, + port=self.conf.qpid_port, + username=self.conf.qpid_username, + password=self.conf.qpid_password) params = server_params for key in default_params.keys(): @@ -297,23 +313,25 @@ class Connection(object): # before we call open self.connection.username = params['username'] self.connection.password = params['password'] - self.connection.sasl_mechanisms = FLAGS.qpid_sasl_mechanisms - self.connection.reconnect = FLAGS.qpid_reconnect - if FLAGS.qpid_reconnect_timeout: - self.connection.reconnect_timeout = FLAGS.qpid_reconnect_timeout - if FLAGS.qpid_reconnect_limit: - self.connection.reconnect_limit = FLAGS.qpid_reconnect_limit - if FLAGS.qpid_reconnect_interval_max: + self.connection.sasl_mechanisms = self.conf.qpid_sasl_mechanisms + self.connection.reconnect = self.conf.qpid_reconnect + if self.conf.qpid_reconnect_timeout: + self.connection.reconnect_timeout = ( + self.conf.qpid_reconnect_timeout) + if self.conf.qpid_reconnect_limit: + self.connection.reconnect_limit = self.conf.qpid_reconnect_limit + if self.conf.qpid_reconnect_interval_max: self.connection.reconnect_interval_max = ( - FLAGS.qpid_reconnect_interval_max) - if FLAGS.qpid_reconnect_interval_min: + self.conf.qpid_reconnect_interval_max) + if self.conf.qpid_reconnect_interval_min: self.connection.reconnect_interval_min = ( - FLAGS.qpid_reconnect_interval_min) - if FLAGS.qpid_reconnect_interval: - self.connection.reconnect_interval = FLAGS.qpid_reconnect_interval - self.connection.hearbeat = FLAGS.qpid_heartbeat - self.connection.protocol = FLAGS.qpid_protocol - self.connection.tcp_nodelay = FLAGS.qpid_tcp_nodelay + self.conf.qpid_reconnect_interval_min) + if self.conf.qpid_reconnect_interval: + self.connection.reconnect_interval = ( + self.conf.qpid_reconnect_interval) + self.connection.hearbeat = self.conf.qpid_heartbeat + self.connection.protocol = self.conf.qpid_protocol + self.connection.tcp_nodelay = self.conf.qpid_tcp_nodelay # Open is part of reconnect - # NOTE(WGH) not sure we need this with the reconnect flags @@ -337,12 +355,12 @@ class Connection(object): try: self.connection.open() except qpid.messaging.exceptions.ConnectionError, e: - LOG.error(_('Unable to connect to AMQP server: %s ') % str(e)) - time.sleep(FLAGS.qpid_reconnect_interval or 1) + LOG.error(_('Unable to connect to AMQP server: %s'), e) + time.sleep(self.conf.qpid_reconnect_interval or 1) else: break - LOG.info(_('Connected to AMQP server on %s') % self.broker) + LOG.info(_('Connected to AMQP server on %s'), self.broker) self.session = self.connection.session() @@ -382,10 +400,10 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.error(_("Failed to declare consumer for topic '%(topic)s': " - "%(err_str)s") % log_info) + "%(err_str)s") % log_info) def _declare_consumer(): - consumer = 
consumer_cls(self.session, topic, callback) + consumer = consumer_cls(self.conf, self.session, topic, callback) self._register_consumer(consumer) return consumer @@ -397,15 +415,18 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, qpid.messaging.exceptions.Empty): LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % - str(exc)) + str(exc)) def _consume(): nxt_receiver = self.session.next_receiver(timeout=timeout) - self._lookup_consumer(nxt_receiver).consume() + try: + self._lookup_consumer(nxt_receiver).consume() + except Exception: + LOG.exception(_("Error processing message. Skipping it.")) for iteration in itertools.count(0): if limit and iteration >= limit: @@ -428,10 +449,10 @@ class Connection(object): def _connect_error(exc): log_info = {'topic': topic, 'err_str': str(exc)} LOG.exception(_("Failed to publish message to topic " - "'%(topic)s': %(err_str)s") % log_info) + "'%(topic)s': %(err_str)s") % log_info) def _publisher_send(): - publisher = cls(self.session, topic) + publisher = cls(self.conf, self.session, topic) publisher.send(msg) return self.ensure(_connect_error, _publisher_send) @@ -443,9 +464,12 @@ class Connection(object): """ self.declare_consumer(DirectConsumer, topic, callback) - def declare_topic_consumer(self, topic, callback=None): + def declare_topic_consumer(self, topic, callback=None, queue_name=None): """Create a 'topic' consumer.""" - self.declare_consumer(TopicConsumer, topic, callback) + self.declare_consumer(functools.partial(TopicConsumer, + name=queue_name, + ), + topic, callback) def declare_fanout_consumer(self, topic, callback): """Create a 'fanout' consumer""" @@ -489,59 +513,86 @@ class Connection(object): def create_consumer(self, topic, proxy, fanout=False): """Create a consumer that calls a method in a proxy object""" + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) + if fanout: - consumer = FanoutConsumer(self.session, topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + consumer = FanoutConsumer(self.conf, self.session, topic, proxy_cb) else: - consumer = TopicConsumer(self.session, topic, - rpc_amqp.ProxyCallback(proxy, Connection.pool)) + consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb) + self._register_consumer(consumer) + + return consumer + + def create_worker(self, topic, proxy, pool_name): + """Create a worker that calls a method in a proxy object""" + proxy_cb = rpc_amqp.ProxyCallback( + self.conf, proxy, + rpc_amqp.get_connection_pool(self.conf, Connection)) + + consumer = TopicConsumer(self.conf, self.session, topic, proxy_cb, + name=pool_name) + + self._register_consumer(consumer) + return consumer -Connection.pool = rpc_amqp.Pool(connection_cls=Connection) - - -def create_connection(new=True): +def create_connection(conf, new=True): """Create a connection""" - return rpc_amqp.create_connection(new, Connection.pool) + return rpc_amqp.create_connection( + conf, new, + rpc_amqp.get_connection_pool(conf, Connection)) -def multicall(context, topic, msg, timeout=None): +def multicall(conf, context, topic, msg, timeout=None): """Make a call that returns multiple times.""" - return rpc_amqp.multicall(context, topic, msg, timeout, Connection.pool) + return rpc_amqp.multicall( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) -def call(context, topic, msg, timeout=None): 
+def call(conf, context, topic, msg, timeout=None): """Sends a message on a topic and wait for a response.""" - return rpc_amqp.call(context, topic, msg, timeout, Connection.pool) + return rpc_amqp.call( + conf, context, topic, msg, timeout, + rpc_amqp.get_connection_pool(conf, Connection)) -def cast(context, topic, msg): +def cast(conf, context, topic, msg): """Sends a message on a topic without waiting for a response.""" - return rpc_amqp.cast(context, topic, msg, Connection.pool) + return rpc_amqp.cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) -def fanout_cast(context, topic, msg): +def fanout_cast(conf, context, topic, msg): """Sends a message on a fanout exchange without waiting for a response.""" - return rpc_amqp.fanout_cast(context, topic, msg, Connection.pool) + return rpc_amqp.fanout_cast( + conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) -def cast_to_server(context, server_params, topic, msg): +def cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a topic to a specific server.""" - return rpc_amqp.cast_to_server(context, server_params, topic, msg, - Connection.pool) + return rpc_amqp.cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) -def fanout_cast_to_server(context, server_params, topic, msg): +def fanout_cast_to_server(conf, context, server_params, topic, msg): """Sends a message on a fanout exchange to a specific server.""" - return rpc_amqp.fanout_cast_to_server(context, server_params, topic, - msg, Connection.pool) + return rpc_amqp.fanout_cast_to_server( + conf, context, server_params, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) -def notify(context, topic, msg): +def notify(conf, context, topic, msg): """Sends a notification event on a topic.""" - return rpc_amqp.notify(context, topic, msg, Connection.pool) + return rpc_amqp.notify(conf, context, topic, msg, + rpc_amqp.get_connection_pool(conf, Connection)) def cleanup(): diff --git a/reddwarf/openstack/common/rpc/impl_zmq.py b/reddwarf/openstack/common/rpc/impl_zmq.py new file mode 100644 index 0000000000..0d4008aa8b --- /dev/null +++ b/reddwarf/openstack/common/rpc/impl_zmq.py @@ -0,0 +1,725 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import pprint +import socket +import string +import sys +import types +import uuid + +import eventlet +from eventlet.green import zmq +import greenlet + +from reddwarf.openstack.common import cfg +from reddwarf.openstack.common.gettextutils import _ +from reddwarf.openstack.common import importutils +from reddwarf.openstack.common import jsonutils +from reddwarf.openstack.common.rpc import common as rpc_common + + +# for convenience, are not modified. 
+pformat = pprint.pformat +Timeout = eventlet.timeout.Timeout +LOG = rpc_common.LOG +RemoteError = rpc_common.RemoteError +RPCException = rpc_common.RPCException + +zmq_opts = [ + cfg.StrOpt('rpc_zmq_bind_address', default='*', + help='ZeroMQ bind address. Should be a wildcard (*), ' + 'an ethernet interface, or IP. ' + 'The "host" option should point or resolve to this ' + 'address.'), + + # The module.Class to use for matchmaking. + cfg.StrOpt( + 'rpc_zmq_matchmaker', + default=('openstack.common.rpc.' + 'matchmaker.MatchMakerLocalhost'), + help='MatchMaker driver', + ), + + # The following port is unassigned by IANA as of 2012-05-21 + cfg.IntOpt('rpc_zmq_port', default=9501, + help='ZeroMQ receiver listening port'), + + cfg.IntOpt('rpc_zmq_contexts', default=1, + help='Number of ZeroMQ contexts, defaults to 1'), + + cfg.StrOpt('rpc_zmq_ipc_dir', default='/var/run/openstack', + help='Directory for holding IPC sockets'), + + cfg.StrOpt('rpc_zmq_host', default=socket.gethostname(), + help='Name of this node. Must be a valid hostname, FQDN, or ' + 'IP address. Must match "host" option, if running Nova.') +] + + +# These globals are defined in register_opts(conf), +# a mandatory initialization call +FLAGS = None +ZMQ_CTX = None # ZeroMQ Context, must be global. +matchmaker = None # memoized matchmaker object + + +def _serialize(data): + """ + Serialization wrapper + We prefer using JSON, but it cannot encode all types. + Error if a developer passes us bad data. + """ + try: + return str(jsonutils.dumps(data, ensure_ascii=True)) + except TypeError: + LOG.error(_("JSON serialization failed.")) + raise + + +def _deserialize(data): + """ + Deserialization wrapper + """ + LOG.debug(_("Deserializing: %s"), data) + return jsonutils.loads(data) + + +class ZmqSocket(object): + """ + A tiny wrapper around ZeroMQ to simplify the send/recv protocol + and connection management. + + Can be used as a Context (supports the 'with' statement). + """ + + def __init__(self, addr, zmq_type, bind=True, subscribe=None): + self.sock = ZMQ_CTX.socket(zmq_type) + self.addr = addr + self.type = zmq_type + self.subscriptions = [] + + # Support failures on sending/receiving on wrong socket type. 
+ self.can_recv = zmq_type in (zmq.PULL, zmq.SUB) + self.can_send = zmq_type in (zmq.PUSH, zmq.PUB) + self.can_sub = zmq_type in (zmq.SUB, ) + + # Support list, str, & None for subscribe arg (cast to list) + do_sub = { + list: subscribe, + str: [subscribe], + type(None): [] + }[type(subscribe)] + + for f in do_sub: + self.subscribe(f) + + str_data = {'addr': addr, 'type': self.socket_s(), + 'subscribe': subscribe, 'bind': bind} + + LOG.debug(_("Connecting to %(addr)s with %(type)s"), str_data) + LOG.debug(_("-> Subscribed to %(subscribe)s"), str_data) + LOG.debug(_("-> bind: %(bind)s"), str_data) + + try: + if bind: + self.sock.bind(addr) + else: + self.sock.connect(addr) + except Exception: + raise RPCException(_("Could not open socket.")) + + def socket_s(self): + """Get socket type as string.""" + t_enum = ('PUSH', 'PULL', 'PUB', 'SUB', 'REP', 'REQ', 'ROUTER', + 'DEALER') + return dict(map(lambda t: (getattr(zmq, t), t), t_enum))[self.type] + + def subscribe(self, msg_filter): + """Subscribe.""" + if not self.can_sub: + raise RPCException("Cannot subscribe on this socket.") + LOG.debug(_("Subscribing to %s"), msg_filter) + + try: + self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter) + except Exception: + return + + self.subscriptions.append(msg_filter) + + def unsubscribe(self, msg_filter): + """Unsubscribe.""" + if msg_filter not in self.subscriptions: + return + self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter) + self.subscriptions.remove(msg_filter) + + def close(self): + if self.sock is None or self.sock.closed: + return + + # We must unsubscribe, or we'll leak descriptors. + if len(self.subscriptions) > 0: + for f in self.subscriptions: + try: + self.sock.setsockopt(zmq.UNSUBSCRIBE, f) + except Exception: + pass + self.subscriptions = [] + + # Linger -1 prevents lost/dropped messages + try: + self.sock.close(linger=-1) + except Exception: + pass + self.sock = None + + def recv(self): + if not self.can_recv: + raise RPCException(_("You cannot recv on this socket.")) + return self.sock.recv_multipart() + + def send(self, data): + if not self.can_send: + raise RPCException(_("You cannot send on this socket.")) + self.sock.send_multipart(data) + + +class ZmqClient(object): + """Client for ZMQ sockets.""" + + def __init__(self, addr, socket_type=zmq.PUSH, bind=False): + self.outq = ZmqSocket(addr, socket_type, bind=bind) + + def cast(self, msg_id, topic, data): + self.outq.send([str(msg_id), str(topic), str('cast'), + _serialize(data)]) + + def close(self): + self.outq.close() + + +class RpcContext(rpc_common.CommonRpcContext): + """Context that supports replying to a rpc.call.""" + def __init__(self, **kwargs): + self.replies = [] + super(RpcContext, self).__init__(**kwargs) + + def deepcopy(self): + values = self.to_dict() + values['replies'] = self.replies + return self.__class__(**values) + + def reply(self, reply=None, failure=None, ending=False): + if ending: + return + self.replies.append(reply) + + @classmethod + def marshal(self, ctx): + ctx_data = ctx.to_dict() + return _serialize(ctx_data) + + @classmethod + def unmarshal(self, data): + return RpcContext.from_dict(_deserialize(data)) + + +class InternalContext(object): + """Used by ConsumerBase as a private context for - methods.""" + + def __init__(self, proxy): + self.proxy = proxy + self.msg_waiter = None + + def _get_response(self, ctx, proxy, topic, data): + """Process a curried message and cast the result to topic.""" + LOG.debug(_("Running func with context: %s"), ctx.to_dict()) + data.setdefault('version', None) + 
data.setdefault('args', []) + + try: + result = proxy.dispatch( + ctx, data['version'], data['method'], **data['args']) + return ConsumerBase.normalize_reply(result, ctx.replies) + except greenlet.GreenletExit: + # ignore these since they are just from shutdowns + pass + except Exception: + return {'exc': + rpc_common.serialize_remote_exception(sys.exc_info())} + + def reply(self, ctx, proxy, + msg_id=None, context=None, topic=None, msg=None): + """Reply to a casted call.""" + # Our real method is curried into msg['args'] + + child_ctx = RpcContext.unmarshal(msg[0]) + response = ConsumerBase.normalize_reply( + self._get_response(child_ctx, proxy, topic, msg[1]), + ctx.replies) + + LOG.debug(_("Sending reply")) + cast(FLAGS, ctx, topic, { + 'method': '-process_reply', + 'args': { + 'msg_id': msg_id, + 'response': response + } + }) + + +class ConsumerBase(object): + """Base Consumer.""" + + def __init__(self): + self.private_ctx = InternalContext(None) + + @classmethod + def normalize_reply(self, result, replies): + #TODO(ewindisch): re-evaluate and document this method. + if isinstance(result, types.GeneratorType): + return list(result) + elif replies: + return replies + else: + return [result] + + def process(self, style, target, proxy, ctx, data): + # Method starting with - are + # processed internally. (non-valid method name) + method = data['method'] + + # Internal method + # uses internal context for safety. + if data['method'][0] == '-': + # For reply / process_reply + method = method[1:] + if method == 'reply': + self.private_ctx.reply(ctx, proxy, **data['args']) + return + + data.setdefault('version', None) + data.setdefault('args', []) + proxy.dispatch(ctx, data['version'], + data['method'], **data['args']) + + +class ZmqBaseReactor(ConsumerBase): + """ + A consumer class implementing a + centralized casting broker (PULL-PUSH) + for RoundRobin requests. + """ + + def __init__(self, conf): + super(ZmqBaseReactor, self).__init__() + + self.conf = conf + self.mapping = {} + self.proxies = {} + self.threads = [] + self.sockets = [] + self.subscribe = {} + + self.pool = eventlet.greenpool.GreenPool(conf.rpc_thread_pool_size) + + def register(self, proxy, in_addr, zmq_type_in, out_addr=None, + zmq_type_out=None, in_bind=True, out_bind=True, + subscribe=None): + + LOG.info(_("Registering reactor")) + + if zmq_type_in not in (zmq.PULL, zmq.SUB): + raise RPCException("Bad input socktype") + + # Items push in. + inq = ZmqSocket(in_addr, zmq_type_in, bind=in_bind, + subscribe=subscribe) + + self.proxies[inq] = proxy + self.sockets.append(inq) + + LOG.info(_("In reactor registered")) + + if not out_addr: + return + + if zmq_type_out not in (zmq.PUSH, zmq.PUB): + raise RPCException("Bad output socktype") + + # Items push out. + outq = ZmqSocket(out_addr, zmq_type_out, bind=out_bind) + + self.mapping[inq] = outq + self.mapping[outq] = inq + self.sockets.append(outq) + + LOG.info(_("Out reactor registered")) + + def consume_in_thread(self): + def _consume(sock): + LOG.info(_("Consuming socket")) + while True: + self.consume(sock) + + for k in self.proxies.keys(): + self.threads.append( + self.pool.spawn(_consume, k) + ) + + def wait(self): + for t in self.threads: + t.wait() + + def close(self): + for s in self.sockets: + s.close() + + for t in self.threads: + t.kill() + + +class ZmqProxy(ZmqBaseReactor): + """ + A consumer class implementing a + topic-based proxy, forwarding to + IPC sockets. 
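ZmqProxy pulls each message off the externally bound socket and forwards it onto one IPC socket per base topic, which local consumers then connect to. A self-contained sketch of the topic-to-socket naming used by consume() and create_consumer() (directory and topic names are hypothetical; illustrative only, not part of the patch):

    def ipc_address(ipc_dir, topic):
        # Only the base topic is used, e.g. 'taskmanager.host-1' -> 'taskmanager';
        # fanout topics keep their 'fanout~' prefix.
        base = topic.split('.', 1)[0]
        return "ipc://%s/zmq_topic_%s" % (ipc_dir, base)

    ipc_address('/var/run/openstack', 'taskmanager.host-1')
    # -> 'ipc:///var/run/openstack/zmq_topic_taskmanager'
    ipc_address('/var/run/openstack', 'fanout~taskmanager')
    # -> 'ipc:///var/run/openstack/zmq_topic_fanout~taskmanager'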
+ """ + + def __init__(self, conf): + super(ZmqProxy, self).__init__(conf) + + self.topic_proxy = {} + ipc_dir = conf.rpc_zmq_ipc_dir + + self.topic_proxy['zmq_replies'] = \ + ZmqSocket("ipc://%s/zmq_topic_zmq_replies" % (ipc_dir, ), + zmq.PUB, bind=True) + self.sockets.append(self.topic_proxy['zmq_replies']) + + def consume(self, sock): + ipc_dir = self.conf.rpc_zmq_ipc_dir + + #TODO(ewindisch): use zero-copy (i.e. references, not copying) + data = sock.recv() + msg_id, topic, style, in_msg = data + topic = topic.split('.', 1)[0] + + LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data))) + + # Handle zmq_replies magic + if topic.startswith('fanout~'): + sock_type = zmq.PUB + elif topic.startswith('zmq_replies'): + sock_type = zmq.PUB + inside = _deserialize(in_msg) + msg_id = inside[-1]['args']['msg_id'] + response = inside[-1]['args']['response'] + LOG.debug(_("->response->%s"), response) + data = [str(msg_id), _serialize(response)] + else: + sock_type = zmq.PUSH + + if not topic in self.topic_proxy: + outq = ZmqSocket("ipc://%s/zmq_topic_%s" % (ipc_dir, topic), + sock_type, bind=True) + self.topic_proxy[topic] = outq + self.sockets.append(outq) + LOG.info(_("Created topic proxy: %s"), topic) + + # It takes some time for a pub socket to open, + # before we can have any faith in doing a send() to it. + if sock_type == zmq.PUB: + eventlet.sleep(.5) + + LOG.debug(_("ROUTER RELAY-OUT START %(data)s") % {'data': data}) + self.topic_proxy[topic].send(data) + LOG.debug(_("ROUTER RELAY-OUT SUCCEEDED %(data)s") % {'data': data}) + + +class ZmqReactor(ZmqBaseReactor): + """ + A consumer class implementing a + consumer for messages. Can also be + used as a 1:1 proxy + """ + + def __init__(self, conf): + super(ZmqReactor, self).__init__(conf) + + def consume(self, sock): + #TODO(ewindisch): use zero-copy (i.e. references, not copying) + data = sock.recv() + LOG.debug(_("CONSUMER RECEIVED DATA: %s"), data) + if sock in self.mapping: + LOG.debug(_("ROUTER RELAY-OUT %(data)s") % { + 'data': data}) + self.mapping[sock].send(data) + return + + msg_id, topic, style, in_msg = data + + ctx, request = _deserialize(in_msg) + ctx = RpcContext.unmarshal(ctx) + + proxy = self.proxies[sock] + + self.pool.spawn_n(self.process, style, topic, + proxy, ctx, request) + + +class Connection(rpc_common.Connection): + """Manages connections and threads.""" + + def __init__(self, conf): + self.conf = conf + self.reactor = ZmqReactor(conf) + + def create_consumer(self, topic, proxy, fanout=False): + # Only consume on the base topic name. 
+ topic = topic.split('.', 1)[0] + + LOG.info(_("Create Consumer for topic (%(topic)s)") % + {'topic': topic}) + + # Subscription scenarios + if fanout: + subscribe = ('', fanout)[type(fanout) == str] + sock_type = zmq.SUB + topic = 'fanout~' + topic + else: + sock_type = zmq.PULL + subscribe = None + + # Receive messages from (local) proxy + inaddr = "ipc://%s/zmq_topic_%s" % \ + (self.conf.rpc_zmq_ipc_dir, topic) + + LOG.debug(_("Consumer is a zmq.%s"), + ['PULL', 'SUB'][sock_type == zmq.SUB]) + + self.reactor.register(proxy, inaddr, sock_type, + subscribe=subscribe, in_bind=False) + + def close(self): + self.reactor.close() + + def wait(self): + self.reactor.wait() + + def consume_in_thread(self): + self.reactor.consume_in_thread() + + +def _cast(addr, context, msg_id, topic, msg, timeout=None): + timeout_cast = timeout or FLAGS.rpc_cast_timeout + payload = [RpcContext.marshal(context), msg] + + with Timeout(timeout_cast, exception=rpc_common.Timeout): + try: + conn = ZmqClient(addr) + + # assumes cast can't return an exception + conn.cast(msg_id, topic, payload) + except zmq.ZMQError: + raise RPCException("Cast failed. ZMQ Socket Exception") + finally: + if 'conn' in vars(): + conn.close() + + +def _call(addr, context, msg_id, topic, msg, timeout=None): + # timeout_response is how long we wait for a response + timeout = timeout or FLAGS.rpc_response_timeout + + # The msg_id is used to track replies. + msg_id = str(uuid.uuid4().hex) + + # Replies always come into the reply service. + reply_topic = "zmq_replies.%s" % FLAGS.rpc_zmq_host + + LOG.debug(_("Creating payload")) + # Curry the original request into a reply method. + mcontext = RpcContext.marshal(context) + payload = { + 'method': '-reply', + 'args': { + 'msg_id': msg_id, + 'context': mcontext, + 'topic': reply_topic, + 'msg': [mcontext, msg] + } + } + + LOG.debug(_("Creating queue socket for reply waiter")) + + # Messages arriving async. + # TODO(ewindisch): have reply consumer with dynamic subscription mgmt + with Timeout(timeout, exception=rpc_common.Timeout): + try: + msg_waiter = ZmqSocket( + "ipc://%s/zmq_topic_zmq_replies" % FLAGS.rpc_zmq_ipc_dir, + zmq.SUB, subscribe=msg_id, bind=False + ) + + LOG.debug(_("Sending cast")) + _cast(addr, context, msg_id, topic, payload) + + LOG.debug(_("Cast sent; Waiting reply")) + # Blocks until receives reply + msg = msg_waiter.recv() + LOG.debug(_("Received message: %s"), msg) + LOG.debug(_("Unpacking response")) + responses = _deserialize(msg[-1]) + # ZMQError trumps the Timeout error. + except zmq.ZMQError: + raise RPCException("ZMQ Socket Error") + finally: + if 'msg_waiter' in vars(): + msg_waiter.close() + + # It seems we don't need to do all of the following, + # but perhaps it would be useful for multicall? + # One effect of this is that we're checking all + # responses for Exceptions. + for resp in responses: + if isinstance(resp, types.DictType) and 'exc' in resp: + raise rpc_common.deserialize_remote_exception(FLAGS, resp['exc']) + + return responses[-1] + + +def _multi_send(method, context, topic, msg, timeout=None): + """ + Wraps the sending of messages, + dispatches to the matchmaker and sends + message to all relevant hosts. + """ + conf = FLAGS + LOG.debug(_("%(msg)s") % {'msg': ' '.join(map(pformat, (topic, msg)))}) + + queues = matchmaker.queues(topic) + LOG.debug(_("Sending message(s) to: %s"), queues) + + # Don't stack if we have no matchmaker results + if len(queues) == 0: + LOG.warn(_("No matchmaker results. 
Not casting.")) + # While not strictly a timeout, callers know how to handle + # this exception and a timeout isn't too big a lie. + raise rpc_common.Timeout, "No match from matchmaker." + + # This supports brokerless fanout (addresses > 1) + for queue in queues: + (_topic, ip_addr) = queue + _addr = "tcp://%s:%s" % (ip_addr, conf.rpc_zmq_port) + + if method.__name__ == '_cast': + eventlet.spawn_n(method, _addr, context, + _topic, _topic, msg, timeout) + return + return method(_addr, context, _topic, _topic, msg, timeout) + + +def create_connection(conf, new=True): + return Connection(conf) + + +def multicall(conf, *args, **kwargs): + """Multiple calls.""" + register_opts(conf) + return _multi_send(_call, *args, **kwargs) + + +def call(conf, *args, **kwargs): + """Send a message, expect a response.""" + register_opts(conf) + data = _multi_send(_call, *args, **kwargs) + return data[-1] + + +def cast(conf, *args, **kwargs): + """Send a message expecting no reply.""" + register_opts(conf) + _multi_send(_cast, *args, **kwargs) + + +def fanout_cast(conf, context, topic, msg, **kwargs): + """Send a message to all listening and expect no reply.""" + register_opts(conf) + # NOTE(ewindisch): fanout~ is used because it avoid splitting on . + # and acts as a non-subtle hint to the matchmaker and ZmqProxy. + _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) + + +def notify(conf, context, topic, msg, **kwargs): + """ + Send notification event. + Notifications are sent to topic-priority. + This differs from the AMQP drivers which send to topic.priority. + """ + register_opts(conf) + # NOTE(ewindisch): dot-priority in rpc notifier does not + # work with our assumptions. + topic.replace('.', '-') + cast(conf, context, topic, msg, **kwargs) + + +def cleanup(): + """Clean up resources in use by implementation.""" + global ZMQ_CTX + global matchmaker + matchmaker = None + ZMQ_CTX.destroy() + ZMQ_CTX = None + + +def register_opts(conf): + """Registration of options for this driver.""" + #NOTE(ewindisch): ZMQ_CTX and matchmaker + # are initialized here as this is as good + # an initialization method as any. + + # We memoize through these globals + global ZMQ_CTX + global matchmaker + global FLAGS + + if not FLAGS: + conf.register_opts(zmq_opts) + FLAGS = conf + # Don't re-set, if this method is called twice. + if not ZMQ_CTX: + ZMQ_CTX = zmq.Context(conf.rpc_zmq_contexts) + if not matchmaker: + # rpc_zmq_matchmaker should be set to a 'module.Class' + mm_path = conf.rpc_zmq_matchmaker.split('.') + mm_module = '.'.join(mm_path[:-1]) + mm_class = mm_path[-1] + + # Only initialize a class. + if mm_path[-1][0] not in string.ascii_uppercase: + LOG.error(_("Matchmaker could not be loaded.\n" + "rpc_zmq_matchmaker is not a class.")) + raise RPCException(_("Error loading Matchmaker.")) + + mm_impl = importutils.import_module(mm_module) + mm_constructor = getattr(mm_impl, mm_class) + matchmaker = mm_constructor() + + +register_opts(cfg.CONF) diff --git a/reddwarf/openstack/common/rpc/matchmaker.py b/reddwarf/openstack/common/rpc/matchmaker.py new file mode 100644 index 0000000000..92041a71aa --- /dev/null +++ b/reddwarf/openstack/common/rpc/matchmaker.py @@ -0,0 +1,258 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. 
You may obtain
+# a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+"""
+The MatchMaker classes should accept a Topic or Fanout exchange key and
+return keys for direct exchanges, per (approximate) AMQP parlance.
+"""
+
+import contextlib
+import itertools
+import json
+import logging
+
+from reddwarf.openstack.common import cfg
+from reddwarf.openstack.common.gettextutils import _
+
+
+matchmaker_opts = [
+    # Matchmaker ring file
+    cfg.StrOpt('matchmaker_ringfile',
+               default='/etc/nova/matchmaker_ring.json',
+               help='Matchmaker ring file (JSON)'),
+]
+
+CONF = cfg.CONF
+CONF.register_opts(matchmaker_opts)
+LOG = logging.getLogger(__name__)
+contextmanager = contextlib.contextmanager
+
+
+class MatchMakerException(Exception):
+    """Signifies that a match could not be found."""
+    message = _("Match not found by MatchMaker.")
+
+
+class Exchange(object):
+    """
+    Implements lookups.
+    Subclass this to support hashtables, dns, etc.
+    """
+    def __init__(self):
+        pass
+
+    def run(self, key):
+        raise NotImplementedError()
+
+
+class Binding(object):
+    """
+    A binding on which to perform a lookup.
+    """
+    def __init__(self):
+        pass
+
+    def test(self, key):
+        raise NotImplementedError()
+
+
+class MatchMakerBase(object):
+    """Match Maker Base Class."""
+
+    def __init__(self):
+        # Array of tuples. Index [2] toggles negation, [3] is last-if-true
+        self.bindings = []
+
+    def add_binding(self, binding, rule, last=True):
+        self.bindings.append((binding, rule, False, last))
+
+    #NOTE(ewindisch): kept the following method in case we implement the
+    #                 underlying support.
+    #def add_negate_binding(self, binding, rule, last=True):
+    #    self.bindings.append((binding, rule, True, last))
+
+    def queues(self, key):
+        workers = []
+
+        # bit is for negate bindings - if we choose to implement it.
+        # last stops processing rules if this matches.
+        for (binding, exchange, bit, last) in self.bindings:
+            if binding.test(key):
+                workers.extend(exchange.run(key))
+
+                # Support last.
+                if last:
+                    return workers
+        return workers
+
+
+class DirectBinding(Binding):
+    """
+    Specifies a host in the key via a '.' character.
+    Although dots are used in the key, the behavior here is
+    that it maps directly to a host, thus direct.
+    """
+    def test(self, key):
+        if '.' in key:
+            return True
+        return False
+
+
+class TopicBinding(Binding):
+    """
+    Matches a 'bare' key, one without dots.
+    AMQP generally considers topic exchanges to be those *with* dots,
+    but we deviate here in terminology, as the behavior of a bare key
+    matches that of a topic exchange (whereas a dotted key behaves like
+    a direct exchange).
+    """
+    def test(self, key):
+        if '.' not in key:
+            return True
+        return False
+
+
+class FanoutBinding(Binding):
+    """Match on fanout keys, where the key starts with the 'fanout~' string."""
+    def test(self, key):
+        if key.startswith('fanout~'):
+            return True
+        return False
+
+
+class StubExchange(Exchange):
+    """Exchange that does nothing."""
+    def run(self, key):
+        return [(key, None)]
+
+
+class RingExchange(Exchange):
+    """
+    Match Maker where hosts are loaded from a static file containing
+    a hashmap (JSON formatted).
+
+    __init__ takes optional ring dictionary argument, otherwise
+    loads the ringfile from CONF.matchmaker_ringfile.
+    """
+    def __init__(self, ring=None):
+        super(RingExchange, self).__init__()
+
+        if ring:
+            self.ring = ring
+        else:
+            fh = open(CONF.matchmaker_ringfile, 'r')
+            self.ring = json.load(fh)
+            fh.close()
+
+        self.ring0 = {}
+        for k in self.ring.keys():
+            self.ring0[k] = itertools.cycle(self.ring[k])
+
+    def _ring_has(self, key):
+        if key in self.ring0:
+            return True
+        return False
+
+
+class RoundRobinRingExchange(RingExchange):
+    """A Topic Exchange based on a hashmap."""
+    def __init__(self, ring=None):
+        super(RoundRobinRingExchange, self).__init__(ring)
+
+    def run(self, key):
+        if not self._ring_has(key):
+            LOG.warn(
+                _("No key defining hosts for topic '%s', "
+                  "see ringfile") % (key, )
+            )
+            return []
+        host = next(self.ring0[key])
+        return [(key + '.' + host, host)]
+
+
+class FanoutRingExchange(RingExchange):
+    """Fanout Exchange based on a hashmap."""
+    def __init__(self, ring=None):
+        super(FanoutRingExchange, self).__init__(ring)
+
+    def run(self, key):
+        # Assume starts with "fanout~", strip it for lookup.
+        nkey = key.split('fanout~')[1:][0]
+        if not self._ring_has(nkey):
+            LOG.warn(
+                _("No key defining hosts for topic '%s', "
+                  "see ringfile") % (nkey, )
+            )
+            return []
+        return map(lambda x: (key + '.' + x, x), self.ring[nkey])
+
+
+class LocalhostExchange(Exchange):
+    """Exchange where all direct topics are local."""
+    def __init__(self):
+        super(LocalhostExchange, self).__init__()
+
+    def run(self, key):
+        return [(key.split('.')[0] + '.localhost', 'localhost')]
+
+
+class DirectExchange(Exchange):
+    """
+    Exchange where all topic keys are split, sending to second half.
+    i.e. "compute.host" sends a message to "compute" running on "host"
+    """
+    def __init__(self):
+        super(DirectExchange, self).__init__()
+
+    def run(self, key):
+        b, e = key.split('.', 1)
+        return [(b, e)]
+
+
+class MatchMakerRing(MatchMakerBase):
+    """
+    Match Maker where hosts are loaded from a static hashmap.
+    """
+    def __init__(self, ring=None):
+        super(MatchMakerRing, self).__init__()
+        self.add_binding(FanoutBinding(), FanoutRingExchange(ring))
+        self.add_binding(DirectBinding(), DirectExchange())
+        self.add_binding(TopicBinding(), RoundRobinRingExchange(ring))
+
+
+class MatchMakerLocalhost(MatchMakerBase):
+    """
+    Match Maker where all bare topics resolve to localhost.
+    Useful for testing.
+    """
+    def __init__(self):
+        super(MatchMakerLocalhost, self).__init__()
+        self.add_binding(FanoutBinding(), LocalhostExchange())
+        self.add_binding(DirectBinding(), DirectExchange())
+        self.add_binding(TopicBinding(), LocalhostExchange())
+
+
+class MatchMakerStub(MatchMakerBase):
+    """
+    Match Maker where topics are untouched.
+    Useful for testing, or for AMQP/brokered queues.
+    Will not work where knowledge of hosts is required (i.e. ZeroMQ).
+    """
+    def __init__(self):
+        super(MatchMakerStub, self).__init__()
+
+        self.add_binding(FanoutBinding(), StubExchange())
+        self.add_binding(DirectBinding(), StubExchange())
+        self.add_binding(TopicBinding(), StubExchange())
diff --git a/reddwarf/openstack/common/rpc/proxy.py b/reddwarf/openstack/common/rpc/proxy.py
new file mode 100644
index 0000000000..cf4554aae1
--- /dev/null
+++ b/reddwarf/openstack/common/rpc/proxy.py
@@ -0,0 +1,165 @@
+# vim: tabstop=4 shiftwidth=4 softtabstop=4
+
+# Copyright 2012 Red Hat, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License.
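The three Binding subclasses above only classify a key: anything starting with 'fanout~' is a fanout, anything containing a dot is treated as direct (the host is encoded in the key), and a bare key is a plain topic. Here is a stand-alone restatement of those tests; the function name and the sample keys are invented for illustration.

    def classify_key(key):
        """Mirror FanoutBinding.test(), DirectBinding.test() and TopicBinding.test()."""
        if key.startswith('fanout~'):
            return 'fanout'    # FanoutBinding: one copy per registered host
        if '.' in key:
            return 'direct'    # DirectBinding: host encoded after the dot
        return 'topic'         # TopicBinding: round-robin worker pool


    if __name__ == '__main__':
        for key in ('fanout~taskmanager', 'taskmanager.host-1', 'taskmanager'):
            print(key, '->', classify_key(key))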
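RingExchange keeps an itertools.cycle per topic so that RoundRobinRingExchange can hand out one host at a time, while FanoutRingExchange returns every host registered for the topic. The minimal stand-alone sketch below uses an in-memory ring shaped like the ringfile JSON; the topic and host names are invented.

    import itertools

    ring = {"taskmanager": ["host-1", "host-2"]}    # shape of matchmaker_ring.json
    ring0 = dict((k, itertools.cycle(v)) for k, v in ring.items())


    def round_robin(key):
        """One (queue, host) pair per call, like RoundRobinRingExchange.run()."""
        host = next(ring0[key])
        return [(key + '.' + host, host)]


    def fanout(key):
        """Every host for the topic, like FanoutRingExchange.run() once the
        'fanout~' prefix has been stripped for the lookup."""
        return [(key + '.' + host, host) for host in ring[key]]


    if __name__ == '__main__':
        print(round_robin("taskmanager"))   # [('taskmanager.host-1', 'host-1')]
        print(round_robin("taskmanager"))   # [('taskmanager.host-2', 'host-2')]
        print(fanout("taskmanager"))

MatchMakerRing then simply wires FanoutBinding to FanoutRingExchange, DirectBinding to DirectExchange and TopicBinding to RoundRobinRingExchange.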
You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +A helper class for proxy objects to remote APIs. + +For more information about rpc API version numbers, see: + rpc/dispatcher.py +""" + + +from reddwarf.openstack.common import rpc + + +class RpcProxy(object): + """A helper class for rpc clients. + + This class is a wrapper around the RPC client API. It allows you to + specify the topic and API version in a single place. This is intended to + be used as a base class for a class that implements the client side of an + rpc API. + """ + + def __init__(self, topic, default_version): + """Initialize an RpcProxy. + + :param topic: The topic to use for all messages. + :param default_version: The default API version to request in all + outgoing messages. This can be overridden on a per-message + basis. + """ + self.topic = topic + self.default_version = default_version + super(RpcProxy, self).__init__() + + def _set_version(self, msg, vers): + """Helper method to set the version in a message. + + :param msg: The message having a version added to it. + :param vers: The version number to add to the message. + """ + msg['version'] = vers if vers else self.default_version + + def _get_topic(self, topic): + """Return the topic to use for a message.""" + return topic if topic else self.topic + + @staticmethod + def make_msg(method, **kwargs): + return {'method': method, 'args': kwargs} + + def call(self, context, msg, topic=None, version=None, timeout=None): + """rpc.call() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param timeout: (Optional) A timeout to use when waiting for the + response. If no timeout is specified, a default timeout will be + used that is usually sufficient. + :param version: (Optional) Override the requested API version in this + message. + + :returns: The return value from the remote method. + """ + self._set_version(msg, version) + return rpc.call(context, self._get_topic(topic), msg, timeout) + + def multicall(self, context, msg, topic=None, version=None, timeout=None): + """rpc.multicall() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param timeout: (Optional) A timeout to use when waiting for the + response. If no timeout is specified, a default timeout will be + used that is usually sufficient. + :param version: (Optional) Override the requested API version in this + message. + + :returns: An iterator that lets you process each of the returned values + from the remote method as they arrive. + """ + self._set_version(msg, version) + return rpc.multicall(context, self._get_topic(topic), msg, timeout) + + def cast(self, context, msg, topic=None, version=None): + """rpc.cast() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. 
rpc.cast() does not wait on any return value from the + remote method. + """ + self._set_version(msg, version) + rpc.cast(context, self._get_topic(topic), msg) + + def fanout_cast(self, context, msg, topic=None, version=None): + """rpc.fanout_cast() a remote method. + + :param context: The request context + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.fanout_cast() does not wait on any return value + from the remote method. + """ + self._set_version(msg, version) + rpc.fanout_cast(context, self._get_topic(topic), msg) + + def cast_to_server(self, context, server_params, msg, topic=None, + version=None): + """rpc.cast_to_server() a remote method. + + :param context: The request context + :param server_params: Server parameters. See rpc.cast_to_server() for + details. + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.cast_to_server() does not wait on any + return values. + """ + self._set_version(msg, version) + rpc.cast_to_server(context, server_params, self._get_topic(topic), msg) + + def fanout_cast_to_server(self, context, server_params, msg, topic=None, + version=None): + """rpc.fanout_cast_to_server() a remote method. + + :param context: The request context + :param server_params: Server parameters. See rpc.cast_to_server() for + details. + :param msg: The message to send, including the method and args. + :param topic: Override the topic for this message. + :param version: (Optional) Override the requested API version in this + message. + + :returns: None. rpc.fanout_cast_to_server() does not wait on any + return values. + """ + self._set_version(msg, version) + rpc.fanout_cast_to_server(context, server_params, + self._get_topic(topic), msg) diff --git a/reddwarf/openstack/common/timeutils.py b/reddwarf/openstack/common/timeutils.py new file mode 100644 index 0000000000..5eeaf70aa4 --- /dev/null +++ b/reddwarf/openstack/common/timeutils.py @@ -0,0 +1,109 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC. +# All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +""" +Time related utilities and helper functions. 
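A client-side API class is expected to subclass RpcProxy, pin its topic and default version once, and build messages with make_msg(). The class below is only an illustration of that pattern under the module path added by this patch; the name TaskManagerClient, the 'taskmanager' topic string and the method signatures are assumptions, not something this change introduces.

    from reddwarf.openstack.common.rpc import proxy


    class TaskManagerClient(proxy.RpcProxy):
        """Hypothetical client for a task manager RPC API (illustration only)."""

        BASE_RPC_API_VERSION = '1.0'

        def __init__(self, topic='taskmanager'):
            super(TaskManagerClient, self).__init__(
                topic=topic, default_version=self.BASE_RPC_API_VERSION)

        def resize_volume(self, context, instance_id, new_size):
            # Fire-and-forget: cast() returns None and does not wait.
            self.cast(context,
                      self.make_msg('resize_volume',
                                    instance_id=instance_id,
                                    new_size=new_size))

        def get_status(self, context, instance_id):
            # call() blocks for the reply; the version can be overridden per message.
            return self.call(context,
                             self.make_msg('get_status', instance_id=instance_id),
                             version='1.0')

Keeping the topic and version in one subclass is the point of the helper: every outgoing message gets a 'version' key stamped by _set_version(), which the receiving service can check against its own RPC_API_VERSION.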
+""" + +import calendar +import datetime +import time + +import iso8601 + + +TIME_FORMAT = "%Y-%m-%dT%H:%M:%S" +PERFECT_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%f" + + +def isotime(at=None): + """Stringify time in ISO 8601 format""" + if not at: + at = utcnow() + str = at.strftime(TIME_FORMAT) + tz = at.tzinfo.tzname(None) if at.tzinfo else 'UTC' + str += ('Z' if tz == 'UTC' else tz) + return str + + +def parse_isotime(timestr): + """Parse time from ISO 8601 format""" + try: + return iso8601.parse_date(timestr) + except iso8601.ParseError as e: + raise ValueError(e.message) + except TypeError as e: + raise ValueError(e.message) + + +def strtime(at=None, fmt=PERFECT_TIME_FORMAT): + """Returns formatted utcnow.""" + if not at: + at = utcnow() + return at.strftime(fmt) + + +def parse_strtime(timestr, fmt=PERFECT_TIME_FORMAT): + """Turn a formatted time back into a datetime.""" + return datetime.datetime.strptime(timestr, fmt) + + +def normalize_time(timestamp): + """Normalize time in arbitrary timezone to UTC""" + offset = timestamp.utcoffset() + return timestamp.replace(tzinfo=None) - offset if offset else timestamp + + +def is_older_than(before, seconds): + """Return True if before is older than seconds.""" + return utcnow() - before > datetime.timedelta(seconds=seconds) + + +def utcnow_ts(): + """Timestamp version of our utcnow function.""" + return calendar.timegm(utcnow().timetuple()) + + +def utcnow(): + """Overridable version of utils.utcnow.""" + if utcnow.override_time: + return utcnow.override_time + return datetime.datetime.utcnow() + + +utcnow.override_time = None + + +def set_time_override(override_time=datetime.datetime.utcnow()): + """Override utils.utcnow to return a constant time.""" + utcnow.override_time = override_time + + +def advance_time_delta(timedelta): + """Advance overriden time using a datetime.timedelta.""" + assert(not utcnow.override_time is None) + utcnow.override_time += timedelta + + +def advance_time_seconds(seconds): + """Advance overriden time by seconds.""" + advance_time_delta(datetime.timedelta(0, seconds)) + + +def clear_time_override(): + """Remove the overridden time.""" + utcnow.override_time = None diff --git a/reddwarf/rpc/common.py b/reddwarf/rpc/common.py deleted file mode 100644 index 2323843ca9..0000000000 --- a/reddwarf/rpc/common.py +++ /dev/null @@ -1,129 +0,0 @@ -# vim: tabstop=4 shiftwidth=4 softtabstop=4 - -# Copyright 2010 United States Government as represented by the -# Administrator of the National Aeronautics and Space Administration. -# All Rights Reserved. -# Copyright 2011 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import copy -import logging - -from reddwarf.common import config -from reddwarf.common import exception - - -LOG = logging.getLogger(__name__) - - -class RemoteError(exception.ReddwarfError): - """Signifies that a remote class has raised an exception. - - Contains a string representation of the type of the original exception, - the value of the original exception, and the traceback. 
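The utcnow.override_time hook is what makes timeutils usable in tests: the clock can be frozen, advanced explicitly and then restored. Below is a short usage sketch, assuming the module path added by this patch; the timestamps are arbitrary. Note that set_time_override()'s default argument is evaluated once at import time, so callers who want "now" should pass the value explicitly, as done here.

    import datetime

    from reddwarf.openstack.common import timeutils

    # Freeze the clock at a known point.
    start = datetime.datetime(2012, 7, 1, 12, 0, 0)
    timeutils.set_time_override(start)
    assert timeutils.utcnow() == start

    # Advance the frozen clock and exercise age-based logic against it.
    timeutils.advance_time_seconds(90)
    assert timeutils.is_older_than(start, 60)
    assert not timeutils.is_older_than(start, 120)

    # Always restore the real clock afterwards.
    timeutils.clear_time_override()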
These are - sent to the parent as a joined string so printing the exception - contains all of the relevant info. - - """ - message = _("Remote error: %(exc_type)s %(value)s\n%(traceback)s.") - - def __init__(self, exc_type=None, value=None, traceback=None): - self.exc_type = exc_type - self.value = value - self.traceback = traceback - super(RemoteError, self).__init__(exc_type=exc_type, - value=value, - traceback=traceback) - - -class Timeout(exception.ReddwarfError): - """Signifies that a timeout has occurred. - - This exception is raised if the rpc_response_timeout is reached while - waiting for a response from the remote side. - """ - message = _("Timeout while waiting on RPC response.") - - -class Connection(object): - """A connection, returned by rpc.create_connection(). - - This class represents a connection to the message bus used for rpc. - An instance of this class should never be created by users of the rpc API. - Use rpc.create_connection() instead. - """ - def close(self): - """Close the connection. - - This method must be called when the connection will no longer be used. - It will ensure that any resources associated with the connection, such - as a network connection, and cleaned up. - """ - raise NotImplementedError() - - def create_consumer(self, topic, proxy, fanout=False): - """Create a consumer on this connection. - - A consumer is associated with a message queue on the backend message - bus. The consumer will read messages from the queue, unpack them, and - dispatch them to the proxy object. The contents of the message pulled - off of the queue will determine which method gets called on the proxy - object. - - :param topic: This is a name associated with what to consume from. - Multiple instances of a service may consume from the same - topic. For example, all instances of nova-compute consume - from a queue called "compute". In that case, the - messages will get distributed amongst the consumers in a - round-robin fashion if fanout=False. If fanout=True, - every consumer associated with this topic will get a - copy of every message. - :param proxy: The object that will handle all incoming messages. - :param fanout: Whether or not this is a fanout topic. See the - documentation for the topic parameter for some - additional comments on this. - """ - raise NotImplementedError() - - def consume_in_thread(self): - """Spawn a thread to handle incoming messages. - - Spawn a thread that will be responsible for handling all incoming - messages for consumers that were set up on this connection. - - Message dispatching inside of this is expected to be implemented in a - non-blocking manner. An example implementation would be having this - thread pull messages in for all of the consumers, but utilize a thread - pool for dispatching the messages to the proxy objects. 
- """ - raise NotImplementedError() - - -def _safe_log(log_func, msg, msg_data): - """Sanitizes the msg_data field before logging.""" - SANITIZE = { - 'set_admin_password': ('new_pass',), - 'run_instance': ('admin_password',), - } - method = msg_data['method'] - if method in SANITIZE: - msg_data = copy.deepcopy(msg_data) - args_to_sanitize = SANITIZE[method] - for arg in args_to_sanitize: - try: - msg_data['args'][arg] = "" - except KeyError: - pass - - return log_func(msg, msg_data) diff --git a/reddwarf/taskmanager/manager.py b/reddwarf/taskmanager/manager.py index 7e7656bf52..2cb25bc366 100644 --- a/reddwarf/taskmanager/manager.py +++ b/reddwarf/taskmanager/manager.py @@ -17,7 +17,6 @@ import logging import traceback -import weakref from eventlet import greenthread @@ -26,7 +25,7 @@ from reddwarf.common import service from reddwarf.taskmanager import models from reddwarf.taskmanager.models import BuiltInstanceTasks from reddwarf.taskmanager.models import FreshInstanceTasks - +from reddwarf.openstack.common.rpc.common import UnsupportedRpcVersion LOG = logging.getLogger(__name__) @@ -34,28 +33,12 @@ LOG = logging.getLogger(__name__) class TaskManager(service.Manager): """Task manager impl""" + RPC_API_VERSION = "1.0" + def __init__(self, *args, **kwargs): - self.tasks = weakref.WeakKeyDictionary() super(TaskManager, self).__init__(*args, **kwargs) LOG.info(_("TaskManager init %s %s") % (args, kwargs)) - def periodic_tasks(self, raise_on_error=False): - LOG.debug("No. of running tasks: %r" % len(self.tasks)) - - def _wrapper(self, method, context, *args, **kwargs): - """Maps the respective manager method with a task counter.""" - # TODO(rnirmal): Just adding a basic counter. Will revist and - # re-implement when we have actual tasks. - self.tasks[greenthread.getcurrent()] = context - try: - func = getattr(self, method) - func(context, *args, **kwargs) - except Exception as e: - LOG.error("Got an error running %s!" % method) - LOG.error(traceback.format_exc()) - finally: - del self.tasks[greenthread.getcurrent()] - def resize_volume(self, context, instance_id, new_size): instance_tasks = models.BuiltInstanceTasks.load(context, instance_id) instance_tasks.resize_volume(new_size) diff --git a/reddwarf/taskmanager/models.py b/reddwarf/taskmanager/models.py index c4439e279a..6aacdb5a9e 100644 --- a/reddwarf/taskmanager/models.py +++ b/reddwarf/taskmanager/models.py @@ -16,6 +16,7 @@ import logging from eventlet import greenthread from datetime import datetime +import traceback from novaclient import exceptions as nova_exceptions from reddwarf.common import config from reddwarf.common import remote @@ -139,6 +140,7 @@ class FreshInstanceTasks(FreshInstance): def _log_and_raise(self, exc, message, task_status): LOG.error(message) LOG.error(exc) + LOG.error(traceback.format_exc()) self.update_db(task_status=task_status) raise ReddwarfError(message=message)
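The deleted reddwarf/rpc/common.py also carried the _safe_log() helper shown above, which blanks password-style arguments before an RPC message is logged. Whatever the replacement openstack.common rpc code does for logging, the pattern itself is worth keeping in view; here is a self-contained restatement of it, where the '<SANITIZED>' marker and the demo message are invented for illustration.

    import copy
    import logging

    logging.basicConfig(level=logging.DEBUG)
    LOG = logging.getLogger(__name__)

    # Methods whose arguments must never reach the logs, as in the removed helper.
    SANITIZE = {
        'set_admin_password': ('new_pass',),
        'run_instance': ('admin_password',),
    }


    def safe_log(log_func, msg, msg_data):
        """Blank out sensitive arguments before logging an RPC message."""
        method = msg_data.get('method')
        if method in SANITIZE:
            msg_data = copy.deepcopy(msg_data)
            for arg in SANITIZE[method]:
                if arg in msg_data['args']:
                    msg_data['args'][arg] = '<SANITIZED>'
        return log_func(msg, msg_data)


    if __name__ == '__main__':
        safe_log(LOG.debug, 'sending: %s',
                 {'method': 'set_admin_password',
                  'args': {'instance_id': 42, 'new_pass': 'secret'}})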