reconciler: concurreny follow-up

Related-Change-Id: I72e9601b58c2f20bb1294876bb39f2c78827d5f8
Change-Id: I21ad3c6377d15e931f737d8bec48ad6c887be52e
This commit is contained in:
Clay Gerrard 2021-07-14 13:49:50 -05:00
parent f739cadf97
commit f29ea9d04a
2 changed files with 21 additions and 18 deletions

View File

@ -773,25 +773,19 @@ class ContainerReconciler(Daemon):
MISPLACED_OBJECTS_ACCOUNT, container, MISPLACED_OBJECTS_ACCOUNT, container,
acceptable_statuses=(2, 404, 409, 412)) acceptable_statuses=(2, 404, 409, 412))
def process_queue_entry(self, container, raw_obj): def process_queue_item(self, q_container, q_entry, queue_item):
""" """
Process an entry and remove from queue on success. Process an entry and remove from queue on success.
:param container: the queue container :param q_container: the queue container
:param raw_obj: the raw_obj listing from the container :param q_entry: the raw_obj name from the q_container
:param queue_item: a parsed entry from the queue
""" """
try: finished = self.reconcile_object(queue_item)
obj_info = parse_raw_obj(raw_obj)
except Exception:
self.stats_log('invalid_record',
'invalid queue record: %r', raw_obj,
level=logging.ERROR, exc_info=True)
return
finished = self.reconcile_object(obj_info)
if finished: if finished:
self.pop_queue(container, raw_obj['name'], self.pop_queue(q_container, q_entry,
obj_info['q_ts'], queue_item['q_ts'],
obj_info['q_record']) queue_item['q_record'])
def reconcile(self): def reconcile(self):
""" """
@ -805,7 +799,15 @@ class ContainerReconciler(Daemon):
for container in self._iter_containers(): for container in self._iter_containers():
self.logger.debug('checking container %s', container) self.logger.debug('checking container %s', container)
for raw_obj in self._iter_objects(container): for raw_obj in self._iter_objects(container):
pool.spawn_n(self.process_queue_entry, container, raw_obj) try:
queue_item = parse_raw_obj(raw_obj)
except Exception:
self.stats_log('invalid_record',
'invalid queue record: %r', raw_obj,
level=logging.ERROR, exc_info=True)
continue
pool.spawn_n(self.process_queue_item,
container, raw_obj['name'], queue_item)
self.log_stats() self.log_stats()
pool.waitall() pool.waitall()

View File

@ -837,9 +837,10 @@ class TestReconciler(unittest.TestCase):
def fake_reconcile_object(account, container, obj, q_policy_index, def fake_reconcile_object(account, container, obj, q_policy_index,
q_ts, q_op, path, **kwargs): q_ts, q_op, path, **kwargs):
order_recieved.append(obj) order_recieved.append(obj)
if obj == 'o1':
# o1 takes longer than o2 for some reason # o1 takes longer than o2 for some reason
while 'o2' not in order_recieved: for i in range(10):
eventlet.sleep(0.001) eventlet.sleep(0.0)
return True return True
self.reconciler._reconcile_object = fake_reconcile_object self.reconciler._reconcile_object = fake_reconcile_object