Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor scopes methods #580

Merged
merged 7 commits into from
Feb 26, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,8 @@
# 2025-02-25 (6.5.2)

* Fixed bug where grantees were not found for Scope objects. This was handled by a fallback
in previous versions.

# 2025-02-17 (6.5.1)

* Bugfix for interoperability with DSO-API. The RuntimeErrors resulting from missing loaders
Expand Down
2 changes: 1 addition & 1 deletion setup.cfg
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[metadata]
name = amsterdam-schema-tools
version = 6.5.1
version = 6.5.2
url = https://github.com/amsterdam/schema-tools
license = Mozilla Public 2.0
author = Team Data Diensten, van het Dataplatform onder de Directie Digitale Voorzieningen (Gemeente Amsterdam)
Expand Down
2 changes: 1 addition & 1 deletion src/schematools/permissions/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def get_all_dataset_scopes(
def _fetch_grantees(scopes: frozenset[str] | frozenset[Scope]) -> list[str]:
if role == "AUTO":
return [scope_to_role(_scope) for _scope in scopes]
elif scope in scopes:
elif any(s.id == scope if isinstance(s, Scope) else s == scope for s in scopes):
return [role]
else:
return []
Expand Down
75 changes: 30 additions & 45 deletions src/schematools/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -437,19 +437,16 @@ def _find_scope_by_id(self, id):

@cached_property
def scopes(self) -> frozenset[Scope]:
try:
scopes = self._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[s if isinstance(s, Scope) else self._find_scope_by_id(s) for s in scopes]
)
return frozenset({self._find_scope_by_id(_PUBLIC_SCOPE)})
except ScopeNotFound:
return self.auth
scopes = self._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[s if isinstance(s, Scope) else self._find_scope_by_id(s) for s in scopes]
)
return frozenset({self._find_scope_by_id(_PUBLIC_SCOPE)})

@cached_property
def auth(self) -> frozenset[str]:
Expand Down Expand Up @@ -1181,22 +1178,16 @@ def schema(self) -> DatasetSchema:

@cached_property
def scopes(self) -> frozenset[Scope]:
try:
scopes = self.schema._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self.schema._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[
s if isinstance(s, Scope) else self.schema._find_scope_by_id(s)
for s in scopes
]
)
return frozenset({self.schema._find_scope_by_id(_PUBLIC_SCOPE)})
except ScopeNotFound:
return self.auth
scopes = self.schema._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self.schema._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[s if isinstance(s, Scope) else self.schema._find_scope_by_id(s) for s in scopes]
)
return frozenset({self.schema._find_scope_by_id(_PUBLIC_SCOPE)})

@cached_property
def auth(self) -> frozenset[str]:
Expand Down Expand Up @@ -1892,22 +1883,16 @@ def through_table(self) -> DatasetTableSchema | None:

@cached_property
def scopes(self) -> frozenset[Scope]:
try:
scopes = self.schema._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self.schema._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[
s if isinstance(s, Scope) else self.schema._find_scope_by_id(s)
for s in scopes
]
)
return frozenset({self.schema._find_scope_by_id(_PUBLIC_SCOPE)})
except ScopeNotFound:
return self.auth
scopes = self.schema._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self.schema._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[s if isinstance(s, Scope) else self.schema._find_scope_by_id(s) for s in scopes]
)
return frozenset({self.schema._find_scope_by_id(_PUBLIC_SCOPE)})

@cached_property
def auth(self) -> frozenset[str]:
Expand Down
9 changes: 8 additions & 1 deletion tests/django/fixtures.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,14 @@

from schematools.contrib.django.factories import remove_dynamic_models
from schematools.contrib.django.models import Dataset, Profile
from schematools.types import DatasetSchema, ProfileSchema
from schematools.types import DatasetSchema, ProfileSchema, Scope


# In test files we use a lot of non-existent scopes, so instead of writing scope
# json files we monkeypatch this method.
@pytest.fixture(autouse=True)
def patch_find_scope_by_id(monkeypatch):
monkeypatch.setattr(DatasetSchema, "_find_scope_by_id", Scope.from_string)


@pytest.fixture(autouse=True)
Expand Down
8 changes: 8 additions & 0 deletions tests/test_permissions.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,14 @@

from schematools.importer.ndjson import NDJSONImporter
from schematools.permissions.db import apply_schema_and_profile_permissions
from schematools.types import DatasetSchema, Scope


# In test files we use a lot of non-existent scopes, so instead of writing scope
# json files we monkeypatch this method.
@pytest.fixture(autouse=True)
def patch_find_scope_by_id(monkeypatch):
monkeypatch.setattr(DatasetSchema, "_find_scope_by_id", Scope.from_string)


class TestReadPermissions:
Expand Down
15 changes: 14 additions & 1 deletion tests/test_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

import pytest

from schematools.exceptions import SchemaObjectNotFound
from schematools.exceptions import SchemaObjectNotFound, ScopeNotFound
from schematools.types import (
DatasetSchema,
DatasetTableSchema,
Expand Down Expand Up @@ -389,6 +389,19 @@ def test_scope_db_python_names():
assert scope_a.python_name == "scope_a"


def test_find_scope_by_id_is_happy(schema_loader):
schema = schema_loader.get_dataset_from_file("metaschema2.json")

assert schema._find_scope_by_id(HARRY_ONE_SCOPE.id) == HARRY_ONE_SCOPE


@pytest.mark.xfail(raises=ScopeNotFound)
def test_find_scope_by_id_fails_gracefully(schema_loader):
schema = schema_loader.get_dataset_from_file("metaschema2.json")

schema._find_scope_by_id("SOME_RANDOM_SCOPE_ID_THAT_DOESNT_EXIST")


def test_loading_scopes_from_dataset(schema_loader):
schema = schema_loader.get_dataset_from_file("metaschema2.json")

Expand Down