Fix base test class for functional api testing

To support in-tree functional api testing, the test class common
across all types of testing must not import modules that register
configuration options that Tempest also registers, because duplicate
option definitions raise runtime errors.  A previous change
(I44251db399cd73390a9d1931a7f253662002ba10) moved conflicting imports
to a separate module, but subsequent additions have introduced new
conflicting imports.

This change creates a new test class that is safe to use for all types
of testing (neutron.tests.sub_base.SubBaseTestCase), ensures that the
existing base test class (neutron.tests.base.BaseTestCase) extends it,
and documents their respective responsibilities.  This should make it
less likely that the functional api job will be broken by changes to
the base test class.
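
As an illustration (not part of this change; the option name and
defaults are hypothetical), this is the kind of failure that duplicate
registration triggers in oslo.config:

    from oslo.config import cfg

    CONF = cfg.CONF
    # Imagine Neutron registering an option first ...
    CONF.register_opt(cfg.StrOpt('lock_path', default='/neutron/lock'))
    # ... and an imported Tempest module registering a different
    # definition under the same name.
    CONF.register_opt(cfg.StrOpt('lock_path', default='/tempest/lock'))
    # Raises DuplicateOptError: duplicate option: lock_path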

Implements: bp retargetable-functional-testing

Change-Id: Ifa270536481fcb19c476c9c62d89e6c5cae36ca1
Maru Newby 2014-11-08 17:08:33 +00:00
parent 7479335bda
commit 7a9347668b
2 changed files with 152 additions and 96 deletions

@@ -13,32 +13,34 @@
# License for the specific language governing permissions and limitations
# under the License.
"""Base Test Case for all Unit Tests"""
"""Base test case for tests that do not rely on Tempest.
To change behavior that is common to all tests, please target
the neutron.tests.sub_base module instead.
If a test needs to import a dependency like Tempest, see
neutron.tests.sub_base for a base test class that can be used without
errors due to duplicate configuration definitions.
"""
import contextlib
import logging as std_logging
import os
import os.path
import sys
import traceback
import eventlet.timeout
import fixtures
import mock
from oslo.config import cfg
from oslo.messaging import conffixture as messaging_conffixture
from oslo.utils import strutils
import testtools
from neutron.common import config
from neutron.common import rpc as n_rpc
from neutron.tests import fake_notifier
from neutron.tests import post_mortem_debug
from neutron.tests import sub_base
CONF = cfg.CONF
CONF.import_opt('state_path', 'neutron.common.config')
LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
LOG_FORMAT = sub_base.LOG_FORMAT
ROOTDIR = os.path.dirname(__file__)
ETCDIR = os.path.join(ROOTDIR, 'etc')
@@ -56,12 +58,10 @@ def fake_consume_in_threads(self):
return []
def bool_from_env(key, strict=False, default=False):
value = os.environ.get(key)
return strutils.bool_from_string(value, strict=strict, default=default)
bool_from_env = sub_base.bool_from_env
class BaseTestCase(testtools.TestCase):
class BaseTestCase(sub_base.SubBaseTestCase):
@staticmethod
def config_parse(conf=None, args=None):
@@ -77,27 +77,8 @@ class BaseTestCase(testtools.TestCase):
def setUp(self):
super(BaseTestCase, self).setUp()
# Configure this first to ensure pm debugging support for setUp()
debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
if debugger:
self.addOnException(post_mortem_debug.get_exception_handler(
debugger))
if bool_from_env('OS_DEBUG'):
_level = std_logging.DEBUG
else:
_level = std_logging.INFO
capture_logs = bool_from_env('OS_LOG_CAPTURE')
if not capture_logs:
std_logging.basicConfig(format=LOG_FORMAT, level=_level)
self.log_fixture = self.useFixture(
fixtures.FakeLogger(
format=LOG_FORMAT,
level=_level,
nuke_handlers=capture_logs,
))
# suppress all but errors here
capture_logs = bool_from_env('OS_LOG_CAPTURE')
self.useFixture(
fixtures.FakeLogger(
name='neutron.api.extensions',
@@ -106,28 +87,11 @@ class BaseTestCase(testtools.TestCase):
nuke_handlers=capture_logs,
))
test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
if test_timeout == -1:
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
# If someone does use tempfile directly, ensure that it's cleaned up
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
self.temp_dir = self.useFixture(fixtures.TempDir()).path
cfg.CONF.set_override('state_path', self.temp_dir)
self.addCleanup(mock.patch.stopall)
self.addCleanup(CONF.reset)
if bool_from_env('OS_STDOUT_CAPTURE'):
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if bool_from_env('OS_STDERR_CAPTURE'):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.useFixture(fixtures.MonkeyPatch(
'neutron.common.exceptions.NeutronException.use_fatal_exceptions',
fake_use_fatal_exceptions))
@@ -138,7 +102,6 @@ class BaseTestCase(testtools.TestCase):
raise self.skipException('XML Testing Skipped in Py26')
self.setup_config()
self.addOnException(self.check_for_systemexit)
def setup_rpc_mocks(self):
# don't actually start RPC listeners when testing
@@ -165,11 +128,6 @@ class BaseTestCase(testtools.TestCase):
self.addCleanup(n_rpc.cleanup)
n_rpc.init(CONF)
def check_for_systemexit(self, exc_info):
if isinstance(exc_info[1], SystemExit):
self.fail("A SystemExit was raised during the test. %s"
% traceback.format_exception(*exc_info))
def setup_config(self):
"""Tests that need a non-default config can override this method."""
self.config_parse()
@@ -189,43 +147,3 @@ class BaseTestCase(testtools.TestCase):
group = kw.pop('group', None)
for k, v in kw.iteritems():
CONF.set_override(k, v, group)
@contextlib.contextmanager
def assert_max_execution_time(self, max_execution_time=5):
with eventlet.timeout.Timeout(max_execution_time, False):
yield
return
self.fail('Execution of this test timed out')
def assertOrderedEqual(self, expected, actual):
expect_val = self.sort_dict_lists(expected)
actual_val = self.sort_dict_lists(actual)
self.assertEqual(expect_val, actual_val)
def sort_dict_lists(self, dic):
for key, value in dic.iteritems():
if isinstance(value, list):
dic[key] = sorted(value)
elif isinstance(value, dict):
dic[key] = self.sort_dict_lists(value)
return dic
def assertDictSupersetOf(self, expected_subset, actual_superset):
"""Checks that actual dict contains the expected dict.
After checking that the arguments are of the right type, this checks
that each item in expected_subset is in, and matches, what is in
actual_superset. Separate tests are done, so that detailed info can
be reported upon failure.
"""
if not isinstance(expected_subset, dict):
self.fail("expected_subset (%s) is not an instance of dict" %
type(expected_subset))
if not isinstance(actual_superset, dict):
self.fail("actual_superset (%s) is not an instance of dict" %
type(actual_superset))
for k, v in expected_subset.items():
self.assertIn(k, actual_superset)
self.assertEqual(v, actual_superset[k],
"Key %(key)s expected: %(exp)r, actual %(act)r" %
{'key': k, 'exp': v, 'act': actual_superset[k]})
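
For reference, a hedged usage sketch of the helpers being moved out of
this module (the test method is hypothetical):

    def test_helpers(self):
        # lists are sorted before comparison
        self.assertOrderedEqual({'nets': ['b', 'a']}, {'nets': ['a', 'b']})
        # the actual dict may contain keys beyond the expected subset
        self.assertDictSupersetOf({'name': 'net1'},
                                  {'name': 'net1', 'status': 'ACTIVE'})
        # fails the test if the body takes longer than two seconds
        with self.assert_max_execution_time(max_execution_time=2):
            pass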

neutron/tests/sub_base.py (new file, 138 lines)

@@ -0,0 +1,138 @@
# Copyright 2014 OpenStack Foundation
# All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License"); you may
# not use this file except in compliance with the License. You may obtain
# a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations
# under the License.
"""Base test case for all tests.
To change behavior only for tests that do not rely on Tempest, please
target the neutron.tests.base module instead.
There should be no non-test Neutron imports in this module to ensure
that the functional API tests can import Tempest without triggering
errors due to duplicate configuration definitions.
"""
import contextlib
import logging as std_logging
import os
import os.path
import traceback
import eventlet.timeout
import fixtures
import mock
from oslo.utils import strutils
import testtools
from neutron.tests import post_mortem_debug
LOG_FORMAT = "%(asctime)s %(levelname)8s [%(name)s] %(message)s"
def bool_from_env(key, strict=False, default=False):
value = os.environ.get(key)
return strutils.bool_from_string(value, strict=strict, default=default)
class SubBaseTestCase(testtools.TestCase):
def setUp(self):
super(SubBaseTestCase, self).setUp()
# Configure this first to ensure pm debugging support for setUp()
debugger = os.environ.get('OS_POST_MORTEM_DEBUGGER')
if debugger:
self.addOnException(post_mortem_debug.get_exception_handler(
debugger))
if bool_from_env('OS_DEBUG'):
_level = std_logging.DEBUG
else:
_level = std_logging.INFO
capture_logs = bool_from_env('OS_LOG_CAPTURE')
if not capture_logs:
std_logging.basicConfig(format=LOG_FORMAT, level=_level)
self.log_fixture = self.useFixture(
fixtures.FakeLogger(
format=LOG_FORMAT,
level=_level,
nuke_handlers=capture_logs,
))
test_timeout = int(os.environ.get('OS_TEST_TIMEOUT', 0))
if test_timeout == -1:
test_timeout = 0
if test_timeout > 0:
self.useFixture(fixtures.Timeout(test_timeout, gentle=True))
# If someone does use tempfile directly, ensure that it's cleaned up
self.useFixture(fixtures.NestedTempfile())
self.useFixture(fixtures.TempHomeDir())
self.addCleanup(mock.patch.stopall)
if bool_from_env('OS_STDOUT_CAPTURE'):
stdout = self.useFixture(fixtures.StringStream('stdout')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stdout', stdout))
if bool_from_env('OS_STDERR_CAPTURE'):
stderr = self.useFixture(fixtures.StringStream('stderr')).stream
self.useFixture(fixtures.MonkeyPatch('sys.stderr', stderr))
self.addOnException(self.check_for_systemexit)
def check_for_systemexit(self, exc_info):
if isinstance(exc_info[1], SystemExit):
self.fail("A SystemExit was raised during the test. %s"
% traceback.format_exception(*exc_info))
@contextlib.contextmanager
def assert_max_execution_time(self, max_execution_time=5):
with eventlet.timeout.Timeout(max_execution_time, False):
yield
return
self.fail('Execution of this test timed out')
def assertOrderedEqual(self, expected, actual):
expect_val = self.sort_dict_lists(expected)
actual_val = self.sort_dict_lists(actual)
self.assertEqual(expect_val, actual_val)
def sort_dict_lists(self, dic):
for key, value in dic.iteritems():
if isinstance(value, list):
dic[key] = sorted(value)
elif isinstance(value, dict):
dic[key] = self.sort_dict_lists(value)
return dic
def assertDictSupersetOf(self, expected_subset, actual_superset):
"""Checks that actual dict contains the expected dict.
After checking that the arguments are of the right type, this checks
that each item in expected_subset is in, and matches, what is in
actual_superset. Separate tests are done, so that detailed info can
be reported upon failure.
"""
if not isinstance(expected_subset, dict):
self.fail("expected_subset (%s) is not an instance of dict" %
type(expected_subset))
if not isinstance(actual_superset, dict):
self.fail("actual_superset (%s) is not an instance of dict" %
type(actual_superset))
for k, v in expected_subset.items():
self.assertIn(k, actual_superset)
self.assertEqual(v, actual_superset[k],
"Key %(key)s expected: %(exp)r, actual %(act)r" %
{'key': k, 'exp': v, 'act': actual_superset[k]})
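
To show the intended payoff, a hedged sketch (module, class, and test
names are hypothetical) of the kind of in-tree functional API test this
change enables; because sub_base imports no config-defining Neutron
modules, Tempest can be imported alongside it without duplicate option
errors:

    import tempest.config  # assumption: safe once sub_base is conflict-free

    from neutron.tests import sub_base


    class NetworksApiTestCase(sub_base.SubBaseTestCase):

        def test_list_networks(self):
            # exercise the API via a Tempest-backed client here
            pass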