63de855fef
new - create new connection each times single - use single connection for whole transport read_write - use two connections for whole transport (one for listening and one for sending) Change-Id: I464c83beb498453b6df2237e7b8022d47ca3fa14
543 lines
19 KiB
Python
543 lines
19 KiB
Python
# Copyright 2016 Mirantis, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import collections
|
|
import logging
|
|
import os
|
|
import threading
|
|
|
|
import futurist
|
|
from pika.adapters import select_connection
|
|
from pika import exceptions as pika_exceptions
|
|
from pika import spec as pika_spec
|
|
|
|
from oslo_utils import eventletutils
|
|
|
|
current_thread = eventletutils.fetch_current_thread_functor()
|
|
|
|
LOG = logging.getLogger(__name__)
|
|
|
|
|
|
class ThreadSafePikaConnection(object):
|
|
def __init__(self, parameters=None,
|
|
_impl_class=select_connection.SelectConnection):
|
|
self.params = parameters
|
|
self._connection_lock = threading.Lock()
|
|
self._evt_closed = threading.Event()
|
|
self._task_queue = collections.deque()
|
|
self._pending_connection_futures = set()
|
|
|
|
create_connection_future = self._register_pending_future()
|
|
|
|
def on_open_error(conn, err):
|
|
create_connection_future.set_exception(
|
|
pika_exceptions.AMQPConnectionError(err)
|
|
)
|
|
|
|
self._impl = _impl_class(
|
|
parameters=parameters,
|
|
on_open_callback=create_connection_future.set_result,
|
|
on_open_error_callback=on_open_error,
|
|
on_close_callback=self._on_connection_close,
|
|
stop_ioloop_on_close=False,
|
|
)
|
|
self._interrupt_pipein, self._interrupt_pipeout = os.pipe()
|
|
self._impl.ioloop.add_handler(self._interrupt_pipein,
|
|
self._impl.ioloop.read_interrupt,
|
|
select_connection.READ)
|
|
|
|
self._thread = threading.Thread(target=self._process_io)
|
|
self._thread.daemon = True
|
|
self._thread_id = None
|
|
self._thread.start()
|
|
|
|
create_connection_future.result()
|
|
|
|
def _check_called_not_from_event_loop(self):
|
|
if current_thread() == self._thread_id:
|
|
raise RuntimeError("This call is not allowed from ioloop thread")
|
|
|
|
def _execute_task(self, func, *args, **kwargs):
|
|
if current_thread() == self._thread_id:
|
|
return func(*args, **kwargs)
|
|
|
|
future = futurist.Future()
|
|
self._task_queue.append((func, args, kwargs, future))
|
|
|
|
if self._evt_closed.is_set():
|
|
self._notify_all_futures_connection_close()
|
|
elif self._interrupt_pipeout is not None:
|
|
os.write(self._interrupt_pipeout, b'X')
|
|
|
|
return future.result()
|
|
|
|
def _register_pending_future(self):
|
|
future = futurist.Future()
|
|
self._pending_connection_futures.add(future)
|
|
|
|
def on_done_callback(fut):
|
|
try:
|
|
self._pending_connection_futures.remove(fut)
|
|
except KeyError:
|
|
pass
|
|
|
|
future.add_done_callback(on_done_callback)
|
|
|
|
if self._evt_closed.is_set():
|
|
self._notify_all_futures_connection_close()
|
|
return future
|
|
|
|
def _notify_all_futures_connection_close(self):
|
|
while self._task_queue:
|
|
try:
|
|
method_res_future = self._task_queue.pop()[3]
|
|
except KeyError:
|
|
break
|
|
else:
|
|
method_res_future.set_exception(
|
|
pika_exceptions.ConnectionClosed()
|
|
)
|
|
|
|
while self._pending_connection_futures:
|
|
try:
|
|
pending_connection_future = (
|
|
self._pending_connection_futures.pop()
|
|
)
|
|
except KeyError:
|
|
break
|
|
else:
|
|
pending_connection_future.set_exception(
|
|
pika_exceptions.ConnectionClosed()
|
|
)
|
|
|
|
def _on_connection_close(self, conn, reply_code, reply_text):
|
|
self._evt_closed.set()
|
|
self._notify_all_futures_connection_close()
|
|
if self._interrupt_pipeout:
|
|
os.close(self._interrupt_pipeout)
|
|
os.close(self._interrupt_pipein)
|
|
|
|
def add_on_close_callback(self, callback):
|
|
return self._execute_task(self._impl.add_on_close_callback, callback)
|
|
|
|
def _do_process_io(self):
|
|
while self._task_queue:
|
|
func, args, kwargs, future = self._task_queue.pop()
|
|
try:
|
|
res = func(*args, **kwargs)
|
|
except BaseException as e:
|
|
LOG.exception(e)
|
|
future.set_exception(e)
|
|
else:
|
|
future.set_result(res)
|
|
|
|
self._impl.ioloop.poll()
|
|
self._impl.ioloop.process_timeouts()
|
|
|
|
def _process_io(self):
|
|
self._thread_id = current_thread()
|
|
while not self._evt_closed.is_set():
|
|
try:
|
|
self._do_process_io()
|
|
except BaseException:
|
|
LOG.exception("Error during processing connection's IO")
|
|
|
|
def close(self, *args, **kwargs):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
res = self._execute_task(self._impl.close, *args, **kwargs)
|
|
|
|
self._evt_closed.wait()
|
|
self._thread.join()
|
|
return res
|
|
|
|
def channel(self, channel_number=None):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
channel_opened_future = self._register_pending_future()
|
|
|
|
impl_channel = self._execute_task(
|
|
self._impl.channel,
|
|
on_open_callback=channel_opened_future.set_result,
|
|
channel_number=channel_number
|
|
)
|
|
|
|
# Create our proxy channel
|
|
channel = ThreadSafePikaChannel(impl_channel, self)
|
|
|
|
# Link implementation channel with our proxy channel
|
|
impl_channel._set_cookie(channel)
|
|
|
|
channel_opened_future.result()
|
|
return channel
|
|
|
|
def add_timeout(self, timeout, callback):
|
|
return self._execute_task(self._impl.add_timeout, timeout, callback)
|
|
|
|
def remove_timeout(self, timeout_id):
|
|
return self._execute_task(self._impl.remove_timeout, timeout_id)
|
|
|
|
@property
|
|
def is_closed(self):
|
|
return self._impl.is_closed
|
|
|
|
@property
|
|
def is_closing(self):
|
|
return self._impl.is_closing
|
|
|
|
@property
|
|
def is_open(self):
|
|
return self._impl.is_open
|
|
|
|
|
|
class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
|
|
|
def __init__(self, channel_impl, connection):
|
|
self._impl = channel_impl
|
|
self._connection = connection
|
|
|
|
self._delivery_confirmation = False
|
|
|
|
self._message_returned = False
|
|
self._current_future = None
|
|
|
|
self._evt_closed = threading.Event()
|
|
|
|
self.add_on_close_callback(self._on_channel_close)
|
|
|
|
def _execute_task(self, func, *args, **kwargs):
|
|
return self._connection._execute_task(func, *args, **kwargs)
|
|
|
|
def _on_channel_close(self, channel, reply_code, reply_text):
|
|
self._evt_closed.set()
|
|
|
|
if self._current_future:
|
|
self._current_future.set_exception(
|
|
pika_exceptions.ChannelClosed(reply_code, reply_text))
|
|
|
|
def _on_message_confirmation(self, frame):
|
|
self._current_future.set_result(frame)
|
|
|
|
def add_on_close_callback(self, callback):
|
|
self._execute_task(self._impl.add_on_close_callback, callback)
|
|
|
|
def add_on_cancel_callback(self, callback):
|
|
self._execute_task(self._impl.add_on_cancel_callback, callback)
|
|
|
|
def __int__(self):
|
|
return self.channel_number
|
|
|
|
@property
|
|
def channel_number(self):
|
|
return self._impl.channel_number
|
|
|
|
@property
|
|
def is_closed(self):
|
|
return self._impl.is_closed
|
|
|
|
@property
|
|
def is_closing(self):
|
|
return self._impl.is_closing
|
|
|
|
@property
|
|
def is_open(self):
|
|
return self._impl.is_open
|
|
|
|
def close(self, reply_code=0, reply_text="Normal Shutdown"):
|
|
self._impl.close(reply_code=reply_code, reply_text=reply_text)
|
|
self._evt_closed.wait()
|
|
|
|
def _check_called_not_from_event_loop(self):
|
|
self._connection._check_called_not_from_event_loop()
|
|
|
|
def flow(self, active):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(
|
|
self._impl.flow, callback=self._current_future.set_result,
|
|
active=active
|
|
)
|
|
|
|
return self._current_future.result()
|
|
|
|
def basic_consume(self, # pylint: disable=R0913
|
|
consumer_callback,
|
|
queue,
|
|
no_ack=False,
|
|
exclusive=False,
|
|
consumer_tag=None,
|
|
arguments=None):
|
|
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(
|
|
self._impl.add_callback, self._current_future.set_result,
|
|
replies=[pika_spec.Basic.ConsumeOk], one_shot=True
|
|
)
|
|
|
|
self._impl.add_callback(self._current_future.set_result,
|
|
replies=[pika_spec.Basic.ConsumeOk],
|
|
one_shot=True)
|
|
tag = self._execute_task(
|
|
self._impl.basic_consume,
|
|
consumer_callback=consumer_callback,
|
|
queue=queue,
|
|
no_ack=no_ack,
|
|
exclusive=exclusive,
|
|
consumer_tag=consumer_tag,
|
|
arguments=arguments
|
|
)
|
|
|
|
self._current_future.result()
|
|
return tag
|
|
|
|
def basic_cancel(self, consumer_tag):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(
|
|
self._impl.basic_cancel,
|
|
callback=self._current_future.set_result,
|
|
consumer_tag=consumer_tag,
|
|
nowait=False)
|
|
self._current_future.result()
|
|
|
|
def basic_ack(self, delivery_tag=0, multiple=False):
|
|
return self._execute_task(
|
|
self._impl.basic_ack, delivery_tag=delivery_tag, multiple=multiple)
|
|
|
|
def basic_nack(self, delivery_tag=None, multiple=False, requeue=True):
|
|
return self._execute_task(
|
|
self._impl.basic_nack, delivery_tag=delivery_tag,
|
|
multiple=multiple, requeue=requeue
|
|
)
|
|
|
|
def publish(self, exchange, routing_key, body, # pylint: disable=R0913
|
|
properties=None, mandatory=False, immediate=False):
|
|
|
|
if self._delivery_confirmation:
|
|
self._check_called_not_from_event_loop()
|
|
|
|
# In publisher-acknowledgments mode
|
|
self._message_returned = False
|
|
self._current_future = futurist.Future()
|
|
|
|
self._execute_task(self._impl.basic_publish,
|
|
exchange=exchange,
|
|
routing_key=routing_key,
|
|
body=body,
|
|
properties=properties,
|
|
mandatory=mandatory,
|
|
immediate=immediate)
|
|
|
|
conf_method = self._current_future.result().method
|
|
|
|
if isinstance(conf_method, pika_spec.Basic.Nack):
|
|
raise pika_exceptions.NackError((None,))
|
|
else:
|
|
assert isinstance(conf_method, pika_spec.Basic.Ack), (
|
|
conf_method)
|
|
|
|
if self._message_returned:
|
|
raise pika_exceptions.UnroutableError((None,))
|
|
else:
|
|
# In non-publisher-acknowledgments mode
|
|
self._execute_task(self._impl.basic_publish,
|
|
exchange=exchange,
|
|
routing_key=routing_key,
|
|
body=body,
|
|
properties=properties,
|
|
mandatory=mandatory,
|
|
immediate=immediate)
|
|
|
|
def basic_qos(self, prefetch_size=0, prefetch_count=0, all_channels=False):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(self._impl.basic_qos,
|
|
callback=self._current_future.set_result,
|
|
prefetch_size=prefetch_size,
|
|
prefetch_count=prefetch_count,
|
|
all_channels=all_channels)
|
|
self._current_future.result()
|
|
|
|
def basic_recover(self, requeue=False):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(
|
|
self._impl.basic_recover,
|
|
callback=lambda: self._current_future.set_result(None),
|
|
requeue=requeue
|
|
)
|
|
self._current_future.result()
|
|
|
|
def basic_reject(self, delivery_tag=None, requeue=True):
|
|
self._execute_task(self._impl.basic_reject,
|
|
delivery_tag=delivery_tag,
|
|
requeue=requeue)
|
|
|
|
def _on_message_returned(self, *args, **kwargs):
|
|
self._message_returned = True
|
|
|
|
def confirm_delivery(self):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(self._impl.add_callback,
|
|
callback=self._current_future.set_result,
|
|
replies=[pika_spec.Confirm.SelectOk],
|
|
one_shot=True)
|
|
self._execute_task(self._impl.confirm_delivery,
|
|
callback=self._on_message_confirmation,
|
|
nowait=False)
|
|
self._current_future.result()
|
|
|
|
self._delivery_confirmation = True
|
|
self._execute_task(self._impl.add_on_return_callback,
|
|
self._on_message_returned)
|
|
|
|
def exchange_declare(self, exchange=None, # pylint: disable=R0913
|
|
exchange_type='direct', passive=False, durable=False,
|
|
auto_delete=False, internal=False,
|
|
arguments=None, **kwargs):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(self._impl.exchange_declare,
|
|
callback=self._current_future.set_result,
|
|
exchange=exchange,
|
|
exchange_type=exchange_type,
|
|
passive=passive,
|
|
durable=durable,
|
|
auto_delete=auto_delete,
|
|
internal=internal,
|
|
nowait=False,
|
|
arguments=arguments,
|
|
type=kwargs["type"] if kwargs else None)
|
|
|
|
return self._current_future.result()
|
|
|
|
def exchange_delete(self, exchange=None, if_unused=False):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(self._impl.exchange_delete,
|
|
callback=self._current_future.set_result,
|
|
exchange=exchange,
|
|
if_unused=if_unused,
|
|
nowait=False)
|
|
|
|
return self._current_future.result()
|
|
|
|
def exchange_bind(self, destination=None, source=None, routing_key='',
|
|
arguments=None):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(self._impl.exchange_bind,
|
|
callback=self._current_future.set_result,
|
|
destination=destination,
|
|
source=source,
|
|
routing_key=routing_key,
|
|
nowait=False,
|
|
arguments=arguments)
|
|
|
|
return self._current_future.result()
|
|
|
|
def exchange_unbind(self, destination=None, source=None, routing_key='',
|
|
arguments=None):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(self._impl.exchange_unbind,
|
|
callback=self._current_future.set_result,
|
|
destination=destination,
|
|
source=source,
|
|
routing_key=routing_key,
|
|
nowait=False,
|
|
arguments=arguments)
|
|
|
|
return self._current_future.result()
|
|
|
|
def queue_declare(self, queue='', passive=False, durable=False,
|
|
exclusive=False, auto_delete=False,
|
|
arguments=None):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(self._impl.queue_declare,
|
|
callback=self._current_future.set_result,
|
|
queue=queue,
|
|
passive=passive,
|
|
durable=durable,
|
|
exclusive=exclusive,
|
|
auto_delete=auto_delete,
|
|
nowait=False,
|
|
arguments=arguments)
|
|
|
|
return self._current_future.result()
|
|
|
|
def queue_delete(self, queue='', if_unused=False, if_empty=False):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(self._impl.queue_delete,
|
|
callback=self._current_future.set_result,
|
|
queue=queue,
|
|
if_unused=if_unused,
|
|
if_empty=if_empty,
|
|
nowait=False)
|
|
|
|
return self._current_future.result()
|
|
|
|
def queue_purge(self, queue=''):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(self._impl.queue_purge,
|
|
callback=self._current_future.set_result,
|
|
queue=queue,
|
|
nowait=False)
|
|
return self._current_future.result()
|
|
|
|
def queue_bind(self, queue, exchange, routing_key=None,
|
|
arguments=None):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(self._impl.queue_bind,
|
|
callback=self._current_future.set_result,
|
|
queue=queue,
|
|
exchange=exchange,
|
|
routing_key=routing_key,
|
|
nowait=False,
|
|
arguments=arguments)
|
|
return self._current_future.result()
|
|
|
|
def queue_unbind(self, queue='', exchange=None, routing_key=None,
|
|
arguments=None):
|
|
self._check_called_not_from_event_loop()
|
|
|
|
self._current_future = futurist.Future()
|
|
self._execute_task(self._impl.queue_unbind,
|
|
callback=self._current_future.set_result,
|
|
queue=queue,
|
|
exchange=exchange,
|
|
routing_key=routing_key,
|
|
arguments=arguments)
|
|
return self._current_future.result()
|