diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index bbc4b7828..ceeb07810 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -119,7 +119,7 @@ class AMQPListener(base.Listener): if self.incoming: return self.incoming.pop(0) try: - self.conn.consume(limit=1, timeout=timeout) + self.conn.consume(timeout=timeout) except rpc_common.Timeout: return None @@ -194,7 +194,7 @@ class ReplyWaiter(object): def poll(self): while not self._thread_exit_event.is_set(): try: - self.conn.consume(limit=1) + self.conn.consume() except Exception: LOG.exception("Failed to process incoming message, " "retrying...") diff --git a/oslo_messaging/_drivers/impl_qpid.py b/oslo_messaging/_drivers/impl_qpid.py index 487952f9f..3a0066826 100644 --- a/oslo_messaging/_drivers/impl_qpid.py +++ b/oslo_messaging/_drivers/impl_qpid.py @@ -652,8 +652,8 @@ class Connection(object): return self.ensure(_connect_error, _declare_consumer) - def iterconsume(self, limit=None, timeout=None): - """Return an iterator that will consume from all queues/consumers.""" + def consume(self, timeout=None): + """Consume from all queues/consumers.""" timer = rpc_common.DecayingTimer(duration=timeout) timer.start() @@ -675,7 +675,7 @@ class Connection(object): while True: if self._consume_loop_stopped: self._consume_loop_stopped = False - raise StopIteration + return try: nxt_receiver = self.session.next_receiver( @@ -692,10 +692,7 @@ class Connection(object): LOG.exception(_("Error processing message. " "Skipping it.")) - for iteration in itertools.count(0): - if limit and iteration >= limit: - raise StopIteration - yield self.ensure(_error_callback, _consume) + self.ensure(_error_callback, _consume) def publisher_send(self, cls, topic, msg, retry=None, **kwargs): """Send to a publisher based on the publisher class.""" @@ -761,15 +758,6 @@ class Connection(object): self.publisher_send(NotifyPublisher, topic=topic, msg=msg, exchange_name=exchange_name, retry=retry) - def consume(self, limit=None, timeout=None): - """Consume from all queues/consumers.""" - it = self.iterconsume(limit=limit, timeout=timeout) - while True: - try: - six.next(it) - except StopIteration: - return - def stop_consuming(self): self._consume_loop_stopped = True diff --git a/oslo_messaging/_drivers/impl_rabbit.py b/oslo_messaging/_drivers/impl_rabbit.py index ee2da428e..1b68a73e4 100644 --- a/oslo_messaging/_drivers/impl_rabbit.py +++ b/oslo_messaging/_drivers/impl_rabbit.py @@ -225,7 +225,7 @@ class ConsumerBase(object): def consume(self, *args, **kwargs): """Actually declare the consumer on the amqp channel. This will start the flow of messages from the queue. Using the - Connection.iterconsume() iterator will process the messages, + Connection.consume() will process the messages, calling the appropriate callback. If a callback is specified in kwargs, use that. Otherwise, @@ -988,11 +988,8 @@ class Connection(object): return self.ensure(_declare_consumer, error_callback=_connect_error) - def iterconsume(self, limit=None, timeout=None): - """Return an iterator that will consume from all queues/consumers. - - NOTE(sileht): Must be called within the connection lock - """ + def consume(self, timeout=None): + """Consume from all queues/consumers.""" timer = rpc_common.DecayingTimer(duration=timeout) timer.start() @@ -1023,25 +1020,22 @@ class Connection(object): else min(timeout, self._poll_timeout)) while True: if self._consume_loop_stopped: - self._consume_loop_stopped = False - raise StopIteration + return if self._heartbeat_supported_and_enabled(): self.connection.heartbeat_check( rate=self.driver_conf.heartbeat_rate) try: - return self.connection.drain_events(timeout=poll_timeout) + self.connection.drain_events(timeout=poll_timeout) + return except socket.timeout as exc: poll_timeout = timer.check_return( _raise_timeout, exc, maximum=self._poll_timeout) - for iteration in itertools.count(0): - if limit and iteration >= limit: - raise StopIteration - yield self.ensure( - _consume, - recoverable_error_callback=_recoverable_error_callback, - error_callback=_error_callback) + with self._connection_lock: + self.ensure(_consume, + recoverable_error_callback=_recoverable_error_callback, + error_callback=_error_callback) @staticmethod def _log_publisher_send_error(topic, exc): @@ -1137,16 +1131,6 @@ class Connection(object): self.publisher_send(NotifyPublisher, topic, msg, timeout=None, exchange_name=exchange_name, retry=retry, **kwargs) - def consume(self, limit=None, timeout=None): - """Consume from all queues/consumers.""" - with self._connection_lock: - it = self.iterconsume(limit=limit, timeout=timeout) - while True: - try: - six.next(it) - except StopIteration: - return - def stop_consuming(self): self._consume_loop_stopped = True diff --git a/oslo_messaging/tests/drivers/test_impl_rabbit.py b/oslo_messaging/tests/drivers/test_impl_rabbit.py index 5805c5e71..54ada6f8d 100644 --- a/oslo_messaging/tests/drivers/test_impl_rabbit.py +++ b/oslo_messaging/tests/drivers/test_impl_rabbit.py @@ -173,15 +173,19 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase): heartbeat=0, failover_strategy="shuffle") -class TestRabbitIterconsume(test_utils.BaseTestCase): +class TestRabbitConsume(test_utils.BaseTestCase): - def test_iterconsume_timeout(self): + def test_consume_timeout(self): transport = oslo_messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) deadline = time.time() + 3 with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: - conn.iterconsume(timeout=3) + # FIXME(sileht): the deadline should be 6 seconds, not 3 + # consuming with no consumer have never worked + # https://bugs.launchpad.net/oslo.messaging/+bug/1450342 + # conn.consume(timeout=3) + # kombu memory transport doesn't really raise error # so just simulate a real driver behavior conn.connection.connection.recoverable_channel_errors = (IOError,) diff --git a/tests/drivers/test_impl_rabbit.py b/tests/drivers/test_impl_rabbit.py index f534a4ae0..f27fe31dc 100644 --- a/tests/drivers/test_impl_rabbit.py +++ b/tests/drivers/test_impl_rabbit.py @@ -82,14 +82,18 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase): self.assertEqual(self.url, url) -class TestRabbitIterconsume(test_utils.BaseTestCase): +class TestRabbitConsume(test_utils.BaseTestCase): - def test_iterconsume_timeout(self): + def test_consume_timeout(self): transport = messaging.get_transport(self.conf, 'kombu+memory:////') self.addCleanup(transport.cleanup) deadline = time.time() + 3 with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: - conn.iterconsume(timeout=3) + # FIXME(sileht): the deadline should be 6 seconds, not 3 + # consuming with no consumer have never worked + # https://bugs.launchpad.net/oslo.messaging/+bug/1450342 + # conn.consume(timeout=3) + # kombu memory transport doesn't really raise error # so just simulate a real driver behavior conn.connection.connection.recoverable_channel_errors = (IOError,)