Merge "Refactor RPC client"
This commit is contained in:
commit
3158d483f4
@ -23,6 +23,8 @@ __all__ = [
|
||||
'RemoteError',
|
||||
]
|
||||
|
||||
import abc
|
||||
|
||||
from oslo_config import cfg
|
||||
import six
|
||||
|
||||
@ -81,7 +83,8 @@ class ClientSendError(exceptions.MessagingException):
|
||||
self.ex = ex
|
||||
|
||||
|
||||
class _CallContext(object):
|
||||
@six.add_metaclass(abc.ABCMeta)
|
||||
class _BaseCallContext(object):
|
||||
|
||||
_marker = object()
|
||||
|
||||
@ -96,7 +99,7 @@ class _CallContext(object):
|
||||
self.retry = retry
|
||||
self.version_cap = version_cap
|
||||
|
||||
super(_CallContext, self).__init__()
|
||||
super(_BaseCallContext, self).__init__()
|
||||
|
||||
def _make_message(self, ctxt, method, args):
|
||||
msg = dict(method=method)
|
||||
@ -123,15 +126,27 @@ class _CallContext(object):
|
||||
return (not self.version_cap or
|
||||
utils.version_is_compatible(self.version_cap, version))
|
||||
|
||||
@classmethod
|
||||
def _check_version(cls, version):
|
||||
if version is not None and version is not cls._marker:
|
||||
# quick sanity check to make sure parsable version numbers are used
|
||||
try:
|
||||
utils.version_is_compatible(version, version)
|
||||
except (IndexError, ValueError):
|
||||
raise exceptions.MessagingException(
|
||||
"Version must contain a major and minor integer. Got %s"
|
||||
% version)
|
||||
|
||||
def cast(self, ctxt, method, **kwargs):
|
||||
"""Invoke a method and return immediately. See RPCClient.cast()."""
|
||||
msg = self._make_message(ctxt, method, kwargs)
|
||||
ctxt = self.serializer.serialize_context(ctxt)
|
||||
msg_ctxt = self.serializer.serialize_context(ctxt)
|
||||
|
||||
if self.version_cap:
|
||||
self._check_version_cap(msg.get('version'))
|
||||
|
||||
try:
|
||||
self.transport._send(self.target, ctxt, msg, retry=self.retry)
|
||||
self.transport._send(self.target, msg_ctxt, msg, retry=self.retry)
|
||||
except driver_base.TransportDriverError as ex:
|
||||
raise ClientSendError(self.target, ex)
|
||||
|
||||
@ -157,22 +172,26 @@ class _CallContext(object):
|
||||
retry=self.retry)
|
||||
except driver_base.TransportDriverError as ex:
|
||||
raise ClientSendError(self.target, ex)
|
||||
|
||||
return self.serializer.deserialize_entity(ctxt, result)
|
||||
|
||||
@abc.abstractmethod
|
||||
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
|
||||
version=_marker, server=_marker, fanout=_marker,
|
||||
timeout=_marker, version_cap=_marker, retry=_marker):
|
||||
"""Prepare a method invocation context. See RPCClient.prepare()."""
|
||||
|
||||
|
||||
class _CallContext(_BaseCallContext):
|
||||
|
||||
_marker = _BaseCallContext._marker
|
||||
|
||||
@classmethod
|
||||
def _prepare(cls, base,
|
||||
def _prepare(cls, call_context,
|
||||
exchange=_marker, topic=_marker, namespace=_marker,
|
||||
version=_marker, server=_marker, fanout=_marker,
|
||||
timeout=_marker, version_cap=_marker, retry=_marker):
|
||||
"""Prepare a method invocation context. See RPCClient.prepare()."""
|
||||
if version is not None and version is not cls._marker:
|
||||
# quick sanity check to make sure parsable version numbers are used
|
||||
try:
|
||||
utils.version_is_compatible(version, version)
|
||||
except (IndexError, ValueError):
|
||||
raise exceptions.MessagingException(
|
||||
"Version must contain a major and minor integer. Got %s"
|
||||
% version)
|
||||
cls._check_version(version)
|
||||
kwargs = dict(
|
||||
exchange=exchange,
|
||||
topic=topic,
|
||||
@ -182,30 +201,29 @@ class _CallContext(object):
|
||||
fanout=fanout)
|
||||
kwargs = dict([(k, v) for k, v in kwargs.items()
|
||||
if v is not cls._marker])
|
||||
target = base.target(**kwargs)
|
||||
target = call_context.target(**kwargs)
|
||||
|
||||
if timeout is cls._marker:
|
||||
timeout = base.timeout
|
||||
if retry is cls._marker:
|
||||
retry = base.retry
|
||||
timeout = call_context.timeout
|
||||
if version_cap is cls._marker:
|
||||
version_cap = base.version_cap
|
||||
version_cap = call_context.version_cap
|
||||
if retry is cls._marker:
|
||||
retry = call_context.retry
|
||||
|
||||
return _CallContext(base.transport, target,
|
||||
base.serializer,
|
||||
return _CallContext(call_context.transport, target,
|
||||
call_context.serializer,
|
||||
timeout, version_cap, retry)
|
||||
|
||||
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
|
||||
version=_marker, server=_marker, fanout=_marker,
|
||||
timeout=_marker, version_cap=_marker, retry=_marker):
|
||||
"""Prepare a method invocation context. See RPCClient.prepare()."""
|
||||
return self._prepare(self,
|
||||
exchange, topic, namespace,
|
||||
version, server, fanout,
|
||||
timeout, version_cap, retry)
|
||||
return _CallContext._prepare(self,
|
||||
exchange, topic, namespace,
|
||||
version, server, fanout,
|
||||
timeout, version_cap, retry)
|
||||
|
||||
|
||||
class RPCClient(object):
|
||||
class RPCClient(_BaseCallContext):
|
||||
|
||||
"""A class for invoking methods on remote servers.
|
||||
|
||||
@ -273,6 +291,8 @@ class RPCClient(object):
|
||||
LOG.error("Failed to send ping message")
|
||||
"""
|
||||
|
||||
_marker = _BaseCallContext._marker
|
||||
|
||||
def __init__(self, transport, target,
|
||||
timeout=None, version_cap=None, serializer=None, retry=None):
|
||||
"""Construct an RPC client.
|
||||
@ -293,20 +313,15 @@ class RPCClient(object):
|
||||
N means N retries
|
||||
:type retry: int
|
||||
"""
|
||||
self.conf = transport.conf
|
||||
if serializer is None:
|
||||
serializer = msg_serializer.NoOpSerializer()
|
||||
|
||||
super(RPCClient, self).__init__(
|
||||
transport, target, serializer, timeout, version_cap, retry
|
||||
)
|
||||
|
||||
self.conf.register_opts(_client_opts)
|
||||
|
||||
self.transport = transport
|
||||
self.target = target
|
||||
self.timeout = timeout
|
||||
self.retry = retry
|
||||
self.version_cap = version_cap
|
||||
self.serializer = serializer or msg_serializer.NoOpSerializer()
|
||||
|
||||
super(RPCClient, self).__init__()
|
||||
|
||||
_marker = _CallContext._marker
|
||||
|
||||
def prepare(self, exchange=_marker, topic=_marker, namespace=_marker,
|
||||
version=_marker, server=_marker, fanout=_marker,
|
||||
timeout=_marker, version_cap=_marker, retry=_marker):
|
||||
@ -371,7 +386,7 @@ class RPCClient(object):
|
||||
:type kwargs: dict
|
||||
:raises: MessageDeliveryFailure
|
||||
"""
|
||||
self.prepare().cast(ctxt, method, **kwargs)
|
||||
super(RPCClient, self).cast(ctxt, method, **kwargs)
|
||||
|
||||
def call(self, ctxt, method, **kwargs):
|
||||
"""Invoke a method and wait for a reply.
|
||||
@ -413,8 +428,8 @@ class RPCClient(object):
|
||||
:type kwargs: dict
|
||||
:raises: MessagingTimeout, RemoteError, MessageDeliveryFailure
|
||||
"""
|
||||
return self.prepare().call(ctxt, method, **kwargs)
|
||||
return super(RPCClient, self).call(ctxt, method, **kwargs)
|
||||
|
||||
def can_send_version(self, version=_marker):
|
||||
"""Check to see if a version is compatible with the version cap."""
|
||||
return self.prepare(version=version).can_send_version()
|
||||
return super(RPCClient, self).can_send_version(version)
|
||||
|
Loading…
x
Reference in New Issue
Block a user