# vim: tabstop=4 shiftwidth=4 softtabstop=4

# Copyright 2012 United States Government as represented by the
# Administrator of the National Aeronautics and Space Administration.
# All Rights Reserved.
#
# Copyright 2012 Nebula, Inc.
#
#    Licensed under the Apache License, Version 2.0 (the "License"); you may
#    not use this file except in compliance with the License. You may obtain
#    a copy of the License at
#
#         http://www.apache.org/licenses/LICENSE-2.0
#
#    Unless required by applicable law or agreed to in writing, software
#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
#    License for the specific language governing permissions and limitations
#    under the License.

from __future__ import absolute_import

import cloudfiles

from horizon import api
from horizon import exceptions
from horizon import test


class SwiftApiTests(test.APITestCase):
    """Tests for the ``api.swift_*`` wrappers using a mox-stubbed client.

    Every test follows the same record/replay shape: expectations are
    recorded on the object returned by ``self.stub_swiftclient()`` (and on
    fixture objects stubbed with ``StubOutWithMock``), ``ReplayAll()`` is
    called, and then the wrapper under test is invoked.

    NOTE(review): several tests contain no explicit assertion and rely on
    mox verifying the recorded calls -- presumably done by the base test
    case on teardown; confirm against ``test.APITestCase``.
    """

    def test_swift_get_containers(self):
        # The wrapper should return as many containers as the client
        # produced and report that there are no further results.
        containers = self.containers.list()
        swift_api = self.stub_swiftclient()
        swift_api.get_all_containers(limit=1001,
                                     marker=None).AndReturn(containers)
        self.mox.ReplayAll()

        conts, more = api.swift_get_containers(self.request)
        self.assertEqual(len(conts), len(containers))
        self.assertFalse(more)

    def test_swift_create_container(self):
        # Creating a container that does not exist yet: the lookup raises
        # NoSuchContainer, after which the container is created.
        container = self.containers.first()
        # NOTE(review): expected_calls=2 presumably accounts for the client
        # being obtained once for the lookup and once for the create --
        # confirm against stub_swiftclient.
        swift_api = self.stub_swiftclient(expected_calls=2)
        # Check for existence, then create
        exc = cloudfiles.errors.NoSuchContainer()
        swift_api.get_container(container.name).AndRaise(exc)
        swift_api.create_container(container.name).AndReturn(container)
        self.mox.ReplayAll()

        # Verification handled by mox, no assertions needed.
        api.swift_create_container(self.request, container.name)

    def test_swift_create_duplicate_container(self):
        # Creating a container whose lookup succeeds must be rejected.
        container = self.containers.first()
        swift_api = self.stub_swiftclient()
        swift_api.get_container(container.name).AndReturn(container)
        self.mox.ReplayAll()

        # The container already exists, so AlreadyExists is expected and
        # no create_container call is recorded.
        with self.assertRaises(exceptions.AlreadyExists):
            api.swift_create_container(self.request, container.name)

    def test_swift_get_objects(self):
        # The wrapper should return as many objects as the container
        # listing produced and report that there are no further results.
        container = self.containers.first()
        objects = self.objects.list()

        swift_api = self.stub_swiftclient()
        swift_api.get_container(container.name).AndReturn(container)
        self.mox.StubOutWithMock(container, 'get_objects')
        container.get_objects(limit=1001,
                              marker=None,
                              prefix=None).AndReturn(objects)
        self.mox.ReplayAll()

        objs, more = api.swift_get_objects(self.request, container.name)
        self.assertEqual(len(objs), len(objects))
        self.assertFalse(more)

    def test_swift_upload_object(self):
        # Uploading creates the object in the container, sends the data to
        # it, and returns the created object.
        container = self.containers.first()
        obj = self.objects.first()
        object_data = 'someData'

        swift_api = self.stub_swiftclient()
        swift_api.get_container(container.name).AndReturn(container)
        self.mox.StubOutWithMock(container, 'create_object')
        container.create_object(obj.name).AndReturn(obj)
        self.mox.StubOutWithMock(obj, 'send')
        obj.send(object_data).AndReturn(obj)
        self.mox.ReplayAll()

        ret_val = api.swift_upload_object(self.request,
                                          container.name,
                                          obj.name,
                                          object_data)
        self.assertEqual(ret_val, obj)

    def test_swift_delete_object(self):
        # Deleting goes through the container's delete_object; the wrapper
        # itself returns None even though the stub returns the object.
        container = self.containers.first()
        obj = self.objects.first()

        swift_api = self.stub_swiftclient()
        swift_api.get_container(container.name).AndReturn(container)
        self.mox.StubOutWithMock(container, 'delete_object')
        container.delete_object(obj.name).AndReturn(obj)
        self.mox.ReplayAll()

        ret_val = api.swift_delete_object(self.request,
                                          container.name,
                                          obj.name)
        self.assertIsNone(ret_val)

    def test_swift_get_object_data(self):
        # Fetching object data returns whatever the object's stream()
        # call yields.
        container = self.containers.first()
        obj = self.objects.first()
        object_data = 'objectData'

        swift_api = self.stub_swiftclient()
        swift_api.get_container(container.name).AndReturn(container)
        self.mox.StubOutWithMock(container, 'get_object')
        container.get_object(obj.name).AndReturn(obj)
        self.mox.StubOutWithMock(obj, 'stream')
        obj.stream().AndReturn(object_data)
        self.mox.ReplayAll()

        ret_val = api.swift_get_object_data(self.request,
                                            container.name,
                                            obj.name)
        self.assertEqual(ret_val, object_data)

    def test_swift_object_exists(self):
        # Two lookups are recorded in order: the first finds the object,
        # the second raises NoSuchObject.
        container = self.containers.first()
        obj = self.objects.first()

        swift_api = self.stub_swiftclient(expected_calls=2)
        self.mox.StubOutWithMock(container, 'get_object')
        swift_api.get_container(container.name).AndReturn(container)
        container.get_object(obj.name).AndReturn(obj)
        swift_api.get_container(container.name).AndReturn(container)
        exc = cloudfiles.errors.NoSuchObject()
        container.get_object(obj.name).AndRaise(exc)
        self.mox.ReplayAll()

        args = self.request, container.name, obj.name
        self.assertTrue(api.swift_object_exists(*args))
        # Again, for a "non-existent" object
        self.assertFalse(api.swift_object_exists(*args))

    def test_swift_copy_object(self):
        # Copying with unicode names is expected to fail with a
        # HorizonException; copying with plain names is expected to go
        # through the recorded client calls.
        container = self.containers.get(name=u"container_one\u6346")
        container_2 = self.containers.get(name=u"container_two\u6346")
        obj = self.objects.first()

        swift_api = self.stub_swiftclient()
        self.mox.StubOutWithMock(api.swift, 'swift_object_exists')
        self.mox.StubOutWithMock(container, 'get_object')
        self.mox.StubOutWithMock(obj, 'copy_to')
        # Using the non-unicode names here, see below.
        swift_api.get_container("no_unicode").AndReturn(container)
        api.swift.swift_object_exists(self.request,
                                      "also no unicode",
                                      "obj_with_no_unicode").AndReturn(False)
        container.get_object("obj_with_no_unicode").AndReturn(obj)
        obj.copy_to("also no unicode", "obj_with_no_unicode")
        self.mox.ReplayAll()

        # Unicode fails... we'll get to a successful test in a minute
        with self.assertRaises(exceptions.HorizonException):
            api.swift_copy_object(self.request,
                                  container.name,
                                  obj.name,
                                  container_2.name,
                                  obj.name)

        # Verification handled by mox. No assertions needed.
        # The fixtures are renamed in place so the second call matches the
        # non-unicode expectations recorded above.
        container.name = "no_unicode"
        container_2.name = "also no unicode"
        obj.name = "obj_with_no_unicode"
        api.swift_copy_object(self.request,
                              container.name,
                              obj.name,
                              container_2.name,
                              obj.name)