Merge "Postgresql Incremental Backup and Restore"
commit 9147f9dd6b
@@ -0,0 +1,4 @@
---
features:
  - Full and incremental backup and restore strategy for postgres
    based on pg_basebackup and WAL shipping.
@@ -1044,15 +1044,20 @@ postgresql_opts = [
                     'if trove_security_groups_support is True).'),
    cfg.PortOpt('postgresql_port', default=5432,
                help='The TCP port the server listens on.'),
    cfg.StrOpt('backup_strategy', default='PgDump',
    cfg.StrOpt('backup_strategy', default='PgBaseBackup',
               help='Default strategy to perform backups.'),
    cfg.DictOpt('backup_incremental_strategy', default={},
    cfg.DictOpt('backup_incremental_strategy',
                default={'PgBaseBackup': 'PgBaseBackupIncremental'},
                help='Incremental Backup Runner based on the default '
                     'strategy. For strategies that do not implement an '
                     'incremental, the runner will use the default full backup.'),
    cfg.StrOpt('mount_point', default='/var/lib/postgresql',
               help="Filesystem path for mounting "
                    "volumes if volume support is enabled."),
    cfg.StrOpt('wal_archive_location', default='/mnt/wal_archive',
               help="Filesystem path storing WAL archive files when "
                    "WAL-shipping based backups or replication "
                    "is enabled."),
    cfg.BoolOpt('root_on_create', default=False,
                help='Enable the automatic creation of the root user for the '
                     'service during instance-create. The generated password for '
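A minimal sketch (assumed helper name, not part of the commit) of how a backup runner can resolve these two options, falling back to the full strategy when no incremental runner is mapped:

# Illustrative only: resolve the incremental runner for the configured
# full-backup strategy, or fall back to the full strategy itself.
from oslo_config import cfg

CONF = cfg.CONF

def resolve_incremental_runner_name():
    full = CONF.postgresql.backup_strategy                 # e.g. 'PgBaseBackup'
    incr_map = CONF.postgresql.backup_incremental_strategy
    return incr_map.get(full, full)    # 'PgBaseBackupIncremental' here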
@@ -228,6 +228,10 @@ class FileMode(object):
    def SET_USR_RW(cls):
        return cls(reset=[stat.S_IRUSR | stat.S_IWUSR])  # =0600

    @classmethod
    def SET_USR_RWX(cls):
        return cls(reset=[stat.S_IRUSR | stat.S_IWUSR | stat.S_IXUSR])  # =0700

    @classmethod
    def ADD_ALL_R(cls):
        return cls(add=[stat.S_IRUSR | stat.S_IRGRP | stat.S_IROTH])  # +0444
@@ -103,12 +103,14 @@ class Manager(
            device.mount(mount_point)
        self.configuration_manager.save_configuration(config_contents)
        self.apply_initial_guestagent_configuration()
        self.start_db(context)

        if backup_info:
            backup.restore(context, backup_info, '/tmp')
            pgutil.PG_ADMIN = self.ADMIN_USER
        else:
            backup.restore(context, backup_info, '/tmp')

        self.start_db(context)

        if not backup_info:
            self._secure(context)

    def _secure(self, context):
@@ -123,4 +125,5 @@ class Manager(

    def create_backup(self, context, backup_info):
        with EndNotification(context):
            self.enable_backups()
            backup.backup(context, backup_info)
@@ -35,6 +35,8 @@ from trove.guestagent.datastore.experimental.postgresql import pgutil
LOG = logging.getLogger(__name__)
CONF = cfg.CONF

BACKUP_CFG_OVERRIDE = 'PgBaseBackupConfig'


class PgSqlConfig(PgSqlProcess):
    """Mixin that implements the config API.
@@ -63,6 +65,17 @@ class PgSqlConfig(PgSqlProcess):
            requires_root=True,
            override_strategy=OneFileOverrideStrategy(revision_dir))

    @property
    def pgsql_extra_bin_dir(self):
        """Redhat and Ubuntu packages for PgSql do not place 'extra' important
        binaries in /usr/bin, but rather in a directory like /usr/pgsql-9.4/bin
        in the case of PostgreSQL 9.4 for RHEL/CentOS
        """
        version = self.pg_version[1]
        return {operating_system.DEBIAN: '/usr/lib/postgresql/%s/bin',
                operating_system.REDHAT: '/usr/pgsql-%s/bin',
                operating_system.SUSE: '/usr/bin'}[self.OS] % version

    @property
    def pgsql_config(self):
        return self._find_config_file('postgresql.conf')
@@ -156,7 +169,8 @@ class PgSqlConfig(PgSqlProcess):
        # The OrderedDict is necessary to guarantee the iteration order.
        access_rules = OrderedDict(
            [('local', [['all', 'postgres,os_admin', None, 'trust'],
                        ['all', 'all', None, 'md5']]),
                        ['all', 'all', None, 'md5'],
                        ['replication', 'postgres,os_admin', None, 'trust']]),
             ('host', [['all', 'postgres,os_admin', '127.0.0.1/32', 'trust'],
                       ['all', 'postgres,os_admin', '::1/128', 'trust'],
                       ['all', 'postgres,os_admin', 'localhost', 'trust'],
@@ -174,3 +188,35 @@ class PgSqlConfig(PgSqlProcess):
                               as_root=True)
        operating_system.chmod(self.pgsql_hba_config, FileMode.SET_USR_RO,
                               as_root=True)

    def enable_backups(self):
        """Apply necessary changes to config to enable WAL-based backups
        if we are using the PgBaseBackup strategy
        """
        if not CONF.postgresql.backup_strategy == 'PgBaseBackup':
            return
        if self.configuration_manager.has_system_override(BACKUP_CFG_OVERRIDE):
            return

        LOG.info("Applying changes to WAL config for use by base backups")
        wal_arch_loc = CONF.postgresql.wal_archive_location
        if not os.path.isdir(wal_arch_loc):
            raise RuntimeError(_("Cannot enable backup as WAL dir '%s' does "
                                 "not exist.") % wal_arch_loc)
        arch_cmd = "'test ! -f {wal_arch}/%f && cp %p {wal_arch}/%f'".format(
            wal_arch=wal_arch_loc
        )
        opts = {
            'wal_level': 'hot_standby',
            'archive_mode ': 'on',
            'max_wal_senders': 3,
            'checkpoint_segments ': 8,
            'wal_keep_segments': 8,
            'archive_command': arch_cmd
        }
        if not self.pg_version[1] in ('9.3'):
            opts['wal_log_hints'] = 'on'

        self.configuration_manager.apply_system_override(
            opts, BACKUP_CFG_OVERRIDE)
        self.restart(None)
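For illustration only (not part of the commit), the archive_command built above expands as follows, assuming the default wal_archive_location of /mnt/wal_archive:

# Illustrative sketch: PostgreSQL expands %f to the WAL file name and %p to
# its path, so a finished segment is copied into the archive only if it is
# not already present there.
wal_arch_loc = '/mnt/wal_archive'
arch_cmd = "'test ! -f {wal_arch}/%f && cp %p {wal_arch}/%f'".format(
    wal_arch=wal_arch_loc)
# arch_cmd == "'test ! -f /mnt/wal_archive/%f && cp %p /mnt/wal_archive/%f'"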
@@ -20,6 +20,8 @@ from trove.common.i18n import _
from trove.guestagent.datastore.experimental.postgresql.service.process import (
    PgSqlProcess)
from trove.guestagent import pkg
from trove.guestagent.strategies.backup.experimental.postgresql_impl import (
    PgBaseBackupUtil)

LOG = logging.getLogger(__name__)
CONF = cfg.CONF
@@ -46,6 +48,9 @@ class PgSqlInstall(PgSqlProcess):
                guest_id=CONF.guest_id
            )
        )

        PgBaseBackupUtil.recreate_wal_archive_dir()

        packager = pkg.Package()
        if not packager.pkg_is_installed(packages):
            try:
@@ -13,8 +13,31 @@
# License for the specific language governing permissions and limitations
# under the License.

import os
import re
import stat

from oslo_log import log as logging

from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.common import utils
from trove.guestagent.common import operating_system
from trove.guestagent.common.operating_system import FileMode
from trove.guestagent.datastore.experimental.postgresql import pgutil
from trove.guestagent.datastore.experimental.postgresql.service.config import (
    PgSqlConfig)
from trove.guestagent.datastore.experimental.postgresql.service.process import (
    PgSqlProcess)
from trove.guestagent.datastore.experimental.postgresql.service.users import (
    PgSqlUsers)
from trove.guestagent.strategies.backup import base

CONF = cfg.CONF
LOG = logging.getLogger(__name__)
WAL_ARCHIVE_DIR = CONF.postgresql.wal_archive_location


class PgDump(base.BackupRunner):
    """Implementation of Backup Strategy for pg_dump."""
@@ -24,3 +47,219 @@ class PgDump(base.BackupRunner):
    def cmd(self):
        cmd = 'sudo -u postgres pg_dumpall '
        return cmd + self.zip_cmd + self.encrypt_cmd


class PgBaseBackupUtil(object):

    def most_recent_backup_wal(self, pos=0):
        """
        Return the WAL file for the most recent backup
        """
        mrb_file = self.most_recent_backup_file(pos=pos)
        # just return the first part of the filename
        return mrb_file.split('.')[0]

    def most_recent_backup_file(self, pos=0):
        """
        Look for the most recent .backup file that basebackup creates
        :return: a string like 000000010000000000000006.00000168.backup
        """
        walre = re.compile("[0-9A-F]{24}.*.backup")
        wal_files = [wal_file for wal_file in os.listdir(WAL_ARCHIVE_DIR)
                     if walre.search(wal_file)]
        wal_files = sorted(wal_files, reverse=True)
        if not wal_files:
            return None
        return wal_files[pos]

    def log_files_since_last_backup(self, pos=0):
        """Return the WAL files since the provided last backup
        pg_archivebackup depends on alphanumeric sorting to decide wal order,
        so we'll do so too:
        https://github.com/postgres/postgres/blob/REL9_4_STABLE/contrib
        /pg_archivecleanup/pg_archivecleanup.c#L122
        """
        last_wal = self.most_recent_backup_wal(pos=pos)
        walre = re.compile("^[0-9A-F]{24}$")
        wal_files = [wal_file for wal_file in os.listdir(WAL_ARCHIVE_DIR)
                     if walre.search(wal_file) and wal_file >= last_wal]
        return wal_files

    @staticmethod
    def recreate_wal_archive_dir():
        operating_system.remove(WAL_ARCHIVE_DIR, force=True, recursive=True,
                                as_root=True)
        operating_system.create_directory(WAL_ARCHIVE_DIR,
                                          user=PgSqlProcess.PGSQL_OWNER,
                                          group=PgSqlProcess.PGSQL_OWNER,
                                          force=True, as_root=True)


class PgBaseBackup(base.BackupRunner, PgSqlConfig, PgBaseBackupUtil,
                   PgSqlUsers):
    """Base backups are taken with the pg_basebackup filesystem-level backup
    tool pg_basebackup creates a copy of the binary files in the PostgreSQL
    cluster data directory and enough WAL segments to allow the database to
    be brought back to a consistent state. Associated with each backup is a
    log location, normally indicated by the WAL file name and the position
    inside the file.
    """
    __strategy_name__ = 'pg_basebackup'

    def __init__(self, *args, **kwargs):
        super(PgBaseBackup, self).__init__(*args, **kwargs)
        self.label = None
        self.stop_segment = None
        self.start_segment = None
        self.start_wal_file = None
        self.stop_wal_file = None
        self.checkpoint_location = None
        self.mrb = None

    @property
    def cmd(self):
        cmd = ("pg_basebackup -h %s -U %s --pgdata=-"
               " --label=%s --format=tar --xlog " %
               (self.UNIX_SOCKET_DIR, self.ADMIN_USER, self.base_filename))

        return cmd + self.zip_cmd + self.encrypt_cmd

    def base_backup_metadata(self, metadata_file):
        """Parse the contents of the .backup file"""
        metadata = {}
        operating_system.chmod(
            metadata_file, FileMode(add=[stat.S_IROTH]), as_root=True)

        start_re = re.compile("START WAL LOCATION: (.*) \(file (.*)\)")
        stop_re = re.compile("STOP WAL LOCATION: (.*) \(file (.*)\)")
        checkpt_re = re.compile("CHECKPOINT LOCATION: (.*)")
        label_re = re.compile("LABEL: (.*)")

        metadata_contents = operating_system.read_file(metadata_file)
        match = start_re.search(metadata_contents)
        if match:
            self.start_segment = match.group(1)
            metadata['start-segment'] = self.start_segment
            self.start_wal_file = match.group(2)
            metadata['start-wal-file'] = self.start_wal_file

        match = stop_re.search(metadata_contents)
        if match:
            self.stop_segment = match.group(1)
            metadata['stop-segment'] = self.stop_segment
            self.stop_wal_file = match.group(2)
            metadata['stop-wal-file'] = self.stop_wal_file

        match = checkpt_re.search(metadata_contents)
        if match:
            self.checkpoint_location = match.group(1)
            metadata['checkpoint-location'] = self.checkpoint_location

        match = label_re.search(metadata_contents)
        if match:
            self.label = match.group(1)
            metadata['label'] = self.label
        return metadata

    def check_process(self):
        # If any of the below variables were not set by either metadata()
        # or direct retrieval from the pgsql backup commands, then something
        # has gone wrong
        if not self.start_segment or not self.start_wal_file:
            LOG.info(_("Unable to determine starting WAL file/segment"))
            return False
        if not self.stop_segment or not self.stop_wal_file:
            LOG.info(_("Unable to determine ending WAL file/segment"))
            return False
        if not self.label:
            LOG.info(_("No backup label found"))
            return False
        return True

    def metadata(self):
        """pg_basebackup may complete, and we arrive here before the
        history file is written to the wal archive. So we need to
        handle two possibilities:
        - this is the first backup, and no history file exists yet
        - this isn't the first backup, and so the history file we retrieve
        isn't the one we just ran!
        """
        def _metadata_found():
            LOG.debug("Polling for backup metadata... ")
            self.mrb = self.most_recent_backup_file()
            if not self.mrb:
                LOG.debug("No history files found!")
                return False
            metadata = self.base_backup_metadata(
                os.path.join(WAL_ARCHIVE_DIR, self.mrb))
            LOG.debug("Label to pg_basebackup: %s label found: %s" %
                      (self.base_filename, metadata['label']))
            LOG.info(_("Metadata for backup: %s.") % str(metadata))
            return metadata['label'] == self.base_filename

        try:
            utils.poll_until(_metadata_found, sleep_time=5, time_out=60)
        except exception.PollTimeOut:
            raise RuntimeError(_("Timeout waiting for backup metadata for"
                                 " backup %s") % self.base_filename)

        return self.base_backup_metadata(
            os.path.join(WAL_ARCHIVE_DIR, self.mrb))

    def _run_post_backup(self):
        """Get rid of WAL data we don't need any longer"""
        arch_cleanup_bin = os.path.join(self.pgsql_extra_bin_dir,
                                        "pg_archivecleanup")
        bk_file = os.path.basename(self.most_recent_backup_file())
        cmd_full = " ".join((arch_cleanup_bin, WAL_ARCHIVE_DIR, bk_file))
        utils.execute("sudo", "su", "-", self.PGSQL_OWNER, "-c",
                      "%s" % cmd_full)


class PgBaseBackupIncremental(PgBaseBackup):
    """To restore an incremental backup from a previous backup, in PostgreSQL,
    is effectively to replay the WAL entries to a designated point in time.
    All that is required is the most recent base backup, and all WAL files
    """

    def __init__(self, *args, **kwargs):
        if (not kwargs.get('parent_location') or
                not kwargs.get('parent_checksum')):
            raise AttributeError(_('Parent missing!'))

        super(PgBaseBackupIncremental, self).__init__(*args, **kwargs)
        self.parent_location = kwargs.get('parent_location')
        self.parent_checksum = kwargs.get('parent_checksum')

    def _run_pre_backup(self):
        self.backup_label = self.base_filename
        result = pgutil.query("SELECT pg_start_backup('%s', true)" %
                              self.backup_label)
        self.start_segment = result[0][0]

        result = pgutil.query("SELECT pg_xlogfile_name('%s')" %
                              self.start_segment)
        self.start_wal_file = result[0][0]

        result = pgutil.query("SELECT pg_stop_backup()")
        self.stop_segment = result[0][0]

        # We have to hack this because self.command is
        # initialized in the base class before we get here, which is
        # when we will know exactly what WAL files we want to archive
        self.command = self._cmd()

    def _cmd(self):
        wal_file_list = self.log_files_since_last_backup(pos=1)
        cmd = 'sudo tar -cf - -C {wal_dir} {wal_list} '.format(
            wal_dir=WAL_ARCHIVE_DIR,
            wal_list=" ".join(wal_file_list))
        return cmd + self.zip_cmd + self.encrypt_cmd

    def metadata(self):
        _meta = super(PgBaseBackupIncremental, self).metadata()
        _meta.update({
            'parent_location': self.parent_location,
            'parent_checksum': self.parent_checksum,
        })
        return _meta
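As a side note, a minimal sketch (illustrative file names only) of the alphanumeric WAL selection that log_files_since_last_backup relies on:

# Illustrative only: WAL segment names sort lexicographically in write order,
# so every 24-character segment name >= the last backup's WAL belongs in the
# next increment.
archive = ['000000010000000000000003',
           '000000010000000000000004',
           '000000010000000000000005',
           '000000010000000000000006',
           '000000010000000000000006.00000168.backup']
last_backup_wal = '000000010000000000000006'   # from the .backup file name
increment = [w for w in archive if len(w) == 24 and w >= last_backup_wal]
# increment == ['000000010000000000000006']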
@@ -13,21 +13,33 @@
# License for the specific language governing permissions and limitations
# under the License.

import os
import re

from eventlet.green import subprocess
from oslo_log import log as logging

from trove.common import cfg
from trove.common import exception
from trove.common.i18n import _
from trove.common import stream_codecs
from trove.guestagent.common import operating_system
from trove.guestagent.common.operating_system import FileMode
from trove.guestagent.datastore.experimental.postgresql.service.config import (
    PgSqlConfig)
from trove.guestagent.strategies.backup.experimental.postgresql_impl import (
    PgBaseBackupUtil)
from trove.guestagent.strategies.restore import base

CONF = cfg.CONF
LOG = logging.getLogger(__name__)
WAL_ARCHIVE_DIR = CONF.postgresql.wal_archive_location


class PgDump(base.RestoreRunner):
    """Implementation of Restore Strategy for pg_dump."""
    __strategy_name__ = 'pg_dump'
    base_restore_cmd = 'sudo -u postgres psql '
    base_restore_cmd = 'psql -U os_admin'

    IGNORED_ERROR_PATTERNS = [
        re.compile("ERROR:\s*role \"postgres\" already exists"),
@@ -60,7 +72,7 @@ class PgDump(base.RestoreRunner):
                content_length += len(chunk)
            process.stdin.close()
            self._handle_errors(process)
            LOG.debug("Restored %s bytes from stream." % content_length)
            LOG.info(_("Restored %s bytes from stream.") % content_length)

        return content_length

@@ -80,3 +92,105 @@ class PgDump(base.RestoreRunner):
                raise exception(message)
            except OSError:
                pass


class PgBaseBackup(base.RestoreRunner, PgSqlConfig):
    """Implementation of Restore Strategy for pg_basebackup."""
    __strategy_name__ = 'pg_basebackup'
    location = ""
    base_restore_cmd = ''

    IGNORED_ERROR_PATTERNS = [
        re.compile("ERROR:\s*role \"postgres\" already exists"),
    ]

    def __init__(self, *args, **kwargs):
        self.base_restore_cmd = 'sudo -u %s tar xCf %s - ' % (
            self.PGSQL_OWNER, self.pgsql_data_dir
        )

        super(PgBaseBackup, self).__init__(*args, **kwargs)

    def pre_restore(self):
        self.stop_db(context=None)
        PgBaseBackupUtil.recreate_wal_archive_dir()
        datadir = self.pgsql_data_dir
        operating_system.remove(datadir, force=True, recursive=True,
                                as_root=True)
        operating_system.create_directory(datadir, user=self.PGSQL_OWNER,
                                          group=self.PGSQL_OWNER, force=True,
                                          as_root=True)

    def post_restore(self):
        operating_system.chmod(self.pgsql_data_dir,
                               FileMode.SET_USR_RWX(),
                               as_root=True, recursive=True, force=True)

    def write_recovery_file(self, restore=False):
        metadata = self.storage.load_metadata(self.location, self.checksum)
        recovery_conf = ""
        recovery_conf += "recovery_target_name = '%s' \n" % metadata['label']
        recovery_conf += "recovery_target_timeline = '%s' \n" % 1

        if restore:
            recovery_conf += ("restore_command = '" +
                              self.pgsql_restore_cmd + "'\n")

        recovery_file = os.path.join(self.pgsql_data_dir, 'recovery.conf')
        operating_system.write_file(recovery_file, recovery_conf,
                                    codec=stream_codecs.IdentityCodec(),
                                    as_root=True)
        operating_system.chown(recovery_file, user=self.PGSQL_OWNER,
                               group=self.PGSQL_OWNER, as_root=True)


class PgBaseBackupIncremental(PgBaseBackup):

    def __init__(self, *args, **kwargs):
        super(PgBaseBackupIncremental, self).__init__(*args, **kwargs)
        self.content_length = 0
        self.incr_restore_cmd = 'sudo -u %s tar -xf - -C %s ' % (
            self.PGSQL_OWNER, WAL_ARCHIVE_DIR
        )
        self.pgsql_restore_cmd = "cp " + WAL_ARCHIVE_DIR + '/%f "%p"'

    def pre_restore(self):
        self.stop_db(context=None)

    def post_restore(self):
        self.write_recovery_file(restore=True)

    def _incremental_restore_cmd(self, incr=False):
        args = {'restore_location': self.restore_location}
        cmd = self.base_restore_cmd
        if incr:
            cmd = self.incr_restore_cmd
        return self.decrypt_cmd + self.unzip_cmd + (cmd % args)

    def _incremental_restore(self, location, checksum):

        metadata = self.storage.load_metadata(location, checksum)
        if 'parent_location' in metadata:
            LOG.info("Found parent at %s", metadata['parent_location'])
            parent_location = metadata['parent_location']
            parent_checksum = metadata['parent_checksum']
            self._incremental_restore(parent_location, parent_checksum)
            cmd = self._incremental_restore_cmd(incr=True)
            self.content_length += self._unpack(location, checksum, cmd)

        else:
            # For the parent base backup, revert to the default restore cmd
            LOG.info(_("Recursed back to full backup."))

            super(PgBaseBackupIncremental, self).pre_restore()
            cmd = self._incremental_restore_cmd(incr=False)
            self.content_length += self._unpack(location, checksum, cmd)

            operating_system.chmod(self.pgsql_data_dir,
                                   FileMode.SET_USR_RWX(),
                                   as_root=True, recursive=True, force=True)

    def _run_restore(self):
        self._incremental_restore(self.location, self.checksum)
        # content-length restored
        return self.content_length
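For illustration (assumed label value and the default /mnt/wal_archive location), write_recovery_file(restore=True) produces a recovery.conf along these lines:

# Illustrative reconstruction, not captured from a real run; 'mybackup'
# stands in for the label read from the backup metadata.
label = 'mybackup'
pgsql_restore_cmd = 'cp /mnt/wal_archive/%f "%p"'
recovery_conf = (
    "recovery_target_name = '%s' \n" % label +
    "recovery_target_timeline = '%s' \n" % 1 +
    "restore_command = '" + pgsql_restore_cmd + "'\n"
)
# PostgreSQL reads recovery.conf at startup and replays archived WAL through
# restore_command during recovery.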
@@ -144,6 +144,9 @@ instance_create_groups.extend([instance_create_group.GROUP,
backup_groups = list(instance_create_groups)
backup_groups.extend([backup_group.GROUP])

incremental_backup_groups = list(instance_create_groups)
incremental_backup_groups.extend([backup_group.GROUP_INCREMENTAL])

configuration_groups = list(instance_create_groups)
configuration_groups.extend([configuration_group.GROUP])

@@ -178,6 +181,7 @@ common_groups.extend([guest_log_groups, module_groups])

# Register: Component based groups
register(["backup"], backup_groups)
register(["incremental_backup"], incremental_backup_groups)
register(["cluster"], cluster_actions_groups)
register(["configuration"], configuration_groups)
register(["database"], database_actions_groups)
@@ -203,7 +207,8 @@ register(["couchdb_supported"], common_groups, backup_groups,
         user_actions_groups, database_actions_groups, root_actions_groups)
register(["postgresql_supported"], common_groups,
         backup_groups, database_actions_groups, configuration_groups,
         root_actions_groups, user_actions_groups)
         root_actions_groups, user_actions_groups,
         incremental_backup_groups)
register(["mysql_supported", "percona_supported"], common_groups,
         backup_groups, configuration_groups, database_actions_groups,
         replication_groups, root_actions_groups, user_actions_groups)
@@ -25,6 +25,10 @@ GROUP_BACKUP = "scenario.backup_group"
GROUP_BACKUP_LIST = "scenario.backup_list_group"
GROUP_RESTORE = "scenario.restore_group"

GROUP_INCREMENTAL = "scenario.incremental_backup_restore_group"
GROUP_INCREMENTAL_BACKUP = "scenario.incremental_backup_group"
GROUP_INCREMENTAL_RESTORE = "scenario.incremental_restore_group"


class BackupRunnerFactory(test_runners.RunnerFactory):

@@ -32,6 +36,12 @@ class BackupRunnerFactory(test_runners.RunnerFactory):
    _runner_cls = 'BackupRunner'


class IncrementalBackupRunnerFactory(test_runners.RunnerFactory):

    _runner_ns = 'backup_runners'
    _runner_cls = 'IncrementalBackupRunner'


@test(depends_on_groups=[instance_create_group.GROUP], groups=[GROUP])
class BackupGroup(TestGroup):
    """Test Backup and Restore functionality."""
@@ -194,3 +204,43 @@ class BackupGroup(TestGroup):
    def check_for_incremental_backup(self):
        """Test that backup children are deleted."""
        self.test_runner.run_check_for_incremental_backup()


@test(depends_on_groups=[instance_create_group.GROUP],
      groups=[GROUP_INCREMENTAL])
class IncrementalBackupGroup(TestGroup):
    """Test Incremental Backup and Restore functionality."""

    def __init__(self):
        super(IncrementalBackupGroup, self).__init__(
            IncrementalBackupRunnerFactory.instance())

    @test(groups=[GROUP_INCREMENTAL_BACKUP])
    def backup_run_single_incremental(self):
        """Run a full and a single incremental backup"""
        self.test_runner.run_backup_incremental(increments=1)

    @test(groups=[GROUP_INCREMENTAL_BACKUP],
          depends_on=[backup_run_single_incremental])
    def restore_from_incremental(self):
        """Launch a restore from a single incremental backup"""
        self.test_runner.run_restore_from_incremental(increment=1)

    @test(groups=[GROUP_INCREMENTAL_RESTORE],
          depends_on=[restore_from_incremental])
    def restore_from_backup_completed(self):
        """Wait until restoring an instance from an incr. backup completes."""
        self.test_runner.run_restore_from_backup_completed()

    @test(groups=[GROUP_INCREMENTAL_RESTORE],
          depends_on=[restore_from_backup_completed])
    def verify_data_in_restored_instance(self):
        """Verify data in restored instance."""
        self.test_runner.run_verify_data_in_restored_instance()

    @test(groups=[GROUP_INCREMENTAL_RESTORE],
          depends_on=[restore_from_backup_completed],
          runs_after=[verify_data_in_restored_instance])
    def delete_restored_instance(self):
        """Test deleting the restored instance."""
        self.test_runner.run_delete_restored_instance()
@@ -52,7 +52,7 @@ class PostgresqlHelper(SqlHelper):

    def get_invalid_groups(self):
        return [{'timezone': 997},
                {"max_worker_processes": 'string_value'},
                {"vacuum_cost_delay": 'string_value'},
                {"standard_conforming_strings": 'string_value'}]

    def get_exposed_user_log_names(self):
@@ -23,7 +23,90 @@ from trove.tests.scenario.helpers.test_helper import DataType
from trove.tests.scenario.runners.test_runners import TestRunner


class BackupRunner(TestRunner):
class BackupRunnerMixin(TestRunner):
    def _verify_backup(self, backup_id):
        def _result_is_active():
            backup = self.auth_client.backups.get(backup_id)
            if backup.status == 'COMPLETED':
                return True
            else:
                self.assert_not_equal('FAILED', backup.status,
                                      'Backup status should not be')
                return False

        poll_until(_result_is_active, time_out=self.TIMEOUT_BACKUP_CREATE)

    def _wait_until_backup_is_gone(self, backup_id):
        def _backup_is_gone():
            try:
                self.auth_client.backups.get(backup_id)
                return False
            except exceptions.NotFound:
                return True

        poll_until(_backup_is_gone,
                   time_out=self.TIMEOUT_BACKUP_DELETE)

    def assert_restore_from_backup(self, backup_ref):
        result = self._restore_from_backup(backup_ref)
        # TODO(peterstac) - This should probably return code 202
        self.assert_client_code(200)
        self.assert_equal('BUILD', result.status,
                          'Unexpected instance status')
        self.restore_instance_id = result.id

    def _restore_from_backup(self, backup_ref):
        restore_point = {'backupRef': backup_ref}
        result = self.auth_client.instances.create(
            self.instance_info.name + '_restore',
            self.instance_info.dbaas_flavor_href,
            self.instance_info.volume,
            nics=self.instance_info.nics,
            restorePoint=restore_point,
            datastore=self.instance_info.dbaas_datastore,
            datastore_version=self.instance_info.dbaas_datastore_version)
        return result

    def run_restore_from_backup_completed(
            self, expected_states=['BUILD', 'ACTIVE'],
            # TODO(peterstac) - This should probably return code 202
            expected_http_code=200):
        self.assert_restore_from_backup_completed(
            self.restore_instance_id, expected_states, expected_http_code)
        self.restore_host = self.get_instance_host(self.restore_instance_id)

    def assert_restore_from_backup_completed(
            self, instance_id, expected_states, expected_http_code):
        self.assert_instance_action(instance_id, expected_states,
                                    expected_http_code)

    def run_verify_data_in_restored_instance(self):
        self.assert_verify_backup_data(self.restore_host)

    def run_verify_data_for_backup(self):
        self.assert_verify_backup_data(self.backup_host)

    def assert_verify_backup_data(self, host):
        """In order for this to work, the corresponding datastore
        'helper' class should implement the 'verify_large_data' method.
        """
        self.test_helper.verify_data(DataType.large, host)

    def run_delete_restored_instance(
            self, expected_states=['SHUTDOWN'],
            expected_http_code=202):
        self.assert_delete_restored_instance(
            self.restore_instance_id, expected_states, expected_http_code)

    def assert_delete_restored_instance(
            self, instance_id, expected_states, expected_http_code):
        self.auth_client.instances.delete(instance_id)
        self.assert_instance_action(instance_id, expected_states,
                                    expected_http_code)
        self.assert_all_gone(instance_id, expected_states[-1])


class BackupRunner(BackupRunnerMixin):

    def __init__(self):
        self.TIMEOUT_BACKUP_CREATE = 60 * 30
@@ -44,6 +127,7 @@ class BackupRunner(TestRunner):
        self.incremental_backup_info = None
        self.restore_instance_id = 0
        self.restore_host = None
        self.other_client = None

    def run_backup_create_instance_invalid(
            self, expected_exception=exceptions.BadRequest,
|
||||
"""
|
||||
self.test_helper.add_data(DataType.large, host)
|
||||
|
||||
def run_verify_data_for_backup(self):
|
||||
self.assert_verify_backup_data(self.backup_host)
|
||||
|
||||
def assert_verify_backup_data(self, host):
|
||||
"""In order for this to work, the corresponding datastore
|
||||
'helper' class should implement the 'verify_large_data' method.
|
||||
"""
|
||||
self.test_helper.verify_data(DataType.large, host)
|
||||
|
||||
def run_backup_create(self):
|
||||
self.assert_backup_create()
|
||||
|
||||
@ -242,55 +317,6 @@ class BackupRunner(TestRunner):
|
||||
def run_restore_from_backup(self):
|
||||
self.assert_restore_from_backup(self.backup_info.id)
|
||||
|
||||
def assert_restore_from_backup(self, backup_ref):
|
||||
result = self._restore_from_backup(backup_ref)
|
||||
# TODO(peterstac) - This should probably return code 202
|
||||
self.assert_client_code(200)
|
||||
self.assert_equal('BUILD', result.status,
|
||||
'Unexpected instance status')
|
||||
self.restore_instance_id = result.id
|
||||
|
||||
def _restore_from_backup(self, backup_ref):
|
||||
restore_point = {'backupRef': backup_ref}
|
||||
result = self.auth_client.instances.create(
|
||||
self.instance_info.name + '_restore',
|
||||
self.instance_info.dbaas_flavor_href,
|
||||
self.instance_info.volume,
|
||||
nics=self.instance_info.nics,
|
||||
restorePoint=restore_point,
|
||||
datastore=self.instance_info.dbaas_datastore,
|
||||
datastore_version=self.instance_info.dbaas_datastore_version)
|
||||
return result
|
||||
|
||||
def run_restore_from_backup_completed(
|
||||
self, expected_states=['BUILD', 'ACTIVE'],
|
||||
# TODO(peterstac) - This should probably return code 202
|
||||
expected_http_code=200):
|
||||
self.assert_restore_from_backup_completed(
|
||||
self.restore_instance_id, expected_states, expected_http_code)
|
||||
self.restore_host = self.get_instance_host(self.restore_instance_id)
|
||||
|
||||
def assert_restore_from_backup_completed(
|
||||
self, instance_id, expected_states, expected_http_code):
|
||||
self.assert_instance_action(instance_id, expected_states,
|
||||
expected_http_code)
|
||||
|
||||
def run_verify_data_in_restored_instance(self):
|
||||
self.assert_verify_backup_data(self.restore_host)
|
||||
|
||||
def run_delete_restored_instance(
|
||||
self, expected_states=['SHUTDOWN'],
|
||||
expected_http_code=202):
|
||||
self.assert_delete_restored_instance(
|
||||
self.restore_instance_id, expected_states, expected_http_code)
|
||||
|
||||
def assert_delete_restored_instance(
|
||||
self, instance_id, expected_states, expected_http_code):
|
||||
self.auth_client.instances.delete(instance_id)
|
||||
self.assert_instance_action(instance_id, expected_states,
|
||||
expected_http_code)
|
||||
self.assert_all_gone(instance_id, expected_states[-1])
|
||||
|
||||
def run_delete_unknown_backup(
|
||||
self, expected_exception=exceptions.NotFound,
|
||||
expected_http_code=404):
|
||||
@ -319,17 +345,6 @@ class BackupRunner(TestRunner):
|
||||
self.assert_client_code(expected_http_code)
|
||||
self._wait_until_backup_is_gone(backup_id)
|
||||
|
||||
def _wait_until_backup_is_gone(self, backup_id):
|
||||
def _backup_is_gone():
|
||||
try:
|
||||
self.auth_client.backups.get(backup_id)
|
||||
return False
|
||||
except exceptions.NotFound:
|
||||
return True
|
||||
|
||||
poll_until(_backup_is_gone,
|
||||
time_out=self.TIMEOUT_BACKUP_DELETE)
|
||||
|
||||
def run_check_for_incremental_backup(
|
||||
self, expected_exception=exceptions.NotFound,
|
||||
expected_http_code=404):
|
||||
@ -339,3 +354,59 @@ class BackupRunner(TestRunner):
|
||||
expected_exception, expected_http_code,
|
||||
self.auth_client.backups.get,
|
||||
self.incremental_backup_info.id)
|
||||
|
||||
|
||||
class IncrementalBackupRunner(BackupRunnerMixin):
|
||||
|
||||
def __init__(self):
|
||||
self.TIMEOUT_BACKUP_CREATE = 60 * 30
|
||||
self.TIMEOUT_BACKUP_DELETE = 120
|
||||
|
||||
super(IncrementalBackupRunner,
|
||||
self).__init__(sleep_time=20, timeout=self.TIMEOUT_BACKUP_CREATE)
|
||||
|
||||
self.BACKUP_NAME = 'backup_test'
|
||||
self.BACKUP_DESC = 'test description'
|
||||
|
||||
self.INCREMENTAL_BACKUP_NAME = 'incremental_backup_test'
|
||||
self.INCREMENTAL_BACKUP_DESC = 'incremental test description'
|
||||
|
||||
self.backup_host = None
|
||||
self.backup_info = None
|
||||
self.backup_count_prior_to_create = 0
|
||||
self.backup_count_for_instance_prior_to_create = 0
|
||||
|
||||
self.incremental_backup_info = None
|
||||
self.restore_instance_id = 0
|
||||
self.restore_host = None
|
||||
self.other_client = None
|
||||
|
||||
def run_backup_incremental(self, increments=1):
|
||||
full_backup = self.auth_client.backups.create(
|
||||
name=self.BACKUP_NAME, instance=self.instance_info.id,
|
||||
description=self.BACKUP_DESC)
|
||||
self.backup_info = full_backup
|
||||
print("Verifying backup " + full_backup.id)
|
||||
self._verify_backup(full_backup.id)
|
||||
incr_id = full_backup.id
|
||||
# TODO(atomic77) Modify add/verify data helpers to enable multiple
|
||||
# calls to add that would be required to properly test multiple
|
||||
# incremental backups
|
||||
self.test_helper.add_data(DataType.large, self.get_instance_host())
|
||||
for i in range(0, increments):
|
||||
iresult = self.auth_client.backups.create(
|
||||
name=self.INCREMENTAL_BACKUP_NAME,
|
||||
instance=self.instance_info.id,
|
||||
description=self.INCREMENTAL_BACKUP_DESC,
|
||||
parent_id=incr_id)
|
||||
print("Verifying backup " + iresult.id)
|
||||
self._verify_backup(iresult.id)
|
||||
incr_id = iresult.id
|
||||
self.incremental_backup_info = iresult
|
||||
|
||||
def run_restore_from_incremental(self, increment=1):
|
||||
result = self._restore_from_backup(self.incremental_backup_info.id)
|
||||
self.assert_client_code(200)
|
||||
self.assert_equal('BUILD', result.status,
|
||||
'Unexpected instance status')
|
||||
self.restore_instance_id = result.id
|
||||
|
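The parent/child chaining this runner exercises maps directly onto the backups API; a minimal sketch with python-troveclient (client setup and instance_id are assumed, not shown in the commit):

# Illustrative only: chain an incremental backup to its parent via parent_id,
# mirroring what run_backup_incremental() does above.
full = client.backups.create(name='backup_test', instance=instance_id,
                             description='test description')
incr = client.backups.create(name='incremental_backup_test',
                             instance=instance_id,
                             description='incremental test description',
                             parent_id=full.id)
# Restoring from 'incr' replays the full base backup plus the archived WAL.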
@@ -11,7 +11,9 @@
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.

import mock
import os
from mock import ANY, DEFAULT, Mock, patch, PropertyMock
from testtools.testcase import ExpectedException
from trove.common import exception
@@ -23,6 +25,8 @@ from trove.guestagent.datastore.experimental.cassandra import (
    service as cass_service
)
from trove.guestagent.strategies.backup import base as backupBase
from trove.guestagent.strategies.backup.experimental.postgresql_impl \
    import PgBaseBackupUtil
from trove.guestagent.strategies.backup.mysql_impl import MySqlApp
from trove.guestagent.strategies.restore import base as restoreBase
from trove.guestagent.strategies.restore.mysql_impl import MySQLRestoreMixin
@@ -890,6 +894,56 @@ class RedisRestoreTests(trove_testtools.TestCase):
                          self.restore_runner.restore)


class PostgresqlBackupTests(trove_testtools.TestCase):

    def setUp(self):
        super(PostgresqlBackupTests, self).setUp()
        self.bkutil = PgBaseBackupUtil()

        self.b1 = ['000000010000000000000003',
                   '000000010000000000000004',
                   '000000010000000000000005',
                   '000000010000000000000006',
                   '000000010000000000000006.00000168.backup']

        self.b2 = ['000000010000000000000007',
                   '000000010000000000000008',
                   '000000010000000000000009',
                   '000000010000000000000010',
                   '000000010000000000000009.0008A168.backup']

    def tearDown(self):
        super(PostgresqlBackupTests, self).tearDown()

    def test_check_most_recent_backup(self):

        with patch.object(os, 'listdir', return_value=self.b1):
            mrb = self.bkutil.most_recent_backup_file()
            self.assertEqual(mrb, self.b1[4])
            mrbfile = self.bkutil.most_recent_backup_wal()
            self.assertEqual(mrbfile, self.b1[3])

        with patch.object(os, 'listdir', return_value=self.b1 + self.b2):
            mrb = self.bkutil.most_recent_backup_file()
            self.assertEqual(mrb, self.b2[4])
            mrbfile = self.bkutil.most_recent_backup_wal()
            self.assertEqual(mrbfile, self.b2[2])

    def test_check_most_recent_wal_list(self):

        with patch.object(os, 'listdir', return_value=self.b1):
            logs = self.bkutil.log_files_since_last_backup()
            self.assertEqual(logs, [self.b1[3]])

        with patch.object(os, 'listdir', return_value=self.b2):
            logs = self.bkutil.log_files_since_last_backup()
            self.assertEqual(logs, [self.b2[2], self.b2[3]])

        with patch.object(os, 'listdir', return_value=self.b1 + self.b2):
            logs = self.bkutil.log_files_since_last_backup()
            self.assertEqual(logs, [self.b2[2], self.b2[3]])


class DB2BackupTests(trove_testtools.TestCase):

    def setUp(self):