Object POST update gets distinct async pending file
Each object update to a container server is saved in a pending file if the initial update attempt fails. Pending file names were derived from the update request's x-timestamp, which is equal to the object's data file timestamp. This meant that updates due to an object POST used the same async pending file as updates due to the object's PUT. This is not so bad because the object POST update has a superset of the metadata included in the PUT update. But there is a risk of a race condition causing an update to be lost: the updater may open an update file due to a PUT whuile the object server is writing an update due to a POST to the same file name. The updater could then unlink the file before the more recent update for the POST is sent. This patch changes the POST update pending file name to be derived from the object's metadata timestamp, thus making it distinct from the PUT update pending file name. There is no upgrade impact since existing pending files will continue to be processed. Change-Id: I1b093c837efe8c2a64e92075ebd5e1b93e30efb9
This commit is contained in:
parent
35d62d0257
commit
9db7391e55
@ -250,7 +250,8 @@ class ObjectController(BaseStorageServer):
|
||||
{'ip': ip, 'port': port, 'dev': contdevice})
|
||||
data = {'op': op, 'account': account, 'container': container,
|
||||
'obj': obj, 'headers': headers_out}
|
||||
timestamp = headers_out['x-timestamp']
|
||||
timestamp = headers_out.get('x-meta-timestamp',
|
||||
headers_out.get('x-timestamp'))
|
||||
self._diskfile_router[policy].pickle_async_update(
|
||||
objdevice, account, container, obj, data, timestamp, policy)
|
||||
|
||||
@ -565,6 +566,7 @@ class ObjectController(BaseStorageServer):
|
||||
content_type_headers['Content-Type'] += (';swift_bytes=%s'
|
||||
% swift_bytes)
|
||||
|
||||
# object POST updates are PUT to the container server
|
||||
self.container_update(
|
||||
'PUT', account, container, obj, request,
|
||||
HeaderKeyDict({
|
||||
|
@ -48,6 +48,7 @@ from test.unit import FakeLogger, debug_logger, mocked_http_conn, \
|
||||
make_timestamp_iter, DEFAULT_TEST_EC_TYPE
|
||||
from test.unit import connect_tcp, readuntil2crlfs, patch_policies
|
||||
from swift.obj import server as object_server
|
||||
from swift.obj import updater
|
||||
from swift.obj import diskfile
|
||||
from swift.common import utils, bufferedhttp
|
||||
from swift.common.utils import hash_path, mkdirs, normalize_timestamp, \
|
||||
@ -697,6 +698,150 @@ class TestObjectController(unittest.TestCase):
|
||||
self._test_POST_container_updates(
|
||||
POLICIES[1], update_etag='override_etag')
|
||||
|
||||
def _test_PUT_then_POST_async_pendings(self, policy, update_etag=None):
|
||||
# Test that PUT and POST requests result in distinct async pending
|
||||
# files when sync container update fails.
|
||||
def fake_http_connect(*args):
|
||||
raise Exception('test')
|
||||
|
||||
device_dir = os.path.join(self.testdir, 'sda1')
|
||||
ts_iter = make_timestamp_iter()
|
||||
t_put = ts_iter.next()
|
||||
update_etag = update_etag or '098f6bcd4621d373cade4e832627b4f6'
|
||||
|
||||
put_headers = {
|
||||
'X-Trans-Id': 'put_trans_id',
|
||||
'X-Timestamp': t_put.internal,
|
||||
'Content-Type': 'application/octet-stream;swift_bytes=123456789',
|
||||
'Content-Length': '4',
|
||||
'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Container-Host': 'chost:cport',
|
||||
'X-Container-Partition': 'cpartition',
|
||||
'X-Container-Device': 'cdevice'}
|
||||
if policy.policy_type == EC_POLICY:
|
||||
put_headers.update({
|
||||
'X-Object-Sysmeta-Ec-Frag-Index': '2',
|
||||
'X-Backend-Container-Update-Override-Etag': update_etag,
|
||||
'X-Object-Sysmeta-Ec-Etag': update_etag})
|
||||
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers=put_headers, body='test')
|
||||
|
||||
with mock.patch('swift.obj.server.http_connect', fake_http_connect):
|
||||
with mock.patch('swift.common.utils.HASH_PATH_PREFIX', ''):
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
self.assertEqual(resp.status_int, 201)
|
||||
|
||||
async_pending_file_put = os.path.join(
|
||||
device_dir, diskfile.get_async_dir(policy), 'a83',
|
||||
'06fbf0b514e5199dfc4e00f42eb5ea83-%s' % t_put.internal)
|
||||
self.assertTrue(os.path.isfile(async_pending_file_put),
|
||||
'Expected %s to be a file but it is not.'
|
||||
% async_pending_file_put)
|
||||
expected_put_headers = {
|
||||
'Referer': 'PUT http://localhost/sda1/p/a/c/o',
|
||||
'X-Trans-Id': 'put_trans_id',
|
||||
'X-Timestamp': t_put.internal,
|
||||
'X-Content-Type': 'application/octet-stream;swift_bytes=123456789',
|
||||
'X-Size': '4',
|
||||
'X-Etag': '098f6bcd4621d373cade4e832627b4f6',
|
||||
'User-Agent': 'object-server %s' % os.getpid(),
|
||||
'X-Backend-Storage-Policy-Index': '%d' % int(policy)}
|
||||
if policy.policy_type == EC_POLICY:
|
||||
expected_put_headers['X-Etag'] = update_etag
|
||||
self.assertDictEqual(
|
||||
pickle.load(open(async_pending_file_put)),
|
||||
{'headers': expected_put_headers,
|
||||
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
|
||||
|
||||
# POST with newer metadata returns success and container update
|
||||
# is expected
|
||||
t_post = ts_iter.next()
|
||||
post_headers = {
|
||||
'X-Trans-Id': 'post_trans_id',
|
||||
'X-Timestamp': t_post.internal,
|
||||
'Content-Type': 'application/other',
|
||||
'X-Backend-Storage-Policy-Index': int(policy),
|
||||
'X-Container-Host': 'chost:cport',
|
||||
'X-Container-Partition': 'cpartition',
|
||||
'X-Container-Device': 'cdevice'}
|
||||
req = Request.blank('/sda1/p/a/c/o',
|
||||
environ={'REQUEST_METHOD': 'POST'},
|
||||
headers=post_headers)
|
||||
|
||||
with mock.patch('swift.obj.server.http_connect', fake_http_connect):
|
||||
with mock.patch('swift.common.utils.HASH_PATH_PREFIX', ''):
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
self.assertEqual(resp.status_int, 202)
|
||||
|
||||
self.maxDiff = None
|
||||
# check async pending file for PUT is still intact
|
||||
self.assertDictEqual(
|
||||
pickle.load(open(async_pending_file_put)),
|
||||
{'headers': expected_put_headers,
|
||||
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
|
||||
|
||||
# check distinct async pending file for POST
|
||||
async_pending_file_post = os.path.join(
|
||||
device_dir, diskfile.get_async_dir(policy), 'a83',
|
||||
'06fbf0b514e5199dfc4e00f42eb5ea83-%s' % t_post.internal)
|
||||
self.assertTrue(os.path.isfile(async_pending_file_post),
|
||||
'Expected %s to be a file but it is not.'
|
||||
% async_pending_file_post)
|
||||
expected_post_headers = {
|
||||
'Referer': 'POST http://localhost/sda1/p/a/c/o',
|
||||
'X-Trans-Id': 'post_trans_id',
|
||||
'X-Timestamp': t_put.internal,
|
||||
'X-Content-Type': 'application/other;swift_bytes=123456789',
|
||||
'X-Size': '4',
|
||||
'X-Etag': '098f6bcd4621d373cade4e832627b4f6',
|
||||
'User-Agent': 'object-server %s' % os.getpid(),
|
||||
'X-Backend-Storage-Policy-Index': '%d' % int(policy),
|
||||
'X-Meta-Timestamp': t_post.internal,
|
||||
'X-Content-Type-Timestamp': t_post.internal,
|
||||
}
|
||||
if policy.policy_type == EC_POLICY:
|
||||
expected_post_headers['X-Etag'] = update_etag
|
||||
self.assertDictEqual(
|
||||
pickle.load(open(async_pending_file_post)),
|
||||
{'headers': expected_post_headers,
|
||||
'account': 'a', 'container': 'c', 'obj': 'o', 'op': 'PUT'})
|
||||
|
||||
# verify that only the POST (most recent) async update gets sent by the
|
||||
# object updater, and that both update files are deleted
|
||||
with mock.patch(
|
||||
'swift.obj.updater.ObjectUpdater.object_update') as mock_update, \
|
||||
mock.patch('swift.obj.updater.dump_recon_cache'):
|
||||
object_updater = updater.ObjectUpdater(
|
||||
{'devices': self.testdir,
|
||||
'mount_check': 'false'}, logger=debug_logger())
|
||||
node = {'id': 1}
|
||||
mock_ring = mock.MagicMock()
|
||||
mock_ring.get_nodes.return_value = (99, [node])
|
||||
object_updater.container_ring = mock_ring
|
||||
mock_update.return_value = ((True, 1))
|
||||
object_updater.run_once()
|
||||
self.assertEqual(1, mock_update.call_count)
|
||||
self.assertEqual((node, 99, 'PUT', '/a/c/o'),
|
||||
mock_update.call_args_list[0][0][0:4])
|
||||
actual_headers = mock_update.call_args_list[0][0][4]
|
||||
self.assertTrue(
|
||||
actual_headers.pop('user-agent').startswith('object-updater'))
|
||||
self.assertDictEqual(expected_post_headers, actual_headers)
|
||||
self.assertFalse(
|
||||
os.listdir(os.path.join(
|
||||
device_dir, diskfile.get_async_dir(policy))))
|
||||
|
||||
def test_PUT_then_POST_async_updates_with_repl_policy(self):
|
||||
self._test_PUT_then_POST_async_pendings(POLICIES[0])
|
||||
|
||||
def test_PUT_then_POST_async_updates_with_EC_policy(self):
|
||||
self._test_PUT_then_POST_async_pendings(
|
||||
POLICIES[1], update_etag='override_etag')
|
||||
|
||||
def test_POST_quarantine_zbyte(self):
|
||||
timestamp = normalize_timestamp(time())
|
||||
req = Request.blank('/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
|
Loading…
x
Reference in New Issue
Block a user