Test a bunch more RPC server scenarios
This commit is contained in:
parent
8a8685b62f
commit
2eda85e3ba
@ -16,12 +16,15 @@
|
||||
import threading
|
||||
|
||||
from oslo.config import cfg
|
||||
import testscenarios
|
||||
|
||||
from oslo import messaging
|
||||
from tests import utils as test_utils
|
||||
|
||||
load_tests = testscenarios.load_tests_apply_scenarios
|
||||
|
||||
class TestRPCServer(test_utils.BaseTestCase):
|
||||
|
||||
class ServerSetupMixin(object):
|
||||
|
||||
class Server(object):
|
||||
def __init__(self, transport, topic, server, endpoint, serializer):
|
||||
@ -45,14 +48,13 @@ class TestRPCServer(test_utils.BaseTestCase):
|
||||
def deserialize_entity(self, ctxt, entity):
|
||||
return 'd' + (entity or '')
|
||||
|
||||
def setUp(self):
|
||||
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
|
||||
def __init__(self):
|
||||
self.serializer = self.TestSerializer()
|
||||
|
||||
def _setup_server(self, transport, endpoint, topic='testtopic'):
|
||||
def _setup_server(self, transport, endpoint, topic=None, server=None):
|
||||
server = self.Server(transport,
|
||||
topic=topic,
|
||||
server='testserver',
|
||||
topic=topic or 'testtopic',
|
||||
server=server or 'testserver',
|
||||
endpoint=endpoint,
|
||||
serializer=self.serializer)
|
||||
|
||||
@ -68,11 +70,21 @@ class TestRPCServer(test_utils.BaseTestCase):
|
||||
client.cast({}, 'stop')
|
||||
server_thread.join(timeout=30)
|
||||
|
||||
def _setup_client(self, transport):
|
||||
def _setup_client(self, transport, topic='testtopic'):
|
||||
return messaging.RPCClient(transport,
|
||||
messaging.Target(topic='testtopic'),
|
||||
messaging.Target(topic=topic),
|
||||
serializer=self.serializer)
|
||||
|
||||
|
||||
class TestRPCServer(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
|
||||
def __init__(self, *args):
|
||||
super(TestRPCServer, self).__init__(*args)
|
||||
ServerSetupMixin.__init__(self)
|
||||
|
||||
def setUp(self):
|
||||
super(TestRPCServer, self).setUp(conf=cfg.ConfigOpts())
|
||||
|
||||
def test_constructor(self):
|
||||
transport = messaging.get_transport(self.conf, url='fake:')
|
||||
target = messaging.Target(topic='foo', server='bar')
|
||||
@ -194,8 +206,116 @@ class TestRPCServer(test_utils.BaseTestCase):
|
||||
|
||||
self._stop_server(client, server_thread)
|
||||
|
||||
|
||||
class TestMultipleServers(test_utils.BaseTestCase, ServerSetupMixin):
|
||||
|
||||
_exchanges = [
|
||||
('same_exchange', dict(exchange1=None, exchange2=None)),
|
||||
('diff_exchange', dict(exchange1='x1', exchange2='x2')),
|
||||
]
|
||||
|
||||
_topics = [
|
||||
('same_topic', dict(topic1='t', topic2='t')),
|
||||
('diff_topic', dict(topic1='t1', topic2='t2')),
|
||||
]
|
||||
|
||||
_server = [
|
||||
('same_server', dict(server1=None, server2=None)),
|
||||
('diff_server', dict(server1='s1', server2='s2')),
|
||||
]
|
||||
|
||||
_fanout = [
|
||||
('not_fanout', dict(fanout1=None, fanout2=None)),
|
||||
('fanout', dict(fanout1=True, fanout2=True)),
|
||||
]
|
||||
|
||||
_method = [
|
||||
('call', dict(call1=True, call2=True)),
|
||||
('cast', dict(call1=False, call2=False)),
|
||||
]
|
||||
|
||||
_endpoints = [
|
||||
('one_endpoint',
|
||||
dict(multi_endpoints=False,
|
||||
expect1=['ds1', 'ds2'],
|
||||
expect2=['ds1', 'ds2'])),
|
||||
('two_endpoints',
|
||||
dict(multi_endpoints=True,
|
||||
expect1=['ds1'],
|
||||
expect2=['ds2'])),
|
||||
]
|
||||
|
||||
@classmethod
|
||||
def generate_scenarios(cls):
|
||||
cls.scenarios = testscenarios.multiply_scenarios(cls._exchanges,
|
||||
cls._topics,
|
||||
cls._server,
|
||||
cls._fanout,
|
||||
cls._method,
|
||||
cls._endpoints)
|
||||
|
||||
# fanout call not supported
|
||||
def filter_fanout_call(scenario):
|
||||
params = scenario[1]
|
||||
fanout = params['fanout1'] or params['fanout2']
|
||||
call = params['call1'] or params['call2']
|
||||
return not (call and fanout)
|
||||
|
||||
# listening multiple times on same topic/server pair not supported
|
||||
def filter_same_topic_and_server(scenario):
|
||||
params = scenario[1]
|
||||
single_topic = params['topic1'] == params['topic2']
|
||||
single_server = params['server1'] == params['server2']
|
||||
return not (single_topic and single_server)
|
||||
|
||||
# fanout to multiple servers on same topic and exchange
|
||||
# each endpoint will receive both messages
|
||||
def fanout_to_servers(scenario):
|
||||
params = scenario[1]
|
||||
fanout = params['fanout1'] or params['fanout2']
|
||||
single_exchange = params['exchange1'] == params['exchange2']
|
||||
single_topic = params['topic1'] == params['topic2']
|
||||
multi_servers = params['server1'] != params['server2']
|
||||
if fanout and single_exchange and single_topic and multi_servers:
|
||||
params['expect1'] = params['expect1'][:] + params['expect1']
|
||||
params['expect2'] = params['expect2'][:] + params['expect2']
|
||||
return scenario
|
||||
|
||||
# multiple endpoints on same topic and exchange
|
||||
# either endpoint can get either message
|
||||
def single_topic_multi_endpoints(scenario):
|
||||
params = scenario[1]
|
||||
single_exchange = params['exchange1'] == params['exchange2']
|
||||
single_topic = params['topic1'] == params['topic2']
|
||||
if single_topic and single_exchange and params['multi_endpoints']:
|
||||
params['expect_either'] = (params['expect1'] +
|
||||
params['expect2'])
|
||||
params['expect1'] = params['expect2'] = []
|
||||
else:
|
||||
params['expect_either'] = []
|
||||
return scenario
|
||||
|
||||
for f in [filter_fanout_call, filter_same_topic_and_server]:
|
||||
cls.scenarios = filter(f, cls.scenarios)
|
||||
for m in [fanout_to_servers, single_topic_multi_endpoints]:
|
||||
cls.scenarios = map(m, cls.scenarios)
|
||||
|
||||
def __init__(self, *args):
|
||||
super(TestMultipleServers, self).__init__(*args)
|
||||
ServerSetupMixin.__init__(self)
|
||||
|
||||
def setUp(self):
|
||||
super(TestMultipleServers, self).setUp(conf=cfg.ConfigOpts())
|
||||
|
||||
def test_multiple_servers(self):
|
||||
transport = messaging.get_transport(self.conf, url='fake:')
|
||||
url1 = 'fake:///' + (self.exchange1 or '')
|
||||
url2 = 'fake:///' + (self.exchange2 or '')
|
||||
|
||||
transport1 = messaging.get_transport(self.conf, url=url1)
|
||||
if url1 != url2:
|
||||
transport2 = messaging.get_transport(self.conf, url=url1)
|
||||
else:
|
||||
transport2 = transport1
|
||||
|
||||
class TestEndpoint(object):
|
||||
def __init__(self):
|
||||
@ -204,21 +324,52 @@ class TestRPCServer(test_utils.BaseTestCase):
|
||||
def ping(self, ctxt, arg):
|
||||
self.pings.append(arg)
|
||||
|
||||
endpoint = TestEndpoint()
|
||||
def alive(self, ctxt):
|
||||
return 'alive'
|
||||
|
||||
thread1 = self._setup_server(transport, endpoint, topic='topic1')
|
||||
thread2 = self._setup_server(transport, endpoint, topic='topic2')
|
||||
if self.multi_endpoints:
|
||||
endpoint1, endpoint2 = TestEndpoint(), TestEndpoint()
|
||||
else:
|
||||
endpoint1 = endpoint2 = TestEndpoint()
|
||||
|
||||
client = self._setup_client(transport)
|
||||
thread1 = self._setup_server(transport1, endpoint1,
|
||||
topic=self.topic1, server=self.server1)
|
||||
thread2 = self._setup_server(transport2, endpoint2,
|
||||
topic=self.topic2, server=self.server2)
|
||||
|
||||
client.prepare(topic='topic1').cast({}, 'ping', arg='1')
|
||||
client.prepare(topic='topic2').cast({}, 'ping', arg='2')
|
||||
client1 = self._setup_client(transport1, topic=self.topic1)
|
||||
client2 = self._setup_client(transport2, topic=self.topic2)
|
||||
|
||||
client1 = client1.prepare(server=self.server1)
|
||||
client2 = client2.prepare(server=self.server2)
|
||||
|
||||
if self.fanout1:
|
||||
client1.call({}, 'alive')
|
||||
client1 = client1.prepare(fanout=True)
|
||||
if self.fanout2:
|
||||
client2.call({}, 'alive')
|
||||
client2 = client2.prepare(fanout=True)
|
||||
|
||||
(client1.call if self.call1 else client1.cast)({}, 'ping', arg='1')
|
||||
(client2.call if self.call2 else client2.cast)({}, 'ping', arg='2')
|
||||
|
||||
self.assertTrue(thread1.isAlive())
|
||||
self._stop_server(client, thread1, topic='topic1')
|
||||
self._stop_server(client1.prepare(fanout=None),
|
||||
thread1, topic=self.topic1)
|
||||
self.assertTrue(thread2.isAlive())
|
||||
self._stop_server(client, thread2, topic='topic2')
|
||||
self._stop_server(client2.prepare(fanout=None),
|
||||
thread2, topic=self.topic2)
|
||||
|
||||
self.assertEquals(len(endpoint.pings), 2)
|
||||
self.assertTrue('ds1' in endpoint.pings)
|
||||
self.assertTrue('ds2' in endpoint.pings)
|
||||
def check(pings, expect):
|
||||
self.assertEquals(len(pings), len(expect))
|
||||
for a in expect:
|
||||
self.assertTrue(a in pings)
|
||||
|
||||
if self.expect_either:
|
||||
check(endpoint1.pings + endpoint2.pings, self.expect_either)
|
||||
else:
|
||||
check(endpoint1.pings, self.expect1)
|
||||
check(endpoint2.pings, self.expect2)
|
||||
|
||||
|
||||
TestMultipleServers.generate_scenarios()
|
||||
|
Loading…
x
Reference in New Issue
Block a user