Avoid unnecessary streaming of transactions during logical replication.
authorAmit Kapila <[email protected]>
Thu, 8 Dec 2022 00:35:09 +0000 (06:05 +0530)
committerAmit Kapila <[email protected]>
Thu, 8 Dec 2022 00:35:09 +0000 (06:05 +0530)
After restart, we don't perform streaming of an in-progress transaction if
it was previously decoded and confirmed by the client. To achieve that we
were comparing the END location of the WAL record being decoded with the
WAL location we have already decoded and confirmed by the client. While
decoding the commit record, to decide whether to process and send the
complete transaction, we compare its START location with the WAL location
we have already decoded and confirmed by the client. Now, if we need to
queue some change in the transaction while decoding the commit record
(e.g. snapshot), it is possible that we decide to stream the transaction
but later commit processing decides to skip it. In such a case, we would
needlessly send the changes and later when we decide to skip it, we will
send stream abort.

We also sometimes decide to stream the changes when we actually just need
to process them locally like a change for invalidations. This will lead us
to send empty streams. To avoid this, while queuing each change for
decoding, we remember whether the transaction has any change that actually
needs to be sent downstream and use that information later to decide
whether to stream the transaction or not.

Note, we can't avoid all cases where we have to send empty streams like
the case where the plugin later decides that the change is not
publishable. However, we will no longer need to send stream_abort when we
skip sending a particular transaction.

Author: Dilip Kumar
Reviewed-by: Hou Zhijie, Ashutosh Bapat, Shi yu, Amit Kapila
Discussion: https://postgr.es/m/CAFiTN-tHK=7LzfrPs8fbT2ksrOJGQbzywcgXst2bM9-rJJAAUg@mail.gmail.com

src/backend/replication/logical/reorderbuffer.c
src/include/replication/reorderbuffer.h

index 31f7381f2d6d0ea5390f9ab93a802664bac670e6..b567b8b59e21a3533d66a697b4753f94b33fe4cf 100644 (file)
@@ -695,9 +695,9 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
  * Record the partial change for the streaming of in-progress transactions.  We
  * can stream only complete changes so if we have a partial change like toast
  * table insert or speculative insert then we mark such a 'txn' so that it
- * can't be streamed.  We also ensure that if the changes in such a 'txn' are
- * above logical_decoding_work_mem threshold then we stream them as soon as we
- * have a complete change.
+ * can't be streamed.  We also ensure that if the changes in such a 'txn' can
+ * be streamed and are above logical_decoding_work_mem threshold then we stream
+ * them as soon as we have a complete change.
  */
 static void
 ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
@@ -762,7 +762,8 @@ ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
         */
        if (ReorderBufferCanStartStreaming(rb) &&
                !(rbtxn_has_partial_change(toptxn)) &&
-               rbtxn_is_serialized(txn))
+               rbtxn_is_serialized(txn) &&
+               rbtxn_has_streamable_change(toptxn))
                ReorderBufferStreamTXN(rb, toptxn);
 }
 
@@ -793,6 +794,29 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
                return;
        }
 
+       /*
+        * The changes that are sent downstream are considered streamable.  We
+        * remember such transactions so that only those will later be considered
+        * for streaming.
+        */
+       if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+               change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+               change->action == REORDER_BUFFER_CHANGE_DELETE ||
+               change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+               change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
+               change->action == REORDER_BUFFER_CHANGE_MESSAGE)
+       {
+               ReorderBufferTXN *toptxn;
+
+               /* get the top transaction */
+               if (txn->toptxn != NULL)
+                       toptxn = txn->toptxn;
+               else
+                       toptxn = txn;
+
+               toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
+       }
+
        change->lsn = lsn;
        change->txn = txn;
 
@@ -2942,9 +2966,8 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
        if (txn == NULL)
                return;
 
-       /* For streamed transactions notify the remote node about the abort. */
-       if (rbtxn_is_streamed(txn))
-               rb->stream_abort(rb, txn, lsn);
+       /* this transaction mustn't be streamed */
+       Assert(!rbtxn_is_streamed(txn));
 
        /* cosmetic... */
        txn->final_lsn = lsn;
@@ -3460,14 +3483,15 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
 }
 
 /*
- * Find the largest toplevel transaction to evict (by streaming).
+ * Find the largest streamable toplevel transaction to evict (by streaming).
  *
  * This can be seen as an optimized version of ReorderBufferLargestTXN, which
  * should give us the same transaction (because we don't update memory account
  * for subtransaction with streaming, so it's always 0). But we can simply
  * iterate over the limited number of toplevel transactions that have a base
  * snapshot. There is no use of selecting a transaction that doesn't have base
- * snapshot because we don't decode such transactions.
+ * snapshot because we don't decode such transactions.  Also, we do not select
+ * the transaction which doesn't have any streamable change.
  *
  * Note that, we skip transactions that contains incomplete changes. There
  * is a scope of optimization here such that we can select the largest
@@ -3483,7 +3507,7 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
  * the subxact from where we streamed the last change.
  */
 static ReorderBufferTXN *
-ReorderBufferLargestTopTXN(ReorderBuffer *rb)
+ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
 {
        dlist_iter      iter;
        Size            largest_size = 0;
@@ -3502,7 +3526,8 @@ ReorderBufferLargestTopTXN(ReorderBuffer *rb)
                Assert(txn->base_snapshot != NULL);
 
                if ((largest == NULL || txn->total_size > largest_size) &&
-                       (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
+                       (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
+                       rbtxn_has_streamable_change(txn))
                {
                        largest = txn;
                        largest_size = txn->total_size;
@@ -3547,7 +3572,7 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
                 * memory by streaming, if possible.  Otherwise, spill to disk.
                 */
                if (ReorderBufferCanStartStreaming(rb) &&
-                       (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
+                       (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
                {
                        /* we know there has to be one, because the size is not zero */
                        Assert(txn && !txn->toptxn);
@@ -3919,7 +3944,7 @@ ReorderBufferCanStartStreaming(ReorderBuffer *rb)
         * restarting.
         */
        if (ReorderBufferCanStream(rb) &&
-               !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+               !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
                return true;
 
        return false;
index b23d8cc4f9fdb426e497476c35d7c55fc714a229..c700b55b1c0a42ac62da581cdffc11b64770529f 100644 (file)
@@ -168,14 +168,15 @@ typedef struct ReorderBufferChange
 } ReorderBufferChange;
 
 /* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES 0x0001
-#define RBTXN_IS_SUBXACT          0x0002
-#define RBTXN_IS_SERIALIZED       0x0004
-#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
-#define RBTXN_IS_STREAMED         0x0010
-#define RBTXN_HAS_PARTIAL_CHANGE  0x0020
-#define RBTXN_PREPARE             0x0040
-#define RBTXN_SKIPPED_PREPARE    0x0080
+#define RBTXN_HAS_CATALOG_CHANGES      0x0001
+#define RBTXN_IS_SUBXACT               0x0002
+#define RBTXN_IS_SERIALIZED            0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR      0x0008
+#define RBTXN_IS_STREAMED              0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE       0x0020
+#define RBTXN_PREPARE                  0x0040
+#define RBTXN_SKIPPED_PREPARE          0x0080
+#define RBTXN_HAS_STREAMABLE_CHANGE    0x0100
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -207,6 +208,12 @@ typedef struct ReorderBufferChange
        ((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
 )
 
+/* Does this transaction contain streamable changes? */
+#define rbtxn_has_streamable_change(txn) \
+( \
+       ((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
+)
+
 /*
  * Has this transaction been streamed to downstream?
  *