From b2a9876b112acda09d29c95f53848d2748041b6c Mon Sep 17 00:00:00 2001 From: Michael J Fork Date: Mon, 11 Mar 2013 13:26:59 +0000 Subject: [PATCH] Update oslo rpc libraries Update oslo rpc libraries to capture changes, primarly motivated by secret=True flag on password config options. Added bin/quantum-rpc-zmq-receiver Change-Id: I40d3ef1a85901e5b9eab803cc67cdad3cbb0719a --- bin/quantum-rpc-zmq-receiver | 53 ++++++ quantum/openstack/common/rpc/amqp.py | 5 +- quantum/openstack/common/rpc/impl_kombu.py | 4 +- quantum/openstack/common/rpc/impl_qpid.py | 4 +- quantum/openstack/common/rpc/impl_zmq.py | 34 ++-- quantum/openstack/common/rpc/matchmaker.py | 169 +++++++++++++++++- .../openstack/common/rpc/matchmaker_redis.py | 149 +++++++++++++++ 7 files changed, 397 insertions(+), 21 deletions(-) create mode 100755 bin/quantum-rpc-zmq-receiver create mode 100644 quantum/openstack/common/rpc/matchmaker_redis.py diff --git a/bin/quantum-rpc-zmq-receiver b/bin/quantum-rpc-zmq-receiver new file mode 100755 index 0000000000..d79fdb515f --- /dev/null +++ b/bin/quantum-rpc-zmq-receiver @@ -0,0 +1,53 @@ +#!/usr/bin/env python +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2011 OpenStack LLC +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. + +import eventlet +eventlet.monkey_patch() + +import contextlib +import os +import sys + +# If ../quantum/__init__.py exists, add ../ to Python search path, so that +# it will override what happens to be installed in /usr/(local/)lib/python... +POSSIBLE_TOPDIR = os.path.normpath(os.path.join(os.path.abspath(sys.argv[0]), + os.pardir, + os.pardir)) +if os.path.exists(os.path.join(POSSIBLE_TOPDIR, 'quantum', '__init__.py')): + sys.path.insert(0, POSSIBLE_TOPDIR) + +from oslo.config import cfg + +from quantum.openstack.common import log as logging +from quantum.openstack.common import rpc +from quantum.openstack.common.rpc import impl_zmq + +CONF = cfg.CONF +CONF.register_opts(rpc.rpc_opts) +CONF.register_opts(impl_zmq.zmq_opts) + + +def main(): + CONF(sys.argv[1:], project='quantum') + logging.setup("quantum") + + with contextlib.closing(impl_zmq.ZmqProxy(CONF)) as reactor: + reactor.consume_in_thread() + reactor.wait() + +if __name__ == '__main__': + main() diff --git a/quantum/openstack/common/rpc/amqp.py b/quantum/openstack/common/rpc/amqp.py index a6d5002d57..d648c4fc3d 100644 --- a/quantum/openstack/common/rpc/amqp.py +++ b/quantum/openstack/common/rpc/amqp.py @@ -32,19 +32,20 @@ import uuid from eventlet import greenpool from eventlet import pools -from eventlet import semaphore from eventlet import queue - +from eventlet import semaphore # TODO(pekowsk): Remove import cfg and below comment in Havana. # This import should no longer be needed when the amqp_rpc_single_reply_queue # option is removed. from oslo.config import cfg + from quantum.openstack.common import excutils from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import local from quantum.openstack.common import log as logging from quantum.openstack.common.rpc import common as rpc_common + # TODO(pekowski): Remove this option in Havana. amqp_opts = [ cfg.BoolOpt('amqp_rpc_single_reply_queue', diff --git a/quantum/openstack/common/rpc/impl_kombu.py b/quantum/openstack/common/rpc/impl_kombu.py index 2a7f2dd8be..cbc92bdec3 100644 --- a/quantum/openstack/common/rpc/impl_kombu.py +++ b/quantum/openstack/common/rpc/impl_kombu.py @@ -624,8 +624,8 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, socket.timeout): - LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + LOG.debug(_('Timed out waiting for RPC response: %s') % + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % diff --git a/quantum/openstack/common/rpc/impl_qpid.py b/quantum/openstack/common/rpc/impl_qpid.py index ba0920a936..db165bceb9 100644 --- a/quantum/openstack/common/rpc/impl_qpid.py +++ b/quantum/openstack/common/rpc/impl_qpid.py @@ -415,8 +415,8 @@ class Connection(object): def _error_callback(exc): if isinstance(exc, qpid_exceptions.Empty): - LOG.exception(_('Timed out waiting for RPC response: %s') % - str(exc)) + LOG.debug(_('Timed out waiting for RPC response: %s') % + str(exc)) raise rpc_common.Timeout() else: LOG.exception(_('Failed to consume message from queue: %s') % diff --git a/quantum/openstack/common/rpc/impl_zmq.py b/quantum/openstack/common/rpc/impl_zmq.py index 421457a42e..dc8a5e8675 100644 --- a/quantum/openstack/common/rpc/impl_zmq.py +++ b/quantum/openstack/common/rpc/impl_zmq.py @@ -25,6 +25,7 @@ import eventlet import greenlet from oslo.config import cfg +from quantum.openstack.common import excutils from quantum.openstack.common.gettextutils import _ from quantum.openstack.common import importutils from quantum.openstack.common import jsonutils @@ -91,8 +92,8 @@ def _serialize(data): try: return jsonutils.dumps(data, ensure_ascii=True) except TypeError: - LOG.error(_("JSON serialization failed.")) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("JSON serialization failed.")) def _deserialize(data): @@ -511,9 +512,9 @@ class ZmqProxy(ZmqBaseReactor): ipc_dir, run_as_root=True) utils.execute('chmod', '750', ipc_dir, run_as_root=True) except utils.ProcessExecutionError: - LOG.error(_("Could not create IPC directory %s") % - (ipc_dir, )) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("Could not create IPC directory %s") % + (ipc_dir, )) try: self.register(consumption_proxy, @@ -521,9 +522,9 @@ class ZmqProxy(ZmqBaseReactor): zmq.PULL, out_bind=True) except zmq.ZMQError: - LOG.error(_("Could not create ZeroMQ receiver daemon. " - "Socket may already be in use.")) - raise + with excutils.save_and_reraise_exception(): + LOG.error(_("Could not create ZeroMQ receiver daemon. " + "Socket may already be in use.")) super(ZmqProxy, self).consume_in_thread() @@ -594,6 +595,9 @@ class Connection(rpc_common.Connection): self.reactor = ZmqReactor(conf) def create_consumer(self, topic, proxy, fanout=False): + # Register with matchmaker. + _get_matchmaker().register(topic, CONF.rpc_zmq_host) + # Subscription scenarios if fanout: sock_type = zmq.SUB @@ -620,6 +624,10 @@ class Connection(rpc_common.Connection): self.topics.append(topic) def close(self): + _get_matchmaker().stop_heartbeat() + for topic in self.topics: + _get_matchmaker().unregister(topic, CONF.rpc_zmq_host) + self.reactor.close() self.topics = [] @@ -627,6 +635,7 @@ class Connection(rpc_common.Connection): self.reactor.wait() def consume_in_thread(self): + _get_matchmaker().start_heartbeat() self.reactor.consume_in_thread() @@ -742,7 +751,7 @@ def _multi_send(method, context, topic, msg, timeout=None, LOG.warn(_("No matchmaker results. Not casting.")) # While not strictly a timeout, callers know how to handle # this exception and a timeout isn't too big a lie. - raise rpc_common.Timeout, "No match from matchmaker." + raise rpc_common.Timeout(_("No match from matchmaker.")) # This supports brokerless fanout (addresses > 1) for queue in queues: @@ -785,7 +794,7 @@ def fanout_cast(conf, context, topic, msg, **kwargs): _multi_send(_cast, context, 'fanout~' + str(topic), msg, **kwargs) -def notify(conf, context, topic, msg, **kwargs): +def notify(conf, context, topic, msg, envelope): """ Send notification event. Notifications are sent to topic-priority. @@ -793,9 +802,8 @@ def notify(conf, context, topic, msg, **kwargs): """ # NOTE(ewindisch): dot-priority in rpc notifier does not # work with our assumptions. - topic.replace('.', '-') - kwargs['envelope'] = kwargs.get('envelope', True) - cast(conf, context, topic, msg, **kwargs) + topic = topic.replace('.', '-') + cast(conf, context, topic, msg, envelope=envelope) def cleanup(): diff --git a/quantum/openstack/common/rpc/matchmaker.py b/quantum/openstack/common/rpc/matchmaker.py index 874e022ab1..4fa0d98a3e 100644 --- a/quantum/openstack/common/rpc/matchmaker.py +++ b/quantum/openstack/common/rpc/matchmaker.py @@ -22,6 +22,7 @@ import contextlib import itertools import json +import eventlet from oslo.config import cfg from quantum.openstack.common.gettextutils import _ @@ -33,6 +34,12 @@ matchmaker_opts = [ cfg.StrOpt('matchmaker_ringfile', default='/etc/nova/matchmaker_ring.json', help='Matchmaker ring file (JSON)'), + cfg.IntOpt('matchmaker_heartbeat_freq', + default='300', + help='Heartbeat frequency'), + cfg.IntOpt('matchmaker_heartbeat_ttl', + default='600', + help='Heartbeat time-to-live.'), ] CONF = cfg.CONF @@ -70,12 +77,73 @@ class Binding(object): class MatchMakerBase(object): - """Match Maker Base Class.""" - + """ + Match Maker Base Class. + Build off HeartbeatMatchMakerBase if building a + heartbeat-capable MatchMaker. + """ def __init__(self): # Array of tuples. Index [2] toggles negation, [3] is last-if-true self.bindings = [] + self.no_heartbeat_msg = _('Matchmaker does not implement ' + 'registration or heartbeat.') + + def register(self, key, host): + """ + Register a host on a backend. + Heartbeats, if applicable, may keepalive registration. + """ + pass + + def ack_alive(self, key, host): + """ + Acknowledge that a key.host is alive. + Used internally for updating heartbeats, + but may also be used publically to acknowledge + a system is alive (i.e. rpc message successfully + sent to host) + """ + pass + + def is_alive(self, topic, host): + """ + Checks if a host is alive. + """ + pass + + def expire(self, topic, host): + """ + Explicitly expire a host's registration. + """ + pass + + def send_heartbeats(self): + """ + Send all heartbeats. + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + pass + + def unregister(self, key, host): + """ + Unregister a topic. + """ + pass + + def start_heartbeat(self): + """ + Spawn heartbeat greenthread. + """ + pass + + def stop_heartbeat(self): + """ + Destroys the heartbeat greenthread. + """ + pass + def add_binding(self, binding, rule, last=True): self.bindings.append((binding, rule, False, last)) @@ -99,6 +167,103 @@ class MatchMakerBase(object): return workers +class HeartbeatMatchMakerBase(MatchMakerBase): + """ + Base for a heart-beat capable MatchMaker. + Provides common methods for registering, + unregistering, and maintaining heartbeats. + """ + def __init__(self): + self.hosts = set() + self._heart = None + self.host_topic = {} + + super(HeartbeatMatchMakerBase, self).__init__() + + def send_heartbeats(self): + """ + Send all heartbeats. + Use start_heartbeat to spawn a heartbeat greenthread, + which loops this method. + """ + for key, host in self.host_topic: + self.ack_alive(key, host) + + def ack_alive(self, key, host): + """ + Acknowledge that a host.topic is alive. + Used internally for updating heartbeats, + but may also be used publically to acknowledge + a system is alive (i.e. rpc message successfully + sent to host) + """ + raise NotImplementedError("Must implement ack_alive") + + def backend_register(self, key, host): + """ + Implements registration logic. + Called by register(self,key,host) + """ + raise NotImplementedError("Must implement backend_register") + + def backend_unregister(self, key, key_host): + """ + Implements de-registration logic. + Called by unregister(self,key,host) + """ + raise NotImplementedError("Must implement backend_unregister") + + def register(self, key, host): + """ + Register a host on a backend. + Heartbeats, if applicable, may keepalive registration. + """ + self.hosts.add(host) + self.host_topic[(key, host)] = host + key_host = '.'.join((key, host)) + + self.backend_register(key, key_host) + + self.ack_alive(key, host) + + def unregister(self, key, host): + """ + Unregister a topic. + """ + if (key, host) in self.host_topic: + del self.host_topic[(key, host)] + + self.hosts.discard(host) + self.backend_unregister(key, '.'.join((key, host))) + + LOG.info(_("Matchmaker unregistered: %s, %s" % (key, host))) + + def start_heartbeat(self): + """ + Implementation of MatchMakerBase.start_heartbeat + Launches greenthread looping send_heartbeats(), + yielding for CONF.matchmaker_heartbeat_freq seconds + between iterations. + """ + if len(self.hosts) == 0: + raise MatchMakerException( + _("Register before starting heartbeat.")) + + def do_heartbeat(): + while True: + self.send_heartbeats() + eventlet.sleep(CONF.matchmaker_heartbeat_freq) + + self._heart = eventlet.spawn(do_heartbeat) + + def stop_heartbeat(self): + """ + Destroys the heartbeat greenthread. + """ + if self._heart: + self._heart.kill() + + class DirectBinding(Binding): """ Specifies a host in the key via a '.' character diff --git a/quantum/openstack/common/rpc/matchmaker_redis.py b/quantum/openstack/common/rpc/matchmaker_redis.py new file mode 100644 index 0000000000..942e1a2bed --- /dev/null +++ b/quantum/openstack/common/rpc/matchmaker_redis.py @@ -0,0 +1,149 @@ +# vim: tabstop=4 shiftwidth=4 softtabstop=4 + +# Copyright 2013 Cloudscaling Group, Inc +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may +# not use this file except in compliance with the License. You may obtain +# a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT +# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the +# License for the specific language governing permissions and limitations +# under the License. +""" +The MatchMaker classes should accept a Topic or Fanout exchange key and +return keys for direct exchanges, per (approximate) AMQP parlance. +""" + +from oslo.config import cfg + +from quantum.openstack.common import importutils +from quantum.openstack.common import log as logging +from quantum.openstack.common.rpc import matchmaker as mm_common + +redis = importutils.try_import('redis') + + +matchmaker_redis_opts = [ + cfg.StrOpt('host', + default='127.0.0.1', + help='Host to locate redis'), + cfg.IntOpt('port', + default=6379, + help='Use this port to connect to redis host.'), + cfg.StrOpt('password', + default=None, + help='Password for Redis server. (optional)'), +] + +CONF = cfg.CONF +opt_group = cfg.OptGroup(name='matchmaker_redis', + title='Options for Redis-based MatchMaker') +CONF.register_group(opt_group) +CONF.register_opts(matchmaker_redis_opts, opt_group) +LOG = logging.getLogger(__name__) + + +class RedisExchange(mm_common.Exchange): + def __init__(self, matchmaker): + self.matchmaker = matchmaker + self.redis = matchmaker.redis + super(RedisExchange, self).__init__() + + +class RedisTopicExchange(RedisExchange): + """ + Exchange where all topic keys are split, sending to second half. + i.e. "compute.host" sends a message to "compute" running on "host" + """ + def run(self, topic): + while True: + member_name = self.redis.srandmember(topic) + + if not member_name: + # If this happens, there are no + # longer any members. + break + + if not self.matchmaker.is_alive(topic, member_name): + continue + + host = member_name.split('.', 1)[1] + return [(member_name, host)] + return [] + + +class RedisFanoutExchange(RedisExchange): + """ + Return a list of all hosts. + """ + def run(self, topic): + topic = topic.split('~', 1)[1] + hosts = self.redis.smembers(topic) + good_hosts = filter( + lambda host: self.matchmaker.is_alive(topic, host), hosts) + + return [(x, x.split('.', 1)[1]) for x in good_hosts] + + +class MatchMakerRedis(mm_common.HeartbeatMatchMakerBase): + """ + MatchMaker registering and looking-up hosts with a Redis server. + """ + def __init__(self): + super(MatchMakerRedis, self).__init__() + + if not redis: + raise ImportError("Failed to import module redis.") + + self.redis = redis.StrictRedis( + host=CONF.matchmaker_redis.host, + port=CONF.matchmaker_redis.port, + password=CONF.matchmaker_redis.password) + + self.add_binding(mm_common.FanoutBinding(), RedisFanoutExchange(self)) + self.add_binding(mm_common.DirectBinding(), mm_common.DirectExchange()) + self.add_binding(mm_common.TopicBinding(), RedisTopicExchange(self)) + + def ack_alive(self, key, host): + topic = "%s.%s" % (key, host) + if not self.redis.expire(topic, CONF.matchmaker_heartbeat_ttl): + # If we could not update the expiration, the key + # might have been pruned. Re-register, creating a new + # key in Redis. + self.register(self.topic_host[host], host) + + def is_alive(self, topic, host): + if self.redis.ttl(host) == -1: + self.expire(topic, host) + return False + return True + + def expire(self, topic, host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.delete(host) + pipe.srem(topic, host) + pipe.execute() + + def backend_register(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.sadd(key, key_host) + + # No value is needed, we just + # care if it exists. Sets aren't viable + # because only keys can expire. + pipe.set(key_host, '') + + pipe.execute() + + def backend_unregister(self, key, key_host): + with self.redis.pipeline() as pipe: + pipe.multi() + pipe.srem(key, key_host) + pipe.delete(key_host) + pipe.execute()