Implements configurable connection factory
new - create new connection each times single - use single connection for whole transport read_write - use two connections for whole transport (one for listening and one for sending) Change-Id: I464c83beb498453b6df2237e7b8022d47ca3fa14
This commit is contained in:
parent
6037b2b544
commit
63de855fef
@ -19,6 +19,8 @@ import pika_pool
|
||||
import retrying
|
||||
|
||||
from oslo_messaging._drivers import base
|
||||
from oslo_messaging._drivers.pika_driver import (pika_connection_factory as
|
||||
pika_drv_conn_factory)
|
||||
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
|
||||
from oslo_messaging._drivers.pika_driver import pika_engine as pika_drv_engine
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
@ -29,27 +31,6 @@ from oslo_messaging import exceptions
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
pika_opts = [
|
||||
cfg.IntOpt('channel_max', default=None,
|
||||
help='Maximum number of channels to allow'),
|
||||
cfg.IntOpt('frame_max', default=None,
|
||||
help='The maximum byte size for an AMQP frame'),
|
||||
cfg.IntOpt('heartbeat_interval', default=3,
|
||||
help="How often to send heartbeats for consumer's connections"),
|
||||
cfg.BoolOpt('ssl', default=None,
|
||||
help='Enable SSL'),
|
||||
cfg.DictOpt('ssl_options', default=None,
|
||||
help='Arguments passed to ssl.wrap_socket'),
|
||||
cfg.FloatOpt('socket_timeout', default=0.25,
|
||||
help="Set socket timeout in seconds for connection's socket"),
|
||||
cfg.FloatOpt('tcp_user_timeout', default=0.25,
|
||||
help="Set TCP_USER_TIMEOUT in seconds for connection's "
|
||||
"socket"),
|
||||
cfg.FloatOpt('host_connection_reconnect_delay', default=0.25,
|
||||
help="Set delay for reconnection to some host which has "
|
||||
"connection error")
|
||||
]
|
||||
|
||||
pika_pool_opts = [
|
||||
cfg.IntOpt('pool_max_size', default=30,
|
||||
help="Maximum number of connections to keep queued."),
|
||||
@ -141,7 +122,7 @@ class PikaDriver(base.BaseDriver):
|
||||
opt_group = cfg.OptGroup(name='oslo_messaging_pika',
|
||||
title='Pika driver options')
|
||||
conf.register_group(opt_group)
|
||||
conf.register_opts(pika_opts, group=opt_group)
|
||||
conf.register_opts(pika_drv_conn_factory.pika_opts, group=opt_group)
|
||||
conf.register_opts(pika_pool_opts, group=opt_group)
|
||||
conf.register_opts(rpc_opts, group=opt_group)
|
||||
conf.register_opts(notification_opts, group=opt_group)
|
||||
@ -350,3 +331,4 @@ class PikaDriver(base.BaseDriver):
|
||||
|
||||
def cleanup(self):
|
||||
self._reply_listener.cleanup()
|
||||
self._pika_engine.cleanup()
|
||||
|
@ -18,7 +18,6 @@ import os
|
||||
import threading
|
||||
|
||||
import futurist
|
||||
import pika
|
||||
from pika.adapters import select_connection
|
||||
from pika import exceptions as pika_exceptions
|
||||
from pika import spec as pika_spec
|
||||
@ -31,8 +30,9 @@ LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class ThreadSafePikaConnection(object):
|
||||
def __init__(self, params=None):
|
||||
self.params = params
|
||||
def __init__(self, parameters=None,
|
||||
_impl_class=select_connection.SelectConnection):
|
||||
self.params = parameters
|
||||
self._connection_lock = threading.Lock()
|
||||
self._evt_closed = threading.Event()
|
||||
self._task_queue = collections.deque()
|
||||
@ -45,8 +45,8 @@ class ThreadSafePikaConnection(object):
|
||||
pika_exceptions.AMQPConnectionError(err)
|
||||
)
|
||||
|
||||
self._impl = pika.SelectConnection(
|
||||
parameters=params,
|
||||
self._impl = _impl_class(
|
||||
parameters=parameters,
|
||||
on_open_callback=create_connection_future.set_result,
|
||||
on_open_error_callback=on_open_error,
|
||||
on_close_callback=self._on_connection_close,
|
||||
@ -64,6 +64,10 @@ class ThreadSafePikaConnection(object):
|
||||
|
||||
create_connection_future.result()
|
||||
|
||||
def _check_called_not_from_event_loop(self):
|
||||
if current_thread() == self._thread_id:
|
||||
raise RuntimeError("This call is not allowed from ioloop thread")
|
||||
|
||||
def _execute_task(self, func, *args, **kwargs):
|
||||
if current_thread() == self._thread_id:
|
||||
return func(*args, **kwargs)
|
||||
@ -150,6 +154,8 @@ class ThreadSafePikaConnection(object):
|
||||
LOG.exception("Error during processing connection's IO")
|
||||
|
||||
def close(self, *args, **kwargs):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
res = self._execute_task(self._impl.close, *args, **kwargs)
|
||||
|
||||
self._evt_closed.wait()
|
||||
@ -157,6 +163,8 @@ class ThreadSafePikaConnection(object):
|
||||
return res
|
||||
|
||||
def channel(self, channel_number=None):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
channel_opened_future = self._register_pending_future()
|
||||
|
||||
impl_channel = self._execute_task(
|
||||
@ -250,12 +258,18 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
self._impl.close(reply_code=reply_code, reply_text=reply_text)
|
||||
self._evt_closed.wait()
|
||||
|
||||
def _check_called_not_from_event_loop(self):
|
||||
self._connection._check_called_not_from_event_loop()
|
||||
|
||||
def flow(self, active):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(
|
||||
self._impl.flow, callback=self._current_future.set_result,
|
||||
active=active
|
||||
)
|
||||
|
||||
return self._current_future.result()
|
||||
|
||||
def basic_consume(self, # pylint: disable=R0913
|
||||
@ -265,6 +279,9 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
exclusive=False,
|
||||
consumer_tag=None,
|
||||
arguments=None):
|
||||
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(
|
||||
self._impl.add_callback, self._current_future.set_result,
|
||||
@ -288,6 +305,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
return tag
|
||||
|
||||
def basic_cancel(self, consumer_tag):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(
|
||||
self._impl.basic_cancel,
|
||||
@ -310,6 +329,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
properties=None, mandatory=False, immediate=False):
|
||||
|
||||
if self._delivery_confirmation:
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
# In publisher-acknowledgments mode
|
||||
self._message_returned = False
|
||||
self._current_future = futurist.Future()
|
||||
@ -343,6 +364,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
immediate=immediate)
|
||||
|
||||
def basic_qos(self, prefetch_size=0, prefetch_count=0, all_channels=False):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(self._impl.basic_qos,
|
||||
callback=self._current_future.set_result,
|
||||
@ -352,6 +375,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
self._current_future.result()
|
||||
|
||||
def basic_recover(self, requeue=False):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(
|
||||
self._impl.basic_recover,
|
||||
@ -369,6 +394,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
self._message_returned = True
|
||||
|
||||
def confirm_delivery(self):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(self._impl.add_callback,
|
||||
callback=self._current_future.set_result,
|
||||
@ -387,6 +414,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
exchange_type='direct', passive=False, durable=False,
|
||||
auto_delete=False, internal=False,
|
||||
arguments=None, **kwargs):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(self._impl.exchange_declare,
|
||||
callback=self._current_future.set_result,
|
||||
@ -403,6 +432,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
return self._current_future.result()
|
||||
|
||||
def exchange_delete(self, exchange=None, if_unused=False):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(self._impl.exchange_delete,
|
||||
callback=self._current_future.set_result,
|
||||
@ -414,6 +445,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
|
||||
def exchange_bind(self, destination=None, source=None, routing_key='',
|
||||
arguments=None):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(self._impl.exchange_bind,
|
||||
callback=self._current_future.set_result,
|
||||
@ -427,6 +460,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
|
||||
def exchange_unbind(self, destination=None, source=None, routing_key='',
|
||||
arguments=None):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(self._impl.exchange_unbind,
|
||||
callback=self._current_future.set_result,
|
||||
@ -441,6 +476,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
def queue_declare(self, queue='', passive=False, durable=False,
|
||||
exclusive=False, auto_delete=False,
|
||||
arguments=None):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(self._impl.queue_declare,
|
||||
callback=self._current_future.set_result,
|
||||
@ -455,6 +492,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
return self._current_future.result()
|
||||
|
||||
def queue_delete(self, queue='', if_unused=False, if_empty=False):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(self._impl.queue_delete,
|
||||
callback=self._current_future.set_result,
|
||||
@ -466,6 +505,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
return self._current_future.result()
|
||||
|
||||
def queue_purge(self, queue=''):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(self._impl.queue_purge,
|
||||
callback=self._current_future.set_result,
|
||||
@ -475,6 +516,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
|
||||
def queue_bind(self, queue, exchange, routing_key=None,
|
||||
arguments=None):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(self._impl.queue_bind,
|
||||
callback=self._current_future.set_result,
|
||||
@ -487,6 +530,8 @@ class ThreadSafePikaChannel(object): # pylint: disable=R0904,R0902
|
||||
|
||||
def queue_unbind(self, queue='', exchange=None, routing_key=None,
|
||||
arguments=None):
|
||||
self._check_called_not_from_event_loop()
|
||||
|
||||
self._current_future = futurist.Future()
|
||||
self._execute_task(self._impl.queue_unbind,
|
||||
callback=self._current_future.set_result,
|
||||
|
307
oslo_messaging/_drivers/pika_driver/pika_connection_factory.py
Normal file
307
oslo_messaging/_drivers/pika_driver/pika_connection_factory.py
Normal file
@ -0,0 +1,307 @@
|
||||
# Copyright 2016 Mirantis, Inc.
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import logging
|
||||
import random
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_config import cfg
|
||||
import pika
|
||||
from pika import credentials as pika_credentials
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
|
||||
from oslo_messaging._drivers.pika_driver import pika_connection
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
# constant for setting tcp_user_timeout socket option
|
||||
# (it should be defined in 'select' module of standard library in future)
|
||||
TCP_USER_TIMEOUT = 18
|
||||
|
||||
# constants for creating connection statistics
|
||||
HOST_CONNECTION_LAST_TRY_TIME = "last_try_time"
|
||||
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time"
|
||||
|
||||
pika_opts = [
|
||||
cfg.IntOpt('channel_max', default=None,
|
||||
help='Maximum number of channels to allow'),
|
||||
cfg.IntOpt('frame_max', default=None,
|
||||
help='The maximum byte size for an AMQP frame'),
|
||||
cfg.IntOpt('heartbeat_interval', default=3,
|
||||
help="How often to send heartbeats for consumer's connections"),
|
||||
cfg.BoolOpt('ssl', default=None,
|
||||
help='Enable SSL'),
|
||||
cfg.DictOpt('ssl_options', default=None,
|
||||
help='Arguments passed to ssl.wrap_socket'),
|
||||
cfg.FloatOpt('socket_timeout', default=0.25,
|
||||
help="Set socket timeout in seconds for connection's socket"),
|
||||
cfg.FloatOpt('tcp_user_timeout', default=0.25,
|
||||
help="Set TCP_USER_TIMEOUT in seconds for connection's "
|
||||
"socket"),
|
||||
cfg.FloatOpt('host_connection_reconnect_delay', default=0.25,
|
||||
help="Set delay for reconnection to some host which has "
|
||||
"connection error"),
|
||||
cfg.StrOpt('connection_factory', default="single",
|
||||
choices=["new", "single", "read_write"],
|
||||
help='Connection factory implementation')
|
||||
]
|
||||
|
||||
|
||||
class PikaConnectionFactory(object):
|
||||
|
||||
def __init__(self, url, conf):
|
||||
self._url = url
|
||||
self._conf = conf
|
||||
|
||||
self._connection_lock = threading.RLock()
|
||||
|
||||
if not url.hosts:
|
||||
raise ValueError("You should provide at least one RabbitMQ host")
|
||||
|
||||
# initializing connection parameters for configured RabbitMQ hosts
|
||||
self._common_pika_params = {
|
||||
'virtual_host': url.virtual_host,
|
||||
'channel_max': conf.oslo_messaging_pika.channel_max,
|
||||
'frame_max': conf.oslo_messaging_pika.frame_max,
|
||||
'ssl': conf.oslo_messaging_pika.ssl,
|
||||
'ssl_options': conf.oslo_messaging_pika.ssl_options,
|
||||
'socket_timeout': conf.oslo_messaging_pika.socket_timeout
|
||||
}
|
||||
|
||||
self._host_list = url.hosts
|
||||
self._heartbeat_interval = conf.oslo_messaging_pika.heartbeat_interval
|
||||
self._host_connection_reconnect_delay = (
|
||||
conf.oslo_messaging_pika.host_connection_reconnect_delay
|
||||
)
|
||||
self._tcp_user_timeout = conf.oslo_messaging_pika.tcp_user_timeout
|
||||
|
||||
self._connection_host_status = {}
|
||||
|
||||
self._cur_connection_host_num = random.randint(
|
||||
0, len(url.hosts) - 1
|
||||
)
|
||||
|
||||
def cleanup(self):
|
||||
pass
|
||||
|
||||
def create_connection(self, for_listening=False):
|
||||
"""Create and return connection to any available host.
|
||||
|
||||
:return: created connection
|
||||
:raise: ConnectionException if all hosts are not reachable
|
||||
"""
|
||||
|
||||
with self._connection_lock:
|
||||
|
||||
host_count = len(self._host_list)
|
||||
connection_attempts = host_count
|
||||
|
||||
while connection_attempts > 0:
|
||||
self._cur_connection_host_num += 1
|
||||
self._cur_connection_host_num %= host_count
|
||||
try:
|
||||
return self._create_host_connection(
|
||||
self._cur_connection_host_num, for_listening
|
||||
)
|
||||
except pika_drv_cmns.PIKA_CONNECTIVITY_ERRORS as e:
|
||||
LOG.warning("Can't establish connection to host. %s", e)
|
||||
except pika_drv_exc.HostConnectionNotAllowedException as e:
|
||||
LOG.warning("Connection to host is not allowed. %s", e)
|
||||
|
||||
connection_attempts -= 1
|
||||
|
||||
raise pika_drv_exc.EstablishConnectionException(
|
||||
"Can not establish connection to any configured RabbitMQ "
|
||||
"host: " + str(self._host_list)
|
||||
)
|
||||
|
||||
def _set_tcp_user_timeout(self, s):
|
||||
if not self._tcp_user_timeout:
|
||||
return
|
||||
try:
|
||||
s.setsockopt(
|
||||
socket.IPPROTO_TCP, TCP_USER_TIMEOUT,
|
||||
int(self._tcp_user_timeout * 1000)
|
||||
)
|
||||
except socket.error:
|
||||
LOG.warning(
|
||||
"Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT."
|
||||
)
|
||||
|
||||
def _create_host_connection(self, host_index, for_listening):
|
||||
"""Create new connection to host #host_index
|
||||
|
||||
:param host_index: Integer, number of host for connection establishing
|
||||
:param for_listening: Boolean, creates connection for listening
|
||||
if True
|
||||
:return: New connection
|
||||
"""
|
||||
host = self._host_list[host_index]
|
||||
|
||||
cur_time = time.time()
|
||||
|
||||
host_connection_status = self._connection_host_status.get(host)
|
||||
|
||||
if host_connection_status is None:
|
||||
host_connection_status = {
|
||||
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0,
|
||||
HOST_CONNECTION_LAST_TRY_TIME: 0
|
||||
}
|
||||
self._connection_host_status[host] = host_connection_status
|
||||
|
||||
last_success_time = host_connection_status[
|
||||
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
|
||||
]
|
||||
last_time = host_connection_status[
|
||||
HOST_CONNECTION_LAST_TRY_TIME
|
||||
]
|
||||
|
||||
# raise HostConnectionNotAllowedException if we tried to establish
|
||||
# connection in last 'host_connection_reconnect_delay' and got
|
||||
# failure
|
||||
if (last_time != last_success_time and
|
||||
cur_time - last_time <
|
||||
self._host_connection_reconnect_delay):
|
||||
raise pika_drv_exc.HostConnectionNotAllowedException(
|
||||
"Connection to host #{} is not allowed now because of "
|
||||
"previous failure".format(host_index)
|
||||
)
|
||||
|
||||
try:
|
||||
connection = self._do_create_host_connection(
|
||||
host, for_listening
|
||||
)
|
||||
self._connection_host_status[host][
|
||||
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
|
||||
] = cur_time
|
||||
|
||||
return connection
|
||||
finally:
|
||||
self._connection_host_status[host][
|
||||
HOST_CONNECTION_LAST_TRY_TIME
|
||||
] = cur_time
|
||||
|
||||
def _do_create_host_connection(self, host, for_listening):
|
||||
connection_params = pika.ConnectionParameters(
|
||||
host=host.hostname,
|
||||
port=host.port,
|
||||
credentials=pika_credentials.PlainCredentials(
|
||||
host.username, host.password
|
||||
),
|
||||
heartbeat_interval=(
|
||||
self._heartbeat_interval if for_listening else None
|
||||
),
|
||||
**self._common_pika_params
|
||||
)
|
||||
if for_listening:
|
||||
connection = pika_connection.ThreadSafePikaConnection(
|
||||
parameters=connection_params
|
||||
)
|
||||
else:
|
||||
connection = pika.BlockingConnection(
|
||||
parameters=connection_params
|
||||
)
|
||||
connection.params = connection_params
|
||||
|
||||
self._set_tcp_user_timeout(connection._impl.socket)
|
||||
return connection
|
||||
|
||||
|
||||
class NotClosableConnection(object):
|
||||
def __init__(self, connection):
|
||||
self._connection = connection
|
||||
|
||||
def __getattr__(self, item):
|
||||
return getattr(self._connection, item)
|
||||
|
||||
def close(self):
|
||||
pass
|
||||
|
||||
|
||||
class SinglePikaConnectionFactory(PikaConnectionFactory):
|
||||
def __init__(self, url, conf):
|
||||
super(SinglePikaConnectionFactory, self).__init__(url, conf)
|
||||
self._connection = None
|
||||
|
||||
def create_connection(self, for_listening=False):
|
||||
with self._connection_lock:
|
||||
if self._connection is None or not self._connection.is_open:
|
||||
self._connection = (
|
||||
super(SinglePikaConnectionFactory, self).create_connection(
|
||||
True
|
||||
)
|
||||
)
|
||||
return NotClosableConnection(self._connection)
|
||||
|
||||
def cleanup(self):
|
||||
with self._connection_lock:
|
||||
if self._connection is not None and self._connection.is_open:
|
||||
try:
|
||||
self._connection.close()
|
||||
except Exception:
|
||||
LOG.warning(
|
||||
"Unexpected exception during connection closing",
|
||||
exc_info=True
|
||||
)
|
||||
self._connection = None
|
||||
|
||||
|
||||
class ReadWritePikaConnectionFactory(PikaConnectionFactory):
|
||||
def __init__(self, url, conf):
|
||||
super(ReadWritePikaConnectionFactory, self).__init__(url, conf)
|
||||
self._read_connection = None
|
||||
self._write_connection = None
|
||||
|
||||
def create_connection(self, for_listening=False):
|
||||
with self._connection_lock:
|
||||
if for_listening:
|
||||
if (self._read_connection is None or
|
||||
not self._read_connection.is_open):
|
||||
self._read_connection = super(
|
||||
ReadWritePikaConnectionFactory, self
|
||||
).create_connection(True)
|
||||
return NotClosableConnection(self._read_connection)
|
||||
else:
|
||||
if (self._write_connection is None or
|
||||
not self._write_connection.is_open):
|
||||
self._write_connection = super(
|
||||
ReadWritePikaConnectionFactory, self
|
||||
).create_connection(True)
|
||||
return NotClosableConnection(self._write_connection)
|
||||
|
||||
def cleanup(self):
|
||||
with self._connection_lock:
|
||||
if (self._read_connection is not None and
|
||||
self._read_connection.is_open):
|
||||
try:
|
||||
self._read_connection.close()
|
||||
except Exception:
|
||||
LOG.warning(
|
||||
"Unexpected exception during connection closing",
|
||||
exc_info=True
|
||||
)
|
||||
self._read_connection = None
|
||||
|
||||
if (self._write_connection is not None and
|
||||
self._write_connection.is_open):
|
||||
try:
|
||||
self._write_connection.close()
|
||||
except Exception:
|
||||
LOG.warning(
|
||||
"Unexpected exception during connection closing",
|
||||
exc_info=True
|
||||
)
|
||||
self._write_connection = None
|
@ -11,28 +11,20 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
import logging
|
||||
import os
|
||||
import random
|
||||
import socket
|
||||
import threading
|
||||
import time
|
||||
|
||||
from oslo_log import log as logging
|
||||
from oslo_utils import eventletutils
|
||||
import pika
|
||||
from pika import credentials as pika_credentials
|
||||
|
||||
import pika_pool
|
||||
import uuid
|
||||
|
||||
from oslo_utils import eventletutils
|
||||
import pika_pool
|
||||
from stevedore import driver
|
||||
|
||||
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
|
||||
from oslo_messaging._drivers.pika_driver import pika_connection
|
||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
_PID = None
|
||||
|
||||
|
||||
class _PooledConnectionWithConfirmations(pika_pool.Connection):
|
||||
"""Derived from 'pika_pool.Connection' and extends its logic - adds
|
||||
@ -53,17 +45,24 @@ class PikaEngine(object):
|
||||
etc.
|
||||
"""
|
||||
|
||||
# constants for creating connection statistics
|
||||
HOST_CONNECTION_LAST_TRY_TIME = "last_try_time"
|
||||
HOST_CONNECTION_LAST_SUCCESS_TRY_TIME = "last_success_try_time"
|
||||
|
||||
# constant for setting tcp_user_timeout socket option
|
||||
# (it should be defined in 'select' module of standard library in future)
|
||||
TCP_USER_TIMEOUT = 18
|
||||
|
||||
def __init__(self, conf, url, default_exchange=None,
|
||||
allowed_remote_exmods=None):
|
||||
self.conf = conf
|
||||
self.url = url
|
||||
|
||||
self._connection_factory_type = (
|
||||
self.conf.oslo_messaging_pika.connection_factory
|
||||
)
|
||||
|
||||
self._connection_factory = None
|
||||
self._connection_without_confirmation_pool = None
|
||||
self._connection_with_confirmation_pool = None
|
||||
self._pid = None
|
||||
self._init_lock = threading.Lock()
|
||||
|
||||
self.host_connection_reconnect_delay = (
|
||||
conf.oslo_messaging_pika.host_connection_reconnect_delay
|
||||
)
|
||||
|
||||
# processing rpc options
|
||||
self.default_rpc_exchange = (
|
||||
@ -136,201 +135,78 @@ class PikaEngine(object):
|
||||
raise ValueError("notification_retry_delay should be non-negative "
|
||||
"integer")
|
||||
|
||||
self._tcp_user_timeout = self.conf.oslo_messaging_pika.tcp_user_timeout
|
||||
self.host_connection_reconnect_delay = (
|
||||
self.conf.oslo_messaging_pika.host_connection_reconnect_delay
|
||||
)
|
||||
self._heartbeat_interval = (
|
||||
self.conf.oslo_messaging_pika.heartbeat_interval
|
||||
)
|
||||
|
||||
# initializing connection parameters for configured RabbitMQ hosts
|
||||
self._common_pika_params = {
|
||||
'virtual_host': url.virtual_host,
|
||||
'channel_max': self.conf.oslo_messaging_pika.channel_max,
|
||||
'frame_max': self.conf.oslo_messaging_pika.frame_max,
|
||||
'ssl': self.conf.oslo_messaging_pika.ssl,
|
||||
'ssl_options': self.conf.oslo_messaging_pika.ssl_options,
|
||||
'socket_timeout': self.conf.oslo_messaging_pika.socket_timeout,
|
||||
}
|
||||
|
||||
self._connection_lock = threading.RLock()
|
||||
self._pid = None
|
||||
|
||||
self._connection_host_status = {}
|
||||
|
||||
if not url.hosts:
|
||||
raise ValueError("You should provide at least one RabbitMQ host")
|
||||
|
||||
self._host_list = url.hosts
|
||||
|
||||
self._cur_connection_host_num = random.randint(
|
||||
0, len(self._host_list) - 1
|
||||
)
|
||||
|
||||
# initializing 2 connection pools: 1st for connections without
|
||||
# confirmations, 2nd - with confirmations
|
||||
self.connection_without_confirmation_pool = pika_pool.QueuedPool(
|
||||
create=self.create_connection,
|
||||
max_size=self.conf.oslo_messaging_pika.pool_max_size,
|
||||
max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
|
||||
timeout=self.conf.oslo_messaging_pika.pool_timeout,
|
||||
recycle=self.conf.oslo_messaging_pika.pool_recycle,
|
||||
stale=self.conf.oslo_messaging_pika.pool_stale,
|
||||
)
|
||||
|
||||
self.connection_with_confirmation_pool = pika_pool.QueuedPool(
|
||||
create=self.create_connection,
|
||||
max_size=self.conf.oslo_messaging_pika.pool_max_size,
|
||||
max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
|
||||
timeout=self.conf.oslo_messaging_pika.pool_timeout,
|
||||
recycle=self.conf.oslo_messaging_pika.pool_recycle,
|
||||
stale=self.conf.oslo_messaging_pika.pool_stale,
|
||||
)
|
||||
|
||||
self.connection_with_confirmation_pool.Connection = (
|
||||
_PooledConnectionWithConfirmations
|
||||
)
|
||||
|
||||
def create_connection(self, for_listening=False):
|
||||
"""Create and return connection to any available host.
|
||||
|
||||
:return: created connection
|
||||
:raise: ConnectionException if all hosts are not reachable
|
||||
"""
|
||||
|
||||
with self._connection_lock:
|
||||
self._init_if_needed()
|
||||
|
||||
host_count = len(self._host_list)
|
||||
connection_attempts = host_count
|
||||
|
||||
while connection_attempts > 0:
|
||||
self._cur_connection_host_num += 1
|
||||
self._cur_connection_host_num %= host_count
|
||||
try:
|
||||
return self.create_host_connection(
|
||||
self._cur_connection_host_num, for_listening
|
||||
)
|
||||
except pika_pool.Connection.connectivity_errors as e:
|
||||
LOG.warning("Can't establish connection to host. %s", e)
|
||||
except pika_drv_exc.HostConnectionNotAllowedException as e:
|
||||
LOG.warning("Connection to host is not allowed. %s", e)
|
||||
|
||||
connection_attempts -= 1
|
||||
|
||||
raise pika_drv_exc.EstablishConnectionException(
|
||||
"Can not establish connection to any configured RabbitMQ "
|
||||
"host: " + str(self._host_list)
|
||||
)
|
||||
|
||||
def _set_tcp_user_timeout(self, s):
|
||||
if not self._tcp_user_timeout:
|
||||
return
|
||||
try:
|
||||
s.setsockopt(
|
||||
socket.IPPROTO_TCP, self.TCP_USER_TIMEOUT,
|
||||
int(self._tcp_user_timeout * 1000)
|
||||
)
|
||||
except socket.error:
|
||||
LOG.warning(
|
||||
"Whoops, this kernel doesn't seem to support TCP_USER_TIMEOUT."
|
||||
)
|
||||
|
||||
def _init_if_needed(self):
|
||||
global _PID
|
||||
|
||||
cur_pid = os.getpid()
|
||||
|
||||
if _PID != cur_pid:
|
||||
if _PID:
|
||||
if self._pid == cur_pid:
|
||||
return
|
||||
|
||||
with self._init_lock:
|
||||
if self._pid == cur_pid:
|
||||
return
|
||||
|
||||
if self._pid:
|
||||
LOG.warning("New pid is detected. Old: %s, new: %s. "
|
||||
"Cleaning up...", _PID, cur_pid)
|
||||
# Note(dukhlov): we need to force select poller usage in case when
|
||||
# 'thread' module is monkey patched becase current eventlet
|
||||
# implementation does not support patching of poll/epoll/kqueue
|
||||
"Cleaning up...", self._pid, cur_pid)
|
||||
|
||||
# Note(dukhlov): we need to force select poller usage in case
|
||||
# when 'thread' module is monkey patched becase current
|
||||
# eventlet implementation does not support patching of
|
||||
# poll/epoll/kqueue
|
||||
if eventletutils.is_monkey_patched("thread"):
|
||||
from pika.adapters import select_connection
|
||||
select_connection.SELECT_TYPE = "select"
|
||||
|
||||
_PID = cur_pid
|
||||
|
||||
def create_host_connection(self, host_index, for_listening=False):
|
||||
"""Create new connection to host #host_index
|
||||
|
||||
:param host_index: Integer, number of host for connection establishing
|
||||
:param for_listening: Boolean, creates connection for listening
|
||||
if True
|
||||
:return: New connection
|
||||
"""
|
||||
with self._connection_lock:
|
||||
self._init_if_needed()
|
||||
|
||||
host = self._host_list[host_index]
|
||||
|
||||
connection_params = pika.ConnectionParameters(
|
||||
host=host.hostname,
|
||||
port=host.port,
|
||||
credentials=pika_credentials.PlainCredentials(
|
||||
host.username, host.password
|
||||
),
|
||||
heartbeat_interval=(
|
||||
self._heartbeat_interval if for_listening else None
|
||||
),
|
||||
**self._common_pika_params
|
||||
mgr = driver.DriverManager(
|
||||
'oslo.messaging.pika.connection_factory',
|
||||
self._connection_factory_type
|
||||
)
|
||||
|
||||
cur_time = time.time()
|
||||
self._connection_factory = mgr.driver(self.url, self.conf)
|
||||
|
||||
host_connection_status = self._connection_host_status.get(host)
|
||||
# initializing 2 connection pools: 1st for connections without
|
||||
# confirmations, 2nd - with confirmations
|
||||
self._connection_without_confirmation_pool = pika_pool.QueuedPool(
|
||||
create=self.create_connection,
|
||||
max_size=self.conf.oslo_messaging_pika.pool_max_size,
|
||||
max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
|
||||
timeout=self.conf.oslo_messaging_pika.pool_timeout,
|
||||
recycle=self.conf.oslo_messaging_pika.pool_recycle,
|
||||
stale=self.conf.oslo_messaging_pika.pool_stale,
|
||||
)
|
||||
|
||||
if host_connection_status is None:
|
||||
host_connection_status = {
|
||||
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME: 0,
|
||||
self.HOST_CONNECTION_LAST_TRY_TIME: 0
|
||||
}
|
||||
self._connection_host_status[host] = host_connection_status
|
||||
self._connection_with_confirmation_pool = pika_pool.QueuedPool(
|
||||
create=self.create_connection,
|
||||
max_size=self.conf.oslo_messaging_pika.pool_max_size,
|
||||
max_overflow=self.conf.oslo_messaging_pika.pool_max_overflow,
|
||||
timeout=self.conf.oslo_messaging_pika.pool_timeout,
|
||||
recycle=self.conf.oslo_messaging_pika.pool_recycle,
|
||||
stale=self.conf.oslo_messaging_pika.pool_stale,
|
||||
)
|
||||
|
||||
last_success_time = host_connection_status[
|
||||
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
|
||||
]
|
||||
last_time = host_connection_status[
|
||||
self.HOST_CONNECTION_LAST_TRY_TIME
|
||||
]
|
||||
self._connection_with_confirmation_pool.Connection = (
|
||||
_PooledConnectionWithConfirmations
|
||||
)
|
||||
|
||||
# raise HostConnectionNotAllowedException if we tried to establish
|
||||
# connection in last 'host_connection_reconnect_delay' and got
|
||||
# failure
|
||||
if (last_time != last_success_time and
|
||||
cur_time - last_time <
|
||||
self.host_connection_reconnect_delay):
|
||||
raise pika_drv_exc.HostConnectionNotAllowedException(
|
||||
"Connection to host #{} is not allowed now because of "
|
||||
"previous failure".format(host_index)
|
||||
)
|
||||
self._pid = cur_pid
|
||||
|
||||
try:
|
||||
if for_listening:
|
||||
connection = pika_connection.ThreadSafePikaConnection(
|
||||
params=connection_params
|
||||
)
|
||||
else:
|
||||
connection = pika.BlockingConnection(
|
||||
parameters=connection_params
|
||||
)
|
||||
connection.params = connection_params
|
||||
def create_connection(self, for_listening=False):
|
||||
self._init_if_needed()
|
||||
return self._connection_factory.create_connection(for_listening)
|
||||
|
||||
self._set_tcp_user_timeout(connection._impl.socket)
|
||||
@property
|
||||
def connection_without_confirmation_pool(self):
|
||||
self._init_if_needed()
|
||||
return self._connection_without_confirmation_pool
|
||||
|
||||
self._connection_host_status[host][
|
||||
self.HOST_CONNECTION_LAST_SUCCESS_TRY_TIME
|
||||
] = cur_time
|
||||
@property
|
||||
def connection_with_confirmation_pool(self):
|
||||
self._init_if_needed()
|
||||
return self._connection_with_confirmation_pool
|
||||
|
||||
return connection
|
||||
finally:
|
||||
self._connection_host_status[host][
|
||||
self.HOST_CONNECTION_LAST_TRY_TIME
|
||||
] = cur_time
|
||||
def cleanup(self):
|
||||
if self._connection_factory:
|
||||
self._connection_factory.cleanup()
|
||||
|
||||
def declare_exchange_by_channel(self, channel, exchange, exchange_type,
|
||||
durable):
|
||||
|
@ -25,6 +25,7 @@ from oslo_messaging._drivers import base as drivers_base
|
||||
from oslo_messaging._drivers import impl_pika
|
||||
from oslo_messaging._drivers import impl_rabbit
|
||||
from oslo_messaging._drivers import impl_zmq
|
||||
from oslo_messaging._drivers.pika_driver import pika_connection_factory
|
||||
from oslo_messaging._drivers.protocols.amqp import opts as amqp_opts
|
||||
from oslo_messaging._drivers.zmq_driver.matchmaker import matchmaker_redis
|
||||
from oslo_messaging.notify import notifier
|
||||
@ -48,8 +49,9 @@ _opts = [
|
||||
('oslo_messaging_notifications', notifier._notifier_opts),
|
||||
('oslo_messaging_rabbit', list(
|
||||
itertools.chain(amqp.amqp_opts, impl_rabbit.rabbit_opts,
|
||||
impl_pika.pika_opts, impl_pika.pika_pool_opts,
|
||||
impl_pika.notification_opts, impl_pika.rpc_opts))),
|
||||
pika_connection_factory.pika_opts,
|
||||
impl_pika.pika_pool_opts, impl_pika.notification_opts,
|
||||
impl_pika.rpc_opts))),
|
||||
]
|
||||
|
||||
|
||||
|
15
setup.cfg
15
setup.cfg
@ -55,6 +55,21 @@ oslo.messaging.notify.drivers =
|
||||
noop = oslo_messaging.notify._impl_noop:NoOpDriver
|
||||
routing = oslo_messaging.notify._impl_routing:RoutingDriver
|
||||
|
||||
oslo.messaging.pika.connection_factory =
|
||||
# Creates new connection for each create_connection call. Old-style behaviour
|
||||
# Uses a much more connections then single and read_write factories but still avalable as
|
||||
# an option
|
||||
new = oslo_messaging._drivers.pika_driver.pika_connection_factory:PikaConnectionFactory
|
||||
|
||||
# Creates only one connection for transport and return it for each create connection call
|
||||
# it is default, but you can not use it with synchronous executor
|
||||
single = oslo_messaging._drivers.pika_driver.pika_connection_factory:SinglePikaConnectionFactory
|
||||
|
||||
# Create two connections - one for listening and another one for sending and return them
|
||||
# for each create connection call depending on connection purpose. Creates one more connection
|
||||
# but you can use it with synchronous executor
|
||||
read_write = oslo_messaging._drivers.pika_driver.pika_connection_factory:ReadWritePikaConnectionFactory
|
||||
|
||||
oslo.messaging.zmq.matchmaker =
|
||||
# Matchmakers for ZeroMQ
|
||||
dummy = oslo_messaging._drivers.zmq_driver.matchmaker.base:DummyMatchMaker
|
||||
|
Loading…
x
Reference in New Issue
Block a user