Don't keep a single global TRANSPORT object

To keep the global state of messaging like the old olso-incubator
library does, we have created a global TRANSPORT object.

But since the test use the fake:// driver of oslo.messaging in tests,
this transport is shared between tests, but some tests use 'fake://',
other the default one, and someother disable the transport.

This change ensures that a different transport is used for
code tests.

Change-Id: I22317527cc4fb44ea1fb9642586e8cdcbc97030b
This commit is contained in:
Mehdi Abaakouk 2014-06-12 15:02:56 +02:00
parent 3efe03b60a
commit 253d99026e
21 changed files with 175 additions and 170 deletions

View File

@ -42,8 +42,9 @@ LOG = log.getLogger(__name__)
class RPCAlarmNotifier(object):
def __init__(self):
transport = messaging.get_transport()
self.client = messaging.get_rpc_client(
topic=cfg.CONF.alarm.notifier_rpc_topic,
transport, topic=cfg.CONF.alarm.notifier_rpc_topic,
version="1.0")
def notify(self, alarm, previous, reason, reason_data):
@ -68,8 +69,9 @@ class RPCAlarmNotifier(object):
class RPCAlarmPartitionCoordination(object):
def __init__(self):
transport = messaging.get_transport()
self.client = messaging.get_rpc_client(
topic=cfg.CONF.alarm.partition_rpc_topic,
transport, topic=cfg.CONF.alarm.partition_rpc_topic,
version="1.0")
def presence(self, uuid, priority):

View File

@ -139,8 +139,9 @@ class PartitionedAlarmService(AlarmService, os_service.Service):
def __init__(self):
super(PartitionedAlarmService, self).__init__()
transport = messaging.get_transport()
self.rpc_server = messaging.get_rpc_server(
cfg.CONF.alarm.partition_rpc_topic, self)
transport, cfg.CONF.alarm.partition_rpc_topic, self)
self._load_evaluators()
self.api_client = None
@ -193,8 +194,9 @@ class AlarmNotifierService(os_service.Service):
def __init__(self):
super(AlarmNotifierService, self).__init__()
transport = messaging.get_transport()
self.rpc_server = messaging.get_rpc_server(
cfg.CONF.alarm.notifier_rpc_topic, self)
transport, cfg.CONF.alarm.notifier_rpc_topic, self)
self.notifiers = extension.ExtensionManager(self.EXTENSIONS_NAMESPACE,
invoke_on_load=True)

View File

@ -604,7 +604,8 @@ def _make_link(rel_name, url, type, type_arg, query=None):
def _send_notification(event, payload):
notification = event.replace(" ", "_")
notification = "alarm.%s" % notification
notifier = messaging.get_notifier(publisher_id="ceilometer.api")
transport = messaging.get_transport()
notifier = messaging.get_notifier(transport, publisher_id="ceilometer.api")
# FIXME(sileht): perhaps we need to copy some infos from the
# pecan request headers like nova does
notifier.info(context.RequestContext(), notification, payload)

View File

@ -57,9 +57,10 @@ class CollectorService(os_service.Service):
if cfg.CONF.collector.udp_address:
self.tg.add_thread(self.start_udp)
if messaging.TRANSPORT is not None:
transport = messaging.get_transport(optional=True)
if transport:
self.rpc_server = messaging.get_rpc_server(
cfg.CONF.publisher_rpc.metering_topic, self)
transport, cfg.CONF.publisher_rpc.metering_topic, self)
self.rpc_server.start()
if not cfg.CONF.collector.udp_address:

View File

@ -15,16 +15,14 @@
# License for the specific language governing permissions and limitations
# under the License.
import eventlet
from oslo.config import cfg
import oslo.messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common import jsonutils
TRANSPORT = None
NOTIFIER = None
DEFAULT_URL = "__default__"
TRANSPORTS = {}
_ALIASES = {
'ceilometer.openstack.common.rpc.impl_kombu': 'rabbit',
@ -62,75 +60,69 @@ class JsonPayloadSerializer(oslo.messaging.NoOpSerializer):
return jsonutils.to_primitive(entity, convert_instances=True)
def setup(url=None, optional=False):
"""Initialise the oslo.messaging layer."""
global TRANSPORT, NOTIFIER
if url and url.startswith("fake://"):
# NOTE(sileht): oslo.messaging fake driver uses time.sleep
# for task switch, so we need to monkey_patch it
eventlet.monkey_patch(time=True)
if not TRANSPORT:
def setup():
oslo.messaging.set_transport_defaults('ceilometer')
def get_transport(url=None, optional=False, cache=True):
"""Initialise the oslo.messaging layer."""
global TRANSPORTS, DEFAULT_URL
transport = TRANSPORTS.get(url)
if not transport:
try:
TRANSPORT = oslo.messaging.get_transport(cfg.CONF, url,
transport = oslo.messaging.get_transport(cfg.CONF, url,
aliases=_ALIASES)
except oslo.messaging.InvalidTransportURL as e:
TRANSPORT = None
if not optional or e.url:
# NOTE(sileht): oslo.messaging is configured but unloadable
# so reraise the exception
raise
if not NOTIFIER and TRANSPORT:
serializer = RequestContextSerializer(JsonPayloadSerializer())
NOTIFIER = oslo.messaging.Notifier(TRANSPORT, serializer=serializer)
return None
else:
if not url:
url = DEFAULT_URL
if cache:
TRANSPORTS[url] = transport
return transport
def cleanup():
"""Cleanup the oslo.messaging layer."""
global TRANSPORT, NOTIFIER
if TRANSPORT:
TRANSPORT.cleanup()
TRANSPORT = NOTIFIER = None
global TRANSPORTS, NOTIFIERS
NOTIFIERS = {}
for url in TRANSPORTS:
TRANSPORTS[url].cleanup()
del TRANSPORTS[url]
def get_rpc_server(topic, endpoint):
def get_rpc_server(transport, topic, endpoint):
"""Return a configured oslo.messaging rpc server."""
global TRANSPORT
target = oslo.messaging.Target(server=cfg.CONF.host, topic=topic)
serializer = RequestContextSerializer(JsonPayloadSerializer())
return oslo.messaging.get_rpc_server(TRANSPORT, target, [endpoint],
executor='eventlet',
return oslo.messaging.get_rpc_server(transport, target,
[endpoint], executor='eventlet',
serializer=serializer)
def get_rpc_client(**kwargs):
def get_rpc_client(transport, **kwargs):
"""Return a configured oslo.messaging RPCClient."""
global TRANSPORT
target = oslo.messaging.Target(**kwargs)
serializer = RequestContextSerializer(JsonPayloadSerializer())
return oslo.messaging.RPCClient(TRANSPORT, target,
return oslo.messaging.RPCClient(transport, target,
serializer=serializer)
def get_notification_listener(targets, endpoints, url=None):
def get_notification_listener(transport, targets, endpoints):
"""Return a configured oslo.messaging notification listener."""
global TRANSPORT
if url:
transport = oslo.messaging.get_transport(cfg.CONF, url,
_ALIASES)
else:
transport = TRANSPORT
return oslo.messaging.get_notification_listener(
transport, targets, endpoints, executor='eventlet')
def get_notifier(publisher_id):
def get_notifier(transport, publisher_id):
"""Return a configured oslo.messaging notifier."""
global NOTIFIER
return NOTIFIER.prepare(publisher_id=publisher_id)
serializer = RequestContextSerializer(JsonPayloadSerializer())
notifier = oslo.messaging.Notifier(transport, serializer=serializer)
return notifier.prepare(publisher_id=publisher_id)
def convert_to_old_notification_format(priority, ctxt, publisher_id,

View File

@ -63,6 +63,15 @@ class NotificationService(os_service.Service):
def start(self):
super(NotificationService, self).start()
# FIXME(sileht): endpoint use notification_topics option
# and it should not because this is oslo.messaging option
# not a ceilometer, until we have a something to get
# the notification_topics in an other way
# we must create a transport to ensure the option have
# beeen registered by oslo.messaging
transport = messaging.get_transport()
messaging.get_notifier(transport, '')
self.pipeline_manager = pipeline.setup_pipeline()
self.notification_manager = self._get_notifications_manager(
@ -91,9 +100,9 @@ class NotificationService(os_service.Service):
urls = cfg.CONF.notification.messaging_urls or [None]
self.listeners = []
for url in urls:
listener = messaging.get_notification_listener(targets,
endpoints,
url)
transport = messaging.get_transport(url)
listener = messaging.get_notification_listener(
transport, targets, endpoints)
listener.start()
self.listeners.append(listener)

View File

@ -106,7 +106,8 @@ class RPCPublisher(publisher.PublisherBase):
% self.policy)
self.policy = 'default'
self.rpc_client = messaging.get_rpc_client(version='1.0')
transport = messaging.get_transport()
self.rpc_client = messaging.get_rpc_client(transport, version='1.0')
def publish_samples(self, context, samples):
"""Publish samples on RPC.

View File

@ -25,19 +25,17 @@ import mock
from six import moves
from ceilometer.alarm.partition import coordination
from ceilometer import messaging
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import test
from ceilometer.openstack.common import timeutils
from ceilometer.storage import models
from ceilometer.tests import base as tests_base
class TestCoordinate(test.BaseTestCase):
class TestCoordinate(tests_base.BaseTestCase):
def setUp(self):
super(TestCoordinate, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.setup_messaging(self.CONF)
self.test_interval = 120
self.CONF.set_override('evaluation_interval',
@ -425,7 +423,7 @@ class TestCoordinate(test.BaseTestCase):
self.output.getvalue())
class TestPartitionIdentity(test.BaseTestCase):
class TestPartitionIdentity(tests_base.BaseTestCase):
def setUp(self):
super(TestPartitionIdentity, self).setUp()
self.id_1st = coordination.PartitionIdentity(str(uuid.uuid4()), 1)

View File

@ -20,11 +20,10 @@ import mock
import requests
from ceilometer.alarm import service
from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer.openstack.common import test
from ceilometer.tests import base as tests_base
DATA_JSON = ('{"current": "ALARM", "alarm_id": "foobar",'
@ -38,14 +37,12 @@ NOTIFICATION = dict(alarm_id='foobar',
current='ALARM')
class TestAlarmNotifier(test.BaseTestCase):
class TestAlarmNotifier(tests_base.BaseTestCase):
def setUp(self):
super(TestAlarmNotifier, self).setUp()
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.CONF = self.useFixture(config.Config()).conf
self.setup_messaging(self.CONF)
self.service = service.AlarmNotifierService()
@mock.patch('ceilometer.pipeline.setup_pipeline', mock.MagicMock())

View File

@ -21,16 +21,13 @@ import mock
from stevedore import extension
from ceilometer.alarm import service
from ceilometer import messaging
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import test
from ceilometer.tests import base as tests_base
class TestPartitionedAlarmService(test.BaseTestCase):
class TestPartitionedAlarmService(tests_base.BaseTestCase):
def setUp(self):
super(TestPartitionedAlarmService, self).setUp()
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.threshold_eval = mock.Mock()
self.api_client = mock.MagicMock()
@ -41,6 +38,8 @@ class TestPartitionedAlarmService(test.BaseTestCase):
self.CONF.set_override('partition_rpc_topic',
'fake_topic',
group='alarm')
self.setup_messaging(self.CONF)
self.partitioned = service.PartitionedAlarmService()
self.partitioned.tg = mock.Mock()
self.partitioned.partition_coordinator = mock.Mock()

View File

@ -22,14 +22,16 @@ import eventlet
from ceilometer.alarm import rpc as rpc_alarm
from ceilometer import messaging
from ceilometer.openstack.common import test
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import timeutils
from ceilometer.storage import models
from ceilometer.tests import base
class FakeNotifier(object):
def __init__(self):
self.rpc = messaging.get_rpc_server("alarm_notifier", self)
def __init__(self, transport):
self.rpc = messaging.get_rpc_server(
transport, "alarm_notifier", self)
self.notified = []
def start(self, expected_length):
@ -42,13 +44,13 @@ class FakeNotifier(object):
self.rpc.stop()
class TestRPCAlarmNotifier(test.BaseTestCase):
class TestRPCAlarmNotifier(base.BaseTestCase):
def setUp(self):
super(TestRPCAlarmNotifier, self).setUp()
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.CONF = self.useFixture(config.Config()).conf
self.setup_messaging(self.CONF)
self.notifier_server = FakeNotifier()
self.notifier_server = FakeNotifier(self.transport)
self.notifier = rpc_alarm.RPCAlarmNotifier()
self.alarms = [
alarms.Alarm(None, info={
@ -144,9 +146,9 @@ class TestRPCAlarmNotifier(test.BaseTestCase):
class FakeCoordinator(object):
def __init__(self):
def __init__(self, transport):
self.rpc = messaging.get_rpc_server(
"alarm_partition_coordination", self)
transport, "alarm_partition_coordination", self)
self.notified = []
def presence(self, context, data):
@ -163,13 +165,13 @@ class FakeCoordinator(object):
self.rpc.stop()
class TestRPCAlarmPartitionCoordination(test.BaseTestCase):
class TestRPCAlarmPartitionCoordination(base.BaseTestCase):
def setUp(self):
super(TestRPCAlarmPartitionCoordination, self).setUp()
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.CONF = self.useFixture(config.Config()).conf
self.setup_messaging(self.CONF)
self.coordinator_server = FakeCoordinator()
self.coordinator_server = FakeCoordinator(self.transport)
self.coordinator_server.rpc.start()
eventlet.sleep() # must be sure that fanout queue is created

View File

@ -18,20 +18,19 @@
"""
import mock
from oslo.config import cfg
from stevedore import extension
from ceilometer.alarm import service
from ceilometer import messaging
from ceilometer.openstack.common import test
from ceilometer.openstack.common.fixture import config
from ceilometer.tests import base as tests_base
class TestSingletonAlarmService(test.BaseTestCase):
class TestSingletonAlarmService(tests_base.BaseTestCase):
def setUp(self):
super(TestSingletonAlarmService, self).setUp()
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.CONF = self.useFixture(config.Config()).conf
self.setup_messaging(self.CONF)
self.threshold_eval = mock.Mock()
self.evaluators = extension.ExtensionManager.make_test_instance(
@ -51,7 +50,7 @@ class TestSingletonAlarmService(test.BaseTestCase):
def test_start(self):
test_interval = 120
cfg.CONF.set_override('evaluation_interval',
self.CONF.set_override('evaluation_interval',
test_interval,
group='alarm')
with mock.patch('ceilometerclient.client.get_client',
@ -90,13 +89,13 @@ class TestSingletonAlarmService(test.BaseTestCase):
def test_singleton_endpoint_types(self):
endpoint_types = ["internalURL", "publicURL"]
for endpoint_type in endpoint_types:
cfg.CONF.set_override('os_endpoint_type',
self.CONF.set_override('os_endpoint_type',
endpoint_type,
group='service_credentials')
with mock.patch('ceilometerclient.client.get_client') as client:
self.singleton.api_client = None
self.singleton._evaluate_assigned_alarms()
conf = cfg.CONF.service_credentials
conf = self.CONF.service_credentials
expected = [mock.call(2,
os_auth_url=conf.os_auth_url,
os_region_name=conf.os_region_name,

View File

@ -18,11 +18,10 @@
"""
from oslo.config import cfg
import oslo.messaging.conffixture
import pecan
import pecan.testing
from ceilometer import messaging
from ceilometer.openstack.common.fixture import config
from ceilometer.tests import db as db_test_base
OPT_GROUP_NAME = 'keystone_authtoken'
@ -39,10 +38,9 @@ class FunctionalTest(db_test_base.TestBase):
def setUp(self):
super(FunctionalTest, self).setUp()
self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF))
self.CONF.set_override("notification_driver", "messaging")
messaging.setup("fake://")
self.addCleanup(messaging.cleanup)
self.CONF = self.useFixture(config.Config()).conf
self.setup_messaging(self.CONF)
self.CONF.set_override("auth_version", "v2.0",
group=OPT_GROUP_NAME)
self.CONF.set_override("policy_file",

View File

@ -1933,10 +1933,9 @@ class TestAlarms(FunctionalTest,
}
endpoint = mock.MagicMock()
target = oslo.messaging.Target(topic="notifications",
exchange="ceilometer")
listener = messaging.get_notification_listener([target],
[endpoint])
target = oslo.messaging.Target(topic="notifications")
listener = messaging.get_notification_listener(
self.transport, [target], [endpoint])
listener.start()
endpoint.info.side_effect = lambda *args: listener.stop()
self.post_json('/alarms', params=json, headers=self.auth_headers)
@ -1975,7 +1974,8 @@ class TestAlarms(FunctionalTest,
self.delete('/alarms/%s' % data[0]['alarm_id'],
headers=self.auth_headers, status=204)
get_notifier.assert_called_once_with(publisher_id='ceilometer.api')
get_notifier.assert_called_once_with(mock.ANY,
publisher_id='ceilometer.api')
calls = notifier.info.call_args_list
self.assertEqual(1, len(calls))

View File

@ -35,7 +35,7 @@ class TestPostSamples(FunctionalTest,
del m['message_signature']
self.published.append(data)
def fake_get_rpc_client(self, **kwargs):
def fake_get_rpc_client(self, *args, **kwargs):
cast_ctxt = mock.Mock()
cast_ctxt.cast.side_effect = self.fake_cast
client = mock.Mock()

View File

@ -21,13 +21,36 @@ import functools
import os.path
import six
import eventlet
import oslo.messaging
from testtools import testcase
from ceilometer import messaging
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer.openstack.common import test
from ceilometer.openstack.common import timeutils
class BaseTestCase(test.BaseTestCase):
def setup_messaging(self, conf, exchange=None):
self.useFixture(oslo.messaging.conffixture.ConfFixture(conf))
conf.set_override("notification_driver", "messaging")
if not exchange:
exchange = 'ceilometer'
conf.set_override("control_exchange", exchange)
# NOTE(sileht): oslo.messaging fake driver uses time.sleep
# for task switch, so we need to monkey_patch it
# and also ensure the correct exchange have been set
eventlet.monkey_patch(time=True)
# NOTE(sileht): Ensure a new oslo.messaging driver is loaded
# between each tests
self.transport = messaging.get_transport("fake://", cache=False)
self.useFixture(mockpatch.Patch(
'ceilometer.messaging.get_transport',
return_value=self.transport))
def assertTimestampEqual(self, first, second, msg=None):
"""Checks that two timestamps are equals.

View File

@ -23,7 +23,6 @@ import oslo.messaging
from stevedore import extension
from ceilometer.event import endpoint as event_endpoint
from ceilometer import messaging
from ceilometer.openstack.common.fixture import config
from ceilometer.storage import models
from ceilometer.tests import base as tests_base
@ -90,10 +89,9 @@ class TestEventEndpoint(tests_base.BaseTestCase):
super(TestEventEndpoint, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
self.CONF([])
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.CONF.set_override("connection", "log://", group='database')
self.CONF.set_override("store_events", True, group="notification")
self.setup_messaging(self.CONF)
self.mock_dispatcher = mock.MagicMock()
self.endpoint = event_endpoint.EventsNotificationEndpoint()

View File

@ -27,12 +27,11 @@ except ImportError:
import webob
REQUEST = webob
from ceilometer import messaging
from ceilometer.objectstore import swift_middleware
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common.fixture.mockpatch import PatchObject
from ceilometer.openstack.common import test
from ceilometer.openstack.common.fixture import mockpatch
from ceilometer import pipeline
from ceilometer.tests import base as tests_base
class FakeApp(object):
@ -49,7 +48,7 @@ class FakeApp(object):
return self.body
class TestSwiftMiddleware(test.BaseTestCase):
class TestSwiftMiddleware(tests_base.BaseTestCase):
class _faux_pipeline_manager(pipeline.PipelineManager):
class _faux_pipeline(object):
@ -72,20 +71,16 @@ class TestSwiftMiddleware(test.BaseTestCase):
def setUp(self):
super(TestSwiftMiddleware, self).setUp()
self.pipeline_manager = self._faux_pipeline_manager()
self.useFixture(PatchObject(pipeline, 'setup_pipeline',
self.useFixture(mockpatch.PatchObject(
pipeline, 'setup_pipeline',
side_effect=self._fake_setup_pipeline))
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.CONF = self.useFixture(config.Config()).conf
self.setup_messaging(self.CONF)
@staticmethod
def start_response(*args):
pass
def test_rpc_setup(self):
swift_middleware.CeilometerMiddleware(FakeApp(), {})
self.assertEqual('ceilometer', self.CONF.control_exchange)
def test_get(self):
app = swift_middleware.CeilometerMiddleware(FakeApp(), {})
req = REQUEST.Request.blank('/1.0/account/container/obj',

View File

@ -28,12 +28,12 @@ from ceilometer import messaging
from ceilometer.openstack.common import context
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import network_utils
from ceilometer.openstack.common import test
from ceilometer.publisher import rpc
from ceilometer import sample
from ceilometer.tests import base as tests_base
class TestPublish(test.BaseTestCase):
class TestPublish(tests_base.BaseTestCase):
test_data = [
sample.Sample(
name='test',
@ -95,8 +95,7 @@ class TestPublish(test.BaseTestCase):
def setUp(self):
super(TestPublish, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.setup_messaging(self.CONF)
self.published = []
def test_published_no_mock(self):
@ -105,7 +104,7 @@ class TestPublish(test.BaseTestCase):
endpoint = mock.MagicMock(['record_metering_data'])
collector = messaging.get_rpc_server(
self.CONF.publisher_rpc.metering_topic, endpoint)
self.transport, self.CONF.publisher_rpc.metering_topic, endpoint)
endpoint.record_metering_data.side_effect = \
lambda *args, **kwds: collector.stop()

View File

@ -46,9 +46,7 @@ class TestCollector(tests_base.BaseTestCase):
self.CONF.set_override("connection", "log://", group='database')
self.CONF.set_override('metering_secret', 'not-so-secret',
group='publisher')
self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF))
self._setup_messaging('fake://')
self.addCleanup(messaging.cleanup)
self._setup_messaging()
self.counter = sample.Sample(
name='foobar',
@ -87,10 +85,13 @@ class TestCollector(tests_base.BaseTestCase):
def _dummy_thread_group_add_thread(method):
method()
def _setup_messaging(self, url):
messaging.cleanup()
self.CONF.set_override('rpc_backend', '')
messaging.setup(url, optional=True)
def _setup_messaging(self, enabled=True):
if enabled:
self.setup_messaging(self.CONF)
else:
self.useFixture(mockpatch.Patch(
'ceilometer.messaging.get_transport',
return_value=None))
def _setup_fake_dispatcher(self):
plugin = mock.MagicMock()
@ -127,7 +128,7 @@ class TestCollector(tests_base.BaseTestCase):
data=self.counter)
def test_udp_receive_base(self):
self._setup_messaging('')
self._setup_messaging(False)
mock_dispatcher = self._setup_fake_dispatcher()
self.counter['source'] = 'mysource'
self.counter['counter_name'] = self.counter['name']
@ -146,7 +147,7 @@ class TestCollector(tests_base.BaseTestCase):
self.counter)
def test_udp_receive_storage_error(self):
self._setup_messaging('')
self._setup_messaging(False)
mock_dispatcher = self._setup_fake_dispatcher()
mock_dispatcher.record_metering_data.side_effect = self._raise_error
@ -170,7 +171,7 @@ class TestCollector(tests_base.BaseTestCase):
raise Exception
def test_udp_receive_bad_decoding(self):
self._setup_messaging('')
self._setup_messaging(False)
udp_socket = self._make_fake_socket(self.counter)
with contextlib.nested(
mock.patch('socket.socket', return_value=udp_socket),
@ -183,7 +184,7 @@ class TestCollector(tests_base.BaseTestCase):
@mock.patch.object(collector.CollectorService, 'start_udp')
def test_only_udp(self, udp_start, rpc_start):
"""Check that only UDP is started if messaging transport is unset."""
self._setup_messaging('')
self._setup_messaging(False)
udp_socket = self._make_fake_socket(self.counter)
with mock.patch('socket.socket', return_value=udp_socket):
self.srv.start()
@ -200,7 +201,7 @@ class TestCollector(tests_base.BaseTestCase):
self.assertEqual(0, udp_start.call_count)
def test_udp_receive_valid_encoding(self):
self._setup_messaging('')
self._setup_messaging(False)
mock_dispatcher = self._setup_fake_dispatcher()
self.data_sent = []
with mock.patch('socket.socket',
@ -216,7 +217,7 @@ class TestCollector(tests_base.BaseTestCase):
self.srv.start()
mylog.info.side_effect = lambda *args: self.srv.stop()
client = messaging.get_rpc_client(version='1.0')
client = messaging.get_rpc_client(self.transport, version='1.0')
cclient = client.prepare(topic='metering')
cclient.cast(context.RequestContext(),
'record_metering_data', data=[self.utf8_msg])

View File

@ -16,7 +16,7 @@
# under the License.
"""Tests for Ceilometer notify daemon."""
import eventlet.semaphore
import eventlet
import mock
import oslo.messaging
@ -30,6 +30,8 @@ from ceilometer import notification
from ceilometer.openstack.common import context
from ceilometer.openstack.common import fileutils
from ceilometer.openstack.common.fixture import config
from ceilometer.openstack.common import timeutils
from ceilometer.publisher import test as test_publisher
from ceilometer.tests import base as tests_base
TEST_NOTICE_CTXT = {
@ -89,10 +91,9 @@ class TestNotification(tests_base.BaseTestCase):
def setUp(self):
super(TestNotification, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.CONF.set_override("connection", "log://", group='database')
self.CONF.set_override("store_events", False, group="notification")
self.setup_messaging(self.CONF)
self.srv = notification.NotificationService()
def fake_get_notifications_manager(self, pm):
@ -168,7 +169,7 @@ class TestRealNotification(tests_base.BaseTestCase):
def setUp(self):
super(TestRealNotification, self).setUp()
self.CONF = self.useFixture(config.Config()).conf
self.useFixture(oslo.messaging.conffixture.ConfFixture(self.CONF))
self.setup_messaging(self.CONF, 'nova')
pipeline = yaml.dump([{
'name': 'test_pipeline',
@ -179,44 +180,31 @@ class TestRealNotification(tests_base.BaseTestCase):
}])
self.expected_samples = 2
self.sem = eventlet.semaphore.Semaphore(0)
pipeline_cfg_file = fileutils.write_to_tempfile(content=pipeline,
prefix="pipeline",
suffix="yaml")
self.CONF.set_override("pipeline_cfg_file", pipeline_cfg_file)
self.CONF.set_override("notification_driver", "messaging")
self.CONF.set_override("control_exchange", "nova")
messaging.setup('fake://')
self.addCleanup(messaging.cleanup)
self.srv = notification.NotificationService()
self.publisher = test_publisher.TestPublisher("")
@mock.patch('ceilometer.publisher.test.TestPublisher')
def test_notification_service(self, fake_publisher_cls):
fake_publisher_cls.return_value = self.publisher
self.srv.start()
fake_publisher = fake_publisher_cls.return_value
fake_publisher.publish_samples.side_effect = \
lambda *args: self.sem.release()
notifier = messaging.get_notifier("compute.vagrant-precise")
notifier = messaging.get_notifier(self.transport,
"compute.vagrant-precise")
notifier.info(context.RequestContext(), 'compute.instance.create.end',
TEST_NOTICE_PAYLOAD)
# we should wait all the expected notification listeners finished
# processing the notification
for i in range(self.expected_samples):
self.sem.acquire(timeout=30)
# stop NotificationService
start = timeutils.utcnow()
while timeutils.delta_seconds(start, timeutils.utcnow()) < 600:
if len(self.publisher.samples) >= self.expected_samples:
break
eventlet.sleep(0)
self.srv.stop()
class SamplesMatcher(object):
def __eq__(self, samples):
for s in samples:
if s.resource_id != "9f9d01b9-4a58-4271-9e27-398b21ab20d1":
return False
return True
fake_publisher.publish_samples.assert_has_calls(
[mock.call(mock.ANY, SamplesMatcher())] * self.expected_samples
)
resources = list(set(s.resource_id for s in self.publisher.samples))
self.assertEqual(self.expected_samples, len(self.publisher.samples))
self.assertEqual(["9f9d01b9-4a58-4271-9e27-398b21ab20d1"], resources)