Merge "Run reset master when setting up mysql replicas"
This commit is contained in:
commit
8ea0f2bcaa
@ -106,7 +106,8 @@ class BaseMySqlAdmin(object, metaclass=abc.ABCMeta):
|
|||||||
"""Internal. Given a MySQLUser, populate its databases attribute."""
|
"""Internal. Given a MySQLUser, populate its databases attribute."""
|
||||||
LOG.debug("Associating dbs to user %(name)s at %(host)s.",
|
LOG.debug("Associating dbs to user %(name)s at %(host)s.",
|
||||||
{'name': user.name, 'host': user.host})
|
{'name': user.name, 'host': user.host})
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
q = sql_query.Query()
|
q = sql_query.Query()
|
||||||
q.columns = ["grantee", "table_schema"]
|
q.columns = ["grantee", "table_schema"]
|
||||||
q.tables = ["information_schema.SCHEMA_PRIVILEGES"]
|
q.tables = ["information_schema.SCHEMA_PRIVILEGES"]
|
||||||
@ -122,7 +123,8 @@ class BaseMySqlAdmin(object, metaclass=abc.ABCMeta):
|
|||||||
def change_passwords(self, users):
|
def change_passwords(self, users):
|
||||||
"""Change the passwords of one or more existing users."""
|
"""Change the passwords of one or more existing users."""
|
||||||
LOG.debug("Changing the password of some users.")
|
LOG.debug("Changing the password of some users.")
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
for item in users:
|
for item in users:
|
||||||
LOG.debug("Changing password for user %s.", item)
|
LOG.debug("Changing password for user %s.", item)
|
||||||
user_dict = {'_name': item['name'],
|
user_dict = {'_name': item['name'],
|
||||||
@ -146,7 +148,8 @@ class BaseMySqlAdmin(object, metaclass=abc.ABCMeta):
|
|||||||
new_password = user_attrs.get('password')
|
new_password = user_attrs.get('password')
|
||||||
|
|
||||||
if new_name or new_host or new_password:
|
if new_name or new_host or new_password:
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
if new_password is not None:
|
if new_password is not None:
|
||||||
uu = sql_query.SetPassword(
|
uu = sql_query.SetPassword(
|
||||||
user.name, host=user.host,
|
user.name, host=user.host,
|
||||||
@ -165,7 +168,8 @@ class BaseMySqlAdmin(object, metaclass=abc.ABCMeta):
|
|||||||
|
|
||||||
def create_databases(self, databases):
|
def create_databases(self, databases):
|
||||||
"""Create the list of specified databases."""
|
"""Create the list of specified databases."""
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
for item in databases:
|
for item in databases:
|
||||||
mydb = models.MySQLSchema.deserialize(item)
|
mydb = models.MySQLSchema.deserialize(item)
|
||||||
mydb.check_create()
|
mydb.check_create()
|
||||||
@ -180,7 +184,8 @@ class BaseMySqlAdmin(object, metaclass=abc.ABCMeta):
|
|||||||
"""Create users and grant them privileges for the
|
"""Create users and grant them privileges for the
|
||||||
specified databases.
|
specified databases.
|
||||||
"""
|
"""
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
for item in users:
|
for item in users:
|
||||||
user = models.MySQLUser.deserialize(item)
|
user = models.MySQLUser.deserialize(item)
|
||||||
user.check_create()
|
user.check_create()
|
||||||
@ -200,7 +205,8 @@ class BaseMySqlAdmin(object, metaclass=abc.ABCMeta):
|
|||||||
|
|
||||||
def delete_database(self, database):
|
def delete_database(self, database):
|
||||||
"""Delete the specified database."""
|
"""Delete the specified database."""
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
mydb = models.MySQLSchema.deserialize(database)
|
mydb = models.MySQLSchema.deserialize(database)
|
||||||
mydb.check_delete()
|
mydb.check_delete()
|
||||||
dd = sql_query.DropDatabase(mydb.name)
|
dd = sql_query.DropDatabase(mydb.name)
|
||||||
@ -214,7 +220,8 @@ class BaseMySqlAdmin(object, metaclass=abc.ABCMeta):
|
|||||||
self.delete_user_by_name(mysql_user.name, mysql_user.host)
|
self.delete_user_by_name(mysql_user.name, mysql_user.host)
|
||||||
|
|
||||||
def delete_user_by_name(self, name, host='%'):
|
def delete_user_by_name(self, name, host='%'):
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
du = sql_query.DropUser(name, host=host)
|
du = sql_query.DropUser(name, host=host)
|
||||||
t = text(str(du))
|
t = text(str(du))
|
||||||
LOG.debug("delete_user_by_name: %s", t)
|
LOG.debug("delete_user_by_name: %s", t)
|
||||||
@ -240,7 +247,8 @@ class BaseMySqlAdmin(object, metaclass=abc.ABCMeta):
|
|||||||
": %(reason)s") %
|
": %(reason)s") %
|
||||||
{'user': username, 'reason': err_msg}
|
{'user': username, 'reason': err_msg}
|
||||||
)
|
)
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
q = sql_query.Query()
|
q = sql_query.Query()
|
||||||
q.columns = ['User', 'Host']
|
q.columns = ['User', 'Host']
|
||||||
q.tables = ['mysql.user']
|
q.tables = ['mysql.user']
|
||||||
@ -262,7 +270,8 @@ class BaseMySqlAdmin(object, metaclass=abc.ABCMeta):
|
|||||||
"""Grant a user permission to use a given database."""
|
"""Grant a user permission to use a given database."""
|
||||||
user = self._get_user(username, hostname)
|
user = self._get_user(username, hostname)
|
||||||
mydb = None # cache the model as we just want name validation
|
mydb = None # cache the model as we just want name validation
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
for database in databases:
|
for database in databases:
|
||||||
try:
|
try:
|
||||||
if mydb:
|
if mydb:
|
||||||
@ -365,7 +374,8 @@ class BaseMySqlAdmin(object, metaclass=abc.ABCMeta):
|
|||||||
LOG.debug("The following user names are on ignore list and will "
|
LOG.debug("The following user names are on ignore list and will "
|
||||||
"be omitted from the listing: %s", ignored_user_names)
|
"be omitted from the listing: %s", ignored_user_names)
|
||||||
users = []
|
users = []
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
iq = sql_query.Query() # Inner query.
|
iq = sql_query.Query() # Inner query.
|
||||||
iq.columns = ['User', 'Host', "CONCAT(User, '@', Host) as Marker"]
|
iq.columns = ['User', 'Host', "CONCAT(User, '@', Host) as Marker"]
|
||||||
iq.tables = ['mysql.user']
|
iq.tables = ['mysql.user']
|
||||||
@ -407,7 +417,8 @@ class BaseMySqlAdmin(object, metaclass=abc.ABCMeta):
|
|||||||
def revoke_access(self, username, hostname, database):
|
def revoke_access(self, username, hostname, database):
|
||||||
"""Revoke a user's permission to use a given database."""
|
"""Revoke a user's permission to use a given database."""
|
||||||
user = self._get_user(username, hostname)
|
user = self._get_user(username, hostname)
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
r = sql_query.Revoke(database=database,
|
r = sql_query.Revoke(database=database,
|
||||||
user=user.name,
|
user=user.name,
|
||||||
host=user.host)
|
host=user.host)
|
||||||
@ -463,9 +474,10 @@ class BaseMySqlApp(service.BaseDbApp):
|
|||||||
|
|
||||||
return ENGINE
|
return ENGINE
|
||||||
|
|
||||||
def execute_sql(self, sql_statement):
|
def execute_sql(self, sql_statement, use_flush=False):
|
||||||
LOG.debug("Executing SQL: %s", sql_statement)
|
LOG.debug("Executing SQL: %s", sql_statement)
|
||||||
with mysql_util.SqlClient(self.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.get_engine(), use_flush=use_flush) as client:
|
||||||
return client.execute(sql_statement)
|
return client.execute(sql_statement)
|
||||||
|
|
||||||
def get_data_dir(self):
|
def get_data_dir(self):
|
||||||
@ -513,21 +525,21 @@ class BaseMySqlApp(service.BaseDbApp):
|
|||||||
|
|
||||||
engine = sqlalchemy.create_engine(
|
engine = sqlalchemy.create_engine(
|
||||||
CONNECTION_STR_FORMAT % ('root', root_pass), echo=True)
|
CONNECTION_STR_FORMAT % ('root', root_pass), echo=True)
|
||||||
with mysql_util.SqlClient(engine, use_flush=False) as client:
|
with mysql_util.SqlClient(engine) as client:
|
||||||
self._create_admin_user(client, admin_password)
|
self._create_admin_user(client, admin_password)
|
||||||
|
|
||||||
engine = sqlalchemy.create_engine(
|
engine = sqlalchemy.create_engine(
|
||||||
CONNECTION_STR_FORMAT % (ADMIN_USER_NAME,
|
CONNECTION_STR_FORMAT % (ADMIN_USER_NAME,
|
||||||
urllib.parse.quote(admin_password)),
|
urllib.parse.quote(admin_password)),
|
||||||
echo=True)
|
echo=True)
|
||||||
with mysql_util.SqlClient(engine) as client:
|
with mysql_util.SqlClient(engine, use_flush=True) as client:
|
||||||
self._remove_anonymous_user(client)
|
self._remove_anonymous_user(client)
|
||||||
|
|
||||||
self.save_password(ADMIN_USER_NAME, admin_password)
|
self.save_password(ADMIN_USER_NAME, admin_password)
|
||||||
LOG.info("MySQL secure complete.")
|
LOG.info("MySQL secure complete.")
|
||||||
|
|
||||||
def secure_root(self):
|
def secure_root(self):
|
||||||
with mysql_util.SqlClient(self.get_engine()) as client:
|
with mysql_util.SqlClient(self.get_engine(), use_flush=True) as client:
|
||||||
self._remove_remote_root_access(client)
|
self._remove_remote_root_access(client)
|
||||||
|
|
||||||
def _remove_anonymous_user(self, client):
|
def _remove_anonymous_user(self, client):
|
||||||
@ -742,7 +754,7 @@ class BaseMySqlApp(service.BaseDbApp):
|
|||||||
LOG.info("Granting replication slave privilege for %s",
|
LOG.info("Granting replication slave privilege for %s",
|
||||||
replication_user['name'])
|
replication_user['name'])
|
||||||
|
|
||||||
with mysql_util.SqlClient(self.get_engine()) as client:
|
with mysql_util.SqlClient(self.get_engine(), use_flush=True) as client:
|
||||||
g = sql_query.Grant(permissions=['REPLICATION SLAVE'],
|
g = sql_query.Grant(permissions=['REPLICATION SLAVE'],
|
||||||
user=replication_user['name'],
|
user=replication_user['name'],
|
||||||
clear=replication_user['password'])
|
clear=replication_user['password'])
|
||||||
@ -848,7 +860,8 @@ class BaseMySqlRootAccess(object):
|
|||||||
reset the root password.
|
reset the root password.
|
||||||
"""
|
"""
|
||||||
user = models.MySQLUser.root(password=root_password)
|
user = models.MySQLUser.root(password=root_password)
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
try:
|
try:
|
||||||
cu = sql_query.CreateUser(user.name, host=user.host)
|
cu = sql_query.CreateUser(user.name, host=user.host)
|
||||||
t = text(str(cu))
|
t = text(str(cu))
|
||||||
@ -857,7 +870,9 @@ class BaseMySqlRootAccess(object):
|
|||||||
# Ignore, user is already created, just reset the password
|
# Ignore, user is already created, just reset the password
|
||||||
# TODO(rnirmal): More fine grained error checking later on
|
# TODO(rnirmal): More fine grained error checking later on
|
||||||
LOG.debug(err)
|
LOG.debug(err)
|
||||||
with mysql_util.SqlClient(self.mysql_app.get_engine()) as client:
|
|
||||||
|
with mysql_util.SqlClient(
|
||||||
|
self.mysql_app.get_engine(), use_flush=True) as client:
|
||||||
uu = sql_query.SetPassword(
|
uu = sql_query.SetPassword(
|
||||||
user.name, host=user.host, new_password=user.password,
|
user.name, host=user.host, new_password=user.password,
|
||||||
ds=CONF.datastore_manager, ds_version=CONF.datastore_version
|
ds=CONF.datastore_manager, ds_version=CONF.datastore_version
|
||||||
|
@ -31,6 +31,10 @@ class MysqlGTIDReplication(mysql_base.MysqlReplicationBase):
|
|||||||
last_gtid = self.read_last_master_gtid(service)
|
last_gtid = self.read_last_master_gtid(service)
|
||||||
LOG.info("last_gtid value is %s", last_gtid)
|
LOG.info("last_gtid value is %s", last_gtid)
|
||||||
if '-' in last_gtid:
|
if '-' in last_gtid:
|
||||||
|
# See
|
||||||
|
# https://avdeo.com/tag/error-1840-hy000-global-gtid_purged-can-only-be-set-when/
|
||||||
|
# Also, FLUSH PRIVILEGES will restore gtid_executed.
|
||||||
|
service.execute_sql('RESET MASTER')
|
||||||
set_gtid_cmd = "SET GLOBAL gtid_purged='%s'" % last_gtid
|
set_gtid_cmd = "SET GLOBAL gtid_purged='%s'" % last_gtid
|
||||||
service.execute_sql(set_gtid_cmd)
|
service.execute_sql(set_gtid_cmd)
|
||||||
|
|
||||||
|
@ -25,14 +25,14 @@ FLUSH = text(sql_query.FLUSH)
|
|||||||
class SqlClient(object):
|
class SqlClient(object):
|
||||||
"""A sqlalchemy wrapper to manage transactions."""
|
"""A sqlalchemy wrapper to manage transactions."""
|
||||||
|
|
||||||
def __init__(self, engine, use_flush=True):
|
def __init__(self, engine, use_flush=False):
|
||||||
self.engine = engine
|
self.engine = engine
|
||||||
self.use_flush = use_flush
|
self.use_flush = use_flush
|
||||||
|
|
||||||
def __enter__(self):
|
def __enter__(self):
|
||||||
self.conn = self.engine.connect()
|
self.conn = self.engine.connect()
|
||||||
self.trans = self.conn.begin()
|
self.trans = self.conn.begin()
|
||||||
return self.conn
|
return self
|
||||||
|
|
||||||
def __exit__(self, type, value, traceback):
|
def __exit__(self, type, value, traceback):
|
||||||
if self.trans:
|
if self.trans:
|
||||||
@ -48,10 +48,11 @@ class SqlClient(object):
|
|||||||
LOG.debug('Execute SQL: %s', t)
|
LOG.debug('Execute SQL: %s', t)
|
||||||
try:
|
try:
|
||||||
return self.conn.execute(t, kwargs)
|
return self.conn.execute(t, kwargs)
|
||||||
except Exception:
|
except Exception as err:
|
||||||
|
LOG.error(f'Failed to execute SQL {t}, error: {err}')
|
||||||
self.trans.rollback()
|
self.trans.rollback()
|
||||||
self.trans = None
|
self.trans = None
|
||||||
raise
|
raise err
|
||||||
|
|
||||||
|
|
||||||
def connection_checkout(dbapi_con, con_record, con_proxy):
|
def connection_checkout(dbapi_con, con_record, con_proxy):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user