From c8eadc1fe2731d9d6cc2550a772f19536fa4c1e8 Mon Sep 17 00:00:00 2001 From: Lari Hotari Date: Wed, 22 Jan 2025 22:48:46 -0800 Subject: [PATCH] [fix][test] Fix quiet time implementation in BrokerTestUtil.receiveMessages (#23876) (cherry picked from commit 52e8730613c36008ea57a0ca5c10231512232d7e) (cherry picked from commit 720184d98e54cfccf1b96ed70c11c3ac9e6467a2) --- .../apache/pulsar/broker/BrokerTestUtil.java | 299 +++++++++++++++++- .../pulsar/broker/BrokerTestUtilTest.java | 115 +++++++ 2 files changed, 413 insertions(+), 1 deletion(-) create mode 100644 pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java index bfb172d0711d4..e97928c4c66e0 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtil.java @@ -18,9 +18,42 @@ */ package org.apache.pulsar.broker; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import com.fasterxml.jackson.core.JsonGenerator; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.ObjectWriter; +import java.io.BufferedReader; +import java.io.IOException; +import java.io.InputStreamReader; +import java.io.StringWriter; +import java.io.UncheckedIOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.time.Duration; +import java.util.Arrays; import java.util.UUID; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; +import java.util.function.BiConsumer; +import java.util.function.BiFunction; +import java.util.stream.Stream; +import lombok.SneakyThrows; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.pulsar.client.admin.PulsarAdmin; +import org.apache.pulsar.client.admin.PulsarAdminException; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.PulsarClientException; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.common.util.ObjectMapperFactory; import org.mockito.Mockito; - +import org.slf4j.Logger; /** * Holds util methods used in test. */ @@ -77,4 +110,268 @@ public static T spyWithoutRecordingInvocations(T object) { .defaultAnswer(Mockito.CALLS_REAL_METHODS) .stubOnly()); } + + /** + * Uses Jackson to create a JSON string for the given object + * @param object to convert to JSON + * @return JSON string + */ + public static String toJson(Object object) { + ObjectWriter writer = ObjectMapperFactory.getMapper().writer(); + StringWriter stringWriter = new StringWriter(); + try (JsonGenerator generator = writer.createGenerator(stringWriter).useDefaultPrettyPrinter()) { + generator.writeObject(object); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + return stringWriter.toString(); + } + + /** + * Logs the topic stats and internal stats for the given topic + * @param logger logger to use + * @param pulsarAdmin PulsarAdmin client to use + * @param topic topic name + */ + public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, String topic) { + try { + logger.info("[{}] stats: {}", topic, toJson(pulsarAdmin.topics().getStats(topic))); + logger.info("[{}] internalStats: {}", topic, + toJson(pulsarAdmin.topics().getInternalStats(topic, true))); + } catch (PulsarAdminException e) { + logger.warn("Failed to get stats for topic {}", topic, e); + } + } + + /** + * Logs the topic stats and internal stats for the given topic + * @param logger logger to use + * @param baseUrl Pulsar service URL + * @param topic topic name + */ + public static void logTopicStats(Logger logger, String baseUrl, String topic) { + logTopicStats(logger, baseUrl, "public", "default", topic); + } + + /** + * Logs the topic stats and internal stats for the given topic + * @param logger logger to use + * @param baseUrl Pulsar service URL + * @param tenant tenant name + * @param namespace namespace name + * @param topic topic name + */ + public static void logTopicStats(Logger logger, String baseUrl, String tenant, String namespace, String topic) { + String topicStatsUri = + String.format("%s/admin/v2/persistent/%s/%s/%s/stats", baseUrl, tenant, namespace, topic); + logger.info("[{}] stats: {}", topic, jsonPrettyPrint(getJsonResourceAsString(topicStatsUri))); + String topicStatsInternalUri = + String.format("%s/admin/v2/persistent/%s/%s/%s/internalStats", baseUrl, tenant, namespace, topic); + logger.info("[{}] internalStats: {}", topic, jsonPrettyPrint(getJsonResourceAsString(topicStatsInternalUri))); + } + + /** + * Pretty print the given JSON string + * @param jsonString JSON string to pretty print + * @return pretty printed JSON string + */ + public static String jsonPrettyPrint(String jsonString) { + try { + ObjectMapper mapper = new ObjectMapper(); + Object json = mapper.readValue(jsonString, Object.class); + ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter(); + return writer.writeValueAsString(json); + } catch (IOException e) { + throw new UncheckedIOException(e); + } + } + + /** + * Get the resource as a string from the given URI + */ + @SneakyThrows + public static String getJsonResourceAsString(String uri) { + URL url = new URL(uri); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setRequestMethod("GET"); + connection.setRequestProperty("Accept", "application/json"); + try { + int responseCode = connection.getResponseCode(); + if (responseCode == 200) { + try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) { + String inputLine; + StringBuilder content = new StringBuilder(); + while ((inputLine = in.readLine()) != null) { + content.append(inputLine); + } + return content.toString(); + } + } else { + throw new IOException("Failed to get resource: " + uri + ", status: " + responseCode); + } + } finally { + connection.disconnect(); + } + } + + /** + * Receive messages concurrently from multiple consumers and handles them using the provided message handler. + * The message handler should return true if it wants to continue receiving more messages, false otherwise. + * + * @param messageHandler the message handler + * @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages + * @param consumers the consumers to receive messages from + * @param the message value type + */ + public static void receiveMessages(BiFunction, Message, Boolean> messageHandler, + Duration quietTimeout, + Consumer... consumers) { + receiveMessages(messageHandler, quietTimeout, Arrays.stream(consumers)); + } + + /** + * Receive messages concurrently from multiple consumers and handles them using the provided message handler. + * The message handler should return true if it wants to continue receiving more messages, false otherwise. + * + * @param messageHandler the message handler + * @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages + * @param consumers the consumers to receive messages from + * @param the message value type + */ + public static void receiveMessages(BiFunction, Message, Boolean> messageHandler, + Duration quietTimeout, + Stream> consumers) { + long quietTimeoutNanos = quietTimeout.toNanos(); + AtomicLong lastMessageReceivedNanos = new AtomicLong(System.nanoTime()); + FutureUtil.waitForAll(consumers + .map(consumer -> receiveMessagesAsync(consumer, quietTimeoutNanos, quietTimeoutNanos, messageHandler, + lastMessageReceivedNanos)).toList()).join(); + } + + // asynchronously receive messages from a consumer and handle them using the provided message handler + // the benefit is that multiple consumers can be concurrently consumed without the need to have multiple threads + // this is useful in tests where multiple consumers are needed to test the functionality + private static CompletableFuture receiveMessagesAsync(Consumer consumer, + long quietTimeoutNanos, + long receiveTimeoutNanos, + BiFunction, Message, Boolean> + messageHandler, + AtomicLong lastMessageReceivedNanos) { + return consumer.receiveAsync() + .orTimeout(receiveTimeoutNanos, TimeUnit.NANOSECONDS) + .handle((msg, t) -> { + long currentNanos = System.nanoTime(); + if (t != null) { + if (t instanceof TimeoutException) { + long sinceLastMessageReceivedNanos = currentNanos - lastMessageReceivedNanos.get(); + if (sinceLastMessageReceivedNanos > quietTimeoutNanos) { + return Pair.of(false, 0L); + } else { + return Pair.of(true, quietTimeoutNanos - sinceLastMessageReceivedNanos); + } + } else { + throw FutureUtil.wrapToCompletionException(t); + } + } + lastMessageReceivedNanos.set(currentNanos); + return Pair.of(messageHandler.apply(consumer, msg), quietTimeoutNanos); + }).thenComposeAsync(receiveMoreAndNextTimeout -> { + boolean receiveMore = receiveMoreAndNextTimeout.getLeft(); + if (receiveMore) { + Long nextReceiveTimeoutNanos = receiveMoreAndNextTimeout.getRight(); + return receiveMessagesAsync(consumer, quietTimeoutNanos, nextReceiveTimeoutNanos, + messageHandler, lastMessageReceivedNanos); + } else { + return CompletableFuture.completedFuture(null); + } + }); + } + + /** + * Receive messages concurrently from multiple consumers and handles them using the provided message handler. + * The messages are received until the quiet timeout is reached or the maximum number of messages is received. + * + * @param messageHandler the message handler + * @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages + * @param maxMessages the maximum number of messages to receive + * @param consumers the consumers to receive messages from + * @param the message value type + */ + public static void receiveMessagesN(BiConsumer, Message> messageHandler, + Duration quietTimeout, + int maxMessages, + Consumer... consumers) + throws ExecutionException, InterruptedException { + AtomicInteger messagesReceived = new AtomicInteger(); + receiveMessages( + (consumer, message) -> { + messageHandler.accept(consumer, message); + return messagesReceived.incrementAndGet() < maxMessages; + }, quietTimeout, consumers); + } + + /** + * Receive messages concurrently from multiple consumers and handles them using the provided message handler. + * + * @param messageHandler the message handler + * @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages + * @param consumers the consumers to receive messages from + * @param the message value type + */ + public static void receiveMessagesInThreads(BiFunction, Message, Boolean> messageHandler, + final Duration quietTimeout, + Consumer... consumers) { + receiveMessagesInThreads(messageHandler, quietTimeout, Arrays.stream(consumers).sequential()); + } + + /** + * Receive messages concurrently from multiple consumers and handles them using the provided message handler. + * + * @param messageHandler the message handler + * @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages + * @param consumers the consumers to receive messages from + * @param the message value type + */ + public static void receiveMessagesInThreads(BiFunction, Message, Boolean> messageHandler, + final Duration quietTimeout, + Stream> consumers) { + FutureUtil.waitForAll(consumers.map(consumer -> { + return CompletableFuture.runAsync(() -> { + try { + while (!Thread.currentThread().isInterrupted()) { + Message msg = consumer.receive((int) quietTimeout.toMillis(), TimeUnit.MILLISECONDS); + if (msg != null) { + if (!messageHandler.apply(consumer, msg)) { + break; + } + } else { + break; + } + } + } catch (PulsarClientException e) { + throw new CompletionException(e); + } + }, runnable -> { + Thread thread = new Thread(runnable, "Consumer-" + consumer.getConsumerName()); + thread.start(); + }); + }).toList()).join(); + } + + private static long mockConsumerIdGenerator = 0; + + public static org.apache.pulsar.broker.service.Consumer createMockConsumer(String consumerName) { + long consumerId = mockConsumerIdGenerator++; + return createMockConsumer(consumerName, consumerName + " consumerId:" + consumerId, consumerId); + } + + public static org.apache.pulsar.broker.service.Consumer createMockConsumer(String consumerName, String toString, long consumerId) { + // without stubOnly, the mock will record method invocations and could run into OOME + org.apache.pulsar.broker.service.Consumer + consumer = mock(org.apache.pulsar.broker.service.Consumer.class, Mockito.withSettings().stubOnly()); + when(consumer.consumerName()).thenReturn(consumerName); + when(consumer.toString()).thenReturn(consumerName + " consumerId:" + consumerId); + when(consumer.consumerId()).thenReturn(consumerId); + return consumer; + } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java new file mode 100644 index 0000000000000..90b917a319c71 --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/BrokerTestUtilTest.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.broker; + +import static org.assertj.core.api.Assertions.assertThat; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.testng.Assert.assertEquals; +import java.time.Duration; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.BiFunction; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Consumer; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.common.util.FutureUtil; +import org.testng.annotations.Test; + +@Slf4j +public class BrokerTestUtilTest { + @Test + public void testReceiveMessagesQuietTime() throws Exception { + // Mock consumers + Consumer consumer1 = mock(Consumer.class); + Consumer consumer2 = mock(Consumer.class); + + long consumer1DelayMs = 300L; + long consumer2DelayMs = 400L; + long quietTimeMs = 500L; + + // Define behavior for receiveAsync with delay + AtomicBoolean consumer1FutureContinueSupplying = new AtomicBoolean(true); + when(consumer1.receiveAsync()).thenAnswer(invocation -> { + if (consumer1FutureContinueSupplying.get()) { + CompletableFuture messageCompletableFuture = + CompletableFuture.supplyAsync(() -> mock(Message.class), + CompletableFuture.delayedExecutor(consumer1DelayMs, TimeUnit.MILLISECONDS)); + consumer1FutureContinueSupplying.set(false); + // continue supplying while the future is cancelled or timed out + FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, () -> { + consumer1FutureContinueSupplying.set(true); + }); + return messageCompletableFuture; + } else { + return new CompletableFuture<>(); + } + }); + AtomicBoolean consumer2FutureContinueSupplying = new AtomicBoolean(true); + when(consumer2.receiveAsync()).thenAnswer(invocation -> { + if (consumer2FutureContinueSupplying.get()) { + CompletableFuture messageCompletableFuture = + CompletableFuture.supplyAsync(() -> mock(Message.class), + CompletableFuture.delayedExecutor(consumer2DelayMs, TimeUnit.MILLISECONDS)); + consumer2FutureContinueSupplying.set(false); + // continue supplying while the future is cancelled or timed out + FutureUtil.whenCancelledOrTimedOut(messageCompletableFuture, () -> { + consumer2FutureContinueSupplying.set(true); + }); + return messageCompletableFuture; + } else { + return new CompletableFuture<>(); + } + }); + + // Atomic variables to track message handling + AtomicInteger messageCount = new AtomicInteger(0); + + // Message handler + BiFunction, Message, Boolean> messageHandler = (consumer, msg) -> { + messageCount.incrementAndGet(); + return true; + }; + + // Track start time + long startTime = System.nanoTime(); + + // Call receiveMessages method + BrokerTestUtil.receiveMessages(messageHandler, Duration.ofMillis(quietTimeMs), consumer1, consumer2); + + // Track end time + long endTime = System.nanoTime(); + + // Verify that messages were attempted to be received + verify(consumer1, times(3)).receiveAsync(); + verify(consumer2, times(2)).receiveAsync(); + + // Verify that the message handler was called + assertEquals(messageCount.get(), 2); + + // Verify the time spent is as expected (within a reasonable margin) + long durationMillis = TimeUnit.NANOSECONDS.toMillis(endTime - startTime); + assertThat(durationMillis).isBetween(consumer2DelayMs + quietTimeMs, + consumer2DelayMs + quietTimeMs + (quietTimeMs / 2)); + } +} \ No newline at end of file