feat: Extra configuration for HttpDataSink #1480 #1510

Merged: 20 commits, Jun 27, 2022
Changes from 5 commits
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -370,6 +370,7 @@ in the detailed section referring to by linking pull requests or issues.
* Add `IN` operator to all `AssetIndex` implementations (#322)
* Support IDS logical constraint transformations (#342)
* Add SQL persistence for contract definitions (#460) (#461)
* Extra configuration for HttpDataSink (#1480)

#### Changed

26 changes: 25 additions & 1 deletion extensions/data-plane/data-plane-http/README.md
@@ -6,4 +6,28 @@ HTTP endpoint can receive data from any `DataSource` type. The extension is designed
consumption under load.

Note that Azure Object Storage or S3 extensions should be preferred to the current extensions when performing large data
transfers, as they support more scalable parallelization.

# Configuration

## Configuration properties

* edc.dataplane.http.sink.partition.size - Sets the number of parallel partitions for the sink (default: 5), e.g. `edc.dataplane.http.sink.partition.size=10`.

## Data properties

See [HttpDataAddress.java](../../../spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java) for the full list; a usage sketch follows the list below.

* type - The HTTP transfer type; always "HttpData".
* endpoint - The HTTP endpoint.
* name - The name associated with the HTTP data, typically a filename (optional).
* authKey - The authentication key property name (optional).
* authCode - The authentication code property name (optional).
* secretName - The name of the vault secret containing the authorization code (optional).
* proxyBody - If set to true, the body of the actual request is used to retrieve data from this address.
* proxyPath - If set to true, the path of the actual request is used to retrieve data from this address.
* proxyQueryParams - If set to true, the query params of the actual request are used to retrieve data from this address.
* proxyMethod - If set to true, the HTTP method of the actual request is used to retrieve data from this address.
* httpVerb - The HTTP verb to use for the sink endpoint; possible values are POST and PUT (default: POST).
* usePartName - Whether to append the part name to the sink endpoint. When set to true (default), the name of each part is appended to the sink endpoint URL.
* additionalHeaders - Additional headers to send, as a JSON string, e.g. ```"additionalHeaders" : "{\"Content-Type\" : \"application/octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}"```.
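
As an illustration, here is a minimal sketch of configuring a sink `HttpDataAddress` with these properties, modeled on the builder calls exercised in the factory tests further below (the endpoint URL and header values are placeholders, not defaults):

```java
import org.eclipse.dataspaceconnector.spi.types.domain.HttpDataAddress;

// Sketch: a sink address that uploads each part via HTTP PUT with extra headers,
// e.g. for a presigned upload URL. All values below are examples only.
var dataAddress = HttpDataAddress.Builder.newInstance()
        .baseUrl("http://example.com/upload")                     // sink endpoint (placeholder)
        .httpVerb("PUT")                                          // overrides the default POST
        .additionalHeaders("{\"x-ms-blob-type\": \"BlockBlob\"}") // headers as a JSON string
        .build();
```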
DataPlaneHttpExtension.java
@@ -9,6 +9,7 @@
*
* Contributors:
* Microsoft Corporation - initial API and implementation
* Siemens AG - changes to make it compatible with AWS S3, Azure blob and ALI Object Storage presigned URL for upload
*
*/

@@ -20,6 +21,7 @@
import org.eclipse.dataspaceconnector.dataplane.http.pipeline.HttpDataSourceFactory;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataTransferExecutorServiceContainer;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.dataspaceconnector.spi.EdcSetting;
import org.eclipse.dataspaceconnector.spi.security.Vault;
import org.eclipse.dataspaceconnector.spi.system.Inject;
import org.eclipse.dataspaceconnector.spi.system.ServiceExtension;
@@ -46,6 +48,9 @@ public class DataPlaneHttpExtension implements ServiceExtension {
@Inject
private Vault vault;

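// Configuration key for the number of parallel sink partitions; defaults to 5 (see initialize()).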
@EdcSetting
private static final String EDC_DATAPLANE_HTTP_SINK_PARTITION_SIZE = "edc.dataplane.http.sink.partition.size";

@Override
public String name() {
return "Data Plane HTTP";
@@ -54,11 +59,12 @@ public String name() {
@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();
var sinkPartitionSize = Integer.valueOf(context.getSetting(EDC_DATAPLANE_HTTP_SINK_PARTITION_SIZE, "5"));

@SuppressWarnings("unchecked") var sourceFactory = new HttpDataSourceFactory(httpClient, retryPolicy, monitor, vault);
pipelineService.registerFactory(sourceFactory);

var sinkFactory = new HttpDataSinkFactory(httpClient, executorContainer.getExecutorService(), sinkPartitionSize, monitor);
pipelineService.registerFactory(sinkFactory);
}
}
HttpDataSink.java
@@ -9,19 +9,26 @@
*
* Contributors:
* Microsoft Corporation - initial API and implementation
* Siemens AG - changes to make it compatible with AWS S3, Azure blob and ALI Object Storage presigned URL for upload
*
*/

package org.eclipse.dataspaceconnector.dataplane.http.pipeline;

import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSource;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.ParallelSink;
import org.eclipse.dataspaceconnector.spi.response.StatusResult;

import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;

import static java.lang.String.format;
import static org.eclipse.dataspaceconnector.spi.response.ResponseStatus.ERROR_RETRY;
@@ -30,33 +37,45 @@
* Writes data in a streaming fashion to an HTTP endpoint.
*/
public class HttpDataSink extends ParallelSink {
private static final StatusResult ERROR_WRITING_DATA = StatusResult.failure(ERROR_RETRY, "Error writing data");

private String authKey;
private String authCode;
private String endpoint;
private boolean usePartName = true;
private OkHttpClient httpClient;
private Map<String, String> additionalHeaders = new HashMap<>();
private HttpDataSinkRequest requestBuilder = new HttpDataSinkRequestPost();

/**
* Sends the parts to the destination endpoint using the configured HTTP verb (POST by default).
*/
@Override
protected StatusResult<Void> transferParts(List<DataSource.Part> parts) {
for (DataSource.Part part : parts) {
var requestBuilder = new Request.Builder();
if (authKey != null) {
requestBuilder.header(authKey, authCode);
}

if (additionalHeaders != null) {
additionalHeaders.forEach(requestBuilder::header);
}

var request = this.requestBuilder.makeRequestForPart(requestBuilder, part)
.orElseThrow(() -> new IllegalStateException("Failed to build a request"));

try (var response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
monitor.severe(format("Error {%s: %s} received writing HTTP data %s to endpoint %s for request: %s", response.code(), response.message(), part.name(), endpoint, request));
return ERROR_WRITING_DATA;
}

return StatusResult.success();
} catch (Exception e) {
monitor.severe(format("Error writing HTTP data %s to endpoint %s for request: %s", part.name(), endpoint, request), e);
return ERROR_WRITING_DATA;
}
}
return StatusResult.success();
@@ -65,6 +84,46 @@ protected StatusResult<Void> transferParts(List<DataSource.Part> parts) {
private HttpDataSink() {
}

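/**
* Strategy for building the HTTP request that sends a single part to the sink endpoint.
*/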
private interface HttpDataSinkRequest {
Optional<Request> makeRequestForPart(Request.Builder requestBuilder, DataSource.Part part);
}

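/**
* Builds a PUT request. The part is buffered in memory via readAllBytes(), so OkHttp
* can send a fixed Content-Length; streamed/chunked bodies are often rejected by
* presigned upload URLs (e.g. AWS S3, Azure Blob).
*/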
private class HttpDataSinkRequestPut implements HttpDataSinkRequest {
@Override
public Optional<Request> makeRequestForPart(Request.Builder requestBuilder, DataSource.Part part) {
RequestBody body;

try (InputStream stream = part.openStream()) {
body = RequestBody.create(stream.readAllBytes());
} catch (IOException e) {
monitor.severe(format("Error reading bytes for HTTP part data %s", part.name()));
return Optional.empty();
}

return Optional.of(
requestBuilder
.url(makeUrl(part))
.put(body)
.build());
}
}

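/**
* Builds a POST request that streams the part body instead of buffering it in memory.
*/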
private class HttpDataSinkRequestPost implements HttpDataSinkRequest {
@Override
public Optional<Request> makeRequestForPart(final Request.Builder requestBuilder, final DataSource.Part part) {
var requestBody = new StreamingRequestBody(part);
return Optional.of(
requestBuilder
.url(makeUrl(part))
.post(requestBody)
.build());
}
}

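/**
* Appends the part name to the endpoint when usePartName is enabled.
*/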
private String makeUrl(DataSource.Part part) {
return usePartName ? endpoint + "/" + part.name() : endpoint;
}

public static class Builder extends ParallelSink.Builder<Builder, HttpDataSink> {

public static Builder newInstance() {
@@ -91,6 +150,25 @@ public Builder httpClient(OkHttpClient httpClient) {
return this;
}

public Builder additionalHeaders(Map<String, String> additionalHeaders) {
sink.additionalHeaders = additionalHeaders;
return this;
}

public Builder usePartName(boolean usePartName) {
sink.usePartName = usePartName;
return this;
}

public Builder httpVerb(String httpVerb) {
sink.requestBuilder = makeSender(httpVerb);
return this;
}

private HttpDataSinkRequest makeSender(String httpVerb) {
return "PUT".equalsIgnoreCase(httpVerb) ? sink.new HttpDataSinkRequestPut() : sink.new HttpDataSinkRequestPost();
}

protected void validate() {
Objects.requireNonNull(sink.endpoint, "endpoint");
Objects.requireNonNull(sink.httpClient, "httpClient");
HttpDataSinkFactory.java
@@ -75,6 +75,9 @@ private Result<DataSink> createDataSink(DataFlowRequest request) {
}
var authKey = dataAddress.getAuthKey();
var authCode = dataAddress.getAuthCode();
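// additional sink options read from the destination HttpDataAddress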
var httpVerb = dataAddress.getHttpVerb();
var usePartName = dataAddress.isUsePartName();
var additionalHeaders = dataAddress.getAdditionalHeaders();

var sink = HttpDataSink.Builder.newInstance()
.endpoint(baseUrl)
Expand All @@ -83,6 +86,9 @@ private Result<DataSink> createDataSink(DataFlowRequest request) {
.authKey(authKey)
.authCode(authCode)
.httpClient(httpClient)
.usePartName(usePartName)
.httpVerb(httpVerb)
.additionalHeaders(additionalHeaders)
.executorService(executorService)
.monitor(monitor)
.build();
HttpDataSinkFactoryTest.java
@@ -110,6 +110,59 @@ void verifyCreateAuthenticatingSource() throws InterruptedException, ExecutionException, IOException {
verify(call).execute();
}

@Test
void verifyCreatePutVerb() throws InterruptedException, ExecutionException, IOException {
var dataAddress = HttpDataAddress.Builder.newInstance()
.baseUrl("http://example.com")
.httpVerb("PUT")
.build();

var validRequest = createRequest(HttpDataAddress.DATA_TYPE).destinationDataAddress(dataAddress).build();

var call = mock(Call.class);
when(call.execute()).thenReturn(createHttpResponse().build());

when(httpClient.newCall(isA(Request.class))).thenAnswer(r -> {
assertThat(((Request) r.getArgument(0)).method()).isEqualTo("PUT"); // verify verb PUT
return call;
});

var sink = factory.createSink(validRequest);

var result = sink.transfer(new InputStreamDataSource("test", new ByteArrayInputStream("test".getBytes()))).get();

assertThat(result.failed()).isFalse();

verify(call).execute();
}

@Test
void verifyCreateAdditionalHeaders() throws InterruptedException, ExecutionException, IOException {
var dataAddress = HttpDataAddress.Builder.newInstance()
.baseUrl("http://example.com")
.additionalHeaders("{\"Content-Type\" : \"application/test-octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}")
.build();

var validRequest = createRequest(HttpDataAddress.DATA_TYPE).destinationDataAddress(dataAddress).build();

var call = mock(Call.class);
when(call.execute()).thenReturn(createHttpResponse().build());

when(httpClient.newCall(isA(Request.class))).thenAnswer(r -> {
assertThat(((Request) r.getArgument(0)).headers("Content-Type").get(0)).isEqualTo("application/test-octet-stream"); // verify Content-Type
assertThat(((Request) r.getArgument(0)).headers("x-ms-blob-type").get(0)).isEqualTo("BlockBlob"); // verify x-ms-blob-type
return call;
});

var sink = factory.createSink(validRequest);

var result = sink.transfer(new InputStreamDataSource("test", new ByteArrayInputStream("test".getBytes()))).get();

assertThat(result.failed()).isFalse();

verify(call).execute();
}

@BeforeEach
void setUp() {
httpClient = mock(OkHttpClient.class);
DataAddress.java
@@ -9,6 +9,7 @@
*
* Contributors:
* Microsoft Corporation - initial API and implementation
* Siemens AG - enable reading a property and returning a default value if the key is missing
*
*/

@@ -56,6 +57,14 @@ public String getProperty(String key) {
return properties.get(key);
}

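/**
* Returns the value for the given key, or the supplied default if the key is not present.
*/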
public String getProperty(String key, String defaultValue) {
if (properties.containsKey(key)) {
return properties.get(key);
}

return defaultValue;
}

public Map<String, String> getProperties() {
return properties;
}