</entry>
</row>
+ <row>
+ <entry><structname>pg_stat_replication_slots</structname><indexterm><primary>pg_stat_replication_slots</primary></indexterm></entry>
+ <entry>One row per replication slot, showing statistics about
+ replication slot usage.
+ See <link linkend="monitoring-pg-stat-replication-slots-view">
+ <structname>pg_stat_replication_slots</structname></link> for details.
+ </entry>
+ </row>
+
<row>
<entry><structname>pg_stat_wal_receiver</structname><indexterm><primary>pg_stat_wal_receiver</primary></indexterm></entry>
<entry>Only one row, showing statistics about the WAL receiver from
</sect2>
+ <sect2 id="monitoring-pg-stat-replication-slots-view">
+ <title><structname>pg_stat_replication_slots</structname></title>
+
+ <indexterm>
+ <primary>pg_stat_replication_slots</primary>
+ </indexterm>
+
+ <para>
+ The <structname>pg_stat_replication_slots</structname> view will contain
+ one row per logical replication slot, showing statistics about its usage.
+ </para>
+
+ <table id="pg-stat-replication-slots-view" xreflabel="pg_stat_replication_slots">
+ <title><structname>pg_stat_replication_slots</structname> View</title>
+ <tgroup cols="1">
+ <thead>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ Column Type
+ </para>
+ <para>
+ Description
+ </para></entry>
+ </row>
+ </thead>
+
+ <tbody>
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>name</structfield> <type>text</type>
+ </para>
+ <para>
+ A unique, cluster-wide identifier for the replication slot
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>spill_txns</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of transactions spilled to disk after the memory used by
+ logical decoding exceeds <literal>logical_decoding_work_mem</literal>. The
+ counter gets incremented both for toplevel transactions and
+ subtransactions.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>spill_count</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Number of times transactions were spilled to disk. Transactions
+ may get spilled repeatedly, and this counter gets incremented on every
+ such invocation.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>spill_bytes</structfield> <type>bigint</type>
+ </para>
+ <para>
+ Amount of decoded transaction data spilled to disk.
+ </para></entry>
+ </row>
+
+ <row>
+ <entry role="catalog_table_entry"><para role="column_definition">
+ <structfield>stats_reset</structfield> <type>timestamp with time zone</type>
+ </para>
+ <para>
+ Time at which these statistics were last reset
+ </para></entry>
+ </row>
+ </tbody>
+ </tgroup>
+ </table>
+
+ </sect2>
+
<sect2 id="monitoring-pg-stat-wal-receiver-view">
<title><structname>pg_stat_wal_receiver</structname></title>
can be granted EXECUTE to run the function.
</para></entry>
</row>
+
+ <row>
+ <entry role="func_table_entry"><para role="func_signature">
+ <indexterm>
+ <primary>pg_stat_reset_replication_slot</primary>
+ </indexterm>
+ <function>pg_stat_reset_replication_slot</function> ( <type>text</type> )
+ <returnvalue>void</returnvalue>
+ </para>
+ <para>
+ Resets statistics to zero for a single replication slot, or for all
+ replication slots in the cluster. The argument can be either the name
+ of the slot to reset the stats or NULL. If the argument is NULL, all
+ counters shown in the <structname>pg_stat_replication_slots</structname>
+ view for all replication slots are reset.
+ </para>
+ <para>
+ This function is restricted to superusers by default, but other users
+ can be granted EXECUTE to run the function.
+ </para></entry>
+ </row>
</tbody>
</tgroup>
</table>
JOIN pg_stat_get_wal_senders() AS W ON (S.pid = W.pid)
LEFT JOIN pg_authid AS U ON (S.usesysid = U.oid);
+CREATE VIEW pg_stat_replication_slots AS
+ SELECT
+ s.name,
+ s.spill_txns,
+ s.spill_count,
+ s.spill_bytes,
+ s.stats_reset
+ FROM pg_stat_get_replication_slots() AS s;
+
CREATE VIEW pg_stat_slru AS
SELECT
s.name,
REVOKE EXECUTE ON FUNCTION pg_stat_reset_slru(text) FROM public;
REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_table_counters(oid) FROM public;
REVOKE EXECUTE ON FUNCTION pg_stat_reset_single_function_counters(oid) FROM public;
+REVOKE EXECUTE ON FUNCTION pg_stat_reset_replication_slot(text) FROM public;
REVOKE EXECUTE ON FUNCTION lo_import(text) FROM public;
REVOKE EXECUTE ON FUNCTION lo_import(text, oid) FROM public;
#include "postmaster/fork_process.h"
#include "postmaster/interrupt.h"
#include "postmaster/postmaster.h"
+#include "replication/slot.h"
#include "replication/walsender.h"
#include "storage/backendid.h"
#include "storage/dsm.h"
static PgStat_GlobalStats globalStats;
static PgStat_WalStats walStats;
static PgStat_SLRUStats slruStats[SLRU_NUM_ELEMENTS];
+static PgStat_ReplSlotStats *replSlotStats;
+static int nReplSlotStats;
/*
* List of OIDs of databases we need to write out. If an entry is InvalidOid,
static bool pgstat_write_statsfile_needed(void);
static bool pgstat_db_requested(Oid databaseid);
+static int pgstat_replslot_index(const char *name, bool create_it);
+static void pgstat_reset_replslot(int i, TimestampTz ts);
+
static void pgstat_send_tabstat(PgStat_MsgTabstat *tsmsg);
static void pgstat_send_funcstats(void);
static void pgstat_send_slru(void);
static void pgstat_recv_resetsharedcounter(PgStat_MsgResetsharedcounter *msg, int len);
static void pgstat_recv_resetsinglecounter(PgStat_MsgResetsinglecounter *msg, int len);
static void pgstat_recv_resetslrucounter(PgStat_MsgResetslrucounter *msg, int len);
+static void pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg, int len);
static void pgstat_recv_autovac(PgStat_MsgAutovacStart *msg, int len);
static void pgstat_recv_vacuum(PgStat_MsgVacuum *msg, int len);
static void pgstat_recv_analyze(PgStat_MsgAnalyze *msg, int len);
static void pgstat_recv_recoveryconflict(PgStat_MsgRecoveryConflict *msg, int len);
static void pgstat_recv_deadlock(PgStat_MsgDeadlock *msg, int len);
static void pgstat_recv_checksum_failure(PgStat_MsgChecksumFailure *msg, int len);
+static void pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len);
static void pgstat_recv_tempfile(PgStat_MsgTempFile *msg, int len);
/* ------------------------------------------------------------
pgstat_send(&msg, sizeof(msg));
}
+/* ----------
+ * pgstat_reset_replslot_counter() -
+ *
+ * Tell the statistics collector to reset a single replication slot
+ * counter, or all replication slots counters (when name is null).
+ *
+ * Permission checking for this function is managed through the normal
+ * GRANT system.
+ * ----------
+ */
+void
+pgstat_reset_replslot_counter(const char *name)
+{
+ PgStat_MsgResetreplslotcounter msg;
+
+ if (pgStatSock == PGINVALID_SOCKET)
+ return;
+
+ if (name)
+ {
+ ReplicationSlot *slot;
+
+ /*
+ * Check if the slot exits with the given name. It is possible that by
+ * the time this message is executed the slot is dropped but at least
+ * this check will ensure that the given name is for a valid slot.
+ */
+ LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
+ slot = SearchNamedReplicationSlot(name);
+ LWLockRelease(ReplicationSlotControlLock);
+
+ if (!slot)
+ ereport(ERROR,
+ (errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+ errmsg("replication slot \"%s\" does not exist",
+ name)));
+
+ /*
+ * Nothing to do for physical slots as we collect stats only for
+ * logical slots.
+ */
+ if (SlotIsPhysical(slot))
+ return;
+
+ memcpy(&msg.m_slotname, name, NAMEDATALEN);
+ msg.clearall = false;
+ }
+ else
+ msg.clearall = true;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_RESETREPLSLOTCOUNTER);
+
+ pgstat_send(&msg, sizeof(msg));
+}
+
/* ----------
* pgstat_report_autovac() -
*
pgstat_send(&msg, sizeof(msg));
}
+/* ----------
+ * pgstat_report_replslot() -
+ *
+ * Tell the collector about replication slot statistics.
+ * ----------
+ */
+void
+pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
+ int spillbytes)
+{
+ PgStat_MsgReplSlot msg;
+
+ /*
+ * Prepare and send the message
+ */
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+ memcpy(&msg.m_slotname, slotname, NAMEDATALEN);
+ msg.m_drop = false;
+ msg.m_spill_txns = spilltxns;
+ msg.m_spill_count = spillcount;
+ msg.m_spill_bytes = spillbytes;
+ pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
+
+/* ----------
+ * pgstat_report_replslot_drop() -
+ *
+ * Tell the collector about dropping the replication slot.
+ * ----------
+ */
+void
+pgstat_report_replslot_drop(const char *slotname)
+{
+ PgStat_MsgReplSlot msg;
+
+ pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_REPLSLOT);
+ memcpy(&msg.m_slotname, slotname, NAMEDATALEN);
+ msg.m_drop = true;
+ pgstat_send(&msg, sizeof(PgStat_MsgReplSlot));
+}
/* ----------
* pgstat_ping() -
return slruStats;
}
+/*
+ * ---------
+ * pgstat_fetch_replslot() -
+ *
+ * Support function for the SQL-callable pgstat* functions. Returns
+ * a pointer to the replication slot statistics struct and sets the
+ * number of entries in nslots_p.
+ * ---------
+ */
+PgStat_ReplSlotStats *
+pgstat_fetch_replslot(int *nslots_p)
+{
+ backend_read_statsfile();
+
+ *nslots_p = nReplSlotStats;
+ return replSlotStats;
+}
/* ------------------------------------------------------------
* Functions for management of the shared-memory PgBackendStatus array
len);
break;
+ case PGSTAT_MTYPE_RESETREPLSLOTCOUNTER:
+ pgstat_recv_resetreplslotcounter(&msg.msg_resetreplslotcounter,
+ len);
+ break;
+
case PGSTAT_MTYPE_AUTOVAC_START:
pgstat_recv_autovac(&msg.msg_autovacuum_start, len);
break;
len);
break;
+ case PGSTAT_MTYPE_REPLSLOT:
+ pgstat_recv_replslot(&msg.msg_replslot, len);
+ break;
+
default:
break;
}
const char *tmpfile = permanent ? PGSTAT_STAT_PERMANENT_TMPFILE : pgstat_stat_tmpname;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
int rc;
+ int i;
elog(DEBUG2, "writing stats file \"%s\"", statfile);
(void) rc; /* we'll check for error with ferror */
}
+ /*
+ * Write replication slot stats struct
+ */
+ for (i = 0; i < nReplSlotStats; i++)
+ {
+ fputc('R', fpout);
+ rc = fwrite(&replSlotStats[i], sizeof(PgStat_ReplSlotStats), 1, fpout);
+ (void) rc; /* we'll check for error with ferror */
+ }
+
/*
* No more output to be done. Close the temp file and replace the old
* pgstat.stat with it. The ferror() check replaces testing for error
dbhash = hash_create("Databases hash", PGSTAT_DB_HASH_SIZE, &hash_ctl,
HASH_ELEM | HASH_BLOBS | HASH_CONTEXT);
+ /* Allocate the space for replication slot statistics */
+ replSlotStats = palloc0(max_replication_slots * sizeof(PgStat_ReplSlotStats));
+ nReplSlotStats = 0;
+
/*
* Clear out global, archiver, WAL and SLRU statistics so they start from
* zero in case we can't load an existing statsfile.
for (i = 0; i < SLRU_NUM_ELEMENTS; i++)
slruStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
+ /*
+ * Set the same reset timestamp for all replication slots too.
+ */
+ for (i = 0; i < max_replication_slots; i++)
+ replSlotStats[i].stat_reset_timestamp = globalStats.stat_reset_timestamp;
+
/*
* Try to open the stats file. If it doesn't exist, the backends simply
* return zero for anything and the collector simply starts from scratch
break;
+ /*
+ * 'R' A PgStat_ReplSlotStats struct describing a replication
+ * slot follows.
+ */
+ case 'R':
+ if (fread(&replSlotStats[nReplSlotStats], 1, sizeof(PgStat_ReplSlotStats), fpin)
+ != sizeof(PgStat_ReplSlotStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
+ goto done;
+ }
+ nReplSlotStats++;
+ break;
+
case 'E':
goto done;
PgStat_ArchiverStats myArchiverStats;
PgStat_WalStats myWalStats;
PgStat_SLRUStats mySLRUStats[SLRU_NUM_ELEMENTS];
+ PgStat_ReplSlotStats myReplSlotStats;
FILE *fpin;
int32 format_id;
const char *statfile = permanent ? PGSTAT_STAT_PERMANENT_FILENAME : pgstat_stat_filename;
break;
+ /*
+ * 'R' A PgStat_ReplSlotStats struct describing a replication
+ * slot follows.
+ */
+ case 'R':
+ if (fread(&myReplSlotStats, 1, sizeof(PgStat_ReplSlotStats), fpin)
+ != sizeof(PgStat_ReplSlotStats))
+ {
+ ereport(pgStatRunningInCollector ? LOG : WARNING,
+ (errmsg("corrupted statistics file \"%s\"",
+ statfile)));
+ FreeFile(fpin);
+ return false;
+ }
+ break;
+
case 'E':
goto done;
}
}
+/* ----------
+ * pgstat_recv_resetreplslotcounter() -
+ *
+ * Reset some replication slot statistics of the cluster.
+ * ----------
+ */
+static void
+pgstat_recv_resetreplslotcounter(PgStat_MsgResetreplslotcounter *msg,
+ int len)
+{
+ int i;
+ int idx = -1;
+ TimestampTz ts;
+
+ ts = GetCurrentTimestamp();
+ if (msg->clearall)
+ {
+ for (i = 0; i < nReplSlotStats; i++)
+ pgstat_reset_replslot(i, ts);
+ }
+ else
+ {
+ /* Get the index of replication slot statistics to reset */
+ idx = pgstat_replslot_index(msg->m_slotname, false);
+
+ /*
+ * Nothing to do if the given slot entry is not found. This could
+ * happen when the slot with the given name is removed and the
+ * corresponding statistics entry is also removed before receiving the
+ * reset message.
+ */
+ if (idx < 0)
+ return;
+
+ /* Reset the stats for the requested replication slot */
+ pgstat_reset_replslot(idx, ts);
+ }
+}
+
+
/* ----------
* pgstat_recv_autovac() -
*
dbentry->last_checksum_failure = msg->m_failure_time;
}
+/* ----------
+ * pgstat_recv_replslot() -
+ *
+ * Process a REPLSLOT message.
+ * ----------
+ */
+static void
+pgstat_recv_replslot(PgStat_MsgReplSlot *msg, int len)
+{
+ int idx;
+
+ /*
+ * Get the index of replication slot statistics. On dropping, we don't
+ * create the new statistics.
+ */
+ idx = pgstat_replslot_index(msg->m_slotname, !msg->m_drop);
+
+ /*
+ * The slot entry is not found or there is no space to accommodate the new
+ * entry. This could happen when the message for the creation of a slot
+ * reached before the drop message even though the actual operations
+ * happen in reverse order. In such a case, the next update of the
+ * statistics for the same slot will create the required entry.
+ */
+ if (idx < 0)
+ return;
+
+ Assert(idx >= 0 && idx <= max_replication_slots);
+ if (msg->m_drop)
+ {
+ /* Remove the replication slot statistics with the given name */
+ memcpy(&replSlotStats[idx], &replSlotStats[nReplSlotStats - 1],
+ sizeof(PgStat_ReplSlotStats));
+ nReplSlotStats--;
+ Assert(nReplSlotStats >= 0);
+ }
+ else
+ {
+ /* Update the replication slot statistics */
+ replSlotStats[idx].spill_txns += msg->m_spill_txns;
+ replSlotStats[idx].spill_count += msg->m_spill_count;
+ replSlotStats[idx].spill_bytes += msg->m_spill_bytes;
+ }
+}
+
/* ----------
* pgstat_recv_tempfile() -
*
return activity;
}
+/* ----------
+ * pgstat_replslot_index
+ *
+ * Return the index of entry of a replication slot with the given name, or
+ * -1 if the slot is not found.
+ *
+ * create_it tells whether to create the new slot entry if it is not found.
+ * ----------
+ */
+static int
+pgstat_replslot_index(const char *name, bool create_it)
+{
+ int i;
+
+ Assert(nReplSlotStats <= max_replication_slots);
+ for (i = 0; i < nReplSlotStats; i++)
+ {
+ if (strcmp(replSlotStats[i].slotname, name) == 0)
+ return i; /* found */
+ }
+
+ /*
+ * The slot is not found. We don't want to register the new statistics if
+ * the list is already full or the caller didn't request.
+ */
+ if (i == max_replication_slots || !create_it)
+ return -1;
+
+ /* Register new slot */
+ memset(&replSlotStats[nReplSlotStats], 0, sizeof(PgStat_ReplSlotStats));
+ memcpy(&replSlotStats[nReplSlotStats].slotname, name, NAMEDATALEN);
+
+ return nReplSlotStats++;
+}
+
+/* ----------
+ * pgstat_reset_replslot
+ *
+ * Reset the replication slot stats at index 'i'.
+ * ----------
+ */
+static void
+pgstat_reset_replslot(int i, TimestampTz ts)
+{
+ /* reset only counters. Don't clear slot name */
+ replSlotStats[i].spill_txns = 0;
+ replSlotStats[i].spill_count = 0;
+ replSlotStats[i].spill_bytes = 0;
+ replSlotStats[i].stat_reset_timestamp = ts;
+}
+
/*
* pgstat_slru_index
*
/* replay actions of all transaction + subtransactions in order */
ReorderBufferCommit(ctx->reorder, xid, buf->origptr, buf->endptr,
commit_time, origin_id, origin_lsn);
+
+ /*
+ * Update the decoding stats at transaction commit/abort. It is not clear
+ * that sending more or less frequently than this would be better.
+ */
+ UpdateDecodingStats(ctx);
}
/*
}
ReorderBufferAbort(ctx->reorder, xid, buf->record->EndRecPtr);
+
+ /* update the decoding stats */
+ UpdateDecodingStats(ctx);
}
/*
#include "access/xlog_internal.h"
#include "fmgr.h"
#include "miscadmin.h"
+#include "pgstat.h"
#include "replication/decode.h"
#include "replication/logical.h"
#include "replication/origin.h"
CheckXidAlive = InvalidTransactionId;
bsysscan = false;
}
+
+/*
+ * Report stats for a slot.
+ */
+void
+UpdateDecodingStats(LogicalDecodingContext *ctx)
+{
+ ReorderBuffer *rb = ctx->reorder;
+
+ /*
+ * Nothing to do if we haven't spilled anything since the last time the
+ * stats has been sent.
+ */
+ if (rb->spillBytes <= 0)
+ return;
+
+ elog(DEBUG2, "UpdateSpillStats: updating stats %p %lld %lld %lld",
+ rb,
+ (long long) rb->spillTxns,
+ (long long) rb->spillCount,
+ (long long) rb->spillBytes);
+
+ pgstat_report_replslot(NameStr(ctx->slot->data.name),
+ rb->spillTxns, rb->spillCount, rb->spillBytes);
+ rb->spillTxns = 0;
+ rb->spillCount = 0;
+ rb->spillBytes = 0;
+}
buffer->outbufsize = 0;
buffer->size = 0;
+ buffer->spillTxns = 0;
+ buffer->spillCount = 0;
+ buffer->spillBytes = 0;
+
buffer->current_restart_decoding_lsn = InvalidXLogRecPtr;
dlist_init(&buffer->toplevel_by_lsn);
{
ReorderBufferRestoreCleanup(rb, txn);
txn->txn_flags &= ~RBTXN_IS_SERIALIZED;
+
+ /*
+ * We set this flag to indicate if the transaction is ever serialized.
+ * We need this to accurately update the stats as otherwise the same
+ * transaction can be counted as serialized multiple times.
+ */
+ txn->txn_flags |= RBTXN_IS_SERIALIZED_CLEAR;
}
/* also reset the number of entries in the transaction */
int fd = -1;
XLogSegNo curOpenSegNo = 0;
Size spilled = 0;
+ Size size = txn->size;
elog(DEBUG2, "spill %u changes in XID %u to disk",
(uint32) txn->nentries_mem, txn->xid);
spilled++;
}
+ /* update the statistics iff we have spilled anything */
+ if (spilled)
+ {
+ rb->spillCount += 1;
+ rb->spillBytes += size;
+
+ /* don't consider already serialized transactions */
+ rb->spillTxns += (rbtxn_is_serialized(txn) || rbtxn_is_serialized_clear(txn)) ? 0 : 1;
+ }
+
Assert(spilled == txn->nentries_mem);
Assert(dlist_is_empty(&txn->changes));
txn->nentries_mem = 0;
int max_replication_slots = 0; /* the maximum number of replication
* slots */
-static ReplicationSlot *SearchNamedReplicationSlot(const char *name);
static int ReplicationSlotAcquireInternal(ReplicationSlot *slot,
const char *name, SlotAcquireBehavior behavior);
static void ReplicationSlotDropAcquired(void);
LWLockRelease(ReplicationSlotControlLock);
+ /*
+ * Create statistics entry for the new logical slot. We don't collect any
+ * stats for physical slots, so no need to create an entry for the same.
+ * See ReplicationSlotDropPtr for why we need to do this before releasing
+ * ReplicationSlotAllocationLock.
+ */
+ if (SlotIsLogical(slot))
+ pgstat_report_replslot(NameStr(slot->data.name), 0, 0, 0);
+
/*
* Now that the slot has been marked as in_use and active, it's safe to
* let somebody else try to allocate a slot.
*
* The caller must hold ReplicationSlotControlLock in shared mode.
*/
-static ReplicationSlot *
+ReplicationSlot *
SearchNamedReplicationSlot(const char *name)
{
int i;
ereport(WARNING,
(errmsg("could not remove directory \"%s\"", tmppath)));
+ /*
+ * Send a message to drop the replication slot to the stats collector.
+ * Since there is no guarantee of the order of message transfer on a UDP
+ * connection, it's possible that a message for creating a new slot
+ * reaches before a message for removing the old slot. We send the drop
+ * and create messages while holding ReplicationSlotAllocationLock to
+ * reduce that possibility. If the messages reached in reverse, we would
+ * lose one statistics update message. But the next update message will
+ * create the statistics for the replication slot.
+ */
+ if (SlotIsLogical(slot))
+ pgstat_report_replslot_drop(NameStr(slot->data.name));
+
/*
* We release this at the very end, so that nobody starts trying to create
* a slot while we're still cleaning up the detritus of the old one.
PG_RETURN_VOID();
}
+/* Reset replication slots stats (a specific one or all of them). */
+Datum
+pg_stat_reset_replication_slot(PG_FUNCTION_ARGS)
+{
+ char *target = NULL;
+
+ if (!PG_ARGISNULL(0))
+ target = text_to_cstring(PG_GETARG_TEXT_PP(0));
+
+ pgstat_reset_replslot_counter(target);
+
+ PG_RETURN_VOID();
+}
+
Datum
pg_stat_get_archiver(PG_FUNCTION_ARGS)
{
/* Returns the record as Datum */
PG_RETURN_DATUM(HeapTupleGetDatum(heap_form_tuple(tupdesc, values, nulls)));
}
+
+/* Get the statistics for the replication slots */
+Datum
+pg_stat_get_replication_slots(PG_FUNCTION_ARGS)
+{
+#define PG_STAT_GET_REPLICATION_SLOT_CLOS 5
+ ReturnSetInfo *rsinfo = (ReturnSetInfo *) fcinfo->resultinfo;
+ TupleDesc tupdesc;
+ Tuplestorestate *tupstore;
+ MemoryContext per_query_ctx;
+ MemoryContext oldcontext;
+ PgStat_ReplSlotStats *slotstats;
+ int nstats;
+ int i;
+
+ /* check to see if caller supports us returning a tuplestore */
+ if (rsinfo == NULL || !IsA(rsinfo, ReturnSetInfo))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("set-valued function called in context that cannot accept a set")));
+ if (!(rsinfo->allowedModes & SFRM_Materialize))
+ ereport(ERROR,
+ (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+ errmsg("materialize mode required, but it is not allowed in this context")));
+
+ /* Build a tuple descriptor for our result type */
+ if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
+ elog(ERROR, "return type must be a row type");
+
+ per_query_ctx = rsinfo->econtext->ecxt_per_query_memory;
+ oldcontext = MemoryContextSwitchTo(per_query_ctx);
+
+ tupstore = tuplestore_begin_heap(true, false, work_mem);
+ rsinfo->returnMode = SFRM_Materialize;
+ rsinfo->setResult = tupstore;
+ rsinfo->setDesc = tupdesc;
+
+ MemoryContextSwitchTo(oldcontext);
+
+ slotstats = pgstat_fetch_replslot(&nstats);
+ for (i = 0; i < nstats; i++)
+ {
+ Datum values[PG_STAT_GET_REPLICATION_SLOT_CLOS];
+ bool nulls[PG_STAT_GET_REPLICATION_SLOT_CLOS];
+ PgStat_ReplSlotStats *s = &(slotstats[i]);
+
+ MemSet(values, 0, sizeof(values));
+ MemSet(nulls, 0, sizeof(nulls));
+
+ values[0] = PointerGetDatum(cstring_to_text(s->slotname));
+ values[1] = Int64GetDatum(s->spill_txns);
+ values[2] = Int64GetDatum(s->spill_count);
+ values[3] = Int64GetDatum(s->spill_bytes);
+
+ if (s->stat_reset_timestamp == 0)
+ nulls[4] = true;
+ else
+ values[4] = TimestampTzGetDatum(s->stat_reset_timestamp);
+
+ tuplestore_putvalues(tupstore, tupdesc, values, nulls);
+ }
+
+ tuplestore_donestoring(tupstore);
+
+ return (Datum) 0;
+}
*/
/* yyyymmddN */
-#define CATALOG_VERSION_NO 202010021
+#define CATALOG_VERSION_NO 202010081
#endif
proargmodes => '{o,o,o,o,o,o,o,o,o,o,o,o,o,o,o}',
proargnames => '{pid,status,receive_start_lsn,receive_start_tli,written_lsn,flushed_lsn,received_tli,last_msg_send_time,last_msg_receipt_time,latest_end_lsn,latest_end_time,slot_name,sender_host,sender_port,conninfo}',
prosrc => 'pg_stat_get_wal_receiver' },
+{ oid => '8595', descr => 'statistics: information about replication slots',
+ proname => 'pg_stat_get_replication_slots', prorows => '10', proisstrict => 'f',
+ proretset => 't', provolatile => 's', proparallel => 'r',
+ prorettype => 'record', proargtypes => '',
+ proallargtypes => '{text,int8,int8,int8,timestamptz}',
+ proargmodes => '{o,o,o,o,o}',
+ proargnames => '{name,spill_txns,spill_count,spill_bytes,stats_reset}',
+ prosrc => 'pg_stat_get_replication_slots' },
{ oid => '6118', descr => 'statistics: information about subscription',
proname => 'pg_stat_get_subscription', proisstrict => 'f', provolatile => 's',
proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
descr => 'statistics: reset collected statistics for a single SLRU',
proname => 'pg_stat_reset_slru', proisstrict => 'f', provolatile => 'v',
prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_slru' },
+{ oid => '8596',
+ descr => 'statistics: reset collected statistics for a single replication slot',
+ proname => 'pg_stat_reset_replication_slot', proisstrict => 'f', provolatile => 'v',
+ prorettype => 'void', proargtypes => 'text', prosrc => 'pg_stat_reset_replication_slot' },
{ oid => '3163', descr => 'current trigger depth',
proname => 'pg_trigger_depth', provolatile => 's', proparallel => 'r',
PGSTAT_MTYPE_RESETSHAREDCOUNTER,
PGSTAT_MTYPE_RESETSINGLECOUNTER,
PGSTAT_MTYPE_RESETSLRUCOUNTER,
+ PGSTAT_MTYPE_RESETREPLSLOTCOUNTER,
PGSTAT_MTYPE_AUTOVAC_START,
PGSTAT_MTYPE_VACUUM,
PGSTAT_MTYPE_ANALYZE,
PGSTAT_MTYPE_RECOVERYCONFLICT,
PGSTAT_MTYPE_TEMPFILE,
PGSTAT_MTYPE_DEADLOCK,
- PGSTAT_MTYPE_CHECKSUMFAILURE
+ PGSTAT_MTYPE_CHECKSUMFAILURE,
+ PGSTAT_MTYPE_REPLSLOT,
} StatMsgType;
/* ----------
int m_index;
} PgStat_MsgResetslrucounter;
+/* ----------
+ * PgStat_MsgResetreplslotcounter Sent by the backend to tell the collector
+ * to reset replication slot counter(s)
+ * ----------
+ */
+typedef struct PgStat_MsgResetreplslotcounter
+{
+ PgStat_MsgHdr m_hdr;
+ char m_slotname[NAMEDATALEN];
+ bool clearall;
+} PgStat_MsgResetreplslotcounter;
+
/* ----------
* PgStat_MsgAutovacStart Sent by the autovacuum daemon to signal
* that a database is going to be processed
PgStat_Counter m_truncate;
} PgStat_MsgSLRU;
+/* ----------
+ * PgStat_MsgReplSlot Sent by a backend or a wal sender to update replication
+ * slot statistics.
+ * ----------
+ */
+typedef struct PgStat_MsgReplSlot
+{
+ PgStat_MsgHdr m_hdr;
+ char m_slotname[NAMEDATALEN];
+ bool m_drop;
+ PgStat_Counter m_spill_txns;
+ PgStat_Counter m_spill_count;
+ PgStat_Counter m_spill_bytes;
+} PgStat_MsgReplSlot;
+
+
/* ----------
* PgStat_MsgRecoveryConflict Sent by the backend upon recovery conflict
* ----------
PgStat_MsgResetsharedcounter msg_resetsharedcounter;
PgStat_MsgResetsinglecounter msg_resetsinglecounter;
PgStat_MsgResetslrucounter msg_resetslrucounter;
+ PgStat_MsgResetreplslotcounter msg_resetreplslotcounter;
PgStat_MsgAutovacStart msg_autovacuum_start;
PgStat_MsgVacuum msg_vacuum;
PgStat_MsgAnalyze msg_analyze;
PgStat_MsgDeadlock msg_deadlock;
PgStat_MsgTempFile msg_tempfile;
PgStat_MsgChecksumFailure msg_checksumfailure;
+ PgStat_MsgReplSlot msg_replslot;
} PgStat_Msg;
* ------------------------------------------------------------
*/
-#define PGSTAT_FILE_FORMAT_ID 0x01A5BC9E
+#define PGSTAT_FILE_FORMAT_ID 0x01A5BC9F
/* ----------
* PgStat_StatDBEntry The collector's data per database
TimestampTz stat_reset_timestamp;
} PgStat_SLRUStats;
+/*
+ * Replication slot statistics kept in the stats collector
+ */
+typedef struct PgStat_ReplSlotStats
+{
+ char slotname[NAMEDATALEN];
+ PgStat_Counter spill_txns;
+ PgStat_Counter spill_count;
+ PgStat_Counter spill_bytes;
+ TimestampTz stat_reset_timestamp;
+} PgStat_ReplSlotStats;
/* ----------
* Backend states
extern void pgstat_reset_shared_counters(const char *);
extern void pgstat_reset_single_counter(Oid objectid, PgStat_Single_Reset_Type type);
extern void pgstat_reset_slru_counter(const char *);
+extern void pgstat_reset_replslot_counter(const char *name);
extern void pgstat_report_autovac(Oid dboid);
extern void pgstat_report_vacuum(Oid tableoid, bool shared,
extern void pgstat_report_deadlock(void);
extern void pgstat_report_checksum_failures_in_db(Oid dboid, int failurecount);
extern void pgstat_report_checksum_failure(void);
+extern void pgstat_report_replslot(const char *slotname, int spilltxns, int spillcount,
+ int spillbytes);
+extern void pgstat_report_replslot_drop(const char *slotname);
extern void pgstat_initialize(void);
extern void pgstat_bestart(void);
extern PgStat_GlobalStats *pgstat_fetch_global(void);
extern PgStat_WalStats *pgstat_fetch_stat_wal(void);
extern PgStat_SLRUStats *pgstat_fetch_slru(void);
+extern PgStat_ReplSlotStats *pgstat_fetch_replslot(int *nslots_p);
extern void pgstat_count_slru_page_zeroed(int slru_idx);
extern void pgstat_count_slru_page_hit(int slru_idx);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
extern void ResetLogicalStreamingState(void);
+extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
#endif
#define RBTXN_HAS_CATALOG_CHANGES 0x0001
#define RBTXN_IS_SUBXACT 0x0002
#define RBTXN_IS_SERIALIZED 0x0004
-#define RBTXN_IS_STREAMED 0x0008
-#define RBTXN_HAS_TOAST_INSERT 0x0010
-#define RBTXN_HAS_SPEC_INSERT 0x0020
+#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
+#define RBTXN_IS_STREAMED 0x0010
+#define RBTXN_HAS_TOAST_INSERT 0x0020
+#define RBTXN_HAS_SPEC_INSERT 0x0040
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
((txn)->txn_flags & RBTXN_IS_SERIALIZED) != 0 \
)
+/* Has this transaction ever been spilled to disk? */
+#define rbtxn_is_serialized_clear(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_IS_SERIALIZED_CLEAR) != 0 \
+)
+
/* This transaction's changes has toast insert, without main table insert. */
#define rbtxn_has_toast_insert(txn) \
( \
/* memory accounting */
Size size;
+
+ /*
+ * Statistics about transactions spilled to disk.
+ *
+ * A single transaction may be spilled repeatedly, which is why we keep
+ * two different counters. For spilling, the transaction counter includes
+ * both toplevel transactions and subtransactions.
+ */
+ int64 spillTxns; /* number of transactions spilled to disk */
+ int64 spillCount; /* spill-to-disk invocation counter */
+ int64 spillBytes; /* amount of data spilled to disk */
};
extern bool ReplicationSlotsCountDBSlots(Oid dboid, int *nslots, int *nactive);
extern void ReplicationSlotsDropDBSlots(Oid dboid);
extern void InvalidateObsoleteReplicationSlots(XLogSegNo oldestSegno);
+extern ReplicationSlot *SearchNamedReplicationSlot(const char *name);
extern void StartupReplicationSlots(void);
extern void CheckPointReplicationSlots(void);
FROM ((pg_stat_get_activity(NULL::integer) s(datid, pid, usesysid, application_name, state, query, wait_event_type, wait_event, xact_start, query_start, backend_start, state_change, client_addr, client_hostname, client_port, backend_xid, backend_xmin, backend_type, ssl, sslversion, sslcipher, sslbits, sslcompression, ssl_client_dn, ssl_client_serial, ssl_issuer_dn, gss_auth, gss_princ, gss_enc, leader_pid)
JOIN pg_stat_get_wal_senders() w(pid, state, sent_lsn, write_lsn, flush_lsn, replay_lsn, write_lag, flush_lag, replay_lag, sync_priority, sync_state, reply_time) ON ((s.pid = w.pid)))
LEFT JOIN pg_authid u ON ((s.usesysid = u.oid)));
+pg_stat_replication_slots| SELECT s.name,
+ s.spill_txns,
+ s.spill_count,
+ s.spill_bytes,
+ s.stats_reset
+ FROM pg_stat_get_replication_slots() s(name, spill_txns, spill_count, spill_bytes, stats_reset);
pg_stat_slru| SELECT s.name,
s.blks_zeroed,
s.blks_hit,
PgStat_MsgHdr
PgStat_MsgInquiry
PgStat_MsgRecoveryConflict
+PgStat_MsgReplSlot
PgStat_MsgResetcounter
+PgStat_MsgResetreplslotcounter
PgStat_MsgResetsharedcounter
PgStat_MsgResetsinglecounter
PgStat_MsgResetslrucounter
PgStat_MsgTempFile
PgStat_MsgVacuum
PgStat_MsgWal
+PgStat_ReplSlotStats
PgStat_SLRUStats
PgStat_Shared_Reset_Target
PgStat_Single_Reset_Type