diff --git a/etc/container-reconciler.conf-sample b/etc/container-reconciler.conf-sample index cd61f9a690..7b74124e1f 100644 --- a/etc/container-reconciler.conf-sample +++ b/etc/container-reconciler.conf-sample @@ -57,6 +57,8 @@ # Work only with ionice_class. # ionice_class = # ionice_priority = +# Number of objects to process concurrently per process +# concurrency = 1 [pipeline:main] # Note that the reconciler's pipeline is intentionally very sparse -- it is diff --git a/swift/container/reconciler.py b/swift/container/reconciler.py index 6f3166a297..177bb538d0 100644 --- a/swift/container/reconciler.py +++ b/swift/container/reconciler.py @@ -357,7 +357,7 @@ class ContainerReconciler(Daemon): Move objects that are in the wrong storage policy. """ - def __init__(self, conf): + def __init__(self, conf, logger=None, swift=None): self.conf = conf # This option defines how long an un-processable misplaced object # marker will be retried before it is abandoned. It is not coupled @@ -366,9 +366,10 @@ class ContainerReconciler(Daemon): self.interval = float(conf.get('interval', 30)) conf_path = conf.get('__file__') or \ '/etc/swift/container-reconciler.conf' - self.logger = get_logger(conf, log_route='container-reconciler') + self.logger = logger or get_logger( + conf, log_route='container-reconciler') request_tries = int(conf.get('request_tries') or 3) - self.swift = InternalClient( + self.swift = swift or InternalClient( conf_path, 'Swift Container Reconciler', request_tries, @@ -377,6 +378,9 @@ class ContainerReconciler(Daemon): self.stats = defaultdict(int) self.last_stat_time = time.time() self.ring_check_interval = float(conf.get('ring_check_interval', 15)) + self.concurrency = int(conf.get('concurrency', 1)) + if self.concurrency < 1: + raise ValueError("concurrency must be set to at least 1") def stats_log(self, metric, msg, *args, **kwargs): """ @@ -716,9 +720,9 @@ class ContainerReconciler(Daemon): # hit most recent container first instead of waiting on the updaters current_container = get_reconciler_container_name(time.time()) yield current_container - container_gen = self.swift.iter_containers(MISPLACED_OBJECTS_ACCOUNT) self.logger.debug('looking for containers in %s', MISPLACED_OBJECTS_ACCOUNT) + container_gen = self.swift.iter_containers(MISPLACED_OBJECTS_ACCOUNT) while True: one_page = None try: @@ -769,29 +773,41 @@ class ContainerReconciler(Daemon): MISPLACED_OBJECTS_ACCOUNT, container, acceptable_statuses=(2, 404, 409, 412)) + def process_queue_entry(self, container, raw_obj): + """ + Process an entry and remove from queue on success. + + :param container: the queue container + :param raw_obj: the raw_obj listing from the container + """ + try: + obj_info = parse_raw_obj(raw_obj) + except Exception: + self.stats_log('invalid_record', + 'invalid queue record: %r', raw_obj, + level=logging.ERROR, exc_info=True) + return + finished = self.reconcile_object(obj_info) + if finished: + self.pop_queue(container, raw_obj['name'], + obj_info['q_ts'], + obj_info['q_record']) + def reconcile(self): """ - Main entry point for processing misplaced objects. + Main entry point for concurrent processing of misplaced objects. - Iterate over all queue entries and delegate to reconcile_object. + Iterate over all queue entries and delegate processing to spawned + workers in the pool. """ self.logger.debug('pulling items from the queue') + pool = GreenPool(self.concurrency) for container in self._iter_containers(): + self.logger.debug('checking container %s', container) for raw_obj in self._iter_objects(container): - try: - obj_info = parse_raw_obj(raw_obj) - except Exception: - self.stats_log('invalid_record', - 'invalid queue record: %r', raw_obj, - level=logging.ERROR, exc_info=True) - continue - finished = self.reconcile_object(obj_info) - if finished: - self.pop_queue(container, raw_obj['name'], - obj_info['q_ts'], - obj_info['q_record']) + pool.spawn_n(self.process_queue_entry, container, raw_obj) self.log_stats() - self.logger.debug('finished container %s', container) + pool.waitall() def run_once(self, *args, **kwargs): """ diff --git a/test/unit/container/test_reconciler.py b/test/unit/container/test_reconciler.py index c521d72bf7..da967abfb2 100644 --- a/test/unit/container/test_reconciler.py +++ b/test/unit/container/test_reconciler.py @@ -23,6 +23,7 @@ import os import errno import itertools import random +import eventlet from collections import defaultdict from datetime import datetime @@ -95,7 +96,7 @@ class FakeStoragePolicySwift(object): class FakeInternalClient(reconciler.InternalClient): - def __init__(self, listings): + def __init__(self, listings=None): self.app = FakeStoragePolicySwift() self.user_agent = 'fake-internal-client' self.request_tries = 1 @@ -103,6 +104,7 @@ class FakeInternalClient(reconciler.InternalClient): self.parse(listings) def parse(self, listings): + listings = listings or {} self.accounts = defaultdict(lambda: defaultdict(list)) for item, timestamp in listings.items(): # XXX this interface is stupid @@ -735,15 +737,36 @@ class TestReconciler(unittest.TestCase): def setUp(self): self.logger = debug_logger() conf = {} - with mock.patch('swift.container.reconciler.InternalClient'): - self.reconciler = reconciler.ContainerReconciler(conf) - self.reconciler.logger = self.logger + self.swift = FakeInternalClient() + self.reconciler = reconciler.ContainerReconciler( + conf, logger=self.logger, swift=self.swift) self.start_interval = int(time.time() // 3600 * 3600) self.current_container_path = '/v1/.misplaced_objects/%d' % ( self.start_interval) + listing_qs('') + def test_concurrency_config(self): + conf = {} + r = reconciler.ContainerReconciler(conf, self.logger, self.swift) + self.assertEqual(r.concurrency, 1) + + conf = {'concurrency': '10'} + r = reconciler.ContainerReconciler(conf, self.logger, self.swift) + self.assertEqual(r.concurrency, 10) + + conf = {'concurrency': 48} + r = reconciler.ContainerReconciler(conf, self.logger, self.swift) + self.assertEqual(r.concurrency, 48) + + conf = {'concurrency': 0} + self.assertRaises(ValueError, reconciler.ContainerReconciler, + conf, self.logger, self.swift) + + conf = {'concurrency': '-1'} + self.assertRaises(ValueError, reconciler.ContainerReconciler, + conf, self.logger, self.swift) + def _mock_listing(self, objects): - self.reconciler.swift = FakeInternalClient(objects) + self.swift.parse(objects) self.fake_swift = self.reconciler.swift.app def _mock_oldest_spi(self, container_oldest_spi_map): @@ -776,6 +799,59 @@ class TestReconciler(unittest.TestCase): return [c[1][1:4] for c in mocks['direct_delete_container_entry'].mock_calls] + def test_no_concurrency(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187, + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o2"): 3724.23456, + (1, "/AUTH_bob/c/o1"): 3618.84187, + (1, "/AUTH_bob/c/o2"): 3724.23456, + }) + + order_recieved = [] + + def fake_reconcile_object(account, container, obj, q_policy_index, + q_ts, q_op, path, **kwargs): + order_recieved.append(obj) + return True + + self.reconciler._reconcile_object = fake_reconcile_object + self.assertEqual(self.reconciler.concurrency, 1) # sanity + deleted_container_entries = self._run_once() + self.assertEqual(order_recieved, ['o1', 'o2']) + # process in order recieved + self.assertEqual(deleted_container_entries, [ + ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'), + ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'), + ]) + + def test_concurrency(self): + self._mock_listing({ + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o1"): 3618.84187, + (None, "/.misplaced_objects/3600/1:/AUTH_bob/c/o2"): 3724.23456, + (1, "/AUTH_bob/c/o1"): 3618.84187, + (1, "/AUTH_bob/c/o2"): 3724.23456, + }) + + order_recieved = [] + + def fake_reconcile_object(account, container, obj, q_policy_index, + q_ts, q_op, path, **kwargs): + order_recieved.append(obj) + # o1 takes longer than o2 for some reason + while 'o2' not in order_recieved: + eventlet.sleep(0.001) + return True + + self.reconciler._reconcile_object = fake_reconcile_object + self.reconciler.concurrency = 2 + deleted_container_entries = self._run_once() + self.assertEqual(order_recieved, ['o1', 'o2']) + # ... and so we finish o2 first + self.assertEqual(deleted_container_entries, [ + ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'), + ('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'), + ]) + def test_invalid_queue_name(self): self._mock_listing({ (None, "/.misplaced_objects/3600/bogus"): 3618.84187,