Fix large out of sync out of date containers

As I understand it, db replication starts with a preflight sync request
to the remote container server, whose response includes the last synced
row_id it has on file for the sending node's database id.
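
Roughly, the remote end's half of that preflight looks something like
this sketch (the names here are made up for illustration; the real code
keys its incoming_sync table by the sender's uuid):

    # illustrative only - not the actual swift API
    def handle_preflight_sync(incoming_sync, sender_db_id):
        # incoming_sync mirrors the incoming_sync table: remote_id -> sync_point
        # return the last row_id we've recorded as merged from this sender,
        # or -1 if we've never heard from it before
        return incoming_sync.get(sender_db_id, -1)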

If the difference from the returned sync point is more than 50% of the
local sending db's rows, it falls back to sending the whole db over
rsync and letting the remote end merge items locally - but generally
there are just a few rows missing, and they're shipped over the wire as
json and stuffed into some rather normal looking merge_items calls.
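
In other words the choice between rsync and row diffs boils down to
something like this (purely a sketch - the names are illustrative and
the real check lives in the replicator):

    # sketch of the fallback decision described above
    def choose_sync_strategy(local_max_row, remote_sync_point):
        rows_missing = local_max_row - remote_sync_point
        if rows_missing > 0.5 * local_max_row:
            return 'rsync the whole db and merge remotely'
        return 'send the missing rows as json via merge_items'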

The one thing that's a bit different about these remote merge_items
calls (compared to your average run of the mill eat a bunch of entries
out of a .pending file) is the source kwarg.  When this optional kwarg
comes into merge_items it's the remote sending db's uuid, and after we
eat all the rows it sent us we update our local incoming_sync table for
that uuid so that next time, when it makes its pre-flight sync request,
we can tell it where it left off.
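
That bookkeeping is just an upsert against incoming_sync; here is a
minimal sketch of what merge_items does when source is set (the real
change is in the backend hunk below):

    def record_incoming_sync(curs, source, sync_point):
        # bump the stored sync point for this remote db, never backwards
        curs.execute('UPDATE incoming_sync SET sync_point = max(?, sync_point) '
                     'WHERE remote_id = ?', (sync_point, source))
        if curs.rowcount < 1:
            # first time we've merged rows from this remote db
            curs.execute('INSERT INTO incoming_sync (sync_point, remote_id) '
                         'VALUES (?, ?)', (sync_point, source))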

Now normally the sending db pushes out its rows from the returned
sync_point in 1000 item diffs, up to 10 batches total (the per_diff and
max_diffs options) - 10K rows.  If that goes well then everything is in
sync up to at least the point where it started, and the sending db will
*also* ship over *its* incoming_sync rows to merge_syncs on the remote
end.  Since the sending db is in sync with those other dbs up to those
points, so is the remote db now, by way of the transitive property.
Also note that, through some weird artifact that I'm not entirely
convinced isn't an unrelated and possibly benign bug, the incoming_sync
table on the sending db will often also happen to include its own uuid
- maybe it got pushed back to it from another node?
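
The sender side of that loop is roughly shaped like this (a simplified
sketch of the usync loop in the Replicator; the real thing talks over
an http replication connection rather than plain callables):

    def usync(broker, send_merge_items, send_merge_syncs, point,
              per_diff=1000, max_diffs=10):
        diffs = 0
        objects = broker.get_items_since(point, per_diff)
        while objects and diffs < max_diffs:
            diffs += 1
            if not send_merge_items(objects):
                return False
            # batches go out in row order with no gaps so the remote end
            # can advance its incoming_sync point to the last row merged
            point = objects[-1]['ROWID']
            objects = broker.get_items_since(point, per_diff)
        if objects:
            # diff capped - more rows remain, don't claim we're in sync
            return False
        # fully caught up, so share our own sync points with the remote
        # end (the transitive property mentioned above)
        return send_merge_syncs(broker.get_syncs())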

Anyway, that seemed to work well enough until a sending db got diff
capped (i.e. it sent its 10K rows and wasn't finished).  When this
happens the final merge_syncs call never gets sent, because the remote
end is definitely *not* up to date with the other databases that the
sending db is in sync with - it's not even up-to-date with the sending
db yet!  The hope is certainly that on the next pass it'll be able to
finish sending the remaining items.  But since the remote end is the
one that decides what the last successfully synced row from this local
sending db was - it's super important that the incoming_sync table is
getting updated in merge_items when that source kwarg is there.
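
To put some purely illustrative numbers on it:

    # toy arithmetic for the capped case with the defaults above
    per_diff, max_diffs = 1000, 10
    rows_behind = 25000        # illustrative backlog
    sent = passes = 0
    while sent < rows_behind:
        sent += min(rows_behind - sent, per_diff * max_diffs)
        passes += 1
        # merge_syncs only goes out on the final, uncapped pass; until
        # then the incoming_sync row bumped in merge_items (via source)
        # is the only record of the remote end's progress
    print(passes)  # 3 passes: 10000 + 10000 + 5000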

I observed that this simple and straightforward process wasn't working
well in one case - which is weird considering it didn't have much in
the way of tests.  After I had a test and started looking into it, it
seemed the source kwarg handling got over-indented a bit in the bulk
insert merge_items refactor.  I think this is correct - maybe we could
send someone up to the mountain temple to seek out gholt?

Change-Id: I4137388a97925814748ecc36b3ab5f1ac3309659
Clay Gerrard 2014-12-11 01:59:52 -08:00
parent 4cdb51418c
commit 404ac092d1
4 changed files with 70 additions and 16 deletions


@@ -279,7 +279,8 @@ class Replicator(Daemon):
         """
         self.stats['diff'] += 1
         self.logger.increment('diffs')
-        self.logger.debug('Syncing chunks with %s', http.host)
+        self.logger.debug('Syncing chunks with %s, starting at %s',
+                          http.host, point)
         sync_table = broker.get_syncs()
         objects = broker.get_items_since(point, self.per_diff)
         diffs = 0
@@ -294,6 +295,8 @@ class Replicator(Daemon):
                               {'status': response.status,
                                'host': http.host})
                 return False
+            # replication relies on db order to send the next merge batch in
+            # order with no gaps
            point = objects[-1]['ROWID']
            objects = broker.get_items_since(point, self.per_diff)
        if objects:


@@ -734,18 +734,19 @@ class ContainerBroker(DatabaseBroker):
                       rec['content_type'], rec['etag'], rec['deleted'],
                       rec['storage_policy_index'])
                      for rec in to_add.itervalues()))
-                if source:
-                    max_rowid = max(rec['ROWID']
-                                    for rec in to_add.itervalues())
-                    curs.execute('''
-                        UPDATE incoming_sync SET
-                        sync_point=max(?, sync_point) WHERE remote_id=?
-                    ''', (max_rowid, source))
-                    if curs.rowcount < 1:
-                        curs.execute('''
-                            INSERT INTO incoming_sync (sync_point, remote_id)
-                            VALUES (?, ?)
-                        ''', (max_rowid, source))
+            if source:
+                # for replication we rely on the remote end sending merges in
+                # order with no gaps to increment sync_points
+                sync_point = item_list[-1]['ROWID']
+                curs.execute('''
+                    UPDATE incoming_sync SET
+                    sync_point=max(?, sync_point) WHERE remote_id=?
+                ''', (sync_point, source))
+                if curs.rowcount < 1:
+                    curs.execute('''
+                        INSERT INTO incoming_sync (sync_point, remote_id)
+                        VALUES (?, ?)
+                    ''', (sync_point, source))
             conn.commit()

         with self.get() as conn:


@@ -1351,7 +1351,7 @@ class TestReplicatorSync(unittest.TestCase):
             if node['device'] == broker_device:
                 return part, node

-    def _run_once(self, node, conf_updates=None, daemon=None):
+    def _get_daemon(self, node, conf_updates):
         conf = {
             'devices': self.root,
             'recon_cache_path': self.root,
@@ -1360,7 +1360,10 @@ class TestReplicatorSync(unittest.TestCase):
         }
         if conf_updates:
             conf.update(conf_updates)
-        daemon = daemon or self.replicator_daemon(conf, logger=self.logger)
+        return self.replicator_daemon(conf, logger=self.logger)
+
+    def _run_once(self, node, conf_updates=None, daemon=None):
+        daemon = daemon or self._get_daemon(node, conf_updates)

         def _rsync_file(db_file, remote_file, **kwargs):
             remote_server, remote_path = remote_file.split('/', 1)


@@ -184,7 +184,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         # add a row to "local" db
         broker.put_object('/a/c/o', time.time(), 0, 'content-type', 'etag',
                           storage_policy_index=broker.storage_policy_index)
-        #replicate
+        # replicate
         node = {'device': 'sdc', 'replication_ip': '127.0.0.1'}
         daemon = replicator.ContainerReplicator({})
@@ -309,6 +309,53 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
                              "mismatch remote %s %r != %r" % (
                                  k, remote_info[k], v))

+    def test_diff_capped_sync(self):
+        ts = (Timestamp(t).internal for t in
+              itertools.count(int(time.time())))
+        put_timestamp = next(ts)
+        # start off with a local db that is way behind
+        broker = self._get_broker('a', 'c', node_index=0)
+        broker.initialize(put_timestamp, POLICIES.default.idx)
+        for i in range(50):
+            broker.put_object(
+                'o%s' % i, next(ts), 0, 'content-type-old', 'etag',
+                storage_policy_index=broker.storage_policy_index)
+        # remote primary db has all the new bits...
+        remote_broker = self._get_broker('a', 'c', node_index=1)
+        remote_broker.initialize(put_timestamp, POLICIES.default.idx)
+        for i in range(100):
+            remote_broker.put_object(
+                'o%s' % i, next(ts), 0, 'content-type-new', 'etag',
+                storage_policy_index=remote_broker.storage_policy_index)
+        # except there's *one* tiny thing in our local broker that's newer
+        broker.put_object(
+            'o101', next(ts), 0, 'content-type-new', 'etag',
+            storage_policy_index=broker.storage_policy_index)
+        # setup daemon with smaller per_diff and max_diffs
+        part, node = self._get_broker_part_node(broker)
+        daemon = self._get_daemon(node, conf_updates={'per_diff': 10,
+                                                      'max_diffs': 3})
+        self.assertEqual(daemon.per_diff, 10)
+        self.assertEqual(daemon.max_diffs, 3)
+        # run once and verify diff capped
+        self._run_once(node, daemon=daemon)
+        self.assertEqual(1, daemon.stats['diff'])
+        self.assertEqual(1, daemon.stats['diff_capped'])
+        # run again and verify fully synced
+        self._run_once(node, daemon=daemon)
+        self.assertEqual(1, daemon.stats['diff'])
+        self.assertEqual(0, daemon.stats['diff_capped'])
+        # now that we're synced the new item should be in remote db
+        remote_names = set()
+        for item in remote_broker.list_objects_iter(500, '', '', '', ''):
+            name, ts, size, content_type, etag = item
+            remote_names.add(name)
+            self.assertEqual(content_type, 'content-type-new')
+        self.assert_('o101' in remote_names)
+        self.assertEqual(len(remote_names), 101)
+        self.assertEqual(remote_broker.get_info()['object_count'], 101)
+
     def test_sync_status_change(self):
         # setup a local container
         broker = self._get_broker('a', 'c', node_index=0)
@@ -746,7 +793,7 @@ class TestReplicatorSync(test_db_replicator.TestReplicatorSync):
         self.assertEqual(remote_broker.get_info()['object_count'], 1)
         self.assertEqual([], remote_broker.get_misplaced_since(-1, 1))

-        #replicate
+        # replicate
         part, node = self._get_broker_part_node(broker)
         daemon = self._run_once(node)
         # since our local broker has no rows to push it logs as no_change