From ec72e722a5628568fe7ba25385d31f8ebcbb8130 Mon Sep 17 00:00:00 2001 From: Alyson Deives Pereira Date: Fri, 29 Jul 2022 15:22:30 -0300 Subject: [PATCH] Allow usage of kwargs Based on https://github.com/0rpc/zerorpc-python/commit/674678de3e4744f86fd186582bc4b31daa86e785 Signed-off-by: Alyson Deives Pereira --- tests/test_events.py | 14 ++++++------ tests/test_kwargs.py | 50 +++++++++++++++++++++++++++++++++++++++++ zerorpc/channel.py | 12 +++++----- zerorpc/channel_base.py | 6 ++--- zerorpc/core.py | 34 ++++++++++++++-------------- zerorpc/events.py | 20 +++++++++++------ zerorpc/heartbeat.py | 4 ++-- zerorpc/patterns.py | 4 ++-- 8 files changed, 100 insertions(+), 44 deletions(-) create mode 100644 tests/test_kwargs.py diff --git a/tests/test_events.py b/tests/test_events.py index 7acf98e..c9e1c30 100644 --- a/tests/test_events.py +++ b/tests/test_events.py @@ -47,25 +47,25 @@ def test_context(): def test_event(): context = MokupContext() - event = zerorpc.Event('mylittleevent', (None,), context=context) + event = zerorpc.Event('mylittleevent', (None,), {}, context=context) print(event) assert event.name == 'mylittleevent' assert event.header['message_id'] == 0 assert event.args == (None,) - event = zerorpc.Event('mylittleevent2', ('42',), context=context) + event = zerorpc.Event('mylittleevent2', ('42',), {}, context=context) print(event) assert event.name == 'mylittleevent2' assert event.header['message_id'] == 1 assert event.args == ('42',) - event = zerorpc.Event('mylittleevent3', ('a', 42), context=context) + event = zerorpc.Event('mylittleevent3', ('a', 42), {}, context=context) print(event) assert event.name == 'mylittleevent3' assert event.header['message_id'] == 2 assert event.args == ('a', 42) - event = zerorpc.Event('mylittleevent4', ('', 21), context=context) + event = zerorpc.Event('mylittleevent4', ('', 21), {}, context=context) print(event) assert event.name == 'mylittleevent4' assert event.header['message_id'] == 3 @@ -79,14 +79,14 @@ def test_event(): assert unpacked.header['message_id'] == 3 assert list(unpacked.args) == ['', 21] - event = zerorpc.Event('mylittleevent5', ('c', 24, True), + event = zerorpc.Event('mylittleevent5', ('c', 24, True), {}, header={'lol': 'rofl'}, context=None) print(event) assert event.name == 'mylittleevent5' assert event.header['lol'] == 'rofl' assert event.args == ('c', 24, True) - event = zerorpc.Event('mod', (42,), context=context) + event = zerorpc.Event('mod', (42,), {}, context=context) print(event) assert event.name == 'mod' assert event.header['message_id'] == 4 @@ -177,7 +177,7 @@ def test_events_push_pull(): def test_msgpack(): context = zerorpc.Context() - event = zerorpc.Event(u'myevent', (u'a',), context=context) + event = zerorpc.Event(u'myevent', (u'a',), {}, context=context) print(event) # note here that str is an unicode string in all Python version (thanks to # the builtin str import). diff --git a/tests/test_kwargs.py b/tests/test_kwargs.py new file mode 100644 index 0000000..e3d7009 --- /dev/null +++ b/tests/test_kwargs.py @@ -0,0 +1,50 @@ +from __future__ import absolute_import +import eventlet + +import zerorpc +from .testutils import teardown, random_ipc_endpoint + +def test_client_connect(): + endpoint = random_ipc_endpoint() + + class MySrv(zerorpc.Server): + + def echo(self, *args, **kwargs): + return args, kwargs + + srv = MySrv() + srv.bind(endpoint) + eventlet.spawn(srv.run) + + client = zerorpc.Client() + client.connect(endpoint) + + args = 1, 2, 3 + kwargs = {'a': 7, 'b': 8} + res = client.echo(*args, **kwargs) + assert len(res) == 2 + assert res[0] == args + assert len(res[1]) == 3 + assert 'a' in res[1] and 'b' in res[1] + +def test_client_quick_connect(): + endpoint = random_ipc_endpoint() + + class MySrv(zerorpc.Server): + + def echo(self, *args, **kwargs): + return args, kwargs + + srv = MySrv() + srv.bind(endpoint) + eventlet.spawn(srv.run) + + client = zerorpc.Client(endpoint) + + args = 1, 2, 3 + kwargs = {'a': 7, 'b': 8} + res = client.echo(*args, **kwargs) + assert len(res) == 2 + assert res[0] == args + assert len(res[1]) == 3 + assert 'a' in res[1] and 'b' in res[1] diff --git a/zerorpc/channel.py b/zerorpc/channel.py index bd376ec..df77bc0 100644 --- a/zerorpc/channel.py +++ b/zerorpc/channel.py @@ -55,8 +55,8 @@ class ChannelMultiplexer(ChannelBase): if self._channel_dispatcher_task: self._channel_dispatcher_task.kill() - def new_event(self, name, args, xheader=None): - return self._events.new_event(name, args, xheader) + def new_event(self, name, args, kwargs=None, xheader=None): + return self._events.new_event(name, args, kwargs, xheader) def emit_event(self, event, timeout=None): return self._events.emit_event(event, timeout) @@ -135,8 +135,8 @@ class Channel(ChannelBase): logger.debug('-x- closed channel %s', self._channel_id) self._channel_id = None - def new_event(self, name, args, xheader=None): - event = self._multiplexer.new_event(name, args, xheader) + def new_event(self, name, args, kwargs=None, xheader=None): + event = self._multiplexer.new_event(name, args, kwargs, xheader) if self._channel_id is None: self._channel_id = event.header[u'message_id'] self._multiplexer._active_channels[self._channel_id] = self @@ -218,8 +218,8 @@ class BufferedChannel(ChannelBase): self.close() return - def new_event(self, name, args, xheader=None): - return self._channel.new_event(name, args, xheader) + def new_event(self, name, args, kwargs=None, xheader=None): + return self._channel.new_event(name, args, kwargs, xheader) def emit_event(self, event, timeout=None): if self._remote_queue_open_slots == 0: diff --git a/zerorpc/channel_base.py b/zerorpc/channel_base.py index a391b08..35fc695 100644 --- a/zerorpc/channel_base.py +++ b/zerorpc/channel_base.py @@ -40,14 +40,14 @@ class ChannelBase(object): def close(self): raise NotImplementedError() - def new_event(self, name, args, xheader=None): + def new_event(self, name, args, kwargs=None, xheader=None): raise NotImplementedError() def emit_event(self, event, timeout=None): raise NotImplementedError() - def emit(self, name, args, xheader=None, timeout=None): - event = self.new_event(name, args, xheader) + def emit(self, name, args, kwargs=None, xheader=None, timeout=None): + event = self.new_event(name, args, kwargs, xheader) return self.emit_event(event, timeout) def recv(self, timeout=None): diff --git a/zerorpc/core.py b/zerorpc/core.py index 90b1aae..dc2b484 100644 --- a/zerorpc/core.py +++ b/zerorpc/core.py @@ -117,10 +117,10 @@ class ServerBase(object): lambda m: self._methods[m]._zerorpc_args() self._methods['_zerorpc_inspect'] = self._zerorpc_inspect - def __call__(self, method, *args): + def __call__(self, method, *args, **kwargs): if method not in self._methods: raise NameError(method) - return self._methods[method](*args) + return self._methods[method](*args, **kwargs) def _print_traceback(self, protocol_v1, exc_infos): logger.exception('') @@ -233,7 +233,7 @@ class ClientBase(object): return pattern.process_answer(self._context, bufchan, request_event, reply_event, self._handle_remote_error) - def __call__(self, method, *args, **kargs): + def __call__(self, method, *args, **kwargs): # here `method` is either a string of bytes or an unicode string in # Python2 and Python3. Python2: str aka a byte string containing ASCII # (unless the user explicitly provide an unicode string). Python3: str @@ -251,28 +251,28 @@ class ClientBase(object): if isinstance(method, bytes): method = method.decode('utf-8') - timeout = kargs.get('timeout', self._timeout) + timeout = kwargs.pop('timeout_', self._timeout) channel = self._multiplexer.channel() hbchan = HeartBeatOnChannel(channel, freq=self._heartbeat_freq, passive=self._passive_heartbeat) - bufchan = BufferedChannel(hbchan, inqueue_size=kargs.get('slots', 100)) + bufchan = BufferedChannel(hbchan, inqueue_size=kwargs.pop('slots_', 100)) xheader = self._context.hook_get_task_context() - request_event = bufchan.new_event(method, args, xheader) + request_event = bufchan.new_event(method, args, kwargs, xheader) self._context.hook_client_before_request(request_event) bufchan.emit_event(request_event) # In python 3.7, "async" is a reserved keyword, clients should now use # "async_": support both for the time being - if (kargs.get('async', False) is False and - kargs.get('async_', False) is False): + async_ = kwargs.pop('async_', False) + if not async_: return self._process_response(request_event, bufchan, timeout) return eventlet.spawn(self._process_response, request_event, bufchan, timeout) def __getattr__(self, method): - return lambda *args, **kargs: self(method, *args, **kargs) + return lambda *args, **kwargs: self(method, *args, **kwargs) class Server(SocketBase, ServerBase): @@ -313,12 +313,12 @@ class Pusher(SocketBase): def __init__(self, context=None, zmq_socket=zmq.PUSH): super(Pusher, self).__init__(zmq_socket, context=context) - def __call__(self, method, *args): - self._events.emit(method, args, + def __call__(self, method, *args, **kwargs): + self._events.emit(method, args, kwargs, self._context.hook_get_task_context()) def __getattr__(self, method): - return lambda *args: self(method, *args) + return lambda *args, **kwargs: self(method, *args, **kwargs) class Puller(SocketBase): @@ -336,10 +336,10 @@ class Puller(SocketBase): self.stop() super(Puller, self).close() - def __call__(self, method, *args): + def __call__(self, method, *args, **kwargs): if method not in self._methods: raise NameError(method) - return self._methods[method](*args) + return self._methods[method](*args, **kwargs) def _receiver(self): while True: @@ -349,7 +349,7 @@ class Puller(SocketBase): raise NameError(event.name) self._context.hook_load_task_context(event.header) self._context.hook_server_before_exec(event) - self._methods[event.name](*event.args) + self._methods[event.name](*event.args, **event.kwargs) # In Push/Pull their is no reply to send, hence None for the # reply_event argument self._context.hook_server_after_exec(event, None) @@ -422,7 +422,7 @@ def fork_task_context(functor, context=None): context = context or Context.get_instance() xheader = context.hook_get_task_context() - def wrapped(*args, **kargs): + def wrapped(*args, **kwargs): context.hook_load_task_context(xheader) - return functor(*args, **kargs) + return functor(*args, **kwargs) return wrapped diff --git a/zerorpc/events.py b/zerorpc/events.py index 295220e..f24cc2f 100644 --- a/zerorpc/events.py +++ b/zerorpc/events.py @@ -162,15 +162,16 @@ class Receiver(SequentialReceiver): class Event(object): - __slots__ = ['_name', '_args', '_header', '_identity'] + __slots__ = ['_name', '_args', '_kwargs', '_header', '_identity'] # protocol details: # - `name` and `header` keys must be unicode strings. # - `message_id` and 'response_to' values are opaque bytes string. # - `v' value is an integer. - def __init__(self, name, args, context, header=None): + def __init__(self, name, args, kwargs, context, header=None): self._name = name self._args = args + self._kwargs = kwargs or {} if header is None: self._header = {u'message_id': context.new_msgid(), u'v': 3} else: @@ -193,6 +194,10 @@ class Event(object): def args(self): return self._args + @property + def kwargs(self): + return self._kwargs + @property def identity(self): return self._identity @@ -202,7 +207,7 @@ class Event(object): self._identity = v def pack(self, encoder=None): - payload = (self._header, self._name, self._args) + payload = (self._header, self._name, self._args, self._kwargs) r = msgpack.Packer(use_bin_type=True, default=encoder).pack(payload) return r @@ -213,7 +218,7 @@ class Event(object): unpacked_msg = unpacker.unpack() try: - (header, name, args) = unpacked_msg + (header, name, args, kwargs) = unpacked_msg except Exception as e: raise Exception('invalid msg format "{0}": {1}'.format( unpacked_msg, e)) @@ -222,7 +227,7 @@ class Event(object): if not isinstance(header, dict): header = {} - return Event(name, args, None, header) + return Event(name, args, kwargs, None, header) def __str__(self, ignore_args=False): if ignore_args: @@ -237,6 +242,7 @@ class Event(object): identity = ', '.join(repr(x.bytes) for x in self._identity) return '<{0}> {1} {2} {3}'.format(identity, self._name, self._header, args) + # TODO include kwargs? return '{0} {1} {2}'.format(self._name, self._header, args) @@ -333,8 +339,8 @@ class Events(ChannelBase): logger.debug('disconnected from %s (status=%s)', endpoint_, r[-1]) return r - def new_event(self, name, args, xheader=None): - event = Event(name, args, context=self._context) + def new_event(self, name, args, kwargs=None, xheader=None): + event = Event(name, args, kwargs, context=self._context) if xheader: event.header.update(xheader) return event diff --git a/zerorpc/heartbeat.py b/zerorpc/heartbeat.py index daa7d50..7bd1166 100644 --- a/zerorpc/heartbeat.py +++ b/zerorpc/heartbeat.py @@ -101,10 +101,10 @@ class HeartBeatOnChannel(ChannelBase): return LostRemote('Lost remote after {0}s heartbeat'.format( self._heartbeat_freq * 2)) - def new_event(self, name, args, header=None): + def new_event(self, name, args, kwargs=None, header=None): if self._compat_v2 and name == u'_zpc_more': name = u'_zpc_hb' - return self._channel.new_event(name, args, header) + return self._channel.new_event(name, args, kwargs, header) def emit_event(self, event, timeout=None): if self._lost_remote: diff --git a/zerorpc/patterns.py b/zerorpc/patterns.py index 3623e17..a6f44f1 100644 --- a/zerorpc/patterns.py +++ b/zerorpc/patterns.py @@ -27,7 +27,7 @@ class ReqRep(object): def process_call(self, context, channel, req_event, functor): context.hook_server_before_exec(req_event) - result = functor(*req_event.args) + result = functor(*req_event.args, **req_event.kwargs) rep_event = channel.new_event(u'OK', (result,), context.hook_get_task_context()) context.hook_server_after_exec(req_event, rep_event) @@ -54,7 +54,7 @@ class ReqStream(object): def process_call(self, context, channel, req_event, functor): context.hook_server_before_exec(req_event) xheader = context.hook_get_task_context() - for result in iter(functor(*req_event.args)): + for result in iter(functor(*req_event.args, **req_event.kwargs)): channel.emit(u'STREAM', result, xheader) done_event = channel.new_event(u'STREAM_DONE', None, xheader) # NOTE: "We" made the choice to call the hook once the stream is done, -- 2.25.1