Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve] [broker] Make the estimated entry size more accurate #23931

Merged
merged 17 commits into from
Feb 25, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_BACKOFF_TIME_SEC;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.DEFAULT_LEDGER_DELETE_RETRIES;
import static org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.createManagedLedgerException;
import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.apache.bookkeeper.mledger.util.Errors.isNoSuchLedgerExistsException;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.MoreObjects;
Expand Down Expand Up @@ -3810,26 +3811,51 @@ public int applyMaxSizeCap(int maxEntries, long maxSizeBytes) {
if (maxSizeBytes == NO_MAX_SIZE_LIMIT) {
return maxEntries;
}
int maxEntriesBasedOnSize =
Long.valueOf(estimateEntryCountBySize(maxSizeBytes, readPosition, ledger)).intValue();
return Math.min(maxEntriesBasedOnSize, maxEntries);
}

double avgEntrySize = ledger.getStats().getEntrySizeAverage();
if (!Double.isFinite(avgEntrySize)) {
// We don't have yet any stats on the topic entries. Let's try to use the cursor avg size stats
avgEntrySize = (double) entriesReadSize / (double) entriesReadCount;
}

if (!Double.isFinite(avgEntrySize)) {
// If we still don't have any information, it means this is the first time we attempt reading
// and there are no writes. Let's start with 1 to avoid any overflow and start the avg stats
return 1;
static long estimateEntryCountBySize(long bytesSize, Position readPosition, ManagedLedgerImpl ml) {
Position posToRead = readPosition;
if (!ml.isValidPosition(readPosition)) {
posToRead = ml.getNextValidPosition(readPosition);
}
long result = 0;
long remainingBytesSize = bytesSize;

int maxEntriesBasedOnSize = (int) (maxSizeBytes / avgEntrySize);
if (maxEntriesBasedOnSize < 1) {
// We need to read at least one entry
return 1;
while (remainingBytesSize > 0) {
// Last ledger.
if (posToRead.getLedgerId() == ml.getCurrentLedger().getId()) {
if (ml.getCurrentLedgerSize() == 0 || ml.getCurrentLedgerEntries() == 0) {
// Only read 1 entry if no entries to read.
return 1;
}
long avg = Math.max(1, ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries())
+ BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
result += remainingBytesSize / avg;
break;
}
// Skip empty ledger.
LedgerInfo ledgerInfo = ml.getLedgersInfo().get(posToRead.getLedgerId());
if (ledgerInfo.getSize() == 0 || ledgerInfo.getEntries() == 0) {
posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE));
continue;
}
// Calculate entries by average of ledgers.
long avg = Math.max(1, ledgerInfo.getSize() / ledgerInfo.getEntries()) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
long remainEntriesOfLedger = ledgerInfo.getEntries() - posToRead.getEntryId();
if (remainEntriesOfLedger * avg >= remainingBytesSize) {
result += remainingBytesSize / avg;
break;
} else {
// Calculate for the next ledger.
result += remainEntriesOfLedger;
remainingBytesSize -= remainEntriesOfLedger * avg;
posToRead = ml.getNextValidPosition(PositionFactory.create(posToRead.getLedgerId(), Long.MAX_VALUE));
}
}

return Math.min(maxEntriesBasedOnSize, maxEntries);
return Math.max(result, 1);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,8 @@ public class ManagedLedgerImpl implements ManagedLedger, CreateCallback {
private final CallbackMutex offloadMutex = new CallbackMutex();
public static final CompletableFuture<Position> NULL_OFFLOAD_PROMISE = CompletableFuture
.completedFuture(PositionFactory.LATEST);
@VisibleForTesting
@Getter
protected volatile LedgerHandle currentLedger;
protected volatile long currentLedgerEntries = 0;
protected volatile long currentLedgerSize = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -303,7 +303,7 @@ void asyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Position
doAsyncReadEntriesByPosition(lh, firstPosition, lastPosition, numberOfEntries, shouldCacheEntry,
originalCallback, ctx);
} else {
long estimatedEntrySize = getEstimatedEntrySize();
long estimatedEntrySize = getEstimatedEntrySize(lh);
long estimatedReadSize = numberOfEntries * estimatedEntrySize;
if (log.isDebugEnabled()) {
log.debug("Estimated read size: {} bytes for {} entries with {} estimated entry size",
Expand Down Expand Up @@ -419,12 +419,12 @@ void doAsyncReadEntriesByPosition(ReadHandle lh, Position firstPosition, Positio
}

@VisibleForTesting
public long getEstimatedEntrySize() {
long estimatedEntrySize = getAvgEntrySize();
if (estimatedEntrySize == 0) {
estimatedEntrySize = DEFAULT_ESTIMATED_ENTRY_SIZE;
public long getEstimatedEntrySize(ReadHandle lh) {
if (lh.getLength() == 0 || lh.getLastAddConfirmed() < 0) {
// No entries stored.
return Math.max(getAvgEntrySize(), DEFAULT_ESTIMATED_ENTRY_SIZE) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
}
return estimatedEntrySize + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
return Math.max(1, lh.getLength() / (lh.getLastAddConfirmed() + 1)) + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
}

private long getAvgEntrySize() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -141,10 +141,9 @@ public void testPreciseLimitation(String missingCase) throws Exception {
SimpleReadEntriesCallback cb0 = new SimpleReadEntriesCallback();
entryCache.asyncReadEntry(spyCurrentLedger, 125, 125, true, cb0, ctx);
cb0.entries.join();
Long sizePerEntry1 = entryCache.getEstimatedEntrySize();
Assert.assertEquals(sizePerEntry1, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
int sizePerEntry = Long.valueOf(entryCache.getEstimatedEntrySize(ml.currentLedger)).intValue();
Awaitility.await().untilAsserted(() -> {
long remainingBytes =limiter.getRemainingBytes();
long remainingBytes = limiter.getRemainingBytes();
Assert.assertEquals(remainingBytes, totalCapacity);
});
log.info("remainingBytes 0: {}", limiter.getRemainingBytes());
Expand All @@ -165,7 +164,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
entryCache.asyncReadEntry(spyCurrentLedger, start2, end2, true, cb2, ctx);
}).start();

long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, 1);
long bytesAcquired1 = calculateBytesSizeBeforeFirstReading(readCount1 + readCount2, sizePerEntry);
long remainingBytesExpected1 = totalCapacity - bytesAcquired1;
log.info("acquired : {}", bytesAcquired1);
log.info("remainingBytesExpected 0 : {}", remainingBytesExpected1);
Expand All @@ -178,9 +177,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
Thread.sleep(3000);
readCompleteSignal1.countDown();
cb1.entries.join();
Long sizePerEntry2 = entryCache.getEstimatedEntrySize();
Assert.assertEquals(sizePerEntry2, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, 1);
long bytesAcquired2 = calculateBytesSizeBeforeFirstReading(readCount2, sizePerEntry);
long remainingBytesExpected2 = totalCapacity - bytesAcquired2;
log.info("acquired : {}", bytesAcquired2);
log.info("remainingBytesExpected 1: {}", remainingBytesExpected2);
Expand All @@ -191,8 +188,6 @@ public void testPreciseLimitation(String missingCase) throws Exception {

readCompleteSignal2.countDown();
cb2.entries.join();
Long sizePerEntry3 = entryCache.getEstimatedEntrySize();
Assert.assertEquals(sizePerEntry3, 1 + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
Awaitility.await().untilAsserted(() -> {
long remainingBytes = limiter.getRemainingBytes();
log.info("remainingBytes 2: {}", remainingBytes);
Expand All @@ -204,7 +199,7 @@ public void testPreciseLimitation(String missingCase) throws Exception {
}

private long calculateBytesSizeBeforeFirstReading(int entriesCount, int perEntrySize) {
return entriesCount * (perEntrySize + RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
return entriesCount * perEntrySize;
}

class SimpleReadEntriesCallback implements AsyncCallbacks.ReadEntriesCallback {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.bookkeeper.mledger.impl;

import static org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl.BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doAnswer;
Expand Down Expand Up @@ -686,13 +687,15 @@ void testAsyncReadWithMaxSizeByte() throws Exception {
ManagedCursor cursor = ledger.openCursor("c1");

for (int i = 0; i < 100; i++) {
ledger.addEntry(new byte[1024]);
ledger.addEntry(new byte[(int) (1024)]);
}

// First time, since we don't have info, we'll get 1 single entry
readAndCheck(cursor, 10, 3 * 1024, 1);
// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
// will get more messages than before (previously it received only 1 message on the first delivery).
int avg = (int) (BOOKKEEPER_READ_OVERHEAD_PER_ENTRY + 1024);
readAndCheck(cursor, 10, 3 * avg, 3);
// We should only return 3 entries, based on the max size
readAndCheck(cursor, 20, 3 * 1024, 3);
readAndCheck(cursor, 20, 3 * avg, 3);
// If maxSize is < avg, we should get 1 entry
readAndCheck(cursor, 10, 500, 1);
}
Expand Down Expand Up @@ -3914,13 +3917,15 @@ public void testReadEntriesOrWaitWithMaxSize() throws Exception {
ledger.addEntry(new byte[1024]);
}

// First time, since we don't have info, we'll get 1 single entry
List<Entry> entries = c.readEntriesOrWait(10, 3 * 1024);
assertEquals(entries.size(), 1);
// Since https://github.com/apache/pulsar/pull/23931 improved the performance of delivery, the consumer
// will get more messages than before (previously it received only 1 message on the first delivery).
int avg = (int) (1024 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
List<Entry> entries = c.readEntriesOrWait(10, 3 * avg);
assertEquals(entries.size(), 3);
entries.forEach(Entry::release);

// We should only return 3 entries, based on the max size
entries = c.readEntriesOrWait(10, 3 * 1024);
entries = c.readEntriesOrWait(10, 3 * avg);
assertEquals(entries.size(), 3);
entries.forEach(Entry::release);

Expand Down Expand Up @@ -5164,6 +5169,83 @@ public void findEntryFailed(ManagedLedgerException exception, Optional<Position>
assertEquals(positionRef4.get(), position4);
}

/**
 * Verifies {@code ManagedCursorImpl.estimateEntryCountBySize} against a managed ledger that holds three
 * batches of 100 entries with 1-, 2- and 4-byte payloads, where the current ledger is closed after each of
 * the first two batches. The expected per-entry cost of every ledger is its average payload size plus
 * {@code BOOKKEEPER_READ_OVERHEAD_PER_ENTRY}.
 */
@Test
public void testEstimateEntryCountBySize() throws Exception {
    final String mlName = "ml-" + UUID.randomUUID().toString().replaceAll("-", "");
    ManagedLedgerImpl ml = (ManagedLedgerImpl) factory.open(mlName);

    // Nothing has been written yet: the estimate is expected to be a single entry.
    assertEquals(
            ManagedCursorImpl.estimateEntryCountBySize(16,
                    PositionFactory.create(ml.getCurrentLedger().getId(), 0), ml),
            1);

    // Keep a cursor open so that the ledgers written below are not trimmed.
    ml.openCursor("c1");

    // Write 100 entries per payload size, closing the current ledger after each batch except the last.
    final byte[][] payloads = {{1}, {1, 2}, {1, 2, 3, 4}};
    final long[] ledgerIds = new long[payloads.length];
    for (int batch = 0; batch < payloads.length; batch++) {
        for (int i = 0; i < 100; i++) {
            ml.addEntry(payloads[batch].clone());
        }
        ledgerIds[batch] = ml.getCurrentLedger().getId();
        if (batch < payloads.length - 1) {
            ml.getCurrentLedger().close();
            ml.ledgerClosed(ml.getCurrentLedger());
        }
    }
    final long firstLedger = ledgerIds[0];
    final long secondLedger = ledgerIds[1];
    final long thirdLedger = ledgerIds[2];

    // Per-entry cost of each ledger: average payload size plus the fixed read overhead.
    MLDataFormats.ManagedLedgerInfo.LedgerInfo firstInfo = ml.getLedgersInfo().get(firstLedger);
    MLDataFormats.ManagedLedgerInfo.LedgerInfo secondInfo = ml.getLedgersInfo().get(secondLedger);
    final long cost1 = firstInfo.getSize() / firstInfo.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
    final long cost2 = secondInfo.getSize() / secondInfo.getEntries() + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
    final long cost3 = ml.getCurrentLedgerSize() / ml.getCurrentLedgerEntries()
            + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY;
    assertEquals(cost1, 1 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
    assertEquals(cost2, 2 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);
    assertEquals(cost3, 4 + BOOKKEEPER_READ_OVERHEAD_PER_ENTRY);

    // A budget that fits inside one ledger, reading from that ledger's first entry.
    assertEquals(
            ManagedCursorImpl.estimateEntryCountBySize(cost1 * 16, PositionFactory.create(firstLedger, 0), ml),
            16);
    assertEquals(
            ManagedCursorImpl.estimateEntryCountBySize(cost2 * 8, PositionFactory.create(secondLedger, 0), ml),
            8);
    assertEquals(
            ManagedCursorImpl.estimateEntryCountBySize(cost3 * 4, PositionFactory.create(thirdLedger, 0), ml),
            4);

    // A budget that spans ledgers, reading from the first entry of the starting ledger.
    long budget = (cost1 * 100) + (cost2 * 8);
    assertEquals(
            ManagedCursorImpl.estimateEntryCountBySize(budget, PositionFactory.create(firstLedger, 0), ml),
            108);
    budget = (cost2 * 100) + (cost3 * 4);
    assertEquals(
            ManagedCursorImpl.estimateEntryCountBySize(budget, PositionFactory.create(secondLedger, 0), ml),
            104);
    budget = (cost1 * 100) + (cost2 * 100) + (cost3 * 4);
    assertEquals(
            ManagedCursorImpl.estimateEntryCountBySize(budget, PositionFactory.create(firstLedger, 0), ml),
            204);

    // A budget that spans ledgers, reading from entry 80 of the starting ledger (20 entries remain there).
    budget = (cost1 * 20) + (cost2 * 8);
    assertEquals(
            ManagedCursorImpl.estimateEntryCountBySize(budget, PositionFactory.create(firstLedger, 80), ml),
            28);
    budget = (cost2 * 20) + (cost3 * 4);
    assertEquals(
            ManagedCursorImpl.estimateEntryCountBySize(budget, PositionFactory.create(secondLedger, 80), ml),
            24);
    budget = (cost1 * 20) + (cost2 * 100) + (cost3 * 4);
    assertEquals(
            ManagedCursorImpl.estimateEntryCountBySize(budget, PositionFactory.create(firstLedger, 80), ml),
            124);

    // A budget larger than everything written so far: the estimate keeps counting at the current ledger's
    // per-entry cost.
    budget = (cost1 * 100) + (cost2 * 100) + (cost3 * 100) + (cost3 * 4);
    assertEquals(
            ManagedCursorImpl.estimateEntryCountBySize(budget, PositionFactory.create(firstLedger, 0), ml),
            304);

    // cleanup.
    ml.delete();
}

@Test
void testForceCursorRecovery() throws Exception {
TestPulsarMockBookKeeper bk = new TestPulsarMockBookKeeper(executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ public void testBatchMessageAck() {
.newConsumer()
.topic(topicName)
.subscriptionName(subscriptionName)
.receiverQueueSize(10)
.receiverQueueSize(50)
.subscriptionType(SubscriptionType.Shared)
.enableBatchIndexAcknowledgment(true)
.negativeAckRedeliveryDelay(100, TimeUnit.MILLISECONDS)
Expand Down Expand Up @@ -114,27 +114,29 @@ public void testBatchMessageAck() {
consumer.acknowledge(receive1);
consumer.acknowledge(receive2);
Awaitility.await().untilAsserted(() -> {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 18);
// Since https://github.com/apache/pulsar/pull/23931 improved how the average entry size is estimated,
// the broker delivers more messages than before, so the expected count changed from 18 to 38.
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 38);
});
Message<byte[]> receive3 = consumer.receive();
Message<byte[]> receive4 = consumer.receive();
consumer.acknowledge(receive3);
consumer.acknowledge(receive4);
Awaitility.await().untilAsserted(() -> {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
});
// Block cmd-flow send until verify finish. see: https://github.com/apache/pulsar/pull/17436.
consumer.pause();
Message<byte[]> receive5 = consumer.receive();
consumer.negativeAcknowledge(receive5);
Awaitility.await().pollInterval(1, TimeUnit.MILLISECONDS).untilAsserted(() -> {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 0);
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 20);
});
// Unblock cmd-flow.
consumer.resume();
consumer.receive();
Awaitility.await().untilAsserted(() -> {
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 16);
assertEquals(dispatcher.getConsumers().get(0).getUnackedMessages(), 36);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -445,8 +445,13 @@ public void testAvgMessagesPerEntry() throws Exception {
.batchingMaxPublishDelay(5, TimeUnit.SECONDS)
.batchingMaxBytes(Integer.MAX_VALUE)
.create();

producer.send("first-message");
// First delivery: 20 messages, published one by one with send().
// The average "messages per batch" is 1.
for (int i = 0; i < 20; i++) {
producer.send("first-message");
}
// Second delivery: 20 messages, published with sendAsync().
// The average "messages per batch" becomes Math.round(1 * 0.9 + 20 * 0.1) = Math.round(2.9) = 3.
List<CompletableFuture<MessageId>> futures = new ArrayList<>();
for (int i = 0; i < 20; i++) {
futures.add(producer.sendAsync("message"));
Expand Down Expand Up @@ -480,6 +485,7 @@ public void testAvgMessagesPerEntry() throws Exception {
metadataConsumer.put("matchValueReschedule", "producer2");
@Cleanup
Consumer<String> consumer = pulsarClient.newConsumer(Schema.STRING).topic(topic).properties(metadataConsumer)
.receiverQueueSize(20)
.subscriptionName(subName).subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

int counter = 0;
Expand All @@ -494,14 +500,17 @@ public void testAvgMessagesPerEntry() throws Exception {
}
}

assertEquals(21, counter);
assertEquals(40, counter);

ConsumerStats consumerStats =
admin.topics().getStats(topic).getSubscriptions().get(subName).getConsumers().get(0);

assertEquals(21, consumerStats.getMsgOutCounter());
assertEquals(40, consumerStats.getMsgOutCounter());

// Math.round(1 * 0.9 + 0.1 * (20 / 1))
// First delivery: 20 messages; the average "messages per batch" is 1.
// Second delivery: 20 messages; the average "messages per batch" becomes
// Math.round(1 * 0.9 + 20 * 0.1) = Math.round(2.9) = 3.
int avgMessagesPerEntry = consumerStats.getAvgMessagesPerEntry();
assertEquals(3, avgMessagesPerEntry);
}
Expand Down
Loading