diff --git a/swift/obj/server.py b/swift/obj/server.py index 658f207a8d..97424cf787 100644 --- a/swift/obj/server.py +++ b/swift/obj/server.py @@ -498,10 +498,14 @@ class ObjectController(BaseStorageServer): except ValueError as e: return HTTPBadRequest(body=str(e), request=request, content_type='text/plain') + # SSYNC will include Frag-Index header for subrequests to primary + # nodes; handoff nodes should 409 subrequests to over-write an + # existing data fragment until they offloaded the existing fragment + frag_index = request.headers.get('X-Backend-Ssync-Frag-Index') try: disk_file = self.get_diskfile( device, partition, account, container, obj, - policy=policy) + policy=policy, frag_index=frag_index) except DiskFileDeviceUnavailable: return HTTPInsufficientStorage(drive=device, request=request) try: diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 5f2461d62e..6aeb4c401f 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -156,11 +156,18 @@ class Receiver(object): self.request.environ['eventlet.minimum_write_chunk_size'] = 0 self.device, self.partition, self.policy = \ request_helpers.get_name_and_placement(self.request, 2, 2, False) - if 'X-Backend-Ssync-Frag-Index' in self.request.headers: + self.frag_index = self.node_index = None + if self.request.headers.get('X-Backend-Ssync-Frag-Index'): self.frag_index = int( self.request.headers['X-Backend-Ssync-Frag-Index']) - else: - self.frag_index = None + if self.request.headers.get('X-Backend-Ssync-Node-Index'): + self.node_index = int( + self.request.headers['X-Backend-Ssync-Node-Index']) + if self.node_index != self.frag_index: + # a primary node should only recieve it's own fragments + raise swob.HTTPBadRequest( + 'Frag-Index (%s) != Node-Index (%s)' % ( + self.frag_index, self.node_index)) utils.validate_device_partition(self.device, self.partition) self.diskfile_mgr = self.app._diskfile_router[self.policy] if not self.diskfile_mgr.get_dev_path(self.device): @@ -344,6 +351,9 @@ class Receiver(object): raise Exception('Invalid subrequest method %s' % method) subreq.headers['X-Backend-Storage-Policy-Index'] = int(self.policy) subreq.headers['X-Backend-Replication'] = 'True' + if self.node_index is not None: + # primary node should not 409 if it has a non-primary fragment + subreq.headers['X-Backend-Ssync-Frag-Index'] = self.node_index if replication_headers: subreq.headers['X-Backend-Replication-Headers'] = \ ' '.join(replication_headers) diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index 8e9202c004..50662da84c 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -129,8 +129,15 @@ class Sender(object): self.connection.putheader('Transfer-Encoding', 'chunked') self.connection.putheader('X-Backend-Storage-Policy-Index', int(self.job['policy'])) - self.connection.putheader('X-Backend-Ssync-Frag-Index', - self.node['index']) + # a sync job must use the node's index for the frag_index of the + # rebuilt fragments instead of the frag_index from the job which + # will be rebuilding them + self.connection.putheader( + 'X-Backend-Ssync-Frag-Index', self.node.get( + 'index', self.job.get('frag_index'))) + # a revert job to a handoff will not have a node index + self.connection.putheader('X-Backend-Ssync-Node-Index', + self.node.get('index')) self.connection.endheaders() with exceptions.MessageTimeout( self.daemon.node_timeout, 'connect receive'): diff --git a/test/unit/obj/test_server.py b/test/unit/obj/test_server.py index 9b29dcfff4..d0a6865a4c 100755 --- a/test/unit/obj/test_server.py +++ b/test/unit/obj/test_server.py @@ -1266,6 +1266,61 @@ class TestObjectController(unittest.TestCase): finally: object_server.http_connect = old_http_connect + def test_PUT_ssync_multi_frag(self): + def put_with_index(expected_rsp, frag_index, node_index=None): + timestamp = utils.Timestamp(int(time())).internal + data_file_tail = '#%d.data' % frag_index + headers = {'X-Timestamp': timestamp, + 'Content-Length': '6', + 'Content-Type': 'application/octet-stream', + 'X-Backend-Ssync-Frag-Index': node_index, + 'X-Object-Sysmeta-Ec-Frag-Index': frag_index, + 'X-Backend-Storage-Policy-Index': int(policy)} + req = Request.blank( + '/sda1/p/a/c/o', environ={'REQUEST_METHOD': 'PUT'}, + headers=headers) + req.body = 'VERIFY' + resp = req.get_response(self.object_controller) + + self.assertEquals( + resp.status_int, expected_rsp, + 'got %s != %s for frag_index=%s node_index=%s' % ( + resp.status_int, expected_rsp, + frag_index, node_index)) + if expected_rsp == 409: + return + obj_dir = os.path.join( + self.testdir, 'sda1', + storage_directory(diskfile.get_data_dir(int(policy)), + 'p', hash_path('a', 'c', 'o'))) + data_file = os.path.join(obj_dir, timestamp) + data_file_tail + self.assertTrue(os.path.isfile(data_file), + 'Expected file %r not found in %r for policy %r' + % (data_file, os.listdir(obj_dir), int(policy))) + + for policy in POLICIES: + if policy.policy_type == EC_POLICY: + # upload with a ec-frag-index + put_with_index(201, 3) + # same timestamp will conflict a different ec-frag-index + put_with_index(409, 2) + # but with the ssync-frag-index (primary node) it will just + # save both! + put_with_index(201, 2, 2) + # but even with the ssync-frag-index we can still get a + # timestamp collisison if the file already exists + put_with_index(409, 3, 3) + + # FWIW, ssync will never send in-consistent indexes - but if + # something else did, from the object server perspective ... + + # ... the ssync-frag-index is canonical on the + # read/pre-existance check + put_with_index(409, 7, 2) + # ... but the ec-frag-index is canonical when it comes to on + # disk file + put_with_index(201, 7, 6) + def test_PUT_durable_files(self): for policy in POLICIES: timestamp = utils.Timestamp(int(time())).internal diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index 8b652ad2ec..4a8ee4541b 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -188,7 +188,7 @@ class TestReceiver(unittest.TestCase): self.assertEqual('No policy with index 2', err.body) @unit.patch_policies() - def test_Receiver_with_frag_index_header(self): + def test_Receiver_with_only_frag_index_header(self): # update router post policy patch self.controller._diskfile_router = diskfile.DiskFileRouter( self.conf, self.controller.logger) @@ -208,6 +208,69 @@ class TestReceiver(unittest.TestCase): ':UPDATES: START', ':UPDATES: END']) self.assertEqual(rcvr.policy, POLICIES[1]) self.assertEqual(rcvr.frag_index, 7) + self.assertEqual(rcvr.node_index, None) + + @unit.patch_policies() + def test_Receiver_with_only_node_index_header(self): + # update router post policy patch + self.controller._diskfile_router = diskfile.DiskFileRouter( + self.conf, self.controller.logger) + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_SSYNC_NODE_INDEX': '7', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'}, + body=':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + with self.assertRaises(HTTPException) as e: + ssync_receiver.Receiver(self.controller, req) + self.assertEqual(e.exception.status_int, 400) + # if a node index is included - it *must* be + # the same value of frag index + self.assertEqual(e.exception.body, + 'Frag-Index (None) != Node-Index (7)') + + @unit.patch_policies() + def test_Receiver_with_matched_indexes(self): + # update router post policy patch + self.controller._diskfile_router = diskfile.DiskFileRouter( + self.conf, self.controller.logger) + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_SSYNC_NODE_INDEX': '7', + 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '7', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'}, + body=':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + rcvr = ssync_receiver.Receiver(self.controller, req) + body_lines = [chunk.strip() for chunk in rcvr() if chunk.strip()] + self.assertEqual( + body_lines, + [':MISSING_CHECK: START', ':MISSING_CHECK: END', + ':UPDATES: START', ':UPDATES: END']) + self.assertEqual(rcvr.policy, POLICIES[1]) + self.assertEqual(rcvr.frag_index, 7) + self.assertEqual(rcvr.node_index, 7) + + @unit.patch_policies() + def test_Receiver_with_mismatched_indexes(self): + # update router post policy patch + self.controller._diskfile_router = diskfile.DiskFileRouter( + self.conf, self.controller.logger) + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_SSYNC_NODE_INDEX': '6', + 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': '7', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'}, + body=':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + self.assertRaises(HTTPException, ssync_receiver.Receiver, + self.controller, req) def test_SSYNC_replication_lock_fail(self): def _mock(path): diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index 32a6dae02a..b6f75ba659 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -261,6 +261,7 @@ class TestSender(BaseTestSender): mock.call('Transfer-Encoding', 'chunked'), mock.call('X-Backend-Storage-Policy-Index', 1), mock.call('X-Backend-Ssync-Frag-Index', 0), + mock.call('X-Backend-Ssync-Node-Index', 0), ], 'endheaders': [mock.call()], }