Fixed non-configurable Kafka consumer offset location
PROBLEM: Consumer offset was resetting to the latest index rather than the earliest.

SOLUTION: Modified consumer creation to include `auto_offset_reset="smallest"` which allows the offset to reset to the earliest known index.

NOTE: This does exactly what the whence parameter in SimpleConsumer.seek() is expected to do, however in order to achieve this functionality, the parameter `auto_offset_reset` MUST be set to either "largest" or "smallest".

Change-Id: I887892d80f2da9619c7f11737b3ab2e1d1dacf1e
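
The reset policy described above can be sketched as a small, self-contained model. This is only an illustration of the semantics stated in the commit message, not kafka-python's implementation: the function name `resolve_reset_offset` and its `earliest`/`latest` arguments are hypothetical and exist only for this example.

```python
# Illustrative model of the two reset policies named in the commit message.
# Assumption: "smallest" maps to the earliest retained offset and "largest"
# to the latest one. resolve_reset_offset is a hypothetical helper, not part
# of kafka-python.

VALID_POLICIES = ("smallest", "largest")


def resolve_reset_offset(policy, earliest, latest):
    """Return the offset a consumer would move to under the given policy."""
    if policy == "smallest":
        return earliest
    if policy == "largest":
        return latest
    # Mirrors the NOTE above: without one of the two values there is no
    # well-defined place to reset to.
    raise ValueError(
        "auto_offset_reset must be one of %s, got %r" % (VALID_POLICIES, policy)
    )
```

With a partition that retains offsets 100 through 250, `resolve_reset_offset("smallest", 100, 250)` picks 100, which is the behavior this commit opts into, while "largest" would pick 250.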
parent 5711485cf2
commit 94e0a059b5
@@ -95,6 +95,12 @@ class KafkaConsumer(object):
 
     def _create_kafka_consumer(self, partitions=None):
         # No auto-commit so that commits only happen after the message is processed.
+
+        # auto_offset_reset is a param that alters where the current offset in the consumer
+        # will modify from (see whence param in SimpleConsumer.seek()). It is imperative to set
+        # this param as either "largest" or "smallest" depending on where we would like
+        # to modify the offset from, no matter what whence is set to.
+
         consumer = kafka_consumer.SimpleConsumer(
             self._kafka,
             self._kafka_group,
@@ -104,7 +110,8 @@ class KafkaConsumer(object):
             iter_timeout=5,
             fetch_size_bytes=self._kafka_fetch_size,
             buffer_size=self._kafka_fetch_size,
-            max_buffer_size=None)
+            max_buffer_size=None,
+            auto_offset_reset="smallest")
 
         consumer.provide_partition_info()
         consumer.fetch_last_known_offsets()
@@ -127,7 +134,7 @@ class KafkaConsumer(object):
                 # an OffsetOutOfRangeError. We trap this error and seek to
                 # the head of the current Kafka data. Because this error
                 # only happens when Kafka removes data we're currently
-                # pointing at we're gauranteed that we won't read any
+                # pointing at we're guaranteed that we won't read any
                 # duplicate data however we will lose any information
                 # between our current offset and the new Kafka head.
 
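
The recovery path described in the last hunk's comments (trap the out-of-range error, seek to the head, accept the gap) can be modeled without a broker. The sketch below is a toy simulation under stated assumptions: "head" is taken to mean the earliest retained offset, and `OffsetOutOfRange`, `fetch`, and `consume_with_recovery` are hypothetical names invented for this example, not kafka-python APIs.

```python
# Toy model of the trap-and-seek recovery described in the comments above.
# Assumptions: the log retains offsets [log_start, log_end), and "head"
# means log_start. All names here are hypothetical stand-ins.


class OffsetOutOfRange(Exception):
    """Stand-in for the error raised when the requested data was removed."""


def fetch(log_start, log_end, offset):
    """Return the retained offsets from `offset` onward, or raise if removed."""
    if offset < log_start:
        raise OffsetOutOfRange(offset)
    return list(range(offset, log_end))


def consume_with_recovery(log_start, log_end, offset):
    """Fetch from `offset`; on an out-of-range error, restart at the head.

    Returns (offsets_read, messages_lost).
    """
    try:
        return fetch(log_start, log_end, offset), 0
    except OffsetOutOfRange:
        # Everything between the stale offset and the new head is gone,
        # so nothing is read twice but the gap is lost.
        lost = log_start - offset
        return fetch(log_start, log_end, log_start), lost
```

For a consumer stuck at offset 40 while the log now starts at 100, `consume_with_recovery(100, 105, 40)` reads offsets 100 to 104 and reports 60 lost messages, which matches the trade-off the comment describes: no duplicates, but a gap.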