diff --git a/oslo_messaging/tests/drivers/test_impl_zmq.py b/oslo_messaging/tests/drivers/test_impl_zmq.py index 6d287fab6..173a5dfe4 100644 --- a/oslo_messaging/tests/drivers/test_impl_zmq.py +++ b/oslo_messaging/tests/drivers/test_impl_zmq.py @@ -24,8 +24,6 @@ import testtools import oslo_messaging from oslo_messaging.tests import utils as test_utils -# NOTE(jamespage) the zmq driver implementation is currently tied -# to eventlet so we have to monkey_patch to support testing # eventlet is not yet py3 compatible, so skip if not installed eventlet = importutils.try_import('eventlet') @@ -44,6 +42,35 @@ def get_unused_port(): return port +class ZmqBaseTestCase(test_utils.BaseTestCase): + """Base test case for all ZMQ tests that make use of the ZMQ Proxy""" + + @testtools.skipIf(impl_zmq is None, "zmq not available") + def setUp(self): + super(ZmqBaseTestCase, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + # Get driver + transport = oslo_messaging.get_transport(self.conf) + self.driver = transport._driver + + # Set config values + self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path + kwargs = {'rpc_zmq_bind_address': '127.0.0.1', + 'rpc_zmq_host': '127.0.0.1', + 'rpc_response_timeout': 5, + 'rpc_zmq_port': get_unused_port(), + 'rpc_zmq_ipc_dir': self.internal_ipc_dir} + self.config(**kwargs) + + # Start RPC + LOG.info("Running internal zmq receiver.") + self.reactor = impl_zmq.ZmqProxy(self.conf) + self.reactor.consume_in_thread() + + self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') + self.addCleanup(stopRpc(self.__dict__)) + + class TestConfZmqDriverLoad(test_utils.BaseTestCase): @testtools.skipIf(impl_zmq is None, "zmq not available") @@ -67,32 +94,7 @@ class stopRpc(object): self.attrs['driver'].cleanup() -class TestZmqBasics(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqBasics, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqBasics(ZmqBaseTestCase): def test_start_stop_listener(self): target = oslo_messaging.Target(topic='testtopic') @@ -277,32 +279,7 @@ class TestZmqIncomingMessage(test_utils.BaseTestCase): msg.requeue() -class TestZmqConnection(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqConnection, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqConnection(ZmqBaseTestCase): @mock.patch('oslo_messaging._drivers.impl_zmq.ZmqReactor', autospec=True) def test_zmqconnection_create_consumer(self, mock_reactor): @@ -379,32 +356,7 @@ class TestZmqConnection(test_utils.BaseTestCase): self.assertTrue(conn.reactor.consume_in_thread.called) -class TestZmqListener(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqListener, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqListener(ZmqBaseTestCase): def test_zmqlistener_no_msg(self): listener = impl_zmq.ZmqListener(self.driver) @@ -424,32 +376,7 @@ class TestZmqListener(test_utils.BaseTestCase): self.assertEqual(resp.message, msg) -class TestZmqDriver(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqDriver, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = oslo_messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqDriver(ZmqBaseTestCase): @mock.patch('oslo_messaging._drivers.impl_zmq._cast', autospec=True) @mock.patch('oslo_messaging._drivers.impl_zmq._multi_send', autospec=True) diff --git a/tests/drivers/test_impl_zmq.py b/tests/drivers/test_impl_zmq.py index 30277c93a..091ebe87e 100644 --- a/tests/drivers/test_impl_zmq.py +++ b/tests/drivers/test_impl_zmq.py @@ -42,6 +42,35 @@ def get_unused_port(): return port +class ZmqBaseTestCase(test_utils.BaseTestCase): + """Base test case for all ZMQ tests that make use of the ZMQ Proxy""" + + @testtools.skipIf(impl_zmq is None, "zmq not available") + def setUp(self): + super(ZmqBaseTestCase, self).setUp() + self.messaging_conf.transport_driver = 'zmq' + # Get driver + transport = messaging.get_transport(self.conf) + self.driver = transport._driver + + # Set config values + self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path + kwargs = {'rpc_zmq_bind_address': '127.0.0.1', + 'rpc_zmq_host': '127.0.0.1', + 'rpc_response_timeout': 5, + 'rpc_zmq_port': get_unused_port(), + 'rpc_zmq_ipc_dir': self.internal_ipc_dir} + self.config(**kwargs) + + # Start RPC + LOG.info("Running internal zmq receiver.") + self.reactor = impl_zmq.ZmqProxy(self.conf) + self.reactor.consume_in_thread() + + self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') + self.addCleanup(stopRpc(self.__dict__)) + + class TestConfZmqDriverLoad(test_utils.BaseTestCase): @testtools.skipIf(impl_zmq is None, "zmq not available") @@ -65,32 +94,7 @@ class stopRpc(object): self.attrs['driver'].cleanup() -class TestZmqBasics(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqBasics, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqBasics(ZmqBaseTestCase): def test_start_stop_listener(self): target = messaging.Target(topic='testtopic') @@ -275,32 +279,7 @@ class TestZmqIncomingMessage(test_utils.BaseTestCase): msg.requeue() -class TestZmqConnection(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqConnection, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqConnection(ZmqBaseTestCase): @mock.patch('oslo.messaging._drivers.impl_zmq.ZmqReactor', autospec=True) def test_zmqconnection_create_consumer(self, mock_reactor): @@ -377,32 +356,7 @@ class TestZmqConnection(test_utils.BaseTestCase): self.assertTrue(conn.reactor.consume_in_thread.called) -class TestZmqListener(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqListener, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqListener(ZmqBaseTestCase): def test_zmqlistener_no_msg(self): listener = impl_zmq.ZmqListener(self.driver) @@ -422,32 +376,7 @@ class TestZmqListener(test_utils.BaseTestCase): self.assertEqual(resp.message, msg) -class TestZmqDriver(test_utils.BaseTestCase): - - @testtools.skipIf(impl_zmq is None, "zmq not available") - def setUp(self): - super(TestZmqDriver, self).setUp() - self.messaging_conf.transport_driver = 'zmq' - # Get driver - transport = messaging.get_transport(self.conf) - self.driver = transport._driver - - # Set config values - self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path - kwargs = {'rpc_zmq_bind_address': '127.0.0.1', - 'rpc_zmq_host': '127.0.0.1', - 'rpc_response_timeout': 5, - 'rpc_zmq_port': get_unused_port(), - 'rpc_zmq_ipc_dir': self.internal_ipc_dir} - self.config(**kwargs) - - # Start RPC - LOG.info("Running internal zmq receiver.") - self.reactor = impl_zmq.ZmqProxy(self.conf) - self.reactor.consume_in_thread() - - self.matchmaker = impl_zmq._get_matchmaker(host='127.0.0.1') - self.addCleanup(stopRpc(self.__dict__)) +class TestZmqDriver(ZmqBaseTestCase): @mock.patch('oslo.messaging._drivers.impl_zmq._cast', autospec=True) @mock.patch('oslo.messaging._drivers.impl_zmq._multi_send', autospec=True)