Fix the logical replication timeout during large transactions.
authorAmit Kapila <[email protected]>
Wed, 11 May 2022 05:41:44 +0000 (11:11 +0530)
committerAmit Kapila <[email protected]>
Wed, 11 May 2022 05:41:44 +0000 (11:11 +0530)
The problem is that we don't send keep-alive messages for a long time
while processing large transactions during logical replication where we
don't send any data of such transactions. This can happen when the table
modified in the transaction is not published or because all the changes
got filtered. We do try to send the keep_alive if necessary at the end of
the transaction (via WalSndWriteData()) but by that time the
subscriber-side can timeout and exit.

To fix this we try to send the keepalive message if required after
processing certain threshold of changes.

Reported-by: Fabrice Chapuis
Author: Wang wei and Amit Kapila
Reviewed By: Masahiko Sawada, Euler Taveira, Hou Zhijie, Hayato Kuroda
Backpatch-through: 10
Discussion: https://postgr.es/m/CAA5-nLARN7-3SLU_QUxfy510pmrYK6JJb=bk3hcgemAM_pAv+w@mail.gmail.com

src/backend/replication/logical/logical.c
src/backend/replication/pgoutput/pgoutput.c
src/backend/replication/walsender.c
src/include/replication/logical.h

index 788769dd738e67cb19447c0d4159e96fd3a4c342..625a7f42730d131619d8df80bb234b1ac205ac76 100644 (file)
@@ -746,6 +746,7 @@ startup_cb_wrapper(LogicalDecodingContext *ctx, OutputPluginOptions *opt, bool i
 
        /* set output state */
        ctx->accept_writes = false;
+       ctx->end_xact = false;
 
        /* do the actual work: call callback */
        ctx->callbacks.startup_cb(ctx, opt, is_init);
@@ -773,6 +774,7 @@ shutdown_cb_wrapper(LogicalDecodingContext *ctx)
 
        /* set output state */
        ctx->accept_writes = false;
+       ctx->end_xact = false;
 
        /* do the actual work: call callback */
        ctx->callbacks.shutdown_cb(ctx);
@@ -808,6 +810,7 @@ begin_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->first_lsn;
+       ctx->end_xact = false;
 
        /* do the actual work: call callback */
        ctx->callbacks.begin_cb(ctx, txn);
@@ -839,6 +842,7 @@ commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn; /* points to the end of the record */
+       ctx->end_xact = true;
 
        /* do the actual work: call callback */
        ctx->callbacks.commit_cb(ctx, txn, commit_lsn);
@@ -879,6 +883,7 @@ begin_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn)
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->first_lsn;
+       ctx->end_xact = false;
 
        /*
         * If the plugin supports two-phase commits then begin prepare callback is
@@ -923,6 +928,7 @@ prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn; /* points to the end of the record */
+       ctx->end_xact = true;
 
        /*
         * If the plugin supports two-phase commits then prepare callback is
@@ -967,6 +973,7 @@ commit_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn; /* points to the end of the record */
+       ctx->end_xact = true;
 
        /*
         * If the plugin support two-phase commits then commit prepared callback
@@ -1012,6 +1019,7 @@ rollback_prepared_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn; /* points to the end of the record */
+       ctx->end_xact = true;
 
        /*
         * If the plugin support two-phase commits then rollback prepared callback
@@ -1062,6 +1070,8 @@ change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = change->lsn;
 
+       ctx->end_xact = false;
+
        ctx->callbacks.change_cb(ctx, txn, relation, change);
 
        /* Pop the error context stack */
@@ -1102,6 +1112,8 @@ truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = change->lsn;
 
+       ctx->end_xact = false;
+
        ctx->callbacks.truncate_cb(ctx, txn, nrelations, relations, change);
 
        /* Pop the error context stack */
@@ -1129,6 +1141,7 @@ filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
 
        /* set output state */
        ctx->accept_writes = false;
+       ctx->end_xact = false;
 
        /* do the actual work: call callback */
        ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
@@ -1159,6 +1172,7 @@ filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id)
 
        /* set output state */
        ctx->accept_writes = false;
+       ctx->end_xact = false;
 
        /* do the actual work: call callback */
        ret = ctx->callbacks.filter_by_origin_cb(ctx, origin_id);
@@ -1196,6 +1210,7 @@ message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
        ctx->write_location = message_lsn;
+       ctx->end_xact = false;
 
        /* do the actual work: call callback */
        ctx->callbacks.message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1239,6 +1254,8 @@ stream_start_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = first_lsn;
 
+       ctx->end_xact = false;
+
        /* in streaming mode, stream_start_cb is required */
        if (ctx->callbacks.stream_start_cb == NULL)
                ereport(ERROR,
@@ -1286,6 +1303,8 @@ stream_stop_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = last_lsn;
 
+       ctx->end_xact = false;
+
        /* in streaming mode, stream_stop_cb is required */
        if (ctx->callbacks.stream_stop_cb == NULL)
                ereport(ERROR,
@@ -1325,6 +1344,7 @@ stream_abort_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = abort_lsn;
+       ctx->end_xact = true;
 
        /* in streaming mode, stream_abort_cb is required */
        if (ctx->callbacks.stream_abort_cb == NULL)
@@ -1369,6 +1389,7 @@ stream_prepare_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn;
+       ctx->end_xact = true;
 
        /* in streaming mode with two-phase commits, stream_prepare_cb is required */
        if (ctx->callbacks.stream_prepare_cb == NULL)
@@ -1409,6 +1430,7 @@ stream_commit_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn->xid;
        ctx->write_location = txn->end_lsn;
+       ctx->end_xact = true;
 
        /* in streaming mode, stream_commit_cb is required */
        if (ctx->callbacks.stream_commit_cb == NULL)
@@ -1457,6 +1479,8 @@ stream_change_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = change->lsn;
 
+       ctx->end_xact = false;
+
        /* in streaming mode, stream_change_cb is required */
        if (ctx->callbacks.stream_change_cb == NULL)
                ereport(ERROR,
@@ -1501,6 +1525,7 @@ stream_message_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
        ctx->accept_writes = true;
        ctx->write_xid = txn != NULL ? txn->xid : InvalidTransactionId;
        ctx->write_location = message_lsn;
+       ctx->end_xact = false;
 
        /* do the actual work: call callback */
        ctx->callbacks.stream_message_cb(ctx, txn, message_lsn, transactional, prefix,
@@ -1549,6 +1574,8 @@ stream_truncate_cb_wrapper(ReorderBuffer *cache, ReorderBufferTXN *txn,
         */
        ctx->write_location = change->lsn;
 
+       ctx->end_xact = false;
+
        ctx->callbacks.stream_truncate_cb(ctx, txn, nrelations, relations, change);
 
        /* Pop the error context stack */
index b197bfd565d78444d8bfd394f4427d0d6636510d..406ad84e1d649e837e4dc24d748eaaa9f684887a 100644 (file)
@@ -91,6 +91,8 @@ static void send_relation_and_attrs(Relation relation, TransactionId xid,
 static void send_repl_origin(LogicalDecodingContext *ctx,
                                                         RepOriginId origin_id, XLogRecPtr origin_lsn,
                                                         bool send_origin);
+static void update_replication_progress(LogicalDecodingContext *ctx,
+                                                                               bool skipped_xact);
 
 /*
  * Only 3 publication actions are used for row filtering ("insert", "update",
@@ -558,7 +560,7 @@ pgoutput_commit_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
         * from this transaction has been sent to the downstream.
         */
        sent_begin_txn = txndata->sent_begin_txn;
-       OutputPluginUpdateProgress(ctx, !sent_begin_txn);
+       update_replication_progress(ctx, !sent_begin_txn);
        pfree(txndata);
        txn->output_plugin_private = NULL;
 
@@ -597,7 +599,7 @@ static void
 pgoutput_prepare_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                         XLogRecPtr prepare_lsn)
 {
-       OutputPluginUpdateProgress(ctx, false);
+       update_replication_progress(ctx, false);
 
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_prepare(ctx->out, txn, prepare_lsn);
@@ -611,7 +613,7 @@ static void
 pgoutput_commit_prepared_txn(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
                                                         XLogRecPtr commit_lsn)
 {
-       OutputPluginUpdateProgress(ctx, false);
+       update_replication_progress(ctx, false);
 
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_commit_prepared(ctx->out, txn, commit_lsn);
@@ -627,7 +629,7 @@ pgoutput_rollback_prepared_txn(LogicalDecodingContext *ctx,
                                                           XLogRecPtr prepare_end_lsn,
                                                           TimestampTz prepare_time)
 {
-       OutputPluginUpdateProgress(ctx, false);
+       update_replication_progress(ctx, false);
 
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_rollback_prepared(ctx->out, txn, prepare_end_lsn,
@@ -1360,6 +1362,8 @@ pgoutput_change(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        TupleTableSlot *old_slot = NULL;
        TupleTableSlot *new_slot = NULL;
 
+       update_replication_progress(ctx, false);
+
        if (!is_publishable_relation(relation))
                return;
 
@@ -1592,6 +1596,8 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        Oid                *relids;
        TransactionId xid = InvalidTransactionId;
 
+       update_replication_progress(ctx, false);
+
        /* Remember the xid for the change in streaming mode. See pgoutput_change. */
        if (in_streaming)
                xid = change->txn->xid;
@@ -1655,6 +1661,8 @@ pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
        PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
        TransactionId xid = InvalidTransactionId;
 
+       update_replication_progress(ctx, false);
+
        if (!data->messages)
                return;
 
@@ -1847,7 +1855,7 @@ pgoutput_stream_commit(struct LogicalDecodingContext *ctx,
        Assert(!in_streaming);
        Assert(rbtxn_is_streamed(txn));
 
-       OutputPluginUpdateProgress(ctx, false);
+       update_replication_progress(ctx, false);
 
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_stream_commit(ctx->out, txn, commit_lsn);
@@ -1868,7 +1876,7 @@ pgoutput_stream_prepare_txn(LogicalDecodingContext *ctx,
 {
        Assert(rbtxn_is_streamed(txn));
 
-       OutputPluginUpdateProgress(ctx, false);
+       update_replication_progress(ctx, false);
        OutputPluginPrepareWrite(ctx, true);
        logicalrep_write_stream_prepare(ctx->out, txn, prepare_lsn);
        OutputPluginWrite(ctx, true);
@@ -2361,3 +2369,37 @@ send_repl_origin(LogicalDecodingContext *ctx, RepOriginId origin_id,
                }
        }
 }
+
+/*
+ * Try to update progress and send a keepalive message if too many changes were
+ * processed.
+ *
+ * For a large transaction, if we don't send any change to the downstream for a
+ * long time (exceeds the wal_receiver_timeout of standby) then it can timeout.
+ * This can happen when all or most of the changes are either not published or
+ * got filtered out.
+ */
+static void
+update_replication_progress(LogicalDecodingContext *ctx, bool skipped_xact)
+{
+       static int      changes_count = 0;
+
+       /*
+        * We don't want to try sending a keepalive message after processing each
+        * change as that can have overhead. Tests revealed that there is no
+        * noticeable overhead in doing it after continuously processing 100 or so
+        * changes.
+        */
+#define CHANGES_THRESHOLD 100
+
+       /*
+        * If we are at the end of transaction LSN, update progress tracking.
+        * Otherwise, after continuously processing CHANGES_THRESHOLD changes, we
+        * try to send a keepalive message if required.
+        */
+       if (ctx->end_xact || ++changes_count >= CHANGES_THRESHOLD)
+       {
+               OutputPluginUpdateProgress(ctx, skipped_xact);
+               changes_count = 0;
+       }
+}
index 63a818140bd6722fb103258aa46a2673d2d87eae..c6c196b2fab28746b944dcaffb96727b5f9a1a71 100644 (file)
@@ -1482,14 +1482,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 {
        static TimestampTz sendTime = 0;
        TimestampTz now = GetCurrentTimestamp();
+       bool            pending_writes = false;
+       bool            end_xact = ctx->end_xact;
 
        /*
         * Track lag no more than once per WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS to
         * avoid flooding the lag tracker when we commit frequently.
+        *
+        * We don't have a mechanism to get the ack for any LSN other than end
+        * xact LSN from the downstream. So, we track lag only for end of
+        * transaction LSN.
         */
 #define WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS   1000
-       if (TimestampDifferenceExceeds(sendTime, now,
-                                                                  WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
+       if (end_xact && TimestampDifferenceExceeds(sendTime, now,
+                                                                                          WALSND_LOGICAL_LAG_TRACK_INTERVAL_MS))
        {
                LagTrackerWrite(lsn, now);
                sendTime = now;
@@ -1515,8 +1521,20 @@ WalSndUpdateProgress(LogicalDecodingContext *ctx, XLogRecPtr lsn, TransactionId
 
                /* If we have pending write here, make sure it's actually flushed */
                if (pq_is_send_pending())
-                       ProcessPendingWrites();
+                       pending_writes = true;
        }
+
+       /*
+        * Process pending writes if any or try to send a keepalive if required.
+        * We don't need to try sending keep alive messages at the transaction end
+        * as that will be done at a later point in time. This is required only
+        * for large transactions where we don't send any changes to the
+        * downstream and the receiver can timeout due to that.
+        */
+       if (pending_writes || (!end_xact &&
+                                                  now >= TimestampTzPlusMilliseconds(last_reply_timestamp,
+                                                                                                                         wal_sender_timeout / 2)))
+               ProcessPendingWrites();
 }
 
 /*
index a6ef16ad5b1452cb90f357ee2be84e7713061a31..edadacd58936aae5ffe66c54687cb8a34a5e2308 100644 (file)
@@ -107,6 +107,8 @@ typedef struct LogicalDecodingContext
        bool            prepared_write;
        XLogRecPtr      write_location;
        TransactionId write_xid;
+       /* Are we processing the end LSN of a transaction? */
+       bool            end_xact;
 } LogicalDecodingContext;