Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor: replace policy with policyId on ContractAgreement #1220

Merged
merged 2 commits into from
Apr 28, 2022
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ in the detailed section referring to by linking pull requests or issues.
* Extract single `PolicyArchive` implementation (#1158)
* Replace `accessPolicy` and `contractPolicy` with `accessPolicyId` and `contractPolicyId` on `ContractDefinition` (#1144)
* All DMgmt Api methods now produce and consume `APPLICATION_JSON` (#1175)
* Replace `policy` with `policyId` on `ContractAgreement` (#1220)

#### Removed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ private void registerServices(ServiceExtensionContext context) {
var observable = new ContractNegotiationObservableImpl();
context.registerService(ContractNegotiationObservable.class, observable);

context.registerService(PolicyArchive.class, new PolicyArchiveImpl(store));
context.registerService(PolicyArchive.class, new PolicyArchiveImpl(store, policyStore));

consumerNegotiationManager = ConsumerContractNegotiationManagerImpl.Builder.newInstance()
.waitStrategy(waitStrategy)
Expand All @@ -159,6 +159,7 @@ private void registerServices(ServiceExtensionContext context) {
.telemetry(telemetry)
.executorInstrumentation(context.getService(ExecutorInstrumentation.class))
.store(store)
.policyStore(policyStore)
.batchSize(context.getSetting(NEGOTIATION_CONSUMER_STATE_MACHINE_BATCH_SIZE, 5))
.build();

Expand All @@ -173,6 +174,7 @@ private void registerServices(ServiceExtensionContext context) {
.telemetry(telemetry)
.executorInstrumentation(context.getService(ExecutorInstrumentation.class))
.store(store)
.policyStore(policyStore)
.batchSize(context.getSetting(NEGOTIATION_PROVIDER_STATE_MACHINE_BATCH_SIZE, 5))
.build();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.eclipse.dataspaceconnector.spi.contract.validation.ContractValidationService;
import org.eclipse.dataspaceconnector.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyStore;
import org.eclipse.dataspaceconnector.spi.retry.WaitStrategy;
import org.eclipse.dataspaceconnector.spi.system.ExecutorInstrumentation;
import org.eclipse.dataspaceconnector.spi.telemetry.Telemetry;
Expand All @@ -43,6 +44,7 @@ public abstract class AbstractContractNegotiationManager {
protected ExecutorInstrumentation executorInstrumentation;
protected int batchSize = 5;
protected WaitStrategy waitStrategy = () -> 5000L; // default wait five seconds
protected PolicyStore policyStore;

public static class Builder<T extends AbstractContractNegotiationManager> {

Expand Down Expand Up @@ -109,6 +111,11 @@ public Builder<T> store(ContractNegotiationStore store) {
return this;
}

public Builder<T> policyStore(PolicyStore policyStore) {
manager.policyStore = policyStore;
return this;
}

public T build() {
Objects.requireNonNull(manager.validationService, "contractValidationService");
Objects.requireNonNull(manager.monitor, "monitor");
Expand All @@ -119,11 +126,11 @@ public T build() {
Objects.requireNonNull(manager.telemetry, "telemetry");
Objects.requireNonNull(manager.executorInstrumentation, "executorInstrumentation");
Objects.requireNonNull(manager.negotiationStore, "store");
Objects.requireNonNull(manager.policyStore, "policyStore");
manager.commandProcessor = new CommandProcessor<>(manager.commandQueue, manager.commandRunner, manager.monitor);

return manager;
}

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.dataspaceconnector.common.statemachine.StateMachine;
import org.eclipse.dataspaceconnector.common.statemachine.StateProcessorImpl;
import org.eclipse.dataspaceconnector.contract.common.ContractId;
import org.eclipse.dataspaceconnector.policy.model.Policy;
import org.eclipse.dataspaceconnector.spi.contract.negotiation.ConsumerContractNegotiationManager;
import org.eclipse.dataspaceconnector.spi.contract.negotiation.observe.ContractNegotiationListener;
import org.eclipse.dataspaceconnector.spi.iam.ClaimToken;
Expand Down Expand Up @@ -175,13 +176,13 @@ public StatusResult<ContractNegotiation> offerReceived(ClaimToken token, String
* @param token Claim token of the consumer that send the contract request.
* @param negotiationId Id of the ContractNegotiation.
* @param agreement Agreement sent by provider.
* @param hash A hash of all previous contract offers.
* @param policy the policy
* @return a {@link StatusResult}: FATAL_ERROR, if no match found for Id or no last
* offer found for negotiation; OK otherwise
*/
@WithSpan
@Override
public StatusResult<ContractNegotiation> confirmed(ClaimToken token, String negotiationId, ContractAgreement agreement, String hash) {
public StatusResult<ContractNegotiation> confirmed(ClaimToken token, String negotiationId, ContractAgreement agreement, Policy policy) {
var negotiation = negotiationStore.find(negotiationId);
if (negotiation == null) {
return StatusResult.failure(FATAL_ERROR, format("ContractNegotiation with id %s not found", negotiationId));
Expand Down Expand Up @@ -214,6 +215,7 @@ public StatusResult<ContractNegotiation> confirmed(ClaimToken token, String nego
// TODO: otherwise will fail. But should do it, since it's already confirmed? A duplicated message received shouldn't be an issue
negotiation.transitionConfirmed();
}
policyStore.save(policy);
update(negotiation, l -> l.preConfirmed(negotiation));
monitor.debug(String.format("[Consumer] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
Expand Down Expand Up @@ -390,14 +392,15 @@ private boolean processConsumerApproving(ContractNegotiation negotiation) {
}
var definitionId = contractIdTokens[DEFINITION_PART];

var policy = lastOffer.getPolicy();
var agreement = ContractAgreement.Builder.newInstance()
.id(ContractId.createContractId(definitionId))
.contractStartDate(Instant.now().getEpochSecond())
.contractEndDate(Instant.now().plus(365, ChronoUnit.DAYS).getEpochSecond()) // TODO Make configurable (issue #722)
.contractSigningDate(Instant.now().getEpochSecond())
.providerAgentId(String.valueOf(lastOffer.getProvider()))
.consumerAgentId(String.valueOf(lastOffer.getConsumer()))
.policy(lastOffer.getPolicy())
.policyId(policy.getUid())
.assetId(lastOffer.getAsset().getId())
.build();

Expand All @@ -407,6 +410,7 @@ private boolean processConsumerApproving(ContractNegotiation negotiation) {
.connectorAddress(negotiation.getCounterPartyAddress())
.contractAgreement(agreement)
.correlationId(negotiation.getId())
.policy(policy)
.build();

// TODO protocol-independent response type?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.eclipse.dataspaceconnector.common.statemachine.StateMachine;
import org.eclipse.dataspaceconnector.common.statemachine.StateProcessorImpl;
import org.eclipse.dataspaceconnector.contract.common.ContractId;
import org.eclipse.dataspaceconnector.policy.model.Policy;
import org.eclipse.dataspaceconnector.spi.contract.negotiation.ProviderContractNegotiationManager;
import org.eclipse.dataspaceconnector.spi.contract.negotiation.observe.ContractNegotiationListener;
import org.eclipse.dataspaceconnector.spi.iam.ClaimToken;
Expand Down Expand Up @@ -370,6 +371,7 @@ private boolean processConfirming(ContractNegotiation negotiation) {
var retrievedAgreement = negotiation.getContractAgreement();

ContractAgreement agreement;
Policy policy;
if (retrievedAgreement == null) {
var lastOffer = negotiation.getLastContractOffer();

Expand All @@ -380,6 +382,7 @@ private boolean processConfirming(ContractNegotiation negotiation) {
}
var definitionId = contractIdTokens[DEFINITION_PART];

policy = lastOffer.getPolicy();
//TODO move to own service
agreement = ContractAgreement.Builder.newInstance()
.id(ContractId.createContractId(definitionId))
Expand All @@ -388,11 +391,12 @@ private boolean processConfirming(ContractNegotiation negotiation) {
.contractSigningDate(Instant.now().getEpochSecond())
.providerAgentId(String.valueOf(lastOffer.getProvider()))
.consumerAgentId(String.valueOf(lastOffer.getConsumer()))
.policy(lastOffer.getPolicy())
.policyId(policy.getUid())
.assetId(lastOffer.getAsset().getId())
.build();
} else {
agreement = retrievedAgreement;
policy = policyStore.findById(agreement.getPolicyId());
}

var request = ContractAgreementRequest.Builder.newInstance()
Expand All @@ -401,6 +405,7 @@ private boolean processConfirming(ContractNegotiation negotiation) {
.connectorAddress(negotiation.getCounterPartyAddress())
.contractAgreement(agreement)
.correlationId(negotiation.getCorrelationId())
.policy(policy)
.build();

//TODO protocol-independent response type?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,22 +17,26 @@
import org.eclipse.dataspaceconnector.policy.model.Policy;
import org.eclipse.dataspaceconnector.spi.contract.negotiation.store.ContractNegotiationStore;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyArchive;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyStore;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.agreement.ContractAgreement;

import java.util.Optional;

public class PolicyArchiveImpl implements PolicyArchive {
private final ContractNegotiationStore contractNegotiationStore;
private final PolicyStore policyStore;

public PolicyArchiveImpl(ContractNegotiationStore contractNegotiationStore) {
public PolicyArchiveImpl(ContractNegotiationStore contractNegotiationStore, PolicyStore policyStore) {
this.contractNegotiationStore = contractNegotiationStore;
this.policyStore = policyStore;
}

@Override
public Policy findPolicyForContract(String contractId) {
return Optional.ofNullable(contractId)
.map(contractNegotiationStore::findContractAgreement)
.map(ContractAgreement::getPolicy)
.map(ContractAgreement::getPolicyId)
.map(policyStore::findById)
.orElse(null);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,9 @@
import org.eclipse.dataspaceconnector.spi.message.MessageContext;
import org.eclipse.dataspaceconnector.spi.message.RemoteMessageDispatcher;
import org.eclipse.dataspaceconnector.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.dataspaceconnector.spi.monitor.ConsoleMonitor;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.policy.store.PolicyStore;
import org.eclipse.dataspaceconnector.spi.response.StatusResult;
import org.eclipse.dataspaceconnector.spi.types.domain.asset.Asset;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.agreement.ContractAgreementRequest;
Expand Down Expand Up @@ -69,6 +71,8 @@ public abstract class AbstractContractNegotiationIntegrationTest {

protected ContractValidationService validationService;

protected final PolicyStore policyStore = mock(PolicyStore.class);

protected String consumerNegotiationId;

protected ClaimToken token;
Expand All @@ -84,7 +88,7 @@ void setUp() {
validationService = mock(ContractValidationService.class);

// Create a monitor that logs to the console
Monitor monitor = new FakeConsoleMonitor();
Monitor monitor = new ConsoleMonitor();

// Create CommandQueue mock
CommandQueue<ContractNegotiationCommand> queue = (CommandQueue<ContractNegotiationCommand>) mock(CommandQueue.class);
Expand All @@ -105,6 +109,7 @@ void setUp() {
.commandRunner(runner)
.observable(providerObservable)
.store(providerStore)
.policyStore(policyStore)
.build();

consumerManager = ConsumerContractNegotiationManagerImpl.Builder.newInstance()
Expand All @@ -116,6 +121,7 @@ void setUp() {
.commandRunner(runner)
.observable(consumerObservable)
.store(consumerStore)
.policyStore(policyStore)
.build();

countDownLatch = new CountDownLatch(2);
Expand All @@ -125,7 +131,7 @@ void setUp() {
* Implementation of the ContractNegotiationListener that signals a CountDownLatch when the
* confirmed state has been reached.
*/
protected class ConfirmedContractNegotiationListener implements ContractNegotiationListener {
protected static class ConfirmedContractNegotiationListener implements ContractNegotiationListener {

private final CountDownLatch countDownLatch;

Expand All @@ -143,7 +149,7 @@ public void preConfirmed(ContractNegotiation negotiation) {
* Implementation of the ContractNegotiationListener that signals a CountDownLatch when the
* declined state has been reached.
*/
protected class DeclinedContractNegotiationListener implements ContractNegotiationListener {
protected static class DeclinedContractNegotiationListener implements ContractNegotiationListener {

private final CountDownLatch countDownLatch;

Expand Down Expand Up @@ -180,7 +186,7 @@ public CompletableFuture<Object> send(RemoteMessage message) {
result = consumerManager.offerReceived(token, request.getCorrelationId(), request.getContractOffer(), "hash");
} else if (message instanceof ContractAgreementRequest) {
var request = (ContractAgreementRequest) message;
result = consumerManager.confirmed(token, request.getCorrelationId(), request.getContractAgreement(), "hash");
result = consumerManager.confirmed(token, request.getCorrelationId(), request.getContractAgreement(), request.getPolicy());
} else if (message instanceof ContractRejection) {
var request = (ContractRejection) message;
result = consumerManager.declined(token, request.getCorrelationId());
Expand Down Expand Up @@ -245,51 +251,6 @@ public CompletableFuture<Object> send(RemoteMessage message) {
}
}

/**
* Monitor implementation that prints to the console.
*/
protected class FakeConsoleMonitor implements Monitor {
@Override
public void debug(String message, Throwable... errors) {
System.out.println("\u001B[34mDEBUG\u001B[0m - " + message);
if (errors != null && errors.length > 0) {
for (Throwable error : errors) {
error.printStackTrace();
}
}
}

@Override
public void info(String message, Throwable... errors) {
System.out.println("\u001B[32mINFO\u001B[0m - " + message);
if (errors != null && errors.length > 0) {
for (Throwable error : errors) {
error.printStackTrace();
}
}
}

@Override
public void warning(String message, Throwable... errors) {
System.out.println("\u001B[33mWARNING\u001B[0m - " + message);
if (errors != null && errors.length > 0) {
for (Throwable error : errors) {
error.printStackTrace();
}
}
}

@Override
public void severe(String message, Throwable... errors) {
System.out.println("\u001B[31mSEVERE\u001B[0m - " + message);
if (errors != null && errors.length > 0) {
for (Throwable error : errors) {
error.printStackTrace();
}
}
}
}

/**
* Creates the initial contract offer.
*
Expand Down
Loading