[AMQP 1.0] Simplify the I/O event loop code

Leverage the fact that there is never more than one socket active to
simplify the event loop's I/O processing.  This removes the need to
look up active connections.  Also removes the need to query the return
values from select since there is only one socket to process. This
change improves RPC throughput under the simulator by up to 20%

As part of this change I've clarified the code's use of the pyngus
connection by renaming the connection property to something a bit more
specific.

Change-Id: If7c020bb0bd96490af78bc06659db10073b02417
This commit is contained in:
Kenneth Giusti 2016-11-28 16:04:27 -05:00
parent 13dffe9913
commit 41e23c2436
2 changed files with 79 additions and 68 deletions

View File

@ -870,7 +870,7 @@ class Controller(pyngus.ConnectionEventHandler):
self.link_retry_delay, send_task.service)
self._all_senders[key] = sender
if self.reply_link and self.reply_link.active:
sender.attach(self._socket_connection.connection,
sender.attach(self._socket_connection.pyngus_conn,
self.reply_link, self.addresser)
self._active_senders.add(key)
sender.send_message(send_task)
@ -901,7 +901,7 @@ class Controller(pyngus.ConnectionEventHandler):
self._servers[key] = servers
servers[subscribe_task._subscriber_id] = server
if self._active:
server.attach(self._socket_connection.connection,
server.attach(self._socket_connection.pyngus_conn,
self.addresser)
# commands executed on the processor (eventloop) via 'wakeup()':
@ -980,7 +980,7 @@ class Controller(pyngus.ConnectionEventHandler):
self._detach_senders()
self._detach_servers()
self.reply_link.detach()
self._socket_connection.connection.close()
self._socket_connection.pyngus_conn.close()
else:
# don't wait for a close from the remote, may never happen
self.processor.shutdown()
@ -996,7 +996,7 @@ class Controller(pyngus.ConnectionEventHandler):
{'hostname': self.hosts.current.hostname,
'port': self.hosts.current.port})
for sender in itervalues(self._all_senders):
sender.attach(self._socket_connection.connection,
sender.attach(self._socket_connection.pyngus_conn,
self.reply_link, self.addresser)
def _reply_link_down(self):
@ -1005,7 +1005,7 @@ class Controller(pyngus.ConnectionEventHandler):
if not self._closing:
self._detach_senders()
self._detach_servers()
self._socket_connection.connection.close()
self._socket_connection.pyngus_conn.close()
# once closed, _handle_connection_loss() will initiate reconnect
# callback from eventloop on socket error
@ -1022,7 +1022,7 @@ class Controller(pyngus.ConnectionEventHandler):
"""This is a Pyngus callback, invoked by Pyngus when a non-recoverable
error occurs on the connection.
"""
if connection is not self._socket_connection.connection:
if connection is not self._socket_connection.pyngus_conn:
# pyngus bug: ignore failure callback on destroyed connections
return
LOG.debug("AMQP Connection failure: %s", error)
@ -1042,9 +1042,9 @@ class Controller(pyngus.ConnectionEventHandler):
self.addresser = self.addresser_factory(props)
for servers in itervalues(self._servers):
for server in itervalues(servers):
server.attach(self._socket_connection.connection,
server.attach(self._socket_connection.pyngus_conn,
self.addresser)
self.reply_link = Replies(self._socket_connection.connection,
self.reply_link = Replies(self._socket_connection.pyngus_conn,
self._reply_link_ready,
self._reply_link_down,
self._reply_credit)
@ -1075,7 +1075,7 @@ class Controller(pyngus.ConnectionEventHandler):
self._detach_senders()
self._detach_servers()
self.reply_link.detach()
self._socket_connection.connection.close()
self._socket_connection.pyngus_conn.close()
def sasl_done(self, connection, pn_sasl, outcome):
"""This is a Pyngus callback invoked when the SASL handshake
@ -1189,4 +1189,4 @@ class Controller(pyngus.ConnectionEventHandler):
def _active(self):
# Is the connection up
return (self._socket_connection
and self._socket_connection.connection.active)
and self._socket_connection.pyngus_conn.active)

View File

@ -22,6 +22,7 @@ protocol specific intelligence is provided by the Controller and executed on
the background thread via callables.
"""
import collections
import errno
import heapq
import logging
@ -34,7 +35,6 @@ import threading
import uuid
import pyngus
from six import moves
from oslo_messaging._i18n import _LE, _LI, _LW
LOG = logging.getLogger(__name__)
@ -54,44 +54,44 @@ class _SocketConnection(object):
def __init__(self, name, container, properties, handler):
self.name = name
self.socket = None
self.pyngus_conn = None
self._properties = properties
# The handler is a pyngus ConnectionEventHandler, which is invoked by
# pyngus on connection-related events (active, closed, error, etc).
# Currently it is the Controller object.
self._handler = handler
self._container = container
self.connection = None
def fileno(self):
"""Allows use of a _SocketConnection in a select() call.
"""
return self.socket.fileno()
def read(self):
"""Called when socket is read-ready."""
def read_socket(self):
"""Called to read from the socket."""
while True:
try:
rc = pyngus.read_socket_input(self.connection, self.socket)
self.connection.process(now())
rc = pyngus.read_socket_input(self.pyngus_conn, self.socket)
self.pyngus_conn.process(now())
return rc
except (socket.timeout, socket.error) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.connection.close_input()
self.connection.close_output()
self.pyngus_conn.close_input()
self.pyngus_conn.close_output()
self._handler.socket_error(str(e))
return pyngus.Connection.EOS
def write(self):
"""Called when socket is write-ready."""
def write_socket(self):
"""Called to write to the socket."""
while True:
try:
rc = pyngus.write_socket_output(self.connection, self.socket)
self.connection.process(now())
rc = pyngus.write_socket_output(self.pyngus_conn, self.socket)
self.pyngus_conn.process(now())
return rc
except (socket.timeout, socket.error) as e:
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
self.connection.close_output()
self.connection.close_input()
self.pyngus_conn.close_output()
self.pyngus_conn.close_input()
self._handler.socket_error(str(e))
return pyngus.Connection.EOS
@ -127,15 +127,16 @@ class _SocketConnection(object):
props['x-username'] = host.username
props['x-password'] = host.password or ""
c = self._container.create_connection(self.name, self._handler, props)
c.user_context = self
self.connection = c
self.pyngus_conn = self._container.create_connection(self.name,
self._handler,
props)
self.pyngus_conn.user_context = self
if pyngus.VERSION < (2, 0, 0):
# older versions of pyngus requires manual SASL configuration:
# determine the proper SASL mechanism: PLAIN if a username/password
# is present, else ANONYMOUS
pn_sasl = self.connection.pn_sasl
pn_sasl = self.pyngus_conn.pn_sasl
if host.username:
password = host.password if host.password else ""
pn_sasl.plain(host.username, password)
@ -143,7 +144,7 @@ class _SocketConnection(object):
pn_sasl.mechanisms("ANONYMOUS")
pn_sasl.client()
self.connection.open()
self.pyngus_conn.open()
def reset(self, name=None):
"""Clean up the current state, expect 'connect()' to be recalled
@ -151,9 +152,9 @@ class _SocketConnection(object):
"""
# note well: since destroy() is called on the connection, do not invoke
# this method from a pyngus callback!
if self.connection:
self.connection.destroy()
self.connection = None
if self.pyngus_conn:
self.pyngus_conn.destroy()
self.pyngus_conn = None
self.close()
if name:
self.name = name
@ -239,30 +240,36 @@ class Requests(object):
loop.
"""
def __init__(self):
self._requests = moves.queue.Queue(maxsize=10)
self._requests = collections.deque()
self._wakeup_pipe = os.pipe()
self._pipe_ready = False # prevents blocking on an empty pipe
self._pipe_lock = threading.Lock()
def wakeup(self, request=None):
"""Enqueue a callable to be executed by the eventloop, and force the
eventloop thread to wake up from select().
"""
if request:
self._requests.put(request)
os.write(self._wakeup_pipe[1], b'!')
with self._pipe_lock:
if request:
self._requests.append(request)
if not self._pipe_ready:
self._pipe_ready = True
os.write(self._wakeup_pipe[1], b'!')
def fileno(self):
"""Allows this request queue to be used by select()."""
return self._wakeup_pipe[0]
def read(self):
def process_requests(self):
"""Invoked by the eventloop thread, execute each queued callable."""
os.read(self._wakeup_pipe[0], 512)
# first pop of all current tasks
requests = []
while not self._requests.empty():
requests.append(self._requests.get())
# then process them, this allows callables to re-register themselves to
# be run on the next iteration of the I/O loop
with self._pipe_lock:
if not self._pipe_ready:
return
self._pipe_ready = False
os.read(self._wakeup_pipe[0], 512)
requests = self._requests
self._requests = collections.deque()
for r in requests:
r()
@ -279,6 +286,8 @@ class Thread(threading.Thread):
# delayed callables (only used on this thread for now):
self._scheduler = Scheduler()
self._connection = None
# Configure a container
if container_name is None:
container_name = ("openstack.org/om/container/%s/%s/%s/%s" %
@ -334,6 +343,7 @@ class Thread(threading.Thread):
sc = _SocketConnection(key, self._container,
properties, handler=handler)
sc.connect(host)
self._connection = sc
return sc
def run(self):
@ -342,18 +352,22 @@ class Thread(threading.Thread):
self._container.name)
while not self._shutdown:
readers, writers, timers = self._container.need_processing()
readfds = [c.user_context for c in readers]
# additionally, always check for readability of pipe we
# are using to wakeup processing thread by other threads
readfds.append(self._requests)
writefds = [c.user_context for c in writers]
readfds = [self._requests]
writefds = []
deadline = self._scheduler._next_deadline
pyngus_conn = self._connection and self._connection.pyngus_conn
if pyngus_conn:
if pyngus_conn.needs_input:
readfds.append(self._connection)
if pyngus_conn.has_output:
writefds.append(self._connection)
if pyngus_conn.deadline:
deadline = (pyngus_conn.deadline if not deadline else
min(deadline, pyngus_conn.deadline))
# force select to return in time to service the next expiring timer
d1 = self._scheduler._next_deadline
d2 = timers[0].deadline if timers else None
deadline = min(d1, d2) if d1 and d2 else d1 if not d2 else d2
if deadline:
_now = now()
timeout = 0 if deadline <= _now else (deadline - _now)
@ -362,7 +376,7 @@ class Thread(threading.Thread):
# and now we wait...
try:
results = select.select(readfds, writefds, [], timeout)
select.select(readfds, writefds, [], timeout)
except select.error as serror:
if serror[0] == errno.EINTR:
LOG.warning(_LW("ignoring interrupt from select(): %s"),
@ -370,20 +384,17 @@ class Thread(threading.Thread):
continue
raise # assuming fatal...
readable, writable, ignore = results
for r in readable:
r.read()
if timers:
_now = now()
for t in timers:
if t.deadline > _now:
break
t.process(_now)
for w in writable:
w.write()
# Ignore the select return value - simply poll the socket for I/O.
# Testing shows that polling improves latency over checking the
# lists returned by select()
self._requests.process_requests()
if pyngus_conn:
self._connection.read_socket()
if pyngus_conn.deadline:
_now = now()
if pyngus_conn.deadline <= _now:
pyngus_conn.process(_now)
self._connection.write_socket()
self._scheduler._process() # run any deferred requests