Merge "Do not reclaim sharded roots until they shrink"
commit 923323f477
@@ -812,16 +812,24 @@ class ContainerBroker(DatabaseBroker):
         info.update(self._get_alternate_object_stats()[1])
         return self._is_deleted_info(**info)
 
-    def is_reclaimable(self, now, reclaim_age):
+    def is_old_enough_to_reclaim(self, now, reclaim_age):
         with self.get() as conn:
             info = conn.execute('''
                 SELECT put_timestamp, delete_timestamp
                 FROM container_stat''').fetchone()
-        if (Timestamp(now - reclaim_age) >
+        return (Timestamp(now - reclaim_age) >
                 Timestamp(info['delete_timestamp']) >
-                Timestamp(info['put_timestamp'])):
-            return self.empty()
+                Timestamp(info['put_timestamp']))
+
+    def is_empty_enough_to_reclaim(self):
+        if self.is_root_container() and (self.get_shard_ranges() or
+                                         self.get_db_state() == SHARDING):
+            return False
+        return self.empty()
+
+    def is_reclaimable(self, now, reclaim_age):
+        return self.is_old_enough_to_reclaim(now, reclaim_age) and \
+            self.is_empty_enough_to_reclaim()
 
     def get_info_is_deleted(self):
         """
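
For context, a minimal sketch (not part of the diff) of how the two predicates added above combine; the helper name may_reclaim and the standalone broker variable are hypothetical, standing in for any ContainerBroker instance:

import time

def may_reclaim(broker, reclaim_age):
    # Illustration only: restates the reclaim decision introduced above.
    # 1. old enough: delete_timestamp is more than reclaim_age in the past
    #    and is newer than put_timestamp (i.e. the last operation was a delete)
    old_enough = broker.is_old_enough_to_reclaim(time.time(), reclaim_age)
    # 2. empty enough: a root that still holds shard ranges, or is mid
    #    SHARDING, must finish shrinking before its DB may go away
    empty_enough = broker.is_empty_enough_to_reclaim()
    return old_enough and empty_enough  # same as broker.is_reclaimable(now, reclaim_age)
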
@@ -834,6 +834,11 @@ class ContainerSharder(ContainerReplicator):
 
     def _audit_container(self, broker):
         if broker.is_deleted():
+            if broker.is_old_enough_to_reclaim(time.time(), self.reclaim_age) \
+                    and not broker.is_empty_enough_to_reclaim():
+                self.logger.warning(
+                    'Reclaimable db stuck waiting for shrinking: %s (%s)',
+                    broker.db_file, quote(broker.path))
             # if the container has been marked as deleted, all metadata will
             # have been erased so no point auditing. But we want it to pass, in
             # case any objects exist inside it.
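
As illustration only, the warning added above would render roughly like the following line; the db file and container path shown here are hypothetical:

    Reclaimable db stuck waiting for shrinking: /srv/node/sdb1/containers/123/abc/<hash>/<hash>.db (a/c)
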
@@ -29,7 +29,8 @@ from swift.common.utils import ShardRange, parse_db_filename, get_db_files, \
     quorum_size, config_true_value, Timestamp, md5
 from swift.container.backend import ContainerBroker, UNSHARDED, SHARDING, \
     SHARDED
-from swift.container.sharder import CleavingContext
+from swift.container.sharder import CleavingContext, ContainerSharder
+from swift.container.replicator import ContainerReplicator
 from swiftclient import client, get_auth, ClientException
 
 from swift.proxy.controllers.base import get_cache_key
@@ -39,6 +40,7 @@ from test.probe import PROXY_BASE_URL
 from test.probe.brain import BrainSplitter
 from test.probe.common import ReplProbeTest, get_server_number, \
     wait_for_server_to_hangup
+from test.debug_logger import debug_logger
 
 
 MIN_SHARD_CONTAINER_THRESHOLD = 4
@@ -1740,6 +1742,82 @@ class TestContainerSharding(BaseTestContainerSharding):
         # container
         do_shard_then_shrink()
 
+    def test_delete_root_reclaim(self):
+        all_obj_names = self._make_object_names(self.max_shard_size)
+        self.put_objects(all_obj_names)
+        # Shard the container
+        client.post_container(self.url, self.admin_token, self.container_name,
+                              headers={'X-Container-Sharding': 'on'})
+        for n in self.brain.node_numbers:
+            self.sharders.once(
+                number=n, additional_args='--partitions=%s' % self.brain.part)
+        # sanity checks
+        for node in self.brain.nodes:
+            self.assert_container_state(node, 'sharded', 2)
+        self.assert_container_delete_fails()
+        self.assert_container_has_shard_sysmeta()
+        self.assert_container_post_ok('sharded')
+        self.assert_container_listing(all_obj_names)
+
+        # delete all objects - updates redirected to shards
+        self.delete_objects(all_obj_names)
+        self.assert_container_listing([])
+        self.assert_container_post_ok('has objects')
+        # root not yet updated with shard stats
+        self.assert_container_object_count(len(all_obj_names))
+        self.assert_container_delete_fails()
+        self.assert_container_has_shard_sysmeta()
+
+        # run sharder on shard containers to update root stats
+        shard_ranges = self.get_container_shard_ranges()
+        self.assertLengthEqual(shard_ranges, 2)
+        self.run_sharders(shard_ranges)
+        self.assert_container_listing([])
+        self.assert_container_post_ok('empty')
+        self.assert_container_object_count(0)
+
+        # and now we can delete it!
+        client.delete_container(self.url, self.token, self.container_name)
+        self.assert_container_post_fails('deleted')
+        self.assert_container_not_found()
+
+        # see if it will reclaim
+        Manager(['container-updater']).once()
+        for conf_file in self.configs['container-replicator'].values():
+            conf = utils.readconf(conf_file, 'container-replicator')
+            conf['reclaim_age'] = 0
+            ContainerReplicator(conf).run_once()
+
+        logger = debug_logger('probe')
+
+        # not sure why this doesn't work like replicators?
+        self.assertFalse(self.configs['container-sharder'].values())
+        sharder_conf_files = []
+        for server in Manager(['container-sharder']):
+            sharder_conf_files.extend(server.conf_files())
+        # we don't expect warnings from sharder root audits
+        for conf_file in sharder_conf_files:
+            conf = utils.readconf(conf_file, 'container-sharder')
+            ContainerSharder(conf, logger=logger).run_once()
+        self.assertEqual([], logger.get_lines_for_level('warning'))
+
+        # until the root wants to start reclaiming but we haven't shrunk yet!
+        found_warning = False
+        for conf_file in sharder_conf_files:
+            conf = utils.readconf(conf_file, 'container-sharder')
+            logger = debug_logger('probe')
+            conf['reclaim_age'] = 0
+            ContainerSharder(conf, logger=logger).run_once()
+            warnings = logger.get_lines_for_level('warning')
+            if warnings:
+                self.assertTrue(warnings[0].startswith(
+                    'Reclaimable db stuck waiting for shrinking'))
+                self.assertEqual(1, len(warnings))
+                found_warning = True
+        self.assertTrue(found_warning)
+
+        # TODO: shrink empty shards and assert everything reclaims
+
     def _setup_replication_scenario(self, num_shards, extra_objs=('alpha',)):
         # Get cluster to state where 2 replicas are sharding or sharded but 3rd
         # replica is unsharded and has an object that the first 2 are missing.
@@ -130,6 +130,92 @@ class TestContainerBroker(unittest.TestCase):
                 pass
         self.assertTrue(broker.conn is None)
 
+    @with_tempdir
+    @mock.patch("swift.container.backend.ContainerBroker.get")
+    def test_is_old_enough_to_reclaim(self, tempdir, mocked_get):
+        db_path = os.path.join(
+            tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
+        broker = ContainerBroker(db_path, account='a', container='c')
+        broker.initialize(next(self.ts).internal, 0)
+
+        def do_test(now, reclaim_age, put_ts, delete_ts, expected):
+            mocked_get.return_value.\
+                __enter__.return_value.\
+                execute.return_value.\
+                fetchone.return_value = dict(delete_timestamp=delete_ts,
+                                             put_timestamp=put_ts)
+
+            self.assertEqual(expected,
+                             broker.is_old_enough_to_reclaim(now, reclaim_age))
+
+        now_time = time()
+        tests = (
+            # (now, reclaim_age, put_ts, del_ts, expected),
+            (0, 0, 0, 0, False),
+            # Never deleted
+            (now_time, 100, now_time - 200, 0, False),
+            # Deleted ts older the put_ts
+            (now_time, 100, now_time - 150, now_time - 200, False),
+            # not reclaim_age yet
+            (now_time, 100, now_time - 150, now_time - 50, False),
+            # right on reclaim doesn't work
+            (now_time, 100, now_time - 150, now_time - 100, False),
+            # put_ts wins over del_ts
+            (now_time, 100, now_time - 150, now_time - 150, False),
+            # good case, reclaim > delete_ts > put_ts
+            (now_time, 100, now_time - 150, now_time - 125, True))
+        for test in tests:
+            do_test(*test)
+
+    @with_tempdir
+    def test_is_reclaimable(self, tempdir):
+        db_path = os.path.join(
+            tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
+        broker = ContainerBroker(db_path, account='a', container='c')
+        broker.initialize(next(self.ts).internal, 0)
+
+        self.assertFalse(broker.is_reclaimable(float(next(self.ts)), 0))
+        broker.delete_db(next(self.ts).internal)
+        self.assertFalse(broker.is_reclaimable(float(next(self.ts)), 604800))
+        self.assertTrue(broker.is_reclaimable(float(next(self.ts)), 0))
+
+        # adding a shard range makes us unreclaimable
+        sr = ShardRange('.shards_a/shard_c', next(self.ts), object_count=0)
+        broker.merge_shard_ranges([sr])
+        self.assertFalse(broker.is_reclaimable(float(next(self.ts)), 0))
+        # ... but still "deleted"
+        self.assertTrue(broker.is_deleted())
+        # ... until the shard range is deleted
+        sr.set_deleted(next(self.ts))
+        broker.merge_shard_ranges([sr])
+        self.assertTrue(broker.is_reclaimable(float(next(self.ts)), 0))
+
+        # adding an object makes us unreclaimable
+        obj = {'name': 'o', 'created_at': next(self.ts).internal,
+               'size': 0, 'content_type': 'text/plain', 'etag': EMPTY_ETAG,
+               'deleted': 0}
+        broker.merge_items([dict(obj)])
+        self.assertFalse(broker.is_reclaimable(float(next(self.ts)), 0))
+        # ... and "not deleted"
+        self.assertFalse(broker.is_deleted())
+
+    @with_tempdir
+    def test_sharding_state_is_not_reclaimable(self, tempdir):
+        db_path = os.path.join(
+            tempdir, 'containers', 'part', 'suffix', 'hash', 'container.db')
+        broker = ContainerBroker(db_path, account='a', container='c')
+        broker.initialize(next(self.ts).internal, 0)
+        broker.enable_sharding(next(self.ts))
+        broker.set_sharding_state()
+        broker.delete_db(next(self.ts).internal)
+        self.assertTrue(broker.is_deleted())
+        # we won't reclaim in SHARDING state
+        self.assertEqual(SHARDING, broker.get_db_state())
+        self.assertFalse(broker.is_reclaimable(float(next(self.ts)), 0))
+        # ... but if we find one stuck like this it's easy enough to fix
+        broker.set_sharded_state()
+        self.assertTrue(broker.is_reclaimable(float(next(self.ts)), 0))
+
     @with_tempdir
     def test_is_deleted(self, tempdir):
         # Test ContainerBroker.is_deleted() and get_info_is_deleted()
@@ -4699,6 +4699,46 @@ class TestSharder(BaseTestSharder):
         self.assert_no_audit_messages(sharder, mock_swift)
         self.assertTrue(broker.is_deleted())
 
+    def test_audit_deleted_root_container(self):
+        broker = self._make_broker()
+        shard_bounds = (
+            ('a', 'b'), ('b', 'c'), ('c', 'd'), ('d', 'e'), ('e', 'f'))
+        shard_ranges = self._make_shard_ranges(shard_bounds, ShardRange.ACTIVE)
+        broker.merge_shard_ranges(shard_ranges)
+        with self._mock_sharder() as sharder:
+            sharder._audit_container(broker)
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+
+        # delete it
+        delete_ts = next(self.ts_iter)
+        broker.delete_db(delete_ts.internal)
+        with self._mock_sharder() as sharder:
+            sharder._audit_container(broker)
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+
+        # advance time
+        with mock.patch('swift.container.sharder.time.time') as fake_time, \
+                self._mock_sharder() as sharder:
+            fake_time.return_value = 6048000 + float(delete_ts)
+            sharder._audit_container(broker)
+        message = 'Reclaimable db stuck waiting for shrinking: %s (%s)' % (
+            broker.db_file, broker.path)
+        self.assertEqual([message], self.logger.get_lines_for_level('warning'))
+
+        # delete all shard ranges
+        for sr in shard_ranges:
+            sr.update_state(ShardRange.SHRUNK, Timestamp.now())
+            sr.deleted = True
+            sr.timestamp = Timestamp.now()
+        broker.merge_shard_ranges(shard_ranges)
+
+        # no more warning
+        with mock.patch('swift.container.sharder.time.time') as fake_time, \
+                self._mock_sharder() as sharder:
+            fake_time.return_value = 6048000 + float(delete_ts)
+            sharder._audit_container(broker)
+        self.assertEqual([], self.logger.get_lines_for_level('warning'))
+
     def test_audit_old_style_shard_container(self):
         self._do_test_audit_shard_container('Root', 'a/c')