Sharding: Sync container_stat table with fresh db
When moving an UNSHARDED broker into the SHARDING db state the important information from the retiring DB needs to be moved into the fresh DB. However, currently we are missing a whole bunch of container information from the retiring db. Currently we dont sync across the delete_timestamp, status or status_changed_at metadata either. These are important bits of information from the retiring DB. Delete_timestamp for instance, might indicate the UNSHARDED container is actually deleted, but there are objects inside that need to be dealt with. Currently, when transitioning to the SHARDING state it effectively undeletes itself. This patch, when moving into the SHARDING DB state via broker.set_sharding_state() now does an SQL update to update bits of the container_stat metadata that doesn't have direct API access from the broker. Namely delete_timestamp, status and status_changed_at. To get the container status, I had to add status to the get_info SQL. The fact that it had status_changed_at and not status feels like a bug, so adding seems to be in scope for this change. Change-Id: Ic3b3f21ce67c640dfb632b79fd09902f14576eaf
This commit is contained in:
parent
2e2251638c
commit
45e13ff4c5
@ -868,7 +868,7 @@ class ContainerBroker(DatabaseBroker):
|
||||
try:
|
||||
data = conn.execute(('''
|
||||
SELECT account, container, created_at, put_timestamp,
|
||||
delete_timestamp, status_changed_at,
|
||||
delete_timestamp, status, status_changed_at,
|
||||
object_count, bytes_used,
|
||||
reported_put_timestamp, reported_delete_timestamp,
|
||||
reported_object_count, reported_bytes_used, hash,
|
||||
@ -923,7 +923,7 @@ class ContainerBroker(DatabaseBroker):
|
||||
Get global data for the container.
|
||||
|
||||
:returns: dict with keys: account, container, created_at,
|
||||
put_timestamp, delete_timestamp, status_changed_at,
|
||||
put_timestamp, delete_timestamp, status, status_changed_at,
|
||||
object_count, bytes_used, reported_put_timestamp,
|
||||
reported_delete_timestamp, reported_object_count,
|
||||
reported_bytes_used, hash, id, x_container_sync_point1,
|
||||
@ -2005,16 +2005,24 @@ class ContainerBroker(DatabaseBroker):
|
||||
self.path, err)
|
||||
return False
|
||||
|
||||
# Set the created_at and hash in the container_info table the same
|
||||
# in both brokers
|
||||
# sync the retiring container stat into the fresh db. At least the
|
||||
# things that either aren't covered through the normal
|
||||
# broker api, and things that wont just be regenerated.
|
||||
try:
|
||||
fresh_broker_conn.execute(
|
||||
'UPDATE container_stat SET created_at=?',
|
||||
(info['created_at'],))
|
||||
sql = 'UPDATE container_stat SET created_at=?, '
|
||||
sql += 'delete_timestamp=?, status=?, status_changed_at=?'
|
||||
sql_data = (info['created_at'], info['delete_timestamp'],
|
||||
info['status'], info['status_changed_at'])
|
||||
# 'reported_*' items are not sync'd because this is consistent
|
||||
# with when a new DB is created after rsync'ing to another
|
||||
# node (see _newid()). 'hash' should not be sync'd because
|
||||
# this DB has no object rows.
|
||||
fresh_broker_conn.execute(sql, sql_data)
|
||||
fresh_broker_conn.commit()
|
||||
except sqlite3.OperationalError as err:
|
||||
self.logger.error('Failed to set matching created_at time in '
|
||||
'the fresh database for %s: %s',
|
||||
self.logger.error(
|
||||
'Failed to sync the container_stat table/view with the '
|
||||
'fresh database for %s: %s',
|
||||
self.path, err)
|
||||
return False
|
||||
|
||||
|
@ -304,7 +304,6 @@ class TestContainerBroker(unittest.TestCase):
|
||||
# move to sharding state
|
||||
broker.enable_sharding(next(self.ts))
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
broker.delete_db(next(self.ts).internal)
|
||||
self.assertTrue(broker.is_deleted())
|
||||
|
||||
# check object in retiring db is considered
|
||||
@ -4755,6 +4754,11 @@ class TestContainerBroker(unittest.TestCase):
|
||||
self.assertTrue(os.path.exists(new_db_path))
|
||||
self.assertEqual([], broker.get_objects())
|
||||
self.assertEqual(objects, broker.get_brokers()[0].get_objects())
|
||||
self.assertEqual(broker.get_reconciler_sync(), -1)
|
||||
info = broker.get_info()
|
||||
if info.get('x_container_sync_point1'):
|
||||
self.assertEqual(info['x_container_sync_point1'], -1)
|
||||
self.assertEqual(info['x_container_sync_point2'], -1)
|
||||
check_sharding_state(broker)
|
||||
|
||||
# to confirm we're definitely looking at the shard db
|
||||
@ -4841,6 +4845,111 @@ class TestContainerBroker(unittest.TestCase):
|
||||
do_revive_shard_delete(shard_ranges)
|
||||
do_revive_shard_delete(shard_ranges)
|
||||
|
||||
@with_tempdir
|
||||
def test_set_sharding_state(self, tempdir):
|
||||
db_path = os.path.join(
|
||||
tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
|
||||
broker = ContainerBroker(db_path, account='a', container='c',
|
||||
logger=debug_logger())
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
broker.merge_items([{'name': 'obj_%d' % i,
|
||||
'created_at': next(self.ts).normal,
|
||||
'content_type': 'text/plain',
|
||||
'etag': 'etag_%d' % i,
|
||||
'size': 1024 * i,
|
||||
'deleted': 0,
|
||||
'storage_policy_index': 0,
|
||||
} for i in range(1, 6)])
|
||||
broker.set_x_container_sync_points(1, 2)
|
||||
broker.update_reconciler_sync(3)
|
||||
self.assertEqual(3, broker.get_reconciler_sync())
|
||||
broker.reported(next(self.ts).internal, next(self.ts).internal,
|
||||
next(self.ts).internal, next(self.ts).internal)
|
||||
epoch = next(self.ts)
|
||||
broker.enable_sharding(epoch)
|
||||
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||
self.assertFalse(broker.is_deleted())
|
||||
retiring_info = broker.get_info()
|
||||
self.assertEqual(1, len(broker.db_files))
|
||||
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
broker = ContainerBroker(db_path, account='a', container='c',
|
||||
logger=debug_logger())
|
||||
self.assertEqual(SHARDING, broker.get_db_state())
|
||||
fresh_info = broker.get_info()
|
||||
for key in ('reported_put_timestamp', 'reported_delete_timestamp'):
|
||||
retiring_info.pop(key)
|
||||
self.assertEqual('0', fresh_info.pop(key), key)
|
||||
for key in ('reported_object_count', 'reported_bytes_used'):
|
||||
retiring_info.pop(key)
|
||||
self.assertEqual(0, fresh_info.pop(key), key)
|
||||
self.assertNotEqual(retiring_info.pop('id'), fresh_info.pop('id'))
|
||||
self.assertNotEqual(retiring_info.pop('hash'), fresh_info.pop('hash'))
|
||||
self.assertNotEqual(retiring_info.pop('x_container_sync_point1'),
|
||||
fresh_info.pop('x_container_sync_point1'))
|
||||
self.assertNotEqual(retiring_info.pop('x_container_sync_point2'),
|
||||
fresh_info.pop('x_container_sync_point2'))
|
||||
self.assertEqual(-1, broker.get_reconciler_sync())
|
||||
self.assertEqual('unsharded', retiring_info.pop('db_state'))
|
||||
self.assertEqual('sharding', fresh_info.pop('db_state'))
|
||||
self.assertEqual(retiring_info, fresh_info)
|
||||
self.assertFalse(broker.is_deleted())
|
||||
self.assertEqual(2, len(broker.db_files))
|
||||
self.assertEqual(db_path, broker.db_files[0])
|
||||
fresh_db_path = os.path.join(
|
||||
tempdir, 'containers', 'part', 'suffix', 'hash',
|
||||
'container_%s.db' % epoch.internal)
|
||||
self.assertEqual(fresh_db_path, broker.db_files[1])
|
||||
|
||||
@with_tempdir
|
||||
def test_set_sharding_state_deleted(self, tempdir):
|
||||
db_path = os.path.join(
|
||||
tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
|
||||
broker = ContainerBroker(db_path, account='a', container='c',
|
||||
logger=debug_logger())
|
||||
broker.initialize(next(self.ts).internal, 0)
|
||||
broker.set_x_container_sync_points(1, 2)
|
||||
broker.update_reconciler_sync(3)
|
||||
self.assertEqual(3, broker.get_reconciler_sync())
|
||||
broker.reported(next(self.ts).internal, next(self.ts).internal,
|
||||
next(self.ts).internal, next(self.ts).internal)
|
||||
epoch = next(self.ts)
|
||||
broker.enable_sharding(epoch)
|
||||
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||
broker.delete_db(next(self.ts).internal)
|
||||
self.assertTrue(broker.is_deleted())
|
||||
retiring_info = broker.get_info()
|
||||
self.assertEqual("DELETED", retiring_info['status'])
|
||||
self.assertEqual(1, len(broker.db_files))
|
||||
|
||||
self.assertTrue(broker.set_sharding_state())
|
||||
broker = ContainerBroker(db_path, account='a', container='c',
|
||||
logger=debug_logger())
|
||||
self.assertEqual(SHARDING, broker.get_db_state())
|
||||
fresh_info = broker.get_info()
|
||||
for key in ('reported_put_timestamp', 'reported_delete_timestamp'):
|
||||
retiring_info.pop(key)
|
||||
self.assertEqual('0', fresh_info.pop(key), key)
|
||||
for key in ('reported_object_count', 'reported_bytes_used'):
|
||||
retiring_info.pop(key)
|
||||
self.assertEqual(0, fresh_info.pop(key), key)
|
||||
self.assertNotEqual(retiring_info.pop('id'), fresh_info.pop('id'))
|
||||
self.assertNotEqual(retiring_info.pop('x_container_sync_point1'),
|
||||
fresh_info.pop('x_container_sync_point1'))
|
||||
self.assertNotEqual(retiring_info.pop('x_container_sync_point2'),
|
||||
fresh_info.pop('x_container_sync_point2'))
|
||||
self.assertEqual(-1, broker.get_reconciler_sync())
|
||||
self.assertEqual('unsharded', retiring_info.pop('db_state'))
|
||||
self.assertEqual('sharding', fresh_info.pop('db_state'))
|
||||
self.assertEqual(retiring_info, fresh_info)
|
||||
self.assertTrue(broker.is_deleted())
|
||||
self.assertEqual(2, len(broker.db_files))
|
||||
self.assertEqual(db_path, broker.db_files[0])
|
||||
fresh_db_path = os.path.join(
|
||||
tempdir, 'containers', 'part', 'suffix', 'hash',
|
||||
'container_%s.db' % epoch.internal)
|
||||
self.assertEqual(fresh_db_path, broker.db_files[1])
|
||||
|
||||
@with_tempdir
|
||||
def test_set_sharding_state_errors(self, tempdir):
|
||||
db_path = os.path.join(
|
||||
@ -4873,7 +4982,9 @@ class TestContainerBroker(unittest.TestCase):
|
||||
res = broker.set_sharding_state()
|
||||
self.assertFalse(res)
|
||||
lines = broker.logger.get_lines_for_level('error')
|
||||
self.assertIn('Failed to set matching', lines[0])
|
||||
self.assertIn(
|
||||
'Failed to sync the container_stat table/view with the fresh '
|
||||
'database', lines[0])
|
||||
self.assertFalse(lines[1:])
|
||||
|
||||
@with_tempdir
|
||||
|
Loading…
x
Reference in New Issue
Block a user