Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: FCC node directory is queried on every plan execution #1243

Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public class FederatedCatalogCacheExtension implements ServiceExtension {
private HealthCheckService healthCheckService;
@Inject
private RemoteMessageDispatcherRegistry dispatcherRegistry;
// protocol registry - must be supplied by another extension
// get all known nodes from node directory - must be supplied by another extension
@Inject
private FederatedCacheNodeDirectory directory;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,15 +63,15 @@ public PartitionManagerImpl(Monitor monitor, WorkItemQueue workQueue, Function<W

@Override
public void schedule(ExecutionPlan executionPlan) {
//todo: should we really discard updates?
var currentList = workloadSource.get();
executionPlan.run(() -> {
// obtain latest node directory contents before scheduling the work
var currentList = workloadSource.get();
monitor.debug("Partition manager: execute plan - waiting for queue lock");
workQueue.lock();
monitor.debug("Partition manager: execute plan - adding workload " + currentList.size());
workQueue.addAll(currentList);
monitor.debug("Partition manager: execute release queue lock");
workQueue.unlock();
monitor.debug("Partition manager: execute release queue lock");
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.eclipse.dataspaceconnector.catalog.spi.Crawler;
import org.eclipse.dataspaceconnector.catalog.spi.WorkItem;
import org.eclipse.dataspaceconnector.catalog.spi.WorkItemQueue;
import org.eclipse.dataspaceconnector.catalog.spi.model.ExecutionPlan;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.DisplayName;
Expand All @@ -26,20 +27,22 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.function.Supplier;

import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.dataspaceconnector.catalog.cache.TestUtil.createWorkItem;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class PartitionManagerImplTest {

private PartitionManagerImpl partitionManager;

private final Monitor monitorMock = mock(Monitor.class);
private final WorkItemQueue workItemQueueMock = mock(WorkItemQueue.class);
private PartitionManagerImpl partitionManager;
private List<WorkItem> staticWorkload;

@BeforeEach
Expand Down Expand Up @@ -76,4 +79,25 @@ void stop_allCrawlersJoinSuccessfully() throws InterruptedException {
partitionManager.stop();
}

@Test
void schedule_verifyNodeDirectoryGetsQueried() {
Supplier<List<WorkItem>> queueSourceMock = mock(Supplier.class);
when(queueSourceMock.get()).thenReturn(List.of(new WorkItem("http://some.url", "test-protocol")));
partitionManager = new PartitionManagerImpl(monitorMock, workItemQueueMock, workItems -> mock(Crawler.class), 5, queueSourceMock);

ExecutionPlan runMultiPlan = mock(ExecutionPlan.class);
doAnswer(invocation -> {
var runnable = (Runnable) invocation.getArgument(0);
runnable.run(); //run several times
runnable.run();
runnable.run();
return null;
}).when(runMultiPlan).run(any());

// schedule once, make sure multiple invocations happen
partitionManager.schedule(runMultiPlan);

verify(queueSourceMock, times(3)).get();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@

import java.net.MalformedURLException;
import java.net.URL;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;

import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.dataspaceconnector.common.testfixtures.TestUtils.getFreePort;
import static org.eclipse.dataspaceconnector.common.testfixtures.TestUtils.testOkHttpClient;
import static org.eclipse.dataspaceconnector.spi.types.domain.transfer.TransferProcessStates.PROVISIONING;
import static org.eclipse.dataspaceconnector.transfer.provision.http.HttpProvisionerFixtures.PROVISIONER_CONFIG;
Expand All @@ -60,6 +62,7 @@ public class HttpProvisionerExtensionEndToEndTest {
private static final String ASSET_ID = "1";
private static final String CONTRACT_ID = "2";
private static final String POLICY_ID = "3";
private final int dataPort = getFreePort();

private Interceptor delegate;

Expand Down Expand Up @@ -96,6 +99,10 @@ void processProviderRequestRetry(TransferProcessManager processManager,

@BeforeEach
void setup(EdcExtension extension) {
extension.setConfiguration(Map.of(
"web.http.data.port", String.valueOf(dataPort),
"web.http.data.path", "/api/v1/data"
));
delegate = mock(Interceptor.class);
var httpClient = testOkHttpClient().newBuilder().addInterceptor(delegate).build();

Expand Down