Merge "[AMQP 1.0] Simplify the I/O event loop code"
This commit is contained in:
commit
c805618d9a
@ -870,7 +870,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self.link_retry_delay, send_task.service)
|
||||
self._all_senders[key] = sender
|
||||
if self.reply_link and self.reply_link.active:
|
||||
sender.attach(self._socket_connection.connection,
|
||||
sender.attach(self._socket_connection.pyngus_conn,
|
||||
self.reply_link, self.addresser)
|
||||
self._active_senders.add(key)
|
||||
sender.send_message(send_task)
|
||||
@ -901,7 +901,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self._servers[key] = servers
|
||||
servers[subscribe_task._subscriber_id] = server
|
||||
if self._active:
|
||||
server.attach(self._socket_connection.connection,
|
||||
server.attach(self._socket_connection.pyngus_conn,
|
||||
self.addresser)
|
||||
|
||||
# commands executed on the processor (eventloop) via 'wakeup()':
|
||||
@ -980,7 +980,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self._detach_senders()
|
||||
self._detach_servers()
|
||||
self.reply_link.detach()
|
||||
self._socket_connection.connection.close()
|
||||
self._socket_connection.pyngus_conn.close()
|
||||
else:
|
||||
# don't wait for a close from the remote, may never happen
|
||||
self.processor.shutdown()
|
||||
@ -996,7 +996,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
{'hostname': self.hosts.current.hostname,
|
||||
'port': self.hosts.current.port})
|
||||
for sender in itervalues(self._all_senders):
|
||||
sender.attach(self._socket_connection.connection,
|
||||
sender.attach(self._socket_connection.pyngus_conn,
|
||||
self.reply_link, self.addresser)
|
||||
|
||||
def _reply_link_down(self):
|
||||
@ -1005,7 +1005,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
if not self._closing:
|
||||
self._detach_senders()
|
||||
self._detach_servers()
|
||||
self._socket_connection.connection.close()
|
||||
self._socket_connection.pyngus_conn.close()
|
||||
# once closed, _handle_connection_loss() will initiate reconnect
|
||||
|
||||
# callback from eventloop on socket error
|
||||
@ -1022,7 +1022,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
"""This is a Pyngus callback, invoked by Pyngus when a non-recoverable
|
||||
error occurs on the connection.
|
||||
"""
|
||||
if connection is not self._socket_connection.connection:
|
||||
if connection is not self._socket_connection.pyngus_conn:
|
||||
# pyngus bug: ignore failure callback on destroyed connections
|
||||
return
|
||||
LOG.debug("AMQP Connection failure: %s", error)
|
||||
@ -1042,9 +1042,9 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self.addresser = self.addresser_factory(props)
|
||||
for servers in itervalues(self._servers):
|
||||
for server in itervalues(servers):
|
||||
server.attach(self._socket_connection.connection,
|
||||
server.attach(self._socket_connection.pyngus_conn,
|
||||
self.addresser)
|
||||
self.reply_link = Replies(self._socket_connection.connection,
|
||||
self.reply_link = Replies(self._socket_connection.pyngus_conn,
|
||||
self._reply_link_ready,
|
||||
self._reply_link_down,
|
||||
self._reply_credit)
|
||||
@ -1075,7 +1075,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
self._detach_senders()
|
||||
self._detach_servers()
|
||||
self.reply_link.detach()
|
||||
self._socket_connection.connection.close()
|
||||
self._socket_connection.pyngus_conn.close()
|
||||
|
||||
def sasl_done(self, connection, pn_sasl, outcome):
|
||||
"""This is a Pyngus callback invoked when the SASL handshake
|
||||
@ -1189,4 +1189,4 @@ class Controller(pyngus.ConnectionEventHandler):
|
||||
def _active(self):
|
||||
# Is the connection up
|
||||
return (self._socket_connection
|
||||
and self._socket_connection.connection.active)
|
||||
and self._socket_connection.pyngus_conn.active)
|
||||
|
@ -22,6 +22,7 @@ protocol specific intelligence is provided by the Controller and executed on
|
||||
the background thread via callables.
|
||||
"""
|
||||
|
||||
import collections
|
||||
import errno
|
||||
import heapq
|
||||
import logging
|
||||
@ -34,7 +35,6 @@ import threading
|
||||
import uuid
|
||||
|
||||
import pyngus
|
||||
from six import moves
|
||||
|
||||
from oslo_messaging._i18n import _LE, _LI, _LW
|
||||
LOG = logging.getLogger(__name__)
|
||||
@ -54,44 +54,44 @@ class _SocketConnection(object):
|
||||
def __init__(self, name, container, properties, handler):
|
||||
self.name = name
|
||||
self.socket = None
|
||||
self.pyngus_conn = None
|
||||
self._properties = properties
|
||||
# The handler is a pyngus ConnectionEventHandler, which is invoked by
|
||||
# pyngus on connection-related events (active, closed, error, etc).
|
||||
# Currently it is the Controller object.
|
||||
self._handler = handler
|
||||
self._container = container
|
||||
self.connection = None
|
||||
|
||||
def fileno(self):
|
||||
"""Allows use of a _SocketConnection in a select() call.
|
||||
"""
|
||||
return self.socket.fileno()
|
||||
|
||||
def read(self):
|
||||
"""Called when socket is read-ready."""
|
||||
def read_socket(self):
|
||||
"""Called to read from the socket."""
|
||||
while True:
|
||||
try:
|
||||
rc = pyngus.read_socket_input(self.connection, self.socket)
|
||||
self.connection.process(now())
|
||||
rc = pyngus.read_socket_input(self.pyngus_conn, self.socket)
|
||||
self.pyngus_conn.process(now())
|
||||
return rc
|
||||
except (socket.timeout, socket.error) as e:
|
||||
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
|
||||
self.connection.close_input()
|
||||
self.connection.close_output()
|
||||
self.pyngus_conn.close_input()
|
||||
self.pyngus_conn.close_output()
|
||||
self._handler.socket_error(str(e))
|
||||
return pyngus.Connection.EOS
|
||||
|
||||
def write(self):
|
||||
"""Called when socket is write-ready."""
|
||||
def write_socket(self):
|
||||
"""Called to write to the socket."""
|
||||
while True:
|
||||
try:
|
||||
rc = pyngus.write_socket_output(self.connection, self.socket)
|
||||
self.connection.process(now())
|
||||
rc = pyngus.write_socket_output(self.pyngus_conn, self.socket)
|
||||
self.pyngus_conn.process(now())
|
||||
return rc
|
||||
except (socket.timeout, socket.error) as e:
|
||||
# pyngus handles EAGAIN/EWOULDBLOCK and EINTER
|
||||
self.connection.close_output()
|
||||
self.connection.close_input()
|
||||
self.pyngus_conn.close_output()
|
||||
self.pyngus_conn.close_input()
|
||||
self._handler.socket_error(str(e))
|
||||
return pyngus.Connection.EOS
|
||||
|
||||
@ -127,15 +127,16 @@ class _SocketConnection(object):
|
||||
props['x-username'] = host.username
|
||||
props['x-password'] = host.password or ""
|
||||
|
||||
c = self._container.create_connection(self.name, self._handler, props)
|
||||
c.user_context = self
|
||||
self.connection = c
|
||||
self.pyngus_conn = self._container.create_connection(self.name,
|
||||
self._handler,
|
||||
props)
|
||||
self.pyngus_conn.user_context = self
|
||||
|
||||
if pyngus.VERSION < (2, 0, 0):
|
||||
# older versions of pyngus requires manual SASL configuration:
|
||||
# determine the proper SASL mechanism: PLAIN if a username/password
|
||||
# is present, else ANONYMOUS
|
||||
pn_sasl = self.connection.pn_sasl
|
||||
pn_sasl = self.pyngus_conn.pn_sasl
|
||||
if host.username:
|
||||
password = host.password if host.password else ""
|
||||
pn_sasl.plain(host.username, password)
|
||||
@ -143,7 +144,7 @@ class _SocketConnection(object):
|
||||
pn_sasl.mechanisms("ANONYMOUS")
|
||||
pn_sasl.client()
|
||||
|
||||
self.connection.open()
|
||||
self.pyngus_conn.open()
|
||||
|
||||
def reset(self, name=None):
|
||||
"""Clean up the current state, expect 'connect()' to be recalled
|
||||
@ -151,9 +152,9 @@ class _SocketConnection(object):
|
||||
"""
|
||||
# note well: since destroy() is called on the connection, do not invoke
|
||||
# this method from a pyngus callback!
|
||||
if self.connection:
|
||||
self.connection.destroy()
|
||||
self.connection = None
|
||||
if self.pyngus_conn:
|
||||
self.pyngus_conn.destroy()
|
||||
self.pyngus_conn = None
|
||||
self.close()
|
||||
if name:
|
||||
self.name = name
|
||||
@ -239,30 +240,36 @@ class Requests(object):
|
||||
loop.
|
||||
"""
|
||||
def __init__(self):
|
||||
self._requests = moves.queue.Queue(maxsize=10)
|
||||
self._requests = collections.deque()
|
||||
self._wakeup_pipe = os.pipe()
|
||||
self._pipe_ready = False # prevents blocking on an empty pipe
|
||||
self._pipe_lock = threading.Lock()
|
||||
|
||||
def wakeup(self, request=None):
|
||||
"""Enqueue a callable to be executed by the eventloop, and force the
|
||||
eventloop thread to wake up from select().
|
||||
"""
|
||||
if request:
|
||||
self._requests.put(request)
|
||||
os.write(self._wakeup_pipe[1], b'!')
|
||||
with self._pipe_lock:
|
||||
if request:
|
||||
self._requests.append(request)
|
||||
if not self._pipe_ready:
|
||||
self._pipe_ready = True
|
||||
os.write(self._wakeup_pipe[1], b'!')
|
||||
|
||||
def fileno(self):
|
||||
"""Allows this request queue to be used by select()."""
|
||||
return self._wakeup_pipe[0]
|
||||
|
||||
def read(self):
|
||||
def process_requests(self):
|
||||
"""Invoked by the eventloop thread, execute each queued callable."""
|
||||
os.read(self._wakeup_pipe[0], 512)
|
||||
# first pop of all current tasks
|
||||
requests = []
|
||||
while not self._requests.empty():
|
||||
requests.append(self._requests.get())
|
||||
# then process them, this allows callables to re-register themselves to
|
||||
# be run on the next iteration of the I/O loop
|
||||
with self._pipe_lock:
|
||||
if not self._pipe_ready:
|
||||
return
|
||||
self._pipe_ready = False
|
||||
os.read(self._wakeup_pipe[0], 512)
|
||||
requests = self._requests
|
||||
self._requests = collections.deque()
|
||||
|
||||
for r in requests:
|
||||
r()
|
||||
|
||||
@ -279,6 +286,8 @@ class Thread(threading.Thread):
|
||||
# delayed callables (only used on this thread for now):
|
||||
self._scheduler = Scheduler()
|
||||
|
||||
self._connection = None
|
||||
|
||||
# Configure a container
|
||||
if container_name is None:
|
||||
container_name = ("openstack.org/om/container/%s/%s/%s/%s" %
|
||||
@ -334,6 +343,7 @@ class Thread(threading.Thread):
|
||||
sc = _SocketConnection(key, self._container,
|
||||
properties, handler=handler)
|
||||
sc.connect(host)
|
||||
self._connection = sc
|
||||
return sc
|
||||
|
||||
def run(self):
|
||||
@ -342,18 +352,22 @@ class Thread(threading.Thread):
|
||||
self._container.name)
|
||||
|
||||
while not self._shutdown:
|
||||
readers, writers, timers = self._container.need_processing()
|
||||
|
||||
readfds = [c.user_context for c in readers]
|
||||
# additionally, always check for readability of pipe we
|
||||
# are using to wakeup processing thread by other threads
|
||||
readfds.append(self._requests)
|
||||
writefds = [c.user_context for c in writers]
|
||||
readfds = [self._requests]
|
||||
writefds = []
|
||||
deadline = self._scheduler._next_deadline
|
||||
|
||||
pyngus_conn = self._connection and self._connection.pyngus_conn
|
||||
if pyngus_conn:
|
||||
if pyngus_conn.needs_input:
|
||||
readfds.append(self._connection)
|
||||
if pyngus_conn.has_output:
|
||||
writefds.append(self._connection)
|
||||
if pyngus_conn.deadline:
|
||||
deadline = (pyngus_conn.deadline if not deadline else
|
||||
min(deadline, pyngus_conn.deadline))
|
||||
|
||||
# force select to return in time to service the next expiring timer
|
||||
d1 = self._scheduler._next_deadline
|
||||
d2 = timers[0].deadline if timers else None
|
||||
deadline = min(d1, d2) if d1 and d2 else d1 if not d2 else d2
|
||||
if deadline:
|
||||
_now = now()
|
||||
timeout = 0 if deadline <= _now else (deadline - _now)
|
||||
@ -362,7 +376,7 @@ class Thread(threading.Thread):
|
||||
|
||||
# and now we wait...
|
||||
try:
|
||||
results = select.select(readfds, writefds, [], timeout)
|
||||
select.select(readfds, writefds, [], timeout)
|
||||
except select.error as serror:
|
||||
if serror[0] == errno.EINTR:
|
||||
LOG.warning(_LW("ignoring interrupt from select(): %s"),
|
||||
@ -370,20 +384,17 @@ class Thread(threading.Thread):
|
||||
continue
|
||||
raise # assuming fatal...
|
||||
|
||||
readable, writable, ignore = results
|
||||
|
||||
for r in readable:
|
||||
r.read()
|
||||
|
||||
if timers:
|
||||
_now = now()
|
||||
for t in timers:
|
||||
if t.deadline > _now:
|
||||
break
|
||||
t.process(_now)
|
||||
|
||||
for w in writable:
|
||||
w.write()
|
||||
# Ignore the select return value - simply poll the socket for I/O.
|
||||
# Testing shows that polling improves latency over checking the
|
||||
# lists returned by select()
|
||||
self._requests.process_requests()
|
||||
if pyngus_conn:
|
||||
self._connection.read_socket()
|
||||
if pyngus_conn.deadline:
|
||||
_now = now()
|
||||
if pyngus_conn.deadline <= _now:
|
||||
pyngus_conn.process(_now)
|
||||
self._connection.write_socket()
|
||||
|
||||
self._scheduler._process() # run any deferred requests
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user