[AMQP 1.0] small fixes to improve timer scalability
This patch introduces the following tweaks to the timer implementation:
- Reduce the number of timers that need to be tracked by coarsening the timer granularity to whole seconds.
- Decrease the default timeout values to further reduce the total number of tracked timers.
- Batch multiple expiring events that share the same deadline.
- Inline the timer comparison code in the main event loop.
- Avoid using an expensive comparison method in the heap sort by using an integer primitive instead.
- Use monotonic time instead of time.time().
Change-Id: I83e86bf203e6a641085e482c7ccf0e01f4fb4d86
This commit is contained in:
parent
1bf6eeaaf9
commit
b7717e1616
@ -27,12 +27,12 @@ functions scheduled by the Controller.
|
||||
import abc
|
||||
import collections
|
||||
import logging
|
||||
from monotonic import monotonic as now # noqa
|
||||
import os
|
||||
import platform
|
||||
import random
|
||||
import sys
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import proton
|
||||
@ -878,7 +878,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
# methods executed by Tasks created by the driver:
|
||||
|
||||
def send(self, send_task):
|
||||
if send_task.deadline and send_task.deadline <= time.time():
|
||||
if send_task.deadline and send_task.deadline <= now():
|
||||
send_task._on_timeout()
|
||||
return
|
||||
LOG.debug("Sending message to %s", send_task.target)
|
||||
|
@ -25,11 +25,12 @@ the background thread via callables.
|
||||
import errno
|
||||
import heapq
|
||||
import logging
|
||||
import math
|
||||
from monotonic import monotonic as now # noqa
|
||||
import os
|
||||
import select
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import pyngus
|
||||
@ -39,6 +40,12 @@ from oslo_messaging._i18n import _LE, _LI, _LW
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
def compute_timeout(offset):
|
||||
# minimize the timer granularity to one second so we don't have to track
|
||||
# too many timers
|
||||
return math.ceil(now() + offset)
|
||||
|
||||
|
||||
class _SocketConnection(object):
|
||||
"""Associates a pyngus Connection with a python network socket,
|
||||
and handles all connection-related I/O and timer events.
|
||||
@ -65,7 +72,7 @@ class _SocketConnection(object):
|
||||
while True:
|
||||
try:
|
||||
rc = pyngus.read_socket_input(self.connection, self.socket)
|
||||
self.connection.process(time.time())
|
||||
self.connection.process(now())
|
||||
return rc
|
||||
except (socket.timeout, socket.error) as e:
|
||||
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
|
||||
@ -79,7 +86,7 @@ class _SocketConnection(object):
|
||||
while True:
|
||||
try:
|
||||
rc = pyngus.write_socket_output(self.connection, self.socket)
|
||||
self.connection.process(time.time())
|
||||
self.connection.process(now())
|
||||
return rc
|
||||
except (socket.timeout, socket.error) as e:
|
||||
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
|
||||
@ -161,52 +168,70 @@ class Scheduler(object):
|
||||
"""Schedule callables to be run in the future.
|
||||
"""
|
||||
class Event(object):
|
||||
def __init__(self, callback, deadline):
|
||||
self._callback = callback
|
||||
self._deadline = deadline
|
||||
# simply hold a reference to a callback that can be set to None if the
|
||||
# alarm is canceled
|
||||
def __init__(self, callback):
|
||||
self.callback = callback
|
||||
|
||||
def cancel(self):
|
||||
# quicker than rebalancing the tree
|
||||
self._callback = None
|
||||
|
||||
def __lt__(self, other):
|
||||
return self._deadline < other._deadline
|
||||
self.callback = None
|
||||
|
||||
def __init__(self):
|
||||
self._entries = []
|
||||
self._callbacks = {}
|
||||
self._deadlines = []
|
||||
|
||||
def alarm(self, request, deadline):
|
||||
"""Request a callable be executed at a specific time
|
||||
"""
|
||||
entry = Scheduler.Event(request, deadline)
|
||||
heapq.heappush(self._entries, entry)
|
||||
try:
|
||||
callbacks = self._callbacks[deadline]
|
||||
except KeyError:
|
||||
callbacks = list()
|
||||
self._callbacks[deadline] = callbacks
|
||||
heapq.heappush(self._deadlines, deadline)
|
||||
entry = Scheduler.Event(request)
|
||||
callbacks.append(entry)
|
||||
return entry
|
||||
|
||||
def defer(self, request, delay):
|
||||
"""Request a callable be executed after delay seconds
|
||||
"""
|
||||
return self.alarm(request, time.time() + delay)
|
||||
return self.alarm(request, compute_timeout(delay))
|
||||
|
||||
@property
|
||||
def _next_deadline(self):
|
||||
"""The timestamp of the next expiring event or None
|
||||
"""
|
||||
return self._deadlines[0] if self._deadlines else None
|
||||
|
||||
def _get_delay(self, max_delay=None):
|
||||
"""Get the delay in milliseconds until the next callable needs to be
|
||||
run, or 'max_delay' if no outstanding callables or the delay to the
|
||||
next callable is > 'max_delay'.
|
||||
"""
|
||||
due = self._entries[0]._deadline if self._entries else None
|
||||
due = self._deadlines[0] if self._deadlines else None
|
||||
if due is None:
|
||||
return max_delay
|
||||
now = time.time()
|
||||
if due <= now:
|
||||
_now = now()
|
||||
if due <= _now:
|
||||
return 0
|
||||
else:
|
||||
return min(due - now, max_delay) if max_delay else due - now
|
||||
return min(due - _now, max_delay) if max_delay else due - _now
|
||||
|
||||
def _process(self):
|
||||
"""Invoke all expired callables."""
|
||||
while self._entries and self._entries[0]._deadline <= time.time():
|
||||
callback = heapq.heappop(self._entries)._callback
|
||||
if callback:
|
||||
callback()
|
||||
if self._deadlines:
|
||||
_now = now()
|
||||
try:
|
||||
while self._deadlines[0] <= _now:
|
||||
deadline = heapq.heappop(self._deadlines)
|
||||
callbacks = self._callbacks[deadline]
|
||||
del self._callbacks[deadline]
|
||||
for cb in callbacks:
|
||||
cb.callback and cb.callback()
|
||||
except IndexError:
|
||||
pass
|
||||
|
||||
|
||||
class Requests(object):
|
||||
@ -325,15 +350,17 @@ class Thread(threading.Thread):
|
||||
readfds.append(self._requests)
|
||||
writefds = [c.user_context for c in writers]
|
||||
|
||||
timeout = None
|
||||
if timers:
|
||||
deadline = timers[0].deadline # 0 == next expiring timer
|
||||
now = time.time()
|
||||
timeout = 0 if deadline <= now else deadline - now
|
||||
|
||||
# adjust timeout for any deferred requests
|
||||
timeout = self._scheduler._get_delay(timeout)
|
||||
# force select to return in time to service the next expiring timer
|
||||
d1 = self._scheduler._next_deadline
|
||||
d2 = timers[0].deadline if timers else None
|
||||
deadline = min(d1, d2) if d1 and d2 else d1 if not d2 else d2
|
||||
if deadline:
|
||||
_now = now()
|
||||
timeout = 0 if deadline <= _now else (deadline - _now)
|
||||
else:
|
||||
timeout = None
|
||||
|
||||
# and now we wait...
|
||||
try:
|
||||
results = select.select(readfds, writefds, [], timeout)
|
||||
except select.error as serror:
|
||||
@ -348,10 +375,12 @@ class Thread(threading.Thread):
|
||||
for r in readable:
|
||||
r.read()
|
||||
|
||||
for t in timers:
|
||||
if t.deadline > time.time():
|
||||
break
|
||||
t.process(time.time())
|
||||
if timers:
|
||||
_now = now()
|
||||
for t in timers:
|
||||
if t.deadline > _now:
|
||||
break
|
||||
t.process(_now)
|
||||
|
||||
for w in writable:
|
||||
w.write()
|
||||
|
@ -125,13 +125,13 @@ amqp1_opts = [
|
||||
' Only used when caller does not provide a timeout expiry.'),
|
||||
|
||||
cfg.IntOpt('default_send_timeout',
|
||||
default=60,
|
||||
default=30,
|
||||
min=5,
|
||||
help='The deadline for an rpc cast or call message delivery.'
|
||||
' Only used when caller does not provide a timeout expiry.'),
|
||||
|
||||
cfg.IntOpt('default_notify_timeout',
|
||||
default=60,
|
||||
default=30,
|
||||
min=5,
|
||||
help='The deadline for a sent notification message delivery.'
|
||||
' Only used when caller does not provide a timeout expiry.'),
|
||||
|
@ -24,7 +24,6 @@ import collections
|
||||
import logging
|
||||
import os
|
||||
import threading
|
||||
import time
|
||||
import uuid
|
||||
|
||||
from oslo_config import cfg
|
||||
@ -33,6 +32,7 @@ from oslo_serialization import jsonutils
|
||||
from oslo_utils import importutils
|
||||
from oslo_utils import timeutils
|
||||
|
||||
from oslo_messaging._drivers.amqp1_driver.eventloop import compute_timeout
|
||||
from oslo_messaging._drivers.amqp1_driver import opts
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers import common
|
||||
@ -105,14 +105,14 @@ class ProtonIncomingMessage(base.RpcIncomingMessage):
|
||||
response.correlation_id = self._correlation_id
|
||||
LOG.debug("Sending RPC reply to %s (%s)", self._reply_to,
|
||||
self._correlation_id)
|
||||
now = time.time()
|
||||
deadline = now + self.listener.driver._default_reply_timeout
|
||||
driver = self.listener.driver
|
||||
deadline = compute_timeout(driver._default_reply_timeout)
|
||||
task = controller.SendTask("RPC Reply", response, self._reply_to,
|
||||
# analogous to kombu missing dest t/o:
|
||||
deadline,
|
||||
retry=0,
|
||||
wait_for_ack=True)
|
||||
self.listener.driver._ctrl.add_task(task)
|
||||
driver._ctrl.add_task(task)
|
||||
rc = task.wait()
|
||||
if rc:
|
||||
# something failed. Not much we can do at this point but log
|
||||
@ -289,7 +289,7 @@ class ProtonDriver(base.BaseDriver):
|
||||
request = marshal_request(message, ctxt, envelope)
|
||||
expire = 0
|
||||
if timeout:
|
||||
expire = time.time() + timeout # when the caller times out
|
||||
expire = compute_timeout(timeout) # when the caller times out
|
||||
# amqp uses millisecond time values, timeout is seconds
|
||||
request.ttl = int(timeout * 1000)
|
||||
request.expiry_time = int(expire * 1000)
|
||||
@ -297,7 +297,7 @@ class ProtonDriver(base.BaseDriver):
|
||||
# no timeout provided by application. If the backend is queueless
|
||||
# this could lead to a hang - provide a default to prevent this
|
||||
# TODO(kgiusti) only do this if brokerless backend
|
||||
expire = time.time() + self._default_send_timeout
|
||||
expire = compute_timeout(self._default_send_timeout)
|
||||
LOG.debug("Sending message to %s", target)
|
||||
if wait_for_reply:
|
||||
task = controller.RPCCallTask(target, request, expire, retry)
|
||||
@ -343,9 +343,10 @@ class ProtonDriver(base.BaseDriver):
|
||||
# this
|
||||
# TODO(kgiusti) should raise NotImplemented if not broker backend
|
||||
LOG.debug("Send notification to %s", target)
|
||||
deadline = compute_timeout(self._default_notify_timeout)
|
||||
task = controller.SendTask("Notify", request, target,
|
||||
time.time() + self._default_notify_timeout,
|
||||
retry, wait_for_ack=True, notification=True)
|
||||
deadline, retry, wait_for_ack=True,
|
||||
notification=True)
|
||||
self._ctrl.add_task(task)
|
||||
rc = task.wait()
|
||||
if isinstance(rc, Exception):
|
||||
|
@ -357,7 +357,7 @@ class TestAmqpSend(_AmqpBrokerTestCaseAuto):
|
||||
driver = amqp_driver.ProtonDriver(self.conf, self._broker_url)
|
||||
target = oslo_messaging.Target(topic="test-topic")
|
||||
listener = _SlowResponder(
|
||||
driver.listen(target, None, None)._poll_style_listener, 1)
|
||||
driver.listen(target, None, None)._poll_style_listener, 3)
|
||||
|
||||
self.assertRaises(oslo_messaging.MessagingTimeout,
|
||||
driver.send, target,
|
||||
|
@ -14,6 +14,7 @@ oslo.service>=1.10.0 # Apache-2.0
|
||||
oslo.i18n>=2.1.0 # Apache-2.0
|
||||
stevedore>=1.10.0 # Apache-2.0
|
||||
debtcollector>=1.2.0 # Apache-2.0
|
||||
monotonic>=0.6 # Apache-2.0
|
||||
|
||||
# for jsonutils
|
||||
six>=1.9.0 # MIT
|
||||
|
Loading…
x
Reference in New Issue
Block a user