Mariadb: Support adoption of running single node mariadb deployment

This PS updates the mariadb chart to both support adoption of a
single instance of mariadb running the bash driven chart, which
did not support reforming a galera cluster by tracking state using
a configmap. Additionally basic logic is added for upgrading the
database as part of the normal rolling update flow.

Change-Id: I412de507112b38d6d2534e89f2a02f84bef3da63
Signed-off-by: Pete Birley <pete@port.direct>
This commit is contained in:
Pete Birley 2018-12-01 18:52:39 -06:00
parent 5316586d9e
commit 896385354e
2 changed files with 123 additions and 53 deletions

View File

@ -48,6 +48,10 @@ logger.addHandler(ch)
local_hostname = socket.gethostname() local_hostname = socket.gethostname()
logger.info("This instance hostname: {0}".format(local_hostname)) logger.info("This instance hostname: {0}".format(local_hostname))
# Get the instance number
instance_number = local_hostname.split("-")[-1]
logger.info("This instance number: {0}".format(instance_number))
# Setup k8s client credentials and check api version # Setup k8s client credentials and check api version
kubernetes.config.load_incluster_config() kubernetes.config.load_incluster_config()
kubernetes_version = kubernetes.client.VersionApi().get_code().git_version kubernetes_version = kubernetes.client.VersionApi().get_code().git_version
@ -109,6 +113,7 @@ def ensure_state_configmap(pod_namespace, configmap_name, configmap_body):
except: except:
k8s_api_instance.create_namespaced_config_map( k8s_api_instance.create_namespaced_config_map(
namespace=pod_namespace, body=configmap_body) namespace=pod_namespace, body=configmap_body)
return False return False
@ -351,13 +356,36 @@ def get_cluster_state():
except: except:
logger.info("The cluster configmap \"{0}\" does not exist.".format( logger.info("The cluster configmap \"{0}\" does not exist.".format(
state_configmap_name)) state_configmap_name))
time.sleep(default_sleep)
leader_expiry_raw = datetime.utcnow() + timedelta(
seconds=cluster_leader_ttl)
leader_expiry = "{0}Z".format(leader_expiry_raw.isoformat("T"))
if check_for_active_nodes():
# NOTE(portdirect): here we make the assumption that the 1st pod
# in an existing statefulset is the one to adopt as leader.
leader = "{0}-0".format("-".join(
local_hostname.split("-")[:-1]))
state = "live"
logger.info(
"The cluster is running already though unmanaged \"{0}\" will be declared leader in a \"{1}\" state".
format(leader, state))
else:
leader = local_hostname
state = "new"
logger.info(
"The cluster is new \"{0}\" will be declared leader in a \"{1}\" state".
format(leader, state))
initial_configmap_body = { initial_configmap_body = {
"apiVersion": "v1", "apiVersion": "v1",
"kind": "ConfigMap", "kind": "ConfigMap",
"metadata": { "metadata": {
"name": state_configmap_name, "name": state_configmap_name,
"annotations": { "annotations": {
"openstackhelm.openstack.org/cluster.state": "new" "openstackhelm.openstack.org/cluster.state": state,
"openstackhelm.openstack.org/leader.node": leader,
"openstackhelm.openstack.org/leader.expiry":
leader_expiry
} }
}, },
"data": {} "data": {}
@ -369,14 +397,11 @@ def get_cluster_state():
return state return state
def declare_myself_cluser_leader(ttl): def declare_myself_cluser_leader():
"""Declare the current pod as the cluster leader. """Declare the current pod as the cluster leader."""
Keyword arguments:
ttl -- the ttl for the leader period
"""
logger.info("Declaring myself current cluster leader") logger.info("Declaring myself current cluster leader")
leader_expiry_raw = datetime.utcnow() + timedelta(seconds=120) leader_expiry_raw = datetime.utcnow() + timedelta(
seconds=cluster_leader_ttl)
leader_expiry = "{0}Z".format(leader_expiry_raw.isoformat("T")) leader_expiry = "{0}Z".format(leader_expiry_raw.isoformat("T"))
set_configmap_annotation( set_configmap_annotation(
key='openstackhelm.openstack.org/leader.node', value=local_hostname) key='openstackhelm.openstack.org/leader.node', value=local_hostname)
@ -393,10 +418,10 @@ def deadmans_leader_election():
if iso8601.parse_date(leader_expiry).replace( if iso8601.parse_date(leader_expiry).replace(
tzinfo=None) < datetime.utcnow().replace(tzinfo=None): tzinfo=None) < datetime.utcnow().replace(tzinfo=None):
logger.info("Current cluster leader has expired") logger.info("Current cluster leader has expired")
declare_myself_cluser_leader(ttl=cluster_leader_ttl) declare_myself_cluser_leader()
elif local_hostname == leader_node: elif local_hostname == leader_node:
logger.info("Renewing cluster leader lease") logger.info("Renewing cluster leader lease")
declare_myself_cluser_leader(ttl=cluster_leader_ttl) declare_myself_cluser_leader()
def get_grastate_val(key): def get_grastate_val(key):
@ -452,31 +477,55 @@ def update_grastate_configmap():
def update_grastate_on_restart(): def update_grastate_on_restart():
"""Update the grastate.dat on node restart.""" """Update the grastate.dat on node restart."""
logger.info("Updating grastate info for node") logger.info("Updating grastate info for node")
if get_grastate_val(key='seqno') == '-1': if os.path.exists('/var/lib/mysql/grastate.dat'):
logger.info( if get_grastate_val(key='seqno') == '-1':
"Node shutdown was not clean, getting position via wsrep-recover") logger.info(
"Node shutdown was not clean, getting position via wsrep-recover"
)
def recover_wsrep_position(): def recover_wsrep_position():
"""Extract recoved wsrep position from uncleanly exited node.""" """Extract recoved wsrep position from uncleanly exited node."""
wsrep_recover = subprocess.Popen( wsrep_recover = subprocess.Popen(
[ [
'mysqld', '--bind-address=127.0.0.1', 'mysqld', '--bind-address=127.0.0.1',
'--wsrep_cluster_address=gcomm://', '--wsrep-recover' '--wsrep_cluster_address=gcomm://', '--wsrep-recover'
], ],
stdout=subprocess.PIPE, stdout=subprocess.PIPE,
stderr=subprocess.PIPE) stderr=subprocess.PIPE)
out, err = wsrep_recover.communicate() out, err = wsrep_recover.communicate()
for item in err.split("\n"): for item in err.split("\n"):
if "WSREP: Recovered position:" in item: if "WSREP: Recovered position:" in item:
line = item.strip().split() line = item.strip().split()
wsrep_rec_pos = line[-1].split(':')[-1] wsrep_rec_pos = line[-1].split(':')[-1]
return wsrep_rec_pos return wsrep_rec_pos
set_grastate_val(key='seqno', value=recover_wsrep_position())
else:
logger.info("Node shutdown was clean, using grastate.dat")
update_grastate_configmap()
set_grastate_val(key='seqno', value=recover_wsrep_position())
else: else:
logger.info("Node shutdown was clean, using grastate.dat") logger.info("No grastate.dat exists I am a new node")
update_grastate_configmap()
def get_active_endpoints(endpoints_name=direct_svc_name,
namespace=pod_namespace):
"""Returns a list of active endpoints.
Keyword arguments:
endpoints_name -- endpoints to check for active backends
(default direct_svc_name)
namespace -- namespace to check for endpoints (default pod_namespace)
"""
endpoints = k8s_api_instance.read_namespaced_endpoints(
name=endpoints_name, namespace=pod_namespace)
endpoints_dict = endpoints.to_dict()
addresses_index = [
i for i, s in enumerate(endpoints_dict['subsets']) if 'addresses' in s
][0]
active_endpoints = endpoints_dict['subsets'][addresses_index]['addresses']
return active_endpoints
def check_for_active_nodes(endpoints_name=direct_svc_name, def check_for_active_nodes(endpoints_name=direct_svc_name,
@ -489,13 +538,7 @@ def check_for_active_nodes(endpoints_name=direct_svc_name,
namespace -- namespace to check for endpoints (default pod_namespace) namespace -- namespace to check for endpoints (default pod_namespace)
""" """
logger.info("Checking for active nodes") logger.info("Checking for active nodes")
endpoints = k8s_api_instance.read_namespaced_endpoints( active_endpoints = get_active_endpoints()
name=endpoints_name, namespace=pod_namespace)
endpoints_dict = endpoints.to_dict()
addresses_index = [
i for i, s in enumerate(endpoints_dict['subsets']) if 'addresses' in s
][0]
active_endpoints = endpoints_dict['subsets'][addresses_index]['addresses']
if active_endpoints and len(active_endpoints) >= 1: if active_endpoints and len(active_endpoints) >= 1:
return True return True
else: else:
@ -608,7 +651,11 @@ def launch_leader_election():
def run_mysqld(cluster='existing'): def run_mysqld(cluster='existing'):
"""Launch the mysqld instance for the pod. """Launch the mysqld instance for the pod. This will also run mysql upgrade
if we are the 1st replica, and the rest of the cluster is already running.
This senario will be triggerd either following a rolling update, as this
works in reverse order for statefulset. Or restart of the 1st instance, in
which case the comand should be a no-op.
Keyword arguments: Keyword arguments:
cluster -- whether we going to form a cluster 'new' or joining an existing cluster -- whether we going to form a cluster 'new' or joining an existing
@ -621,18 +668,28 @@ def run_mysqld(cluster='existing'):
mysqld_cmd = ['mysqld'] mysqld_cmd = ['mysqld']
if cluster == 'new': if cluster == 'new':
mysqld_cmd.append('--wsrep-new-cluster') mysqld_cmd.append('--wsrep-new-cluster')
else:
if int(instance_number) == 0:
active_endpoints = get_active_endpoints()
if active_endpoints and len(active_endpoints) == (
int(mariadb_replicas) - 1):
run_cmd_with_logging([
'mysql_upgrade',
'--defaults-file=/etc/mysql/admin_user.cnf'
], logger)
run_cmd_with_logging(mysqld_cmd, logger) run_cmd_with_logging(mysqld_cmd, logger)
def mysqld_reboot(): def mysqld_reboot():
"""Reboot a mysqld cluster.""" """Reboot a mysqld cluster."""
declare_myself_cluser_leader(ttl=cluster_leader_ttl) declare_myself_cluser_leader()
set_grastate_val(key='safe_to_bootstrap', value='1') set_grastate_val(key='safe_to_bootstrap', value='1')
run_mysqld(cluster='new') run_mysqld(cluster='new')
def sigterm_shutdown(x, y): def sigterm_shutdown(x, y):
"""Shutdown the instnace of mysqld on shutdown signal.""" """Shutdown the instance of mysqld on shutdown signal."""
logger.info("Got a sigterm from the container runtime, time to go.") logger.info("Got a sigterm from the container runtime, time to go.")
stop_mysqld() stop_mysqld()
@ -642,15 +699,26 @@ signal.signal(signal.SIGTERM, sigterm_shutdown)
# Main logic loop # Main logic loop
if get_cluster_state() == 'new': if get_cluster_state() == 'new':
set_configmap_annotation( leader_node = get_configmap_value(
key='openstackhelm.openstack.org/cluster.state', value='init') type='annotation', key='openstackhelm.openstack.org/leader.node')
declare_myself_cluser_leader(ttl=cluster_leader_ttl) if leader_node == local_hostname:
launch_leader_election() set_configmap_annotation(
mysqld_bootstrap() key='openstackhelm.openstack.org/cluster.state', value='init')
update_grastate_configmap() declare_myself_cluser_leader()
set_configmap_annotation( launch_leader_election()
key='openstackhelm.openstack.org/cluster.state', value='live') mysqld_bootstrap()
run_mysqld(cluster='new') update_grastate_configmap()
set_configmap_annotation(
key='openstackhelm.openstack.org/cluster.state', value='live')
run_mysqld(cluster='new')
else:
logger.info("Waiting for cluster to start running")
while not get_cluster_state() == 'live':
time.sleep(default_sleep)
while not check_for_active_nodes():
time.sleep(default_sleep)
launch_leader_election()
run_mysqld()
elif get_cluster_state() == 'init': elif get_cluster_state() == 'init':
logger.info("Waiting for cluster to start running") logger.info("Waiting for cluster to start running")
while not get_cluster_state() == 'live': while not get_cluster_state() == 'live':

View File

@ -21,7 +21,7 @@ collation_server=utf8_unicode_ci
skip-character-set-client-handshake skip-character-set-client-handshake
# Logging # Logging
slow_query_log=on slow_query_log=off
slow_query_log_file=/var/log/mysql/mariadb-slow.log slow_query_log_file=/var/log/mysql/mariadb-slow.log
log_warnings=2 log_warnings=2
@ -75,9 +75,11 @@ table_definition_cache=1024
# TODO(tomasz.paszkowski): This needs to by dynamic based on available RAM. # TODO(tomasz.paszkowski): This needs to by dynamic based on available RAM.
innodb_buffer_pool_size=1024M innodb_buffer_pool_size=1024M
innodb_doublewrite=0 innodb_doublewrite=0
innodb_file_format=Barracuda
innodb_file_per_table=1 innodb_file_per_table=1
innodb_flush_method=O_DIRECT innodb_flush_method=O_DIRECT
innodb_io_capacity=500 innodb_io_capacity=500
innodb_locks_unsafe_for_binlog=1
innodb_log_file_size=128M innodb_log_file_size=128M
innodb_old_blocks_time=1000 innodb_old_blocks_time=1000
innodb_read_io_threads=8 innodb_read_io_threads=8
@ -93,9 +95,9 @@ wsrep_on=1
wsrep_provider=/usr/lib/galera/libgalera_smm.so wsrep_provider=/usr/lib/galera/libgalera_smm.so
wsrep_provider_options="gmcast.listen_addr=tcp://0.0.0.0:{{ tuple "oslo_db" "direct" "wsrep" . | include "helm-toolkit.endpoints.endpoint_port_lookup" }}" wsrep_provider_options="gmcast.listen_addr=tcp://0.0.0.0:{{ tuple "oslo_db" "direct" "wsrep" . | include "helm-toolkit.endpoints.endpoint_port_lookup" }}"
wsrep_slave_threads=12 wsrep_slave_threads=12
# FIX_ME(portdirect): https://mariadb.com/kb/en/library/mariabackup-overview/#granting-privileges-for-ssts
wsrep_sst_auth=root:{{ .Values.endpoints.oslo_db.auth.admin.password }} wsrep_sst_auth=root:{{ .Values.endpoints.oslo_db.auth.admin.password }}
wsrep_sst_method=mariabackup # FIXME(portdirect): use rsync for compatibility between image variations
wsrep_sst_method=rsync
[mysqldump] [mysqldump]
max-allowed-packet=16M max-allowed-packet=16M