diff --git a/etc/container-reconciler.conf-sample b/etc/container-reconciler.conf-sample index 7b74124e1f..422a567200 100644 --- a/etc/container-reconciler.conf-sample +++ b/etc/container-reconciler.conf-sample @@ -60,6 +60,16 @@ # Number of objects to process concurrently per process # concurrency = 1 +# processes is how many parts to divide the work into, one part per process +# that will be doing the work +# processes set 0 means that a single process will be doing all the work +# processes = 0 +# +# process is which of the parts a particular process will work on +# process is "zero based", if you want to use 3 processes, you should run +# processes with process set to 0, 1, and 2 +# process = 0 + [pipeline:main] # Note that the reconciler's pipeline is intentionally very sparse -- it is # only responsible for moving data from one policy to another and should not diff --git a/swift/container/reconciler.py b/swift/container/reconciler.py index 0116df6d76..39faf6b7a9 100644 --- a/swift/container/reconciler.py +++ b/swift/container/reconciler.py @@ -31,7 +31,7 @@ from swift.common.request_helpers import MISPLACED_OBJECTS_ACCOUNT, \ USE_REPLICATION_NETWORK_HEADER from swift.common.utils import get_logger, split_path, majority_size, \ FileLikeIter, Timestamp, last_modified_date_to_timestamp, \ - LRUCache, decode_timestamps + LRUCache, decode_timestamps, hash_path from swift.common.storage_policy import POLICIES MISPLACED_OBJECTS_CONTAINER_DIVISOR = 3600 # 1 hour @@ -381,6 +381,17 @@ class ContainerReconciler(Daemon): self.concurrency = int(conf.get('concurrency', 1)) if self.concurrency < 1: raise ValueError("concurrency must be set to at least 1") + self.processes = int(self.conf.get('processes', 0)) + if self.processes < 0: + raise ValueError( + 'processes must be an integer greater than or equal to 0') + self.process = int(self.conf.get('process', 0)) + if self.process < 0: + raise ValueError( + 'process must be an integer greater than or equal to 0') + if self.processes and self.process >= self.processes: + raise ValueError( + 'process must be less than processes') def stats_log(self, metric, msg, *args, **kwargs): """ @@ -773,6 +784,19 @@ class ContainerReconciler(Daemon): MISPLACED_OBJECTS_ACCOUNT, container, acceptable_statuses=(2, 404, 409, 412)) + def should_process(self, queue_item): + """ + Check if a given entry should be handled by this process. + + :param container: the queue container + :param queue_item: an entry from the queue + """ + if not self.processes: + return True + hexdigest = hash_path( + queue_item['account'], queue_item['container'], queue_item['obj']) + return int(hexdigest, 16) % self.processes == self.process + def process_queue_item(self, q_container, q_entry, queue_item): """ Process an entry and remove from queue on success. @@ -806,8 +830,9 @@ class ContainerReconciler(Daemon): 'invalid queue record: %r', raw_obj, level=logging.ERROR, exc_info=True) continue - pool.spawn_n(self.process_queue_item, - container, raw_obj['name'], queue_item) + if self.should_process(queue_item): + pool.spawn_n(self.process_queue_item, + container, raw_obj['name'], queue_item) self.log_stats() pool.waitall() diff --git a/test/unit/container/test_reconciler.py b/test/unit/container/test_reconciler.py index d62750f272..9185f0a06d 100644 --- a/test/unit/container/test_reconciler.py +++ b/test/unit/container/test_reconciler.py @@ -765,6 +765,34 @@ class TestReconciler(unittest.TestCase): self.assertRaises(ValueError, reconciler.ContainerReconciler, conf, self.logger, self.swift) + def test_processes_config(self): + conf = {} + r = reconciler.ContainerReconciler(conf, self.logger, self.swift) + self.assertEqual(r.process, 0) + self.assertEqual(r.processes, 0) + + conf = {'processes': '1'} + r = reconciler.ContainerReconciler(conf, self.logger, self.swift) + self.assertEqual(r.process, 0) + self.assertEqual(r.processes, 1) + + conf = {'processes': 10, 'process': '9'} + r = reconciler.ContainerReconciler(conf, self.logger, self.swift) + self.assertEqual(r.process, 9) + self.assertEqual(r.processes, 10) + + conf = {'processes': -1} + self.assertRaises(ValueError, reconciler.ContainerReconciler, + conf, self.logger, self.swift) + + conf = {'process': -1} + self.assertRaises(ValueError, reconciler.ContainerReconciler, + conf, self.logger, self.swift) + + conf = {'processes': 9, 'process': 9} + self.assertRaises(ValueError, reconciler.ContainerReconciler, + conf, self.logger, self.swift) + def _mock_listing(self, objects): self.swift.parse(objects) self.fake_swift = self.reconciler.swift.app @@ -853,6 +881,57 @@ class TestReconciler(unittest.TestCase): ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'), ]) + def test_multi_process_should_process(self): + def mkqi(a, c, o): + "make queue item" + return { + 'account': a, + 'container': c, + 'obj': o, + } + queue = [ + mkqi('a', 'c', 'o1'), + mkqi('a', 'c', 'o2'), + mkqi('a', 'c', 'o3'), + mkqi('a', 'c', 'o4'), + ] + + def map_should_process(process, processes): + self.reconciler.process = process + self.reconciler.processes = processes + with mock.patch('swift.common.utils.HASH_PATH_SUFFIX', + b'endcap'), \ + mock.patch('swift.common.utils.HASH_PATH_PREFIX', b''): + return [self.reconciler.should_process(q_item) + for q_item in queue] + + def check_process(process, processes, expected): + should_process = map_should_process(process, processes) + try: + self.assertEqual(should_process, expected) + except AssertionError as e: + self.fail('unexpected items processed for %s/%s\n%s' % ( + process, processes, e)) + + check_process(0, 0, [True] * 4) + check_process(0, 1, [True] * 4) + check_process(0, 2, [False, True, False, False]) + check_process(1, 2, [True, False, True, True]) + + check_process(0, 4, [False, True, False, False]) + check_process(1, 4, [True, False, False, False]) + check_process(2, 4, [False] * 4) # lazy + check_process(3, 4, [False, False, True, True]) + + queue = [mkqi('a%s' % i, 'c%s' % i, 'o%s' % i) for i in range(1000)] + items_handled = [0] * 1000 + for process in range(100): + should_process = map_should_process(process, 100) + for i, handled in enumerate(should_process): + if handled: + items_handled[i] += 1 + self.assertEqual([1] * 1000, items_handled) + def test_invalid_queue_name(self): self._mock_listing({ (None, "/.misplaced_objects/3600/bogus"): 3618.84187,