Merge "Update oslo rpc libraries"
This commit is contained in:
commit
81f0ccc2c9
53
bin/quantum-rpc-zmq-receiver
Executable file
53
bin/quantum-rpc-zmq-receiver
Executable file
@ -0,0 +1,53 @@
|
||||
#!/usr/bin/env python
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2011 OpenStack LLC
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import eventlet
|
||||
eventlet.monkey_patch()
|
||||
|
||||
import contextlib
|
||||
import os
|
||||
import sys
|
||||
|
||||
# If ../quantum/__init__.py exists, add ../ to Python search path, so that
|
||||
# it will override what happens to be installed in /usr/(local/)lib/python...
|
||||
POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]),
|
||||
os.pardir,
|
||||
os.pardir))
|
||||
if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'quantum', '__init__.py')):
|
||||
sys.path.insert(0, POSSIBLE_TOPDIR)
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common import rpc
|
||||
from quantum.openstack.common.rpc import impl_zmq
|
||||
|
||||
CONF = cfg.CONF
|
||||
CONF.register_opts(rpc.rpc_opts)
|
||||
CONF.register_opts(impl_zmq.zmq_opts)
|
||||
|
||||
|
||||
def main():
|
||||
CONF(sys.argv[1:], project='quantum')
|
||||
logging.setup("quantum")
|
||||
|
||||
with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor:
|
||||
reactor.consume_in_thread()
|
||||
reactor.wait()
|
||||
|
||||
if __name__ == '__main__':
|
||||
main()
|
@ -32,19 +32,20 @@ import uuid
|
||||
|
||||
from eventlet import greenpool
|
||||
from eventlet import pools
|
||||
from eventlet import semaphore
|
||||
from eventlet import queue
|
||||
|
||||
from eventlet import semaphore
|
||||
# TODO(pekowsk): Remove import cfg and below comment in Havana.
|
||||
# This import should no longer be needed when the amqp_rpc_single_reply_queue
|
||||
# option is removed.
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.openstack.common import excutils
|
||||
from quantum.openstack.common.gettextutils import _
|
||||
from quantum.openstack.common import local
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common.rpc import common as rpc_common
|
||||
|
||||
|
||||
# TODO(pekowski): Remove this option in Havana.
|
||||
amqp_opts = [
|
||||
cfg.BoolOpt('amqp_rpc_single_reply_queue',
|
||||
|
@ -624,8 +624,8 @@ class Connection(object):
|
||||
|
||||
def _error_callback(exc):
|
||||
if isinstance(exc, socket.timeout):
|
||||
LOG.exception(_('Timed out waiting for RPC response: %s') %
|
||||
str(exc))
|
||||
LOG.debug(_('Timed out waiting for RPC response: %s') %
|
||||
str(exc))
|
||||
raise rpc_common.Timeout()
|
||||
else:
|
||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||
|
@ -415,8 +415,8 @@ class Connection(object):
|
||||
|
||||
def _error_callback(exc):
|
||||
if isinstance(exc, qpid_exceptions.Empty):
|
||||
LOG.exception(_('Timed out waiting for RPC response: %s') %
|
||||
str(exc))
|
||||
LOG.debug(_('Timed out waiting for RPC response: %s') %
|
||||
str(exc))
|
||||
raise rpc_common.Timeout()
|
||||
else:
|
||||
LOG.exception(_('Failed to consume message from queue: %s') %
|
||||
|
@ -25,6 +25,7 @@ import eventlet
|
||||
import greenlet
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.openstack.common import excutils
|
||||
from quantum.openstack.common.gettextutils import _
|
||||
from quantum.openstack.common import importutils
|
||||
from quantum.openstack.common import jsonutils
|
||||
@ -91,8 +92,8 @@ def _serialize(data):
|
||||
try:
|
||||
return jsonutils.dumps(data, ensure_ascii=True)
|
||||
except TypeError:
|
||||
LOG.error(_("JSON serialization failed."))
|
||||
raise
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_("JSON serialization failed."))
|
||||
|
||||
|
||||
def _deserialize(data):
|
||||
@ -511,9 +512,9 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
ipc_dir, run_as_root=True)
|
||||
utils.execute('chmod', '750', ipc_dir, run_as_root=True)
|
||||
except utils.ProcessExecutionError:
|
||||
LOG.error(_("Could not create IPC directory %s") %
|
||||
(ipc_dir, ))
|
||||
raise
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_("Could not create IPC directory %s") %
|
||||
(ipc_dir, ))
|
||||
|
||||
try:
|
||||
self.register(consumption_proxy,
|
||||
@ -521,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor):
|
||||
zmq.PULL,
|
||||
out_bind=True)
|
||||
except zmq.ZMQError:
|
||||
LOG.error(_("Could not create ZeroMQ receiver daemon. "
|
||||
"Socket may already be in use."))
|
||||
raise
|
||||
with excutils.save_and_reraise_exception():
|
||||
LOG.error(_("Could not create ZeroMQ receiver daemon. "
|
||||
"Socket may already be in use."))
|
||||
|
||||
super(ZmqProxy, self).consume_in_thread()
|
||||
|
||||
@ -594,6 +595,9 @@ class Connection(rpc_common.Connection):
|
||||
self.reactor = ZmqReactor(conf)
|
||||
|
||||
def create_consumer(self, topic, proxy, fanout=False):
|
||||
# Register with matchmaker.
|
||||
_get_matchmaker().register(topic, CONF.rpc_zmq_host)
|
||||
|
||||
# Subscription scenarios
|
||||
if fanout:
|
||||
sock_type = zmq.SUB
|
||||
@ -620,6 +624,10 @@ class Connection(rpc_common.Connection):
|
||||
self.topics.append(topic)
|
||||
|
||||
def close(self):
|
||||
_get_matchmaker().stop_heartbeat()
|
||||
for topic in self.topics:
|
||||
_get_matchmaker().unregister(topic, CONF.rpc_zmq_host)
|
||||
|
||||
self.reactor.close()
|
||||
self.topics = []
|
||||
|
||||
@ -627,6 +635,7 @@ class Connection(rpc_common.Connection):
|
||||
self.reactor.wait()
|
||||
|
||||
def consume_in_thread(self):
|
||||
_get_matchmaker().start_heartbeat()
|
||||
self.reactor.consume_in_thread()
|
||||
|
||||
|
||||
@ -742,7 +751,7 @@ def _multi_send(method, context, topic, msg, timeout=None,
|
||||
LOG.warn(_("No matchmaker results. Not casting."))
|
||||
# While not strictly a timeout, callers know how to handle
|
||||
# this exception and a timeout isn't too big a lie.
|
||||
raise rpc_common.Timeout, "No match from matchmaker."
|
||||
raise rpc_common.Timeout(_("No match from matchmaker."))
|
||||
|
||||
# This supports brokerless fanout (addresses > 1)
|
||||
for queue in queues:
|
||||
@ -785,7 +794,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs):
|
||||
_multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs)
|
||||
|
||||
|
||||
def notify(conf, context, topic, msg, **kwargs):
|
||||
def notify(conf, context, topic, msg, envelope):
|
||||
"""
|
||||
Send notification event.
|
||||
Notifications are sent to topic-priority.
|
||||
@ -793,9 +802,8 @@ def notify(conf, context, topic, msg, **kwargs):
|
||||
"""
|
||||
# NOTE(ewindisch): dot-priority in rpc notifier does not
|
||||
# work with our assumptions.
|
||||
topic.replace('.', '-')
|
||||
kwargs['envelope'] = kwargs.get('envelope', True)
|
||||
cast(conf, context, topic, msg, **kwargs)
|
||||
topic = topic.replace('.', '-')
|
||||
cast(conf, context, topic, msg, envelope=envelope)
|
||||
|
||||
|
||||
def cleanup():
|
||||
|
@ -22,6 +22,7 @@ import contextlib
|
||||
import itertools
|
||||
import json
|
||||
|
||||
import eventlet
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.openstack.common.gettextutils import _
|
||||
@ -33,6 +34,12 @@ matchmaker_opts = [
|
||||
cfg.StrOpt('matchmaker_ringfile',
|
||||
default='/etc/nova/matchmaker_ring.json',
|
||||
help='Matchmaker ring file (JSON)'),
|
||||
cfg.IntOpt('matchmaker_heartbeat_freq',
|
||||
default='300',
|
||||
help='Heartbeat frequency'),
|
||||
cfg.IntOpt('matchmaker_heartbeat_ttl',
|
||||
default='600',
|
||||
help='Heartbeat time-to-live.'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
@ -70,12 +77,73 @@ class Binding(object):
|
||||
|
||||
|
||||
class MatchMakerBase(object):
|
||||
"""Match Maker Base Class."""
|
||||
|
||||
"""
|
||||
Match Maker Base Class.
|
||||
Build off HeartbeatMatchMakerBase if building a
|
||||
heartbeat-capable MatchMaker.
|
||||
"""
|
||||
def __init__(self):
|
||||
# Array of tuples. Index [2] toggles negation, [3] is last-if-true
|
||||
self.bindings = []
|
||||
|
||||
self.no_heartbeat_msg = _('Matchmaker does not implement '
|
||||
'registration or heartbeat.')
|
||||
|
||||
def register(self, key, host):
|
||||
"""
|
||||
Register a host on a backend.
|
||||
Heartbeats, if applicable, may keepalive registration.
|
||||
"""
|
||||
pass
|
||||
|
||||
def ack_alive(self, key, host):
|
||||
"""
|
||||
Acknowledge that a key.host is alive.
|
||||
Used internally for updating heartbeats,
|
||||
but may also be used publically to acknowledge
|
||||
a system is alive (i.e. rpc message successfully
|
||||
sent to host)
|
||||
"""
|
||||
pass
|
||||
|
||||
def is_alive(self, topic, host):
|
||||
"""
|
||||
Checks if a host is alive.
|
||||
"""
|
||||
pass
|
||||
|
||||
def expire(self, topic, host):
|
||||
"""
|
||||
Explicitly expire a host's registration.
|
||||
"""
|
||||
pass
|
||||
|
||||
def send_heartbeats(self):
|
||||
"""
|
||||
Send all heartbeats.
|
||||
Use start_heartbeat to spawn a heartbeat greenthread,
|
||||
which loops this method.
|
||||
"""
|
||||
pass
|
||||
|
||||
def unregister(self, key, host):
|
||||
"""
|
||||
Unregister a topic.
|
||||
"""
|
||||
pass
|
||||
|
||||
def start_heartbeat(self):
|
||||
"""
|
||||
Spawn heartbeat greenthread.
|
||||
"""
|
||||
pass
|
||||
|
||||
def stop_heartbeat(self):
|
||||
"""
|
||||
Destroys the heartbeat greenthread.
|
||||
"""
|
||||
pass
|
||||
|
||||
def add_binding(self, binding, rule, last=True):
|
||||
self.bindings.append((binding, rule, False, last))
|
||||
|
||||
@ -99,6 +167,103 @@ class MatchMakerBase(object):
|
||||
return workers
|
||||
|
||||
|
||||
class HeartbeatMatchMakerBase(MatchMakerBase):
|
||||
"""
|
||||
Base for a heart-beat capable MatchMaker.
|
||||
Provides common methods for registering,
|
||||
unregistering, and maintaining heartbeats.
|
||||
"""
|
||||
def __init__(self):
|
||||
self.hosts = set()
|
||||
self._heart = None
|
||||
self.host_topic = {}
|
||||
|
||||
super(HeartbeatMatchMakerBase, self).__init__()
|
||||
|
||||
def send_heartbeats(self):
|
||||
"""
|
||||
Send all heartbeats.
|
||||
Use start_heartbeat to spawn a heartbeat greenthread,
|
||||
which loops this method.
|
||||
"""
|
||||
for key, host in self.host_topic:
|
||||
self.ack_alive(key, host)
|
||||
|
||||
def ack_alive(self, key, host):
|
||||
"""
|
||||
Acknowledge that a host.topic is alive.
|
||||
Used internally for updating heartbeats,
|
||||
but may also be used publically to acknowledge
|
||||
a system is alive (i.e. rpc message successfully
|
||||
sent to host)
|
||||
"""
|
||||
raise NotImplementedError("Must implement ack_alive")
|
||||
|
||||
def backend_register(self, key, host):
|
||||
"""
|
||||
Implements registration logic.
|
||||
Called by register(self,key,host)
|
||||
"""
|
||||
raise NotImplementedError("Must implement backend_register")
|
||||
|
||||
def backend_unregister(self, key, key_host):
|
||||
"""
|
||||
Implements de-registration logic.
|
||||
Called by unregister(self,key,host)
|
||||
"""
|
||||
raise NotImplementedError("Must implement backend_unregister")
|
||||
|
||||
def register(self, key, host):
|
||||
"""
|
||||
Register a host on a backend.
|
||||
Heartbeats, if applicable, may keepalive registration.
|
||||
"""
|
||||
self.hosts.add(host)
|
||||
self.host_topic[(key, host)] = host
|
||||
key_host = '.'.join((key, host))
|
||||
|
||||
self.backend_register(key, key_host)
|
||||
|
||||
self.ack_alive(key, host)
|
||||
|
||||
def unregister(self, key, host):
|
||||
"""
|
||||
Unregister a topic.
|
||||
"""
|
||||
if (key, host) in self.host_topic:
|
||||
del self.host_topic[(key, host)]
|
||||
|
||||
self.hosts.discard(host)
|
||||
self.backend_unregister(key, '.'.join((key, host)))
|
||||
|
||||
LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host)))
|
||||
|
||||
def start_heartbeat(self):
|
||||
"""
|
||||
Implementation of MatchMakerBase.start_heartbeat
|
||||
Launches greenthread looping send_heartbeats(),
|
||||
yielding for CONF.matchmaker_heartbeat_freq seconds
|
||||
between iterations.
|
||||
"""
|
||||
if len(self.hosts) == 0:
|
||||
raise MatchMakerException(
|
||||
_("Register before starting heartbeat."))
|
||||
|
||||
def do_heartbeat():
|
||||
while True:
|
||||
self.send_heartbeats()
|
||||
eventlet.sleep(CONF.matchmaker_heartbeat_freq)
|
||||
|
||||
self._heart = eventlet.spawn(do_heartbeat)
|
||||
|
||||
def stop_heartbeat(self):
|
||||
"""
|
||||
Destroys the heartbeat greenthread.
|
||||
"""
|
||||
if self._heart:
|
||||
self._heart.kill()
|
||||
|
||||
|
||||
class DirectBinding(Binding):
|
||||
"""
|
||||
Specifies a host in the key via a '.' character
|
||||
|
149
quantum/openstack/common/rpc/matchmaker_redis.py
Normal file
149
quantum/openstack/common/rpc/matchmaker_redis.py
Normal file
@ -0,0 +1,149 @@
|
||||
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
||||
|
||||
# Copyright 2013 Cloudscaling Group, Inc
|
||||
#
|
||||
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
||||
# not use this file except in compliance with the License. You may obtain
|
||||
# a copy of the License at
|
||||
#
|
||||
# http://www.apache.org/licenses/LICENSE-2.0
|
||||
#
|
||||
# Unless required by applicable law or agreed to in writing, software
|
||||
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
"""
|
||||
The MatchMaker classes should accept a Topic or Fanout exchange key and
|
||||
return keys for direct exchanges, per (approximate) AMQP parlance.
|
||||
"""
|
||||
|
||||
from oslo.config import cfg
|
||||
|
||||
from quantum.openstack.common import importutils
|
||||
from quantum.openstack.common import log as logging
|
||||
from quantum.openstack.common.rpc import matchmaker as mm_common
|
||||
|
||||
redis = importutils.try_import('redis')
|
||||
|
||||
|
||||
matchmaker_redis_opts = [
|
||||
cfg.StrOpt('host',
|
||||
default='127.0.0.1',
|
||||
help='Host to locate redis'),
|
||||
cfg.IntOpt('port',
|
||||
default=6379,
|
||||
help='Use this port to connect to redis host.'),
|
||||
cfg.StrOpt('password',
|
||||
default=None,
|
||||
help='Password for Redis server. (optional)'),
|
||||
]
|
||||
|
||||
CONF = cfg.CONF
|
||||
opt_group = cfg.OptGroup(name='matchmaker_redis',
|
||||
title='Options for Redis-based MatchMaker')
|
||||
CONF.register_group(opt_group)
|
||||
CONF.register_opts(matchmaker_redis_opts, opt_group)
|
||||
LOG = logging.getLogger(__name__)
|
||||
|
||||
|
||||
class RedisExchange(mm_common.Exchange):
|
||||
def __init__(self, matchmaker):
|
||||
self.matchmaker = matchmaker
|
||||
self.redis = matchmaker.redis
|
||||
super(RedisExchange, self).__init__()
|
||||
|
||||
|
||||
class RedisTopicExchange(RedisExchange):
|
||||
"""
|
||||
Exchange where all topic keys are split, sending to second half.
|
||||
i.e. "compute.host" sends a message to "compute" running on "host"
|
||||
"""
|
||||
def run(self, topic):
|
||||
while True:
|
||||
member_name = self.redis.srandmember(topic)
|
||||
|
||||
if not member_name:
|
||||
# If this happens, there are no
|
||||
# longer any members.
|
||||
break
|
||||
|
||||
if not self.matchmaker.is_alive(topic, member_name):
|
||||
continue
|
||||
|
||||
host = member_name.split('.', 1)[1]
|
||||
return [(member_name, host)]
|
||||
return []
|
||||
|
||||
|
||||
class RedisFanoutExchange(RedisExchange):
|
||||
"""
|
||||
Return a list of all hosts.
|
||||
"""
|
||||
def run(self, topic):
|
||||
topic = topic.split('~', 1)[1]
|
||||
hosts = self.redis.smembers(topic)
|
||||
good_hosts = filter(
|
||||
lambda host: self.matchmaker.is_alive(topic, host), hosts)
|
||||
|
||||
return [(x, x.split('.', 1)[1]) for x in good_hosts]
|
||||
|
||||
|
||||
class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase):
|
||||
"""
|
||||
MatchMaker registering and looking-up hosts with a Redis server.
|
||||
"""
|
||||
def __init__(self):
|
||||
super(MatchMakerRedis, self).__init__()
|
||||
|
||||
if not redis:
|
||||
raise ImportError("Failed to import module redis.")
|
||||
|
||||
self.redis = redis.StrictRedis(
|
||||
host=CONF.matchmaker_redis.host,
|
||||
port=CONF.matchmaker_redis.port,
|
||||
password=CONF.matchmaker_redis.password)
|
||||
|
||||
self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self))
|
||||
self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange())
|
||||
self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self))
|
||||
|
||||
def ack_alive(self, key, host):
|
||||
topic = "%s.%s" % (key, host)
|
||||
if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl):
|
||||
# If we could not update the expiration, the key
|
||||
# might have been pruned. Re-register, creating a new
|
||||
# key in Redis.
|
||||
self.register(self.topic_host[host], host)
|
||||
|
||||
def is_alive(self, topic, host):
|
||||
if self.redis.ttl(host) == -1:
|
||||
self.expire(topic, host)
|
||||
return False
|
||||
return True
|
||||
|
||||
def expire(self, topic, host):
|
||||
with self.redis.pipeline() as pipe:
|
||||
pipe.multi()
|
||||
pipe.delete(host)
|
||||
pipe.srem(topic, host)
|
||||
pipe.execute()
|
||||
|
||||
def backend_register(self, key, key_host):
|
||||
with self.redis.pipeline() as pipe:
|
||||
pipe.multi()
|
||||
pipe.sadd(key, key_host)
|
||||
|
||||
# No value is needed, we just
|
||||
# care if it exists. Sets aren't viable
|
||||
# because only keys can expire.
|
||||
pipe.set(key_host, '')
|
||||
|
||||
pipe.execute()
|
||||
|
||||
def backend_unregister(self, key, key_host):
|
||||
with self.redis.pipeline() as pipe:
|
||||
pipe.multi()
|
||||
pipe.srem(key, key_host)
|
||||
pipe.delete(key_host)
|
||||
pipe.execute()
|
Loading…
Reference in New Issue
Block a user