SpinLockInit(&walsnd->mutex);
}
+
+ ConditionVariableInit(&WalSndCtl->wal_flush_cv);
+ ConditionVariableInit(&WalSndCtl->wal_replay_cv);
}
}
void
WalSndWakeup(bool physical, bool logical)
{
- int i;
-
- for (i = 0; i < max_wal_senders; i++)
- {
- Latch *latch;
- ReplicationKind kind;
- WalSnd *walsnd = &WalSndCtl->walsnds[i];
-
- /*
- * Get latch pointer with spinlock held, for the unlikely case that
- * pointer reads aren't atomic (as they're 8 bytes). While at it, also
- * get kind.
- */
- SpinLockAcquire(&walsnd->mutex);
- latch = walsnd->latch;
- kind = walsnd->kind;
- SpinLockRelease(&walsnd->mutex);
-
- if (latch == NULL)
- continue;
+ /*
+ * Wake up all the walsenders waiting on WAL being flushed or replayed
+ * respectively. Note that waiting walsender would have prepared to sleep
+ * on the CV (i.e., added itself to the CV's waitlist) in WalSndWait()
+ * before actually waiting.
+ */
+ if (physical)
+ ConditionVariableBroadcast(&WalSndCtl->wal_flush_cv);
- if ((physical && kind == REPLICATION_KIND_PHYSICAL) ||
- (logical && kind == REPLICATION_KIND_LOGICAL))
- SetLatch(latch);
- }
+ if (logical)
+ ConditionVariableBroadcast(&WalSndCtl->wal_replay_cv);
}
/*
WaitEvent event;
ModifyWaitEvent(FeBeWaitSet, FeBeWaitSetSocketPos, socket_events, NULL);
+
+ /*
+ * We use a condition variable to efficiently wake up walsenders in
+ * WalSndWakeup().
+ *
+ * Every walsender prepares to sleep on a shared memory CV. Note that it
+ * just prepares to sleep on the CV (i.e., adds itself to the CV's
+ * waitlist), but does not actually wait on the CV (IOW, it never calls
+ * ConditionVariableSleep()). It still uses WaitEventSetWait() for
+ * waiting, because we also need to wait for socket events. The processes
+ * (startup process, walreceiver etc.) wanting to wake up walsenders use
+ * ConditionVariableBroadcast(), which in turn calls SetLatch(), helping
+ * walsenders come out of WaitEventSetWait().
+ *
+ * This approach is simple and efficient because, one doesn't have to loop
+ * through all the walsenders slots, with a spinlock acquisition and
+ * release for every iteration, just to wake up only the waiting
+ * walsenders. It makes WalSndWakeup() callers' life easy.
+ *
+ * XXX: A desirable future improvement would be to add support for CVs
+ * into WaitEventSetWait().
+ *
+ * And, we use separate shared memory CVs for physical and logical
+ * walsenders for selective wake ups, see WalSndWakeup() for more details.
+ */
+ if (MyWalSnd->kind == REPLICATION_KIND_PHYSICAL)
+ ConditionVariablePrepareToSleep(&WalSndCtl->wal_flush_cv);
+ else if (MyWalSnd->kind == REPLICATION_KIND_LOGICAL)
+ ConditionVariablePrepareToSleep(&WalSndCtl->wal_replay_cv);
+
if (WaitEventSetWait(FeBeWaitSet, timeout, &event, 1, wait_event) == 1 &&
(event.events & WL_POSTMASTER_DEATH))
+ {
+ ConditionVariableCancelSleep();
proc_exit(1);
+ }
+
+ ConditionVariableCancelSleep();
}
/*