Reverts tag reporting changes

Change-Id: I5211c92e4704a868e712052c70582655aa13755a
Jose Idar 2014-05-01 13:14:47 -05:00
parent 9375aaea5c
commit 86eb2a3ead
8 changed files with 27 additions and 306 deletions

View File

@@ -130,7 +130,7 @@ def setup_new_cchandler(
return log_handler
def log_results(result, test_id=None, verbosity=0):
def log_results(result):
"""Replicates the printing functionality of unittest's runner.run() but
log's instead of prints
"""
@@ -179,14 +179,6 @@ def log_results(result, test_id=None, verbosity=0):
os.getenv("CAFE_TEST_LOG_PATH"))
print '-' * 150
# Print the tag to test mapping if available and verbosity is > 2
if verbosity > 2 and hasattr(result, 'mapping'):
if test_id is not None:
result.mapping.write_to_stream(
"Test Suite ID: {0}\n".format(test_id))
result.mapping.print_tag_to_test_mapping()
result.mapping.print_attribute_to_test_mapping()
def log_errors(label, result, errors):
border1 = '=' * 45
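
For context, the reverted signature drops the test_id and verbosity parameters, so a caller now passes only the finished result object. A minimal post-revert sketch (the test case here is illustrative, not part of this commit):

import unittest

from cafe.common.reporting.cclogging import log_results


class _ExampleTest(unittest.TestCase):  # illustrative stand-in suite
    def test_ok(self):
        self.assertTrue(True)


suite = unittest.TestLoader().loadTestsFromTestCase(_ExampleTest)
result = unittest.TextTestRunner(verbosity=2).run(suite)

# Post-revert signature: only the result object is passed; the
# test_id and verbosity parameters are gone.
log_results(result)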

View File

@@ -19,9 +19,6 @@ from cafe.common.reporting.xml_report import XMLReport
class Reporter:
JSON_REPORT = 'json'
XML_REPORT = 'xml'
def __init__(self, result_parser, all_results):
self.result_parser = result_parser
self.all_results = all_results
@@ -30,11 +27,11 @@ class Reporter:
""" Creates a report object based on what type is given and generates
the report in the specified directory.
"""
if result_type == Reporter.JSON_REPORT:
if result_type == 'json':
report = JSONReport()
elif result_type == Reporter.XML_REPORT:
elif result_type == 'xml':
report = XMLReport()
report.generate_report(result_parser=self.result_parser,
all_results=self.all_results,
path=path)
report.generate_report(
result_parser=self.result_parser, all_results=self.all_results,
path=path)
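
For reference, a sketch of driving Reporter after this revert, when plain 'json'/'xml' strings select the report type. The SummarizeResults construction mirrors the call visible later in this diff; the gather_results accessor and the output path are assumptions, not shown in this commit:

import time
import unittest

from cafe.common.reporting.reporter import Reporter
from cafe.drivers.unittest.parsers import SummarizeResults


class _SmokeTest(unittest.TestCase):  # illustrative stand-in suite
    def test_passes(self):
        self.assertTrue(True)


master_suite = unittest.TestLoader().loadTestsFromTestCase(_SmokeTest)
start_time = time.time()
result = unittest.TextTestRunner(verbosity=1).run(master_suite)
total_execution_time = time.time() - start_time

result_parser = SummarizeResults(
    vars(result), master_suite, total_execution_time)
all_results = result_parser.gather_results()  # assumed accessor name
reporter = Reporter(result_parser=result_parser, all_results=all_results)

# Post-revert: string literals rather than class constants.
reporter.generate_report(result_type='json', path='/tmp/test-results')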

View File

@@ -66,14 +66,6 @@ class XMLReport(BaseReport):
else:
testcase_tag.attrib['result'] = "PASSED"
comment = ""
if testcase.tags is not None:
comment += "Test Tags: {tags}".format(tags=testcase.tags)
if testcase.attributes is not None:
comment += " Attribute Tags: {attributes}".format(
attributes=testcase.attributes)
testcase_tag.attrib['comment'] = comment
result_path = path or os.getcwd()
if os.path.isdir(result_path):
result_path += "/results.xml"

View File

@@ -36,7 +36,7 @@ class DataDrivenFixtureError(Exception):
def tags(*tags, **attrs):
"""Adds tags and attributes to tests, which are interpreted by the
cafe-runner at run time and by result generator during reporting
cafe-runner at run time
"""
def decorator(func):
setattr(func, TAGS_DECORATOR_TAG_LIST_NAME, [])
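
The signature tags(*tags, **attrs) shown above takes free-form positional tags plus keyword attributes. A minimal usage sketch (test and attribute names are illustrative):

import unittest

from cafe.drivers.unittest.decorators import tags


class ExampleTests(unittest.TestCase):

    @tags('smoke', 'regression', execution='fast')
    def test_widget_create(self):
        # 'smoke' and 'regression' land in the decorator's tag list;
        # execution='fast' lands in its attribute dict.
        self.assertTrue(True)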

View File

@@ -94,25 +94,6 @@ class SummarizeResults(object):
return failure_obj_list
def update_tags(self, executed_tests):
for test in executed_tests:
if hasattr(self, 'mapping'):
test_tags = self.mapping.test_to_tag_mapping.get(
getattr(test, 'test_method_name'))
if test_tags is None or len(test_tags) == 0:
test_tags = []
setattr(test, 'tags', test_tags)
attribute_tags = self.mapping.test_to_attribute_mapping.get(
getattr(test, 'test_method_name'))
if attribute_tags is None or len(attribute_tags) == 0:
attribute_tags = []
setattr(test, 'attributes', attribute_tags)
return executed_tests
def summary_result(self):
summary_res = {'tests': str(self.testsRun),
'errors': str(len(self.errors)),
@@ -124,21 +105,19 @@ class SummarizeResults(object):
executed_tests = (self.get_passed_tests() + self.parse_failures() +
self.get_errored_tests() + self.get_skipped_tests())
return self.update_tags(executed_tests)
return executed_tests
class Result(object):
def __init__(
self, test_class_name, test_method_name, failure_trace=None,
skipped_msg=None, error_trace=None):
def __init__(self, test_class_name, test_method_name, failure_trace=None,
skipped_msg=None, error_trace=None, tags=None,
attributes=None):
self.test_class_name = test_class_name
self.test_method_name = test_method_name
self.failure_trace = failure_trace
self.skipped_msg = skipped_msg
self.error_trace = error_trace
self.tags = tags
self.attributes = attributes
def __repr__(self):
values = []
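
With the revert, Result no longer accepts tags or attributes. A short sketch of constructing results with the restored signature (class and trace values are illustrative):

from cafe.drivers.unittest.parsers import Result

# Restored signature: no tags/attributes keyword arguments.
passed = Result('ExampleTests', 'test_widget_create')
failed = Result(
    'ExampleTests', 'test_widget_delete',
    failure_trace='AssertionError: widget still present')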

View File

@@ -1,209 +0,0 @@
"""
Copyright 2014 Rackspace
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
"""
from unittest import TextTestResult
from cafe.drivers.unittest.decorators import TAGS_DECORATOR_TAG_LIST_NAME
from cafe.drivers.unittest.decorators import TAGS_DECORATOR_ATTR_DICT_NAME
class TaggedTextTestResult(TextTestResult):
""" Extended TextTestResult object to include support for tagged methods"""
def __init__(self, stream, descriptions, verbosity):
super(TaggedTextTestResult, self).__init__(
stream, descriptions, verbosity)
self.mapping = TestCaseTagMapping(self)
def stopTest(self, test):
""" Override stopTest method to capture test object and extract tags"""
super(TaggedTextTestResult, self).stopTest(test)
test_method = getattr(test, test._testMethodName)
if hasattr(test_method, TAGS_DECORATOR_TAG_LIST_NAME):
self.mapping.update_mapping(test._testMethodName, getattr(
test_method, TAGS_DECORATOR_TAG_LIST_NAME))
if hasattr(test_method, TAGS_DECORATOR_ATTR_DICT_NAME):
self.mapping.update_attribute_mapping(
test._testMethodName, getattr(test_method,
TAGS_DECORATOR_ATTR_DICT_NAME))
class TestCaseTagMapping(object):
""" Test case mapping class which keeps track of test-to-tag and
tag-to-test mapping
"""
def __init__(self, test_result):
self.test_ref = test_result
self.test_to_tag_mapping = dict()
self.tag_to_test_mapping = dict()
self.test_to_attribute_mapping = dict()
self.attribute_to_test_mapping = dict()
def update_mapping(self, test_name, tag_list):
""" Takes the test name and the list of associated tags and updates
the mapping
"""
if not self.test_to_tag_mapping.__contains__(test_name):
self.test_to_tag_mapping[test_name] = tag_list
for tag in tag_list:
if self.tag_to_test_mapping.__contains__(
tag) and not self.tag_to_test_mapping.get(
tag).__contains__(test_name):
self.tag_to_test_mapping[tag].append(test_name)
else:
self.tag_to_test_mapping[tag] = [test_name]
def update_attribute_mapping(self, test_name, attribute_list):
if not self.test_to_attribute_mapping.__contains__(test_name):
self.test_to_attribute_mapping[test_name] = attribute_list
for attribute, entries in attribute_list.items():
for entry in entries.split(","):
entry = entry.lstrip().rstrip()
attribute_tuple = (attribute, entry)
if self.attribute_to_test_mapping.__contains__(
attribute_tuple) and not \
self.attribute_to_test_mapping.get(
attribute_tuple).__contains__(test_name):
self.attribute_to_test_mapping[attribute_tuple].append(
test_name)
else:
self.attribute_to_test_mapping[
attribute_tuple] = [test_name]
def print_test_to_tag_mapping(self):
""" Prints the test-to-tag dict mapping to result stream """
if len(self.test_to_tag_mapping) == 0:
return
max_len = 0
self.test_ref.stream.writeln()
self.test_ref.stream.writeln("Tags and attributes associated to tests")
self.test_ref.stream.writeln(self.test_ref.separator1)
max_len = self.__get_max_entry_length(self.test_to_tag_mapping.keys())
for entry in self.test_to_tag_mapping.keys():
self.test_ref.stream.write("{entry}{spacer}: ".format(
entry=entry, spacer=(" " * (max_len - len(entry)))))
self.test_ref.stream.write(
str(self.test_to_tag_mapping.get(entry)))
if entry in self.test_to_attribute_mapping:
self.test_ref.stream.write(" Attributes: {attributes}".format(
attributes=str(self.test_to_attribute_mapping.get(entry))))
self.test_ref.stream.write("\n")
self.test_ref.stream.writeln(self.test_ref.separator1)
self.test_ref.stream.flush()
def print_tag_to_test_mapping(self):
""" Prints the tag-to-test dict mapping to result stream """
if len(self.tag_to_test_mapping) == 0:
return
max_len = 0
self.test_ref.stream.writeln("Tests associated to tags")
self.test_ref.stream.writeln(self.test_ref.separator1)
max_len = self.__get_max_entry_length(self.tag_to_test_mapping.keys())
for entry in self.tag_to_test_mapping.keys():
self.test_ref.stream.write("{entry}{spacer} : ".format(
entry=entry, spacer=(" " * (max_len - len(entry)))))
self.test_ref.stream.writeln(self.__generate_summary(
entry, self.tag_to_test_mapping))
self.test_ref.stream.writeln(
str(self.tag_to_test_mapping.get(entry)))
self.test_ref.stream.writeln("\n")
self.test_ref.stream.writeln(self.test_ref.separator1)
self.test_ref.stream.flush()
def print_attribute_to_test_mapping(self):
""" Prints the attribute-to-test dict mapping to result stream """
if len(self.attribute_to_test_mapping) == 0:
return
max_len = 0
self.test_ref.stream.writeln("Tests associated to attributes")
self.test_ref.stream.writeln(self.test_ref.separator1)
max_len = self.__get_max_entry_length(
self.attribute_to_test_mapping.keys())
for entry in self.attribute_to_test_mapping.keys():
self.test_ref.stream.write("{entry}{spacer} : ".format(
entry=entry, spacer=(" " * (max_len - len(str(entry))))))
self.test_ref.stream.writeln(self.__generate_summary(
entry, self.attribute_to_test_mapping))
self.test_ref.stream.writeln(
str(self.attribute_to_test_mapping.get(entry)))
self.test_ref.stream.writeln("\n")
self.test_ref.stream.writeln(self.test_ref.separator1)
self.test_ref.stream.flush()
def write_to_stream(self, data):
""" Writes to the stream object passed to the result object
"""
self.test_ref.stream.write(data)
self.test_ref.stream.flush()
@staticmethod
def __tuple_contains(test, test_ref_list):
if test_ref_list is None or len(test_ref_list) == 0:
return False
else:
for item in test_ref_list:
if vars(item[0]).get('_testMethodName') == test:
return True
return False
@staticmethod
def __get_max_entry_length(listing):
max_len = 0
for entry in listing:
if type(entry) is not str:
entry = str(entry)
if len(entry) > max_len:
max_len = len(entry)
return max_len
def __generate_summary(self, tag, listing):
""" Generates a run summary for a given tag """
pass_count = 0
fail_count = 0
skip_count = 0
error_count = 0
tests = listing.get(tag)
for test in tests:
if self.__tuple_contains(test, self.test_ref.failures):
fail_count += 1
continue
elif self.__tuple_contains(test, self.test_ref.errors):
error_count += 1
continue
elif self.__tuple_contains(test, self.test_ref.skipped):
skip_count += 1
continue
else:
pass_count += 1
total_count = pass_count + fail_count + skip_count + error_count
if pass_count == 0:
pass_rate = float(0)
else:
pass_rate = 100 * float(pass_count) / float(total_count)
return ("Pass: {0} Fail: {1} Error: {2} Skipped: {3} Total: {4} "
"Pass Rate: {5}%").format(pass_count, fail_count, error_count,
skip_count, total_count, pass_rate)
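
The deleted update_mapping above maintains a bidirectional index through explicit __contains__ calls, and silently resets a tag's test list when the same tag/test pair arrives twice. An equivalent, more idiomatic sketch of that bookkeeping (standalone, not part of the commit):

def update_mapping(test_to_tags, tag_to_tests, test_name, tag_list):
    """Record tags for a test and index each tag back to its tests."""
    test_to_tags.setdefault(test_name, tag_list)
    for tag in tag_list:
        tests = tag_to_tests.setdefault(tag, [])
        if test_name not in tests:
            tests.append(test_name)


test_to_tags, tag_to_tests = {}, {}
update_mapping(test_to_tags, tag_to_tests, 'test_login', ['smoke', 'auth'])
update_mapping(test_to_tags, tag_to_tests, 'test_logout', ['auth'])
assert tag_to_tests['auth'] == ['test_login', 'test_logout']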

View File

@@ -28,7 +28,6 @@ from traceback import extract_tb
import unittest
import uuid
from result import TaggedTextTestResult
from cafe.drivers.unittest.fixtures import BaseTestFixture
from cafe.common.reporting.cclogging import log_results
from cafe.drivers.unittest.parsers import SummarizeResults
@@ -117,10 +116,7 @@ class _WritelnDecorator(object):
class OpenCafeParallelTextTestRunner(unittest.TextTestRunner):
def __init__(self, stream=sys.stderr, descriptions=1, verbosity=1,
resultclass=None):
super(OpenCafeParallelTextTestRunner, self).__init__(
stream, descriptions, verbosity, resultclass=resultclass)
def __init__(self, stream=sys.stderr, descriptions=1, verbosity=1):
self.stream = _WritelnDecorator(stream)
self.descriptions = descriptions
self.verbosity = verbosity
@@ -847,13 +843,8 @@ class UnittestRunner(object):
print "=" * 150
@staticmethod
def execute_test(runner, test_id, test, results, verbosity):
def execute_test(runner, test_id, test, results):
result = runner.run(test)
# Inject tag mapping and log results to console
UnittestRunner._inject_tag_mapping(result)
log_results(result, test_id, verbosity=verbosity)
results.update({test_id: result})
@staticmethod
@@ -861,13 +852,11 @@ class UnittestRunner(object):
test_runner = None
# Use the parallel text runner so the console logs look correct
# Use custom test result object to keep track of tags
if parallel:
test_runner = OpenCafeParallelTextTestRunner(stream=sys.stdout,
verbosity=int(verbosity), resultclass=TaggedTextTestResult)
test_runner = OpenCafeParallelTextTestRunner(
verbosity=int(verbosity))
else:
test_runner = unittest.TextTestRunner(stream=sys.stdout,
verbosity=int(verbosity), resultclass=TaggedTextTestResult)
test_runner = unittest.TextTestRunner(verbosity=int(verbosity))
test_runner.failfast = fail_fast
return test_runner
@@ -895,16 +884,6 @@ class UnittestRunner(object):
return errors, failures, tests_run
@staticmethod
def _inject_tag_mapping(result):
"""Inject tag mapping into the result __dict__ object if available"""
if hasattr(result, 'mapping'):
mapping = result.mapping.test_to_tag_mapping
setattr(result, 'tags', mapping or [])
attributes = result.mapping.test_to_attribute_mapping
setattr(result, 'attributes', attributes or [])
def run(self):
"""
loops through all the packages, modules, and methods sent in from
@@ -943,20 +922,18 @@ class UnittestRunner(object):
exit_code = self.run_parallel(
parallel_test_list, test_runner,
result_type=self.cl_args.result,
results_path=self.cl_args.result_directory,
verbosity=self.cl_args.verbose)
results_path=self.cl_args.result_directory)
exit(exit_code)
else:
exit_code = self.run_serialized(
master_suite, test_runner, result_type=self.cl_args.result,
results_path=self.cl_args.result_directory,
verbosity=self.cl_args.verbose)
results_path=self.cl_args.result_directory)
exit(exit_code)
@staticmethod
def run_parallel(test_suites, test_runner, result_type=None,
results_path=None, verbosity=0):
def run_parallel(
self, test_suites, test_runner, result_type=None,
results_path=None):
exit_code = 0
proc = None
@@ -975,8 +952,8 @@ class UnittestRunner(object):
test_mapping[test_id] = test_suite
proc = Process(
target=UnittestRunner.execute_test,
args=(test_runner, test_id, test_suite, results, verbosity))
target=self.execute_test,
args=(test_runner, test_id, test_suite, results))
processes.append(proc)
proc.start()
@@ -985,8 +962,7 @@ class UnittestRunner(object):
finish = time.time()
errors, failures, _ = UnittestRunner.dump_results(
start, finish, results)
errors, failures, _ = self.dump_results(start, finish, results)
if result_type is not None:
all_results = []
@@ -1006,10 +982,9 @@ class UnittestRunner(object):
return exit_code
@staticmethod
def run_serialized(
master_suite, test_runner, result_type=None,
results_path=None, verbosity=0):
self, master_suite, test_runner, result_type=None,
results_path=None):
exit_code = 0
unittest.installHandler()
@@ -1017,9 +992,6 @@ class UnittestRunner(object):
result = test_runner.run(master_suite)
total_execution_time = time.time() - start_time
# Inject tag mapping
UnittestRunner._inject_tag_mapping(result)
if result_type is not None:
result_parser = SummarizeResults(
vars(result), master_suite, total_execution_time)
@@ -1029,7 +1001,7 @@ class UnittestRunner(object):
reporter.generate_report(
result_type=result_type, path=results_path)
log_results(result, verbosity=verbosity)
log_results(result)
if not result.wasSuccessful():
exit_code = 1
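
run_parallel above fans each suite out to a Process targeting execute_test, which stores into a shared results mapping. The diff does not show how results is created; a multiprocessing.Manager().dict() is the usual vehicle and is assumed in this standalone sketch. Full result objects hold streams and do not pickle cleanly, so the sketch stores a picklable summary instead:

import unittest
from multiprocessing import Manager, Process


class _NoopTest(unittest.TestCase):  # illustrative suite member
    def test_ok(self):
        pass


def execute_test(test_id, test, results):
    # Mirrors the restored execute_test flow; the runner is built here
    # rather than passed in, since runners hold unpicklable streams.
    result = unittest.TextTestRunner(verbosity=0).run(test)
    # Store a picklable summary rather than the raw result object.
    results.update({test_id: (result.testsRun, len(result.failures))})


if __name__ == '__main__':
    suites = {'suite-1': unittest.TestLoader().loadTestsFromTestCase(_NoopTest)}
    results = Manager().dict()  # proxy dict shared across processes
    processes = []
    for test_id, suite in suites.items():
        proc = Process(target=execute_test, args=(test_id, suite, results))
        processes.append(proc)
        proc.start()
    for proc in processes:
        proc.join()
    print(dict(results))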

View File

@@ -21,8 +21,6 @@ from uuid import uuid4
from cafe.common.reporting.reporter import Reporter
from cafe.drivers.unittest.parsers import SummarizeResults
from cafe.drivers.unittest.decorators import tags
from cafe.drivers.unittest.runner import UnittestRunner
from cafe.drivers.unittest.suite import OpenCafeUnittestTestSuite
def load_tests(*args, **kwargs):