Refactor base interfaces

1) Add MessageHandler base interface for on_incoming_callback replacement
2) Move message_handler parameter form Listener's __init__() to start()
3) Remove wait method from listener

Change-Id: Id414446817e3d2ff67b815074d042a9ce637ec24
This commit is contained in:
Dmitriy Ukhlov 2016-04-15 19:57:16 +03:00
parent 0b286754e2
commit 6db00c77b0
17 changed files with 132 additions and 154 deletions

View File

@ -473,7 +473,7 @@ class AMQPDriverBase(base.BaseDriver):
return self._send(target, ctxt, message, return self._send(target, ctxt, message,
envelope=(version == 2.0), notify=True, retry=retry) envelope=(version == 2.0), notify=True, retry=retry)
def listen(self, target, on_incoming_callback, batch_size, batch_timeout): def listen(self, target, batch_size, batch_timeout):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN) conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn) listener = AMQPListener(self, conn)
@ -487,12 +487,11 @@ class AMQPDriverBase(base.BaseDriver):
callback=listener) callback=listener)
conn.declare_fanout_consumer(target.topic, listener) conn.declare_fanout_consumer(target.topic, listener)
return base.PollStyleListenerAdapter(listener, on_incoming_callback, return base.PollStyleListenerAdapter(listener, batch_size,
batch_size, batch_timeout) batch_timeout)
def listen_for_notifications(self, targets_and_priorities, pool, def listen_for_notifications(self, targets_and_priorities, pool,
on_incoming_callback, batch_size, batch_size, batch_timeout):
batch_timeout):
conn = self._get_connection(rpc_common.PURPOSE_LISTEN) conn = self._get_connection(rpc_common.PURPOSE_LISTEN)
listener = AMQPListener(self, conn) listener = AMQPListener(self, conn)
@ -501,8 +500,8 @@ class AMQPDriverBase(base.BaseDriver):
exchange_name=self._get_exchange(target), exchange_name=self._get_exchange(target),
topic='%s.%s' % (target.topic, priority), topic='%s.%s' % (target.topic, priority),
callback=listener, queue_name=pool) callback=listener, queue_name=pool)
return base.PollStyleListenerAdapter(listener, on_incoming_callback, return base.PollStyleListenerAdapter(listener, batch_size,
batch_size, batch_timeout) batch_timeout)
def cleanup(self): def cleanup(self):
if self._connection_pool: if self._connection_pool:

View File

@ -81,11 +81,11 @@ class IncomingMessage(object):
self.message = message self.message = message
def acknowledge(self): def acknowledge(self):
"Acknowledge the message." """Acknowledge the message."""
@abc.abstractmethod @abc.abstractmethod
def requeue(self): def requeue(self):
"Requeue the message." """Requeue the message."""
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
@ -128,44 +128,39 @@ class PollStyleListener(object):
@six.add_metaclass(abc.ABCMeta) @six.add_metaclass(abc.ABCMeta)
class Listener(object): class Listener(object):
def __init__(self, on_incoming_callback, batch_size, batch_timeout, def __init__(self, batch_size, batch_timeout,
prefetch_size=-1): prefetch_size=-1):
"""Init Listener """Init Listener
:param on_incoming_callback: callback function to be executed when
listener received messages. Messages should be processed and
acked/nacked by callback
:param batch_size: desired number of messages passed to :param batch_size: desired number of messages passed to
single on_incoming_callback call single on_incoming_callback notification
:param batch_timeout: defines how long should we wait for batch_size :param batch_timeout: defines how long should we wait for batch_size
messages if we already have some messages waiting for processing messages if we already have some messages waiting for processing
:param prefetch_size: defines how many massages we want to prefetch :param prefetch_size: defines how many massages we want to prefetch
from backend (depend on driver type) by single request from backend (depend on driver type) by single request
""" """
self.on_incoming_callback = on_incoming_callback self.on_incoming_callback = None
self.batch_timeout = batch_timeout self.batch_timeout = batch_timeout
self.prefetch_size = prefetch_size self.prefetch_size = prefetch_size
if prefetch_size > 0: if prefetch_size > 0:
batch_size = min(batch_size, prefetch_size) batch_size = min(batch_size, prefetch_size)
self.batch_size = batch_size self.batch_size = batch_size
@abc.abstractmethod def start(self, on_incoming_callback):
def start(self): """Start listener.
"""Stop listener. Start the listener message polling
Stop the listener message polling
"""
@abc.abstractmethod :param on_incoming_callback: callback function to be executed when
def wait(self): listener received messages. Messages should be processed and
"""Wait listener. acked/nacked by callback
Wait for processing remained input after listener Stop
""" """
self.on_incoming_callback = on_incoming_callback
@abc.abstractmethod
def stop(self): def stop(self):
"""Stop listener. """Stop listener.
Stop the listener message polling Stop the listener message polling
""" """
self.on_incoming_callback = None
@abc.abstractmethod @abc.abstractmethod
def cleanup(self): def cleanup(self):
@ -177,21 +172,24 @@ class Listener(object):
class PollStyleListenerAdapter(Listener): class PollStyleListenerAdapter(Listener):
def __init__(self, poll_style_listener, on_incoming_callback, batch_size, def __init__(self, poll_style_listener, batch_size, batch_timeout):
batch_timeout):
super(PollStyleListenerAdapter, self).__init__( super(PollStyleListenerAdapter, self).__init__(
on_incoming_callback, batch_size, batch_timeout, batch_size, batch_timeout, poll_style_listener.prefetch_size
poll_style_listener.prefetch_size
) )
self._poll_style_listener = poll_style_listener self._poll_style_listener = poll_style_listener
self._listen_thread = threading.Thread(target=self._runner) self._listen_thread = threading.Thread(target=self._runner)
self._listen_thread.daemon = True self._listen_thread.daemon = True
self._started = False self._started = False
def start(self): def start(self, on_incoming_callback):
"""Start listener. """Start listener.
Start the listener message polling Start the listener message polling
:param on_incoming_callback: callback function to be executed when
listener received messages. Messages should be processed and
acked/nacked by callback
""" """
super(PollStyleListenerAdapter, self).start(on_incoming_callback)
self._started = True self._started = True
self._listen_thread.start() self._listen_thread.start()
@ -220,9 +218,8 @@ class PollStyleListenerAdapter(Listener):
""" """
self._started = False self._started = False
self._poll_style_listener.stop() self._poll_style_listener.stop()
def wait(self):
self._listen_thread.join() self._listen_thread.join()
super(PollStyleListenerAdapter, self).stop()
def cleanup(self): def cleanup(self):
"""Cleanup listener. """Cleanup listener.
@ -259,13 +256,12 @@ class BaseDriver(object):
"""Send a notification message to the given target.""" """Send a notification message to the given target."""
@abc.abstractmethod @abc.abstractmethod
def listen(self, target, on_incoming_callback, batch_size, batch_timeout): def listen(self, target, batch_size, batch_timeout):
"""Construct a Listener for the given target.""" """Construct a Listener for the given target."""
@abc.abstractmethod @abc.abstractmethod
def listen_for_notifications(self, targets_and_priorities, pool, def listen_for_notifications(self, targets_and_priorities, pool,
on_incoming_callback, batch_size, batch_size, batch_timeout):
batch_timeout):
"""Construct a notification Listener for the given list of """Construct a notification Listener for the given list of
tuple of (target, priority). tuple of (target, priority).
""" """

View File

@ -222,7 +222,7 @@ class FakeDriver(base.BaseDriver):
# transport always works # transport always works
self._send(target, ctxt, message) self._send(target, ctxt, message)
def listen(self, target, on_incoming_callback, batch_size, batch_timeout): def listen(self, target, batch_size, batch_timeout):
exchange = target.exchange or self._default_exchange exchange = target.exchange or self._default_exchange
listener = FakeListener(self._exchange_manager, listener = FakeListener(self._exchange_manager,
[oslo_messaging.Target( [oslo_messaging.Target(
@ -232,12 +232,11 @@ class FakeDriver(base.BaseDriver):
oslo_messaging.Target( oslo_messaging.Target(
topic=target.topic, topic=target.topic,
exchange=exchange)]) exchange=exchange)])
return base.PollStyleListenerAdapter(listener, on_incoming_callback, return base.PollStyleListenerAdapter(listener, batch_size,
batch_size, batch_timeout) batch_timeout)
def listen_for_notifications(self, targets_and_priorities, pool, def listen_for_notifications(self, targets_and_priorities, pool,
on_incoming_callback, batch_size, batch_size, batch_timeout):
batch_timeout):
targets = [ targets = [
oslo_messaging.Target( oslo_messaging.Target(
topic='%s.%s' % (target.topic, priority), topic='%s.%s' % (target.topic, priority),
@ -245,8 +244,8 @@ class FakeDriver(base.BaseDriver):
for target, priority in targets_and_priorities] for target, priority in targets_and_priorities]
listener = FakeListener(self._exchange_manager, targets, pool) listener = FakeListener(self._exchange_manager, targets, pool)
return base.PollStyleListenerAdapter(listener, on_incoming_callback, return base.PollStyleListenerAdapter(listener, batch_size,
batch_size, batch_timeout) batch_timeout)
def cleanup(self): def cleanup(self):
pass pass

View File

@ -340,8 +340,7 @@ class KafkaDriver(base.BaseDriver):
'The RPC implementation for Kafka is not implemented') 'The RPC implementation for Kafka is not implemented')
def listen_for_notifications(self, targets_and_priorities, pool, def listen_for_notifications(self, targets_and_priorities, pool,
on_incoming_callback, batch_size, batch_size, batch_timeout):
batch_timeout):
"""Listen to a specified list of targets on Kafka brokers """Listen to a specified list of targets on Kafka brokers
:param targets_and_priorities: List of pairs (target, priority) :param targets_and_priorities: List of pairs (target, priority)
@ -360,8 +359,8 @@ class KafkaDriver(base.BaseDriver):
conn.declare_topic_consumer(topics, pool) conn.declare_topic_consumer(topics, pool)
listener = KafkaListener(conn) listener = KafkaListener(conn)
return base.PollStyleListenerAdapter(listener, on_incoming_callback, return base.PollStyleListenerAdapter(listener, batch_size,
batch_size, batch_timeout) batch_timeout)
def _get_connection(self, purpose): def _get_connection(self, purpose):
return driver_common.ConnectionContext(self.connection_pool, purpose) return driver_common.ConnectionContext(self.connection_pool, purpose)

View File

@ -334,17 +334,16 @@ class PikaDriver(base.BaseDriver):
retrier=retrier retrier=retrier
) )
def listen(self, target, on_incoming_callback, batch_size, batch_timeout): def listen(self, target, batch_size, batch_timeout):
listener = pika_drv_poller.RpcServicePikaPoller( listener = pika_drv_poller.RpcServicePikaPoller(
self._pika_engine, target, self._pika_engine, target,
prefetch_count=self._pika_engine.rpc_listener_prefetch_count prefetch_count=self._pika_engine.rpc_listener_prefetch_count
) )
listener.start() listener.start()
return base.PollStyleListenerAdapter(listener, on_incoming_callback, return base.PollStyleListenerAdapter(listener, batch_size,
batch_size, batch_timeout) batch_timeout)
def listen_for_notifications(self, targets_and_priorities, pool, def listen_for_notifications(self, targets_and_priorities, pool,
on_incoming_callback,
batch_size, batch_timeout): batch_size, batch_timeout):
listener = pika_drv_poller.NotificationPikaPoller( listener = pika_drv_poller.NotificationPikaPoller(
self._pika_engine, targets_and_priorities, self._pika_engine, targets_and_priorities,
@ -354,8 +353,8 @@ class PikaDriver(base.BaseDriver):
queue_name=pool queue_name=pool
) )
listener.start() listener.start()
return base.PollStyleListenerAdapter(listener, on_incoming_callback, return base.PollStyleListenerAdapter(listener, batch_size,
batch_size, batch_timeout) batch_timeout)
def cleanup(self): def cleanup(self):
self._reply_listener.cleanup() self._reply_listener.cleanup()

View File

@ -254,7 +254,7 @@ class ZmqDriver(base.BaseDriver):
client = self.notifier.get() client = self.notifier.get()
client.send_notify(target, ctxt, message, version, retry) client.send_notify(target, ctxt, message, version, retry)
def listen(self, target, on_incoming_callback, batch_size, batch_timeout): def listen(self, target, batch_size, batch_timeout):
"""Listen to a specified target on a server side """Listen to a specified target on a server side
:param target: Message destination target :param target: Message destination target
@ -262,12 +262,11 @@ class ZmqDriver(base.BaseDriver):
""" """
listener = zmq_server.ZmqServer(self, self.conf, self.matchmaker, listener = zmq_server.ZmqServer(self, self.conf, self.matchmaker,
target) target)
return base.PollStyleListenerAdapter(listener, on_incoming_callback, return base.PollStyleListenerAdapter(listener, batch_size,
batch_size, batch_timeout) batch_timeout)
def listen_for_notifications(self, targets_and_priorities, pool, def listen_for_notifications(self, targets_and_priorities, pool,
on_incoming_callback, batch_size, batch_size, batch_timeout):
batch_timeout):
"""Listen to a specified list of targets on a server side """Listen to a specified list of targets on a server side
:param targets_and_priorities: List of pairs (target, priority) :param targets_and_priorities: List of pairs (target, priority)
@ -277,8 +276,8 @@ class ZmqDriver(base.BaseDriver):
""" """
listener = zmq_server.ZmqNotificationServer( listener = zmq_server.ZmqNotificationServer(
self, self.conf, self.matchmaker, targets_and_priorities) self, self.conf, self.matchmaker, targets_and_priorities)
return base.PollStyleListenerAdapter(listener, on_incoming_callback, return base.PollStyleListenerAdapter(listener, batch_size,
batch_size, batch_timeout) batch_timeout)
def cleanup(self): def cleanup(self):
"""Cleanup all driver's connections finally """Cleanup all driver's connections finally

View File

@ -267,19 +267,18 @@ class ProtonDriver(base.BaseDriver):
return self.send(target, ctxt, message, envelope=(version == 2.0)) return self.send(target, ctxt, message, envelope=(version == 2.0))
@_ensure_connect_called @_ensure_connect_called
def listen(self, target, on_incoming_callback, batch_size, batch_timeout): def listen(self, target, batch_size, batch_timeout):
"""Construct a Listener for the given target.""" """Construct a Listener for the given target."""
LOG.debug("Listen to %s", target) LOG.debug("Listen to %s", target)
listener = ProtonListener(self) listener = ProtonListener(self)
self._ctrl.add_task(drivertasks.ListenTask(target, listener)) self._ctrl.add_task(drivertasks.ListenTask(target, listener))
return base.PollStyleListenerAdapter(listener, on_incoming_callback, return base.PollStyleListenerAdapter(listener, batch_size,
batch_size, batch_timeout) batch_timeout)
return listener return listener
@_ensure_connect_called @_ensure_connect_called
def listen_for_notifications(self, targets_and_priorities, pool, def listen_for_notifications(self, targets_and_priorities, pool,
on_incoming_callback, batch_size, batch_size, batch_timeout):
batch_timeout):
LOG.debug("Listen for notifications %s", targets_and_priorities) LOG.debug("Listen for notifications %s", targets_and_priorities)
if pool: if pool:
raise NotImplementedError('"pool" not implemented by ' raise NotImplementedError('"pool" not implemented by '
@ -289,8 +288,8 @@ class ProtonDriver(base.BaseDriver):
topic = '%s.%s' % (target.topic, priority) topic = '%s.%s' % (target.topic, priority)
t = messaging_target.Target(topic=topic) t = messaging_target.Target(topic=topic)
self._ctrl.add_task(drivertasks.ListenTask(t, listener, True)) self._ctrl.add_task(drivertasks.ListenTask(t, listener, True))
return base.PollStyleListenerAdapter(listener, on_incoming_callback, return base.PollStyleListenerAdapter(listener, batch_size,
batch_size, batch_timeout) batch_timeout)
def cleanup(self): def cleanup(self):
"""Release all resources.""" """Release all resources."""

View File

@ -113,11 +113,12 @@ from oslo_messaging import server as msg_server
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
class NotificationServer(msg_server.MessageHandlingServer): class NotificationServerBase(msg_server.MessageHandlingServer):
def __init__(self, transport, targets, dispatcher, executor='blocking', def __init__(self, transport, targets, dispatcher, executor='blocking',
allow_requeue=True, pool=None): allow_requeue=True, pool=None, batch_size=1,
super(NotificationServer, self).__init__(transport, dispatcher, batch_timeout=None):
executor) super(NotificationServerBase, self).__init__(transport, dispatcher,
executor)
self._allow_requeue = allow_requeue self._allow_requeue = allow_requeue
self._pool = pool self._pool = pool
self.targets = targets self.targets = targets
@ -126,46 +127,42 @@ class NotificationServer(msg_server.MessageHandlingServer):
self.dispatcher.supported_priorities) self.dispatcher.supported_priorities)
) )
self._batch_size = batch_size
self._batch_timeout = batch_timeout
def _create_listener(self): def _create_listener(self):
return self.transport._listen_for_notifications( return self.transport._listen_for_notifications(
self._targets_priorities, self._pool, self._targets_priorities, self._pool, self._batch_size,
lambda incoming: self._on_incoming(incoming[0]), 1, None self._batch_timeout
)
class NotificationServer(NotificationServerBase):
def __init__(self, transport, targets, dispatcher, executor='blocking',
allow_requeue=True, pool=None):
super(NotificationServer, self).__init__(
transport, targets, dispatcher, executor, allow_requeue, pool, 1,
None
) )
def _process_incoming(self, incoming): def _process_incoming(self, incoming):
res = notify_dispatcher.NotificationResult.REQUEUE message = incoming[0]
try: try:
res = self.dispatcher.dispatch(incoming) res = self.dispatcher.dispatch(message)
except Exception: except Exception:
LOG.error(_LE('Exception during message handling'), exc_info=True) LOG.error(_LE('Exception during message handling'), exc_info=True)
try: try:
if (res == notify_dispatcher.NotificationResult.REQUEUE and if (res == notify_dispatcher.NotificationResult.REQUEUE and
self._allow_requeue): self._allow_requeue):
incoming.requeue() message.requeue()
else: else:
incoming.acknowledge() message.acknowledge()
except Exception: except Exception:
LOG.error(_LE("Fail to ack/requeue message"), exc_info=True) LOG.error(_LE("Fail to ack/requeue message"), exc_info=True)
class BatchNotificationServer(NotificationServer): class BatchNotificationServer(NotificationServerBase):
def __init__(self, transport, targets, dispatcher, executor='blocking',
allow_requeue=True, pool=None, batch_size=1,
batch_timeout=None):
super(BatchNotificationServer, self).__init__(
transport=transport, targets=targets, dispatcher=dispatcher,
executor=executor, allow_requeue=allow_requeue, pool=pool
)
self._batch_size = batch_size
self._batch_timeout = batch_timeout
def _create_listener(self):
return self.transport._listen_for_notifications(
self._targets_priorities, self._pool, self._on_incoming,
self._batch_size, self._batch_timeout,
)
def _process_incoming(self, incoming): def _process_incoming(self, incoming):
try: try:

View File

@ -118,19 +118,17 @@ class RPCServer(msg_server.MessageHandlingServer):
self._target = target self._target = target
def _create_listener(self): def _create_listener(self):
return self.transport._listen( return self.transport._listen(self._target, 1, None)
self._target,
lambda incoming: self._on_incoming(incoming[0]), 1, None
)
def _process_incoming(self, incoming): def _process_incoming(self, incoming):
incoming.acknowledge() message = incoming[0]
message.acknowledge()
try: try:
res = self.dispatcher.dispatch(incoming) res = self.dispatcher.dispatch(message)
except rpc_dispatcher.ExpectedException as e: except rpc_dispatcher.ExpectedException as e:
LOG.debug(u'Expected exception during message handling (%s)', LOG.debug(u'Expected exception during message handling (%s)',
e.exc_info[1]) e.exc_info[1])
incoming.reply(failure=e.exc_info) message.reply(failure=e.exc_info)
except Exception as e: except Exception as e:
# current sys.exc_info() content can be overriden # current sys.exc_info() content can be overriden
# by another exception raise by a log handler during # by another exception raise by a log handler during
@ -138,7 +136,7 @@ class RPCServer(msg_server.MessageHandlingServer):
exc_info = sys.exc_info() exc_info = sys.exc_info()
try: try:
LOG.exception(_LE('Exception during message handling: %s'), e) LOG.exception(_LE('Exception during message handling: %s'), e)
incoming.reply(failure=exc_info) message.reply(failure=exc_info)
finally: finally:
# NOTE(dhellmann): Remove circular object reference # NOTE(dhellmann): Remove circular object reference
# between the current stack frame and the traceback in # between the current stack frame and the traceback in
@ -146,9 +144,9 @@ class RPCServer(msg_server.MessageHandlingServer):
del exc_info del exc_info
else: else:
try: try:
incoming.reply(res) message.reply(res)
except Exception: except Exception:
LOG.Exception("Can not send reply for message %s", incoming) LOG.Exception("Can not send reply for message %s", message)
def get_rpc_server(transport, target, endpoints, def get_rpc_server(transport, target, endpoints,

View File

@ -308,8 +308,8 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
def __init__(self, transport, dispatcher, executor='blocking'): def __init__(self, transport, dispatcher, executor='blocking'):
"""Construct a message handling server. """Construct a message handling server.
The dispatcher parameter is a callable which is invoked with context The dispatcher parameter is a DispatcherBase instance which is used
and message dictionaries each time a message is received. for routing request to endpoint for processing.
The executor parameter controls how incoming messages will be received The executor parameter controls how incoming messages will be received
and dispatched. By default, the most simple executor is used - the and dispatched. By default, the most simple executor is used - the
@ -317,8 +317,9 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
:param transport: the messaging transport :param transport: the messaging transport
:type transport: Transport :type transport: Transport
:param dispatcher: a callable which is invoked for each method :param dispatcher: has a dispatch() method which is invoked for each
:type dispatcher: callable incoming request
:type dispatcher: DispatcherBase
:param executor: name of message executor - for example :param executor: name of message executor - for example
'eventlet', 'blocking' 'eventlet', 'blocking'
:type executor: str :type executor: str
@ -347,7 +348,7 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
super(MessageHandlingServer, self).__init__() super(MessageHandlingServer, self).__init__()
def _on_incoming(self, incoming): def _on_incoming(self, incoming):
"""Hanles on_incoming event """Handles on_incoming event
:param incoming: incoming request. :param incoming: incoming request.
""" """
@ -411,7 +412,7 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
except driver_base.TransportDriverError as ex: except driver_base.TransportDriverError as ex:
raise ServerListenError(self.target, ex) raise ServerListenError(self.target, ex)
return self.listener.start self.listener.start(self._on_incoming)
@ordered(after='start') @ordered(after='start')
def stop(self): def stop(self):
@ -436,7 +437,6 @@ class MessageHandlingServer(service.ServiceBase, _OrderedTaskRunner):
Once it's finished, the underlying driver resources associated to this Once it's finished, the underlying driver resources associated to this
server are released (like closing useless network connections). server are released (like closing useless network connections).
""" """
self.listener.wait()
self._work_executor.shutdown(wait=True) self._work_executor.shutdown(wait=True)
# Close listener connection after processing all messages # Close listener connection after processing all messages

View File

@ -204,7 +204,7 @@ class TestKafkaListener(test_utils.BaseTestCase):
fake_target = oslo_messaging.Target(topic='fake_topic') fake_target = oslo_messaging.Target(topic='fake_topic')
fake_targets_and_priorities = [(fake_target, 'info')] fake_targets_and_priorities = [(fake_target, 'info')]
self.driver.listen_for_notifications(fake_targets_and_priorities, None, self.driver.listen_for_notifications(fake_targets_and_priorities, None,
None, None, None) None, None)
self.assertEqual(1, len(fake_consumer.mock_calls)) self.assertEqual(1, len(fake_consumer.mock_calls))
@mock.patch.object(kafka_driver.Connection, '_ensure_connection') @mock.patch.object(kafka_driver.Connection, '_ensure_connection')
@ -222,7 +222,7 @@ class TestKafkaListener(test_utils.BaseTestCase):
exchange="test3"), 'error'), exchange="test3"), 'error'),
] ]
self.driver.listen_for_notifications(fake_targets_and_priorities, None, self.driver.listen_for_notifications(fake_targets_and_priorities, None,
None, None, None) None, None)
self.assertEqual(1, len(fake_consumer.mock_calls)) self.assertEqual(1, len(fake_consumer.mock_calls))
fake_consumer.assert_called_once_with(set(['fake_topic.error', fake_consumer.assert_called_once_with(set(['fake_topic.error',
'fake_topic.info']), 'fake_topic.info']),
@ -234,8 +234,7 @@ class TestKafkaListener(test_utils.BaseTestCase):
fake_target = oslo_messaging.Target(topic='fake_topic') fake_target = oslo_messaging.Target(topic='fake_topic')
fake_targets_and_priorities = [(fake_target, 'info')] fake_targets_and_priorities = [(fake_target, 'info')]
listener = self.driver.listen_for_notifications( listener = self.driver.listen_for_notifications(
fake_targets_and_priorities, None, None, None, fake_targets_and_priorities, None, None, None)._poll_style_listener
None)._poll_style_listener
listener.conn.consume = mock.MagicMock() listener.conn.consume = mock.MagicMock()
listener.conn.consume.return_value = ( listener.conn.consume.return_value = (
iter([kafka.common.KafkaMessage( iter([kafka.common.KafkaMessage(
@ -267,8 +266,7 @@ class TestWithRealKafkaBroker(test_utils.BaseTestCase):
targets_and_priorities = [(target, 'fake_info')] targets_and_priorities = [(target, 'fake_info')]
listener = self.driver.listen_for_notifications( listener = self.driver.listen_for_notifications(
targets_and_priorities, None, None, None, targets_and_priorities, None, None, None)._poll_style_listener
None)._poll_style_listener
fake_context = {"fake_context_key": "fake_context_value"} fake_context = {"fake_context_key": "fake_context_value"}
fake_message = {"fake_message_key": "fake_message_value"} fake_message = {"fake_message_key": "fake_message_value"}
self.driver.send_notification( self.driver.send_notification(
@ -285,8 +283,7 @@ class TestWithRealKafkaBroker(test_utils.BaseTestCase):
targets_and_priorities = [(target, 'fake_info')] targets_and_priorities = [(target, 'fake_info')]
listener = self.driver.listen_for_notifications( listener = self.driver.listen_for_notifications(
targets_and_priorities, None, None, None, targets_and_priorities, None, None, None)._poll_style_listener
None)._poll_style_listener
fake_context = {"fake_context_key": "fake_context_value"} fake_context = {"fake_context_key": "fake_context_value"}
fake_message = {"fake_message_key": "fake_message_value"} fake_message = {"fake_message_key": "fake_message_value"}
self.driver.send_notification( self.driver.send_notification(
@ -304,8 +301,7 @@ class TestWithRealKafkaBroker(test_utils.BaseTestCase):
targets_and_priorities = [(target, 'fake_info')] targets_and_priorities = [(target, 'fake_info')]
listener = self.driver.listen_for_notifications( listener = self.driver.listen_for_notifications(
targets_and_priorities, None, None, None, targets_and_priorities, None, None, None)._poll_style_listener
None)._poll_style_listener
deadline = time.time() + 3 deadline = time.time() + 3
received_message = listener.poll(batch_timeout=3) received_message = listener.poll(batch_timeout=3)

View File

@ -435,7 +435,7 @@ class TestSendReceive(test_utils.BaseTestCase):
target = oslo_messaging.Target(topic='testtopic') target = oslo_messaging.Target(topic='testtopic')
listener = driver.listen(target, None, None, None)._poll_style_listener listener = driver.listen(target, None, None)._poll_style_listener
senders = [] senders = []
replies = [] replies = []
@ -525,7 +525,7 @@ class TestPollAsync(test_utils.BaseTestCase):
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
driver = transport._driver driver = transport._driver
target = oslo_messaging.Target(topic='testtopic') target = oslo_messaging.Target(topic='testtopic')
listener = driver.listen(target, None, None, None)._poll_style_listener listener = driver.listen(target, None, None)._poll_style_listener
received = listener.poll(timeout=0.050) received = listener.poll(timeout=0.050)
self.assertEqual([], received) self.assertEqual([], received)
@ -541,7 +541,7 @@ class TestRacyWaitForReply(test_utils.BaseTestCase):
target = oslo_messaging.Target(topic='testtopic') target = oslo_messaging.Target(topic='testtopic')
listener = driver.listen(target, None, None, None)._poll_style_listener listener = driver.listen(target, None, None)._poll_style_listener
senders = [] senders = []
replies = [] replies = []
msgs = [] msgs = []
@ -877,7 +877,7 @@ class TestReplyWireFormat(test_utils.BaseTestCase):
server=self.server, server=self.server,
fanout=self.fanout) fanout=self.fanout)
listener = driver.listen(target, None, None, None)._poll_style_listener listener = driver.listen(target, None, None)._poll_style_listener
connection, producer = _create_producer(target) connection, producer = _create_producer(target)
self.addCleanup(connection.release) self.addCleanup(connection.release)

View File

@ -43,7 +43,7 @@ class ZmqTestPortsRange(zmq_common.ZmqBaseTestCase):
for i in range(10): for i in range(10):
try: try:
target = oslo_messaging.Target(topic='testtopic_' + str(i)) target = oslo_messaging.Target(topic='testtopic_' + str(i))
new_listener = self.driver.listen(target, None, None, None) new_listener = self.driver.listen(target, None, None)
listeners.append(new_listener) listeners.append(new_listener)
except zmq_socket.ZmqPortRangeExceededException: except zmq_socket.ZmqPortRangeExceededException:
pass pass

View File

@ -39,14 +39,13 @@ class TestServerListener(object):
self.message = None self.message = None
def listen(self, target): def listen(self, target):
self.listener = self.driver.listen(target, None, None, self.listener = self.driver.listen(target, None,
None)._poll_style_listener None)._poll_style_listener
self.executor.execute() self.executor.execute()
def listen_notifications(self, targets_and_priorities): def listen_notifications(self, targets_and_priorities):
self.listener = self.driver.listen_for_notifications( self.listener = self.driver.listen_for_notifications(
targets_and_priorities, None, None, None, targets_and_priorities, None, None, None)._poll_style_listener
None)._poll_style_listener
self.executor.execute() self.executor.execute()
def _run(self): def _run(self):

View File

@ -131,7 +131,7 @@ class TestAmqpSend(_AmqpBrokerTestCase):
"""Verify unused listener can cleanly shutdown.""" """Verify unused listener can cleanly shutdown."""
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target = oslo_messaging.Target(topic="test-topic") target = oslo_messaging.Target(topic="test-topic")
listener = driver.listen(target, None, None, None)._poll_style_listener listener = driver.listen(target, None, None)._poll_style_listener
self.assertIsInstance(listener, amqp_driver.ProtonListener) self.assertIsInstance(listener, amqp_driver.ProtonListener)
driver.cleanup() driver.cleanup()
@ -139,7 +139,7 @@ class TestAmqpSend(_AmqpBrokerTestCase):
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target = oslo_messaging.Target(topic="test-topic") target = oslo_messaging.Target(topic="test-topic")
listener = _ListenerThread( listener = _ListenerThread(
driver.listen(target, None, None, None)._poll_style_listener, 1) driver.listen(target, None, None)._poll_style_listener, 1)
rc = driver.send(target, {"context": True}, rc = driver.send(target, {"context": True},
{"msg": "value"}, wait_for_reply=False) {"msg": "value"}, wait_for_reply=False)
self.assertIsNone(rc) self.assertIsNone(rc)
@ -152,10 +152,10 @@ class TestAmqpSend(_AmqpBrokerTestCase):
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target1 = oslo_messaging.Target(topic="test-topic", exchange="e1") target1 = oslo_messaging.Target(topic="test-topic", exchange="e1")
listener1 = _ListenerThread( listener1 = _ListenerThread(
driver.listen(target1, None, None, None)._poll_style_listener, 1) driver.listen(target1, None, None)._poll_style_listener, 1)
target2 = oslo_messaging.Target(topic="test-topic", exchange="e2") target2 = oslo_messaging.Target(topic="test-topic", exchange="e2")
listener2 = _ListenerThread( listener2 = _ListenerThread(
driver.listen(target2, None, None, None)._poll_style_listener, 1) driver.listen(target2, None, None)._poll_style_listener, 1)
rc = driver.send(target1, {"context": "whatever"}, rc = driver.send(target1, {"context": "whatever"},
{"method": "echo", "id": "e1"}, {"method": "echo", "id": "e1"},
@ -182,10 +182,10 @@ class TestAmqpSend(_AmqpBrokerTestCase):
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target1 = oslo_messaging.Target(topic="test-topic", server="server1") target1 = oslo_messaging.Target(topic="test-topic", server="server1")
listener1 = _ListenerThread( listener1 = _ListenerThread(
driver.listen(target1, None, None, None)._poll_style_listener, 4) driver.listen(target1, None, None)._poll_style_listener, 4)
target2 = oslo_messaging.Target(topic="test-topic", server="server2") target2 = oslo_messaging.Target(topic="test-topic", server="server2")
listener2 = _ListenerThread( listener2 = _ListenerThread(
driver.listen(target2, None, None, None)._poll_style_listener, 3) driver.listen(target2, None, None)._poll_style_listener, 3)
shared_target = oslo_messaging.Target(topic="test-topic") shared_target = oslo_messaging.Target(topic="test-topic")
fanout_target = oslo_messaging.Target(topic="test-topic", fanout_target = oslo_messaging.Target(topic="test-topic",
@ -256,7 +256,7 @@ class TestAmqpSend(_AmqpBrokerTestCase):
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
target = oslo_messaging.Target(topic="test-topic") target = oslo_messaging.Target(topic="test-topic")
listener = _ListenerThread( listener = _ListenerThread(
driver.listen(target, None, None, None)._poll_style_listener, 1) driver.listen(target, None, None)._poll_style_listener, 1)
# the listener will drop this message: # the listener will drop this message:
try: try:
@ -283,7 +283,7 @@ class TestAmqpNotification(_AmqpBrokerTestCase):
(oslo_messaging.Target(topic="topic-1"), 'error'), (oslo_messaging.Target(topic="topic-1"), 'error'),
(oslo_messaging.Target(topic="topic-2"), 'debug')] (oslo_messaging.Target(topic="topic-2"), 'debug')]
nl = driver.listen_for_notifications( nl = driver.listen_for_notifications(
notifications, None, None, None, None)._poll_style_listener notifications, None, None, None)._poll_style_listener
# send one for each support version: # send one for each support version:
msg_count = len(notifications) * 2 msg_count = len(notifications) * 2
@ -345,7 +345,7 @@ class TestAuthentication(test_utils.BaseTestCase):
driver = amqp_driver.ProtonDriver(self.conf, url) driver = amqp_driver.ProtonDriver(self.conf, url)
target = oslo_messaging.Target(topic="test-topic") target = oslo_messaging.Target(topic="test-topic")
listener = _ListenerThread( listener = _ListenerThread(
driver.listen(target, None, None, None)._poll_style_listener, 1) driver.listen(target, None, None)._poll_style_listener, 1)
rc = driver.send(target, {"context": True}, rc = driver.send(target, {"context": True},
{"method": "echo"}, wait_for_reply=True) {"method": "echo"}, wait_for_reply=True)
self.assertIsNotNone(rc) self.assertIsNotNone(rc)
@ -364,7 +364,7 @@ class TestAuthentication(test_utils.BaseTestCase):
driver = amqp_driver.ProtonDriver(self.conf, url) driver = amqp_driver.ProtonDriver(self.conf, url)
target = oslo_messaging.Target(topic="test-topic") target = oslo_messaging.Target(topic="test-topic")
_ListenerThread( _ListenerThread(
driver.listen(target, None, None, None)._poll_style_listener, 1) driver.listen(target, None, None)._poll_style_listener, 1)
self.assertRaises(oslo_messaging.MessagingTimeout, self.assertRaises(oslo_messaging.MessagingTimeout,
driver.send, driver.send,
target, {"context": True}, target, {"context": True},
@ -436,7 +436,7 @@ mech_list: ${mechs}
driver = amqp_driver.ProtonDriver(self.conf, url) driver = amqp_driver.ProtonDriver(self.conf, url)
target = oslo_messaging.Target(topic="test-topic") target = oslo_messaging.Target(topic="test-topic")
listener = _ListenerThread( listener = _ListenerThread(
driver.listen(target, None, None, None)._poll_style_listener, 1) driver.listen(target, None, None)._poll_style_listener, 1)
rc = driver.send(target, {"context": True}, rc = driver.send(target, {"context": True},
{"method": "echo"}, wait_for_reply=True) {"method": "echo"}, wait_for_reply=True)
self.assertIsNotNone(rc) self.assertIsNotNone(rc)
@ -455,7 +455,7 @@ mech_list: ${mechs}
driver = amqp_driver.ProtonDriver(self.conf, url) driver = amqp_driver.ProtonDriver(self.conf, url)
target = oslo_messaging.Target(topic="test-topic") target = oslo_messaging.Target(topic="test-topic")
_ListenerThread( _ListenerThread(
driver.listen(target, None, None, None)._poll_style_listener, 1) driver.listen(target, None, None)._poll_style_listener, 1)
self.assertRaises(oslo_messaging.MessagingTimeout, self.assertRaises(oslo_messaging.MessagingTimeout,
driver.send, driver.send,
target, {"context": True}, target, {"context": True},
@ -476,7 +476,7 @@ mech_list: ${mechs}
driver = amqp_driver.ProtonDriver(self.conf, url) driver = amqp_driver.ProtonDriver(self.conf, url)
target = oslo_messaging.Target(topic="test-topic") target = oslo_messaging.Target(topic="test-topic")
_ListenerThread( _ListenerThread(
driver.listen(target, None, None, None)._poll_style_listener, 1) driver.listen(target, None, None)._poll_style_listener, 1)
self.assertRaises(oslo_messaging.MessagingTimeout, self.assertRaises(oslo_messaging.MessagingTimeout,
driver.send, driver.send,
target, {"context": True}, target, {"context": True},
@ -497,7 +497,7 @@ mech_list: ${mechs}
driver = amqp_driver.ProtonDriver(self.conf, url) driver = amqp_driver.ProtonDriver(self.conf, url)
target = oslo_messaging.Target(topic="test-topic") target = oslo_messaging.Target(topic="test-topic")
listener = _ListenerThread( listener = _ListenerThread(
driver.listen(target, None, None, None)._poll_style_listener, 1) driver.listen(target, None, None)._poll_style_listener, 1)
rc = driver.send(target, {"context": True}, rc = driver.send(target, {"context": True},
{"method": "echo"}, wait_for_reply=True) {"method": "echo"}, wait_for_reply=True)
self.assertIsNotNone(rc) self.assertIsNotNone(rc)
@ -533,7 +533,7 @@ class TestFailover(test_utils.BaseTestCase):
target = oslo_messaging.Target(topic="my-topic") target = oslo_messaging.Target(topic="my-topic")
listener = _ListenerThread( listener = _ListenerThread(
driver.listen(target, None, None, None)._poll_style_listener, 2) driver.listen(target, None, None)._poll_style_listener, 2)
# wait for listener links to come up # wait for listener links to come up
# 4 == 3 links per listener + 1 for the global reply queue # 4 == 3 links per listener + 1 for the global reply queue
@ -620,9 +620,9 @@ class TestFailover(test_utils.BaseTestCase):
target = oslo_messaging.Target(topic="my-topic") target = oslo_messaging.Target(topic="my-topic")
bcast = oslo_messaging.Target(topic="my-topic", fanout=True) bcast = oslo_messaging.Target(topic="my-topic", fanout=True)
listener1 = _ListenerThread( listener1 = _ListenerThread(
driver.listen(target, None, None, None)._poll_style_listener, 2) driver.listen(target, None, None)._poll_style_listener, 2)
listener2 = _ListenerThread( listener2 = _ListenerThread(
driver.listen(target, None, None, None)._poll_style_listener, 2) driver.listen(target, None, None)._poll_style_listener, 2)
# wait for 7 sending links to become active on the broker. # wait for 7 sending links to become active on the broker.
# 7 = 3 links per Listener + 1 global reply link # 7 = 3 links per Listener + 1 global reply link

View File

@ -38,7 +38,7 @@ class _FakeDriver(object):
def send_notification(self, *args, **kwargs): def send_notification(self, *args, **kwargs):
pass pass
def listen(self, target, on_incoming_callback, batch_size, batch_timeout): def listen(self, target, batch_size, batch_timeout):
pass pass
@ -314,10 +314,10 @@ class TestTransportMethodArgs(test_utils.BaseTestCase):
t = transport.Transport(_FakeDriver(cfg.CONF)) t = transport.Transport(_FakeDriver(cfg.CONF))
self.mox.StubOutWithMock(t._driver, 'listen') self.mox.StubOutWithMock(t._driver, 'listen')
t._driver.listen(self._target, None, 1, None) t._driver.listen(self._target, 1, None)
self.mox.ReplayAll() self.mox.ReplayAll()
t._listen(self._target, None, 1, None) t._listen(self._target, 1, None)
class TestTransportUrlCustomisation(test_utils.BaseTestCase): class TestTransportUrlCustomisation(test_utils.BaseTestCase):

View File

@ -97,25 +97,23 @@ class Transport(object):
self._driver.send_notification(target, ctxt, message, version, self._driver.send_notification(target, ctxt, message, version,
retry=retry) retry=retry)
def _listen(self, target, on_incoming_callback, batch_size, batch_timeout): def _listen(self, target, batch_size, batch_timeout):
if not (target.topic and target.server): if not (target.topic and target.server):
raise exceptions.InvalidTarget('A server\'s target must have ' raise exceptions.InvalidTarget('A server\'s target must have '
'topic and server names specified', 'topic and server names specified',
target) target)
return self._driver.listen(target, on_incoming_callback, batch_size, return self._driver.listen(target, batch_size,
batch_timeout) batch_timeout)
def _listen_for_notifications(self, targets_and_priorities, pool, def _listen_for_notifications(self, targets_and_priorities, pool,
on_incoming_callback, batch_size, batch_size, batch_timeout):
batch_timeout):
for target, priority in targets_and_priorities: for target, priority in targets_and_priorities:
if not target.topic: if not target.topic:
raise exceptions.InvalidTarget('A target must have ' raise exceptions.InvalidTarget('A target must have '
'topic specified', 'topic specified',
target) target)
return self._driver.listen_for_notifications( return self._driver.listen_for_notifications(
targets_and_priorities, pool, on_incoming_callback, batch_size, targets_and_priorities, pool, batch_size, batch_timeout
batch_timeout
) )
def cleanup(self): def cleanup(self):