diff --git a/oslo_messaging/rpc/client.py b/oslo_messaging/rpc/client.py index 9792d19b3..2a353fab7 100644 --- a/oslo_messaging/rpc/client.py +++ b/oslo_messaging/rpc/client.py @@ -23,6 +23,8 @@ __all__ = [ 'RemoteError', ] +import abc + from oslo_config import cfg import six @@ -81,7 +83,8 @@ class ClientSendError(exceptions.MessagingException): self.ex = ex -class _CallContext(object): +@six.add_metaclass(abc.ABCMeta) +class _BaseCallContext(object): _marker = object() @@ -96,7 +99,7 @@ class _CallContext(object): self.retry = retry self.version_cap = version_cap - super(_CallContext, self).__init__() + super(_BaseCallContext, self).__init__() def _make_message(self, ctxt, method, args): msg = dict(method=method) @@ -123,15 +126,27 @@ class _CallContext(object): return (not self.version_cap or utils.version_is_compatible(self.version_cap, version)) + @classmethod + def _check_version(cls, version): + if version is not None and version is not cls._marker: + # quick sanity check to make sure parsable version numbers are used + try: + utils.version_is_compatible(version, version) + except (IndexError, ValueError): + raise exceptions.MessagingException( + "Version must contain a major and minor integer. Got %s" + % version) + def cast(self, ctxt, method, **kwargs): """Invoke a method and return immediately. See RPCClient.cast().""" msg = self._make_message(ctxt, method, kwargs) - ctxt = self.serializer.serialize_context(ctxt) + msg_ctxt = self.serializer.serialize_context(ctxt) if self.version_cap: self._check_version_cap(msg.get('version')) + try: - self.transport._send(self.target, ctxt, msg, retry=self.retry) + self.transport._send(self.target, msg_ctxt, msg, retry=self.retry) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) @@ -157,22 +172,26 @@ class _CallContext(object): retry=self.retry) except driver_base.TransportDriverError as ex: raise ClientSendError(self.target, ex) + return self.serializer.deserialize_entity(ctxt, result) + @abc.abstractmethod + def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, + version=_marker, server=_marker, fanout=_marker, + timeout=_marker, version_cap=_marker, retry=_marker): + """Prepare a method invocation context. See RPCClient.prepare().""" + + +class _CallContext(_BaseCallContext): + + _marker = _BaseCallContext._marker + @classmethod - def _prepare(cls, base, + def _prepare(cls, call_context, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, timeout=_marker, version_cap=_marker, retry=_marker): - """Prepare a method invocation context. See RPCClient.prepare().""" - if version is not None and version is not cls._marker: - # quick sanity check to make sure parsable version numbers are used - try: - utils.version_is_compatible(version, version) - except (IndexError, ValueError): - raise exceptions.MessagingException( - "Version must contain a major and minor integer. Got %s" - % version) + cls._check_version(version) kwargs = dict( exchange=exchange, topic=topic, @@ -182,30 +201,29 @@ class _CallContext(object): fanout=fanout) kwargs = dict([(k, v) for k, v in kwargs.items() if v is not cls._marker]) - target = base.target(**kwargs) + target = call_context.target(**kwargs) if timeout is cls._marker: - timeout = base.timeout - if retry is cls._marker: - retry = base.retry + timeout = call_context.timeout if version_cap is cls._marker: - version_cap = base.version_cap + version_cap = call_context.version_cap + if retry is cls._marker: + retry = call_context.retry - return _CallContext(base.transport, target, - base.serializer, + return _CallContext(call_context.transport, target, + call_context.serializer, timeout, version_cap, retry) def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, timeout=_marker, version_cap=_marker, retry=_marker): - """Prepare a method invocation context. See RPCClient.prepare().""" - return self._prepare(self, - exchange, topic, namespace, - version, server, fanout, - timeout, version_cap, retry) + return _CallContext._prepare(self, + exchange, topic, namespace, + version, server, fanout, + timeout, version_cap, retry) -class RPCClient(object): +class RPCClient(_BaseCallContext): """A class for invoking methods on remote servers. @@ -273,6 +291,8 @@ class RPCClient(object): LOG.error("Failed to send ping message") """ + _marker = _BaseCallContext._marker + def __init__(self, transport, target, timeout=None, version_cap=None, serializer=None, retry=None): """Construct an RPC client. @@ -293,20 +313,15 @@ class RPCClient(object): N means N retries :type retry: int """ - self.conf = transport.conf + if serializer is None: + serializer = msg_serializer.NoOpSerializer() + + super(RPCClient, self).__init__( + transport, target, serializer, timeout, version_cap, retry + ) + self.conf.register_opts(_client_opts) - self.transport = transport - self.target = target - self.timeout = timeout - self.retry = retry - self.version_cap = version_cap - self.serializer = serializer or msg_serializer.NoOpSerializer() - - super(RPCClient, self).__init__() - - _marker = _CallContext._marker - def prepare(self, exchange=_marker, topic=_marker, namespace=_marker, version=_marker, server=_marker, fanout=_marker, timeout=_marker, version_cap=_marker, retry=_marker): @@ -371,7 +386,7 @@ class RPCClient(object): :type kwargs: dict :raises: MessageDeliveryFailure """ - self.prepare().cast(ctxt, method, **kwargs) + super(RPCClient, self).cast(ctxt, method, **kwargs) def call(self, ctxt, method, **kwargs): """Invoke a method and wait for a reply. @@ -413,8 +428,8 @@ class RPCClient(object): :type kwargs: dict :raises: MessagingTimeout, RemoteError, MessageDeliveryFailure """ - return self.prepare().call(ctxt, method, **kwargs) + return super(RPCClient, self).call(ctxt, method, **kwargs) def can_send_version(self, version=_marker): """Check to see if a version is compatible with the version cap.""" - return self.prepare(version=version).can_send_version() + return super(RPCClient, self).can_send_version(version)