|
4 | 4 | captured by the client and rendered in the notebook status bar.
|
5 | 5 | """
|
6 | 6 | import asyncio
|
7 |
| -import os |
8 |
| -import socket |
9 | 7 | import time
|
10 | 8 | from typing import Any
|
11 | 9 | from typing import Dict
|
|
15 | 13 |
|
16 | 14 | from jupyter_client import provisioning
|
17 | 15 | from jupyter_client.connect import KernelConnectionInfo
|
18 |
| -from kubernetes_asyncio import client as kclient |
19 |
| -from kubernetes_asyncio import config as kconfig |
20 |
| -from kubernetes_asyncio.client.api_client import ApiClient |
21 | 16 | from tornado.escape import json_decode
|
22 | 17 | from tornado.httpclient import HTTPClientError
|
23 | 18 | from tornado.web import HTTPError
|
|
39 | 34 | from data_studio_jupyter_extensions.traits import IntFromEnv
|
40 | 35 | from data_studio_jupyter_extensions.traits import UnicodeFromEnv
|
41 | 36 | from data_studio_jupyter_extensions.utils import get_available_mode_names
|
42 |
| -from data_studio_jupyter_extensions.utils import run_async |
43 | 37 |
|
44 | 38 |
|
45 | 39 | class KernelFailedError(Exception):
|
@@ -127,118 +121,6 @@ def connection_info(self) -> KernelConnectionInfo:
|
127 | 121 | )
|
128 | 122 | return info
|
129 | 123 |
|
130 |
| - async def _run_kube_method( |
131 |
| - self, method_type, name, quiet=False |
132 |
| - ): # pragma: no cover |
133 |
| - if not quiet: |
134 |
| - self.log.debug(f"{method_type}ing service {name}") |
135 |
| - await kconfig.load_kube_config() |
136 |
| - |
137 |
| - # use the context manager to close http sessions automatically |
138 |
| - async with ApiClient() as api: |
139 |
| - |
140 |
| - v1 = kclient.CoreV1Api(api) |
141 |
| - method = getattr(v1, f"{method_type}_namespaced_service") |
142 |
| - return await method(namespace=self.namespace, name=name) |
143 |
| - |
144 |
| - async def open_route(self, process_id): # pragma: no cover |
145 |
| - """Connect to a remote kernel given by a process id |
146 |
| - Returns a map of port by port name |
147 |
| -
|
148 |
| - Prerequisites: |
149 |
| - kcli init |
150 |
| -
|
151 |
| - Representative url: |
152 |
| - https://ds-int.apple.com/projects/2j1cd6f8ekj1/notebooks/servers/p4xpnb1208u4/kernels/launch?notebookName=Untitled.ipynb&kernelspecId=bkvgsydsi1rd&kernelspecDisplayName=Python+3&kernelspecLang=python&kernelId=vhz7kghpaujd |
153 |
| -
|
154 |
| - The process id is the kernelId in the above url |
155 |
| -
|
156 |
| - Used them when launching the kernel instead of using the env variables |
157 |
| - only if LOCAL_MODE is set |
158 |
| - """ |
159 |
| - logger = self.log |
160 |
| - here = os.path.abspath(os.path.dirname(__file__)) |
161 |
| - |
162 |
| - # First check if the kernel is already set up and accessible |
163 |
| - try: |
164 |
| - name = f"ipython-{process_id}-kernel-service-iopub" |
165 |
| - ret = await self._run_kube_method("read", name) |
166 |
| - ip = ret.spec.external_i_ps[0] |
167 |
| - |
168 |
| - message = f"Connecting to running kernel for {process_id}" |
169 |
| - self._emit_kernel_message(message) |
170 |
| - connected = True |
171 |
| - except Exception as e: |
172 |
| - logger.error(str(e)) |
173 |
| - connected = False |
174 |
| - |
175 |
| - # Create the connection if needed |
176 |
| - if not connected: |
177 |
| - message = f"Creating a connection to kernel: {process_id}" |
178 |
| - self._emit_kernel_message(message) |
179 |
| - |
180 |
| - # Remove the root service and network policy created by notebook-service |
181 |
| - name = f"ipython-{process_id}-kernel-service" |
182 |
| - try: |
183 |
| - await self._run_kube_method("delete", name) |
184 |
| - except Exception as e: |
185 |
| - logger.error(e) |
186 |
| - |
187 |
| - cmd = f"kubectl delete AppleNetworkPolicy ipython-{process_id}" |
188 |
| - await run_async(cmd, logger) |
189 |
| - |
190 |
| - # Install the helm chart |
191 |
| - cmd = "helm uninstall remote-kernel" |
192 |
| - await run_async(cmd, logger) |
193 |
| - |
194 |
| - helm_file = os.path.abspath(os.path.join(here, "..", "helm")) |
195 |
| - cmd = f"helm install --set process={process_id} remote-kernel {helm_file}" |
196 |
| - ret = await run_async(cmd, logger) |
197 |
| - assert ret == 0, "Could not run helm install" |
198 |
| - |
199 |
| - # Get the port map |
200 |
| - ports = {} |
201 |
| - annotation = "pie.traffic.plb/tcp_service_port" |
202 |
| - |
203 |
| - for name in ["hb", "control", "iopub", "shell", "stdin"]: |
204 |
| - service = f"ipython-{process_id}-kernel-service-{name}" |
205 |
| - self._emit_kernel_message(f"> {cmd}") |
206 |
| - attempts = 0 |
207 |
| - while attempts < 100: |
208 |
| - ret = await self._run_kube_method("read", service, quiet=True) |
209 |
| - port = ret.metadata.annotations.get(annotation, "") |
210 |
| - if port: |
211 |
| - ports[name] = int(port) |
212 |
| - break |
213 |
| - await asyncio.sleep(0.1) |
214 |
| - if attempts == 10: |
215 |
| - raise ValueError(f"Could not find port for {name}") |
216 |
| - |
217 |
| - # Establish a connection to the shell and iopub ports |
218 |
| - name = f"ipython-{process_id}-kernel-service-iopub" |
219 |
| - ret = await self._run_kube_method("read", name) |
220 |
| - ip = ret.spec.external_i_ps[0] |
221 |
| - |
222 |
| - for name in ["shell", "iopub"]: |
223 |
| - port = ports[name] |
224 |
| - location = (ip, port) |
225 |
| - attempts = 0 |
226 |
| - message = f"Connecting to {name} on {ip}:{port}" |
227 |
| - self._emit_kernel_message(message) |
228 |
| - allowed_attempts = 6000 |
229 |
| - while attempts < allowed_attempts: |
230 |
| - sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) |
231 |
| - sock.settimeout(0.1) |
232 |
| - result = sock.connect_ex(location) |
233 |
| - if result == 0: |
234 |
| - break |
235 |
| - attempts += 1 |
236 |
| - await asyncio.sleep(0.1) |
237 |
| - if attempts == allowed_attempts: |
238 |
| - raise ValueError(f"Could not connect to port for {name}") |
239 |
| - |
240 |
| - return ip, ports |
241 |
| - |
242 | 124 | @property
|
243 | 125 | def has_process(self) -> bool:
|
244 | 126 | """
|
@@ -403,22 +285,8 @@ async def pre_launch(self, **kwargs: Any) -> Dict[str, Any]:
|
403 | 285 |
|
404 | 286 | async def _fetch_connection_info(self) -> None:
|
405 | 287 | """
|
406 |
| - Fetch connection info from notebook-service (or the control |
407 |
| - plane in local-cluster mode). |
| 288 | + Fetch connection info from notebook-service |
408 | 289 | """
|
409 |
| - if self.mode == "local-cluster": |
410 |
| - # Ports and host will depend on the ingress routes |
411 |
| - # set by the kubernetes cluster. |
412 |
| - host, port_map = await self.open_route(self.process_id) |
413 |
| - # set up the connection info |
414 |
| - self.iopub_port = port_map["iopub"] |
415 |
| - self.hb_port = port_map["hb"] |
416 |
| - self.control_port = port_map["control"] |
417 |
| - self.shell_port = port_map["shell"] |
418 |
| - self.stdin_port = port_map["stdin"] |
419 |
| - self.ip = self.ip or host |
420 |
| - return |
421 |
| - |
422 | 290 | r = await self.nbservice_client.get_kernel_details(self.process_id)
|
423 | 291 | kernel_info = json_decode(r.body)
|
424 | 292 | self.ip = self.ip or kernel_info["host"]
|
|
0 commit comments