Merge "[AMQP 1.0] small fixes to improve timer scalability" into feature/amqp-dispatch-router
This commit is contained in:
commit
ba387dfd1c
@ -27,12 +27,12 @@ functions scheduled by the Controller.
|
||||
import abc
|
||||
import collections
|
||||
import logging
|
||||
from monotonic import monotonic as now # noqa
|
||||
import os
|
||||
import platform
|
||||
import random
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import proton
|
||||
@ -870,7 +870,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
# methods executed by Tasks created by the driver:
|
||||
|
||||
def send(self, send_task):
|
||||
if send_task.deadline and send_task.deadline <= time.time():
|
||||
if send_task.deadline and send_task.deadline <= now():
|
||||
send_task._on_timeout()
|
||||
return
|
||||
LOG.debug("Sending message to %s", send_task.target)
|
||||
|
@ -25,11 +25,12 @@ the background thread via callables.
|
||||
import errno
|
||||
import heapq
|
||||
import logging
|
||||
import math
|
||||
from monotonic import monotonic as now # noqa
|
||||
import os
|
||||
import select
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import pyngus
|
||||
@ -39,6 +40,12 @@ from oslo_messaging._i18n import _LE, _LI, _LW
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def compute_timeout(offset):
|
||||
# minimize the timer granularity to one second so we don't have to track
|
||||
# too many timers
|
||||
return math.ceil(now() + offset)
|
||||
|
||||
|
||||
class _SocketConnection(object):
|
||||
"""Associates a pyngus Connection with a python network socket,
|
||||
and handles all connection-related I/O and timer events.
|
||||
@ -65,7 +72,7 @@ class _SocketConnection(object):
|
||||
while True:
|
||||
try:
|
||||
rc = pyngus.read_socket_input(self.connection, self.socket)
|
||||
self.connection.process(time.time())
|
||||
self.connection.process(now())
|
||||
return rc
|
||||
except (socket.timeout, socket.error) as e:
|
||||
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
|
||||
@ -79,7 +86,7 @@ class _SocketConnection(object):
|
||||
while True:
|
||||
try:
|
||||
rc = pyngus.write_socket_output(self.connection, self.socket)
|
||||
self.connection.process(time.time())
|
||||
self.connection.process(now())
|
||||
return rc
|
||||
except (socket.timeout, socket.error) as e:
|
||||
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
|
||||
@ -161,52 +168,70 @@ class Scheduler(object):
|
||||
"""Schedule callables to be run in the future.
|
||||
"""
|
||||
class Event(object):
|
||||
def __init__(self, callback, deadline):
|
||||
self._callback = callback
|
||||
self._deadline = deadline
|
||||
# simply hold a reference to a callback that can be set to None if the
|
||||
# alarm is canceled
|
||||
def __init__(self, callback):
|
||||
self.callback = callback
|
||||
|
||||
def cancel(self):
|
||||
# quicker than rebalancing the tree
|
||||
self._callback = None
|
||||
|
||||
def __lt__(self, other):
|
||||
return self._deadline < other._deadline
|
||||
self.callback = None
|
||||
|
||||
def __init__(self):
|
||||
self._entries = []
|
||||
self._callbacks = {}
|
||||
self._deadlines = []
|
||||
|
||||
def alarm(self, request, deadline):
|
||||
"""Request a callable be executed at a specific time
|
||||
"""
|
||||
entry = Scheduler.Event(request, deadline)
|
||||
heapq.heappush(self._entries, entry)
|
||||
try:
|
||||
callbacks = self._callbacks[deadline]
|
||||
except KeyError:
|
||||
callbacks = list()
|
||||
self._callbacks[deadline] = callbacks
|
||||
heapq.heappush(self._deadlines, deadline)
|
||||
entry = Scheduler.Event(request)
|
||||
callbacks.append(entry)
|
||||
return entry
|
||||
|
||||
def defer(self, request, delay):
|
||||
"""Request a callable be executed after delay seconds
|
||||
"""
|
||||
return self.alarm(request, time.time() + delay)
|
||||
return self.alarm(request, compute_timeout(delay))
|
||||
|
||||
@property
|
||||
def _next_deadline(self):
|
||||
"""The timestamp of the next expiring event or None
|
||||
"""
|
||||
return self._deadlines[0] if self._deadlines else None
|
||||
|
||||
def _get_delay(self, max_delay=None):
|
||||
"""Get the delay in milliseconds until the next callable needs to be
|
||||
run, or 'max_delay' if no outstanding callables or the delay to the
|
||||
next callable is > 'max_delay'.
|
||||
"""
|
||||
due = self._entries[0]._deadline if self._entries else None
|
||||
due = self._deadlines[0] if self._deadlines else None
|
||||
if due is None:
|
||||
return max_delay
|
||||
now = time.time()
|
||||
if due <= now:
|
||||
_now = now()
|
||||
if due <= _now:
|
||||
return 0
|
||||
else:
|
||||
return min(due - now, max_delay) if max_delay else due - now
|
||||
return min(due - _now, max_delay) if max_delay else due - _now
|
||||
|
||||
def _process(self):
|
||||
"""Invoke all expired callables."""
|
||||
while self._entries and self._entries[0]._deadline <= time.time():
|
||||
callback = heapq.heappop(self._entries)._callback
|
||||
if callback:
|
||||
callback()
|
||||
if self._deadlines:
|
||||
_now = now()
|
||||
try:
|
||||
while self._deadlines[0] <= _now:
|
||||
deadline = heapq.heappop(self._deadlines)
|
||||
callbacks = self._callbacks[deadline]
|
||||
del self._callbacks[deadline]
|
||||
for cb in callbacks:
|
||||
cb.callback and cb.callback()
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
|
||||
class Requests(object):
|
||||
@ -325,15 +350,17 @@ class Thread(threading.Thread):
|
||||
readfds.append(self._requests)
|
||||
writefds = [c.user_context for c in writers]
|
||||
|
||||
timeout = None
|
||||
if timers:
|
||||
deadline = timers[0].deadline # 0 == next expiring timer
|
||||
now = time.time()
|
||||
timeout = 0 if deadline <= now else deadline - now
|
||||
|
||||
# adjust timeout for any deferred requests
|
||||
timeout = self._scheduler._get_delay(timeout)
|
||||
# force select to return in time to service the next expiring timer
|
||||
d1 = self._scheduler._next_deadline
|
||||
d2 = timers[0].deadline if timers else None
|
||||
deadline = min(d1, d2) if d1 and d2 else d1 if not d2 else d2
|
||||
if deadline:
|
||||
_now = now()
|
||||
timeout = 0 if deadline <= _now else (deadline - _now)
|
||||
else:
|
||||
timeout = None
|
||||
|
||||
# and now we wait...
|
||||
try:
|
||||
results = select.select(readfds, writefds, [], timeout)
|
||||
except select.error as serror:
|
||||
@ -348,10 +375,12 @@ class Thread(threading.Thread):
|
||||
for r in readable:
|
||||
r.read()
|
||||
|
||||
for t in timers:
|
||||
if t.deadline > time.time():
|
||||
break
|
||||
t.process(time.time())
|
||||
if timers:
|
||||
_now = now()
|
||||
for t in timers:
|
||||
if t.deadline > _now:
|
||||
break
|
||||
t.process(_now)
|
||||
|
||||
for w in writable:
|
||||
w.write()
|
||||
|
@ -118,13 +118,13 @@ amqp1_opts = [
|
||||
' Only used when caller does not provide a timeout expiry.'),
|
||||
|
||||
cfg.IntOpt('default_send_timeout',
|
||||
default=60,
|
||||
default=30,
|
||||
min=5,
|
||||
help='The deadline for an rpc cast or call message delivery.'
|
||||
' Only used when caller does not provide a timeout expiry.'),
|
||||
|
||||
cfg.IntOpt('default_notify_timeout',
|
||||
default=60,
|
||||
default=30,
|
||||
min=5,
|
||||
help='The deadline for a sent notification message delivery.'
|
||||
' Only used when caller does not provide a timeout expiry.'),
|
||||
|
@ -24,7 +24,6 @@ import collections
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from oslo_config import cfg
|
||||
@ -33,6 +32,7 @@ from oslo_serialization import jsonutils
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from oslo_messaging._drivers.amqp1_driver.eventloop import compute_timeout
|
||||
from oslo_messaging._drivers.amqp1_driver import opts
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers import common
|
||||
@ -105,14 +105,14 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
|
||||
response.correlation_id = self._correlation_id
|
||||
LOG.debug("Sending RPC reply to %s (%s)", self._reply_to,
|
||||
self._correlation_id)
|
||||
now = time.time()
|
||||
deadline = now + self.listener.driver._default_reply_timeout
|
||||
driver = self.listener.driver
|
||||
deadline = compute_timeout(driver._default_reply_timeout)
|
||||
task = controller.SendTask("RPC Reply", response, self._reply_to,
|
||||
# analogous to kombu missing dest t/o:
|
||||
deadline,
|
||||
retry=0,
|
||||
wait_for_ack=True)
|
||||
self.listener.driver._ctrl.add_task(task)
|
||||
driver._ctrl.add_task(task)
|
||||
rc = task.wait()
|
||||
if rc:
|
||||
# something failed. Not much we can do at this point but log
|
||||
@ -286,7 +286,7 @@ class ProtonDriver(base.BaseDriver):
|
||||
request = marshal_request(message, ctxt, envelope)
|
||||
expire = 0
|
||||
if timeout:
|
||||
expire = time.time() + timeout # when the caller times out
|
||||
expire = compute_timeout(timeout) # when the caller times out
|
||||
# amqp uses millisecond time values, timeout is seconds
|
||||
request.ttl = int(timeout * 1000)
|
||||
request.expiry_time = int(expire * 1000)
|
||||
@ -294,7 +294,7 @@ class ProtonDriver(base.BaseDriver):
|
||||
# no timeout provided by application. If the backend is queueless
|
||||
# this could lead to a hang - provide a default to prevent this
|
||||
# TODO(kgiusti) only do this if brokerless backend
|
||||
expire = time.time() + self._default_send_timeout
|
||||
expire = compute_timeout(self._default_send_timeout)
|
||||
LOG.debug("Sending message to %s", target)
|
||||
if wait_for_reply:
|
||||
task = controller.RPCCallTask(target, request, expire, retry)
|
||||
@ -339,9 +339,10 @@ class ProtonDriver(base.BaseDriver):
|
||||
# this
|
||||
# TODO(kgiusti) should raise NotImplemented if not broker backend
|
||||
LOG.debug("Send notification to %s", target)
|
||||
deadline = compute_timeout(self._default_notify_timeout)
|
||||
task = controller.SendTask("Notify", request, target,
|
||||
time.time() + self._default_notify_timeout,
|
||||
retry, wait_for_ack=True, notification=True)
|
||||
deadline, retry, wait_for_ack=True,
|
||||
notification=True)
|
||||
self._ctrl.add_task(task)
|
||||
rc = task.wait()
|
||||
if isinstance(rc, Exception):
|
||||
|
@ -357,7 +357,7 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
|
||||
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
|
||||
target = oslo_messaging.Target(topic="test-topic")
|
||||
listener = _SlowResponder(
|
||||
driver.listen(target, None, None)._poll_style_listener, 1)
|
||||
driver.listen(target, None, None)._poll_style_listener, 3)
|
||||
|
||||
self.assertRaises(oslo_messaging.MessagingTimeout,
|
||||
driver.send, target,
|
||||
|
@ -14,6 +14,7 @@ oslo.service>=1.10.0 # Apache-2.0
|
||||
oslo.i18n>=2.1.0 # Apache-2.0
|
||||
stevedore>=1.10.0 # Apache-2.0
|
||||
debtcollector>=1.2.0 # Apache-2.0
|
||||
monotonic>=0.6 # Apache-2.0
|
||||
|
||||
# for jsonutils
|
||||
six>=1.9.0 # MIT
|
||||
|
Loading…
x
Reference in New Issue
Block a user