Implement streaming mode in ReorderBuffer.
authorAmit Kapila <[email protected]>
Sat, 8 Aug 2020 02:04:39 +0000 (07:34 +0530)
committerAmit Kapila <[email protected]>
Sat, 8 Aug 2020 02:17:06 +0000 (07:47 +0530)
Instead of serializing the transaction to disk after reaching the
logical_decoding_work_mem limit in memory, we consume the changes we have
in memory and invoke stream API methods added by commit 45fdc9738b.
However, sometimes if we have incomplete toast or speculative insert we
spill to the disk because we can't generate the complete tuple and stream.
And, as soon as we get the complete tuple we stream the transaction
including the serialized changes.

We can do this incremental processing thanks to having assignments
(associating subxact with toplevel xacts) in WAL right away, and
thanks to logging the invalidation messages at each command end. These
features are added by commits 0bead9af48 and c55040ccd0 respectively.

Now that we can stream in-progress transactions, the concurrent aborts
may cause failures when the output plugin consults catalogs (both system
and user-defined).

We handle such failures by returning ERRCODE_TRANSACTION_ROLLBACK
sqlerrcode from system table scan APIs to the backend or WALSender
decoding a specific uncommitted transaction. The decoding logic on the
receipt of such a sqlerrcode aborts the decoding of the current
transaction and continue with the decoding of other transactions.

We have ReorderBufferTXN pointer in each ReorderBufferChange by which we
know which xact it belongs to.  The output plugin can use this to decide
which changes to discard in case of stream_abort_cb (e.g. when a subxact
gets discarded).

We also provide a new option via SQL APIs to fetch the changes being
streamed.

Author: Dilip Kumar, Tomas Vondra, Amit Kapila, Nikhil Sontakke
Reviewed-by: Amit Kapila, Kuntal Ghosh, Ajin Cherian
Tested-by: Neha Sharma, Mahendra Singh Thalor and Ajin Cherian
Discussion: https://postgr.es/m/688b0b7f-2f6c-d827-c27b-216a8e3ea700@2ndquadrant.com

21 files changed:
contrib/test_decoding/Makefile
contrib/test_decoding/expected/stream.out [new file with mode: 0644]
contrib/test_decoding/expected/truncate.out
contrib/test_decoding/sql/stream.sql [new file with mode: 0644]
contrib/test_decoding/sql/truncate.sql
contrib/test_decoding/test_decoding.c
doc/src/sgml/logicaldecoding.sgml
doc/src/sgml/test-decoding.sgml
src/backend/access/heap/heapam.c
src/backend/access/heap/heapam_visibility.c
src/backend/access/index/genam.c
src/backend/access/table/tableam.c
src/backend/access/transam/xact.c
src/backend/replication/logical/decode.c
src/backend/replication/logical/logical.c
src/backend/replication/logical/reorderbuffer.c
src/include/access/heapam_xlog.h
src/include/access/tableam.h
src/include/access/xact.h
src/include/replication/logical.h
src/include/replication/reorderbuffer.h

index f439c582a5f9b2ac3edfcfe02cec8fd169e0b9e7..ed9a3d6c0edeef621eac73eef591eb82ad149cf4 100644 (file)
@@ -5,7 +5,7 @@ PGFILEDESC = "test_decoding - example of a logical decoding output plugin"
 
 REGRESS = ddl xact rewrite toast permissions decoding_in_xact \
    decoding_into_rel binary prepared replorigin time messages \
-   spill slot truncate
+   spill slot truncate stream
 ISOLATION = mxact delayed_startup ondisk_startup concurrent_ddl_dml \
    oldest_xmin snapshot_transfer subxact_without_top
 
diff --git a/contrib/test_decoding/expected/stream.out b/contrib/test_decoding/expected/stream.out
new file mode 100644 (file)
index 0000000..9a5d7e7
--- /dev/null
@@ -0,0 +1,94 @@
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+ ?column? 
+----------
+ init
+(1 row)
+
+CREATE TABLE stream_test(data text);
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+-- streaming test with sub-transaction
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+ ?column? 
+----------
+ msg5
+(1 row)
+
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+COMMIT;
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+                           data                           
+----------------------------------------------------------
+ opening a streamed block for transaction
+ streaming message: transactional: 1 prefix: test, sz: 50
+ closing a streamed block for transaction
+ aborting streamed (sub)transaction
+ opening a streamed block for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ closing a streamed block for transaction
+ committing streamed transaction
+(27 rows)
+
+-- streaming test for toast changes
+ALTER TABLE stream_test ALTER COLUMN data set storage external;
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+ data 
+------
+(0 rows)
+
+INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+                   data                   
+------------------------------------------
+ opening a streamed block for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ streaming change for transaction
+ closing a streamed block for transaction
+ committing streamed transaction
+(13 rows)
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
index 1cf2ae835c8412aa54ffe4a05724dc1d3031219b..e64d377214ab902d9f706b07bcbf65b6531ce94f 100644 (file)
@@ -25,3 +25,9 @@ SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'inc
  COMMIT
 (9 rows)
 
+SELECT pg_drop_replication_slot('regression_slot');
+ pg_drop_replication_slot 
+--------------------------
+(1 row)
+
diff --git a/contrib/test_decoding/sql/stream.sql b/contrib/test_decoding/sql/stream.sql
new file mode 100644 (file)
index 0000000..8abc30d
--- /dev/null
@@ -0,0 +1,30 @@
+SET synchronous_commit = on;
+SELECT 'init' FROM pg_create_logical_replication_slot('regression_slot', 'test_decoding');
+
+CREATE TABLE stream_test(data text);
+
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+-- streaming test with sub-transaction
+BEGIN;
+savepoint s1;
+SELECT 'msg5' FROM pg_logical_emit_message(true, 'test', repeat('a', 50));
+INSERT INTO stream_test SELECT repeat('a', 2000) || g.i FROM generate_series(1, 35) g(i);
+TRUNCATE table stream_test;
+rollback to s1;
+INSERT INTO stream_test SELECT repeat('a', 10) || g.i FROM generate_series(1, 20) g(i);
+COMMIT;
+
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+
+-- streaming test for toast changes
+ALTER TABLE stream_test ALTER COLUMN data set storage external;
+-- consume DDL
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+
+INSERT INTO stream_test SELECT repeat('a', 6000) || g.i FROM generate_series(1, 10) g(i);
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'stream-changes', '1');
+
+DROP TABLE stream_test;
+SELECT pg_drop_replication_slot('regression_slot');
index 5aecdf0881f56f93e2e46e8e93483e7e2fd79916..5633854e0dfc504e01525d0b69267fdffd6a4c86 100644 (file)
@@ -11,3 +11,4 @@ TRUNCATE tab1, tab1 RESTART IDENTITY CASCADE;
 TRUNCATE tab1, tab2;
 
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');
+SELECT pg_drop_replication_slot('regression_slot');
index dbef52a3af470cf695135937a30c66e78811bf16..34745150e9ba42eab689feba58a05e32ac27ed7c 100644 (file)
@@ -122,6 +122,7 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
 {
    ListCell   *option;
    TestDecodingData *data;
+   bool        enable_streaming = false;
 
    data = palloc0(sizeof(TestDecodingData));
    data->context = AllocSetContextCreate(ctx->context,
@@ -212,6 +213,16 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                         errmsg("could not parse value \"%s\" for parameter \"%s\"",
                                strVal(elem->arg), elem->defname)));
        }
+       else if (strcmp(elem->defname, "stream-changes") == 0)
+       {
+           if (elem->arg == NULL)
+               continue;
+           else if (!parse_bool(strVal(elem->arg), &enable_streaming))
+               ereport(ERROR,
+                       (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+                        errmsg("could not parse value \"%s\" for parameter \"%s\"",
+                               strVal(elem->arg), elem->defname)));
+       }
        else
        {
            ereport(ERROR,
@@ -221,6 +232,8 @@ pg_decode_startup(LogicalDecodingContext *ctx, OutputPluginOptions *opt,
                            elem->arg ? strVal(elem->arg) : "(null)")));
        }
    }
+
+   ctx->streaming &= enable_streaming;
 }
 
 /* cleanup this plugin's resources */
index 791a62b57c9b539aaaa2f06efa6639e2bcd6fc70..1571d71a5b6c700c2550542764f4f83f7e095cff 100644 (file)
@@ -433,9 +433,12 @@ typedef void (*LogicalOutputPluginInit) (struct OutputPluginCallbacks *cb);
 ALTER TABLE user_catalog_table SET (user_catalog_table = true);
 CREATE TABLE another_catalog_table(data text) WITH (user_catalog_table = true);
 </programlisting>
-     Any actions leading to transaction ID assignment are prohibited. That, among others,
-     includes writing to tables, performing DDL changes, and
-     calling <literal>pg_current_xact_id()</literal>.
+     Note that access to user catalog tables or regular system catalog tables
+     in the output plugins has to be done via the <literal>systable_*</literal>
+     scan APIs only. Access via the <literal>heap_*</literal> scan APIs will
+     error out. Additionally, any actions leading to transaction ID assignment
+     are prohibited. That, among others, includes writing to tables, performing
+     DDL changes, and calling <literal>pg_current_xact_id()</literal>.
     </para>
    </sect2>
 
index 8356a3d67b31be12e35f8c353d836618f5e7d3ab..fe7c9783facdb84cc68d9ab51d79fcea6cc332e9 100644 (file)
@@ -39,4 +39,26 @@ postgres=# SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'i
 </programlisting>
  </para>
 
+<para>
+  We can also get the changes of the in-progress transaction and the typical
+  output, might be:
+
+<programlisting>
+postgres[33712]=#* SELECT * FROM pg_logical_slot_get_changes('test_slot', NULL, NULL, 'stream-changes', '1');
+    lsn    | xid |                       data                       
+-----------+-----+--------------------------------------------------
+ 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503
+ 0/16B21F8 | 503 | streaming change for TXN 503
+ 0/16B2300 | 503 | streaming change for TXN 503
+ 0/16B2408 | 503 | streaming change for TXN 503
+ 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503
+ 0/16B21F8 | 503 | opening a streamed block for transaction TXN 503
+ 0/16BECA8 | 503 | streaming change for TXN 503
+ 0/16BEDB0 | 503 | streaming change for TXN 503
+ 0/16BEEB8 | 503 | streaming change for TXN 503
+ 0/16BEBA0 | 503 | closing a streamed block for transaction TXN 503
+(10 rows)
+</programlisting>
+ </para>
+
 </sect1>
index 5eef225f5c79127d08a4c46b62994e3f2b9a882c..00169006fb1f1cfe4af002fb37fcc146ee684a35 100644 (file)
@@ -1299,6 +1299,16 @@ heap_getnext(TableScanDesc sscan, ScanDirection direction)
                (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                 errmsg_internal("only heap AM is supported")));
 
+   /*
+    * We don't expect direct calls to heap_getnext with valid CheckXidAlive
+    * for catalog or regular tables.  See detailed comments in xact.c where
+    * these variables are declared.  Normally we have such a check at tableam
+    * level API but this is called from many places so we need to ensure it
+    * here.
+    */
+   if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+       elog(ERROR, "unexpected heap_getnext call during logical decoding");
+
    /* Note: no locking manipulations needed */
 
    if (scan->rs_base.rs_flags & SO_ALLOW_PAGEMODE)
@@ -1956,6 +1966,9 @@ heap_insert(Relation relation, HeapTuple tup, CommandId cid,
        {
            xlrec.flags |= XLH_INSERT_CONTAINS_NEW_TUPLE;
            bufflags |= REGBUF_KEEP_DATA;
+
+           if (IsToastRelation(relation))
+               xlrec.flags |= XLH_INSERT_ON_TOAST_RELATION;
        }
 
        XLogBeginInsert();
index dba10890aabe88376ee9274e59de1d0017a14ba3..c77128087cf7d58ed90073680dd4d9fb19cc8321 100644 (file)
@@ -1571,8 +1571,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
                                                 htup, buffer,
                                                 &cmin, &cmax);
 
+       /*
+        * If we haven't resolved the combocid to cmin/cmax, that means we
+        * have not decoded the combocid yet. That means the cmin is
+        * definitely in the future, and we're not supposed to see the tuple
+        * yet.
+        *
+        * XXX This only applies to decoding of in-progress transactions. In
+        * regular logical decoding we only execute this code at commit time,
+        * at which point we should have seen all relevant combocids. So
+        * ideally, we should error out in this case but in practice, this
+        * won't happen. If we are too worried about this then we can add an
+        * elog inside ResolveCminCmaxDuringDecoding.
+        *
+        * XXX For the streaming case, we can track the largest combocid
+        * assigned, and error out based on this (when unable to resolve
+        * combocid below that observed maximum value).
+        */
        if (!resolved)
-           elog(ERROR, "could not resolve cmin/cmax of catalog tuple");
+           return false;
 
        Assert(cmin != InvalidCommandId);
 
@@ -1642,10 +1659,25 @@ HeapTupleSatisfiesHistoricMVCC(HeapTuple htup, Snapshot snapshot,
                                                 htup, buffer,
                                                 &cmin, &cmax);
 
-       if (!resolved)
-           elog(ERROR, "could not resolve combocid to cmax");
-
-       Assert(cmax != InvalidCommandId);
+       /*
+        * If we haven't resolved the combocid to cmin/cmax, that means we
+        * have not decoded the combocid yet. That means the cmax is
+        * definitely in the future, and we're still supposed to see the
+        * tuple.
+        *
+        * XXX This only applies to decoding of in-progress transactions. In
+        * regular logical decoding we only execute this code at commit time,
+        * at which point we should have seen all relevant combocids. So
+        * ideally, we should error out in this case but in practice, this
+        * won't happen. If we are too worried about this then we can add an
+        * elog inside ResolveCminCmaxDuringDecoding.
+        *
+        * XXX For the streaming case, we can track the largest combocid
+        * assigned, and error out based on this (when unable to resolve
+        * combocid below that observed maximum value).
+        */
+       if (!resolved || cmax == InvalidCommandId)
+           return true;
 
        if (cmax >= snapshot->curcid)
            return true;        /* deleted after scan started */
index dfba5ae39ae9795b1d33d614400d10b65adf6128..e3164e674a7bc2d8740389aa45f1333d032caa7b 100644 (file)
@@ -28,6 +28,7 @@
 #include "lib/stringinfo.h"
 #include "miscadmin.h"
 #include "storage/bufmgr.h"
+#include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/lsyscache.h"
@@ -429,9 +430,36 @@ systable_beginscan(Relation heapRelation,
        sysscan->iscan = NULL;
    }
 
+   /*
+    * If CheckXidAlive is set then set a flag to indicate that system table
+    * scan is in-progress.  See detailed comments in xact.c where these
+    * variables are declared.
+    */
+   if (TransactionIdIsValid(CheckXidAlive))
+       bsysscan = true;
+
    return sysscan;
 }
 
+/*
+ * HandleConcurrentAbort - Handle concurrent abort of the CheckXidAlive.
+ *
+ * Error out, if CheckXidAlive is aborted. We can't directly use
+ * TransactionIdDidAbort as after crash such transaction might not have been
+ * marked as aborted.  See detailed comments in xact.c where the variable
+ * is declared.
+ */
+static inline void
+HandleConcurrentAbort()
+{
+   if (TransactionIdIsValid(CheckXidAlive) &&
+       !TransactionIdIsInProgress(CheckXidAlive) &&
+       !TransactionIdDidCommit(CheckXidAlive))
+       ereport(ERROR,
+               (errcode(ERRCODE_TRANSACTION_ROLLBACK),
+                errmsg("transaction aborted during system catalog scan")));
+}
+
 /*
  * systable_getnext --- get next tuple in a heap-or-index scan
  *
@@ -481,6 +509,12 @@ systable_getnext(SysScanDesc sysscan)
        }
    }
 
+   /*
+    * Handle the concurrent abort while fetching the catalog tuple during
+    * logical streaming of a transaction.
+    */
+   HandleConcurrentAbort();
+
    return htup;
 }
 
@@ -517,6 +551,12 @@ systable_recheck_tuple(SysScanDesc sysscan, HeapTuple tup)
                                            sysscan->slot,
                                            freshsnap);
 
+   /*
+    * Handle the concurrent abort while fetching the catalog tuple during
+    * logical streaming of a transaction.
+    */
+   HandleConcurrentAbort();
+
    return result;
 }
 
@@ -545,6 +585,13 @@ systable_endscan(SysScanDesc sysscan)
    if (sysscan->snapshot)
        UnregisterSnapshot(sysscan->snapshot);
 
+   /*
+    * Reset the bsysscan flag at the end of the systable scan.  See
+    * detailed comments in xact.c where these variables are declared.
+    */
+   if (TransactionIdIsValid(CheckXidAlive))
+       bsysscan = false;
+
    pfree(sysscan);
 }
 
@@ -643,6 +690,12 @@ systable_getnext_ordered(SysScanDesc sysscan, ScanDirection direction)
    if (htup && sysscan->iscan->xs_recheck)
        elog(ERROR, "system catalog scans with lossy index conditions are not implemented");
 
+   /*
+    * Handle the concurrent abort while fetching the catalog tuple during
+    * logical streaming of a transaction.
+    */
+   HandleConcurrentAbort();
+
    return htup;
 }
 
index 3afb63b1fe4db946a798b961be224835d59c47b6..c6383197657562845b7f39901e8f5bfb6e5e8d2c 100644 (file)
@@ -248,6 +248,14 @@ table_tuple_get_latest_tid(TableScanDesc scan, ItemPointer tid)
    Relation    rel = scan->rs_rd;
    const TableAmRoutine *tableam = rel->rd_tableam;
 
+   /*
+    * We don't expect direct calls to table_tuple_get_latest_tid with valid
+    * CheckXidAlive for catalog or regular tables.  See detailed comments in
+    * xact.c where these variables are declared.
+    */
+   if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+       elog(ERROR, "unexpected table_tuple_get_latest_tid call during logical decoding");
+
    /*
     * Since this can be called with user-supplied TID, don't trust the input
     * too much.
index d4f7c29847f4384fde31380a4a90e220b12e616c..727d616035935436359ba793135a268bb06666fb 100644 (file)
@@ -82,6 +82,19 @@ bool     XactDeferrable;
 
 int            synchronous_commit = SYNCHRONOUS_COMMIT_ON;
 
+/*
+ * CheckXidAlive is a xid value pointing to a possibly ongoing (sub)
+ * transaction.  Currently, it is used in logical decoding.  It's possible
+ * that such transactions can get aborted while the decoding is ongoing in
+ * which case we skip decoding that particular transaction.  To ensure that we
+ * check whether the CheckXidAlive is aborted after fetching the tuple from
+ * system tables.  We also ensure that during logical decoding we never
+ * directly access the tableam or heap APIs because we are checking for the
+ * concurrent aborts only in systable_* APIs.
+ */
+TransactionId CheckXidAlive = InvalidTransactionId;
+bool       bsysscan = false;
+
 /*
  * When running as a parallel worker, we place only a single
  * TransactionStateData on the parallel worker's state stack, and the XID
@@ -2680,6 +2693,9 @@ AbortTransaction(void)
    /* Forget about any active REINDEX. */
    ResetReindexState(s->nestingLevel);
 
+   /* Reset logical streaming state. */
+   ResetLogicalStreamingState();
+
    /* If in parallel mode, clean up workers and exit parallel mode. */
    if (IsInParallelMode())
    {
@@ -4982,6 +4998,9 @@ AbortSubTransaction(void)
    /* Forget about any active REINDEX. */
    ResetReindexState(s->nestingLevel);
 
+   /* Reset logical streaming state. */
+   ResetLogicalStreamingState();
+
    /* Exit from parallel mode, if necessary. */
    if (IsInParallelMode())
    {
index f3a1c31a2921c76bccd4eaecc3de1108b3d0f19e..f21f61d5e10b052d9c25a0d75055a557d52d309c 100644 (file)
@@ -724,7 +724,9 @@ DecodeInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
    change->data.tp.clear_toast_afterwards = true;
 
-   ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+   ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+                            change,
+                            xlrec->flags & XLH_INSERT_ON_TOAST_RELATION);
 }
 
 /*
@@ -791,7 +793,8 @@ DecodeUpdate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
    change->data.tp.clear_toast_afterwards = true;
 
-   ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+   ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+                            change, false);
 }
 
 /*
@@ -848,7 +851,8 @@ DecodeDelete(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
    change->data.tp.clear_toast_afterwards = true;
 
-   ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+   ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+                            change, false);
 }
 
 /*
@@ -884,7 +888,7 @@ DecodeTruncate(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
    memcpy(change->data.truncate.relids, xlrec->relids,
           xlrec->nrelids * sizeof(Oid));
    ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
-                            buf->origptr, change);
+                            buf->origptr, change, false);
 }
 
 /*
@@ -984,7 +988,7 @@ DecodeMultiInsert(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
            change->data.tp.clear_toast_afterwards = false;
 
        ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r),
-                                buf->origptr, change);
+                                buf->origptr, change, false);
 
        /* move to the next xl_multi_insert_tuple entry */
        data += datalen;
@@ -1022,7 +1026,8 @@ DecodeSpecConfirm(LogicalDecodingContext *ctx, XLogRecordBuffer *buf)
 
    change->data.tp.clear_toast_afterwards = true;
 
-   ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr, change);
+   ReorderBufferQueueChange(ctx->reorder, XLogRecGetXid(r), buf->origptr,
+                            change, false);
 }
 
 
index 05d24b93da02126d2b8116a6b733890a76c493dd..42f284b33f6bc39a2d3aef3f45260fe6207d39d0 100644 (file)
@@ -1442,3 +1442,13 @@ LogicalConfirmReceivedLocation(XLogRecPtr lsn)
        SpinLockRelease(&MyReplicationSlot->mutex);
    }
 }
+
+/*
+ * Clear logical streaming state during (sub)transaction abort.
+ */
+void
+ResetLogicalStreamingState(void)
+{
+   CheckXidAlive = InvalidTransactionId;
+   bsysscan = false;
+}
index ce6e62152f037306544ee13785d5c7fe827e43a0..5b7afe6d9e9cfd6222e7aa74a29115684811b72f 100644 (file)
@@ -178,6 +178,21 @@ typedef struct ReorderBufferDiskChange
    /* data follows */
 } ReorderBufferDiskChange;
 
+#define IsSpecInsert(action) \
+( \
+   ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT) \
+)
+#define IsSpecConfirm(action) \
+( \
+   ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM) \
+)
+#define IsInsertOrUpdate(action) \
+( \
+   (((action) == REORDER_BUFFER_CHANGE_INSERT) || \
+   ((action) == REORDER_BUFFER_CHANGE_UPDATE) || \
+   ((action) == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT)) \
+)
+
 /*
  * Maximum number of changes kept in memory, per transaction. After that,
  * changes are spooled to disk.
@@ -236,6 +251,7 @@ static Size ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn
 static void ReorderBufferRestoreChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
                                       char *change);
 static void ReorderBufferRestoreCleanup(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
 static void ReorderBufferCleanupSerializedTXNs(const char *slotname);
 static void ReorderBufferSerializedPath(char *path, ReplicationSlot *slot,
                                        TransactionId xid, XLogSegNo segno);
@@ -244,6 +260,16 @@ static void ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap);
 static Snapshot ReorderBufferCopySnap(ReorderBuffer *rb, Snapshot orig_snap,
                                      ReorderBufferTXN *txn, CommandId cid);
 
+/*
+ * ---------------------------------------
+ * Streaming support functions
+ * ---------------------------------------
+ */
+static inline bool ReorderBufferCanStream(ReorderBuffer *rb);
+static inline bool ReorderBufferCanStartStreaming(ReorderBuffer *rb);
+static void ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn);
+static void ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn);
+
 /* ---------------------------------------
  * toast reassembly support
  * ---------------------------------------
@@ -367,6 +393,9 @@ ReorderBufferGetTXN(ReorderBuffer *rb)
    dlist_init(&txn->tuplecids);
    dlist_init(&txn->subtxns);
 
+   /* InvalidCommandId is not zero, so set it explicitly */
+   txn->command_id = InvalidCommandId;
+
    return txn;
 }
 
@@ -416,13 +445,15 @@ ReorderBufferGetChange(ReorderBuffer *rb)
 }
 
 /*
- * Free an ReorderBufferChange.
+ * Free a ReorderBufferChange and update memory accounting, if requested.
  */
 void
-ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change)
+ReorderBufferReturnChange(ReorderBuffer *rb, ReorderBufferChange *change,
+                         bool upd_mem)
 {
    /* update memory accounting info */
-   ReorderBufferChangeMemoryUpdate(rb, change, false);
+   if (upd_mem)
+       ReorderBufferChangeMemoryUpdate(rb, change, false);
 
    /* free contained data */
    switch (change->action)
@@ -624,16 +655,102 @@ ReorderBufferTXNByXid(ReorderBuffer *rb, TransactionId xid, bool create,
 }
 
 /*
- * Queue a change into a transaction so it can be replayed upon commit.
+ * Record the partial change for the streaming of in-progress transactions.  We
+ * can stream only complete changes so if we have a partial change like toast
+ * table insert or speculative insert then we mark such a 'txn' so that it
+ * can't be streamed.  We also ensure that if the changes in such a 'txn' are
+ * above logical_decoding_work_mem threshold then we stream them as soon as we
+ * have a complete change.
+ */
+static void
+ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
+                                 ReorderBufferChange *change,
+                                 bool toast_insert)
+{
+   ReorderBufferTXN *toptxn;
+
+   /*
+    * The partial changes need to be processed only while streaming
+    * in-progress transactions.
+    */
+   if (!ReorderBufferCanStream(rb))
+       return;
+
+   /* Get the top transaction. */
+   if (txn->toptxn != NULL)
+       toptxn = txn->toptxn;
+   else
+       toptxn = txn;
+
+   /*
+    * Set the toast insert bit whenever we get toast insert to indicate a
+    * partial change and clear it when we get the insert or update on main
+    * table (Both update and insert will do the insert in the toast table).
+    */
+   if (toast_insert)
+       toptxn->txn_flags |= RBTXN_HAS_TOAST_INSERT;
+   else if (rbtxn_has_toast_insert(toptxn) &&
+            IsInsertOrUpdate(change->action))
+       toptxn->txn_flags &= ~RBTXN_HAS_TOAST_INSERT;
+
+   /*
+    * Set the spec insert bit whenever we get the speculative insert to
+    * indicate the partial change and clear the same on speculative confirm.
+    */
+   if (IsSpecInsert(change->action))
+       toptxn->txn_flags |= RBTXN_HAS_SPEC_INSERT;
+   else if (IsSpecConfirm(change->action))
+   {
+       /*
+        * Speculative confirm change must be preceded by speculative
+        * insertion.
+        */
+       Assert(rbtxn_has_spec_insert(toptxn));
+       toptxn->txn_flags &= ~RBTXN_HAS_SPEC_INSERT;
+   }
+
+   /*
+    * Stream the transaction if it is serialized before and the changes are
+    * now complete in the top-level transaction.
+    *
+    * The reason for doing the streaming of such a transaction as soon as we
+    * get the complete change for it is that previously it would have reached
+    * the memory threshold and wouldn't get streamed because of incomplete
+    * changes.  Delaying such transactions would increase apply lag for them.
+    */
+   if (ReorderBufferCanStartStreaming(rb) &&
+       !(rbtxn_has_incomplete_tuple(toptxn)) &&
+       rbtxn_is_serialized(txn))
+       ReorderBufferStreamTXN(rb, toptxn);
+}
+
+/*
+ * Queue a change into a transaction so it can be replayed upon commit or will be
+ * streamed when we reach logical_decoding_work_mem threshold.
  */
 void
 ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
-                        ReorderBufferChange *change)
+                        ReorderBufferChange *change, bool toast_insert)
 {
    ReorderBufferTXN *txn;
 
    txn = ReorderBufferTXNByXid(rb, xid, true, NULL, lsn, true);
 
+   /*
+    * While streaming the previous changes we have detected that the
+    * transaction is aborted.  So there is no point in collecting further
+    * changes for it.
+    */
+   if (txn->concurrent_abort)
+   {
+       /*
+        * We don't need to update memory accounting for this change as we
+        * have not added it to the queue yet.
+        */
+       ReorderBufferReturnChange(rb, change, false);
+       return;
+   }
+
    change->lsn = lsn;
    change->txn = txn;
 
@@ -645,6 +762,9 @@ ReorderBufferQueueChange(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn,
    /* update memory accounting information */
    ReorderBufferChangeMemoryUpdate(rb, change, true);
 
+   /* process partial change */
+   ReorderBufferProcessPartialChange(rb, txn, change, toast_insert);
+
    /* check the memory limits and evict something if needed */
    ReorderBufferCheckMemoryLimit(rb);
 }
@@ -674,7 +794,7 @@ ReorderBufferQueueMessage(ReorderBuffer *rb, TransactionId xid,
        change->data.msg.message = palloc(message_size);
        memcpy(change->data.msg.message, message, message_size);
 
-       ReorderBufferQueueChange(rb, xid, lsn, change);
+       ReorderBufferQueueChange(rb, xid, lsn, change, false);
 
        MemoryContextSwitchTo(oldcontext);
    }
@@ -763,6 +883,38 @@ AssertTXNLsnOrder(ReorderBuffer *rb)
 #endif
 }
 
+/*
+ * AssertChangeLsnOrder
+ *
+ * Check ordering of changes in the (sub)transaction.
+ */
+static void
+AssertChangeLsnOrder(ReorderBufferTXN *txn)
+{
+#ifdef USE_ASSERT_CHECKING
+   dlist_iter  iter;
+   XLogRecPtr  prev_lsn = txn->first_lsn;
+
+   dlist_foreach(iter, &txn->changes)
+   {
+       ReorderBufferChange *cur_change;
+
+       cur_change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+       Assert(txn->first_lsn != InvalidXLogRecPtr);
+       Assert(cur_change->lsn != InvalidXLogRecPtr);
+       Assert(txn->first_lsn <= cur_change->lsn);
+
+       if (txn->end_lsn != InvalidXLogRecPtr)
+           Assert(cur_change->lsn <= txn->end_lsn);
+
+       Assert(prev_lsn <= cur_change->lsn);
+
+       prev_lsn = cur_change->lsn;
+   }
+#endif
+}
+
 /*
  * ReorderBufferGetOldestTXN
  *     Return oldest transaction in reorderbuffer
@@ -1018,6 +1170,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
    *iter_state = NULL;
 
+   /* Check ordering of changes in the toplevel transaction. */
+   AssertChangeLsnOrder(txn);
+
    /*
     * Calculate the size of our heap: one element for every transaction that
     * contains changes.  (Besides the transactions already in the reorder
@@ -1032,6 +1187,9 @@ ReorderBufferIterTXNInit(ReorderBuffer *rb, ReorderBufferTXN *txn,
 
        cur_txn = dlist_container(ReorderBufferTXN, node, cur_txn_i.cur);
 
+       /* Check ordering of changes in this subtransaction. */
+       AssertChangeLsnOrder(cur_txn);
+
        if (cur_txn->nentries > 0)
            nr_txns++;
    }
@@ -1148,7 +1306,7 @@ ReorderBufferIterTXNNext(ReorderBuffer *rb, ReorderBufferIterTXNState *state)
    {
        change = dlist_container(ReorderBufferChange, node,
                                 dlist_pop_head_node(&state->old_change));
-       ReorderBufferReturnChange(rb, change);
+       ReorderBufferReturnChange(rb, change, true);
        Assert(dlist_is_empty(&state->old_change));
    }
 
@@ -1234,7 +1392,7 @@ ReorderBufferIterTXNFinish(ReorderBuffer *rb,
 
        change = dlist_container(ReorderBufferChange, node,
                                 dlist_pop_head_node(&state->old_change));
-       ReorderBufferReturnChange(rb, change);
+       ReorderBufferReturnChange(rb, change, true);
        Assert(dlist_is_empty(&state->old_change));
    }
 
@@ -1280,7 +1438,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
        /* Check we're not mixing changes from different transactions. */
        Assert(change->txn == txn);
 
-       ReorderBufferReturnChange(rb, change);
+       ReorderBufferReturnChange(rb, change, true);
    }
 
    /*
@@ -1297,7 +1455,7 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
        Assert(change->txn == txn);
        Assert(change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID);
 
-       ReorderBufferReturnChange(rb, change);
+       ReorderBufferReturnChange(rb, change, true);
    }
 
    /*
@@ -1309,6 +1467,15 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
        dlist_delete(&txn->base_snapshot_node);
    }
 
+   /*
+    * Cleanup the snapshot for the last streamed run.
+    */
+   if (txn->snapshot_now != NULL)
+   {
+       Assert(rbtxn_is_streamed(txn));
+       ReorderBufferFreeSnap(rb, txn->snapshot_now);
+   }
+
    /*
     * Remove TXN from its containing list.
     *
@@ -1334,6 +1501,91 @@ ReorderBufferCleanupTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
    ReorderBufferReturnTXN(rb, txn);
 }
 
+/*
+ * Discard changes from a transaction (and subtransactions), after streaming
+ * them.  Keep the remaining info - transactions, tuplecids, invalidations and
+ * snapshots.
+ */
+static void
+ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+   dlist_mutable_iter iter;
+
+   /* cleanup subtransactions & their changes */
+   dlist_foreach_modify(iter, &txn->subtxns)
+   {
+       ReorderBufferTXN *subtxn;
+
+       subtxn = dlist_container(ReorderBufferTXN, node, iter.cur);
+
+       /*
+        * Subtransactions are always associated to the toplevel TXN, even if
+        * they originally were happening inside another subtxn, so we won't
+        * ever recurse more than one level deep here.
+        */
+       Assert(rbtxn_is_known_subxact(subtxn));
+       Assert(subtxn->nsubtxns == 0);
+
+       ReorderBufferTruncateTXN(rb, subtxn);
+   }
+
+   /* cleanup changes in the toplevel txn */
+   dlist_foreach_modify(iter, &txn->changes)
+   {
+       ReorderBufferChange *change;
+
+       change = dlist_container(ReorderBufferChange, node, iter.cur);
+
+       /* Check we're not mixing changes from different transactions. */
+       Assert(change->txn == txn);
+
+       /* remove the change from it's containing list */
+       dlist_delete(&change->node);
+
+       ReorderBufferReturnChange(rb, change, true);
+   }
+
+   /*
+    * Mark the transaction as streamed.
+    *
+    * The toplevel transaction, identified by (toptxn==NULL), is marked as
+    * streamed always, even if it does not contain any changes (that is, when
+    * all the changes are in subtransactions).
+    *
+    * For subtransactions, we only mark them as streamed when there are
+    * changes in them.
+    *
+    * We do it this way because of aborts - we don't want to send aborts for
+    * XIDs the downstream is not aware of. And of course, it always knows
+    * about the toplevel xact (we send the XID in all messages), but we never
+    * stream XIDs of empty subxacts.
+    */
+   if ((!txn->toptxn) || (txn->nentries_mem != 0))
+       txn->txn_flags |= RBTXN_IS_STREAMED;
+
+   /*
+    * Destroy the (relfilenode, ctid) hashtable, so that we don't leak any
+    * memory. We could also keep the hash table and update it with new ctid
+    * values, but this seems simpler and good enough for now.
+    */
+   if (txn->tuplecid_hash != NULL)
+   {
+       hash_destroy(txn->tuplecid_hash);
+       txn->tuplecid_hash = NULL;
+   }
+
+   /* If this txn is serialized then clean the disk space. */
+   if (rbtxn_is_serialized(txn))
+   {
+       ReorderBufferRestoreCleanup(rb, txn);
+       txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
+   }
+
+   /* also reset the number of entries in the transaction */
+   txn->nentries_mem = 0;
+   txn->nentries = 0;
+}
+
 /*
  * Build a hash with a (relfilenode, ctid) -> (cmin, cmax) mapping for use by
  * HeapTupleSatisfiesHistoricMVCC.
@@ -1485,57 +1737,191 @@ ReorderBufferFreeSnap(ReorderBuffer *rb, Snapshot snap)
 }
 
 /*
- * Perform the replay of a transaction and its non-aborted subtransactions.
- *
- * Subtransactions previously have to be processed by
- * ReorderBufferCommitChild(), even if previously assigned to the toplevel
- * transaction with ReorderBufferAssignChild.
- *
- * We currently can only decode a transaction's contents when its commit
- * record is read because that's the only place where we know about cache
- * invalidations. Thus, once a toplevel commit is read, we iterate over the top
- * and subtransactions (using a k-way merge) and replay the changes in lsn
- * order.
+ * If the transaction was (partially) streamed, we need to commit it in a
+ * 'streamed' way.  That is, we first stream the remaining part of the
+ * transaction, and then invoke stream_commit message.
  */
-void
-ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
-                   XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
-                   TimestampTz commit_time,
-                   RepOriginId origin_id, XLogRecPtr origin_lsn)
+static void
+ReorderBufferStreamCommit(ReorderBuffer *rb, ReorderBufferTXN *txn)
 {
-   ReorderBufferTXN *txn;
-   volatile Snapshot snapshot_now;
-   volatile CommandId command_id = FirstCommandId;
-   bool        using_subtxn;
-   ReorderBufferIterTXNState *volatile iterstate = NULL;
+   /* we should only call this for previously streamed transactions */
+   Assert(rbtxn_is_streamed(txn));
 
-   txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
-                               false);
+   ReorderBufferStreamTXN(rb, txn);
 
-   /* unknown transaction, nothing to replay */
-   if (txn == NULL)
-       return;
+   rb->stream_commit(rb, txn, txn->final_lsn);
 
-   txn->final_lsn = commit_lsn;
-   txn->end_lsn = end_lsn;
-   txn->commit_time = commit_time;
-   txn->origin_id = origin_id;
-   txn->origin_lsn = origin_lsn;
+   ReorderBufferCleanupTXN(rb, txn);
+}
 
+/*
+ * Set xid to detect concurrent aborts.
+ *
+ * While streaming an in-progress transaction there is a possibility that the
+ * (sub)transaction might get aborted concurrently.  In such case if the
+ * (sub)transaction has catalog update then we might decode the tuple using
+ * wrong catalog version.  For example, suppose there is one catalog tuple with
+ * (xmin: 500, xmax: 0).  Now, the transaction 501 updates the catalog tuple
+ * and after that we will have two tuples (xmin: 500, xmax: 501) and
+ * (xmin: 501, xmax: 0).  Now, if 501 is aborted and some other transaction
+ * say 502 updates the same catalog tuple then the first tuple will be changed
+ * to (xmin: 500, xmax: 502).  So, the problem is that when we try to decode
+ * the tuple inserted/updated in 501 after the catalog update, we will see the
+ * catalog tuple with (xmin: 500, xmax: 502) as visible because it will
+ * consider that the tuple is deleted by xid 502 which is not visible to our
+ * snapshot.  And when we will try to decode with that catalog tuple, it can
+ * lead to a wrong result or a crash.  So, it is necessary to detect
+ * concurrent aborts to allow streaming of in-progress transactions.
+ *
+ * For detecting the concurrent abort we set CheckXidAlive to the current
+ * (sub)transaction's xid for which this change belongs to.  And, during
+ * catalog scan we can check the status of the xid and if it is aborted we will
+ * report a specific error so that we can stop streaming current transaction
+ * and discard the already streamed changes on such an error.  We might have
+ * already streamed some of the changes for the aborted (sub)transaction, but
+ * that is fine because when we decode the abort we will stream abort message
+ * to truncate the changes in the subscriber.
+ */
+static inline void
+SetupCheckXidLive(TransactionId xid)
+{
    /*
-    * If this transaction has no snapshot, it didn't make any changes to the
-    * database, so there's nothing to decode.  Note that
-    * ReorderBufferCommitChild will have transferred any snapshots from
-    * subtransactions if there were any.
+    * If the input transaction id is already set as a CheckXidAlive then
+    * nothing to do.
     */
-   if (txn->base_snapshot == NULL)
-   {
-       Assert(txn->ninvalidations == 0);
-       ReorderBufferCleanupTXN(rb, txn);
+   if (TransactionIdEquals(CheckXidAlive, xid))
        return;
+
+   /*
+    * setup CheckXidAlive if it's not committed yet.  We don't check if the
+    * xid is aborted.  That will happen during catalog access.
+    */
+   if (!TransactionIdDidCommit(xid))
+       CheckXidAlive = xid;
+   else
+       CheckXidAlive = InvalidTransactionId;
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN for applying change.
+ */
+static inline void
+ReorderBufferApplyChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
+                        Relation relation, ReorderBufferChange *change,
+                        bool streaming)
+{
+   if (streaming)
+       rb->stream_change(rb, txn, relation, change);
+   else
+       rb->apply_change(rb, txn, relation, change);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN for applying the truncate.
+ */
+static inline void
+ReorderBufferApplyTruncate(ReorderBuffer *rb, ReorderBufferTXN *txn,
+                          int nrelations, Relation *relations,
+                          ReorderBufferChange *change, bool streaming)
+{
+   if (streaming)
+       rb->stream_truncate(rb, txn, nrelations, relations, change);
+   else
+       rb->apply_truncate(rb, txn, nrelations, relations, change);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN for applying the message.
+ */
+static inline void
+ReorderBufferApplyMessage(ReorderBuffer *rb, ReorderBufferTXN *txn,
+                         ReorderBufferChange *change, bool streaming)
+{
+   if (streaming)
+       rb->stream_message(rb, txn, change->lsn, true,
+                          change->data.msg.prefix,
+                          change->data.msg.message_size,
+                          change->data.msg.message);
+   else
+       rb->message(rb, txn, change->lsn, true,
+                   change->data.msg.prefix,
+                   change->data.msg.message_size,
+                   change->data.msg.message);
+}
+
+/*
+ * Function to store the command id and snapshot at the end of the current
+ * stream so that we can reuse the same while sending the next stream.
+ */
+static inline void
+ReorderBufferSaveTXNSnapshot(ReorderBuffer *rb, ReorderBufferTXN *txn,
+                            Snapshot snapshot_now, CommandId command_id)
+{
+   txn->command_id = command_id;
+
+   /* Avoid copying if it's already copied. */
+   if (snapshot_now->copied)
+       txn->snapshot_now = snapshot_now;
+   else
+       txn->snapshot_now = ReorderBufferCopySnap(rb, snapshot_now,
+                                                 txn, command_id);
+}
+
+/*
+ * Helper function for ReorderBufferProcessTXN to handle the concurrent
+ * abort of the streaming transaction.  This resets the TXN such that it
+ * can be used to stream the remaining data of transaction being processed.
+ */
+static void
+ReorderBufferResetTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+                     Snapshot snapshot_now,
+                     CommandId command_id,
+                     XLogRecPtr last_lsn,
+                     ReorderBufferChange *specinsert)
+{
+   /* Discard the changes that we just streamed */
+   ReorderBufferTruncateTXN(rb, txn);
+
+   /* Free all resources allocated for toast reconstruction */
+   ReorderBufferToastReset(rb, txn);
+
+   /* Return the spec insert change if it is not NULL */
+   if (specinsert != NULL)
+   {
+       ReorderBufferReturnChange(rb, specinsert, true);
+       specinsert = NULL;
    }
 
-   snapshot_now = txn->base_snapshot;
+   /* Stop the stream. */
+   rb->stream_stop(rb, txn, last_lsn);
+
+   /* Remember the command ID and snapshot for the streaming run */
+   ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+}
+
+/*
+ * Helper function for ReorderBufferCommit and ReorderBufferStreamTXN.
+ *
+ * Send data of a transaction (and its subtransactions) to the
+ * output plugin. We iterate over the top and subtransactions (using a k-way
+ * merge) and replay the changes in lsn order.
+ *
+ * If streaming is true then data will be sent using stream API.
+ */
+static void
+ReorderBufferProcessTXN(ReorderBuffer *rb, ReorderBufferTXN *txn,
+                       XLogRecPtr commit_lsn,
+                       volatile Snapshot snapshot_now,
+                       volatile CommandId command_id,
+                       bool streaming)
+{
+   bool        using_subtxn;
+   MemoryContext ccxt = CurrentMemoryContext;
+   ReorderBufferIterTXNState *volatile iterstate = NULL;
+   volatile XLogRecPtr prev_lsn = InvalidXLogRecPtr;
+   ReorderBufferChange *volatile specinsert = NULL;
+   volatile bool stream_started = false;
+   ReorderBufferTXN *volatile curtxn = NULL;
 
    /* build data to be able to lookup the CommandIds of catalog tuples */
    ReorderBufferBuildTupleCidHash(rb, txn);
@@ -1558,14 +1944,15 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
    PG_TRY();
    {
        ReorderBufferChange *change;
-       ReorderBufferChange *specinsert = NULL;
 
        if (using_subtxn)
-           BeginInternalSubTransaction("replay");
+           BeginInternalSubTransaction(streaming ? "stream" : "replay");
        else
            StartTransactionCommand();
 
-       rb->begin(rb, txn);
+       /* We only need to send begin/commit for non-streamed transactions. */
+       if (!streaming)
+           rb->begin(rb, txn);
 
        ReorderBufferIterTXNInit(rb, txn, &iterstate);
        while ((change = ReorderBufferIterTXNNext(rb, iterstate)) != NULL)
@@ -1573,6 +1960,36 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
            Relation    relation = NULL;
            Oid         reloid;
 
+           /*
+            * We can't call start stream callback before processing first
+            * change.
+            */
+           if (prev_lsn == InvalidXLogRecPtr)
+           {
+               if (streaming)
+               {
+                   txn->origin_id = change->origin_id;
+                   rb->stream_start(rb, txn, change->lsn);
+                   stream_started = true;
+               }
+           }
+
+           /*
+            * Enforce correct ordering of changes, merged from multiple
+            * subtransactions. The changes may have the same LSN due to
+            * MULTI_INSERT xlog records.
+            */
+           Assert(prev_lsn == InvalidXLogRecPtr || prev_lsn <= change->lsn);
+
+           prev_lsn = change->lsn;
+
+           /* Set the current xid to detect concurrent aborts. */
+           if (streaming)
+           {
+               curtxn = change->txn;
+               SetupCheckXidLive(curtxn->xid);
+           }
+
            switch (change->action)
            {
                case REORDER_BUFFER_CHANGE_INTERNAL_SPEC_CONFIRM:
@@ -1649,7 +2066,8 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                    if (!IsToastRelation(relation))
                    {
                        ReorderBufferToastReplace(rb, txn, relation, change);
-                       rb->apply_change(rb, txn, relation, change);
+                       ReorderBufferApplyChange(rb, txn, relation, change,
+                                                streaming);
 
                        /*
                         * Only clear reassembled toast chunks if we're sure
@@ -1685,11 +2103,11 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                     */
                    if (specinsert != NULL)
                    {
-                       ReorderBufferReturnChange(rb, specinsert);
+                       ReorderBufferReturnChange(rb, specinsert, true);
                        specinsert = NULL;
                    }
 
-                   if (relation != NULL)
+                   if (RelationIsValid(relation))
                    {
                        RelationClose(relation);
                        relation = NULL;
@@ -1714,7 +2132,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                    /* clear out a pending (and thus failed) speculation */
                    if (specinsert != NULL)
                    {
-                       ReorderBufferReturnChange(rb, specinsert);
+                       ReorderBufferReturnChange(rb, specinsert, true);
                        specinsert = NULL;
                    }
 
@@ -1747,7 +2165,10 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                            relations[nrelations++] = relation;
                        }
 
-                       rb->apply_truncate(rb, txn, nrelations, relations, change);
+                       /* Apply the truncate. */
+                       ReorderBufferApplyTruncate(rb, txn, nrelations,
+                                                  relations, change,
+                                                  streaming);
 
                        for (i = 0; i < nrelations; i++)
                            RelationClose(relations[i]);
@@ -1756,10 +2177,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                    }
 
                case REORDER_BUFFER_CHANGE_MESSAGE:
-                   rb->message(rb, txn, change->lsn, true,
-                               change->data.msg.prefix,
-                               change->data.msg.message_size,
-                               change->data.msg.message);
+                   ReorderBufferApplyMessage(rb, txn, change, streaming);
                    break;
 
                case REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT:
@@ -1790,7 +2208,6 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
                        snapshot_now = change->data.snapshot;
                    }
 
-
                    /* and continue with the new one */
                    SetupHistoricSnapshot(snapshot_now, txn->tuplecid_hash);
                    break;
@@ -1837,7 +2254,7 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
         */
        if (specinsert)
        {
-           ReorderBufferReturnChange(rb, specinsert);
+           ReorderBufferReturnChange(rb, specinsert, true);
            specinsert = NULL;
        }
 
@@ -1845,14 +2262,35 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
        ReorderBufferIterTXNFinish(rb, iterstate);
        iterstate = NULL;
 
-       /* call commit callback */
-       rb->commit(rb, txn, commit_lsn);
+       /*
+        * Done with current changes, send the last message for this set of
+        * changes depending upon streaming mode.
+        */
+       if (streaming)
+       {
+           if (stream_started)
+           {
+               rb->stream_stop(rb, txn, prev_lsn);
+               stream_started = false;
+           }
+       }
+       else
+           rb->commit(rb, txn, commit_lsn);
 
        /* this is just a sanity check against bad output plugin behaviour */
        if (GetCurrentTransactionIdIfAny() != InvalidTransactionId)
            elog(ERROR, "output plugin used XID %u",
                 GetCurrentTransactionId());
 
+       /*
+        * Remember the command ID and snapshot for the next set of changes in
+        * streaming mode.
+        */
+       if (streaming)
+           ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
+       else if (snapshot_now->copied)
+           ReorderBufferFreeSnap(rb, snapshot_now);
+
        /* cleanup */
        TeardownHistoricSnapshot(false);
 
@@ -1870,14 +2308,27 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
        if (using_subtxn)
            RollbackAndReleaseCurrentSubTransaction();
 
-       if (snapshot_now->copied)
-           ReorderBufferFreeSnap(rb, snapshot_now);
+       /*
+        * If we are streaming the in-progress transaction then discard the
+        * changes that we just streamed, and mark the transactions as
+        * streamed (if they contained changes). Otherwise, remove all the
+        * changes and deallocate the ReorderBufferTXN.
+        */
+       if (streaming)
+       {
+           ReorderBufferTruncateTXN(rb, txn);
 
-       /* remove potential on-disk data, and deallocate */
-       ReorderBufferCleanupTXN(rb, txn);
+           /* Reset the CheckXidAlive */
+           CheckXidAlive = InvalidTransactionId;
+       }
+       else
+           ReorderBufferCleanupTXN(rb, txn);
    }
    PG_CATCH();
    {
+       MemoryContext ecxt = MemoryContextSwitchTo(ccxt);
+       ErrorData  *errdata = CopyErrorData();
+
        /* TODO: Encapsulate cleanup from the PG_TRY and PG_CATCH blocks */
        if (iterstate)
            ReorderBufferIterTXNFinish(rb, iterstate);
@@ -1896,15 +2347,106 @@ ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
        if (using_subtxn)
            RollbackAndReleaseCurrentSubTransaction();
 
-       if (snapshot_now->copied)
-           ReorderBufferFreeSnap(rb, snapshot_now);
+       /*
+        * The error code ERRCODE_TRANSACTION_ROLLBACK indicates a concurrent
+        * abort of the (sub)transaction we are streaming. We need to do the
+        * cleanup and return gracefully on this error, see SetupCheckXidLive.
+        */
+       if (errdata->sqlerrcode == ERRCODE_TRANSACTION_ROLLBACK)
+       {
+           /*
+            * This error can only occur when we are sending the data in
+            * streaming mode and the streaming is not finished yet.
+            */
+           Assert(streaming);
+           Assert(stream_started);
+
+           /* Cleanup the temporary error state. */
+           FlushErrorState();
+           FreeErrorData(errdata);
+           errdata = NULL;
+           curtxn->concurrent_abort = true;
+
+           /* Reset the TXN so that it is allowed to stream remaining data. */
+           ReorderBufferResetTXN(rb, txn, snapshot_now,
+                                 command_id, prev_lsn,
+                                 specinsert);
+       }
+       else
+       {
+           ReorderBufferCleanupTXN(rb, txn);
+           MemoryContextSwitchTo(ecxt);
+           PG_RE_THROW();
+       }
+   }
+   PG_END_TRY();
+}
 
-       /* remove potential on-disk data, and deallocate */
-       ReorderBufferCleanupTXN(rb, txn);
+/*
+ * Perform the replay of a transaction and its non-aborted subtransactions.
+ *
+ * Subtransactions previously have to be processed by
+ * ReorderBufferCommitChild(), even if previously assigned to the toplevel
+ * transaction with ReorderBufferAssignChild.
+ *
+ * This interface is called once a toplevel commit is read for both streamed
+ * as well as non-streamed transactions.
+ */
+void
+ReorderBufferCommit(ReorderBuffer *rb, TransactionId xid,
+                   XLogRecPtr commit_lsn, XLogRecPtr end_lsn,
+                   TimestampTz commit_time,
+                   RepOriginId origin_id, XLogRecPtr origin_lsn)
+{
+   ReorderBufferTXN *txn;
+   Snapshot    snapshot_now;
+   CommandId   command_id = FirstCommandId;
+
+   txn = ReorderBufferTXNByXid(rb, xid, false, NULL, InvalidXLogRecPtr,
+                               false);
 
-       PG_RE_THROW();
+   /* unknown transaction, nothing to replay */
+   if (txn == NULL)
+       return;
+
+   txn->final_lsn = commit_lsn;
+   txn->end_lsn = end_lsn;
+   txn->commit_time = commit_time;
+   txn->origin_id = origin_id;
+   txn->origin_lsn = origin_lsn;
+
+   /*
+    * If the transaction was (partially) streamed, we need to commit it in a
+    * 'streamed' way. That is, we first stream the remaining part of the
+    * transaction, and then invoke stream_commit message.
+    *
+    * Called after everything (origin ID, LSN, ...) is stored in the
+    * transaction to avoid passing that information directly.
+    */
+   if (rbtxn_is_streamed(txn))
+   {
+       ReorderBufferStreamCommit(rb, txn);
+       return;
    }
-   PG_END_TRY();
+
+   /*
+    * If this transaction has no snapshot, it didn't make any changes to the
+    * database, so there's nothing to decode.  Note that
+    * ReorderBufferCommitChild will have transferred any snapshots from
+    * subtransactions if there were any.
+    */
+   if (txn->base_snapshot == NULL)
+   {
+       Assert(txn->ninvalidations == 0);
+       ReorderBufferCleanupTXN(rb, txn);
+       return;
+   }
+
+   snapshot_now = txn->base_snapshot;
+
+   /* Process and send the changes to output plugin. */
+   ReorderBufferProcessTXN(rb, txn, commit_lsn, snapshot_now,
+                           command_id, false);
 }
 
 /*
@@ -1931,6 +2473,22 @@ ReorderBufferAbort(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
    if (txn == NULL)
        return;
 
+   /* For streamed transactions notify the remote node about the abort. */
+   if (rbtxn_is_streamed(txn))
+   {
+       rb->stream_abort(rb, txn, lsn);
+
+       /*
+        * We might have decoded changes for this transaction that could load
+        * the cache as per the current transaction's view (consider DDL's
+        * happened in this transaction). We don't want the decoding of future
+        * transactions to use those cache entries so execute invalidations.
+        */
+       if (txn->ninvalidations > 0)
+           ReorderBufferImmediateInvalidation(rb, txn->ninvalidations,
+                                              txn->invalidations);
+   }
+
    /* cosmetic... */
    txn->final_lsn = lsn;
 
@@ -2000,6 +2558,10 @@ ReorderBufferForget(ReorderBuffer *rb, TransactionId xid, XLogRecPtr lsn)
    if (txn == NULL)
        return;
 
+   /* For streamed transactions notify the remote node about the abort. */
+   if (rbtxn_is_streamed(txn))
+       rb->stream_abort(rb, txn, lsn);
+
    /* cosmetic... */
    txn->final_lsn = lsn;
 
@@ -2082,7 +2644,7 @@ ReorderBufferAddSnapshot(ReorderBuffer *rb, TransactionId xid,
    change->data.snapshot = snap;
    change->action = REORDER_BUFFER_CHANGE_INTERNAL_SNAPSHOT;
 
-   ReorderBufferQueueChange(rb, xid, lsn, change);
+   ReorderBufferQueueChange(rb, xid, lsn, change, false);
 }
 
 /*
@@ -2131,12 +2693,21 @@ ReorderBufferAddNewCommandId(ReorderBuffer *rb, TransactionId xid,
    change->data.command_id = cid;
    change->action = REORDER_BUFFER_CHANGE_INTERNAL_COMMAND_ID;
 
-   ReorderBufferQueueChange(rb, xid, lsn, change);
+   ReorderBufferQueueChange(rb, xid, lsn, change, false);
 }
 
 /*
- * Update the memory accounting info. We track memory used by the whole
- * reorder buffer and the transaction containing the change.
+ * Update memory counters to account for the new or removed change.
+ *
+ * We update two counters - in the reorder buffer, and in the transaction
+ * containing the change. The reorder buffer counter allows us to quickly
+ * decide if we reached the memory limit, the transaction counter allows
+ * us to quickly pick the largest transaction for eviction.
+ *
+ * When streaming is enabled, we need to update the toplevel transaction
+ * counters instead - we don't really care about subtransactions as we
+ * can't stream them individually anyway, and we only pick toplevel
+ * transactions for eviction. So only toplevel transactions matter.
  */
 static void
 ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
@@ -2144,6 +2715,8 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
                                bool addition)
 {
    Size        sz;
+   ReorderBufferTXN *txn;
+   ReorderBufferTXN *toptxn = NULL;
 
    Assert(change->txn);
 
@@ -2155,19 +2728,41 @@ ReorderBufferChangeMemoryUpdate(ReorderBuffer *rb,
    if (change->action == REORDER_BUFFER_CHANGE_INTERNAL_TUPLECID)
        return;
 
+   txn = change->txn;
+
+   /* If streaming supported, update the total size in top level as well. */
+   if (ReorderBufferCanStream(rb))
+   {
+       if (txn->toptxn != NULL)
+           toptxn = txn->toptxn;
+       else
+           toptxn = txn;
+   }
+
    sz = ReorderBufferChangeSize(change);
 
    if (addition)
    {
-       change->txn->size += sz;
+       txn->size += sz;
        rb->size += sz;
+
+       /* Update the total size in the top transaction. */
+       if (toptxn)
+           toptxn->total_size += sz;
    }
    else
    {
-       Assert((rb->size >= sz) && (change->txn->size >= sz));
-       change->txn->size -= sz;
+       Assert((rb->size >= sz) && (txn->size >= sz));
+       txn->size -= sz;
        rb->size -= sz;
+
+       /* Update the total size in the top transaction. */
+       if (toptxn)
+           toptxn->total_size -= sz;
    }
+
+   Assert(txn->size <= rb->size);
+   Assert((txn->size >= 0) && (rb->size >= 0));
 }
 
 /*
@@ -2387,6 +2982,51 @@ ReorderBufferLargestTXN(ReorderBuffer *rb)
    return largest;
 }
 
+/*
+ * Find the largest toplevel transaction to evict (by streaming).
+ *
+ * This can be seen as an optimized version of ReorderBufferLargestTXN, which
+ * should give us the same transaction (because we don't update memory account
+ * for subtransaction with streaming, so it's always 0). But we can simply
+ * iterate over the limited number of toplevel transactions.
+ *
+ * Note that, we skip transactions that contains incomplete changes. There
+ * is a scope of optimization here such that we can select the largest transaction
+ * which has complete changes.  But that will make the code and design quite complex
+ * and that might not be worth the benefit.  If we plan to stream the transactions
+ * that contains incomplete changes then we need to find a way to partially
+ * stream/truncate the transaction changes in-memory and build a mechanism to
+ * partially truncate the spilled files.  Additionally, whenever we partially
+ * stream the transaction we need to maintain the last streamed lsn and next time
+ * we need to restore from that segment and the offset in WAL.  As we stream the
+ * changes from the top transaction and restore them subtransaction wise, we need
+ * to even remember the subxact from where we streamed the last change.
+ */
+static ReorderBufferTXN *
+ReorderBufferLargestTopTXN(ReorderBuffer *rb)
+{
+   dlist_iter  iter;
+   Size        largest_size = 0;
+   ReorderBufferTXN *largest = NULL;
+
+   /* Find the largest top-level transaction. */
+   dlist_foreach(iter, &rb->toplevel_by_lsn)
+   {
+       ReorderBufferTXN *txn;
+
+       txn = dlist_container(ReorderBufferTXN, node, iter.cur);
+
+       if ((largest != NULL || txn->total_size > largest_size) &&
+           (txn->total_size > 0) && !(rbtxn_has_incomplete_tuple(txn)))
+       {
+           largest = txn;
+           largest_size = txn->total_size;
+       }
+   }
+
+   return largest;
+}
+
 /*
  * Check whether the logical_decoding_work_mem limit was reached, and if yes
  * pick the largest (sub)transaction at-a-time to evict and spill its changes to
@@ -2419,11 +3059,33 @@ ReorderBufferCheckMemoryLimit(ReorderBuffer *rb)
    {
        /*
         * Pick the largest transaction (or subtransaction) and evict it from
-        * memory by serializing it to disk.
+        * memory by streaming, if possible.  Otherwise, spill to disk.
         */
-       txn = ReorderBufferLargestTXN(rb);
+       if (ReorderBufferCanStartStreaming(rb) &&
+           (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
+       {
+           /* we know there has to be one, because the size is not zero */
+           Assert(txn && !txn->toptxn);
+           Assert(txn->total_size > 0);
+           Assert(rb->size >= txn->total_size);
 
-       ReorderBufferSerializeTXN(rb, txn);
+           ReorderBufferStreamTXN(rb, txn);
+       }
+       else
+       {
+           /*
+            * Pick the largest transaction (or subtransaction) and evict it
+            * from memory by serializing it to disk.
+            */
+           txn = ReorderBufferLargestTXN(rb);
+
+           /* we know there has to be one, because the size is not zero */
+           Assert(txn);
+           Assert(txn->size > 0);
+           Assert(rb->size >= txn->size);
+
+           ReorderBufferSerializeTXN(rb, txn);
+       }
 
        /*
         * After eviction, the transaction should have no entries in memory,
@@ -2501,7 +3163,7 @@ ReorderBufferSerializeTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
 
        ReorderBufferSerializeChange(rb, txn, fd, change);
        dlist_delete(&change->node);
-       ReorderBufferReturnChange(rb, change);
+       ReorderBufferReturnChange(rb, change, true);
 
        spilled++;
    }
@@ -2713,6 +3375,136 @@ ReorderBufferSerializeChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
    Assert(ondisk->change.action == change->action);
 }
 
+/* Returns true, if the output plugin supports streaming, false, otherwise. */
+static inline bool
+ReorderBufferCanStream(ReorderBuffer *rb)
+{
+   LogicalDecodingContext *ctx = rb->private_data;
+
+   return ctx->streaming;
+}
+
+/* Returns true, if the streaming can be started now, false, otherwise. */
+static inline bool
+ReorderBufferCanStartStreaming(ReorderBuffer *rb)
+{
+   LogicalDecodingContext *ctx = rb->private_data;
+   SnapBuild  *builder = ctx->snapshot_builder;
+
+   /*
+    * We can't start streaming immediately even if the streaming is enabled
+    * because we previously decoded this transaction and now just are
+    * restarting.
+    */
+   if (ReorderBufferCanStream(rb) &&
+       !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+   {
+       /* We must have a consistent snapshot by this time */
+       Assert(SnapBuildCurrentState(builder) == SNAPBUILD_CONSISTENT);
+       return true;
+   }
+
+   return false;
+}
+
+/*
+ * Send data of a large transaction (and its subtransactions) to the
+ * output plugin, but using the stream API.
+ */
+static void
+ReorderBufferStreamTXN(ReorderBuffer *rb, ReorderBufferTXN *txn)
+{
+   Snapshot    snapshot_now;
+   CommandId   command_id;
+
+   /* We can never reach here for a subtransaction. */
+   Assert(txn->toptxn == NULL);
+
+   /*
+    * We can't make any assumptions about base snapshot here, similar to what
+    * ReorderBufferCommit() does. That relies on base_snapshot getting
+    * transferred from subxact in ReorderBufferCommitChild(), but that was
+    * not yet called as the transaction is in-progress.
+    *
+    * So just walk the subxacts and use the same logic here. But we only need
+    * to do that once, when the transaction is streamed for the first time.
+    * After that we need to reuse the snapshot from the previous run.
+    *
+    * Unlike DecodeCommit which adds xids of all the subtransactions in
+    * snapshot's xip array via SnapBuildCommittedTxn, we can't do that here
+    * but we do add them to subxip array instead via ReorderBufferCopySnap.
+    * This allows the catalog changes made in subtransactions decoded till
+    * now to be visible.
+    */
+   if (txn->snapshot_now == NULL)
+   {
+       dlist_iter  subxact_i;
+
+       /* make sure this transaction is streamed for the first time */
+       Assert(!rbtxn_is_streamed(txn));
+
+       /* at the beginning we should have invalid command ID */
+       Assert(txn->command_id == InvalidCommandId);
+
+       dlist_foreach(subxact_i, &txn->subtxns)
+       {
+           ReorderBufferTXN *subtxn;
+
+           subtxn = dlist_container(ReorderBufferTXN, node, subxact_i.cur);
+           ReorderBufferTransferSnapToParent(txn, subtxn);
+       }
+
+       /*
+        * If this transaction has no snapshot, it didn't make any changes to
+        * the database till now, so there's nothing to decode.
+        */
+       if (txn->base_snapshot == NULL)
+       {
+           Assert(txn->ninvalidations == 0);
+           return;
+       }
+
+       command_id = FirstCommandId;
+       snapshot_now = ReorderBufferCopySnap(rb, txn->base_snapshot,
+                                            txn, command_id);
+   }
+   else
+   {
+       /* the transaction must have been already streamed */
+       Assert(rbtxn_is_streamed(txn));
+
+       /*
+        * Nah, we already have snapshot from the previous streaming run. We
+        * assume new subxacts can't move the LSN backwards, and so can't beat
+        * the LSN condition in the previous branch (so no need to walk
+        * through subxacts again). In fact, we must not do that as we may be
+        * using snapshot half-way through the subxact.
+        */
+       command_id = txn->command_id;
+
+       /*
+        * We can't use txn->snapshot_now directly because after the last
+        * streaming run, we might have got some new sub-transactions. So we
+        * need to add them to the snapshot.
+        */
+       snapshot_now = ReorderBufferCopySnap(rb, txn->snapshot_now,
+                                            txn, command_id);
+
+       /* Free the previously copied snapshot. */
+       Assert(txn->snapshot_now->copied);
+       ReorderBufferFreeSnap(rb, txn->snapshot_now);
+       txn->snapshot_now = NULL;
+   }
+
+   /* Process and send the changes to output plugin. */
+   ReorderBufferProcessTXN(rb, txn, InvalidXLogRecPtr, snapshot_now,
+                           command_id, true);
+
+   Assert(dlist_is_empty(&txn->changes));
+   Assert(txn->nentries == 0);
+   Assert(txn->nentries_mem == 0);
+}
+
 /*
  * Size of a change in memory.
  */
@@ -2813,7 +3605,7 @@ ReorderBufferRestoreChanges(ReorderBuffer *rb, ReorderBufferTXN *txn,
        dlist_container(ReorderBufferChange, node, cleanup_iter.cur);
 
        dlist_delete(&cleanup->node);
-       ReorderBufferReturnChange(rb, cleanup);
+       ReorderBufferReturnChange(rb, cleanup, true);
    }
    txn->nentries_mem = 0;
    Assert(dlist_is_empty(&txn->changes));
@@ -3522,7 +4314,7 @@ ReorderBufferToastReset(ReorderBuffer *rb, ReorderBufferTXN *txn)
            dlist_container(ReorderBufferChange, node, it.cur);
 
            dlist_delete(&change->node);
-           ReorderBufferReturnChange(rb, change);
+           ReorderBufferReturnChange(rb, change, true);
        }
    }
 
@@ -3812,6 +4604,17 @@ ResolveCminCmaxDuringDecoding(HTAB *tuplecid_data,
    BlockNumber blockno;
    bool        updated_mapping = false;
 
+   /*
+    * Return unresolved if tuplecid_data is not valid.  That's because when
+    * streaming in-progress transactions we may run into tuples with the CID
+    * before actually decoding them.  Think e.g. about INSERT followed by
+    * TRUNCATE, where the TRUNCATE may not be decoded yet when applying the
+    * INSERT.  So in such cases, we assume the CID is from the future
+    * command.
+    */
+   if (tuplecid_data == NULL)
+       return false;
+
    /* be careful about padding */
    memset(&key, 0, sizeof(key));
 
index 95d18cdb12e764ccb5bd5d284b1f1bcaa55e0337..aa17f7df84d4be2e85e0e5566d8c5029be0ea388 100644 (file)
@@ -67,6 +67,7 @@
 #define XLH_INSERT_LAST_IN_MULTI               (1<<1)
 #define XLH_INSERT_IS_SPECULATIVE              (1<<2)
 #define XLH_INSERT_CONTAINS_NEW_TUPLE          (1<<3)
+#define XLH_INSERT_ON_TOAST_RELATION           (1<<4)
 
 /*
  * xl_heap_update flag values, 8 bits are available.
index 7ba72c84e02174676f775fc85ffe5afd7a06162a..387eb34a61a366a5341bb3435732d5f158de4717 100644 (file)
@@ -19,6 +19,7 @@
 
 #include "access/relscan.h"
 #include "access/sdir.h"
+#include "access/xact.h"
 #include "utils/guc.h"
 #include "utils/rel.h"
 #include "utils/snapshot.h"
@@ -903,6 +904,15 @@ static inline bool
 table_scan_getnextslot(TableScanDesc sscan, ScanDirection direction, TupleTableSlot *slot)
 {
    slot->tts_tableOid = RelationGetRelid(sscan->rs_rd);
+
+   /*
+    * We don't expect direct calls to table_scan_getnextslot with valid
+    * CheckXidAlive for catalog or regular tables.  See detailed comments in
+    * xact.c where these variables are declared.
+    */
+   if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+       elog(ERROR, "unexpected table_scan_getnextslot call during logical decoding");
+
    return sscan->rs_rd->rd_tableam->scan_getnextslot(sscan, direction, slot);
 }
 
@@ -1017,6 +1027,13 @@ table_index_fetch_tuple(struct IndexFetchTableData *scan,
                        TupleTableSlot *slot,
                        bool *call_again, bool *all_dead)
 {
+   /*
+    * We don't expect direct calls to table_index_fetch_tuple with valid
+    * CheckXidAlive for catalog or regular tables.  See detailed comments in
+    * xact.c where these variables are declared.
+    */
+   if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+       elog(ERROR, "unexpected table_index_fetch_tuple call during logical decoding");
 
    return scan->rel->rd_tableam->index_fetch_tuple(scan, tid, snapshot,
                                                    slot, call_again,
@@ -1056,6 +1073,14 @@ table_tuple_fetch_row_version(Relation rel,
                              Snapshot snapshot,
                              TupleTableSlot *slot)
 {
+   /*
+    * We don't expect direct calls to table_tuple_fetch_row_version with
+    * valid CheckXidAlive for catalog or regular tables.  See detailed
+    * comments in xact.c where these variables are declared.
+    */
+   if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+       elog(ERROR, "unexpected table_tuple_fetch_row_version call during logical decoding");
+
    return rel->rd_tableam->tuple_fetch_row_version(rel, tid, snapshot, slot);
 }
 
@@ -1713,6 +1738,14 @@ static inline bool
 table_scan_bitmap_next_block(TableScanDesc scan,
                             struct TBMIterateResult *tbmres)
 {
+   /*
+    * We don't expect direct calls to table_scan_bitmap_next_block with valid
+    * CheckXidAlive for catalog or regular tables.  See detailed comments in
+    * xact.c where these variables are declared.
+    */
+   if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+       elog(ERROR, "unexpected table_scan_bitmap_next_block call during logical decoding");
+
    return scan->rs_rd->rd_tableam->scan_bitmap_next_block(scan,
                                                           tbmres);
 }
@@ -1730,6 +1763,14 @@ table_scan_bitmap_next_tuple(TableScanDesc scan,
                             struct TBMIterateResult *tbmres,
                             TupleTableSlot *slot)
 {
+   /*
+    * We don't expect direct calls to table_scan_bitmap_next_tuple with valid
+    * CheckXidAlive for catalog or regular tables.  See detailed comments in
+    * xact.c where these variables are declared.
+    */
+   if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+       elog(ERROR, "unexpected table_scan_bitmap_next_tuple call during logical decoding");
+
    return scan->rs_rd->rd_tableam->scan_bitmap_next_tuple(scan,
                                                           tbmres,
                                                           slot);
@@ -1748,6 +1789,13 @@ static inline bool
 table_scan_sample_next_block(TableScanDesc scan,
                             struct SampleScanState *scanstate)
 {
+   /*
+    * We don't expect direct calls to table_scan_sample_next_block with valid
+    * CheckXidAlive for catalog or regular tables.  See detailed comments in
+    * xact.c where these variables are declared.
+    */
+   if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+       elog(ERROR, "unexpected table_scan_sample_next_block call during logical decoding");
    return scan->rs_rd->rd_tableam->scan_sample_next_block(scan, scanstate);
 }
 
@@ -1764,6 +1812,13 @@ table_scan_sample_next_tuple(TableScanDesc scan,
                             struct SampleScanState *scanstate,
                             TupleTableSlot *slot)
 {
+   /*
+    * We don't expect direct calls to table_scan_sample_next_tuple with valid
+    * CheckXidAlive for catalog or regular tables.  See detailed comments in
+    * xact.c where these variables are declared.
+    */
+   if (unlikely(TransactionIdIsValid(CheckXidAlive) && !bsysscan))
+       elog(ERROR, "unexpected table_scan_sample_next_tuple call during logical decoding");
    return scan->rs_rd->rd_tableam->scan_sample_next_tuple(scan, scanstate,
                                                           slot);
 }
index 53480116a4622f3b36484c6112ecd652ce370db8..c18554bae2c251ab21984a2d43361636ad5706c5 100644 (file)
@@ -81,6 +81,10 @@ typedef enum
 /* Synchronous commit level */
 extern int synchronous_commit;
 
+/* used during logical streaming of a transaction */
+extern TransactionId CheckXidAlive;
+extern bool bsysscan;
+
 /*
  * Miscellaneous flag bits to record events which occur on the top level
  * transaction. These flags are only persisted in MyXactFlags and are intended
index deef31825d6e124f709ac739cda1e60a654cd940..b0fae9808bf6894c75c23ad32a8d1eedc1250205 100644 (file)
@@ -121,5 +121,6 @@ extern void LogicalIncreaseRestartDecodingForSlot(XLogRecPtr current_lsn,
 extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
 
 extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
+extern void ResetLogicalStreamingState(void);
 
 #endif
index 42bc8176487395b393e1340b3bf0fc57d0955957..1ae17d5f11fd73fadd6b3877b2fb00600ac2b435 100644 (file)
@@ -162,6 +162,9 @@ typedef struct ReorderBufferChange
 #define RBTXN_HAS_CATALOG_CHANGES 0x0001
 #define RBTXN_IS_SUBXACT          0x0002
 #define RBTXN_IS_SERIALIZED       0x0004
+#define RBTXN_IS_STREAMED         0x0008
+#define RBTXN_HAS_TOAST_INSERT    0x0010
+#define RBTXN_HAS_SPEC_INSERT     0x0020
 
 /* Does the transaction have catalog changes? */
 #define rbtxn_has_catalog_changes(txn) \
@@ -181,6 +184,40 @@ typedef struct ReorderBufferChange
    ((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
 )
 
+/* This transaction's changes has toast insert, without main table insert. */
+#define rbtxn_has_toast_insert(txn) \
+( \
+   ((txn)->txn_flags & RBTXN_HAS_TOAST_INSERT) != 0 \
+)
+/*
+ * This transaction's changes has speculative insert, without speculative
+ * confirm.
+ */
+#define rbtxn_has_spec_insert(txn) \
+( \
+   ((txn)->txn_flags & RBTXN_HAS_SPEC_INSERT) != 0 \
+)
+
+/* Check whether this transaction has an incomplete change. */
+#define rbtxn_has_incomplete_tuple(txn) \
+( \
+   rbtxn_has_toast_insert(txn) || rbtxn_has_spec_insert(txn) \
+)
+
+/*
+ * Has this transaction been streamed to downstream?
+ *
+ * (It's not possible to deduce this from nentries and nentries_mem for
+ * various reasons. For example, all changes may be in subtransactions in
+ * which case we'd have nentries==0 for the toplevel one, which would say
+ * nothing about the streaming. So we maintain this flag, but only for the
+ * toplevel transaction.)
+ */
+#define rbtxn_is_streamed(txn) \
+( \
+   ((txn)->txn_flags & RBTXN_IS_STREAMED) != 0 \
+)
+
 typedef struct ReorderBufferTXN
 {
    /* See above */
@@ -248,6 +285,13 @@ typedef struct ReorderBufferTXN
    XLogRecPtr  base_snapshot_lsn;
    dlist_node  base_snapshot_node; /* link in txns_by_base_snapshot_lsn */
 
+   /*
+    * Snapshot/CID from the previous streaming run. Only valid for already
+    * streamed transactions (NULL/InvalidCommandId otherwise).
+    */
+   Snapshot    snapshot_now;
+   CommandId   command_id;
+
    /*
     * How many ReorderBufferChange's do we have in this txn.
     *
@@ -313,6 +357,12 @@ typedef struct ReorderBufferTXN
     * Size of this transaction (changes currently in memory, in bytes).
     */
    Size        size;
+
+   /* Size of top-transaction including sub-transactions. */
+   Size        total_size;
+
+   /* If we have detected concurrent abort then ignore future changes. */
+   bool        concurrent_abort;
 } ReorderBufferTXN;
 
 /* so we can define the callbacks used inside struct ReorderBuffer itself */
@@ -484,12 +534,14 @@ void      ReorderBufferFree(ReorderBuffer *);
 ReorderBufferTupleBuf *ReorderBufferGetTupleBuf(ReorderBuffer *, Size tuple_len);
 void       ReorderBufferReturnTupleBuf(ReorderBuffer *, ReorderBufferTupleBuf *tuple);
 ReorderBufferChange *ReorderBufferGetChange(ReorderBuffer *);
-void       ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *);
+void       ReorderBufferReturnChange(ReorderBuffer *, ReorderBufferChange *, bool);
 
 Oid           *ReorderBufferGetRelids(ReorderBuffer *, int nrelids);
 void       ReorderBufferReturnRelids(ReorderBuffer *, Oid *relids);
 
-void       ReorderBufferQueueChange(ReorderBuffer *, TransactionId, XLogRecPtr lsn, ReorderBufferChange *);
+void       ReorderBufferQueueChange(ReorderBuffer *, TransactionId,
+                                    XLogRecPtr lsn, ReorderBufferChange *,
+                                    bool toast_insert);
 void       ReorderBufferQueueMessage(ReorderBuffer *, TransactionId, Snapshot snapshot, XLogRecPtr lsn,
                                      bool transactional, const char *prefix,
                                      Size message_size, const char *message);