dcae1d7643
Also unblocks Jenkins. Change-Id: I76e401e7154714448911c77e08e7c9d18d8734e9
340 lines
14 KiB
Python
340 lines
14 KiB
Python
# vim: tabstop=4 shiftwidth=4 softtabstop=4
|
|
|
|
# Copyright 2012 United States Government as represented by the
|
|
# Administrator of the National Aeronautics and Space Administration.
|
|
# All Rights Reserved.
|
|
#
|
|
# Copyright 2012 Nebula, Inc.
|
|
#
|
|
# Licensed under the Apache License, Version 2.0 (the "License"); you may
|
|
# not use this file except in compliance with the License. You may obtain
|
|
# a copy of the License at
|
|
#
|
|
# http://www.apache.org/licenses/LICENSE-2.0
|
|
#
|
|
# Unless required by applicable law or agreed to in writing, software
|
|
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
|
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
|
# License for the specific language governing permissions and limitations
|
|
# under the License.
|
|
|
|
import datetime
|
|
import os
|
|
|
|
import cloudfiles as swift_client
|
|
from django import http
|
|
from django import test as django_test
|
|
from django.conf import settings
|
|
from django.contrib.messages.storage import default_storage
|
|
from django.core.handlers import wsgi
|
|
from django.test.client import RequestFactory
|
|
from functools import wraps
|
|
from glanceclient.v1 import client as glance_client
|
|
from keystoneclient.v2_0 import client as keystone_client
|
|
from novaclient.v1_1 import client as nova_client
|
|
import httplib2
|
|
import mox
|
|
|
|
from horizon import api
|
|
from horizon import context_processors
|
|
from horizon import middleware
|
|
from horizon import users
|
|
from horizon.tests.test_data.utils import load_test_data
|
|
|
|
from .time import time
|
|
from .time import today
|
|
from .time import utcnow
|
|
|
|
|
|
# Makes output of failing mox tests much easier to read.
|
|
wsgi.WSGIRequest.__repr__ = lambda self: "<class 'django.http.HttpRequest'>"
|
|
|
|
|
|
def create_stubs(stubs_to_create={}):
|
|
if not isinstance(stubs_to_create, dict):
|
|
raise TypeError("create_stub must be passed a dict, but a %s was "
|
|
"given." % type(stubs_to_create).__name__)
|
|
|
|
def inner_stub_out(fn):
|
|
@wraps(fn)
|
|
def instance_stub_out(self):
|
|
for key in stubs_to_create:
|
|
if not (isinstance(stubs_to_create[key], tuple) or
|
|
isinstance(stubs_to_create[key], list)):
|
|
raise TypeError("The values of the create_stub "
|
|
"dict must be lists or tuples, but "
|
|
"is a %s."
|
|
% type(stubs_to_create[key]).__name__)
|
|
|
|
for value in stubs_to_create[key]:
|
|
self.mox.StubOutWithMock(key, value)
|
|
return fn(self)
|
|
return instance_stub_out
|
|
return inner_stub_out
|
|
|
|
|
|
class RequestFactoryWithMessages(RequestFactory):
|
|
def get(self, *args, **kwargs):
|
|
req = super(RequestFactoryWithMessages, self).get(*args, **kwargs)
|
|
req.session = []
|
|
req._messages = default_storage(req)
|
|
return req
|
|
|
|
def post(self, *args, **kwargs):
|
|
req = super(RequestFactoryWithMessages, self).post(*args, **kwargs)
|
|
req.session = []
|
|
req._messages = default_storage(req)
|
|
return req
|
|
|
|
|
|
class TestCase(django_test.TestCase):
|
|
"""
|
|
Specialized base test case class for Horizon which gives access to
|
|
numerous additional features:
|
|
|
|
* A full suite of test data through various attached objects and
|
|
managers (e.g. ``self.servers``, ``self.user``, etc.). See the
|
|
docs for :class:`~horizon.tests.test_data.utils.TestData` for more
|
|
information.
|
|
* The ``mox`` mocking framework via ``self.mox``.
|
|
* A set of request context data via ``self.context``.
|
|
* A ``RequestFactory`` class which supports Django's ``contrib.messages``
|
|
framework via ``self.factory``.
|
|
* A ready-to-go request object via ``self.request``.
|
|
* The ability to override specific time data controls for easier testing.
|
|
* Several handy additional assertion methods.
|
|
"""
|
|
def setUp(self):
|
|
load_test_data(self)
|
|
self.mox = mox.Mox()
|
|
self.factory = RequestFactoryWithMessages()
|
|
self.context = {'authorized_tenants': self.tenants.list()}
|
|
|
|
def fake_conn_request(*args, **kwargs):
|
|
raise Exception("An external URI request tried to escape through "
|
|
"an httplib2 client. Args: %s, kwargs: %s"
|
|
% (args, kwargs))
|
|
|
|
self._real_conn_request = httplib2.Http._conn_request
|
|
httplib2.Http._conn_request = fake_conn_request
|
|
|
|
self._real_horizon_context_processor = context_processors.horizon
|
|
context_processors.horizon = lambda request: self.context
|
|
|
|
self._real_get_user_from_request = users.get_user_from_request
|
|
tenants = self.context['authorized_tenants']
|
|
self.setActiveUser(id=self.user.id,
|
|
token=self.token.id,
|
|
username=self.user.name,
|
|
tenant_id=self.tenant.id,
|
|
service_catalog=self.service_catalog,
|
|
authorized_tenants=tenants)
|
|
self.request = http.HttpRequest()
|
|
self.request.session = self.client._session()
|
|
self.request.session['token'] = self.token.id
|
|
middleware.HorizonMiddleware().process_request(self.request)
|
|
os.environ["HORIZON_TEST_RUN"] = "True"
|
|
|
|
def tearDown(self):
|
|
self.mox.UnsetStubs()
|
|
httplib2.Http._conn_request = self._real_conn_request
|
|
context_processors.horizon = self._real_horizon_context_processor
|
|
users.get_user_from_request = self._real_get_user_from_request
|
|
self.mox.VerifyAll()
|
|
del os.environ["HORIZON_TEST_RUN"]
|
|
|
|
def setActiveUser(self, id=None, token=None, username=None, tenant_id=None,
|
|
service_catalog=None, tenant_name=None, roles=None,
|
|
authorized_tenants=None):
|
|
users.get_user_from_request = lambda x: \
|
|
users.User(id=id,
|
|
token=token,
|
|
user=username,
|
|
tenant_id=tenant_id,
|
|
service_catalog=service_catalog,
|
|
roles=roles,
|
|
authorized_tenants=authorized_tenants,
|
|
request=self.request)
|
|
|
|
def override_times(self):
|
|
""" Overrides the "current" time with immutable values. """
|
|
now = datetime.datetime.utcnow()
|
|
time.override_time = \
|
|
datetime.time(now.hour, now.minute, now.second)
|
|
today.override_time = datetime.date(now.year, now.month, now.day)
|
|
utcnow.override_time = now
|
|
return now
|
|
|
|
def reset_times(self):
|
|
""" Undoes the changes made by ``override_times``. """
|
|
time.override_time = None
|
|
today.override_time = None
|
|
utcnow.override_time = None
|
|
|
|
def assertRedirectsNoFollow(self, response, expected_url):
|
|
"""
|
|
Asserts that the given response issued a 302 redirect without
|
|
processing the view which is redirected to.
|
|
"""
|
|
assert (response.status_code / 100 == 3), \
|
|
"The response did not return a redirect."
|
|
self.assertEqual(response._headers.get('location', None),
|
|
('Location', settings.TESTSERVER + expected_url))
|
|
self.assertEqual(response.status_code, 302)
|
|
|
|
def assertNoMessages(self, response=None):
|
|
"""
|
|
Asserts that no messages have been attached by the ``contrib.messages``
|
|
framework.
|
|
"""
|
|
self.assertMessageCount(response, success=0, warn=0, info=0, error=0)
|
|
|
|
def assertMessageCount(self, response=None, **kwargs):
|
|
"""
|
|
Asserts that the specified number of messages have been attached
|
|
for various message types. Usage would look like
|
|
``self.assertMessageCount(success=1)``.
|
|
"""
|
|
temp_req = self.client.request(**{'wsgi.input': None})
|
|
temp_req.COOKIES = self.client.cookies
|
|
storage = default_storage(temp_req)
|
|
messages = []
|
|
|
|
if response is None:
|
|
# To gain early access to the messages we have to decode the
|
|
# cookie on the test client.
|
|
if 'messages' in self.client.cookies:
|
|
message_cookie = self.client.cookies['messages'].value
|
|
messages = storage._decode(message_cookie)
|
|
# Check for messages in the context
|
|
elif hasattr(response, "context") and "messages" in response.context:
|
|
messages = response.context["messages"]
|
|
# Check for messages attached to the request on a TemplateResponse
|
|
elif hasattr(response, "_request") and hasattr(response._request,
|
|
"_messages"):
|
|
messages = response._request._messages._queued_messages
|
|
|
|
# If we don't have messages and we don't expect messages, we're done.
|
|
if not any(kwargs.values()) and not messages:
|
|
return
|
|
|
|
# If we expected messages and have none, that's a problem.
|
|
if any(kwargs.values()) and not messages:
|
|
error_msg = "Messages were expected, but none were set."
|
|
assert 0 == sum(kwargs.values()), error_msg
|
|
|
|
# Otherwise, make sure we got the expected messages.
|
|
for msg_type, count in kwargs.items():
|
|
msgs = [m.message for m in messages if msg_type in m.tags]
|
|
assert len(msgs) == count, \
|
|
"%s messages not as expected: %s" % (msg_type.title(),
|
|
", ".join(msgs))
|
|
|
|
def assertNoFormErrors(self, response, context_name="form"):
|
|
"""
|
|
Asserts that the response either does not contain a form in it's
|
|
context, or that if it does, that form has no errors.
|
|
"""
|
|
context = getattr(response, "context", {})
|
|
if not context or context_name not in context:
|
|
return True
|
|
errors = response.context[context_name]._errors
|
|
assert len(errors) == 0, \
|
|
"Unexpected errors were found on the form: %s" % errors
|
|
|
|
def assertFormErrors(self, response, count=0, message=None,
|
|
context_name="form"):
|
|
"""
|
|
Asserts that the response does contain a form in it's
|
|
context, and that form has errors, if count were given,
|
|
it must match the exact numbers of errors
|
|
"""
|
|
context = getattr(response, "context", {})
|
|
assert (context and context_name in context), \
|
|
"The response did not contain a form."
|
|
errors = response.context[context_name]._errors
|
|
if count:
|
|
assert len(errors) == count, \
|
|
"%d errors were found on the form, %d expected" % \
|
|
(len(errors), count)
|
|
if message and message not in unicode(errors):
|
|
self.fail("Expected message not found, instead found: %s"
|
|
% ["%s: %s" % (key, [e for e in field_errors]) for
|
|
(key, field_errors) in errors.items()])
|
|
else:
|
|
assert len(errors) > 0, "No errors were found on the form"
|
|
|
|
|
|
class BaseAdminViewTests(TestCase):
|
|
"""
|
|
A ``TestCase`` subclass which sets an active user with the "admin" role
|
|
for testing admin-only views and functionality.
|
|
"""
|
|
def setActiveUser(self, *args, **kwargs):
|
|
if "roles" not in kwargs:
|
|
kwargs['roles'] = [self.roles.admin._info]
|
|
super(BaseAdminViewTests, self).setActiveUser(*args, **kwargs)
|
|
|
|
|
|
class APITestCase(TestCase):
|
|
"""
|
|
The ``APITestCase`` class is for use with tests which deal with the
|
|
underlying clients rather than stubbing out the horizon.api.* methods.
|
|
"""
|
|
def setUp(self):
|
|
super(APITestCase, self).setUp()
|
|
|
|
def fake_keystoneclient(request, username=None, password=None,
|
|
tenant_id=None, token_id=None, endpoint=None,
|
|
admin=False):
|
|
"""
|
|
Wrapper function which returns the stub keystoneclient. Only
|
|
necessary because the function takes too many arguments to
|
|
conveniently be a lambda.
|
|
"""
|
|
return self.stub_keystoneclient()
|
|
|
|
# Store the original clients
|
|
self._original_glanceclient = api.glance.glanceclient
|
|
self._original_keystoneclient = api.keystone.keystoneclient
|
|
self._original_novaclient = api.nova.novaclient
|
|
|
|
# Replace the clients with our stubs.
|
|
api.glance.glanceclient = lambda request: self.stub_glanceclient()
|
|
api.keystone.keystoneclient = fake_keystoneclient
|
|
api.nova.novaclient = lambda request: self.stub_novaclient()
|
|
|
|
def tearDown(self):
|
|
super(APITestCase, self).tearDown()
|
|
api.glance.glanceclient = self._original_glanceclient
|
|
api.nova.novaclient = self._original_novaclient
|
|
api.keystone.keystoneclient = self._original_keystoneclient
|
|
|
|
def stub_novaclient(self):
|
|
if not hasattr(self, "novaclient"):
|
|
self.mox.StubOutWithMock(nova_client, 'Client')
|
|
self.novaclient = self.mox.CreateMock(nova_client.Client)
|
|
return self.novaclient
|
|
|
|
def stub_keystoneclient(self):
|
|
if not hasattr(self, "keystoneclient"):
|
|
self.mox.StubOutWithMock(keystone_client, 'Client')
|
|
self.keystoneclient = self.mox.CreateMock(keystone_client.Client)
|
|
return self.keystoneclient
|
|
|
|
def stub_glanceclient(self):
|
|
if not hasattr(self, "glanceclient"):
|
|
self.mox.StubOutWithMock(glance_client, 'Client')
|
|
self.glanceclient = self.mox.CreateMock(glance_client.Client)
|
|
return self.glanceclient
|
|
|
|
def stub_swiftclient(self, expected_calls=1):
|
|
if not hasattr(self, "swiftclient"):
|
|
self.mox.StubOutWithMock(swift_client, 'Connection')
|
|
self.swiftclient = self.mox.CreateMock(swift_client.Connection)
|
|
while expected_calls:
|
|
swift_client.Connection(auth=mox.IgnoreArg())\
|
|
.AndReturn(self.swiftclient)
|
|
expected_calls -= 1
|
|
return self.swiftclient
|