[zmq] Added redis sentinel HA implementation to zmq driver
List of redis sentinel hosts is now supported in order to use automatic failover when redis master goes down. Change-Id: I5fad4c9b6c6aea4f8f382f7469899a7d05c068c1 Closes-Bug: #1518292
This commit is contained in:
parent
3de5b6f52e
commit
6f6a0ae5bc
@ -11,6 +11,7 @@
|
|||||||
# License for the specific language governing permissions and limitations
|
# License for the specific language governing permissions and limitations
|
||||||
# under the License.
|
# under the License.
|
||||||
|
|
||||||
|
import inspect
|
||||||
import logging
|
import logging
|
||||||
|
|
||||||
from oslo_config import cfg
|
from oslo_config import cfg
|
||||||
@ -18,8 +19,10 @@ from oslo_utils import importutils
|
|||||||
|
|
||||||
from oslo_messaging._drivers.zmq_driver.matchmaker import base
|
from oslo_messaging._drivers.zmq_driver.matchmaker import base
|
||||||
from oslo_messaging._drivers.zmq_driver import zmq_address
|
from oslo_messaging._drivers.zmq_driver import zmq_address
|
||||||
|
from retrying import retry
|
||||||
|
|
||||||
redis = importutils.try_import('redis')
|
redis = importutils.try_import('redis')
|
||||||
|
redis_sentinel = importutils.try_import('redis.sentinel')
|
||||||
LOG = logging.getLogger(__name__)
|
LOG = logging.getLogger(__name__)
|
||||||
|
|
||||||
|
|
||||||
@ -34,22 +37,74 @@ matchmaker_redis_opts = [
|
|||||||
default='',
|
default='',
|
||||||
secret=True,
|
secret=True,
|
||||||
help='Password for Redis server (optional).'),
|
help='Password for Redis server (optional).'),
|
||||||
|
cfg.ListOpt('sentinel_hosts',
|
||||||
|
default=[],
|
||||||
|
help='List of Redis Sentinel hosts (fault tolerance mode) e.g.\
|
||||||
|
[host:port, host1:port ... ]'),
|
||||||
|
cfg.StrOpt('sentinel_group_name',
|
||||||
|
default='oslo-messaging-zeromq',
|
||||||
|
help='Redis replica set name.'),
|
||||||
|
cfg.IntOpt('wait_timeout',
|
||||||
|
default=500,
|
||||||
|
help='Time in ms to wait between connection attempts.'),
|
||||||
|
cfg.IntOpt('check_timeout',
|
||||||
|
default=20000,
|
||||||
|
help='Time in ms to wait before the transaction is killed.'),
|
||||||
|
cfg.IntOpt('socket_timeout',
|
||||||
|
default=1000,
|
||||||
|
help='Timeout in ms on blocking socket operations'),
|
||||||
]
|
]
|
||||||
|
|
||||||
_PUBLISHERS_KEY = "PUBLISHERS"
|
_PUBLISHERS_KEY = "PUBLISHERS"
|
||||||
|
|
||||||
|
|
||||||
|
def retry_if_connection_error(ex):
|
||||||
|
return isinstance(ex, redis.ConnectionError)
|
||||||
|
|
||||||
|
|
||||||
|
def apply_retrying(obj, cfg):
|
||||||
|
for attr_name, attr in inspect.getmembers(obj):
|
||||||
|
if not (inspect.ismethod(attr) or inspect.isfunction(attr)):
|
||||||
|
continue
|
||||||
|
if attr_name.startswith("_"):
|
||||||
|
continue
|
||||||
|
setattr(
|
||||||
|
obj,
|
||||||
|
attr_name,
|
||||||
|
retry(
|
||||||
|
wait_fixed=cfg.matchmaker_redis.wait_timeout,
|
||||||
|
stop_max_delay=cfg.matchmaker_redis.check_timeout,
|
||||||
|
retry_on_exception=retry_if_connection_error
|
||||||
|
)(attr))
|
||||||
|
|
||||||
|
|
||||||
class RedisMatchMaker(base.MatchMakerBase):
|
class RedisMatchMaker(base.MatchMakerBase):
|
||||||
|
|
||||||
def __init__(self, conf, *args, **kwargs):
|
def __init__(self, conf, *args, **kwargs):
|
||||||
super(RedisMatchMaker, self).__init__(conf, *args, **kwargs)
|
super(RedisMatchMaker, self).__init__(conf, *args, **kwargs)
|
||||||
self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis")
|
self.conf.register_opts(matchmaker_redis_opts, "matchmaker_redis")
|
||||||
|
|
||||||
self._redis = redis.StrictRedis(
|
if not self.conf.matchmaker_redis.sentinel_hosts:
|
||||||
host=self.conf.matchmaker_redis.host,
|
self._redis = redis.StrictRedis(
|
||||||
port=self.conf.matchmaker_redis.port,
|
host=self.conf.matchmaker_redis.host,
|
||||||
password=self.conf.matchmaker_redis.password,
|
port=self.conf.matchmaker_redis.port,
|
||||||
)
|
password=self.conf.matchmaker_redis.password,
|
||||||
|
)
|
||||||
|
else:
|
||||||
|
socket_timeout = self.conf.matchmaker_redis.socket_timeout / 1000.
|
||||||
|
s = self.conf.matchmaker_redis.sentinel_hosts
|
||||||
|
sentinel_hosts = [tuple(i.split(":")) for i in s]
|
||||||
|
sentinel = redis.sentinel.Sentinel(
|
||||||
|
sentinels=sentinel_hosts,
|
||||||
|
socket_timeout=socket_timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
self._redis = sentinel.master_for(
|
||||||
|
self.conf.matchmaker_redis.sentinel_group_name,
|
||||||
|
socket_timeout=socket_timeout
|
||||||
|
)
|
||||||
|
|
||||||
|
apply_retrying(self, self.conf)
|
||||||
|
|
||||||
def register_publisher(self, hostname):
|
def register_publisher(self, hostname):
|
||||||
host_str = ",".join(hostname)
|
host_str = ",".join(hostname)
|
||||||
|
@ -17,6 +17,7 @@ oslotest>=1.10.0 # Apache-2.0
|
|||||||
|
|
||||||
# for test_matchmaker_redis
|
# for test_matchmaker_redis
|
||||||
redis>=2.10.0
|
redis>=2.10.0
|
||||||
|
retrying>=1.2.3,!=1.3.0 # Apache-2.0
|
||||||
|
|
||||||
# for test_impl_zmq
|
# for test_impl_zmq
|
||||||
pyzmq>=14.3.1 # LGPL+BSD
|
pyzmq>=14.3.1 # LGPL+BSD
|
||||||
|
Loading…
x
Reference in New Issue
Block a user