sharder: ensure that misplaced tombstone rows are moved
The sharder's misplaced objects handler should iterate through misplaced objects in name order trying to place them in destination shard ranges. The destination shard ranges are consumed in order as misplaced objects are placed in them, and once the destination ranges are exhausted no more misplaced objects are placed. However, previously, undeleted objects were yielded in name order first, followed by deleted objects in name order, meaning that the destination shard ranges could be completely consumed placing undeleted objects. Misplaced deleted objects would then not be placed. The same would repeat on every attempt until there were no undeleted objects, or placing undeleted objects did not consume all the destination shard ranges. There is no guarantee this would ever happen. The fix is to yield objects in name order, as intended, with undeleted and deleted objects mingled in the same iterator. Also, make debug message more accurate. Change-Id: Ie8404f0c7e84d3916f0e0fa62afc54f1f43a4d06
This commit is contained in:
parent
75c5dbc29a
commit
019c955e1c
@ -1214,7 +1214,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
def yield_objects(self, broker, src_shard_range, since_row=None):
|
||||
"""
|
||||
Iterates through all objects in ``src_shard_range`` in name order
|
||||
yielding them in lists of up to CONTAINER_LISTING_LIMIT length.
|
||||
yielding them in lists of up to CONTAINER_LISTING_LIMIT length. Both
|
||||
deleted and undeleted objects are included.
|
||||
|
||||
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
|
||||
:param src_shard_range: A :class:`~swift.common.utils.ShardRange`
|
||||
@ -1223,27 +1224,26 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
the given row id; by default all rows are included.
|
||||
:return: a generator of tuples of (list of objects, broker info dict)
|
||||
"""
|
||||
for include_deleted in (False, True):
|
||||
marker = src_shard_range.lower_str
|
||||
while True:
|
||||
info = broker.get_info()
|
||||
info['max_row'] = broker.get_max_row()
|
||||
start = time.time()
|
||||
objects = broker.get_objects(
|
||||
self.cleave_row_batch_size,
|
||||
marker=marker,
|
||||
end_marker=src_shard_range.end_marker,
|
||||
include_deleted=include_deleted,
|
||||
since_row=since_row)
|
||||
if objects:
|
||||
self.logger.debug('got %s objects from %s in %ss',
|
||||
len(objects), broker.db_file,
|
||||
time.time() - start)
|
||||
yield objects, info
|
||||
marker = src_shard_range.lower_str
|
||||
while True:
|
||||
info = broker.get_info()
|
||||
info['max_row'] = broker.get_max_row()
|
||||
start = time.time()
|
||||
objects = broker.get_objects(
|
||||
self.cleave_row_batch_size,
|
||||
marker=marker,
|
||||
end_marker=src_shard_range.end_marker,
|
||||
include_deleted=None, # give me everything
|
||||
since_row=since_row)
|
||||
if objects:
|
||||
self.logger.debug('got %s objects from %s in %ss',
|
||||
len(objects), broker.db_file,
|
||||
time.time() - start)
|
||||
yield objects, info
|
||||
|
||||
if len(objects) < self.cleave_row_batch_size:
|
||||
break
|
||||
marker = objects[-1]['name']
|
||||
if len(objects) < self.cleave_row_batch_size:
|
||||
break
|
||||
marker = objects[-1]['name']
|
||||
|
||||
def yield_objects_to_shard_range(self, broker, src_shard_range,
|
||||
dest_shard_ranges):
|
||||
@ -1398,7 +1398,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
|
||||
self._increment_stat('misplaced', 'placed', step=placed)
|
||||
self._increment_stat('misplaced', 'unplaced', step=unplaced)
|
||||
return success, placed + unplaced
|
||||
return success, placed, unplaced
|
||||
|
||||
def _make_shard_range_fetcher(self, broker, src_shard_range):
|
||||
# returns a function that will lazy load shard ranges on demand;
|
||||
@ -1484,19 +1484,21 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
self.logger.debug('misplaced object source bounds %s' % src_bounds)
|
||||
policy_index = broker.storage_policy_index
|
||||
success = True
|
||||
num_found = 0
|
||||
num_placed = num_unplaced = 0
|
||||
for src_shard_range in src_ranges:
|
||||
part_success, part_num_found = self._move_objects(
|
||||
part_success, part_placed, part_unplaced = self._move_objects(
|
||||
src_broker, src_shard_range, policy_index,
|
||||
self._make_shard_range_fetcher(broker, src_shard_range))
|
||||
success &= part_success
|
||||
num_found += part_num_found
|
||||
num_placed += part_placed
|
||||
num_unplaced += part_unplaced
|
||||
|
||||
if num_found:
|
||||
if num_placed or num_unplaced:
|
||||
# the found stat records the number of DBs in which any misplaced
|
||||
# rows were found, not the total number of misplaced rows
|
||||
self._increment_stat('misplaced', 'found', statsd=True)
|
||||
self.logger.debug('Moved %s misplaced objects' % num_found)
|
||||
self.logger.debug('Placed %s misplaced objects (%s unplaced)',
|
||||
num_placed, num_unplaced)
|
||||
self._increment_stat('misplaced', 'success' if success else 'failure')
|
||||
self.logger.debug('Finished handling misplaced objects')
|
||||
return success
|
||||
|
@ -3403,8 +3403,9 @@ class TestSharder(BaseTestSharder):
|
||||
|
||||
# and then more misplaced updates arrive
|
||||
newer_objects = [
|
||||
['a', self.ts_encoded(), 51, 'text/plain', 'etag_a', 0, 0],
|
||||
['a-deleted', self.ts_encoded(), 51, 'text/plain', 'etag_a', 1, 0],
|
||||
['z', self.ts_encoded(), 52, 'text/plain', 'etag_z', 0, 0],
|
||||
['z-deleted', self.ts_encoded(), 52, 'text/plain', 'etag_z', 1, 0],
|
||||
]
|
||||
for obj in newer_objects:
|
||||
broker.put_object(*obj)
|
||||
@ -3420,7 +3421,7 @@ class TestSharder(BaseTestSharder):
|
||||
any_order=True
|
||||
)
|
||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
||||
'found': 1, 'placed': 2, 'unplaced': 0}
|
||||
'found': 1, 'placed': 3, 'unplaced': 0}
|
||||
self._assert_stats(expected_stats, sharder, 'misplaced')
|
||||
self.assertEqual(
|
||||
1, sharder.logger.get_increment_counts()['misplaced_found'])
|
||||
|
Loading…
x
Reference in New Issue
Block a user