#
 # Copyright (c) 1994, Regents of the University of California
 #
-# $PostgreSQL: pgsql/src/Makefile,v 1.49 2010/01/15 17:01:06 heikki Exp $
+# $PostgreSQL: pgsql/src/Makefile,v 1.50 2010/01/20 09:16:23 heikki Exp $
 #
 #-------------------------------------------------------------------------
 
    $(MAKE) -C backend/snowball $@
    $(MAKE) -C include $@
    $(MAKE) -C interfaces $@
-   $(MAKE) -C backend/replication/walreceiver $@
+   $(MAKE) -C backend/replication/libpqwalreceiver $@
    $(MAKE) -C bin $@
    $(MAKE) -C pl $@
    $(MAKE) -C makefiles $@
    $(MAKE) -C backend/snowball $@
    $(MAKE) -C include $@
    $(MAKE) -C interfaces $@
-   $(MAKE) -C backend/replication/walreceiver $@
+   $(MAKE) -C backend/replication/libpqwalreceiver $@
    $(MAKE) -C bin $@
    $(MAKE) -C pl $@
    $(MAKE) -C makefiles $@
    $(MAKE) -C backend/snowball $@
    $(MAKE) -C include $@
    $(MAKE) -C interfaces $@
-   $(MAKE) -C backend/replication/walreceiver $@
+   $(MAKE) -C backend/replication/libpqwalreceiver $@
    $(MAKE) -C bin $@
    $(MAKE) -C pl $@
    $(MAKE) -C makefiles $@
    $(MAKE) -C backend/utils/mb/conversion_procs $@
    $(MAKE) -C backend/snowball $@
    $(MAKE) -C interfaces $@
-   $(MAKE) -C backend/replication/walreceiver $@
+   $(MAKE) -C backend/replication/libpqwalreceiver $@
    $(MAKE) -C bin $@
    $(MAKE) -C pl $@
 
 
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/bootstrap/bootstrap.c,v 1.256 2010/01/15 09:19:00 heikki Exp $
+ *   $PostgreSQL: pgsql/src/backend/bootstrap/bootstrap.c,v 1.257 2010/01/20 09:16:23 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
 
        case WalReceiverProcess:
            /* don't set signals, walreceiver has its own agenda */
-           {
-               PGFunction WalReceiverMain;
-
-               /*
-                * Walreceiver is not linked directly into the server
-                * binary because we would then need to link the server
-                * with libpq. It's compiled as a dynamically loaded module
-                * to avoid that.
-                */
-               WalReceiverMain = load_external_function("walreceiver",
-                                                        "WalReceiverMain",
-                                                        true, NULL);
-               WalReceiverMain(NULL);
-           }
+           WalReceiverMain();
            proc_exit(1);       /* should never return */
 
        default:
 
 #    Makefile for src/backend/replication
 #
 # IDENTIFICATION
-#    $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.1 2010/01/15 09:19:03 heikki Exp $
+#    $PostgreSQL: pgsql/src/backend/replication/Makefile,v 1.2 2010/01/20 09:16:24 heikki Exp $
 #
 #-------------------------------------------------------------------------
 
 top_builddir = ../../..
 include $(top_builddir)/src/Makefile.global
 
-OBJS = walsender.o walreceiverfuncs.o
+OBJS = walsender.o walreceiverfuncs.o walreceiver.o
 
 include $(top_srcdir)/src/backend/common.mk
 
-$PostgreSQL: pgsql/src/backend/replication/README,v 1.1 2010/01/15 09:19:03 heikki Exp $
+$PostgreSQL: pgsql/src/backend/replication/README,v 1.2 2010/01/20 09:16:24 heikki Exp $
+
+Walreceiver - libpqwalreceiver API
+----------------------------------
+
+The transport-specific part of walreceiver, responsible for connecting to
+the primary server and receiving WAL files, is loaded dynamically to avoid
+having to link the main server binary with libpq. The dynamically loaded
+module is in libpqwalreceiver subdirectory.
+
+The dynamically loaded module implements three functions:
+
+
+bool walrcv_connect(char *conninfo, XLogRecPtr startpoint)
+
+Establish connection to the primary, and starts streaming from 'startpoint'.
+Returns true on success.
+
+
+bool walrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
+
+Retrieve any WAL record available through the connection, blocking for
+maximum of 'timeout' ms.
+
+
+void walrcv_disconnect(void);
+
+Disconnect.
+
+
+This API should be considered internal at the moment, but we could open it
+up for 3rd party replacements of libpqwalreceiver in the future, allowing
+pluggable methods for receiveing WAL.
 
 Walreceiver IPC
 ---------------
 
 #-------------------------------------------------------------------------
 #
 # Makefile--
-#    Makefile for src/backend/replication/walreceiver
+#    Makefile for src/backend/replication/libpqwalreceiver
 #
 # IDENTIFICATION
-#    $PostgreSQL: pgsql/src/backend/replication/walreceiver/Makefile,v 1.4 2010/01/15 21:06:26 tgl Exp $
+#    $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/Makefile,v 1.1 2010/01/20 09:16:24 heikki Exp $
 #
 #-------------------------------------------------------------------------
 
-subdir = src/backend/replication/walreceiver
+subdir = src/backend/postmaster/libpqwalreceiver
 top_builddir = ../../../..
 include $(top_builddir)/src/Makefile.global
 
-override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
-
-OBJS = walreceiver.o
+override CPPFLAGS := -I$(srcdir) -I$(libpq_srcdir) $(CPPFLAGS)
 
+OBJS = libpqwalreceiver.o
 SHLIB_LINK = $(libpq)
-
-NAME := walreceiver
+NAME = libpqwalreceiver
 
 all: submake-libpq all-shared-lib
 
 
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * libpqwalreceiver.c
+ *
+ * This file contains the libpq-specific parts of walreceiver. It's
+ * loaded as a dynamic module to avoid linking the main server binary with
+ * libpq.
+ *
+ * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
+ *
+ *
+ * IDENTIFICATION
+ *   $PostgreSQL: pgsql/src/backend/replication/libpqwalreceiver/libpqwalreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
+ *
+ *-------------------------------------------------------------------------
+ */
+#include "postgres.h"
+
+#include <unistd.h>
+#include <sys/time.h>
+
+#include "libpq-fe.h"
+#include "access/xlog.h"
+#include "miscadmin.h"
+#include "replication/walreceiver.h"
+#include "utils/builtins.h"
+
+#ifdef HAVE_POLL_H
+#include <poll.h>
+#endif
+#ifdef HAVE_SYS_POLL_H
+#include <sys/poll.h>
+#endif
+#ifdef HAVE_SYS_SELECT_H
+#include <sys/select.h>
+#endif
+
+PG_MODULE_MAGIC;
+
+void       _PG_init(void);
+
+/* Current connection to the primary, if any */
+static PGconn *streamConn = NULL;
+static bool justconnected = false;
+
+/* Buffer for currently read records */
+static char *recvBuf = NULL;
+
+/* Prototypes for interface functions */
+static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
+static bool libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer,
+             int *len);
+static void libpqrcv_disconnect(void);
+
+/* Prototypes for private functions */
+static bool libpq_select(int timeout_ms);
+
+/*
+ * Module load callback
+ */
+void
+_PG_init(void)
+{
+   /* Tell walreceiver how to reach us */
+   if (walrcv_connect != NULL || walrcv_receive != NULL || walrcv_disconnect)
+       elog(ERROR, "libpqwalreceiver already loaded");
+   walrcv_connect = libpqrcv_connect;
+   walrcv_receive = libpqrcv_receive;
+   walrcv_disconnect = libpqrcv_disconnect;
+}
+
+/*
+ * Establish the connection to the primary server for XLOG streaming
+ */
+static bool
+libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+{
+   char        conninfo_repl[MAXCONNINFO + 14];
+   char       *primary_sysid;
+   char        standby_sysid[32];
+   TimeLineID  primary_tli;
+   TimeLineID  standby_tli;
+   PGresult   *res;
+   char        cmd[64];
+
+   Assert(startpoint.xlogid != 0 || startpoint.xrecoff != 0);
+
+   /* Connect */
+   snprintf(conninfo_repl, sizeof(conninfo_repl), "%s replication=true", conninfo);
+
+   streamConn = PQconnectdb(conninfo_repl);
+   if (PQstatus(streamConn) != CONNECTION_OK)
+       ereport(ERROR,
+               (errmsg("could not connect to the primary server : %s",
+                       PQerrorMessage(streamConn))));
+
+   /*
+    * Get the system identifier and timeline ID as a DataRow message
+    * from the primary server.
+    */
+   res = PQexec(streamConn, "IDENTIFY_SYSTEM");
+   if (PQresultStatus(res) != PGRES_TUPLES_OK)
+    {
+       PQclear(res);
+       ereport(ERROR,
+               (errmsg("could not receive the SYSID and timeline ID from "
+                       "the primary server: %s",
+                       PQerrorMessage(streamConn))));
+    }
+   if (PQnfields(res) != 2 || PQntuples(res) != 1)
+   {
+       int ntuples = PQntuples(res);
+       int nfields = PQnfields(res);
+       PQclear(res);
+       ereport(ERROR,
+               (errmsg("invalid response from primary server"),
+                errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields",
+                          ntuples, nfields)));
+   }
+   primary_sysid = PQgetvalue(res, 0, 0);
+   primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
+
+   /*
+    * Confirm that the system identifier of the primary is the same
+    * as ours.
+    */
+   snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
+            GetSystemIdentifier());
+   if (strcmp(primary_sysid, standby_sysid) != 0)
+   {
+       PQclear(res);
+       ereport(ERROR,
+               (errmsg("system differs between the primary and standby"),
+                errdetail("the primary SYSID is %s, standby SYSID is %s",
+                          primary_sysid, standby_sysid)));
+   }
+
+   /*
+    * Confirm that the current timeline of the primary is the same
+    * as the recovery target timeline.
+    */
+   standby_tli = GetRecoveryTargetTLI();
+   PQclear(res);
+   if (primary_tli != standby_tli)
+       ereport(ERROR,
+               (errmsg("timeline %u of the primary does not match recovery target timeline %u",
+                       primary_tli, standby_tli)));
+   ThisTimeLineID = primary_tli;
+
+   /* Start streaming from the point requested by startup process */
+   snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
+            startpoint.xlogid, startpoint.xrecoff);
+   res = PQexec(streamConn, cmd);
+   if (PQresultStatus(res) != PGRES_COPY_OUT)
+       ereport(ERROR,
+               (errmsg("could not start XLOG streaming: %s",
+                       PQerrorMessage(streamConn))));
+   PQclear(res);
+
+   justconnected = true;
+
+   return true;
+}
+
+/*
+ * Wait until we can read WAL stream, or timeout.
+ *
+ * Returns true if data has become available for reading, false if timed out
+ * or interrupted by signal.
+ *
+ * This is based on pqSocketCheck.
+ */
+static bool
+libpq_select(int timeout_ms)
+{
+   int ret;
+
+   Assert(streamConn != NULL);
+   if (PQsocket(streamConn) < 0)
+       ereport(ERROR,
+               (errcode_for_socket_access(),
+                errmsg("socket not open")));
+
+   /* We use poll(2) if available, otherwise select(2) */
+   {
+#ifdef HAVE_POLL
+       struct pollfd input_fd;
+
+       input_fd.fd = PQsocket(streamConn);
+       input_fd.events = POLLIN | POLLERR;
+       input_fd.revents = 0;
+
+       ret = poll(&input_fd, 1, timeout_ms);
+#else                          /* !HAVE_POLL */
+
+       fd_set      input_mask;
+       struct timeval timeout;
+       struct timeval *ptr_timeout;
+
+       FD_ZERO(&input_mask);
+       FD_SET(PQsocket(streamConn), &input_mask);
+
+       if (timeout_ms < 0)
+           ptr_timeout = NULL;
+       else
+       {
+           timeout.tv_sec  = timeout_ms / 1000;
+           timeout.tv_usec = (timeout_ms % 1000) * 1000;
+           ptr_timeout     = &timeout;
+       }
+
+       ret = select(PQsocket(streamConn) + 1, &input_mask,
+                    NULL, NULL, ptr_timeout);
+#endif   /* HAVE_POLL */
+   }
+
+   if (ret == 0 || (ret < 0 && errno == EINTR))
+       return false;
+   if (ret < 0)
+       ereport(ERROR,
+               (errcode_for_socket_access(),
+                errmsg("select() failed: %m")));
+   return true;
+}
+
+/*
+ * Disconnect connection to primary, if any.
+ */
+static void
+libpqrcv_disconnect(void)
+{
+   PQfinish(streamConn);
+   streamConn = NULL;
+   justconnected = false;
+}
+
+/*
+ * Receive any WAL records available from XLOG stream, blocking for
+ * maximum of 'timeout' ms.
+ *
+ * Returns:
+ *
+ *   True if data was received. *recptr, *buffer and *len are set to
+ *   the WAL location of the received data, buffer holding it, and length,
+ *   respectively.
+ *
+ *   False if no data was available within timeout, or wait was interrupted
+ *   by signal.
+ *
+ * The buffer returned is only valid until the next call of this function or
+ * libpq_connect/disconnect.
+ *
+ * ereports on error.
+ */
+static bool
+libpqrcv_receive(int timeout, XLogRecPtr *recptr, char **buffer, int *len)
+{
+   int         rawlen;
+
+   if (recvBuf != NULL)
+       PQfreemem(recvBuf);
+   recvBuf = NULL;
+
+   /*
+    * If the caller requested to block, wait for data to arrive. But if
+    * this is the first call after connecting, don't wait, because
+    * there might already be some data in libpq buffer that we haven't
+    * returned to caller.
+    */
+   if (timeout > 0 && !justconnected)
+   {
+       if (!libpq_select(timeout))
+           return false;
+
+       if (PQconsumeInput(streamConn) == 0)
+           ereport(ERROR,
+                   (errmsg("could not read xlog records: %s",
+                           PQerrorMessage(streamConn))));
+   }
+   justconnected = false;
+
+   /* Receive CopyData message */
+   rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
+   if (rawlen == 0)    /* no records available yet, then return */
+       return false;
+   if (rawlen == -1)   /* end-of-streaming or error */
+   {
+       PGresult    *res;
+
+       res = PQgetResult(streamConn);
+       if (PQresultStatus(res) == PGRES_COMMAND_OK)
+       {
+           PQclear(res);
+           ereport(ERROR,
+                   (errmsg("replication terminated by primary server")));
+       }
+       PQclear(res);
+       ereport(ERROR,
+               (errmsg("could not read xlog records: %s",
+                       PQerrorMessage(streamConn))));
+   }
+   if (rawlen < -1)
+       ereport(ERROR,
+               (errmsg("could not read xlog records: %s",
+                       PQerrorMessage(streamConn))));
+
+   if (rawlen < sizeof(XLogRecPtr))
+       ereport(ERROR,
+               (errmsg("invalid WAL message received from primary")));
+
+   /* Return received WAL records to caller */
+   *recptr = *((XLogRecPtr *) recvBuf);
+   *buffer = recvBuf + sizeof(XLogRecPtr);
+   *len = rawlen - sizeof(XLogRecPtr);
+
+   return true;
+}
 
  * of the connection and a FATAL error are treated not as a crash but as
  * normal operation.
  *
- * Walreceiver is a postmaster child process like others, but it's compiled
- * as a dynamic module to avoid linking libpq with the main server binary.
+ * This file contains the server-facing parts of walreceiver. The libpq-
+ * specific parts are in the libpqwalreceiver module. It's loaded
+ * dynamically to avoid linking the server with libpq.
  *
  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/replication/walreceiver/walreceiver.c,v 1.2 2010/01/16 01:55:28 momjian Exp $
+ *   $PostgreSQL: pgsql/src/backend/replication/walreceiver.c,v 1.1 2010/01/20 09:16:24 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "postgres.h"
 
+#include <signal.h>
 #include <unistd.h>
-#include <sys/time.h>
 
 #include "access/xlog_internal.h"
-#include "libpq-fe.h"
 #include "libpq/pqsignal.h"
 #include "miscadmin.h"
 #include "replication/walreceiver.h"
 #include "utils/ps_status.h"
 #include "utils/resowner.h"
 
-#ifdef HAVE_POLL_H
-#include <poll.h>
-#endif
-#ifdef HAVE_SYS_POLL_H
-#include <sys/poll.h>
-#endif
-#ifdef HAVE_SYS_SELECT_H
-#include <sys/select.h>
-#endif
-
-PG_MODULE_MAGIC;
-
-PG_FUNCTION_INFO_V1(WalReceiverMain);
-Datum WalReceiverMain(PG_FUNCTION_ARGS);
-
-/* streamConn is a PGconn object of a connection to walsender from walreceiver */
-static PGconn *streamConn = NULL;
+/* libpqreceiver hooks to these when loaded */
+walrcv_connect_type walrcv_connect = NULL;
+walrcv_receive_type walrcv_receive = NULL;
+walrcv_disconnect_type walrcv_disconnect = NULL;
 
 #define NAPTIME_PER_CYCLE 100  /* max sleep time between cycles (100ms) */
 
 static uint32 recvSeg = 0;
 static uint32 recvOff = 0;
 
-/* Buffer for currently read records */
-static char *recvBuf = NULL;
-
-/* Flags set by interrupt handlers of walreceiver for later service in the main loop */
+/*
+ * Flags set by interrupt handlers of walreceiver for later service in the
+ * main loop.
+ */
 static volatile sig_atomic_t got_SIGHUP = false;
 static volatile sig_atomic_t got_SIGTERM = false;
 
 static void ProcessWalRcvInterrupts(void);
-static void EnableImmediateExit(void);
-static void DisableImmediateExit(void);
+static void EnableWalRcvImmediateExit(void);
+static void DisableWalRcvImmediateExit(void);
 
 /*
  * About SIGTERM handling:
 }
 
 static void
-EnableImmediateExit()
+EnableWalRcvImmediateExit()
 {
    WalRcvImmediateInterruptOK = true;
    ProcessWalRcvInterrupts();
 }
 
 static void
-DisableImmediateExit()
+DisableWalRcvImmediateExit()
 {
    WalRcvImmediateInterruptOK = false;
    ProcessWalRcvInterrupts();
 static void WalRcvQuickDieHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-static void WalRcvLoop(void);
 static void InitWalRcv(void);
-static void WalRcvConnect(void);
-static bool WalRcvWait(int timeout_ms);
 static void WalRcvKill(int code, Datum arg);
-static void XLogRecv(void);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
 static void XLogWalRcvFlush(void);
 
 } LogstreamResult;
 
 /* Main entry point for walreceiver process */
-Datum
-WalReceiverMain(PG_FUNCTION_ARGS)
+void
+WalReceiverMain(void)
 {
    sigjmp_buf  local_sigjmp_buf;
    MemoryContext walrcv_context;
+   char conninfo[MAXCONNINFO];
+   XLogRecPtr startpoint;
+   /* use volatile pointer to prevent code rearrangement */
+   volatile WalRcvData *walrcv = WalRcv;
+
+   /* Load the libpq-specific functions */
+   load_file("libpqwalreceiver", false);
+   if (walrcv_connect == NULL || walrcv_receive == NULL ||
+       walrcv_disconnect == NULL)
+       elog(ERROR, "libpqwalreceiver didn't initialize correctly");
 
    /* Mark walreceiver in progress */
    InitWalRcv();
        error_context_stack = NULL;
 
        /* Reset WalRcvImmediateInterruptOK */
-       DisableImmediateExit();
+       DisableWalRcvImmediateExit();
 
        /* Prevent interrupts while cleaning up */
        HOLD_INTERRUPTS();
        /* Report the error to the server log */
        EmitErrorReport();
 
-       /* Free the data structure related to a connection */
-       PQfinish(streamConn);
-       streamConn = NULL;
-       if (recvBuf != NULL)
-           PQfreemem(recvBuf);
-       recvBuf = NULL;
+       /* Disconnect any previous connection. */
+       EnableWalRcvImmediateExit();
+       walrcv_disconnect();
+       DisableWalRcvImmediateExit();
 
        /*
         * Now return to normal top-level context and clear ErrorContext for
    /* Unblock signals (they were blocked when the postmaster forked us) */
    PG_SETMASK(&UnBlockSig);
 
-   /* Establish the connection to the primary for XLOG streaming */
-   WalRcvConnect();
-
-   /* Main loop of walreceiver */
-   WalRcvLoop();
+   /* Fetch connection information from shared memory */
+   SpinLockAcquire(&walrcv->mutex);
+   strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
+   startpoint = walrcv->receivedUpto;
+   SpinLockRelease(&walrcv->mutex);
 
-   PG_RETURN_VOID(); /* WalRcvLoop() never returns, but keep compiler quiet */
-}
+   /* Establish the connection to the primary for XLOG streaming */
+   EnableWalRcvImmediateExit();
+   walrcv_connect(conninfo, startpoint);
+   DisableWalRcvImmediateExit();
 
-/* Main loop of walreceiver process */
-static void
-WalRcvLoop(void)
-{
    /* Loop until end-of-streaming or error */
    for (;;)
    {
+       XLogRecPtr recptr;
+       char   *buf;
+       int     len;
+
        /*
         * Emergency bailout if postmaster has died.  This is to avoid the
         * necessity for manual cleanup of all postmaster children.
        }
 
        /* Wait a while for data to arrive */
-       if (WalRcvWait(NAPTIME_PER_CYCLE))
+       if (walrcv_receive(NAPTIME_PER_CYCLE, &recptr, &buf, &len))
        {
-           /* data has arrived. Process it */
-           if (PQconsumeInput(streamConn) == 0)
-               ereport(ERROR,
-                       (errmsg("could not read xlog records: %s",
-                               PQerrorMessage(streamConn))));
-           XLogRecv();
+           /* Write received WAL records to disk */
+           XLogWalRcvWrite(buf, len, recptr);
+
+           /* Receive any more WAL records we can without sleeping */
+           while(walrcv_receive(0, &recptr, &buf, &len))
+               XLogWalRcvWrite(buf, len, recptr);
+
+           /*
+            * Now that we've written some records, flush them to disk and
+            * let the startup process know about them.
+            */
+           XLogWalRcvFlush();
        }
    }
 }
    on_shmem_exit(WalRcvKill, 0);
 }
 
-/*
- * Establish the connection to the primary server for XLOG streaming
- */
-static void
-WalRcvConnect(void)
-{
-   char        conninfo[MAXCONNINFO + 14];
-   char       *primary_sysid;
-   char        standby_sysid[32];
-   TimeLineID  primary_tli;
-   TimeLineID  standby_tli;
-   PGresult   *res;
-   XLogRecPtr  recptr;
-   char        cmd[64];
-   /* use volatile pointer to prevent code rearrangement */
-   volatile WalRcvData *walrcv = WalRcv;
-
-   /*
-    * Set up a connection for XLOG streaming
-    */
-   SpinLockAcquire(&walrcv->mutex);
-   snprintf(conninfo, sizeof(conninfo), "%s replication=true", walrcv->conninfo);
-   recptr = walrcv->receivedUpto;
-   SpinLockRelease(&walrcv->mutex);
-
-   /* initialize local XLOG pointers */
-   LogstreamResult.Write = LogstreamResult.Flush = recptr;
-
-   Assert(recptr.xlogid != 0 || recptr.xrecoff != 0);
-
-   EnableImmediateExit();
-   streamConn = PQconnectdb(conninfo);
-   DisableImmediateExit();
-   if (PQstatus(streamConn) != CONNECTION_OK)
-       ereport(ERROR,
-               (errmsg("could not connect to the primary server : %s",
-                       PQerrorMessage(streamConn))));
-
-   /*
-    * Get the system identifier and timeline ID as a DataRow message
-    * from the primary server.
-    */
-   EnableImmediateExit();
-   res = PQexec(streamConn, "IDENTIFY_SYSTEM");
-   DisableImmediateExit();
-   if (PQresultStatus(res) != PGRES_TUPLES_OK)
-    {
-       PQclear(res);
-       ereport(ERROR,
-               (errmsg("could not receive the SYSID and timeline ID from "
-                       "the primary server: %s",
-                       PQerrorMessage(streamConn))));
-    }
-   if (PQnfields(res) != 2 || PQntuples(res) != 1)
-   {
-       int ntuples = PQntuples(res);
-       int nfields = PQnfields(res);
-       PQclear(res);
-       ereport(ERROR,
-               (errmsg("invalid response from primary server"),
-                errdetail("expected 1 tuple with 2 fields, got %d tuples with %d fields",
-                          ntuples, nfields)));
-   }
-   primary_sysid = PQgetvalue(res, 0, 0);
-   primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
-
-   /*
-    * Confirm that the system identifier of the primary is the same
-    * as ours.
-    */
-   snprintf(standby_sysid, sizeof(standby_sysid), UINT64_FORMAT,
-            GetSystemIdentifier());
-   if (strcmp(primary_sysid, standby_sysid) != 0)
-   {
-       PQclear(res);
-       ereport(ERROR,
-               (errmsg("system differs between the primary and standby"),
-                errdetail("the primary SYSID is %s, standby SYSID is %s",
-                          primary_sysid, standby_sysid)));
-   }
-
-   /*
-    * Confirm that the current timeline of the primary is the same
-    * as the recovery target timeline.
-    */
-   standby_tli = GetRecoveryTargetTLI();
-   PQclear(res);
-   if (primary_tli != standby_tli)
-       ereport(ERROR,
-               (errmsg("timeline %u of the primary does not match recovery target timeline %u",
-                       primary_tli, standby_tli)));
-   ThisTimeLineID = primary_tli;
-
-   /* Start streaming from the point requested by startup process */
-   snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X", recptr.xlogid, recptr.xrecoff);
-   EnableImmediateExit();
-   res = PQexec(streamConn, cmd);
-   DisableImmediateExit();
-   if (PQresultStatus(res) != PGRES_COPY_OUT)
-       ereport(ERROR,
-               (errmsg("could not start XLOG streaming: %s",
-                       PQerrorMessage(streamConn))));
-   PQclear(res);
-
-   /*
-    * Process the outstanding messages before beginning to wait for
-    * new message to arrive.
-    */
-   XLogRecv();
-}
-
-/*
- * Wait until we can read WAL stream, or timeout.
- *
- * Returns true if data has become available for reading, false if timed out
- * or interrupted by signal.
- *
- * This is based on pqSocketCheck.
- */
-static bool
-WalRcvWait(int timeout_ms)
-{
-   int ret;
-
-   Assert(streamConn != NULL);
-   if (PQsocket(streamConn) < 0)
-       ereport(ERROR,
-               (errcode_for_socket_access(),
-                errmsg("socket not open")));
-
-   /* We use poll(2) if available, otherwise select(2) */
-   {
-#ifdef HAVE_POLL
-       struct pollfd input_fd;
-
-       input_fd.fd = PQsocket(streamConn);
-       input_fd.events = POLLIN | POLLERR;
-       input_fd.revents = 0;
-
-       ret = poll(&input_fd, 1, timeout_ms);
-#else                          /* !HAVE_POLL */
-
-       fd_set      input_mask;
-       struct timeval timeout;
-       struct timeval *ptr_timeout;
-
-       FD_ZERO(&input_mask);
-       FD_SET(PQsocket(streamConn), &input_mask);
-
-       if (timeout_ms < 0)
-           ptr_timeout = NULL;
-       else
-       {
-           timeout.tv_sec  = timeout_ms / 1000;
-           timeout.tv_usec = (timeout_ms % 1000) * 1000;
-           ptr_timeout     = &timeout;
-       }
-
-       ret = select(PQsocket(streamConn) + 1, &input_mask,
-                    NULL, NULL, ptr_timeout);
-#endif   /* HAVE_POLL */
-   }
-
-   if (ret == 0 || (ret < 0 && errno == EINTR))
-       return false;
-   if (ret < 0)
-       ereport(ERROR,
-               (errcode_for_socket_access(),
-                errmsg("select() failed: %m")));
-   return true;
-}
-
 /*
  * Clear our pid from shared memory at exit.
  */
    walrcv->pid = 0;
    SpinLockRelease(&walrcv->mutex);
 
-   PQfinish(streamConn);
+   walrcv_disconnect();
 
    /* If requested to stop, tell postmaster to not restart us. */
    if (stopped)
    exit(2);
 }
 
-/*
- * Receive any WAL records available without blocking from XLOG stream and
- * write it to the disk.
- */
-static void
-XLogRecv(void)
-{
-   XLogRecPtr *recptr;
-   int         len;
-
-   for (;;)
-   {
-       /* Receive CopyData message */
-       len = PQgetCopyData(streamConn, &recvBuf, 1);
-       if (len == 0)   /* no records available yet, then return */
-           break;
-       if (len == -1)  /* end-of-streaming or error */
-       {
-           PGresult    *res;
-
-           res = PQgetResult(streamConn);
-           if (PQresultStatus(res) == PGRES_COMMAND_OK)
-           {
-               PQclear(res);
-               ereport(ERROR,
-                       (errmsg("replication terminated by primary server")));
-           }
-           PQclear(res);
-           ereport(ERROR,
-                   (errmsg("could not read xlog records: %s",
-                           PQerrorMessage(streamConn))));
-       }
-       if (len < -1)
-           ereport(ERROR,
-                   (errmsg("could not read xlog records: %s",
-                           PQerrorMessage(streamConn))));
-
-       if (len < sizeof(XLogRecPtr))
-           ereport(ERROR,
-                   (errmsg("invalid WAL message received from primary")));
-
-       /* Write received WAL records to disk */
-       recptr = (XLogRecPtr *) recvBuf;
-       XLogWalRcvWrite(recvBuf + sizeof(XLogRecPtr),
-                       len - sizeof(XLogRecPtr), *recptr);
-
-       if (recvBuf != NULL)
-           PQfreemem(recvBuf);
-       recvBuf = NULL;
-   }
-
-   /*
-    * Now that we've written some records, flush them to disk and let the
-    * startup process know about them.
-    */
-   XLogWalRcvFlush();
-}
-
 /*
  * Write XLOG data to disk.
  */
 
  *
  * This file contains functions used by the startup process to communicate
  * with the walreceiver process. Functions implementing walreceiver itself
- * are in src/backend/replication/walreceiver subdirectory.
+ * are in walreceiver.c.
  *
  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.1 2010/01/15 09:19:03 heikki Exp $
+ *   $PostgreSQL: pgsql/src/backend/replication/walreceiverfuncs.c,v 1.2 2010/01/20 09:16:24 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
 
  *
  * Portions Copyright (c) 2010-2010, PostgreSQL Global Development Group
  *
- * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.2 2010/01/16 00:04:41 tgl Exp $
+ * $PostgreSQL: pgsql/src/include/replication/walreceiver.h,v 1.3 2010/01/20 09:16:24 heikki Exp $
  *
  *-------------------------------------------------------------------------
  */
 #ifndef _WALRECEIVER_H
 #define _WALRECEIVER_H
 
+#include "access/xlogdefs.h"
 #include "storage/spin.h"
 
 /*
 
 extern PGDLLIMPORT WalRcvData *WalRcv;
 
+/* libpqwalreceiver hooks */
+typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
+extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
+
+typedef bool (*walrcv_receive_type) (int timeout, XLogRecPtr *recptr, char **buffer, int *len);
+extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
+
+typedef void (*walrcv_disconnect_type) (void);
+extern PGDLLIMPORT walrcv_disconnect_type walrcv_disconnect;
+
+extern void WalReceiverMain(void);
 extern Size WalRcvShmemSize(void);
 extern void WalRcvShmemInit(void);
 extern bool WalRcvInProgress(void);