#include "common/connect.h"
#include "funcapi.h"
#include "libpq-fe.h"
+#include "libpq/libpq-be-fe-helpers.h"
#include "mb/pg_wchar.h"
#include "miscadmin.h"
#include "pgstat.h"
char **err)
{
WalReceiverConn *conn;
- PostgresPollingStatusType status;
const char *keys[6];
const char *vals[6];
int i = 0;
Assert(i < sizeof(keys));
conn = palloc0(sizeof(WalReceiverConn));
- conn->streamConn = PQconnectStartParams(keys, vals,
- /* expand_dbname = */ true);
- if (PQstatus(conn->streamConn) == CONNECTION_BAD)
- goto bad_connection_errmsg;
-
- /*
- * Poll connection until we have OK or FAILED status.
- *
- * Per spec for PQconnectPoll, first wait till socket is write-ready.
- */
- status = PGRES_POLLING_WRITING;
- do
- {
- int io_flag;
- int rc;
-
- if (status == PGRES_POLLING_READING)
- io_flag = WL_SOCKET_READABLE;
-#ifdef WIN32
- /* Windows needs a different test while waiting for connection-made */
- else if (PQstatus(conn->streamConn) == CONNECTION_STARTED)
- io_flag = WL_SOCKET_CONNECTED;
-#endif
- else
- io_flag = WL_SOCKET_WRITEABLE;
-
- rc = WaitLatchOrSocket(MyLatch,
- WL_EXIT_ON_PM_DEATH | WL_LATCH_SET | io_flag,
- PQsocket(conn->streamConn),
- 0,
- WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
-
- /* Interrupted? */
- if (rc & WL_LATCH_SET)
- {
- ResetLatch(MyLatch);
- ProcessWalRcvInterrupts();
- }
-
- /* If socket is ready, advance the libpq state machine */
- if (rc & io_flag)
- status = PQconnectPoll(conn->streamConn);
- } while (status != PGRES_POLLING_OK && status != PGRES_POLLING_FAILED);
+ conn->streamConn =
+ libpqsrv_connect_params(keys, vals,
+ /* expand_dbname = */ true,
+ WAIT_EVENT_LIBPQWALRECEIVER_CONNECT);
if (PQstatus(conn->streamConn) != CONNECTION_OK)
goto bad_connection_errmsg;
/* error path, error already set */
bad_connection:
- PQfinish(conn->streamConn);
+ libpqsrv_disconnect(conn->streamConn);
pfree(conn);
return NULL;
}
static void
libpqrcv_disconnect(WalReceiverConn *conn)
{
- PQfinish(conn->streamConn);
+ libpqsrv_disconnect(conn->streamConn);
PQfreemem(conn->recvBuf);
pfree(conn);
}