Check for change before container replicator updates db
As described in the related bug report, unnecessary updates to the container db during replication can impact object object GET performance in certain circumstances. This patch changes swift/container/replicator.py so that calls to merge_timestamps and update_reconciler_sync are conditional on values having actually changed. Related-Bug: 1332025 Change-Id: If498251656500ed7a3d7ca4b109ea1079b8513c2
This commit is contained in:
parent
c5013783ac
commit
b9ae377eab
@ -63,15 +63,18 @@ class ContainerReplicator(db_replicator.Replicator):
|
||||
broker.set_storage_policy_index(
|
||||
remote_info['storage_policy_index'],
|
||||
timestamp=status_changed_at.internal)
|
||||
broker.merge_timestamps(*(remote_info[key] for key in (
|
||||
'created_at', 'put_timestamp', 'delete_timestamp')))
|
||||
sync_timestamps = ('created_at', 'put_timestamp',
|
||||
'delete_timestamp')
|
||||
if any(info[key] != remote_info[key] for key in sync_timestamps):
|
||||
broker.merge_timestamps(*(remote_info[key] for key in
|
||||
sync_timestamps))
|
||||
rv = parent._handle_sync_response(
|
||||
node, response, info, broker, http)
|
||||
return rv
|
||||
|
||||
def find_local_handoff_for_part(self, part):
|
||||
"""
|
||||
Look through devices in the ring for the first handoff devie that was
|
||||
Look through devices in the ring for the first handoff device that was
|
||||
identified during job creation as available on this node.
|
||||
|
||||
:returns: a node entry from the ring
|
||||
@ -179,10 +182,10 @@ class ContainerReplicator(db_replicator.Replicator):
|
||||
def _post_replicate_hook(self, broker, info, responses):
|
||||
if info['account'] == MISPLACED_OBJECTS_ACCOUNT:
|
||||
return
|
||||
if not broker.has_multiple_policies():
|
||||
point = broker.get_reconciler_sync()
|
||||
if not broker.has_multiple_policies() and info['max_row'] != point:
|
||||
broker.update_reconciler_sync(info['max_row'])
|
||||
return
|
||||
point = broker.get_reconciler_sync()
|
||||
max_sync = self.dump_to_reconciler(broker, point)
|
||||
success = responses.count(True) >= quorum_size(len(responses))
|
||||
if max_sync > point and success:
|
||||
|
@ -31,6 +31,7 @@ from swift.common.storage_policy import POLICIES
|
||||
|
||||
from test.unit.common import test_db_replicator
|
||||
from test.unit import patch_policies
|
||||
from contextlib import contextmanager
|
||||
|
||||
|
||||
class TestReplicator(unittest.TestCase):
|
||||
@ -341,6 +342,62 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
remote_info['status_changed_at'],
|
||||
info['status_changed_at']))
|
||||
|
||||
@contextmanager
|
||||
def _wrap_merge_timestamps(self, broker, calls):
|
||||
def fake_merge_timestamps(*args, **kwargs):
|
||||
calls.append(args[0])
|
||||
orig_merge_timestamps(*args, **kwargs)
|
||||
|
||||
orig_merge_timestamps = broker.merge_timestamps
|
||||
broker.merge_timestamps = fake_merge_timestamps
|
||||
try:
|
||||
yield True
|
||||
finally:
|
||||
broker.merge_timestamps = orig_merge_timestamps
|
||||
|
||||
def test_sync_merge_timestamps(self):
|
||||
ts = (Timestamp(t).internal for t in
|
||||
itertools.count(int(time.time())))
|
||||
# setup a local container
|
||||
broker = self._get_broker('a', 'c', node_index=0)
|
||||
put_timestamp = ts.next()
|
||||
broker.initialize(put_timestamp, POLICIES.default.idx)
|
||||
# setup remote container
|
||||
remote_broker = self._get_broker('a', 'c', node_index=1)
|
||||
remote_put_timestamp = ts.next()
|
||||
remote_broker.initialize(remote_put_timestamp, POLICIES.default.idx)
|
||||
# replicate, expect call to merge_timestamps on remote and local
|
||||
daemon = replicator.ContainerReplicator({})
|
||||
part, node = self._get_broker_part_node(remote_broker)
|
||||
info = broker.get_replication_info()
|
||||
local_calls = []
|
||||
remote_calls = []
|
||||
with self._wrap_merge_timestamps(broker, local_calls):
|
||||
with self._wrap_merge_timestamps(broker, remote_calls):
|
||||
success = daemon._repl_to_node(node, broker, part, info)
|
||||
self.assertTrue(success)
|
||||
self.assertEqual(1, len(remote_calls))
|
||||
self.assertEqual(1, len(local_calls))
|
||||
self.assertEqual(remote_put_timestamp,
|
||||
broker.get_info()['put_timestamp'])
|
||||
self.assertEqual(remote_put_timestamp,
|
||||
remote_broker.get_info()['put_timestamp'])
|
||||
|
||||
# replicate again, no changes so expect no calls to merge_timestamps
|
||||
info = broker.get_replication_info()
|
||||
local_calls = []
|
||||
remote_calls = []
|
||||
with self._wrap_merge_timestamps(broker, local_calls):
|
||||
with self._wrap_merge_timestamps(broker, remote_calls):
|
||||
success = daemon._repl_to_node(node, broker, part, info)
|
||||
self.assertTrue(success)
|
||||
self.assertEqual(0, len(remote_calls))
|
||||
self.assertEqual(0, len(local_calls))
|
||||
self.assertEqual(remote_put_timestamp,
|
||||
broker.get_info()['put_timestamp'])
|
||||
self.assertEqual(remote_put_timestamp,
|
||||
remote_broker.get_info()['put_timestamp'])
|
||||
|
||||
def test_sync_bogus_db_quarantines(self):
|
||||
ts = (Timestamp(t).internal for t in
|
||||
itertools.count(int(time.time())))
|
||||
@ -851,6 +908,41 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
a, c, name = path.lstrip('/').split('/')
|
||||
self.assertEqual(most_recent_items[name], timestamp)
|
||||
|
||||
@contextmanager
|
||||
def _wrap_update_reconciler_sync(self, broker, calls):
|
||||
def wrapper_function(*args, **kwargs):
|
||||
calls.append(args)
|
||||
orig_function(*args, **kwargs)
|
||||
|
||||
orig_function = broker.update_reconciler_sync
|
||||
broker.update_reconciler_sync = wrapper_function
|
||||
try:
|
||||
yield True
|
||||
finally:
|
||||
broker.update_reconciler_sync = orig_function
|
||||
|
||||
def test_post_replicate_hook(self):
|
||||
ts = (Timestamp(t).internal for t in
|
||||
itertools.count(int(time.time())))
|
||||
broker = self._get_broker('a', 'c', node_index=0)
|
||||
broker.initialize(ts.next(), 0)
|
||||
broker.put_object('foo', ts.next(), 0, 'text/plain', 'xyz', deleted=0,
|
||||
storage_policy_index=0)
|
||||
info = broker.get_replication_info()
|
||||
self.assertEqual(1, info['max_row'])
|
||||
self.assertEqual(-1, broker.get_reconciler_sync())
|
||||
daemon = replicator.ContainerReplicator({})
|
||||
calls = []
|
||||
with self._wrap_update_reconciler_sync(broker, calls):
|
||||
daemon._post_replicate_hook(broker, info, [])
|
||||
self.assertEqual(1, len(calls))
|
||||
# repeated call to _post_replicate_hook with no change to info
|
||||
# should not call update_reconciler_sync
|
||||
calls = []
|
||||
with self._wrap_update_reconciler_sync(broker, calls):
|
||||
daemon._post_replicate_hook(broker, info, [])
|
||||
self.assertEqual(0, len(calls))
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
Loading…
x
Reference in New Issue
Block a user