# vim: tabstop=4 shiftwidth=4 softtabstop=4 # Copyright 2012 United States Government as represented by the # Administrator of the National Aeronautics and Space Administration. # All Rights Reserved. # # Copyright 2012 Nebula, Inc. # # Licensed under the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. You may obtain # a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, WITHOUT # WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the # License for the specific language governing permissions and limitations # under the License. import datetime import os import cloudfiles as swift_client from django import http from django import test as django_test from django.conf import settings from django.contrib.messages.storage import default_storage from django.core.handlers import wsgi from django.test.client import RequestFactory from functools import wraps from glanceclient.v1 import client as glance_client from keystoneclient.v2_0 import client as keystone_client from novaclient.v1_1 import client as nova_client import httplib2 import mox from horizon import api from horizon import context_processors from horizon import middleware from horizon import users from horizon.tests.test_data.utils import load_test_data from .time import time from .time import today from .time import utcnow # Makes output of failing mox tests much easier to read. wsgi.WSGIRequest.__repr__ = lambda self: "" def create_stubs(stubs_to_create={}): if not isinstance(stubs_to_create, dict): raise TypeError("create_stub must be passed a dict, but a %s was " "given." % type(stubs_to_create).__name__) def inner_stub_out(fn): @wraps(fn) def instance_stub_out(self): for key in stubs_to_create: if not (isinstance(stubs_to_create[key], tuple) or isinstance(stubs_to_create[key], list)): raise TypeError("The values of the create_stub " "dict must be lists or tuples, but " "is a %s." % type(stubs_to_create[key]).__name__) for value in stubs_to_create[key]: self.mox.StubOutWithMock(key, value) return fn(self) return instance_stub_out return inner_stub_out class RequestFactoryWithMessages(RequestFactory): def get(self, *args, **kwargs): req = super(RequestFactoryWithMessages, self).get(*args, **kwargs) req.session = [] req._messages = default_storage(req) return req def post(self, *args, **kwargs): req = super(RequestFactoryWithMessages, self).post(*args, **kwargs) req.session = [] req._messages = default_storage(req) return req class TestCase(django_test.TestCase): """ Specialized base test case class for Horizon which gives access to numerous additional features: * A full suite of test data through various attached objects and managers (e.g. ``self.servers``, ``self.user``, etc.). See the docs for :class:`~horizon.tests.test_data.utils.TestData` for more information. * The ``mox`` mocking framework via ``self.mox``. * A set of request context data via ``self.context``. * A ``RequestFactory`` class which supports Django's ``contrib.messages`` framework via ``self.factory``. * A ready-to-go request object via ``self.request``. * The ability to override specific time data controls for easier testing. * Several handy additional assertion methods. """ def setUp(self): load_test_data(self) self.mox = mox.Mox() self.factory = RequestFactoryWithMessages() self.context = {'authorized_tenants': self.tenants.list()} def fake_conn_request(*args, **kwargs): raise Exception("An external URI request tried to escape through " "an httplib2 client. Args: %s, kwargs: %s" % (args, kwargs)) self._real_conn_request = httplib2.Http._conn_request httplib2.Http._conn_request = fake_conn_request self._real_horizon_context_processor = context_processors.horizon context_processors.horizon = lambda request: self.context self._real_get_user_from_request = users.get_user_from_request tenants = self.context['authorized_tenants'] self.setActiveUser(id=self.user.id, token=self.token.id, username=self.user.name, tenant_id=self.tenant.id, service_catalog=self.service_catalog, authorized_tenants=tenants) self.request = http.HttpRequest() self.request.session = self.client._session() self.request.session['token'] = self.token.id middleware.HorizonMiddleware().process_request(self.request) os.environ["HORIZON_TEST_RUN"] = "True" def tearDown(self): self.mox.UnsetStubs() httplib2.Http._conn_request = self._real_conn_request context_processors.horizon = self._real_horizon_context_processor users.get_user_from_request = self._real_get_user_from_request self.mox.VerifyAll() del os.environ["HORIZON_TEST_RUN"] def setActiveUser(self, id=None, token=None, username=None, tenant_id=None, service_catalog=None, tenant_name=None, roles=None, authorized_tenants=None): users.get_user_from_request = lambda x: \ users.User(id=id, token=token, user=username, tenant_id=tenant_id, service_catalog=service_catalog, roles=roles, authorized_tenants=authorized_tenants, request=self.request) def override_times(self): """ Overrides the "current" time with immutable values. """ now = datetime.datetime.utcnow() time.override_time = \ datetime.time(now.hour, now.minute, now.second) today.override_time = datetime.date(now.year, now.month, now.day) utcnow.override_time = now return now def reset_times(self): """ Undoes the changes made by ``override_times``. """ time.override_time = None today.override_time = None utcnow.override_time = None def assertRedirectsNoFollow(self, response, expected_url): """ Asserts that the given response issued a 302 redirect without processing the view which is redirected to. """ assert (response.status_code / 100 == 3), \ "The response did not return a redirect." self.assertEqual(response._headers.get('location', None), ('Location', settings.TESTSERVER + expected_url)) self.assertEqual(response.status_code, 302) def assertNoMessages(self, response=None): """ Asserts that no messages have been attached by the ``contrib.messages`` framework. """ self.assertMessageCount(response, success=0, warn=0, info=0, error=0) def assertMessageCount(self, response=None, **kwargs): """ Asserts that the specified number of messages have been attached for various message types. Usage would look like ``self.assertMessageCount(success=1)``. """ temp_req = self.client.request(**{'wsgi.input': None}) temp_req.COOKIES = self.client.cookies storage = default_storage(temp_req) messages = [] if response is None: # To gain early access to the messages we have to decode the # cookie on the test client. if 'messages' in self.client.cookies: message_cookie = self.client.cookies['messages'].value messages = storage._decode(message_cookie) # Check for messages in the context elif hasattr(response, "context") and "messages" in response.context: messages = response.context["messages"] # Check for messages attached to the request on a TemplateResponse elif hasattr(response, "_request") and hasattr(response._request, "_messages"): messages = response._request._messages._queued_messages # If we don't have messages and we don't expect messages, we're done. if not any(kwargs.values()) and not messages: return # If we expected messages and have none, that's a problem. if any(kwargs.values()) and not messages: error_msg = "Messages were expected, but none were set." assert 0 == sum(kwargs.values()), error_msg # Otherwise, make sure we got the expected messages. for msg_type, count in kwargs.items(): msgs = [m.message for m in messages if msg_type in m.tags] assert len(msgs) == count, \ "%s messages not as expected: %s" % (msg_type.title(), ", ".join(msgs)) def assertNoFormErrors(self, response, context_name="form"): """ Asserts that the response either does not contain a form in it's context, or that if it does, that form has no errors. """ context = getattr(response, "context", {}) if not context or context_name not in context: return True errors = response.context[context_name]._errors assert len(errors) == 0, \ "Unexpected errors were found on the form: %s" % errors def assertFormErrors(self, response, count=0, message=None, context_name="form"): """ Asserts that the response does contain a form in it's context, and that form has errors, if count were given, it must match the exact numbers of errors """ context = getattr(response, "context", {}) assert (context and context_name in context), \ "The response did not contain a form." errors = response.context[context_name]._errors if count: assert len(errors) == count, \ "%d errors were found on the form, %d expected" % \ (len(errors), count) if message and message not in unicode(errors): self.fail("Expected message not found, instead found: %s" % ["%s: %s" % (key, [e for e in field_errors]) for (key, field_errors) in errors.items()]) else: assert len(errors) > 0, "No errors were found on the form" class BaseAdminViewTests(TestCase): """ A ``TestCase`` subclass which sets an active user with the "admin" role for testing admin-only views and functionality. """ def setActiveUser(self, *args, **kwargs): if "roles" not in kwargs: kwargs['roles'] = [self.roles.admin._info] super(BaseAdminViewTests, self).setActiveUser(*args, **kwargs) class APITestCase(TestCase): """ The ``APITestCase`` class is for use with tests which deal with the underlying clients rather than stubbing out the horizon.api.* methods. """ def setUp(self): super(APITestCase, self).setUp() def fake_keystoneclient(request, username=None, password=None, tenant_id=None, token_id=None, endpoint=None, admin=False): """ Wrapper function which returns the stub keystoneclient. Only necessary because the function takes too many arguments to conveniently be a lambda. """ return self.stub_keystoneclient() # Store the original clients self._original_glanceclient = api.glance.glanceclient self._original_keystoneclient = api.keystone.keystoneclient self._original_novaclient = api.nova.novaclient # Replace the clients with our stubs. api.glance.glanceclient = lambda request: self.stub_glanceclient() api.keystone.keystoneclient = fake_keystoneclient api.nova.novaclient = lambda request: self.stub_novaclient() def tearDown(self): super(APITestCase, self).tearDown() api.glance.glanceclient = self._original_glanceclient api.nova.novaclient = self._original_novaclient api.keystone.keystoneclient = self._original_keystoneclient def stub_novaclient(self): if not hasattr(self, "novaclient"): self.mox.StubOutWithMock(nova_client, 'Client') self.novaclient = self.mox.CreateMock(nova_client.Client) return self.novaclient def stub_keystoneclient(self): if not hasattr(self, "keystoneclient"): self.mox.StubOutWithMock(keystone_client, 'Client') self.keystoneclient = self.mox.CreateMock(keystone_client.Client) return self.keystoneclient def stub_glanceclient(self): if not hasattr(self, "glanceclient"): self.mox.StubOutWithMock(glance_client, 'Client') self.glanceclient = self.mox.CreateMock(glance_client.Client) return self.glanceclient def stub_swiftclient(self, expected_calls=1): if not hasattr(self, "swiftclient"): self.mox.StubOutWithMock(swift_client, 'Connection') self.swiftclient = self.mox.CreateMock(swift_client.Connection) while expected_calls: swift_client.Connection(auth=mox.IgnoreArg())\ .AndReturn(self.swiftclient) expected_calls -= 1 return self.swiftclient