Skip to content

Commit 1b8c35b

Browse files
knizhnikMMeent
authored andcommitted
Implement efficient prefetch for parallel bitmap heap scan (#258)
* Implement efficient prefetch for parallel bitmap heap scan * Change MAX_IO_CONCURRENCY to be power of 2
1 parent 5476da2 commit 1b8c35b

File tree

3 files changed

+80
-134
lines changed

3 files changed

+80
-134
lines changed

src/backend/executor/nodeBitmapHeapscan.c

+64-130
Original file line numberDiff line numberDiff line change
@@ -150,6 +150,8 @@ BitmapHeapNext(BitmapHeapScanState *node)
150150
*/
151151
pstate->tbmiterator = tbm_prepare_shared_iterate(tbm);
152152
#ifdef USE_PREFETCH
153+
node->n_prefetch_requests = 0;
154+
node->prefetch_request_pos = 0;
153155
if (node->prefetch_maximum > 0)
154156
{
155157
pstate->prefetch_iterator =
@@ -173,13 +175,6 @@ BitmapHeapNext(BitmapHeapScanState *node)
173175
tbm_attach_shared_iterate(dsa, pstate->tbmiterator);
174176
node->tbmres = tbmres = NULL;
175177

176-
#ifdef USE_PREFETCH
177-
if (node->prefetch_maximum > 0)
178-
{
179-
node->shared_prefetch_iterator =
180-
tbm_attach_shared_iterate(dsa, pstate->prefetch_iterator);
181-
}
182-
#endif /* USE_PREFETCH */
183178
}
184179
node->initialized = true;
185180
}
@@ -198,15 +193,24 @@ BitmapHeapNext(BitmapHeapScanState *node)
198193
if (!pstate)
199194
node->tbmres = tbmres = tbm_iterate(tbmiterator);
200195
else
201-
node->tbmres = tbmres = tbm_shared_iterate(shared_tbmiterator);
196+
{
197+
if (node->n_prefetch_requests != 0)
198+
{
199+
node->tbmres = tbmres = (TBMIterateResult *)&node->prefetch_requests[node->prefetch_request_pos];
200+
node->n_prefetch_requests -= 1;
201+
node->prefetch_request_pos = (node->prefetch_request_pos + 1) % MAX_IO_CONCURRENCY;
202+
if (node->prefetch_pages != 0)
203+
node->prefetch_pages -= 1;
204+
}
205+
else
206+
node->tbmres = tbmres = tbm_shared_iterate(shared_tbmiterator);
207+
}
202208
if (tbmres == NULL)
203209
{
204210
/* no more entries in the bitmap */
205211
break;
206212
}
207213

208-
BitmapAdjustPrefetchIterator(node, tbmres);
209-
210214
/*
211215
* We can skip fetching the heap page if we don't need any fields
212216
* from the heap, and the bitmap entries don't need rechecking,
@@ -361,54 +365,21 @@ BitmapAdjustPrefetchIterator(BitmapHeapScanState *node,
361365
TBMIterateResult *tbmres)
362366
{
363367
#ifdef USE_PREFETCH
364-
ParallelBitmapHeapState *pstate = node->pstate;
368+
TBMIterator *prefetch_iterator = node->prefetch_iterator;
369+
Assert(node->pstate == NULL);
365370

366-
if (pstate == NULL)
371+
if (node->prefetch_pages > 0)
367372
{
368-
TBMIterator *prefetch_iterator = node->prefetch_iterator;
369-
370-
if (node->prefetch_pages > 0)
371-
{
372-
/* The main iterator has closed the distance by one page */
373-
node->prefetch_pages--;
374-
}
375-
else if (prefetch_iterator)
376-
{
377-
/* Do not let the prefetch iterator get behind the main one */
378-
TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
379-
380-
if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
381-
elog(ERROR, "prefetch and main iterators are out of sync");
382-
}
383-
return;
373+
/* The main iterator has closed the distance by one page */
374+
node->prefetch_pages--;
384375
}
385-
386-
if (node->prefetch_maximum > 0)
376+
else if (prefetch_iterator)
387377
{
388-
TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
378+
/* Do not let the prefetch iterator get behind the main one */
379+
TBMIterateResult *tbmpre = tbm_iterate(prefetch_iterator);
389380

390-
SpinLockAcquire(&pstate->mutex);
391-
if (pstate->prefetch_pages > 0)
392-
{
393-
pstate->prefetch_pages--;
394-
SpinLockRelease(&pstate->mutex);
395-
}
396-
else
397-
{
398-
/* Release the mutex before iterating */
399-
SpinLockRelease(&pstate->mutex);
400-
401-
/*
402-
* In case of shared mode, we can not ensure that the current
403-
* blockno of the main iterator and that of the prefetch iterator
404-
* are same. It's possible that whatever blockno we are
405-
* prefetching will be processed by another process. Therefore,
406-
* we don't validate the blockno here as we do in non-parallel
407-
* case.
408-
*/
409-
if (prefetch_iterator)
410-
tbm_shared_iterate(prefetch_iterator);
411-
}
381+
if (tbmpre == NULL || tbmpre->blockno != tbmres->blockno)
382+
elog(ERROR, "prefetch and main iterators are out of sync");
412383
}
413384
#endif /* USE_PREFETCH */
414385
}
@@ -425,35 +396,14 @@ static inline void
425396
BitmapAdjustPrefetchTarget(BitmapHeapScanState *node)
426397
{
427398
#ifdef USE_PREFETCH
428-
ParallelBitmapHeapState *pstate = node->pstate;
429-
430-
if (pstate == NULL)
431-
{
432-
if (node->prefetch_target >= node->prefetch_maximum)
433-
/* don't increase any further */ ;
434-
else if (node->prefetch_target >= node->prefetch_maximum / 2)
435-
node->prefetch_target = node->prefetch_maximum;
436-
else if (node->prefetch_target > 0)
437-
node->prefetch_target *= 2;
438-
else
439-
node->prefetch_target++;
440-
return;
441-
}
442-
443-
/* Do an unlocked check first to save spinlock acquisitions. */
444-
if (pstate->prefetch_target < node->prefetch_maximum)
445-
{
446-
SpinLockAcquire(&pstate->mutex);
447-
if (pstate->prefetch_target >= node->prefetch_maximum)
448-
/* don't increase any further */ ;
449-
else if (pstate->prefetch_target >= node->prefetch_maximum / 2)
450-
pstate->prefetch_target = node->prefetch_maximum;
451-
else if (pstate->prefetch_target > 0)
452-
pstate->prefetch_target *= 2;
453-
else
454-
pstate->prefetch_target++;
455-
SpinLockRelease(&pstate->mutex);
456-
}
399+
if (node->prefetch_target >= node->prefetch_maximum)
400+
/* don't increase any further */ ;
401+
else if (node->prefetch_target >= node->prefetch_maximum / 2)
402+
node->prefetch_target = node->prefetch_maximum;
403+
else if (node->prefetch_target > 0)
404+
node->prefetch_target *= 2;
405+
else
406+
node->prefetch_target++;
457407
#endif /* USE_PREFETCH */
458408
}
459409

@@ -507,56 +457,46 @@ BitmapPrefetch(BitmapHeapScanState *node, TableScanDesc scan)
507457
PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
508458
}
509459
}
510-
511-
return;
512460
}
513-
514-
if (pstate->prefetch_pages < pstate->prefetch_target)
461+
else
515462
{
516-
TBMSharedIterator *prefetch_iterator = node->shared_prefetch_iterator;
517-
518-
if (prefetch_iterator)
463+
while (1)
519464
{
520-
while (1)
521-
{
522-
TBMIterateResult *tbmpre;
523-
bool do_prefetch = false;
524-
bool skip_fetch;
465+
TBMIterateResult *tbmpre;
466+
bool do_prefetch = false;
467+
bool skip_fetch;
525468

526-
/*
527-
* Recheck under the mutex. If some other process has already
528-
* done enough prefetching then we need not to do anything.
529-
*/
530-
SpinLockAcquire(&pstate->mutex);
531-
if (pstate->prefetch_pages < pstate->prefetch_target)
532-
{
533-
pstate->prefetch_pages++;
534-
do_prefetch = true;
535-
}
536-
SpinLockRelease(&pstate->mutex);
469+
if (node->prefetch_pages < node->prefetch_target)
470+
{
471+
Assert(node->n_prefetch_requests < MAX_IO_CONCURRENCY);
472+
node->prefetch_pages++;
473+
do_prefetch = true;
474+
}
537475

538-
if (!do_prefetch)
539-
return;
476+
if (!do_prefetch)
477+
return;
540478

541-
tbmpre = tbm_shared_iterate(prefetch_iterator);
542-
if (tbmpre == NULL)
543-
{
544-
/* No more pages to prefetch */
545-
tbm_end_shared_iterate(prefetch_iterator);
546-
node->shared_prefetch_iterator = NULL;
547-
break;
548-
}
479+
tbmpre = tbm_shared_iterate(node->shared_tbmiterator);
480+
if (tbmpre != NULL)
481+
{
482+
memcpy(&node->prefetch_requests[(node->prefetch_request_pos + node->n_prefetch_requests) % MAX_IO_CONCURRENCY], tbmpre, sizeof(TBMIteratePrefetchResult));
483+
node->n_prefetch_requests += 1;
484+
}
485+
else
486+
{
487+
/* No more pages to prefetch */
488+
break;
489+
}
549490

550-
/* As above, skip prefetch if we expect not to need page */
551-
skip_fetch = (node->can_skip_fetch &&
552-
(node->tbmres ? !node->tbmres->recheck : false) &&
553-
VM_ALL_VISIBLE(node->ss.ss_currentRelation,
554-
tbmpre->blockno,
555-
&node->pvmbuffer));
491+
/* As above, skip prefetch if we expect not to need page */
492+
skip_fetch = (node->can_skip_fetch &&
493+
(node->tbmres ? !node->tbmres->recheck : false) &&
494+
VM_ALL_VISIBLE(node->ss.ss_currentRelation,
495+
tbmpre->blockno,
496+
&node->pvmbuffer));
556497

557-
if (!skip_fetch)
558-
PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
559-
}
498+
if (!skip_fetch)
499+
PrefetchBuffer(scan->rs_rd, MAIN_FORKNUM, tbmpre->blockno);
560500
}
561501
}
562502
#endif /* USE_PREFETCH */
@@ -613,8 +553,6 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
613553
tbm_end_iterate(node->prefetch_iterator);
614554
if (node->shared_tbmiterator)
615555
tbm_end_shared_iterate(node->shared_tbmiterator);
616-
if (node->shared_prefetch_iterator)
617-
tbm_end_shared_iterate(node->shared_prefetch_iterator);
618556
if (node->tbm)
619557
tbm_free(node->tbm);
620558
if (node->vmbuffer != InvalidBuffer)
@@ -627,7 +565,6 @@ ExecReScanBitmapHeapScan(BitmapHeapScanState *node)
627565
node->prefetch_iterator = NULL;
628566
node->initialized = false;
629567
node->shared_tbmiterator = NULL;
630-
node->shared_prefetch_iterator = NULL;
631568
node->vmbuffer = InvalidBuffer;
632569
node->pvmbuffer = InvalidBuffer;
633570

@@ -683,8 +620,6 @@ ExecEndBitmapHeapScan(BitmapHeapScanState *node)
683620
tbm_free(node->tbm);
684621
if (node->shared_tbmiterator)
685622
tbm_end_shared_iterate(node->shared_tbmiterator);
686-
if (node->shared_prefetch_iterator)
687-
tbm_end_shared_iterate(node->shared_prefetch_iterator);
688623
if (node->vmbuffer != InvalidBuffer)
689624
ReleaseBuffer(node->vmbuffer);
690625
if (node->pvmbuffer != InvalidBuffer)
@@ -739,7 +674,6 @@ ExecInitBitmapHeapScan(BitmapHeapScan *node, EState *estate, int eflags)
739674
scanstate->pscan_len = 0;
740675
scanstate->initialized = false;
741676
scanstate->shared_tbmiterator = NULL;
742-
scanstate->shared_prefetch_iterator = NULL;
743677
scanstate->pstate = NULL;
744678

745679
/*

src/include/nodes/execnodes.h

+14-2
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
#include "nodes/plannodes.h"
2424
#include "nodes/tidbitmap.h"
2525
#include "partitioning/partdefs.h"
26+
#include "storage/bufmgr.h"
2627
#include "storage/condition_variable.h"
2728
#include "utils/hsearch.h"
2829
#include "utils/queryenvironment.h"
@@ -1624,6 +1625,15 @@ typedef struct ParallelBitmapHeapState
16241625
char phs_snapshot_data[FLEXIBLE_ARRAY_MEMBER];
16251626
} ParallelBitmapHeapState;
16261627

1628+
typedef struct TBMIteratePrefetchResult
1629+
{
1630+
BlockNumber blockno; /* page number containing tuples */
1631+
int ntuples; /* -1 indicates lossy result */
1632+
bool recheck; /* should the tuples be rechecked? */
1633+
/* Note: recheck is always true if ntuples < 0 */
1634+
OffsetNumber offsets[MaxHeapTuplesPerPage];
1635+
} TBMIteratePrefetchResult;
1636+
16271637
/* ----------------
16281638
* BitmapHeapScanState information
16291639
*
@@ -1644,7 +1654,6 @@ typedef struct ParallelBitmapHeapState
16441654
* pscan_len size of the shared memory for parallel bitmap
16451655
* initialized is node is ready to iterate
16461656
* shared_tbmiterator shared iterator
1647-
* shared_prefetch_iterator shared iterator for prefetching
16481657
* pstate shared state for parallel bitmap scan
16491658
* ----------------
16501659
*/
@@ -1668,7 +1677,10 @@ typedef struct BitmapHeapScanState
16681677
Size pscan_len;
16691678
bool initialized;
16701679
TBMSharedIterator *shared_tbmiterator;
1671-
TBMSharedIterator *shared_prefetch_iterator;
1680+
/* parallel worker private ring buffer with prefetch requests: it allows to access prefetch result from the same worker */
1681+
TBMIteratePrefetchResult prefetch_requests[MAX_IO_CONCURRENCY];
1682+
int n_prefetch_requests; /* number of used elements in prefetch_requests ring buffer */
1683+
int prefetch_request_pos; /* head position in ring buffer */
16721684
ParallelBitmapHeapState *pstate;
16731685
} BitmapHeapScanState;
16741686

src/include/storage/bufmgr.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -86,8 +86,8 @@ extern PGDLLIMPORT int NLocBuffer;
8686
extern PGDLLIMPORT Block *LocalBufferBlockPointers;
8787
extern PGDLLIMPORT int32 *LocalRefCount;
8888

89-
/* upper limit for effective_io_concurrency */
90-
#define MAX_IO_CONCURRENCY 1000
89+
/* upper limit for effective_io_concurrency (better to he power of 2) */
90+
#define MAX_IO_CONCURRENCY 1024
9191

9292
/* special block number for ReadBuffer() */
9393
#define P_NEW InvalidBlockNumber /* grow the file to get a new page */

0 commit comments

Comments
 (0)