From 5810d016e18a655c7ef2f3c3e7a74dbedc690812 Mon Sep 17 00:00:00 2001 From: James Page Date: Wed, 28 Feb 2024 12:13:33 +0000 Subject: [PATCH] Update sslutils.wrap for newer Pythons Make use of SSLContext to configure and then wrap sockets. For Pythons >= 3.12 ssl.wrap_socket was been remove and this is the only approach for configuring SSL options. This should be compatible with Python 3.6 or later. Change-Id: Ie6632c60db2bda10ad13a2ac2931b88ad52c10f4 --- oslo_service/sslutils.py | 41 +++++++++++----------- oslo_service/tests/test_sslutils.py | 53 +++++++++++++++++------------ 2 files changed, 51 insertions(+), 43 deletions(-) diff --git a/oslo_service/sslutils.py b/oslo_service/sslutils.py index 5fdd91b9..222207c6 100644 --- a/oslo_service/sslutils.py +++ b/oslo_service/sslutils.py @@ -77,28 +77,27 @@ def is_enabled(conf): def wrap(conf, sock): conf.register_opts(_options.ssl_opts, config_section) - ssl_kwargs = { - 'server_side': True, - 'certfile': conf.ssl.cert_file, - 'keyfile': conf.ssl.key_file, - 'cert_reqs': ssl.CERT_NONE, - } + + ssl_version = ssl.PROTOCOL_TLS_SERVER + if conf.ssl.version: + key = conf.ssl.version.lower() + try: + ssl_version = _SSL_PROTOCOLS[key] + except KeyError: + raise RuntimeError( + _("Invalid SSL version : %s") % conf.ssl.version) + + context = ssl.SSLContext(ssl_version) + context.load_cert_chain(conf.ssl.cert_file, conf.ssl.key_file) if conf.ssl.ca_file: - ssl_kwargs['ca_certs'] = conf.ssl.ca_file - ssl_kwargs['cert_reqs'] = ssl.CERT_REQUIRED + context.verify_mode = ssl.CERT_REQUIRED + context.load_verify_locations(conf.ssl.ca_file) + else: + context.check_hostname = False + context.verify_mode = ssl.CERT_NONE - if conf.ssl.version: - key = conf.ssl.version.lower() - try: - ssl_kwargs['ssl_version'] = _SSL_PROTOCOLS[key] - except KeyError: - raise RuntimeError( - _("Invalid SSL version : %s") % conf.ssl.version) + if conf.ssl.ciphers: + context.set_ciphers(conf.ssl.ciphers) - if conf.ssl.ciphers: - ssl_kwargs['ciphers'] = conf.ssl.ciphers - - # NOTE(eezhova): SSL/TLS protocol version is injected in ssl_kwargs above, - # so skipping bandit check - return ssl.wrap_socket(sock, **ssl_kwargs) # nosec + return context.wrap_socket(sock, server_side=True) diff --git a/oslo_service/tests/test_sslutils.py b/oslo_service/tests/test_sslutils.py index 026911d3..ddc77e5a 100644 --- a/oslo_service/tests/test_sslutils.py +++ b/oslo_service/tests/test_sslutils.py @@ -78,24 +78,40 @@ class SslutilsTestCase(base.ServiceBaseTestCase): group=sslutils.config_section) self.assertRaises(RuntimeError, sslutils.is_enabled, self.conf) - @mock.patch("ssl.wrap_socket") + @mock.patch("ssl.SSLContext") @mock.patch("os.path.exists") - def _test_wrap(self, exists_mock, wrap_socket_mock, **kwargs): + def _test_wrap(self, exists_mock, ssl_context_mock, + ca_file=None, + ciphers=None, + ssl_version=None): exists_mock.return_value = True sock = mock.Mock() + context = mock.Mock() + ssl_context_mock.return_value = context self.conf.set_default("cert_file", self.cert_file_name, group=sslutils.config_section) self.conf.set_default("key_file", self.key_file_name, group=sslutils.config_section) - ssl_kwargs = {'server_side': True, - 'certfile': self.conf.ssl.cert_file, - 'keyfile': self.conf.ssl.key_file, - 'cert_reqs': ssl.CERT_NONE, - } - if kwargs: - ssl_kwargs.update(**kwargs) sslutils.wrap(self.conf, sock) - wrap_socket_mock.assert_called_once_with(sock, **ssl_kwargs) + ssl_version = ssl_version or ssl.PROTOCOL_TLS_SERVER + ssl_context_mock.assert_called_once_with(ssl_version) + context.load_cert_chain.assert_called_once_with( + self.conf.ssl.cert_file, + self.conf.ssl.key_file, + ) + if ca_file: + self.assertEqual(context.verify_mode, ssl.CERT_REQUIRED) + context.load_verify_locations.assert_called_once_with( + ca_file + ) + else: + self.assertEqual(context.verify_mode, ssl.CERT_NONE) + self.assertFalse(context.check_hostname) + if ciphers: + context.set_ciphers.assert_called_once_with( + ciphers + ) + context.wrap_socket.assert_called_once_with(sock, server_side=True) def test_wrap(self): self._test_wrap() @@ -103,10 +119,7 @@ class SslutilsTestCase(base.ServiceBaseTestCase): def test_wrap_ca_file(self): self.conf.set_default("ca_file", self.ca_file_name, group=sslutils.config_section) - ssl_kwargs = {'ca_certs': self.conf.ssl.ca_file, - 'cert_reqs': ssl.CERT_REQUIRED - } - self._test_wrap(**ssl_kwargs) + self._test_wrap(ca_file=self.conf.ssl.ca_file) def test_wrap_ciphers(self): self.conf.set_default("ca_file", self.ca_file_name, @@ -118,17 +131,13 @@ class SslutilsTestCase(base.ServiceBaseTestCase): ) self.conf.set_default("ciphers", ciphers, group=sslutils.config_section) - ssl_kwargs = {'ca_certs': self.conf.ssl.ca_file, - 'cert_reqs': ssl.CERT_REQUIRED, - 'ciphers': ciphers} - self._test_wrap(**ssl_kwargs) + self._test_wrap(ca_file=self.conf.ssl.ca_file, + ciphers=self.conf.ssl.ciphers) def test_wrap_ssl_version(self): self.conf.set_default("ca_file", self.ca_file_name, group=sslutils.config_section) self.conf.set_default("version", "tlsv1", group=sslutils.config_section) - ssl_kwargs = {'ca_certs': self.conf.ssl.ca_file, - 'cert_reqs': ssl.CERT_REQUIRED, - 'ssl_version': ssl.PROTOCOL_TLSv1} - self._test_wrap(**ssl_kwargs) + self._test_wrap(ca_file=self.conf.ssl.ca_file, + ssl_version=ssl.PROTOCOL_TLSv1)