drivers: use common.ConfigOptsProxy everywhere
ConfigOptsProxy have been implemented only pika driver while the oslo.messaging allow to pass the query string for all drivers. This change fixes that. Closes-Bug: #1666903 Closes-Bug: #1607889 Next step is to validate the query with ConfigOptsProxy, to raise appropriate exception in case of mis-configuration. Change-Id: I573334e774ccf33ecd27a85067045f3c6489ee89
This commit is contained in:
parent
41fa1ac649
commit
58b026a2aa
@ -200,6 +200,7 @@ class ProtonDriver(base.BaseDriver):
|
|||||||
title='AMQP 1.0 driver options')
|
title='AMQP 1.0 driver options')
|
||||||
conf.register_group(opt_group)
|
conf.register_group(opt_group)
|
||||||
conf.register_opts(opts.amqp1_opts, group=opt_group)
|
conf.register_opts(opts.amqp1_opts, group=opt_group)
|
||||||
|
conf = common.ConfigOptsProxy(conf, url)
|
||||||
|
|
||||||
self._hosts = url.hosts
|
self._hosts = url.hosts
|
||||||
self._conf = conf
|
self._conf = conf
|
||||||
|
@ -309,10 +309,10 @@ class KafkaDriver(base.BaseDriver):
|
|||||||
|
|
||||||
def __init__(self, conf, url, default_exchange=None,
|
def __init__(self, conf, url, default_exchange=None,
|
||||||
allowed_remote_exmods=None):
|
allowed_remote_exmods=None):
|
||||||
|
conf = kafka_options.register_opts(conf, url)
|
||||||
super(KafkaDriver, self).__init__(
|
super(KafkaDriver, self).__init__(
|
||||||
conf, url, default_exchange, allowed_remote_exmods)
|
conf, url, default_exchange, allowed_remote_exmods)
|
||||||
|
|
||||||
kafka_options.register_opts(conf)
|
|
||||||
# the pool configuration properties
|
# the pool configuration properties
|
||||||
max_size = self.conf.oslo_messaging_kafka.pool_size
|
max_size = self.conf.oslo_messaging_kafka.pool_size
|
||||||
min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size
|
min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size
|
||||||
|
@ -19,6 +19,7 @@ import pika_pool
|
|||||||
import tenacity
|
import tenacity
|
||||||
|
|
||||||
from oslo_messaging._drivers import base
|
from oslo_messaging._drivers import base
|
||||||
|
from oslo_messaging._drivers import common
|
||||||
from oslo_messaging._drivers.pika_driver import (pika_connection_factory as
|
from oslo_messaging._drivers.pika_driver import (pika_connection_factory as
|
||||||
pika_drv_conn_factory)
|
pika_drv_conn_factory)
|
||||||
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
|
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
|
||||||
@ -143,6 +144,7 @@ class PikaDriver(base.BaseDriver):
|
|||||||
conf.register_opts(message_opts, group=opt_group)
|
conf.register_opts(message_opts, group=opt_group)
|
||||||
conf.register_opts(rpc_opts, group=opt_group)
|
conf.register_opts(rpc_opts, group=opt_group)
|
||||||
conf.register_opts(notification_opts, group=opt_group)
|
conf.register_opts(notification_opts, group=opt_group)
|
||||||
|
conf = common.ConfigOptsProxy(conf, url)
|
||||||
|
|
||||||
self._pika_engine = pika_drv_engine.PikaEngine(
|
self._pika_engine = pika_drv_engine.PikaEngine(
|
||||||
conf, url, default_exchange, allowed_remote_exmods
|
conf, url, default_exchange, allowed_remote_exmods
|
||||||
|
@ -1335,6 +1335,7 @@ class RabbitDriver(amqpdriver.AMQPDriverBase):
|
|||||||
conf.register_opts(rabbit_opts, group=opt_group)
|
conf.register_opts(rabbit_opts, group=opt_group)
|
||||||
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
|
conf.register_opts(rpc_amqp.amqp_opts, group=opt_group)
|
||||||
conf.register_opts(base.base_opts, group=opt_group)
|
conf.register_opts(base.base_opts, group=opt_group)
|
||||||
|
conf = rpc_common.ConfigOptsProxy(conf, url)
|
||||||
|
|
||||||
self.missing_destination_retry_timeout = (
|
self.missing_destination_retry_timeout = (
|
||||||
conf.oslo_messaging_rabbit.kombu_missing_consumer_retry_timeout)
|
conf.oslo_messaging_rabbit.kombu_missing_consumer_retry_timeout)
|
||||||
|
@ -94,7 +94,7 @@ class ZmqDriver(base.BaseDriver):
|
|||||||
if zmq is None:
|
if zmq is None:
|
||||||
raise ImportError(_LE("ZeroMQ is not available!"))
|
raise ImportError(_LE("ZeroMQ is not available!"))
|
||||||
|
|
||||||
zmq_options.register_opts(conf)
|
conf = zmq_options.register_opts(conf, url)
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.allowed_remote_exmods = allowed_remote_exmods
|
self.allowed_remote_exmods = allowed_remote_exmods
|
||||||
|
|
||||||
|
@ -13,6 +13,8 @@
|
|||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
|
||||||
|
from oslo_messaging._drivers import common
|
||||||
|
|
||||||
KAFKA_OPTS = [
|
KAFKA_OPTS = [
|
||||||
cfg.StrOpt('kafka_default_host', default='localhost',
|
cfg.StrOpt('kafka_default_host', default='localhost',
|
||||||
deprecated_for_removal=True,
|
deprecated_for_removal=True,
|
||||||
@ -52,8 +54,9 @@ KAFKA_OPTS = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def register_opts(conf):
|
def register_opts(conf, url):
|
||||||
opt_group = cfg.OptGroup(name='oslo_messaging_kafka',
|
opt_group = cfg.OptGroup(name='oslo_messaging_kafka',
|
||||||
title='Kafka driver options')
|
title='Kafka driver options')
|
||||||
conf.register_group(opt_group)
|
conf.register_group(opt_group)
|
||||||
conf.register_opts(KAFKA_OPTS, group=opt_group)
|
conf.register_opts(KAFKA_OPTS, group=opt_group)
|
||||||
|
return common.ConfigOptsProxy(conf, url)
|
||||||
|
@ -20,7 +20,6 @@ from oslo_utils import eventletutils
|
|||||||
import pika_pool
|
import pika_pool
|
||||||
from stevedore import driver
|
from stevedore import driver
|
||||||
|
|
||||||
from oslo_messaging._drivers import common as drv_cmn
|
|
||||||
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
|
from oslo_messaging._drivers.pika_driver import pika_commons as pika_drv_cmns
|
||||||
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
from oslo_messaging._drivers.pika_driver import pika_exceptions as pika_drv_exc
|
||||||
|
|
||||||
@ -48,7 +47,6 @@ class PikaEngine(object):
|
|||||||
|
|
||||||
def __init__(self, conf, url, default_exchange=None,
|
def __init__(self, conf, url, default_exchange=None,
|
||||||
allowed_remote_exmods=None):
|
allowed_remote_exmods=None):
|
||||||
conf = drv_cmn.ConfigOptsProxy(conf, url)
|
|
||||||
self.conf = conf
|
self.conf = conf
|
||||||
self.url = url
|
self.url = url
|
||||||
|
|
||||||
|
@ -17,6 +17,7 @@ import socket
|
|||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
|
|
||||||
from oslo_messaging._drivers import base
|
from oslo_messaging._drivers import base
|
||||||
|
from oslo_messaging._drivers import common
|
||||||
from oslo_messaging import server
|
from oslo_messaging import server
|
||||||
|
|
||||||
|
|
||||||
@ -203,9 +204,10 @@ zmq_opts = [
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
def register_opts(conf):
|
def register_opts(conf, url):
|
||||||
opt_group = cfg.OptGroup(name='oslo_messaging_zmq',
|
opt_group = cfg.OptGroup(name='oslo_messaging_zmq',
|
||||||
title='ZeroMQ driver options')
|
title='ZeroMQ driver options')
|
||||||
conf.register_opts(zmq_opts, group=opt_group)
|
conf.register_opts(zmq_opts, group=opt_group)
|
||||||
conf.register_opts(server._pool_opts)
|
conf.register_opts(server._pool_opts)
|
||||||
conf.register_opts(base.base_opts)
|
conf.register_opts(base.base_opts)
|
||||||
|
return common.ConfigOptsProxy(conf, url)
|
||||||
|
@ -38,7 +38,7 @@ class TestZmqAckManager(test_utils.BaseTestCase):
|
|||||||
|
|
||||||
# register and set necessary config opts
|
# register and set necessary config opts
|
||||||
self.messaging_conf.transport_driver = 'zmq'
|
self.messaging_conf.transport_driver = 'zmq'
|
||||||
zmq_options.register_opts(self.conf)
|
zmq_options.register_opts(self.conf, mock.MagicMock())
|
||||||
kwargs = {'rpc_zmq_matchmaker': 'dummy',
|
kwargs = {'rpc_zmq_matchmaker': 'dummy',
|
||||||
'use_pub_sub': False,
|
'use_pub_sub': False,
|
||||||
'use_router_proxy': True,
|
'use_router_proxy': True,
|
||||||
|
@ -16,6 +16,7 @@ import logging
|
|||||||
import threading
|
import threading
|
||||||
|
|
||||||
import fixtures
|
import fixtures
|
||||||
|
from six.moves import mock
|
||||||
import testtools
|
import testtools
|
||||||
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
@ -72,7 +73,7 @@ class ZmqBaseTestCase(test_utils.BaseTestCase):
|
|||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(ZmqBaseTestCase, self).setUp()
|
super(ZmqBaseTestCase, self).setUp()
|
||||||
self.messaging_conf.transport_driver = 'zmq'
|
self.messaging_conf.transport_driver = 'zmq'
|
||||||
zmq_options.register_opts(self.conf)
|
zmq_options.register_opts(self.conf, mock.MagicMock())
|
||||||
|
|
||||||
# Set config values
|
# Set config values
|
||||||
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
self.internal_ipc_dir = self.useFixture(fixtures.TempDir()).path
|
||||||
|
@ -294,7 +294,9 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
|
|||||||
if not self.url:
|
if not self.url:
|
||||||
self.skipTest("No transport url configured")
|
self.skipTest("No transport url configured")
|
||||||
|
|
||||||
zmq_options.register_opts(conf)
|
transport_url = oslo_messaging.TransportURL.parse(conf, self.url)
|
||||||
|
|
||||||
|
zmq_options.register_opts(conf, transport_url)
|
||||||
|
|
||||||
zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER')
|
zmq_matchmaker = os.environ.get('ZMQ_MATCHMAKER')
|
||||||
if zmq_matchmaker:
|
if zmq_matchmaker:
|
||||||
@ -322,7 +324,7 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
|
|||||||
self.config(use_dynamic_connections=zmq_use_dynamic_connections,
|
self.config(use_dynamic_connections=zmq_use_dynamic_connections,
|
||||||
group='oslo_messaging_zmq')
|
group='oslo_messaging_zmq')
|
||||||
|
|
||||||
kafka_options.register_opts(conf)
|
kafka_options.register_opts(conf, transport_url)
|
||||||
|
|
||||||
self.config(producer_batch_size=0,
|
self.config(producer_batch_size=0,
|
||||||
group='oslo_messaging_kafka')
|
group='oslo_messaging_kafka')
|
||||||
|
Loading…
x
Reference in New Issue
Block a user