Add condition variable for walreceiver shutdown.
authorThomas Munro <[email protected]>
Fri, 12 Mar 2021 06:07:27 +0000 (19:07 +1300)
committerThomas Munro <[email protected]>
Fri, 12 Mar 2021 06:45:42 +0000 (19:45 +1300)
Use this new CV to wait for walreceiver shutdown without a sleep/poll
loop, while also benefiting from standard postmaster death handling.

Discussion: https://postgr.es/m/CA%2BhUKGK1607VmtrDUHQXrsooU%3Dap4g4R2yaoByWOOA3m8xevUQ%40mail.gmail.com

doc/src/sgml/monitoring.sgml
src/backend/postmaster/pgstat.c
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/include/pgstat.h
src/include/replication/walreceiver.h

index 1ba813bbb9ab65ffd07081ade0dbd50a3a1a3805..c35045faa1a701865c5965c58562d3e97b37758d 100644 (file)
@@ -1766,6 +1766,10 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
       <entry>Waiting for confirmation from a remote server during synchronous
        replication.</entry>
      </row>
+     <row>
+      <entry><literal>WalrcvExit</literal></entry>
+      <entry>Waiting for the walreceiver to exit.</entry>
+     </row>
      <row>
       <entry><literal>XactGroupUpdate</literal></entry>
       <entry>Waiting for the group leader to update transaction status at
index 68eefb97227fa0c68b5fb7330ba7d3c5000beef8..b1e2d94951d3e0290ee7b407d9e1414dad7a45d7 100644 (file)
@@ -4124,6 +4124,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
        case WAIT_EVENT_SYNC_REP:
            event_name = "SyncRep";
            break;
+       case WAIT_EVENT_WALRCV_EXIT:
+           event_name = "WalrcvExit";
+           break;
        case WAIT_EVENT_XACT_GROUP_UPDATE:
            event_name = "XactGroupUpdate";
            break;
index e5f8a06fea041fbe71883c990354206f9035d127..8532296f26c1f399f435bf05435e41ffc269b701 100644 (file)
@@ -207,6 +207,7 @@ WalReceiverMain(void)
 
        case WALRCV_STOPPED:
            SpinLockRelease(&walrcv->mutex);
+           ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
            proc_exit(1);
            break;
 
@@ -784,6 +785,8 @@ WalRcvDie(int code, Datum arg)
    walrcv->latch = NULL;
    SpinLockRelease(&walrcv->mutex);
 
+   ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
+
    /* Terminate the connection gracefully. */
    if (wrconn != NULL)
        walrcv_disconnect(wrconn);
index 63e60478ea6e8f4b7041ac28808c039958dfe3a4..fff6c54c45d3f1507757f8408f33a5d310bd2772 100644 (file)
@@ -23,6 +23,7 @@
 #include <signal.h>
 
 #include "access/xlog_internal.h"
+#include "pgstat.h"
 #include "postmaster/startup.h"
 #include "replication/walreceiver.h"
 #include "storage/pmsignal.h"
@@ -62,6 +63,7 @@ WalRcvShmemInit(void)
        /* First time through, so initialize */
        MemSet(WalRcv, 0, WalRcvShmemSize());
        WalRcv->walRcvState = WALRCV_STOPPED;
+       ConditionVariableInit(&WalRcv->walRcvStoppedCV);
        SpinLockInit(&WalRcv->mutex);
        pg_atomic_init_u64(&WalRcv->writtenUpto, 0);
        WalRcv->latch = NULL;
@@ -95,12 +97,18 @@ WalRcvRunning(void)
 
        if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
        {
-           SpinLockAcquire(&walrcv->mutex);
+           bool        stopped = false;
 
+           SpinLockAcquire(&walrcv->mutex);
            if (walrcv->walRcvState == WALRCV_STARTING)
+           {
                state = walrcv->walRcvState = WALRCV_STOPPED;
-
+               stopped = true;
+           }
            SpinLockRelease(&walrcv->mutex);
+
+           if (stopped)
+               ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
        }
    }
 
@@ -140,12 +148,18 @@ WalRcvStreaming(void)
 
        if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
        {
-           SpinLockAcquire(&walrcv->mutex);
+           bool        stopped = false;
 
+           SpinLockAcquire(&walrcv->mutex);
            if (walrcv->walRcvState == WALRCV_STARTING)
+           {
                state = walrcv->walRcvState = WALRCV_STOPPED;
-
+               stopped = true;
+           }
            SpinLockRelease(&walrcv->mutex);
+
+           if (stopped)
+               ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
        }
    }
 
@@ -165,6 +179,7 @@ ShutdownWalRcv(void)
 {
    WalRcvData *walrcv = WalRcv;
    pid_t       walrcvpid = 0;
+   bool        stopped = false;
 
    /*
     * Request walreceiver to stop. Walreceiver will switch to WALRCV_STOPPED
@@ -178,6 +193,7 @@ ShutdownWalRcv(void)
            break;
        case WALRCV_STARTING:
            walrcv->walRcvState = WALRCV_STOPPED;
+           stopped = true;
            break;
 
        case WALRCV_STREAMING:
@@ -191,6 +207,10 @@ ShutdownWalRcv(void)
    }
    SpinLockRelease(&walrcv->mutex);
 
+   /* Unnecessary but consistent. */
+   if (stopped)
+       ConditionVariableBroadcast(&walrcv->walRcvStoppedCV);
+
    /*
     * Signal walreceiver process if it was still running.
     */
@@ -201,16 +221,11 @@ ShutdownWalRcv(void)
     * Wait for walreceiver to acknowledge its death by setting state to
     * WALRCV_STOPPED.
     */
+   ConditionVariablePrepareToSleep(&walrcv->walRcvStoppedCV);
    while (WalRcvRunning())
-   {
-       /*
-        * This possibly-long loop needs to handle interrupts of startup
-        * process.
-        */
-       HandleStartupProcInterrupts();
-
-       pg_usleep(100000);      /* 100ms */
-   }
+       ConditionVariableSleep(&walrcv->walRcvStoppedCV,
+                              WAIT_EVENT_WALRCV_EXIT);
+   ConditionVariableCancelSleep();
 }
 
 /*
index f9166b865584d77d0af01a55a8d61a39f722a583..be43c04802897d6b4f7f64c4383cb8d513dd88c1 100644 (file)
@@ -1009,6 +1009,7 @@ typedef enum
    WAIT_EVENT_REPLICATION_SLOT_DROP,
    WAIT_EVENT_SAFE_SNAPSHOT,
    WAIT_EVENT_SYNC_REP,
+   WAIT_EVENT_WALRCV_EXIT,
    WAIT_EVENT_XACT_GROUP_UPDATE
 } WaitEventIPC;
 
index a97a59a6a30aaedf74b356f01094cdef4ee3af46..4fd7c25ea74e88f01779f5c5798476e780f43e90 100644 (file)
@@ -19,6 +19,7 @@
 #include "port/atomics.h"
 #include "replication/logicalproto.h"
 #include "replication/walsender.h"
+#include "storage/condition_variable.h"
 #include "storage/latch.h"
 #include "storage/spin.h"
 #include "utils/tuplestore.h"
@@ -62,6 +63,7 @@ typedef struct
     */
    pid_t       pid;
    WalRcvState walRcvState;
+   ConditionVariable walRcvStoppedCV;
    pg_time_t   startTime;
 
    /*