Merge "Fix inconsistent suffix hashes after ssync of tombstone"
commit 61928443ed
@@ -25,6 +25,7 @@ from time import time

 from eventlet import sleep, Timeout
 import six
+import six.moves.cPickle as pickle
 from six.moves.http_client import HTTPException

 from swift.common.bufferedhttp import http_connect
@@ -463,6 +464,34 @@ def direct_delete_object(node, part, account, container, obj,
               'Object', conn_timeout, response_timeout)


+def direct_get_suffix_hashes(node, part, suffixes, conn_timeout=5,
+                             response_timeout=15, headers=None):
+    """
+    Get suffix hashes directly from the object server.
+
+    :param node: node dictionary from the ring
+    :param part: partition the container is on
+    :param conn_timeout: timeout in seconds for establishing the connection
+    :param response_timeout: timeout in seconds for getting the response
+    :param headers: dict to be passed into HTTPConnection headers
+    :returns: dict of suffix hashes
+    :raises ClientException: HTTP REPLICATE request failed
+    """
+    if headers is None:
+        headers = {}
+
+    path = '/%s' % '-'.join(suffixes)
+    with Timeout(conn_timeout):
+        conn = http_connect(node['ip'], node['port'], node['device'], part,
+                            'REPLICATE', path, headers=gen_headers(headers))
+    with Timeout(response_timeout):
+        resp = conn.getresponse()
+    if not is_success(resp.status):
+        raise DirectClientException('Object', 'REPLICATE',
+                                    node, part, path, resp)
+    return pickle.loads(resp.read())
+
+
 def retry(func, *args, **kwargs):
     """
     Helper function to retry a given function a number of times.
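
For context, a minimal sketch of how the new direct_get_suffix_hashes() helper might be called (not taken from the change itself; the node dicts, partition and suffix values are illustrative — real callers get them from the object ring):

from swift.common.direct_client import direct_get_suffix_hashes

# two object-server nodes as they would come from the object ring
node_a = {'ip': '127.0.0.1', 'port': 6010, 'device': 'sda1'}
node_b = {'ip': '127.0.0.1', 'port': 6020, 'device': 'sdb1'}
part = 0
suffix = 'abc'

# each call issues a REPLICATE request and unpickles the returned mapping
hashes_a = direct_get_suffix_hashes(node_a, part, [suffix])
hashes_b = direct_get_suffix_hashes(node_b, part, [suffix])
if hashes_a != hashes_b:
    print('suffix %s is inconsistent: %r != %r'
          % (suffix, hashes_a, hashes_b))
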
@@ -678,7 +678,18 @@ class BaseDiskFileManager(object):
         return self.cleanup_ondisk_files(
             hsh_path, reclaim_age=reclaim_age)['files']

-    def _hash_suffix_dir(self, path, mapper, reclaim_age):
+    def _update_suffix_hashes(self, hashes, ondisk_info):
+        """
+        Applies policy specific updates to the given dict of md5 hashes for
+        the given ondisk_info.
+
+        :param hashes: a dict of md5 hashes to be updated
+        :param ondisk_info: a dict describing the state of ondisk files, as
+                            returned by get_ondisk_files
+        """
+        raise NotImplementedError
+
+    def _hash_suffix_dir(self, path, reclaim_age):
         """

         :param path: full path to directory
@@ -694,7 +705,7 @@ class BaseDiskFileManager(object):
         for hsh in path_contents:
             hsh_path = join(path, hsh)
             try:
-                files = self.hash_cleanup_listdir(hsh_path, reclaim_age)
+                ondisk_info = self.cleanup_ondisk_files(hsh_path, reclaim_age)
             except OSError as err:
                 if err.errno == errno.ENOTDIR:
                     partition_path = dirname(path)
@@ -707,14 +718,30 @@ class BaseDiskFileManager(object):
                                                      'quar_path': quar_path})
                     continue
                 raise
-            if not files:
+            if not ondisk_info['files']:
                 try:
                     os.rmdir(hsh_path)
                 except OSError:
                     pass
-            for filename in files:
-                key, value = mapper(filename)
-                hashes[key].update(value)
+                continue
+
+            # ondisk_info has info dicts containing timestamps for those
+            # files that could determine the state of the diskfile if it were
+            # to be opened. We update the suffix hash with the concatenation of
+            # each file's timestamp and extension. The extension is added to
+            # guarantee distinct hash values from two object dirs that have
+            # different file types at the same timestamp(s).
+            #
+            # Files that may be in the object dir but would have no effect on
+            # the state of the diskfile are not used to update the hash.
+            for key in (k for k in ('meta_info', 'ts_info')
+                        if k in ondisk_info):
+                info = ondisk_info[key]
+                hashes[None].update(info['timestamp'].internal + info['ext'])
+
+            # delegate to subclass for data file related updates...
+            self._update_suffix_hashes(hashes, ondisk_info)
+
         try:
             os.rmdir(path)
         except OSError as e:
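
The comment block above is the heart of the fix: only files that would affect the opened diskfile's state feed the suffix hash, as the concatenation of internal timestamp and extension. A rough illustration of that scheme for an object dir holding a tombstone plus a newer .meta file (timestamps are invented; Python 2 strings, as in the surrounding code):

from hashlib import md5

ts_tombstone = '1445180000.00000'
ts_meta = '1445180010.00000'  # newer .meta; it does not change the state of
                              # a deleted diskfile, so it is ignored

hasher = md5()
hasher.update(ts_tombstone + '.ts')  # ts_info contributes timestamp + ext
# no update for the .meta file, so a node that has it and a node that only
# has the tombstone compute the same suffix hash
suffix_hash = hasher.hexdigest()
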
@@ -2195,6 +2222,20 @@ class DiskFileManager(BaseDiskFileManager):
         # set results
         results['data_info'] = exts['.data'][0]

+    def _update_suffix_hashes(self, hashes, ondisk_info):
+        """
+        Applies policy specific updates to the given dict of md5 hashes for
+        the given ondisk_info.
+
+        :param hashes: a dict of md5 hashes to be updated
+        :param ondisk_info: a dict describing the state of ondisk files, as
+                            returned by get_ondisk_files
+        """
+        if 'data_info' in ondisk_info:
+            file_info = ondisk_info['data_info']
+            hashes[None].update(
+                file_info['timestamp'].internal + file_info['ext'])
+
     def _hash_suffix(self, path, reclaim_age):
         """
         Performs reclamation and returns an md5 of all (remaining) files.
@@ -2203,9 +2244,9 @@ class DiskFileManager(BaseDiskFileManager):
         :param reclaim_age: age in seconds at which to remove tombstones
         :raises PathNotDir: if given path is not a valid directory
         :raises OSError: for non-ENOTDIR errors
+        :returns: md5 of files in suffix
         """
-        mapper = lambda filename: (None, filename)
-        hashes = self._hash_suffix_dir(path, mapper, reclaim_age)
+        hashes = self._hash_suffix_dir(path, reclaim_age)
         return hashes[None].hexdigest()


@@ -2544,28 +2585,41 @@ class ECDiskFileManager(BaseDiskFileManager):
             return have_data_file == have_durable
         return False

+    def _update_suffix_hashes(self, hashes, ondisk_info):
+        """
+        Applies policy specific updates to the given dict of md5 hashes for
+        the given ondisk_info.
+
+        The only difference between this method and the replication policy
+        function is the way that data files update hashes dict. Instead of all
+        filenames hashed into a single hasher, each data file name will fall
+        into a bucket keyed by its fragment index.
+
+        :param hashes: a dict of md5 hashes to be updated
+        :param ondisk_info: a dict describing the state of ondisk files, as
+                            returned by get_ondisk_files
+        """
+        for frag_set in ondisk_info['frag_sets'].values():
+            for file_info in frag_set:
+                fi = file_info['frag_index']
+                hashes[fi].update(file_info['timestamp'].internal)
+        if 'durable_frag_set' in ondisk_info:
+            file_info = ondisk_info['durable_frag_set'][0]
+            hashes[None].update(file_info['timestamp'].internal + '.durable')
+
     def _hash_suffix(self, path, reclaim_age):
         """
-        The only difference between this method and the replication policy
-        function is the way that files are updated on the returned hash.
-
-        Instead of all filenames hashed into a single hasher, each file name
-        will fall into a bucket either by fragment index for datafiles, or
-        None (indicating a durable, metadata or tombstone).
+        Performs reclamation and returns an md5 of all (remaining) files.

         :param path: full path to directory
         :param reclaim_age: age in seconds at which to remove tombstones
+        :raises PathNotDir: if given path is not a valid directory
+        :raises OSError: for non-ENOTDIR errors
+        :returns: dict of md5 hex digests
         """
         # hash_per_fi instead of single hash for whole suffix
         # here we flatten out the hashers hexdigest into a dictionary instead
         # of just returning the one hexdigest for the whole suffix
-        def mapper(filename):
-            info = self.parse_on_disk_filename(filename)
-            fi = info['frag_index']
-            if fi is None:
-                return None, filename
-            else:
-                return fi, info['timestamp'].internal
-
-        hash_per_fi = self._hash_suffix_dir(path, mapper, reclaim_age)
+        hash_per_fi = self._hash_suffix_dir(path, reclaim_age)
         return dict((fi, md5.hexdigest()) for fi, md5 in hash_per_fi.items())
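
To make the EC bucketing concrete, here is a rough sketch (not from the change; the timestamps and fragment indexes are invented) of how per-suffix hashes end up keyed by fragment index, with the .durable marker feeding the None bucket:

from collections import defaultdict
from hashlib import md5

hashes = defaultdict(md5)
ts = '1445180000.00000'

# two fragment archives at the same timestamp, fragment indexes 3 and 7
for frag_index in (3, 7):
    hashes[frag_index].update(ts)

# a .durable marker (like tombstones and meta files) hashes into None
hashes[None].update(ts + '.durable')

# flatten the hashers, as ECDiskFileManager._hash_suffix does
result = dict((fi, h.hexdigest()) for fi, h in hashes.items())
# result now has keys 3, 7 and None
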
@@ -21,12 +21,15 @@ import unittest
 import os
 import shutil
 import uuid
-from swift.common.exceptions import DiskFileDeleted

+from swift.common.direct_client import direct_get_suffix_hashes
+from swift.common.exceptions import DiskFileDeleted
+from swift.common.internal_client import UnexpectedResponse
 from swift.container.backend import ContainerBroker
 from swift.common import internal_client, utils
+from swiftclient import client
 from swift.common.ring import Ring
-from swift.common.utils import Timestamp, get_logger
+from swift.common.utils import Timestamp, get_logger, hash_path
 from swift.obj.diskfile import DiskFileManager
 from swift.common.storage_policy import POLICIES

@@ -186,6 +189,20 @@ class Test(ReplProbeTest):
                                        self.container_name,
                                        self.object_name)

+    def _assert_consistent_suffix_hashes(self):
+        opart, onodes = self.object_ring.get_nodes(
+            self.account, self.container_name, self.object_name)
+        name_hash = hash_path(
+            self.account, self.container_name, self.object_name)
+        results = []
+        for node in onodes:
+            results.append(
+                (node,
+                 direct_get_suffix_hashes(node, opart, [name_hash[-3:]])))
+        for (node, hashes) in results[1:]:
+            self.assertEqual(results[0][1], hashes,
+                             'Inconsistent suffix hashes found: %s' % results)
+
     def test_object_delete_is_replicated(self):
         self.brain.put_container(policy_index=int(self.policy))
         # put object
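
Background for the helper above: an object's suffix directory is the last three characters of its path hash, which is why name_hash[-3:] is what gets asked of each object server. A small sketch, assuming the hash path prefix/suffix are configured in /etc/swift/swift.conf as on the probe test nodes (the account/container/object names are illustrative):

from swift.common.utils import hash_path

name_hash = hash_path('AUTH_test', 'mycontainer', 'myobject')
suffix = name_hash[-3:]
# on disk this object lives under something like:
#   <devices>/<device>/objects/<partition>/<suffix>/<name_hash>/
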
@@ -419,6 +436,51 @@ class Test(ReplProbeTest):
         self._assert_consistent_object_metadata()
         self._assert_consistent_container_dbs()

+    def test_post_trumped_by_prior_delete(self):
+        # new metadata and content-type posted to subset of nodes should not
+        # cause object to persist after replication of an earlier delete on
+        # other nodes.
+        self.brain.put_container(policy_index=0)
+        # incomplete put
+        self.brain.stop_primary_half()
+        self._put_object(headers={'Content-Type': 'oldest',
+                                  'X-Object-Sysmeta-Test': 'oldest',
+                                  'X-Object-Meta-Test': 'oldest'})
+        self.brain.start_primary_half()
+
+        # incomplete put then delete
+        self.brain.stop_handoff_half()
+        self._put_object(headers={'Content-Type': 'oldest',
+                                  'X-Object-Sysmeta-Test': 'oldest',
+                                  'X-Object-Meta-Test': 'oldest'})
+        self._delete_object()
+        self.brain.start_handoff_half()
+
+        # handoff post
+        self.brain.stop_primary_half()
+        self._post_object(headers={'Content-Type': 'newest',
+                                   'X-Object-Sysmeta-Test': 'ignored',
+                                   'X-Object-Meta-Test': 'newest'})
+
+        # check object metadata
+        metadata = self._get_object_metadata()
+        self.assertEqual(metadata['x-object-sysmeta-test'], 'oldest')
+        self.assertEqual(metadata['x-object-meta-test'], 'newest')
+        self.assertEqual(metadata['content-type'], 'oldest')
+
+        self.brain.start_primary_half()
+
+        # delete trumps later post
+        self.get_to_final_state()
+
+        # check object is now deleted
+        self.assertRaises(UnexpectedResponse, self._get_object_metadata)
+        container_metadata, objs = client.get_container(self.url, self.token,
+                                                        self.container_name)
+        self.assertEqual(0, len(objs))
+        self._assert_consistent_container_dbs()
+        self._assert_consistent_deleted_object()
+        self._assert_consistent_suffix_hashes()

 if __name__ == "__main__":
     unittest.main()
@@ -4568,6 +4568,37 @@ class TestSuffixHashes(unittest.TestCase):
             }[policy.policy_type]
             self.assertEqual(hashes, expected)

+    def test_hash_suffix_one_tombstone_and_one_meta(self):
+        # A tombstone plus a newer meta file can happen if a tombstone is
+        # replicated to a node with a newer meta file but older data file. The
+        # meta file will be ignored when the diskfile is opened so the
+        # effective state of the disk files is equivalent to only having the
+        # tombstone. Replication cannot remove the meta file, and the meta file
+        # cannot be ssync replicated to a node with only the tombstone, so
+        # we want the get_hashes result to be the same as if the meta file was
+        # not there.
+        for policy in self.iter_policies():
+            df_mgr = self.df_router[policy]
+            df = df_mgr.get_diskfile(
+                'sda1', '0', 'a', 'c', 'o', policy=policy)
+            suffix = os.path.basename(os.path.dirname(df._datadir))
+            # write a tombstone
+            timestamp = self.ts()
+            df.delete(timestamp)
+            # write a meta file
+            df.write_metadata({'X-Timestamp': self.ts().internal})
+            # sanity check
+            self.assertEqual(2, len(os.listdir(df._datadir)))
+            tombstone_hash = md5(timestamp.internal + '.ts').hexdigest()
+            hashes = df_mgr.get_hashes('sda1', '0', [], policy)
+            expected = {
+                REPL_POLICY: {suffix: tombstone_hash},
+                EC_POLICY: {suffix: {
+                    # fi is None here because we have a tombstone
+                    None: tombstone_hash}},
+            }[policy.policy_type]
+            self.assertEqual(hashes, expected)
+
     def test_hash_suffix_one_reclaim_tombstone(self):
         for policy in self.iter_policies():
             df_mgr = self.df_router[policy]
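
For readers skimming the assertion above, the shape of the get_hashes() result for a suffix that holds only a tombstone looks roughly like this (the suffix and timestamp values are invented for illustration):

from hashlib import md5

timestamp_internal = '1445180000.00000'
tombstone_hash = md5(timestamp_internal + '.ts').hexdigest()
suffix = '3f2'

repl_expected = {suffix: tombstone_hash}        # replication policy
ec_expected = {suffix: {None: tombstone_hash}}  # EC policy: keyed by fragment
                                                # index, None for tombstones
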