Skip to content

Commit

Permalink
[improve][broker] Decouple pulsar_storage_backlog_age_seconds metric …
Browse files Browse the repository at this point in the history
…from backlogQuota policy
  • Loading branch information
shibd committed Nov 21, 2024
1 parent 49aa308 commit 5bbca7c
Show file tree
Hide file tree
Showing 4 changed files with 217 additions and 120 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2245,29 +2245,31 @@ public BacklogQuotaManager getBacklogQuotaManager() {
public void monitorBacklogQuota() {
long startTimeMillis = System.currentTimeMillis();
forEachPersistentTopic(topic -> {
if (topic.isSizeBacklogExceeded()) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.destination_storage, false);
} else {
topic.checkTimeBacklogExceeded().thenAccept(isExceeded -> {
if (isExceeded) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.message_age,
pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
} else {
if (log.isDebugEnabled()) {
log.debug("quota not exceeded for [{}]", topic.getName());
topic.updateOldPositionInfo().thenAccept(__ -> {
if (topic.isSizeBacklogExceeded()) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.destination_storage, false);
} else {
topic.checkTimeBacklogExceeded(false).thenAccept(isExceeded -> {
if (isExceeded) {
getBacklogQuotaManager().handleExceededBacklogQuota(topic,
BacklogQuota.BacklogQuotaType.message_age,
pulsar.getConfiguration().isPreciseTimeBasedBacklogQuotaCheck());
} else {
if (log.isDebugEnabled()) {
log.debug("quota not exceeded for [{}]", topic.getName());
}
}
}
}).exceptionally(throwable -> {
log.error("Error when checkTimeBacklogExceeded({}) in monitorBacklogQuota",
});
}
}).whenComplete((unused, throwable) -> {
if (throwable != null) {
log.error("Error when checkBacklogQuota({}) in monitorBacklogQuota",
topic.getName(), throwable);
return null;
}).whenComplete((unused, throwable) -> {
backlogQuotaCheckDuration.observe(
MILLISECONDS.toSeconds(System.currentTimeMillis() - startTimeMillis));
});
}
}
backlogQuotaCheckDuration.observe(
MILLISECONDS.toSeconds(System.currentTimeMillis() - startTimeMillis));
});
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -295,14 +295,14 @@ protected TopicStatsHelper initialValue() {
PERSISTENT_TOPIC_ATTRIBUTES_FIELD_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
PersistentTopic.class, PersistentTopicAttributes.class, "persistentTopicAttributes");

private volatile TimeBasedBacklogQuotaCheckResult timeBasedBacklogQuotaCheckResult;
private static final AtomicReferenceFieldUpdater<PersistentTopic, TimeBasedBacklogQuotaCheckResult>
private volatile OldestPositionInfo oldestPositionInfo;
private static final AtomicReferenceFieldUpdater<PersistentTopic, OldestPositionInfo>
TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER = AtomicReferenceFieldUpdater.newUpdater(
PersistentTopic.class,
TimeBasedBacklogQuotaCheckResult.class,
"timeBasedBacklogQuotaCheckResult");
OldestPositionInfo.class,
"oldestPositionInfo");
@Value
private static class TimeBasedBacklogQuotaCheckResult {
private static class OldestPositionInfo {
Position oldestCursorMarkDeletePosition;
String cursorName;
long positionPublishTimestampInMillis;
Expand Down Expand Up @@ -2634,12 +2634,11 @@ public CompletableFuture<? extends TopicStatsImpl> asyncGetStats(GetStatsOptions
stats.backlogQuotaLimitSize = getBacklogQuota(BacklogQuotaType.destination_storage).getLimitSize();
stats.backlogQuotaLimitTime = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();

TimeBasedBacklogQuotaCheckResult backlogQuotaCheckResult = timeBasedBacklogQuotaCheckResult;
stats.oldestBacklogMessageAgeSeconds = getBestEffortOldestUnacknowledgedMessageAgeSeconds();
stats.oldestBacklogMessageSubscriptionName = (backlogQuotaCheckResult == null)
stats.oldestBacklogMessageSubscriptionName = (oldestPositionInfo == null)
|| !hasBacklogs(getStatsOptions.isGetPreciseBacklog())
? null
: backlogQuotaCheckResult.getCursorName();
: oldestPositionInfo.getCursorName();

stats.compaction.reset();
mxBean.flatMap(bean -> bean.getCompactionRecordForTopic(topic)).map(compactionRecord -> {
Expand Down Expand Up @@ -3425,7 +3424,7 @@ public CompletableFuture<Void> checkBacklogQuotaExceeded(String producerName, Ba
return FutureUtil.failedFuture(new TopicBacklogQuotaExceededException(retentionPolicy));
}
if (backlogQuotaType == BacklogQuotaType.message_age) {
return checkTimeBacklogExceeded().thenCompose(isExceeded -> {
return checkTimeBacklogExceeded(true).thenCompose(isExceeded -> {
if (isExceeded) {
log.debug("[{}] Time backlog quota exceeded. Cannot create producer [{}]", this.getName(),
producerName);
Expand Down Expand Up @@ -3466,16 +3465,15 @@ public long getBestEffortOldestUnacknowledgedMessageAgeSeconds() {
if (!hasBacklogs(false)) {
return 0;
}
TimeBasedBacklogQuotaCheckResult result = timeBasedBacklogQuotaCheckResult;
if (result == null) {
if (oldestPositionInfo == null) {
return -1;
} else {
return TimeUnit.MILLISECONDS.toSeconds(
Clock.systemUTC().millis() - result.getPositionPublishTimestampInMillis());
Clock.systemUTC().millis() - oldestPositionInfo.getPositionPublishTimestampInMillis());
}
}

private void updateResultIfNewer(TimeBasedBacklogQuotaCheckResult updatedResult) {
private void updateResultIfNewer(OldestPositionInfo updatedResult) {
TIME_BASED_BACKLOG_QUOTA_CHECK_RESULT_UPDATER.updateAndGet(this,
existingResult -> {
if (existingResult == null
Expand All @@ -3489,74 +3487,46 @@ private void updateResultIfNewer(TimeBasedBacklogQuotaCheckResult updatedResult)

}

/**
* @return determine if backlog quota enforcement needs to be done for topic based on time limit
*/
public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
public CompletableFuture<Void> updateOldPositionInfo() {
TopicName topicName = TopicName.get(getName());
int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();
if (log.isDebugEnabled()) {
log.debug("[{}] Time backlog quota = [{}]. Checking if exceeded.", topicName, backlogQuotaLimitInSecond);
}

// If backlog quota by time is not set
if (backlogQuotaLimitInSecond <= 0) {
return CompletableFuture.completedFuture(false);
}

ManagedCursorContainer managedCursorContainer = (ManagedCursorContainer) ledger.getCursors();
CursorInfo oldestMarkDeleteCursorInfo = managedCursorContainer.getCursorWithOldestPosition();

// If we have no durable cursor since `ledger.getCursors()` only managed durable cursors
if (oldestMarkDeleteCursorInfo == null
|| oldestMarkDeleteCursorInfo.getPosition() == null) {
if (oldestMarkDeleteCursorInfo == null || oldestMarkDeleteCursorInfo.getPosition() == null) {
if (log.isDebugEnabled()) {
log.debug("[{}] No durable cursor found. Skipping time based backlog quota check."
+ " Oldest mark-delete cursor info: {}", topicName, oldestMarkDeleteCursorInfo);
log.debug("[{}] No durable cursor found. Skip update old position info.", topicName);
}
return CompletableFuture.completedFuture(false);
return CompletableFuture.completedFuture(null);
}

Position oldestMarkDeletePosition = oldestMarkDeleteCursorInfo.getPosition();

TimeBasedBacklogQuotaCheckResult lastCheckResult = timeBasedBacklogQuotaCheckResult;
if (lastCheckResult != null
&& oldestMarkDeletePosition.compareTo(lastCheckResult.getOldestCursorMarkDeletePosition()) == 0) {

OldestPositionInfo lastOldestPositionInfo = oldestPositionInfo;
if (lastOldestPositionInfo != null
&& oldestMarkDeletePosition.compareTo(lastOldestPositionInfo.getOldestCursorMarkDeletePosition()) == 0) {
// Same position, but the cursor causing it has changed?
if (!lastCheckResult.getCursorName().equals(oldestMarkDeleteCursorInfo.getCursor().getName())) {
final TimeBasedBacklogQuotaCheckResult updatedResult = new TimeBasedBacklogQuotaCheckResult(
lastCheckResult.getOldestCursorMarkDeletePosition(),
if (!lastOldestPositionInfo.getCursorName().equals(oldestMarkDeleteCursorInfo.getCursor().getName())) {
updateResultIfNewer(new OldestPositionInfo(
lastOldestPositionInfo.getOldestCursorMarkDeletePosition(),
oldestMarkDeleteCursorInfo.getCursor().getName(),
lastCheckResult.getPositionPublishTimestampInMillis(),
oldestMarkDeleteCursorInfo.getVersion());

updateResultIfNewer(updatedResult);
lastOldestPositionInfo.getPositionPublishTimestampInMillis(),
oldestMarkDeleteCursorInfo.getVersion()));
if (log.isDebugEnabled()) {
log.debug("[{}] Time-based backlog quota check. Updating cached result for position {}, "
+ "since cursor causing it has changed from {} to {}",
log.debug("[{}] Updating cached old position info {}, "
+ "since cursor causing it has changed from {} to {}",
topicName,
oldestMarkDeletePosition,
lastCheckResult.getCursorName(),
lastOldestPositionInfo.getCursorName(),
oldestMarkDeleteCursorInfo.getCursor().getName());
}
}

long entryTimestamp = lastCheckResult.getPositionPublishTimestampInMillis();
boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
if (log.isDebugEnabled()) {
log.debug("[{}] Time based backlog quota check. Using cache result for position {}. "
+ "Entry timestamp: {}, expired: {}",
topicName, oldestMarkDeletePosition, entryTimestamp, expired);
}
return CompletableFuture.completedFuture(expired);
return CompletableFuture.completedFuture(null);
}

if (brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck()) {
if (!hasBacklogs(true)) {
return CompletableFuture.completedFuture(false);
return CompletableFuture.completedFuture(null);
}
CompletableFuture<Boolean> future = new CompletableFuture<>();
CompletableFuture<Void> future = new CompletableFuture<>();
// Check if first unconsumed message(first message after mark delete position)
// for slowest cursor's has expired.
Position position = ledger.getNextValidPosition(oldestMarkDeletePosition);
Expand All @@ -3566,71 +3536,84 @@ public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
public void readEntryComplete(Entry entry, Object ctx) {
try {
long entryTimestamp = Commands.getEntryTimestamp(entry.getDataBuffer());

updateResultIfNewer(
new TimeBasedBacklogQuotaCheckResult(
oldestMarkDeleteCursorInfo.getPosition(),
oldestMarkDeleteCursorInfo.getCursor().getName(),
entryTimestamp,
oldestMarkDeleteCursorInfo.getVersion()));

boolean expired = MessageImpl.isEntryExpired(backlogQuotaLimitInSecond, entryTimestamp);
if (log.isDebugEnabled()) {
log.debug("[{}] Time based backlog quota check. Oldest unacked entry read from BK. "
+ "Oldest entry in cursor {}'s backlog: {}. "
+ "Oldest mark-delete position: {}. "
+ "Quota {}. Last check result position [{}]. "
+ "Expired: {}, entryTimestamp: {}",
topicName,
oldestMarkDeleteCursorInfo.getCursor().getName(),
position,
oldestMarkDeletePosition,
backlogQuotaLimitInSecond,
lastCheckResult.getOldestCursorMarkDeletePosition(),
expired,
entryTimestamp);
}
future.complete(expired);
new OldestPositionInfo(
oldestMarkDeleteCursorInfo.getPosition(),
oldestMarkDeleteCursorInfo.getCursor().getName(),
entryTimestamp,
oldestMarkDeleteCursorInfo.getVersion()));
future.complete(null);
} catch (Exception e) {
log.error("[{}][{}] Error deserializing message for backlog check", topicName, e);
future.complete(false);
log.error("[{}][{}] Error deserializing message for update old position", topicName, e);
future.completeExceptionally(e);
} finally {
entry.release();
}
}

@Override
public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
log.error("[{}][{}] Error reading entry for precise time based backlog check",
log.error("[{}][{}] Error reading entry for precise update old position",
topicName, exception);
future.complete(false);
future.completeExceptionally(exception);
}
}, null);
return future;
} else {
if (!hasBacklogs(false)) {
return CompletableFuture.completedFuture(null);
}
try {
if (!hasBacklogs(false)) {
return CompletableFuture.completedFuture(false);
}
EstimateTimeBasedBacklogQuotaCheckResult checkResult =
estimatedTimeBasedBacklogQuotaCheck(oldestMarkDeletePosition);
if (checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp() != null) {
updateResultIfNewer(
new TimeBasedBacklogQuotaCheckResult(
oldestMarkDeleteCursorInfo.getPosition(),
oldestMarkDeleteCursorInfo.getCursor().getName(),
checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp(),
oldestMarkDeleteCursorInfo.getVersion()));
new OldestPositionInfo(
oldestMarkDeleteCursorInfo.getPosition(),
oldestMarkDeleteCursorInfo.getCursor().getName(),
checkResult.getEstimatedOldestUnacknowledgedMessageTimestamp(),
oldestMarkDeleteCursorInfo.getVersion()));
}

return CompletableFuture.completedFuture(checkResult.isTruncateBacklogToMatchQuota());
return CompletableFuture.completedFuture(null);
} catch (Exception e) {
log.error("[{}][{}] Error reading entry for precise time based backlog check", topicName, e);
return CompletableFuture.completedFuture(false);
log.error("[{}][{}] Error reading entry for update old position", topicName, e);
return CompletableFuture.failedFuture(e);
}
}
}

/**
 * Determines whether the time-based ({@code message_age}) backlog quota is exceeded for this topic.
 *
 * <p>The decision is taken from the cached {@code oldestPositionInfo}: the publish timestamp stored
 * there is compared against the configured time limit via {@code MessageImpl.isEntryExpired}.
 *
 * @param shouldUpdateOldPositionInfo when {@code true}, {@link #updateOldPositionInfo()} is run first
 *        so the cached info is refreshed before the check; when {@code false}, the check uses
 *        whatever is currently cached
 * @return a future completing with {@code true} if the cached oldest position is expired under the
 *         configured limit; it completes with {@code false} when no time limit is configured
 *         (limit {@code <= 0}), when nothing is cached yet, when the topic has no backlog, or when
 *         the refresh/check fails (the failure is logged, not propagated)
 */
public CompletableFuture<Boolean> checkTimeBacklogExceeded(boolean shouldUpdateOldPositionInfo) {
    TopicName topicName = TopicName.get(getName());
    int backlogQuotaLimitInSecond = getBacklogQuota(BacklogQuotaType.message_age).getLimitTime();

    if (log.isDebugEnabled()) {
        log.debug("[{}] Time backlog quota = [{}]. Checking if exceeded.", topicName, backlogQuotaLimitInSecond);
    }
    CompletableFuture<Void> updateFuture = shouldUpdateOldPositionInfo ? updateOldPositionInfo()
            : CompletableFuture.completedFuture(null);
    return updateFuture.thenApply(__ -> {
        // The refresh above runs even when no time quota is set; only the verdict depends on the limit.
        if (backlogQuotaLimitInSecond <= 0) {
            return false;
        }
        // Snapshot the volatile field once so the null check and the dereference see the same object.
        OldestPositionInfo positionInfo = oldestPositionInfo;
        if (positionInfo == null) {
            return false;
        }
        if (!hasBacklogs(brokerService.pulsar().getConfiguration().isPreciseTimeBasedBacklogQuotaCheck())) {
            return false;
        }
        return MessageImpl.isEntryExpired(backlogQuotaLimitInSecond,
                positionInfo.getPositionPublishTimestampInMillis());
    }).exceptionally(e -> {
        // One placeholder for the topic; the trailing throwable is passed as the exception argument.
        log.error("[{}] Error checking time backlog exceeded", topicName, e);
        return false;
    });
}

private EstimateTimeBasedBacklogQuotaCheckResult estimatedTimeBasedBacklogQuotaCheck(
Position markDeletePosition)
throws ExecutionException, InterruptedException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ public boolean isSizeBacklogExceeded() {
}

@Override
public CompletableFuture<Boolean> checkTimeBacklogExceeded() {
public CompletableFuture<Boolean> checkTimeBacklogExceeded(boolean shouldUpdateOldPositionInfo) {
return CompletableFuture.completedFuture(false);
}

Expand Down
Loading

0 comments on commit 5bbca7c

Please sign in to comment.