4601548dab
If we move it to constraints it's more globally accessible in our code, but more importantly it's more obvious to ops that everything breaks if you try to mis-configure different values per-service. Change-Id: Ib8f7d08bc48da12be5671abe91a17ae2b49ecfee
700 lines
32 KiB
Python
700 lines
32 KiB
Python
# Copyright (c) 2010-2012 OpenStack Foundation
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License");
|
|
# you may not use this file except in compliance with the License.
|
|
# You may obtain a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS,
|
|
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
|
|
# implied.
|
|
# See the License for the specific language governing permissions and
|
|
# limitations under the License.
|
|
|
|
import unittest
|
|
import mock
|
|
import tempfile
|
|
import time
|
|
|
|
from six.moves import range
|
|
from test.unit import mock_check_drive
|
|
|
|
from swift.common.swob import Request, HTTPException
|
|
from swift.common.http import HTTP_REQUEST_ENTITY_TOO_LARGE, \
|
|
HTTP_BAD_REQUEST, HTTP_LENGTH_REQUIRED, HTTP_NOT_IMPLEMENTED
|
|
from swift.common import constraints, utils
|
|
from swift.common.constraints import MAX_OBJECT_NAME_LENGTH
|
|
|
|
|
|
class TestConstraints(unittest.TestCase):
|
|
def test_check_metadata_empty(self):
|
|
headers = {}
|
|
self.assertIsNone(constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object'))
|
|
|
|
def test_check_metadata_good(self):
|
|
headers = {'X-Object-Meta-Name': 'Value'}
|
|
self.assertIsNone(constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object'))
|
|
|
|
def test_check_metadata_empty_name(self):
|
|
headers = {'X-Object-Meta-': 'Value'}
|
|
resp = constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Metadata name cannot be empty', resp.body)
|
|
|
|
def test_check_metadata_non_utf8(self):
|
|
# Consciously using native "WSGI strings" in headers
|
|
headers = {'X-Account-Meta-Foo': '\xff'}
|
|
resp = constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'account')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Metadata must be valid UTF-8', resp.body)
|
|
|
|
headers = {'X-Container-Meta-\xff': 'foo'}
|
|
resp = constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'container')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Metadata must be valid UTF-8', resp.body)
|
|
# Object's OK; its metadata isn't serialized as JSON
|
|
headers = {'X-Object-Meta-Foo': '\xff'}
|
|
self.assertIsNone(constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object'))
|
|
|
|
def test_check_metadata_name_length(self):
|
|
name = 'a' * constraints.MAX_META_NAME_LENGTH
|
|
headers = {'X-Object-Meta-%s' % name: 'v'}
|
|
self.assertIsNone(constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object'))
|
|
|
|
name = 'a' * (constraints.MAX_META_NAME_LENGTH + 1)
|
|
headers = {'X-Object-Meta-%s' % name: 'v'}
|
|
resp = constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(
|
|
b'x-object-meta-%s' % name.encode('ascii'), resp.body.lower())
|
|
self.assertIn(b'Metadata name too long', resp.body)
|
|
|
|
def test_check_metadata_value_length(self):
|
|
value = 'a' * constraints.MAX_META_VALUE_LENGTH
|
|
headers = {'X-Object-Meta-Name': value}
|
|
self.assertIsNone(constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object'))
|
|
|
|
value = 'a' * (constraints.MAX_META_VALUE_LENGTH + 1)
|
|
headers = {'X-Object-Meta-Name': value}
|
|
resp = constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'x-object-meta-name', resp.body.lower())
|
|
self.assertIn(
|
|
str(constraints.MAX_META_VALUE_LENGTH).encode('ascii'), resp.body)
|
|
self.assertIn(b'Metadata value longer than 256', resp.body)
|
|
|
|
def test_check_metadata_count(self):
|
|
headers = {}
|
|
for x in range(constraints.MAX_META_COUNT):
|
|
headers['X-Object-Meta-%d' % x] = 'v'
|
|
self.assertIsNone(constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object'))
|
|
|
|
headers['X-Object-Meta-Too-Many'] = 'v'
|
|
resp = constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Too many metadata items', resp.body)
|
|
|
|
def test_check_metadata_size(self):
|
|
headers = {}
|
|
size = 0
|
|
chunk = constraints.MAX_META_NAME_LENGTH + \
|
|
constraints.MAX_META_VALUE_LENGTH
|
|
x = 0
|
|
while size + chunk < constraints.MAX_META_OVERALL_SIZE:
|
|
headers['X-Object-Meta-%04d%s' %
|
|
(x, 'a' * (constraints.MAX_META_NAME_LENGTH - 4))] = \
|
|
'v' * constraints.MAX_META_VALUE_LENGTH
|
|
size += chunk
|
|
x += 1
|
|
self.assertIsNone(constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object'))
|
|
# add two more headers in case adding just one falls exactly on the
|
|
# limit (eg one header adds 1024 and the limit is 2048)
|
|
headers['X-Object-Meta-%04d%s' %
|
|
(x, 'a' * (constraints.MAX_META_NAME_LENGTH - 4))] = \
|
|
'v' * constraints.MAX_META_VALUE_LENGTH
|
|
headers['X-Object-Meta-%04d%s' %
|
|
(x + 1, 'a' * (constraints.MAX_META_NAME_LENGTH - 4))] = \
|
|
'v' * constraints.MAX_META_VALUE_LENGTH
|
|
resp = constraints.check_metadata(Request.blank(
|
|
'/', headers=headers), 'object')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Total metadata too large', resp.body)
|
|
|
|
def test_check_object_creation_content_length(self):
|
|
headers = {'Content-Length': str(constraints.MAX_FILE_SIZE),
|
|
'Content-Type': 'text/plain',
|
|
'X-Timestamp': str(time.time())}
|
|
self.assertIsNone(constraints.check_object_creation(Request.blank(
|
|
'/', headers=headers), 'object_name'))
|
|
|
|
headers = {'Content-Length': str(constraints.MAX_FILE_SIZE + 1),
|
|
'Content-Type': 'text/plain',
|
|
'X-Timestamp': str(time.time())}
|
|
resp = constraints.check_object_creation(
|
|
Request.blank('/', headers=headers), 'object_name')
|
|
self.assertEqual(resp.status_int, HTTP_REQUEST_ENTITY_TOO_LARGE)
|
|
|
|
headers = {'Transfer-Encoding': 'chunked',
|
|
'Content-Type': 'text/plain',
|
|
'X-Timestamp': str(time.time())}
|
|
self.assertIsNone(constraints.check_object_creation(Request.blank(
|
|
'/', headers=headers), 'object_name'))
|
|
|
|
headers = {'Transfer-Encoding': 'gzip',
|
|
'Content-Type': 'text/plain',
|
|
'X-Timestamp': str(time.time())}
|
|
resp = constraints.check_object_creation(Request.blank(
|
|
'/', headers=headers), 'object_name')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Invalid Transfer-Encoding header value', resp.body)
|
|
|
|
headers = {'Content-Type': 'text/plain',
|
|
'X-Timestamp': str(time.time())}
|
|
resp = constraints.check_object_creation(
|
|
Request.blank('/', headers=headers), 'object_name')
|
|
self.assertEqual(resp.status_int, HTTP_LENGTH_REQUIRED)
|
|
|
|
headers = {'Content-Length': 'abc',
|
|
'Content-Type': 'text/plain',
|
|
'X-Timestamp': str(time.time())}
|
|
resp = constraints.check_object_creation(Request.blank(
|
|
'/', headers=headers), 'object_name')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Invalid Content-Length header value', resp.body)
|
|
|
|
headers = {'Transfer-Encoding': 'gzip,chunked',
|
|
'Content-Type': 'text/plain',
|
|
'X-Timestamp': str(time.time())}
|
|
resp = constraints.check_object_creation(Request.blank(
|
|
'/', headers=headers), 'object_name')
|
|
self.assertEqual(resp.status_int, HTTP_NOT_IMPLEMENTED)
|
|
|
|
def test_check_object_creation_name_length(self):
|
|
headers = {'Transfer-Encoding': 'chunked',
|
|
'Content-Type': 'text/plain',
|
|
'X-Timestamp': str(time.time())}
|
|
name = 'o' * constraints.MAX_OBJECT_NAME_LENGTH
|
|
self.assertIsNone(constraints.check_object_creation(Request.blank(
|
|
'/', headers=headers), name))
|
|
|
|
name = 'o' * (MAX_OBJECT_NAME_LENGTH + 1)
|
|
resp = constraints.check_object_creation(
|
|
Request.blank('/', headers=headers), name)
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Object name length of %d longer than %d' %
|
|
(MAX_OBJECT_NAME_LENGTH + 1, MAX_OBJECT_NAME_LENGTH),
|
|
resp.body)
|
|
|
|
def test_check_object_creation_content_type(self):
|
|
headers = {'Transfer-Encoding': 'chunked',
|
|
'Content-Type': 'text/plain',
|
|
'X-Timestamp': str(time.time())}
|
|
self.assertIsNone(constraints.check_object_creation(Request.blank(
|
|
'/', headers=headers), 'object_name'))
|
|
|
|
headers = {'Transfer-Encoding': 'chunked',
|
|
'X-Timestamp': str(time.time())}
|
|
resp = constraints.check_object_creation(
|
|
Request.blank('/', headers=headers), 'object_name')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'No content type', resp.body)
|
|
|
|
def test_check_object_creation_bad_content_type(self):
|
|
headers = {'Transfer-Encoding': 'chunked',
|
|
'Content-Type': '\xff\xff',
|
|
'X-Timestamp': str(time.time())}
|
|
resp = constraints.check_object_creation(
|
|
Request.blank('/', headers=headers), 'object_name')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Content-Type', resp.body)
|
|
|
|
def test_check_object_creation_bad_delete_headers(self):
|
|
headers = {'Transfer-Encoding': 'chunked',
|
|
'Content-Type': 'text/plain',
|
|
'X-Delete-After': 'abc',
|
|
'X-Timestamp': str(time.time())}
|
|
resp = constraints.check_object_creation(
|
|
Request.blank('/', headers=headers), 'object_name')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Non-integer X-Delete-After', resp.body)
|
|
|
|
t = str(int(time.time() - 60))
|
|
headers = {'Transfer-Encoding': 'chunked',
|
|
'Content-Type': 'text/plain',
|
|
'X-Delete-At': t,
|
|
'X-Timestamp': str(time.time())}
|
|
resp = constraints.check_object_creation(
|
|
Request.blank('/', headers=headers), 'object_name')
|
|
self.assertEqual(resp.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'X-Delete-At in past', resp.body)
|
|
|
|
def test_check_delete_headers(self):
|
|
# x-delete-at value should be relative to the request timestamp rather
|
|
# than time.time() so separate the two to ensure the checks are robust
|
|
ts = utils.Timestamp(time.time() + 100)
|
|
|
|
# X-Delete-After
|
|
headers = {'X-Delete-After': '600',
|
|
'X-Timestamp': ts.internal}
|
|
req = constraints.check_delete_headers(
|
|
Request.blank('/', headers=headers))
|
|
self.assertIsInstance(req, Request)
|
|
self.assertIn('x-delete-at', req.headers)
|
|
self.assertNotIn('x-delete-after', req.headers)
|
|
expected_delete_at = str(int(ts) + 600)
|
|
self.assertEqual(req.headers.get('X-Delete-At'), expected_delete_at)
|
|
|
|
headers = {'X-Delete-After': 'abc',
|
|
'X-Timestamp': ts.internal}
|
|
|
|
with self.assertRaises(HTTPException) as cm:
|
|
constraints.check_delete_headers(
|
|
Request.blank('/', headers=headers))
|
|
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Non-integer X-Delete-After', cm.exception.body)
|
|
|
|
headers = {'X-Delete-After': '60.1',
|
|
'X-Timestamp': ts.internal}
|
|
with self.assertRaises(HTTPException) as cm:
|
|
constraints.check_delete_headers(
|
|
Request.blank('/', headers=headers))
|
|
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Non-integer X-Delete-After', cm.exception.body)
|
|
|
|
headers = {'X-Delete-After': '-1',
|
|
'X-Timestamp': ts.internal}
|
|
with self.assertRaises(HTTPException) as cm:
|
|
constraints.check_delete_headers(
|
|
Request.blank('/', headers=headers))
|
|
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'X-Delete-After in past', cm.exception.body)
|
|
|
|
headers = {'X-Delete-After': '0',
|
|
'X-Timestamp': ts.internal}
|
|
with self.assertRaises(HTTPException) as cm:
|
|
constraints.check_delete_headers(
|
|
Request.blank('/', headers=headers))
|
|
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'X-Delete-After in past', cm.exception.body)
|
|
|
|
# x-delete-after = 0 disallowed when it results in x-delete-at equal to
|
|
# the timestamp
|
|
headers = {'X-Delete-After': '0',
|
|
'X-Timestamp': utils.Timestamp(int(ts)).internal}
|
|
with self.assertRaises(HTTPException) as cm:
|
|
constraints.check_delete_headers(
|
|
Request.blank('/', headers=headers))
|
|
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'X-Delete-After in past', cm.exception.body)
|
|
|
|
# X-Delete-At
|
|
delete_at = str(int(ts) + 100)
|
|
headers = {'X-Delete-At': delete_at,
|
|
'X-Timestamp': ts.internal}
|
|
req = constraints.check_delete_headers(
|
|
Request.blank('/', headers=headers))
|
|
self.assertIsInstance(req, Request)
|
|
self.assertIn('x-delete-at', req.headers)
|
|
self.assertEqual(req.headers.get('X-Delete-At'), delete_at)
|
|
|
|
headers = {'X-Delete-At': 'abc',
|
|
'X-Timestamp': ts.internal}
|
|
with self.assertRaises(HTTPException) as cm:
|
|
constraints.check_delete_headers(
|
|
Request.blank('/', headers=headers))
|
|
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Non-integer X-Delete-At', cm.exception.body)
|
|
|
|
delete_at = str(int(ts) + 100) + '.1'
|
|
headers = {'X-Delete-At': delete_at,
|
|
'X-Timestamp': ts.internal}
|
|
with self.assertRaises(HTTPException) as cm:
|
|
constraints.check_delete_headers(
|
|
Request.blank('/', headers=headers))
|
|
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'Non-integer X-Delete-At', cm.exception.body)
|
|
|
|
delete_at = str(int(ts) - 1)
|
|
headers = {'X-Delete-At': delete_at,
|
|
'X-Timestamp': ts.internal}
|
|
with self.assertRaises(HTTPException) as cm:
|
|
constraints.check_delete_headers(
|
|
Request.blank('/', headers=headers))
|
|
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'X-Delete-At in past', cm.exception.body)
|
|
|
|
# x-delete-at disallowed when exactly equal to timestamp
|
|
delete_at = str(int(ts))
|
|
headers = {'X-Delete-At': delete_at,
|
|
'X-Timestamp': utils.Timestamp(int(ts)).internal}
|
|
with self.assertRaises(HTTPException) as cm:
|
|
constraints.check_delete_headers(
|
|
Request.blank('/', headers=headers))
|
|
self.assertEqual(cm.exception.status_int, HTTP_BAD_REQUEST)
|
|
self.assertIn(b'X-Delete-At in past', cm.exception.body)
|
|
|
|
def test_check_delete_headers_removes_delete_after(self):
|
|
ts = utils.Timestamp.now()
|
|
headers = {'Content-Length': '0',
|
|
'Content-Type': 'text/plain',
|
|
'X-Delete-After': '42',
|
|
'X-Delete-At': str(int(ts) + 40),
|
|
'X-Timestamp': ts.internal}
|
|
req = Request.blank('/', headers=headers)
|
|
constraints.check_delete_headers(req)
|
|
self.assertNotIn('X-Delete-After', req.headers)
|
|
self.assertEqual(req.headers['X-Delete-At'], str(int(ts) + 42))
|
|
|
|
def test_check_delete_headers_sets_delete_at(self):
|
|
ts = utils.Timestamp.now()
|
|
expected = str(int(ts) + 1000)
|
|
# check delete-at is passed through
|
|
headers = {'Content-Length': '0',
|
|
'Content-Type': 'text/plain',
|
|
'X-Delete-At': expected,
|
|
'X-Timestamp': ts.internal}
|
|
req = Request.blank('/', headers=headers)
|
|
constraints.check_delete_headers(req)
|
|
self.assertIn('X-Delete-At', req.headers)
|
|
self.assertEqual(req.headers['X-Delete-At'], expected)
|
|
|
|
# check delete-after is converted to delete-at
|
|
headers = {'Content-Length': '0',
|
|
'Content-Type': 'text/plain',
|
|
'X-Delete-After': '42',
|
|
'X-Timestamp': ts.internal}
|
|
req = Request.blank('/', headers=headers)
|
|
constraints.check_delete_headers(req)
|
|
self.assertIn('X-Delete-At', req.headers)
|
|
expected = str(int(ts) + 42)
|
|
self.assertEqual(req.headers['X-Delete-At'], expected)
|
|
|
|
# check delete-after takes precedence over delete-at
|
|
headers = {'Content-Length': '0',
|
|
'Content-Type': 'text/plain',
|
|
'X-Delete-After': '42',
|
|
'X-Delete-At': str(int(ts) + 40),
|
|
'X-Timestamp': ts.internal}
|
|
req = Request.blank('/', headers=headers)
|
|
constraints.check_delete_headers(req)
|
|
self.assertIn('X-Delete-At', req.headers)
|
|
self.assertEqual(req.headers['X-Delete-At'], expected)
|
|
|
|
headers = {'Content-Length': '0',
|
|
'Content-Type': 'text/plain',
|
|
'X-Delete-After': '42',
|
|
'X-Delete-At': str(int(ts) + 44),
|
|
'X-Timestamp': ts.internal}
|
|
req = Request.blank('/', headers=headers)
|
|
constraints.check_delete_headers(req)
|
|
self.assertIn('X-Delete-At', req.headers)
|
|
self.assertEqual(req.headers['X-Delete-At'], expected)
|
|
|
|
def test_check_drive_invalid_path(self):
|
|
root = '/srv/'
|
|
with mock_check_drive() as mocks:
|
|
drive = 'foo?bar'
|
|
with self.assertRaises(ValueError) as exc_mgr:
|
|
constraints.check_dir(root, drive)
|
|
self.assertEqual(str(exc_mgr.exception),
|
|
'%s is not a valid drive name' % drive)
|
|
|
|
drive = 'foo bar'
|
|
with self.assertRaises(ValueError) as exc_mgr:
|
|
constraints.check_mount(root, drive)
|
|
self.assertEqual(str(exc_mgr.exception),
|
|
'%s is not a valid drive name' % drive)
|
|
|
|
drive = 'foo/bar'
|
|
with self.assertRaises(ValueError) as exc_mgr:
|
|
constraints.check_drive(root, drive, True)
|
|
self.assertEqual(str(exc_mgr.exception),
|
|
'%s is not a valid drive name' % drive)
|
|
|
|
drive = 'foo%bar'
|
|
with self.assertRaises(ValueError) as exc_mgr:
|
|
constraints.check_drive(root, drive, False)
|
|
self.assertEqual(str(exc_mgr.exception),
|
|
'%s is not a valid drive name' % drive)
|
|
self.assertEqual([], mocks['isdir'].call_args_list)
|
|
self.assertEqual([], mocks['ismount'].call_args_list)
|
|
|
|
def test_check_drive_ismount(self):
|
|
root = '/srv'
|
|
path = 'sdb1'
|
|
with mock_check_drive(ismount=True) as mocks:
|
|
with self.assertRaises(ValueError) as exc_mgr:
|
|
constraints.check_dir(root, path)
|
|
self.assertEqual(str(exc_mgr.exception),
|
|
'/srv/sdb1 is not a directory')
|
|
|
|
with self.assertRaises(ValueError) as exc_mgr:
|
|
constraints.check_drive(root, path, False)
|
|
self.assertEqual(str(exc_mgr.exception),
|
|
'/srv/sdb1 is not a directory')
|
|
|
|
self.assertEqual([mock.call('/srv/sdb1'), mock.call('/srv/sdb1')],
|
|
mocks['isdir'].call_args_list)
|
|
self.assertEqual([], mocks['ismount'].call_args_list)
|
|
|
|
with mock_check_drive(ismount=True) as mocks:
|
|
self.assertEqual('/srv/sdb1', constraints.check_mount(root, path))
|
|
self.assertEqual('/srv/sdb1', constraints.check_drive(
|
|
root, path, True))
|
|
self.assertEqual([], mocks['isdir'].call_args_list)
|
|
self.assertEqual([mock.call('/srv/sdb1'), mock.call('/srv/sdb1')],
|
|
mocks['ismount'].call_args_list)
|
|
|
|
def test_check_drive_isdir(self):
|
|
root = '/srv'
|
|
path = 'sdb2'
|
|
with mock_check_drive(isdir=True) as mocks:
|
|
self.assertEqual('/srv/sdb2', constraints.check_dir(root, path))
|
|
self.assertEqual('/srv/sdb2', constraints.check_drive(
|
|
root, path, False))
|
|
self.assertEqual([mock.call('/srv/sdb2'), mock.call('/srv/sdb2')],
|
|
mocks['isdir'].call_args_list)
|
|
self.assertEqual([], mocks['ismount'].call_args_list)
|
|
|
|
with mock_check_drive(isdir=True) as mocks:
|
|
with self.assertRaises(ValueError) as exc_mgr:
|
|
constraints.check_mount(root, path)
|
|
self.assertEqual(str(exc_mgr.exception),
|
|
'/srv/sdb2 is not mounted')
|
|
|
|
with self.assertRaises(ValueError) as exc_mgr:
|
|
constraints.check_drive(root, path, True)
|
|
self.assertEqual(str(exc_mgr.exception),
|
|
'/srv/sdb2 is not mounted')
|
|
|
|
self.assertEqual([], mocks['isdir'].call_args_list)
|
|
self.assertEqual([mock.call('/srv/sdb2'), mock.call('/srv/sdb2')],
|
|
mocks['ismount'].call_args_list)
|
|
|
|
def test_check_float(self):
|
|
self.assertFalse(constraints.check_float(''))
|
|
self.assertTrue(constraints.check_float('0'))
|
|
|
|
def test_valid_timestamp(self):
|
|
self.assertRaises(HTTPException,
|
|
constraints.valid_timestamp,
|
|
Request.blank('/'))
|
|
self.assertRaises(HTTPException,
|
|
constraints.valid_timestamp,
|
|
Request.blank('/', headers={
|
|
'X-Timestamp': 'asdf'}))
|
|
timestamp = utils.Timestamp.now()
|
|
req = Request.blank('/', headers={'X-Timestamp': timestamp.internal})
|
|
self.assertEqual(timestamp, constraints.valid_timestamp(req))
|
|
req = Request.blank('/', headers={'X-Timestamp': timestamp.normal})
|
|
self.assertEqual(timestamp, constraints.valid_timestamp(req))
|
|
|
|
def test_check_utf8(self):
|
|
unicode_sample = u'\uc77c\uc601'
|
|
unicode_with_reserved = u'abc%sdef' % utils.RESERVED_STR
|
|
|
|
# Some false-y values
|
|
self.assertFalse(constraints.check_utf8(None))
|
|
self.assertFalse(constraints.check_utf8(''))
|
|
self.assertFalse(constraints.check_utf8(b''))
|
|
self.assertFalse(constraints.check_utf8(u''))
|
|
|
|
# invalid utf8 bytes
|
|
self.assertFalse(constraints.check_utf8(
|
|
unicode_sample.encode('utf-8')[::-1]))
|
|
# unicode with null
|
|
self.assertFalse(constraints.check_utf8(unicode_with_reserved))
|
|
# utf8 bytes with null
|
|
self.assertFalse(constraints.check_utf8(
|
|
unicode_with_reserved.encode('utf8')))
|
|
|
|
self.assertTrue(constraints.check_utf8('this is ascii and utf-8, too'))
|
|
self.assertTrue(constraints.check_utf8(unicode_sample))
|
|
self.assertTrue(constraints.check_utf8(unicode_sample.encode('utf8')))
|
|
|
|
def test_check_utf8_internal(self):
|
|
unicode_with_reserved = u'abc%sdef' % utils.RESERVED_STR
|
|
# sanity
|
|
self.assertFalse(constraints.check_utf8(unicode_with_reserved))
|
|
self.assertTrue(constraints.check_utf8('foobar', internal=True))
|
|
# internal allows reserved names
|
|
self.assertTrue(constraints.check_utf8(unicode_with_reserved,
|
|
internal=True))
|
|
|
|
def test_check_utf8_non_canonical(self):
|
|
self.assertFalse(constraints.check_utf8(b'\xed\xa0\xbc\xed\xbc\xb8'))
|
|
self.assertTrue(constraints.check_utf8(u'\U0001f338'))
|
|
self.assertTrue(constraints.check_utf8(b'\xf0\x9f\x8c\xb8'))
|
|
self.assertTrue(constraints.check_utf8(u'\U0001f338'.encode('utf8')))
|
|
self.assertFalse(constraints.check_utf8(b'\xed\xa0\xbd\xed\xb9\x88'))
|
|
self.assertTrue(constraints.check_utf8(u'\U0001f648'))
|
|
|
|
def test_check_utf8_lone_surrogates(self):
|
|
self.assertFalse(constraints.check_utf8(b'\xed\xa0\xbc'))
|
|
self.assertFalse(constraints.check_utf8(u'\ud83c'))
|
|
self.assertFalse(constraints.check_utf8(b'\xed\xb9\x88'))
|
|
self.assertFalse(constraints.check_utf8(u'\ude48'))
|
|
|
|
self.assertFalse(constraints.check_utf8(u'\ud800'))
|
|
self.assertFalse(constraints.check_utf8(u'\udc00'))
|
|
self.assertFalse(constraints.check_utf8(u'\udcff'))
|
|
self.assertFalse(constraints.check_utf8(u'\udfff'))
|
|
|
|
def test_validate_bad_meta(self):
|
|
req = Request.blank(
|
|
'/v/a/c/o',
|
|
headers={'x-object-meta-hello':
|
|
'ab' * constraints.MAX_HEADER_SIZE})
|
|
self.assertEqual(constraints.check_metadata(req, 'object').status_int,
|
|
HTTP_BAD_REQUEST)
|
|
resp = constraints.check_metadata(req, 'object')
|
|
self.assertIsNotNone(resp)
|
|
self.assertIn(b'x-object-meta-hello', resp.body.lower())
|
|
|
|
def test_validate_constraints(self):
|
|
c = constraints
|
|
self.assertGreater(c.MAX_META_OVERALL_SIZE, c.MAX_META_NAME_LENGTH)
|
|
self.assertGreater(c.MAX_META_OVERALL_SIZE, c.MAX_META_VALUE_LENGTH)
|
|
self.assertGreater(c.MAX_HEADER_SIZE, c.MAX_META_NAME_LENGTH)
|
|
self.assertGreater(c.MAX_HEADER_SIZE, c.MAX_META_VALUE_LENGTH)
|
|
|
|
def test_check_account_format(self):
|
|
req = Request.blank(
|
|
'/v/a/c/o',
|
|
headers={'X-Copy-From-Account': 'account/with/slashes'})
|
|
self.assertRaises(HTTPException,
|
|
constraints.check_account_format,
|
|
req, req.headers['X-Copy-From-Account'])
|
|
req = Request.blank(
|
|
'/v/a/c/o',
|
|
headers={'X-Copy-From-Account': ''})
|
|
self.assertRaises(HTTPException,
|
|
constraints.check_account_format,
|
|
req, req.headers['X-Copy-From-Account'])
|
|
|
|
def test_check_container_format(self):
|
|
invalid_versions_locations = (
|
|
'container/with/slashes',
|
|
'', # empty
|
|
)
|
|
for versions_location in invalid_versions_locations:
|
|
req = Request.blank(
|
|
'/v/a/c/o', headers={
|
|
'X-Versions-Location': versions_location})
|
|
with self.assertRaises(HTTPException) as cm:
|
|
constraints.check_container_format(
|
|
req, req.headers['X-Versions-Location'])
|
|
self.assertTrue(cm.exception.body.startswith(
|
|
b'Container name cannot'))
|
|
|
|
def test_valid_api_version(self):
|
|
version = 'v1'
|
|
self.assertTrue(constraints.valid_api_version(version))
|
|
|
|
version = 'v1.0'
|
|
self.assertTrue(constraints.valid_api_version(version))
|
|
|
|
version = 'v2'
|
|
self.assertFalse(constraints.valid_api_version(version))
|
|
|
|
|
|
class TestConstraintsConfig(unittest.TestCase):
|
|
|
|
def test_default_constraints(self):
|
|
for key in constraints.DEFAULT_CONSTRAINTS:
|
|
# if there is local over-rides in swift.conf we just continue on
|
|
if key in constraints.OVERRIDE_CONSTRAINTS:
|
|
continue
|
|
# module level attrs (that aren't in OVERRIDE) should have the
|
|
# same value as the DEFAULT map
|
|
module_level_value = getattr(constraints, key.upper())
|
|
self.assertEqual(constraints.DEFAULT_CONSTRAINTS[key],
|
|
module_level_value)
|
|
|
|
def test_effective_constraints(self):
|
|
for key in constraints.DEFAULT_CONSTRAINTS:
|
|
# module level attrs should always mirror the same value as the
|
|
# EFFECTIVE map
|
|
module_level_value = getattr(constraints, key.upper())
|
|
self.assertEqual(constraints.EFFECTIVE_CONSTRAINTS[key],
|
|
module_level_value)
|
|
# if there are local over-rides in swift.conf those should be
|
|
# reflected in the EFFECTIVE, otherwise we expect the DEFAULTs
|
|
self.assertEqual(constraints.EFFECTIVE_CONSTRAINTS[key],
|
|
constraints.OVERRIDE_CONSTRAINTS.get(
|
|
key, constraints.DEFAULT_CONSTRAINTS[key]))
|
|
|
|
def test_override_constraints(self):
|
|
try:
|
|
with tempfile.NamedTemporaryFile() as f:
|
|
f.write(b'[swift-constraints]\n')
|
|
# set everything to 1
|
|
for key in constraints.DEFAULT_CONSTRAINTS:
|
|
f.write(b'%s = 1\n' % key.encode('ascii'))
|
|
f.flush()
|
|
with mock.patch.object(utils, 'SWIFT_CONF_FILE', f.name):
|
|
constraints.reload_constraints()
|
|
for key, default in constraints.DEFAULT_CONSTRAINTS.items():
|
|
# module level attrs should all be 1
|
|
module_level_value = getattr(constraints, key.upper())
|
|
if isinstance(default, int):
|
|
self.assertEqual(module_level_value, 1)
|
|
elif isinstance(default, str):
|
|
self.assertEqual(module_level_value, '1')
|
|
else:
|
|
self.assertEqual(module_level_value, ['1'])
|
|
# all keys should be in OVERRIDE
|
|
self.assertEqual(constraints.OVERRIDE_CONSTRAINTS[key],
|
|
module_level_value)
|
|
# module level attrs should always mirror the same value as
|
|
# the EFFECTIVE map
|
|
self.assertEqual(constraints.EFFECTIVE_CONSTRAINTS[key],
|
|
module_level_value)
|
|
finally:
|
|
constraints.reload_constraints()
|
|
|
|
def test_reload_reset(self):
|
|
try:
|
|
with tempfile.NamedTemporaryFile() as f:
|
|
f.write(b'[swift-constraints]\n')
|
|
# set everything to 1
|
|
for key in constraints.DEFAULT_CONSTRAINTS:
|
|
f.write(b'%s = 1\n' % key.encode('ascii'))
|
|
f.flush()
|
|
with mock.patch.object(utils, 'SWIFT_CONF_FILE', f.name):
|
|
constraints.reload_constraints()
|
|
self.assertTrue(constraints.SWIFT_CONSTRAINTS_LOADED)
|
|
self.assertEqual(sorted(constraints.DEFAULT_CONSTRAINTS.keys()),
|
|
sorted(constraints.OVERRIDE_CONSTRAINTS.keys()))
|
|
# file is now deleted...
|
|
with mock.patch.object(utils, 'SWIFT_CONF_FILE', f.name):
|
|
constraints.reload_constraints()
|
|
# no constraints have been loaded from non-existent swift.conf
|
|
self.assertFalse(constraints.SWIFT_CONSTRAINTS_LOADED)
|
|
# no constraints are in OVERRIDE
|
|
self.assertEqual([], list(constraints.OVERRIDE_CONSTRAINTS.keys()))
|
|
# the EFFECTIVE constraints mirror DEFAULT
|
|
self.assertEqual(constraints.EFFECTIVE_CONSTRAINTS,
|
|
constraints.DEFAULT_CONSTRAINTS)
|
|
finally:
|
|
constraints.reload_constraints()
|
|
|
|
|
|
if __name__ == '__main__':
|
|
unittest.main()
|