Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: avoid default limit 50 on catalog creation #2168

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import org.jetbrains.annotations.NotNull;

import java.net.URI;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -74,7 +75,9 @@ public Stream<ContractOffer> queryContractOffers(ContractOfferQuery query) {
.filter(concat(criteria.stream(), query.getAssetsCriteria().stream()).collect(Collectors.toList()));

var querySpec = querySpecBuilder.build();
var numAssets = assetIndex.countAssets(querySpec);
var numAssets = assetIndex.countAssets(querySpec.getFilterExpression());

querySpecBuilder.limit((int) numAssets);

if (skip.get() > 0) {
querySpecBuilder.offset(skip.get());
Expand All @@ -83,24 +86,28 @@ public Stream<ContractOffer> queryContractOffers(ContractOfferQuery query) {
querySpecBuilder.limit(limit);
}

if (skip.get() < numAssets) {
var byId = policyStore.findById(definition.getContractPolicyId());
if (byId == null) { //policy not found
return Stream.empty();
}
var assets = assetIndex.queryAssets(querySpecBuilder.build());
numSeenAssets.addAndGet(numAssets);
skip.addAndGet(Long.valueOf(-numAssets).intValue());
return assets.map(a -> createContractOffer(definition, byId.getPolicy(), a));

Stream<ContractOffer> offers;
if (skip.get() >= numAssets) {
offers = Stream.empty();
} else {
numSeenAssets.addAndGet(numAssets);
skip.addAndGet(Long.valueOf(-numAssets).intValue());
return Stream.empty();
offers = createContractOffers(definition, querySpecBuilder.build());
}

numSeenAssets.addAndGet(numAssets);
skip.addAndGet(Long.valueOf(-numAssets).intValue());
return offers;
});
}

@NotNull
private Stream<@NotNull ContractOffer> createContractOffers(ContractDefinition definition, QuerySpec assetQuerySpec) {
return Optional.of(definition.getContractPolicyId())
.map(policyStore::findById)
.map(policyDefinition -> assetIndex.queryAssets(assetQuerySpec)
.map(asset -> createContractOffer(definition, policyDefinition.getPolicy(), asset)))
.orElse(Stream.empty());
}

@NotNull
private ContractOffer createContractOffer(ContractDefinition definition, Policy policy, Asset asset) {
return ContractOffer.Builder.newInstance()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,10 @@
import static java.util.stream.IntStream.range;
import static java.util.stream.Stream.concat;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.AdditionalMatchers.and;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.argThat;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.Mockito.atLeastOnce;
Expand Down Expand Up @@ -77,7 +80,7 @@ void shouldGetContractOffers() {
when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class))).thenReturn(Stream.of(contractDefinition));
var assetStream = Stream.of(Asset.Builder.newInstance().build(), Asset.Builder.newInstance().build());
when(assetIndex.countAssets(any())).thenReturn(2L);
when(assetIndex.countAssets(anyList())).thenReturn(2L);
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenReturn(assetStream);
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());

Expand Down Expand Up @@ -111,7 +114,7 @@ void shouldLimitResult() {

when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class))).thenAnswer(i -> contractDefinition);
when(assetIndex.countAssets(any())).thenReturn(100L);
when(assetIndex.countAssets(anyList())).thenReturn(100L);
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(inv -> range(20, 50).mapToObj(i -> createAsset("asset" + i).build()));
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());

Expand All @@ -133,6 +136,26 @@ void shouldLimitResult() {
verify(policyStore).findById("contract");
}


@Test
void shouldNotLimitResult_whenAssetsAreLessThanTheRequested() {
var contractDefinition = getContractDefBuilder("1").build();

when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class))).thenReturn(Stream.of(contractDefinition));
when(assetIndex.countAssets(anyList())).thenReturn(40L);
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(inv -> range(20, 50).mapToObj(i -> createAsset("asset" + i).build()));
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());

var from = 20;
var to = 80;

var result = contractOfferResolver.queryContractOffers(getQuery(from, to));

assertThat(result).isNotEmpty();
verify(assetIndex, times(1)).queryAssets(and(isA(QuerySpec.class), argThat(it -> it.getLimit() == 40)));
}

@Test
void shouldLimitResult_insufficientAssets() {
var contractDefinition = range(0, 4).mapToObj(i -> getContractDefBuilder(String.valueOf(i))
Expand All @@ -141,7 +164,7 @@ void shouldLimitResult_insufficientAssets() {
when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class))).thenAnswer(i -> contractDefinition);
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(inv -> range(0, 10).mapToObj(i -> createAsset("asset" + i).build()));
when(assetIndex.countAssets(any())).thenReturn(10L);
when(assetIndex.countAssets(anyList())).thenReturn(10L);
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());

var from = 20;
Expand All @@ -164,7 +187,7 @@ void shouldLimitResult_pageOffsetLargerThanNumAssets() {

when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class))).thenAnswer(i -> contractDefinition);
when(assetIndex.countAssets(any())).thenReturn(10L);
when(assetIndex.countAssets(anyList())).thenReturn(10L);
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(inv -> range(0, 10).mapToObj(i -> createAsset("asset" + i).build()));
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());

Expand All @@ -188,7 +211,7 @@ void shouldLimitResultOfSingleAssetForContractDefinition() {

when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class))).thenAnswer(i -> contractDefinitions);
when(assetIndex.countAssets(any())).thenReturn(1L);
when(assetIndex.countAssets(anyList())).thenReturn(1L);
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenAnswer(inv -> Stream.of(createAsset("asset").build()));
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());

Expand All @@ -215,7 +238,7 @@ void shouldGetContractOffersWithAssetFilteringApplied() {
when(agentService.createFor(isA(ClaimToken.class))).thenReturn(new ParticipantAgent(emptyMap(), emptyMap()));
when(contractDefinitionService.definitionsFor(isA(ParticipantAgent.class))).thenReturn(Stream.of(contractDefinition));
var assetStream = Stream.of(Asset.Builder.newInstance().build(), Asset.Builder.newInstance().build());
when(assetIndex.countAssets(isA(QuerySpec.class))).thenReturn(1000L);
when(assetIndex.countAssets(anyList())).thenReturn(1000L);
when(assetIndex.queryAssets(isA(QuerySpec.class))).thenReturn(assetStream);
when(policyStore.findById(any())).thenReturn(PolicyDefinition.Builder.newInstance().policy(Policy.Builder.newInstance().build()).build());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,12 @@

import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.asset.AssetSelectorExpression;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.query.SortOrder;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.asset.Asset;
import org.eclipse.edc.spi.types.domain.asset.AssetEntry;
import org.eclipse.edc.util.collection.CollectionUtil;
import org.jetbrains.annotations.Nullable;

import java.util.Collections;
Expand All @@ -31,7 +31,6 @@
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Predicate;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static java.lang.String.format;
Expand Down Expand Up @@ -65,20 +64,10 @@ public Stream<Asset> queryAssets(AssetSelectorExpression expression) {

@Override
public Stream<Asset> queryAssets(QuerySpec querySpec) {
// first filter...
var expr = querySpec.getFilterExpression();
Stream<Asset> result;

lock.readLock().lock();
try {
if (CollectionUtil.isNotEmpty(expr)) {
// convert all the criteria into predicates since we're in memory anyway, collate all predicates into one and
// apply it to the stream
var rootPredicate = expr.stream().map(predicateFactory::convert).reduce(x -> true, Predicate::and);
result = filterByPredicate(cache, rootPredicate);
} else {
result = cache.values().stream();
}
// filter
var result = filterBy(querySpec.getFilterExpression());

// ... then sort
var sortField = querySpec.getSortField();
Expand All @@ -100,17 +89,26 @@ public Stream<Asset> queryAssets(QuerySpec querySpec) {
}
}

private Stream<Asset> filterBy(List<Criterion> criteria) {
var predicate = criteria.stream()
.map(predicateFactory::convert)
.reduce(x -> true, Predicate::and);

return cache.values().stream()
.filter(predicate);
}

@Override
public Asset findById(String assetId) {
Predicate<Asset> predicate = (asset) -> asset.getId().equals(assetId);
List<Asset> assets;
lock.readLock().lock();
try {
assets = filterByPredicate(cache, predicate).collect(Collectors.toList());
return cache.values().stream()
.filter(asset -> asset.getId().equals(assetId))
.findFirst()
.orElse(null);
} finally {
lock.readLock().unlock();
}
return assets.isEmpty() ? null : assets.get(0);
}

@Override
Expand All @@ -134,8 +132,8 @@ public Asset deleteById(String assetId) {
}

@Override
public long countAssets(QuerySpec querySpec) {
return queryAssets(querySpec).count();
public long countAssets(List<Criterion> criteria) {
return filterBy(criteria).count();
}

@Override
Expand Down Expand Up @@ -176,8 +174,4 @@ private void add(Asset asset, DataAddress address) {
cache.put(id, asset);
dataAddresses.put(id, address);
}

private Stream<Asset> filterByPredicate(Map<String, Asset> assets, Predicate<Asset> predicate) {
return assets.values().stream().filter(predicate);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,6 @@ void findAll_noQuerySpec() {
var assets = IntStream.range(0, 10).mapToObj(i -> createAsset("test-asset", "id" + i))
.peek(a -> index.accept(a, createDataAddress(a))).collect(Collectors.toList());


assertThat(index.queryAssets(QuerySpec.Builder.newInstance().build())).containsAll(assets);
}

Expand Down Expand Up @@ -244,7 +243,6 @@ void findAll_withFiltering() {
assertThat(index.queryAssets(spec)).hasSize(1).containsExactly(assets.get(1));
}


@Test
void findAll_withFiltering_limitExceedsResultSize() {
IntStream.range(0, 10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ dependencies {

testImplementation(project(":core:control-plane:control-plane-core"))
testImplementation(project(":extensions:common:http"))
testImplementation(project(":extensions:common:junit"))
testImplementation(project(":core:common:junit"))
testImplementation("io.rest-assured:rest-assured:${restAssured}")
testImplementation("org.awaitility:awaitility:${awaitility}")
testImplementation("org.mock-server:mockserver-netty:${httpMockServer}:shaded")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,15 @@
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.asset.AssetSelectorExpression;
import org.eclipse.edc.spi.monitor.Monitor;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.query.SortOrder;
import org.eclipse.edc.spi.types.TypeManager;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.asset.Asset;
import org.eclipse.edc.spi.types.domain.asset.AssetEntry;

import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
Expand Down Expand Up @@ -115,18 +117,13 @@ public Asset deleteById(String assetId) {
}

@Override
public long countAssets(QuerySpec querySpec) {
var expr = querySpec.getFilterExpression();
public long countAssets(List<Criterion> criteria) {
var sqlQuery = queryBuilder.from(criteria);

var sortField = querySpec.getSortField();
var limit = querySpec.getLimit();
var sortAsc = querySpec.getSortOrder() == SortOrder.ASC;

var sqlQuery = queryBuilder.from(expr, sortField, sortAsc, limit, querySpec.getOffset());
var stmt = sqlQuery.getQueryText().replace("SELECT * ", "SELECT COUNT(1) ");
sqlQuery.setQueryText(stmt);
var response = with(retryPolicy).get(() -> assetDb.queryItems(sqlQuery));
return response.findFirst().map(o -> extractCount(o)).orElse(0L);
return with(retryPolicy).get(() -> assetDb.queryItems(sqlQuery))
.findFirst().map(this::extractCount).orElse(0L);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.edc.spi.asset.AssetIndex;
import org.eclipse.edc.spi.asset.AssetSelectorExpression;
import org.eclipse.edc.spi.persistence.EdcPersistenceException;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.spi.types.domain.DataAddress;
import org.eclipse.edc.spi.types.domain.asset.Asset;
Expand All @@ -35,6 +36,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.AbstractMap;
import java.util.List;
import java.util.Objects;
import java.util.stream.Stream;

Expand Down Expand Up @@ -172,9 +174,9 @@ public Asset deleteById(String assetId) {
}

@Override
public long countAssets(QuerySpec querySpec) {
public long countAssets(List<Criterion> criteria) {
try (var connection = getConnection()) {
var statement = assetStatements.createQuery(querySpec);
var statement = assetStatements.createQuery(criteria);

var queryAsString = statement.getQueryAsString().replace("SELECT * ", "SELECT COUNT (*) ");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,12 @@
package org.eclipse.edc.connector.store.sql.assetindex.schema;

import org.eclipse.edc.runtime.metamodel.annotation.ExtensionPoint;
import org.eclipse.edc.spi.query.Criterion;
import org.eclipse.edc.spi.query.QuerySpec;
import org.eclipse.edc.sql.translation.SqlQueryStatement;

import java.util.List;

/**
* Defines queries used by the SqlAssetIndexServiceExtension.
*/
Expand Down Expand Up @@ -156,6 +159,13 @@ default String getCreatedAtColumn() {
*/
SqlQueryStatement createQuery(QuerySpec query);

/**
* Generates a SQL query using sub-select statements out of the criterion.
*
* @return A {@link SqlQueryStatement} that contains the SQL and statement parameters
*/
SqlQueryStatement createQuery(List<Criterion> query);

/**
* Select single asset by ID
*/
Expand Down
Loading