dpf: Harmonize settings naming in data-plane-transfer #1164

Merged
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -27,6 +27,7 @@ in the detailed section referring to by linking pull requests or issues.
* Provisioned resource information in Data Management API (#1221)
* Add custom Jackson (de)serializer for `XMLGregorianCalendar` (#1226)
* Add contract validation rule (#1239)
* Harmonize setting names in `data-plane-transfer` (#1164)

#### Changed

@@ -42,7 +42,7 @@ public void register(DataFlowController controller) {

@WithSpan
@Override
public @NotNull StatusResult<String> initiate(DataRequest dataRequest, DataAddress contentAddress, Policy policy) {
public @NotNull StatusResult<Void> initiate(DataRequest dataRequest, DataAddress contentAddress, Policy policy) {
try {
return controllers.stream()
.filter(controller -> controller.canHandle(dataRequest, contentAddress))
@@ -50,7 +50,7 @@ public boolean canHandle(DataRequest dataRequest, DataAddress contentAddress) {
}

@Override
public @NotNull StatusResult<String> initiateFlow(DataRequest dataRequest, DataAddress contentAddress, Policy policy) {
public @NotNull StatusResult<Void> initiateFlow(DataRequest dataRequest, DataAddress contentAddress, Policy policy) {
var destinationType = dataRequest.getDestinationType();
monitor.info(format("Copying data from %s to %s", contentAddress.getType(), destinationType));

@@ -83,6 +83,6 @@ public boolean canHandle(DataRequest dataRequest, DataAddress contentAddress) {
}
}

return StatusResult.success("Inline data flow successful");
return StatusResult.success();
}
}
@@ -14,6 +14,7 @@

package org.eclipse.dataspaceconnector.transfer.core.flow;

import com.github.javafaker.Faker;
import org.eclipse.dataspaceconnector.policy.model.Policy;
import org.eclipse.dataspaceconnector.spi.EdcException;
import org.eclipse.dataspaceconnector.spi.response.StatusResult;
@@ -30,16 +31,18 @@

class DataFlowManagerImplTest {

private static final Faker FAKER = new Faker();

@Test
void should_initiate_flow_on_correct_controller() {
var manager = new DataFlowManagerImpl();
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("type").build();
var dataRequest = DataRequest.Builder.newInstance().destinationType(FAKER.lorem().word()).build();
var policy = Policy.Builder.newInstance().build();
var dataAddress = DataAddress.Builder.newInstance().type("test").build();
var dataAddress = DataAddress.Builder.newInstance().type(FAKER.lorem().word()).build();

when(controller.canHandle(any(), any())).thenReturn(true);
when(controller.initiateFlow(any(), any(), any())).thenReturn(StatusResult.success("success"));
when(controller.initiateFlow(any(), any(), any())).thenReturn(StatusResult.success());
manager.register(controller);

var response = manager.initiate(dataRequest, dataAddress, policy);
@@ -51,8 +54,8 @@ void should_initiate_flow_on_correct_controller() {
void should_return_fatal_error_if_no_controller_can_handle_the_request() {
var manager = new DataFlowManagerImpl();
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test").build();
var dataRequest = DataRequest.Builder.newInstance().destinationType(FAKER.lorem().word()).build();
var dataAddress = DataAddress.Builder.newInstance().type(FAKER.lorem().word()).build();
var policy = Policy.Builder.newInstance().build();

when(controller.canHandle(any(), any())).thenReturn(false);
@@ -68,18 +71,19 @@ void should_return_fatal_error_if_no_controller_can_handle_the_request() {
void should_catch_exceptions_and_return_fatal_error() {
var manager = new DataFlowManagerImpl();
var controller = mock(DataFlowController.class);
var dataRequest = DataRequest.Builder.newInstance().destinationType("type").build();
var dataAddress = DataAddress.Builder.newInstance().type("test").build();
var dataRequest = DataRequest.Builder.newInstance().destinationType(FAKER.lorem().word()).build();
var dataAddress = DataAddress.Builder.newInstance().type(FAKER.lorem().word()).build();
var policy = Policy.Builder.newInstance().build();

var errorMsg = FAKER.lorem().sentence();
when(controller.canHandle(any(), any())).thenReturn(true);
when(controller.initiateFlow(any(), any(), any())).thenThrow(new EdcException("error"));
when(controller.initiateFlow(any(), any(), any())).thenThrow(new EdcException(errorMsg));
manager.register(controller);

var response = manager.initiate(dataRequest, dataAddress, policy);

assertThat(response.succeeded()).isFalse();
assertThat(response.getFailure().status()).isEqualTo(FATAL_ERROR);
assertThat(response.getFailureMessages()).hasSize(1).first().matches(message -> message.contains("error"));
assertThat(response.getFailureMessages()).hasSize(1).first().matches(message -> message.contains(errorMsg));
}
}
@@ -326,7 +326,7 @@ void provisionedProvider_shouldTransitionToInProgress() throws InterruptedException {
when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build());
when(policyArchive.findPolicyForContract(anyString())).thenReturn(Policy.Builder.newInstance().build());
when(transferProcessStore.nextForState(eq(PROVISIONED.code()), anyInt())).thenReturn(List.of(process)).thenReturn(emptyList());
when(dataFlowManager.initiate(any(), any(), any())).thenReturn(StatusResult.success("any"));
when(dataFlowManager.initiate(any(), any(), any())).thenReturn(StatusResult.success());
var latch = countDownOnUpdateLatch();

manager.start();
13 changes: 13 additions & 0 deletions extensions/data-plane-transfer/README.md
@@ -0,0 +1,13 @@
# Data Plane Transfer extension

This extension provides the resources used to delegate data transfers to the Data Plane, or to use the Data Plane as a proxy for querying data.

The settings of this extension are listed below:

| Parameter name                                       | Description                                                                                                                         | Mandatory | Default value                           |
|:-----------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------|:----------|:----------------------------------------|
| `edc.transfer.proxy.token.signer.privatekey.alias`   | Alias of the private key used to sign the tokens used to access the Data Plane public API                                          | true      |                                         |
| `edc.transfer.proxy.token.verifier.publickey.alias`  | Alias of the public key used to verify the tokens presented to the Data Plane public API (the public key must be in the Vault)     | false     | private key alias suffixed with "-pub"  |
| `edc.transfer.proxy.token.validity.seconds`          | Validity (in seconds) of the tokens generated for accessing the Data Plane public API                                              | false     | 600                                     |
| `edc.transfer.proxy.endpoint`                        | Public API endpoint of the Data Plane (TODO: this has to be generated dynamically by the Data Plane Selector based on the request) | true      |                                         |
| `edc.transfer.client.selector.strategy`              | Selection strategy used by the client to determine which Data Plane instance the data transfer should be delegated to              | true      |                                         |
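
As a non-normative illustration (not part of this PR's diff), the sketch below assembles the harmonized settings as plain key/value entries. Only the keys come from the table above; every value (key aliases, endpoint URL, selection strategy) is a placeholder assumption:

```java
import java.util.Map;

// Minimal sketch: the harmonized `data-plane-transfer` settings as key/value pairs.
// Only the keys are taken from the table above; all values are placeholders.
public class DataPlaneTransferSettingsExample {

    public static void main(String[] args) {
        Map<String, String> settings = Map.of(
                "edc.transfer.proxy.token.signer.privatekey.alias", "dp-token-signer-private",
                "edc.transfer.proxy.token.verifier.publickey.alias", "dp-token-signer-private-pub", // default: private key alias + "-pub"
                "edc.transfer.proxy.token.validity.seconds", "600",                                  // default: 600
                "edc.transfer.proxy.endpoint", "http://dataplane.example.com/public",
                "edc.transfer.client.selector.strategy", "random"
        );

        // In an actual deployment these entries would be passed to the connector as configuration properties.
        settings.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```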
@@ -16,21 +16,16 @@ plugins {
`java-library`
}

val nimbusVersion: String by project

dependencies {
api(project(":spi:core-spi"))
api(project(":spi:transfer-spi"))
api(project(":extensions:data-plane-transfer:data-plane-transfer-spi"))
api(project(":common:token-generation-lib"))

api("com.nimbusds:nimbus-jose-jwt:${nimbusVersion}")
api(project(":extensions:data-plane-transfer:data-plane-transfer-sync"))
api(project(":extensions:data-plane-transfer:data-plane-transfer-client"))
}

publishing {
publications {
create<MavenPublication>("data-plane-transfer-core") {
artifactId = "data-plane-transfer-core"
create<MavenPublication>("data-plane-transfer") {
artifactId = "data-plane-transfer"
from(components["java"])
}
}
@@ -24,6 +24,7 @@ val faker: String by project

dependencies {
api(project(":spi:core-spi"))
api(project(":spi:transfer-spi"))
api(project(":extensions:data-plane:data-plane-spi"))
api(project(":extensions:data-plane-transfer:data-plane-transfer-spi"))
api(project(":extensions:data-plane-selector:selector-spi"))
@@ -24,10 +24,10 @@
import org.eclipse.dataspaceconnector.spi.system.ServiceExtension;
import org.eclipse.dataspaceconnector.spi.system.ServiceExtensionContext;
import org.eclipse.dataspaceconnector.spi.transfer.flow.DataFlowManager;
import org.eclipse.dataspaceconnector.transfer.dataplane.client.DataPlaneTransferClient;
import org.eclipse.dataspaceconnector.transfer.dataplane.client.EmbeddedDataPlaneTransferClient;
import org.eclipse.dataspaceconnector.transfer.dataplane.client.RemoteDataPlaneTransferClient;
import org.eclipse.dataspaceconnector.transfer.dataplane.flow.DataPlaneTransferFlowController;
import org.eclipse.dataspaceconnector.transfer.dataplane.spi.client.DataPlaneTransferClient;

import java.util.Objects;

@@ -37,7 +37,7 @@
public class DataPlaneTransferClientExtension implements ServiceExtension {

@EdcSetting
private static final String DPF_SELECTOR_STRATEGY = "edc.dpf.selector.strategy";
private static final String DPF_SELECTOR_STRATEGY = "edc.transfer.client.selector.strategy";

@Inject(required = false)
private DataPlaneSelectorClient selectorClient;
@@ -19,6 +19,7 @@
import org.eclipse.dataspaceconnector.spi.response.ResponseStatus;
import org.eclipse.dataspaceconnector.spi.response.StatusResult;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.dataspaceconnector.transfer.dataplane.spi.client.DataPlaneTransferClient;

/**
* Implementation of a {@link DataPlaneTransferClient} that uses a local {@link org.eclipse.dataspaceconnector.dataplane.spi.manager.DataPlaneManager},
@@ -31,6 +31,7 @@
import org.eclipse.dataspaceconnector.spi.response.ResponseStatus;
import org.eclipse.dataspaceconnector.spi.response.StatusResult;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.dataspaceconnector.transfer.dataplane.spi.client.DataPlaneTransferClient;
import org.jetbrains.annotations.NotNull;

import java.io.IOException;
@@ -22,19 +22,18 @@
import org.eclipse.dataspaceconnector.spi.types.domain.DataAddress;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.DataRequest;
import org.eclipse.dataspaceconnector.transfer.dataplane.client.DataPlaneTransferClient;
import org.eclipse.dataspaceconnector.transfer.dataplane.spi.client.DataPlaneTransferClient;
import org.jetbrains.annotations.NotNull;

import java.util.UUID;

import static java.lang.String.join;
import static org.eclipse.dataspaceconnector.transfer.dataplane.spi.DataPlaneTransferType.SYNC;
import static org.eclipse.dataspaceconnector.transfer.dataplane.spi.DataPlaneTransferType.HTTP_PROXY;

/**
* Implementation of {@link DataFlowController} that delegates data transfer to a Data Plane instance.
* Note that the Data Plane can be embedded in the current runtime (tests, samples...) or accessed remotely.
* The present {@link DataFlowController} is triggered when destination type in the {@link DataRequest} is different from
* {@link org.eclipse.dataspaceconnector.transfer.dataplane.spi.DataPlaneTransferType#SYNC}, as this one is reserved for synchronous data transfers.
* {@link org.eclipse.dataspaceconnector.transfer.dataplane.spi.DataPlaneTransferType#HTTP_PROXY}, as this one is reserved for synchronous data transfers.
*/
public class DataPlaneTransferFlowController implements DataFlowController {
private final DataPlaneTransferClient client;
@@ -47,20 +46,20 @@ public DataPlaneTransferFlowController(DataPlaneTransferClient client) {
public boolean canHandle(DataRequest dataRequest, DataAddress contentAddress) {
var type = dataRequest.getDestinationType();
if (!StringUtils.isNullOrBlank(type)) {
return !SYNC.equals(dataRequest.getDestinationType());
return !HTTP_PROXY.equals(dataRequest.getDestinationType());
}
return false;
}

@Override
public @NotNull StatusResult<String> initiateFlow(DataRequest dataRequest, DataAddress contentAddress, Policy policy) {
public @NotNull StatusResult<Void> initiateFlow(DataRequest dataRequest, DataAddress contentAddress, Policy policy) {
var dataFlowRequest = createRequest(dataRequest, contentAddress);
var result = client.transfer(dataFlowRequest);
if (result.failed()) {
return StatusResult.failure(ResponseStatus.FATAL_ERROR,
"Failed to delegate data transfer to Data Plane: " + join(", ", result.getFailureMessages()));
"Failed to delegate data transfer to Data Plane: " + String.join(", ", result.getFailureMessages()));
}
return StatusResult.success("");
return StatusResult.success();
}

private DataFlowRequest createRequest(DataRequest dataRequest, DataAddress sourceAddress) {
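
As an aside (not part of the diff), here is a minimal, self-contained sketch of the dispatch rule described in the Javadoc above: any destination type other than `HTTP_PROXY` is delegated to the Data Plane. The `HttpData` and `AmazonS3` type strings are arbitrary examples, and the client is mocked because only `canHandle` is exercised:

```java
import org.eclipse.dataspaceconnector.spi.types.domain.DataAddress;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.DataRequest;
import org.eclipse.dataspaceconnector.transfer.dataplane.flow.DataPlaneTransferFlowController;
import org.eclipse.dataspaceconnector.transfer.dataplane.spi.client.DataPlaneTransferClient;

import static org.eclipse.dataspaceconnector.transfer.dataplane.spi.DataPlaneTransferType.HTTP_PROXY;
import static org.mockito.Mockito.mock;

class FlowControllerDispatchSketch {

    public static void main(String[] args) {
        // The client is mocked because only the dispatch decision is of interest here.
        DataPlaneTransferClient client = mock(DataPlaneTransferClient.class);
        var controller = new DataPlaneTransferFlowController(client);
        var contentAddress = DataAddress.Builder.newInstance().type("HttpData").build();

        // HTTP_PROXY destinations are reserved for synchronous proxy transfers, so the controller declines them.
        var proxyRequest = DataRequest.Builder.newInstance().destinationType(HTTP_PROXY).build();
        System.out.println(controller.canHandle(proxyRequest, contentAddress)); // false

        // Any other destination type ("AmazonS3" is just an example) is delegated to the Data Plane.
        var pushRequest = DataRequest.Builder.newInstance().destinationType("AmazonS3").build();
        System.out.println(controller.canHandle(pushRequest, contentAddress)); // true
    }
}
```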
@@ -17,6 +17,7 @@
import com.github.javafaker.Faker;
import org.eclipse.dataspaceconnector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.dataspaceconnector.spi.result.Result;
import org.eclipse.dataspaceconnector.transfer.dataplane.spi.client.DataPlaneTransferClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

@@ -26,7 +27,6 @@
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

@@ -52,7 +52,7 @@ void validationFailure_shouldReturnFailedResult() {

var result = client.transfer(request);

verify(dataPlaneManagerMock, times(1)).validate(request);
verify(dataPlaneManagerMock).validate(request);
verify(dataPlaneManagerMock, never()).initiateTransfer(any());

assertThat(result.failed()).isTrue();
@@ -69,8 +69,8 @@ void transferSuccess() {

var result = client.transfer(request);

verify(dataPlaneManagerMock, times(1)).validate(request);
verify(dataPlaneManagerMock, times(1)).initiateTransfer(request);
verify(dataPlaneManagerMock).validate(request);
verify(dataPlaneManagerMock).initiateTransfer(request);

assertThat(result.succeeded()).isTrue();
}
@@ -22,6 +22,7 @@
import org.eclipse.dataspaceconnector.dataplane.selector.instance.DataPlaneInstance;
import org.eclipse.dataspaceconnector.dataplane.spi.response.TransferErrorResponse;
import org.eclipse.dataspaceconnector.spi.response.ResponseStatus;
import org.eclipse.dataspaceconnector.transfer.dataplane.spi.client.DataPlaneTransferClient;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -21,13 +21,13 @@
import org.eclipse.dataspaceconnector.spi.types.domain.DataAddress;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.DataFlowRequest;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.DataRequest;
import org.eclipse.dataspaceconnector.transfer.dataplane.client.DataPlaneTransferClient;
import org.eclipse.dataspaceconnector.transfer.dataplane.spi.client.DataPlaneTransferClient;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.ArgumentCaptor;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.dataspaceconnector.transfer.dataplane.spi.DataPlaneTransferType.SYNC;
import static org.eclipse.dataspaceconnector.transfer.dataplane.spi.DataPlaneTransferType.HTTP_PROXY;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
@@ -50,9 +50,9 @@ public void setUp() {

@Test
void canHandle() {
var contentAddress = DataAddress.Builder.newInstance().type(SYNC).build();
var contentAddress = DataAddress.Builder.newInstance().type(HTTP_PROXY).build();
assertThat(flowController.canHandle(createDataRequest(), contentAddress)).isTrue();
assertThat(flowController.canHandle(createDataRequest(SYNC), contentAddress)).isFalse();
assertThat(flowController.canHandle(createDataRequest(HTTP_PROXY), contentAddress)).isFalse();
}

@Test