From 46e884e7b1bd20fea9323cf0f4f081ff464dd188 Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Wed, 22 Jun 2022 06:14:53 +0300 Subject: [PATCH 01/19] feat: Extra configuration for HttpDataSink #1480 --- CHANGELOG.md | 1 + .../data-plane/data-plane-http/README.md | 26 +++++- .../http/DataPlaneHttpExtension.java | 9 +- .../dataplane/http/pipeline/HttpDataSink.java | 86 ++++++++++++++++++- .../http/pipeline/HttpDataSinkFactory.java | 6 ++ .../pipeline/HttpDataSinkFactoryTest.java | 53 ++++++++++++ .../spi/types/domain/DataAddress.java | 9 ++ .../spi/types/domain/HttpDataAddress.java | 47 ++++++++++ 8 files changed, 231 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 9151385316d..8b753fe9101 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -370,6 +370,7 @@ in the detailed section referring to by linking pull requests or issues. * Add `IN` operator to all `AssetIndex` implementations (#322) * Support IDS logical constraint transformations (#342) * Add SQL persistence for contract definitions (#460) (#461) +* Extra configuration for HttpDataSink (#1480) #### Changed diff --git a/extensions/data-plane/data-plane-http/README.md b/extensions/data-plane/data-plane-http/README.md index 260dd9cddc7..94fd48e1faf 100644 --- a/extensions/data-plane/data-plane-http/README.md +++ b/extensions/data-plane/data-plane-http/README.md @@ -6,4 +6,28 @@ HTTP endpoint can receive data from any `DataSource` type. The extension is desi consumption under load. Note that Azure Object Storage or S3 extensions should be preferred to the current extensions when performing large data -transfers as support more scalable parallelization. \ No newline at end of file +transfers as support more scalable parallelization. + +# Configuration + +## Configuration properties + +* edc.dataplane.http.sink.partition.size - Sets the number o parallel partions for the sink (default set to 5). + +## Data properties + +see [HttpDataAddress.java](../../../spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java) + +* type - The HTTP transfer type is "HttpData". +* endpoint - The http endpoint. +* name - The name associated with the HTTP data, typically a filename (optional). +* authKey - The authentication key property name (optional). +* authCode - The authentication code property name (optional). +* secretName - The name of the vault secret that is containing the authorization code (optional). +* proxyBody - If set to true the body of the actual request will be used to retrieve data from this address. +* proxyPath - If set to true the path of the actual request will be used to retrieve data from this address. +* proxyQueryParams - If set to true the query params of the actual request will be used to retrieve data from this address. +* proxyMethod - If set to true the http method of the actual request will be used to retrieve data from this address. +* httpVerb - The http verb to use for sink endpoint - possible values POST/PUT - default set to POST. +* usePartName - Use partition name when sending to endpoint. When set to true (default) - it appends the name of the part to the sink endpoint. +* additionalHeaders - The additional headers to use as json string e.g. ```"additionalHeaders" : "{\"Content-Type\" : \"application/octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}"```. \ No newline at end of file diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java index b80942ce592..85741020410 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java @@ -9,17 +9,20 @@ * * Contributors: * Microsoft Corporation - initial API and implementation + * Siemens AG - changes to make it compatible with AWS S3, Azure blob and AWS China S3 presigned URL for upload * */ package org.eclipse.dataspaceconnector.dataplane.http; +import com.fasterxml.jackson.databind.ObjectMapper; import net.jodah.failsafe.RetryPolicy; import okhttp3.OkHttpClient; import org.eclipse.dataspaceconnector.dataplane.http.pipeline.HttpDataSinkFactory; import org.eclipse.dataspaceconnector.dataplane.http.pipeline.HttpDataSourceFactory; import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataTransferExecutorServiceContainer; import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.PipelineService; +import org.eclipse.dataspaceconnector.spi.EdcSetting; import org.eclipse.dataspaceconnector.spi.security.Vault; import org.eclipse.dataspaceconnector.spi.system.Inject; import org.eclipse.dataspaceconnector.spi.system.ServiceExtension; @@ -46,6 +49,9 @@ public class DataPlaneHttpExtension implements ServiceExtension { @Inject private Vault vault; + @EdcSetting + private static final String EDC_DATAPLANE_HTTP_SINK_PARTITION_SIZE = "edc.dataplane.http.sink.partition.size"; + @Override public String name() { return "Data Plane HTTP"; @@ -54,11 +60,12 @@ public String name() { @Override public void initialize(ServiceExtensionContext context) { var monitor = context.getMonitor(); + var sinkPartitionSize = Integer.valueOf(context.getSetting(EDC_DATAPLANE_HTTP_SINK_PARTITION_SIZE, "5")); @SuppressWarnings("unchecked") var sourceFactory = new HttpDataSourceFactory(httpClient, retryPolicy, monitor, vault); pipelineService.registerFactory(sourceFactory); - var sinkFactory = new HttpDataSinkFactory(httpClient, executorContainer.getExecutorService(), 5, monitor); + var sinkFactory = new HttpDataSinkFactory(httpClient, executorContainer.getExecutorService(), sinkPartitionSize, monitor); pipelineService.registerFactory(sinkFactory); } } diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java index b2c581ad540..1ec2ae955bb 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java @@ -9,6 +9,7 @@ * * Contributors: * Microsoft Corporation - initial API and implementation + * Siemens AG - changes to make it compatible with AWS S3, Azure blob and AWS China S3 presigned URL for upload * */ @@ -16,12 +17,18 @@ import okhttp3.OkHttpClient; import okhttp3.Request; +import okhttp3.RequestBody; import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSource; import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.ParallelSink; import org.eclipse.dataspaceconnector.spi.response.StatusResult; +import java.io.IOException; +import java.io.InputStream; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Objects; +import java.util.Optional; import static java.lang.String.format; import static org.eclipse.dataspaceconnector.spi.response.ResponseStatus.ERROR_RETRY; @@ -30,10 +37,15 @@ * Writes data in a streaming fashion to an HTTP endpoint. */ public class HttpDataSink extends ParallelSink { + private static final StatusResult ERROR_WRITING_DATA = StatusResult.failure(ERROR_RETRY, "Error writing data"); + private String authKey; private String authCode; private String endpoint; + private boolean usePartName = true; private OkHttpClient httpClient; + private Map additionalHeaders = new HashMap<>(); + private HttpDataSinkRequest requestBuilder = new HttpDataSinkRequestPost(); /** * Sends the parts to the destination endpoint using an HTTP POST. @@ -41,22 +53,29 @@ public class HttpDataSink extends ParallelSink { @Override protected StatusResult transferParts(List parts) { for (DataSource.Part part : parts) { - var requestBody = new StreamingRequestBody(part); var requestBuilder = new Request.Builder(); if (authKey != null) { requestBuilder.header(authKey, authCode); } - var request = requestBuilder.url(endpoint).post(requestBody).build(); + if (additionalHeaders != null) { + additionalHeaders.forEach(requestBuilder::header); + } + + var request = this.requestBuilder.makeRequestForPart(requestBuilder, part) + .orElseThrow(() -> new IllegalStateException("Failed to build a request")); + try (var response = httpClient.newCall(request).execute()) { if (!response.isSuccessful()) { monitor.severe(format("Error {%s: %s} received writing HTTP data %s to endpoint %s for request: %s", response.code(), response.message(), part.name(), endpoint, request)); - return StatusResult.failure(ERROR_RETRY, "Error writing data"); + return ERROR_WRITING_DATA; } + + return StatusResult.success(); } catch (Exception e) { monitor.severe(format("Error writing HTTP data %s to endpoint %s for request: %s", part.name(), endpoint, request), e); - return StatusResult.failure(ERROR_RETRY, "Error writing data"); + return ERROR_WRITING_DATA; } } return StatusResult.success(); @@ -65,6 +84,46 @@ protected StatusResult transferParts(List parts) { private HttpDataSink() { } + private interface HttpDataSinkRequest { + Optional makeRequestForPart(Request.Builder requestBuilder, DataSource.Part part); + } + + private class HttpDataSinkRequestPut implements HttpDataSinkRequest { + @Override + public Optional makeRequestForPart(Request.Builder requestBuilder, DataSource.Part part) { + RequestBody body; + + try (InputStream stream = part.openStream()) { + body = RequestBody.create(stream.readAllBytes()); + } catch (IOException e) { + monitor.severe(format("Error reading bytes for HTTP part data %s", part.name())); + return Optional.empty(); + } + + return Optional.of( + requestBuilder + .url(makeUrl(part)) + .put(body) + .build()); + } + } + + private class HttpDataSinkRequestPost implements HttpDataSinkRequest { + @Override + public Optional makeRequestForPart(final Request.Builder requestBuilder, final DataSource.Part part) { + var requestBody = new StreamingRequestBody(part); + return Optional.of( + requestBuilder + .url(makeUrl(part)) + .post(requestBody) + .build()); + } + } + + private String makeUrl(DataSource.Part part) { + return usePartName ? endpoint + "/" + part.name() : endpoint; + } + public static class Builder extends ParallelSink.Builder { public static Builder newInstance() { @@ -91,6 +150,25 @@ public Builder httpClient(OkHttpClient httpClient) { return this; } + public Builder additionalHeaders(Map additionalHeaders) { + sink.additionalHeaders = additionalHeaders; + return this; + } + + public Builder usePartName(boolean usePartName) { + sink.usePartName = usePartName; + return this; + } + + public Builder httpVerb(String httpVerb) { + sink.requestBuilder = makeSender(httpVerb); + return this; + } + + private HttpDataSinkRequest makeSender(String httpVerb) { + return "PUT".equalsIgnoreCase(httpVerb) ? sink.new HttpDataSinkRequestPut() : sink.new HttpDataSinkRequestPost(); + } + protected void validate() { Objects.requireNonNull(sink.endpoint, "endpoint"); Objects.requireNonNull(sink.httpClient, "httpClient"); diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java index 138f562de11..6e6e7fa342a 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java @@ -75,6 +75,9 @@ private Result createDataSink(DataFlowRequest request) { } var authKey = dataAddress.getAuthKey(); var authCode = dataAddress.getAuthCode(); + var httpVerb = dataAddress.getHttpVerb(); + var usePartName = dataAddress.isUsePartName(); + var additionalHeaders = dataAddress.getAdditionalHeaders(); var sink = HttpDataSink.Builder.newInstance() .endpoint(baseUrl) @@ -83,6 +86,9 @@ private Result createDataSink(DataFlowRequest request) { .authKey(authKey) .authCode(authCode) .httpClient(httpClient) + .usePartName(usePartName) + .httpVerb(httpVerb) + .additionalHeaders(additionalHeaders) .executorService(executorService) .monitor(monitor) .build(); diff --git a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java index c81093ceb55..edbc63097d1 100644 --- a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java +++ b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java @@ -110,6 +110,59 @@ void verifyCreateAuthenticatingSource() throws InterruptedException, ExecutionEx verify(call).execute(); } + @Test + void verifyCreatePutVerb() throws InterruptedException, ExecutionException, IOException { + var dataAddress = HttpDataAddress.Builder.newInstance() + .baseUrl("http://example.com") + .httpVerb("PUT") + .build(); + + var validRequest = createRequest(HttpDataAddress.DATA_TYPE).destinationDataAddress(dataAddress).build(); + + var call = mock(Call.class); + when(call.execute()).thenReturn(createHttpResponse().build()); + + when(httpClient.newCall(isA(Request.class))).thenAnswer(r -> { + assertThat(((Request) r.getArgument(0)).method()).isEqualTo("PUT"); // verify verb PUT + return call; + }); + + var sink = factory.createSink(validRequest); + + var result = sink.transfer(new InputStreamDataSource("test", new ByteArrayInputStream("test".getBytes()))).get(); + + assertThat(result.failed()).isFalse(); + + verify(call).execute(); + } + + @Test + void verifyCreateAdditionalHeaders() throws InterruptedException, ExecutionException, IOException { + var dataAddress = HttpDataAddress.Builder.newInstance() + .baseUrl("http://example.com") + .additionalHeaders("{\"Content-Type\" : \"application/test-octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}") + .build(); + + var validRequest = createRequest(HttpDataAddress.DATA_TYPE).destinationDataAddress(dataAddress).build(); + + var call = mock(Call.class); + when(call.execute()).thenReturn(createHttpResponse().build()); + + when(httpClient.newCall(isA(Request.class))).thenAnswer(r -> { + assertThat(((Request) r.getArgument(0)).headers("Content-Type").get(0)).isEqualTo("application/test-octet-stream"); // verify Content-Type + assertThat(((Request) r.getArgument(0)).headers("x-ms-blob-type").get(0)).isEqualTo("BlockBlob"); // verify x-ms-blob-type + return call; + }); + + var sink = factory.createSink(validRequest); + + var result = sink.transfer(new InputStreamDataSource("test", new ByteArrayInputStream("test".getBytes()))).get(); + + assertThat(result.failed()).isFalse(); + + verify(call).execute(); + } + @BeforeEach void setUp() { httpClient = mock(OkHttpClient.class); diff --git a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddress.java b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddress.java index a91a6166327..d4b8bafa2ba 100644 --- a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddress.java +++ b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddress.java @@ -9,6 +9,7 @@ * * Contributors: * Microsoft Corporation - initial API and implementation + * Siemens AG - enable read property and return a default value is missing * */ @@ -56,6 +57,14 @@ public String getProperty(String key) { return properties.get(key); } + public String getProperty(String key, String defaultValue) { + if (properties.containsKey(key)) { + return properties.get(key); + } + + return defaultValue; + } + public Map getProperties() { return properties; } diff --git a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java index e7100730c6c..d74b554d9e5 100644 --- a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java +++ b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java @@ -17,8 +17,14 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonTypeName; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; +import org.eclipse.dataspaceconnector.common.string.StringUtils; + +import java.util.HashMap; +import java.util.Map; /** * This is a wrapper class for the {@link DataAddress} object, which has typed accessors for properties specific to @@ -39,6 +45,9 @@ public class HttpDataAddress extends DataAddress { private static final String PROXY_PATH = "proxyPath"; private static final String PROXY_QUERY_PARAMS = "proxyQueryParams"; private static final String PROXY_METHOD = "proxyMethod"; + private static final String HTTP_VERB = "httpVerb"; + private static final String USE_PART_NAME = "usePartName"; + private static final String ADDITIONAL_HEADERS = "additionalHeaders"; private HttpDataAddress() { super(); @@ -89,6 +98,29 @@ public String getProxyMethod() { return getProperty(PROXY_METHOD); } + @JsonIgnore + public String getHttpVerb() { + return getProperty(HTTP_VERB, "POST"); + } + + @JsonIgnore + public boolean isUsePartName() { + return Boolean.parseBoolean(getProperty(USE_PART_NAME, Boolean.TRUE.toString())); + } + + @JsonIgnore + public Map getAdditionalHeaders() { + return convertAdditionalHeaders(getProperty(ADDITIONAL_HEADERS, "{}")); + } + + private Map convertAdditionalHeaders(String additionalHeaders) { + try { + return new ObjectMapper().readValue(StringUtils.isNullOrBlank(additionalHeaders) ? "" : additionalHeaders, Map.class); + } catch (JsonProcessingException e) { + return new HashMap<>(); + } + } + @JsonPOJOBuilder(withPrefix = "") public static final class Builder extends DataAddress.Builder { @@ -147,6 +179,21 @@ public Builder proxyMethod(String proxyMethod) { return this; } + public Builder httpVerb(String httpVerb) { + this.property(HTTP_VERB, httpVerb); + return this; + } + + public Builder usePartName(String usePartName) { + this.property(USE_PART_NAME, usePartName); + return this; + } + + public Builder additionalHeaders(String additionalHeaders) { + this.property(ADDITIONAL_HEADERS, additionalHeaders); + return this; + } + public Builder copyFrom(DataAddress other) { other.getProperties().forEach(this::property); return this; From b31b7c4400e0936edfe552bcb3740743f75b243f Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Wed, 22 Jun 2022 06:27:36 +0300 Subject: [PATCH 02/19] cleanup --- .../dataplane/http/DataPlaneHttpExtension.java | 3 +-- .../dataplane/http/pipeline/HttpDataSink.java | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java index 85741020410..2dc0787f3aa 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java @@ -9,13 +9,12 @@ * * Contributors: * Microsoft Corporation - initial API and implementation - * Siemens AG - changes to make it compatible with AWS S3, Azure blob and AWS China S3 presigned URL for upload + * Siemens AG - changes to make it compatible with AWS S3, Azure blob and ALI Object Storage presigned URL for upload * */ package org.eclipse.dataspaceconnector.dataplane.http; -import com.fasterxml.jackson.databind.ObjectMapper; import net.jodah.failsafe.RetryPolicy; import okhttp3.OkHttpClient; import org.eclipse.dataspaceconnector.dataplane.http.pipeline.HttpDataSinkFactory; diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java index 1ec2ae955bb..d4343fcd4b0 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java @@ -9,7 +9,7 @@ * * Contributors: * Microsoft Corporation - initial API and implementation - * Siemens AG - changes to make it compatible with AWS S3, Azure blob and AWS China S3 presigned URL for upload + * Siemens AG - changes to make it compatible with AWS S3, Azure blob and ALI Object Storage presigned URL for upload * */ From 422d6a745c296604a705e5e95aa40fbfe231dfc5 Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Wed, 22 Jun 2022 06:33:29 +0300 Subject: [PATCH 03/19] update header --- .../dataspaceconnector/spi/types/domain/HttpDataAddress.java | 1 + 1 file changed, 1 insertion(+) diff --git a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java index d74b554d9e5..d09928b9e42 100644 --- a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java +++ b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java @@ -9,6 +9,7 @@ * * Contributors: * Amadeus - Initial implementation + * Siemens AG - added httpVerb, usePartName and additionalHeaders * */ From c9c78de6fe95aac13a9d0ead38db7f035091e5bd Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Wed, 22 Jun 2022 10:44:52 +0300 Subject: [PATCH 04/19] add more tests --- .../spi/types/domain/DataAddressTest.java | 12 +++++ .../spi/types/domain/HttpDataAddressTest.java | 54 +++++++++++++++++++ 2 files changed, 66 insertions(+) create mode 100644 spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java diff --git a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java index 460b41c85e8..2f162865342 100644 --- a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java +++ b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java @@ -60,4 +60,16 @@ void verifyNullKeyThrowsException() { .hasMessageContaining("Property key null."); } + @Test + void verifyGetDefaultPropertyValue() { + assertEquals("defaultValue", DataAddress.Builder.newInstance().type("sometype").build() + .getProperty("missing","defaultValue")); + } + + @Test + void verifyGetExistingPropertyValue() { + assertEquals("existingValue", DataAddress.Builder.newInstance().type("sometype") + .property("existing", "existingValue") + .build().getProperty("existing","defaultValue")); + } } diff --git a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java new file mode 100644 index 00000000000..f4828006344 --- /dev/null +++ b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java @@ -0,0 +1,54 @@ +package org.eclipse.dataspaceconnector.spi.types.domain; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +class HttpDataAddressTest { + + @Test + void verifyGetProperties() { + HttpDataAddress dataAddress = HttpDataAddress.Builder.newInstance() + .name("name1") + .baseUrl("http://myendpoint") + .authCode("secret") + .authKey("myKey") + .secretName("mysecret") + .httpVerb("PUT") + .usePartName("false") + .additionalHeaders("{\"Content-Type\" : \"application/octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}") + .proxyBody("proxyBody1") + .proxyMethod("proxyMethod1") + .proxyPath("proxyPath1") + .proxyQueryParams("proxyQueryParams1") + .build(); + + assertEquals("HttpData", dataAddress.getType()); + assertEquals("name1", dataAddress.getName()); + assertEquals("http://myendpoint", dataAddress.getBaseUrl()); + assertEquals("myKey", dataAddress.getAuthKey()); + assertEquals("secret", dataAddress.getAuthCode()); + assertEquals("proxyBody1", dataAddress.getProxyBody()); + assertEquals("proxyMethod1", dataAddress.getProxyMethod()); + assertEquals("proxyPath1", dataAddress.getProxyPath()); + assertEquals("proxyQueryParams1", dataAddress.getProxyQueryParams()); + assertEquals("mysecret", dataAddress.getSecretName()); + assertEquals("PUT", dataAddress.getHttpVerb()); + assertFalse(dataAddress.isUsePartName()); + assertEquals(2, dataAddress.getAdditionalHeaders().size()); + assertEquals("application/octet-stream", dataAddress.getAdditionalHeaders().get("Content-Type")); + assertEquals("BlockBlob", dataAddress.getAdditionalHeaders().get("x-ms-blob-type")); + } + + @Test + void verifyGetDefaultValues() { + HttpDataAddress dataAddress = HttpDataAddress.Builder.newInstance().build(); + + assertEquals("HttpData", dataAddress.getType()); + assertEquals("POST", dataAddress.getHttpVerb()); + assertTrue(dataAddress.isUsePartName()); + assertEquals(0, dataAddress.getAdditionalHeaders().size()); + } +} \ No newline at end of file From bb521998a40c324f300cb3a01a4822a353c88aa0 Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Wed, 22 Jun 2022 11:09:23 +0300 Subject: [PATCH 05/19] remove usePartName --- extensions/data-plane/data-plane-http/README.md | 1 - .../dataplane/http/DataPlaneHttpExtension.java | 13 ++++++++++++- .../dataplane/http/pipeline/HttpDataSink.java | 14 ++------------ .../http/pipeline/HttpDataSinkFactory.java | 2 -- .../spi/types/domain/HttpDataAddress.java | 13 +------------ .../spi/types/domain/HttpDataAddressTest.java | 3 --- 6 files changed, 15 insertions(+), 31 deletions(-) diff --git a/extensions/data-plane/data-plane-http/README.md b/extensions/data-plane/data-plane-http/README.md index 94fd48e1faf..2cb9af42a9b 100644 --- a/extensions/data-plane/data-plane-http/README.md +++ b/extensions/data-plane/data-plane-http/README.md @@ -29,5 +29,4 @@ see [HttpDataAddress.java](../../../spi/core-spi/src/main/java/org/eclipse/datas * proxyQueryParams - If set to true the query params of the actual request will be used to retrieve data from this address. * proxyMethod - If set to true the http method of the actual request will be used to retrieve data from this address. * httpVerb - The http verb to use for sink endpoint - possible values POST/PUT - default set to POST. -* usePartName - Use partition name when sending to endpoint. When set to true (default) - it appends the name of the part to the sink endpoint. * additionalHeaders - The additional headers to use as json string e.g. ```"additionalHeaders" : "{\"Content-Type\" : \"application/octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}"```. \ No newline at end of file diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java index 2dc0787f3aa..bc42634535a 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java @@ -26,11 +26,13 @@ import org.eclipse.dataspaceconnector.spi.system.Inject; import org.eclipse.dataspaceconnector.spi.system.ServiceExtension; import org.eclipse.dataspaceconnector.spi.system.ServiceExtensionContext; +import org.jetbrains.annotations.NotNull; /** * Provides support for reading data from an HTTP endpoint and sending data to an HTTP endpoint. */ public class DataPlaneHttpExtension implements ServiceExtension { + private static final int DEFAULT_PART_SIZE = 5; @Inject private OkHttpClient httpClient; @@ -59,7 +61,7 @@ public String name() { @Override public void initialize(ServiceExtensionContext context) { var monitor = context.getMonitor(); - var sinkPartitionSize = Integer.valueOf(context.getSetting(EDC_DATAPLANE_HTTP_SINK_PARTITION_SIZE, "5")); + var sinkPartitionSize = getSinkPartitionSize(context); @SuppressWarnings("unchecked") var sourceFactory = new HttpDataSourceFactory(httpClient, retryPolicy, monitor, vault); pipelineService.registerFactory(sourceFactory); @@ -67,4 +69,13 @@ public void initialize(ServiceExtensionContext context) { var sinkFactory = new HttpDataSinkFactory(httpClient, executorContainer.getExecutorService(), sinkPartitionSize, monitor); pipelineService.registerFactory(sinkFactory); } + + @NotNull + private Integer getSinkPartitionSize(ServiceExtensionContext context) { + try { + return context.getSetting(EDC_DATAPLANE_HTTP_SINK_PARTITION_SIZE, DEFAULT_PART_SIZE); + } catch (NumberFormatException e) { + return DEFAULT_PART_SIZE; + } + } } diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java index d4343fcd4b0..d13f18e9bb2 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java @@ -42,7 +42,6 @@ public class HttpDataSink extends ParallelSink { private String authKey; private String authCode; private String endpoint; - private boolean usePartName = true; private OkHttpClient httpClient; private Map additionalHeaders = new HashMap<>(); private HttpDataSinkRequest requestBuilder = new HttpDataSinkRequestPost(); @@ -102,7 +101,7 @@ public Optional makeRequestForPart(Request.Builder requestBuilder, Data return Optional.of( requestBuilder - .url(makeUrl(part)) + .url(endpoint) .put(body) .build()); } @@ -114,16 +113,12 @@ public Optional makeRequestForPart(final Request.Builder requestBuilder var requestBody = new StreamingRequestBody(part); return Optional.of( requestBuilder - .url(makeUrl(part)) + .url(endpoint) .post(requestBody) .build()); } } - private String makeUrl(DataSource.Part part) { - return usePartName ? endpoint + "/" + part.name() : endpoint; - } - public static class Builder extends ParallelSink.Builder { public static Builder newInstance() { @@ -155,11 +150,6 @@ public Builder additionalHeaders(Map additionalHeaders) { return this; } - public Builder usePartName(boolean usePartName) { - sink.usePartName = usePartName; - return this; - } - public Builder httpVerb(String httpVerb) { sink.requestBuilder = makeSender(httpVerb); return this; diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java index 6e6e7fa342a..74b21470865 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java @@ -76,7 +76,6 @@ private Result createDataSink(DataFlowRequest request) { var authKey = dataAddress.getAuthKey(); var authCode = dataAddress.getAuthCode(); var httpVerb = dataAddress.getHttpVerb(); - var usePartName = dataAddress.isUsePartName(); var additionalHeaders = dataAddress.getAdditionalHeaders(); var sink = HttpDataSink.Builder.newInstance() @@ -86,7 +85,6 @@ private Result createDataSink(DataFlowRequest request) { .authKey(authKey) .authCode(authCode) .httpClient(httpClient) - .usePartName(usePartName) .httpVerb(httpVerb) .additionalHeaders(additionalHeaders) .executorService(executorService) diff --git a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java index d09928b9e42..9591d216dea 100644 --- a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java +++ b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java @@ -9,7 +9,7 @@ * * Contributors: * Amadeus - Initial implementation - * Siemens AG - added httpVerb, usePartName and additionalHeaders + * Siemens AG - added httpVerb and additionalHeaders * */ @@ -47,7 +47,6 @@ public class HttpDataAddress extends DataAddress { private static final String PROXY_QUERY_PARAMS = "proxyQueryParams"; private static final String PROXY_METHOD = "proxyMethod"; private static final String HTTP_VERB = "httpVerb"; - private static final String USE_PART_NAME = "usePartName"; private static final String ADDITIONAL_HEADERS = "additionalHeaders"; private HttpDataAddress() { @@ -104,11 +103,6 @@ public String getHttpVerb() { return getProperty(HTTP_VERB, "POST"); } - @JsonIgnore - public boolean isUsePartName() { - return Boolean.parseBoolean(getProperty(USE_PART_NAME, Boolean.TRUE.toString())); - } - @JsonIgnore public Map getAdditionalHeaders() { return convertAdditionalHeaders(getProperty(ADDITIONAL_HEADERS, "{}")); @@ -185,11 +179,6 @@ public Builder httpVerb(String httpVerb) { return this; } - public Builder usePartName(String usePartName) { - this.property(USE_PART_NAME, usePartName); - return this; - } - public Builder additionalHeaders(String additionalHeaders) { this.property(ADDITIONAL_HEADERS, additionalHeaders); return this; diff --git a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java index f4828006344..fe2b8a28464 100644 --- a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java +++ b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java @@ -17,7 +17,6 @@ void verifyGetProperties() { .authKey("myKey") .secretName("mysecret") .httpVerb("PUT") - .usePartName("false") .additionalHeaders("{\"Content-Type\" : \"application/octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}") .proxyBody("proxyBody1") .proxyMethod("proxyMethod1") @@ -36,7 +35,6 @@ void verifyGetProperties() { assertEquals("proxyQueryParams1", dataAddress.getProxyQueryParams()); assertEquals("mysecret", dataAddress.getSecretName()); assertEquals("PUT", dataAddress.getHttpVerb()); - assertFalse(dataAddress.isUsePartName()); assertEquals(2, dataAddress.getAdditionalHeaders().size()); assertEquals("application/octet-stream", dataAddress.getAdditionalHeaders().get("Content-Type")); assertEquals("BlockBlob", dataAddress.getAdditionalHeaders().get("x-ms-blob-type")); @@ -48,7 +46,6 @@ void verifyGetDefaultValues() { assertEquals("HttpData", dataAddress.getType()); assertEquals("POST", dataAddress.getHttpVerb()); - assertTrue(dataAddress.isUsePartName()); assertEquals(0, dataAddress.getAdditionalHeaders().size()); } } \ No newline at end of file From 2347b1e9e5d36df9a63585feee3d5be0c47f6eab Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Wed, 22 Jun 2022 16:15:15 +0300 Subject: [PATCH 06/19] apply review --- .../dataplane/http/DataPlaneHttpExtension.java | 6 +----- .../dataplane/http/pipeline/HttpDataSinkFactory.java | 2 +- .../http/pipeline/HttpDataSinkFactoryTest.java | 2 +- .../spi/types/domain/HttpDataAddress.java | 12 ++++++------ .../spi/types/domain/DataAddressTest.java | 4 ++-- .../spi/types/domain/HttpDataAddressTest.java | 8 +++----- 6 files changed, 14 insertions(+), 20 deletions(-) diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java index bc42634535a..97afc45f9d0 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java @@ -72,10 +72,6 @@ public void initialize(ServiceExtensionContext context) { @NotNull private Integer getSinkPartitionSize(ServiceExtensionContext context) { - try { - return context.getSetting(EDC_DATAPLANE_HTTP_SINK_PARTITION_SIZE, DEFAULT_PART_SIZE); - } catch (NumberFormatException e) { - return DEFAULT_PART_SIZE; - } + return context.getSetting(EDC_DATAPLANE_HTTP_SINK_PARTITION_SIZE, DEFAULT_PART_SIZE); } } diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java index 74b21470865..b5dd66232dc 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java @@ -75,7 +75,7 @@ private Result createDataSink(DataFlowRequest request) { } var authKey = dataAddress.getAuthKey(); var authCode = dataAddress.getAuthCode(); - var httpVerb = dataAddress.getHttpVerb(); + var httpVerb = dataAddress.getMethod(); var additionalHeaders = dataAddress.getAdditionalHeaders(); var sink = HttpDataSink.Builder.newInstance() diff --git a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java index edbc63097d1..51b176060a8 100644 --- a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java +++ b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java @@ -114,7 +114,7 @@ void verifyCreateAuthenticatingSource() throws InterruptedException, ExecutionEx void verifyCreatePutVerb() throws InterruptedException, ExecutionException, IOException { var dataAddress = HttpDataAddress.Builder.newInstance() .baseUrl("http://example.com") - .httpVerb("PUT") + .method("PUT") .build(); var validRequest = createRequest(HttpDataAddress.DATA_TYPE).destinationDataAddress(dataAddress).build(); diff --git a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java index 9591d216dea..4b1a97e3076 100644 --- a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java +++ b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java @@ -9,7 +9,7 @@ * * Contributors: * Amadeus - Initial implementation - * Siemens AG - added httpVerb and additionalHeaders + * Siemens AG - added method and additionalHeaders * */ @@ -46,7 +46,7 @@ public class HttpDataAddress extends DataAddress { private static final String PROXY_PATH = "proxyPath"; private static final String PROXY_QUERY_PARAMS = "proxyQueryParams"; private static final String PROXY_METHOD = "proxyMethod"; - private static final String HTTP_VERB = "httpVerb"; + private static final String METHOD = "method"; private static final String ADDITIONAL_HEADERS = "additionalHeaders"; private HttpDataAddress() { @@ -99,8 +99,8 @@ public String getProxyMethod() { } @JsonIgnore - public String getHttpVerb() { - return getProperty(HTTP_VERB, "POST"); + public String getMethod() { + return getProperty(METHOD, "POST"); } @JsonIgnore @@ -174,8 +174,8 @@ public Builder proxyMethod(String proxyMethod) { return this; } - public Builder httpVerb(String httpVerb) { - this.property(HTTP_VERB, httpVerb); + public Builder method(String method) { + this.property(METHOD, method); return this; } diff --git a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java index 2f162865342..450d042e236 100644 --- a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java +++ b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java @@ -63,13 +63,13 @@ void verifyNullKeyThrowsException() { @Test void verifyGetDefaultPropertyValue() { assertEquals("defaultValue", DataAddress.Builder.newInstance().type("sometype").build() - .getProperty("missing","defaultValue")); + .getProperty("missing", "defaultValue")); } @Test void verifyGetExistingPropertyValue() { assertEquals("existingValue", DataAddress.Builder.newInstance().type("sometype") .property("existing", "existingValue") - .build().getProperty("existing","defaultValue")); + .build().getProperty("existing", "defaultValue")); } } diff --git a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java index fe2b8a28464..b4ae0a0da90 100644 --- a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java +++ b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java @@ -3,8 +3,6 @@ import org.junit.jupiter.api.Test; import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertFalse; -import static org.junit.jupiter.api.Assertions.assertTrue; class HttpDataAddressTest { @@ -16,7 +14,7 @@ void verifyGetProperties() { .authCode("secret") .authKey("myKey") .secretName("mysecret") - .httpVerb("PUT") + .method("PUT") .additionalHeaders("{\"Content-Type\" : \"application/octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}") .proxyBody("proxyBody1") .proxyMethod("proxyMethod1") @@ -34,7 +32,7 @@ void verifyGetProperties() { assertEquals("proxyPath1", dataAddress.getProxyPath()); assertEquals("proxyQueryParams1", dataAddress.getProxyQueryParams()); assertEquals("mysecret", dataAddress.getSecretName()); - assertEquals("PUT", dataAddress.getHttpVerb()); + assertEquals("PUT", dataAddress.getMethod()); assertEquals(2, dataAddress.getAdditionalHeaders().size()); assertEquals("application/octet-stream", dataAddress.getAdditionalHeaders().get("Content-Type")); assertEquals("BlockBlob", dataAddress.getAdditionalHeaders().get("x-ms-blob-type")); @@ -45,7 +43,7 @@ void verifyGetDefaultValues() { HttpDataAddress dataAddress = HttpDataAddress.Builder.newInstance().build(); assertEquals("HttpData", dataAddress.getType()); - assertEquals("POST", dataAddress.getHttpVerb()); + assertEquals("POST", dataAddress.getMethod()); assertEquals(0, dataAddress.getAdditionalHeaders().size()); } } \ No newline at end of file From db87cd412d313984d6637c28cad73ecce347bd97 Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Wed, 22 Jun 2022 16:17:53 +0300 Subject: [PATCH 07/19] apply review --- extensions/data-plane/data-plane-http/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/data-plane/data-plane-http/README.md b/extensions/data-plane/data-plane-http/README.md index 2cb9af42a9b..c37bad2f396 100644 --- a/extensions/data-plane/data-plane-http/README.md +++ b/extensions/data-plane/data-plane-http/README.md @@ -28,5 +28,5 @@ see [HttpDataAddress.java](../../../spi/core-spi/src/main/java/org/eclipse/datas * proxyPath - If set to true the path of the actual request will be used to retrieve data from this address. * proxyQueryParams - If set to true the query params of the actual request will be used to retrieve data from this address. * proxyMethod - If set to true the http method of the actual request will be used to retrieve data from this address. -* httpVerb - The http verb to use for sink endpoint - possible values POST/PUT - default set to POST. +* method - The http verb to use for sink endpoint - possible values POST/PUT - default set to POST. * additionalHeaders - The additional headers to use as json string e.g. ```"additionalHeaders" : "{\"Content-Type\" : \"application/octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}"```. \ No newline at end of file From 14c362b0eba0a23ee77026496d7afdec982e4637 Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Wed, 22 Jun 2022 16:25:41 +0300 Subject: [PATCH 08/19] apply review --- .../dataplane/http/pipeline/HttpDataSink.java | 58 +++---------------- 1 file changed, 8 insertions(+), 50 deletions(-) diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java index d13f18e9bb2..53eb8e4de3f 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java @@ -17,18 +17,14 @@ import okhttp3.OkHttpClient; import okhttp3.Request; -import okhttp3.RequestBody; import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSource; import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.ParallelSink; import org.eclipse.dataspaceconnector.spi.response.StatusResult; -import java.io.IOException; -import java.io.InputStream; import java.util.HashMap; import java.util.List; import java.util.Map; import java.util.Objects; -import java.util.Optional; import static java.lang.String.format; import static org.eclipse.dataspaceconnector.spi.response.ResponseStatus.ERROR_RETRY; @@ -44,7 +40,7 @@ public class HttpDataSink extends ParallelSink { private String endpoint; private OkHttpClient httpClient; private Map additionalHeaders = new HashMap<>(); - private HttpDataSinkRequest requestBuilder = new HttpDataSinkRequestPost(); + private String method = "POST"; /** * Sends the parts to the destination endpoint using an HTTP POST. @@ -52,7 +48,7 @@ public class HttpDataSink extends ParallelSink { @Override protected StatusResult transferParts(List parts) { for (DataSource.Part part : parts) { - + var requestBody = new StreamingRequestBody(part); var requestBuilder = new Request.Builder(); if (authKey != null) { requestBuilder.header(authKey, authCode); @@ -62,8 +58,10 @@ protected StatusResult transferParts(List parts) { additionalHeaders.forEach(requestBuilder::header); } - var request = this.requestBuilder.makeRequestForPart(requestBuilder, part) - .orElseThrow(() -> new IllegalStateException("Failed to build a request")); + var request = requestBuilder + .url(endpoint) + .method(method, requestBody) + .build(); try (var response = httpClient.newCall(request).execute()) { if (!response.isSuccessful()) { @@ -83,42 +81,6 @@ protected StatusResult transferParts(List parts) { private HttpDataSink() { } - private interface HttpDataSinkRequest { - Optional makeRequestForPart(Request.Builder requestBuilder, DataSource.Part part); - } - - private class HttpDataSinkRequestPut implements HttpDataSinkRequest { - @Override - public Optional makeRequestForPart(Request.Builder requestBuilder, DataSource.Part part) { - RequestBody body; - - try (InputStream stream = part.openStream()) { - body = RequestBody.create(stream.readAllBytes()); - } catch (IOException e) { - monitor.severe(format("Error reading bytes for HTTP part data %s", part.name())); - return Optional.empty(); - } - - return Optional.of( - requestBuilder - .url(endpoint) - .put(body) - .build()); - } - } - - private class HttpDataSinkRequestPost implements HttpDataSinkRequest { - @Override - public Optional makeRequestForPart(final Request.Builder requestBuilder, final DataSource.Part part) { - var requestBody = new StreamingRequestBody(part); - return Optional.of( - requestBuilder - .url(endpoint) - .post(requestBody) - .build()); - } - } - public static class Builder extends ParallelSink.Builder { public static Builder newInstance() { @@ -150,15 +112,11 @@ public Builder additionalHeaders(Map additionalHeaders) { return this; } - public Builder httpVerb(String httpVerb) { - sink.requestBuilder = makeSender(httpVerb); + public Builder method(String method) { + sink.method = method; return this; } - private HttpDataSinkRequest makeSender(String httpVerb) { - return "PUT".equalsIgnoreCase(httpVerb) ? sink.new HttpDataSinkRequestPut() : sink.new HttpDataSinkRequestPost(); - } - protected void validate() { Objects.requireNonNull(sink.endpoint, "endpoint"); Objects.requireNonNull(sink.httpClient, "httpClient"); From afc612f237eba2dc20aad0185feb348f9446cdbd Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Wed, 22 Jun 2022 16:28:24 +0300 Subject: [PATCH 09/19] apply review --- .../dataplane/http/pipeline/HttpDataSinkFactory.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java index b5dd66232dc..ac6446420fa 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java @@ -75,7 +75,7 @@ private Result createDataSink(DataFlowRequest request) { } var authKey = dataAddress.getAuthKey(); var authCode = dataAddress.getAuthCode(); - var httpVerb = dataAddress.getMethod(); + var method = dataAddress.getMethod(); var additionalHeaders = dataAddress.getAdditionalHeaders(); var sink = HttpDataSink.Builder.newInstance() @@ -85,7 +85,7 @@ private Result createDataSink(DataFlowRequest request) { .authKey(authKey) .authCode(authCode) .httpClient(httpClient) - .httpVerb(httpVerb) + .method(method) .additionalHeaders(additionalHeaders) .executorService(executorService) .monitor(monitor) From 1b1bd01b9c94f5e8384e072d1f801b9c78fa7235 Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Thu, 23 Jun 2022 08:38:32 +0300 Subject: [PATCH 10/19] apply review - remove method from sink and use header: for additional headers --- .../data-plane/data-plane-http/README.md | 3 +- .../dataplane/http/pipeline/HttpDataSink.java | 6 +--- .../pipeline/HttpDataSinkFactoryTest.java | 4 +-- .../spi/types/domain/HttpDataAddress.java | 34 +++++-------------- .../spi/types/domain/HttpDataAddressTest.java | 6 ++-- 5 files changed, 15 insertions(+), 38 deletions(-) diff --git a/extensions/data-plane/data-plane-http/README.md b/extensions/data-plane/data-plane-http/README.md index c37bad2f396..67a404d9580 100644 --- a/extensions/data-plane/data-plane-http/README.md +++ b/extensions/data-plane/data-plane-http/README.md @@ -28,5 +28,4 @@ see [HttpDataAddress.java](../../../spi/core-spi/src/main/java/org/eclipse/datas * proxyPath - If set to true the path of the actual request will be used to retrieve data from this address. * proxyQueryParams - If set to true the query params of the actual request will be used to retrieve data from this address. * proxyMethod - If set to true the http method of the actual request will be used to retrieve data from this address. -* method - The http verb to use for sink endpoint - possible values POST/PUT - default set to POST. -* additionalHeaders - The additional headers to use as json string e.g. ```"additionalHeaders" : "{\"Content-Type\" : \"application/octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}"```. \ No newline at end of file +* header:* - The additional headers to use as json string e.g. ```"header:Content-Type" : "application/octet-stream","header:x-ms-blob-type": "BlockBlob"```. \ No newline at end of file diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java index 53eb8e4de3f..d6b76f52a85 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java @@ -58,11 +58,7 @@ protected StatusResult transferParts(List parts) { additionalHeaders.forEach(requestBuilder::header); } - var request = requestBuilder - .url(endpoint) - .method(method, requestBody) - .build(); - + var request = requestBuilder.url(endpoint).post(requestBody).build(); try (var response = httpClient.newCall(request).execute()) { if (!response.isSuccessful()) { monitor.severe(format("Error {%s: %s} received writing HTTP data %s to endpoint %s for request: %s", response.code(), response.message(), part.name(), endpoint, request)); diff --git a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java index 51b176060a8..8670f984038 100644 --- a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java +++ b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java @@ -114,7 +114,6 @@ void verifyCreateAuthenticatingSource() throws InterruptedException, ExecutionEx void verifyCreatePutVerb() throws InterruptedException, ExecutionException, IOException { var dataAddress = HttpDataAddress.Builder.newInstance() .baseUrl("http://example.com") - .method("PUT") .build(); var validRequest = createRequest(HttpDataAddress.DATA_TYPE).destinationDataAddress(dataAddress).build(); @@ -140,7 +139,8 @@ void verifyCreatePutVerb() throws InterruptedException, ExecutionException, IOEx void verifyCreateAdditionalHeaders() throws InterruptedException, ExecutionException, IOException { var dataAddress = HttpDataAddress.Builder.newInstance() .baseUrl("http://example.com") - .additionalHeaders("{\"Content-Type\" : \"application/test-octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}") + .addAdditionalHeader("Content-Type", "application/test-octet-stream") + .addAdditionalHeader("x-ms-blob-type", "BlockBlob") .build(); var validRequest = createRequest(HttpDataAddress.DATA_TYPE).destinationDataAddress(dataAddress).build(); diff --git a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java index 4b1a97e3076..5b4a2e9befb 100644 --- a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java +++ b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java @@ -18,14 +18,12 @@ import com.fasterxml.jackson.annotation.JsonCreator; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonTypeName; -import com.fasterxml.jackson.core.JsonProcessingException; -import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.annotation.JsonDeserialize; import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder; -import org.eclipse.dataspaceconnector.common.string.StringUtils; -import java.util.HashMap; import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; /** * This is a wrapper class for the {@link DataAddress} object, which has typed accessors for properties specific to @@ -46,8 +44,7 @@ public class HttpDataAddress extends DataAddress { private static final String PROXY_PATH = "proxyPath"; private static final String PROXY_QUERY_PARAMS = "proxyQueryParams"; private static final String PROXY_METHOD = "proxyMethod"; - private static final String METHOD = "method"; - private static final String ADDITIONAL_HEADERS = "additionalHeaders"; + public static final String ADDITIONAL_HEADER = "header:"; private HttpDataAddress() { super(); @@ -98,24 +95,16 @@ public String getProxyMethod() { return getProperty(PROXY_METHOD); } - @JsonIgnore - public String getMethod() { - return getProperty(METHOD, "POST"); - } @JsonIgnore public Map getAdditionalHeaders() { - return convertAdditionalHeaders(getProperty(ADDITIONAL_HEADERS, "{}")); - } + return getProperties().entrySet().stream() + .filter(entry -> entry.getKey().startsWith(ADDITIONAL_HEADER)) + .collect(Collectors.toMap(entry -> entry.getKey().replace(ADDITIONAL_HEADER, ""), Map.Entry::getValue)); - private Map convertAdditionalHeaders(String additionalHeaders) { - try { - return new ObjectMapper().readValue(StringUtils.isNullOrBlank(additionalHeaders) ? "" : additionalHeaders, Map.class); - } catch (JsonProcessingException e) { - return new HashMap<>(); - } } + @JsonPOJOBuilder(withPrefix = "") public static final class Builder extends DataAddress.Builder { @@ -174,13 +163,8 @@ public Builder proxyMethod(String proxyMethod) { return this; } - public Builder method(String method) { - this.property(METHOD, method); - return this; - } - - public Builder additionalHeaders(String additionalHeaders) { - this.property(ADDITIONAL_HEADERS, additionalHeaders); + public Builder addAdditionalHeader(String additionalHeaderName, String additionalHeaderValue) { + address.getProperties().put(ADDITIONAL_HEADER + additionalHeaderName, Objects.requireNonNull(additionalHeaderValue)); return this; } diff --git a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java index b4ae0a0da90..66e52039a67 100644 --- a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java +++ b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java @@ -14,8 +14,8 @@ void verifyGetProperties() { .authCode("secret") .authKey("myKey") .secretName("mysecret") - .method("PUT") - .additionalHeaders("{\"Content-Type\" : \"application/octet-stream\",\"x-ms-blob-type\": \"BlockBlob\"}") + .addAdditionalHeader("Content-Type", "application/octet-stream") + .addAdditionalHeader("x-ms-blob-type", "BlockBlob") .proxyBody("proxyBody1") .proxyMethod("proxyMethod1") .proxyPath("proxyPath1") @@ -32,7 +32,6 @@ void verifyGetProperties() { assertEquals("proxyPath1", dataAddress.getProxyPath()); assertEquals("proxyQueryParams1", dataAddress.getProxyQueryParams()); assertEquals("mysecret", dataAddress.getSecretName()); - assertEquals("PUT", dataAddress.getMethod()); assertEquals(2, dataAddress.getAdditionalHeaders().size()); assertEquals("application/octet-stream", dataAddress.getAdditionalHeaders().get("Content-Type")); assertEquals("BlockBlob", dataAddress.getAdditionalHeaders().get("x-ms-blob-type")); @@ -43,7 +42,6 @@ void verifyGetDefaultValues() { HttpDataAddress dataAddress = HttpDataAddress.Builder.newInstance().build(); assertEquals("HttpData", dataAddress.getType()); - assertEquals("POST", dataAddress.getMethod()); assertEquals(0, dataAddress.getAdditionalHeaders().size()); } } \ No newline at end of file From ea5f1e703a08b85bd8e7ef24c77c1505aed8766e Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Thu, 23 Jun 2022 08:41:26 +0300 Subject: [PATCH 11/19] fix missing header --- .../spi/types/domain/HttpDataAddressTest.java | 14 ++++++++++++++ 1 file changed, 14 insertions(+) diff --git a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java index 66e52039a67..42c7f1c1bd4 100644 --- a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java +++ b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2022 Siemens AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * Siemens AG - initial implementation + * + */ + package org.eclipse.dataspaceconnector.spi.types.domain; import org.junit.jupiter.api.Test; From f672aa3fc5675db7b6acccbe496259f158cf3d71 Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Thu, 23 Jun 2022 08:42:55 +0300 Subject: [PATCH 12/19] cleanup --- .../dataplane/http/DataPlaneHttpExtension.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java index 97afc45f9d0..3eecc8554e1 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/DataPlaneHttpExtension.java @@ -26,7 +26,6 @@ import org.eclipse.dataspaceconnector.spi.system.Inject; import org.eclipse.dataspaceconnector.spi.system.ServiceExtension; import org.eclipse.dataspaceconnector.spi.system.ServiceExtensionContext; -import org.jetbrains.annotations.NotNull; /** * Provides support for reading data from an HTTP endpoint and sending data to an HTTP endpoint. @@ -70,8 +69,7 @@ public void initialize(ServiceExtensionContext context) { pipelineService.registerFactory(sinkFactory); } - @NotNull - private Integer getSinkPartitionSize(ServiceExtensionContext context) { + private int getSinkPartitionSize(ServiceExtensionContext context) { return context.getSetting(EDC_DATAPLANE_HTTP_SINK_PARTITION_SIZE, DEFAULT_PART_SIZE); } } From 971e516366d804fd82fed944f4894bb77fa5c6df Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Thu, 23 Jun 2022 08:55:13 +0300 Subject: [PATCH 13/19] fix compilation --- .../dataplane/http/pipeline/HttpDataSinkFactory.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java index ac6446420fa..ec6b0ecc0df 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java @@ -75,7 +75,6 @@ private Result createDataSink(DataFlowRequest request) { } var authKey = dataAddress.getAuthKey(); var authCode = dataAddress.getAuthCode(); - var method = dataAddress.getMethod(); var additionalHeaders = dataAddress.getAdditionalHeaders(); var sink = HttpDataSink.Builder.newInstance() @@ -85,7 +84,6 @@ private Result createDataSink(DataFlowRequest request) { .authKey(authKey) .authCode(authCode) .httpClient(httpClient) - .method(method) .additionalHeaders(additionalHeaders) .executorService(executorService) .monitor(monitor) From fe06054de10d205c5f28ac6637b3fb7908927d19 Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Thu, 23 Jun 2022 09:03:54 +0300 Subject: [PATCH 14/19] remove test for PUT --- .../pipeline/HttpDataSinkFactoryTest.java | 25 ------------------- 1 file changed, 25 deletions(-) diff --git a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java index 8670f984038..3e0fb969d02 100644 --- a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java +++ b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java @@ -110,31 +110,6 @@ void verifyCreateAuthenticatingSource() throws InterruptedException, ExecutionEx verify(call).execute(); } - @Test - void verifyCreatePutVerb() throws InterruptedException, ExecutionException, IOException { - var dataAddress = HttpDataAddress.Builder.newInstance() - .baseUrl("http://example.com") - .build(); - - var validRequest = createRequest(HttpDataAddress.DATA_TYPE).destinationDataAddress(dataAddress).build(); - - var call = mock(Call.class); - when(call.execute()).thenReturn(createHttpResponse().build()); - - when(httpClient.newCall(isA(Request.class))).thenAnswer(r -> { - assertThat(((Request) r.getArgument(0)).method()).isEqualTo("PUT"); // verify verb PUT - return call; - }); - - var sink = factory.createSink(validRequest); - - var result = sink.transfer(new InputStreamDataSource("test", new ByteArrayInputStream("test".getBytes()))).get(); - - assertThat(result.failed()).isFalse(); - - verify(call).execute(); - } - @Test void verifyCreateAdditionalHeaders() throws InterruptedException, ExecutionException, IOException { var dataAddress = HttpDataAddress.Builder.newInstance() From e2bcdea069ff803ea824f653c49f8ac59a447f2e Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Thu, 23 Jun 2022 12:04:53 +0300 Subject: [PATCH 15/19] apply review - use assertj and move content type to it's own field --- .../data-plane/data-plane-http/README.md | 3 +- .../dataplane/http/pipeline/HttpDataSink.java | 10 ++--- .../http/pipeline/HttpDataSinkFactory.java | 1 + .../http/pipeline/StreamingRequestBody.java | 9 +++-- .../pipeline/StreamingRequestBodyTest.java | 3 +- .../spi/types/domain/HttpDataAddress.java | 19 ++++++++- .../spi/types/domain/DataAddressTest.java | 19 ++++----- .../spi/types/domain/HttpDataAddressTest.java | 39 +++++++++++-------- 8 files changed, 63 insertions(+), 40 deletions(-) diff --git a/extensions/data-plane/data-plane-http/README.md b/extensions/data-plane/data-plane-http/README.md index 67a404d9580..03da6cd65d1 100644 --- a/extensions/data-plane/data-plane-http/README.md +++ b/extensions/data-plane/data-plane-http/README.md @@ -28,4 +28,5 @@ see [HttpDataAddress.java](../../../spi/core-spi/src/main/java/org/eclipse/datas * proxyPath - If set to true the path of the actual request will be used to retrieve data from this address. * proxyQueryParams - If set to true the query params of the actual request will be used to retrieve data from this address. * proxyMethod - If set to true the http method of the actual request will be used to retrieve data from this address. -* header:* - The additional headers to use as json string e.g. ```"header:Content-Type" : "application/octet-stream","header:x-ms-blob-type": "BlockBlob"```. \ No newline at end of file +* header:* - The additional headers to use as json string e.g. ```"header:x-ms-blob-type": "BlockBlob"```. Note that Content-Type is handled separately - it will not be taken into consideration if specified here. +* contentType - The HTTP Content-Type header is handled separate and has a default value of application/octet-stream. \ No newline at end of file diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java index d6b76f52a85..97246fbdd02 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java @@ -9,7 +9,7 @@ * * Contributors: * Microsoft Corporation - initial API and implementation - * Siemens AG - changes to make it compatible with AWS S3, Azure blob and ALI Object Storage presigned URL for upload + * Siemens AG - added additionalHeaders * */ @@ -38,9 +38,9 @@ public class HttpDataSink extends ParallelSink { private String authKey; private String authCode; private String endpoint; + private String contentType; private OkHttpClient httpClient; private Map additionalHeaders = new HashMap<>(); - private String method = "POST"; /** * Sends the parts to the destination endpoint using an HTTP POST. @@ -48,7 +48,7 @@ public class HttpDataSink extends ParallelSink { @Override protected StatusResult transferParts(List parts) { for (DataSource.Part part : parts) { - var requestBody = new StreamingRequestBody(part); + var requestBody = new StreamingRequestBody(part, contentType); var requestBuilder = new Request.Builder(); if (authKey != null) { requestBuilder.header(authKey, authCode); @@ -108,8 +108,8 @@ public Builder additionalHeaders(Map additionalHeaders) { return this; } - public Builder method(String method) { - sink.method = method; + public Builder contentType(String contentType) { + sink.contentType = contentType; return this; } diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java index ec6b0ecc0df..9863c3e636c 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java @@ -9,6 +9,7 @@ * * Contributors: * Microsoft Corporation - initial API and implementation + * Siemens AG - added additionalHeaders * */ diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBody.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBody.java index 4a6f8c4f053..d02bb0294f6 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBody.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBody.java @@ -18,6 +18,7 @@ import okhttp3.RequestBody; import okio.BufferedSink; import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSource; +import org.eclipse.dataspaceconnector.spi.types.domain.HttpDataAddress; import java.io.IOException; @@ -25,17 +26,17 @@ * Streams content into an OK HTTP buffered sink. */ public class StreamingRequestBody extends RequestBody { - private static final String OCTET_STREAM = "application/octet-stream"; - private final DataSource.Part part; + private final String contentType; - public StreamingRequestBody(DataSource.Part part) { + public StreamingRequestBody(DataSource.Part part, String contentType) { this.part = part; + this.contentType = contentType; } @Override public MediaType contentType() { - return MediaType.parse(OCTET_STREAM); + return MediaType.parse(contentType); } @Override diff --git a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBodyTest.java b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBodyTest.java index 9bcf7c5c630..c01fdf037a9 100644 --- a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBodyTest.java +++ b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBodyTest.java @@ -16,6 +16,7 @@ import okio.BufferedSink; import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSource; +import org.eclipse.dataspaceconnector.spi.types.domain.HttpDataAddress; import org.junit.jupiter.api.Test; import java.io.ByteArrayInputStream; @@ -39,7 +40,7 @@ void verifyStreamingTransfer() throws IOException { when(sink.outputStream()).thenReturn(outputStream); - var body = new StreamingRequestBody(part); + var body = new StreamingRequestBody(part, HttpDataAddress.OCTET_STREAM); body.writeTo(sink); assertThat(outputStream.toByteArray()).isEqualTo(CONTENT); diff --git a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java index 5b4a2e9befb..3c9546542d8 100644 --- a/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java +++ b/spi/core-spi/src/main/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddress.java @@ -9,7 +9,7 @@ * * Contributors: * Amadeus - Initial implementation - * Siemens AG - added method and additionalHeaders + * Siemens AG - added additionalHeaders * */ @@ -23,6 +23,7 @@ import java.util.Map; import java.util.Objects; +import java.util.Set; import java.util.stream.Collectors; /** @@ -45,6 +46,9 @@ public class HttpDataAddress extends DataAddress { private static final String PROXY_QUERY_PARAMS = "proxyQueryParams"; private static final String PROXY_METHOD = "proxyMethod"; public static final String ADDITIONAL_HEADER = "header:"; + public static final String CONTENT_TYPE = "contentType"; + public static final String OCTET_STREAM = "application/octet-stream"; + public static final Set ADDITIONAL_HEADERS_TO_IGNORE = Set.of("content-type"); private HttpDataAddress() { super(); @@ -95,6 +99,10 @@ public String getProxyMethod() { return getProperty(PROXY_METHOD); } + @JsonIgnore + public String getContentType() { + return getProperty(CONTENT_TYPE, OCTET_STREAM); + } @JsonIgnore public Map getAdditionalHeaders() { @@ -164,10 +172,19 @@ public Builder proxyMethod(String proxyMethod) { } public Builder addAdditionalHeader(String additionalHeaderName, String additionalHeaderValue) { + if (ADDITIONAL_HEADERS_TO_IGNORE.contains(additionalHeaderName.toLowerCase())) { + return this; + } + address.getProperties().put(ADDITIONAL_HEADER + additionalHeaderName, Objects.requireNonNull(additionalHeaderValue)); return this; } + public Builder contentType(String contentType) { + this.property(CONTENT_TYPE, contentType); + return this; + } + public Builder copyFrom(DataAddress other) { other.getProperties().forEach(this::property); return this; diff --git a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java index 450d042e236..7d35ceb0459 100644 --- a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java +++ b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java @@ -21,9 +21,7 @@ import java.io.StringWriter; import static org.assertj.core.api.Assertions.assertThatThrownBy; -import static org.junit.jupiter.api.Assertions.assertEquals; -import static org.junit.jupiter.api.Assertions.assertNotNull; - +import static org.assertj.core.api.Assertions.assertThat; class DataAddressTest { @@ -40,10 +38,10 @@ void verifyDeserialization() throws IOException { DataAddress deserialized = mapper.readValue(writer.toString(), DataAddress.class); - assertNotNull(deserialized); + assertThat(deserialized).isNotNull(); - assertEquals("test", deserialized.getType()); - assertEquals("bar", deserialized.getProperty("foo")); + assertThat(deserialized.getType()).isEqualTo("test"); + assertThat(deserialized.getProperty("foo")).isEqualTo("bar"); } @Test @@ -62,14 +60,13 @@ void verifyNullKeyThrowsException() { @Test void verifyGetDefaultPropertyValue() { - assertEquals("defaultValue", DataAddress.Builder.newInstance().type("sometype").build() - .getProperty("missing", "defaultValue")); + assertThat(DataAddress.Builder.newInstance().type("sometype").build().getProperty("missing", "defaultValue")) + .isEqualTo("defaultValue"); } @Test void verifyGetExistingPropertyValue() { - assertEquals("existingValue", DataAddress.Builder.newInstance().type("sometype") - .property("existing", "existingValue") - .build().getProperty("existing", "defaultValue")); + assertThat(DataAddress.Builder.newInstance().type("sometype").property("existing", "existingValue").build().getProperty("existing", "defaultValue")) + .isEqualTo("existingValue"); } } diff --git a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java index 42c7f1c1bd4..a0070438620 100644 --- a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java +++ b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/HttpDataAddressTest.java @@ -16,7 +16,8 @@ import org.junit.jupiter.api.Test; -import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.assertj.core.api.Assertions.assertThat; + class HttpDataAddressTest { @@ -28,7 +29,9 @@ void verifyGetProperties() { .authCode("secret") .authKey("myKey") .secretName("mysecret") - .addAdditionalHeader("Content-Type", "application/octet-stream") + .contentType("application/octet-stream") + .addAdditionalHeader("Content-Type", "text/html; charset=UTF-8") + .addAdditionalHeader("Keep-Alive", "timeout=5, max=1000") .addAdditionalHeader("x-ms-blob-type", "BlockBlob") .proxyBody("proxyBody1") .proxyMethod("proxyMethod1") @@ -36,26 +39,28 @@ void verifyGetProperties() { .proxyQueryParams("proxyQueryParams1") .build(); - assertEquals("HttpData", dataAddress.getType()); - assertEquals("name1", dataAddress.getName()); - assertEquals("http://myendpoint", dataAddress.getBaseUrl()); - assertEquals("myKey", dataAddress.getAuthKey()); - assertEquals("secret", dataAddress.getAuthCode()); - assertEquals("proxyBody1", dataAddress.getProxyBody()); - assertEquals("proxyMethod1", dataAddress.getProxyMethod()); - assertEquals("proxyPath1", dataAddress.getProxyPath()); - assertEquals("proxyQueryParams1", dataAddress.getProxyQueryParams()); - assertEquals("mysecret", dataAddress.getSecretName()); - assertEquals(2, dataAddress.getAdditionalHeaders().size()); - assertEquals("application/octet-stream", dataAddress.getAdditionalHeaders().get("Content-Type")); - assertEquals("BlockBlob", dataAddress.getAdditionalHeaders().get("x-ms-blob-type")); + assertThat(dataAddress.getType()).isEqualTo("HttpData"); + assertThat(dataAddress.getName()).isEqualTo("name1"); + assertThat(dataAddress.getBaseUrl()).isEqualTo("http://myendpoint"); + assertThat(dataAddress.getAuthKey()).isEqualTo("myKey"); + assertThat(dataAddress.getAuthCode()).isEqualTo("secret"); + assertThat(dataAddress.getProxyBody()).isEqualTo("proxyBody1"); + assertThat(dataAddress.getProxyMethod()).isEqualTo("proxyMethod1"); + assertThat(dataAddress.getProxyPath()).isEqualTo("proxyPath1"); + assertThat(dataAddress.getProxyQueryParams()).isEqualTo("proxyQueryParams1"); + assertThat(dataAddress.getSecretName()).isEqualTo("mysecret"); + assertThat(dataAddress.getContentType()).isEqualTo("application/octet-stream"); + assertThat(dataAddress.getAdditionalHeaders().size()).isEqualTo(2); + assertThat(dataAddress.getAdditionalHeaders().get("Keep-Alive")).isEqualTo("timeout=5, max=1000"); + assertThat(dataAddress.getAdditionalHeaders().get("x-ms-blob-type")).isEqualTo("BlockBlob"); } @Test void verifyGetDefaultValues() { HttpDataAddress dataAddress = HttpDataAddress.Builder.newInstance().build(); - assertEquals("HttpData", dataAddress.getType()); - assertEquals(0, dataAddress.getAdditionalHeaders().size()); + assertThat(dataAddress.getType()).isEqualTo("HttpData"); + assertThat(dataAddress.getAdditionalHeaders().size()).isEqualTo(0); + assertThat(dataAddress.getContentType()).isEqualTo("application/octet-stream"); } } \ No newline at end of file From c51cde910b68bc681e013bc9934ad5099d09299e Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Thu, 23 Jun 2022 12:36:51 +0300 Subject: [PATCH 16/19] fix style --- .../dataspaceconnector/spi/types/domain/DataAddressTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java index 7d35ceb0459..73896c70f90 100644 --- a/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java +++ b/spi/core-spi/src/test/java/org/eclipse/dataspaceconnector/spi/types/domain/DataAddressTest.java @@ -20,8 +20,8 @@ import java.io.IOException; import java.io.StringWriter; -import static org.assertj.core.api.Assertions.assertThatThrownBy; import static org.assertj.core.api.Assertions.assertThat; +import static org.assertj.core.api.Assertions.assertThatThrownBy; class DataAddressTest { From c393df8b3407acba169e436c2ae57bdc4d9fde74 Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Thu, 23 Jun 2022 12:39:20 +0300 Subject: [PATCH 17/19] fix style --- .../dataplane/http/pipeline/StreamingRequestBody.java | 1 - 1 file changed, 1 deletion(-) diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBody.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBody.java index d02bb0294f6..2a347ae039f 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBody.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/StreamingRequestBody.java @@ -18,7 +18,6 @@ import okhttp3.RequestBody; import okio.BufferedSink; import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSource; -import org.eclipse.dataspaceconnector.spi.types.domain.HttpDataAddress; import java.io.IOException; From 437d58f6c72a146a5f1984900d6c3c32513ba502 Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Thu, 23 Jun 2022 12:56:18 +0300 Subject: [PATCH 18/19] fix failing test --- .../dataplane/http/pipeline/HttpDataSinkFactory.java | 2 ++ .../dataplane/http/pipeline/HttpDataSinkFactoryTest.java | 4 ++-- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java index 9863c3e636c..38e777a0cc4 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactory.java @@ -77,6 +77,7 @@ private Result createDataSink(DataFlowRequest request) { var authKey = dataAddress.getAuthKey(); var authCode = dataAddress.getAuthCode(); var additionalHeaders = dataAddress.getAdditionalHeaders(); + var contentType = dataAddress.getContentType(); var sink = HttpDataSink.Builder.newInstance() .endpoint(baseUrl) @@ -85,6 +86,7 @@ private Result createDataSink(DataFlowRequest request) { .authKey(authKey) .authCode(authCode) .httpClient(httpClient) + .contentType(contentType) .additionalHeaders(additionalHeaders) .executorService(executorService) .monitor(monitor) diff --git a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java index 3e0fb969d02..0ff8013149c 100644 --- a/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java +++ b/extensions/data-plane/data-plane-http/src/test/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSinkFactoryTest.java @@ -114,7 +114,7 @@ void verifyCreateAuthenticatingSource() throws InterruptedException, ExecutionEx void verifyCreateAdditionalHeaders() throws InterruptedException, ExecutionException, IOException { var dataAddress = HttpDataAddress.Builder.newInstance() .baseUrl("http://example.com") - .addAdditionalHeader("Content-Type", "application/test-octet-stream") + .contentType("application/test-octet-stream") .addAdditionalHeader("x-ms-blob-type", "BlockBlob") .build(); @@ -124,7 +124,7 @@ void verifyCreateAdditionalHeaders() throws InterruptedException, ExecutionExcep when(call.execute()).thenReturn(createHttpResponse().build()); when(httpClient.newCall(isA(Request.class))).thenAnswer(r -> { - assertThat(((Request) r.getArgument(0)).headers("Content-Type").get(0)).isEqualTo("application/test-octet-stream"); // verify Content-Type + assertThat(((Request) r.getArgument(0)).body().contentType().toString()).isEqualTo("application/test-octet-stream"); // verify Content-Type assertThat(((Request) r.getArgument(0)).headers("x-ms-blob-type").get(0)).isEqualTo("BlockBlob"); // verify x-ms-blob-type return call; }); From 5b6a6446a125fdd5d26b336ff4cc59f21ecc66c8 Mon Sep 17 00:00:00 2001 From: Lucian Torje Date: Mon, 27 Jun 2022 06:29:34 +0300 Subject: [PATCH 19/19] apply review - use putall --- .../dataplane/http/pipeline/HttpDataSink.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java index 97246fbdd02..60d626b3ec9 100644 --- a/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java +++ b/extensions/data-plane/data-plane-http/src/main/java/org/eclipse/dataspaceconnector/dataplane/http/pipeline/HttpDataSink.java @@ -104,7 +104,7 @@ public Builder httpClient(OkHttpClient httpClient) { } public Builder additionalHeaders(Map additionalHeaders) { - sink.additionalHeaders = additionalHeaders; + sink.additionalHeaders.putAll(additionalHeaders); return this; }