diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst
index 9692eb8c8d..3ea2344a47 100644
--- a/doc/source/deployment_guide.rst
+++ b/doc/source/deployment_guide.rst
@@ -909,6 +909,8 @@ objects_per_second    50                Maximum objects updated per second.
                                         system specs. 0 is unlimited.
 slowdown              0.01              Time in seconds to wait between objects.
                                         Deprecated in favor of objects_per_second.
+report_interval       300               Interval in seconds between logging
+                                        statistics about the current update pass.
 recon_cache_path      /var/cache/swift  Path to recon cache
 nice_priority         None              Scheduling priority of server processes.
                                         Niceness values range from -20 (most
diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample
index 04d04c11c5..8aae6ba457 100644
--- a/etc/object-server.conf-sample
+++ b/etc/object-server.conf-sample
@@ -372,6 +372,12 @@ use = egg:swift#recon
 # objects_per_second instead.
 # slowdown = 0.01
 #
+# Log stats (at INFO level) every report_interval seconds. This
+# logging is per-process, so with concurrency > 1, the logs will
+# contain one stats log per worker process every report_interval
+# seconds.
+# report_interval = 300
+#
 # recon_cache_path = /var/cache/swift
 #
 # You can set scheduling priority of processes. Niceness values range from -20
diff --git a/swift/obj/updater.py b/swift/obj/updater.py
index 0726d42cca..3116aa4e27 100644
--- a/swift/obj/updater.py
+++ b/swift/obj/updater.py
@@ -36,6 +36,37 @@ from swift.obj.diskfile import get_tmp_dir, ASYNCDIR_BASE
 from swift.common.http import is_success, HTTP_INTERNAL_SERVER_ERROR
 
 
+class SweepStats(object):
+    """
+    Stats bucket for an update sweep
+    """
+    def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
+                 unlinks=0):
+        self.errors = errors
+        self.failures = failures
+        self.quarantines = quarantines
+        self.successes = successes
+        self.unlinks = unlinks
+
+    def copy(self):
+        return type(self)(self.errors, self.failures, self.quarantines,
+                          self.successes, self.unlinks)
+
+    def since(self, other):
+        return type(self)(self.errors - other.errors,
+                          self.failures - other.failures,
+                          self.quarantines - other.quarantines,
+                          self.successes - other.successes,
+                          self.unlinks - other.unlinks)
+
+    def reset(self):
+        self.errors = 0
+        self.failures = 0
+        self.quarantines = 0
+        self.successes = 0
+        self.unlinks = 0
+
+
 class ObjectUpdater(Daemon):
     """Update object information in container listings."""
 
@@ -63,16 +94,18 @@ class ObjectUpdater(Daemon):
                            objects_per_second))
         self.node_timeout = float(conf.get('node_timeout', 10))
         self.conn_timeout = float(conf.get('conn_timeout', 0.5))
-        self.successes = 0
-        self.failures = 0
+        self.report_interval = float(conf.get('report_interval', 300))
         self.recon_cache_path = conf.get('recon_cache_path',
                                          '/var/cache/swift')
         self.rcache = os.path.join(self.recon_cache_path, 'object.recon')
+        self.stats = SweepStats()
 
     def _listdir(self, path):
         try:
             return os.listdir(path)
         except OSError as e:
+            self.stats.errors += 1
+            self.logger.increment('errors')
             self.logger.error(_('ERROR: Unable to access %(path)s: '
                                 '%(error)s') %
                               {'path': path, 'error': e})
@@ -95,7 +128,9 @@
             self.get_container_ring().get_nodes('')
             for device in self._listdir(self.devices):
                 if not check_drive(self.devices, device, self.mount_check):
-                    self.logger.increment('errors')
+                    # We don't count this as an error. The occasional
+                    # unmounted drive is part of normal cluster operations,
+                    # so a simple warning is sufficient.
                     self.logger.warning(
                         _('Skipping %s as it is not mounted'), device)
                     continue
@@ -107,17 +142,22 @@
                 else:
                     signal.signal(signal.SIGTERM, signal.SIG_DFL)
                     eventlet_monkey_patch()
-                    self.successes = 0
-                    self.failures = 0
+                    self.stats.reset()
                     forkbegin = time.time()
                     self.object_sweep(os.path.join(self.devices, device))
                     elapsed = time.time() - forkbegin
                     self.logger.info(
-                        _('Object update sweep of %(device)s'
-                          ' completed: %(elapsed).02fs, %(success)s successes'
-                          ', %(fail)s failures'),
+                        ('Object update sweep of %(device)s '
+                         'completed: %(elapsed).02fs, '
+                         '%(successes)d successes, %(failures)d failures, '
+                         '%(quarantines)d quarantines, '
+                         '%(unlinks)d unlinks, %(errors)d errors'),
                         {'device': device, 'elapsed': elapsed,
-                         'success': self.successes, 'fail': self.failures})
+                         'successes': self.stats.successes,
+                         'failures': self.stats.failures,
+                         'quarantines': self.stats.quarantines,
+                         'unlinks': self.stats.unlinks,
+                         'errors': self.stats.errors})
                     sys.exit()
             while pids:
                 pids.remove(os.wait()[0])
@@ -133,21 +173,29 @@
         """Run the updater once."""
         self.logger.info(_('Begin object update single threaded sweep'))
         begin = time.time()
-        self.successes = 0
-        self.failures = 0
+        self.stats.reset()
         for device in self._listdir(self.devices):
             if not check_drive(self.devices, device, self.mount_check):
-                self.logger.increment('errors')
+                # We don't count this as an error. The occasional unmounted
+                # drive is part of normal cluster operations, so a simple
+                # warning is sufficient.
                 self.logger.warning(
                     _('Skipping %s as it is not mounted'), device)
                 continue
             self.object_sweep(os.path.join(self.devices, device))
         elapsed = time.time() - begin
         self.logger.info(
-            _('Object update single threaded sweep completed: '
-              '%(elapsed).02fs, %(success)s successes, %(fail)s failures'),
-            {'elapsed': elapsed, 'success': self.successes,
-             'fail': self.failures})
+            ('Object update single-threaded sweep completed: '
+             '%(elapsed).02fs, %(successes)d successes, '
+             '%(failures)d failures, '
+             '%(quarantines)d quarantines, %(unlinks)d unlinks, '
+             '%(errors)d errors'),
+            {'elapsed': elapsed,
+             'successes': self.stats.successes,
+             'failures': self.stats.failures,
+             'quarantines': self.stats.quarantines,
+             'unlinks': self.stats.unlinks,
+             'errors': self.stats.errors})
         dump_recon_cache({'object_updater_sweep': elapsed},
                          self.rcache, self.logger)
 
@@ -158,6 +206,12 @@
         :param device: path to device
         """
         start_time = time.time()
+        last_status_update = start_time
+        start_stats = self.stats.copy()
+        my_pid = os.getpid()
+        self.logger.info("Object update sweep starting on %s (pid: %d)",
+                         device, my_pid)
+
         # loop through async pending dirs for all policies
         for asyncdir in self._listdir(device):
             # we only care about directories
@@ -170,6 +224,8 @@
             try:
                 base, policy = split_policy_string(asyncdir)
             except PolicyError as e:
+                # This isn't an error, but a misconfiguration. Logging a
+                # warning should be sufficient.
                self.logger.warning(_('Directory %(directory)r does not map '
                                      'to a valid policy (%(error)s)') % {
                                          'directory': asyncdir, 'error': e})
@@ -186,6 +242,7 @@
                 try:
                     obj_hash, timestamp = update.split('-')
                 except ValueError:
+                    self.stats.errors += 1
                     self.logger.increment('errors')
                     self.logger.error(
                         _('ERROR async pending file with unexpected '
@@ -193,7 +250,8 @@
                         % (update_path))
                     continue
                 if obj_hash == last_obj_hash:
-                    self.logger.increment("unlinks")
+                    self.stats.unlinks += 1
+                    self.logger.increment('unlinks')
                     os.unlink(update_path)
                 else:
                     self.process_object_update(update_path, device,
@@ -203,11 +261,47 @@
                     self.objects_running_time = ratelimit_sleep(
                         self.objects_running_time,
                         self.max_objects_per_second)
+
+                    now = time.time()
+                    if now - last_status_update >= self.report_interval:
+                        this_sweep = self.stats.since(start_stats)
+                        self.logger.info(
+                            ('Object update sweep progress on %(device)s: '
+                             '%(elapsed).02fs, '
+                             '%(successes)d successes, %(failures)d failures, '
+                             '%(quarantines)d quarantines, '
+                             '%(unlinks)d unlinks, %(errors)d errors '
+                             '(pid: %(pid)d)'),
+                            {'device': device,
+                             'elapsed': now - start_time,
+                             'pid': my_pid,
+                             'successes': this_sweep.successes,
+                             'failures': this_sweep.failures,
+                             'quarantines': this_sweep.quarantines,
+                             'unlinks': this_sweep.unlinks,
+                             'errors': this_sweep.errors})
+                        last_status_update = now
             try:
                 os.rmdir(prefix_path)
             except OSError:
                 pass
         self.logger.timing_since('timing', start_time)
+        sweep_totals = self.stats.since(start_stats)
+        self.logger.info(
+            ('Object update sweep completed on %(device)s '
+             'in %(elapsed).02fs: '
+             '%(successes)d successes, %(failures)d failures, '
+             '%(quarantines)d quarantines, '
+             '%(unlinks)d unlinks, %(errors)d errors '
+             '(pid: %(pid)d)'),
+            {'device': device,
+             'elapsed': time.time() - start_time,
+             'pid': my_pid,
+             'successes': sweep_totals.successes,
+             'failures': sweep_totals.failures,
+             'quarantines': sweep_totals.quarantines,
+             'unlinks': sweep_totals.unlinks,
+             'errors': sweep_totals.errors})
 
     def process_object_update(self, update_path, device, policy):
         """
@@ -222,6 +316,7 @@
         except Exception:
             self.logger.exception(
                 _('ERROR Pickle problem, quarantining %s'), update_path)
+            self.stats.quarantines += 1
             self.logger.increment('quarantines')
             target_path = os.path.join(device, 'quarantined', 'objects',
                                        os.path.basename(update_path))
@@ -249,14 +344,15 @@
         else:
             success = False
         if success:
-            self.successes += 1
+            self.stats.successes += 1
             self.logger.increment('successes')
             self.logger.debug('Update sent for %(obj)s %(path)s',
                               {'obj': obj, 'path': update_path})
-            self.logger.increment("unlinks")
+            self.stats.unlinks += 1
+            self.logger.increment('unlinks')
             os.unlink(update_path)
         else:
-            self.failures += 1
+            self.stats.failures += 1
             self.logger.increment('failures')
             self.logger.debug('Update failed for %(obj)s %(path)s',
                               {'obj': obj, 'path': update_path})
diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py
index 30012f1a76..32f72094d8 100644
--- a/test/unit/obj/test_updater.py
+++ b/test/unit/obj/test_updater.py
@@ -25,7 +25,8 @@
 from tempfile import mkdtemp
 from shutil import rmtree
 from test import listen_zero
 from test.unit import (
-    make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn)
+    make_timestamp_iter, debug_logger, patch_policies, mocked_http_conn,
+    FakeLogger)
 from time import time
 from distutils.dir_util import mkpath
@@ -248,6 +249,66 @@ class TestObjectUpdater(unittest.TestCase):
         # a warning indicating that the '99' policy isn't valid
         check_with_idx('99', 1, should_skip=True)
 
+    def test_sweep_logs(self):
+        asyncdir = os.path.join(self.sda1, ASYNCDIR_BASE)
+        prefix_dir = os.path.join(asyncdir, 'abc')
+        mkpath(prefix_dir)
+
+        for o, t in [('abc', 123), ('def', 234), ('ghi', 345),
+                     ('jkl', 456), ('mno', 567)]:
+            ohash = hash_path('account', 'container', o)
+            o_path = os.path.join(prefix_dir, ohash + '-' +
+                                  normalize_timestamp(t))
+            write_pickle({}, o_path)
+
+        class MockObjectUpdater(object_updater.ObjectUpdater):
+            def process_object_update(self, update_path, device, policy):
+                os.unlink(update_path)
+                self.stats.successes += 1
+                self.stats.unlinks += 1
+
+        logger = FakeLogger()
+        ou = MockObjectUpdater({
+            'devices': self.devices_dir,
+            'mount_check': 'false',
+            'swift_dir': self.testdir,
+            'interval': '1',
+            'concurrency': '1',
+            'report_interval': '10.0',
+            'node_timeout': '5'}, logger=logger)
+
+        now = [time()]
+
+        def mock_time_function():
+            rv = now[0]
+            now[0] += 5
+            return rv
+
+        # With 10s between updates, time() advancing 5s every time we look,
+        # and 5 async_pendings on disk, we should get at least two progress
+        # lines.
+        with mock.patch('swift.obj.updater.time',
+                        mock.MagicMock(time=mock_time_function)):
+            ou.object_sweep(self.sda1)
+
+        info_lines = logger.get_lines_for_level('info')
+        self.assertEqual(4, len(info_lines))
+        self.assertIn("sweep starting", info_lines[0])
+        self.assertIn(self.sda1, info_lines[0])
+
+        self.assertIn("sweep progress", info_lines[1])
+        # the space ensures it's a positive number
+        self.assertIn(" 2 successes", info_lines[1])
+        self.assertIn(self.sda1, info_lines[1])
+
+        self.assertIn("sweep progress", info_lines[2])
+        self.assertIn(" 4 successes", info_lines[2])
+        self.assertIn(self.sda1, info_lines[2])
+
+        self.assertIn("sweep complete", info_lines[3])
+        self.assertIn(" 5 successes", info_lines[3])
+        self.assertIn(self.sda1, info_lines[3])
+
     @mock.patch.object(object_updater, 'check_drive')
     def test_run_once_with_disk_unmounted(self, mock_check_drive):
         mock_check_drive.return_value = False
@@ -286,7 +347,7 @@ class TestObjectUpdater(unittest.TestCase):
         self.assertEqual([
             mock.call(self.devices_dir, 'sda1', True),
         ], mock_check_drive.mock_calls)
-        self.assertEqual(ou.logger.get_increment_counts(), {'errors': 1})
+        self.assertEqual(ou.logger.get_increment_counts(), {})
 
     @mock.patch.object(object_updater, 'check_drive')
     def test_run_once(self, mock_check_drive):
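
The copy()/since() pair on SweepStats is what lets a long-lived worker report only the numbers for the sweep in progress: counters accumulate for the life of the process, object_sweep() snapshots them at the start, and each progress or completion line diffs against that snapshot. Below is a minimal standalone sketch of that pattern (not part of the patch); the SweepStats class mirrors the one added to swift/obj/updater.py above, and the sample counter values and print() call are illustrative only.

    # sketch_sweep_stats.py -- illustrative sketch of the copy()/since()
    # pattern used by object_sweep(); mirrors the SweepStats class added
    # in swift/obj/updater.py by this patch.

    class SweepStats(object):
        """Stats bucket for an update sweep."""
        def __init__(self, errors=0, failures=0, quarantines=0, successes=0,
                     unlinks=0):
            self.errors = errors
            self.failures = failures
            self.quarantines = quarantines
            self.successes = successes
            self.unlinks = unlinks

        def copy(self):
            return type(self)(self.errors, self.failures, self.quarantines,
                              self.successes, self.unlinks)

        def since(self, other):
            return type(self)(self.errors - other.errors,
                              self.failures - other.failures,
                              self.quarantines - other.quarantines,
                              self.successes - other.successes,
                              self.unlinks - other.unlinks)


    stats = SweepStats()
    stats.successes = 7          # counters carried over from an earlier sweep
    start_stats = stats.copy()   # snapshot taken when a new sweep begins

    stats.successes += 2         # work done during the current sweep
    stats.unlinks += 2
    stats.failures += 1

    this_sweep = stats.since(start_stats)
    # Only the current sweep's numbers are reported, not lifetime totals:
    print('%d successes, %d failures, %d unlinks'
          % (this_sweep.successes, this_sweep.failures, this_sweep.unlinks))
    # -> 2 successes, 1 failures, 2 unlinks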