Merge "Include received frag_index in reconstructor log warnings"

This commit is contained in:
Jenkins 2017-03-22 16:19:53 +00:00 committed by Gerrit Code Review
commit 050bb63b77
2 changed files with 60 additions and 44 deletions

View File

@ -68,6 +68,29 @@ def _get_partners(frag_index, part_nodes):
]
def _full_path(node, part, relative_path, policy):
"""
Combines the node properties, partition, relative-path and policy into a
single string representation.
:param node: a dict describing node properties
:param part: partition number
:param path: path of the desired EC archive relative to partition dir
:param policy: an instance of
:class:`~swift.common.storage_policy.BaseStoragePolicy`
:return: string representation of absolute path on node plus policy index
"""
return '%(replication_ip)s:%(replication_port)s' \
'/%(device)s/%(part)s%(path)s ' \
'policy#%(policy)d' % {
'replication_ip': node['replication_ip'],
'replication_port': node['replication_port'],
'device': node['device'],
'part': part, 'path': relative_path,
'policy': policy,
}
class RebuildingECDiskFileStream(object):
"""
This class wraps the reconstructed fragment archive data and
@ -192,17 +215,6 @@ class ObjectReconstructor(Daemon):
return False
return True
def _full_path(self, node, part, path, policy):
return '%(replication_ip)s:%(replication_port)s' \
'/%(device)s/%(part)s%(path)s ' \
'policy#%(policy)d' % {
'replication_ip': node['replication_ip'],
'replication_port': node['replication_port'],
'device': node['device'],
'part': part, 'path': path,
'policy': policy,
}
def _get_response(self, node, part, path, headers, policy):
"""
Helper method for reconstruction that GETs a single EC fragment
@ -210,13 +222,13 @@ class ObjectReconstructor(Daemon):
:param node: the node to GET from
:param part: the partition
:param path: full path of the desired EC archive
:param path: path of the desired EC archive relative to partition dir
:param headers: the headers to send
:param policy: an instance of
:class:`~swift.common.storage_policy.BaseStoragePolicy`
:returns: response
"""
full_path = self._full_path(node, part, path, policy)
full_path = _full_path(node, part, path, policy)
resp = None
try:
with ConnectionTimeout(self.conn_timeout):
@ -286,7 +298,7 @@ class ObjectReconstructor(Daemon):
resp.headers = HeaderKeyDict(resp.getheaders())
frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
try:
unique_index = int(frag_index)
resp_frag_index = int(frag_index)
except (TypeError, ValueError):
# The successful response should include valid X-Object-
# Sysmeta-Ec-Frag-Index but for safety, catching the case
@ -294,11 +306,11 @@ class ObjectReconstructor(Daemon):
# frag index to reconstruct and dump warning log for that
self.logger.warning(
'Invalid resp from %s '
'(invalid X-Object-Sysmeta-Ec-Frag-Index)',
resp.full_path)
'(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)',
resp.full_path, frag_index)
continue
if fi_to_rebuild == unique_index:
if fi_to_rebuild == resp_frag_index:
# TODO: With duplicated EC frags it's not unreasonable to find
# the very fragment we're trying to rebuild exists on another
# primary node. In this case we should stream it directly from
@ -307,36 +319,37 @@ class ObjectReconstructor(Daemon):
self.logger.debug(
'Found existing frag #%s at %s while rebuilding to %s',
fi_to_rebuild, resp.full_path,
self._full_path(
_full_path(
node, job['partition'], datafile_metadata['name'],
job['policy']))
continue
timestamp = resp.headers.get('X-Backend-Timestamp')
if not timestamp:
self.logger.warning(
'Invalid resp from %s (missing X-Backend-Timestamp)',
resp.full_path)
self.logger.warning('Invalid resp from %s, frag index %s '
'(missing X-Backend-Timestamp)',
resp.full_path, resp_frag_index)
continue
timestamp = Timestamp(timestamp)
etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag')
if not etag:
self.logger.warning('Invalid resp from %s (missing Etag)',
resp.full_path)
self.logger.warning('Invalid resp from %s, frag index %s '
'(missing Etag)',
resp.full_path, resp_frag_index)
continue
if etag != etag_buckets.setdefault(timestamp, etag):
self.logger.error(
'Mixed Etag (%s, %s) for %s frag#%s',
etag, etag_buckets[timestamp],
self._full_path(node, job['partition'],
datafile_metadata['name'], job['policy']),
_full_path(node, job['partition'],
datafile_metadata['name'], job['policy']),
fi_to_rebuild)
continue
if unique_index not in buckets[timestamp]:
buckets[timestamp][unique_index] = resp
if resp_frag_index not in buckets[timestamp]:
buckets[timestamp][resp_frag_index] = resp
if len(buckets[timestamp]) >= job['policy'].ec_ndata:
responses = buckets[timestamp].values()
self.logger.debug(
@ -350,9 +363,9 @@ class ObjectReconstructor(Daemon):
'Unable to get enough responses (%s/%s) '
'to reconstruct %s frag#%s with ETag %s' % (
len(resp), job['policy'].ec_ndata,
self._full_path(node, job['partition'],
datafile_metadata['name'],
job['policy']),
_full_path(node, job['partition'],
datafile_metadata['name'],
job['policy']),
fi_to_rebuild, etag))
if error_resp_count:
@ -360,9 +373,9 @@ class ObjectReconstructor(Daemon):
'Unable to get enough responses (%s error responses) '
'to reconstruct %s frag#%s' % (
error_resp_count,
self._full_path(node, job['partition'],
datafile_metadata['name'],
job['policy']),
_full_path(node, job['partition'],
datafile_metadata['name'],
job['policy']),
fi_to_rebuild))
raise DiskFileError('Unable to reconstruct EC archive')
@ -535,7 +548,7 @@ class ObjectReconstructor(Daemon):
conn.getresponse().read()
except (Exception, Timeout):
self.logger.exception(
_("Trying to sync suffixes with %s") % self._full_path(
_("Trying to sync suffixes with %s") % _full_path(
node, job['partition'], '', job['policy']))
def _get_suffixes_to_sync(self, job, node):
@ -561,11 +574,11 @@ class ObjectReconstructor(Daemon):
if resp.status == HTTP_INSUFFICIENT_STORAGE:
self.logger.error(
_('%s responded as unmounted'),
self._full_path(node, job['partition'], '',
job['policy']))
_full_path(node, job['partition'], '',
job['policy']))
elif resp.status != HTTP_OK:
full_path = self._full_path(node, job['partition'], '',
job['policy'])
full_path = _full_path(node, job['partition'], '',
job['policy'])
self.logger.error(
_("Invalid response %(resp)s from %(full_path)s"),
{'resp': resp.status, 'full_path': full_path})
@ -576,7 +589,7 @@ class ObjectReconstructor(Daemon):
# safely catch our exception and continue to the next node
# without logging
self.logger.exception('Unable to get remote suffix hashes '
'from %r' % self._full_path(
'from %r' % _full_path(
node, job['partition'], '',
job['policy']))

View File

@ -3426,13 +3426,15 @@ class TestObjectReconstructor(unittest.TestCase):
message_base = \
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0"
test_missing_header(
'X-Object-Sysmeta-Ec-Etag',
"%s %s" % (message_base, "(missing Etag)"))
test_missing_header(
'X-Object-Sysmeta-Ec-Frag-Index',
"%s %s" % (message_base,
"(invalid X-Object-Sysmeta-Ec-Frag-Index)"))
"(invalid X-Object-Sysmeta-Ec-Frag-Index: None)"))
message_base += ", frag index 0"
test_missing_header(
'X-Object-Sysmeta-Ec-Etag',
"%s %s" % (message_base, "(missing Etag)"))
test_missing_header(
'X-Backend-Timestamp',
"%s %s" % (message_base, "(missing X-Backend-Timestamp)"))
@ -3491,7 +3493,8 @@ class TestObjectReconstructor(unittest.TestCase):
self.assertEqual(1, len(warning_log_lines))
expected_message = \
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o " \
"policy#0 (invalid X-Object-Sysmeta-Ec-Frag-Index)"
"policy#0 (invalid X-Object-Sysmeta-Ec-Frag-Index: %r)" % \
invalid_frag_index
self.assertIn(expected_message, warning_log_lines)
for value in ('None', 'invalid'):