Merge "sharder: process deleted DBs"
This commit is contained in:
commit
bd63970365
@ -1046,12 +1046,14 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
headers=headers, contents=body)
|
headers=headers, contents=body)
|
||||||
except DirectClientException as err:
|
except DirectClientException as err:
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Failed to put shard ranges to %s:%s/%s: %s',
|
'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
|
||||||
node['ip'], node['port'], node['device'], err.http_status)
|
node['ip'], node['port'], node['device'],
|
||||||
|
quote(account), quote(container), err.http_status)
|
||||||
except (Exception, Timeout) as err:
|
except (Exception, Timeout) as err:
|
||||||
self.logger.exception(
|
self.logger.exception(
|
||||||
'Failed to put shard ranges to %s:%s/%s: %s',
|
'Failed to put shard ranges to %s:%s/%s %s/%s: %s',
|
||||||
node['ip'], node['port'], node['device'], err)
|
node['ip'], node['port'], node['device'],
|
||||||
|
quote(account), quote(container), err)
|
||||||
else:
|
else:
|
||||||
return True
|
return True
|
||||||
return False
|
return False
|
||||||
@ -2071,8 +2073,10 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
def _process_broker(self, broker, node, part):
|
def _process_broker(self, broker, node, part):
|
||||||
broker.get_info() # make sure account/container are populated
|
broker.get_info() # make sure account/container are populated
|
||||||
state = broker.get_db_state()
|
state = broker.get_db_state()
|
||||||
self.logger.debug('Starting processing %s state %s',
|
is_deleted = broker.is_deleted()
|
||||||
quote(broker.path), state)
|
self.logger.debug('Starting processing %s state %s%s',
|
||||||
|
quote(broker.path), state,
|
||||||
|
' (deleted)' if is_deleted else '')
|
||||||
|
|
||||||
if not self._audit_container(broker):
|
if not self._audit_container(broker):
|
||||||
return
|
return
|
||||||
@ -2080,13 +2084,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
# now look and deal with misplaced objects.
|
# now look and deal with misplaced objects.
|
||||||
self._move_misplaced_objects(broker)
|
self._move_misplaced_objects(broker)
|
||||||
|
|
||||||
if broker.is_deleted():
|
is_leader = node['index'] == 0 and self.auto_shard and not is_deleted
|
||||||
# This container is deleted so we can skip it. We still want
|
|
||||||
# deleted containers to go via misplaced items because they may
|
|
||||||
# have new objects sitting in them that may need to move.
|
|
||||||
return
|
|
||||||
|
|
||||||
is_leader = node['index'] == 0 and self.auto_shard
|
|
||||||
if state in (UNSHARDED, COLLAPSED):
|
if state in (UNSHARDED, COLLAPSED):
|
||||||
if is_leader and broker.is_root_container():
|
if is_leader and broker.is_root_container():
|
||||||
# bootstrap sharding of root container
|
# bootstrap sharding of root container
|
||||||
@ -2138,6 +2136,7 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
self.logger.debug('Remaining in sharding state %s',
|
self.logger.debug('Remaining in sharding state %s',
|
||||||
quote(broker.path))
|
quote(broker.path))
|
||||||
|
|
||||||
|
if not broker.is_deleted():
|
||||||
if state == SHARDED and broker.is_root_container():
|
if state == SHARDED and broker.is_root_container():
|
||||||
# look for shrink stats
|
# look for shrink stats
|
||||||
self._identify_shrinking_candidate(broker, node)
|
self._identify_shrinking_candidate(broker, node)
|
||||||
@ -2159,8 +2158,9 @@ class ContainerSharder(ContainerSharderConf, ContainerReplicator):
|
|||||||
# simultaneously become deleted.
|
# simultaneously become deleted.
|
||||||
self._update_root_container(broker)
|
self._update_root_container(broker)
|
||||||
|
|
||||||
self.logger.debug('Finished processing %s state %s',
|
self.logger.debug('Finished processing %s state %s%s',
|
||||||
quote(broker.path), broker.get_db_state())
|
quote(broker.path), broker.get_db_state(),
|
||||||
|
' (deleted)' if is_deleted else '')
|
||||||
|
|
||||||
def _one_shard_cycle(self, devices_to_shard, partitions_to_shard):
|
def _one_shard_cycle(self, devices_to_shard, partitions_to_shard):
|
||||||
"""
|
"""
|
||||||
|
@ -367,13 +367,17 @@ class BaseTestContainerSharding(ReplProbeTest):
|
|||||||
self.fail('No shard sysmeta found in %s' % headers)
|
self.fail('No shard sysmeta found in %s' % headers)
|
||||||
|
|
||||||
def assert_container_state(self, node, expected_state, num_shard_ranges,
|
def assert_container_state(self, node, expected_state, num_shard_ranges,
|
||||||
account=None, container=None, part=None):
|
account=None, container=None, part=None,
|
||||||
|
override_deleted=False):
|
||||||
account = account or self.account
|
account = account or self.account
|
||||||
container = container or self.container_to_shard
|
container = container or self.container_to_shard
|
||||||
part = part or self.brain.part
|
part = part or self.brain.part
|
||||||
|
headers = {'X-Backend-Record-Type': 'shard'}
|
||||||
|
if override_deleted:
|
||||||
|
headers['x-backend-override-deleted'] = True
|
||||||
headers, shard_ranges = direct_client.direct_get_container(
|
headers, shard_ranges = direct_client.direct_get_container(
|
||||||
node, part, account, container,
|
node, part, account, container,
|
||||||
headers={'X-Backend-Record-Type': 'shard'})
|
headers=headers)
|
||||||
self.assertEqual(num_shard_ranges, len(shard_ranges))
|
self.assertEqual(num_shard_ranges, len(shard_ranges))
|
||||||
self.assertIn('X-Backend-Sharding-State', headers)
|
self.assertIn('X-Backend-Sharding-State', headers)
|
||||||
self.assertEqual(
|
self.assertEqual(
|
||||||
@ -401,12 +405,17 @@ class BaseTestContainerSharding(ReplProbeTest):
|
|||||||
shard_range.account, shard_range.container)
|
shard_range.account, shard_range.container)
|
||||||
return part, [n['id'] + 1 for n in nodes]
|
return part, [n['id'] + 1 for n in nodes]
|
||||||
|
|
||||||
def run_sharders(self, shard_ranges):
|
def run_sharders(self, shard_ranges, exclude_partitions=None):
|
||||||
"""Run the sharder on partitions for given shard ranges."""
|
"""Run the sharder on partitions for given shard ranges."""
|
||||||
if not isinstance(shard_ranges, (list, tuple, set)):
|
if not isinstance(shard_ranges, (list, tuple, set)):
|
||||||
shard_ranges = (shard_ranges,)
|
shard_ranges = (shard_ranges,)
|
||||||
partitions = ','.join(str(self.get_part_and_node_numbers(sr)[0])
|
exclude_partitions = exclude_partitions or []
|
||||||
for sr in shard_ranges)
|
shard_parts = []
|
||||||
|
for sr in shard_ranges:
|
||||||
|
sr_part = self.get_part_and_node_numbers(sr)[0]
|
||||||
|
if sr_part not in exclude_partitions:
|
||||||
|
shard_parts.append(str(sr_part))
|
||||||
|
partitions = ','.join(shard_parts)
|
||||||
self.sharders.once(additional_args='--partitions=%s' % partitions)
|
self.sharders.once(additional_args='--partitions=%s' % partitions)
|
||||||
|
|
||||||
def run_sharder_sequentially(self, shard_range=None):
|
def run_sharder_sequentially(self, shard_range=None):
|
||||||
@ -3586,3 +3595,187 @@ class TestManagedContainerSharding(BaseTestContainerSharding):
|
|||||||
|
|
||||||
self.assert_container_listing(
|
self.assert_container_listing(
|
||||||
[obj_names[0], new_obj_name] + obj_names[1:])
|
[obj_names[0], new_obj_name] + obj_names[1:])
|
||||||
|
|
||||||
|
def test_manage_shard_ranges_unsharded_deleted_root(self):
|
||||||
|
# verify that a deleted DB will still be sharded
|
||||||
|
|
||||||
|
# choose a node that will not be sharded initially
|
||||||
|
sharded_nodes = []
|
||||||
|
unsharded_node = None
|
||||||
|
for node in self.brain.nodes:
|
||||||
|
if self.brain.node_numbers[node['index']] \
|
||||||
|
in self.brain.handoff_numbers:
|
||||||
|
unsharded_node = node
|
||||||
|
else:
|
||||||
|
sharded_nodes.append(node)
|
||||||
|
|
||||||
|
# put some objects - not enough to trigger auto-sharding
|
||||||
|
obj_names = self._make_object_names(MIN_SHARD_CONTAINER_THRESHOLD - 1)
|
||||||
|
self.put_objects(obj_names)
|
||||||
|
|
||||||
|
# run replicators first time to get sync points set and commit updates
|
||||||
|
self.replicators.once()
|
||||||
|
|
||||||
|
# setup sharding...
|
||||||
|
self.assert_subprocess_success([
|
||||||
|
'swift-manage-shard-ranges',
|
||||||
|
self.get_db_file(self.brain.part, sharded_nodes[0]),
|
||||||
|
'find_and_replace', '2', '--enable', '--minimum-shard-size', '1'])
|
||||||
|
|
||||||
|
# Run container-replicator to replicate shard ranges
|
||||||
|
self.container_replicators.once()
|
||||||
|
self.assert_container_state(sharded_nodes[0], 'unsharded', 2)
|
||||||
|
self.assert_container_state(sharded_nodes[1], 'unsharded', 2)
|
||||||
|
self.assert_container_state(unsharded_node, 'unsharded', 2)
|
||||||
|
|
||||||
|
# Run container-sharder to shard the 2 primary replicas that did
|
||||||
|
# receive the object PUTs
|
||||||
|
for num in self.brain.primary_numbers:
|
||||||
|
self.sharders_once(
|
||||||
|
number=num,
|
||||||
|
additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
|
||||||
|
# delete the objects - the proxy's will have cached container info with
|
||||||
|
# out-of-date db_state=unsharded, so updates go to the root DBs
|
||||||
|
self.delete_objects(obj_names)
|
||||||
|
# deal with DELETE's being misplaced in root db's...
|
||||||
|
for num in self.brain.primary_numbers:
|
||||||
|
self.sharders_once(
|
||||||
|
number=num,
|
||||||
|
additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
|
||||||
|
self.assert_container_state(sharded_nodes[0], 'sharded', 2)
|
||||||
|
self.assert_container_state(sharded_nodes[1], 'sharded', 2)
|
||||||
|
shard_ranges = self.assert_container_state(
|
||||||
|
unsharded_node, 'unsharded', 2)
|
||||||
|
|
||||||
|
# get root stats updated - but avoid sharding the remaining root DB
|
||||||
|
self.run_sharders(shard_ranges, exclude_partitions=[self.brain.part])
|
||||||
|
self.assert_container_listing([])
|
||||||
|
|
||||||
|
# delete the empty container
|
||||||
|
client.delete_container(self.url, self.admin_token,
|
||||||
|
self.container_name)
|
||||||
|
|
||||||
|
# sanity check - unsharded DB is deleted
|
||||||
|
broker = self.get_broker(self.brain.part, unsharded_node,
|
||||||
|
self.account, self.container_name)
|
||||||
|
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||||
|
self.assertTrue(broker.is_deleted())
|
||||||
|
self.assertEqual(0, broker.get_info()['object_count'])
|
||||||
|
self.assertEqual(0, broker.get_shard_usage()['object_count'])
|
||||||
|
|
||||||
|
# now shard the final DB
|
||||||
|
for num in self.brain.handoff_numbers:
|
||||||
|
self.sharders_once(
|
||||||
|
number=num,
|
||||||
|
additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
|
||||||
|
# all DBs should now be sharded and still deleted
|
||||||
|
for node in self.brain.nodes:
|
||||||
|
with annotate_failure(
|
||||||
|
'node %s in %s'
|
||||||
|
% (node['index'], [n['index'] for n in self.brain.nodes])):
|
||||||
|
self.assert_container_state(node, 'sharded', 2,
|
||||||
|
override_deleted=True)
|
||||||
|
broker = self.get_broker(self.brain.part, node,
|
||||||
|
self.account, self.container_name)
|
||||||
|
self.assertEqual(SHARDED, broker.get_db_state())
|
||||||
|
self.assertEqual(0, broker.get_info()['object_count'])
|
||||||
|
self.assertEqual(0,
|
||||||
|
broker.get_shard_usage()['object_count'])
|
||||||
|
self.assertTrue(broker.is_deleted())
|
||||||
|
|
||||||
|
def test_manage_shard_ranges_unsharded_deleted_root_gets_undeleted(self):
|
||||||
|
# verify that an apparently deleted DB (no object rows in root db) will
|
||||||
|
# still be sharded and also become undeleted when objects are
|
||||||
|
# discovered in the shards
|
||||||
|
|
||||||
|
# choose a node that will not be sharded initially
|
||||||
|
sharded_nodes = []
|
||||||
|
unsharded_node = None
|
||||||
|
for node in self.brain.nodes:
|
||||||
|
if self.brain.node_numbers[node['index']] \
|
||||||
|
in self.brain.handoff_numbers:
|
||||||
|
unsharded_node = node
|
||||||
|
else:
|
||||||
|
sharded_nodes.append(node)
|
||||||
|
|
||||||
|
# put some objects, but only to 2 replicas - not enough to trigger
|
||||||
|
# auto-sharding
|
||||||
|
self.brain.stop_handoff_half()
|
||||||
|
|
||||||
|
obj_names = self._make_object_names(MIN_SHARD_CONTAINER_THRESHOLD - 1)
|
||||||
|
self.put_objects(obj_names)
|
||||||
|
# run replicators first time to get sync points set and commit puts
|
||||||
|
self.replicators.once()
|
||||||
|
|
||||||
|
# setup sharding...
|
||||||
|
self.assert_subprocess_success([
|
||||||
|
'swift-manage-shard-ranges',
|
||||||
|
self.get_db_file(self.brain.part, sharded_nodes[0]),
|
||||||
|
'find_and_replace', '2', '--enable', '--minimum-shard-size', '1'])
|
||||||
|
|
||||||
|
# Run container-replicator to replicate shard ranges - object rows will
|
||||||
|
# not be sync'd now there are shard ranges
|
||||||
|
for num in self.brain.primary_numbers:
|
||||||
|
self.container_replicators.once(number=num)
|
||||||
|
self.assert_container_state(sharded_nodes[0], 'unsharded', 2)
|
||||||
|
self.assert_container_state(sharded_nodes[1], 'unsharded', 2)
|
||||||
|
|
||||||
|
# revive the stopped node
|
||||||
|
self.brain.start_handoff_half()
|
||||||
|
self.assert_container_state(unsharded_node, 'unsharded', 0)
|
||||||
|
|
||||||
|
# delete the empty replica
|
||||||
|
direct_client.direct_delete_container(
|
||||||
|
unsharded_node, self.brain.part, self.account,
|
||||||
|
self.container_name)
|
||||||
|
|
||||||
|
# Run container-sharder to shard the 2 primary replicas that did
|
||||||
|
# receive the object PUTs
|
||||||
|
for num in self.brain.primary_numbers:
|
||||||
|
self.sharders_once(
|
||||||
|
number=num,
|
||||||
|
additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
|
||||||
|
self.assert_container_state(sharded_nodes[0], 'sharded', 2)
|
||||||
|
self.assert_container_state(sharded_nodes[1], 'sharded', 2)
|
||||||
|
# the sharder syncs shard ranges ...
|
||||||
|
self.assert_container_state(unsharded_node, 'unsharded', 2,
|
||||||
|
override_deleted=True)
|
||||||
|
|
||||||
|
# sanity check - unsharded DB is empty and deleted
|
||||||
|
broker = self.get_broker(self.brain.part, unsharded_node,
|
||||||
|
self.account, self.container_name)
|
||||||
|
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||||
|
self.assertEqual(0, broker.get_info()['object_count'])
|
||||||
|
# the shard ranges do have object count but are in CREATED state so
|
||||||
|
# not reported in shard usage...
|
||||||
|
self.assertEqual(0, broker.get_shard_usage()['object_count'])
|
||||||
|
self.assertTrue(broker.is_deleted())
|
||||||
|
|
||||||
|
# now shard the final DB
|
||||||
|
for num in self.brain.handoff_numbers:
|
||||||
|
self.sharders_once(
|
||||||
|
number=num,
|
||||||
|
additional_args='--partitions=%s' % self.brain.part)
|
||||||
|
shard_ranges = self.assert_container_state(
|
||||||
|
unsharded_node, 'sharded', 2, override_deleted=True)
|
||||||
|
|
||||||
|
# and get roots updated and sync'd
|
||||||
|
self.container_replicators.once()
|
||||||
|
self.run_sharders(shard_ranges, exclude_partitions=[self.brain.part])
|
||||||
|
|
||||||
|
# all DBs should now be sharded and NOT deleted
|
||||||
|
for node in self.brain.nodes:
|
||||||
|
with annotate_failure(
|
||||||
|
'node %s in %s'
|
||||||
|
% (node['index'], [n['index'] for n in self.brain.nodes])):
|
||||||
|
broker = self.get_broker(self.brain.part, node,
|
||||||
|
self.account, self.container_name)
|
||||||
|
self.assertEqual(SHARDED, broker.get_db_state())
|
||||||
|
self.assertEqual(3, broker.get_info()['object_count'])
|
||||||
|
self.assertEqual(3,
|
||||||
|
broker.get_shard_usage()['object_count'])
|
||||||
|
self.assertFalse(broker.is_deleted())
|
||||||
|
@ -68,7 +68,7 @@ class BaseTestSharder(unittest.TestCase):
|
|||||||
[dict(sr) for sr in actual])
|
[dict(sr) for sr in actual])
|
||||||
|
|
||||||
def _make_broker(self, account='a', container='c', epoch=None,
|
def _make_broker(self, account='a', container='c', epoch=None,
|
||||||
device='sda', part=0, hash_=None):
|
device='sda', part=0, hash_=None, put_timestamp=None):
|
||||||
hash_ = hash_ or md5(
|
hash_ = hash_ or md5(
|
||||||
container.encode('utf-8'), usedforsecurity=False).hexdigest()
|
container.encode('utf-8'), usedforsecurity=False).hexdigest()
|
||||||
datadir = os.path.join(
|
datadir = os.path.join(
|
||||||
@ -81,7 +81,7 @@ class BaseTestSharder(unittest.TestCase):
|
|||||||
broker = ContainerBroker(
|
broker = ContainerBroker(
|
||||||
db_file, account=account, container=container,
|
db_file, account=account, container=container,
|
||||||
logger=self.logger)
|
logger=self.logger)
|
||||||
broker.initialize()
|
broker.initialize(put_timestamp=put_timestamp)
|
||||||
return broker
|
return broker
|
||||||
|
|
||||||
def _make_old_style_sharding_broker(self, account='a', container='c',
|
def _make_old_style_sharding_broker(self, account='a', container='c',
|
||||||
@ -4784,37 +4784,62 @@ class TestSharder(BaseTestSharder):
|
|||||||
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
||||||
broker.logger.clear()
|
broker.logger.clear()
|
||||||
|
|
||||||
def _check_process_broker_sharding_no_others(self, state):
|
def _check_process_broker_sharding_no_others(self, start_state, deleted):
|
||||||
# verify that when existing own_shard_range has given state and there
|
# verify that when existing own_shard_range has given state and there
|
||||||
# are other shard ranges then the sharding process will begin
|
# are other shard ranges then the sharding process will complete
|
||||||
broker = self._make_broker(hash_='hash%s' % state)
|
broker = self._make_broker(hash_='hash%s%s' % (start_state, deleted))
|
||||||
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
|
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
|
||||||
'index': 0}
|
'index': 0}
|
||||||
own_sr = broker.get_own_shard_range()
|
own_sr = broker.get_own_shard_range()
|
||||||
self.assertTrue(own_sr.update_state(state))
|
self.assertTrue(own_sr.update_state(start_state))
|
||||||
epoch = Timestamp.now()
|
epoch = next(self.ts_iter)
|
||||||
own_sr.epoch = epoch
|
own_sr.epoch = epoch
|
||||||
shard_ranges = self._make_shard_ranges((('', 'm'), ('m', '')))
|
shard_ranges = self._make_shard_ranges((('', 'm'), ('m', '')))
|
||||||
broker.merge_shard_ranges([own_sr] + shard_ranges)
|
broker.merge_shard_ranges([own_sr] + shard_ranges)
|
||||||
|
if deleted:
|
||||||
|
broker.delete_db(next(self.ts_iter).internal)
|
||||||
|
|
||||||
with self._mock_sharder() as sharder:
|
with self._mock_sharder() as sharder:
|
||||||
with mock.patch.object(
|
with mock.patch.object(
|
||||||
sharder, '_create_shard_containers', return_value=0):
|
sharder, '_send_shard_ranges', return_value=True):
|
||||||
with mock_timestamp_now() as now:
|
with mock_timestamp_now_with_iter(self.ts_iter):
|
||||||
sharder._audit_container = mock.MagicMock()
|
sharder._audit_container = mock.MagicMock()
|
||||||
sharder._process_broker(broker, node, 99)
|
sharder._process_broker(broker, node, 99)
|
||||||
final_own_sr = broker.get_own_shard_range(no_default=True)
|
|
||||||
|
|
||||||
self.assertEqual(dict(own_sr, meta_timestamp=now),
|
final_own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
dict(final_own_sr))
|
self.assertEqual(SHARDED, broker.get_db_state())
|
||||||
self.assertEqual(SHARDING, broker.get_db_state())
|
|
||||||
self.assertEqual(epoch.normal, parse_db_filename(broker.db_file)[1])
|
self.assertEqual(epoch.normal, parse_db_filename(broker.db_file)[1])
|
||||||
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
self.assertFalse(broker.logger.get_lines_for_level('warning'))
|
||||||
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
self.assertFalse(broker.logger.get_lines_for_level('error'))
|
||||||
|
# self.assertEqual(deleted, broker.is_deleted())
|
||||||
|
return own_sr, final_own_sr
|
||||||
|
|
||||||
def test_process_broker_sharding_with_own_shard_range_no_others(self):
|
def test_process_broker_sharding_with_own_shard_range_no_others(self):
|
||||||
self._check_process_broker_sharding_no_others(ShardRange.SHARDING)
|
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
|
||||||
self._check_process_broker_sharding_no_others(ShardRange.SHRINKING)
|
ShardRange.SHARDING, False)
|
||||||
|
exp_own_sr = dict(own_sr, state=ShardRange.SHARDED,
|
||||||
|
meta_timestamp=mock.ANY)
|
||||||
|
self.assertEqual(exp_own_sr, dict(final_own_sr))
|
||||||
|
|
||||||
|
# verify that deleted DBs will be sharded
|
||||||
|
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
|
||||||
|
ShardRange.SHARDING, True)
|
||||||
|
exp_own_sr = dict(own_sr, state=ShardRange.SHARDED,
|
||||||
|
meta_timestamp=mock.ANY)
|
||||||
|
self.assertEqual(exp_own_sr, dict(final_own_sr))
|
||||||
|
|
||||||
|
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
|
||||||
|
ShardRange.SHRINKING, False)
|
||||||
|
exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK,
|
||||||
|
meta_timestamp=mock.ANY)
|
||||||
|
self.assertEqual(exp_own_sr, dict(final_own_sr))
|
||||||
|
|
||||||
|
# verify that deleted DBs will be shrunk
|
||||||
|
own_sr, final_own_sr = self._check_process_broker_sharding_no_others(
|
||||||
|
ShardRange.SHRINKING, True)
|
||||||
|
exp_own_sr = dict(own_sr, state=ShardRange.SHRUNK,
|
||||||
|
meta_timestamp=mock.ANY)
|
||||||
|
self.assertEqual(exp_own_sr, dict(final_own_sr))
|
||||||
|
|
||||||
def test_process_broker_not_sharding_others(self):
|
def test_process_broker_not_sharding_others(self):
|
||||||
# verify that sharding process will not start when own shard range is
|
# verify that sharding process will not start when own shard range is
|
||||||
@ -4907,6 +4932,83 @@ class TestSharder(BaseTestSharder):
|
|||||||
self._check_process_broker_sharding_others(ShardRange.SHRINKING)
|
self._check_process_broker_sharding_others(ShardRange.SHRINKING)
|
||||||
self._check_process_broker_sharding_others(ShardRange.SHARDED)
|
self._check_process_broker_sharding_others(ShardRange.SHARDED)
|
||||||
|
|
||||||
|
def test_process_broker_leader_auto_shard(self):
|
||||||
|
# verify conditions for acting as auto-shard leader
|
||||||
|
broker = self._make_broker(put_timestamp=next(self.ts_iter).internal)
|
||||||
|
objects = [
|
||||||
|
['obj%3d' % i, self.ts_encoded(), i, 'text/plain',
|
||||||
|
'etag%s' % i, 0] for i in range(10)]
|
||||||
|
for obj in objects:
|
||||||
|
broker.put_object(*obj)
|
||||||
|
self.assertEqual(10, broker.get_info()['object_count'])
|
||||||
|
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
|
||||||
|
'index': 0}
|
||||||
|
|
||||||
|
def do_process(conf):
|
||||||
|
with self._mock_sharder(conf) as sharder:
|
||||||
|
with mock_timestamp_now():
|
||||||
|
# we're not testing rest of the process here so prevent any
|
||||||
|
# attempt to progress shard range states
|
||||||
|
sharder._create_shard_containers = lambda *args: 0
|
||||||
|
sharder._process_broker(broker, node, 99)
|
||||||
|
|
||||||
|
# auto shard disabled
|
||||||
|
conf = {'shard_container_threshold': 10,
|
||||||
|
'rows_per_shard': 5,
|
||||||
|
'shrink_threshold': 1,
|
||||||
|
'auto_shard': False}
|
||||||
|
do_process(conf)
|
||||||
|
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||||
|
own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
|
self.assertIsNone(own_sr)
|
||||||
|
|
||||||
|
# auto shard enabled, not node 0
|
||||||
|
conf['auto_shard'] = True
|
||||||
|
node['index'] = 1
|
||||||
|
do_process(conf)
|
||||||
|
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||||
|
own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
|
self.assertIsNone(own_sr)
|
||||||
|
|
||||||
|
# auto shard enabled, node 0 -> start sharding
|
||||||
|
node['index'] = 0
|
||||||
|
do_process(conf)
|
||||||
|
self.assertEqual(SHARDING, broker.get_db_state())
|
||||||
|
own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
|
self.assertIsNotNone(own_sr)
|
||||||
|
self.assertEqual(ShardRange.SHARDING, own_sr.state)
|
||||||
|
self.assertEqual(own_sr.epoch.normal,
|
||||||
|
parse_db_filename(broker.db_file)[1])
|
||||||
|
self.assertEqual(2, len(broker.get_shard_ranges()))
|
||||||
|
|
||||||
|
def test_process_broker_leader_auto_shard_deleted_db(self):
|
||||||
|
# verify no auto-shard leader if broker is deleted
|
||||||
|
conf = {'shard_container_threshold': 10,
|
||||||
|
'rows_per_shard': 5,
|
||||||
|
'shrink_threshold': 1,
|
||||||
|
'auto_shard': True}
|
||||||
|
broker = self._make_broker(put_timestamp=next(self.ts_iter).internal)
|
||||||
|
broker.delete_db(next(self.ts_iter).internal)
|
||||||
|
self.assertTrue(broker.is_deleted()) # sanity check
|
||||||
|
node = {'ip': '1.2.3.4', 'port': 6040, 'device': 'sda5', 'id': '2',
|
||||||
|
'index': 0}
|
||||||
|
|
||||||
|
with self._mock_sharder(conf) as sharder:
|
||||||
|
with mock_timestamp_now():
|
||||||
|
with mock.patch.object(
|
||||||
|
sharder, '_find_and_enable_sharding_candidates'
|
||||||
|
) as mock_find_and_enable:
|
||||||
|
sharder._process_broker(broker, node, 99)
|
||||||
|
|
||||||
|
self.assertEqual(UNSHARDED, broker.get_db_state())
|
||||||
|
own_sr = broker.get_own_shard_range(no_default=True)
|
||||||
|
self.assertIsNone(own_sr)
|
||||||
|
# this is the only concrete assertion that verifies the leader actions
|
||||||
|
# are not taken; no shard ranges would actually be found for an empty
|
||||||
|
# deleted db so there's no other way to differentiate from an undeleted
|
||||||
|
# db being processed...
|
||||||
|
mock_find_and_enable.assert_not_called()
|
||||||
|
|
||||||
def check_shard_ranges_sent(self, broker, expected_sent):
|
def check_shard_ranges_sent(self, broker, expected_sent):
|
||||||
bodies = []
|
bodies = []
|
||||||
servers = []
|
servers = []
|
||||||
|
Loading…
Reference in New Issue
Block a user