Add one more functional test for MessagingTimeout
Add a check, that MessagingTimeout raises on long-running queries, if client sends another queries at the same time. Added a long_running_task() to TestServerEndpoint and allowed to pass a message executor into the RpcServerFixture. Related bug: #1338732 Co-Authored-By: Roman Podoliaka <rpodolyaka@mirantis.com> Change-Id: Icafb6838e2d9fb76b6d1c202465c09c174a3bed9
This commit is contained in:
parent
3d483fda70
commit
cc4ca1f9ef
@ -11,10 +11,13 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import oslo_messaging
|
||||
import time
|
||||
import uuid
|
||||
|
||||
import concurrent.futures
|
||||
from testtools import matchers
|
||||
|
||||
import oslo_messaging
|
||||
from oslo_messaging.tests.functional import utils
|
||||
|
||||
|
||||
@ -103,6 +106,27 @@ class CallTestCase(utils.SkipIfNoTransportURL):
|
||||
f = lambda: client.subtract(increment=3)
|
||||
self.assertThat(f, matchers.raises(ValueError))
|
||||
|
||||
def test_timeout_with_concurrently_queues(self):
|
||||
transport = self.useFixture(utils.TransportFixture(self.url))
|
||||
target = oslo_messaging.Target(topic="topic_" + str(uuid.uuid4()),
|
||||
server="server_" + str(uuid.uuid4()))
|
||||
server = self.useFixture(
|
||||
utils.RpcServerFixture(self.url, target, executor="threading"))
|
||||
client = utils.ClientStub(transport.transport, target,
|
||||
cast=False, timeout=5)
|
||||
|
||||
def short_periodical_tasks():
|
||||
for i in range(10):
|
||||
client.add(increment=1)
|
||||
time.sleep(1)
|
||||
|
||||
with concurrent.futures.ThreadPoolExecutor(max_workers=2) as executor:
|
||||
future = executor.submit(client.long_running_task, seconds=10)
|
||||
executor.submit(short_periodical_tasks)
|
||||
self.assertRaises(oslo_messaging.MessagingTimeout, future.result)
|
||||
|
||||
self.assertEqual(10, server.endpoint.ival)
|
||||
|
||||
|
||||
class CastTestCase(utils.SkipIfNoTransportURL):
|
||||
# Note: casts return immediately, so these tests utilise a special
|
||||
|
@ -46,6 +46,9 @@ class TestServerEndpoint(object):
|
||||
self.sval += text
|
||||
return self.sval
|
||||
|
||||
def long_running_task(self, ctxt, seconds):
|
||||
time.sleep(seconds)
|
||||
|
||||
|
||||
class TransportFixture(fixtures.Fixture):
|
||||
"""Fixture defined to setup the oslo_messaging transport."""
|
||||
@ -69,11 +72,13 @@ class TransportFixture(fixtures.Fixture):
|
||||
class RpcServerFixture(fixtures.Fixture):
|
||||
"""Fixture to setup the TestServerEndpoint."""
|
||||
|
||||
def __init__(self, url, target, endpoint=None, ctrl_target=None):
|
||||
def __init__(self, url, target, endpoint=None, ctrl_target=None,
|
||||
executor='blocking'):
|
||||
super(RpcServerFixture, self).__init__()
|
||||
self.url = url
|
||||
self.target = target
|
||||
self.endpoint = endpoint or TestServerEndpoint()
|
||||
self.executor = executor
|
||||
self.syncq = moves.queue.Queue()
|
||||
self.ctrl_target = ctrl_target or self.target
|
||||
|
||||
@ -81,9 +86,11 @@ class RpcServerFixture(fixtures.Fixture):
|
||||
super(RpcServerFixture, self).setUp()
|
||||
endpoints = [self.endpoint, self]
|
||||
transport = self.useFixture(TransportFixture(self.url))
|
||||
self.server = oslo_messaging.get_rpc_server(transport.transport,
|
||||
self.target,
|
||||
endpoints)
|
||||
self.server = oslo_messaging.get_rpc_server(
|
||||
transport=transport.transport,
|
||||
target=self.target,
|
||||
endpoints=endpoints,
|
||||
executor=self.executor)
|
||||
self._ctrl = oslo_messaging.RPCClient(transport.transport,
|
||||
self.ctrl_target)
|
||||
self._start()
|
||||
|
Loading…
x
Reference in New Issue
Block a user