|
1 | 1 | # SPDX-License-Identifier: MPL-2.0
|
2 |
| -# Copyright (C) 2018 - 2023 Gemeente Amsterdam |
| 2 | +# Copyright (C) 2018 - 2025 Gemeente Amsterdam |
| 3 | +from typing import override |
| 4 | + |
3 | 5 | from django.conf import settings
|
4 | 6 | from django.contrib.auth.models import User
|
5 |
| -from django.core.cache import cache |
6 |
| -from rest_framework import exceptions |
7 |
| -from rest_framework.authentication import BaseAuthentication |
| 7 | +from mozilla_django_oidc.contrib.drf import OIDCAuthentication |
| 8 | +from rest_framework.exceptions import AuthenticationFailed |
8 | 9 | from rest_framework.request import Request
|
9 | 10 |
|
10 |
| -from .tokens import JWTAccessToken |
11 |
| - |
12 |
| -USER_NOT_AUTHORIZED = "User {} is not authorized" |
13 |
| -USER_DOES_NOT_EXIST = -1 |
14 |
| - |
15 | 11 |
|
16 |
| -class JWTAuthBackend(BaseAuthentication): |
17 |
| - """ |
18 |
| - Retrieve user from backend and cache the result |
19 |
| - """ |
20 |
| - @staticmethod |
21 |
| - def get_user(user_id: int) -> User: |
22 |
| - # Now we know we have a Amsterdam municipal employee (may or may not be allowed acceess) |
23 |
| - # or external user with access to the `signals` application, we retrieve the Django user. |
24 |
| - user = cache.get(user_id) |
25 |
| - |
26 |
| - if user == USER_DOES_NOT_EXIST: |
27 |
| - raise exceptions.AuthenticationFailed(USER_NOT_AUTHORIZED.format(user_id)) |
28 |
| - |
29 |
| - # We hit the database max once per 5 minutes, and then cache the results. |
30 |
| - if user is None: # i.e. cache miss |
31 |
| - try: |
32 |
| - user = User.objects.get(username__iexact=user_id) # insensitive match fixes log-in bug |
33 |
| - except User.DoesNotExist: |
34 |
| - cache.set(user_id, USER_DOES_NOT_EXIST, 5 * 60) |
35 |
| - raise exceptions.AuthenticationFailed(USER_NOT_AUTHORIZED.format(user_id)) |
36 |
| - else: |
37 |
| - cache.set(user_id, user, 5 * 60) |
38 |
| - |
39 |
| - if not user.is_active: |
40 |
| - raise exceptions.AuthenticationFailed('User inactive') |
41 |
| - return user |
| 12 | +class JWTAuthBackend(OIDCAuthentication): |
| 13 | + www_authenticate_realm = "signals" |
42 | 14 |
|
| 15 | + @override |
43 | 16 | def authenticate(self, request: Request) -> tuple[User, str]:
|
44 |
| - """ |
45 |
| - Authenticate. Check if required scope is present and get user_email from JWT token. |
46 |
| - use ALWAYS_OK = True to skip token verification. Useful for local dev/testing |
47 |
| - """ |
48 |
| - auth_header = request.META.get('HTTP_AUTHORIZATION') |
49 |
| - _, user_id = JWTAccessToken.token_data(auth_header) |
50 |
| - if user_id == "ALWAYS_OK": |
51 |
| - user_id = settings.TEST_LOGIN |
52 |
| - |
53 |
| - auth_user = JWTAuthBackend.get_user(user_id) |
| 17 | + if settings.SIGNALS_AUTH.get("ALWAYS_OK", False): |
| 18 | + try: |
| 19 | + user = User.objects.get(username__iexact=settings.TEST_LOGIN) |
| 20 | + except User.DoesNotExist as e: |
| 21 | + raise AuthenticationFailed("User not found") from e |
54 | 22 |
|
55 |
| - return auth_user, '' |
| 23 | + return user, "" |
56 | 24 |
|
57 |
| - def authenticate_header(self, request: Request) -> str: |
58 |
| - """ |
59 |
| - Return a string to be used as the value of the `WWW-Authenticate` |
60 |
| - header in a `401 Unauthenticated` response, or `None` if the |
61 |
| - authentication scheme should return `403 Permission Denied` responses. |
62 |
| - """ |
63 |
| - return 'Bearer realm="signals"' |
| 25 | + return super().authenticate(request) |
0 commit comments