rpc: stop using global conf object in some functions
This is an issue for a long time, we want to use a local object carried around. Change-Id: I2a1bfbade28622751df7046a384833c2560708fd
This commit is contained in:
parent
a2013e9d81
commit
163328ee3b
@ -478,7 +478,7 @@ class AlarmChange(base.Base):
|
|||||||
def _send_notification(event, payload):
|
def _send_notification(event, payload):
|
||||||
notification = event.replace(" ", "_")
|
notification = event.replace(" ", "_")
|
||||||
notification = "alarm.%s" % notification
|
notification = "alarm.%s" % notification
|
||||||
transport = messaging.get_transport()
|
transport = messaging.get_transport(cfg.CONF)
|
||||||
notifier = messaging.get_notifier(transport, publisher_id="aodh.api")
|
notifier = messaging.get_notifier(transport, publisher_id="aodh.api")
|
||||||
# FIXME(sileht): perhaps we need to copy some infos from the
|
# FIXME(sileht): perhaps we need to copy some infos from the
|
||||||
# pecan request headers like nova does
|
# pecan request headers like nova does
|
||||||
|
@ -92,7 +92,7 @@ class Evaluator(object):
|
|||||||
except aodh.NotImplementedError:
|
except aodh.NotImplementedError:
|
||||||
pass
|
pass
|
||||||
notification = "alarm.state_transition"
|
notification = "alarm.state_transition"
|
||||||
transport = messaging.get_transport()
|
transport = messaging.get_transport(cfg.CONF)
|
||||||
notifier = messaging.get_notifier(transport,
|
notifier = messaging.get_notifier(transport,
|
||||||
publisher_id="aodh.evaluator")
|
publisher_id="aodh.evaluator")
|
||||||
notifier.info(context.RequestContext(), notification, payload)
|
notifier.info(context.RequestContext(), notification, payload)
|
||||||
|
@ -13,7 +13,6 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
from oslo_config import cfg
|
|
||||||
from oslo_context import context
|
from oslo_context import context
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
from oslo_serialization import jsonutils
|
from oslo_serialization import jsonutils
|
||||||
@ -55,14 +54,14 @@ def setup():
|
|||||||
oslo_messaging.set_transport_defaults('aodh')
|
oslo_messaging.set_transport_defaults('aodh')
|
||||||
|
|
||||||
|
|
||||||
def get_transport(url=None, optional=False, cache=True):
|
def get_transport(conf, url=None, optional=False, cache=True):
|
||||||
"""Initialise the oslo_messaging layer."""
|
"""Initialise the oslo_messaging layer."""
|
||||||
global TRANSPORTS, DEFAULT_URL
|
global TRANSPORTS, DEFAULT_URL
|
||||||
cache_key = url or DEFAULT_URL
|
cache_key = url or DEFAULT_URL
|
||||||
transport = TRANSPORTS.get(cache_key)
|
transport = TRANSPORTS.get(cache_key)
|
||||||
if not transport or not cache:
|
if not transport or not cache:
|
||||||
try:
|
try:
|
||||||
transport = oslo_messaging.get_transport(cfg.CONF, url)
|
transport = oslo_messaging.get_transport(conf, url)
|
||||||
except oslo_messaging.InvalidTransportURL as e:
|
except oslo_messaging.InvalidTransportURL as e:
|
||||||
if not optional or e.url:
|
if not optional or e.url:
|
||||||
# NOTE(sileht): oslo_messaging is configured but unloadable
|
# NOTE(sileht): oslo_messaging is configured but unloadable
|
||||||
@ -75,10 +74,9 @@ def get_transport(url=None, optional=False, cache=True):
|
|||||||
return transport
|
return transport
|
||||||
|
|
||||||
|
|
||||||
def get_rpc_server(transport, topic, endpoint):
|
def get_rpc_server(conf, transport, topic, endpoint):
|
||||||
"""Return a configured oslo_messaging rpc server."""
|
"""Return a configured oslo_messaging rpc server."""
|
||||||
cfg.CONF.import_opt('host', 'aodh.service')
|
target = oslo_messaging.Target(server=conf.host, topic=topic)
|
||||||
target = oslo_messaging.Target(server=cfg.CONF.host, topic=topic)
|
|
||||||
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
serializer = RequestContextSerializer(JsonPayloadSerializer())
|
||||||
return oslo_messaging.get_rpc_server(transport, target,
|
return oslo_messaging.get_rpc_server(transport, target,
|
||||||
[endpoint], executor='eventlet',
|
[endpoint], executor='eventlet',
|
||||||
|
@ -39,7 +39,7 @@ LOG = log.getLogger(__name__)
|
|||||||
|
|
||||||
class RPCAlarmNotifier(object):
|
class RPCAlarmNotifier(object):
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
transport = messaging.get_transport()
|
transport = messaging.get_transport(cfg.CONF)
|
||||||
self.client = messaging.get_rpc_client(
|
self.client = messaging.get_rpc_client(
|
||||||
transport, topic=cfg.CONF.notifier_rpc_topic,
|
transport, topic=cfg.CONF.notifier_rpc_topic,
|
||||||
version="1.0")
|
version="1.0")
|
||||||
|
@ -225,9 +225,9 @@ class AlarmNotifierService(os_service.Service):
|
|||||||
|
|
||||||
def __init__(self):
|
def __init__(self):
|
||||||
super(AlarmNotifierService, self).__init__()
|
super(AlarmNotifierService, self).__init__()
|
||||||
transport = messaging.get_transport()
|
transport = messaging.get_transport(cfg.CONF)
|
||||||
self.rpc_server = messaging.get_rpc_server(
|
self.rpc_server = messaging.get_rpc_server(
|
||||||
transport, cfg.CONF.notifier_rpc_topic, self)
|
cfg.CONF, transport, cfg.CONF.notifier_rpc_topic, self)
|
||||||
|
|
||||||
self.notifiers = extension.ExtensionManager(
|
self.notifiers = extension.ExtensionManager(
|
||||||
self.NOTIFIER_EXTENSIONS_NAMESPACE,
|
self.NOTIFIER_EXTENSIONS_NAMESPACE,
|
||||||
|
@ -46,7 +46,7 @@ class BaseTestCase(base.BaseTestCase):
|
|||||||
|
|
||||||
# NOTE(sileht): Ensure a new oslo.messaging driver is loaded
|
# NOTE(sileht): Ensure a new oslo.messaging driver is loaded
|
||||||
# between each tests
|
# between each tests
|
||||||
self.transport = messaging.get_transport("fake://", cache=False)
|
self.transport = messaging.get_transport(conf, "fake://", cache=False)
|
||||||
self.useFixture(mockpatch.Patch(
|
self.useFixture(mockpatch.Patch(
|
||||||
'aodh.messaging.get_transport',
|
'aodh.messaging.get_transport',
|
||||||
return_value=self.transport))
|
return_value=self.transport))
|
||||||
|
@ -27,39 +27,39 @@ class MessagingTests(base.BaseTestCase):
|
|||||||
|
|
||||||
def test_get_transport_invalid_url(self):
|
def test_get_transport_invalid_url(self):
|
||||||
self.assertRaises(oslo_messaging.InvalidTransportURL,
|
self.assertRaises(oslo_messaging.InvalidTransportURL,
|
||||||
messaging.get_transport, "notvalid!")
|
messaging.get_transport, self.CONF, "notvalid!")
|
||||||
|
|
||||||
def test_get_transport_url_caching(self):
|
def test_get_transport_url_caching(self):
|
||||||
t1 = messaging.get_transport('fake://')
|
t1 = messaging.get_transport(self.CONF, 'fake://')
|
||||||
t2 = messaging.get_transport('fake://')
|
t2 = messaging.get_transport(self.CONF, 'fake://')
|
||||||
self.assertEqual(t1, t2)
|
self.assertEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_default_url_caching(self):
|
def test_get_transport_default_url_caching(self):
|
||||||
t1 = messaging.get_transport()
|
t1 = messaging.get_transport(self.CONF, )
|
||||||
t2 = messaging.get_transport()
|
t2 = messaging.get_transport(self.CONF, )
|
||||||
self.assertEqual(t1, t2)
|
self.assertEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_default_url_no_caching(self):
|
def test_get_transport_default_url_no_caching(self):
|
||||||
t1 = messaging.get_transport(cache=False)
|
t1 = messaging.get_transport(self.CONF, cache=False)
|
||||||
t2 = messaging.get_transport(cache=False)
|
t2 = messaging.get_transport(self.CONF, cache=False)
|
||||||
self.assertNotEqual(t1, t2)
|
self.assertNotEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_url_no_caching(self):
|
def test_get_transport_url_no_caching(self):
|
||||||
t1 = messaging.get_transport('fake://', cache=False)
|
t1 = messaging.get_transport(self.CONF, 'fake://', cache=False)
|
||||||
t2 = messaging.get_transport('fake://', cache=False)
|
t2 = messaging.get_transport(self.CONF, 'fake://', cache=False)
|
||||||
self.assertNotEqual(t1, t2)
|
self.assertNotEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_default_url_caching_mix(self):
|
def test_get_transport_default_url_caching_mix(self):
|
||||||
t1 = messaging.get_transport()
|
t1 = messaging.get_transport(self.CONF, )
|
||||||
t2 = messaging.get_transport(cache=False)
|
t2 = messaging.get_transport(self.CONF, cache=False)
|
||||||
self.assertNotEqual(t1, t2)
|
self.assertNotEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_url_caching_mix(self):
|
def test_get_transport_url_caching_mix(self):
|
||||||
t1 = messaging.get_transport('fake://')
|
t1 = messaging.get_transport(self.CONF, 'fake://')
|
||||||
t2 = messaging.get_transport('fake://', cache=False)
|
t2 = messaging.get_transport(self.CONF, 'fake://', cache=False)
|
||||||
self.assertNotEqual(t1, t2)
|
self.assertNotEqual(t1, t2)
|
||||||
|
|
||||||
def test_get_transport_optional(self):
|
def test_get_transport_optional(self):
|
||||||
self.CONF.set_override('rpc_backend', '')
|
self.CONF.set_override('rpc_backend', '')
|
||||||
self.assertIsNone(messaging.get_transport(optional=True,
|
self.assertIsNone(messaging.get_transport(self.CONF, optional=True,
|
||||||
cache=False))
|
cache=False))
|
||||||
|
@ -28,9 +28,9 @@ from aodh.tests import base as tests_base
|
|||||||
|
|
||||||
|
|
||||||
class FakeNotifier(object):
|
class FakeNotifier(object):
|
||||||
def __init__(self, transport):
|
def __init__(self, conf, transport):
|
||||||
self.rpc = messaging.get_rpc_server(
|
self.rpc = messaging.get_rpc_server(
|
||||||
transport, "alarm_notifier", self)
|
conf, transport, "alarm_notifier", self)
|
||||||
self.notified = []
|
self.notified = []
|
||||||
|
|
||||||
def start(self, expected_length):
|
def start(self, expected_length):
|
||||||
@ -49,7 +49,7 @@ class TestRPCAlarmNotifier(tests_base.BaseTestCase):
|
|||||||
self.CONF = self.useFixture(fixture_config.Config()).conf
|
self.CONF = self.useFixture(fixture_config.Config()).conf
|
||||||
self.setup_messaging(self.CONF)
|
self.setup_messaging(self.CONF)
|
||||||
|
|
||||||
self.notifier_server = FakeNotifier(self.transport)
|
self.notifier_server = FakeNotifier(self.CONF, self.transport)
|
||||||
self.notifier = rpc.RPCAlarmNotifier()
|
self.notifier = rpc.RPCAlarmNotifier()
|
||||||
self.alarms = [
|
self.alarms = [
|
||||||
alarms.Alarm(None, info={
|
alarms.Alarm(None, info={
|
||||||
|
Loading…
x
Reference in New Issue
Block a user