Get kafka notifications to work with kafka-python 0.9.5
* poll() now returns empty list instead of None * metadata_broker_list has been dropped, there's a new bootstrap_servers * Add a LOG.debug() for the actual message python simulator.py -d DEBUG --topic notifications.info --url kafka://localhost:9092 notify-server python simulator.py -d DEBUG --topic notifications --url kafka://localhost:9092 notify-client -m 1000 -w 1 More tips: http://kafka.apache.org/documentation.html#quickstart https://www.digitalocean.com/community/tutorials/how-to-install-apache-kafka-on-ubuntu-14-04 Depends-On: Ia324b4c89b05c536708baf7950857cd159578cec Change-Id: I80911d2678ea5e8d0cd6b146a1e29a58858e3144
This commit is contained in:
parent
11f78de5f0
commit
b97950ea38
@ -228,8 +228,7 @@ class Connection(object):
|
||||
def declare_topic_consumer(self, topics, group=None):
|
||||
self.consumer = kafka.KafkaConsumer(
|
||||
*topics, group_id=group,
|
||||
metadata_broker_list=["%s:%s" % (self.host, str(self.port))],
|
||||
# auto_commit_enable=self.auto_commit,
|
||||
bootstrap_servers=["%s:%s" % (self.host, str(self.port))],
|
||||
fetch_message_max_bytes=self.fetch_messages_max_bytes)
|
||||
|
||||
|
||||
@ -262,6 +261,7 @@ class KafkaListener(base.Listener):
|
||||
messages = self.conn.consume(timeout=timeout)
|
||||
for msg in messages:
|
||||
message = msg.value
|
||||
LOG.debug('poll got message : %s', message)
|
||||
message = jsonutils.loads(message)
|
||||
self.incoming_queue.append(OsloKafkaMessage(
|
||||
ctxt=message['context'], message=message['message']))
|
||||
|
@ -284,4 +284,4 @@ class TestWithRealKafkaBroker(test_utils.BaseTestCase):
|
||||
deadline = time.time() + 3
|
||||
received_message = listener.poll(timeout=3)
|
||||
self.assertEqual(0, int(deadline - time.time()))
|
||||
self.assertIsNone(received_message)
|
||||
self.assertEqual([], received_message)
|
||||
|
Loading…
x
Reference in New Issue
Block a user