Skip to content

Commit a3c94fd

Browse files
MMeenttristan957
authored andcommitted
[PG14] Feature/replicas (#278)
* Recovery requirements: Add condition variable for WAL recovery; allowing backends to wait for recovery up to some record pointer. * Fix issues w.r.t. WAL when LwLsn is initiated and when recovery starts. This fixes some test failures that showed up after updating Neon code to do more precise handling of replica's get_page_at_lsn's request_lsn lsns. --------- Co-authored-by: Matthias van de Meent <[email protected]>
1 parent 48c2277 commit a3c94fd

File tree

3 files changed

+80
-5
lines changed

3 files changed

+80
-5
lines changed

src/backend/access/transam/xlog.c

+75-5
Original file line numberDiff line numberDiff line change
@@ -754,6 +754,7 @@ typedef struct XLogCtlData
754754
TimeLineID lastReplayedTLI;
755755
XLogRecPtr replayEndRecPtr;
756756
TimeLineID replayEndTLI;
757+
ConditionVariable replayProgressCV;
757758
/* timestamp of last COMMIT/ABORT record replayed (or being replayed) */
758759
TimestampTz recoveryLastXTime;
759760

@@ -5340,9 +5341,67 @@ XLOGShmemInit(void)
53405341
SpinLockInit(&XLogCtl->info_lck);
53415342
SpinLockInit(&XLogCtl->ulsn_lck);
53425343
InitSharedLatch(&XLogCtl->recoveryWakeupLatch);
5344+
ConditionVariableInit(&XLogCtl->replayProgressCV);
53435345
ConditionVariableInit(&XLogCtl->recoveryNotPausedCV);
53445346
}
53455347

5348+
/*
5349+
* Wait for recovery to complete replaying all WAL up to and including
5350+
* redoEndRecPtr.
5351+
*
5352+
* This gets woken up for every WAL record replayed, so make sure you're not
5353+
* trying to wait an LSN that is too far in the future.
5354+
*/
5355+
void
5356+
XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr)
5357+
{
5358+
static XLogRecPtr replayRecPtr = 0;
5359+
5360+
if (!RecoveryInProgress())
5361+
return;
5362+
5363+
/*
5364+
* Check the backend-local variable first, we may be able to skip accessing
5365+
* shared memory (which requires locking)
5366+
*/
5367+
if (redoEndRecPtr <= replayRecPtr)
5368+
return;
5369+
5370+
replayRecPtr = GetXLogReplayRecPtr(NULL);
5371+
5372+
/*
5373+
* Check again if we're going to need to wait, now that we've updated
5374+
* the local cached variable.
5375+
*/
5376+
if (redoEndRecPtr <= replayRecPtr)
5377+
return;
5378+
5379+
/*
5380+
* We need to wait for the variable, so prepare for that.
5381+
*
5382+
* Note: This wakes up every time a WAL record is replayed, so this can
5383+
* be expensive.
5384+
*/
5385+
ConditionVariablePrepareToSleep(&XLogCtl->replayProgressCV);
5386+
5387+
while (redoEndRecPtr > replayRecPtr)
5388+
{
5389+
bool timeout;
5390+
timeout = ConditionVariableTimedSleep(&XLogCtl->replayProgressCV,
5391+
10000000,
5392+
WAIT_EVENT_RECOVERY_WAL_STREAM);
5393+
5394+
if (timeout)
5395+
ereport(LOG,
5396+
(errmsg("Waiting for recovery to catch up to %X/%X",
5397+
LSN_FORMAT_ARGS(redoEndRecPtr))));
5398+
else
5399+
replayRecPtr = GetXLogReplayRecPtr(NULL);
5400+
}
5401+
5402+
ConditionVariableCancelSleep();
5403+
}
5404+
53465405
/*
53475406
* This func must be called ONCE on system install. It creates pg_control
53485407
* and the initial XLOG segment.
@@ -7265,6 +7324,14 @@ StartupXLOG(void)
72657324
abortedRecPtr = InvalidXLogRecPtr;
72667325
missingContrecPtr = InvalidXLogRecPtr;
72677326

7327+
/*
7328+
* Setup last written lsn cache, max written LSN.
7329+
* Starting from here, we could be modifying pages through REDO, which requires
7330+
* the existance of maxLwLsn + LwLsn LRU.
7331+
*/
7332+
XLogCtl->maxLastWrittenLsn = RedoRecPtr;
7333+
dlist_init(&XLogCtl->lastWrittenLsnLRU);
7334+
72687335
/* REDO */
72697336
if (InRecovery)
72707337
{
@@ -7772,6 +7839,8 @@ StartupXLOG(void)
77727839
WalSndWakeup();
77737840
}
77747841

7842+
ConditionVariableBroadcast(&XLogCtl->replayProgressCV);
7843+
77757844
/* Exit loop if we reached inclusive recovery target */
77767845
if (recoveryStopsAfter(xlogreader))
77777846
{
@@ -8167,8 +8236,6 @@ StartupXLOG(void)
81678236

81688237
XLogCtl->LogwrtRqst.Write = EndOfLog;
81698238
XLogCtl->LogwrtRqst.Flush = EndOfLog;
8170-
XLogCtl->maxLastWrittenLsn = EndOfLog;
8171-
dlist_init(&XLogCtl->lastWrittenLsnLRU);
81728239

81738240
LocalSetXLogInsertAllowed();
81748241

@@ -10978,11 +11045,14 @@ xlog_redo(XLogReaderState *record)
1097811045
XLogRedoAction result;
1097911046

1098011047
result = XLogReadBufferForRedo(record, block_id, &buffer);
10981-
if (result == BLK_DONE && !IsUnderPostmaster)
11048+
if (result == BLK_DONE && (!IsUnderPostmaster || StandbyMode))
1098211049
{
1098311050
/*
10984-
* In the special WAL process, blocks that are being ignored
10985-
* return BLK_DONE. Accept that.
11051+
* NEON: In the special WAL redo process, blocks that are being
11052+
* ignored return BLK_DONE. Accept that.
11053+
* Additionally, in standby mode, blocks that are not present
11054+
* in shared buffers are ignored during replay, so we also
11055+
* ignore those blocks.
1098611056
*/
1098711057
}
1098811058
else if (result != BLK_RESTORED)

src/include/access/xlog.h

+1
Original file line numberDiff line numberDiff line change
@@ -323,6 +323,7 @@ extern bool HotStandbyActive(void);
323323
extern bool HotStandbyActiveInReplay(void);
324324
extern bool XLogInsertAllowed(void);
325325
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
326+
extern void XLogWaitForReplayOf(XLogRecPtr redoEndRecPtr);
326327
extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *replayTLI);
327328
extern XLogRecPtr GetXLogInsertRecPtr(void);
328329
extern XLogRecPtr GetXLogWriteRecPtr(void);

src/include/access/xlogutils.h

+4
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,10 @@ typedef enum
3333
* need to be replayed) */
3434
} XLogRedoAction;
3535

36+
/*
37+
* Returns true if we shouldn't do REDO on that block in record indicated by
38+
* block_id; false otherwise.
39+
*/
3640
extern bool (*redo_read_buffer_filter) (XLogReaderState *record, uint8 block_id);
3741

3842
extern XLogRedoAction XLogReadBufferForRedo(XLogReaderState *record,

0 commit comments

Comments
 (0)