Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Core: shared clock #1416

Merged
merged 17 commits into from
Jun 9, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@ in the detailed section referring to by linking pull requests or issues.
* Verify OpenAPI definitions (#1312)
* Documentation for CosmosDB (#1334)
* Add validation to contract definition id (#1347)
* Shared clock service (#1416)

#### Changed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.eclipse.dataspaceconnector.spi.types.TypeManager;

import java.security.PrivateKey;
import java.time.Clock;
import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.ExecutorService;
Expand All @@ -65,6 +66,7 @@
HealthCheckService.class,
Monitor.class,
TypeManager.class,
Clock.class,
Telemetry.class
})
public class CoreServicesExtension implements ServiceExtension {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.eclipse.dataspaceconnector.spi.telemetry.Telemetry;
import org.eclipse.dataspaceconnector.spi.types.TypeManager;

import java.time.Clock;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -47,28 +48,14 @@ public DefaultServiceExtensionContext(TypeManager typeManager, Monitor monitor,
registerService(TypeManager.class, typeManager);
registerService(Monitor.class, monitor);
registerService(Telemetry.class, telemetry);
registerService(Clock.class, Clock.systemUTC());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Unless there is a good reason to the contrary we should provide the clock with a @Provider(isDefault=true)-annotated factory method in the DefaultServicesExtension.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The reasons for this are:

  • to provide a utility method ServiceExtensionContext#getClock so that extensions don't need to inject a clock service. If we move the default clock to DefaultServicesExtension, we can't have such a utility method, since we need the dependency resolver to start extensions in the right order.
  • that the clock is a common service just like TypeManager, Monitor and Telemetry, and there's no obvious reason to manage it differently

}

@Override
public String getConnectorId() {
return connectorId;
}

@Override
public Monitor getMonitor() {
return getService(Monitor.class);
}

@Override
public Telemetry getTelemetry() {
return getService(Telemetry.class);
}

@Override
public TypeManager getTypeManager() {
return getService(TypeManager.class);
}

@Override
public <T> boolean hasService(Class<T> type) {
return services.containsKey(type);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,9 +52,12 @@
import org.eclipse.dataspaceconnector.spi.system.Provides;
import org.eclipse.dataspaceconnector.spi.system.ServiceExtension;
import org.eclipse.dataspaceconnector.spi.system.ServiceExtensionContext;
import org.eclipse.dataspaceconnector.spi.telemetry.Telemetry;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.negotiation.ContractNegotiation;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.negotiation.command.ContractNegotiationCommand;

import java.time.Clock;

@Provides({
ContractOfferService.class, ContractValidationService.class, ConsumerContractNegotiationManager.class,
PolicyArchive.class, ProviderContractNegotiationManager.class
Expand All @@ -67,7 +70,6 @@ public class ContractServiceExtension implements ServiceExtension {
private static final String NEGOTIATION_CONSUMER_STATE_MACHINE_BATCH_SIZE = "edc.negotiation.consumer.state-machine.batch-size";
@EdcSetting
private static final String NEGOTIATION_PROVIDER_STATE_MACHINE_BATCH_SIZE = "edc.negotiation.provider.state-machine.batch-size";
private Monitor monitor;
private ConsumerContractNegotiationManagerImpl consumerNegotiationManager;
private ProviderContractNegotiationManagerImpl providerNegotiationManager;
@Inject
Expand All @@ -94,15 +96,22 @@ public class ContractServiceExtension implements ServiceExtension {
@Inject
private PolicyDefinitionStore policyStore;

@Inject
private Monitor monitor;

@Inject
private Telemetry telemetry;

@Inject
private Clock clock;

@Override
public String name() {
return "Core Contract Service";
}

@Override
public void initialize(ServiceExtensionContext context) {
monitor = context.getMonitor();

registerTypes(context);
registerServices(context);
}
Expand Down Expand Up @@ -131,15 +140,14 @@ private void registerServices(ServiceExtensionContext context) {
var contractOfferService = new ContractOfferServiceImpl(agentService, definitionService, assetIndex, policyStore);
context.registerService(ContractOfferService.class, contractOfferService);

var validationService = new ContractValidationServiceImpl(agentService, definitionService, assetIndex, policyStore);
var validationService = new ContractValidationServiceImpl(agentService, definitionService, assetIndex, policyStore, clock);
context.registerService(ContractValidationService.class, validationService);

var waitStrategy = context.hasService(NegotiationWaitStrategy.class) ? context.getService(NegotiationWaitStrategy.class) : new ExponentialWaitStrategy(DEFAULT_ITERATION_WAIT);

CommandQueue<ContractNegotiationCommand> commandQueue = new BoundedCommandQueue<>(10);
CommandRunner<ContractNegotiationCommand> commandRunner = new CommandRunner<>(commandHandlerRegistry, monitor);

var telemetry = context.getTelemetry();
var observable = new ContractNegotiationObservableImpl();
context.registerService(ContractNegotiationObservable.class, observable);

Expand All @@ -153,6 +161,7 @@ private void registerServices(ServiceExtensionContext context) {
.commandQueue(commandQueue)
.commandRunner(commandRunner)
.observable(observable)
.clock(clock)
.telemetry(telemetry)
.executorInstrumentation(context.getService(ExecutorInstrumentation.class))
.store(store)
Expand All @@ -168,6 +177,7 @@ private void registerServices(ServiceExtensionContext context) {
.commandQueue(commandQueue)
.commandRunner(commandRunner)
.observable(observable)
.clock(clock)
.telemetry(telemetry)
.executorInstrumentation(context.getService(ExecutorInstrumentation.class))
.store(store)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.eclipse.dataspaceconnector.spi.telemetry.Telemetry;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.negotiation.command.ContractNegotiationCommand;

import java.time.Clock;
import java.util.Objects;

public abstract class AbstractContractNegotiationManager {
Expand All @@ -40,6 +41,7 @@ public abstract class AbstractContractNegotiationManager {
protected CommandRunner<ContractNegotiationCommand> commandRunner;
protected CommandProcessor<ContractNegotiationCommand> commandProcessor;
protected Monitor monitor;
protected Clock clock;
protected Telemetry telemetry;
protected ExecutorInstrumentation executorInstrumentation;
protected int batchSize = 5;
Expand All @@ -52,6 +54,7 @@ public static class Builder<T extends AbstractContractNegotiationManager> {

protected Builder(T manager) {
this.manager = manager;
this.manager.clock = Clock.systemUTC(); // default implementation
this.manager.telemetry = new Telemetry(); // default noop implementation
this.manager.executorInstrumentation = ExecutorInstrumentation.noop(); // default noop implementation
}
Expand Down Expand Up @@ -91,6 +94,11 @@ public Builder<T> commandRunner(CommandRunner<ContractNegotiationCommand> comman
return this;
}

public Builder<T> clock(Clock clock) {
manager.clock = clock;
return this;
}

public Builder<T> telemetry(Telemetry telemetry) {
manager.telemetry = telemetry;
return this;
Expand Down Expand Up @@ -123,6 +131,7 @@ public T build() {
Objects.requireNonNull(manager.commandQueue, "commandQueue");
Objects.requireNonNull(manager.commandRunner, "commandRunner");
Objects.requireNonNull(manager.observable, "observable");
Objects.requireNonNull(manager.clock, "clock");
Objects.requireNonNull(manager.telemetry, "telemetry");
Objects.requireNonNull(manager.executorInstrumentation, "executorInstrumentation");
Objects.requireNonNull(manager.negotiationStore, "store");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractOffer;
import org.jetbrains.annotations.NotNull;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Objects;
import java.util.UUID;
Expand Down Expand Up @@ -393,9 +392,9 @@ private boolean processConsumerApproving(ContractNegotiation negotiation) {
var policy = lastOffer.getPolicy();
var agreement = ContractAgreement.Builder.newInstance()
.id(ContractId.createContractId(definitionId))
.contractStartDate(Instant.now().getEpochSecond())
.contractEndDate(Instant.now().plus(365, ChronoUnit.DAYS).getEpochSecond()) // TODO Make configurable (issue #722)
.contractSigningDate(Instant.now().getEpochSecond())
.contractStartDate(clock.instant().getEpochSecond())
.contractEndDate(clock.instant().plus(365, ChronoUnit.DAYS).getEpochSecond()) // TODO Make configurable (issue #722)
.contractSigningDate(clock.instant().getEpochSecond())
.providerAgentId(String.valueOf(lastOffer.getProvider()))
.consumerAgentId(String.valueOf(lastOffer.getConsumer()))
.policy(policy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractOffer;
import org.jetbrains.annotations.NotNull;

import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.UUID;
import java.util.function.BiConsumer;
Expand Down Expand Up @@ -383,9 +382,9 @@ private boolean processConfirming(ContractNegotiation negotiation) {
//TODO move to own service
agreement = ContractAgreement.Builder.newInstance()
.id(ContractId.createContractId(definitionId))
.contractStartDate(Instant.now().getEpochSecond())
.contractEndDate(Instant.now().plus(365, ChronoUnit.DAYS).getEpochSecond()) // TODO Make configurable (issue #722)
.contractSigningDate(Instant.now().getEpochSecond())
.contractStartDate(clock.instant().getEpochSecond())
.contractEndDate(clock.instant().plus(365, ChronoUnit.DAYS).getEpochSecond()) // TODO Make configurable (issue #722)
.contractSigningDate(clock.instant().getEpochSecond())
.providerAgentId(String.valueOf(lastOffer.getProvider()))
.consumerAgentId(String.valueOf(lastOffer.getConsumer()))
.policy(policy)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import org.eclipse.dataspaceconnector.spi.types.domain.contract.offer.ContractOffer;
import org.jetbrains.annotations.NotNull;

import java.time.Instant;
import java.time.Clock;
import java.util.ArrayList;

import static java.lang.String.format;
Expand All @@ -47,12 +47,14 @@ public class ContractValidationServiceImpl implements ContractValidationService
private final ContractDefinitionService contractDefinitionService;
private final AssetIndex assetIndex;
private final PolicyDefinitionStore policyStore;
private final Clock clock;

public ContractValidationServiceImpl(ParticipantAgentService agentService, ContractDefinitionService contractDefinitionService, AssetIndex assetIndex, PolicyDefinitionStore policyStore) {
public ContractValidationServiceImpl(ParticipantAgentService agentService, ContractDefinitionService contractDefinitionService, AssetIndex assetIndex, PolicyDefinitionStore policyStore, Clock clock) {
this.agentService = agentService;
this.contractDefinitionService = contractDefinitionService;
this.assetIndex = assetIndex;
this.policyStore = policyStore;
this.clock = clock;
}

@Override
Expand Down Expand Up @@ -143,11 +145,11 @@ private ArrayList<Criterion> createCriteria(ContractOffer offer, ContractDefinit
}

private boolean isExpired(ContractAgreement contractAgreement) {
return contractAgreement.getContractEndDate() < Instant.now().getEpochSecond();
return contractAgreement.getContractEndDate() * 1000L < clock.millis();
}

private boolean isStarted(ContractAgreement contractAgreement) {
return contractAgreement.getContractStartDate() <= Instant.now().getEpochSecond();
return contractAgreement.getContractStartDate() * 1000L <= clock.millis();
}

private boolean isMandatoryAttributeMissing(ContractOffer offer) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

package org.eclipse.dataspaceconnector.contract.validation;

import com.github.javafaker.Faker;
import org.eclipse.dataspaceconnector.policy.model.Policy;
import org.eclipse.dataspaceconnector.policy.model.PolicyDefinition;
import org.eclipse.dataspaceconnector.spi.agent.ParticipantAgent;
Expand All @@ -34,13 +35,17 @@
import org.junit.jupiter.api.Test;

import java.net.URI;
import java.time.Clock;
import java.time.Instant;
import java.time.temporal.ChronoUnit;
import java.util.Date;
import java.util.UUID;
import java.util.stream.Stream;

import static java.time.Instant.EPOCH;
import static java.time.Instant.MAX;
import static java.time.Instant.MIN;
import static java.time.ZoneOffset.UTC;
import static java.util.Collections.emptyMap;
import static org.assertj.core.api.Assertions.assertThat;
import static org.mockito.ArgumentMatchers.any;
Expand All @@ -52,15 +57,19 @@

class ContractValidationServiceImplTest {

private static final Faker FAKER = new Faker();
private final Instant now = Instant.now();

private final ParticipantAgentService agentService = mock(ParticipantAgentService.class);
private final ContractDefinitionService definitionService = mock(ContractDefinitionService.class);
private final AssetIndex assetIndex = mock(AssetIndex.class);
private final PolicyDefinitionStore policyStore = mock(PolicyDefinitionStore.class);
private final Clock clock = Clock.fixed(now, UTC);
private ContractValidationServiceImpl validationService;

@BeforeEach
void setUp() {
validationService = new ContractValidationServiceImpl(agentService, definitionService, assetIndex, policyStore);
validationService = new ContractValidationServiceImpl(agentService, definitionService, assetIndex, policyStore, clock);
}

@Test
Expand Down Expand Up @@ -120,9 +129,9 @@ void verifyContractAgreementValidation() {
.consumerAgentId("consumer")
.policy(Policy.Builder.newInstance().build())
.assetId(UUID.randomUUID().toString())
.contractStartDate(Instant.now().getEpochSecond())
.contractEndDate(Instant.now().plus(1, ChronoUnit.DAYS).getEpochSecond())
.contractSigningDate(Instant.now().getEpochSecond())
.contractStartDate(now.getEpochSecond())
.contractEndDate(now.plus(1, ChronoUnit.DAYS).getEpochSecond())
.contractSigningDate(now.getEpochSecond())
.id("1:2").build();

assertThat(validationService.validate(claimToken, agreement)).isTrue();
Expand All @@ -132,8 +141,9 @@ void verifyContractAgreementValidation() {

@Test
void verifyContractAgreementExpired() {
var past = FAKER.date().between(Date.from(EPOCH), Date.from(now)).toInstant().getEpochSecond();
var isValid =
validateAgreementDate(MIN.getEpochSecond(), MIN.getEpochSecond(), Instant.now().getEpochSecond() - 1);
validateAgreementDate(MIN.getEpochSecond(), MIN.getEpochSecond(), past);

assertThat(isValid).isFalse();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@
import org.eclipse.dataspaceconnector.transfer.core.transfer.TransferProcessManagerImpl;
import org.eclipse.dataspaceconnector.transfer.core.transfer.TransferProcessSendRetryManager;

import java.time.Clock;

/**
* Provides core data transfer services to the system.
*/
Expand Down Expand Up @@ -141,7 +143,8 @@ public void initialize(ServiceExtensionContext context) {

var retryLimit = context.getSetting(TRANSFER_SEND_RETRY_LIMIT, 7);
var retryBaseDelay = context.getSetting(TRANSFER_SEND_RETRY_BASE_DELAY_MS, 100L);
var sendRetryManager = new TransferProcessSendRetryManager(monitor, () -> new ExponentialWaitStrategy(retryBaseDelay), retryLimit);
Clock clock = context.getClock();
var sendRetryManager = new TransferProcessSendRetryManager(monitor, () -> new ExponentialWaitStrategy(retryBaseDelay), clock, retryLimit);

processManager = TransferProcessManagerImpl.Builder.newInstance()
.waitStrategy(waitStrategy)
Expand All @@ -154,6 +157,7 @@ public void initialize(ServiceExtensionContext context) {
.telemetry(telemetry)
.executorInstrumentation(context.getService(ExecutorInstrumentation.class))
.vault(vault)
.clock(clock)
.typeManager(typeManager)
.commandQueue(commandQueue)
.commandRunner(new CommandRunner<>(registry, monitor))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ public class TransferProcessManagerImpl implements TransferProcessManager, Provi
private DataAddressResolver addressResolver;
private PolicyArchive policyArchive;
private SendRetryManager<TransferProcess> sendRetryManager;
protected Clock clock = Clock.systemUTC();
private Clock clock;

private TransferProcessManagerImpl() {
}
Expand Down Expand Up @@ -712,6 +712,11 @@ public Builder executorInstrumentation(ExecutorInstrumentation executorInstrumen
return this;
}

public Builder clock(Clock clock) {
manager.clock = clock;
return this;
}

public Builder vault(Vault vault) {
manager.vault = vault;
return this;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,11 +32,12 @@ public class TransferProcessSendRetryManager implements SendRetryManager<Transfe
private final Monitor monitor;
private final Supplier<WaitStrategy> delayStrategySupplier;
private final int retryLimit;
protected Clock clock = Clock.systemUTC();
private final Clock clock;

public TransferProcessSendRetryManager(Monitor monitor, Supplier<WaitStrategy> delayStrategySupplier, int retryLimit) {
public TransferProcessSendRetryManager(Monitor monitor, Supplier<WaitStrategy> delayStrategySupplier, Clock clock, int retryLimit) {
this.monitor = monitor;
this.delayStrategySupplier = delayStrategySupplier;
this.clock = clock;
this.retryLimit = retryLimit;
}

Expand Down
Loading