From a3facce53cac0497edb326a93a97f24e02c603ab Mon Sep 17 00:00:00 2001 From: paul luse Date: Thu, 27 Aug 2015 11:02:27 -0700 Subject: [PATCH] Fix invalid frag_index header in ssync_sender when reverting EC tombstones Back in d124ce [1] we failed to recognize the situation where a revert job would have an explicit frag_index key wth the literal value None which would take precedence over the dict.get's default value of ''. Later in ssync_receiver we'd bump into the ValueError converting 'None' to an int (again). In ssync_sender we now handle literal None's correctly and should hopefully no longer put this invalid headers on the wire - but for belts and braces we'll also update ssync_receiver to raise a 400 series error and ssync_sender to better log the error messages. 1. https://review.openstack.org/#/c/195457/ Co-Author: Clay Gerrard Co-Author: Alistair Coles Change-Id: Ic71ba7cc82487773214030207bb193f425319449 Closes-Bug: 1489546 --- swift/obj/ssync_receiver.py | 19 ++- swift/obj/ssync_sender.py | 19 ++- test/unit/obj/test_ssync_receiver.py | 165 +++++++++++++++++++++++++-- test/unit/obj/test_ssync_sender.py | 71 ++++++++++++ 4 files changed, 254 insertions(+), 20 deletions(-) diff --git a/swift/obj/ssync_receiver.py b/swift/obj/ssync_receiver.py index 394d2b0158..233373fe07 100644 --- a/swift/obj/ssync_receiver.py +++ b/swift/obj/ssync_receiver.py @@ -156,13 +156,24 @@ class Receiver(object): self.request.environ['eventlet.minimum_write_chunk_size'] = 0 self.device, self.partition, self.policy = \ request_helpers.get_name_and_placement(self.request, 2, 2, False) + self.frag_index = self.node_index = None if self.request.headers.get('X-Backend-Ssync-Frag-Index'): - self.frag_index = int( - self.request.headers['X-Backend-Ssync-Frag-Index']) + try: + self.frag_index = int( + self.request.headers['X-Backend-Ssync-Frag-Index']) + except ValueError: + raise swob.HTTPBadRequest( + 'Invalid X-Backend-Ssync-Frag-Index %r' % + self.request.headers['X-Backend-Ssync-Frag-Index']) if self.request.headers.get('X-Backend-Ssync-Node-Index'): - self.node_index = int( - self.request.headers['X-Backend-Ssync-Node-Index']) + try: + self.node_index = int( + self.request.headers['X-Backend-Ssync-Node-Index']) + except ValueError: + raise swob.HTTPBadRequest( + 'Invalid X-Backend-Ssync-Node-Index %r' % + self.request.headers['X-Backend-Ssync-Node-Index']) if self.node_index != self.frag_index: # a primary node should only receive it's own fragments raise swob.HTTPBadRequest( diff --git a/swift/obj/ssync_sender.py b/swift/obj/ssync_sender.py index cf6fcad6a4..83030782d3 100644 --- a/swift/obj/ssync_sender.py +++ b/swift/obj/ssync_sender.py @@ -133,9 +133,16 @@ class Sender(object): # a sync job must use the node's index for the frag_index of the # rebuilt fragments instead of the frag_index from the job which # will be rebuilding them - self.connection.putheader( - 'X-Backend-Ssync-Frag-Index', self.node.get( - 'index', self.job.get('frag_index', ''))) + frag_index = self.node.get('index', self.job.get('frag_index')) + if frag_index is None: + # replication jobs will not have a frag_index key; + # reconstructor jobs with only tombstones will have a + # frag_index key explicitly set to the value of None - in both + # cases on the wire we write the empty string which + # ssync_receiver will translate to None + frag_index = '' + self.connection.putheader('X-Backend-Ssync-Frag-Index', + frag_index) # a revert job to a handoff will not have a node index self.connection.putheader('X-Backend-Ssync-Node-Index', self.node.get('index', '')) @@ -144,10 +151,10 @@ class Sender(object): self.daemon.node_timeout, 'connect receive'): self.response = self.connection.getresponse() if self.response.status != http.HTTP_OK: - self.response.read() + err_msg = self.response.read()[:1024] raise exceptions.ReplicationException( - 'Expected status %s; got %s' % - (http.HTTP_OK, self.response.status)) + 'Expected status %s; got %s (%s)' % + (http.HTTP_OK, self.response.status, err_msg)) def readline(self): """ diff --git a/test/unit/obj/test_ssync_receiver.py b/test/unit/obj/test_ssync_receiver.py index 38654ffd61..6185f097f5 100644 --- a/test/unit/obj/test_ssync_receiver.py +++ b/test/unit/obj/test_ssync_receiver.py @@ -14,6 +14,7 @@ # limitations under the License. import contextlib +import hashlib import os import shutil import tempfile @@ -26,7 +27,7 @@ import six from swift.common import bufferedhttp from swift.common import exceptions from swift.common import swob -from swift.common.storage_policy import POLICIES +from swift.common.storage_policy import POLICIES, REPL_POLICY from swift.common import utils from swift.common.swob import HTTPException from swift.obj import diskfile @@ -255,6 +256,23 @@ class TestReceiver(unittest.TestCase): self.assertEqual(rcvr.frag_index, 7) self.assertEqual(rcvr.node_index, 7) + @unit.patch_policies() + def test_Receiver_with_invalid_indexes(self): + # update router post policy patch + self.controller._diskfile_router = diskfile.DiskFileRouter( + self.conf, self.controller.logger) + req = swob.Request.blank( + '/sda1/1', + environ={'REQUEST_METHOD': 'SSYNC', + 'HTTP_X_BACKEND_SSYNC_NODE_INDEX': 'None', + 'HTTP_X_BACKEND_SSYNC_FRAG_INDEX': 'None', + 'HTTP_X_BACKEND_STORAGE_POLICY_INDEX': '1'}, + body=':MISSING_CHECK: START\r\n' + ':MISSING_CHECK: END\r\n' + ':UPDATES: START\r\n:UPDATES: END\r\n') + resp = req.get_response(self.controller) + self.assertEqual(resp.status_int, 400) + @unit.patch_policies() def test_Receiver_with_mismatched_indexes(self): # update router post policy patch @@ -1679,30 +1697,56 @@ class TestSsyncRxServer(unittest.TestCase): # server socket. def setUp(self): + self.ts = unit.make_timestamp_iter() self.rx_ip = '127.0.0.1' # dirs self.tmpdir = tempfile.mkdtemp() self.tempdir = os.path.join(self.tmpdir, 'tmp_test_obj_server') - self.devices = os.path.join(self.tempdir, 'srv/node') + self.rx_devices = os.path.join(self.tempdir, 'rx/node') + self.tx_devices = os.path.join(self.tempdir, 'tx/node') for device in ('sda1', 'sdb1'): - os.makedirs(os.path.join(self.devices, device)) + for root in (self.rx_devices, self.tx_devices): + os.makedirs(os.path.join(root, device)) self.conf = { - 'devices': self.devices, + 'devices': self.rx_devices, 'swift_dir': self.tempdir, + 'mount_check': False, } self.rx_logger = debug_logger('test-object-server') - rx_server = server.ObjectController(self.conf, logger=self.rx_logger) - sock = eventlet.listen((self.rx_ip, 0)) + self.rx_app = server.ObjectController(self.conf, logger=self.rx_logger) + self.sock = eventlet.listen((self.rx_ip, 0)) self.rx_server = eventlet.spawn( - eventlet.wsgi.server, sock, rx_server, utils.NullLogger()) - self.rx_port = sock.getsockname()[1] - self.tx_logger = debug_logger('test-reconstructor') + eventlet.wsgi.server, self.sock, self.rx_app, utils.NullLogger()) + self.rx_port = self.sock.getsockname()[1] + self.tx_logger = debug_logger('test-daemon') + self.policy = POLICIES[0] + self.conf['devices'] = self.tx_devices self.daemon = ObjectReconstructor(self.conf, self.tx_logger) - self.daemon._diskfile_mgr = self.daemon._df_router[POLICIES[0]] + self.daemon._diskfile_mgr = self.daemon._df_router[self.policy] + + self.nodes = [ + { + 'device': 'sda1', + 'ip': '127.0.0.1', + 'replication_ip': '127.0.0.1', + 'port': self.rx_port, + 'replication_port': self.rx_port, + }, + { + 'device': 'sdb1', + 'ip': '127.0.0.1', + 'replication_ip': '127.0.0.1', + 'port': self.rx_port, + 'replication_port': self.rx_port, + }, + ] def tearDown(self): + self.rx_server.kill() + self.sock.close() + eventlet.sleep(0) shutil.rmtree(self.tmpdir) def test_SSYNC_disconnect(self): @@ -1770,6 +1814,107 @@ class TestSsyncRxServer(unittest.TestCase): # sanity check that the receiver did not proceed to missing_check self.assertFalse(mock_missing_check.called) + def test_sender_job_missing_frag_node_indexes(self): + # replication jobs don't send frag_index, so we'll use a REPL_POLICY + repl_policy = POLICIES[1] + self.assertEqual(repl_policy.policy_type, REPL_POLICY) + repl_mgr = self.daemon._df_router[repl_policy] + self.daemon._diskfile_mgr = repl_mgr + device = self.nodes[0]['device'] + # create a replicated object, on sender + df = repl_mgr.get_diskfile(device, '0', 'a', 'c', 'o', + policy=repl_policy) + now = next(self.ts) + metadata = { + 'X-Timestamp': now.internal, + 'Content-Type': 'text/plain', + 'Content-Length': '0', + 'ETag': hashlib.md5('').hexdigest(), + } + with df.create() as writer: + writer.write('') + writer.put(metadata) + # sanity the object is on the sender + self.assertTrue(df._datadir.startswith(self.tx_devices)) + # setup a ssync job + suffix = os.path.basename(os.path.dirname(df._datadir)) + job = { + 'partition': 0, + 'policy': repl_policy, + 'device': device, + } + sender = ssync_sender.Sender( + self.daemon, self.nodes[0], job, [suffix]) + success, _ = sender() + self.assertTrue(success) + # sanity object is synced to receiver + remote_df = self.rx_app._diskfile_router[repl_policy].get_diskfile( + device, '0', 'a', 'c', 'o', policy=repl_policy) + self.assertTrue(remote_df._datadir.startswith(self.rx_devices)) + self.assertEqual(remote_df.read_metadata(), metadata) + + def test_send_frag_index_none(self): + # create an ec fragment on the remote node + device = self.nodes[1]['device'] + remote_df = self.rx_app._diskfile_router[self.policy].get_diskfile( + device, '1', 'a', 'c', 'o', policy=self.policy) + ts1 = next(self.ts) + data = 'frag_archive' + metadata = { + 'ETag': hashlib.md5(data).hexdigest(), + 'X-Timestamp': ts1.internal, + 'Content-Length': len(data), + 'X-Object-Sysmeta-Ec-Frag-Index': '3', + } + with remote_df.create() as writer: + writer.write(data) + writer.put(metadata) + writer.commit(ts1) + # create a tombstone on the local node + df = self.daemon._df_router[self.policy].get_diskfile( + device, '1', 'a', 'c', 'o', policy=self.policy) + suffix = os.path.basename(os.path.dirname(df._datadir)) + ts2 = next(self.ts) + df.delete(ts2) + # a reconstructor revert job with only tombstones will have frag_index + # explicitly set to None + job = { + 'frag_index': None, + 'partition': 1, + 'policy': self.policy, + 'device': device, + } + sender = ssync_sender.Sender( + self.daemon, self.nodes[1], job, [suffix]) + success, _ = sender() + self.assertTrue(success) + # diskfile tombstone synced to receiver's datadir with timestamp + self.assertTrue(remote_df._datadir.startswith(self.rx_devices)) + try: + remote_df.read_metadata() + except exceptions.DiskFileDeleted as e: + self.assertEqual(e.timestamp, ts2) + else: + self.fail('Successfully opened remote DiskFile') + + def test_bad_request_invalid_frag_index(self): + with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check')\ + as mock_missing_check: + self.connection = bufferedhttp.BufferedHTTPConnection( + '127.0.0.1:%s' % self.rx_port) + self.connection.putrequest('SSYNC', '/sda1/0') + self.connection.putheader('Transfer-Encoding', 'chunked') + self.connection.putheader('X-Backend-Ssync-Frag-Index', + 'None') + self.connection.endheaders() + resp = self.connection.getresponse() + self.assertEqual(400, resp.status) + error_msg = resp.read() + self.assertIn("Invalid X-Backend-Ssync-Frag-Index 'None'", error_msg) + resp.close() + # sanity check that the receiver did not proceed to missing_check + self.assertFalse(mock_missing_check.called) + if __name__ == '__main__': unittest.main() diff --git a/test/unit/obj/test_ssync_sender.py b/test/unit/obj/test_ssync_sender.py index 211ab39c46..64cc756134 100644 --- a/test/unit/obj/test_ssync_sender.py +++ b/test/unit/obj/test_ssync_sender.py @@ -312,6 +312,74 @@ class TestSender(BaseTestSender): method_name, mock_method.mock_calls, expected_calls)) + def test_connect_handoff_no_frag(self): + node = dict(replication_ip='1.2.3.4', replication_port=5678, + device='sda1') + job = dict(partition='9', policy=POLICIES[0]) + self.sender = ssync_sender.Sender(self.daemon, node, job, None) + self.sender.suffixes = ['abc'] + with mock.patch( + 'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection' + ) as mock_conn_class: + mock_conn = mock_conn_class.return_value + mock_resp = mock.MagicMock() + mock_resp.status = 200 + mock_conn.getresponse.return_value = mock_resp + self.sender.connect() + mock_conn_class.assert_called_once_with('1.2.3.4:5678') + expectations = { + 'putrequest': [ + mock.call('SSYNC', '/sda1/9'), + ], + 'putheader': [ + mock.call('Transfer-Encoding', 'chunked'), + mock.call('X-Backend-Storage-Policy-Index', 0), + mock.call('X-Backend-Ssync-Frag-Index', ''), + mock.call('X-Backend-Ssync-Node-Index', ''), + ], + 'endheaders': [mock.call()], + } + for method_name, expected_calls in expectations.items(): + mock_method = getattr(mock_conn, method_name) + self.assertEqual(expected_calls, mock_method.mock_calls, + 'connection method "%s" got %r not %r' % ( + method_name, mock_method.mock_calls, + expected_calls)) + + def test_connect_handoff_none_frag(self): + node = dict(replication_ip='1.2.3.4', replication_port=5678, + device='sda1') + job = dict(partition='9', policy=POLICIES[1], frag_index=None) + self.sender = ssync_sender.Sender(self.daemon, node, job, None) + self.sender.suffixes = ['abc'] + with mock.patch( + 'swift.obj.ssync_sender.bufferedhttp.BufferedHTTPConnection' + ) as mock_conn_class: + mock_conn = mock_conn_class.return_value + mock_resp = mock.MagicMock() + mock_resp.status = 200 + mock_conn.getresponse.return_value = mock_resp + self.sender.connect() + mock_conn_class.assert_called_once_with('1.2.3.4:5678') + expectations = { + 'putrequest': [ + mock.call('SSYNC', '/sda1/9'), + ], + 'putheader': [ + mock.call('Transfer-Encoding', 'chunked'), + mock.call('X-Backend-Storage-Policy-Index', 1), + mock.call('X-Backend-Ssync-Frag-Index', ''), + mock.call('X-Backend-Ssync-Node-Index', ''), + ], + 'endheaders': [mock.call()], + } + for method_name, expected_calls in expectations.items(): + mock_method = getattr(mock_conn, method_name) + self.assertEqual(expected_calls, mock_method.mock_calls, + 'connection method "%s" got %r not %r' % ( + method_name, mock_method.mock_calls, + expected_calls)) + def test_connect_handoff_replicated(self): node = dict(replication_ip='1.2.3.4', replication_port=5678, device='sda1') @@ -523,6 +591,7 @@ class TestSender(BaseTestSender): self.assertEqual(candidates, {}) def test_connect_send_timeout(self): + self.daemon.node_timeout = 0.01 # make disconnect fail fast self.daemon.conn_timeout = 0.01 node = dict(replication_ip='1.2.3.4', replication_port=5678, device='sda1') @@ -578,6 +647,7 @@ class TestSender(BaseTestSender): def getresponse(*args, **kwargs): response = FakeResponse() response.status = 503 + response.read = lambda: 'an error message' return response missing_check_fn = 'swift.obj.ssync_sender.Sender.missing_check' @@ -594,6 +664,7 @@ class TestSender(BaseTestSender): for line in error_lines: self.assertTrue(line.startswith( '1.2.3.4:5678/sda1/9 Expected status 200; got 503')) + self.assertIn('an error message', line) # sanity check that Sender did not proceed to missing_check exchange self.assertFalse(mock_missing_check.called)