Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Observability: Trace context propagation within DPF #1162

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ in the detailed section referring to by linking pull requests or issues.
* Http Deprovisioner Webhook endpoint (#1039)
* Add performance test example and scheduled workflow (#1029)
* Add basic authentication mechanism for DataManagement API (#981)
* Trace context propagation in DPF (#1162)

#### Changed

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ plugins {
val httpMockServer: String by project
val jodahFailsafeVersion: String by project
val okHttpVersion: String by project
val openTelemetryVersion: String by project
val faker: String by project

dependencies {
Expand All @@ -30,6 +31,7 @@ dependencies {

implementation("net.jodah:failsafe:${jodahFailsafeVersion}")
implementation("com.squareup.okhttp3:okhttp:${okHttpVersion}")
implementation("io.opentelemetry:opentelemetry-extension-annotations:${openTelemetryVersion}")

testImplementation("org.mock-server:mockserver-netty:${httpMockServer}:shaded")
testImplementation("org.mock-server:mockserver-client-java:${httpMockServer}:shaded")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.dataspaceconnector.transfer.dataplane.client;

import io.opentelemetry.extension.annotations.WithSpan;
import org.eclipse.dataspaceconnector.dataplane.spi.manager.DataPlaneManager;
import org.eclipse.dataspaceconnector.spi.response.ResponseStatus;
import org.eclipse.dataspaceconnector.spi.response.StatusResult;
Expand All @@ -31,6 +32,7 @@ public EmbeddedDataPlaneTransferClient(DataPlaneManager dataPlaneManager) {
this.dataPlaneManager = dataPlaneManager;
}

@WithSpan
@Override
public StatusResult<Void> transfer(DataFlowRequest request) {
var result = dataPlaneManager.validate(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.opentelemetry.extension.annotations.WithSpan;
import net.jodah.failsafe.Failsafe;
import net.jodah.failsafe.RetryPolicy;
import okhttp3.MediaType;
Expand Down Expand Up @@ -57,6 +58,7 @@ public RemoteDataPlaneTransferClient(OkHttpClient client, DataPlaneSelectorClien
this.mapper = mapper;
}

@WithSpan
@Override
public StatusResult<Void> transfer(DataFlowRequest request) {
var instance = selectorClient.find(request.getSourceDataAddress(), request.getDestinationDataAddress(), selectorStrategy);
Expand Down
4 changes: 4 additions & 0 deletions extensions/data-plane/data-plane-framework/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,17 @@
*
*/

// Version of the OpenTelemetry artifacts, read from the project property of the same name.
val openTelemetryVersion: String by project

plugins {
`java-library`
}

dependencies {
api(project(":spi:core-spi"))
api(project(":extensions:data-plane:data-plane-spi"))
implementation(project(":common:util"))
// OpenTelemetry annotations artifact (io.opentelemetry.extension.annotations, e.g. @WithSpan).
implementation("io.opentelemetry:opentelemetry-extension-annotations:${openTelemetryVersion}")
testImplementation(testFixtures(project(":launchers:junit")))
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,8 @@ public void initialize(ServiceExtensionContext context) {
context.registerService(DataTransferExecutorServiceContainer.class, executorContainer);

monitor = context.getMonitor();
var telemetry = context.getTelemetry();

var queueCapacity = context.getSetting(QUEUE_CAPACITY, DEFAULT_QUEUE_CAPACITY);
var workers = context.getSetting(WORKERS, DEFAULT_WORKERS);
var waitTimeout = context.getSetting(WAIT_TIMEOUT, DEFAULT_WAIT_TIMEOUT);
Expand All @@ -108,7 +110,9 @@ public void initialize(ServiceExtensionContext context) {
.pipelineService(pipelineService)
.transferServiceRegistry(transferServiceRegistry)
.store(new InMemoryDataPlaneStore(IN_MEMORY_STORE_CAPACITY))
.monitor(monitor).build();
.monitor(monitor)
.telemetry(telemetry)
.build();

context.registerService(DataPlaneManager.class, dataPlaneManager);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.eclipse.dataspaceconnector.spi.response.StatusResult;
import org.eclipse.dataspaceconnector.spi.result.Result;
import org.eclipse.dataspaceconnector.spi.system.ExecutorInstrumentation;
import org.eclipse.dataspaceconnector.spi.telemetry.Telemetry;
import org.eclipse.dataspaceconnector.spi.types.domain.transfer.DataFlowRequest;

import java.util.concurrent.ArrayBlockingQueue;
Expand All @@ -49,6 +50,7 @@ public class DataPlaneManagerImpl implements DataPlaneManager {
private PipelineService pipelineService;
private ExecutorInstrumentation executorInstrumentation;
private Monitor monitor;
private Telemetry telemetry;

private BlockingQueue<DataFlowRequest> queue;
private ExecutorService executorService;
Expand Down Expand Up @@ -89,8 +91,12 @@ public Result<Boolean> validate(DataFlowRequest dataRequest) {
}

/**
 * Accepts a data flow request for later processing by the queue workers.
 * <p>
 * The request is not enqueued as-is: a copy carrying the trace context of the calling thread is
 * built first, and only that copy is added to the queue and recorded as received in the store.
 *
 * @param dataRequest the request to enqueue
 */
public void initiateTransfer(DataFlowRequest dataRequest) {
    // store current trace context in entity for request traceability
    DataFlowRequest dataRequestWithTraceContext = dataRequest.toBuilder()
            .traceContext(telemetry.getCurrentTraceContext())
            .build();
    // Enqueue only the trace-carrying copy; also adding the original request would
    // put the same transfer on the queue twice.
    queue.add(dataRequestWithTraceContext);
    store.received(dataRequestWithTraceContext.getProcessId());
}

@Override
Expand All @@ -116,21 +122,9 @@ private void run() {
if (request == null) {
continue;
}
final var polledRequest = request;
// propagate trace context for request into the current thread
telemetry.contextPropagationMiddleware(this::processDataFlowRequest).accept(request);

var transferService = transferServiceRegistry.resolveTransferService(polledRequest);
if (transferService == null) {
// Should not happen since resolving a transferService is part of payload validation
// TODO persist error details
store.completed(polledRequest.getProcessId());
} else {
transferService.transfer(request).whenComplete((result, exception) -> {
if (polledRequest.isTrackable()) {
// TODO persist TransferResult or error details
store.completed(polledRequest.getProcessId());
}
});
}
} catch (InterruptedException e) {
Thread.interrupted();
active.set(false);
Expand All @@ -147,6 +141,22 @@ private void run() {
}
}

/**
 * Handles one request taken from the queue: resolves the transfer service for it and starts the
 * transfer, marking the process as completed in the store once the transfer finishes.
 *
 * @param request the request to process
 */
private void processDataFlowRequest(DataFlowRequest request) {
    var service = transferServiceRegistry.resolveTransferService(request);
    if (service != null) {
        service.transfer(request).whenComplete((outcome, failure) -> {
            if (!request.isTrackable()) {
                return;
            }
            // TODO persist TransferResult or error details
            store.completed(request.getProcessId());
        });
        return;
    }

    // Should not happen since resolving a transferService is part of payload validation
    // TODO persist error details
    store.completed(request.getProcessId());
}

public static class Builder {
private DataPlaneManagerImpl manager;

Expand Down Expand Up @@ -174,6 +184,11 @@ public Builder monitor(Monitor monitor) {
return this;
}

/**
 * Sets the telemetry instance on the manager being built.
 *
 * @param telemetry the telemetry to use
 * @return this builder
 */
public Builder telemetry(Telemetry telemetry) {
    this.manager.telemetry = telemetry;
    return this;
}

public Builder queueCapacity(int capacity) {
manager.queueCapacity = capacity;
return this;
Expand All @@ -200,6 +215,7 @@ public DataPlaneManagerImpl build() {

/**
 * Creates a builder around a fresh manager that starts out with a default telemetry instance.
 */
private Builder() {
    var instance = new DataPlaneManagerImpl();
    instance.telemetry = new Telemetry(); // default noop implementation
    manager = instance;
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.dataspaceconnector.dataplane.framework.pipeline;

import io.opentelemetry.extension.annotations.WithSpan;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSink;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSinkFactory;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSource;
Expand Down Expand Up @@ -72,6 +73,7 @@ public Result<Boolean> validate(DataFlowRequest request) {
return Result.success(true);
}

@WithSpan
@Override
public CompletableFuture<StatusResult<Void>> transfer(DataFlowRequest request) {
var sourceFactory = getSourceFactory(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.dataspaceconnector.dataplane.framework.pipeline;

import io.opentelemetry.extension.annotations.WithSpan;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.PipelineService;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.TransferService;
import org.eclipse.dataspaceconnector.spi.response.StatusResult;
Expand Down Expand Up @@ -43,6 +44,7 @@ public Result<Boolean> validate(DataFlowRequest request) {
return pipelineService.validate(request);
}

@WithSpan
@Override
public CompletableFuture<StatusResult<Void>> transfer(DataFlowRequest request) {
return pipelineService.transfer(request);
Expand Down
2 changes: 2 additions & 0 deletions extensions/data-plane/data-plane-spi/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
*/

val mockitoVersion: String by project
val openTelemetryVersion: String by project

plugins {
`java-library`
Expand All @@ -21,6 +22,7 @@ plugins {
dependencies {
api(project(":spi:core-spi"))
implementation(project(":common:util"))
implementation("io.opentelemetry:opentelemetry-extension-annotations:${openTelemetryVersion}")
}


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,15 +14,20 @@

package org.eclipse.dataspaceconnector.dataplane.spi.pipeline;

import io.opentelemetry.extension.annotations.WithSpan;
import org.eclipse.dataspaceconnector.common.stream.PartitionIterator;
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.response.StatusResult;
import org.eclipse.dataspaceconnector.spi.result.AbstractResult;
import org.eclipse.dataspaceconnector.spi.telemetry.Telemetry;
import org.eclipse.dataspaceconnector.spi.telemetry.TraceCarrier;
import org.jetbrains.annotations.NotNull;

import java.util.List;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.function.Supplier;

import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.stream.Collectors.toList;
Expand All @@ -37,12 +42,16 @@ public abstract class ParallelSink implements DataSink {
protected int partitionSize = 5;
protected ExecutorService executorService;
protected Monitor monitor;
protected Telemetry telemetry;

@WithSpan
@Override
public CompletableFuture<StatusResult<Void>> transfer(DataSource source) {
try (var partStream = source.openPartStream()) {
var partitioned = PartitionIterator.streamOf(partStream, partitionSize);
var futures = partitioned.map(parts -> supplyAsync(() -> transferParts(parts), executorService)).collect(toList());
var traceCarrier = telemetry.getTraceCarrierWithCurrentContext();

var futures = partitioned.map(parts -> processPartsAsync(parts, traceCarrier)).collect(toList());
return futures.stream()
.collect(asyncAllOf())
.thenApply(results -> results.stream()
Expand All @@ -57,13 +66,20 @@ public CompletableFuture<StatusResult<Void>> transfer(DataSource source) {
}
}

/**
 * Submits the transfer of one partition to the executor service, wrapping the work with the
 * telemetry context-propagation middleware and the given trace carrier.
 *
 * @param parts the partition of source parts to transfer
 * @param traceCarrier carrier holding the trace context captured by the caller
 * @return a future with the result of transferring the partition
 */
@NotNull
private CompletableFuture<StatusResult<Void>> processPartsAsync(List<DataSource.Part> parts, TraceCarrier traceCarrier) {
    Supplier<StatusResult<Void>> partTransfer = () -> transferParts(parts);
    var tracedTransfer = telemetry.contextPropagationMiddleware(partTransfer, traceCarrier);
    return supplyAsync(tracedTransfer, executorService);
}

/**
 * Transfers one partition of parts read from the data source. Implementations are invoked from
 * tasks submitted to the sink's executor service, one invocation per partition.
 *
 * @param parts the parts belonging to a single partition
 * @return the status of transferring this partition
 */
protected abstract StatusResult<Void> transferParts(List<DataSource.Part> parts);

protected abstract static class Builder<B extends Builder<B, T>, T extends ParallelSink> {
protected T sink;

/**
 * Wraps the given sink and gives it a default telemetry instance, which can be replaced later
 * through the builder.
 *
 * @param sink the sink instance being configured
 */
protected Builder(T sink) {
    sink.telemetry = new Telemetry(); // default noop implementation
    this.sink = sink;
}

public B requestId(String requestId) {
Expand All @@ -86,6 +102,11 @@ public B monitor(Monitor monitor) {
return self();
}

/**
 * Sets the telemetry instance on the sink being built.
 *
 * @param telemetry the telemetry to use
 * @return this builder
 */
public B telemetry(Telemetry telemetry) {
    this.sink.telemetry = telemetry;
    return self();
}

public T build() {
Objects.requireNonNull(sink.requestId, "requestId");
Objects.requireNonNull(sink.executorService, "executorService");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.eclipse.dataspaceconnector.spi.monitor.Monitor;
import org.eclipse.dataspaceconnector.spi.response.ResponseStatus;
import org.eclipse.dataspaceconnector.spi.response.StatusResult;
import org.eclipse.dataspaceconnector.spi.telemetry.Telemetry;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;

Expand Down Expand Up @@ -48,6 +49,7 @@ class ParallelSinkTest {
void setup() {
    // Configure a fresh sink completely before publishing it to the test field.
    var sink = new FakeParallelSink();
    sink.requestId = UUID.randomUUID().toString();
    sink.executorService = executor;
    sink.monitor = monitor;
    sink.telemetry = new Telemetry(); // default noop implementation
    fakeSink = sink;
}
Expand Down
2 changes: 2 additions & 0 deletions samples/04.0-file-transfer/transfer-file/build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ plugins {
}

val rsApi: String by project
val openTelemetryVersion: String by project

dependencies {
api(project(":spi"))
Expand All @@ -28,6 +29,7 @@ dependencies {
implementation(project(":extensions:data-plane-selector:selector-core"))
implementation(project(":extensions:data-plane-selector:selector-store"))
implementation(project(":extensions:data-plane:data-plane-framework"))
implementation("io.opentelemetry:opentelemetry-extension-annotations:${openTelemetryVersion}")

implementation(project(":extensions:data-plane:data-plane-spi"))
implementation(project(":extensions:in-memory:assetindex-memory"))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

package org.eclipse.dataspaceconnector.extensions.api;

import io.opentelemetry.extension.annotations.WithSpan;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.DataSource;
import org.eclipse.dataspaceconnector.dataplane.spi.pipeline.ParallelSink;
import org.eclipse.dataspaceconnector.spi.response.StatusResult;
Expand All @@ -29,6 +30,7 @@
class FileTransferDataSink extends ParallelSink {
private File file;

@WithSpan
@Override
protected StatusResult<Void> transferParts(List<DataSource.Part> parts) {
for (DataSource.Part part : parts) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright (c) 2022 Microsoft Corporation
*
* This program and the accompanying materials are made available under the
* terms of the Apache License, Version 2.0 which is available at
* https://www.apache.org/licenses/LICENSE-2.0
*
* SPDX-License-Identifier: Apache-2.0
*
* Contributors:
* Microsoft Corporation - Initial implementation
*
*/

package org.eclipse.dataspaceconnector.spi.telemetry;

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

/**
 * Simple {@link TraceCarrier} to use in situations where no entity is persisted (e.g. asynchronous processing).
 * <p>
 * The trace context is copied when the carrier is created and exposed as an unmodifiable map, so
 * later changes to the map passed by the caller do not affect this carrier.
 */
class InMemoryTraceCarrier implements TraceCarrier {

    private final Map<String, String> traceContext;

    /**
     * Creates a carrier holding a snapshot of the given trace context.
     *
     * @param traceContext the trace context entries to carry; must not be null
     * @throws NullPointerException if {@code traceContext} is null
     */
    InMemoryTraceCarrier(Map<String, String> traceContext) {
        // Collections.unmodifiableMap alone is only a read-only view over the caller's map;
        // copy first so the carried context cannot change after construction.
        this.traceContext = Collections.unmodifiableMap(new LinkedHashMap<>(traceContext));
    }

    /**
     * @return the unmodifiable trace context captured at construction time
     */
    @Override
    public Map<String, String> getTraceContext() {
        return traceContext;
    }
}
Loading