kafka: disable batch for functional tests
Change-Id: I09a3049ca5f4647d0f6b002b3732a4c0edd43986
This commit is contained in:
parent
a76a51a78c
commit
799cd6fa8f
@ -26,7 +26,6 @@ import threading
|
|||||||
import kafka
|
import kafka
|
||||||
from kafka.client_async import selectors
|
from kafka.client_async import selectors
|
||||||
import kafka.errors
|
import kafka.errors
|
||||||
from oslo_config import cfg
|
|
||||||
from oslo_log import log as logging
|
from oslo_log import log as logging
|
||||||
from oslo_utils import eventletutils
|
from oslo_utils import eventletutils
|
||||||
import tenacity
|
import tenacity
|
||||||
@ -319,15 +318,10 @@ class KafkaDriver(base.BaseDriver):
|
|||||||
|
|
||||||
def __init__(self, conf, url, default_exchange=None,
|
def __init__(self, conf, url, default_exchange=None,
|
||||||
allowed_remote_exmods=None):
|
allowed_remote_exmods=None):
|
||||||
|
|
||||||
opt_group = cfg.OptGroup(name='oslo_messaging_kafka',
|
|
||||||
title='Kafka driver options')
|
|
||||||
conf.register_group(opt_group)
|
|
||||||
conf.register_opts(kafka_options.KAFKA_OPTS, group=opt_group)
|
|
||||||
|
|
||||||
super(KafkaDriver, self).__init__(
|
super(KafkaDriver, self).__init__(
|
||||||
conf, url, default_exchange, allowed_remote_exmods)
|
conf, url, default_exchange, allowed_remote_exmods)
|
||||||
|
|
||||||
|
kafka_options.register_opts(conf)
|
||||||
# the pool configuration properties
|
# the pool configuration properties
|
||||||
max_size = self.conf.oslo_messaging_kafka.pool_size
|
max_size = self.conf.oslo_messaging_kafka.pool_size
|
||||||
min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size
|
min_size = self.conf.oslo_messaging_kafka.conn_pool_min_size
|
||||||
|
@ -50,3 +50,10 @@ KAFKA_OPTS = [
|
|||||||
cfg.IntOpt('producer_batch_size', default=16384,
|
cfg.IntOpt('producer_batch_size', default=16384,
|
||||||
help='Size of batch for the producer async send')
|
help='Size of batch for the producer async send')
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def register_opts(conf):
|
||||||
|
opt_group = cfg.OptGroup(name='oslo_messaging_kafka',
|
||||||
|
title='Kafka driver options')
|
||||||
|
conf.register_group(opt_group)
|
||||||
|
conf.register_opts(KAFKA_OPTS, group=opt_group)
|
||||||
|
@ -20,6 +20,7 @@ from oslo_config import cfg
|
|||||||
from six import moves
|
from six import moves
|
||||||
|
|
||||||
import oslo_messaging
|
import oslo_messaging
|
||||||
|
from oslo_messaging._drivers import kafka_options
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_options
|
from oslo_messaging._drivers.zmq_driver import zmq_options
|
||||||
from oslo_messaging.notify import notifier
|
from oslo_messaging.notify import notifier
|
||||||
from oslo_messaging.tests import utils as test_utils
|
from oslo_messaging.tests import utils as test_utils
|
||||||
@ -305,23 +306,27 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
|
|||||||
rpc_zmq_ipc_dir=zmq_ipc_dir)
|
rpc_zmq_ipc_dir=zmq_ipc_dir)
|
||||||
zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT')
|
zmq_redis_port = os.environ.get('ZMQ_REDIS_PORT')
|
||||||
if zmq_redis_port:
|
if zmq_redis_port:
|
||||||
self.config(port=zmq_redis_port, group="matchmaker_redis")
|
self.config(port=zmq_redis_port,
|
||||||
self.config(check_timeout=10000, group="matchmaker_redis")
|
check_timeout=10000,
|
||||||
self.config(wait_timeout=1000, group="matchmaker_redis")
|
wait_timeout=1000,
|
||||||
|
group="matchmaker_redis")
|
||||||
zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB')
|
zmq_use_pub_sub = os.environ.get('ZMQ_USE_PUB_SUB')
|
||||||
self.config(use_pub_sub=zmq_use_pub_sub,
|
|
||||||
group='oslo_messaging_zmq')
|
|
||||||
zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY')
|
zmq_use_router_proxy = os.environ.get('ZMQ_USE_ROUTER_PROXY')
|
||||||
self.config(use_router_proxy=zmq_use_router_proxy,
|
|
||||||
group='oslo_messaging_zmq')
|
|
||||||
zmq_use_acks = os.environ.get('ZMQ_USE_ACKS')
|
zmq_use_acks = os.environ.get('ZMQ_USE_ACKS')
|
||||||
self.config(rpc_use_acks=zmq_use_acks,
|
self.config(use_pub_sub=zmq_use_pub_sub,
|
||||||
|
use_router_proxy=zmq_use_router_proxy,
|
||||||
|
rpc_use_acks=zmq_use_acks,
|
||||||
group='oslo_messaging_zmq')
|
group='oslo_messaging_zmq')
|
||||||
zmq_use_dynamic_connections = \
|
zmq_use_dynamic_connections = \
|
||||||
os.environ.get('ZMQ_USE_DYNAMIC_CONNECTIONS')
|
os.environ.get('ZMQ_USE_DYNAMIC_CONNECTIONS')
|
||||||
self.config(use_dynamic_connections=zmq_use_dynamic_connections,
|
self.config(use_dynamic_connections=zmq_use_dynamic_connections,
|
||||||
group='oslo_messaging_zmq')
|
group='oslo_messaging_zmq')
|
||||||
|
|
||||||
|
kafka_options.register_opts(conf)
|
||||||
|
|
||||||
|
self.config(producer_batch_size=0,
|
||||||
|
group='oslo_messaging_kafka')
|
||||||
|
|
||||||
|
|
||||||
class NotificationFixture(fixtures.Fixture):
|
class NotificationFixture(fixtures.Fixture):
|
||||||
def __init__(self, conf, url, topics, batch=None):
|
def __init__(self, conf, url, topics, batch=None):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user