diff --git a/oslo_messaging/_drivers/amqp1_driver/controller.py b/oslo_messaging/_drivers/amqp1_driver/controller.py index 007ca3a88..4a9361ceb 100644 --- a/oslo_messaging/_drivers/amqp1_driver/controller.py +++ b/oslo_messaging/_drivers/amqp1_driver/controller.py @@ -870,7 +870,7 @@ class Controller(pyngus.ConnectionEventHandler): self.link_retry_delay, send_task.service) self._all_senders[key] = sender if self.reply_link and self.reply_link.active: - sender.attach(self._socket_connection.connection, + sender.attach(self._socket_connection.pyngus_conn, self.reply_link, self.addresser) self._active_senders.add(key) sender.send_message(send_task) @@ -901,7 +901,7 @@ class Controller(pyngus.ConnectionEventHandler): self._servers[key] = servers servers[subscribe_task._subscriber_id] = server if self._active: - server.attach(self._socket_connection.connection, + server.attach(self._socket_connection.pyngus_conn, self.addresser) # commands executed on the processor (eventloop) via 'wakeup()': @@ -980,7 +980,7 @@ class Controller(pyngus.ConnectionEventHandler): self._detach_senders() self._detach_servers() self.reply_link.detach() - self._socket_connection.connection.close() + self._socket_connection.pyngus_conn.close() else: # don't wait for a close from the remote, may never happen self.processor.shutdown() @@ -996,7 +996,7 @@ class Controller(pyngus.ConnectionEventHandler): {'hostname': self.hosts.current.hostname, 'port': self.hosts.current.port}) for sender in itervalues(self._all_senders): - sender.attach(self._socket_connection.connection, + sender.attach(self._socket_connection.pyngus_conn, self.reply_link, self.addresser) def _reply_link_down(self): @@ -1005,7 +1005,7 @@ class Controller(pyngus.ConnectionEventHandler): if not self._closing: self._detach_senders() self._detach_servers() - self._socket_connection.connection.close() + self._socket_connection.pyngus_conn.close() # once closed, _handle_connection_loss() will initiate reconnect # callback from eventloop on socket error @@ -1022,7 +1022,7 @@ class Controller(pyngus.ConnectionEventHandler): """This is a Pyngus callback, invoked by Pyngus when a non-recoverable error occurs on the connection. """ - if connection is not self._socket_connection.connection: + if connection is not self._socket_connection.pyngus_conn: # pyngus bug: ignore failure callback on destroyed connections return LOG.debug("AMQP Connection failure: %s", error) @@ -1042,9 +1042,9 @@ class Controller(pyngus.ConnectionEventHandler): self.addresser = self.addresser_factory(props) for servers in itervalues(self._servers): for server in itervalues(servers): - server.attach(self._socket_connection.connection, + server.attach(self._socket_connection.pyngus_conn, self.addresser) - self.reply_link = Replies(self._socket_connection.connection, + self.reply_link = Replies(self._socket_connection.pyngus_conn, self._reply_link_ready, self._reply_link_down, self._reply_credit) @@ -1075,7 +1075,7 @@ class Controller(pyngus.ConnectionEventHandler): self._detach_senders() self._detach_servers() self.reply_link.detach() - self._socket_connection.connection.close() + self._socket_connection.pyngus_conn.close() def sasl_done(self, connection, pn_sasl, outcome): """This is a Pyngus callback invoked when the SASL handshake @@ -1189,4 +1189,4 @@ class Controller(pyngus.ConnectionEventHandler): def _active(self): # Is the connection up return (self._socket_connection - and self._socket_connection.connection.active) + and self._socket_connection.pyngus_conn.active) diff --git a/oslo_messaging/_drivers/amqp1_driver/eventloop.py b/oslo_messaging/_drivers/amqp1_driver/eventloop.py index 71dce0827..0f3b5da02 100644 --- a/oslo_messaging/_drivers/amqp1_driver/eventloop.py +++ b/oslo_messaging/_drivers/amqp1_driver/eventloop.py @@ -22,6 +22,7 @@ protocol specific intelligence is provided by the Controller and executed on the background thread via callables. """ +import collections import errno import heapq import logging @@ -34,7 +35,6 @@ import threading import uuid import pyngus -from six import moves from oslo_messaging._i18n import _LE, _LI, _LW LOG = logging.getLogger(__name__) @@ -54,44 +54,44 @@ class _SocketConnection(object): def __init__(self, name, container, properties, handler): self.name = name self.socket = None + self.pyngus_conn = None self._properties = properties # The handler is a pyngus ConnectionEventHandler, which is invoked by # pyngus on connection-related events (active, closed, error, etc). # Currently it is the Controller object. self._handler = handler self._container = container - self.connection = None def fileno(self): """Allows use of a _SocketConnection in a select() call. """ return self.socket.fileno() - def read(self): - """Called when socket is read-ready.""" + def read_socket(self): + """Called to read from the socket.""" while True: try: - rc = pyngus.read_socket_input(self.connection, self.socket) - self.connection.process(now()) + rc = pyngus.read_socket_input(self.pyngus_conn, self.socket) + self.pyngus_conn.process(now()) return rc except (socket.timeout, socket.error) as e: # pyngus handles EAGAIN/EWOULDBLOCK and EINTER - self.connection.close_input() - self.connection.close_output() + self.pyngus_conn.close_input() + self.pyngus_conn.close_output() self._handler.socket_error(str(e)) return pyngus.Connection.EOS - def write(self): - """Called when socket is write-ready.""" + def write_socket(self): + """Called to write to the socket.""" while True: try: - rc = pyngus.write_socket_output(self.connection, self.socket) - self.connection.process(now()) + rc = pyngus.write_socket_output(self.pyngus_conn, self.socket) + self.pyngus_conn.process(now()) return rc except (socket.timeout, socket.error) as e: # pyngus handles EAGAIN/EWOULDBLOCK and EINTER - self.connection.close_output() - self.connection.close_input() + self.pyngus_conn.close_output() + self.pyngus_conn.close_input() self._handler.socket_error(str(e)) return pyngus.Connection.EOS @@ -127,15 +127,16 @@ class _SocketConnection(object): props['x-username'] = host.username props['x-password'] = host.password or "" - c = self._container.create_connection(self.name, self._handler, props) - c.user_context = self - self.connection = c + self.pyngus_conn = self._container.create_connection(self.name, + self._handler, + props) + self.pyngus_conn.user_context = self if pyngus.VERSION < (2, 0, 0): # older versions of pyngus requires manual SASL configuration: # determine the proper SASL mechanism: PLAIN if a username/password # is present, else ANONYMOUS - pn_sasl = self.connection.pn_sasl + pn_sasl = self.pyngus_conn.pn_sasl if host.username: password = host.password if host.password else "" pn_sasl.plain(host.username, password) @@ -143,7 +144,7 @@ class _SocketConnection(object): pn_sasl.mechanisms("ANONYMOUS") pn_sasl.client() - self.connection.open() + self.pyngus_conn.open() def reset(self, name=None): """Clean up the current state, expect 'connect()' to be recalled @@ -151,9 +152,9 @@ class _SocketConnection(object): """ # note well: since destroy() is called on the connection, do not invoke # this method from a pyngus callback! - if self.connection: - self.connection.destroy() - self.connection = None + if self.pyngus_conn: + self.pyngus_conn.destroy() + self.pyngus_conn = None self.close() if name: self.name = name @@ -239,30 +240,36 @@ class Requests(object): loop. """ def __init__(self): - self._requests = moves.queue.Queue(maxsize=10) + self._requests = collections.deque() self._wakeup_pipe = os.pipe() + self._pipe_ready = False # prevents blocking on an empty pipe + self._pipe_lock = threading.Lock() def wakeup(self, request=None): """Enqueue a callable to be executed by the eventloop, and force the eventloop thread to wake up from select(). """ - if request: - self._requests.put(request) - os.write(self._wakeup_pipe[1], b'!') + with self._pipe_lock: + if request: + self._requests.append(request) + if not self._pipe_ready: + self._pipe_ready = True + os.write(self._wakeup_pipe[1], b'!') def fileno(self): """Allows this request queue to be used by select().""" return self._wakeup_pipe[0] - def read(self): + def process_requests(self): """Invoked by the eventloop thread, execute each queued callable.""" - os.read(self._wakeup_pipe[0], 512) - # first pop of all current tasks - requests = [] - while not self._requests.empty(): - requests.append(self._requests.get()) - # then process them, this allows callables to re-register themselves to - # be run on the next iteration of the I/O loop + with self._pipe_lock: + if not self._pipe_ready: + return + self._pipe_ready = False + os.read(self._wakeup_pipe[0], 512) + requests = self._requests + self._requests = collections.deque() + for r in requests: r() @@ -279,6 +286,8 @@ class Thread(threading.Thread): # delayed callables (only used on this thread for now): self._scheduler = Scheduler() + self._connection = None + # Configure a container if container_name is None: container_name = ("openstack.org/om/container/%s/%s/%s/%s" % @@ -334,6 +343,7 @@ class Thread(threading.Thread): sc = _SocketConnection(key, self._container, properties, handler=handler) sc.connect(host) + self._connection = sc return sc def run(self): @@ -342,18 +352,22 @@ class Thread(threading.Thread): self._container.name) while not self._shutdown: - readers, writers, timers = self._container.need_processing() - readfds = [c.user_context for c in readers] - # additionally, always check for readability of pipe we - # are using to wakeup processing thread by other threads - readfds.append(self._requests) - writefds = [c.user_context for c in writers] + readfds = [self._requests] + writefds = [] + deadline = self._scheduler._next_deadline + + pyngus_conn = self._connection and self._connection.pyngus_conn + if pyngus_conn: + if pyngus_conn.needs_input: + readfds.append(self._connection) + if pyngus_conn.has_output: + writefds.append(self._connection) + if pyngus_conn.deadline: + deadline = (pyngus_conn.deadline if not deadline else + min(deadline, pyngus_conn.deadline)) # force select to return in time to service the next expiring timer - d1 = self._scheduler._next_deadline - d2 = timers[0].deadline if timers else None - deadline = min(d1, d2) if d1 and d2 else d1 if not d2 else d2 if deadline: _now = now() timeout = 0 if deadline <= _now else (deadline - _now) @@ -362,7 +376,7 @@ class Thread(threading.Thread): # and now we wait... try: - results = select.select(readfds, writefds, [], timeout) + select.select(readfds, writefds, [], timeout) except select.error as serror: if serror[0] == errno.EINTR: LOG.warning(_LW("ignoring interrupt from select(): %s"), @@ -370,20 +384,17 @@ class Thread(threading.Thread): continue raise # assuming fatal... - readable, writable, ignore = results - - for r in readable: - r.read() - - if timers: - _now = now() - for t in timers: - if t.deadline > _now: - break - t.process(_now) - - for w in writable: - w.write() + # Ignore the select return value - simply poll the socket for I/O. + # Testing shows that polling improves latency over checking the + # lists returned by select() + self._requests.process_requests() + if pyngus_conn: + self._connection.read_socket() + if pyngus_conn.deadline: + _now = now() + if pyngus_conn.deadline <= _now: + pyngus_conn.process(_now) + self._connection.write_socket() self._scheduler._process() # run any deferred requests