Merge "Fix large out of sync out of date containers"
This commit is contained in:
commit
8cf9107022
@ -279,7 +279,8 @@ class Replicator(Daemon):
|
||||
"""
|
||||
self.stats['diff'] += 1
|
||||
self.logger.increment('diffs')
|
||||
self.logger.debug('Syncing chunks with %s', http.host)
|
||||
self.logger.debug('Syncing chunks with %s, starting at %s',
|
||||
http.host, point)
|
||||
sync_table = broker.get_syncs()
|
||||
objects = broker.get_items_since(point, self.per_diff)
|
||||
diffs = 0
|
||||
@ -294,6 +295,8 @@ class Replicator(Daemon):
|
||||
{'status': response.status,
|
||||
'host': http.host})
|
||||
return False
|
||||
# replication relies on db order to send the next merge batch in
|
||||
# order with no gaps
|
||||
point = objects[-1]['ROWID']
|
||||
objects = broker.get_items_since(point, self.per_diff)
|
||||
if objects:
|
||||
|
@ -734,18 +734,19 @@ class ContainerBroker(DatabaseBroker):
|
||||
rec['content_type'], rec['etag'], rec['deleted'],
|
||||
rec['storage_policy_index'])
|
||||
for rec in to_add.itervalues()))
|
||||
if source:
|
||||
max_rowid = max(rec['ROWID']
|
||||
for rec in to_add.itervalues())
|
||||
if source:
|
||||
# for replication we rely on the remote end sending merges in
|
||||
# order with no gaps to increment sync_points
|
||||
sync_point = item_list[-1]['ROWID']
|
||||
curs.execute('''
|
||||
UPDATE incoming_sync SET
|
||||
sync_point=max(?, sync_point) WHERE remote_id=?
|
||||
''', (sync_point, source))
|
||||
if curs.rowcount < 1:
|
||||
curs.execute('''
|
||||
UPDATE incoming_sync SET
|
||||
sync_point=max(?, sync_point) WHERE remote_id=?
|
||||
''', (max_rowid, source))
|
||||
if curs.rowcount < 1:
|
||||
curs.execute('''
|
||||
INSERT INTO incoming_sync (sync_point, remote_id)
|
||||
VALUES (?, ?)
|
||||
''', (max_rowid, source))
|
||||
INSERT INTO incoming_sync (sync_point, remote_id)
|
||||
VALUES (?, ?)
|
||||
''', (sync_point, source))
|
||||
conn.commit()
|
||||
|
||||
with self.get() as conn:
|
||||
|
@ -1334,7 +1334,7 @@ class TestReplicatorSync(unittest.TestCase):
|
||||
if node['device'] == broker_device:
|
||||
return part, node
|
||||
|
||||
def _run_once(self, node, conf_updates=None, daemon=None):
|
||||
def _get_daemon(self, node, conf_updates):
|
||||
conf = {
|
||||
'devices': self.root,
|
||||
'recon_cache_path': self.root,
|
||||
@ -1343,7 +1343,10 @@ class TestReplicatorSync(unittest.TestCase):
|
||||
}
|
||||
if conf_updates:
|
||||
conf.update(conf_updates)
|
||||
daemon = daemon or self.replicator_daemon(conf, logger=self.logger)
|
||||
return self.replicator_daemon(conf, logger=self.logger)
|
||||
|
||||
def _run_once(self, node, conf_updates=None, daemon=None):
|
||||
daemon = daemon or self._get_daemon(node, conf_updates)
|
||||
|
||||
def _rsync_file(db_file, remote_file, **kwargs):
|
||||
remote_server, remote_path = remote_file.split('/', 1)
|
||||
|
@ -184,7 +184,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
# add a row to "local" db
|
||||
broker.put_object('/a/c/o', time.time(), 0, 'content-type', 'etag',
|
||||
storage_policy_index=broker.storage_policy_index)
|
||||
#replicate
|
||||
# replicate
|
||||
node = {'device': 'sdc', 'replication_ip': '127.0.0.1'}
|
||||
daemon = replicator.ContainerReplicator({})
|
||||
|
||||
@ -309,6 +309,53 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
"mismatch remote %s %r != %r" % (
|
||||
k, remote_info[k], v))
|
||||
|
||||
def test_diff_capped_sync(self):
|
||||
ts = (Timestamp(t).internal for t in
|
||||
itertools.count(int(time.time())))
|
||||
put_timestamp = next(ts)
|
||||
# start off with with a local db that is way behind
|
||||
broker = self._get_broker('a', 'c', node_index=0)
|
||||
broker.initialize(put_timestamp, POLICIES.default.idx)
|
||||
for i in range(50):
|
||||
broker.put_object(
|
||||
'o%s' % i, next(ts), 0, 'content-type-old', 'etag',
|
||||
storage_policy_index=broker.storage_policy_index)
|
||||
# remote primary db has all the new bits...
|
||||
remote_broker = self._get_broker('a', 'c', node_index=1)
|
||||
remote_broker.initialize(put_timestamp, POLICIES.default.idx)
|
||||
for i in range(100):
|
||||
remote_broker.put_object(
|
||||
'o%s' % i, next(ts), 0, 'content-type-new', 'etag',
|
||||
storage_policy_index=remote_broker.storage_policy_index)
|
||||
# except there's *one* tiny thing in our local broker that's newer
|
||||
broker.put_object(
|
||||
'o101', next(ts), 0, 'content-type-new', 'etag',
|
||||
storage_policy_index=broker.storage_policy_index)
|
||||
|
||||
# setup daemon with smaller per_diff and max_diffs
|
||||
part, node = self._get_broker_part_node(broker)
|
||||
daemon = self._get_daemon(node, conf_updates={'per_diff': 10,
|
||||
'max_diffs': 3})
|
||||
self.assertEqual(daemon.per_diff, 10)
|
||||
self.assertEqual(daemon.max_diffs, 3)
|
||||
# run once and verify diff capped
|
||||
self._run_once(node, daemon=daemon)
|
||||
self.assertEqual(1, daemon.stats['diff'])
|
||||
self.assertEqual(1, daemon.stats['diff_capped'])
|
||||
# run again and verify fully synced
|
||||
self._run_once(node, daemon=daemon)
|
||||
self.assertEqual(1, daemon.stats['diff'])
|
||||
self.assertEqual(0, daemon.stats['diff_capped'])
|
||||
# now that we're synced the new item should be in remote db
|
||||
remote_names = set()
|
||||
for item in remote_broker.list_objects_iter(500, '', '', '', ''):
|
||||
name, ts, size, content_type, etag = item
|
||||
remote_names.add(name)
|
||||
self.assertEqual(content_type, 'content-type-new')
|
||||
self.assert_('o101' in remote_names)
|
||||
self.assertEqual(len(remote_names), 101)
|
||||
self.assertEqual(remote_broker.get_info()['object_count'], 101)
|
||||
|
||||
def test_sync_status_change(self):
|
||||
# setup a local container
|
||||
broker = self._get_broker('a', 'c', node_index=0)
|
||||
@ -746,7 +793,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
|
||||
self.assertEqual(remote_broker.get_info()['object_count'], 1)
|
||||
self.assertEqual([], remote_broker.get_misplaced_since(-1, 1))
|
||||
|
||||
#replicate
|
||||
# replicate
|
||||
part, node = self._get_broker_part_node(broker)
|
||||
daemon = self._run_once(node)
|
||||
# since our local broker has no rows to push it logs as no_change
|
||||
|
Loading…
Reference in New Issue
Block a user