High Availibilty of Music

Added capability of reading and writing to
one of the other 2 valet Music databases
incase the local database is down. The
sequence of fallback is read from the valet
configuration file

Change-Id: I0de371996c63f3d1fb386e54cf4e239b464d870a
This commit is contained in:
sg774j 2017-04-06 15:38:59 -05:00 committed by Omar Rivera
parent faee9727ab
commit 76ab80040a
8 changed files with 64 additions and 43 deletions

View File

@ -72,8 +72,9 @@ identity = {
}
music = {
'host': CONF.music.host,
'hosts': CONF.music.hosts,
'port': CONF.music.port,
'keyspace': CONF.music.keyspace,
'replication_factor': CONF.music.replication_factor,
'music_server_retries': CONF.music.music_server_retries,
}

View File

@ -18,9 +18,10 @@ password=identity_password
auth_url=auth_uri
[music]
host=music_host
hosts=music_host
port=music_port
keyspace=db_keyspace
music_server_retries=3
[engine]
datacenter_name=Region1

View File

@ -20,6 +20,7 @@ import inspect
from pecan import conf
import six
import uuid
from valet import api
from valet.api.common.i18n import _
from valet.common.music import Music
@ -287,9 +288,11 @@ def _engine_from_config(configuration):
"""Create database engine object based on configuration"""
configuration = dict(configuration)
kwargs = {
'host': configuration.get('host'),
'hosts': configuration.get('hosts'),
'port': configuration.get('port'),
'replication_factor': configuration.get('replication_factor'),
'music_server_retries': configuration.get('music_server_retries'),
'logger': api.LOG,
}
return Music(**kwargs)

View File

@ -56,7 +56,7 @@ identity_opts = [
music_group = cfg.OptGroup(name='music', title='Valet Persistence conf')
music_opts = [
cfg.StrOpt('host', default='0.0.0.0'),
cfg.ListOpt('hosts', default=['0.0.0.0']),
cfg.IntOpt('port', default=8080),
cfg.StrOpt('keyspace', default='valet'),
cfg.IntOpt('replication_factor', default=3),
@ -70,8 +70,7 @@ music_opts = [
cfg.StrOpt('resource_index_table', default='resource_log_index'),
cfg.StrOpt('app_index_table', default='app_log_index'),
cfg.StrOpt('uuid_table', default='uuid_map'),
cfg.StrOpt('db_host', default='localhost'),
# cfg.ListOpt('db_hosts', default='valet1,valet2,valet3')
cfg.IntOpt('music_server_retries', default=3),
]
def load_conf(args=None, project=DOMAIN, default_files=None):

View File

@ -19,9 +19,6 @@ import json
import requests
import time
from valet.api.common.i18n import _
from valet.common.conf import get_logger
LOG = get_logger("music")
class REST(object):
@ -34,19 +31,22 @@ class REST(object):
_urls = None
def __init__(self, hosts, port, path='/', timeout='10'):
def __init__(self, hosts, port, path='/', timeout='10', music_server_retries=3, logger=None):
"""Initializer. Accepts target host list, port, and path."""
self.hosts = hosts # List of IP or FQDNs
self.port = port # Port Number
self.path = path # Path starting with /
self.timeout = float(timeout) # REST request timeout in seconds
# Retires before failiing over to next Music server.
self.music_server_retries = music_server_retries
self.logger = logger # For logging
@property
def urls(self):
"""Return list of URLs using each host, plus the port/path."""
if not self._urls:
# make localhost as first option
urls = ['http://localhost:%s%s' % (self.port, self.path)]
urls = []
for host in self.hosts:
# Must end without a slash
urls.append('http://%(host)s:%(port)s%(path)s' % {
@ -78,28 +78,31 @@ class REST(object):
for url in self.urls:
# Try each url in turn. First one to succeed wins.
full_url = url + path
try:
data_json = json.dumps(data) if data else None
LOG.debug("Music Request: %s %s%s", method.upper(), full_url, data_json if data else '')
response = method_fn(full_url, data=data_json, headers=self.__headers(content_type), timeout=self.timeout)
response.raise_for_status()
data_json = json.dumps(data) if data else None
for attempt in range(self.music_server_retries):
# Ignore the previous exception.
try:
response = method_fn(full_url, data=data_json, headers=self.__headers(content_type), timeout=self.timeout)
response.raise_for_status()
return response
except requests.exceptions.Timeout as err:
response = requests.Response()
response.status_code = 408
response.url = full_url
LOG.debug("Music: %s", err.message)
except requests.exceptions.RequestException as err:
response = requests.Response()
response.status_code = 400
response.url = full_url
LOG.debug("Music: %s", err.message)
return response
except requests.exceptions.Timeout as err:
response = requests.Response()
response.status_code = 408
response.url = full_url
if self.logger:
self.logger.debug("Music: %s Method: %s Full Url: %s", err.message, method.upper(), full_url)
except requests.exceptions.RequestException as err:
response = requests.Response()
response.status_code = 400
response.url = full_url
if self.logger:
self.logger.debug("Music: %s Method: %s Full Url: %s", err.message, method.upper(), full_url)
# If we get here, an exception was raised for every url,
# but we passed so we could try each endpoint. Raise status
# for the last attempt (for now) so that we report something.
if response:
if response is not None:
response.raise_for_status()
@ -112,19 +115,22 @@ class Music(object):
rest = None # API Endpoint
replication_factor = None # Number of Music nodes to replicate across
def __init__(self, host=None, hosts=None, # pylint: disable=R0913
port='8080', lock_timeout=10, replication_factor=3):
def __init__(self, hosts=None, # pylint: disable=R0913
port='8080', lock_timeout=10, replication_factor=3,
music_server_retries=3, logger=None):
"""Initializer. Accept a lock_timeout for atomic operations."""
# If one host is provided, that overrides the list
if not hosts:
hosts = ['localhost']
if host:
hosts = [host]
if logger:
logger.error("No Music Hosts provided.")
kwargs = {
'hosts': hosts,
'port': port,
'path': '/MUSIC/rest',
'music_server_retries': music_server_retries,
'logger': logger,
}
self.rest = REST(**kwargs)
@ -132,6 +138,8 @@ class Music(object):
self.lock_timeout = lock_timeout
self.replication_factor = replication_factor
self.logger = logger
self.music_server_retries = music_server_retries
def create_keyspace(self, keyspace):
"""Create a keyspace."""

View File

@ -53,9 +53,11 @@ class ListenerManager(threading.Thread):
if self.config.events_listener.store:
kwargs = {
'host': self.config.music.host,
'hosts': self.config.music.hosts,
'port': self.config.music.port,
'replication_factor': self.config.music.replication_factor,
'music_server_retries': self.config.music.music_server_retries,
'logger': self.listener_logger,
}
engine = Music(**kwargs)
engine.create_keyspace(self.config.music.keyspace)

View File

@ -38,10 +38,12 @@ class MusicHandler(object):
self.config = _config
self.logger = _logger
self.music = Music(host=self.config.db_host, port=self.config.db_port,
replication_factor=self.config.replication_factor)
if self.config.db_host is not None:
self.logger.info("DB: music host = " + self.config.db_host)
self.music = Music(hosts=self.config.hosts, port=self.config.port,
replication_factor=self.config.replication_factor,
music_server_retries=self.config.music_server_retries,
logger=self.logger)
if self.config.hosts is not None:
self.logger.info("DB: music host = %s", self.config.hosts)
if self.config.replication_factor is not None:
self.logger.info("DB: music replication factor = " + str(self.config.replication_factor))

View File

@ -52,8 +52,8 @@ class Config(object):
self.db_app_table = None
self.db_uuid_table = None
self.replication_factor = 3
self.db_host = 'localhost'
self.db_port = 8080
self.hosts = ['localhost']
self.port = 8080
self.ip = None
@ -115,6 +115,9 @@ class Config(object):
self.base_flavor_mem = 0
self.base_flavor_disk = 0
# Music HA paramater
self.music_server_retries = 3
def configure(self):
"""Store config info extracted from oslo."""
status = self._init_system()
@ -201,9 +204,11 @@ class Config(object):
self.replication_factor = CONF.music.replication_factor
self.db_host = CONF.music.host
self.hosts = CONF.music.hosts
self.db_port = CONF.music.port
self.port = CONF.music.port
self.music_server_retries = CONF.music.music_server_retries
self.ip = CONF.engine.ip