diff --git a/oslo_messaging/_drivers/impl_kafka.py b/oslo_messaging/_drivers/impl_kafka.py index ec3296b5c..007727c9f 100644 --- a/oslo_messaging/_drivers/impl_kafka.py +++ b/oslo_messaging/_drivers/impl_kafka.py @@ -119,24 +119,39 @@ class Connection(object): def __init__(self, conf, url): - self.conf = conf + self.driver_conf = conf.oslo_messaging_kafka + self.security_protocol = self.driver_conf.security_protocol + self.sasl_mechanism = self.driver_conf.sasl_mechanism + self.ssl_cafile = self.driver_conf.ssl_cafile self.url = url self.virtual_host = url.virtual_host self._parse_url() def _parse_url(self): - driver_conf = self.conf.oslo_messaging_kafka self.hostaddrs = [] + self.username = None + self.password = None for host in self.url.hosts: + # NOTE(ansmith): connections and failover are transparently + # managed by the client library. Credentials will be + # selectd from first host encountered in transport_url + if self.username is None: + self.username = host.username + self.password = host.password + else: + if self.username != host.username: + LOG.warning(_LW("Different transport usernames detected")) + if host.hostname: self.hostaddrs.append("%s:%s" % ( host.hostname, - host.port or driver_conf.kafka_default_port)) + host.port or self.driver_conf.kafka_default_port)) if not self.hostaddrs: - self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host, - driver_conf.kafka_default_port)) + self.hostaddrs.append("%s:%s" % + (self.driver_conf.kafka_default_host, + self.driver_conf.kafka_default_port)) def reset(self): """Reset a connection so it can be used again.""" @@ -148,13 +163,12 @@ class ConsumerConnection(Connection): def __init__(self, conf, url): super(ConsumerConnection, self).__init__(conf, url) - driver_conf = self.conf.oslo_messaging_kafka self.consumer = None - self.consumer_timeout = driver_conf.kafka_consumer_timeout - self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes - self.group_id = driver_conf.consumer_group - self.enable_auto_commit = driver_conf.enable_auto_commit - self.max_poll_records = driver_conf.max_poll_records + self.consumer_timeout = self.driver_conf.kafka_consumer_timeout + self.max_fetch_bytes = self.driver_conf.kafka_max_fetch_bytes + self.group_id = self.driver_conf.consumer_group + self.enable_auto_commit = self.driver_conf.enable_auto_commit + self.max_poll_records = self.driver_conf.max_poll_records self._consume_loop_stopped = False @with_reconnect() @@ -216,6 +230,11 @@ class ConsumerConnection(Connection): bootstrap_servers=self.hostaddrs, max_partition_fetch_bytes=self.max_fetch_bytes, max_poll_records=self.max_poll_records, + security_protocol=self.security_protocol, + sasl_mechanism=self.sasl_mechanism, + sasl_plain_username=self.username, + sasl_plain_password=self.password, + ssl_cafile=self.ssl_cafile, selector=KAFKA_SELECTOR ) @@ -225,9 +244,8 @@ class ProducerConnection(Connection): def __init__(self, conf, url): super(ProducerConnection, self).__init__(conf, url) - driver_conf = self.conf.oslo_messaging_kafka - self.batch_size = driver_conf.producer_batch_size - self.linger_ms = driver_conf.producer_batch_timeout * 1000 + self.batch_size = self.driver_conf.producer_batch_size + self.linger_ms = self.driver_conf.producer_batch_timeout * 1000 self.producer = None self.producer_lock = threading.Lock() @@ -278,6 +296,11 @@ class ProducerConnection(Connection): bootstrap_servers=self.hostaddrs, linger_ms=self.linger_ms, batch_size=self.batch_size, + security_protocol=self.security_protocol, + sasl_mechanism=self.sasl_mechanism, + sasl_plain_username=self.username, + sasl_plain_password=self.password, + ssl_cafile=self.ssl_cafile, selector=KAFKA_SELECTOR) diff --git a/oslo_messaging/_drivers/kafka_driver/kafka_options.py b/oslo_messaging/_drivers/kafka_driver/kafka_options.py index e435866d0..d68b7efeb 100644 --- a/oslo_messaging/_drivers/kafka_driver/kafka_options.py +++ b/oslo_messaging/_drivers/kafka_driver/kafka_options.py @@ -63,7 +63,20 @@ KAFKA_OPTS = [ help='Enable asynchronous consumer commits'), cfg.IntOpt('max_poll_records', default=500, - help='The maximum number of records returned in a poll call') + help='The maximum number of records returned in a poll call'), + + cfg.StrOpt('security_protocol', default='PLAINTEXT', + choices=('PLAINTEXT', 'SASL_PLAINTEXT', 'SSL', 'SASL_SSL'), + help='Protocol used to communicate with brokers'), + + cfg.StrOpt('sasl_mechanism', + default='PLAIN', + help='Mechanism when security protocol is SASL'), + + cfg.StrOpt('ssl_cafile', + default='', + help='CA certificate PEM file used to verify the server' + ' certificate') ] diff --git a/oslo_messaging/tests/drivers/test_impl_kafka.py b/oslo_messaging/tests/drivers/test_impl_kafka.py index a34ce639b..a4860bd55 100644 --- a/oslo_messaging/tests/drivers/test_impl_kafka.py +++ b/oslo_messaging/tests/drivers/test_impl_kafka.py @@ -39,24 +39,47 @@ class TestKafkaTransportURL(test_utils.BaseTestCase): scenarios = [ ('none', dict(url=None, expected=dict(hostaddrs=['localhost:9092'], + username=None, + password=None, vhost=None))), ('empty', dict(url='kafka:///', expected=dict(hostaddrs=['localhost:9092'], + username=None, + password=None, vhost=''))), ('host', dict(url='kafka://127.0.0.1', expected=dict(hostaddrs=['127.0.0.1:9092'], + username=None, + password=None, vhost=None))), ('port', dict(url='kafka://localhost:1234', expected=dict(hostaddrs=['localhost:1234'], + username=None, + password=None, vhost=None))), ('vhost', dict(url='kafka://localhost:1234/my_host', expected=dict(hostaddrs=['localhost:1234'], + username=None, + password=None, vhost='my_host'))), ('two', dict(url='kafka://localhost:1234,localhost2:1234', expected=dict(hostaddrs=['localhost:1234', 'localhost2:1234'], + username=None, + password=None, vhost=None))), - + ('user', dict(url='kafka://stack:stacksecret@localhost:9092/my_host', + expected=dict(hostaddrs=['localhost:9092'], + username='stack', + password='stacksecret', + vhost='my_host'))), + ('user2', dict(url='kafka://stack:stacksecret@localhost:9092,' + 'stack2:stacksecret2@localhost:1234/my_host', + expected=dict(hostaddrs=['localhost:9092', + 'localhost:1234'], + username='stack', + password='stacksecret', + vhost='my_host'))), ] def setUp(self): @@ -70,6 +93,8 @@ class TestKafkaTransportURL(test_utils.BaseTestCase): driver = transport._driver self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs) + self.assertEqual(self.expected['username'], driver.pconn.username) + self.assertEqual(self.expected['password'], driver.pconn.password) self.assertEqual(self.expected['vhost'], driver.virtual_host) @@ -119,6 +144,11 @@ class TestKafkaDriver(test_utils.BaseTestCase): bootstrap_servers=['localhost:9092'], max_partition_fetch_bytes=mock.ANY, max_poll_records=mock.ANY, + security_protocol='PLAINTEXT', + sasl_mechanism='PLAIN', + sasl_plain_username=mock.ANY, + sasl_plain_password=mock.ANY, + ssl_cafile='', selector=mock.ANY )