diff --git a/doc/source/transport.rst b/doc/source/transport.rst index 4c4b36080..f914269d9 100644 --- a/doc/source/transport.rst +++ b/doc/source/transport.rst @@ -8,4 +8,9 @@ Transport .. autoclass:: Transport +.. autoclass:: TransportURL + :members: + +.. autoclass:: TransportHost + .. autofunction:: set_transport_defaults diff --git a/oslo/messaging/_drivers/amqpdriver.py b/oslo/messaging/_drivers/amqpdriver.py index 930afe7da..0dacec488 100644 --- a/oslo/messaging/_drivers/amqpdriver.py +++ b/oslo/messaging/_drivers/amqpdriver.py @@ -24,7 +24,6 @@ from oslo import messaging from oslo.messaging._drivers import amqp as rpc_amqp from oslo.messaging._drivers import base from oslo.messaging._drivers import common as rpc_common -from oslo.messaging import _urls as urls LOG = logging.getLogger(__name__) @@ -241,12 +240,12 @@ class ReplyWaiter(object): class AMQPDriverBase(base.BaseDriver): - def __init__(self, conf, connection_pool, url=None, default_exchange=None, - allowed_remote_exmods=[]): + def __init__(self, conf, url, connection_pool, + default_exchange=None, allowed_remote_exmods=[]): super(AMQPDriverBase, self).__init__(conf, url, default_exchange, allowed_remote_exmods) - self._server_params = self._parse_url(self._url) + self._server_params = self._server_params_from_url(self._url) self._default_exchange = default_exchange @@ -261,38 +260,22 @@ class AMQPDriverBase(base.BaseDriver): self._reply_q_conn = None self._waiter = None - @staticmethod - def _parse_url(url): - if url is None: - return None - - parsed = urls.parse_url(url) - - # Make sure there's not a query string; that could identify - # requirements we can't comply with (e.g., ssl), so reject it if - # it's present - if parsed['parameters']: - raise messaging.InvalidTransportURL( - url, "Cannot comply with query string in transport URL") - - if not parsed['hosts']: + def _server_params_from_url(self, url): + if not url.hosts: return None sp = { - 'virtual_host': parsed['virtual_host'], + 'virtual_host': url.virtual_host, } # FIXME(markmc): support multiple hosts - host = parsed['hosts'][0] + host = url.hosts[0] - if ':' in host['host']: - (sp['hostname'], sp['port']) = host['host'].split(':', 1) - sp['port'] = int(sp['port']) - else: - sp['hostname'] = host['host'] - - sp['username'] = host['username'] - sp['password'] = host['password'] + sp['hostname'] = host.hostname + if host.port is not None: + sp['port'] = host.port + sp['username'] = host.username or '' + sp['password'] = host.password or '' return sp diff --git a/oslo/messaging/_drivers/base.py b/oslo/messaging/_drivers/base.py index 085b28dd6..614552c59 100644 --- a/oslo/messaging/_drivers/base.py +++ b/oslo/messaging/_drivers/base.py @@ -55,8 +55,8 @@ class BaseDriver(object): __metaclass__ = abc.ABCMeta - def __init__(self, conf, url=None, default_exchange=None, - allowed_remote_exmods=[]): + def __init__(self, conf, url, + default_exchange=None, allowed_remote_exmods=[]): self.conf = conf self._url = url self._default_exchange = default_exchange diff --git a/oslo/messaging/_drivers/impl_fake.py b/oslo/messaging/_drivers/impl_fake.py index 699826239..ef331b5fb 100644 --- a/oslo/messaging/_drivers/impl_fake.py +++ b/oslo/messaging/_drivers/impl_fake.py @@ -86,7 +86,7 @@ class FakeExchange(object): class FakeDriver(base.BaseDriver): - def __init__(self, conf, url=None, default_exchange=None, + def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=[]): super(FakeDriver, self).__init__(conf, url, default_exchange, allowed_remote_exmods=[]) diff --git a/oslo/messaging/_drivers/impl_qpid.py b/oslo/messaging/_drivers/impl_qpid.py index e159277ce..93eba029d 100644 --- a/oslo/messaging/_drivers/impl_qpid.py +++ b/oslo/messaging/_drivers/impl_qpid.py @@ -742,13 +742,14 @@ def cleanup(): class QpidDriver(amqpdriver.AMQPDriverBase): - def __init__(self, conf, url=None, default_exchange=None, - allowed_remote_exmods=[]): + def __init__(self, conf, url, + default_exchange=None, allowed_remote_exmods=[]): conf.register_opts(qpid_opts) conf.register_opts(rpc_amqp.amqp_opts) connection_pool = rpc_amqp.get_connection_pool(conf, Connection) - super(QpidDriver, self).__init__(conf, connection_pool, - url, default_exchange, + super(QpidDriver, self).__init__(conf, url, + connection_pool, + default_exchange, allowed_remote_exmods) diff --git a/oslo/messaging/_drivers/impl_rabbit.py b/oslo/messaging/_drivers/impl_rabbit.py index 71f65b49f..bc397232a 100644 --- a/oslo/messaging/_drivers/impl_rabbit.py +++ b/oslo/messaging/_drivers/impl_rabbit.py @@ -873,13 +873,14 @@ def cleanup(): class RabbitDriver(amqpdriver.AMQPDriverBase): - def __init__(self, conf, url=None, default_exchange=None, + def __init__(self, conf, url, default_exchange=None, allowed_remote_exmods=[]): conf.register_opts(rabbit_opts) conf.register_opts(rpc_amqp.amqp_opts) connection_pool = rpc_amqp.get_connection_pool(conf, Connection) - super(RabbitDriver, self).__init__(conf, connection_pool, - url, default_exchange, + super(RabbitDriver, self).__init__(conf, url, + connection_pool, + default_exchange, allowed_remote_exmods) diff --git a/oslo/messaging/_urls.py b/oslo/messaging/_urls.py deleted file mode 100644 index b7c865cc4..000000000 --- a/oslo/messaging/_urls.py +++ /dev/null @@ -1,111 +0,0 @@ - -# Copyright 2013 Red Hat, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); you may -# not use this file except in compliance with the License. You may obtain -# a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT -# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the -# License for the specific language governing permissions and limitations -# under the License. - -import urlparse - - -def parse_url(url): - """Parse an url. - - Assuming a URL takes the form of: - - transport://user:pass@host1:port[,hostN:portN]/virtual_host[?opt=val] - - then parse the URL and return a dictionary with the following structure: - - { - 'virtual_host': 'virtual_host', - 'transport': 'transport', - 'hosts': [{'username': 'username', - 'password': 'password' - 'host': 'host1:port1'}, - ...], - 'parameters': {'option': 'value'} - } - - Netloc is parsed following the sequence bellow: - - * It is first splitted by ',' in order to support multiple hosts - * The last parsed username and password will be propagated to the rest - of hotsts specified: - - user:passwd@host1:port1,host2:port2 - - [ - {"username": "user", "password": "passwd", "host": "host1:port1"}, - {"username": "user", "password": "passwd", "host": "host2:port2"} - ] - - * In order to avoid the above propagation, it is possible to alter the - order in which the hosts are specified or specify a set of fake credentials - using ",:@host2:port2" - - - user:passwd@host1:port1,:@host2:port2 - - [ - {"username": "user", "password": "passwd", "host": "host1:port1"}, - {"username": "", "password": "", "host": "host2:port2"} - ] - - :param url: The URL to parse - :type url: str - :returns: A dictionary with the parsed data - """ - if not url: - return None - - # NOTE(flaper87): Not PY3K compliant - if not isinstance(url, basestring): - raise TypeError("Wrong URL type") - - url = urlparse.urlparse(url) - - parsed = dict(transport=url.scheme) - - virtual_host = None - if url.path.startswith('/'): - virtual_host = url.path[1:] - parsed["virtual_host"] = virtual_host - - # NOTE(flaper87): Parse netloc. - hosts = [] - username = password = '' - for host in url.netloc.split(","): - if not host: - continue - - if "@" in host: - username, host = host.split("@", 1) - if ":" in username: - username, password = username.split(":", 1) - - hosts.append({ - "host": host, - "username": username, - "password": password, - }) - - parsed["hosts"] = hosts - - parameters = {} - if url.query: - # NOTE(flaper87): This returns a dict with - # key -> [value], those values need to be - # normalized - parameters = urlparse.parse_qs(url.query) - parsed['parameters'] = parameters - - return parsed diff --git a/oslo/messaging/transport.py b/oslo/messaging/transport.py index 14e425f0b..fa594eeab 100644 --- a/oslo/messaging/transport.py +++ b/oslo/messaging/transport.py @@ -3,6 +3,7 @@ # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. # Copyright 2013 Red Hat, Inc. +# Copyright (c) 2012 Rackspace Hosting # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain @@ -20,10 +21,13 @@ __all__ = [ 'DriverLoadFailure', 'InvalidTransportURL', 'Transport', + 'TransportHost', + 'TransportURL', 'get_transport', 'set_transport_defaults', ] +import urllib import urlparse from oslo.config import cfg @@ -136,36 +140,258 @@ def get_transport(conf, url=None, allowed_remote_exmods=[]): rabbit://me:passwd@host:5672/virtual_host + and can either be passed as a string or a TransportURL object. + :param conf: the user configuration :type conf: cfg.ConfigOpts :param url: a transport URL - :type url: str + :type url: str or TransportURL :param allowed_remote_exmods: a list of modules which a client using this - transport will deserialize remote exceptions from + transport will deserialize remote exceptions + from :type allowed_remote_exmods: list """ conf.register_opts(_transport_opts) - url = url or conf.transport_url - if url is not None: - rpc_backend = urlparse.urlparse(url).scheme - if not rpc_backend: + if not isinstance(url, TransportURL): + url = url or conf.transport_url + parsed = TransportURL.parse(conf, url) + if not parsed.transport: raise InvalidTransportURL(url, 'No scheme specified in "%s"' % url) - else: - rpc_backend = conf.rpc_backend + url = parsed kwargs = dict(default_exchange=conf.control_exchange, allowed_remote_exmods=allowed_remote_exmods) - if url is not None: - kwargs['url'] = url try: mgr = driver.DriverManager('oslo.messaging.drivers', - rpc_backend, + url.transport, invoke_on_load=True, - invoke_args=[conf], + invoke_args=[conf, url], invoke_kwds=kwargs) except RuntimeError as ex: - raise DriverLoadFailure(rpc_backend, ex) + raise DriverLoadFailure(url.transport, ex) return Transport(mgr.driver) + + +class TransportHost(object): + + """A host element of a parsed transport URL.""" + + def __init__(self, hostname=None, port=None, username=None, password=None): + self.hostname = hostname + self.port = port + self.username = username + self.password = password + + def __eq__(self, other): + return vars(self) == vars(other) + + def __ne__(self, other): + return not self == other + + def __repr__(self): + attrs = [] + for a in ['hostname', 'port', 'username', 'password']: + v = getattr(self, a) + if v: + attrs.append((a, repr(v))) + values = ', '.join(['%s=%s' % i for i in attrs]) + return '' + + +class TransportURL(object): + + """A parsed transport URL. + + Transport URLs take the form:: + + transport://user:pass@host1:port[,hostN:portN]/virtual_host + + i.e. the scheme selects the transport driver, you may include multiple + hosts in netloc and the path part is a "virtual host" partition path. + + :param conf: a ConfigOpts instance + :type conf: oslo.config.cfg.ConfigOpts + :param transport: a transport name e.g. 'rabbit' or 'qpid' + :type transport: str + :param virtual_host: a virtual host path e.g. '/' + :type virtual_host: str + :param hosts: a list of TransportHost objects + :type hosts: list + """ + + def __init__(self, conf, transport=None, virtual_host=None, hosts=None): + self.conf = conf + self.conf.register_opts(_transport_opts) + self._transport = transport + self._virtual_host = virtual_host + self._hosts = hosts + if self._hosts is None: + self._hosts = [] + + @property + def transport(self): + if self._transport is None: + return self.conf.rpc_backend + else: + return self._transport + + @transport.setter + def transport(self, value): + self._transport = value + + @property + def virtual_host(self): + return self._virtual_host + + @virtual_host.setter + def virtual_host(self, value): + self._virtual_host = value + + @property + def hosts(self): + return self._hosts + + def __eq__(self, other): + return (self.transport == other.transport and + self.virtual_host == other.virtual_host and + self.hosts == other.hosts) + + def __ne__(self, other): + return not self == other + + def __repr__(self): + attrs = [] + for a in ['transport', 'virtual_host', 'hosts']: + v = getattr(self, a) + if v: + attrs.append((a, repr(v))) + values = ', '.join(['%s=%s' % i for i in attrs]) + return '' + + def __str__(self): + netlocs = [] + + for host in self.hosts: + username = host.username + password = host.password + hostname = host.hostname + port = host.port + + # Starting place for the network location + netloc = '' + + # Build the username and password portion of the transport URL + if username is not None or password is not None: + if username is not None: + netloc += urllib.quote(username, '') + if password is not None: + netloc += ':%s' % urllib.quote(password, '') + netloc += '@' + + # Build the network location portion of the transport URL + if hostname: + if ':' in hostname: + netloc += '[%s]' % hostname + else: + netloc += hostname + if port is not None: + netloc += ':%d' % port + + netlocs.append(netloc) + + # Assemble the transport URL + url = '%s://%s/' % (self.transport, ','.join(netlocs)) + + if self.virtual_host: + url += urllib.quote(self.virtual_host) + + return url + + @classmethod + def parse(cls, conf, url): + """Parse an url. + + Assuming a URL takes the form of: + + transport://user:pass@host1:port[,hostN:portN]/virtual_host + + then parse the URL and return a TransportURL object. + + Netloc is parsed following the sequence bellow: + + * It is first splitted by ',' in order to support multiple hosts + * The last parsed username and password will be propagated to the rest + of hotsts specified: + + user:passwd@host1:port1,host2:port2 + + [ + {"username": "user", "password": "passwd", "host": "host1:port1"}, + {"username": "user", "password": "passwd", "host": "host2:port2"} + ] + + * In order to avoid the above propagation, it is possible to alter the + order in which the hosts are specified or specify a set of fake + credentials using ",:@host2:port2" + + user:passwd@host1:port1,:@host2:port2 + + [ + {"username": "user", "password": "passwd", "host": "host1:port1"}, + {"username": "", "password": "", "host": "host2:port2"} + ] + + :param conf: a ConfigOpts instance + :type conf: oslo.config.cfg.ConfigOpts + :param url: The URL to parse + :type url: str + :returns: A TransportURL + """ + if not url: + return cls(conf) + + # FIXME(flaper87): Not PY3K compliant + if not isinstance(url, basestring): + raise InvalidTransportURL(url, 'Wrong URL type') + + url = urlparse.urlparse(url) + + # Make sure there's not a query string; that could identify + # requirements we can't comply with (e.g., ssl), so reject it if + # it's present + if '?' in url.path or url.query: + raise InvalidTransportURL(url.geturl(), + "Cannot comply with query string in " + "transport URL") + + virtual_host = None + if url.path.startswith('/'): + virtual_host = url.path[1:] + + hosts = [] + + username = password = '' + for host in url.netloc.split(','): + if not host: + continue + + hostname = host + username = password = port = None + + if '@' in host: + username, hostname = host.split('@', 1) + if ':' in username: + username, password = username.split(':', 1) + if ':' in hostname: + hostname, port = hostname.split(':', 1) + port = int(port) + + hosts.append(TransportHost(hostname=hostname, + port=port, + username=username, + password=password)) + + return cls(conf, url.scheme, virtual_host, hosts) diff --git a/tests/test_transport.py b/tests/test_transport.py index bbdc0b796..62da912b5 100644 --- a/tests/test_transport.py +++ b/tests/test_transport.py @@ -52,26 +52,12 @@ class _FakeManager(object): class GetTransportTestCase(test_utils.BaseTestCase): scenarios = [ - ('all_none', - dict(url=None, transport_url=None, rpc_backend=None, - control_exchange=None, allowed=None, - expect=dict(backend=None, - exchange=None, - url=None, - allowed=[]))), ('rpc_backend', dict(url=None, transport_url=None, rpc_backend='testbackend', control_exchange=None, allowed=None, expect=dict(backend='testbackend', exchange=None, - url=None, - allowed=[]))), - ('control_exchange', - dict(url=None, transport_url=None, rpc_backend=None, - control_exchange='testexchange', allowed=None, - expect=dict(backend=None, - exchange='testexchange', - url=None, + url='testbackend:', allowed=[]))), ('transport_url', dict(url=None, transport_url='testtransport:', rpc_backend=None, @@ -87,12 +73,19 @@ class GetTransportTestCase(test_utils.BaseTestCase): exchange=None, url='testtransport:', allowed=[]))), + ('control_exchange', + dict(url=None, transport_url=None, rpc_backend='testbackend', + control_exchange='testexchange', allowed=None, + expect=dict(backend='testbackend', + exchange='testexchange', + url='testbackend:', + allowed=[]))), ('allowed_remote_exmods', - dict(url=None, transport_url=None, rpc_backend=None, + dict(url=None, transport_url=None, rpc_backend='testbackend', control_exchange=None, allowed=['foo', 'bar'], - expect=dict(backend=None, + expect=dict(backend='testbackend', exchange=None, - url=None, + url='testbackend:', allowed=['foo', 'bar']))), ] @@ -107,11 +100,11 @@ class GetTransportTestCase(test_utils.BaseTestCase): self.mox.StubOutWithMock(driver, 'DriverManager') - invoke_args = [self.conf] + invoke_args = [self.conf, + messaging.TransportURL.parse(self.conf, + self.expect['url'])] invoke_kwds = dict(default_exchange=self.expect['exchange'], allowed_remote_exmods=self.expect['allowed']) - if self.expect['url']: - invoke_kwds['url'] = self.expect['url'] drvr = _FakeDriver(self.conf) driver.DriverManager('oslo.messaging.drivers', @@ -164,7 +157,9 @@ class GetTransportSadPathTestCase(test_utils.BaseTestCase): if self.rpc_backend: self.mox.StubOutWithMock(driver, 'DriverManager') - invoke_args = [self.conf] + invoke_args = [self.conf, + messaging.TransportURL.parse(self.conf, + self.url)] invoke_kwds = dict(default_exchange='openstack', allowed_remote_exmods=[]) @@ -191,7 +186,7 @@ class GetTransportSadPathTestCase(test_utils.BaseTestCase): for k, v in self.ex.items(): self.assertTrue(hasattr(ex, k)) - self.assertEqual(getattr(ex, k), v) + self.assertEqual(str(getattr(ex, k)), v) # FIXME(markmc): this could be used elsewhere diff --git a/tests/test_urls.py b/tests/test_urls.py index 3e0439f69..e4ceb8fdf 100644 --- a/tests/test_urls.py +++ b/tests/test_urls.py @@ -13,12 +13,13 @@ # License for the specific language governing permissions and limitations # under the License. +from oslo.config import cfg import testscenarios -from oslo.messaging import _urls as urls +from oslo import messaging +from oslo.messaging import transport from tests import utils as test_utils - load_tests = testscenarios.load_tests_apply_scenarios @@ -27,89 +28,163 @@ class TestParseURL(test_utils.BaseTestCase): scenarios = [ ('transport', dict(url='foo:', - expect=dict(transport='foo', - virtual_host=None, - hosts=[], - parameters={}))), + expect=dict(transport='foo'))), ('virtual_host_slash', dict(url='foo:////', - expect=dict(transport='foo', - virtual_host='/', - hosts=[], - parameters={}))), + expect=dict(transport='foo', virtual_host='/'))), ('virtual_host', dict(url='foo:///bar', - expect=dict(transport='foo', - virtual_host='bar', - hosts=[], - parameters={}))), + expect=dict(transport='foo', virtual_host='bar'))), ('host', dict(url='foo://host/bar', expect=dict(transport='foo', virtual_host='bar', hosts=[ - dict(host='host', - username='', - password=''), - ], - parameters={}))), + dict(host='host'), + ]))), ('port', dict(url='foo://host:1234/bar', expect=dict(transport='foo', virtual_host='bar', hosts=[ - dict(host='host:1234', - username='', - password=''), - ], - parameters={}))), + dict(host='host', port=1234), + ]))), ('username', dict(url='foo://u@host:1234/bar', expect=dict(transport='foo', virtual_host='bar', hosts=[ - dict(host='host:1234', - username='u', - password=''), - ], - parameters={}))), + dict(host='host', port=1234, username='u'), + ]))), ('password', dict(url='foo://u:p@host:1234/bar', expect=dict(transport='foo', virtual_host='bar', hosts=[ - dict(host='host:1234', - username='u', - password='p'), - ], - parameters={}))), + dict(host='host', port=1234, + username='u', password='p'), + ]))), ('multi_host', dict(url='foo://u:p@host1:1234,host2:4321/bar', expect=dict(transport='foo', virtual_host='bar', hosts=[ - dict(host='host1:1234', - username='u', - password='p'), - dict(host='host2:4321', - username='u', - password='p'), - ], - parameters={}))), + dict(host='host1', port=1234, + username='u', password='p'), + dict(host='host2', port=4321), + ]))), ('multi_creds', dict(url='foo://u1:p1@host1:1234,u2:p2@host2:4321/bar', expect=dict(transport='foo', virtual_host='bar', hosts=[ - dict(host='host1:1234', - username='u1', - password='p1'), - dict(host='host2:4321', - username='u2', - password='p2'), - ], - parameters={}))), + dict(host='host1', port=1234, + username='u1', password='p1'), + dict(host='host2', port=4321, + username='u2', password='p2'), + ]))), ] + def setUp(self): + super(TestParseURL, self).setUp(conf=cfg.ConfigOpts()) + self.conf.register_opts(transport._transport_opts) + def test_parse_url(self): - self.assertEqual(urls.parse_url(self.url), self.expect) + self.config(rpc_backend=None) + + url = messaging.TransportURL.parse(self.conf, self.url) + + hosts = [] + for host in self.expect.get('hosts', []): + hosts.append(messaging.TransportHost(host.get('host'), + host.get('port'), + host.get('username'), + host.get('password'))) + expected = messaging.TransportURL(self.conf, + self.expect.get('transport'), + self.expect.get('virtual_host'), + hosts) + + self.assertEqual(url, expected) + + +class TestFormatURL(test_utils.BaseTestCase): + + scenarios = [ + ('rpc_backend', + dict(rpc_backend='testbackend', + transport=None, + virtual_host=None, + hosts=[], + expected='testbackend:///')), + ('transport', + dict(rpc_backend=None, + transport='testtransport', + virtual_host=None, + hosts=[], + expected='testtransport:///')), + ('virtual_host', + dict(rpc_backend=None, + transport='testtransport', + virtual_host='/vhost', + hosts=[], + expected='testtransport:////vhost')), + ('host', + dict(rpc_backend=None, + transport='testtransport', + virtual_host='/', + hosts=[ + dict(hostname='host', + port=10, + username='bob', + password='secret'), + ], + expected='testtransport://bob:secret@host:10//')), + ('multi_host', + dict(rpc_backend=None, + transport='testtransport', + virtual_host='', + hosts=[ + dict(hostname='h1', + port=1000, + username='b1', + password='s1'), + dict(hostname='h2', + port=2000, + username='b2', + password='s2'), + ], + expected='testtransport://b1:s1@h1:1000,b2:s2@h2:2000/')), + ('quoting', + dict(rpc_backend=None, + transport='testtransport', + virtual_host='/$', + hosts=[ + dict(hostname='host', + port=10, + username='b$', + password='s&'), + ], + expected='testtransport://b%24:s%26@host:10//%24')), + ] + + def setUp(self): + super(TestFormatURL, self).setUp(conf=cfg.ConfigOpts()) + self.conf.register_opts(transport._transport_opts) + + def test_parse_url(self): + self.config(rpc_backend=self.rpc_backend) + + hosts = [] + for host in self.hosts: + hosts.append(messaging.TransportHost(host.get('hostname'), + host.get('port'), + host.get('username'), + host.get('password'))) + + url = messaging.TransportURL(self.conf, + self.transport, + self.virtual_host, + hosts) + + self.assertEqual(str(url), self.expected)