relinker: Move filters & hooks to be methods
Shouldn't be any functional changes, though. Change-Id: If906513ed47f203ca97ef08d4375b01c76f812bb
This commit is contained in:
parent
f8265c18bf
commit
7e278291f8
@ -20,7 +20,6 @@ import fcntl
|
||||
import json
|
||||
import logging
|
||||
import os
|
||||
from functools import partial
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common.utils import replace_partition_in_path, config_true_value, \
|
||||
audit_location_generator, get_logger, readconf, drop_privileges, \
|
||||
@ -46,181 +45,6 @@ def policy(policy_index):
|
||||
return value
|
||||
|
||||
|
||||
def devices_filter(device, _, devices):
|
||||
if device:
|
||||
devices = [d for d in devices if d == device]
|
||||
|
||||
return set(devices)
|
||||
|
||||
|
||||
def hook_pre_device(locks, states, datadir, device_path):
|
||||
lock_file = os.path.join(device_path, LOCK_FILE.format(datadir=datadir))
|
||||
|
||||
fd = os.open(lock_file, os.O_CREAT | os.O_WRONLY)
|
||||
fcntl.flock(fd, fcntl.LOCK_EX)
|
||||
locks[0] = fd
|
||||
|
||||
state_file = os.path.join(device_path, STATE_FILE.format(datadir=datadir))
|
||||
states["state"].clear()
|
||||
try:
|
||||
with open(state_file, 'rt') as f:
|
||||
state_from_disk = json.load(f)
|
||||
if state_from_disk["next_part_power"] != states["next_part_power"]:
|
||||
raise ValueError
|
||||
if state_from_disk["part_power"] != states["part_power"]:
|
||||
states["prev_part_power"] = state_from_disk["part_power"]
|
||||
raise ValueError
|
||||
states["state"].update(state_from_disk["state"])
|
||||
except (ValueError, TypeError, KeyError):
|
||||
# Bad state file: remove the file to restart from scratch
|
||||
os.unlink(state_file)
|
||||
except IOError as err:
|
||||
# Ignore file not found error
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
|
||||
def hook_post_device(locks, _):
|
||||
os.close(locks[0])
|
||||
locks[0] = None
|
||||
|
||||
|
||||
def partitions_filter(states, part_power, next_part_power,
|
||||
datadir_path, partitions):
|
||||
# Remove all non partitions first (eg: auditor_status_ALL.json)
|
||||
partitions = [p for p in partitions if p.isdigit()]
|
||||
|
||||
relinking = (part_power != next_part_power)
|
||||
if relinking:
|
||||
# All partitions in the upper half are new partitions and there is
|
||||
# nothing to relink there
|
||||
partitions = [part for part in partitions
|
||||
if int(part) < 2 ** part_power]
|
||||
elif "prev_part_power" in states:
|
||||
# All partitions in the upper half are new partitions and there is
|
||||
# nothing to clean up there
|
||||
partitions = [part for part in partitions
|
||||
if int(part) < 2 ** states["prev_part_power"]]
|
||||
|
||||
# Format: { 'part': processed }
|
||||
if states["state"]:
|
||||
missing = list(set(partitions) - set(states["state"].keys()))
|
||||
if missing:
|
||||
# All missing partitions were created after the first run of the
|
||||
# relinker with this part_power/next_part_power pair. This is
|
||||
# expected when relinking, where new partitions appear that are
|
||||
# appropriate for the target part power. In such cases, there's
|
||||
# nothing to be done. Err on the side of caution during cleanup,
|
||||
# however.
|
||||
for part in missing:
|
||||
states["state"][part] = relinking
|
||||
partitions = [str(part) for part, processed in states["state"].items()
|
||||
if not processed]
|
||||
else:
|
||||
states["state"].update({str(part): False for part in partitions})
|
||||
|
||||
# Always scan the partitions in reverse order to minimize the amount of IO
|
||||
# (it actually only matters for relink, not for cleanup).
|
||||
#
|
||||
# Initial situation:
|
||||
# objects/0/000/00000000000000000000000000000000/12345.data
|
||||
# -> relinked to objects/1/000/10000000000000000000000000000000/12345.data
|
||||
#
|
||||
# If the relinker then scan partition 1, it will listdir that object while
|
||||
# it's unnecessary. By working in reverse order of partitions, this is
|
||||
# avoided.
|
||||
partitions = sorted(partitions, key=lambda part: int(part), reverse=True)
|
||||
|
||||
return partitions
|
||||
|
||||
|
||||
# Save states when a partition is done
|
||||
def hook_post_partition(logger, states, step, policy, diskfile_manager,
|
||||
partition_path):
|
||||
datadir_path, part = os.path.split(os.path.abspath(partition_path))
|
||||
device_path, datadir_name = os.path.split(datadir_path)
|
||||
device = os.path.basename(device_path)
|
||||
state_tmp_file = os.path.join(device_path,
|
||||
STATE_TMP_FILE.format(datadir=datadir_name))
|
||||
state_file = os.path.join(device_path,
|
||||
STATE_FILE.format(datadir=datadir_name))
|
||||
|
||||
# We started with a partition space like
|
||||
# |0 N|
|
||||
# |ABCDEFGHIJKLMNOP|
|
||||
#
|
||||
# After relinking, it will be more like
|
||||
# |0 2N|
|
||||
# |AABBCCDDEEFFGGHHIIJJKKLLMMNNOOPP|
|
||||
#
|
||||
# We want to hold off on rehashing until after cleanup, since that is the
|
||||
# point at which we've finished with filesystem manipulations. But there's
|
||||
# a slight complication: we know the upper half has nothing to clean up,
|
||||
# so the cleanup phase only looks at
|
||||
# |0 2N|
|
||||
# |AABBCCDDEEFFGGHH |
|
||||
#
|
||||
# To ensure that the upper half gets rehashed, too, do it as part of
|
||||
# relinking; as we finish
|
||||
# |0 N|
|
||||
# | IJKLMNOP|
|
||||
# shift to the new partition space and rehash
|
||||
# |0 2N|
|
||||
# | IIJJKKLLMMNNOOPP|
|
||||
partition = int(part)
|
||||
if step == STEP_RELINK and partition >= 2 ** (states['part_power'] - 1):
|
||||
for new_part in (2 * partition, 2 * partition + 1):
|
||||
diskfile_manager.get_hashes(device, new_part, [], policy)
|
||||
elif step == STEP_CLEANUP:
|
||||
hashes = diskfile_manager.get_hashes(device, partition, [], policy)
|
||||
# In any reasonably-large cluster, we'd expect all old partitions P
|
||||
# to be empty after cleanup (i.e., it's unlikely that there's another
|
||||
# partition Q := P//2 that also has data on this device).
|
||||
#
|
||||
# Try to clean up empty partitions now, so operators can use existing
|
||||
# rebalance-complete metrics to monitor relinking progress (provided
|
||||
# there are few/no handoffs when relinking starts and little data is
|
||||
# written to handoffs during the increase).
|
||||
if not hashes:
|
||||
with lock_path(partition_path):
|
||||
# Same lock used by invalidate_hashes, consolidate_hashes,
|
||||
# get_hashes
|
||||
try:
|
||||
os.unlink(os.path.join(partition_path, 'hashes.pkl'))
|
||||
os.unlink(os.path.join(partition_path, 'hashes.invalid'))
|
||||
os.unlink(os.path.join(partition_path, '.lock'))
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
os.rmdir(partition_path)
|
||||
except OSError:
|
||||
# Most likely, some data landed in here or we hit an error
|
||||
# above. Let the replicator deal with things; it was worth
|
||||
# a shot.
|
||||
pass
|
||||
|
||||
# Then mark this part as done, in case the process is interrupted and
|
||||
# needs to resume.
|
||||
states["state"][part] = True
|
||||
with open(state_tmp_file, 'wt') as f:
|
||||
json.dump(states, f)
|
||||
os.fsync(f.fileno())
|
||||
os.rename(state_tmp_file, state_file)
|
||||
num_parts_done = sum(1 for part in states["state"].values() if part)
|
||||
logger.info("Device: %s Step: %s Partitions: %d/%d" % (
|
||||
device, step, num_parts_done, len(states["state"])))
|
||||
|
||||
|
||||
def hashes_filter(devices, next_part_power, suff_path, hashes):
|
||||
hashes = list(hashes)
|
||||
for hsh in hashes:
|
||||
fname = os.path.join(suff_path, hsh)
|
||||
if fname == replace_partition_in_path(
|
||||
devices, fname, next_part_power):
|
||||
hashes.remove(hsh)
|
||||
return hashes
|
||||
|
||||
|
||||
class Relinker(object):
|
||||
def __init__(self, conf, logger, device, do_cleanup=False):
|
||||
self.conf = conf
|
||||
@ -232,6 +56,7 @@ class Relinker(object):
|
||||
self.root = os.path.join(self.root, self.device)
|
||||
self.part_power = self.next_part_power = None
|
||||
self.diskfile_mgr = None
|
||||
self.dev_lock = None
|
||||
self.diskfile_router = diskfile.DiskFileRouter(self.conf, self.logger)
|
||||
self._zero_stats()
|
||||
|
||||
@ -245,6 +70,186 @@ class Relinker(object):
|
||||
'policies': 0,
|
||||
}
|
||||
|
||||
def devices_filter(self, _, devices):
|
||||
if self.device:
|
||||
devices = [d for d in devices if d == self.device]
|
||||
|
||||
return set(devices)
|
||||
|
||||
def hook_pre_device(self, device_path):
|
||||
lock_file = os.path.join(device_path,
|
||||
LOCK_FILE.format(datadir=self.datadir))
|
||||
|
||||
fd = os.open(lock_file, os.O_CREAT | os.O_WRONLY)
|
||||
fcntl.flock(fd, fcntl.LOCK_EX)
|
||||
self.dev_lock = fd
|
||||
|
||||
state_file = os.path.join(device_path,
|
||||
STATE_FILE.format(datadir=self.datadir))
|
||||
self.states["state"].clear()
|
||||
try:
|
||||
with open(state_file, 'rt') as f:
|
||||
state_from_disk = json.load(f)
|
||||
if state_from_disk["next_part_power"] != \
|
||||
self.states["next_part_power"]:
|
||||
raise ValueError
|
||||
on_disk_part_power = state_from_disk["part_power"]
|
||||
if on_disk_part_power != self.states["part_power"]:
|
||||
self.states["prev_part_power"] = on_disk_part_power
|
||||
raise ValueError
|
||||
self.states["state"].update(state_from_disk["state"])
|
||||
except (ValueError, TypeError, KeyError):
|
||||
# Bad state file: remove the file to restart from scratch
|
||||
os.unlink(state_file)
|
||||
except IOError as err:
|
||||
# Ignore file not found error
|
||||
if err.errno != errno.ENOENT:
|
||||
raise
|
||||
|
||||
def hook_post_device(self, _):
|
||||
os.close(self.dev_lock)
|
||||
self.dev_lock = None
|
||||
|
||||
def partitions_filter(self, datadir_path, partitions):
|
||||
# Remove all non partitions first (eg: auditor_status_ALL.json)
|
||||
partitions = [p for p in partitions if p.isdigit()]
|
||||
|
||||
relinking = (self.part_power != self.next_part_power)
|
||||
if relinking:
|
||||
# All partitions in the upper half are new partitions and there is
|
||||
# nothing to relink there
|
||||
partitions = [part for part in partitions
|
||||
if int(part) < 2 ** self.part_power]
|
||||
elif "prev_part_power" in self.states:
|
||||
# All partitions in the upper half are new partitions and there is
|
||||
# nothing to clean up there
|
||||
partitions = [part for part in partitions
|
||||
if int(part) < 2 ** self.states["prev_part_power"]]
|
||||
|
||||
# Format: { 'part': processed }
|
||||
if self.states["state"]:
|
||||
missing = list(set(partitions) - set(self.states["state"].keys()))
|
||||
if missing:
|
||||
# All missing partitions were created after the first run of
|
||||
# the relinker with this part_power/next_part_power pair. This
|
||||
# is expected when relinking, where new partitions appear that
|
||||
# are appropriate for the target part power. In such cases,
|
||||
# there's nothing to be done. Err on the side of caution
|
||||
# during cleanup, however.
|
||||
for part in missing:
|
||||
self.states["state"][part] = relinking
|
||||
partitions = [
|
||||
str(part) for part, processed in self.states["state"].items()
|
||||
if not processed]
|
||||
else:
|
||||
self.states["state"].update({
|
||||
str(part): False for part in partitions})
|
||||
|
||||
# Always scan the partitions in reverse order to minimize the amount
|
||||
# of IO (it actually only matters for relink, not for cleanup).
|
||||
#
|
||||
# Initial situation:
|
||||
# objects/0/000/00000000...00000000/12345.data
|
||||
# -> relinked to objects/1/000/10000000...00000000/12345.data
|
||||
#
|
||||
# If the relinker then scan partition 1, it will listdir that object
|
||||
# while it's unnecessary. By working in reverse order of partitions,
|
||||
# this is avoided.
|
||||
partitions = sorted(partitions, key=int, reverse=True)
|
||||
|
||||
return partitions
|
||||
|
||||
# Save states when a partition is done
|
||||
def hook_post_partition(self, partition_path):
|
||||
datadir_path, part = os.path.split(os.path.abspath(partition_path))
|
||||
device_path, datadir_name = os.path.split(datadir_path)
|
||||
device = os.path.basename(device_path)
|
||||
state_tmp_file = os.path.join(
|
||||
device_path, STATE_TMP_FILE.format(datadir=datadir_name))
|
||||
state_file = os.path.join(
|
||||
device_path, STATE_FILE.format(datadir=datadir_name))
|
||||
|
||||
# We started with a partition space like
|
||||
# |0 N|
|
||||
# |ABCDEFGHIJKLMNOP|
|
||||
#
|
||||
# After relinking, it will be more like
|
||||
# |0 2N|
|
||||
# |AABBCCDDEEFFGGHHIIJJKKLLMMNNOOPP|
|
||||
#
|
||||
# We want to hold off on rehashing until after cleanup, since that is
|
||||
# the point at which we've finished with filesystem manipulations. But
|
||||
# there's a slight complication: we know the upper half has nothing to
|
||||
# clean up, so the cleanup phase only looks at
|
||||
# |0 2N|
|
||||
# |AABBCCDDEEFFGGHH |
|
||||
#
|
||||
# To ensure that the upper half gets rehashed, too, do it as part of
|
||||
# relinking; as we finish
|
||||
# |0 N|
|
||||
# | IJKLMNOP|
|
||||
# shift to the new partition space and rehash
|
||||
# |0 2N|
|
||||
# | IIJJKKLLMMNNOOPP|
|
||||
partition = int(part)
|
||||
if not self.do_cleanup and partition >= 2 ** (
|
||||
self.states['part_power'] - 1):
|
||||
for new_part in (2 * partition, 2 * partition + 1):
|
||||
self.diskfile_mgr.get_hashes(
|
||||
device, new_part, [], self.policy)
|
||||
elif self.do_cleanup:
|
||||
hashes = self.diskfile_mgr.get_hashes(
|
||||
device, partition, [], self.policy)
|
||||
# In any reasonably-large cluster, we'd expect all old
|
||||
# partitions P to be empty after cleanup (i.e., it's unlikely
|
||||
# that there's another partition Q := P//2 that also has data
|
||||
# on this device).
|
||||
#
|
||||
# Try to clean up empty partitions now, so operators can use
|
||||
# existing rebalance-complete metrics to monitor relinking
|
||||
# progress (provided there are few/no handoffs when relinking
|
||||
# starts and little data is written to handoffs during the
|
||||
# increase).
|
||||
if not hashes:
|
||||
with lock_path(partition_path):
|
||||
# Same lock used by invalidate_hashes, consolidate_hashes,
|
||||
# get_hashes
|
||||
try:
|
||||
for f in ('hashes.pkl', 'hashes.invalid', '.lock'):
|
||||
os.unlink(os.path.join(partition_path, f))
|
||||
except OSError:
|
||||
pass
|
||||
try:
|
||||
os.rmdir(partition_path)
|
||||
except OSError:
|
||||
# Most likely, some data landed in here or we hit an error
|
||||
# above. Let the replicator deal with things; it was worth
|
||||
# a shot.
|
||||
pass
|
||||
|
||||
# Then mark this part as done, in case the process is interrupted and
|
||||
# needs to resume.
|
||||
self.states["state"][part] = True
|
||||
with open(state_tmp_file, 'wt') as f:
|
||||
json.dump(self.states, f)
|
||||
os.fsync(f.fileno())
|
||||
os.rename(state_tmp_file, state_file)
|
||||
num_parts_done = sum(
|
||||
1 for part in self.states["state"].values()
|
||||
if part)
|
||||
self.logger.info("Device: %s Step: %s Partitions: %d/%d" % (
|
||||
device, STEP_CLEANUP if self.do_cleanup else STEP_RELINK,
|
||||
num_parts_done, len(self.states["state"])))
|
||||
|
||||
def hashes_filter(self, suff_path, hashes):
|
||||
hashes = list(hashes)
|
||||
for hsh in hashes:
|
||||
fname = os.path.join(suff_path, hsh)
|
||||
if fname == replace_partition_in_path(
|
||||
self.conf['devices'], fname, self.next_part_power):
|
||||
hashes.remove(hsh)
|
||||
return hashes
|
||||
|
||||
def process_location(self, hash_path, new_hash_path):
|
||||
# Compare the contents of each hash dir with contents of same hash
|
||||
# dir in its new partition to verify that the new location has the
|
||||
@ -352,36 +357,23 @@ class Relinker(object):
|
||||
self.part_power = policy.object_ring.part_power
|
||||
self.next_part_power = policy.object_ring.next_part_power
|
||||
self.diskfile_mgr = self.diskfile_router[policy]
|
||||
datadir = diskfile.get_data_dir(policy)
|
||||
locks = [None]
|
||||
states = {
|
||||
self.datadir = diskfile.get_data_dir(policy)
|
||||
self.states = {
|
||||
"part_power": self.part_power,
|
||||
"next_part_power": self.next_part_power,
|
||||
"state": {},
|
||||
}
|
||||
run_devices_filter = partial(devices_filter, self.device)
|
||||
run_hook_pre_device = partial(hook_pre_device, locks, states,
|
||||
datadir)
|
||||
run_hook_post_device = partial(hook_post_device, locks)
|
||||
run_partition_filter = partial(
|
||||
partitions_filter, states, self.part_power, self.next_part_power)
|
||||
run_hook_post_partition = partial(
|
||||
hook_post_partition, self.logger, states,
|
||||
STEP_CLEANUP if self.do_cleanup else STEP_RELINK,
|
||||
policy, self.diskfile_mgr)
|
||||
run_hashes_filter = partial(hashes_filter, self.conf['devices'],
|
||||
self.next_part_power)
|
||||
|
||||
locations = audit_location_generator(
|
||||
self.conf['devices'],
|
||||
datadir,
|
||||
self.datadir,
|
||||
mount_check=self.conf['mount_check'],
|
||||
devices_filter=run_devices_filter,
|
||||
hook_pre_device=run_hook_pre_device,
|
||||
hook_post_device=run_hook_post_device,
|
||||
partitions_filter=run_partition_filter,
|
||||
hook_post_partition=run_hook_post_partition,
|
||||
hashes_filter=run_hashes_filter,
|
||||
devices_filter=self.devices_filter,
|
||||
hook_pre_device=self.hook_pre_device,
|
||||
hook_post_device=self.hook_post_device,
|
||||
partitions_filter=self.partitions_filter,
|
||||
hook_post_partition=self.hook_post_partition,
|
||||
hashes_filter=self.hashes_filter,
|
||||
logger=self.logger,
|
||||
error_counter=self.stats,
|
||||
yield_hash_dirs=True
|
||||
@ -400,6 +392,7 @@ class Relinker(object):
|
||||
def run(self):
|
||||
self._zero_stats()
|
||||
for policy in self.conf['policies']:
|
||||
self.policy = policy
|
||||
policy.object_ring = None # Ensure it will be reloaded
|
||||
policy.load_ring(self.conf['swift_dir'])
|
||||
ring = policy.object_ring
|
||||
|
@ -1230,28 +1230,31 @@ class TestRelinker(unittest.TestCase):
|
||||
|
||||
def test_devices_filter_filtering(self):
|
||||
# With no filtering, returns all devices
|
||||
devices = relinker.devices_filter(None, "", [self.existing_device])
|
||||
r = relinker.Relinker(
|
||||
{'devices': self.devices}, self.logger, self.existing_device)
|
||||
devices = r.devices_filter("", [self.existing_device])
|
||||
self.assertEqual(set([self.existing_device]), devices)
|
||||
|
||||
# With a matching filter, returns what is matching
|
||||
devices = relinker.devices_filter(self.existing_device, "",
|
||||
[self.existing_device, 'sda2'])
|
||||
devices = r.devices_filter("", [self.existing_device, 'sda2'])
|
||||
self.assertEqual(set([self.existing_device]), devices)
|
||||
|
||||
# With a non matching filter, returns nothing
|
||||
devices = relinker.devices_filter('none', "", [self.existing_device])
|
||||
r.device = 'none'
|
||||
devices = r.devices_filter("", [self.existing_device])
|
||||
self.assertEqual(set(), devices)
|
||||
|
||||
def test_hook_pre_post_device_locking(self):
|
||||
locks = [None]
|
||||
r = relinker.Relinker(
|
||||
{'devices': self.devices}, self.logger, self.existing_device)
|
||||
device_path = os.path.join(self.devices, self.existing_device)
|
||||
datadir = 'object'
|
||||
lock_file = os.path.join(device_path, '.relink.%s.lock' % datadir)
|
||||
r.datadir = 'object' # would get set in process_policy
|
||||
r.states = {"state": {}} # ditto
|
||||
lock_file = os.path.join(device_path, '.relink.%s.lock' % r.datadir)
|
||||
|
||||
# The first run gets the lock
|
||||
states = {"state": {}}
|
||||
relinker.hook_pre_device(locks, states, datadir, device_path)
|
||||
self.assertNotEqual([None], locks)
|
||||
r.hook_pre_device(device_path)
|
||||
self.assertIsNotNone(r.dev_lock)
|
||||
|
||||
# A following run would block
|
||||
with self.assertRaises(IOError) as raised:
|
||||
@ -1260,54 +1263,51 @@ class TestRelinker(unittest.TestCase):
|
||||
self.assertEqual(errno.EAGAIN, raised.exception.errno)
|
||||
|
||||
# Another must not get the lock, so it must return an empty list
|
||||
relinker.hook_post_device(locks, "")
|
||||
self.assertEqual([None], locks)
|
||||
r.hook_post_device("")
|
||||
self.assertIsNone(r.dev_lock)
|
||||
|
||||
with open(lock_file, 'a') as f:
|
||||
fcntl.flock(f.fileno(), fcntl.LOCK_EX | fcntl.LOCK_NB)
|
||||
|
||||
def test_state_file(self):
|
||||
r = relinker.Relinker(
|
||||
{'devices': self.devices}, self.logger, self.existing_device)
|
||||
device_path = os.path.join(self.devices, self.existing_device)
|
||||
datadir = 'objects'
|
||||
datadir_path = os.path.join(device_path, datadir)
|
||||
state_file = os.path.join(device_path, 'relink.%s.json' % datadir)
|
||||
|
||||
def call_partition_filter(part_power, next_part_power, parts):
|
||||
# Partition 312 will be ignored because it must have been created
|
||||
# by the relinker
|
||||
return relinker.partitions_filter(states,
|
||||
part_power, next_part_power,
|
||||
datadir_path, parts)
|
||||
r.datadir = 'objects'
|
||||
r.part_power = PART_POWER
|
||||
r.next_part_power = PART_POWER + 1
|
||||
datadir_path = os.path.join(device_path, r.datadir)
|
||||
state_file = os.path.join(device_path, 'relink.%s.json' % r.datadir)
|
||||
|
||||
# Start relinking
|
||||
states = {"part_power": PART_POWER, "next_part_power": PART_POWER + 1,
|
||||
"state": {}}
|
||||
r.states = {
|
||||
"part_power": PART_POWER,
|
||||
"next_part_power": PART_POWER + 1,
|
||||
"state": {},
|
||||
}
|
||||
|
||||
# Load the states: As it starts, it must be empty
|
||||
locks = [None]
|
||||
relinker.hook_pre_device(locks, states, datadir, device_path)
|
||||
self.assertEqual({}, states["state"])
|
||||
os.close(locks[0]) # Release the lock
|
||||
r.hook_pre_device(device_path)
|
||||
self.assertEqual({}, r.states["state"])
|
||||
os.close(r.dev_lock) # Release the lock
|
||||
|
||||
# Partition 312 is ignored because it must have been created with the
|
||||
# next_part_power, so it does not need to be relinked
|
||||
# 96 and 227 are reverse ordered
|
||||
# auditor_status_ALL.json is ignored because it's not a partition
|
||||
self.assertEqual(['227', '96'],
|
||||
call_partition_filter(PART_POWER, PART_POWER + 1,
|
||||
['96', '227', '312',
|
||||
'auditor_status.json']))
|
||||
self.assertEqual(states["state"], {'96': False, '227': False})
|
||||
self.assertEqual(['227', '96'], r.partitions_filter(
|
||||
"", ['96', '227', '312', 'auditor_status.json']))
|
||||
self.assertEqual(r.states["state"], {'96': False, '227': False})
|
||||
|
||||
pol = POLICIES[0]
|
||||
mgr = DiskFileRouter({'devices': self.devices,
|
||||
'mount_check': False}, self.logger)[pol]
|
||||
r.policy = POLICIES[0]
|
||||
r.diskfile_mgr = DiskFileRouter({
|
||||
'devices': self.devices,
|
||||
'mount_check': False,
|
||||
}, self.logger)[r.policy]
|
||||
|
||||
# Ack partition 96
|
||||
relinker.hook_post_partition(self.logger, states,
|
||||
relinker.STEP_RELINK, pol, mgr,
|
||||
os.path.join(datadir_path, '96'))
|
||||
self.assertEqual(states["state"], {'96': True, '227': False})
|
||||
r.hook_post_partition(os.path.join(datadir_path, '96'))
|
||||
self.assertEqual(r.states["state"], {'96': True, '227': False})
|
||||
self.assertIn("Device: sda1 Step: relink Partitions: 1/2",
|
||||
self.logger.get_lines_for_level("info"))
|
||||
with open(state_file, 'rt') as f:
|
||||
@ -1318,17 +1318,14 @@ class TestRelinker(unittest.TestCase):
|
||||
|
||||
# Restart relinking after only part 96 was done
|
||||
self.assertEqual(['227'],
|
||||
call_partition_filter(PART_POWER, PART_POWER + 1,
|
||||
['96', '227', '312']))
|
||||
self.assertEqual(states["state"], {'96': True, '227': False})
|
||||
r.partitions_filter("", ['96', '227', '312']))
|
||||
self.assertEqual(r.states["state"], {'96': True, '227': False})
|
||||
|
||||
# Ack partition 227
|
||||
relinker.hook_post_partition(
|
||||
self.logger, states, relinker.STEP_RELINK, pol,
|
||||
mgr, os.path.join(datadir_path, '227'))
|
||||
r.hook_post_partition(os.path.join(datadir_path, '227'))
|
||||
self.assertIn("Device: sda1 Step: relink Partitions: 2/2",
|
||||
self.logger.get_lines_for_level("info"))
|
||||
self.assertEqual(states["state"], {'96': True, '227': True})
|
||||
self.assertEqual(r.states["state"], {'96': True, '227': True})
|
||||
with open(state_file, 'rt') as f:
|
||||
self.assertEqual(json.load(f), {
|
||||
"part_power": PART_POWER,
|
||||
@ -1336,44 +1333,42 @@ class TestRelinker(unittest.TestCase):
|
||||
"state": {'96': True, '227': True}})
|
||||
|
||||
# If the process restarts, it reload the state
|
||||
locks = [None]
|
||||
states = {
|
||||
r.states = {
|
||||
"part_power": PART_POWER,
|
||||
"next_part_power": PART_POWER + 1,
|
||||
"state": {},
|
||||
}
|
||||
relinker.hook_pre_device(locks, states, datadir, device_path)
|
||||
self.assertEqual(states, {
|
||||
r.hook_pre_device(device_path)
|
||||
self.assertEqual(r.states, {
|
||||
"part_power": PART_POWER,
|
||||
"next_part_power": PART_POWER + 1,
|
||||
"state": {'96': True, '227': True}})
|
||||
os.close(locks[0]) # Release the lock
|
||||
os.close(r.dev_lock) # Release the lock
|
||||
|
||||
# Start cleanup -- note that part_power and next_part_power now match!
|
||||
states = {
|
||||
r.do_cleanup = True
|
||||
r.part_power = PART_POWER + 1
|
||||
r.states = {
|
||||
"part_power": PART_POWER + 1,
|
||||
"next_part_power": PART_POWER + 1,
|
||||
"state": {},
|
||||
}
|
||||
# ...which means our state file was ignored
|
||||
relinker.hook_pre_device(locks, states, datadir, device_path)
|
||||
self.assertEqual(states, {
|
||||
r.hook_pre_device(device_path)
|
||||
self.assertEqual(r.states, {
|
||||
"prev_part_power": PART_POWER,
|
||||
"part_power": PART_POWER + 1,
|
||||
"next_part_power": PART_POWER + 1,
|
||||
"state": {}})
|
||||
os.close(locks[0]) # Release the lock
|
||||
os.close(r.dev_lock) # Release the lock
|
||||
|
||||
self.assertEqual(['227', '96'],
|
||||
call_partition_filter(PART_POWER + 1, PART_POWER + 1,
|
||||
['96', '227', '312']))
|
||||
r.partitions_filter("", ['96', '227', '312']))
|
||||
# Ack partition 227
|
||||
relinker.hook_post_partition(
|
||||
self.logger, states, relinker.STEP_CLEANUP, pol, mgr,
|
||||
os.path.join(datadir_path, '227'))
|
||||
r.hook_post_partition(os.path.join(datadir_path, '227'))
|
||||
self.assertIn("Device: sda1 Step: cleanup Partitions: 1/2",
|
||||
self.logger.get_lines_for_level("info"))
|
||||
self.assertEqual(states["state"],
|
||||
self.assertEqual(r.states["state"],
|
||||
{'96': False, '227': True})
|
||||
with open(state_file, 'rt') as f:
|
||||
self.assertEqual(json.load(f), {
|
||||
@ -1383,19 +1378,15 @@ class TestRelinker(unittest.TestCase):
|
||||
"state": {'96': False, '227': True}})
|
||||
|
||||
# Restart cleanup after only part 227 was done
|
||||
self.assertEqual(['96'],
|
||||
call_partition_filter(PART_POWER + 1, PART_POWER + 1,
|
||||
['96', '227', '312']))
|
||||
self.assertEqual(states["state"],
|
||||
self.assertEqual(['96'], r.partitions_filter("", ['96', '227', '312']))
|
||||
self.assertEqual(r.states["state"],
|
||||
{'96': False, '227': True})
|
||||
|
||||
# Ack partition 96
|
||||
relinker.hook_post_partition(self.logger, states,
|
||||
relinker.STEP_CLEANUP, pol, mgr,
|
||||
os.path.join(datadir_path, '96'))
|
||||
r.hook_post_partition(os.path.join(datadir_path, '96'))
|
||||
self.assertIn("Device: sda1 Step: cleanup Partitions: 2/2",
|
||||
self.logger.get_lines_for_level("info"))
|
||||
self.assertEqual(states["state"],
|
||||
self.assertEqual(r.states["state"],
|
||||
{'96': True, '227': True})
|
||||
with open(state_file, 'rt') as f:
|
||||
self.assertEqual(json.load(f), {
|
||||
@ -1405,40 +1396,40 @@ class TestRelinker(unittest.TestCase):
|
||||
"state": {'96': True, '227': True}})
|
||||
|
||||
# At the end, the state is still accurate
|
||||
locks = [None]
|
||||
states = {
|
||||
r.states = {
|
||||
"prev_part_power": PART_POWER,
|
||||
"part_power": PART_POWER + 1,
|
||||
"next_part_power": PART_POWER + 1,
|
||||
"state": {},
|
||||
}
|
||||
relinker.hook_pre_device(locks, states, datadir, device_path)
|
||||
self.assertEqual(states["state"],
|
||||
r.hook_pre_device(device_path)
|
||||
self.assertEqual(r.states["state"],
|
||||
{'96': True, '227': True})
|
||||
os.close(locks[0]) # Release the lock
|
||||
os.close(r.dev_lock) # Release the lock
|
||||
|
||||
# If the part_power/next_part_power tuple differs, restart from scratch
|
||||
locks = [None]
|
||||
states = {
|
||||
r.states = {
|
||||
"part_power": PART_POWER + 1,
|
||||
"next_part_power": PART_POWER + 2,
|
||||
"state": {},
|
||||
}
|
||||
relinker.hook_pre_device(locks, states, datadir, device_path)
|
||||
self.assertEqual(states["state"], {})
|
||||
r.hook_pre_device(device_path)
|
||||
self.assertEqual(r.states["state"], {})
|
||||
self.assertFalse(os.path.exists(state_file))
|
||||
os.close(locks[0]) # Release the lock
|
||||
os.close(r.dev_lock) # Release the lock
|
||||
|
||||
# If the file gets corrupted, restart from scratch
|
||||
with open(state_file, 'wt') as f:
|
||||
f.write('NOT JSON')
|
||||
locks = [None]
|
||||
states = {"part_power": PART_POWER, "next_part_power": PART_POWER + 1,
|
||||
"state": {}}
|
||||
relinker.hook_pre_device(locks, states, datadir, device_path)
|
||||
self.assertEqual(states["state"], {})
|
||||
r.states = {
|
||||
"part_power": PART_POWER,
|
||||
"next_part_power": PART_POWER + 1,
|
||||
"state": {},
|
||||
}
|
||||
r.hook_pre_device(device_path)
|
||||
self.assertEqual(r.states["state"], {})
|
||||
self.assertFalse(os.path.exists(state_file))
|
||||
os.close(locks[0]) # Release the lock
|
||||
os.close(r.dev_lock) # Release the lock
|
||||
|
||||
def test_cleanup_relinked_ok(self):
|
||||
self._common_test_cleanup()
|
||||
|
Loading…
x
Reference in New Issue
Block a user