Small fixes for ec duplication

To address Alistair's comment at
https://review.openstack.org/#/c/219165.

This includes:

- Fix reconstructor log messages to avoid redundant frag index info
- Fix incorrect FabricatedRing setting to have ec_k + ec_m replicas
- Use policy.ec_n_unique_fragments for testing frag index election
  (see the sketch below, after the commit metadata)
- Plus various minor cleanup and docs additions

A larger refactoring around TestECMixin in test/unit/proxy/test_server.py is
split out into https://review.openstack.org/#/c/440466/ to keep this change clear.

Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>

Change-Id: I8096202f5f8d91296963f7a409a29d57fa7828e4
Author: Kota Tsuyuzaki
Date:   2017-02-28 00:52:06 -08:00
Parent: 5c0814daa6
Commit: a2f4046624

3 changed files with 60 additions and 41 deletions
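
For context on the ring and frag-index fixes: with EC fragment duplication a
policy still has only ec_k + ec_m unique fragment archives
(policy.ec_n_unique_fragments), the ring carries duplication_factor copies of
each, and node indexes are folded back onto the unique range by
get_backend_index(). The following is a rough standalone sketch of that
arithmetic, not code from this patch; the ec_k=10, ec_m=4, duplication
factor 2 values are assumptions for illustration.

    # Rough sketch of the fragment-index arithmetic the fixes rely on.
    # ec_k=10, ec_m=4 and duplication factor 2 are illustrative values only.
    import random

    ec_k, ec_m, ec_duplication_factor = 10, 4, 2
    ec_n_unique_fragments = ec_k + ec_m                # 14 unique fragment archives
    ring_replicas = ec_n_unique_fragments * ec_duplication_factor  # 28 assignments

    def get_backend_index(node_index):
        # a duplicated policy stores the same fragment at node_index and at
        # node_index + ec_n_unique_fragments, so the backend (frag) index
        # wraps modulo the number of unique fragments
        return node_index % ec_n_unique_fragments

    # tests that want "any valid frag index" should draw from the unique
    # range rather than from ring.replicas:
    frag_index = random.randint(0, ec_n_unique_fragments - 1)
    assert get_backend_index(frag_index) == frag_index
    assert get_backend_index(frag_index + ec_n_unique_fragments) == frag_index

With a duplication factor of 1 this collapses to replicas == ec_k + ec_m,
which is the FabricatedRing(replicas=14, devices=28) setting the
reconstructor tests below adopt (assuming the test policy is 10+4).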

swift/obj/reconstructor.py

@@ -193,17 +193,14 @@ class ObjectReconstructor(Daemon):
         return True

     def _full_path(self, node, part, path, policy):
-        frag_index = (policy.get_backend_index(node['index'])
-                      if 'index' in node else 'handoff')
         return '%(replication_ip)s:%(replication_port)s' \
             '/%(device)s/%(part)s%(path)s ' \
-            'policy#%(policy)d frag#%(frag_index)s' % {
+            'policy#%(policy)d' % {
                 'replication_ip': node['replication_ip'],
                 'replication_port': node['replication_port'],
                 'device': node['device'],
                 'part': part, 'path': path,
                 'policy': policy,
-                'frag_index': frag_index,
             }

     def _get_response(self, node, part, path, headers, policy):
@@ -308,8 +305,9 @@ class ObjectReconstructor(Daemon):
                 # the remote node to our target instead of rebuild. But
                 # instead we ignore it.
                 self.logger.debug(
-                    'Found existing frag #%s while rebuilding #%s from %s',
-                    unique_index, fi_to_rebuild, self._full_path(
+                    'Found existing frag #%s at %s while rebuilding to %s',
+                    fi_to_rebuild, resp.full_path,
+                    self._full_path(
                         node, job['partition'], datafile_metadata['name'],
                         job['policy']))
                 continue
@@ -330,10 +328,11 @@ class ObjectReconstructor(Daemon):
                 if etag != etag_buckets.setdefault(timestamp, etag):
                     self.logger.error(
-                        'Mixed Etag (%s, %s) for %s',
+                        'Mixed Etag (%s, %s) for %s frag#%s',
                         etag, etag_buckets[timestamp],
                         self._full_path(node, job['partition'],
-                                        datafile_metadata['name'], job['policy']))
+                                        datafile_metadata['name'], job['policy']),
+                        fi_to_rebuild)
                     continue

                 if unique_index not in buckets[timestamp]:
@@ -349,21 +348,22 @@ class ObjectReconstructor(Daemon):
             etag = etag_buckets[timestamp]
             self.logger.error(
                 'Unable to get enough responses (%s/%s) '
-                'to reconstruct %s with ETag %s' % (
+                'to reconstruct %s frag#%s with ETag %s' % (
                     len(resp), job['policy'].ec_ndata,
                     self._full_path(node, job['partition'],
                                     datafile_metadata['name'],
                                     job['policy']),
-                    etag))
+                    fi_to_rebuild, etag))

         if error_resp_count:
             self.logger.error(
                 'Unable to get enough responses (%s error responses) '
-                'to reconstruct %s' % (
+                'to reconstruct %s frag#%s' % (
                     error_resp_count,
                     self._full_path(node, job['partition'],
                                     datafile_metadata['name'],
-                                    job['policy'])))
+                                    job['policy']),
+                    fi_to_rebuild))

         raise DiskFileError('Unable to reconstruct EC archive')
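
To see how the reworked messages read end to end, here is a small standalone
sketch (hypothetical node values; not the reconstructor module itself) that
composes the new _full_path output, which no longer carries a frag# suffix,
with the explicit frag#<fi_to_rebuild> now appended by each log call:

    def full_path(node, part, path, policy_index):
        # mirrors the reworked format string: no frag#... suffix here
        return '%(replication_ip)s:%(replication_port)s' \
            '/%(device)s/%(part)s%(path)s ' \
            'policy#%(policy)d' % {
                'replication_ip': node['replication_ip'],
                'replication_port': node['replication_port'],
                'device': node['device'],
                'part': part, 'path': path,
                'policy': policy_index,
            }

    node = {'replication_ip': '10.0.0.1', 'replication_port': 6200,
            'device': 'sdb'}
    fi_to_rebuild = 3
    print('Mixed Etag (%s, %s) for %s frag#%s' % (
        'etag-a', 'etag-b', full_path(node, 0, '/a/c/o', 0), fi_to_rebuild))
    # -> Mixed Etag (etag-a, etag-b) for 10.0.0.1:6200/sdb/0/a/c/o policy#0 frag#3

This keeps the logged frag index tied to the fragment being rebuilt rather
than repeating per-node index information inside the path itself.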

test/unit/obj/test_reconstructor.py

@@ -645,14 +645,14 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
     def test_get_response(self):
         part = self.part_nums[0]
-        node = POLICIES[1].object_ring.get_part_nodes(int(part))[0]
+        node = self.policy.object_ring.get_part_nodes(int(part))[0]

         def do_test(stat_code):
             with mocked_http_conn(stat_code):
                 resp = self.reconstructor._get_response(node, part,
                                                         path='nada',
                                                         headers={},
-                                                        policy=POLICIES[1])
+                                                        policy=self.policy)
             return resp

         resp = do_test(200)
@@ -1031,9 +1031,8 @@ class TestGlobalSetupObjectReconstructor(unittest.TestCase):
             stat_method, stat_prefix = stat_key
             self.assertStatCount(stat_method, stat_prefix, expected)
         # part 2 should be totally empty
-        policy = POLICIES[1]
-        hash_gen = self.reconstructor._df_router[policy].yield_hashes(
-            'sda1', '2', policy)
+        hash_gen = self.reconstructor._df_router[self.policy].yield_hashes(
+            'sda1', '2', self.policy)
         for path, hash_, ts in hash_gen:
             self.fail('found %s with %s in %s' % (hash_, ts, path))
         # but the partition directory and hashes pkl still exist
@@ -1180,7 +1179,7 @@ class TestObjectReconstructor(unittest.TestCase):
         self.policy.object_ring.max_more_nodes = \
             self.policy.object_ring.replicas
         self.ts_iter = make_timestamp_iter()
-        self.fabricated_ring = FabricatedRing()
+        self.fabricated_ring = FabricatedRing(replicas=14, devices=28)

     def _configure_reconstructor(self, **kwargs):
         self.conf.update(kwargs)
@@ -1885,8 +1884,7 @@ class TestObjectReconstructor(unittest.TestCase):
         }
         # since this part doesn't belong on us it doesn't matter what
         # frag_index we have
-        frag_index = self.policy.get_backend_index(
-            random.randint(0, ring.replicas - 1))
+        frag_index = random.randint(0, self.policy.ec_n_unique_fragments - 1)
         stub_hashes = {
             '123': {frag_index: 'hash', None: 'hash'},
             'abc': {None: 'hash'},
@@ -1940,9 +1938,9 @@ class TestObjectReconstructor(unittest.TestCase):
             'part_path': part_path,
         }
         frag_index = self.policy.get_backend_index(node_index)
-        other_frag_index = self.policy.get_backend_index(
-            random.choice(
-                [f for f in range(ring.replicas) if f != node_index]))
+        other_frag_index = random.choice(
+            [f for f in range(self.policy.ec_n_unique_fragments)
+             if f != node_index])
         stub_hashes = {
             '123': {frag_index: 'hash', None: 'hash'},
             '456': {other_frag_index: 'hash', None: 'hash'},
@@ -3232,6 +3230,9 @@ class TestObjectReconstructor(unittest.TestCase):
                 self.fail(
                     "no expected etag %s found: %s" %
                     (list(ec_archive_dict), error_line))
+            # remove the found etag which should not be found in the
+            # following error lines
+            del ec_archive_dict[(expected_etag, ts)]

         expected = 'Unable to get enough responses (%s/10) to ' \
                    'reconstruct 10.0.0.1:1001/sdb/0/a/c/o policy#0 ' \
@@ -3244,12 +3245,15 @@ class TestObjectReconstructor(unittest.TestCase):
         self.assertFalse(self.logger.get_lines_for_level('warning'))

     def test_reconstruct_fa_finds_itself_does_not_fail(self):
+        # verify that reconstruction of a missing frag can cope with finding
+        # that missing frag in the responses it gets from other nodes while
+        # attempting to rebuild the missing frag
         job = {
             'partition': 0,
             'policy': self.policy,
         }
         part_nodes = self.policy.object_ring.get_part_nodes(0)
-        node = part_nodes[1]
+        broken_node = random.randint(0, self.policy.ec_ndata - 1)
         metadata = {
             'name': '/a/c/o',
             'Content-Length': 0,
@@ -3263,7 +3267,7 @@ class TestObjectReconstructor(unittest.TestCase):
         # instead of popping the broken body, we'll just leave it in the list
         # of responses and take away something else.
-        broken_body = ec_archive_bodies[1]
+        broken_body = ec_archive_bodies[broken_node]
         ec_archive_bodies = ec_archive_bodies[:-1]

         def make_header(body):
@@ -3276,22 +3280,21 @@ class TestObjectReconstructor(unittest.TestCase):
         codes, body_iter, headers = zip(*responses)
         with mocked_http_conn(*codes, body_iter=body_iter, headers=headers):
             df = self.reconstructor.reconstruct_fa(
-                job, node, metadata)
+                job, part_nodes[broken_node], metadata)
             fixed_body = ''.join(df.reader())
             self.assertEqual(len(fixed_body), len(broken_body))
             self.assertEqual(md5(fixed_body).hexdigest(),
                              md5(broken_body).hexdigest())

-        # no error and warning
+        # no error, no warning
         self.assertFalse(self.logger.get_lines_for_level('error'))
         self.assertFalse(self.logger.get_lines_for_level('warning'))
         # the found own frag will be reported in the debug message
         debug_log_lines = self.logger.get_lines_for_level('debug')
-        self.assertEqual(2, len(debug_log_lines))
+        # redundant frag found once in first ec_ndata responses
         self.assertIn(
-            'Found existing frag #1 while rebuilding #1 from',
+            'Found existing frag #%s at' % broken_node,
             debug_log_lines[0])
-        # ... and then, it should be skipped in the responses

         # N.B. in the future, we could avoid those check because
         # definitely sending the copy rather than reconstruct will
@@ -3301,11 +3304,12 @@ class TestObjectReconstructor(unittest.TestCase):
         # liberasurecode[1].
         # 1: https://github.com/openstack/liberasurecode/blob/
         #    master/src/erasurecode.c#L870
-        expected_prefix = 'Reconstruct frag #1 with frag indexes'
-        self.assertIn(expected_prefix, debug_log_lines[1])
+        log_prefix = 'Reconstruct frag #%s with frag indexes' % broken_node
+        self.assertIn(log_prefix, debug_log_lines[1])
+        self.assertFalse(debug_log_lines[2:])
         got_frag_index_list = json.loads(
-            debug_log_lines[1][len(expected_prefix):])
-        self.assertNotIn(1, got_frag_index_list)
+            debug_log_lines[1][len(log_prefix):])
+        self.assertNotIn(broken_node, got_frag_index_list)

     def test_reconstruct_fa_finds_duplicate_does_not_fail(self):
         job = {
@@ -3369,11 +3373,13 @@ class TestObjectReconstructor(unittest.TestCase):
         }
         part_nodes = self.policy.object_ring.get_part_nodes(0)
         node = part_nodes[1]
+        ts = make_timestamp_iter()
+        timestamp = next(ts)
         metadata = {
             'name': '/a/c/o',
             'Content-Length': 0,
             'ETag': 'etag',
-            'X-Timestamp': '1234567890.12345'
+            'X-Timestamp': timestamp.normal
         }

         test_data = ('rebuild' * self.policy.ec_segment_size)[:-777]
@@ -3384,7 +3390,9 @@ class TestObjectReconstructor(unittest.TestCase):
         def make_header(body):
             headers = get_header_frag_index(self, body)
-            headers.update({'X-Object-Sysmeta-Ec-Etag': etag})
+            headers.update(
+                {'X-Object-Sysmeta-Ec-Etag': etag,
+                 'X-Backend-Timestamp': timestamp.internal})
             return headers

         def test_missing_header(missing_header, expected_warning):
@@ -3392,6 +3400,10 @@ class TestObjectReconstructor(unittest.TestCase):
             responses = [(200, body, make_header(body))
                          for body in ec_archive_bodies]
+            # To drop the header from the response[0], set None as the value
+            # explicitly instead of deleting the key because if no key exists
+            # in the dict, fake_http_connect will insert some key/value pairs
+            # automatically (e.g. X-Backend-Timestamp)
             responses[0][2].update({missing_header: None})

             codes, body_iter, headers = zip(*responses)
@@ -3412,7 +3424,7 @@ class TestObjectReconstructor(unittest.TestCase):
             self.assertIn(expected_warning, warning_log_lines)

         message_base = \
-            "Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0 frag#0"
+            "Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0"

         test_missing_header(
             'X-Object-Sysmeta-Ec-Etag',
@@ -3479,7 +3491,7 @@ class TestObjectReconstructor(unittest.TestCase):
         self.assertEqual(1, len(warning_log_lines))
         expected_message = \
             "Invalid resp from 10.0.0.0:1000/sda/0/a/c/o " \
-            "policy#0 frag#0 (invalid X-Object-Sysmeta-Ec-Frag-Index)"
+            "policy#0 (invalid X-Object-Sysmeta-Ec-Frag-Index)"
         self.assertIn(expected_message, warning_log_lines)

         for value in ('None', 'invalid'):

test/unit/proxy/test_server.py

@@ -4304,6 +4304,9 @@ class TestECDuplicationObjController(
         controller = self.controller_cls(
             self.app, 'a', 'c', 'o')
+        # sanity, tester should set unique frag index to the variable
+        self.assertEqual(
+            unique, self.policy.get_backend_index(unique))
         # create a dummy list of putters, check no handoffs
         putters = []
         for index in range(self.policy.object_ring.replica_count):
@@ -4327,9 +4330,13 @@ class TestECDuplicationObjController(
         # pop one more fragment too to make one missing hole
         putters.pop(one_more_missing)
-        # then determine chunk, we have 26 putters here and unique frag
-        # index 0 missing 2 copies and unique frag index 1 missing 1 copy
-        # i.e. the handoff node should be assigned to unique frag index 1
+        # we have 26 putters here and frag index "unique" missing 2
+        # copies. (i.e. missing "unique" and "duplicated" which
+        # should be same unique index). Then "one_more_missing" frag
+        # is different from both of the "unique" and "duplicated"
+        # but it's only 1 copy missing so that the handoff node should
+        # be assigned to either "unique" or "duplicated" prior to
+        # "one_more_missing"
         got = controller._determine_chunk_destinations(putters, self.policy)

         # N.B. len(putters) is now len(expected - 2) due to pop twice
         self.assertEqual(len(putters), len(got))
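
The expectation spelled out in the rewritten comment above boils down to: a
handoff putter should back-fill the unique frag index that is missing the
most copies. Below is a simplified, illustrative sketch of that priority
rule only; it is not the proxy's actual _determine_chunk_destinations
implementation, and the index values are assumptions:

    from collections import Counter

    def pick_index_for_handoff(expected_copies, assigned_counts):
        # choose the unique frag index with the largest shortfall
        shortfall = {idx: expected_copies[idx] - assigned_counts.get(idx, 0)
                     for idx in expected_copies}
        return max(shortfall, key=shortfall.get)

    # e.g. 14 unique indexes with duplication factor 2 -> 2 expected copies each
    expected = {idx: 2 for idx in range(14)}
    assigned = Counter({idx: 2 for idx in range(14)})
    assigned[0] -= 2    # "unique"/"duplicated" pair both missing
    assigned[7] -= 1    # "one_more_missing" short by a single copy
    assert pick_index_for_handoff(expected, assigned) == 0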