From 1424ff71f953a87dfbc12e1b252253839010e9d5 Mon Sep 17 00:00:00 2001 From: Samuel Merritt Date: Wed, 30 May 2018 16:58:06 -0700 Subject: [PATCH] Refactor object updater's async-pending finder Pulled the FS walking out into its own function so that finding async_pendings is in one place and processing them is in another. This should make it easier to add greenthread-based concurrency later. Also refactored rate limiting to use utils.RateLimitedIterator instead of calling ratelimit_sleep ourselves. The functionality's the same, but the interface is easier. Change-Id: I02f9e3201e9434080c6fb87ac5e249d8b327a231 --- swift/obj/updater.py | 85 +++++++++++++++++++++++++------------------- 1 file changed, 49 insertions(+), 36 deletions(-) diff --git a/swift/obj/updater.py b/swift/obj/updater.py index 8db17b98d4..6f8419d329 100644 --- a/swift/obj/updater.py +++ b/swift/obj/updater.py @@ -28,7 +28,7 @@ from swift.common.constraints import check_drive from swift.common.exceptions import ConnectionTimeout from swift.common.ring import Ring from swift.common.utils import get_logger, renamer, write_pickle, \ - dump_recon_cache, config_true_value, ratelimit_sleep, split_path, \ + dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \ eventlet_monkey_patch, get_redirect_data from swift.common.daemon import Daemon from swift.common.header_key_dict import HeaderKeyDict @@ -200,19 +200,14 @@ class ObjectUpdater(Daemon): dump_recon_cache({'object_updater_sweep': elapsed}, self.rcache, self.logger) - def object_sweep(self, device): + def _iter_async_pendings(self, device): """ - If there are async pendings on the device, walk each one and update. + Locate and yield all the async pendings on the device. Multiple updates + for the same object will come out in reverse-chronological order + (i.e. newest first) so that callers can skip stale async_pendings. - :param device: path to device + Tries to clean up empty directories as it goes. """ - start_time = time.time() - last_status_update = start_time - start_stats = self.stats.copy() - my_pid = os.getpid() - self.logger.info("Object update sweep starting on %s (pid: %d)", - device, my_pid) - # loop through async pending dirs for all policies for asyncdir in self._listdir(device): # we only care about directories @@ -235,7 +230,6 @@ class ObjectUpdater(Daemon): prefix_path = os.path.join(async_pending, prefix) if not os.path.isdir(prefix_path): continue - last_obj_hash = None for update in sorted(self._listdir(prefix_path), reverse=True): update_path = os.path.join(prefix_path, update) if not os.path.isfile(update_path): @@ -250,34 +244,53 @@ class ObjectUpdater(Daemon): 'name %s') % (update_path)) continue - if obj_hash == last_obj_hash: - self.stats.unlinks += 1 - self.logger.increment('unlinks') - os.unlink(update_path) - else: - self.process_object_update(update_path, device, - policy) - last_obj_hash = obj_hash - - self.objects_running_time = ratelimit_sleep( - self.objects_running_time, - self.max_objects_per_second) - - now = time.time() - if now - last_status_update >= self.report_interval: - this_sweep = self.stats.since(start_stats) - self.logger.info( - ('Object update sweep progress on %(device)s: ' - '%(elapsed).02fs, %(stats)s (pid: %(pid)d)'), - {'device': device, - 'elapsed': now - start_time, - 'pid': my_pid, - 'stats': this_sweep}) - last_status_update = now + yield {'device': device, 'policy': policy, + 'path': update_path, + 'obj_hash': obj_hash, 'timestamp': timestamp} try: os.rmdir(prefix_path) except OSError: pass + + def object_sweep(self, device): + """ + If there are async pendings on the device, walk each one and update. + + :param device: path to device + """ + start_time = time.time() + last_status_update = start_time + start_stats = self.stats.copy() + my_pid = os.getpid() + self.logger.info("Object update sweep starting on %s (pid: %d)", + device, my_pid) + + last_obj_hash = None + ap_iter = RateLimitedIterator( + self._iter_async_pendings(device), + elements_per_second=self.max_objects_per_second) + for update in ap_iter: + if update['obj_hash'] == last_obj_hash: + self.stats.unlinks += 1 + self.logger.increment('unlinks') + os.unlink(update['path']) + else: + self.process_object_update(update['path'], update['device'], + update['policy']) + last_obj_hash = update['obj_hash'] + + now = time.time() + if now - last_status_update >= self.report_interval: + this_sweep = self.stats.since(start_stats) + self.logger.info( + ('Object update sweep progress on %(device)s: ' + '%(elapsed).02fs, %(stats)s (pid: %(pid)d)'), + {'device': device, + 'elapsed': now - start_time, + 'pid': my_pid, + 'stats': this_sweep}) + last_status_update = now + self.logger.timing_since('timing', start_time) sweep_totals = self.stats.since(start_stats) self.logger.info(