# Copyright (c) 2010-2012 OpenStack Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#    http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
# implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import errno
import os
import mock
import unittest
from shutil import rmtree
from StringIO import StringIO

import simplejson
import xml.dom.minidom

from swift.common.swob import Request
from swift.account.server import AccountController, ACCOUNT_LISTING_LIMIT
from swift.common.utils import normalize_timestamp, replication, public


class TestAccountController(unittest.TestCase):
    """Test swift.account.server.AccountController

    Every test builds a ``Request.blank`` for a path of the form
    ``/<device>/<partition>/<account>[/<container>]``, passes it to the
    controller created in ``setUp`` and checks the response status,
    headers and/or body.
    """

    def setUp(self):
        """Set up for testing swift.account.server.AccountController"""
        # The controller's 'devices' directory lives next to this file and
        # is removed again in tearDown.
        self.testdir = os.path.join(os.path.dirname(__file__),
                                    'account_server')
        self.controller = AccountController(
            {'devices': self.testdir, 'mount_check': 'false'})

    def tearDown(self):
        """Tear down for testing swift.account.server.AccountController"""
        try:
            rmtree(self.testdir)
        except OSError as err:
            # A test that never wrote anything leaves no directory behind;
            # only that case is tolerated.
            if err.errno != errno.ENOENT:
                raise

    def test_DELETE_not_found(self):
        # DELETE of an account that was never created.
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'DELETE',
                                     'HTTP_X_TIMESTAMP': '0'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 404)
        self.assertTrue('X-Account-Status' not in resp.headers)

    def test_DELETE_empty(self):
        # Create the account, then delete it while it has no containers.
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'DELETE',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers['X-Account-Status'], 'Deleted')

    def test_DELETE_not_empty(self):
        # Create the account and one container, then delete the account.
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'DELETE',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        # We now allow deleting non-empty accounts
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers['X-Account-Status'], 'Deleted')

    def test_DELETE_now_empty(self):
        # Create the account, record container c1, then send a second
        # update for c1 whose delete timestamp (2) is newer than its put
        # timestamp (1) before deleting the account.
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank(
            '/sda1/p/a/c1',
            environ={'REQUEST_METHOD': 'PUT'},
            headers={'X-Put-Timestamp': '1',
                     'X-Delete-Timestamp': '2',
                     'X-Object-Count': '0',
                     'X-Bytes-Used': '0',
                     'X-Timestamp': normalize_timestamp(0)})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'DELETE',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers['X-Account-Status'], 'Deleted')

    def test_DELETE_invalid_partition(self):
        # '.' in the partition position of the path is rejected.
        req = Request.blank('/sda1/./a',
                            environ={'REQUEST_METHOD': 'DELETE',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 400)

    def test_DELETE_timestamp_not_float(self):
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'DELETE'},
                            headers={'X-Timestamp': 'not-float'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 400)

    def test_DELETE_insufficient_storage(self):
        # This controller is built without 'mount_check': 'false'; the
        # request against device 'sda-null' is expected to be refused.
        self.controller = AccountController({'devices': self.testdir})
        req = Request.blank(
            '/sda-null/p/a',
            environ={'REQUEST_METHOD': 'DELETE', 'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 507)

    def test_HEAD_not_found(self):
        # Test the case in which account does not exist (can be recreated)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 404)
        self.assertTrue('X-Account-Status' not in resp.headers)

        # Test the case in which account was deleted but not yet reaped
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'DELETE',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 404)
        self.assertEqual(resp.headers['X-Account-Status'], 'Deleted')

    def test_HEAD_empty_account(self):
        # A freshly created account reports zero for all three counters.
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers['x-account-container-count'], '0')
        self.assertEqual(resp.headers['x-account-object-count'], '0')
        self.assertEqual(resp.headers['x-account-bytes-used'], '0')

    def test_HEAD_with_containers(self):
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Timestamp': '0'})
        req.get_response(self.controller)
        # Record two empty containers.
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c2',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '2',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers['x-account-container-count'], '2')
        self.assertEqual(resp.headers['x-account-object-count'], '0')
        self.assertEqual(resp.headers['x-account-bytes-used'], '0')
        # Update both containers with non-zero object counts and byte
        # totals; the account headers are expected to show the sums.
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '1',
                                     'X-Bytes-Used': '2',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c2',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '2',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '3',
                                     'X-Bytes-Used': '4',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'HEAD',
                                     'HTTP_X_TIMESTAMP': '5'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers['x-account-container-count'], '2')
        self.assertEqual(resp.headers['x-account-object-count'], '4')
        self.assertEqual(resp.headers['x-account-bytes-used'], '6')

    def test_HEAD_invalid_partition(self):
        req = Request.blank('/sda1/./a',
                            environ={'REQUEST_METHOD': 'HEAD',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 400)

    def test_HEAD_invalid_content_type(self):
        # An Accept value of 'application/plain' is answered with 406.
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'HEAD'},
                            headers={'Accept': 'application/plain'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 406)

    def test_HEAD_insufficient_storage(self):
        self.controller = AccountController({'devices': self.testdir})
        req = Request.blank('/sda-null/p/a',
                            environ={'REQUEST_METHOD': 'HEAD',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 507)

    def test_HEAD_invalid_format(self):
        # Named bad_format rather than format so the builtin is not
        # shadowed.
        bad_format = '%D1%BD%8A9'  # invalid UTF-8; should be %E1%BD%8A9 (E -> D)
        req = Request.blank('/sda1/p/a?format=' + bad_format,
                            environ={'REQUEST_METHOD': 'HEAD'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 400)

    def test_PUT_not_found(self):
        # Container update for an account that does not exist.
        req = Request.blank(
            '/sda1/p/a/c',
            environ={'REQUEST_METHOD': 'PUT'},
            headers={'X-PUT-Timestamp': normalize_timestamp(1),
                     'X-DELETE-Timestamp': normalize_timestamp(0),
                     'X-Object-Count': '1',
                     'X-Bytes-Used': '1',
                     'X-Timestamp': normalize_timestamp(0)})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 404)
        self.assertTrue('X-Account-Status' not in resp.headers)

    def test_PUT(self):
        # First PUT creates the account (201), a repeat PUT is accepted
        # (202).
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 201)
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 202)

    def test_PUT_simulated_create_race(self):
        # One-element list so the nested class can observe the state
        # change made further down.
        state = ['initial']

        from swift.account.backend import AccountBroker as OrigAcBr

        class InterceptedAcBr(OrigAcBr):

            def __init__(self, *args, **kwargs):
                super(InterceptedAcBr, self).__init__(*args, **kwargs)
                if state[0] == 'initial':
                    # Do nothing initially
                    pass
                elif state[0] == 'race':
                    # Save the original db_file attribute value
                    self._saved_db_file = self.db_file
                    self.db_file += '.doesnotexist'

            def initialize(self, *args, **kwargs):
                if state[0] == 'initial':
                    # Do nothing initially
                    pass
                elif state[0] == 'race':
                    # Restore the original db_file attribute to get the race
                    # behavior
                    self.db_file = self._saved_db_file
                return super(InterceptedAcBr, self).initialize(*args,
                                                               **kwargs)

        with mock.patch("swift.account.server.AccountBroker",
                        InterceptedAcBr):
            req = Request.blank('/sda1/p/a',
                                environ={'REQUEST_METHOD': 'PUT',
                                         'HTTP_X_TIMESTAMP': '0'})
            resp = req.get_response(self.controller)
            self.assertEqual(resp.status_int, 201)
            state[0] = "race"
            req = Request.blank('/sda1/p/a',
                                environ={'REQUEST_METHOD': 'PUT',
                                         'HTTP_X_TIMESTAMP': '1'})
            resp = req.get_response(self.controller)
            self.assertEqual(resp.status_int, 202)

    def test_PUT_after_DELETE(self):
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Timestamp': normalize_timestamp(1)})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 201)
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'DELETE'},
                            headers={'X-Timestamp': normalize_timestamp(1)})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        # Re-creating the deleted account is refused.
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Timestamp': normalize_timestamp(2)})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 403)
        self.assertEqual(resp.body, 'Recently deleted')
        self.assertEqual(resp.headers['X-Account-Status'], 'Deleted')

    def test_PUT_GET_metadata(self):
        # Set metadata header
        req = Request.blank(
            '/sda1/p/a',
            environ={'REQUEST_METHOD': 'PUT'},
            headers={'X-Timestamp': normalize_timestamp(1),
                     'X-Account-Meta-Test': 'Value'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 201)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers.get('x-account-meta-test'), 'Value')
        # Set another metadata header, ensuring old one doesn't disappear
        req = Request.blank(
            '/sda1/p/a',
            environ={'REQUEST_METHOD': 'POST'},
            headers={'X-Timestamp': normalize_timestamp(1),
                     'X-Account-Meta-Test2': 'Value2'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers.get('x-account-meta-test'), 'Value')
        self.assertEqual(resp.headers.get('x-account-meta-test2'), 'Value2')
        # Update metadata header
        req = Request.blank(
            '/sda1/p/a',
            environ={'REQUEST_METHOD': 'PUT'},
            headers={'X-Timestamp': normalize_timestamp(3),
                     'X-Account-Meta-Test': 'New Value'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 202)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers.get('x-account-meta-test'),
                         'New Value')
        # Send old update to metadata header
        req = Request.blank(
            '/sda1/p/a',
            environ={'REQUEST_METHOD': 'PUT'},
            headers={'X-Timestamp': normalize_timestamp(2),
                     'X-Account-Meta-Test': 'Old Value'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 202)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers.get('x-account-meta-test'),
                         'New Value')
        # Remove metadata header (by setting it to empty)
        req = Request.blank(
            '/sda1/p/a',
            environ={'REQUEST_METHOD': 'PUT'},
            headers={'X-Timestamp': normalize_timestamp(4),
                     'X-Account-Meta-Test': ''})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 202)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertTrue('x-account-meta-test' not in resp.headers)

    def test_PUT_invalid_partition(self):
        req = Request.blank('/sda1/./a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 400)

    def test_PUT_insufficient_storage(self):
        self.controller = AccountController({'devices': self.testdir})
        req = Request.blank('/sda-null/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 507)

    def test_POST_HEAD_metadata(self):
        req = Request.blank(
            '/sda1/p/a',
            environ={'REQUEST_METHOD': 'PUT'},
            headers={'X-Timestamp': normalize_timestamp(1)})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 201)
        # Set metadata header
        req = Request.blank(
            '/sda1/p/a',
            environ={'REQUEST_METHOD': 'POST'},
            headers={'X-Timestamp': normalize_timestamp(1),
                     'X-Account-Meta-Test': 'Value'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers.get('x-account-meta-test'), 'Value')
        # Update metadata header
        req = Request.blank(
            '/sda1/p/a',
            environ={'REQUEST_METHOD': 'POST'},
            headers={'X-Timestamp': normalize_timestamp(3),
                     'X-Account-Meta-Test': 'New Value'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers.get('x-account-meta-test'),
                         'New Value')
        # Send old update to metadata header
        req = Request.blank(
            '/sda1/p/a',
            environ={'REQUEST_METHOD': 'POST'},
            headers={'X-Timestamp': normalize_timestamp(2),
                     'X-Account-Meta-Test': 'Old Value'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers.get('x-account-meta-test'),
                         'New Value')
        # Remove metadata header (by setting it to empty)
        req = Request.blank(
            '/sda1/p/a',
            environ={'REQUEST_METHOD': 'POST'},
            headers={'X-Timestamp': normalize_timestamp(4),
                     'X-Account-Meta-Test': ''})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'HEAD'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertTrue('x-account-meta-test' not in resp.headers)

    def test_POST_invalid_partition(self):
        req = Request.blank('/sda1/./a',
                            environ={'REQUEST_METHOD': 'POST',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 400)

    def test_POST_timestamp_not_float(self):
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'POST',
                                     'HTTP_X_TIMESTAMP': '0'},
                            headers={'X-Timestamp': 'not-float'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 400)

    def test_POST_insufficient_storage(self):
        self.controller = AccountController({'devices': self.testdir})
        req = Request.blank('/sda-null/p/a',
                            environ={'REQUEST_METHOD': 'POST',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 507)

    def test_POST_after_DELETE_not_found(self):
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 201)
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'DELETE',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        # POST to the deleted account reports 404 plus the status header.
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'POST',
                                     'HTTP_X_TIMESTAMP': '2'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 404)
        self.assertEqual(resp.headers['X-Account-Status'], 'Deleted')

    def test_GET_not_found_plain(self):
        # Test the case in which account does not exist (can be recreated)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 404)
        self.assertTrue('X-Account-Status' not in resp.headers)

        # Test the case in which account was deleted but not yet reaped
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'DELETE',
                                     'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 404)
        self.assertEqual(resp.headers['X-Account-Status'], 'Deleted')

    def test_GET_not_found_json(self):
        req = Request.blank('/sda1/p/a?format=json',
                            environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 404)

    def test_GET_not_found_xml(self):
        req = Request.blank('/sda1/p/a?format=xml',
                            environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 404)

    def test_GET_empty_account_plain(self):
        # Plain listing of an empty account: 204 with a text/plain type.
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 204)
        self.assertEqual(resp.headers['Content-Type'],
                         'text/plain; charset=utf-8')

    def test_GET_empty_account_json(self):
        # JSON listing of an empty account: 200 rather than 204.
        req = Request.blank('/sda1/p/a?format=json',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a?format=json',
                            environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(resp.headers['Content-Type'],
                         'application/json; charset=utf-8')

    def test_GET_empty_account_xml(self):
        # XML listing of an empty account: 200 rather than 204.
        req = Request.blank('/sda1/p/a?format=xml',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a?format=xml',
                            environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(resp.headers['Content-Type'],
                         'application/xml; charset=utf-8')

    def test_GET_over_limit(self):
        # A limit one above ACCOUNT_LISTING_LIMIT is answered with 412.
        req = Request.blank(
            '/sda1/p/a?limit=%d' % (ACCOUNT_LISTING_LIMIT + 1),
            environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 412)

    def test_GET_with_containers_plain(self):
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c2',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '2',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(resp.body.strip().split('\n'), ['c1', 'c2'])
        # Changing the containers' counters leaves the plain listing as is.
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '1',
                                     'X-Bytes-Used': '2',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c2',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '2',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '3',
                                     'X-Bytes-Used': '4',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a', environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(resp.body.strip().split('\n'), ['c1', 'c2'])
        self.assertEqual(resp.content_type, 'text/plain')
        self.assertEqual(resp.charset, 'utf-8')

        # test unknown format uses default plain
        req = Request.blank('/sda1/p/a?format=somethinglese',
                            environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(resp.body.strip().split('\n'), ['c1', 'c2'])
        self.assertEqual(resp.content_type, 'text/plain')
        self.assertEqual(resp.charset, 'utf-8')

    def test_GET_with_containers_json(self):
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c2',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '2',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a?format=json',
                            environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(simplejson.loads(resp.body),
                         [{'count': 0, 'bytes': 0, 'name': 'c1'},
                          {'count': 0, 'bytes': 0, 'name': 'c2'}])
        # The JSON listing carries the per-container counters, so it
        # changes once the containers are updated.
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '1',
                                     'X-Bytes-Used': '2',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c2',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '2',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '3',
                                     'X-Bytes-Used': '4',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a?format=json',
                            environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 200)
        self.assertEqual(simplejson.loads(resp.body),
                         [{'count': 1, 'bytes': 2, 'name': 'c1'},
                          {'count': 3, 'bytes': 4, 'name': 'c2'}])
        self.assertEqual(resp.content_type, 'application/json')
        self.assertEqual(resp.charset, 'utf-8')

    def test_GET_with_containers_xml(self):
        req = Request.blank('/sda1/p/a',
                            environ={'REQUEST_METHOD': 'PUT',
                                     'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c2',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '2',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '0',
                                     'X-Bytes-Used': '0',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a?format=xml',
                            environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.content_type, 'application/xml')
        self.assertEqual(resp.status_int, 200)
        dom = xml.dom.minidom.parseString(resp.body)
        self.assertEqual(dom.firstChild.nodeName, 'account')
        # '#text' children are filtered out so only element nodes are
        # compared.
        listing = \
            [n for n in dom.firstChild.childNodes if n.nodeName != '#text']
        self.assertEqual(len(listing), 2)
        self.assertEqual(listing[0].nodeName, 'container')
        container = [n for n in listing[0].childNodes
                     if n.nodeName != '#text']
        self.assertEqual(sorted([n.nodeName for n in container]),
                         ['bytes', 'count', 'name'])
        node = [n for n in container if n.nodeName == 'name'][0]
        self.assertEqual(node.firstChild.nodeValue, 'c1')
        node = [n for n in container if n.nodeName == 'count'][0]
        self.assertEqual(node.firstChild.nodeValue, '0')
        node = [n for n in container if n.nodeName == 'bytes'][0]
        self.assertEqual(node.firstChild.nodeValue, '0')
        self.assertEqual(listing[-1].nodeName, 'container')
        container = \
            [n for n in listing[-1].childNodes if n.nodeName != '#text']
        self.assertEqual(sorted([n.nodeName for n in container]),
                         ['bytes', 'count', 'name'])
        node = [n for n in container if n.nodeName == 'name'][0]
        self.assertEqual(node.firstChild.nodeValue, 'c2')
        node = [n for n in container if n.nodeName == 'count'][0]
        self.assertEqual(node.firstChild.nodeValue, '0')
        node = [n for n in container if n.nodeName == 'bytes'][0]
        self.assertEqual(node.firstChild.nodeValue, '0')
        # Update both containers and check the XML listing again.
        req = Request.blank('/sda1/p/a/c1',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '1',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '1',
                                     'X-Bytes-Used': '2',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a/c2',
                            environ={'REQUEST_METHOD': 'PUT'},
                            headers={'X-Put-Timestamp': '2',
                                     'X-Delete-Timestamp': '0',
                                     'X-Object-Count': '3',
                                     'X-Bytes-Used': '4',
                                     'X-Timestamp': normalize_timestamp(0)})
        req.get_response(self.controller)
        req = Request.blank('/sda1/p/a?format=xml',
                            environ={'REQUEST_METHOD': 'GET'})
        resp = req.get_response(self.controller)
        self.assertEqual(resp.status_int, 200)
        dom = xml.dom.minidom.parseString(resp.body)
        self.assertEqual(dom.firstChild.nodeName, 'account')
        listing = \
            [n for n in dom.firstChild.childNodes if n.nodeName != '#text']
        self.assertEqual(len(listing), 2)
        self.assertEqual(listing[0].nodeName, 'container')
        container = [n for n in listing[0].childNodes
                     if n.nodeName != '#text']
        self.assertEqual(sorted([n.nodeName for n in container]),
                         ['bytes', 'count', 'name'])
        node = [n for n in container if n.nodeName == 'name'][0]
        self.assertEqual(node.firstChild.nodeValue, 'c1')
        node = [n for n in container if n.nodeName == 'count'][0]
        self.assertEqual(node.firstChild.nodeValue, '1')
        node = [n for n in container if n.nodeName == 'bytes'][0]
        self.assertEqual(node.firstChild.nodeValue, '2')
        self.assertEqual(listing[-1].nodeName, 'container')
        container = [
            n for n in listing[-1].childNodes if n.nodeName != '#text']
        self.assertEqual(sorted([n.nodeName for n in container]),
                         ['bytes', 'count', 'name'])
        node = [n for n in container if n.nodeName == 'name'][0]
        self.assertEqual(node.firstChild.nodeValue, 'c2')
        node = [n for n in container if n.nodeName == 'count'][0]
        self.assertEqual(node.firstChild.nodeValue, '3')
        node = [n for n in container if n.nodeName == 'bytes'][0]
        self.assertEqual(node.firstChild.nodeValue, '4')
        self.assertEqual(resp.charset, 'utf-8')

    def test_GET_xml_escapes_account_name(self):
        # The account name is a double quote followed by a single quote;
        # the XML listing must still parse and yield that name back.
        req = Request.blank(
            '/sda1/p/%22%27',  # "'
            environ={'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '0'})
        req.get_response(self.controller)

        req = Request.blank(
            '/sda1/p/%22%27?format=xml',
            environ={'REQUEST_METHOD': 'GET', 'HTTP_X_TIMESTAMP': '1'})
        resp = req.get_response(self.controller)

        dom = xml.dom.minidom.parseString(resp.body)
        self.assertEqual(dom.firstChild.attributes['name'].value, '"\'')

    # FIXME: this file is damaged from here on.  It looks as though a span
    # of markup-like text was stripped at some point, which removed the
    # rest of test_GET_xml_escapes_container_name, every test that followed
    # it, and the beginning of the final test (whose surviving tail uses
    # `method`, `env`, `start_response` and `answer`, none of which are
    # defined in what is left).  Nothing has been reconstructed by
    # guesswork: the surviving fragments are kept verbatim below, as
    # comments so that the rest of the module parses.  Restore the missing
    # tests from version control.
    #
    # def test_GET_xml_escapes_container_name(self):
    #     req = Request.blank(
    #         '/sda1/p/a',
    #         environ={'REQUEST_METHOD': 'PUT', 'HTTP_X_TIMESTAMP': '0'})
    #     req.get_response(self.controller)
    #
    #     req = Request.blank(
    #         '/sda1/p/a/%22%3Cword',  # "
    #
    #     [... content missing ...]
    #
    #     Method Not Allowed
    #
    #     The method is not ' 'allowed for this resource.
    #
    #     ']
    #     mock_method = replication(public(lambda x: mock.MagicMock()))
    #     with mock.patch.object(self.controller, method, new=mock_method):
    #         mock_method.replication = True
    #         response = self.controller.__call__(env, start_response)
    #         self.assertEqual(response, answer)


if __name__ == '__main__':
    unittest.main()