Include received frag_index in reconstructor log warnings
The Related-Change removed the frag_index expected from a node response from the full_path included in log messages. This made sense because the *expected* frag_index is not necessarily the index that was actually received from the node. However, it would be useful to include the *actual* received frag_index in log messages. This patch also: - makes _full_path a module level function - renames unique_index to be resp_frag_index to aid understanding of the various indexes we deal with during reconstruction. Change-Id: Ic932835b3c1ed51a8456fce775fb59445fcb834b Related-Change: I8096202f5f8d91296963f7a409a29d57fa7828e4
This commit is contained in:
parent
a2f4046624
commit
56349e022d
@ -68,6 +68,29 @@ def _get_partners(frag_index, part_nodes):
|
|||||||
]
|
]
|
||||||
|
|
||||||
|
|
||||||
|
def _full_path(node, part, relative_path, policy):
|
||||||
|
"""
|
||||||
|
Combines the node properties, partition, relative-path and policy into a
|
||||||
|
single string representation.
|
||||||
|
|
||||||
|
:param node: a dict describing node properties
|
||||||
|
:param part: partition number
|
||||||
|
:param path: path of the desired EC archive relative to partition dir
|
||||||
|
:param policy: an instance of
|
||||||
|
:class:`~swift.common.storage_policy.BaseStoragePolicy`
|
||||||
|
:return: string representation of absolute path on node plus policy index
|
||||||
|
"""
|
||||||
|
return '%(replication_ip)s:%(replication_port)s' \
|
||||||
|
'/%(device)s/%(part)s%(path)s ' \
|
||||||
|
'policy#%(policy)d' % {
|
||||||
|
'replication_ip': node['replication_ip'],
|
||||||
|
'replication_port': node['replication_port'],
|
||||||
|
'device': node['device'],
|
||||||
|
'part': part, 'path': relative_path,
|
||||||
|
'policy': policy,
|
||||||
|
}
|
||||||
|
|
||||||
|
|
||||||
class RebuildingECDiskFileStream(object):
|
class RebuildingECDiskFileStream(object):
|
||||||
"""
|
"""
|
||||||
This class wraps the reconstructed fragment archive data and
|
This class wraps the reconstructed fragment archive data and
|
||||||
@ -192,17 +215,6 @@ class ObjectReconstructor(Daemon):
|
|||||||
return False
|
return False
|
||||||
return True
|
return True
|
||||||
|
|
||||||
def _full_path(self, node, part, path, policy):
|
|
||||||
return '%(replication_ip)s:%(replication_port)s' \
|
|
||||||
'/%(device)s/%(part)s%(path)s ' \
|
|
||||||
'policy#%(policy)d' % {
|
|
||||||
'replication_ip': node['replication_ip'],
|
|
||||||
'replication_port': node['replication_port'],
|
|
||||||
'device': node['device'],
|
|
||||||
'part': part, 'path': path,
|
|
||||||
'policy': policy,
|
|
||||||
}
|
|
||||||
|
|
||||||
def _get_response(self, node, part, path, headers, policy):
|
def _get_response(self, node, part, path, headers, policy):
|
||||||
"""
|
"""
|
||||||
Helper method for reconstruction that GETs a single EC fragment
|
Helper method for reconstruction that GETs a single EC fragment
|
||||||
@ -210,13 +222,13 @@ class ObjectReconstructor(Daemon):
|
|||||||
|
|
||||||
:param node: the node to GET from
|
:param node: the node to GET from
|
||||||
:param part: the partition
|
:param part: the partition
|
||||||
:param path: full path of the desired EC archive
|
:param path: path of the desired EC archive relative to partition dir
|
||||||
:param headers: the headers to send
|
:param headers: the headers to send
|
||||||
:param policy: an instance of
|
:param policy: an instance of
|
||||||
:class:`~swift.common.storage_policy.BaseStoragePolicy`
|
:class:`~swift.common.storage_policy.BaseStoragePolicy`
|
||||||
:returns: response
|
:returns: response
|
||||||
"""
|
"""
|
||||||
full_path = self._full_path(node, part, path, policy)
|
full_path = _full_path(node, part, path, policy)
|
||||||
resp = None
|
resp = None
|
||||||
try:
|
try:
|
||||||
with ConnectionTimeout(self.conn_timeout):
|
with ConnectionTimeout(self.conn_timeout):
|
||||||
@ -286,7 +298,7 @@ class ObjectReconstructor(Daemon):
|
|||||||
resp.headers = HeaderKeyDict(resp.getheaders())
|
resp.headers = HeaderKeyDict(resp.getheaders())
|
||||||
frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
|
frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index')
|
||||||
try:
|
try:
|
||||||
unique_index = int(frag_index)
|
resp_frag_index = int(frag_index)
|
||||||
except (TypeError, ValueError):
|
except (TypeError, ValueError):
|
||||||
# The successful response should include valid X-Object-
|
# The successful response should include valid X-Object-
|
||||||
# Sysmeta-Ec-Frag-Index but for safety, catching the case
|
# Sysmeta-Ec-Frag-Index but for safety, catching the case
|
||||||
@ -294,11 +306,11 @@ class ObjectReconstructor(Daemon):
|
|||||||
# frag index to reconstruct and dump warning log for that
|
# frag index to reconstruct and dump warning log for that
|
||||||
self.logger.warning(
|
self.logger.warning(
|
||||||
'Invalid resp from %s '
|
'Invalid resp from %s '
|
||||||
'(invalid X-Object-Sysmeta-Ec-Frag-Index)',
|
'(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)',
|
||||||
resp.full_path)
|
resp.full_path, frag_index)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if fi_to_rebuild == unique_index:
|
if fi_to_rebuild == resp_frag_index:
|
||||||
# TODO: With duplicated EC frags it's not unreasonable to find
|
# TODO: With duplicated EC frags it's not unreasonable to find
|
||||||
# the very fragment we're trying to rebuild exists on another
|
# the very fragment we're trying to rebuild exists on another
|
||||||
# primary node. In this case we should stream it directly from
|
# primary node. In this case we should stream it directly from
|
||||||
@ -307,36 +319,37 @@ class ObjectReconstructor(Daemon):
|
|||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
'Found existing frag #%s at %s while rebuilding to %s',
|
'Found existing frag #%s at %s while rebuilding to %s',
|
||||||
fi_to_rebuild, resp.full_path,
|
fi_to_rebuild, resp.full_path,
|
||||||
self._full_path(
|
_full_path(
|
||||||
node, job['partition'], datafile_metadata['name'],
|
node, job['partition'], datafile_metadata['name'],
|
||||||
job['policy']))
|
job['policy']))
|
||||||
continue
|
continue
|
||||||
|
|
||||||
timestamp = resp.headers.get('X-Backend-Timestamp')
|
timestamp = resp.headers.get('X-Backend-Timestamp')
|
||||||
if not timestamp:
|
if not timestamp:
|
||||||
self.logger.warning(
|
self.logger.warning('Invalid resp from %s, frag index %s '
|
||||||
'Invalid resp from %s (missing X-Backend-Timestamp)',
|
'(missing X-Backend-Timestamp)',
|
||||||
resp.full_path)
|
resp.full_path, resp_frag_index)
|
||||||
continue
|
continue
|
||||||
timestamp = Timestamp(timestamp)
|
timestamp = Timestamp(timestamp)
|
||||||
|
|
||||||
etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag')
|
etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag')
|
||||||
if not etag:
|
if not etag:
|
||||||
self.logger.warning('Invalid resp from %s (missing Etag)',
|
self.logger.warning('Invalid resp from %s, frag index %s '
|
||||||
resp.full_path)
|
'(missing Etag)',
|
||||||
|
resp.full_path, resp_frag_index)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if etag != etag_buckets.setdefault(timestamp, etag):
|
if etag != etag_buckets.setdefault(timestamp, etag):
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
'Mixed Etag (%s, %s) for %s frag#%s',
|
'Mixed Etag (%s, %s) for %s frag#%s',
|
||||||
etag, etag_buckets[timestamp],
|
etag, etag_buckets[timestamp],
|
||||||
self._full_path(node, job['partition'],
|
_full_path(node, job['partition'],
|
||||||
datafile_metadata['name'], job['policy']),
|
datafile_metadata['name'], job['policy']),
|
||||||
fi_to_rebuild)
|
fi_to_rebuild)
|
||||||
continue
|
continue
|
||||||
|
|
||||||
if unique_index not in buckets[timestamp]:
|
if resp_frag_index not in buckets[timestamp]:
|
||||||
buckets[timestamp][unique_index] = resp
|
buckets[timestamp][resp_frag_index] = resp
|
||||||
if len(buckets[timestamp]) >= job['policy'].ec_ndata:
|
if len(buckets[timestamp]) >= job['policy'].ec_ndata:
|
||||||
responses = buckets[timestamp].values()
|
responses = buckets[timestamp].values()
|
||||||
self.logger.debug(
|
self.logger.debug(
|
||||||
@ -350,9 +363,9 @@ class ObjectReconstructor(Daemon):
|
|||||||
'Unable to get enough responses (%s/%s) '
|
'Unable to get enough responses (%s/%s) '
|
||||||
'to reconstruct %s frag#%s with ETag %s' % (
|
'to reconstruct %s frag#%s with ETag %s' % (
|
||||||
len(resp), job['policy'].ec_ndata,
|
len(resp), job['policy'].ec_ndata,
|
||||||
self._full_path(node, job['partition'],
|
_full_path(node, job['partition'],
|
||||||
datafile_metadata['name'],
|
datafile_metadata['name'],
|
||||||
job['policy']),
|
job['policy']),
|
||||||
fi_to_rebuild, etag))
|
fi_to_rebuild, etag))
|
||||||
|
|
||||||
if error_resp_count:
|
if error_resp_count:
|
||||||
@ -360,9 +373,9 @@ class ObjectReconstructor(Daemon):
|
|||||||
'Unable to get enough responses (%s error responses) '
|
'Unable to get enough responses (%s error responses) '
|
||||||
'to reconstruct %s frag#%s' % (
|
'to reconstruct %s frag#%s' % (
|
||||||
error_resp_count,
|
error_resp_count,
|
||||||
self._full_path(node, job['partition'],
|
_full_path(node, job['partition'],
|
||||||
datafile_metadata['name'],
|
datafile_metadata['name'],
|
||||||
job['policy']),
|
job['policy']),
|
||||||
fi_to_rebuild))
|
fi_to_rebuild))
|
||||||
|
|
||||||
raise DiskFileError('Unable to reconstruct EC archive')
|
raise DiskFileError('Unable to reconstruct EC archive')
|
||||||
@ -535,7 +548,7 @@ class ObjectReconstructor(Daemon):
|
|||||||
conn.getresponse().read()
|
conn.getresponse().read()
|
||||||
except (Exception, Timeout):
|
except (Exception, Timeout):
|
||||||
self.logger.exception(
|
self.logger.exception(
|
||||||
_("Trying to sync suffixes with %s") % self._full_path(
|
_("Trying to sync suffixes with %s") % _full_path(
|
||||||
node, job['partition'], '', job['policy']))
|
node, job['partition'], '', job['policy']))
|
||||||
|
|
||||||
def _get_suffixes_to_sync(self, job, node):
|
def _get_suffixes_to_sync(self, job, node):
|
||||||
@ -561,11 +574,11 @@ class ObjectReconstructor(Daemon):
|
|||||||
if resp.status == HTTP_INSUFFICIENT_STORAGE:
|
if resp.status == HTTP_INSUFFICIENT_STORAGE:
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
_('%s responded as unmounted'),
|
_('%s responded as unmounted'),
|
||||||
self._full_path(node, job['partition'], '',
|
_full_path(node, job['partition'], '',
|
||||||
job['policy']))
|
job['policy']))
|
||||||
elif resp.status != HTTP_OK:
|
elif resp.status != HTTP_OK:
|
||||||
full_path = self._full_path(node, job['partition'], '',
|
full_path = _full_path(node, job['partition'], '',
|
||||||
job['policy'])
|
job['policy'])
|
||||||
self.logger.error(
|
self.logger.error(
|
||||||
_("Invalid response %(resp)s from %(full_path)s"),
|
_("Invalid response %(resp)s from %(full_path)s"),
|
||||||
{'resp': resp.status, 'full_path': full_path})
|
{'resp': resp.status, 'full_path': full_path})
|
||||||
@ -576,7 +589,7 @@ class ObjectReconstructor(Daemon):
|
|||||||
# safely catch our exception and continue to the next node
|
# safely catch our exception and continue to the next node
|
||||||
# without logging
|
# without logging
|
||||||
self.logger.exception('Unable to get remote suffix hashes '
|
self.logger.exception('Unable to get remote suffix hashes '
|
||||||
'from %r' % self._full_path(
|
'from %r' % _full_path(
|
||||||
node, job['partition'], '',
|
node, job['partition'], '',
|
||||||
job['policy']))
|
job['policy']))
|
||||||
|
|
||||||
|
@ -3426,13 +3426,15 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
message_base = \
|
message_base = \
|
||||||
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0"
|
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0"
|
||||||
|
|
||||||
test_missing_header(
|
|
||||||
'X-Object-Sysmeta-Ec-Etag',
|
|
||||||
"%s %s" % (message_base, "(missing Etag)"))
|
|
||||||
test_missing_header(
|
test_missing_header(
|
||||||
'X-Object-Sysmeta-Ec-Frag-Index',
|
'X-Object-Sysmeta-Ec-Frag-Index',
|
||||||
"%s %s" % (message_base,
|
"%s %s" % (message_base,
|
||||||
"(invalid X-Object-Sysmeta-Ec-Frag-Index)"))
|
"(invalid X-Object-Sysmeta-Ec-Frag-Index: None)"))
|
||||||
|
|
||||||
|
message_base += ", frag index 0"
|
||||||
|
test_missing_header(
|
||||||
|
'X-Object-Sysmeta-Ec-Etag',
|
||||||
|
"%s %s" % (message_base, "(missing Etag)"))
|
||||||
test_missing_header(
|
test_missing_header(
|
||||||
'X-Backend-Timestamp',
|
'X-Backend-Timestamp',
|
||||||
"%s %s" % (message_base, "(missing X-Backend-Timestamp)"))
|
"%s %s" % (message_base, "(missing X-Backend-Timestamp)"))
|
||||||
@ -3491,7 +3493,8 @@ class TestObjectReconstructor(unittest.TestCase):
|
|||||||
self.assertEqual(1, len(warning_log_lines))
|
self.assertEqual(1, len(warning_log_lines))
|
||||||
expected_message = \
|
expected_message = \
|
||||||
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o " \
|
"Invalid resp from 10.0.0.0:1000/sda/0/a/c/o " \
|
||||||
"policy#0 (invalid X-Object-Sysmeta-Ec-Frag-Index)"
|
"policy#0 (invalid X-Object-Sysmeta-Ec-Frag-Index: %r)" % \
|
||||||
|
invalid_frag_index
|
||||||
self.assertIn(expected_message, warning_log_lines)
|
self.assertIn(expected_message, warning_log_lines)
|
||||||
|
|
||||||
for value in ('None', 'invalid'):
|
for value in ('None', 'invalid'):
|
||||||
|
Loading…
x
Reference in New Issue
Block a user