diff --git a/swift/container/sharder.py b/swift/container/sharder.py index c87912e10f..3912fb35d6 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -197,7 +197,8 @@ def find_shrinking_candidates(broker, shrink_threshold, merge_size): class CleavingContext(object): def __init__(self, ref, cursor='', max_row=None, cleave_to_row=None, last_cleave_to_row=None, cleaving_done=False, - misplaced_done=False, ranges_done=0, ranges_todo=0): + misplaced_done=False, ranges_done=0, ranges_todo=0, + last_modified=None): self.ref = ref self._cursor = None self.cursor = cursor @@ -208,6 +209,7 @@ class CleavingContext(object): self.misplaced_done = misplaced_done self.ranges_done = ranges_done self.ranges_todo = ranges_todo + self.last_modified = last_modified def __iter__(self): yield 'ref', self.ref @@ -219,6 +221,7 @@ class CleavingContext(object): yield 'misplaced_done', self.misplaced_done yield 'ranges_done', self.ranges_done yield 'ranges_todo', self.ranges_todo + yield 'last_modified', self.last_modified def _encode(cls, value): if value is not None and six.PY2 and isinstance(value, six.text_type): @@ -241,6 +244,26 @@ class CleavingContext(object): def _make_ref(cls, broker): return broker.get_info()['id'] + @classmethod + def load_all(cls, broker): + """ + Returns all cleaving contexts stored in the broker. + + :param broker: + :return: list of CleavingContexts + """ + brokers = broker.get_brokers() + sysmeta = brokers[-1].get_sharding_sysmeta() + + for key, val in sysmeta.items(): + # If the value is of length 0, then the metadata is + # marked for deletion + if key.startswith("Context-") and len(val) > 0: + try: + yield cls(**json.loads(val)) + except ValueError: + continue + @classmethod def load(cls, broker): """ @@ -265,6 +288,7 @@ class CleavingContext(object): return cls(**data) def store(self, broker): + self.last_modified = Timestamp.now().internal broker.set_sharding_sysmeta('Context-' + self.ref, json.dumps(dict(self))) @@ -287,6 +311,11 @@ class CleavingContext(object): return all((self.misplaced_done, self.cleaving_done, self.max_row == self.cleave_to_row)) + def delete(self, broker): + # These will get reclaimed when `_reclaim_metadata` in + # common/db.py is called. + broker.set_sharding_sysmeta('Context-' + self.ref, '') + DEFAULT_SHARD_CONTAINER_THRESHOLD = 1000000 DEFAULT_SHARD_SHRINK_POINT = 25 @@ -724,12 +753,23 @@ class ContainerSharder(ContainerReplicator): self._increment_stat('audit_shard', 'success', statsd=True) return True + def _audit_cleave_contexts(self, broker): + for context in CleavingContext.load_all(broker): + now = Timestamp.now() + last_mod = context.last_modified + if not last_mod: + context.store(broker) + elif Timestamp(last_mod).timestamp + self.reclaim_age < \ + now.timestamp: + context.delete(broker) + def _audit_container(self, broker): if broker.is_deleted(): # if the container has been marked as deleted, all metadata will # have been erased so no point auditing. But we want it to pass, in # case any objects exist inside it. return True + self._audit_cleave_contexts(broker) if broker.is_root_container(): return self._audit_root_container(broker) return self._audit_shard_container(broker) @@ -1307,6 +1347,7 @@ class ContainerSharder(ContainerReplicator): modified_shard_ranges.append(own_shard_range) broker.merge_shard_ranges(modified_shard_ranges) if broker.set_sharded_state(): + cleaving_context.delete(broker) return True else: self.logger.warning( diff --git a/test/unit/cli/test_manage_shard_ranges.py b/test/unit/cli/test_manage_shard_ranges.py index 11f6420e4c..4fb2245397 100644 --- a/test/unit/cli/test_manage_shard_ranges.py +++ b/test/unit/cli/test_manage_shard_ranges.py @@ -201,6 +201,7 @@ class TestManageShardRanges(unittest.TestCase): ' "cleaving_done": false,', ' "cursor": "",', ' "last_cleave_to_row": null,', + ' "last_modified": null,', ' "max_row": -1,', ' "misplaced_done": false,', ' "ranges_done": 0,', diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 1f499c4151..a563671802 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -21,6 +21,7 @@ import os import shutil from contextlib import contextmanager from tempfile import mkdtemp +from uuid import uuid4 import mock import unittest @@ -4486,6 +4487,83 @@ class TestSharder(BaseTestSharder): set((call[0][0].path, call[0][1]['id'], call[0][2]) for call in mock_process_broker.call_args_list)) + def test_audit_cleave_contexts(self): + + def add_cleave_context(id, last_modified): + params = {'ref': id, + 'cursor': 'curs', + 'max_row': 2, + 'cleave_to_row': 2, + 'last_cleave_to_row': 1, + 'cleaving_done': False, + 'misplaced_done': True, + 'ranges_done': 2, + 'ranges_todo': 4} + if last_modified is not None: + params['last_modified'] = last_modified + key = 'X-Container-Sysmeta-Shard-Context-%s' % id + with mock_timestamp_now(Timestamp(0)): + broker.update_metadata( + {key: (json.dumps(params), Timestamp.now().internal)}) + + def get_context(id, broker): + data = broker.get_sharding_sysmeta().get('Context-%s' % id) + if data: + return CleavingContext(**json.loads(data)) + return data + + reclaim_age = 100 + broker = self._make_broker() + + # sanity check + self.assertIsNone(broker.get_own_shard_range(no_default=True)) + self.assertEqual(UNSHARDED, broker.get_db_state()) + + # Setup some cleaving contexts + id_missing_lm, id_old, id_newish = [str(uuid4()) for _ in range(3)] + contexts = ((id_missing_lm, None), + (id_old, 1), + (id_newish, reclaim_age // 2)) + for id, last_modified in contexts: + add_cleave_context(id, last_modified) + + with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder: + with mock_timestamp_now(Timestamp(reclaim_age + 2)): + sharder._audit_cleave_contexts(broker) + + # now is reclaim_age + 1, so the old should've been removed and the + # context with the missing last modified should now be reclaim + 1 + missing_lm_ctx = get_context(id_missing_lm, broker) + self.assertEqual(missing_lm_ctx.last_modified, + Timestamp(reclaim_age + 2).internal) + + old_ctx = get_context(id_old, broker) + self.assertEqual(old_ctx, "") + + newish_ctx = get_context(id_newish, broker) + self.assertEqual(newish_ctx.ref, id_newish) + + # If we push time another reclaim age later, and they all be removed + # minus id_missing_lm as it has a later last_modified. + with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder: + with mock_timestamp_now(Timestamp(reclaim_age * 2)): + sharder._audit_cleave_contexts(broker) + + missing_lm_ctx = get_context(id_missing_lm, broker) + self.assertEqual(missing_lm_ctx.last_modified, + Timestamp(reclaim_age + 2).internal) + + newish_ctx = get_context(id_newish, broker) + self.assertEqual(newish_ctx, "") + + # Fast forward again and they're all cleaned up + with self._mock_sharder({'reclaim_age': str(reclaim_age)}) as sharder: + with mock_timestamp_now(Timestamp(reclaim_age * 3)): + sharder._audit_cleave_contexts(broker) + + missing_lm_ctx = get_context(id_missing_lm, broker) + self.assertEqual(missing_lm_ctx, "") + class TestCleavingContext(BaseTestSharder): def test_init(self): @@ -4505,6 +4583,7 @@ class TestCleavingContext(BaseTestSharder): 'max_row': 12, 'cleave_to_row': 11, 'last_cleave_to_row': 10, + 'last_modified': None, 'cleaving_done': False, 'misplaced_done': True, 'ranges_done': 0, @@ -4524,6 +4603,7 @@ class TestCleavingContext(BaseTestSharder): 'max_row': 12, 'cleave_to_row': 11, 'last_cleave_to_row': 10, + 'last_modified': None, 'cleaving_done': False, 'misplaced_done': True, 'ranges_done': 0, @@ -4571,11 +4651,102 @@ class TestCleavingContext(BaseTestSharder): self.assertEqual(2, ctx.ranges_done) self.assertEqual(4, ctx.ranges_todo) + def test_load_all(self): + broker = self._make_broker() + last_ctx = None + + db_ids = [str(uuid4()) for _ in range(6)] + for db_id in db_ids: + params = {'ref': db_id, + 'cursor': 'curs', + 'max_row': 2, + 'cleave_to_row': 2, + 'last_cleave_to_row': 1, + 'cleaving_done': False, + 'misplaced_done': True, + 'ranges_done': 2, + 'ranges_todo': 4} + key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id + broker.update_metadata( + {key: (json.dumps(params), Timestamp.now().internal)}) + for ctx in CleavingContext.load_all(broker): + last_ctx = ctx + self.assertIn(ctx.ref, db_ids) + + # If a context is deleted (metadata is "") then it's skipped + last_ctx.delete(broker) + db_ids.remove(last_ctx.ref) + + for ctx in CleavingContext.load_all(broker): + self.assertIn(ctx.ref, db_ids) + + def test_delete(self): + broker = self._make_broker() + + db_id = broker.get_info()['id'] + params = {'ref': db_id, + 'cursor': 'curs', + 'max_row': 2, + 'cleave_to_row': 2, + 'last_cleave_to_row': 1, + 'cleaving_done': False, + 'misplaced_done': True, + 'ranges_done': 2, + 'ranges_todo': 4} + key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id + broker.update_metadata( + {key: (json.dumps(params), Timestamp.now().internal)}) + ctx = CleavingContext.load(broker) + self.assertEqual(db_id, ctx.ref) + + # Now let's delete it. When deleted the metadata key will exist, but + # the value will be "" as this means it'll be reaped later. + ctx.delete(broker) + sysmeta = broker.get_sharding_sysmeta() + for key, val in sysmeta.items(): + if key == "Context-%s" % db_id: + self.assertEqual(val, "") + break + else: + self.fail("Deleted context 'Context-%s' not found") + + def test_last_modified(self): + broker = self._make_broker() + + db_id = broker.get_info()['id'] + params = {'ref': db_id, + 'cursor': 'curs', + 'max_row': 2, + 'cleave_to_row': 2, + 'last_cleave_to_row': 1, + 'cleaving_done': False, + 'misplaced_done': True, + 'ranges_done': 2, + 'ranges_todo': 4} + key = 'X-Container-Sysmeta-Shard-Context-%s' % db_id + broker.update_metadata( + {key: (json.dumps(params), Timestamp.now().internal)}) + ctx = CleavingContext.load(broker) + self.assertIsNone(ctx.last_modified) + + # after a store/save the last_modified will be updated + ctx.store(broker) + ctx = CleavingContext.load(broker) + self.assertIsNotNone(ctx.last_modified) + last_modified = ctx.last_modified + + # Store again it'll be updated again + ctx.store(broker) + ctx = CleavingContext.load(broker) + self.assertGreater(ctx.last_modified, last_modified) + def test_store(self): broker = self._make_sharding_broker() old_db_id = broker.get_brokers()[0].get_info()['id'] + last_mod = Timestamp.now() ctx = CleavingContext(old_db_id, 'curs', 12, 11, 2, True, True, 2, 4) - ctx.store(broker) + with mock_timestamp_now(last_mod): + ctx.store(broker) key = 'X-Container-Sysmeta-Shard-Context-%s' % old_db_id data = json.loads(broker.metadata[key][0]) expected = {'ref': old_db_id, @@ -4583,6 +4754,7 @@ class TestCleavingContext(BaseTestSharder): 'max_row': 12, 'cleave_to_row': 11, 'last_cleave_to_row': 2, + 'last_modified': last_mod.internal, 'cleaving_done': True, 'misplaced_done': True, 'ranges_done': 2,