rabbit/qpid: simplify the consumer loop

The consumer loop is over engineered, it returns unused return,
iterconsume creates an iterator directly consumed by 'consume' without
special handling, and in some case kombu error callback are called when
the iterator is stopped and log useless error.

And in reality the consumer is always called when limit=1.

This change simplifies that, by removing the loop and removes all
returns stuffs.

Closes bug: #1450336

Change-Id: Ia2cb52c8577b29e74d4d2b0ed0b535102f2d55c7
This commit is contained in:
Mehdi Abaakouk 2015-04-30 08:56:20 +02:00
parent 5b9fb69802
commit 6c91066c72
5 changed files with 30 additions and 50 deletions

View File

@ -119,7 +119,7 @@ class AMQPListener(base.Listener):
if self.incoming: if self.incoming:
return self.incoming.pop(0) return self.incoming.pop(0)
try: try:
self.conn.consume(limit=1, timeout=timeout) self.conn.consume(timeout=timeout)
except rpc_common.Timeout: except rpc_common.Timeout:
return None return None
@ -194,7 +194,7 @@ class ReplyWaiter(object):
def poll(self): def poll(self):
while not self._thread_exit_event.is_set(): while not self._thread_exit_event.is_set():
try: try:
self.conn.consume(limit=1) self.conn.consume()
except Exception: except Exception:
LOG.exception("Failed to process incoming message, " LOG.exception("Failed to process incoming message, "
"retrying...") "retrying...")

View File

@ -652,8 +652,8 @@ class Connection(object):
return self.ensure(_connect_error, _declare_consumer) return self.ensure(_connect_error, _declare_consumer)
def iterconsume(self, limit=None, timeout=None): def consume(self, timeout=None):
"""Return an iterator that will consume from all queues/consumers.""" """Consume from all queues/consumers."""
timer = rpc_common.DecayingTimer(duration=timeout) timer = rpc_common.DecayingTimer(duration=timeout)
timer.start() timer.start()
@ -675,7 +675,7 @@ class Connection(object):
while True: while True:
if self._consume_loop_stopped: if self._consume_loop_stopped:
self._consume_loop_stopped = False self._consume_loop_stopped = False
raise StopIteration return
try: try:
nxt_receiver = self.session.next_receiver( nxt_receiver = self.session.next_receiver(
@ -692,10 +692,7 @@ class Connection(object):
LOG.exception(_("Error processing message. " LOG.exception(_("Error processing message. "
"Skipping it.")) "Skipping it."))
for iteration in itertools.count(0): self.ensure(_error_callback, _consume)
if limit and iteration >= limit:
raise StopIteration
yield self.ensure(_error_callback, _consume)
def publisher_send(self, cls, topic, msg, retry=None, **kwargs): def publisher_send(self, cls, topic, msg, retry=None, **kwargs):
"""Send to a publisher based on the publisher class.""" """Send to a publisher based on the publisher class."""
@ -761,15 +758,6 @@ class Connection(object):
self.publisher_send(NotifyPublisher, topic=topic, msg=msg, self.publisher_send(NotifyPublisher, topic=topic, msg=msg,
exchange_name=exchange_name, retry=retry) exchange_name=exchange_name, retry=retry)
def consume(self, limit=None, timeout=None):
"""Consume from all queues/consumers."""
it = self.iterconsume(limit=limit, timeout=timeout)
while True:
try:
six.next(it)
except StopIteration:
return
def stop_consuming(self): def stop_consuming(self):
self._consume_loop_stopped = True self._consume_loop_stopped = True

View File

@ -225,7 +225,7 @@ class ConsumerBase(object):
def consume(self, *args, **kwargs): def consume(self, *args, **kwargs):
"""Actually declare the consumer on the amqp channel. This will """Actually declare the consumer on the amqp channel. This will
start the flow of messages from the queue. Using the start the flow of messages from the queue. Using the
Connection.iterconsume() iterator will process the messages, Connection.consume() will process the messages,
calling the appropriate callback. calling the appropriate callback.
If a callback is specified in kwargs, use that. Otherwise, If a callback is specified in kwargs, use that. Otherwise,
@ -988,11 +988,8 @@ class Connection(object):
return self.ensure(_declare_consumer, return self.ensure(_declare_consumer,
error_callback=_connect_error) error_callback=_connect_error)
def iterconsume(self, limit=None, timeout=None): def consume(self, timeout=None):
"""Return an iterator that will consume from all queues/consumers. """Consume from all queues/consumers."""
NOTE(sileht): Must be called within the connection lock
"""
timer = rpc_common.DecayingTimer(duration=timeout) timer = rpc_common.DecayingTimer(duration=timeout)
timer.start() timer.start()
@ -1023,25 +1020,22 @@ class Connection(object):
else min(timeout, self._poll_timeout)) else min(timeout, self._poll_timeout))
while True: while True:
if self._consume_loop_stopped: if self._consume_loop_stopped:
self._consume_loop_stopped = False return
raise StopIteration
if self._heartbeat_supported_and_enabled(): if self._heartbeat_supported_and_enabled():
self.connection.heartbeat_check( self.connection.heartbeat_check(
rate=self.driver_conf.heartbeat_rate) rate=self.driver_conf.heartbeat_rate)
try: try:
return self.connection.drain_events(timeout=poll_timeout) self.connection.drain_events(timeout=poll_timeout)
return
except socket.timeout as exc: except socket.timeout as exc:
poll_timeout = timer.check_return( poll_timeout = timer.check_return(
_raise_timeout, exc, maximum=self._poll_timeout) _raise_timeout, exc, maximum=self._poll_timeout)
for iteration in itertools.count(0): with self._connection_lock:
if limit and iteration >= limit: self.ensure(_consume,
raise StopIteration recoverable_error_callback=_recoverable_error_callback,
yield self.ensure( error_callback=_error_callback)
_consume,
recoverable_error_callback=_recoverable_error_callback,
error_callback=_error_callback)
@staticmethod @staticmethod
def _log_publisher_send_error(topic, exc): def _log_publisher_send_error(topic, exc):
@ -1137,16 +1131,6 @@ class Connection(object):
self.publisher_send(NotifyPublisher, topic, msg, timeout=None, self.publisher_send(NotifyPublisher, topic, msg, timeout=None,
exchange_name=exchange_name, retry=retry, **kwargs) exchange_name=exchange_name, retry=retry, **kwargs)
def consume(self, limit=None, timeout=None):
"""Consume from all queues/consumers."""
with self._connection_lock:
it = self.iterconsume(limit=limit, timeout=timeout)
while True:
try:
six.next(it)
except StopIteration:
return
def stop_consuming(self): def stop_consuming(self):
self._consume_loop_stopped = True self._consume_loop_stopped = True

View File

@ -173,15 +173,19 @@ class TestRabbitDriverLoadSSL(test_utils.BaseTestCase):
heartbeat=0, failover_strategy="shuffle") heartbeat=0, failover_strategy="shuffle")
class TestRabbitIterconsume(test_utils.BaseTestCase): class TestRabbitConsume(test_utils.BaseTestCase):
def test_iterconsume_timeout(self): def test_consume_timeout(self):
transport = oslo_messaging.get_transport(self.conf, transport = oslo_messaging.get_transport(self.conf,
'kombu+memory:////') 'kombu+memory:////')
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
deadline = time.time() + 3 deadline = time.time() + 3
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
conn.iterconsume(timeout=3) # FIXME(sileht): the deadline should be 6 seconds, not 3
# consuming with no consumer have never worked
# https://bugs.launchpad.net/oslo.messaging/+bug/1450342
# conn.consume(timeout=3)
# kombu memory transport doesn't really raise error # kombu memory transport doesn't really raise error
# so just simulate a real driver behavior # so just simulate a real driver behavior
conn.connection.connection.recoverable_channel_errors = (IOError,) conn.connection.connection.recoverable_channel_errors = (IOError,)

View File

@ -82,14 +82,18 @@ class TestRabbitDriverLoad(test_utils.BaseTestCase):
self.assertEqual(self.url, url) self.assertEqual(self.url, url)
class TestRabbitIterconsume(test_utils.BaseTestCase): class TestRabbitConsume(test_utils.BaseTestCase):
def test_iterconsume_timeout(self): def test_consume_timeout(self):
transport = messaging.get_transport(self.conf, 'kombu+memory:////') transport = messaging.get_transport(self.conf, 'kombu+memory:////')
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
deadline = time.time() + 3 deadline = time.time() + 3
with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn: with transport._driver._get_connection(amqp.PURPOSE_LISTEN) as conn:
conn.iterconsume(timeout=3) # FIXME(sileht): the deadline should be 6 seconds, not 3
# consuming with no consumer have never worked
# https://bugs.launchpad.net/oslo.messaging/+bug/1450342
# conn.consume(timeout=3)
# kombu memory transport doesn't really raise error # kombu memory transport doesn't really raise error
# so just simulate a real driver behavior # so just simulate a real driver behavior
conn.connection.connection.recoverable_channel_errors = (IOError,) conn.connection.connection.recoverable_channel_errors = (IOError,)