Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix: avoid endless loops on contract negotiation sending failure #1489

Merged
merged 4 commits into from
Jun 21, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ in the detailed section referring to by linking pull requests or issues.
* Fixed a dead link in contributor documentation (#1477)
* Fix usage of `NAME` property in `HttpDataSourceFactory` (#1460)
* Fix clearing Loaders in the FCC (#1495)
* Avoid endless loops in `ContractNegotiationManager` (#1487)

## [milestone-4] - 2022-06-07

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,37 +12,35 @@
*
*/

package org.eclipse.dataspaceconnector.transfer.core.transfer;
package org.eclipse.dataspaceconnector.common.statemachine.retry;

import org.eclipse.dataspaceconnector.spi.entity.StatefulEntity;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.retry.WaitStrategy;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.TransferProcess;

import java.time.Clock;
import java.util.function.Supplier;

import static java.lang.String.format;

/**
* Service enabling a "long retry" mechanism when sending {@link TransferProcess}es across applications.
* Service enabling a "long retry" mechanism when sending entities across applications.
* The implementation supports a pluggable retry strategy (e.g. an exponential wait mechanism
* so as not to overflow the remote service when it becomes available again).
*/
public class TransferProcessSendRetryManager implements SendRetryManager<TransferProcess> {
public class EntitySendRetryManager implements SendRetryManager<StatefulEntity> {
private final Monitor monitor;
private final Supplier<WaitStrategy> delayStrategySupplier;
private final int retryLimit;
private final Clock clock;

public TransferProcessSendRetryManager(Monitor monitor, Supplier<WaitStrategy> delayStrategySupplier, Clock clock, int retryLimit) {
public EntitySendRetryManager(Monitor monitor, Supplier<WaitStrategy> delayStrategySupplier, Clock clock, int retryLimit) {
this.monitor = monitor;
this.delayStrategySupplier = delayStrategySupplier;
this.clock = clock;
this.retryLimit = retryLimit;
}

@Override
public boolean shouldDelay(TransferProcess process) {
public boolean shouldDelay(StatefulEntity process) {
int retryCount = process.getStateCount() - 1;
if (retryCount <= 0) {
return false;
Expand All @@ -60,15 +58,17 @@ public boolean shouldDelay(TransferProcess process) {

long remainingWaitMillis = process.getStateTimestamp() + waitMillis - clock.millis();
if (remainingWaitMillis > 0) {
monitor.debug(format("Process %s transfer retry #%d will not be attempted before %d ms.", process.getId(), retryCount, remainingWaitMillis));
monitor.debug(String.format("Process %s %s retry #%d will not be attempted before %d ms.", process.getId(), process.getClass().getSimpleName(), retryCount, remainingWaitMillis));
return true;
} else {
monitor.debug(String.format("Process %s %s retry #%d of %d.", process.getId(), process.getClass().getSimpleName(), retryCount, retryLimit));
return false;
}
monitor.debug(format("Process %s transfer retry #%d of %d.", process.getId(), retryCount, retryLimit));
return false;

}

@Override
public boolean retriesExhausted(TransferProcess entity) {
public boolean retriesExhausted(StatefulEntity entity) {
return entity.getStateCount() > retryLimit;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
*
*/

package org.eclipse.dataspaceconnector.transfer.core.transfer;
package org.eclipse.dataspaceconnector.common.statemachine.retry;

/**
* Service enabling a "long retry" mechanism when sending entities across applications.
Expand All @@ -21,7 +21,7 @@
*
* @param <T> entity type
*/
interface SendRetryManager<T> {
public interface SendRetryManager<T> {
/**
* Determines whether the given entity may be sent at this time, or the system
* should wait and send the entity later.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Copyright (c) 2022 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Microsoft Corporation - initial API and implementation
*
*/

package org.eclipse.dataspaceconnector.common.statemachine.retry;

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.databind.annotation.JsonPOJOBuilder;
import com.github.javafaker.Faker;
import org.eclipse.dataspaceconnector.spi.entity.StatefulEntity;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.retry.WaitStrategy;
import org.junit.jupiter.api.extension.ExtensionContext;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.ArgumentsProvider;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.junit.jupiter.params.provider.ValueSource;

import java.time.Clock;
import java.util.function.Supplier;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.params.provider.Arguments.arguments;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

/**
 * Unit tests for {@link EntitySendRetryManager}, exercised against a minimal
 * {@link StatefulEntity} subclass and mocked collaborators.
 */
class EntitySendRetryManagerTest {

    private final Faker faker = new Faker();

    // Collaborators are mocked so each test fully controls time and the computed delay.
    private final Monitor monitor = mock(Monitor.class);
    private final WaitStrategy delayStrategy = mock(WaitStrategy.class);
    private final int sendRetryLimit = faker.number().numberBetween(5, 10);

    private final Clock clock = mock(Clock.class);
    private final Supplier<WaitStrategy> waitStrategy = () -> delayStrategy;
    private final EntitySendRetryManager sendRetryManager = new EntitySendRetryManager(monitor, waitStrategy, clock, sendRetryLimit);

    /**
     * Checks the delay decision for an entity whose state count is two below the retry limit.
     *
     * @param stateTimestamp timestamp stored on the entity
     * @param currentTime    value returned by the mocked clock
     * @param retryDelay     delay returned by the mocked wait strategy
     * @param shouldDelay    expected outcome of {@code shouldDelay}
     */
    @ParameterizedTest
    @ArgumentsSource(DelayArgs.class)
    void shouldDelay(long stateTimestamp, long currentTime, long retryDelay, boolean shouldDelay) {
        var attempts = sendRetryLimit - 2;

        when(clock.millis()).thenReturn(currentTime);
        when(delayStrategy.retryInMillis())
                .thenAnswer(invocation -> {
                    // By the time the delay is requested, the failure count must already have been reported.
                    verify(delayStrategy).failures(attempts - 1);
                    return retryDelay;
                })
                .thenThrow(new RuntimeException("should call only once"));

        var entity = TestEntity.Builder.newInstance()
                .id("any")
                .stateCount(attempts)
                .stateTimestamp(stateTimestamp)
                .build();

        var delayed = sendRetryManager.shouldDelay(entity);

        assertThat(delayed).isEqualTo(shouldDelay);
    }

    /**
     * Checks that retries count as exhausted only once the state count exceeds the retry limit,
     * i.e. when {@code retriesLeft} is negative.
     *
     * @param retriesLeft difference between the retry limit and the entity's state count
     */
    @ParameterizedTest
    @ValueSource(ints = {-2, -1, 0, 1, 2})
    void retriesExhausted(int retriesLeft) {
        var entity = TestEntity.Builder.newInstance()
                .id("any")
                .stateCount(sendRetryLimit - retriesLeft)
                .stateTimestamp(faker.number().randomNumber())
                .build();

        var exhausted = sendRetryManager.retriesExhausted(entity);

        assertThat(exhausted).isEqualTo(retriesLeft < 0);
    }

    /**
     * Supplies {@code (stateTimestamp, currentTime, retryDelay, expectedShouldDelay)} tuples
     * for {@link #shouldDelay(long, long, long, boolean)}.
     */
    private static class DelayArgs implements ArgumentsProvider {

        @Override
        public Stream<? extends Arguments> provideArguments(ExtensionContext context) {
            return Stream.of(
                    // no delay configured and no time elapsed
                    arguments(0, 0, 0, false),
                    // delay already elapsed
                    arguments(0, 10, 9, false),
                    // delay not yet elapsed
                    arguments(0, 9, 10, true),
                    // non-zero state timestamp, delay not yet elapsed
                    arguments(2, 10, 9, true),
                    // non-zero state timestamp, delay elapsed
                    arguments(2, 12, 9, false)
            );
        }
    }

    /**
     * Bare-bones {@link StatefulEntity} used only to drive the retry manager in these tests.
     */
    private static class TestEntity extends StatefulEntity {

        /**
         * Builder for {@link TestEntity}; adds nothing on top of the inherited builder.
         */
        @JsonPOJOBuilder(withPrefix = "")
        public static class Builder extends StatefulEntity.Builder<TestEntity, Builder> {

            private Builder(TestEntity entity) {
                super(entity);
            }

            @JsonCreator
            public static Builder newInstance() {
                return new Builder(new TestEntity());
            }

            @Override
            public void validate() {
                // intentionally empty: the test entity has nothing to validate
            }
        }
    }
}
15 changes: 8 additions & 7 deletions core/contract/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,11 @@

## Configuration

* `edc.negotiation.consumer.state-machine.batch-size`
* the size of the batch of entity fetched for every consumer `ContractNegotiation` state machine iteration.
* _Default value_: 5
* `edc.negotiation.consumer.state-machine.batch-size`
* the size of the batch of entity fetched for every provider `ContractNegotiation` state machine iteration.
* _Default value_: 5

| Parameter name | Description | Mandatory | Default value |
|-----------------------------------------------------|-----------------------------------------------------------------------------------------------------------|-----------|---------------|
| `edc.negotiation.consumer.state-machine.batch-size` | the size of the batch of entity fetched for every consumer `ContractNegotiation` state machine iteration. | false | 5 |
| `edc.negotiation.provider.state-machine.batch-size` | the size of the batch of entity fetched for every provider `ContractNegotiation` state machine iteration. | false | 5 |
| `edc.negotiation.consumer.send.retry.limit` | the limit of retries in case of consumer `ContractNegotiation` sending failure. | false | 7 |
| `edc.negotiation.provider.send.retry.limit` | the limit of retries in case of provider `ContractNegotiation` sending failure. | false | 7 |
| `edc.negotiation.consumer.send.retry.base-delay.ms` | the base ms delay value for consumer `ContractNegotiation` sending retrial. | false | 100 |
| `edc.negotiation.provider.send.retry.base-delay.ms` | the base ms delay value for provider `ContractNegotiation` sending retrial.                               | false     | 100           |
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@

package org.eclipse.dataspaceconnector.contract;

import org.eclipse.dataspaceconnector.common.statemachine.retry.EntitySendRetryManager;
import org.eclipse.dataspaceconnector.common.statemachine.retry.SendRetryManager;
import org.eclipse.dataspaceconnector.contract.negotiation.ConsumerContractNegotiationManagerImpl;
import org.eclipse.dataspaceconnector.contract.negotiation.ProviderContractNegotiationManagerImpl;
import org.eclipse.dataspaceconnector.contract.observe.ContractNegotiationObservableImpl;
Expand All @@ -40,6 +42,7 @@
import org.eclipse.dataspaceconnector.spi.contract.offer.ContractOfferService;
import org.eclipse.dataspaceconnector.spi.contract.offer.store.ContractDefinitionStore;
import org.eclipse.dataspaceconnector.spi.contract.validation.ContractValidationService;
import org.eclipse.dataspaceconnector.spi.entity.StatefulEntity;
import org.eclipse.dataspaceconnector.spi.message.RemoteMessageDispatcherRegistry;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.policy.PolicyEngine;
Expand All @@ -55,6 +58,7 @@
import org.eclipse.dataspaceconnector.spi.telemetry.Telemetry;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.negotiation.ContractNegotiation;
import org.eclipse.dataspaceconnector.spi.types.domain.contract.negotiation.command.ContractNegotiationCommand;
import org.jetbrains.annotations.NotNull;

import java.time.Clock;

Expand All @@ -70,6 +74,15 @@ public class ContractServiceExtension implements ServiceExtension {
private static final String NEGOTIATION_CONSUMER_STATE_MACHINE_BATCH_SIZE = "edc.negotiation.consumer.state-machine.batch-size";
@EdcSetting
private static final String NEGOTIATION_PROVIDER_STATE_MACHINE_BATCH_SIZE = "edc.negotiation.provider.state-machine.batch-size";
@EdcSetting
private static final String NEGOTIATION_CONSUMER_SEND_RETRY_LIMIT = "edc.negotiation.consumer.send.retry.limit";
@EdcSetting
private static final String NEGOTIATION_PROVIDER_SEND_RETRY_LIMIT = "edc.negotiation.provider.send.retry.limit";
@EdcSetting
private static final String NEGOTIATION_CONSUMER_SEND_RETRY_BASE_DELAY_MS = "edc.negotiation.consumer.send.retry.base-delay.ms";
@EdcSetting
private static final String NEGOTIATION_PROVIDER_SEND_RETRY_BASE_DELAY_MS = "edc.negotiation.provider.send.retry.base-delay.ms";

private ConsumerContractNegotiationManagerImpl consumerNegotiationManager;
private ProviderContractNegotiationManagerImpl providerNegotiationManager;
@Inject
Expand Down Expand Up @@ -150,7 +163,6 @@ private void registerServices(ServiceExtensionContext context) {

var observable = new ContractNegotiationObservableImpl();
context.registerService(ContractNegotiationObservable.class, observable);

context.registerService(PolicyArchive.class, new PolicyArchiveImpl(store));

consumerNegotiationManager = ConsumerContractNegotiationManagerImpl.Builder.newInstance()
Expand All @@ -167,6 +179,7 @@ private void registerServices(ServiceExtensionContext context) {
.store(store)
.policyStore(policyStore)
.batchSize(context.getSetting(NEGOTIATION_CONSUMER_STATE_MACHINE_BATCH_SIZE, 5))
.sendRetryManager(consumerSendRetryManager(context))
.build();

providerNegotiationManager = ProviderContractNegotiationManagerImpl.Builder.newInstance()
Expand All @@ -183,12 +196,26 @@ private void registerServices(ServiceExtensionContext context) {
.store(store)
.policyStore(policyStore)
.batchSize(context.getSetting(NEGOTIATION_PROVIDER_STATE_MACHINE_BATCH_SIZE, 5))
.sendRetryManager(providerSendRetryManager(context))
.build();

context.registerService(ConsumerContractNegotiationManager.class, consumerNegotiationManager);
context.registerService(ProviderContractNegotiationManager.class, providerNegotiationManager);
}

/**
 * Creates the send-retry manager used by the provider negotiation manager.
 * <p>
 * The retry limit is read from {@code edc.negotiation.provider.send.retry.limit} (default 7) and the
 * base delay from {@code edc.negotiation.provider.send.retry.base-delay.ms} (default 100). Each time the
 * manager asks for a wait strategy it receives a new {@code ExponentialWaitStrategy} built with that base delay.
 *
 * @param context the extension context the settings are read from
 * @return a newly created retry manager, never {@code null}
 */
@NotNull
private SendRetryManager<StatefulEntity> providerSendRetryManager(ServiceExtensionContext context) {
    var retryLimit = context.getSetting(NEGOTIATION_PROVIDER_SEND_RETRY_LIMIT, 7);
    var retryBaseDelay = context.getSetting(NEGOTIATION_PROVIDER_SEND_RETRY_BASE_DELAY_MS, 100L);
    return new EntitySendRetryManager(monitor, () -> new ExponentialWaitStrategy(retryBaseDelay), clock, retryLimit);
}

/**
 * Creates the send-retry manager used by the consumer negotiation manager.
 * <p>
 * The retry limit is read from {@code edc.negotiation.consumer.send.retry.limit} (default 7) and the
 * base delay from {@code edc.negotiation.consumer.send.retry.base-delay.ms} (default 100). Each time the
 * manager asks for a wait strategy it receives a new {@code ExponentialWaitStrategy} built with that base delay.
 *
 * @param context the extension context the settings are read from
 * @return a newly created retry manager
 */
@NotNull
private EntitySendRetryManager consumerSendRetryManager(ServiceExtensionContext context) {
    var maxRetries = context.getSetting(NEGOTIATION_CONSUMER_SEND_RETRY_LIMIT, 7);
    var baseDelayMillis = context.getSetting(NEGOTIATION_CONSUMER_SEND_RETRY_BASE_DELAY_MS, 100L);
    return new EntitySendRetryManager(
            monitor,
            () -> new ExponentialWaitStrategy(baseDelayMillis),
            clock,
            maxRetries);
}

private void registerTypes(ServiceExtensionContext context) {
var typeManager = context.getTypeManager();
typeManager.registerTypes(ContractNegotiation.class);
Expand Down
Loading