|
1 | 1 | # Copyright (c) Jupyter Development Team.
|
2 | 2 | # Distributed under the terms of the Modified BSD License.
|
| 3 | +import asyncio |
3 | 4 | import json
|
| 5 | +import logging |
4 | 6 | import os
|
| 7 | +import typing as ty |
5 | 8 | from socket import gaierror
|
6 | 9 |
|
7 | 10 | from tornado import web
|
8 |
| -from tornado.httpclient import AsyncHTTPClient, HTTPError |
| 11 | +from tornado.httpclient import AsyncHTTPClient, HTTPClientError, HTTPResponse |
9 | 12 | from traitlets import Bool, Float, Int, TraitError, Unicode, default, validate
|
10 | 13 | from traitlets.config import SingletonConfigurable
|
11 | 14 |
|
@@ -417,40 +420,101 @@ def load_connection_args(self, **kwargs):
|
417 | 420 | return kwargs
|
418 | 421 |
|
419 | 422 |
|
420 |
| -async def gateway_request(endpoint, **kwargs): |
| 423 | +class RetryableHTTPClient: |
| 424 | + """ |
| 425 | + Inspired by urllib.util.Retry (https://urllib3.readthedocs.io/en/stable/reference/urllib3.util.html), |
| 426 | + this class is initialized with desired retry characteristics, uses a recursive method `fetch()` against an instance |
| 427 | + of `AsyncHTTPClient` which tracks the current retry count across applicable request retries. |
| 428 | + """ |
| 429 | + |
| 430 | + MAX_RETRIES_DEFAULT = 2 |
| 431 | + MAX_RETRIES_CAP = 10 # The upper limit to max_retries value. |
| 432 | + max_retries: int = int(os.getenv("JUPYTER_GATEWAY_MAX_REQUEST_RETRIES", MAX_RETRIES_DEFAULT)) |
| 433 | + max_retries = max(0, min(max_retries, MAX_RETRIES_CAP)) # Enforce boundaries |
| 434 | + retried_methods: ty.Set[str] = {"GET", "DELETE"} |
| 435 | + retried_errors: ty.Set[int] = {502, 503, 504, 599} |
| 436 | + retried_exceptions: ty.Set[type] = {ConnectionError} |
| 437 | + backoff_factor: float = 0.1 |
| 438 | + |
| 439 | + def __init__(self): |
| 440 | + self.retry_count: int = 0 |
| 441 | + self.client: AsyncHTTPClient = AsyncHTTPClient() |
| 442 | + |
| 443 | + async def fetch(self, endpoint: str, **kwargs: ty.Any) -> HTTPResponse: |
| 444 | + """ |
| 445 | + Retryable AsyncHTTPClient.fetch() method. When the request fails, this method will |
| 446 | + recurse up to max_retries times if the condition deserves a retry. |
| 447 | + """ |
| 448 | + self.retry_count = 0 |
| 449 | + return await self._fetch(endpoint, **kwargs) |
| 450 | + |
| 451 | + async def _fetch(self, endpoint: str, **kwargs: ty.Any) -> HTTPResponse: |
| 452 | + """ |
| 453 | + Performs the fetch against the contained AsyncHTTPClient instance and determines |
| 454 | + if retry is necessary on any exceptions. If so, retry is performed recursively. |
| 455 | + """ |
| 456 | + try: |
| 457 | + response: HTTPResponse = await self.client.fetch(endpoint, **kwargs) |
| 458 | + except Exception as e: |
| 459 | + is_retryable: bool = await self._is_retryable(kwargs["method"], e) |
| 460 | + if not is_retryable: |
| 461 | + raise e |
| 462 | + logging.getLogger("ServerApp").info( |
| 463 | + f"Attempting retry ({self.retry_count}) against " |
| 464 | + f"endpoint '{endpoint}'. Retried error: '{repr(e)}'" |
| 465 | + ) |
| 466 | + response = await self._fetch(endpoint, **kwargs) |
| 467 | + return response |
| 468 | + |
| 469 | + async def _is_retryable(self, method: str, exception: Exception) -> bool: |
| 470 | + """Determines if the given exception is retryable based on object's configuration.""" |
| 471 | + |
| 472 | + if method not in self.retried_methods: |
| 473 | + return False |
| 474 | + if self.retry_count == self.max_retries: |
| 475 | + return False |
| 476 | + |
| 477 | + # Determine if error is retryable... |
| 478 | + if isinstance(exception, HTTPClientError): |
| 479 | + hce: HTTPClientError = exception |
| 480 | + if hce.code not in self.retried_errors: |
| 481 | + return False |
| 482 | + elif not any(isinstance(exception, error) for error in self.retried_exceptions): |
| 483 | + return False |
| 484 | + |
| 485 | + # Is retryable, wait for backoff, then increment count |
| 486 | + await asyncio.sleep(self.backoff_factor * (2**self.retry_count)) |
| 487 | + self.retry_count += 1 |
| 488 | + return True |
| 489 | + |
| 490 | + |
| 491 | +async def gateway_request(endpoint: str, **kwargs: ty.Any) -> HTTPResponse: |
421 | 492 | """Make an async request to kernel gateway endpoint, returns a response"""
|
422 |
| - client = AsyncHTTPClient() |
423 | 493 | kwargs = GatewayClient.instance().load_connection_args(**kwargs)
|
| 494 | + rhc = RetryableHTTPClient() |
424 | 495 | try:
|
425 |
| - response = await client.fetch(endpoint, **kwargs) |
| 496 | + response = await rhc.fetch(endpoint, **kwargs) |
426 | 497 | # Trap a set of common exceptions so that we can inform the user that their Gateway url is incorrect
|
427 | 498 | # or the server is not running.
|
428 |
| - # NOTE: We do this here since this handler is called during the Notebook's startup and subsequent refreshes |
| 499 | + # NOTE: We do this here since this handler is called during the server's startup and subsequent refreshes |
429 | 500 | # of the tree view.
|
430 |
| - except ConnectionRefusedError as e: |
| 501 | + except HTTPClientError as e: |
431 | 502 | raise web.HTTPError(
|
432 |
| - 503, |
433 |
| - "Connection refused from Gateway server url '{}'. " |
434 |
| - "Check to be sure the Gateway instance is running.".format( |
435 |
| - GatewayClient.instance().url |
436 |
| - ), |
| 503 | + e.code, |
| 504 | + f"Error attempting to connect to Gateway server url '{GatewayClient.instance().url}'. " |
| 505 | + "Ensure gateway url is valid and the Gateway instance is running.", |
437 | 506 | ) from e
|
438 |
| - except HTTPError as e: |
439 |
| - # This can occur if the host is valid (e.g., foo.com) but there's nothing there. |
| 507 | + except ConnectionError as e: |
440 | 508 | raise web.HTTPError(
|
441 |
| - e.code, |
442 |
| - "Error attempting to connect to Gateway server url '{}'. " |
443 |
| - "Ensure gateway url is valid and the Gateway instance is running.".format( |
444 |
| - GatewayClient.instance().url |
445 |
| - ), |
| 509 | + 503, |
| 510 | + f"ConnectionError was received from Gateway server url '{GatewayClient.instance().url}'. " |
| 511 | + "Check to be sure the Gateway instance is running.", |
446 | 512 | ) from e
|
447 | 513 | except gaierror as e:
|
448 | 514 | raise web.HTTPError(
|
449 | 515 | 404,
|
450 |
| - "The Gateway server specified in the gateway_url '{}' doesn't appear to be valid. " |
451 |
| - "Ensure gateway url is valid and the Gateway instance is running.".format( |
452 |
| - GatewayClient.instance().url |
453 |
| - ), |
| 516 | + f"The Gateway server specified in the gateway_url '{GatewayClient.instance().url}' doesn't " |
| 517 | + f"appear to be valid. Ensure gateway url is valid and the Gateway instance is running.", |
454 | 518 | ) from e
|
455 | 519 |
|
456 | 520 | return response
|
0 commit comments