|
| 1 | +from __future__ import annotations |
| 2 | + |
1 | 3 | import asyncio
|
2 | 4 | import dataclasses
|
3 | 5 | import time
|
4 | 6 | from asyncio.events import AbstractEventLoop
|
5 |
| -from typing import ( |
6 |
| - Any, |
7 |
| - AsyncGenerator, |
8 |
| - Awaitable, |
9 |
| - Dict, |
10 |
| - Optional, |
11 |
| - Sequence, |
12 |
| - Set, |
13 |
| - Union, |
14 |
| - cast, |
15 |
| -) |
| 7 | +from typing import Any, AsyncGenerator, Dict, Optional, Sequence, Set, Union, cast |
16 | 8 |
|
17 | 9 | from typing_extensions import Literal, TypeGuard
|
18 | 10 |
|
@@ -70,11 +62,11 @@ async def window_generator(
|
70 | 62 | # Wait until there are new messages available.
|
71 | 63 | most_recent_message_id = self.message_counter - 1
|
72 | 64 | while last_sent_id >= most_recent_message_id:
|
73 |
| - next_message = self.message_event.wait() |
| 65 | + next_message = asyncio.create_task(self.message_event.wait()) |
| 66 | + flush_wait = asyncio.create_task(self._flush_event.wait()) |
74 | 67 | send_window = False
|
75 | 68 | try:
|
76 |
| - flush_wait = self._flush_event.wait() |
77 |
| - done, pending = await asyncio.wait( # type: ignore |
| 69 | + done, pending = await asyncio.wait( |
78 | 70 | [flush_wait, next_message],
|
79 | 71 | timeout=window.max_time_until_ready(),
|
80 | 72 | return_when=asyncio.FIRST_COMPLETED,
|
@@ -147,28 +139,26 @@ def append_to_window(self, message: Union[Message, DoneSentinel]) -> None:
|
147 | 139 |
|
148 | 140 | async def wait_and_append_to_window(
|
149 | 141 | self,
|
150 |
| - message: Awaitable[Union[Message, DoneSentinel]], |
| 142 | + message_task: asyncio.Task[Union[Message, DoneSentinel]], |
151 | 143 | flush_event: asyncio.Event,
|
152 | 144 | ) -> bool:
|
153 | 145 | """Async version of `append_to_window()`. Returns `True` if successful, `False`
|
154 | 146 | if timed out."""
|
155 | 147 | if len(self._window) == 0:
|
156 |
| - self.append_to_window(await message) |
| 148 | + self.append_to_window(await message_task) |
157 | 149 | return True
|
158 | 150 |
|
159 |
| - message = asyncio.shield(message) |
160 |
| - flush_wait = asyncio.shield(flush_event.wait()) |
161 |
| - (done, pending) = await asyncio.wait( # type: ignore |
162 |
| - [message, flush_wait], |
| 151 | + flush_wait = asyncio.create_task(flush_event.wait()) |
| 152 | + done, pending = await asyncio.wait( |
| 153 | + [message_task, flush_wait], |
163 | 154 | timeout=self.max_time_until_ready(),
|
164 | 155 | return_when=asyncio.FIRST_COMPLETED,
|
165 | 156 | )
|
166 | 157 | del pending
|
167 | 158 | if flush_wait in done:
|
168 | 159 | flush_event.clear()
|
169 |
| - flush_wait.cancel() |
170 |
| - if message in cast(Set[Any], done): # Cast to prevent type narrowing. |
171 |
| - self.append_to_window(await message) |
| 160 | + if message_task in cast(Set[Any], done): # Cast to prevent type narrowing. |
| 161 | + self.append_to_window(await message_task) |
172 | 162 | return True
|
173 | 163 | return False
|
174 | 164 |
|
|
0 commit comments