diff --git a/edc-extensions/control-plane-adapter/README.md b/edc-extensions/control-plane-adapter/README.md index 9a9e56fa4..7b6dfa31b 100644 --- a/edc-extensions/control-plane-adapter/README.md +++ b/edc-extensions/control-plane-adapter/README.md @@ -4,23 +4,36 @@ The goal of this extension is to simplify the process of retrieving data out of Additional requirements, that affects the architecture of the extension: - can return data both in SYNC and ASYNC mode (currently only SYNC endpoint available) -- can be persistent, so that process can be restored from the point where it was before application was stopped (not implemented yet) -- prepared to scale horizontally (not yet implemented) +- can be persistent, so that process can be restored from the point where it was before application was stopped +- scaling horizontally (when persistence is added to configuration) - can retry failed part of the process (no need to start the process from the beginning) -Configuration: +## Configuration: -| Key | Description | Mandatory | Default | -|:-------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------|---------| -| edc.cp.adapter.default.message.retry.number | Number of retries of a message, in case of an error, within the internal process of retrieving DataReference | no | 3 | -| edc.cp.adapter.default.sync.request.timeout | Timeout for synchronous request (in seconds), after witch 'timeout' error will be returned to the requesting client | no | 20 | -| edc.cp.adapter.messagebus.inmemory.thread.number | Number of threads running within the in-memory implementation of MessageBus _ _ | no | 10 | -| edc.cp.adapter.reuse.contract.agreement | Turn on/off reusing of existing contract agreements for the specific asset. Once the contract is agreed, the second request for the same asset will reuse the agreement. Value 1 = on, 0 = off. | no | 1 | -| edc.cp.adapter.cache.catalog.expire.after | Number of seconds, after witch prevoiusly requested catalog will not be reused, and will be removed from catalog cache | no | 300 | +| Key | Description | Mandatory | Default | +|:-------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------|---------| +| edc.cp.adapter.default.message.retry.number | Number of retries of a message, in case of an error, within the internal process of retrieving DataReference | no | 3 | +| edc.cp.adapter.default.sync.request.timeout | Timeout for synchronous request (in seconds), after witch 'timeout' error will be returned to the requesting client | no | 20 | +| edc.cp.adapter.messagebus.inmemory.thread.number | Number of threads running within the in-memory implementation of MessageBus _ _ | no | 10 | +| edc.cp.adapter.reuse.contract.agreement | Turn on/off reusing of existing contract agreements for the specific asset. Once the contract is agreed, the second request for the same asset will reuse the agreement (if exists) pulled from the EDC. | no | true | +| edc.cp.adapter.cache.catalog.expire.after | Number of seconds, after witch prevoiusly requested catalog will not be reused, and will be removed from catalog cache | no | 300 | | edc.cp.adapter.catalog.request.limit | Maximum number of items taken from Catalog within single request. Requests are repeated until all offers of the query are retrieved | no | 100 | +By default, the extension works in "IN MEMORY" mode. This setup has some limitations: ++ It can work only within single EDC instance. If CP-adapter requests are handled by more than one EDC, data flow may be broken. ++ If the EDC instance is restarted, all running processes are lost. -How to use it: +To run CP-Adapter in "PERSISTENT" mode, You need to create a proper tables with [this](docs/schema.sql) script, and add the following configuration values to Your control-plane EDC properties file: + +| Key | Description | +|-----------------------------------|-------------| +| edc.datasource.cpadapter.name | data source name | +| edc.datasource.cpadapter.url | data source url | +| edc.datasource.cpadapter.user | data source user | +| edc.datasource.cpadapter.password | data source password | + + +## How to use it: 1. Client sends a GET request with two parameters: assetId and the url of the provider control-plane: ``` @@ -33,13 +46,14 @@ Additional requirements, that affects the architecture of the extension: http://localhost:9193/api/v1/data/adapter/asset/sync/123?providerUrl=http://localhost:8182/api/v1/ids/data ``` - Optional request parameters: - - | Name | Description | - |-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---| - | contractAgreementId | Defines the ID of existing contract agreement, that should be reused for retrieving the asset. If parameter is specified, but contract is not found, 404 error will be returned. | - | contractAgreementReuse | Similar to edc.cp.adapter.reuse.contract.agreement option allows to turn off reusing of existing contracts, but on a request level. Set the parameter value to '0' and new contract agrement will be negotiated. | + Optional request parameters, that overwrite the settings for a single request: + | Name | Description | + |---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|--- | + | contractAgreementId | Defines the ID of existing contract agreement, that should be reused for retrieving the asset. If parameter is specified, but contract is not found, 404 error will be returned. | + | contractAgreementReuse | Similar to edc.cp.adapter.reuse.contract.agreement option allows to turn off reusing of existing contracts, but on a request level. Set the parameter value to 'false' and new contract agrement will be negotiated. | + | timeout | Similar to edc.cp.adapter.default.sync.request.timeout, defines the maximum time of the request. If data is not ready, time out error will be returned. | + The controller is registered under the context alias of DataManagement API. The authentication depends on the DataManagement configuration. To find out more please visit: @@ -70,7 +84,7 @@ Additional requirements, that affects the architecture of the extension: header: Authorization:eyJhbGciOiJSUzI1NiJ9.eyJkYWQiOi... {authKey:authCode} ``` -Internal design of the extension: +### Internal design of the extension: ![diagram](src/main/resources/control-plane-adapter.jpg) diff --git a/edc-extensions/control-plane-adapter/docs/schema.sql b/edc-extensions/control-plane-adapter/docs/schema.sql new file mode 100644 index 000000000..3e64d3c45 --- /dev/null +++ b/edc-extensions/control-plane-adapter/docs/schema.sql @@ -0,0 +1,54 @@ +-- +-- Copyright (c) 2022 ZF Friedrichshafen AG +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0 +-- +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Contributors: +-- ZF Friedrichshafen AG - Initial SQL Query +-- + +-- Statements are designed for and tested with Postgres only! + + +CREATE TABLE IF NOT EXISTS edc_lease +( + leased_by VARCHAR NOT NULL, + leased_at BIGINT, + lease_duration INTEGER NOT NULL, + lease_id VARCHAR NOT NULL + CONSTRAINT lease_pk + PRIMARY KEY +); + +CREATE TABLE IF NOT EXISTS edc_cpadapter_queue +( + id VARCHAR NOT NULL, + created_at BIGINT NOT NULL, + channel VARCHAR, + message JSON, + invoke_after BIGINT NOT NULL, + lease_id VARCHAR + CONSTRAINT cpadapter_queue_lease_lease_id_fk + REFERENCES edc_lease + ON DELETE SET NULL, + PRIMARY KEY (id) +); + +CREATE UNIQUE INDEX IF NOT EXISTS edc_cpadapter_queue_id_uindex + ON edc_cpadapter_queue (id); + +CREATE TABLE IF NOT EXISTS edc_cpadapter_object_store +( + id VARCHAR NOT NULL, + created_at BIGINT NOT NULL, + type VARCHAR, + object JSON, + PRIMARY KEY (id) +); + + + diff --git a/edc-extensions/control-plane-adapter/pom.xml b/edc-extensions/control-plane-adapter/pom.xml index bdb97d1b5..f6fe645a1 100644 --- a/edc-extensions/control-plane-adapter/pom.xml +++ b/edc-extensions/control-plane-adapter/pom.xml @@ -115,6 +115,28 @@ aggregate-service-spi + + org.eclipse.edc + sql-core + + + org.eclipse.edc + sql-lease + ${org.eclipse.edc.version} + + + org.eclipse.edc + sql-pool-apache-commons + + + org.eclipse.edc + transaction-datasource-spi + + + org.postgresql + postgresql + + org.projectlombok diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/ApiAdapterConfig.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/ApiAdapterConfig.java index 124addea6..cd243f9ba 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/ApiAdapterConfig.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/ApiAdapterConfig.java @@ -21,12 +21,24 @@ public class ApiAdapterConfig { "edc.cp.adapter.default.message.retry.number"; private static final String DEFAULT_SYNC_REQUEST_TIMEOUT = "edc.cp.adapter.default.sync.request.timeout"; - private static final String IN_MEMORY_MESSAGE_BUS_THREAD_NUMBER = - "edc.cp.adapter.messagebus.inmemory.thread.number"; private static final String CATALOG_EXPIRE_AFTER_TIME = "edc.cp.adapter.cache.catalog.expire.after"; - private static final String CATALOG_REQUEST_LIMIT = "edc.cp.adapter.catalog.request.limit"; private static final String REUSE_CONTRACT_AGREEMENT = "edc.cp.adapter.reuse.contract.agreement"; + private static final String CATALOG_REQUEST_LIMIT = "edc.cp.adapter.catalog.request.limit"; + + private static final String DATASOURCE_NAME = "edc.datasource.cpadapter.name"; + private static final String DATASOURCE_URL = "edc.datasource.cpadapter.url"; + private static final String DATASOURCE_USER = "edc.datasource.cpadapter.user"; + private static final String DATASOURCE_PASS = "edc.datasource.cpadapter.password"; + + private static final String IN_MEMORY_MESSAGE_BUS_THREAD_NUMBER = + "edc.cp.adapter.messagebus.inmemory.thread.number"; + private static final String SQL_MESSAGE_BUS_THREAD_NUMBER = + "edc.cp.adapter.messagebus.sql.thread.number"; + private static final String SQL_MESSAGE_BUS_MAX_DELIVERY = + "edc.cp.adapter.messagebus.sql.max.delivery"; + private static final String SQL_MESSAGE_BUS_DELIVERY_INTERVAL = + "edc.cp.adapter.messagebus.sql.max.delivery"; private final ServiceExtensionContext context; @@ -47,14 +59,42 @@ public int getInMemoryMessageBusThreadNumber() { } public boolean isContractAgreementReuseOn() { - return context.getSetting(REUSE_CONTRACT_AGREEMENT, 1) != 0; + return context.getSetting(REUSE_CONTRACT_AGREEMENT, true); } public int getCatalogExpireAfterTime() { - return context.getSetting(CATALOG_EXPIRE_AFTER_TIME, 3600); + return context.getSetting(CATALOG_EXPIRE_AFTER_TIME, 180); } public int getCatalogRequestLimit() { return context.getSetting(CATALOG_REQUEST_LIMIT, 100); } + + public String getDataSourceName() { + return context.getSetting(DATASOURCE_NAME, "cpadapter"); + } + + public String getDataSourceUrl() { + return context.getSetting(DATASOURCE_URL, null); + } + + public String getDataSourceUser() { + return context.getSetting(DATASOURCE_USER, null); + } + + public String getDataSourcePass() { + return context.getSetting(DATASOURCE_PASS, null); + } + + public int getSqlMessageBusThreadNumber() { + return context.getSetting(SQL_MESSAGE_BUS_THREAD_NUMBER, 10); + } + + public int getSqlMessageBusMaxDelivery() { + return context.getSetting(SQL_MESSAGE_BUS_MAX_DELIVERY, 10); + } + + public int getSqlMessageBusDeliveryInterval() { + return context.getSetting(SQL_MESSAGE_BUS_DELIVERY_INTERVAL, 1); + } } diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/ApiAdapterExtension.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/ApiAdapterExtension.java index 27343e028..16eb0a32a 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/ApiAdapterExtension.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/ApiAdapterExtension.java @@ -16,10 +16,15 @@ import static java.util.Objects.nonNull; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.Clock; +import java.util.Objects; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import org.eclipse.edc.connector.api.management.configuration.ManagementApiConfiguration; import org.eclipse.edc.connector.contract.spi.negotiation.observe.ContractNegotiationListener; import org.eclipse.edc.connector.contract.spi.negotiation.observe.ContractNegotiationObservable; -import org.eclipse.edc.connector.contract.spi.negotiation.store.ContractNegotiationStore; import org.eclipse.edc.connector.spi.catalog.CatalogService; import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService; import org.eclipse.edc.connector.spi.contractnegotiation.ContractNegotiationService; @@ -27,26 +32,28 @@ import org.eclipse.edc.connector.transfer.spi.edr.EndpointDataReferenceReceiver; import org.eclipse.edc.connector.transfer.spi.edr.EndpointDataReferenceReceiverRegistry; import org.eclipse.edc.runtime.metamodel.annotation.Inject; -import org.eclipse.edc.spi.message.RemoteMessageDispatcherRegistry; import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.system.ServiceExtension; import org.eclipse.edc.spi.system.ServiceExtensionContext; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; import org.eclipse.edc.transaction.spi.TransactionContext; import org.eclipse.edc.web.spi.WebService; -import org.eclipse.tractusx.edc.cp.adapter.messaging.Channel; -import org.eclipse.tractusx.edc.cp.adapter.messaging.InMemoryMessageBus; -import org.eclipse.tractusx.edc.cp.adapter.messaging.ListenerService; +import org.eclipse.tractusx.edc.cp.adapter.messaging.*; import org.eclipse.tractusx.edc.cp.adapter.process.contractnegotiation.CatalogCachedRetriever; import org.eclipse.tractusx.edc.cp.adapter.process.contractnegotiation.CatalogRetriever; import org.eclipse.tractusx.edc.cp.adapter.process.contractnegotiation.ContractAgreementRetriever; import org.eclipse.tractusx.edc.cp.adapter.process.contractnegotiation.ContractNegotiationHandler; import org.eclipse.tractusx.edc.cp.adapter.process.contractnotification.*; -import org.eclipse.tractusx.edc.cp.adapter.process.datareference.DataRefInMemorySyncService; -import org.eclipse.tractusx.edc.cp.adapter.process.datareference.DataRefNotificationSyncService; -import org.eclipse.tractusx.edc.cp.adapter.process.datareference.DataReferenceHandler; -import org.eclipse.tractusx.edc.cp.adapter.process.datareference.EndpointDataReferenceReceiverImpl; +import org.eclipse.tractusx.edc.cp.adapter.process.datareference.*; import org.eclipse.tractusx.edc.cp.adapter.service.ErrorResultService; import org.eclipse.tractusx.edc.cp.adapter.service.ResultService; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreService; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreServiceInMemory; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreServiceSql; +import org.eclipse.tractusx.edc.cp.adapter.store.SqlObjectStore; +import org.eclipse.tractusx.edc.cp.adapter.store.SqlQueueStore; +import org.eclipse.tractusx.edc.cp.adapter.store.schema.postgres.PostgresDialectObjectStoreStatements; +import org.eclipse.tractusx.edc.cp.adapter.store.schema.postgres.PostgresDialectQueueStatements; import org.eclipse.tractusx.edc.cp.adapter.util.ExpiringMap; import org.eclipse.tractusx.edc.cp.adapter.util.LockMap; @@ -55,14 +62,14 @@ public class ApiAdapterExtension implements ServiceExtension { @Inject private ContractNegotiationObservable negotiationObservable; @Inject private WebService webService; @Inject private ContractNegotiationService contractNegotiationService; - @Inject private RemoteMessageDispatcherRegistry dispatcher; @Inject private EndpointDataReferenceReceiverRegistry receiverRegistry; @Inject private ManagementApiConfiguration apiConfig; @Inject private TransferProcessService transferProcessService; - @Inject private ContractNegotiationStore contractNegotiationStore; @Inject private TransactionContext transactionContext; @Inject private CatalogService catalogService; @Inject private ContractAgreementService agreementService; + @Inject private DataSourceRegistry dataSourceRegistry; + @Inject private Clock clock; @Override public String name() { @@ -73,16 +80,19 @@ public String name() { public void initialize(ServiceExtensionContext context) { ApiAdapterConfig config = new ApiAdapterConfig(context); ListenerService listenerService = new ListenerService(); - InMemoryMessageBus messageBus = - new InMemoryMessageBus( - monitor, listenerService, config.getInMemoryMessageBusThreadNumber()); - ResultService resultService = new ResultService(config.getDefaultSyncRequestTimeout()); + MessageBus messageBus = createMessageBus(listenerService, context, config); + ObjectStoreService storeService = getStoreService(context, config); + + ResultService resultService = new ResultService(config.getDefaultSyncRequestTimeout(), monitor); ErrorResultService errorResultService = new ErrorResultService(monitor, messageBus); + ContractNotificationSyncService contractSyncService = - new ContractInMemorySyncService(new LockMap()); + new ContractSyncService(storeService, new LockMap()); + DataTransferInitializer dataTransferInitializer = new DataTransferInitializer(monitor, transferProcessService); + ContractNotificationHandler contractNotificationHandler = new ContractNotificationHandler( monitor, @@ -90,10 +100,12 @@ public void initialize(ServiceExtensionContext context) { contractSyncService, contractNegotiationService, dataTransferInitializer); + ContractNegotiationHandler contractNegotiationHandler = getContractNegotiationHandler(monitor, contractNegotiationService, messageBus, config); + DataRefNotificationSyncService dataRefSyncService = - new DataRefInMemorySyncService(new LockMap()); + new DataRefSyncService(storeService, new LockMap()); DataReferenceHandler dataReferenceHandler = new DataReferenceHandler(monitor, messageBus, dataRefSyncService); @@ -103,17 +115,74 @@ public void initialize(ServiceExtensionContext context) { listenerService.addListener(Channel.RESULT, resultService); listenerService.addListener(Channel.DLQ, errorResultService); - initHttpController(monitor, messageBus, resultService, config); + initHttpController(messageBus, resultService, config); initContractNegotiationListener( - monitor, negotiationObservable, messageBus, contractSyncService, dataTransferInitializer); - initDataReferenceReceiver(monitor, messageBus, dataRefSyncService); + negotiationObservable, messageBus, contractSyncService, dataTransferInitializer); + initDataReferenceReceiver(messageBus, dataRefSyncService); + initDataRefErrorHandler(messageBus, storeService, transferProcessService); + } + + private MessageBus createMessageBus( + ListenerService listenerService, ServiceExtensionContext context, ApiAdapterConfig config) { + if (!isPersistenceConfigured(config)) { + monitor.info( + "Persistent layer configuration is missing. Starting MessageBus in 'IN MEMORY' mode."); + return new InMemoryMessageBus( + monitor, listenerService, config.getInMemoryMessageBusThreadNumber()); + } + + SqlQueueStore sqlQueueStore = + new SqlQueueStore( + dataSourceRegistry, + config.getDataSourceName(), + transactionContext, + context.getTypeManager().getMapper(), + new PostgresDialectQueueStatements(), + context.getConnectorId(), + clock); + SqlMessageBus messageBus = + new SqlMessageBus( + monitor, + listenerService, + sqlQueueStore, + config.getSqlMessageBusThreadNumber(), + config.getSqlMessageBusMaxDelivery()); + initMessageBus(messageBus, config); + return messageBus; + } + + private ObjectStoreService getStoreService( + ServiceExtensionContext context, ApiAdapterConfig config) { + if (!isPersistenceConfigured(config)) { + monitor.info( + "Persistent layer configuration is missing. Starting Control Plane Adapter Extension in 'IN MEMORY' mode."); + return new ObjectStoreServiceInMemory(context.getTypeManager().getMapper()); + } + + ObjectMapper mapper = context.getTypeManager().getMapper(); + SqlObjectStore objectStore = + new SqlObjectStore( + dataSourceRegistry, + config.getDataSourceName(), + transactionContext, + mapper, + new PostgresDialectObjectStoreStatements()); + return new ObjectStoreServiceSql(mapper, objectStore); + } + + private void initMessageBus(SqlMessageBus messageBus, ApiAdapterConfig config) { + final int poolSize = 1; + final int initialDelay = 5; + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize); + scheduler.scheduleAtFixedRate( + () -> messageBus.deliverMessages(config.getSqlMessageBusMaxDelivery()), + initialDelay, + config.getSqlMessageBusDeliveryInterval(), + TimeUnit.SECONDS); } private void initHttpController( - Monitor monitor, - InMemoryMessageBus messageBus, - ResultService resultService, - ApiAdapterConfig config) { + MessageBus messageBus, ResultService resultService, ApiAdapterConfig config) { webService.registerResource( apiConfig.getContextAlias(), new HttpController(monitor, resultService, messageBus, config)); @@ -122,7 +191,7 @@ private void initHttpController( private ContractNegotiationHandler getContractNegotiationHandler( Monitor monitor, ContractNegotiationService contractNegotiationService, - InMemoryMessageBus messageBus, + MessageBus messageBus, ApiAdapterConfig config) { return new ContractNegotiationHandler( monitor, @@ -135,18 +204,15 @@ private ContractNegotiationHandler getContractNegotiationHandler( } private void initDataReferenceReceiver( - Monitor monitor, - InMemoryMessageBus messageBus, - DataRefNotificationSyncService dataRefSyncService) { + MessageBus messageBus, DataRefNotificationSyncService dataRefSyncService) { EndpointDataReferenceReceiver dataReferenceReceiver = new EndpointDataReferenceReceiverImpl(monitor, messageBus, dataRefSyncService); receiverRegistry.registerReceiver(dataReferenceReceiver); } private void initContractNegotiationListener( - Monitor monitor, ContractNegotiationObservable negotiationObservable, - InMemoryMessageBus messageBus, + MessageBus messageBus, ContractNotificationSyncService contractSyncService, DataTransferInitializer dataTransferInitializer) { ContractNegotiationListener contractNegotiationListener = @@ -156,4 +222,26 @@ private void initContractNegotiationListener( negotiationObservable.registerListener(contractNegotiationListener); } } + + private void initDataRefErrorHandler( + MessageBus messageBus, + ObjectStoreService objectStore, + TransferProcessService transferProcessService) { + + final int poolSize = 1; + final int initialDelay = 5; + final int interval = 5; + ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize); + + DataReferenceErrorHandler errorHandler = + new DataReferenceErrorHandler(monitor, messageBus, objectStore, transferProcessService); + + scheduler.scheduleAtFixedRate( + errorHandler::validateActiveProcesses, initialDelay, interval, TimeUnit.SECONDS); + } + + private boolean isPersistenceConfigured(ApiAdapterConfig config) { + return Objects.nonNull(config.getDataSourceName()) + && Objects.nonNull(config.getDataSourceUrl()); + } } diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/HttpController.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/HttpController.java index 5db422535..1d6bbc3fa 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/HttpController.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/HttpController.java @@ -20,8 +20,10 @@ import jakarta.ws.rs.core.MediaType; import jakarta.ws.rs.core.Response; import java.util.Objects; +import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.util.string.StringUtils; import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto; import org.eclipse.tractusx.edc.cp.adapter.dto.ProcessData; import org.eclipse.tractusx.edc.cp.adapter.messaging.Channel; @@ -45,7 +47,8 @@ public Response getAssetSynchronous( @PathParam("assetId") String assetId, @QueryParam("providerUrl") String providerUrl, @QueryParam("contractAgreementId") String contractAgreementId, - @QueryParam("contractAgreementReuse") String contractAgreementReuse) { + @QueryParam("contractAgreementReuse") @DefaultValue("true") boolean contractAgreementReuse, + @QueryParam("timeout") String timeout) { if (invalidParams(assetId, providerUrl)) { return badRequestResponse(); @@ -55,7 +58,10 @@ public Response getAssetSynchronous( initiateProcess(assetId, providerUrl, contractAgreementId, contractAgreementReuse); try { - ProcessData processData = resultService.pull(traceId); + ProcessData processData = + StringUtils.isNullOrEmpty(timeout) || !isNumeric(timeout) + ? resultService.pull(traceId) + : resultService.pull(traceId, Long.parseLong(timeout), TimeUnit.SECONDS); if (Objects.isNull(processData)) { return notFoundResponse(); @@ -87,7 +93,7 @@ private String initiateProcess( String assetId, String providerUrl, String contractAgreementId, - String contractAgreementReuse) { + boolean contractAgreementReuse) { ProcessData processData = ProcessData.builder() .assetId(assetId) @@ -103,8 +109,8 @@ private String initiateProcess( return message.getTraceId(); } - private boolean isContractAgreementReuseOn(String contractAgreementReuse) { - return !"0".equals(contractAgreementReuse) && config.isContractAgreementReuseOn(); + private boolean isContractAgreementReuseOn(boolean contractAgreementReuse) { + return contractAgreementReuse && config.isContractAgreementReuseOn(); } private Response notFoundResponse() { @@ -130,4 +136,8 @@ private Response timeoutResponse() { .entity(Response.Status.REQUEST_TIMEOUT.getReasonPhrase()) .build(); } + + private boolean isNumeric(String str) { + return str != null && str.matches("[0-9]+"); + } } diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/dto/DataReferenceRetrievalDto.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/dto/DataReferenceRetrievalDto.java index f623032ae..675a991e4 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/dto/DataReferenceRetrievalDto.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/dto/DataReferenceRetrievalDto.java @@ -14,8 +14,10 @@ package org.eclipse.tractusx.edc.cp.adapter.dto; +import lombok.NoArgsConstructor; import org.eclipse.tractusx.edc.cp.adapter.messaging.Message; +@NoArgsConstructor public class DataReferenceRetrievalDto extends Message { public DataReferenceRetrievalDto(ProcessData payload, int retryLimit) { super(payload, retryLimit); diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/dto/ProcessData.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/dto/ProcessData.java index 4555db9df..98792df2a 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/dto/ProcessData.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/dto/ProcessData.java @@ -17,21 +17,20 @@ import static java.lang.System.currentTimeMillis; import jakarta.ws.rs.core.Response; -import lombok.Builder; -import lombok.Getter; -import lombok.Setter; -import lombok.ToString; +import lombok.*; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; @Getter @ToString @Builder +@NoArgsConstructor +@AllArgsConstructor public class ProcessData { private final long timestamp = currentTimeMillis(); // request data - private final String assetId; - private final String provider; + private String assetId; + private String provider; private String contractOfferId; private int catalogExpiryTime; private boolean contractAgreementReuseOn; @@ -40,6 +39,7 @@ public class ProcessData { @Setter private String contractNegotiationId; @Setter private String contractAgreementId; @Builder.Default @Setter private boolean isContractConfirmed = false; + @Setter private String transferProcessId; // result/response data @Setter private EndpointDataReference endpointDataReference; diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/messaging/Message.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/messaging/Message.java index 610c14877..9bb67c414 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/messaging/Message.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/messaging/Message.java @@ -15,15 +15,20 @@ package org.eclipse.tractusx.edc.cp.adapter.messaging; import java.util.UUID; -import java.util.concurrent.atomic.AtomicInteger; import lombok.Getter; +import lombok.NoArgsConstructor; +import lombok.Setter; +@NoArgsConstructor +@Getter +@Setter public abstract class Message { - @Getter private final String traceId; - @Getter private final T payload; - private final AtomicInteger errorNumber = new AtomicInteger(); - private final int retryLimit; - @Getter private Exception finalException; + private String traceId; + private T payload; + private int errorNumber; + private int retryLimit; + private Exception exception; + private Exception finalException; public Message(String traceId, T payload, int retryLimit) { this.traceId = traceId; @@ -38,16 +43,16 @@ public Message(T payload, int retryLimit) { } protected long unsucceeded() { - errorNumber.incrementAndGet(); + errorNumber++; return getDelayTime(); } protected void clearErrors() { - errorNumber.set(0); + errorNumber = 0; } protected boolean canRetry() { - return errorNumber.get() < retryLimit; + return errorNumber < retryLimit; } protected void setFinalException(Exception e) { @@ -55,8 +60,6 @@ protected void setFinalException(Exception e) { } private int getDelayTime() { - return errorNumber.get() < 5 - ? errorNumber.get() * 750 - : (int) Math.pow(errorNumber.get(), 2) * 150; + return errorNumber < 5 ? errorNumber * 750 : (int) Math.pow(errorNumber, 2) * 150; } } diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/messaging/SqlMessageBus.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/messaging/SqlMessageBus.java new file mode 100644 index 000000000..d76079c53 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/messaging/SqlMessageBus.java @@ -0,0 +1,101 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.messaging; + +import static java.util.Objects.isNull; + +import java.time.Instant; +import java.util.List; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.tractusx.edc.cp.adapter.store.SqlQueueStore; +import org.eclipse.tractusx.edc.cp.adapter.store.model.QueueMessage; + +public class SqlMessageBus implements MessageBus { + private final int maxDelivery; + private final Monitor monitor; + private final ListenerService listenerService; + private final ScheduledExecutorService executorService; + private final SqlQueueStore store; + + public SqlMessageBus( + Monitor monitor, + ListenerService listenerService, + SqlQueueStore sqlQueueStore, + int threadPoolSize, + int maxDelivery) { + this.monitor = monitor; + this.listenerService = listenerService; + this.store = sqlQueueStore; + this.maxDelivery = maxDelivery; + this.executorService = Executors.newScheduledThreadPool(threadPoolSize); + } + + @Override + public void send(Channel channel, Message message) { + if (isNull(message)) { + monitor.warning(String.format("Message is empty, channel: %s", channel)); + return; + } + monitor.info(String.format("[%s] Message sent to channel: %s", message.getTraceId(), channel)); + long now = Instant.now().toEpochMilli(); + store.saveMessage( + QueueMessage.builder().channel(channel.name()).message(message).invokeAfter(now).build()); + + deliverMessages(maxDelivery); + } + + public void deliverMessages(int maxElements) { + List queueMessages = store.findMessagesToSend(maxElements); + monitor.debug(String.format("Found [%d] messages to send.", queueMessages.size())); + queueMessages.forEach( + queueMessage -> executorService.submit(() -> deliverMessage(queueMessage))); + } + + private void deliverMessage(QueueMessage queueMessage) { + Channel channel = Channel.valueOf(queueMessage.getChannel()); + Message message = queueMessage.getMessage(); + + int currentErrorNumber = message.getErrorNumber(); + message.clearErrors(); + + try { + listenerService.getListener(channel).process(message); + store.deleteMessage(queueMessage.getId()); + monitor.debug(String.format("[%s] Message sent and removed.", queueMessage.getId())); + } catch (Exception e) { + monitor.warning(String.format("[%s] Message processing error.", message.getTraceId()), e); + message.setErrorNumber(currentErrorNumber); + if (!message.canRetry()) { + monitor.warning(String.format("[%s] Message reached retry limit!", message.getTraceId())); + sendMessageToDlq(message, e); + store.deleteMessage(queueMessage.getId()); + return; + } + long delayTime = message.unsucceeded(); + long now = Instant.now().toEpochMilli(); + queueMessage.setInvokeAfter(now + delayTime); + message.setException(e); + store.updateMessage(queueMessage); + } + } + + private void sendMessageToDlq(Message message, Exception finalException) { + message.clearErrors(); + message.setFinalException(finalException); + send(Channel.DLQ, message); + } +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnegotiation/CatalogCachedRetriever.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnegotiation/CatalogCachedRetriever.java index f0ee4187e..b57d6c9bc 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnegotiation/CatalogCachedRetriever.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnegotiation/CatalogCachedRetriever.java @@ -17,6 +17,7 @@ import java.util.Objects; import lombok.RequiredArgsConstructor; import org.eclipse.edc.catalog.spi.Catalog; +import org.eclipse.edc.util.collection.CollectionUtil; import org.eclipse.tractusx.edc.cp.adapter.util.ExpiringMap; @RequiredArgsConstructor @@ -35,7 +36,11 @@ public Catalog getEntireCatalog(String providerUrl, String assetId, int catalogE } catalog = catalogRetriever.getEntireCatalog(providerUrl, assetId); - catalogCache.put(getKey(providerUrl, assetId), catalog); + + if (Objects.nonNull(catalog) && CollectionUtil.isNotEmpty(catalog.getContractOffers())) { + catalogCache.put(getKey(providerUrl, assetId), catalog); + } + return catalog; } diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnegotiation/ContractNegotiationHandler.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnegotiation/ContractNegotiationHandler.java index 34118fb8b..b0d7bf237 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnegotiation/ContractNegotiationHandler.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnegotiation/ContractNegotiationHandler.java @@ -49,7 +49,7 @@ public void process(DataReferenceRetrievalDto dto) { ContractAgreement contractAgreement = getContractAgreementById(dto); if (Objects.nonNull(dto.getPayload().getContractAgreementId()) && contractAgreement == null) { - sendNotFoundErrorResult(dto); + sendNotFoundErrorResult(dto, getAgreementNotFoundMessage(dto)); return; } @@ -73,6 +73,11 @@ public void process(DataReferenceRetrievalDto dto) { processData.getProvider(), processData.getCatalogExpiryTime()); + if (Objects.isNull(contractOffer)) { + sendNotFoundErrorResult(dto, getContractNotFoundMessage(dto)); + return; + } + String contractNegotiationId = initializeContractNegotiation( contractOffer, dto.getPayload().getProvider(), dto.getTraceId()); @@ -100,9 +105,7 @@ private ContractOffer findContractOffer( return Optional.ofNullable(catalog.getContractOffers()).orElse(Collections.emptyList()).stream() .filter(it -> it.getAsset().getId().equals(assetId)) .findFirst() - .orElseThrow( - () -> - new ResourceNotFoundException("Could not find Contract Offer for given Asset Id")); + .orElse(null); } private String initializeContractNegotiation( @@ -125,12 +128,17 @@ private String initializeContractNegotiation( .orElseThrow(() -> new ResourceNotFoundException("Could not find Contract NegotiationId")); } - private void sendNotFoundErrorResult(DataReferenceRetrievalDto dto) { - dto.getPayload() - .setErrorMessage( - "Not found the contract agreement with ID: " - + dto.getPayload().getContractAgreementId()); + private void sendNotFoundErrorResult(DataReferenceRetrievalDto dto, String message) { + dto.getPayload().setErrorMessage(message); dto.getPayload().setErrorStatus(Response.Status.NOT_FOUND); messageBus.send(Channel.RESULT, dto); } + + private String getAgreementNotFoundMessage(DataReferenceRetrievalDto dto) { + return "Not found the contract agreement with ID: " + dto.getPayload().getContractAgreementId(); + } + + private String getContractNotFoundMessage(DataReferenceRetrievalDto dto) { + return "Could not find Contract Offer for given Asset Id"; + } } diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractInfo.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractInfo.java index 03dd4df52..e4437cbbb 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractInfo.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractInfo.java @@ -14,8 +14,12 @@ package org.eclipse.tractusx.edc.cp.adapter.process.contractnotification; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; import lombok.Getter; +import lombok.NoArgsConstructor; +@NoArgsConstructor +@JsonIgnoreProperties(ignoreUnknown = true) public class ContractInfo { @Getter private String contractAgreementId; private ContractState contractState; diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNegotiationListenerImpl.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNegotiationListenerImpl.java index aa31dbd1a..a441eddf6 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNegotiationListenerImpl.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNegotiationListenerImpl.java @@ -74,7 +74,8 @@ public void failed(ContractNegotiation negotiation) { } public void initiateDataTransfer(DataReferenceRetrievalDto dto) { - dataTransfer.initiate(dto); + String transferProcessId = dataTransfer.initiate(dto); + dto.getPayload().setTransferProcessId(transferProcessId); dto.getPayload().setContractConfirmed(true); messageBus.send(Channel.DATA_REFERENCE, dto); } diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNotificationHandler.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNotificationHandler.java index e0e55648f..5bd254194 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNotificationHandler.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNotificationHandler.java @@ -73,7 +73,8 @@ public void process(DataReferenceRetrievalDto dto) { } public void initiateDataTransfer(DataReferenceRetrievalDto dto) { - dataTransfer.initiate(dto); + String transferProcessId = dataTransfer.initiate(dto); + dto.getPayload().setTransferProcessId(transferProcessId); dto.getPayload().setContractConfirmed(true); messageBus.send(Channel.DATA_REFERENCE, dto); } diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractInMemorySyncService.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractSyncService.java similarity index 53% rename from edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractInMemorySyncService.java rename to edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractSyncService.java index 3aceef68e..44f3cbce6 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractInMemorySyncService.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractSyncService.java @@ -16,17 +16,17 @@ import static java.util.Objects.isNull; -import java.util.HashMap; -import java.util.Map; import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreService; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectType; import org.eclipse.tractusx.edc.cp.adapter.util.LockMap; -public class ContractInMemorySyncService implements ContractNotificationSyncService { - private final Map dtoMap = new HashMap<>(); - private final Map contractInfoMap = new HashMap<>(); +public class ContractSyncService implements ContractNotificationSyncService { + private final ObjectStoreService storeService; private final LockMap locks; - public ContractInMemorySyncService(LockMap locks) { + public ContractSyncService(ObjectStoreService storeService, LockMap locks) { + this.storeService = storeService; this.locks = locks; } @@ -34,10 +34,14 @@ public ContractInMemorySyncService(LockMap locks) { public DataReferenceRetrievalDto exchangeConfirmedContract( String negotiationId, String agreementId) { locks.lock(negotiationId); - DataReferenceRetrievalDto dto = dtoMap.get(negotiationId); + + DataReferenceRetrievalDto dto = + storeService.get(negotiationId, ObjectType.DTO, DataReferenceRetrievalDto.class); + if (isNull(dto)) { - contractInfoMap.put( - negotiationId, new ContractInfo(agreementId, ContractInfo.ContractState.CONFIRMED)); + ContractInfo contractInfo = + new ContractInfo(agreementId, ContractInfo.ContractState.CONFIRMED); + storeService.put(negotiationId, ObjectType.CONTRACT_INFO, contractInfo); } locks.unlock(negotiationId); return dto; @@ -46,9 +50,13 @@ public DataReferenceRetrievalDto exchangeConfirmedContract( @Override public DataReferenceRetrievalDto exchangeDeclinedContract(String negotiationId) { locks.lock(negotiationId); - DataReferenceRetrievalDto dto = dtoMap.get(negotiationId); + + DataReferenceRetrievalDto dto = + storeService.get(negotiationId, ObjectType.DTO, DataReferenceRetrievalDto.class); + if (isNull(dto)) { - contractInfoMap.put(negotiationId, new ContractInfo(ContractInfo.ContractState.DECLINED)); + ContractInfo contractInfo = new ContractInfo(ContractInfo.ContractState.DECLINED); + storeService.put(negotiationId, ObjectType.CONTRACT_INFO, contractInfo); } locks.unlock(negotiationId); return dto; @@ -57,9 +65,13 @@ public DataReferenceRetrievalDto exchangeDeclinedContract(String negotiationId) @Override public DataReferenceRetrievalDto exchangeErrorContract(String negotiationId) { locks.lock(negotiationId); - DataReferenceRetrievalDto dto = dtoMap.get(negotiationId); + + DataReferenceRetrievalDto dto = + storeService.get(negotiationId, ObjectType.DTO, DataReferenceRetrievalDto.class); + if (isNull(dto)) { - contractInfoMap.put(negotiationId, new ContractInfo(ContractInfo.ContractState.ERROR)); + ContractInfo contractInfo = new ContractInfo(ContractInfo.ContractState.ERROR); + storeService.put(negotiationId, ObjectType.CONTRACT_INFO, contractInfo); } locks.unlock(negotiationId); @@ -69,11 +81,13 @@ public DataReferenceRetrievalDto exchangeErrorContract(String negotiationId) { @Override public ContractInfo exchangeDto(DataReferenceRetrievalDto dto) { String negotiationId = dto.getPayload().getContractNegotiationId(); - locks.lock(negotiationId); - ContractInfo contractInfo = contractInfoMap.get(negotiationId); + + ContractInfo contractInfo = + storeService.get(negotiationId, ObjectType.CONTRACT_INFO, ContractInfo.class); + if (isNull(contractInfo)) { - dtoMap.put(negotiationId, dto); + storeService.put(negotiationId, ObjectType.DTO, dto); } locks.unlock(negotiationId); @@ -82,13 +96,13 @@ public ContractInfo exchangeDto(DataReferenceRetrievalDto dto) { @Override public void removeContractInfo(String negotiationId) { - contractInfoMap.remove(negotiationId); + storeService.remove(negotiationId, ObjectType.CONTRACT_INFO); locks.removeLock(negotiationId); } @Override public void removeDto(String negotiationId) { - dtoMap.remove(negotiationId); + storeService.remove(negotiationId, ObjectType.DTO); locks.removeLock(negotiationId); } } diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/DataTransferInitializer.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/DataTransferInitializer.java index 1d390ac98..34812567f 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/DataTransferInitializer.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/DataTransferInitializer.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + package org.eclipse.tractusx.edc.cp.adapter.process.contractnotification; import lombok.RequiredArgsConstructor; @@ -15,7 +29,7 @@ public class DataTransferInitializer { private final Monitor monitor; private final TransferProcessService transferProcessService; - public void initiate(DataReferenceRetrievalDto dto) { + public String initiate(DataReferenceRetrievalDto dto) { monitor.info( String.format( "[%s] ContractConfirmationHandler: transfer init - start.", dto.getTraceId())); @@ -46,6 +60,8 @@ public void initiate(DataReferenceRetrievalDto dto) { if (result.failed()) { throwDataRefRequestException(dto); } + + return result.getContent(); } private void throwDataRefRequestException(DataReferenceRetrievalDto dto) { diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataRefInMemorySyncService.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataRefSyncService.java similarity index 64% rename from edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataRefInMemorySyncService.java rename to edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataRefSyncService.java index cd2f6aebf..812ba70da 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataRefInMemorySyncService.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataRefSyncService.java @@ -16,24 +16,26 @@ import static java.util.Objects.isNull; -import java.util.HashMap; -import java.util.Map; import lombok.RequiredArgsConstructor; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreService; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectType; import org.eclipse.tractusx.edc.cp.adapter.util.LockMap; @RequiredArgsConstructor -public class DataRefInMemorySyncService implements DataRefNotificationSyncService { - private final Map dtoMap = new HashMap<>(); - private final Map dataReferenceMap = new HashMap<>(); +public class DataRefSyncService implements DataRefNotificationSyncService { + private final ObjectStoreService storeService; private final LockMap locks; public EndpointDataReference exchangeDto(DataReferenceRetrievalDto dto, String agreementId) { locks.lock(agreementId); - EndpointDataReference dataReference = dataReferenceMap.get(agreementId); + + EndpointDataReference dataReference = + storeService.get(agreementId, ObjectType.DATA_REFERENCE, EndpointDataReference.class); + if (isNull(dataReference)) { - dtoMap.put(agreementId, dto); + storeService.put(agreementId, ObjectType.DTO, dto); } locks.unlock(agreementId); return dataReference; @@ -43,9 +45,12 @@ public EndpointDataReference exchangeDto(DataReferenceRetrievalDto dto, String a public DataReferenceRetrievalDto exchangeDataReference( EndpointDataReference dataReference, String agreementId) { locks.lock(agreementId); - DataReferenceRetrievalDto dto = dtoMap.get(agreementId); + + DataReferenceRetrievalDto dto = + storeService.get(agreementId, ObjectType.DTO, DataReferenceRetrievalDto.class); + if (isNull(dto)) { - dataReferenceMap.put(agreementId, dataReference); + storeService.put(agreementId, ObjectType.DATA_REFERENCE, dataReference); } locks.unlock(agreementId); return dto; @@ -53,13 +58,13 @@ public DataReferenceRetrievalDto exchangeDataReference( @Override public void removeDataReference(String agreementId) { - dataReferenceMap.remove(agreementId); + storeService.remove(agreementId, ObjectType.DATA_REFERENCE); locks.removeLock(agreementId); } @Override public void removeDto(String agreementId) { - dtoMap.remove(agreementId); + storeService.remove(agreementId, ObjectType.DTO); locks.removeLock(agreementId); } } diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataReferenceErrorHandler.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataReferenceErrorHandler.java new file mode 100644 index 000000000..39885837c --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataReferenceErrorHandler.java @@ -0,0 +1,58 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.process.datareference; + +import jakarta.ws.rs.core.Response; +import java.util.List; +import lombok.AllArgsConstructor; +import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.util.string.StringUtils; +import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto; +import org.eclipse.tractusx.edc.cp.adapter.messaging.Channel; +import org.eclipse.tractusx.edc.cp.adapter.messaging.MessageBus; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreService; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectType; + +@AllArgsConstructor +public class DataReferenceErrorHandler { + private static final String ERROR_MESSAGE = "Data reference process stage failed with status: "; + private final Monitor monitor; + private final MessageBus messageBus; + private final ObjectStoreService objectStore; + private final TransferProcessService transferProcessService; + + private final List errorStates = List.of("CANCELLED", "ERROR"); + + public void validateActiveProcesses() { + monitor.debug("Data reference error handling - START"); + objectStore.get(ObjectType.DTO, DataReferenceRetrievalDto.class).stream() + .filter(dto -> !StringUtils.isNullOrEmpty(dto.getPayload().getTransferProcessId())) + .forEach(this::validateProcess); + } + + private void validateProcess(DataReferenceRetrievalDto dto) { + String state = transferProcessService.getState(dto.getPayload().getTransferProcessId()); + if (errorStates.contains(state)) { + monitor.warning(String.format("[%s] ", dto.getTraceId()) + ERROR_MESSAGE + state); + String contractAgreementId = dto.getPayload().getContractAgreementId(); + objectStore.remove(contractAgreementId, ObjectType.DTO); + + dto.getPayload().setErrorStatus(Response.Status.BAD_GATEWAY); + dto.getPayload().setErrorMessage(ERROR_MESSAGE + state); + messageBus.send(Channel.RESULT, dto); + } + } +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/EndpointDataReferenceReceiverImpl.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/EndpointDataReferenceReceiverImpl.java index 90506b11e..0f1173cf4 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/EndpointDataReferenceReceiverImpl.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/EndpointDataReferenceReceiverImpl.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + package org.eclipse.tractusx.edc.cp.adapter.process.datareference; import static java.util.Objects.isNull; diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/ResultService.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/ResultService.java index 606b22f94..494237abe 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/ResultService.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/ResultService.java @@ -18,10 +18,12 @@ import static java.util.concurrent.TimeUnit.SECONDS; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ArrayBlockingQueue; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import lombok.RequiredArgsConstructor; +import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto; import org.eclipse.tractusx.edc.cp.adapter.dto.ProcessData; import org.eclipse.tractusx.edc.cp.adapter.messaging.Listener; @@ -31,6 +33,7 @@ public class ResultService implements Listener { private final int CAPACITY = 1; private final int DEFAULT_TIMEOUT; private final Map> results = new ConcurrentHashMap<>(); + private final Monitor monitor; public ProcessData pull(String id) throws InterruptedException { return pull(id, DEFAULT_TIMEOUT, SECONDS); @@ -50,17 +53,41 @@ public void process(DataReferenceRetrievalDto dto) { if (isNull(dto) || isNull(dto.getPayload())) { throw new IllegalArgumentException(); } + logReceivedResult(dto); add(dto.getTraceId(), dto.getPayload()); } - private void add(String id, ProcessData ProcessData) { + private void add(String id, ProcessData processData) { if (!results.containsKey(id)) { initiate(id); } - results.get(id).add(ProcessData); + try { + results.get(id).add(processData); + } catch (IllegalStateException e) { + logIgnoredResult(id, processData); + } } private void initiate(String id) { results.put(id, new ArrayBlockingQueue<>(CAPACITY)); } + + private void logReceivedResult(DataReferenceRetrievalDto dto) { + monitor.info( + String.format( + "[%s] Result received: %s", dto.getTraceId(), getResultInfo(dto.getPayload()))); + } + + private void logIgnoredResult(String id, ProcessData processData) { + monitor.warning( + String.format( + "[%s] Other Result was already returned! Result '%s' will be ignored!", + id, getResultInfo(processData))); + } + + private String getResultInfo(ProcessData processData) { + return Objects.nonNull(processData.getErrorMessage()) + ? processData.getErrorMessage() + : processData.getEndpointDataReference().getId(); + } } diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectStoreService.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectStoreService.java new file mode 100644 index 000000000..6c3818379 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectStoreService.java @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.service.objectstore; + +import java.util.List; + +public interface ObjectStoreService { + void put(String key, ObjectType objectType, Object object); + + T get(String key, ObjectType objectType, Class type); + + void remove(String key, ObjectType objectType); + + List get(ObjectType objectType, Class type); +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectStoreServiceInMemory.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectStoreServiceInMemory.java new file mode 100644 index 000000000..3e1a8190a --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectStoreServiceInMemory.java @@ -0,0 +1,74 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.service.objectstore; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.stream.Collectors; +import lombok.AllArgsConstructor; + +@AllArgsConstructor +public class ObjectStoreServiceInMemory implements ObjectStoreService { + private final ObjectMapper mapper; + private final Map map = new HashMap<>(); + + @Override + public void put(String key, ObjectType objectType, Object object) { + try { + map.put(getKey(key, objectType), mapper.writeValueAsString(object)); + } catch (JsonProcessingException e) { + e.printStackTrace(); + throw new IllegalArgumentException(); + } + } + + @Override + public T get(String key, ObjectType objectType, Class type) { + String json = map.get(getKey(key, objectType)); + return Objects.isNull(json) ? null : map(type, json); + } + + @Override + public List get(ObjectType objectType, Class type) { + return map.entrySet().stream() + .filter(entry -> entry.getKey().startsWith(objectType.name())) + .map(Map.Entry::getValue) + .map(s -> map(type, s)) + .collect(Collectors.toList()); + } + + @Override + public void remove(String key, ObjectType objectType) { + map.remove(getKey(key, objectType)); + } + + private String getKey(String key, ObjectType objectType) { + return objectType.name() + key; + } + + private T map(Class type, String json) { + T object = null; + try { + object = mapper.readValue(json, type); + } catch (JsonProcessingException e) { + e.printStackTrace(); + } + return object; + } +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectStoreServiceSql.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectStoreServiceSql.java new file mode 100644 index 000000000..b16daea50 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectStoreServiceSql.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.service.objectstore; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import java.util.List; +import java.util.Objects; +import java.util.stream.Collectors; +import lombok.AllArgsConstructor; +import org.eclipse.edc.util.collection.CollectionUtil; +import org.eclipse.tractusx.edc.cp.adapter.store.SqlObjectStore; +import org.eclipse.tractusx.edc.cp.adapter.store.model.ObjectEntity; + +@AllArgsConstructor +public class ObjectStoreServiceSql implements ObjectStoreService { + private final ObjectMapper mapper; + private final SqlObjectStore objectStore; + + @Override + public void put(String key, ObjectType objectType, Object object) { + ObjectEntity entity = + ObjectEntity.builder() + .id(key) + .type(objectType.name()) + .object(objectToJson(object, objectType.name())) + .build(); + objectStore.saveMessage(entity); + } + + @Override + public T get(String key, ObjectType objectType, Class type) { + ObjectEntity entity = objectStore.find(key, objectType.name()); + if (Objects.isNull(entity)) { + return null; + } + return jsonToObject(entity, type); + } + + @Override + public List get(ObjectType objectType, Class type) { + List entities = objectStore.find(objectType.name()); + if (CollectionUtil.isEmpty(entities)) { + return List.of(); + } + return entities.stream().map(entity -> jsonToObject(entity, type)).collect(Collectors.toList()); + } + + @Override + public void remove(String key, ObjectType objectType) { + objectStore.deleteMessage(key, objectType.name()); + } + + private String objectToJson(Object object, String type) { + if (Objects.isNull(object)) { + return null; + } + try { + return mapper.writeValueAsString(object); + } catch (JsonProcessingException e) { + e.printStackTrace(); + throw new IllegalArgumentException(String.format("Can not parse object of type %s", type)); + } + } + + private T jsonToObject(ObjectEntity entity, Class type) { + try { + return mapper.readValue(entity.getObject(), type); + } catch (JsonProcessingException e) { + e.printStackTrace(); + throw new IllegalArgumentException(String.format("Can not parse object of type %s", type)); + } + } +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectType.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectType.java new file mode 100644 index 000000000..5cbee2d0f --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectType.java @@ -0,0 +1,22 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.service.objectstore; + +public enum ObjectType { + DTO, + CONTRACT_INFO, + DATA_REFERENCE, + RESULT +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/SqlObjectStore.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/SqlObjectStore.java new file mode 100644 index 000000000..4a5ec94a9 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/SqlObjectStore.java @@ -0,0 +1,118 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.store; + +import static org.eclipse.edc.sql.SqlQueryExecutor.executeQuery; +import static org.eclipse.edc.sql.SqlQueryExecutor.executeQuerySingle; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Instant; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.sql.store.AbstractSqlStore; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.eclipse.tractusx.edc.cp.adapter.store.model.ObjectEntity; +import org.eclipse.tractusx.edc.cp.adapter.store.schema.ObjectStoreStatements; + +public class SqlObjectStore extends AbstractSqlStore { + private final ObjectStoreStatements statements; + + public SqlObjectStore( + DataSourceRegistry dataSourceRegistry, + String dataSourceName, + TransactionContext transactionContext, + ObjectMapper objectMapper, + ObjectStoreStatements statements) { + super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper); + this.statements = statements; + } + + public void saveMessage(ObjectEntity objectEntity) { + long now = Instant.now().toEpochMilli(); + transactionContext.execute( + () -> { + try (var conn = getConnection()) { + var template = statements.getSaveObjectTemplate(); + executeQuery( + conn, + template, + objectEntity.getId(), + now, + objectEntity.getType(), + objectEntity.getObject()); + } catch (SQLException e) { + e.printStackTrace(); + throw new EdcPersistenceException(e); + } + }); + } + + public ObjectEntity find(String id, String type) { + return transactionContext.execute( + () -> { + try (var connection = getConnection()) { + var sql = statements.getFindByIdAndTypeTemplate(); + return executeQuerySingle(connection, false, this::mapObjectEntity, sql, id, type); + } catch (SQLException e) { + e.printStackTrace(); + throw new EdcPersistenceException(e); + } + }); + } + + public List find(String type) { + return transactionContext.execute( + () -> { + try (var connection = getConnection()) { + var sql = statements.getFindByTypeTemplate(); + Stream stream = + executeQuery(connection, false, this::mapObjectEntity, sql, type); + List result = stream.collect(Collectors.toList()); + stream.close(); + return result; + } catch (SQLException e) { + e.printStackTrace(); + throw new EdcPersistenceException(e); + } + }); + } + + public void deleteMessage(String id, String type) { + transactionContext.execute( + () -> { + try (var connection = getConnection()) { + var stmt = statements.getDeleteTemplate(); + executeQuery(connection, stmt, id, type); + } catch (SQLException | IllegalStateException e) { + e.printStackTrace(); + throw new EdcPersistenceException(e); + } + }); + } + + private ObjectEntity mapObjectEntity(ResultSet resultSet) throws SQLException { + return ObjectEntity.builder() + .id(resultSet.getString(statements.getIdColumn())) + .createdAt(resultSet.getLong(statements.getCreatedAtColumn())) + .type(resultSet.getString(statements.getTypeColumn())) + .object(resultSet.getString(statements.getObjectColumn())) + .build(); + } +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/SqlQueueStore.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/SqlQueueStore.java new file mode 100644 index 000000000..0d808fae1 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/SqlQueueStore.java @@ -0,0 +1,184 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.store; + +import static org.eclipse.edc.sql.SqlQueryExecutor.executeQuery; +import static org.eclipse.edc.sql.SqlQueryExecutor.executeQuerySingle; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.time.Clock; +import java.time.Instant; +import java.util.List; +import java.util.Objects; +import java.util.UUID; +import java.util.stream.Collectors; +import java.util.stream.Stream; +import org.eclipse.edc.spi.persistence.EdcPersistenceException; +import org.eclipse.edc.sql.lease.SqlLeaseContextBuilder; +import org.eclipse.edc.sql.store.AbstractSqlStore; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto; +import org.eclipse.tractusx.edc.cp.adapter.store.model.QueueMessage; +import org.eclipse.tractusx.edc.cp.adapter.store.schema.QueueStatements; + +public class SqlQueueStore extends AbstractSqlStore { + private final QueueStatements statements; + private final SqlLeaseContextBuilder leaseContext; + + public SqlQueueStore( + DataSourceRegistry dataSourceRegistry, + String dataSourceName, + TransactionContext transactionContext, + ObjectMapper objectMapper, + QueueStatements statements, + String connectorId, + Clock clock) { + super(dataSourceRegistry, dataSourceName, transactionContext, objectMapper); + this.statements = statements; + leaseContext = SqlLeaseContextBuilder.with(transactionContext, connectorId, statements, clock); + } + + public void saveMessage(QueueMessage queueMessage) { + long now = Instant.now().toEpochMilli(); + transactionContext.execute( + () -> { + try (var conn = getConnection()) { + var template = statements.getSaveMessageTemplate(); + executeQuery( + conn, + template, + now, + UUID.randomUUID().toString(), + queueMessage.getChannel(), + toJson(queueMessage.getMessage()), + queueMessage.getInvokeAfter()); + } catch (SQLException e) { + e.printStackTrace(); + throw new EdcPersistenceException(e); + } + }); + } + + public QueueMessage findById(String id) { + return transactionContext.execute( + () -> { + try (var connection = getConnection()) { + var sql = statements.getFindByIdTemplate(); + return executeQuerySingle(connection, false, this::mapQueueMessage, sql, id); + } catch (SQLException e) { + e.printStackTrace(); + throw new EdcPersistenceException(e); + } + }); + } + + public void deleteMessage(String id) { + transactionContext.execute( + () -> { + var existing = findById(id); + + if (existing != null) { + try (var connection = getConnection()) { + breakLease(connection, id); + var stmt = statements.getDeleteTemplate(); + executeQuery(connection, stmt, id); + } catch (SQLException | IllegalStateException e) { + e.printStackTrace(); + throw new EdcPersistenceException(e); + } + } + }); + } + + public void updateMessage(QueueMessage queueMessage) { + transactionContext.execute( + () -> { + var existing = findById(queueMessage.getId()); + + if (existing != null) { + try (var connection = getConnection()) { + var stmt = statements.getUpdateTemplate(); + breakLease(connection, queueMessage.getId()); + executeQuery( + connection, + stmt, + queueMessage.getChannel(), + toJson(queueMessage.getMessage()), + queueMessage.getInvokeAfter(), + queueMessage.getId()); + } catch (SQLException | IllegalStateException e) { + e.printStackTrace(); + throw new EdcPersistenceException(e); + } + } + }); + } + + public List findMessagesToSend(int max) { + long now = Instant.now().toEpochMilli(); + return transactionContext.execute( + () -> { + try (var connection = getConnection()) { + var sql = statements.getMessagesToSendTemplate(); + Stream stream = + executeQuery(connection, false, this::mapQueueMessage, sql, now, max); + List result = + stream + .map(queueMessage -> getLeasedQueueMessage(connection, queueMessage)) + .filter(Objects::nonNull) + .collect(Collectors.toList()); + stream.close(); + return result; + } catch (SQLException e) { + e.printStackTrace(); + throw new EdcPersistenceException(e); + } + }); + } + + private QueueMessage getLeasedQueueMessage(Connection connection, QueueMessage queueMessage) { + try { + acquireLease(connection, queueMessage.getId()); + return queueMessage; + } catch (IllegalStateException e) { + return null; + } + } + + private void acquireLease(Connection connection, String id) { + leaseContext.withConnection(connection).acquireLease(id); + } + + private void breakLease(Connection connection, String id) { + leaseContext.withConnection(connection).breakLease(id); + } + + private QueueMessage mapQueueMessage(ResultSet resultSet) throws SQLException { + return QueueMessage.builder() + .id(resultSet.getString(statements.getIdColumn())) + .message( + fromJson( + resultSet.getString(statements.getMessageColumn()), + DataReferenceRetrievalDto.class)) + .invokeAfter(resultSet.getLong(statements.getInvokeAfterColumn())) + .createdAt(resultSet.getLong(statements.getCreatedAtColumn())) + .channel(resultSet.getString(statements.getChannelColumn())) + .build(); + } +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/model/ObjectEntity.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/model/ObjectEntity.java new file mode 100644 index 000000000..ffefe3927 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/model/ObjectEntity.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.store.model; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; + +@Getter +@Setter +@Builder +public class ObjectEntity { + private String id; + private long createdAt; + private String type; + private String object; +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/model/QueueMessage.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/model/QueueMessage.java new file mode 100644 index 000000000..e3d1ec8bb --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/model/QueueMessage.java @@ -0,0 +1,31 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.store.model; + +import lombok.Builder; +import lombok.Getter; +import lombok.Setter; +import org.eclipse.tractusx.edc.cp.adapter.messaging.Message; + +@Getter +@Setter +@Builder +public class QueueMessage { + private String id; + private long createdAt; + private String channel; + private Message message; + private long invokeAfter; +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/BaseSqlDialectObjectStoreStatements.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/BaseSqlDialectObjectStoreStatements.java new file mode 100644 index 000000000..caa7fb39a --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/BaseSqlDialectObjectStoreStatements.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.store.schema; + +import static java.lang.String.format; + +import org.eclipse.edc.sql.dialect.BaseSqlDialect; + +public class BaseSqlDialectObjectStoreStatements implements ObjectStoreStatements { + @Override + public String getSaveObjectTemplate() { + return format( + "INSERT INTO %s (%s, %s, %s, %s) VALUES(?, ?, ?, ?%s)", + getObjectStoreTable(), + getIdColumn(), + getCreatedAtColumn(), + getTypeColumn(), + getObjectColumn(), + getFormatJsonOperator()); + } + + @Override + public String getFindByIdAndTypeTemplate() { + return format( + "SELECT * FROM %s WHERE %s = ? AND %s = ?", + getObjectStoreTable(), getIdColumn(), getTypeColumn()); + } + + @Override + public String getFindByTypeTemplate() { + return format("SELECT * FROM %s WHERE %s = ?", getObjectStoreTable(), getTypeColumn()); + } + + @Override + public String getDeleteTemplate() { + return format( + "DELETE FROM %s WHERE %s = ? AND %s = ?;", + getObjectStoreTable(), getIdColumn(), getTypeColumn()); + } + + protected String getFormatJsonOperator() { + return BaseSqlDialect.getJsonCastOperator(); + } +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/BaseSqlDialectQueueStatements.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/BaseSqlDialectQueueStatements.java new file mode 100644 index 000000000..7bf19422c --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/BaseSqlDialectQueueStatements.java @@ -0,0 +1,103 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.store.schema; + +import static java.lang.String.format; + +import org.eclipse.edc.sql.dialect.BaseSqlDialect; + +public class BaseSqlDialectQueueStatements implements QueueStatements { + + @Override + public String getSaveMessageTemplate() { + return format( + "INSERT INTO %s (%s, %s, %s, %s, %s) VALUES(?, ?, ?, ?%s, ?)", + getQueueTable(), + getCreatedAtColumn(), + getIdColumn(), + getChannelColumn(), + getMessageColumn(), + getInvokeAfterColumn(), + getFormatJsonOperator()); + } + + @Override + public String getAllMessagesTemplate() { + return format("SELECT * FROM %s ", getQueueTable()); + } + + @Override + public String getMessagesToSendTemplate() { + return format( + "SELECT * FROM %s WHERE %s <= ? AND %s IS NULL LIMIT ?", + getQueueTable(), getInvokeAfterColumn(), getLeaseIdColumn()); + } + ; + + @Override + public String getDeleteTemplate() { + return format("DELETE FROM %s WHERE %s = ?", getQueueTable(), getIdColumn()); + } + + @Override + public String getFindByIdTemplate() { + return format("SELECT * FROM %s WHERE %s = ?", getQueueTable(), getIdColumn()); + } + + @Override + public String getUpdateTemplate() { + return format( + "UPDATE %s SET %s=?, %s=?%s, %s=? WHERE %s=?", + getQueueTable(), + getChannelColumn(), + getMessageColumn(), + getFormatJsonOperator(), + getInvokeAfterColumn(), + getIdColumn()); + } + + @Override + public String getDeleteLeaseTemplate() { + return format("DELETE FROM %s WHERE %s = ?;", getLeaseTableName(), getLeaseIdColumn()); + } + + @Override + public String getInsertLeaseTemplate() { + return format( + "INSERT INTO %s (%s, %s, %s, %s)" + "VALUES (?,?,?,?);", + getLeaseTableName(), + getLeaseIdColumn(), + getLeasedByColumn(), + getLeasedAtColumn(), + getLeaseDurationColumn()); + } + + @Override + public String getUpdateLeaseTemplate() { + return format( + "UPDATE %s SET %s=? WHERE %s = ?;", getQueueTable(), getLeaseIdColumn(), getIdColumn()); + } + + @Override + public String getFindLeaseByEntityTemplate() { + return format( + "SELECT * FROM %s WHERE %s = (SELECT lease_id FROM %s WHERE %s=? )", + getLeaseTableName(), getLeaseIdColumn(), getQueueTable(), getIdColumn()); + } + + protected String getFormatJsonOperator() { + return BaseSqlDialect.getJsonCastOperator(); + } +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/ObjectStoreStatements.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/ObjectStoreStatements.java new file mode 100644 index 000000000..57f60988a --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/ObjectStoreStatements.java @@ -0,0 +1,45 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.store.schema; + +public interface ObjectStoreStatements { + default String getObjectStoreTable() { + return "edc_cpadapter_object_store"; + } + + default String getIdColumn() { + return "id"; + } + + default String getCreatedAtColumn() { + return "created_at"; + } + + default String getTypeColumn() { + return "type"; + } + + default String getObjectColumn() { + return "object"; + } + + String getSaveObjectTemplate(); + + String getFindByIdAndTypeTemplate(); + + String getFindByTypeTemplate(); + + String getDeleteTemplate(); +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/QueueStatements.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/QueueStatements.java new file mode 100644 index 000000000..918734c2a --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/QueueStatements.java @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.store.schema; + +import org.eclipse.edc.sql.lease.LeaseStatements; + +/** Defines all statements that are needed for the ContractDefinition store */ +public interface QueueStatements extends LeaseStatements { + default String getQueueTable() { + return "edc_cpadapter_queue"; + } + + default String getIdColumn() { + return "id"; + } + + default String getCreatedAtColumn() { + return "created_at"; + } + + default String getChannelColumn() { + return "channel"; + } + + default String getMessageColumn() { + return "message"; + } + + default String getInvokeAfterColumn() { + return "invoke_after"; + } + + String getAllMessagesTemplate(); + + String getMessagesToSendTemplate(); + + String getSaveMessageTemplate(); + + String getDeleteTemplate(); + + String getFindByIdTemplate(); + + String getUpdateTemplate(); +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/postgres/PostgresDialectObjectStoreStatements.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/postgres/PostgresDialectObjectStoreStatements.java new file mode 100644 index 000000000..85a078981 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/postgres/PostgresDialectObjectStoreStatements.java @@ -0,0 +1,28 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.store.schema.postgres; + +import org.eclipse.edc.sql.dialect.PostgresDialect; +import org.eclipse.tractusx.edc.cp.adapter.store.schema.BaseSqlDialectObjectStoreStatements; + +public class PostgresDialectObjectStoreStatements extends BaseSqlDialectObjectStoreStatements { + /** + * Overridable operator to convert strings to JSON. For postgres, this is the "::json" operator + */ + @Override + protected String getFormatJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/postgres/PostgresDialectQueueStatements.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/postgres/PostgresDialectQueueStatements.java new file mode 100644 index 000000000..cdafaf4f3 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/postgres/PostgresDialectQueueStatements.java @@ -0,0 +1,29 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.store.schema.postgres; + +import org.eclipse.edc.sql.dialect.PostgresDialect; +import org.eclipse.tractusx.edc.cp.adapter.store.schema.BaseSqlDialectQueueStatements; + +public class PostgresDialectQueueStatements extends BaseSqlDialectQueueStatements { + + /** + * Overridable operator to convert strings to JSON. For postgres, this is the "::json" operator + */ + @Override + protected String getFormatJsonOperator() { + return PostgresDialect.getJsonCastOperator(); + } +} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/util/ExpiringMap.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/util/ExpiringMap.java index 26c7abce6..657ca8f92 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/util/ExpiringMap.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/util/ExpiringMap.java @@ -22,7 +22,7 @@ public class ExpiringMap { private final Map map = new HashMap<>(); private final Map entryTimeMap = new HashMap<>(); - private long expireAfter = 5 * 60; + private long expireAfter = 2 * 60; public ExpiringMap() {} diff --git a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/util/LockMap.java b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/util/LockMap.java index f8f73d96c..c8014ee9b 100644 --- a/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/util/LockMap.java +++ b/edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/util/LockMap.java @@ -18,6 +18,12 @@ import java.util.Map; import java.util.concurrent.locks.ReentrantLock; +/** + * When CP-Adapter works in InMemory mode, LockMap is used to prevent race condition of two events. + * This implementation will not work if both events will be handled by two separate EDC instances + * (persistent mode), but edc.cp.adapter.service.objectstore.ObjectStoreServiceSql#put(...) method + * will not allow to save both events in the table as PRIMARY_KE collision would appear. + */ public class LockMap { private final Map lock = new HashMap<>(); diff --git a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/HttpControllerTest.java b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/HttpControllerTest.java index d7df56228..0d0f6add3 100644 --- a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/HttpControllerTest.java +++ b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/HttpControllerTest.java @@ -42,7 +42,7 @@ public void getAssetSynchronous_shouldReturnBadRequestIfNoAssetIdParam() { HttpController httpController = new HttpController(monitor, resultService, messageBus, config); // when - Response response = httpController.getAssetSynchronous(null, "providerUrl", null, null); + Response response = httpController.getAssetSynchronous(null, "providerUrl", null, false, null); // then assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus()); @@ -59,12 +59,37 @@ public void getAssetSynchronous_shouldReturnBadRequestIfNoProviderUrlParam() { HttpController httpController = new HttpController(monitor, resultService, messageBus, config); // when - Response response = httpController.getAssetSynchronous("assetId", null, null, null); + Response response = httpController.getAssetSynchronous("assetId", null, null, false, null); // then assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus()); } + @Test + public void getAssetSynchronous_shouldReturnErrorStatusIfOccurred() throws InterruptedException { + // given + Monitor monitor = Mockito.mock(Monitor.class); + ResultService resultService = Mockito.mock(ResultService.class); + MessageBus messageBus = Mockito.mock(MessageBus.class); + ApiAdapterConfig config = Mockito.mock(ApiAdapterConfig.class); + when(config.getDefaultMessageRetryNumber()).thenReturn(RETRY_NUMBER); + HttpController httpController = new HttpController(monitor, resultService, messageBus, config); + + when(resultService.pull(anyString())) + .thenReturn( + ProcessData.builder() + .errorStatus(Response.Status.BAD_GATEWAY) + .endpointDataReference(getEndpointDataReference()) + .build()); + + // when + Response response = + httpController.getAssetSynchronous("assetId", "providerUrl", null, false, null); + + // then + assertEquals(Response.Status.BAD_GATEWAY.getStatusCode(), response.getStatus()); + } + @Test public void getAssetSynchronous_shouldReturnOkResponse() throws InterruptedException { // given @@ -79,7 +104,8 @@ public void getAssetSynchronous_shouldReturnOkResponse() throws InterruptedExcep ProcessData.builder().endpointDataReference(getEndpointDataReference()).build()); // when - Response response = httpController.getAssetSynchronous("assetId", "providerUrl", null, null); + Response response = + httpController.getAssetSynchronous("assetId", "providerUrl", null, false, null); // then assertEquals(Response.Status.OK.getStatusCode(), response.getStatus()); diff --git a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/messaging/SqlMessageBusTest.java b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/messaging/SqlMessageBusTest.java new file mode 100644 index 000000000..a7a953c3c --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/messaging/SqlMessageBusTest.java @@ -0,0 +1,235 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.messaging; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.*; + +import com.fasterxml.jackson.databind.ObjectMapper; +import java.time.Clock; +import java.time.Instant; +import java.time.ZoneId; +import java.util.*; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.edc.transaction.datasource.spi.DataSourceRegistry; +import org.eclipse.edc.transaction.spi.TransactionContext; +import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto; +import org.eclipse.tractusx.edc.cp.adapter.dto.ProcessData; +import org.eclipse.tractusx.edc.cp.adapter.store.SqlQueueStore; +import org.eclipse.tractusx.edc.cp.adapter.store.model.QueueMessage; +import org.eclipse.tractusx.edc.cp.adapter.store.schema.QueueStatements; +import org.jetbrains.annotations.NotNull; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class SqlMessageBusTest { + @Mock Monitor monitor; + @Mock Listener listener; + @Mock ListenerService listenerService; + @Mock SqlQueueStore store; + @Mock DataSourceRegistry dataSourceRegistry; + + @BeforeEach + void init() { + MockitoAnnotations.openMocks(this); + } + + @Test + public void send_shouldCallListenerOnce() throws InterruptedException { + // given + Message message = new DataReferenceRetrievalDto(null, 3); + when(listenerService.getListener(any())).thenReturn(listener); + SqlMessageBus messageBus = + new SqlMessageBus(monitor, listenerService, inMemoryFakeStore(), 2, 10); + + // when + messageBus.send(Channel.INITIAL, message); + Thread.sleep(60); + messageBus.deliverMessages(10); + + // then + Thread.sleep(60); + verify(listener, times(1)).process(any(DataReferenceRetrievalDto.class)); + } + + @Test + public void send_shouldCallListenerWithRetryOnException() throws InterruptedException { + // given + Message message = new DataReferenceRetrievalDto(null, 3); + when(listenerService.getListener(any())).thenReturn(listener); + doThrow(new IllegalStateException()).doNothing().when(listener).process(any()); + SqlMessageBus messageBus = + new SqlMessageBus(monitor, listenerService, inMemoryFakeStore(), 2, 10); + + // when + messageBus.send(Channel.INITIAL, message); + messageBus.deliverMessages(10); + Thread.sleep(60); + + // then + verify(listener, times(2)).process(any(DataReferenceRetrievalDto.class)); + } + + @Test + public void send_shouldSendToDlqIfErrorLimitReached() throws InterruptedException { + // given + Message message = new DataReferenceRetrievalDto(null, 3); + message.setErrorNumber(10); + when(listenerService.getListener(any())).thenReturn(listener); + doThrow(new IllegalStateException()).doNothing().when(listener).process(any()); + SqlMessageBus messageBus = + new SqlMessageBus(monitor, listenerService, inMemoryFakeStore(), 2, 10); + + // when + messageBus.send(Channel.INITIAL, message); + Thread.sleep(60); + + // then + verify(listenerService).getListener(eq(Channel.DLQ)); + } + + private SqlQueueStore inMemoryFakeStore() { + return new SqlQueueStore( + dataSourceRegistry, + "dsname", + getFakeTransactionContext(), + new ObjectMapper(), + getFakeStatements(), + "cid", + getFakeClock()) { + + private final Map map = new HashMap<>(); + + @Override + public void saveMessage(QueueMessage queueMessage) { + String id = UUID.randomUUID().toString(); + queueMessage.setId(id); + map.put(id, queueMessage); + } + + @Override + public QueueMessage findById(String id) { + return map.get(id); + } + + @Override + public void deleteMessage(String id) { + map.remove(id); + } + + @Override + public void updateMessage(QueueMessage queueMessage) { + map.remove(queueMessage.getId()); + map.put(queueMessage.getId(), queueMessage); + } + + @Override + public List findMessagesToSend(int max) { + return new ArrayList<>(map.values()); + } + }; + } + + private Clock getFakeClock() { + return new Clock() { + @Override + public ZoneId getZone() { + return null; + } + + @Override + public Clock withZone(ZoneId zone) { + return null; + } + + @Override + public Instant instant() { + return null; + } + }; + } + + @NotNull + private QueueStatements getFakeStatements() { + return new QueueStatements() { + @Override + public String getAllMessagesTemplate() { + return null; + } + + @Override + public String getMessagesToSendTemplate() { + return null; + } + + @Override + public String getSaveMessageTemplate() { + return null; + } + + @Override + public String getDeleteTemplate() { + return null; + } + + @Override + public String getFindByIdTemplate() { + return null; + } + + @Override + public String getUpdateTemplate() { + return null; + } + + @Override + public String getDeleteLeaseTemplate() { + return null; + } + + @Override + public String getInsertLeaseTemplate() { + return null; + } + + @Override + public String getUpdateLeaseTemplate() { + return null; + } + + @Override + public String getFindLeaseByEntityTemplate() { + return null; + } + }; + } + + private TransactionContext getFakeTransactionContext() { + return new TransactionContext() { + @Override + public void execute(TransactionBlock transactionBlock) {} + + @Override + public T execute(ResultTransactionBlock resultTransactionBlock) { + return null; + } + + @Override + public void registerSynchronization(TransactionSynchronization transactionSynchronization) {} + }; + } +} diff --git a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnegotiation/ContractAgreementRetrieverTest.java b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnegotiation/ContractAgreementRetrieverTest.java new file mode 100644 index 000000000..ad4cda3c0 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnegotiation/ContractAgreementRetrieverTest.java @@ -0,0 +1,86 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.process.contractnegotiation; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.when; + +import java.time.Instant; +import java.util.stream.Stream; +import org.eclipse.edc.connector.contract.spi.types.agreement.ContractAgreement; +import org.eclipse.edc.connector.spi.contractagreement.ContractAgreementService; +import org.eclipse.edc.policy.model.Policy; +import org.eclipse.edc.service.spi.result.ServiceResult; +import org.eclipse.edc.spi.monitor.Monitor; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class ContractAgreementRetrieverTest { + @Mock Monitor monitor; + @Mock ContractAgreementService agreementService; + + @BeforeEach + void init() { + MockitoAnnotations.openMocks(this); + } + + @Test + public void getExistingContractByAssetId_shouldReturnValidContract() { + // given + long now = Instant.now().getEpochSecond(); + when(agreementService.query(any())).thenReturn(getResult(now + 1000)); + ContractAgreementRetriever retriever = + new ContractAgreementRetriever(monitor, agreementService); + + // when + ContractAgreement contractAgreement = retriever.getExistingContractByAssetId("id"); + + // then + Assertions.assertNotNull(contractAgreement); + } + + @Test + public void getExistingContractByAssetId_shouldNotReturnExpiredContract() { + // given + long now = Instant.now().getEpochSecond(); + when(agreementService.query(any())).thenReturn(getResult(now - 1000)); + ContractAgreementRetriever retriever = + new ContractAgreementRetriever(monitor, agreementService); + + // when + ContractAgreement contractAgreement = retriever.getExistingContractByAssetId("id"); + + // then + Assertions.assertNull(contractAgreement); + } + + private ServiceResult> getResult(long endDate) { + long now = Instant.now().getEpochSecond(); + return ServiceResult.success( + Stream.of( + ContractAgreement.Builder.newInstance() + .id("id") + .assetId("assetId") + .contractStartDate(now - 2000) + .contractEndDate(endDate) + .providerAgentId("providerId") + .consumerAgentId("consumerId") + .policy(Policy.Builder.newInstance().build()) + .build())); + } +} diff --git a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNegotiationListenerTest.java b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNegotiationListenerTest.java index 6628b09c8..5d0e83553 100644 --- a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNegotiationListenerTest.java +++ b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNegotiationListenerTest.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + package org.eclipse.tractusx.edc.cp.adapter.process.contractnotification; import static org.mockito.ArgumentMatchers.any; diff --git a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractSyncServiceTest.java b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractSyncServiceTest.java new file mode 100644 index 000000000..720cc0b37 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractSyncServiceTest.java @@ -0,0 +1,143 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.process.contractnotification; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto; +import org.eclipse.tractusx.edc.cp.adapter.dto.ProcessData; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreServiceInMemory; +import org.eclipse.tractusx.edc.cp.adapter.util.LockMap; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ContractSyncServiceTest { + + @Test + public void exchangeConfirmedContract_shouldReturnDtoIfAvailable() { + // given + ContractSyncService syncService = + new ContractSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + syncService.exchangeDto(getDataReferenceRetrievalDto()); + + // when + DataReferenceRetrievalDto dto = + syncService.exchangeConfirmedContract("negotiationId", "agreementId"); + + // then + Assertions.assertNotNull(dto); + } + + @Test + public void exchangeConfirmedContract_shouldReturnNullIfDtoNotAvailable() { + // given + ContractSyncService syncService = + new ContractSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + + // when + DataReferenceRetrievalDto dto = + syncService.exchangeConfirmedContract("negotiationId", "agreementId"); + + // then + Assertions.assertNull(dto); + } + + @Test + public void exchangeDeclinedContract_shouldReturnDtoIfAvailable() { + // given + ContractSyncService syncService = + new ContractSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + syncService.exchangeDto(getDataReferenceRetrievalDto()); + + // when + DataReferenceRetrievalDto dto = syncService.exchangeDeclinedContract("negotiationId"); + + // then + Assertions.assertNotNull(dto); + } + + @Test + public void exchangeDeclinedContract_shouldReturnNullIfDtoNotAvailable() { + // given + ContractSyncService syncService = + new ContractSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + + // when + DataReferenceRetrievalDto dto = syncService.exchangeDeclinedContract("negotiationId"); + + // then + Assertions.assertNull(dto); + } + + @Test + public void exchangeErrorContract_shouldReturnDtoIfAvailable() { + // given + ContractSyncService syncService = + new ContractSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + syncService.exchangeDto(getDataReferenceRetrievalDto()); + + // when + DataReferenceRetrievalDto dto = syncService.exchangeErrorContract("negotiationId"); + + // then + Assertions.assertNotNull(dto); + } + + @Test + public void exchangeErrorContract_shouldReturnNullIfDtoNotAvailable() { + // given + ContractSyncService syncService = + new ContractSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + + // when + DataReferenceRetrievalDto dto = syncService.exchangeErrorContract("negotiationId"); + + // then + Assertions.assertNull(dto); + } + + @Test + public void exchangeDto_shouldReturnContractInfoIfAvailable() { + // given + ContractSyncService syncService = + new ContractSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + syncService.exchangeConfirmedContract("negotiationId", "agreementId"); + + // when + ContractInfo contractInfo = syncService.exchangeDto(getDataReferenceRetrievalDto()); + + // then + Assertions.assertNotNull(contractInfo); + } + + @Test + public void exchangeDto_shouldReturnNullIfContractInfoNotAvailable() { + // given + ContractSyncService syncService = + new ContractSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + + // when + ContractInfo contractInfo = syncService.exchangeDto(getDataReferenceRetrievalDto()); + + // then + Assertions.assertNull(contractInfo); + } + + private DataReferenceRetrievalDto getDataReferenceRetrievalDto() { + ProcessData processData = ProcessData.builder().assetId("assetId").provider("provider").build(); + DataReferenceRetrievalDto dto = new DataReferenceRetrievalDto(processData, 3); + dto.getPayload().setContractNegotiationId("negotiationId"); + return dto; + } +} diff --git a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataRefSyncServiceTest.java b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataRefSyncServiceTest.java new file mode 100644 index 000000000..0b0e537f7 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataRefSyncServiceTest.java @@ -0,0 +1,98 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.process.datareference; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; +import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto; +import org.eclipse.tractusx.edc.cp.adapter.dto.ProcessData; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreServiceInMemory; +import org.eclipse.tractusx.edc.cp.adapter.util.LockMap; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class DataRefSyncServiceTest { + + @Test + public void exchangeDto_shouldReturnDataReferenceIfAvailable() { + // given + DataRefSyncService syncService = + new DataRefSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + syncService.exchangeDataReference(getEndpointDataReference(), "agreementId"); + + // when + EndpointDataReference dataReference = + syncService.exchangeDto(getDataReferenceRetrievalDto(), "agreementId"); + + // then + Assertions.assertNotNull(dataReference); + } + + @Test + public void exchangeDto_shouldReturnNullIfDataReferenceNotAvailable() { + // given + DataRefSyncService syncService = + new DataRefSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + + // when + EndpointDataReference dataReference = + syncService.exchangeDto(getDataReferenceRetrievalDto(), "agreementId"); + + // then + Assertions.assertNull(dataReference); + } + + @Test + public void exchangeDataReference_shouldReturnDtoIfAvailable() { + // given + DataRefSyncService syncService = + new DataRefSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + syncService.exchangeDto(getDataReferenceRetrievalDto(), "agreementId"); + + // when + DataReferenceRetrievalDto dto = + syncService.exchangeDataReference(getEndpointDataReference(), "agreementId"); + + // then + Assertions.assertNotNull(dto); + } + + @Test + public void exchangeDataReference_shouldReturnNullIfDtoNotAvailable() { + // given + DataRefSyncService syncService = + new DataRefSyncService(new ObjectStoreServiceInMemory(new ObjectMapper()), new LockMap()); + + // when + DataReferenceRetrievalDto dto = + syncService.exchangeDataReference(getEndpointDataReference(), "agreementId"); + + // then + Assertions.assertNull(dto); + } + + private EndpointDataReference getEndpointDataReference() { + return EndpointDataReference.Builder.newInstance() + .endpoint("endpoint") + .authCode("authCode") + .authKey("authKey") + .build(); + } + + private DataReferenceRetrievalDto getDataReferenceRetrievalDto() { + ProcessData processData = ProcessData.builder().assetId("assetId").provider("provider").build(); + return new DataReferenceRetrievalDto(processData, 3); + } +} diff --git a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataReferenceErrorHandlerTest.java b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataReferenceErrorHandlerTest.java new file mode 100644 index 000000000..167a47479 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/DataReferenceErrorHandlerTest.java @@ -0,0 +1,110 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.process.datareference; + +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.*; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService; +import org.eclipse.edc.connector.transfer.spi.types.TransferProcessStates; +import org.eclipse.edc.spi.monitor.Monitor; +import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto; +import org.eclipse.tractusx.edc.cp.adapter.dto.ProcessData; +import org.eclipse.tractusx.edc.cp.adapter.messaging.Channel; +import org.eclipse.tractusx.edc.cp.adapter.messaging.Message; +import org.eclipse.tractusx.edc.cp.adapter.messaging.MessageBus; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreService; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreServiceInMemory; +import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectType; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; + +public class DataReferenceErrorHandlerTest { + @Mock Monitor monitor; + @Mock MessageBus messageBus; + @Mock TransferProcessService transferProcessService; + ObjectStoreService storeService = new ObjectStoreServiceInMemory(new ObjectMapper()); + + @BeforeEach + void init() { + MockitoAnnotations.openMocks(this); + storeService.put("key1", ObjectType.DTO, getDto()); + storeService.put("key2", ObjectType.DTO, getDto()); + } + + @Test + public void validateActiveProcesses_shouldSkipIfNoError() { + // given + when(transferProcessService.getState("transferId")) + .thenReturn(TransferProcessStates.COMPLETED.name()); + DataReferenceErrorHandler errorHandler = + new DataReferenceErrorHandler(monitor, messageBus, storeService, transferProcessService); + + // when + errorHandler.validateActiveProcesses(); + + // then + verify(messageBus, times(0)).send(eq(Channel.RESULT), any(Message.class)); + } + + @Test + public void validateActiveProcesses_shouldHandleErrorReference() { + // given + when(transferProcessService.getState("transferId")) + .thenReturn(TransferProcessStates.COMPLETED.name()) + .thenReturn(TransferProcessStates.ERROR.name()); + DataReferenceErrorHandler errorHandler = + new DataReferenceErrorHandler(monitor, messageBus, storeService, transferProcessService); + + // when + errorHandler.validateActiveProcesses(); + + // then + verify(messageBus, times(1)).send(eq(Channel.RESULT), any(Message.class)); + } + + @Test + public void validateActiveProcesses_shouldHandleCancelledReference() { + // given + when(transferProcessService.getState("transferId")) + .thenReturn(TransferProcessStates.COMPLETED.name()) + .thenReturn(TransferProcessStates.CANCELLED.name()); + DataReferenceErrorHandler errorHandler = + new DataReferenceErrorHandler(monitor, messageBus, storeService, transferProcessService); + + // when + errorHandler.validateActiveProcesses(); + + // then + verify(messageBus, times(1)).send(eq(Channel.RESULT), any(Message.class)); + } + + private DataReferenceRetrievalDto getDto() { + return new DataReferenceRetrievalDto(getProcessData(), 3); + } + + private ProcessData getProcessData() { + return ProcessData.builder() + .assetId("assetId") + .provider("provider") + .contractAgreementId("agreementId") + .transferProcessId("transferId") + .build(); + } +} diff --git a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/EndpointDataReferenceReceiverTest.java b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/EndpointDataReferenceReceiverTest.java index 6851d7708..9d0d983c4 100644 --- a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/EndpointDataReferenceReceiverTest.java +++ b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/process/datareference/EndpointDataReferenceReceiverTest.java @@ -1,3 +1,17 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + package org.eclipse.tractusx.edc.cp.adapter.process.datareference; import static org.mockito.ArgumentMatchers.any; diff --git a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/service/ResultServiceTest.java b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/service/ResultServiceTest.java index 0aeaa11db..2b7bff25f 100644 --- a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/service/ResultServiceTest.java +++ b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/service/ResultServiceTest.java @@ -17,18 +17,28 @@ import static org.junit.jupiter.api.Assertions.fail; import java.util.concurrent.TimeUnit; +import org.eclipse.edc.spi.monitor.Monitor; import org.eclipse.edc.spi.types.domain.edr.EndpointDataReference; import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto; import org.eclipse.tractusx.edc.cp.adapter.dto.ProcessData; import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; public class ResultServiceTest { + @Mock Monitor monitor; + + @BeforeEach + void init() { + MockitoAnnotations.openMocks(this); + } @Test public void pull_shouldReturnDataReferenceWhenMessageOccursFirst() throws InterruptedException { // given - ResultService resultService = new ResultService(20); + ResultService resultService = new ResultService(20, monitor); String endpointDataRefId = "456"; DataReferenceRetrievalDto dto = getDto(endpointDataRefId); ProcessData processData; @@ -44,7 +54,7 @@ public void pull_shouldReturnDataReferenceWhenMessageOccursFirst() throws Interr @Test public void pull_shouldReturnDataReferenceWhenMessageOccursSecond() throws InterruptedException { // given - ResultService resultService = new ResultService(20); + ResultService resultService = new ResultService(20, monitor); String endpointDataRefId = "456"; DataReferenceRetrievalDto dto = getDto(endpointDataRefId); ProcessData processData; @@ -60,7 +70,7 @@ public void pull_shouldReturnDataReferenceWhenMessageOccursSecond() throws Inter @Test public void pull_shouldReturnNullOnTimeout() throws InterruptedException { // given - ResultService resultService = new ResultService(20); + ResultService resultService = new ResultService(20, monitor); // when ProcessData processData = resultService.pull("123", 500, TimeUnit.MILLISECONDS); @@ -72,7 +82,7 @@ public void pull_shouldReturnNullOnTimeout() throws InterruptedException { @Test public void process_shouldThrowIllegalArgumentExceptionIfNoDataPayload() { // given - ResultService resultService = new ResultService(20); + ResultService resultService = new ResultService(20, monitor); DataReferenceRetrievalDto dto = new DataReferenceRetrievalDto(null, 3); // when then diff --git a/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/util/ExpiringMapTest.java b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/util/ExpiringMapTest.java new file mode 100644 index 000000000..a9ed2a9b5 --- /dev/null +++ b/edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/util/ExpiringMapTest.java @@ -0,0 +1,65 @@ +/* + * Copyright (c) 2022 ZF Friedrichshafen AG + * + * This program and the accompanying materials are made available under the + * terms of the Apache License, Version 2.0 which is available at + * https://www.apache.org/licenses/LICENSE-2.0 + * + * SPDX-License-Identifier: Apache-2.0 + * + * Contributors: + * ZF Friedrichshafen AG - Initial API and Implementation + * + */ + +package org.eclipse.tractusx.edc.cp.adapter.util; + +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.Test; + +public class ExpiringMapTest { + + private static final String KEY = "key"; + private static final String VAL = "value"; + + @Test + public void get_shouldGetWhenNotExpired() { + // given + ExpiringMap expiringMap = new ExpiringMap<>(60); + expiringMap.put(KEY, VAL); + + // when + String value = expiringMap.get(KEY); + + // then + Assertions.assertEquals(VAL, value); + } + + @Test + public void get_shouldGetNullWhenExpired() throws InterruptedException { + // given + ExpiringMap expiringMap = new ExpiringMap<>(0); + expiringMap.put(KEY, VAL); + + // when + Thread.sleep(1000); + String value = expiringMap.get(KEY, 0); + + // then + Assertions.assertNull(value); + } + + @Test + public void get_shouldGetNullWhenRemoved() throws InterruptedException { + // given + ExpiringMap expiringMap = new ExpiringMap<>(0); + expiringMap.put(KEY, VAL); + + // when + expiringMap.remove(KEY); + String value = expiringMap.get(KEY, 1000); + + // then + Assertions.assertNull(value); + } +} diff --git a/edc-extensions/postgresql-migration/src/main/java/org/eclipse/tractusx/edc/postgresql/migration/CpAdapterPostgresqlMigrationExtension.java b/edc-extensions/postgresql-migration/src/main/java/org/eclipse/tractusx/edc/postgresql/migration/CpAdapterPostgresqlMigrationExtension.java new file mode 100644 index 000000000..63e43c1e7 --- /dev/null +++ b/edc-extensions/postgresql-migration/src/main/java/org/eclipse/tractusx/edc/postgresql/migration/CpAdapterPostgresqlMigrationExtension.java @@ -0,0 +1,16 @@ +package org.eclipse.tractusx.edc.postgresql.migration; + +public class CpAdapterPostgresqlMigrationExtension extends AbstractPostgresqlMigrationExtension { + private static final String NAME_SUBSYSTEM = "cpadapter"; + private static final String DATASOURCE_SETTING_NAME = "edc.datasource.cpadapter.name"; + + @Override + protected String getDataSourceNameConfigurationKey() { + return DATASOURCE_SETTING_NAME; + } + + @Override + protected String getSubsystemName() { + return NAME_SUBSYSTEM; + } +} diff --git a/edc-extensions/postgresql-migration/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension b/edc-extensions/postgresql-migration/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension index 4c12f6272..b34529d7e 100644 --- a/edc-extensions/postgresql-migration/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension +++ b/edc-extensions/postgresql-migration/src/main/resources/META-INF/services/org.eclipse.edc.spi.system.ServiceExtension @@ -22,3 +22,4 @@ org.eclipse.tractusx.edc.postgresql.migration.ContractDefinitionPostgresqlMigrat org.eclipse.tractusx.edc.postgresql.migration.ContractNegotiationPostgresqlMigrationExtension org.eclipse.tractusx.edc.postgresql.migration.PolicyPostgresqlMigrationExtension org.eclipse.tractusx.edc.postgresql.migration.TransferProcessPostgresqlMigrationExtension +org.eclipse.tractusx.edc.postgresql.migration.CpAdapterPostgresqlMigrationExtension diff --git a/edc-extensions/postgresql-migration/src/main/resources/org/eclipse/tractusx/edc/postgresql/migration/cpadapter/V0_0_7__Init_CpAdapter_Database_Schema.sql b/edc-extensions/postgresql-migration/src/main/resources/org/eclipse/tractusx/edc/postgresql/migration/cpadapter/V0_0_7__Init_CpAdapter_Database_Schema.sql new file mode 100644 index 000000000..3e64d3c45 --- /dev/null +++ b/edc-extensions/postgresql-migration/src/main/resources/org/eclipse/tractusx/edc/postgresql/migration/cpadapter/V0_0_7__Init_CpAdapter_Database_Schema.sql @@ -0,0 +1,54 @@ +-- +-- Copyright (c) 2022 ZF Friedrichshafen AG +-- +-- This program and the accompanying materials are made available under the +-- terms of the Apache License, Version 2.0 which is available at +-- https://www.apache.org/licenses/LICENSE-2.0 +-- +-- SPDX-License-Identifier: Apache-2.0 +-- +-- Contributors: +-- ZF Friedrichshafen AG - Initial SQL Query +-- + +-- Statements are designed for and tested with Postgres only! + + +CREATE TABLE IF NOT EXISTS edc_lease +( + leased_by VARCHAR NOT NULL, + leased_at BIGINT, + lease_duration INTEGER NOT NULL, + lease_id VARCHAR NOT NULL + CONSTRAINT lease_pk + PRIMARY KEY +); + +CREATE TABLE IF NOT EXISTS edc_cpadapter_queue +( + id VARCHAR NOT NULL, + created_at BIGINT NOT NULL, + channel VARCHAR, + message JSON, + invoke_after BIGINT NOT NULL, + lease_id VARCHAR + CONSTRAINT cpadapter_queue_lease_lease_id_fk + REFERENCES edc_lease + ON DELETE SET NULL, + PRIMARY KEY (id) +); + +CREATE UNIQUE INDEX IF NOT EXISTS edc_cpadapter_queue_id_uindex + ON edc_cpadapter_queue (id); + +CREATE TABLE IF NOT EXISTS edc_cpadapter_object_store +( + id VARCHAR NOT NULL, + created_at BIGINT NOT NULL, + type VARCHAR, + object JSON, + PRIMARY KEY (id) +); + + +