|
8 | 8 | from io import StringIO
|
9 | 9 | from pathlib import Path
|
10 | 10 | from typing import AbstractSet
|
| 11 | +from typing import cast |
11 | 12 | from typing import Dict
|
12 | 13 | from typing import Generator
|
13 | 14 | from typing import List
|
14 | 15 | from typing import Mapping
|
15 | 16 | from typing import Optional
|
| 17 | +from typing import Set |
16 | 18 | from typing import Tuple
|
17 | 19 | from typing import TYPE_CHECKING
|
18 | 20 | from typing import TypeVar
|
|
47 | 49 | _ANSI_ESCAPE_SEQ = re.compile(r"\x1b\[[\d;]+m")
|
48 | 50 | caplog_handler_key = StashKey["LogCaptureHandler"]()
|
49 | 51 | caplog_records_key = StashKey[Dict[str, List[logging.LogRecord]]]()
|
| 52 | +_HandlerType = TypeVar("_HandlerType", bound=logging.Handler) |
50 | 53 |
|
51 | 54 |
|
52 | 55 | def _remove_ansi_escape_sequences(text: str) -> str:
|
53 | 56 | return _ANSI_ESCAPE_SEQ.sub("", text)
|
54 | 57 |
|
55 | 58 |
|
| 59 | +class CatchedLoggers: |
| 60 | + __slots__ = ("handler", "level", "loggers") |
| 61 | + |
| 62 | + def __init__( |
| 63 | + self, |
| 64 | + initial_set: Set[logging.Logger], |
| 65 | + handler: _HandlerType, |
| 66 | + level: Optional[int] = None, |
| 67 | + ): |
| 68 | + self.handler = handler |
| 69 | + self.level = level |
| 70 | + self.loggers: Dict[logging.Logger, Optional[int]] = {} |
| 71 | + if self.level is not None: |
| 72 | + self.handler.setLevel(self.level) |
| 73 | + for logger in initial_set: |
| 74 | + self.catch_logger(logger) |
| 75 | + |
| 76 | + def catch_logger(self, logger: logging.Logger): |
| 77 | + if logger not in self.loggers: |
| 78 | + logger.addHandler(self.handler) |
| 79 | + if self.level is not None: |
| 80 | + logger.setLevel(min(logger.level, self.level)) |
| 81 | + self.loggers[logger] = logger.level # remember original log level |
| 82 | + else: |
| 83 | + self.loggers[logger] = None |
| 84 | + |
| 85 | + def release_logger(self, logger: logging.Logger): |
| 86 | + if logger in self.loggers: |
| 87 | + logger.removeHandler(self.handler) |
| 88 | + orig_level = self.loggers.get(logger) |
| 89 | + if orig_level is not None: |
| 90 | + logger.setLevel(orig_level) |
| 91 | + del self.loggers[logger] |
| 92 | + |
| 93 | + def release_all_loggers(self): |
| 94 | + for logger in list(self.loggers.keys()): |
| 95 | + self.release_logger(logger) |
| 96 | + |
| 97 | + |
| 98 | +class LoggerPropagateInterceptor: |
| 99 | + """Descriptor to be patched into standard libs logging module. |
| 100 | +
|
| 101 | + It intercepts propagate = False assignments from user code. |
| 102 | +
|
| 103 | + While catching_logs is inactive, it tracks non-propagating loggers without side effects. |
| 104 | + While catching_logs is active, it also adds and removes pytest handlers instantly. |
| 105 | + """ |
| 106 | + |
| 107 | + non_propagating_loggers: Set[logging.Logger] = set() |
| 108 | + catchers: Set[CatchedLoggers] = set() |
| 109 | + |
| 110 | + def __init__(self): |
| 111 | + for item in logging.getLogger().manager.loggerDict: |
| 112 | + if isinstance(item, logging.Logger) and not item.propagate: |
| 113 | + self.non_propagating_loggers.add(item) |
| 114 | + |
| 115 | + def __set__(self, logger: logging.Logger, propagate: bool): |
| 116 | + if propagate is False: |
| 117 | + if self.catchers: |
| 118 | + for catcher in self.catchers: |
| 119 | + catcher.catch_logger(logger) |
| 120 | + self.non_propagating_loggers.add(logger) |
| 121 | + elif propagate is True: |
| 122 | + if self.catchers: |
| 123 | + for catcher in self.catchers: |
| 124 | + catcher.release_logger(logger) |
| 125 | + if logger in self.non_propagating_loggers: |
| 126 | + self.non_propagating_loggers.remove(logger) |
| 127 | + logger.__dict__["propagate"] = propagate |
| 128 | + |
| 129 | + |
| 130 | +# From now, we intercept each assignment to logger.propagate |
| 131 | +logging.Logger.propagate = cast(bool, LoggerPropagateInterceptor()) |
| 132 | + |
| 133 | + |
56 | 134 | class ColoredLevelFormatter(logging.Formatter):
|
57 | 135 | """A logging formatter which colorizes the %(levelname)..s part of the
|
58 | 136 | log format passed to __init__."""
|
@@ -299,34 +377,30 @@ def add_option_ini(option, dest, default=None, type=None, **kwargs):
|
299 | 377 | )
|
300 | 378 |
|
301 | 379 |
|
302 |
| -_HandlerType = TypeVar("_HandlerType", bound=logging.Handler) |
303 |
| - |
304 |
| - |
305 | 380 | # Not using @contextmanager for performance reasons.
|
306 | 381 | class catching_logs:
|
307 | 382 | """Context manager that prepares the whole logging machinery properly."""
|
308 | 383 |
|
309 |
| - __slots__ = ("handler", "level", "orig_level") |
| 384 | + __slots__ = ("handler", "level", "catcher") |
310 | 385 |
|
311 | 386 | def __init__(self, handler: _HandlerType, level: Optional[int] = None) -> None:
|
312 | 387 | self.handler = handler
|
313 | 388 | self.level = level
|
| 389 | + self.catcher: Optional[CatchedLoggers] = None |
314 | 390 |
|
315 | 391 | def __enter__(self):
|
316 |
| - root_logger = logging.getLogger() |
317 |
| - if self.level is not None: |
318 |
| - self.handler.setLevel(self.level) |
319 |
| - root_logger.addHandler(self.handler) |
320 |
| - if self.level is not None: |
321 |
| - self.orig_level = root_logger.level |
322 |
| - root_logger.setLevel(min(self.orig_level, self.level)) |
| 392 | + self.catcher = CatchedLoggers( |
| 393 | + LoggerPropagateInterceptor.non_propagating_loggers | {logging.getLogger()}, |
| 394 | + self.handler, |
| 395 | + self.level, |
| 396 | + ) |
| 397 | + LoggerPropagateInterceptor.catchers.add(self.catcher) |
323 | 398 | return self.handler
|
324 | 399 |
|
325 | 400 | def __exit__(self, type, value, traceback):
|
326 |
| - root_logger = logging.getLogger() |
327 |
| - if self.level is not None: |
328 |
| - root_logger.setLevel(self.orig_level) |
329 |
| - root_logger.removeHandler(self.handler) |
| 401 | + assert self.catcher |
| 402 | + LoggerPropagateInterceptor.catchers.remove(self.catcher) |
| 403 | + self.catcher.release_all_loggers() |
330 | 404 |
|
331 | 405 |
|
332 | 406 | class LogCaptureHandler(logging_StreamHandler):
|
|
0 commit comments