Fix inconsistent suffix hashes after ssync of tombstone
Consider two replicas of the same object whose ondisk files have diverged due to failures: A has t2.ts B has t1.data, t4.meta (The DELETE at t2 did not make it to B. The POST at t4 was rejected by A.) After ssync replication the two ondisk file sets will not be consistent: A has t2.ts (ssync cannot POST t4.meta to this node) B has t2.ts, t4.meta (ssync should not delete t4.meta, there may be a t3.data somewhere) Consequenty the two nodes will report different hashes for the object's suffix, and replication will repeat, always with the inconsistent outcome. This scenario is reproduced by the probe test added in this patch. (Note that rsync replication does result in (t2.ts, t4.meta) on both nodes.) The solution is to change the way that suffix hashes are calculated. Currently the names of *all* files found in each object dir are added to the hash. With this patch the timestamps of only those files that could be used to construct a valid diskfile are added to the hash. File extensions are appended to the timestamp so that in most 'normal' situations the result of the hashing is the same as before this patch. That avoids a storm of hash mismatches when this patch is deployed in an existing cluster. In the problem case described above, t4.meta is no longer added to the hash, since it is not useful for constructing a diskfile. (Note that t4.meta is not deleted because it may become useful should a t3.data be replicated in future). Closes-Bug: 1534276 Change-Id: I99e88b8d5f5d9bc22b42112a99634ba942415e05
This commit is contained in:
parent
ebe61381c2
commit
2d55960a22
@ -25,6 +25,7 @@ from time import time
|
||||
|
||||
from eventlet import sleep, Timeout
|
||||
import six
|
||||
import six.moves.cPickle as pickle
|
||||
from six.moves.http_client import HTTPException
|
||||
|
||||
from swift.common.bufferedhttp import http_connect
|
||||
@ -496,6 +497,34 @@ def direct_delete_object(node, part, account, container, obj,
|
||||
node, part, path, resp)
|
||||
|
||||
|
||||
def direct_get_suffix_hashes(node, part, suffixes, conn_timeout=5,
|
||||
response_timeout=15, headers=None):
|
||||
"""
|
||||
Get suffix hashes directly from the object server.
|
||||
|
||||
:param node: node dictionary from the ring
|
||||
:param part: partition the container is on
|
||||
:param conn_timeout: timeout in seconds for establishing the connection
|
||||
:param response_timeout: timeout in seconds for getting the response
|
||||
:param headers: dict to be passed into HTTPConnection headers
|
||||
:returns: dict of suffix hashes
|
||||
:raises ClientException: HTTP REPLICATE request failed
|
||||
"""
|
||||
if headers is None:
|
||||
headers = {}
|
||||
|
||||
path = '/%s' % '-'.join(suffixes)
|
||||
with Timeout(conn_timeout):
|
||||
conn = http_connect(node['ip'], node['port'], node['device'], part,
|
||||
'REPLICATE', path, headers=gen_headers(headers))
|
||||
with Timeout(response_timeout):
|
||||
resp = conn.getresponse()
|
||||
if not is_success(resp.status):
|
||||
raise DirectClientException('Object', 'REPLICATE',
|
||||
node, part, path, resp)
|
||||
return pickle.loads(resp.read())
|
||||
|
||||
|
||||
def retry(func, *args, **kwargs):
|
||||
"""
|
||||
Helper function to retry a given function a number of times.
|
||||
|
@ -678,7 +678,18 @@ class BaseDiskFileManager(object):
|
||||
return self.cleanup_ondisk_files(
|
||||
hsh_path, reclaim_age=reclaim_age)['files']
|
||||
|
||||
def _hash_suffix_dir(self, path, mapper, reclaim_age):
|
||||
def _update_suffix_hashes(self, hashes, ondisk_info):
|
||||
"""
|
||||
Applies policy specific updates to the given dict of md5 hashes for
|
||||
the given ondisk_info.
|
||||
|
||||
:param hashes: a dict of md5 hashes to be updated
|
||||
:param ondisk_info: a dict describing the state of ondisk files, as
|
||||
returned by get_ondisk_files
|
||||
"""
|
||||
raise NotImplementedError
|
||||
|
||||
def _hash_suffix_dir(self, path, reclaim_age):
|
||||
"""
|
||||
|
||||
:param path: full path to directory
|
||||
@ -694,7 +705,7 @@ class BaseDiskFileManager(object):
|
||||
for hsh in path_contents:
|
||||
hsh_path = join(path, hsh)
|
||||
try:
|
||||
files = self.hash_cleanup_listdir(hsh_path, reclaim_age)
|
||||
ondisk_info = self.cleanup_ondisk_files(hsh_path, reclaim_age)
|
||||
except OSError as err:
|
||||
if err.errno == errno.ENOTDIR:
|
||||
partition_path = dirname(path)
|
||||
@ -707,14 +718,30 @@ class BaseDiskFileManager(object):
|
||||
'quar_path': quar_path})
|
||||
continue
|
||||
raise
|
||||
if not files:
|
||||
if not ondisk_info['files']:
|
||||
try:
|
||||
os.rmdir(hsh_path)
|
||||
except OSError:
|
||||
pass
|
||||
for filename in files:
|
||||
key, value = mapper(filename)
|
||||
hashes[key].update(value)
|
||||
continue
|
||||
|
||||
# ondisk_info has info dicts containing timestamps for those
|
||||
# files that could determine the state of the diskfile if it were
|
||||
# to be opened. We update the suffix hash with the concatenation of
|
||||
# each file's timestamp and extension. The extension is added to
|
||||
# guarantee distinct hash values from two object dirs that have
|
||||
# different file types at the same timestamp(s).
|
||||
#
|
||||
# Files that may be in the object dir but would have no effect on
|
||||
# the state of the diskfile are not used to update the hash.
|
||||
for key in (k for k in ('meta_info', 'ts_info')
|
||||
if k in ondisk_info):
|
||||
info = ondisk_info[key]
|
||||
hashes[None].update(info['timestamp'].internal + info['ext'])
|
||||
|
||||
# delegate to subclass for data file related updates...
|
||||
self._update_suffix_hashes(hashes, ondisk_info)
|
||||
|
||||
try:
|
||||
os.rmdir(path)
|
||||
except OSError as e:
|
||||
@ -2195,6 +2222,20 @@ class DiskFileManager(BaseDiskFileManager):
|
||||
# set results
|
||||
results['data_info'] = exts['.data'][0]
|
||||
|
||||
def _update_suffix_hashes(self, hashes, ondisk_info):
|
||||
"""
|
||||
Applies policy specific updates to the given dict of md5 hashes for
|
||||
the given ondisk_info.
|
||||
|
||||
:param hashes: a dict of md5 hashes to be updated
|
||||
:param ondisk_info: a dict describing the state of ondisk files, as
|
||||
returned by get_ondisk_files
|
||||
"""
|
||||
if 'data_info' in ondisk_info:
|
||||
file_info = ondisk_info['data_info']
|
||||
hashes[None].update(
|
||||
file_info['timestamp'].internal + file_info['ext'])
|
||||
|
||||
def _hash_suffix(self, path, reclaim_age):
|
||||
"""
|
||||
Performs reclamation and returns an md5 of all (remaining) files.
|
||||
@ -2203,9 +2244,9 @@ class DiskFileManager(BaseDiskFileManager):
|
||||
:param reclaim_age: age in seconds at which to remove tombstones
|
||||
:raises PathNotDir: if given path is not a valid directory
|
||||
:raises OSError: for non-ENOTDIR errors
|
||||
:returns: md5 of files in suffix
|
||||
"""
|
||||
mapper = lambda filename: (None, filename)
|
||||
hashes = self._hash_suffix_dir(path, mapper, reclaim_age)
|
||||
hashes = self._hash_suffix_dir(path, reclaim_age)
|
||||
return hashes[None].hexdigest()
|
||||
|
||||
|
||||
@ -2544,28 +2585,41 @@ class ECDiskFileManager(BaseDiskFileManager):
|
||||
return have_data_file == have_durable
|
||||
return False
|
||||
|
||||
def _update_suffix_hashes(self, hashes, ondisk_info):
|
||||
"""
|
||||
Applies policy specific updates to the given dict of md5 hashes for
|
||||
the given ondisk_info.
|
||||
|
||||
The only difference between this method and the replication policy
|
||||
function is the way that data files update hashes dict. Instead of all
|
||||
filenames hashed into a single hasher, each data file name will fall
|
||||
into a bucket keyed by its fragment index.
|
||||
|
||||
:param hashes: a dict of md5 hashes to be updated
|
||||
:param ondisk_info: a dict describing the state of ondisk files, as
|
||||
returned by get_ondisk_files
|
||||
"""
|
||||
for frag_set in ondisk_info['frag_sets'].values():
|
||||
for file_info in frag_set:
|
||||
fi = file_info['frag_index']
|
||||
hashes[fi].update(file_info['timestamp'].internal)
|
||||
if 'durable_frag_set' in ondisk_info:
|
||||
file_info = ondisk_info['durable_frag_set'][0]
|
||||
hashes[None].update(file_info['timestamp'].internal + '.durable')
|
||||
|
||||
def _hash_suffix(self, path, reclaim_age):
|
||||
"""
|
||||
The only difference between this method and the replication policy
|
||||
function is the way that files are updated on the returned hash.
|
||||
|
||||
Instead of all filenames hashed into a single hasher, each file name
|
||||
will fall into a bucket either by fragment index for datafiles, or
|
||||
None (indicating a durable, metadata or tombstone).
|
||||
Performs reclamation and returns an md5 of all (remaining) files.
|
||||
|
||||
:param path: full path to directory
|
||||
:param reclaim_age: age in seconds at which to remove tombstones
|
||||
:raises PathNotDir: if given path is not a valid directory
|
||||
:raises OSError: for non-ENOTDIR errors
|
||||
:returns: dict of md5 hex digests
|
||||
"""
|
||||
# hash_per_fi instead of single hash for whole suffix
|
||||
# here we flatten out the hashers hexdigest into a dictionary instead
|
||||
# of just returning the one hexdigest for the whole suffix
|
||||
def mapper(filename):
|
||||
info = self.parse_on_disk_filename(filename)
|
||||
fi = info['frag_index']
|
||||
if fi is None:
|
||||
return None, filename
|
||||
else:
|
||||
return fi, info['timestamp'].internal
|
||||
|
||||
hash_per_fi = self._hash_suffix_dir(path, mapper, reclaim_age)
|
||||
hash_per_fi = self._hash_suffix_dir(path, reclaim_age)
|
||||
return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items())
|
||||
|
@ -21,12 +21,15 @@ import unittest
|
||||
import os
|
||||
import shutil
|
||||
import uuid
|
||||
from swift.common.exceptions import DiskFileDeleted
|
||||
|
||||
from swift.common.direct_client import direct_get_suffix_hashes
|
||||
from swift.common.exceptions import DiskFileDeleted
|
||||
from swift.common.internal_client import UnexpectedResponse
|
||||
from swift.container.backend import ContainerBroker
|
||||
from swift.common import internal_client, utils
|
||||
from swiftclient import client
|
||||
from swift.common.ring import Ring
|
||||
from swift.common.utils import Timestamp, get_logger
|
||||
from swift.common.utils import Timestamp, get_logger, hash_path
|
||||
from swift.obj.diskfile import DiskFileManager
|
||||
from swift.common.storage_policy import POLICIES
|
||||
|
||||
@ -186,6 +189,20 @@ class Test(ReplProbeTest):
|
||||
self.container_name,
|
||||
self.object_name)
|
||||
|
||||
def _assert_consistent_suffix_hashes(self):
|
||||
opart, onodes = self.object_ring.get_nodes(
|
||||
self.account, self.container_name, self.object_name)
|
||||
name_hash = hash_path(
|
||||
self.account, self.container_name, self.object_name)
|
||||
results = []
|
||||
for node in onodes:
|
||||
results.append(
|
||||
(node,
|
||||
direct_get_suffix_hashes(node, opart, [name_hash[-3:]])))
|
||||
for (node, hashes) in results[1:]:
|
||||
self.assertEqual(results[0][1], hashes,
|
||||
'Inconsistent suffix hashes found: %s' % results)
|
||||
|
||||
def test_object_delete_is_replicated(self):
|
||||
self.brain.put_container(policy_index=int(self.policy))
|
||||
# put object
|
||||
@ -419,6 +436,51 @@ class Test(ReplProbeTest):
|
||||
self._assert_consistent_object_metadata()
|
||||
self._assert_consistent_container_dbs()
|
||||
|
||||
def test_post_trumped_by_prior_delete(self):
|
||||
# new metadata and content-type posted to subset of nodes should not
|
||||
# cause object to persist after replication of an earlier delete on
|
||||
# other nodes.
|
||||
self.brain.put_container(policy_index=0)
|
||||
# incomplete put
|
||||
self.brain.stop_primary_half()
|
||||
self._put_object(headers={'Content-Type': 'oldest',
|
||||
'X-Object-Sysmeta-Test': 'oldest',
|
||||
'X-Object-Meta-Test': 'oldest'})
|
||||
self.brain.start_primary_half()
|
||||
|
||||
# incomplete put then delete
|
||||
self.brain.stop_handoff_half()
|
||||
self._put_object(headers={'Content-Type': 'oldest',
|
||||
'X-Object-Sysmeta-Test': 'oldest',
|
||||
'X-Object-Meta-Test': 'oldest'})
|
||||
self._delete_object()
|
||||
self.brain.start_handoff_half()
|
||||
|
||||
# handoff post
|
||||
self.brain.stop_primary_half()
|
||||
self._post_object(headers={'Content-Type': 'newest',
|
||||
'X-Object-Sysmeta-Test': 'ignored',
|
||||
'X-Object-Meta-Test': 'newest'})
|
||||
|
||||
# check object metadata
|
||||
metadata = self._get_object_metadata()
|
||||
self.assertEqual(metadata['x-object-sysmeta-test'], 'oldest')
|
||||
self.assertEqual(metadata['x-object-meta-test'], 'newest')
|
||||
self.assertEqual(metadata['content-type'], 'oldest')
|
||||
|
||||
self.brain.start_primary_half()
|
||||
|
||||
# delete trumps later post
|
||||
self.get_to_final_state()
|
||||
|
||||
# check object is now deleted
|
||||
self.assertRaises(UnexpectedResponse, self._get_object_metadata)
|
||||
container_metadata, objs = client.get_container(self.url, self.token,
|
||||
self.container_name)
|
||||
self.assertEqual(0, len(objs))
|
||||
self._assert_consistent_container_dbs()
|
||||
self._assert_consistent_deleted_object()
|
||||
self._assert_consistent_suffix_hashes()
|
||||
|
||||
if __name__ == "__main__":
|
||||
unittest.main()
|
||||
|
@ -4568,6 +4568,37 @@ class TestSuffixHashes(unittest.TestCase):
|
||||
}[policy.policy_type]
|
||||
self.assertEqual(hashes, expected)
|
||||
|
||||
def test_hash_suffix_one_tombstone_and_one_meta(self):
|
||||
# A tombstone plus a newer meta file can happen if a tombstone is
|
||||
# replicated to a node with a newer meta file but older data file. The
|
||||
# meta file will be ignored when the diskfile is opened so the
|
||||
# effective state of the disk files is equivalent to only having the
|
||||
# tombstone. Replication cannot remove the meta file, and the meta file
|
||||
# cannot be ssync replicated to a node with only the tombstone, so
|
||||
# we want the get_hashes result to be the same as if the meta file was
|
||||
# not there.
|
||||
for policy in self.iter_policies():
|
||||
df_mgr = self.df_router[policy]
|
||||
df = df_mgr.get_diskfile(
|
||||
'sda1', '0', 'a', 'c', 'o', policy=policy)
|
||||
suffix = os.path.basename(os.path.dirname(df._datadir))
|
||||
# write a tombstone
|
||||
timestamp = self.ts()
|
||||
df.delete(timestamp)
|
||||
# write a meta file
|
||||
df.write_metadata({'X-Timestamp': self.ts().internal})
|
||||
# sanity check
|
||||
self.assertEqual(2, len(os.listdir(df._datadir)))
|
||||
tombstone_hash = md5(timestamp.internal + '.ts').hexdigest()
|
||||
hashes = df_mgr.get_hashes('sda1', '0', [], policy)
|
||||
expected = {
|
||||
REPL_POLICY: {suffix: tombstone_hash},
|
||||
EC_POLICY: {suffix: {
|
||||
# fi is None here because we have a tombstone
|
||||
None: tombstone_hash}},
|
||||
}[policy.policy_type]
|
||||
self.assertEqual(hashes, expected)
|
||||
|
||||
def test_hash_suffix_one_reclaim_tombstone(self):
|
||||
for policy in self.iter_policies():
|
||||
df_mgr = self.df_router[policy]
|
||||
|
Loading…
x
Reference in New Issue
Block a user