|
2 | 2 | # these are all functions _testcapi exports whose name begins with 'test_'.
|
3 | 3 |
|
4 | 4 | from collections import OrderedDict
|
5 |
| -from contextlib import contextmanager |
| 5 | +from contextlib import contextmanager, ExitStack |
6 | 6 | import _thread
|
7 | 7 | import importlib.machinery
|
8 | 8 | import importlib.util
|
@@ -1606,5 +1606,172 @@ def test_clear_unassigned_watcher_id(self):
|
1606 | 1606 | self.clear_watcher(1)
|
1607 | 1607 |
|
1608 | 1608 |
|
| 1609 | +class TestTypeWatchers(unittest.TestCase): |
| 1610 | + # types of watchers testcapimodule can add: |
| 1611 | + TYPES = 0 # appends modified types to global event list |
| 1612 | + ERROR = 1 # unconditionally sets and signals a RuntimeException |
| 1613 | + WRAP = 2 # appends modified type wrapped in list to global event list |
| 1614 | + |
| 1615 | + # duplicating the C constant |
| 1616 | + TYPE_MAX_WATCHERS = 8 |
| 1617 | + |
| 1618 | + def add_watcher(self, kind=TYPES): |
| 1619 | + return _testcapi.add_type_watcher(kind) |
| 1620 | + |
| 1621 | + def clear_watcher(self, watcher_id): |
| 1622 | + _testcapi.clear_type_watcher(watcher_id) |
| 1623 | + |
| 1624 | + @contextmanager |
| 1625 | + def watcher(self, kind=TYPES): |
| 1626 | + wid = self.add_watcher(kind) |
| 1627 | + try: |
| 1628 | + yield wid |
| 1629 | + finally: |
| 1630 | + self.clear_watcher(wid) |
| 1631 | + |
| 1632 | + def assert_events(self, expected): |
| 1633 | + actual = _testcapi.get_type_modified_events() |
| 1634 | + self.assertEqual(actual, expected) |
| 1635 | + |
| 1636 | + def watch(self, wid, t): |
| 1637 | + _testcapi.watch_type(wid, t) |
| 1638 | + |
| 1639 | + def unwatch(self, wid, t): |
| 1640 | + _testcapi.unwatch_type(wid, t) |
| 1641 | + |
| 1642 | + def test_watch_type(self): |
| 1643 | + class C: pass |
| 1644 | + with self.watcher() as wid: |
| 1645 | + self.watch(wid, C) |
| 1646 | + C.foo = "bar" |
| 1647 | + self.assert_events([C]) |
| 1648 | + |
| 1649 | + def test_event_aggregation(self): |
| 1650 | + class C: pass |
| 1651 | + with self.watcher() as wid: |
| 1652 | + self.watch(wid, C) |
| 1653 | + C.foo = "bar" |
| 1654 | + C.bar = "baz" |
| 1655 | + # only one event registered for both modifications |
| 1656 | + self.assert_events([C]) |
| 1657 | + |
| 1658 | + def test_lookup_resets_aggregation(self): |
| 1659 | + class C: pass |
| 1660 | + with self.watcher() as wid: |
| 1661 | + self.watch(wid, C) |
| 1662 | + C.foo = "bar" |
| 1663 | + # lookup resets type version tag |
| 1664 | + self.assertEqual(C.foo, "bar") |
| 1665 | + C.bar = "baz" |
| 1666 | + # both events registered |
| 1667 | + self.assert_events([C, C]) |
| 1668 | + |
| 1669 | + def test_unwatch_type(self): |
| 1670 | + class C: pass |
| 1671 | + with self.watcher() as wid: |
| 1672 | + self.watch(wid, C) |
| 1673 | + C.foo = "bar" |
| 1674 | + self.assertEqual(C.foo, "bar") |
| 1675 | + self.assert_events([C]) |
| 1676 | + self.unwatch(wid, C) |
| 1677 | + C.bar = "baz" |
| 1678 | + self.assert_events([C]) |
| 1679 | + |
| 1680 | + def test_clear_watcher(self): |
| 1681 | + class C: pass |
| 1682 | + # outer watcher is unused, it's just to keep events list alive |
| 1683 | + with self.watcher() as _: |
| 1684 | + with self.watcher() as wid: |
| 1685 | + self.watch(wid, C) |
| 1686 | + C.foo = "bar" |
| 1687 | + self.assertEqual(C.foo, "bar") |
| 1688 | + self.assert_events([C]) |
| 1689 | + C.bar = "baz" |
| 1690 | + # Watcher on C has been cleared, no new event |
| 1691 | + self.assert_events([C]) |
| 1692 | + |
| 1693 | + def test_watch_type_subclass(self): |
| 1694 | + class C: pass |
| 1695 | + class D(C): pass |
| 1696 | + with self.watcher() as wid: |
| 1697 | + self.watch(wid, D) |
| 1698 | + C.foo = "bar" |
| 1699 | + self.assert_events([D]) |
| 1700 | + |
| 1701 | + def test_error(self): |
| 1702 | + class C: pass |
| 1703 | + with self.watcher(kind=self.ERROR) as wid: |
| 1704 | + self.watch(wid, C) |
| 1705 | + with catch_unraisable_exception() as cm: |
| 1706 | + C.foo = "bar" |
| 1707 | + self.assertIs(cm.unraisable.object, C) |
| 1708 | + self.assertEqual(str(cm.unraisable.exc_value), "boom!") |
| 1709 | + self.assert_events([]) |
| 1710 | + |
| 1711 | + def test_two_watchers(self): |
| 1712 | + class C1: pass |
| 1713 | + class C2: pass |
| 1714 | + with self.watcher() as wid1: |
| 1715 | + with self.watcher(kind=self.WRAP) as wid2: |
| 1716 | + self.assertNotEqual(wid1, wid2) |
| 1717 | + self.watch(wid1, C1) |
| 1718 | + self.watch(wid2, C2) |
| 1719 | + C1.foo = "bar" |
| 1720 | + C2.hmm = "baz" |
| 1721 | + self.assert_events([C1, [C2]]) |
| 1722 | + |
| 1723 | + def test_watch_non_type(self): |
| 1724 | + with self.watcher() as wid: |
| 1725 | + with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"): |
| 1726 | + self.watch(wid, 1) |
| 1727 | + |
| 1728 | + def test_watch_out_of_range_watcher_id(self): |
| 1729 | + class C: pass |
| 1730 | + with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"): |
| 1731 | + self.watch(-1, C) |
| 1732 | + with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"): |
| 1733 | + self.watch(self.TYPE_MAX_WATCHERS, C) |
| 1734 | + |
| 1735 | + def test_watch_unassigned_watcher_id(self): |
| 1736 | + class C: pass |
| 1737 | + with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"): |
| 1738 | + self.watch(1, C) |
| 1739 | + |
| 1740 | + def test_unwatch_non_type(self): |
| 1741 | + with self.watcher() as wid: |
| 1742 | + with self.assertRaisesRegex(ValueError, r"Cannot watch non-type"): |
| 1743 | + self.unwatch(wid, 1) |
| 1744 | + |
| 1745 | + def test_unwatch_out_of_range_watcher_id(self): |
| 1746 | + class C: pass |
| 1747 | + with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"): |
| 1748 | + self.unwatch(-1, C) |
| 1749 | + with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"): |
| 1750 | + self.unwatch(self.TYPE_MAX_WATCHERS, C) |
| 1751 | + |
| 1752 | + def test_unwatch_unassigned_watcher_id(self): |
| 1753 | + class C: pass |
| 1754 | + with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"): |
| 1755 | + self.unwatch(1, C) |
| 1756 | + |
| 1757 | + def test_clear_out_of_range_watcher_id(self): |
| 1758 | + with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID -1"): |
| 1759 | + self.clear_watcher(-1) |
| 1760 | + with self.assertRaisesRegex(ValueError, r"Invalid type watcher ID 8"): |
| 1761 | + self.clear_watcher(self.TYPE_MAX_WATCHERS) |
| 1762 | + |
| 1763 | + def test_clear_unassigned_watcher_id(self): |
| 1764 | + with self.assertRaisesRegex(ValueError, r"No type watcher set for ID 1"): |
| 1765 | + self.clear_watcher(1) |
| 1766 | + |
| 1767 | + def test_no_more_ids_available(self): |
| 1768 | + contexts = [self.watcher() for i in range(self.TYPE_MAX_WATCHERS)] |
| 1769 | + with ExitStack() as stack: |
| 1770 | + for ctx in contexts: |
| 1771 | + stack.enter_context(ctx) |
| 1772 | + with self.assertRaisesRegex(RuntimeError, r"no more type watcher IDs"): |
| 1773 | + self.add_watcher() |
| 1774 | + |
| 1775 | + |
1609 | 1776 | if __name__ == "__main__":
|
1610 | 1777 | unittest.main()
|
0 commit comments