Use escape_ipv6 from oslo.utils

The utility is available since oslo.utils 3.29.0 .

Change-Id: I12dfd8936a718eacb0ced620efd60546721f7699
This commit is contained in:
Takashi Kajinami 2024-10-26 21:44:25 +09:00
parent a5b48bf98d
commit 0aaf5d36f5

View File

@ -30,14 +30,6 @@ from zaqar.transport.websocket import factory
LOG = logging.getLogger(__name__)
# TODO(derekh): use escape_ipv6 from oslo.utils once available
def _escape_ipv6(address):
"""Escape an IP address in square brackets if IPv6"""
if netutils.is_valid_ipv6(address):
return "[%s]" % address
return address
class Driver(base.DriverBase):
def __init__(self, conf, api, cache):
@ -58,7 +50,7 @@ class Driver(base.DriverBase):
@decorators.lazy_property(write=False)
def factory(self):
uri = 'ws://' + _escape_ipv6(self._ws_conf.bind) + ':' + \
uri = 'ws://' + netutils.escape_ipv6(self._ws_conf.bind) + ':' + \
str(self._ws_conf.port)
return factory.ProtocolFactory(
uri,
@ -101,7 +93,7 @@ class Driver(base.DriverBase):
else:
host = socket.gethostname()
self.notification_factory.set_subscription_url(
'http://%s:%s/' % (_escape_ipv6(host), port))
'http://%s:%s/' % (netutils.escape_ipv6(host), port))
self._api.set_subscription_factory(self.notification_factory)
task = asyncio.Task(coro_notification)