Russell Bryant bb7a36546d Drop RpcProxy usage from neutron.agent.rpc.PluginApi
This patch removes the usage of the RpcProxy compatibility class from
the neutron.agent.rpc.PluginApi class.  Equivalent calls to the
oslo.messaging APIs have been put in place instead.  This simple
conversion had a pretty wide impact on unit tests, as well.

The security groups API was converted in this patch as well.  It was
necessary because the security group class is used as a mixin, so it
must be implemented the same way.  Unfortunately, the way this is used
as a mixin is not consistent, so for now it's only conditionally
converted.

Finally, some other miscellaneous plugin specific interfaces were
converted as well.  Again, these were methods mixed-in for certain
plugins.

Note that there's one very minor functional difference in this patch.
The previous code set the base version to be '1.1'.  The right pattern
is for this to be set to '1.0'.  This version is the default version
specified by the client, telling the server that it must implement at
least this version to satisfy the request.  The default should be
'1.0' and methods that require higher than that should specify it.
From looking at other parts of the code, '1.0' vs '1.1' is not
actually important, as '1.1' was actually the addition of some
security group methods defined elsewhere.  The correction is more
about establishing the right pattern to follow.

Change-Id: I391c01e79943ef179d815ea602253720925ccce1
2014-11-19 17:18:34 +00:00

124 lines
4.8 KiB
Python

# Copyright (c) 2012 OpenStack Foundation.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
import itertools
from oslo import messaging
from neutron.common import rpc as n_rpc
from neutron.common import topics
from neutron.openstack.common.gettextutils import _LW
from neutron.openstack.common import log as logging
from neutron.openstack.common import timeutils
LOG = logging.getLogger(__name__)
def create_consumers(endpoints, prefix, topic_details):
    """Set up the RPC consumers an agent listens on.

    :param endpoints: The endpoints that process the incoming messages.
    :param prefix: Prefix shared by the plugin/agent message queues.
    :param topic_details: Iterable of ``(topic, operation)`` or
                          ``(topic, operation, node_name)`` entries. When a
                          node name is given, an extra non-fanout consumer
                          is created on ``<topic_name>.<node_name>``.
    :returns: The connection all consumers were created on, after
              ``consume_in_threads()`` has been called on it.
    """
    conn = n_rpc.create_connection(new=True)
    for entry in topic_details:
        # Pad with None so a two-element entry unpacks with node_name unset.
        padded = itertools.chain(entry, [None])
        topic, operation, node_name = itertools.islice(padded, 3)

        fanout_topic = topics.get_topic_name(prefix, topic, operation)
        conn.create_consumer(fanout_topic, endpoints, fanout=True)
        if not node_name:
            continue
        conn.create_consumer('%s.%s' % (fanout_topic, node_name),
                             endpoints,
                             fanout=False)
    conn.consume_in_threads()
    return conn
class PluginReportStateAPI(object):
    """Client used to send ``report_state`` RPC messages on a topic."""

    def __init__(self, topic):
        """Create an RPC client targeting *topic* at version '1.0'."""
        self.client = n_rpc.get_client(
            messaging.Target(topic=topic, version='1.0'))

    def report_state(self, context, agent_state, use_call=False):
        """Send the agent state together with a current time string.

        :param context: Request context handed to the RPC client.
        :param agent_state: State payload; it is sent nested under an
                            'agent_state' key.
        :param use_call: Use the client's ``call`` when true, otherwise
                         its ``cast``.
        :returns: Whatever the selected client method returns.
        """
        cctxt = self.client.prepare()
        payload = dict(agent_state={'agent_state': agent_state},
                       time=timeutils.strtime())
        if use_call:
            send = cctxt.call
        else:
            send = cctxt.cast
        return send(context, 'report_state', **payload)
class PluginApi(object):
    """Agent side of the rpc API.

    API version history:
        1.0 - Initial version.
        1.3 - get_device_details rpc signature upgrade to obtain 'host' and
              return value to include fixed_ips and device_owner for
              the device port
    """

    def __init__(self, topic):
        """Create an RPC client targeting *topic* at base version '1.0'."""
        target = messaging.Target(topic=topic, version='1.0')
        self.client = n_rpc.get_client(target)

    def get_device_details(self, context, device, agent_id, host=None):
        """Call 'get_device_details' for one device and return the reply."""
        cctxt = self.client.prepare()
        return cctxt.call(context, 'get_device_details', device=device,
                          agent_id=agent_id, host=host)

    def get_devices_details_list(self, context, devices, agent_id, host=None):
        """Return details for several devices.

        A single 'get_devices_details_list' call is attempted at version
        '1.3'.  If that raises ``messaging.UnsupportedVersion``, the method
        falls back to one ``get_device_details`` call per device.

        :param context: Request context handed to the RPC client.
        :param devices: Iterable of devices to look up.
        :param agent_id: Identifier of the calling agent.
        :param host: Optional host forwarded with the request.
        :returns: The reply of the bulk call, or a list with one
                  ``get_device_details`` reply per device on fallback.
        """
        try:
            cctxt = self.client.prepare(version='1.3')
            res = cctxt.call(context, 'get_devices_details_list',
                             devices=devices, agent_id=agent_id, host=host)
        except messaging.UnsupportedVersion:
            # If the server has not been upgraded yet, a DVR-enabled agent
            # may not work correctly, however it can function in 'degraded'
            # mode, in that DVR routers may not be in the system yet, and
            # it might be not necessary to retrieve info about the host.
            LOG.warn(_LW('DVR functionality requires a server upgrade.'))
            # get_device_details prepares its own call context, so no
            # context needs to be prepared here.
            res = [
                self.get_device_details(context, device, agent_id, host)
                for device in devices
            ]
        return res

    def update_device_down(self, context, device, agent_id, host=None):
        """Call 'update_device_down' for a device and return the reply."""
        cctxt = self.client.prepare()
        return cctxt.call(context, 'update_device_down', device=device,
                          agent_id=agent_id, host=host)

    def update_device_up(self, context, device, agent_id, host=None):
        """Call 'update_device_up' for a device and return the reply."""
        cctxt = self.client.prepare()
        return cctxt.call(context, 'update_device_up', device=device,
                          agent_id=agent_id, host=host)

    def tunnel_sync(self, context, tunnel_ip, tunnel_type=None):
        """Call 'tunnel_sync' with the tunnel IP/type and return the reply."""
        cctxt = self.client.prepare()
        return cctxt.call(context, 'tunnel_sync', tunnel_ip=tunnel_ip,
                          tunnel_type=tunnel_type)