#include "utils/guc.h"
#include "utils/memutils.h"
#include "utils/pg_lsn.h"
+#include "utils/pgstat_internal.h"
#include "utils/ps_status.h"
#include "utils/timeout.h"
#include "utils/timestamp.h"
+/* Minimum interval used by walsender for stats flushes, in ms */
+#define WALSENDER_STATS_FLUSH_INTERVAL 1000
+
/*
* Maximum data payload in a WAL data message. Must be >= XLOG_BLCKSZ.
*
int wakeEvents;
uint32 wait_event = 0;
static XLogRecPtr RecentFlushPtr = InvalidXLogRecPtr;
+ TimestampTz last_flush = 0;
/*
* Fast path to avoid acquiring the spinlock in case we already know we
{
bool wait_for_standby_at_stop = false;
long sleeptime;
+ TimestampTz now;
/* Clear any already-pending wakeups */
ResetLatch(MyLatch);
* new WAL to be generated. (But if we have nothing to send, we don't
* want to wake on socket-writable.)
*/
- sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+ now = GetCurrentTimestamp();
+ sleeptime = WalSndComputeSleeptime(now);
wakeEvents = WL_SOCKET_READABLE;
Assert(wait_event != 0);
+ /* Report IO statistics, if needed */
+ if (TimestampDifferenceExceeds(last_flush, now,
+ WALSENDER_STATS_FLUSH_INTERVAL))
+ {
+ pgstat_flush_io(false);
+ (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+ last_flush = now;
+ }
+
WalSndWait(wakeEvents, sleeptime, wait_event);
}
static void
WalSndLoop(WalSndSendDataCallback send_data)
{
+ TimestampTz last_flush = 0;
+
/*
* Initialize the last reply timestamp. That enables timeout processing
* from hereon.
* WalSndWaitForWal() handle any other blocking; idle receivers need
* its additional actions. For physical replication, also block if
* caught up; its send_data does not block.
+ *
+ * The IO statistics are reported in WalSndWaitForWal() for the
+ * logical WAL senders.
*/
if ((WalSndCaughtUp && send_data != XLogSendLogical &&
!streamingDoneSending) ||
{
long sleeptime;
int wakeEvents;
+ TimestampTz now;
if (!streamingDoneReceiving)
wakeEvents = WL_SOCKET_READABLE;
* Use fresh timestamp, not last_processing, to reduce the chance
* of reaching wal_sender_timeout before sending a keepalive.
*/
- sleeptime = WalSndComputeSleeptime(GetCurrentTimestamp());
+ now = GetCurrentTimestamp();
+ sleeptime = WalSndComputeSleeptime(now);
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
+ /* Report IO statistics, if needed */
+ if (TimestampDifferenceExceeds(last_flush, now,
+ WALSENDER_STATS_FLUSH_INTERVAL))
+ {
+ pgstat_flush_io(false);
+ (void) pgstat_flush_backend(false, PGSTAT_BACKEND_FLUSH_IO);
+ last_flush = now;
+ }
+
/* Sleep until something happens or we time out */
WalSndWait(wakeEvents, sleeptime, WAIT_EVENT_WAL_SENDER_MAIN);
}
has_streaming => 1);
$node_standby_2->start;
+# Reset IO statistics, for the WAL sender check with pg_stat_io.
+$node_primary->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
# Create some content on primary and check its presence in standby nodes
$node_primary->safe_psql('postgres',
"CREATE TABLE tab_int AS SELECT generate_series(1,1002) AS a");
note "switching to physical replication slot";
+# Wait for the physical WAL sender to update its IO statistics. This is
+# done before the next restart, which would force a flush of its stats, and
+# far enough from the reset done above to not impact the run time.
+$node_primary->poll_query_until(
+ 'postgres',
+ qq[SELECT sum(reads) > 0
+ FROM pg_catalog.pg_stat_io
+ WHERE backend_type = 'walsender'
+ AND object = 'wal']
+ )
+ or die
+ "Timed out while waiting for the walsender to update its IO statistics";
+
# Switch to using a physical replication slot. We can do this without a new
# backup since physical slots can go backwards if needed. Do so on both
# standbys. Since we're going to be testing things that affect the slot state,
# Wait for initial table sync to finish
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tap_sub');
+# Reset IO statistics, for the WAL sender check with pg_stat_io.
+$node_publisher->safe_psql('postgres', "SELECT pg_stat_reset_shared('io')");
+
my $result =
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_notrep");
is($result, qq(0), 'check non-replicated table is empty on subscriber');
$node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_no_col");
is($result, qq(2), 'check replicated changes for table having no columns');
+# Wait for the logical WAL sender to update its IO statistics. This is
+# done before the next restart, which would force a flush of its stats, and
+# far enough from the reset done above to not impact the run time.
+$node_publisher->poll_query_until(
+ 'postgres',
+ qq[SELECT sum(reads) > 0
+ FROM pg_catalog.pg_stat_io
+ WHERE backend_type = 'walsender'
+ AND object = 'wal']
+ )
+ or die
+ "Timed out while waiting for the walsender to update its IO statistics";
+
# insert some duplicate rows
$node_publisher->safe_psql('postgres',
"INSERT INTO tab_full SELECT generate_series(1,10)");