Add parallelism support for TID Range Scans
authorDavid Rowley <drowley@postgresql.org>
Thu, 27 Nov 2025 01:05:04 +0000 (14:05 +1300)
committerDavid Rowley <drowley@postgresql.org>
Thu, 27 Nov 2025 01:05:04 +0000 (14:05 +1300)
In v14, bb437f995 added support for scanning for ranges of TIDs using a
dedicated executor node for the purpose.  Here, we allow these scans to
be parallelized.  The range of blocks to scan is divvied up similarly to
how a Parallel Seq Scans does that, where 'chunks' of blocks are
allocated to each worker and the size of those chunks is slowly reduced
down to 1 block per worker by the time we're nearing the end of the
scan.  Doing that means workers finish at roughly the same time.

Allowing TID Range Scans to be parallelized removes the dilemma from the
planner as to whether a Parallel Seq Scan will cost less than a
non-parallel TID Range Scan due to the CPU concurrency of the Seq Scan
(disk costs are not divided by the number of workers).  It was possible
the planner could choose the Parallel Seq Scan which would result in
reading additional blocks during execution than the TID Scan would have.
Allowing Parallel TID Range Scans removes the trade-off the planner
makes when choosing between reduced CPU costs due to parallelism vs
additional I/O from the Parallel Seq Scan due to it scanning blocks from
outside of the required TID range.  There is also, of course, the
traditional parallelism performance benefits to be gained as well, which
likely doesn't need to be explained here.

Author: Cary Huang <cary.huang@highgo.ca>
Author: David Rowley <dgrowleyml@gmail.com>
Reviewed-by: Junwang Zhao <zhjwpku@gmail.com>
Reviewed-by: Rafia Sabih <rafia.pghackers@gmail.com>
Reviewed-by: Steven Niu <niushiji@gmail.com>
Discussion: https://postgr.es/m/18f2c002a24.11bc2ab825151706.3749144144619388582@highgo.ca

15 files changed:
doc/src/sgml/parallel.sgml
src/backend/access/heap/heapam.c
src/backend/access/table/tableam.c
src/backend/executor/execParallel.c
src/backend/executor/nodeTidrangescan.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/path/tidpath.c
src/backend/optimizer/util/pathnode.c
src/include/access/relscan.h
src/include/access/tableam.h
src/include/executor/nodeTidrangescan.h
src/include/nodes/execnodes.h
src/include/optimizer/pathnode.h
src/test/regress/expected/tidrangescan.out
src/test/regress/sql/tidrangescan.sql

index 1ce9abf86f52514b43f772e194f6868432d42df2..af43484703eb04618582b7db9e60460a13357b26 100644 (file)
@@ -299,6 +299,15 @@ EXPLAIN SELECT * FROM pgbench_accounts WHERE filler LIKE '%x%';
         within each worker process.
       </para>
     </listitem>
+    <listitem>
+      <para>
+        In a <emphasis>parallel tid range scan</emphasis>, the range of blocks
+        will be subdivided into smaller ranges which are shared among the
+        cooperating processes.  Each worker process will complete the scanning
+        of its given range of blocks before requesting an additional range of
+        blocks.
+      </para>
+    </listitem>
   </itemizedlist>
 
     Other scan types, such as scans of non-btree indexes, may support
index 4b0c49f4bb0b77cbe4e5ed29164d548eb6e76129..0a820bab87a45f9e85253aebe297976f95a1bed0 100644 (file)
@@ -258,7 +258,9 @@ heap_scan_stream_read_next_parallel(ReadStream *stream,
        /* parallel scan */
        table_block_parallelscan_startblock_init(scan->rs_base.rs_rd,
                                                 scan->rs_parallelworkerdata,
-                                                (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel);
+                                                (ParallelBlockTableScanDesc) scan->rs_base.rs_parallel,
+                                                scan->rs_startblock,
+                                                scan->rs_numblocks);
 
        /* may return InvalidBlockNumber if there are no more blocks */
        scan->rs_prefetch_block = table_block_parallelscan_nextpage(scan->rs_base.rs_rd,
index 5e41404937eb11b5f0dd9c98edfeb006110304e8..1e099febdc8cacf34d60264dbe1c3e60eff895a2 100644 (file)
@@ -188,6 +188,37 @@ table_beginscan_parallel(Relation relation, ParallelTableScanDesc pscan)
                                            pscan, flags);
 }
 
+TableScanDesc
+table_beginscan_parallel_tidrange(Relation relation,
+                                 ParallelTableScanDesc pscan)
+{
+   Snapshot    snapshot;
+   uint32      flags = SO_TYPE_TIDRANGESCAN | SO_ALLOW_PAGEMODE;
+   TableScanDesc sscan;
+
+   Assert(RelFileLocatorEquals(relation->rd_locator, pscan->phs_locator));
+
+   /* disable syncscan in parallel tid range scan. */
+   pscan->phs_syncscan = false;
+
+   if (!pscan->phs_snapshot_any)
+   {
+       /* Snapshot was serialized -- restore it */
+       snapshot = RestoreSnapshot((char *) pscan + pscan->phs_snapshot_off);
+       RegisterSnapshot(snapshot);
+       flags |= SO_TEMP_SNAPSHOT;
+   }
+   else
+   {
+       /* SnapshotAny passed by caller (not serialized) */
+       snapshot = SnapshotAny;
+   }
+
+   sscan = relation->rd_tableam->scan_begin(relation, snapshot, 0, NULL,
+                                            pscan, flags);
+   return sscan;
+}
+
 
 /* ----------------------------------------------------------------------------
  * Index scan related functions.
@@ -398,6 +429,7 @@ table_block_parallelscan_initialize(Relation rel, ParallelTableScanDesc pscan)
        bpscan->phs_nblocks > NBuffers / 4;
    SpinLockInit(&bpscan->phs_mutex);
    bpscan->phs_startblock = InvalidBlockNumber;
+   bpscan->phs_numblock = InvalidBlockNumber;
    pg_atomic_init_u64(&bpscan->phs_nallocated, 0);
 
    return sizeof(ParallelBlockTableScanDescData);
@@ -416,14 +448,22 @@ table_block_parallelscan_reinitialize(Relation rel, ParallelTableScanDesc pscan)
  *
  * Determine where the parallel seq scan should start.  This function may be
  * called many times, once by each parallel worker.  We must be careful only
- * to set the startblock once.
+ * to set the phs_startblock and phs_numblock fields once.
+ *
+ * Callers may optionally specify a non-InvalidBlockNumber value for
+ * 'startblock' to force the scan to start at the given page.  Likewise,
+ * 'numblocks' can be specified as a non-InvalidBlockNumber to limit the
+ * number of blocks to scan to that many blocks.
  */
 void
 table_block_parallelscan_startblock_init(Relation rel,
                                         ParallelBlockTableScanWorker pbscanwork,
-                                        ParallelBlockTableScanDesc pbscan)
+                                        ParallelBlockTableScanDesc pbscan,
+                                        BlockNumber startblock,
+                                        BlockNumber numblocks)
 {
    BlockNumber sync_startpage = InvalidBlockNumber;
+   BlockNumber scan_nblocks;
 
    /* Reset the state we use for controlling allocation size. */
    memset(pbscanwork, 0, sizeof(*pbscanwork));
@@ -431,42 +471,36 @@ table_block_parallelscan_startblock_init(Relation rel,
    StaticAssertStmt(MaxBlockNumber <= 0xFFFFFFFE,
                     "pg_nextpower2_32 may be too small for non-standard BlockNumber width");
 
-   /*
-    * We determine the chunk size based on the size of the relation. First we
-    * split the relation into PARALLEL_SEQSCAN_NCHUNKS chunks but we then
-    * take the next highest power of 2 number of the chunk size.  This means
-    * we split the relation into somewhere between PARALLEL_SEQSCAN_NCHUNKS
-    * and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks.
-    */
-   pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(pbscan->phs_nblocks /
-                                                      PARALLEL_SEQSCAN_NCHUNKS, 1));
-
-   /*
-    * Ensure we don't go over the maximum chunk size with larger tables. This
-    * means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger
-    * tables.  Too large a chunk size has been shown to be detrimental to
-    * synchronous scan performance.
-    */
-   pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
-                                     PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
-
 retry:
    /* Grab the spinlock. */
    SpinLockAcquire(&pbscan->phs_mutex);
 
    /*
-    * If the scan's startblock has not yet been initialized, we must do so
-    * now.  If this is not a synchronized scan, we just start at block 0, but
-    * if it is a synchronized scan, we must get the starting position from
-    * the synchronized scan machinery.  We can't hold the spinlock while
-    * doing that, though, so release the spinlock, get the information we
-    * need, and retry.  If nobody else has initialized the scan in the
-    * meantime, we'll fill in the value we fetched on the second time
-    * through.
+    * When the caller specified a limit on the number of blocks to scan, set
+    * that in the ParallelBlockTableScanDesc, if it's not been done by
+    * another worker already.
+    */
+   if (numblocks != InvalidBlockNumber &&
+       pbscan->phs_numblock == InvalidBlockNumber)
+   {
+       pbscan->phs_numblock = numblocks;
+   }
+
+   /*
+    * If the scan's phs_startblock has not yet been initialized, we must do
+    * so now.  If a startblock was specified, start there, otherwise if this
+    * is not a synchronized scan, we just start at block 0, but if it is a
+    * synchronized scan, we must get the starting position from the
+    * synchronized scan machinery.  We can't hold the spinlock while doing
+    * that, though, so release the spinlock, get the information we need, and
+    * retry.  If nobody else has initialized the scan in the meantime, we'll
+    * fill in the value we fetched on the second time through.
     */
    if (pbscan->phs_startblock == InvalidBlockNumber)
    {
-       if (!pbscan->base.phs_syncscan)
+       if (startblock != InvalidBlockNumber)
+           pbscan->phs_startblock = startblock;
+       else if (!pbscan->base.phs_syncscan)
            pbscan->phs_startblock = 0;
        else if (sync_startpage != InvalidBlockNumber)
            pbscan->phs_startblock = sync_startpage;
@@ -478,6 +512,34 @@ retry:
        }
    }
    SpinLockRelease(&pbscan->phs_mutex);
+
+   /*
+    * Figure out how many blocks we're going to scan; either all of them, or
+    * just phs_numblock's worth, if a limit has been imposed.
+    */
+   if (pbscan->phs_numblock == InvalidBlockNumber)
+       scan_nblocks = pbscan->phs_nblocks;
+   else
+       scan_nblocks = pbscan->phs_numblock;
+
+   /*
+    * We determine the chunk size based on scan_nblocks.  First we split
+    * scan_nblocks into PARALLEL_SEQSCAN_NCHUNKS chunks then we calculate the
+    * next highest power of 2 number of the result.  This means we split the
+    * blocks we're scanning into somewhere between PARALLEL_SEQSCAN_NCHUNKS
+    * and PARALLEL_SEQSCAN_NCHUNKS / 2 chunks.
+    */
+   pbscanwork->phsw_chunk_size = pg_nextpower2_32(Max(scan_nblocks /
+                                                      PARALLEL_SEQSCAN_NCHUNKS, 1));
+
+   /*
+    * Ensure we don't go over the maximum chunk size with larger tables. This
+    * means we may get much more than PARALLEL_SEQSCAN_NCHUNKS for larger
+    * tables.  Too large a chunk size has been shown to be detrimental to
+    * sequential scan performance.
+    */
+   pbscanwork->phsw_chunk_size = Min(pbscanwork->phsw_chunk_size,
+                                     PARALLEL_SEQSCAN_MAX_CHUNK_SIZE);
 }
 
 /*
@@ -493,6 +555,7 @@ table_block_parallelscan_nextpage(Relation rel,
                                  ParallelBlockTableScanWorker pbscanwork,
                                  ParallelBlockTableScanDesc pbscan)
 {
+   BlockNumber scan_nblocks;
    BlockNumber page;
    uint64      nallocated;
 
@@ -513,7 +576,7 @@ table_block_parallelscan_nextpage(Relation rel,
     *
     * Here we name these ranges of blocks "chunks".  The initial size of
     * these chunks is determined in table_block_parallelscan_startblock_init
-    * based on the size of the relation.  Towards the end of the scan, we
+    * based on the number of blocks to scan.  Towards the end of the scan, we
     * start making reductions in the size of the chunks in order to attempt
     * to divide the remaining work over all the workers as evenly as
     * possible.
@@ -530,17 +593,23 @@ table_block_parallelscan_nextpage(Relation rel,
     * phs_nallocated counter will exceed rs_nblocks, because workers will
     * still increment the value, when they try to allocate the next block but
     * all blocks have been allocated already. The counter must be 64 bits
-    * wide because of that, to avoid wrapping around when rs_nblocks is close
-    * to 2^32.
+    * wide because of that, to avoid wrapping around when scan_nblocks is
+    * close to 2^32.
     *
     * The actual block to return is calculated by adding the counter to the
-    * starting block number, modulo nblocks.
+    * starting block number, modulo phs_nblocks.
     */
 
+   /* First, figure out how many blocks we're planning on scanning */
+   if (pbscan->phs_numblock == InvalidBlockNumber)
+       scan_nblocks = pbscan->phs_nblocks;
+   else
+       scan_nblocks = pbscan->phs_numblock;
+
    /*
-    * First check if we have any remaining blocks in a previous chunk for
-    * this worker.  We must consume all of the blocks from that before we
-    * allocate a new chunk to the worker.
+    * Now check if we have any remaining blocks in a previous chunk for this
+    * worker.  We must consume all of the blocks from that before we allocate
+    * a new chunk to the worker.
     */
    if (pbscanwork->phsw_chunk_remaining > 0)
    {
@@ -562,7 +631,7 @@ table_block_parallelscan_nextpage(Relation rel,
         * chunk size set to 1.
         */
        if (pbscanwork->phsw_chunk_size > 1 &&
-           pbscanwork->phsw_nallocated > pbscan->phs_nblocks -
+           pbscanwork->phsw_nallocated > scan_nblocks -
            (pbscanwork->phsw_chunk_size * PARALLEL_SEQSCAN_RAMPDOWN_CHUNKS))
            pbscanwork->phsw_chunk_size >>= 1;
 
@@ -577,7 +646,8 @@ table_block_parallelscan_nextpage(Relation rel,
        pbscanwork->phsw_chunk_remaining = pbscanwork->phsw_chunk_size - 1;
    }
 
-   if (nallocated >= pbscan->phs_nblocks)
+   /* Check if we've run out of blocks to scan */
+   if (nallocated >= scan_nblocks)
        page = InvalidBlockNumber;  /* all blocks have been allocated */
    else
        page = (nallocated + pbscan->phs_startblock) % pbscan->phs_nblocks;
index f098a5557cf07fb51d3c39865d5de25ccf6519a9..0125464d942b08ffee3e3008f9b40d41070c16df 100644 (file)
@@ -40,6 +40,7 @@
 #include "executor/nodeSeqscan.h"
 #include "executor/nodeSort.h"
 #include "executor/nodeSubplan.h"
+#include "executor/nodeTidrangescan.h"
 #include "executor/tqueue.h"
 #include "jit/jit.h"
 #include "nodes/nodeFuncs.h"
@@ -266,6 +267,11 @@ ExecParallelEstimate(PlanState *planstate, ExecParallelEstimateContext *e)
                ExecForeignScanEstimate((ForeignScanState *) planstate,
                                        e->pcxt);
            break;
+       case T_TidRangeScanState:
+           if (planstate->plan->parallel_aware)
+               ExecTidRangeScanEstimate((TidRangeScanState *) planstate,
+                                        e->pcxt);
+           break;
        case T_AppendState:
            if (planstate->plan->parallel_aware)
                ExecAppendEstimate((AppendState *) planstate,
@@ -493,6 +499,11 @@ ExecParallelInitializeDSM(PlanState *planstate,
                ExecForeignScanInitializeDSM((ForeignScanState *) planstate,
                                             d->pcxt);
            break;
+       case T_TidRangeScanState:
+           if (planstate->plan->parallel_aware)
+               ExecTidRangeScanInitializeDSM((TidRangeScanState *) planstate,
+                                             d->pcxt);
+           break;
        case T_AppendState:
            if (planstate->plan->parallel_aware)
                ExecAppendInitializeDSM((AppendState *) planstate,
@@ -994,6 +1005,11 @@ ExecParallelReInitializeDSM(PlanState *planstate,
                ExecForeignScanReInitializeDSM((ForeignScanState *) planstate,
                                               pcxt);
            break;
+       case T_TidRangeScanState:
+           if (planstate->plan->parallel_aware)
+               ExecTidRangeScanReInitializeDSM((TidRangeScanState *) planstate,
+                                               pcxt);
+           break;
        case T_AppendState:
            if (planstate->plan->parallel_aware)
                ExecAppendReInitializeDSM((AppendState *) planstate, pcxt);
@@ -1362,6 +1378,11 @@ ExecParallelInitializeWorker(PlanState *planstate, ParallelWorkerContext *pwcxt)
                ExecForeignScanInitializeWorker((ForeignScanState *) planstate,
                                                pwcxt);
            break;
+       case T_TidRangeScanState:
+           if (planstate->plan->parallel_aware)
+               ExecTidRangeScanInitializeWorker((TidRangeScanState *) planstate,
+                                                pwcxt);
+           break;
        case T_AppendState:
            if (planstate->plan->parallel_aware)
                ExecAppendInitializeWorker((AppendState *) planstate, pwcxt);
index 1bce8d6cbfe6b1ddbd13618f0e08f1a51c85b039..6fd9f68cddd4f8c2d73d3ccbb5d77d988d305b15 100644 (file)
@@ -415,3 +415,83 @@ ExecInitTidRangeScan(TidRangeScan *node, EState *estate, int eflags)
     */
    return tidrangestate;
 }
+
+/* ----------------------------------------------------------------
+ *                     Parallel Scan Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *     ExecTidRangeScanEstimate
+ *
+ *     Compute the amount of space we'll need in the parallel
+ *     query DSM, and inform pcxt->estimator about our needs.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt)
+{
+   EState     *estate = node->ss.ps.state;
+
+   node->trss_pscanlen =
+       table_parallelscan_estimate(node->ss.ss_currentRelation,
+                                   estate->es_snapshot);
+   shm_toc_estimate_chunk(&pcxt->estimator, node->trss_pscanlen);
+   shm_toc_estimate_keys(&pcxt->estimator, 1);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecTidRangeScanInitializeDSM
+ *
+ *     Set up a parallel TID range scan descriptor.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt)
+{
+   EState     *estate = node->ss.ps.state;
+   ParallelTableScanDesc pscan;
+
+   pscan = shm_toc_allocate(pcxt->toc, node->trss_pscanlen);
+   table_parallelscan_initialize(node->ss.ss_currentRelation,
+                                 pscan,
+                                 estate->es_snapshot);
+   shm_toc_insert(pcxt->toc, node->ss.ps.plan->plan_node_id, pscan);
+   node->ss.ss_currentScanDesc =
+       table_beginscan_parallel_tidrange(node->ss.ss_currentRelation,
+                                         pscan);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecTidRangeScanReInitializeDSM
+ *
+ *     Reset shared state before beginning a fresh scan.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanReInitializeDSM(TidRangeScanState *node,
+                               ParallelContext *pcxt)
+{
+   ParallelTableScanDesc pscan;
+
+   pscan = node->ss.ss_currentScanDesc->rs_parallel;
+   table_parallelscan_reinitialize(node->ss.ss_currentRelation, pscan);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecTidRangeScanInitializeWorker
+ *
+ *     Copy relevant information from TOC into planstate.
+ * ----------------------------------------------------------------
+ */
+void
+ExecTidRangeScanInitializeWorker(TidRangeScanState *node,
+                                ParallelWorkerContext *pwcxt)
+{
+   ParallelTableScanDesc pscan;
+
+   pscan = shm_toc_lookup(pwcxt->toc, node->ss.ps.plan->plan_node_id, false);
+   node->ss.ss_currentScanDesc =
+       table_beginscan_parallel_tidrange(node->ss.ss_currentRelation,
+                                         pscan);
+}
index 8335cf5b5c5aeee69ae2616039109b2c0a2991fc..5a7283bd2f535572e0faa93db44514618b768b00 100644 (file)
@@ -1340,8 +1340,9 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
 {
    Selectivity selectivity;
    double      pages;
-   Cost        startup_cost = 0;
-   Cost        run_cost = 0;
+   Cost        startup_cost;
+   Cost        cpu_run_cost;
+   Cost        disk_run_cost;
    QualCost    qpqual_cost;
    Cost        cpu_per_tuple;
    QualCost    tid_qual_cost;
@@ -1373,8 +1374,8 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
     * page is just a normal sequential page read. NOTE: it's desirable for
     * TID Range Scans to cost more than the equivalent Sequential Scans,
     * because Seq Scans have some performance advantages such as scan
-    * synchronization and parallelizability, and we'd prefer one of them to
-    * be picked unless a TID Range Scan really is better.
+    * synchronization, and we'd prefer one of them to be picked unless a TID
+    * Range Scan really is better.
     */
    ntuples = selectivity * baserel->tuples;
    nseqpages = pages - 1.0;
@@ -1391,7 +1392,7 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
                              &spc_seq_page_cost);
 
    /* disk costs; 1 random page and the remainder as seq pages */
-   run_cost += spc_random_page_cost + spc_seq_page_cost * nseqpages;
+   disk_run_cost = spc_random_page_cost + spc_seq_page_cost * nseqpages;
 
    /* Add scanning CPU costs */
    get_restriction_qual_cost(root, baserel, param_info, &qpqual_cost);
@@ -1403,20 +1404,35 @@ cost_tidrangescan(Path *path, PlannerInfo *root,
     * can't be removed, this is a mistake and we're going to underestimate
     * the CPU cost a bit.)
     */
-   startup_cost += qpqual_cost.startup + tid_qual_cost.per_tuple;
+   startup_cost = qpqual_cost.startup + tid_qual_cost.per_tuple;
    cpu_per_tuple = cpu_tuple_cost + qpqual_cost.per_tuple -
        tid_qual_cost.per_tuple;
-   run_cost += cpu_per_tuple * ntuples;
+   cpu_run_cost = cpu_per_tuple * ntuples;
 
    /* tlist eval costs are paid per output row, not per tuple scanned */
    startup_cost += path->pathtarget->cost.startup;
-   run_cost += path->pathtarget->cost.per_tuple * path->rows;
+   cpu_run_cost += path->pathtarget->cost.per_tuple * path->rows;
+
+   /* Adjust costing for parallelism, if used. */
+   if (path->parallel_workers > 0)
+   {
+       double      parallel_divisor = get_parallel_divisor(path);
+
+       /* The CPU cost is divided among all the workers. */
+       cpu_run_cost /= parallel_divisor;
+
+       /*
+        * In the case of a parallel plan, the row count needs to represent
+        * the number of tuples processed per worker.
+        */
+       path->rows = clamp_row_est(path->rows / parallel_divisor);
+   }
 
    /* we should not generate this path type when enable_tidscan=false */
    Assert(enable_tidscan);
    path->disabled_nodes = 0;
    path->startup_cost = startup_cost;
-   path->total_cost = startup_cost + run_cost;
+   path->total_cost = startup_cost + cpu_run_cost + disk_run_cost;
 }
 
 /*
index 2bfb338b81ced8da158d6d5143ca6d50a8207bd4..3ddbc10bbdf1a8706f4ba84f4fc7408e572098ae 100644 (file)
@@ -490,9 +490,8 @@ ec_member_matches_ctid(PlannerInfo *root, RelOptInfo *rel,
 
 /*
  * create_tidscan_paths
- *   Create paths corresponding to direct TID scans of the given rel.
- *
- *   Candidate paths are added to the rel's pathlist (using add_path).
+ *   Create paths corresponding to direct TID scans of the given rel and add
+ *   them to the corresponding path list via add_path or add_partial_path.
  */
 bool
 create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel)
@@ -553,7 +552,24 @@ create_tidscan_paths(PlannerInfo *root, RelOptInfo *rel)
 
        add_path(rel, (Path *) create_tidrangescan_path(root, rel,
                                                        tidrangequals,
-                                                       required_outer));
+                                                       required_outer,
+                                                       0));
+
+       /* If appropriate, consider parallel tid range scan. */
+       if (rel->consider_parallel && required_outer == NULL)
+       {
+           int         parallel_workers;
+
+           parallel_workers = compute_parallel_worker(rel, rel->pages, -1,
+                                                      max_parallel_workers_per_gather);
+
+           if (parallel_workers > 0)
+               add_partial_path(rel, (Path *) create_tidrangescan_path(root,
+                                                                       rel,
+                                                                       tidrangequals,
+                                                                       required_outer,
+                                                                       parallel_workers));
+       }
    }
 
    /*
index e4fd6950fad1d12cbe5625c9155166d0d84ecf77..fd4bd5f93f0d552981f49d6eee18cc9dbd92175d 100644 (file)
@@ -1262,7 +1262,8 @@ create_tidscan_path(PlannerInfo *root, RelOptInfo *rel, List *tidquals,
  */
 TidRangePath *
 create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
-                        List *tidrangequals, Relids required_outer)
+                        List *tidrangequals, Relids required_outer,
+                        int parallel_workers)
 {
    TidRangePath *pathnode = makeNode(TidRangePath);
 
@@ -1271,9 +1272,9 @@ create_tidrangescan_path(PlannerInfo *root, RelOptInfo *rel,
    pathnode->path.pathtarget = rel->reltarget;
    pathnode->path.param_info = get_baserel_parampathinfo(root, rel,
                                                          required_outer);
-   pathnode->path.parallel_aware = false;
+   pathnode->path.parallel_aware = (parallel_workers > 0);
    pathnode->path.parallel_safe = rel->consider_parallel;
-   pathnode->path.parallel_workers = 0;
+   pathnode->path.parallel_workers = parallel_workers;
    pathnode->path.pathkeys = NIL;  /* always unordered */
 
    pathnode->tidrangequals = tidrangequals;
index b5e0fb386c0aab6b2f005e7085d6cca03e6c1774..87a8be104619eec499cdcfdd4033734395116af6 100644 (file)
@@ -96,6 +96,8 @@ typedef struct ParallelBlockTableScanDescData
    BlockNumber phs_nblocks;    /* # blocks in relation at start of scan */
    slock_t     phs_mutex;      /* mutual exclusion for setting startblock */
    BlockNumber phs_startblock; /* starting block number */
+   BlockNumber phs_numblock;   /* # blocks to scan, or InvalidBlockNumber if
+                                * no limit */
    pg_atomic_uint64 phs_nallocated;    /* number of blocks allocated to
                                         * workers so far. */
 }          ParallelBlockTableScanDescData;
index e16bf0256928430a66a5e84dcbe75662dea0cb03..2fa790b6bf54912079081bfc12ac0ce94edb3e71 100644 (file)
@@ -1130,6 +1130,16 @@ extern void table_parallelscan_initialize(Relation rel,
 extern TableScanDesc table_beginscan_parallel(Relation relation,
                                              ParallelTableScanDesc pscan);
 
+/*
+ * Begin a parallel tid range scan. `pscan` needs to have been initialized
+ * with table_parallelscan_initialize(), for the same relation. The
+ * initialization does not need to have happened in this backend.
+ *
+ * Caller must hold a suitable lock on the relation.
+ */
+extern TableScanDesc table_beginscan_parallel_tidrange(Relation relation,
+                                                      ParallelTableScanDesc pscan);
+
 /*
  * Restart a parallel scan.  Call this in the leader process.  Caller is
  * responsible for making sure that all workers have finished the scan
@@ -2028,7 +2038,9 @@ extern BlockNumber table_block_parallelscan_nextpage(Relation rel,
                                                     ParallelBlockTableScanDesc pbscan);
 extern void table_block_parallelscan_startblock_init(Relation rel,
                                                     ParallelBlockTableScanWorker pbscanwork,
-                                                    ParallelBlockTableScanDesc pbscan);
+                                                    ParallelBlockTableScanDesc pbscan,
+                                                    BlockNumber startblock,
+                                                    BlockNumber numblocks);
 
 
 /* ----------------------------------------------------------------------------
index a831f1202cae368d84d0371cc031bf385fc34fc1..2b5465b3ce4e1f3c87d0167df9a6ab8e9c11b415 100644 (file)
@@ -14,6 +14,7 @@
 #ifndef NODETIDRANGESCAN_H
 #define NODETIDRANGESCAN_H
 
+#include "access/parallel.h"
 #include "nodes/execnodes.h"
 
 extern TidRangeScanState *ExecInitTidRangeScan(TidRangeScan *node,
@@ -21,4 +22,10 @@ extern TidRangeScanState *ExecInitTidRangeScan(TidRangeScan *node,
 extern void ExecEndTidRangeScan(TidRangeScanState *node);
 extern void ExecReScanTidRangeScan(TidRangeScanState *node);
 
+/* parallel scan support */
+extern void ExecTidRangeScanEstimate(TidRangeScanState *node, ParallelContext *pcxt);
+extern void ExecTidRangeScanInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt);
+extern void ExecTidRangeScanReInitializeDSM(TidRangeScanState *node, ParallelContext *pcxt);
+extern void ExecTidRangeScanInitializeWorker(TidRangeScanState *node, ParallelWorkerContext *pwcxt);
+
 #endif                         /* NODETIDRANGESCAN_H */
index 18ae8f0d4bb80b1fbfb27abae231caff29d09d48..64ff6996431ebb3894c4f0a3a781c5f707fe8155 100644 (file)
@@ -1930,6 +1930,7 @@ typedef struct TidScanState
  *     trss_mintid         the lowest TID in the scan range
  *     trss_maxtid         the highest TID in the scan range
  *     trss_inScan         is a scan currently in progress?
+ *     trss_pscanlen       size of parallel heap scan descriptor
  * ----------------
  */
 typedef struct TidRangeScanState
@@ -1939,6 +1940,7 @@ typedef struct TidRangeScanState
    ItemPointerData trss_mintid;
    ItemPointerData trss_maxtid;
    bool        trss_inScan;
+   Size        trss_pscanlen;
 } TidRangeScanState;
 
 /* ----------------
index 955e905685830da18549784b1911c59e4b370cd0..6b010f0b1a5a5b2b9b9e66f6d7673ad05055e216 100644 (file)
@@ -67,7 +67,8 @@ extern TidPath *create_tidscan_path(PlannerInfo *root, RelOptInfo *rel,
 extern TidRangePath *create_tidrangescan_path(PlannerInfo *root,
                                              RelOptInfo *rel,
                                              List *tidrangequals,
-                                             Relids required_outer);
+                                             Relids required_outer,
+                                             int parallel_workers);
 extern AppendPath *create_append_path(PlannerInfo *root, RelOptInfo *rel,
                                      List *subpaths, List *partial_subpaths,
                                      List *pathkeys, Relids required_outer,
index 721f3b94e0423628f70db578532bae38aa39310e..89930f7bc7c6d2b9056a5be58fa53dc12ff852c1 100644 (file)
@@ -297,4 +297,109 @@ FETCH LAST c;
 
 COMMIT;
 DROP TABLE tidrangescan;
+-- Tests for parallel TID Range Scans
+BEGIN;
+SET LOCAL parallel_setup_cost TO 0;
+SET LOCAL parallel_tuple_cost TO 0;
+SET LOCAL min_parallel_table_scan_size TO 0;
+SET LOCAL max_parallel_workers_per_gather TO 4;
+CREATE TABLE parallel_tidrangescan (id integer, data text)
+WITH (fillfactor = 10);
+-- Insert enough tuples such that each page gets 5 tuples with fillfactor = 10
+INSERT INTO parallel_tidrangescan
+SELECT i, repeat('x', 100) FROM generate_series(1,200) AS s(i);
+-- Ensure there are 40 pages for parallel test
+SELECT min(ctid), max(ctid) FROM parallel_tidrangescan;
+  min  |  max   
+-------+--------
+ (0,1) | (39,5)
+(1 row)
+
+-- Parallel range scans with upper bound
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid < '(30,1)';
+                             QUERY PLAN                             
+--------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Tid Range Scan on parallel_tidrangescan
+                     TID Cond: (ctid < '(30,1)'::tid)
+(6 rows)
+
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid < '(30,1)';
+ count 
+-------
+   150
+(1 row)
+
+-- Parallel range scans with lower bound
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)';
+                             QUERY PLAN                             
+--------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Tid Range Scan on parallel_tidrangescan
+                     TID Cond: (ctid > '(10,0)'::tid)
+(6 rows)
+
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)';
+ count 
+-------
+   150
+(1 row)
+
+-- Parallel range scans with both bounds
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)' AND ctid < '(30,1)';
+                                    QUERY PLAN                                     
+-----------------------------------------------------------------------------------
+ Finalize Aggregate
+   ->  Gather
+         Workers Planned: 4
+         ->  Partial Aggregate
+               ->  Parallel Tid Range Scan on parallel_tidrangescan
+                     TID Cond: ((ctid > '(10,0)'::tid) AND (ctid < '(30,1)'::tid))
+(6 rows)
+
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)' AND ctid < '(30,1)';
+ count 
+-------
+   100
+(1 row)
+
+-- Parallel rescans
+EXPLAIN (COSTS OFF)
+SELECT t.ctid,t2.c FROM parallel_tidrangescan t,
+LATERAL (SELECT count(*) c FROM parallel_tidrangescan t2 WHERE t2.ctid <= t.ctid) t2
+WHERE t.ctid < '(1,0)';
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Nested Loop
+   ->  Gather
+         Workers Planned: 4
+         ->  Parallel Tid Range Scan on parallel_tidrangescan t
+               TID Cond: (ctid < '(1,0)'::tid)
+   ->  Aggregate
+         ->  Tid Range Scan on parallel_tidrangescan t2
+               TID Cond: (ctid <= t.ctid)
+(8 rows)
+
+SELECT t.ctid,t2.c FROM parallel_tidrangescan t,
+LATERAL (SELECT count(*) c FROM parallel_tidrangescan t2 WHERE t2.ctid <= t.ctid) t2
+WHERE t.ctid < '(1,0)';
+ ctid  | c 
+-------+---
+ (0,1) | 1
+ (0,2) | 2
+ (0,3) | 3
+ (0,4) | 4
+ (0,5) | 5
+(5 rows)
+
+ROLLBACK;
 RESET enable_seqscan;
index ac09ebb6262648f36c141c76aa28e235062e7501..1ac3995e71c28cc1aea65039b8548f8b347d774f 100644 (file)
@@ -98,4 +98,48 @@ COMMIT;
 
 DROP TABLE tidrangescan;
 
+-- Tests for parallel TID Range Scans
+BEGIN;
+
+SET LOCAL parallel_setup_cost TO 0;
+SET LOCAL parallel_tuple_cost TO 0;
+SET LOCAL min_parallel_table_scan_size TO 0;
+SET LOCAL max_parallel_workers_per_gather TO 4;
+
+CREATE TABLE parallel_tidrangescan (id integer, data text)
+WITH (fillfactor = 10);
+
+-- Insert enough tuples such that each page gets 5 tuples with fillfactor = 10
+INSERT INTO parallel_tidrangescan
+SELECT i, repeat('x', 100) FROM generate_series(1,200) AS s(i);
+
+-- Ensure there are 40 pages for parallel test
+SELECT min(ctid), max(ctid) FROM parallel_tidrangescan;
+
+-- Parallel range scans with upper bound
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid < '(30,1)';
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid < '(30,1)';
+
+-- Parallel range scans with lower bound
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)';
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)';
+
+-- Parallel range scans with both bounds
+EXPLAIN (COSTS OFF)
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)' AND ctid < '(30,1)';
+SELECT count(*) FROM parallel_tidrangescan WHERE ctid > '(10,0)' AND ctid < '(30,1)';
+
+-- Parallel rescans
+EXPLAIN (COSTS OFF)
+SELECT t.ctid,t2.c FROM parallel_tidrangescan t,
+LATERAL (SELECT count(*) c FROM parallel_tidrangescan t2 WHERE t2.ctid <= t.ctid) t2
+WHERE t.ctid < '(1,0)';
+
+SELECT t.ctid,t2.c FROM parallel_tidrangescan t,
+LATERAL (SELECT count(*) c FROM parallel_tidrangescan t2 WHERE t2.ctid <= t.ctid) t2
+WHERE t.ctid < '(1,0)';
+
+ROLLBACK;
 RESET enable_seqscan;