Skip to content

Commit 666a954

Browse files
author
marcingajek-zf
committed
cp-adapter : data reference error handling
1 parent 0089b3a commit 666a954

15 files changed

+158
-32
lines changed

edc-extensions/control-plane-adapter/README.md

+5-4
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,12 @@ Additional requirements, that affects the architecture of the extension:
3434

3535
Oprional request parameters:
3636

37-
| Name | Description |
38-
|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|---|
39-
| contractAgreementId | Defines the ID of existing contract agreement, that should be reused for retrieving the asset. If parameter is specified, but contract is not found, 404 error will be returned. |
37+
| Name | Description |
38+
|--- |--- |
39+
| contractAgreementId | Defines the ID of existing contract agreement, that should be reused for retrieving the asset. If parameter is specified, but contract is not found, 404 error will be returned. |
4040
| contractAgreementReuse | Similar to <i>edc.cp.adapter.reuse.contract.agreement</i> option allows to turn off reusing of existing contracts, but on a request level. Set the parameter value to '0' and new contract agrement will be negotiated. |
41-
41+
| timeout | Similar to <i>edc.cp.adapter.default.sync.request.timeout</i>, defines the maximum time of the request. If data is not ready, time out error will be returned. |
42+
4243
The controller is registered under the context alias of DataManagement API. The authentication depends on the DataManagement configuration.
4344
To find out more please visit:
4445

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/ApiAdapterExtension.java

+22-10
Original file line numberDiff line numberDiff line change
@@ -39,10 +39,7 @@
3939
import org.eclipse.tractusx.edc.cp.adapter.process.contractnegotiation.ContractAgreementRetriever;
4040
import org.eclipse.tractusx.edc.cp.adapter.process.contractnegotiation.ContractNegotiationHandler;
4141
import org.eclipse.tractusx.edc.cp.adapter.process.contractnotification.*;
42-
import org.eclipse.tractusx.edc.cp.adapter.process.datareference.DataRefSyncService;
43-
import org.eclipse.tractusx.edc.cp.adapter.process.datareference.DataRefNotificationSyncService;
44-
import org.eclipse.tractusx.edc.cp.adapter.process.datareference.DataReferenceHandler;
45-
import org.eclipse.tractusx.edc.cp.adapter.process.datareference.EndpointDataReferenceReceiverImpl;
42+
import org.eclipse.tractusx.edc.cp.adapter.process.datareference.*;
4643
import org.eclipse.tractusx.edc.cp.adapter.service.ErrorResultService;
4744
import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreService;
4845
import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreServiceInMemory;
@@ -118,10 +115,11 @@ public void initialize(ServiceExtensionContext context) {
118115
listenerService.addListener(Channel.RESULT, resultService);
119116
listenerService.addListener(Channel.DLQ, errorResultService);
120117

121-
initHttpController(monitor, messageBus, resultService, config);
118+
initHttpController(messageBus, resultService, config);
122119
initContractNegotiationListener(
123-
monitor, negotiationObservable, messageBus, contractSyncService, dataTransferInitializer);
124-
initDataReferenceReceiver(monitor, messageBus, dataRefSyncService);
120+
negotiationObservable, messageBus, contractSyncService, dataTransferInitializer);
121+
initDataReferenceReceiver(messageBus, dataRefSyncService);
122+
initDataRefErrorHandler(messageBus, storeService, transferProcessService);
125123
}
126124

127125
private MessageBus createMessageBus(ListenerService listenerService, ServiceExtensionContext context, ApiAdapterConfig config) {
@@ -169,7 +167,6 @@ private void initMessageBus(SqlMessageBus messageBus, ApiAdapterConfig config) {
169167
}
170168

171169
private void initHttpController(
172-
Monitor monitor,
173170
MessageBus messageBus,
174171
ResultService resultService,
175172
ApiAdapterConfig config) {
@@ -191,7 +188,6 @@ private ContractNegotiationHandler getContractNegotiationHandler(
191188
}
192189

193190
private void initDataReferenceReceiver(
194-
Monitor monitor,
195191
MessageBus messageBus,
196192
DataRefNotificationSyncService dataRefSyncService) {
197193
EndpointDataReferenceReceiver dataReferenceReceiver =
@@ -200,7 +196,6 @@ private void initDataReferenceReceiver(
200196
}
201197

202198
private void initContractNegotiationListener(
203-
Monitor monitor,
204199
ContractNegotiationObservable negotiationObservable,
205200
MessageBus messageBus,
206201
ContractNotificationSyncService contractSyncService,
@@ -212,4 +207,21 @@ private void initContractNegotiationListener(
212207
negotiationObservable.registerListener(contractNegotiationListener);
213208
}
214209
}
210+
211+
private void initDataRefErrorHandler(
212+
MessageBus messageBus,
213+
ObjectStoreService objectStore,
214+
TransferProcessService transferProcessService) {
215+
216+
final int poolSize = 1;
217+
final int initialDelay = 5;
218+
final int interval = 5;
219+
ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(poolSize);
220+
221+
DataReferenceErrorHandler errorHandler = new DataReferenceErrorHandler(
222+
monitor, messageBus,objectStore, transferProcessService);
223+
224+
scheduler.scheduleAtFixedRate(errorHandler::validateActiveProcesses,
225+
initialDelay, interval, TimeUnit.SECONDS);
226+
}
215227
}

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/HttpController.java

+12-2
Original file line numberDiff line numberDiff line change
@@ -20,8 +20,11 @@
2020
import jakarta.ws.rs.core.MediaType;
2121
import jakarta.ws.rs.core.Response;
2222
import java.util.Objects;
23+
import java.util.concurrent.TimeUnit;
24+
2325
import lombok.RequiredArgsConstructor;
2426
import org.eclipse.edc.spi.monitor.Monitor;
27+
import org.eclipse.edc.util.string.StringUtils;
2528
import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto;
2629
import org.eclipse.tractusx.edc.cp.adapter.dto.ProcessData;
2730
import org.eclipse.tractusx.edc.cp.adapter.messaging.Channel;
@@ -45,7 +48,8 @@ public Response getAssetSynchronous(
4548
@PathParam("assetId") String assetId,
4649
@QueryParam("providerUrl") String providerUrl,
4750
@QueryParam("contractAgreementId") String contractAgreementId,
48-
@QueryParam("contractAgreementReuse") String contractAgreementReuse) {
51+
@QueryParam("contractAgreementReuse") String contractAgreementReuse,
52+
@QueryParam("timeout") String timeout) {
4953

5054
if (invalidParams(assetId, providerUrl)) {
5155
return badRequestResponse();
@@ -55,7 +59,9 @@ public Response getAssetSynchronous(
5559
initiateProcess(assetId, providerUrl, contractAgreementId, contractAgreementReuse);
5660

5761
try {
58-
ProcessData processData = resultService.pull(traceId);
62+
ProcessData processData = StringUtils.isNullOrEmpty(timeout) || !isNumeric(timeout)
63+
? resultService.pull(traceId)
64+
: resultService.pull(traceId, Long.parseLong(timeout), TimeUnit.SECONDS);
5965

6066
if (Objects.isNull(processData)) {
6167
return notFoundResponse();
@@ -130,4 +136,8 @@ private Response timeoutResponse() {
130136
.entity(Response.Status.REQUEST_TIMEOUT.getReasonPhrase())
131137
.build();
132138
}
139+
140+
private boolean isNumeric(String str) {
141+
return str != null && str.matches("[0-9]+");
142+
}
133143
}

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/dto/ProcessData.java

+1
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,7 @@ public class ProcessData {
3939
@Setter private String contractNegotiationId;
4040
@Setter private String contractAgreementId;
4141
@Builder.Default @Setter private boolean isContractConfirmed = false;
42+
@Setter private String transferProcessId;
4243

4344
// result/response data
4445
@Setter private EndpointDataReference endpointDataReference;

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNegotiationListenerImpl.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,8 @@ public void failed(ContractNegotiation negotiation) {
7474
}
7575

7676
public void initiateDataTransfer(DataReferenceRetrievalDto dto) {
77-
dataTransfer.initiate(dto);
77+
String transferProcessId = dataTransfer.initiate(dto);
78+
dto.getPayload().setTransferProcessId(transferProcessId);
7879
dto.getPayload().setContractConfirmed(true);
7980
messageBus.send(Channel.DATA_REFERENCE, dto);
8081
}

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/ContractNotificationHandler.java

+2-1
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,8 @@ public void process(DataReferenceRetrievalDto dto) {
7373
}
7474

7575
public void initiateDataTransfer(DataReferenceRetrievalDto dto) {
76-
dataTransfer.initiate(dto);
76+
String transferProcessId = dataTransfer.initiate(dto);
77+
dto.getPayload().setTransferProcessId(transferProcessId);
7778
dto.getPayload().setContractConfirmed(true);
7879
messageBus.send(Channel.DATA_REFERENCE, dto);
7980
}

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/process/contractnotification/DataTransferInitializer.java

+3-1
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ public class DataTransferInitializer {
1515
private final Monitor monitor;
1616
private final TransferProcessService transferProcessService;
1717

18-
public void initiate(DataReferenceRetrievalDto dto) {
18+
public String initiate(DataReferenceRetrievalDto dto) {
1919
monitor.info(
2020
String.format(
2121
"[%s] ContractConfirmationHandler: transfer init - start.", dto.getTraceId()));
@@ -46,6 +46,8 @@ public void initiate(DataReferenceRetrievalDto dto) {
4646
if (result.failed()) {
4747
throwDataRefRequestException(dto);
4848
}
49+
50+
return result.getContent();
4951
}
5052

5153
private void throwDataRefRequestException(DataReferenceRetrievalDto dto) {
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
package org.eclipse.tractusx.edc.cp.adapter.process.datareference;
2+
3+
import jakarta.ws.rs.core.Response;
4+
import lombok.AllArgsConstructor;
5+
import org.eclipse.edc.connector.spi.transferprocess.TransferProcessService;
6+
import org.eclipse.edc.spi.monitor.Monitor;
7+
import org.eclipse.edc.util.string.StringUtils;
8+
import org.eclipse.tractusx.edc.cp.adapter.dto.DataReferenceRetrievalDto;
9+
import org.eclipse.tractusx.edc.cp.adapter.messaging.Channel;
10+
import org.eclipse.tractusx.edc.cp.adapter.messaging.MessageBus;
11+
import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectStoreService;
12+
import org.eclipse.tractusx.edc.cp.adapter.service.objectstore.ObjectType;
13+
14+
import java.util.List;
15+
16+
@AllArgsConstructor
17+
public class DataReferenceErrorHandler {
18+
private static final String ERROR_MESSAGE = "Data reference process stage failed with status: ";
19+
private final Monitor monitor;
20+
private final MessageBus messageBus;
21+
private final ObjectStoreService objectStore;
22+
private final TransferProcessService transferProcessService;
23+
24+
private final List<String> errorStates = List.of("CANCELLED", "ERROR");
25+
26+
public void validateActiveProcesses() {
27+
monitor.debug("Data reference error handling - START");
28+
objectStore.get(ObjectType.DTO, DataReferenceRetrievalDto.class).stream()
29+
.filter(dto -> !StringUtils.isNullOrEmpty(dto.getPayload().getTransferProcessId()))
30+
.forEach(this::validateProcess);
31+
}
32+
33+
private void validateProcess(DataReferenceRetrievalDto dto) {
34+
String state = transferProcessService.getState(dto.getPayload().getTransferProcessId());
35+
if (errorStates.contains(state)) {
36+
monitor.warning(String.format("[%s] ", dto.getTraceId()) + ERROR_MESSAGE + state);
37+
String contractAgreementId = dto.getPayload().getContractAgreementId();
38+
objectStore.remove(contractAgreementId, ObjectType.DTO);
39+
40+
dto.getPayload().setErrorStatus(Response.Status.BAD_GATEWAY);
41+
dto.getPayload().setErrorMessage(ERROR_MESSAGE + state);
42+
messageBus.send(Channel.RESULT, dto);
43+
}
44+
}
45+
}
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,10 @@
11
package org.eclipse.tractusx.edc.cp.adapter.service.objectstore;
22

3+
import java.util.List;
4+
35
public interface ObjectStoreService {
46
void put(String key, ObjectType objectType, Object object);
57
<T> T get(String key, ObjectType objectType, Class<T> type);
68
void remove(String key, ObjectType objectType);
9+
<T> List<T> get(ObjectType objectType, Class<T> type);
710
}

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectStoreServiceInMemory.java

+24-10
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,10 @@
55
import lombok.AllArgsConstructor;
66

77
import java.util.HashMap;
8+
import java.util.List;
89
import java.util.Map;
10+
import java.util.Objects;
11+
import java.util.stream.Collectors;
912

1013
@AllArgsConstructor
1114
public class ObjectStoreServiceInMemory implements ObjectStoreService {
@@ -25,17 +28,18 @@ public void put(String key, ObjectType objectType, Object object) {
2528
@Override
2629
public <T> T get(String key, ObjectType objectType, Class<T> type) {
2730
String json = map.get(getKey(key, objectType));
28-
if (json == null) {
29-
return null;
30-
}
31+
return Objects.isNull(json)
32+
? null
33+
: map(type, json);
34+
}
3135

32-
T object = null;
33-
try {
34-
object = mapper.readValue(json, type);
35-
} catch (JsonProcessingException e) {
36-
e.printStackTrace();
37-
}
38-
return object;
36+
@Override
37+
public <T> List<T> get(ObjectType objectType, Class<T> type) {
38+
return map.entrySet().stream()
39+
.filter(entry -> entry.getKey().startsWith(objectType.name()))
40+
.map(Map.Entry::getValue)
41+
.map(s -> map(type, s))
42+
.collect(Collectors.toList());
3943
}
4044

4145
@Override
@@ -46,4 +50,14 @@ public void remove(String key, ObjectType objectType) {
4650
private String getKey(String key, ObjectType objectType) {
4751
return objectType.name() + key;
4852
}
53+
54+
private <T> T map(Class<T> type, String json) {
55+
T object = null;
56+
try {
57+
object = mapper.readValue(json, type);
58+
} catch (JsonProcessingException e) {
59+
e.printStackTrace();
60+
}
61+
return object;
62+
}
4963
}

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/service/objectstore/ObjectStoreServiceSql.java

+14
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@
33
import com.fasterxml.jackson.core.JsonProcessingException;
44
import com.fasterxml.jackson.databind.ObjectMapper;
55
import lombok.AllArgsConstructor;
6+
import org.eclipse.edc.util.collection.CollectionUtil;
67
import org.eclipse.tractusx.edc.cp.adapter.store.SqlObjectStore;
78
import org.eclipse.tractusx.edc.cp.adapter.store.model.ObjectEntity;
89

10+
import java.util.List;
911
import java.util.Objects;
12+
import java.util.stream.Collectors;
1013

1114
@AllArgsConstructor
1215
public class ObjectStoreServiceSql implements ObjectStoreService {
@@ -32,6 +35,17 @@ public <T> T get(String key, ObjectType objectType, Class<T> type) {
3235
return jsonToObject(entity, type);
3336
}
3437

38+
@Override
39+
public <T> List<T> get(ObjectType objectType, Class<T> type) {
40+
List<ObjectEntity> entities = objectStore.find(objectType.name());
41+
if (CollectionUtil.isEmpty(entities)) {
42+
return List.of();
43+
}
44+
return entities.stream()
45+
.map(entity -> jsonToObject(entity, type))
46+
.collect(Collectors.toList());
47+
}
48+
3549
@Override
3650
public void remove(String key, ObjectType objectType) {
3751
objectStore.deleteMessage(key, objectType.name());

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/SqlObjectStore.java

+15
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@
1111
import java.sql.ResultSet;
1212
import java.sql.SQLException;
1313
import java.time.Instant;
14+
import java.util.List;
15+
import java.util.stream.Collectors;
1416

1517
import static org.eclipse.edc.sql.SqlQueryExecutor.executeQuery;
1618
import static org.eclipse.edc.sql.SqlQueryExecutor.executeQuerySingle;
@@ -52,6 +54,19 @@ public ObjectEntity find(String id, String type) {
5254
});
5355
}
5456

57+
public List<ObjectEntity> find(String type) {
58+
return transactionContext.execute(() -> {
59+
try (var connection = getConnection()) {
60+
var sql = statements.getFindByTypeTemplate();
61+
return executeQuery(connection, false, this::mapObjectEntity, sql, type)
62+
.collect(Collectors.toList());
63+
} catch (SQLException e) {
64+
e.printStackTrace();
65+
throw new EdcPersistenceException(e);
66+
}
67+
});
68+
}
69+
5570
public void deleteMessage(String id, String type) {
5671
transactionContext.execute(() -> {
5772
try (var connection = getConnection()) {

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/BaseSqlDialectObjectStoreStatements.java

+5
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,11 @@ public String getFindByIdAndTypeTemplate() {
1616
return format("SELECT * FROM %s WHERE %s = ? AND %s = ?", getObjectStoreTable(), getIdColumn(), getTypeColumn());
1717
}
1818

19+
@Override
20+
public String getFindByTypeTemplate() {
21+
return format("SELECT * FROM %s WHERE %s = ?", getObjectStoreTable(), getTypeColumn());
22+
}
23+
1924
@Override
2025
public String getDeleteTemplate() {
2126
return format("DELETE FROM %s WHERE %s = ? AND %s = ?;", getObjectStoreTable(), getIdColumn(), getTypeColumn());

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/store/schema/ObjectStoreStatements.java

+2
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,7 @@ default String getObjectColumn() {
2525

2626
String getFindByIdAndTypeTemplate();
2727

28+
String getFindByTypeTemplate();
29+
2830
String getDeleteTemplate();
2931
}

edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/HttpControllerTest.java

+3-3
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ public void getAssetSynchronous_shouldReturnBadRequestIfNoAssetIdParam() {
4242
HttpController httpController = new HttpController(monitor, resultService, messageBus, config);
4343

4444
// when
45-
Response response = httpController.getAssetSynchronous(null, "providerUrl", null, null);
45+
Response response = httpController.getAssetSynchronous(null, "providerUrl", null, null, null);
4646

4747
// then
4848
assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -59,7 +59,7 @@ public void getAssetSynchronous_shouldReturnBadRequestIfNoProviderUrlParam() {
5959
HttpController httpController = new HttpController(monitor, resultService, messageBus, config);
6060

6161
// when
62-
Response response = httpController.getAssetSynchronous("assetId", null, null, null);
62+
Response response = httpController.getAssetSynchronous("assetId", null, null, null, null);
6363

6464
// then
6565
assertEquals(Response.Status.BAD_REQUEST.getStatusCode(), response.getStatus());
@@ -79,7 +79,7 @@ public void getAssetSynchronous_shouldReturnOkResponse() throws InterruptedExcep
7979
ProcessData.builder().endpointDataReference(getEndpointDataReference()).build());
8080

8181
// when
82-
Response response = httpController.getAssetSynchronous("assetId", "providerUrl", null, null);
82+
Response response = httpController.getAssetSynchronous("assetId", "providerUrl", null, null, null);
8383

8484
// then
8585
assertEquals(Response.Status.OK.getStatusCode(), response.getStatus());

0 commit comments

Comments
 (0)