EC Ssync: Update parms to include node and frag indices

Previously we sent the ssync backend frag index based on the node
index.  We need to be more specific for ssync to handle both sync
and revert cases so now we send the frag index based on the job
contents (as determined by the ec recon)) and the node index
as a new header based on, well, the node index.

The rcvr can now validate the incoming pair to reject (400) when
a primary node is being asked to accept fragments that don't
belong to it.  Additionally, by having the frag index the
rcvr can reject (409) an attempt to accept a fragment when its
a handoff and already has one that needs to be reverted.

Fixes-bug: #1452619
Change-Id: I8287b274bbbd00903c1975fe49375590af697be4
This commit is contained in:
paul luse 2015-06-13 11:03:56 -07:00 committed by Clay Gerrard
parent 0e22371cd0
commit ac8a769585
6 changed files with 147 additions and 7 deletions

View File

@ -498,10 +498,14 @@ class ObjectController(BaseStorageServer):
except ValueError as e:
return HTTPBadRequest(body=str(e), request=request,
content_type='text/plain')
# SSYNC will include Frag-Index header for subrequests to primary
# nodes; handoff nodes should 409 subrequests to over-write an
# existing data fragment until they offloaded the existing fragment
frag_index = request.headers.get('X-Backend-Ssync-Frag-Index')
try:
disk_file = self.get_diskfile(
device, partition, account, container, obj,
policy=policy)
policy=policy, frag_index=frag_index)
except DiskFileDeviceUnavailable:
return HTTPInsufficientStorage(drive=device, request=request)
try:

View File

@ -156,11 +156,18 @@ class Receiver(object):
self.request.environ['eventlet.minimum_write_chunk_size'] = 0
self.device, self.partition, self.policy = \
request_helpers.get_name_and_placement(self.request, 2, 2, False)
if 'X-Backend-Ssync-Frag-Index' in self.request.headers:
self.frag_index = self.node_index = None
if self.request.headers.get('X-Backend-Ssync-Frag-Index'):
self.frag_index = int(
self.request.headers['X-Backend-Ssync-Frag-Index'])
else:
self.frag_index = None
if self.request.headers.get('X-Backend-Ssync-Node-Index'):
self.node_index = int(
self.request.headers['X-Backend-Ssync-Node-Index'])
if self.node_index != self.frag_index:
# a primary node should only recieve it's own fragments
raise swob.HTTPBadRequest(
'Frag-Index (%s) != Node-Index (%s)' % (
self.frag_index, self.node_index))
utils.validate_device_partition(self.device, self.partition)
self.diskfile_mgr = self.app._diskfile_router[self.policy]
if not self.diskfile_mgr.get_dev_path(self.device):
@ -344,6 +351,9 @@ class Receiver(object):
raise Exception('Invalid subrequest method %s' % method)
subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy)
subreq.headers['X-Backend-Replication'] = 'True'
if self.node_index is not None:
# primary node should not 409 if it has a non-primary fragment
subreq.headers['X-Backend-Ssync-Frag-Index'] = self.node_index
if replication_headers:
subreq.headers['X-Backend-Replication-Headers'] = \
' '.join(replication_headers)

View File

@ -129,8 +129,15 @@ class Sender(object):
self.connection.putheader('Transfer-Encoding', 'chunked')
self.connection.putheader('X-Backend-Storage-Policy-Index',
int(self.job['policy']))
self.connection.putheader('X-Backend-Ssync-Frag-Index',
self.node['index'])
# a sync job must use the node's index for the frag_index of the
# rebuilt fragments instead of the frag_index from the job which
# will be rebuilding them
self.connection.putheader(
'X-Backend-Ssync-Frag-Index', self.node.get(
'index', self.job.get('frag_index')))
# a revert job to a handoff will not have a node index
self.connection.putheader('X-Backend-Ssync-Node-Index',
self.node.get('index'))
self.connection.endheaders()
with exceptions.MessageTimeout(
self.daemon.node_timeout, 'connect receive'):

View File

@ -1266,6 +1266,61 @@ class TestObjectController(unittest.TestCase):
finally:
object_server.http_connect = old_http_connect
def test_PUT_ssync_multi_frag(self):
def put_with_index(expected_rsp, frag_index, node_index=None):
timestamp = utils.Timestamp(int(time())).internal
data_file_tail = '#%d.data' % frag_index
headers = {'X-Timestamp': timestamp,
'Content-Length': '6',
'Content-Type': 'application/octet-stream',
'X-Backend-Ssync-Frag-Index': node_index,
'X-Object-Sysmeta-Ec-Frag-Index': frag_index,
'X-Backend-Storage-Policy-Index': int(policy)}
req = Request.blank(
'/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'},
headers=headers)
req.body = 'VERIFY'
resp = req.get_response(self.object_controller)
self.assertEquals(
resp.status_int, expected_rsp,
'got %s != %s for frag_index=%s node_index=%s' % (
resp.status_int, expected_rsp,
frag_index, node_index))
if expected_rsp == 409:
return
obj_dir = os.path.join(
self.testdir, 'sda1',
storage_directory(diskfile.get_data_dir(int(policy)),
'p', hash_path('a', 'c', 'o')))
data_file = os.path.join(obj_dir, timestamp) + data_file_tail
self.assertTrue(os.path.isfile(data_file),
'Expected file %r not found in %r for policy %r'
% (data_file, os.listdir(obj_dir), int(policy)))
for policy in POLICIES:
if policy.policy_type == EC_POLICY:
# upload with a ec-frag-index
put_with_index(201, 3)
# same timestamp will conflict a different ec-frag-index
put_with_index(409, 2)
# but with the ssync-frag-index (primary node) it will just
# save both!
put_with_index(201, 2, 2)
# but even with the ssync-frag-index we can still get a
# timestamp collisison if the file already exists
put_with_index(409, 3, 3)
# FWIW, ssync will never send in-consistent indexes - but if
# something else did, from the object server perspective ...
# ... the ssync-frag-index is canonical on the
# read/pre-existance check
put_with_index(409, 7, 2)
# ... but the ec-frag-index is canonical when it comes to on
# disk file
put_with_index(201, 7, 6)
def test_PUT_durable_files(self):
for policy in POLICIES:
timestamp = utils.Timestamp(int(time())).internal

View File

@ -188,7 +188,7 @@ class TestReceiver(unittest.TestCase):
self.assertEqual('No policy with index 2', err.body)
@unit.patch_policies()
def test_Receiver_with_frag_index_header(self):
def test_Receiver_with_only_frag_index_header(self):
# update router post policy patch
self.controller._diskfile_router = diskfile.DiskFileRouter(
self.conf, self.controller.logger)
@ -208,6 +208,69 @@ class TestReceiver(unittest.TestCase):
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
self.assertEqual(rcvr.frag_index, 7)
self.assertEqual(rcvr.node_index, None)
@unit.patch_policies()
def test_Receiver_with_only_node_index_header(self):
# update router post policy patch
self.controller._diskfile_router = diskfile.DiskFileRouter(
self.conf, self.controller.logger)
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_SSYNC_NODE_INDEX': '7',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
with self.assertRaises(HTTPException) as e:
ssync_receiver.Receiver(self.controller, req)
self.assertEqual(e.exception.status_int, 400)
# if a node index is included - it *must* be
# the same value of frag index
self.assertEqual(e.exception.body,
'Frag-Index (None) != Node-Index (7)')
@unit.patch_policies()
def test_Receiver_with_matched_indexes(self):
# update router post policy patch
self.controller._diskfile_router = diskfile.DiskFileRouter(
self.conf, self.controller.logger)
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_SSYNC_NODE_INDEX': '7',
'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '7',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
rcvr = ssync_receiver.Receiver(self.controller, req)
body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()]
self.assertEqual(
body_lines,
[':MISSING_CHECK: START', ':MISSING_CHECK: END',
':UPDATES: START', ':UPDATES: END'])
self.assertEqual(rcvr.policy, POLICIES[1])
self.assertEqual(rcvr.frag_index, 7)
self.assertEqual(rcvr.node_index, 7)
@unit.patch_policies()
def test_Receiver_with_mismatched_indexes(self):
# update router post policy patch
self.controller._diskfile_router = diskfile.DiskFileRouter(
self.conf, self.controller.logger)
req = swob.Request.blank(
'/sda1/1',
environ={'REQUEST_METHOD': 'SSYNC',
'HTTP_X_BACKEND_SSYNC_NODE_INDEX': '6',
'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '7',
'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'},
body=':MISSING_CHECK: START\r\n'
':MISSING_CHECK: END\r\n'
':UPDATES: START\r\n:UPDATES: END\r\n')
self.assertRaises(HTTPException, ssync_receiver.Receiver,
self.controller, req)
def test_SSYNC_replication_lock_fail(self):
def _mock(path):

View File

@ -261,6 +261,7 @@ class TestSender(BaseTestSender):
mock.call('Transfer-Encoding', 'chunked'),
mock.call('X-Backend-Storage-Policy-Index', 1),
mock.call('X-Backend-Ssync-Frag-Index', 0),
mock.call('X-Backend-Ssync-Node-Index', 0),
],
'endheaders': [mock.call()],
}