From f64c00b00aa8df31a937448917421891904abdc8 Mon Sep 17 00:00:00 2001
From: Samuel Merritt
Date: Fri, 12 Jan 2018 07:17:18 -0800
Subject: [PATCH] Improve object-updater's stats logging

The object updater has five different stats, but its logging only told
you two of them (successes and failures), and it only told you after
finishing all the async_pendings for a device. If you have a cluster
that's been sick and has millions upon millions of async_pendings
lying around, then your object-updaters are frustratingly silent.
I've seen one cluster with around 8 million async_pendings per disk
where the object-updaters only emitted stats every 12 hours.

Yes, if you have StatsD logging set up properly, you can go look at
your graphs and get real-time feedback on what it's doing. If you
don't have that, all you get is a frustrating silence.

Now, the object updater tells you all of its stats (successes,
failures, quarantines due to bad pickles, unlinks, and errors), and it
tells you incremental progress every five minutes. The logging at the
end of a pass remains and has been expanded to also include all stats.

Also included is a small change to what counts as an error: unmounted
drives no longer do. The goal is that only abnormal things count as
errors, like permission problems, malformed filenames, and so on.
These are things that should never happen, but if they do, they may
require operator intervention. Drives fail, so logging an error upon
encountering an unmounted drive is not useful.

Change-Id: Idbddd507f0b633d14dffb7a9834fce93a10359ab
---
 doc/source/deployment_guide.rst |   2 +
 etc/object-server.conf-sample   |   6 ++
 swift/obj/updater.py            | 136 +++++++++++++++++++++++++++-----
 test/unit/obj/test_updater.py   |  65 ++++++++++++++-
 4 files changed, 187 insertions(+), 22 deletions(-)

diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst
index 9692eb8c8d..3ea2344a47 100644
--- a/doc/source/deployment_guide.rst
+++ b/doc/source/deployment_guide.rst
@@ -909,6 +909,8 @@ objects_per_second  50                Maximum objects updated per second.
                                       system specs. 0 is unlimited.
 slowdown            0.01              Time in seconds to wait between objects.
                                       Deprecated in favor of objects_per_second.
+report_interval     300               Interval in seconds between logging
+                                      statistics about the current update pass.
 recon_cache_path    /var/cache/swift  Path to recon cache
 nice_priority       None              Scheduling priority of server processes.
                                       Niceness values range from -20 (most
diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index 04d04c11c5..8aae6ba457 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -372,6 +372,12 @@ use = egg:swift#recon
 # objects_per_second instead.
 # slowdown = 0.01
 #
+# Log stats (at INFO level) every report_interval seconds. This
+# logging is per-process, so with concurrency > 1, the logs will
+# contain one stats log per worker process every report_interval
+# seconds.
+# report_interval = 300
+#
 # recon_cache_path = /var/cache/swift
 #
 # You can set scheduling priority of processes. Niceness values range from -20
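The new option drives a simple clock-gated reporting loop: do the per-item
work, and emit a progress line only when enough wall-clock time has passed
since the last one. A minimal, self-contained sketch of that pattern (the
names below are illustrative, not Swift's actual code):

    import time

    REPORT_INTERVAL = 300.0  # seconds; mirrors the default above

    def sweep(items, work):
        """Process items, printing progress at most once per interval."""
        start = last_report = time.time()
        done = 0
        for item in items:
            work(item)  # per-item work; a callable keeps the sketch generic
            done += 1
            now = time.time()
            if now - last_report >= REPORT_INTERVAL:
                print('progress: %d done in %.02fs' % (done, now - start))
                last_report = now
        print('complete: %d done in %.02fs' % (done, time.time() - start))

Note that the loop never sleeps just to report; it only checks the clock
after each item, so an empty sweep emits nothing until the final line.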
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index 0726d42cca..3116aa4e27 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -36,6 +36,37 @@ from swift.obj.diskfile import get_tmp_dir, ASYNCDIR_BASE
 from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
 
 
+class SweepStats(object):
+    """
+    Stats bucket for an update sweep
+    """
+    def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
+                 unlinks=0):
+        self.errors = errors
+        self.failures = failures
+        self.quarantines = quarantines
+        self.successes = successes
+        self.unlinks = unlinks
+
+    def copy(self):
+        return type(self)(self.errors, self.failures, self.quarantines,
+                          self.successes, self.unlinks)
+
+    def since(self, other):
+        return type(self)(self.errors - other.errors,
+                          self.failures - other.failures,
+                          self.quarantines - other.quarantines,
+                          self.successes - other.successes,
+                          self.unlinks - other.unlinks)
+
+    def reset(self):
+        self.errors = 0
+        self.failures = 0
+        self.quarantines = 0
+        self.successes = 0
+        self.unlinks = 0
+
+
 class ObjectUpdater(Daemon):
     """Update object information in container listings."""
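Given the SweepStats class above, the intended usage is: snapshot at the
start of a sweep with copy(), bump the counters as work happens, and diff
against the snapshot with since() to get the incremental numbers for a
progress report. A small illustration (the counter values here are made up):

    stats = SweepStats()
    start_stats = stats.copy()        # snapshot taken when the sweep starts

    stats.successes += 3              # counters are bumped as updates land
    stats.unlinks += 3
    stats.errors += 1

    delta = stats.since(start_stats)  # activity since the snapshot
    assert (delta.successes, delta.unlinks, delta.errors) == (3, 3, 1)

    stats.reset()                     # zeroed before the next pass
    assert stats.successes == 0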
@@ -63,16 +94,18 @@ class ObjectUpdater(Daemon):
                            objects_per_second))
         self.node_timeout = float(conf.get('node_timeout', 10))
         self.conn_timeout = float(conf.get('conn_timeout', 0.5))
-        self.successes = 0
-        self.failures = 0
+        self.report_interval = float(conf.get('report_interval', 300))
         self.recon_cache_path = conf.get('recon_cache_path',
                                          '/var/cache/swift')
         self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
+        self.stats = SweepStats()
 
     def _listdir(self, path):
         try:
             return os.listdir(path)
         except OSError as e:
+            self.stats.errors += 1
+            self.logger.increment('errors')
             self.logger.error(_('ERROR: Unable to access %(path)s: '
                                 '%(error)s') %
                               {'path': path, 'error': e})
@@ -95,7 +128,9 @@ class ObjectUpdater(Daemon):
             self.get_container_ring().get_nodes('')
             for device in self._listdir(self.devices):
                 if not check_drive(self.devices, device, self.mount_check):
-                    self.logger.increment('errors')
+                    # We don't count this as an error. The occasional
+                    # unmounted drive is part of normal cluster operations,
+                    # so a simple warning is sufficient.
                     self.logger.warning(
                         _('Skipping %s as it is not mounted'), device)
                     continue
@@ -107,17 +142,22 @@ class ObjectUpdater(Daemon):
                 else:
                     signal.signal(signal.SIGTERM, signal.SIG_DFL)
                     eventlet_monkey_patch()
-                    self.successes = 0
-                    self.failures = 0
+                    self.stats.reset()
                     forkbegin = time.time()
                     self.object_sweep(os.path.join(self.devices, device))
                     elapsed = time.time() - forkbegin
                     self.logger.info(
-                        _('Object update sweep of %(device)s'
-                          ' completed: %(elapsed).02fs, %(success)s successes'
-                          ', %(fail)s failures'),
+                        ('Object update sweep of %(device)s '
+                         'completed: %(elapsed).02fs, '
+                         '%(successes)d successes, %(failures)d failures, '
+                         '%(quarantines)d quarantines, '
+                         '%(unlinks)d unlinks, %(errors)d errors'),
                         {'device': device, 'elapsed': elapsed,
-                         'success': self.successes, 'fail': self.failures})
+                         'successes': self.stats.successes,
+                         'failures': self.stats.failures,
+                         'quarantines': self.stats.quarantines,
+                         'unlinks': self.stats.unlinks,
+                         'errors': self.stats.errors})
                     sys.exit()
         while pids:
             pids.remove(os.wait()[0])
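The conf-sample note about one stats log per worker process follows from the
structure above: run_forever() forks one child per device, and each child
resets and accumulates its own stats and writes its own log lines. Stripped
of the Swift specifics, the shape is roughly this (an illustrative,
POSIX-only sketch, not the updater's actual code):

    import os
    import sys

    def sweep_one_device(device):
        # stand-in for ObjectUpdater.object_sweep(); each child logs its own
        print('child %d sweeping %s' % (os.getpid(), device))

    def run_pass(devices, concurrency=1):
        pids = []
        for device in devices:
            while len(pids) >= concurrency:
                pids.remove(os.wait()[0])  # reap a child before forking more
            pid = os.fork()
            if pid:
                pids.append(pid)           # parent: remember the child
            else:
                sweep_one_device(device)   # child: sweep, then exit
                sys.exit()
        while pids:
            pids.remove(os.wait()[0])      # wait for the remaining children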
@@ -133,21 +173,29 @@ class ObjectUpdater(Daemon):
         """Run the updater once."""
         self.logger.info(_('Begin object update single threaded sweep'))
         begin = time.time()
-        self.successes = 0
-        self.failures = 0
+        self.stats.reset()
         for device in self._listdir(self.devices):
             if not check_drive(self.devices, device, self.mount_check):
-                self.logger.increment('errors')
+                # We don't count this as an error. The occasional unmounted
+                # drive is part of normal cluster operations, so a simple
+                # warning is sufficient.
                 self.logger.warning(
                     _('Skipping %s as it is not mounted'), device)
                 continue
             self.object_sweep(os.path.join(self.devices, device))
         elapsed = time.time() - begin
         self.logger.info(
-            _('Object update single threaded sweep completed: '
-              '%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
-            {'elapsed': elapsed, 'success': self.successes,
-             'fail': self.failures})
+            ('Object update single-threaded sweep completed: '
+             '%(elapsed).02fs, %(successes)d successes, '
+             '%(failures)d failures, '
+             '%(quarantines)d quarantines, %(unlinks)d unlinks, '
+             '%(errors)d errors'),
+            {'elapsed': elapsed,
+             'successes': self.stats.successes,
+             'failures': self.stats.failures,
+             'quarantines': self.stats.quarantines,
+             'unlinks': self.stats.unlinks,
+             'errors': self.stats.errors})
         dump_recon_cache({'object_updater_sweep': elapsed},
                          self.rcache, self.logger)
@@ -158,6 +206,12 @@ class ObjectUpdater(Daemon):
         :param device: path to device
         """
         start_time = time.time()
+        last_status_update = start_time
+        start_stats = self.stats.copy()
+        my_pid = os.getpid()
+        self.logger.info("Object update sweep starting on %s (pid: %d)",
+                         device, my_pid)
+
         # loop through async pending dirs for all policies
         for asyncdir in self._listdir(device):
             # we only care about directories
@@ -170,6 +224,8 @@ class ObjectUpdater(Daemon):
             try:
                 base, policy = split_policy_string(asyncdir)
             except PolicyError as e:
+                # This isn't an error, but a misconfiguration. Logging a
+                # warning should be sufficient.
                 self.logger.warning(_('Directory %(directory)r does not map '
                                       'to a valid policy (%(error)s)') % {
                     'directory': asyncdir, 'error': e})
@@ -186,6 +242,7 @@ class ObjectUpdater(Daemon):
                 try:
                     obj_hash, timestamp = update.split('-')
                 except ValueError:
+                    self.stats.errors += 1
                     self.logger.increment('errors')
                     self.logger.error(
                         _('ERROR async pending file with unexpected '
                           'name %s')
                         % (update_path))
                     continue
                 if obj_hash == last_obj_hash:
-                    self.logger.increment("unlinks")
+                    self.stats.unlinks += 1
+                    self.logger.increment('unlinks')
                     os.unlink(update_path)
                 else:
                     self.process_object_update(update_path, device,
@@ -203,11 +261,47 @@ class ObjectUpdater(Daemon):
                     self.objects_running_time = ratelimit_sleep(
                         self.objects_running_time,
                         self.max_objects_per_second)
+
+                    now = time.time()
+                    if now - last_status_update >= self.report_interval:
+                        this_sweep = self.stats.since(start_stats)
+                        self.logger.info(
+                            ('Object update sweep progress on %(device)s: '
+                             '%(elapsed).02fs, '
+                             '%(successes)d successes, %(failures)d failures, '
+                             '%(quarantines)d quarantines, '
+                             '%(unlinks)d unlinks, %(errors)d errors '
+                             '(pid: %(pid)d)'),
+                            {'device': device,
+                             'elapsed': now - start_time,
+                             'pid': my_pid,
+                             'successes': this_sweep.successes,
+                             'failures': this_sweep.failures,
+                             'quarantines': this_sweep.quarantines,
+                             'unlinks': this_sweep.unlinks,
+                             'errors': this_sweep.errors})
+                        last_status_update = now
                 try:
                     os.rmdir(prefix_path)
                 except OSError:
                     pass
         self.logger.timing_since('timing', start_time)
+        sweep_totals = self.stats.since(start_stats)
+        self.logger.info(
+            ('Object update sweep completed on %(device)s '
+             'in %(elapsed).02fs: '
+             '%(successes)d successes, %(failures)d failures, '
+             '%(quarantines)d quarantines, '
+             '%(unlinks)d unlinks, %(errors)d errors '
+             '(pid: %(pid)d)'),
+            {'device': device,
+             'elapsed': time.time() - start_time,
+             'pid': my_pid,
+             'successes': sweep_totals.successes,
+             'failures': sweep_totals.failures,
+             'quarantines': sweep_totals.quarantines,
+             'unlinks': sweep_totals.unlinks,
+             'errors': sweep_totals.errors})
 
     def process_object_update(self, update_path, device, policy):
         """
@@ -222,6 +316,7 @@ class ObjectUpdater(Daemon):
         except Exception:
             self.logger.exception(
                 _('ERROR Pickle problem, quarantining %s'), update_path)
+            self.stats.quarantines += 1
             self.logger.increment('quarantines')
             target_path = os.path.join(device, 'quarantined', 'objects',
                                        os.path.basename(update_path))
@@ -249,14 +344,15 @@ class ObjectUpdater(Daemon):
         else:
             success = False
         if success:
-            self.successes += 1
+            self.stats.successes += 1
             self.logger.increment('successes')
             self.logger.debug('Update sent for %(obj)s %(path)s',
                               {'obj': obj, 'path': update_path})
-            self.logger.increment("unlinks")
+            self.stats.unlinks += 1
+            self.logger.increment('unlinks')
             os.unlink(update_path)
         else:
-            self.failures += 1
+            self.stats.failures += 1
             self.logger.increment('failures')
             self.logger.debug('Update failed for %(obj)s %(path)s',
                               {'obj': obj, 'path': update_path})
diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py
index 30012f1a76..32f72094d8 100644
--- a/test/unit/obj/test_updater.py
+++ b/test/unit/obj/test_updater.py
@@ -25,7 +25,8 @@ from tempfile import mkdtemp
 from shutil import rmtree
 from test import listen_zero
 from test.unit import (
-    make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn)
+    make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn,
+    FakeLogger)
 from time import time
 from distutils.dir_util import mkpath
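The new test below drives the progress reports with a fake clock instead of
real sleeps: every call to time() advances five simulated seconds, so a
10-second report_interval fires on a deterministic schedule. The trick in
isolation:

    # a deterministic clock: each call advances five simulated seconds
    now = [1000.0]

    def fake_time():
        rv = now[0]
        now[0] += 5
        return rv

    assert fake_time() == 1000.0
    assert fake_time() == 1005.0

The test hands this to mock.MagicMock(time=...) and patches the whole time
module as seen by swift.obj.updater, because updater.py does "import time"
and calls time.time() through that module reference.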
@@ -248,6 +249,66 @@ class TestObjectUpdater(unittest.TestCase):
         # a warning indicating that the '99' policy isn't valid
         check_with_idx('99', 1, should_skip=True)
 
+    def test_sweep_logs(self):
+        asyncdir = os.path.join(self.sda1, ASYNCDIR_BASE)
+        prefix_dir = os.path.join(asyncdir, 'abc')
+        mkpath(prefix_dir)
+
+        for o, t in [('abc', 123), ('def', 234), ('ghi', 345),
+                     ('jkl', 456), ('mno', 567)]:
+            ohash = hash_path('account', 'container', o)
+            o_path = os.path.join(prefix_dir, ohash + '-' +
+                                  normalize_timestamp(t))
+            write_pickle({}, o_path)
+
+        class MockObjectUpdater(object_updater.ObjectUpdater):
+            def process_object_update(self, update_path, device, policy):
+                os.unlink(update_path)
+                self.stats.successes += 1
+                self.stats.unlinks += 1
+
+        logger = FakeLogger()
+        ou = MockObjectUpdater({
+            'devices': self.devices_dir,
+            'mount_check': 'false',
+            'swift_dir': self.testdir,
+            'interval': '1',
+            'concurrency': '1',
+            'report_interval': '10.0',
+            'node_timeout': '5'}, logger=logger)
+
+        now = [time()]
+
+        def mock_time_function():
+            rv = now[0]
+            now[0] += 5
+            return rv
+
+        # With 10s between updates, time() advancing 5s every time we look,
+        # and 5 async_pendings on disk, we should get at least two progress
+        # lines.
+        with mock.patch('swift.obj.updater.time',
+                        mock.MagicMock(time=mock_time_function)):
+            ou.object_sweep(self.sda1)
+
+        info_lines = logger.get_lines_for_level('info')
+        self.assertEqual(4, len(info_lines))
+        self.assertIn("sweep starting", info_lines[0])
+        self.assertIn(self.sda1, info_lines[0])
+
+        self.assertIn("sweep progress", info_lines[1])
+        # the space ensures it's a positive number
+        self.assertIn(" 2 successes", info_lines[1])
+        self.assertIn(self.sda1, info_lines[1])
+
+        self.assertIn("sweep progress", info_lines[2])
+        self.assertIn(" 4 successes", info_lines[2])
+        self.assertIn(self.sda1, info_lines[2])
+
+        self.assertIn("sweep complete", info_lines[3])
+        self.assertIn(" 5 successes", info_lines[3])
+        self.assertIn(self.sda1, info_lines[3])
+
     @mock.patch.object(object_updater, 'check_drive')
     def test_run_once_with_disk_unmounted(self, mock_check_drive):
         mock_check_drive.return_value = False
@@ -286,7 +347,7 @@ class TestObjectUpdater(unittest.TestCase):
         self.assertEqual([
             mock.call(self.devices_dir, 'sda1', True),
         ], mock_check_drive.mock_calls)
-        self.assertEqual(ou.logger.get_increment_counts(), {'errors': 1})
+        self.assertEqual(ou.logger.get_increment_counts(), {})
 
     @mock.patch.object(object_updater, 'check_drive')
     def test_run_once(self, mock_check_drive):
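FakeLogger comes from Swift's test helpers; the assertions above only rely
on it recording formatted lines per log level. For readers outside the Swift
tree, a minimal stand-in with the same shape might look like this (an
illustrative sketch, not Swift's FakeLogger):

    from collections import defaultdict

    class CapturingLogger(object):
        """Record formatted log lines per level for later assertions."""

        def __init__(self):
            self._lines = defaultdict(list)

        def info(self, msg, *args):
            if len(args) == 1 and isinstance(args[0], dict):
                msg = msg % args[0]  # named interpolation, e.g. '%(pid)d'
            elif args:
                msg = msg % args     # positional interpolation, e.g. '%s'
            self._lines['info'].append(msg)

        def get_lines_for_level(self, level):
            return self._lines[level]

For example, calling info('pid: %d', 123) on an instance records the line
'pid: 123' under the 'info' level, which is the form the assertions consume.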