Support for several HA RabbitMQ servers.

Change-Id: I37f73867ff469133c39bf5f1a7fd48f48b0704d4
This commit is contained in:
Emilien Macchi 2012-09-27 16:20:29 +02:00
parent a385229da8
commit 9463f6f1a5
3 changed files with 158 additions and 48 deletions

View File

@ -1,5 +1,5 @@
[DEFAULT] [DEFAULT]
# The list of modules to copy from openstack-common # The list of modules to copy from openstack-common
modules=cfg,exception,importutils,iniparser,jsonutils,policy,setup,notifier,timeutils,log,context,local,rpc,gettextutils,excutils modules=cfg,exception,importutils,iniparser,jsonutils,policy,setup,network_utils,notifier,timeutils,log,context,local,rpc,gettextutils,excutils
# The base module to hold the copy of openstack.common # The base module to hold the copy of openstack.common
base=quantum base=quantum

View File

@ -0,0 +1,68 @@
# vim: tabstop=4 shiftwidth=4 softtabstop=4
# Copyright 2012 OpenStack LLC.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""
Network-related utilities and helper functions.
"""
import logging
LOG = logging.getLogger(__name__)
def parse_host_port(address, default_port=None):
"""
Interpret a string as a host:port pair.
An IPv6 address MUST be escaped if accompanied by a port,
because otherwise ambiguity ensues: 2001:db8:85a3::8a2e:370:7334
means both [2001:db8:85a3::8a2e:370:7334] and
[2001:db8:85a3::8a2e:370]:7334.
>>> parse_host_port('server01:80')
('server01', 80)
>>> parse_host_port('server01')
('server01', None)
>>> parse_host_port('server01', default_port=1234)
('server01', 1234)
>>> parse_host_port('[::1]:80')
('::1', 80)
>>> parse_host_port('[::1]')
('::1', None)
>>> parse_host_port('[::1]', default_port=1234)
('::1', 1234)
>>> parse_host_port('2001:db8:85a3::8a2e:370:7334', default_port=1234)
('2001:db8:85a3::8a2e:370:7334', 1234)
"""
if address[0] == '[':
# Escaped ipv6
_host, _port = address[1:].split(']')
host = _host
if ':' in _port:
port = _port.split(':')[1]
else:
port = default_port
else:
if address.count(':') == 1:
host, port = address.split(':')
else:
# 0 means ipv4, >1 means ipv6.
# We prohibit unescaped ipv6 addresses with port.
host = address
port = default_port
return (host, None if port is None else int(port))

View File

@ -29,10 +29,11 @@ import kombu.connection
import kombu.entity import kombu.entity
import kombu.messaging import kombu.messaging
from quantum.openstack.common import cfg from openstack.common import cfg
from quantum.openstack.common.gettextutils import _ from openstack.common.gettextutils import _
from quantum.openstack.common.rpc import amqp as rpc_amqp from openstack.common.rpc import amqp as rpc_amqp
from quantum.openstack.common.rpc import common as rpc_common from openstack.common.rpc import common as rpc_common
from openstack.common import network_utils
kombu_opts = [ kombu_opts = [
cfg.StrOpt('kombu_ssl_version', cfg.StrOpt('kombu_ssl_version',
@ -50,10 +51,13 @@ kombu_opts = [
'(valid only if SSL enabled)')), '(valid only if SSL enabled)')),
cfg.StrOpt('rabbit_host', cfg.StrOpt('rabbit_host',
default='localhost', default='localhost',
help='the RabbitMQ host'), help='The RabbitMQ broker address where a single node is used'),
cfg.IntOpt('rabbit_port', cfg.IntOpt('rabbit_port',
default=5672, default=5672,
help='the RabbitMQ port'), help='The RabbitMQ broker port where a single node is used'),
cfg.ListOpt('rabbit_hosts',
default=['$rabbit_host:$rabbit_port'],
help='RabbitMQ HA cluster host:port pairs'),
cfg.BoolOpt('rabbit_use_ssl', cfg.BoolOpt('rabbit_use_ssl',
default=False, default=False,
help='connect over SSL for RabbitMQ'), help='connect over SSL for RabbitMQ'),
@ -80,6 +84,11 @@ kombu_opts = [
cfg.BoolOpt('rabbit_durable_queues', cfg.BoolOpt('rabbit_durable_queues',
default=False, default=False,
help='use durable queues in RabbitMQ'), help='use durable queues in RabbitMQ'),
cfg.BoolOpt('rabbit_ha_queues',
default=False,
help='use H/A queues in RabbitMQ (x-ha-policy: all).'
'You need to wipe RabbitMQ database when '
'changing this option.'),
] ]
@ -88,6 +97,20 @@ cfg.CONF.register_opts(kombu_opts)
LOG = rpc_common.LOG LOG = rpc_common.LOG
def _get_queue_arguments(conf):
"""Construct the arguments for declaring a queue.
If the rabbit_ha_queues option is set, we declare a mirrored queue
as described here:
http://www.rabbitmq.com/ha.html
Setting x-ha-policy to all means that the queue will be mirrored
to all nodes in the cluster.
"""
return {'x-ha-policy': 'all'} if conf.rabbit_ha_queues else {}
class ConsumerBase(object): class ConsumerBase(object):
"""Consumer base class.""" """Consumer base class."""
@ -192,7 +215,7 @@ class TopicConsumer(ConsumerBase):
"""Consumer class for 'topic'""" """Consumer class for 'topic'"""
def __init__(self, conf, channel, topic, callback, tag, name=None, def __init__(self, conf, channel, topic, callback, tag, name=None,
**kwargs): exchange_name=None, **kwargs):
"""Init a 'topic' queue. """Init a 'topic' queue.
:param channel: the amqp channel to use :param channel: the amqp channel to use
@ -207,10 +230,12 @@ class TopicConsumer(ConsumerBase):
""" """
# Default options # Default options
options = {'durable': conf.rabbit_durable_queues, options = {'durable': conf.rabbit_durable_queues,
'queue_arguments': _get_queue_arguments(conf),
'auto_delete': False, 'auto_delete': False,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
exchange = kombu.entity.Exchange(name=conf.control_exchange, exchange_name = exchange_name or rpc_amqp.get_control_exchange(conf)
exchange = kombu.entity.Exchange(name=exchange_name,
type='topic', type='topic',
durable=options['durable'], durable=options['durable'],
auto_delete=options['auto_delete']) auto_delete=options['auto_delete'])
@ -307,8 +332,12 @@ class TopicPublisher(Publisher):
'auto_delete': False, 'auto_delete': False,
'exclusive': False} 'exclusive': False}
options.update(kwargs) options.update(kwargs)
super(TopicPublisher, self).__init__(channel, conf.control_exchange, exchange_name = rpc_amqp.get_control_exchange(conf)
topic, type='topic', **options) super(TopicPublisher, self).__init__(channel,
exchange_name,
topic,
type='topic',
**options)
class FanoutPublisher(Publisher): class FanoutPublisher(Publisher):
@ -331,6 +360,7 @@ class NotifyPublisher(TopicPublisher):
def __init__(self, conf, channel, topic, **kwargs): def __init__(self, conf, channel, topic, **kwargs):
self.durable = kwargs.pop('durable', conf.rabbit_durable_queues) self.durable = kwargs.pop('durable', conf.rabbit_durable_queues)
self.queue_arguments = _get_queue_arguments(conf)
super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs) super(NotifyPublisher, self).__init__(conf, channel, topic, **kwargs)
def reconnect(self, channel): def reconnect(self, channel):
@ -343,7 +373,8 @@ class NotifyPublisher(TopicPublisher):
exchange=self.exchange, exchange=self.exchange,
durable=self.durable, durable=self.durable,
name=self.routing_key, name=self.routing_key,
routing_key=self.routing_key) routing_key=self.routing_key,
queue_arguments=self.queue_arguments)
queue.declare() queue.declare()
@ -368,31 +399,37 @@ class Connection(object):
if server_params is None: if server_params is None:
server_params = {} server_params = {}
# Keys to translate from server_params to kombu params # Keys to translate from server_params to kombu params
server_params_to_kombu_params = {'username': 'userid'} server_params_to_kombu_params = {'username': 'userid'}
params = {} ssl_params = self._fetch_ssl_params()
for sp_key, value in server_params.iteritems(): params_list = []
p_key = server_params_to_kombu_params.get(sp_key, sp_key) for adr in self.conf.rabbit_hosts:
params[p_key] = value hostname, port = network_utils.parse_host_port(
adr, default_port=self.conf.rabbit_port)
params.setdefault('hostname', self.conf.rabbit_host) params = {}
params.setdefault('port', self.conf.rabbit_port)
params.setdefault('userid', self.conf.rabbit_userid)
params.setdefault('password', self.conf.rabbit_password)
params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
self.params = params for sp_key, value in server_params.iteritems():
p_key = server_params_to_kombu_params.get(sp_key, sp_key)
params[p_key] = value
if self.conf.fake_rabbit: params.setdefault('hostname', hostname)
self.params['transport'] = 'memory' params.setdefault('port', port)
self.memory_transport = True params.setdefault('userid', self.conf.rabbit_userid)
else: params.setdefault('password', self.conf.rabbit_password)
self.memory_transport = False params.setdefault('virtual_host', self.conf.rabbit_virtual_host)
if self.conf.rabbit_use_ssl: if self.conf.fake_rabbit:
self.params['ssl'] = self._fetch_ssl_params() params['transport'] = 'memory'
if self.conf.rabbit_use_ssl:
params['ssl'] = ssl_params
params_list.append(params)
self.params_list = params_list
self.memory_transport = self.conf.fake_rabbit
self.connection = None self.connection = None
self.reconnect() self.reconnect()
@ -422,14 +459,14 @@ class Connection(object):
# Return the extended behavior # Return the extended behavior
return ssl_params return ssl_params
def _connect(self): def _connect(self, params):
"""Connect to rabbit. Re-establish any queues that may have """Connect to rabbit. Re-establish any queues that may have
been declared before if we are reconnecting. Exceptions should been declared before if we are reconnecting. Exceptions should
be handled by the caller. be handled by the caller.
""" """
if self.connection: if self.connection:
LOG.info(_("Reconnecting to AMQP server on " LOG.info(_("Reconnecting to AMQP server on "
"%(hostname)s:%(port)d") % self.params) "%(hostname)s:%(port)d") % params)
try: try:
self.connection.close() self.connection.close()
except self.connection_errors: except self.connection_errors:
@ -437,7 +474,7 @@ class Connection(object):
# Setting this in case the next statement fails, though # Setting this in case the next statement fails, though
# it shouldn't be doing any network operations, yet. # it shouldn't be doing any network operations, yet.
self.connection = None self.connection = None
self.connection = kombu.connection.BrokerConnection(**self.params) self.connection = kombu.connection.BrokerConnection(**params)
self.connection_errors = self.connection.connection_errors self.connection_errors = self.connection.connection_errors
if self.memory_transport: if self.memory_transport:
# Kludge to speed up tests. # Kludge to speed up tests.
@ -450,8 +487,8 @@ class Connection(object):
self.channel._new_queue('ae.undeliver') self.channel._new_queue('ae.undeliver')
for consumer in self.consumers: for consumer in self.consumers:
consumer.reconnect(self.channel) consumer.reconnect(self.channel)
LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d'), LOG.info(_('Connected to AMQP server on %(hostname)s:%(port)d') %
self.params) params)
def reconnect(self): def reconnect(self):
"""Handles reconnecting and re-establishing queues. """Handles reconnecting and re-establishing queues.
@ -464,11 +501,12 @@ class Connection(object):
attempt = 0 attempt = 0
while True: while True:
params = self.params_list[attempt % len(self.params_list)]
attempt += 1 attempt += 1
try: try:
self._connect() self._connect(params)
return return
except (self.connection_errors, IOError), e: except (IOError, self.connection_errors) as e:
pass pass
except Exception, e: except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib # NOTE(comstud): Unfortunately it's possible for amqplib
@ -483,12 +521,12 @@ class Connection(object):
log_info = {} log_info = {}
log_info['err_str'] = str(e) log_info['err_str'] = str(e)
log_info['max_retries'] = self.max_retries log_info['max_retries'] = self.max_retries
log_info.update(self.params) log_info.update(params)
if self.max_retries and attempt == self.max_retries: if self.max_retries and attempt == self.max_retries:
LOG.exception(_('Unable to connect to AMQP server on ' LOG.error(_('Unable to connect to AMQP server on '
'%(hostname)s:%(port)d after %(max_retries)d ' '%(hostname)s:%(port)d after %(max_retries)d '
'tries: %(err_str)s') % log_info) 'tries: %(err_str)s') % log_info)
# NOTE(comstud): Copied from original code. There's # NOTE(comstud): Copied from original code. There's
# really no better recourse because if this was a queue we # really no better recourse because if this was a queue we
# need to consume on, we have no way to consume anymore. # need to consume on, we have no way to consume anymore.
@ -502,9 +540,9 @@ class Connection(object):
sleep_time = min(sleep_time, self.interval_max) sleep_time = min(sleep_time, self.interval_max)
log_info['sleep_time'] = sleep_time log_info['sleep_time'] = sleep_time
LOG.exception(_('AMQP server on %(hostname)s:%(port)d is' LOG.error(_('AMQP server on %(hostname)s:%(port)d is '
' unreachable: %(err_str)s. Trying again in ' 'unreachable: %(err_str)s. Trying again in '
'%(sleep_time)d seconds.') % log_info) '%(sleep_time)d seconds.') % log_info)
time.sleep(sleep_time) time.sleep(sleep_time)
def ensure(self, error_callback, method, *args, **kwargs): def ensure(self, error_callback, method, *args, **kwargs):
@ -512,7 +550,8 @@ class Connection(object):
try: try:
return method(*args, **kwargs) return method(*args, **kwargs)
except (self.connection_errors, socket.timeout, IOError), e: except (self.connection_errors, socket.timeout, IOError), e:
pass if error_callback:
error_callback(e)
except Exception, e: except Exception, e:
# NOTE(comstud): Unfortunately it's possible for amqplib # NOTE(comstud): Unfortunately it's possible for amqplib
# to return an error not covered by its transport # to return an error not covered by its transport
@ -522,8 +561,8 @@ class Connection(object):
# and try to reconnect in this case. # and try to reconnect in this case.
if 'timeout' not in str(e): if 'timeout' not in str(e):
raise raise
if error_callback: if error_callback:
error_callback(e) error_callback(e)
self.reconnect() self.reconnect()
def get_channel(self): def get_channel(self):
@ -625,10 +664,12 @@ class Connection(object):
""" """
self.declare_consumer(DirectConsumer, topic, callback) self.declare_consumer(DirectConsumer, topic, callback)
def declare_topic_consumer(self, topic, callback=None, queue_name=None): def declare_topic_consumer(self, topic, callback=None, queue_name=None,
exchange_name=None):
"""Create a 'topic' consumer.""" """Create a 'topic' consumer."""
self.declare_consumer(functools.partial(TopicConsumer, self.declare_consumer(functools.partial(TopicConsumer,
name=queue_name, name=queue_name,
exchange_name=exchange_name,
), ),
topic, callback) topic, callback)
@ -749,3 +790,4 @@ def notify(conf, context, topic, msg):
def cleanup(): def cleanup():
return rpc_amqp.cleanup(Connection.pool) return rpc_amqp.cleanup(Connection.pool)