diff --git a/oslo_messaging/_drivers/amqpdriver.py b/oslo_messaging/_drivers/amqpdriver.py index c222bce64..9e967c4da 100644 --- a/oslo_messaging/_drivers/amqpdriver.py +++ b/oslo_messaging/_drivers/amqpdriver.py @@ -147,12 +147,6 @@ class ReplyWaiters(object): 'Timed out waiting for a reply ' 'to message ID %s' % msg_id) - def check(self, msg_id): - try: - return self._queues[msg_id].get(block=False) - except moves.queue.Empty: - return None - def put(self, msg_id, message_data): queue = self._queues.get(msg_id) if not queue: @@ -162,13 +156,8 @@ class ReplyWaiters(object): else: queue.put(message_data) - def wake_all(self, except_id): - msg_ids = [i for i in self._queues.keys() if i != except_id] - for msg_id in msg_ids: - self.put(msg_id, self.WAKE_UP) - - def add(self, msg_id, queue): - self._queues[msg_id] = queue + def add(self, msg_id): + self._queues[msg_id] = moves.queue.Queue() if len(self._queues) > self._wrn_threshold: LOG.warn('Number of call queues is greater than warning ' 'threshold: %d. There could be a leak. Increasing' @@ -181,27 +170,41 @@ class ReplyWaiters(object): class ReplyWaiter(object): - - def __init__(self, conf, reply_q, conn, allowed_remote_exmods): - self.conf = conf + def __init__(self, reply_q, conn, allowed_remote_exmods): self.conn = conn - self.reply_q = reply_q self.allowed_remote_exmods = allowed_remote_exmods - - self.conn_lock = threading.Lock() - self.incoming = [] self.msg_id_cache = rpc_amqp._MsgIdCache() self.waiters = ReplyWaiters() - conn.declare_direct_consumer(reply_q, self) + self.conn.declare_direct_consumer(reply_q, self) + + self._thread_exit_event = threading.Event() + self._thread = threading.Thread(target=self.poll) + self._thread.daemon = True + self._thread.start() + + def stop(self): + if self._thread: + self._thread_exit_event.set() + self.conn.stop_consuming() + self._thread.join() + self._thread = None + + def poll(self): + while not self._thread_exit_event.is_set(): + try: + self.conn.consume(limit=1) + except Exception: + LOG.exception("Failed to process incoming message, " + "retrying...") def __call__(self, message): message.acknowledge() - self.incoming.append(message) + incoming_msg_id = message.pop('_msg_id', None) + self.waiters.put(incoming_msg_id, message) def listen(self, msg_id): - queue = moves.queue.Queue() - self.waiters.add(msg_id, queue) + self.waiters.add(msg_id) def unlisten(self, msg_id): self.waiters.remove(msg_id) @@ -225,96 +228,25 @@ class ReplyWaiter(object): result = data['result'] return result, ending - def _poll_connection(self, msg_id, timer): - while True: - while self.incoming: - message_data = self.incoming.pop(0) - - incoming_msg_id = message_data.pop('_msg_id', None) - if incoming_msg_id == msg_id: - return self._process_reply(message_data) - - self.waiters.put(incoming_msg_id, message_data) - - timeout = timer.check_return(self._raise_timeout_exception, msg_id) - try: - self.conn.consume(limit=1, timeout=timeout) - except rpc_common.Timeout: - self._raise_timeout_exception(msg_id) - - def _poll_queue(self, msg_id, timer): - timeout = timer.check_return(self._raise_timeout_exception, msg_id) - message = self.waiters.get(msg_id, timeout=timeout) - if message is self.waiters.WAKE_UP: - return None, None, True # lock was released - - reply, ending = self._process_reply(message) - return reply, ending, False - - def _check_queue(self, msg_id): - while True: - message = self.waiters.check(msg_id) - if message is self.waiters.WAKE_UP: - continue - if message is None: - return None, None, True # queue is empty - - reply, ending = self._process_reply(message) - return reply, ending, False - def wait(self, msg_id, timeout): - # - # NOTE(markmc): we're waiting for a reply for msg_id to come in for on - # the reply_q, but there may be other threads also waiting for replies - # to other msg_ids - # - # Only one thread can be consuming from the queue using this connection - # and we don't want to hold open a connection per thread, so instead we - # have the first thread take responsibility for passing replies not - # intended for itself to the appropriate thread. - # + # NOTE(sileht): for each msg_id we receive two amqp message + # first one with the payload, a second one to ensure the other + # have finish to send the payload timer = rpc_common.DecayingTimer(duration=timeout) timer.start() final_reply = None - while True: - if self.conn_lock.acquire(False): - # Ok, we're the thread responsible for polling the connection - try: - # Check the queue to see if a previous lock-holding thread - # queued up a reply already - while True: - reply, ending, empty = self._check_queue(msg_id) - if empty: - break - if not ending: - final_reply = reply - else: - return final_reply + ending = False + while not ending: + timeout = timer.check_return(self._raise_timeout_exception, msg_id) + try: + message = self.waiters.get(msg_id, timeout=timeout) + except moves.queue.Empty: + self._raise_timeout_exception(msg_id) - # Now actually poll the connection - while True: - reply, ending = self._poll_connection(msg_id, timer) - if not ending: - final_reply = reply - else: - return final_reply - finally: - self.conn_lock.release() - # We've got our reply, tell the other threads to wake up - # so that one of them will take over the responsibility for - # polling the connection - self.waiters.wake_all(msg_id) - else: - # We're going to wait for the first thread to pass us our reply - reply, ending, trylock = self._poll_queue(msg_id, timer) - if trylock: - # The first thread got its reply, let's try and take over - # the responsibility for polling - continue - if not ending: - final_reply = reply - else: - return final_reply + reply, ending = self._process_reply(message) + if not ending: + final_reply = reply + return final_reply class AMQPDriverBase(base.BaseDriver): @@ -349,7 +281,7 @@ class AMQPDriverBase(base.BaseDriver): conn = self._get_connection(pooled=False) - self._waiter = ReplyWaiter(self.conf, reply_q, conn, + self._waiter = ReplyWaiter(reply_q, conn, self._allowed_remote_exmods) self._reply_q = reply_q @@ -451,3 +383,11 @@ class AMQPDriverBase(base.BaseDriver): if self._connection_pool: self._connection_pool.empty() self._connection_pool = None + + with self._reply_q_lock: + if self._reply_q is not None: + self._waiter.stop() + self._reply_q_conn.close() + self._reply_q_conn = None + self._reply_q = None + self._waiter = None