diff --git a/oslo_messaging/tests/functional/test_functional.py b/oslo_messaging/tests/functional/test_functional.py index f474ec75f..32cc0190c 100644 --- a/oslo_messaging/tests/functional/test_functional.py +++ b/oslo_messaging/tests/functional/test_functional.py @@ -11,10 +11,13 @@ # License for the specific language governing permissions and limitations # under the License. -import oslo_messaging +import time +import uuid +import concurrent.futures from testtools import matchers +import oslo_messaging from oslo_messaging.tests.functional import utils @@ -103,6 +106,27 @@ class CallTestCase(utils.SkipIfNoTransportURL): f = lambda: client.subtract(increment=3) self.assertThat(f, matchers.raises(ValueError)) + def test_timeout_with_concurrently_queues(self): + transport = self.useFixture(utils.TransportFixture(self.url)) + target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()), + server="server_" + str(uuid.uuid4())) + server = self.useFixture( + utils.RpcServerFixture(self.url, target, executor="threading")) + client = utils.ClientStub(transport.transport, target, + cast=False, timeout=5) + + def short_periodical_tasks(): + for i in range(10): + client.add(increment=1) + time.sleep(1) + + with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor: + future = executor.submit(client.long_running_task, seconds=10) + executor.submit(short_periodical_tasks) + self.assertRaises(oslo_messaging.MessagingTimeout, future.result) + + self.assertEqual(10, server.endpoint.ival) + class CastTestCase(utils.SkipIfNoTransportURL): # Note: casts return immediately, so these tests utilise a special diff --git a/oslo_messaging/tests/functional/utils.py b/oslo_messaging/tests/functional/utils.py index 3aaa2d607..215930ad5 100644 --- a/oslo_messaging/tests/functional/utils.py +++ b/oslo_messaging/tests/functional/utils.py @@ -46,6 +46,9 @@ class TestServerEndpoint(object): self.sval += text return self.sval + def long_running_task(self, ctxt, seconds): + time.sleep(seconds) + class TransportFixture(fixtures.Fixture): """Fixture defined to setup the oslo_messaging transport.""" @@ -69,11 +72,13 @@ class TransportFixture(fixtures.Fixture): class RpcServerFixture(fixtures.Fixture): """Fixture to setup the TestServerEndpoint.""" - def __init__(self, url, target, endpoint=None, ctrl_target=None): + def __init__(self, url, target, endpoint=None, ctrl_target=None, + executor='blocking'): super(RpcServerFixture, self).__init__() self.url = url self.target = target self.endpoint = endpoint or TestServerEndpoint() + self.executor = executor self.syncq = moves.queue.Queue() self.ctrl_target = ctrl_target or self.target @@ -81,9 +86,11 @@ class RpcServerFixture(fixtures.Fixture): super(RpcServerFixture, self).setUp() endpoints = [self.endpoint, self] transport = self.useFixture(TransportFixture(self.url)) - self.server = oslo_messaging.get_rpc_server(transport.transport, - self.target, - endpoints) + self.server = oslo_messaging.get_rpc_server( + transport=transport.transport, + target=self.target, + endpoints=endpoints, + executor=self.executor) self._ctrl = oslo_messaging.RPCClient(transport.transport, self.ctrl_target) self._start()