DetermineSlotInvalidationCause(uint32 possible_causes, ReplicationSlot *s,
XLogRecPtr oldestLSN, Oid dboid,
TransactionId snapshotConflictHorizon,
- TransactionId initial_effective_xmin,
- TransactionId initial_catalog_effective_xmin,
- XLogRecPtr initial_restart_lsn,
TimestampTz *inactive_since, TimestampTz now)
{
Assert(possible_causes != RS_INVAL_NONE);
if (possible_causes & RS_INVAL_WAL_REMOVED)
{
- if (initial_restart_lsn != InvalidXLogRecPtr &&
- initial_restart_lsn < oldestLSN)
+ XLogRecPtr restart_lsn = s->data.restart_lsn;
+
+ if (restart_lsn != InvalidXLogRecPtr &&
+ restart_lsn < oldestLSN)
return RS_INVAL_WAL_REMOVED;
}
if (SlotIsLogical(s) &&
(dboid == InvalidOid || dboid == s->data.database))
{
- if (TransactionIdIsValid(initial_effective_xmin) &&
- TransactionIdPrecedesOrEquals(initial_effective_xmin,
+ TransactionId effective_xmin = s->effective_xmin;
+ TransactionId catalog_effective_xmin = s->effective_catalog_xmin;
+
+ if (TransactionIdIsValid(effective_xmin) &&
+ TransactionIdPrecedesOrEquals(effective_xmin,
snapshotConflictHorizon))
return RS_INVAL_HORIZON;
- else if (TransactionIdIsValid(initial_catalog_effective_xmin) &&
- TransactionIdPrecedesOrEquals(initial_catalog_effective_xmin,
+ else if (TransactionIdIsValid(catalog_effective_xmin) &&
+ TransactionIdPrecedesOrEquals(catalog_effective_xmin,
snapshotConflictHorizon))
return RS_INVAL_HORIZON;
}
{
int last_signaled_pid = 0;
bool released_lock = false;
- bool terminated = false;
- TransactionId initial_effective_xmin = InvalidTransactionId;
- TransactionId initial_catalog_effective_xmin = InvalidTransactionId;
- XLogRecPtr initial_restart_lsn = InvalidXLogRecPtr;
- ReplicationSlotInvalidationCause invalidation_cause_prev PG_USED_FOR_ASSERTS_ONLY = RS_INVAL_NONE;
TimestampTz inactive_since = 0;
for (;;)
/* we do nothing if the slot is already invalid */
if (s->data.invalidated == RS_INVAL_NONE)
- {
- /*
- * The slot's mutex will be released soon, and it is possible that
- * those values change since the process holding the slot has been
- * terminated (if any), so record them here to ensure that we
- * would report the correct invalidation cause.
- *
- * Unlike other slot attributes, slot's inactive_since can't be
- * changed until the acquired slot is released or the owning
- * process is terminated. So, the inactive slot can only be
- * invalidated immediately without being terminated.
- */
- if (!terminated)
- {
- initial_restart_lsn = s->data.restart_lsn;
- initial_effective_xmin = s->effective_xmin;
- initial_catalog_effective_xmin = s->effective_catalog_xmin;
- }
-
invalidation_cause = DetermineSlotInvalidationCause(possible_causes,
s, oldestLSN,
dboid,
snapshotConflictHorizon,
- initial_effective_xmin,
- initial_catalog_effective_xmin,
- initial_restart_lsn,
&inactive_since,
now);
- }
-
- /*
- * The invalidation cause recorded previously should not change while
- * the process owning the slot (if any) has been terminated.
- */
- Assert(!(invalidation_cause_prev != RS_INVAL_NONE && terminated &&
- invalidation_cause_prev != invalidation_cause));
/* if there's no invalidation, we're done */
if (invalidation_cause == RS_INVAL_NONE)
* If the slot can be acquired, do so and mark it invalidated
* immediately. Otherwise we'll signal the owning process, below, and
* retry.
+ *
+ * Note: Unlike other slot attributes, a slot's inactive_since can't be
+ * changed until the acquired slot is released or the owning process
+ * is terminated. So, an inactive slot can only be invalidated
+ * immediately without being terminated.
*/
if (active_pid == 0)
{
(void) kill(active_pid, SIGTERM);
last_signaled_pid = active_pid;
- terminated = true;
- invalidation_cause_prev = invalidation_cause;
}
/* Wait until the slot is released. */
* Re-acquire lock and start over; we expect to invalidate the
* slot next time (unless another process acquires the slot in the
* meantime).
+ *
+ * Note: It is possible for a slot to advance its restart_lsn or
+ * xmin values sufficiently between when we release the mutex and
+ * when we recheck, moving from a conflicting state to a
+ * non-conflicting state. This is intentional and safe: if the slot
+ * has caught up while we're busy here, the resources we were
+ * concerned about (WAL segments or tuples) have not yet been
+ * removed, and there's no reason to invalidate the slot.
*/
LWLockAcquire(ReplicationSlotControlLock, LW_SHARED);
continue;