Remove deprecated rabbit options
Remove the deprecated options rabbit_host, rabbit_port, rabbit_hosts, rabbit_userid, rabbit_password, rabbit_virtual_host and rabbit_max_retries which were deprecated in 5.10.0, released during Ocata. Change-Id: I39dec568e5de0b653e5af1f196537e09ef126a36 Closes-Bug: #1712394
This commit is contained in:
parent
3b1b08b74a
commit
b0d3bfceb8
@ -33,10 +33,10 @@ import kombu.messaging
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_utils import eventletutils
|
from oslo_utils import eventletutils
|
||||||
from oslo_utils import netutils
|
|
||||||
import six
|
import six
|
||||||
from six.moves.urllib import parse
|
from six.moves.urllib import parse
|
||||||
|
|
||||||
|
import oslo_messaging
|
||||||
from oslo_messaging._drivers import amqp as rpc_amqp
|
from oslo_messaging._drivers import amqp as rpc_amqp
|
||||||
from oslo_messaging._drivers import amqpdriver
|
from oslo_messaging._drivers import amqpdriver
|
||||||
from oslo_messaging._drivers import base
|
from oslo_messaging._drivers import base
|
||||||
@ -101,49 +101,11 @@ rabbit_opts = [
|
|||||||
'the one we are currently connected to becomes '
|
'the one we are currently connected to becomes '
|
||||||
'unavailable. Takes effect only if more than one '
|
'unavailable. Takes effect only if more than one '
|
||||||
'RabbitMQ node is provided in config.'),
|
'RabbitMQ node is provided in config.'),
|
||||||
cfg.StrOpt('rabbit_host',
|
|
||||||
default='localhost',
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
deprecated_for_removal=True,
|
|
||||||
deprecated_reason="Replaced by [DEFAULT]/transport_url",
|
|
||||||
help='The RabbitMQ broker address where a single node is '
|
|
||||||
'used.'),
|
|
||||||
cfg.PortOpt('rabbit_port',
|
|
||||||
default=5672,
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
deprecated_for_removal=True,
|
|
||||||
deprecated_reason="Replaced by [DEFAULT]/transport_url",
|
|
||||||
help='The RabbitMQ broker port where a single node is used.'),
|
|
||||||
cfg.ListOpt('rabbit_hosts',
|
|
||||||
default=['$rabbit_host:$rabbit_port'],
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
deprecated_for_removal=True,
|
|
||||||
deprecated_reason="Replaced by [DEFAULT]/transport_url",
|
|
||||||
help='RabbitMQ HA cluster host:port pairs.'),
|
|
||||||
cfg.StrOpt('rabbit_userid',
|
|
||||||
default='guest',
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
deprecated_for_removal=True,
|
|
||||||
deprecated_reason="Replaced by [DEFAULT]/transport_url",
|
|
||||||
help='The RabbitMQ userid.'),
|
|
||||||
cfg.StrOpt('rabbit_password',
|
|
||||||
default='guest',
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
deprecated_for_removal=True,
|
|
||||||
deprecated_reason="Replaced by [DEFAULT]/transport_url",
|
|
||||||
help='The RabbitMQ password.',
|
|
||||||
secret=True),
|
|
||||||
cfg.StrOpt('rabbit_login_method',
|
cfg.StrOpt('rabbit_login_method',
|
||||||
choices=('PLAIN', 'AMQPLAIN', 'RABBIT-CR-DEMO'),
|
choices=('PLAIN', 'AMQPLAIN', 'RABBIT-CR-DEMO'),
|
||||||
default='AMQPLAIN',
|
default='AMQPLAIN',
|
||||||
deprecated_group='DEFAULT',
|
deprecated_group='DEFAULT',
|
||||||
help='The RabbitMQ login method.'),
|
help='The RabbitMQ login method.'),
|
||||||
cfg.StrOpt('rabbit_virtual_host',
|
|
||||||
default='/',
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
deprecated_for_removal=True,
|
|
||||||
deprecated_reason="Replaced by [DEFAULT]/transport_url",
|
|
||||||
help='The RabbitMQ virtual host.'),
|
|
||||||
cfg.IntOpt('rabbit_retry_interval',
|
cfg.IntOpt('rabbit_retry_interval',
|
||||||
default=1,
|
default=1,
|
||||||
help='How frequently to retry connecting with RabbitMQ.'),
|
help='How frequently to retry connecting with RabbitMQ.'),
|
||||||
@ -156,12 +118,6 @@ rabbit_opts = [
|
|||||||
default=30,
|
default=30,
|
||||||
help='Maximum interval of RabbitMQ connection retries. '
|
help='Maximum interval of RabbitMQ connection retries. '
|
||||||
'Default is 30 seconds.'),
|
'Default is 30 seconds.'),
|
||||||
cfg.IntOpt('rabbit_max_retries',
|
|
||||||
default=0,
|
|
||||||
deprecated_for_removal=True,
|
|
||||||
deprecated_group='DEFAULT',
|
|
||||||
help='Maximum number of RabbitMQ connection retries. '
|
|
||||||
'Default is 0 (infinite retry count).'),
|
|
||||||
cfg.BoolOpt('rabbit_ha_queues',
|
cfg.BoolOpt('rabbit_ha_queues',
|
||||||
default=False,
|
default=False,
|
||||||
deprecated_group='DEFAULT',
|
deprecated_group='DEFAULT',
|
||||||
@ -450,17 +406,11 @@ class Connection(object):
|
|||||||
# NOTE(viktors): Parse config options
|
# NOTE(viktors): Parse config options
|
||||||
driver_conf = conf.oslo_messaging_rabbit
|
driver_conf = conf.oslo_messaging_rabbit
|
||||||
|
|
||||||
self.max_retries = driver_conf.rabbit_max_retries
|
|
||||||
self.interval_start = driver_conf.rabbit_retry_interval
|
self.interval_start = driver_conf.rabbit_retry_interval
|
||||||
self.interval_stepping = driver_conf.rabbit_retry_backoff
|
self.interval_stepping = driver_conf.rabbit_retry_backoff
|
||||||
self.interval_max = driver_conf.rabbit_interval_max
|
self.interval_max = driver_conf.rabbit_interval_max
|
||||||
|
|
||||||
self.login_method = driver_conf.rabbit_login_method
|
self.login_method = driver_conf.rabbit_login_method
|
||||||
self.virtual_host = driver_conf.rabbit_virtual_host
|
|
||||||
self.rabbit_hosts = driver_conf.rabbit_hosts
|
|
||||||
self.rabbit_port = driver_conf.rabbit_port
|
|
||||||
self.rabbit_userid = driver_conf.rabbit_userid
|
|
||||||
self.rabbit_password = driver_conf.rabbit_password
|
|
||||||
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
|
self.rabbit_ha_queues = driver_conf.rabbit_ha_queues
|
||||||
self.rabbit_transient_queues_ttl = \
|
self.rabbit_transient_queues_ttl = \
|
||||||
driver_conf.rabbit_transient_queues_ttl
|
driver_conf.rabbit_transient_queues_ttl
|
||||||
@ -483,15 +433,6 @@ class Connection(object):
|
|||||||
self.ssl_cert_file = driver_conf.ssl_cert_file
|
self.ssl_cert_file = driver_conf.ssl_cert_file
|
||||||
self.ssl_ca_file = driver_conf.ssl_ca_file
|
self.ssl_ca_file = driver_conf.ssl_ca_file
|
||||||
|
|
||||||
# Try forever?
|
|
||||||
if self.max_retries <= 0:
|
|
||||||
self.max_retries = None
|
|
||||||
|
|
||||||
if url.virtual_host is not None:
|
|
||||||
virtual_host = url.virtual_host
|
|
||||||
else:
|
|
||||||
virtual_host = self.virtual_host
|
|
||||||
|
|
||||||
self._url = ''
|
self._url = ''
|
||||||
if url.hosts:
|
if url.hosts:
|
||||||
if url.transport.startswith('kombu+'):
|
if url.transport.startswith('kombu+'):
|
||||||
@ -501,34 +442,22 @@ class Connection(object):
|
|||||||
url.transport)
|
url.transport)
|
||||||
if len(url.hosts) > 1:
|
if len(url.hosts) > 1:
|
||||||
random.shuffle(url.hosts)
|
random.shuffle(url.hosts)
|
||||||
for host in url.hosts:
|
transformed_urls = [
|
||||||
transport = url.transport.replace('kombu+', '')
|
self._transform_transport_url(url, host)
|
||||||
transport = transport.replace('rabbit', 'amqp')
|
for host in url.hosts]
|
||||||
self._url += '%s%s://%s:%s@%s:%s/%s' % (
|
self._url = ';'.join(transformed_urls)
|
||||||
";" if self._url else '',
|
|
||||||
transport,
|
|
||||||
parse.quote(host.username or ''),
|
|
||||||
parse.quote(host.password or ''),
|
|
||||||
self._parse_url_hostname(host.hostname) or '',
|
|
||||||
str(host.port or 5672),
|
|
||||||
virtual_host)
|
|
||||||
elif url.transport.startswith('kombu+'):
|
elif url.transport.startswith('kombu+'):
|
||||||
# NOTE(sileht): url have a + but no hosts
|
# NOTE(sileht): url have a + but no hosts
|
||||||
# (like kombu+memory:///), pass it to kombu as-is
|
# (like kombu+memory:///), pass it to kombu as-is
|
||||||
transport = url.transport.replace('kombu+', '')
|
transport = url.transport.replace('kombu+', '')
|
||||||
self._url = "%s://%s" % (transport, virtual_host)
|
self._url = "%s://" % transport
|
||||||
else:
|
if url.virtual_host:
|
||||||
if len(self.rabbit_hosts) > 1:
|
self._url += url.virtual_host
|
||||||
random.shuffle(self.rabbit_hosts)
|
elif not url.hosts:
|
||||||
for adr in self.rabbit_hosts:
|
host = oslo_messaging.transport.TransportHost('')
|
||||||
hostname, port = netutils.parse_host_port(
|
self._url = self._transform_transport_url(
|
||||||
adr, default_port=self.rabbit_port)
|
url, host, default_username='guest', default_password='guest',
|
||||||
self._url += '%samqp://%s:%s@%s:%s/%s' % (
|
default_hostname='localhost')
|
||||||
";" if self._url else '',
|
|
||||||
parse.quote(self.rabbit_userid, ''),
|
|
||||||
parse.quote(self.rabbit_password, ''),
|
|
||||||
self._parse_url_hostname(hostname), port,
|
|
||||||
virtual_host)
|
|
||||||
|
|
||||||
self._initial_pid = os.getpid()
|
self._initial_pid = os.getpid()
|
||||||
|
|
||||||
@ -656,6 +585,18 @@ class Connection(object):
|
|||||||
except KeyError:
|
except KeyError:
|
||||||
raise RuntimeError(_("Invalid SSL version : %s") % version)
|
raise RuntimeError(_("Invalid SSL version : %s") % version)
|
||||||
|
|
||||||
|
def _transform_transport_url(self, url, host, default_username='',
|
||||||
|
default_password='', default_hostname=''):
|
||||||
|
transport = url.transport.replace('kombu+', '')
|
||||||
|
transport = transport.replace('rabbit', 'amqp')
|
||||||
|
return '%s://%s:%s@%s:%s/%s' % (
|
||||||
|
transport,
|
||||||
|
parse.quote(host.username or default_username),
|
||||||
|
parse.quote(host.password or default_password),
|
||||||
|
self._parse_url_hostname(host.hostname) or default_hostname,
|
||||||
|
str(host.port or 5672),
|
||||||
|
url.virtual_host)
|
||||||
|
|
||||||
def _parse_url_hostname(self, hostname):
|
def _parse_url_hostname(self, hostname):
|
||||||
"""Handles hostname returned from urlparse and checks whether it's
|
"""Handles hostname returned from urlparse and checks whether it's
|
||||||
ipaddress. If it's ipaddress it ensures that it has brackets for IPv6.
|
ipaddress. If it's ipaddress it ensures that it has brackets for IPv6.
|
||||||
@ -704,8 +645,7 @@ class Connection(object):
|
|||||||
recoverable_error_callback=None, error_callback=None,
|
recoverable_error_callback=None, error_callback=None,
|
||||||
timeout_is_error=True):
|
timeout_is_error=True):
|
||||||
"""Will retry up to retry number of times.
|
"""Will retry up to retry number of times.
|
||||||
retry = None means use the value of rabbit_max_retries
|
retry = None or -1 means to retry forever
|
||||||
retry = -1 means to retry forever
|
|
||||||
retry = 0 means no retry
|
retry = 0 means no retry
|
||||||
retry = N means N retries
|
retry = N means N retries
|
||||||
|
|
||||||
@ -720,10 +660,8 @@ class Connection(object):
|
|||||||
"latest/reference/transport.html"))
|
"latest/reference/transport.html"))
|
||||||
self._initial_pid = current_pid
|
self._initial_pid = current_pid
|
||||||
|
|
||||||
if retry is None:
|
|
||||||
retry = self.max_retries
|
|
||||||
if retry is None or retry < 0:
|
if retry is None or retry < 0:
|
||||||
retry = None
|
retry = float('inf')
|
||||||
|
|
||||||
def on_error(exc, interval):
|
def on_error(exc, interval):
|
||||||
LOG.debug("[%s] Received recoverable error from kombu:"
|
LOG.debug("[%s] Received recoverable error from kombu:"
|
||||||
|
@ -946,9 +946,9 @@ class RpcKombuHATestCase(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(RpcKombuHATestCase, self).setUp()
|
super(RpcKombuHATestCase, self).setUp()
|
||||||
self.brokers = ['host1', 'host2', 'host3', 'host4', 'host5']
|
transport_url = 'rabbit:/host1,host2,host3,host4,host5/'
|
||||||
self.config(rabbit_hosts=self.brokers,
|
self.messaging_conf.transport_url = transport_url
|
||||||
rabbit_retry_interval=0.01,
|
self.config(rabbit_retry_interval=0.01,
|
||||||
rabbit_retry_backoff=0.01,
|
rabbit_retry_backoff=0.01,
|
||||||
kombu_reconnect_delay=0,
|
kombu_reconnect_delay=0,
|
||||||
heartbeat_timeout_threshold=0,
|
heartbeat_timeout_threshold=0,
|
||||||
|
@ -26,7 +26,6 @@ class TestConfigOptsProxy(test_utils.BaseTestCase):
|
|||||||
group = 'oslo_messaging_rabbit'
|
group = 'oslo_messaging_rabbit'
|
||||||
self.config(rabbit_retry_interval=1,
|
self.config(rabbit_retry_interval=1,
|
||||||
rabbit_qos_prefetch_count=0,
|
rabbit_qos_prefetch_count=0,
|
||||||
rabbit_max_retries=3,
|
|
||||||
group=group)
|
group=group)
|
||||||
dummy_opts = [cfg.ListOpt('list_str', item_type=types.String(),
|
dummy_opts = [cfg.ListOpt('list_str', item_type=types.String(),
|
||||||
default=[]),
|
default=[]),
|
||||||
@ -53,7 +52,6 @@ class TestConfigOptsProxy(test_utils.BaseTestCase):
|
|||||||
self.assertEqual(1, conf.oslo_messaging_rabbit.rabbit_retry_interval)
|
self.assertEqual(1, conf.oslo_messaging_rabbit.rabbit_retry_interval)
|
||||||
self.assertEqual(2,
|
self.assertEqual(2,
|
||||||
conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)
|
conf.oslo_messaging_rabbit.rabbit_qos_prefetch_count)
|
||||||
self.assertEqual(3, conf.oslo_messaging_rabbit.rabbit_max_retries)
|
|
||||||
self.assertEqual(['1', '2', '3'], conf.oslo_messaging_rabbit.list_str)
|
self.assertEqual(['1', '2', '3'], conf.oslo_messaging_rabbit.list_str)
|
||||||
self.assertEqual([1, 2, 3], conf.oslo_messaging_rabbit.list_int)
|
self.assertEqual([1, 2, 3], conf.oslo_messaging_rabbit.list_int)
|
||||||
self.assertEqual({'x': '1', 'y': '2', 'z': '3'},
|
self.assertEqual({'x': '1', 'y': '2', 'z': '3'},
|
||||||
|
Loading…
Reference in New Issue
Block a user