Flush the IO statistics of active WAL senders more frequently
authorMichael Paquier <[email protected]>
Mon, 7 Apr 2025 22:57:19 +0000 (07:57 +0900)
committerMichael Paquier <[email protected]>
Mon, 7 Apr 2025 22:57:19 +0000 (07:57 +0900)
WAL senders do not flush their statistics until they exit, limiting the
monitoring possible for live processes.  This is penalizing when WAL
senders are running for a long time, like in streaming or logical
replication setups, because it is not possible to know the amount of IO
they generate while running.

This commit makes WAL senders more aggressive with their statistics
flush, using an internal of 1 second, with the flush timing calculated
based on the existing GetCurrentTimestamp() done before the sleeps done
to wait for some activity.  Note that the sleep done for logical and
physical WAL senders happens in two different code paths, so the stats
flushes need to happen in these two places.

One test is added for the physical WAL sender case, and one for the
logical WAL sender case.  This can be done in a stable fashion by
relying on the WAL generated by the TAP tests in combination with a
stats reset while a server is running, but only on HEAD as WAL data has
been added to pg_stat_io in a051e71e28a1.

This issue exists since a9c70b46dbe and the introduction of pg_stat_io,
so backpatch down to v16.

Author: Bertrand Drouvot <[email protected]>
Reviewed-by: vignesh C <[email protected]>
Reviewed-by: Xuneng Zhou <[email protected]>
Discussion: https://postgr.es/m/[email protected]
Backpatch-through: 16

src/backend/replication/walsender.c
src/test/recovery/t/001_stream_rep.pl
src/test/subscription/t/001_rep_changes.pl

index 1028919aecb14897c29df5f9d3db209695846fd1..216baeda5cd221b80de074e47db9f1ed3f3a5971 100644 (file)
 #include "utils/guc.h"
 #include "utils/memutils.h"
 #include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
 #include "utils/ps_status.h"
 #include "utils/timeout.h"
 #include "utils/timestamp.h"
 
+/* Minimum interval used by walsender for stats flushes, in ms */
+#define WALSENDER_STATS_FLUSH_INTERVAL         1000
+
 /*
  * Maximum data payload in a WAL data message.  Must be >= XLOG_BLCKSZ.
  *
@@ -1797,6 +1801,7 @@ WalSndWaitForWal(XLogRecPtr loc)
    int         wakeEvents;
    uint32      wait_event = 0;
    static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+   TimestampTz last_flush = 0;
 
    /*
     * Fast path to avoid acquiring the spinlock in case we already know we
@@ -1817,6 +1822,7 @@ WalSndWaitForWal(XLogRecPtr loc)
    {
        bool        wait_for_standby_at_stop = false;
        long        sleeptime;
+       TimestampTz now;
 
        /* Clear any already-pending wakeups */
        ResetLatch(MyLatch);
@@ -1927,7 +1933,8 @@ WalSndWaitForWal(XLogRecPtr loc)
         * new WAL to be generated.  (But if we have nothing to send, we don't
         * want to wake on socket-writable.)
         */
-       sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+       now = GetCurrentTimestamp();
+       sleeptime = WalSndComputeSleeptime(now);
 
        wakeEvents = WL_SOCKET_READABLE;
 
@@ -1936,6 +1943,15 @@ WalSndWaitForWal(XLogRecPtr loc)
 
        Assert(wait_event != 0);
 
+       /* Report IO statistics, if needed */
+       if (TimestampDifferenceExceeds(last_flush, now,
+                                      WALSENDER_STATS_FLUSH_INTERVAL))
+       {
+           pgstat_flush_io(false);
+           (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+           last_flush = now;
+       }
+
        WalSndWait(wakeEvents, sleeptime, wait_event);
    }
 
@@ -2742,6 +2758,8 @@ WalSndCheckTimeOut(void)
 static void
 WalSndLoop(WalSndSendDataCallback send_data)
 {
+   TimestampTz last_flush = 0;
+
    /*
     * Initialize the last reply timestamp. That enables timeout processing
     * from hereon.
@@ -2836,6 +2854,9 @@ WalSndLoop(WalSndSendDataCallback send_data)
         * WalSndWaitForWal() handle any other blocking; idle receivers need
         * its additional actions.  For physical replication, also block if
         * caught up; its send_data does not block.
+        *
+        * The IO statistics are reported in WalSndWaitForWal() for the
+        * logical WAL senders.
         */
        if ((WalSndCaughtUp && send_data != XLogSendLogical &&
             !streamingDoneSending) ||
@@ -2843,6 +2864,7 @@ WalSndLoop(WalSndSendDataCallback send_data)
        {
            long        sleeptime;
            int         wakeEvents;
+           TimestampTz now;
 
            if (!streamingDoneReceiving)
                wakeEvents = WL_SOCKET_READABLE;
@@ -2853,11 +2875,21 @@ WalSndLoop(WalSndSendDataCallback send_data)
             * Use fresh timestamp, not last_processing, to reduce the chance
             * of reaching wal_sender_timeout before sending a keepalive.
             */
-           sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+           now = GetCurrentTimestamp();
+           sleeptime = WalSndComputeSleeptime(now);
 
            if (pq_is_send_pending())
                wakeEvents |= WL_SOCKET_WRITEABLE;
 
+           /* Report IO statistics, if needed */
+           if (TimestampDifferenceExceeds(last_flush, now,
+                                          WALSENDER_STATS_FLUSH_INTERVAL))
+           {
+               pgstat_flush_io(false);
+               (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+               last_flush = now;
+           }
+
            /* Sleep until something happens or we time out */
            WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
        }
index ccd8417d449f9d8b390ab54667f71e3a9827a73e..2cbcc509d76657252553b57fce4e96c87cc58c0c 100644 (file)
@@ -42,6 +42,9 @@ $node_standby_2->init_from_backup($node_standby_1, $backup_name,
    has_streaming => 1);
 $node_standby_2->start;
 
+# Reset IO statistics, for the WAL sender check with pg_stat_io.
+$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
 # Create some content on primary and check its presence in standby nodes
 $node_primary->safe_psql('postgres',
    "CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
@@ -333,6 +336,19 @@ $node_primary->psql(
 
 note "switching to physical replication slot";
 
+# Wait for the physical WAL sender to update its IO statistics.  This is
+# done before the next restart, which would force a flush of its stats, and
+# far enough from the reset done above to not impact the run time.
+$node_primary->poll_query_until(
+   'postgres',
+   qq[SELECT sum(reads) > 0
+       FROM pg_catalog.pg_stat_io
+       WHERE backend_type = 'walsender'
+       AND object = 'wal']
+  )
+  or die
+  "Timed out while waiting for the walsender to update its IO statistics";
+
 # Switch to using a physical replication slot. We can do this without a new
 # backup since physical slots can go backwards if needed. Do so on both
 # standbys. Since we're going to be testing things that affect the slot state,
index 8726fe04ad285f177dbe72004b0d147555a0880e..916fdb48b3b345ce31032d45612fe1cf66613590 100644 (file)
@@ -113,6 +113,9 @@ $node_subscriber->safe_psql('postgres',
 # Wait for initial table sync to finish
 $node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
 
+# Reset IO statistics, for the WAL sender check with pg_stat_io.
+$node_publisher->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
 my $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
 is($result, qq(0), 'check non-replicated table is empty on subscriber');
@@ -184,6 +187,19 @@ $result =
   $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_no_col");
 is($result, qq(2), 'check replicated changes for table having no columns');
 
+# Wait for the logical WAL sender to update its IO statistics.  This is
+# done before the next restart, which would force a flush of its stats, and
+# far enough from the reset done above to not impact the run time.
+$node_publisher->poll_query_until(
+   'postgres',
+   qq[SELECT sum(reads) > 0
+       FROM pg_catalog.pg_stat_io
+       WHERE backend_type = 'walsender'
+       AND object = 'wal']
+  )
+  or die
+  "Timed out while waiting for the walsender to update its IO statistics";
+
 # insert some duplicate rows
 $node_publisher->safe_psql('postgres',
    "INSERT INTO tab_full SELECT generate_series(1,10)");