43c29716ee
LOG.exception adds traceback and exception information automatically, so we don't need to add it explicitly. Change-Id: I58ea46917b84c7d0ec72f52dfbb451fc85d2e8eb
85 lines
2.9 KiB
Python
85 lines
2.9 KiB
Python
# Copyright 2016 - Nokia
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
from oslo_log import log
|
|
import oslo_messaging as oslo_msg
|
|
|
|
# from oslo_messaging import serializer as oslo_serializer
|
|
|
|
LOG = log.getLogger(__name__)
|
|
|
|
DEFAULT_URL = "__default__"
|
|
TRANSPORTS = {}
|
|
# _SERIALIZER = oslo_serializer.JsonPayloadSerializer()
|
|
|
|
|
|
def setup():
|
|
# Set the default exchange under which topics are scoped
|
|
oslo_msg.set_transport_defaults('vitrage')
|
|
|
|
|
|
def get_rpc_transport(conf, url=None, optional=False, cache=True):
|
|
return get_transport(conf, url, optional, cache, rpc=True)
|
|
|
|
|
|
def get_transport(conf, url=None, optional=False, cache=True, rpc=False):
|
|
"""Initialise the oslo_messaging layer."""
|
|
global TRANSPORTS, DEFAULT_URL
|
|
cache_key = url or DEFAULT_URL + '_rpc' if rpc else ''
|
|
transport = TRANSPORTS.get(cache_key)
|
|
if not transport or not cache:
|
|
try:
|
|
if rpc:
|
|
transport = oslo_msg.get_rpc_transport(conf, url)
|
|
else:
|
|
transport = oslo_msg.get_notification_transport(conf, url)
|
|
except oslo_msg.InvalidTransportURL as e:
|
|
if not optional or e.url:
|
|
# NOTE(sileht): oslo_messaging is configured but unloadable
|
|
# so reraise the exception
|
|
raise
|
|
return None
|
|
else:
|
|
if cache:
|
|
TRANSPORTS[cache_key] = transport
|
|
return transport
|
|
|
|
|
|
def get_notification_listener(transport, targets, endpoints,
|
|
allow_requeue=False):
|
|
"""Return a configured oslo_messaging notification listener."""
|
|
return oslo_msg.get_notification_listener(
|
|
transport, targets, endpoints, executor='blocking',
|
|
allow_requeue=allow_requeue)
|
|
|
|
|
|
class VitrageNotifier(object):
|
|
"""Allows writing to message bus"""
|
|
def __init__(self, conf, publisher_id, topics):
|
|
transport = get_transport(conf)
|
|
self.notifier = oslo_msg.Notifier(
|
|
transport,
|
|
driver='messagingv2',
|
|
publisher_id=publisher_id,
|
|
topics=topics)
|
|
|
|
def notify(self, event_type, data):
|
|
LOG.debug('notify : ' + event_type + ' ' + str(data))
|
|
if self.notifier:
|
|
try:
|
|
self.notifier.info({}, event_type, data)
|
|
except Exception:
|
|
LOG.exception('Notifier cannot notify.')
|
|
else:
|
|
LOG.error('Notifier cannot notify')
|