Add kafka config options for security (ssl/sasl)
Change-Id: Ia1be1b67a7151d449185e2ad52eff1787e8b0933
This commit is contained in:
parent
66c056dd56
commit
90f7610f9d
@ -119,24 +119,39 @@ class Connection(object):
|
|||||||
|
|
||||||
def __init__(self, conf, url):
|
def __init__(self, conf, url):
|
||||||
|
|
||||||
self.conf = conf
|
self.driver_conf = conf.oslo_messaging_kafka
|
||||||
|
self.security_protocol = self.driver_conf.security_protocol
|
||||||
|
self.sasl_mechanism = self.driver_conf.sasl_mechanism
|
||||||
|
self.ssl_cafile = self.driver_conf.ssl_cafile
|
||||||
self.url = url
|
self.url = url
|
||||||
self.virtual_host = url.virtual_host
|
self.virtual_host = url.virtual_host
|
||||||
self._parse_url()
|
self._parse_url()
|
||||||
|
|
||||||
def _parse_url(self):
|
def _parse_url(self):
|
||||||
driver_conf = self.conf.oslo_messaging_kafka
|
|
||||||
self.hostaddrs = []
|
self.hostaddrs = []
|
||||||
|
self.username = None
|
||||||
|
self.password = None
|
||||||
|
|
||||||
for host in self.url.hosts:
|
for host in self.url.hosts:
|
||||||
|
# NOTE(ansmith): connections and failover are transparently
|
||||||
|
# managed by the client library. Credentials will be
|
||||||
|
# selectd from first host encountered in transport_url
|
||||||
|
if self.username is None:
|
||||||
|
self.username = host.username
|
||||||
|
self.password = host.password
|
||||||
|
else:
|
||||||
|
if self.username != host.username:
|
||||||
|
LOG.warning(_LW("Different transport usernames detected"))
|
||||||
|
|
||||||
if host.hostname:
|
if host.hostname:
|
||||||
self.hostaddrs.append("%s:%s" % (
|
self.hostaddrs.append("%s:%s" % (
|
||||||
host.hostname,
|
host.hostname,
|
||||||
host.port or driver_conf.kafka_default_port))
|
host.port or self.driver_conf.kafka_default_port))
|
||||||
|
|
||||||
if not self.hostaddrs:
|
if not self.hostaddrs:
|
||||||
self.hostaddrs.append("%s:%s" % (driver_conf.kafka_default_host,
|
self.hostaddrs.append("%s:%s" %
|
||||||
driver_conf.kafka_default_port))
|
(self.driver_conf.kafka_default_host,
|
||||||
|
self.driver_conf.kafka_default_port))
|
||||||
|
|
||||||
def reset(self):
|
def reset(self):
|
||||||
"""Reset a connection so it can be used again."""
|
"""Reset a connection so it can be used again."""
|
||||||
@ -148,13 +163,12 @@ class ConsumerConnection(Connection):
|
|||||||
def __init__(self, conf, url):
|
def __init__(self, conf, url):
|
||||||
|
|
||||||
super(ConsumerConnection, self).__init__(conf, url)
|
super(ConsumerConnection, self).__init__(conf, url)
|
||||||
driver_conf = self.conf.oslo_messaging_kafka
|
|
||||||
self.consumer = None
|
self.consumer = None
|
||||||
self.consumer_timeout = driver_conf.kafka_consumer_timeout
|
self.consumer_timeout = self.driver_conf.kafka_consumer_timeout
|
||||||
self.max_fetch_bytes = driver_conf.kafka_max_fetch_bytes
|
self.max_fetch_bytes = self.driver_conf.kafka_max_fetch_bytes
|
||||||
self.group_id = driver_conf.consumer_group
|
self.group_id = self.driver_conf.consumer_group
|
||||||
self.enable_auto_commit = driver_conf.enable_auto_commit
|
self.enable_auto_commit = self.driver_conf.enable_auto_commit
|
||||||
self.max_poll_records = driver_conf.max_poll_records
|
self.max_poll_records = self.driver_conf.max_poll_records
|
||||||
self._consume_loop_stopped = False
|
self._consume_loop_stopped = False
|
||||||
|
|
||||||
@with_reconnect()
|
@with_reconnect()
|
||||||
@ -216,6 +230,11 @@ class ConsumerConnection(Connection):
|
|||||||
bootstrap_servers=self.hostaddrs,
|
bootstrap_servers=self.hostaddrs,
|
||||||
max_partition_fetch_bytes=self.max_fetch_bytes,
|
max_partition_fetch_bytes=self.max_fetch_bytes,
|
||||||
max_poll_records=self.max_poll_records,
|
max_poll_records=self.max_poll_records,
|
||||||
|
security_protocol=self.security_protocol,
|
||||||
|
sasl_mechanism=self.sasl_mechanism,
|
||||||
|
sasl_plain_username=self.username,
|
||||||
|
sasl_plain_password=self.password,
|
||||||
|
ssl_cafile=self.ssl_cafile,
|
||||||
selector=KAFKA_SELECTOR
|
selector=KAFKA_SELECTOR
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -225,9 +244,8 @@ class ProducerConnection(Connection):
|
|||||||
def __init__(self, conf, url):
|
def __init__(self, conf, url):
|
||||||
|
|
||||||
super(ProducerConnection, self).__init__(conf, url)
|
super(ProducerConnection, self).__init__(conf, url)
|
||||||
driver_conf = self.conf.oslo_messaging_kafka
|
self.batch_size = self.driver_conf.producer_batch_size
|
||||||
self.batch_size = driver_conf.producer_batch_size
|
self.linger_ms = self.driver_conf.producer_batch_timeout * 1000
|
||||||
self.linger_ms = driver_conf.producer_batch_timeout * 1000
|
|
||||||
self.producer = None
|
self.producer = None
|
||||||
self.producer_lock = threading.Lock()
|
self.producer_lock = threading.Lock()
|
||||||
|
|
||||||
@ -278,6 +296,11 @@ class ProducerConnection(Connection):
|
|||||||
bootstrap_servers=self.hostaddrs,
|
bootstrap_servers=self.hostaddrs,
|
||||||
linger_ms=self.linger_ms,
|
linger_ms=self.linger_ms,
|
||||||
batch_size=self.batch_size,
|
batch_size=self.batch_size,
|
||||||
|
security_protocol=self.security_protocol,
|
||||||
|
sasl_mechanism=self.sasl_mechanism,
|
||||||
|
sasl_plain_username=self.username,
|
||||||
|
sasl_plain_password=self.password,
|
||||||
|
ssl_cafile=self.ssl_cafile,
|
||||||
selector=KAFKA_SELECTOR)
|
selector=KAFKA_SELECTOR)
|
||||||
|
|
||||||
|
|
||||||
|
@ -63,7 +63,20 @@ KAFKA_OPTS = [
|
|||||||
help='Enable asynchronous consumer commits'),
|
help='Enable asynchronous consumer commits'),
|
||||||
|
|
||||||
cfg.IntOpt('max_poll_records', default=500,
|
cfg.IntOpt('max_poll_records', default=500,
|
||||||
help='The maximum number of records returned in a poll call')
|
help='The maximum number of records returned in a poll call'),
|
||||||
|
|
||||||
|
cfg.StrOpt('security_protocol', default='PLAINTEXT',
|
||||||
|
choices=('PLAINTEXT', 'SASL_PLAINTEXT', 'SSL', 'SASL_SSL'),
|
||||||
|
help='Protocol used to communicate with brokers'),
|
||||||
|
|
||||||
|
cfg.StrOpt('sasl_mechanism',
|
||||||
|
default='PLAIN',
|
||||||
|
help='Mechanism when security protocol is SASL'),
|
||||||
|
|
||||||
|
cfg.StrOpt('ssl_cafile',
|
||||||
|
default='',
|
||||||
|
help='CA certificate PEM file used to verify the server'
|
||||||
|
' certificate')
|
||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
@ -39,24 +39,47 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
|
|||||||
scenarios = [
|
scenarios = [
|
||||||
('none', dict(url=None,
|
('none', dict(url=None,
|
||||||
expected=dict(hostaddrs=['localhost:9092'],
|
expected=dict(hostaddrs=['localhost:9092'],
|
||||||
|
username=None,
|
||||||
|
password=None,
|
||||||
vhost=None))),
|
vhost=None))),
|
||||||
('empty', dict(url='kafka:///',
|
('empty', dict(url='kafka:///',
|
||||||
expected=dict(hostaddrs=['localhost:9092'],
|
expected=dict(hostaddrs=['localhost:9092'],
|
||||||
|
username=None,
|
||||||
|
password=None,
|
||||||
vhost=''))),
|
vhost=''))),
|
||||||
('host', dict(url='kafka://127.0.0.1',
|
('host', dict(url='kafka://127.0.0.1',
|
||||||
expected=dict(hostaddrs=['127.0.0.1:9092'],
|
expected=dict(hostaddrs=['127.0.0.1:9092'],
|
||||||
|
username=None,
|
||||||
|
password=None,
|
||||||
vhost=None))),
|
vhost=None))),
|
||||||
('port', dict(url='kafka://localhost:1234',
|
('port', dict(url='kafka://localhost:1234',
|
||||||
expected=dict(hostaddrs=['localhost:1234'],
|
expected=dict(hostaddrs=['localhost:1234'],
|
||||||
|
username=None,
|
||||||
|
password=None,
|
||||||
vhost=None))),
|
vhost=None))),
|
||||||
('vhost', dict(url='kafka://localhost:1234/my_host',
|
('vhost', dict(url='kafka://localhost:1234/my_host',
|
||||||
expected=dict(hostaddrs=['localhost:1234'],
|
expected=dict(hostaddrs=['localhost:1234'],
|
||||||
|
username=None,
|
||||||
|
password=None,
|
||||||
vhost='my_host'))),
|
vhost='my_host'))),
|
||||||
('two', dict(url='kafka://localhost:1234,localhost2:1234',
|
('two', dict(url='kafka://localhost:1234,localhost2:1234',
|
||||||
expected=dict(hostaddrs=['localhost:1234',
|
expected=dict(hostaddrs=['localhost:1234',
|
||||||
'localhost2:1234'],
|
'localhost2:1234'],
|
||||||
|
username=None,
|
||||||
|
password=None,
|
||||||
vhost=None))),
|
vhost=None))),
|
||||||
|
('user', dict(url='kafka://stack:stacksecret@localhost:9092/my_host',
|
||||||
|
expected=dict(hostaddrs=['localhost:9092'],
|
||||||
|
username='stack',
|
||||||
|
password='stacksecret',
|
||||||
|
vhost='my_host'))),
|
||||||
|
('user2', dict(url='kafka://stack:stacksecret@localhost:9092,'
|
||||||
|
'stack2:stacksecret2@localhost:1234/my_host',
|
||||||
|
expected=dict(hostaddrs=['localhost:9092',
|
||||||
|
'localhost:1234'],
|
||||||
|
username='stack',
|
||||||
|
password='stacksecret',
|
||||||
|
vhost='my_host'))),
|
||||||
]
|
]
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
@ -70,6 +93,8 @@ class TestKafkaTransportURL(test_utils.BaseTestCase):
|
|||||||
driver = transport._driver
|
driver = transport._driver
|
||||||
|
|
||||||
self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs)
|
self.assertEqual(self.expected['hostaddrs'], driver.pconn.hostaddrs)
|
||||||
|
self.assertEqual(self.expected['username'], driver.pconn.username)
|
||||||
|
self.assertEqual(self.expected['password'], driver.pconn.password)
|
||||||
self.assertEqual(self.expected['vhost'], driver.virtual_host)
|
self.assertEqual(self.expected['vhost'], driver.virtual_host)
|
||||||
|
|
||||||
|
|
||||||
@ -119,6 +144,11 @@ class TestKafkaDriver(test_utils.BaseTestCase):
|
|||||||
bootstrap_servers=['localhost:9092'],
|
bootstrap_servers=['localhost:9092'],
|
||||||
max_partition_fetch_bytes=mock.ANY,
|
max_partition_fetch_bytes=mock.ANY,
|
||||||
max_poll_records=mock.ANY,
|
max_poll_records=mock.ANY,
|
||||||
|
security_protocol='PLAINTEXT',
|
||||||
|
sasl_mechanism='PLAIN',
|
||||||
|
sasl_plain_username=mock.ANY,
|
||||||
|
sasl_plain_password=mock.ANY,
|
||||||
|
ssl_cafile='',
|
||||||
selector=mock.ANY
|
selector=mock.ANY
|
||||||
)
|
)
|
||||||
|
|
||||||
|
Loading…
x
Reference in New Issue
Block a user