Merge "Add kafka config options for security (ssl/sasl)"

This commit is contained in:
Zuul 2018-02-25 19:12:12 +00:00 committed by Gerrit Code Review
commit 5d65ee6675
3 changed files with 82 additions and 16 deletions

View File

@ -119,24 +119,39 @@ class Connection(object):
def __init__(self, conf, url):
self.conf = conf
self.driver_conf = conf.oslo_messaging_kafka
self.security_protocol = self.driver_conf.security_protocol
self.sasl_mechanism = self.driver_conf.sasl_mechanism
self.ssl_cafile = self.driver_conf.ssl_cafile
self.url = url
self.virtual_host = url.virtual_host
self._parse_url()
def _parse_url(self):
driver_conf = self.conf.oslo_messaging_kafka
self.hostaddrs = []
self.username = None
self.password = None
for host in self.url.hosts:
# NOTE(ansmith): connections and failover are transparently
# managed by the client library. Credentials will be
# selectd from first host encountered in transport_url
if self.username is None:
self.username = host.username
self.password = host.password
else:
if self.username != host.username:
LOG.warning(_LW("Different transport usernames detected"))
if host.hostname:
self.hostaddrs.append("%s:%s" % (
host.hostname,
host.port or driver_conf.kafka_default_port))
host.port or self.driver_conf.kafka_default_port))
if not self.hostaddrs:
self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host,
driver_conf.kafka_default_port))
self.hostaddrs.append("%s:%s" %
(self.driver_conf.kafka_default_host,
self.driver_conf.kafka_default_port))
def reset(self):
"""Reset a connection so it can be used again."""
@ -148,13 +163,12 @@ class ConsumerConnection(Connection):
def __init__(self, conf, url):
super(ConsumerConnection, self).__init__(conf, url)
driver_conf = self.conf.oslo_messaging_kafka
self.consumer = None
self.consumer_timeout = driver_conf.kafka_consumer_timeout
self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
self.group_id = driver_conf.consumer_group
self.enable_auto_commit = driver_conf.enable_auto_commit
self.max_poll_records = driver_conf.max_poll_records
self.consumer_timeout = self.driver_conf.kafka_consumer_timeout
self.max_fetch_bytes = self.driver_conf.kafka_max_fetch_bytes
self.group_id = self.driver_conf.consumer_group
self.enable_auto_commit = self.driver_conf.enable_auto_commit
self.max_poll_records = self.driver_conf.max_poll_records
self._consume_loop_stopped = False
@with_reconnect()
@ -216,6 +230,11 @@ class ConsumerConnection(Connection):
bootstrap_servers=self.hostaddrs,
max_partition_fetch_bytes=self.max_fetch_bytes,
max_poll_records=self.max_poll_records,
security_protocol=self.security_protocol,
sasl_mechanism=self.sasl_mechanism,
sasl_plain_username=self.username,
sasl_plain_password=self.password,
ssl_cafile=self.ssl_cafile,
selector=KAFKA_SELECTOR
)
@ -225,9 +244,8 @@ class ProducerConnection(Connection):
def __init__(self, conf, url):
super(ProducerConnection, self).__init__(conf, url)
driver_conf = self.conf.oslo_messaging_kafka
self.batch_size = driver_conf.producer_batch_size
self.linger_ms = driver_conf.producer_batch_timeout * 1000
self.batch_size = self.driver_conf.producer_batch_size
self.linger_ms = self.driver_conf.producer_batch_timeout * 1000
self.producer = None
self.producer_lock = threading.Lock()
@ -278,6 +296,11 @@ class ProducerConnection(Connection):
bootstrap_servers=self.hostaddrs,
linger_ms=self.linger_ms,
batch_size=self.batch_size,
security_protocol=self.security_protocol,
sasl_mechanism=self.sasl_mechanism,
sasl_plain_username=self.username,
sasl_plain_password=self.password,
ssl_cafile=self.ssl_cafile,
selector=KAFKA_SELECTOR)

View File

@ -63,7 +63,20 @@ KAFKA_OPTS = [
help='Enable asynchronous consumer commits'),
cfg.IntOpt('max_poll_records', default=500,
help='The maximum number of records returned in a poll call')
help='The maximum number of records returned in a poll call'),
cfg.StrOpt('security_protocol', default='PLAINTEXT',
choices=('PLAINTEXT', 'SASL_PLAINTEXT', 'SSL', 'SASL_SSL'),
help='Protocol used to communicate with brokers'),
cfg.StrOpt('sasl_mechanism',
default='PLAIN',
help='Mechanism when security protocol is SASL'),
cfg.StrOpt('ssl_cafile',
default='',
help='CA certificate PEM file used to verify the server'
' certificate')
]

View File

@ -39,24 +39,47 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
scenarios = [
('none', dict(url=None,
expected=dict(hostaddrs=['localhost:9092'],
username=None,
password=None,
vhost=None))),
('empty', dict(url='kafka:///',
expected=dict(hostaddrs=['localhost:9092'],
username=None,
password=None,
vhost=''))),
('host', dict(url='kafka://127.0.0.1',
expected=dict(hostaddrs=['127.0.0.1:9092'],
username=None,
password=None,
vhost=None))),
('port', dict(url='kafka://localhost:1234',
expected=dict(hostaddrs=['localhost:1234'],
username=None,
password=None,
vhost=None))),
('vhost', dict(url='kafka://localhost:1234/my_host',
expected=dict(hostaddrs=['localhost:1234'],
username=None,
password=None,
vhost='my_host'))),
('two', dict(url='kafka://localhost:1234,localhost2:1234',
expected=dict(hostaddrs=['localhost:1234',
'localhost2:1234'],
username=None,
password=None,
vhost=None))),
('user', dict(url='kafka://stack:stacksecret@localhost:9092/my_host',
expected=dict(hostaddrs=['localhost:9092'],
username='stack',
password='stacksecret',
vhost='my_host'))),
('user2', dict(url='kafka://stack:stacksecret@localhost:9092,'
'stack2:stacksecret2@localhost:1234/my_host',
expected=dict(hostaddrs=['localhost:9092',
'localhost:1234'],
username='stack',
password='stacksecret',
vhost='my_host'))),
]
def setUp(self):
@ -70,6 +93,8 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
driver = transport._driver
self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs)
self.assertEqual(self.expected['username'], driver.pconn.username)
self.assertEqual(self.expected['password'], driver.pconn.password)
self.assertEqual(self.expected['vhost'], driver.virtual_host)
@ -119,6 +144,11 @@ class TestKafkaDriver(test_utils.BaseTestCase):
bootstrap_servers=['localhost:9092'],
max_partition_fetch_bytes=mock.ANY,
max_poll_records=mock.ANY,
security_protocol='PLAINTEXT',
sasl_mechanism='PLAIN',
sasl_plain_username=mock.ANY,
sasl_plain_password=mock.ANY,
ssl_cafile='',
selector=mock.ANY
)