Use testscenarios

Now that we're using testr rather than nose, we can actually use
testscenarios.

Nice diffstat ... 474 lines removed :)
This commit is contained in:
Mark McLoughlin 2013-06-15 10:23:56 +01:00
parent cbfb1452a4
commit 8bf3c862b3
4 changed files with 100 additions and 574 deletions

View File

@ -14,26 +14,14 @@
# under the License.
from oslo.config import cfg
import testscenarios
from oslo import messaging
from oslo.messaging.rpc import client as rpc_client
from oslo.messaging import serializer as msg_serializer
from tests import utils as test_utils
# FIXME(markmc): I'm having touble using testscenarios with nose
# import testscenarios
# load_tests = testscenarios.load_tests_apply_scenarios
def _test_scenario(name, scenarios, obj, method):
for n, args in scenarios:
if n == name:
for k, v in args.items():
setattr(obj, k, v)
break
else:
raise RuntimeError('Invalid scenario', name)
method()
load_tests = testscenarios.load_tests_apply_scenarios
class _FakeTransport(object):
@ -64,7 +52,7 @@ class TestCastCall(test_utils.BaseTestCase):
super(TestCastCall, self).setUp(conf=cfg.ConfigOpts())
self.conf.register_opts(rpc_client._client_opts)
def _test_cast_call(self):
def test_cast_call(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
@ -84,21 +72,6 @@ class TestCastCall(test_utils.BaseTestCase):
method = client.call if self.call else client.cast
method(self.ctxt, 'foo', **self.args)
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_cast_call)
def test_cast_no_ctxt_no_args(self):
self._test_scenario('cast_no_ctxt_no_args')
def test_call_no_ctxt_no_args(self):
self._test_scenario('call_no_ctxt_no_args')
def test_cast_ctxt_and_args(self):
self._test_scenario('cast_ctxt_and_args')
def test_call_ctxt_and_args(self):
self._test_scenario('call_ctxt_and_args')
class TestCastToTarget(test_utils.BaseTestCase):
@ -205,7 +178,7 @@ class TestCastToTarget(test_utils.BaseTestCase):
def setUp(self):
super(TestCastToTarget, self).setUp(conf=cfg.ConfigOpts())
def _test_cast_to_target(self):
def test_cast_to_target(self):
target = messaging.Target(**self.ctor)
expect_target = messaging.Target(**self.expect)
@ -227,84 +200,6 @@ class TestCastToTarget(test_utils.BaseTestCase):
client = client.prepare(**self.prepare)
client.cast({}, 'foo')
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_cast_to_target)
def test_cast_to_target_all_none(self):
self._test_scenario('all_none')
def test_cast_to_target_ctor_exchange(self):
self._test_scenario('ctor_exchange')
def test_cast_to_target_prepare_exchange(self):
self._test_scenario('prepare_exchange')
def test_cast_to_target_prepare_exchange_none(self):
self._test_scenario('prepare_exchange_none')
def test_cast_to_target_both_exchange(self):
self._test_scenario('both_exchange')
def test_cast_to_target_ctor_topic(self):
self._test_scenario('ctor_topic')
def test_cast_to_target_prepare_topic(self):
self._test_scenario('prepare_topic')
def test_cast_to_target_prepare_topic_none(self):
self._test_scenario('prepare_topic_none')
def test_cast_to_target_both_topic(self):
self._test_scenario('both_topic')
def test_cast_to_target_ctor_namespace(self):
self._test_scenario('ctor_namespace')
def test_cast_to_target_prepare_namespace(self):
self._test_scenario('prepare_namespace')
def test_cast_to_target_prepare_namespace_none(self):
self._test_scenario('prepare_namespace_none')
def test_cast_to_target_both_namespace(self):
self._test_scenario('both_namespace')
def test_cast_to_target_ctor_version(self):
self._test_scenario('ctor_version')
def test_cast_to_target_prepare_version(self):
self._test_scenario('prepare_version')
def test_cast_to_target_prepare_version_none(self):
self._test_scenario('prepare_version_none')
def test_cast_to_target_both_version(self):
self._test_scenario('both_version')
def test_cast_to_target_ctor_server(self):
self._test_scenario('ctor_server')
def test_cast_to_target_prepare_server(self):
self._test_scenario('prepare_server')
def test_cast_to_target_prepare_server_none(self):
self._test_scenario('prepare_server_none')
def test_cast_to_target_both_server(self):
self._test_scenario('both_server')
def test_cast_to_target_ctor_fanout(self):
self._test_scenario('ctor_fanout')
def test_cast_to_target_prepare_fanout(self):
self._test_scenario('prepare_fanout')
def test_cast_to_target_prepare_fanout_none(self):
self._test_scenario('prepare_fanout_none')
def test_cast_to_target_both_fanout(self):
self._test_scenario('both_fanout')
_notset = object()
@ -332,7 +227,7 @@ class TestCallTimeout(test_utils.BaseTestCase):
super(TestCallTimeout, self).setUp(conf=cfg.ConfigOpts())
self.conf.register_opts(rpc_client._client_opts)
def _test_call_timeout(self):
def test_call_timeout(self):
self.config(rpc_response_timeout=self.confval)
transport = _FakeTransport(self.conf)
@ -351,30 +246,6 @@ class TestCallTimeout(test_utils.BaseTestCase):
client = client.prepare(timeout=self.prepare)
client.call({}, 'foo')
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_call_timeout)
def test_call_timeout_all_none(self):
self._test_scenario('all_none')
def test_call_timeout_confval(self):
self._test_scenario('confval')
def test_call_timeout_ctor(self):
self._test_scenario('ctor')
def test_call_timeout_ctor_zero(self):
self._test_scenario('ctor_zero')
def test_call_timeout_prepare(self):
self._test_scenario('prepare')
def test_call_timeout_prepare_override(self):
self._test_scenario('prepare_override')
def test_call_timeout_prepare_zero(self):
self._test_scenario('prepare_zero')
class TestSerializer(test_utils.BaseTestCase):
@ -395,7 +266,7 @@ class TestSerializer(test_utils.BaseTestCase):
super(TestSerializer, self).setUp(conf=cfg.ConfigOpts())
self.conf.register_opts(rpc_client._client_opts)
def _test_call_serializer(self):
def test_call_serializer(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
@ -429,21 +300,15 @@ class TestSerializer(test_utils.BaseTestCase):
if self.retval is not None:
self.assertEquals(retval, 'd' + self.retval)
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_call_serializer)
def test_serializer_cast(self):
self._test_scenario('cast')
def test_serializer_call(self):
self._test_scenario('call')
class TestVersionCap(test_utils.BaseTestCase):
scenarios = []
_call_vs_cast = [
('call', dict(call=True)),
('cast', dict(call=False)),
]
_scenarios_template = [
_cap_scenarios = [
('all_none',
dict(cap=None, prepare_cap=_notset,
version=None, prepare_version=_notset,
@ -471,21 +336,16 @@ class TestVersionCap(test_utils.BaseTestCase):
]
@classmethod
def setup_scenarios(cls):
for s in cls._scenarios_template:
s = ('call_' + s[0], s[1].copy())
s[1]['call'] = True
cls.scenarios.append(s)
for s in cls._scenarios_template:
s = ('cast_' + s[0], s[1].copy())
s[1]['call'] = False
cls.scenarios.append(s)
def generate_scenarios(cls):
cls.scenarios = (
testscenarios.multiply_scenarios(cls._call_vs_cast,
cls._cap_scenarios))
def setUp(self):
super(TestVersionCap, self).setUp(conf=cfg.ConfigOpts())
self.conf.register_opts(rpc_client._client_opts)
def _test_version_cap(self):
def test_version_cap(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
@ -530,47 +390,8 @@ class TestVersionCap(test_utils.BaseTestCase):
else:
self.assertTrue(self.success)
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_version_cap)
def test_version_cap_call_all_none(self):
self._test_scenario('call_all_none')
def test_version_call_ctor_cap_ok(self):
self._test_scenario('call_ctor_cap_ok')
def test_version_call_ctor_cap_override_ok(self):
self._test_scenario('call_ctor_cap_override_ok')
def test_version_call_ctor_cap_override_none(self):
self._test_scenario('call_ctor_cap_override_none_ok')
def test_version_call_ctor_cap_minor_fail(self):
self._test_scenario('call_ctor_cap_minor_fail')
def test_version_call_ctor_cap_major_fail(self):
self._test_scenario('call_ctor_cap_major_fail')
def test_version_cap_cast_all_none(self):
self._test_scenario('cast_all_none')
def test_version_cast_ctor_cap_ok(self):
self._test_scenario('cast_ctor_cap_ok')
def test_version_cast_ctor_cap_override_ok(self):
self._test_scenario('cast_ctor_cap_override_ok')
def test_version_cast_ctor_cap_override_none(self):
self._test_scenario('cast_ctor_cap_override_none_ok')
def test_version_cast_ctor_cap_minor_fail(self):
self._test_scenario('cast_ctor_cap_minor_fail')
def test_version_cast_ctor_cap_major_fail(self):
self._test_scenario('cast_ctor_cap_major_fail')
TestVersionCap.setup_scenarios()
TestVersionCap.generate_scenarios()
class TestCheckForLock(test_utils.BaseTestCase):
@ -588,7 +409,7 @@ class TestCheckForLock(test_utils.BaseTestCase):
super(TestCheckForLock, self).setUp(conf=cfg.ConfigOpts())
self.conf.register_opts(rpc_client._client_opts)
def _test_check_for_lock(self):
def test_check_for_lock(self):
self.config(rpc_response_timeout=None)
transport = _FakeTransport(self.conf)
@ -622,15 +443,3 @@ class TestCheckForLock(test_utils.BaseTestCase):
self.assertTrue(self.warning in warnings[0])
else:
self.assertEquals(len(warnings), 0)
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_check_for_lock)
def test_check_for_lock_none(self):
self._test_scenario('none')
def test_check_for_lock_one(self):
self._test_scenario('one')
def test_check_for_lock_two(self):
self._test_scenario('two')

View File

@ -13,24 +13,13 @@
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
from oslo import messaging
from oslo.messaging import serializer as msg_serializer
from tests import utils as test_utils
# FIXME(markmc): I'm having touble using testscenarios with nose
# import testscenarios
# load_tests = testscenarios.load_tests_apply_scenarios
def _test_scenario(name, scenarios, obj, method):
for n, args in scenarios:
if n == name:
for k, v in args.items():
setattr(obj, k, v)
break
else:
raise RuntimeError('Invalid scenario', name)
method()
load_tests = testscenarios.load_tests_apply_scenarios
class _FakeEndpoint(object):
@ -101,7 +90,7 @@ class TestDispatcher(test_utils.BaseTestCase):
success=False, ex=messaging.UnsupportedVersion)),
]
def _test_dispatcher(self):
def test_dispatcher(self):
endpoints = []
for e in self.endpoints:
target = messaging.Target(**e) if e else None
@ -134,39 +123,6 @@ class TestDispatcher(test_utils.BaseTestCase):
else:
self.assertTrue(self.success)
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_dispatcher)
def test_dispatcher_no_endpoints(self):
self._test_scenario('no_endpoints')
def test_dispatcher_default_target(self):
self._test_scenario('default_target')
def test_dispatcher_default_target_ctxt_and_args(self):
self._test_scenario('default_target_ctxt_and_args')
def test_dispatcher_default_target_namespace(self):
self._test_scenario('default_target')
def test_dispatcher_default_target_version(self):
self._test_scenario('default_target')
def test_dispatcher_default_target_no_such_method(self):
self._test_scenario('default_target_no_such_method')
def test_dispatcher_namespace(self):
self._test_scenario('namespace')
def test_dispatcher_namespace_mismatch(self):
self._test_scenario('namespace_mismatch')
def test_dispatcher_version(self):
self._test_scenario('version')
def test_dispatcher_version_mismatch(self):
self._test_scenario('version_mismatch')
class TestSerializer(test_utils.BaseTestCase):
@ -179,7 +135,7 @@ class TestSerializer(test_utils.BaseTestCase):
retval='d')),
]
def _test_serializer(self):
def test_serializer(self):
endpoint = _FakeEndpoint()
serializer = msg_serializer.NoOpSerializer
dispatcher = messaging.RPCDispatcher([endpoint], serializer)
@ -202,12 +158,3 @@ class TestSerializer(test_utils.BaseTestCase):
retval = dispatcher(self.ctxt, dict(method='foo', args=self.args))
if self.retval is not None:
self.assertEquals(retval, 's' + self.retval)
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_serializer)
def test_serializer_no_args_or_retval(self):
self._test_scenario('no_args_or_retval')
def test_serializer_args_and_retval(self):
self._test_scenario('args_and_retval')

View File

@ -13,23 +13,12 @@
# License for the specific language governing permissions and limitations
# under the License.
import testscenarios
from oslo import messaging
from tests import utils as test_utils
# FIXME(markmc): I'm having touble using testscenarios with nose
# import testscenarios
# load_tests = testscenarios.load_tests_apply_scenarios
def _test_scenario(name, scenarios, obj, method):
for n, args in scenarios:
if n == name:
for k, v in args.items():
setattr(obj, k, v)
break
else:
raise RuntimeError('Invalid scenario', name)
method()
load_tests = testscenarios.load_tests_apply_scenarios
class TargetConstructorTestCase(test_utils.BaseTestCase):
@ -44,7 +33,7 @@ class TargetConstructorTestCase(test_utils.BaseTestCase):
('fanout', dict(kwargs=dict(fanout=True))),
]
def _test_constructor(self):
def test_constructor(self):
target = messaging.Target(**self.kwargs)
for k in self.kwargs:
self.assertEquals(getattr(target, k), self.kwargs[k])
@ -54,30 +43,6 @@ class TargetConstructorTestCase(test_utils.BaseTestCase):
continue
self.assertTrue(getattr(target, k) is None)
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_constructor)
def test_constructor_all_none(self):
self._test_scenario('all_none')
def test_constructor_exchange(self):
self._test_scenario('exchange')
def test_constructor_topic(self):
self._test_scenario('topic')
def test_constructor_namespace(self):
self._test_scenario('namespace')
def test_constructor_version(self):
self._test_scenario('version')
def test_constructor_server(self):
self._test_scenario('server')
def test_constructor_fanout(self):
self._test_scenario('fanout')
class TargetCallableTestCase(test_utils.BaseTestCase):
@ -121,7 +86,7 @@ class TargetCallableTestCase(test_utils.BaseTestCase):
vals=dict(fanout=True))),
]
def _test_callable(self):
def test_callable(self):
target = messaging.Target(**self.attrs)
target = target(**self.kwargs)
for k in self.vals:
@ -132,48 +97,6 @@ class TargetCallableTestCase(test_utils.BaseTestCase):
continue
self.assertTrue(getattr(target, k) is None)
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_callable)
def test_callable_all_none(self):
self._test_scenario('all_none')
def test_callable_exchange_attr(self):
self._test_scenario('exchange_attr')
def test_callable_exchange_arg(self):
self._test_scenario('exchange_arg')
def test_callable_topic_attr(self):
self._test_scenario('topic_attr')
def test_callable_topic_arg(self):
self._test_scenario('topic_arg')
def test_callable_namespace_attr(self):
self._test_scenario('namespace_attr')
def test_callable_namespace_arg(self):
self._test_scenario('namespace_arg')
def test_callable_version_attr(self):
self._test_scenario('version_attr')
def test_callable_version_arg(self):
self._test_scenario('version_arg')
def test_callable_server_attr(self):
self._test_scenario('server_attr')
def test_callable_server_arg(self):
self._test_scenario('server_arg')
def test_callable_fanout_attr(self):
self._test_scenario('fanout_attr')
def test_callable_fanout_arg(self):
self._test_scenario('fanout_arg')
class TargetReprTestCase(test_utils.BaseTestCase):
@ -197,70 +120,51 @@ class TargetReprTestCase(test_utils.BaseTestCase):
'fanout=True')),
]
def _test_repr(self):
def test_repr(self):
target = messaging.Target(**self.kwargs)
self.assertEquals(str(target), '<Target ' + self.repr + '>')
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_repr)
def test_repr_all_none(self):
self._test_scenario('all_none')
def test_repr_exchange(self):
self._test_scenario('exchange')
def test_repr_topic(self):
self._test_scenario('topic')
def test_repr_namespace(self):
self._test_scenario('namespace')
def test_repr_version(self):
self._test_scenario('version')
def test_repr_server(self):
self._test_scenario('server')
def test_repr_fanout(self):
self._test_scenario('fanout')
def test_repr_exchange_and_fanout(self):
self._test_scenario('exchange_and_fanout')
_notset = object()
class EqualityTestCase(test_utils.BaseTestCase):
scenarios = [
('all_none',
dict(a_kwargs=dict(), b_kwargs=dict(), equals=True)),
]
@classmethod
def _add_scenarios(cls, param):
new_scenarios = [
('a_' + param,
dict(a_kwargs={param: 'foo'},
b_kwargs={},
equals=False)),
('b_' + param,
dict(a_kwargs={},
b_kwargs={param: 'bar'},
equals=False)),
(param + '_equals',
dict(a_kwargs={param: 'foo'},
b_kwargs={param: 'foo'},
equals=True)),
(param + '_not_equals',
dict(a_kwargs={param: 'foo'},
b_kwargs={param: 'bar'},
equals=False))
def generate_scenarios(cls):
attr = [
('exchange', dict(attr='exchange')),
('topic', dict(attr='topic')),
('namespace', dict(attr='namespace')),
('version', dict(attr='version')),
('server', dict(attr='server')),
('fanout', dict(attr='fanout')),
]
a = [
('a_notset', dict(a_value=_notset)),
('a_none', dict(a_value=None)),
('a_empty', dict(a_value='')),
('a_foo', dict(a_value='foo')),
('a_bar', dict(a_value='bar')),
]
b = [
('b_notset', dict(b_value=_notset)),
('b_none', dict(b_value=None)),
('b_empty', dict(b_value='')),
('b_foo', dict(b_value='foo')),
('b_bar', dict(b_value='bar')),
]
cls.scenarios.extend(new_scenarios)
def _test_equality(self):
a = messaging.Target(**self.a_kwargs)
b = messaging.Target(**self.b_kwargs)
cls.scenarios = testscenarios.multiply_scenarios(attr, a, b)
for s in cls.scenarios:
s[1]['equals'] = (s[1]['a_value'] == s[1]['b_value'])
def test_equality(self):
a_kwargs = {self.attr : self.a_value}
b_kwargs = {self.attr : self.b_value}
a = messaging.Target(**a_kwargs)
b = messaging.Target(**b_kwargs)
if self.equals:
self.assertEquals(a, b)
@ -269,88 +173,5 @@ class EqualityTestCase(test_utils.BaseTestCase):
self.assertNotEquals(a, b)
self.assertFalse(a == b)
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_equality)
def test_equality_all_none(self):
self._test_scenario('all_none')
def test_equality_a_exchange(self):
self._test_scenario('a_exchange')
def test_equality_b_exchange(self):
self._test_scenario('b_exchange')
def test_equality_exchange_equals(self):
self._test_scenario('exchange_equals')
def test_equality_exchange_not_equals(self):
self._test_scenario('exchange_not_equals')
def test_equality_a_topic(self):
self._test_scenario('a_topic')
def test_equality_b_topic(self):
self._test_scenario('b_topic')
def test_equality_topic_equals(self):
self._test_scenario('topic_equals')
def test_equality_topic_not_equals(self):
self._test_scenario('topic_not_equals')
def test_equality_a_namespace(self):
self._test_scenario('a_namespace')
def test_equality_b_namespace(self):
self._test_scenario('b_namespace')
def test_equality_namespace_equals(self):
self._test_scenario('namespace_equals')
def test_equality_namespace_not_equals(self):
self._test_scenario('namespace_not_equals')
def test_equality_a_version(self):
self._test_scenario('a_version')
def test_equality_b_version(self):
self._test_scenario('b_version')
def test_equality_version_equals(self):
self._test_scenario('version_equals')
def test_equality_version_not_equals(self):
self._test_scenario('version_not_equals')
def test_equality_a_server(self):
self._test_scenario('a_server')
def test_equality_b_server(self):
self._test_scenario('b_server')
def test_equality_server_equals(self):
self._test_scenario('server_equals')
def test_equality_server_not_equals(self):
self._test_scenario('server_not_equals')
def test_equality_a_fanout(self):
self._test_scenario('a_fanout')
def test_equality_b_fanout(self):
self._test_scenario('b_fanout')
def test_equality_fanout_equals(self):
self._test_scenario('fanout_equals')
def test_equality_fanout_not_equals(self):
self._test_scenario('fanout_not_equals')
EqualityTestCase._add_scenarios('exchange')
EqualityTestCase._add_scenarios('topic')
EqualityTestCase._add_scenarios('namespace')
EqualityTestCase._add_scenarios('version')
EqualityTestCase._add_scenarios('server')
EqualityTestCase._add_scenarios('fanout')
EqualityTestCase.generate_scenarios()

View File

@ -19,25 +19,13 @@ import fixtures
import mox
from oslo.config import cfg
from stevedore import driver
import testscenarios
from oslo import messaging
from oslo.messaging import transport
from tests import utils as test_utils
# FIXME(markmc): I'm having touble using testscenarios with nose
# import testscenarios
# load_tests = testscenarios.load_tests_apply_scenarios
def _test_scenario(name, scenarios, obj, method):
for n, args in scenarios:
if n == name:
for k, v in args.items():
setattr(obj, k, v)
break
else:
raise RuntimeError('Invalid scenario', name)
method()
load_tests = testscenarios.load_tests_apply_scenarios
class _FakeDriver(object):
@ -66,66 +54,38 @@ class GetTransportTestCase(test_utils.BaseTestCase):
control_exchange=None,
expect=dict(backend=None,
exchange=None,
url=None),
ex=None)),
url=None))),
('rpc_backend',
dict(url=None, transport_url=None, rpc_backend='testbackend',
control_exchange=None,
expect=dict(backend='testbackend',
exchange=None,
url=None),
ex=None)),
url=None))),
('control_exchange',
dict(url=None, transport_url=None, rpc_backend=None,
control_exchange='testexchange',
expect=dict(backend=None,
exchange='testexchange',
url=None),
ex=None)),
url=None))),
('transport_url',
dict(url=None, transport_url='testtransport:', rpc_backend=None,
control_exchange=None,
expect=dict(backend='testtransport',
exchange=None,
url='testtransport:'),
ex=None)),
url='testtransport:'))),
('url_param',
dict(url='testtransport:', transport_url=None, rpc_backend=None,
control_exchange=None,
expect=dict(backend='testtransport',
exchange=None,
url='testtransport:'),
ex=None)),
('invalid_transport_url',
dict(url=None, transport_url='invalid', rpc_backend=None,
control_exchange=None,
expect=None,
ex=dict(cls=messaging.InvalidTransportURL,
msg_contains='No scheme specified',
url='invalid'))),
('invalid_url_param',
dict(url='invalid', transport_url=None, rpc_backend=None,
control_exchange=None,
expect=None,
ex=dict(cls=messaging.InvalidTransportURL,
msg_contains='No scheme specified',
url='invalid'))),
('driver_load_failure',
dict(url=None, transport_url=None, rpc_backend='testbackend',
control_exchange=None,
expect=dict(backend='testbackend',
exchange=None,
url=None),
ex=dict(cls=messaging.DriverLoadFailure,
msg_contains='Failed to load',
driver='testbackend'))),
url='testtransport:'))),
]
def setUp(self):
super(GetTransportTestCase, self).setUp(conf=cfg.ConfigOpts())
self.conf.register_opts(transport._transport_opts)
def _test_get_transport_happy(self):
def test_get_transport(self):
self.config(rpc_backend=self.rpc_backend,
control_exchange=self.control_exchange,
transport_url=self.transport_url)
@ -153,21 +113,43 @@ class GetTransportTestCase(test_utils.BaseTestCase):
self.assertTrue(transport.conf is self.conf)
self.assertTrue(transport._driver is drvr)
def _test_get_transport_sad(self):
class GetTransportSadPathTestCase(test_utils.BaseTestCase):
scenarios = [
('invalid_transport_url',
dict(url=None, transport_url='invalid', rpc_backend=None,
ex=dict(cls=messaging.InvalidTransportURL,
msg_contains='No scheme specified',
url='invalid'))),
('invalid_url_param',
dict(url='invalid', transport_url=None, rpc_backend=None,
ex=dict(cls=messaging.InvalidTransportURL,
msg_contains='No scheme specified',
url='invalid'))),
('driver_load_failure',
dict(url=None, transport_url=None, rpc_backend='testbackend',
ex=dict(cls=messaging.DriverLoadFailure,
msg_contains='Failed to load',
driver='testbackend'))),
]
def setUp(self):
super(GetTransportSadPathTestCase, self).setUp(conf=cfg.ConfigOpts())
self.conf.register_opts(transport._transport_opts)
def test_get_transport_sad(self):
self.config(rpc_backend=self.rpc_backend,
control_exchange=self.control_exchange,
transport_url=self.transport_url)
if self.expect:
if self.rpc_backend:
self.mox.StubOutWithMock(driver, 'DriverManager')
invoke_args = [self.conf]
invoke_kwds = dict(default_exchange=self.expect['exchange'])
if self.expect['url']:
invoke_kwds['url'] = self.expect['url']
invoke_kwds = dict(default_exchange='openstack')
driver.DriverManager('oslo.messaging.drivers',
self.expect['backend'],
self.rpc_backend,
invoke_on_load=True,
invoke_args=invoke_args,
invoke_kwds=invoke_kwds).\
@ -191,39 +173,6 @@ class GetTransportTestCase(test_utils.BaseTestCase):
self.assertTrue(hasattr(ex, k))
self.assertEquals(getattr(ex, k), v)
def _test_get_transport(self):
if self.ex is None:
self._test_get_transport_happy()
else:
self._test_get_transport_sad()
def _test_scenario(self, name):
_test_scenario(name, self.scenarios, self, self._test_get_transport)
def test_get_transport_all_none(self):
self._test_scenario('all_none')
def test_get_transport_rpc_backend(self):
self._test_scenario('rpc_backend')
def test_get_transport_control_exchange(self):
self._test_scenario('control_exchange')
def test_get_transport_transport_url(self):
self._test_scenario('transport_url')
def test_get_transport_url_param(self):
self._test_scenario('url_param')
def test_get_transport_invalid_transport_url(self):
self._test_scenario('invalid_transport_url')
def test_get_transport_invalid_url_param(self):
self._test_scenario('invalid_url_param')
def test_get_transport_driver_load_failure(self):
self._test_scenario('driver_load_failure')
# FIXME(markmc): this could be used elsewhere
class _SetDefaultsFixture(fixtures.Fixture):