diff --git a/doc/source/deployment_guide.rst b/doc/source/deployment_guide.rst index 1218190661..050954ccd9 100644 --- a/doc/source/deployment_guide.rst +++ b/doc/source/deployment_guide.rst @@ -909,7 +909,9 @@ log_facility LOG_LOCAL0 Syslog log facility log_level INFO Logging level log_address /dev/log Logging directory interval 300 Minimum time for a pass to take -concurrency 1 Number of updater workers to spawn +updater_workers 1 Number of worker processes +concurrency 8 Number of updates to run concurrently in + each worker process node_timeout DEFAULT or 10 Request timeout to external services. This uses what's set here, or what's set in the DEFAULT section, or 10 (though other diff --git a/etc/object-server.conf-sample b/etc/object-server.conf-sample index 097f31472f..9babc68c3f 100644 --- a/etc/object-server.conf-sample +++ b/etc/object-server.conf-sample @@ -371,9 +371,16 @@ use = egg:swift#recon # log_address = /dev/log # # interval = 300 -# concurrency = 1 # node_timeout = # +# updater_workers controls how many processes the object updater will +# spawn, while concurrency controls how many async_pending records +# each updater process will operate on at any one time. With +# concurrency=C and updater_workers=W, there will be up to W*C +# async_pending records being processed at once. 
+# concurrency = 8 +# updater_workers = 1 +# # Send at most this many object updates per second # objects_per_second = 50 # diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 6f8419d329..a189b9c764 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -29,7 +29,7 @@ from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ - eventlet_monkey_patch, get_redirect_data + eventlet_monkey_patch, get_redirect_data, ContextPool from swift.common.daemon import Daemon from swift.common.header_key_dict import HeaderKeyDict from swift.common.storage_policy import split_policy_string, PolicyError @@ -94,7 +94,8 @@ class ObjectUpdater(Daemon): self.swift_dir = conf.get('swift_dir', '/etc/swift') self.interval = int(conf.get('interval', 300)) self.container_ring = None - self.concurrency = int(conf.get('concurrency', 1)) + self.concurrency = int(conf.get('concurrency', 8)) + self.updater_workers = int(conf.get('updater_workers', 1)) if 'slowdown' in conf: self.logger.warning( 'The slowdown option is deprecated in favor of ' @@ -150,7 +151,7 @@ class ObjectUpdater(Daemon): self.logger.warning( _('Skipping %s as it is not mounted'), device) continue - while len(pids) >= self.concurrency: + while len(pids) >= self.updater_workers: pids.remove(os.wait()[0]) pid = os.fork() if pid: @@ -230,6 +231,7 @@ class ObjectUpdater(Daemon): prefix_path = os.path.join(async_pending, prefix) if not os.path.isdir(prefix_path): continue + last_obj_hash = None for update in sorted(self._listdir(prefix_path), reverse=True): update_path = os.path.join(prefix_path, update) if not os.path.isfile(update_path): @@ -244,13 +246,34 @@ class ObjectUpdater(Daemon): 'name %s') % (update_path)) continue - yield {'device': device, 'policy': policy, - 'path': update_path, - 'obj_hash': obj_hash, 'timestamp': timestamp} - 
try: - os.rmdir(prefix_path) - except OSError: - pass + # Async pendings are stored on disk like this: + # + # <device>/async_pending/<suffix>/<obj_hash>-<timestamp> + # + # If there are multiple updates for a given object, + # they'll look like this: + # + # <device>/async_pending/<suffix>/<obj_hash>-<timestamp1> + # <device>/async_pending/<suffix>/<obj_hash>-<timestamp2> + # <device>/async_pending/<suffix>/<obj_hash>-<timestamp3> + # + # Async updates also have the property that newer + # updates contain all the information in older updates. + # Since we sorted the directory listing in reverse + # order, we'll see timestamp3 first, yield it, and then + # unlink timestamp2 and timestamp1 since we know they + # are obsolete. + # + # This way, our caller only gets useful async_pendings. + if obj_hash == last_obj_hash: + self.stats.unlinks += 1 + self.logger.increment('unlinks') + os.unlink(update_path) + else: + last_obj_hash = obj_hash + yield {'device': device, 'policy': policy, + 'path': update_path, + 'obj_hash': obj_hash, 'timestamp': timestamp} def object_sweep(self, device): """ @@ -265,31 +288,25 @@ class ObjectUpdater(Daemon): self.logger.info("Object update sweep starting on %s (pid: %d)", device, my_pid) - last_obj_hash = None ap_iter = RateLimitedIterator( self._iter_async_pendings(device), elements_per_second=self.max_objects_per_second) - for update in ap_iter: - if update['obj_hash'] == last_obj_hash: - self.stats.unlinks += 1 - self.logger.increment('unlinks') - os.unlink(update['path']) - else: - self.process_object_update(update['path'], update['device'], - update['policy']) - last_obj_hash = update['obj_hash'] - - now = time.time() - if now - last_status_update >= self.report_interval: - this_sweep = self.stats.since(start_stats) - self.logger.info( - ('Object update sweep progress on %(device)s: ' - '%(elapsed).02fs, %(stats)s (pid: %(pid)d)'), - {'device': device, - 'elapsed': now - start_time, - 'pid': my_pid, - 'stats': this_sweep}) - last_status_update = now + with ContextPool(self.concurrency) as pool: + for update in ap_iter: + pool.spawn(self.process_object_update, + update['path'], 
update['device'], update['policy']) + now = time.time() + if now - last_status_update >= self.report_interval: + this_sweep = self.stats.since(start_stats) + self.logger.info( + ('Object update sweep progress on %(device)s: ' + '%(elapsed).02fs, %(stats)s (pid: %(pid)d)'), + {'device': device, + 'elapsed': now - start_time, + 'pid': my_pid, + 'stats': this_sweep}) + last_status_update = now + pool.waitall() self.logger.timing_since('timing', start_time) sweep_totals = self.stats.since(start_stats) @@ -370,6 +387,13 @@ class ObjectUpdater(Daemon): self.stats.unlinks += 1 self.logger.increment('unlinks') os.unlink(update_path) + try: + # If this was the last async_pending in the directory, + # then this will succeed. Otherwise, it'll fail, and + # that's okay. + os.rmdir(os.path.dirname(update_path)) + except OSError: + pass elif redirects: # erase any previous successes update.pop('successes', None) diff --git a/test/unit/obj/test_updater.py b/test/unit/obj/test_updater.py index 98146f1214..da86a6a4f6 100644 --- a/test/unit/obj/test_updater.py +++ b/test/unit/obj/test_updater.py @@ -43,6 +43,23 @@ from swift.common.utils import ( from swift.common.storage_policy import StoragePolicy, POLICIES +class MockPool(object): + def __init__(self, *a, **kw): + pass + + def spawn(self, func, *args, **kwargs): + func(*args, **kwargs) + + def waitall(self): + pass + + def __enter__(self): + return self + + def __exit__(self, *a, **kw): + pass + + _mocked_policies = [StoragePolicy(0, 'zero', False), StoragePolicy(1, 'one', True)] @@ -104,7 +121,8 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(daemon.mount_check, True) self.assertEqual(daemon.swift_dir, '/etc/swift') self.assertEqual(daemon.interval, 300) - self.assertEqual(daemon.concurrency, 1) + self.assertEqual(daemon.concurrency, 8) + self.assertEqual(daemon.updater_workers, 1) self.assertEqual(daemon.max_objects_per_second, 50.0) # non-defaults @@ -114,6 +132,7 @@ class TestObjectUpdater(unittest.TestCase): 
'swift_dir': '/not/here', 'interval': '600', 'concurrency': '2', + 'updater_workers': '3', 'objects_per_second': '10.5', } daemon = object_updater.ObjectUpdater(conf, logger=self.logger) @@ -122,6 +141,7 @@ class TestObjectUpdater(unittest.TestCase): self.assertEqual(daemon.swift_dir, '/not/here') self.assertEqual(daemon.interval, 600) self.assertEqual(daemon.concurrency, 2) + self.assertEqual(daemon.updater_workers, 3) self.assertEqual(daemon.max_objects_per_second, 10.5) # check deprecated option @@ -234,10 +254,8 @@ class TestObjectUpdater(unittest.TestCase): if should_skip: # if we were supposed to skip over the dir, we didn't process # anything at all - self.assertTrue(os.path.exists(prefix_dir)) self.assertEqual(set(), seen) else: - self.assertTrue(not os.path.exists(prefix_dir)) self.assertEqual(expected, seen) # test cleanup: the tempdir gets cleaned up between runs, but this @@ -291,7 +309,8 @@ class TestObjectUpdater(unittest.TestCase): # and 5 async_pendings on disk, we should get at least two progress # lines. 
with mock.patch('swift.obj.updater.time', - mock.MagicMock(time=mock_time_function)): + mock.MagicMock(time=mock_time_function)), \ + mock.patch.object(object_updater, 'ContextPool', MockPool): ou.object_sweep(self.sda1) info_lines = logger.get_lines_for_level('info') @@ -444,7 +463,6 @@ class TestObjectUpdater(unittest.TestCase): os.mkdir(odd_dir) ou.run_once() self.assertTrue(os.path.exists(async_dir)) - self.assertFalse(os.path.exists(odd_dir)) self.assertEqual([ mock.call(self.devices_dir, 'sda1', True), ], mock_check_drive.mock_calls) @@ -548,7 +566,13 @@ class TestObjectUpdater(unittest.TestCase): err = event.wait() if err: raise err - self.assertTrue(not os.path.exists(op_path)) + + # we remove the async_pending and its containing suffix dir, but not + # anything above that + self.assertFalse(os.path.exists(op_path)) + self.assertFalse(os.path.exists(os.path.dirname(op_path))) + self.assertTrue(os.path.exists(os.path.dirname(os.path.dirname( + op_path)))) self.assertEqual(ou.logger.get_increment_counts(), {'unlinks': 1, 'successes': 1})