Latest common updates
Fixes bug 1132908 Change-Id: I726bc5b3199d2fa606832418db9b6e20ffbc3879
This commit is contained in:
parent
f851706511
commit
38868e7ef2
@ -51,12 +51,20 @@ def _print_greenthreads():
|
|||||||
print
|
print
|
||||||
|
|
||||||
|
|
||||||
|
def _print_nativethreads():
|
||||||
|
for threadId, stack in sys._current_frames().items():
|
||||||
|
print threadId
|
||||||
|
traceback.print_stack(stack)
|
||||||
|
print
|
||||||
|
|
||||||
|
|
||||||
def initialize_if_enabled():
|
def initialize_if_enabled():
|
||||||
backdoor_locals = {
|
backdoor_locals = {
|
||||||
'exit': _dont_use_this, # So we don't exit the entire process
|
'exit': _dont_use_this, # So we don't exit the entire process
|
||||||
'quit': _dont_use_this, # So we don't exit the entire process
|
'quit': _dont_use_this, # So we don't exit the entire process
|
||||||
'fo': _find_objects,
|
'fo': _find_objects,
|
||||||
'pgt': _print_greenthreads,
|
'pgt': _print_greenthreads,
|
||||||
|
'pnt': _print_nativethreads,
|
||||||
}
|
}
|
||||||
|
|
||||||
if CONF.backdoor_port is None:
|
if CONF.backdoor_port is None:
|
||||||
|
@ -325,16 +325,11 @@ def _create_logging_excepthook(product_name):
|
|||||||
|
|
||||||
def setup(product_name):
|
def setup(product_name):
|
||||||
"""Setup logging."""
|
"""Setup logging."""
|
||||||
sys.excepthook = _create_logging_excepthook(product_name)
|
|
||||||
|
|
||||||
if CONF.log_config:
|
if CONF.log_config:
|
||||||
try:
|
logging.config.fileConfig(CONF.log_config)
|
||||||
logging.config.fileConfig(CONF.log_config)
|
|
||||||
except Exception:
|
|
||||||
traceback.print_exc()
|
|
||||||
raise
|
|
||||||
else:
|
else:
|
||||||
_setup_logging_from_conf(product_name)
|
_setup_logging_from_conf(product_name)
|
||||||
|
sys.excepthook = _create_logging_excepthook(product_name)
|
||||||
|
|
||||||
|
|
||||||
def set_defaults(logging_context_format_string):
|
def set_defaults(logging_context_format_string):
|
||||||
|
@ -25,6 +25,7 @@ Specifically, this includes impl_kombu and impl_qpid. impl_carrot also uses
|
|||||||
AMQP, but is deprecated and predates this code.
|
AMQP, but is deprecated and predates this code.
|
||||||
"""
|
"""
|
||||||
|
|
||||||
|
import collections
|
||||||
import inspect
|
import inspect
|
||||||
import sys
|
import sys
|
||||||
import uuid
|
import uuid
|
||||||
@ -54,6 +55,7 @@ amqp_opts = [
|
|||||||
|
|
||||||
cfg.CONF.register_opts(amqp_opts)
|
cfg.CONF.register_opts(amqp_opts)
|
||||||
|
|
||||||
|
UNIQUE_ID = '_unique_id'
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@ -236,6 +238,7 @@ def msg_reply(conf, msg_id, reply_q, connection_pool, reply=None,
|
|||||||
'failure': failure}
|
'failure': failure}
|
||||||
if ending:
|
if ending:
|
||||||
msg['ending'] = True
|
msg['ending'] = True
|
||||||
|
_add_unique_id(msg)
|
||||||
# If a reply_q exists, add the msg_id to the reply and pass the
|
# If a reply_q exists, add the msg_id to the reply and pass the
|
||||||
# reply_q to direct_send() to use it as the response queue.
|
# reply_q to direct_send() to use it as the response queue.
|
||||||
# Otherwise use the msg_id for backward compatibilty.
|
# Otherwise use the msg_id for backward compatibilty.
|
||||||
@ -302,6 +305,37 @@ def pack_context(msg, context):
|
|||||||
msg.update(context_d)
|
msg.update(context_d)
|
||||||
|
|
||||||
|
|
||||||
|
class _MsgIdCache(object):
|
||||||
|
"""This class checks any duplicate messages."""
|
||||||
|
|
||||||
|
# NOTE: This value is considered can be a configuration item, but
|
||||||
|
# it is not necessary to change its value in most cases,
|
||||||
|
# so let this value as static for now.
|
||||||
|
DUP_MSG_CHECK_SIZE = 16
|
||||||
|
|
||||||
|
def __init__(self, **kwargs):
|
||||||
|
self.prev_msgids = collections.deque([],
|
||||||
|
maxlen=self.DUP_MSG_CHECK_SIZE)
|
||||||
|
|
||||||
|
def check_duplicate_message(self, message_data):
|
||||||
|
"""AMQP consumers may read same message twice when exceptions occur
|
||||||
|
before ack is returned. This method prevents doing it.
|
||||||
|
"""
|
||||||
|
if UNIQUE_ID in message_data:
|
||||||
|
msg_id = message_data[UNIQUE_ID]
|
||||||
|
if msg_id not in self.prev_msgids:
|
||||||
|
self.prev_msgids.append(msg_id)
|
||||||
|
else:
|
||||||
|
raise rpc_common.DuplicateMessageError(msg_id=msg_id)
|
||||||
|
|
||||||
|
|
||||||
|
def _add_unique_id(msg):
|
||||||
|
"""Add unique_id for checking duplicate messages."""
|
||||||
|
unique_id = uuid.uuid4().hex
|
||||||
|
msg.update({UNIQUE_ID: unique_id})
|
||||||
|
LOG.debug(_('UNIQUE_ID is %s.') % (unique_id))
|
||||||
|
|
||||||
|
|
||||||
class _ThreadPoolWithWait(object):
|
class _ThreadPoolWithWait(object):
|
||||||
"""Base class for a delayed invocation manager used by
|
"""Base class for a delayed invocation manager used by
|
||||||
the Connection class to start up green threads
|
the Connection class to start up green threads
|
||||||
@ -349,6 +383,7 @@ class ProxyCallback(_ThreadPoolWithWait):
|
|||||||
connection_pool=connection_pool,
|
connection_pool=connection_pool,
|
||||||
)
|
)
|
||||||
self.proxy = proxy
|
self.proxy = proxy
|
||||||
|
self.msg_id_cache = _MsgIdCache()
|
||||||
|
|
||||||
def __call__(self, message_data):
|
def __call__(self, message_data):
|
||||||
"""Consumer callback to call a method on a proxy object.
|
"""Consumer callback to call a method on a proxy object.
|
||||||
@ -368,6 +403,7 @@ class ProxyCallback(_ThreadPoolWithWait):
|
|||||||
if hasattr(local.store, 'context'):
|
if hasattr(local.store, 'context'):
|
||||||
del local.store.context
|
del local.store.context
|
||||||
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
|
rpc_common._safe_log(LOG.debug, _('received %s'), message_data)
|
||||||
|
self.msg_id_cache.check_duplicate_message(message_data)
|
||||||
ctxt = unpack_context(self.conf, message_data)
|
ctxt = unpack_context(self.conf, message_data)
|
||||||
method = message_data.get('method')
|
method = message_data.get('method')
|
||||||
args = message_data.get('args', {})
|
args = message_data.get('args', {})
|
||||||
@ -422,6 +458,7 @@ class MulticallProxyWaiter(object):
|
|||||||
self._dataqueue = queue.LightQueue()
|
self._dataqueue = queue.LightQueue()
|
||||||
# Add this caller to the reply proxy's call_waiters
|
# Add this caller to the reply proxy's call_waiters
|
||||||
self._reply_proxy.add_call_waiter(self, self._msg_id)
|
self._reply_proxy.add_call_waiter(self, self._msg_id)
|
||||||
|
self.msg_id_cache = _MsgIdCache()
|
||||||
|
|
||||||
def put(self, data):
|
def put(self, data):
|
||||||
self._dataqueue.put(data)
|
self._dataqueue.put(data)
|
||||||
@ -435,6 +472,7 @@ class MulticallProxyWaiter(object):
|
|||||||
|
|
||||||
def _process_data(self, data):
|
def _process_data(self, data):
|
||||||
result = None
|
result = None
|
||||||
|
self.msg_id_cache.check_duplicate_message(data)
|
||||||
if data['failure']:
|
if data['failure']:
|
||||||
failure = data['failure']
|
failure = data['failure']
|
||||||
result = rpc_common.deserialize_remote_exception(self._conf,
|
result = rpc_common.deserialize_remote_exception(self._conf,
|
||||||
@ -479,6 +517,7 @@ class MulticallWaiter(object):
|
|||||||
self._done = False
|
self._done = False
|
||||||
self._got_ending = False
|
self._got_ending = False
|
||||||
self._conf = conf
|
self._conf = conf
|
||||||
|
self.msg_id_cache = _MsgIdCache()
|
||||||
|
|
||||||
def done(self):
|
def done(self):
|
||||||
if self._done:
|
if self._done:
|
||||||
@ -490,6 +529,7 @@ class MulticallWaiter(object):
|
|||||||
|
|
||||||
def __call__(self, data):
|
def __call__(self, data):
|
||||||
"""The consume() callback will call this. Store the result."""
|
"""The consume() callback will call this. Store the result."""
|
||||||
|
self.msg_id_cache.check_duplicate_message(data)
|
||||||
if data['failure']:
|
if data['failure']:
|
||||||
failure = data['failure']
|
failure = data['failure']
|
||||||
self._result = rpc_common.deserialize_remote_exception(self._conf,
|
self._result = rpc_common.deserialize_remote_exception(self._conf,
|
||||||
@ -542,6 +582,7 @@ def multicall(conf, context, topic, msg, timeout, connection_pool):
|
|||||||
msg_id = uuid.uuid4().hex
|
msg_id = uuid.uuid4().hex
|
||||||
msg.update({'_msg_id': msg_id})
|
msg.update({'_msg_id': msg_id})
|
||||||
LOG.debug(_('MSG_ID is %s') % (msg_id))
|
LOG.debug(_('MSG_ID is %s') % (msg_id))
|
||||||
|
_add_unique_id(msg)
|
||||||
pack_context(msg, context)
|
pack_context(msg, context)
|
||||||
|
|
||||||
# TODO(pekowski): Remove this flag and the code under the if clause
|
# TODO(pekowski): Remove this flag and the code under the if clause
|
||||||
@ -575,6 +616,7 @@ def call(conf, context, topic, msg, timeout, connection_pool):
|
|||||||
def cast(conf, context, topic, msg, connection_pool):
|
def cast(conf, context, topic, msg, connection_pool):
|
||||||
"""Sends a message on a topic without waiting for a response."""
|
"""Sends a message on a topic without waiting for a response."""
|
||||||
LOG.debug(_('Making asynchronous cast on %s...'), topic)
|
LOG.debug(_('Making asynchronous cast on %s...'), topic)
|
||||||
|
_add_unique_id(msg)
|
||||||
pack_context(msg, context)
|
pack_context(msg, context)
|
||||||
with ConnectionContext(conf, connection_pool) as conn:
|
with ConnectionContext(conf, connection_pool) as conn:
|
||||||
conn.topic_send(topic, rpc_common.serialize_msg(msg))
|
conn.topic_send(topic, rpc_common.serialize_msg(msg))
|
||||||
@ -583,6 +625,7 @@ def cast(conf, context, topic, msg, connection_pool):
|
|||||||
def fanout_cast(conf, context, topic, msg, connection_pool):
|
def fanout_cast(conf, context, topic, msg, connection_pool):
|
||||||
"""Sends a message on a fanout exchange without waiting for a response."""
|
"""Sends a message on a fanout exchange without waiting for a response."""
|
||||||
LOG.debug(_('Making asynchronous fanout cast...'))
|
LOG.debug(_('Making asynchronous fanout cast...'))
|
||||||
|
_add_unique_id(msg)
|
||||||
pack_context(msg, context)
|
pack_context(msg, context)
|
||||||
with ConnectionContext(conf, connection_pool) as conn:
|
with ConnectionContext(conf, connection_pool) as conn:
|
||||||
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
|
conn.fanout_send(topic, rpc_common.serialize_msg(msg))
|
||||||
@ -590,6 +633,7 @@ def fanout_cast(conf, context, topic, msg, connection_pool):
|
|||||||
|
|
||||||
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
|
def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
|
||||||
"""Sends a message on a topic to a specific server."""
|
"""Sends a message on a topic to a specific server."""
|
||||||
|
_add_unique_id(msg)
|
||||||
pack_context(msg, context)
|
pack_context(msg, context)
|
||||||
with ConnectionContext(conf, connection_pool, pooled=False,
|
with ConnectionContext(conf, connection_pool, pooled=False,
|
||||||
server_params=server_params) as conn:
|
server_params=server_params) as conn:
|
||||||
@ -599,6 +643,7 @@ def cast_to_server(conf, context, server_params, topic, msg, connection_pool):
|
|||||||
def fanout_cast_to_server(conf, context, server_params, topic, msg,
|
def fanout_cast_to_server(conf, context, server_params, topic, msg,
|
||||||
connection_pool):
|
connection_pool):
|
||||||
"""Sends a message on a fanout exchange to a specific server."""
|
"""Sends a message on a fanout exchange to a specific server."""
|
||||||
|
_add_unique_id(msg)
|
||||||
pack_context(msg, context)
|
pack_context(msg, context)
|
||||||
with ConnectionContext(conf, connection_pool, pooled=False,
|
with ConnectionContext(conf, connection_pool, pooled=False,
|
||||||
server_params=server_params) as conn:
|
server_params=server_params) as conn:
|
||||||
@ -610,6 +655,7 @@ def notify(conf, context, topic, msg, connection_pool, envelope):
|
|||||||
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
|
LOG.debug(_('Sending %(event_type)s on %(topic)s'),
|
||||||
dict(event_type=msg.get('event_type'),
|
dict(event_type=msg.get('event_type'),
|
||||||
topic=topic))
|
topic=topic))
|
||||||
|
_add_unique_id(msg)
|
||||||
pack_context(msg, context)
|
pack_context(msg, context)
|
||||||
with ConnectionContext(conf, connection_pool) as conn:
|
with ConnectionContext(conf, connection_pool) as conn:
|
||||||
if envelope:
|
if envelope:
|
||||||
|
@ -49,8 +49,8 @@ deserialize_msg().
|
|||||||
The current message format (version 2.0) is very simple. It is:
|
The current message format (version 2.0) is very simple. It is:
|
||||||
|
|
||||||
{
|
{
|
||||||
'quantum.version': <RPC Envelope Version as a String>,
|
'oslo.version': <RPC Envelope Version as a String>,
|
||||||
'quantum.message': <Application Message Payload, JSON encoded>
|
'oslo.message': <Application Message Payload, JSON encoded>
|
||||||
}
|
}
|
||||||
|
|
||||||
Message format version '1.0' is just considered to be the messages we sent
|
Message format version '1.0' is just considered to be the messages we sent
|
||||||
@ -66,8 +66,8 @@ to the messaging libraries as a dict.
|
|||||||
'''
|
'''
|
||||||
_RPC_ENVELOPE_VERSION = '2.0'
|
_RPC_ENVELOPE_VERSION = '2.0'
|
||||||
|
|
||||||
_VERSION_KEY = 'quantum.version'
|
_VERSION_KEY = 'oslo.version'
|
||||||
_MESSAGE_KEY = 'quantum.message'
|
_MESSAGE_KEY = 'oslo.message'
|
||||||
|
|
||||||
|
|
||||||
# TODO(russellb) Turn this on after Grizzly.
|
# TODO(russellb) Turn this on after Grizzly.
|
||||||
@ -125,6 +125,10 @@ class Timeout(RPCException):
|
|||||||
message = _("Timeout while waiting on RPC response.")
|
message = _("Timeout while waiting on RPC response.")
|
||||||
|
|
||||||
|
|
||||||
|
class DuplicateMessageError(RPCException):
|
||||||
|
message = _("Found duplicate message(%(msg_id)s). Skipping it.")
|
||||||
|
|
||||||
|
|
||||||
class InvalidRPCConnectionReuse(RPCException):
|
class InvalidRPCConnectionReuse(RPCException):
|
||||||
message = _("Invalid reuse of an RPC connection.")
|
message = _("Invalid reuse of an RPC connection.")
|
||||||
|
|
||||||
|
@ -198,6 +198,7 @@ class DirectConsumer(ConsumerBase):
|
|||||||
"""
|
"""
|
||||||
# Default options
|
# Default options
|
||||||
options = {'durable': False,
|
options = {'durable': False,
|
||||||
|
'queue_arguments': _get_queue_arguments(conf),
|
||||||
'auto_delete': True,
|
'auto_delete': True,
|
||||||
'exclusive': False}
|
'exclusive': False}
|
||||||
options.update(kwargs)
|
options.update(kwargs)
|
||||||
|
@ -216,12 +216,18 @@ class ZmqClient(object):
|
|||||||
socket_type = zmq.PUSH
|
socket_type = zmq.PUSH
|
||||||
self.outq = ZmqSocket(addr, socket_type, bind=bind)
|
self.outq = ZmqSocket(addr, socket_type, bind=bind)
|
||||||
|
|
||||||
def cast(self, msg_id, topic, data, serialize=True, force_envelope=False):
|
def cast(self, msg_id, topic, data, envelope=False):
|
||||||
msg_id = msg_id or 0
|
msg_id = msg_id or 0
|
||||||
|
|
||||||
if serialize:
|
if not (envelope or rpc_common._SEND_RPC_ENVELOPE):
|
||||||
data = rpc_common.serialize_msg(data, force_envelope)
|
self.outq.send(map(bytes,
|
||||||
self.outq.send(map(bytes, (msg_id, topic, 'cast', _serialize(data))))
|
(msg_id, topic, 'cast', _serialize(data))))
|
||||||
|
return
|
||||||
|
|
||||||
|
rpc_envelope = rpc_common.serialize_msg(data[1], envelope)
|
||||||
|
zmq_msg = reduce(lambda x, y: x + y, rpc_envelope.items())
|
||||||
|
self.outq.send(map(bytes,
|
||||||
|
(msg_id, topic, 'impl_zmq_v2', data[0]) + zmq_msg))
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.outq.close()
|
self.outq.close()
|
||||||
@ -320,7 +326,7 @@ class ConsumerBase(object):
|
|||||||
else:
|
else:
|
||||||
return [result]
|
return [result]
|
||||||
|
|
||||||
def process(self, style, target, proxy, ctx, data):
|
def process(self, proxy, ctx, data):
|
||||||
data.setdefault('version', None)
|
data.setdefault('version', None)
|
||||||
data.setdefault('args', {})
|
data.setdefault('args', {})
|
||||||
|
|
||||||
@ -432,12 +438,14 @@ class ZmqProxy(ZmqBaseReactor):
|
|||||||
|
|
||||||
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
|
#TODO(ewindisch): use zero-copy (i.e. references, not copying)
|
||||||
data = sock.recv()
|
data = sock.recv()
|
||||||
msg_id, topic, style, in_msg = data
|
topic = data[1]
|
||||||
topic = topic.split('.', 1)[0]
|
|
||||||
|
|
||||||
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
|
LOG.debug(_("CONSUMER GOT %s"), ' '.join(map(pformat, data)))
|
||||||
|
|
||||||
if topic.startswith('fanout~') or topic.startswith('zmq_replies'):
|
if topic.startswith('fanout~'):
|
||||||
|
sock_type = zmq.PUB
|
||||||
|
topic = topic.split('.', 1)[0]
|
||||||
|
elif topic.startswith('zmq_replies'):
|
||||||
sock_type = zmq.PUB
|
sock_type = zmq.PUB
|
||||||
else:
|
else:
|
||||||
sock_type = zmq.PUSH
|
sock_type = zmq.PUSH
|
||||||
@ -520,6 +528,21 @@ class ZmqProxy(ZmqBaseReactor):
|
|||||||
super(ZmqProxy, self).consume_in_thread()
|
super(ZmqProxy, self).consume_in_thread()
|
||||||
|
|
||||||
|
|
||||||
|
def unflatten_envelope(packenv):
|
||||||
|
"""Unflattens the RPC envelope.
|
||||||
|
Takes a list and returns a dictionary.
|
||||||
|
i.e. [1,2,3,4] => {1: 2, 3: 4}
|
||||||
|
"""
|
||||||
|
i = iter(packenv)
|
||||||
|
h = {}
|
||||||
|
try:
|
||||||
|
while True:
|
||||||
|
k = i.next()
|
||||||
|
h[k] = i.next()
|
||||||
|
except StopIteration:
|
||||||
|
return h
|
||||||
|
|
||||||
|
|
||||||
class ZmqReactor(ZmqBaseReactor):
|
class ZmqReactor(ZmqBaseReactor):
|
||||||
"""
|
"""
|
||||||
A consumer class implementing a
|
A consumer class implementing a
|
||||||
@ -540,38 +563,50 @@ class ZmqReactor(ZmqBaseReactor):
|
|||||||
self.mapping[sock].send(data)
|
self.mapping[sock].send(data)
|
||||||
return
|
return
|
||||||
|
|
||||||
msg_id, topic, style, in_msg = data
|
|
||||||
|
|
||||||
ctx, request = rpc_common.deserialize_msg(_deserialize(in_msg))
|
|
||||||
ctx = RpcContext.unmarshal(ctx)
|
|
||||||
|
|
||||||
proxy = self.proxies[sock]
|
proxy = self.proxies[sock]
|
||||||
|
|
||||||
self.pool.spawn_n(self.process, style, topic,
|
if data[2] == 'cast': # Legacy protocol
|
||||||
proxy, ctx, request)
|
packenv = data[3]
|
||||||
|
|
||||||
|
ctx, msg = _deserialize(packenv)
|
||||||
|
request = rpc_common.deserialize_msg(msg)
|
||||||
|
ctx = RpcContext.unmarshal(ctx)
|
||||||
|
elif data[2] == 'impl_zmq_v2':
|
||||||
|
packenv = data[4:]
|
||||||
|
|
||||||
|
msg = unflatten_envelope(packenv)
|
||||||
|
request = rpc_common.deserialize_msg(msg)
|
||||||
|
|
||||||
|
# Unmarshal only after verifying the message.
|
||||||
|
ctx = RpcContext.unmarshal(data[3])
|
||||||
|
else:
|
||||||
|
LOG.error(_("ZMQ Envelope version unsupported or unknown."))
|
||||||
|
return
|
||||||
|
|
||||||
|
self.pool.spawn_n(self.process, proxy, ctx, request)
|
||||||
|
|
||||||
|
|
||||||
class Connection(rpc_common.Connection):
|
class Connection(rpc_common.Connection):
|
||||||
"""Manages connections and threads."""
|
"""Manages connections and threads."""
|
||||||
|
|
||||||
def __init__(self, conf):
|
def __init__(self, conf):
|
||||||
|
self.topics = []
|
||||||
self.reactor = ZmqReactor(conf)
|
self.reactor = ZmqReactor(conf)
|
||||||
|
|
||||||
def create_consumer(self, topic, proxy, fanout=False):
|
def create_consumer(self, topic, proxy, fanout=False):
|
||||||
# Only consume on the base topic name.
|
|
||||||
topic = topic.split('.', 1)[0]
|
|
||||||
|
|
||||||
LOG.info(_("Create Consumer for topic (%(topic)s)") %
|
|
||||||
{'topic': topic})
|
|
||||||
|
|
||||||
# Subscription scenarios
|
# Subscription scenarios
|
||||||
if fanout:
|
if fanout:
|
||||||
subscribe = ('', fanout)[type(fanout) == str]
|
|
||||||
sock_type = zmq.SUB
|
sock_type = zmq.SUB
|
||||||
topic = 'fanout~' + topic
|
subscribe = ('', fanout)[type(fanout) == str]
|
||||||
|
topic = 'fanout~' + topic.split('.', 1)[0]
|
||||||
else:
|
else:
|
||||||
sock_type = zmq.PULL
|
sock_type = zmq.PULL
|
||||||
subscribe = None
|
subscribe = None
|
||||||
|
topic = '.'.join((topic.split('.', 1)[0], CONF.rpc_zmq_host))
|
||||||
|
|
||||||
|
if topic in self.topics:
|
||||||
|
LOG.info(_("Skipping topic registration. Already registered."))
|
||||||
|
return
|
||||||
|
|
||||||
# Receive messages from (local) proxy
|
# Receive messages from (local) proxy
|
||||||
inaddr = "ipc://%s/zmq_topic_%s" % \
|
inaddr = "ipc://%s/zmq_topic_%s" % \
|
||||||
@ -582,9 +617,11 @@ class Connection(rpc_common.Connection):
|
|||||||
|
|
||||||
self.reactor.register(proxy, inaddr, sock_type,
|
self.reactor.register(proxy, inaddr, sock_type,
|
||||||
subscribe=subscribe, in_bind=False)
|
subscribe=subscribe, in_bind=False)
|
||||||
|
self.topics.append(topic)
|
||||||
|
|
||||||
def close(self):
|
def close(self):
|
||||||
self.reactor.close()
|
self.reactor.close()
|
||||||
|
self.topics = []
|
||||||
|
|
||||||
def wait(self):
|
def wait(self):
|
||||||
self.reactor.wait()
|
self.reactor.wait()
|
||||||
@ -593,8 +630,8 @@ class Connection(rpc_common.Connection):
|
|||||||
self.reactor.consume_in_thread()
|
self.reactor.consume_in_thread()
|
||||||
|
|
||||||
|
|
||||||
def _cast(addr, context, topic, msg, timeout=None, serialize=True,
|
def _cast(addr, context, topic, msg, timeout=None, envelope=False,
|
||||||
force_envelope=False, _msg_id=None):
|
_msg_id=None):
|
||||||
timeout_cast = timeout or CONF.rpc_cast_timeout
|
timeout_cast = timeout or CONF.rpc_cast_timeout
|
||||||
payload = [RpcContext.marshal(context), msg]
|
payload = [RpcContext.marshal(context), msg]
|
||||||
|
|
||||||
@ -603,7 +640,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
|
|||||||
conn = ZmqClient(addr)
|
conn = ZmqClient(addr)
|
||||||
|
|
||||||
# assumes cast can't return an exception
|
# assumes cast can't return an exception
|
||||||
conn.cast(_msg_id, topic, payload, serialize, force_envelope)
|
conn.cast(_msg_id, topic, payload, envelope)
|
||||||
except zmq.ZMQError:
|
except zmq.ZMQError:
|
||||||
raise RPCException("Cast failed. ZMQ Socket Exception")
|
raise RPCException("Cast failed. ZMQ Socket Exception")
|
||||||
finally:
|
finally:
|
||||||
@ -612,7 +649,7 @@ def _cast(addr, context, topic, msg, timeout=None, serialize=True,
|
|||||||
|
|
||||||
|
|
||||||
def _call(addr, context, topic, msg, timeout=None,
|
def _call(addr, context, topic, msg, timeout=None,
|
||||||
serialize=True, force_envelope=False):
|
envelope=False):
|
||||||
# timeout_response is how long we wait for a response
|
# timeout_response is how long we wait for a response
|
||||||
timeout = timeout or CONF.rpc_response_timeout
|
timeout = timeout or CONF.rpc_response_timeout
|
||||||
|
|
||||||
@ -642,20 +679,31 @@ def _call(addr, context, topic, msg, timeout=None,
|
|||||||
with Timeout(timeout, exception=rpc_common.Timeout):
|
with Timeout(timeout, exception=rpc_common.Timeout):
|
||||||
try:
|
try:
|
||||||
msg_waiter = ZmqSocket(
|
msg_waiter = ZmqSocket(
|
||||||
"ipc://%s/zmq_topic_zmq_replies" % CONF.rpc_zmq_ipc_dir,
|
"ipc://%s/zmq_topic_zmq_replies.%s" %
|
||||||
|
(CONF.rpc_zmq_ipc_dir,
|
||||||
|
CONF.rpc_zmq_host),
|
||||||
zmq.SUB, subscribe=msg_id, bind=False
|
zmq.SUB, subscribe=msg_id, bind=False
|
||||||
)
|
)
|
||||||
|
|
||||||
LOG.debug(_("Sending cast"))
|
LOG.debug(_("Sending cast"))
|
||||||
_cast(addr, context, topic, payload,
|
_cast(addr, context, topic, payload, envelope)
|
||||||
serialize=serialize, force_envelope=force_envelope)
|
|
||||||
|
|
||||||
LOG.debug(_("Cast sent; Waiting reply"))
|
LOG.debug(_("Cast sent; Waiting reply"))
|
||||||
# Blocks until receives reply
|
# Blocks until receives reply
|
||||||
msg = msg_waiter.recv()
|
msg = msg_waiter.recv()
|
||||||
LOG.debug(_("Received message: %s"), msg)
|
LOG.debug(_("Received message: %s"), msg)
|
||||||
LOG.debug(_("Unpacking response"))
|
LOG.debug(_("Unpacking response"))
|
||||||
responses = _deserialize(msg[-1])[-1]['args']['response']
|
|
||||||
|
if msg[2] == 'cast': # Legacy version
|
||||||
|
raw_msg = _deserialize(msg[-1])[-1]
|
||||||
|
elif msg[2] == 'impl_zmq_v2':
|
||||||
|
rpc_envelope = unflatten_envelope(msg[4:])
|
||||||
|
raw_msg = rpc_common.deserialize_msg(rpc_envelope)
|
||||||
|
else:
|
||||||
|
raise rpc_common.UnsupportedRpcEnvelopeVersion(
|
||||||
|
_("Unsupported or unknown ZMQ envelope returned."))
|
||||||
|
|
||||||
|
responses = raw_msg['args']['response']
|
||||||
# ZMQError trumps the Timeout error.
|
# ZMQError trumps the Timeout error.
|
||||||
except zmq.ZMQError:
|
except zmq.ZMQError:
|
||||||
raise RPCException("ZMQ Socket Error")
|
raise RPCException("ZMQ Socket Error")
|
||||||
@ -676,8 +724,8 @@ def _call(addr, context, topic, msg, timeout=None,
|
|||||||
return responses[-1]
|
return responses[-1]
|
||||||
|
|
||||||
|
|
||||||
def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
|
def _multi_send(method, context, topic, msg, timeout=None,
|
||||||
force_envelope=False, _msg_id=None):
|
envelope=False, _msg_id=None):
|
||||||
"""
|
"""
|
||||||
Wraps the sending of messages,
|
Wraps the sending of messages,
|
||||||
dispatches to the matchmaker and sends
|
dispatches to the matchmaker and sends
|
||||||
@ -703,11 +751,11 @@ def _multi_send(method, context, topic, msg, timeout=None, serialize=True,
|
|||||||
|
|
||||||
if method.__name__ == '_cast':
|
if method.__name__ == '_cast':
|
||||||
eventlet.spawn_n(method, _addr, context,
|
eventlet.spawn_n(method, _addr, context,
|
||||||
_topic, msg, timeout, serialize,
|
_topic, msg, timeout, envelope,
|
||||||
force_envelope, _msg_id)
|
_msg_id)
|
||||||
return
|
return
|
||||||
return method(_addr, context, _topic, msg, timeout,
|
return method(_addr, context, _topic, msg, timeout,
|
||||||
serialize, force_envelope)
|
envelope)
|
||||||
|
|
||||||
|
|
||||||
def create_connection(conf, new=True):
|
def create_connection(conf, new=True):
|
||||||
@ -746,8 +794,7 @@ def notify(conf, context, topic, msg, **kwargs):
|
|||||||
# NOTE(ewindisch): dot-priority in rpc notifier does not
|
# NOTE(ewindisch): dot-priority in rpc notifier does not
|
||||||
# work with our assumptions.
|
# work with our assumptions.
|
||||||
topic.replace('.', '-')
|
topic.replace('.', '-')
|
||||||
kwargs['serialize'] = kwargs.pop('envelope')
|
kwargs['envelope'] = kwargs.get('envelope', True)
|
||||||
kwargs['force_envelope'] = True
|
|
||||||
cast(conf, context, topic, msg, **kwargs)
|
cast(conf, context, topic, msg, **kwargs)
|
||||||
|
|
||||||
|
|
||||||
|
@ -117,9 +117,9 @@ def _run_shell_command(cmd, throw_on_error=False):
|
|||||||
output = subprocess.Popen(["/bin/sh", "-c", cmd],
|
output = subprocess.Popen(["/bin/sh", "-c", cmd],
|
||||||
stdout=subprocess.PIPE,
|
stdout=subprocess.PIPE,
|
||||||
stderr=subprocess.PIPE)
|
stderr=subprocess.PIPE)
|
||||||
|
out = output.communicate()
|
||||||
if output.returncode and throw_on_error:
|
if output.returncode and throw_on_error:
|
||||||
raise Exception("%s returned %d" % cmd, output.returncode)
|
raise Exception("%s returned %d" % cmd, output.returncode)
|
||||||
out = output.communicate()
|
|
||||||
if len(out) == 0:
|
if len(out) == 0:
|
||||||
return None
|
return None
|
||||||
if len(out[0].strip()) == 0:
|
if len(out[0].strip()) == 0:
|
||||||
@ -131,7 +131,7 @@ def write_git_changelog():
|
|||||||
"""Write a changelog based on the git changelog."""
|
"""Write a changelog based on the git changelog."""
|
||||||
new_changelog = 'ChangeLog'
|
new_changelog = 'ChangeLog'
|
||||||
if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'):
|
if not os.getenv('SKIP_WRITE_GIT_CHANGELOG'):
|
||||||
if os.path.isdir('.git'):
|
if os.path.exists('.git'):
|
||||||
git_log_cmd = 'git log --stat'
|
git_log_cmd = 'git log --stat'
|
||||||
changelog = _run_shell_command(git_log_cmd)
|
changelog = _run_shell_command(git_log_cmd)
|
||||||
mailmap = parse_mailmap()
|
mailmap = parse_mailmap()
|
||||||
@ -147,7 +147,7 @@ def generate_authors():
|
|||||||
old_authors = 'AUTHORS.in'
|
old_authors = 'AUTHORS.in'
|
||||||
new_authors = 'AUTHORS'
|
new_authors = 'AUTHORS'
|
||||||
if not os.getenv('SKIP_GENERATE_AUTHORS'):
|
if not os.getenv('SKIP_GENERATE_AUTHORS'):
|
||||||
if os.path.isdir('.git'):
|
if os.path.exists('.git'):
|
||||||
# don't include jenkins email address in AUTHORS file
|
# don't include jenkins email address in AUTHORS file
|
||||||
git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | "
|
git_log_cmd = ("git log --format='%aN <%aE>' | sort -u | "
|
||||||
"egrep -v '" + jenkins_email + "'")
|
"egrep -v '" + jenkins_email + "'")
|
||||||
@ -279,7 +279,7 @@ def _get_version_from_git(pre_version):
|
|||||||
revision if there is one, or tag plus number of additional revisions
|
revision if there is one, or tag plus number of additional revisions
|
||||||
if the current revision has no tag."""
|
if the current revision has no tag."""
|
||||||
|
|
||||||
if os.path.isdir('.git'):
|
if os.path.exists('.git'):
|
||||||
if pre_version:
|
if pre_version:
|
||||||
try:
|
try:
|
||||||
return _run_shell_command(
|
return _run_shell_command(
|
||||||
|
Loading…
Reference in New Issue
Block a user