"""
Outputting and reporting for virtuous test runs.
"""
from collections import defaultdict
from traceback import format_exception
import sys
import time
from pyrsistent import m, v
from pyrsistent.typing import PMap, PVector
import attrs
@attrs.frozen
class _DelayedMessage:
"""
A test status message that will be shown at the end of a run.
"""
width: int
status: str
subject: str
body: str = ""
def lines(self):
yield "=" * self.width
yield "\n"
yield self.status
yield "\n"
if self.body:
yield self.body
[docs]class Outputter:
"""
An outputter converts test results to renderable strings.
"""
_last_test_class = None
_last_test_module = None
_current_subtests_test = None
FAILED, PASSED = "FAILED", "PASSED"
ERROR, FAIL, OK, SKIPPED = "[ERROR]", "[FAIL]", "[OK]", "[SKIPPED]"
EXPECTED_FAILURE, UNEXPECTED_SUCCESS = "[XFAIL]", "[UNEXPECTED SUCCESS]"
_COLORS = [
("_error", "RED", ERROR),
("_fail", "RED", FAIL),
("_failed", "RED", FAILED),
("_ok", "GREEN", OK),
("_passed", "GREEN", PASSED),
("_skipped", "BLUE", SKIPPED),
("_unexpected_success", "YELLOW", UNEXPECTED_SUCCESS),
("_expected_failure", "BLUE", EXPECTED_FAILURE),
]
def __init__(self, colored=True, indent=" " * 2, line_width=120):
self.indent = indent
self.line_width = line_width
if colored:
from colorama import Fore, Style
for attribute, color, text in self._COLORS:
color = getattr(Fore, color)
message = f"{Style.BRIGHT}{color}{text}{Style.RESET_ALL}"
setattr(self, attribute, message)
else:
for attribute, _, text in self._COLORS:
setattr(self, attribute, text)
self._later = {
status: defaultdict(list)
for status in [
self.SKIPPED,
self.EXPECTED_FAILURE,
self.FAIL,
self.UNEXPECTED_SUCCESS,
self.ERROR,
]
}
def _show_later(self, **kwargs):
message = _DelayedMessage(width=self.line_width, **kwargs)
self._later[message.status][message.body].append(message)
[docs] def run_started(self):
"""
Output nothing.
"""
[docs] def run_stopped(self, recorder, runtime):
"""
Output all the messages stored for later, as well as a final summary.
"""
for status in self._later.values():
for messages in status.values():
if not messages:
continue
yield "\n"
yield from messages[0].lines()
for message in messages:
yield "\n"
yield message.subject.id()
yield "\n"
yield "-" * self.line_width
count = recorder.testsRun
tests = "tests" if count != 1 else "test"
subcount = recorder.subtests
if subcount:
pluralized = "subtests" if subcount != 1 else "subtest"
subtests = f" with {subcount} {pluralized}"
else:
subtests = ""
yield f"\nRan {count} {tests}{subtests} in {runtime:.3f}s\n\n"
if recorder.wasSuccessful():
yield self._passed
else:
yield self._failed
if recorder.testsRun:
yield " ("
summary = []
for attribute in (
"successes",
"skips",
"failures",
"errors",
"expected_failures",
"unexpected_successes",
"subtest_failures",
"subtest_errors",
):
subcount = len(getattr(recorder, attribute))
if subcount:
summary.append(f"{attribute}={subcount}")
yield ", ".join(summary)
yield ")"
yield "\n"
[docs] def test_started(self, test):
"""
Output the test name.
"""
cls = test.__class__
module = cls.__module__
if self._last_test_module != module:
self._last_test_module = module
yield module
yield "\n"
if self._last_test_class != cls:
self._last_test_class = cls
yield self.indent
yield cls.__name__
yield "\n"
[docs] def test_stopped(self, test):
"""
Output nothing.
"""
[docs] def test_errored(self, test, exc_info):
"""
Output an error.
"""
self._show_later(
status=self.ERROR,
body="".join(format_exception(*exc_info)),
subject=test,
)
return self._format_line(test, self._error)
[docs] def test_failed(self, test, exc_info):
"""
Output a failure.
"""
self._show_later(
status=self.FAIL,
body="".join(format_exception(*exc_info)),
subject=test,
)
return self._format_line(test, self._fail)
[docs] def test_skipped(self, test, reason):
"""
Output a skip.
"""
self._show_later(status=self.SKIPPED, body=reason, subject=test)
return self._format_line(test, self._skipped)
[docs] def test_expectedly_failed(self, test, exc_info):
"""
Output an expected failure.
"""
self._show_later(status=self.EXPECTED_FAILURE, subject=test)
return self._format_line(test, self._expected_failure)
[docs] def test_unexpectedly_succeeded(self, test):
"""
Output an unexpected success.
"""
self._show_later(status=self.UNEXPECTED_SUCCESS, subject=test)
return self._format_line(test, self._unexpected_success)
[docs] def test_succeeded(self, test):
"""
Output a success.
"""
return self._format_line(test, self._ok)
[docs] def subtest_succeeded(self, test, subtest):
"""
Output nothing.
"""
[docs] def subtest_failed(self, test, subtest, exc_info):
"""
Output a failed subtest.
"""
self._show_later(
status=self.FAIL,
body="".join(format_exception(*exc_info)),
subject=subtest,
)
return self._format_subtest_result(test, subtest, self._fail)
[docs] def subtest_errored(self, test, subtest, exc_info):
"""
Output an errored subtest.
"""
self._show_later(
status=self.ERROR,
body="".join(format_exception(*exc_info)),
subject=subtest,
)
return self._format_subtest_result(test, subtest, self._error)
def _format_line(self, test, result):
before = f"{self.indent}{self.indent}{test._testMethodName} ..."
return self._pad_center(left=before, right=result) + "\n"
def _format_subtest_result(self, test, subtest, result):
if self._current_subtests_test != test.id():
before = f"{self.indent}{self.indent}{test._testMethodName}\n"
else:
before = ""
self._current_subtests_test = test.id()
line = f"{self.indent * 3}{subtest._subDescription()[1:-1]} ..."
return f"{before}{self._pad_center(left=line, right=result)}\n"
def _pad_center(self, left, right):
space = self.line_width - len(left) - len(right)
return left + " " * space + right
[docs]@attrs.define(slots=False)
class Counter:
"""
A counter is a recorder that does not hold references to tests it sees.
"""
errors: int = 0
failures: int = 0
expected_failures: int = 0
unexpected_successes: int = 0
successes: int = 0
subtest_successes: int = 0
subtest_failures: int = 0
subtest_errors: int = 0
shouldStop = False
@property
def count(self):
"""
Return a total count of all tests.
"""
return sum(attrs.astuple(self))
testsRun = count
[docs] def startTest(self, test): # noqa: D102
pass
[docs] def stopTest(self, test): # noqa: D102
pass
[docs] def addError(self, test, exc_info): # noqa: D102
self.errors += 1
[docs] def addFailure(self, test, exc_info): # noqa: D102
self.failures += 1
[docs] def addExpectedFailure(self, *args, **kwargs): # noqa: D102
self.expected_failures += 1
[docs] def addUnexpectedSuccess(self, test): # noqa: D102
self.unexpected_successes += 1
[docs] def addSuccess(self, test): # noqa: D102
self.successes += 1
[docs] def addDuration(self, test, elapsed): # noqa: D102
pass
[docs] def addSubTest(self, test, subtest, outcome): # noqa: D102
if outcome is None:
self.subtest_successes += 1
elif issubclass(outcome[0], test.failureException):
self.subtest_failures += 1
else:
self.subtest_errors += 1
[docs]@attrs.define(slots=False)
class Recorder:
"""
Record test results for later inspection.
"""
errors: PVector = v()
failures: PVector = v()
skips: PVector = v()
successes: PVector = v()
expected_failures: PVector = v()
unexpected_successes: PVector = v()
subtest_successes: PMap = m()
subtest_failures: PMap = m()
subtest_errors: PMap = m()
shouldStop = False
@property
def testsRun(self): # noqa: D102
fields = attrs.astuple(
self,
filter=lambda f, _: not f.name.startswith("subtest_"),
)
# It seems addSuccess is called for tests with all passing subtests
# but the reverse isn't true if a subtest fails...
tests_with_subtests = len(self.subtest_failures | self.subtest_errors)
return sum(len(each) for each in fields) + tests_with_subtests
@property
def subtests(self): # noqa: D102
return sum(
1
for each in (
self.subtest_successes,
self.subtest_failures,
self.subtest_errors,
)
for value in each.values()
for _ in value
)
[docs] def startTestRun(self): # noqa: D102
pass
[docs] def stopTestRun(self): # noqa: D102
pass
[docs] def startTest(self, test): # noqa: D102
pass
[docs] def stopTest(self, test): # noqa: D102
pass
[docs] def addError(self, test, exc_info): # noqa: D102
self.errors = self.errors.append((test, exc_info))
[docs] def addFailure(self, test, exc_info): # noqa: D102
self.failures = self.failures.append((test, exc_info))
[docs] def addExpectedFailure(self, test, exc_info): # noqa: D102
self.expected_failures = self.expected_failures.append(
(test, exc_info),
)
[docs] def addSkip(self, test, reason): # noqa: D102
self.skips = self.skips.append(test)
[docs] def addUnexpectedSuccess(self, test): # noqa: D102
self.unexpected_successes = self.unexpected_successes.append(test)
[docs] def addSuccess(self, test): # noqa: D102
self.successes = self.successes.append(test)
[docs] def addDuration(self, test, elapsed): # noqa: D102
pass
[docs] def addSubTest(self, test, subtest, outcome): # noqa: D102
if outcome is None:
self.subtest_successes = self.subtest_successes.set(
test,
self.subtest_successes.get(test, v()).append(subtest),
)
elif issubclass(outcome[0], test.failureException):
self.subtest_failures = self.subtest_failures.set(
test,
self.subtest_failures.get(test, v()).append(subtest),
)
else:
self.subtest_errors = self.subtest_errors.set(
test,
self.subtest_errors.get(test, v()).append(subtest),
)
[docs] def wasSuccessful(self): # noqa: D102
return not (
self.errors
or self.failures
or self.unexpected_successes
or self.subtest_failures
or self.subtest_errors
)
[docs]@attrs.define(slots=False)
class ComponentizedReporter:
"""
Combine together outputting and recording capabilities.
"""
outputter: Outputter = attrs.field(factory=Outputter)
recorder = attrs.field(factory=Recorder, repr=False)
stream = attrs.field(default=sys.stdout)
_time = attrs.field(default=time.time, repr=False)
failfast = False # FIXME: needed for subtests?
shouldStop = False
@property
def testsRun(self): # noqa: D102
return self.recorder.testsRun
[docs] def startTestRun(self): # noqa: D102
self._start_time = self._time()
self.recorder.startTestRun()
self.stream.writelines(self.outputter.run_started() or "")
[docs] def stopTestRun(self): # noqa: D102
self.recorder.stopTestRun()
runtime = self._time() - self._start_time
self.stream.writelines(
self.outputter.run_stopped(self.recorder, runtime) or "",
)
[docs] def startTest(self, test): # noqa: D102
self.recorder.startTest(test)
self.stream.writelines(self.outputter.test_started(test) or "")
[docs] def stopTest(self, test): # noqa: D102
self.recorder.stopTest(test)
self.stream.writelines(self.outputter.test_stopped(test) or "")
[docs] def addError(self, test, exc_info): # noqa: D102
self.recorder.addError(test, exc_info)
self.stream.writelines(
self.outputter.test_errored(test, exc_info) or "",
)
[docs] def addFailure(self, test, exc_info): # noqa: D102
self.recorder.addFailure(test, exc_info)
self.stream.writelines(
self.outputter.test_failed(test, exc_info) or "",
)
[docs] def addSkip(self, test, reason): # noqa: D102
self.recorder.addSkip(test, reason)
self.stream.writelines(self.outputter.test_skipped(test, reason) or "")
[docs] def addExpectedFailure(self, test, exc_info): # noqa: D102
self.recorder.addExpectedFailure(test, exc_info)
self.stream.writelines(
self.outputter.test_expectedly_failed(test, exc_info) or "",
)
[docs] def addUnexpectedSuccess(self, test): # noqa: D102
self.recorder.addUnexpectedSuccess(test)
self.stream.writelines(
self.outputter.test_unexpectedly_succeeded(test) or "",
)
[docs] def addSuccess(self, test): # noqa: D102
self.recorder.addSuccess(test)
self.stream.writelines(self.outputter.test_succeeded(test) or "")
[docs] def addDuration(self, test, elapsed): # noqa: D102
self.recorder.addDuration(test, elapsed)
[docs] def addSubTest(self, test, subtest, outcome): # noqa: D102
self.recorder.addSubTest(test, subtest, outcome)
if outcome is None:
output = self.outputter.subtest_succeeded(test, subtest)
elif issubclass(outcome[0], test.failureException):
output = self.outputter.subtest_failed(test, subtest, outcome)
else:
output = self.outputter.subtest_errored(test, subtest, outcome)
self.stream.writelines(output or "")
[docs] def wasSuccessful(self): # noqa: D102
return self.recorder.wasSuccessful()