From 68d1ba0a7157850ecdee09d64063f4c004faeed5 Mon Sep 17 00:00:00 2001 From: Alyson Deives Pereira Date: Wed, 20 Jul 2022 12:26:15 -0300 Subject: [PATCH] Use eventlet instead of gevent However, the following tests are still failing: FAILED tests/test_middleware.py::test_server_inspect_exception_middleware - zerorpc.exceptions.LostRemote: Lost remote after 10s heartbeat FAILED tests/test_middleware_before_after_exec.py::test_hook_server_before_exec_puller - AssertionError: assert 'echo: test' == 'echo: test with a middleware' FAILED tests/test_middleware_before_after_exec.py::test_hook_server_after_exec_puller - AssertionError: assert 'echo: test' == 'echo: test with a middleware' FAILED tests/test_pubpush.py::test_pubsub_inheritance - RuntimeError: The subscriber didn't receive any published message FAILED tests/test_pubpush.py::test_pubsub_composite - RuntimeError: The subscriber didn't receive any published message Signed-off-by: Alyson Deives Pereira --- setup.py | 13 +- tests/test_buffered_channel.py | 48 ++-- tests/test_client.py | 6 +- tests/test_client_async.py | 16 +- tests/test_client_heartbeat.py | 40 +-- tests/test_heartbeat.py | 46 ++-- tests/test_middleware.py | 88 +++--- tests/test_middleware_before_after_exec.py | 81 ++++-- tests/test_middleware_client.py | 76 ++++-- tests/test_pubpush.py | 35 ++- tests/test_reqstream.py | 6 +- tests/test_server.py | 18 +- tests/test_zmq.py | 8 +- tests/zmqbug.py | 270 ++++++++++++++---- tox.ini | 2 +- zerorpc/channel.py | 30 +- zerorpc/context.py | 2 +- zerorpc/core.py | 32 +-- zerorpc/events.py | 42 ++- zerorpc/gevent_zmq.py | 301 +++++++++++++-------- zerorpc/heartbeat.py | 20 +- 21 files changed, 748 insertions(+), 432 deletions(-) diff --git a/setup.py b/setup.py index b07ebcb..d57ddcb 100644 --- a/setup.py +++ b/setup.py @@ -44,12 +44,13 @@ requirements = [ if sys.version_info < (2, 7): requirements.append('argparse') -if sys.version_info < (2, 7): - requirements.append('gevent>=1.1.0,<1.2.0') -elif sys.version_info < (3, 0): - requirements.append('gevent>=1.0') -else: - requirements.append('gevent>=1.1') +# if sys.version_info < (2, 7): +# requirements.append('gevent>=1.1.0,<1.2.0') +# elif sys.version_info < (3, 0): +# requirements.append('gevent>=1.0') +# else: +# requirements.append('gevent>=1.1') +requirements.append('eventlet>=0.24.1') with open("README.rst", "r") as fh: long_description = fh.read() diff --git a/tests/test_buffered_channel.py b/tests/test_buffered_channel.py index 20b8173..f94152c 100644 --- a/tests/test_buffered_channel.py +++ b/tests/test_buffered_channel.py @@ -28,7 +28,7 @@ from __future__ import absolute_import from builtins import range import pytest -import gevent +import eventlet import sys from zerorpc import zmq @@ -57,7 +57,7 @@ def test_close_server_bufchan(): server_bufchan = zerorpc.BufferedChannel(server_hbchan) server_bufchan.recv() - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) print('CLOSE SERVER SOCKET!!!') server_bufchan.close() if sys.version_info < (2, 7): @@ -92,7 +92,7 @@ def test_close_client_bufchan(): server_bufchan = zerorpc.BufferedChannel(server_hbchan) server_bufchan.recv() - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) print('CLOSE CLIENT SOCKET!!!') client_bufchan.close() if sys.version_info < (2, 7): @@ -125,7 +125,7 @@ def test_heartbeat_can_open_channel_server_close(): server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2) server_bufchan = zerorpc.BufferedChannel(server_hbchan) - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) print('CLOSE SERVER SOCKET!!!') server_bufchan.close() if sys.version_info < (2, 7): @@ -160,12 +160,12 @@ def test_heartbeat_can_open_channel_client_close(): server_bufchan = zerorpc.BufferedChannel(server_hbchan) try: while True: - gevent.sleep(1) + eventlet.sleep(1) finally: server_bufchan.close() - server_coro = gevent.spawn(server_fn) + server_coro = eventlet.spawn(server_fn) - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) print('CLOSE CLIENT SOCKET!!!') client_bufchan.close() client.close() @@ -173,7 +173,7 @@ def test_heartbeat_can_open_channel_client_close(): pytest.raises(zerorpc.LostRemote, server_coro.get) else: with pytest.raises(zerorpc.LostRemote): - server_coro.get() + server_coro.wait() print('SERVER LOST CLIENT :)') server.close() @@ -200,7 +200,7 @@ def test_do_some_req_rep(): assert list(event.args) == [x + x * x] client_bufchan.close() - coro_pool = gevent.pool.Pool() + coro_pool = eventlet.greenpool.GreenPool() coro_pool.spawn(client_do) def server_do(): @@ -217,7 +217,7 @@ def test_do_some_req_rep(): coro_pool.spawn(server_do) - coro_pool.join() + coro_pool.waitall() client.close() server.close() @@ -250,7 +250,7 @@ def test_do_some_req_rep_lost_server(): client_bufchan.recv() client_bufchan.close() - coro_pool = gevent.pool.Pool() + coro_pool = eventlet.greenpool.GreenPool() coro_pool.spawn(client_do) def server_do(): @@ -266,7 +266,7 @@ def test_do_some_req_rep_lost_server(): coro_pool.spawn(server_do) - coro_pool.join() + coro_pool.waitall() client.close() server.close() @@ -293,7 +293,7 @@ def test_do_some_req_rep_lost_client(): assert list(event.args) == [x + x * x] client_bufchan.close() - coro_pool = gevent.pool.Pool() + coro_pool = eventlet.greenpool.GreenPool() coro_pool.spawn(client_do) def server_do(): @@ -316,7 +316,7 @@ def test_do_some_req_rep_lost_client(): coro_pool.spawn(server_do) - coro_pool.join() + coro_pool.waitall() client.close() server.close() @@ -353,7 +353,7 @@ def test_do_some_req_rep_client_timeout(): assert list(event.args) == [x] client_bufchan.close() - coro_pool = gevent.pool.Pool() + coro_pool = eventlet.greenpool.GreenPool() coro_pool.spawn(client_do) def server_do(): @@ -367,7 +367,7 @@ def test_do_some_req_rep_client_timeout(): for x in range(20): event = server_bufchan.recv() assert event.name == 'sleep' - gevent.sleep(TIME_FACTOR * event.args[0]) + eventlet.sleep(TIME_FACTOR * event.args[0]) server_bufchan.emit('OK', event.args) pytest.raises(zerorpc.LostRemote, _do_with_assert_raises) else: @@ -375,14 +375,14 @@ def test_do_some_req_rep_client_timeout(): for x in range(20): event = server_bufchan.recv() assert event.name == 'sleep' - gevent.sleep(TIME_FACTOR * event.args[0]) + eventlet.sleep(TIME_FACTOR * event.args[0]) server_bufchan.emit('OK', event.args) server_bufchan.close() coro_pool.spawn(server_do) - coro_pool.join() + coro_pool.waitall() client.close() server.close() @@ -410,7 +410,7 @@ def test_congestion_control_server_pushing(): read_cnt.value += 1 client_bufchan.close() - coro_pool = gevent.pool.Pool() + coro_pool = eventlet.greenpool.GreenPool() coro_pool.spawn(client_do) def server_do(): @@ -443,7 +443,7 @@ def test_congestion_control_server_pushing(): coro_pool.spawn(server_do) try: - coro_pool.join() + coro_pool.waitall() except zerorpc.LostRemote: pass finally: @@ -485,7 +485,7 @@ def test_on_close_if(): if event.name == 'done': return seen.append(event.args) - gevent.sleep(0.1) + eventlet.sleep(0.1) def server_do(): for i in range(0, 10): @@ -494,12 +494,12 @@ def test_on_close_if(): client_bufchan.on_close_if = is_stream_done - coro_pool = gevent.pool.Pool() + coro_pool = eventlet.greenpool.GreenPool() g1 = coro_pool.spawn(client_do) g2 = coro_pool.spawn(server_do) - g1.get() # Re-raise any exceptions... - g2.get() + g1.wait() # Re-raise any exceptions... + g2.wait() assert seen == [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] diff --git a/tests/test_client.py b/tests/test_client.py index 6a692b3..b9e3be3 100644 --- a/tests/test_client.py +++ b/tests/test_client.py @@ -24,7 +24,7 @@ from __future__ import absolute_import -import gevent +import eventlet import zerorpc from .testutils import teardown, random_ipc_endpoint @@ -39,7 +39,7 @@ def test_client_connect(): srv = MySrv() srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client = zerorpc.Client() client.connect(endpoint) @@ -56,7 +56,7 @@ def test_client_quick_connect(): srv = MySrv() srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client = zerorpc.Client(endpoint) diff --git a/tests/test_client_async.py b/tests/test_client_async.py index ced4b1f..a2272ff 100644 --- a/tests/test_client_async.py +++ b/tests/test_client_async.py @@ -26,7 +26,7 @@ from __future__ import print_function from __future__ import absolute_import import pytest -import gevent +import eventlet import sys from zerorpc import zmq @@ -43,12 +43,12 @@ def test_client_server_client_timeout_with_async(): return 42 def add(self, a, b): - gevent.sleep(TIME_FACTOR * 10) + eventlet.sleep(TIME_FACTOR * 10) return a + b srv = MySrv() srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client = zerorpc.Client(timeout=TIME_FACTOR * 2) client.connect(endpoint) @@ -57,11 +57,11 @@ def test_client_server_client_timeout_with_async(): if sys.version_info < (2, 7): def _do_with_assert_raises(): - print(async_result.get()) + print(async_result.wait()) pytest.raises(zerorpc.TimeoutExpired, _do_with_assert_raises) else: with pytest.raises(zerorpc.TimeoutExpired): - print(async_result.get()) + print(async_result.wait()) client.close() srv.close() @@ -79,13 +79,13 @@ def test_client_server_with_async(): srv = MySrv() srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client = zerorpc.Client() client.connect(endpoint) async_result = client.lolita(async_=True) - assert async_result.get() == 42 + assert async_result.wait() == 42 async_result = client.add(1, 4, async_=True) - assert async_result.get() == 5 + assert async_result.wait() == 5 diff --git a/tests/test_client_heartbeat.py b/tests/test_client_heartbeat.py index 6b552a4..908c866 100644 --- a/tests/test_client_heartbeat.py +++ b/tests/test_client_heartbeat.py @@ -28,7 +28,7 @@ from __future__ import absolute_import from builtins import next from builtins import range -import gevent +import eventlet import zerorpc from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR @@ -43,11 +43,11 @@ def test_client_server_hearbeat(): return 42 def slow(self): - gevent.sleep(TIME_FACTOR * 10) + eventlet.sleep(TIME_FACTOR * 10) srv = MySrv(heartbeat=TIME_FACTOR * 1) srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client = zerorpc.Client(heartbeat=TIME_FACTOR * 1) client.connect(endpoint) @@ -62,13 +62,13 @@ def test_client_server_activate_heartbeat(): class MySrv(zerorpc.Server): def lolita(self): - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) return 42 srv = MySrv(heartbeat=TIME_FACTOR * 4) srv.bind(endpoint) - gevent.spawn(srv.run) - gevent.sleep(0) + eventlet.spawn(srv.run) + eventlet.sleep(0) client = zerorpc.Client(heartbeat=TIME_FACTOR * 4) client.connect(endpoint) @@ -86,13 +86,13 @@ def test_client_server_passive_hearbeat(): return 42 def slow(self): - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) return 2 srv = MySrv(heartbeat=TIME_FACTOR * 4) srv.bind(endpoint) - gevent.spawn(srv.run) - gevent.sleep(0) + eventlet.spawn(srv.run) + eventlet.sleep(0) client = zerorpc.Client(heartbeat=TIME_FACTOR * 4, passive_heartbeat=True) client.connect(endpoint) @@ -112,16 +112,16 @@ def test_client_hb_doesnt_linger_on_streaming(): srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context()) srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context()) def test_client(): assert list(client1.iter()) == list(range(42)) print('sleep 3s') - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) - gevent.spawn(test_client).join() + eventlet.spawn(test_client).wait() def est_client_drop_few(): @@ -134,7 +134,7 @@ def est_client_drop_few(): srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context()) srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context()) client2 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context()) @@ -143,7 +143,7 @@ def est_client_drop_few(): assert client1.lolita() == 42 assert client2.lolita() == 42 - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) assert client3.lolita() == 42 @@ -158,7 +158,7 @@ def test_client_drop_empty_stream(): srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context()) srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context()) @@ -167,9 +167,9 @@ def test_client_drop_empty_stream(): i = client1.iter() print('sleep 3s') - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) - gevent.spawn(test_client).join() + eventlet.spawn(test_client).wait() def test_client_drop_stream(): @@ -183,7 +183,7 @@ def test_client_drop_stream(): srv = MySrv(heartbeat=TIME_FACTOR * 1, context=zerorpc.Context()) srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client1 = zerorpc.Client(endpoint, heartbeat=TIME_FACTOR * 1, context=zerorpc.Context()) @@ -195,6 +195,6 @@ def test_client_drop_stream(): assert list(next(i) for x in range(142)) == list(range(142)) print('sleep 3s') - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) - gevent.spawn(test_client).join() + eventlet.spawn(test_client).wait() diff --git a/tests/test_heartbeat.py b/tests/test_heartbeat.py index 14c66fd..c34d204 100644 --- a/tests/test_heartbeat.py +++ b/tests/test_heartbeat.py @@ -28,7 +28,7 @@ from __future__ import absolute_import from builtins import range import pytest -import gevent +import eventlet import sys from zerorpc import zmq @@ -55,7 +55,7 @@ def test_close_server_hbchan(): server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2) server_hbchan.recv() - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) print('CLOSE SERVER SOCKET!!!') server_hbchan.close() if sys.version_info < (2, 7): @@ -88,7 +88,7 @@ def test_close_client_hbchan(): server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2) server_hbchan.recv() - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) print('CLOSE CLIENT SOCKET!!!') client_hbchan.close() if sys.version_info < (2, 7): @@ -119,7 +119,7 @@ def test_heartbeat_can_open_channel_server_close(): server_channel = server.channel(event) server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2) - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) print('CLOSE SERVER SOCKET!!!') server_hbchan.close() if sys.version_info < (2, 7): @@ -150,7 +150,7 @@ def test_heartbeat_can_open_channel_client_close(): server_channel = server.channel(event) server_hbchan = zerorpc.HeartBeatOnChannel(server_channel, freq=TIME_FACTOR * 2) - gevent.sleep(TIME_FACTOR * 3) + eventlet.sleep(TIME_FACTOR * 3) print('CLOSE CLIENT SOCKET!!!') client_hbchan.close() client.close() @@ -189,7 +189,7 @@ def test_do_some_req_rep(): assert list(event.args) == [x + x * x] client_hbchan.close() - client_task = gevent.spawn(client_do) + client_task = eventlet.spawn(client_do) def server_do(): for x in range(20): @@ -198,10 +198,10 @@ def test_do_some_req_rep(): server_hbchan.emit('OK', (sum(event.args),)) server_hbchan.close() - server_task = gevent.spawn(server_do) + server_task = eventlet.spawn(server_do) - server_task.get() - client_task.get() + server_task.wait() + client_task.wait() client.close() server.close() @@ -233,7 +233,7 @@ def test_do_some_req_rep_lost_server(): client_hbchan.recv() client_hbchan.close() - client_task = gevent.spawn(client_do) + client_task = eventlet.spawn(client_do) def server_do(): event = server.recv() @@ -245,10 +245,10 @@ def test_do_some_req_rep_lost_server(): server_hbchan.emit('OK', (sum(event.args),)) server_hbchan.close() - server_task = gevent.spawn(server_do) + server_task = eventlet.spawn(server_do) - server_task.get() - client_task.get() + server_task.wait() + client_task.wait() client.close() server.close() @@ -274,7 +274,7 @@ def test_do_some_req_rep_lost_client(): assert list(event.args) == [x + x * x] client_hbchan.close() - client_task = gevent.spawn(client_do) + client_task = eventlet.spawn(client_do) def server_do(): event = server.recv() @@ -293,10 +293,10 @@ def test_do_some_req_rep_lost_client(): server_hbchan.recv() server_hbchan.close() - server_task = gevent.spawn(server_do) + server_task = eventlet.spawn(server_do) - server_task.get() - client_task.get() + server_task.wait() + client_task.wait() client.close() server.close() @@ -332,7 +332,7 @@ def test_do_some_req_rep_client_timeout(): assert list(event.args) == [x] client_hbchan.close() - client_task = gevent.spawn(client_do) + client_task = eventlet.spawn(client_do) def server_do(): event = server.recv() @@ -344,7 +344,7 @@ def test_do_some_req_rep_client_timeout(): for x in range(20): event = server_hbchan.recv() assert event.name == 'sleep' - gevent.sleep(TIME_FACTOR * event.args[0]) + eventlet.sleep(TIME_FACTOR * event.args[0]) server_hbchan.emit('OK', event.args) pytest.raises(zerorpc.LostRemote, _do_with_assert_raises) else: @@ -352,13 +352,13 @@ def test_do_some_req_rep_client_timeout(): for x in range(20): event = server_hbchan.recv() assert event.name == 'sleep' - gevent.sleep(TIME_FACTOR * event.args[0]) + eventlet.sleep(TIME_FACTOR * event.args[0]) server_hbchan.emit('OK', event.args) server_hbchan.close() - server_task = gevent.spawn(server_do) + server_task = eventlet.spawn(server_do) - server_task.get() - client_task.get() + server_task.wait() + client_task.wait() client.close() server.close() diff --git a/tests/test_middleware.py b/tests/test_middleware.py index 3163a3a..12ba899 100644 --- a/tests/test_middleware.py +++ b/tests/test_middleware.py @@ -26,11 +26,13 @@ from __future__ import print_function from __future__ import absolute_import from builtins import str + +import greenlet from future.utils import tobytes import pytest -import gevent -import gevent.local +import eventlet +import eventlet.corolocal import random import hashlib import sys @@ -109,7 +111,7 @@ def test_resolve_endpoint_events(): cnt = c.register_middleware(Resolver()) assert cnt == 1 srv.bind('some_service') - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client = zerorpc.Client(heartbeat=TIME_FACTOR * 1, context=c) client.connect('some_service') @@ -123,7 +125,7 @@ class Tracer(object): '''Used by test_task_context_* tests''' def __init__(self, identity): self._identity = identity - self._locals = gevent.local.local() + self._locals = eventlet.corolocal.local() self._log = [] @property @@ -169,7 +171,7 @@ def test_task_context(): srv = zerorpc.Server(Srv(), context=srv_ctx) srv.bind(endpoint) - srv_task = gevent.spawn(srv.run) + srv_task = eventlet.spawn(srv.run) c = zerorpc.Client(context=cli_ctx) c.connect(endpoint) @@ -179,7 +181,10 @@ def test_task_context(): assert x == 42 srv.stop() - srv_task.join() + try: + srv_task.wait() + except greenlet.GreenletExit: + pass assert cli_tracer._log == [ ('new', cli_tracer.trace_id), @@ -212,7 +217,7 @@ def test_task_context_relay(): srv = zerorpc.Server(Srv(), context=srv_ctx) srv.bind(endpoint1) - srv_task = gevent.spawn(srv.run) + srv_task = eventlet.spawn(srv.run) c_relay = zerorpc.Client(context=srv_relay_ctx) c_relay.connect(endpoint1) @@ -223,7 +228,7 @@ def test_task_context_relay(): srv_relay = zerorpc.Server(SrvRelay(), context=srv_relay_ctx) srv_relay.bind(endpoint2) - srv_relay_task = gevent.spawn(srv_relay.run) + srv_relay_task = eventlet.spawn(srv_relay.run) c = zerorpc.Client(context=cli_ctx) c.connect(endpoint2) @@ -232,8 +237,14 @@ def test_task_context_relay(): srv_relay.stop() srv.stop() - srv_relay_task.join() - srv_task.join() + try: + srv_relay_task.wait() + except greenlet.GreenletExit: + pass + try: + srv_task.wait() + except greenlet.GreenletExit: + pass assert cli_tracer._log == [ ('new', cli_tracer.trace_id), @@ -268,7 +279,7 @@ def test_task_context_relay_fork(): srv = zerorpc.Server(Srv(), context=srv_ctx) srv.bind(endpoint1) - srv_task = gevent.spawn(srv.run) + srv_task = eventlet.spawn(srv.run) c_relay = zerorpc.Client(context=srv_relay_ctx) c_relay.connect(endpoint1) @@ -277,16 +288,16 @@ def test_task_context_relay_fork(): def echo(self, msg): def dothework(msg): return c_relay.echo(msg) + 'relayed' - g = gevent.spawn(zerorpc.fork_task_context(dothework, + g = eventlet.spawn(zerorpc.fork_task_context(dothework, srv_relay_ctx), 'relay' + msg) print('relaying in separate task:', g) - r = g.get() + r = g.wait() print('back to main task') return r srv_relay = zerorpc.Server(SrvRelay(), context=srv_relay_ctx) srv_relay.bind(endpoint2) - srv_relay_task = gevent.spawn(srv_relay.run) + srv_relay_task = eventlet.spawn(srv_relay.run) c = zerorpc.Client(context=cli_ctx) c.connect(endpoint2) @@ -295,8 +306,15 @@ def test_task_context_relay_fork(): srv_relay.stop() srv.stop() - srv_relay_task.join() - srv_task.join() + try: + srv_relay_task.wait() + except greenlet.GreenletExit: + pass + + try: + srv_task.wait() + except greenlet.GreenletExit: + pass assert cli_tracer._log == [ ('new', cli_tracer.trace_id), @@ -324,25 +342,28 @@ def test_task_context_pushpull(): pusher_tracer = Tracer('[pusher]') pusher_ctx.register_middleware(pusher_tracer) - trigger = gevent.event.Event() + trigger = eventlet.event.Event() class Puller(object): def echo(self, msg): - trigger.set() + trigger.send() puller = zerorpc.Puller(Puller(), context=puller_ctx) puller.bind(endpoint) - puller_task = gevent.spawn(puller.run) + puller_task = eventlet.spawn(puller.run) c = zerorpc.Pusher(context=pusher_ctx) c.connect(endpoint) - trigger.clear() + # trigger.reset() c.echo('hello') trigger.wait() puller.stop() - puller_task.join() + try: + puller_task.wait() + except greenlet.GreenletExit: + pass assert pusher_tracer._log == [ ('new', pusher_tracer.trace_id), @@ -362,29 +383,32 @@ def test_task_context_pubsub(): publisher_tracer = Tracer('[publisher]') publisher_ctx.register_middleware(publisher_tracer) - trigger = gevent.event.Event() + trigger = eventlet.event.Event() class Subscriber(object): def echo(self, msg): - trigger.set() + trigger.send() subscriber = zerorpc.Subscriber(Subscriber(), context=subscriber_ctx) subscriber.bind(endpoint) - subscriber_task = gevent.spawn(subscriber.run) + subscriber_task = eventlet.spawn(subscriber.run) c = zerorpc.Publisher(context=publisher_ctx) c.connect(endpoint) - trigger.clear() + # trigger.reset() # We need this retry logic to wait that the subscriber.run coroutine starts # reading (the published messages will go to /dev/null until then). - while not trigger.is_set(): + while not trigger.ready(): c.echo('pub...') if trigger.wait(TIME_FACTOR * 1): break subscriber.stop() - subscriber_task.join() + try: + subscriber_task.wait() + except greenlet.GreenletExit: + pass print(publisher_tracer._log) assert ('new', publisher_tracer.trace_id) in publisher_tracer._log @@ -429,7 +453,7 @@ def test_server_inspect_exception_middleware(): module = Srv() server = zerorpc.Server(module, context=ctx) server.bind(endpoint) - gevent.spawn(server.run) + eventlet.spawn(server.run) client = zerorpc.Client() client.connect(endpoint) @@ -447,7 +471,7 @@ def test_server_inspect_exception_middleware(): def test_server_inspect_exception_middleware_puller(): endpoint = random_ipc_endpoint() - barrier = gevent.event.Event() + barrier = eventlet.event.Event() middleware = InspectExceptionMiddleware(barrier) ctx = zerorpc.Context() ctx.register_middleware(middleware) @@ -455,12 +479,12 @@ def test_server_inspect_exception_middleware_puller(): module = Srv() server = zerorpc.Puller(module, context=ctx) server.bind(endpoint) - gevent.spawn(server.run) + eventlet.spawn(server.run) client = zerorpc.Pusher() client.connect(endpoint) - barrier.clear() + # barrier.reset() client.echo('This is a test which should call the InspectExceptionMiddleware') barrier.wait(timeout=TIME_FACTOR * 2) @@ -479,7 +503,7 @@ def test_server_inspect_exception_middleware_stream(): module = Srv() server = zerorpc.Server(module, context=ctx) server.bind(endpoint) - gevent.spawn(server.run) + eventlet.spawn(server.run) client = zerorpc.Client() client.connect(endpoint) diff --git a/tests/test_middleware_before_after_exec.py b/tests/test_middleware_before_after_exec.py index 5dafeb0..32bfc4c 100644 --- a/tests/test_middleware_before_after_exec.py +++ b/tests/test_middleware_before_after_exec.py @@ -25,7 +25,9 @@ from __future__ import absolute_import from builtins import range -import gevent +import eventlet +import greenlet + import zerorpc from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR @@ -39,7 +41,7 @@ class EchoModule(object): def echo(self, msg): self.last_msg = 'echo: ' + msg if self._trigger: - self._trigger.set() + self._trigger.send() return self.last_msg @zerorpc.stream @@ -63,7 +65,7 @@ def test_hook_server_before_exec(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client() test_client.connect(endpoint) @@ -78,17 +80,20 @@ def test_hook_server_before_exec(): assert test_middleware.called == True test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_server_before_exec_puller(): zero_ctx = zerorpc.Context() - trigger = gevent.event.Event() + trigger = eventlet.event.Event() endpoint = random_ipc_endpoint() echo_module = EchoModule(trigger) test_server = zerorpc.Puller(echo_module, context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Pusher() test_client.connect(endpoint) @@ -96,7 +101,7 @@ def test_hook_server_before_exec_puller(): test_client.echo("test") trigger.wait(timeout=TIME_FACTOR * 2) assert echo_module.last_msg == "echo: test" - trigger.clear() + # trigger.reset() # Test with a middleware test_middleware = ServerBeforeExecMiddleware() @@ -108,7 +113,10 @@ def test_hook_server_before_exec_puller(): assert test_middleware.called == True test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_server_before_exec_stream(): zero_ctx = zerorpc.Context() @@ -116,7 +124,7 @@ def test_hook_server_before_exec_stream(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client() test_client.connect(endpoint) @@ -135,7 +143,10 @@ def test_hook_server_before_exec_stream(): assert echo == "echo: test" test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass class ServerAfterExecMiddleware(object): @@ -153,7 +164,7 @@ def test_hook_server_after_exec(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client() test_client.connect(endpoint) @@ -170,17 +181,20 @@ def test_hook_server_after_exec(): assert test_middleware.reply_event_name == 'OK' test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_server_after_exec_puller(): zero_ctx = zerorpc.Context() - trigger = gevent.event.Event() + trigger = eventlet.event.Event() endpoint = random_ipc_endpoint() echo_module = EchoModule(trigger) test_server = zerorpc.Puller(echo_module, context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Pusher() test_client.connect(endpoint) @@ -188,7 +202,7 @@ def test_hook_server_after_exec_puller(): test_client.echo("test") trigger.wait(timeout=TIME_FACTOR * 2) assert echo_module.last_msg == "echo: test" - trigger.clear() + # trigger.reset() # Test with a middleware test_middleware = ServerAfterExecMiddleware() @@ -202,7 +216,10 @@ def test_hook_server_after_exec_puller(): assert test_middleware.reply_event_name is None test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_server_after_exec_stream(): zero_ctx = zerorpc.Context() @@ -210,7 +227,7 @@ def test_hook_server_after_exec_stream(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client() test_client.connect(endpoint) @@ -232,7 +249,10 @@ def test_hook_server_after_exec_stream(): assert test_middleware.reply_event_name == 'STREAM_DONE' test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass class BrokenEchoModule(object): @@ -246,7 +266,7 @@ class BrokenEchoModule(object): raise RuntimeError("BrokenEchoModule") finally: if self._trigger: - self._trigger.set() + self._trigger.send() @zerorpc.stream def echoes(self, msg): @@ -258,7 +278,7 @@ def test_hook_server_after_exec_on_error(): test_server = zerorpc.Server(BrokenEchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client() test_client.connect(endpoint) @@ -272,17 +292,20 @@ def test_hook_server_after_exec_on_error(): assert test_middleware.called == False test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_server_after_exec_on_error_puller(): zero_ctx = zerorpc.Context() - trigger = gevent.event.Event() + trigger = eventlet.event.Event() endpoint = random_ipc_endpoint() echo_module = BrokenEchoModule(trigger) test_server = zerorpc.Puller(echo_module, context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Pusher() test_client.connect(endpoint) @@ -298,7 +321,10 @@ def test_hook_server_after_exec_on_error_puller(): assert test_middleware.called == False test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_server_after_exec_on_error_stream(): zero_ctx = zerorpc.Context() @@ -306,7 +332,7 @@ def test_hook_server_after_exec_on_error_stream(): test_server = zerorpc.Server(BrokenEchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client() test_client.connect(endpoint) @@ -320,4 +346,7 @@ def test_hook_server_after_exec_on_error_stream(): assert test_middleware.called == False test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass diff --git a/tests/test_middleware_client.py b/tests/test_middleware_client.py index 943985e..64f7b5a 100644 --- a/tests/test_middleware_client.py +++ b/tests/test_middleware_client.py @@ -25,7 +25,9 @@ from __future__ import absolute_import from builtins import range -import gevent +import eventlet +import greenlet + import zerorpc from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR @@ -62,7 +64,7 @@ class EchoModule(object): def timeout(self, msg): self.last_msg = "timeout: " + msg - gevent.sleep(TIME_FACTOR * 2) + eventlet.sleep(TIME_FACTOR * 2) def test_hook_client_before_request(): @@ -78,7 +80,7 @@ def test_hook_client_before_request(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client(context=zero_ctx) test_client.connect(endpoint) @@ -93,7 +95,10 @@ def test_hook_client_before_request(): assert test_middleware.method == 'echo' test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass class ClientAfterRequestMiddleware(object): def __init__(self): @@ -111,7 +116,7 @@ def test_hook_client_after_request(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client(context=zero_ctx) test_client.connect(endpoint) @@ -126,7 +131,10 @@ def test_hook_client_after_request(): assert test_middleware.retcode == 'OK' test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_client_after_request_stream(): zero_ctx = zerorpc.Context() @@ -134,7 +142,7 @@ def test_hook_client_after_request_stream(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client(context=zero_ctx) test_client.connect(endpoint) @@ -156,7 +164,10 @@ def test_hook_client_after_request_stream(): assert test_middleware.retcode == 'STREAM_DONE' test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_client_after_request_timeout(): @@ -176,7 +187,7 @@ def test_hook_client_after_request_timeout(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client(timeout=TIME_FACTOR * 1, context=zero_ctx) test_client.connect(endpoint) @@ -189,7 +200,10 @@ def test_hook_client_after_request_timeout(): assert "timeout" in ex.args[0] test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass class ClientAfterFailedRequestMiddleware(object): def __init__(self): @@ -212,7 +226,7 @@ def test_hook_client_after_request_remote_error(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client(timeout=TIME_FACTOR * 1, context=zero_ctx) test_client.connect(endpoint) @@ -224,7 +238,10 @@ def test_hook_client_after_request_remote_error(): assert test_middleware.called == True test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_client_after_request_remote_error_stream(): @@ -235,7 +252,7 @@ def test_hook_client_after_request_remote_error_stream(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client(timeout=TIME_FACTOR * 1, context=zero_ctx) test_client.connect(endpoint) @@ -247,7 +264,10 @@ def test_hook_client_after_request_remote_error_stream(): assert test_middleware.called == True test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_client_handle_remote_error_inspect(): @@ -264,7 +284,7 @@ def test_hook_client_handle_remote_error_inspect(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client(context=zero_ctx) test_client.connect(endpoint) @@ -277,7 +297,10 @@ def test_hook_client_handle_remote_error_inspect(): assert ex.name == "RuntimeError" test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass # This is a seriously broken idea, but possible nonetheless class ClientEvalRemoteErrorMiddleware(object): @@ -298,7 +321,7 @@ def test_hook_client_handle_remote_error_eval(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client(context=zero_ctx) test_client.connect(endpoint) @@ -311,7 +334,10 @@ def test_hook_client_handle_remote_error_eval(): assert "BrokenEchoModule" in ex.args[0] test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_client_handle_remote_error_eval_stream(): test_middleware = ClientEvalRemoteErrorMiddleware() @@ -321,7 +347,7 @@ def test_hook_client_handle_remote_error_eval_stream(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client(context=zero_ctx) test_client.connect(endpoint) @@ -334,7 +360,10 @@ def test_hook_client_handle_remote_error_eval_stream(): assert "BrokenEchoModule" in ex.args[0] test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass def test_hook_client_after_request_custom_error(): @@ -360,7 +389,7 @@ def test_hook_client_after_request_custom_error(): test_server = zerorpc.Server(EchoModule(), context=zero_ctx) test_server.bind(endpoint) - test_server_task = gevent.spawn(test_server.run) + test_server_task = eventlet.spawn(test_server.run) test_client = zerorpc.Client(context=zero_ctx) test_client.connect(endpoint) @@ -373,4 +402,7 @@ def test_hook_client_after_request_custom_error(): assert "BrokenEchoModule" in ex.args[0] test_server.stop() - test_server_task.join() + try: + test_server_task.wait() + except greenlet.GreenletExit: + pass diff --git a/tests/test_pubpush.py b/tests/test_pubpush.py index a99f9b4..512a1a0 100644 --- a/tests/test_pubpush.py +++ b/tests/test_pubpush.py @@ -27,8 +27,7 @@ from __future__ import print_function from __future__ import absolute_import from builtins import range -import gevent -import gevent.event +import eventlet import zerorpc from .testutils import teardown, random_ipc_endpoint @@ -39,19 +38,19 @@ def test_pushpull_inheritance(): pusher = zerorpc.Pusher() pusher.bind(endpoint) - trigger = gevent.event.Event() + trigger = eventlet.event.Event() class Puller(zerorpc.Puller): def lolita(self, a, b): print('lolita', a, b) assert a + b == 3 - trigger.set() + trigger.send() puller = Puller() puller.connect(endpoint) - gevent.spawn(puller.run) + eventlet.spawn(puller.run) - trigger.clear() + # trigger.reset() pusher.lolita(1, 2) trigger.wait() print('done') @@ -62,19 +61,19 @@ def test_pubsub_inheritance(): publisher = zerorpc.Publisher() publisher.bind(endpoint) - trigger = gevent.event.Event() + trigger = eventlet.event.Event() class Subscriber(zerorpc.Subscriber): def lolita(self, a, b): print('lolita', a, b) assert a + b == 3 - trigger.set() + trigger.send() subscriber = Subscriber() subscriber.connect(endpoint) - gevent.spawn(subscriber.run) + eventlet.spawn(subscriber.run) - trigger.clear() + # trigger.reset() # We need this retry logic to wait that the subscriber.run coroutine starts # reading (the published messages will go to /dev/null until then). for attempt in range(0, 10): @@ -87,13 +86,13 @@ def test_pubsub_inheritance(): def test_pushpull_composite(): endpoint = random_ipc_endpoint() - trigger = gevent.event.Event() + trigger = eventlet.event.Event() class Puller(object): def lolita(self, a, b): print('lolita', a, b) assert a + b == 3 - trigger.set() + trigger.send() pusher = zerorpc.Pusher() pusher.bind(endpoint) @@ -101,9 +100,9 @@ def test_pushpull_composite(): service = Puller() puller = zerorpc.Puller(service) puller.connect(endpoint) - gevent.spawn(puller.run) + eventlet.spawn(puller.run) - trigger.clear() + # trigger.reset() pusher.lolita(1, 2) trigger.wait() print('done') @@ -111,13 +110,13 @@ def test_pushpull_composite(): def test_pubsub_composite(): endpoint = random_ipc_endpoint() - trigger = gevent.event.Event() + trigger = eventlet.event.Event() class Subscriber(object): def lolita(self, a, b): print('lolita', a, b) assert a + b == 3 - trigger.set() + trigger.send() publisher = zerorpc.Publisher() publisher.bind(endpoint) @@ -125,9 +124,9 @@ def test_pubsub_composite(): service = Subscriber() subscriber = zerorpc.Subscriber(service) subscriber.connect(endpoint) - gevent.spawn(subscriber.run) + eventlet.spawn(subscriber.run) - trigger.clear() + # trigger.reset() # We need this retry logic to wait that the subscriber.run coroutine starts # reading (the published messages will go to /dev/null until then). for attempt in range(0, 10): diff --git a/tests/test_reqstream.py b/tests/test_reqstream.py index 71e1511..2cb9266 100644 --- a/tests/test_reqstream.py +++ b/tests/test_reqstream.py @@ -27,7 +27,7 @@ from __future__ import print_function from __future__ import absolute_import from builtins import range -import gevent +import eventlet import zerorpc from .testutils import teardown, random_ipc_endpoint, TIME_FACTOR @@ -55,7 +55,7 @@ def test_rcp_streaming(): srv = MySrv(heartbeat=TIME_FACTOR * 4) srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client = zerorpc.Client(heartbeat=TIME_FACTOR * 4) client.connect(endpoint) @@ -67,7 +67,7 @@ def test_rcp_streaming(): assert isinstance(r, Iterator) l = [] print('wait 4s for fun') - gevent.sleep(TIME_FACTOR * 4) + eventlet.sleep(TIME_FACTOR * 4) for x in r: l.append(x) assert l == list(range(10)) diff --git a/tests/test_server.py b/tests/test_server.py index 86997a9..a58f5eb 100644 --- a/tests/test_server.py +++ b/tests/test_server.py @@ -28,7 +28,7 @@ from __future__ import absolute_import from builtins import range import pytest -import gevent +import eventlet import sys from zerorpc import zmq @@ -49,7 +49,7 @@ def test_server_manual(): srv = MySrv() srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client_events = zerorpc.Events(zmq.DEALER) client_events.connect(endpoint) @@ -82,7 +82,7 @@ def test_client_server(): srv = MySrv() srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client = zerorpc.Client() client.connect(endpoint) @@ -103,12 +103,12 @@ def test_client_server_client_timeout(): return 42 def add(self, a, b): - gevent.sleep(TIME_FACTOR * 10) + eventlet.sleep(TIME_FACTOR * 10) return a + b srv = MySrv() srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client = zerorpc.Client(timeout=TIME_FACTOR * 2) client.connect(endpoint) @@ -132,7 +132,7 @@ def test_client_server_exception(): srv = MySrv() srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client = zerorpc.Client(timeout=TIME_FACTOR * 2) client.connect(endpoint) @@ -159,7 +159,7 @@ def test_client_server_detailed_exception(): srv = MySrv() srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client = zerorpc.Client(timeout=TIME_FACTOR * 2) client.connect(endpoint) @@ -192,7 +192,7 @@ def test_exception_compat_v1(): srv = MySrv() srv.bind(endpoint) - gevent.spawn(srv.run) + eventlet.spawn(srv.run) client_events = zerorpc.Events(zmq.DEALER) client_events.connect(endpoint) @@ -215,7 +215,7 @@ def test_exception_compat_v1(): assert event.name == 'ERR' (msg,) = event.args print('msg only', msg) - assert msg == "NameError('donotexist',)" + assert msg == "NameError('donotexist')" client_events.close() srv.close() diff --git a/tests/test_zmq.py b/tests/test_zmq.py index 1e7b4dd..18ee39f 100644 --- a/tests/test_zmq.py +++ b/tests/test_zmq.py @@ -25,7 +25,7 @@ from __future__ import print_function from __future__ import absolute_import -import gevent +import eventlet from zerorpc import zmq from .testutils import teardown, random_ipc_endpoint @@ -61,6 +61,6 @@ def test1(): s.close() c.term() - s = gevent.spawn(server) - c = gevent.spawn(client) - c.join() + s = eventlet.spawn(server) + c = eventlet.spawn(client) + c.wait() diff --git a/tests/zmqbug.py b/tests/zmqbug.py index 1d102a2..da83fd2 100644 --- a/tests/zmqbug.py +++ b/tests/zmqbug.py @@ -28,29 +28,147 @@ from __future__ import print_function import zmq +from zmq.constants import * -import gevent.event -import gevent.core +import eventlet +import eventlet.hubs +import greenlet +from collections import deque STOP_EVERYTHING = False +class LockReleaseError(Exception): + pass + +class _QueueLock(object): + """A Lock that can be acquired by at most one thread. Any other + thread calling acquire will be blocked in a queue. When release + is called, the threads are awoken in the order they blocked, + one at a time. This lock can be required recursively by the same + thread.""" + + def __init__(self): + self._waiters = deque() + self._count = 0 + self._holder = None + self._hub = eventlet.hubs.get_hub() + + def __nonzero__(self): + return bool(self._count) + + __bool__ = __nonzero__ + + def __enter__(self): + self.acquire() + + def __exit__(self, type, value, traceback): + self.release() + + def acquire(self): + current = greenlet.getcurrent() + if (self._waiters or self._count > 0) and self._holder is not current: + # block until lock is free + self._waiters.append(current) + self._hub.switch() + w = self._waiters.popleft() + + assert w is current, 'Waiting threads woken out of order' + assert self._count == 0, 'After waking a thread, the lock must be unacquired' + + self._holder = current + self._count += 1 + + def release(self): + if self._count <= 0: + raise LockReleaseError("Cannot release unacquired lock") + + self._count -= 1 + if self._count == 0: + self._holder = None + if self._waiters: + # wake next + self._hub.schedule_call_global(0, self._waiters[0].switch) + + +class _BlockedThread(object): + """Is either empty, or represents a single blocked thread that + blocked itself by calling the block() method. The thread can be + awoken by calling wake(). Wake() can be called multiple times and + all but the first call will have no effect.""" + + def __init__(self): + self._blocked_thread = None + self._wakeupper = None + self._hub = eventlet.hubs.get_hub() + + def __nonzero__(self): + return self._blocked_thread is not None + + __bool__ = __nonzero__ + + def block(self, deadline=None): + if self._blocked_thread is not None: + raise Exception("Cannot block more than one thread on one BlockedThread") + self._blocked_thread = greenlet.getcurrent() + + if deadline is not None: + self._hub.schedule_call_local(deadline - self._hub.clock(), self.wake) + + try: + self._hub.switch() + finally: + self._blocked_thread = None + # cleanup the wakeup task + if self._wakeupper is not None: + # Important to cancel the wakeup task so it doesn't + # spuriously wake this greenthread later on. + self._wakeupper.cancel() + self._wakeupper = None + + def wake(self): + """Schedules the blocked thread to be awoken and return + True. If wake has already been called or if there is no + blocked thread, then this call has no effect and returns + False.""" + if self._blocked_thread is not None and self._wakeupper is None: + self._wakeupper = self._hub.schedule_call_global(0, self._blocked_thread.switch) + return True + return False + class ZMQSocket(zmq.Socket): def __init__(self, context, socket_type): super(ZMQSocket, self).__init__(context, socket_type) on_state_changed_fd = self.getsockopt(zmq.FD) - self._readable = gevent.event.Event() - self._writable = gevent.event.Event() - try: - # gevent>=1.0 - self._state_event = gevent.hub.get_hub().loop.io( - on_state_changed_fd, gevent.core.READ) - self._state_event.start(self._on_state_changed) - except AttributeError: - # gevent<1.0 - self._state_event = gevent.core.read_event(on_state_changed_fd, - self._on_state_changed, persist=True) + self.__dict__['_eventlet_send_event'] = _BlockedThread() + self.__dict__['_eventlet_recv_event'] = _BlockedThread() + self.__dict__['_eventlet_send_lock'] = _QueueLock() + self.__dict__['_eventlet_recv_lock'] = _QueueLock() + + def event(fd): + # Some events arrived at the zmq socket. This may mean + # there's a message that can be read or there's space for + # a message to be written. + send_wake = self._eventlet_send_event.wake() + recv_wake = self._eventlet_recv_event.wake() + if not send_wake and not recv_wake: + # if no waiting send or recv thread was woken up, then + # force the zmq socket's events to be processed to + # avoid repeated wakeups + events = self.getsockopt(zmq.EVENTS) + if events & zmq.POLLOUT: + self._eventlet_send_event.wake() + if events & zmq.POLLIN: + self._eventlet_recv_event.wake() + + hub = eventlet.hubs.get_hub() + self.__dict__['_eventlet_listener'] = hub.add(hub.READ, + self.getsockopt(FD), + event, + lambda _: None, + lambda: None) + self.__dict__['_eventlet_clock'] = hub.clock def _on_state_changed(self, event=None, _evtype=None): if self.closed: @@ -64,47 +182,89 @@ class ZMQSocket(zmq.Socket): if events & zmq.POLLIN: self._readable.set() - def close(self): - if not self.closed and getattr(self, '_state_event', None): - try: - # gevent>=1.0 - self._state_event.stop() - except AttributeError: - # gevent<1.0 - self._state_event.cancel() - super(ZMQSocket, self).close() + def close(self, linger=None): + super(ZMQSocket, self).close(linger) + if self._eventlet_listener is not None: + eventlet.hubs.get_hub().remove(self._state_event) + self.__dict__['_eventlet_listener'] = None + self._eventlet_send_event.wake() + self._eventlet_recv_event.wake() def send(self, data, flags=0, copy=True, track=False): if flags & zmq.NOBLOCK: - return super(ZMQSocket, self).send(data, flags, copy, track) + result = super(ZMQSocket, self).send(data, flags, copy, track) + # Instead of calling both wake methods, could call + # self.getsockopt(EVENTS) which would trigger wakeups if + # needed. + self._eventlet_send_event.wake() + self._eventlet_recv_event.wake() + return result + + # TODO: pyzmq will copy the message buffer and create Message + # objects under some circumstances. We could do that work here + # once to avoid doing it every time the send is retried. flags |= zmq.NOBLOCK - while True: - try: - return super(ZMQSocket, self).send(data, flags, copy, track) - except zmq.ZMQError as e: - if e.errno != zmq.EAGAIN: - raise - self._writable.clear() - self._writable.wait() + with self._eventlet_send_lock: + while True: + try: + return super(ZMQSocket, self).send(data, flags, copy, track) + except zmq.ZMQError as e: + if e.errno == zmq.EAGAIN: + self._eventlet_send_event.block() + else: + raise + finally: + # The call to send processes 0mq events and may + # make the socket ready to recv. Wake the next + # receiver. (Could check EVENTS for POLLIN here) + self._eventlet_recv_event.wake() def recv(self, flags=0, copy=True, track=False): if flags & zmq.NOBLOCK: - return super(ZMQSocket, self).recv(flags, copy, track) + msg = super(ZMQSocket, self).recv(flags, copy, track) + # Instead of calling both wake methods, could call + # self.getsockopt(EVENTS) which would trigger wakeups if + # needed. + self._eventlet_send_event.wake() + self._eventlet_recv_event.wake() + return msg + + deadline = None + if hasattr(zmq, 'RCVTIMEO'): + sock_timeout = self.getsockopt(zmq.RCVTIMEO) + if sock_timeout == -1: + pass + elif sock_timeout > 0: + deadline = self._eventlet_clock() + sock_timeout / 1000.0 + else: + raise ValueError(sock_timeout) + flags |= zmq.NOBLOCK - while True: - try: - return super(ZMQSocket, self).recv(flags, copy, track) - except zmq.ZMQError as e: - if e.errno != zmq.EAGAIN: - raise - self._readable.clear() - while not self._readable.wait(timeout=10): - events = self.getsockopt(zmq.EVENTS) - if bool(events & zmq.POLLIN): - print("here we go, nobody told me about new messages!") - global STOP_EVERYTHING - STOP_EVERYTHING = True - raise gevent.GreenletExit() + with self._eventlet_recv_lock: + while True: + try: + return super(ZMQSocket, self).recv(flags, copy, track) + except zmq.ZMQError as e: + if e.errno == zmq.EAGAIN: + # zmq in its wisdom decided to reuse EAGAIN for timeouts + if deadline is not None and self._eventlet_clock() > deadline: + e.is_timeout = True + raise + + self._eventlet_recv_event.block(deadline=deadline) + else: + raise + finally: + # The call to recv processes 0mq events and may + # make the socket ready to send. Wake the next + # receiver. (Could check EVENTS for POLLOUT here) + while self._eventlet_send_event.wake(): + events = self.getsockopt(zmq.EVENTS) + if bool(events & zmq.POLLIN): + print("here we go, nobody told me about new messages!") + global STOP_EVERYTHING + STOP_EVERYTHING = True + raise greenlet.GreenletExit() zmq_context = zmq.Context() @@ -124,11 +284,11 @@ def server(): socket.send(msg) cnt.responded += 1 - gevent.spawn(responder) + eventlet.spawn(responder) while not STOP_EVERYTHING: print("cnt.responded=", cnt.responded) - gevent.sleep(0.5) + eventlet.sleep(0.5) def client(): @@ -149,17 +309,17 @@ def client(): def sendmsg(): while not STOP_EVERYTHING: - socket.send('', flags=zmq.SNDMORE) - socket.send('hello') + socket.send(b'', flags=zmq.SNDMORE) + socket.send(b'hello') cnt.send += 1 - gevent.sleep(0) + eventlet.sleep(0) - gevent.spawn(recvmsg) - gevent.spawn(sendmsg) + eventlet.spawn(recvmsg) + eventlet.spawn(sendmsg) while not STOP_EVERYTHING: print("cnt.recv=", cnt.recv, "cnt.send=", cnt.send) - gevent.sleep(0.5) + eventlet.sleep(0.5) -gevent.spawn(server) +eventlet.spawn(server) client() diff --git a/tox.ini b/tox.ini index 96bace8..a12cbc6 100644 --- a/tox.ini +++ b/tox.ini @@ -11,6 +11,6 @@ commands = passenv = ZPC_TEST_TIME_FACTOR [flake8] -ignore = E501,E128 +ignore = E501,E128,E129,F841,W504 filename = *.py,zerorpc exclude = tests,.git,dist,doc,*.egg-info,__pycache__,setup.py diff --git a/zerorpc/channel.py b/zerorpc/channel.py index ad21c27..bd376ec 100644 --- a/zerorpc/channel.py +++ b/zerorpc/channel.py @@ -22,11 +22,7 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. -import gevent.pool -import gevent.queue -import gevent.event -import gevent.local -import gevent.lock +import eventlet import logging from .exceptions import TimeoutExpired @@ -43,8 +39,8 @@ class ChannelMultiplexer(ChannelBase): self._channel_dispatcher_task = None self._broadcast_queue = None if events.recv_is_supported and not ignore_broadcast: - self._broadcast_queue = gevent.queue.Queue(maxsize=1) - self._channel_dispatcher_task = gevent.spawn( + self._broadcast_queue = eventlet.queue.Queue(maxsize=1) + self._channel_dispatcher_task = eventlet.spawn( self._channel_dispatcher) @property @@ -98,7 +94,7 @@ class ChannelMultiplexer(ChannelBase): def channel(self, from_event=None): if self._channel_dispatcher_task is None: - self._channel_dispatcher_task = gevent.spawn( + self._channel_dispatcher_task = eventlet.spawn( self._channel_dispatcher) return Channel(self, from_event) @@ -117,7 +113,7 @@ class Channel(ChannelBase): self._multiplexer = multiplexer self._channel_id = None self._zmqid = None - self._queue = gevent.queue.Queue(maxsize=1) + self._queue = eventlet.queue.Queue(maxsize=1) if from_event is not None: self._channel_id = from_event.header[u'message_id'] self._zmqid = from_event.identity @@ -156,7 +152,7 @@ class Channel(ChannelBase): def recv(self, timeout=None): try: event = self._queue.get(timeout=timeout) - except gevent.queue.Empty: + except eventlet.queue.Empty: raise TimeoutExpired(timeout) return event @@ -172,11 +168,11 @@ class BufferedChannel(ChannelBase): self._input_queue_size = inqueue_size self._remote_queue_open_slots = 1 self._input_queue_reserved = 1 - self._remote_can_recv = gevent.event.Event() - self._input_queue = gevent.queue.Queue() + self._remote_can_recv = eventlet.event.Event() + self._input_queue = eventlet.queue.Queue() self._verbose = False self._on_close_if = None - self._recv_task = gevent.spawn(self._recver) + self._recv_task = eventlet.spawn(self._recver) @property def recv_is_supported(self): @@ -211,7 +207,7 @@ class BufferedChannel(ChannelBase): except Exception: logger.exception('gevent_zerorpc.BufferedChannel._recver') if self._remote_queue_open_slots > 0: - self._remote_can_recv.set() + self._remote_can_recv.send() elif self._input_queue.qsize() == self._input_queue_size: raise RuntimeError( 'BufferedChannel, queue overflow on event:', event) @@ -227,12 +223,12 @@ class BufferedChannel(ChannelBase): def emit_event(self, event, timeout=None): if self._remote_queue_open_slots == 0: - self._remote_can_recv.clear() + # self._remote_can_recv.reset() # TODO Check if the result is equivalent to gevent.clear() self._remote_can_recv.wait(timeout=timeout) self._remote_queue_open_slots -= 1 try: self._channel.emit_event(event) - except: + except Exception: self._remote_queue_open_slots += 1 raise @@ -253,7 +249,7 @@ class BufferedChannel(ChannelBase): try: event = self._input_queue.get(timeout=timeout) - except gevent.queue.Empty: + except eventlet.queue.Empty: raise TimeoutExpired(timeout) self._input_queue_reserved -= 1 diff --git a/zerorpc/context.py b/zerorpc/context.py index debce26..6e20720 100644 --- a/zerorpc/context.py +++ b/zerorpc/context.py @@ -29,7 +29,7 @@ from future.utils import tobytes import uuid import random -from . import gevent_zmq as zmq +from eventlet.green import zmq class Context(zmq.Context): diff --git a/zerorpc/core.py b/zerorpc/core.py index 9dbf5cc..ea89f36 100644 --- a/zerorpc/core.py +++ b/zerorpc/core.py @@ -30,13 +30,9 @@ from future.utils import iteritems import sys import traceback -import gevent.pool -import gevent.queue -import gevent.event -import gevent.local -import gevent.lock +import eventlet -from . import gevent_zmq as zmq +from eventlet.green import zmq from .exceptions import TimeoutExpired, RemoteError, LostRemote from .channel import ChannelMultiplexer, BufferedChannel from .socket import SocketBase @@ -52,7 +48,7 @@ logger = getLogger(__name__) class ServerBase(object): def __init__(self, channel, methods=None, name=None, context=None, - pool_size=None, heartbeat=5): + pool_size=1000, heartbeat=5): self._multiplexer = ChannelMultiplexer(channel) if methods is None: @@ -60,7 +56,7 @@ class ServerBase(object): self._context = context or Context.get_instance() self._name = name or self._extract_name() - self._task_pool = gevent.pool.Pool(size=pool_size) + self._task_pool = eventlet.greenpool.GreenPool(size=pool_size) self._acceptor_task = None self._methods = self._filter_methods(ServerBase, self, methods) @@ -171,12 +167,12 @@ class ServerBase(object): self._task_pool.spawn(self._async_task, initial_event) def run(self): - self._acceptor_task = gevent.spawn(self._acceptor) + self._acceptor_task = eventlet.spawn(self._acceptor) try: - self._acceptor_task.get() + self._acceptor_task.wait() finally: self.stop() - self._task_pool.join(raise_error=True) + self._task_pool.waitall() def stop(self): if self._acceptor_task is not None: @@ -272,10 +268,8 @@ class ClientBase(object): kargs.get('async_', False) is False): return self._process_response(request_event, bufchan, timeout) - async_result = gevent.event.AsyncResult() - gevent.spawn(self._process_response, request_event, bufchan, - timeout).link(async_result) - return async_result + return eventlet.spawn(self._process_response, request_event, bufchan, + timeout) def __getattr__(self, method): return lambda *args, **kargs: self(method, *args, **kargs) @@ -283,7 +277,7 @@ class ClientBase(object): class Server(SocketBase, ServerBase): - def __init__(self, methods=None, name=None, context=None, pool_size=None, + def __init__(self, methods=None, name=None, context=None, pool_size=1000, heartbeat=5): SocketBase.__init__(self, zmq.ROUTER, context) if methods is None: @@ -368,15 +362,15 @@ class Puller(SocketBase): del exc_infos def run(self): - self._receiver_task = gevent.spawn(self._receiver) + self._receiver_task = eventlet.spawn(self._receiver) try: - self._receiver_task.get() + self._receiver_task.wait() finally: self._receiver_task = None def stop(self): if self._receiver_task is not None: - self._receiver_task.kill(block=False) + self._receiver_task.kill() class Publisher(Pusher): diff --git a/zerorpc/events.py b/zerorpc/events.py index f87d0b5..ce97ad6 100644 --- a/zerorpc/events.py +++ b/zerorpc/events.py @@ -28,15 +28,12 @@ from builtins import str from builtins import range import msgpack -import gevent.pool -import gevent.queue -import gevent.event -import gevent.local -import gevent.lock +import eventlet +import greenlet import logging import sys -from . import gevent_zmq as zmq +from eventlet.green import zmq from .exceptions import TimeoutExpired from .context import Context from .channel_base import ChannelBase @@ -50,8 +47,8 @@ else: return frame.buffer # gevent <= 1.1.0.rc5 is missing the Python3 __next__ method. -if sys.version_info >= (3, 0) and gevent.version_info <= (1, 1, 0, 'rc', '5'): - setattr(gevent.queue.Channel, '__next__', gevent.queue.Channel.next) +# if sys.version_info >= (3, 0) and gevent.version_info <= (1, 1, 0, 'rc', '5'): +# setattr(gevent.queue.Channel, '__next__', gevent.queue.Channel.next) logger = logging.getLogger(__name__) @@ -67,20 +64,20 @@ class SequentialSender(object): for i in range(len(parts) - 1): try: self._socket.send(parts[i], copy=False, flags=zmq.SNDMORE) - except (gevent.GreenletExit, gevent.Timeout) as e: + except (greenlet.GreenletExit, eventlet.Timeout) as e: if i == 0: raise self._socket.send(parts[i], copy=False, flags=zmq.SNDMORE) try: self._socket.send(parts[-1], copy=False) - except (gevent.GreenletExit, gevent.Timeout) as e: + except (greenlet.GreenletExit, eventlet.Timeout) as e: self._socket.send(parts[-1], copy=False) if e: raise e def __call__(self, parts, timeout=None): if timeout: - with gevent.Timeout(timeout): + with eventlet.Timeout(timeout): self._send(parts) else: self._send(parts) @@ -97,7 +94,7 @@ class SequentialReceiver(object): while True: try: part = self._socket.recv(copy=False) - except (gevent.GreenletExit, gevent.Timeout) as e: + except (greenlet.GreenletExit, eventlet.Timeout) as e: if len(parts) == 0: raise part = self._socket.recv(copy=False) @@ -110,7 +107,7 @@ class SequentialReceiver(object): def __call__(self, timeout=None): if timeout: - with gevent.Timeout(timeout): + with eventlet.Timeout(timeout): return self._recv() else: return self._recv() @@ -120,21 +117,22 @@ class Sender(SequentialSender): def __init__(self, socket): self._socket = socket - self._send_queue = gevent.queue.Channel() - self._send_task = gevent.spawn(self._sender) + self._send_queue = eventlet.queue.Queue(maxsize=0) # Channel + self._send_task = eventlet.spawn(self._sender) def close(self): if self._send_task: self._send_task.kill() def _sender(self): - for parts in self._send_queue: + while True: + parts = self._send_queue.get() super(Sender, self)._send(parts) def __call__(self, parts, timeout=None): try: self._send_queue.put(parts, timeout=timeout) - except gevent.queue.Full: + except eventlet.queue.Full: raise TimeoutExpired(timeout) @@ -142,8 +140,8 @@ class Receiver(SequentialReceiver): def __init__(self, socket): self._socket = socket - self._recv_queue = gevent.queue.Channel() - self._recv_task = gevent.spawn(self._recver) + self._recv_queue = eventlet.queue.Queue(maxsize=0) # Channel + self._recv_task = eventlet.spawn(self._recver) def close(self): if self._recv_task: @@ -158,7 +156,7 @@ class Receiver(SequentialReceiver): def __call__(self, timeout=None): try: return self._recv_queue.get(timeout=timeout) - except gevent.queue.Empty: + except eventlet.queue.Empty: raise TimeoutExpired(timeout) @@ -281,11 +279,11 @@ class Events(ChannelBase): def close(self): try: self._send.close() - except (AttributeError, TypeError, gevent.GreenletExit): + except (AttributeError, TypeError, greenlet.GreenletExit): pass try: self._recv.close() - except (AttributeError, TypeError, gevent.GreenletExit): + except (AttributeError, TypeError, greenlet.GreenletExit): pass self._socket.close() diff --git a/zerorpc/gevent_zmq.py b/zerorpc/gevent_zmq.py index 9430695..bac9a48 100644 --- a/zerorpc/gevent_zmq.py +++ b/zerorpc/gevent_zmq.py @@ -22,10 +22,11 @@ # OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE # SOFTWARE. # -# Based on https://github.com/traviscline/gevent-zeromq/ +# Based on https://github.com/eventlet/eventlet/blob/37fca06d7466a698cf53a1ae6e4b3d840a3ced7a/eventlet/green/zmq.py # We want to act like zmq from zmq import * # noqa +from zmq.constants import * # noqa # Explicit import to please flake8 from zmq import ZMQError @@ -33,13 +34,112 @@ from zmq import ZMQError # A way to access original zmq import zmq as _zmq -import gevent.event -import gevent.core +import eventlet +import eventlet.hubs +from eventlet.support import greenlets as greenlet import errno from logging import getLogger +from collections import deque logger = getLogger(__name__) +class LockReleaseError(Exception): + pass + +class _QueueLock(object): + """A Lock that can be acquired by at most one thread. Any other + thread calling acquire will be blocked in a queue. When release + is called, the threads are awoken in the order they blocked, + one at a time. This lock can be required recursively by the same + thread.""" + + def __init__(self): + self._waiters = deque() + self._count = 0 + self._holder = None + self._hub = eventlet.hubs.get_hub() + + def __nonzero__(self): + return bool(self._count) + + __bool__ = __nonzero__ + + def __enter__(self): + self.acquire() + + def __exit__(self, type, value, traceback): + self.release() + + def acquire(self): + current = greenlet.getcurrent() + if (self._waiters or self._count > 0) and self._holder is not current: + # block until lock is free + self._waiters.append(current) + self._hub.switch() + w = self._waiters.popleft() + + assert w is current, 'Waiting threads woken out of order' + assert self._count == 0, 'After waking a thread, the lock must be unacquired' + + self._holder = current + self._count += 1 + + def release(self): + if self._count <= 0: + raise LockReleaseError("Cannot release unacquired lock") + + self._count -= 1 + if self._count == 0: + self._holder = None + if self._waiters: + # wake next + self._hub.schedule_call_global(0, self._waiters[0].switch) + +class _BlockedThread(object): + """Is either empty, or represents a single blocked thread that + blocked itself by calling the block() method. The thread can be + awoken by calling wake(). Wake() can be called multiple times and + all but the first call will have no effect.""" + + def __init__(self): + self._blocked_thread = None + self._wakeupper = None + self._hub = eventlet.hubs.get_hub() + + def __nonzero__(self): + return self._blocked_thread is not None + + __bool__ = __nonzero__ + + def block(self, deadline=None): + if self._blocked_thread is not None: + raise Exception("Cannot block more than one thread on one BlockedThread") + self._blocked_thread = greenlet.getcurrent() + + if deadline is not None: + self._hub.schedule_call_local(deadline - self._hub.clock(), self.wake) + + try: + self._hub.switch() + finally: + self._blocked_thread = None + # cleanup the wakeup task + if self._wakeupper is not None: + # Important to cancel the wakeup task so it doesn't + # spuriously wake this greenthread later on. + self._wakeupper.cancel() + self._wakeupper = None + + def wake(self): + """Schedules the blocked thread to be awoken and return + True. If wake has already been called or if there is no + blocked thread, then this call has no effect and returns + False.""" + if self._blocked_thread is not None and self._wakeupper is None: + self._wakeupper = self._hub.schedule_call_global(0, self._blocked_thread.switch) + return True + return False + class Context(_zmq.Context): @@ -57,47 +157,43 @@ class Socket(_zmq.Socket): # NOTE: pyzmq 13.0.0 messed up with setattr (they turned it into a # non-op) and you can't assign attributes normally anymore, hence the # tricks with self.__dict__ here - self.__dict__["_readable"] = gevent.event.Event() - self.__dict__["_writable"] = gevent.event.Event() - try: - # gevent>=1.0 - self.__dict__["_state_event"] = gevent.hub.get_hub().loop.io( - on_state_changed_fd, gevent.core.READ) - self._state_event.start(self._on_state_changed) - except AttributeError: - # gevent<1.0 - self.__dict__["_state_event"] = \ - gevent.core.read_event(on_state_changed_fd, - self._on_state_changed, persist=True) - - def _on_state_changed(self, event=None, _evtype=None): - if self.closed: - self._writable.set() - self._readable.set() - return - while True: - try: + self.__dict__['_eventlet_send_event'] = _BlockedThread() + self.__dict__['_eventlet_recv_event'] = _BlockedThread() + self.__dict__['_eventlet_send_lock'] = _QueueLock() + self.__dict__['_eventlet_recv_lock'] = _QueueLock() + + def event(fd): + # Some events arrived at the zmq socket. This may mean + # there's a message that can be read or there's space for + # a message to be written. + send_wake = self._eventlet_send_event.wake() + recv_wake = self._eventlet_recv_event.wake() + if not send_wake and not recv_wake: + # if no waiting send or recv thread was woken up, then + # force the zmq socket's events to be processed to + # avoid repeated wakeups events = self.getsockopt(_zmq.EVENTS) - break - except ZMQError as e: - if e.errno not in (_zmq.EAGAIN, errno.EINTR): - raise + if events & _zmq.POLLOUT: + self._eventlet_send_event.wake() + if events & _zmq.POLLIN: + self._eventlet_recv_event.wake() - if events & _zmq.POLLOUT: - self._writable.set() - if events & _zmq.POLLIN: - self._readable.set() + hub = eventlet.hubs.get_hub() + self.__dict__['_eventlet_listener'] = hub.add(hub.READ, + self.getsockopt(_zmq.FD), + event, + lambda _: None, + lambda: None) + self.__dict__['_eventlet_clock'] = hub.clock - def close(self): - if not self.closed and getattr(self, '_state_event', None): - try: - # gevent>=1.0 - self._state_event.stop() - except AttributeError: - # gevent<1.0 - self._state_event.cancel() - super(Socket, self).close() + def close(self, linger=None): + super(Socket, self).close(linger) + if self._eventlet_listener is not None: + eventlet.hubs.get_hub().remove(self._state_event) + self.__dict__['_eventlet_listener'] = None + self._eventlet_send_event.wake() + self._eventlet_recv_event.wake() def connect(self, *args, **kwargs): while True: @@ -109,80 +205,71 @@ class Socket(_zmq.Socket): def send(self, data, flags=0, copy=True, track=False): if flags & _zmq.NOBLOCK: - return super(Socket, self).send(data, flags, copy, track) + result = super(Socket, self).send(data, flags, copy, track) + # Instead of calling both wake methods, could call + # self.getsockopt(EVENTS) which would trigger wakeups if + # needed. + self._eventlet_send_event.wake() + self._eventlet_recv_event.wake() + return result + + # TODO: pyzmq will copy the message buffer and create Message + # objects under some circumstances. We could do that work here + # once to avoid doing it every time the send is retried. flags |= _zmq.NOBLOCK - while True: - try: - msg = super(Socket, self).send(data, flags, copy, track) - # The following call, force polling the state of the zmq socket - # (POLLIN and/or POLLOUT). It seems that a POLLIN event is often - # missed when the socket is used to send at the same time, - # forcing to poll at this exact moment seems to reduce the - # latencies when a POLLIN event is missed. The drawback is a - # reduced throughput (roughly 8.3%) in exchange of a normal - # concurrency. In other hand, without the following line, you - # loose 90% of the performances as soon as there is simultaneous - # send and recv on the socket. - self._on_state_changed() - return msg - except _zmq.ZMQError as e: - if e.errno not in (_zmq.EAGAIN, errno.EINTR): - raise - self._writable.clear() - # The following sleep(0) force gevent to switch out to another - # coroutine and seems to refresh the notion of time that gevent may - # have. This definitively eliminate the gevent bug that can trigger - # a timeout too soon under heavy load. In theory it will incur more - # CPU usage, but in practice it balance even with the extra CPU used - # when the timeout triggers too soon in the following loop. So for - # the same CPU load, you get a better throughput (roughly 18.75%). - gevent.sleep(0) - while not self._writable.wait(timeout=1): + with self._eventlet_send_lock: + while True: try: - if self.getsockopt(_zmq.EVENTS) & _zmq.POLLOUT: - logger.error("/!\\ gevent_zeromq BUG /!\\ " - "catching up after missing event (SEND) /!\\") - break + return super(Socket, self).send(data, flags, copy, track) except ZMQError as e: - if e.errno not in (_zmq.EAGAIN, errno.EINTR): + if e.errno == _zmq.EAGAIN: + self._eventlet_send_event.block() + else: raise + finally: + # The call to send processes 0mq events and may + # make the socket ready to recv. Wake the next + # receiver. (Could check EVENTS for POLLIN here) + self._eventlet_recv_event.wake() def recv(self, flags=0, copy=True, track=False): if flags & _zmq.NOBLOCK: - return super(Socket, self).recv(flags, copy, track) + msg = super(Socket, self).recv(flags, copy, track) + # Instead of calling both wake methods, could call + # self.getsockopt(EVENTS) which would trigger wakeups if + # needed. + self._eventlet_send_event.wake() + self._eventlet_recv_event.wake() + return msg + + deadline = None + if hasattr(_zmq, 'RCVTIMEO'): + sock_timeout = self.getsockopt(_zmq.RCVTIMEO) + if sock_timeout == -1: + pass + elif sock_timeout > 0: + deadline = self._eventlet_clock() + sock_timeout / 1000.0 + else: + raise ValueError(sock_timeout) + flags |= _zmq.NOBLOCK - while True: - try: - msg = super(Socket, self).recv(flags, copy, track) - # The following call, force polling the state of the zmq socket - # (POLLIN and/or POLLOUT). It seems that a POLLOUT event is - # often missed when the socket is used to receive at the same - # time, forcing to poll at this exact moment seems to reduce the - # latencies when a POLLOUT event is missed. The drawback is a - # reduced throughput (roughly 8.3%) in exchange of a normal - # concurrency. In other hand, without the following line, you - # loose 90% of the performances as soon as there is simultaneous - # send and recv on the socket. - self._on_state_changed() - return msg - except _zmq.ZMQError as e: - if e.errno not in (_zmq.EAGAIN, errno.EINTR): - raise - self._readable.clear() - # The following sleep(0) force gevent to switch out to another - # coroutine and seems to refresh the notion of time that gevent may - # have. This definitively eliminate the gevent bug that can trigger - # a timeout too soon under heavy load. In theory it will incur more - # CPU usage, but in practice it balance even with the extra CPU used - # when the timeout triggers too soon in the following loop. So for - # the same CPU load, you get a better throughput (roughly 18.75%). - gevent.sleep(0) - while not self._readable.wait(timeout=1): + with self._eventlet_recv_lock: + while True: try: - if self.getsockopt(_zmq.EVENTS) & _zmq.POLLIN: - logger.error("/!\\ gevent_zeromq BUG /!\\ " - "catching up after missing event (RECV) /!\\") - break - except ZMQError as e: - if e.errno not in (_zmq.EAGAIN, errno.EINTR): + return super(Socket, self).recv(flags, copy, track) + except _zmq.ZMQError as e: + if e.errno == _zmq.EAGAIN: + # zmq in its wisdom decided to reuse EAGAIN for timeouts + if deadline is not None and self._eventlet_clock() > deadline: + e.is_timeout = True + raise + + self._eventlet_recv_event.block(deadline=deadline) + else: raise + finally: + # The call to recv processes 0mq events and may + # make the socket ready to send. Wake the next + # receiver. (Could check EVENTS for POLLOUT here) + self._eventlet_send_event.wake() + diff --git a/zerorpc/heartbeat.py b/zerorpc/heartbeat.py index 23b974d..daa7d50 100644 --- a/zerorpc/heartbeat.py +++ b/zerorpc/heartbeat.py @@ -24,11 +24,7 @@ import time -import gevent.pool -import gevent.queue -import gevent.event -import gevent.local -import gevent.lock +import eventlet from .exceptions import LostRemote, TimeoutExpired from .channel_base import ChannelBase @@ -40,12 +36,12 @@ class HeartBeatOnChannel(ChannelBase): self._closed = False self._channel = channel self._heartbeat_freq = freq - self._input_queue = gevent.queue.Channel() + self._input_queue = eventlet.queue.Queue(maxsize=0) # Channel self._remote_last_hb = None self._lost_remote = False - self._recv_task = gevent.spawn(self._recver) + self._recv_task = eventlet.spawn(self._recver) self._heartbeat_task = None - self._parent_coroutine = gevent.getcurrent() + self._parent_coroutine = eventlet.getcurrent() self._compat_v2 = None if not passive: self._start_heartbeat() @@ -72,20 +68,20 @@ class HeartBeatOnChannel(ChannelBase): def _heartbeat(self): while True: - gevent.sleep(self._heartbeat_freq) + eventlet.sleep(self._heartbeat_freq) if self._remote_last_hb is None: self._remote_last_hb = time.time() if time.time() > self._remote_last_hb + self._heartbeat_freq * 2: self._lost_remote = True if not self._closed: - gevent.kill(self._parent_coroutine, + eventlet.kill(self._parent_coroutine, self._lost_remote_exception()) break self._channel.emit(u'_zpc_hb', (0,)) # 0 -> compat with protocol v2 def _start_heartbeat(self): if self._heartbeat_task is None and self._heartbeat_freq is not None and not self._closed: - self._heartbeat_task = gevent.spawn(self._heartbeat) + self._heartbeat_task = eventlet.spawn(self._heartbeat) def _recver(self): while True: @@ -120,7 +116,7 @@ class HeartBeatOnChannel(ChannelBase): raise self._lost_remote_exception() try: return self._input_queue.get(timeout=timeout) - except gevent.queue.Empty: + except eventlet.queue.Empty: raise TimeoutExpired(timeout) @property -- 2.25.1