|
7 | 7 | import json
|
8 | 8 | import os
|
9 | 9 | from collections import OrderedDict
|
| 10 | +from typing import Dict |
| 11 | +from typing import Generator |
10 | 12 | from typing import List
|
| 13 | +from typing import Optional |
| 14 | +from typing import Set |
11 | 15 |
|
12 | 16 | import attr
|
13 | 17 | import py
|
|
16 | 20 | from .pathlib import Path
|
17 | 21 | from .pathlib import resolve_from_str
|
18 | 22 | from .pathlib import rm_rf
|
| 23 | +from .reports import CollectReport |
19 | 24 | from _pytest import nodes
|
20 | 25 | from _pytest._io import TerminalWriter
|
21 | 26 | from _pytest.config import Config
|
22 | 27 | from _pytest.main import Session
|
| 28 | +from _pytest.python import Module |
23 | 29 |
|
24 | 30 | README_CONTENT = """\
|
25 | 31 | # pytest cache directory #
|
@@ -161,42 +167,88 @@ def _ensure_supporting_files(self):
|
161 | 167 | cachedir_tag_path.write_bytes(CACHEDIR_TAG_CONTENT)
|
162 | 168 |
|
163 | 169 |
|
| 170 | +class LFPluginCollWrapper: |
| 171 | + def __init__(self, lfplugin: "LFPlugin"): |
| 172 | + self.lfplugin = lfplugin |
| 173 | + self._collected_at_least_one_failure = False |
| 174 | + |
| 175 | + @pytest.hookimpl(hookwrapper=True) |
| 176 | + def pytest_make_collect_report(self, collector) -> Generator: |
| 177 | + if isinstance(collector, Session): |
| 178 | + out = yield |
| 179 | + res = out.get_result() # type: CollectReport |
| 180 | + |
| 181 | + # Sort any lf-paths to the beginning. |
| 182 | + lf_paths = self.lfplugin._last_failed_paths |
| 183 | + res.result = sorted( |
| 184 | + res.result, key=lambda x: 0 if Path(x.fspath) in lf_paths else 1, |
| 185 | + ) |
| 186 | + out.force_result(res) |
| 187 | + return |
| 188 | + |
| 189 | + elif isinstance(collector, Module): |
| 190 | + if Path(collector.fspath) in self.lfplugin._last_failed_paths: |
| 191 | + out = yield |
| 192 | + res = out.get_result() |
| 193 | + |
| 194 | + filtered_result = [ |
| 195 | + x for x in res.result if x.nodeid in self.lfplugin.lastfailed |
| 196 | + ] |
| 197 | + if filtered_result: |
| 198 | + res.result = filtered_result |
| 199 | + out.force_result(res) |
| 200 | + |
| 201 | + if not self._collected_at_least_one_failure: |
| 202 | + self.lfplugin.config.pluginmanager.register( |
| 203 | + LFPluginCollSkipfiles(self.lfplugin), "lfplugin-collskip" |
| 204 | + ) |
| 205 | + self._collected_at_least_one_failure = True |
| 206 | + return res |
| 207 | + yield |
| 208 | + |
| 209 | + |
| 210 | +class LFPluginCollSkipfiles: |
| 211 | + def __init__(self, lfplugin: "LFPlugin"): |
| 212 | + self.lfplugin = lfplugin |
| 213 | + |
| 214 | + @pytest.hookimpl |
| 215 | + def pytest_make_collect_report(self, collector) -> Optional[CollectReport]: |
| 216 | + if isinstance(collector, Module): |
| 217 | + if Path(collector.fspath) not in self.lfplugin._last_failed_paths: |
| 218 | + self.lfplugin._skipped_files += 1 |
| 219 | + |
| 220 | + return CollectReport( |
| 221 | + collector.nodeid, "passed", longrepr=None, result=[] |
| 222 | + ) |
| 223 | + return None |
| 224 | + |
| 225 | + |
164 | 226 | class LFPlugin:
|
165 | 227 | """ Plugin which implements the --lf (run last-failing) option """
|
166 | 228 |
|
167 |
| - def __init__(self, config): |
| 229 | + def __init__(self, config: Config) -> None: |
168 | 230 | self.config = config
|
169 | 231 | active_keys = "lf", "failedfirst"
|
170 | 232 | self.active = any(config.getoption(key) for key in active_keys)
|
171 |
| - self.lastfailed = config.cache.get("cache/lastfailed", {}) |
| 233 | + assert config.cache |
| 234 | + self.lastfailed = config.cache.get( |
| 235 | + "cache/lastfailed", {} |
| 236 | + ) # type: Dict[str, bool] |
172 | 237 | self._previously_failed_count = None
|
173 | 238 | self._report_status = None
|
174 | 239 | self._skipped_files = 0 # count skipped files during collection due to --lf
|
175 | 240 |
|
176 |
| - def last_failed_paths(self): |
177 |
| - """Returns a set with all Paths()s of the previously failed nodeids (cached). |
178 |
| - """ |
179 |
| - try: |
180 |
| - return self._last_failed_paths |
181 |
| - except AttributeError: |
182 |
| - rootpath = Path(self.config.rootdir) |
183 |
| - result = {rootpath / nodeid.split("::")[0] for nodeid in self.lastfailed} |
184 |
| - result = {x for x in result if x.exists()} |
185 |
| - self._last_failed_paths = result |
186 |
| - return result |
187 |
| - |
188 |
| - def pytest_ignore_collect(self, path): |
189 |
| - """ |
190 |
| - Ignore this file path if we are in --lf mode and it is not in the list of |
191 |
| - previously failed files. |
192 |
| - """ |
193 |
| - if self.active and self.config.getoption("lf") and path.isfile(): |
194 |
| - last_failed_paths = self.last_failed_paths() |
195 |
| - if last_failed_paths: |
196 |
| - skip_it = Path(path) not in self.last_failed_paths() |
197 |
| - if skip_it: |
198 |
| - self._skipped_files += 1 |
199 |
| - return skip_it |
| 241 | + if config.getoption("lf"): |
| 242 | + self._last_failed_paths = self.get_last_failed_paths() |
| 243 | + config.pluginmanager.register( |
| 244 | + LFPluginCollWrapper(self), "lfplugin-collwrapper" |
| 245 | + ) |
| 246 | + |
| 247 | + def get_last_failed_paths(self) -> Set[Path]: |
| 248 | + """Returns a set with all Paths()s of the previously failed nodeids.""" |
| 249 | + rootpath = Path(self.config.rootdir) |
| 250 | + result = {rootpath / nodeid.split("::")[0] for nodeid in self.lastfailed} |
| 251 | + return {x for x in result if x.exists()} |
200 | 252 |
|
201 | 253 | def pytest_report_collectionfinish(self):
|
202 | 254 | if self.active and self.config.getoption("verbose") >= 0:
|
@@ -380,7 +432,7 @@ def pytest_cmdline_main(config):
|
380 | 432 |
|
381 | 433 |
|
382 | 434 | @pytest.hookimpl(tryfirst=True)
|
383 |
| -def pytest_configure(config): |
| 435 | +def pytest_configure(config: Config) -> None: |
384 | 436 | config.cache = Cache.for_config(config)
|
385 | 437 | config.pluginmanager.register(LFPlugin(config), "lfplugin")
|
386 | 438 | config.pluginmanager.register(NFPlugin(config), "nfplugin")
|
|
0 commit comments