diff --git a/vitrage/api/hooks.py b/vitrage/api/hooks.py index a1ee96279..6c965352e 100644 --- a/vitrage/api/hooks.py +++ b/vitrage/api/hooks.py @@ -36,7 +36,7 @@ class RPCHook(hooks.PecanHook): """Create and attach an rpc to the request. """ def __init__(self, conf): - transport = messaging.get_transport(conf) + transport = messaging.get_rpc_transport(conf) target = oslo_messaging.Target(topic=conf.rpc_topic) self.client = vitrage_rpc.get_client(transport, target) diff --git a/vitrage/api_handler/service.py b/vitrage/api_handler/service.py index 18fc69090..2fd507ff9 100644 --- a/vitrage/api_handler/service.py +++ b/vitrage/api_handler/service.py @@ -44,7 +44,7 @@ class VitrageApiHandlerService(os_service.Service): super(VitrageApiHandlerService, self).start() - transport = messaging.get_transport(self.conf) + transport = messaging.get_rpc_transport(self.conf) rabbit_hosts = self.conf.oslo_messaging_rabbit.rabbit_hosts target = oslo_messaging.Target(topic=self.conf.rpc_topic, server=rabbit_hosts) diff --git a/vitrage/messaging.py b/vitrage/messaging.py index c1f9c8489..5465afef9 100644 --- a/vitrage/messaging.py +++ b/vitrage/messaging.py @@ -12,7 +12,7 @@ # License for the specific language governing permissions and limitations # under the License. from oslo_log import log -import oslo_messaging +import oslo_messaging as oslo_msg # from oslo_messaging import serializer as oslo_serializer @@ -25,18 +25,25 @@ TRANSPORTS = {} def setup(): # Set the default exchange under which topics are scoped - oslo_messaging.set_transport_defaults('vitrage') + oslo_msg.set_transport_defaults('vitrage') -def get_transport(conf, url=None, optional=False, cache=True): +def get_rpc_transport(conf, url=None, optional=False, cache=True): + return get_transport(conf, url, optional, cache, rpc=True) + + +def get_transport(conf, url=None, optional=False, cache=True, rpc=False): """Initialise the oslo_messaging layer.""" global TRANSPORTS, DEFAULT_URL cache_key = url or DEFAULT_URL transport = TRANSPORTS.get(cache_key) if not transport or not cache: try: - transport = oslo_messaging.get_notification_transport(conf, url) - except oslo_messaging.InvalidTransportURL as e: + if rpc: + transport = oslo_msg.get_rpc_transport(conf, url) + else: + transport = oslo_msg.get_notification_transport(conf, url) + except oslo_msg.InvalidTransportURL as e: if not optional or e.url: # NOTE(sileht): oslo_messaging is configured but unloadable # so reraise the exception @@ -51,7 +58,7 @@ def get_transport(conf, url=None, optional=False, cache=True): def get_notification_listener(transport, targets, endpoints, allow_requeue=False): """Return a configured oslo_messaging notification listener.""" - return oslo_messaging.get_notification_listener( + return oslo_msg.get_notification_listener( transport, targets, endpoints, executor='blocking', allow_requeue=allow_requeue) @@ -60,7 +67,7 @@ class VitrageNotifier(object): """Allows writing to message bus""" def __init__(self, conf, publisher_id, topic): transport = get_transport(conf) - self.notifier = oslo_messaging.Notifier( + self.notifier = oslo_msg.Notifier( transport, driver='messagingv2', publisher_id=publisher_id,