|
24 | 24 | from datetime import datetime, timezone
|
25 | 25 | from pathlib import Path
|
26 | 26 | from subprocess import check_call, check_output
|
| 27 | +from urllib.parse import parse_qs, urlparse |
27 | 28 |
|
28 | 29 | import pytest
|
29 | 30 | import requests
|
@@ -122,9 +123,62 @@ def _delete_airflow_pod(name=""):
|
122 | 123 | if names:
|
123 | 124 | check_call(["kubectl", "delete", "pod", names[0]])
|
124 | 125 |
|
| 126 | + @staticmethod |
| 127 | + def _get_jwt_token(session: requests.Session, username: str, password: str) -> str: |
| 128 | + """Get the JWT token for the given username and password. |
| 129 | +
|
| 130 | + Note: API server is still using FAB Auth Manager. |
| 131 | +
|
| 132 | + Steps: |
| 133 | + 1. Get the login page to get the csrf token |
| 134 | + - The csrf token is in the hidden input field with id "csrf_token" |
| 135 | + 2. Login with the username and password |
| 136 | + - Must use the same session to keep the csrf token session |
| 137 | + 3. Extract the JWT token from the redirect url |
| 138 | + - Expected to have a connection error |
| 139 | + - The redirect url should have the JWT token as a query parameter |
| 140 | +
|
| 141 | + :param session: The session to use for the request |
| 142 | + :param username: The username to use for the login |
| 143 | + :param password: The password to use for the login |
| 144 | + :return: The JWT token |
| 145 | + """ |
| 146 | + # get csrf token from login page |
| 147 | + get_login_form_response = session.get(f"http://{KUBERNETES_HOST_PORT}/auth/login") |
| 148 | + # input id="csrf_token" |
| 149 | + csrf_token = re.search( |
| 150 | + r'<input id="csrf_token" name="csrf_token" type="hidden" value="(.+?)">', |
| 151 | + get_login_form_response.text, |
| 152 | + ) |
| 153 | + assert csrf_token, "Failed to get csrf token from login page" |
| 154 | + csrf_token_str = csrf_token.group(1) |
| 155 | + assert csrf_token_str, "Failed to get csrf token from login page" |
| 156 | + try: |
| 157 | + # login with form data |
| 158 | + session.post( |
| 159 | + f"http://{KUBERNETES_HOST_PORT}/auth/login", |
| 160 | + data={"username": username, "password": password, "csrf_token": csrf_token_str}, |
| 161 | + ) |
| 162 | + except requests.exceptions.ConnectionError as e: |
| 163 | + # expected to have a connection error |
| 164 | + # currently, the login page redirects to http://localhost:8080/?token=... with status code 308 |
| 165 | + # but the KUBERNETES_HOST_PORT is *not* localhost:8080 |
| 166 | + # TODO: remove this try/except block when the redirect url is fixed |
| 167 | + redirect_url = e.request.url if e.request else None |
| 168 | + # ensure redirect_url is a string |
| 169 | + redirect_url_str = str(redirect_url) if redirect_url is not None else "" |
| 170 | + assert "/?token" in redirect_url_str, f"Login failed with redirect url {redirect_url_str}" |
| 171 | + parsed_url = urlparse(redirect_url_str) |
| 172 | + query_params = parse_qs(str(parsed_url.query)) |
| 173 | + jwt_token_list = query_params.get("token") |
| 174 | + jwt_token = jwt_token_list[0] if jwt_token_list else None |
| 175 | + assert jwt_token, f"Failed to get JWT token from redirect url {redirect_url_str}" |
| 176 | + return jwt_token |
| 177 | + |
125 | 178 | def _get_session_with_retries(self):
|
126 | 179 | session = requests.Session()
|
127 |
| - session.auth = ("admin", "admin") |
| 180 | + jwt_token = self._get_jwt_token(session, "admin", "admin") |
| 181 | + session.headers.update({"Authorization": f"Bearer {jwt_token}"}) |
128 | 182 | retries = Retry(
|
129 | 183 | total=3,
|
130 | 184 | backoff_factor=10,
|
|
0 commit comments