From deb27d9c24d3745ff062bc437081bd67cf10059f Mon Sep 17 00:00:00 2001 From: Ihar Hrachyshka Date: Mon, 2 Jun 2014 17:40:38 +0200 Subject: [PATCH] Port to oslo.messaging Now that all preparations are done, actually port the code to use oslo.messaging. This patch does as little as possible. Follow up patches that refactor and cleanup the code and configuration files, will be merged later. The reason for this is to make the patch as slim as possible, to make review process more smooth and concentrated. Details: * neutron/common/rpc.py: - added init() and cleanup() to set global RPC layer state. - added utility functions: get_server(), get_client(), get_notifier() that wrap up oslo.messaging API a bit, enforcing eventlet executor and setting serializer, among other things. - removed PluginRpcDispatcher, instead introduced PluginRpcSerializer to use as a default serializer for API callbacks. * neutron/common/rpc_compat.py: - emulated incubator RPC layer behaviour thru previously introduced stub classes (RpcCallback, RpcProxy, ...) using new oslo.messaging API. - switched to using new oslo.messaging exception types. * neutron/service.py: - expect multiple RPC listeners that are of MessageHandlingServer type, not GreenThread. * neutron/common/config.py: - initialize RPC layer in init() * setup.cfg: - added entry points for old notifier drivers to retain backward compatibility. * neutron/tests/...: - introduced fake_notifier to replace impl_fake. - faked out consume_in_thread() to avoid starting RPC listeners when running unit tests. - used 'fake' transport driver. - made sure neutron.test.* exceptions are caught. - initialize and clean up RPC layer for each test case. * Ported all affected code from using neutron.openstack.common.notifier API to oslo.messaging.Notifier. * rpc.set_defaults() was renamed to rpc.set_transport_defaults() * other changes not worth mentioning here. blueprint oslo-messaging DocImpact Change-Id: I5a91c34df6e300f2dc46217b1b16352fcc3039fc --- neutron/api/v2/base.py | 30 ++--- neutron/cmd/usage_audit.py | 33 ++--- neutron/common/config.py | 9 +- neutron/common/rpc.py | 113 ++++++++++++++++-- neutron/common/rpc_compat.py | 84 +++++++++++-- neutron/db/l3_db.py | 18 ++- neutron/db/metering/metering_rpc.py | 3 +- neutron/openstack/common/service.py | 4 +- .../bigswitch/agent/restproxy_agent.py | 3 +- neutron/plugins/bigswitch/plugin.py | 4 +- neutron/plugins/brocade/NeutronPlugin.py | 4 +- .../plugins/cisco/n1kv/n1kv_neutron_plugin.py | 4 +- .../hyperv/agent/hyperv_neutron_agent.py | 6 +- neutron/plugins/hyperv/rpc_callbacks.py | 4 +- .../plugins/ibm/agent/sdnve_neutron_agent.py | 3 +- neutron/plugins/ibm/sdnve_neutron_plugin.py | 4 +- .../agent/linuxbridge_neutron_agent.py | 3 +- .../plugins/linuxbridge/lb_neutron_plugin.py | 4 +- neutron/plugins/midonet/plugin.py | 4 +- neutron/plugins/ml2/rpc.py | 12 +- .../mlnx/agent/eswitch_neutron_agent.py | 3 +- neutron/plugins/mlnx/rpc_callbacks.py | 4 +- .../plugins/nec/agent/nec_neutron_agent.py | 4 +- neutron/plugins/nec/common/config.py | 1 - neutron/plugins/nec/nec_plugin.py | 15 ++- .../ofagent/agent/ofa_neutron_agent.py | 3 +- .../agent/nvsd_neutron_agent.py | 4 +- neutron/plugins/oneconvergence/plugin.py | 4 +- .../openvswitch/agent/ovs_neutron_agent.py | 3 +- .../plugins/openvswitch/ovs_neutron_plugin.py | 4 +- .../plugins/ryu/agent/ryu_neutron_agent.py | 3 +- neutron/plugins/ryu/ryu_neutron_plugin.py | 3 +- neutron/plugins/vmware/dhcp_meta/rpc.py | 4 +- neutron/policy.py | 4 +- neutron/service.py | 18 +-- neutron/services/firewall/fwaas_plugin.py | 3 +- .../services/l3_router/l3_router_plugin.py | 3 +- .../drivers/common/agent_driver_base.py | 4 +- .../metering/agents/metering_agent.py | 9 +- .../vpn/device_drivers/cisco_ipsec.py | 11 +- neutron/services/vpn/device_drivers/ipsec.py | 10 +- .../vpn/service_drivers/cisco_ipsec.py | 3 +- neutron/services/vpn/service_drivers/ipsec.py | 3 +- neutron/tests/base.py | 46 ++++--- neutron/tests/fake_notifier.py | 50 ++++++++ .../tests/unit/hyperv/test_hyperv_rpcapi.py | 14 +-- neutron/tests/unit/linuxbridge/test_rpcapi.py | 11 +- neutron/tests/unit/ml2/test_rpcapi.py | 13 +- neutron/tests/unit/mlnx/test_rpcapi.py | 11 +- .../tests/unit/openvswitch/test_ovs_rpcapi.py | 11 +- .../services/metering/test_metering_agent.py | 6 +- neutron/tests/unit/test_agent_rpc.py | 2 +- neutron/tests/unit/test_api_v2.py | 68 +++++------ neutron/tests/unit/test_l3_plugin.py | 8 +- requirements.txt | 1 + setup.cfg | 8 ++ 56 files changed, 439 insertions(+), 282 deletions(-) create mode 100644 neutron/tests/fake_notifier.py diff --git a/neutron/api/v2/base.py b/neutron/api/v2/base.py index 2ed735e277..89ef47ed6d 100644 --- a/neutron/api/v2/base.py +++ b/neutron/api/v2/base.py @@ -27,8 +27,8 @@ from neutron.api.v2 import attributes from neutron.api.v2 import resource as wsgi_resource from neutron.common import constants as const from neutron.common import exceptions +from neutron.common import rpc as n_rpc from neutron.openstack.common import log as logging -from neutron.openstack.common.notifier import api as notifier_api from neutron import policy from neutron import quota @@ -69,7 +69,7 @@ class Controller(object): self._native_sorting = self._is_native_sorting_supported() self._policy_attrs = [name for (name, info) in self._attr_info.items() if info.get('required_by_policy')] - self._publisher_id = notifier_api.publisher_id('network') + self._notifier = n_rpc.get_notifier('network') # use plugin's dhcp notifier, if this is already instantiated agent_notifiers = getattr(plugin, 'agent_notifiers', {}) self._dhcp_agent_notifier = ( @@ -372,10 +372,8 @@ class Controller(object): def create(self, request, body=None, **kwargs): """Creates a new instance of the requested entity.""" parent_id = kwargs.get(self._parent_id_name) - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, self._resource + '.create.start', - notifier_api.CONF.default_notification_level, body) body = Controller.prepare_request_body(request.context, body, True, self._resource, self._attr_info, @@ -419,10 +417,8 @@ class Controller(object): def notify(create_result): notifier_method = self._resource + '.create.end' - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, notifier_method, - notifier_api.CONF.default_notification_level, create_result) self._send_dhcp_notification(request.context, create_result, @@ -458,10 +454,8 @@ class Controller(object): def delete(self, request, id, **kwargs): """Deletes the specified entity.""" - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, self._resource + '.delete.start', - notifier_api.CONF.default_notification_level, {self._resource + '_id': id}) action = self._plugin_handlers[self.DELETE] @@ -482,10 +476,8 @@ class Controller(object): obj_deleter = getattr(self._plugin, action) obj_deleter(request.context, id, **kwargs) notifier_method = self._resource + '.delete.end' - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, notifier_method, - notifier_api.CONF.default_notification_level, {self._resource + '_id': id}) result = {self._resource: self._view(request.context, obj)} self._send_nova_notification(action, {}, result) @@ -502,10 +494,8 @@ class Controller(object): msg = _("Invalid format: %s") % request.body raise exceptions.BadRequest(resource='body', msg=msg) payload['id'] = id - notifier_api.notify(request.context, - self._publisher_id, + self._notifier.info(request.context, self._resource + '.update.start', - notifier_api.CONF.default_notification_level, payload) body = Controller.prepare_request_body(request.context, body, False, self._resource, self._attr_info, @@ -541,11 +531,7 @@ class Controller(object): obj = obj_updater(request.context, id, **kwargs) result = {self._resource: self._view(request.context, obj)} notifier_method = self._resource + '.update.end' - notifier_api.notify(request.context, - self._publisher_id, - notifier_method, - notifier_api.CONF.default_notification_level, - result) + self._notifier.info(request.context, notifier_method, result) self._send_dhcp_notification(request.context, result, notifier_method) diff --git a/neutron/cmd/usage_audit.py b/neutron/cmd/usage_audit.py index f48e0c691c..6294d710d9 100644 --- a/neutron/cmd/usage_audit.py +++ b/neutron/cmd/usage_audit.py @@ -26,9 +26,9 @@ import sys from oslo.config import cfg from neutron.common import config +from neutron.common import rpc as n_rpc from neutron import context from neutron import manager -from neutron.openstack.common.notifier import api as notifier_api def main(): @@ -37,33 +37,14 @@ def main(): cxt = context.get_admin_context() plugin = manager.NeutronManager.get_plugin() + notifier = n_rpc.get_notifier('network') for network in plugin.get_networks(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'network.exists', - notifier_api.INFO, - {'network': network}) + notifier.info(cxt, 'network.exists', {'network': network}) for subnet in plugin.get_subnets(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'subnet.exists', - notifier_api.INFO, - {'subnet': subnet}) + notifier.info(cxt, 'subnet.exists', {'subnet': subnet}) for port in plugin.get_ports(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'port.exists', - notifier_api.INFO, - {'port': port}) + notifier.info(cxt, 'port.exists', {'port': port}) for router in plugin.get_routers(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'router.exists', - notifier_api.INFO, - {'router': router}) + notifier.info(cxt, 'router.exists', {'router': router}) for floatingip in plugin.get_floatingips(cxt): - notifier_api.notify(cxt, - notifier_api.publisher_id('network'), - 'floatingip.exists', - notifier_api.INFO, - {'floatingip': floatingip}) + notifier.info(cxt, 'floatingip.exists', {'floatingip': floatingip}) diff --git a/neutron/common/config.py b/neutron/common/config.py index a7b7a95594..0a8232fa02 100644 --- a/neutron/common/config.py +++ b/neutron/common/config.py @@ -20,13 +20,13 @@ Routines for configuring Neutron import os from oslo.config import cfg +from oslo import messaging from paste import deploy from neutron.api.v2 import attributes from neutron.common import utils from neutron.openstack.common.db import options as db_options from neutron.openstack.common import log as logging -from neutron.openstack.common import rpc from neutron import version @@ -125,7 +125,7 @@ cfg.CONF.register_opts(core_opts) cfg.CONF.register_cli_opts(core_cli_opts) # Ensure that the control exchange is set correctly -rpc.set_defaults(control_exchange='neutron') +messaging.set_transport_defaults(control_exchange='neutron') _SQL_CONNECTION_DEFAULT = 'sqlite://' # Update the default QueuePool parameters. These can be tweaked by the # configuration variables - max_pool_size, max_overflow and pool_timeout @@ -139,6 +139,11 @@ def init(args, **kwargs): version='%%prog %s' % version.version_info.release_string(), **kwargs) + # FIXME(ihrachys): if import is put in global, circular import + # failure occurs + from neutron.common import rpc as n_rpc + n_rpc.init(cfg.CONF) + # Validate that the base_mac is of the correct format msg = attributes._validate_regex(cfg.CONF.base_mac, attributes.MAC_PATTERN) diff --git a/neutron/common/rpc.py b/neutron/common/rpc.py index 643cf59344..98d4681404 100644 --- a/neutron/common/rpc.py +++ b/neutron/common/rpc.py @@ -15,31 +15,122 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo.config import cfg +from oslo import messaging +from oslo.messaging import serializer as om_serializer + +from neutron.common import exceptions from neutron import context from neutron.openstack.common import log as logging -from neutron.openstack.common.rpc import dispatcher LOG = logging.getLogger(__name__) -class PluginRpcDispatcher(dispatcher.RpcDispatcher): - """This class is used to convert RPC common context into +TRANSPORT = None +NOTIFIER = None + +ALLOWED_EXMODS = [ + exceptions.__name__, +] +EXTRA_EXMODS = [] + + +TRANSPORT_ALIASES = { + 'neutron.openstack.common.rpc.impl_fake': 'fake', + 'neutron.openstack.common.rpc.impl_qpid': 'qpid', + 'neutron.openstack.common.rpc.impl_kombu': 'rabbit', + 'neutron.openstack.common.rpc.impl_zmq': 'zmq', + 'neutron.rpc.impl_fake': 'fake', + 'neutron.rpc.impl_qpid': 'qpid', + 'neutron.rpc.impl_kombu': 'rabbit', + 'neutron.rpc.impl_zmq': 'zmq', +} + + +def init(conf): + global TRANSPORT, NOTIFIER + exmods = get_allowed_exmods() + TRANSPORT = messaging.get_transport(conf, + allowed_remote_exmods=exmods, + aliases=TRANSPORT_ALIASES) + NOTIFIER = messaging.Notifier(TRANSPORT) + + +def cleanup(): + global TRANSPORT, NOTIFIER + assert TRANSPORT is not None + assert NOTIFIER is not None + TRANSPORT.cleanup() + TRANSPORT = NOTIFIER = None + + +def add_extra_exmods(*args): + EXTRA_EXMODS.extend(args) + + +def clear_extra_exmods(): + del EXTRA_EXMODS[:] + + +def get_allowed_exmods(): + return ALLOWED_EXMODS + EXTRA_EXMODS + + +def get_client(target, version_cap=None, serializer=None): + assert TRANSPORT is not None + serializer = PluginRpcSerializer(serializer) + return messaging.RPCClient(TRANSPORT, + target, + version_cap=version_cap, + serializer=serializer) + + +def get_server(target, endpoints, serializer=None): + assert TRANSPORT is not None + serializer = PluginRpcSerializer(serializer) + return messaging.get_rpc_server(TRANSPORT, + target, + endpoints, + executor='eventlet', + serializer=serializer) + + +def get_notifier(service=None, host=None, publisher_id=None): + assert NOTIFIER is not None + if not publisher_id: + publisher_id = "%s.%s" % (service, host or cfg.CONF.host) + return NOTIFIER.prepare(publisher_id=publisher_id) + + +class PluginRpcSerializer(om_serializer.Serializer): + """This serializer is used to convert RPC common context into Neutron Context. """ + def __init__(self, base): + super(PluginRpcSerializer, self).__init__() + self._base = base - def __init__(self, callbacks): - super(PluginRpcDispatcher, self).__init__(callbacks) + def serialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.serialize_entity(ctxt, entity) - def dispatch(self, rpc_ctxt, version, method, namespace, **kwargs): - rpc_ctxt_dict = rpc_ctxt.to_dict() + def deserialize_entity(self, ctxt, entity): + if not self._base: + return entity + return self._base.deserialize_entity(ctxt, entity) + + def serialize_context(self, ctxt): + return ctxt.to_dict() + + def deserialize_context(self, ctxt): + rpc_ctxt_dict = ctxt.copy() user_id = rpc_ctxt_dict.pop('user_id', None) if not user_id: user_id = rpc_ctxt_dict.pop('user', None) tenant_id = rpc_ctxt_dict.pop('tenant_id', None) if not tenant_id: tenant_id = rpc_ctxt_dict.pop('project_id', None) - neutron_ctxt = context.Context(user_id, tenant_id, - load_admin_roles=False, **rpc_ctxt_dict) - return super(PluginRpcDispatcher, self).dispatch( - neutron_ctxt, version, method, namespace, **kwargs) + return context.Context(user_id, tenant_id, + load_admin_roles=False, **rpc_ctxt_dict) diff --git a/neutron/common/rpc_compat.py b/neutron/common/rpc_compat.py index f494d53380..939551d493 100644 --- a/neutron/common/rpc_compat.py +++ b/neutron/common/rpc_compat.py @@ -13,24 +13,63 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo.config import cfg +from oslo import messaging + +from neutron.common import rpc as n_rpc from neutron.openstack.common import log as logging -from neutron.openstack.common import rpc -from neutron.openstack.common.rpc import common as rpc_common -from neutron.openstack.common.rpc import dispatcher as rpc_dispatcher -from neutron.openstack.common.rpc import proxy from neutron.openstack.common import service LOG = logging.getLogger(__name__) -class RpcProxy(proxy.RpcProxy): +class RpcProxy(object): ''' This class is created to facilitate migration from oslo-incubator RPC layer implementation to oslo.messaging and is intended to emulate RpcProxy class behaviour using oslo.messaging API once the migration is applied. ''' + RPC_API_NAMESPACE = None + + def __init__(self, topic, default_version, version_cap=None): + self.topic = topic + target = messaging.Target(topic=topic, version=default_version) + self._client = n_rpc.get_client(target, version_cap=version_cap) + + def make_msg(self, method, **kwargs): + return {'method': method, + 'namespace': self.RPC_API_NAMESPACE, + 'args': kwargs} + + def call(self, context, msg, **kwargs): + return self.__call_rpc_method( + context, msg, rpc_method='call', **kwargs) + + def cast(self, context, msg, **kwargs): + self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs) + + def fanout_cast(self, context, msg, **kwargs): + kwargs['fanout'] = True + self.__call_rpc_method(context, msg, rpc_method='cast', **kwargs) + + def __call_rpc_method(self, context, msg, **kwargs): + options = dict( + ((opt, kwargs[opt]) + for opt in ('fanout', 'timeout', 'topic', 'version') + if kwargs.get(opt)) + ) + if msg['namespace']: + options['namespace'] = msg['namespace'] + + if options: + callee = self._client.prepare(**options) + else: + callee = self._client + + func = getattr(callee, kwargs['rpc_method']) + return func(context, msg['method'], **msg['args']) class RpcCallback(object): @@ -40,6 +79,11 @@ class RpcCallback(object): callback version using oslo.messaging API once the migration is applied. ''' + RPC_API_VERSION = '1.0' + + def __init__(self): + super(RpcCallback, self).__init__() + self.target = messaging.Target(version=self.RPC_API_VERSION) class Service(service.Service): @@ -64,8 +108,7 @@ class Service(service.Service): LOG.debug("Creating Consumer connection for Service %s" % self.topic) - dispatcher = rpc_dispatcher.RpcDispatcher([self.manager], - self.serializer) + dispatcher = [self.manager] # Share this same connection for these Consumers self.conn.create_consumer(self.topic, dispatcher, fanout=False) @@ -93,11 +136,30 @@ class Service(service.Service): super(Service, self).stop() +class Connection(object): + + def __init__(self): + super(Connection, self).__init__() + self.servers = [] + + def create_consumer(self, topic, proxy, fanout=False): + target = messaging.Target( + topic=topic, server=cfg.CONF.host, fanout=fanout) + server = n_rpc.get_server(target, proxy) + self.servers.append(server) + + def consume_in_thread(self): + for server in self.servers: + server.start() + return self.servers + + # functions -create_connection = rpc.create_connection +def create_connection(new=True): + return Connection() # exceptions -RPCException = rpc_common.RPCException -RemoteError = rpc_common.RemoteError -MessagingTimeout = rpc_common.Timeout +RPCException = messaging.MessagingException +RemoteError = messaging.RemoteError +MessagingTimeout = messaging.MessagingTimeout diff --git a/neutron/db/l3_db.py b/neutron/db/l3_db.py index 547026277a..5d2aa6e1aa 100644 --- a/neutron/db/l3_db.py +++ b/neutron/db/l3_db.py @@ -21,6 +21,7 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as l3_constants from neutron.common import exceptions as n_exc +from neutron.common import rpc as n_rpc from neutron.common import utils from neutron.db import model_base from neutron.db import models_v2 @@ -28,7 +29,6 @@ from neutron.extensions import external_net from neutron.extensions import l3 from neutron import manager from neutron.openstack.common import log as logging -from neutron.openstack.common.notifier import api as notifier_api from neutron.openstack.common import uuidutils from neutron.plugins.common import constants @@ -481,11 +481,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): 'tenant_id': port['tenant_id'], 'port_id': port['id'], 'subnet_id': port['fixed_ips'][0]['subnet_id']} - notifier_api.notify(context, - notifier_api.publisher_id('network'), - 'router.interface.create', - notifier_api.CONF.default_notification_level, - {'router_interface': info}) + notifier = n_rpc.get_notifier('network') + notifier.info( + context, 'router.interface.create', {'router_interface': info}) return info def _confirm_router_interface_not_in_use(self, context, router_id, @@ -560,11 +558,9 @@ class L3_NAT_db_mixin(l3.RouterPluginBase): 'tenant_id': port['tenant_id'], 'port_id': port['id'], 'subnet_id': subnet['id']} - notifier_api.notify(context, - notifier_api.publisher_id('network'), - 'router.interface.delete', - notifier_api.CONF.default_notification_level, - {'router_interface': info}) + notifier = n_rpc.get_notifier('network') + notifier.info( + context, 'router.interface.delete', {'router_interface': info}) return info def _get_floatingip(self, context, id): diff --git a/neutron/db/metering/metering_rpc.py b/neutron/db/metering/metering_rpc.py index 82e7d3dd1f..b55a0cf4c7 100644 --- a/neutron/db/metering/metering_rpc.py +++ b/neutron/db/metering/metering_rpc.py @@ -15,7 +15,6 @@ # under the License. from neutron.common import constants as consts -from neutron.common import rpc as p_rpc from neutron.common import utils from neutron import manager from neutron.openstack.common import log as logging @@ -32,7 +31,7 @@ class MeteringRpcCallbacks(object): self.meter_plugin = meter_plugin def create_rpc_dispatcher(self): - return p_rpc.PluginRpcDispatcher([self]) + return [self] def get_sync_data_metering(self, context, **kwargs): l3_plugin = manager.NeutronManager.get_service_plugins().get( diff --git a/neutron/openstack/common/service.py b/neutron/openstack/common/service.py index 79ae9bc5d0..4575de4b47 100644 --- a/neutron/openstack/common/service.py +++ b/neutron/openstack/common/service.py @@ -45,7 +45,9 @@ from neutron.openstack.common import systemd from neutron.openstack.common import threadgroup -rpc = importutils.try_import('neutron.openstack.common.rpc') +#rpc = importutils.try_import('neutron.openstack.common.rpc') +# TODO(ihrachys): restore once oslo-rpc code is removed from the tree +rpc = None CONF = cfg.CONF LOG = logging.getLogger(__name__) diff --git a/neutron/plugins/bigswitch/agent/restproxy_agent.py b/neutron/plugins/bigswitch/agent/restproxy_agent.py index a9c1e6653a..6cdf5913b0 100644 --- a/neutron/plugins/bigswitch/agent/restproxy_agent.py +++ b/neutron/plugins/bigswitch/agent/restproxy_agent.py @@ -36,7 +36,6 @@ from neutron import context as q_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import excutils from neutron.openstack.common import log -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.bigswitch import config as pl_config LOG = log.getLogger(__name__) @@ -106,7 +105,7 @@ class RestProxyAgent(rpc_compat.RpcCallback, self.topic = topics.AGENT self.plugin_rpc = PluginApi(topics.PLUGIN) self.context = q_context.get_admin_context_without_session() - self.dispatcher = dispatcher.RpcDispatcher([self]) + self.dispatcher = [self] consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] self.connection = agent_rpc.create_consumers(self.dispatcher, diff --git a/neutron/plugins/bigswitch/plugin.py b/neutron/plugins/bigswitch/plugin.py index 9249f5d6b0..712f02b3c3 100644 --- a/neutron/plugins/bigswitch/plugin.py +++ b/neutron/plugins/bigswitch/plugin.py @@ -57,7 +57,6 @@ from neutron.api import extensions as neutron_extensions from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.common import constants as const from neutron.common import exceptions -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -121,8 +120,7 @@ class RestProxyCallbacks(rpc_compat.RpcCallback, RPC_API_VERSION = '1.1' def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def get_port_from_device(self, device): port_id = re.sub(r"^tap", "", device) diff --git a/neutron/plugins/brocade/NeutronPlugin.py b/neutron/plugins/brocade/NeutronPlugin.py index fc1d1ad5d7..5ec3fb4016 100644 --- a/neutron/plugins/brocade/NeutronPlugin.py +++ b/neutron/plugins/brocade/NeutronPlugin.py @@ -31,7 +31,6 @@ from neutron.agent import securitygroups_rpc as sg_rpc from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -98,8 +97,7 @@ class BridgeRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py b/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py index d3749f19da..e5c701e7d2 100644 --- a/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py +++ b/neutron/plugins/cisco/n1kv/n1kv_neutron_plugin.py @@ -28,7 +28,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -75,8 +74,7 @@ class N1kvRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] class N1kvNeutronPluginV2(db_base_plugin_v2.NeutronDbPluginV2, diff --git a/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py b/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py index 3447356257..f76f751f84 100644 --- a/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py +++ b/neutron/plugins/hyperv/agent/hyperv_neutron_agent.py @@ -38,7 +38,6 @@ from neutron.common import topics from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.hyperv.agent import utils from neutron.plugins.hyperv.agent import utilsfactory @@ -106,8 +105,7 @@ class HyperVSecurityAgent(rpc_compat.RpcCallback, consumers) def _create_rpc_dispatcher(self): - rpc_callback = HyperVSecurityCallbackMixin(self) - return dispatcher.RpcDispatcher([rpc_callback]) + return [HyperVSecurityCallbackMixin(self)] class HyperVSecurityCallbackMixin(rpc_compat.RpcCallback, @@ -236,7 +234,7 @@ class HyperVNeutronAgent(rpc_compat.RpcCallback): segmentation_id, port['admin_state_up']) def _create_rpc_dispatcher(self): - return dispatcher.RpcDispatcher([self]) + return [self] def _get_vswitch_name(self, network_type, physical_network): if network_type != p_const.TYPE_LOCAL: diff --git a/neutron/plugins/hyperv/rpc_callbacks.py b/neutron/plugins/hyperv/rpc_callbacks.py index dafc160e75..e967286d58 100644 --- a/neutron/plugins/hyperv/rpc_callbacks.py +++ b/neutron/plugins/hyperv/rpc_callbacks.py @@ -17,7 +17,6 @@ # @author: Alessandro Pilotti, Cloudbase Solutions Srl from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.db import agents_db from neutron.db import dhcp_rpc_base @@ -48,8 +47,7 @@ class HyperVRpcCallbacks( If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def get_device_details(self, rpc_context, **kwargs): """Agent requests device details.""" diff --git a/neutron/plugins/ibm/agent/sdnve_neutron_agent.py b/neutron/plugins/ibm/agent/sdnve_neutron_agent.py index 1a5190d90e..b1fa1e8b65 100644 --- a/neutron/plugins/ibm/agent/sdnve_neutron_agent.py +++ b/neutron/plugins/ibm/agent/sdnve_neutron_agent.py @@ -37,7 +37,6 @@ from neutron.common import utils as n_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.ibm.common import config # noqa from neutron.plugins.ibm.common import constants @@ -156,7 +155,7 @@ class SdnveNeutronAgent(rpc_compat.RpcCallback): "out-of-band") def create_rpc_dispatcher(self): - return dispatcher.RpcDispatcher([self]) + return [self] def setup_integration_br(self, bridge_name, reset_br, out_of_band, controller_ip=None): diff --git a/neutron/plugins/ibm/sdnve_neutron_plugin.py b/neutron/plugins/ibm/sdnve_neutron_plugin.py index d3be17e517..8a6615f2e4 100644 --- a/neutron/plugins/ibm/sdnve_neutron_plugin.py +++ b/neutron/plugins/ibm/sdnve_neutron_plugin.py @@ -23,7 +23,6 @@ from oslo.config import cfg from neutron.common import constants as n_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -54,8 +53,7 @@ class SdnveRpcCallbacks(): If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return n_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def sdnve_info(self, rpc_context, **kwargs): '''Update new information.''' diff --git a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py index d586b2eb9e..5af3f674a3 100755 --- a/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py +++ b/neutron/plugins/linuxbridge/agent/linuxbridge_neutron_agent.py @@ -45,7 +45,6 @@ from neutron.common import utils as q_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.linuxbridge.common import config # noqa from neutron.plugins.linuxbridge.common import constants as lconst @@ -816,7 +815,7 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return dispatcher.RpcDispatcher([self]) + return [self] class LinuxBridgePluginApi(agent_rpc.PluginApi, diff --git a/neutron/plugins/linuxbridge/lb_neutron_plugin.py b/neutron/plugins/linuxbridge/lb_neutron_plugin.py index 9af9a616d5..61089f63cf 100644 --- a/neutron/plugins/linuxbridge/lb_neutron_plugin.py +++ b/neutron/plugins/linuxbridge/lb_neutron_plugin.py @@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -72,8 +71,7 @@ class LinuxBridgeRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/midonet/plugin.py b/neutron/plugins/midonet/plugin.py index 3902278e46..4495dda01d 100644 --- a/neutron/plugins/midonet/plugin.py +++ b/neutron/plugins/midonet/plugin.py @@ -29,7 +29,6 @@ from sqlalchemy.orm import exc as sa_exc from neutron.api.v2 import attributes from neutron.common import constants from neutron.common import exceptions as n_exc -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -189,8 +188,7 @@ class MidoRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return n_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] class MidonetPluginException(n_exc.NeutronException): diff --git a/neutron/plugins/ml2/rpc.py b/neutron/plugins/ml2/rpc.py index ff4e6e7bce..e5068afb4c 100644 --- a/neutron/plugins/ml2/rpc.py +++ b/neutron/plugins/ml2/rpc.py @@ -13,9 +13,10 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo import messaging + from neutron.agent import securitygroups_rpc as sg_rpc from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -46,13 +47,15 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, # 1.0 Initial version (from openvswitch/linuxbridge) # 1.1 Support Security Group RPC + # FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due to + # inheritance problems + target = messaging.Target(version=RPC_API_VERSION) + def __init__(self, notifier, type_manager): # REVISIT(kmestery): This depends on the first three super classes # not having their own __init__ functions. If an __init__() is added # to one, this could break. Fix this and add a unit test to cover this # test in H3. - # FIXME(ihrachys): we can't use rpc_compat.RpcCallback here due - # to inheritance problems super(RpcCallbacks, self).__init__(notifier, type_manager) def create_rpc_dispatcher(self): @@ -61,8 +64,7 @@ class RpcCallbacks(dhcp_rpc_base.DhcpRpcCallbackMixin, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def _device_to_port_id(cls, device): diff --git a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py index 90a97ce443..94fd2b89a4 100644 --- a/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py +++ b/neutron/plugins/mlnx/agent/eswitch_neutron_agent.py @@ -35,7 +35,6 @@ from neutron.common import utils as q_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.mlnx.agent import utils from neutron.plugins.mlnx.common import config # noqa @@ -218,7 +217,7 @@ class MlnxEswitchRpcCallbacks(rpc_compat.RpcCallback, or support more than one class as the target of rpc messages, override this method. """ - return dispatcher.RpcDispatcher([self]) + return [self] class MlnxEswitchPluginApi(agent_rpc.PluginApi, diff --git a/neutron/plugins/mlnx/rpc_callbacks.py b/neutron/plugins/mlnx/rpc_callbacks.py index fff970c43a..0eda514368 100644 --- a/neutron/plugins/mlnx/rpc_callbacks.py +++ b/neutron/plugins/mlnx/rpc_callbacks.py @@ -17,7 +17,6 @@ from oslo.config import cfg from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.db import agents_db from neutron.db import api as db_api @@ -48,8 +47,7 @@ class MlnxRpcCallbacks(rpc_compat.RpcCallback, or support more than one class as the target of RPC messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/nec/agent/nec_neutron_agent.py b/neutron/plugins/nec/agent/nec_neutron_agent.py index 38b13b5b7c..c1f580ac2e 100755 --- a/neutron/plugins/nec/agent/nec_neutron_agent.py +++ b/neutron/plugins/nec/agent/nec_neutron_agent.py @@ -38,7 +38,6 @@ from neutron import context as q_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.nec.common import config @@ -157,8 +156,7 @@ class NECNeutronAgent(object): self, self.sg_agent) self.callback_sg = SecurityGroupAgentRpcCallback(self.context, self.sg_agent) - self.dispatcher = dispatcher.RpcDispatcher([self.callback_nec, - self.callback_sg]) + self.dispatcher = [self.callback_nec, self.callback_sg] # Define the listening consumer for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] diff --git a/neutron/plugins/nec/common/config.py b/neutron/plugins/nec/common/config.py index ed35dcb17f..70f4a1a63d 100644 --- a/neutron/plugins/nec/common/config.py +++ b/neutron/plugins/nec/common/config.py @@ -18,7 +18,6 @@ from oslo.config import cfg from neutron.agent.common import config -from neutron.openstack.common import rpc # noqa from neutron.plugins.nec.common import constants as nconst diff --git a/neutron/plugins/nec/nec_plugin.py b/neutron/plugins/nec/nec_plugin.py index e36f9d63ee..2bea5c04ef 100644 --- a/neutron/plugins/nec/nec_plugin.py +++ b/neutron/plugins/nec/nec_plugin.py @@ -22,7 +22,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.v2 import attributes as attrs from neutron.common import constants as const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -147,12 +146,12 @@ class NECPluginV2(db_base_plugin_v2.NeutronDbPluginV2, # NOTE: callback_sg is referred to from the sg unit test. self.callback_sg = SecurityGroupServerRpcCallback() - callbacks = [NECPluginV2RPCCallbacks(self.safe_reference), - DhcpRpcCallback(), - L3RpcCallback(), - self.callback_sg, - agents_db.AgentExtRpcCallback()] - self.dispatcher = q_rpc.PluginRpcDispatcher(callbacks) + self.dispatcher = [ + NECPluginV2RPCCallbacks(self.safe_reference), + DhcpRpcCallback(), + L3RpcCallback(), + self.callback_sg, + agents_db.AgentExtRpcCallback()] for svc_topic in self.service_topics.values(): self.conn.create_consumer(svc_topic, self.dispatcher, fanout=False) # Consume from all consumers in a thread @@ -722,7 +721,7 @@ class NECPluginV2RPCCallbacks(rpc_compat.RpcCallback): If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self]) + return [self] def update_ports(self, rpc_context, **kwargs): """Update ports' information and activate/deavtivate them. diff --git a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py index 7ff3040b06..c79d77a915 100644 --- a/neutron/plugins/ofagent/agent/ofa_neutron_agent.py +++ b/neutron/plugins/ofagent/agent/ofa_neutron_agent.py @@ -39,7 +39,6 @@ from neutron.common import utils as n_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.ofagent.common import config # noqa from neutron.plugins.openvswitch.common import constants @@ -351,7 +350,7 @@ class OFANeutronAgent(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return dispatcher.RpcDispatcher([self]) + return [self] def _provision_local_vlan_outbound_for_tunnel(self, lvid, segmentation_id, ofports): diff --git a/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py b/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py index d1d3daf6ef..0ef6348dfb 100644 --- a/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py +++ b/neutron/plugins/oneconvergence/agent/nvsd_neutron_agent.py @@ -32,7 +32,6 @@ from neutron.common import topics from neutron import context as n_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log as logging -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.oneconvergence.lib import config LOG = logging.getLogger(__name__) @@ -120,8 +119,7 @@ class NVSDNeutronAgent(rpc_compat.RpcCallback): self, self.sg_agent) self.callback_sg = SecurityGroupAgentRpcCallback(self.context, self.sg_agent) - self.dispatcher = dispatcher.RpcDispatcher([self.callback_oc, - self.callback_sg]) + self.dispatcher = [self.callback_oc, self.callback_sg] # Define the listening consumer for the agent consumers = [[topics.PORT, topics.UPDATE], [topics.SECURITY_GROUP, topics.UPDATE]] diff --git a/neutron/plugins/oneconvergence/plugin.py b/neutron/plugins/oneconvergence/plugin.py index 7d7af13b0d..732ead70ae 100644 --- a/neutron/plugins/oneconvergence/plugin.py +++ b/neutron/plugins/oneconvergence/plugin.py @@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.common import constants as q_const from neutron.common import exceptions as nexception -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -61,8 +60,7 @@ class NVSDPluginRpcCallbacks(rpc_compat.RpcCallback, def create_rpc_dispatcher(self): """Get the rpc dispatcher for this manager.""" - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @staticmethod def get_port_from_device(device): diff --git a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py index e6a58567a4..31c6274848 100644 --- a/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py +++ b/neutron/plugins/openvswitch/agent/ovs_neutron_agent.py @@ -41,7 +41,6 @@ from neutron.common import utils as q_utils from neutron import context from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.common import constants as p_const from neutron.plugins.openvswitch.common import config # noqa from neutron.plugins.openvswitch.common import constants @@ -500,7 +499,7 @@ class OVSNeutronAgent(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return dispatcher.RpcDispatcher([self]) + return [self] def provision_local_vlan(self, net_uuid, network_type, physical_network, segmentation_id): diff --git a/neutron/plugins/openvswitch/ovs_neutron_plugin.py b/neutron/plugins/openvswitch/ovs_neutron_plugin.py index 01867c4164..5e3f387b0f 100644 --- a/neutron/plugins/openvswitch/ovs_neutron_plugin.py +++ b/neutron/plugins/openvswitch/ovs_neutron_plugin.py @@ -23,7 +23,6 @@ from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -80,8 +79,7 @@ class OVSRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return q_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] @classmethod def get_port_from_device(cls, device): diff --git a/neutron/plugins/ryu/agent/ryu_neutron_agent.py b/neutron/plugins/ryu/agent/ryu_neutron_agent.py index 746a0c2f54..6086113c7f 100755 --- a/neutron/plugins/ryu/agent/ryu_neutron_agent.py +++ b/neutron/plugins/ryu/agent/ryu_neutron_agent.py @@ -42,7 +42,6 @@ from neutron.common import topics from neutron import context as q_context from neutron.extensions import securitygroup as ext_sg from neutron.openstack.common import log -from neutron.openstack.common.rpc import dispatcher from neutron.plugins.ryu.common import config # noqa @@ -209,7 +208,7 @@ class OVSNeutronOFPRyuAgent(rpc_compat.RpcCallback, consumers) def _create_rpc_dispatcher(self): - return dispatcher.RpcDispatcher([self]) + return [self] def _setup_integration_br(self, root_helper, integ_br, tunnel_ip, ovsdb_port, ovsdb_ip): diff --git a/neutron/plugins/ryu/ryu_neutron_plugin.py b/neutron/plugins/ryu/ryu_neutron_plugin.py index 35065a41e5..787ccb21c7 100644 --- a/neutron/plugins/ryu/ryu_neutron_plugin.py +++ b/neutron/plugins/ryu/ryu_neutron_plugin.py @@ -23,7 +23,6 @@ from ryu.app import rest_nw_id from neutron.agent import securitygroups_rpc as sg_rpc from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import api as db @@ -59,7 +58,7 @@ class RyuRpcCallbacks(rpc_compat.RpcCallback, self.ofp_rest_api_addr = ofp_rest_api_addr def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self]) + return [self] def get_ofp_rest_api(self, context, **kwargs): LOG.debug(_("get_ofp_rest_api: %s"), self.ofp_rest_api_addr) diff --git a/neutron/plugins/vmware/dhcp_meta/rpc.py b/neutron/plugins/vmware/dhcp_meta/rpc.py index 057e94d970..c32a39b372 100644 --- a/neutron/plugins/vmware/dhcp_meta/rpc.py +++ b/neutron/plugins/vmware/dhcp_meta/rpc.py @@ -24,7 +24,6 @@ from neutron.api.rpc.agentnotifiers import dhcp_rpc_agent_api from neutron.api.v2 import attributes from neutron.common import constants as const from neutron.common import exceptions as ntn_exc -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.db import agents_db from neutron.db import db_base_plugin_v2 @@ -55,8 +54,7 @@ class NSXRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. ''' - return n_rpc.PluginRpcDispatcher([self, - agents_db.AgentExtRpcCallback()]) + return [self, agents_db.AgentExtRpcCallback()] def handle_network_dhcp_access(plugin, context, network, action): diff --git a/neutron/policy.py b/neutron/policy.py index 4c64432b6f..747638287f 100644 --- a/neutron/policy.py +++ b/neutron/policy.py @@ -26,7 +26,6 @@ from oslo.config import cfg from neutron.api.v2 import attributes from neutron.common import exceptions import neutron.common.utils as utils -from neutron import manager from neutron.openstack.common import excutils from neutron.openstack.common import importutils from neutron.openstack.common import log as logging @@ -263,6 +262,9 @@ class OwnerCheck(policy.Check): # resource is handled by the core plugin. It might be worth # having a way to map resources to plugins so to make this # check more general + # FIXME(ihrachys): if import is put in global, circular + # import failure occurs + from neutron import manager f = getattr(manager.NeutronManager.get_instance().plugin, 'get_%s' % parent_res) # f *must* exist, if not found it is better to let neutron diff --git a/neutron/service.py b/neutron/service.py index 9b3073b5fb..f14021769e 100644 --- a/neutron/service.py +++ b/neutron/service.py @@ -13,13 +13,13 @@ # License for the specific language governing permissions and limitations # under the License. -import eventlet import inspect import logging as std_logging import os import random from oslo.config import cfg +from oslo.messaging import server as rpc_server from neutron.common import config from neutron.common import rpc_compat @@ -112,23 +112,25 @@ class RpcWorker(object): """Wraps a worker to be handled by ProcessLauncher""" def __init__(self, plugin): self._plugin = plugin - self._server = None + self._servers = [] def start(self): # We may have just forked from parent process. A quick disposal of the # existing sql connections avoids producing errors later when they are # discovered to be broken. session.get_engine().pool.dispose() - self._server = self._plugin.start_rpc_listener() + self._servers = self._plugin.start_rpc_listener() def wait(self): - if isinstance(self._server, eventlet.greenthread.GreenThread): - self._server.wait() + for server in self._servers: + if isinstance(server, rpc_server.MessageHandlingServer): + server.wait() def stop(self): - if isinstance(self._server, eventlet.greenthread.GreenThread): - self._server.kill() - self._server = None + for server in self._servers: + if isinstance(server, rpc_server.MessageHandlingServer): + server.kill() + self._servers = [] def serve_rpc(): diff --git a/neutron/services/firewall/fwaas_plugin.py b/neutron/services/firewall/fwaas_plugin.py index fd2131e219..0238902f3b 100644 --- a/neutron/services/firewall/fwaas_plugin.py +++ b/neutron/services/firewall/fwaas_plugin.py @@ -20,7 +20,6 @@ from oslo.config import cfg from neutron.common import exceptions as n_exception -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron import context as neutron_context @@ -42,7 +41,7 @@ class FirewallCallbacks(rpc_compat.RpcCallback): self.plugin = plugin def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self]) + return [self] def set_firewall_status(self, context, firewall_id, status, **kwargs): """Agent uses this to set a firewall's status.""" diff --git a/neutron/services/l3_router/l3_router_plugin.py b/neutron/services/l3_router/l3_router_plugin.py index c5505817d5..29950c984d 100644 --- a/neutron/services/l3_router/l3_router_plugin.py +++ b/neutron/services/l3_router/l3_router_plugin.py @@ -21,7 +21,6 @@ from oslo.config import cfg from neutron.api.rpc.agentnotifiers import l3_rpc_agent_api from neutron.common import constants as q_const -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import api as qdbapi @@ -46,7 +45,7 @@ class L3RouterPluginRpcCallbacks(rpc_compat.RpcCallback, If a manager would like to set an rpc API version, or support more than one class as the target of rpc messages, override this method. """ - return q_rpc.PluginRpcDispatcher([self]) + return [self] class L3RouterPlugin(db_base_plugin_v2.CommonDbMixin, diff --git a/neutron/services/loadbalancer/drivers/common/agent_driver_base.py b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py index 8436cb8354..85be0bacd0 100644 --- a/neutron/services/loadbalancer/drivers/common/agent_driver_base.py +++ b/neutron/services/loadbalancer/drivers/common/agent_driver_base.py @@ -22,7 +22,6 @@ from oslo.config import cfg from neutron.common import constants as q_const from neutron.common import exceptions as n_exc -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.db import agents_db @@ -66,8 +65,7 @@ class LoadBalancerCallbacks(rpc_compat.RpcCallback): self.plugin = plugin def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher( - [self, agents_db.AgentExtRpcCallback(self.plugin)]) + return [self, agents_db.AgentExtRpcCallback(self.plugin)] def get_ready_devices(self, context, host=None): with context.session.begin(subtransactions=True): diff --git a/neutron/services/metering/agents/metering_agent.py b/neutron/services/metering/agents/metering_agent.py index ba1fe6bac2..80883f41b3 100644 --- a/neutron/services/metering/agents/metering_agent.py +++ b/neutron/services/metering/agents/metering_agent.py @@ -26,6 +26,7 @@ from neutron.agent.common import config from neutron.agent import rpc as agent_rpc from neutron.common import config as common_config from neutron.common import constants as constants +from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.common import topics from neutron.common import utils @@ -34,7 +35,6 @@ from neutron import manager from neutron.openstack.common import importutils from neutron.openstack.common import log as logging from neutron.openstack.common import loopingcall -from neutron.openstack.common.notifier import api as notifier_api from neutron.openstack.common import periodic_task from neutron.openstack.common import service from neutron import service as neutron_service @@ -114,11 +114,8 @@ class MeteringAgent(MeteringPluginRpc, manager.Manager): 'host': self.host} LOG.debug(_("Send metering report: %s"), data) - notifier_api.notify(self.context, - notifier_api.publisher_id('metering'), - 'l3.meter', - notifier_api.CONF.default_notification_level, - data) + notifier = n_rpc.get_notifier('metering') + notifier.info(self.context, 'l3.meter', data) info['pkts'] = 0 info['bytes'] = 0 info['time'] = 0 diff --git a/neutron/services/vpn/device_drivers/cisco_ipsec.py b/neutron/services/vpn/device_drivers/cisco_ipsec.py index ba19460d77..12904f23e3 100644 --- a/neutron/services/vpn/device_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/device_drivers/cisco_ipsec.py @@ -20,10 +20,10 @@ import requests import netaddr from oslo.config import cfg +from oslo import messaging import six from neutron.common import exceptions -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron import context as ctx from neutron.openstack.common import lockutils @@ -184,12 +184,13 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): # history # 1.0 Initial version - RPC_API_VERSION = '1.0' + # TODO(ihrachys): we can't use RpcCallback here due to inheritance + # issues + target = messaging.Target(version=RPC_API_VERSION) + def __init__(self, agent, host): - # TODO(ihrachys): we can't use RpcCallback here due to - # inheritance issues self.host = host self.conn = rpc_compat.create_connection(new=True) context = ctx.get_admin_context_without_session() @@ -225,7 +226,7 @@ class CiscoCsrIPsecDriver(device_drivers.DeviceDriver): for k, v in csrs_found.items()]) def create_rpc_dispatcher(self): - return n_rpc.PluginRpcDispatcher([self]) + return [self] def vpnservice_updated(self, context, **kwargs): """Handle VPNaaS service driver change notifications.""" diff --git a/neutron/services/vpn/device_drivers/ipsec.py b/neutron/services/vpn/device_drivers/ipsec.py index 0d9ded9604..2480eb2727 100644 --- a/neutron/services/vpn/device_drivers/ipsec.py +++ b/neutron/services/vpn/device_drivers/ipsec.py @@ -23,11 +23,11 @@ import shutil import jinja2 import netaddr from oslo.config import cfg +from oslo import messaging import six from neutron.agent.linux import ip_lib from neutron.agent.linux import utils -from neutron.common import rpc as q_rpc from neutron.common import rpc_compat from neutron import context from neutron.openstack.common import lockutils @@ -487,9 +487,11 @@ class IPsecDriver(device_drivers.DeviceDriver): RPC_API_VERSION = '1.0' + # TODO(ihrachys): we can't use RpcCallback here due to inheritance + # issues + target = messaging.Target(version=RPC_API_VERSION) + def __init__(self, agent, host): - # TODO(ihrachys): we can't use RpcCallback here due to - # inheritance issues self.agent = agent self.conf = self.agent.conf self.root_helper = self.agent.root_helper @@ -514,7 +516,7 @@ class IPsecDriver(device_drivers.DeviceDriver): interval=self.conf.ipsec.ipsec_status_check_interval) def create_rpc_dispatcher(self): - return q_rpc.PluginRpcDispatcher([self]) + return [self] def _update_nat(self, vpnservice, func): """Setting up nat rule in iptables. diff --git a/neutron/services/vpn/service_drivers/cisco_ipsec.py b/neutron/services/vpn/service_drivers/cisco_ipsec.py index 8565723197..c2b39da9e3 100644 --- a/neutron/services/vpn/service_drivers/cisco_ipsec.py +++ b/neutron/services/vpn/service_drivers/cisco_ipsec.py @@ -16,7 +16,6 @@ import netaddr from netaddr import core as net_exc from neutron.common import exceptions -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.openstack.common import excutils from neutron.openstack.common import log as logging @@ -55,7 +54,7 @@ class CiscoCsrIPsecVpnDriverCallBack(rpc_compat.RpcCallback): self.driver = driver def create_rpc_dispatcher(self): - return n_rpc.PluginRpcDispatcher([self]) + return [self] def get_vpn_services_on_host(self, context, host=None): """Retuns info on the vpnservices on the host.""" diff --git a/neutron/services/vpn/service_drivers/ipsec.py b/neutron/services/vpn/service_drivers/ipsec.py index cf4b055d89..13b7c171b4 100644 --- a/neutron/services/vpn/service_drivers/ipsec.py +++ b/neutron/services/vpn/service_drivers/ipsec.py @@ -16,7 +16,6 @@ # under the License. import netaddr -from neutron.common import rpc as n_rpc from neutron.common import rpc_compat from neutron.openstack.common import log as logging from neutron.services.vpn.common import topics @@ -42,7 +41,7 @@ class IPsecVpnDriverCallBack(rpc_compat.RpcCallback): self.driver = driver def create_rpc_dispatcher(self): - return n_rpc.PluginRpcDispatcher([self]) + return [self] def get_vpn_services_on_host(self, context, host=None): """Returns the vpnservices on the host.""" diff --git a/neutron/tests/base.py b/neutron/tests/base.py index 87412f9244..95034f6538 100644 --- a/neutron/tests/base.py +++ b/neutron/tests/base.py @@ -29,15 +29,14 @@ import eventlet.timeout import fixtures import mock from oslo.config import cfg +from oslo.messaging import conffixture as messaging_conffixture import testtools from neutron.common import config +from neutron.common import rpc as n_rpc from neutron.db import agentschedulers_db from neutron import manager -from neutron.openstack.common.notifier import api as notifier_api -from neutron.openstack.common.notifier import test_notifier -from neutron.openstack.common import rpc -from neutron.openstack.common.rpc import impl_fake +from neutron.tests import fake_notifier from neutron.tests import post_mortem_debug @@ -58,6 +57,10 @@ def fake_use_fatal_exceptions(*args): return True +def fake_consume_in_threads(self): + return [] + + class BaseTestCase(testtools.TestCase): def cleanup_core_plugin(self): @@ -90,16 +93,10 @@ class BaseTestCase(testtools.TestCase): if core_plugin is not None: cfg.CONF.set_override('core_plugin', core_plugin) - def _cleanup_test_notifier(self): - test_notifier.NOTIFICATIONS = [] - def setup_notification_driver(self, notification_driver=None): - # to reload the drivers - self.addCleanup(notifier_api._reset_drivers) - self.addCleanup(self._cleanup_test_notifier) - notifier_api._reset_drivers() + self.addCleanup(fake_notifier.reset) if notification_driver is None: - notification_driver = [test_notifier.__name__] + notification_driver = [fake_notifier.__name__] cfg.CONF.set_override("notification_driver", notification_driver) @staticmethod @@ -113,10 +110,6 @@ class BaseTestCase(testtools.TestCase): else: conf(args) - def _cleanup_rpc_backend(self): - rpc._RPCIMPL = None - impl_fake.CONSUMERS.clear() - def setUp(self): super(BaseTestCase, self).setUp() @@ -124,8 +117,6 @@ class BaseTestCase(testtools.TestCase): # test-specific cleanup has a chance to release references. self.addCleanup(self.cleanup_core_plugin) - self.addCleanup(self._cleanup_rpc_backend) - # Configure this first to ensure pm debugging support for setUp() if os.environ.get('OS_POST_MORTEM_DEBUG') in TRUE_STRING: self.addOnException(post_mortem_debug.exception_handler) @@ -179,6 +170,25 @@ class BaseTestCase(testtools.TestCase): 'neutron.common.exceptions.NeutronException.use_fatal_exceptions', fake_use_fatal_exceptions)) + # don't actually start RPC listeners when testing + self.useFixture(fixtures.MonkeyPatch( + 'neutron.common.rpc_compat.Connection.consume_in_thread', + fake_consume_in_threads)) + + self.useFixture(fixtures.MonkeyPatch( + 'oslo.messaging.Notifier', fake_notifier.FakeNotifier)) + + self.messaging_conf = messaging_conffixture.ConfFixture(CONF) + self.messaging_conf.transport_driver = 'fake' + self.messaging_conf.response_timeout = 15 + self.useFixture(self.messaging_conf) + + self.addCleanup(n_rpc.clear_extra_exmods) + n_rpc.add_extra_exmods('neutron.test') + + self.addCleanup(n_rpc.cleanup) + n_rpc.init(CONF) + if sys.version_info < (2, 7) and getattr(self, 'fmt', '') == 'xml': raise self.skipException('XML Testing Skipped in Py26') diff --git a/neutron/tests/fake_notifier.py b/neutron/tests/fake_notifier.py new file mode 100644 index 0000000000..012f3351eb --- /dev/null +++ b/neutron/tests/fake_notifier.py @@ -0,0 +1,50 @@ +# Copyright 2014 Red Hat, Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import collections +import functools + + +NOTIFICATIONS = [] + + +def reset(): + del NOTIFICATIONS[:] + + +FakeMessage = collections.namedtuple('Message', + ['publisher_id', 'priority', + 'event_type', 'payload']) + + +class FakeNotifier(object): + + def __init__(self, transport, publisher_id=None): + self.transport = transport + self.publisher_id = publisher_id + for priority in ('debug', 'info', 'warn', 'error', 'critical'): + setattr(self, priority, + functools.partial(self._notify, priority=priority.upper())) + + def prepare(self, publisher_id=None): + if publisher_id is None: + publisher_id = self.publisher_id + return self.__class__(self.transport, publisher_id) + + def _notify(self, ctxt, event_type, payload, priority): + msg = dict(publisher_id=self.publisher_id, + priority=priority, + event_type=event_type, + payload=payload) + NOTIFICATIONS.append(msg) diff --git a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py index 4af19fc546..965842738b 100644 --- a/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py +++ b/neutron/tests/unit/hyperv/test_hyperv_rpcapi.py @@ -23,9 +23,9 @@ Unit Tests for hyperv neutron rpc import mock from neutron.agent import rpc as agent_rpc +from neutron.common import rpc_compat from neutron.common import topics from neutron.openstack.common import context -from neutron.openstack.common import rpc from neutron.plugins.hyperv import agent_notifier_api as ana from neutron.plugins.hyperv.common import constants from neutron.tests import base @@ -38,19 +38,19 @@ class rpcHyperVApiTestCase(base.BaseTestCase): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if method == 'call' else None expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False - with mock.patch.object(rpc, rpc_method) as rpc_method_mock: + proxy = rpc_compat.RpcProxy + with mock.patch.object(proxy, rpc_method) as rpc_method_mock: rpc_method_mock.return_value = expected_retval retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg] - for arg, expected_arg in zip(rpc_method_mock.call_args[0], - expected_args): - self.assertEqual(arg, expected_arg) + expected = [ + mock.call(ctxt, expected_msg, topic=topic) + ] + rpc_method_mock.assert_has_calls(expected) def test_delete_network(self): rpcapi = ana.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/linuxbridge/test_rpcapi.py b/neutron/tests/unit/linuxbridge/test_rpcapi.py index 762a65be1d..616a06acd9 100644 --- a/neutron/tests/unit/linuxbridge/test_rpcapi.py +++ b/neutron/tests/unit/linuxbridge/test_rpcapi.py @@ -35,7 +35,6 @@ class rpcApiTestCase(base.BaseTestCase): expected_retval = 'foo' if method == 'call' else None if not expected_msg: expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False @@ -49,15 +48,19 @@ class rpcApiTestCase(base.BaseTestCase): return expected_retval self.useFixture(fixtures.MonkeyPatch( - 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method)) + 'neutron.common.rpc_compat.RpcProxy.' + rpc_method, + _fake_rpc_method)) retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(expected_retval, retval) - expected_args = [ctxt, topic, expected_msg] + expected_args = [ctxt, expected_msg] + expected_kwargs = {'topic': topic} - for arg, expected_arg in zip(self.fake_args, expected_args): + # skip the first argument which is 'self' + for arg, expected_arg in zip(self.fake_args[1:], expected_args): self.assertEqual(expected_arg, arg) + self.assertEqual(expected_kwargs, self.fake_kwargs) def test_delete_network(self): rpcapi = plb.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/ml2/test_rpcapi.py b/neutron/tests/unit/ml2/test_rpcapi.py index a2d3bf0eb5..af48a74f17 100644 --- a/neutron/tests/unit/ml2/test_rpcapi.py +++ b/neutron/tests/unit/ml2/test_rpcapi.py @@ -20,9 +20,9 @@ Unit Tests for ml2 rpc import mock from neutron.agent import rpc as agent_rpc +from neutron.common import rpc_compat from neutron.common import topics from neutron.openstack.common import context -from neutron.openstack.common import rpc from neutron.plugins.ml2.drivers import type_tunnel from neutron.plugins.ml2 import rpc as plugin_rpc from neutron.tests import base @@ -34,20 +34,19 @@ class RpcApiTestCase(base.BaseTestCase): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if method == 'call' else None expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False + rpc = rpc_compat.RpcProxy with mock.patch.object(rpc, rpc_method) as rpc_method_mock: rpc_method_mock.return_value = expected_retval retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(retval, expected_retval) - - expected_args = [ctxt, topic, expected_msg] - for arg, expected_arg in zip(rpc_method_mock.call_args[0], - expected_args): - self.assertEqual(arg, expected_arg) + expected = [ + mock.call(ctxt, expected_msg, topic=topic) + ] + rpc_method_mock.assert_has_calls(expected) def test_delete_network(self): rpcapi = plugin_rpc.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/mlnx/test_rpcapi.py b/neutron/tests/unit/mlnx/test_rpcapi.py index 80dcf78277..ea34a840b1 100644 --- a/neutron/tests/unit/mlnx/test_rpcapi.py +++ b/neutron/tests/unit/mlnx/test_rpcapi.py @@ -37,7 +37,6 @@ class rpcApiTestCase(base.BaseTestCase): expected_retval = 'foo' if method == 'call' else None if not expected_msg: expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False @@ -51,15 +50,19 @@ class rpcApiTestCase(base.BaseTestCase): return expected_retval self.useFixture(fixtures.MonkeyPatch( - 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method)) + 'neutron.common.rpc_compat.RpcProxy.' + rpc_method, + _fake_rpc_method)) retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(expected_retval, retval) - expected_args = [ctxt, topic, expected_msg] + expected_args = [ctxt, expected_msg] + expected_kwargs = {'topic': topic} - for arg, expected_arg in zip(self.fake_args, expected_args): + # skip the first argument which is 'self' + for arg, expected_arg in zip(self.fake_args[1:], expected_args): self.assertEqual(expected_arg, arg) + self.assertEqual(expected_kwargs, self.fake_kwargs) def test_delete_network(self): rpcapi = agent_notify_api.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py b/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py index 1b6a7370a7..e8f75b9f4c 100644 --- a/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py +++ b/neutron/tests/unit/openvswitch/test_ovs_rpcapi.py @@ -34,7 +34,6 @@ class rpcApiTestCase(base.BaseTestCase): ctxt = context.RequestContext('fake_user', 'fake_project') expected_retval = 'foo' if method == 'call' else None expected_msg = rpcapi.make_msg(method, **kwargs) - expected_msg['version'] = rpcapi.BASE_RPC_API_VERSION if rpc_method == 'cast' and method == 'run_instance': kwargs['call'] = False @@ -48,15 +47,19 @@ class rpcApiTestCase(base.BaseTestCase): return expected_retval self.useFixture(fixtures.MonkeyPatch( - 'neutron.openstack.common.rpc.' + rpc_method, _fake_rpc_method)) + 'neutron.common.rpc_compat.RpcProxy.' + rpc_method, + _fake_rpc_method)) retval = getattr(rpcapi, method)(ctxt, **kwargs) self.assertEqual(retval, expected_retval) - expected_args = [ctxt, topic, expected_msg] + expected_args = [ctxt, expected_msg] + expected_kwargs = {'topic': topic} - for arg, expected_arg in zip(self.fake_args, expected_args): + # skip the first argument which is 'self' + for arg, expected_arg in zip(self.fake_args[1:], expected_args): self.assertEqual(arg, expected_arg) + self.assertEqual(expected_kwargs, self.fake_kwargs) def test_delete_network(self): rpcapi = povs.AgentNotifierApi(topics.AGENT) diff --git a/neutron/tests/unit/services/metering/test_metering_agent.py b/neutron/tests/unit/services/metering/test_metering_agent.py index 3e1d0db299..b3e3511fea 100644 --- a/neutron/tests/unit/services/metering/test_metering_agent.py +++ b/neutron/tests/unit/services/metering/test_metering_agent.py @@ -18,10 +18,10 @@ import mock from oslo.config import cfg from neutron.agent.common import config -from neutron.openstack.common.notifier import test_notifier from neutron.openstack.common import uuidutils from neutron.services.metering.agents import metering_agent from neutron.tests import base +from neutron.tests import fake_notifier _uuid = uuidutils.generate_uuid @@ -96,8 +96,8 @@ class TestMeteringOperations(base.BaseTestCase): 'bytes': 444}} self.agent._metering_loop() - self.assertNotEqual(len(test_notifier.NOTIFICATIONS), 0) - for n in test_notifier.NOTIFICATIONS: + self.assertNotEqual(len(fake_notifier.NOTIFICATIONS), 0) + for n in fake_notifier.NOTIFICATIONS: if n['event_type'] == 'l3.meter': break diff --git a/neutron/tests/unit/test_agent_rpc.py b/neutron/tests/unit/test_agent_rpc.py index bc4ae4a178..569a739566 100644 --- a/neutron/tests/unit/test_agent_rpc.py +++ b/neutron/tests/unit/test_agent_rpc.py @@ -27,7 +27,7 @@ class AgentRPCPluginApi(base.BaseTestCase): agent = rpc.PluginApi('fake_topic') ctxt = context.RequestContext('fake_user', 'fake_project') expect_val = 'foo' - with mock.patch('neutron.openstack.common.rpc.call') as rpc_call: + with mock.patch('neutron.common.rpc_compat.RpcProxy.call') as rpc_call: rpc_call.return_value = expect_val func_obj = getattr(agent, method) if method == 'tunnel_sync': diff --git a/neutron/tests/unit/test_api_v2.py b/neutron/tests/unit/test_api_v2.py index c09dd21ee9..38d54f7cac 100644 --- a/neutron/tests/unit/test_api_v2.py +++ b/neutron/tests/unit/test_api_v2.py @@ -33,12 +33,12 @@ from neutron.api.v2 import router from neutron.common import exceptions as n_exc from neutron import context from neutron import manager -from neutron.openstack.common.notifier import api as notifer_api from neutron.openstack.common import policy as common_policy from neutron.openstack.common import uuidutils from neutron import policy from neutron import quota from neutron.tests import base +from neutron.tests import fake_notifier from neutron.tests.unit import testlib_api @@ -1242,41 +1242,42 @@ class V2Views(base.BaseTestCase): class NotificationTest(APIv2TestBase): - def _resource_op_notifier(self, opname, resource, expected_errors=False, - notification_level='INFO'): + + def setUp(self): + super(NotificationTest, self).setUp() + fake_notifier.reset() + + def _resource_op_notifier(self, opname, resource, expected_errors=False): initial_input = {resource: {'name': 'myname'}} instance = self.plugin.return_value instance.get_networks.return_value = initial_input instance.get_networks_count.return_value = 0 expected_code = exc.HTTPCreated.code - with mock.patch.object(notifer_api, 'notify') as mynotifier: - if opname == 'create': - initial_input[resource]['tenant_id'] = _uuid() - res = self.api.post_json( - _get_path('networks'), - initial_input, expect_errors=expected_errors) - if opname == 'update': - res = self.api.put_json( - _get_path('networks', id=_uuid()), - initial_input, expect_errors=expected_errors) - expected_code = exc.HTTPOk.code - if opname == 'delete': - initial_input[resource]['tenant_id'] = _uuid() - res = self.api.delete( - _get_path('networks', id=_uuid()), - expect_errors=expected_errors) - expected_code = exc.HTTPNoContent.code - expected = [mock.call(mock.ANY, - 'network.' + cfg.CONF.host, - resource + "." + opname + ".start", - notification_level, - mock.ANY), - mock.call(mock.ANY, - 'network.' + cfg.CONF.host, - resource + "." + opname + ".end", - notification_level, - mock.ANY)] - self.assertEqual(expected, mynotifier.call_args_list) + if opname == 'create': + initial_input[resource]['tenant_id'] = _uuid() + res = self.api.post_json( + _get_path('networks'), + initial_input, expect_errors=expected_errors) + if opname == 'update': + res = self.api.put_json( + _get_path('networks', id=_uuid()), + initial_input, expect_errors=expected_errors) + expected_code = exc.HTTPOk.code + if opname == 'delete': + initial_input[resource]['tenant_id'] = _uuid() + res = self.api.delete( + _get_path('networks', id=_uuid()), + expect_errors=expected_errors) + expected_code = exc.HTTPNoContent.code + + expected_events = ('.'.join([resource, opname, "start"]), + '.'.join([resource, opname, "end"])) + self.assertEqual(len(fake_notifier.NOTIFICATIONS), + len(expected_events)) + for msg, event in zip(fake_notifier.NOTIFICATIONS, expected_events): + self.assertEqual('INFO', msg['priority']) + self.assertEqual(event, msg['event_type']) + self.assertEqual(res.status_int, expected_code) def test_network_create_notifer(self): @@ -1288,11 +1289,6 @@ class NotificationTest(APIv2TestBase): def test_network_update_notifer(self): self._resource_op_notifier('update', 'network') - def test_network_create_notifer_with_log_level(self): - cfg.CONF.set_override('default_notification_level', 'DEBUG') - self._resource_op_notifier('create', 'network', - notification_level='DEBUG') - class DHCPNotificationTest(APIv2TestBase): def _test_dhcp_notifier(self, opname, resource, initial_input=None): diff --git a/neutron/tests/unit/test_l3_plugin.py b/neutron/tests/unit/test_l3_plugin.py index b02ba15f87..4eb80d0d33 100644 --- a/neutron/tests/unit/test_l3_plugin.py +++ b/neutron/tests/unit/test_l3_plugin.py @@ -38,9 +38,9 @@ from neutron.extensions import l3 from neutron import manager from neutron.openstack.common import importutils from neutron.openstack.common import log as logging -from neutron.openstack.common.notifier import test_notifier from neutron.openstack.common import uuidutils from neutron.plugins.common import constants as service_constants +from neutron.tests import fake_notifier from neutron.tests.unit import test_agent_ext_plugin from neutron.tests.unit import test_api_v2 from neutron.tests.unit import test_api_v2_extension @@ -660,7 +660,7 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): 'subnet.create.end', 'router.interface.create', 'router.interface.delete'] - test_notifier.NOTIFICATIONS = [] + fake_notifier.reset() with self.router() as r: with self.subnet() as s: body = self._router_interface_action('add', @@ -683,9 +683,9 @@ class L3NatTestCaseBase(L3NatTestCaseMixin): self.assertEqual( set(exp_notifications), - set(n['event_type'] for n in test_notifier.NOTIFICATIONS)) + set(n['event_type'] for n in fake_notifier.NOTIFICATIONS)) - for n in test_notifier.NOTIFICATIONS: + for n in fake_notifier.NOTIFICATIONS: if n['event_type'].startswith('router.interface.'): payload = n['payload']['router_interface'] self.assertIn('id', payload) diff --git a/requirements.txt b/requirements.txt index 5ba04f255f..f34177ab21 100644 --- a/requirements.txt +++ b/requirements.txt @@ -23,6 +23,7 @@ alembic>=0.4.1 six>=1.7.0 stevedore>=0.14 oslo.config>=1.2.1 +oslo.messaging>=1.3.0 oslo.rootwrap python-novaclient>=2.17.0 diff --git a/setup.cfg b/setup.cfg index cc4db51b9c..0eaaaed0f6 100644 --- a/setup.cfg +++ b/setup.cfg @@ -169,6 +169,14 @@ neutron.ml2.mechanism_drivers = fslsdn = neutron.plugins.ml2.drivers.mechanism_fslsdn:FslsdnMechanismDriver neutron.openstack.common.cache.backends = memory = neutron.openstack.common.cache._backends.memory:MemoryBackend +# These are for backwards compat with Icehouse notification_driver configuration values +oslo.messaging.notify.drivers = + neutron.openstack.common.notifier.log_notifier = oslo.messaging.notify._impl_log:LogDriver + neutron.openstack.common.notifier.no_op_notifier = oslo.messaging.notify._impl_noop:NoOpDriver + neutron.openstack.common.notifier.rpc_notifier2 = oslo.messaging.notify._impl_messaging:MessagingV2Driver + neutron.openstack.common.notifier.rpc_notifier = oslo.messaging.notify._impl_messaging:MessagingDriver + neutron.openstack.common.notifier.test_notifier = oslo.messaging.notify._impl_test:TestDriver + [build_sphinx] all_files = 1