Merge "Add multiple worker processes strategy to reconstructor"
This commit is contained in:
commit
2105ad2a7f
@ -811,8 +811,18 @@ daemonize yes Whether or not to run
|
||||
reconstruction as a daemon
|
||||
interval 30 Time in seconds to wait between
|
||||
reconstruction passes
|
||||
reconstructor_workers 0 Maximum number of worker processes
|
||||
to spawn. Each worker will handle
|
||||
a subset of devices. Devices will
|
||||
be assigned evenly among the workers
|
||||
so that workers cycle at similar
|
||||
intervals (which can lead to fewer
|
||||
workers than requested). You can not
|
||||
have more workers than devices. If
|
||||
you have no devices only a single
|
||||
worker is spawned.
|
||||
concurrency 1 Number of reconstruction threads to
|
||||
spawn.
|
||||
spawn per reconstructor process.
|
||||
stats_interval 300 Interval in seconds between
|
||||
logging reconstruction statistics
|
||||
handoffs_only false The handoffs_only mode option is for
|
||||
|
@ -314,6 +314,13 @@ use = egg:swift#recon
|
||||
# run_pause is deprecated, use interval instead
|
||||
# run_pause = 30
|
||||
#
|
||||
# Maximum number of worker processes to spawn. Each worker will handle a
|
||||
# subset of devices. Devices will be assigned evenly among the workers so that
|
||||
# workers cycle at similar intervals (which can lead to fewer workers than
|
||||
# requested). You can not have more workers than devices. If you have no
|
||||
# devices only a single worker is spawned.
|
||||
# reconstructor_workers = 0
|
||||
#
|
||||
# concurrency = 1
|
||||
# stats_interval = 300
|
||||
# node_timeout = 10
|
||||
|
@ -13,6 +13,7 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
import errno
|
||||
import os
|
||||
import sys
|
||||
import time
|
||||
@ -25,7 +26,24 @@ from swift.common import utils
|
||||
|
||||
|
||||
class Daemon(object):
|
||||
"""Daemon base class"""
|
||||
"""
|
||||
Daemon base class
|
||||
|
||||
A daemon has a run method that accepts a ``once`` kwarg and will dispatch
|
||||
to :meth:`run_once` or :meth:`run_forever`.
|
||||
|
||||
A subclass of Daemon must implement :meth:`run_once` and
|
||||
:meth:`run_forever`.
|
||||
|
||||
A subclass of Daemon may override :meth:`get_worker_args` to dispatch
|
||||
arguments to individual child process workers and :meth:`is_healthy` to
|
||||
perform context specific periodic wellness checks which can reset worker
|
||||
arguments.
|
||||
|
||||
Implementations of Daemon do not know *how* to daemonize, or execute
|
||||
multiple daemonized workers, they simply provide the behavior of the daemon
|
||||
and context specific knowledge about how workers should be started.
|
||||
"""
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
@ -40,35 +58,200 @@ class Daemon(object):
|
||||
raise NotImplementedError('run_forever not implemented')
|
||||
|
||||
def run(self, once=False, **kwargs):
|
||||
"""Run the daemon"""
|
||||
if once:
|
||||
self.run_once(**kwargs)
|
||||
else:
|
||||
self.run_forever(**kwargs)
|
||||
|
||||
def get_worker_args(self, once=False, **kwargs):
|
||||
"""
|
||||
For each worker yield a (possibly empty) dict of kwargs to pass along
|
||||
to the daemon's :meth:`run` method after fork. The length of elements
|
||||
returned from this method will determine the number of processes
|
||||
created.
|
||||
|
||||
If the returned iterable is empty, the Strategy will fallback to
|
||||
run-inline strategy.
|
||||
|
||||
:param once: False if the worker(s) will be daemonized, True if the
|
||||
worker(s) will be run once
|
||||
:param kwargs: plumbed through via command line argparser
|
||||
|
||||
:returns: an iterable of dicts, each element represents the kwargs to
|
||||
be passed to a single worker's :meth:`run` method after fork.
|
||||
"""
|
||||
return []
|
||||
|
||||
def is_healthy(self):
|
||||
"""
|
||||
This method is called very frequently on the instance of the daemon
|
||||
held by the parent process. If it returns False, all child workers are
|
||||
terminated, and new workers will be created.
|
||||
|
||||
:returns: a boolean, True only if all workers should continue to run
|
||||
"""
|
||||
return True
|
||||
|
||||
|
||||
class DaemonStrategy(object):
|
||||
"""
|
||||
This is the execution strategy for using subclasses of Daemon. The default
|
||||
behavior is to invoke the daemon's :meth:`Daemon.run` method from within
|
||||
the parent process. When the :meth:`Daemon.run` method returns the parent
|
||||
process will exit.
|
||||
|
||||
However, if the Daemon returns a non-empty iterable from
|
||||
:meth:`Daemon.get_worker_args`, the daemon's :meth:`Daemon.run` method will
|
||||
be invoked in child processes, with the arguments provided from the parent
|
||||
process's instance of the daemon. If a child process exits it will be
|
||||
restarted with the same options, unless it was executed in once mode.
|
||||
|
||||
:param daemon: an instance of a :class:`Daemon` (has a `run` method)
|
||||
:param logger: a logger instance
|
||||
"""
|
||||
|
||||
def __init__(self, daemon, logger):
|
||||
self.daemon = daemon
|
||||
self.logger = logger
|
||||
self.running = False
|
||||
# only used by multi-worker strategy
|
||||
self.options_by_pid = {}
|
||||
self.unspawned_worker_options = []
|
||||
|
||||
def setup(self, **kwargs):
|
||||
utils.validate_configuration()
|
||||
utils.drop_privileges(self.conf.get('user', 'swift'))
|
||||
utils.drop_privileges(self.daemon.conf.get('user', 'swift'))
|
||||
utils.capture_stdio(self.logger, **kwargs)
|
||||
|
||||
def kill_children(*args):
|
||||
self.running = False
|
||||
self.logger.info('SIGTERM received')
|
||||
signal.signal(signal.SIGTERM, signal.SIG_IGN)
|
||||
os.killpg(0, signal.SIGTERM)
|
||||
os._exit(0)
|
||||
|
||||
signal.signal(signal.SIGTERM, kill_children)
|
||||
if once:
|
||||
self.run_once(**kwargs)
|
||||
self.running = True
|
||||
|
||||
def _run_inline(self, once=False, **kwargs):
|
||||
"""Run the daemon"""
|
||||
self.daemon.run(once=once, **kwargs)
|
||||
|
||||
def run(self, once=False, **kwargs):
|
||||
"""Daemonize and execute our strategy"""
|
||||
self.setup(**kwargs)
|
||||
try:
|
||||
self._run(once=once, **kwargs)
|
||||
except KeyboardInterrupt:
|
||||
self.logger.notice('User quit')
|
||||
finally:
|
||||
self.cleanup()
|
||||
self.running = False
|
||||
|
||||
def _fork(self, once, **kwargs):
|
||||
pid = os.fork()
|
||||
if pid == 0:
|
||||
signal.signal(signal.SIGHUP, signal.SIG_DFL)
|
||||
signal.signal(signal.SIGTERM, signal.SIG_DFL)
|
||||
|
||||
self.daemon.run(once, **kwargs)
|
||||
|
||||
self.logger.debug('Forked worker %s finished', os.getpid())
|
||||
# do not return from this stack, nor execute any finally blocks
|
||||
os._exit(0)
|
||||
else:
|
||||
self.run_forever(**kwargs)
|
||||
self.register_worker_start(pid, kwargs)
|
||||
return pid
|
||||
|
||||
def iter_unspawned_workers(self):
|
||||
while True:
|
||||
try:
|
||||
per_worker_options = self.unspawned_worker_options.pop()
|
||||
except IndexError:
|
||||
return
|
||||
yield per_worker_options
|
||||
|
||||
def spawned_pids(self):
|
||||
return self.options_by_pid.keys()
|
||||
|
||||
def register_worker_start(self, pid, per_worker_options):
|
||||
self.logger.debug('Spawned worker %s with %r', pid, per_worker_options)
|
||||
self.options_by_pid[pid] = per_worker_options
|
||||
|
||||
def register_worker_exit(self, pid):
|
||||
self.unspawned_worker_options.append(self.options_by_pid.pop(pid))
|
||||
|
||||
def ask_daemon_to_prepare_workers(self, once, **kwargs):
|
||||
self.unspawned_worker_options = list(
|
||||
self.daemon.get_worker_args(once=once, **kwargs))
|
||||
|
||||
def abort_workers_if_daemon_would_like(self):
|
||||
if not self.daemon.is_healthy():
|
||||
self.logger.debug(
|
||||
'Daemon needs to change options, aborting workers')
|
||||
self.cleanup()
|
||||
return True
|
||||
return False
|
||||
|
||||
def check_on_all_running_workers(self):
|
||||
for p in self.spawned_pids():
|
||||
try:
|
||||
pid, status = os.waitpid(p, os.WNOHANG)
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
self.logger.notice('Worker %s died', p)
|
||||
else:
|
||||
if pid == 0:
|
||||
# child still running
|
||||
continue
|
||||
self.logger.debug('Worker %s exited', p)
|
||||
self.register_worker_exit(p)
|
||||
|
||||
def _run(self, once, **kwargs):
|
||||
self.ask_daemon_to_prepare_workers(once, **kwargs)
|
||||
if not self.unspawned_worker_options:
|
||||
return self._run_inline(once, **kwargs)
|
||||
for per_worker_options in self.iter_unspawned_workers():
|
||||
if self._fork(once, **per_worker_options) == 0:
|
||||
return 0
|
||||
while self.running:
|
||||
if self.abort_workers_if_daemon_would_like():
|
||||
self.ask_daemon_to_prepare_workers(once, **kwargs)
|
||||
self.check_on_all_running_workers()
|
||||
if not once:
|
||||
for per_worker_options in self.iter_unspawned_workers():
|
||||
if self._fork(once, **per_worker_options) == 0:
|
||||
return 0
|
||||
else:
|
||||
if not self.spawned_pids():
|
||||
self.logger.notice('Finished %s', os.getpid())
|
||||
break
|
||||
time.sleep(0.1)
|
||||
return 0
|
||||
|
||||
def cleanup(self):
|
||||
for p in self.spawned_pids():
|
||||
try:
|
||||
os.kill(p, signal.SIGTERM)
|
||||
except OSError as err:
|
||||
if err.errno not in (errno.ESRCH, errno.EINTR, errno.ECHILD):
|
||||
raise
|
||||
self.register_worker_exit(p)
|
||||
self.logger.debug('Cleaned up worker %s', p)
|
||||
|
||||
|
||||
def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
|
||||
"""
|
||||
Loads settings from conf, then instantiates daemon "klass" and runs the
|
||||
daemon with the specified once kwarg. The section_name will be derived
|
||||
from the daemon "klass" if not provided (e.g. ObjectReplicator =>
|
||||
Loads settings from conf, then instantiates daemon ``klass`` and runs the
|
||||
daemon with the specified ``once`` kwarg. The section_name will be derived
|
||||
from the daemon ``klass`` if not provided (e.g. ObjectReplicator =>
|
||||
object-replicator).
|
||||
|
||||
:param klass: Class to instantiate, subclass of common.daemon.Daemon
|
||||
:param klass: Class to instantiate, subclass of :class:`Daemon`
|
||||
:param conf_file: Path to configuration file
|
||||
:param section_name: Section name from conf file to load config from
|
||||
:param once: Passed to daemon run method
|
||||
:param once: Passed to daemon :meth:`Daemon.run` method
|
||||
"""
|
||||
# very often the config section_name is based on the class name
|
||||
# the None singleton will be passed through to readconf as is
|
||||
@ -113,8 +296,9 @@ def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
|
||||
os.environ['TZ'] = 'UTC+0'
|
||||
time.tzset()
|
||||
|
||||
logger.notice('Starting %s', os.getpid())
|
||||
try:
|
||||
klass(conf).run(once=once, **kwargs)
|
||||
DaemonStrategy(klass(conf), logger).run(once=once, **kwargs)
|
||||
except KeyboardInterrupt:
|
||||
logger.info('User quit')
|
||||
logger.info('Exited')
|
||||
logger.notice('Exited %s', os.getpid())
|
||||
|
@ -3143,7 +3143,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2,
|
||||
try:
|
||||
with NamedTemporaryFile(dir=os.path.dirname(cache_file),
|
||||
delete=False) as tf:
|
||||
tf.write(json.dumps(cache_entry) + '\n')
|
||||
tf.write(json.dumps(cache_entry, sort_keys=True) + '\n')
|
||||
if set_owner:
|
||||
os.chown(tf.name, pwd.getpwnam(set_owner).pw_uid, -1)
|
||||
renamer(tf.name, cache_file, fsync=False)
|
||||
|
@ -15,6 +15,7 @@
|
||||
|
||||
import json
|
||||
import errno
|
||||
import math
|
||||
import os
|
||||
from os.path import join
|
||||
import random
|
||||
@ -93,6 +94,30 @@ def _full_path(node, part, relative_path, policy):
|
||||
}
|
||||
|
||||
|
||||
def parse_override_options(**kwargs):
|
||||
"""
|
||||
Return a dict with keys `override_devices` and `override_partitions` whose
|
||||
values have been parsed from `kwargs`. If either key is found in `kwargs`
|
||||
then copy its value from kwargs. Otherwise, if `once` is set in `kwargs`
|
||||
then parse `devices` and `partitions` keys for the value of
|
||||
`override_devices` and `override_partitions` respectively.
|
||||
|
||||
:return: a dict with keys `override_devices` and `override_partitions`
|
||||
"""
|
||||
if kwargs.get('once', False):
|
||||
devices = list_from_csv(kwargs.get('devices'))
|
||||
partitions = [
|
||||
int(p) for p in list_from_csv(kwargs.get('partitions'))]
|
||||
else:
|
||||
devices = []
|
||||
partitions = []
|
||||
|
||||
return {
|
||||
'override_devices': kwargs.get('override_devices', devices),
|
||||
'override_partitions': kwargs.get('override_partitions', partitions),
|
||||
}
|
||||
|
||||
|
||||
class RebuildingECDiskFileStream(object):
|
||||
"""
|
||||
This class wraps the reconstructed fragment archive data and
|
||||
@ -155,6 +180,12 @@ class ObjectReconstructor(Daemon):
|
||||
self.port = None if self.servers_per_port else \
|
||||
int(conf.get('bind_port', 6200))
|
||||
self.concurrency = int(conf.get('concurrency', 1))
|
||||
# N.B. to maintain compatibility with legacy configs this option can
|
||||
# not be named 'workers' because the object-server uses that option
|
||||
# name in the DEFAULT section
|
||||
self.reconstructor_workers = int(conf.get('reconstructor_workers', 0))
|
||||
self.policies = [policy for policy in POLICIES
|
||||
if policy.policy_type == EC_POLICY]
|
||||
self.stats_interval = int(conf.get('stats_interval', '300'))
|
||||
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
|
||||
self.next_check = time.time() + self.ring_check_interval
|
||||
@ -166,6 +197,7 @@ class ObjectReconstructor(Daemon):
|
||||
self.recon_cache_path = conf.get('recon_cache_path',
|
||||
'/var/cache/swift')
|
||||
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
|
||||
self._next_rcache_update = time.time() + self.stats_interval
|
||||
# defaults subject to change after beta
|
||||
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
|
||||
self.node_timeout = float(conf.get('node_timeout', 10))
|
||||
@ -193,6 +225,102 @@ class ObjectReconstructor(Daemon):
|
||||
self.logger.warning('Ignored handoffs_first option in favor '
|
||||
'of handoffs_only.')
|
||||
self._df_router = DiskFileRouter(conf, self.logger)
|
||||
self.all_local_devices = self.get_local_devices()
|
||||
|
||||
def get_worker_args(self, once=False, **kwargs):
|
||||
"""
|
||||
Take the set of all local devices for this node from all the EC
|
||||
policies rings, and distribute them evenly into the number of workers
|
||||
to be spawned according to the configured worker count. If `devices` is
|
||||
given in `kwargs` then distribute only those devices.
|
||||
|
||||
:param once: False if the worker(s) will be daemonized, True if the
|
||||
worker(s) will be run once
|
||||
:param kwargs: optional overrides from the command line
|
||||
"""
|
||||
if self.reconstructor_workers < 1:
|
||||
return
|
||||
override_options = parse_override_options(once=once, **kwargs)
|
||||
|
||||
# Note that this get re-used when dumping stats and in is_healthy
|
||||
self.all_local_devices = self.get_local_devices()
|
||||
|
||||
if override_options['override_devices']:
|
||||
devices = [d for d in override_options['override_devices']
|
||||
if d in self.all_local_devices]
|
||||
else:
|
||||
devices = list(self.all_local_devices)
|
||||
if not devices:
|
||||
# we only need a single worker to do nothing until a ring change
|
||||
yield dict(override_options)
|
||||
return
|
||||
# for somewhat uniform load per worker use same max_devices_per_worker
|
||||
# when handling all devices or just override devices...
|
||||
max_devices_per_worker = int(math.ceil(
|
||||
1.0 * len(self.all_local_devices) / self.reconstructor_workers))
|
||||
# ...but only use enough workers for the actual devices being handled
|
||||
n = int(math.ceil(1.0 * len(devices) / max_devices_per_worker))
|
||||
override_devices_per_worker = [devices[i::n] for i in range(n)]
|
||||
for override_devices in override_devices_per_worker:
|
||||
yield dict(override_options, override_devices=override_devices)
|
||||
|
||||
def is_healthy(self):
|
||||
"""
|
||||
Check whether rings have changed, and maybe do a recon update.
|
||||
|
||||
:returns: False if any ec ring has changed
|
||||
"""
|
||||
now = time.time()
|
||||
if now > self._next_rcache_update:
|
||||
self._next_rcache_update = now + self.stats_interval
|
||||
self.aggregate_recon_update()
|
||||
return self.get_local_devices() == self.all_local_devices
|
||||
|
||||
def aggregate_recon_update(self):
|
||||
"""
|
||||
Aggregate per-disk rcache updates from child workers.
|
||||
"""
|
||||
try:
|
||||
with open(self.rcache) as f:
|
||||
existing_data = json.load(f)
|
||||
except IOError as e:
|
||||
if e.errno != errno.ENOENT:
|
||||
raise
|
||||
# dump_recon_cache will create new file and dirs
|
||||
existing_data = {}
|
||||
first_start = time.time()
|
||||
last_finish = 0
|
||||
all_devices_reporting = True
|
||||
for device in self.all_local_devices:
|
||||
per_disk_stats = existing_data.get(
|
||||
'object_reconstruction_per_disk', {}).get(device, {})
|
||||
try:
|
||||
start_time = per_disk_stats['object_reconstruction_last'] - \
|
||||
(per_disk_stats['object_reconstruction_time'] * 60)
|
||||
finish_time = per_disk_stats['object_reconstruction_last']
|
||||
except KeyError:
|
||||
all_devices_reporting = False
|
||||
break
|
||||
first_start = min(first_start, start_time)
|
||||
last_finish = max(last_finish, finish_time)
|
||||
if all_devices_reporting and last_finish > 0:
|
||||
duration = last_finish - first_start
|
||||
recon_update = {
|
||||
'object_reconstruction_time': duration / 60.0,
|
||||
'object_reconstruction_last': last_finish
|
||||
}
|
||||
else:
|
||||
# if any current devices have not yet dropped stats, or the rcache
|
||||
# file does not yet exist, we may still clear out per device stats
|
||||
# for any devices that have been removed from local devices
|
||||
recon_update = {}
|
||||
found_devices = set(existing_data.get(
|
||||
'object_reconstruction_per_disk', {}).keys())
|
||||
clear_update = {d: {} for d in found_devices
|
||||
if d not in self.all_local_devices}
|
||||
if clear_update:
|
||||
recon_update['object_reconstruction_per_disk'] = clear_update
|
||||
dump_recon_cache(recon_update, self.rcache, self.logger)
|
||||
|
||||
def load_object_ring(self, policy):
|
||||
"""
|
||||
@ -888,38 +1016,37 @@ class ObjectReconstructor(Daemon):
|
||||
# return a list of jobs for this part
|
||||
return jobs
|
||||
|
||||
def collect_parts(self, override_devices=None,
|
||||
override_partitions=None):
|
||||
"""
|
||||
Helper for getting partitions in the top level reconstructor
|
||||
|
||||
In handoffs_only mode no primary partitions will not be included in the
|
||||
returned (possibly empty) list.
|
||||
"""
|
||||
override_devices = override_devices or []
|
||||
override_partitions = override_partitions or []
|
||||
def get_policy2devices(self):
|
||||
ips = whataremyips(self.bind_ip)
|
||||
ec_policies = (policy for policy in POLICIES
|
||||
if policy.policy_type == EC_POLICY)
|
||||
|
||||
policy2devices = {}
|
||||
|
||||
for policy in ec_policies:
|
||||
for policy in self.policies:
|
||||
self.load_object_ring(policy)
|
||||
local_devices = list(six.moves.filter(
|
||||
lambda dev: dev and is_local_device(
|
||||
ips, self.port,
|
||||
dev['replication_ip'], dev['replication_port']),
|
||||
policy.object_ring.devs))
|
||||
|
||||
if override_devices:
|
||||
local_devices = list(six.moves.filter(
|
||||
lambda dev_info: dev_info['device'] in override_devices,
|
||||
local_devices))
|
||||
|
||||
policy2devices[policy] = local_devices
|
||||
self.device_count += len(local_devices)
|
||||
return policy2devices
|
||||
|
||||
def get_local_devices(self):
|
||||
"""Returns a set of all local devices in all EC policies."""
|
||||
policy2devices = self.get_policy2devices()
|
||||
return reduce(set.union, (
|
||||
set(d['device'] for d in devices)
|
||||
for devices in policy2devices.values()))
|
||||
|
||||
def collect_parts(self, override_devices=None, override_partitions=None):
|
||||
"""
|
||||
Helper for getting partitions in the top level reconstructor
|
||||
|
||||
In handoffs_only mode primary partitions will not be included in the
|
||||
returned (possibly empty) list.
|
||||
"""
|
||||
override_devices = override_devices or []
|
||||
override_partitions = override_partitions or []
|
||||
|
||||
policy2devices = self.get_policy2devices()
|
||||
all_parts = []
|
||||
|
||||
for policy, local_devices in policy2devices.items():
|
||||
@ -938,6 +1065,10 @@ class ObjectReconstructor(Daemon):
|
||||
|
||||
df_mgr = self._df_router[policy]
|
||||
for local_dev in local_devices:
|
||||
if override_devices and (
|
||||
local_dev['device'] not in override_devices):
|
||||
continue
|
||||
self.device_count += 1
|
||||
dev_path = df_mgr.get_dev_path(local_dev['device'])
|
||||
if not dev_path:
|
||||
self.logger.warning(_('%s is not mounted'),
|
||||
@ -1088,22 +1219,48 @@ class ObjectReconstructor(Daemon):
|
||||
"You should disable handoffs_only once all nodes "
|
||||
"are reporting no handoffs remaining."))
|
||||
|
||||
def final_recon_dump(self, total, override_devices=None, **kwargs):
|
||||
"""
|
||||
Add stats for this worker's run to recon cache.
|
||||
|
||||
When in worker mode (per_disk_stats == True) this worker's stats are
|
||||
added per device instead of in the top level keys (aggregation is
|
||||
serialized in the parent process).
|
||||
|
||||
:param total: the runtime of cycle in minutes
|
||||
:param override_devices: (optional) list of device that are being
|
||||
reconstructed
|
||||
"""
|
||||
recon_update = {
|
||||
'object_reconstruction_time': total,
|
||||
'object_reconstruction_last': time.time(),
|
||||
}
|
||||
|
||||
if self.reconstructor_workers > 0:
|
||||
devices = override_devices or self.all_local_devices
|
||||
recon_update['pid'] = os.getpid()
|
||||
recon_update = {'object_reconstruction_per_disk': {
|
||||
d: recon_update for d in devices}}
|
||||
else:
|
||||
# if not running in worker mode, kill any per_disk stats
|
||||
recon_update['object_reconstruction_per_disk'] = {}
|
||||
dump_recon_cache(recon_update, self.rcache, self.logger)
|
||||
|
||||
def run_once(self, *args, **kwargs):
|
||||
start = time.time()
|
||||
self.logger.info(_("Running object reconstructor in script mode."))
|
||||
override_devices = list_from_csv(kwargs.get('devices'))
|
||||
override_partitions = [int(p) for p in
|
||||
list_from_csv(kwargs.get('partitions'))]
|
||||
self.reconstruct(
|
||||
override_devices=override_devices,
|
||||
override_partitions=override_partitions)
|
||||
override_options = parse_override_options(once=True, **kwargs)
|
||||
self.reconstruct(**override_options)
|
||||
total = (time.time() - start) / 60
|
||||
self.logger.info(
|
||||
_("Object reconstruction complete (once). (%.02f minutes)"), total)
|
||||
if not (override_partitions or override_devices):
|
||||
dump_recon_cache({'object_reconstruction_time': total,
|
||||
'object_reconstruction_last': time.time()},
|
||||
self.rcache, self.logger)
|
||||
# Only dump stats if they would actually be meaningful -- i.e. we're
|
||||
# collecting per-disk stats and covering all partitions, or we're
|
||||
# covering all partitions, all disks.
|
||||
if not override_options['override_partitions'] and (
|
||||
self.reconstructor_workers > 0 or
|
||||
not override_options['override_devices']):
|
||||
self.final_recon_dump(total, **override_options)
|
||||
|
||||
def run_forever(self, *args, **kwargs):
|
||||
self.logger.info(_("Starting object reconstructor in daemon mode."))
|
||||
@ -1111,14 +1268,13 @@ class ObjectReconstructor(Daemon):
|
||||
while True:
|
||||
start = time.time()
|
||||
self.logger.info(_("Starting object reconstruction pass."))
|
||||
override_options = parse_override_options(**kwargs)
|
||||
# Run the reconstructor
|
||||
self.reconstruct()
|
||||
self.reconstruct(**override_options)
|
||||
total = (time.time() - start) / 60
|
||||
self.logger.info(
|
||||
_("Object reconstruction complete. (%.02f minutes)"), total)
|
||||
dump_recon_cache({'object_reconstruction_time': total,
|
||||
'object_reconstruction_last': time.time()},
|
||||
self.rcache, self.logger)
|
||||
self.final_recon_dump(total, **override_options)
|
||||
self.logger.debug('reconstruction sleeping for %s seconds.',
|
||||
self.interval)
|
||||
sleep(self.interval)
|
||||
|
@ -30,6 +30,7 @@ from numbers import Number
|
||||
from tempfile import NamedTemporaryFile
|
||||
import time
|
||||
import eventlet
|
||||
from eventlet import greenpool, debug as eventlet_debug
|
||||
from eventlet.green import socket
|
||||
from tempfile import mkdtemp
|
||||
from shutil import rmtree
|
||||
@ -218,6 +219,14 @@ class FakeRing(Ring):
|
||||
self.set_replicas(replicas)
|
||||
self._reload()
|
||||
|
||||
def has_changed(self):
|
||||
"""
|
||||
The real implementation uses getmtime on the serialized_path attribute,
|
||||
which doesn't exist on our fake and relies on the implementation of
|
||||
_reload which we override. So ... just NOOPE.
|
||||
"""
|
||||
return False
|
||||
|
||||
def _reload(self):
|
||||
self._rtime = time.time()
|
||||
|
||||
@ -690,6 +699,16 @@ if utils.config_true_value(
|
||||
fake_syslog_handler()
|
||||
|
||||
|
||||
@contextmanager
|
||||
def quiet_eventlet_exceptions():
|
||||
orig_state = greenpool.DEBUG
|
||||
eventlet_debug.hub_exceptions(False)
|
||||
try:
|
||||
yield
|
||||
finally:
|
||||
eventlet_debug.hub_exceptions(orig_state)
|
||||
|
||||
|
||||
class MockTrue(object):
|
||||
"""
|
||||
Instances of MockTrue evaluate like True
|
||||
|
@ -13,8 +13,6 @@
|
||||
# See the License for the specific language governing permissions and
|
||||
# limitations under the License.
|
||||
|
||||
# TODO(clayg): Test kill_children signal handlers
|
||||
|
||||
import os
|
||||
from six import StringIO
|
||||
from six.moves import reload_module
|
||||
@ -25,15 +23,20 @@ import logging
|
||||
from test.unit import tmpfile
|
||||
import mock
|
||||
import signal
|
||||
from contextlib import contextmanager
|
||||
import itertools
|
||||
from collections import defaultdict
|
||||
import errno
|
||||
|
||||
from swift.common import daemon, utils
|
||||
from test.unit import debug_logger
|
||||
|
||||
|
||||
class MyDaemon(daemon.Daemon):
|
||||
|
||||
def __init__(self, conf):
|
||||
self.conf = conf
|
||||
self.logger = utils.get_logger(None, 'server', log_route='server')
|
||||
self.logger = debug_logger('my-daemon')
|
||||
MyDaemon.forever_called = False
|
||||
MyDaemon.once_called = False
|
||||
|
||||
@ -63,6 +66,39 @@ class TestDaemon(unittest.TestCase):
|
||||
self.assertRaises(NotImplementedError, d.run_forever)
|
||||
|
||||
|
||||
class MyWorkerDaemon(MyDaemon):
|
||||
|
||||
def get_worker_args(self, once=False, **kwargs):
|
||||
return [kwargs for i in range(int(self.conf.get('workers', 0)))]
|
||||
|
||||
def is_healthy(self):
|
||||
try:
|
||||
return getattr(self, 'health_side_effects', []).pop(0)
|
||||
except IndexError:
|
||||
return True
|
||||
|
||||
|
||||
class TestWorkerDaemon(unittest.TestCase):
|
||||
|
||||
def test_stubs(self):
|
||||
d = daemon.Daemon({})
|
||||
self.assertRaises(NotImplementedError, d.run_once)
|
||||
self.assertRaises(NotImplementedError, d.run_forever)
|
||||
self.assertEqual([], d.get_worker_args())
|
||||
self.assertEqual(True, d.is_healthy())
|
||||
|
||||
def test_my_worker_daemon(self):
|
||||
d = MyWorkerDaemon({})
|
||||
self.assertEqual([], d.get_worker_args())
|
||||
self.assertTrue(d.is_healthy())
|
||||
d = MyWorkerDaemon({'workers': '3'})
|
||||
self.assertEqual([{'key': 'val'}] * 3, d.get_worker_args(key='val'))
|
||||
d.health_side_effects = [True, False]
|
||||
self.assertTrue(d.is_healthy())
|
||||
self.assertFalse(d.is_healthy())
|
||||
self.assertTrue(d.is_healthy())
|
||||
|
||||
|
||||
class TestRunDaemon(unittest.TestCase):
|
||||
|
||||
def setUp(self):
|
||||
@ -89,7 +125,7 @@ class TestRunDaemon(unittest.TestCase):
|
||||
d = MyDaemon({})
|
||||
with mock.patch('swift.common.daemon.signal') as mock_signal:
|
||||
mock_signal.SIGTERM = signal.SIGTERM
|
||||
d.run()
|
||||
daemon.DaemonStrategy(d, d.logger).run()
|
||||
signal_args, kwargs = mock_signal.signal.call_args
|
||||
sig, func = signal_args
|
||||
self.assertEqual(sig, signal.SIGTERM)
|
||||
@ -158,6 +194,169 @@ class TestRunDaemon(unittest.TestCase):
|
||||
os.environ['TZ'] = old_tz
|
||||
time.tzset()
|
||||
|
||||
@contextmanager
|
||||
def mock_os(self, child_worker_cycles=3):
|
||||
self.waitpid_calls = defaultdict(int)
|
||||
|
||||
def mock_waitpid(p, *args):
|
||||
self.waitpid_calls[p] += 1
|
||||
if self.waitpid_calls[p] >= child_worker_cycles:
|
||||
rv = p
|
||||
else:
|
||||
rv = 0
|
||||
return rv, 0
|
||||
with mock.patch('swift.common.daemon.os.fork') as mock_fork, \
|
||||
mock.patch('swift.common.daemon.os.waitpid', mock_waitpid), \
|
||||
mock.patch('swift.common.daemon.os.kill') as mock_kill:
|
||||
mock_fork.side_effect = (
|
||||
'mock-pid-%s' % i for i in itertools.count())
|
||||
self.mock_fork = mock_fork
|
||||
self.mock_kill = mock_kill
|
||||
yield
|
||||
|
||||
def test_fork_workers(self):
|
||||
d = MyWorkerDaemon({'workers': 3})
|
||||
strategy = daemon.DaemonStrategy(d, d.logger)
|
||||
with self.mock_os():
|
||||
strategy.run(once=True)
|
||||
self.assertEqual([mock.call()] * 3, self.mock_fork.call_args_list)
|
||||
self.assertEqual(self.waitpid_calls, {
|
||||
'mock-pid-0': 3,
|
||||
'mock-pid-1': 3,
|
||||
'mock-pid-2': 3,
|
||||
})
|
||||
self.assertEqual([], self.mock_kill.call_args_list)
|
||||
self.assertIn('Finished', d.logger.get_lines_for_level('notice')[-1])
|
||||
|
||||
def test_forked_worker(self):
|
||||
d = MyWorkerDaemon({'workers': 3})
|
||||
strategy = daemon.DaemonStrategy(d, d.logger)
|
||||
with mock.patch('swift.common.daemon.os.fork') as mock_fork, \
|
||||
mock.patch('swift.common.daemon.os._exit') as mock_exit:
|
||||
mock_fork.return_value = 0
|
||||
mock_exit.side_effect = SystemExit
|
||||
self.assertRaises(SystemExit, strategy.run, once=True)
|
||||
self.assertTrue(d.once_called)
|
||||
|
||||
def test_restart_workers(self):
|
||||
d = MyWorkerDaemon({'workers': 3})
|
||||
strategy = daemon.DaemonStrategy(d, d.logger)
|
||||
d.health_side_effects = [True, False]
|
||||
with self.mock_os():
|
||||
self.mock_kill.side_effect = lambda *args, **kwargs: setattr(
|
||||
strategy, 'running', False)
|
||||
strategy.run()
|
||||
# six workers forked in total
|
||||
self.assertEqual([mock.call()] * 6, self.mock_fork.call_args_list)
|
||||
# since the daemon starts healthy, first pass checks children once
|
||||
self.assertEqual(self.waitpid_calls, {
|
||||
'mock-pid-0': 1,
|
||||
'mock-pid-1': 1,
|
||||
'mock-pid-2': 1,
|
||||
})
|
||||
# second pass is not healthy, original pid's killed
|
||||
self.assertEqual(set([
|
||||
('mock-pid-0', signal.SIGTERM),
|
||||
('mock-pid-1', signal.SIGTERM),
|
||||
('mock-pid-2', signal.SIGTERM),
|
||||
]), set(c[0] for c in self.mock_kill.call_args_list[:3]))
|
||||
# our mock_kill side effect breaks out of running, and cleanup kills
|
||||
# remaining pids
|
||||
self.assertEqual(set([
|
||||
('mock-pid-3', signal.SIGTERM),
|
||||
('mock-pid-4', signal.SIGTERM),
|
||||
('mock-pid-5', signal.SIGTERM),
|
||||
]), set(c[0] for c in self.mock_kill.call_args_list[3:]))
|
||||
|
||||
def test_worker_disappears(self):
|
||||
d = MyWorkerDaemon({'workers': 3})
|
||||
strategy = daemon.DaemonStrategy(d, d.logger)
|
||||
strategy.register_worker_start('mock-pid', {'mock_options': True})
|
||||
self.assertEqual(strategy.unspawned_worker_options, [])
|
||||
self.assertEqual(strategy.options_by_pid, {
|
||||
'mock-pid': {'mock_options': True}
|
||||
})
|
||||
# still running
|
||||
with mock.patch('swift.common.daemon.os.waitpid') as mock_waitpid:
|
||||
mock_waitpid.return_value = (0, 0)
|
||||
strategy.check_on_all_running_workers()
|
||||
self.assertEqual(strategy.unspawned_worker_options, [])
|
||||
self.assertEqual(strategy.options_by_pid, {
|
||||
'mock-pid': {'mock_options': True}
|
||||
})
|
||||
# finished
|
||||
strategy = daemon.DaemonStrategy(d, d.logger)
|
||||
strategy.register_worker_start('mock-pid', {'mock_options': True})
|
||||
with mock.patch('swift.common.daemon.os.waitpid') as mock_waitpid:
|
||||
mock_waitpid.return_value = ('mock-pid', 0)
|
||||
strategy.check_on_all_running_workers()
|
||||
self.assertEqual(strategy.unspawned_worker_options, [
|
||||
{'mock_options': True}])
|
||||
self.assertEqual(strategy.options_by_pid, {})
|
||||
self.assertEqual(d.logger.get_lines_for_level('debug')[-1],
|
||||
'Worker mock-pid exited')
|
||||
# disappeared
|
||||
strategy = daemon.DaemonStrategy(d, d.logger)
|
||||
strategy.register_worker_start('mock-pid', {'mock_options': True})
|
||||
with mock.patch('swift.common.daemon.os.waitpid') as mock_waitpid:
|
||||
mock_waitpid.side_effect = OSError(
|
||||
errno.ECHILD, os.strerror(errno.ECHILD))
|
||||
mock_waitpid.return_value = ('mock-pid', 0)
|
||||
strategy.check_on_all_running_workers()
|
||||
self.assertEqual(strategy.unspawned_worker_options, [
|
||||
{'mock_options': True}])
|
||||
self.assertEqual(strategy.options_by_pid, {})
|
||||
self.assertEqual(d.logger.get_lines_for_level('notice')[-1],
|
||||
'Worker mock-pid died')
|
||||
|
||||
def test_worker_kills_pids_in_cleanup(self):
|
||||
d = MyWorkerDaemon({'workers': 2})
|
||||
strategy = daemon.DaemonStrategy(d, d.logger)
|
||||
strategy.register_worker_start('mock-pid-1', {'mock_options': True})
|
||||
strategy.register_worker_start('mock-pid-2', {'mock_options': True})
|
||||
self.assertEqual(strategy.unspawned_worker_options, [])
|
||||
self.assertEqual(strategy.options_by_pid, {
|
||||
'mock-pid-1': {'mock_options': True},
|
||||
'mock-pid-2': {'mock_options': True},
|
||||
})
|
||||
with mock.patch('swift.common.daemon.os.kill') as mock_kill:
|
||||
strategy.cleanup()
|
||||
self.assertEqual(strategy.unspawned_worker_options, [
|
||||
{'mock_options': True}] * 2)
|
||||
self.assertEqual(strategy.options_by_pid, {})
|
||||
self.assertEqual(set([
|
||||
('mock-pid-1', signal.SIGTERM),
|
||||
('mock-pid-2', signal.SIGTERM),
|
||||
]), set(c[0] for c in mock_kill.call_args_list))
|
||||
self.assertEqual(set(d.logger.get_lines_for_level('debug')[-2:]),
|
||||
set(['Cleaned up worker mock-pid-1',
|
||||
'Cleaned up worker mock-pid-2']))
|
||||
|
||||
def test_worker_disappears_in_cleanup(self):
|
||||
d = MyWorkerDaemon({'workers': 2})
|
||||
strategy = daemon.DaemonStrategy(d, d.logger)
|
||||
strategy.register_worker_start('mock-pid-1', {'mock_options': True})
|
||||
strategy.register_worker_start('mock-pid-2', {'mock_options': True})
|
||||
self.assertEqual(strategy.unspawned_worker_options, [])
|
||||
self.assertEqual(strategy.options_by_pid, {
|
||||
'mock-pid-1': {'mock_options': True},
|
||||
'mock-pid-2': {'mock_options': True},
|
||||
})
|
||||
with mock.patch('swift.common.daemon.os.kill') as mock_kill:
|
||||
mock_kill.side_effect = [None, OSError(errno.ECHILD,
|
||||
os.strerror(errno.ECHILD))]
|
||||
strategy.cleanup()
|
||||
self.assertEqual(strategy.unspawned_worker_options, [
|
||||
{'mock_options': True}] * 2)
|
||||
self.assertEqual(strategy.options_by_pid, {})
|
||||
self.assertEqual(set([
|
||||
('mock-pid-1', signal.SIGTERM),
|
||||
('mock-pid-2', signal.SIGTERM),
|
||||
]), set(c[0] for c in mock_kill.call_args_list))
|
||||
self.assertEqual(set(d.logger.get_lines_for_level('debug')[-2:]),
|
||||
set(['Cleaned up worker mock-pid-1',
|
||||
'Cleaned up worker mock-pid-2']))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -26,7 +26,7 @@ import re
|
||||
import random
|
||||
import struct
|
||||
import collections
|
||||
from eventlet import Timeout, sleep
|
||||
from eventlet import Timeout, sleep, spawn
|
||||
|
||||
from contextlib import closing, contextmanager
|
||||
from gzip import GzipFile
|
||||
@ -35,6 +35,7 @@ from six.moves.urllib.parse import unquote
|
||||
from swift.common import utils
|
||||
from swift.common.exceptions import DiskFileError
|
||||
from swift.common.header_key_dict import HeaderKeyDict
|
||||
from swift.common.utils import dump_recon_cache
|
||||
from swift.obj import diskfile, reconstructor as object_reconstructor
|
||||
from swift.common import ring
|
||||
from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
|
||||
@ -43,7 +44,8 @@ from swift.obj.reconstructor import REVERT
|
||||
|
||||
from test.unit import (patch_policies, debug_logger, mocked_http_conn,
|
||||
FabricatedRing, make_timestamp_iter,
|
||||
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies)
|
||||
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies,
|
||||
quiet_eventlet_exceptions)
|
||||
from test.unit.obj.common import write_diskfile
|
||||
|
||||
|
||||
@ -1343,6 +1345,931 @@ class TestGlobalSetupObjectReconstructorLegacyDurable(
|
||||
legacy_durable = True
|
||||
|
||||
|
||||
@patch_policies(with_ec_default=True)
|
||||
class TestWorkerReconstructor(unittest.TestCase):
|
||||
|
||||
maxDiff = None
|
||||
|
||||
def setUp(self):
|
||||
super(TestWorkerReconstructor, self).setUp()
|
||||
self.logger = debug_logger()
|
||||
self.testdir = tempfile.mkdtemp()
|
||||
self.recon_cache_path = os.path.join(self.testdir, 'recon')
|
||||
self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
|
||||
# dump_recon_cache expects recon_cache_path to exist
|
||||
os.mkdir(self.recon_cache_path)
|
||||
|
||||
def tearDown(self):
|
||||
super(TestWorkerReconstructor, self).tearDown()
|
||||
shutil.rmtree(self.testdir)
|
||||
|
||||
def test_no_workers_by_default(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{}, logger=self.logger)
|
||||
self.assertEqual(0, reconstructor.reconstructor_workers)
|
||||
self.assertEqual(0, len(list(reconstructor.get_worker_args())))
|
||||
|
||||
def test_bad_value_workers(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'reconstructor_workers': '-1'}, logger=self.logger)
|
||||
self.assertEqual(-1, reconstructor.reconstructor_workers)
|
||||
self.assertEqual(0, len(list(reconstructor.get_worker_args())))
|
||||
|
||||
def test_workers_with_no_devices(self):
|
||||
def do_test(num_workers):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'reconstructor_workers': num_workers}, logger=self.logger)
|
||||
self.assertEqual(num_workers, reconstructor.reconstructor_workers)
|
||||
self.assertEqual(1, len(list(reconstructor.get_worker_args())))
|
||||
self.assertEqual([
|
||||
{'override_partitions': [], 'override_devices': []},
|
||||
], list(reconstructor.get_worker_args()))
|
||||
do_test(1)
|
||||
do_test(10)
|
||||
|
||||
def test_workers_with_devices_and_no_valid_overrides(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'reconstructor_workers': '2'}, logger=self.logger)
|
||||
reconstructor.get_local_devices = lambda: ['sdb', 'sdc']
|
||||
self.assertEqual(2, reconstructor.reconstructor_workers)
|
||||
# N.B. sdz is not in local_devices so there are no devices to process
|
||||
# but still expect a single worker process
|
||||
worker_args = list(reconstructor.get_worker_args(
|
||||
once=True, devices='sdz'))
|
||||
self.assertEqual(1, len(worker_args))
|
||||
self.assertEqual([{'override_partitions': [],
|
||||
'override_devices': ['sdz']}],
|
||||
worker_args)
|
||||
# overrides are ignored in forever mode
|
||||
worker_args = list(reconstructor.get_worker_args(
|
||||
once=False, devices='sdz'))
|
||||
self.assertEqual(2, len(worker_args))
|
||||
self.assertEqual([
|
||||
{'override_partitions': [], 'override_devices': ['sdb']},
|
||||
{'override_partitions': [], 'override_devices': ['sdc']}
|
||||
], worker_args)
|
||||
|
||||
def test_workers_with_devices(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'reconstructor_workers': '2'}, logger=self.logger)
|
||||
reconstructor.get_local_devices = lambda: ['sdb', 'sdc']
|
||||
self.assertEqual(2, reconstructor.reconstructor_workers)
|
||||
self.assertEqual(2, len(list(reconstructor.get_worker_args())))
|
||||
expected = [
|
||||
{'override_partitions': [], 'override_devices': ['sdb']},
|
||||
{'override_partitions': [], 'override_devices': ['sdc']},
|
||||
]
|
||||
worker_args = list(reconstructor.get_worker_args(once=False))
|
||||
self.assertEqual(2, len(worker_args))
|
||||
self.assertEqual(expected, worker_args)
|
||||
worker_args = list(reconstructor.get_worker_args(once=True))
|
||||
self.assertEqual(2, len(worker_args))
|
||||
self.assertEqual(expected, worker_args)
|
||||
|
||||
def test_workers_with_devices_and_overrides(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'reconstructor_workers': '2'}, logger=self.logger)
|
||||
reconstructor.get_local_devices = lambda: ['sdb', 'sdc']
|
||||
self.assertEqual(2, reconstructor.reconstructor_workers)
|
||||
# check we don't get more workers than override devices...
|
||||
# N.B. sdz is not in local_devices so should be ignored for the
|
||||
# purposes of generating workers
|
||||
worker_args = list(reconstructor.get_worker_args(
|
||||
once=True, devices='sdb,sdz', partitions='99,333'))
|
||||
self.assertEqual(1, len(worker_args))
|
||||
self.assertEqual(
|
||||
[{'override_partitions': [99, 333], 'override_devices': ['sdb']}],
|
||||
worker_args)
|
||||
# overrides are ignored in forever mode
|
||||
worker_args = list(reconstructor.get_worker_args(
|
||||
once=False, devices='sdb,sdz', partitions='99,333'))
|
||||
self.assertEqual(2, len(worker_args))
|
||||
self.assertEqual([
|
||||
{'override_partitions': [], 'override_devices': ['sdb']},
|
||||
{'override_partitions': [], 'override_devices': ['sdc']}
|
||||
], worker_args)
|
||||
|
||||
def test_workers_with_lots_of_devices(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'reconstructor_workers': '2'}, logger=self.logger)
|
||||
reconstructor.get_local_devices = lambda: [
|
||||
'sdb', 'sdc', 'sdd', 'sde', 'sdf']
|
||||
self.assertEqual(2, reconstructor.reconstructor_workers)
|
||||
self.assertEqual(2, len(list(reconstructor.get_worker_args())))
|
||||
self.assertEqual([
|
||||
{'override_partitions': [], 'override_devices': [
|
||||
'sdb', 'sdd', 'sdf']},
|
||||
{'override_partitions': [], 'override_devices': [
|
||||
'sdc', 'sde']},
|
||||
], list(reconstructor.get_worker_args()))
|
||||
|
||||
def test_workers_with_lots_of_devices_and_overrides(self):
|
||||
# check that override devices get distributed across workers
|
||||
# in similar fashion to all devices
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'reconstructor_workers': '2'}, logger=self.logger)
|
||||
reconstructor.get_local_devices = lambda: [
|
||||
'sdb', 'sdc', 'sdd', 'sde', 'sdf']
|
||||
self.assertEqual(2, reconstructor.reconstructor_workers)
|
||||
worker_args = list(reconstructor.get_worker_args(
|
||||
once=True, devices='sdb,sdd,sdf', partitions='99,333'))
|
||||
self.assertEqual(1, len(worker_args))
|
||||
# 5 devices in total, 2 workers -> up to 3 devices per worker so a
|
||||
# single worker should handle the requested override devices
|
||||
self.assertEqual([
|
||||
{'override_partitions': [99, 333], 'override_devices': [
|
||||
'sdb', 'sdd', 'sdf']},
|
||||
], worker_args)
|
||||
|
||||
# with 4 override devices, expect 2 per worker
|
||||
worker_args = list(reconstructor.get_worker_args(
|
||||
once=True, devices='sdb,sdc,sdd,sdf', partitions='99,333'))
|
||||
self.assertEqual(2, len(worker_args))
|
||||
self.assertEqual([
|
||||
{'override_partitions': [99, 333], 'override_devices': [
|
||||
'sdb', 'sdd']},
|
||||
{'override_partitions': [99, 333], 'override_devices': [
|
||||
'sdc', 'sdf']},
|
||||
], worker_args)
|
||||
|
||||
def test_workers_with_lots_of_workers(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'reconstructor_workers': '10'}, logger=self.logger)
|
||||
reconstructor.get_local_devices = lambda: ['sdb', 'sdc']
|
||||
self.assertEqual(10, reconstructor.reconstructor_workers)
|
||||
self.assertEqual(2, len(list(reconstructor.get_worker_args())))
|
||||
self.assertEqual([
|
||||
{'override_partitions': [], 'override_devices': ['sdb']},
|
||||
{'override_partitions': [], 'override_devices': ['sdc']},
|
||||
], list(reconstructor.get_worker_args()))
|
||||
|
||||
def test_workers_with_lots_of_workers_and_devices(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'reconstructor_workers': '10'}, logger=self.logger)
|
||||
reconstructor.get_local_devices = lambda: [
|
||||
'sdb', 'sdc', 'sdd', 'sde', 'sdf']
|
||||
self.assertEqual(10, reconstructor.reconstructor_workers)
|
||||
self.assertEqual(5, len(list(reconstructor.get_worker_args())))
|
||||
self.assertEqual([
|
||||
{'override_partitions': [], 'override_devices': ['sdb']},
|
||||
{'override_partitions': [], 'override_devices': ['sdc']},
|
||||
{'override_partitions': [], 'override_devices': ['sdd']},
|
||||
{'override_partitions': [], 'override_devices': ['sde']},
|
||||
{'override_partitions': [], 'override_devices': ['sdf']},
|
||||
], list(reconstructor.get_worker_args()))
|
||||
|
||||
def test_workers_with_some_workers_and_devices(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{}, logger=self.logger)
|
||||
reconstructor.get_local_devices = lambda: [
|
||||
'd%s' % (i + 1) for i in range(21)]
|
||||
# ... with many devices per worker, worker count is pretty granular
|
||||
for i in range(1, 8):
|
||||
reconstructor.reconstructor_workers = i
|
||||
self.assertEqual(i, len(list(reconstructor.get_worker_args())))
|
||||
# ... then it gets sorta stair step
|
||||
for i in range(9, 10):
|
||||
reconstructor.reconstructor_workers = i
|
||||
self.assertEqual(7, len(list(reconstructor.get_worker_args())))
|
||||
# 2-3 devices per worker
|
||||
for args in reconstructor.get_worker_args():
|
||||
self.assertIn(len(args['override_devices']), (2, 3))
|
||||
for i in range(11, 20):
|
||||
reconstructor.reconstructor_workers = i
|
||||
self.assertEqual(11, len(list(reconstructor.get_worker_args())))
|
||||
# 1, 2 devices per worker
|
||||
for args in reconstructor.get_worker_args():
|
||||
self.assertIn(len(args['override_devices']), (1, 2))
|
||||
# this is debatable, but maybe I'll argue if you're going to have
|
||||
# *some* workers with > 1 device, it's better to have fewer workers
|
||||
# with devices spread out evenly than a couple outliers?
|
||||
self.assertEqual([
|
||||
{'override_partitions': [], 'override_devices': ['d1', 'd12']},
|
||||
{'override_partitions': [], 'override_devices': ['d2', 'd13']},
|
||||
{'override_partitions': [], 'override_devices': ['d3', 'd14']},
|
||||
{'override_partitions': [], 'override_devices': ['d4', 'd15']},
|
||||
{'override_partitions': [], 'override_devices': ['d5', 'd16']},
|
||||
{'override_partitions': [], 'override_devices': ['d6', 'd17']},
|
||||
{'override_partitions': [], 'override_devices': ['d7', 'd18']},
|
||||
{'override_partitions': [], 'override_devices': ['d8', 'd19']},
|
||||
{'override_partitions': [], 'override_devices': ['d9', 'd20']},
|
||||
{'override_partitions': [], 'override_devices': ['d10', 'd21']},
|
||||
{'override_partitions': [], 'override_devices': ['d11']},
|
||||
], list(reconstructor.get_worker_args()))
|
||||
# you can't get < than 1 device per worker
|
||||
for i in range(21, 52):
|
||||
reconstructor.reconstructor_workers = i
|
||||
self.assertEqual(21, len(list(reconstructor.get_worker_args())))
|
||||
for args in reconstructor.get_worker_args():
|
||||
self.assertEqual(1, len(args['override_devices']))
|
||||
|
||||
def test_next_rcache_update_configured_with_stats_interval(self):
|
||||
now = time.time()
|
||||
with mock.patch('swift.obj.reconstructor.time.time', return_value=now):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{}, logger=self.logger)
|
||||
self.assertEqual(now + 300, reconstructor._next_rcache_update)
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'stats_interval': '30'}, logger=self.logger)
|
||||
self.assertEqual(now + 30, reconstructor._next_rcache_update)
|
||||
|
||||
def test_is_healthy_rcache_update_waits_for_next_update(self):
|
||||
now = time.time()
|
||||
with mock.patch('swift.obj.reconstructor.time.time', return_value=now):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'recon_cache_path': self.recon_cache_path},
|
||||
logger=self.logger)
|
||||
# file does not exist to start
|
||||
self.assertFalse(os.path.exists(self.rcache))
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
# ... and isn't created until _next_rcache_update
|
||||
self.assertFalse(os.path.exists(self.rcache))
|
||||
# ... but if we wait 5 mins (by default)
|
||||
orig_next_update = reconstructor._next_rcache_update
|
||||
with mock.patch('swift.obj.reconstructor.time.time',
|
||||
return_value=now + 301):
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
self.assertGreater(reconstructor._next_rcache_update, orig_next_update)
|
||||
# ... it will be created
|
||||
self.assertTrue(os.path.exists(self.rcache))
|
||||
with open(self.rcache) as f:
|
||||
data = json.load(f)
|
||||
# and empty
|
||||
self.assertEqual({}, data)
|
||||
|
||||
def test_is_healthy_ring_update_next_check(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'recon_cache_path': self.recon_cache_path},
|
||||
logger=self.logger)
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
reconstructor.get_local_devices = lambda: {
|
||||
'sdb%d' % p for p in reconstructor.policies}
|
||||
self.assertFalse(reconstructor.is_healthy())
|
||||
reconstructor.all_local_devices = {
|
||||
'sdb%d' % p for p in reconstructor.policies}
|
||||
self.assertTrue(reconstructor.is_healthy())
|
||||
|
||||
def test_final_recon_dump(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'recon_cache_path': self.recon_cache_path},
|
||||
logger=self.logger)
|
||||
reconstructor.all_local_devices = ['sda', 'sdc']
|
||||
total = 12.0
|
||||
now = time.time()
|
||||
with mock.patch('swift.obj.reconstructor.time.time', return_value=now):
|
||||
reconstructor.final_recon_dump(total)
|
||||
with open(self.rcache) as f:
|
||||
data = json.load(f)
|
||||
self.assertEqual({
|
||||
'object_reconstruction_last': now,
|
||||
'object_reconstruction_time': total,
|
||||
}, data)
|
||||
total = 14.0
|
||||
now += total * 60
|
||||
with mock.patch('swift.obj.reconstructor.time.time', return_value=now):
|
||||
reconstructor.final_recon_dump(total)
|
||||
with open(self.rcache) as f:
|
||||
data = json.load(f)
|
||||
self.assertEqual({
|
||||
'object_reconstruction_last': now,
|
||||
'object_reconstruction_time': total,
|
||||
}, data)
|
||||
# per_disk_stats with workers
|
||||
reconstructor.reconstructor_workers = 1
|
||||
old_total = total
|
||||
total = 16.0
|
||||
before = now
|
||||
now += total * 60
|
||||
with mock.patch('swift.obj.reconstructor.time.time',
|
||||
return_value=now), \
|
||||
mock.patch('swift.obj.reconstructor.os.getpid',
|
||||
return_value='pid-1'):
|
||||
reconstructor.final_recon_dump(total, override_devices=[
|
||||
'sda', 'sdc'])
|
||||
with open(self.rcache) as f:
|
||||
data = json.load(f)
|
||||
self.assertEqual({
|
||||
'object_reconstruction_last': before,
|
||||
'object_reconstruction_time': old_total,
|
||||
'object_reconstruction_per_disk': {
|
||||
'sda': {
|
||||
'object_reconstruction_last': now,
|
||||
'object_reconstruction_time': total,
|
||||
'pid': 'pid-1',
|
||||
},
|
||||
'sdc': {
|
||||
'object_reconstruction_last': now,
|
||||
'object_reconstruction_time': total,
|
||||
'pid': 'pid-1',
|
||||
},
|
||||
|
||||
},
|
||||
}, data)
|
||||
# and without workers we clear it out
|
||||
reconstructor.reconstructor_workers = 0
|
||||
total = 18.0
|
||||
now += total * 60
|
||||
with mock.patch('swift.obj.reconstructor.time.time', return_value=now):
|
||||
reconstructor.final_recon_dump(total)
|
||||
with open(self.rcache) as f:
|
||||
data = json.load(f)
|
||||
self.assertEqual({
|
||||
'object_reconstruction_last': now,
|
||||
'object_reconstruction_time': total,
|
||||
}, data)
|
||||
|
||||
def test_dump_recon_run_once_inline(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'recon_cache_path': self.recon_cache_path},
|
||||
logger=self.logger)
|
||||
reconstructor.reconstruct = mock.MagicMock()
|
||||
now = time.time()
|
||||
later = now + 300 # 5 mins
|
||||
with mock.patch('swift.obj.reconstructor.time.time', side_effect=[
|
||||
now, later, later]):
|
||||
reconstructor.run_once()
|
||||
# no override args passed to reconstruct
|
||||
self.assertEqual([mock.call(
|
||||
override_devices=[],
|
||||
override_partitions=[]
|
||||
)], reconstructor.reconstruct.call_args_list)
|
||||
# script mode with no override args, we expect recon dumps
|
||||
self.assertTrue(os.path.exists(self.rcache))
|
||||
with open(self.rcache) as f:
|
||||
data = json.load(f)
|
||||
self.assertEqual({
|
||||
'object_reconstruction_last': later,
|
||||
'object_reconstruction_time': 5.0,
|
||||
}, data)
|
||||
total = 10.0
|
||||
later += total * 60
|
||||
with mock.patch('swift.obj.reconstructor.time.time',
|
||||
return_value=later):
|
||||
reconstructor.final_recon_dump(total)
|
||||
with open(self.rcache) as f:
|
||||
data = json.load(f)
|
||||
self.assertEqual({
|
||||
'object_reconstruction_last': later,
|
||||
'object_reconstruction_time': 10.0,
|
||||
}, data)
|
||||
|
||||
def test_dump_recon_run_once_in_worker(self):
|
||||
reconstructor = object_reconstructor.ObjectReconstructor(
|
||||
{'recon_cache_path': self.recon_cache_path,
|
||||
'reconstructor_workers': 1},
|
||||
logger=self.logger)
|
||||
reconstructor.get_local_devices = lambda: {'sda'}
|
||||
now = time.time()
|
||||
later = now + 300 # 5 mins
|
||||
|
||||
def do_test(run_kwargs, expected_device):
|
||||
# get the actual kwargs that would be passed to run_once in a
|
||||
# worker
|
||||
run_once_kwargs = list(
|
||||
reconstructor.get_worker_args(once=True, **run_kwargs))[0]
|
||||
reconstructor.reconstruct = mock.MagicMock()
|
||||
with mock.patch('swift.obj.reconstructor.time.time',
|
||||
side_effect=[now, later, later]):
|
||||
reconstructor.run_once(**run_once_kwargs)
|
||||
self.assertEqual([mock.call(
|
||||
override_devices=[expected_device],
|
||||
override_partitions=[]
|
||||
)], reconstructor.reconstruct.call_args_list)
|
||||
self.assertTrue(os.path.exists(self.rcache))
|
||||
with open(self.rcache) as f:
|
||||
data = json.load(f)
|
||||
self.assertEqual({
|
||||
# no aggregate is written but perhaps it should be, in which
|
||||
# case this assertion will need to change
|
||||
'object_reconstruction_per_disk': {
|
||||
expected_device: {
|
||||
'object_reconstruction_last': later,
|
||||
'object_reconstruction_time': 5.0,
|
||||
'pid': mock.ANY
|
||||
}
|
||||
}
|
||||
}, data)
|
||||
|
||||
# script mode with no CLI override args, we expect recon dumps
|
||||
do_test({}, 'sda')
|
||||
# script mode *with* CLI override devices, we expect recon dumps
|
||||
os.unlink(self.rcache)
|
||||
do_test(dict(devices='sda'), 'sda')
|
||||
# if the override device is not in local devices we still get
|
||||
# a recon dump, but it'll get cleaned up in the next aggregation
|
||||
os.unlink(self.rcache)
|
||||
do_test(dict(devices='sdz'), 'sdz')
|
||||
|
||||
# now disable workers and check that inline run_once updates rcache
|
||||
# and clears out per disk stats
|
||||
now = time.time()
|
||||
later = now + 600 # 10 mins
|
||||
reconstructor.reconstructor_workers = 0
|
||||
with mock.patch('swift.obj.reconstructor.time.time',
|
||||
side_effect=[now, later, later]):
|
||||
reconstructor.run_once()
|
||||
with open(self.rcache) as f:
|
||||
data = json.load(f)
|
||||
self.assertEqual({
|
||||
'object_reconstruction_last': later,
|
||||
'object_reconstruction_time': 10.0,
|
||||
}, data)
|
||||
|
||||
    def test_no_dump_recon_run_once(self):
        reconstructor = object_reconstructor.ObjectReconstructor(
            {'recon_cache_path': self.recon_cache_path},
            logger=self.logger)
        reconstructor.get_local_devices = lambda: {'sda', 'sdb', 'sdc'}

        def do_test(run_once_kwargs, expected_devices, expected_partitions):
            reconstructor.reconstruct = mock.MagicMock()
            now = time.time()
            later = now + 300  # 5 mins
            with mock.patch('swift.obj.reconstructor.time.time', side_effect=[
                    now, later, later]):
                reconstructor.run_once(**run_once_kwargs)
            # override args passed to reconstruct
            actual_calls = reconstructor.reconstruct.call_args_list
            self.assertEqual({'override_devices', 'override_partitions'},
                             set(actual_calls[0][1]))
            self.assertEqual(sorted(expected_devices),
                             sorted(actual_calls[0][1]['override_devices']))
            self.assertEqual(sorted(expected_partitions),
                             sorted(actual_calls[0][1]['override_partitions']))
            self.assertFalse(actual_calls[1:])
            self.assertEqual(False, os.path.exists(self.rcache))

        # inline mode with overrides never does recon dump
        reconstructor.reconstructor_workers = 0
        kwargs = {'devices': 'sda,sdb'}
        do_test(kwargs, ['sda', 'sdb'], [])

        # Have partition override, so no recon dump
        kwargs = {'partitions': '1,2,3'}
        do_test(kwargs, [], [1, 2, 3])
        reconstructor.reconstructor_workers = 1
        worker_kwargs = list(
            reconstructor.get_worker_args(once=True, **kwargs))[0]
        do_test(worker_kwargs, ['sda', 'sdb', 'sdc'], [1, 2, 3])

        reconstructor.reconstructor_workers = 0
        kwargs = {'devices': 'sda,sdb', 'partitions': '1,2,3'}
        do_test(kwargs, ['sda', 'sdb'], [1, 2, 3])
        reconstructor.reconstructor_workers = 1
        worker_kwargs = list(
            reconstructor.get_worker_args(once=True, **kwargs))[0]
        do_test(worker_kwargs, ['sda', 'sdb'], [1, 2, 3])

        # 'sdz' is not in local devices
        reconstructor.reconstructor_workers = 0
        kwargs = {'devices': 'sdz'}
        do_test(kwargs, ['sdz'], [])

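The comma-separated CLI overrides asserted above end up as list kwargs to reconstruct: 'sda,sdb' becomes ['sda', 'sdb'] and '1,2,3' becomes [1, 2, 3]. A hedged sketch of that parsing (parse_overrides is a hypothetical helper name used only for illustration, not necessarily what the reconstructor uses internally):

    def parse_overrides(devices='', partitions=''):
        # split CLI-style strings into the list kwargs the tests assert
        # reconstruct() receives; empty strings yield empty lists
        override_devices = [d for d in (devices or '').split(',') if d]
        override_partitions = [int(p) for p in (partitions or '').split(',') if p]
        return override_devices, override_partitions

    assert parse_overrides('sda,sdb', '1,2,3') == (['sda', 'sdb'], [1, 2, 3])
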
    def test_run_forever_recon_aggregation(self):

        class StopForever(Exception):
            pass

        reconstructor = object_reconstructor.ObjectReconstructor({
            'reconstructor_workers': 2,
            'recon_cache_path': self.recon_cache_path
        }, logger=self.logger)
        reconstructor.get_local_devices = lambda: ['sda', 'sdb', 'sdc', 'sdd']
        reconstructor.reconstruct = mock.MagicMock()
        now = time.time()
        later = now + 300  # 5 mins
        worker_args = list(
            # include 'devices' kwarg as a sanity check - it should be ignored
            # in run_forever mode
            reconstructor.get_worker_args(once=False, devices='sda'))
        with mock.patch('swift.obj.reconstructor.time.time',
                        side_effect=[now, later, later]), \
                mock.patch('swift.obj.reconstructor.os.getpid',
                           return_value='pid-1'), \
                mock.patch('swift.obj.reconstructor.sleep',
                           side_effect=[StopForever]), \
                Timeout(.3), quiet_eventlet_exceptions(), \
                self.assertRaises(StopForever):
            gt = spawn(reconstructor.run_forever, **worker_args[0])
            gt.wait()
        # override args are passed to reconstruct
        self.assertEqual([mock.call(
            override_devices=['sda', 'sdc'],
            override_partitions=[]
        )], reconstructor.reconstruct.call_args_list)
        # forever mode with override args, we expect per-disk recon dumps
        self.assertTrue(os.path.exists(self.rcache))
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual({
            'object_reconstruction_per_disk': {
                'sda': {
                    'object_reconstruction_last': later,
                    'object_reconstruction_time': 5.0,
                    'pid': 'pid-1',
                },
                'sdc': {
                    'object_reconstruction_last': later,
                    'object_reconstruction_time': 5.0,
                    'pid': 'pid-1',
                },
            }
        }, data)
        reconstructor.reconstruct.reset_mock()
        # another worker would get *different* disks
        before = now = later
        later = now + 300  # 5 more minutes
        with mock.patch('swift.obj.reconstructor.time.time',
                        side_effect=[now, later, later]), \
                mock.patch('swift.obj.reconstructor.os.getpid',
                           return_value='pid-2'), \
                mock.patch('swift.obj.reconstructor.sleep',
                           side_effect=[StopForever]), \
                Timeout(.3), quiet_eventlet_exceptions(), \
                self.assertRaises(StopForever):
            gt = spawn(reconstructor.run_forever, **worker_args[1])
            gt.wait()
        # override args are parsed
        self.assertEqual([mock.call(
            override_devices=['sdb', 'sdd'],
            override_partitions=[]
        )], reconstructor.reconstruct.call_args_list)
        # forever mode with override args, we expect per-disk recon dumps
        self.assertTrue(os.path.exists(self.rcache))
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual({
            'object_reconstruction_per_disk': {
                'sda': {
                    'object_reconstruction_last': before,
                    'object_reconstruction_time': 5.0,
                    'pid': 'pid-1',
                },
                'sdb': {
                    'object_reconstruction_last': later,
                    'object_reconstruction_time': 5.0,
                    'pid': 'pid-2',
                },
                'sdc': {
                    'object_reconstruction_last': before,
                    'object_reconstruction_time': 5.0,
                    'pid': 'pid-1',
                },
                'sdd': {
                    'object_reconstruction_last': later,
                    'object_reconstruction_time': 5.0,
                    'pid': 'pid-2',
                },
            }
        }, data)

        # aggregation is done in the parent thread even later
        reconstructor.aggregate_recon_update()
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual({
            'object_reconstruction_last': later,
            'object_reconstruction_time': 10.0,
            'object_reconstruction_per_disk': {
                'sda': {
                    'object_reconstruction_last': before,
                    'object_reconstruction_time': 5.0,
                    'pid': 'pid-1',
                },
                'sdb': {
                    'object_reconstruction_last': later,
                    'object_reconstruction_time': 5.0,
                    'pid': 'pid-2',
                },
                'sdc': {
                    'object_reconstruction_last': before,
                    'object_reconstruction_time': 5.0,
                    'pid': 'pid-1',
                },
                'sdd': {
                    'object_reconstruction_last': later,
                    'object_reconstruction_time': 5.0,
                    'pid': 'pid-2',
                },
            }
        }, data)

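In the test above the two worker_args entries split the four local devices into ['sda', 'sdc'] and ['sdb', 'sdd'], which is consistent with a simple round-robin assignment across workers. A sketch that reproduces that split (illustrative only, not necessarily the reconstructor's exact partitioning code):

    def distribute(devices, workers):
        # distribute(['sda', 'sdb', 'sdc', 'sdd'], 2)
        #     -> [['sda', 'sdc'], ['sdb', 'sdd']]
        buckets = [[] for _ in range(workers)]
        for i, dev in enumerate(devices):
            buckets[i % workers].append(dev)
        return buckets
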
    def test_recon_aggregation_waits_for_all_devices(self):
        reconstructor = object_reconstructor.ObjectReconstructor({
            'reconstructor_workers': 2,
            'recon_cache_path': self.recon_cache_path
        }, logger=self.logger)
        reconstructor.all_local_devices = set([
            'd0', 'd1', 'd2', 'd3',
            # unreported device definitely matters
            'd4'])
        start = time.time() - 1000
        for i in range(4):
            with mock.patch('swift.obj.reconstructor.time.time',
                            return_value=start + (300 * i)), \
                    mock.patch('swift.obj.reconstructor.os.getpid',
                               return_value='pid-%s' % i):
                reconstructor.final_recon_dump(
                    i, override_devices=['d%s' % i])
        # sanity
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual({
            'object_reconstruction_per_disk': {
                'd0': {
                    'object_reconstruction_last': start,
                    'object_reconstruction_time': 0.0,
                    'pid': 'pid-0',
                },
                'd1': {
                    'object_reconstruction_last': start + 300,
                    'object_reconstruction_time': 1,
                    'pid': 'pid-1',
                },
                'd2': {
                    'object_reconstruction_last': start + 600,
                    'object_reconstruction_time': 2,
                    'pid': 'pid-2',
                },
                'd3': {
                    'object_reconstruction_last': start + 900,
                    'object_reconstruction_time': 3,
                    'pid': 'pid-3',
                },
            }
        }, data)

        # unreported device d4 prevents aggregation
        reconstructor.aggregate_recon_update()
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertNotIn('object_reconstruction_last', data)
        self.assertNotIn('object_reconstruction_time', data)
        self.assertEqual(set(['d0', 'd1', 'd2', 'd3']),
                         set(data['object_reconstruction_per_disk'].keys()))

        # it's idempotent
        reconstructor.aggregate_recon_update()
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertNotIn('object_reconstruction_last', data)
        self.assertNotIn('object_reconstruction_time', data)
        self.assertEqual(set(['d0', 'd1', 'd2', 'd3']),
                         set(data['object_reconstruction_per_disk'].keys()))

        # remove d4, we no longer wait on it for aggregation
        reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3'])
        reconstructor.aggregate_recon_update()
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual(start + 900, data['object_reconstruction_last'])
        self.assertEqual(15, data['object_reconstruction_time'])
        self.assertEqual(set(['d0', 'd1', 'd2', 'd3']),
                         set(data['object_reconstruction_per_disk'].keys()))

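The expected aggregate of 15 follows from the per-disk entries if the aggregate spans from the earliest implied start time to the latest finish time, with object_reconstruction_time kept in minutes. This arithmetic is inferred from the expected values in the test rather than copied from aggregate_recon_update, but it reproduces the 15 asserted above, as well as the 11 and 6 asserted in the next test once devices drop out:

    start = 1500000000.0  # arbitrary epoch reference standing in for the test's 'start'
    per_disk = {'d0': (start, 0), 'd1': (start + 300, 1),
                'd2': (start + 600, 2), 'd3': (start + 900, 3)}  # (last, minutes)
    newest_finish = max(last for last, _ in per_disk.values())
    oldest_start = min(last - mins * 60 for last, mins in per_disk.values())
    assert newest_finish == start + 900                    # object_reconstruction_last
    assert (newest_finish - oldest_start) / 60 == 15       # object_reconstruction_time
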
    def test_recon_aggregation_removes_devices(self):
        reconstructor = object_reconstructor.ObjectReconstructor({
            'reconstructor_workers': 2,
            'recon_cache_path': self.recon_cache_path
        }, logger=self.logger)
        reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3'])
        start = time.time() - 1000
        for i in range(4):
            with mock.patch('swift.obj.reconstructor.time.time',
                            return_value=start + (300 * i)), \
                    mock.patch('swift.obj.reconstructor.os.getpid',
                               return_value='pid-%s' % i):
                reconstructor.final_recon_dump(
                    i, override_devices=['d%s' % i])
        # sanity
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual({
            'object_reconstruction_per_disk': {
                'd0': {
                    'object_reconstruction_last': start,
                    'object_reconstruction_time': 0.0,
                    'pid': 'pid-0',
                },
                'd1': {
                    'object_reconstruction_last': start + 300,
                    'object_reconstruction_time': 1,
                    'pid': 'pid-1',
                },
                'd2': {
                    'object_reconstruction_last': start + 600,
                    'object_reconstruction_time': 2,
                    'pid': 'pid-2',
                },
                'd3': {
                    'object_reconstruction_last': start + 900,
                    'object_reconstruction_time': 3,
                    'pid': 'pid-3',
                },
            }
        }, data)

        reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3'])
        reconstructor.aggregate_recon_update()
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual(start + 900, data['object_reconstruction_last'])
        self.assertEqual(15, data['object_reconstruction_time'])
        self.assertEqual(set(['d0', 'd1', 'd2', 'd3']),
                         set(data['object_reconstruction_per_disk'].keys()))

        # it's idempotent
        reconstructor.aggregate_recon_update()
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual({
            'object_reconstruction_last': start + 900,
            'object_reconstruction_time': 15,
            'object_reconstruction_per_disk': {
                'd0': {
                    'object_reconstruction_last': start,
                    'object_reconstruction_time': 0.0,
                    'pid': 'pid-0',
                },
                'd1': {
                    'object_reconstruction_last': start + 300,
                    'object_reconstruction_time': 1,
                    'pid': 'pid-1',
                },
                'd2': {
                    'object_reconstruction_last': start + 600,
                    'object_reconstruction_time': 2,
                    'pid': 'pid-2',
                },
                'd3': {
                    'object_reconstruction_last': start + 900,
                    'object_reconstruction_time': 3,
                    'pid': 'pid-3',
                },
            }
        }, data)

        # if a device is removed from the ring
        reconstructor.all_local_devices = set(['d1', 'd2', 'd3'])
        reconstructor.aggregate_recon_update()
        with open(self.rcache) as f:
            data = json.load(f)
        # ... its per-disk stats are removed (d0)
        self.assertEqual({
            'object_reconstruction_last': start + 900,
            'object_reconstruction_time': 11,
            'object_reconstruction_per_disk': {
                'd1': {
                    'object_reconstruction_last': start + 300,
                    'object_reconstruction_time': 1,
                    'pid': 'pid-1',
                },
                'd2': {
                    'object_reconstruction_last': start + 600,
                    'object_reconstruction_time': 2,
                    'pid': 'pid-2',
                },
                'd3': {
                    'object_reconstruction_last': start + 900,
                    'object_reconstruction_time': 3,
                    'pid': 'pid-3',
                },
            }
        }, data)

        # which can affect the aggregates!
        reconstructor.all_local_devices = set(['d1', 'd2'])
        reconstructor.aggregate_recon_update()
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual({
            'object_reconstruction_last': start + 600,
            'object_reconstruction_time': 6,
            'object_reconstruction_per_disk': {
                'd1': {
                    'object_reconstruction_last': start + 300,
                    'object_reconstruction_time': 1,
                    'pid': 'pid-1',
                },
                'd2': {
                    'object_reconstruction_last': start + 600,
                    'object_reconstruction_time': 2,
                    'pid': 'pid-2',
                },
            }
        }, data)

    def test_recon_aggregation_races_with_final_recon_dump(self):
        reconstructor = object_reconstructor.ObjectReconstructor({
            'reconstructor_workers': 2,
            'recon_cache_path': self.recon_cache_path
        }, logger=self.logger)
        reconstructor.all_local_devices = set(['d0', 'd1'])
        start = time.time() - 1000
        # first worker dumps to recon cache
        with mock.patch('swift.obj.reconstructor.time.time',
                        return_value=start), \
                mock.patch('swift.obj.reconstructor.os.getpid',
                           return_value='pid-0'):
            reconstructor.final_recon_dump(
                1, override_devices=['d0'])
        # sanity
        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual({
            'object_reconstruction_per_disk': {
                'd0': {
                    'object_reconstruction_last': start,
                    'object_reconstruction_time': 1,
                    'pid': 'pid-0',
                },
            }
        }, data)

        # simulate a second worker concurrently dumping to recon cache while
        # parent is aggregating existing results; mock dump_recon_cache as a
        # convenient way to interrupt parent aggregate_recon_update and 'pass
        # control' to second worker
        updated_data = []  # state of recon cache just after second worker dump

        def simulate_other_process_final_recon_dump():
            with mock.patch('swift.obj.reconstructor.time.time',
                            return_value=start + 999), \
                    mock.patch('swift.obj.reconstructor.os.getpid',
                               return_value='pid-1'):
                reconstructor.final_recon_dump(
                    1000, override_devices=['d1'])
                with open(self.rcache) as f:
                    updated_data.append(json.load(f))

        def fake_dump_recon_cache(*args, **kwargs):
            # temporarily put back real dump_recon_cache
            with mock.patch('swift.obj.reconstructor.dump_recon_cache',
                            dump_recon_cache):
                simulate_other_process_final_recon_dump()
            # and now proceed with parent dump_recon_cache
            dump_recon_cache(*args, **kwargs)

        reconstructor.dump_recon_cache = fake_dump_recon_cache
        with mock.patch('swift.obj.reconstructor.dump_recon_cache',
                        fake_dump_recon_cache):
            reconstructor.aggregate_recon_update()

        self.assertEqual([{  # sanity check - second process did dump its data
            'object_reconstruction_per_disk': {
                'd0': {
                    'object_reconstruction_last': start,
                    'object_reconstruction_time': 1,
                    'pid': 'pid-0',
                },
                'd1': {
                    'object_reconstruction_last': start + 999,
                    'object_reconstruction_time': 1000,
                    'pid': 'pid-1',
                },
            }
        }], updated_data)

        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual({
            'object_reconstruction_per_disk': {
                'd0': {
                    'object_reconstruction_last': start,
                    'object_reconstruction_time': 1,
                    'pid': 'pid-0',
                },
                'd1': {
                    'object_reconstruction_last': start + 999,
                    'object_reconstruction_time': 1000,
                    'pid': 'pid-1',
                },
            }
        }, data)

        # next aggregation will find d1 stats
        reconstructor.aggregate_recon_update()

        with open(self.rcache) as f:
            data = json.load(f)
        self.assertEqual({
            'object_reconstruction_last': start + 999,
            'object_reconstruction_time': 1000,
            'object_reconstruction_per_disk': {
                'd0': {
                    'object_reconstruction_last': start,
                    'object_reconstruction_time': 1,
                    'pid': 'pid-0',
                },
                'd1': {
                    'object_reconstruction_last': start + 999,
                    'object_reconstruction_time': 1000,
                    'pid': 'pid-1',
                },
            }
        }, data)


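The race above only works out if per-disk updates are merged into the existing recon cache rather than overwriting it wholesale: the parent's write during aggregation cannot clobber the entry the second worker just dumped, and the next aggregation pass picks it up. A minimal sketch of that read-modify-write style of update (illustrative only; the real dump_recon_cache used here comes from swift.common.utils and also handles locking, atomic replacement and key deletion):

    import json

    def merge_recon_cache(cache_path, update):
        # read existing cache, merge nested dicts, write back
        try:
            with open(cache_path) as f:
                data = json.load(f)
        except (IOError, OSError, ValueError):
            data = {}
        for key, value in update.items():
            if isinstance(value, dict):
                data.setdefault(key, {}).update(value)
            else:
                data[key] = value
        with open(cache_path, 'w') as f:
            json.dump(data, f)
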
@patch_policies(with_ec_default=True)
class BaseTestObjectReconstructor(unittest.TestCase):
    def setUp(self):