PostgreSQL Streaming Replication

Support for standard WAL-based streaming replication for
PostgreSQL guests. Sets up read-only hot standby servers.

Basic replication is supported for 9.3, but promote
requires pg_rewind which is only supported for 9.4 and
greater. Promote currently only works with single
master-replica configurations (see TODO and associated
bug 1553339 for details)

Test changes:
 - moved the replication promote tests to a separate group
 - skipped known issues with bug reference

Change-Id: I519fa58d786edddb09bf14ce7629e1be51b62600
Implements: bp/pgsql-replication
This commit is contained in:
Alex Tomic 2015-09-17 10:35:08 -04:00 committed by Amrith Kumar
parent 7d8d743d8e
commit 4c1c191def
24 changed files with 720 additions and 93 deletions

View File

@ -0,0 +1,4 @@
---
features:
- Support for standard WAL based streaming replication for
postgresql guests. Sets up read-only hot standby servers.

View File

@ -1071,6 +1071,13 @@ postgresql_opts = [
help='Incremental Backup Runner based on the default '
'strategy. For strategies that do not implement an '
'incremental, the runner will use the default full backup.'),
cfg.StrOpt('replication_strategy',
default='PostgresqlReplicationStreaming',
help='Default strategy for replication.'),
cfg.StrOpt('replication_namespace',
default='trove.guestagent.strategies.replication.experimental.'
'postgresql_impl',
help='Namespace to load replication strategies from.'),
cfg.StrOpt('mount_point', default='/var/lib/postgresql',
help="Filesystem path for mounting "
"volumes if volume support is enabled."),

View File

@ -371,7 +371,7 @@ class PropertiesCodec(StreamCodec):
return container
class KeyValueCodec(PropertiesCodec):
class KeyValueCodec(StreamCodec):
"""
Read/write data from/into a simple key=value file.
@ -388,12 +388,92 @@ class KeyValueCodec(PropertiesCodec):
}
"""
def __init__(self, delimiter='=', comment_markers=('#'),
unpack_singletons=True, string_mappings=None):
super(KeyValueCodec, self).__init__(
delimiter=delimiter, comment_markers=comment_markers,
unpack_singletons=unpack_singletons,
string_mappings=string_mappings)
BOOL_PYTHON = 0 # True, False
BOOL_LOWER = 1 # true, false
BOOL_UPPER = 2 # TRUE, FALSE
def __init__(self, delimiter='=',
comment_marker='#',
line_terminator='\r\n',
value_quoting=False,
value_quote_char="'",
bool_case=BOOL_PYTHON,
big_ints=False,
hidden_marker=None):
"""
:param delimiter: string placed between key and value
:param comment_marker: string indicating comment line in file
:param line_terminator: string placed between lines
:param value_quoting: whether or not to quote string values
:param value_quote_char: character used to quote string values
:param bool_case: BOOL_* setting case of bool values
:param big_ints: treat K/M/G at the end of ints as an int
:param hidden_marker: pattern prefixing hidden param
"""
self._delimeter = delimiter
self._comment_marker = comment_marker
self._line_terminator = line_terminator
self._value_quoting = value_quoting
self._value_quote_char = value_quote_char
self._bool_case = bool_case
self._big_ints = big_ints
self._hidden_marker = hidden_marker
def serialize(self, dict_data):
lines = []
for k, v in dict_data.items():
lines.append(k + self._delimeter + self.serialize_value(v))
return self._line_terminator.join(lines)
def deserialize(self, stream):
lines = stream.split(self._line_terminator)
result = {}
for line in lines:
line = line.lstrip().rstrip()
if line == '' or line.startswith(self._comment_marker):
continue
k, v = re.split(re.escape(self._delimeter), line, 1)
if self._value_quoting and v.startswith(self._value_quote_char):
# remove trailing comments
v = re.sub(r'%s *%s.*$' % ("'", '#'), '', v)
v = v.lstrip(
self._value_quote_char).rstrip(
self._value_quote_char)
else:
# remove trailing comments
v = re.sub('%s.*$' % self._comment_marker, '', v)
if self._hidden_marker and v.startswith(self._hidden_marker):
continue
result[k.strip()] = v
return result
def serialize_value(self, value):
if isinstance(value, bool):
if self._bool_case == self.BOOL_PYTHON:
value = str(value)
elif self._bool_case == self.BOOL_LOWER:
value = str(value).lower()
elif self._bool_case == self.BOOL_UPPER:
value = str(value).upper()
if self.should_quote_value(value):
value = self._value_quote_char + value + self._value_quote_char
return str(value)
def should_quote_value(self, value):
if not self._value_quoting:
return False
if isinstance(value, bool) or isinstance(value, int):
return False
if value.lower() in ['true', 'false']:
return False
try:
int(value)
return False
except ValueError:
pass
if self._big_ints and re.match(r'\d+[kKmMgGtTpP]', value):
return False
return True
class JsonCodec(StreamCodec):

View File

@ -25,11 +25,15 @@ from .service.root import PgSqlRoot
from .service.status import PgSqlAppStatus
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.common.notification import EndNotification
from trove.common import utils
from trove.guestagent import backup
from trove.guestagent.datastore.experimental.postgresql import pgutil
from trove.guestagent.datastore import manager
from trove.guestagent.db import models
from trove.guestagent import dbaas
from trove.guestagent import guest_log
from trove.guestagent import volume
@ -108,11 +112,17 @@ class Manager(
pgutil.PG_ADMIN = self.ADMIN_USER
backup.restore(context, backup_info, '/tmp')
if snapshot:
self.attach_replica(context, snapshot, snapshot['config'])
self.start_db(context)
if not backup_info:
self._secure(context)
if not cluster_config and self.is_root_enabled(context):
self.status.report_root(context, 'postgres')
def _secure(self, context):
# Create a new administrative user for Trove and also
# disable the built-in superuser.
@ -127,3 +137,89 @@ class Manager(
with EndNotification(context):
self.enable_backups()
backup.backup(context, backup_info)
def backup_required_for_replication(self, context):
return self.replication.backup_required_for_replication()
def attach_replica(self, context, replica_info, slave_config):
self.replication.enable_as_slave(self, replica_info, None)
def detach_replica(self, context, for_failover=False):
replica_info = self.replication.detach_slave(self, for_failover)
return replica_info
def enable_as_master(self, context, replica_source_config):
self.enable_backups()
self.replication.enable_as_master(self, None)
def make_read_only(self, context, read_only):
"""There seems to be no way to flag this at the database level in
PostgreSQL at the moment -- see discussion here:
http://www.postgresql.org/message-id/flat/CA+TgmobWQJ-GCa_tWUc4=80A
1RJ2_+Rq3w_MqaVguk_q018dqw@mail.gmail.com#CA+TgmobWQJ-GCa_tWUc4=80A1RJ
2_+Rq3w_MqaVguk_q018dqw@mail.gmail.com
"""
pass
def get_replica_context(self, context):
return self.replication.get_replica_context(None)
def get_latest_txn_id(self, context):
if self.pg_is_in_recovery():
lsn = self.pg_last_xlog_replay_location()
else:
lsn = self.pg_current_xlog_location()
LOG.info(_("Last xlog location found: %s") % lsn)
return lsn
def get_last_txn(self, context):
master_host = self.pg_primary_host()
repl_offset = self.get_latest_txn_id(context)
return master_host, repl_offset
def wait_for_txn(self, context, txn):
if not self.pg_is_in_recovery():
raise RuntimeError(_("Attempting to wait for a txn on a server "
"not in recovery mode!"))
def _wait_for_txn():
lsn = self.pg_last_xlog_replay_location()
LOG.info(_("Last xlog location found: %s") % lsn)
return lsn >= txn
try:
utils.poll_until(_wait_for_txn, time_out=120)
except exception.PollTimeOut:
raise RuntimeError(_("Timeout occurred waiting for xlog "
"offset to change to '%s'.") % txn)
def cleanup_source_on_replica_detach(self, context, replica_info):
self.replication.cleanup_source_on_replica_detach()
def demote_replication_master(self, context):
self.replication.demote_master(self)
def get_replication_snapshot(self, context, snapshot_info,
replica_source_config=None):
self.enable_backups()
self.replication.enable_as_master(None, None)
snapshot_id, log_position = (
self.replication.snapshot_for_replication(context, None, None,
snapshot_info))
mount_point = CONF.get(self.manager).mount_point
volume_stats = dbaas.get_filesystem_volume_stats(mount_point)
replication_snapshot = {
'dataset': {
'datastore_manager': self.manager,
'dataset_size': volume_stats.get('used', 0.0),
'snapshot_id': snapshot_id
},
'replication_strategy': self.replication_strategy,
'master': self.replication.get_master_ref(None, snapshot_info),
'log_position': log_position
}
return replication_snapshot

View File

@ -16,6 +16,7 @@
from oslo_log import log as logging
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.guestagent.datastore.experimental.postgresql import pgutil
from trove.guestagent.db import models

View File

@ -36,6 +36,7 @@ LOG = logging.getLogger(__name__)
CONF = cfg.CONF
BACKUP_CFG_OVERRIDE = 'PgBaseBackupConfig'
DEBUG_MODE_OVERRIDE = 'DebugLevelOverride'
class PgSqlConfig(PgSqlProcess):
@ -189,6 +190,16 @@ class PgSqlConfig(PgSqlProcess):
operating_system.chmod(self.pgsql_hba_config, FileMode.SET_USR_RO,
as_root=True)
def disable_backups(self):
"""Reverse overrides applied by PgBaseBackup strategy"""
if not self.configuration_manager.has_system_override(
BACKUP_CFG_OVERRIDE):
return
LOG.info(_("Removing configuration changes for backups"))
self.configuration_manager.remove_system_override(BACKUP_CFG_OVERRIDE)
self.remove_wal_archive_dir()
self.restart(context=None)
def enable_backups(self):
"""Apply necessary changes to config to enable WAL-based backups
if we are using the PgBaseBackup strategy
@ -209,7 +220,7 @@ class PgSqlConfig(PgSqlProcess):
opts = {
'wal_level': 'hot_standby',
'archive_mode ': 'on',
'max_wal_senders': 3,
'max_wal_senders': 8,
'checkpoint_segments ': 8,
'wal_keep_segments': 8,
'archive_command': arch_cmd
@ -220,3 +231,13 @@ class PgSqlConfig(PgSqlProcess):
self.configuration_manager.apply_system_override(
opts, BACKUP_CFG_OVERRIDE)
self.restart(None)
def disable_debugging(self, level=1):
"""Disable debug-level logging in postgres"""
self.configuration_manager.remove_system_override(DEBUG_MODE_OVERRIDE)
def enable_debugging(self, level=1):
"""Enable debug-level logging in postgres"""
opt = {'log_min_messages': 'DEBUG%s' % level}
self.configuration_manager.apply_system_override(opt,
DEBUG_MODE_OVERRIDE)

View File

@ -20,8 +20,6 @@ from trove.common.i18n import _
from trove.guestagent.datastore.experimental.postgresql.service.process import(
PgSqlProcess)
from trove.guestagent import pkg
from trove.guestagent.strategies.backup.experimental.postgresql_impl import(
PgBaseBackupUtil)
LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@ -49,7 +47,7 @@ class PgSqlInstall(PgSqlProcess):
)
)
PgBaseBackupUtil.recreate_wal_archive_dir()
PgSqlProcess.recreate_wal_archive_dir()
packager = pkg.Package()
if not packager.pkg_is_installed(packages):

View File

@ -14,9 +14,11 @@
# under the License.
import os
import re
from trove.common import cfg
from trove.guestagent.common import operating_system
from trove.guestagent.datastore.experimental.postgresql import pgutil
from trove.guestagent.datastore.experimental.postgresql.service.status import (
PgSqlAppStatus)
from trove.guestagent import guest_log
@ -38,6 +40,10 @@ class PgSqlProcess(object):
def pgsql_data_dir(self):
return os.path.dirname(self.pg_version[0])
@property
def pgsql_recovery_config(self):
return os.path.join(self.pgsql_data_dir, "recovery.conf")
@property
def pg_version(self):
"""Find the database version file stored in the data directory.
@ -65,3 +71,55 @@ class PgSqlProcess(object):
PgSqlAppStatus.get().stop_db_service(
self.SERVICE_CANDIDATES, CONF.state_change_wait_time,
disable_on_boot=do_not_start_on_reboot, update_db=update_db)
def pg_checkpoint(self):
"""Wrapper for CHECKPOINT call"""
pgutil.psql("CHECKPOINT")
def pg_current_xlog_location(self):
"""Wrapper for pg_current_xlog_location()
Cannot be used against a running slave
"""
r = pgutil.query("SELECT pg_current_xlog_location()")
return r[0][0]
def pg_last_xlog_replay_location(self):
"""Wrapper for pg_last_xlog_replay_location()
For use on standby servers
"""
r = pgutil.query("SELECT pg_last_xlog_replay_location()")
return r[0][0]
def pg_is_in_recovery(self):
"""Wrapper for pg_is_in_recovery() for detecting a server in
standby mode
"""
r = pgutil.query("SELECT pg_is_in_recovery()")
return r[0][0]
def pg_primary_host(self):
"""There seems to be no way to programmatically determine this
on a hot standby, so grab what we have written to the recovery
file
"""
r = operating_system.read_file(self.PGSQL_RECOVERY_CONFIG,
as_root=True)
regexp = re.compile("host=(\d+.\d+.\d+.\d+) ")
m = regexp.search(r)
return m.group(1)
@classmethod
def recreate_wal_archive_dir(cls):
wal_archive_dir = CONF.postgresql.wal_archive_location
operating_system.remove(wal_archive_dir, force=True, recursive=True,
as_root=True)
operating_system.create_directory(wal_archive_dir,
user=cls.PGSQL_OWNER,
group=cls.PGSQL_OWNER,
force=True, as_root=True)
@classmethod
def remove_wal_archive_dir(cls):
wal_archive_dir = CONF.postgresql.wal_archive_location
operating_system.remove(wal_archive_dir, force=True, recursive=True,
as_root=True)

View File

@ -37,7 +37,7 @@ class PgSqlRoot(PgSqlUsers):
)
# There should be only one superuser (Trove's administrative account).
return len(results) > 1 or (results[0] != self.ADMIN_USER)
return len(results) > 1 or (results[0][0] != self.ADMIN_USER)
# TODO(pmalik): For future use by 'root-disable'.
# def disable_root(self, context):

View File

@ -157,14 +157,21 @@ class PgSqlUsers(PgSqlAccess):
"""Delete the specified user.
"""
with EndNotification(context):
self._drop_user(models.PostgreSQLUser.deserialize_user(user))
self._drop_user(
context, models.PostgreSQLUser.deserialize_user(user))
def _drop_user(self, user):
def _drop_user(self, context, user):
"""Drop a given Postgres user.
:param user: User to be dropped.
:type user: PostgreSQLUser
"""
# Postgresql requires that you revoke grants before dropping the user
dbs = self.list_access(context, user.name, None)
for d in dbs:
db = models.PostgreSQLSchema.deserialize_schema(d)
self.revoke_access(context, user.name, None, db.name)
LOG.info(
_("{guest_id}: Dropping user {name}.").format(
guest_id=CONF.guest_id,
@ -184,7 +191,7 @@ class PgSqlUsers(PgSqlAccess):
def _find_user(self, context, username):
"""Lookup a user with a given username.
Return a new Postgres user instance or raise if no match is found.
Return a new Postgres user instance or None if no match is found.
"""
results = pgutil.query(
pgutil.UserQuery.get(name=username),

View File

@ -1053,7 +1053,8 @@ class PostgreSQLRootUser(PostgreSQLUser):
"""Represents the PostgreSQL default superuser."""
def __init__(self, password=None, *args, **kwargs):
password = password if not None else utils.generate_random_password()
if password is None:
password = utils.generate_random_password()
super(PostgreSQLRootUser, self).__init__("postgres", password=password,
*args, **kwargs)

View File

@ -0,0 +1,299 @@
# Copyright 2014 Tesora, Inc.
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
import os
from oslo_log import log as logging
from oslo_utils import netutils
from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.common import stream_codecs
from trove.common import utils
from trove.guestagent.backup.backupagent import BackupAgent
from trove.guestagent.common import operating_system
from trove.guestagent.common.operating_system import FileMode
from trove.guestagent.datastore.experimental.postgresql import pgutil
from trove.guestagent.datastore.experimental.postgresql\
.service.config import PgSqlConfig
from trove.guestagent.datastore.experimental.postgresql\
.service.database import PgSqlDatabase
from trove.guestagent.datastore.experimental.postgresql\
.service.install import PgSqlInstall
from trove.guestagent.datastore.experimental.postgresql \
.service.process import PgSqlProcess
from trove.guestagent.datastore.experimental.postgresql\
.service.root import PgSqlRoot
from trove.guestagent.db import models
from trove.guestagent.strategies import backup
from trove.guestagent.strategies.replication import base
AGENT = BackupAgent()
CONF = cfg.CONF
REPL_BACKUP_NAMESPACE = 'trove.guestagent.strategies.backup.experimental' \
'.postgresql_impl'
REPL_BACKUP_STRATEGY = 'PgBaseBackup'
REPL_BACKUP_INCREMENTAL_STRATEGY = 'PgBaseBackupIncremental'
REPL_BACKUP_RUNNER = backup.get_backup_strategy(
REPL_BACKUP_STRATEGY, REPL_BACKUP_NAMESPACE)
REPL_BACKUP_INCREMENTAL_RUNNER = backup.get_backup_strategy(
REPL_BACKUP_INCREMENTAL_STRATEGY, REPL_BACKUP_NAMESPACE)
REPL_EXTRA_OPTS = CONF.backup_runner_options.get(REPL_BACKUP_STRATEGY, '')
LOG = logging.getLogger(__name__)
TRIGGER_FILE = '/tmp/postgresql.trigger'
REPL_USER = 'replicator'
SLAVE_STANDBY_OVERRIDE = 'SlaveStandbyOverride'
class PostgresqlReplicationStreaming(
base.Replication,
PgSqlConfig,
PgSqlDatabase,
PgSqlRoot,
PgSqlInstall,
):
def __init__(self, *args, **kwargs):
super(PostgresqlReplicationStreaming, self).__init__(*args, **kwargs)
def get_master_ref(self, service, snapshot_info):
master_ref = {
'host': netutils.get_my_ipv4(),
'port': CONF.postgresql.postgresql_port
}
return master_ref
def backup_required_for_replication(self):
return True
def snapshot_for_replication(self, context, service,
location, snapshot_info):
snapshot_id = snapshot_info['id']
replica_number = snapshot_info.get('replica_number', 1)
LOG.debug("Acquiring backup for replica number %d." % replica_number)
# Only create a backup if it's the first replica
if replica_number == 1:
AGENT.execute_backup(
context, snapshot_info, runner=REPL_BACKUP_RUNNER,
extra_opts=REPL_EXTRA_OPTS,
incremental_runner=REPL_BACKUP_INCREMENTAL_RUNNER)
else:
LOG.info(_("Using existing backup created for previous replica."))
repl_user_info = self._get_or_create_replication_user()
log_position = {
'replication_user': repl_user_info
}
return snapshot_id, log_position
def _get_or_create_replication_user(self):
# There are three scenarios we need to deal with here:
# - This is a fresh master, with no replicator user created.
# Generate a new u/p
# - We are attaching a new slave and need to give it the login creds
# Send the creds we have stored in PGDATA/.replpass
# - This is a failed-over-to slave, who will have the replicator user
# but not the credentials file. Recreate the repl user in this case
pwfile = os.path.join(self.pgsql_data_dir, ".replpass")
if self.user_exists(REPL_USER):
if operating_system.exists(pwfile, as_root=True):
pw = operating_system.read_file(pwfile, as_root=True)
else:
u = models.PostgreSQLUser(REPL_USER)
self._drop_user(context=None, user=u)
pw = self._create_replication_user(pwfile)
else:
pw = self._create_replication_user(pwfile)
repl_user_info = {
'name': REPL_USER,
'password': pw
}
return repl_user_info
def _create_replication_user(self, pwfile):
"""Create the replication user. Unfortunately, to be able to
run pg_rewind, we need SUPERUSER, not just REPLICATION privilege
"""
pw = utils.generate_random_password()
operating_system.write_file(pwfile, pw, as_root=True)
operating_system.chown(pwfile, user=self.PGSQL_OWNER,
group=self.PGSQL_OWNER, as_root=True)
operating_system.chmod(pwfile, FileMode.SET_USR_RWX(),
as_root=True)
pgutil.psql("CREATE USER %s SUPERUSER ENCRYPTED "
"password '%s';" % (REPL_USER, pw))
return pw
def enable_as_master(self, service, master_config, for_failover=False):
# For a server to be a master in postgres, we need to enable
# replication user in pg_hba and ensure that WAL logging is
# the appropriate level (use the same settings as backups)
self._get_or_create_replication_user()
hba_entry = "host replication replicator 0.0.0.0/0 md5 \n"
tmp_hba = '/tmp/pg_hba'
operating_system.copy(self.pgsql_hba_config, tmp_hba,
force=True, as_root=True)
operating_system.chmod(tmp_hba, FileMode.SET_ALL_RWX(),
as_root=True)
with open(tmp_hba, 'a+') as hba_file:
hba_file.write(hba_entry)
operating_system.copy(tmp_hba, self.pgsql_hba_config,
force=True, as_root=True)
operating_system.chmod(self.pgsql_hba_config,
FileMode.SET_USR_RWX(),
as_root=True)
operating_system.remove(tmp_hba, as_root=True)
pgutil.psql("SELECT pg_reload_conf()")
def enable_as_slave(self, service, snapshot, slave_config):
"""Adds appropriate config options to postgresql.conf, and writes out
the recovery.conf file used to set up replication
"""
self._write_standby_recovery_file(snapshot, sslmode='prefer')
self.enable_hot_standby(service)
# Ensure the WAL arch is empty before restoring
PgSqlProcess.recreate_wal_archive_dir()
def detach_slave(self, service, for_failover):
"""Touch trigger file in to disable recovery mode"""
LOG.info(_("Detaching slave, use trigger to disable recovery mode"))
operating_system.write_file(TRIGGER_FILE, '')
operating_system.chown(TRIGGER_FILE, user=self.PGSQL_OWNER,
group=self.PGSQL_OWNER, as_root=True)
def _wait_for_failover():
# Wait until slave has switched out of recovery mode
return not self.pg_is_in_recovery()
try:
utils.poll_until(_wait_for_failover, time_out=120)
except exception.PollTimeOut:
raise RuntimeError(_("Timeout occurred waiting for slave to exit"
"recovery mode"))
def cleanup_source_on_replica_detach(self, admin_service, replica_info):
pass
def _rewind_against_master(self, service):
"""Call pg_rewind to resync datadir against state of new master
We should already have a recovery.conf file in PGDATA
"""
rconf = operating_system.read_file(
service.pgsql_recovery_config,
codec=stream_codecs.KeyValueCodec(line_terminator='\n'),
as_root=True)
conninfo = rconf['primary_conninfo'].strip()
# The recovery.conf file we want should already be there, but pg_rewind
# will delete it, so copy it out first
rec = self.pgsql_recovery_config
tmprec = "/tmp/recovery.conf.bak"
operating_system.move(rec, tmprec, as_root=True)
cmd_full = " ".join(["pg_rewind", "-D", service.pgsql_data_dir,
'--source-pgdata=' + service.pgsql_data_dir,
'--source-server=' + conninfo])
out, err = utils.execute("sudo", "su", "-", self.PGSQL_OWNER, "-c",
"%s" % cmd_full, check_exit_code=0)
LOG.debug("Got stdout %s and stderr %s from pg_rewind" %
(str(out), str(err)))
operating_system.move(tmprec, rec, as_root=True)
def demote_master(self, service):
"""In order to demote a master we need to shutdown the server and call
pg_rewind against the new master to enable a proper timeline
switch.
"""
self.pg_checkpoint()
self.stop_db(context=None)
self._rewind_against_master(service)
self.start_db(context=None)
def connect_to_master(self, service, snapshot):
# All that is required in postgresql to connect to a slave is to
# restart with a recovery.conf file in the data dir, which contains
# the connection information for the master.
assert operating_system.exists(self.pgsql_recovery_config,
as_root=True)
self.restart(context=None)
def _remove_recovery_file(self):
operating_system.remove(self.pgsql_recovery_config, as_root=True)
def _write_standby_recovery_file(self, snapshot, sslmode='prefer'):
logging_config = snapshot['log_position']
conninfo_params = \
{'host': snapshot['master']['host'],
'port': snapshot['master']['port'],
'repl_user': logging_config['replication_user']['name'],
'password': logging_config['replication_user']['password'],
'sslmode': sslmode}
conninfo = 'host=%(host)s ' \
'port=%(port)s ' \
'dbname=os_admin ' \
'user=%(repl_user)s ' \
'password=%(password)s ' \
'sslmode=%(sslmode)s ' % conninfo_params
recovery_conf = "standby_mode = 'on'\n"
recovery_conf += "primary_conninfo = '" + conninfo + "'\n"
recovery_conf += "trigger_file = '/tmp/postgresql.trigger'\n"
recovery_conf += "recovery_target_timeline='latest'\n"
operating_system.write_file(self.pgsql_recovery_config, recovery_conf,
codec=stream_codecs.IdentityCodec(),
as_root=True)
operating_system.chown(self.pgsql_recovery_config, user="postgres",
group="postgres", as_root=True)
def enable_hot_standby(self, service):
opts = {'hot_standby': 'on',
'wal_level': 'hot_standby'}
# wal_log_hints for pg_rewind is only supported in 9.4+
if self.pg_version[1] in ('9.4', '9.5'):
opts['wal_log_hints'] = 'on'
service.configuration_manager.\
apply_system_override(opts, SLAVE_STANDBY_OVERRIDE)
def get_replica_context(self, service):
repl_user_info = self._get_or_create_replication_user()
log_position = {
'replication_user': repl_user_info
}
return {
'master': self.get_master_ref(None, None),
'log_position': log_position
}

View File

@ -27,8 +27,8 @@ from trove.guestagent.common import operating_system
from trove.guestagent.common.operating_system import FileMode
from trove.guestagent.datastore.experimental.postgresql.service.config import(
PgSqlConfig)
from trove.guestagent.strategies.backup.experimental.postgresql_impl import(
PgBaseBackupUtil)
from trove.guestagent.datastore.experimental.postgresql.service.process import(
PgSqlProcess)
from trove.guestagent.strategies.restore import base
CONF = cfg.CONF
@ -113,7 +113,7 @@ class PgBaseBackup(base.RestoreRunner, PgSqlConfig):
def pre_restore(self):
self.stop_db(context=None)
PgBaseBackupUtil.recreate_wal_archive_dir()
PgSqlProcess.recreate_wal_archive_dir()
datadir = self.pgsql_data_dir
operating_system.remove(datadir, force=True, recursive=True,
as_root=True)

View File

@ -92,6 +92,11 @@ class Manager(periodic_task.PeriodicTasks):
instance.db_info.save()
def promote_to_replica_source(self, context, instance_id):
# TODO(atomic77) Promote and eject need to be able to handle the case
# where a datastore like Postgresql needs to treat the slave to be
# promoted differently from the old master and the slaves which will
# be simply reassigned to a new master. See:
# https://bugs.launchpad.net/trove/+bug/1553339
def _promote_to_replica_source(old_master, master_candidate,
replica_models):

View File

@ -0,0 +1 @@
# Currently unused

View File

@ -0,0 +1 @@
# Currently unused

View File

@ -177,7 +177,10 @@ module_create_groups.extend([groups.MODULE_CREATE,
groups.MODULE_DELETE])
replication_groups = list(instance_create_groups)
replication_groups.extend([replication_group.GROUP])
replication_groups.extend([groups.REPL_INST_DELETE_WAIT])
replication_promote_groups = list(replication_groups)
replication_promote_groups.extend([replication_group.GROUP])
root_actions_groups = list(instance_create_groups)
root_actions_groups.extend([root_actions_group.GROUP])
@ -204,6 +207,7 @@ register(["instance_error_create"], instance_error_create_groups)
register(["module"], module_groups)
register(["module_create"], module_create_groups)
register(["replication"], replication_groups)
register(["replication_promote"], replication_promote_groups)
register(["root"], root_actions_groups)
register(["user"], user_actions_groups)
@ -221,15 +225,15 @@ register(["couchdb_supported"], common_groups, backup_groups,
register(["postgresql_supported"], common_groups,
backup_groups, database_actions_groups, configuration_groups,
root_actions_groups, user_actions_groups,
backup_incremental_groups)
backup_incremental_groups, replication_groups)
register(["mysql_supported", "percona_supported"], common_groups,
backup_groups, configuration_groups, database_actions_groups,
replication_groups, root_actions_groups, user_actions_groups,
replication_promote_groups, root_actions_groups, user_actions_groups,
backup_incremental_groups)
register(["mariadb_supported"], common_groups,
backup_groups, cluster_actions_groups, configuration_groups,
database_actions_groups, replication_groups, root_actions_groups,
user_actions_groups)
database_actions_groups, replication_promote_groups,
root_actions_groups, user_actions_groups)
register(["mongodb_supported"], common_groups,
backup_groups, cluster_actions_groups, configuration_groups,
database_actions_groups, root_actions_groups, user_actions_groups)
@ -237,6 +241,6 @@ register(["pxc_supported"], common_groups,
backup_groups, configuration_groups, database_actions_groups,
cluster_actions_groups, root_actions_groups, user_actions_groups)
register(["redis_supported"], common_groups,
backup_groups, replication_groups, cluster_actions_groups)
backup_groups, replication_promote_groups, cluster_actions_groups)
register(["vertica_supported"], common_groups,
cluster_actions_groups, root_actions_groups, configuration_groups)

View File

@ -100,6 +100,7 @@ REPL_INST_CREATE_WAIT = "scenario.repl_inst_create_wait_grp"
REPL_INST_MULTI_CREATE = "scenario.repl_inst_multi_create_grp"
REPL_INST_DELETE_NON_AFFINITY_WAIT = "scenario.repl_inst_delete_noaff_wait_grp"
REPL_INST_MULTI_CREATE_WAIT = "scenario.repl_inst_multi_create_wait_grp"
REPL_INST_MULTI_PROMOTE = "scenario.repl_inst_multi_promote_grp"
REPL_INST_DELETE = "scenario.repl_inst_delete_grp"
REPL_INST_DELETE_WAIT = "scenario.repl_inst_delete_wait_grp"

View File

@ -168,7 +168,8 @@ class ReplicationInstDeleteNonAffReplWaitGroup(TestGroup):
self.test_runner.run_delete_non_affinity_master()
@test(depends_on_groups=[groups.REPL_INST_DELETE_NON_AFFINITY_WAIT],
@test(depends_on_groups=[groups.REPL_INST_DELETE_NON_AFFINITY_WAIT,
groups.REPL_INST_MULTI_CREATE],
groups=[GROUP, groups.REPL_INST_MULTI_CREATE_WAIT])
class ReplicationInstMultiCreateWaitGroup(TestGroup):
"""Wait for Replication Instance Multi-Create to complete."""
@ -205,12 +206,12 @@ class ReplicationInstMultiCreateWaitGroup(TestGroup):
@test(depends_on=[add_data_to_replicate],
runs_after=[verify_data_to_replicate])
def verify_replica_data_orig(self):
def verify_replica_data_orig2(self):
"""Verify original data was transferred to replicas."""
self.test_runner.run_verify_replica_data_orig()
@test(depends_on=[add_data_to_replicate],
runs_after=[verify_replica_data_orig])
runs_after=[verify_replica_data_orig2])
def verify_replica_data_new(self):
"""Verify new data was transferred to replicas."""
self.test_runner.run_verify_replica_data_new()
@ -239,8 +240,17 @@ class ReplicationInstMultiCreateWaitGroup(TestGroup):
"""Ensure deleting valid master fails."""
self.test_runner.run_delete_valid_master()
@test(depends_on=[wait_for_multiple_replicas],
runs_after=[delete_valid_master])
@test(depends_on_groups=[groups.REPL_INST_MULTI_CREATE_WAIT],
groups=[GROUP, groups.REPL_INST_MULTI_PROMOTE])
class ReplicationInstMultiPromoteGroup(TestGroup):
"""Test Replication Instance Multi-Promote functionality."""
def __init__(self):
super(ReplicationInstMultiPromoteGroup, self).__init__(
ReplicationRunnerFactory.instance())
@test
def promote_to_replica_source(self):
"""Test promoting a replica to replica source (master)."""
self.test_runner.run_promote_to_replica_source()
@ -250,8 +260,7 @@ class ReplicationInstMultiCreateWaitGroup(TestGroup):
"""Verify data is still on new master."""
self.test_runner.run_verify_replica_data_new_master()
@test(depends_on=[wait_for_multiple_replicas,
promote_to_replica_source],
@test(depends_on=[promote_to_replica_source],
runs_after=[verify_replica_data_new_master])
def add_data_to_replicate2(self):
"""Add data to new master to verify replication."""
@ -262,8 +271,7 @@ class ReplicationInstMultiCreateWaitGroup(TestGroup):
"""Verify data exists on new master."""
self.test_runner.run_verify_data_to_replicate2()
@test(depends_on=[wait_for_multiple_replicas,
add_data_to_replicate2],
@test(depends_on=[add_data_to_replicate2],
runs_after=[verify_data_to_replicate2])
def verify_replica_data_new2(self):
"""Verify data was transferred to new replicas."""
@ -290,20 +298,9 @@ class ReplicationInstMultiCreateWaitGroup(TestGroup):
"""Verify final data was transferred to all replicas."""
self.test_runner.run_verify_final_data_replicated()
@test(depends_on=[promote_original_source],
runs_after=[verify_final_data_replicated])
def remove_replicated_data(self):
"""Remove replication data."""
self.test_runner.run_remove_replicated_data()
@test(depends_on=[promote_original_source],
runs_after=[remove_replicated_data])
def detach_replica_from_source(self):
"""Test detaching a replica from the master."""
self.test_runner.run_detach_replica_from_source()
@test(depends_on_groups=[groups.REPL_INST_MULTI_CREATE_WAIT],
runs_after_groups=[groups.REPL_INST_MULTI_PROMOTE],
groups=[GROUP, groups.REPL_INST_DELETE])
class ReplicationInstDeleteGroup(TestGroup):
"""Test Replication Instance Delete functionality."""
@ -313,6 +310,16 @@ class ReplicationInstDeleteGroup(TestGroup):
ReplicationRunnerFactory.instance())
@test
def remove_replicated_data(self):
"""Remove replication data."""
self.test_runner.run_remove_replicated_data()
@test(runs_after=[remove_replicated_data])
def detach_replica_from_source(self):
"""Test detaching a replica from the master."""
self.test_runner.run_detach_replica_from_source()
@test(runs_after=[detach_replica_from_source])
def delete_detached_replica(self):
"""Test deleting the detached replica."""
self.test_runner.run_delete_detached_replica()

View File

@ -1 +1,2 @@
BUG_EJECT_VALID_MASTER = 1622014
BUG_WRONG_API_VALIDATION = 1498573

View File

@ -17,6 +17,8 @@ from proboscis import SkipTest
from trove.common import exception
from trove.common.utils import poll_until
from trove.tests.scenario import runners
from trove.tests.scenario.runners.test_runners import SkipKnownBug
from trove.tests.scenario.runners.test_runners import TestRunner
from troveclient.compat import exceptions
@ -52,7 +54,7 @@ class DatabaseActionsRunner(TestRunner):
expected_http_code):
self.auth_client.databases.create(instance_id, serial_databases_def)
self.assert_client_code(expected_http_code)
self._wait_for_database_create(instance_id, serial_databases_def)
self.wait_for_database_create(instance_id, serial_databases_def)
return serial_databases_def
def run_databases_list(self, expected_http_code=200):
@ -106,27 +108,6 @@ class DatabaseActionsRunner(TestRunner):
self.assert_pagination_match(
list_page, full_list, limit, len(full_list))
def _wait_for_database_create(self, instance_id, expected_database_defs):
expected_db_names = {db_def['name']
for db_def in expected_database_defs}
self.report.log("Waiting for all created databases to appear in the "
"listing: %s" % expected_db_names)
def _all_exist():
all_dbs = self._get_db_names(instance_id)
return all(db in all_dbs for db in expected_db_names)
try:
poll_until(_all_exist, time_out=self.GUEST_CAST_WAIT_TIMEOUT_SEC)
self.report.log("All databases now exist on the instance.")
except exception.PollTimeOut:
self.fail("Some databases were not created within the poll "
"timeout: %ds" % self.GUEST_CAST_WAIT_TIMEOUT_SEC)
def _get_db_names(self, instance_id):
full_list = self.auth_client.databases.list(instance_id)
return {database.name: database for database in full_list}
def run_database_create_with_no_attributes(
self, expected_exception=exceptions.BadRequest,
expected_http_code=400):
@ -190,7 +171,7 @@ class DatabaseActionsRunner(TestRunner):
"listing: %s" % deleted_database_name)
def _db_is_gone():
all_dbs = self._get_db_names(instance_id)
all_dbs = self.get_db_names(instance_id)
return deleted_database_name not in all_dbs
try:
@ -229,3 +210,12 @@ class DatabaseActionsRunner(TestRunner):
def get_system_databases(self):
return self.get_datastore_config_property('ignore_dbs')
class PostgresqlDatabaseActionsRunner(DatabaseActionsRunner):
    """Database-action scenario runner variant for PostgreSQL guests.

    The system-database scenarios are skipped: they are blocked by the
    known API-validation bug tracked as BUG_WRONG_API_VALIDATION
    (launchpad bug 1498573).
    """

    def run_system_database_create(self):
        # Skipped due to known API validation bug 1498573.
        raise SkipKnownBug(runners.BUG_WRONG_API_VALIDATION)

    def run_system_database_delete(self):
        # Skipped due to known API validation bug 1498573.
        raise SkipKnownBug(runners.BUG_WRONG_API_VALIDATION)

View File

@ -16,6 +16,8 @@
from proboscis import SkipTest
from trove.common import utils
from trove.tests.scenario import runners
from trove.tests.scenario.runners.test_runners import SkipKnownBug
from trove.tests.scenario.runners.test_runners import TestRunner
from troveclient.compat import exceptions
@ -197,6 +199,9 @@ class PostgresqlRootActionsRunner(RootActionsRunner):
def run_enable_root_with_password(self):
raise SkipTest("Operation is currently not supported.")
def run_delete_root(self):
    # Skipped: root deletion is blocked by the known API validation
    # bug tracked as BUG_WRONG_API_VALIDATION (launchpad bug 1498573).
    raise SkipKnownBug(runners.BUG_WRONG_API_VALIDATION)
class CouchbaseRootActionsRunner(RootActionsRunner):

View File

@ -683,12 +683,14 @@ class TestRunner(object):
"Creating a helper database '%s' on instance: %s"
% (database_def['name'], instance_id))
self.auth_client.databases.create(instance_id, [database_def])
self.wait_for_database_create(instance_id, [database_def])
if user_def:
self.report.log(
"Creating a helper user '%s:%s' on instance: %s"
% (user_def['name'], user_def['password'], instance_id))
self.auth_client.users.create(instance_id, [user_def])
self.wait_for_user_create(instance_id, [user_def])
if root_def:
# Not enabling root on a single instance of the cluster here
@ -721,6 +723,48 @@ class TestRunner(object):
_get_credentials(credentials),
_get_credentials(credentials_root))
def wait_for_user_create(self, instance_id, expected_user_defs):
    """Poll the instance until every expected user appears in the listing.

    Fails the test if any user is still missing after
    GUEST_CAST_WAIT_TIMEOUT_SEC seconds.
    """
    pending = {entry['name'] for entry in expected_user_defs}
    self.report.log("Waiting for all created users to appear in the "
                    "listing: %s" % pending)

    def _users_present():
        # get_user_names returns a name->user mapping; membership
        # checks against it test the keys.
        return pending.issubset(self.get_user_names(instance_id))

    try:
        poll_until(_users_present, time_out=self.GUEST_CAST_WAIT_TIMEOUT_SEC)
    except exception.PollTimeOut:
        self.fail("Some users were not created within the poll "
                  "timeout: %ds" % self.GUEST_CAST_WAIT_TIMEOUT_SEC)
    else:
        self.report.log("All users now exist on the instance.")
def get_user_names(self, instance_id):
    """Return a mapping of user name -> user object for the instance."""
    name_map = {}
    for user in self.auth_client.users.list(instance_id):
        name_map[user.name] = user
    return name_map
def wait_for_database_create(self, instance_id, expected_database_defs):
    """Poll the instance until every expected database appears in the listing.

    Fails the test if any database is still missing after
    GUEST_CAST_WAIT_TIMEOUT_SEC seconds.
    """
    pending = {entry['name'] for entry in expected_database_defs}
    self.report.log("Waiting for all created databases to appear in the "
                    "listing: %s" % pending)

    def _dbs_present():
        # get_db_names returns a name->database mapping; membership
        # checks against it test the keys.
        return pending.issubset(self.get_db_names(instance_id))

    try:
        poll_until(_dbs_present, time_out=self.GUEST_CAST_WAIT_TIMEOUT_SEC)
    except exception.PollTimeOut:
        self.fail("Some databases were not created within the poll "
                  "timeout: %ds" % self.GUEST_CAST_WAIT_TIMEOUT_SEC)
    else:
        self.report.log("All databases now exist on the instance.")
def get_db_names(self, instance_id):
    """Return a mapping of database name -> database object for the instance."""
    databases = self.auth_client.databases.list(instance_id)
    return dict((db.name, db) for db in databases)
class CheckInstance(AttrCheck):
"""Class to check various attributes of Instance details."""

View File

@ -19,6 +19,8 @@ from proboscis import SkipTest
from trove.common import exception
from trove.common.utils import poll_until
from trove.tests.scenario import runners
from trove.tests.scenario.runners.test_runners import SkipKnownBug
from trove.tests.scenario.runners.test_runners import TestRunner
from troveclient.compat import exceptions
@ -65,30 +67,9 @@ class UserActionsRunner(TestRunner):
expected_http_code):
self.auth_client.users.create(instance_id, serial_users_def)
self.assert_client_code(expected_http_code)
self._wait_for_user_create(instance_id, serial_users_def)
self.wait_for_user_create(instance_id, serial_users_def)
return serial_users_def
def _wait_for_user_create(self, instance_id, expected_user_defs):
expected_user_names = {user_def['name']
for user_def in expected_user_defs}
self.report.log("Waiting for all created users to appear in the "
"listing: %s" % expected_user_names)
def _all_exist():
all_users = self._get_user_names(instance_id)
return all(usr in all_users for usr in expected_user_names)
try:
poll_until(_all_exist, time_out=self.GUEST_CAST_WAIT_TIMEOUT_SEC)
self.report.log("All users now exist on the instance.")
except exception.PollTimeOut:
self.fail("Some users were not created within the poll "
"timeout: %ds" % self.GUEST_CAST_WAIT_TIMEOUT_SEC)
def _get_user_names(self, instance_id):
full_list = self.auth_client.users.list(instance_id)
return {user.name: user for user in full_list}
def run_user_show(self, expected_http_code=200):
for user_def in self.user_defs:
self.assert_user_show(
@ -368,7 +349,7 @@ class UserActionsRunner(TestRunner):
user_def.update(update_attribites)
expected_def = user_def
self._wait_for_user_create(instance_id, self.user_defs)
self.wait_for_user_create(instance_id, self.user_defs)
# Verify using 'user-show' and 'user-list'.
self.assert_user_show(instance_id, expected_def, 200)
@ -415,7 +396,7 @@ class UserActionsRunner(TestRunner):
"listing: %s" % deleted_user_name)
def _db_is_gone():
all_users = self._get_user_names(instance_id)
all_users = self.get_user_names(instance_id)
return deleted_user_name not in all_users
try:
@ -517,3 +498,18 @@ class PxcUserActionsRunner(MysqlUserActionsRunner):
def __init__(self):
super(PxcUserActionsRunner, self).__init__()
class PostgresqlUserActionsRunner(UserActionsRunner):
    """User-action scenario runner variant for PostgreSQL guests.

    The scenarios below are skipped: they are blocked by the known
    API-validation bug tracked as BUG_WRONG_API_VALIDATION
    (launchpad bug 1498573).
    """

    def run_user_update_with_existing_name(self):
        # Skipped due to known API validation bug 1498573.
        raise SkipKnownBug(runners.BUG_WRONG_API_VALIDATION)

    def run_system_user_show(self):
        # Skipped due to known API validation bug 1498573.
        raise SkipKnownBug(runners.BUG_WRONG_API_VALIDATION)

    def run_system_user_attribute_update(self):
        # Skipped due to known API validation bug 1498573.
        raise SkipKnownBug(runners.BUG_WRONG_API_VALIDATION)

    def run_system_user_delete(self):
        # Skipped due to known API validation bug 1498573.
        raise SkipKnownBug(runners.BUG_WRONG_API_VALIDATION)