Skip to content


[fix][test] Fix quiet time implementation in BrokerTestUtil.receiveMe…
Browse files Browse the repository at this point in the history
…ssages (apache#23876)

(cherry picked from commit 52e8730)
(cherry picked from commit 720184d)
  • Loading branch information
lhotari authored and srinath-ctds committed Feb 3, 2025
1 parent 094f1ba commit c8eadc1
Show file tree
Hide file tree
Showing 2 changed files with 413 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,42 @@

import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import java.time.Duration;
import java.util.Arrays;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import lombok.SneakyThrows;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.common.util.ObjectMapperFactory;
import org.mockito.Mockito;

import org.slf4j.Logger;
* Holds util methods used in test.
Expand Down Expand Up @@ -77,4 +110,268 @@ public static <T> T spyWithoutRecordingInvocations(T object) {

* Uses Jackson to create a JSON string for the given object
* @param object to convert to JSON
* @return JSON string
public static String toJson(Object object) {
ObjectWriter writer = ObjectMapperFactory.getMapper().writer();
StringWriter stringWriter = new StringWriter();
try (JsonGenerator generator = writer.createGenerator(stringWriter).useDefaultPrettyPrinter()) {
} catch (IOException e) {
throw new UncheckedIOException(e);
return stringWriter.toString();

* Logs the topic stats and internal stats for the given topic
* @param logger logger to use
* @param pulsarAdmin PulsarAdmin client to use
* @param topic topic name
public static void logTopicStats(Logger logger, PulsarAdmin pulsarAdmin, String topic) {
try {"[{}] stats: {}", topic, toJson(pulsarAdmin.topics().getStats(topic)));"[{}] internalStats: {}", topic,
toJson(pulsarAdmin.topics().getInternalStats(topic, true)));
} catch (PulsarAdminException e) {
logger.warn("Failed to get stats for topic {}", topic, e);

* Logs the topic stats and internal stats for the given topic
* @param logger logger to use
* @param baseUrl Pulsar service URL
* @param topic topic name
public static void logTopicStats(Logger logger, String baseUrl, String topic) {
logTopicStats(logger, baseUrl, "public", "default", topic);

* Logs the topic stats and internal stats for the given topic
* @param logger logger to use
* @param baseUrl Pulsar service URL
* @param tenant tenant name
* @param namespace namespace name
* @param topic topic name
public static void logTopicStats(Logger logger, String baseUrl, String tenant, String namespace, String topic) {
String topicStatsUri =
String.format("%s/admin/v2/persistent/%s/%s/%s/stats", baseUrl, tenant, namespace, topic);"[{}] stats: {}", topic, jsonPrettyPrint(getJsonResourceAsString(topicStatsUri)));
String topicStatsInternalUri =
String.format("%s/admin/v2/persistent/%s/%s/%s/internalStats", baseUrl, tenant, namespace, topic);"[{}] internalStats: {}", topic, jsonPrettyPrint(getJsonResourceAsString(topicStatsInternalUri)));

* Pretty print the given JSON string
* @param jsonString JSON string to pretty print
* @return pretty printed JSON string
public static String jsonPrettyPrint(String jsonString) {
try {
ObjectMapper mapper = new ObjectMapper();
Object json = mapper.readValue(jsonString, Object.class);
ObjectWriter writer = mapper.writerWithDefaultPrettyPrinter();
return writer.writeValueAsString(json);
} catch (IOException e) {
throw new UncheckedIOException(e);

* Get the resource as a string from the given URI
public static String getJsonResourceAsString(String uri) {
URL url = new URL(uri);
HttpURLConnection connection = (HttpURLConnection) url.openConnection();
connection.setRequestProperty("Accept", "application/json");
try {
int responseCode = connection.getResponseCode();
if (responseCode == 200) {
try (BufferedReader in = new BufferedReader(new InputStreamReader(connection.getInputStream()))) {
String inputLine;
StringBuilder content = new StringBuilder();
while ((inputLine = in.readLine()) != null) {
return content.toString();
} else {
throw new IOException("Failed to get resource: " + uri + ", status: " + responseCode);
} finally {

* Receive messages concurrently from multiple consumers and handles them using the provided message handler.
* The message handler should return true if it wants to continue receiving more messages, false otherwise.
* @param messageHandler the message handler
* @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
* @param consumers the consumers to receive messages from
* @param <T> the message value type
public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
Duration quietTimeout,
Consumer<T>... consumers) {
receiveMessages(messageHandler, quietTimeout,;

* Receive messages concurrently from multiple consumers and handles them using the provided message handler.
* The message handler should return true if it wants to continue receiving more messages, false otherwise.
* @param messageHandler the message handler
* @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
* @param consumers the consumers to receive messages from
* @param <T> the message value type
public static <T> void receiveMessages(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
Duration quietTimeout,
Stream<Consumer<T>> consumers) {
long quietTimeoutNanos = quietTimeout.toNanos();
AtomicLong lastMessageReceivedNanos = new AtomicLong(System.nanoTime());
.map(consumer -> receiveMessagesAsync(consumer, quietTimeoutNanos, quietTimeoutNanos, messageHandler,

// asynchronously receive messages from a consumer and handle them using the provided message handler
// the benefit is that multiple consumers can be concurrently consumed without the need to have multiple threads
// this is useful in tests where multiple consumers are needed to test the functionality
private static <T> CompletableFuture<Void> receiveMessagesAsync(Consumer<T> consumer,
long quietTimeoutNanos,
long receiveTimeoutNanos,
BiFunction<Consumer<T>, Message<T>, Boolean>
AtomicLong lastMessageReceivedNanos) {
return consumer.receiveAsync()
.orTimeout(receiveTimeoutNanos, TimeUnit.NANOSECONDS)
.handle((msg, t) -> {
long currentNanos = System.nanoTime();
if (t != null) {
if (t instanceof TimeoutException) {
long sinceLastMessageReceivedNanos = currentNanos - lastMessageReceivedNanos.get();
if (sinceLastMessageReceivedNanos > quietTimeoutNanos) {
return Pair.of(false, 0L);
} else {
return Pair.of(true, quietTimeoutNanos - sinceLastMessageReceivedNanos);
} else {
throw FutureUtil.wrapToCompletionException(t);
return Pair.of(messageHandler.apply(consumer, msg), quietTimeoutNanos);
}).thenComposeAsync(receiveMoreAndNextTimeout -> {
boolean receiveMore = receiveMoreAndNextTimeout.getLeft();
if (receiveMore) {
Long nextReceiveTimeoutNanos = receiveMoreAndNextTimeout.getRight();
return receiveMessagesAsync(consumer, quietTimeoutNanos, nextReceiveTimeoutNanos,
messageHandler, lastMessageReceivedNanos);
} else {
return CompletableFuture.completedFuture(null);

* Receive messages concurrently from multiple consumers and handles them using the provided message handler.
* The messages are received until the quiet timeout is reached or the maximum number of messages is received.
* @param messageHandler the message handler
* @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
* @param maxMessages the maximum number of messages to receive
* @param consumers the consumers to receive messages from
* @param <T> the message value type
public static <T> void receiveMessagesN(BiConsumer<Consumer<T>, Message<T>> messageHandler,
Duration quietTimeout,
int maxMessages,
Consumer<T>... consumers)
throws ExecutionException, InterruptedException {
AtomicInteger messagesReceived = new AtomicInteger();
(consumer, message) -> {
messageHandler.accept(consumer, message);
return messagesReceived.incrementAndGet() < maxMessages;
}, quietTimeout, consumers);

* Receive messages concurrently from multiple consumers and handles them using the provided message handler.
* @param messageHandler the message handler
* @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
* @param consumers the consumers to receive messages from
* @param <T> the message value type
public static <T> void receiveMessagesInThreads(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
final Duration quietTimeout,
Consumer<T>... consumers) {
receiveMessagesInThreads(messageHandler, quietTimeout,;

* Receive messages concurrently from multiple consumers and handles them using the provided message handler.
* @param messageHandler the message handler
* @param quietTimeout the duration of quiet time after which the method will stop waiting for more messages
* @param consumers the consumers to receive messages from
* @param <T> the message value type
public static <T> void receiveMessagesInThreads(BiFunction<Consumer<T>, Message<T>, Boolean> messageHandler,
final Duration quietTimeout,
Stream<Consumer<T>> consumers) {
FutureUtil.waitForAll( -> {
return CompletableFuture.runAsync(() -> {
try {
while (!Thread.currentThread().isInterrupted()) {
Message<T> msg = consumer.receive((int) quietTimeout.toMillis(), TimeUnit.MILLISECONDS);
if (msg != null) {
if (!messageHandler.apply(consumer, msg)) {
} else {
} catch (PulsarClientException e) {
throw new CompletionException(e);
}, runnable -> {
Thread thread = new Thread(runnable, "Consumer-" + consumer.getConsumerName());

private static long mockConsumerIdGenerator = 0;

public static createMockConsumer(String consumerName) {
long consumerId = mockConsumerIdGenerator++;
return createMockConsumer(consumerName, consumerName + " consumerId:" + consumerId, consumerId);

public static createMockConsumer(String consumerName, String toString, long consumerId) {
// without stubOnly, the mock will record method invocations and could run into OOME
consumer = mock(, Mockito.withSettings().stubOnly());
when(consumer.toString()).thenReturn(consumerName + " consumerId:" + consumerId);
return consumer;

0 comments on commit c8eadc1

Please sign in to comment.