Add macros for ReorderBufferTXN toptxn.
authorAmit Kapila <[email protected]>
Fri, 17 Mar 2023 02:59:41 +0000 (08:29 +0530)
committerAmit Kapila <[email protected]>
Fri, 17 Mar 2023 02:59:41 +0000 (08:29 +0530)
Currently, there are quite a few places in reorderbuffer.c that tries to
access top-transaction for a subtransaction. This makes the code to access
top-transaction consistent and easier to follow.

Author: Peter Smith
Reviewed-by: Vignesh C, Sawada Masahiko
Discussion: https://postgr.es/m/CAHut+PuCznOyTqBQwjRUu-ibG-=KHyCv-0FTcWQtZUdR88umfg@mail.gmail.com

contrib/test_decoding/test_decoding.c
src/backend/replication/logical/reorderbuffer.c
src/backend/replication/pgoutput/pgoutput.c
src/include/replication/reorderbuffer.h

index b7e60486474a064e7c02b962f214621678b20fa6..628c6a2595786f91cc09627e10e2930b46106f3d 100644 (file)
@@ -815,11 +815,11 @@ pg_decode_stream_abort(LogicalDecodingContext *ctx,
         * maintain the output_plugin_private only under the toptxn so if this is
         * not the toptxn then fetch the toptxn.
         */
-       ReorderBufferTXN *toptxn = txn->toptxn ? txn->toptxn : txn;
+       ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
        TestDecodingTxnData *txndata = toptxn->output_plugin_private;
        bool            xact_wrote_changes = txndata->xact_wrote_changes;
 
-       if (txn->toptxn == NULL)
+       if (rbtxn_is_toptxn(txn))
        {
                Assert(txn->output_plugin_private != NULL);
                pfree(txndata);
index 2d17c551a80f8190df869f03ee8801c4dcf6226f..9f4497447302d4f10663c2c056e69ae270a0b5a4 100644 (file)
@@ -717,10 +717,7 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                return;
 
        /* Get the top transaction. */
-       if (txn->toptxn != NULL)
-               toptxn = txn->toptxn;
-       else
-               toptxn = txn;
+       toptxn = rbtxn_get_toptxn(txn);
 
        /*
         * Indicate a partial change for toast inserts.  The change will be
@@ -809,13 +806,7 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
                change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
                change->action == REORDER_BUFFER_CHANGE_MESSAGE)
        {
-               ReorderBufferTXN *toptxn;
-
-               /* get the top transaction */
-               if (txn->toptxn != NULL)
-                       toptxn = txn->toptxn;
-               else
-                       toptxn = txn;
+               ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
 
                toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
        }
@@ -1655,9 +1646,9 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
        /*
         * Mark the transaction as streamed.
         *
-        * The toplevel transaction, identified by (toptxn==NULL), is marked as
-        * streamed always, even if it does not contain any changes (that is, when
-        * all the changes are in subtransactions).
+        * The top-level transaction, is marked as streamed always, even if it
+        * does not contain any changes (that is, when all the changes are in
+        * subtransactions).
         *
         * For subtransactions, we only mark them as streamed when there are
         * changes in them.
@@ -1667,7 +1658,7 @@ ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prep
         * about the toplevel xact (we send the XID in all messages), but we never
         * stream XIDs of empty subxacts.
         */
-       if ((!txn_prepared) && ((!txn->toptxn) || (txn->nentries_mem != 0)))
+       if ((!txn_prepared) && (rbtxn_is_toptxn(txn) || (txn->nentries_mem != 0)))
                txn->txn_flags |= RBTXN_IS_STREAMED;
 
        if (txn_prepared)
@@ -3207,10 +3198,7 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
         * Update the total size in top level as well. This is later used to
         * compute the decoding stats.
         */
-       if (txn->toptxn != NULL)
-               toptxn = txn->toptxn;
-       else
-               toptxn = txn;
+       toptxn = rbtxn_get_toptxn(txn);
 
        if (addition)
        {
@@ -3295,8 +3283,7 @@ ReorderBufferAddInvalidations(ReorderBuffer *rb, TransactionId xid,
         * so that we can execute them all together.  See comments atop this
         * function.
         */
-       if (txn->toptxn)
-               txn = txn->toptxn;
+       txn = rbtxn_get_toptxn(txn);
 
        Assert(nmsgs > 0);
 
@@ -3354,7 +3341,6 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
                                                                  XLogRecPtr lsn)
 {
        ReorderBufferTXN *txn;
-       ReorderBufferTXN *toptxn;
 
        txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
@@ -3370,11 +3356,15 @@ ReorderBufferXidSetCatalogChanges(ReorderBuffer *rb, TransactionId xid,
         * conveniently check just top-level transaction and decide whether to
         * build the hash table or not.
         */
-       toptxn = txn->toptxn;
-       if (toptxn != NULL && !rbtxn_has_catalog_changes(toptxn))
+       if (rbtxn_is_subtxn(txn))
        {
-               toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
-               dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
+               ReorderBufferTXN *toptxn = rbtxn_get_toptxn(txn);
+
+               if (!rbtxn_has_catalog_changes(toptxn))
+               {
+                       toptxn->txn_flags |= RBTXN_HAS_CATALOG_CHANGES;
+                       dclist_push_tail(&rb->catchange_txns, &toptxn->catchange_node);
+               }
        }
 }
 
@@ -3619,7 +3609,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
                        (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
                {
                        /* we know there has to be one, because the size is not zero */
-                       Assert(txn && !txn->toptxn);
+                       Assert(txn && rbtxn_is_toptxn(txn));
                        Assert(txn->total_size > 0);
                        Assert(rb->size >= txn->total_size);
 
@@ -4007,7 +3997,7 @@ ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
        bool            txn_is_streamed;
 
        /* We can never reach here for a subtransaction. */
-       Assert(txn->toptxn == NULL);
+       Assert(rbtxn_is_toptxn(txn));
 
        /*
         * We can't make any assumptions about base snapshot here, similar to what
index 00a2d73dab034b8da85bd8e3d4fe17a36a98611d..3a2d2e357e031fed665ca012dd6f720133b2804d 100644 (file)
@@ -694,8 +694,8 @@ maybe_send_schema(LogicalDecodingContext *ctx,
        if (in_streaming)
                xid = change->txn->xid;
 
-       if (change->txn->toptxn)
-               topxid = change->txn->toptxn->xid;
+       if (rbtxn_is_subtxn(change->txn))
+               topxid = rbtxn_get_toptxn(change->txn)->xid;
        else
                topxid = xid;
 
@@ -1879,7 +1879,7 @@ pgoutput_stream_abort(struct LogicalDecodingContext *ctx,
        Assert(!in_streaming);
 
        /* determine the toplevel transaction */
-       toptxn = (txn->toptxn) ? txn->toptxn : txn;
+       toptxn = rbtxn_get_toptxn(txn);
 
        Assert(rbtxn_is_streamed(toptxn));
 
index 215d1494e9010b127cb99e05e46858be876b7198..e37f5120ebbf468c3e0777020b89eda3f3acb01a 100644 (file)
@@ -249,6 +249,24 @@ typedef struct ReorderBufferChange
        ((txn)->txn_flags & RBTXN_SKIPPED_PREPARE) != 0 \
 )
 
+/* Is this a top-level transaction? */
+#define rbtxn_is_toptxn(txn) \
+( \
+       (txn)->toptxn == NULL \
+)
+
+/* Is this a subtransaction? */
+#define rbtxn_is_subtxn(txn) \
+( \
+       (txn)->toptxn != NULL \
+)
+
+/* Get the top-level transaction of this (sub)transaction. */
+#define rbtxn_get_toptxn(txn) \
+( \
+       rbtxn_is_subtxn(txn) ? (txn)->toptxn : (txn) \
+)
+
 typedef struct ReorderBufferTXN
 {
        /* See above */