Allow a streaming replication standby to follow a timeline switch.
authorHeikki Linnakangas <[email protected]>
Thu, 13 Dec 2012 17:00:00 +0000 (19:00 +0200)
committerHeikki Linnakangas <[email protected]>
Thu, 13 Dec 2012 17:17:32 +0000 (19:17 +0200)
Before this patch, streaming replication would refuse to start replicating
if the timeline in the primary doesn't exactly match the standby. The
situation where it doesn't match is when you have a master, and two
standbys, and you promote one of the standbys to become new master.
Promoting bumps up the timeline ID, and after that bump, the other standby
would refuse to continue.

There's significantly more timeline related logic in streaming replication
now. First of all, when a standby connects to primary, it will ask the
primary for any timeline history files that are missing from the standby.
The missing files are sent using a new replication command TIMELINE_HISTORY,
and stored in standby's pg_xlog directory. Using the timeline history files,
the standby can follow the latest timeline present in the primary
(recovery_target_timeline='latest'), just as it can follow new timelines
appearing in an archive directory.

START_REPLICATION now takes a TIMELINE parameter, to specify exactly which
timeline to stream WAL from. This allows the standby to request the primary
to send over WAL that precedes the promotion. The replication protocol is
changed slightly (in a backwards-compatible way although there's little hope
of streaming replication working across major versions anyway), to allow
replication to stop when the end of timeline reached, putting the walsender
back into accepting a replication command.

Many thanks to Amit Kapila for testing and reviewing various versions of
this patch.

23 files changed:
doc/src/sgml/high-availability.sgml
doc/src/sgml/protocol.sgml
src/backend/access/transam/timeline.c
src/backend/access/transam/xlog.c
src/backend/access/transam/xlogfuncs.c
src/backend/postmaster/postmaster.c
src/backend/postmaster/startup.c
src/backend/replication/basebackup.c
src/backend/replication/libpqwalreceiver/libpqwalreceiver.c
src/backend/replication/repl_gram.y
src/backend/replication/repl_scanner.l
src/backend/replication/walreceiver.c
src/backend/replication/walreceiverfuncs.c
src/backend/replication/walsender.c
src/include/access/timeline.h
src/include/access/xlog.h
src/include/nodes/nodes.h
src/include/nodes/replnodes.h
src/include/replication/walreceiver.h
src/include/replication/walsender.h
src/include/replication/walsender_private.h
src/interfaces/libpq/fe-exec.c
src/interfaces/libpq/fe-protocol3.c

index 62f72b40ae0e4bf906a7052af756075a27f424f5..e8342858c9d11bce470107591e2cb48267021cac 100644 (file)
@@ -912,10 +912,9 @@ primary_conninfo = 'host=192.168.1.50 port=5432 user=foo password=foopass'
    </para>
 
    <para>
-    Promoting a cascading standby terminates the immediate downstream replication
-    connections which it serves. This is because the timeline becomes different
-    between standbys, and they can no longer continue replication.  The
-    affected standby(s) may reconnect to reestablish streaming replication.
+    If an upstream standby server is promoted to become new master, downstream
+    servers will continue to stream from the new master if
+    <varname>recovery_target_timeline</> is set to <literal>'latest'</>.
    </para>
 
    <para>
index f87020c9099b92e697099a2db2d26aeab38fa92c..e14627c201ebc5ac63271afd3d79ddf900a60451 100644 (file)
    </para>
 
    <para>
-    There is another Copy-related mode called Copy-both, which allows
+    There is another Copy-related mode called copy-both, which allows
     high-speed bulk data transfer to <emphasis>and</> from the server.
     Copy-both mode is initiated when a backend in walsender mode
     executes a <command>START_REPLICATION</command> statement.  The
     backend sends a CopyBothResponse message to the frontend.  Both
     the backend and the frontend may then send CopyData messages
-    until the connection is terminated.  See <xref
-    linkend="protocol-replication">.
+    until either end sends a CopyDone message. After the client
+    sends a CopyDone message, the connection goes from copy-both mode to
+    copy-out mode, and the client may not send any more CopyData messages.
+    Similarly, when the server sends a CopyDone message, the connection
+    goes into copy-in mode, and the server may not send any more CopyData
+    messages. After both sides have sent a CopyDone message, the copy mode
+    is terminated, and the backend reverts to the command-processing mode.
+    See <xref linkend="protocol-replication"> for more information on the
+    subprotocol transmitted over copy-both mode.
    </para>
 
    <para>
@@ -1350,19 +1357,69 @@ The commands accepted in walsender mode are:
   </varlistentry>
 
   <varlistentry>
-    <term>START_REPLICATION <replaceable>XXX</>/<replaceable>XXX</></term>
+    <term>TIMELINE_HISTORY <replaceable class="parameter">tli</replaceable></term>
+    <listitem>
+     <para>
+      Requests the server to send over the timeline history file for timeline
+      <replaceable class="parameter">tli</replaceable>.  Server replies with a
+      result set of a single row, containing two fields:
+     </para>
+
+     <para>
+      <variablelist>
+      <varlistentry>
+      <term>
+       filename
+      </term>
+      <listitem>
+      <para>
+       Filename of the timeline history file, e.g 00000002.history.
+      </para>
+      </listitem>
+      </varlistentry>
+
+      <varlistentry>
+      <term>
+       content
+      </term>
+      <listitem>
+      <para>
+       Contents of the timeline history file.
+      </para>
+      </listitem>
+      </varlistentry>
+
+      </variablelist>
+     </para>
+    </listitem>
+  </varlistentry>
+
+  <varlistentry>
+    <term>START_REPLICATION <replaceable class="parameter">XXX/XXX</> TIMELINE <replaceable class="parameter">tli</></term>
     <listitem>
      <para>
       Instructs server to start streaming WAL, starting at
-      WAL position <replaceable>XXX</>/<replaceable>XXX</>.
+      WAL position <replaceable class="parameter">XXX/XXX</> on timeline
+      <replaceable class="parameter">tli</>.
       The server can reply with an error, e.g. if the requested section of WAL
       has already been recycled. On success, server responds with a
       CopyBothResponse message, and then starts to stream WAL to the frontend.
-      WAL will continue to be streamed until the connection is broken;
-      no further commands will be accepted. If the WAL sender process is
-      terminated normally (during postmaster shutdown), it will send a
-      CommandComplete message before exiting. This might not happen during an
-      abnormal shutdown, of course.
+     </para>
+
+     <para>
+      If the client requests a timeline that's not the latest, but is part of
+      the history of the server, the server will stream all the WAL on that
+      timeline starting from the requested startpoint, up to the point where
+      the server switched to another timeline. If the client requests
+      streaming at exactly the end of an old timeline, the server responds
+      immediately with CommandComplete without entering COPY mode.
+     </para>
+
+     <para>
+      After streaming all the WAL on a timeline that is not the latest one,
+      the server will end streaming by exiting the COPY mode. When the client
+      acknowledges this by also exiting COPY mode, the server responds with a
+      CommandComplete message, and is ready to accept a new command.
      </para>
 
      <para>
index 0681944ae5e1905daba9adaee486fce6367517c0..b33d230c7011857b03129cefddd4e2443451cbde 100644 (file)
@@ -410,6 +410,89 @@ writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
        XLogArchiveNotify(histfname);
 }
 
+/*
+ * Writes a history file for given timeline and contents.
+ *
+ * Currently this is only used in the walreceiver process, and so there are
+ * no locking considerations.  But we should be just as tense as XLogFileInit
+ * to avoid emplacing a bogus file.
+ */
+void
+writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
+{
+       char            path[MAXPGPATH];
+       char            tmppath[MAXPGPATH];
+       int                     fd;
+
+       /*
+        * Write into a temp file name.
+        */
+       snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
+
+       unlink(tmppath);
+
+       /* do not use get_sync_bit() here --- want to fsync only at end of fill */
+       fd = OpenTransientFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
+                                                  S_IRUSR | S_IWUSR);
+       if (fd < 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not create file \"%s\": %m", tmppath)));
+
+       errno = 0;
+       if ((int) write(fd, content, size) != size)
+       {
+               int                     save_errno = errno;
+
+               /*
+                * If we fail to make the file, delete it to release disk space
+                */
+               unlink(tmppath);
+               /* if write didn't set errno, assume problem is no disk space */
+               errno = save_errno ? save_errno : ENOSPC;
+
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not write to file \"%s\": %m", tmppath)));
+       }
+
+       if (pg_fsync(fd) != 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not fsync file \"%s\": %m", tmppath)));
+
+       if (CloseTransientFile(fd))
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not close file \"%s\": %m", tmppath)));
+
+
+       /*
+        * Now move the completed history file into place with its final name.
+        */
+       TLHistoryFilePath(path, tli);
+
+       /*
+        * Prefer link() to rename() here just to be really sure that we don't
+        * overwrite an existing logfile.  However, there shouldn't be one, so
+        * rename() is an acceptable substitute except for the truly paranoid.
+        */
+#if HAVE_WORKING_LINK
+       if (link(tmppath, path) < 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not link file \"%s\" to \"%s\": %m",
+                                               tmppath, path)));
+       unlink(tmppath);
+#else
+       if (rename(tmppath, path) < 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not rename file \"%s\" to \"%s\": %m",
+                                               tmppath, path)));
+#endif
+}
+
 /*
  * Returns true if 'expectedTLEs' contains a timeline with id 'tli'
  */
index 2d4b62aa84055ec0df2c3c38fd10ef182ba87d4e..2deb7e5d89bf1177b2d32c16ff6550aae6928778 100644 (file)
@@ -153,6 +153,7 @@ static XLogRecPtr LastRec;
 
 /* Local copy of WalRcv->receivedUpto */
 static XLogRecPtr receivedUpto = 0;
+static TimeLineID receiveTLI = 0;
 
 /*
  * During recovery, lastFullPageWrites keeps track of full_page_writes that
@@ -6366,6 +6367,12 @@ StartupXLOG(void)
                xlogctl->SharedRecoveryInProgress = false;
                SpinLockRelease(&xlogctl->info_lck);
        }
+
+       /*
+        * If there were cascading standby servers connected to us, nudge any
+        * wal sender processes to notice that we've been promoted.
+        */
+       WalSndWakeup();
 }
 
 /*
@@ -7626,7 +7633,7 @@ CreateRestartPoint(int flags)
                XLogRecPtr      endptr;
 
                /* Get the current (or recent) end of xlog */
-               endptr = GetStandbyFlushRecPtr(NULL);
+               endptr = GetStandbyFlushRecPtr();
 
                KeepLogSeg(endptr, &_logSegNo);
                _logSegNo--;
@@ -9087,13 +9094,10 @@ do_pg_abort_backup(void)
 /*
  * Get latest redo apply position.
  *
- * Optionally, returns the current recovery target timeline. Callers not
- * interested in that may pass NULL for targetTLI.
- *
  * Exported to allow WALReceiver to read the pointer directly.
  */
 XLogRecPtr
-GetXLogReplayRecPtr(TimeLineID *targetTLI)
+GetXLogReplayRecPtr(void)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile XLogCtlData *xlogctl = XLogCtl;
@@ -9101,8 +9105,6 @@ GetXLogReplayRecPtr(TimeLineID *targetTLI)
 
        SpinLockAcquire(&xlogctl->info_lck);
        recptr = xlogctl->lastReplayedEndRecPtr;
-       if (targetTLI)
-               *targetTLI = xlogctl->RecoveryTargetTLI;
        SpinLockRelease(&xlogctl->info_lck);
 
        return recptr;
@@ -9111,18 +9113,15 @@ GetXLogReplayRecPtr(TimeLineID *targetTLI)
 /*
  * Get current standby flush position, ie, the last WAL position
  * known to be fsync'd to disk in standby.
- *
- * If 'targetTLI' is not NULL, it's set to the current recovery target
- * timeline.
  */
 XLogRecPtr
-GetStandbyFlushRecPtr(TimeLineID *targetTLI)
+GetStandbyFlushRecPtr(void)
 {
        XLogRecPtr      receivePtr;
        XLogRecPtr      replayPtr;
 
-       receivePtr = GetWalRcvWriteRecPtr(NULL);
-       replayPtr = GetXLogReplayRecPtr(targetTLI);
+       receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
+       replayPtr = GetXLogReplayRecPtr();
 
        if (XLByteLT(receivePtr, replayPtr))
                return replayPtr;
@@ -9611,7 +9610,10 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                         * archive and pg_xlog before failover.
                                         */
                                        if (CheckForStandbyTrigger())
+                                       {
+                                               ShutdownWalRcv();
                                                return false;
+                                       }
 
                                        /*
                                         * If primary_conninfo is set, launch walreceiver to try to
@@ -9626,8 +9628,14 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                        if (PrimaryConnInfo)
                                        {
                                                XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
-
-                                               RequestXLogStreaming(ptr, PrimaryConnInfo);
+                                               TimeLineID tli = tliOfPointInHistory(ptr, expectedTLEs);
+
+                                               if (curFileTLI > 0 && tli < curFileTLI)
+                                                       elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",
+                                                                (uint32) (ptr >> 32), (uint32) ptr,
+                                                                tli, curFileTLI);
+                                               curFileTLI = tli;
+                                               RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo);
                                        }
                                        /*
                                         * Move to XLOG_FROM_STREAM state in either case. We'll get
@@ -9653,10 +9661,10 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                         */
                                        /*
                                         * Before we leave XLOG_FROM_STREAM state, make sure that
-                                        * walreceiver is not running, so that it won't overwrite
-                                        * any WAL that we restore from archive.
+                                        * walreceiver is not active, so that it won't overwrite
+                                        * WAL that we restore from archive.
                                         */
-                                       if (WalRcvInProgress())
+                                       if (WalRcvStreaming())
                                                ShutdownWalRcv();
 
                                        /*
@@ -9749,7 +9757,7 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                /*
                                 * Check if WAL receiver is still active.
                                 */
-                               if (!WalRcvInProgress())
+                               if (!WalRcvStreaming())
                                {
                                        lastSourceFailed = true;
                                        break;
@@ -9772,8 +9780,8 @@ WaitForWALToBecomeAvailable(XLogRecPtr RecPtr, bool randAccess,
                                {
                                        XLogRecPtr      latestChunkStart;
 
-                                       receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
-                                       if (XLByteLT(RecPtr, receivedUpto))
+                                       receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
+                                       if (XLByteLT(RecPtr, receivedUpto) && receiveTLI == curFileTLI)
                                        {
                                                havedata = true;
                                                if (!XLByteLT(RecPtr, latestChunkStart))
@@ -9888,8 +9896,7 @@ emode_for_corrupt_record(int emode, XLogRecPtr RecPtr)
 
 /*
  * Check to see whether the user-specified trigger file exists and whether a
- * promote request has arrived.  If either condition holds, request postmaster
- * to shut down walreceiver, wait for it to exit, and return true.
+ * promote request has arrived.  If either condition holds, return true.
  */
 static bool
 CheckForStandbyTrigger(void)
@@ -9904,7 +9911,6 @@ CheckForStandbyTrigger(void)
        {
                ereport(LOG,
                                (errmsg("received promote request")));
-               ShutdownWalRcv();
                ResetPromoteTriggered();
                triggered = true;
                return true;
@@ -9917,7 +9923,6 @@ CheckForStandbyTrigger(void)
        {
                ereport(LOG,
                                (errmsg("trigger file found: %s", TriggerFile)));
-               ShutdownWalRcv();
                unlink(TriggerFile);
                triggered = true;
                return true;
index 40c0bd67b57a1b11fa7d47e5224a41f3a75abef9..e91bdc3f4af93b6b56367a0d2010651f49d95c4c 100644 (file)
@@ -226,7 +226,7 @@ pg_last_xlog_receive_location(PG_FUNCTION_ARGS)
        XLogRecPtr      recptr;
        char            location[MAXFNAMELEN];
 
-       recptr = GetWalRcvWriteRecPtr(NULL);
+       recptr = GetWalRcvWriteRecPtr(NULL, NULL);
 
        if (recptr == 0)
                PG_RETURN_NULL();
@@ -248,7 +248,7 @@ pg_last_xlog_replay_location(PG_FUNCTION_ARGS)
        XLogRecPtr      recptr;
        char            location[MAXFNAMELEN];
 
-       recptr = GetXLogReplayRecPtr(NULL);
+       recptr = GetXLogReplayRecPtr();
 
        if (recptr == 0)
                PG_RETURN_NULL();
index a492c60b46a925a3c2cd980689623541fe3c1e8c..8f39aec1808e7c8b59feea4a503c3f54fa0deec3 100644 (file)
@@ -2563,27 +2563,6 @@ reaper(SIGNAL_ARGS)
                        ReachedNormalRunning = true;
                        pmState = PM_RUN;
 
-                       /*
-                        * Kill any walsenders to force the downstream standby(s) to
-                        * reread the timeline history file, adjust their timelines and
-                        * establish replication connections again. This is required
-                        * because the timeline of cascading standby is not consistent
-                        * with that of cascaded one just after failover. We LOG this
-                        * message since we need to leave a record to explain this
-                        * disconnection.
-                        *
-                        * XXX should avoid the need for disconnection. When we do,
-                        * am_cascading_walsender should be replaced with
-                        * RecoveryInProgress()
-                        */
-                       if (max_wal_senders > 0 && CountChildren(BACKEND_TYPE_WALSND) > 0)
-                       {
-                               ereport(LOG,
-                                               (errmsg("terminating all walsender processes to force cascaded "
-                                                       "standby(s) to update timeline and reconnect")));
-                               SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND);
-                       }
-
                        /*
                         * Crank up the background tasks, if we didn't do that already
                         * when we entered consistent recovery state.  It doesn't matter
index ab4d1645f246aa9c56e14bf0374e69dfb4a950c3..70b75c7292730c3f559d87c6042a680cd601b518 100644 (file)
@@ -5,6 +5,8 @@
  * The Startup process initialises the server and performs any recovery
  * actions that have been specified. Notice that there is no "main loop"
  * since the Startup process ends as soon as initialisation is complete.
+ * (in standby mode, one can think of the replay loop as a main loop,
+ * though.)
  *
  *
  * Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
index 04681f4196299ce74739cc4cccf5ca305cffae97..65200c129aafb62ca3926136fe9befba65f64fd7 100644 (file)
@@ -56,6 +56,9 @@ static void perform_base_backup(basebackup_options *opt, DIR *tblspcdir);
 static void parse_basebackup_options(List *options, basebackup_options *opt);
 static void SendXlogRecPtrResult(XLogRecPtr ptr);
 
+/* Was the backup currently in-progress initiated in recovery mode? */
+static bool backup_started_in_recovery = false;
+
 /*
  * Size of each block sent into the tar stream for larger files.
  *
@@ -94,6 +97,8 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
        XLogRecPtr      endptr;
        char       *labelfile;
 
+       backup_started_in_recovery = RecoveryInProgress();
+
        startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
        SendXlogRecPtrResult(startptr);
 
@@ -261,7 +266,7 @@ perform_base_backup(basebackup_options *opt, DIR *tblspcdir)
                                 * http://lists.apple.com/archives/xcode-users/2003/Dec//msg000
                                 * 51.html
                                 */
-                               XLogRead(buf, ptr, TAR_SEND_SIZE);
+                               XLogRead(buf, ThisTimeLineID, ptr, TAR_SEND_SIZE);
                                if (pq_putmessage('d', buf, TAR_SEND_SIZE))
                                        ereport(ERROR,
                                                        (errmsg("base backup could not send data, aborting backup")));
@@ -592,11 +597,19 @@ sendDir(char *path, int basepathlen, bool sizeonly)
                /*
                 * Check if the postmaster has signaled us to exit, and abort with an
                 * error in that case. The error handler further up will call
-                * do_pg_abort_backup() for us.
+                * do_pg_abort_backup() for us. Also check that if the backup was
+                * started while still in recovery, the server wasn't promoted.
+                * dp_pg_stop_backup() will check that too, but it's better to stop
+                * the backup early than continue to the end and fail there.
                 */
-               if (ProcDiePending || walsender_ready_to_stop)
+               CHECK_FOR_INTERRUPTS();
+               if (RecoveryInProgress() != backup_started_in_recovery)
                        ereport(ERROR,
-                               (errmsg("shutdown requested, aborting active base backup")));
+                                       (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                        errmsg("the standby was promoted during online backup"),
+                                        errhint("This means that the backup being taken is corrupt "
+                                                        "and should not be used. "
+                                                        "Try taking another online backup.")));
 
                snprintf(pathbuf, MAXPGPATH, "%s/%s", path, de->d_name);
 
index bfaebeae84286908ff2dd0df240943ec70e084d7..180d96b6a694222df1c2b86af7bdf4f89e90e3ca 100644 (file)
@@ -46,9 +46,12 @@ static PGconn *streamConn = NULL;
 static char *recvBuf = NULL;
 
 /* Prototypes for interface functions */
-static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
-static bool libpqrcv_receive(int timeout, unsigned char *type,
-                                char **buffer, int *len);
+static void libpqrcv_connect(char *conninfo);
+static void libpqrcv_identify_system(TimeLineID *primary_tli);
+static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
+static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
+static void libpqrcv_endstreaming(void);
+static int libpqrcv_receive(int timeout, char **buffer);
 static void libpqrcv_send(const char *buffer, int nbytes);
 static void libpqrcv_disconnect(void);
 
@@ -63,10 +66,17 @@ void
 _PG_init(void)
 {
        /* Tell walreceiver how to reach us */
-       if (walrcv_connect != NULL || walrcv_receive != NULL ||
-               walrcv_send != NULL || walrcv_disconnect != NULL)
+       if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
+               walrcv_readtimelinehistoryfile != NULL ||
+               walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
+               walrcv_receive != NULL || walrcv_send != NULL ||
+               walrcv_disconnect != NULL)
                elog(ERROR, "libpqwalreceiver already loaded");
        walrcv_connect = libpqrcv_connect;
+       walrcv_identify_system = libpqrcv_identify_system;
+       walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
+       walrcv_startstreaming = libpqrcv_startstreaming;
+       walrcv_endstreaming = libpqrcv_endstreaming;
        walrcv_receive = libpqrcv_receive;
        walrcv_send = libpqrcv_send;
        walrcv_disconnect = libpqrcv_disconnect;
@@ -75,16 +85,10 @@ _PG_init(void)
 /*
  * Establish the connection to the primary server for XLOG streaming
  */
-static bool
-libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+static void
+libpqrcv_connect(char *conninfo)
 {
        char            conninfo_repl[MAXCONNINFO + 75];
-       char       *primary_sysid;
-       char            standby_sysid[32];
-       TimeLineID      primary_tli;
-       TimeLineID      standby_tli;
-       PGresult   *res;
-       char            cmd[64];
 
        /*
         * Connect using deliberately undocumented parameter: replication. The
@@ -100,6 +104,18 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
                ereport(ERROR,
                                (errmsg("could not connect to the primary server: %s",
                                                PQerrorMessage(streamConn))));
+}
+
+/*
+ * Check that primary's system identifier matches ours, and fetch the current
+ * timeline ID of the primary.
+ */
+static void
+libpqrcv_identify_system(TimeLineID *primary_tli)
+{
+       PGresult   *res;
+       char       *primary_sysid;
+       char            standby_sysid[32];
 
        /*
         * Get the system identifier and timeline ID as a DataRow message from the
@@ -126,7 +142,7 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
                                                   ntuples, nfields)));
        }
        primary_sysid = PQgetvalue(res, 0, 0);
-       primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
+       *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
 
        /*
         * Confirm that the system identifier of the primary is the same as ours.
@@ -141,24 +157,37 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
                                 errdetail("The primary's identifier is %s, the standby's identifier is %s.",
                                                   primary_sysid, standby_sysid)));
        }
-
-       /*
-        * Confirm that the current timeline of the primary is the same as the
-        * recovery target timeline.
-        */
-       standby_tli = GetRecoveryTargetTLI();
        PQclear(res);
-       if (primary_tli != standby_tli)
-               ereport(ERROR,
-                               (errmsg("timeline %u of the primary does not match recovery target timeline %u",
-                                               primary_tli, standby_tli)));
-       ThisTimeLineID = primary_tli;
+}
+
+/*
+ * Start streaming WAL data from given startpoint and timeline.
+ *
+ * Returns true if we switched successfully to copy-both mode. False
+ * means the server received the command and executed it successfully, but
+ * didn't switch to copy-mode.  That means that there was no WAL on the
+ * requested timeline and starting point, because the server switched to
+ * another timeline at or before the requested starting point. On failure,
+ * throws an ERROR.
+ */
+static bool
+libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
+{
+       char            cmd[64];
+       PGresult   *res;
 
        /* Start streaming from the point requested by startup process */
-       snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
-                        (uint32) (startpoint >> 32), (uint32) startpoint);
+       snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u",
+                        (uint32) (startpoint >> 32), (uint32) startpoint,
+                        tli);
        res = libpqrcv_PQexec(cmd);
-       if (PQresultStatus(res) != PGRES_COPY_BOTH)
+
+       if (PQresultStatus(res) == PGRES_COMMAND_OK)
+       {
+               PQclear(res);
+               return false;
+       }
+       else if (PQresultStatus(res) != PGRES_COPY_BOTH)
        {
                PQclear(res);
                ereport(ERROR,
@@ -166,11 +195,81 @@ libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
                                                PQerrorMessage(streamConn))));
        }
        PQclear(res);
+       return true;
+}
+
+/*
+ * Stop streaming WAL data.
+ */
+static void
+libpqrcv_endstreaming(void)
+{
+       PGresult   *res;
+
+       if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
+               ereport(ERROR,
+                               (errmsg("could not send end-of-streaming message to primary: %s",
+                                               PQerrorMessage(streamConn))));
 
-       ereport(LOG,
-               (errmsg("streaming replication successfully connected to primary")));
+       /* Read the command result after COPY is finished */
 
-       return true;
+       while ((res = PQgetResult(streamConn)) != NULL)
+       {
+               if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       ereport(ERROR,
+                                       (errmsg("error reading result of streaming command: %s",
+                                                       PQerrorMessage(streamConn))));
+               /*
+                * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
+                * is also possible. However, at the moment this function is only
+                * called after receiving CopyDone from the backend - the walreceiver
+                * never terminates replication on its own initiative.
+                */
+
+               PQclear(res);
+       }
+}
+
+/*
+ * Fetch the timeline history file for 'tli' from primary.
+ */
+static void
+libpqrcv_readtimelinehistoryfile(TimeLineID tli,
+                                                                char **filename, char **content, int *len)
+{
+       PGresult   *res;
+       char            cmd[64];
+
+       /*
+        * Request the primary to send over the history file for given timeline.
+        */
+       snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
+       res = libpqrcv_PQexec(cmd);
+       if (PQresultStatus(res) != PGRES_TUPLES_OK)
+       {
+               PQclear(res);
+               ereport(ERROR,
+                               (errmsg("could not receive timeline history file from "
+                                               "the primary server: %s",
+                                               PQerrorMessage(streamConn))));
+       }
+       if (PQnfields(res) != 2 || PQntuples(res) != 1)
+       {
+               int                     ntuples = PQntuples(res);
+               int                     nfields = PQnfields(res);
+
+               PQclear(res);
+               ereport(ERROR,
+                               (errmsg("invalid response from primary server"),
+                                errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
+                                                  ntuples, nfields)));
+       }
+       *filename = pstrdup(PQgetvalue(res, 0, 0));
+
+       *len = PQgetlength(res, 0, 1);
+       *content = palloc(*len);
+       memcpy(*content, PQgetvalue(res, 0, 1), *len);
+       PQclear(res);
 }
 
 /*
@@ -327,20 +426,19 @@ libpqrcv_disconnect(void)
  *
  * Returns:
  *
- *      True if data was received. *type, *buffer and *len are set to
- *      the type of the received data, buffer holding it, and length,
- *      respectively.
+ *      If data was received, returns the length of the data. *buffer is set to
+ *      point to a buffer holding the received message. The buffer is only valid
+ *      until the next libpqrcv_* call.
  *
- *      False if no data was available within timeout, or wait was interrupted
+ *      0 if no data was available within timeout, or wait was interrupted
  *      by signal.
  *
- * The buffer returned is only valid until the next call of this function or
- * libpq_connect/disconnect.
+ *   -1 if the server ended the COPY.
  *
  * ereports on error.
  */
-static bool
-libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
+static int
+libpqrcv_receive(int timeout, char **buffer)
 {
        int                     rawlen;
 
@@ -359,7 +457,7 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
                if (timeout > 0)
                {
                        if (!libpq_select(timeout))
-                               return false;
+                               return 0;
                }
 
                if (PQconsumeInput(streamConn) == 0)
@@ -370,23 +468,26 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
                /* Now that we've consumed some input, try again */
                rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
                if (rawlen == 0)
-                       return false;
+                       return 0;
        }
        if (rawlen == -1)                       /* end-of-streaming or error */
        {
                PGresult   *res;
 
                res = PQgetResult(streamConn);
-               if (PQresultStatus(res) == PGRES_COMMAND_OK)
+               if (PQresultStatus(res) == PGRES_COMMAND_OK ||
+                       PQresultStatus(res) == PGRES_COPY_IN)
+               {
+                       PQclear(res);
+                       return -1;
+               }
+               else
                {
                        PQclear(res);
                        ereport(ERROR,
-                                       (errmsg("replication terminated by primary server")));
+                                       (errmsg("could not receive data from WAL stream: %s",
+                                                       PQerrorMessage(streamConn))));
                }
-               PQclear(res);
-               ereport(ERROR,
-                               (errmsg("could not receive data from WAL stream: %s",
-                                               PQerrorMessage(streamConn))));
        }
        if (rawlen < -1)
                ereport(ERROR,
@@ -394,11 +495,8 @@ libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
                                                PQerrorMessage(streamConn))));
 
        /* Return received messages to caller */
-       *type = *((unsigned char *) recvBuf);
-       *buffer = recvBuf + sizeof(*type);
-       *len = rawlen - sizeof(*type);
-
-       return true;
+       *buffer = recvBuf;
+       return rawlen;
 }
 
 /*
index b6cfdac1b66e09875b3368d1f2f9dec15bb3173e..ff4cff16e4b50495659daf53d844963590dbfff7 100644 (file)
@@ -56,6 +56,7 @@ Node *replication_parse_result;
 %union {
                char                                    *str;
                bool                                    boolval;
+               int32                                   intval;
 
                XLogRecPtr                              recptr;
                Node                                    *node;
@@ -65,22 +66,26 @@ Node *replication_parse_result;
 
 /* Non-keyword tokens */
 %token <str> SCONST
+%token <intval> ICONST
 %token <recptr> RECPTR
 
 /* Keyword tokens. */
 %token K_BASE_BACKUP
 %token K_IDENTIFY_SYSTEM
+%token K_START_REPLICATION
+%token K_TIMELINE_HISTORY
 %token K_LABEL
 %token K_PROGRESS
 %token K_FAST
 %token K_NOWAIT
 %token K_WAL
-%token K_START_REPLICATION
+%token K_TIMELINE
 
 %type <node>   command
-%type <node>   base_backup start_replication identify_system
+%type <node>   base_backup start_replication identify_system timeline_history
 %type <list>   base_backup_opt_list
 %type <defelt> base_backup_opt
+%type <intval> opt_timeline
 %%
 
 firstcmd: command opt_semicolon
@@ -97,6 +102,7 @@ command:
                        identify_system
                        | base_backup
                        | start_replication
+                       | timeline_history
                        ;
 
 /*
@@ -153,15 +159,48 @@ base_backup_opt:
                        ;
 
 /*
- * START_REPLICATION %X/%X
+ * START_REPLICATION %X/%X [TIMELINE %d]
  */
 start_replication:
-                       K_START_REPLICATION RECPTR
+                       K_START_REPLICATION RECPTR opt_timeline
                                {
                                        StartReplicationCmd *cmd;
 
                                        cmd = makeNode(StartReplicationCmd);
                                        cmd->startpoint = $2;
+                                       cmd->timeline = $3;
+
+                                       $$ = (Node *) cmd;
+                               }
+                       ;
+
+opt_timeline:
+                       K_TIMELINE ICONST
+                               {
+                                       if ($2 <= 0)
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                                (errmsg("invalid timeline %d", $2))));
+                                       $$ = $2;
+                               }
+                               | /* nothing */                 { $$ = 0; }
+                       ;
+
+/*
+ * TIMELINE_HISTORY %d
+ */
+timeline_history:
+                       K_TIMELINE_HISTORY ICONST
+                               {
+                                       TimeLineHistoryCmd *cmd;
+
+                                       if ($2 <= 0)
+                                               ereport(ERROR,
+                                                               (errcode(ERRCODE_SYNTAX_ERROR),
+                                                                (errmsg("invalid timeline %d", $2))));
+
+                                       cmd = makeNode(TimeLineHistoryCmd);
+                                       cmd->timeline = $2;
 
                                        $$ = (Node *) cmd;
                                }
index 51f381da000803a71a5a95cfc109a3d17e70356c..122da29016be14acebef68f31dbeacd6eab3c1b4 100644 (file)
@@ -15,6 +15,8 @@
  */
 #include "postgres.h"
 
+#include "utils/builtins.h"
+
 /* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
 #undef fprintf
 #define fprintf(file, fmt, msg)  ereport(ERROR, (errmsg_internal("%s", msg)))
@@ -49,6 +51,7 @@ xqstart                       {quote}
 xqdouble               {quote}{quote}
 xqinside               [^']+
 
+digit                  [0-9]+
 hexdigit               [0-9A-Za-z]+
 
 quote                  '
@@ -63,7 +66,9 @@ LABEL                 { return K_LABEL; }
 NOWAIT                 { return K_NOWAIT; }
 PROGRESS                       { return K_PROGRESS; }
 WAL                    { return K_WAL; }
+TIMELINE                       { return K_TIMELINE; }
 START_REPLICATION      { return K_START_REPLICATION; }
+TIMELINE_HISTORY       { return K_TIMELINE_HISTORY; }
 ","                            { return ','; }
 ";"                            { return ';'; }
 
@@ -71,6 +76,11 @@ START_REPLICATION    { return K_START_REPLICATION; }
 [\t]                   ;
 " "                            ;
 
+{digit}+               {
+                                       yylval.intval = pg_atoi(yytext, sizeof(int32), 0);
+                                       return ICONST;
+                               }
+
 {hexdigit}+\/{hexdigit}+               {
                                        uint32  hi,
                                                        lo;
index 62135037f104837b154cf7293d51418c08e1f660..303edb75a32061c0d268c1e5fe6334be4f1f9049 100644 (file)
  * WalRcv->receivedUpto variable in shared memory, to inform the startup
  * process of how far it can proceed with XLOG replay.
  *
+ * If the primary server ends streaming, but doesn't disconnect, walreceiver
+ * goes into "waiting" mode, and waits for the startup process to give new
+ * instructions. The startup process will treat that the same as
+ * disconnection, and will rescan the archive/pg_xlog directory. But when the
+ * startup process wants to try streaming replication again, it will just
+ * nudge the existing walreceiver process that's waiting, instead of launching
+ * a new one.
+ *
  * Normal termination is by SIGTERM, which instructs the walreceiver to
  * exit(0). Emergency termination is by SIGQUIT; like any postmaster child
  * process, the walreceiver will simply abort and exit on SIGQUIT. A close
@@ -38,6 +46,7 @@
 #include <signal.h>
 #include <unistd.h>
 
+#include "access/timeline.h"
 #include "access/xlog_internal.h"
 #include "libpq/pqformat.h"
 #include "libpq/pqsignal.h"
@@ -60,6 +69,10 @@ bool         hot_standby_feedback;
 
 /* libpqreceiver hooks to these when loaded */
 walrcv_connect_type walrcv_connect = NULL;
+walrcv_identify_system_type walrcv_identify_system = NULL;
+walrcv_startstreaming_type walrcv_startstreaming = NULL;
+walrcv_endstreaming_type walrcv_endstreaming = NULL;
+walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL;
 walrcv_receive_type walrcv_receive = NULL;
 walrcv_send_type walrcv_send = NULL;
 walrcv_disconnect_type walrcv_disconnect = NULL;
@@ -118,6 +131,8 @@ static volatile bool WalRcvImmediateInterruptOK = false;
 static void ProcessWalRcvInterrupts(void);
 static void EnableWalRcvImmediateExit(void);
 static void DisableWalRcvImmediateExit(void);
+static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
+static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
 static void WalRcvDie(int code, Datum arg);
 static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
 static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
@@ -128,6 +143,7 @@ static void ProcessWalSndrMessage(XLogRecPtr walEnd, TimestampTz sendTime);
 
 /* Signal handlers */
 static void WalRcvSigHupHandler(SIGNAL_ARGS);
+static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
 static void WalRcvShutdownHandler(SIGNAL_ARGS);
 static void WalRcvQuickDieHandler(SIGNAL_ARGS);
 
@@ -171,6 +187,10 @@ WalReceiverMain(void)
 {
        char            conninfo[MAXCONNINFO];
        XLogRecPtr      startpoint;
+       TimeLineID      startpointTLI;
+       TimeLineID      primaryTLI;
+       bool            first_stream;
+
        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;
        TimestampTz last_recv_timestamp;
@@ -207,17 +227,21 @@ WalReceiverMain(void)
                        /* The usual case */
                        break;
 
-               case WALRCV_RUNNING:
+               case WALRCV_WAITING:
+               case WALRCV_STREAMING:
+               case WALRCV_RESTARTING:
+               default:
                        /* Shouldn't happen */
                        elog(PANIC, "walreceiver still running according to shared memory state");
        }
        /* Advertise our PID so that the startup process can kill us */
        walrcv->pid = MyProcPid;
-       walrcv->walRcvState = WALRCV_RUNNING;
+       walrcv->walRcvState = WALRCV_STREAMING;
 
        /* Fetch information required to start streaming */
        strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
        startpoint = walrcv->receiveStart;
+       startpointTLI = walrcv->receiveStartTLI;
 
        /* Initialise to a sanish value */
        walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp();
@@ -227,6 +251,8 @@ WalReceiverMain(void)
        /* Arrange to clean up at walreceiver exit */
        on_shmem_exit(WalRcvDie, 0);
 
+       OwnLatch(&walrcv->latch);
+
        /*
         * If possible, make this process a group leader, so that the postmaster
         * can signal any child processes too.  (walreceiver probably never has
@@ -246,7 +272,7 @@ WalReceiverMain(void)
        pqsignal(SIGQUIT, WalRcvQuickDieHandler);       /* hard crash time */
        pqsignal(SIGALRM, SIG_IGN);
        pqsignal(SIGPIPE, SIG_IGN);
-       pqsignal(SIGUSR1, SIG_IGN);
+       pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
        pqsignal(SIGUSR2, SIG_IGN);
 
        /* Reset some signals that are accepted by postmaster but not here */
@@ -261,8 +287,12 @@ WalReceiverMain(void)
 
        /* Load the libpq-specific functions */
        load_file("libpqwalreceiver", false);
-       if (walrcv_connect == NULL || walrcv_receive == NULL ||
-               walrcv_send == NULL || walrcv_disconnect == NULL)
+       if (walrcv_connect == NULL || walrcv_startstreaming == NULL ||
+               walrcv_endstreaming == NULL ||
+               walrcv_identify_system == NULL ||
+               walrcv_readtimelinehistoryfile == NULL ||
+               walrcv_receive == NULL || walrcv_send == NULL ||
+               walrcv_disconnect == NULL)
                elog(ERROR, "libpqwalreceiver didn't initialize correctly");
 
        /*
@@ -276,122 +306,360 @@ WalReceiverMain(void)
 
        /* Establish the connection to the primary for XLOG streaming */
        EnableWalRcvImmediateExit();
-       walrcv_connect(conninfo, startpoint);
+       walrcv_connect(conninfo);
        DisableWalRcvImmediateExit();
 
-       /* Initialize LogstreamResult and buffers for processing messages */
-       LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
-       initStringInfo(&reply_message);
-       initStringInfo(&incoming_message);
-
-       /* Initialize the last recv timestamp */
-       last_recv_timestamp = GetCurrentTimestamp();
-       ping_sent = false;
-
-       /* Loop until end-of-streaming or error */
+       first_stream = true;
        for (;;)
        {
-               unsigned char type;
-               char       *buf;
-               int                     len;
-
                /*
-                * Emergency bailout if postmaster has died.  This is to avoid the
-                * necessity for manual cleanup of all postmaster children.
+                * Check that we're connected to a valid server using the
+                * IDENTIFY_SYSTEM replication command,
                 */
-               if (!PostmasterIsAlive())
-                       exit(1);
+               EnableWalRcvImmediateExit();
+               walrcv_identify_system(&primaryTLI);
+               DisableWalRcvImmediateExit();
 
                /*
-                * Exit walreceiver if we're not in recovery. This should not happen,
-                * but cross-check the status here.
+                * Confirm that the current timeline of the primary is the same or
+                * ahead of ours.
                 */
-               if (!RecoveryInProgress())
-                       ereport(FATAL,
-                                       (errmsg("cannot continue WAL streaming, recovery has already ended")));
-
-               /* Process any requests or signals received recently */
-               ProcessWalRcvInterrupts();
+               if (primaryTLI < startpointTLI)
+                       ereport(ERROR,
+                                       (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
+                                                       primaryTLI, startpointTLI)));
 
-               if (got_SIGHUP)
-               {
-                       got_SIGHUP = false;
-                       ProcessConfigFile(PGC_SIGHUP);
-               }
+               /*
+                * Get any missing history files. We do this always, even when we're
+                * not interested in that timeline, so that if we're promoted to become
+                * the master later on, we don't select the same timeline that was
+                * already used in the current master. This isn't bullet-proof - you'll
+                * need some external software to manage your cluster if you need to
+                * ensure that a unique timeline id is chosen in every case, but let's
+                * avoid the confusion of timeline id collisions where we can.
+                */
+               WalRcvFetchTimeLineHistoryFiles(startpointTLI + 1, primaryTLI);
 
-               /* Wait a while for data to arrive */
-               if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
+               /*
+                * Start streaming.
+                *
+                * We'll try to start at the requested starting point and timeline,
+                * even if it's different from the server's latest timeline. In case
+                * we've already reached the end of the old timeline, the server will
+                * finish the streaming immediately, and we will go back to await
+                * orders from the startup process. If recovery_target_timeline is
+                * 'latest', the startup process will scan pg_xlog and find the new
+                * history file, bump recovery target timeline, and ask us to restart
+                * on the new timeline.
+                */
+               ThisTimeLineID = startpointTLI;
+               if (walrcv_startstreaming(startpointTLI, startpoint))
                {
-                       /* Something was received from master, so reset timeout */
+                       bool endofwal = false;
+
+                       if (first_stream)
+                               ereport(LOG,
+                                               (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
+                                                               (uint32) (startpoint >> 32), (uint32) startpoint,
+                                                               startpointTLI)));
+                       else
+                               ereport(LOG,
+                                               (errmsg("restarted WAL streaming at %X/%X on timeline %u",
+                                                               (uint32) (startpoint >> 32), (uint32) startpoint,
+                                                               startpointTLI)));
+                       first_stream = false;
+
+                       /* Initialize LogstreamResult and buffers for processing messages */
+                       LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr();
+                       initStringInfo(&reply_message);
+                       initStringInfo(&incoming_message);
+
+                       /* Initialize the last recv timestamp */
                        last_recv_timestamp = GetCurrentTimestamp();
                        ping_sent = false;
 
-                       /* Accept the received data, and process it */
-                       XLogWalRcvProcessMsg(type, buf, len);
-
-                       /* Receive any more data we can without sleeping */
-                       while (walrcv_receive(0, &type, &buf, &len))
+                       /* Loop until end-of-streaming or error */
+                       while (!endofwal)
                        {
-                               last_recv_timestamp = GetCurrentTimestamp();
-                               ping_sent = false;
-                               XLogWalRcvProcessMsg(type, buf, len);
-                       }
+                               char       *buf;
+                               int                     len;
 
-                       /* Let the master know that we received some data. */
-                       XLogWalRcvSendReply(false, false);
+                               /*
+                                * Emergency bailout if postmaster has died.  This is to avoid
+                                * the necessity for manual cleanup of all postmaster children.
+                                */
+                               if (!PostmasterIsAlive())
+                                       exit(1);
+
+                               /*
+                                * Exit walreceiver if we're not in recovery. This should not
+                                * happen, but cross-check the status here.
+                                */
+                               if (!RecoveryInProgress())
+                                       ereport(FATAL,
+                                                       (errmsg("cannot continue WAL streaming, recovery has already ended")));
+
+                               /* Process any requests or signals received recently */
+                               ProcessWalRcvInterrupts();
+
+                               if (got_SIGHUP)
+                               {
+                                       got_SIGHUP = false;
+                                       ProcessConfigFile(PGC_SIGHUP);
+                               }
+
+                               /* Wait a while for data to arrive */
+                               len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
+                               if (len != 0)
+                               {
+                                       /*
+                                        * Process the received data, and any subsequent data we
+                                        * can read without blocking.
+                                        */
+                                       for (;;)
+                                       {
+                                               if (len > 0)
+                                               {
+                                                       /* Something was received from master, so reset timeout */
+                                                       last_recv_timestamp = GetCurrentTimestamp();
+                                                       ping_sent = false;
+                                                       XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
+                                               }
+                                               else if (len == 0)
+                                                       break;
+                                               else if (len < 0)
+                                               {
+                                                       ereport(LOG,
+                                                                       (errmsg("replication terminated by primary server"),
+                                                                        errdetail("End of WAL reached on timeline %u", startpointTLI)));
+                                                       endofwal = true;
+                                                       break;
+                                               }
+                                               len = walrcv_receive(0, &buf);
+                                       }
+
+                                       /* Let the master know that we received some data. */
+                                       XLogWalRcvSendReply(false, false);
+
+                                       /*
+                                        * If we've written some records, flush them to disk and
+                                        * let the startup process and primary server know about
+                                        * them.
+                                        */
+                                       XLogWalRcvFlush(false);
+                               }
+                               else
+                               {
+                                       /*
+                                        * We didn't receive anything new. If we haven't heard
+                                        * anything from the server for more than
+                                        * wal_receiver_timeout / 2, ping the server. Also, if it's
+                                        * been longer than wal_receiver_status_interval since the
+                                        * last update we sent, send a status update to the master
+                                        * anyway, to report any progress in applying WAL.
+                                        */
+                                       bool requestReply = false;
+
+                                       /*
+                                        * Check if time since last receive from standby has
+                                        * reached the configured limit.
+                                        */
+                                       if (wal_receiver_timeout > 0)
+                                       {
+                                               TimestampTz now = GetCurrentTimestamp();
+                                               TimestampTz timeout;
+
+                                               timeout =
+                                                       TimestampTzPlusMilliseconds(last_recv_timestamp,
+                                                                                                               wal_receiver_timeout);
+
+                                               if (now >= timeout)
+                                                       ereport(ERROR,
+                                                                       (errmsg("terminating walreceiver due to timeout")));
+
+                                               /*
+                                                * We didn't receive anything new, for half of receiver
+                                                * replication timeout. Ping the server.
+                                                */
+                                               if (!ping_sent)
+                                               {
+                                                       timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
+                                                                                                                                 (wal_receiver_timeout/2));
+                                                       if (now >= timeout)
+                                                       {
+                                                               requestReply = true;
+                                                               ping_sent = true;
+                                                       }
+                                               }
+                                       }
+
+                                       XLogWalRcvSendReply(requestReply, requestReply);
+                                       XLogWalRcvSendHSFeedback();
+                               }
+                       }
 
                        /*
-                        * If we've written some records, flush them to disk and let the
-                        * startup process and primary server know about them.
+                        * The backend finished streaming. Exit streaming COPY-mode from
+                        * our side, too.
                         */
+                       EnableWalRcvImmediateExit();
+                       walrcv_endstreaming();
+                       DisableWalRcvImmediateExit();
+               }
+               else
+                       ereport(LOG,
+                                       (errmsg("primary server contains no more WAL on requested timeline %u",
+                                                       startpointTLI)));
+
+               /*
+                * End of WAL reached on the requested timeline. Close the last
+                * segment, and await for new orders from the startup process.
+                */
+               if (recvFile >= 0)
+               {
                        XLogWalRcvFlush(false);
+                       if (close(recvFile) != 0)
+                               ereport(PANIC,
+                                               (errcode_for_file_access(),
+                                                errmsg("could not close log segment %s: %m",
+                                                               XLogFileNameP(recvFileTLI, recvSegNo))));
                }
+               recvFile = -1;
+
+               elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
+               WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
+       }
+       /* not reached */
+}
+
+/*
+ * Wait for startup process to set receiveStart and receiveStartTLI.
+ */
+static void
+WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalRcvData *walrcv = WalRcv;
+       int                     state;
+
+       SpinLockAcquire(&walrcv->mutex);
+       state = walrcv->walRcvState;
+       if (state != WALRCV_STREAMING)
+       {
+               SpinLockRelease(&walrcv->mutex);
+               if (state == WALRCV_STOPPING)
+                       proc_exit(0);
                else
+                       elog(FATAL, "unexpected walreceiver state");
+       }
+       walrcv->walRcvState = WALRCV_WAITING;
+       walrcv->receiveStart = InvalidXLogRecPtr;
+       walrcv->receiveStartTLI = 0;
+       SpinLockRelease(&walrcv->mutex);
+
+       if (update_process_title)
+               set_ps_display("idle", false);
+
+       /*
+        * nudge startup process to notice that we've stopped streaming and are
+        * now waiting for instructions.
+        */
+       WakeupRecovery();
+       for (;;)
+       {
+               ResetLatch(&walrcv->latch);
+
+               /*
+                * Emergency bailout if postmaster has died.  This is to avoid the
+                * necessity for manual cleanup of all postmaster children.
+                */
+               if (!PostmasterIsAlive())
+                       exit(1);
+
+               ProcessWalRcvInterrupts();
+
+               SpinLockAcquire(&walrcv->mutex);
+               Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
+                          walrcv->walRcvState == WALRCV_WAITING ||
+                          walrcv->walRcvState == WALRCV_STOPPING);
+               if (walrcv->walRcvState == WALRCV_RESTARTING)
+               {
+                       /* we don't expect primary_conninfo to change */
+                       *startpoint = walrcv->receiveStart;
+                       *startpointTLI = walrcv->receiveStartTLI;
+                       walrcv->walRcvState = WALRCV_STREAMING;
+                       SpinLockRelease(&walrcv->mutex);
+                       break;
+               }
+               if (walrcv->walRcvState == WALRCV_STOPPING)
                {
                        /*
-                        * We didn't receive anything new. If we haven't heard anything
-                        * from the server for more than wal_receiver_timeout / 2,
-                        * ping the server. Also, if it's been longer than
-                        * wal_receiver_status_interval since the last update we sent,
-                        * send a status update to the master anyway, to report any
-                        * progress in applying WAL.
+                        * We should've received SIGTERM if the startup process wants
+                        * us to die, but might as well check it here too.
                         */
-                       bool requestReply = false;
+                       SpinLockRelease(&walrcv->mutex);
+                       exit(1);
+               }
+               SpinLockRelease(&walrcv->mutex);
 
-                       /*
-                        * Check if time since last receive from standby has reached the
-                        * configured limit.
-                        */
-                       if (wal_receiver_timeout > 0)
-                       {
-                               TimestampTz now = GetCurrentTimestamp();
-                               TimestampTz timeout;
+               WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
+       }
+
+       if (update_process_title)
+       {
+               char            activitymsg[50];
 
-                               timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-                                                                                                         wal_receiver_timeout);
+               snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
+                                (uint32) (*startpoint >> 32),
+                                (uint32) *startpoint);
+               set_ps_display(activitymsg, false);
+       }
+}
 
-                               if (now >= timeout)
-                                       ereport(ERROR,
-                                                       (errmsg("terminating walreceiver due to timeout")));
+/*
+ * Fetch any missing timeline history files between 'first' and 'last'
+ * (inclusive) from the server.
+ */
+static void
+WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
+{
+       TimeLineID tli;
 
-                               /*
-                                * We didn't receive anything new, for half of receiver
-                                * replication timeout. Ping the server.
-                                */
-                               if (!ping_sent)
-                               {
-                                       timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
-                                                                                                                 (wal_receiver_timeout/2));
-                                       if (now >= timeout)
-                                       {
-                                               requestReply = true;
-                                               ping_sent = true;
-                                       }
-                               }
-                       }
+       for (tli = first; tli <= last; tli++)
+       {
+               if (!existsTimeLineHistory(tli))
+               {
+                       char       *fname;
+                       char       *content;
+                       int                     len;
+                       char            expectedfname[MAXFNAMELEN];
 
-                       XLogWalRcvSendReply(requestReply, requestReply);
-                       XLogWalRcvSendHSFeedback();
+                       ereport(LOG,
+                                       (errmsg("fetching timeline history file for timeline %u from primary server",
+                                                       tli)));
+
+                       EnableWalRcvImmediateExit();
+                       walrcv_readtimelinehistoryfile(tli, &fname, &content, &len);
+                       DisableWalRcvImmediateExit();
+
+                       /*
+                        * Check that the filename on the master matches what we calculated
+                        * ourselves. This is just a sanity check, it should always match.
+                        */
+                       TLHistoryFileName(expectedfname, tli);
+                       if (strcmp(fname, expectedfname) != 0)
+                               ereport(ERROR,
+                                               (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                errmsg_internal("primary reported unexpected filename for timeline history file of timeline %u",
+                                                                                tli)));
+
+                       /*
+                        * Write the file to pg_xlog.
+                        */
+                       writeTimeLineHistoryFile(tli, content, len);
+
+                       pfree(fname);
+                       pfree(content);
                }
        }
 }
@@ -408,9 +676,15 @@ WalRcvDie(int code, Datum arg)
        /* Ensure that all WAL records received are flushed to disk */
        XLogWalRcvFlush(true);
 
+       DisownLatch(&walrcv->latch);
+
        SpinLockAcquire(&walrcv->mutex);
-       Assert(walrcv->walRcvState == WALRCV_RUNNING ||
+       Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+                  walrcv->walRcvState == WALRCV_RESTARTING ||
+                  walrcv->walRcvState == WALRCV_STARTING ||
+                  walrcv->walRcvState == WALRCV_WAITING ||
                   walrcv->walRcvState == WALRCV_STOPPING);
+       Assert(walrcv->pid == MyProcPid);
        walrcv->walRcvState = WALRCV_STOPPED;
        walrcv->pid = 0;
        SpinLockRelease(&walrcv->mutex);
@@ -418,6 +692,9 @@ WalRcvDie(int code, Datum arg)
        /* Terminate the connection gracefully. */
        if (walrcv_disconnect != NULL)
                walrcv_disconnect();
+
+       /* Wake up the startup process to notice promptly that we're gone */
+       WakeupRecovery();
 }
 
 /* SIGHUP: set flag to re-read config file at next convenient time */
@@ -427,6 +704,14 @@ WalRcvSigHupHandler(SIGNAL_ARGS)
        got_SIGHUP = true;
 }
 
+
+/* SIGUSR1: used by latch mechanism */
+static void
+WalRcvSigUsr1Handler(SIGNAL_ARGS)
+{
+       latch_sigusr1_handler();
+}
+
 /* SIGTERM: set flag for main loop, or shutdown immediately if safe */
 static void
 WalRcvShutdownHandler(SIGNAL_ARGS)
@@ -435,6 +720,8 @@ WalRcvShutdownHandler(SIGNAL_ARGS)
 
        got_SIGTERM = true;
 
+       SetLatch(&WalRcv->latch);
+
        /* Don't joggle the elbow of proc_exit */
        if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
                ProcessWalRcvInterrupts();
@@ -661,6 +948,7 @@ XLogWalRcvFlush(bool dying)
                {
                        walrcv->latestChunkStart = walrcv->receivedUpto;
                        walrcv->receivedUpto = LogstreamResult.Flush;
+                       walrcv->receivedTLI = ThisTimeLineID;
                }
                SpinLockRelease(&walrcv->mutex);
 
@@ -738,7 +1026,7 @@ XLogWalRcvSendReply(bool force, bool requestReply)
        /* Construct a new message */
        writePtr = LogstreamResult.Write;
        flushPtr = LogstreamResult.Flush;
-       applyPtr = GetXLogReplayRecPtr(NULL);
+       applyPtr = GetXLogReplayRecPtr();
 
        resetStringInfo(&reply_message);
        pq_sendbyte(&reply_message, 'r');
index 9eba180f04973df6c4f0bc5a73686f0aca5dcb14..a8ccfc66398bab64b90c8443182a0a5f1ca10d43 100644 (file)
@@ -64,12 +64,13 @@ WalRcvShmemInit(void)
                MemSet(WalRcv, 0, WalRcvShmemSize());
                WalRcv->walRcvState = WALRCV_STOPPED;
                SpinLockInit(&WalRcv->mutex);
+               InitSharedLatch(&WalRcv->latch);
        }
 }
 
-/* Is walreceiver in progress (or starting up)? */
+/* Is walreceiver running (or starting up)? */
 bool
-WalRcvInProgress(void)
+WalRcvRunning(void)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;
@@ -110,6 +111,53 @@ WalRcvInProgress(void)
                return false;
 }
 
+/*
+ * Is walreceiver running and streaming (or at least attempting to connect,
+ * or starting up)?
+ */
+bool
+WalRcvStreaming(void)
+{
+       /* use volatile pointer to prevent code rearrangement */
+       volatile WalRcvData *walrcv = WalRcv;
+       WalRcvState state;
+       pg_time_t       startTime;
+
+       SpinLockAcquire(&walrcv->mutex);
+
+       state = walrcv->walRcvState;
+       startTime = walrcv->startTime;
+
+       SpinLockRelease(&walrcv->mutex);
+
+       /*
+        * If it has taken too long for walreceiver to start up, give up. Setting
+        * the state to STOPPED ensures that if walreceiver later does start up
+        * after all, it will see that it's not supposed to be running and die
+        * without doing anything.
+        */
+       if (state == WALRCV_STARTING)
+       {
+               pg_time_t       now = (pg_time_t) time(NULL);
+
+               if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
+               {
+                       SpinLockAcquire(&walrcv->mutex);
+
+                       if (walrcv->walRcvState == WALRCV_STARTING)
+                               state = walrcv->walRcvState = WALRCV_STOPPED;
+
+                       SpinLockRelease(&walrcv->mutex);
+               }
+       }
+
+       if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
+               state == WALRCV_RESTARTING)
+               return true;
+       else
+               return false;
+}
+
 /*
  * Stop walreceiver (if running) and wait for it to die.
  * Executed by the Startup process.
@@ -135,7 +183,9 @@ ShutdownWalRcv(void)
                        walrcv->walRcvState = WALRCV_STOPPED;
                        break;
 
-               case WALRCV_RUNNING:
+               case WALRCV_STREAMING:
+               case WALRCV_WAITING:
+               case WALRCV_RESTARTING:
                        walrcv->walRcvState = WALRCV_STOPPING;
                        /* fall through */
                case WALRCV_STOPPING:
@@ -154,7 +204,7 @@ ShutdownWalRcv(void)
         * Wait for walreceiver to acknowledge its death by setting state to
         * WALRCV_STOPPED.
         */
-       while (WalRcvInProgress())
+       while (WalRcvRunning())
        {
                /*
                 * This possibly-long loop needs to handle interrupts of startup
@@ -173,10 +223,11 @@ ShutdownWalRcv(void)
  * is a libpq connection string to use.
  */
 void
-RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
+RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;
+       bool            launch = false;
        pg_time_t       now = (pg_time_t) time(NULL);
 
        /*
@@ -190,14 +241,22 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
 
        SpinLockAcquire(&walrcv->mutex);
 
-       /* It better be stopped before we try to restart it */
-       Assert(walrcv->walRcvState == WALRCV_STOPPED);
+       /* It better be stopped if we try to restart it */
+       Assert(walrcv->walRcvState == WALRCV_STOPPED ||
+                  walrcv->walRcvState == WALRCV_WAITING);
 
        if (conninfo != NULL)
                strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
        else
                walrcv->conninfo[0] = '\0';
-       walrcv->walRcvState = WALRCV_STARTING;
+
+       if (walrcv->walRcvState == WALRCV_STOPPED)
+       {
+               launch = true;
+               walrcv->walRcvState = WALRCV_STARTING;
+       }
+       else
+               walrcv->walRcvState = WALRCV_RESTARTING;
        walrcv->startTime = now;
 
        /*
@@ -210,10 +269,14 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
                walrcv->latestChunkStart = recptr;
        }
        walrcv->receiveStart = recptr;
+       walrcv->receiveStartTLI = tli;
 
        SpinLockRelease(&walrcv->mutex);
 
-       SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
+       if (launch)
+               SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
+       else
+               SetLatch(&walrcv->latch);
 }
 
 /*
@@ -221,10 +284,11 @@ RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
  *
  * Optionally, returns the previous chunk start, that is the first byte
  * written in the most recent walreceiver flush cycle. Callers not
- * interested in that value may pass NULL for latestChunkStart.
+ * interested in that value may pass NULL for latestChunkStart. Same for
+ * receiveTLI.
  */
 XLogRecPtr
-GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
+GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
 {
        /* use volatile pointer to prevent code rearrangement */
        volatile WalRcvData *walrcv = WalRcv;
@@ -234,6 +298,8 @@ GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
        recptr = walrcv->receivedUpto;
        if (latestChunkStart)
                *latestChunkStart = walrcv->latestChunkStart;
+       if (receiveTLI)
+               *receiveTLI = walrcv->receivedTLI;
        SpinLockRelease(&walrcv->mutex);
 
        return recptr;
@@ -258,7 +324,7 @@ GetReplicationApplyDelay(void)
        receivePtr = walrcv->receivedUpto;
        SpinLockRelease(&walrcv->mutex);
 
-       replayPtr = GetXLogReplayRecPtr(NULL);
+       replayPtr = GetXLogReplayRecPtr();
 
        if (XLByteEQ(receivePtr, replayPtr))
                return 0;
index 8774d7e8229edc34634bf4bcf7e4b5469e3fe5ad..aec57f5535fc42f17330fc066c1e10175663f963 100644 (file)
@@ -7,10 +7,15 @@
  * (Note that there can be more than one walsender process concurrently.)
  * It is started by the postmaster when the walreceiver of a standby server
  * connects to the primary server and requests XLOG streaming replication.
- * It attempts to keep reading XLOG records from the disk and sending them
- * to the standby server, as long as the connection is alive (i.e., like
- * any backend, there is a one-to-one relationship between a connection
- * and a walsender process).
+ *
+ * A walsender is similar to a regular backend, ie. there is a one-to-one
+ * relationship between a connection and a walsender process, but instead
+ * of processing SQL queries, it understands a small set of special
+ * replication-mode commands. The START_REPLICATION command begins streaming
+ * WAL to the client. While streaming, the walsender keeps reading XLOG
+ * records from the disk and sends them to the standby server over the
+ * COPY protocol, until the either side ends the replication by exiting COPY
+ * mode (or until the connection is closed).
  *
  * Normal termination is by SIGTERM, which instructs the walsender to
  * close the connection and exit(0) at next convenient moment. Emergency
@@ -37,6 +42,7 @@
 #include <signal.h>
 #include <unistd.h>
 
+#include "access/timeline.h"
 #include "access/transam.h"
 #include "access/xlog_internal.h"
 #include "catalog/pg_type.h"
@@ -87,8 +93,6 @@ bool          am_walsender = false;           /* Am I a walsender process ? */
 bool           am_cascading_walsender = false;         /* Am I cascading WAL to
                                                                                                 * another standby ? */
 
-static bool    replication_started = false; /* Started streaming yet? */
-
 /* User-settable parameters for walsender */
 int                    max_wal_senders = 0;    /* the maximum number of concurrent walsenders */
 int                    wal_sender_timeout = 60 * 1000; /* maximum time to send one
@@ -106,6 +110,16 @@ static int sendFile = -1;
 static XLogSegNo sendSegNo = 0;
 static uint32 sendOff = 0;
 
+/*
+ * These variables keep track of the state of the timeline we're currently
+ * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
+ * the timeline is not the latest timeline on this server, and the server's
+ * history forked off from that timeline at sendTimeLineValidUpto.
+ */
+static TimeLineID      sendTimeLine = 0;
+static bool                    sendTimeLineIsHistoric = false;
+static XLogRecPtr      sendTimeLineValidUpto = InvalidXLogRecPtr;
+
 /*
  * How far have we sent WAL already? This is also advertised in
  * MyWalSnd->sentPtr.  (Actually, this is the next WAL location to send.)
@@ -124,9 +138,26 @@ static TimestampTz last_reply_timestamp;
 /* Have we sent a heartbeat message asking for reply, since last reply? */
 static bool    ping_sent = false;
 
+/*
+ * While streaming WAL in Copy mode, streamingDoneSending is set to true
+ * after we have sent CopyDone. We should not send any more CopyData messages
+ * after that. streamingDoneReceiving is set to true when we receive CopyDone
+ * from the other end. When both become true, it's time to exit Copy mode.
+ */
+static bool    streamingDoneSending;
+static bool    streamingDoneReceiving;
+
 /* Flags set by signal handlers for later service in main loop */
 static volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t walsender_ready_to_stop = false;
+static volatile sig_atomic_t walsender_ready_to_stop = false;
+
+/*
+ * This is set while we are streaming. When not set, SIGUSR2 signal will be
+ * handled like SIGTERM. When set, the main loop is responsible for checking
+ * walsender_ready_to_stop and terminating when it's set (after streaming any
+ * remaining WAL).
+ */
+static volatile sig_atomic_t replication_active = false;
 
 /* Signal handlers */
 static void WalSndSigHupHandler(SIGNAL_ARGS);
@@ -134,7 +165,7 @@ static void WalSndXLogSendHandler(SIGNAL_ARGS);
 static void WalSndLastCycleHandler(SIGNAL_ARGS);
 
 /* Prototypes for private functions */
-static void WalSndLoop(void) __attribute__((noreturn));
+static void WalSndLoop(void);
 static void InitWalSenderSlot(void);
 static void WalSndKill(int code, Datum arg);
 static void XLogSend(bool *caughtup);
@@ -164,6 +195,16 @@ InitWalSender(void)
         */
        if (am_cascading_walsender)
                ThisTimeLineID = GetRecoveryTargetTLI();
+
+       /*
+        * Let postmaster know that we're a WAL sender. Once we've declared us as
+        * a WAL sender process, postmaster will let us outlive the bgwriter and
+        * kill us last in the shutdown sequence, so we get a chance to stream all
+        * remaining WAL at shutdown, including the shutdown checkpoint. Note that
+        * there's no going back, and we mustn't write any WAL records after this.
+        */
+       MarkPostmasterChildWalSender();
+       SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
 }
 
 /*
@@ -182,17 +223,16 @@ WalSndErrorCleanup()
                sendFile = -1;
        }
 
-       /*
-        * Don't return back to the command loop after we've started replicating.
-        * We've already marked us as an actively streaming WAL sender in the
-        * PMSignal slot, and there's currently no way to undo that.
-        */
-       if (replication_started)
+       replication_active = false;
+       if (walsender_ready_to_stop)
                proc_exit(0);
+
+       /* Revert back to startup state */
+       WalSndSetState(WALSNDSTATE_STARTUP);
 }
 
 /*
- * IDENTIFY_SYSTEM
+ * Handle the IDENTIFY_SYSTEM command.
  */
 static void
 IdentifySystem(void)
@@ -210,9 +250,17 @@ IdentifySystem(void)
 
        snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
                         GetSystemIdentifier());
-       snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
 
-       logptr = am_cascading_walsender ? GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr();
+       am_cascading_walsender = RecoveryInProgress();
+       if (am_cascading_walsender)
+       {
+               logptr = GetStandbyFlushRecPtr();
+               ThisTimeLineID = GetRecoveryTargetTLI();
+       }
+       else
+               logptr = GetInsertRecPtr();
+
+       snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
 
        snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
 
@@ -261,56 +309,106 @@ IdentifySystem(void)
        pq_endmessage(&buf);
 }
 
+
 /*
- * Handle START_REPLICATION command.
- *
- * At the moment, this never returns, but an ereport(ERROR) will take us back
- * to the main loop.
+ * Handle TIMELINE_HISTORY command.
  */
 static void
-StartReplication(StartReplicationCmd *cmd)
+SendTimeLineHistory(TimeLineHistoryCmd *cmd)
 {
        StringInfoData buf;
+       char            histfname[MAXFNAMELEN];
+       char            path[MAXPGPATH];
+       int                     fd;
+       size_t          histfilelen;
+       size_t          bytesleft;
 
        /*
-        * Let postmaster know that we're streaming. Once we've declared us as a
-        * WAL sender process, postmaster will let us outlive the bgwriter and
-        * kill us last in the shutdown sequence, so we get a chance to stream all
-        * remaining WAL at shutdown, including the shutdown checkpoint. Note that
-        * there's no going back, and we mustn't write any WAL records after this.
+        * Reply with a result set with one row, and two columns. The first col
+        * is the name of the history file, 2nd is the contents.
         */
-       MarkPostmasterChildWalSender();
-       SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
-       replication_started = true;
 
-       /*
-        * When promoting a cascading standby, postmaster sends SIGUSR2 to any
-        * cascading walsenders to kill them. But there is a corner-case where
-        * such walsender fails to receive SIGUSR2 and survives a standby
-        * promotion unexpectedly. This happens when postmaster sends SIGUSR2
-        * before the walsender marks itself as a WAL sender, because postmaster
-        * sends SIGUSR2 to only the processes marked as a WAL sender.
-        *
-        * To avoid this corner-case, if recovery is NOT in progress even though
-        * the walsender is cascading one, we do the same thing as SIGUSR2 signal
-        * handler does, i.e., set walsender_ready_to_stop to true. Which causes
-        * the walsender to end later.
-        *
-        * When terminating cascading walsenders, usually postmaster writes the
-        * log message announcing the terminations. But there is a race condition
-        * here. If there is no walsender except this process before reaching
-        * here, postmaster thinks that there is no walsender and suppresses that
-        * log message. To handle this case, we always emit that log message here.
-        * This might cause duplicate log messages, but which is less likely to
-        * happen, so it's not worth writing some code to suppress them.
-        */
-       if (am_cascading_walsender && !RecoveryInProgress())
+       TLHistoryFileName(histfname, cmd->timeline);
+       TLHistoryFilePath(path, cmd->timeline);
+
+       /* Send a RowDescription message */
+       pq_beginmessage(&buf, 'T');
+       pq_sendint(&buf, 2, 2);         /* 2 fields */
+
+       /* first field */
+       pq_sendstring(&buf, "filename");        /* col name */
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
+       pq_sendint(&buf, TEXTOID, 4);           /* type oid */
+       pq_sendint(&buf, -1, 2);        /* typlen */
+       pq_sendint(&buf, 0, 4);         /* typmod */
+       pq_sendint(&buf, 0, 2);         /* format code */
+
+       /* second field */
+       pq_sendstring(&buf, "content"); /* col name */
+       pq_sendint(&buf, 0, 4);         /* table oid */
+       pq_sendint(&buf, 0, 2);         /* attnum */
+       pq_sendint(&buf, BYTEAOID, 4);          /* type oid */
+       pq_sendint(&buf, -1, 2);        /* typlen */
+       pq_sendint(&buf, 0, 4);         /* typmod */
+       pq_sendint(&buf, 0, 2);         /* format code */
+       pq_endmessage(&buf);
+
+       /* Send a DataRow message */
+       pq_beginmessage(&buf, 'D');
+       pq_sendint(&buf, 2, 2);         /* # of columns */
+       pq_sendint(&buf, strlen(histfname), 4); /* col1 len */
+       pq_sendbytes(&buf, histfname, strlen(histfname));
+
+       fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
+       if (fd < 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not open file \"%s\": %m", path)));
+
+       /* Determine file length and send it to client */
+       histfilelen = lseek(fd, 0, SEEK_END);
+       if (histfilelen < 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not seek to end of file \"%s\": %m", path)));
+       if (lseek(fd, 0, SEEK_SET) != 0)
+               ereport(ERROR,
+                               (errcode_for_file_access(),
+                                errmsg("could not seek to beginning of file \"%s\": %m", path)));
+
+       pq_sendint(&buf, histfilelen, 4);       /* col2 len */
+
+       bytesleft = histfilelen;
+       while (bytesleft > 0)
        {
-               ereport(LOG,
-                  (errmsg("terminating walsender process to force cascaded standby "
-                                  "to update timeline and reconnect")));
-               walsender_ready_to_stop = true;
+               char rbuf[BLCKSZ];
+               int nread;
+
+               nread = read(fd, rbuf, sizeof(rbuf));
+               if (nread <= 0)
+                       ereport(ERROR,
+                                       (errcode_for_file_access(),
+                                        errmsg("could not read file \"%s\": %m",
+                                                       path)));
+               pq_sendbytes(&buf, rbuf, nread);
+               bytesleft -= nread;
        }
+       CloseTransientFile(fd);
+
+       pq_endmessage(&buf);
+}
+
+/*
+ * Handle START_REPLICATION command.
+ *
+ * At the moment, this never returns, but an ereport(ERROR) will take us back
+ * to the main loop.
+ */
+static void
+StartReplication(StartReplicationCmd *cmd)
+{
+       StringInfoData buf;
 
        /*
         * We assume here that we're logging enough information in the WAL for
@@ -322,42 +420,144 @@ StartReplication(StartReplicationCmd *cmd)
         */
 
        /*
-        * When we first start replication the standby will be behind the primary.
-        * For some applications, for example, synchronous replication, it is
-        * important to have a clear state for this initial catchup mode, so we
-        * can trigger actions when we change streaming state later. We may stay
-        * in this state for a long time, which is exactly why we want to be able
-        * to monitor whether or not we are still here.
+        * Select the timeline. If it was given explicitly by the client, use
+        * that. Otherwise use the current ThisTimeLineID.
         */
-       WalSndSetState(WALSNDSTATE_CATCHUP);
+       if (cmd->timeline != 0)
+       {
+               XLogRecPtr      switchpoint;
 
-       /* Send a CopyBothResponse message, and start streaming */
-       pq_beginmessage(&buf, 'W');
-       pq_sendbyte(&buf, 0);
-       pq_sendint(&buf, 0, 2);
-       pq_endmessage(&buf);
-       pq_flush();
+               sendTimeLine = cmd->timeline;
+               if (sendTimeLine == ThisTimeLineID)
+               {
+                       sendTimeLineIsHistoric = false;
+                       sendTimeLineValidUpto = InvalidXLogRecPtr;
+               }
+               else
+               {
+                       List       *timeLineHistory;
 
-       /*
-        * Initialize position to the received one, then the xlog records begin to
-        * be shipped from that position
-        */
-       sentPtr = cmd->startpoint;
+                       sendTimeLineIsHistoric = true;
 
-       /* Also update the start position status in shared memory */
-       {
-               /* use volatile pointer to prevent code rearrangement */
-               volatile WalSnd *walsnd = MyWalSnd;
+                       /*
+                        * Check that the timeline the client requested for exists, and the
+                        * requested start location is on that timeline.
+                        */
+                       timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+                       switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory);
+                       list_free_deep(timeLineHistory);
 
-               SpinLockAcquire(&walsnd->mutex);
-               walsnd->sentPtr = sentPtr;
-               SpinLockRelease(&walsnd->mutex);
+                       /*
+                        * Found the requested timeline in the history. Check that
+                        * requested startpoint is on that timeline in our history.
+                        *
+                        * This is quite loose on purpose. We only check that we didn't
+                        * fork off the requested timeline before the switchpoint. We don't
+                        * check that we switched *to* it before the requested starting
+                        * point. This is because the client can legitimately request to
+                        * start replication from the beginning of the WAL segment that
+                        * contains switchpoint, but on the new timeline, so that it
+                        * doesn't end up with a partial segment. If you ask for a too old
+                        * starting point, you'll get an error later when we fail to find
+                        * the requested WAL segment in pg_xlog.
+                        *
+                        * XXX: we could be more strict here and only allow a startpoint
+                        * that's older than the switchpoint, if it it's still in the same
+                        * WAL segment.
+                        */
+                       if (!XLogRecPtrIsInvalid(switchpoint) &&
+                               XLByteLT(switchpoint, cmd->startpoint))
+                       {
+                               ereport(ERROR,
+                                               (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
+                                                               (uint32) (cmd->startpoint >> 32),
+                                                               (uint32) (cmd->startpoint),
+                                                               cmd->timeline),
+                                                errdetail("This server's history forked from timeline %u at %X/%X",
+                                                                  cmd->timeline,
+                                                                  (uint32) (switchpoint >> 32),
+                                                                  (uint32) (switchpoint))));
+                       }
+                       sendTimeLineValidUpto = switchpoint;
+               }
+       }
+       else
+       {
+               sendTimeLine = ThisTimeLineID;
+               sendTimeLineValidUpto = InvalidXLogRecPtr;
+               sendTimeLineIsHistoric = false;
        }
 
-       SyncRepInitConfig();
+       streamingDoneSending = streamingDoneReceiving = false;
+
+       /* If there is nothing to stream, don't even enter COPY mode */
+       if (!sendTimeLineIsHistoric ||
+               XLByteLT(cmd->startpoint, sendTimeLineValidUpto))
+       {
+               XLogRecPtr FlushPtr;
+               /*
+                * When we first start replication the standby will be behind the primary.
+                * For some applications, for example, synchronous replication, it is
+                * important to have a clear state for this initial catchup mode, so we
+                * can trigger actions when we change streaming state later. We may stay
+                * in this state for a long time, which is exactly why we want to be able
+                * to monitor whether or not we are still here.
+                */
+               WalSndSetState(WALSNDSTATE_CATCHUP);
+
+               /* Send a CopyBothResponse message, and start streaming */
+               pq_beginmessage(&buf, 'W');
+               pq_sendbyte(&buf, 0);
+               pq_sendint(&buf, 0, 2);
+               pq_endmessage(&buf);
+               pq_flush();
+
+               /*
+                * Don't allow a request to stream from a future point in WAL that
+                * hasn't been flushed to disk in this server yet.
+                */
+               if (am_cascading_walsender)
+                       FlushPtr = GetStandbyFlushRecPtr();
+               else
+                       FlushPtr = GetFlushRecPtr();
+               if (XLByteLT(FlushPtr, cmd->startpoint))
+               {
+                       ereport(ERROR,
+                                       (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
+                                                       (uint32) (cmd->startpoint >> 32),
+                                                       (uint32) (cmd->startpoint),
+                                                       (uint32) (FlushPtr >> 32),
+                                                       (uint32) (FlushPtr))));
+               }
+
+               /* Start streaming from the requested point */
+               sentPtr = cmd->startpoint;
 
-       /* Main loop of walsender */
-       WalSndLoop();
+               /* Initialize shared memory status, too */
+               {
+                       /* use volatile pointer to prevent code rearrangement */
+                       volatile WalSnd *walsnd = MyWalSnd;
+
+                       SpinLockAcquire(&walsnd->mutex);
+                       walsnd->sentPtr = sentPtr;
+                       SpinLockRelease(&walsnd->mutex);
+               }
+
+               SyncRepInitConfig();
+
+               /* Main loop of walsender */
+               replication_active = true;
+
+               WalSndLoop();
+
+               replication_active = false;
+               if (walsender_ready_to_stop)
+                       proc_exit(0);
+               WalSndSetState(WALSNDSTATE_STARTUP);
+       }
+
+       /* Get out of COPY mode (CommandComplete). */
+       EndCommand("COPY 0", DestRemote);
 }
 
 /*
@@ -406,10 +606,13 @@ exec_replication_command(const char *cmd_string)
                        SendBaseBackup((BaseBackupCmd *) cmd_node);
                        break;
 
+               case T_TimeLineHistoryCmd:
+                       SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
+                       break;
+
                default:
-                       ereport(FATAL,
-                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
-                                        errmsg("invalid standby query string: %s", cmd_string)));
+                       elog(ERROR, "unrecognized replication command node tag: %u",
+                                cmd_node->type);
        }
 
        /* done */
@@ -421,7 +624,8 @@ exec_replication_command(const char *cmd_string)
 }
 
 /*
- * Check if the remote end has closed the connection.
+ * Process any incoming messages while streaming. Also checks if the remote
+ * end has closed the connection.
  */
 static void
 ProcessRepliesIfAny(void)
@@ -430,7 +634,12 @@ ProcessRepliesIfAny(void)
        int                     r;
        bool            received = false;
 
-       for (;;)
+       /*
+        * If we already received a CopyDone from the frontend, any subsequent
+        * message is the beginning of a new command, and should be processed in
+        * the main processing loop.
+        */
+       while (!streamingDoneReceiving)
        {
                r = pq_getbyte_if_available(&firstchar);
                if (r < 0)
@@ -458,6 +667,31 @@ ProcessRepliesIfAny(void)
                                received = true;
                                break;
 
+                               /*
+                                * CopyDone means the standby requested to finish streaming.
+                                * Reply with CopyDone, if we had not sent that already.
+                                */
+                       case 'c':
+                               if (!streamingDoneSending)
+                               {
+                                       pq_putmessage_noblock('c', NULL, 0);
+                                       streamingDoneSending = true;
+                               }
+
+                               /* consume the CopyData message */
+                               resetStringInfo(&reply_message);
+                               if (pq_getmessage(&reply_message, 0))
+                               {
+                                       ereport(COMMERROR,
+                                                       (errcode(ERRCODE_PROTOCOL_VIOLATION),
+                                                        errmsg("unexpected EOF on standby connection")));
+                                       proc_exit(0);
+                               }
+
+                               streamingDoneReceiving = true;
+                               received = true;
+                               break;
+
                                /*
                                 * 'X' means that the standby is closing down the socket.
                                 */
@@ -666,7 +900,10 @@ WalSndLoop(void)
        last_reply_timestamp = GetCurrentTimestamp();
        ping_sent = false;
 
-       /* Loop forever, unless we get an error */
+       /*
+        * Loop until we reach the end of this timeline or the client requests
+        * to stop streaming.
+        */
        for (;;)
        {
                /* Clear any already-pending wakeups */
@@ -692,6 +929,14 @@ WalSndLoop(void)
                /* Check for input from the client */
                ProcessRepliesIfAny();
 
+               /*
+                * If we have received CopyDone from the client, sent CopyDone
+                * ourselves, and the output buffer is empty, it's time to exit
+                * streaming.
+                */
+               if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
+                       break;
+
                /*
                 * If we don't have any pending data in the output buffer, try to send
                 * some more.  If there is some, we don't bother to call XLogSend
@@ -705,7 +950,7 @@ WalSndLoop(void)
 
                /* Try to flush pending output to the client */
                if (pq_flush_if_writable() != 0)
-                       break;
+                       goto send_failure;
 
                /* If nothing remains to be sent right now ... */
                if (caughtup && !pq_is_send_pending())
@@ -739,7 +984,7 @@ WalSndLoop(void)
                                if (caughtup && !pq_is_send_pending())
                                {
                                        /* Inform the standby that XLOG streaming is done */
-                                       pq_puttextmessage('C', "COPY 0");
+                                       EndCommand("COPY 0", DestRemote);
                                        pq_flush();
 
                                        proc_exit(0);
@@ -754,14 +999,16 @@ WalSndLoop(void)
                 * loaded a subset of the available data but then pq_flush_if_writable
                 * flushed it all --- we should immediately try to send more.
                 */
-               if (caughtup || pq_is_send_pending())
+               if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
                {
                        TimestampTz timeout = 0;
                        long            sleeptime = 10000;              /* 10 s */
                        int                     wakeEvents;
 
-                       wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
-                               WL_SOCKET_READABLE | WL_TIMEOUT;
+                       wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;
+
+                       if (!streamingDoneReceiving)
+                               wakeEvents |= WL_SOCKET_READABLE;
 
                        if (pq_is_send_pending())
                                wakeEvents |= WL_SOCKET_WRITEABLE;
@@ -813,11 +1060,13 @@ WalSndLoop(void)
                                 */
                                ereport(COMMERROR,
                                                (errmsg("terminating walsender process due to replication timeout")));
-                               break;
+                               goto send_failure;
                        }
                }
        }
+       return;
 
+send_failure:
        /*
         * Get here on send failure.  Clean up and exit.
         *
@@ -916,7 +1165,7 @@ WalSndKill(int code, Datum arg)
  * more than one.
  */
 void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
 {
        char       *p;
        XLogRecPtr      recptr;
@@ -937,7 +1186,7 @@ retry:
 
                startoff = recptr % XLogSegSize;
 
-               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+               if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || sendTimeLine != tli)
                {
                        char            path[MAXPGPATH];
 
@@ -945,8 +1194,9 @@ retry:
                        if (sendFile >= 0)
                                close(sendFile);
 
+                       sendTimeLine = tli;
                        XLByteToSeg(recptr, sendSegNo);
-                       XLogFilePath(path, ThisTimeLineID, sendSegNo);
+                       XLogFilePath(path, sendTimeLine, sendSegNo);
 
                        sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
                        if (sendFile < 0)
@@ -960,7 +1210,7 @@ retry:
                                        ereport(ERROR,
                                                        (errcode_for_file_access(),
                                                         errmsg("requested WAL segment %s has already been removed",
-                                                                       XLogFileNameP(ThisTimeLineID, sendSegNo))));
+                                                                       XLogFileNameP(sendTimeLine, sendSegNo))));
                                else
                                        ereport(ERROR,
                                                        (errcode_for_file_access(),
@@ -977,7 +1227,7 @@ retry:
                                ereport(ERROR,
                                                (errcode_for_file_access(),
                                                 errmsg("could not seek in log segment %s to offset %u: %m",
-                                                               XLogFileNameP(ThisTimeLineID, sendSegNo),
+                                                               XLogFileNameP(sendTimeLine, sendSegNo),
                                                                startoff)));
                        sendOff = startoff;
                }
@@ -994,7 +1244,7 @@ retry:
                        ereport(ERROR,
                                        (errcode_for_file_access(),
                        errmsg("could not read from log segment %s, offset %u, length %lu: %m",
-                                  XLogFileNameP(ThisTimeLineID, sendSegNo),
+                                  XLogFileNameP(sendTimeLine, sendSegNo),
                                   sendOff, (unsigned long) segbytes)));
                }
 
@@ -1019,7 +1269,7 @@ retry:
                ereport(ERROR,
                                (errcode_for_file_access(),
                                 errmsg("requested WAL segment %s has already been removed",
-                                               XLogFileNameP(ThisTimeLineID, segno))));
+                                               XLogFileNameP(sendTimeLine, segno))));
 
        /*
         * During recovery, the currently-open WAL file might be replaced with the
@@ -1060,10 +1310,17 @@ static void
 XLogSend(bool *caughtup)
 {
        XLogRecPtr      SendRqstPtr;
+       XLogRecPtr      FlushPtr;
        XLogRecPtr      startptr;
        XLogRecPtr      endptr;
        Size            nbytes;
 
+       if (streamingDoneSending)
+       {
+               *caughtup = true;
+               return;
+       }
+
        /*
         * Attempt to send all data that's already been written out and fsync'd to
         * disk.  We cannot go further than what's been written out given the
@@ -1073,32 +1330,103 @@ XLogSend(bool *caughtup)
         * that gets lost on the master.
         */
        if (am_cascading_walsender)
+               FlushPtr = GetStandbyFlushRecPtr();
+       else
+               FlushPtr = GetFlushRecPtr();
+
+       /*
+        * In a cascading standby, the current recovery target timeline can
+        * change, or we can be promoted. In either case, the current timeline
+        * becomes historic. We need to detect that so that we don't try to stream
+        * past the point where we switched to another timeline. It's checked
+        * after calculating FlushPtr, to avoid a race condition: if the timeline
+        * becomes historic just after we checked that it was still current, it
+        * should still be OK to stream it up to the FlushPtr that was calculated
+        * before it became historic.
+        */
+       if (!sendTimeLineIsHistoric && am_cascading_walsender)
        {
-               TimeLineID      currentTargetTLI;
-               SendRqstPtr = GetStandbyFlushRecPtr(&currentTargetTLI);
+               bool            becameHistoric = false;
+               TimeLineID      targetTLI;
 
-               /*
-                * If the recovery target timeline changed, bail out. It's a bit
-                * unfortunate that we have to just disconnect, but there is no way
-                * to tell the client that the timeline changed. We also don't know
-                * exactly where the switch happened, so we cannot safely try to send
-                * up to the switchover point before disconnecting.
-                */
-               if (currentTargetTLI != ThisTimeLineID)
+               if (!RecoveryInProgress())
                {
-                       if (!walsender_ready_to_stop)
-                               ereport(LOG,
-                                               (errmsg("terminating walsender process to force cascaded standby "
-                                                               "to update timeline and reconnect")));
-                       walsender_ready_to_stop = true;
-                       *caughtup = true;
-                       return;
+                       /*
+                        * We have been promoted. RecoveryInProgress() updated
+                        * ThisTimeLineID to the new current timeline.
+                        */
+                       targetTLI = ThisTimeLineID;
+                       am_cascading_walsender = false;
+                       becameHistoric = true;
+               }
+               else
+               {
+                       /*
+                        * Still a cascading standby. But is the timeline we're sending
+                        * still the recovery target timeline?
+                        */
+                       targetTLI = GetRecoveryTargetTLI();
+
+                       if (targetTLI != sendTimeLine)
+                               becameHistoric = true;
+               }
+
+               if (becameHistoric)
+               {
+                       /*
+                        * The timeline we were sending has become historic. Read the
+                        * timeline history file of the new timeline to see where exactly
+                        * we forked off from the timeline we were sending.
+                        */
+                       List       *history;
+
+                       history = readTimeLineHistory(targetTLI);
+                       sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history);
+                       Assert(XLByteLE(sentPtr, sendTimeLineValidUpto));
+                       list_free_deep(history);
+
+                       /* the switchpoint should be >= current send pointer */
+                       if (!XLByteLE(sentPtr, sendTimeLineValidUpto))
+                               elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
+                                        sendTimeLine,
+                                        (uint32) (sendTimeLineValidUpto >> 32),
+                                        (uint32) sendTimeLineValidUpto,
+                                        (uint32) (sentPtr >> 32),
+                                        (uint32) sentPtr);
+
+                       sendTimeLineIsHistoric = true;
                }
        }
+
+       /*
+        * If this is a historic timeline and we've reached the point where we
+        * forked to the next timeline, stop streaming.
+        */
+       if (sendTimeLineIsHistoric && XLByteLE(sendTimeLineValidUpto, sentPtr))
+       {
+               /* close the current file. */
+               if (sendFile >= 0)
+                       close(sendFile);
+               sendFile = -1;
+
+               /* Send CopyDone */
+               pq_putmessage_noblock('c', NULL, 0);
+               streamingDoneSending = true;
+
+               *caughtup = true;
+               return;
+       }
+
+       /*
+        * Stream up to the point known to be flushed to disk, or to the end of
+        * this timeline, whichever comes first.
+        */
+       if (sendTimeLineIsHistoric && XLByteLT(sendTimeLineValidUpto, FlushPtr))
+               SendRqstPtr = sendTimeLineValidUpto;
        else
-               SendRqstPtr = GetFlushRecPtr();
+               SendRqstPtr = FlushPtr;
 
-       /* Quick exit if nothing to do */
+       Assert(XLByteLE(sentPtr, SendRqstPtr));
        if (XLByteLE(SendRqstPtr, sentPtr))
        {
                *caughtup = true;
@@ -1124,7 +1452,10 @@ XLogSend(bool *caughtup)
        if (XLByteLE(SendRqstPtr, endptr))
        {
                endptr = SendRqstPtr;
-               *caughtup = true;
+               if (sendTimeLineIsHistoric)
+                       *caughtup = false;
+               else
+                       *caughtup = true;
        }
        else
        {
@@ -1151,7 +1482,7 @@ XLogSend(bool *caughtup)
         * calls.
         */
        enlargeStringInfo(&output_message, nbytes);
-       XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+       XLogRead(&output_message.data[output_message.len], sendTimeLine, startptr, nbytes);
        output_message.len += nbytes;
        output_message.data[output_message.len] = '\0';
 
@@ -1242,6 +1573,14 @@ WalSndLastCycleHandler(SIGNAL_ARGS)
 {
        int                     save_errno = errno;
 
+       /*
+        * If replication has not yet started, die like with SIGTERM. If
+        * replication is active, only set a flag and wake up the main loop. It
+        * will send any outstanding WAL, and then exit gracefully.
+        */
+       if (!replication_active)
+               kill(MyProcPid, SIGTERM);
+
        walsender_ready_to_stop = true;
        if (MyWalSnd)
                SetLatch(&MyWalSnd->latch);
index 785195bd36a9caf740a77ed55f3de60869042ac8..08b75f6d79dcada28ca19d0f952fd19d9922b02e 100644 (file)
@@ -34,6 +34,7 @@ extern bool existsTimeLineHistory(TimeLineID probeTLI);
 extern TimeLineID findNewestTimeLine(TimeLineID startTLI);
 extern void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
                                         XLogRecPtr switchpoint, char *reason);
+extern void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size);
 extern bool tliInHistory(TimeLineID tli, List *expectedTLIs);
 extern TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history);
 extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history);
index 32c2e40ac14fe249ce662b6262d4391351ec1d09..c8cd37981c58785bcb3326034fd533051124d820 100644 (file)
@@ -283,8 +283,8 @@ extern bool RecoveryInProgress(void);
 extern bool HotStandbyActive(void);
 extern bool XLogInsertAllowed(void);
 extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
-extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *targetTLI);
-extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI);
+extern XLogRecPtr GetXLogReplayRecPtr(void);
+extern XLogRecPtr GetStandbyFlushRecPtr(void);
 extern XLogRecPtr GetXLogInsertRecPtr(void);
 extern XLogRecPtr GetXLogWriteRecPtr(void);
 extern bool RecoveryIsPaused(void);
index 438a1d98630e97bcf0d9f02fdc78e922e02b5a10..5529a8b811cc5e5308e3ae72d4f16662d9ee5192 100644 (file)
@@ -407,6 +407,7 @@ typedef enum NodeTag
        T_IdentifySystemCmd,
        T_BaseBackupCmd,
        T_StartReplicationCmd,
+       T_TimeLineHistoryCmd,
 
        /*
         * TAGS FOR RANDOM OTHER STUFF
index 236a36dd98045b1a7633b7e81451b25a43db78bd..8d686879a9aeaea94fc0cc88677a69b451430e73 100644 (file)
@@ -46,7 +46,19 @@ typedef struct BaseBackupCmd
 typedef struct StartReplicationCmd
 {
        NodeTag         type;
+       TimeLineID      timeline;
        XLogRecPtr      startpoint;
 } StartReplicationCmd;
 
+
+/* ----------------------
+ *             TIMELINE_HISTORY command
+ * ----------------------
+ */
+typedef struct TimeLineHistoryCmd
+{
+       NodeTag         type;
+       TimeLineID      timeline;
+} TimeLineHistoryCmd;
+
 #endif   /* REPLNODES_H */
index 4cf5a27e6d9cbe9728c83df50a8039a70b76db6c..e62a1db63fe8623d6891877687714ccf246c9c4b 100644 (file)
@@ -14,6 +14,7 @@
 
 #include "access/xlog.h"
 #include "access/xlogdefs.h"
+#include "storage/latch.h"
 #include "storage/spin.h"
 #include "pgtime.h"
 
@@ -40,7 +41,9 @@ typedef enum
        WALRCV_STOPPED,                         /* stopped and mustn't start up again */
        WALRCV_STARTING,                        /* launched, but the process hasn't
                                                                 * initialized yet */
-       WALRCV_RUNNING,                         /* walreceiver is running */
+       WALRCV_STREAMING,                       /* walreceiver is streaming */
+       WALRCV_WAITING,                         /* stopped streaming, waiting for orders */
+       WALRCV_RESTARTING,                      /* asked to restart streaming */
        WALRCV_STOPPING                         /* requested to stop, but still running */
 } WalRcvState;
 
@@ -57,19 +60,23 @@ typedef struct
        pg_time_t       startTime;
 
        /*
-        * receiveStart is the first byte position that will be received. When
-        * startup process starts the walreceiver, it sets receiveStart to the
-        * point where it wants the streaming to begin.
+        * receiveStart and receiveStartTLI indicate the first byte position
+        * and timeline that will be received. When startup process starts the
+        * walreceiver, it sets these to the point where it wants the streaming
+        * to begin.
         */
        XLogRecPtr      receiveStart;
+       TimeLineID      receiveStartTLI;
 
        /*
         * receivedUpto-1 is the last byte position that has already been
-        * received.  At the first startup of walreceiver, receivedUpto is set to
-        * receiveStart. After that, walreceiver updates this whenever it flushes
-        * the received WAL to disk.
+        * received, and receivedTLI is the timeline it came from.  At the first
+        * startup of walreceiver, these are set to receiveStart and
+        * receiveStartTLI. After that, walreceiver updates these whenever it
+        * flushes the received WAL to disk.
         */
        XLogRecPtr      receivedUpto;
+       TimeLineID      receivedTLI;
 
        /*
         * latestChunkStart is the starting byte position of the current "batch"
@@ -97,16 +104,34 @@ typedef struct
        char            conninfo[MAXCONNINFO];
 
        slock_t         mutex;                  /* locks shared variables shown above */
+
+       /*
+        * Latch used by startup process to wake up walreceiver after telling it
+        * where to start streaming (after setting receiveStart and
+        * receiveStartTLI).
+        */
+       Latch           latch;
 } WalRcvData;
 
 extern WalRcvData *WalRcv;
 
 /* libpqwalreceiver hooks */
-typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
+typedef void (*walrcv_connect_type) (char *conninfo);
 extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
 
-typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
-                                                                                                char **buffer, int *len);
+typedef void (*walrcv_identify_system_type) (TimeLineID *primary_tli);
+extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system;
+
+typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);
+extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;
+
+typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint);
+extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
+
+typedef void (*walrcv_endstreaming_type) (void);
+extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
+
+typedef int (*walrcv_receive_type) (int timeout, char **buffer);
 extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
 
 typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
@@ -122,9 +147,10 @@ extern void WalReceiverMain(void) __attribute__((noreturn));
 extern Size WalRcvShmemSize(void);
 extern void WalRcvShmemInit(void);
 extern void ShutdownWalRcv(void);
-extern bool WalRcvInProgress(void);
-extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
-extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern bool WalRcvStreaming(void);
+extern bool WalRcvRunning(void);
+extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo);
+extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
 extern int     GetReplicationApplyDelay(void);
 extern int     GetReplicationTransferLatency(void);
 
index df8e9514780b3f85a21af31541d66eabc59ec781..eabafab33b8219fa2e363c2a04d1d789f4c23b71 100644 (file)
@@ -19,7 +19,6 @@
 /* global state */
 extern bool am_walsender;
 extern bool am_cascading_walsender;
-extern volatile sig_atomic_t walsender_ready_to_stop;
 extern bool wake_wal_senders;
 
 /* user-settable parameters */
index 66234cd8b5925d81b535522983fdc75da1d40c75..5d849d4b0f5dcb50f36dc1254385d5ce09162fe7 100644 (file)
@@ -95,7 +95,7 @@ extern WalSndCtlData *WalSndCtl;
 
 
 extern void WalSndSetState(WalSndState state);
-extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+extern void XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count);
 
 /*
  * Internal functions for parsing the replication grammar, in repl_gram.y and
index 77124efe779656f49f8bb028378d351bfcb0e8a7..4bc5e647153ba55431456cd314b1fc9baa7ddfe6 100644 (file)
@@ -2245,7 +2245,8 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
 {
        if (!conn)
                return -1;
-       if (conn->asyncStatus != PGASYNC_COPY_IN)
+       if (conn->asyncStatus != PGASYNC_COPY_IN &&
+               conn->asyncStatus != PGASYNC_COPY_BOTH)
        {
                printfPQExpBuffer(&conn->errorMessage,
                                                  libpq_gettext("no COPY in progress\n"));
@@ -2305,7 +2306,10 @@ PQputCopyEnd(PGconn *conn, const char *errormsg)
        }
 
        /* Return to active duty */
-       conn->asyncStatus = PGASYNC_BUSY;
+       if (conn->asyncStatus == PGASYNC_COPY_BOTH)
+               conn->asyncStatus = PGASYNC_COPY_OUT;
+       else
+               conn->asyncStatus = PGASYNC_BUSY;
        resetPQExpBuffer(&conn->errorMessage);
 
        /* Try to flush data */
index c605bcd734c9fd76d15c893db6d66f727bb6d876..e8f3f337b1a2c6e4be1f2afecbadda383c105d71 100644 (file)
@@ -1484,7 +1484,12 @@ pqGetCopyData3(PGconn *conn, char **buffer, int async)
                         * expect the state was already changed.
                         */
                        if (msgLength == -1)
-                               conn->asyncStatus = PGASYNC_BUSY;
+                       {
+                               if (conn->asyncStatus == PGASYNC_COPY_BOTH)
+                                       conn->asyncStatus = PGASYNC_COPY_IN;
+                               else
+                                       conn->asyncStatus = PGASYNC_BUSY;
+                       }
                        return msgLength;       /* end-of-copy or error */
                }
                if (msgLength == 0)