Skip to content

Commit 67e713c

Browse files
ZsailerGitHub Enterprise
authored and
GitHub Enterprise
committed
Consistent handling/emitting of kernel session state (jupyter-server#366)
* update state * streamline events about sessions * compute a kernel path server-side and emit that in event data * rename some modules * code review cleanup * define kernel states as constants for easier future maintenance * fix broken unit test
1 parent 8cea5b8 commit 67e713c

22 files changed

+440
-406
lines changed

conftest.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@
1010
from data_studio_jupyter_extensions.configurables.notebook_service import (
1111
NotebookServiceClient,
1212
)
13-
from data_studio_jupyter_extensions.extensions.events.logger import EventBus
13+
from data_studio_jupyter_extensions.extensions.events.bus import EventBus
1414
from data_studio_jupyter_extensions.tests.mock.client import MockNotebookServiceClient
1515
from data_studio_jupyter_extensions.tests.mock.utils import load_openapi_spec
1616

data_studio_jupyter_extensions/configurables/kernel_manager.py

+3-3
Original file line numberDiff line numberDiff line change
@@ -42,9 +42,9 @@ async def post_start_kernel(self, **kwargs):
4242
async def start_heartbeat(self):
4343
"""Start a heartbeat for the kernel."""
4444
# Connect a heartbeat from the manager.
45-
self._emit_status(
46-
status="Connecting",
47-
description="",
45+
self._emit(
46+
state=constants.KERNEL_STATE.CONNECTING,
47+
msg="Waiting for a successful heartbeat from the kernel.",
4848
)
4949
client = self.client()
5050
self.heartbeat = client.hb_channel

data_studio_jupyter_extensions/configurables/kernel_restarter.py

+17-17
Original file line numberDiff line numberDiff line change
@@ -5,6 +5,7 @@
55
from traitlets import Float
66
from traitlets import Int
77

8+
from data_studio_jupyter_extensions import constants
89
from data_studio_jupyter_extensions.configurables.kernel_status import KernelStatusMixin
910

1011

@@ -39,13 +40,15 @@ def _dead_state(self):
3940
self.log.warning(
4041
f"Kernel (kernel_id={self.kernel_manager.kernel_id}) is no longer running in Notebook Service."
4142
)
42-
self._emit_status(status="Dead", description="Kernel is no longer running.")
43+
self._emit(
44+
state=constants.KERNEL_STATE.DEAD, msg="Kernel is no longer running."
45+
)
4346

4447
def _disconnected_state(self):
4548
self.log.warning(f"No heartbeat detected for: {self.kernel_manager.kernel_id}")
46-
self._emit_status(
47-
status="Disconnected",
48-
description="Kernel appears to be running, but a connection could not be established.",
49+
self._emit(
50+
state=constants.KERNEL_STATE.DISCONNECTED,
51+
msg="Kernel appears to be running, but a connection could not be established.",
4952
)
5053
self.stop()
5154

@@ -60,10 +63,10 @@ async def poll(self):
6063

6164
# If the kernel is communicating, we're good here.
6265
if km.is_communicating():
63-
if self._attempt_count > 0:
64-
self._emit_status(
65-
status="Connected",
66-
description="",
66+
if not self._connected_once or self._attempt_count > 0:
67+
self._emit(
68+
state=constants.KERNEL_STATE.CONNECTED,
69+
msg="Kernel heartbeat established.",
6770
)
6871
self._connected_once = True
6972
self._attempt_count = 0
@@ -73,9 +76,9 @@ async def poll(self):
7376

7477
# Check if the kernel ever successfully connected.
7578
if not self._connected_once:
76-
self._emit_status(
77-
status="Connecting",
78-
description="",
79+
self._emit(
80+
state=constants.KERNEL_STATE.CONNECTING,
81+
msg="",
7982
)
8083
# Kernel is disconnected due to timeout.
8184
if km.heartbeat_timeout < (now - self._start_time):
@@ -85,12 +88,9 @@ async def poll(self):
8588
else:
8689
self._attempt_count += 1
8790
if self._attempt_count == 1:
88-
self.log.warning(
89-
f"Missed a heartbeat for kernel: {self.kernel_manager.kernel_id}"
90-
)
91-
self._emit_status(
92-
status="Missed a kernel heartbeat. Trying to reconnect.",
93-
description="",
91+
self._emit(
92+
state=constants.KERNEL_STATE.DISCONNECTED,
93+
msg="Missed a kernel heartbeat. Trying to reconnect.",
9494
)
9595
elif self._attempt_count == self.restart_limit:
9696
self._last_attempt = now
Original file line numberDiff line numberDiff line change
@@ -1,16 +1,4 @@
1-
from data_studio_jupyter_extensions.extensions.events.logger import EventBus
2-
3-
4-
KERNEL_STATES = [
5-
"unknown",
6-
"starting",
7-
"started",
8-
"terminating",
9-
"dead",
10-
"connecting",
11-
"connected",
12-
"disconnected",
13-
]
1+
from data_studio_jupyter_extensions.extensions.events.bus import EventBus
142

153

164
class KernelStatusMixin:
@@ -22,35 +10,39 @@ class KernelStatusMixin:
2210
def event_bus(self):
2311
return EventBus.instance()
2412

25-
def _emit_status(self, status, description=""):
13+
def _emit(self, state, msg=""):
14+
"""Emit a kernel's message to both the banner and the console."""
15+
self._emit_banner(state, msg)
16+
self._emit_console(state, msg)
17+
18+
def _emit_banner(self, state, msg=""):
2619
"""Emit a kernel status event."""
27-
self.log.debug(f"{self.kernel_id} status {status} ({description})")
2820
self.event_bus.record_event(
29-
schema_name="event.datastudio.jupyter.com/kernel-status",
21+
schema_name="event.datastudio.jupyter.com/kernel-message",
3022
version=1,
3123
event={
3224
"notebook_id": self.notebook_id or "Not set",
3325
"process_id": self.process_id or "Not set yet",
3426
"kernel_id": self.kernel_id,
35-
"status": status,
36-
"details": description,
27+
"state": state,
28+
"msg": msg,
29+
"banner": True,
30+
"console": False,
3731
},
3832
)
3933

40-
def _emit(self, message):
41-
"""Am alias to emit a message"""
42-
self._emit_kernel_message(message)
43-
44-
def _emit_kernel_message(self, message):
45-
"""Emit a kernel message."""
46-
self.log.debug(f"{self.kernel_id} message: {message}")
34+
def _emit_console(self, state, msg):
35+
"""Emit a kernel message to the log console in JupyterLab."""
4736
self.event_bus.record_event(
4837
schema_name="event.datastudio.jupyter.com/kernel-message",
4938
version=1,
5039
event={
5140
"notebook_id": self.notebook_id or "Not set",
5241
"process_id": self.process_id or "Not set yet",
53-
"description": message,
5442
"kernel_id": self.kernel_id,
43+
"state": state,
44+
"msg": msg,
45+
"banner": False,
46+
"console": True,
5547
},
5648
)

data_studio_jupyter_extensions/configurables/multi_kernel_manager.py

+1-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
from traitlets import DottedObjectName
55

66
from data_studio_jupyter_extensions.configurables.provisioner import KernelFailedError
7-
from data_studio_jupyter_extensions.extensions.events.logger import EventBus
7+
from data_studio_jupyter_extensions.extensions.events.bus import EventBus
88

99

1010
def raise_error_if_pending(method):

data_studio_jupyter_extensions/configurables/provisioner.py

+22-21
Original file line numberDiff line numberDiff line change
@@ -307,8 +307,8 @@ async def _wait_for_status(
307307
s = "Pod Pending"
308308

309309
# Logging and eventbus
310-
self._emit_kernel_message(message)
311-
self._emit_status(s, d)
310+
self._emit_console(s, message)
311+
self._emit_banner(s, d)
312312

313313
# If the status is known, return
314314
if resp["status"].lower() in status:
@@ -362,9 +362,13 @@ async def kill(self, restart: bool = False) -> None:
362362
363363
restart is True if this operation will precede a subsequent launch_kernel request.
364364
"""
365-
self._emit_status(status="Terminating the current kernel.")
365+
self._emit(
366+
constants.KERNEL_STATE.TERMINATING, msg="Shutting down the current kernel."
367+
)
366368
await self.nbservice_client.stop_kernel(self.process_id)
367-
self._emit_status(status="Dead", description="Kernel is terminated.")
369+
self._emit(
370+
constants.KERNEL_STATE.DEAD, msg="Kernel has been successfully terminated."
371+
)
368372
self.process_id = None
369373

370374
async def terminate(self, restart: bool = False) -> None:
@@ -378,9 +382,11 @@ async def terminate(self, restart: bool = False) -> None:
378382
379383
restart is True if this operation precedes a start launch_kernel request.
380384
"""
381-
self._emit_status(status="Terminating the current kernel.")
385+
self._emit(
386+
constants.KERNEL_STATE.TERMINATING, msg="Terminating the current kernel."
387+
)
382388
await self.nbservice_client.stop_kernel(self.process_id)
383-
self._emit_status(status="Dead", description="Kernel is terminated.")
389+
self._emit(constants.KERNEL_STATE.DEAD, msg="Kernel is terminated.")
384390
self.process_id = None
385391

386392
async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
@@ -428,22 +434,14 @@ async def launch_kernel(
428434
This method is called from `KernelManager.launch_kernel()` during the
429435
kernel manager's start kernel sequence.
430436
"""
431-
# If the caller provided metadata for an already-running
432-
# kernel, get that data here and check if its valid
433-
# in the next step.
434-
if kwargs.get("ds_metadata"):
435-
metadata = kwargs["ds_metadata"]
436-
if metadata.get("id") == self.notebook_id:
437-
self.process_id = metadata.get("kernel")
438-
439437
if "process_id" in kwargs:
440438
self.process_id = kwargs["process_id"]
441439

442440
# If a process ID already exists for this provisioner,
443441
# check with the notebook-service to see if its still running.
444442
if self.process_id:
445-
self._emit_status("Kernel found", "checking its status")
446443
try:
444+
self._emit(constants.KERNEL_STATE.RECONNECTING)
447445
r = await self.nbservice_client.get_kernel_status(
448446
self.process_id, query_params_dict={"cause": "view"}
449447
)
@@ -456,25 +454,28 @@ async def launch_kernel(
456454
"process_id": self.process_id,
457455
"kernel_id": self.kernel_id,
458456
}
459-
self._emit_status("Reconnecting")
460457
# Fetch the kernel connection info if the kernel already exists.
461458
await self._fetch_connection_info()
462459
return self.connection_info
463-
# If notebook service returned an error, start a new kernel.
464-
self._emit("The listed kernel no longer exists. Starting a new kernel.")
460+
raise HTTPError
465461
except (HTTPError, HTTPClientError):
466462
# If notebook service returned an error, start a new kernel.
467-
self._emit("The listed kernel no longer exists. Starting a new kernel.")
463+
self._emit(
464+
constants.KERNEL_STATE.DEAD,
465+
"The listed kernel no longer exists. Starting a new kernel.",
466+
)
468467

469468
# If the block above didn't return, a new kernel is needed.
470469
# Request notebook service to start a kernel.
471-
self._emit_status("Starting", "This may take a minute...")
470+
self._emit(
471+
constants.KERNEL_STATE.STARTING,
472+
msg=f"Launching a {self.kernel_spec.display_name} kernel.",
473+
)
472474
r = await self.nbservice_client.start_kernel(self.kernel_spec.name)
473475

474476
# Store the new process ID from notebook service
475477
response = json_decode(r.body)
476478
self.process_id = str(response["id"])
477-
478479
await self.wait_for_ready(timeout=self.launch_timeout)
479480

480481
# Set the connection_info for the given mode.

data_studio_jupyter_extensions/configurables/session_manager.py

+26-34
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
from data_studio_jupyter_extensions.configurables.notebook_service import (
1919
NotebookServiceClient,
2020
)
21-
from data_studio_jupyter_extensions.extensions.events.logger import EventBus
21+
from data_studio_jupyter_extensions.extensions.events.bus import EventBus
2222

2323

2424
KERNEL_SESSION_DB_PATH = osp.join(jupyter_runtime_dir(), "jupyter-session.db")
@@ -29,6 +29,12 @@ class RemoteKernelRecord(KernelRecord):
2929
process_id: Union[None, str] = None
3030

3131

32+
@dataclass(eq=False)
33+
class RemoteKernelSessionRecord(KernelSessionRecord):
34+
path: Union[None, str] = None
35+
name: Union[None, str] = None
36+
37+
3238
class DataStudioSessionManager(SynchronizerSessionManager):
3339

3440
database_filepath = Unicode(KERNEL_SESSION_DB_PATH)
@@ -79,9 +85,10 @@ async def start_kernel_for_session(self, session_id, path, name, type, kernel_na
7985
# Assign a kernel id so we can use it in the client before the kernel has started.
8086
kernel_id = str(uuid.uuid4())
8187

82-
# Update the pending session,
8388
self._pending_sessions.update(
84-
KernelSessionRecord(session_id=session_id, kernel_id=kernel_id)
89+
RemoteKernelSessionRecord(
90+
session_id=session_id, kernel_id=kernel_id, path=path, name=name
91+
)
8592
)
8693

8794
# allow contents manager to specify kernels cwd
@@ -96,46 +103,31 @@ async def start_kernel_for_session(self, session_id, path, name, type, kernel_na
96103
return model
97104

98105
async def list_sessions(self):
99-
self.event_bus.record_event(
100-
schema_name="event.datastudio.jupyter.com/syncing-state",
101-
version=1,
102-
event={
103-
"syncing": True,
104-
"msg": "Syncing running sessions...",
105-
"last_sync": self._last_sync,
106-
},
107-
)
106+
self._emit_sync_message(syncing=True, msg="Syncing running sessions...")
108107
# Run the synchronizer loop
109108
try:
110109
await self.sync_managers()
111110
# Update the last sync time
112111
self._last_sync = datetime.datetime.now().strftime("%c")
113-
self.event_bus.record_event(
114-
schema_name="event.datastudio.jupyter.com/syncing-state",
115-
version=1,
116-
event={
117-
"syncing": False,
118-
"msg": "Successfully synced.",
119-
"last_sync": self._last_sync,
120-
},
121-
)
112+
self._emit_sync_message(syncing=False, msg="Successfully synced.")
122113
except Exception as e:
123-
last_sync = datetime.datetime.now().strftime("%c")
124-
self.event_bus.record_event(
125-
schema_name="event.datastudio.jupyter.com/syncing-state",
126-
version=1,
127-
event={
128-
"syncing": False,
129-
"msg": (
130-
"Failed to sync:"
131-
f"{last_sync}"
132-
"This tab might be out of date."
133-
),
134-
"last_sync": self._last_sync,
135-
},
114+
self._emit_sync_message(
115+
syncing=False, msg="Failed to sync. This tab might be out of date."
136116
)
137117
self.log.error(e)
138118
pass
139119

140120
out = await SessionManager.list_sessions(self)
141121
return out
122+
123+
def _emit_sync_message(self, syncing: bool, msg=""):
124+
"""Emit an event message to the synchronizer client."""
125+
self.event_bus.record_event(
126+
schema_name="event.datastudio.jupyter.com/syncing-state",
127+
version=1,
128+
event={
129+
"syncing": syncing,
130+
"msg": msg,
131+
"last_sync": self._last_sync,
132+
},
133+
)

0 commit comments

Comments
 (0)