Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Gateway] Track only this server's kernels #407

Merged
merged 1 commit into from
Feb 18, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
61 changes: 32 additions & 29 deletions jupyter_server/gateway/managers.py
Original file line number Diff line number Diff line change
Expand Up @@ -374,14 +374,14 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
The API path (unicode, '/' delimited) for the cwd.
Will be transformed to an OS path relative to root_dir.
"""
self.log.info('Request start kernel: kernel_id=%s, path="%s"', kernel_id, path)
self.log.info(f"Request start kernel: kernel_id={kernel_id}, path='{path}'")

if kernel_id is None:
if path is not None:
kwargs['cwd'] = self.cwd_for_path(path)
kernel_name = kwargs.get('kernel_name', 'python3')
kernel_url = self._get_kernel_endpoint_url()
self.log.debug("Request new kernel at: %s" % kernel_url)
self.log.debug(f"Request new kernel at: {kernel_url}")

# Let KERNEL_USERNAME take precedent over http_user config option.
if os.environ.get('KERNEL_USERNAME') is None and GatewayClient.instance().http_user:
Expand All @@ -399,12 +399,12 @@ async def start_kernel(self, kernel_id=None, path=None, **kwargs):
response = await gateway_request(kernel_url, method='POST', body=json_body)
kernel = json_decode(response.body)
kernel_id = kernel['id']
self.log.info("Kernel started: %s" % kernel_id)
self.log.debug("Kernel args: %r" % kwargs)
self.log.info(f"Kernel started: {kernel_id}")
self.log.debug(f"Kernel args: {kwargs}")
else:
kernel = await self.get_kernel(kernel_id)
kernel_id = kernel['id']
self.log.info("Using existing kernel: %s" % kernel_id)
self.log.info(f"Using existing kernel: {kernel_id}")

self._kernels[kernel_id] = kernel
return kernel_id
Expand All @@ -418,20 +418,25 @@ async def get_kernel(self, kernel_id=None, **kwargs):
The uuid of the kernel.
"""
kernel_url = self._get_kernel_endpoint_url(kernel_id)
self.log.debug("Request kernel at: %s" % kernel_url)
self.log.debug(f"Request kernel at: {kernel_url}")
try:
response = await gateway_request(kernel_url, method='GET')
except web.HTTPError as error:
if error.status_code == 404:
self.log.warn("Kernel not found at: %s" % kernel_url)
self.log.warn(f"Kernel not found at: {kernel_url}")
self.remove_kernel(kernel_id)
kernel = None
else:
raise
else:
kernel = json_decode(response.body)
self._kernels[kernel_id] = kernel
self.log.debug("Kernel retrieved: %s" % kernel)
# Only update our models if we already know about this kernel
if kernel_id in self._kernels:
self._kernels[kernel_id] = kernel
self.log.debug(f"Kernel retrieved: {kernel}")
else:
self.log.warning(f"Kernel '{kernel_id}' is not managed by this instance.")
kernel = None
return kernel

async def kernel_model(self, kernel_id):
Expand All @@ -443,18 +448,18 @@ async def kernel_model(self, kernel_id):
kernel_id : uuid
The uuid of the kernel.
"""
self.log.debug("RemoteKernelManager.kernel_model: %s", kernel_id)
model = await self.get_kernel(kernel_id)
return model

async def list_kernels(self, **kwargs):
"""Get a list of kernels."""
kernel_url = self._get_kernel_endpoint_url()
self.log.debug("Request list kernels: %s", kernel_url)
self.log.debug(f"Request list kernels: {kernel_url}")
response = await gateway_request(kernel_url, method='GET')
kernels = json_decode(response.body)
self._kernels = {x['id']: x for x in kernels}
return kernels
# Only update our models if we already know about the kernels
self._kernels = {x['id']: x for x in kernels if x['id'] in self._kernels}
return list(self._kernels.values())

async def shutdown_kernel(self, kernel_id, now=False, restart=False):
"""Shutdown a kernel by its kernel uuid.
Expand All @@ -469,9 +474,9 @@ async def shutdown_kernel(self, kernel_id, now=False, restart=False):
The purpose of this shutdown is to restart the kernel (True)
"""
kernel_url = self._get_kernel_endpoint_url(kernel_id)
self.log.debug("Request shutdown kernel at: %s", kernel_url)
self.log.debug(f"Request shutdown kernel at: {kernel_url}")
response = await gateway_request(kernel_url, method='DELETE')
self.log.debug("Shutdown kernel response: %d %s", response.code, response.reason)
self.log.debug(f"Shutdown kernel response: {response.code} {response.reason}")
self.remove_kernel(kernel_id)

async def restart_kernel(self, kernel_id, now=False, **kwargs):
Expand All @@ -483,9 +488,9 @@ async def restart_kernel(self, kernel_id, now=False, **kwargs):
The id of the kernel to restart.
"""
kernel_url = self._get_kernel_endpoint_url(kernel_id) + '/restart'
self.log.debug("Request restart kernel at: %s", kernel_url)
self.log.debug(f"Request restart kernel at: {kernel_url}")
response = await gateway_request(kernel_url, method='POST', body=json_encode({}))
self.log.debug("Restart kernel response: %d %s", response.code, response.reason)
self.log.debug(f"Restart kernel response: {response.code} {response.reason}")

async def interrupt_kernel(self, kernel_id, **kwargs):
"""Interrupt a kernel by its kernel uuid.
Expand All @@ -496,9 +501,9 @@ async def interrupt_kernel(self, kernel_id, **kwargs):
The id of the kernel to interrupt.
"""
kernel_url = self._get_kernel_endpoint_url(kernel_id) + '/interrupt'
self.log.debug("Request interrupt kernel at: %s", kernel_url)
self.log.debug(f"Request interrupt kernel at: {kernel_url}")
response = await gateway_request(kernel_url, method='POST', body=json_encode({}))
self.log.debug("Interrupt kernel response: %d %s", response.code, response.reason)
self.log.debug(f"Interrupt kernel response: {response.code} {response.reason}")

def shutdown_all(self, now=False):
"""Shutdown all kernels."""
Expand All @@ -507,15 +512,15 @@ def shutdown_all(self, now=False):
kwargs = {'method': 'DELETE'}
kwargs = GatewayClient.instance().load_connection_args(**kwargs)
client = HTTPClient()
for kernel_id in self._kernels.keys():
for kernel_id in self._kernels:
kernel_url = self._get_kernel_endpoint_url(kernel_id)
self.log.debug("Request delete kernel at: %s", kernel_url)
self.log.debug(f"Request delete kernel at: {kernel_url}")
try:
response = client.fetch(kernel_url, **kwargs)
except HTTPError:
pass
else:
self.log.debug("Delete kernel response: %d %s", response.code, response.reason)
self.log.debug(f"Delete kernel response: {response.code} {response.reason}")
shutdown_kernels.append(kernel_id) # avoid changing dict size during iteration
client.close()
for kernel_id in shutdown_kernels:
Expand Down Expand Up @@ -562,10 +567,8 @@ async def get_all_specs(self):
km = self.parent.kernel_manager
remote_default_kernel_name = fetched_kspecs.get('default')
if remote_default_kernel_name != km.default_kernel_name:
self.log.info("Default kernel name on Gateway server ({gateway_default}) differs from "
"Notebook server ({notebook_default}). Updating to Gateway server's value.".
format(gateway_default=remote_default_kernel_name,
notebook_default=km.default_kernel_name))
self.log.info(f"Default kernel name on Gateway server ({remote_default_kernel_name}) differs from "
f"Notebook server ({km.default_kernel_name}). Updating to Gateway server's value.")
km.default_kernel_name = remote_default_kernel_name

remote_kspecs = fetched_kspecs.get('kernelspecs')
Expand All @@ -574,7 +577,7 @@ async def get_all_specs(self):
async def list_kernel_specs(self):
"""Get a list of kernel specs."""
kernel_spec_url = self._get_kernelspecs_endpoint_url()
self.log.debug("Request list kernel specs at: %s", kernel_spec_url)
self.log.debug(f"Request list kernel specs at: {kernel_spec_url}")
response = await gateway_request(kernel_spec_url, method='GET')
kernel_specs = json_decode(response.body)
return kernel_specs
Expand All @@ -588,7 +591,7 @@ async def get_kernel_spec(self, kernel_name, **kwargs):
The name of the kernel.
"""
kernel_spec_url = self._get_kernelspecs_endpoint_url(kernel_name=str(kernel_name))
self.log.debug("Request kernel spec at: %s" % kernel_spec_url)
self.log.debug(f"Request kernel spec at: {kernel_spec_url}")
try:
response = await gateway_request(kernel_spec_url, method='GET')
except web.HTTPError as error:
Expand Down Expand Up @@ -617,7 +620,7 @@ async def get_kernel_spec_resource(self, kernel_name, path):
The name of the desired resource
"""
kernel_spec_resource_url = url_path_join(self.base_resource_endpoint, str(kernel_name), str(path))
self.log.debug("Request kernel spec resource '{}' at: {}".format(path, kernel_spec_resource_url))
self.log.debug(f"Request kernel spec resource '{path}' at: {kernel_spec_resource_url}")
try:
response = await gateway_request(kernel_spec_resource_url, method='GET')
except web.HTTPError as error:
Expand Down
4 changes: 2 additions & 2 deletions jupyter_server/services/kernels/handlers.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,9 +55,9 @@ async def post(self):
class KernelHandler(APIHandler):

@web.authenticated
def get(self, kernel_id):
async def get(self, kernel_id):
km = self.kernel_manager
model = km.kernel_model(kernel_id)
model = await ensure_async(km.kernel_model(kernel_id))
self.finish(json.dumps(model, default=date_default))

@web.authenticated
Expand Down