diff --git a/oslo/messaging/conffixture.py b/oslo/messaging/conffixture.py index 7dd06db76..5aafeda41 100644 --- a/oslo/messaging/conffixture.py +++ b/oslo/messaging/conffixture.py @@ -46,6 +46,10 @@ class ConfFixture(fixtures.Fixture): self.conf = conf _import_opts(self.conf, 'oslo.messaging._drivers.impl_rabbit', 'rabbit_opts') + _import_opts(self.conf, + 'oslo.messaging._drivers.impl_qpid', 'qpid_opts') + _import_opts(self.conf, + 'oslo.messaging._drivers.amqp', 'amqp_opts') _import_opts(self.conf, 'oslo.messaging.rpc.client', '_client_opts') _import_opts(self.conf, 'oslo.messaging.transport', '_transport_opts') diff --git a/test-requirements.txt b/test-requirements.txt index 92f41bf37..3a9145e08 100644 --- a/test-requirements.txt +++ b/test-requirements.txt @@ -11,6 +11,9 @@ testscenarios>=0.4 testtools>=0.9.34 oslotest +# for test_qpid +qpid-python + # when we can require tox>= 1.4, this can go into tox.ini: # [testenv:cover] # deps = {[testenv]deps} coverage @@ -19,4 +22,3 @@ coverage>=3.6 # this is required for the docs build jobs sphinx>=1.1.2,<1.2 oslosphinx -qpid-python diff --git a/tests/test_qpid.py b/tests/test_qpid.py index 6c15dedb4..2e508bbac 100644 --- a/tests/test_qpid.py +++ b/tests/test_qpid.py @@ -20,7 +20,6 @@ import time import qpid import testscenarios -from oslo.config import cfg from oslo import messaging from oslo.messaging._drivers import impl_qpid as qpid_driver from tests import utils as test_utils @@ -60,128 +59,11 @@ def _is_qpidd_service_running(): return qpid_running -class TestQpidInvalidTopologyVersion(test_utils.BaseTestCase): - """Unit test cases to test invalid qpid topology version.""" - - scenarios = [ - ('direct', dict(consumer_cls=qpid_driver.DirectConsumer, - publisher_cls=qpid_driver.DirectPublisher)), - ('topic', dict(consumer_cls=qpid_driver.TopicConsumer, - publisher_cls=qpid_driver.TopicPublisher)), - ('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer, - publisher_cls=qpid_driver.FanoutPublisher)), - ] +class _QpidBaseTestCase(test_utils.BaseTestCase): def setUp(self): - super(TestQpidInvalidTopologyVersion, self).setUp() - - self.qpid_opts = [ - cfg.BoolOpt('amqp_durable_queues', - default=False, - deprecated_name='rabbit_durable_queues', - deprecated_group='DEFAULT', - help='Use durable queues in amqp.'), - cfg.BoolOpt('amqp_auto_delete', - default=False, - help='Auto-delete queues in amqp.'), - cfg.IntOpt('qpid_topology_version', - default=-1, - help='qpid topology version'), - cfg.StrOpt('control_exchange', - default='openstack', - help='AMQP exchange to connect to if using Qpid'), - ] - - self.qpid_conf = cfg.ConfigOpts() - self.qpid_conf.register_opts(self.qpid_opts) - - self.fake_qpid = not _is_qpidd_service_running() - - if self.fake_qpid: - self.session = get_fake_qpid_session() - else: - self.broker = QPID_BROKER - # create connection from the qpid.messaging - self.connection = qpid.messaging.Connection(self.broker) - self.connection.open() - self.session = self.connection.session() - - def tearDown(self): - self.qpid_conf.unregister_opts(self.qpid_opts) - super(TestQpidInvalidTopologyVersion, self).tearDown() - - if self.fake_qpid: - _fake_session.flush_exchanges() - else: - self.connection.close() - - def test_invalid_topology_version(self): - def consumer_callback(msg): - pass - - msgid_or_topic = 'test' - - # not using self.assertRaises because - # 1. qpid driver raises Exception(msg) for invalid topology version - # 2. flake8 - H202 assertRaises Exception too broad - exception_msg = ("Invalid value for qpid_topology_version: %d" % - self.qpid_conf.qpid_topology_version) - recvd_exc_msg = '' - - try: - self.consumer_cls(self.qpid_conf, self.session, msgid_or_topic, - consumer_callback) - except Exception as e: - recvd_exc_msg = e.message - - self.assertEqual(exception_msg, recvd_exc_msg) - - recvd_exc_msg = '' - try: - self.publisher_cls(self.qpid_conf, self.session, msgid_or_topic) - except Exception as e: - recvd_exc_msg = e.message - - self.assertEqual(exception_msg, recvd_exc_msg) - - -class TestQpidDirectConsumerPublisher(test_utils.BaseTestCase): - """Unit test cases to test DirectConsumer and Direct Publisher.""" - - _n_qpid_topology = [ - ('v1', dict(qpid_topology=1)), - ('v2', dict(qpid_topology=2)), - ] - - _n_msgs = [ - ('single', dict(no_msgs=1)), - ('multiple', dict(no_msgs=10)), - ] - - @classmethod - def generate_scenarios(cls): - cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology, - cls._n_msgs) - - def setUp(self): - super(TestQpidDirectConsumerPublisher, self).setUp() - - self.qpid_opts = [ - cfg.BoolOpt('amqp_durable_queues', - default=False, - deprecated_name='rabbit_durable_queues', - deprecated_group='DEFAULT', - help='Use durable queues in amqp.'), - cfg.BoolOpt('amqp_auto_delete', - default=False, - help='Auto-delete queues in amqp.'), - cfg.IntOpt('qpid_topology_version', - default=self.qpid_topology, - help='qpid topology version'), - ] - - self.qpid_conf = cfg.ConfigOpts() - self.qpid_conf.register_opts(self.qpid_opts) + super(_QpidBaseTestCase, self).setUp() + self.messaging_conf.transport_driver = 'qpid' self.fake_qpid = not _is_qpidd_service_running() if self.fake_qpid: @@ -206,16 +88,86 @@ class TestQpidDirectConsumerPublisher(test_utils.BaseTestCase): # the actual received messages self._expected = [] self._messages = [] + self.initialized = True def tearDown(self): - self.qpid_conf.unregister_opts(self.qpid_opts) - super(TestQpidDirectConsumerPublisher, self).tearDown() + super(_QpidBaseTestCase, self).tearDown() - if self.fake_qpid: - _fake_session.flush_exchanges() - else: - self.con_receive.close() - self.con_send.close() + if self.initialized: + if self.fake_qpid: + _fake_session.flush_exchanges() + else: + self.con_receive.close() + self.con_send.close() + + +class TestQpidInvalidTopologyVersion(_QpidBaseTestCase): + """Unit test cases to test invalid qpid topology version.""" + + scenarios = [ + ('direct', dict(consumer_cls=qpid_driver.DirectConsumer, + publisher_cls=qpid_driver.DirectPublisher)), + ('topic', dict(consumer_cls=qpid_driver.TopicConsumer, + publisher_cls=qpid_driver.TopicPublisher)), + ('fanout', dict(consumer_cls=qpid_driver.FanoutConsumer, + publisher_cls=qpid_driver.FanoutPublisher)), + ] + + def setUp(self): + super(TestQpidInvalidTopologyVersion, self).setUp() + self.config(qpid_topology_version=-1) + + def test_invalid_topology_version(self): + def consumer_callback(msg): + pass + + msgid_or_topic = 'test' + + # not using self.assertRaises because + # 1. qpid driver raises Exception(msg) for invalid topology version + # 2. flake8 - H202 assertRaises Exception too broad + exception_msg = ("Invalid value for qpid_topology_version: %d" % + self.messaging_conf.conf.qpid_topology_version) + recvd_exc_msg = '' + + try: + self.consumer_cls(self.messaging_conf.conf, + self.session_receive, + msgid_or_topic, + consumer_callback) + except Exception as e: + recvd_exc_msg = e.message + + self.assertEqual(exception_msg, recvd_exc_msg) + + recvd_exc_msg = '' + try: + self.publisher_cls(self.messaging_conf.conf, + self.session_send, + msgid_or_topic) + except Exception as e: + recvd_exc_msg = e.message + + self.assertEqual(exception_msg, recvd_exc_msg) + + +class TestQpidDirectConsumerPublisher(_QpidBaseTestCase): + """Unit test cases to test DirectConsumer and Direct Publisher.""" + + _n_qpid_topology = [ + ('v1', dict(qpid_topology=1)), + ('v2', dict(qpid_topology=2)), + ] + + _n_msgs = [ + ('single', dict(no_msgs=1)), + ('multiple', dict(no_msgs=10)), + ] + + @classmethod + def generate_scenarios(cls): + cls.scenarios = testscenarios.multiply_scenarios(cls._n_qpid_topology, + cls._n_msgs) def consumer_callback(self, msg): # This function will be called by the DirectConsumer @@ -232,11 +184,11 @@ class TestQpidDirectConsumerPublisher(test_utils.BaseTestCase): self.msgid = str(random.randint(1, 100)) # create a DirectConsumer and DirectPublisher class objects - self.dir_cons = qpid_driver.DirectConsumer(self.qpid_conf, + self.dir_cons = qpid_driver.DirectConsumer(self.messaging_conf.conf, self.session_receive, self.msgid, self.consumer_callback) - self.dir_pub = qpid_driver.DirectPublisher(self.qpid_conf, + self.dir_pub = qpid_driver.DirectPublisher(self.messaging_conf.conf, self.session_send, self.msgid) @@ -267,7 +219,7 @@ class TestQpidDirectConsumerPublisher(test_utils.BaseTestCase): TestQpidDirectConsumerPublisher.generate_scenarios() -class TestQpidTopicAndFanout(test_utils.BaseTestCase): +class TestQpidTopicAndFanout(_QpidBaseTestCase): """Unit Test cases to test TopicConsumer and TopicPublisher classes of the qpid driver and FanoutConsumer and FanoutPublisher classes @@ -312,48 +264,13 @@ class TestQpidTopicAndFanout(test_utils.BaseTestCase): cls._exchange_class) def setUp(self): - self.qpid_opts = [ - cfg.BoolOpt('amqp_durable_queues', - default=False, - deprecated_name='rabbit_durable_queues', - deprecated_group='DEFAULT', - help='Use durable queues in amqp.'), - cfg.BoolOpt('amqp_auto_delete', - default=False, - help='Auto-delete queues in amqp.'), - cfg.IntOpt('qpid_topology_version', - default=self.qpid_topology, - help='qpid topology version'), - cfg.StrOpt('control_exchange', - default='openstack', - help='AMQP exchange to connect to if using Qpid'), - ] - super(TestQpidTopicAndFanout, self).setUp() - self.qpid_conf = cfg.ConfigOpts() - self.qpid_conf.register_opts(self.qpid_opts) - - self._fake_qpid = not _is_qpidd_service_running() - - if self._fake_qpid: - self.session_receive = get_fake_qpid_session() - self.session_send = get_fake_qpid_session() - else: - self.broker = QPID_BROKER - # connection for the Consumer. - self.con_receive = qpid.messaging.Connection(self.broker) - self.con_receive.open() - # session to receive the messages - self.session_receive = self.con_receive.session() - - # connection for sending the message - self.con_send = qpid.messaging.Connection(self.broker) - self.con_send.open() - # session to send the messages - self.session_send = self.con_send.session() # to store the expected messages and the # actual received messages + # + # NOTE(dhellmann): These are dicts, where the base class uses + # lists. self._expected = {} self._messages = {} @@ -363,15 +280,6 @@ class TestQpidTopicAndFanout(test_utils.BaseTestCase): self._sender_threads = [] self._receiver_threads = [] - def tearDown(self): - self.qpid_conf.unregister_opts(self.qpid_opts) - super(TestQpidTopicAndFanout, self).tearDown() - if self._fake_qpid: - _fake_session.flush_exchanges() - else: - self.con_receive.close() - self.con_send.close() - def consumer_callback(self, msg): """callback function called by the ConsumerBase class of qpid driver. @@ -432,7 +340,7 @@ class TestQpidTopicAndFanout(test_utils.BaseTestCase): def test_qpid_topic_and_fanout(self): for receiver_id in range(self.no_receivers): - consumer = self.consumer_cls(self.qpid_conf, + consumer = self.consumer_cls(self.messaging_conf.conf, self.session_receive, self.receive_topic, self.consumer_callback) @@ -444,7 +352,7 @@ class TestQpidTopicAndFanout(test_utils.BaseTestCase): self._receiver_threads.append(thread) for sender_id in range(self.no_senders): - publisher = self.publisher_cls(self.qpid_conf, + publisher = self.publisher_cls(self.messaging_conf.conf, self.session_send, self.topic) self._senders.append(publisher)