Merge "sharder: Prevent ValueError when no cleaving contexts"
This commit is contained in:
commit
d9a6fe4362
@ -501,14 +501,16 @@ class CleavingContext(object):
|
||||
brokers = broker.get_brokers()
|
||||
sysmeta = brokers[-1].get_sharding_sysmeta_with_timestamps()
|
||||
|
||||
contexts = []
|
||||
for key, (val, timestamp) in sysmeta.items():
|
||||
# If the value is of length 0, then the metadata is
|
||||
# If the value is blank, then the metadata is
|
||||
# marked for deletion
|
||||
if key.startswith("Context-") and len(val) > 0:
|
||||
if key.startswith("Context-") and val:
|
||||
try:
|
||||
yield cls(**json.loads(val)), timestamp
|
||||
contexts.append((cls(**json.loads(val)), timestamp))
|
||||
except ValueError:
|
||||
continue
|
||||
return contexts
|
||||
|
||||
@classmethod
|
||||
def load(cls, broker):
|
||||
@ -753,11 +755,13 @@ class ContainerSharder(ContainerReplicator):
|
||||
and own_shard_range.state in (ShardRange.SHARDING,
|
||||
ShardRange.SHARDED)):
|
||||
if db_state == SHARDED:
|
||||
context_ts = max([float(ts) for c, ts in
|
||||
CleavingContext.load_all(broker)]) or None
|
||||
if not context_ts or (context_ts + self.recon_sharded_timeout
|
||||
< Timestamp.now().timestamp):
|
||||
# not contexts or last context timestamp too old for the
|
||||
contexts = CleavingContext.load_all(broker)
|
||||
if not contexts:
|
||||
return
|
||||
context_ts = max(float(ts) for c, ts in contexts)
|
||||
if context_ts + self.recon_sharded_timeout \
|
||||
< Timestamp.now().timestamp:
|
||||
# last context timestamp too old for the
|
||||
# broker to be recorded
|
||||
return
|
||||
|
||||
|
@ -2681,6 +2681,24 @@ class TestSharder(BaseTestSharder):
|
||||
'.shards_', 'shard_c', (('l', 'mid'), ('mid', 'u')))
|
||||
self.assertEqual(1, broker.get_own_shard_range().deleted)
|
||||
|
||||
def test_sharded_record_sharding_progress_missing_contexts(self):
|
||||
broker = self._check_complete_sharding(
|
||||
'a', 'c', (('', 'mid'), ('mid', '')))
|
||||
|
||||
with self._mock_sharder() as sharder:
|
||||
with mock.patch.object(sharder, '_append_stat') as mocked:
|
||||
sharder._record_sharding_progress(broker, {}, None)
|
||||
mocked.assert_called_once_with('sharding_in_progress', 'all', mock.ANY)
|
||||
|
||||
# clear the contexts then run _record_sharding_progress
|
||||
for context, _ in CleavingContext.load_all(broker):
|
||||
context.delete(broker)
|
||||
|
||||
with self._mock_sharder() as sharder:
|
||||
with mock.patch.object(sharder, '_append_stat') as mocked:
|
||||
sharder._record_sharding_progress(broker, {}, None)
|
||||
mocked.assert_not_called()
|
||||
|
||||
def test_identify_sharding_old_style_candidate(self):
|
||||
brokers = [self._make_broker(container='c%03d' % i) for i in range(6)]
|
||||
for broker in brokers:
|
||||
@ -5988,6 +6006,11 @@ class TestCleavingContext(BaseTestSharder):
|
||||
else:
|
||||
self.assertEqual(lm, timestamp.internal)
|
||||
|
||||
# delete all contexts
|
||||
for ctx, lm in CleavingContext.load_all(broker):
|
||||
ctx.delete(broker)
|
||||
self.assertEqual([], CleavingContext.load_all(broker))
|
||||
|
||||
def test_delete(self):
|
||||
broker = self._make_broker()
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user