Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(test): introduce a base class for all TransferProcessStore #1982

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/control-plane/control-plane-core/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
testImplementation(testFixtures(project(":spi:common:core-spi")))
testImplementation(testFixtures(project(":spi:control-plane:contract-spi")))
testImplementation(testFixtures(project(":spi:control-plane:policy-spi")))
testImplementation(testFixtures(project(":spi:control-plane:transfer-spi")))
}

publishing {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public TransferProcess find(String id) {

@Override
@Nullable
public String processIdForTransferId(String id) {
public String processIdForDataRequestId(String id) {
return store.findAll()
.filter(p -> id.equals(p.getDataRequest().getId()))
.findFirst()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,218 +14,66 @@

package org.eclipse.dataspaceconnector.core.controlplane.defaults.transferprocessstore;

import org.eclipse.dataspaceconnector.spi.query.QuerySpec;
import org.eclipse.dataspaceconnector.spi.query.SortOrder;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.DataRequest;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.ResourceManifest;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.TransferProcess;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.TransferProcessStates;
import org.jetbrains.annotations.NotNull;
import org.eclipse.dataspaceconnector.spi.persistence.Lease;
import org.eclipse.dataspaceconnector.spi.transfer.store.TransferProcessStore;
import org.eclipse.dataspaceconnector.transfer.store.TransferProcessStoreTestBase;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

import java.util.Comparator;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;
import static org.eclipse.dataspaceconnector.core.controlplane.defaults.transferprocessstore.TestFunctions.createProcess;
import static org.eclipse.dataspaceconnector.spi.types.domain.transfer.TransferProcessStates.INITIAL;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertNotSame;
import static org.junit.jupiter.api.Assertions.assertNull;

class InMemoryTransferProcessStoreTest {

import java.time.Clock;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

class InMemoryTransferProcessStoreTest extends TransferProcessStoreTestBase {
private final Map<String, Lease> leases = new HashMap<>();
private InMemoryTransferProcessStore store;

@BeforeEach
void setUp() {
store = new InMemoryTransferProcessStore();
}

@Test
void verifyCreateUpdateDelete() {
String id = UUID.randomUUID().toString();
TransferProcess transferProcess = TransferProcess.Builder.newInstance().id(id).dataRequest(DataRequest.Builder.newInstance().id("clientid").destinationType("test").build()).build();
transferProcess.transitionInitial();
store.create(transferProcess);

TransferProcess found = store.find(id);

assertNotNull(found);
assertNotSame(found, transferProcess); // enforce by-value

assertNotNull(store.processIdForTransferId("clientid"));

assertEquals(INITIAL.code(), found.getState());

transferProcess.transitionProvisioning(ResourceManifest.Builder.newInstance().build());

store.update(transferProcess);

found = store.find(id);
assertNotNull(found);
assertEquals(TransferProcessStates.PROVISIONING.code(), found.getState());

store.delete(id);
assertNull(store.find(id));
assertNull(store.processIdForTransferId("clientid"));

}

@Test
void verifyNext() throws InterruptedException {
var transferProcess1 = initialTransferProcess();
store.create(transferProcess1);
var transferProcess2 = initialTransferProcess();
store.create(transferProcess2);

transferProcess2.transitionProvisioning(ResourceManifest.Builder.newInstance().build());
store.update(transferProcess2);
Thread.sleep(1);
transferProcess1.transitionProvisioning(ResourceManifest.Builder.newInstance().build());
store.update(transferProcess1);

assertThat(store.nextForState(INITIAL.code(), 1)).isEmpty();

var found = store.nextForState(TransferProcessStates.PROVISIONING.code(), 1);
assertThat(found).hasSize(1).first().matches(it -> it.equals(transferProcess2));

found = store.nextForState(TransferProcessStates.PROVISIONING.code(), 3);
assertThat(found).hasSize(1).first().matches(it -> it.equals(transferProcess1));
}

@Test
void nextForState_shouldLeaseEntityUntilUpdate() {
var initialTransferProcess = initialTransferProcess();
store.create(initialTransferProcess);

var firstQueryResult = store.nextForState(INITIAL.code(), 1);
assertThat(firstQueryResult).hasSize(1);

var secondQueryResult = store.nextForState(INITIAL.code(), 1);
assertThat(secondQueryResult).hasSize(0);

var retrieved = firstQueryResult.get(0);
store.update(retrieved);

var thirdQueryResult = store.nextForState(INITIAL.code(), 1);
assertThat(thirdQueryResult).hasSize(1);
store = new InMemoryTransferProcessStore(CONNECTOR_NAME, Clock.systemUTC(), leases);
}

@Test
void verifyMutlipleRequets() {
String id1 = UUID.randomUUID().toString();
TransferProcess transferProcess1 = TransferProcess.Builder.newInstance().id(id1).dataRequest(DataRequest.Builder.newInstance().id("clientid1").destinationType("test").build()).build();
transferProcess1.transitionInitial();
store.create(transferProcess1);

String id2 = UUID.randomUUID().toString();
TransferProcess transferProcess2 = TransferProcess.Builder.newInstance().id(id2).dataRequest(DataRequest.Builder.newInstance().id("clientid2").destinationType("test").build()).build();
transferProcess2.transitionInitial();
store.create(transferProcess2);


TransferProcess found1 = store.find(id1);
assertNotNull(found1);

TransferProcess found2 = store.find(id2);
assertNotNull(found2);

var found = store.nextForState(INITIAL.code(), 3);
assertEquals(2, found.size());
@Override
protected boolean supportsCollectionQuery() {
return false;
}

@Test
void verifyOrderingByTimestamp() {
for (int i = 0; i < 100; i++) {
TransferProcess process = createProcess("test-process-" + i);
process.transitionInitial();
store.create(process);
}

List<TransferProcess> processes = store.nextForState(INITIAL.code(), 50);

assertThat(processes).hasSize(50);
assertThat(processes).allMatch(p -> p.getStateTimestamp() > 0);
@Override
protected boolean supportsLikeOperator() {
return false;
}

@Test
void verifyNextForState_avoidsStarvation() throws InterruptedException {
for (int i = 0; i < 10; i++) {
TransferProcess process = createProcess("test-process-" + i);
process.transitionInitial();
store.create(process);
}

var list1 = store.nextForState(INITIAL.code(), 5);
Thread.sleep(50); //simulate a short delay to generate different timestamps
list1.forEach(tp -> {
tp.updateStateTimestamp();
store.update(tp);
});
var list2 = store.nextForState(INITIAL.code(), 5);
assertThat(list1).isNotEqualTo(list2).doesNotContainAnyElementsOf(list2);
@Override
protected boolean supportsInOperator() {
return true;
}

@Test
void findAll_noQuerySpec() {
IntStream.range(0, 10).forEach(i -> store.create(createProcess("test-neg-" + i)));

var all = store.findAll(QuerySpec.Builder.newInstance().build());
assertThat(all).hasSize(10);
@Override
protected boolean supportsSortOrder() {
return true;
}

@Test
void findAll_verifyPaging() {

IntStream.range(0, 10).forEach(i -> store.create(createProcess("test-neg-" + i)));

// page size fits
assertThat(store.findAll(QuerySpec.Builder.newInstance().offset(3).limit(4).build())).hasSize(4);

// page size too large
assertThat(store.findAll(QuerySpec.Builder.newInstance().offset(5).limit(100).build())).hasSize(5);
@Override
protected TransferProcessStore getTransferProcessStore() {
return store;
}

@Test
void findAll_verifyFiltering() {
IntStream.range(0, 10).forEach(i -> store.create(createProcess("test-neg-" + i)));
assertThat(store.findAll(QuerySpec.Builder.newInstance().equalsAsContains(false).filter("id=test-neg-3").build())).extracting(TransferProcess::getId).containsOnly("test-neg-3");
@Override
protected void lockEntity(String negotiationId, String owner, Duration duration) {
leases.put(negotiationId, new Lease(owner, Clock.systemUTC().millis(), duration.toMillis()));
}

@Test
void findAll_verifyFiltering_invalidFilterExpression() {
IntStream.range(0, 10).forEach(i -> store.create(createProcess("test-neg-" + i)));
assertThatThrownBy(() -> store.findAll(QuerySpec.Builder.newInstance().filter("something foobar other").build())).isInstanceOfAny(IllegalArgumentException.class);
@Override
protected boolean isLockedBy(String negotiationId, String owner) {
return leases.entrySet().stream().anyMatch(e -> e.getKey().equals(negotiationId) &&
e.getValue().getLeasedBy().equals(owner) &&
!isExpired(e.getValue()));
}

@Test
void findAll_verifySorting() {
IntStream.range(0, 10).forEach(i -> store.create(createProcess("test-neg-" + i)));

assertThat(store.findAll(QuerySpec.Builder.newInstance().sortField("id").sortOrder(SortOrder.ASC).build())).hasSize(10).isSortedAccordingTo(Comparator.comparing(TransferProcess::getId));
assertThat(store.findAll(QuerySpec.Builder.newInstance().sortField("id").sortOrder(SortOrder.DESC).build())).hasSize(10).isSortedAccordingTo((c1, c2) -> c2.getId().compareTo(c1.getId()));
private boolean isExpired(Lease e) {
return e.getLeasedAt() + e.getLeaseDuration() < Clock.systemUTC().millis();
}

@Test
void findAll_verifySorting_invalidProperty() {
IntStream.range(0, 10).forEach(i -> store.create(createProcess("test-neg-" + i)));

var query = QuerySpec.Builder.newInstance().sortField("notexist").sortOrder(SortOrder.DESC).build();

assertThat(store.findAll(query).collect(Collectors.toList())).isEmpty();
}

@NotNull
private TransferProcess initialTransferProcess() {
var process = TransferProcess.Builder.newInstance()
.id(UUID.randomUUID().toString()).dataRequest(DataRequest.Builder.newInstance().id("clientid").destinationType("test").build()).build();
process.transitionInitial();
return process;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,15 @@ public class TransferProcessManagerImpl implements TransferProcessManager, Provi
private TransferProcessManagerImpl() {
}

@NotNull
private static Result<Void> toFatalError(StatusResult<?> result) {
if (result.fatalError()) {
return Result.failure(result.getFailureMessages());
} else {
return Result.success();
}
}

public void start() {
stateMachineManager = StateMachineManager.Builder.newInstance("transfer-process", monitor, executorInstrumentation, waitStrategy)
.processor(processTransfersInState(INITIAL, this::processInitial))
Expand Down Expand Up @@ -245,7 +254,7 @@ public void handleDeprovisionResult(String processId, List<StatusResult<Deprovis

private StatusResult<String> initiateRequest(TransferProcess.Type type, DataRequest dataRequest) {
// make the request idempotent: if the process exists, return
var processId = transferProcessStore.processIdForTransferId(dataRequest.getId());
var processId = transferProcessStore.processIdForDataRequestId(dataRequest.getId());
if (processId != null) {
return StatusResult.success(processId);
}
Expand Down Expand Up @@ -545,15 +554,6 @@ private void handleDeprovisionResponses(TransferProcess transferProcess, List<De
}
}

@NotNull
private static Result<Void> toFatalError(StatusResult<?> result) {
if (result.fatalError()) {
return Result.failure(result.getFailureMessages());
} else {
return Result.success();
}
}

@NotNull
private Result<Void> storeProvisionedSecrets(String transferProcessId, ProvisionResponse response) {
var resource = response.getResource();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -118,13 +118,11 @@ class TransferProcessManagerImplTest {
private final PolicyArchive policyArchive = mock(PolicyArchive.class);
private final DataFlowManager dataFlowManager = mock(DataFlowManager.class);
private final Vault vault = mock(Vault.class);
@SuppressWarnings("unchecked")
private final SendRetryManager<StatefulEntity<?>> sendRetryManager = mock(SendRetryManager.class);
private final TransferProcessListener listener = mock(TransferProcessListener.class);

private TransferProcessManagerImpl manager;

@SuppressWarnings("unchecked")
@BeforeEach
void setup() {
var observable = new TransferProcessObservableImpl();
Expand Down Expand Up @@ -156,7 +154,7 @@ void setup() {
*/
@Test
void verifyIdempotency() {
when(transferProcessStore.processIdForTransferId("1")).thenReturn(null, "2");
when(transferProcessStore.processIdForDataRequestId("1")).thenReturn(null, "2");
var dataRequest = DataRequest.Builder.newInstance().id("1").destinationType("test").build();

manager.start();
Expand All @@ -165,12 +163,12 @@ void verifyIdempotency() {
manager.stop();

verify(transferProcessStore, times(1)).create(isA(TransferProcess.class));
verify(transferProcessStore, times(2)).processIdForTransferId(anyString());
verify(transferProcessStore, times(2)).processIdForDataRequestId(anyString());
}

@Test
void verifyCreatedTimestamp() {
when(transferProcessStore.processIdForTransferId("1")).thenReturn(null, "2");
when(transferProcessStore.processIdForDataRequestId("1")).thenReturn(null, "2");
var dataRequest = DataRequest.Builder.newInstance().id("1").destinationType("test").build();

manager.start();
Expand Down Expand Up @@ -199,7 +197,7 @@ void initial_shouldTransitionToProvisioning() {
verify(transferProcessStore).update(argThat(p -> p.getState() == PROVISIONING.code()));
});
}

@Test
void initial_manifestEvaluationFailed_shouldTransitionToError() {
when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build());
Expand All @@ -208,9 +206,9 @@ void initial_manifestEvaluationFailed_shouldTransitionToError() {
.thenReturn(emptyList());
when(manifestGenerator.generateConsumerResourceManifest(any(DataRequest.class), any(Policy.class)))
.thenReturn(Result.failure("error"));

manager.start();

await().untilAsserted(() -> {
verify(policyArchive, atLeastOnce()).findPolicyForContract(anyString());
verifyNoInteractions(provisionManager);
Expand Down Expand Up @@ -861,7 +859,7 @@ public static Builder newInstance() {
private static class TokenTestProvisionResource extends TestProvisionedDataDestinationResource {
TokenTestProvisionResource(String resourceName, String id) {
super(resourceName, id);
this.hasToken = true;
hasToken = true;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ public TransferProcess find(String id) {
}

@Override
public @Nullable String processIdForTransferId(String id) {
public @Nullable String processIdForDataRequestId(String id) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ public TransferProcess find(String id) {
}

@Override
public @Nullable String processIdForTransferId(String id) {
public @Nullable String processIdForDataRequestId(String id) {
return null;
}

Expand Down
Loading