Postgresql Incremental Backup and Restore
Full and incremental backup and restore strategy for postgres based on pg_basebackup and WAL shipping. Full backups are effectively data directory filesystem snapshots aided by the use of the pg_basebackup tool. Incremental backups are performed by creating recovery points and backing up the appropriate WAL files since the previous backup. Restore is done by recreating the data directory based on a pg_basebackup snapshot and copying the needed WAL files for incremental recovery. A new scenario test group for incremental backups was also added. (Note that parts of the new test have already been refactored in: https://review.openstack.org/#/c/324060 ) Implements: bp/pgsql-incremental-backup Change-Id: I60d52d8cdf7744976c2cae0ea1aca2fd8d07e902
This commit is contained in:
parent
1575feb55c
commit
72dd3d1f5c
@ -0,0 +1,4 @@
|
||||
---
|
||||
features:
|
||||
- Full and incremental backup and restore strategy for postgres
|
||||
based on pg_basebackup and WAL shipping.
|
@ -1044,15 +1044,20 @@ postgresql_opts = [
|
||||
'if trove_security_groups_support is True).'),
|
||||
cfg.PortOpt('postgresql_port', default=5432,
|
||||
help='The TCP port the server listens on.'),
|
||||
cfg.StrOpt('backup_strategy', default='PgDump',
|
||||
cfg.StrOpt('backup_strategy', default='PgBaseBackup',
|
||||
help='Default strategy to perform backups.'),
|
||||
cfg.DictOpt('backup_incremental_strategy', default={},
|
||||
cfg.DictOpt('backup_incremental_strategy',
|
||||
default={'PgBaseBackup': 'PgBaseBackupIncremental'},
|
||||
help='Incremental Backup Runner based on the default '
|
||||
'strategy. For strategies that do not implement an '
|
||||
'incremental, the runner will use the default full backup.'),
|
||||
cfg.StrOpt('mount_point', default='/var/lib/postgresql',
|
||||
help="Filesystem path for mounting "
|
||||
"volumes if volume support is enabled."),
|
||||
cfg.StrOpt('wal_archive_location', default='/mnt/wal_archive',
|
||||
help="Filesystem path storing WAL archive files when "
|
||||
"WAL-shipping based backups or replication "
|
||||
"is enabled."),
|
||||
cfg.BoolOpt('root_on_create', default=False,
|
||||
help='Enable the automatic creation of the root user for the '
|
||||
'service during instance-create. The generated password for '
|
||||
|
@ -228,6 +228,10 @@ class FileMode(object):
|
||||
def SET_USR_RW(cls):
|
||||
return cls(reset=[stat.S_IRUSR | stat.S_IWUSR]) # =0600
|
||||
|
||||
@classmethod
def SET_USR_RWX(cls):
    """Return a FileMode that resets permissions to owner read/write/execute
    only (mode 0700) — e.g. what PostgreSQL requires on its data directory.
    """
    return cls(reset=[stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR])  # =0700
|
||||
|
||||
@classmethod
def ADD_ALL_R(cls):
    """Return a FileMode that adds read permission for user, group and
    other (chmod +0444) without touching any other bits.
    """
    return cls(add=[stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH])  # +0444
|
||||
|
@ -103,12 +103,14 @@ class Manager(
|
||||
device.mount(mount_point)
|
||||
self.configuration_manager.save_configuration(config_contents)
|
||||
self.apply_initial_guestagent_configuration()
|
||||
self.start_db(context)
|
||||
|
||||
if backup_info:
|
||||
backup.restore(context, backup_info, '/tmp')
|
||||
pgutil.PG_ADMIN = self.ADMIN_USER
|
||||
else:
|
||||
backup.restore(context, backup_info, '/tmp')
|
||||
|
||||
self.start_db(context)
|
||||
|
||||
if not backup_info:
|
||||
self._secure(context)
|
||||
|
||||
def _secure(self, context):
|
||||
@ -123,4 +125,5 @@ class Manager(
|
||||
|
||||
def create_backup(self, context, backup_info):
    """Create a backup of this PostgreSQL instance.

    Ensures WAL archiving is configured first (a no-op unless the
    PgBaseBackup strategy is in use) and then delegates to the backup
    manager. The EndNotification context reports success/failure of the
    whole operation back to the conductor.

    :param context:      request context passed through to the backup runner.
    :param backup_info:  dict describing the backup to create.
    """
    with EndNotification(context):
        self.enable_backups()
        backup.backup(context, backup_info)
|
||||
|
@ -35,6 +35,8 @@ from trove.guestagent.datastore.experimental.postgresql import pgutil
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
|
||||
BACKUP_CFG_OVERRIDE = 'PgBaseBackupConfig'
|
||||
|
||||
|
||||
class PgSqlConfig(PgSqlProcess):
|
||||
"""Mixin that implements the config API.
|
||||
@ -63,6 +65,17 @@ class PgSqlConfig(PgSqlProcess):
|
||||
requires_root=True,
|
||||
override_strategy=OneFileOverrideStrategy(revision_dir))
|
||||
|
||||
@property
def pgsql_extra_bin_dir(self):
    """Path of the version-specific PostgreSQL binary directory.

    Distribution packages do not put all of the 'extra' important
    PostgreSQL binaries into /usr/bin; RHEL/CentOS and Debian/Ubuntu
    install them in a versioned directory such as /usr/pgsql-9.4/bin
    for PostgreSQL 9.4 (SUSE keeps them in /usr/bin).
    """
    dir_templates = {
        operating_system.DEBIAN: '/usr/lib/postgresql/%s/bin',
        operating_system.REDHAT: '/usr/pgsql-%s/bin',
        operating_system.SUSE: '/usr/bin',
    }
    return dir_templates[self.OS] % self.pg_version[1]
|
||||
|
||||
@property
|
||||
def pgsql_config(self):
|
||||
return self._find_config_file('postgresql.conf')
|
||||
@ -156,7 +169,8 @@ class PgSqlConfig(PgSqlProcess):
|
||||
# The OrderedDict is necessary to guarantee the iteration order.
|
||||
access_rules = OrderedDict(
|
||||
[('local', [['all', 'postgres,os_admin', None, 'trust'],
|
||||
['all', 'all', None, 'md5']]),
|
||||
['all', 'all', None, 'md5'],
|
||||
['replication', 'postgres,os_admin', None, 'trust']]),
|
||||
('host', [['all', 'postgres,os_admin', '127.0.0.1/32', 'trust'],
|
||||
['all', 'postgres,os_admin', '::1/128', 'trust'],
|
||||
['all', 'postgres,os_admin', 'localhost', 'trust'],
|
||||
@ -174,3 +188,35 @@ class PgSqlConfig(PgSqlProcess):
|
||||
as_root=True)
|
||||
operating_system.chmod(self.pgsql_hba_config, FileMode.SET_USR_RO,
|
||||
as_root=True)
|
||||
|
||||
def enable_backups(self):
    """Apply necessary changes to config to enable WAL-based backups
    if we are using the PgBaseBackup strategy.

    No-op unless the configured backup strategy is 'PgBaseBackup', or if
    the WAL override has already been applied. Raises RuntimeError when
    the WAL archive directory does not exist. Restarts the server so the
    new settings take effect.
    """
    if CONF.postgresql.backup_strategy != 'PgBaseBackup':
        return
    if self.configuration_manager.has_system_override(BACKUP_CFG_OVERRIDE):
        # Already configured on a previous backup run.
        return

    LOG.info("Applying changes to WAL config for use by base backups")
    wal_arch_loc = CONF.postgresql.wal_archive_location
    if not os.path.isdir(wal_arch_loc):
        raise RuntimeError(_("Cannot enable backup as WAL dir '%s' does "
                             "not exist.") % wal_arch_loc)
    # Archive each completed WAL segment into the archive dir, but never
    # overwrite a segment that is already there.
    arch_cmd = "'test ! -f {wal_arch}/%f && cp %p {wal_arch}/%f'".format(
        wal_arch=wal_arch_loc
    )
    # NOTE(review): the trailing spaces in 'archive_mode ' and
    # 'checkpoint_segments ' look suspicious but are kept as-is since the
    # rendered config output depends on them — TODO confirm intent.
    opts = {
        'wal_level': 'hot_standby',
        'archive_mode ': 'on',
        'max_wal_senders': 3,
        'checkpoint_segments ': 8,
        'wal_keep_segments': 8,
        'archive_command': arch_cmd
    }
    # BUG FIX: ('9.3') is a plain string, so the original `in` test was a
    # substring check (e.g. '3' in '9.3' is True); use a real tuple.
    if self.pg_version[1] not in ('9.3',):
        # wal_log_hints is only available from PostgreSQL 9.4 onwards.
        opts['wal_log_hints'] = 'on'

    self.configuration_manager.apply_system_override(
        opts, BACKUP_CFG_OVERRIDE)
    self.restart(None)
|
||||
|
@ -20,6 +20,8 @@ from trove.common.i18n import _
|
||||
from trove.guestagent.datastore.experimental.postgresql.service.process import(
|
||||
PgSqlProcess)
|
||||
from trove.guestagent import pkg
|
||||
from trove.guestagent.strategies.backup.experimental.postgresql_impl import(
|
||||
PgBaseBackupUtil)
|
||||
|
||||
LOG = logging.getLogger(__name__)
|
||||
CONF = cfg.CONF
|
||||
@ -46,6 +48,9 @@ class PgSqlInstall(PgSqlProcess):
|
||||
guest_id=CONF.guest_id
|
||||
)
|
||||
)
|
||||
|
||||
PgBaseBackupUtil.recreate_wal_archive_dir()
|
||||
|
||||
packager = pkg.Package()
|
||||
if not packager.pkg_is_installed(packages):
|
||||
try:
|
||||
|
@ -13,8 +13,31 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import re
|
||||
import stat
|
||||
|
||||
from oslo_log import log as logging
|
||||
|
||||
from trove.common import cfg
|
||||
from trove.common import exception
|
||||
from trove.common.i18n import _
|
||||
from trove.common import utils
|
||||
from trove.guestagent.common import operating_system
|
||||
from trove.guestagent.common.operating_system import FileMode
|
||||
from trove.guestagent.datastore.experimental.postgresql import pgutil
|
||||
from trove.guestagent.datastore.experimental.postgresql.service.config import(
|
||||
PgSqlConfig)
|
||||
from trove.guestagent.datastore.experimental.postgresql.service.process import(
|
||||
PgSqlProcess)
|
||||
from trove.guestagent.datastore.experimental.postgresql.service.users import(
|
||||
PgSqlUsers)
|
||||
from trove.guestagent.strategies.backup import base
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
WAL_ARCHIVE_DIR = CONF.postgresql.wal_archive_location
|
||||
|
||||
|
||||
class PgDump(base.BackupRunner):
|
||||
"""Implementation of Backup Strategy for pg_dump."""
|
||||
@ -24,3 +47,219 @@ class PgDump(base.BackupRunner):
|
||||
def cmd(self):
|
||||
cmd = 'sudo -u postgres pg_dumpall '
|
||||
return cmd + self.zip_cmd + self.encrypt_cmd
|
||||
|
||||
|
||||
class PgBaseBackupUtil(object):
    """Helpers for locating pg_basebackup artifacts in the WAL archive
    directory (backup history files and archived WAL segments).
    """

    def most_recent_backup_wal(self, pos=0):
        """Return the WAL file name for the most recent backup, or None
        when no backup history file exists yet.
        """
        mrb_file = self.most_recent_backup_file(pos=pos)
        if mrb_file is None:
            # BUG FIX: previously this raised AttributeError on
            # None.split() when no backup had been taken yet.
            return None
        # just return the first part of the filename
        return mrb_file.split('.')[0]

    def most_recent_backup_file(self, pos=0):
        """Look for the most recent .backup file that basebackup creates

        :return: a string like 000000010000000000000006.00000168.backup,
                 or None when no history file exists.
        """
        # BUG FIX: dots are now escaped; the old pattern
        # "[0-9A-F]{24}.*.backup" treated '.' as a wildcard and could
        # match names that merely end in '...backup'.
        walre = re.compile(r"[0-9A-F]{24}\..*\.backup")
        wal_files = [wal_file for wal_file in os.listdir(WAL_ARCHIVE_DIR)
                     if walre.search(wal_file)]
        wal_files = sorted(wal_files, reverse=True)
        if not wal_files:
            return None
        return wal_files[pos]

    def log_files_since_last_backup(self, pos=0):
        """Return the WAL files since the provided last backup
        pg_archivebackup depends on alphanumeric sorting to decide wal order,
        so we'll do so too:
        https://github.com/postgres/postgres/blob/REL9_4_STABLE/contrib
        /pg_archivecleanup/pg_archivecleanup.c#L122
        """
        last_wal = self.most_recent_backup_wal(pos=pos)
        walre = re.compile("^[0-9A-F]{24}$")
        # When there is no previous backup (last_wal is None) every
        # archived WAL file qualifies.  BUG FIX: comparing str >= None
        # raises TypeError on Python 3.
        return [wal_file for wal_file in os.listdir(WAL_ARCHIVE_DIR)
                if walre.search(wal_file) and
                (last_wal is None or wal_file >= last_wal)]

    @staticmethod
    def recreate_wal_archive_dir():
        """Wipe the WAL archive directory and recreate it empty, owned by
        the postgres system user.
        """
        operating_system.remove(WAL_ARCHIVE_DIR, force=True, recursive=True,
                                as_root=True)
        operating_system.create_directory(WAL_ARCHIVE_DIR,
                                          user=PgSqlProcess.PGSQL_OWNER,
                                          group=PgSqlProcess.PGSQL_OWNER,
                                          force=True, as_root=True)
|
||||
|
||||
|
||||
class PgBaseBackup(base.BackupRunner, PgSqlConfig, PgBaseBackupUtil,
                   PgSqlUsers):
    """Base backups are taken with the pg_basebackup filesystem-level backup
    tool pg_basebackup creates a copy of the binary files in the PostgreSQL
    cluster data directory and enough WAL segments to allow the database to
    be brought back to a consistent state. Associated with each backup is a
    log location, normally indicated by the WAL file name and the position
    inside the file.
    """
    __strategy_name__ = 'pg_basebackup'

    def __init__(self, *args, **kwargs):
        super(PgBaseBackup, self).__init__(*args, **kwargs)
        # Fields below are parsed out of the .backup history file by
        # base_backup_metadata() and validated by check_process().
        self.label = None
        self.stop_segment = None
        self.start_segment = None
        self.start_wal_file = None
        self.stop_wal_file = None
        self.checkpoint_location = None
        # Name of the most recent .backup history file, set by metadata().
        self.mrb = None

    @property
    def cmd(self):
        # Stream the whole data directory as a tar archive on stdout
        # (--pgdata=-), including the WAL needed for consistency (--xlog),
        # then pipe through the compression and encryption stages.
        cmd = ("pg_basebackup -h %s -U %s --pgdata=-"
               " --label=%s --format=tar --xlog " %
               (self.UNIX_SOCKET_DIR, self.ADMIN_USER, self.base_filename))

        return cmd + self.zip_cmd + self.encrypt_cmd

    def base_backup_metadata(self, metadata_file):
        """Parse the contents of the .backup file"""
        metadata = {}
        # The history file is written by the server as the postgres user;
        # grant world-read so this process can open it.
        operating_system.chmod(
            metadata_file, FileMode(add=[stat.S_IROTH]), as_root=True)

        start_re = re.compile("START WAL LOCATION: (.*) \(file (.*)\)")
        stop_re = re.compile("STOP WAL LOCATION: (.*) \(file (.*)\)")
        checkpt_re = re.compile("CHECKPOINT LOCATION: (.*)")
        label_re = re.compile("LABEL: (.*)")

        metadata_contents = operating_system.read_file(metadata_file)
        match = start_re.search(metadata_contents)
        if match:
            self.start_segment = match.group(1)
            metadata['start-segment'] = self.start_segment
            self.start_wal_file = match.group(2)
            metadata['start-wal-file'] = self.start_wal_file

        match = stop_re.search(metadata_contents)
        if match:
            self.stop_segment = match.group(1)
            metadata['stop-segment'] = self.stop_segment
            self.stop_wal_file = match.group(2)
            metadata['stop-wal-file'] = self.stop_wal_file

        match = checkpt_re.search(metadata_contents)
        if match:
            self.checkpoint_location = match.group(1)
            metadata['checkpoint-location'] = self.checkpoint_location

        match = label_re.search(metadata_contents)
        if match:
            self.label = match.group(1)
            metadata['label'] = self.label
        return metadata

    def check_process(self):
        """Return True only if all backup markers were captured."""
        # If any of the below variables were not set by either metadata()
        # or direct retrieval from the pgsql backup commands, then something
        # has gone wrong
        if not self.start_segment or not self.start_wal_file:
            LOG.info(_("Unable to determine starting WAL file/segment"))
            return False
        if not self.stop_segment or not self.stop_wal_file:
            LOG.info(_("Unable to determine ending WAL file/segment"))
            return False
        if not self.label:
            LOG.info(_("No backup label found"))
            return False
        return True

    def metadata(self):
        """pg_basebackup may complete, and we arrive here before the
        history file is written to the wal archive. So we need to
        handle two possibilities:
        - this is the first backup, and no history file exists yet
        - this isn't the first backup, and so the history file we retrieve
        isn't the one we just ran!
        """
        def _metadata_found():
            # Poll helper: True once the newest history file's label
            # matches the label this backup run was started with.
            LOG.debug("Polling for backup metadata... ")
            self.mrb = self.most_recent_backup_file()
            if not self.mrb:
                LOG.debug("No history files found!")
                return False
            metadata = self.base_backup_metadata(
                os.path.join(WAL_ARCHIVE_DIR, self.mrb))
            LOG.debug("Label to pg_basebackup: %s label found: %s" %
                      (self.base_filename, metadata['label']))
            LOG.info(_("Metadata for backup: %s.") % str(metadata))
            return metadata['label'] == self.base_filename

        try:
            utils.poll_until(_metadata_found, sleep_time=5, time_out=60)
        except exception.PollTimeOut:
            raise RuntimeError(_("Timeout waiting for backup metadata for"
                                 " backup %s") % self.base_filename)

        # Re-parse the (now confirmed) history file and return its contents.
        return self.base_backup_metadata(
            os.path.join(WAL_ARCHIVE_DIR, self.mrb))

    def _run_post_backup(self):
        """Get rid of WAL data we don't need any longer"""
        # pg_archivecleanup removes WAL segments older than the given
        # history file; run it as the postgres user.
        arch_cleanup_bin = os.path.join(self.pgsql_extra_bin_dir,
                                        "pg_archivecleanup")
        bk_file = os.path.basename(self.most_recent_backup_file())
        cmd_full = " ".join((arch_cleanup_bin, WAL_ARCHIVE_DIR, bk_file))
        utils.execute("sudo", "su", "-", self.PGSQL_OWNER, "-c",
                      "%s" % cmd_full)
|
||||
|
||||
|
||||
class PgBaseBackupIncremental(PgBaseBackup):
    """To restore an incremental backup from a previous backup, in PostgreSQL,
    is effectively to replay the WAL entries to a designated point in time.
    All that is required is the most recent base backup, and all WAL files
    """

    def __init__(self, *args, **kwargs):
        # An incremental backup is meaningless without its parent backup;
        # fail fast before the base class starts any work.
        if (not kwargs.get('parent_location') or
                not kwargs.get('parent_checksum')):
            raise AttributeError(_('Parent missing!'))

        super(PgBaseBackupIncremental, self).__init__(*args, **kwargs)
        self.parent_location = kwargs.get('parent_location')
        self.parent_checksum = kwargs.get('parent_checksum')

    def _run_pre_backup(self):
        # Start an online backup to force a WAL boundary and capture the
        # start segment/file, then immediately stop it — for an increment
        # we only ship the archived WAL files, not the data directory.
        self.backup_label = self.base_filename
        result = pgutil.query("SELECT pg_start_backup('%s', true)" %
                              self.backup_label)
        self.start_segment = result[0][0]

        result = pgutil.query("SELECT pg_xlogfile_name('%s')" %
                              self.start_segment)
        self.start_wal_file = result[0][0]

        result = pgutil.query("SELECT pg_stop_backup()")
        self.stop_segment = result[0][0]

        # We have to hack this because self.command is
        # initialized in the base class before we get here, which is
        # when we will know exactly what WAL files we want to archive
        self.command = self._cmd()

    def _cmd(self):
        # pos=1: the newest history file belongs to the backup started
        # above, so diff against the one before it — presumably the
        # previous backup in the chain (TODO confirm).
        wal_file_list = self.log_files_since_last_backup(pos=1)
        # Tar up just the needed WAL files and pipe through the usual
        # compression/encryption stages.
        cmd = 'sudo tar -cf - -C {wal_dir} {wal_list} '.format(
            wal_dir=WAL_ARCHIVE_DIR,
            wal_list=" ".join(wal_file_list))
        return cmd + self.zip_cmd + self.encrypt_cmd

    def metadata(self):
        # Augment the base-backup metadata with the parent pointers the
        # incremental restore strategy needs to walk the backup chain.
        _meta = super(PgBaseBackupIncremental, self).metadata()
        _meta.update({
            'parent_location': self.parent_location,
            'parent_checksum': self.parent_checksum,
        })
        return _meta
|
||||
|
@ -13,21 +13,33 @@
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import os
|
||||
import re
|
||||
|
||||
from eventlet.green import subprocess
|
||||
from oslo_log import log as logging
|
||||
|
||||
from trove.common import cfg
|
||||
from trove.common import exception
|
||||
from trove.common.i18n import _
|
||||
from trove.common import stream_codecs
|
||||
from trove.guestagent.common import operating_system
|
||||
from trove.guestagent.common.operating_system import FileMode
|
||||
from trove.guestagent.datastore.experimental.postgresql.service.config import(
|
||||
PgSqlConfig)
|
||||
from trove.guestagent.strategies.backup.experimental.postgresql_impl import(
|
||||
PgBaseBackupUtil)
|
||||
from trove.guestagent.strategies.restore import base
|
||||
|
||||
CONF = cfg.CONF
|
||||
LOG = logging.getLogger(__name__)
|
||||
WAL_ARCHIVE_DIR = CONF.postgresql.wal_archive_location
|
||||
|
||||
|
||||
class PgDump(base.RestoreRunner):
|
||||
"""Implementation of Restore Strategy for pg_dump."""
|
||||
__strategy_name__ = 'pg_dump'
|
||||
base_restore_cmd = 'sudo -u postgres psql '
|
||||
base_restore_cmd = 'psql -U os_admin'
|
||||
|
||||
IGNORED_ERROR_PATTERNS = [
|
||||
re.compile("ERROR:\s*role \"postgres\" already exists"),
|
||||
@ -60,7 +72,7 @@ class PgDump(base.RestoreRunner):
|
||||
content_length += len(chunk)
|
||||
process.stdin.close()
|
||||
self._handle_errors(process)
|
||||
LOG.debug("Restored %s bytes from stream." % content_length)
|
||||
LOG.info(_("Restored %s bytes from stream.") % content_length)
|
||||
|
||||
return content_length
|
||||
|
||||
@ -80,3 +92,105 @@ class PgDump(base.RestoreRunner):
|
||||
raise exception(message)
|
||||
except OSError:
|
||||
pass
|
||||
|
||||
|
||||
class PgBaseBackup(base.RestoreRunner, PgSqlConfig):
    """Implementation of Restore Strategy for pg_basebackup."""
    __strategy_name__ = 'pg_basebackup'
    location = ""
    base_restore_cmd = ''

    IGNORED_ERROR_PATTERNS = [
        re.compile("ERROR:\s*role \"postgres\" already exists"),
    ]

    def __init__(self, *args, **kwargs):
        # Unpack the streamed tar archive straight into the data
        # directory as the postgres user (tar xCf <datadir> -).
        self.base_restore_cmd = 'sudo -u %s tar xCf %s - ' % (
            self.PGSQL_OWNER, self.pgsql_data_dir
        )

        super(PgBaseBackup, self).__init__(*args, **kwargs)

    def pre_restore(self):
        """Stop the server and reset both the WAL archive and the data
        directory to a clean, postgres-owned, empty state."""
        self.stop_db(context=None)
        PgBaseBackupUtil.recreate_wal_archive_dir()
        datadir = self.pgsql_data_dir
        operating_system.remove(datadir, force=True, recursive=True,
                                as_root=True)
        operating_system.create_directory(datadir, user=self.PGSQL_OWNER,
                                          group=self.PGSQL_OWNER, force=True,
                                          as_root=True)

    def post_restore(self):
        # Reset the restored tree to owner-only permissions (0700), as
        # PostgreSQL expects for its data directory.
        operating_system.chmod(self.pgsql_data_dir,
                               FileMode.SET_USR_RWX(),
                               as_root=True, recursive=True, force=True)

    def write_recovery_file(self, restore=False):
        """Write a recovery.conf directing the server to recover to the
        target named by this backup's label.

        :param restore: when True, also emit a restore_command so archived
                        WAL files are fetched during recovery.
        """
        metadata = self.storage.load_metadata(self.location, self.checksum)
        recovery_conf = ""
        recovery_conf += "recovery_target_name = '%s' \n" % metadata['label']
        recovery_conf += "recovery_target_timeline = '%s' \n" % 1

        if restore:
            # NOTE(review): self.pgsql_restore_cmd is only assigned in the
            # incremental subclass — presumably restore=True is never used
            # from the plain base-backup path; confirm against callers.
            recovery_conf += ("restore_command = '" +
                              self.pgsql_restore_cmd + "'\n")

        recovery_file = os.path.join(self.pgsql_data_dir, 'recovery.conf')
        operating_system.write_file(recovery_file, recovery_conf,
                                    codec=stream_codecs.IdentityCodec(),
                                    as_root=True)
        operating_system.chown(recovery_file, user=self.PGSQL_OWNER,
                               group=self.PGSQL_OWNER, as_root=True)
|
||||
|
||||
|
||||
class PgBaseBackupIncremental(PgBaseBackup):
    """Restore strategy for incremental pg_basebackup backups.

    Walks the parent chain recorded in the backup metadata back to the
    full base backup, restores that, then unpacks each increment's WAL
    files so PostgreSQL can replay them during recovery.
    """

    def __init__(self, *args, **kwargs):
        super(PgBaseBackupIncremental, self).__init__(*args, **kwargs)
        # Running total of bytes restored across the whole chain.
        self.content_length = 0
        # Increments contain only WAL files; unpack them into the WAL
        # archive directory rather than the data directory.
        self.incr_restore_cmd = 'sudo -u %s tar -xf - -C %s ' % (
            self.PGSQL_OWNER, WAL_ARCHIVE_DIR
        )
        # restore_command written into recovery.conf (%f/%p are expanded
        # by the PostgreSQL server, not by Python).
        self.pgsql_restore_cmd = "cp " + WAL_ARCHIVE_DIR + '/%f "%p"'

    def pre_restore(self):
        # Unlike the full-backup restore, do NOT wipe anything here; the
        # base class pre_restore() is invoked later, once the recursion
        # reaches the full backup.
        self.stop_db(context=None)

    def post_restore(self):
        # restore=True: recovery needs restore_command to fetch the WAL
        # files unpacked into the archive dir.
        self.write_recovery_file(restore=True)

    def _incremental_restore_cmd(self, incr=False):
        """Return the unpack command for one link of the backup chain:
        the WAL-only command for increments, the data-dir command for the
        full base backup."""
        args = {'restore_location': self.restore_location}
        cmd = self.base_restore_cmd
        if incr:
            cmd = self.incr_restore_cmd
        return self.decrypt_cmd + self.unzip_cmd + (cmd % args)

    def _incremental_restore(self, location, checksum):
        """Restore the chain rooted at (location, checksum), parents first."""
        metadata = self.storage.load_metadata(location, checksum)
        if 'parent_location' in metadata:
            LOG.info("Found parent at %s", metadata['parent_location'])
            parent_location = metadata['parent_location']
            parent_checksum = metadata['parent_checksum']
            # Depth-first: restore the parent before applying this increment.
            self._incremental_restore(parent_location, parent_checksum)
            cmd = self._incremental_restore_cmd(incr=True)
            self.content_length += self._unpack(location, checksum, cmd)

        else:
            # For the parent base backup, revert to the default restore cmd
            LOG.info(_("Recursed back to full backup."))

            super(PgBaseBackupIncremental, self).pre_restore()
            cmd = self._incremental_restore_cmd(incr=False)
            self.content_length += self._unpack(location, checksum, cmd)

        # Reset data-dir permissions to owner-only after every unpack.
        operating_system.chmod(self.pgsql_data_dir,
                               FileMode.SET_USR_RWX(),
                               as_root=True, recursive=True, force=True)

    def _run_restore(self):
        self._incremental_restore(self.location, self.checksum)
        # content-length restored
        return self.content_length
|
||||
|
@ -144,6 +144,9 @@ instance_create_groups.extend([instance_create_group.GROUP,
|
||||
backup_groups = list(instance_create_groups)
|
||||
backup_groups.extend([backup_group.GROUP])
|
||||
|
||||
incremental_backup_groups = list(instance_create_groups)
|
||||
incremental_backup_groups.extend([backup_group.GROUP_INCREMENTAL])
|
||||
|
||||
configuration_groups = list(instance_create_groups)
|
||||
configuration_groups.extend([configuration_group.GROUP])
|
||||
|
||||
@ -178,6 +181,7 @@ common_groups.extend([guest_log_groups, module_groups])
|
||||
|
||||
# Register: Component based groups
|
||||
register(["backup"], backup_groups)
|
||||
register(["incremental_backup"], incremental_backup_groups)
|
||||
register(["cluster"], cluster_actions_groups)
|
||||
register(["configuration"], configuration_groups)
|
||||
register(["database"], database_actions_groups)
|
||||
@ -203,7 +207,8 @@ register(["couchdb_supported"], common_groups, backup_groups,
|
||||
user_actions_groups, database_actions_groups, root_actions_groups)
|
||||
register(["postgresql_supported"], common_groups,
|
||||
backup_groups, database_actions_groups, configuration_groups,
|
||||
root_actions_groups, user_actions_groups)
|
||||
root_actions_groups, user_actions_groups,
|
||||
incremental_backup_groups)
|
||||
register(["mysql_supported", "percona_supported"], common_groups,
|
||||
backup_groups, configuration_groups, database_actions_groups,
|
||||
replication_groups, root_actions_groups, user_actions_groups)
|
||||
|
@ -25,6 +25,10 @@ GROUP_BACKUP = "scenario.backup_group"
|
||||
GROUP_BACKUP_LIST = "scenario.backup_list_group"
|
||||
GROUP_RESTORE = "scenario.restore_group"
|
||||
|
||||
GROUP_INCREMENTAL = "scenario.incremental_backup_restore_group"
|
||||
GROUP_INCREMENTAL_BACKUP = "scenario.incremental_backup_group"
|
||||
GROUP_INCREMENTAL_RESTORE = "scenario.incremental_restore_group"
|
||||
|
||||
|
||||
class BackupRunnerFactory(test_runners.RunnerFactory):
|
||||
|
||||
@ -32,6 +36,12 @@ class BackupRunnerFactory(test_runners.RunnerFactory):
|
||||
_runner_cls = 'BackupRunner'
|
||||
|
||||
|
||||
class IncrementalBackupRunnerFactory(test_runners.RunnerFactory):
    """Factory resolving the datastore-specific IncrementalBackupRunner."""

    # Runner module namespace and class name looked up by RunnerFactory.
    _runner_ns = 'backup_runners'
    _runner_cls = 'IncrementalBackupRunner'
|
||||
|
||||
|
||||
@test(depends_on_groups=[instance_create_group.GROUP], groups=[GROUP])
|
||||
class BackupGroup(TestGroup):
|
||||
"""Test Backup and Restore functionality."""
|
||||
@ -194,3 +204,43 @@ class BackupGroup(TestGroup):
|
||||
def check_for_incremental_backup(self):
|
||||
"""Test that backup children are deleted."""
|
||||
self.test_runner.run_check_for_incremental_backup()
|
||||
|
||||
|
||||
@test(depends_on_groups=[instance_create_group.GROUP],
      groups=[GROUP_INCREMENTAL])
class IncrementalBackupGroup(TestGroup):
    """Test Incremental Backup and Restore functionality."""

    def __init__(self):
        super(IncrementalBackupGroup, self).__init__(
            IncrementalBackupRunnerFactory.instance())

    # NOTE: proboscis resolves depends_on/runs_after via the function
    # objects defined earlier in the class body, so method order matters.

    @test(groups=[GROUP_INCREMENTAL_BACKUP])
    def backup_run_single_incremental(self):
        """Run a full and a single incremental backup"""
        self.test_runner.run_backup_incremental(increments=1)

    @test(groups=[GROUP_INCREMENTAL_BACKUP],
          depends_on=[backup_run_single_incremental])
    def restore_from_incremental(self):
        """Launch a restore from a single incremental backup"""
        self.test_runner.run_restore_from_incremental(increment=1)

    @test(groups=[GROUP_INCREMENTAL_RESTORE],
          depends_on=[restore_from_incremental])
    def restore_from_backup_completed(self):
        """Wait until restoring an instance from an incr. backup completes."""
        self.test_runner.run_restore_from_backup_completed()

    @test(groups=[GROUP_INCREMENTAL_RESTORE],
          depends_on=[restore_from_backup_completed])
    def verify_data_in_restored_instance(self):
        """Verify data in restored instance."""
        self.test_runner.run_verify_data_in_restored_instance()

    @test(groups=[GROUP_INCREMENTAL_RESTORE],
          depends_on=[restore_from_backup_completed],
          runs_after=[verify_data_in_restored_instance])
    def delete_restored_instance(self):
        """Test deleting the restored instance."""
        self.test_runner.run_delete_restored_instance()
|
||||
|
@ -52,7 +52,7 @@ class PostgresqlHelper(SqlHelper):
|
||||
|
||||
def get_invalid_groups(self):
    """Return configuration groups carrying invalid option values."""
    bad_string = 'string_value'
    invalid_groups = [{'timezone': 997}]
    invalid_groups.extend(
        {option: bad_string}
        for option in ('max_worker_processes',
                       'vacuum_cost_delay',
                       'standard_conforming_strings'))
    return invalid_groups
|
||||
|
||||
def get_exposed_user_log_names(self):
|
||||
|
@ -23,7 +23,90 @@ from trove.tests.scenario.helpers.test_helper import DataType
|
||||
from trove.tests.scenario.runners.test_runners import TestRunner
|
||||
|
||||
|
||||
class BackupRunner(TestRunner):
|
||||
class BackupRunnerMixin(TestRunner):
    """Assertions and helpers shared by the full and incremental backup
    scenario runners (restore, verify, delete of restored instances).
    """

    def _verify_backup(self, backup_id):
        """Poll until the backup reaches COMPLETED; fail fast on FAILED."""
        def _result_is_active():
            backup = self.auth_client.backups.get(backup_id)
            if backup.status == 'COMPLETED':
                return True
            else:
                self.assert_not_equal('FAILED', backup.status,
                                      'Backup status should not be')
                return False

        poll_until(_result_is_active, time_out=self.TIMEOUT_BACKUP_CREATE)

    def _wait_until_backup_is_gone(self, backup_id):
        """Poll until GET on the backup raises NotFound."""
        def _backup_is_gone():
            try:
                self.auth_client.backups.get(backup_id)
                return False
            except exceptions.NotFound:
                return True

        poll_until(_backup_is_gone,
                   time_out=self.TIMEOUT_BACKUP_DELETE)

    def assert_restore_from_backup(self, backup_ref):
        """Launch a restore and assert it starts building."""
        result = self._restore_from_backup(backup_ref)
        # TODO(peterstac) - This should probably return code 202
        self.assert_client_code(200)
        self.assert_equal('BUILD', result.status,
                          'Unexpected instance status')
        # Remembered for the later completion/verify/delete steps.
        self.restore_instance_id = result.id

    def _restore_from_backup(self, backup_ref):
        """Create a new instance restored from the given backup ref."""
        restore_point = {'backupRef': backup_ref}
        result = self.auth_client.instances.create(
            self.instance_info.name + '_restore',
            self.instance_info.dbaas_flavor_href,
            self.instance_info.volume,
            nics=self.instance_info.nics,
            restorePoint=restore_point,
            datastore=self.instance_info.dbaas_datastore,
            datastore_version=self.instance_info.dbaas_datastore_version)
        return result

    # NOTE(review): mutable default argument (list) below — shared across
    # calls; harmless here since it is never mutated, but worth fixing.
    def run_restore_from_backup_completed(
            self, expected_states=['BUILD', 'ACTIVE'],
            # TODO(peterstac) - This should probably return code 202
            expected_http_code=200):
        self.assert_restore_from_backup_completed(
            self.restore_instance_id, expected_states, expected_http_code)
        self.restore_host = self.get_instance_host(self.restore_instance_id)

    def assert_restore_from_backup_completed(
            self, instance_id, expected_states, expected_http_code):
        self.assert_instance_action(instance_id, expected_states,
                                    expected_http_code)

    def run_verify_data_in_restored_instance(self):
        self.assert_verify_backup_data(self.restore_host)

    def run_verify_data_for_backup(self):
        self.assert_verify_backup_data(self.backup_host)

    def assert_verify_backup_data(self, host):
        """In order for this to work, the corresponding datastore
        'helper' class should implement the 'verify_large_data' method.
        """
        self.test_helper.verify_data(DataType.large, host)

    def run_delete_restored_instance(
            self, expected_states=['SHUTDOWN'],
            expected_http_code=202):
        self.assert_delete_restored_instance(
            self.restore_instance_id, expected_states, expected_http_code)

    def assert_delete_restored_instance(
            self, instance_id, expected_states, expected_http_code):
        """Delete the restored instance and wait until it is fully gone."""
        self.auth_client.instances.delete(instance_id)
        self.assert_instance_action(instance_id, expected_states,
                                    expected_http_code)
        self.assert_all_gone(instance_id, expected_states[-1])
||||
|
||||
|
||||
class BackupRunner(BackupRunnerMixin):
|
||||
|
||||
def __init__(self):
|
||||
self.TIMEOUT_BACKUP_CREATE = 60 * 30
|
||||
@ -44,6 +127,7 @@ class BackupRunner(TestRunner):
|
||||
self.incremental_backup_info = None
|
||||
self.restore_instance_id = 0
|
||||
self.restore_host = None
|
||||
self.other_client = None
|
||||
|
||||
def run_backup_create_instance_invalid(
|
||||
self, expected_exception=exceptions.BadRequest,
|
||||
@ -72,15 +156,6 @@ class BackupRunner(TestRunner):
|
||||
"""
|
||||
self.test_helper.add_data(DataType.large, host)
|
||||
|
||||
def run_verify_data_for_backup(self):
|
||||
self.assert_verify_backup_data(self.backup_host)
|
||||
|
||||
def assert_verify_backup_data(self, host):
|
||||
"""In order for this to work, the corresponding datastore
|
||||
'helper' class should implement the 'verify_large_data' method.
|
||||
"""
|
||||
self.test_helper.verify_data(DataType.large, host)
|
||||
|
||||
def run_backup_create(self):
    """Kick off creation of a full backup of the test instance."""
    self.assert_backup_create()
|
||||
|
||||
@ -242,55 +317,6 @@ class BackupRunner(TestRunner):
|
||||
def run_restore_from_backup(self):
    """Start a restore using the most recent full backup."""
    backup_ref = self.backup_info.id
    self.assert_restore_from_backup(backup_ref)
|
||||
|
||||
def assert_restore_from_backup(self, backup_ref):
    """Create a new instance from ``backup_ref`` and remember its id."""
    result = self._restore_from_backup(backup_ref)
    # TODO(peterstac) - This should probably return code 202
    self.assert_client_code(200)
    self.assert_equal(
        'BUILD', result.status, 'Unexpected instance status')
    self.restore_instance_id = result.id
|
||||
|
||||
def _restore_from_backup(self, backup_ref):
    """Issue the create-from-backup API call and return its result."""
    info = self.instance_info
    return self.auth_client.instances.create(
        info.name + '_restore',
        info.dbaas_flavor_href,
        info.volume,
        nics=info.nics,
        restorePoint={'backupRef': backup_ref},
        datastore=info.dbaas_datastore,
        datastore_version=info.dbaas_datastore_version)
|
||||
|
||||
def run_restore_from_backup_completed(
        self, expected_states=None,
        # TODO(peterstac) - This should probably return code 202
        expected_http_code=200):
    """Wait for the restored instance to finish building.

    :param expected_states: states the instance is expected to pass
        through; defaults to ['BUILD', 'ACTIVE'].  ``None`` is used
        instead of a list literal because mutable default arguments
        are shared across calls.
    :param expected_http_code: HTTP status expected for the action.
    """
    if expected_states is None:
        expected_states = ['BUILD', 'ACTIVE']
    self.assert_restore_from_backup_completed(
        self.restore_instance_id, expected_states, expected_http_code)
    self.restore_host = self.get_instance_host(self.restore_instance_id)
|
||||
|
||||
def assert_restore_from_backup_completed(
        self, instance_id, expected_states, expected_http_code):
    """Block until ``instance_id`` has run through ``expected_states``."""
    self.assert_instance_action(
        instance_id, expected_states, expected_http_code)
|
||||
|
||||
def run_verify_data_in_restored_instance(self):
    """Confirm the restored instance holds the backed-up data set."""
    target = self.restore_host
    self.assert_verify_backup_data(target)
|
||||
|
||||
def run_delete_restored_instance(
        self, expected_states=None,
        expected_http_code=202):
    """Delete the restored instance and verify it goes away.

    :param expected_states: states expected during deletion; defaults
        to ['SHUTDOWN'].  ``None`` is used instead of a list literal
        to avoid a shared mutable default argument.
    :param expected_http_code: HTTP status expected from the delete.
    """
    if expected_states is None:
        expected_states = ['SHUTDOWN']
    self.assert_delete_restored_instance(
        self.restore_instance_id, expected_states, expected_http_code)
|
||||
|
||||
def assert_delete_restored_instance(
        self, instance_id, expected_states, expected_http_code):
    """Delete ``instance_id``, check the action, and wait for removal."""
    self.auth_client.instances.delete(instance_id)
    self.assert_instance_action(
        instance_id, expected_states, expected_http_code)
    final_state = expected_states[-1]
    self.assert_all_gone(instance_id, final_state)
|
||||
|
||||
def run_delete_unknown_backup(
|
||||
self, expected_exception=exceptions.NotFound,
|
||||
expected_http_code=404):
|
||||
@ -319,17 +345,6 @@ class BackupRunner(TestRunner):
|
||||
self.assert_client_code(expected_http_code)
|
||||
self._wait_until_backup_is_gone(backup_id)
|
||||
|
||||
def _wait_until_backup_is_gone(self, backup_id):
    """Poll until a GET on ``backup_id`` raises NotFound."""
    client = self.auth_client

    def _backup_is_gone():
        # EAFP: the backup is gone once the lookup raises NotFound.
        try:
            client.backups.get(backup_id)
        except exceptions.NotFound:
            return True
        return False

    poll_until(_backup_is_gone,
               time_out=self.TIMEOUT_BACKUP_DELETE)
|
||||
|
||||
def run_check_for_incremental_backup(
|
||||
self, expected_exception=exceptions.NotFound,
|
||||
expected_http_code=404):
|
||||
@ -339,3 +354,59 @@ class BackupRunner(TestRunner):
|
||||
expected_exception, expected_http_code,
|
||||
self.auth_client.backups.get,
|
||||
self.incremental_backup_info.id)
|
||||
|
||||
|
||||
class IncrementalBackupRunner(BackupRunnerMixin):
|
||||
|
||||
def __init__(self):
    """Set up timeouts, fixture names and per-run bookkeeping state."""
    # Timeouts must be assigned before super().__init__ consumes them.
    self.TIMEOUT_BACKUP_CREATE = 60 * 30
    self.TIMEOUT_BACKUP_DELETE = 120

    super(IncrementalBackupRunner, self).__init__(
        sleep_time=20, timeout=self.TIMEOUT_BACKUP_CREATE)

    # Names/descriptions used for the backups created by this runner.
    self.BACKUP_NAME = 'backup_test'
    self.BACKUP_DESC = 'test description'
    self.INCREMENTAL_BACKUP_NAME = 'incremental_backup_test'
    self.INCREMENTAL_BACKUP_DESC = 'incremental test description'

    # State populated as the scenario progresses.
    self.backup_host = None
    self.backup_info = None
    self.backup_count_prior_to_create = 0
    self.backup_count_for_instance_prior_to_create = 0
    self.incremental_backup_info = None
    self.restore_instance_id = 0
    self.restore_host = None
    self.other_client = None
|
||||
|
||||
def run_backup_incremental(self, increments=1):
    """Create a full backup followed by a chain of incremental backups.

    :param increments: number of incremental backups to chain onto the
        full backup; each one's parent is the previously created backup.
    """
    full_backup = self.auth_client.backups.create(
        name=self.BACKUP_NAME, instance=self.instance_info.id,
        description=self.BACKUP_DESC)
    self.backup_info = full_backup
    print("Verifying backup " + full_backup.id)
    self._verify_backup(full_backup.id)
    parent_id = full_backup.id
    # TODO(atomic77) Modify add/verify data helpers to enable multiple
    # calls to add that would be required to properly test multiple
    # incremental backups
    self.test_helper.add_data(DataType.large, self.get_instance_host())
    for _ in range(increments):
        incremental = self.auth_client.backups.create(
            name=self.INCREMENTAL_BACKUP_NAME,
            instance=self.instance_info.id,
            description=self.INCREMENTAL_BACKUP_DESC,
            parent_id=parent_id)
        print("Verifying backup " + incremental.id)
        self._verify_backup(incremental.id)
        parent_id = incremental.id
        # Track the newest incremental inside the loop so the attribute
        # is never left referencing an undefined name when
        # increments == 0; it keeps its initialized value (None) then.
        self.incremental_backup_info = incremental
|
||||
|
||||
def run_restore_from_incremental(self, increment=1):
    """Launch a restore from the most recent incremental backup.

    NOTE(review): ``increment`` is currently unused by this method;
    it is kept for interface compatibility with the test runner
    configuration.
    """
    result = self._restore_from_backup(self.incremental_backup_info.id)
    self.assert_client_code(200)
    self.assert_equal(
        'BUILD', result.status, 'Unexpected instance status')
    self.restore_instance_id = result.id
|
||||
|
@ -11,7 +11,9 @@
|
||||
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
# License for the specific language governing permissions and limitations
|
||||
# under the License.
|
||||
|
||||
import mock
|
||||
import os
|
||||
from mock import ANY, DEFAULT, Mock, patch, PropertyMock
|
||||
from testtools.testcase import ExpectedException
|
||||
from trove.common import exception
|
||||
@ -23,6 +25,8 @@ from trove.guestagent.datastore.experimental.cassandra import (
|
||||
service as cass_service
|
||||
)
|
||||
from trove.guestagent.strategies.backup import base as backupBase
|
||||
from trove.guestagent.strategies.backup.experimental.postgresql_impl \
|
||||
import PgBaseBackupUtil
|
||||
from trove.guestagent.strategies.backup.mysql_impl import MySqlApp
|
||||
from trove.guestagent.strategies.restore import base as restoreBase
|
||||
from trove.guestagent.strategies.restore.mysql_impl import MySQLRestoreMixin
|
||||
@ -890,6 +894,56 @@ class RedisRestoreTests(trove_testtools.TestCase):
|
||||
self.restore_runner.restore)
|
||||
|
||||
|
||||
class PostgresqlBackupTests(trove_testtools.TestCase):
    """Unit tests for the WAL-archive helpers used by PgBaseBackup."""

    def setUp(self):
        super(PostgresqlBackupTests, self).setUp()
        self.bkutil = PgBaseBackupUtil()

        # First backup fixture: four WAL segments plus the .backup
        # label file written at segment ...006.
        self.b1 = ['000000010000000000000003',
                   '000000010000000000000004',
                   '000000010000000000000005',
                   '000000010000000000000006',
                   '000000010000000000000006.00000168.backup']

        # Second backup fixture: label written at segment ...009, with
        # a later WAL segment (...010) following it.
        self.b2 = ['000000010000000000000007',
                   '000000010000000000000008',
                   '000000010000000000000009',
                   '000000010000000000000010',
                   '000000010000000000000009.0008A168.backup']

    def tearDown(self):
        super(PostgresqlBackupTests, self).tearDown()

    def test_check_most_recent_backup(self):
        # With only the first backup present, its label file and the
        # WAL segment it was taken at are the most recent.
        with patch.object(os, 'listdir', return_value=self.b1):
            self.assertEqual(self.bkutil.most_recent_backup_file(),
                             self.b1[4])
            self.assertEqual(self.bkutil.most_recent_backup_wal(),
                             self.b1[3])

        # With both backups present, the second backup wins.
        with patch.object(os, 'listdir', return_value=self.b1 + self.b2):
            self.assertEqual(self.bkutil.most_recent_backup_file(),
                             self.b2[4])
            self.assertEqual(self.bkutil.most_recent_backup_wal(),
                             self.b2[2])

    def test_check_most_recent_wal_list(self):
        # Only WAL files written at or after the last backup label
        # should be reported.
        with patch.object(os, 'listdir', return_value=self.b1):
            self.assertEqual(self.bkutil.log_files_since_last_backup(),
                             [self.b1[3]])

        with patch.object(os, 'listdir', return_value=self.b2):
            self.assertEqual(self.bkutil.log_files_since_last_backup(),
                             [self.b2[2], self.b2[3]])

        with patch.object(os, 'listdir', return_value=self.b1 + self.b2):
            self.assertEqual(self.bkutil.log_files_since_last_backup(),
                             [self.b2[2], self.b2[3]])
|
||||
|
||||
|
||||
class DB2BackupTests(trove_testtools.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
|
Loading…
Reference in New Issue
Block a user