Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

core: dispatch ContractNegotiation events #1609

Merged
merged 3 commits into from
Jul 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ in the detailed section referring to by linking pull requests or issues.

* Event Framework for Asset entity (#1453)
* Event Framework for ContractDefinition entity (#1436)
* Event Framework for ContractNegotiation entity (#1434)
* Event Framework for PolicyDefinition entity (#1437)
* Event Framework for TransferProcess entity (#1439)
* SQL Translation layer (#1357, #1459)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import org.eclipse.dataspaceconnector.common.statemachine.retry.EntitySendRetryManager;
import org.eclipse.dataspaceconnector.common.statemachine.retry.SendRetryManager;
import org.eclipse.dataspaceconnector.contract.listener.ContractNegotiationEventListener;
import org.eclipse.dataspaceconnector.contract.negotiation.ConsumerContractNegotiationManagerImpl;
import org.eclipse.dataspaceconnector.contract.negotiation.ProviderContractNegotiationManagerImpl;
import org.eclipse.dataspaceconnector.contract.observe.ContractNegotiationObservableImpl;
Expand All @@ -43,6 +44,7 @@
import org.eclipse.dataspaceconnector.spi.contract.offer.store.ContractDefinitionStore;
import org.eclipse.dataspaceconnector.spi.contract.validation.ContractValidationService;
import org.eclipse.dataspaceconnector.spi.entity.StatefulEntity;
import org.eclipse.dataspaceconnector.spi.event.EventRouter;
import org.eclipse.dataspaceconnector.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.policy.PolicyEngine;
Expand Down Expand Up @@ -118,6 +120,9 @@ public class ContractServiceExtension implements ServiceExtension {
@Inject
private Clock clock;

@Inject
private EventRouter eventRouter;

@Override
public String name() {
return "Core Contract Service";
Expand Down Expand Up @@ -162,6 +167,8 @@ private void registerServices(ServiceExtensionContext context) {
CommandRunner<ContractNegotiationCommand> commandRunner = new CommandRunner<>(commandHandlerRegistry, monitor);

var observable = new ContractNegotiationObservableImpl();
observable.registerListener(new ContractNegotiationEventListener(eventRouter, clock));

context.registerService(ContractNegotiationObservable.class, observable);
context.registerService(PolicyArchive.class, new PolicyArchiveImpl(store));

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
/*
* Copyright (c) 2022 Bayerische Motoren Werke Aktiengesellschaft (BMW AG)
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Bayerische Motoren Werke Aktiengesellschaft (BMW AG) - initial API and implementation
*
*/

package org.eclipse.dataspaceconnector.contract.listener;

import org.eclipse.dataspaceconnector.spi.contract.negotiation.observe.ContractNegotiationListener;
import org.eclipse.dataspaceconnector.spi.event.EventRouter;
import org.eclipse.dataspaceconnector.spi.event.contractnegotiation.ContractNegotiationApproved;
import org.eclipse.dataspaceconnector.spi.event.contractnegotiation.ContractNegotiationConfirmed;
import org.eclipse.dataspaceconnector.spi.event.contractnegotiation.ContractNegotiationDeclined;
import org.eclipse.dataspaceconnector.spi.event.contractnegotiation.ContractNegotiationFailed;
import org.eclipse.dataspaceconnector.spi.event.contractnegotiation.ContractNegotiationInitiated;
import org.eclipse.dataspaceconnector.spi.event.contractnegotiation.ContractNegotiationOffered;
import org.eclipse.dataspaceconnector.spi.event.contractnegotiation.ContractNegotiationRequested;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.negotiation.ContractNegotiation;

import java.time.Clock;

public class ContractNegotiationEventListener implements ContractNegotiationListener {
private final EventRouter eventRouter;
private final Clock clock;

public ContractNegotiationEventListener(EventRouter eventRouter, Clock clock) {
this.eventRouter = eventRouter;
this.clock = clock;
}

@Override
public void initiated(ContractNegotiation negotiation) {
var event = ContractNegotiationInitiated.Builder.newInstance()
.contractNegotiationId(negotiation.getId())
.at(clock.millis())
.build();

eventRouter.publish(event);
}

@Override
public void requested(ContractNegotiation negotiation) {
var event = ContractNegotiationRequested.Builder.newInstance()
.contractNegotiationId(negotiation.getId())
.at(clock.millis())
.build();

eventRouter.publish(event);
}

@Override
public void offered(ContractNegotiation negotiation) {
var event = ContractNegotiationOffered.Builder.newInstance()
.contractNegotiationId(negotiation.getId())
.at(clock.millis())
.build();

eventRouter.publish(event);
}

@Override
public void approved(ContractNegotiation negotiation) {
var event = ContractNegotiationApproved.Builder.newInstance()
.contractNegotiationId(negotiation.getId())
.at(clock.millis())
.build();

eventRouter.publish(event);
}

@Override
public void declined(ContractNegotiation negotiation) {
var event = ContractNegotiationDeclined.Builder.newInstance()
.contractNegotiationId(negotiation.getId())
.at(clock.millis())
.build();

eventRouter.publish(event);
}

@Override
public void confirmed(ContractNegotiation negotiation) {
var event = ContractNegotiationConfirmed.Builder.newInstance()
.contractNegotiationId(negotiation.getId())
.at(clock.millis())
.build();

eventRouter.publish(event);
}

@Override
public void failed(ContractNegotiation negotiation) {
var event = ContractNegotiationFailed.Builder.newInstance()
.contractNegotiationId(negotiation.getId())
.at(clock.millis())
.build();

eventRouter.publish(event);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -210,6 +210,7 @@ public BiConsumer<Object, Throwable> build() {
} else if (sendRetryManager.retriesExhausted(negotiation)) {
negotiation.transitionError("Retry limited exceeded: " + throwable.getMessage());
update(negotiation, l -> l.preError(negotiation));
observable.invokeForEach(l -> l.failed(negotiation));
monitor.warning(format("[%s] attempt #%d failed to %s. Retry limit exceeded, ContractNegotiation %s moves to ERROR state",
getName(), negotiation.getStateCount(), operationDescription, negotiation.getId()), throwable);
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ public StatusResult<ContractNegotiation> initiate(ContractOfferRequest contractO
negotiation.addContractOffer(contractOffer.getContractOffer());
negotiation.transitionInitial();
update(negotiation, l -> l.preRequesting(negotiation));
observable.invokeForEach(l -> l.initiated(negotiation));

monitor.debug(String.format("[Consumer] ContractNegotiation initiated. %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
Expand Down Expand Up @@ -143,7 +144,7 @@ public StatusResult<ContractNegotiation> offerReceived(ClaimToken token, String
Result<ContractOffer> result = validationService.validate(token, contractOffer, latestOffer);
negotiation.addContractOffer(contractOffer); // TODO persist unchecked offer of provider?
if (result.failed()) {
monitor.debug("[Consumer] Contract offer received. Will be rejected.");
monitor.debug("[Consumer] Contract offer received. Will be rejected: " + result.getFailureDetail());
negotiation.setErrorDetail(result.getFailureMessages().get(0));
negotiation.transitionDeclining();
update(negotiation, l -> l.preDeclining(negotiation));
Expand Down Expand Up @@ -208,6 +209,7 @@ public StatusResult<ContractNegotiation> confirmed(ClaimToken token, String nego
negotiation.transitionConfirmed();
}
update(negotiation, l -> l.preConfirmed(negotiation));
observable.invokeForEach(l -> l.confirmed(negotiation));
monitor.debug(String.format("[Consumer] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));

Expand All @@ -234,6 +236,7 @@ public StatusResult<ContractNegotiation> declined(ClaimToken token, String negot
monitor.debug("[Consumer] Contract rejection received. Abort negotiation process.");
negotiation.transitionDeclined();
update(negotiation, l -> l.preDeclined(negotiation));
observable.invokeForEach(l -> l.declined(negotiation));
monitor.debug(String.format("[Consumer] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
return StatusResult.success(negotiation);
Expand Down Expand Up @@ -421,6 +424,7 @@ private BiConsumer<Object, Throwable> onInitialOfferSent(String id) {
.onSuccess(negotiation -> {
negotiation.transitionRequested();
update(negotiation, l -> l.preRequested(negotiation));
observable.invokeForEach(l -> l.requested(negotiation));
})
.onFailure(negotiation -> {
negotiation.transitionRequesting();
Expand All @@ -435,6 +439,7 @@ private BiConsumer<Object, Throwable> onCounterOfferSent(String negotiationId) {
.onSuccess(negotiation -> {
negotiation.transitionOffered();
update(negotiation, l -> l.preConsumerOffered(negotiation));
observable.invokeForEach(l -> l.offered(negotiation));
})
.onFailure(negotiation -> {
negotiation.transitionOffering();
Expand All @@ -449,6 +454,7 @@ private BiConsumer<Object, Throwable> onAgreementSent(String negotiationId) {
.onSuccess(negotiation -> {
negotiation.transitionApproved();
update(negotiation, l -> l.preConsumerApproved(negotiation));
observable.invokeForEach(l -> l.approved(negotiation));
})
.onFailure(negotiation -> {
negotiation.transitionApproving();
Expand All @@ -463,6 +469,7 @@ private BiConsumer<Object, Throwable> onRejectionSent(String negotiationId) {
.onSuccess(negotiation -> {
negotiation.transitionDeclined();
update(negotiation, l -> l.preDeclined(negotiation));
observable.invokeForEach(l -> l.declined(negotiation));
})
.onFailure(negotiation -> {
negotiation.transitionDeclining();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public StatusResult<ContractNegotiation> declined(ClaimToken token, String corre
}
negotiation.transitionDeclined();
update(negotiation, l -> l.preDeclined(negotiation));
observable.invokeForEach(l -> l.declined(negotiation));
monitor.debug(String.format("[Provider] ContractNegotiation %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));

Expand Down Expand Up @@ -138,8 +139,8 @@ public StatusResult<ContractNegotiation> requested(ClaimToken token, ContractOff
.build();

negotiation.transitionRequested();

update(negotiation, l -> l.preRequested(negotiation));
observable.invokeForEach(l -> l.requested(negotiation));

monitor.debug(String.format("[Provider] ContractNegotiation initiated. %s is now in state %s.",
negotiation.getId(), ContractNegotiationStates.from(negotiation.getState())));
Expand Down Expand Up @@ -228,7 +229,7 @@ private StatusResult<ContractNegotiation> processIncomingOffer(ContractNegotiati
negotiation.addContractOffer(offer); // TODO persist unchecked offer of consumer?

if (result.failed()) {
monitor.debug("[Provider] Contract offer received. Will be rejected.");
monitor.debug("[Provider] Contract offer received. Will be rejected: " + result.getFailureDetail());
negotiation.setErrorDetail(result.getFailureMessages().get(0));
negotiation.transitionDeclining();
update(negotiation, l -> l.preDeclining(negotiation));
Expand Down Expand Up @@ -385,6 +386,7 @@ private BiConsumer<Object, Throwable> onCounterOfferSent(String negotiationId) {
.onSuccess(negotiation -> {
negotiation.transitionOffered();
update(negotiation, l -> l.preProviderOffered(negotiation));
observable.invokeForEach(l -> l.offered(negotiation));
})
.onFailure(negotiation -> {
negotiation.transitionOffering();
Expand All @@ -399,6 +401,7 @@ private BiConsumer<Object, Throwable> onRejectionSent(String negotiationId) {
.onSuccess(negotiation -> {
negotiation.transitionDeclined();
update(negotiation, l -> l.preDeclined(negotiation));
observable.invokeForEach(l -> l.declined(negotiation));
})
.onFailure(negotiation -> {
negotiation.transitionDeclining();
Expand All @@ -414,6 +417,7 @@ private BiConsumer<Object, Throwable> onAgreementSent(String id, ContractAgreeme
negotiation.setContractAgreement(agreement);
negotiation.transitionConfirmed();
update(negotiation, l -> l.preConfirmed(negotiation));
observable.invokeForEach(l -> l.confirmed(negotiation));
})
.onFailure(negotiation -> {
negotiation.transitionConfirming();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ public Result<ContractOffer> validate(ClaimToken token, ContractOffer offer) {
var agent = agentService.createFor(token);
var contractDefinition = contractDefinitionService.definitionFor(agent, contractIdTokens[DEFINITION_PART]);
if (contractDefinition == null) {
return Result.failure("Invalid contract.");
return Result.failure("The ContractDefinition with id %s either does not exist or the access to it is not granted.");
}

// take asset from definition and index
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import org.eclipse.dataspaceconnector.policy.model.PolicyType;
import org.eclipse.dataspaceconnector.spi.command.CommandQueue;
import org.eclipse.dataspaceconnector.spi.command.CommandRunner;
import org.eclipse.dataspaceconnector.spi.contract.negotiation.observe.ContractNegotiationListener;
import org.eclipse.dataspaceconnector.spi.contract.negotiation.observe.ContractNegotiationObservable;
import org.eclipse.dataspaceconnector.spi.contract.validation.ContractValidationService;
import org.eclipse.dataspaceconnector.spi.entity.StatefulEntity;
Expand All @@ -51,7 +50,6 @@
import java.util.ArrayList;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;

import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.mock;
Expand All @@ -75,8 +73,6 @@ public abstract class AbstractContractNegotiationIntegrationTest {

protected ClaimToken token;

protected CountDownLatch countDownLatch;

/**
* Prepares the test setup
*/
Expand Down Expand Up @@ -124,7 +120,6 @@ void setUp() {
.sendRetryManager(sendRetryManager)
.build();

countDownLatch = new CountDownLatch(2);
}

/**
Expand Down Expand Up @@ -203,42 +198,6 @@ protected ContractOffer getConsumerCounterOffer() {
.build();
}

/**
* Implementation of the ContractNegotiationListener that signals a CountDownLatch when the confirmed state has been
* reached.
*/
protected static class ConfirmedContractNegotiationListener implements ContractNegotiationListener {

private final CountDownLatch countDownLatch;

public ConfirmedContractNegotiationListener(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public void preConfirmed(ContractNegotiation negotiation) {
countDownLatch.countDown();
}
}

/**
* Implementation of the ContractNegotiationListener that signals a CountDownLatch when the declined state has been
* reached.
*/
protected static class DeclinedContractNegotiationListener implements ContractNegotiationListener {

private final CountDownLatch countDownLatch;

public DeclinedContractNegotiationListener(CountDownLatch countDownLatch) {
this.countDownLatch = countDownLatch;
}

@Override
public void preDeclined(ContractNegotiation negotiation) {
countDownLatch.countDown();
}
}

/**
* Implementation of the RemoteMessageDispatcherRegistry for the provider that delegates the requests to the
* consumer negotiation manager directly.
Expand Down
Loading