Merge "cast() and RPC replies should not block waiting for endpoint to ack"

This commit is contained in:
Jenkins 2016-10-11 10:49:34 +00:00 committed by Gerrit Code Review
commit 368e3cfb47
3 changed files with 41 additions and 59 deletions

View File

@ -120,21 +120,20 @@ class SendTask(Task):
def _prepare(self, sender):
"""Called immediately before the message is handed off to the i/o
system. This implies that the sender link is up and credit is
available for this send request.
system. This implies that the sender link is up.
"""
pass
if not self.wait_for_ack:
# sender is not concerned with waiting for acknowledgment
# "best effort at-most-once delivery"
self._cleanup()
self._wakeup.set()
def _on_ack(self, state, info):
"""Called by eventloop thread when the ack/nack is received from the
peer.
"""
if self._wakeup.is_set():
LOG.debug("Message ACKed after send completed: %s %s", state, info)
return
if state != pyngus.SenderLink.ACCEPTED:
# TODO(kgiusti): should retry if deadline not hit
# TODO(kgiusti): could retry if deadline not hit
msg = ("{name} message send to {target} failed: remote"
" disposition: {disp}, info:"
"{info}".format(name=self.name,
@ -142,6 +141,7 @@ class SendTask(Task):
disp=state,
info=info))
self._error = exceptions.MessageDeliveryFailure(msg)
LOG.warning("%s", msg)
self._cleanup()
self._wakeup.set()
@ -149,18 +149,15 @@ class SendTask(Task):
"""Invoked by the eventloop when the send fails to complete before the
timeout is reached.
"""
if self._wakeup.is_set():
LOG.debug("Message send timeout occurred after send completed")
return
self.timer = None
if self.message.ttl:
msg = ("{name} message sent to {target} failed: timed"
" out".format(name=self.name, target=self.target))
self._error = exceptions.MessagingTimeout(msg)
else:
msg = ("{name} message sent to {target} failed:"
" undeliverable".format(name=self.name, target=self.target))
self._error = exceptions.MessageDeliveryFailure(msg)
msg = ("{name} message sent to {target} failed: timed"
" out".format(name=self.name, target=self.target))
LOG.warning("%s", msg)
# Only raise a MessagingTimeout if the caller has explicitly specified
# a timeout.
self._error = exceptions.MessagingTimeout(msg) \
if self.message.ttl else \
exceptions.MessageDeliveryFailure(msg)
self._cleanup()
self._wakeup.set()
@ -168,15 +165,11 @@ class SendTask(Task):
"""Invoked by the eventloop if the send operation fails for reasons
other than timeout and nack.
"""
if self._wakeup.is_set():
LOG.debug("Message send error occurred after send completed: %s",
str(description))
return
msg = ("{name} message sent to {target} failed:"
" {reason}".format(name=self.name,
target=self.target,
reason=description))
LOG.warning("%s", msg)
self._error = exceptions.MessageDeliveryFailure(msg)
self._cleanup()
self._wakeup.set()
@ -228,7 +221,7 @@ class RPCCallTask(SendTask):
# must wait for reply if ACCEPTED
def _cleanup(self):
if self._reply_link:
if self._reply_link and self._msg_id:
self._reply_link.cancel_response(self._msg_id)
self._msg_id = None
super(RPCCallTask, self)._cleanup()
@ -334,9 +327,7 @@ class Sender(pyngus.SenderEventHandler):
send_task.timer = self._scheduler.alarm(timer_callback,
send_task.deadline)
if not self._can_send:
self._pending_sends.append(send_task)
elif self._pending_sends:
if not self._can_send or self._pending_sends:
self._pending_sends.append(send_task)
else:
self._send(send_task)
@ -348,7 +339,7 @@ class Sender(pyngus.SenderEventHandler):
self._send_pending()
def credit_granted(self, sender_link):
self._send_pending()
pass
def sender_remote_closed(self, sender_link, pn_condition):
# The remote has initiated a close. This could happen when the message
@ -403,36 +394,30 @@ class Sender(pyngus.SenderEventHandler):
@property
def _can_send(self):
return (self._link is not None and
self._link.active and
self._link.credit > 0)
return self._link and self._link.active
def _send(self, send_task):
send_task._prepare(self)
send_task.message.address = self._address
if send_task.wait_for_ack:
def pyngus_callback(link, handle, state, info):
# invoked when the message bus (n)acks this message
if state == pyngus.SenderLink.TIMED_OUT:
# ignore pyngus timeout - we maintain our own timer
return
self._unacked.discard(send_task)
send_task._on_ack(state, info)
self._unacked.add(send_task)
self._link.send(send_task.message,
delivery_callback=pyngus_callback,
handle=self,
deadline=send_task.deadline)
else:
self._link.send(send_task.message)
# simulate ack to wakeup sender
send_task._on_ack(pyngus.SenderLink.ACCEPTED, dict())
def pyngus_callback(link, handle, state, info):
# invoked when the message bus (n)acks this message
if state == pyngus.SenderLink.TIMED_OUT:
# ignore pyngus timeout - we maintain our own timer
return
self._unacked.discard(send_task)
send_task._on_ack(state, info)
self._unacked.add(send_task)
self._link.send(send_task.message,
delivery_callback=pyngus_callback,
handle=self,
deadline=send_task.deadline)
def _send_pending(self):
# send as many pending messages as there is credit available
# send all pending messages
if self._can_send:
while self._pending_sends and self._link.credit > 0:
while self._pending_sends:
self._send(self._pending_sends.popleft())
def _open_link(self):

View File

@ -226,7 +226,7 @@ amqp1_opts = [
# Settlement control
cfg.MultiStrOpt('pre_settled',
default=['rpc-cast'],
default=['rpc-cast', 'rpc-reply'],
help="Send messages of this type pre-settled.\n"
"Pre-settled messages will not receive acknowledgement\n"
"from the peer. Note well: pre-settled messages may be\n"

View File

@ -1085,16 +1085,16 @@ class TestLinkRecovery(_AmqpBrokerTestCase):
target.fanout = True
target.server = None
# these threads will share the same link
th = []
for i in range(3):
t = threading.Thread(target=driver.send,
args=(target, {"context": "whatever"},
{"msg": "n=%d" % i}),
kwargs={'wait_for_reply': False})
t.start()
t.join(timeout=1)
self.assertTrue(t.isAlive())
th.append(t)
# casts return once message is put on active link
t.join(timeout=30)
time.sleep(1) # ensure messages are going nowhere
self.assertEqual(self._broker.fanout_sent_count, 0)
# this will trigger the release of credit for the previous links
target.fanout = False
@ -1106,9 +1106,6 @@ class TestLinkRecovery(_AmqpBrokerTestCase):
listener.join(timeout=30)
self.assertTrue(self._broker.fanout_count == 3)
self.assertFalse(listener.isAlive())
for t in th:
t.join(timeout=30)
self.assertFalse(t.isAlive())
driver.cleanup()