Add support for redis-sentinel
This patch adds the ability to configure Sentinel support in zaqar.conf. The code has been tested using one and two sentinels managing one master and one slave. Failovers from master to slave were intiated and it was verified that clients can continue to query the service with no downtime in the case of a lost Redis node. Co-Authored-By: Kurt Griffiths <kurt.griffiths@rackspace.com> Closes-Bug: 1367020 Change-Id: Idcaaab3ba17c84a31b918ad8376f1d5fc105b53a
This commit is contained in:
parent
b386d6d078
commit
9fb04ba99a
@ -20,6 +20,7 @@ import mock
|
||||
from oslo.utils import timeutils
|
||||
import redis
|
||||
|
||||
from zaqar.common import errors
|
||||
from zaqar.openstack.common.cache import cache as oslo_cache
|
||||
from zaqar.queues import storage
|
||||
from zaqar.queues.storage.redis import controllers
|
||||
@ -190,6 +191,88 @@ class RedisDriverTest(testing.TestBase):
|
||||
except RuntimeError:
|
||||
self.fail('version match failed')
|
||||
|
||||
def test_connection_url_invalid(self):
|
||||
self.assertRaises(errors.ConfigurationError,
|
||||
driver.ConnectionURI,
|
||||
'red://example.com')
|
||||
|
||||
self.assertRaises(errors.ConfigurationError,
|
||||
driver.ConnectionURI,
|
||||
'redis://')
|
||||
|
||||
self.assertRaises(errors.ConfigurationError,
|
||||
driver.ConnectionURI,
|
||||
'redis://example.com:not_an_integer')
|
||||
|
||||
self.assertRaises(errors.ConfigurationError,
|
||||
driver.ConnectionURI,
|
||||
'redis://s1:not_an_integer,s2?master=obi-wan')
|
||||
|
||||
self.assertRaises(errors.ConfigurationError,
|
||||
driver.ConnectionURI,
|
||||
'redis://s1,s2')
|
||||
|
||||
self.assertRaises(errors.ConfigurationError,
|
||||
driver.ConnectionURI,
|
||||
'redis:')
|
||||
|
||||
self.assertRaises(errors.ConfigurationError,
|
||||
driver.ConnectionURI,
|
||||
'redis:')
|
||||
|
||||
def test_connection_url_tcp(self):
|
||||
uri = driver.ConnectionURI('redis://example.com')
|
||||
self.assertEqual(uri.strategy, driver.STRATEGY_TCP)
|
||||
self.assertEqual(uri.port, 6379)
|
||||
self.assertEqual(uri.socket_timeout, 0.1)
|
||||
|
||||
uri = driver.ConnectionURI('redis://example.com:7777')
|
||||
self.assertEqual(uri.strategy, driver.STRATEGY_TCP)
|
||||
self.assertEqual(uri.port, 7777)
|
||||
|
||||
uri = driver.ConnectionURI(
|
||||
'redis://example.com:7777?socket_timeout=1')
|
||||
self.assertEqual(uri.strategy, driver.STRATEGY_TCP)
|
||||
self.assertEqual(uri.port, 7777)
|
||||
self.assertEqual(uri.socket_timeout, 1.0)
|
||||
|
||||
def test_connection_uri_unix_socket(self):
|
||||
uri = driver.ConnectionURI('redis:/tmp/redis.sock')
|
||||
self.assertEqual(uri.strategy, driver.STRATEGY_UNIX)
|
||||
self.assertEqual(uri.unix_socket_path, '/tmp/redis.sock')
|
||||
self.assertEqual(uri.socket_timeout, 0.1)
|
||||
|
||||
uri = driver.ConnectionURI('redis:/tmp/redis.sock?socket_timeout=1.5')
|
||||
self.assertEqual(uri.strategy, driver.STRATEGY_UNIX)
|
||||
self.assertEqual(uri.unix_socket_path, '/tmp/redis.sock')
|
||||
self.assertEqual(uri.socket_timeout, 1.5)
|
||||
|
||||
def test_connection_uri_sentinel(self):
|
||||
uri = driver.ConnectionURI('redis://s1?master=dumbledore')
|
||||
self.assertEqual(uri.strategy, driver.STRATEGY_SENTINEL)
|
||||
self.assertEqual(uri.sentinels, [('s1', 26379)])
|
||||
self.assertEqual(uri.master, 'dumbledore')
|
||||
self.assertEqual(uri.socket_timeout, 0.1)
|
||||
|
||||
uri = driver.ConnectionURI('redis://s1,s2?master=dumbledore')
|
||||
self.assertEqual(uri.strategy, driver.STRATEGY_SENTINEL)
|
||||
self.assertEqual(uri.sentinels, [('s1', 26379), ('s2', 26379)])
|
||||
self.assertEqual(uri.master, 'dumbledore')
|
||||
self.assertEqual(uri.socket_timeout, 0.1)
|
||||
|
||||
uri = driver.ConnectionURI('redis://s1:26389,s1?master=dumbledore')
|
||||
self.assertEqual(uri.strategy, driver.STRATEGY_SENTINEL)
|
||||
self.assertEqual(uri.sentinels, [('s1', 26389), ('s1', 26379)])
|
||||
self.assertEqual(uri.master, 'dumbledore')
|
||||
self.assertEqual(uri.socket_timeout, 0.1)
|
||||
|
||||
uri = driver.ConnectionURI(
|
||||
'redis://s1?master=dumbledore&socket_timeout=0.5')
|
||||
self.assertEqual(uri.strategy, driver.STRATEGY_SENTINEL)
|
||||
self.assertEqual(uri.sentinels, [('s1', 26379)])
|
||||
self.assertEqual(uri.master, 'dumbledore')
|
||||
self.assertEqual(uri.socket_timeout, 0.5)
|
||||
|
||||
|
||||
@testing.requires_redis
|
||||
class RedisQueuesTest(base.QueueControllerTest):
|
||||
|
@ -24,3 +24,7 @@ class PatternNotFound(Exception):
|
||||
|
||||
class InvalidOperation(Exception):
|
||||
"""Raised when attempted a non existent operation."""
|
||||
|
||||
|
||||
class ConfigurationError(Exception):
|
||||
"""An invalid value was used for a Zaqar configuration option."""
|
||||
|
@ -11,11 +11,12 @@
|
||||
# implied.
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import redis
|
||||
import redis.sentinel
|
||||
from six.moves import urllib
|
||||
|
||||
from zaqar.common import decorators
|
||||
from zaqar.common import errors
|
||||
from zaqar.i18n import _
|
||||
from zaqar.openstack.common import log as logging
|
||||
from zaqar.queues import storage
|
||||
@ -24,17 +25,117 @@ from zaqar.queues.storage.redis import options
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
REDIS_DEFAULT_PORT = 6379
|
||||
SENTINEL_DEFAULT_PORT = 26379
|
||||
DEFAULT_SOCKET_TIMEOUT = 0.1
|
||||
|
||||
STRATEGY_TCP = 1
|
||||
STRATEGY_UNIX = 2
|
||||
STRATEGY_SENTINEL = 3
|
||||
|
||||
|
||||
def _get_redis_client(conf):
|
||||
# TODO(prashanthr_): Add SSL support
|
||||
parsed_url = urllib.parse.urlparse(conf.uri)
|
||||
class ConnectionURI(object):
|
||||
def __init__(self, uri):
|
||||
# TODO(prashanthr_): Add SSL support
|
||||
try:
|
||||
parsed_url = urllib.parse.urlparse(uri)
|
||||
except SyntaxError:
|
||||
raise errors.ConfigurationError(_('Malformed Redis URI'))
|
||||
|
||||
if parsed_url.hostname:
|
||||
port = parsed_url.port or REDIS_DEFAULT_PORT
|
||||
return redis.StrictRedis(host=parsed_url.hostname, port=port)
|
||||
else:
|
||||
return redis.StrictRedis(unix_socket_path=parsed_url.path)
|
||||
if parsed_url.scheme != 'redis':
|
||||
raise errors.ConfigurationError(_('Invalid scheme in Redis URI'))
|
||||
|
||||
# NOTE(kgriffs): Python 2.6 has a bug that causes the
|
||||
# query string to be appended to the path when given a
|
||||
# hostless URL.
|
||||
path = parsed_url.path
|
||||
if '?' in path:
|
||||
path, sep, query = path.partition('?')
|
||||
else:
|
||||
query = parsed_url.query
|
||||
|
||||
query_params = dict(urllib.parse.parse_qsl(query))
|
||||
|
||||
# Generic
|
||||
self.strategy = None
|
||||
self.socket_timeout = float(query_params.get('socket_timeout',
|
||||
DEFAULT_SOCKET_TIMEOUT))
|
||||
|
||||
# TCP
|
||||
self.port = None
|
||||
self.hostname = None
|
||||
|
||||
# UNIX socket
|
||||
self.unix_socket_path = None
|
||||
|
||||
# Sentinel
|
||||
self.master = None
|
||||
self.sentinels = []
|
||||
|
||||
if 'master' in query_params:
|
||||
# NOTE(prashanthr_): Configure redis driver in sentinel mode
|
||||
self.strategy = STRATEGY_SENTINEL
|
||||
self.master = query_params['master']
|
||||
|
||||
# NOTE(kgriffs): Have to parse list of sentinel hosts ourselves
|
||||
# since urllib doesn't support it.
|
||||
for each_host in parsed_url.netloc.split(','):
|
||||
name, sep, port = each_host.partition(':')
|
||||
|
||||
if port:
|
||||
try:
|
||||
port = int(port)
|
||||
except ValueError:
|
||||
msg = _('The Redis configuration URI contains an '
|
||||
'invalid port')
|
||||
raise errors.ConfigurationError(msg)
|
||||
|
||||
else:
|
||||
port = SENTINEL_DEFAULT_PORT
|
||||
|
||||
self.sentinels.append((name, port))
|
||||
|
||||
if not self.sentinels:
|
||||
msg = _('The Redis configuration URI does not define any '
|
||||
'sentinel hosts')
|
||||
raise errors.ConfigurationError(msg)
|
||||
|
||||
elif parsed_url.netloc:
|
||||
if ',' in parsed_url.netloc:
|
||||
# NOTE(kgriffs): They probably were specifying
|
||||
# a list of sentinel hostnames, but forgot to
|
||||
# add 'master' to the query string.
|
||||
msg = _('The Redis URI specifies multiple sentinel hosts, '
|
||||
'but is missing the "master" query string '
|
||||
'parameter. Please set "master" to the name of '
|
||||
'the Redis master server as specified in the '
|
||||
'sentinel configuration file.')
|
||||
raise errors.ConfigurationError(msg)
|
||||
|
||||
self.strategy = STRATEGY_TCP
|
||||
try:
|
||||
self.port = parsed_url.port or REDIS_DEFAULT_PORT
|
||||
except ValueError:
|
||||
msg = _('The Redis configuration URI contains an '
|
||||
'invalid port')
|
||||
raise errors.ConfigurationError(msg)
|
||||
|
||||
if not parsed_url.hostname:
|
||||
msg = _('Missing host name in Redis URI')
|
||||
raise errors.ConfigurationError(msg)
|
||||
|
||||
self.hostname = parsed_url.hostname
|
||||
|
||||
else:
|
||||
self.strategy = STRATEGY_UNIX
|
||||
|
||||
if not path:
|
||||
msg = _('Missing path in Redis URI')
|
||||
raise errors.ConfigurationError(msg)
|
||||
|
||||
self.unix_socket_path = path
|
||||
|
||||
assert self.strategy in (STRATEGY_TCP, STRATEGY_UNIX,
|
||||
STRATEGY_SENTINEL)
|
||||
|
||||
|
||||
class DataDriver(storage.DataDriverBase):
|
||||
@ -77,7 +178,7 @@ class DataDriver(storage.DataDriverBase):
|
||||
@decorators.lazy_property(write=False)
|
||||
def connection(self):
|
||||
"""Redis client connection instance."""
|
||||
return _get_redis_client(self.redis_conf)
|
||||
return _get_redis_client(self)
|
||||
|
||||
@decorators.lazy_property(write=False)
|
||||
def queue_controller(self):
|
||||
@ -105,7 +206,7 @@ class ControlDriver(storage.ControlDriverBase):
|
||||
@decorators.lazy_property(write=False)
|
||||
def connection(self):
|
||||
"""Redis client connection instance."""
|
||||
return _get_redis_client(self.redis_conf)
|
||||
return _get_redis_client(self)
|
||||
|
||||
@property
|
||||
def pools_controller(self):
|
||||
@ -118,3 +219,28 @@ class ControlDriver(storage.ControlDriverBase):
|
||||
@property
|
||||
def flavors_controller(self):
|
||||
raise NotImplementedError()
|
||||
|
||||
|
||||
def _get_redis_client(driver):
|
||||
conf = driver.redis_conf
|
||||
connection_uri = ConnectionURI(conf.uri)
|
||||
|
||||
if connection_uri.strategy == STRATEGY_SENTINEL:
|
||||
sentinel = redis.sentinel.Sentinel(
|
||||
connection_uri.sentinels,
|
||||
socket_timeout=connection_uri.socket_timeout)
|
||||
|
||||
# NOTE(prashanthr_): The socket_timeout parameter being generic
|
||||
# to all redis connections is inherited from the parameters for
|
||||
# sentinel.
|
||||
return sentinel.master_for(connection_uri.master)
|
||||
|
||||
elif connection_uri.strategy == STRATEGY_TCP:
|
||||
return redis.StrictRedis(
|
||||
host=connection_uri.hostname,
|
||||
port=connection_uri.port,
|
||||
socket_timeout=connection_uri.socket_timeout)
|
||||
else:
|
||||
return redis.StrictRedis(
|
||||
unix_socket_path=connection_uri.unix_socket_path,
|
||||
socket_timeout=connection_uri.socket_timeout)
|
||||
|
@ -19,9 +19,25 @@ from oslo.config import cfg
|
||||
|
||||
REDIS_OPTIONS = (
|
||||
cfg.StrOpt('uri', default="redis://127.0.0.1:6379",
|
||||
help=('Redis Server URI. Can also use a '
|
||||
'socket file based connector. '
|
||||
'Ex: redis:/tmp/redis.sock')),
|
||||
help=('Redis connection URI, taking one of three forms. '
|
||||
'For a direct connection to a Redis server, use '
|
||||
'the form "redis://host[:port][?options]", where '
|
||||
'port defaults to 6379 if not specified. For an '
|
||||
'HA master-slave Redis cluster using Redis Sentinel, '
|
||||
'use the form "redis://host1[:port1]'
|
||||
'[,host2[:port2],...,hostN[:portN]][?options]", '
|
||||
'where each host specified corresponds to an '
|
||||
'instance of redis-sentinel. In this form, the '
|
||||
'name of the Redis master used in the Sentinel '
|
||||
'configuration must be included in the query '
|
||||
'string as "master=<name>". Finally, to connect '
|
||||
'to a local instance of Redis over a unix socket, '
|
||||
'you may use the form '
|
||||
'"redis:/path/to/redis.sock[?options]". In all '
|
||||
'forms, the "socket_timeout" option may be '
|
||||
'specified in the query string. Its value is '
|
||||
'given in seconds. If not provided, '
|
||||
'"socket_timeout" defaults to 0.1 seconds.')),
|
||||
|
||||
cfg.IntOpt('max_reconnect_attempts', default=10,
|
||||
help=('Maximum number of times to retry an operation that '
|
||||
|
@ -157,7 +157,23 @@ def retries_on_connection_error(func):
|
||||
for attempt in range(max_attemps):
|
||||
try:
|
||||
return func(self, *args, **kwargs)
|
||||
|
||||
except redis.exceptions.ConnectionError:
|
||||
# NOTE(kgriffs): redis-py will retry once itself,
|
||||
# but if the command cannot be sent the second time after
|
||||
# disconnecting and reconnecting, the error is raised
|
||||
# and we will catch it here.
|
||||
#
|
||||
# NOTE(kgriffs): When using a sentinel, if a master fails
|
||||
# the initial retry will gracefully fail over to the
|
||||
# new master if the sentinel failover delay is low enough;
|
||||
# if the delay is too long, then redis-py will get a
|
||||
# MasterNotFoundError (a subclass of ConnectionError) on
|
||||
# it's retry, which will then just get raised and caught
|
||||
# here, in which case we will keep retrying until the
|
||||
# sentinel completes the failover and stops raising
|
||||
# MasterNotFoundError.
|
||||
|
||||
ex = sys.exc_info()[1]
|
||||
LOG.warn(_(u'Caught ConnectionError, retrying the '
|
||||
'call to {0}').format(func))
|
||||
|
Loading…
x
Reference in New Issue
Block a user