Merge "[AMQP 1.0] Add link credit configuration options" into feature/amqp-dispatch-router
This commit is contained in:
commit
5d5596a43d
@ -459,7 +459,7 @@ class Replies(pyngus.ReceiverEventHandler):
|
||||
routed to the proper incoming queue using the correlation-id header in the
|
||||
message.
|
||||
"""
|
||||
def __init__(self, connection, on_ready, on_down):
|
||||
def __init__(self, connection, on_ready, on_down, capacity):
|
||||
self._correlation = {} # map of correlation-id to response queue
|
||||
self._on_ready = on_ready
|
||||
self._on_down = on_down
|
||||
@ -469,13 +469,13 @@ class Replies(pyngus.ReceiverEventHandler):
|
||||
event_handler=self,
|
||||
name=rname)
|
||||
|
||||
# capacity determines the maximum number of reply messages this link
|
||||
# can receive. As messages are received and credit is consumed, this
|
||||
# driver will 'top up' the credit back to max capacity. This number
|
||||
# should be large enough to avoid needlessly flow-controlling the
|
||||
# replies.
|
||||
self.capacity = 100 # TODO(kgiusti) guesstimate - make configurable
|
||||
self._credit = 0
|
||||
# capacity determines the maximum number of reply messages this link is
|
||||
# willing to receive. As messages are received and capacity is
|
||||
# consumed, this driver will 'top up' the capacity back to max
|
||||
# capacity. This number should be large enough to avoid needlessly
|
||||
# flow-controlling the replies.
|
||||
self._capacity = capacity
|
||||
self._capacity_low = (capacity + 1) / 2
|
||||
self._receiver.open()
|
||||
|
||||
def detach(self):
|
||||
@ -526,7 +526,7 @@ class Replies(pyngus.ReceiverEventHandler):
|
||||
messages.
|
||||
"""
|
||||
LOG.debug("Replies link active src=%s", self._receiver.source_address)
|
||||
self._update_credit()
|
||||
receiver_link.add_capacity(self._capacity)
|
||||
self._on_ready()
|
||||
|
||||
def receiver_remote_closed(self, receiver, pn_condition):
|
||||
@ -551,26 +551,20 @@ class Replies(pyngus.ReceiverEventHandler):
|
||||
"""This is a Pyngus callback, invoked by Pyngus when a new message
|
||||
arrives on this receiver link from the peer.
|
||||
"""
|
||||
self._credit = self._credit - 1
|
||||
self._update_credit()
|
||||
|
||||
key = message.correlation_id
|
||||
if key in self._correlation:
|
||||
LOG.debug("Received response for msg id=%s", key)
|
||||
LOG.debug("Received response for msg id=%s", key)
|
||||
try:
|
||||
self._correlation[key](message)
|
||||
# cleanup (only need one response per request)
|
||||
del self._correlation[key]
|
||||
receiver.message_accepted(handle)
|
||||
else:
|
||||
except KeyError:
|
||||
LOG.warning(_LW("Can't find receiver for response msg id=%s, "
|
||||
"dropping!"), key)
|
||||
receiver.message_modified(handle, True, True, None)
|
||||
|
||||
def _update_credit(self):
|
||||
# ensure we have enough credit
|
||||
if self._credit < self.capacity / 2:
|
||||
self._receiver.add_capacity(self.capacity - self._credit)
|
||||
self._credit = self.capacity
|
||||
if receiver.capacity <= self._capacity_low:
|
||||
receiver.add_capacity(self._capacity - receiver.capacity)
|
||||
|
||||
|
||||
class Server(pyngus.ReceiverEventHandler):
|
||||
@ -578,11 +572,12 @@ class Server(pyngus.ReceiverEventHandler):
|
||||
from a given target. Messages arriving on the links are placed on the
|
||||
'incoming' queue.
|
||||
"""
|
||||
def __init__(self, target, incoming, scheduler, delay):
|
||||
def __init__(self, target, incoming, scheduler, delay, capacity):
|
||||
self._target = target
|
||||
self._incoming = incoming
|
||||
self._addresses = []
|
||||
self._capacity = 500 # credit per link
|
||||
self._capacity = capacity # credit per each link
|
||||
self._capacity_low = (capacity + 1) / 2
|
||||
self._receivers = []
|
||||
self._scheduler = scheduler
|
||||
self._delay = delay # for link re-attach
|
||||
@ -660,7 +655,7 @@ class Server(pyngus.ReceiverEventHandler):
|
||||
receiver.message_released(handle)
|
||||
else:
|
||||
receiver.message_accepted(handle)
|
||||
if receiver.capacity < self._capacity / 2:
|
||||
if receiver.capacity <= self._capacity_low:
|
||||
receiver.add_capacity(self._capacity - receiver.capacity)
|
||||
else:
|
||||
LOG.debug("Can't find receiver for settlement")
|
||||
@ -676,11 +671,6 @@ class Server(pyngus.ReceiverEventHandler):
|
||||
event_handler=self,
|
||||
name=name,
|
||||
properties=props)
|
||||
|
||||
# TODO(kgiusti) Hardcoding credit here is sub-optimal. A better
|
||||
# approach would monitor for a back-up of inbound messages to be
|
||||
# processed by the consuming application and backpressure the
|
||||
# sender based on configured thresholds.
|
||||
r.add_capacity(self._capacity)
|
||||
r.open()
|
||||
return r
|
||||
@ -701,8 +691,9 @@ class Server(pyngus.ReceiverEventHandler):
|
||||
|
||||
class RPCServer(Server):
|
||||
"""Subscribes to RPC addresses"""
|
||||
def __init__(self, target, incoming, scheduler, delay):
|
||||
super(RPCServer, self).__init__(target, incoming, scheduler, delay)
|
||||
def __init__(self, target, incoming, scheduler, delay, capacity):
|
||||
super(RPCServer, self).__init__(target, incoming, scheduler, delay,
|
||||
capacity)
|
||||
|
||||
def attach(self, connection, addresser):
|
||||
# Generate the AMQP 1.0 addresses for the base class
|
||||
@ -717,9 +708,9 @@ class RPCServer(Server):
|
||||
|
||||
class NotificationServer(Server):
|
||||
"""Subscribes to Notification addresses"""
|
||||
def __init__(self, target, incoming, scheduler, delay):
|
||||
def __init__(self, target, incoming, scheduler, delay, capacity):
|
||||
super(NotificationServer, self).__init__(target, incoming, scheduler,
|
||||
delay)
|
||||
delay, capacity)
|
||||
|
||||
def attach(self, connection, addresser):
|
||||
# Generate the AMQP 1.0 addresses for the base class
|
||||
@ -841,6 +832,10 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
# prevent queuing up multiple requests to run _process_tasks()
|
||||
self._process_tasks_scheduled = False
|
||||
self._process_tasks_lock = threading.Lock()
|
||||
# credit levels for incoming links
|
||||
self._reply_credit = _opts.reply_link_credit
|
||||
self._rpc_credit = _opts.rpc_server_credit
|
||||
self._notify_credit = _opts.notify_server_credit
|
||||
|
||||
def connect(self):
|
||||
"""Connect to the messaging service."""
|
||||
@ -899,13 +894,15 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
server = NotificationServer(subscribe_task._target,
|
||||
subscribe_task._in_queue,
|
||||
self.processor,
|
||||
self.link_retry_delay)
|
||||
self.link_retry_delay,
|
||||
self._notify_credit)
|
||||
else:
|
||||
t = "RPC"
|
||||
server = RPCServer(subscribe_task._target,
|
||||
subscribe_task._in_queue,
|
||||
self.processor,
|
||||
self.link_retry_delay)
|
||||
self.link_retry_delay,
|
||||
self._rpc_credit)
|
||||
|
||||
LOG.debug("Subscribing to %(type)s target %(target)s",
|
||||
{'type': t, 'target': subscribe_task._target})
|
||||
@ -1061,7 +1058,8 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self.addresser)
|
||||
self.reply_link = Replies(self._socket_connection.connection,
|
||||
self._reply_link_ready,
|
||||
self._reply_link_down)
|
||||
self._reply_link_down,
|
||||
self._reply_credit)
|
||||
self._delay = 1
|
||||
|
||||
def connection_closed(self, connection):
|
||||
|
@ -208,5 +208,22 @@ amqp1_opts = [
|
||||
"Target.exchange if set\n"
|
||||
"else default_rpc_exchange if set\n"
|
||||
"else control_exchange if set\n"
|
||||
"else 'rpc'")
|
||||
"else 'rpc'"),
|
||||
|
||||
# Message Credit Levels
|
||||
|
||||
cfg.IntOpt('reply_link_credit',
|
||||
default=200,
|
||||
min=1,
|
||||
help='Window size for incoming RPC Reply messages.'),
|
||||
|
||||
cfg.IntOpt('rpc_server_credit',
|
||||
default=100,
|
||||
min=1,
|
||||
help='Window size for incoming RPC Request messages'),
|
||||
|
||||
cfg.IntOpt('notify_server_credit',
|
||||
default=100,
|
||||
min=1,
|
||||
help='Window size for incoming Notification messages')
|
||||
]
|
||||
|
@ -464,6 +464,29 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
|
||||
|
||||
driver.cleanup()
|
||||
|
||||
def test_sender_minimal_credit(self):
|
||||
# ensure capacity is replenished when only 1 credit is configured
|
||||
self.config(reply_link_credit=1,
|
||||
rpc_server_credit=1,
|
||||
group="oslo_messaging_amqp")
|
||||
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
|
||||
target = oslo_messaging.Target(topic="test-topic", server="server")
|
||||
listener = _ListenerThread(driver.listen(target,
|
||||
None,
|
||||
None)._poll_style_listener,
|
||||
4)
|
||||
for i in range(4):
|
||||
threading.Thread(target=driver.send,
|
||||
args=(target,
|
||||
{"context": "whatever"},
|
||||
{"method": "echo"}),
|
||||
kwargs={'wait_for_reply': True}).start()
|
||||
predicate = lambda: (self._broker.direct_count == 8)
|
||||
_wait_until(predicate, 30)
|
||||
self.assertTrue(predicate())
|
||||
listener.join(timeout=30)
|
||||
driver.cleanup()
|
||||
|
||||
|
||||
class TestAmqpNotification(_AmqpBrokerTestCaseAuto):
|
||||
"""Test sending and receiving notifications."""
|
||||
@ -998,7 +1021,7 @@ class TestLinkRecovery(_AmqpBrokerTestCase):
|
||||
|
||||
def _on_active(link):
|
||||
# refuse granting credit for the broadcast link
|
||||
if link.source_address.startswith("broadcast"):
|
||||
if self._broker._addresser._is_multicast(link.source_address):
|
||||
self._blocked_links.add(link)
|
||||
else:
|
||||
# unblock all link when RPC call is made
|
||||
@ -1018,13 +1041,17 @@ class TestLinkRecovery(_AmqpBrokerTestCase):
|
||||
target.fanout = True
|
||||
target.server = None
|
||||
# these threads will share the same link
|
||||
th = []
|
||||
for i in range(3):
|
||||
threading.Thread(target=driver.send,
|
||||
args=(target, {"context": "whatever"},
|
||||
{"msg": "n=%d" % i}),
|
||||
kwargs={'wait_for_reply': False}).start()
|
||||
|
||||
time.sleep(0.5)
|
||||
t = threading.Thread(target=driver.send,
|
||||
args=(target, {"context": "whatever"},
|
||||
{"msg": "n=%d" % i}),
|
||||
kwargs={'wait_for_reply': False})
|
||||
t.start()
|
||||
t.join(timeout=1)
|
||||
self.assertTrue(t.isAlive())
|
||||
th.append(t)
|
||||
self.assertEqual(self._broker.fanout_sent_count, 0)
|
||||
# this will trigger the release of credit for the previous links
|
||||
target.fanout = False
|
||||
rc = driver.send(target, {"context": "whatever"},
|
||||
@ -1035,6 +1062,9 @@ class TestLinkRecovery(_AmqpBrokerTestCase):
|
||||
listener.join(timeout=30)
|
||||
self.assertTrue(self._broker.fanout_count == 3)
|
||||
self.assertFalse(listener.isAlive())
|
||||
for t in th:
|
||||
t.join(timeout=30)
|
||||
self.assertFalse(t.isAlive())
|
||||
driver.cleanup()
|
||||
|
||||
|
||||
@ -1495,7 +1525,7 @@ class FakeBroker(threading.Thread):
|
||||
"""Forward this message out the proper sending link."""
|
||||
self.server.forward_message(message, handle, receiver_link)
|
||||
if self.link.capacity < 1:
|
||||
self.server.credit_exhausted(self.link)
|
||||
self.server.on_credit_exhausted(self.link)
|
||||
|
||||
def __init__(self, cfg,
|
||||
sock_addr="", sock_port=0,
|
||||
|
Loading…
x
Reference in New Issue
Block a user