#include "common/connect.h"
 #include "funcapi.h"
 #include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
 #include "mb/pg_wchar.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 };
 
 /* Prototypes for private functions */
-static PGresult *libpqrcv_PQexec(PGconn *streamConn, const char *query);
-static PGresult *libpqrcv_PQgetResult(PGconn *streamConn);
 static char *stringlist_to_identifierstr(PGconn *conn, List *strings);
 
 /*
                 bool must_use_password, const char *appname, char **err)
 {
    WalReceiverConn *conn;
-   PostgresPollingStatusType status;
    const char *keys[6];
    const char *vals[6];
    int         i = 0;
    Assert(i < lengthof(keys));
 
    conn = palloc0(sizeof(WalReceiverConn));
-   conn->streamConn = PQconnectStartParams(keys, vals,
-                                            /* expand_dbname = */ true);
-   if (PQstatus(conn->streamConn) == CONNECTION_BAD)
-       goto bad_connection_errmsg;
-
-   /*
-    * Poll connection until we have OK or FAILED status.
-    *
-    * Per spec for PQconnectPoll, first wait till socket is write-ready.
-    */
-   status = PGRES_POLLING_WRITING;
-   do
-   {
-       int         io_flag;
-       int         rc;
-
-       if (status == PGRES_POLLING_READING)
-           io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
-       /* Windows needs a different test while waiting for connection-made */
-       else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
-           io_flag = WL_SOCKET_CONNECTED;
-#endif
-       else
-           io_flag = WL_SOCKET_WRITEABLE;
-
-       rc = WaitLatchOrSocket(MyLatch,
-                              WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
-                              PQsocket(conn->streamConn),
-                              0,
-                              WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
-       /* Interrupted? */
-       if (rc & WL_LATCH_SET)
-       {
-           ResetLatch(MyLatch);
-           ProcessWalRcvInterrupts();
-       }
-
-       /* If socket is ready, advance the libpq state machine */
-       if (rc & io_flag)
-           status = PQconnectPoll(conn->streamConn);
-   } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+   conn->streamConn =
+       libpqsrv_connect_params(keys, vals,
+                                /* expand_dbname = */ true,
+                               WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
 
    if (PQstatus(conn->streamConn) != CONNECTION_OK)
        goto bad_connection_errmsg;
 
    if (must_use_password && !PQconnectionUsedPassword(conn->streamConn))
    {
-       PQfinish(conn->streamConn);
+       libpqsrv_disconnect(conn->streamConn);
        pfree(conn);
 
        ereport(ERROR,
    {
        PGresult   *res;
 
-       res = libpqrcv_PQexec(conn->streamConn,
-                             ALWAYS_SECURE_SEARCH_PATH_SQL);
+       res = libpqsrv_exec(conn->streamConn,
+                           ALWAYS_SECURE_SEARCH_PATH_SQL,
+                           WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
        {
            PQclear(res);
 
    /* error path, error already set */
 bad_connection:
-   PQfinish(conn->streamConn);
+   libpqsrv_disconnect(conn->streamConn);
    pfree(conn);
    return NULL;
 }
     * Get the system identifier and timeline ID as a DataRow message from the
     * primary server.
     */
-   res = libpqrcv_PQexec(conn->streamConn, "IDENTIFY_SYSTEM");
+   res = libpqsrv_exec(conn->streamConn,
+                       "IDENTIFY_SYSTEM",
+                       WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        PQclear(res);
                         options->proto.physical.startpointTLI);
 
    /* Start streaming. */
-   res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+   res = libpqsrv_exec(conn->streamConn,
+                       cmd.data,
+                       WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    pfree(cmd.data);
 
    if (PQresultStatus(res) == PGRES_COMMAND_OK)
    PGresult   *res;
 
    /*
-    * Send copy-end message.  As in libpqrcv_PQexec, this could theoretically
+    * Send copy-end message.  As in libpqsrv_exec, this could theoretically
     * block, but the risk seems small.
     */
    if (PQputCopyEnd(conn->streamConn, NULL) <= 0 ||
     * If we had not yet received CopyDone from the backend, PGRES_COPY_OUT is
     * also possible in case we aborted the copy in mid-stream.
     */
-   res = libpqrcv_PQgetResult(conn->streamConn);
+   res = libpqsrv_get_result(conn->streamConn,
+                             WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    if (PQresultStatus(res) == PGRES_TUPLES_OK)
    {
        /*
        PQclear(res);
 
        /* the result set should be followed by CommandComplete */
-       res = libpqrcv_PQgetResult(conn->streamConn);
+       res = libpqsrv_get_result(conn->streamConn,
+                                 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    }
    else if (PQresultStatus(res) == PGRES_COPY_OUT)
    {
                            pchomp(PQerrorMessage(conn->streamConn)))));
 
        /* CommandComplete should follow */
-       res = libpqrcv_PQgetResult(conn->streamConn);
+       res = libpqsrv_get_result(conn->streamConn,
+                                 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    }
 
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    PQclear(res);
 
    /* Verify that there are no more results */
-   res = libpqrcv_PQgetResult(conn->streamConn);
+   res = libpqsrv_get_result(conn->streamConn,
+                             WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    if (res != NULL)
        ereport(ERROR,
                (errcode(ERRCODE_PROTOCOL_VIOLATION),
     * Request the primary to send over the history file for given timeline.
     */
    snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
-   res = libpqrcv_PQexec(conn->streamConn, cmd);
+   res = libpqsrv_exec(conn->streamConn,
+                       cmd,
+                       WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
    {
        PQclear(res);
    PQclear(res);
 }
 
-/*
- * Send a query and wait for the results by using the asynchronous libpq
- * functions and socket readiness events.
- *
- * The function is modeled on libpqsrv_exec(), with the behavior difference
- * being that it calls ProcessWalRcvInterrupts().  As an optimization, it
- * skips try/catch, since all errors terminate the process.
- *
- * May return NULL, rather than an error result, on failure.
- */
-static PGresult *
-libpqrcv_PQexec(PGconn *streamConn, const char *query)
-{
-   PGresult   *lastResult = NULL;
-
-   /*
-    * PQexec() silently discards any prior query results on the connection.
-    * This is not required for this function as it's expected that the caller
-    * (which is this library in all cases) will behave correctly and we don't
-    * have to be backwards compatible with old libpq.
-    */
-
-   /*
-    * Submit the query.  Since we don't use non-blocking mode, this could
-    * theoretically block.  In practice, since we don't send very long query
-    * strings, the risk seems negligible.
-    */
-   if (!PQsendQuery(streamConn, query))
-       return NULL;
-
-   for (;;)
-   {
-       /* Wait for, and collect, the next PGresult. */
-       PGresult   *result;
-
-       result = libpqrcv_PQgetResult(streamConn);
-       if (result == NULL)
-           break;              /* query is complete, or failure */
-
-       /*
-        * Emulate PQexec()'s behavior of returning the last result when there
-        * are many.  We are fine with returning just last error message.
-        */
-       PQclear(lastResult);
-       lastResult = result;
-
-       if (PQresultStatus(lastResult) == PGRES_COPY_IN ||
-           PQresultStatus(lastResult) == PGRES_COPY_OUT ||
-           PQresultStatus(lastResult) == PGRES_COPY_BOTH ||
-           PQstatus(streamConn) == CONNECTION_BAD)
-           break;
-   }
-
-   return lastResult;
-}
-
-/*
- * Perform the equivalent of PQgetResult(), but watch for interrupts.
- */
-static PGresult *
-libpqrcv_PQgetResult(PGconn *streamConn)
-{
-   /*
-    * Collect data until PQgetResult is ready to get the result without
-    * blocking.
-    */
-   while (PQisBusy(streamConn))
-   {
-       int         rc;
-
-       /*
-        * We don't need to break down the sleep into smaller increments,
-        * since we'll get interrupted by signals and can handle any
-        * interrupts here.
-        */
-       rc = WaitLatchOrSocket(MyLatch,
-                              WL_EXIT_ON_PM_DEATH | WL_SOCKET_READABLE |
-                              WL_LATCH_SET,
-                              PQsocket(streamConn),
-                              0,
-                              WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
-
-       /* Interrupted? */
-       if (rc & WL_LATCH_SET)
-       {
-           ResetLatch(MyLatch);
-           ProcessWalRcvInterrupts();
-       }
-
-       /* Consume whatever data is available from the socket */
-       if (PQconsumeInput(streamConn) == 0)
-       {
-           /* trouble; return NULL */
-           return NULL;
-       }
-   }
-
-   /* Now we can collect and return the next PGresult */
-   return PQgetResult(streamConn);
-}
-
 /*
  * Disconnect connection to primary, if any.
  */
 static void
 libpqrcv_disconnect(WalReceiverConn *conn)
 {
-   PQfinish(conn->streamConn);
+   libpqsrv_disconnect(conn->streamConn);
    PQfreemem(conn->recvBuf);
    pfree(conn);
 }
    {
        PGresult   *res;
 
-       res = libpqrcv_PQgetResult(conn->streamConn);
+       res = libpqsrv_get_result(conn->streamConn,
+                                 WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
        if (PQresultStatus(res) == PGRES_COMMAND_OK)
        {
            PQclear(res);
 
            /* Verify that there are no more results. */
-           res = libpqrcv_PQgetResult(conn->streamConn);
+           res = libpqsrv_get_result(conn->streamConn,
+                                     WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
            if (res != NULL)
            {
                PQclear(res);
            appendStringInfoString(&cmd, " PHYSICAL RESERVE_WAL");
    }
 
-   res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+   res = libpqsrv_exec(conn->streamConn,
+                       cmd.data,
+                       WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    pfree(cmd.data);
 
    if (PQresultStatus(res) != PGRES_TUPLES_OK)
 
    appendStringInfoString(&cmd, " );");
 
-   res = libpqrcv_PQexec(conn->streamConn, cmd.data);
+   res = libpqsrv_exec(conn->streamConn, cmd.data,
+                       WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
    pfree(cmd.data);
 
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
    {
        char       *cstrs[MaxTupleAttributeNumber];
 
-       ProcessWalRcvInterrupts();
+       CHECK_FOR_INTERRUPTS();
 
        /* Do the allocations in temporary context. */
        oldcontext = MemoryContextSwitchTo(rowcontext);
                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                 errmsg("the query interface requires a database connection")));
 
-   pgres = libpqrcv_PQexec(conn->streamConn, query);
+   pgres = libpqsrv_exec(conn->streamConn,
+                         query,
+                         WAIT_EVENT_LIBPQWALRECEIVER_RECEIVE);
 
    switch (PQresultStatus(pgres))
    {
 
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/procsignal.h"
+#include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/builtins.h"
 #include "utils/guc.h"
 static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 static void WalRcvComputeNextWakeup(WalRcvWakeupReason reason, TimestampTz now);
 
-/*
- * Process any interrupts the walreceiver process may have received.
- * This should be called any time the process's latch has become set.
- *
- * Currently, only SIGTERM is of interest.  We can't just exit(1) within the
- * SIGTERM signal handler, because the signal might arrive in the middle of
- * some critical operation, like while we're holding a spinlock.  Instead, the
- * signal handler sets a flag variable as well as setting the process's latch.
- * We must check the flag (by calling ProcessWalRcvInterrupts) anytime the
- * latch has become set.  Operations that could block for a long time, such as
- * reading from a remote server, must pay attention to the latch too; see
- * libpqrcv_PQgetResult for example.
- */
-void
-ProcessWalRcvInterrupts(void)
-{
-   /*
-    * Although walreceiver interrupt handling doesn't use the same scheme as
-    * regular backends, call CHECK_FOR_INTERRUPTS() to make sure we receive
-    * any incoming signals on Win32, and also to make sure we process any
-    * barrier events.
-    */
-   CHECK_FOR_INTERRUPTS();
-
-   if (ShutdownRequestPending)
-   {
-       ereport(FATAL,
-               (errcode(ERRCODE_ADMIN_SHUTDOWN),
-                errmsg("terminating walreceiver process due to administrator command")));
-   }
-}
-
 
 /* Main entry point for walreceiver process */
 void
    pqsignal(SIGHUP, SignalHandlerForConfigReload); /* set flag to read config
                                                     * file */
    pqsignal(SIGINT, SIG_IGN);
-   pqsignal(SIGTERM, SignalHandlerForShutdownRequest); /* request shutdown */
+   pqsignal(SIGTERM, die);     /* request shutdown */
    /* SIGQUIT handler was already set up by InitPostmasterChild */
    pqsignal(SIGALRM, SIG_IGN);
    pqsignal(SIGPIPE, SIG_IGN);
                             errmsg("cannot continue WAL streaming, recovery has already ended")));
 
                /* Process any requests or signals received recently */
-               ProcessWalRcvInterrupts();
+               CHECK_FOR_INTERRUPTS();
 
                if (ConfigReloadPending)
                {
                if (rc & WL_LATCH_SET)
                {
                    ResetLatch(MyLatch);
-                   ProcessWalRcvInterrupts();
+                   CHECK_FOR_INTERRUPTS();
 
                    if (walrcv->force_reply)
                    {
    {
        ResetLatch(MyLatch);
 
-       ProcessWalRcvInterrupts();
+       CHECK_FOR_INTERRUPTS();
 
        SpinLockAcquire(&walrcv->mutex);
        Assert(walrcv->walRcvState == WALRCV_RESTARTING ||