Merge "Check for change before container replicator updates db"
This commit is contained in:
commit
ed84a77c40
@ -63,15 +63,18 @@ class ContainerReplicator(db_replicator.Replicator):
|
|||||||
broker.set_storage_policy_index(
|
broker.set_storage_policy_index(
|
||||||
remote_info['storage_policy_index'],
|
remote_info['storage_policy_index'],
|
||||||
timestamp=status_changed_at.internal)
|
timestamp=status_changed_at.internal)
|
||||||
broker.merge_timestamps(*(remote_info[key] for key in (
|
sync_timestamps = ('created_at', 'put_timestamp',
|
||||||
'created_at', 'put_timestamp', 'delete_timestamp')))
|
'delete_timestamp')
|
||||||
|
if any(info[key] != remote_info[key] for key in sync_timestamps):
|
||||||
|
broker.merge_timestamps(*(remote_info[key] for key in
|
||||||
|
sync_timestamps))
|
||||||
rv = parent._handle_sync_response(
|
rv = parent._handle_sync_response(
|
||||||
node, response, info, broker, http)
|
node, response, info, broker, http)
|
||||||
return rv
|
return rv
|
||||||
|
|
||||||
def find_local_handoff_for_part(self, part):
|
def find_local_handoff_for_part(self, part):
|
||||||
"""
|
"""
|
||||||
Look through devices in the ring for the first handoff devie that was
|
Look through devices in the ring for the first handoff device that was
|
||||||
identified during job creation as available on this node.
|
identified during job creation as available on this node.
|
||||||
|
|
||||||
:returns: a node entry from the ring
|
:returns: a node entry from the ring
|
||||||
@ -179,10 +182,10 @@ class ContainerReplicator(db_replicator.Replicator):
|
|||||||
def _post_replicate_hook(self, broker, info, responses):
|
def _post_replicate_hook(self, broker, info, responses):
|
||||||
if info['account'] == MISPLACED_OBJECTS_ACCOUNT:
|
if info['account'] == MISPLACED_OBJECTS_ACCOUNT:
|
||||||
return
|
return
|
||||||
if not broker.has_multiple_policies():
|
point = broker.get_reconciler_sync()
|
||||||
|
if not broker.has_multiple_policies() and info['max_row'] != point:
|
||||||
broker.update_reconciler_sync(info['max_row'])
|
broker.update_reconciler_sync(info['max_row'])
|
||||||
return
|
return
|
||||||
point = broker.get_reconciler_sync()
|
|
||||||
max_sync = self.dump_to_reconciler(broker, point)
|
max_sync = self.dump_to_reconciler(broker, point)
|
||||||
success = responses.count(True) >= quorum_size(len(responses))
|
success = responses.count(True) >= quorum_size(len(responses))
|
||||||
if max_sync > point and success:
|
if max_sync > point and success:
|
||||||
|
@ -31,6 +31,7 @@ from swift.common.storage_policy import POLICIES
|
|||||||
|
|
||||||
from test.unit.common import test_db_replicator
|
from test.unit.common import test_db_replicator
|
||||||
from test.unit import patch_policies
|
from test.unit import patch_policies
|
||||||
|
from contextlib import contextmanager
|
||||||
|
|
||||||
|
|
||||||
class TestReplicator(unittest.TestCase):
|
class TestReplicator(unittest.TestCase):
|
||||||
@ -341,6 +342,62 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
|||||||
remote_info['status_changed_at'],
|
remote_info['status_changed_at'],
|
||||||
info['status_changed_at']))
|
info['status_changed_at']))
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def _wrap_merge_timestamps(self, broker, calls):
|
||||||
|
def fake_merge_timestamps(*args, **kwargs):
|
||||||
|
calls.append(args[0])
|
||||||
|
orig_merge_timestamps(*args, **kwargs)
|
||||||
|
|
||||||
|
orig_merge_timestamps = broker.merge_timestamps
|
||||||
|
broker.merge_timestamps = fake_merge_timestamps
|
||||||
|
try:
|
||||||
|
yield True
|
||||||
|
finally:
|
||||||
|
broker.merge_timestamps = orig_merge_timestamps
|
||||||
|
|
||||||
|
def test_sync_merge_timestamps(self):
|
||||||
|
ts = (Timestamp(t).internal for t in
|
||||||
|
itertools.count(int(time.time())))
|
||||||
|
# setup a local container
|
||||||
|
broker = self._get_broker('a', 'c', node_index=0)
|
||||||
|
put_timestamp = ts.next()
|
||||||
|
broker.initialize(put_timestamp, POLICIES.default.idx)
|
||||||
|
# setup remote container
|
||||||
|
remote_broker = self._get_broker('a', 'c', node_index=1)
|
||||||
|
remote_put_timestamp = ts.next()
|
||||||
|
remote_broker.initialize(remote_put_timestamp, POLICIES.default.idx)
|
||||||
|
# replicate, expect call to merge_timestamps on remote and local
|
||||||
|
daemon = replicator.ContainerReplicator({})
|
||||||
|
part, node = self._get_broker_part_node(remote_broker)
|
||||||
|
info = broker.get_replication_info()
|
||||||
|
local_calls = []
|
||||||
|
remote_calls = []
|
||||||
|
with self._wrap_merge_timestamps(broker, local_calls):
|
||||||
|
with self._wrap_merge_timestamps(broker, remote_calls):
|
||||||
|
success = daemon._repl_to_node(node, broker, part, info)
|
||||||
|
self.assertTrue(success)
|
||||||
|
self.assertEqual(1, len(remote_calls))
|
||||||
|
self.assertEqual(1, len(local_calls))
|
||||||
|
self.assertEqual(remote_put_timestamp,
|
||||||
|
broker.get_info()['put_timestamp'])
|
||||||
|
self.assertEqual(remote_put_timestamp,
|
||||||
|
remote_broker.get_info()['put_timestamp'])
|
||||||
|
|
||||||
|
# replicate again, no changes so expect no calls to merge_timestamps
|
||||||
|
info = broker.get_replication_info()
|
||||||
|
local_calls = []
|
||||||
|
remote_calls = []
|
||||||
|
with self._wrap_merge_timestamps(broker, local_calls):
|
||||||
|
with self._wrap_merge_timestamps(broker, remote_calls):
|
||||||
|
success = daemon._repl_to_node(node, broker, part, info)
|
||||||
|
self.assertTrue(success)
|
||||||
|
self.assertEqual(0, len(remote_calls))
|
||||||
|
self.assertEqual(0, len(local_calls))
|
||||||
|
self.assertEqual(remote_put_timestamp,
|
||||||
|
broker.get_info()['put_timestamp'])
|
||||||
|
self.assertEqual(remote_put_timestamp,
|
||||||
|
remote_broker.get_info()['put_timestamp'])
|
||||||
|
|
||||||
def test_sync_bogus_db_quarantines(self):
|
def test_sync_bogus_db_quarantines(self):
|
||||||
ts = (Timestamp(t).internal for t in
|
ts = (Timestamp(t).internal for t in
|
||||||
itertools.count(int(time.time())))
|
itertools.count(int(time.time())))
|
||||||
@ -851,6 +908,41 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
|||||||
a, c, name = path.lstrip('/').split('/')
|
a, c, name = path.lstrip('/').split('/')
|
||||||
self.assertEqual(most_recent_items[name], timestamp)
|
self.assertEqual(most_recent_items[name], timestamp)
|
||||||
|
|
||||||
|
@contextmanager
|
||||||
|
def _wrap_update_reconciler_sync(self, broker, calls):
|
||||||
|
def wrapper_function(*args, **kwargs):
|
||||||
|
calls.append(args)
|
||||||
|
orig_function(*args, **kwargs)
|
||||||
|
|
||||||
|
orig_function = broker.update_reconciler_sync
|
||||||
|
broker.update_reconciler_sync = wrapper_function
|
||||||
|
try:
|
||||||
|
yield True
|
||||||
|
finally:
|
||||||
|
broker.update_reconciler_sync = orig_function
|
||||||
|
|
||||||
|
def test_post_replicate_hook(self):
|
||||||
|
ts = (Timestamp(t).internal for t in
|
||||||
|
itertools.count(int(time.time())))
|
||||||
|
broker = self._get_broker('a', 'c', node_index=0)
|
||||||
|
broker.initialize(ts.next(), 0)
|
||||||
|
broker.put_object('foo', ts.next(), 0, 'text/plain', 'xyz', deleted=0,
|
||||||
|
storage_policy_index=0)
|
||||||
|
info = broker.get_replication_info()
|
||||||
|
self.assertEqual(1, info['max_row'])
|
||||||
|
self.assertEqual(-1, broker.get_reconciler_sync())
|
||||||
|
daemon = replicator.ContainerReplicator({})
|
||||||
|
calls = []
|
||||||
|
with self._wrap_update_reconciler_sync(broker, calls):
|
||||||
|
daemon._post_replicate_hook(broker, info, [])
|
||||||
|
self.assertEqual(1, len(calls))
|
||||||
|
# repeated call to _post_replicate_hook with no change to info
|
||||||
|
# should not call update_reconciler_sync
|
||||||
|
calls = []
|
||||||
|
with self._wrap_update_reconciler_sync(broker, calls):
|
||||||
|
daemon._post_replicate_hook(broker, info, [])
|
||||||
|
self.assertEqual(0, len(calls))
|
||||||
|
|
||||||
|
|
||||||
if __name__ == '__main__':
|
if __name__ == '__main__':
|
||||||
unittest.main()
|
unittest.main()
|
||||||
|
Loading…
Reference in New Issue
Block a user