diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 347b0a0594..9f3659f267 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -811,8 +811,18 @@ daemonize yes Whether or not to run reconstruction as a daemon interval 30 Time in seconds to wait between reconstruction passes +reconstructor_workers 0 Maximum number of worker processes + to spawn. Each worker will handle + a subset of devices. Devices will + be assigned evenly among the workers + so that workers cycle at similar + intervals (which can lead to fewer + workers than requested). You can not + have more workers than devices. If + you have no devices only a single + worker is spawned. concurrency 1 Number of reconstruction threads to - spawn. + spawn per reconstructor process. stats_interval 300 Interval in seconds between logging reconstruction statistics handoffs_only false The handoffs_only mode option is for diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 917dea3299..e9c434b2bd 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -314,6 +314,13 @@ use = egg:swift#recon # run_pause is deprecated, use interval instead # run_pause = 30 # +# Maximum number of worker processes to spawn. Each worker will handle a +# subset of devices. Devices will be assigned evenly among the workers so that +# workers cycle at similar intervals (which can lead to fewer workers than +# requested). You can not have more workers than devices. If you have no +# devices only a single worker is spawned. +# reconstructor_workers = 0 +# # concurrency = 1 # stats_interval = 300 # node_timeout = 10 diff --git a/swift/common/daemon.py b/swift/common/daemon.py index 67004fe51d..141027cc1c 100644 --- a/swift/common/daemon.py +++ b/swift/common/daemon.py @@ -13,6 +13,7 @@ # See the License for the specific language governing permissions and # limitations under the License. +import errno import os import sys import time @@ -25,7 +26,24 @@ from swift.common import utils class Daemon(object): - """Daemon base class""" + """ + Daemon base class + + A daemon has a run method that accepts a ``once`` kwarg and will dispatch + to :meth:`run_once` or :meth:`run_forever`. + + A subclass of Daemon must implement :meth:`run_once` and + :meth:`run_forever`. + + A subclass of Daemon may override :meth:`get_worker_args` to dispatch + arguments to individual child process workers and :meth:`is_healthy` to + perform context specific periodic wellness checks which can reset worker + arguments. + + Implementations of Daemon do not know *how* to daemonize, or execute + multiple daemonized workers, they simply provide the behavior of the daemon + and context specific knowledge about how workers should be started. + """ def __init__(self, conf): self.conf = conf @@ -40,35 +58,200 @@ class Daemon(object): raise NotImplementedError('run_forever not implemented') def run(self, once=False, **kwargs): - """Run the daemon""" + if once: + self.run_once(**kwargs) + else: + self.run_forever(**kwargs) + + def get_worker_args(self, once=False, **kwargs): + """ + For each worker yield a (possibly empty) dict of kwargs to pass along + to the daemon's :meth:`run` method after fork. The length of elements + returned from this method will determine the number of processes + created. + + If the returned iterable is empty, the Strategy will fallback to + run-inline strategy. 
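+
+        For example (purely illustrative -- splitting work per device and the
+        device names are assumptions, not part of the base class), a daemon
+        that shards its work by disk might implement this as::
+
+            def get_worker_args(self, once=False, **kwargs):
+                # one child process per device; each child only sees its disk
+                return [{'override_devices': [dev]}
+                        for dev in ('sdb', 'sdc', 'sdd')]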
+ + :param once: False if the worker(s) will be daemonized, True if the + worker(s) will be run once + :param kwargs: plumbed through via command line argparser + + :returns: an iterable of dicts, each element represents the kwargs to + be passed to a single worker's :meth:`run` method after fork. + """ + return [] + + def is_healthy(self): + """ + This method is called very frequently on the instance of the daemon + held by the parent process. If it returns False, all child workers are + terminated, and new workers will be created. + + :returns: a boolean, True only if all workers should continue to run + """ + return True + + +class DaemonStrategy(object): + """ + This is the execution strategy for using subclasses of Daemon. The default + behavior is to invoke the daemon's :meth:`Daemon.run` method from within + the parent process. When the :meth:`Daemon.run` method returns the parent + process will exit. + + However, if the Daemon returns a non-empty iterable from + :meth:`Daemon.get_worker_args`, the daemon's :meth:`Daemon.run` method will + be invoked in child processes, with the arguments provided from the parent + process's instance of the daemon. If a child process exits it will be + restarted with the same options, unless it was executed in once mode. + + :param daemon: an instance of a :class:`Daemon` (has a `run` method) + :param logger: a logger instance + """ + + def __init__(self, daemon, logger): + self.daemon = daemon + self.logger = logger + self.running = False + # only used by multi-worker strategy + self.options_by_pid = {} + self.unspawned_worker_options = [] + + def setup(self, **kwargs): utils.validate_configuration() - utils.drop_privileges(self.conf.get('user', 'swift')) + utils.drop_privileges(self.daemon.conf.get('user', 'swift')) utils.capture_stdio(self.logger, **kwargs) def kill_children(*args): + self.running = False self.logger.info('SIGTERM received') signal.signal(signal.SIGTERM, signal.SIG_IGN) os.killpg(0, signal.SIGTERM) os._exit(0) signal.signal(signal.SIGTERM, kill_children) - if once: - self.run_once(**kwargs) + self.running = True + + def _run_inline(self, once=False, **kwargs): + """Run the daemon""" + self.daemon.run(once=once, **kwargs) + + def run(self, once=False, **kwargs): + """Daemonize and execute our strategy""" + self.setup(**kwargs) + try: + self._run(once=once, **kwargs) + except KeyboardInterrupt: + self.logger.notice('User quit') + finally: + self.cleanup() + self.running = False + + def _fork(self, once, **kwargs): + pid = os.fork() + if pid == 0: + signal.signal(signal.SIGHUP, signal.SIG_DFL) + signal.signal(signal.SIGTERM, signal.SIG_DFL) + + self.daemon.run(once, **kwargs) + + self.logger.debug('Forked worker %s finished', os.getpid()) + # do not return from this stack, nor execute any finally blocks + os._exit(0) else: - self.run_forever(**kwargs) + self.register_worker_start(pid, kwargs) + return pid + + def iter_unspawned_workers(self): + while True: + try: + per_worker_options = self.unspawned_worker_options.pop() + except IndexError: + return + yield per_worker_options + + def spawned_pids(self): + return self.options_by_pid.keys() + + def register_worker_start(self, pid, per_worker_options): + self.logger.debug('Spawned worker %s with %r', pid, per_worker_options) + self.options_by_pid[pid] = per_worker_options + + def register_worker_exit(self, pid): + self.unspawned_worker_options.append(self.options_by_pid.pop(pid)) + + def ask_daemon_to_prepare_workers(self, once, **kwargs): + self.unspawned_worker_options = list( + 
self.daemon.get_worker_args(once=once, **kwargs)) + + def abort_workers_if_daemon_would_like(self): + if not self.daemon.is_healthy(): + self.logger.debug( + 'Daemon needs to change options, aborting workers') + self.cleanup() + return True + return False + + def check_on_all_running_workers(self): + for p in self.spawned_pids(): + try: + pid, status = os.waitpid(p, os.WNOHANG) + except OSError as err: + if err.errno not in (errno.EINTR, errno.ECHILD): + raise + self.logger.notice('Worker %s died', p) + else: + if pid == 0: + # child still running + continue + self.logger.debug('Worker %s exited', p) + self.register_worker_exit(p) + + def _run(self, once, **kwargs): + self.ask_daemon_to_prepare_workers(once, **kwargs) + if not self.unspawned_worker_options: + return self._run_inline(once, **kwargs) + for per_worker_options in self.iter_unspawned_workers(): + if self._fork(once, **per_worker_options) == 0: + return 0 + while self.running: + if self.abort_workers_if_daemon_would_like(): + self.ask_daemon_to_prepare_workers(once, **kwargs) + self.check_on_all_running_workers() + if not once: + for per_worker_options in self.iter_unspawned_workers(): + if self._fork(once, **per_worker_options) == 0: + return 0 + else: + if not self.spawned_pids(): + self.logger.notice('Finished %s', os.getpid()) + break + time.sleep(0.1) + return 0 + + def cleanup(self): + for p in self.spawned_pids(): + try: + os.kill(p, signal.SIGTERM) + except OSError as err: + if err.errno not in (errno.ESRCH, errno.EINTR, errno.ECHILD): + raise + self.register_worker_exit(p) + self.logger.debug('Cleaned up worker %s', p) def run_daemon(klass, conf_file, section_name='', once=False, **kwargs): """ - Loads settings from conf, then instantiates daemon "klass" and runs the - daemon with the specified once kwarg. The section_name will be derived - from the daemon "klass" if not provided (e.g. ObjectReplicator => + Loads settings from conf, then instantiates daemon ``klass`` and runs the + daemon with the specified ``once`` kwarg. The section_name will be derived + from the daemon ``klass`` if not provided (e.g. ObjectReplicator => object-replicator). 
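+
+    A minimal sketch of the usual console-script entry point (assuming the
+    standard ``parse_options`` helper from ``swift.common.utils``; the
+    reconstructor is just an example daemon here)::
+
+        from swift.common.daemon import run_daemon
+        from swift.common.utils import parse_options
+        from swift.obj.reconstructor import ObjectReconstructor
+
+        if __name__ == '__main__':
+            # parse_options returns (conf_file, options) from the command line
+            conf_file, options = parse_options(once=True)
+            run_daemon(ObjectReconstructor, conf_file, **options)
+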
- :param klass: Class to instantiate, subclass of common.daemon.Daemon + :param klass: Class to instantiate, subclass of :class:`Daemon` :param conf_file: Path to configuration file :param section_name: Section name from conf file to load config from - :param once: Passed to daemon run method + :param once: Passed to daemon :meth:`Daemon.run` method """ # very often the config section_name is based on the class name # the None singleton will be passed through to readconf as is @@ -113,8 +296,9 @@ def run_daemon(klass, conf_file, section_name='', once=False, **kwargs): os.environ['TZ'] = 'UTC+0' time.tzset() + logger.notice('Starting %s', os.getpid()) try: - klass(conf).run(once=once, **kwargs) + DaemonStrategy(klass(conf), logger).run(once=once, **kwargs) except KeyboardInterrupt: logger.info('User quit') - logger.info('Exited') + logger.notice('Exited %s', os.getpid()) diff --git a/swift/common/utils.py b/swift/common/utils.py index d564faea40..b516444fc4 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -3120,7 +3120,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2, try: with NamedTemporaryFile(dir=os.path.dirname(cache_file), delete=False) as tf: - tf.write(json.dumps(cache_entry) + '\n') + tf.write(json.dumps(cache_entry, sort_keys=True) + '\n') if set_owner: os.chown(tf.name, pwd.getpwnam(set_owner).pw_uid, -1) renamer(tf.name, cache_file, fsync=False) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 766cb27476..67b371a27d 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -15,6 +15,7 @@ import json import errno +import math import os from os.path import join import random @@ -93,6 +94,30 @@ def _full_path(node, part, relative_path, policy): } +def parse_override_options(**kwargs): + """ + Return a dict with keys `override_devices` and `override_partitions` whose + values have been parsed from `kwargs`. If either key is found in `kwargs` + then copy its value from kwargs. Otherwise, if `once` is set in `kwargs` + then parse `devices` and `partitions` keys for the value of + `override_devices` and `override_partitions` respectively. + + :return: a dict with keys `override_devices` and `override_partitions` + """ + if kwargs.get('once', False): + devices = list_from_csv(kwargs.get('devices')) + partitions = [ + int(p) for p in list_from_csv(kwargs.get('partitions'))] + else: + devices = [] + partitions = [] + + return { + 'override_devices': kwargs.get('override_devices', devices), + 'override_partitions': kwargs.get('override_partitions', partitions), + } + + class RebuildingECDiskFileStream(object): """ This class wraps the reconstructed fragment archive data and @@ -155,6 +180,12 @@ class ObjectReconstructor(Daemon): self.port = None if self.servers_per_port else \ int(conf.get('bind_port', 6200)) self.concurrency = int(conf.get('concurrency', 1)) + # N.B. 
to maintain compatibility with legacy configs this option can + # not be named 'workers' because the object-server uses that option + # name in the DEFAULT section + self.reconstructor_workers = int(conf.get('reconstructor_workers', 0)) + self.policies = [policy for policy in POLICIES + if policy.policy_type == EC_POLICY] self.stats_interval = int(conf.get('stats_interval', '300')) self.ring_check_interval = int(conf.get('ring_check_interval', 15)) self.next_check = time.time() + self.ring_check_interval @@ -166,6 +197,7 @@ class ObjectReconstructor(Daemon): self.recon_cache_path = conf.get('recon_cache_path', '/var/cache/swift') self.rcache = os.path.join(self.recon_cache_path, "object.recon") + self._next_rcache_update = time.time() + self.stats_interval # defaults subject to change after beta self.conn_timeout = float(conf.get('conn_timeout', 0.5)) self.node_timeout = float(conf.get('node_timeout', 10)) @@ -193,6 +225,102 @@ class ObjectReconstructor(Daemon): self.logger.warning('Ignored handoffs_first option in favor ' 'of handoffs_only.') self._df_router = DiskFileRouter(conf, self.logger) + self.all_local_devices = self.get_local_devices() + + def get_worker_args(self, once=False, **kwargs): + """ + Take the set of all local devices for this node from all the EC + policies rings, and distribute them evenly into the number of workers + to be spawned according to the configured worker count. If `devices` is + given in `kwargs` then distribute only those devices. + + :param once: False if the worker(s) will be daemonized, True if the + worker(s) will be run once + :param kwargs: optional overrides from the command line + """ + if self.reconstructor_workers < 1: + return + override_options = parse_override_options(once=once, **kwargs) + + # Note that this get re-used when dumping stats and in is_healthy + self.all_local_devices = self.get_local_devices() + + if override_options['override_devices']: + devices = [d for d in override_options['override_devices'] + if d in self.all_local_devices] + else: + devices = list(self.all_local_devices) + if not devices: + # we only need a single worker to do nothing until a ring change + yield dict(override_options) + return + # for somewhat uniform load per worker use same max_devices_per_worker + # when handling all devices or just override devices... + max_devices_per_worker = int(math.ceil( + 1.0 * len(self.all_local_devices) / self.reconstructor_workers)) + # ...but only use enough workers for the actual devices being handled + n = int(math.ceil(1.0 * len(devices) / max_devices_per_worker)) + override_devices_per_worker = [devices[i::n] for i in range(n)] + for override_devices in override_devices_per_worker: + yield dict(override_options, override_devices=override_devices) + + def is_healthy(self): + """ + Check whether rings have changed, and maybe do a recon update. + + :returns: False if any ec ring has changed + """ + now = time.time() + if now > self._next_rcache_update: + self._next_rcache_update = now + self.stats_interval + self.aggregate_recon_update() + return self.get_local_devices() == self.all_local_devices + + def aggregate_recon_update(self): + """ + Aggregate per-disk rcache updates from child workers. 
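+
+        For example (illustrative timestamps), per-disk entries like::
+
+            {'object_reconstruction_per_disk': {
+                'sda': {'object_reconstruction_last': 1500000300.0,
+                        'object_reconstruction_time': 5.0, 'pid': 1234},
+                'sdb': {'object_reconstruction_last': 1500000600.0,
+                        'object_reconstruction_time': 10.0, 'pid': 1235}}}
+
+        both imply a start time of 1500000000.0, so once every local device
+        has reported the aggregate becomes a top-level
+        ``object_reconstruction_last`` of 1500000600.0 and an
+        ``object_reconstruction_time`` of 10.0 minutes (latest finish minus
+        earliest start).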
+ """ + try: + with open(self.rcache) as f: + existing_data = json.load(f) + except IOError as e: + if e.errno != errno.ENOENT: + raise + # dump_recon_cache will create new file and dirs + existing_data = {} + first_start = time.time() + last_finish = 0 + all_devices_reporting = True + for device in self.all_local_devices: + per_disk_stats = existing_data.get( + 'object_reconstruction_per_disk', {}).get(device, {}) + try: + start_time = per_disk_stats['object_reconstruction_last'] - \ + (per_disk_stats['object_reconstruction_time'] * 60) + finish_time = per_disk_stats['object_reconstruction_last'] + except KeyError: + all_devices_reporting = False + break + first_start = min(first_start, start_time) + last_finish = max(last_finish, finish_time) + if all_devices_reporting and last_finish > 0: + duration = last_finish - first_start + recon_update = { + 'object_reconstruction_time': duration / 60.0, + 'object_reconstruction_last': last_finish + } + else: + # if any current devices have not yet dropped stats, or the rcache + # file does not yet exist, we may still clear out per device stats + # for any devices that have been removed from local devices + recon_update = {} + found_devices = set(existing_data.get( + 'object_reconstruction_per_disk', {}).keys()) + clear_update = {d: {} for d in found_devices + if d not in self.all_local_devices} + if clear_update: + recon_update['object_reconstruction_per_disk'] = clear_update + dump_recon_cache(recon_update, self.rcache, self.logger) def load_object_ring(self, policy): """ @@ -888,38 +1016,37 @@ class ObjectReconstructor(Daemon): # return a list of jobs for this part return jobs - def collect_parts(self, override_devices=None, - override_partitions=None): - """ - Helper for getting partitions in the top level reconstructor - - In handoffs_only mode no primary partitions will not be included in the - returned (possibly empty) list. - """ - override_devices = override_devices or [] - override_partitions = override_partitions or [] + def get_policy2devices(self): ips = whataremyips(self.bind_ip) - ec_policies = (policy for policy in POLICIES - if policy.policy_type == EC_POLICY) - policy2devices = {} - - for policy in ec_policies: + for policy in self.policies: self.load_object_ring(policy) local_devices = list(six.moves.filter( lambda dev: dev and is_local_device( ips, self.port, dev['replication_ip'], dev['replication_port']), policy.object_ring.devs)) - - if override_devices: - local_devices = list(six.moves.filter( - lambda dev_info: dev_info['device'] in override_devices, - local_devices)) - policy2devices[policy] = local_devices - self.device_count += len(local_devices) + return policy2devices + def get_local_devices(self): + """Returns a set of all local devices in all EC policies.""" + policy2devices = self.get_policy2devices() + return reduce(set.union, ( + set(d['device'] for d in devices) + for devices in policy2devices.values())) + + def collect_parts(self, override_devices=None, override_partitions=None): + """ + Helper for getting partitions in the top level reconstructor + + In handoffs_only mode primary partitions will not be included in the + returned (possibly empty) list. 
+ """ + override_devices = override_devices or [] + override_partitions = override_partitions or [] + + policy2devices = self.get_policy2devices() all_parts = [] for policy, local_devices in policy2devices.items(): @@ -938,6 +1065,10 @@ class ObjectReconstructor(Daemon): df_mgr = self._df_router[policy] for local_dev in local_devices: + if override_devices and ( + local_dev['device'] not in override_devices): + continue + self.device_count += 1 dev_path = df_mgr.get_dev_path(local_dev['device']) if not dev_path: self.logger.warning(_('%s is not mounted'), @@ -1088,22 +1219,48 @@ class ObjectReconstructor(Daemon): "You should disable handoffs_only once all nodes " "are reporting no handoffs remaining.")) + def final_recon_dump(self, total, override_devices=None, **kwargs): + """ + Add stats for this worker's run to recon cache. + + When in worker mode (per_disk_stats == True) this worker's stats are + added per device instead of in the top level keys (aggregation is + serialized in the parent process). + + :param total: the runtime of cycle in minutes + :param override_devices: (optional) list of device that are being + reconstructed + """ + recon_update = { + 'object_reconstruction_time': total, + 'object_reconstruction_last': time.time(), + } + + if self.reconstructor_workers > 0: + devices = override_devices or self.all_local_devices + recon_update['pid'] = os.getpid() + recon_update = {'object_reconstruction_per_disk': { + d: recon_update for d in devices}} + else: + # if not running in worker mode, kill any per_disk stats + recon_update['object_reconstruction_per_disk'] = {} + dump_recon_cache(recon_update, self.rcache, self.logger) + def run_once(self, *args, **kwargs): start = time.time() self.logger.info(_("Running object reconstructor in script mode.")) - override_devices = list_from_csv(kwargs.get('devices')) - override_partitions = [int(p) for p in - list_from_csv(kwargs.get('partitions'))] - self.reconstruct( - override_devices=override_devices, - override_partitions=override_partitions) + override_options = parse_override_options(once=True, **kwargs) + self.reconstruct(**override_options) total = (time.time() - start) / 60 self.logger.info( _("Object reconstruction complete (once). (%.02f minutes)"), total) - if not (override_partitions or override_devices): - dump_recon_cache({'object_reconstruction_time': total, - 'object_reconstruction_last': time.time()}, - self.rcache, self.logger) + # Only dump stats if they would actually be meaningful -- i.e. we're + # collecting per-disk stats and covering all partitions, or we're + # covering all partitions, all disks. + if not override_options['override_partitions'] and ( + self.reconstructor_workers > 0 or + not override_options['override_devices']): + self.final_recon_dump(total, **override_options) def run_forever(self, *args, **kwargs): self.logger.info(_("Starting object reconstructor in daemon mode.")) @@ -1111,14 +1268,13 @@ class ObjectReconstructor(Daemon): while True: start = time.time() self.logger.info(_("Starting object reconstruction pass.")) + override_options = parse_override_options(**kwargs) # Run the reconstructor - self.reconstruct() + self.reconstruct(**override_options) total = (time.time() - start) / 60 self.logger.info( _("Object reconstruction complete. 
(%.02f minutes)"), total) - dump_recon_cache({'object_reconstruction_time': total, - 'object_reconstruction_last': time.time()}, - self.rcache, self.logger) + self.final_recon_dump(total, **override_options) self.logger.debug('reconstruction sleeping for %s seconds.', self.interval) sleep(self.interval) diff --git a/test/unit/__init__.py b/test/unit/__init__.py index 6d970ec672..56d7cdedd0 100644 --- a/test/unit/__init__.py +++ b/test/unit/__init__.py @@ -30,6 +30,7 @@ from numbers import Number from tempfile import NamedTemporaryFile import time import eventlet +from eventlet import greenpool, debug as eventlet_debug from eventlet.green import socket from tempfile import mkdtemp from shutil import rmtree @@ -218,6 +219,14 @@ class FakeRing(Ring): self.set_replicas(replicas) self._reload() + def has_changed(self): + """ + The real implementation uses getmtime on the serialized_path attribute, + which doesn't exist on our fake and relies on the implementation of + _reload which we override. So ... just NOOPE. + """ + return False + def _reload(self): self._rtime = time.time() @@ -690,6 +699,16 @@ if utils.config_true_value( fake_syslog_handler() +@contextmanager +def quiet_eventlet_exceptions(): + orig_state = greenpool.DEBUG + eventlet_debug.hub_exceptions(False) + try: + yield + finally: + eventlet_debug.hub_exceptions(orig_state) + + class MockTrue(object): """ Instances of MockTrue evaluate like True diff --git a/test/unit/common/test_daemon.py b/test/unit/common/test_daemon.py index 24220c1729..c9ec89bc26 100644 --- a/test/unit/common/test_daemon.py +++ b/test/unit/common/test_daemon.py @@ -13,8 +13,6 @@ # See the License for the specific language governing permissions and # limitations under the License. -# TODO(clayg): Test kill_children signal handlers - import os from six import StringIO from six.moves import reload_module @@ -25,15 +23,20 @@ import logging from test.unit import tmpfile import mock import signal +from contextlib import contextmanager +import itertools +from collections import defaultdict +import errno from swift.common import daemon, utils +from test.unit import debug_logger class MyDaemon(daemon.Daemon): def __init__(self, conf): self.conf = conf - self.logger = utils.get_logger(None, 'server', log_route='server') + self.logger = debug_logger('my-daemon') MyDaemon.forever_called = False MyDaemon.once_called = False @@ -63,6 +66,39 @@ class TestDaemon(unittest.TestCase): self.assertRaises(NotImplementedError, d.run_forever) +class MyWorkerDaemon(MyDaemon): + + def get_worker_args(self, once=False, **kwargs): + return [kwargs for i in range(int(self.conf.get('workers', 0)))] + + def is_healthy(self): + try: + return getattr(self, 'health_side_effects', []).pop(0) + except IndexError: + return True + + +class TestWorkerDaemon(unittest.TestCase): + + def test_stubs(self): + d = daemon.Daemon({}) + self.assertRaises(NotImplementedError, d.run_once) + self.assertRaises(NotImplementedError, d.run_forever) + self.assertEqual([], d.get_worker_args()) + self.assertEqual(True, d.is_healthy()) + + def test_my_worker_daemon(self): + d = MyWorkerDaemon({}) + self.assertEqual([], d.get_worker_args()) + self.assertTrue(d.is_healthy()) + d = MyWorkerDaemon({'workers': '3'}) + self.assertEqual([{'key': 'val'}] * 3, d.get_worker_args(key='val')) + d.health_side_effects = [True, False] + self.assertTrue(d.is_healthy()) + self.assertFalse(d.is_healthy()) + self.assertTrue(d.is_healthy()) + + class TestRunDaemon(unittest.TestCase): def setUp(self): @@ -89,7 +125,7 @@ class 
TestRunDaemon(unittest.TestCase): d = MyDaemon({}) with mock.patch('swift.common.daemon.signal') as mock_signal: mock_signal.SIGTERM = signal.SIGTERM - d.run() + daemon.DaemonStrategy(d, d.logger).run() signal_args, kwargs = mock_signal.signal.call_args sig, func = signal_args self.assertEqual(sig, signal.SIGTERM) @@ -158,6 +194,169 @@ class TestRunDaemon(unittest.TestCase): os.environ['TZ'] = old_tz time.tzset() + @contextmanager + def mock_os(self, child_worker_cycles=3): + self.waitpid_calls = defaultdict(int) + + def mock_waitpid(p, *args): + self.waitpid_calls[p] += 1 + if self.waitpid_calls[p] >= child_worker_cycles: + rv = p + else: + rv = 0 + return rv, 0 + with mock.patch('swift.common.daemon.os.fork') as mock_fork, \ + mock.patch('swift.common.daemon.os.waitpid', mock_waitpid), \ + mock.patch('swift.common.daemon.os.kill') as mock_kill: + mock_fork.side_effect = ( + 'mock-pid-%s' % i for i in itertools.count()) + self.mock_fork = mock_fork + self.mock_kill = mock_kill + yield + + def test_fork_workers(self): + d = MyWorkerDaemon({'workers': 3}) + strategy = daemon.DaemonStrategy(d, d.logger) + with self.mock_os(): + strategy.run(once=True) + self.assertEqual([mock.call()] * 3, self.mock_fork.call_args_list) + self.assertEqual(self.waitpid_calls, { + 'mock-pid-0': 3, + 'mock-pid-1': 3, + 'mock-pid-2': 3, + }) + self.assertEqual([], self.mock_kill.call_args_list) + self.assertIn('Finished', d.logger.get_lines_for_level('notice')[-1]) + + def test_forked_worker(self): + d = MyWorkerDaemon({'workers': 3}) + strategy = daemon.DaemonStrategy(d, d.logger) + with mock.patch('swift.common.daemon.os.fork') as mock_fork, \ + mock.patch('swift.common.daemon.os._exit') as mock_exit: + mock_fork.return_value = 0 + mock_exit.side_effect = SystemExit + self.assertRaises(SystemExit, strategy.run, once=True) + self.assertTrue(d.once_called) + + def test_restart_workers(self): + d = MyWorkerDaemon({'workers': 3}) + strategy = daemon.DaemonStrategy(d, d.logger) + d.health_side_effects = [True, False] + with self.mock_os(): + self.mock_kill.side_effect = lambda *args, **kwargs: setattr( + strategy, 'running', False) + strategy.run() + # six workers forked in total + self.assertEqual([mock.call()] * 6, self.mock_fork.call_args_list) + # since the daemon starts healthy, first pass checks children once + self.assertEqual(self.waitpid_calls, { + 'mock-pid-0': 1, + 'mock-pid-1': 1, + 'mock-pid-2': 1, + }) + # second pass is not healthy, original pid's killed + self.assertEqual(set([ + ('mock-pid-0', signal.SIGTERM), + ('mock-pid-1', signal.SIGTERM), + ('mock-pid-2', signal.SIGTERM), + ]), set(c[0] for c in self.mock_kill.call_args_list[:3])) + # our mock_kill side effect breaks out of running, and cleanup kills + # remaining pids + self.assertEqual(set([ + ('mock-pid-3', signal.SIGTERM), + ('mock-pid-4', signal.SIGTERM), + ('mock-pid-5', signal.SIGTERM), + ]), set(c[0] for c in self.mock_kill.call_args_list[3:])) + + def test_worker_disappears(self): + d = MyWorkerDaemon({'workers': 3}) + strategy = daemon.DaemonStrategy(d, d.logger) + strategy.register_worker_start('mock-pid', {'mock_options': True}) + self.assertEqual(strategy.unspawned_worker_options, []) + self.assertEqual(strategy.options_by_pid, { + 'mock-pid': {'mock_options': True} + }) + # still running + with mock.patch('swift.common.daemon.os.waitpid') as mock_waitpid: + mock_waitpid.return_value = (0, 0) + strategy.check_on_all_running_workers() + self.assertEqual(strategy.unspawned_worker_options, []) + 
self.assertEqual(strategy.options_by_pid, { + 'mock-pid': {'mock_options': True} + }) + # finished + strategy = daemon.DaemonStrategy(d, d.logger) + strategy.register_worker_start('mock-pid', {'mock_options': True}) + with mock.patch('swift.common.daemon.os.waitpid') as mock_waitpid: + mock_waitpid.return_value = ('mock-pid', 0) + strategy.check_on_all_running_workers() + self.assertEqual(strategy.unspawned_worker_options, [ + {'mock_options': True}]) + self.assertEqual(strategy.options_by_pid, {}) + self.assertEqual(d.logger.get_lines_for_level('debug')[-1], + 'Worker mock-pid exited') + # disappeared + strategy = daemon.DaemonStrategy(d, d.logger) + strategy.register_worker_start('mock-pid', {'mock_options': True}) + with mock.patch('swift.common.daemon.os.waitpid') as mock_waitpid: + mock_waitpid.side_effect = OSError( + errno.ECHILD, os.strerror(errno.ECHILD)) + mock_waitpid.return_value = ('mock-pid', 0) + strategy.check_on_all_running_workers() + self.assertEqual(strategy.unspawned_worker_options, [ + {'mock_options': True}]) + self.assertEqual(strategy.options_by_pid, {}) + self.assertEqual(d.logger.get_lines_for_level('notice')[-1], + 'Worker mock-pid died') + + def test_worker_kills_pids_in_cleanup(self): + d = MyWorkerDaemon({'workers': 2}) + strategy = daemon.DaemonStrategy(d, d.logger) + strategy.register_worker_start('mock-pid-1', {'mock_options': True}) + strategy.register_worker_start('mock-pid-2', {'mock_options': True}) + self.assertEqual(strategy.unspawned_worker_options, []) + self.assertEqual(strategy.options_by_pid, { + 'mock-pid-1': {'mock_options': True}, + 'mock-pid-2': {'mock_options': True}, + }) + with mock.patch('swift.common.daemon.os.kill') as mock_kill: + strategy.cleanup() + self.assertEqual(strategy.unspawned_worker_options, [ + {'mock_options': True}] * 2) + self.assertEqual(strategy.options_by_pid, {}) + self.assertEqual(set([ + ('mock-pid-1', signal.SIGTERM), + ('mock-pid-2', signal.SIGTERM), + ]), set(c[0] for c in mock_kill.call_args_list)) + self.assertEqual(set(d.logger.get_lines_for_level('debug')[-2:]), + set(['Cleaned up worker mock-pid-1', + 'Cleaned up worker mock-pid-2'])) + + def test_worker_disappears_in_cleanup(self): + d = MyWorkerDaemon({'workers': 2}) + strategy = daemon.DaemonStrategy(d, d.logger) + strategy.register_worker_start('mock-pid-1', {'mock_options': True}) + strategy.register_worker_start('mock-pid-2', {'mock_options': True}) + self.assertEqual(strategy.unspawned_worker_options, []) + self.assertEqual(strategy.options_by_pid, { + 'mock-pid-1': {'mock_options': True}, + 'mock-pid-2': {'mock_options': True}, + }) + with mock.patch('swift.common.daemon.os.kill') as mock_kill: + mock_kill.side_effect = [None, OSError(errno.ECHILD, + os.strerror(errno.ECHILD))] + strategy.cleanup() + self.assertEqual(strategy.unspawned_worker_options, [ + {'mock_options': True}] * 2) + self.assertEqual(strategy.options_by_pid, {}) + self.assertEqual(set([ + ('mock-pid-1', signal.SIGTERM), + ('mock-pid-2', signal.SIGTERM), + ]), set(c[0] for c in mock_kill.call_args_list)) + self.assertEqual(set(d.logger.get_lines_for_level('debug')[-2:]), + set(['Cleaned up worker mock-pid-1', + 'Cleaned up worker mock-pid-2'])) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 6cb649bbc5..f71ff2c1be 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -26,7 +26,7 @@ import re import random import struct import collections -from eventlet 
import Timeout, sleep +from eventlet import Timeout, sleep, spawn from contextlib import closing, contextmanager from gzip import GzipFile @@ -35,6 +35,7 @@ from six.moves.urllib.parse import unquote from swift.common import utils from swift.common.exceptions import DiskFileError from swift.common.header_key_dict import HeaderKeyDict +from swift.common.utils import dump_recon_cache from swift.obj import diskfile, reconstructor as object_reconstructor from swift.common import ring from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy, @@ -43,7 +44,8 @@ from swift.obj.reconstructor import REVERT from test.unit import (patch_policies, debug_logger, mocked_http_conn, FabricatedRing, make_timestamp_iter, - DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies) + DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies, + quiet_eventlet_exceptions) from test.unit.obj.common import write_diskfile @@ -1343,6 +1345,931 @@ class TestGlobalSetupObjectReconstructorLegacyDurable( legacy_durable = True +@patch_policies(with_ec_default=True) +class TestWorkerReconstructor(unittest.TestCase): + + maxDiff = None + + def setUp(self): + super(TestWorkerReconstructor, self).setUp() + self.logger = debug_logger() + self.testdir = tempfile.mkdtemp() + self.recon_cache_path = os.path.join(self.testdir, 'recon') + self.rcache = os.path.join(self.recon_cache_path, 'object.recon') + # dump_recon_cache expects recon_cache_path to exist + os.mkdir(self.recon_cache_path) + + def tearDown(self): + super(TestWorkerReconstructor, self).tearDown() + shutil.rmtree(self.testdir) + + def test_no_workers_by_default(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {}, logger=self.logger) + self.assertEqual(0, reconstructor.reconstructor_workers) + self.assertEqual(0, len(list(reconstructor.get_worker_args()))) + + def test_bad_value_workers(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'reconstructor_workers': '-1'}, logger=self.logger) + self.assertEqual(-1, reconstructor.reconstructor_workers) + self.assertEqual(0, len(list(reconstructor.get_worker_args()))) + + def test_workers_with_no_devices(self): + def do_test(num_workers): + reconstructor = object_reconstructor.ObjectReconstructor( + {'reconstructor_workers': num_workers}, logger=self.logger) + self.assertEqual(num_workers, reconstructor.reconstructor_workers) + self.assertEqual(1, len(list(reconstructor.get_worker_args()))) + self.assertEqual([ + {'override_partitions': [], 'override_devices': []}, + ], list(reconstructor.get_worker_args())) + do_test(1) + do_test(10) + + def test_workers_with_devices_and_no_valid_overrides(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'reconstructor_workers': '2'}, logger=self.logger) + reconstructor.get_local_devices = lambda: ['sdb', 'sdc'] + self.assertEqual(2, reconstructor.reconstructor_workers) + # N.B. 
sdz is not in local_devices so there are no devices to process + # but still expect a single worker process + worker_args = list(reconstructor.get_worker_args( + once=True, devices='sdz')) + self.assertEqual(1, len(worker_args)) + self.assertEqual([{'override_partitions': [], + 'override_devices': ['sdz']}], + worker_args) + # overrides are ignored in forever mode + worker_args = list(reconstructor.get_worker_args( + once=False, devices='sdz')) + self.assertEqual(2, len(worker_args)) + self.assertEqual([ + {'override_partitions': [], 'override_devices': ['sdb']}, + {'override_partitions': [], 'override_devices': ['sdc']} + ], worker_args) + + def test_workers_with_devices(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'reconstructor_workers': '2'}, logger=self.logger) + reconstructor.get_local_devices = lambda: ['sdb', 'sdc'] + self.assertEqual(2, reconstructor.reconstructor_workers) + self.assertEqual(2, len(list(reconstructor.get_worker_args()))) + expected = [ + {'override_partitions': [], 'override_devices': ['sdb']}, + {'override_partitions': [], 'override_devices': ['sdc']}, + ] + worker_args = list(reconstructor.get_worker_args(once=False)) + self.assertEqual(2, len(worker_args)) + self.assertEqual(expected, worker_args) + worker_args = list(reconstructor.get_worker_args(once=True)) + self.assertEqual(2, len(worker_args)) + self.assertEqual(expected, worker_args) + + def test_workers_with_devices_and_overrides(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'reconstructor_workers': '2'}, logger=self.logger) + reconstructor.get_local_devices = lambda: ['sdb', 'sdc'] + self.assertEqual(2, reconstructor.reconstructor_workers) + # check we don't get more workers than override devices... + # N.B. sdz is not in local_devices so should be ignored for the + # purposes of generating workers + worker_args = list(reconstructor.get_worker_args( + once=True, devices='sdb,sdz', partitions='99,333')) + self.assertEqual(1, len(worker_args)) + self.assertEqual( + [{'override_partitions': [99, 333], 'override_devices': ['sdb']}], + worker_args) + # overrides are ignored in forever mode + worker_args = list(reconstructor.get_worker_args( + once=False, devices='sdb,sdz', partitions='99,333')) + self.assertEqual(2, len(worker_args)) + self.assertEqual([ + {'override_partitions': [], 'override_devices': ['sdb']}, + {'override_partitions': [], 'override_devices': ['sdc']} + ], worker_args) + + def test_workers_with_lots_of_devices(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'reconstructor_workers': '2'}, logger=self.logger) + reconstructor.get_local_devices = lambda: [ + 'sdb', 'sdc', 'sdd', 'sde', 'sdf'] + self.assertEqual(2, reconstructor.reconstructor_workers) + self.assertEqual(2, len(list(reconstructor.get_worker_args()))) + self.assertEqual([ + {'override_partitions': [], 'override_devices': [ + 'sdb', 'sdd', 'sdf']}, + {'override_partitions': [], 'override_devices': [ + 'sdc', 'sde']}, + ], list(reconstructor.get_worker_args())) + + def test_workers_with_lots_of_devices_and_overrides(self): + # check that override devices get distributed across workers + # in similar fashion to all devices + reconstructor = object_reconstructor.ObjectReconstructor( + {'reconstructor_workers': '2'}, logger=self.logger) + reconstructor.get_local_devices = lambda: [ + 'sdb', 'sdc', 'sdd', 'sde', 'sdf'] + self.assertEqual(2, reconstructor.reconstructor_workers) + worker_args = list(reconstructor.get_worker_args( + once=True, devices='sdb,sdd,sdf', 
partitions='99,333')) + self.assertEqual(1, len(worker_args)) + # 5 devices in total, 2 workers -> up to 3 devices per worker so a + # single worker should handle the requested override devices + self.assertEqual([ + {'override_partitions': [99, 333], 'override_devices': [ + 'sdb', 'sdd', 'sdf']}, + ], worker_args) + + # with 4 override devices, expect 2 per worker + worker_args = list(reconstructor.get_worker_args( + once=True, devices='sdb,sdc,sdd,sdf', partitions='99,333')) + self.assertEqual(2, len(worker_args)) + self.assertEqual([ + {'override_partitions': [99, 333], 'override_devices': [ + 'sdb', 'sdd']}, + {'override_partitions': [99, 333], 'override_devices': [ + 'sdc', 'sdf']}, + ], worker_args) + + def test_workers_with_lots_of_workers(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'reconstructor_workers': '10'}, logger=self.logger) + reconstructor.get_local_devices = lambda: ['sdb', 'sdc'] + self.assertEqual(10, reconstructor.reconstructor_workers) + self.assertEqual(2, len(list(reconstructor.get_worker_args()))) + self.assertEqual([ + {'override_partitions': [], 'override_devices': ['sdb']}, + {'override_partitions': [], 'override_devices': ['sdc']}, + ], list(reconstructor.get_worker_args())) + + def test_workers_with_lots_of_workers_and_devices(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'reconstructor_workers': '10'}, logger=self.logger) + reconstructor.get_local_devices = lambda: [ + 'sdb', 'sdc', 'sdd', 'sde', 'sdf'] + self.assertEqual(10, reconstructor.reconstructor_workers) + self.assertEqual(5, len(list(reconstructor.get_worker_args()))) + self.assertEqual([ + {'override_partitions': [], 'override_devices': ['sdb']}, + {'override_partitions': [], 'override_devices': ['sdc']}, + {'override_partitions': [], 'override_devices': ['sdd']}, + {'override_partitions': [], 'override_devices': ['sde']}, + {'override_partitions': [], 'override_devices': ['sdf']}, + ], list(reconstructor.get_worker_args())) + + def test_workers_with_some_workers_and_devices(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {}, logger=self.logger) + reconstructor.get_local_devices = lambda: [ + 'd%s' % (i + 1) for i in range(21)] + # ... with many devices per worker, worker count is pretty granular + for i in range(1, 8): + reconstructor.reconstructor_workers = i + self.assertEqual(i, len(list(reconstructor.get_worker_args()))) + # ... then it gets sorta stair step + for i in range(9, 10): + reconstructor.reconstructor_workers = i + self.assertEqual(7, len(list(reconstructor.get_worker_args()))) + # 2-3 devices per worker + for args in reconstructor.get_worker_args(): + self.assertIn(len(args['override_devices']), (2, 3)) + for i in range(11, 20): + reconstructor.reconstructor_workers = i + self.assertEqual(11, len(list(reconstructor.get_worker_args()))) + # 1, 2 devices per worker + for args in reconstructor.get_worker_args(): + self.assertIn(len(args['override_devices']), (1, 2)) + # this is debatable, but maybe I'll argue if you're going to have + # *some* workers with > 1 device, it's better to have fewer workers + # with devices spread out evenly than a couple outliers? 
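+        # e.g. with 21 local devices and any reconstructor_workers value in
+        # 11..19, max_devices_per_worker = ceil(21 / workers) = 2, so
+        # ceil(21 / 2) = 11 workers are actually used and worker i gets
+        # devices[i::11] -- ten workers get 2 devices, one gets 1, which is
+        # exactly the expectation below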
+ self.assertEqual([ + {'override_partitions': [], 'override_devices': ['d1', 'd12']}, + {'override_partitions': [], 'override_devices': ['d2', 'd13']}, + {'override_partitions': [], 'override_devices': ['d3', 'd14']}, + {'override_partitions': [], 'override_devices': ['d4', 'd15']}, + {'override_partitions': [], 'override_devices': ['d5', 'd16']}, + {'override_partitions': [], 'override_devices': ['d6', 'd17']}, + {'override_partitions': [], 'override_devices': ['d7', 'd18']}, + {'override_partitions': [], 'override_devices': ['d8', 'd19']}, + {'override_partitions': [], 'override_devices': ['d9', 'd20']}, + {'override_partitions': [], 'override_devices': ['d10', 'd21']}, + {'override_partitions': [], 'override_devices': ['d11']}, + ], list(reconstructor.get_worker_args())) + # you can't get < than 1 device per worker + for i in range(21, 52): + reconstructor.reconstructor_workers = i + self.assertEqual(21, len(list(reconstructor.get_worker_args()))) + for args in reconstructor.get_worker_args(): + self.assertEqual(1, len(args['override_devices'])) + + def test_next_rcache_update_configured_with_stats_interval(self): + now = time.time() + with mock.patch('swift.obj.reconstructor.time.time', return_value=now): + reconstructor = object_reconstructor.ObjectReconstructor( + {}, logger=self.logger) + self.assertEqual(now + 300, reconstructor._next_rcache_update) + reconstructor = object_reconstructor.ObjectReconstructor( + {'stats_interval': '30'}, logger=self.logger) + self.assertEqual(now + 30, reconstructor._next_rcache_update) + + def test_is_healthy_rcache_update_waits_for_next_update(self): + now = time.time() + with mock.patch('swift.obj.reconstructor.time.time', return_value=now): + reconstructor = object_reconstructor.ObjectReconstructor( + {'recon_cache_path': self.recon_cache_path}, + logger=self.logger) + # file does not exist to start + self.assertFalse(os.path.exists(self.rcache)) + self.assertTrue(reconstructor.is_healthy()) + # ... and isn't created until _next_rcache_update + self.assertFalse(os.path.exists(self.rcache)) + # ... but if we wait 5 mins (by default) + orig_next_update = reconstructor._next_rcache_update + with mock.patch('swift.obj.reconstructor.time.time', + return_value=now + 301): + self.assertTrue(reconstructor.is_healthy()) + self.assertGreater(reconstructor._next_rcache_update, orig_next_update) + # ... 
it will be created + self.assertTrue(os.path.exists(self.rcache)) + with open(self.rcache) as f: + data = json.load(f) + # and empty + self.assertEqual({}, data) + + def test_is_healthy_ring_update_next_check(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'recon_cache_path': self.recon_cache_path}, + logger=self.logger) + self.assertTrue(reconstructor.is_healthy()) + reconstructor.get_local_devices = lambda: { + 'sdb%d' % p for p in reconstructor.policies} + self.assertFalse(reconstructor.is_healthy()) + reconstructor.all_local_devices = { + 'sdb%d' % p for p in reconstructor.policies} + self.assertTrue(reconstructor.is_healthy()) + + def test_final_recon_dump(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'recon_cache_path': self.recon_cache_path}, + logger=self.logger) + reconstructor.all_local_devices = ['sda', 'sdc'] + total = 12.0 + now = time.time() + with mock.patch('swift.obj.reconstructor.time.time', return_value=now): + reconstructor.final_recon_dump(total) + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_last': now, + 'object_reconstruction_time': total, + }, data) + total = 14.0 + now += total * 60 + with mock.patch('swift.obj.reconstructor.time.time', return_value=now): + reconstructor.final_recon_dump(total) + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_last': now, + 'object_reconstruction_time': total, + }, data) + # per_disk_stats with workers + reconstructor.reconstructor_workers = 1 + old_total = total + total = 16.0 + before = now + now += total * 60 + with mock.patch('swift.obj.reconstructor.time.time', + return_value=now), \ + mock.patch('swift.obj.reconstructor.os.getpid', + return_value='pid-1'): + reconstructor.final_recon_dump(total, override_devices=[ + 'sda', 'sdc']) + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_last': before, + 'object_reconstruction_time': old_total, + 'object_reconstruction_per_disk': { + 'sda': { + 'object_reconstruction_last': now, + 'object_reconstruction_time': total, + 'pid': 'pid-1', + }, + 'sdc': { + 'object_reconstruction_last': now, + 'object_reconstruction_time': total, + 'pid': 'pid-1', + }, + + }, + }, data) + # and without workers we clear it out + reconstructor.reconstructor_workers = 0 + total = 18.0 + now += total * 60 + with mock.patch('swift.obj.reconstructor.time.time', return_value=now): + reconstructor.final_recon_dump(total) + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_last': now, + 'object_reconstruction_time': total, + }, data) + + def test_dump_recon_run_once_inline(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'recon_cache_path': self.recon_cache_path}, + logger=self.logger) + reconstructor.reconstruct = mock.MagicMock() + now = time.time() + later = now + 300 # 5 mins + with mock.patch('swift.obj.reconstructor.time.time', side_effect=[ + now, later, later]): + reconstructor.run_once() + # no override args passed to reconstruct + self.assertEqual([mock.call( + override_devices=[], + override_partitions=[] + )], reconstructor.reconstruct.call_args_list) + # script mode with no override args, we expect recon dumps + self.assertTrue(os.path.exists(self.rcache)) + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_last': later, + 'object_reconstruction_time': 5.0, + }, data) + total = 10.0 + later += 
total * 60 + with mock.patch('swift.obj.reconstructor.time.time', + return_value=later): + reconstructor.final_recon_dump(total) + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_last': later, + 'object_reconstruction_time': 10.0, + }, data) + + def test_dump_recon_run_once_in_worker(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'recon_cache_path': self.recon_cache_path, + 'reconstructor_workers': 1}, + logger=self.logger) + reconstructor.get_local_devices = lambda: {'sda'} + now = time.time() + later = now + 300 # 5 mins + + def do_test(run_kwargs, expected_device): + # get the actual kwargs that would be passed to run_once in a + # worker + run_once_kwargs = list( + reconstructor.get_worker_args(once=True, **run_kwargs))[0] + reconstructor.reconstruct = mock.MagicMock() + with mock.patch('swift.obj.reconstructor.time.time', + side_effect=[now, later, later]): + reconstructor.run_once(**run_once_kwargs) + self.assertEqual([mock.call( + override_devices=[expected_device], + override_partitions=[] + )], reconstructor.reconstruct.call_args_list) + self.assertTrue(os.path.exists(self.rcache)) + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + # no aggregate is written but perhaps it should be, in which + # case this assertion will need to change + 'object_reconstruction_per_disk': { + expected_device: { + 'object_reconstruction_last': later, + 'object_reconstruction_time': 5.0, + 'pid': mock.ANY + } + } + }, data) + + # script mode with no CLI override args, we expect recon dumps + do_test({}, 'sda') + # script mode *with* CLI override devices, we expect recon dumps + os.unlink(self.rcache) + do_test(dict(devices='sda'), 'sda') + # if the override device is not in local devices we still get + # a recon dump, but it'll get cleaned up in the next aggregation + os.unlink(self.rcache) + do_test(dict(devices='sdz'), 'sdz') + + # now disable workers and check that inline run_once updates rcache + # and clears out per disk stats + now = time.time() + later = now + 600 # 10 mins + reconstructor.reconstructor_workers = 0 + with mock.patch('swift.obj.reconstructor.time.time', + side_effect=[now, later, later]): + reconstructor.run_once() + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_last': later, + 'object_reconstruction_time': 10.0, + }, data) + + def test_no_dump_recon_run_once(self): + reconstructor = object_reconstructor.ObjectReconstructor( + {'recon_cache_path': self.recon_cache_path}, + logger=self.logger) + reconstructor.get_local_devices = lambda: {'sda', 'sdb', 'sdc'} + + def do_test(run_once_kwargs, expected_devices, expected_partitions): + reconstructor.reconstruct = mock.MagicMock() + now = time.time() + later = now + 300 # 5 mins + with mock.patch('swift.obj.reconstructor.time.time', side_effect=[ + now, later, later]): + reconstructor.run_once(**run_once_kwargs) + # override args passed to reconstruct + actual_calls = reconstructor.reconstruct.call_args_list + self.assertEqual({'override_devices', 'override_partitions'}, + set(actual_calls[0][1])) + self.assertEqual(sorted(expected_devices), + sorted(actual_calls[0][1]['override_devices'])) + self.assertEqual(sorted(expected_partitions), + sorted(actual_calls[0][1]['override_partitions'])) + self.assertFalse(actual_calls[1:]) + self.assertEqual(False, os.path.exists(self.rcache)) + + # inline mode with overrides never does recon dump + reconstructor.reconstructor_workers = 0 + kwargs = 
{'devices': 'sda,sdb'} + do_test(kwargs, ['sda', 'sdb'], []) + + # Have partition override, so no recon dump + kwargs = {'partitions': '1,2,3'} + do_test(kwargs, [], [1, 2, 3]) + reconstructor.reconstructor_workers = 1 + worker_kwargs = list( + reconstructor.get_worker_args(once=True, **kwargs))[0] + do_test(worker_kwargs, ['sda', 'sdb', 'sdc'], [1, 2, 3]) + + reconstructor.reconstructor_workers = 0 + kwargs = {'devices': 'sda,sdb', 'partitions': '1,2,3'} + do_test(kwargs, ['sda', 'sdb'], [1, 2, 3]) + reconstructor.reconstructor_workers = 1 + worker_kwargs = list( + reconstructor.get_worker_args(once=True, **kwargs))[0] + do_test(worker_kwargs, ['sda', 'sdb'], [1, 2, 3]) + + # 'sdz' is not in local devices + reconstructor.reconstructor_workers = 0 + kwargs = {'devices': 'sdz'} + do_test(kwargs, ['sdz'], []) + + def test_run_forever_recon_aggregation(self): + + class StopForever(Exception): + pass + + reconstructor = object_reconstructor.ObjectReconstructor({ + 'reconstructor_workers': 2, + 'recon_cache_path': self.recon_cache_path + }, logger=self.logger) + reconstructor.get_local_devices = lambda: ['sda', 'sdb', 'sdc', 'sdd'] + reconstructor.reconstruct = mock.MagicMock() + now = time.time() + later = now + 300 # 5 mins + worker_args = list( + # include 'devices' kwarg as a sanity check - it should be ignored + # in run_forever mode + reconstructor.get_worker_args(once=False, devices='sda')) + with mock.patch('swift.obj.reconstructor.time.time', + side_effect=[now, later, later]), \ + mock.patch('swift.obj.reconstructor.os.getpid', + return_value='pid-1'), \ + mock.patch('swift.obj.reconstructor.sleep', + side_effect=[StopForever]), \ + Timeout(.3), quiet_eventlet_exceptions(), \ + self.assertRaises(StopForever): + gt = spawn(reconstructor.run_forever, **worker_args[0]) + gt.wait() + # override args are passed to reconstruct + self.assertEqual([mock.call( + override_devices=['sda', 'sdc'], + override_partitions=[] + )], reconstructor.reconstruct.call_args_list) + # forever mode with override args, we expect per-disk recon dumps + self.assertTrue(os.path.exists(self.rcache)) + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_per_disk': { + 'sda': { + 'object_reconstruction_last': later, + 'object_reconstruction_time': 5.0, + 'pid': 'pid-1', + }, + 'sdc': { + 'object_reconstruction_last': later, + 'object_reconstruction_time': 5.0, + 'pid': 'pid-1', + }, + } + }, data) + reconstructor.reconstruct.reset_mock() + # another worker would get *different* disks + before = now = later + later = now + 300 # 5 more minutes + with mock.patch('swift.obj.reconstructor.time.time', + side_effect=[now, later, later]), \ + mock.patch('swift.obj.reconstructor.os.getpid', + return_value='pid-2'), \ + mock.patch('swift.obj.reconstructor.sleep', + side_effect=[StopForever]), \ + Timeout(.3), quiet_eventlet_exceptions(), \ + self.assertRaises(StopForever): + gt = spawn(reconstructor.run_forever, **worker_args[1]) + gt.wait() + # override args are parsed + self.assertEqual([mock.call( + override_devices=['sdb', 'sdd'], + override_partitions=[] + )], reconstructor.reconstruct.call_args_list) + # forever mode with override args, we expect per-disk recon dumps + self.assertTrue(os.path.exists(self.rcache)) + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_per_disk': { + 'sda': { + 'object_reconstruction_last': before, + 'object_reconstruction_time': 5.0, + 'pid': 'pid-1', + }, + 'sdb': { + 'object_reconstruction_last': 
later, + 'object_reconstruction_time': 5.0, + 'pid': 'pid-2', + }, + 'sdc': { + 'object_reconstruction_last': before, + 'object_reconstruction_time': 5.0, + 'pid': 'pid-1', + }, + 'sdd': { + 'object_reconstruction_last': later, + 'object_reconstruction_time': 5.0, + 'pid': 'pid-2', + }, + } + }, data) + + # aggregation is done in the parent thread even later + reconstructor.aggregate_recon_update() + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_last': later, + 'object_reconstruction_time': 10.0, + 'object_reconstruction_per_disk': { + 'sda': { + 'object_reconstruction_last': before, + 'object_reconstruction_time': 5.0, + 'pid': 'pid-1', + }, + 'sdb': { + 'object_reconstruction_last': later, + 'object_reconstruction_time': 5.0, + 'pid': 'pid-2', + }, + 'sdc': { + 'object_reconstruction_last': before, + 'object_reconstruction_time': 5.0, + 'pid': 'pid-1', + }, + 'sdd': { + 'object_reconstruction_last': later, + 'object_reconstruction_time': 5.0, + 'pid': 'pid-2', + }, + } + }, data) + + def test_recon_aggregation_waits_for_all_devices(self): + reconstructor = object_reconstructor.ObjectReconstructor({ + 'reconstructor_workers': 2, + 'recon_cache_path': self.recon_cache_path + }, logger=self.logger) + reconstructor.all_local_devices = set([ + 'd0', 'd1', 'd2', 'd3', + # unreported device definitely matters + 'd4']) + start = time.time() - 1000 + for i in range(4): + with mock.patch('swift.obj.reconstructor.time.time', + return_value=start + (300 * i)), \ + mock.patch('swift.obj.reconstructor.os.getpid', + return_value='pid-%s' % i): + reconstructor.final_recon_dump( + i, override_devices=['d%s' % i]) + # sanity + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual({ + 'object_reconstruction_per_disk': { + 'd0': { + 'object_reconstruction_last': start, + 'object_reconstruction_time': 0.0, + 'pid': 'pid-0', + }, + 'd1': { + 'object_reconstruction_last': start + 300, + 'object_reconstruction_time': 1, + 'pid': 'pid-1', + }, + 'd2': { + 'object_reconstruction_last': start + 600, + 'object_reconstruction_time': 2, + 'pid': 'pid-2', + }, + 'd3': { + 'object_reconstruction_last': start + 900, + 'object_reconstruction_time': 3, + 'pid': 'pid-3', + }, + } + }, data) + + # unreported device d4 prevents aggregation + reconstructor.aggregate_recon_update() + with open(self.rcache) as f: + data = json.load(f) + self.assertNotIn('object_reconstruction_last', data) + self.assertNotIn('object_reconstruction_time', data) + self.assertEqual(set(['d0', 'd1', 'd2', 'd3']), + set(data['object_reconstruction_per_disk'].keys())) + + # it's idempotent + reconstructor.aggregate_recon_update() + with open(self.rcache) as f: + data = json.load(f) + self.assertNotIn('object_reconstruction_last', data) + self.assertNotIn('object_reconstruction_time', data) + self.assertEqual(set(['d0', 'd1', 'd2', 'd3']), + set(data['object_reconstruction_per_disk'].keys())) + + # remove d4, we no longer wait on it for aggregation + reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3']) + reconstructor.aggregate_recon_update() + with open(self.rcache) as f: + data = json.load(f) + self.assertEqual(start + 900, data['object_reconstruction_last']) + self.assertEqual(15, data['object_reconstruction_time']) + self.assertEqual(set(['d0', 'd1', 'd2', 'd3']), + set(data['object_reconstruction_per_disk'].keys())) + + def test_recon_aggregation_removes_devices(self): + reconstructor = object_reconstructor.ObjectReconstructor({ + 'reconstructor_workers': 2, + 
+            'recon_cache_path': self.recon_cache_path
+        }, logger=self.logger)
+        reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3'])
+        start = time.time() - 1000
+        for i in range(4):
+            with mock.patch('swift.obj.reconstructor.time.time',
+                            return_value=start + (300 * i)), \
+                    mock.patch('swift.obj.reconstructor.os.getpid',
+                               return_value='pid-%s' % i):
+                reconstructor.final_recon_dump(
+                    i, override_devices=['d%s' % i])
+        # sanity
+        with open(self.rcache) as f:
+            data = json.load(f)
+        self.assertEqual({
+            'object_reconstruction_per_disk': {
+                'd0': {
+                    'object_reconstruction_last': start,
+                    'object_reconstruction_time': 0.0,
+                    'pid': 'pid-0',
+                },
+                'd1': {
+                    'object_reconstruction_last': start + 300,
+                    'object_reconstruction_time': 1,
+                    'pid': 'pid-1',
+                },
+                'd2': {
+                    'object_reconstruction_last': start + 600,
+                    'object_reconstruction_time': 2,
+                    'pid': 'pid-2',
+                },
+                'd3': {
+                    'object_reconstruction_last': start + 900,
+                    'object_reconstruction_time': 3,
+                    'pid': 'pid-3',
+                },
+            }
+        }, data)
+
+        reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3'])
+        reconstructor.aggregate_recon_update()
+        with open(self.rcache) as f:
+            data = json.load(f)
+        self.assertEqual(start + 900, data['object_reconstruction_last'])
+        self.assertEqual(15, data['object_reconstruction_time'])
+        self.assertEqual(set(['d0', 'd1', 'd2', 'd3']),
+                         set(data['object_reconstruction_per_disk'].keys()))
+
+        # it's idempotent
+        reconstructor.aggregate_recon_update()
+        with open(self.rcache) as f:
+            data = json.load(f)
+        self.assertEqual({
+            'object_reconstruction_last': start + 900,
+            'object_reconstruction_time': 15,
+            'object_reconstruction_per_disk': {
+                'd0': {
+                    'object_reconstruction_last': start,
+                    'object_reconstruction_time': 0.0,
+                    'pid': 'pid-0',
+                },
+                'd1': {
+                    'object_reconstruction_last': start + 300,
+                    'object_reconstruction_time': 1,
+                    'pid': 'pid-1',
+                },
+                'd2': {
+                    'object_reconstruction_last': start + 600,
+                    'object_reconstruction_time': 2,
+                    'pid': 'pid-2',
+                },
+                'd3': {
+                    'object_reconstruction_last': start + 900,
+                    'object_reconstruction_time': 3,
+                    'pid': 'pid-3',
+                },
+            }
+        }, data)
+
+        # if a device is removed from the ring
+        reconstructor.all_local_devices = set(['d1', 'd2', 'd3'])
+        reconstructor.aggregate_recon_update()
+        with open(self.rcache) as f:
+            data = json.load(f)
+        # ... its per-disk stats are removed (d0)
+        self.assertEqual({
+            'object_reconstruction_last': start + 900,
+            'object_reconstruction_time': 11,
+            'object_reconstruction_per_disk': {
+                'd1': {
+                    'object_reconstruction_last': start + 300,
+                    'object_reconstruction_time': 1,
+                    'pid': 'pid-1',
+                },
+                'd2': {
+                    'object_reconstruction_last': start + 600,
+                    'object_reconstruction_time': 2,
+                    'pid': 'pid-2',
+                },
+                'd3': {
+                    'object_reconstruction_last': start + 900,
+                    'object_reconstruction_time': 3,
+                    'pid': 'pid-3',
+                },
+            }
+        }, data)
+
+        # which can affect the aggregates!
+        reconstructor.all_local_devices = set(['d1', 'd2'])
+        reconstructor.aggregate_recon_update()
+        with open(self.rcache) as f:
+            data = json.load(f)
+        self.assertEqual({
+            'object_reconstruction_last': start + 600,
+            'object_reconstruction_time': 6,
+            'object_reconstruction_per_disk': {
+                'd1': {
+                    'object_reconstruction_last': start + 300,
+                    'object_reconstruction_time': 1,
+                    'pid': 'pid-1',
+                },
+                'd2': {
+                    'object_reconstruction_last': start + 600,
+                    'object_reconstruction_time': 2,
+                    'pid': 'pid-2',
+                },
+            }
+        }, data)
+
+    def test_recon_aggregation_races_with_final_recon_dump(self):
+        reconstructor = object_reconstructor.ObjectReconstructor({
+            'reconstructor_workers': 2,
+            'recon_cache_path': self.recon_cache_path
+        }, logger=self.logger)
+        reconstructor.all_local_devices = set(['d0', 'd1'])
+        start = time.time() - 1000
+        # first worker dumps to recon cache
+        with mock.patch('swift.obj.reconstructor.time.time',
+                        return_value=start), \
+                mock.patch('swift.obj.reconstructor.os.getpid',
+                           return_value='pid-0'):
+            reconstructor.final_recon_dump(
+                1, override_devices=['d0'])
+        # sanity
+        with open(self.rcache) as f:
+            data = json.load(f)
+        self.assertEqual({
+            'object_reconstruction_per_disk': {
+                'd0': {
+                    'object_reconstruction_last': start,
+                    'object_reconstruction_time': 1,
+                    'pid': 'pid-0',
+                },
+            }
+        }, data)
+
+        # simulate a second worker concurrently dumping to recon cache while
+        # parent is aggregating existing results; mock dump_recon_cache as a
+        # convenient way to interrupt parent aggregate_recon_update and 'pass
+        # control' to second worker
+        updated_data = []  # state of recon cache just after second worker dump
+
+        def simulate_other_process_final_recon_dump():
+            with mock.patch('swift.obj.reconstructor.time.time',
+                            return_value=start + 999), \
+                    mock.patch('swift.obj.reconstructor.os.getpid',
+                               return_value='pid-1'):
+                reconstructor.final_recon_dump(
+                    1000, override_devices=['d1'])
+            with open(self.rcache) as f:
+                updated_data.append(json.load(f))
+
+        def fake_dump_recon_cache(*args, **kwargs):
+            # temporarily put back real dump_recon_cache
+            with mock.patch('swift.obj.reconstructor.dump_recon_cache',
+                            dump_recon_cache):
+                simulate_other_process_final_recon_dump()
+            # and now proceed with parent dump_recon_cache
+            dump_recon_cache(*args, **kwargs)
+
+        reconstructor.dump_recon_cache = fake_dump_recon_cache
+        with mock.patch('swift.obj.reconstructor.dump_recon_cache',
+                        fake_dump_recon_cache):
+            reconstructor.aggregate_recon_update()
+
+        self.assertEqual([{  # sanity check - second process did dump its data
+            'object_reconstruction_per_disk': {
+                'd0': {
+                    'object_reconstruction_last': start,
+                    'object_reconstruction_time': 1,
+                    'pid': 'pid-0',
+                },
+                'd1': {
+                    'object_reconstruction_last': start + 999,
+                    'object_reconstruction_time': 1000,
+                    'pid': 'pid-1',
+                },
+            }
+        }], updated_data)
+
+        with open(self.rcache) as f:
+            data = json.load(f)
+        self.assertEqual({
+            'object_reconstruction_per_disk': {
+                'd0': {
+                    'object_reconstruction_last': start,
+                    'object_reconstruction_time': 1,
+                    'pid': 'pid-0',
+                },
+                'd1': {
+                    'object_reconstruction_last': start + 999,
+                    'object_reconstruction_time': 1000,
+                    'pid': 'pid-1',
+                },
+            }
+        }, data)
+
+        # next aggregation will find d1 stats
+        reconstructor.aggregate_recon_update()
+
+        with open(self.rcache) as f:
+            data = json.load(f)
+        self.assertEqual({
+            'object_reconstruction_last': start + 999,
+            'object_reconstruction_time': 1000,
+            'object_reconstruction_per_disk': {
+                'd0': {
+                    'object_reconstruction_last': start,
+                    'object_reconstruction_time': 1,
+                    'pid': 'pid-0',
+                },
+                'd1': {
+                    'object_reconstruction_last': start + 999,
+                    'object_reconstruction_time': 1000,
+                    'pid': 'pid-1',
+                },
+            }
+        }, data)
+
+
+
 @patch_policies(with_ec_default=True)
 class BaseTestObjectReconstructor(unittest.TestCase):
     def setUp(self):