Allow rebuilding a fragment of an expired object
When a fragment of an expired object was missing, the reconstructor ssync job would send a DELETE sub-request. This leads to a situation where, for the same object and timestamp, some nodes have a data file while others have a tombstone file. This patch forces the reconstructor to rebuild a data file, even for expired objects. DELETE requests are only sent for tombstoned objects.

Co-Authored-By: Alistair Coles <alistairncoles@gmail.com>
Closes-Bug: #1652323
Change-Id: I7f90b732c3268cb852b64f17555c631d668044a8
parent 0b22193718
commit 69df458254
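In short: after this change a missing fragment only results in a DELETE sub-request when the object is genuinely tombstoned; an expired but still present data file gets rebuilt and synced. A rough, purely illustrative sketch of that decision (plain Python, not the actual reconstructor/ssync code):

    def action_for_missing_frag(on_disk_state, open_expired=True):
        # 'on_disk_state' is an illustrative label, not a Swift API
        if on_disk_state == 'tombstone':
            return 'send DELETE'
        if on_disk_state == 'expired data' and not open_expired:
            # old behaviour: the expired diskfile could not be opened,
            # so the object looked deleted
            return 'send DELETE'
        return 'sync data file'

    # before the patch, expired data was treated like a deletion...
    assert action_for_missing_frag('expired data', open_expired=False) == 'send DELETE'
    # ...after it, the data file is rebuilt/synced; tombstones are unchanged
    assert action_for_missing_frag('expired data', open_expired=True) == 'sync data file'
    assert action_for_missing_frag('tombstone') == 'send DELETE'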
@@ -407,6 +407,7 @@ class ObjectReconstructor(Daemon):
         # need to be durable.
         headers = self.headers.copy()
         headers['X-Backend-Storage-Policy-Index'] = int(job['policy'])
+        headers['X-Backend-Replication'] = 'True'
         frag_prefs = [{'timestamp': datafile_metadata['X-Timestamp'],
                        'exclude': []}]
         headers['X-Backend-Fragment-Preferences'] = json.dumps(frag_prefs)
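For reference, the fragment-preferences header built in this hunk serializes to a small JSON list; the timestamp below is a made-up stand-in for datafile_metadata['X-Timestamp']:

    import json

    frag_prefs = [{'timestamp': '1483228800.00000', 'exclude': []}]
    print(json.dumps(frag_prefs))
    # [{"timestamp": "1483228800.00000", "exclude": []}]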
@@ -908,7 +908,9 @@ class ObjectController(BaseStorageServer):
         try:
             disk_file = self.get_diskfile(
                 device, partition, account, container, obj,
-                policy=policy, frag_prefs=frag_prefs)
+                policy=policy, frag_prefs=frag_prefs,
+                open_expired=config_true_value(
+                    request.headers.get('x-backend-replication', 'false')))
         except DiskFileDeviceUnavailable:
             return HTTPInsufficientStorage(drive=device, request=request)
         try:
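The open_expired gate here is swift.common.utils.config_true_value, so only requests carrying a truthy x-backend-replication header (internal reconstruction/ssync traffic) may open an expired diskfile. A minimal check, assuming Swift is importable:

    from swift.common.utils import config_true_value

    # 'true', '1', 'yes', 'on', 't', 'y' count as true (case-insensitive)
    assert config_true_value('True')
    assert not config_true_value('false')  # the default used in this hunk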
@@ -329,7 +329,8 @@ class Sender(object):
             try:
                 df = self.df_mgr.get_diskfile_from_hash(
                     self.job['device'], self.job['partition'], object_hash,
-                    self.job['policy'], frag_index=self.job.get('frag_index'))
+                    self.job['policy'], frag_index=self.job.get('frag_index'),
+                    open_expired=True)
             except exceptions.DiskFileNotExist:
                 continue
             url_path = urllib.parse.quote(
@@ -24,7 +24,9 @@ import shutil
 import random
 from collections import defaultdict
 import os
+import time

+from swift.common.direct_client import DirectClientException
 from test.probe.common import ECProbeTest

 from swift.common import direct_client
@@ -80,19 +82,10 @@ class TestReconstructorRebuild(ECProbeTest):
                              headers=headers)

         # PUT object and POST some metadata
-        contents = Body()
-        headers = {
-            self._make_name('x-object-meta-').decode('utf8'):
-                self._make_name('meta-foo-').decode('utf8'),
-        }
+        self.proxy_put()
         self.headers_post = {
             self._make_name('x-object-meta-').decode('utf8'):
                 self._make_name('meta-bar-').decode('utf8')}
-
-        self.etag = client.put_object(self.url, self.token,
-                                      self.container_name,
-                                      self.object_name,
-                                      contents=contents, headers=headers)
         client.post_object(self.url, self.token, self.container_name,
                            self.object_name, headers=dict(self.headers_post))

@@ -107,6 +100,19 @@ class TestReconstructorRebuild(ECProbeTest):
             'X-Backend-Durable-Timestamp', hdrs,
             'Missing durable timestamp in %r' % self.frag_headers)

+    def proxy_put(self, extra_headers=None):
+        contents = Body()
+        headers = {
+            self._make_name('x-object-meta-').decode('utf8'):
+                self._make_name('meta-foo-').decode('utf8'),
+        }
+        if extra_headers:
+            headers.update(extra_headers)
+        self.etag = client.put_object(self.url, self.token,
+                                      self.container_name,
+                                      self.object_name,
+                                      contents=contents, headers=headers)
+
     def proxy_get(self):
         # GET object
         headers, body = client.get_object(self.url, self.token,
@@ -118,8 +124,10 @@ class TestReconstructorRebuild(ECProbeTest):
             resp_checksum.update(chunk)
         return headers, resp_checksum.hexdigest()

-    def direct_get(self, node, part, require_durable=True):
+    def direct_get(self, node, part, require_durable=True, extra_headers=None):
         req_headers = {'X-Backend-Storage-Policy-Index': int(self.policy)}
+        if extra_headers:
+            req_headers.update(extra_headers)
         if not require_durable:
             req_headers.update(
                 {'X-Backend-Fragment-Preferences': json.dumps([])})
@@ -166,14 +174,15 @@ class TestReconstructorRebuild(ECProbeTest):
     def _format_node(self, node):
         return '%s#%s' % (node['device'], node['index'])

-    def _assert_all_nodes_have_frag(self):
+    def _assert_all_nodes_have_frag(self, extra_headers=None):
         # check all frags are in place
         failures = []
         frag_etags = {}
         frag_headers = {}
         for node in self.onodes:
             try:
-                headers, etag = self.direct_get(node, self.opart)
+                headers, etag = self.direct_get(node, self.opart,
+                                                extra_headers=extra_headers)
                 frag_etags[node['index']] = etag
                 del headers['Date']  # Date header will vary so remove it
                 frag_headers[node['index']] = headers
@@ -335,6 +344,51 @@ class TestReconstructorRebuild(ECProbeTest):
         # just to be nice
         self.revive_drive(device_path)

+    def test_sync_expired_object(self):
+        # verify that missing frag can be rebuilt for an expired object
+        delete_at = int(time.time() + 3)
+        self.proxy_put(extra_headers={'x-delete-at': delete_at})
+        self.proxy_get()  # sanity check
+        orig_frag_headers, orig_frag_etags = self._assert_all_nodes_have_frag(
+            extra_headers={'X-Backend-Replication': 'True'})
+
+        # wait for object to expire
+        time.sleep(3)
+
+        # sanity check - object has now expired, proxy get fails
+        with self.assertRaises(ClientException) as cm:
+            self.proxy_get()
+        self.assertEqual(404, cm.exception.http_status)
+
+        # sanity check - X-Backend-Replication lets us get expired frag...
+        fail_node = random.choice(self.onodes)
+        self.direct_get(fail_node, self.opart,
+                        extra_headers={'X-Backend-Replication': 'True'})
+        # ...until we remove the frag from fail_node
+        self._break_nodes([self.onodes.index(fail_node)], [])
+        # ...now it's really gone
+        with self.assertRaises(DirectClientException) as cm:
+            self.direct_get(fail_node, self.opart,
+                            extra_headers={'X-Backend-Replication': 'True'})
+        self.assertEqual(404, cm.exception.http_status)
+        self.assertNotIn('X-Backend-Timestamp', cm.exception.http_headers)
+
+        # run the reconstructor
+        self.reconstructor.once()
+
+        # the missing frag is now in place but expired
+        with self.assertRaises(DirectClientException) as cm:
+            self.direct_get(fail_node, self.opart)
+        self.assertEqual(404, cm.exception.http_status)
+        self.assertIn('X-Backend-Timestamp', cm.exception.http_headers)
+
+        # check all frags are intact, durable and have expected metadata
+        frag_headers, frag_etags = self._assert_all_nodes_have_frag(
+            extra_headers={'X-Backend-Replication': 'True'})
+        self.assertEqual(orig_frag_etags, frag_etags)
+        self.maxDiff = None
+        self.assertEqual(orig_frag_headers, frag_headers)
+

 class TestReconstructorRebuildUTF8(TestReconstructorRebuild):
@@ -3964,6 +3964,7 @@ class TestReconstructFragmentArchive(BaseTestObjectReconstructor):
         self.assertEqual(
             [{'timestamp': self.obj_timestamp.normal, 'exclude': []}],
             json.loads(called_header['X-Backend-Fragment-Preferences']))
+        self.assertIn('X-Backend-Replication', called_header)
         # no error and warning
         self.assertFalse(self.logger.get_lines_for_level('error'))
         self.assertFalse(self.logger.get_lines_for_level('warning'))
@@ -5711,7 +5711,8 @@ class TestObjectController(unittest.TestCase):
             given_args[5], 'sda1', policy])

     def test_GET_but_expired(self):
-        test_time = time() + 10000
+        now = time()
+        test_time = now + 10000
         delete_at_timestamp = int(test_time + 100)
         delete_at_container = str(
             delete_at_timestamp /
@@ -5734,11 +5735,7 @@ class TestObjectController(unittest.TestCase):
         resp = req.get_response(self.object_controller)
         self.assertEqual(resp.status_int, 200)

-        orig_time = object_server.time.time
-        try:
-            t = time()
-            object_server.time.time = lambda: t
-            delete_at_timestamp = int(t + 1)
+        delete_at_timestamp = int(now + 1)
         delete_at_container = str(
             delete_at_timestamp /
             self.object_controller.expiring_objects_container_divisor *
@@ -5755,29 +5752,35 @@ class TestObjectController(unittest.TestCase):
         req.body = 'TEST'
         resp = req.get_response(self.object_controller)
         self.assertEqual(resp.status_int, 201)

+        # fix server time to now: delete-at is in future, verify GET is ok
+        with mock.patch('swift.obj.server.time.time', return_value=now):
             req = Request.blank(
                 '/sda1/p/a/c/o',
                 environ={'REQUEST_METHOD': 'GET'},
                 headers={'X-Timestamp': normalize_timestamp(test_time)})
             resp = req.get_response(self.object_controller)
             self.assertEqual(resp.status_int, 200)
-        finally:
-            object_server.time.time = orig_time

-        orig_time = object_server.time.time
-        try:
-            t = time() + 2
-            object_server.time.time = lambda: t
+        # fix server time to now + 2: delete-at is in past, verify GET fails...
+        with mock.patch('swift.obj.server.time.time', return_value=now + 2):
             req = Request.blank(
                 '/sda1/p/a/c/o',
                 environ={'REQUEST_METHOD': 'GET'},
-                headers={'X-Timestamp': normalize_timestamp(t)})
+                headers={'X-Timestamp': normalize_timestamp(now + 2)})
             resp = req.get_response(self.object_controller)
             self.assertEqual(resp.status_int, 404)
             self.assertEqual(resp.headers['X-Backend-Timestamp'],
                              utils.Timestamp(put_timestamp))
-        finally:
-            object_server.time.time = orig_time
+            # ...unless X-Backend-Replication is sent
+            req = Request.blank(
+                '/sda1/p/a/c/o',
+                environ={'REQUEST_METHOD': 'GET'},
+                headers={'X-Timestamp': normalize_timestamp(now + 2),
+                         'X-Backend-Replication': 'True'})
+            resp = req.get_response(self.object_controller)
+            self.assertEqual(resp.status_int, 200)
+            self.assertEqual('TEST', resp.body)

     def test_HEAD_but_expired(self):
         test_time = time() + 10000
@@ -147,7 +147,7 @@ class TestBaseSsync(BaseTest):
     def _open_rx_diskfile(self, obj_name, policy, frag_index=None):
         df = self.rx_controller.get_diskfile(
             self.device, self.partition, 'a', 'c', obj_name, policy=policy,
-            frag_index=frag_index)
+            frag_index=frag_index, open_expired=True)
         df.open()
         return df

@@ -1346,6 +1346,41 @@ class TestSsyncReplication(TestBaseSsync):
         self.assertEqual(metadata['X-Object-Meta-Test'], oname)
         self.assertEqual(metadata['X-Object-Sysmeta-Test'], 'sys_' + oname)

+    def test_expired_object(self):
+        # verify that expired objects sync
+        policy = POLICIES.default
+        rx_node_index = 0
+        tx_df_mgr = self.daemon._df_router[policy]
+        t1 = next(self.ts_iter)
+        obj_name = 'o1'
+        metadata = {'X-Delete-At': '0', 'Content-Type': 'plain/text'}
+        df = self._make_diskfile(
+            obj=obj_name, body=self._get_object_data('/a/c/%s' % obj_name),
+            extra_metadata=metadata, timestamp=t1, policy=policy,
+            df_mgr=tx_df_mgr, verify=False)
+        with self.assertRaises(DiskFileExpired):
+            df.open()  # sanity check - expired
+
+        # create ssync sender instance...
+        suffixes = [os.path.basename(os.path.dirname(df._datadir))]
+        job = {'device': self.device,
+               'partition': self.partition,
+               'policy': policy}
+        node = dict(self.rx_node)
+        node.update({'index': rx_node_index})
+        sender = ssync_sender.Sender(self.daemon, node, job, suffixes)
+        # wrap connection from tx to rx to capture ssync messages...
+        sender.connect, trace = self.make_connect_wrapper(sender)
+
+        # run the sync protocol...
+        success, in_sync_objs = sender()
+
+        self.assertEqual(1, len(in_sync_objs))
+        self.assertTrue(success)
+        # allow the expired sender diskfile to be opened for verification
+        df._open_expired = True
+        self._verify_ondisk_files({obj_name: [df]}, policy)
+
     def _check_no_longer_expired_object(self, obj_name, df, policy):
         # verify that objects with x-delete-at metadata that are not expired
         # can be sync'd