From 56349e022dbbd90fbf9edb5d3d50dd1dd90ce7e2 Mon Sep 17 00:00:00 2001 From: Alistair Coles Date: Fri, 10 Mar 2017 12:59:37 +0000 Subject: [PATCH] Include received frag_index in reconstructor log warnings The Related-Change removed the frag_index expected from a node response from the full_path included in log messages. This made sense because the *expected* frag_index is not necessarily the index that was actually received from the node. However, it would be useful to include the *actual* received frag_index in log messages. This patch also: - makes _full_path a module level function - renames unique_index to be resp_frag_index to aid understanding of the various indexes we deal with during reconstruction. Change-Id: Ic932835b3c1ed51a8456fce775fb59445fcb834b Related-Change: I8096202f5f8d91296963f7a409a29d57fa7828e4 --- swift/obj/reconstructor.py | 91 ++++++++++++++++------------- test/unit/obj/test_reconstructor.py | 13 +++-- 2 files changed, 60 insertions(+), 44 deletions(-) diff --git a/swift/obj/reconstructor.py b/swift/obj/reconstructor.py index 8601e8d1e2..d6a6ba9fad 100644 --- a/swift/obj/reconstructor.py +++ b/swift/obj/reconstructor.py @@ -68,6 +68,29 @@ def _get_partners(frag_index, part_nodes): ] +def _full_path(node, part, relative_path, policy): + """ + Combines the node properties, partition, relative-path and policy into a + single string representation. + + :param node: a dict describing node properties + :param part: partition number + :param path: path of the desired EC archive relative to partition dir + :param policy: an instance of + :class:`~swift.common.storage_policy.BaseStoragePolicy` + :return: string representation of absolute path on node plus policy index + """ + return '%(replication_ip)s:%(replication_port)s' \ + '/%(device)s/%(part)s%(path)s ' \ + 'policy#%(policy)d' % { + 'replication_ip': node['replication_ip'], + 'replication_port': node['replication_port'], + 'device': node['device'], + 'part': part, 'path': relative_path, + 'policy': policy, + } + + class RebuildingECDiskFileStream(object): """ This class wraps the reconstructed fragment archive data and @@ -192,17 +215,6 @@ class ObjectReconstructor(Daemon): return False return True - def _full_path(self, node, part, path, policy): - return '%(replication_ip)s:%(replication_port)s' \ - '/%(device)s/%(part)s%(path)s ' \ - 'policy#%(policy)d' % { - 'replication_ip': node['replication_ip'], - 'replication_port': node['replication_port'], - 'device': node['device'], - 'part': part, 'path': path, - 'policy': policy, - } - def _get_response(self, node, part, path, headers, policy): """ Helper method for reconstruction that GETs a single EC fragment @@ -210,13 +222,13 @@ class ObjectReconstructor(Daemon): :param node: the node to GET from :param part: the partition - :param path: full path of the desired EC archive + :param path: path of the desired EC archive relative to partition dir :param headers: the headers to send :param policy: an instance of :class:`~swift.common.storage_policy.BaseStoragePolicy` :returns: response """ - full_path = self._full_path(node, part, path, policy) + full_path = _full_path(node, part, path, policy) resp = None try: with ConnectionTimeout(self.conn_timeout): @@ -286,7 +298,7 @@ class ObjectReconstructor(Daemon): resp.headers = HeaderKeyDict(resp.getheaders()) frag_index = resp.headers.get('X-Object-Sysmeta-Ec-Frag-Index') try: - unique_index = int(frag_index) + resp_frag_index = int(frag_index) except (TypeError, ValueError): # The successful response should include valid X-Object- # Sysmeta-Ec-Frag-Index but for safety, catching the case @@ -294,11 +306,11 @@ class ObjectReconstructor(Daemon): # frag index to reconstruct and dump warning log for that self.logger.warning( 'Invalid resp from %s ' - '(invalid X-Object-Sysmeta-Ec-Frag-Index)', - resp.full_path) + '(invalid X-Object-Sysmeta-Ec-Frag-Index: %r)', + resp.full_path, frag_index) continue - if fi_to_rebuild == unique_index: + if fi_to_rebuild == resp_frag_index: # TODO: With duplicated EC frags it's not unreasonable to find # the very fragment we're trying to rebuild exists on another # primary node. In this case we should stream it directly from @@ -307,36 +319,37 @@ class ObjectReconstructor(Daemon): self.logger.debug( 'Found existing frag #%s at %s while rebuilding to %s', fi_to_rebuild, resp.full_path, - self._full_path( + _full_path( node, job['partition'], datafile_metadata['name'], job['policy'])) continue timestamp = resp.headers.get('X-Backend-Timestamp') if not timestamp: - self.logger.warning( - 'Invalid resp from %s (missing X-Backend-Timestamp)', - resp.full_path) + self.logger.warning('Invalid resp from %s, frag index %s ' + '(missing X-Backend-Timestamp)', + resp.full_path, resp_frag_index) continue timestamp = Timestamp(timestamp) etag = resp.headers.get('X-Object-Sysmeta-Ec-Etag') if not etag: - self.logger.warning('Invalid resp from %s (missing Etag)', - resp.full_path) + self.logger.warning('Invalid resp from %s, frag index %s ' + '(missing Etag)', + resp.full_path, resp_frag_index) continue if etag != etag_buckets.setdefault(timestamp, etag): self.logger.error( 'Mixed Etag (%s, %s) for %s frag#%s', etag, etag_buckets[timestamp], - self._full_path(node, job['partition'], - datafile_metadata['name'], job['policy']), + _full_path(node, job['partition'], + datafile_metadata['name'], job['policy']), fi_to_rebuild) continue - if unique_index not in buckets[timestamp]: - buckets[timestamp][unique_index] = resp + if resp_frag_index not in buckets[timestamp]: + buckets[timestamp][resp_frag_index] = resp if len(buckets[timestamp]) >= job['policy'].ec_ndata: responses = buckets[timestamp].values() self.logger.debug( @@ -350,9 +363,9 @@ class ObjectReconstructor(Daemon): 'Unable to get enough responses (%s/%s) ' 'to reconstruct %s frag#%s with ETag %s' % ( len(resp), job['policy'].ec_ndata, - self._full_path(node, job['partition'], - datafile_metadata['name'], - job['policy']), + _full_path(node, job['partition'], + datafile_metadata['name'], + job['policy']), fi_to_rebuild, etag)) if error_resp_count: @@ -360,9 +373,9 @@ class ObjectReconstructor(Daemon): 'Unable to get enough responses (%s error responses) ' 'to reconstruct %s frag#%s' % ( error_resp_count, - self._full_path(node, job['partition'], - datafile_metadata['name'], - job['policy']), + _full_path(node, job['partition'], + datafile_metadata['name'], + job['policy']), fi_to_rebuild)) raise DiskFileError('Unable to reconstruct EC archive') @@ -535,7 +548,7 @@ class ObjectReconstructor(Daemon): conn.getresponse().read() except (Exception, Timeout): self.logger.exception( - _("Trying to sync suffixes with %s") % self._full_path( + _("Trying to sync suffixes with %s") % _full_path( node, job['partition'], '', job['policy'])) def _get_suffixes_to_sync(self, job, node): @@ -561,11 +574,11 @@ class ObjectReconstructor(Daemon): if resp.status == HTTP_INSUFFICIENT_STORAGE: self.logger.error( _('%s responded as unmounted'), - self._full_path(node, job['partition'], '', - job['policy'])) + _full_path(node, job['partition'], '', + job['policy'])) elif resp.status != HTTP_OK: - full_path = self._full_path(node, job['partition'], '', - job['policy']) + full_path = _full_path(node, job['partition'], '', + job['policy']) self.logger.error( _("Invalid response %(resp)s from %(full_path)s"), {'resp': resp.status, 'full_path': full_path}) @@ -576,7 +589,7 @@ class ObjectReconstructor(Daemon): # safely catch our exception and continue to the next node # without logging self.logger.exception('Unable to get remote suffix hashes ' - 'from %r' % self._full_path( + 'from %r' % _full_path( node, job['partition'], '', job['policy'])) diff --git a/test/unit/obj/test_reconstructor.py b/test/unit/obj/test_reconstructor.py index 78d1246597..d2814b52fb 100644 --- a/test/unit/obj/test_reconstructor.py +++ b/test/unit/obj/test_reconstructor.py @@ -3426,13 +3426,15 @@ class TestObjectReconstructor(unittest.TestCase): message_base = \ "Invalid resp from 10.0.0.0:1000/sda/0/a/c/o policy#0" - test_missing_header( - 'X-Object-Sysmeta-Ec-Etag', - "%s %s" % (message_base, "(missing Etag)")) test_missing_header( 'X-Object-Sysmeta-Ec-Frag-Index', "%s %s" % (message_base, - "(invalid X-Object-Sysmeta-Ec-Frag-Index)")) + "(invalid X-Object-Sysmeta-Ec-Frag-Index: None)")) + + message_base += ", frag index 0" + test_missing_header( + 'X-Object-Sysmeta-Ec-Etag', + "%s %s" % (message_base, "(missing Etag)")) test_missing_header( 'X-Backend-Timestamp', "%s %s" % (message_base, "(missing X-Backend-Timestamp)")) @@ -3491,7 +3493,8 @@ class TestObjectReconstructor(unittest.TestCase): self.assertEqual(1, len(warning_log_lines)) expected_message = \ "Invalid resp from 10.0.0.0:1000/sda/0/a/c/o " \ - "policy#0 (invalid X-Object-Sysmeta-Ec-Frag-Index)" + "policy#0 (invalid X-Object-Sysmeta-Ec-Frag-Index: %r)" % \ + invalid_frag_index self.assertIn(expected_message, warning_log_lines) for value in ('None', 'invalid'):