Merge "Improve object-updater's stats logging"
commit 17eb570a6c
@@ -909,6 +909,8 @@ objects_per_second    50                Maximum objects updated per second.
                                          system specs. 0 is unlimited.
 slowdown              0.01              Time in seconds to wait between objects.
                                         Deprecated in favor of objects_per_second.
+report_interval       300               Interval in seconds between logging
+                                        statistics about the current update pass.
 recon_cache_path      /var/cache/swift  Path to recon cache
 nice_priority         None              Scheduling priority of server processes.
                                         Niceness values range from -20 (most
@@ -372,6 +372,12 @@ use = egg:swift#recon
 # objects_per_second instead.
 # slowdown = 0.01
 #
+# Log stats (at INFO level) every report_interval seconds. This
+# logging is per-process, so with concurrency > 1, the logs will
+# contain one stats log per worker process every report_interval
+# seconds.
+# report_interval = 300
+#
 # recon_cache_path = /var/cache/swift
 #
 # You can set scheduling priority of processes. Niceness values range from -20
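
For context, the comment block above describes interval-gated, per-process stats logging. The following is a minimal sketch of that pattern and not part of the change itself; the process_one() work function, the logger name, and the item list are illustrative assumptions (the real implementation lives in swift/obj/updater.py, shown in the hunks below).

import logging
import time

logger = logging.getLogger('sweep-sketch')
REPORT_INTERVAL = 300  # seconds; mirrors the report_interval default above


def sweep(pending_items, process_one):
    # Emit a progress line at most once per REPORT_INTERVAL, plus a summary.
    successes = failures = 0
    start = last_report = time.time()
    for item in pending_items:
        try:
            process_one(item)  # illustrative work function
            successes += 1
        except Exception:
            failures += 1
        now = time.time()
        if now - last_report >= REPORT_INTERVAL:
            logger.info('progress: %.02fs, %d successes, %d failures',
                        now - start, successes, failures)
            last_report = now
    logger.info('complete: %.02fs, %d successes, %d failures',
                time.time() - start, successes, failures)

With concurrency > 1, each worker process runs its own sweep, which is why the sample config warns to expect one stats line per worker per interval.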
@@ -36,6 +36,37 @@ from swift.obj.diskfile import get_tmp_dir, ASYNCDIR_BASE
 from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
 
 
+class SweepStats(object):
+    """
+    Stats bucket for an update sweep
+    """
+    def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
+                 unlinks=0):
+        self.errors = errors
+        self.failures = failures
+        self.quarantines = quarantines
+        self.successes = successes
+        self.unlinks = unlinks
+
+    def copy(self):
+        return type(self)(self.errors, self.failures, self.quarantines,
+                          self.successes, self.unlinks)
+
+    def since(self, other):
+        return type(self)(self.errors - other.errors,
+                          self.failures - other.failures,
+                          self.quarantines - other.quarantines,
+                          self.successes - other.successes,
+                          self.unlinks - other.unlinks)
+
+    def reset(self):
+        self.errors = 0
+        self.failures = 0
+        self.quarantines = 0
+        self.successes = 0
+        self.unlinks = 0
+
+
 class ObjectUpdater(Daemon):
     """Update object information in container listings."""
 
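
As a quick illustration (not part of the diff) of how the SweepStats helpers compose: copy() takes a snapshot, since() yields the counter deltas relative to that snapshot, and reset() zeroes everything before the next device sweep.

stats = SweepStats()
start_stats = stats.copy()      # snapshot taken when the sweep begins

stats.successes += 3            # counters accumulate as updates are processed
stats.unlinks += 3
stats.errors += 1

so_far = stats.since(start_stats)   # totals accumulated since the snapshot
assert (so_far.successes, so_far.unlinks, so_far.errors) == (3, 3, 1)

stats.reset()                   # zero everything before the next sweep
assert stats.successes == 0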
@@ -63,16 +94,18 @@ class ObjectUpdater(Daemon):
                 objects_per_second))
         self.node_timeout = float(conf.get('node_timeout', 10))
         self.conn_timeout = float(conf.get('conn_timeout', 0.5))
-        self.successes = 0
-        self.failures = 0
+        self.report_interval = float(conf.get('report_interval', 300))
         self.recon_cache_path = conf.get('recon_cache_path',
                                          '/var/cache/swift')
         self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
+        self.stats = SweepStats()
 
     def _listdir(self, path):
         try:
             return os.listdir(path)
         except OSError as e:
+            self.stats.errors += 1
+            self.logger.increment('errors')
             self.logger.error(_('ERROR: Unable to access %(path)s: '
                                 '%(error)s') %
                               {'path': path, 'error': e})
@@ -95,7 +128,9 @@ class ObjectUpdater(Daemon):
             self.get_container_ring().get_nodes('')
             for device in self._listdir(self.devices):
                 if not check_drive(self.devices, device, self.mount_check):
-                    self.logger.increment('errors')
+                    # We don't count this as an error. The occasional
+                    # unmounted drive is part of normal cluster operations,
+                    # so a simple warning is sufficient.
                     self.logger.warning(
                         _('Skipping %s as it is not mounted'), device)
                     continue
@@ -107,17 +142,22 @@ class ObjectUpdater(Daemon):
                 else:
                     signal.signal(signal.SIGTERM, signal.SIG_DFL)
                     eventlet_monkey_patch()
-                    self.successes = 0
-                    self.failures = 0
+                    self.stats.reset()
                     forkbegin = time.time()
                     self.object_sweep(os.path.join(self.devices, device))
                     elapsed = time.time() - forkbegin
                     self.logger.info(
-                        _('Object update sweep of %(device)s'
-                          ' completed: %(elapsed).02fs, %(success)s successes'
-                          ', %(fail)s failures'),
+                        ('Object update sweep of %(device)s '
+                         'completed: %(elapsed).02fs, '
+                         '%(successes)d successes, %(failures)d failures, '
+                         '%(quarantines)d quarantines, '
+                         '%(unlinks)d unlinks, %(errors)d errors'),
                         {'device': device, 'elapsed': elapsed,
-                         'success': self.successes, 'fail': self.failures})
+                         'success': self.stats.successes,
+                         'failures': self.stats.failures,
+                         'quarantines': self.stats.quarantines,
+                         'unlinks': self.stats.unlinks,
+                         'errors': self.stats.errors})
                     sys.exit()
             while pids:
                 pids.remove(os.wait()[0])
@@ -133,21 +173,29 @@ class ObjectUpdater(Daemon):
         """Run the updater once."""
         self.logger.info(_('Begin object update single threaded sweep'))
         begin = time.time()
-        self.successes = 0
-        self.failures = 0
+        self.stats.reset()
         for device in self._listdir(self.devices):
             if not check_drive(self.devices, device, self.mount_check):
-                self.logger.increment('errors')
+                # We don't count this as an error. The occasional unmounted
+                # drive is part of normal cluster operations, so a simple
+                # warning is sufficient.
                 self.logger.warning(
                     _('Skipping %s as it is not mounted'), device)
                 continue
             self.object_sweep(os.path.join(self.devices, device))
         elapsed = time.time() - begin
         self.logger.info(
-            _('Object update single threaded sweep completed: '
-              '%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
-            {'elapsed': elapsed, 'success': self.successes,
-             'fail': self.failures})
+            ('Object update single-threaded sweep completed: '
+             '%(elapsed).02fs, %(successes)d successes, '
+             '%(failures)d failures, '
+             '%(quarantines)d quarantines, %(unlinks)d unlinks, '
+             '%(errors)d errors'),
+            {'elapsed': elapsed,
+             'successes': self.stats.successes,
+             'failures': self.stats.failures,
+             'quarantines': self.stats.quarantines,
+             'unlinks': self.stats.unlinks,
+             'errors': self.stats.errors})
         dump_recon_cache({'object_updater_sweep': elapsed},
                          self.rcache, self.logger)
 
@@ -158,6 +206,12 @@ class ObjectUpdater(Daemon):
         :param device: path to device
         """
         start_time = time.time()
+        last_status_update = start_time
+        start_stats = self.stats.copy()
+        my_pid = os.getpid()
+        self.logger.info("Object update sweep starting on %s (pid: %d)",
+                         device, my_pid)
+
         # loop through async pending dirs for all policies
         for asyncdir in self._listdir(device):
             # we only care about directories
@@ -170,6 +224,8 @@ class ObjectUpdater(Daemon):
             try:
                 base, policy = split_policy_string(asyncdir)
             except PolicyError as e:
+                # This isn't an error, but a misconfiguration. Logging a
+                # warning should be sufficient.
                 self.logger.warning(_('Directory %(directory)r does not map '
                                       'to a valid policy (%(error)s)') % {
                                           'directory': asyncdir, 'error': e})
@@ -186,6 +242,7 @@ class ObjectUpdater(Daemon):
                     try:
                         obj_hash, timestamp = update.split('-')
                     except ValueError:
+                        self.stats.errors += 1
                         self.logger.increment('errors')
                         self.logger.error(
                             _('ERROR async pending file with unexpected '
@@ -193,7 +250,8 @@ class ObjectUpdater(Daemon):
                             % (update_path))
                         continue
                     if obj_hash == last_obj_hash:
-                        self.logger.increment("unlinks")
+                        self.stats.unlinks += 1
+                        self.logger.increment('unlinks')
                         os.unlink(update_path)
                     else:
                         self.process_object_update(update_path, device,
@@ -203,11 +261,47 @@ class ObjectUpdater(Daemon):
                             self.objects_running_time = ratelimit_sleep(
                                 self.objects_running_time,
                                 self.max_objects_per_second)
+
+                    now = time.time()
+                    if now - last_status_update >= self.report_interval:
+                        this_sweep = self.stats.since(start_stats)
+                        self.logger.info(
+                            ('Object update sweep progress on %(device)s: '
+                             '%(elapsed).02fs, '
+                             '%(successes)d successes, %(failures)d failures, '
+                             '%(quarantines)d quarantines, '
+                             '%(unlinks)d unlinks, %(errors)d errors '
+                             '(pid: %(pid)d)'),
+                            {'device': device,
+                             'elapsed': now - start_time,
+                             'pid': my_pid,
+                             'successes': this_sweep.successes,
+                             'failures': this_sweep.failures,
+                             'quarantines': this_sweep.quarantines,
+                             'unlinks': this_sweep.unlinks,
+                             'errors': this_sweep.errors})
+                        last_status_update = now
                 try:
                     os.rmdir(prefix_path)
                 except OSError:
                     pass
         self.logger.timing_since('timing', start_time)
+        sweep_totals = self.stats.since(start_stats)
+        self.logger.info(
+            ('Object update sweep completed on %(device)s '
+             'in %(elapsed).02fs seconds:, '
+             '%(successes)d successes, %(failures)d failures, '
+             '%(quarantines)d quarantines, '
+             '%(unlinks)d unlinks, %(errors)d errors '
+             '(pid: %(pid)d)'),
+            {'device': device,
+             'elapsed': time.time() - start_time,
+             'pid': my_pid,
+             'successes': sweep_totals.successes,
+             'failures': sweep_totals.failures,
+             'quarantines': sweep_totals.quarantines,
+             'unlinks': sweep_totals.unlinks,
+             'errors': sweep_totals.errors})
 
     def process_object_update(self, update_path, device, policy):
         """
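
To make the new output concrete, here is the progress format string from the hunk above rendered with made-up numbers (the device name, counts, and pid are hypothetical, used only to show the shape of the log line):

msg = ('Object update sweep progress on %(device)s: '
       '%(elapsed).02fs, '
       '%(successes)d successes, %(failures)d failures, '
       '%(quarantines)d quarantines, '
       '%(unlinks)d unlinks, %(errors)d errors '
       '(pid: %(pid)d)')
print(msg % {'device': 'sda1', 'elapsed': 300.02, 'successes': 120,
             'failures': 2, 'quarantines': 0, 'unlinks': 118, 'errors': 1,
             'pid': 12345})
# Object update sweep progress on sda1: 300.02s, 120 successes, 2 failures,
# 0 quarantines, 118 unlinks, 1 errors (pid: 12345)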
@@ -222,6 +316,7 @@ class ObjectUpdater(Daemon):
         except Exception:
             self.logger.exception(
                 _('ERROR Pickle problem, quarantining %s'), update_path)
+            self.stats.quarantines += 1
             self.logger.increment('quarantines')
             target_path = os.path.join(device, 'quarantined', 'objects',
                                        os.path.basename(update_path))
@@ -249,14 +344,15 @@ class ObjectUpdater(Daemon):
                 else:
                     success = False
         if success:
-            self.successes += 1
+            self.stats.successes += 1
            self.logger.increment('successes')
            self.logger.debug('Update sent for %(obj)s %(path)s',
                              {'obj': obj, 'path': update_path})
-           self.logger.increment("unlinks")
+           self.stats.unlinks += 1
+           self.logger.increment('unlinks')
            os.unlink(update_path)
        else:
-           self.failures += 1
+           self.stats.failures += 1
            self.logger.increment('failures')
            self.logger.debug('Update failed for %(obj)s %(path)s',
                              {'obj': obj, 'path': update_path})
@@ -25,7 +25,8 @@ from tempfile import mkdtemp
 from shutil import rmtree
 from test import listen_zero
 from test.unit import (
-    make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn)
+    make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn,
+    FakeLogger)
 from time import time
 from distutils.dir_util import mkpath
 
@@ -248,6 +249,66 @@ class TestObjectUpdater(unittest.TestCase):
         # a warning indicating that the '99' policy isn't valid
         check_with_idx('99', 1, should_skip=True)
 
+    def test_sweep_logs(self):
+        asyncdir = os.path.join(self.sda1, ASYNCDIR_BASE)
+        prefix_dir = os.path.join(asyncdir, 'abc')
+        mkpath(prefix_dir)
+
+        for o, t in [('abc', 123), ('def', 234), ('ghi', 345),
+                     ('jkl', 456), ('mno', 567)]:
+            ohash = hash_path('account', 'container', o)
+            o_path = os.path.join(prefix_dir, ohash + '-' +
+                                  normalize_timestamp(t))
+            write_pickle({}, o_path)
+
+        class MockObjectUpdater(object_updater.ObjectUpdater):
+            def process_object_update(self, update_path, device, policy):
+                os.unlink(update_path)
+                self.stats.successes += 1
+                self.stats.unlinks += 1
+
+        logger = FakeLogger()
+        ou = MockObjectUpdater({
+            'devices': self.devices_dir,
+            'mount_check': 'false',
+            'swift_dir': self.testdir,
+            'interval': '1',
+            'concurrency': '1',
+            'report_interval': '10.0',
+            'node_timeout': '5'}, logger=logger)
+
+        now = [time()]
+
+        def mock_time_function():
+            rv = now[0]
+            now[0] += 5
+            return rv
+
+        # With 10s between updates, time() advancing 5s every time we look,
+        # and 5 async_pendings on disk, we should get at least two progress
+        # lines.
+        with mock.patch('swift.obj.updater.time',
+                        mock.MagicMock(time=mock_time_function)):
+            ou.object_sweep(self.sda1)
+
+        info_lines = logger.get_lines_for_level('info')
+        self.assertEqual(4, len(info_lines))
+        self.assertIn("sweep starting", info_lines[0])
+        self.assertIn(self.sda1, info_lines[0])
+
+        self.assertIn("sweep progress", info_lines[1])
+        # the space ensures it's a positive number
+        self.assertIn(" 2 successes", info_lines[1])
+        self.assertIn(self.sda1, info_lines[1])
+
+        self.assertIn("sweep progress", info_lines[2])
+        self.assertIn(" 4 successes", info_lines[2])
+        self.assertIn(self.sda1, info_lines[2])
+
+        self.assertIn("sweep complete", info_lines[3])
+        self.assertIn(" 5 successes", info_lines[3])
+        self.assertIn(self.sda1, info_lines[3])
+
     @mock.patch.object(object_updater, 'check_drive')
     def test_run_once_with_disk_unmounted(self, mock_check_drive):
         mock_check_drive.return_value = False
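
A rough sanity check (illustrative, not part of the test) of why the mocked clock above yields exactly two progress lines: time() advances 5 seconds per call, report_interval is 10 seconds in the test config, and the sweep checks the clock once per async_pending, of which there are five.

last_report = now = 0
progress_lines = 0
for pending in range(5):
    now += 5                     # each processed pending advances the fake clock
    if now - last_report >= 10:  # report_interval from the test config
        progress_lines += 1
        last_report = now
assert progress_lines == 2       # plus "starting" and "complete" = 4 info lines

That accounting matches the assertions: the two progress lines report " 2 successes" and " 4 successes", and the completion line reports all 5.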
@@ -286,7 +347,7 @@ class TestObjectUpdater(unittest.TestCase):
         self.assertEqual([
             mock.call(self.devices_dir, 'sda1', True),
         ], mock_check_drive.mock_calls)
-        self.assertEqual(ou.logger.get_increment_counts(), {'errors': 1})
+        self.assertEqual(ou.logger.get_increment_counts(), {})
 
     @mock.patch.object(object_updater, 'check_drive')
     def test_run_once(self, mock_check_drive):