Merge "EC Ssync: Update parms to include node and frag indices"
This commit is contained in:
commit
66db3bc2ce
@ -498,10 +498,14 @@ class ObjectController(BaseStorageServer):
|
||||
except ValueError as e:
|
||||
return HTTPBadRequest(body=str(e), request=request,
|
||||
content_type='text/plain')
|
||||
# SSYNC will include Frag-Index header for subrequests to primary
|
||||
# nodes; handoff nodes should 409 subrequests to over-write an
|
||||
# existing data fragment until they offloaded the existing fragment
|
||||
frag_index = request.headers.get('X-Backend-Ssync-Frag-Index')
|
||||
try:
|
||||
disk_file = self.get_diskfile(
|
||||
device, partition, account, container, obj,
|
||||
policy=policy)
|
||||
policy=policy, frag_index=frag_index)
|
||||
except DiskFileDeviceUnavailable:
|
||||
return HTTPInsufficientStorage(drive=device, request=request)
|
||||
try:
|
||||
|
@ -156,11 +156,18 @@ class Receiver(object):
|
||||
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
|
||||
self.device, self.partition, self.policy = \
|
||||
request_helpers.get_name_and_placement(self.request, 2, 2, False)
|
||||
if 'X-Backend-Ssync-Frag-Index' in self.request.headers:
|
||||
self.frag_index = self.node_index = None
|
||||
if self.request.headers.get('X-Backend-Ssync-Frag-Index'):
|
||||
self.frag_index = int(
|
||||
self.request.headers['X-Backend-Ssync-Frag-Index'])
|
||||
else:
|
||||
self.frag_index = None
|
||||
if self.request.headers.get('X-Backend-Ssync-Node-Index'):
|
||||
self.node_index = int(
|
||||
self.request.headers['X-Backend-Ssync-Node-Index'])
|
||||
if self.node_index != self.frag_index:
|
||||
# a primary node should only recieve it's own fragments
|
||||
raise swob.HTTPBadRequest(
|
||||
'Frag-Index (%s) != Node-Index (%s)' % (
|
||||
self.frag_index, self.node_index))
|
||||
utils.validate_device_partition(self.device, self.partition)
|
||||
self.diskfile_mgr = self.app._diskfile_router[self.policy]
|
||||
if not self.diskfile_mgr.get_dev_path(self.device):
|
||||
@ -344,6 +351,9 @@ class Receiver(object):
|
||||
raise Exception('Invalid subrequest method %s' % method)
|
||||
subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy)
|
||||
subreq.headers['X-Backend-Replication'] = 'True'
|
||||
if self.node_index is not None:
|
||||
# primary node should not 409 if it has a non-primary fragment
|
||||
subreq.headers['X-Backend-Ssync-Frag-Index'] = self.node_index
|
||||
if replication_headers:
|
||||
subreq.headers['X-Backend-Replication-Headers'] = \
|
||||
' '.join(replication_headers)
|
||||
|
@ -129,8 +129,15 @@ class Sender(object):
|
||||
self.connection.putheader('Transfer-Encoding', 'chunked')
|
||||
self.connection.putheader('X-Backend-Storage-Policy-Index',
|
||||
int(self.job['policy']))
|
||||
self.connection.putheader('X-Backend-Ssync-Frag-Index',
|
||||
self.node['index'])
|
||||
# a sync job must use the node's index for the frag_index of the
|
||||
# rebuilt fragments instead of the frag_index from the job which
|
||||
# will be rebuilding them
|
||||
self.connection.putheader(
|
||||
'X-Backend-Ssync-Frag-Index', self.node.get(
|
||||
'index', self.job.get('frag_index')))
|
||||
# a revert job to a handoff will not have a node index
|
||||
self.connection.putheader('X-Backend-Ssync-Node-Index',
|
||||
self.node.get('index'))
|
||||
self.connection.endheaders()
|
||||
with exceptions.MessageTimeout(
|
||||
self.daemon.node_timeout, 'connect receive'):
|
||||
|
@ -1266,6 +1266,61 @@ class TestObjectController(unittest.TestCase):
|
||||
finally:
|
||||
object_server.http_connect = old_http_connect
|
||||
|
||||
def test_PUT_ssync_multi_frag(self):
|
||||
def put_with_index(expected_rsp, frag_index, node_index=None):
|
||||
timestamp = utils.Timestamp(int(time())).internal
|
||||
data_file_tail = '#%d.data' % frag_index
|
||||
headers = {'X-Timestamp': timestamp,
|
||||
'Content-Length': '6',
|
||||
'Content-Type': 'application/octet-stream',
|
||||
'X-Backend-Ssync-Frag-Index': node_index,
|
||||
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
|
||||
'X-Backend-Storage-Policy-Index': int(policy)}
|
||||
req = Request.blank(
|
||||
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
|
||||
headers=headers)
|
||||
req.body = 'VERIFY'
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
||||
self.assertEquals(
|
||||
resp.status_int, expected_rsp,
|
||||
'got %s != %s for frag_index=%s node_index=%s' % (
|
||||
resp.status_int, expected_rsp,
|
||||
frag_index, node_index))
|
||||
if expected_rsp == 409:
|
||||
return
|
||||
obj_dir = os.path.join(
|
||||
self.testdir, 'sda1',
|
||||
storage_directory(diskfile.get_data_dir(int(policy)),
|
||||
'p', hash_path('a', 'c', 'o')))
|
||||
data_file = os.path.join(obj_dir, timestamp) + data_file_tail
|
||||
self.assertTrue(os.path.isfile(data_file),
|
||||
'Expected file %r not found in %r for policy %r'
|
||||
% (data_file, os.listdir(obj_dir), int(policy)))
|
||||
|
||||
for policy in POLICIES:
|
||||
if policy.policy_type == EC_POLICY:
|
||||
# upload with a ec-frag-index
|
||||
put_with_index(201, 3)
|
||||
# same timestamp will conflict a different ec-frag-index
|
||||
put_with_index(409, 2)
|
||||
# but with the ssync-frag-index (primary node) it will just
|
||||
# save both!
|
||||
put_with_index(201, 2, 2)
|
||||
# but even with the ssync-frag-index we can still get a
|
||||
# timestamp collisison if the file already exists
|
||||
put_with_index(409, 3, 3)
|
||||
|
||||
# FWIW, ssync will never send in-consistent indexes - but if
|
||||
# something else did, from the object server perspective ...
|
||||
|
||||
# ... the ssync-frag-index is canonical on the
|
||||
# read/pre-existance check
|
||||
put_with_index(409, 7, 2)
|
||||
# ... but the ec-frag-index is canonical when it comes to on
|
||||
# disk file
|
||||
put_with_index(201, 7, 6)
|
||||
|
||||
def test_PUT_durable_files(self):
|
||||
for policy in POLICIES:
|
||||
timestamp = utils.Timestamp(int(time())).internal
|
||||
|
@ -188,7 +188,7 @@ class TestReceiver(unittest.TestCase):
|
||||
self.assertEqual('No policy with index 2', err.body)
|
||||
|
||||
@unit.patch_policies()
|
||||
def test_Receiver_with_frag_index_header(self):
|
||||
def test_Receiver_with_only_frag_index_header(self):
|
||||
# update router post policy patch
|
||||
self.controller._diskfile_router = diskfile.DiskFileRouter(
|
||||
self.conf, self.controller.logger)
|
||||
@ -208,6 +208,69 @@ class TestReceiver(unittest.TestCase):
|
||||
':UPDATES: START', ':UPDATES: END'])
|
||||
self.assertEqual(rcvr.policy, POLICIES[1])
|
||||
self.assertEqual(rcvr.frag_index, 7)
|
||||
self.assertEqual(rcvr.node_index, None)
|
||||
|
||||
@unit.patch_policies()
|
||||
def test_Receiver_with_only_node_index_header(self):
|
||||
# update router post policy patch
|
||||
self.controller._diskfile_router = diskfile.DiskFileRouter(
|
||||
self.conf, self.controller.logger)
|
||||
req = swob.Request.blank(
|
||||
'/sda1/1',
|
||||
environ={'REQUEST_METHOD': 'SSYNC',
|
||||
'HTTP_X_BACKEND_SSYNC_NODE_INDEX': '7',
|
||||
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
|
||||
body=':MISSING_CHECK: START\r\n'
|
||||
':MISSING_CHECK: END\r\n'
|
||||
':UPDATES: START\r\n:UPDATES: END\r\n')
|
||||
with self.assertRaises(HTTPException) as e:
|
||||
ssync_receiver.Receiver(self.controller, req)
|
||||
self.assertEqual(e.exception.status_int, 400)
|
||||
# if a node index is included - it *must* be
|
||||
# the same value of frag index
|
||||
self.assertEqual(e.exception.body,
|
||||
'Frag-Index (None) != Node-Index (7)')
|
||||
|
||||
@unit.patch_policies()
|
||||
def test_Receiver_with_matched_indexes(self):
|
||||
# update router post policy patch
|
||||
self.controller._diskfile_router = diskfile.DiskFileRouter(
|
||||
self.conf, self.controller.logger)
|
||||
req = swob.Request.blank(
|
||||
'/sda1/1',
|
||||
environ={'REQUEST_METHOD': 'SSYNC',
|
||||
'HTTP_X_BACKEND_SSYNC_NODE_INDEX': '7',
|
||||
'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '7',
|
||||
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
|
||||
body=':MISSING_CHECK: START\r\n'
|
||||
':MISSING_CHECK: END\r\n'
|
||||
':UPDATES: START\r\n:UPDATES: END\r\n')
|
||||
rcvr = ssync_receiver.Receiver(self.controller, req)
|
||||
body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()]
|
||||
self.assertEqual(
|
||||
body_lines,
|
||||
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
|
||||
':UPDATES: START', ':UPDATES: END'])
|
||||
self.assertEqual(rcvr.policy, POLICIES[1])
|
||||
self.assertEqual(rcvr.frag_index, 7)
|
||||
self.assertEqual(rcvr.node_index, 7)
|
||||
|
||||
@unit.patch_policies()
|
||||
def test_Receiver_with_mismatched_indexes(self):
|
||||
# update router post policy patch
|
||||
self.controller._diskfile_router = diskfile.DiskFileRouter(
|
||||
self.conf, self.controller.logger)
|
||||
req = swob.Request.blank(
|
||||
'/sda1/1',
|
||||
environ={'REQUEST_METHOD': 'SSYNC',
|
||||
'HTTP_X_BACKEND_SSYNC_NODE_INDEX': '6',
|
||||
'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '7',
|
||||
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
|
||||
body=':MISSING_CHECK: START\r\n'
|
||||
':MISSING_CHECK: END\r\n'
|
||||
':UPDATES: START\r\n:UPDATES: END\r\n')
|
||||
self.assertRaises(HTTPException, ssync_receiver.Receiver,
|
||||
self.controller, req)
|
||||
|
||||
def test_SSYNC_replication_lock_fail(self):
|
||||
def _mock(path):
|
||||
|
@ -261,6 +261,7 @@ class TestSender(BaseTestSender):
|
||||
mock.call('Transfer-Encoding', 'chunked'),
|
||||
mock.call('X-Backend-Storage-Policy-Index', 1),
|
||||
mock.call('X-Backend-Ssync-Frag-Index', 0),
|
||||
mock.call('X-Backend-Ssync-Node-Index', 0),
|
||||
],
|
||||
'endheaders': [mock.call()],
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user