Merge "Port ZMQ driver to Python 3"
This commit is contained in:
commit
6a1d3542ea
@ -106,7 +106,7 @@ def _serialize(data):
|
||||
|
||||
def _deserialize(data):
|
||||
"""Deserialization wrapper."""
|
||||
LOG.debug("Deserializing: %s", data)
|
||||
LOG.debug("Deserializing: %r", data)
|
||||
return jsonutils.loads(data)
|
||||
|
||||
|
||||
@ -180,7 +180,10 @@ class ZmqSocket(object):
|
||||
LOG.debug("Subscribing to %s", msg_filter)
|
||||
|
||||
try:
|
||||
self.sock.setsockopt(zmq.SUBSCRIBE, msg_filter)
|
||||
arg = msg_filter
|
||||
if six.PY3:
|
||||
arg = arg.encode('utf-8')
|
||||
self.sock.setsockopt(zmq.SUBSCRIBE, arg)
|
||||
except Exception:
|
||||
return
|
||||
|
||||
@ -190,7 +193,10 @@ class ZmqSocket(object):
|
||||
"""Unsubscribe."""
|
||||
if msg_filter not in self.subscriptions:
|
||||
return
|
||||
self.sock.setsockopt(zmq.UNSUBSCRIBE, msg_filter)
|
||||
arg = msg_filter
|
||||
if six.PY3:
|
||||
arg = arg.encode('utf-8')
|
||||
self.sock.setsockopt(zmq.UNSUBSCRIBE, arg)
|
||||
self.subscriptions.remove(msg_filter)
|
||||
|
||||
@property
|
||||
@ -240,7 +246,10 @@ class ZmqClient(object):
|
||||
self.outq = ZmqSocket(addr, zmq.PUSH, bind=False)
|
||||
|
||||
def cast(self, msg_id, topic, data, envelope):
|
||||
msg_id = msg_id or 0
|
||||
msg_id = msg_id or '0'
|
||||
|
||||
if six.PY3:
|
||||
msg_id = msg_id.encode('utf-8')
|
||||
|
||||
if not envelope:
|
||||
data = _serialize(data)
|
||||
@ -574,13 +583,13 @@ class ZmqReactor(ZmqBaseReactor):
|
||||
|
||||
proxy = self.proxies[sock]
|
||||
|
||||
if data[2] == 'cast': # Legacy protocol
|
||||
if data[2] == b'cast': # Legacy protocol
|
||||
packenv = data[3]
|
||||
|
||||
ctx, msg = _deserialize(packenv)
|
||||
request = rpc_common.deserialize_msg(msg)
|
||||
ctx = RpcContext.unmarshal(ctx)
|
||||
elif data[2] == 'impl_zmq_v2':
|
||||
elif data[2] == b'impl_zmq_v2':
|
||||
packenv = data[4:]
|
||||
|
||||
msg = unflatten_envelope(packenv)
|
||||
@ -724,9 +733,9 @@ def _call(addr, context, topic, msg, timeout=None,
|
||||
LOG.debug("Received message: %s", msg)
|
||||
LOG.debug("Unpacking response")
|
||||
|
||||
if msg[2] == 'cast': # Legacy version
|
||||
if msg[2] == b'cast': # Legacy version
|
||||
raw_msg = _deserialize(msg[-1])[-1]
|
||||
elif msg[2] == 'impl_zmq_v2':
|
||||
elif msg[2] == b'impl_zmq_v2':
|
||||
rpc_envelope = unflatten_envelope(msg[4:])
|
||||
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
|
||||
else:
|
||||
@ -748,7 +757,7 @@ def _call(addr, context, topic, msg, timeout=None,
|
||||
# One effect of this is that we're checking all
|
||||
# responses for Exceptions.
|
||||
for resp in responses:
|
||||
if isinstance(resp, types.DictType) and 'exc' in resp:
|
||||
if isinstance(resp, dict) and 'exc' in resp:
|
||||
raise rpc_common.deserialize_remote_exception(
|
||||
resp['exc'], allowed_remote_exmods)
|
||||
|
||||
|
@ -17,6 +17,9 @@ oslotest>=1.5.1 # Apache-2.0
|
||||
# for test_matchmaker_redis
|
||||
redis>=2.10.0
|
||||
|
||||
# for test_impl_zmq
|
||||
pyzmq>=14.3.1 # LGPL+BSD
|
||||
|
||||
# when we can require tox>= 1.4, this can go into tox.ini:
|
||||
# [testenv:cover]
|
||||
# deps = {[testenv]deps} coverage
|
||||
|
Loading…
x
Reference in New Issue
Block a user