Make SSYNC receiver return a reponse when initial checks fail
The ssync Receiver performs some checks on request parameters in initialize_request() before starting the exchange of missing hashes and updates e.g. the destination device must be available; the policy must be valid. Currently if any of these checks fails then the receiver just closes the connection, so the Sender gets no useful response code and noise is generated in logs by httplib and wsgi Exceptions. This change moves the request parameter checks to the Receiver constructor so that the HTTPExceptions raised are actually sent as responses. (The 'connection close' exception handling still applies once the 'missing_check' and 'updates' handshakes are in progress.) Moving initialize_request() revealed the following lurking bug: * initialize_request() sets req.environ['eventlet.minimum_write_chunk_size'] = 0 * this was previously ineffective because the Response environ had already been copied from Request environ before this value was set, so the Response never used the value :/ * Now that it is effective (a good thing) it causes the empty string yielded by the receiver when there are no missing hashes in missing_checks() to be sent to the sender immediately. This makes the Sender.readline() think there has been an early disconnect and raise an Exception (a bad thing), as revealed by test/unit/obj/test_ssync_sender.py:TestSsync.test_nothing_to_sync The fix for this is to simply make the receiver skip sending the empty string if there are no missing object_hashes. Change-Id: I036a6919fead6e970505dccbb0da7bfbdf8cecc3
This commit is contained in:
parent
ccb07cfd4d
commit
3aa06f185a
@ -69,6 +69,7 @@ class Receiver(object):
|
||||
# raised during processing because otherwise the sender could send for
|
||||
# quite some time before realizing it was all in vain.
|
||||
self.disconnect = True
|
||||
self.initialize_request()
|
||||
|
||||
def __call__(self):
|
||||
"""
|
||||
@ -88,9 +89,7 @@ class Receiver(object):
|
||||
try:
|
||||
# Double try blocks in case our main error handlers fail.
|
||||
try:
|
||||
# initialize_request is for preamble items that can be done
|
||||
# outside a replication semaphore lock.
|
||||
for data in self.initialize_request():
|
||||
for data in self._ensure_flush():
|
||||
yield data
|
||||
# If semaphore is in use, try to acquire it, non-blocking, and
|
||||
# return a 503 if it fails.
|
||||
@ -178,8 +177,6 @@ class Receiver(object):
|
||||
if not self.diskfile_mgr.get_dev_path(self.device):
|
||||
raise swob.HTTPInsufficientStorage(drive=self.device)
|
||||
self.fp = self.request.environ['wsgi.input']
|
||||
for data in self._ensure_flush():
|
||||
yield data
|
||||
|
||||
def missing_check(self):
|
||||
"""
|
||||
@ -249,7 +246,8 @@ class Receiver(object):
|
||||
if want:
|
||||
object_hashes.append(object_hash)
|
||||
yield ':MISSING_CHECK: START\r\n'
|
||||
yield '\r\n'.join(object_hashes)
|
||||
if object_hashes:
|
||||
yield '\r\n'.join(object_hashes)
|
||||
yield '\r\n'
|
||||
yield ':MISSING_CHECK: END\r\n'
|
||||
for data in self._ensure_flush():
|
||||
|
@ -4410,7 +4410,7 @@ class TestObjectController(unittest.TestCase):
|
||||
self.assertEqual(resp.status_int, 507)
|
||||
|
||||
def test_SSYNC_can_be_called(self):
|
||||
req = Request.blank('/sda1/p/other/suff',
|
||||
req = Request.blank('/sda1/0',
|
||||
environ={'REQUEST_METHOD': 'SSYNC'},
|
||||
headers={})
|
||||
resp = req.get_response(self.object_controller)
|
||||
|
@ -23,15 +23,19 @@ import unittest
|
||||
import eventlet
|
||||
import mock
|
||||
|
||||
from swift.common import bufferedhttp
|
||||
from swift.common import exceptions
|
||||
from swift.common import swob
|
||||
from swift.common import utils
|
||||
from swift.common.storage_policy import POLICIES
|
||||
from swift.common import utils
|
||||
from swift.common.swob import HTTPException
|
||||
from swift.obj import diskfile
|
||||
from swift.obj import server
|
||||
from swift.obj import ssync_receiver
|
||||
from swift.obj.reconstructor import ObjectReconstructor
|
||||
|
||||
from test import unit
|
||||
from test.unit import debug_logger, patch_policies
|
||||
|
||||
|
||||
@unit.patch_policies()
|
||||
@ -176,9 +180,12 @@ class TestReceiver(unittest.TestCase):
|
||||
':MISSING_CHECK: END\r\n'
|
||||
':UPDATES: START\r\n:UPDATES: END\r\n')
|
||||
self.controller.logger = mock.MagicMock()
|
||||
receiver = ssync_receiver.Receiver(self.controller, req)
|
||||
body_lines = [chunk.strip() for chunk in receiver() if chunk.strip()]
|
||||
self.assertEqual(body_lines, [":ERROR: 503 'No policy with index 2'"])
|
||||
try:
|
||||
ssync_receiver.Receiver(self.controller, req)
|
||||
self.fail('Expected HTTPException to be raised.')
|
||||
except HTTPException as err:
|
||||
self.assertEqual('503 Service Unavailable', err.status)
|
||||
self.assertEqual('No policy with index 2', err.body)
|
||||
|
||||
@unit.patch_policies()
|
||||
def test_Receiver_with_frag_index_header(self):
|
||||
@ -233,8 +240,8 @@ class TestReceiver(unittest.TestCase):
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(
|
||||
self.body_lines(resp.body),
|
||||
[":ERROR: 400 'Invalid path: /device'"])
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
["Invalid path: /device"])
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertFalse(mocked_replication_semaphore.acquire.called)
|
||||
self.assertFalse(mocked_replication_semaphore.release.called)
|
||||
|
||||
@ -246,8 +253,8 @@ class TestReceiver(unittest.TestCase):
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(
|
||||
self.body_lines(resp.body),
|
||||
[":ERROR: 400 'Invalid path: /device/'"])
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
["Invalid path: /device/"])
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertFalse(mocked_replication_semaphore.acquire.called)
|
||||
self.assertFalse(mocked_replication_semaphore.release.called)
|
||||
|
||||
@ -273,8 +280,8 @@ class TestReceiver(unittest.TestCase):
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(
|
||||
self.body_lines(resp.body),
|
||||
[":ERROR: 400 'Invalid path: /device/partition/junk'"])
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
["Invalid path: /device/partition/junk"])
|
||||
self.assertEqual(resp.status_int, 400)
|
||||
self.assertFalse(mocked_replication_semaphore.acquire.called)
|
||||
self.assertFalse(mocked_replication_semaphore.release.called)
|
||||
|
||||
@ -315,10 +322,10 @@ class TestReceiver(unittest.TestCase):
|
||||
resp = req.get_response(self.controller)
|
||||
self.assertEqual(
|
||||
self.body_lines(resp.body),
|
||||
[":ERROR: 507 '<html><h1>Insufficient Storage</h1><p>There "
|
||||
["<html><h1>Insufficient Storage</h1><p>There "
|
||||
"was not enough space to save the resource. Drive: "
|
||||
"device</p></html>'"])
|
||||
self.assertEqual(resp.status_int, 200)
|
||||
"device</p></html>"])
|
||||
self.assertEqual(resp.status_int, 507)
|
||||
mocked_check_mount.assert_called_once_with(
|
||||
self.controller._diskfile_router[POLICIES.legacy].devices,
|
||||
'device')
|
||||
@ -1476,5 +1483,74 @@ class TestReceiver(unittest.TestCase):
|
||||
self.assertEqual(_requests, [])
|
||||
|
||||
|
||||
@patch_policies(with_ec_default=True)
|
||||
class TestSsyncRxServer(unittest.TestCase):
|
||||
# Tests to verify behavior of SSYNC requests sent to an object
|
||||
# server socket.
|
||||
|
||||
def setUp(self):
|
||||
self.rx_ip = '127.0.0.1'
|
||||
# dirs
|
||||
self.tmpdir = tempfile.mkdtemp()
|
||||
self.tempdir = os.path.join(self.tmpdir, 'tmp_test_obj_server')
|
||||
|
||||
self.devices = os.path.join(self.tempdir, 'srv/node')
|
||||
for device in ('sda1', 'sdb1'):
|
||||
os.makedirs(os.path.join(self.devices, device))
|
||||
|
||||
self.conf = {
|
||||
'devices': self.devices,
|
||||
'swift_dir': self.tempdir,
|
||||
}
|
||||
self.rx_logger = debug_logger('test-object-server')
|
||||
rx_server = server.ObjectController(self.conf, logger=self.rx_logger)
|
||||
sock = eventlet.listen((self.rx_ip, 0))
|
||||
self.rx_server = eventlet.spawn(
|
||||
eventlet.wsgi.server, sock, rx_server, utils.NullLogger())
|
||||
self.rx_port = sock.getsockname()[1]
|
||||
self.tx_logger = debug_logger('test-reconstructor')
|
||||
self.daemon = ObjectReconstructor(self.conf, self.tx_logger)
|
||||
self.daemon._diskfile_mgr = self.daemon._df_router[POLICIES[0]]
|
||||
|
||||
def tearDown(self):
|
||||
shutil.rmtree(self.tmpdir)
|
||||
|
||||
def test_SSYNC_device_not_available(self):
|
||||
with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check')\
|
||||
as mock_missing_check:
|
||||
self.connection = bufferedhttp.BufferedHTTPConnection(
|
||||
'127.0.0.1:%s' % self.rx_port)
|
||||
self.connection.putrequest('SSYNC', '/sdc1/0')
|
||||
self.connection.putheader('Transfer-Encoding', 'chunked')
|
||||
self.connection.putheader('X-Backend-Storage-Policy-Index',
|
||||
int(POLICIES[0]))
|
||||
self.connection.endheaders()
|
||||
resp = self.connection.getresponse()
|
||||
self.assertEqual(507, resp.status)
|
||||
resp.read()
|
||||
resp.close()
|
||||
# sanity check that the receiver did not proceed to missing_check
|
||||
self.assertFalse(mock_missing_check.called)
|
||||
|
||||
def test_SSYNC_invalid_policy(self):
|
||||
valid_indices = sorted([int(policy) for policy in POLICIES])
|
||||
bad_index = valid_indices[-1] + 1
|
||||
with mock.patch('swift.obj.ssync_receiver.Receiver.missing_check')\
|
||||
as mock_missing_check:
|
||||
self.connection = bufferedhttp.BufferedHTTPConnection(
|
||||
'127.0.0.1:%s' % self.rx_port)
|
||||
self.connection.putrequest('SSYNC', '/sda1/0')
|
||||
self.connection.putheader('Transfer-Encoding', 'chunked')
|
||||
self.connection.putheader('X-Backend-Storage-Policy-Index',
|
||||
bad_index)
|
||||
self.connection.endheaders()
|
||||
resp = self.connection.getresponse()
|
||||
self.assertEqual(503, resp.status)
|
||||
resp.read()
|
||||
resp.close()
|
||||
# sanity check that the receiver did not proceed to missing_check
|
||||
self.assertFalse(mock_missing_check.called)
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
unittest.main()
|
||||
|
@ -497,8 +497,6 @@ class TestSender(BaseTestSender):
|
||||
node = dict(replication_ip='1.2.3.4', replication_port=5678,
|
||||
device='sda1', index=0)
|
||||
job = dict(partition='9', policy=POLICIES.legacy)
|
||||
self.sender = ssync_sender.Sender(self.daemon, node, job, None)
|
||||
self.sender.suffixes = ['abc']
|
||||
|
||||
class FakeBufferedHTTPConnection(NullBufferedHTTPConnection):
|
||||
def getresponse(*args, **kwargs):
|
||||
@ -506,16 +504,22 @@ class TestSender(BaseTestSender):
|
||||
response.status = 503
|
||||
return response
|
||||
|
||||
with mock.patch.object(
|
||||
missing_check_fn = 'swift.obj.ssync_sender.Sender.missing_check'
|
||||
with mock.patch(missing_check_fn) as mock_missing_check:
|
||||
with mock.patch.object(
|
||||
ssync_sender.bufferedhttp, 'BufferedHTTPConnection',
|
||||
FakeBufferedHTTPConnection):
|
||||
success, candidates = self.sender()
|
||||
self.assertFalse(success)
|
||||
self.assertEquals(candidates, {})
|
||||
FakeBufferedHTTPConnection):
|
||||
self.sender = ssync_sender.Sender(
|
||||
self.daemon, node, job, ['abc'])
|
||||
success, candidates = self.sender()
|
||||
self.assertFalse(success)
|
||||
self.assertEquals(candidates, {})
|
||||
error_lines = self.daemon.logger.get_lines_for_level('error')
|
||||
for line in error_lines:
|
||||
self.assertTrue(line.startswith(
|
||||
'1.2.3.4:5678/sda1/9 Expected status 200; got 503'))
|
||||
# sanity check that Sender did not proceed to missing_check exchange
|
||||
self.assertFalse(mock_missing_check.called)
|
||||
|
||||
def test_readline_newline_in_buffer(self):
|
||||
self.sender.response_buffer = 'Has a newline already.\r\nOkay.'
|
||||
|
Loading…
x
Reference in New Issue
Block a user