Merge "sharder: ensure that misplaced tombstone rows are moved"
This commit is contained in:
commit
507cf18f96
@ -1214,7 +1214,8 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
def yield_objects(self, broker, src_shard_range, since_row=None):
|
||||
"""
|
||||
Iterates through all objects in ``src_shard_range`` in name order
|
||||
yielding them in lists of up to CONTAINER_LISTING_LIMIT length.
|
||||
yielding them in lists of up to CONTAINER_LISTING_LIMIT length. Both
|
||||
deleted and undeleted objects are included.
|
||||
|
||||
:param broker: A :class:`~swift.container.backend.ContainerBroker`.
|
||||
:param src_shard_range: A :class:`~swift.common.utils.ShardRange`
|
||||
@ -1223,27 +1224,26 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
the given row id; by default all rows are included.
|
||||
:return: a generator of tuples of (list of objects, broker info dict)
|
||||
"""
|
||||
for include_deleted in (False, True):
|
||||
marker = src_shard_range.lower_str
|
||||
while True:
|
||||
info = broker.get_info()
|
||||
info['max_row'] = broker.get_max_row()
|
||||
start = time.time()
|
||||
objects = broker.get_objects(
|
||||
self.cleave_row_batch_size,
|
||||
marker=marker,
|
||||
end_marker=src_shard_range.end_marker,
|
||||
include_deleted=include_deleted,
|
||||
since_row=since_row)
|
||||
if objects:
|
||||
self.logger.debug('got %s objects from %s in %ss',
|
||||
len(objects), broker.db_file,
|
||||
time.time() - start)
|
||||
yield objects, info
|
||||
marker = src_shard_range.lower_str
|
||||
while True:
|
||||
info = broker.get_info()
|
||||
info['max_row'] = broker.get_max_row()
|
||||
start = time.time()
|
||||
objects = broker.get_objects(
|
||||
self.cleave_row_batch_size,
|
||||
marker=marker,
|
||||
end_marker=src_shard_range.end_marker,
|
||||
include_deleted=None, # give me everything
|
||||
since_row=since_row)
|
||||
if objects:
|
||||
self.logger.debug('got %s objects from %s in %ss',
|
||||
len(objects), broker.db_file,
|
||||
time.time() - start)
|
||||
yield objects, info
|
||||
|
||||
if len(objects) < self.cleave_row_batch_size:
|
||||
break
|
||||
marker = objects[-1]['name']
|
||||
if len(objects) < self.cleave_row_batch_size:
|
||||
break
|
||||
marker = objects[-1]['name']
|
||||
|
||||
def yield_objects_to_shard_range(self, broker, src_shard_range,
|
||||
dest_shard_ranges):
|
||||
@ -1398,7 +1398,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
|
||||
self._increment_stat('misplaced', 'placed', step=placed)
|
||||
self._increment_stat('misplaced', 'unplaced', step=unplaced)
|
||||
return success, placed + unplaced
|
||||
return success, placed, unplaced
|
||||
|
||||
def _make_shard_range_fetcher(self, broker, src_shard_range):
|
||||
# returns a function that will lazy load shard ranges on demand;
|
||||
@ -1484,19 +1484,21 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
||||
self.logger.debug('misplaced object source bounds %s' % src_bounds)
|
||||
policy_index = broker.storage_policy_index
|
||||
success = True
|
||||
num_found = 0
|
||||
num_placed = num_unplaced = 0
|
||||
for src_shard_range in src_ranges:
|
||||
part_success, part_num_found = self._move_objects(
|
||||
part_success, part_placed, part_unplaced = self._move_objects(
|
||||
src_broker, src_shard_range, policy_index,
|
||||
self._make_shard_range_fetcher(broker, src_shard_range))
|
||||
success &= part_success
|
||||
num_found += part_num_found
|
||||
num_placed += part_placed
|
||||
num_unplaced += part_unplaced
|
||||
|
||||
if num_found:
|
||||
if num_placed or num_unplaced:
|
||||
# the found stat records the number of DBs in which any misplaced
|
||||
# rows were found, not the total number of misplaced rows
|
||||
self._increment_stat('misplaced', 'found', statsd=True)
|
||||
self.logger.debug('Moved %s misplaced objects' % num_found)
|
||||
self.logger.debug('Placed %s misplaced objects (%s unplaced)',
|
||||
num_placed, num_unplaced)
|
||||
self._increment_stat('misplaced', 'success' if success else 'failure')
|
||||
self.logger.debug('Finished handling misplaced objects')
|
||||
return success
|
||||
|
@ -3403,8 +3403,9 @@ class TestSharder(BaseTestSharder):
|
||||
|
||||
# and then more misplaced updates arrive
|
||||
newer_objects = [
|
||||
['a', self.ts_encoded(), 51, 'text/plain', 'etag_a', 0, 0],
|
||||
['a-deleted', self.ts_encoded(), 51, 'text/plain', 'etag_a', 1, 0],
|
||||
['z', self.ts_encoded(), 52, 'text/plain', 'etag_z', 0, 0],
|
||||
['z-deleted', self.ts_encoded(), 52, 'text/plain', 'etag_z', 1, 0],
|
||||
]
|
||||
for obj in newer_objects:
|
||||
broker.put_object(*obj)
|
||||
@ -3420,7 +3421,7 @@ class TestSharder(BaseTestSharder):
|
||||
any_order=True
|
||||
)
|
||||
expected_stats = {'attempted': 1, 'success': 1, 'failure': 0,
|
||||
'found': 1, 'placed': 2, 'unplaced': 0}
|
||||
'found': 1, 'placed': 3, 'unplaced': 0}
|
||||
self._assert_stats(expected_stats, sharder, 'misplaced')
|
||||
self.assertEqual(
|
||||
1, sharder.logger.get_increment_counts()['misplaced_found'])
|
||||
|
Loading…
Reference in New Issue
Block a user