@@ -498,12 +498,14 @@ def cleanup_resources(self, restart=False):
498
498
class ChannelQueue (Queue ):
499
499
500
500
channel_name : Optional [str ] = None
501
+ response_router_finished : bool
501
502
502
503
def __init__ (self , channel_name : str , channel_socket : websocket .WebSocket , log : Logger ):
503
504
super ().__init__ ()
504
505
self .channel_name = channel_name
505
506
self .channel_socket = channel_socket
506
507
self .log = log
508
+ self .response_router_finished = False
507
509
508
510
async def _async_get (self , timeout = None ):
509
511
if timeout is None :
@@ -516,6 +518,8 @@ async def _async_get(self, timeout=None):
516
518
try :
517
519
return self .get (block = False )
518
520
except Empty :
521
+ if self .response_router_finished :
522
+ raise RuntimeError ("Response router had finished" )
519
523
if monotonic () > end_time :
520
524
raise
521
525
await asyncio .sleep (0 )
@@ -597,17 +601,17 @@ class GatewayKernelClient(AsyncKernelClient):
597
601
598
602
# flag for whether execute requests should be allowed to call raw_input:
599
603
allow_stdin = False
600
- _channels_stopped = False
601
- _channel_queues : Optional [dict ] = {}
604
+ _channels_stopped : bool
605
+ _channel_queues : Optional [Dict [ str , ChannelQueue ]]
602
606
_control_channel : Optional [ChannelQueue ]
603
607
_hb_channel : Optional [ChannelQueue ]
604
608
_stdin_channel : Optional [ChannelQueue ]
605
609
_iopub_channel : Optional [ChannelQueue ]
606
610
_shell_channel : Optional [ChannelQueue ]
607
611
608
- def __init__ (self , ** kwargs ):
612
+ def __init__ (self , kernel_id , ** kwargs ):
609
613
super ().__init__ (** kwargs )
610
- self .kernel_id = kwargs [ " kernel_id" ]
614
+ self .kernel_id = kernel_id
611
615
self .channel_socket : Optional [websocket .WebSocket ] = None
612
616
self .response_router : Optional [Thread ] = None
613
617
@@ -642,13 +646,14 @@ async def start_channels(self, shell=True, iopub=True, stdin=True, hb=True, cont
642
646
enable_multithread = True ,
643
647
sslopt = ssl_options ,
644
648
)
645
- self .response_router = Thread (target = self ._route_responses )
646
- self .response_router .start ()
647
649
648
650
await ensure_async (
649
651
super ().start_channels (shell = shell , iopub = iopub , stdin = stdin , hb = hb , control = control )
650
652
)
651
653
654
+ self .response_router = Thread (target = self ._route_responses )
655
+ self .response_router .start ()
656
+
652
657
def stop_channels (self ):
653
658
"""Stops all the running channels for this kernel.
654
659
@@ -751,6 +756,11 @@ def _route_responses(self):
751
756
if not self ._channels_stopped :
752
757
self .log .warning (f"Unexpected exception encountered ({ be } )" )
753
758
759
+ # Notify channel queues that this thread had finished and no more messages are being received
760
+ assert self ._channel_queues is not None
761
+ for channel_queue in self ._channel_queues .values ():
762
+ channel_queue .response_router_finished = True
763
+
754
764
self .log .debug ("Response router thread exiting..." )
755
765
756
766
0 commit comments