diff --git a/swift/common/utils.py b/swift/common/utils.py index 3014bc6176..23a137e6ca 100644 --- a/swift/common/utils.py +++ b/swift/common/utils.py @@ -4869,6 +4869,8 @@ class ShardRange(object): value. :param epoch: optional epoch timestamp which represents the time at which sharding was enabled for a container. + :param reported: optional indicator that this shard and its stats have + been reported to the root container. """ FOUND = 10 CREATED = 20 @@ -4919,7 +4921,8 @@ class ShardRange(object): def __init__(self, name, timestamp, lower=MIN, upper=MAX, object_count=0, bytes_used=0, meta_timestamp=None, - deleted=False, state=None, state_timestamp=None, epoch=None): + deleted=False, state=None, state_timestamp=None, epoch=None, + reported=False): self.account = self.container = self._timestamp = \ self._meta_timestamp = self._state_timestamp = self._epoch = None self._lower = ShardRange.MIN @@ -4938,6 +4941,7 @@ class ShardRange(object): self.state = self.FOUND if state is None else state self.state_timestamp = state_timestamp self.epoch = epoch + self.reported = reported @classmethod def _encode(cls, value): @@ -5118,8 +5122,14 @@ class ShardRange(object): cast to an int, or if meta_timestamp is neither None nor can be cast to a :class:`~swift.common.utils.Timestamp`. 
""" - self.object_count = int(object_count) - self.bytes_used = int(bytes_used) + if self.object_count != int(object_count): + self.object_count = int(object_count) + self.reported = False + + if self.bytes_used != int(bytes_used): + self.bytes_used = int(bytes_used) + self.reported = False + if meta_timestamp is None: self.meta_timestamp = Timestamp.now() else: @@ -5200,6 +5210,14 @@ class ShardRange(object): def epoch(self, epoch): self._epoch = self._to_timestamp(epoch) + @property + def reported(self): + return self._reported + + @reported.setter + def reported(self, value): + self._reported = bool(value) + def update_state(self, state, state_timestamp=None): """ Set state to the given value and optionally update the state_timestamp @@ -5216,6 +5234,7 @@ class ShardRange(object): self.state = state if state_timestamp is not None: self.state_timestamp = state_timestamp + self.reported = False return True @property @@ -5338,6 +5357,7 @@ class ShardRange(object): yield 'state', self.state yield 'state_timestamp', self.state_timestamp.internal yield 'epoch', self.epoch.internal if self.epoch is not None else None + yield 'reported', 1 if self.reported else 0 def copy(self, timestamp=None, **kwargs): """ @@ -5369,7 +5389,8 @@ class ShardRange(object): params['name'], params['timestamp'], params['lower'], params['upper'], params['object_count'], params['bytes_used'], params['meta_timestamp'], params['deleted'], params['state'], - params['state_timestamp'], params['epoch']) + params['state_timestamp'], params['epoch'], + params.get('reported', 0)) def find_shard_range(item, ranges): diff --git a/swift/container/backend.py b/swift/container/backend.py index 35a8081bca..bdf34f7d88 100644 --- a/swift/container/backend.py +++ b/swift/container/backend.py @@ -60,7 +60,7 @@ SHARD_UPDATE_STATES = [ShardRange.CREATED, ShardRange.CLEAVED, # tuples and vice-versa SHARD_RANGE_KEYS = ('name', 'timestamp', 'lower', 'upper', 'object_count', 'bytes_used', 'meta_timestamp', 
'deleted', 'state', - 'state_timestamp', 'epoch') + 'state_timestamp', 'epoch', 'reported') POLICY_STAT_TABLE_CREATE = ''' CREATE TABLE policy_stat ( @@ -267,6 +267,7 @@ def merge_shards(shard_data, existing): if existing['timestamp'] < shard_data['timestamp']: # note that currently we do not roll forward any meta or state from # an item that was created at older time, newer created time trumps + shard_data['reported'] = 0 # reset the latch return True elif existing['timestamp'] > shard_data['timestamp']: return False @@ -283,6 +284,18 @@ def merge_shards(shard_data, existing): else: new_content = True + # We can latch the reported flag + if existing['reported'] and \ + existing['object_count'] == shard_data['object_count'] and \ + existing['bytes_used'] == shard_data['bytes_used'] and \ + existing['state'] == shard_data['state'] and \ + existing['epoch'] == shard_data['epoch']: + shard_data['reported'] = 1 + else: + shard_data.setdefault('reported', 0) + if shard_data['reported'] and not existing['reported']: + new_content = True + if (existing['state_timestamp'] == shard_data['state_timestamp'] and shard_data['state'] > existing['state']): new_content = True @@ -595,7 +608,8 @@ class ContainerBroker(DatabaseBroker): deleted INTEGER DEFAULT 0, state INTEGER, state_timestamp TEXT, - epoch TEXT + epoch TEXT, + reported INTEGER DEFAULT 0 ); """ % SHARD_RANGE_TABLE) @@ -1428,10 +1442,13 @@ class ContainerBroker(DatabaseBroker): # sqlite3.OperationalError: cannot start a transaction # within a transaction conn.rollback() - if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err): - raise - self.create_shard_range_table(conn) - return _really_merge_items(conn) + if 'no such column: reported' in str(err): + self._migrate_add_shard_range_reported(conn) + return _really_merge_items(conn) + if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err): + self.create_shard_range_table(conn) + return _really_merge_items(conn) + raise def get_reconciler_sync(self): with 
self.get() as conn: @@ -1579,6 +1596,17 @@ class ContainerBroker(DatabaseBroker): CONTAINER_STAT_VIEW_SCRIPT + 'COMMIT;') + def _migrate_add_shard_range_reported(self, conn): + """ + Add the reported column to the 'shard_range' table. + """ + conn.executescript(''' + BEGIN; + ALTER TABLE %s + ADD COLUMN reported INTEGER DEFAULT 0; + COMMIT; + ''' % SHARD_RANGE_TABLE) + def _reclaim_other_stuff(self, conn, age_timestamp, sync_timestamp): super(ContainerBroker, self)._reclaim_other_stuff( conn, age_timestamp, sync_timestamp) @@ -1628,7 +1656,7 @@ class ContainerBroker(DatabaseBroker): elif states is not None: included_states.add(states) - def do_query(conn): + def do_query(conn, use_reported_column=True): condition = '' conditions = [] params = [] @@ -1646,21 +1674,27 @@ class ContainerBroker(DatabaseBroker): params.append(self.path) if conditions: condition = ' WHERE ' + ' AND '.join(conditions) + if use_reported_column: + columns = SHARD_RANGE_KEYS + else: + columns = SHARD_RANGE_KEYS[:-1] + ('0 as reported', ) sql = ''' SELECT %s FROM %s%s; - ''' % (', '.join(SHARD_RANGE_KEYS), SHARD_RANGE_TABLE, condition) + ''' % (', '.join(columns), SHARD_RANGE_TABLE, condition) data = conn.execute(sql, params) data.row_factory = None return [row for row in data] - try: - with self.maybe_get(connection) as conn: + with self.maybe_get(connection) as conn: + try: return do_query(conn) - except sqlite3.OperationalError as err: - if ('no such table: %s' % SHARD_RANGE_TABLE) not in str(err): + except sqlite3.OperationalError as err: + if ('no such table: %s' % SHARD_RANGE_TABLE) in str(err): + return [] + if 'no such column: reported' in str(err): + return do_query(conn, use_reported_column=False) raise - return [] @classmethod def resolve_shard_range_states(cls, states): diff --git a/swift/container/sharder.py b/swift/container/sharder.py index d0652143e8..dd33043ae4 100644 --- a/swift/container/sharder.py +++ b/swift/container/sharder.py @@ -618,7 +618,8 @@ class 
ContainerSharder(ContainerReplicator): def _send_shard_ranges(self, account, container, shard_ranges, headers=None): - body = json.dumps([dict(sr) for sr in shard_ranges]).encode('ascii') + body = json.dumps([dict(sr, reported=0) + for sr in shard_ranges]).encode('ascii') part, nodes = self.ring.get_nodes(account, container) headers = headers or {} headers.update({'X-Backend-Record-Type': RECORD_TYPE_SHARD, @@ -1469,7 +1470,7 @@ class ContainerSharder(ContainerReplicator): def _update_root_container(self, broker): own_shard_range = broker.get_own_shard_range(no_default=True) - if not own_shard_range: + if not own_shard_range or own_shard_range.reported: return # persist the reported shard metadata @@ -1479,9 +1480,12 @@ class ContainerSharder(ContainerReplicator): include_own=True, include_deleted=True) # send everything - self._send_shard_ranges( - broker.root_account, broker.root_container, - shard_ranges) + if self._send_shard_ranges( + broker.root_account, broker.root_container, shard_ranges): + # on success, mark ourselves as reported so we don't keep + # hammering the root + own_shard_range.reported = True + broker.merge_shard_ranges(own_shard_range) def _process_broker(self, broker, node, part): broker.get_info() # make sure account/container are populated diff --git a/test/unit/cli/test_manage_shard_ranges.py b/test/unit/cli/test_manage_shard_ranges.py index 65bcd0dd61..7f0aa8857d 100644 --- a/test/unit/cli/test_manage_shard_ranges.py +++ b/test/unit/cli/test_manage_shard_ranges.py @@ -189,6 +189,7 @@ class TestManageShardRanges(unittest.TestCase): ' "meta_timestamp": "%s",' % now.internal, ' "name": "a/c",', ' "object_count": 0,', + ' "reported": 0,', ' "state": "sharding",', ' "state_timestamp": "%s",' % now.internal, ' "timestamp": "%s",' % now.internal, @@ -230,6 +231,7 @@ class TestManageShardRanges(unittest.TestCase): ' "meta_timestamp": "%s",' % now.internal, ' "name": "a/c",', ' "object_count": 0,', + ' "reported": 0,', ' "state": "sharding",', ' 
"state_timestamp": "%s",' % now.internal, ' "timestamp": "%s",' % now.internal, diff --git a/test/unit/common/test_utils.py b/test/unit/common/test_utils.py index fa5a057fc9..26f70656b0 100644 --- a/test/unit/common/test_utils.py +++ b/test/unit/common/test_utils.py @@ -7354,7 +7354,8 @@ class TestShardRange(unittest.TestCase): upper='', object_count=0, bytes_used=0, meta_timestamp=ts_1.internal, deleted=0, state=utils.ShardRange.FOUND, - state_timestamp=ts_1.internal, epoch=None) + state_timestamp=ts_1.internal, epoch=None, + reported=0) assert_initialisation_ok(dict(empty_run, name='a/c', timestamp=ts_1), expect) assert_initialisation_ok(dict(name='a/c', timestamp=ts_1), expect) @@ -7363,11 +7364,13 @@ class TestShardRange(unittest.TestCase): upper='u', object_count=2, bytes_used=10, meta_timestamp=ts_2, deleted=0, state=utils.ShardRange.CREATED, - state_timestamp=ts_3.internal, epoch=ts_4) + state_timestamp=ts_3.internal, epoch=ts_4, + reported=0) expect.update({'lower': 'l', 'upper': 'u', 'object_count': 2, 'bytes_used': 10, 'meta_timestamp': ts_2.internal, 'state': utils.ShardRange.CREATED, - 'state_timestamp': ts_3.internal, 'epoch': ts_4}) + 'state_timestamp': ts_3.internal, 'epoch': ts_4, + 'reported': 0}) assert_initialisation_ok(good_run.copy(), expect) # obj count and bytes used as int strings @@ -7385,6 +7388,11 @@ class TestShardRange(unittest.TestCase): assert_initialisation_ok(good_deleted, dict(expect, deleted=1)) + good_reported = good_run.copy() + good_reported['reported'] = 1 + assert_initialisation_ok(good_reported, + dict(expect, reported=1)) + assert_initialisation_fails(dict(good_run, timestamp='water balloon')) assert_initialisation_fails( @@ -7423,7 +7431,7 @@ class TestShardRange(unittest.TestCase): 'upper': upper, 'object_count': 10, 'bytes_used': 100, 'meta_timestamp': ts_2.internal, 'deleted': 0, 'state': utils.ShardRange.FOUND, 'state_timestamp': ts_3.internal, - 'epoch': ts_4} + 'epoch': ts_4, 'reported': 0} self.assertEqual(expected, 
sr_dict) self.assertIsInstance(sr_dict['lower'], six.string_types) self.assertIsInstance(sr_dict['upper'], six.string_types) @@ -7438,6 +7446,14 @@ class TestShardRange(unittest.TestCase): for key in sr_dict: bad_dict = dict(sr_dict) bad_dict.pop(key) + if key == 'reported': + # This was added after the fact, and we need to be able to eat + # data from old servers + utils.ShardRange.from_dict(bad_dict) + utils.ShardRange(**bad_dict) + continue + + # The rest were present from the beginning with self.assertRaises(KeyError): utils.ShardRange.from_dict(bad_dict) # But __init__ still (generally) works! diff --git a/test/unit/container/test_backend.py b/test/unit/container/test_backend.py index 95d09a21f7..37308c154d 100644 --- a/test/unit/container/test_backend.py +++ b/test/unit/container/test_backend.py @@ -735,10 +735,12 @@ class TestContainerBroker(unittest.TestCase): self.assertEqual(info['put_timestamp'], start.internal) self.assertTrue(Timestamp(info['created_at']) >= start) self.assertEqual(info['delete_timestamp'], '0') - if self.__class__ in (TestContainerBrokerBeforeMetadata, - TestContainerBrokerBeforeXSync, - TestContainerBrokerBeforeSPI, - TestContainerBrokerBeforeShardRanges): + if self.__class__ in ( + TestContainerBrokerBeforeMetadata, + TestContainerBrokerBeforeXSync, + TestContainerBrokerBeforeSPI, + TestContainerBrokerBeforeShardRanges, + TestContainerBrokerBeforeShardRangeReportedColumn): self.assertEqual(info['status_changed_at'], '0') else: self.assertEqual(info['status_changed_at'], @@ -1025,6 +1027,8 @@ class TestContainerBroker(unittest.TestCase): "SELECT object_count FROM shard_range").fetchone()[0], 0) self.assertEqual(conn.execute( "SELECT bytes_used FROM shard_range").fetchone()[0], 0) + self.assertEqual(conn.execute( + "SELECT reported FROM shard_range").fetchone()[0], 0) # Reput same event broker.merge_shard_ranges( @@ -1050,6 +1054,64 @@ class TestContainerBroker(unittest.TestCase): "SELECT object_count FROM shard_range").fetchone()[0], 
0) self.assertEqual(conn.execute( "SELECT bytes_used FROM shard_range").fetchone()[0], 0) + self.assertEqual(conn.execute( + "SELECT reported FROM shard_range").fetchone()[0], 0) + + # Mark it as reported + broker.merge_shard_ranges( + ShardRange('"a/{}"', timestamp, + 'low', 'up', meta_timestamp=meta_timestamp, + reported=True)) + with broker.get() as conn: + self.assertEqual(conn.execute( + "SELECT name FROM shard_range").fetchone()[0], + '"a/{}"') + self.assertEqual(conn.execute( + "SELECT timestamp FROM shard_range").fetchone()[0], + timestamp) + self.assertEqual(conn.execute( + "SELECT meta_timestamp FROM shard_range").fetchone()[0], + meta_timestamp) + self.assertEqual(conn.execute( + "SELECT lower FROM shard_range").fetchone()[0], 'low') + self.assertEqual(conn.execute( + "SELECT upper FROM shard_range").fetchone()[0], 'up') + self.assertEqual(conn.execute( + "SELECT deleted FROM shard_range").fetchone()[0], 0) + self.assertEqual(conn.execute( + "SELECT object_count FROM shard_range").fetchone()[0], 0) + self.assertEqual(conn.execute( + "SELECT bytes_used FROM shard_range").fetchone()[0], 0) + self.assertEqual(conn.execute( + "SELECT reported FROM shard_range").fetchone()[0], 1) + + # Reporting latches it + broker.merge_shard_ranges( + ShardRange('"a/{}"', timestamp, + 'low', 'up', meta_timestamp=meta_timestamp, + reported=False)) + with broker.get() as conn: + self.assertEqual(conn.execute( + "SELECT name FROM shard_range").fetchone()[0], + '"a/{}"') + self.assertEqual(conn.execute( + "SELECT timestamp FROM shard_range").fetchone()[0], + timestamp) + self.assertEqual(conn.execute( + "SELECT meta_timestamp FROM shard_range").fetchone()[0], + meta_timestamp) + self.assertEqual(conn.execute( + "SELECT lower FROM shard_range").fetchone()[0], 'low') + self.assertEqual(conn.execute( + "SELECT upper FROM shard_range").fetchone()[0], 'up') + self.assertEqual(conn.execute( + "SELECT deleted FROM shard_range").fetchone()[0], 0) + self.assertEqual(conn.execute( + 
"SELECT object_count FROM shard_range").fetchone()[0], 0) + self.assertEqual(conn.execute( + "SELECT bytes_used FROM shard_range").fetchone()[0], 0) + self.assertEqual(conn.execute( + "SELECT reported FROM shard_range").fetchone()[0], 1) # Put new event timestamp = next(self.ts).internal @@ -1077,11 +1139,14 @@ class TestContainerBroker(unittest.TestCase): "SELECT object_count FROM shard_range").fetchone()[0], 1) self.assertEqual(conn.execute( "SELECT bytes_used FROM shard_range").fetchone()[0], 2) + self.assertEqual(conn.execute( + "SELECT reported FROM shard_range").fetchone()[0], 0) # Put old event broker.merge_shard_ranges( ShardRange('"a/{}"', old_put_timestamp, - 'lower', 'upper', 1, 2, meta_timestamp=meta_timestamp)) + 'lower', 'upper', 1, 2, meta_timestamp=meta_timestamp, + reported=True)) with broker.get() as conn: self.assertEqual(conn.execute( "SELECT name FROM shard_range").fetchone()[0], @@ -1102,6 +1167,8 @@ class TestContainerBroker(unittest.TestCase): "SELECT object_count FROM shard_range").fetchone()[0], 1) self.assertEqual(conn.execute( "SELECT bytes_used FROM shard_range").fetchone()[0], 2) + self.assertEqual(conn.execute( + "SELECT reported FROM shard_range").fetchone()[0], 0) # Put old delete event broker.merge_shard_ranges( @@ -1978,10 +2045,12 @@ class TestContainerBroker(unittest.TestCase): self.assertEqual(info['hash'], '00000000000000000000000000000000') self.assertEqual(info['put_timestamp'], Timestamp(1).internal) self.assertEqual(info['delete_timestamp'], '0') - if self.__class__ in (TestContainerBrokerBeforeMetadata, - TestContainerBrokerBeforeXSync, - TestContainerBrokerBeforeSPI, - TestContainerBrokerBeforeShardRanges): + if self.__class__ in ( + TestContainerBrokerBeforeMetadata, + TestContainerBrokerBeforeXSync, + TestContainerBrokerBeforeSPI, + TestContainerBrokerBeforeShardRanges, + TestContainerBrokerBeforeShardRangeReportedColumn): self.assertEqual(info['status_changed_at'], '0') else: 
self.assertEqual(info['status_changed_at'], @@ -3275,10 +3344,12 @@ class TestContainerBroker(unittest.TestCase): self.assertEqual(0, info['storage_policy_index']) # sanity check self.assertEqual(0, info['object_count']) self.assertEqual(0, info['bytes_used']) - if self.__class__ in (TestContainerBrokerBeforeMetadata, - TestContainerBrokerBeforeXSync, - TestContainerBrokerBeforeSPI, - TestContainerBrokerBeforeShardRanges): + if self.__class__ in ( + TestContainerBrokerBeforeMetadata, + TestContainerBrokerBeforeXSync, + TestContainerBrokerBeforeSPI, + TestContainerBrokerBeforeShardRanges, + TestContainerBrokerBeforeShardRangeReportedColumn): self.assertEqual(info['status_changed_at'], '0') else: self.assertEqual(timestamp.internal, info['status_changed_at']) @@ -5315,6 +5386,75 @@ class TestContainerBrokerBeforeShardRanges(ContainerBrokerMigrationMixin, FROM shard_range''') +def pre_reported_create_shard_range_table(self, conn): + """ + Copied from ContainerBroker before the + reported column was added; used for testing with + TestContainerBrokerBeforeShardRangeReportedColumn. + + Create a shard_range table with no 'reported' column. + + :param conn: DB connection object + """ + conn.execute(""" + CREATE TABLE shard_range ( + ROWID INTEGER PRIMARY KEY AUTOINCREMENT, + name TEXT, + timestamp TEXT, + lower TEXT, + upper TEXT, + object_count INTEGER DEFAULT 0, + bytes_used INTEGER DEFAULT 0, + meta_timestamp TEXT, + deleted INTEGER DEFAULT 0, + state INTEGER, + state_timestamp TEXT, + epoch TEXT + ); + """) + + conn.execute(""" + CREATE TRIGGER shard_range_update BEFORE UPDATE ON shard_range + BEGIN + SELECT RAISE(FAIL, 'UPDATE not allowed; DELETE and INSERT'); + END; + """) + + +class TestContainerBrokerBeforeShardRangeReportedColumn( + ContainerBrokerMigrationMixin, TestContainerBroker): + """ + Tests for ContainerBroker against databases created + before the 'reported' column was added to the shard_range table. 
+ """ + # *grumble grumble* This should include container_info/policy_stat :-/ + expected_db_tables = {'outgoing_sync', 'incoming_sync', 'object', + 'sqlite_sequence', 'container_stat', 'shard_range'} + + def setUp(self): + super(TestContainerBrokerBeforeShardRangeReportedColumn, + self).setUp() + ContainerBroker.create_shard_range_table = \ + pre_reported_create_shard_range_table + + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(Timestamp('1').internal, 0) + with self.assertRaises(sqlite3.DatabaseError) as raised, \ + broker.get() as conn: + conn.execute('''SELECT reported + FROM shard_range''') + self.assertIn('no such column: reported', str(raised.exception)) + + def tearDown(self): + super(TestContainerBrokerBeforeShardRangeReportedColumn, + self).tearDown() + broker = ContainerBroker(':memory:', account='a', container='c') + broker.initialize(Timestamp('1').internal, 0) + with broker.get() as conn: + conn.execute('''SELECT reported + FROM shard_range''') + + class TestUpdateNewItemFromExisting(unittest.TestCase): # TODO: add test scenarios that have swift_bytes in content_type t0 = '1234567890.00000' diff --git a/test/unit/container/test_sharder.py b/test/unit/container/test_sharder.py index 43730a5d98..a54ddb652b 100644 --- a/test/unit/container/test_sharder.py +++ b/test/unit/container/test_sharder.py @@ -4189,6 +4189,7 @@ class TestSharder(BaseTestSharder): def capture_send(conn, data): bodies.append(data) + self.assertFalse(broker.get_own_shard_range().reported) # sanity with self._mock_sharder() as sharder: with mocked_http_conn(204, 204, 204, give_send=capture_send) as mock_conn: @@ -4198,6 +4199,7 @@ class TestSharder(BaseTestSharder): self.assertEqual('PUT', req['method']) self.assertEqual([expected_sent] * 3, [json.loads(b) for b in bodies]) + self.assertTrue(broker.get_own_shard_range().reported) def test_update_root_container_own_range(self): broker = self._make_broker() @@ -4230,6 +4232,32 @@ class 
TestSharder(BaseTestSharder): with annotate_failure(state): check_only_own_shard_range_sent(state) + def test_update_root_container_already_reported(self): + broker = self._make_broker() + + def check_already_reported_not_sent(state): + own_shard_range = broker.get_own_shard_range() + + own_shard_range.reported = True + self.assertTrue(own_shard_range.update_state( + state, state_timestamp=next(self.ts_iter))) + # Check that updating state clears the flag + self.assertFalse(own_shard_range.reported) + + # If we claim to have already updated... + own_shard_range.reported = True + broker.merge_shard_ranges([own_shard_range]) + + # ... then there's nothing to send + with self._mock_sharder() as sharder: + with mocked_http_conn() as mock_conn: + sharder._update_root_container(broker) + self.assertFalse(mock_conn.requests) + + for state in ShardRange.STATES: + with annotate_failure(state): + check_already_reported_not_sent(state) + def test_update_root_container_all_ranges(self): broker = self._make_broker() other_shard_ranges = self._make_shard_ranges((('', 'h'), ('h', '')))