Merge "Small fixes for ec duplication"

This commit is contained in:
Jenkins 2017-03-22 16:14:23 +00:00 committed by Gerrit Code Review
commit 39c7021aa4
3 changed files with 60 additions and 41 deletions

View File

@ -193,17 +193,14 @@ class ObjectReconstructor(Daemon):
return True
def _full_path(self, node, part, path, policy):
frag_index = (policy.get_backend_index(node['index'])
if 'index' in node else 'handoff')
return '%(replication_ip)s:%(replication_port)s' \
'/%(device)s/%(part)s%(path)s ' \
'policy#%(policy)d frag#%(frag_index)s' % {
'policy#%(policy)d' % {
'replication_ip': node['replication_ip'],
'replication_port': node['replication_port'],
'device': node['device'],
'part': part, 'path': path,
'policy': policy,
'frag_index': frag_index,
}
def _get_response(self, node, part, path, headers, policy):
@ -308,8 +305,9 @@ class ObjectReconstructor(Daemon):
# the remote node to our target instead of rebuild. But
# instead we ignore it.
self.logger.debug(
'Found existing frag #%s while rebuilding #%s from %s',
unique_index, fi_to_rebuild, self._full_path(
'Found existing frag #%s at %s while rebuilding to %s',
fi_to_rebuild, resp.full_path,
self._full_path(
node, job['partition'], datafile_metadata['name'],
job['policy']))
continue
@ -330,10 +328,11 @@ class ObjectReconstructor(Daemon):
if etag != etag_buckets.setdefault(timestamp, etag):
self.logger.error(
'Mixed Etag (%s, %s) for %s',
'Mixed Etag (%s, %s) for %s frag#%s',
etag, etag_buckets[timestamp],
self._full_path(node, job['partition'],
datafile_metadata['name'], job['policy']))
datafile_metadata['name'], job['policy']),
fi_to_rebuild)
continue
if unique_index not in buckets[timestamp]:
@ -349,21 +348,22 @@ class ObjectReconstructor(Daemon):
etag = etag_buckets[timestamp]
self.logger.error(
'Unable to get enough responses (%s/%s) '
'to reconstruct %s with ETag %s' % (
'to reconstruct %s frag#%s with ETag %s' % (
len(resp), job['policy'].ec_ndata,
self._full_path(node, job['partition'],
datafile_metadata['name'],
job['policy']),
etag))
fi_to_rebuild, etag))
if error_resp_count:
self.logger.error(
'Unable to get enough responses (%s error responses) '
'to reconstruct %s' % (
'to reconstruct %s frag#%s' % (
error_resp_count,
self._full_path(node, job['partition'],
datafile_metadata['name'],
job['policy'])))
job['policy']),
fi_to_rebuild))
raise DiskFileError('Unable to reconstruct EC archive')

View File

@ -645,14 +645,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
def test_get_response(self):
part = self.part_nums[0]
node = POLICIES[1].object_ring.get_part_nodes(int(part))[0]
node = self.policy.object_ring.get_part_nodes(int(part))[0]
def do_test(stat_code):
with mocked_http_conn(stat_code):
resp = self.reconstructor._get_response(node, part,
path='nada',
headers={},
policy=POLICIES[1])
policy=self.policy)
return resp
resp = do_test(200)
@ -1031,9 +1031,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
stat_method, stat_prefix = stat_key
self.assertStatCount(stat_method, stat_prefix, expected)
# part 2 should be totally empty
policy = POLICIES[1]
hash_gen = self.reconstructor._df_router[policy].yield_hashes(
'sda1', '2', policy)
hash_gen = self.reconstructor._df_router[self.policy].yield_hashes(
'sda1', '2', self.policy)
for path, hash_, ts in hash_gen:
self.fail('found %s with %s in %s' % (hash_, ts, path))
# but the partition directory and hashes pkl still exist
@ -1180,7 +1179,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.policy.object_ring.max_more_nodes = \
self.policy.object_ring.replicas
self.ts_iter = make_timestamp_iter()
self.fabricated_ring = FabricatedRing()
self.fabricated_ring = FabricatedRing(replicas=14, devices=28)
def _configure_reconstructor(self, **kwargs):
self.conf.update(kwargs)
@ -1885,8 +1884,7 @@ class TestObjectReconstructor(unittest.TestCase):
}
# since this part doesn't belong on us it doesn't matter what
# frag_index we have
frag_index = self.policy.get_backend_index(
random.randint(0, ring.replicas - 1))
frag_index = random.randint(0, self.policy.ec_n_unique_fragments - 1)
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'abc': {None: 'hash'},
@ -1940,9 +1938,9 @@ class TestObjectReconstructor(unittest.TestCase):
'part_path': part_path,
}
frag_index = self.policy.get_backend_index(node_index)
other_frag_index = self.policy.get_backend_index(
random.choice(
[f for f in range(ring.replicas) if f != node_index]))
other_frag_index = random.choice(
[f for f in range(self.policy.ec_n_unique_fragments)
if f != node_index])
stub_hashes = {
'123': {frag_index: 'hash', None: 'hash'},
'456': {other_frag_index: 'hash', None: 'hash'},
@ -3232,6 +3230,9 @@ class TestObjectReconstructor(unittest.TestCase):
self.fail(
"no expected etag %s found: %s" %
(list(ec_archive_dict), error_line))
# remove the found etag which should not be found in the
# following error lines
del ec_archive_dict[(expected_etag, ts)]
expected = 'Unable to get enough responses (%s/10) to ' \
'reconstruct 10.0.0.1:1001/sdb/0/a/c/o policy#0 ' \
@ -3244,12 +3245,15 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertFalse(self.logger.get_lines_for_level('warning'))
def test_reconstruct_fa_finds_itself_does_not_fail(self):
# verify that reconstruction of a missing frag can cope with finding
# that missing frag in the responses it gets from other nodes while
# attempting to rebuild the missing frag
job = {
'partition': 0,
'policy': self.policy,
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
broken_node = random.randint(0, self.policy.ec_ndata - 1)
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
@ -3263,7 +3267,7 @@ class TestObjectReconstructor(unittest.TestCase):
# instead of popping the broken body, we'll just leave it in the list
# of responses and take away something else.
broken_body = ec_archive_bodies[1]
broken_body = ec_archive_bodies[broken_node]
ec_archive_bodies = ec_archive_bodies[:-1]
def make_header(body):
@ -3276,22 +3280,21 @@ class TestObjectReconstructor(unittest.TestCase):
codes, body_iter, headers = zip(*responses)
with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
df = self.reconstructor.reconstruct_fa(
job, node, metadata)
job, part_nodes[broken_node], metadata)
fixed_body = ''.join(df.reader())
self.assertEqual(len(fixed_body), len(broken_body))
self.assertEqual(md5(fixed_body).hexdigest(),
md5(broken_body).hexdigest())
# no error and warning
# no error, no warning
self.assertFalse(self.logger.get_lines_for_level('error'))
self.assertFalse(self.logger.get_lines_for_level('warning'))
# the found own frag will be reported in the debug message
debug_log_lines = self.logger.get_lines_for_level('debug')
self.assertEqual(2, len(debug_log_lines))
# redundant frag found once in first ec_ndata responses
self.assertIn(
'Found existing frag #1 while rebuilding #1 from',
'Found existing frag #%s at' % broken_node,
debug_log_lines[0])
# ... and then, it should be skipped in the responses
# N.B. in the future, we could avoid those check because
# definitely sending the copy rather than reconstruct will
@ -3301,11 +3304,12 @@ class TestObjectReconstructor(unittest.TestCase):
# liberasurecode[1].
# 1: https://github.com/openstack/liberasurecode/blob/
# master/src/erasurecode.c#L870
expected_prefix = 'Reconstruct frag #1 with frag indexes'
self.assertIn(expected_prefix, debug_log_lines[1])
log_prefix = 'Reconstruct frag #%s with frag indexes' % broken_node
self.assertIn(log_prefix, debug_log_lines[1])
self.assertFalse(debug_log_lines[2:])
got_frag_index_list = json.loads(
debug_log_lines[1][len(expected_prefix):])
self.assertNotIn(1, got_frag_index_list)
debug_log_lines[1][len(log_prefix):])
self.assertNotIn(broken_node, got_frag_index_list)
def test_reconstruct_fa_finds_duplicate_does_not_fail(self):
job = {
@ -3369,11 +3373,13 @@ class TestObjectReconstructor(unittest.TestCase):
}
part_nodes = self.policy.object_ring.get_part_nodes(0)
node = part_nodes[1]
ts = make_timestamp_iter()
timestamp = next(ts)
metadata = {
'name': '/a/c/o',
'Content-Length': 0,
'ETag': 'etag',
'X-Timestamp': '1234567890.12345'
'X-Timestamp': timestamp.normal
}
test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
@ -3384,7 +3390,9 @@ class TestObjectReconstructor(unittest.TestCase):
def make_header(body):
headers = get_header_frag_index(self, body)
headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
headers.update(
{'X-Object-Sysmeta-Ec-Etag': etag,
'X-Backend-Timestamp': timestamp.internal})
return headers
def test_missing_header(missing_header, expected_warning):
@ -3392,6 +3400,10 @@ class TestObjectReconstructor(unittest.TestCase):
responses = [(200, body, make_header(body))
for body in ec_archive_bodies]
# To drop the header from the response[0], set None as the value
# explicitly instead of deleting the key because if no key exists
# in the dict, fake_http_connect will insert some key/value pairs
# automatically (e.g. X-Backend-Timestamp)
responses[0][2].update({missing_header: None})
codes, body_iter, headers = zip(*responses)
@ -3412,7 +3424,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertIn(expected_warning, warning_log_lines)
message_base = \
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0 frag#0"
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0"
test_missing_header(
'X-Object-Sysmeta-Ec-Etag',
@ -3479,7 +3491,7 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(1, len(warning_log_lines))
expected_message = \
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o " \
"policy#0 frag#0 (invalid X-Object-Sysmeta-Ec-Frag-Index)"
"policy#0 (invalid X-Object-Sysmeta-Ec-Frag-Index)"
self.assertIn(expected_message, warning_log_lines)
for value in ('None', 'invalid'):

View File

@ -4304,6 +4304,9 @@ class TestECDuplicationObjController(
controller = self.controller_cls(
self.app, 'a', 'c', 'o')
# sanity, tester should set unique frag index to the variable
self.assertEqual(
unique, self.policy.get_backend_index(unique))
# create a dummy list of putters, check no handoffs
putters = []
for index in range(self.policy.object_ring.replica_count):
@ -4327,9 +4330,13 @@ class TestECDuplicationObjController(
# pop one more fragment too to make one missing hole
putters.pop(one_more_missing)
# then determine chunk, we have 26 putters here and unique frag
# index 0 missing 2 copies and unique frag index 1 missing 1 copy
# i.e. the handoff node should be assigned to unique frag index 1
# we have 26 putters here and frag index "unique" missing 2
# copies. (i.e. missing "unique" and "duplicated" which
# should be same unique index). Then "one_more_missing" frag
# is different from both of the "unique" and "duplicated"
# but it's only 1 copy missing so that the handoff node should
# be assigned to either "unique" or "duplicated" prior to
# "one_more_missing"
got = controller._determine_chunk_destinations(putters, self.policy)
# N.B. len(putters) is now len(expected - 2) due to pop twice
self.assertEqual(len(putters), len(got))