Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Bugfix scopes property for dso #579

Merged
merged 12 commits into from
Feb 19, 2025
Next Next commit
Ensure we are not leaking errors on the scopes property
- wrap loader in own @Property with centralized error handling
- catch errors inside scopes @Property
  • Loading branch information
pstokkink committed Feb 17, 2025

Verified

This commit was signed with the committer’s verified signature.
commit 75e5104d604d1fa55374ec50775cce63b2ea010a
126 changes: 65 additions & 61 deletions src/schematools/types.py
Original file line number Diff line number Diff line change
@@ -309,13 +309,19 @@ def __init__(

super().__init__(data)

self.loader = dataset_collection
self._loader = dataset_collection
if dataset_collection is not None:
dataset_collection.add_dataset(self) # done early for self-references

def __repr__(self) -> str:
return f"<{self.__class__.__name__}: {self['id']}>"

@property
def loader(self) -> CachedSchemaLoader:
if self._loader is None:
raise RuntimeError(f"{self!r} has no loader defined, can't resolve requested object.")
return self._loader

@classmethod
def from_dict(
cls, obj: dict[str, Any], dataset_collection: CachedSchemaLoader | None = None
@@ -339,7 +345,9 @@ def json(
if inline_publishers and self.publisher is not None:
data["publisher"] = self.publisher.json_data()
if inline_scopes:
data["auth"] = [s.json_data() if isinstance(s, Scope) else s for s in self.scopes]
scopes = self.scopes
if scopes:
data["auth"] = [s.json_data() for s in self.scopes]
return json.dumps(data)

def json_data(
@@ -410,8 +418,6 @@ def _resolve_scope(self, auth):
"""
if isinstance(auth, dict):
if "$ref" in auth:
if self.loader is None:
raise RuntimeError(f"{self!r} has no loader defined, can't resolve auth.")
return self.loader.get_scope(auth["$ref"])
return Scope(auth)
elif isinstance(auth, list):
@@ -422,23 +428,26 @@ def _resolve_scope(self, auth):
def _find_scope_by_id(self, id):
all_scopes = self.loader.get_all_scopes()
name = id.replace("/", "_").lower()
# TODO: return the string value if the scope is not in the selection for now,
# this should not be necessary with live data.
# Tests rely on scope strings that do not exist in reality.
return all_scopes.get(name, id)

@cached_property
def scopes(self) -> frozenset[Scope] | frozenset[str]:
scopes = self._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[s if isinstance(s, Scope) else self._find_scope_by_id(s) for s in scopes]
)
return frozenset({self._find_scope_by_id(_PUBLIC_SCOPE)})
scope = all_scopes.get(name)
if not scope:
raise RuntimeError(f"Scope {id} doesn't exist")
return scope

@cached_property
def scopes(self) -> frozenset[Scope] | None:
try:
scopes = self._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[s if isinstance(s, Scope) else self._find_scope_by_id(s) for s in scopes]
)
return frozenset({self._find_scope_by_id(_PUBLIC_SCOPE)})
except RuntimeError:
return None

@cached_property
def auth(self) -> frozenset[str]:
@@ -460,16 +469,6 @@ def _get_dataset_schema(self, dataset_id: str) -> DatasetSchema:
if dataset_id == self.id:
return self # shortcut to avoid unneeded lookups

if self.loader is None:
# Ideally the dataset collection should be mandatory at construction,
# but this breaks compatibility. Adding a default collection is also tricky,
# as that introduces an implicit cache that would affect unit testing code.
# The best option is simply to have an error when the loader would be called.
raise RuntimeError(
f"{self!r} has no dataset collection defined,"
f" can't resolve relation to '{dataset_id}'."
) from None

# It's assumed here that the loader is a CachedSchemaLoader,
# so data can be fetched multiple times.
return self.loader.get_dataset(dataset_id)
@@ -482,8 +481,6 @@ def tables(self) -> list[DatasetTableSchema]:
if "$ref" in table_json:
# Dataset uses the new format to define table versions.
# Load the default version
if self.loader is None:
raise RuntimeError(f"{self!r} has no loader defined, can't resolve tables.")
table = self.loader.get_table(self, table_json["$ref"])
else:
# Old format, a single "dataset.json" with all tables embedded.
@@ -513,26 +510,21 @@ def publisher(self) -> Publisher | None:
raw_publisher = self["publisher"]
if isinstance(raw_publisher, str):
# Compatibility with meta-schemas prior to 2.0
if self.loader is None:
return None
publishers = self.loader.get_all_publishers()
try:
publishers = self.loader.get_all_publishers()
return [
publisher
for publisher in publishers.values()
if publisher["name"] == raw_publisher
][0]
except IndexError:
except (IndexError, RuntimeError):
return None

# From metaschema 2.0 it is an object { "$ref": "/publishers/ID" }.
# First check if we already replaced the publisher in the schema.
if "$ref" not in raw_publisher:
return Publisher.from_dict(raw_publisher)

if self.loader is None:
raise RuntimeError(f"{self!r} has no loader defined, can't resolve publisher.")

publisher_id = raw_publisher["$ref"].split("/")[-1]
return self.loader.get_publisher(publisher_id)

@@ -1159,17 +1151,23 @@ def schema(self) -> DatasetSchema:
return self._parent_schema

@cached_property
def scopes(self) -> frozenset[Scope]:
scopes = self.schema._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self.schema._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[s if isinstance(s, Scope) else self.schema._find_scope_by_id(s) for s in scopes]
)
return frozenset({self.schema._find_scope_by_id(_PUBLIC_SCOPE)})
def scopes(self) -> frozenset[Scope] | None:
try:
scopes = self.schema._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self.schema._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[
s if isinstance(s, Scope) else self.schema._find_scope_by_id(s)
for s in scopes
]
)
return frozenset({self.schema._find_scope_by_id(_PUBLIC_SCOPE)})
except RuntimeError:
return None

@cached_property
def auth(self) -> frozenset[str]:
@@ -1865,16 +1863,22 @@ def through_table(self) -> DatasetTableSchema | None:

@cached_property
def scopes(self) -> frozenset[Scope]:
scopes = self.schema._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self.schema._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[s if isinstance(s, Scope) else self.schema._find_scope_by_id(s) for s in scopes]
)
return frozenset({self.schema._find_scope_by_id(_PUBLIC_SCOPE)})
try:
scopes = self.schema._resolve_scope(self.get("auth"))
if isinstance(scopes, Scope):
return frozenset({scopes})
elif isinstance(scopes, str):
return frozenset({self.schema._find_scope_by_id(scopes)})
elif isinstance(scopes, list):
return frozenset(
[
s if isinstance(s, Scope) else self.schema._find_scope_by_id(s)
for s in scopes
]
)
return frozenset({self.schema._find_scope_by_id(_PUBLIC_SCOPE)})
except RuntimeError:
return None

@cached_property
def auth(self) -> frozenset[str]: