Refactor object updater's async-pending finder

Pulled the FS walking out into its own function so that finding
async_pendings is in one place and processing them is in another. This
should make it easier to add greenthread-based concurrency later.

Also refactored rate limiting to use utils.RateLimitedIterator instead
of calling ratelimit_sleep ourselves. The functionality's the same,
but the interface is easier.

Change-Id: I02f9e3201e9434080c6fb87ac5e249d8b327a231
This commit is contained in:
Samuel Merritt 2018-05-30 16:58:06 -07:00
parent 84f2bfcb2e
commit 1424ff71f9

View File

@ -28,7 +28,7 @@ from swift.common.constraints import check_drive
from swift.common.exceptions import ConnectionTimeout
from swift.common.ring import Ring
from swift.common.utils import get_logger, renamer, write_pickle, \
dump_recon_cache, config_true_value, ratelimit_sleep, split_path, \
dump_recon_cache, config_true_value, RateLimitedIterator, split_path, \
eventlet_monkey_patch, get_redirect_data
from swift.common.daemon import Daemon
from swift.common.header_key_dict import HeaderKeyDict
@ -200,19 +200,14 @@ class ObjectUpdater(Daemon):
dump_recon_cache({'object_updater_sweep': elapsed},
self.rcache, self.logger)
def object_sweep(self, device):
def _iter_async_pendings(self, device):
"""
If there are async pendings on the device, walk each one and update.
Locate and yield all the async pendings on the device. Multiple updates
for the same object will come out in reverse-chronological order
(i.e. newest first) so that callers can skip stale async_pendings.
:param device: path to device
Tries to clean up empty directories as it goes.
"""
start_time = time.time()
last_status_update = start_time
start_stats = self.stats.copy()
my_pid = os.getpid()
self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid)
# loop through async pending dirs for all policies
for asyncdir in self._listdir(device):
# we only care about directories
@ -235,7 +230,6 @@ class ObjectUpdater(Daemon):
prefix_path = os.path.join(async_pending, prefix)
if not os.path.isdir(prefix_path):
continue
last_obj_hash = None
for update in sorted(self._listdir(prefix_path), reverse=True):
update_path = os.path.join(prefix_path, update)
if not os.path.isfile(update_path):
@ -250,34 +244,53 @@ class ObjectUpdater(Daemon):
'name %s')
% (update_path))
continue
if obj_hash == last_obj_hash:
self.stats.unlinks += 1
self.logger.increment('unlinks')
os.unlink(update_path)
else:
self.process_object_update(update_path, device,
policy)
last_obj_hash = obj_hash
self.objects_running_time = ratelimit_sleep(
self.objects_running_time,
self.max_objects_per_second)
now = time.time()
if now - last_status_update >= self.report_interval:
this_sweep = self.stats.since(start_stats)
self.logger.info(
('Object update sweep progress on %(device)s: '
'%(elapsed).02fs, %(stats)s (pid: %(pid)d)'),
{'device': device,
'elapsed': now - start_time,
'pid': my_pid,
'stats': this_sweep})
last_status_update = now
yield {'device': device, 'policy': policy,
'path': update_path,
'obj_hash': obj_hash, 'timestamp': timestamp}
try:
os.rmdir(prefix_path)
except OSError:
pass
def object_sweep(self, device):
"""
If there are async pendings on the device, walk each one and update.
:param device: path to device
"""
start_time = time.time()
last_status_update = start_time
start_stats = self.stats.copy()
my_pid = os.getpid()
self.logger.info("Object update sweep starting on %s (pid: %d)",
device, my_pid)
last_obj_hash = None
ap_iter = RateLimitedIterator(
self._iter_async_pendings(device),
elements_per_second=self.max_objects_per_second)
for update in ap_iter:
if update['obj_hash'] == last_obj_hash:
self.stats.unlinks += 1
self.logger.increment('unlinks')
os.unlink(update['path'])
else:
self.process_object_update(update['path'], update['device'],
update['policy'])
last_obj_hash = update['obj_hash']
now = time.time()
if now - last_status_update >= self.report_interval:
this_sweep = self.stats.since(start_stats)
self.logger.info(
('Object update sweep progress on %(device)s: '
'%(elapsed).02fs, %(stats)s (pid: %(pid)d)'),
{'device': device,
'elapsed': now - start_time,
'pid': my_pid,
'stats': this_sweep})
last_status_update = now
self.logger.timing_since('timing', start_time)
sweep_totals = self.stats.since(start_stats)
self.logger.info(