From 9ecf95ad96d684cebbc86d8837b9367f7a3e46ad Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=82=85=E7=AB=8B=E4=B8=9A=EF=BC=88Chris=20Fu=EF=BC=89?= <17433201@qq.com> Date: Wed, 29 Jun 2022 14:40:37 +0800 Subject: [PATCH] Make `_T` in `Observable[_T]` covariant --- reactivex/observable/observable.py | 38 +++++++++++++++--------------- 1 file changed, 19 insertions(+), 19 deletions(-) diff --git a/reactivex/observable/observable.py b/reactivex/observable/observable.py index d13b4237d..6a5bc0c59 100644 --- a/reactivex/observable/observable.py +++ b/reactivex/observable/observable.py @@ -20,16 +20,16 @@ _F = TypeVar("_F") _G = TypeVar("_G") -_T = TypeVar("_T") +_T_out = TypeVar("_T_out", covariant=True) -class Observable(abc.ObservableBase[_T]): +class Observable(abc.ObservableBase[_T_out]): """Observable base class. Represents a push-style collection, which you can :func:`pipe ` into :mod:`operators `.""" - def __init__(self, subscribe: Optional[abc.Subscription[_T]] = None) -> None: + def __init__(self, subscribe: Optional[abc.Subscription[_T_out]] = None) -> None: """Creates an observable sequence object from the specified subscription function. @@ -43,14 +43,14 @@ def __init__(self, subscribe: Optional[abc.Subscription[_T]] = None) -> None: def _subscribe_core( self, - observer: abc.ObserverBase[_T], + observer: abc.ObserverBase[_T_out], scheduler: Optional[abc.SchedulerBase] = None, ) -> abc.DisposableBase: return self._subscribe(observer, scheduler) if self._subscribe else Disposable() def subscribe( self, - on_next: Optional[Union[abc.ObserverBase[_T], abc.OnNext[_T], None]] = None, + on_next: Optional[Union[abc.ObserverBase[_T_out], abc.OnNext[_T_out], None]] = None, on_error: Optional[abc.OnError] = None, on_completed: Optional[abc.OnCompleted] = None, *, @@ -93,12 +93,12 @@ def subscribe( or hasattr(on_next, "on_next") and callable(getattr(on_next, "on_next")) ): - obv = cast(abc.ObserverBase[_T], on_next) + obv = cast(abc.ObserverBase[_T_out], on_next) on_next = obv.on_next on_error = obv.on_error on_completed = obv.on_completed - auto_detach_observer: AutoDetachObserver[_T] = AutoDetachObserver( + auto_detach_observer: AutoDetachObserver[_T_out] = AutoDetachObserver( on_next, on_error, on_completed ) @@ -145,13 +145,13 @@ def set_disposable( return Disposable(auto_detach_observer.dispose) @overload - def pipe(self, __op1: Callable[[Observable[_T]], _A]) -> _A: + def pipe(self, __op1: Callable[[Observable[_T_out]], _A]) -> _A: ... @overload def pipe( self, - __op1: Callable[[Observable[_T]], _A], + __op1: Callable[[Observable[_T_out]], _A], __op2: Callable[[_A], _B], ) -> _B: ... @@ -159,7 +159,7 @@ def pipe( @overload def pipe( self, - __op1: Callable[[Observable[_T]], _A], + __op1: Callable[[Observable[_T_out]], _A], __op2: Callable[[_A], _B], __op3: Callable[[_B], _C], ) -> _C: @@ -168,7 +168,7 @@ def pipe( @overload def pipe( self, - __op1: Callable[[Observable[_T]], _A], + __op1: Callable[[Observable[_T_out]], _A], __op2: Callable[[_A], _B], __op3: Callable[[_B], _C], __op4: Callable[[_C], _D], @@ -178,7 +178,7 @@ def pipe( @overload def pipe( self, - __op1: Callable[[Observable[_T]], _A], + __op1: Callable[[Observable[_T_out]], _A], __op2: Callable[[_A], _B], __op3: Callable[[_B], _C], __op4: Callable[[_C], _D], @@ -189,7 +189,7 @@ def pipe( @overload def pipe( self, - __op1: Callable[[Observable[_T]], _A], + __op1: Callable[[Observable[_T_out]], _A], __op2: Callable[[_A], _B], __op3: Callable[[_B], _C], __op4: Callable[[_C], _D], @@ -201,7 +201,7 @@ def pipe( @overload def pipe( self, - __op1: Callable[[Observable[_T]], _A], + __op1: Callable[[Observable[_T_out]], _A], __op2: Callable[[_A], _B], __op3: Callable[[_B], _C], __op4: Callable[[_C], _D], @@ -256,7 +256,7 @@ def run(self) -> Any: return run(self) - def __await__(self) -> Generator[Any, None, _T]: + def __await__(self) -> Generator[Any, None, _T_out]: """Awaits the given observable. Returns: @@ -265,12 +265,12 @@ def __await__(self) -> Generator[Any, None, _T]: from ..operators._tofuture import to_future_ loop = asyncio.get_event_loop() - future: asyncio.Future[_T] = self.pipe( + future: asyncio.Future[_T_out] = self.pipe( to_future_(scheduler=AsyncIOScheduler(loop=loop)) ) return future.__await__() - def __add__(self, other: Observable[_T]) -> Observable[_T]: + def __add__(self, other: Observable[_T_out]) -> Observable[_T_out]: """Pythonic version of :func:`concat `. Example: @@ -286,7 +286,7 @@ def __add__(self, other: Observable[_T]) -> Observable[_T]: return concat(self, other) - def __iadd__(self, other: Observable[_T]) -> "Observable[_T]": + def __iadd__(self, other: Observable[_T_out]) -> "Observable[_T_out]": """Pythonic use of :func:`concat `. Example: @@ -302,7 +302,7 @@ def __iadd__(self, other: Observable[_T]) -> "Observable[_T]": return concat(self, other) - def __getitem__(self, key: Union[slice, int]) -> Observable[_T]: + def __getitem__(self, key: Union[slice, int]) -> Observable[_T_out]: """ Pythonic version of :func:`slice `.