relinker: Improve performance by limiting I/O
This commit reduces the number of I/O operations done by the swift-object-relinker.

First, it saves the progress state of the relink and cleanup steps, so that if the process is interrupted it can resume without rescanning every partition.

Second, it prevents relink and cleanup from scanning partitions at or above 2^part_power (i.e. (2^next_part_power)/2). These partitions did not exist before the partition power increase began, so there is nothing to relink or clean up in them.

Third, it scans partitions in reverse order so that useless work is avoided. If a device contains partitions 1 and 3, relinking partition 1 creates "new" objects in partition 3, which would then be scanned again once the relinker reaches partition 3. If partition 3 is done first, it only contains the objects that actually need to be relinked.

Fourth, it allows a single device to be specified to work on. To support all of the above, hooks were added to audit_location_generator so that custom code can be executed before/after iterating a device/partition/suffix/hash.

Change-Id: If1bf8ed9036fb0ec619b0d4f16061a81a1af2082
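[Editor's note] As background for the second point: Swift derives an object's partition from the first four bytes of its name hash, shifted by (32 - part_power), so increasing the part power by one maps partition p to 2p or 2p+1. A minimal standalone sketch of that arithmetic (the hash value is an arbitrary example, not from the commit):

    import binascii
    import struct

    PART_POWER = 8

    def partition(hex_hash, part_power):
        # Same arithmetic as the tests below: take the first 4 bytes of the
        # hash, big-endian, and keep the top `part_power` bits.
        prefix = struct.unpack_from('>I', binascii.unhexlify(hex_hash))[0]
        return prefix >> (32 - part_power)

    hex_hash = '00b0b1c2d3e4f5a6b7c8d9e0f1a2b3c4'  # arbitrary 128-bit hash
    part = partition(hex_hash, PART_POWER)
    next_part = partition(hex_hash, PART_POWER + 1)
    assert next_part in (2 * part, 2 * part + 1)
    # Every pre-existing partition is < 2 ** PART_POWER, which equals
    # (2 ** (PART_POWER + 1)) / 2; anything at or above that threshold can
    # only have been created by the relink step itself, so it is skipped.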
@@ -28,6 +28,8 @@ if __name__ == '__main__':
                         dest='swift_dir', help='Path to swift directory')
     parser.add_argument('--devices', default='/srv/node',
                         dest='devices', help='Path to swift device directory')
+    parser.add_argument('--device', default=None, dest='device',
+                        help='Device name to relink (default: all)')
     parser.add_argument('--skip-mount-check', default=False,
                         help='Don\'t test if disk is mounted',
                         action="store_true", dest='skip_mount_check')
@@ -14,8 +14,12 @@
 # limitations under the License.

+import errno
+import fcntl
+import json
 import logging
 import os

+from functools import partial
 from swift.common.storage_policy import POLICIES
 from swift.common.exceptions import DiskFileDeleted, DiskFileNotExist, \
     DiskFileQuarantined
@@ -24,10 +28,126 @@ from swift.common.utils import replace_partition_in_path, \
 from swift.obj import diskfile


+LOCK_FILE = '.relink.{datadir}.lock'
+STATE_FILE = 'relink.{datadir}.json'
+STATE_TMP_FILE = '.relink.{datadir}.json.tmp'
+STEP_RELINK = 'relink'
+STEP_CLEANUP = 'cleanup'
+
+
+def devices_filter(device, _, devices):
+    if device:
+        devices = [d for d in devices if d == device]
+
+    return set(devices)
+
+
+def hook_pre_device(locks, states, datadir, device_path):
+    lock_file = os.path.join(device_path, LOCK_FILE.format(datadir=datadir))
+
+    fd = os.open(lock_file, os.O_CREAT | os.O_WRONLY)
+    fcntl.flock(fd, fcntl.LOCK_EX)
+    locks[0] = fd
+
+    state_file = os.path.join(device_path, STATE_FILE.format(datadir=datadir))
+    states.clear()
+    try:
+        with open(state_file, 'rt') as f:
+            tmp = json.load(f)
+            states.update(tmp)
+    except ValueError:
+        # Invalid JSON: remove the file to restart from scratch
+        os.unlink(state_file)
+    except IOError as err:
+        # Ignore file not found error
+        if err.errno != errno.ENOENT:
+            raise
+
+
+def hook_post_device(locks, _):
+    os.close(locks[0])
+    locks[0] = None
+
+
+def partitions_filter(states, step, part_power, next_part_power,
+                      datadir_path, partitions):
+    # Remove all non partitions first (eg: auditor_status_ALL.json)
+    partitions = [p for p in partitions if p.isdigit()]
+
+    if not (step == STEP_CLEANUP and part_power == next_part_power):
+        # This is not a cleanup after cancel: partitions in the upper half
+        # are new partitions, there is nothing to relink/cleanup from there
+        partitions = [p for p in partitions
+                      if int(p) < 2 ** next_part_power / 2]
+
+    # Format: { 'part': [relinked, cleaned] }
+    if states:
+        missing = list(set(partitions) - set(states.keys()))
+        if missing:
+            # All missing partitions were created after the first run of
+            # the relinker, so after the new ring was distributed: they are
+            # already hardlinked into both partitions, but they will still
+            # need to be cleaned up. Just update the state file.
+            for part in missing:
+                states[part] = [True, False]
+        if step == STEP_RELINK:
+            partitions = [str(p) for p, (r, c) in states.items() if not r]
+        elif step == STEP_CLEANUP:
+            partitions = [str(p) for p, (r, c) in states.items() if not c]
+    else:
+        states.update({str(p): [False, False] for p in partitions})
+
+    # Always scan the partitions in reverse order to minimize the amount of
+    # IO (it actually only matters for relink, not for cleanup).
+    #
+    # Initial situation:
+    #  objects/0/000/00000000000000000000000000000000/12345.data
+    #  -> relinked to
+    #  objects/1/000/10000000000000000000000000000000/12345.data
+    #
+    # If the relinker then scans partition 1, it will listdir that object
+    # unnecessarily. By working in reverse partition order, this is avoided.
+    partitions = sorted(partitions, key=lambda x: int(x), reverse=True)
+
+    return partitions
+
+
+# Save states when a partition is done
+def hook_post_partition(states, step, partition_path):
+    part = os.path.basename(os.path.abspath(partition_path))
+    datadir_path = os.path.dirname(os.path.abspath(partition_path))
+    device_path = os.path.dirname(os.path.abspath(datadir_path))
+    datadir_name = os.path.basename(os.path.abspath(datadir_path))
+    state_tmp_file = os.path.join(device_path,
+                                  STATE_TMP_FILE.format(datadir=datadir_name))
+    state_file = os.path.join(device_path,
+                              STATE_FILE.format(datadir=datadir_name))
+
+    if step == STEP_RELINK:
+        states[part][0] = True
+    elif step == STEP_CLEANUP:
+        states[part][1] = True
+    with open(state_tmp_file, 'wt') as f:
+        json.dump(states, f)
+        os.fsync(f.fileno())
+    os.rename(state_tmp_file, state_file)
+
+
+def hashes_filter(next_part_power, suff_path, hashes):
+    hashes = list(hashes)
+    # Iterate over a copy: removing from the list being looped on would
+    # skip elements.
+    for hsh in list(hashes):
+        fname = os.path.join(suff_path, hsh, 'fake-file-name')
+        if replace_partition_in_path(fname, next_part_power) == fname:
+            hashes.remove(hsh)
+    return hashes
+
+
 def relink(swift_dir='/etc/swift',
            devices='/srv/node',
            skip_mount_check=False,
-           logger=logging.getLogger()):
+           logger=logging.getLogger(),
+           device=None):
     mount_check = not skip_mount_check
     run = False
     relinked = errors = 0
@@ -41,10 +161,31 @@ def relink(swift_dir='/etc/swift',
         logging.info('Relinking files for policy %s under %s',
                      policy.name, devices)
         run = True
+        datadir = diskfile.get_data_dir(policy)
+
+        locks = [None]
+        states = {}
+        relink_devices_filter = partial(devices_filter, device)
+        relink_hook_pre_device = partial(hook_pre_device, locks, states,
+                                         datadir)
+        relink_hook_post_device = partial(hook_post_device, locks)
+        relink_partition_filter = partial(partitions_filter,
+                                          states, STEP_RELINK,
+                                          part_power, next_part_power)
+        relink_hook_post_partition = partial(hook_post_partition,
+                                             states, STEP_RELINK)
+        relink_hashes_filter = partial(hashes_filter, next_part_power)
+
         locations = audit_location_generator(
             devices,
-            diskfile.get_data_dir(policy),
-            mount_check=mount_check)
+            datadir,
+            mount_check=mount_check,
+            devices_filter=relink_devices_filter,
+            hook_pre_device=relink_hook_pre_device,
+            hook_post_device=relink_hook_post_device,
+            partitions_filter=relink_partition_filter,
+            hook_post_partition=relink_hook_post_partition,
+            hashes_filter=relink_hashes_filter)
         for fname, _, _ in locations:
             newfname = replace_partition_in_path(fname, next_part_power)
             try:
@@ -67,7 +208,8 @@ def relink(swift_dir='/etc/swift',
 def cleanup(swift_dir='/etc/swift',
             devices='/srv/node',
             skip_mount_check=False,
-            logger=logging.getLogger()):
+            logger=logging.getLogger(),
+            device=None):
     mount_check = not skip_mount_check
     conf = {'devices': devices, 'mount_check': mount_check}
     diskfile_router = diskfile.DiskFileRouter(conf, get_logger(conf))
@@ -83,10 +225,31 @@ def cleanup(swift_dir='/etc/swift',
         logging.info('Cleaning up files for policy %s under %s',
                      policy.name, devices)
         run = True
+        datadir = diskfile.get_data_dir(policy)
+
+        locks = [None]
+        states = {}
+        cleanup_devices_filter = partial(devices_filter, device)
+        cleanup_hook_pre_device = partial(hook_pre_device, locks, states,
+                                          datadir)
+        cleanup_hook_post_device = partial(hook_post_device, locks)
+        cleanup_partition_filter = partial(partitions_filter,
+                                           states, STEP_CLEANUP,
+                                           part_power, next_part_power)
+        cleanup_hook_post_partition = partial(hook_post_partition,
+                                              states, STEP_CLEANUP)
+        cleanup_hashes_filter = partial(hashes_filter, next_part_power)
+
         locations = audit_location_generator(
             devices,
-            diskfile.get_data_dir(policy),
-            mount_check=mount_check)
+            datadir,
+            mount_check=mount_check,
+            devices_filter=cleanup_devices_filter,
+            hook_pre_device=cleanup_hook_pre_device,
+            hook_post_device=cleanup_hook_post_device,
+            partitions_filter=cleanup_partition_filter,
+            hook_post_partition=cleanup_hook_post_partition,
+            hashes_filter=cleanup_hashes_filter)
         for fname, device, partition in locations:
             expected_fname = replace_partition_in_path(fname, part_power)
             if fname == expected_fname:
@@ -152,8 +315,10 @@ def main(args):

     if args.action == 'relink':
         return relink(
-            args.swift_dir, args.devices, args.skip_mount_check, logger)
+            args.swift_dir, args.devices, args.skip_mount_check, logger,
+            device=args.device)

     if args.action == 'cleanup':
         return cleanup(
-            args.swift_dir, args.devices, args.skip_mount_check, logger)
+            args.swift_dir, args.devices, args.skip_mount_check, logger,
+            device=args.device)
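[Editor's note] A note on the state persistence above: hook_post_partition never rewrites the live state file in place. It writes a temp file, fsyncs it, then renames it over the real one, so an interrupted run finds either the previous state or the new one, never a torn file. A standalone sketch of the same pattern (the helper name and temp directory are illustrative):

    import json
    import os
    import tempfile

    def save_state(device_path, states):
        # Mirrors the relinker's STATE_TMP_FILE/STATE_FILE dance for the
        # 'objects' datadir.
        tmp_path = os.path.join(device_path, '.relink.objects.json.tmp')
        with open(tmp_path, 'wt') as f:
            json.dump(states, f)
            os.fsync(f.fileno())
        # rename() is atomic on POSIX: readers see the old file or the new
        # one, never a partial write.
        os.rename(tmp_path, os.path.join(device_path, 'relink.objects.json'))

    device_path = tempfile.mkdtemp()  # stand-in for /srv/node/sda1
    # Partition 96 relinked but not yet cleaned, 227 untouched:
    save_state(device_path, {'96': [True, False], '227': [False, False]})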
@@ -3147,11 +3147,26 @@ def remove_directory(path):


 def audit_location_generator(devices, datadir, suffix='',
-                             mount_check=True, logger=None):
+                             mount_check=True, logger=None,
+                             devices_filter=None, partitions_filter=None,
+                             suffixes_filter=None, hashes_filter=None,
+                             hook_pre_device=None, hook_post_device=None,
+                             hook_pre_partition=None, hook_post_partition=None,
+                             hook_pre_suffix=None, hook_post_suffix=None,
+                             hook_pre_hash=None, hook_post_hash=None):
     """
     Given a devices path and a data directory, yield (path, device,
     partition) for all files in that directory

+    (devices|partitions|suffixes|hashes)_filter are meant to modify the list
+    of elements that will be iterated, e.g. to exclude some elements based
+    on a custom condition defined by the caller.
+
+    hook_pre_(device|partition|suffix|hash) are called before yielding the
+    element, hook_post_(device|partition|suffix|hash) are called after the
+    element was yielded. They are meant to do some pre/post processing,
+    e.g. saving a progress status.
+
     :param devices: parent directory of the devices to be audited
     :param datadir: a directory located under self.devices. This should be
                     one of the DATADIR constants defined in the account,
@@ -3160,11 +3175,31 @@ def audit_location_generator(devices, datadir, suffix='',
     :param mount_check: Flag to check if a mount check should be performed
                        on devices
     :param logger: a logger object
+    :param devices_filter: a callable taking (devices, [list of devices]) as
+                           parameters and returning a [list of devices]
+    :param partitions_filter: a callable taking (datadir_path, [list of
+                              parts]) and returning a [list of parts]
+    :param suffixes_filter: a callable taking (part_path, [list of suffixes])
+                            and returning a [list of suffixes]
+    :param hashes_filter: a callable taking (suff_path, [list of hashes]) and
+                          returning a [list of hashes]
+    :param hook_pre_device: a callable taking device_path as parameter
+    :param hook_post_device: a callable taking device_path as parameter
+    :param hook_pre_partition: a callable taking part_path as parameter
+    :param hook_post_partition: a callable taking part_path as parameter
+    :param hook_pre_suffix: a callable taking suff_path as parameter
+    :param hook_post_suffix: a callable taking suff_path as parameter
+    :param hook_pre_hash: a callable taking hash_path as parameter
+    :param hook_post_hash: a callable taking hash_path as parameter
     """
     device_dir = listdir(devices)
     # randomize devices in case of process restart before sweep completed
     shuffle(device_dir)
+    if devices_filter:
+        device_dir = devices_filter(devices, device_dir)
     for device in device_dir:
+        if hook_pre_device:
+            hook_pre_device(os.path.join(devices, device))
         if mount_check and not ismount(os.path.join(devices, device)):
             if logger:
                 logger.warning(
@@ -3178,24 +3213,36 @@ def audit_location_generator(devices, datadir, suffix='',
                 logger.warning(_('Skipping %(datadir)s because %(err)s'),
                                {'datadir': datadir_path, 'err': e})
             continue
+        if partitions_filter:
+            partitions = partitions_filter(datadir_path, partitions)
         for partition in partitions:
             part_path = os.path.join(datadir_path, partition)
+            if hook_pre_partition:
+                hook_pre_partition(part_path)
             try:
                 suffixes = listdir(part_path)
             except OSError as e:
                 if e.errno != errno.ENOTDIR:
                     raise
                 continue
+            if suffixes_filter:
+                suffixes = suffixes_filter(part_path, suffixes)
             for asuffix in suffixes:
                 suff_path = os.path.join(part_path, asuffix)
+                if hook_pre_suffix:
+                    hook_pre_suffix(suff_path)
                 try:
                     hashes = listdir(suff_path)
                 except OSError as e:
                     if e.errno != errno.ENOTDIR:
                         raise
                     continue
+                if hashes_filter:
+                    hashes = hashes_filter(suff_path, hashes)
                 for hsh in hashes:
                     hash_path = os.path.join(suff_path, hsh)
+                    if hook_pre_hash:
+                        hook_pre_hash(hash_path)
                     try:
                         files = sorted(listdir(hash_path), reverse=True)
                     except OSError as e:
@@ -3207,6 +3254,14 @@ def audit_location_generator(devices, datadir, suffix='',
                         continue
                         path = os.path.join(hash_path, fname)
                         yield path, device, partition
+                    if hook_post_hash:
+                        hook_post_hash(hash_path)
+                if hook_post_suffix:
+                    hook_post_suffix(suff_path)
+            if hook_post_partition:
+                hook_post_partition(part_path)
+        if hook_post_device:
+            hook_post_device(os.path.join(devices, device))


 def ratelimit_sleep(running_time, max_rate, incr_by=1, rate_buffer=5):
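[Editor's note] To make the extended generator concrete, a hedged usage sketch (the /srv/node layout and '.data' suffix are assumptions; the call shapes match the docstring above):

    from swift.common.utils import audit_location_generator

    def digits_only(datadir_path, partitions):
        # Same idea as the relinker's partitions_filter: keep only real
        # partition directories, dropping e.g. auditor_status files.
        return [p for p in partitions if p.isdigit()]

    def log_partition_done(part_path):
        print('finished %s' % part_path)

    locations = audit_location_generator(
        '/srv/node', 'objects', '.data', mount_check=False,
        partitions_filter=digits_only,
        hook_post_partition=log_partition_done)
    for path, device, partition in locations:
        pass  # each path is an object file; the hooks fire around the walk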
@@ -12,6 +12,9 @@
 # limitations under the License.

 import binascii
+import errno
+import fcntl
+import json
 import os
 import shutil
 import struct
@@ -30,6 +33,9 @@ from test.unit import FakeLogger, skip_if_no_xattrs, DEFAULT_TEST_EC_TYPE, \
     patch_policies


+PART_POWER = 8
+
+
 class TestRelinker(unittest.TestCase):
     def setUp(self):
         skip_if_no_xattrs()
@@ -40,7 +46,7 @@ class TestRelinker(unittest.TestCase):
         os.mkdir(self.testdir)
         os.mkdir(self.devices)

-        self.rb = ring.RingBuilder(8, 6.0, 1)
+        self.rb = ring.RingBuilder(PART_POWER, 6.0, 1)

         for i in range(6):
             ip = "127.0.0.%s" % i
@@ -55,10 +61,10 @@ class TestRelinker(unittest.TestCase):
         os.mkdir(self.objects)
         self._hash = utils.hash_path('a/c/o')
         digest = binascii.unhexlify(self._hash)
-        part = struct.unpack_from('>I', digest)[0] >> 24
+        self.part = struct.unpack_from('>I', digest)[0] >> 24
+        self.next_part = struct.unpack_from('>I', digest)[0] >> 23
         self.objdir = os.path.join(
-            self.objects, str(part), self._hash[-3:], self._hash)
+            self.objects, str(self.part), self._hash[-3:], self._hash)
         os.makedirs(self.objdir)
         self.object_fname = "1278553064.00000.data"
         self.objname = os.path.join(self.objdir, self.object_fname)
@@ -97,6 +103,27 @@ class TestRelinker(unittest.TestCase):
         stat_new = os.stat(self.expected_file)
         self.assertEqual(stat_old.st_ino, stat_new.st_ino)

+    def test_relink_device_filter(self):
+        self.rb.prepare_increase_partition_power()
+        self._save_ring()
+        relinker.relink(self.testdir, self.devices, True,
+                        device=self.existing_device)
+
+        self.assertTrue(os.path.isdir(self.expected_dir))
+        self.assertTrue(os.path.isfile(self.expected_file))
+
+        stat_old = os.stat(os.path.join(self.objdir, self.object_fname))
+        stat_new = os.stat(self.expected_file)
+        self.assertEqual(stat_old.st_ino, stat_new.st_ino)
+
+    def test_relink_device_filter_invalid(self):
+        self.rb.prepare_increase_partition_power()
+        self._save_ring()
+        relinker.relink(self.testdir, self.devices, True, device='none')
+
+        self.assertFalse(os.path.isdir(self.expected_dir))
+        self.assertFalse(os.path.isfile(self.expected_file))
+
     def _common_test_cleanup(self, relink=True):
         # Create a ring that has prev_part_power set
         self.rb.prepare_increase_partition_power()
@@ -121,6 +148,187 @@ class TestRelinker(unittest.TestCase):
         self.assertFalse(os.path.isfile(
             os.path.join(self.objdir, self.object_fname)))

+    def test_cleanup_device_filter(self):
+        self._common_test_cleanup()
+        self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True,
+                                             device=self.existing_device))
+
+        # Old objectname should be removed, new should still exist
+        self.assertTrue(os.path.isdir(self.expected_dir))
+        self.assertTrue(os.path.isfile(self.expected_file))
+        self.assertFalse(os.path.isfile(
+            os.path.join(self.objdir, self.object_fname)))
+
+    def test_cleanup_device_filter_invalid(self):
+        self._common_test_cleanup()
+        self.assertEqual(0, relinker.cleanup(self.testdir, self.devices, True,
+                                             device='none'))
+
+        # Old objectname should still exist, new should still exist
+        self.assertTrue(os.path.isdir(self.expected_dir))
+        self.assertTrue(os.path.isfile(self.expected_file))
+        self.assertTrue(os.path.isfile(
+            os.path.join(self.objdir, self.object_fname)))
+
+    def test_relink_cleanup(self):
+        state_file = os.path.join(self.devices, self.existing_device,
+                                  'relink.objects.json')
+
+        self.rb.prepare_increase_partition_power()
+        self._save_ring()
+        relinker.relink(self.testdir, self.devices, True)
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f), {str(self.part): [True, False]})
+
+        self.rb.increase_partition_power()
+        self.rb._ring = None  # Force builder to reload ring
+        self._save_ring()
+        relinker.cleanup(self.testdir, self.devices, True)
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f),
+                             {str(self.part): [True, True],
+                              str(self.next_part): [True, True]})
+
+    def test_devices_filter_filtering(self):
+        # With no filtering, returns all devices
+        devices = relinker.devices_filter(None, "", [self.existing_device])
+        self.assertEqual(set([self.existing_device]), devices)
+
+        # With a matching filter, returns what is matching
+        devices = relinker.devices_filter(self.existing_device, "",
+                                          [self.existing_device, 'sda2'])
+        self.assertEqual(set([self.existing_device]), devices)
+
+        # With a non matching filter, returns nothing
+        devices = relinker.devices_filter('none', "", [self.existing_device])
+        self.assertEqual(set(), devices)
+
+    def test_hook_pre_post_device_locking(self):
+        locks = [None]
+        device_path = os.path.join(self.devices, self.existing_device)
+        datadir = 'object'
+        lock_file = os.path.join(device_path, '.relink.%s.lock' % datadir)
+
+        # The first run gets the lock
+        relinker.hook_pre_device(locks, {}, datadir, device_path)
+        self.assertNotEqual([None], locks)
+
+        # A following run would block
+        with self.assertRaises(IOError) as raised:
+            with open(lock_file, 'a') as f:
+                fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+        self.assertEqual(errno.EAGAIN, raised.exception.errno)
+
+        # hook_post_device releases the lock and resets the locks list
+        relinker.hook_post_device(locks, "")
+        self.assertEqual([None], locks)
+
+        with open(lock_file, 'a') as f:
+            fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
+
+    def test_state_file(self):
+        device_path = os.path.join(self.devices, self.existing_device)
+        datadir = 'objects'
+        datadir_path = os.path.join(device_path, datadir)
+        state_file = os.path.join(device_path, 'relink.%s.json' % datadir)
+
+        def call_partition_filter(step, parts):
+            # Partition 312 will be ignored because it must have been
+            # created by the relinker
+            return relinker.partitions_filter(states, step,
+                                              PART_POWER, PART_POWER + 1,
+                                              datadir_path, parts)
+
+        # Start relinking
+        states = {}
+
+        # Load the states: as it starts, it must be empty
+        locks = [None]
+        relinker.hook_pre_device(locks, states, datadir, device_path)
+        self.assertEqual({}, states)
+        os.close(locks[0])  # Release the lock
+
+        # Partition 312 is ignored because it must have been created with
+        # the next_part_power, so it does not need to be relinked.
+        # 96 and 227 are reverse ordered.
+        # auditor_status.json is ignored because it's not a partition.
+        self.assertEqual(['227', '96'],
+                         call_partition_filter(relinker.STEP_RELINK,
+                                               ['96', '227', '312',
+                                                'auditor_status.json']))
+        self.assertEqual(states,
+                         {'96': [False, False], '227': [False, False]})
+
+        # Ack partition 96
+        relinker.hook_post_partition(states, relinker.STEP_RELINK,
+                                     os.path.join(datadir_path, '96'))
+        self.assertEqual(states, {'96': [True, False], '227': [False, False]})
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f), {'96': [True, False],
+                                            '227': [False, False]})
+
+        # Restart relinking after only part 96 was done
+        self.assertEqual(['227'],
+                         call_partition_filter(relinker.STEP_RELINK,
+                                               ['96', '227', '312']))
+        self.assertEqual(states, {'96': [True, False], '227': [False, False]})
+
+        # Ack partition 227
+        relinker.hook_post_partition(states, relinker.STEP_RELINK,
+                                     os.path.join(datadir_path, '227'))
+        self.assertEqual(states, {'96': [True, False], '227': [True, False]})
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f), {'96': [True, False],
+                                            '227': [True, False]})
+
+        # If the process restarts, it reloads the state
+        locks = [None]
+        states = {}
+        relinker.hook_pre_device(locks, states, datadir, device_path)
+        self.assertEqual(states, {'96': [True, False], '227': [True, False]})
+        os.close(locks[0])  # Release the lock
+
+        # Start cleanup
+        self.assertEqual(['227', '96'],
+                         call_partition_filter(relinker.STEP_CLEANUP,
+                                               ['96', '227', '312']))
+        # Ack partition 227
+        relinker.hook_post_partition(states, relinker.STEP_CLEANUP,
+                                     os.path.join(datadir_path, '227'))
+        self.assertEqual(states, {'96': [True, False], '227': [True, True]})
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f), {'96': [True, False],
+                                            '227': [True, True]})
+
+        # Restart cleanup after only part 227 was done
+        self.assertEqual(['96'],
+                         call_partition_filter(relinker.STEP_CLEANUP,
+                                               ['96', '227', '312']))
+        self.assertEqual(states, {'96': [True, False], '227': [True, True]})
+
+        # Ack partition 96
+        relinker.hook_post_partition(states, relinker.STEP_CLEANUP,
+                                     os.path.join(datadir_path, '96'))
+        self.assertEqual(states, {'96': [True, True], '227': [True, True]})
+        with open(state_file, 'rt') as f:
+            self.assertEqual(json.load(f), {'96': [True, True],
+                                            '227': [True, True]})
+
+        # At the end, the state is still accurate
+        locks = [None]
+        states = {}
+        relinker.hook_pre_device(locks, states, datadir, device_path)
+        self.assertEqual(states, {'96': [True, True], '227': [True, True]})
+        os.close(locks[0])  # Release the lock
+
+        # If the file gets corrupted, restart from scratch
+        with open(state_file, 'wt') as f:
+            f.write('NOT JSON')
+        locks = [None]
+        states = {}
+        relinker.hook_pre_device(locks, states, datadir, device_path)
+        self.assertEqual(states, {})
+        os.close(locks[0])  # Release the lock
+
     def test_cleanup_not_yet_relinked(self):
         self._common_test_cleanup(relink=False)
         self.assertEqual(1, relinker.cleanup(self.testdir, self.devices, True))
@@ -176,3 +384,7 @@ class TestRelinker(unittest.TestCase):

         self.assertIn('failed audit and was quarantined',
                       self.logger.get_lines_for_level('warning')[0])
+
+
+if __name__ == '__main__':
+    unittest.main()
@@ -6015,6 +6015,136 @@ class TestAuditLocationGenerator(unittest.TestCase):
         self.assertEqual(list(locations),
                          [(obj_path, "drive", "partition2")])

+    def test_hooks(self):
+        with temptree([]) as tmpdir:
+            logger = FakeLogger()
+            data = os.path.join(tmpdir, "drive", "data")
+            os.makedirs(data)
+            partition = os.path.join(data, "partition1")
+            os.makedirs(partition)
+            suffix = os.path.join(partition, "suffix1")
+            os.makedirs(suffix)
+            hash_path = os.path.join(suffix, "hash1")
+            os.makedirs(hash_path)
+            obj_path = os.path.join(hash_path, "obj1.dat")
+            with open(obj_path, "w"):
+                pass
+            meta_path = os.path.join(hash_path, "obj1.meta")
+            with open(meta_path, "w"):
+                pass
+            hook_pre_device = MagicMock()
+            hook_post_device = MagicMock()
+            hook_pre_partition = MagicMock()
+            hook_post_partition = MagicMock()
+            hook_pre_suffix = MagicMock()
+            hook_post_suffix = MagicMock()
+            hook_pre_hash = MagicMock()
+            hook_post_hash = MagicMock()
+            locations = utils.audit_location_generator(
+                tmpdir, "data", ".dat", mount_check=False, logger=logger,
+                hook_pre_device=hook_pre_device,
+                hook_post_device=hook_post_device,
+                hook_pre_partition=hook_pre_partition,
+                hook_post_partition=hook_post_partition,
+                hook_pre_suffix=hook_pre_suffix,
+                hook_post_suffix=hook_post_suffix,
+                hook_pre_hash=hook_pre_hash,
+                hook_post_hash=hook_post_hash
+            )
+            list(locations)
+            hook_pre_device.assert_called_once_with(os.path.join(tmpdir,
+                                                                 "drive"))
+            hook_post_device.assert_called_once_with(os.path.join(tmpdir,
+                                                                  "drive"))
+            hook_pre_partition.assert_called_once_with(partition)
+            hook_post_partition.assert_called_once_with(partition)
+            hook_pre_suffix.assert_called_once_with(suffix)
+            hook_post_suffix.assert_called_once_with(suffix)
+            hook_pre_hash.assert_called_once_with(hash_path)
+            hook_post_hash.assert_called_once_with(hash_path)
+
+    def test_filters(self):
+        with temptree([]) as tmpdir:
+            logger = FakeLogger()
+            data = os.path.join(tmpdir, "drive", "data")
+            os.makedirs(data)
+            partition = os.path.join(data, "partition1")
+            os.makedirs(partition)
+            suffix = os.path.join(partition, "suffix1")
+            os.makedirs(suffix)
+            hash_path = os.path.join(suffix, "hash1")
+            os.makedirs(hash_path)
+            obj_path = os.path.join(hash_path, "obj1.dat")
+            with open(obj_path, "w"):
+                pass
+            meta_path = os.path.join(hash_path, "obj1.meta")
+            with open(meta_path, "w"):
+                pass
+
+            def audit_location_generator(**kwargs):
+                return utils.audit_location_generator(
+                    tmpdir, "data", ".dat", mount_check=False, logger=logger,
+                    **kwargs)
+
+            # Patch os.listdir with the real implementation as side effect,
+            # so the tree is still walked while every call gets recorded.
+            with patch('os.listdir', side_effect=os.listdir) as m_listdir:
+                # devices_filter
+                m_listdir.reset_mock()
+                devices_filter = MagicMock(return_value=["drive"])
+                list(audit_location_generator(devices_filter=devices_filter))
+                devices_filter.assert_called_once_with(tmpdir, ["drive"])
+                self.assertIn(((data,),), m_listdir.call_args_list)
+
+                m_listdir.reset_mock()
+                devices_filter = MagicMock(return_value=[])
+                list(audit_location_generator(devices_filter=devices_filter))
+                devices_filter.assert_called_once_with(tmpdir, ["drive"])
+                self.assertNotIn(((data,),), m_listdir.call_args_list)
+
+                # partitions_filter
+                m_listdir.reset_mock()
+                partitions_filter = MagicMock(return_value=["partition1"])
+                list(audit_location_generator(
+                    partitions_filter=partitions_filter))
+                partitions_filter.assert_called_once_with(data,
+                                                          ["partition1"])
+                self.assertIn(((partition,),), m_listdir.call_args_list)
+
+                m_listdir.reset_mock()
+                partitions_filter = MagicMock(return_value=[])
+                list(audit_location_generator(
+                    partitions_filter=partitions_filter))
+                partitions_filter.assert_called_once_with(data,
+                                                          ["partition1"])
+                self.assertNotIn(((partition,),), m_listdir.call_args_list)
+
+                # suffixes_filter
+                m_listdir.reset_mock()
+                suffixes_filter = MagicMock(return_value=["suffix1"])
+                list(audit_location_generator(
+                    suffixes_filter=suffixes_filter))
+                suffixes_filter.assert_called_once_with(partition,
+                                                        ["suffix1"])
+                self.assertIn(((suffix,),), m_listdir.call_args_list)
+
+                m_listdir.reset_mock()
+                suffixes_filter = MagicMock(return_value=[])
+                list(audit_location_generator(
+                    suffixes_filter=suffixes_filter))
+                suffixes_filter.assert_called_once_with(partition,
+                                                        ["suffix1"])
+                self.assertNotIn(((suffix,),), m_listdir.call_args_list)
+
+                # hashes_filter
+                m_listdir.reset_mock()
+                hashes_filter = MagicMock(return_value=["hash1"])
+                list(audit_location_generator(hashes_filter=hashes_filter))
+                hashes_filter.assert_called_once_with(suffix, ["hash1"])
+                self.assertIn(((hash_path,),), m_listdir.call_args_list)
+
+                m_listdir.reset_mock()
+                hashes_filter = MagicMock(return_value=[])
+                list(audit_location_generator(hashes_filter=hashes_filter))
+                hashes_filter.assert_called_once_with(suffix, ["hash1"])
+                self.assertNotIn(((hash_path,),), m_listdir.call_args_list)
+

 class TestGreenAsyncPile(unittest.TestCase):