Merge "Add support for synchronous commit"
This commit is contained in:
commit
66c056dd56
@ -153,6 +153,8 @@ class ConsumerConnection(Connection):
|
||||
self.consumer_timeout = driver_conf.kafka_consumer_timeout
|
||||
self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
|
||||
self.group_id = driver_conf.consumer_group
|
||||
self.enable_auto_commit = driver_conf.enable_auto_commit
|
||||
self.max_poll_records = driver_conf.max_poll_records
|
||||
self._consume_loop_stopped = False
|
||||
|
||||
@with_reconnect()
|
||||
@ -165,6 +167,10 @@ class ConsumerConnection(Connection):
|
||||
# NOTE(sileht): really ? you return payload but no messages...
|
||||
# simulate timeout to consume message again
|
||||
raise kafka.errors.ConsumerTimeout()
|
||||
|
||||
if not self.enable_auto_commit:
|
||||
self.consumer.commit()
|
||||
|
||||
return messages
|
||||
|
||||
def consume(self, timeout=None):
|
||||
@ -204,14 +210,12 @@ class ConsumerConnection(Connection):
|
||||
|
||||
@with_reconnect()
|
||||
def declare_topic_consumer(self, topics, group=None):
|
||||
# TODO(Support for manual/auto_commit functionality)
|
||||
# When auto_commit is False, consumer can manually notify
|
||||
# the completion of the subscription.
|
||||
# Currently we don't support for non auto commit option
|
||||
self.consumer = kafka.KafkaConsumer(
|
||||
*topics, group_id=(group or self.group_id),
|
||||
enable_auto_commit=self.enable_auto_commit,
|
||||
bootstrap_servers=self.hostaddrs,
|
||||
max_partition_fetch_bytes=self.max_fetch_bytes,
|
||||
max_poll_records=self.max_poll_records,
|
||||
selector=KAFKA_SELECTOR
|
||||
)
|
||||
|
||||
|
@ -56,7 +56,14 @@ KAFKA_OPTS = [
|
||||
"in seconds"),
|
||||
|
||||
cfg.IntOpt('producer_batch_size', default=16384,
|
||||
help='Size of batch for the producer async send')
|
||||
help='Size of batch for the producer async send'),
|
||||
|
||||
cfg.BoolOpt('enable_auto_commit',
|
||||
default=False,
|
||||
help='Enable asynchronous consumer commits'),
|
||||
|
||||
cfg.IntOpt('max_poll_records', default=500,
|
||||
help='The maximum number of records returned in a poll call')
|
||||
]
|
||||
|
||||
|
||||
|
@ -115,8 +115,10 @@ class TestKafkaDriver(test_utils.BaseTestCase):
|
||||
targets_and_priorities, "kafka_test", 1000, 10)
|
||||
consumer.assert_called_once_with(
|
||||
*expected_topics, group_id="kafka_test",
|
||||
enable_auto_commit=mock.ANY,
|
||||
bootstrap_servers=['localhost:9092'],
|
||||
max_partition_fetch_bytes=mock.ANY,
|
||||
max_poll_records=mock.ANY,
|
||||
selector=mock.ANY
|
||||
)
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user