Add multiple worker processes strategy to reconstructor

This change adds a new Strategy concept to the daemon module, similar to
how we manage WSGI workers.  We need to leverage multiple Python
processes to get the concurrency properties we need; more workers will
rebalance much faster on dense chassis with many devices.

Currently the default is still only one process, and no workers.  Set
reconstructor_workers in the [object-reconstructor] section to some
whole number <= the number of devices on a node to get that many
reconstructor workers.

Each worker will operate on a different subset of disks.
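
As a rough sketch (not the exact reconstructor code), the even split
works something like the snippet below; the helper name and device
names are illustrative only, and it assumes at least one device (the
real code yields a single idle worker when there are none):

    import math

    def split_devices(devices, reconstructor_workers):
        # cap the per-worker share so workers cycle at similar intervals;
        # you never end up with more workers than devices
        per_worker = int(math.ceil(1.0 * len(devices) / reconstructor_workers))
        n = int(math.ceil(1.0 * len(devices) / per_worker))
        return [devices[i::n] for i in range(n)]

    # e.g. 5 devices, 2 workers -> [['sdb', 'sdd', 'sdf'], ['sdc', 'sde']]
    split_devices(['sdb', 'sdc', 'sdd', 'sde', 'sdf'], 2)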

Once mode works as before, but tends to update the recon drops a little
more often.

If you change the rings, the strategy will shut down workers and spawn
new ones.

You can kill the worker pids and the daemon strategy will respawn them.

New per-disk reconstructor stats are dumped to recon under the
object_reconstruction_per_disk key.  To maintain legacy compatibility
and replication monitoring based on cycle times, they are aggregated
every stats_interval (default 5 mins).
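
For reference, a hedged sketch of what the recon cache looks like after
a couple of per-disk worker dumps plus one aggregation pass; all values
below are made up:

    # illustrative contents of /var/cache/swift/object.recon
    {
        'object_reconstruction_last': 1496449645.0,  # aggregated finish time
        'object_reconstruction_time': 12.5,          # aggregated cycle, minutes
        'object_reconstruction_per_disk': {
            'sdb': {'object_reconstruction_last': 1496449645.0,
                    'object_reconstruction_time': 12.5, 'pid': 12345},
            'sdc': {'object_reconstruction_last': 1496449500.0,
                    'object_reconstruction_time': 10.0, 'pid': 12346},
        },
    }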

Change-Id: I28925a37f3985c9082b5a06e76af4dc3ec813abe
Clay Gerrard 2017-06-02 17:47:25 -07:00 committed by Tim Burke
parent d40c9ed3a2
commit 701a172afa
8 changed files with 1559 additions and 57 deletions


@ -811,8 +811,18 @@ daemonize yes Whether or not to run
reconstruction as a daemon
interval 30 Time in seconds to wait between
reconstruction passes
reconstructor_workers 0 Maximum number of worker processes
to spawn. Each worker will handle
a subset of devices. Devices will
be assigned evenly among the workers
so that workers cycle at similar
intervals (which can lead to fewer
workers than requested). You can not
have more workers than devices. If
you have no devices only a single
worker is spawned.
concurrency 1 Number of reconstruction threads to
spawn.
spawn per reconstructor process.
stats_interval 300 Interval in seconds between
logging reconstruction statistics
handoffs_only false The handoffs_only mode option is for


@ -314,6 +314,13 @@ use = egg:swift#recon
# run_pause is deprecated, use interval instead
# run_pause = 30
#
# Maximum number of worker processes to spawn. Each worker will handle a
# subset of devices. Devices will be assigned evenly among the workers so that
# workers cycle at similar intervals (which can lead to fewer workers than
# requested). You can not have more workers than devices. If you have no
# devices only a single worker is spawned.
# reconstructor_workers = 0
#
# concurrency = 1
# stats_interval = 300
# node_timeout = 10


@ -13,6 +13,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
import errno
import os
import sys
import time
@ -25,7 +26,24 @@ from swift.common import utils
class Daemon(object):
"""Daemon base class"""
"""
Daemon base class
A daemon has a run method that accepts a ``once`` kwarg and will dispatch
to :meth:`run_once` or :meth:`run_forever`.
A subclass of Daemon must implement :meth:`run_once` and
:meth:`run_forever`.
A subclass of Daemon may override :meth:`get_worker_args` to dispatch
arguments to individual child process workers and :meth:`is_healthy` to
perform context specific periodic wellness checks which can reset worker
arguments.
Implementations of Daemon do not know *how* to daemonize, or execute
multiple daemonized workers; they simply provide the behavior of the daemon
and context specific knowledge about how workers should be started.
"""
def __init__(self, conf):
self.conf = conf
@ -40,35 +58,200 @@ class Daemon(object):
raise NotImplementedError('run_forever not implemented')
def run(self, once=False, **kwargs):
"""Run the daemon"""
if once:
self.run_once(**kwargs)
else:
self.run_forever(**kwargs)
def get_worker_args(self, once=False, **kwargs):
"""
For each worker yield a (possibly empty) dict of kwargs to pass along
to the daemon's :meth:`run` method after fork. The number of elements
returned from this method determines the number of processes created.
If the returned iterable is empty, the Strategy will fall back to the
run-inline strategy.
:param once: False if the worker(s) will be daemonized, True if the
worker(s) will be run once
:param kwargs: plumbed through via command line argparser
:returns: an iterable of dicts, each element represents the kwargs to
be passed to a single worker's :meth:`run` method after fork.
"""
return []
def is_healthy(self):
"""
This method is called very frequently on the instance of the daemon
held by the parent process. If it returns False, all child workers are
terminated, and new workers will be created.
:returns: a boolean, True only if all workers should continue to run
"""
return True
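# Illustration only (not part of this change): a minimal Daemon subclass
# that opts in to multiple workers, along the lines of the MyWorkerDaemon
# helper in the tests below.  The class name and the 'workers' conf key
# are made up for the example.
class ExampleWorkerDaemon(Daemon):

    def get_worker_args(self, once=False, **kwargs):
        # one dict per desired worker; DaemonStrategy forks a child for
        # each dict and passes it to run(once=once, **worker_kwargs)
        count = int(self.conf.get('workers', 0))
        return [dict(kwargs, worker_index=i) for i in range(count)]

    def is_healthy(self):
        # returning False makes the strategy terminate children and
        # respawn them with freshly computed worker args
        return True

    def run_once(self, worker_index=None, **kwargs):
        pass

    def run_forever(self, worker_index=None, **kwargs):
        pass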
class DaemonStrategy(object):
"""
This is the execution strategy for using subclasses of Daemon. The default
behavior is to invoke the daemon's :meth:`Daemon.run` method from within
the parent process. When the :meth:`Daemon.run` method returns the parent
process will exit.
However, if the Daemon returns a non-empty iterable from
:meth:`Daemon.get_worker_args`, the daemon's :meth:`Daemon.run` method will
be invoked in child processes, with the arguments provided from the parent
process's instance of the daemon. If a child process exits it will be
restarted with the same options, unless it was executed in once mode.
:param daemon: an instance of a :class:`Daemon` (has a `run` method)
:param logger: a logger instance
"""
def __init__(self, daemon, logger):
self.daemon = daemon
self.logger = logger
self.running = False
# only used by multi-worker strategy
self.options_by_pid = {}
self.unspawned_worker_options = []
def setup(self, **kwargs):
utils.validate_configuration()
utils.drop_privileges(self.conf.get('user', 'swift'))
utils.drop_privileges(self.daemon.conf.get('user', 'swift'))
utils.capture_stdio(self.logger, **kwargs)
def kill_children(*args):
self.running = False
self.logger.info('SIGTERM received')
signal.signal(signal.SIGTERM, signal.SIG_IGN)
os.killpg(0, signal.SIGTERM)
os._exit(0)
signal.signal(signal.SIGTERM, kill_children)
if once:
self.run_once(**kwargs)
self.running = True
def _run_inline(self, once=False, **kwargs):
"""Run the daemon"""
self.daemon.run(once=once, **kwargs)
def run(self, once=False, **kwargs):
"""Daemonize and execute our strategy"""
self.setup(**kwargs)
try:
self._run(once=once, **kwargs)
except KeyboardInterrupt:
self.logger.notice('User quit')
finally:
self.cleanup()
self.running = False
def _fork(self, once, **kwargs):
pid = os.fork()
if pid == 0:
signal.signal(signal.SIGHUP, signal.SIG_DFL)
signal.signal(signal.SIGTERM, signal.SIG_DFL)
self.daemon.run(once, **kwargs)
self.logger.debug('Forked worker %s finished', os.getpid())
# do not return from this stack, nor execute any finally blocks
os._exit(0)
else:
self.run_forever(**kwargs)
self.register_worker_start(pid, kwargs)
return pid
def iter_unspawned_workers(self):
while True:
try:
per_worker_options = self.unspawned_worker_options.pop()
except IndexError:
return
yield per_worker_options
def spawned_pids(self):
return self.options_by_pid.keys()
def register_worker_start(self, pid, per_worker_options):
self.logger.debug('Spawned worker %s with %r', pid, per_worker_options)
self.options_by_pid[pid] = per_worker_options
def register_worker_exit(self, pid):
self.unspawned_worker_options.append(self.options_by_pid.pop(pid))
def ask_daemon_to_prepare_workers(self, once, **kwargs):
self.unspawned_worker_options = list(
self.daemon.get_worker_args(once=once, **kwargs))
def abort_workers_if_daemon_would_like(self):
if not self.daemon.is_healthy():
self.logger.debug(
'Daemon needs to change options, aborting workers')
self.cleanup()
return True
return False
def check_on_all_running_workers(self):
for p in self.spawned_pids():
try:
pid, status = os.waitpid(p, os.WNOHANG)
except OSError as err:
if err.errno not in (errno.EINTR, errno.ECHILD):
raise
self.logger.notice('Worker %s died', p)
else:
if pid == 0:
# child still running
continue
self.logger.debug('Worker %s exited', p)
self.register_worker_exit(p)
def _run(self, once, **kwargs):
self.ask_daemon_to_prepare_workers(once, **kwargs)
if not self.unspawned_worker_options:
return self._run_inline(once, **kwargs)
for per_worker_options in self.iter_unspawned_workers():
if self._fork(once, **per_worker_options) == 0:
return 0
while self.running:
if self.abort_workers_if_daemon_would_like():
self.ask_daemon_to_prepare_workers(once, **kwargs)
self.check_on_all_running_workers()
if not once:
for per_worker_options in self.iter_unspawned_workers():
if self._fork(once, **per_worker_options) == 0:
return 0
else:
if not self.spawned_pids():
self.logger.notice('Finished %s', os.getpid())
break
time.sleep(0.1)
return 0
def cleanup(self):
for p in self.spawned_pids():
try:
os.kill(p, signal.SIGTERM)
except OSError as err:
if err.errno not in (errno.ESRCH, errno.EINTR, errno.ECHILD):
raise
self.register_worker_exit(p)
self.logger.debug('Cleaned up worker %s', p)
def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
"""
Loads settings from conf, then instantiates daemon "klass" and runs the
daemon with the specified once kwarg. The section_name will be derived
from the daemon "klass" if not provided (e.g. ObjectReplicator =>
Loads settings from conf, then instantiates daemon ``klass`` and runs the
daemon with the specified ``once`` kwarg. The section_name will be derived
from the daemon ``klass`` if not provided (e.g. ObjectReplicator =>
object-replicator).
:param klass: Class to instantiate, subclass of common.daemon.Daemon
:param klass: Class to instantiate, subclass of :class:`Daemon`
:param conf_file: Path to configuration file
:param section_name: Section name from conf file to load config from
:param once: Passed to daemon run method
:param once: Passed to daemon :meth:`Daemon.run` method
"""
# very often the config section_name is based on the class name
# the None singleton will be passed through to readconf as is
@ -113,8 +296,9 @@ def run_daemon(klass, conf_file, section_name='', once=False, **kwargs):
os.environ['TZ'] = 'UTC+0'
time.tzset()
logger.notice('Starting %s', os.getpid())
try:
klass(conf).run(once=once, **kwargs)
DaemonStrategy(klass(conf), logger).run(once=once, **kwargs)
except KeyboardInterrupt:
logger.info('User quit')
logger.info('Exited')
logger.notice('Exited %s', os.getpid())


@ -3120,7 +3120,7 @@ def dump_recon_cache(cache_dict, cache_file, logger, lock_timeout=2,
try:
with NamedTemporaryFile(dir=os.path.dirname(cache_file),
delete=False) as tf:
tf.write(json.dumps(cache_entry) + '\n')
tf.write(json.dumps(cache_entry, sort_keys=True) + '\n')
if set_owner:
os.chown(tf.name, pwd.getpwnam(set_owner).pw_uid, -1)
renamer(tf.name, cache_file, fsync=False)


@ -15,6 +15,7 @@
import json
import errno
import math
import os
from os.path import join
import random
@ -93,6 +94,30 @@ def _full_path(node, part, relative_path, policy):
}
def parse_override_options(**kwargs):
"""
Return a dict with keys `override_devices` and `override_partitions` whose
values have been parsed from `kwargs`. If either key is found in `kwargs`
then its value is copied from `kwargs`. Otherwise, if `once` is set in
`kwargs` then the `devices` and `partitions` keys are parsed for the values
of `override_devices` and `override_partitions` respectively.
:return: a dict with keys `override_devices` and `override_partitions`
"""
if kwargs.get('once', False):
devices = list_from_csv(kwargs.get('devices'))
partitions = [
int(p) for p in list_from_csv(kwargs.get('partitions'))]
else:
devices = []
partitions = []
return {
'override_devices': kwargs.get('override_devices', devices),
'override_partitions': kwargs.get('override_partitions', partitions),
}
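# Illustrative behaviour of the helper above (not part of this change),
# assuming CLI-style csv kwargs:
#
#   parse_override_options(once=True, devices='sdb,sdc', partitions='99,333')
#   -> {'override_devices': ['sdb', 'sdc'], 'override_partitions': [99, 333]}
#
#   parse_override_options(devices='sdb,sdc')  # forever mode ignores the csv
#   -> {'override_devices': [], 'override_partitions': []}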
class RebuildingECDiskFileStream(object):
"""
This class wraps the reconstructed fragment archive data and
@ -155,6 +180,12 @@ class ObjectReconstructor(Daemon):
self.port = None if self.servers_per_port else \
int(conf.get('bind_port', 6200))
self.concurrency = int(conf.get('concurrency', 1))
# N.B. to maintain compatibility with legacy configs this option can
# not be named 'workers' because the object-server uses that option
# name in the DEFAULT section
self.reconstructor_workers = int(conf.get('reconstructor_workers', 0))
self.policies = [policy for policy in POLICIES
if policy.policy_type == EC_POLICY]
self.stats_interval = int(conf.get('stats_interval', '300'))
self.ring_check_interval = int(conf.get('ring_check_interval', 15))
self.next_check = time.time() + self.ring_check_interval
@ -166,6 +197,7 @@ class ObjectReconstructor(Daemon):
self.recon_cache_path = conf.get('recon_cache_path',
'/var/cache/swift')
self.rcache = os.path.join(self.recon_cache_path, "object.recon")
self._next_rcache_update = time.time() + self.stats_interval
# defaults subject to change after beta
self.conn_timeout = float(conf.get('conn_timeout', 0.5))
self.node_timeout = float(conf.get('node_timeout', 10))
@ -193,6 +225,102 @@ class ObjectReconstructor(Daemon):
self.logger.warning('Ignored handoffs_first option in favor '
'of handoffs_only.')
self._df_router = DiskFileRouter(conf, self.logger)
self.all_local_devices = self.get_local_devices()
def get_worker_args(self, once=False, **kwargs):
"""
Take the set of all local devices for this node from all the EC
policies rings, and distribute them evenly into the number of workers
to be spawned according to the configured worker count. If `devices` is
given in `kwargs` then distribute only those devices.
:param once: False if the worker(s) will be daemonized, True if the
worker(s) will be run once
:param kwargs: optional overrides from the command line
"""
if self.reconstructor_workers < 1:
return
override_options = parse_override_options(once=once, **kwargs)
# Note that this gets re-used when dumping stats and in is_healthy
self.all_local_devices = self.get_local_devices()
if override_options['override_devices']:
devices = [d for d in override_options['override_devices']
if d in self.all_local_devices]
else:
devices = list(self.all_local_devices)
if not devices:
# we only need a single worker to do nothing until a ring change
yield dict(override_options)
return
# for somewhat uniform load per worker use same max_devices_per_worker
# when handling all devices or just override devices...
max_devices_per_worker = int(math.ceil(
1.0 * len(self.all_local_devices) / self.reconstructor_workers))
# ...but only use enough workers for the actual devices being handled
n = int(math.ceil(1.0 * len(devices) / max_devices_per_worker))
override_devices_per_worker = [devices[i::n] for i in range(n)]
for override_devices in override_devices_per_worker:
yield dict(override_options, override_devices=override_devices)
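# Worked example (sketch only, not part of this change): with 5 local
# devices and reconstructor_workers = 2, max_devices_per_worker is
# ceil(5 / 2) = 3, so:
#   no overrides:        n = ceil(5 / 3) = 2 workers (3 + 2 devices)
#   3 override devices:  n = ceil(3 / 3) = 1 worker handles all three
#   4 override devices:  n = ceil(4 / 3) = 2 workers (2 + 2 devices)
# i.e. override runs keep roughly the same per-worker load as full runs.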
def is_healthy(self):
"""
Check whether rings have changed, and maybe do a recon update.
:returns: False if any ec ring has changed
"""
now = time.time()
if now > self._next_rcache_update:
self._next_rcache_update = now + self.stats_interval
self.aggregate_recon_update()
return self.get_local_devices() == self.all_local_devices
def aggregate_recon_update(self):
"""
Aggregate per-disk rcache updates from child workers.
"""
try:
with open(self.rcache) as f:
existing_data = json.load(f)
except IOError as e:
if e.errno != errno.ENOENT:
raise
# dump_recon_cache will create new file and dirs
existing_data = {}
first_start = time.time()
last_finish = 0
all_devices_reporting = True
for device in self.all_local_devices:
per_disk_stats = existing_data.get(
'object_reconstruction_per_disk', {}).get(device, {})
try:
start_time = per_disk_stats['object_reconstruction_last'] - \
(per_disk_stats['object_reconstruction_time'] * 60)
finish_time = per_disk_stats['object_reconstruction_last']
except KeyError:
all_devices_reporting = False
break
first_start = min(first_start, start_time)
last_finish = max(last_finish, finish_time)
if all_devices_reporting and last_finish > 0:
duration = last_finish - first_start
recon_update = {
'object_reconstruction_time': duration / 60.0,
'object_reconstruction_last': last_finish
}
else:
# if any current devices have not yet dropped stats, or the rcache
# file does not yet exist, we may still clear out per device stats
# for any devices that have been removed from local devices
recon_update = {}
found_devices = set(existing_data.get(
'object_reconstruction_per_disk', {}).keys())
clear_update = {d: {} for d in found_devices
if d not in self.all_local_devices}
if clear_update:
recon_update['object_reconstruction_per_disk'] = clear_update
dump_recon_cache(recon_update, self.rcache, self.logger)
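# Aggregation arithmetic, sketched with made-up numbers: per-disk cycle
# times are stored in minutes, so each disk's start is last - time * 60.
#   sdb: last = 2000.0, time = 5.0 -> started at 1700.0
#   sdc: last = 2300.0, time = 5.0 -> started at 2000.0
# first_start = 1700.0, last_finish = 2300.0, so the aggregate becomes
#   object_reconstruction_time = (2300.0 - 1700.0) / 60.0 = 10.0 minutes
#   object_reconstruction_last = 2300.0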
def load_object_ring(self, policy):
"""
@ -888,38 +1016,37 @@ class ObjectReconstructor(Daemon):
# return a list of jobs for this part
return jobs
def collect_parts(self, override_devices=None,
override_partitions=None):
"""
Helper for getting partitions in the top level reconstructor
In handoffs_only mode no primary partitions will not be included in the
returned (possibly empty) list.
"""
override_devices = override_devices or []
override_partitions = override_partitions or []
def get_policy2devices(self):
ips = whataremyips(self.bind_ip)
ec_policies = (policy for policy in POLICIES
if policy.policy_type == EC_POLICY)
policy2devices = {}
for policy in ec_policies:
for policy in self.policies:
self.load_object_ring(policy)
local_devices = list(six.moves.filter(
lambda dev: dev and is_local_device(
ips, self.port,
dev['replication_ip'], dev['replication_port']),
policy.object_ring.devs))
if override_devices:
local_devices = list(six.moves.filter(
lambda dev_info: dev_info['device'] in override_devices,
local_devices))
policy2devices[policy] = local_devices
self.device_count += len(local_devices)
return policy2devices
def get_local_devices(self):
"""Returns a set of all local devices in all EC policies."""
policy2devices = self.get_policy2devices()
return reduce(set.union, (
set(d['device'] for d in devices)
for devices in policy2devices.values()))
def collect_parts(self, override_devices=None, override_partitions=None):
"""
Helper for getting partitions in the top level reconstructor
In handoffs_only mode primary partitions will not be included in the
returned (possibly empty) list.
"""
override_devices = override_devices or []
override_partitions = override_partitions or []
policy2devices = self.get_policy2devices()
all_parts = []
for policy, local_devices in policy2devices.items():
@ -938,6 +1065,10 @@ class ObjectReconstructor(Daemon):
df_mgr = self._df_router[policy]
for local_dev in local_devices:
if override_devices and (
local_dev['device'] not in override_devices):
continue
self.device_count += 1
dev_path = df_mgr.get_dev_path(local_dev['device'])
if not dev_path:
self.logger.warning(_('%s is not mounted'),
@ -1088,22 +1219,48 @@ class ObjectReconstructor(Daemon):
"You should disable handoffs_only once all nodes "
"are reporting no handoffs remaining."))
def final_recon_dump(self, total, override_devices=None, **kwargs):
"""
Add stats for this worker's run to recon cache.
When in worker mode (per_disk_stats == True) this worker's stats are
added per device instead of in the top level keys (aggregation is
serialized in the parent process).
:param total: the runtime of the cycle in minutes
:param override_devices: (optional) list of devices that are being
reconstructed
"""
recon_update = {
'object_reconstruction_time': total,
'object_reconstruction_last': time.time(),
}
if self.reconstructor_workers > 0:
devices = override_devices or self.all_local_devices
recon_update['pid'] = os.getpid()
recon_update = {'object_reconstruction_per_disk': {
d: recon_update for d in devices}}
else:
# if not running in worker mode, kill any per_disk stats
recon_update['object_reconstruction_per_disk'] = {}
dump_recon_cache(recon_update, self.rcache, self.logger)
def run_once(self, *args, **kwargs):
start = time.time()
self.logger.info(_("Running object reconstructor in script mode."))
override_devices = list_from_csv(kwargs.get('devices'))
override_partitions = [int(p) for p in
list_from_csv(kwargs.get('partitions'))]
self.reconstruct(
override_devices=override_devices,
override_partitions=override_partitions)
override_options = parse_override_options(once=True, **kwargs)
self.reconstruct(**override_options)
total = (time.time() - start) / 60
self.logger.info(
_("Object reconstruction complete (once). (%.02f minutes)"), total)
if not (override_partitions or override_devices):
dump_recon_cache({'object_reconstruction_time': total,
'object_reconstruction_last': time.time()},
self.rcache, self.logger)
# Only dump stats if they would actually be meaningful -- i.e. we're
# collecting per-disk stats and covering all partitions, or we're
# covering all partitions, all disks.
if not override_options['override_partitions'] and (
self.reconstructor_workers > 0 or
not override_options['override_devices']):
self.final_recon_dump(total, **override_options)
def run_forever(self, *args, **kwargs):
self.logger.info(_("Starting object reconstructor in daemon mode."))
@ -1111,14 +1268,13 @@ class ObjectReconstructor(Daemon):
while True:
start = time.time()
self.logger.info(_("Starting object reconstruction pass."))
override_options = parse_override_options(**kwargs)
# Run the reconstructor
self.reconstruct()
self.reconstruct(**override_options)
total = (time.time() - start) / 60
self.logger.info(
_("Object reconstruction complete. (%.02f minutes)"), total)
dump_recon_cache({'object_reconstruction_time': total,
'object_reconstruction_last': time.time()},
self.rcache, self.logger)
self.final_recon_dump(total, **override_options)
self.logger.debug('reconstruction sleeping for %s seconds.',
self.interval)
sleep(self.interval)


@ -30,6 +30,7 @@ from numbers import Number
from tempfile import NamedTemporaryFile
import time
import eventlet
from eventlet import greenpool, debug as eventlet_debug
from eventlet.green import socket
from tempfile import mkdtemp
from shutil import rmtree
@ -218,6 +219,14 @@ class FakeRing(Ring):
self.set_replicas(replicas)
self._reload()
def has_changed(self):
"""
The real implementation uses getmtime on the serialized_path attribute,
which doesn't exist on our fake and relies on the implementation of
_reload which we override. So ... just NOOPE.
"""
return False
def _reload(self):
self._rtime = time.time()
@ -690,6 +699,16 @@ if utils.config_true_value(
fake_syslog_handler()
@contextmanager
def quiet_eventlet_exceptions():
orig_state = greenpool.DEBUG
eventlet_debug.hub_exceptions(False)
try:
yield
finally:
eventlet_debug.hub_exceptions(orig_state)
class MockTrue(object):
"""
Instances of MockTrue evaluate like True


@ -13,8 +13,6 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# TODO(clayg): Test kill_children signal handlers
import os
from six import StringIO
from six.moves import reload_module
@ -25,15 +23,20 @@ import logging
from test.unit import tmpfile
import mock
import signal
from contextlib import contextmanager
import itertools
from collections import defaultdict
import errno
from swift.common import daemon, utils
from test.unit import debug_logger
class MyDaemon(daemon.Daemon):
def __init__(self, conf):
self.conf = conf
self.logger = utils.get_logger(None, 'server', log_route='server')
self.logger = debug_logger('my-daemon')
MyDaemon.forever_called = False
MyDaemon.once_called = False
@ -63,6 +66,39 @@ class TestDaemon(unittest.TestCase):
self.assertRaises(NotImplementedError, d.run_forever)
class MyWorkerDaemon(MyDaemon):
def get_worker_args(self, once=False, **kwargs):
return [kwargs for i in range(int(self.conf.get('workers', 0)))]
def is_healthy(self):
try:
return getattr(self, 'health_side_effects', []).pop(0)
except IndexError:
return True
class TestWorkerDaemon(unittest.TestCase):
def test_stubs(self):
d = daemon.Daemon({})
self.assertRaises(NotImplementedError, d.run_once)
self.assertRaises(NotImplementedError, d.run_forever)
self.assertEqual([], d.get_worker_args())
self.assertEqual(True, d.is_healthy())
def test_my_worker_daemon(self):
d = MyWorkerDaemon({})
self.assertEqual([], d.get_worker_args())
self.assertTrue(d.is_healthy())
d = MyWorkerDaemon({'workers': '3'})
self.assertEqual([{'key': 'val'}] * 3, d.get_worker_args(key='val'))
d.health_side_effects = [True, False]
self.assertTrue(d.is_healthy())
self.assertFalse(d.is_healthy())
self.assertTrue(d.is_healthy())
class TestRunDaemon(unittest.TestCase):
def setUp(self):
@ -89,7 +125,7 @@ class TestRunDaemon(unittest.TestCase):
d = MyDaemon({})
with mock.patch('swift.common.daemon.signal') as mock_signal:
mock_signal.SIGTERM = signal.SIGTERM
d.run()
daemon.DaemonStrategy(d, d.logger).run()
signal_args, kwargs = mock_signal.signal.call_args
sig, func = signal_args
self.assertEqual(sig, signal.SIGTERM)
@ -158,6 +194,169 @@ class TestRunDaemon(unittest.TestCase):
os.environ['TZ'] = old_tz
time.tzset()
@contextmanager
def mock_os(self, child_worker_cycles=3):
self.waitpid_calls = defaultdict(int)
def mock_waitpid(p, *args):
self.waitpid_calls[p] += 1
if self.waitpid_calls[p] >= child_worker_cycles:
rv = p
else:
rv = 0
return rv, 0
with mock.patch('swift.common.daemon.os.fork') as mock_fork, \
mock.patch('swift.common.daemon.os.waitpid', mock_waitpid), \
mock.patch('swift.common.daemon.os.kill') as mock_kill:
mock_fork.side_effect = (
'mock-pid-%s' % i for i in itertools.count())
self.mock_fork = mock_fork
self.mock_kill = mock_kill
yield
def test_fork_workers(self):
d = MyWorkerDaemon({'workers': 3})
strategy = daemon.DaemonStrategy(d, d.logger)
with self.mock_os():
strategy.run(once=True)
self.assertEqual([mock.call()] * 3, self.mock_fork.call_args_list)
self.assertEqual(self.waitpid_calls, {
'mock-pid-0': 3,
'mock-pid-1': 3,
'mock-pid-2': 3,
})
self.assertEqual([], self.mock_kill.call_args_list)
self.assertIn('Finished', d.logger.get_lines_for_level('notice')[-1])
def test_forked_worker(self):
d = MyWorkerDaemon({'workers': 3})
strategy = daemon.DaemonStrategy(d, d.logger)
with mock.patch('swift.common.daemon.os.fork') as mock_fork, \
mock.patch('swift.common.daemon.os._exit') as mock_exit:
mock_fork.return_value = 0
mock_exit.side_effect = SystemExit
self.assertRaises(SystemExit, strategy.run, once=True)
self.assertTrue(d.once_called)
def test_restart_workers(self):
d = MyWorkerDaemon({'workers': 3})
strategy = daemon.DaemonStrategy(d, d.logger)
d.health_side_effects = [True, False]
with self.mock_os():
self.mock_kill.side_effect = lambda *args, **kwargs: setattr(
strategy, 'running', False)
strategy.run()
# six workers forked in total
self.assertEqual([mock.call()] * 6, self.mock_fork.call_args_list)
# since the daemon starts healthy, first pass checks children once
self.assertEqual(self.waitpid_calls, {
'mock-pid-0': 1,
'mock-pid-1': 1,
'mock-pid-2': 1,
})
# second pass is not healthy, original pids are killed
self.assertEqual(set([
('mock-pid-0', signal.SIGTERM),
('mock-pid-1', signal.SIGTERM),
('mock-pid-2', signal.SIGTERM),
]), set(c[0] for c in self.mock_kill.call_args_list[:3]))
# our mock_kill side effect breaks out of running, and cleanup kills
# remaining pids
self.assertEqual(set([
('mock-pid-3', signal.SIGTERM),
('mock-pid-4', signal.SIGTERM),
('mock-pid-5', signal.SIGTERM),
]), set(c[0] for c in self.mock_kill.call_args_list[3:]))
def test_worker_disappears(self):
d = MyWorkerDaemon({'workers': 3})
strategy = daemon.DaemonStrategy(d, d.logger)
strategy.register_worker_start('mock-pid', {'mock_options': True})
self.assertEqual(strategy.unspawned_worker_options, [])
self.assertEqual(strategy.options_by_pid, {
'mock-pid': {'mock_options': True}
})
# still running
with mock.patch('swift.common.daemon.os.waitpid') as mock_waitpid:
mock_waitpid.return_value = (0, 0)
strategy.check_on_all_running_workers()
self.assertEqual(strategy.unspawned_worker_options, [])
self.assertEqual(strategy.options_by_pid, {
'mock-pid': {'mock_options': True}
})
# finished
strategy = daemon.DaemonStrategy(d, d.logger)
strategy.register_worker_start('mock-pid', {'mock_options': True})
with mock.patch('swift.common.daemon.os.waitpid') as mock_waitpid:
mock_waitpid.return_value = ('mock-pid', 0)
strategy.check_on_all_running_workers()
self.assertEqual(strategy.unspawned_worker_options, [
{'mock_options': True}])
self.assertEqual(strategy.options_by_pid, {})
self.assertEqual(d.logger.get_lines_for_level('debug')[-1],
'Worker mock-pid exited')
# disappeared
strategy = daemon.DaemonStrategy(d, d.logger)
strategy.register_worker_start('mock-pid', {'mock_options': True})
with mock.patch('swift.common.daemon.os.waitpid') as mock_waitpid:
mock_waitpid.side_effect = OSError(
errno.ECHILD, os.strerror(errno.ECHILD))
mock_waitpid.return_value = ('mock-pid', 0)
strategy.check_on_all_running_workers()
self.assertEqual(strategy.unspawned_worker_options, [
{'mock_options': True}])
self.assertEqual(strategy.options_by_pid, {})
self.assertEqual(d.logger.get_lines_for_level('notice')[-1],
'Worker mock-pid died')
def test_worker_kills_pids_in_cleanup(self):
d = MyWorkerDaemon({'workers': 2})
strategy = daemon.DaemonStrategy(d, d.logger)
strategy.register_worker_start('mock-pid-1', {'mock_options': True})
strategy.register_worker_start('mock-pid-2', {'mock_options': True})
self.assertEqual(strategy.unspawned_worker_options, [])
self.assertEqual(strategy.options_by_pid, {
'mock-pid-1': {'mock_options': True},
'mock-pid-2': {'mock_options': True},
})
with mock.patch('swift.common.daemon.os.kill') as mock_kill:
strategy.cleanup()
self.assertEqual(strategy.unspawned_worker_options, [
{'mock_options': True}] * 2)
self.assertEqual(strategy.options_by_pid, {})
self.assertEqual(set([
('mock-pid-1', signal.SIGTERM),
('mock-pid-2', signal.SIGTERM),
]), set(c[0] for c in mock_kill.call_args_list))
self.assertEqual(set(d.logger.get_lines_for_level('debug')[-2:]),
set(['Cleaned up worker mock-pid-1',
'Cleaned up worker mock-pid-2']))
def test_worker_disappears_in_cleanup(self):
d = MyWorkerDaemon({'workers': 2})
strategy = daemon.DaemonStrategy(d, d.logger)
strategy.register_worker_start('mock-pid-1', {'mock_options': True})
strategy.register_worker_start('mock-pid-2', {'mock_options': True})
self.assertEqual(strategy.unspawned_worker_options, [])
self.assertEqual(strategy.options_by_pid, {
'mock-pid-1': {'mock_options': True},
'mock-pid-2': {'mock_options': True},
})
with mock.patch('swift.common.daemon.os.kill') as mock_kill:
mock_kill.side_effect = [None, OSError(errno.ECHILD,
os.strerror(errno.ECHILD))]
strategy.cleanup()
self.assertEqual(strategy.unspawned_worker_options, [
{'mock_options': True}] * 2)
self.assertEqual(strategy.options_by_pid, {})
self.assertEqual(set([
('mock-pid-1', signal.SIGTERM),
('mock-pid-2', signal.SIGTERM),
]), set(c[0] for c in mock_kill.call_args_list))
self.assertEqual(set(d.logger.get_lines_for_level('debug')[-2:]),
set(['Cleaned up worker mock-pid-1',
'Cleaned up worker mock-pid-2']))
if __name__ == '__main__':
unittest.main()


@ -26,7 +26,7 @@ import re
import random
import struct
import collections
from eventlet import Timeout, sleep
from eventlet import Timeout, sleep, spawn
from contextlib import closing, contextmanager
from gzip import GzipFile
@ -35,6 +35,7 @@ from six.moves.urllib.parse import unquote
from swift.common import utils
from swift.common.exceptions import DiskFileError
from swift.common.header_key_dict import HeaderKeyDict
from swift.common.utils import dump_recon_cache
from swift.obj import diskfile, reconstructor as object_reconstructor
from swift.common import ring
from swift.common.storage_policy import (StoragePolicy, ECStoragePolicy,
@ -43,7 +44,8 @@ from swift.obj.reconstructor import REVERT
from test.unit import (patch_policies, debug_logger, mocked_http_conn,
FabricatedRing, make_timestamp_iter,
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies)
DEFAULT_TEST_EC_TYPE, encode_frag_archive_bodies,
quiet_eventlet_exceptions)
from test.unit.obj.common import write_diskfile
@ -1343,6 +1345,931 @@ class TestGlobalSetupObjectReconstructorLegacyDurable(
legacy_durable = True
@patch_policies(with_ec_default=True)
class TestWorkerReconstructor(unittest.TestCase):
maxDiff = None
def setUp(self):
super(TestWorkerReconstructor, self).setUp()
self.logger = debug_logger()
self.testdir = tempfile.mkdtemp()
self.recon_cache_path = os.path.join(self.testdir, 'recon')
self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
# dump_recon_cache expects recon_cache_path to exist
os.mkdir(self.recon_cache_path)
def tearDown(self):
super(TestWorkerReconstructor, self).tearDown()
shutil.rmtree(self.testdir)
def test_no_workers_by_default(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{}, logger=self.logger)
self.assertEqual(0, reconstructor.reconstructor_workers)
self.assertEqual(0, len(list(reconstructor.get_worker_args())))
def test_bad_value_workers(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'reconstructor_workers': '-1'}, logger=self.logger)
self.assertEqual(-1, reconstructor.reconstructor_workers)
self.assertEqual(0, len(list(reconstructor.get_worker_args())))
def test_workers_with_no_devices(self):
def do_test(num_workers):
reconstructor = object_reconstructor.ObjectReconstructor(
{'reconstructor_workers': num_workers}, logger=self.logger)
self.assertEqual(num_workers, reconstructor.reconstructor_workers)
self.assertEqual(1, len(list(reconstructor.get_worker_args())))
self.assertEqual([
{'override_partitions': [], 'override_devices': []},
], list(reconstructor.get_worker_args()))
do_test(1)
do_test(10)
def test_workers_with_devices_and_no_valid_overrides(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'reconstructor_workers': '2'}, logger=self.logger)
reconstructor.get_local_devices = lambda: ['sdb', 'sdc']
self.assertEqual(2, reconstructor.reconstructor_workers)
# N.B. sdz is not in local_devices so there are no devices to process
# but still expect a single worker process
worker_args = list(reconstructor.get_worker_args(
once=True, devices='sdz'))
self.assertEqual(1, len(worker_args))
self.assertEqual([{'override_partitions': [],
'override_devices': ['sdz']}],
worker_args)
# overrides are ignored in forever mode
worker_args = list(reconstructor.get_worker_args(
once=False, devices='sdz'))
self.assertEqual(2, len(worker_args))
self.assertEqual([
{'override_partitions': [], 'override_devices': ['sdb']},
{'override_partitions': [], 'override_devices': ['sdc']}
], worker_args)
def test_workers_with_devices(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'reconstructor_workers': '2'}, logger=self.logger)
reconstructor.get_local_devices = lambda: ['sdb', 'sdc']
self.assertEqual(2, reconstructor.reconstructor_workers)
self.assertEqual(2, len(list(reconstructor.get_worker_args())))
expected = [
{'override_partitions': [], 'override_devices': ['sdb']},
{'override_partitions': [], 'override_devices': ['sdc']},
]
worker_args = list(reconstructor.get_worker_args(once=False))
self.assertEqual(2, len(worker_args))
self.assertEqual(expected, worker_args)
worker_args = list(reconstructor.get_worker_args(once=True))
self.assertEqual(2, len(worker_args))
self.assertEqual(expected, worker_args)
def test_workers_with_devices_and_overrides(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'reconstructor_workers': '2'}, logger=self.logger)
reconstructor.get_local_devices = lambda: ['sdb', 'sdc']
self.assertEqual(2, reconstructor.reconstructor_workers)
# check we don't get more workers than override devices...
# N.B. sdz is not in local_devices so should be ignored for the
# purposes of generating workers
worker_args = list(reconstructor.get_worker_args(
once=True, devices='sdb,sdz', partitions='99,333'))
self.assertEqual(1, len(worker_args))
self.assertEqual(
[{'override_partitions': [99, 333], 'override_devices': ['sdb']}],
worker_args)
# overrides are ignored in forever mode
worker_args = list(reconstructor.get_worker_args(
once=False, devices='sdb,sdz', partitions='99,333'))
self.assertEqual(2, len(worker_args))
self.assertEqual([
{'override_partitions': [], 'override_devices': ['sdb']},
{'override_partitions': [], 'override_devices': ['sdc']}
], worker_args)
def test_workers_with_lots_of_devices(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'reconstructor_workers': '2'}, logger=self.logger)
reconstructor.get_local_devices = lambda: [
'sdb', 'sdc', 'sdd', 'sde', 'sdf']
self.assertEqual(2, reconstructor.reconstructor_workers)
self.assertEqual(2, len(list(reconstructor.get_worker_args())))
self.assertEqual([
{'override_partitions': [], 'override_devices': [
'sdb', 'sdd', 'sdf']},
{'override_partitions': [], 'override_devices': [
'sdc', 'sde']},
], list(reconstructor.get_worker_args()))
def test_workers_with_lots_of_devices_and_overrides(self):
# check that override devices get distributed across workers
# in similar fashion to all devices
reconstructor = object_reconstructor.ObjectReconstructor(
{'reconstructor_workers': '2'}, logger=self.logger)
reconstructor.get_local_devices = lambda: [
'sdb', 'sdc', 'sdd', 'sde', 'sdf']
self.assertEqual(2, reconstructor.reconstructor_workers)
worker_args = list(reconstructor.get_worker_args(
once=True, devices='sdb,sdd,sdf', partitions='99,333'))
self.assertEqual(1, len(worker_args))
# 5 devices in total, 2 workers -> up to 3 devices per worker so a
# single worker should handle the requested override devices
self.assertEqual([
{'override_partitions': [99, 333], 'override_devices': [
'sdb', 'sdd', 'sdf']},
], worker_args)
# with 4 override devices, expect 2 per worker
worker_args = list(reconstructor.get_worker_args(
once=True, devices='sdb,sdc,sdd,sdf', partitions='99,333'))
self.assertEqual(2, len(worker_args))
self.assertEqual([
{'override_partitions': [99, 333], 'override_devices': [
'sdb', 'sdd']},
{'override_partitions': [99, 333], 'override_devices': [
'sdc', 'sdf']},
], worker_args)
def test_workers_with_lots_of_workers(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'reconstructor_workers': '10'}, logger=self.logger)
reconstructor.get_local_devices = lambda: ['sdb', 'sdc']
self.assertEqual(10, reconstructor.reconstructor_workers)
self.assertEqual(2, len(list(reconstructor.get_worker_args())))
self.assertEqual([
{'override_partitions': [], 'override_devices': ['sdb']},
{'override_partitions': [], 'override_devices': ['sdc']},
], list(reconstructor.get_worker_args()))
def test_workers_with_lots_of_workers_and_devices(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'reconstructor_workers': '10'}, logger=self.logger)
reconstructor.get_local_devices = lambda: [
'sdb', 'sdc', 'sdd', 'sde', 'sdf']
self.assertEqual(10, reconstructor.reconstructor_workers)
self.assertEqual(5, len(list(reconstructor.get_worker_args())))
self.assertEqual([
{'override_partitions': [], 'override_devices': ['sdb']},
{'override_partitions': [], 'override_devices': ['sdc']},
{'override_partitions': [], 'override_devices': ['sdd']},
{'override_partitions': [], 'override_devices': ['sde']},
{'override_partitions': [], 'override_devices': ['sdf']},
], list(reconstructor.get_worker_args()))
def test_workers_with_some_workers_and_devices(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{}, logger=self.logger)
reconstructor.get_local_devices = lambda: [
'd%s' % (i + 1) for i in range(21)]
# ... with many devices per worker, worker count is pretty granular
for i in range(1, 8):
reconstructor.reconstructor_workers = i
self.assertEqual(i, len(list(reconstructor.get_worker_args())))
# ... then it gets sorta stair step
for i in range(9, 10):
reconstructor.reconstructor_workers = i
self.assertEqual(7, len(list(reconstructor.get_worker_args())))
# 2-3 devices per worker
for args in reconstructor.get_worker_args():
self.assertIn(len(args['override_devices']), (2, 3))
for i in range(11, 20):
reconstructor.reconstructor_workers = i
self.assertEqual(11, len(list(reconstructor.get_worker_args())))
# 1, 2 devices per worker
for args in reconstructor.get_worker_args():
self.assertIn(len(args['override_devices']), (1, 2))
# this is debatable, but maybe I'll argue if you're going to have
# *some* workers with > 1 device, it's better to have fewer workers
# with devices spread out evenly than a couple outliers?
self.assertEqual([
{'override_partitions': [], 'override_devices': ['d1', 'd12']},
{'override_partitions': [], 'override_devices': ['d2', 'd13']},
{'override_partitions': [], 'override_devices': ['d3', 'd14']},
{'override_partitions': [], 'override_devices': ['d4', 'd15']},
{'override_partitions': [], 'override_devices': ['d5', 'd16']},
{'override_partitions': [], 'override_devices': ['d6', 'd17']},
{'override_partitions': [], 'override_devices': ['d7', 'd18']},
{'override_partitions': [], 'override_devices': ['d8', 'd19']},
{'override_partitions': [], 'override_devices': ['d9', 'd20']},
{'override_partitions': [], 'override_devices': ['d10', 'd21']},
{'override_partitions': [], 'override_devices': ['d11']},
], list(reconstructor.get_worker_args()))
# you can't get < than 1 device per worker
for i in range(21, 52):
reconstructor.reconstructor_workers = i
self.assertEqual(21, len(list(reconstructor.get_worker_args())))
for args in reconstructor.get_worker_args():
self.assertEqual(1, len(args['override_devices']))
def test_next_rcache_update_configured_with_stats_interval(self):
now = time.time()
with mock.patch('swift.obj.reconstructor.time.time', return_value=now):
reconstructor = object_reconstructor.ObjectReconstructor(
{}, logger=self.logger)
self.assertEqual(now + 300, reconstructor._next_rcache_update)
reconstructor = object_reconstructor.ObjectReconstructor(
{'stats_interval': '30'}, logger=self.logger)
self.assertEqual(now + 30, reconstructor._next_rcache_update)
def test_is_healthy_rcache_update_waits_for_next_update(self):
now = time.time()
with mock.patch('swift.obj.reconstructor.time.time', return_value=now):
reconstructor = object_reconstructor.ObjectReconstructor(
{'recon_cache_path': self.recon_cache_path},
logger=self.logger)
# file does not exist to start
self.assertFalse(os.path.exists(self.rcache))
self.assertTrue(reconstructor.is_healthy())
# ... and isn't created until _next_rcache_update
self.assertFalse(os.path.exists(self.rcache))
# ... but if we wait 5 mins (by default)
orig_next_update = reconstructor._next_rcache_update
with mock.patch('swift.obj.reconstructor.time.time',
return_value=now + 301):
self.assertTrue(reconstructor.is_healthy())
self.assertGreater(reconstructor._next_rcache_update, orig_next_update)
# ... it will be created
self.assertTrue(os.path.exists(self.rcache))
with open(self.rcache) as f:
data = json.load(f)
# and empty
self.assertEqual({}, data)
def test_is_healthy_ring_update_next_check(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'recon_cache_path': self.recon_cache_path},
logger=self.logger)
self.assertTrue(reconstructor.is_healthy())
reconstructor.get_local_devices = lambda: {
'sdb%d' % p for p in reconstructor.policies}
self.assertFalse(reconstructor.is_healthy())
reconstructor.all_local_devices = {
'sdb%d' % p for p in reconstructor.policies}
self.assertTrue(reconstructor.is_healthy())
def test_final_recon_dump(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'recon_cache_path': self.recon_cache_path},
logger=self.logger)
reconstructor.all_local_devices = ['sda', 'sdc']
total = 12.0
now = time.time()
with mock.patch('swift.obj.reconstructor.time.time', return_value=now):
reconstructor.final_recon_dump(total)
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_last': now,
'object_reconstruction_time': total,
}, data)
total = 14.0
now += total * 60
with mock.patch('swift.obj.reconstructor.time.time', return_value=now):
reconstructor.final_recon_dump(total)
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_last': now,
'object_reconstruction_time': total,
}, data)
# per_disk_stats with workers
reconstructor.reconstructor_workers = 1
old_total = total
total = 16.0
before = now
now += total * 60
with mock.patch('swift.obj.reconstructor.time.time',
return_value=now), \
mock.patch('swift.obj.reconstructor.os.getpid',
return_value='pid-1'):
reconstructor.final_recon_dump(total, override_devices=[
'sda', 'sdc'])
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_last': before,
'object_reconstruction_time': old_total,
'object_reconstruction_per_disk': {
'sda': {
'object_reconstruction_last': now,
'object_reconstruction_time': total,
'pid': 'pid-1',
},
'sdc': {
'object_reconstruction_last': now,
'object_reconstruction_time': total,
'pid': 'pid-1',
},
},
}, data)
# and without workers we clear it out
reconstructor.reconstructor_workers = 0
total = 18.0
now += total * 60
with mock.patch('swift.obj.reconstructor.time.time', return_value=now):
reconstructor.final_recon_dump(total)
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_last': now,
'object_reconstruction_time': total,
}, data)
def test_dump_recon_run_once_inline(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'recon_cache_path': self.recon_cache_path},
logger=self.logger)
reconstructor.reconstruct = mock.MagicMock()
now = time.time()
later = now + 300 # 5 mins
with mock.patch('swift.obj.reconstructor.time.time', side_effect=[
now, later, later]):
reconstructor.run_once()
# no override args passed to reconstruct
self.assertEqual([mock.call(
override_devices=[],
override_partitions=[]
)], reconstructor.reconstruct.call_args_list)
# script mode with no override args, we expect recon dumps
self.assertTrue(os.path.exists(self.rcache))
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_last': later,
'object_reconstruction_time': 5.0,
}, data)
total = 10.0
later += total * 60
with mock.patch('swift.obj.reconstructor.time.time',
return_value=later):
reconstructor.final_recon_dump(total)
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_last': later,
'object_reconstruction_time': 10.0,
}, data)
def test_dump_recon_run_once_in_worker(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'recon_cache_path': self.recon_cache_path,
'reconstructor_workers': 1},
logger=self.logger)
reconstructor.get_local_devices = lambda: {'sda'}
now = time.time()
later = now + 300 # 5 mins
def do_test(run_kwargs, expected_device):
# get the actual kwargs that would be passed to run_once in a
# worker
run_once_kwargs = list(
reconstructor.get_worker_args(once=True, **run_kwargs))[0]
reconstructor.reconstruct = mock.MagicMock()
with mock.patch('swift.obj.reconstructor.time.time',
side_effect=[now, later, later]):
reconstructor.run_once(**run_once_kwargs)
self.assertEqual([mock.call(
override_devices=[expected_device],
override_partitions=[]
)], reconstructor.reconstruct.call_args_list)
self.assertTrue(os.path.exists(self.rcache))
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
# no aggregate is written but perhaps it should be, in which
# case this assertion will need to change
'object_reconstruction_per_disk': {
expected_device: {
'object_reconstruction_last': later,
'object_reconstruction_time': 5.0,
'pid': mock.ANY
}
}
}, data)
# script mode with no CLI override args, we expect recon dumps
do_test({}, 'sda')
# script mode *with* CLI override devices, we expect recon dumps
os.unlink(self.rcache)
do_test(dict(devices='sda'), 'sda')
# if the override device is not in local devices we still get
# a recon dump, but it'll get cleaned up in the next aggregation
os.unlink(self.rcache)
do_test(dict(devices='sdz'), 'sdz')
# now disable workers and check that inline run_once updates rcache
# and clears out per disk stats
now = time.time()
later = now + 600 # 10 mins
reconstructor.reconstructor_workers = 0
with mock.patch('swift.obj.reconstructor.time.time',
side_effect=[now, later, later]):
reconstructor.run_once()
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_last': later,
'object_reconstruction_time': 10.0,
}, data)
def test_no_dump_recon_run_once(self):
reconstructor = object_reconstructor.ObjectReconstructor(
{'recon_cache_path': self.recon_cache_path},
logger=self.logger)
reconstructor.get_local_devices = lambda: {'sda', 'sdb', 'sdc'}
def do_test(run_once_kwargs, expected_devices, expected_partitions):
reconstructor.reconstruct = mock.MagicMock()
now = time.time()
later = now + 300 # 5 mins
with mock.patch('swift.obj.reconstructor.time.time', side_effect=[
now, later, later]):
reconstructor.run_once(**run_once_kwargs)
# override args passed to reconstruct
actual_calls = reconstructor.reconstruct.call_args_list
self.assertEqual({'override_devices', 'override_partitions'},
set(actual_calls[0][1]))
self.assertEqual(sorted(expected_devices),
sorted(actual_calls[0][1]['override_devices']))
self.assertEqual(sorted(expected_partitions),
sorted(actual_calls[0][1]['override_partitions']))
self.assertFalse(actual_calls[1:])
self.assertEqual(False, os.path.exists(self.rcache))
# inline mode with overrides never does recon dump
reconstructor.reconstructor_workers = 0
kwargs = {'devices': 'sda,sdb'}
do_test(kwargs, ['sda', 'sdb'], [])
# Have partition override, so no recon dump
kwargs = {'partitions': '1,2,3'}
do_test(kwargs, [], [1, 2, 3])
reconstructor.reconstructor_workers = 1
worker_kwargs = list(
reconstructor.get_worker_args(once=True, **kwargs))[0]
do_test(worker_kwargs, ['sda', 'sdb', 'sdc'], [1, 2, 3])
reconstructor.reconstructor_workers = 0
kwargs = {'devices': 'sda,sdb', 'partitions': '1,2,3'}
do_test(kwargs, ['sda', 'sdb'], [1, 2, 3])
reconstructor.reconstructor_workers = 1
worker_kwargs = list(
reconstructor.get_worker_args(once=True, **kwargs))[0]
do_test(worker_kwargs, ['sda', 'sdb'], [1, 2, 3])
# 'sdz' is not in local devices
reconstructor.reconstructor_workers = 0
kwargs = {'devices': 'sdz'}
do_test(kwargs, ['sdz'], [])
def test_run_forever_recon_aggregation(self):
class StopForever(Exception):
pass
reconstructor = object_reconstructor.ObjectReconstructor({
'reconstructor_workers': 2,
'recon_cache_path': self.recon_cache_path
}, logger=self.logger)
reconstructor.get_local_devices = lambda: ['sda', 'sdb', 'sdc', 'sdd']
reconstructor.reconstruct = mock.MagicMock()
now = time.time()
later = now + 300 # 5 mins
worker_args = list(
# include 'devices' kwarg as a sanity check - it should be ignored
# in run_forever mode
reconstructor.get_worker_args(once=False, devices='sda'))
with mock.patch('swift.obj.reconstructor.time.time',
side_effect=[now, later, later]), \
mock.patch('swift.obj.reconstructor.os.getpid',
return_value='pid-1'), \
mock.patch('swift.obj.reconstructor.sleep',
side_effect=[StopForever]), \
Timeout(.3), quiet_eventlet_exceptions(), \
self.assertRaises(StopForever):
gt = spawn(reconstructor.run_forever, **worker_args[0])
gt.wait()
# override args are passed to reconstruct
self.assertEqual([mock.call(
override_devices=['sda', 'sdc'],
override_partitions=[]
)], reconstructor.reconstruct.call_args_list)
# forever mode with override args, we expect per-disk recon dumps
self.assertTrue(os.path.exists(self.rcache))
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_per_disk': {
'sda': {
'object_reconstruction_last': later,
'object_reconstruction_time': 5.0,
'pid': 'pid-1',
},
'sdc': {
'object_reconstruction_last': later,
'object_reconstruction_time': 5.0,
'pid': 'pid-1',
},
}
}, data)
reconstructor.reconstruct.reset_mock()
# another worker would get *different* disks
before = now = later
later = now + 300 # 5 more minutes
with mock.patch('swift.obj.reconstructor.time.time',
side_effect=[now, later, later]), \
mock.patch('swift.obj.reconstructor.os.getpid',
return_value='pid-2'), \
mock.patch('swift.obj.reconstructor.sleep',
side_effect=[StopForever]), \
Timeout(.3), quiet_eventlet_exceptions(), \
self.assertRaises(StopForever):
gt = spawn(reconstructor.run_forever, **worker_args[1])
gt.wait()
# override args are parsed
self.assertEqual([mock.call(
override_devices=['sdb', 'sdd'],
override_partitions=[]
)], reconstructor.reconstruct.call_args_list)
# forever mode with override args, we expect per-disk recon dumps
self.assertTrue(os.path.exists(self.rcache))
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_per_disk': {
'sda': {
'object_reconstruction_last': before,
'object_reconstruction_time': 5.0,
'pid': 'pid-1',
},
'sdb': {
'object_reconstruction_last': later,
'object_reconstruction_time': 5.0,
'pid': 'pid-2',
},
'sdc': {
'object_reconstruction_last': before,
'object_reconstruction_time': 5.0,
'pid': 'pid-1',
},
'sdd': {
'object_reconstruction_last': later,
'object_reconstruction_time': 5.0,
'pid': 'pid-2',
},
}
}, data)
# aggregation is done in the parent thread even later
reconstructor.aggregate_recon_update()
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_last': later,
'object_reconstruction_time': 10.0,
'object_reconstruction_per_disk': {
'sda': {
'object_reconstruction_last': before,
'object_reconstruction_time': 5.0,
'pid': 'pid-1',
},
'sdb': {
'object_reconstruction_last': later,
'object_reconstruction_time': 5.0,
'pid': 'pid-2',
},
'sdc': {
'object_reconstruction_last': before,
'object_reconstruction_time': 5.0,
'pid': 'pid-1',
},
'sdd': {
'object_reconstruction_last': later,
'object_reconstruction_time': 5.0,
'pid': 'pid-2',
},
}
}, data)
def test_recon_aggregation_waits_for_all_devices(self):
reconstructor = object_reconstructor.ObjectReconstructor({
'reconstructor_workers': 2,
'recon_cache_path': self.recon_cache_path
}, logger=self.logger)
reconstructor.all_local_devices = set([
'd0', 'd1', 'd2', 'd3',
# unreported device definitely matters
'd4'])
start = time.time() - 1000
for i in range(4):
with mock.patch('swift.obj.reconstructor.time.time',
return_value=start + (300 * i)), \
mock.patch('swift.obj.reconstructor.os.getpid',
return_value='pid-%s' % i):
reconstructor.final_recon_dump(
i, override_devices=['d%s' % i])
# sanity
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_per_disk': {
'd0': {
'object_reconstruction_last': start,
'object_reconstruction_time': 0.0,
'pid': 'pid-0',
},
'd1': {
'object_reconstruction_last': start + 300,
'object_reconstruction_time': 1,
'pid': 'pid-1',
},
'd2': {
'object_reconstruction_last': start + 600,
'object_reconstruction_time': 2,
'pid': 'pid-2',
},
'd3': {
'object_reconstruction_last': start + 900,
'object_reconstruction_time': 3,
'pid': 'pid-3',
},
}
}, data)
# unreported device d4 prevents aggregation
reconstructor.aggregate_recon_update()
with open(self.rcache) as f:
data = json.load(f)
self.assertNotIn('object_reconstruction_last', data)
self.assertNotIn('object_reconstruction_time', data)
self.assertEqual(set(['d0', 'd1', 'd2', 'd3']),
set(data['object_reconstruction_per_disk'].keys()))
# it's idempotent
reconstructor.aggregate_recon_update()
with open(self.rcache) as f:
data = json.load(f)
self.assertNotIn('object_reconstruction_last', data)
self.assertNotIn('object_reconstruction_time', data)
self.assertEqual(set(['d0', 'd1', 'd2', 'd3']),
set(data['object_reconstruction_per_disk'].keys()))
# remove d4, we no longer wait on it for aggregation
reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3'])
reconstructor.aggregate_recon_update()
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual(start + 900, data['object_reconstruction_last'])
self.assertEqual(15, data['object_reconstruction_time'])
self.assertEqual(set(['d0', 'd1', 'd2', 'd3']),
set(data['object_reconstruction_per_disk'].keys()))
def test_recon_aggregation_removes_devices(self):
reconstructor = object_reconstructor.ObjectReconstructor({
'reconstructor_workers': 2,
'recon_cache_path': self.recon_cache_path
}, logger=self.logger)
reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3'])
start = time.time() - 1000
for i in range(4):
with mock.patch('swift.obj.reconstructor.time.time',
return_value=start + (300 * i)), \
mock.patch('swift.obj.reconstructor.os.getpid',
return_value='pid-%s' % i):
reconstructor.final_recon_dump(
i, override_devices=['d%s' % i])
# sanity
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_per_disk': {
'd0': {
'object_reconstruction_last': start,
'object_reconstruction_time': 0.0,
'pid': 'pid-0',
},
'd1': {
'object_reconstruction_last': start + 300,
'object_reconstruction_time': 1,
'pid': 'pid-1',
},
'd2': {
'object_reconstruction_last': start + 600,
'object_reconstruction_time': 2,
'pid': 'pid-2',
},
'd3': {
'object_reconstruction_last': start + 900,
'object_reconstruction_time': 3,
'pid': 'pid-3',
},
}
}, data)
reconstructor.all_local_devices = set(['d0', 'd1', 'd2', 'd3'])
reconstructor.aggregate_recon_update()
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual(start + 900, data['object_reconstruction_last'])
self.assertEqual(15, data['object_reconstruction_time'])
self.assertEqual(set(['d0', 'd1', 'd2', 'd3']),
set(data['object_reconstruction_per_disk'].keys()))
# it's idempotent
reconstructor.aggregate_recon_update()
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_last': start + 900,
'object_reconstruction_time': 15,
'object_reconstruction_per_disk': {
'd0': {
'object_reconstruction_last': start,
'object_reconstruction_time': 0.0,
'pid': 'pid-0',
},
'd1': {
'object_reconstruction_last': start + 300,
'object_reconstruction_time': 1,
'pid': 'pid-1',
},
'd2': {
'object_reconstruction_last': start + 600,
'object_reconstruction_time': 2,
'pid': 'pid-2',
},
'd3': {
'object_reconstruction_last': start + 900,
'object_reconstruction_time': 3,
'pid': 'pid-3',
},
}
}, data)
# if a device is removed from the ring
reconstructor.all_local_devices = set(['d1', 'd2', 'd3'])
reconstructor.aggregate_recon_update()
with open(self.rcache) as f:
data = json.load(f)
# ... its per-disk stats are removed (d0)
self.assertEqual({
'object_reconstruction_last': start + 900,
'object_reconstruction_time': 11,
'object_reconstruction_per_disk': {
'd1': {
'object_reconstruction_last': start + 300,
'object_reconstruction_time': 1,
'pid': 'pid-1',
},
'd2': {
'object_reconstruction_last': start + 600,
'object_reconstruction_time': 2,
'pid': 'pid-2',
},
'd3': {
'object_reconstruction_last': start + 900,
'object_reconstruction_time': 3,
'pid': 'pid-3',
},
}
}, data)
# which can affect the aggregates!
reconstructor.all_local_devices = set(['d1', 'd2'])
reconstructor.aggregate_recon_update()
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_last': start + 600,
'object_reconstruction_time': 6,
'object_reconstruction_per_disk': {
'd1': {
'object_reconstruction_last': start + 300,
'object_reconstruction_time': 1,
'pid': 'pid-1',
},
'd2': {
'object_reconstruction_last': start + 600,
'object_reconstruction_time': 2,
'pid': 'pid-2',
},
}
}, data)
def test_recon_aggregation_races_with_final_recon_dump(self):
reconstructor = object_reconstructor.ObjectReconstructor({
'reconstructor_workers': 2,
'recon_cache_path': self.recon_cache_path
}, logger=self.logger)
reconstructor.all_local_devices = set(['d0', 'd1'])
start = time.time() - 1000
# first worker dumps to recon cache
with mock.patch('swift.obj.reconstructor.time.time',
return_value=start), \
mock.patch('swift.obj.reconstructor.os.getpid',
return_value='pid-0'):
reconstructor.final_recon_dump(
1, override_devices=['d0'])
# sanity
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_per_disk': {
'd0': {
'object_reconstruction_last': start,
'object_reconstruction_time': 1,
'pid': 'pid-0',
},
}
}, data)
# simulate a second worker concurrently dumping to recon cache while
# parent is aggregating existing results; mock dump_recon_cache as a
# convenient way to interrupt parent aggregate_recon_update and 'pass
# control' to second worker
updated_data = [] # state of recon cache just after second worker dump
def simulate_other_process_final_recon_dump():
with mock.patch('swift.obj.reconstructor.time.time',
return_value=start + 999), \
mock.patch('swift.obj.reconstructor.os.getpid',
return_value='pid-1'):
reconstructor.final_recon_dump(
1000, override_devices=['d1'])
with open(self.rcache) as f:
updated_data.append(json.load(f))
def fake_dump_recon_cache(*args, **kwargs):
# temporarily put back real dump_recon_cache
with mock.patch('swift.obj.reconstructor.dump_recon_cache',
dump_recon_cache):
simulate_other_process_final_recon_dump()
# and now proceed with parent dump_recon_cache
dump_recon_cache(*args, **kwargs)
reconstructor.dump_recon_cache = fake_dump_recon_cache
with mock.patch('swift.obj.reconstructor.dump_recon_cache',
fake_dump_recon_cache):
reconstructor.aggregate_recon_update()
self.assertEqual([{ # sanity check - second process did dump its data
'object_reconstruction_per_disk': {
'd0': {
'object_reconstruction_last': start,
'object_reconstruction_time': 1,
'pid': 'pid-0',
},
'd1': {
'object_reconstruction_last': start + 999,
'object_reconstruction_time': 1000,
'pid': 'pid-1',
},
}
}], updated_data)
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_per_disk': {
'd0': {
'object_reconstruction_last': start,
'object_reconstruction_time': 1,
'pid': 'pid-0',
},
'd1': {
'object_reconstruction_last': start + 999,
'object_reconstruction_time': 1000,
'pid': 'pid-1',
},
}
}, data)
# next aggregation will find d1 stats
reconstructor.aggregate_recon_update()
with open(self.rcache) as f:
data = json.load(f)
self.assertEqual({
'object_reconstruction_last': start + 999,
'object_reconstruction_time': 1000,
'object_reconstruction_per_disk': {
'd0': {
'object_reconstruction_last': start,
'object_reconstruction_time': 1,
'pid': 'pid-0',
},
'd1': {
'object_reconstruction_last': start + 999,
'object_reconstruction_time': 1000,
'pid': 'pid-1',
},
}
}, data)
@patch_policies(with_ec_default=True)
class BaseTestObjectReconstructor(unittest.TestCase):
def setUp(self):