Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPF: Fix Azure storage copy process #1245

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 3 additions & 1 deletion .github/workflows/verify.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,9 @@ jobs:
- name: Azure Storage Tests
uses: ./.github/actions/run-tests
with:
command: ./gradlew -p extensions/azure test -DincludeTags="AzureStorageIntegrationTest"
command: |
./gradlew -p extensions/azure test -DincludeTags="AzureStorageIntegrationTest"
./gradlew -p system-tests/azure-tests test -DincludeTags="AzureStorageIntegrationTest"

Check-Cosmos-Key:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ in the detailed section referring to by linking pull requests or issues.
* Break lease after TransferProcessManager status check (#1214)
* Fix path conflicts between `CatalogApiController` and `FederatedCatalogApiController` (#1225)
* Always use configured IDS API path in IDS webhook address (#1249)
* Fix Azure storage transfer (#1245)

## [milestone-3] - 2022-04-08

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,5 +27,4 @@ private AzureBlobStoreSchema() {
public static final String CONTAINER_NAME = "container";
public static final String ACCOUNT_NAME = "account";
public static final String BLOB_NAME = "blobname";
public static final String SHARED_KEY = "sharedKey";
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.dataspaceconnector.azure.blob.core.api;

import com.azure.core.credential.AzureSasCredential;
import com.azure.storage.blob.models.BlobItem;
import org.eclipse.dataspaceconnector.azure.blob.core.adapter.BlobAdapter;

Expand All @@ -39,4 +40,6 @@ public interface BlobStoreApi {
byte[] getBlob(String account, String container, String blobName);

BlobAdapter getBlobAdapter(String accountName, String containerName, String blobName, String sharedKey);

BlobAdapter getBlobAdapter(String accountName, String containerName, String blobName, AzureSasCredential credential);
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.dataspaceconnector.azure.blob.core.api;

import com.azure.core.credential.AzureSasCredential;
import com.azure.core.util.BinaryData;
import com.azure.storage.blob.BlobServiceClient;
import com.azure.storage.blob.BlobServiceClientBuilder;
Expand Down Expand Up @@ -127,6 +128,12 @@ public BlobAdapter getBlobAdapter(String accountName, String containerName, Stri
return getBlobAdapter(accountName, containerName, blobName, builder);
}

@Override
public BlobAdapter getBlobAdapter(String accountName, String containerName, String blobName, AzureSasCredential credential) {
BlobServiceClientBuilder builder = new BlobServiceClientBuilder().credential(credential);
return getBlobAdapter(accountName, containerName, blobName, builder);
}

private BlobAdapter getBlobAdapter(String accountName, String containerName, String blobName, BlobServiceClientBuilder builder) {
var blobServiceClient = builder
.endpoint(createEndpoint(accountName))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@

import org.eclipse.dataspaceconnector.common.string.StringUtils;

import java.util.Base64;
import java.util.regex.Pattern;


Expand All @@ -28,8 +27,6 @@
* Azure documentation</a>.
*/
public class AzureStorageValidator {
private static final Base64.Decoder DECODER = Base64.getDecoder();

private static final int ACCOUNT_MIN_LENGTH = 3;
private static final int ACCOUNT_MAX_LENGTH = 24;
private static final int CONTAINER_MIN_LENGTH = 3;
Expand All @@ -42,7 +39,6 @@ public class AzureStorageValidator {
private static final String ACCOUNT = "account";
private static final String BLOB = "blob";
private static final String CONTAINER = "container";
private static final String INVALID_KEY = "Storage Key is not a valid base64 encoded string";
private static final String INVALID_RESOURCE_NAME = "Invalid %s name";
private static final String INVALID_RESOURCE_NAME_LENGTH = "Invalid %s name length, the name must be between %s and %s characters long";
private static final String RESOURCE_NAME_EMPTY = "Invalid %s name, the name may not be null, empty or blank";
Expand Down Expand Up @@ -94,23 +90,6 @@ public static void validateBlobName(String blobName) {
}
}

/**
* Checks if a storage account shared key is in the expected format.
*
* @param accountKey A String representing the shared key to validate.
* @throws IllegalArgumentException if the string does not represent a value encoded in the expected format.
*/
public static void validateSharedKey(String accountKey) {
if (StringUtils.isNullOrBlank(accountKey)) {
throw new IllegalArgumentException(INVALID_KEY);
}
try {
DECODER.decode(accountKey);
} catch (IllegalArgumentException e) {
throw new IllegalArgumentException(INVALID_KEY);
}
}

private static void checkLength(String name, String resourceType, int minLength, int maxLength) {
if (StringUtils.isNullOrBlank(name)) {
throw new IllegalArgumentException(String.format(RESOURCE_NAME_EMPTY, resourceType));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,18 +91,4 @@ void validateBlobName_fail(String input) {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> AzureStorageValidator.validateBlobName(input));
}

@ParameterizedTest
@ValueSource(strings = {"YQo=", "YWJjZGVmZ2hpamtsbW5hCg=="})
void validateSharedKey_success(String input) {
AzureStorageValidator.validateSharedKey(input);
}

@ParameterizedTest
@NullAndEmptySource
@ValueSource(strings = {" ", "YWJjZGVmZ2hpamtsbW5hCg="})
void validateSharedKey_fail(String input) {
assertThatExceptionOfType(IllegalArgumentException.class)
.isThrownBy(() -> AzureStorageValidator.validateSharedKey(input));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,10 @@ public static String createSharedKey() {
return faker.lorem().characters();
}

public static String createSharedAccessSignature() {
return faker.lorem().characters();
}

private AzureStorageTestFixtures() {
}

Expand Down
47 changes: 2 additions & 45 deletions extensions/azure/data-plane/storage/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,49 +6,6 @@ This module contains a Data Plane extension to copy data to and from Azure Blob

When used as a source, it currently only supports copying a single blob.

### Example usage
The source `keyName` should reference a vault entry containing a storage [Shared Key](https://docs.microsoft.com/rest/api/storageservices/authorize-with-shared-key).

Create two Azure storage accounts with arbitrary names, here called ACCOUNT1 and ACCOUNT2.

In ACCOUNT1, create a storage container named `src`.

In ACCOUNT2, create a storage container named `dest`.

In ACCOUNT1, under storage container `src`, upload a file named `file.txt` with arbitrary content.

Run the Data Plane server:

```sh
env web.http.public.port=9191 web.http.control.path=/control ./gradlew :launchers:data-plane-server:run
```

In another terminal, send a data flow request, replacing ACCOUNT1 and ACCOUNT2 with the storage account names, and KEY1 and KEY2 with their respective access keys.

```sh
curl 'http://localhost:8181/control/transfer' \
--header 'Content-Type: application/json' \
--data '{
"id": "B4819DE5-8B9F-4B44-8227-F37CF94744E9",
"edctype": "dataspaceconnector:dataflowrequest",
"processId": "6593E90E-DD13-4132-A6D0-ADEB02C32ECB",
"sourceDataAddress": {
"properties": {
"type": "AzureStorage",
"account": "ACCOUNT1",
"container": "src",
"blob": "file.txt",
"sharedKey": "KEY1"
}
},
"destinationDataAddress": {
"properties": {
"type": "AzureStorage",
"account": "ACCOUNT2",
"container": "dest",
"sharedKey": "KEY2"
}
}
}'
```

The file `file.txt` is now copied into ACCOUNT2 under the container `dest`.
The destination `keyName` should reference a vault entry containing a JSON-serialized `AzureSasToken` object wrapping a [storage access signature](https://docs.microsoft.com/azure/storage/common/storage-sas-overview).
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import org.eclipse.dataspaceconnector.azure.dataplane.azurestorage.pipeline.AzureStorageDataSourceFactory;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataTransferExecutorServiceContainer;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.dataspaceconnector.spi.security.Vault;
import org.eclipse.dataspaceconnector.spi.system.Inject;
import org.eclipse.dataspaceconnector.spi.system.ServiceExtension;
import org.eclipse.dataspaceconnector.spi.system.ServiceExtensionContext;
Expand Down Expand Up @@ -49,11 +50,12 @@ public String name() {
@Override
public void initialize(ServiceExtensionContext context) {
var monitor = context.getMonitor();
Vault vault = context.getService(Vault.class);

var sourceFactory = new AzureStorageDataSourceFactory(blobStoreApi, retryPolicy, monitor);
var sourceFactory = new AzureStorageDataSourceFactory(blobStoreApi, retryPolicy, monitor, vault);
pipelineService.registerFactory(sourceFactory);

var sinkFactory = new AzureStorageDataSinkFactory(blobStoreApi, executorContainer.getExecutorService(), 5, monitor);
var sinkFactory = new AzureStorageDataSinkFactory(blobStoreApi, executorContainer.getExecutorService(), 5, monitor, vault, context.getTypeManager());
pipelineService.registerFactory(sinkFactory);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.dataspaceconnector.azure.dataplane.azurestorage.pipeline;

import com.azure.core.credential.AzureSasCredential;
import org.eclipse.dataspaceconnector.azure.blob.core.api.BlobStoreApi;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSource;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.ParallelSink;
Expand All @@ -35,7 +36,7 @@ public class AzureStorageDataSink extends ParallelSink {

private String accountName;
private String containerName;
private String sharedKey;
private String sharedAccessSignature;
private BlobStoreApi blobStoreApi;

/**
Expand All @@ -45,7 +46,7 @@ protected StatusResult<Void> transferParts(List<DataSource.Part> parts) {
for (DataSource.Part part : parts) {
String blobName = part.name();
try (var input = part.openStream()) {
try (var output = blobStoreApi.getBlobAdapter(accountName, containerName, blobName, sharedKey)
try (var output = blobStoreApi.getBlobAdapter(accountName, containerName, blobName, new AzureSasCredential(sharedAccessSignature))
.getOutputStream()) {
try {
input.transferTo(output);
Expand All @@ -66,7 +67,7 @@ protected StatusResult<Void> transferParts(List<DataSource.Part> parts) {
protected StatusResult<Void> complete() {
try {
// Write an empty blob to indicate completion
blobStoreApi.getBlobAdapter(accountName, containerName, COMPLETE_BLOB_NAME, sharedKey)
blobStoreApi.getBlobAdapter(accountName, containerName, COMPLETE_BLOB_NAME, new AzureSasCredential(sharedAccessSignature))
.getOutputStream().close();
} catch (Exception e) {
return getTransferResult(e, "Error creating blob %s on account %s", COMPLETE_BLOB_NAME, accountName);
Expand Down Expand Up @@ -100,8 +101,8 @@ public Builder containerName(String containerName) {
return this;
}

public Builder sharedKey(String sharedKey) {
sink.sharedKey = sharedKey;
public Builder sharedAccessSignature(String sharedAccessSignature) {
sink.sharedAccessSignature = sharedAccessSignature;
return this;
}

Expand All @@ -113,7 +114,7 @@ public Builder blobStoreApi(BlobStoreApi blobStoreApi) {
protected void validate() {
Objects.requireNonNull(sink.accountName, "accountName");
Objects.requireNonNull(sink.containerName, "containerName");
Objects.requireNonNull(sink.sharedKey, "sharedKey");
Objects.requireNonNull(sink.sharedAccessSignature, "sharedAccessSignature");
Objects.requireNonNull(sink.blobStoreApi, "blobStoreApi");
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,15 @@
package org.eclipse.dataspaceconnector.azure.dataplane.azurestorage.pipeline;

import org.eclipse.dataspaceconnector.azure.blob.core.AzureBlobStoreSchema;
import org.eclipse.dataspaceconnector.azure.blob.core.AzureSasToken;
import org.eclipse.dataspaceconnector.azure.blob.core.api.BlobStoreApi;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSink;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSinkFactory;
import org.eclipse.dataspaceconnector.spi.EdcException;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.result.Result;
import org.eclipse.dataspaceconnector.spi.security.Vault;
import org.eclipse.dataspaceconnector.spi.types.TypeManager;
import org.eclipse.dataspaceconnector.spi.types.domain.DataAddress;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.DataFlowRequest;
import org.jetbrains.annotations.NotNull;
Expand All @@ -31,7 +34,6 @@
import static java.lang.String.format;
import static org.eclipse.dataspaceconnector.azure.blob.core.validator.AzureStorageValidator.validateAccountName;
import static org.eclipse.dataspaceconnector.azure.blob.core.validator.AzureStorageValidator.validateContainerName;
import static org.eclipse.dataspaceconnector.azure.blob.core.validator.AzureStorageValidator.validateSharedKey;

/**
* Instantiates {@link AzureStorageDataSink}s for requests whose source data type is {@link AzureBlobStoreSchema#TYPE}.
Expand All @@ -41,12 +43,16 @@ public class AzureStorageDataSinkFactory implements DataSinkFactory {
private final ExecutorService executorService;
private final int partitionSize;
private final Monitor monitor;
private final Vault vault;
private final TypeManager typeManager;

public AzureStorageDataSinkFactory(BlobStoreApi blobStoreApi, ExecutorService executorService, int partitionSize, Monitor monitor) {
public AzureStorageDataSinkFactory(BlobStoreApi blobStoreApi, ExecutorService executorService, int partitionSize, Monitor monitor, Vault vault, TypeManager typeManager) {
this.blobStoreApi = blobStoreApi;
this.executorService = executorService;
this.partitionSize = partitionSize;
this.monitor = monitor;
this.vault = vault;
this.typeManager = typeManager;
}

@Override
Expand All @@ -61,7 +67,7 @@ public boolean canHandle(DataFlowRequest request) {
try {
validateAccountName(properties.remove(AzureBlobStoreSchema.ACCOUNT_NAME));
validateContainerName(properties.remove(AzureBlobStoreSchema.CONTAINER_NAME));
validateSharedKey(properties.remove(AzureBlobStoreSchema.SHARED_KEY));
properties.remove(DataAddress.KEY_NAME);
properties.keySet().stream().filter(k -> !DataAddress.TYPE.equals(k)).findFirst().ifPresent(k -> {
throw new IllegalArgumentException(format("Unexpected property %s", k));
});
Expand All @@ -81,10 +87,13 @@ public DataSink createSink(DataFlowRequest request) {
var dataAddress = request.getDestinationDataAddress();
var requestId = request.getId();

var secret = vault.resolveSecret(dataAddress.getKeyName());
var token = typeManager.readValue(secret, AzureSasToken.class);

return AzureStorageDataSink.Builder.newInstance()
.accountName(dataAddress.getProperty(AzureBlobStoreSchema.ACCOUNT_NAME))
.containerName(dataAddress.getProperty(AzureBlobStoreSchema.CONTAINER_NAME))
.sharedKey(dataAddress.getProperty(AzureBlobStoreSchema.SHARED_KEY))
.sharedAccessSignature(token.getSas())
.requestId(requestId)
.partitionSize(partitionSize)
.blobStoreApi(blobStoreApi)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.dataspaceconnector.spi.EdcException;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.result.Result;
import org.eclipse.dataspaceconnector.spi.security.Vault;
import org.eclipse.dataspaceconnector.spi.types.domain.DataAddress;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.DataFlowRequest;
import org.jetbrains.annotations.NotNull;
Expand All @@ -32,7 +33,6 @@
import static org.eclipse.dataspaceconnector.azure.blob.core.validator.AzureStorageValidator.validateAccountName;
import static org.eclipse.dataspaceconnector.azure.blob.core.validator.AzureStorageValidator.validateBlobName;
import static org.eclipse.dataspaceconnector.azure.blob.core.validator.AzureStorageValidator.validateContainerName;
import static org.eclipse.dataspaceconnector.azure.blob.core.validator.AzureStorageValidator.validateSharedKey;

/**
* Instantiates {@link AzureStorageDataSource}s for requests whose source data type is {@link AzureBlobStoreSchema#TYPE}.
Expand All @@ -41,11 +41,13 @@ public class AzureStorageDataSourceFactory implements DataSourceFactory {
private final BlobStoreApi blobStoreApi;
private final RetryPolicy<Object> retryPolicy;
private final Monitor monitor;
private final Vault vault;

public AzureStorageDataSourceFactory(BlobStoreApi blobStoreApi, RetryPolicy<Object> retryPolicy, Monitor monitor) {
public AzureStorageDataSourceFactory(BlobStoreApi blobStoreApi, RetryPolicy<Object> retryPolicy, Monitor monitor, Vault vault) {
this.blobStoreApi = blobStoreApi;
this.retryPolicy = retryPolicy;
this.monitor = monitor;
this.vault = vault;
}

@Override
Expand All @@ -61,7 +63,7 @@ public boolean canHandle(DataFlowRequest request) {
validateAccountName(properties.remove(AzureBlobStoreSchema.ACCOUNT_NAME));
validateContainerName(properties.remove(AzureBlobStoreSchema.CONTAINER_NAME));
validateBlobName(properties.remove(AzureBlobStoreSchema.BLOB_NAME));
validateSharedKey(properties.remove(AzureBlobStoreSchema.SHARED_KEY));
properties.remove(DataAddress.KEY_NAME);
properties.keySet().stream().filter(k -> !DataAddress.TYPE.equals(k)).findFirst().ifPresent(k -> {
throw new IllegalArgumentException(format("Unexpected property %s", k));
});
Expand All @@ -82,7 +84,7 @@ public DataSource createSource(DataFlowRequest request) {
return AzureStorageDataSource.Builder.newInstance()
.accountName(dataAddress.getProperty(AzureBlobStoreSchema.ACCOUNT_NAME))
.containerName(dataAddress.getProperty(AzureBlobStoreSchema.CONTAINER_NAME))
.sharedKey(dataAddress.getProperty(AzureBlobStoreSchema.SHARED_KEY))
.sharedKey(vault.resolveSecret(dataAddress.getKeyName()))
.blobStoreApi(blobStoreApi)
.blobName(dataAddress.getProperty(AzureBlobStoreSchema.BLOB_NAME))
.requestId(request.getId())
Expand Down
Loading