fix rpc transport

Change-Id: Ie5709887c17e4d62ca10e281026465ab7561a023
This commit is contained in:
Eyal 2017-08-15 15:13:32 +03:00
parent e8217d87cf
commit 6ee573841e
3 changed files with 16 additions and 9 deletions

View File

@ -36,7 +36,7 @@ class RPCHook(hooks.PecanHook):
"""Create and attach an rpc to the request. """ """Create and attach an rpc to the request. """
def __init__(self, conf): def __init__(self, conf):
transport = messaging.get_transport(conf) transport = messaging.get_rpc_transport(conf)
target = oslo_messaging.Target(topic=conf.rpc_topic) target = oslo_messaging.Target(topic=conf.rpc_topic)
self.client = vitrage_rpc.get_client(transport, target) self.client = vitrage_rpc.get_client(transport, target)

View File

@ -44,7 +44,7 @@ class VitrageApiHandlerService(os_service.Service):
super(VitrageApiHandlerService, self).start() super(VitrageApiHandlerService, self).start()
transport = messaging.get_transport(self.conf) transport = messaging.get_rpc_transport(self.conf)
rabbit_hosts = self.conf.oslo_messaging_rabbit.rabbit_hosts rabbit_hosts = self.conf.oslo_messaging_rabbit.rabbit_hosts
target = oslo_messaging.Target(topic=self.conf.rpc_topic, target = oslo_messaging.Target(topic=self.conf.rpc_topic,
server=rabbit_hosts) server=rabbit_hosts)

View File

@ -12,7 +12,7 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
from oslo_log import log from oslo_log import log
import oslo_messaging import oslo_messaging as oslo_msg
# from oslo_messaging import serializer as oslo_serializer # from oslo_messaging import serializer as oslo_serializer
@ -25,18 +25,25 @@ TRANSPORTS = {}
def setup(): def setup():
# Set the default exchange under which topics are scoped # Set the default exchange under which topics are scoped
oslo_messaging.set_transport_defaults('vitrage') oslo_msg.set_transport_defaults('vitrage')
def get_transport(conf, url=None, optional=False, cache=True): def get_rpc_transport(conf, url=None, optional=False, cache=True):
return get_transport(conf, url, optional, cache, rpc=True)
def get_transport(conf, url=None, optional=False, cache=True, rpc=False):
"""Initialise the oslo_messaging layer.""" """Initialise the oslo_messaging layer."""
global TRANSPORTS, DEFAULT_URL global TRANSPORTS, DEFAULT_URL
cache_key = url or DEFAULT_URL cache_key = url or DEFAULT_URL
transport = TRANSPORTS.get(cache_key) transport = TRANSPORTS.get(cache_key)
if not transport or not cache: if not transport or not cache:
try: try:
transport = oslo_messaging.get_notification_transport(conf, url) if rpc:
except oslo_messaging.InvalidTransportURL as e: transport = oslo_msg.get_rpc_transport(conf, url)
else:
transport = oslo_msg.get_notification_transport(conf, url)
except oslo_msg.InvalidTransportURL as e:
if not optional or e.url: if not optional or e.url:
# NOTE(sileht): oslo_messaging is configured but unloadable # NOTE(sileht): oslo_messaging is configured but unloadable
# so reraise the exception # so reraise the exception
@ -51,7 +58,7 @@ def get_transport(conf, url=None, optional=False, cache=True):
def get_notification_listener(transport, targets, endpoints, def get_notification_listener(transport, targets, endpoints,
allow_requeue=False): allow_requeue=False):
"""Return a configured oslo_messaging notification listener.""" """Return a configured oslo_messaging notification listener."""
return oslo_messaging.get_notification_listener( return oslo_msg.get_notification_listener(
transport, targets, endpoints, executor='blocking', transport, targets, endpoints, executor='blocking',
allow_requeue=allow_requeue) allow_requeue=allow_requeue)
@ -60,7 +67,7 @@ class VitrageNotifier(object):
"""Allows writing to message bus""" """Allows writing to message bus"""
def __init__(self, conf, publisher_id, topic): def __init__(self, conf, publisher_id, topic):
transport = get_transport(conf) transport = get_transport(conf)
self.notifier = oslo_messaging.Notifier( self.notifier = oslo_msg.Notifier(
transport, transport,
driver='messagingv2', driver='messagingv2',
publisher_id=publisher_id, publisher_id=publisher_id,