Skip to content

Commit f688274

Browse files
authored
refactor: avoid potential unnecessary store accesses on dataset resolution (#4513)
* refactor: avoid potential unnecessary store accesses on dataset resolution * pr remarks
1 parent f04c8f7 commit f688274

File tree

16 files changed

+512
-477
lines changed

16 files changed

+512
-477
lines changed

core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/CatalogCoreExtension.java

+13-3
Original file line numberDiff line numberDiff line change
@@ -17,8 +17,10 @@
1717
import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex;
1818
import org.eclipse.edc.connector.controlplane.catalog.spi.DatasetResolver;
1919
import org.eclipse.edc.connector.controlplane.catalog.spi.DistributionResolver;
20-
import org.eclipse.edc.connector.controlplane.contract.spi.offer.ContractDefinitionResolver;
20+
import org.eclipse.edc.connector.controlplane.contract.spi.offer.store.ContractDefinitionStore;
2121
import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore;
22+
import org.eclipse.edc.policy.engine.spi.PolicyEngine;
23+
import org.eclipse.edc.policy.engine.spi.PolicyScope;
2224
import org.eclipse.edc.runtime.metamodel.annotation.Extension;
2325
import org.eclipse.edc.runtime.metamodel.annotation.Inject;
2426
import org.eclipse.edc.runtime.metamodel.annotation.Provider;
@@ -30,8 +32,8 @@ public class CatalogCoreExtension implements ServiceExtension {
3032

3133
public static final String NAME = "Catalog Core";
3234

33-
@Inject
34-
private ContractDefinitionResolver contractDefinitionResolver;
35+
@PolicyScope
36+
public static final String CATALOG_SCOPE = "catalog";
3537

3638
@Inject
3739
private AssetIndex assetIndex;
@@ -45,14 +47,22 @@ public class CatalogCoreExtension implements ServiceExtension {
4547
@Inject
4648
private CriterionOperatorRegistry criterionOperatorRegistry;
4749

50+
@Inject
51+
private ContractDefinitionStore contractDefinitionStore;
52+
53+
@Inject
54+
private PolicyEngine policyEngine;
55+
4856
@Override
4957
public String name() {
5058
return NAME;
5159
}
5260

5361
@Provider
5462
public DatasetResolver datasetResolver() {
63+
var contractDefinitionResolver = new ContractDefinitionResolverImpl(contractDefinitionStore, policyEngine, policyDefinitionStore);
5564
return new DatasetResolverImpl(contractDefinitionResolver, assetIndex, policyDefinitionStore,
5665
distributionResolver, criterionOperatorRegistry);
5766
}
67+
5868
}

core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/CatalogDefaultServicesExtension.java

+1
Original file line numberDiff line numberDiff line change
@@ -52,4 +52,5 @@ public DataServiceRegistry dataServiceRegistry() {
5252
public DistributionResolver distributionResolver() {
5353
return new DefaultDistributionResolver(dataServiceRegistry, dataFlowManager);
5454
}
55+
5556
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,74 @@
1+
/*
2+
* Copyright (c) 2024 Cofinity-X
3+
*
4+
* This program and the accompanying materials are made available under the
5+
* terms of the Apache License, Version 2.0 which is available at
6+
* https://www.apache.org/licenses/LICENSE-2.0
7+
*
8+
* SPDX-License-Identifier: Apache-2.0
9+
*
10+
* Contributors:
11+
* Cofinity-X - initial API and implementation
12+
*
13+
*/
14+
15+
package org.eclipse.edc.connector.controlplane.catalog;
16+
17+
import org.eclipse.edc.connector.controlplane.catalog.spi.ContractDefinitionResolver;
18+
import org.eclipse.edc.connector.controlplane.catalog.spi.ResolvedContractDefinitions;
19+
import org.eclipse.edc.connector.controlplane.contract.spi.offer.store.ContractDefinitionStore;
20+
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
21+
import org.eclipse.edc.connector.controlplane.policy.spi.PolicyDefinition;
22+
import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore;
23+
import org.eclipse.edc.policy.engine.spi.PolicyContextImpl;
24+
import org.eclipse.edc.policy.engine.spi.PolicyEngine;
25+
import org.eclipse.edc.policy.model.Policy;
26+
import org.eclipse.edc.spi.agent.ParticipantAgent;
27+
import org.eclipse.edc.spi.query.QuerySpec;
28+
import org.eclipse.edc.spi.result.Result;
29+
30+
import java.util.HashMap;
31+
import java.util.Optional;
32+
33+
import static java.lang.String.format;
34+
import static org.eclipse.edc.connector.controlplane.catalog.CatalogCoreExtension.CATALOG_SCOPE;
35+
36+
/**
37+
* Determines the contract definitions applicable to a {@link ParticipantAgent} by evaluating the access control and
38+
* usage policies associated with a set of assets as defined by {@link ContractDefinition}s. On the distinction between
39+
* access control and usage policy, see {@link ContractDefinition}.
40+
*/
41+
public class ContractDefinitionResolverImpl implements ContractDefinitionResolver {
42+
private final PolicyEngine policyEngine;
43+
private final PolicyDefinitionStore policyStore;
44+
private final ContractDefinitionStore definitionStore;
45+
46+
public ContractDefinitionResolverImpl(ContractDefinitionStore contractDefinitionStore, PolicyEngine policyEngine, PolicyDefinitionStore policyStore) {
47+
definitionStore = contractDefinitionStore;
48+
this.policyEngine = policyEngine;
49+
this.policyStore = policyStore;
50+
}
51+
52+
@Override
53+
public ResolvedContractDefinitions resolveFor(ParticipantAgent agent) {
54+
var policies = new HashMap<String, Policy>();
55+
var definitions = definitionStore.findAll(QuerySpec.max())
56+
.filter(definition -> {
57+
var policyContext = PolicyContextImpl.Builder.newInstance().additional(ParticipantAgent.class, agent).build();
58+
var accessResult = Optional.of(definition.getAccessPolicyId())
59+
.map(policyId -> policies.computeIfAbsent(policyId,
60+
key -> Optional.ofNullable(policyStore.findById(key))
61+
.map(PolicyDefinition::getPolicy)
62+
.orElse(null))
63+
)
64+
.map(policy -> policyEngine.evaluate(CATALOG_SCOPE, policy, policyContext))
65+
.orElse(Result.failure(format("Policy %s not found", definition.getAccessPolicyId())));
66+
67+
return accessResult.succeeded();
68+
})
69+
.toList();
70+
71+
return new ResolvedContractDefinitions(definitions, policies);
72+
}
73+
74+
}

core/control-plane/control-plane-catalog/src/main/java/org/eclipse/edc/connector/controlplane/catalog/DatasetResolverImpl.java

+20-10
Original file line numberDiff line numberDiff line change
@@ -17,15 +17,17 @@
1717
import org.eclipse.edc.connector.controlplane.asset.spi.domain.Asset;
1818
import org.eclipse.edc.connector.controlplane.asset.spi.index.AssetIndex;
1919
import org.eclipse.edc.connector.controlplane.catalog.spi.Catalog;
20+
import org.eclipse.edc.connector.controlplane.catalog.spi.ContractDefinitionResolver;
2021
import org.eclipse.edc.connector.controlplane.catalog.spi.DataService;
2122
import org.eclipse.edc.connector.controlplane.catalog.spi.Dataset;
2223
import org.eclipse.edc.connector.controlplane.catalog.spi.DatasetResolver;
2324
import org.eclipse.edc.connector.controlplane.catalog.spi.DistributionResolver;
2425
import org.eclipse.edc.connector.controlplane.contract.spi.ContractOfferId;
25-
import org.eclipse.edc.connector.controlplane.contract.spi.offer.ContractDefinitionResolver;
2626
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
27+
import org.eclipse.edc.connector.controlplane.policy.spi.PolicyDefinition;
2728
import org.eclipse.edc.connector.controlplane.policy.spi.store.PolicyDefinitionStore;
2829
import org.eclipse.edc.dataaddress.httpdata.spi.HttpDataAddressSchema;
30+
import org.eclipse.edc.policy.model.Policy;
2931
import org.eclipse.edc.policy.model.PolicyType;
3032
import org.eclipse.edc.spi.agent.ParticipantAgent;
3133
import org.eclipse.edc.spi.query.CriterionOperatorRegistry;
@@ -34,6 +36,7 @@
3436

3537
import java.util.Base64;
3638
import java.util.List;
39+
import java.util.Map;
3740
import java.util.Optional;
3841
import java.util.function.Predicate;
3942
import java.util.stream.Stream;
@@ -61,34 +64,36 @@ public DatasetResolverImpl(ContractDefinitionResolver contractDefinitionResolver
6164
@Override
6265
@NotNull
6366
public Stream<Dataset> query(ParticipantAgent agent, QuerySpec querySpec) {
64-
var contractDefinitions = contractDefinitionResolver.definitionsFor(agent).toList();
67+
var resolved = contractDefinitionResolver.resolveFor(agent);
68+
var contractDefinitions = resolved.contractDefinitions();
6569
if (contractDefinitions.isEmpty()) {
6670
return Stream.empty();
6771
}
6872

6973
var assetsQuery = QuerySpec.Builder.newInstance().offset(0).limit(MAX_VALUE).filter(querySpec.getFilterExpression()).build();
7074
return assetIndex.queryAssets(assetsQuery)
71-
.map(asset -> toDataset(contractDefinitions, asset))
75+
.map(asset -> toDataset(contractDefinitions, asset, resolved.policies()))
7276
.filter(Dataset::hasOffers)
7377
.skip(querySpec.getOffset())
7478
.limit(querySpec.getLimit());
7579
}
7680

7781
@Override
7882
public Dataset getById(ParticipantAgent agent, String id) {
79-
var contractDefinitions = contractDefinitionResolver.definitionsFor(agent).toList();
83+
var resolved = contractDefinitionResolver.resolveFor(agent);
84+
var contractDefinitions = resolved.contractDefinitions();
8085
if (contractDefinitions.isEmpty()) {
8186
return null;
8287
}
8388

8489
return Optional.of(id)
8590
.map(assetIndex::findById)
86-
.map(asset -> toDataset(contractDefinitions, asset))
91+
.map(asset -> toDataset(contractDefinitions, asset, resolved.policies()))
8792
.filter(Dataset::hasOffers)
8893
.orElse(null);
8994
}
9095

91-
private Dataset.Builder buildDataset(Asset asset) {
96+
private Dataset.Builder<?, ?> buildDataset(Asset asset) {
9297
if (!asset.isCatalog()) {
9398
return Dataset.Builder.newInstance();
9499
}
@@ -101,7 +106,7 @@ private Dataset.Builder buildDataset(Asset asset) {
101106
.build());
102107
}
103108

104-
private Dataset toDataset(List<ContractDefinition> contractDefinitions, Asset asset) {
109+
private Dataset toDataset(List<ContractDefinition> contractDefinitions, Asset asset, Map<String, Policy> policies) {
105110

106111
var distributions = distributionResolver.getDistributions(asset);
107112
var datasetBuilder = buildDataset(asset)
@@ -116,10 +121,15 @@ private Dataset toDataset(List<ContractDefinition> contractDefinitions, Asset as
116121
.test(asset)
117122
)
118123
.forEach(contractDefinition -> {
119-
var policyDefinition = policyDefinitionStore.findById(contractDefinition.getContractPolicyId());
120-
if (policyDefinition != null) {
124+
var policy = policies.computeIfAbsent(contractDefinition.getContractPolicyId(), policyId ->
125+
Optional.ofNullable(policyDefinitionStore.findById(policyId))
126+
.map(PolicyDefinition::getPolicy)
127+
.orElse(null)
128+
);
129+
130+
if (policy != null) {
121131
var contractId = ContractOfferId.create(contractDefinition.getId(), asset.getId());
122-
var offerPolicy = policyDefinition.getPolicy().toBuilder().type(PolicyType.OFFER).build();
132+
var offerPolicy = policy.toBuilder().type(PolicyType.OFFER).build();
123133
datasetBuilder.offer(contractId.toString(), offerPolicy);
124134
}
125135
});
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@
1313
*
1414
*/
1515

16-
package org.eclipse.edc.connector.controlplane.contract.offer;
16+
package org.eclipse.edc.connector.controlplane.catalog;
1717

1818
import org.eclipse.edc.connector.controlplane.contract.spi.offer.store.ContractDefinitionStore;
1919
import org.eclipse.edc.connector.controlplane.contract.spi.types.offer.ContractDefinition;
@@ -23,123 +23,113 @@
2323
import org.eclipse.edc.policy.engine.spi.PolicyEngine;
2424
import org.eclipse.edc.policy.model.Policy;
2525
import org.eclipse.edc.spi.agent.ParticipantAgent;
26-
import org.eclipse.edc.spi.monitor.Monitor;
2726
import org.eclipse.edc.spi.query.QuerySpec;
2827
import org.eclipse.edc.spi.result.Result;
29-
import org.junit.jupiter.api.BeforeEach;
3028
import org.junit.jupiter.api.Test;
3129

32-
import java.util.Map;
3330
import java.util.stream.Stream;
3431

32+
import static java.util.Collections.emptyMap;
3533
import static org.assertj.core.api.Assertions.assertThat;
36-
import static org.eclipse.edc.connector.controlplane.contract.spi.offer.ContractDefinitionResolver.CATALOGING_SCOPE;
34+
import static org.assertj.core.api.Assertions.entry;
35+
import static org.eclipse.edc.connector.controlplane.catalog.CatalogCoreExtension.CATALOG_SCOPE;
3736
import static org.mockito.AdditionalMatchers.and;
3837
import static org.mockito.ArgumentMatchers.any;
3938
import static org.mockito.ArgumentMatchers.argThat;
4039
import static org.mockito.ArgumentMatchers.eq;
4140
import static org.mockito.ArgumentMatchers.isA;
4241
import static org.mockito.Mockito.atLeastOnce;
4342
import static org.mockito.Mockito.mock;
43+
import static org.mockito.Mockito.only;
4444
import static org.mockito.Mockito.verify;
4545
import static org.mockito.Mockito.verifyNoInteractions;
4646
import static org.mockito.Mockito.when;
4747

4848
class ContractDefinitionResolverImplTest {
4949

50-
private final PolicyEngine policyEngine = mock(PolicyEngine.class);
51-
private final PolicyDefinitionStore policyStore = mock(PolicyDefinitionStore.class);
52-
private final ContractDefinitionStore definitionStore = mock(ContractDefinitionStore.class);
50+
private final PolicyEngine policyEngine = mock();
51+
private final PolicyDefinitionStore policyStore = mock();
52+
private final ContractDefinitionStore definitionStore = mock();
5353

54-
private ContractDefinitionResolverImpl definitionService;
55-
56-
@BeforeEach
57-
void setUp() {
58-
definitionService = new ContractDefinitionResolverImpl(mock(Monitor.class), definitionStore, policyEngine, policyStore);
59-
}
54+
private final ContractDefinitionResolverImpl resolver = new ContractDefinitionResolverImpl(definitionStore,
55+
policyEngine, policyStore);
6056

6157
@Test
62-
void definitionsFor_verifySatisfiesPolicies() {
63-
var agent = new ParticipantAgent(Map.of(), Map.of());
58+
void shouldReturnDefinition_whenAccessPolicySatisfied() {
59+
var agent = new ParticipantAgent(emptyMap(), emptyMap());
6460
var def = PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build();
6561
when(policyStore.findById(any())).thenReturn(def);
6662
when(policyEngine.evaluate(any(), any(), isA(PolicyContext.class))).thenReturn(Result.success());
6763
when(definitionStore.findAll(any())).thenReturn(Stream.of(createContractDefinition()));
6864

69-
var definitions = definitionService.definitionsFor(agent);
65+
var result = resolver.resolveFor(agent);
7066

71-
assertThat(definitions).hasSize(1);
67+
assertThat(result.contractDefinitions()).hasSize(1);
68+
assertThat(result.policies()).hasSize(1);
7269
verify(policyEngine, atLeastOnce()).evaluate(
73-
eq(CATALOGING_SCOPE),
70+
eq(CATALOG_SCOPE),
7471
eq(def.getPolicy()),
7572
and(isA(PolicyContext.class), argThat(c -> c.getContextData(ParticipantAgent.class).equals(agent)))
7673
);
7774
verify(definitionStore).findAll(any());
7875
}
7976

8077
@Test
81-
void definitionsFor_verifyDoesNotSatisfyAccessPolicy() {
82-
var agent = new ParticipantAgent(Map.of(), Map.of());
78+
void shouldNotReturnDefinition_whenAccessPolicyNotSatisfied() {
79+
var agent = new ParticipantAgent(emptyMap(), emptyMap());
8380
var definition = PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).id("access").build();
8481
when(policyStore.findById(any())).thenReturn(definition);
8582
var contractDefinition = createContractDefinition();
8683
when(policyEngine.evaluate(any(), any(), isA(PolicyContext.class))).thenReturn(Result.failure("invalid"));
8784
when(definitionStore.findAll(any())).thenReturn(Stream.of(contractDefinition));
8885

89-
var result = definitionService.definitionsFor(agent);
86+
var result = resolver.resolveFor(agent);
9087

91-
assertThat(result).isEmpty();
88+
assertThat(result.contractDefinitions()).isEmpty();
89+
assertThat(result.policies()).hasSize(1);
9290
verify(definitionStore).findAll(any());
9391
}
9492

9593
@Test
96-
void definitionsFor_verifyPoliciesNotFound() {
97-
var agent = new ParticipantAgent(Map.of(), Map.of());
94+
void shouldNotReturnDefinition_whenAccessPolicyDoesNotExist() {
95+
var agent = new ParticipantAgent(emptyMap(), emptyMap());
9896
when(policyStore.findById(any())).thenReturn(null);
9997
when(policyEngine.evaluate(any(), any(), isA(PolicyContext.class))).thenReturn(Result.success());
10098
when(definitionStore.findAll(QuerySpec.max())).thenReturn(Stream.of(createContractDefinition()));
10199

102-
var definitions = definitionService.definitionsFor(agent);
100+
var result = resolver.resolveFor(agent);
103101

104-
assertThat(definitions).isEmpty();
102+
assertThat(result.contractDefinitions()).isEmpty();
103+
assertThat(result.policies()).isEmpty();
105104
verifyNoInteractions(policyEngine);
106105
}
107106

108107
@Test
109-
void definitionFor_found() {
110-
var agent = new ParticipantAgent(Map.of(), Map.of());
111-
var definition = PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build();
112-
var contractDefinition = createContractDefinition();
113-
when(policyStore.findById(any())).thenReturn(definition);
108+
void shouldQueryPolicyOnce_whenDifferentDefinitionsHaveSamePolicy() {
109+
var contractDefinition1 = contractDefinitionBuilder().accessPolicyId("accessPolicyId").build();
110+
var contractDefinition2 = contractDefinitionBuilder().accessPolicyId("accessPolicyId").build();
111+
var policy = Policy.Builder.newInstance().build();
112+
var policyDefinition = PolicyDefinition.Builder.newInstance().policy(policy).build();
113+
when(policyStore.findById(any())).thenReturn(policyDefinition);
114114
when(policyEngine.evaluate(any(), any(), isA(PolicyContext.class))).thenReturn(Result.success());
115-
when(definitionStore.findById("1")).thenReturn(contractDefinition);
115+
when(definitionStore.findAll(any())).thenReturn(Stream.of(contractDefinition1, contractDefinition2));
116116

117-
var result = definitionService.definitionFor(agent, "1");
117+
var result = resolver.resolveFor(new ParticipantAgent(emptyMap(), emptyMap()));
118118

119-
assertThat(result).isNotNull();
120-
verify(policyEngine, atLeastOnce()).evaluate(
121-
eq(CATALOGING_SCOPE),
122-
eq(definition.getPolicy()),
123-
and(isA(PolicyContext.class), argThat(c -> c.getContextData(ParticipantAgent.class).equals(agent)))
124-
);
119+
assertThat(result.contractDefinitions()).hasSize(2);
120+
assertThat(result.policies()).hasSize(1).containsOnly(entry("accessPolicyId", policy));
121+
verify(policyStore, only()).findById("accessPolicyId");
125122
}
126123

127-
@Test
128-
void definitionFor_notFound() {
129-
var agent = new ParticipantAgent(Map.of(), Map.of());
130-
when(definitionStore.findById(any())).thenReturn(null);
131-
132-
var result = definitionService.definitionFor(agent, "nodefinition");
133-
134-
assertThat(result).isNull();
135-
verifyNoInteractions(policyEngine);
124+
private ContractDefinition createContractDefinition() {
125+
return contractDefinitionBuilder()
126+
.build();
136127
}
137128

138-
private ContractDefinition createContractDefinition() {
129+
private ContractDefinition.Builder contractDefinitionBuilder() {
139130
return ContractDefinition.Builder.newInstance()
140131
.id("1")
141132
.accessPolicyId("access")
142-
.contractPolicyId("contract")
143-
.build();
133+
.contractPolicyId("contract");
144134
}
145135
}

0 commit comments

Comments
 (0)