diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index 1ae65af0d..73512742f 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -466,7 +466,6 @@ class Connection(object): pools = {} def __init__(self, conf, url, purpose): - self.consumers = [] self.conf = conf self.driver_conf = self.conf.oslo_messaging_rabbit self.max_retries = self.driver_conf.rabbit_max_retries @@ -525,7 +524,8 @@ class Connection(object): self._initial_pid = os.getpid() - self.do_consume = True + self._consumers = [] + self._new_consumers = [] self._consume_loop_stopped = False self.channel = None @@ -705,7 +705,7 @@ class Connection(object): a new channel, we use it the reconfigure our consumers. """ self._set_current_channel(new_channel) - for consumer in self.consumers: + for consumer in self._consumers: consumer.declare(new_channel) LOG.info(_LI('Reconnected to AMQP server on ' @@ -780,7 +780,7 @@ class Connection(object): except recoverable_errors: self._set_current_channel(None) self.ensure_connection() - self.consumers = [] + self._consumers = [] def _heartbeat_supported_and_enabled(self): if self.driver_conf.heartbeat_timeout_threshold <= 0: @@ -859,7 +859,8 @@ class Connection(object): def _declare_consumer(): consumer.declare(self.channel) - self.consumers.append(consumer) + self._consumers.append(consumer) + self._new_consumers.append(consumer) return consumer with self._connection_lock: @@ -877,7 +878,7 @@ class Connection(object): raise rpc_common.Timeout() def _recoverable_error_callback(exc): - self.do_consume = True + self._new_consumers = self._consumers timer.check_return(_raise_timeout, exc) def _error_callback(exc): @@ -886,10 +887,11 @@ class Connection(object): exc) def _consume(): - if self.do_consume: - for tag, consumer in enumerate(self.consumers): - consumer.consume(tag=tag) - self.do_consume = False + if self._new_consumers: + for tag, consumer in enumerate(self._consumers): + if consumer in self._new_consumers: + consumer.consume(tag=tag) + self._new_consumers = [] poll_timeout = (self._poll_timeout if timeout is None else min(timeout, self._poll_timeout)) diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 54ada6f8d..c346019ee 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -179,12 +179,10 @@ class TestRabbitConsume(test_utils.BaseTestCase): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) - deadline = time.time() + 3 + deadline = time.time() + 6 with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: - # FIXME(sileht): the deadline should be 6 seconds, not 3 - # consuming with no consumer have never worked - # https://bugs.launchpad.net/oslo.messaging/+bug/1450342 - # conn.consume(timeout=3) + self.assertRaises(driver_common.Timeout, + conn.consume, timeout=3) # kombu memory transport doesn't really raise error # so just simulate a real driver behavior @@ -192,10 +190,8 @@ class TestRabbitConsume(test_utils.BaseTestCase): conn.declare_fanout_consumer("notif.info", lambda msg: True) with mock.patch('kombu.connection.Connection.drain_events', side_effect=IOError): - try: - conn.consume(timeout=3) - except driver_common.Timeout: - pass + self.assertRaises(driver_common.Timeout, + conn.consume, timeout=3) self.assertEqual(0, int(deadline - time.time())) diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index f27fe31dc..515e49bec 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -87,12 +87,10 @@ class TestRabbitConsume(test_utils.BaseTestCase): def test_consume_timeout(self): transport = messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) - deadline = time.time() + 3 + deadline = time.time() + 6 with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: - # FIXME(sileht): the deadline should be 6 seconds, not 3 - # consuming with no consumer have never worked - # https://bugs.launchpad.net/oslo.messaging/+bug/1450342 - # conn.consume(timeout=3) + self.assertRaises(driver_common.Timeout, + conn.consume, timeout=3) # kombu memory transport doesn't really raise error # so just simulate a real driver behavior @@ -100,10 +98,8 @@ class TestRabbitConsume(test_utils.BaseTestCase): conn.declare_fanout_consumer("notif.info", lambda msg: True) with mock.patch('kombu.connection.Connection.drain_events', side_effect=IOError): - try: - conn.consume(timeout=3) - except driver_common.Timeout: - pass + self.assertRaises(driver_common.Timeout, + conn.consume, timeout=3) self.assertEqual(0, int(deadline - time.time()))