Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for Croissant RecordSet objects #341

Merged
merged 15 commits into from
Feb 25, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
83 changes: 77 additions & 6 deletions ckanext/dcat/profiles/croissant.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
from dateutil.parser import parse as parse_date
from rdflib import URIRef, BNode, Literal
from rdflib.namespace import Namespace
from ckantoolkit import url_for, config, asbool
from ckantoolkit import url_for, config, asbool, get_action

from ckanext.dcat.utils import resource_uri
from .base import RDFProfile, CleanedURIRef
Expand Down Expand Up @@ -60,6 +60,20 @@
}


# Maps DataStore column type names (the ``type`` value of each field returned
# by the ``datastore_info`` action) to the schema.org data type emitted as
# ``cr:dataType``. Keys cover both the DataStore type names and the
# PostgreSQL type names that can be reported for a column.
CROISSANT_FIELD_TYPES = {
    "text": SCHEMA.Text,
    "bool": SCHEMA.Boolean,
    "int": SCHEMA.Integer,
    "int2": SCHEMA.Integer,
    "int4": SCHEMA.Integer,
    "int8": SCHEMA.Integer,
    "float": SCHEMA.Float,
    "float4": SCHEMA.Float,
    "float8": SCHEMA.Float,
    "numeric": SCHEMA.Float,
    "double precision": SCHEMA.Float,
    "date": SCHEMA.Date,
    "timestamp": SCHEMA.Date,
}


class CroissantProfile(RDFProfile):
"""
An RDF profile based on the schema.org Dataset, modified by Croissant.
Expand Down Expand Up @@ -377,6 +391,9 @@ def _resource_graph(
# Subresources
self._resource_subresources_graph(dataset_ref, resource_ref, resource_dict)

# RecordSet
self._recordset_graph(dataset_ref, resource_ref, resource_dict)

def _resource_basic_fields_graph(
self, resource_ref, resource_dict, is_subresource=False
):
Expand Down Expand Up @@ -422,18 +439,19 @@ def _resource_list_fields_graph(self, resource_ref, resource_dict):
self._add_list_triples_from_dict(resource_dict, resource_ref, items)

# Adds a schema:encodingFormat literal for the resource, taken from either the
# "mimetype" or the "format" key of the resource dict.
# NOTE(review): this diff hunk lists the removed and the added branch order
# back to back; the added lines appear to test "mimetype" first and fall back
# to "format" -- confirm against the resulting file.
def _resource_format_graph(self, resource_ref, resource_dict):
if resource_dict.get("format"):
self.g.add(
(resource_ref, SCHEMA.encodingFormat, Literal(resource_dict["format"]))
)
elif resource_dict.get("mimetype"):
if resource_dict.get("mimetype"):
self.g.add(
(
resource_ref,
SCHEMA.encodingFormat,
Literal(resource_dict["mimetype"]),
)
)
elif resource_dict.get("format"):
self.g.add(
(resource_ref, SCHEMA.encodingFormat, Literal(resource_dict["format"]))
)


def _resource_url_graph(self, resource_ref, resource_dict):
if (resource_dict.get("type") == "fileObject") and resource_dict.get("url"):
Expand Down Expand Up @@ -486,3 +504,56 @@ def _resource_subresources_graph(self, dataset_ref, resource_ref, resource_dict)
self._resource_graph(
dataset_ref, subresource_ref, subresource_dict, is_subresource=True
)

def _recordset_graph(self, dataset_ref, resource_ref, resource_dict):

# Skip if data not in the DataStore
if not resource_dict.get("id") or not asbool(resource_dict.get("datastore_active")):
return

# Get fields info
try:
datastore_info = get_action("datastore_info")(
{"ignore_auth": True},
{"id": resource_dict["id"]}
)
except KeyError:
# DataStore not enabled
return

if not datastore_info or not datastore_info.get("fields"):
return

recordset_ref = URIRef(f"{resource_dict['id']}/records")

self.g.add((recordset_ref, RDF.type, CR.RecordSet))

unique_fields = []

for field in datastore_info["fields"]:

field_ref = URIRef(f"{resource_dict['id']}/records/{field['id']}")

self.g.add((recordset_ref, CR.field, field_ref))
self.g.add((field_ref, RDF.type, CR.Field))
if field_type := CROISSANT_FIELD_TYPES.get(field["type"]):
self.g.add((field_ref, CR.dataType, field_type))

source_ref = BNode()

self.g.add((field_ref, CR.source, source_ref))
self.g.add((source_ref, CR.fileObject, resource_ref))

extract_ref = BNode()

self.g.add((source_ref, CR.extract, extract_ref))
self.g.add((extract_ref, CR.column, Literal(field['id'])))

if field["schema"]["is_index"]:
unique_fields.append(field_ref)

if unique_fields:
for unique_field_ref in unique_fields:
self.g.add((recordset_ref, CR.key, unique_field_ref))

self.g.add((dataset_ref, CR.recordSet, recordset_ref))
76 changes: 75 additions & 1 deletion ckanext/dcat/tests/profiles/croissant/test_serialize.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from builtins import str
import json
from unittest import mock
import uuid

import pytest

Expand All @@ -14,7 +16,7 @@
from ckanext.dcat import utils
from ckanext.dcat.profiles import XSD, DCT, FOAF
from ckanext.dcat.processors import RDFSerializer
from ckanext.dcat.profiles.croissant import SCHEMA, CR
from ckanext.dcat.profiles.croissant import SCHEMA, CR, CROISSANT_FIELD_TYPES

from ckanext.dcat.tests.profiles.dcat_ap.test_euro_dcatap_profile_serialize import (
BaseSerializeTest,
Expand Down Expand Up @@ -262,6 +264,78 @@ def test_graph_from_dataset(self):
sub_resource_file_set_dict["excludes"],
)

def test_graph_from_dataset_with_recordset(self):
    """
    A DataStore-active resource is serialized with a cr:RecordSet whose
    fields, data types, sources and key reflect the datastore_info output.
    """
    dataset_id = str(uuid.uuid4())
    resource_id = str(uuid.uuid4())

    resource = {
        "id": resource_id,
        "url": "http://example.com/data.csv",
        "format": "CSV",
        "datastore_active": True,
    }
    dataset_dict = {
        "id": dataset_id,
        "name": "test-dataset",
        "title": "Test Dataset",
        "notes": "Test description",
        "resources": [resource],
    }

    # (column id, DataStore type, is_index)
    columns = (
        ("name", "text", True),
        ("age", "int", False),
        ("temperature", "float", False),
        ("timestamp", "timestamp", False),
    )
    fields_datastore = [
        {"id": column_id, "type": column_type, "schema": {"is_index": is_index}}
        for column_id, column_type, is_index in columns
    ]

    def fake_datastore_info(context, data_dict):
        # Stands in for the real datastore_info action
        return {
            "meta": {"id": resource_id, "count": 10, "table_type": "BASE TABLE"},
            "fields": fields_datastore,
        }

    # get_action(...) returns the fake action for the duration of the block
    with mock.patch(
        "ckanext.dcat.profiles.croissant.get_action",
        return_value=fake_datastore_info,
    ):
        serializer = RDFSerializer(profiles=["croissant"])
        g = serializer.g

        dataset_ref = serializer.graph_from_dataset(dataset_dict)
        resource_ref = list(g.objects(dataset_ref, SCHEMA.distribution))[0]

        recordset_ref = URIRef(f"{resource_id}/records")
        assert self._triple(g, dataset_ref, CR.recordSet, recordset_ref)
        assert self._triple(g, recordset_ref, RDF.type, CR.RecordSet)

        # Test fields
        assert len(list(g.objects(recordset_ref, CR.field))) == 4

        for expected in fields_datastore:
            column_id = expected["id"]
            field_ref = URIRef(f"{resource_id}/records/{column_id}")

            assert self._triple(g, recordset_ref, CR.field, field_ref)

            expected_type = CROISSANT_FIELD_TYPES.get(expected["type"])
            assert self._triple(g, field_ref, CR.dataType, expected_type)

            source_ref = list(g.objects(field_ref, CR.source))[0]
            assert self._triple(g, source_ref, CR.fileObject, resource_ref)

            extract_ref = list(g.objects(source_ref, CR.extract))[0]
            assert self._triple(g, extract_ref, CR.column, column_id)

        # Only the indexed column is exposed as key
        name_field_ref = URIRef(f"{resource_id}/records/name")
        assert self._triple(g, recordset_ref, CR.key, name_field_ref)

@pytest.mark.usefixtures("with_plugins", "clean_db")
def test_graph_from_dataset_org_fallback(self):

Expand Down
37 changes: 36 additions & 1 deletion ckanext/dcat/tests/profiles/croissant/test_validate.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import json
import sys
from unittest import mock

try:
import mlcroissant as mlc
Expand All @@ -15,7 +16,7 @@


@pytest.mark.skipif(
sys.version_info < (3, 10), reason="croissant is not available in py<3.10"
sys.version_info < (3, 10), reason="mlcroissant is not available in py<3.10"
)
def test_valid_output():

Expand All @@ -29,3 +30,37 @@ def test_valid_output():
mlc.Dataset(croissant_dict)
except mlc.ValidationError as exception:
raise


@pytest.mark.skipif(
    sys.version_info < (3, 10), reason="mlcroissant is not available in py<3.10"
)
def test_valid_output_with_recordset():
    """
    The Croissant output of the full example dataset, including the
    RecordSet built from a mocked datastore_info action, loads in mlcroissant.
    """
    dataset_dict = json.loads(
        get_file_contents("ckan/ckan_full_dataset_croissant.json")
    )

    resource_id = dataset_dict["resources"][0]["id"]

    def mock_datastore_info(context, data_dict):
        return {
            "meta": {"id": resource_id, "count": 10, "table_type": "BASE TABLE"},
            "fields": [
                {"id": "name", "type": "text", "schema": {"is_index": True}},
                {"id": "age", "type": "int", "schema": {"is_index": False}},
                {"id": "temperature", "type": "float", "schema": {"is_index": False}},
                {"id": "timestamp", "type": "timestamp", "schema": {"is_index": False}},
            ],
        }

    # The serialization has to run while get_action is patched so that the
    # profile picks up the mocked datastore_info action
    with mock.patch("ckanext.dcat.profiles.croissant.get_action") as mock_get_action:
        mock_get_action.return_value = mock_datastore_info

        croissant_dict = json.loads(croissant(dataset_dict))

    # Any exception raised while loading (e.g. mlc.ValidationError) fails the
    # test; catching it only to re-raise it added nothing
    mlc.Dataset(croissant_dict)
84 changes: 83 additions & 1 deletion docs/croissant.md
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,15 @@ Once the plugin is enabled, the Croissant output will be embedded in the source

https://{ckan-instance-host}/dataset/{dataset-id}/croissant.jsonld

## Schema mapping

The extension includes a [schema](getting-started.md#schemas) ([`ckanext/dcat/schemas/croissant.yml`](https://github.com/ckan/ckanext-dcat/tree/master/ckanext/dcat/schemas/croissant.yml)) for sites that want to take advantage of all the entities and properties of the Croissant spec.

This maps CKAN's datasets to [schema.org Datasets](https://docs.mlcommons.org/croissant/docs/croissant-spec.html#dataset-level-information) and resources to [Croissant resources](https://docs.mlcommons.org/croissant/docs/croissant-spec.html#resources), which can have type `FileObject` or `FileSet`. For `FileSet` resources, use the "Sub-resources" repeating subfield to describe the contents of the file set.

Additionally, for resources that have been imported to the CKAN [DataStore](https://docs.ckan.org/en/latest/maintaining/datastore.html), the resource will also expose Croissant's [RecordSet](https://docs.mlcommons.org/croissant/docs/croissant-spec.html#recordset) objects with information about the data fields (e.g. column names and types).


## Customizing

If you want to modify the Croissant output you can [write your own profile](writing-profiles.md) extending the builtin `ckanext.dcat.profiles.croissant.CroissantProfile` class and register it.
Expand All @@ -36,7 +43,20 @@ ckanext.dcat.croissant.profiles = my_custom_croissant_profile
## Examples

* The [`examples/ckan/ckan_full_dataset_croissant.json`](https://github.com/ckan/ckanext-dcat/tree/master/examples/ckan/ckan_full_dataset_croissant.json) file contains a full CKAN dataset dict that implements the custom Croissant schema.
* Below is the Croissant serialization resulting from the dataset above:
* Below is the Croissant serialization resulting from the dataset above, and assuming the resource has a DataStore table associated with the following structure:

```json
{
"fields": [
{"id": "name", "type": "text", "schema": {"is_index": true}},
{"id": "age", "type": "int", "schema": {"is_index": false}},
{"id": "temperature", "type": "float", "schema": {"is_index": false}},
{"id": "timestamp", "type": "timestamp", "schema": {"is_index": false}}
]
}

```


```json
{
Expand Down Expand Up @@ -172,6 +192,68 @@ ckanext.dcat.croissant.profiles = my_custom_croissant_profile
"name": "Test Publisher",
"url": "https://example.org"
},
"recordSet": {
"@id": "568b8ac9-8c69-4475-b35e-d7f812a63c32/records",
"@type": "cr:RecordSet",
"field": [
{
"@id": "568b8ac9-8c69-4475-b35e-d7f812a63c32/records/temperature",
"@type": "cr:Field",
"dataType": "Float",
"source": {
"extract": {
"column": "temperature"
},
"fileObject": {
"@id": "my-custom-resource-id"
}
}
},
{
"@id": "568b8ac9-8c69-4475-b35e-d7f812a63c32/records/timestamp",
"@type": "cr:Field",
"dataType": "Date",
"source": {
"extract": {
"column": "timestamp"
},
"fileObject": {
"@id": "my-custom-resource-id"
}
}
},
{
"@id": "568b8ac9-8c69-4475-b35e-d7f812a63c32/records/name",
"@type": "cr:Field",
"dataType": "Text",
"source": {
"extract": {
"column": "name"
},
"fileObject": {
"@id": "my-custom-resource-id"
}
}
},
{
"@id": "568b8ac9-8c69-4475-b35e-d7f812a63c32/records/age",
"@type": "cr:Field",
"dataType": "Integer",
"source": {
"extract": {
"column": "age"
},
"fileObject": {
"@id": "my-custom-resource-id"
}
}
}
],
"key": {
"@id": "568b8ac9-8c69-4475-b35e-d7f812a63c32/records/name"
},
"name": "568b8ac9-8c69-4475-b35e-d7f812a63c32/records"
},
"sameAs": [
"https://some.other.catalog/dataset/123",
"https://yet.another.catalog/dataset/xyz"
Expand Down
1 change: 1 addition & 0 deletions examples/ckan/ckan_full_dataset_croissant.json
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
"id_given": "my-custom-resource-id",
"size": "12323",
"hash": "b221d9dbb083a7f33428d7c2a3c3198ae925614d70210e28716ccaa7cd4ddb79",
"datastore_active": true,
"subresources": [
{
"type": "fileObject",
Expand Down
Loading