MariaDB Leader Election Enhancement
This patch set enhances the current MariaDB leader election algorithm by adding a tiebreaker in case multiple nodes have the same sequence number which happens to be the highest sequence number in the MariaDB cluster. The tiebreaker is to pick the node which has the lowest node number in the hostname. This patch also changes the 409 status code reporting inside the safe_update_configmap() function to be a warning instead of an error, as it is indicative of a collision scenario rather than an actual error. Change-Id: Ifdff0250b520bb1972d79de94a491e566ed43997
This commit is contained in:
parent
cf2e776946
commit
8673bdda53
@ -281,8 +281,14 @@ def safe_update_configmap(configmap_dict, configmap_patch):
|
|||||||
body=configmap_patch)
|
body=configmap_patch)
|
||||||
return True
|
return True
|
||||||
except kubernetes.client.rest.ApiException as error:
|
except kubernetes.client.rest.ApiException as error:
|
||||||
logger.error("Failed to set configmap: {0}".format(error))
|
if error.status == 409:
|
||||||
return error
|
# This status code indicates a collision trying to write to the
|
||||||
|
# config map while another instance is also trying the same.
|
||||||
|
logger.warning("Collision writing configmap: {0}".format(error))
|
||||||
|
return True
|
||||||
|
else:
|
||||||
|
logger.error("Failed to set configmap: {0}".format(error))
|
||||||
|
return error
|
||||||
|
|
||||||
|
|
||||||
def set_configmap_annotation(key, value):
|
def set_configmap_annotation(key, value):
|
||||||
@ -400,7 +406,7 @@ def get_cluster_state():
|
|||||||
return state
|
return state
|
||||||
|
|
||||||
|
|
||||||
def declare_myself_cluser_leader():
|
def declare_myself_cluster_leader():
|
||||||
"""Declare the current pod as the cluster leader."""
|
"""Declare the current pod as the cluster leader."""
|
||||||
logger.info("Declaring myself current cluster leader")
|
logger.info("Declaring myself current cluster leader")
|
||||||
leader_expiry_raw = datetime.utcnow() + timedelta(
|
leader_expiry_raw = datetime.utcnow() + timedelta(
|
||||||
@ -421,10 +427,10 @@ def deadmans_leader_election():
|
|||||||
if iso8601.parse_date(leader_expiry).replace(
|
if iso8601.parse_date(leader_expiry).replace(
|
||||||
tzinfo=None) < datetime.utcnow().replace(tzinfo=None):
|
tzinfo=None) < datetime.utcnow().replace(tzinfo=None):
|
||||||
logger.info("Current cluster leader has expired")
|
logger.info("Current cluster leader has expired")
|
||||||
declare_myself_cluser_leader()
|
declare_myself_cluster_leader()
|
||||||
elif local_hostname == leader_node:
|
elif local_hostname == leader_node:
|
||||||
logger.info("Renewing cluster leader lease")
|
logger.info("Renewing cluster leader lease")
|
||||||
declare_myself_cluser_leader()
|
declare_myself_cluster_leader()
|
||||||
|
|
||||||
|
|
||||||
def get_grastate_val(key):
|
def get_grastate_val(key):
|
||||||
@ -578,6 +584,43 @@ def check_if_cluster_data_is_fresh():
|
|||||||
return sample_time_ok
|
return sample_time_ok
|
||||||
|
|
||||||
|
|
||||||
|
def get_nodes_with_highest_seqno():
|
||||||
|
"""Find out which node(s) has the highest sequence number and return
|
||||||
|
them in an array."""
|
||||||
|
logger.info("Getting the node(s) with highest seqno from configmap.")
|
||||||
|
state_configmap = k8s_api_instance.read_namespaced_config_map(
|
||||||
|
name=state_configmap_name, namespace=pod_namespace)
|
||||||
|
state_configmap_dict = state_configmap.to_dict()
|
||||||
|
seqnos = dict()
|
||||||
|
for key, value in state_configmap_dict['data'].iteritems():
|
||||||
|
keyitems = key.split('.')
|
||||||
|
key = keyitems[0]
|
||||||
|
node = keyitems[1]
|
||||||
|
if key == 'seqno':
|
||||||
|
seqnos[node] = value
|
||||||
|
max_seqno = max(seqnos.values())
|
||||||
|
max_seqno_nodes = sorted(
|
||||||
|
[k for k, v in seqnos.items() if v == max_seqno])
|
||||||
|
return max_seqno_nodes
|
||||||
|
|
||||||
|
|
||||||
|
def resolve_leader_node(nodename_array):
|
||||||
|
"""From the given nodename array, determine which node is the leader
|
||||||
|
by choosing the node which has a hostname with the lowest number at
|
||||||
|
the end of it. If by chance there are two nodes with the same number
|
||||||
|
then the first one encountered will be chosen."""
|
||||||
|
logger.info("Returning the node with the lowest hostname")
|
||||||
|
lowest = sys.maxint
|
||||||
|
leader = nodename_array[0]
|
||||||
|
for nodename in nodename_array:
|
||||||
|
nodenum = int(nodename[nodename.rindex('-')+1:])
|
||||||
|
logger.info("Nodename %s has nodenum %d", nodename, nodenum)
|
||||||
|
if nodenum < lowest:
|
||||||
|
lowest = nodenum
|
||||||
|
leader = nodename
|
||||||
|
logger.info("Resolved leader is %s", leader)
|
||||||
|
return leader
|
||||||
|
|
||||||
def check_if_i_lead():
|
def check_if_i_lead():
|
||||||
"""Check on full restart of cluster if this node should lead the cluster
|
"""Check on full restart of cluster if this node should lead the cluster
|
||||||
reformation."""
|
reformation."""
|
||||||
@ -596,24 +639,13 @@ def check_if_i_lead():
|
|||||||
logger.info(
|
logger.info(
|
||||||
"Cluster info has been uptodate {0} times out of the required "
|
"Cluster info has been uptodate {0} times out of the required "
|
||||||
"{1}".format(counter, count))
|
"{1}".format(counter, count))
|
||||||
state_configmap = k8s_api_instance.read_namespaced_config_map(
|
max_seqno_nodes = get_nodes_with_highest_seqno()
|
||||||
name=state_configmap_name, namespace=pod_namespace)
|
leader_node = resolve_leader_node(max_seqno_nodes)
|
||||||
state_configmap_dict = state_configmap.to_dict()
|
if local_hostname == leader_node:
|
||||||
seqnos = dict()
|
|
||||||
for key, value in state_configmap_dict['data'].iteritems():
|
|
||||||
keyitems = key.split('.')
|
|
||||||
key = keyitems[0]
|
|
||||||
node = keyitems[1]
|
|
||||||
if key == 'seqno':
|
|
||||||
seqnos[node] = value
|
|
||||||
max_seqno = max(seqnos.values())
|
|
||||||
max_seqno_node = sorted(
|
|
||||||
[k for k, v in seqnos.items() if v == max_seqno])[0]
|
|
||||||
if local_hostname == max_seqno_node:
|
|
||||||
logger.info("I lead the cluster")
|
logger.info("I lead the cluster")
|
||||||
return True
|
return True
|
||||||
else:
|
else:
|
||||||
logger.info("{0} leads the cluster".format(max_seqno_node))
|
logger.info("{0} leads the cluster".format(leader_node))
|
||||||
return False
|
return False
|
||||||
|
|
||||||
|
|
||||||
@ -702,7 +734,7 @@ def run_mysqld(cluster='existing'):
|
|||||||
|
|
||||||
def mysqld_reboot():
|
def mysqld_reboot():
|
||||||
"""Reboot a mysqld cluster."""
|
"""Reboot a mysqld cluster."""
|
||||||
declare_myself_cluser_leader()
|
declare_myself_cluster_leader()
|
||||||
set_grastate_val(key='safe_to_bootstrap', value='1')
|
set_grastate_val(key='safe_to_bootstrap', value='1')
|
||||||
run_mysqld(cluster='new')
|
run_mysqld(cluster='new')
|
||||||
|
|
||||||
@ -723,7 +755,7 @@ if get_cluster_state() == 'new':
|
|||||||
if leader_node == local_hostname:
|
if leader_node == local_hostname:
|
||||||
set_configmap_annotation(
|
set_configmap_annotation(
|
||||||
key='openstackhelm.openstack.org/cluster.state', value='init')
|
key='openstackhelm.openstack.org/cluster.state', value='init')
|
||||||
declare_myself_cluser_leader()
|
declare_myself_cluster_leader()
|
||||||
launch_leader_election()
|
launch_leader_election()
|
||||||
mysqld_bootstrap()
|
mysqld_bootstrap()
|
||||||
update_grastate_configmap()
|
update_grastate_configmap()
|
||||||
|
Loading…
x
Reference in New Issue
Block a user