python-openstackclient/openstackclient/tests/unit/api/test_object_store_v1.py
Steve Martinelli 39839def2e move unit tests to new "unit" test module
this will better isolate the unit tests from the functional tests.
unfortunately, the "integration" tests had to be lumped into the
"unit" tests since we need the separation in testr.conf

Change-Id: Ifd12198c1f90e4e3c951c73bfa1884ab300d8ded
2016-09-08 15:19:50 -07:00

338 lines
9.8 KiB
Python

# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
#
"""Object Store v1 API Library Tests"""
import mock
from keystoneauth1 import session
from requests_mock.contrib import fixture
from openstackclient.api import object_store_v1 as object_store
from openstackclient.tests.unit import utils
FAKE_ACCOUNT = 'q12we34r'
FAKE_AUTH = '11223344556677889900'
FAKE_URL = 'http://gopher.com/v1/' + FAKE_ACCOUNT
FAKE_CONTAINER = 'rainbarrel'
FAKE_OBJECT = 'spigot'
LIST_CONTAINER_RESP = [
'qaz',
'fred',
]
LIST_OBJECT_RESP = [
{'name': 'fred', 'bytes': 1234, 'content_type': 'text'},
{'name': 'wilma', 'bytes': 5678, 'content_type': 'text'},
]
class TestObjectAPIv1(utils.TestCase):
def setUp(self):
super(TestObjectAPIv1, self).setUp()
sess = session.Session()
self.api = object_store.APIv1(session=sess, endpoint=FAKE_URL)
self.requests_mock = self.useFixture(fixture.Fixture())
class TestContainer(TestObjectAPIv1):
def setUp(self):
super(TestContainer, self).setUp()
def test_container_create(self):
headers = {
'x-trans-id': '1qaz2wsx',
}
self.requests_mock.register_uri(
'PUT',
FAKE_URL + '/qaz',
headers=headers,
status_code=201,
)
ret = self.api.container_create(container='qaz')
data = {
'account': FAKE_ACCOUNT,
'container': 'qaz',
'x-trans-id': '1qaz2wsx',
}
self.assertEqual(data, ret)
def test_container_delete(self):
self.requests_mock.register_uri(
'DELETE',
FAKE_URL + '/qaz',
status_code=204,
)
ret = self.api.container_delete(container='qaz')
self.assertIsNone(ret)
def test_container_list_no_options(self):
self.requests_mock.register_uri(
'GET',
FAKE_URL,
json=LIST_CONTAINER_RESP,
status_code=200,
)
ret = self.api.container_list()
self.assertEqual(LIST_CONTAINER_RESP, ret)
def test_container_list_prefix(self):
self.requests_mock.register_uri(
'GET',
FAKE_URL + '?prefix=foo%2f&format=json',
json=LIST_CONTAINER_RESP,
status_code=200,
)
ret = self.api.container_list(
prefix='foo/',
)
self.assertEqual(LIST_CONTAINER_RESP, ret)
def test_container_list_marker_limit_end(self):
self.requests_mock.register_uri(
'GET',
FAKE_URL + '?marker=next&limit=2&end_marker=stop&format=json',
json=LIST_CONTAINER_RESP,
status_code=200,
)
ret = self.api.container_list(
marker='next',
limit=2,
end_marker='stop',
)
self.assertEqual(LIST_CONTAINER_RESP, ret)
# def test_container_list_full_listing(self):
# sess = self.app.client_manager.session
#
# def side_effect(*args, **kwargs):
# rv = sess.get().json.return_value
# sess.get().json.return_value = []
# sess.get().json.side_effect = None
# return rv
#
# resp = [{'name': 'is-name'}]
# sess.get().json.return_value = resp
# sess.get().json.side_effect = side_effect
#
# data = lib_container.list_containers(
# self.app.client_manager.session,
# fake_url,
# full_listing=True,
# )
#
# # Check expected values
# sess.get.assert_called_with(
# fake_url,
# params={
# 'format': 'json',
# 'marker': 'is-name',
# }
# )
# self.assertEqual(resp, data)
def test_container_show(self):
headers = {
'X-Container-Meta-Owner': FAKE_ACCOUNT,
'x-container-object-count': '1',
'x-container-bytes-used': '577',
}
resp = {
'account': FAKE_ACCOUNT,
'container': 'qaz',
'object_count': '1',
'bytes_used': '577',
'properties': {'Owner': FAKE_ACCOUNT},
}
self.requests_mock.register_uri(
'HEAD',
FAKE_URL + '/qaz',
headers=headers,
status_code=204,
)
ret = self.api.container_show(container='qaz')
self.assertEqual(resp, ret)
class TestObject(TestObjectAPIv1):
def setUp(self):
super(TestObject, self).setUp()
@mock.patch('openstackclient.api.object_store_v1.io.open')
def base_object_create(self, file_contents, mock_open):
mock_open.read.return_value = file_contents
headers = {
'etag': 'youreit',
'x-trans-id': '1qaz2wsx',
}
# TODO(dtroyer): When requests_mock gains the ability to
# match against request.body add this check
# https://review.openstack.org/127316
self.requests_mock.register_uri(
'PUT',
FAKE_URL + '/qaz/counter.txt',
headers=headers,
# body=file_contents,
status_code=201,
)
ret = self.api.object_create(
container='qaz',
object='counter.txt',
)
data = {
'account': FAKE_ACCOUNT,
'container': 'qaz',
'object': 'counter.txt',
'etag': 'youreit',
'x-trans-id': '1qaz2wsx',
}
self.assertEqual(data, ret)
def test_object_create(self):
self.base_object_create('111\n222\n333\n')
self.base_object_create(bytes([0x31, 0x00, 0x0d, 0x0a, 0x7f, 0xff]))
def test_object_delete(self):
self.requests_mock.register_uri(
'DELETE',
FAKE_URL + '/qaz/wsx',
status_code=204,
)
ret = self.api.object_delete(
container='qaz',
object='wsx',
)
self.assertIsNone(ret)
def test_object_list_no_options(self):
self.requests_mock.register_uri(
'GET',
FAKE_URL + '/qaz',
json=LIST_OBJECT_RESP,
status_code=200,
)
ret = self.api.object_list(container='qaz')
self.assertEqual(LIST_OBJECT_RESP, ret)
def test_object_list_delimiter(self):
self.requests_mock.register_uri(
'GET',
FAKE_URL + '/qaz?delimiter=%7C',
json=LIST_OBJECT_RESP,
status_code=200,
)
ret = self.api.object_list(
container='qaz',
delimiter='|',
)
self.assertEqual(LIST_OBJECT_RESP, ret)
def test_object_list_prefix(self):
self.requests_mock.register_uri(
'GET',
FAKE_URL + '/qaz?prefix=foo%2f',
json=LIST_OBJECT_RESP,
status_code=200,
)
ret = self.api.object_list(
container='qaz',
prefix='foo/',
)
self.assertEqual(LIST_OBJECT_RESP, ret)
def test_object_list_marker_limit_end(self):
self.requests_mock.register_uri(
'GET',
FAKE_URL + '/qaz?marker=next&limit=2&end_marker=stop',
json=LIST_CONTAINER_RESP,
status_code=200,
)
ret = self.api.object_list(
container='qaz',
marker='next',
limit=2,
end_marker='stop',
)
self.assertEqual(LIST_CONTAINER_RESP, ret)
# def test_list_objects_full_listing(self):
# sess = self.app.client_manager.session
#
# def side_effect(*args, **kwargs):
# rv = sess.get().json.return_value
# sess.get().json.return_value = []
# sess.get().json.side_effect = None
# return rv
#
# resp = [{'name': 'is-name'}]
# sess.get().json.return_value = resp
# sess.get().json.side_effect = side_effect
#
# data = lib_object.list_objects(
# sess,
# fake_url,
# fake_container,
# full_listing=True,
# )
#
# # Check expected values
# sess.get.assert_called_with(
# fake_url + '/' + fake_container,
# params={
# 'format': 'json',
# 'marker': 'is-name',
# }
# )
# self.assertEqual(resp, data)
def test_object_show(self):
headers = {
'content-type': 'text/alpha',
'content-length': '577',
'last-modified': '20130101',
'etag': 'qaz',
'x-container-meta-owner': FAKE_ACCOUNT,
'x-object-meta-wife': 'Wilma',
'x-object-meta-Husband': 'fred',
'x-tra-header': 'yabba-dabba-do',
}
resp = {
'account': FAKE_ACCOUNT,
'container': 'qaz',
'object': FAKE_OBJECT,
'content-type': 'text/alpha',
'content-length': '577',
'last-modified': '20130101',
'etag': 'qaz',
'properties': {'wife': 'Wilma',
'Husband': 'fred'},
}
self.requests_mock.register_uri(
'HEAD',
FAKE_URL + '/qaz/' + FAKE_OBJECT,
headers=headers,
status_code=204,
)
ret = self.api.object_show(
container='qaz',
object=FAKE_OBJECT,
)
self.assertEqual(resp, ret)