1
- from pytz import utc
2
1
import asyncio
3
2
from typing import Dict , List
4
3
from urllib .parse import urlencode
5
- from datetime import datetime , timedelta
6
- import jwt
7
4
from pydantic import BaseModel
8
5
9
6
from fastapi import APIRouter , Request , HTTPException , Response , status , Query
10
7
from fastapi .responses import RedirectResponse
11
8
12
9
13
- from cat .auth .utils import AuthPermission , AuthResource , get_full_permissions , check_password
14
- from cat .db .crud import get_users
15
- from cat .env import get_env
10
+ from cat .auth .utils import AuthPermission , AuthResource , get_full_permissions
16
11
from cat .routes .static .templates import get_jinja_templates
17
12
18
-
19
13
router = APIRouter ()
20
14
21
15
class UserCredentials (BaseModel ):
@@ -34,7 +28,8 @@ async def core_login_token(request: Request, response: Response):
34
28
form_data = await request .form ()
35
29
36
30
# use username and password to authenticate user from local identity provider and get token
37
- access_token = await authenticate_local_user (
31
+ auth_handler = request .app .state .ccat .core_auth_handler
32
+ access_token = await auth_handler .issue_jwt (
38
33
form_data ["username" ], form_data ["password" ]
39
34
)
40
35
@@ -86,13 +81,14 @@ async def get_available_permissions():
86
81
return get_full_permissions ()
87
82
88
83
@router .post ("/token" , response_model = JWTResponse )
89
- async def auth_token (credentials : UserCredentials ):
84
+ async def auth_token (request : Request , credentials : UserCredentials ):
90
85
"""Endpoint called from client to get a JWT from local identity provider.
91
86
This endpoint receives username and password as form-data, validates credentials and issues a JWT.
92
87
"""
93
88
94
89
# use username and password to authenticate user from local identity provider and get token
95
- access_token = await authenticate_local_user (
90
+ auth_handler = request .app .state .ccat .core_auth_handler
91
+ access_token = await auth_handler .issue_jwt (
96
92
credentials .username , credentials .password
97
93
)
98
94
@@ -103,33 +99,3 @@ async def auth_token(credentials: UserCredentials):
103
99
# wait a little to avoid brute force attacks
104
100
await asyncio .sleep (1 )
105
101
raise HTTPException (status_code = 403 , detail = {"error" : "Invalid Credentials" })
106
-
107
-
108
- async def authenticate_local_user (username : str , password : str ) -> str | None :
109
- # authenticate local user credentials and return a JWT token
110
-
111
- # brutal search over users, which are stored in a simple dictionary.
112
- # waiting to have graph in core to store them properly
113
- # TODOAUTH: get rid of this shameful loop
114
- users = get_users ()
115
- for user_id , user in users .items ():
116
- if user ["username" ] == username and check_password (password , user ["password" ]):
117
- # TODOAUTH: expiration with timezone needs to be tested
118
- # using seconds for easier testing
119
- expire_delta_in_seconds = float (get_env ("CCAT_JWT_EXPIRE_MINUTES" )) * 60
120
- expires = datetime .now (utc ) + timedelta (seconds = expire_delta_in_seconds )
121
- # TODOAUTH: add issuer and redirect_uri (and verify them when a token is validated)
122
- # TODOAUTH: move this method in auth.utils
123
-
124
- jwt_content = {
125
- "sub" : user_id , # Subject (the user ID)
126
- "username" : username , # Username
127
- "permissions" : user ["permissions" ], # User permissions
128
- "exp" : expires # Expiry date as a Unix timestamp
129
- }
130
- return jwt .encode (
131
- jwt_content ,
132
- get_env ("CCAT_JWT_SECRET" ),
133
- algorithm = get_env ("CCAT_JWT_ALGORITHM" ),
134
- )
135
- return None
0 commit comments