amqp1: Do not reuse _socket_connection on reconnect
Each _SocketConnection object is unique per-peer. For example, the properties attribute may contain keys such as 'x-ssl-peer-name'. Reusing the existing _socket_connection during failover will cause the TLS handshake to fail since the peer name will not match. There is potential for other similar-yet-unexplored bad things to happen as well. Instead, reconnect by waking up the eventloop via the _do_reconnect method, which reconstructs the connection properties to reflect the new (failed-over-to) host and ultimately crates a new _SocketConnection (or re-uses a *valid* old one) in eventloop.Thread.connect(). Closes-Bug: #1938945 Change-Id: I0c8dc447f4dc8d0d08c312a1f3e6fa1745fb69fd
This commit is contained in:
parent
01f5b37874
commit
f9de265f39
@ -1265,7 +1265,7 @@ class Controller(pyngus.ConnectionEventHandler):
|
|||||||
host = self.hosts.next()
|
host = self.hosts.next()
|
||||||
LOG.info("Reconnecting to: %(hostname)s:%(port)s",
|
LOG.info("Reconnecting to: %(hostname)s:%(port)s",
|
||||||
{'hostname': host.hostname, 'port': host.port})
|
{'hostname': host.hostname, 'port': host.port})
|
||||||
self._socket_connection.connect(host)
|
self.processor.wakeup(lambda: self._do_connect())
|
||||||
|
|
||||||
def _hard_reset(self, reason):
|
def _hard_reset(self, reason):
|
||||||
"""Reset the controller to its pre-connection state"""
|
"""Reset the controller to its pre-connection state"""
|
||||||
|
@ -1553,33 +1553,42 @@ class TestMessageRetransmit(_AmqpBrokerTestCase):
|
|||||||
|
|
||||||
|
|
||||||
@testtools.skipUnless(SSL_ENABLED, "OpenSSL not supported")
|
@testtools.skipUnless(SSL_ENABLED, "OpenSSL not supported")
|
||||||
class TestSSL(test_utils.BaseTestCase):
|
class TestSSL(TestFailover):
|
||||||
"""Test the driver's OpenSSL integration"""
|
"""Test the driver's OpenSSL integration"""
|
||||||
|
|
||||||
def setUp(self):
|
def setUp(self):
|
||||||
super(TestSSL, self).setUp()
|
self._broker = None
|
||||||
# Create the CA, server, and client SSL certificates:
|
# Create the CA, server, and client SSL certificates:
|
||||||
self._tmpdir = tempfile.mkdtemp(prefix='amqp1')
|
self._tmpdir = tempfile.mkdtemp(prefix='amqp1')
|
||||||
files = ['ca_key', 'ca_cert', 's_key', 's_req', 's_cert', 'c_key',
|
files = ['ca_key', 'ca_cert', 's_key', 's_req', 's_cert', 's2_key',
|
||||||
'c_req', 'c_cert', 'bad_cert', 'bad_req', 'bad_key']
|
's2_req', 's2_cert', 'c_key', 'c_req', 'c_cert', 'bad_cert',
|
||||||
|
'bad_req', 'bad_key']
|
||||||
conf = dict(zip(files, [os.path.join(self._tmpdir, "%s.pem" % f)
|
conf = dict(zip(files, [os.path.join(self._tmpdir, "%s.pem" % f)
|
||||||
for f in files]))
|
for f in files]))
|
||||||
conf['pw'] = 'password'
|
conf['pw'] = 'password'
|
||||||
conf['s_name'] = '127.0.0.1'
|
conf['s_name'] = '127.0.0.1'
|
||||||
|
conf['s2_name'] = '127.0.0.2'
|
||||||
conf['c_name'] = 'client.com'
|
conf['c_name'] = 'client.com'
|
||||||
|
|
||||||
self._ssl_config = conf
|
self._ssl_config = conf
|
||||||
ssl_setup = [
|
ssl_setup = [
|
||||||
# create self-signed CA certificate:
|
# create self-signed CA certificate:
|
||||||
Template('openssl req -x509 -nodes -newkey rsa:2048'
|
Template('openssl req -x509 -nodes -newkey rsa:2048'
|
||||||
' -subj "/CN=Trusted.CA.com" -keyout ${ca_key}'
|
' -subj "/CN=Trusted.CA.com" -keyout ${ca_key}'
|
||||||
' -out ${ca_cert}').substitute(conf),
|
' -out ${ca_cert}').substitute(conf),
|
||||||
# create Server key and certificate:
|
# create Server keys and certificates:
|
||||||
Template('openssl genrsa -out ${s_key} 2048').substitute(conf),
|
Template('openssl genrsa -out ${s_key} 2048').substitute(conf),
|
||||||
Template('openssl req -new -key ${s_key} -subj /CN=${s_name}'
|
Template('openssl req -new -key ${s_key} -subj /CN=${s_name}'
|
||||||
' -passin pass:${pw} -out ${s_req}').substitute(conf),
|
' -passin pass:${pw} -out ${s_req}').substitute(conf),
|
||||||
Template('openssl x509 -req -in ${s_req} -CA ${ca_cert}'
|
Template('openssl x509 -req -in ${s_req} -CA ${ca_cert}'
|
||||||
' -CAkey ${ca_key} -CAcreateserial -out'
|
' -CAkey ${ca_key} -CAcreateserial -out'
|
||||||
' ${s_cert}').substitute(conf),
|
' ${s_cert}').substitute(conf),
|
||||||
|
Template('openssl genrsa -out ${s2_key} 2048').substitute(conf),
|
||||||
|
Template('openssl req -new -key ${s2_key} -subj /CN=${s2_name}'
|
||||||
|
' -passin pass:${pw} -out ${s2_req}').substitute(conf),
|
||||||
|
Template('openssl x509 -req -in ${s2_req} -CA ${ca_cert}'
|
||||||
|
' -CAkey ${ca_key} -CAcreateserial -out'
|
||||||
|
' ${s2_cert}').substitute(conf),
|
||||||
# create a "bad" Server cert for testing CN validation:
|
# create a "bad" Server cert for testing CN validation:
|
||||||
Template('openssl genrsa -out ${bad_key} 2048').substitute(conf),
|
Template('openssl genrsa -out ${bad_key} 2048').substitute(conf),
|
||||||
Template('openssl req -new -key ${bad_key} -subj /CN=Invalid'
|
Template('openssl req -new -key ${bad_key} -subj /CN=Invalid'
|
||||||
@ -1604,10 +1613,30 @@ class TestSSL(test_utils.BaseTestCase):
|
|||||||
self._tmpdir = None
|
self._tmpdir = None
|
||||||
self.skipTest("OpenSSL tools not installed - skipping")
|
self.skipTest("OpenSSL tools not installed - skipping")
|
||||||
|
|
||||||
def _ssl_server_ok(self, url):
|
super(TestSSL, self).setUp()
|
||||||
self._broker.start()
|
|
||||||
self.config(ssl_ca_file=self._ssl_config['ca_cert'],
|
self.config(ssl_ca_file=self._ssl_config['ca_cert'],
|
||||||
group='oslo_messaging_amqp')
|
group='oslo_messaging_amqp')
|
||||||
|
|
||||||
|
def _gen_brokers(self):
|
||||||
|
s2_conf = self._ssl_config.copy()
|
||||||
|
for item in ['name', 'key', 'req', 'cert']:
|
||||||
|
s2_conf["s_%s" % item] = s2_conf["s2_%s" % item]
|
||||||
|
|
||||||
|
return [FakeBroker(self.conf.oslo_messaging_amqp,
|
||||||
|
sock_addr=self._ssl_config['s_name'],
|
||||||
|
ssl_config=self._ssl_config),
|
||||||
|
FakeBroker(self.conf.oslo_messaging_amqp,
|
||||||
|
sock_addr=s2_conf['s_name'],
|
||||||
|
ssl_config=s2_conf)]
|
||||||
|
|
||||||
|
def _gen_transport_url(self, hosts):
|
||||||
|
url = "amqp://%s" % (",".join(map(lambda x: "%s:%d" %
|
||||||
|
(x.hostname, x.port), hosts)))
|
||||||
|
return oslo_messaging.TransportURL.parse(self.conf, url)
|
||||||
|
|
||||||
|
def _ssl_server_ok(self, url):
|
||||||
|
self._broker.start()
|
||||||
tport_url = oslo_messaging.TransportURL.parse(self.conf, url)
|
tport_url = oslo_messaging.TransportURL.parse(self.conf, url)
|
||||||
driver = amqp_driver.ProtonDriver(self.conf, tport_url)
|
driver = amqp_driver.ProtonDriver(self.conf, tport_url)
|
||||||
target = oslo_messaging.Target(topic="test-topic")
|
target = oslo_messaging.Target(topic="test-topic")
|
||||||
|
Loading…
Reference in New Issue
Block a user