Skip to content

Commit

Permalink
Fix "java - monitor-opentelemetry-exporter - tests" pipeline (#44522)
Browse files Browse the repository at this point in the history
  • Loading branch information
jeanbisutti authored Mar 6, 2025
1 parent 5806027 commit cce51bc
Show file tree
Hide file tree
Showing 4 changed files with 159 additions and 27 deletions.
2 changes: 2 additions & 0 deletions sdk/monitor/azure-monitor-opentelemetry-exporter/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@
<!-- Configures the Java 9+ run to perform the required module exports, opens, and reads that are necessary for testing but shouldn't be part of the module-info. -->
<javaModulesSurefireArgLine>
--add-opens com.azure.monitor.opentelemetry.exporter/com.azure.monitor.opentelemetry.exporter=ALL-UNNAMED
--add-exports com.azure.core/com.azure.core.implementation.util=ALL-UNNAMED
--add-reads com.azure.monitor.opentelemetry.autoconfigure=com.azure.core.tracing.opentelemetry
</javaModulesSurefireArgLine>

<checkstyle.suppressionsLocation>checkstyle-suppressions.xml</checkstyle.suppressionsLocation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@

package com.azure.monitor.opentelemetry.exporter;

import com.azure.core.credential.TokenCredential;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpLogDetailLevel;
import com.azure.core.http.policy.HttpLogOptions;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.test.annotation.LiveOnly;
import com.azure.core.util.Configuration;
import com.azure.core.util.Context;
import com.azure.core.util.FluxUtil;
import com.azure.data.appconfiguration.ConfigurationClient;
Expand Down Expand Up @@ -37,6 +39,15 @@

@LiveOnly
public class AppConfigurationExporterIntegrationTest extends MonitorExporterClientTestBase {

private TokenCredential credential;

@Override
public void beforeTest() {
super.beforeTest();
credential = TokenCredentialUtil.getTestTokenCredential(interceptorManager);
}

@Test
public void setConfigurationTest() throws InterruptedException {
CountDownLatch exporterCountDown = new CountDownLatch(1);
Expand Down Expand Up @@ -87,8 +98,10 @@ public void testDisableTracing() throws InterruptedException {
assertTrue(exporterCountDown.await(60, TimeUnit.SECONDS));
}

private static ConfigurationClient getConfigurationClient() {
return new ConfigurationClientBuilder().connectionString(System.getenv("AZURE_APPCONFIG_CONNECTION_STRING"))
private ConfigurationClient getConfigurationClient() {
String endPoint = Configuration.getGlobalConfiguration().get("AZURE_APPCONFIG_ENDPOINT");
return new ConfigurationClientBuilder().credential(credential)
.endpoint(endPoint)
.httpLogOptions(new HttpLogOptions().setLogLevel(HttpLogDetailLevel.BODY_AND_HEADERS))
.buildClient();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,18 @@

package com.azure.monitor.opentelemetry.exporter;

import com.azure.core.credential.TokenCredential;
import com.azure.core.http.HttpPipelineCallContext;
import com.azure.core.http.HttpPipelineNextPolicy;
import com.azure.core.http.HttpPipelineNextSyncPolicy;
import com.azure.core.http.HttpResponse;
import com.azure.core.http.policy.HttpPipelinePolicy;
import com.azure.core.test.annotation.LiveOnly;
import com.azure.core.tracing.opentelemetry.OpenTelemetryTracingOptions;
import com.azure.core.util.ClientOptions;
import com.azure.core.util.FluxUtil;
import com.azure.core.util.logging.ClientLogger;
import com.azure.core.util.Configuration;
import com.azure.messaging.eventhubs.EventData;
import com.azure.messaging.eventhubs.EventHubClientBuilder;
import com.azure.messaging.eventhubs.EventHubProducerAsyncClient;
Expand All @@ -14,17 +23,25 @@
import com.azure.messaging.eventhubs.LoadBalancingStrategy;
import com.azure.messaging.eventhubs.checkpointstore.blob.BlobCheckpointStore;
import com.azure.messaging.eventhubs.models.CreateBatchOptions;
import com.azure.monitor.opentelemetry.exporter.implementation.localstorage.LocalStorageTelemetryPipelineListener;
import com.azure.monitor.opentelemetry.exporter.implementation.models.MonitorDomain;
import com.azure.monitor.opentelemetry.exporter.implementation.models.RemoteDependencyData;
import com.azure.monitor.opentelemetry.exporter.implementation.models.TelemetryItem;
import com.azure.monitor.opentelemetry.exporter.implementation.utils.TestUtils;
import com.azure.storage.blob.BlobContainerAsyncClient;
import com.azure.storage.blob.BlobContainerClientBuilder;
import io.opentelemetry.api.OpenTelemetry;
import io.opentelemetry.api.trace.Span;
import io.opentelemetry.api.trace.Tracer;
import io.opentelemetry.context.Scope;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import reactor.core.publisher.Mono;
import reactor.test.StepVerifier;

import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

Expand All @@ -37,38 +54,80 @@ public class EventHubsExporterIntegrationTest extends MonitorExporterClientTestB
private static final String STORAGE_CONNECTION_STRING = System.getenv("STORAGE_CONNECTION_STRING");
private static final String CONTAINER_NAME = System.getenv("STORAGE_CONTAINER_NAME");

private static final ClientLogger LOGGER = new ClientLogger(EventHubsExporterIntegrationTest.class);

private TokenCredential credential;

@Override
public void beforeTest() {
super.beforeTest();
credential = TokenCredentialUtil.getTestTokenCredential(interceptorManager);
}

@Test
@SuppressWarnings("try")
public void producerTest() throws InterruptedException {
String ehNamespace = Configuration.getGlobalConfiguration().get("AZURE_EVENTHUBS_FULLY_QUALIFIED_DOMAIN_NAME");
String ehName = Configuration.getGlobalConfiguration().get("AZURE_EVENTHUBS_EVENT_HUB_NAME");

CountDownLatch exporterCountDown = new CountDownLatch(2);
String spanName = "event-hubs-producer-testing";
HttpPipelinePolicy validationPolicy = (context, next) -> {
Mono<String> asyncString = FluxUtil.collectBytesInByteBufferStream(context.getHttpRequest().getBody())
.map(bytes -> new String(bytes, StandardCharsets.UTF_8));
asyncString.subscribe(value -> {
if (value.contains(spanName)) {
exporterCountDown.countDown();
}
if (value.contains("EventHubs.send")) {
exporterCountDown.countDown();
HttpPipelinePolicy validationPolicy = new HttpPipelinePolicy() {
@Override
public Mono<HttpResponse> process(HttpPipelineCallContext context, HttpPipelineNextPolicy next) {
checkTelemetry(context);
return next.process();
}

@Override
public HttpResponse processSync(HttpPipelineCallContext context, HttpPipelineNextSyncPolicy next) {
checkTelemetry(context);
return next.processSync();
}

private void checkTelemetry(HttpPipelineCallContext context) {
byte[] asyncBytes = LocalStorageTelemetryPipelineListener
.ungzip(context.getHttpRequest().getBodyAsBinaryData().toBytes());
List<TelemetryItem> telemetryItems = TestUtils.deserialize(asyncBytes);

for (TelemetryItem telemetryItem : telemetryItems) {
MonitorDomain monitorDomain = telemetryItem.getData().getBaseData();
RemoteDependencyData remoteDependencyData = TestUtils.toRemoteDependencyData(monitorDomain);
String remoteDependencyName = remoteDependencyData.getName();
if (remoteDependencyName.contains(spanName)) {
exporterCountDown.countDown();
LOGGER.info("Count down " + spanName);
} else if (("send " + ehName).equals(remoteDependencyName)) {
exporterCountDown.countDown();
LOGGER.info("Count down eventHubs send");
} else {
LOGGER.info("remoteDependencyName = " + remoteDependencyName);
}
}
});
return next.process();
}
};
Tracer tracer = TestUtils.createOpenTelemetrySdk(getHttpPipeline(validationPolicy)).getTracer("Sample");
EventHubProducerAsyncClient producer
= new EventHubClientBuilder().connectionString(CONNECTION_STRING).buildAsyncProducerClient();
Span span = tracer.spanBuilder(spanName).startSpan();
Scope scope = span.makeCurrent();
try {
producer.createBatch().flatMap(batch -> {
batch.tryAdd(new EventData("test event"));
return producer.send(batch);
}).subscribe();
} finally {
span.end();
scope.close();

OpenTelemetry otel = TestUtils.createOpenTelemetrySdk(getHttpPipeline(validationPolicy));
Tracer tracer = otel.getTracer("Sample");

try (EventHubProducerAsyncClient producer = new EventHubClientBuilder().credential(credential)
.fullyQualifiedNamespace(ehNamespace)
.eventHubName(ehName)
.clientOptions(
new ClientOptions().setTracingOptions(new OpenTelemetryTracingOptions().setOpenTelemetry(otel)))
.buildAsyncProducerClient()) {

Span span = tracer.spanBuilder(spanName).startSpan();
try (Scope scope = span.makeCurrent()) {
StepVerifier.create(producer.createBatch().flatMap(batch -> {
batch.tryAdd(new EventData("test event"));
return producer.send(batch);
})).expectComplete().verify(Duration.ofSeconds(60));
} finally {
span.end();
}
}
assertTrue(exporterCountDown.await(5, TimeUnit.SECONDS));
assertTrue(exporterCountDown.await(20, TimeUnit.SECONDS));
}

@Disabled("Processor integration tests require separate consumer group to not have partition contention in CI - https://github.com/Azure/azure-sdk-for-java/issues/23567")
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package com.azure.monitor.opentelemetry.exporter;

import com.azure.core.credential.TokenCredential;
import com.azure.core.test.InterceptorManager;
import com.azure.core.test.utils.MockTokenCredential;
import com.azure.core.util.Configuration;
import com.azure.core.util.CoreUtils;
import com.azure.identity.AzurePipelinesCredentialBuilder;
import com.azure.identity.DefaultAzureCredentialBuilder;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;

class TokenCredentialUtil {

/**
* Gets a token credential for use in tests.
* @param interceptorManager the interceptor manager
* @return the TokenCredential
*/
static TokenCredential getTestTokenCredential(InterceptorManager interceptorManager) {
if (interceptorManager.isLiveMode()) {
return getPipelineCredential();
} else if (interceptorManager.isRecordMode()) {
return new DefaultAzureCredentialBuilder().build();
} else {
return new MockTokenCredential();
}
}

private static TokenCredential getPipelineCredential() {
final String serviceConnectionId = getPropertyValue("AZURESUBSCRIPTION_SERVICE_CONNECTION_ID");
final String clientId = getPropertyValue("AZURESUBSCRIPTION_CLIENT_ID");
final String tenantId = getPropertyValue("AZURESUBSCRIPTION_TENANT_ID");
final String systemAccessToken = getPropertyValue("SYSTEM_ACCESSTOKEN");

if (CoreUtils.isNullOrEmpty(serviceConnectionId)
|| CoreUtils.isNullOrEmpty(clientId)
|| CoreUtils.isNullOrEmpty(tenantId)
|| CoreUtils.isNullOrEmpty(systemAccessToken)) {
return null;
}

TokenCredential cred = new AzurePipelinesCredentialBuilder().systemAccessToken(systemAccessToken)
.clientId(clientId)
.tenantId(tenantId)
.serviceConnectionId(serviceConnectionId)
.build();

return request -> Mono.defer(() -> cred.getToken(request)).subscribeOn(Schedulers.boundedElastic());
}

private static String getPropertyValue(String propertyName) {
return Configuration.getGlobalConfiguration().get(propertyName, System.getenv(propertyName));
}
}

0 comments on commit cce51bc

Please sign in to comment.