diff --git a/tests/test_rpc_server.py b/tests/test_rpc_server.py index e3649ec23..fcb52f687 100644 --- a/tests/test_rpc_server.py +++ b/tests/test_rpc_server.py @@ -16,12 +16,15 @@ import threading from oslo.config import cfg +import testscenarios from oslo import messaging from tests import utils as test_utils +load_tests = testscenarios.load_tests_apply_scenarios -class TestRPCServer(test_utils.BaseTestCase): + +class ServerSetupMixin(object): class Server(object): def __init__(self, transport, topic, server, endpoint, serializer): @@ -45,14 +48,13 @@ class TestRPCServer(test_utils.BaseTestCase): def deserialize_entity(self, ctxt, entity): return 'd' + (entity or '') - def setUp(self): - super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts()) + def __init__(self): self.serializer = self.TestSerializer() - def _setup_server(self, transport, endpoint, topic='testtopic'): + def _setup_server(self, transport, endpoint, topic=None, server=None): server = self.Server(transport, - topic=topic, - server='testserver', + topic=topic or 'testtopic', + server=server or 'testserver', endpoint=endpoint, serializer=self.serializer) @@ -68,11 +70,21 @@ class TestRPCServer(test_utils.BaseTestCase): client.cast({}, 'stop') server_thread.join(timeout=30) - def _setup_client(self, transport): + def _setup_client(self, transport, topic='testtopic'): return messaging.RPCClient(transport, - messaging.Target(topic='testtopic'), + messaging.Target(topic=topic), serializer=self.serializer) + +class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin): + + def __init__(self, *args): + super(TestRPCServer, self).__init__(*args) + ServerSetupMixin.__init__(self) + + def setUp(self): + super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts()) + def test_constructor(self): transport = messaging.get_transport(self.conf, url='fake:') target = messaging.Target(topic='foo', server='bar') @@ -194,8 +206,116 @@ class TestRPCServer(test_utils.BaseTestCase): self._stop_server(client, server_thread) + +class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin): + + _exchanges = [ + ('same_exchange', dict(exchange1=None, exchange2=None)), + ('diff_exchange', dict(exchange1='x1', exchange2='x2')), + ] + + _topics = [ + ('same_topic', dict(topic1='t', topic2='t')), + ('diff_topic', dict(topic1='t1', topic2='t2')), + ] + + _server = [ + ('same_server', dict(server1=None, server2=None)), + ('diff_server', dict(server1='s1', server2='s2')), + ] + + _fanout = [ + ('not_fanout', dict(fanout1=None, fanout2=None)), + ('fanout', dict(fanout1=True, fanout2=True)), + ] + + _method = [ + ('call', dict(call1=True, call2=True)), + ('cast', dict(call1=False, call2=False)), + ] + + _endpoints = [ + ('one_endpoint', + dict(multi_endpoints=False, + expect1=['ds1', 'ds2'], + expect2=['ds1', 'ds2'])), + ('two_endpoints', + dict(multi_endpoints=True, + expect1=['ds1'], + expect2=['ds2'])), + ] + + @classmethod + def generate_scenarios(cls): + cls.scenarios = testscenarios.multiply_scenarios(cls._exchanges, + cls._topics, + cls._server, + cls._fanout, + cls._method, + cls._endpoints) + + # fanout call not supported + def filter_fanout_call(scenario): + params = scenario[1] + fanout = params['fanout1'] or params['fanout2'] + call = params['call1'] or params['call2'] + return not (call and fanout) + + # listening multiple times on same topic/server pair not supported + def filter_same_topic_and_server(scenario): + params = scenario[1] + single_topic = params['topic1'] == params['topic2'] + single_server = params['server1'] == params['server2'] + return not (single_topic and single_server) + + # fanout to multiple servers on same topic and exchange + # each endpoint will receive both messages + def fanout_to_servers(scenario): + params = scenario[1] + fanout = params['fanout1'] or params['fanout2'] + single_exchange = params['exchange1'] == params['exchange2'] + single_topic = params['topic1'] == params['topic2'] + multi_servers = params['server1'] != params['server2'] + if fanout and single_exchange and single_topic and multi_servers: + params['expect1'] = params['expect1'][:] + params['expect1'] + params['expect2'] = params['expect2'][:] + params['expect2'] + return scenario + + # multiple endpoints on same topic and exchange + # either endpoint can get either message + def single_topic_multi_endpoints(scenario): + params = scenario[1] + single_exchange = params['exchange1'] == params['exchange2'] + single_topic = params['topic1'] == params['topic2'] + if single_topic and single_exchange and params['multi_endpoints']: + params['expect_either'] = (params['expect1'] + + params['expect2']) + params['expect1'] = params['expect2'] = [] + else: + params['expect_either'] = [] + return scenario + + for f in [filter_fanout_call, filter_same_topic_and_server]: + cls.scenarios = filter(f, cls.scenarios) + for m in [fanout_to_servers, single_topic_multi_endpoints]: + cls.scenarios = map(m, cls.scenarios) + + def __init__(self, *args): + super(TestMultipleServers, self).__init__(*args) + ServerSetupMixin.__init__(self) + + def setUp(self): + super(TestMultipleServers, self).setUp(conf=cfg.ConfigOpts()) + def test_multiple_servers(self): - transport = messaging.get_transport(self.conf, url='fake:') + url1 = 'fake:///' + (self.exchange1 or '') + url2 = 'fake:///' + (self.exchange2 or '') + + transport1 = messaging.get_transport(self.conf, url=url1) + if url1 != url2: + transport2 = messaging.get_transport(self.conf, url=url1) + else: + transport2 = transport1 class TestEndpoint(object): def __init__(self): @@ -204,21 +324,52 @@ class TestRPCServer(test_utils.BaseTestCase): def ping(self, ctxt, arg): self.pings.append(arg) - endpoint = TestEndpoint() + def alive(self, ctxt): + return 'alive' - thread1 = self._setup_server(transport, endpoint, topic='topic1') - thread2 = self._setup_server(transport, endpoint, topic='topic2') + if self.multi_endpoints: + endpoint1, endpoint2 = TestEndpoint(), TestEndpoint() + else: + endpoint1 = endpoint2 = TestEndpoint() - client = self._setup_client(transport) + thread1 = self._setup_server(transport1, endpoint1, + topic=self.topic1, server=self.server1) + thread2 = self._setup_server(transport2, endpoint2, + topic=self.topic2, server=self.server2) - client.prepare(topic='topic1').cast({}, 'ping', arg='1') - client.prepare(topic='topic2').cast({}, 'ping', arg='2') + client1 = self._setup_client(transport1, topic=self.topic1) + client2 = self._setup_client(transport2, topic=self.topic2) + + client1 = client1.prepare(server=self.server1) + client2 = client2.prepare(server=self.server2) + + if self.fanout1: + client1.call({}, 'alive') + client1 = client1.prepare(fanout=True) + if self.fanout2: + client2.call({}, 'alive') + client2 = client2.prepare(fanout=True) + + (client1.call if self.call1 else client1.cast)({}, 'ping', arg='1') + (client2.call if self.call2 else client2.cast)({}, 'ping', arg='2') self.assertTrue(thread1.isAlive()) - self._stop_server(client, thread1, topic='topic1') + self._stop_server(client1.prepare(fanout=None), + thread1, topic=self.topic1) self.assertTrue(thread2.isAlive()) - self._stop_server(client, thread2, topic='topic2') + self._stop_server(client2.prepare(fanout=None), + thread2, topic=self.topic2) - self.assertEquals(len(endpoint.pings), 2) - self.assertTrue('ds1' in endpoint.pings) - self.assertTrue('ds2' in endpoint.pings) + def check(pings, expect): + self.assertEquals(len(pings), len(expect)) + for a in expect: + self.assertTrue(a in pings) + + if self.expect_either: + check(endpoint1.pings + endpoint2.pings, self.expect_either) + else: + check(endpoint1.pings, self.expect1) + check(endpoint2.pings, self.expect2) + + +TestMultipleServers.generate_scenarios()