Merge "MariaDB GTID Replication"

This commit is contained in:
Jenkins 2016-01-28 04:04:58 +00:00 committed by Gerrit Code Review
commit 00c1e12909
9 changed files with 177 additions and 59 deletions

View File

@ -1196,10 +1196,11 @@ mariadb_opts = [
help='Default strategy to perform backups.', help='Default strategy to perform backups.',
deprecated_name='backup_strategy', deprecated_name='backup_strategy',
deprecated_group='DEFAULT'), deprecated_group='DEFAULT'),
cfg.StrOpt('replication_strategy', default='MysqlBinlogReplication', cfg.StrOpt('replication_strategy', default='MariaDBGTIDReplication',
help='Default strategy for replication.'), help='Default strategy for replication.'),
cfg.StrOpt('replication_namespace', cfg.StrOpt('replication_namespace',
default='trove.guestagent.strategies.replication.mysql_binlog', default='trove.guestagent.strategies.replication.experimental'
'.mariadb_gtid',
help='Namespace to load replication strategies from.'), help='Namespace to load replication strategies from.'),
cfg.StrOpt('mount_point', default='/var/lib/mysql', cfg.StrOpt('mount_point', default='/var/lib/mysql',
help="Filesystem path for mounting " help="Filesystem path for mounting "

View File

@ -37,6 +37,38 @@ class MySqlApp(service.BaseMySqlApp):
super(MySqlApp, self).__init__(status, LocalSqlClient, super(MySqlApp, self).__init__(status, LocalSqlClient,
KeepAliveConnection) KeepAliveConnection)
def _get_slave_status(self):
with self.local_sql_client(self.get_engine()) as client:
return client.execute('SHOW SLAVE STATUS').first()
def _get_master_UUID(self):
slave_status = self._get_slave_status()
return slave_status and slave_status['Master_Server_Id'] or None
def _get_gtid_executed(self):
with self.local_sql_client(self.get_engine()) as client:
return client.execute('SELECT @@global.gtid_binlog_pos').first()[0]
def get_last_txn(self):
master_UUID = self._get_master_UUID()
last_txn_id = '0'
gtid_executed = self._get_gtid_executed()
for gtid_set in gtid_executed.split(','):
uuid_set = gtid_set.split('-')
if uuid_set[1] == master_UUID:
last_txn_id = uuid_set[-1]
break
return master_UUID, int(last_txn_id)
def get_latest_txn_id(self):
LOG.info(_("Retrieving latest txn id."))
return self._get_gtid_executed()
def wait_for_txn(self, txn):
LOG.info(_("Waiting on txn '%s'.") % txn)
with self.local_sql_client(self.get_engine()) as client:
client.execute("SELECT MASTER_GTID_WAIT('%s')" % txn)
class MySqlRootAccess(service.BaseMySqlRootAccess): class MySqlRootAccess(service.BaseMySqlRootAccess):
def __init__(self): def __init__(self):

View File

@ -37,6 +37,39 @@ class MySqlApp(service.BaseMySqlApp):
super(MySqlApp, self).__init__(status, LocalSqlClient, super(MySqlApp, self).__init__(status, LocalSqlClient,
KeepAliveConnection) KeepAliveConnection)
def _get_slave_status(self):
with self.local_sql_client(self.get_engine()) as client:
return client.execute('SHOW SLAVE STATUS').first()
def _get_master_UUID(self):
slave_status = self._get_slave_status()
return slave_status and slave_status['Master_UUID'] or None
def _get_gtid_executed(self):
with self.local_sql_client(self.get_engine()) as client:
return client.execute('SELECT @@global.gtid_executed').first()[0]
def get_last_txn(self):
master_UUID = self._get_master_UUID()
last_txn_id = '0'
gtid_executed = self._get_gtid_executed()
for gtid_set in gtid_executed.split(','):
uuid_set = gtid_set.split(':')
if uuid_set[0] == master_UUID:
last_txn_id = uuid_set[-1].split('-')[-1]
break
return master_UUID, int(last_txn_id)
def get_latest_txn_id(self):
LOG.info(_("Retrieving latest txn id."))
return self._get_gtid_executed()
def wait_for_txn(self, txn):
LOG.info(_("Waiting on txn '%s'.") % txn)
with self.local_sql_client(self.get_engine()) as client:
client.execute("SELECT WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS('%s')"
% txn)
class MySqlRootAccess(service.BaseMySqlRootAccess): class MySqlRootAccess(service.BaseMySqlRootAccess):
def __init__(self): def __init__(self):

View File

@ -17,6 +17,8 @@
# #
from oslo_log import log as logging from oslo_log import log as logging
from trove.common.i18n import _
from trove.guestagent.datastore.mysql_common import service from trove.guestagent.datastore.mysql_common import service
LOG = logging.getLogger(__name__) LOG = logging.getLogger(__name__)
@ -40,6 +42,54 @@ class MySqlApp(service.BaseMySqlApp):
super(MySqlApp, self).__init__(status, LocalSqlClient, super(MySqlApp, self).__init__(status, LocalSqlClient,
KeepAliveConnection) KeepAliveConnection)
# DEPRECATED: Mantain for API Compatibility
def get_txn_count(self):
LOG.info(_("Retrieving latest txn id."))
txn_count = 0
with self.local_sql_client(self.get_engine()) as client:
result = client.execute('SELECT @@global.gtid_executed').first()
for uuid_set in result[0].split(','):
for interval in uuid_set.split(':')[1:]:
if '-' in interval:
iparts = interval.split('-')
txn_count += int(iparts[1]) - int(iparts[0])
else:
txn_count += 1
return txn_count
def _get_slave_status(self):
with self.local_sql_client(self.get_engine()) as client:
return client.execute('SHOW SLAVE STATUS').first()
def _get_master_UUID(self):
slave_status = self._get_slave_status()
return slave_status and slave_status['Master_UUID'] or None
def _get_gtid_executed(self):
with self.local_sql_client(self.get_engine()) as client:
return client.execute('SELECT @@global.gtid_executed').first()[0]
def get_last_txn(self):
master_UUID = self._get_master_UUID()
last_txn_id = '0'
gtid_executed = self._get_gtid_executed()
for gtid_set in gtid_executed.split(','):
uuid_set = gtid_set.split(':')
if uuid_set[0] == master_UUID:
last_txn_id = uuid_set[-1].split('-')[-1]
break
return master_UUID, int(last_txn_id)
def get_latest_txn_id(self):
LOG.info(_("Retrieving latest txn id."))
return self._get_gtid_executed()
def wait_for_txn(self, txn):
LOG.info(_("Waiting on txn '%s'.") % txn)
with self.local_sql_client(self.get_engine()) as client:
client.execute("SELECT WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS('%s')"
% txn)
class MySqlRootAccess(service.BaseMySqlRootAccess): class MySqlRootAccess(service.BaseMySqlRootAccess):
def __init__(self): def __init__(self):

View File

@ -974,54 +974,6 @@ class BaseMySqlApp(object):
LOG.info(_("Resetting configuration.")) LOG.info(_("Resetting configuration."))
self._reset_configuration(config_contents) self._reset_configuration(config_contents)
# DEPRECATED: Mantain for API Compatibility
def get_txn_count(self):
LOG.info(_("Retrieving latest txn id."))
txn_count = 0
with self.local_sql_client(self.get_engine()) as client:
result = client.execute('SELECT @@global.gtid_executed').first()
for uuid_set in result[0].split(','):
for interval in uuid_set.split(':')[1:]:
if '-' in interval:
iparts = interval.split('-')
txn_count += int(iparts[1]) - int(iparts[0])
else:
txn_count += 1
return txn_count
def _get_slave_status(self):
with self.local_sql_client(self.get_engine()) as client:
return client.execute('SHOW SLAVE STATUS').first()
def _get_master_UUID(self):
slave_status = self._get_slave_status()
return slave_status and slave_status['Master_UUID'] or None
def _get_gtid_executed(self):
with self.local_sql_client(self.get_engine()) as client:
return client.execute('SELECT @@global.gtid_executed').first()[0]
def get_last_txn(self):
master_UUID = self._get_master_UUID()
last_txn_id = '0'
gtid_executed = self._get_gtid_executed()
for gtid_set in gtid_executed.split(','):
uuid_set = gtid_set.split(':')
if uuid_set[0] == master_UUID:
last_txn_id = uuid_set[-1].split('-')[-1]
break
return master_UUID, int(last_txn_id)
def get_latest_txn_id(self):
LOG.info(_("Retrieving latest txn id."))
return self._get_gtid_executed()
def wait_for_txn(self, txn):
LOG.info(_("Waiting on txn '%s'.") % txn)
with self.local_sql_client(self.get_engine()) as client:
client.execute("SELECT WAIT_UNTIL_SQL_THREAD_AFTER_GTIDS('%s')"
% txn)
def reset_admin_password(self, admin_password): def reset_admin_password(self, admin_password):
"""Replace the password in the my.cnf file.""" """Replace the password in the my.cnf file."""
# grant the new admin password # grant the new admin password

View File

@ -0,0 +1,48 @@
# Copyright 2016 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
from oslo_log import log as logging
from trove.common import cfg
from trove.guestagent.backup.backupagent import BackupAgent
from trove.guestagent.strategies.replication import mysql_base
AGENT = BackupAgent()
CONF = cfg.CONF
LOG = logging.getLogger(__name__)
class MariaDBGTIDReplication(mysql_base.MysqlReplicationBase):
"""MariaDB Replication coordinated by GTIDs."""
def connect_to_master(self, service, snapshot):
logging_config = snapshot['log_position']
LOG.debug("connect_to_master %s" % logging_config['replication_user'])
change_master_cmd = (
"CHANGE MASTER TO MASTER_HOST='%(host)s', "
"MASTER_PORT=%(port)s, "
"MASTER_USER='%(user)s', "
"MASTER_PASSWORD='%(password)s', "
"MASTER_USE_GTID=slave_pos" %
{
'host': snapshot['master']['host'],
'port': snapshot['master']['port'],
'user': logging_config['replication_user']['name'],
'password': logging_config['replication_user']['password']
})
service.execute_on_client(change_master_cmd)
service.start_slave()

View File

@ -81,6 +81,13 @@ class MysqlReplicationBase(base.Replication):
return replication_user return replication_user
def backup_runner_for_replication(self):
return {
'runner': REPL_BACKUP_RUNNER,
'extra_opts': REPL_EXTRA_OPTS,
'incremental_runner': REPL_BACKUP_INCREMENTAL_RUNNER
}
def snapshot_for_replication(self, context, service, def snapshot_for_replication(self, context, service,
location, snapshot_info): location, snapshot_info):
snapshot_id = snapshot_info['id'] snapshot_id = snapshot_info['id']
@ -90,9 +97,8 @@ class MysqlReplicationBase(base.Replication):
# Only create a backup if it's the first replica # Only create a backup if it's the first replica
if replica_number == 1: if replica_number == 1:
AGENT.execute_backup( AGENT.execute_backup(
context, snapshot_info, runner=REPL_BACKUP_RUNNER, context, snapshot_info,
extra_opts=REPL_EXTRA_OPTS, **self.backup_runner_for_replication())
incremental_runner=REPL_BACKUP_INCREMENTAL_RUNNER)
else: else:
LOG.debug("Using existing backup created for previous replica.") LOG.debug("Using existing backup created for previous replica.")
LOG.debug("Replication snapshot %s used for replica number %d." LOG.debug("Replication snapshot %s used for replica number %d."
@ -119,13 +125,9 @@ class MysqlReplicationBase(base.Replication):
def enable_as_slave(self, service, snapshot, slave_config): def enable_as_slave(self, service, snapshot, slave_config):
try: try:
LOG.debug("enable_as_slave: about to call write_overrides")
service.write_replication_replica_overrides(slave_config) service.write_replication_replica_overrides(slave_config)
LOG.debug("enable_as_slave: about to call restart")
service.restart() service.restart()
LOG.debug("enable_as_slave: about to call connect_to_master")
self.connect_to_master(service, snapshot) self.connect_to_master(service, snapshot)
LOG.debug("enable_as_slave: after call connect_to_master")
except Exception: except Exception:
LOG.exception(_("Exception enabling guest as replica")) LOG.exception(_("Exception enabling guest as replica"))
raise raise

View File

@ -1,4 +1,4 @@
[mysqld] [mysqld]
log_bin = /var/lib/mysql/data/mysql-bin.log log_bin = /var/lib/mysql/data/mysql-bin.log
relay_log = /var/lib/mysql/data/mysql-relay-bin.log relay_log = /var/lib/mysql/data/mysqld-relay-bin.log
read_only = true read_only = true

View File

@ -1,2 +1,2 @@
[mysqld] [mysqld]
log_bin = /var/lib/mysql/data/mysql-bin.log log_bin = /var/lib/mysql/data/mariadb-bin.log