b34ab8b1cc
This adds an optional call_monitor_timeout parameter to the RPC client, which if specified, will enable heartbeating of long-running calls by the server. This enables the user to increase the regular timeout to a much larger value, allowing calls to take a very long time, but with heartbeating to indicate that they are still running on the server side. If the server stops heartbeating, then the call_monitor_timeout takes over and we fail with the usual MessagingTimeout instead of waiting for the longer overall timeout to expire. Change-Id: I60334aaf019f177a984583528b71d00859d31f84
433 lines
15 KiB
Python
433 lines
15 KiB
Python
# Copyright (C) 2015 Cisco Systems, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
# The following code fixes 2 issues with kafka-python and eventlet.
# The current release of eventlet (0.19.0) does not actually remove
# select.poll [1]. Because of that, kafka-python's vendored selectors34
# selects PollSelector instead of SelectSelector [2]. PollSelector relies
# on select.poll, which does not work when eventlet/greenlet is used. This
# bug in eventlet is fixed in the master branch [3], but there's no
# release of eventlet that includes this fix at this point.
|
|
|
|
import json
|
|
import threading
|
|
|
|
import kafka
|
|
from kafka.client_async import selectors
|
|
import kafka.errors
|
|
from oslo_log import log as logging
|
|
from oslo_utils import eventletutils
|
|
import tenacity
|
|
|
|
from oslo_messaging._drivers import base
|
|
from oslo_messaging._drivers import common as driver_common
|
|
from oslo_messaging._drivers.kafka_driver import kafka_options
|
|
from oslo_messaging._i18n import _LE
|
|
from oslo_messaging._i18n import _LW
|
|
from oslo_serialization import jsonutils
|
|
|
|
import logging as l

# NOTE(review): configuring the root logger at import time is unusual for
# a library module; this looks like a debugging leftover - confirm before
# relying on it. Behaviour is kept as-is here.
l.basicConfig(level=l.INFO)

# Raise the threshold of two loggers to WARNING (l.WARN is an alias of
# l.WARNING).
l.getLogger("kafka").setLevel(l.WARNING)
l.getLogger("stevedore").setLevel(l.WARNING)
|
|
|
|
# Selector class handed to the kafka client. The library default is used
# unless eventlet has monkey patched ``select``.
KAFKA_SELECTOR = selectors.DefaultSelector

if eventletutils.is_monkey_patched('select'):
    # monkeypatch the vendored SelectSelector._select like eventlet does
    # https://github.com/eventlet/eventlet/blob/master/eventlet/green/selectors.py#L32
    from eventlet.green import select
    selectors.SelectSelector._select = staticmethod(select.select)

    # Force the use of the select-based selector
    KAFKA_SELECTOR = selectors.SelectSelector

LOG = logging.getLogger(__name__)
|
|
|
|
|
|
def unpack_message(msg):
    """Decode a raw JSON message into a ``(context, message)`` pair.

    :param msg: JSON encoded message
    :returns: tuple ``(context, message)`` where ``context`` is the value
              stored under the ``_context`` key and ``message`` is the
              deserialized payload with that key removed
    :raises KeyError: if the deserialized payload has no ``_context`` key
    """
    payload = driver_common.deserialize_msg(json.loads(msg))
    ctxt = payload.pop('_context')
    return ctxt, payload
|
|
|
|
|
|
def pack_message(ctxt, msg):
    """Pack context into msg.

    The context is stored under the ``_context`` key of ``msg`` (``msg``
    is modified in place) and the result is passed through
    ``driver_common.serialize_msg``.

    :param ctxt: request context, either a dict or an object exposing
                 ``to_dict()``
    :param msg: message payload (dict)
    :returns: the serialized message
    """
    msg['_context'] = ctxt if isinstance(ctxt, dict) else ctxt.to_dict()
    return driver_common.serialize_msg(msg)
|
|
|
|
|
|
def concat(sep, items):
    """Join the truthy entries of ``items`` with ``sep``.

    Falsy entries (``None``, empty strings, ...) are skipped.
    """
    return sep.join(item for item in items if item)
|
|
|
|
|
|
def target_to_topic(target, priority=None, vhost=None):
    """Convert target into topic string

    The topic is built from ``target.topic``, ``priority`` and ``vhost``
    joined with dots; unset parts are left out.

    :param target: Message destination target
    :type target: oslo_messaging.Target
    :param priority: Notification priority
    :type priority: string
    :param vhost: Notification vhost
    :type vhost: string
    """
    parts = [target.topic, priority, vhost]
    return concat(".", parts)
|
|
|
|
|
|
def retry_on_retriable_kafka_error(exc):
    """Retry predicate: tell whether ``exc`` is a retriable Kafka error.

    :returns: ``False`` when ``exc`` is not a ``kafka.errors.KafkaError``,
              otherwise the value of its ``retriable`` attribute
    """
    if not isinstance(exc, kafka.errors.KafkaError):
        return False
    return exc.retriable
|
|
|
|
|
|
def with_reconnect(retries=None):
    """Build a decorator that retries a call on retriable Kafka errors.

    The decorated callable is re-invoked, waiting one second between
    attempts, as long as it raises an exception accepted by
    ``retry_on_retriable_kafka_error``. Once retrying stops, the last
    exception is re-raised unchanged.

    :param retries: value handed to ``tenacity.stop_after_attempt``;
                    ``None`` means retry forever
    :type retries: int or None
    :returns: a decorator
    """
    # stop_after_attempt() compares the attempt counter against its
    # argument. Passing None there raises TypeError on Python 3 (and
    # gives up after the first attempt on Python 2), so "no limit" has
    # to be expressed explicitly.
    if retries is None:
        stop = tenacity.stop_never
    else:
        stop = tenacity.stop_after_attempt(retries)

    def decorator(func):
        @tenacity.retry(
            retry=tenacity.retry_if_exception(retry_on_retriable_kafka_error),
            wait=tenacity.wait_fixed(1),
            stop=stop,
            reraise=True
        )
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper
    return decorator
|
|
|
|
|
|
class Connection(object):
    """Connection settings shared by the Kafka producer and consumer.

    Reads the driver options from ``conf.oslo_messaging_kafka`` and turns
    the transport URL into a list of ``host:port`` broker addresses plus
    a single username/password pair.
    """

    def __init__(self, conf, url):
        self.driver_conf = conf.oslo_messaging_kafka
        driver_conf = self.driver_conf
        self.security_protocol = driver_conf.security_protocol
        self.sasl_mechanism = driver_conf.sasl_mechanism
        self.ssl_cafile = driver_conf.ssl_cafile
        self.url = url
        self.virtual_host = url.virtual_host
        self._parse_url()

    def _parse_url(self):
        """Populate ``hostaddrs``, ``username`` and ``password``."""
        self.hostaddrs = []
        self.username = None
        self.password = None

        for host in self.url.hosts:
            # NOTE(ansmith): connections and failover are transparently
            # managed by the client library. Credentials will be
            # selected from the first host encountered in transport_url
            if self.username is None:
                self.username = host.username
                self.password = host.password
            elif self.username != host.username:
                LOG.warning(_LW("Different transport usernames detected"))

            # Hosts without a hostname contribute credentials only.
            if not host.hostname:
                continue
            port = host.port or self.driver_conf.kafka_default_port
            self.hostaddrs.append("%s:%s" % (host.hostname, port))

        # No usable host in the URL: fall back to the configured default.
        if not self.hostaddrs:
            fallback = (self.driver_conf.kafka_default_host,
                        self.driver_conf.kafka_default_port)
            self.hostaddrs.append("%s:%s" % fallback)

    def reset(self):
        """Reset a connection so it can be used again."""
|
|
|
|
|
|
class ConsumerConnection(Connection):
    """Consumer side of a Kafka connection.

    The underlying ``kafka.KafkaConsumer`` is created by
    ``declare_topic_consumer`` and polled through ``consume``.
    """

    def __init__(self, conf, url):
        super(ConsumerConnection, self).__init__(conf, url)
        opts = self.driver_conf
        self.consumer = None
        self.consumer_timeout = opts.kafka_consumer_timeout
        self.max_fetch_bytes = opts.kafka_max_fetch_bytes
        self.group_id = opts.consumer_group
        self.enable_auto_commit = opts.enable_auto_commit
        self.max_poll_records = opts.max_poll_records
        self._consume_loop_stopped = False

    @with_reconnect()
    def _poll_messages(self, timeout):
        """Poll the consumer once and return the record values.

        :param timeout: poll timeout in seconds
        :raises kafka.errors.ConsumerTimeout: if the poll yields no record
        """
        # timeout is in seconds; it is scaled by 1000 for the poll call.
        polled = self.consumer.poll(timeout * 1000.0)
        values = []
        for records in polled.values():
            if records:
                values.extend(record.value for record in records)

        if not values:
            # NOTE(sileht): really ? you return payload but no messages...
            # simulate timeout to consume message again
            raise kafka.errors.ConsumerTimeout()

        # Offsets are committed by hand when auto commit is disabled.
        if not self.enable_auto_commit:
            self.consumer.commit()

        return values

    def consume(self, timeout=None):
        """Receive the messages returned by one successful poll.

        :param timeout: poll timeout in seconds
        :returns: list of record values, or ``None`` if consuming was
                  stopped or an unexpected error was logged
        :raises driver_common.Timeout: raised through the timer callback
                                       when no message arrived in time
        """

        def _raise_timeout(exc):
            raise driver_common.Timeout(str(exc))

        timer = driver_common.DecayingTimer(duration=timeout)
        timer.start()

        # A single poll never waits longer than consumer_timeout.
        if timeout is None:
            poll_timeout = self.consumer_timeout
        else:
            poll_timeout = min(timeout, self.consumer_timeout)

        while not self._consume_loop_stopped:
            try:
                return self._poll_messages(poll_timeout)
            except kafka.errors.ConsumerTimeout as exc:
                # Either raises through _raise_timeout or hands back the
                # timeout to use for the next poll.
                poll_timeout = timer.check_return(
                    _raise_timeout, exc, maximum=self.consumer_timeout)
            except Exception:
                LOG.exception(_LE("Failed to consume messages"))
                return None
        return None

    def stop_consuming(self):
        """Make the ``consume`` loop exit at its next iteration."""
        self._consume_loop_stopped = True

    def close(self):
        """Close the consumer, if one was created."""
        if not self.consumer:
            return
        self.consumer.close()
        self.consumer = None

    @with_reconnect()
    def declare_topic_consumer(self, topics, group=None):
        """Create the Kafka consumer subscribed to ``topics``.

        :param topics: iterable of topic names
        :param group: consumer group id; falls back to the configured
                      ``consumer_group`` when empty
        """
        consumer_group = group or self.group_id
        self.consumer = kafka.KafkaConsumer(
            *topics,
            group_id=consumer_group,
            enable_auto_commit=self.enable_auto_commit,
            bootstrap_servers=self.hostaddrs,
            max_partition_fetch_bytes=self.max_fetch_bytes,
            max_poll_records=self.max_poll_records,
            security_protocol=self.security_protocol,
            sasl_mechanism=self.sasl_mechanism,
            sasl_plain_username=self.username,
            sasl_plain_password=self.password,
            ssl_cafile=self.ssl_cafile,
            selector=KAFKA_SELECTOR
        )
|
|
|
|
|
|
class ProducerConnection(Connection):
    """Producer side of a Kafka connection.

    The ``kafka.KafkaProducer`` is created lazily on first send and is
    closed whenever a send fails.
    """

    def __init__(self, conf, url):
        super(ProducerConnection, self).__init__(conf, url)
        self.batch_size = self.driver_conf.producer_batch_size
        # Scaled by 1000 for linger_ms (option presumably expressed in
        # seconds - confirm against kafka_options).
        self.linger_ms = self.driver_conf.producer_batch_timeout * 1000
        self.producer = None
        # Guards creation and teardown of self.producer.
        self.producer_lock = threading.Lock()

    def notify_send(self, topic, ctxt, msg, retry):
        """Send messages to Kafka broker.

        :param topic: String of the topic
        :param ctxt: context for the messages
        :param msg: messages for publishing
        :param retry: the number of retry; ``None`` or a negative value
                      is forwarded to ``with_reconnect`` as ``None``
        """
        # ``retry >= 0`` raises TypeError on Python 3 when retry is None,
        # so None is checked for explicitly before comparing.
        if retry is not None and retry < 0:
            retry = None
        message = pack_message(ctxt, msg)
        message = jsonutils.dumps(message).encode('utf-8')

        @with_reconnect(retries=retry)
        def wrapped_with_reconnect():
            self._ensure_producer()
            # NOTE(sileht): This returns a future, we can use get()
            # if we want to block like other driver
            future = self.producer.send(topic, message)
            future.get()

        try:
            wrapped_with_reconnect()
        except Exception:
            # NOTE(sileht): if something goes wrong close the producer
            # connection
            self._close_producer()
            raise

    def close(self):
        """Close the producer, if one was created."""
        self._close_producer()

    def _close_producer(self):
        """Close and forget the current producer under the lock."""
        with self.producer_lock:
            if self.producer:
                self.producer.close()
                self.producer = None

    def _ensure_producer(self):
        """Create the producer on first use."""
        # Unlocked check first so the common case skips the lock; it is
        # repeated once the lock is held.
        if self.producer:
            return
        with self.producer_lock:
            if self.producer:
                return
            self.producer = kafka.KafkaProducer(
                bootstrap_servers=self.hostaddrs,
                linger_ms=self.linger_ms,
                batch_size=self.batch_size,
                security_protocol=self.security_protocol,
                sasl_mechanism=self.sasl_mechanism,
                sasl_plain_username=self.username,
                sasl_plain_password=self.password,
                ssl_cafile=self.ssl_cafile,
                selector=KAFKA_SELECTOR)
|
|
|
|
|
|
class OsloKafkaMessage(base.RpcIncomingMessage):
    """Incoming message received from Kafka.

    Construction is inherited unchanged from the base class. requeue,
    reply and heartbeat only log a warning.
    """

    def requeue(self):
        """Log that requeue is unsupported."""
        LOG.warning(_LW("requeue is not supported"))

    def reply(self, reply=None, failure=None):
        """Log that reply is unsupported; the arguments are ignored."""
        LOG.warning(_LW("reply is not supported"))

    def heartbeat(self):
        """Log that heartbeat is unsupported."""
        LOG.warning(_LW("heartbeat is not supported"))
|
|
|
|
|
|
class KafkaListener(base.PollStyleListener):
    """Poll-style listener fed by a ``ConsumerConnection``."""

    def __init__(self, conn):
        super(KafkaListener, self).__init__()
        self._stopped = threading.Event()
        self.conn = conn
        # Messages already consumed but not yet handed out by poll().
        self.incoming_queue = []

        # FIXME(sileht): We do a first poll to ensure the topics are
        # created. This is a workaround mainly for functional tests, in
        # real life it is fine if topics are not created synchronously
        self.poll(5)

    @base.batch_poll_helper
    def poll(self, timeout=None):
        """Return the next queued message, consuming more when needed.

        :param timeout: timeout in seconds forwarded to ``conn.consume``
        :returns: an ``OsloKafkaMessage``, or ``None`` when consuming
                  timed out or the listener was stopped
        """
        while not self._stopped.is_set():
            if self.incoming_queue:
                return self.incoming_queue.pop(0)
            try:
                raw_messages = self.conn.consume(timeout=timeout)
                # consume() may return None; treat it as "nothing yet".
                for raw in raw_messages or []:
                    ctxt, payload = unpack_message(raw)
                    self.incoming_queue.append(
                        OsloKafkaMessage(ctxt, payload))
            except driver_common.Timeout:
                return None
        return None

    def stop(self):
        """Stop polling and ask the connection to stop consuming."""
        self._stopped.set()
        self.conn.stop_consuming()

    def cleanup(self):
        """Close the underlying consumer connection."""
        self.conn.close()
|
|
|
|
|
|
class KafkaDriver(base.BaseDriver):
    """Note: Current implementation of this driver is experimental.
    We will have functional and/or integrated testing enabled for this driver.

    Only notifications are supported; the RPC entry points raise
    ``NotImplementedError``.
    """

    def __init__(self, conf, url, default_exchange=None,
                 allowed_remote_exmods=None):
        conf = kafka_options.register_opts(conf, url)
        super(KafkaDriver, self).__init__(
            conf, url, default_exchange, allowed_remote_exmods)

        # NOTE(review): nothing in this class appends to ``listeners``;
        # confirm whether consumer connections were meant to be tracked.
        self.listeners = []
        self.virtual_host = url.virtual_host
        self.pconn = ProducerConnection(conf, url)

    def cleanup(self):
        """Close the producer connection and every tracked listener."""
        self.pconn.close()
        for listener in self.listeners:
            listener.close()
        self.listeners = []

    def send(self, target, ctxt, message, wait_for_reply=None, timeout=None,
             call_monitor_timeout=None, retry=None):
        """RPC send; not available with this driver.

        :raises NotImplementedError: always
        """
        raise NotImplementedError(
            'The RPC implementation for Kafka is not implemented')

    def send_notification(self, target, ctxt, message, version, retry=None):
        """Send notification to Kafka brokers

        The Kafka topic is derived from the target topic and the
        driver's virtual host.

        :param target: Message destination target
        :type target: oslo_messaging.Target
        :param ctxt: Message context
        :type ctxt: dict
        :param message: Message payload to pass
        :type message: dict
        :param version: Messaging API version (currently not used)
        :type version: str
        :param retry: an optional default kafka retries configuration
                      None means to retry forever
                      0 means no retry
                      N means N retries
        :type retry: int
        """
        topic = target_to_topic(target, vhost=self.virtual_host)
        self.pconn.notify_send(topic, ctxt, message, retry)

    def listen(self, target, batch_size, batch_timeout):
        """RPC listen; not available with this driver.

        :raises NotImplementedError: always
        """
        raise NotImplementedError(
            'The RPC implementation for Kafka is not implemented')

    def listen_for_notifications(self, targets_and_priorities, pool,
                                 batch_size, batch_timeout):
        """Listen to a specified list of targets on Kafka brokers

        :param targets_and_priorities: List of pairs (target, priority);
                                       each pair is turned into one Kafka
                                       topic name made of the target
                                       topic and the priority
        :type targets_and_priorities: list
        :param pool: consumer group of Kafka consumers
        :type pool: string
        :param batch_size: forwarded to the listener adapter
        :param batch_timeout: forwarded to the listener adapter
        """
        conn = ConsumerConnection(self.conf, self._url)
        # A set, so duplicate (target, priority) pairs subscribe once.
        topics = {target_to_topic(target, priority)
                  for target, priority in targets_and_priorities}

        conn.declare_topic_consumer(topics, pool)

        return base.PollStyleListenerAdapter(
            KafkaListener(conn), batch_size, batch_timeout)
|