Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(store): remove cache on CosmosPolicyDefinitionStore #1976

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,12 @@
import org.junit.jupiter.api.Test;

import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.eclipse.dataspaceconnector.spi.policy.TestFunctions.createPolicy;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -76,6 +79,16 @@ void deleteById_exceptionThrown() {
assertThatExceptionOfType(EdcPersistenceException.class).isThrownBy(() -> store.deleteById("any-policy-id"));
}

@Test
void findAll_verifyFiltering_invalidFilterExpression() {
IntStream.range(0, 10).mapToObj(i -> createPolicy("test-id")).forEach(d -> getPolicyDefinitionStore().save(d));

var query = QuerySpec.Builder.newInstance().filter("something contains other").build();

assertThatThrownBy(() -> getPolicyDefinitionStore().findAll(query)).isInstanceOfAny(EdcPersistenceException.class);
}


@Override
protected PolicyDefinitionStore getPolicyDefinitionStore() {
return store;
Expand All @@ -86,6 +99,11 @@ protected boolean supportCollectionQuery() {
return false;
}

@Override
protected boolean supportCollectionIndexQuery() {
return true;
}

@Override
protected Boolean supportSortOrder() {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,10 @@ private String toValuePlaceholder() {
private String getName() {
return criterion.getOperandLeft().toString()
.replace(":", "_")
.replace(".", "_");
.replace(".", "_")
.replace("[", "")
.replace("]", "")
.replaceAll("[0-9]", "");
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,16 @@
import com.azure.cosmos.implementation.NotFoundException;
import dev.failsafe.RetryPolicy;
import org.eclipse.dataspaceconnector.azure.cosmos.CosmosDbApi;
import org.eclipse.dataspaceconnector.common.concurrency.LockManager;
import org.eclipse.dataspaceconnector.azure.cosmos.dialect.SqlStatement;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.policy.PolicyDefinition;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyDefinitionStore;
import org.eclipse.dataspaceconnector.spi.query.QueryResolver;
import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.query.ReflectionBasedQueryResolver;
import org.eclipse.dataspaceconnector.spi.query.SortOrder;
import org.eclipse.dataspaceconnector.spi.types.TypeManager;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static dev.failsafe.Failsafe.with;
Expand All @@ -46,99 +40,64 @@ public class CosmosPolicyDefinitionStore implements PolicyDefinitionStore {
private final CosmosDbApi cosmosDbApi;
private final TypeManager typeManager;
private final RetryPolicy<Object> retryPolicy;
private final LockManager lockManager;
private final String partitionKey;
private final QueryResolver<PolicyDefinition> queryResolver;
private final AtomicReference<Map<String, PolicyDefinition>> objectCache;
private final Monitor monitor;

public CosmosPolicyDefinitionStore(CosmosDbApi cosmosDbApi, TypeManager typeManager, RetryPolicy<Object> retryPolicy, String partitionKey, Monitor monitor) {
this.cosmosDbApi = cosmosDbApi;
this.typeManager = typeManager;
this.retryPolicy = retryPolicy;
this.partitionKey = partitionKey;
lockManager = new LockManager(new ReentrantReadWriteLock(true));
queryResolver = new ReflectionBasedQueryResolver<>(PolicyDefinition.class);
objectCache = new AtomicReference<>(new HashMap<>());
this.monitor = monitor;
}

@Override
public PolicyDefinition findById(String policyId) {
return lockManager.readLock(() -> getCache().get(policyId));
var policyDefinition = cosmosDbApi.queryItemById(policyId);
return policyDefinition != null ? convert(policyDefinition) : null;
}

@Override
public Stream<PolicyDefinition> findAll(QuerySpec spec) {
return lockManager.readLock(() -> queryResolver.query(getCache().values().stream(), spec));
var statement = new SqlStatement<>(PolicyDocument.class);
var query = statement.where(spec.getFilterExpression())
.offset(spec.getOffset())
.limit(spec.getLimit())
.orderBy(spec.getSortField(), spec.getSortOrder() == SortOrder.ASC)
.getQueryAsSqlQuerySpec();

var objects = with(retryPolicy).get(() -> cosmosDbApi.queryItems(query));
return objects.map(this::convert);
}

@Override
public void save(PolicyDefinition policy) {
lockManager.writeLock(() -> {
with(retryPolicy).run(() -> cosmosDbApi.saveItem(convertToDocument(policy)));
storeInCache(policy);
return null;
});
with(retryPolicy).run(() -> cosmosDbApi.saveItem(convertToDocument(policy)));
}

@Override
public @Nullable PolicyDefinition deleteById(String policyId) {
return lockManager.writeLock(() -> {

try {
var deletedItem = cosmosDbApi.deleteItem(policyId);
if (deletedItem == null) {
return null;
}
removeFromCache(policyId);
return convert(deletedItem);
} catch (NotFoundException e) {
monitor.debug(() -> String.format("PolicyDefinition with id %s not found", policyId));
try {
var deletedItem = cosmosDbApi.deleteItem(policyId);
if (deletedItem == null) {
return null;
}
});
return convert(deletedItem);
} catch (NotFoundException e) {
monitor.debug(() -> String.format("PolicyDefinition with id %s not found", policyId));
return null;
}
}

@Override
public void reload() {
lockManager.readLock(() -> {
// this reloads ALL items from the database. We might want something more elaborate in the future, especially
// if large amounts of ContractDefinitions need to be held in memory
var databaseObjects = with(retryPolicy)
.get(() -> cosmosDbApi.queryAllItems())
.stream()
.map(this::convert)
.collect(Collectors.toMap(PolicyDefinition::getUid, cd -> cd));

objectCache.set(databaseObjects);
return null;
});
}

private PolicyDefinition removeFromCache(String policyId) {
return lockManager.readLock(() -> {
var map = getCache();
return map.remove(policyId);
});

}

private void storeInCache(PolicyDefinition definition) {
getCache().put(definition.getUid(), definition);
}

@NotNull
private PolicyDocument convertToDocument(PolicyDefinition policy) {
return new PolicyDocument(policy, partitionKey);
}

private Map<String, PolicyDefinition> getCache() {
if (objectCache.get().isEmpty()) {
reload();
}
return objectCache.get();
}

private PolicyDefinition convert(Object object) {
var json = typeManager.writeValueAsString(object);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,8 @@
import org.eclipse.dataspaceconnector.azure.cosmos.CosmosDbApiImpl;
import org.eclipse.dataspaceconnector.azure.testfixtures.CosmosTestClient;
import org.eclipse.dataspaceconnector.azure.testfixtures.annotations.AzureCosmosDbIntegrationTest;
import org.eclipse.dataspaceconnector.policy.model.Action;
import org.eclipse.dataspaceconnector.policy.model.Duty;
import org.eclipse.dataspaceconnector.policy.model.Permission;
import org.eclipse.dataspaceconnector.policy.model.Policy;
import org.eclipse.dataspaceconnector.policy.model.PolicyRegistrationTypes;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.policy.PolicyDefinition;
Expand All @@ -48,7 +46,6 @@
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.eclipse.dataspaceconnector.cosmos.policy.store.TestFunctions.generateDocument;
import static org.eclipse.dataspaceconnector.cosmos.policy.store.TestFunctions.generatePolicy;
import static org.mockito.Mockito.mock;
Expand Down Expand Up @@ -227,16 +224,6 @@ void findAll_verifyFiltering() {
assertThat(store.findAll(query)).extracting(PolicyDefinition::getUid).containsOnly(expectedId);
}

@Test
void findAll_verifyFiltering_invalidFilterExpression() {
IntStream.range(0, 10).mapToObj(i -> generateDocument(TEST_PARTITION_KEY)).forEach(d -> container.createItem(d));

var query = QuerySpec.Builder.newInstance().filter("something contains other").build();

// message is coming from the predicate converter rather than the SQL statement translation layer
assertThatThrownBy(() -> store.findAll(query)).isInstanceOfAny(IllegalArgumentException.class).hasMessage("Operator [contains] is not supported by this converter!");
}

@Test
void findAll_verifyFiltering_unsuccessfulFilterExpression() {
IntStream.range(0, 10).mapToObj(i -> generateDocument(TEST_PARTITION_KEY)).forEach(d -> container.createItem(d));
Expand All @@ -257,6 +244,7 @@ void findAll_verifySorting() {
assertThat(store.findAll(descendingQuery)).hasSize(10).isSortedAccordingTo((c1, c2) -> c2.getUid().compareTo(c1.getUid()));
}

// Override the base test since Cosmos returns documents where the property subject of ORDER BY does not exist
@Test
void findAll_sorting_nonExistentProperty() {

Expand All @@ -266,37 +254,9 @@ void findAll_sorting_nonExistentProperty() {
var query = QuerySpec.Builder.newInstance().sortField("notexist").sortOrder(SortOrder.DESC).build();

var all = store.findAll(query).collect(Collectors.toList());
assertThat(all).isEmpty();
assertThat(all).hasSize(10);
}

@Test
void verify_readWriteFindAll() {
// add an object
var policy = generatePolicy();
store.save(policy);
assertThat(store.findAll(QuerySpec.none())).containsExactly(policy);

// modify the object
var modifiedPolicy = PolicyDefinition.Builder.newInstance()
.policy(Policy.Builder.newInstance()

.permission(Permission.Builder.newInstance()
.target("test-asset-id")
.action(Action.Builder.newInstance()
.type("USE")
.build())
.build())
.build())
.id(policy.getUid())
.build();

store.save(modifiedPolicy);

// re-read
var all = store.findAll(QuerySpec.Builder.newInstance().filter("policy.permissions[0].target=test-asset-id").build()).collect(Collectors.toList());
assertThat(all).hasSize(1).containsExactly(modifiedPolicy);

}

@Override
protected PolicyDefinitionStore getPolicyDefinitionStore() {
Expand All @@ -308,6 +268,11 @@ protected boolean supportCollectionQuery() {
return false;
}

@Override
protected boolean supportCollectionIndexQuery() {
return true;
}

@Override
protected Boolean supportSortOrder() {
return true;
Expand Down
Loading