diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index e91c3c306..871afc22a 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -459,7 +459,7 @@ class Replies(pyngus.ReceiverEventHandler): routed to the proper incoming queue using the correlation-id header in the message. """ - def __init__(self, connection, on_ready, on_down): + def __init__(self, connection, on_ready, on_down, capacity): self._correlation = {} # map of correlation-id to response queue self._on_ready = on_ready self._on_down = on_down @@ -469,13 +469,13 @@ class Replies(pyngus.ReceiverEventHandler): event_handler=self, name=rname) - # capacity determines the maximum number of reply messages this link - # can receive. As messages are received and credit is consumed, this - # driver will 'top up' the credit back to max capacity. This number - # should be large enough to avoid needlessly flow-controlling the - # replies. - self.capacity = 100 # TODO(kgiusti) guesstimate - make configurable - self._credit = 0 + # capacity determines the maximum number of reply messages this link is + # willing to receive. As messages are received and capacity is + # consumed, this driver will 'top up' the capacity back to max + # capacity. This number should be large enough to avoid needlessly + # flow-controlling the replies. + self._capacity = capacity + self._capacity_low = (capacity + 1) / 2 self._receiver.open() def detach(self): @@ -526,7 +526,7 @@ class Replies(pyngus.ReceiverEventHandler): messages. """ LOG.debug("Replies link active src=%s", self._receiver.source_address) - self._update_credit() + receiver_link.add_capacity(self._capacity) self._on_ready() def receiver_remote_closed(self, receiver, pn_condition): @@ -551,26 +551,20 @@ class Replies(pyngus.ReceiverEventHandler): """This is a Pyngus callback, invoked by Pyngus when a new message arrives on this receiver link from the peer. """ - self._credit = self._credit - 1 - self._update_credit() - key = message.correlation_id - if key in self._correlation: - LOG.debug("Received response for msg id=%s", key) + LOG.debug("Received response for msg id=%s", key) + try: self._correlation[key](message) # cleanup (only need one response per request) del self._correlation[key] receiver.message_accepted(handle) - else: + except KeyError: LOG.warning(_LW("Can't find receiver for response msg id=%s, " "dropping!"), key) receiver.message_modified(handle, True, True, None) - - def _update_credit(self): # ensure we have enough credit - if self._credit < self.capacity / 2: - self._receiver.add_capacity(self.capacity - self._credit) - self._credit = self.capacity + if receiver.capacity <= self._capacity_low: + receiver.add_capacity(self._capacity - receiver.capacity) class Server(pyngus.ReceiverEventHandler): @@ -578,11 +572,12 @@ class Server(pyngus.ReceiverEventHandler): from a given target. Messages arriving on the links are placed on the 'incoming' queue. """ - def __init__(self, target, incoming, scheduler, delay): + def __init__(self, target, incoming, scheduler, delay, capacity): self._target = target self._incoming = incoming self._addresses = [] - self._capacity = 500 # credit per link + self._capacity = capacity # credit per each link + self._capacity_low = (capacity + 1) / 2 self._receivers = [] self._scheduler = scheduler self._delay = delay # for link re-attach @@ -660,7 +655,7 @@ class Server(pyngus.ReceiverEventHandler): receiver.message_released(handle) else: receiver.message_accepted(handle) - if receiver.capacity < self._capacity / 2: + if receiver.capacity <= self._capacity_low: receiver.add_capacity(self._capacity - receiver.capacity) else: LOG.debug("Can't find receiver for settlement") @@ -676,11 +671,6 @@ class Server(pyngus.ReceiverEventHandler): event_handler=self, name=name, properties=props) - - # TODO(kgiusti) Hardcoding credit here is sub-optimal. A better - # approach would monitor for a back-up of inbound messages to be - # processed by the consuming application and backpressure the - # sender based on configured thresholds. r.add_capacity(self._capacity) r.open() return r @@ -701,8 +691,9 @@ class Server(pyngus.ReceiverEventHandler): class RPCServer(Server): """Subscribes to RPC addresses""" - def __init__(self, target, incoming, scheduler, delay): - super(RPCServer, self).__init__(target, incoming, scheduler, delay) + def __init__(self, target, incoming, scheduler, delay, capacity): + super(RPCServer, self).__init__(target, incoming, scheduler, delay, + capacity) def attach(self, connection, addresser): # Generate the AMQP 1.0 addresses for the base class @@ -717,9 +708,9 @@ class RPCServer(Server): class NotificationServer(Server): """Subscribes to Notification addresses""" - def __init__(self, target, incoming, scheduler, delay): + def __init__(self, target, incoming, scheduler, delay, capacity): super(NotificationServer, self).__init__(target, incoming, scheduler, - delay) + delay, capacity) def attach(self, connection, addresser): # Generate the AMQP 1.0 addresses for the base class @@ -844,6 +835,10 @@ class Controller(pyngus.ConnectionEventHandler): # prevent queuing up multiple requests to run _process_tasks() self._process_tasks_scheduled = False self._process_tasks_lock = threading.Lock() + # credit levels for incoming links + self._reply_credit = _opts.reply_link_credit + self._rpc_credit = _opts.rpc_server_credit + self._notify_credit = _opts.notify_server_credit def connect(self): """Connect to the messaging service.""" @@ -902,13 +897,15 @@ class Controller(pyngus.ConnectionEventHandler): server = NotificationServer(subscribe_task._target, subscribe_task._in_queue, self.processor, - self.link_retry_delay) + self.link_retry_delay, + self._notify_credit) else: t = "RPC" server = RPCServer(subscribe_task._target, subscribe_task._in_queue, self.processor, - self.link_retry_delay) + self.link_retry_delay, + self._rpc_credit) LOG.debug("Subscribing to %(type)s target %(target)s", {'type': t, 'target': subscribe_task._target}) @@ -1064,7 +1061,8 @@ class Controller(pyngus.ConnectionEventHandler): self.addresser) self.reply_link = Replies(self._socket_connection.connection, self._reply_link_ready, - self._reply_link_down) + self._reply_link_down, + self._reply_credit) self._delay = 1 def connection_closed(self, connection): diff --git a/oslo_messaging/_drivers/amqp1_driver/opts.py b/oslo_messaging/_drivers/amqp1_driver/opts.py index cfc23002d..c24b48fd5 100644 --- a/oslo_messaging/_drivers/amqp1_driver/opts.py +++ b/oslo_messaging/_drivers/amqp1_driver/opts.py @@ -215,5 +215,22 @@ amqp1_opts = [ "Target.exchange if set\n" "else default_rpc_exchange if set\n" "else control_exchange if set\n" - "else 'rpc'") + "else 'rpc'"), + + # Message Credit Levels + + cfg.IntOpt('reply_link_credit', + default=200, + min=1, + help='Window size for incoming RPC Reply messages.'), + + cfg.IntOpt('rpc_server_credit', + default=100, + min=1, + help='Window size for incoming RPC Request messages'), + + cfg.IntOpt('notify_server_credit', + default=100, + min=1, + help='Window size for incoming Notification messages') ] diff --git a/oslo_messaging/tests/drivers/test_amqp_driver.py b/oslo_messaging/tests/drivers/test_amqp_driver.py index fc4f85563..4a5f73320 100644 --- a/oslo_messaging/tests/drivers/test_amqp_driver.py +++ b/oslo_messaging/tests/drivers/test_amqp_driver.py @@ -464,6 +464,29 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto): driver.cleanup() + def test_sender_minimal_credit(self): + # ensure capacity is replenished when only 1 credit is configured + self.config(reply_link_credit=1, + rpc_server_credit=1, + group="oslo_messaging_amqp") + driver = amqp_driver.ProtonDriver(self.conf, self._broker_url) + target = oslo_messaging.Target(topic="test-topic", server="server") + listener = _ListenerThread(driver.listen(target, + None, + None)._poll_style_listener, + 4) + for i in range(4): + threading.Thread(target=driver.send, + args=(target, + {"context": "whatever"}, + {"method": "echo"}), + kwargs={'wait_for_reply': True}).start() + predicate = lambda: (self._broker.direct_count == 8) + _wait_until(predicate, 30) + self.assertTrue(predicate()) + listener.join(timeout=30) + driver.cleanup() + class TestAmqpNotification(_AmqpBrokerTestCaseAuto): """Test sending and receiving notifications.""" @@ -999,7 +1022,7 @@ class TestLinkRecovery(_AmqpBrokerTestCase): def _on_active(link): # refuse granting credit for the broadcast link - if link.source_address.startswith("broadcast"): + if self._broker._addresser._is_multicast(link.source_address): self._blocked_links.add(link) else: # unblock all link when RPC call is made @@ -1019,13 +1042,17 @@ class TestLinkRecovery(_AmqpBrokerTestCase): target.fanout = True target.server = None # these threads will share the same link + th = [] for i in range(3): - threading.Thread(target=driver.send, - args=(target, {"context": "whatever"}, - {"msg": "n=%d" % i}), - kwargs={'wait_for_reply': False}).start() - - time.sleep(0.5) + t = threading.Thread(target=driver.send, + args=(target, {"context": "whatever"}, + {"msg": "n=%d" % i}), + kwargs={'wait_for_reply': False}) + t.start() + t.join(timeout=1) + self.assertTrue(t.isAlive()) + th.append(t) + self.assertEqual(self._broker.fanout_sent_count, 0) # this will trigger the release of credit for the previous links target.fanout = False rc = driver.send(target, {"context": "whatever"}, @@ -1036,6 +1063,9 @@ class TestLinkRecovery(_AmqpBrokerTestCase): listener.join(timeout=30) self.assertTrue(self._broker.fanout_count == 3) self.assertFalse(listener.isAlive()) + for t in th: + t.join(timeout=30) + self.assertFalse(t.isAlive()) driver.cleanup() @@ -1496,7 +1526,7 @@ class FakeBroker(threading.Thread): """Forward this message out the proper sending link.""" self.server.forward_message(message, handle, receiver_link) if self.link.capacity < 1: - self.server.credit_exhausted(self.link) + self.server.on_credit_exhausted(self.link) def __init__(self, cfg, sock_addr="", sock_port=0,