reconciler: Record queue-clean-up later than old-policy-clean-up
Previously, queue-clean-up would use the same timestamp/offset as the tombstone written down in the old policy to clean up now-moved data. This could lead to bad behaviors, as the reconciler-written tombstone could itself be enqueued to be reconciled, at the same timestamp. If that happened, replication would never bring the DB replicas to consistency, causing the reconciler to get different answers for whether there was work to do. Now, add an extra offset bump between the tombstone in the old policy and queue-clean-up. Also add an extra offset to the moved data in the new policy, to keep it one ahead of queue-clean-up. New ordering looks like: - data/ts in old policy at t0 (as well as queue entry time to move it) - tombstone in old policy at t0_1 - queue clean-up time at t0_2 - moved data/ts in new policy at t0_3 Closes-Bug: #2028175 Change-Id: Ib0dda0d338f48336d18d3d817a0c5994e201042e
This commit is contained in:
parent
288677695d
commit
3a43242e79
@ -435,7 +435,7 @@ class ContainerReconciler(Daemon):
|
||||
an object was manually re-enqued.
|
||||
"""
|
||||
q_path = '/%s/%s/%s' % (MISPLACED_OBJECTS_ACCOUNT, container, obj)
|
||||
x_timestamp = slightly_later_timestamp(max(q_record, q_ts))
|
||||
x_timestamp = slightly_later_timestamp(max(q_record, q_ts), offset=2)
|
||||
self.stats_log('pop_queue', 'remove %r (%f) from the queue (%s)',
|
||||
q_path, q_ts, x_timestamp)
|
||||
headers = {'X-Timestamp': x_timestamp}
|
||||
@ -623,7 +623,7 @@ class ContainerReconciler(Daemon):
|
||||
# optimistically move any source with a timestamp >= q_ts
|
||||
ts = max(Timestamp(source_ts), q_ts)
|
||||
# move the object
|
||||
put_timestamp = slightly_later_timestamp(ts, offset=2)
|
||||
put_timestamp = slightly_later_timestamp(ts, offset=3)
|
||||
self.stats_log('copy_attempt', '%r (%f) in policy_index %s will be '
|
||||
'moved to policy_index %s (%s)', path, source_ts,
|
||||
q_policy_index, container_policy_index, put_timestamp)
|
||||
@ -663,7 +663,7 @@ class ContainerReconciler(Daemon):
|
||||
Issue a DELETE request against the destination to match the
|
||||
misplaced DELETE against the source.
|
||||
"""
|
||||
delete_timestamp = slightly_later_timestamp(q_ts, offset=2)
|
||||
delete_timestamp = slightly_later_timestamp(q_ts, offset=3)
|
||||
self.stats_log('delete_attempt', '%r (%f) in policy_index %s '
|
||||
'will be deleted from policy_index %s (%s)', path,
|
||||
source_ts, q_policy_index, container_policy_index,
|
||||
|
@ -947,8 +947,8 @@ class TestReconciler(unittest.TestCase):
|
||||
with mock.patch('time.time', lambda: next(mock_time_iter)):
|
||||
self.reconciler.run_once()
|
||||
|
||||
return [c[1][1:4] for c in
|
||||
mocks['direct_delete_container_entry'].mock_calls]
|
||||
return [tuple(list(c[1][1:4]) + [c[2]['headers']['X-Timestamp']])
|
||||
for c in mocks['direct_delete_container_entry'].mock_calls]
|
||||
|
||||
def test_no_concurrency(self):
|
||||
self._mock_listing({
|
||||
@ -971,8 +971,10 @@ class TestReconciler(unittest.TestCase):
|
||||
self.assertEqual(order_recieved, ['o1', 'o2'])
|
||||
# process in order recieved
|
||||
self.assertEqual(deleted_container_entries, [
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2',
|
||||
Timestamp(3724.23456, offset=2).internal)
|
||||
])
|
||||
|
||||
def test_concurrency(self):
|
||||
@ -1000,8 +1002,10 @@ class TestReconciler(unittest.TestCase):
|
||||
self.assertEqual(order_recieved, ['o1', 'o2'])
|
||||
# ... and so we finish o2 first
|
||||
self.assertEqual(deleted_container_entries, [
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2'),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1'),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o2',
|
||||
Timestamp(3724.23456, offset=2).internal),
|
||||
('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal),
|
||||
])
|
||||
|
||||
def test_multi_process_should_process(self):
|
||||
@ -1100,7 +1104,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# so pop the queue for that one
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_queue_name_with_policy_index_delimiter_in_name(self):
|
||||
@ -1145,7 +1150,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and pop the queue for that one
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries, [(
|
||||
'.misplaced_objects', '3600', '1:/%s' % obj_path)])
|
||||
'.misplaced_objects', '3600', '1:/%s' % obj_path,
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_unable_to_direct_get_oldest_storage_policy(self):
|
||||
@ -1243,9 +1249,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'),
|
||||
('PUT', '/v1/AUTH_bob/c/o1')])
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
# we PUT the object in the right place with q_ts + offset 2
|
||||
# we PUT the object in the right place with q_ts + offset 3
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(3618.84187, offset=2))
|
||||
Timestamp(3618.84187, offset=3))
|
||||
# cleans up the old
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1256,7 +1262,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and when we're done, we pop the entry from the queue
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_the_other_direction(self):
|
||||
@ -1290,9 +1297,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
put_headers = self.fake_swift.storage_policy[1].headers[1]
|
||||
# we PUT the object in the right place with q_ts + offset 2
|
||||
# we PUT the object in the right place with q_ts + offset 3
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(3618.84187, offset=2).internal)
|
||||
Timestamp(3618.84187, offset=3).internal)
|
||||
# cleans up the old
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1303,7 +1310,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and when we're done, we pop the entry from the queue
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '0:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '0:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_with_unicode_and_spaces(self):
|
||||
@ -1349,9 +1357,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/%s' % obj_path), # 1
|
||||
('PUT', '/v1/%s' % obj_path)]) # 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
# we PUT the object in the right place with q_ts + offset 2
|
||||
# we PUT the object in the right place with q_ts + offset 3
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(3618.84187, offset=2).internal)
|
||||
Timestamp(3618.84187, offset=3).internal)
|
||||
# cleans up the old
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1365,7 +1373,8 @@ class TestReconciler(unittest.TestCase):
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
# this mock received the name, it's encoded down in buffered_http
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/%s' % obj_name)])
|
||||
[('.misplaced_objects', '3600', '1:/%s' % obj_name,
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_delete(self):
|
||||
@ -1406,9 +1415,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'),
|
||||
('DELETE', '/v1/AUTH_bob/c/o1')])
|
||||
reconcile_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
# we DELETE the object in the right place with q_ts + offset 2
|
||||
# we DELETE the object in the right place with q_ts + offset 3
|
||||
self.assertEqual(reconcile_headers.get('X-Timestamp'),
|
||||
Timestamp(q_ts, offset=2).internal)
|
||||
Timestamp(q_ts, offset=3).internal)
|
||||
# cleans up the old
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1419,7 +1428,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and when we're done, we pop the entry from the queue
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(q_ts, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_enqueued_for_the_correct_dest_noop(self):
|
||||
@ -1443,7 +1453,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# so we just pop the queue
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3618.84187, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_src_object_newer_than_queue_entry(self):
|
||||
@ -1478,10 +1489,10 @@ class TestReconciler(unittest.TestCase):
|
||||
self.fake_swift.storage_policy[0].calls,
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
# .. with source timestamp + offset 2
|
||||
# .. with source timestamp + offset 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(3600.234567, offset=2))
|
||||
Timestamp(3600.234567, offset=3))
|
||||
# src object is cleaned up
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1491,7 +1502,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and queue is popped
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3600.123456, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_src_object_older_than_queue_entry(self):
|
||||
@ -1542,7 +1554,7 @@ class TestReconciler(unittest.TestCase):
|
||||
self._mock_oldest_spi({'c': 0})
|
||||
self.fake_swift.storage_policy[1].register(
|
||||
'GET', '/v1/AUTH_bob/c/o1', swob.HTTPNotFound,
|
||||
{'X-Backend-Timestamp': Timestamp(q_ts, offset=2).internal})
|
||||
{'X-Backend-Timestamp': Timestamp(q_ts, offset=3).internal})
|
||||
deleted_container_entries = self._run_once()
|
||||
|
||||
# found a misplaced object
|
||||
@ -1682,10 +1694,10 @@ class TestReconciler(unittest.TestCase):
|
||||
self.fake_swift.storage_policy[0].calls,
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
# .. with source timestamp + offset 2
|
||||
# .. with source timestamp + offset 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(3600.123457, offset=2))
|
||||
Timestamp(3600.123457, offset=3))
|
||||
# we try to cleanup
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
# ... with q_ts + offset 1
|
||||
@ -1731,7 +1743,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# so... we're done
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(q_ts, offset=2).internal)])
|
||||
# dunno if this is helpful, but FWIW we don't throw tombstones?
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 0)
|
||||
self.assertEqual(self.reconciler.stats['success'], 1) # lol
|
||||
@ -1772,7 +1785,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and wipe our hands of it
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3679.2019, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_dest_object_newer_than_queue_entry(self):
|
||||
@ -1813,7 +1827,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and since we cleaned up the old object, so this counts as done
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '3600', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(3679.2019, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_dest_object_older_than_queue_entry(self):
|
||||
@ -1847,10 +1862,10 @@ class TestReconciler(unittest.TestCase):
|
||||
self.fake_swift.storage_policy[0].calls,
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
# ... with a q_ts + offset 2
|
||||
# ... with a q_ts + offset 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(36123.38393, offset=2))
|
||||
Timestamp(36123.38393, offset=3))
|
||||
# then clean the dark matter
|
||||
self.assertEqual(self.reconciler.stats['cleanup_attempt'], 1)
|
||||
self.assertEqual(self.reconciler.stats['cleanup_success'], 1)
|
||||
@ -1861,7 +1876,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# and pop the queue
|
||||
self.assertEqual(self.reconciler.stats['pop_queue'], 1)
|
||||
self.assertEqual(deleted_container_entries,
|
||||
[('.misplaced_objects', '36000', '1:/AUTH_bob/c/o1')])
|
||||
[('.misplaced_objects', '36000', '1:/AUTH_bob/c/o1',
|
||||
Timestamp(36123.38393, offset=2).internal)])
|
||||
self.assertEqual(self.reconciler.stats['success'], 1)
|
||||
|
||||
def test_object_move_put_fails(self):
|
||||
@ -1899,9 +1915,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
# ...with q_ts + offset 2 (20-microseconds)
|
||||
# ...with q_ts + offset 3 (20-microseconds)
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(36123.383925, offset=2))
|
||||
Timestamp(36123.383925, offset=3))
|
||||
# but it failed
|
||||
self.assertEqual(self.reconciler.stats['copy_success'], 0)
|
||||
self.assertEqual(self.reconciler.stats['copy_failed'], 1)
|
||||
@ -1950,9 +1966,9 @@ class TestReconciler(unittest.TestCase):
|
||||
[('HEAD', '/v1/AUTH_bob/c/o1'), # 1
|
||||
('PUT', '/v1/AUTH_bob/c/o1')]) # 3
|
||||
put_headers = self.fake_swift.storage_policy[0].headers[1]
|
||||
# ...with q_ts + offset 2 (20-microseconds)
|
||||
# ...with q_ts + offset 3 (20-microseconds)
|
||||
self.assertEqual(put_headers.get('X-Timestamp'),
|
||||
Timestamp(36123.383925, offset=2))
|
||||
Timestamp(36123.383925, offset=3))
|
||||
# but it blows up hard
|
||||
self.assertEqual(self.reconciler.stats['unhandled_error'], 1)
|
||||
# so we don't cleanup
|
||||
@ -2029,7 +2045,8 @@ class TestReconciler(unittest.TestCase):
|
||||
# probably been reaped, so we'll just give up
|
||||
self.assertEqual(
|
||||
deleted_container_entries,
|
||||
[('.misplaced_objects', container, '1:/AUTH_jeb/c/o1')])
|
||||
[('.misplaced_objects', container, '1:/AUTH_jeb/c/o1',
|
||||
Timestamp(queue_ts, offset=2).internal)])
|
||||
|
||||
def test_delete_old_empty_queue_containers(self):
|
||||
ts = time.time() - self.reconciler.reclaim_age * 1.1
|
||||
|
Loading…
x
Reference in New Issue
Block a user