diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index 6c90d30cb..4c8dc29c7 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -26,7 +26,6 @@ import threading import kafka from kafka.client_async import selectors import kafka.errors -from oslo_config import cfg from oslo_log import log as logging from oslo_utils import eventletutils import tenacity @@ -319,15 +318,10 @@ class KafkaDriver(base.BaseDriver): def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=None): - - opt_group = cfg.OptGroup(name='oslo_messaging_kafka', - title='Kafka driver options') - conf.register_group(opt_group) - conf.register_opts(kafka_options.KAFKA_OPTS, group=opt_group) - super(KafkaDriver, self).__init__( conf, url, default_exchange, allowed_remote_exmods) + kafka_options.register_opts(conf) # the pool configuration properties max_size = self.conf.oslo_messaging_kafka.pool_size min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size diff --git a/oslo_messaging/_drivers/kafka_options.py b/oslo_messaging/_drivers/kafka_options.py index c733a0a42..7989288ea 100644 --- a/oslo_messaging/_drivers/kafka_options.py +++ b/oslo_messaging/_drivers/kafka_options.py @@ -50,3 +50,10 @@ KAFKA_OPTS = [ cfg.IntOpt('producer_batch_size', default=16384, help='Size of batch for the producer async send') ] + + +def register_opts(conf): + opt_group = cfg.OptGroup(name='oslo_messaging_kafka', + title='Kafka driver options') + conf.register_group(opt_group) + conf.register_opts(KAFKA_OPTS, group=opt_group) diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index ebdc72533..b47ac5fc2 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -20,6 +20,7 @@ from oslo_config import cfg from six import moves import oslo_messaging +from oslo_messaging._drivers import kafka_options from oslo_messaging._drivers.zmq_driver import zmq_options from oslo_messaging.notify import notifier from oslo_messaging.tests import utils as test_utils @@ -305,23 +306,27 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase): rpc_zmq_ipc_dir=zmq_ipc_dir) zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT') if zmq_redis_port: - self.config(port=zmq_redis_port, group="matchmaker_redis") - self.config(check_timeout=10000, group="matchmaker_redis") - self.config(wait_timeout=1000, group="matchmaker_redis") + self.config(port=zmq_redis_port, + check_timeout=10000, + wait_timeout=1000, + group="matchmaker_redis") zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB') - self.config(use_pub_sub=zmq_use_pub_sub, - group='oslo_messaging_zmq') zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY') - self.config(use_router_proxy=zmq_use_router_proxy, - group='oslo_messaging_zmq') zmq_use_acks = os.environ.get('ZMQ_USE_ACKS') - self.config(rpc_use_acks=zmq_use_acks, + self.config(use_pub_sub=zmq_use_pub_sub, + use_router_proxy=zmq_use_router_proxy, + rpc_use_acks=zmq_use_acks, group='oslo_messaging_zmq') zmq_use_dynamic_connections = \ os.environ.get('ZMQ_USE_DYNAMIC_CONNECTIONS') self.config(use_dynamic_connections=zmq_use_dynamic_connections, group='oslo_messaging_zmq') + kafka_options.register_opts(conf) + + self.config(producer_batch_size=0, + group='oslo_messaging_kafka') + class NotificationFixture(fixtures.Fixture): def __init__(self, conf, url, topics, batch=None):