Switch driver to confluent-kafka client library

This patch switches the kafka python client from kafka-python to
confluent-kafka due to documented threading issues with the
kafka-python consumer and the recommendation to use multiplrocessing.
The confluent-kafka client leverages the high performance librdkafka
C client and is safe for multiple thread use.

This patch:
* switches to confluent-kafka library
* revises consumer and producer message operations
* utilizes event.tpool method for confluent-kafka blocking calls
* updates unit tests
* adds kafka specific timeouts for functional tests
* adds release note

Depends-On: Ice374dca539b8ed1b1965b75379bad5140121483
Change-Id: Idfb9fe3700d882c8285c6dc56b0620951178eba2
This commit is contained in:
Andy Smith 2018-09-17 13:21:42 -04:00
parent 252844879d
commit 5a842ae155
12 changed files with 221 additions and 157 deletions

View File

@ -77,7 +77,7 @@
devstack_plugins: devstack_plugins:
devstack-plugin-amqp1: git://git.openstack.org/openstack/devstack-plugin-amqp1 devstack-plugin-amqp1: git://git.openstack.org/openstack/devstack-plugin-amqp1
zuul_copy_output: zuul_copy_output:
'{{ devstack_base_dir }}/logs/qdrouterd.log': logs '{{ devstack_log_dir }}/qdrouterd.log': logs
- job: - job:
@ -102,8 +102,7 @@
devstack_plugins: devstack_plugins:
devstack-plugin-kafka: git://git.openstack.org/openstack/devstack-plugin-kafka devstack-plugin-kafka: git://git.openstack.org/openstack/devstack-plugin-kafka
zuul_copy_output: zuul_copy_output:
'{{ devstack_base_dir }}/logs/qdrouterd.log': logs '{{ devstack_log_dir }}/server.log': logs
- job: - job:
name: oslo.messaging-src-dsvm-full-kafka-centos-7 name: oslo.messaging-src-dsvm-full-kafka-centos-7

View File

@ -32,6 +32,8 @@ swig [platform:rpm amqp1]
# kafka dpkg # kafka dpkg
openjdk-8-jdk [platform:dpkg kafka] openjdk-8-jdk [platform:dpkg kafka]
librdkafka1 [platform:dpkg kafka]
# kafka rpm # kafka rpm
java-1.8.0-openjdk [platform:rpm kafka] java-1.8.0-openjdk [platform:rpm kafka]
librdkafka [platform:rpm kafka]

View File

@ -8,6 +8,6 @@ reno>=2.5.0 # Apache-2.0
# imported when the source code is parsed for generating documentation: # imported when the source code is parsed for generating documentation:
fixtures>=3.0.0 # Apache-2.0/BSD fixtures>=3.0.0 # Apache-2.0/BSD
kafka-python>=1.3.1 # Apache-2.0 confluent-kafka>=0.11.6 # Apache-2.0
pyngus>=2.2.0 # Apache-2.0 pyngus>=2.2.0 # Apache-2.0
tenacity>=3.2.1 # Apache-2.0 tenacity>=3.2.1 # Apache-2.0

View File

@ -7,6 +7,7 @@ cachetools==2.0.0
cffi==1.7.0 cffi==1.7.0
cliff==2.8.0 cliff==2.8.0
cmd2==0.8.0 cmd2==0.8.0
confluent-kafka==0.11.6
contextlib2==0.4.0 contextlib2==0.4.0
coverage==4.0 coverage==4.0
debtcollector==1.2.0 debtcollector==1.2.0
@ -26,7 +27,6 @@ hacking==0.12.0
imagesize==0.7.1 imagesize==0.7.1
iso8601==0.1.11 iso8601==0.1.11
Jinja2==2.10 Jinja2==2.10
kafka-python==1.3.1
keystoneauth1==3.4.0 keystoneauth1==3.4.0
kombu==4.0.0 kombu==4.0.0
linecache2==1.0.0 linecache2==1.0.0

View File

@ -12,54 +12,30 @@
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
# Following code fixes 2 issues with kafka-python and import logging
# The current release of eventlet (0.19.0) does not actually remove
# select.poll [1]. Because of kafka-python.selectors34 selects
# PollSelector instead of SelectSelector [2]. PollSelector relies on
# select.poll, which does not work when eventlet/greenlet is used. This
# bug in evenlet is fixed in the master branch [3], but there's no
# release of eventlet that includes this fix at this point.
import json
import threading import threading
import kafka import confluent_kafka
from kafka.client_async import selectors from confluent_kafka import KafkaException
import kafka.errors from oslo_serialization import jsonutils
from oslo_log import log as logging
from oslo_utils import eventletutils from oslo_utils import eventletutils
import tenacity from oslo_utils import importutils
from oslo_messaging._drivers import base from oslo_messaging._drivers import base
from oslo_messaging._drivers import common as driver_common from oslo_messaging._drivers import common as driver_common
from oslo_messaging._drivers.kafka_driver import kafka_options from oslo_messaging._drivers.kafka_driver import kafka_options
from oslo_messaging._i18n import _LE
from oslo_messaging._i18n import _LW
from oslo_serialization import jsonutils
import logging as l if eventletutils.EVENTLET_AVAILABLE:
l.basicConfig(level=l.INFO) tpool = importutils.try_import('eventlet.tpool')
l.getLogger("kafka").setLevel(l.WARN)
l.getLogger("stevedore").setLevel(l.WARN)
if eventletutils.is_monkey_patched('select'):
# monkeypatch the vendored SelectSelector._select like eventlet does
# https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
from eventlet.green import select
selectors.SelectSelector._select = staticmethod(select.select)
# Force to use the select selectors
KAFKA_SELECTOR = selectors.SelectSelector
else:
KAFKA_SELECTOR = selectors.DefaultSelector
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
def unpack_message(msg): def unpack_message(msg):
"""Unpack context and msg."""
context = {} context = {}
message = None message = None
msg = json.loads(msg) msg = jsonutils.loads(msg)
message = driver_common.deserialize_msg(msg) message = driver_common.deserialize_msg(msg)
context = message['_context'] context = message['_context']
del message['_context'] del message['_context']
@ -68,7 +44,6 @@ def unpack_message(msg):
def pack_message(ctxt, msg): def pack_message(ctxt, msg):
"""Pack context into msg.""" """Pack context into msg."""
if isinstance(ctxt, dict): if isinstance(ctxt, dict):
context_d = ctxt context_d = ctxt
else: else:
@ -97,25 +72,28 @@ def target_to_topic(target, priority=None, vhost=None):
return concat(".", [target.topic, priority, vhost]) return concat(".", [target.topic, priority, vhost])
def retry_on_retriable_kafka_error(exc): class ConsumerTimeout(KafkaException):
return (isinstance(exc, kafka.errors.KafkaError) and exc.retriable) pass
def with_reconnect(retries=None): class AssignedPartition(object):
def decorator(func): """This class is used by the ConsumerConnection to track the
@tenacity.retry( assigned partitions.
retry=tenacity.retry_if_exception(retry_on_retriable_kafka_error), """
wait=tenacity.wait_fixed(1), def __init__(self, topic, partition):
stop=tenacity.stop_after_attempt(retries), super(AssignedPartition, self).__init__()
reraise=True self.topic = topic
) self.partition = partition
def wrapper(*args, **kwargs): self.skey = '%s %d' % (self.topic, self.partition)
return func(*args, **kwargs)
return wrapper def to_dict(self):
return decorator return {'topic': self.topic, 'partition': self.partition}
class Connection(object): class Connection(object):
"""This is the base class for consumer and producer connections for
transport attributes.
"""
def __init__(self, conf, url): def __init__(self, conf, url):
@ -141,7 +119,7 @@ class Connection(object):
self.password = host.password self.password = host.password
else: else:
if self.username != host.username: if self.username != host.username:
LOG.warning(_LW("Different transport usernames detected")) LOG.warning("Different transport usernames detected")
if host.hostname: if host.hostname:
self.hostaddrs.append("%s:%s" % (host.hostname, host.port)) self.hostaddrs.append("%s:%s" % (host.hostname, host.port))
@ -152,7 +130,8 @@ class Connection(object):
class ConsumerConnection(Connection): class ConsumerConnection(Connection):
"""This is the class for kafka topic/assigned partition consumer
"""
def __init__(self, conf, url): def __init__(self, conf, url):
super(ConsumerConnection, self).__init__(conf, url) super(ConsumerConnection, self).__init__(conf, url)
@ -160,28 +139,59 @@ class ConsumerConnection(Connection):
self.consumer_timeout = self.driver_conf.kafka_consumer_timeout self.consumer_timeout = self.driver_conf.kafka_consumer_timeout
self.max_fetch_bytes = self.driver_conf.kafka_max_fetch_bytes self.max_fetch_bytes = self.driver_conf.kafka_max_fetch_bytes
self.group_id = self.driver_conf.consumer_group self.group_id = self.driver_conf.consumer_group
self.enable_auto_commit = self.driver_conf.enable_auto_commit self.use_auto_commit = self.driver_conf.enable_auto_commit
self.max_poll_records = self.driver_conf.max_poll_records self.max_poll_records = self.driver_conf.max_poll_records
self._consume_loop_stopped = False self._consume_loop_stopped = False
self.assignment_dict = dict()
def find_assignment(self, topic, partition):
"""Find and return existing assignment based on topic and partition"""
skey = '%s %d' % (topic, partition)
return self.assignment_dict.get(skey)
def on_assign(self, consumer, topic_partitions):
"""Rebalance on_assign callback"""
assignment = [AssignedPartition(p.topic, p.partition)
for p in topic_partitions]
self.assignment_dict = {a.skey: a for a in assignment}
for t in topic_partitions:
LOG.debug("Topic %s assigned to partition %d",
t.topic, t.partition)
def on_revoke(self, consumer, topic_partitions):
"""Rebalance on_revoke callback"""
self.assignment_dict = dict()
for t in topic_partitions:
LOG.debug("Topic %s revoked from partition %d",
t.topic, t.partition)
@with_reconnect()
def _poll_messages(self, timeout): def _poll_messages(self, timeout):
messages = self.consumer.poll(timeout * 1000.0) """Consume messages, callbacks and return list of messages"""
messages = [record.value msglist = self.consumer.consume(self.max_poll_records,
for records in messages.values() if records timeout)
for record in records]
if not messages:
# NOTE(sileht): really ? you return payload but no messages...
# simulate timeout to consume message again
raise kafka.errors.ConsumerNoMoreData()
if not self.enable_auto_commit: if ((len(self.assignment_dict) == 0) or (len(msglist) == 0)):
self.consumer.commit() raise ConsumerTimeout()
messages = []
for message in msglist:
if message is None:
break
a = self.find_assignment(message.topic(), message.partition())
if a is None:
LOG.warning(("Message for %s received on unassigned "
"partition %d"),
message.topic(), message.partition())
else:
messages.append(message.value())
if not self.use_auto_commit:
self.consumer.commit(asynchronous=False)
return messages return messages
def consume(self, timeout=None): def consume(self, timeout=None):
"""Receive up to 'max_fetch_messages' messages. """Receive messages.
:param timeout: poll timeout in seconds :param timeout: poll timeout in seconds
""" """
@ -199,12 +209,14 @@ class ConsumerConnection(Connection):
if self._consume_loop_stopped: if self._consume_loop_stopped:
return return
try: try:
if eventletutils.is_monkey_patched('thread'):
return tpool.execute(self._poll_messages, poll_timeout)
return self._poll_messages(poll_timeout) return self._poll_messages(poll_timeout)
except kafka.errors.ConsumerNoMoreData as exc: except ConsumerTimeout as exc:
poll_timeout = timer.check_return( poll_timeout = timer.check_return(
_raise_timeout, exc, maximum=self.consumer_timeout) _raise_timeout, exc, maximum=self.consumer_timeout)
except Exception: except Exception:
LOG.exception(_LE("Failed to consume messages")) LOG.exception("Failed to consume messages")
return return
def stop_consuming(self): def stop_consuming(self):
@ -215,21 +227,25 @@ class ConsumerConnection(Connection):
self.consumer.close() self.consumer.close()
self.consumer = None self.consumer = None
@with_reconnect()
def declare_topic_consumer(self, topics, group=None): def declare_topic_consumer(self, topics, group=None):
self.consumer = kafka.KafkaConsumer( conf = {
*topics, group_id=(group or self.group_id), 'bootstrap.servers': ",".join(self.hostaddrs),
enable_auto_commit=self.enable_auto_commit, 'group.id': (group or self.group_id),
bootstrap_servers=self.hostaddrs, 'enable.auto.commit': self.use_auto_commit,
max_partition_fetch_bytes=self.max_fetch_bytes, 'max.partition.fetch.bytes': self.max_fetch_bytes,
max_poll_records=self.max_poll_records, 'security.protocol': self.security_protocol,
security_protocol=self.security_protocol, 'sasl.mechanism': self.sasl_mechanism,
sasl_mechanism=self.sasl_mechanism, 'sasl.username': self.username,
sasl_plain_username=self.username, 'sasl.password': self.password,
sasl_plain_password=self.password, 'ssl.ca.location': self.ssl_cafile,
ssl_cafile=self.ssl_cafile, 'enable.partition.eof': False,
selector=KAFKA_SELECTOR 'default.topic.config': {'auto.offset.reset': 'latest'}
) }
LOG.debug("Subscribing to %s as %s", topics, (group or self.group_id))
self.consumer = confluent_kafka.Consumer(conf)
self.consumer.subscribe(topics,
on_assign=self.on_assign,
on_revoke=self.on_revoke)
class ProducerConnection(Connection): class ProducerConnection(Connection):
@ -242,6 +258,20 @@ class ProducerConnection(Connection):
self.producer = None self.producer = None
self.producer_lock = threading.Lock() self.producer_lock = threading.Lock()
def _produce_message(self, topic, message):
while True:
try:
self.producer.produce(topic, message)
except KafkaException as e:
LOG.error("Produce message failed: %s" % str(e))
except BufferError:
LOG.debug("Produce message queue full, waiting for deliveries")
self.producer.poll(0.5)
continue
break
self.producer.poll(0)
def notify_send(self, topic, ctxt, msg, retry): def notify_send(self, topic, ctxt, msg, retry):
"""Send messages to Kafka broker. """Send messages to Kafka broker.
@ -254,16 +284,11 @@ class ProducerConnection(Connection):
message = pack_message(ctxt, msg) message = pack_message(ctxt, msg)
message = jsonutils.dumps(message).encode('utf-8') message = jsonutils.dumps(message).encode('utf-8')
@with_reconnect(retries=retry)
def wrapped_with_reconnect():
self._ensure_producer()
# NOTE(sileht): This returns a future, we can use get()
# if we want to block like other driver
future = self.producer.send(topic, message)
future.get()
try: try:
wrapped_with_reconnect() self._ensure_producer()
if eventletutils.is_monkey_patched('thread'):
return tpool.execute(self._produce_message, topic, message)
return self._produce_message(topic, message)
except Exception: except Exception:
# NOTE(sileht): if something goes wrong close the producer # NOTE(sileht): if something goes wrong close the producer
# connection # connection
@ -276,7 +301,10 @@ class ProducerConnection(Connection):
def _close_producer(self): def _close_producer(self):
with self.producer_lock: with self.producer_lock:
if self.producer: if self.producer:
self.producer.close() try:
self.producer.flush()
except KafkaException:
LOG.error("Flush error during producer close")
self.producer = None self.producer = None
def _ensure_producer(self): def _ensure_producer(self):
@ -285,16 +313,17 @@ class ProducerConnection(Connection):
with self.producer_lock: with self.producer_lock:
if self.producer: if self.producer:
return return
self.producer = kafka.KafkaProducer( conf = {
bootstrap_servers=self.hostaddrs, 'bootstrap.servers': ",".join(self.hostaddrs),
linger_ms=self.linger_ms, 'linger.ms': self.linger_ms,
batch_size=self.batch_size, 'batch.num.messages': self.batch_size,
security_protocol=self.security_protocol, 'security.protocol': self.security_protocol,
sasl_mechanism=self.sasl_mechanism, 'sasl.mechanism': self.sasl_mechanism,
sasl_plain_username=self.username, 'sasl.username': self.username,
sasl_plain_password=self.password, 'sasl.password': self.password,
ssl_cafile=self.ssl_cafile, 'ssl.ca.location': self.ssl_cafile
selector=KAFKA_SELECTOR) }
self.producer = confluent_kafka.Producer(conf)
class OsloKafkaMessage(base.RpcIncomingMessage): class OsloKafkaMessage(base.RpcIncomingMessage):
@ -303,13 +332,13 @@ class OsloKafkaMessage(base.RpcIncomingMessage):
super(OsloKafkaMessage, self).__init__(ctxt, message) super(OsloKafkaMessage, self).__init__(ctxt, message)
def requeue(self): def requeue(self):
LOG.warning(_LW("requeue is not supported")) LOG.warning("requeue is not supported")
def reply(self, reply=None, failure=None): def reply(self, reply=None, failure=None):
LOG.warning(_LW("reply is not supported")) LOG.warning("reply is not supported")
def heartbeat(self): def heartbeat(self):
LOG.warning(_LW("heartbeat is not supported")) LOG.warning("heartbeat is not supported")
class KafkaListener(base.PollStyleListener): class KafkaListener(base.PollStyleListener):
@ -347,8 +376,9 @@ class KafkaListener(base.PollStyleListener):
class KafkaDriver(base.BaseDriver): class KafkaDriver(base.BaseDriver):
"""Note: Current implementation of this driver is experimental. """Kafka Driver
We will have functional and/or integrated testing enabled for this driver.
Note: Current implementation of this driver is experimental.
""" """
def __init__(self, conf, url, default_exchange=None, def __init__(self, conf, url, default_exchange=None,
@ -366,6 +396,7 @@ class KafkaDriver(base.BaseDriver):
for c in self.listeners: for c in self.listeners:
c.close() c.close()
self.listeners = [] self.listeners = []
LOG.info("Kafka messaging driver shutdown")
def send(self, target, ctxt, message, wait_for_reply=None, timeout=None, def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
call_monitor_timeout=None, retry=None): call_monitor_timeout=None, retry=None):
@ -414,9 +445,9 @@ class KafkaDriver(base.BaseDriver):
:type pool: string :type pool: string
""" """
conn = ConsumerConnection(self.conf, self._url) conn = ConsumerConnection(self.conf, self._url)
topics = set() topics = []
for target, priority in targets_and_priorities: for target, priority in targets_and_priorities:
topics.add(target_to_topic(target, priority)) topics.append(target_to_topic(target, priority))
conn.declare_topic_consumer(topics, pool) conn.declare_topic_consumer(topics, pool)

View File

@ -11,8 +11,6 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations # License for the specific language governing permissions and limitations
# under the License. # under the License.
import kafka
import kafka.errors
from six.moves import mock from six.moves import mock
import testscenarios import testscenarios
@ -77,6 +75,7 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
self.addCleanup(transport.cleanup) self.addCleanup(transport.cleanup)
driver = transport._driver driver = transport._driver
self.assertIsInstance(driver, kafka_driver.KafkaDriver)
self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs) self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs)
self.assertEqual(self.expected['username'], driver.pconn.username) self.assertEqual(self.expected['username'], driver.pconn.username)
self.assertEqual(self.expected['password'], driver.pconn.password) self.assertEqual(self.expected['password'], driver.pconn.password)
@ -101,14 +100,20 @@ class TestKafkaDriver(test_utils.BaseTestCase):
def test_send_notification(self): def test_send_notification(self):
target = oslo_messaging.Target(topic="topic_test") target = oslo_messaging.Target(topic="topic_test")
with mock.patch("kafka.KafkaProducer") as fake_producer_class: with mock.patch("confluent_kafka.Producer") as producer:
fake_producer = fake_producer_class.return_value self.driver.send_notification(
fake_producer.send.side_effect = kafka.errors.NoBrokersAvailable target, {}, {"payload": ["test_1"]},
self.assertRaises(kafka.errors.NoBrokersAvailable, None, retry=3)
self.driver.send_notification, producer.assert_called_once_with({
target, {}, {"payload": ["test_1"]}, 'bootstrap.servers': '',
None, retry=3) 'linger.ms': mock.ANY,
self.assertEqual(3, fake_producer.send.call_count) 'batch.num.messages': mock.ANY,
'security.protocol': 'PLAINTEXT',
'sasl.mechanism': 'PLAIN',
'sasl.username': mock.ANY,
'sasl.password': mock.ANY,
'ssl.ca.location': ''
})
def test_listen(self): def test_listen(self):
target = oslo_messaging.Target(topic="topic_test") target = oslo_messaging.Target(topic="topic_test")
@ -119,23 +124,22 @@ class TestKafkaDriver(test_utils.BaseTestCase):
targets_and_priorities = [ targets_and_priorities = [
(oslo_messaging.Target(topic="topic_test_1"), "sample"), (oslo_messaging.Target(topic="topic_test_1"), "sample"),
] ]
expected_topics = ["topic_test_1.sample"] with mock.patch("confluent_kafka.Consumer") as consumer:
with mock.patch("kafka.KafkaConsumer") as consumer:
self.driver.listen_for_notifications( self.driver.listen_for_notifications(
targets_and_priorities, "kafka_test", 1000, 10) targets_and_priorities, "kafka_test", 1000, 10)
consumer.assert_called_once_with( consumer.assert_called_once_with({
*expected_topics, group_id="kafka_test", 'bootstrap.servers': '',
enable_auto_commit=mock.ANY, 'enable.partition.eof': False,
bootstrap_servers=[], 'group.id': 'kafka_test',
max_partition_fetch_bytes=mock.ANY, 'enable.auto.commit': mock.ANY,
max_poll_records=mock.ANY, 'max.partition.fetch.bytes': mock.ANY,
security_protocol='PLAINTEXT', 'security.protocol': 'PLAINTEXT',
sasl_mechanism='PLAIN', 'sasl.mechanism': 'PLAIN',
sasl_plain_username=mock.ANY, 'sasl.username': mock.ANY,
sasl_plain_password=mock.ANY, 'sasl.password': mock.ANY,
ssl_cafile='', 'ssl.ca.location': '',
selector=mock.ANY 'default.topic.config': {'auto.offset.reset': 'latest'}
) })
def test_cleanup(self): def test_cleanup(self):
listeners = [mock.MagicMock(), mock.MagicMock()] listeners = [mock.MagicMock(), mock.MagicMock()]
@ -155,10 +159,9 @@ class TestKafkaConnection(test_utils.BaseTestCase):
def test_notify(self): def test_notify(self):
with mock.patch("kafka.KafkaProducer") as fake_producer_class: with mock.patch("confluent_kafka.Producer") as producer:
fake_producer = fake_producer_class.return_value
self.driver.pconn.notify_send("fake_topic", self.driver.pconn.notify_send("fake_topic",
{"fake_ctxt": "fake_param"}, {"fake_ctxt": "fake_param"},
{"fake_text": "fake_message_1"}, {"fake_text": "fake_message_1"},
10) 10)
self.assertEqual(2, len(fake_producer.send.mock_calls)) assert producer.call_count == 1

View File

@ -328,8 +328,13 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
# NOTE(sileht): Each test must not use the same topics # NOTE(sileht): Each test must not use the same topics
# to be run in parallel # to be run in parallel
# NOTE(ansmith): kafka partition assignment delay requires
# longer timeouts for test completion
def test_simple(self): def test_simple(self):
get_timeout = 1
if self.url.startswith("kafka://"): if self.url.startswith("kafka://"):
get_timeout = 5
self.conf.set_override('consumer_group', 'test_simple', self.conf.set_override('consumer_group', 'test_simple',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
@ -338,14 +343,16 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
notifier = listener.notifier('abc') notifier = listener.notifier('abc')
notifier.info({}, 'test', 'Hello World!') notifier.info({}, 'test', 'Hello World!')
event = listener.events.get(timeout=1) event = listener.events.get(timeout=get_timeout)
self.assertEqual('info', event[0]) self.assertEqual('info', event[0])
self.assertEqual('test', event[1]) self.assertEqual('test', event[1])
self.assertEqual('Hello World!', event[2]) self.assertEqual('Hello World!', event[2])
self.assertEqual('abc', event[3]) self.assertEqual('abc', event[3])
def test_multiple_topics(self): def test_multiple_topics(self):
get_timeout = 1
if self.url.startswith("kafka://"): if self.url.startswith("kafka://"):
get_timeout = 5
self.conf.set_override('consumer_group', 'test_multiple_topics', self.conf.set_override('consumer_group', 'test_multiple_topics',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
@ -363,7 +370,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
received = {} received = {}
while len(received) < len(sent): while len(received) < len(sent):
e = listener.events.get(timeout=1) e = listener.events.get(timeout=get_timeout)
received[e[3]] = e received[e[3]] = e
for key in received: for key in received:
@ -374,10 +381,15 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(expected[2], actual[2]) self.assertEqual(expected[2], actual[2])
def test_multiple_servers(self): def test_multiple_servers(self):
timeout = 0.5
if self.url.startswith("amqp:"): if self.url.startswith("amqp:"):
self.skipTest("QPID-6307") self.skipTest("QPID-6307")
if self.url.startswith("kafka"): if self.url.startswith("kafka://"):
self.skipTest("Kafka: Need to be fixed") self.skipTest("Kafka: needs to be fixed")
timeout = 5
self.conf.set_override('consumer_group',
'test_multiple_servers',
group='oslo_messaging_kafka')
listener_a = self.useFixture( listener_a = self.useFixture(
utils.NotificationFixture(self.conf, self.url, ['test-topic'])) utils.NotificationFixture(self.conf, self.url, ['test-topic']))
@ -391,15 +403,17 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
for event_type, payload in events_out: for event_type, payload in events_out:
n.info({}, event_type, payload) n.info({}, event_type, payload)
events_in = [[(e[1], e[2]) for e in listener_a.get_events()], events_in = [[(e[1], e[2]) for e in listener_a.get_events(timeout)],
[(e[1], e[2]) for e in listener_b.get_events()]] [(e[1], e[2]) for e in listener_b.get_events(timeout)]]
self.assertThat(events_in, utils.IsValidDistributionOf(events_out)) self.assertThat(events_in, utils.IsValidDistributionOf(events_out))
for stream in events_in: for stream in events_in:
self.assertThat(len(stream), matchers.GreaterThan(0)) self.assertThat(len(stream), matchers.GreaterThan(0))
def test_independent_topics(self): def test_independent_topics(self):
get_timeout = 0.5
if self.url.startswith("kafka://"): if self.url.startswith("kafka://"):
get_timeout = 5
self.conf.set_override('consumer_group', self.conf.set_override('consumer_group',
'test_independent_topics_a', 'test_independent_topics_a',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
@ -425,7 +439,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
b.info({}, event_type, payload) b.info({}, event_type, payload)
def check_received(listener, publisher, messages): def check_received(listener, publisher, messages):
actuals = sorted([listener.events.get(timeout=0.5) actuals = sorted([listener.events.get(timeout=get_timeout)
for __ in range(len(a_out))]) for __ in range(len(a_out))])
expected = sorted([['info', m[0], m[1], publisher] expected = sorted([['info', m[0], m[1], publisher]
for m in messages]) for m in messages])
@ -435,7 +449,9 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
check_received(listener_b, "pub-2", b_out) check_received(listener_b, "pub-2", b_out)
def test_all_categories(self): def test_all_categories(self):
get_timeout = 1
if self.url.startswith("kafka://"): if self.url.startswith("kafka://"):
get_timeout = 5
self.conf.set_override('consumer_group', 'test_all_categories', self.conf.set_override('consumer_group', 'test_all_categories',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
@ -451,7 +467,7 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
# order between events with different categories is not guaranteed # order between events with different categories is not guaranteed
received = {} received = {}
for expected in events: for expected in events:
e = listener.events.get(timeout=1) e = listener.events.get(timeout=get_timeout)
received[e[0]] = e received[e[0]] = e
for expected in events: for expected in events:
@ -461,6 +477,8 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
self.assertEqual(expected[3], actual[2]) self.assertEqual(expected[3], actual[2])
def test_simple_batch(self): def test_simple_batch(self):
get_timeout = 3
batch_timeout = 2
if self.url.startswith("amqp:"): if self.url.startswith("amqp:"):
backend = os.environ.get("AMQP1_BACKEND") backend = os.environ.get("AMQP1_BACKEND")
if backend == "qdrouterd": if backend == "qdrouterd":
@ -468,18 +486,21 @@ class NotifyTestCase(utils.SkipIfNoTransportURL):
# sender pends until batch_size or timeout reached # sender pends until batch_size or timeout reached
self.skipTest("qdrouterd backend") self.skipTest("qdrouterd backend")
if self.url.startswith("kafka://"): if self.url.startswith("kafka://"):
get_timeout = 10
batch_timeout = 5
self.conf.set_override('consumer_group', 'test_simple_batch', self.conf.set_override('consumer_group', 'test_simple_batch',
group='oslo_messaging_kafka') group='oslo_messaging_kafka')
listener = self.useFixture( listener = self.useFixture(
utils.BatchNotificationFixture(self.conf, self.url, utils.BatchNotificationFixture(self.conf, self.url,
['test_simple_batch'], ['test_simple_batch'],
batch_size=100, batch_timeout=2)) batch_size=100,
batch_timeout=batch_timeout))
notifier = listener.notifier('abc') notifier = listener.notifier('abc')
for i in six.moves.range(0, 205): for i in six.moves.range(0, 205):
notifier.info({}, 'test%s' % i, 'Hello World!') notifier.info({}, 'test%s' % i, 'Hello World!')
events = listener.get_events(timeout=3) events = listener.get_events(timeout=get_timeout)
self.assertEqual(3, len(events)) self.assertEqual(3, len(events))
self.assertEqual(100, len(events[0][1])) self.assertEqual(100, len(events[0][1]))
self.assertEqual(100, len(events[1][1])) self.assertEqual(100, len(events[1][1]))

View File

@ -313,9 +313,6 @@ class SkipIfNoTransportURL(test_utils.BaseTestCase):
kafka_options.register_opts(conf, transport_url) kafka_options.register_opts(conf, transport_url)
self.config(producer_batch_size=0,
group='oslo_messaging_kafka')
class NotificationFixture(fixtures.Fixture): class NotificationFixture(fixtures.Fixture):
def __init__(self, conf, url, topics, batch=None): def __init__(self, conf, url, topics, batch=None):

View File

@ -0,0 +1,13 @@
---
fixes:
- |
Threading issues with the kafka-python consumer client were identified
and documented. The driver has been updated to integrate the
confluent-kafka python library. The confluent-kafka client
leverages the high performance librdkafka C client and is safe
for multiple thread use.
upgrade:
- |
With the change in the client library used, projects using the
Kafka driver should use extras oslo.messaging[kafka] to pull in
dependencies for the driver.

View File

@ -4,7 +4,7 @@ set -e
. tools/functions.sh . tools/functions.sh
SCALA_VERSION=${SCALA_VERSION:-"2.12"} SCALA_VERSION=${SCALA_VERSION:-"2.12"}
KAFKA_VERSION=${KAFKA_VERSION:-"1.1.0"} KAFKA_VERSION=${KAFKA_VERSION:-"2.0.0"}
if [[ -z "$(which kafka-server-start)" ]] && [[ -z $(which kafka-server-start.sh) ]]; then if [[ -z "$(which kafka-server-start)" ]] && [[ -z $(which kafka-server-start.sh) ]]; then
DATADIR=$(mktemp -d /tmp/OSLOMSG-KAFKA.XXXXX) DATADIR=$(mktemp -d /tmp/OSLOMSG-KAFKA.XXXXX)

View File

@ -25,8 +25,7 @@ classifier =
amqp1 = amqp1 =
pyngus>=2.2.0 # Apache-2.0 pyngus>=2.2.0 # Apache-2.0
kafka = kafka =
kafka-python>=1.3.1 # Apache-2.0 confluent-kafka>=0.11.6 # Apache-2.0
tenacity>=4.4.0 # Apache-2.0
[files] [files]
packages = packages =

View File

@ -14,8 +14,7 @@ oslotest>=3.2.0 # Apache-2.0
pifpaf>=0.10.0 # Apache-2.0 pifpaf>=0.10.0 # Apache-2.0
# for test_impl_kafka # for test_impl_kafka
tenacity>=4.4.0 # Apache-2.0 confluent-kafka>=0.11.6 # Apache-2.0
kafka-python>=1.3.1 # Apache-2.0
# when we can require tox>= 1.4, this can go into tox.ini: # when we can require tox>= 1.4, this can go into tox.ini:
# [testenv:cover] # [testenv:cover]