Sharding: No stat updates before CLEAVED state

Once a shard container has been created as part of the sharder cycle it
pulls the shards own_shard_range, updates the object_count and
bytes_used and pushes this to the root container. The root container can
use these to display the current container stats.

However, it is not until a shard gets to the CLEAVED state, that it
holds enough information for it's namespace, so before this the number
it returns is incorrect.  Further, when we find and create a shard, it
starts out with the number of objects, at the time, that are expected to
go into them. This is better answer then, say, nothing.

So it's better for the shard to send it's current own_shard_range but
don't update the stats until it can be authoritive of that answer.
This patch adds a new SHARD_UPDATE_STAT_STATES that track what
ShardRange states a shard needs to be in in order to be responsible,
current definition is:

    SHARD_UPDATE_STAT_STATES = [ShardRange.CLEAVED, ShardRange.ACTIVE,
                                ShardRange.SHARDING, ShardRange.SHARDED,
                                ShardRange.SHRINKING, ShardRange.SHRUNK]

As we don't want to update the OSR stats and the meta_timestmap, also
move tombstone updates to only happen when in a SHARD_UPDATE_STAT_STATES
state.

Change-Id: I838dbba3c791fffa6a36ffdcf73eceeaff718373
This commit is contained in:
Matthew Oliver 2022-08-08 16:25:55 +10:00
parent 9063ea0ac7
commit ece4b04e82
4 changed files with 122 additions and 48 deletions

View File

@ -61,6 +61,12 @@ SHARD_AUDITING_STATES = [ShardRange.CREATED, ShardRange.CLEAVED,
ShardRange.ACTIVE, ShardRange.SHARDING, ShardRange.ACTIVE, ShardRange.SHARDING,
ShardRange.SHARDED, ShardRange.SHRINKING, ShardRange.SHARDED, ShardRange.SHRINKING,
ShardRange.SHRUNK] ShardRange.SHRUNK]
# shard's may not be fully populated while in the FOUND and CREATED
# state, so shards should only update their own shard range's object
# stats when they are in the following states
SHARD_UPDATE_STAT_STATES = [ShardRange.CLEAVED, ShardRange.ACTIVE,
ShardRange.SHARDING, ShardRange.SHARDED,
ShardRange.SHRINKING, ShardRange.SHRUNK]
# attribute names in order used when transforming shard ranges from dicts to # attribute names in order used when transforming shard ranges from dicts to
# tuples and vice-versa # tuples and vice-versa
@ -2351,10 +2357,14 @@ class ContainerBroker(DatabaseBroker):
# object_count # object_count
shard_size = object_count - progress shard_size = object_count - progress
# NB shard ranges are created with a non-zero object count so that # NB shard ranges are created with a non-zero object count for a
# the apparent container object count remains constant, and the # few reasons:
# container is non-deletable while shards have been found but not # 1. so that the apparent container object count remains
# yet cleaved # consistent;
# 2. the container is non-deletable while shards have been found
# but not yet cleaved; and
# 3. So we have a rough idea of size of the shards should be
# while cleaving.
found_ranges.append( found_ranges.append(
{'index': index, {'index': index,
'lower': str(last_shard_upper), 'lower': str(last_shard_upper),

View File

@ -40,7 +40,7 @@ from swift.common.utils import get_logger, config_true_value, \
Everything, config_auto_int_value, ShardRangeList, config_percent_value Everything, config_auto_int_value, ShardRangeList, config_percent_value
from swift.container.backend import ContainerBroker, \ from swift.container.backend import ContainerBroker, \
RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED, COLLAPSED, \ RECORD_TYPE_SHARD, UNSHARDED, SHARDING, SHARDED, COLLAPSED, \
SHARD_UPDATE_STATES, sift_shard_ranges SHARD_UPDATE_STATES, sift_shard_ranges, SHARD_UPDATE_STAT_STATES
from swift.container.replicator import ContainerReplicator from swift.container.replicator import ContainerReplicator
@ -2260,20 +2260,29 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
if not own_shard_range: if not own_shard_range:
return return
# do a reclaim *now* in order to get best estimate of tombstone count # Don't update the osr stats including tombstones unless its CLEAVED+
# that is consistent with the current object_count if own_shard_range.state in SHARD_UPDATE_STAT_STATES:
reclaimer = self._reclaim(broker) # do a reclaim *now* in order to get best estimate of tombstone
tombstones = reclaimer.get_tombstone_count() # count that is consistent with the current object_count
self.logger.debug('tombstones in %s = %d', reclaimer = self._reclaim(broker)
quote(broker.path), tombstones) tombstones = reclaimer.get_tombstone_count()
own_shard_range.update_tombstones(tombstones) self.logger.debug('tombstones in %s = %d',
update_own_shard_range_stats(broker, own_shard_range) quote(broker.path), tombstones)
# shrinking candidates are found in the root DB so that's the only
# place we need up to date tombstone stats.
own_shard_range.update_tombstones(tombstones)
update_own_shard_range_stats(broker, own_shard_range)
if not own_shard_range.reported:
broker.merge_shard_ranges(own_shard_range)
# we can't use `state not in SHARD_UPDATE_STAT_STATES` to return
# because there are cases we still want to update root even if the
# stats are wrong. Such as it's a new shard or something else has
# decided to remove the latch to update root.
if own_shard_range.reported: if own_shard_range.reported:
# no change to the stats metadata
return return
# stats metadata has been updated so persist it
broker.merge_shard_ranges(own_shard_range)
# now get a consistent list of own and other shard ranges # now get a consistent list of own and other shard ranges
shard_ranges = broker.get_shard_ranges( shard_ranges = broker.get_shard_ranges(
include_own=True, include_own=True,

View File

@ -2935,7 +2935,7 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
self.sharders.once(**kwargs) self.sharders.once(**kwargs)
def test_manage_shard_ranges(self): def test_manage_shard_ranges(self):
obj_names = self._make_object_names(7) obj_names = self._make_object_names(10)
self.put_objects(obj_names) self.put_objects(obj_names)
client.post_container(self.url, self.admin_token, self.container_name, client.post_container(self.url, self.admin_token, self.container_name,
@ -2954,17 +2954,38 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
'swift-manage-shard-ranges', 'swift-manage-shard-ranges',
self.get_db_file(self.brain.part, self.brain.nodes[0]), self.get_db_file(self.brain.part, self.brain.nodes[0]),
'find_and_replace', '3', '--enable', '--minimum-shard-size', '2']) 'find_and_replace', '3', '--enable', '--minimum-shard-size', '2'])
self.assert_container_state(self.brain.nodes[0], 'unsharded', 2) self.assert_container_state(self.brain.nodes[0], 'unsharded', 3)
# "Run container-replicator to replicate them to other nodes." # "Run container-replicator to replicate them to other nodes."
self.replicators.once() self.replicators.once()
# "Run container-sharder on all nodes to shard the container." # "Run container-sharder on all nodes to shard the container."
# first pass cleaves 2 shards
self.sharders_once(additional_args='--partitions=%s' % self.brain.part)
self.assert_container_state(self.brain.nodes[0], 'sharding', 3)
self.assert_container_state(self.brain.nodes[1], 'sharding', 3)
shard_ranges = self.assert_container_state(
self.brain.nodes[2], 'sharding', 3)
self.assert_container_listing(obj_names)
# make the un-cleaved shard update the root container...
self.assertEqual([3, 3, 4], [sr.object_count for sr in shard_ranges])
shard_part, nodes = self.get_part_and_node_numbers(shard_ranges[2])
self.sharders_once(additional_args='--partitions=%s' % shard_part)
shard_ranges = self.assert_container_state(
self.brain.nodes[2], 'sharding', 3)
# ...it does not report zero-stats despite being empty, because it has
# not yet reached CLEAVED state
self.assertEqual([3, 3, 4], [sr.object_count for sr in shard_ranges])
# second pass cleaves final shard
self.sharders_once(additional_args='--partitions=%s' % self.brain.part) self.sharders_once(additional_args='--partitions=%s' % self.brain.part)
# Everybody's settled # Everybody's settled
self.assert_container_state(self.brain.nodes[0], 'sharded', 2) self.assert_container_state(self.brain.nodes[0], 'sharded', 3)
self.assert_container_state(self.brain.nodes[1], 'sharded', 2) self.assert_container_state(self.brain.nodes[1], 'sharded', 3)
self.assert_container_state(self.brain.nodes[2], 'sharded', 2) shard_ranges = self.assert_container_state(
self.brain.nodes[2], 'sharded', 3)
self.assertEqual([3, 3, 4], [sr.object_count for sr in shard_ranges])
self.assert_container_listing(obj_names) self.assert_container_listing(obj_names)
def test_manage_shard_ranges_compact(self): def test_manage_shard_ranges_compact(self):

View File

@ -1588,6 +1588,7 @@ class TestSharder(BaseTestSharder):
# verify that objects are not missed if shard ranges change between # verify that objects are not missed if shard ranges change between
# cleaving batches # cleaving batches
broker = self._make_broker() broker = self._make_broker()
# this root db has very few object rows...
objects = [ objects = [
('a', self.ts_encoded(), 10, 'text/plain', 'etag_a', 0, 0), ('a', self.ts_encoded(), 10, 'text/plain', 'etag_a', 0, 0),
('b', self.ts_encoded(), 10, 'text/plain', 'etag_b', 0, 0), ('b', self.ts_encoded(), 10, 'text/plain', 'etag_b', 0, 0),
@ -1603,8 +1604,10 @@ class TestSharder(BaseTestSharder):
broker.enable_sharding(Timestamp.now()) broker.enable_sharding(Timestamp.now())
shard_bounds = (('', 'd'), ('d', 'x'), ('x', '')) shard_bounds = (('', 'd'), ('d', 'x'), ('x', ''))
# shard ranges start life with object count that is typically much
# larger than this DB's object population...
shard_ranges = self._make_shard_ranges( shard_ranges = self._make_shard_ranges(
shard_bounds, state=ShardRange.CREATED) shard_bounds, state=ShardRange.CREATED, object_count=500000)
expected_shard_dbs = [] expected_shard_dbs = []
for shard_range in shard_ranges: for shard_range in shard_ranges:
db_hash = hash_path(shard_range.account, shard_range.container) db_hash = hash_path(shard_range.account, shard_range.container)
@ -1634,8 +1637,8 @@ class TestSharder(BaseTestSharder):
updated_shard_ranges = broker.get_shard_ranges() updated_shard_ranges = broker.get_shard_ranges()
self.assertEqual(3, len(updated_shard_ranges)) self.assertEqual(3, len(updated_shard_ranges))
# first 2 shard ranges should have updated object count, bytes used and # now they have reached CLEAVED state, the first 2 shard ranges should
# meta_timestamp # have updated object count, bytes used and meta_timestamp
shard_ranges[0].bytes_used = 23 shard_ranges[0].bytes_used = 23
shard_ranges[0].object_count = 4 shard_ranges[0].object_count = 4
shard_ranges[0].state = ShardRange.CLEAVED shard_ranges[0].state = ShardRange.CLEAVED
@ -1646,6 +1649,11 @@ class TestSharder(BaseTestSharder):
self._check_shard_range(shard_ranges[1], updated_shard_ranges[1]) self._check_shard_range(shard_ranges[1], updated_shard_ranges[1])
self._check_objects(objects[:4], expected_shard_dbs[0]) self._check_objects(objects[:4], expected_shard_dbs[0])
self._check_objects(objects[4:7], expected_shard_dbs[1]) self._check_objects(objects[4:7], expected_shard_dbs[1])
# the actual object counts were set in the new shard brokers' own_sr's
shard_broker = ContainerBroker(expected_shard_dbs[0])
self.assertEqual(4, shard_broker.get_own_shard_range().object_count)
shard_broker = ContainerBroker(expected_shard_dbs[1])
self.assertEqual(2, shard_broker.get_own_shard_range().object_count)
self.assertFalse(os.path.exists(expected_shard_dbs[2])) self.assertFalse(os.path.exists(expected_shard_dbs[2]))
# third shard range should be unchanged - not yet cleaved # third shard range should be unchanged - not yet cleaved
@ -5276,6 +5284,7 @@ class TestSharder(BaseTestSharder):
def test_update_root_container_own_range(self): def test_update_root_container_own_range(self):
broker = self._make_broker() broker = self._make_broker()
obj_names = []
# nothing to send # nothing to send
with self._mock_sharder() as sharder: with self._mock_sharder() as sharder:
@ -5290,16 +5299,27 @@ class TestSharder(BaseTestSharder):
broker.merge_shard_ranges([own_shard_range]) broker.merge_shard_ranges([own_shard_range])
# add an object, expect to see it reflected in the own shard range # add an object, expect to see it reflected in the own shard range
# that is sent # that is sent
broker.put_object(str(own_shard_range.object_count + 1), obj_names.append(uuid4())
broker.put_object(str(obj_names[-1]),
next(self.ts_iter).internal, 1, '', '') next(self.ts_iter).internal, 1, '', '')
with mock_timestamp_now() as now: with mock_timestamp_now() as now:
# force own shard range meta updates to be at fixed timestamp # check if the state if in SHARD_UPDATE_STAT_STATES
expected_sent = [ if state in [ShardRange.CLEAVED, ShardRange.ACTIVE,
dict(own_shard_range, ShardRange.SHARDING, ShardRange.SHARDED,
meta_timestamp=now.internal, ShardRange.SHRINKING, ShardRange.SHRUNK]:
object_count=own_shard_range.object_count + 1, exp_obj_count = len(obj_names)
bytes_used=own_shard_range.bytes_used + 1)] expected_sent = [
dict(own_shard_range,
meta_timestamp=now.internal,
object_count=len(obj_names),
bytes_used=len(obj_names))]
else:
exp_obj_count = own_shard_range.object_count
expected_sent = [
dict(own_shard_range)]
self.check_shard_ranges_sent(broker, expected_sent) self.check_shard_ranges_sent(broker, expected_sent)
self.assertEqual(
exp_obj_count, broker.get_own_shard_range().object_count)
# initialise tombstones # initialise tombstones
with mock_timestamp_now(next(self.ts_iter)): with mock_timestamp_now(next(self.ts_iter)):
@ -5311,6 +5331,8 @@ class TestSharder(BaseTestSharder):
with annotate_failure(state): with annotate_failure(state):
check_only_own_shard_range_sent(state) check_only_own_shard_range_sent(state)
init_obj_count = len(obj_names)
def check_tombstones_sent(state): def check_tombstones_sent(state):
own_shard_range = broker.get_own_shard_range() own_shard_range = broker.get_own_shard_range()
self.assertTrue(own_shard_range.update_state( self.assertTrue(own_shard_range.update_state(
@ -5318,19 +5340,25 @@ class TestSharder(BaseTestSharder):
broker.merge_shard_ranges([own_shard_range]) broker.merge_shard_ranges([own_shard_range])
# delete an object, expect to see it reflected in the own shard # delete an object, expect to see it reflected in the own shard
# range that is sent # range that is sent
broker.delete_object(str(own_shard_range.object_count), broker.delete_object(str(obj_names.pop(-1)),
next(self.ts_iter).internal) next(self.ts_iter).internal)
with mock_timestamp_now() as now: with mock_timestamp_now() as now:
# force own shard range meta updates to be at fixed timestamp # check if the state if in SHARD_UPDATE_STAT_STATES
expected_sent = [ if state in [ShardRange.CLEAVED, ShardRange.ACTIVE,
dict(own_shard_range, ShardRange.SHARDING, ShardRange.SHARDED,
meta_timestamp=now.internal, ShardRange.SHRINKING, ShardRange.SHRUNK]:
object_count=own_shard_range.object_count - 1, expected_sent = [
bytes_used=own_shard_range.bytes_used - 1, dict(own_shard_range,
tombstones=own_shard_range.tombstones + 1)] meta_timestamp=now.internal,
object_count=len(obj_names),
bytes_used=len(obj_names),
tombstones=init_obj_count - len(obj_names))]
else:
expected_sent = [
dict(own_shard_range)]
self.check_shard_ranges_sent(broker, expected_sent) self.check_shard_ranges_sent(broker, expected_sent)
for state in ShardRange.STATES: for i, state in enumerate(ShardRange.STATES):
with annotate_failure(state): with annotate_failure(state):
check_tombstones_sent(state) check_tombstones_sent(state)
@ -5371,6 +5399,7 @@ class TestSharder(BaseTestSharder):
other_shard_ranges = self._make_shard_ranges((('', 'h'), ('h', ''))) other_shard_ranges = self._make_shard_ranges((('', 'h'), ('h', '')))
self.assertTrue(other_shard_ranges[0].set_deleted()) self.assertTrue(other_shard_ranges[0].set_deleted())
broker.merge_shard_ranges(other_shard_ranges) broker.merge_shard_ranges(other_shard_ranges)
obj_names = []
# own range missing - send nothing # own range missing - send nothing
with self._mock_sharder() as sharder: with self._mock_sharder() as sharder:
@ -5385,17 +5414,22 @@ class TestSharder(BaseTestSharder):
broker.merge_shard_ranges([own_shard_range]) broker.merge_shard_ranges([own_shard_range])
# add an object, expect to see it reflected in the own shard range # add an object, expect to see it reflected in the own shard range
# that is sent # that is sent
broker.put_object(str(own_shard_range.object_count + 1), obj_names.append(uuid4())
broker.put_object(str(obj_names[-1]),
next(self.ts_iter).internal, 1, '', '') next(self.ts_iter).internal, 1, '', '')
with mock_timestamp_now() as now: with mock_timestamp_now() as now:
shard_ranges = broker.get_shard_ranges(include_deleted=True) shard_ranges = broker.get_shard_ranges(include_deleted=True)
expected_sent = sorted([ exp_own_shard_range = own_shard_range.copy()
own_shard_range.copy( # check if the state if in SHARD_UPDATE_STAT_STATES
meta_timestamp=now.internal, if state in [ShardRange.CLEAVED, ShardRange.ACTIVE,
object_count=own_shard_range.object_count + 1, ShardRange.SHARDING, ShardRange.SHARDED,
bytes_used=own_shard_range.bytes_used + 1, ShardRange.SHRINKING, ShardRange.SHRUNK]:
tombstones=0)] + exp_own_shard_range.object_count = len(obj_names)
shard_ranges, exp_own_shard_range.bytes_used = len(obj_names)
exp_own_shard_range.meta_timestamp = now.internal
exp_own_shard_range.tombstones = 0
expected_sent = sorted(
[exp_own_shard_range] + shard_ranges,
key=lambda sr: (sr.upper, sr.state, sr.lower)) key=lambda sr: (sr.upper, sr.state, sr.lower))
self.check_shard_ranges_sent( self.check_shard_ranges_sent(
broker, [dict(sr) for sr in expected_sent]) broker, [dict(sr) for sr in expected_sent])