Skip to content

Commit 8c64e8e

Browse files
author
marcingajek-zf
committed
cp-adapter : persistance, horizontal scaling
1 parent 2cad49a commit 8c64e8e

File tree

4 files changed

+37
-20
lines changed

4 files changed

+37
-20
lines changed

edc-extensions/control-plane-adapter/README.md

+25-12
Original file line numberDiff line numberDiff line change
@@ -4,23 +4,36 @@ The goal of this extension is to simplify the process of retrieving data out of
44

55
Additional requirements, that affects the architecture of the extension:
66
- can return data both in SYNC and ASYNC mode (currently only SYNC endpoint available)
7-
- can be persistent, so that process can be restored from the point where it was before application was stopped (not implemented yet)
8-
- prepared to scale horizontally (not yet implemented)
7+
- can be persistent, so that process can be restored from the point where it was before application was stopped
8+
- scaling horizontally (when persistence is added to configuration)
99
- can retry failed part of the process (no need to start the process from the beginning)
1010

11-
<b>Configuration:</b>
11+
<h2><u>Configuration:</u></h2>
1212

13-
| Key | Description | Mandatory | Default |
14-
|:-------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------|---------|
15-
| edc.cp.adapter.default.message.retry.number | Number of retries of a message, in case of an error, within the internal process of retrieving DataReference | no | 3 |
16-
| edc.cp.adapter.default.sync.request.timeout | Timeout for synchronous request (in seconds), after witch 'timeout' error will be returned to the requesting client | no | 20 |
17-
| edc.cp.adapter.messagebus.inmemory.thread.number | Number of threads running within the in-memory implementation of MessageBus _ _ | no | 10 |
18-
| edc.cp.adapter.reuse.contract.agreement | Turn on/off reusing of existing contract agreements for the specific asset. Once the contract is agreed, the second request for the same asset will reuse the agreement. Value 1 = on, 0 = off. | no | 1 |
19-
| edc.cp.adapter.cache.catalog.expire.after | Number of seconds, after witch prevoiusly requested catalog will not be reused, and will be removed from catalog cache | no | 300 |
13+
| Key | Description | Mandatory | Default |
14+
|:-------------------------------------------------|:---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|------|---------|
15+
| edc.cp.adapter.default.message.retry.number | Number of retries of a message, in case of an error, within the internal process of retrieving DataReference | no | 3 |
16+
| edc.cp.adapter.default.sync.request.timeout | Timeout for synchronous request (in seconds), after witch 'timeout' error will be returned to the requesting client | no | 20 |
17+
| edc.cp.adapter.messagebus.inmemory.thread.number | Number of threads running within the in-memory implementation of MessageBus _ _ | no | 10 |
18+
| edc.cp.adapter.reuse.contract.agreement | Turn on/off reusing of existing contract agreements for the specific asset. Once the contract is agreed, the second request for the same asset will reuse the agreement (if exists) pulled from the EDC. Value 1 = on, 0 = off. | no | 1 |
19+
| edc.cp.adapter.cache.catalog.expire.after | Number of seconds, after witch prevoiusly requested catalog will not be reused, and will be removed from catalog cache | no | 300 |
2020
| edc.cp.adapter.catalog.request.limit | Maximum number of items taken from Catalog within single request. Requests are repeated until all offers of the query are retrieved | no | 100 |
2121

22+
By default, the extension works in "IN MEMORY" mode. This setup has some limitations:
23+
+ It can work only within single EDC instance. If CP-adapter requests are handled by more than one EDC, data flow may be broken.
24+
+ If the EDC instance is restarted, all running processes are lost.
2225

23-
<b>How to use it:</b>
26+
To run CP-Adapter in "PERSISTENT" mode, You need to create a proper tables with [this](docs/schema.sql) script, and add the following configuration values to Your control-plane EDC properties file:
27+
28+
| Key | Description |
29+
|-----------------------------------|-------------|
30+
| edc.datasource.cpadapter.name | data source name |
31+
| edc.datasource.cpadapter.url | data source url |
32+
| edc.datasource.cpadapter.user | data source user |
33+
| edc.datasource.cpadapter.password | data source password |
34+
35+
36+
<h2><u>How to use it:</u></h2>
2437
1. Client sends a GET request with two parameters: assetId and the url of the provider control-plane:
2538

2639
```
@@ -33,7 +46,7 @@ Additional requirements, that affects the architecture of the extension:
3346
http://localhost:9193/api/v1/data/adapter/asset/sync/123?providerUrl=http://localhost:8182/api/v1/ids/data
3447
```
3548

36-
Oprional request parameters:
49+
Optional request parameters, that overwrite the settings for a single request:
3750

3851
| Name | Description |
3952
|--- |--- |

edc-extensions/control-plane-adapter/pom.xml

+1-1
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@
122122
<dependency>
123123
<groupId>org.eclipse.edc</groupId>
124124
<artifactId>sql-lease</artifactId>
125-
<version>0.0.1-20221201-SNAPSHOT</version> <!-- TODO why needs version -->
125+
<version>${org.eclipse.edc.version}</version>
126126
</dependency>
127127
<dependency>
128128
<groupId>org.eclipse.edc</groupId>

edc-extensions/control-plane-adapter/src/main/java/org/eclipse/tractusx/edc/cp/adapter/ApiAdapterConfig.java

+2-4
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,8 @@ public class ApiAdapterConfig {
2323
"edc.cp.adapter.default.sync.request.timeout";
2424
private static final String CATALOG_EXPIRE_AFTER_TIME =
2525
"edc.cp.adapter.cache.catalog.expire.after";
26-
private static final String REUSE_CONTRACT_AGREEMENT =
27-
"edc.cp.adapter.reuse.contract.agreement";
28-
private static final String CATALOG_REQUEST_LIMIT =
29-
"edc.cp.adapter.catalog.request.limit";
26+
private static final String REUSE_CONTRACT_AGREEMENT = "edc.cp.adapter.reuse.contract.agreement";
27+
private static final String CATALOG_REQUEST_LIMIT = "edc.cp.adapter.catalog.request.limit";
3028

3129
private static final String DATASOURCE_NAME = "edc.datasource.cpadapter.name";
3230
private static final String DATASOURCE_URL = "edc.datasource.cpadapter.url";

edc-extensions/control-plane-adapter/src/test/java/org/eclipse/tractusx/edc/cp/adapter/messaging/SqlMessageBusTest.java

+9-3
Original file line numberDiff line numberDiff line change
@@ -58,11 +58,11 @@ public void send_shouldCallListenerOnce() throws InterruptedException {
5858

5959
// when
6060
messageBus.send(Channel.INITIAL, message);
61-
Thread.sleep(50);
61+
Thread.sleep(60);
6262
messageBus.deliverMessages(10);
6363

6464
// then
65-
Thread.sleep(50);
65+
Thread.sleep(60);
6666
verify(listener, times(1)).process(any(DataReferenceRetrievalDto.class));
6767
}
6868

@@ -78,6 +78,7 @@ public void send_shouldCallListenerWithRetryOnException() throws InterruptedExce
7878
// when
7979
messageBus.send(Channel.INITIAL, message);
8080
messageBus.deliverMessages(10);
81+
Thread.sleep(60);
8182

8283
// then
8384
verify(listener, times(2)).process(any(DataReferenceRetrievalDto.class));
@@ -95,7 +96,7 @@ public void send_shouldSendToDlqIfErrorLimitReached() throws InterruptedExceptio
9596

9697
// when
9798
messageBus.send(Channel.INITIAL, message);
98-
Thread.sleep(50);
99+
Thread.sleep(60);
99100

100101
// then
101102
verify(listenerService).getListener(eq(Channel.DLQ));
@@ -226,6 +227,11 @@ public void execute(TransactionBlock transactionBlock) {}
226227
public <T> T execute(ResultTransactionBlock<T> resultTransactionBlock) {
227228
return null;
228229
}
230+
231+
@Override
232+
public void registerSynchronization(TransactionSynchronization transactionSynchronization) {
233+
234+
}
229235
};
230236
}
231237
}

0 commit comments

Comments
 (0)