</para>
<para>
- Promoting a cascading standby terminates the immediate downstream replication
- connections which it serves. This is because the timeline becomes different
- between standbys, and they can no longer continue replication. The
- affected standby(s) may reconnect to reestablish streaming replication.
+ If an upstream standby server is promoted to become new master, downstream
+ servers will continue to stream from the new master if
+ <varname>recovery_target_timeline</> is set to <literal>'latest'</>.
</para>
<para>
</para>
<para>
- There is another Copy-related mode called Copy-both, which allows
+ There is another Copy-related mode called copy-both, which allows
high-speed bulk data transfer to <emphasis>and</> from the server.
Copy-both mode is initiated when a backend in walsender mode
executes a <command>START_REPLICATION</command> statement. The
backend sends a CopyBothResponse message to the frontend. Both
the backend and the frontend may then send CopyData messages
- until the connection is terminated. See <xref
- linkend="protocol-replication">.
+ until either end sends a CopyDone message. After the client
+ sends a CopyDone message, the connection goes from copy-both mode to
+ copy-out mode, and the client may not send any more CopyData messages.
+ Similarly, when the server sends a CopyDone message, the connection
+ goes into copy-in mode, and the server may not send any more CopyData
+ messages. After both sides have sent a CopyDone message, the copy mode
+ is terminated, and the backend reverts to the command-processing mode.
+ See <xref linkend="protocol-replication"> for more information on the
+ subprotocol transmitted over copy-both mode.
</para>
<para>
</varlistentry>
<varlistentry>
- <term>START_REPLICATION <replaceable>XXX</>/<replaceable>XXX</></term>
+ <term>TIMELINE_HISTORY <replaceable class="parameter">tli</replaceable></term>
+ <listitem>
+ <para>
+ Requests the server to send over the timeline history file for timeline
+ <replaceable class="parameter">tli</replaceable>. Server replies with a
+ result set of a single row, containing two fields:
+ </para>
+
+ <para>
+ <variablelist>
+ <varlistentry>
+ <term>
+ filename
+ </term>
+ <listitem>
+ <para>
+ Filename of the timeline history file, e.g 00000002.history.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>
+ content
+ </term>
+ <listitem>
+ <para>
+ Contents of the timeline history file.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </para>
+ </listitem>
+ </varlistentry>
+
+ <varlistentry>
+ <term>START_REPLICATION <replaceable class="parameter">XXX/XXX</> TIMELINE <replaceable class="parameter">tli</></term>
<listitem>
<para>
Instructs server to start streaming WAL, starting at
- WAL position <replaceable>XXX</>/<replaceable>XXX</>.
+ WAL position <replaceable class="parameter">XXX/XXX</> on timeline
+ <replaceable class="parameter">tli</>.
The server can reply with an error, e.g. if the requested section of WAL
has already been recycled. On success, server responds with a
CopyBothResponse message, and then starts to stream WAL to the frontend.
- WAL will continue to be streamed until the connection is broken;
- no further commands will be accepted. If the WAL sender process is
- terminated normally (during postmaster shutdown), it will send a
- CommandComplete message before exiting. This might not happen during an
- abnormal shutdown, of course.
+ </para>
+
+ <para>
+ If the client requests a timeline that's not the latest, but is part of
+ the history of the server, the server will stream all the WAL on that
+ timeline starting from the requested startpoint, up to the point where
+ the server switched to another timeline. If the client requests
+ streaming at exactly the end of an old timeline, the server responds
+ immediately with CommandComplete without entering COPY mode.
+ </para>
+
+ <para>
+ After streaming all the WAL on a timeline that is not the latest one,
+ the server will end streaming by exiting the COPY mode. When the client
+ acknowledges this by also exiting COPY mode, the server responds with a
+ CommandComplete message, and is ready to accept a new command.
</para>
<para>
XLogArchiveNotify(histfname);
}
+/*
+ * Writes a history file for given timeline and contents.
+ *
+ * Currently this is only used in the walreceiver process, and so there are
+ * no locking considerations. But we should be just as tense as XLogFileInit
+ * to avoid emplacing a bogus file.
+ */
+void
+writeTimeLineHistoryFile(TimeLineID tli, char *content, int size)
+{
+ char path[MAXPGPATH];
+ char tmppath[MAXPGPATH];
+ int fd;
+
+ /*
+ * Write into a temp file name.
+ */
+ snprintf(tmppath, MAXPGPATH, XLOGDIR "/xlogtemp.%d", (int) getpid());
+
+ unlink(tmppath);
+
+ /* do not use get_sync_bit() here --- want to fsync only at end of fill */
+ fd = OpenTransientFile(tmppath, O_RDWR | O_CREAT | O_EXCL,
+ S_IRUSR | S_IWUSR);
+ if (fd < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not create file \"%s\": %m", tmppath)));
+
+ errno = 0;
+ if ((int) write(fd, content, size) != size)
+ {
+ int save_errno = errno;
+
+ /*
+ * If we fail to make the file, delete it to release disk space
+ */
+ unlink(tmppath);
+ /* if write didn't set errno, assume problem is no disk space */
+ errno = save_errno ? save_errno : ENOSPC;
+
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not write to file \"%s\": %m", tmppath)));
+ }
+
+ if (pg_fsync(fd) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not fsync file \"%s\": %m", tmppath)));
+
+ if (CloseTransientFile(fd))
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not close file \"%s\": %m", tmppath)));
+
+
+ /*
+ * Now move the completed history file into place with its final name.
+ */
+ TLHistoryFilePath(path, tli);
+
+ /*
+ * Prefer link() to rename() here just to be really sure that we don't
+ * overwrite an existing logfile. However, there shouldn't be one, so
+ * rename() is an acceptable substitute except for the truly paranoid.
+ */
+#if HAVE_WORKING_LINK
+ if (link(tmppath, path) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not link file \"%s\" to \"%s\": %m",
+ tmppath, path)));
+ unlink(tmppath);
+#else
+ if (rename(tmppath, path) < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not rename file \"%s\" to \"%s\": %m",
+ tmppath, path)));
+#endif
+}
+
/*
* Returns true if 'expectedTLEs' contains a timeline with id 'tli'
*/
/* Local copy of WalRcv->receivedUpto */
static XLogRecPtr receivedUpto = 0;
+static TimeLineID receiveTLI = 0;
/*
* During recovery, lastFullPageWrites keeps track of full_page_writes that
xlogctl->SharedRecoveryInProgress = false;
SpinLockRelease(&xlogctl->info_lck);
}
+
+ /*
+ * If there were cascading standby servers connected to us, nudge any
+ * wal sender processes to notice that we've been promoted.
+ */
+ WalSndWakeup();
}
/*
XLogRecPtr endptr;
/* Get the current (or recent) end of xlog */
- endptr = GetStandbyFlushRecPtr(NULL);
+ endptr = GetStandbyFlushRecPtr();
KeepLogSeg(endptr, &_logSegNo);
_logSegNo--;
/*
* Get latest redo apply position.
*
- * Optionally, returns the current recovery target timeline. Callers not
- * interested in that may pass NULL for targetTLI.
- *
* Exported to allow WALReceiver to read the pointer directly.
*/
XLogRecPtr
-GetXLogReplayRecPtr(TimeLineID *targetTLI)
+GetXLogReplayRecPtr(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile XLogCtlData *xlogctl = XLogCtl;
SpinLockAcquire(&xlogctl->info_lck);
recptr = xlogctl->lastReplayedEndRecPtr;
- if (targetTLI)
- *targetTLI = xlogctl->RecoveryTargetTLI;
SpinLockRelease(&xlogctl->info_lck);
return recptr;
/*
* Get current standby flush position, ie, the last WAL position
* known to be fsync'd to disk in standby.
- *
- * If 'targetTLI' is not NULL, it's set to the current recovery target
- * timeline.
*/
XLogRecPtr
-GetStandbyFlushRecPtr(TimeLineID *targetTLI)
+GetStandbyFlushRecPtr(void)
{
XLogRecPtr receivePtr;
XLogRecPtr replayPtr;
- receivePtr = GetWalRcvWriteRecPtr(NULL);
- replayPtr = GetXLogReplayRecPtr(targetTLI);
+ receivePtr = GetWalRcvWriteRecPtr(NULL, NULL);
+ replayPtr = GetXLogReplayRecPtr();
if (XLByteLT(receivePtr, replayPtr))
return replayPtr;
* archive and pg_xlog before failover.
*/
if (CheckForStandbyTrigger())
+ {
+ ShutdownWalRcv();
return false;
+ }
/*
* If primary_conninfo is set, launch walreceiver to try to
if (PrimaryConnInfo)
{
XLogRecPtr ptr = fetching_ckpt ? RedoStartLSN : RecPtr;
-
- RequestXLogStreaming(ptr, PrimaryConnInfo);
+ TimeLineID tli = tliOfPointInHistory(ptr, expectedTLEs);
+
+ if (curFileTLI > 0 && tli < curFileTLI)
+ elog(ERROR, "according to history file, WAL location %X/%X belongs to timeline %u, but previous recovered WAL file came from timeline %u",
+ (uint32) (ptr >> 32), (uint32) ptr,
+ tli, curFileTLI);
+ curFileTLI = tli;
+ RequestXLogStreaming(curFileTLI, ptr, PrimaryConnInfo);
}
/*
* Move to XLOG_FROM_STREAM state in either case. We'll get
*/
/*
* Before we leave XLOG_FROM_STREAM state, make sure that
- * walreceiver is not running, so that it won't overwrite
- * any WAL that we restore from archive.
+ * walreceiver is not active, so that it won't overwrite
+ * WAL that we restore from archive.
*/
- if (WalRcvInProgress())
+ if (WalRcvStreaming())
ShutdownWalRcv();
/*
/*
* Check if WAL receiver is still active.
*/
- if (!WalRcvInProgress())
+ if (!WalRcvStreaming())
{
lastSourceFailed = true;
break;
{
XLogRecPtr latestChunkStart;
- receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart);
- if (XLByteLT(RecPtr, receivedUpto))
+ receivedUpto = GetWalRcvWriteRecPtr(&latestChunkStart, &receiveTLI);
+ if (XLByteLT(RecPtr, receivedUpto) && receiveTLI == curFileTLI)
{
havedata = true;
if (!XLByteLT(RecPtr, latestChunkStart))
/*
* Check to see whether the user-specified trigger file exists and whether a
- * promote request has arrived. If either condition holds, request postmaster
- * to shut down walreceiver, wait for it to exit, and return true.
+ * promote request has arrived. If either condition holds, return true.
*/
static bool
CheckForStandbyTrigger(void)
{
ereport(LOG,
(errmsg("received promote request")));
- ShutdownWalRcv();
ResetPromoteTriggered();
triggered = true;
return true;
{
ereport(LOG,
(errmsg("trigger file found: %s", TriggerFile)));
- ShutdownWalRcv();
unlink(TriggerFile);
triggered = true;
return true;
XLogRecPtr recptr;
char location[MAXFNAMELEN];
- recptr = GetWalRcvWriteRecPtr(NULL);
+ recptr = GetWalRcvWriteRecPtr(NULL, NULL);
if (recptr == 0)
PG_RETURN_NULL();
XLogRecPtr recptr;
char location[MAXFNAMELEN];
- recptr = GetXLogReplayRecPtr(NULL);
+ recptr = GetXLogReplayRecPtr();
if (recptr == 0)
PG_RETURN_NULL();
ReachedNormalRunning = true;
pmState = PM_RUN;
- /*
- * Kill any walsenders to force the downstream standby(s) to
- * reread the timeline history file, adjust their timelines and
- * establish replication connections again. This is required
- * because the timeline of cascading standby is not consistent
- * with that of cascaded one just after failover. We LOG this
- * message since we need to leave a record to explain this
- * disconnection.
- *
- * XXX should avoid the need for disconnection. When we do,
- * am_cascading_walsender should be replaced with
- * RecoveryInProgress()
- */
- if (max_wal_senders > 0 && CountChildren(BACKEND_TYPE_WALSND) > 0)
- {
- ereport(LOG,
- (errmsg("terminating all walsender processes to force cascaded "
- "standby(s) to update timeline and reconnect")));
- SignalSomeChildren(SIGUSR2, BACKEND_TYPE_WALSND);
- }
-
/*
* Crank up the background tasks, if we didn't do that already
* when we entered consistent recovery state. It doesn't matter
* The Startup process initialises the server and performs any recovery
* actions that have been specified. Notice that there is no "main loop"
* since the Startup process ends as soon as initialisation is complete.
+ * (in standby mode, one can think of the replay loop as a main loop,
+ * though.)
*
*
* Portions Copyright (c) 1996-2012, PostgreSQL Global Development Group
static void parse_basebackup_options(List *options, basebackup_options *opt);
static void SendXlogRecPtrResult(XLogRecPtr ptr);
+/* Was the backup currently in-progress initiated in recovery mode? */
+static bool backup_started_in_recovery = false;
+
/*
* Size of each block sent into the tar stream for larger files.
*
XLogRecPtr endptr;
char *labelfile;
+ backup_started_in_recovery = RecoveryInProgress();
+
startptr = do_pg_start_backup(opt->label, opt->fastcheckpoint, &labelfile);
SendXlogRecPtrResult(startptr);
* http://lists.apple.com/archives/xcode-users/2003/Dec//msg000
* 51.html
*/
- XLogRead(buf, ptr, TAR_SEND_SIZE);
+ XLogRead(buf, ThisTimeLineID, ptr, TAR_SEND_SIZE);
if (pq_putmessage('d', buf, TAR_SEND_SIZE))
ereport(ERROR,
(errmsg("base backup could not send data, aborting backup")));
/*
* Check if the postmaster has signaled us to exit, and abort with an
* error in that case. The error handler further up will call
- * do_pg_abort_backup() for us.
+ * do_pg_abort_backup() for us. Also check that if the backup was
+ * started while still in recovery, the server wasn't promoted.
+ * dp_pg_stop_backup() will check that too, but it's better to stop
+ * the backup early than continue to the end and fail there.
*/
- if (ProcDiePending || walsender_ready_to_stop)
+ CHECK_FOR_INTERRUPTS();
+ if (RecoveryInProgress() != backup_started_in_recovery)
ereport(ERROR,
- (errmsg("shutdown requested, aborting active base backup")));
+ (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+ errmsg("the standby was promoted during online backup"),
+ errhint("This means that the backup being taken is corrupt "
+ "and should not be used. "
+ "Try taking another online backup.")));
snprintf(pathbuf, MAXPGPATH, "%s/%s", path, de->d_name);
static char *recvBuf = NULL;
/* Prototypes for interface functions */
-static bool libpqrcv_connect(char *conninfo, XLogRecPtr startpoint);
-static bool libpqrcv_receive(int timeout, unsigned char *type,
- char **buffer, int *len);
+static void libpqrcv_connect(char *conninfo);
+static void libpqrcv_identify_system(TimeLineID *primary_tli);
+static void libpqrcv_readtimelinehistoryfile(TimeLineID tli, char **filename, char **content, int *len);
+static bool libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint);
+static void libpqrcv_endstreaming(void);
+static int libpqrcv_receive(int timeout, char **buffer);
static void libpqrcv_send(const char *buffer, int nbytes);
static void libpqrcv_disconnect(void);
_PG_init(void)
{
/* Tell walreceiver how to reach us */
- if (walrcv_connect != NULL || walrcv_receive != NULL ||
- walrcv_send != NULL || walrcv_disconnect != NULL)
+ if (walrcv_connect != NULL || walrcv_identify_system != NULL ||
+ walrcv_readtimelinehistoryfile != NULL ||
+ walrcv_startstreaming != NULL || walrcv_endstreaming != NULL ||
+ walrcv_receive != NULL || walrcv_send != NULL ||
+ walrcv_disconnect != NULL)
elog(ERROR, "libpqwalreceiver already loaded");
walrcv_connect = libpqrcv_connect;
+ walrcv_identify_system = libpqrcv_identify_system;
+ walrcv_readtimelinehistoryfile = libpqrcv_readtimelinehistoryfile;
+ walrcv_startstreaming = libpqrcv_startstreaming;
+ walrcv_endstreaming = libpqrcv_endstreaming;
walrcv_receive = libpqrcv_receive;
walrcv_send = libpqrcv_send;
walrcv_disconnect = libpqrcv_disconnect;
/*
* Establish the connection to the primary server for XLOG streaming
*/
-static bool
-libpqrcv_connect(char *conninfo, XLogRecPtr startpoint)
+static void
+libpqrcv_connect(char *conninfo)
{
char conninfo_repl[MAXCONNINFO + 75];
- char *primary_sysid;
- char standby_sysid[32];
- TimeLineID primary_tli;
- TimeLineID standby_tli;
- PGresult *res;
- char cmd[64];
/*
* Connect using deliberately undocumented parameter: replication. The
ereport(ERROR,
(errmsg("could not connect to the primary server: %s",
PQerrorMessage(streamConn))));
+}
+
+/*
+ * Check that primary's system identifier matches ours, and fetch the current
+ * timeline ID of the primary.
+ */
+static void
+libpqrcv_identify_system(TimeLineID *primary_tli)
+{
+ PGresult *res;
+ char *primary_sysid;
+ char standby_sysid[32];
/*
* Get the system identifier and timeline ID as a DataRow message from the
ntuples, nfields)));
}
primary_sysid = PQgetvalue(res, 0, 0);
- primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
+ *primary_tli = pg_atoi(PQgetvalue(res, 0, 1), 4, 0);
/*
* Confirm that the system identifier of the primary is the same as ours.
errdetail("The primary's identifier is %s, the standby's identifier is %s.",
primary_sysid, standby_sysid)));
}
-
- /*
- * Confirm that the current timeline of the primary is the same as the
- * recovery target timeline.
- */
- standby_tli = GetRecoveryTargetTLI();
PQclear(res);
- if (primary_tli != standby_tli)
- ereport(ERROR,
- (errmsg("timeline %u of the primary does not match recovery target timeline %u",
- primary_tli, standby_tli)));
- ThisTimeLineID = primary_tli;
+}
+
+/*
+ * Start streaming WAL data from given startpoint and timeline.
+ *
+ * Returns true if we switched successfully to copy-both mode. False
+ * means the server received the command and executed it successfully, but
+ * didn't switch to copy-mode. That means that there was no WAL on the
+ * requested timeline and starting point, because the server switched to
+ * another timeline at or before the requested starting point. On failure,
+ * throws an ERROR.
+ */
+static bool
+libpqrcv_startstreaming(TimeLineID tli, XLogRecPtr startpoint)
+{
+ char cmd[64];
+ PGresult *res;
/* Start streaming from the point requested by startup process */
- snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X",
- (uint32) (startpoint >> 32), (uint32) startpoint);
+ snprintf(cmd, sizeof(cmd), "START_REPLICATION %X/%X TIMELINE %u",
+ (uint32) (startpoint >> 32), (uint32) startpoint,
+ tli);
res = libpqrcv_PQexec(cmd);
- if (PQresultStatus(res) != PGRES_COPY_BOTH)
+
+ if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ {
+ PQclear(res);
+ return false;
+ }
+ else if (PQresultStatus(res) != PGRES_COPY_BOTH)
{
PQclear(res);
ereport(ERROR,
PQerrorMessage(streamConn))));
}
PQclear(res);
+ return true;
+}
+
+/*
+ * Stop streaming WAL data.
+ */
+static void
+libpqrcv_endstreaming(void)
+{
+ PGresult *res;
+
+ if (PQputCopyEnd(streamConn, NULL) <= 0 || PQflush(streamConn))
+ ereport(ERROR,
+ (errmsg("could not send end-of-streaming message to primary: %s",
+ PQerrorMessage(streamConn))));
- ereport(LOG,
- (errmsg("streaming replication successfully connected to primary")));
+ /* Read the command result after COPY is finished */
- return true;
+ while ((res = PQgetResult(streamConn)) != NULL)
+ {
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ ereport(ERROR,
+ (errmsg("error reading result of streaming command: %s",
+ PQerrorMessage(streamConn))));
+ /*
+ * If we had not yet received CopyDone from the backend, PGRES_COPY_IN
+ * is also possible. However, at the moment this function is only
+ * called after receiving CopyDone from the backend - the walreceiver
+ * never terminates replication on its own initiative.
+ */
+
+ PQclear(res);
+ }
+}
+
+/*
+ * Fetch the timeline history file for 'tli' from primary.
+ */
+static void
+libpqrcv_readtimelinehistoryfile(TimeLineID tli,
+ char **filename, char **content, int *len)
+{
+ PGresult *res;
+ char cmd[64];
+
+ /*
+ * Request the primary to send over the history file for given timeline.
+ */
+ snprintf(cmd, sizeof(cmd), "TIMELINE_HISTORY %u", tli);
+ res = libpqrcv_PQexec(cmd);
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ {
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("could not receive timeline history file from "
+ "the primary server: %s",
+ PQerrorMessage(streamConn))));
+ }
+ if (PQnfields(res) != 2 || PQntuples(res) != 1)
+ {
+ int ntuples = PQntuples(res);
+ int nfields = PQnfields(res);
+
+ PQclear(res);
+ ereport(ERROR,
+ (errmsg("invalid response from primary server"),
+ errdetail("Expected 1 tuple with 2 fields, got %d tuples with %d fields.",
+ ntuples, nfields)));
+ }
+ *filename = pstrdup(PQgetvalue(res, 0, 0));
+
+ *len = PQgetlength(res, 0, 1);
+ *content = palloc(*len);
+ memcpy(*content, PQgetvalue(res, 0, 1), *len);
+ PQclear(res);
}
/*
*
* Returns:
*
- * True if data was received. *type, *buffer and *len are set to
- * the type of the received data, buffer holding it, and length,
- * respectively.
+ * If data was received, returns the length of the data. *buffer is set to
+ * point to a buffer holding the received message. The buffer is only valid
+ * until the next libpqrcv_* call.
*
- * False if no data was available within timeout, or wait was interrupted
+ * 0 if no data was available within timeout, or wait was interrupted
* by signal.
*
- * The buffer returned is only valid until the next call of this function or
- * libpq_connect/disconnect.
+ * -1 if the server ended the COPY.
*
* ereports on error.
*/
-static bool
-libpqrcv_receive(int timeout, unsigned char *type, char **buffer, int *len)
+static int
+libpqrcv_receive(int timeout, char **buffer)
{
int rawlen;
if (timeout > 0)
{
if (!libpq_select(timeout))
- return false;
+ return 0;
}
if (PQconsumeInput(streamConn) == 0)
/* Now that we've consumed some input, try again */
rawlen = PQgetCopyData(streamConn, &recvBuf, 1);
if (rawlen == 0)
- return false;
+ return 0;
}
if (rawlen == -1) /* end-of-streaming or error */
{
PGresult *res;
res = PQgetResult(streamConn);
- if (PQresultStatus(res) == PGRES_COMMAND_OK)
+ if (PQresultStatus(res) == PGRES_COMMAND_OK ||
+ PQresultStatus(res) == PGRES_COPY_IN)
+ {
+ PQclear(res);
+ return -1;
+ }
+ else
{
PQclear(res);
ereport(ERROR,
- (errmsg("replication terminated by primary server")));
+ (errmsg("could not receive data from WAL stream: %s",
+ PQerrorMessage(streamConn))));
}
- PQclear(res);
- ereport(ERROR,
- (errmsg("could not receive data from WAL stream: %s",
- PQerrorMessage(streamConn))));
}
if (rawlen < -1)
ereport(ERROR,
PQerrorMessage(streamConn))));
/* Return received messages to caller */
- *type = *((unsigned char *) recvBuf);
- *buffer = recvBuf + sizeof(*type);
- *len = rawlen - sizeof(*type);
-
- return true;
+ *buffer = recvBuf;
+ return rawlen;
}
/*
%union {
char *str;
bool boolval;
+ int32 intval;
XLogRecPtr recptr;
Node *node;
/* Non-keyword tokens */
%token <str> SCONST
+%token <intval> ICONST
%token <recptr> RECPTR
/* Keyword tokens. */
%token K_BASE_BACKUP
%token K_IDENTIFY_SYSTEM
+%token K_START_REPLICATION
+%token K_TIMELINE_HISTORY
%token K_LABEL
%token K_PROGRESS
%token K_FAST
%token K_NOWAIT
%token K_WAL
-%token K_START_REPLICATION
+%token K_TIMELINE
%type <node> command
-%type <node> base_backup start_replication identify_system
+%type <node> base_backup start_replication identify_system timeline_history
%type <list> base_backup_opt_list
%type <defelt> base_backup_opt
+%type <intval> opt_timeline
%%
firstcmd: command opt_semicolon
identify_system
| base_backup
| start_replication
+ | timeline_history
;
/*
;
/*
- * START_REPLICATION %X/%X
+ * START_REPLICATION %X/%X [TIMELINE %d]
*/
start_replication:
- K_START_REPLICATION RECPTR
+ K_START_REPLICATION RECPTR opt_timeline
{
StartReplicationCmd *cmd;
cmd = makeNode(StartReplicationCmd);
cmd->startpoint = $2;
+ cmd->timeline = $3;
+
+ $$ = (Node *) cmd;
+ }
+ ;
+
+opt_timeline:
+ K_TIMELINE ICONST
+ {
+ if ($2 <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ (errmsg("invalid timeline %d", $2))));
+ $$ = $2;
+ }
+ | /* nothing */ { $$ = 0; }
+ ;
+
+/*
+ * TIMELINE_HISTORY %d
+ */
+timeline_history:
+ K_TIMELINE_HISTORY ICONST
+ {
+ TimeLineHistoryCmd *cmd;
+
+ if ($2 <= 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_SYNTAX_ERROR),
+ (errmsg("invalid timeline %d", $2))));
+
+ cmd = makeNode(TimeLineHistoryCmd);
+ cmd->timeline = $2;
$$ = (Node *) cmd;
}
*/
#include "postgres.h"
+#include "utils/builtins.h"
+
/* Avoid exit() on fatal scanner errors (a bit ugly -- see yy_fatal_error) */
#undef fprintf
#define fprintf(file, fmt, msg) ereport(ERROR, (errmsg_internal("%s", msg)))
xqdouble {quote}{quote}
xqinside [^']+
+digit [0-9]+
hexdigit [0-9A-Za-z]+
quote '
NOWAIT { return K_NOWAIT; }
PROGRESS { return K_PROGRESS; }
WAL { return K_WAL; }
+TIMELINE { return K_TIMELINE; }
START_REPLICATION { return K_START_REPLICATION; }
+TIMELINE_HISTORY { return K_TIMELINE_HISTORY; }
"," { return ','; }
";" { return ';'; }
[\t] ;
" " ;
+{digit}+ {
+ yylval.intval = pg_atoi(yytext, sizeof(int32), 0);
+ return ICONST;
+ }
+
{hexdigit}+\/{hexdigit}+ {
uint32 hi,
lo;
* WalRcv->receivedUpto variable in shared memory, to inform the startup
* process of how far it can proceed with XLOG replay.
*
+ * If the primary server ends streaming, but doesn't disconnect, walreceiver
+ * goes into "waiting" mode, and waits for the startup process to give new
+ * instructions. The startup process will treat that the same as
+ * disconnection, and will rescan the archive/pg_xlog directory. But when the
+ * startup process wants to try streaming replication again, it will just
+ * nudge the existing walreceiver process that's waiting, instead of launching
+ * a new one.
+ *
* Normal termination is by SIGTERM, which instructs the walreceiver to
* exit(0). Emergency termination is by SIGQUIT; like any postmaster child
* process, the walreceiver will simply abort and exit on SIGQUIT. A close
#include <signal.h>
#include <unistd.h>
+#include "access/timeline.h"
#include "access/xlog_internal.h"
#include "libpq/pqformat.h"
#include "libpq/pqsignal.h"
/* libpqreceiver hooks to these when loaded */
walrcv_connect_type walrcv_connect = NULL;
+walrcv_identify_system_type walrcv_identify_system = NULL;
+walrcv_startstreaming_type walrcv_startstreaming = NULL;
+walrcv_endstreaming_type walrcv_endstreaming = NULL;
+walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile = NULL;
walrcv_receive_type walrcv_receive = NULL;
walrcv_send_type walrcv_send = NULL;
walrcv_disconnect_type walrcv_disconnect = NULL;
static void ProcessWalRcvInterrupts(void);
static void EnableWalRcvImmediateExit(void);
static void DisableWalRcvImmediateExit(void);
+static void WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last);
+static void WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI);
static void WalRcvDie(int code, Datum arg);
static void XLogWalRcvProcessMsg(unsigned char type, char *buf, Size len);
static void XLogWalRcvWrite(char *buf, Size nbytes, XLogRecPtr recptr);
/* Signal handlers */
static void WalRcvSigHupHandler(SIGNAL_ARGS);
+static void WalRcvSigUsr1Handler(SIGNAL_ARGS);
static void WalRcvShutdownHandler(SIGNAL_ARGS);
static void WalRcvQuickDieHandler(SIGNAL_ARGS);
{
char conninfo[MAXCONNINFO];
XLogRecPtr startpoint;
+ TimeLineID startpointTLI;
+ TimeLineID primaryTLI;
+ bool first_stream;
+
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
TimestampTz last_recv_timestamp;
/* The usual case */
break;
- case WALRCV_RUNNING:
+ case WALRCV_WAITING:
+ case WALRCV_STREAMING:
+ case WALRCV_RESTARTING:
+ default:
/* Shouldn't happen */
elog(PANIC, "walreceiver still running according to shared memory state");
}
/* Advertise our PID so that the startup process can kill us */
walrcv->pid = MyProcPid;
- walrcv->walRcvState = WALRCV_RUNNING;
+ walrcv->walRcvState = WALRCV_STREAMING;
/* Fetch information required to start streaming */
strlcpy(conninfo, (char *) walrcv->conninfo, MAXCONNINFO);
startpoint = walrcv->receiveStart;
+ startpointTLI = walrcv->receiveStartTLI;
/* Initialise to a sanish value */
walrcv->lastMsgSendTime = walrcv->lastMsgReceiptTime = walrcv->latestWalEndTime = GetCurrentTimestamp();
/* Arrange to clean up at walreceiver exit */
on_shmem_exit(WalRcvDie, 0);
+ OwnLatch(&walrcv->latch);
+
/*
* If possible, make this process a group leader, so that the postmaster
* can signal any child processes too. (walreceiver probably never has
pqsignal(SIGQUIT, WalRcvQuickDieHandler); /* hard crash time */
pqsignal(SIGALRM, SIG_IGN);
pqsignal(SIGPIPE, SIG_IGN);
- pqsignal(SIGUSR1, SIG_IGN);
+ pqsignal(SIGUSR1, WalRcvSigUsr1Handler);
pqsignal(SIGUSR2, SIG_IGN);
/* Reset some signals that are accepted by postmaster but not here */
/* Load the libpq-specific functions */
load_file("libpqwalreceiver", false);
- if (walrcv_connect == NULL || walrcv_receive == NULL ||
- walrcv_send == NULL || walrcv_disconnect == NULL)
+ if (walrcv_connect == NULL || walrcv_startstreaming == NULL ||
+ walrcv_endstreaming == NULL ||
+ walrcv_identify_system == NULL ||
+ walrcv_readtimelinehistoryfile == NULL ||
+ walrcv_receive == NULL || walrcv_send == NULL ||
+ walrcv_disconnect == NULL)
elog(ERROR, "libpqwalreceiver didn't initialize correctly");
/*
/* Establish the connection to the primary for XLOG streaming */
EnableWalRcvImmediateExit();
- walrcv_connect(conninfo, startpoint);
+ walrcv_connect(conninfo);
DisableWalRcvImmediateExit();
- /* Initialize LogstreamResult and buffers for processing messages */
- LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr(NULL);
- initStringInfo(&reply_message);
- initStringInfo(&incoming_message);
-
- /* Initialize the last recv timestamp */
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
-
- /* Loop until end-of-streaming or error */
+ first_stream = true;
for (;;)
{
- unsigned char type;
- char *buf;
- int len;
-
/*
- * Emergency bailout if postmaster has died. This is to avoid the
- * necessity for manual cleanup of all postmaster children.
+ * Check that we're connected to a valid server using the
+ * IDENTIFY_SYSTEM replication command,
*/
- if (!PostmasterIsAlive())
- exit(1);
+ EnableWalRcvImmediateExit();
+ walrcv_identify_system(&primaryTLI);
+ DisableWalRcvImmediateExit();
/*
- * Exit walreceiver if we're not in recovery. This should not happen,
- * but cross-check the status here.
+ * Confirm that the current timeline of the primary is the same or
+ * ahead of ours.
*/
- if (!RecoveryInProgress())
- ereport(FATAL,
- (errmsg("cannot continue WAL streaming, recovery has already ended")));
-
- /* Process any requests or signals received recently */
- ProcessWalRcvInterrupts();
+ if (primaryTLI < startpointTLI)
+ ereport(ERROR,
+ (errmsg("highest timeline %u of the primary is behind recovery timeline %u",
+ primaryTLI, startpointTLI)));
- if (got_SIGHUP)
- {
- got_SIGHUP = false;
- ProcessConfigFile(PGC_SIGHUP);
- }
+ /*
+ * Get any missing history files. We do this always, even when we're
+ * not interested in that timeline, so that if we're promoted to become
+ * the master later on, we don't select the same timeline that was
+ * already used in the current master. This isn't bullet-proof - you'll
+ * need some external software to manage your cluster if you need to
+ * ensure that a unique timeline id is chosen in every case, but let's
+ * avoid the confusion of timeline id collisions where we can.
+ */
+ WalRcvFetchTimeLineHistoryFiles(startpointTLI + 1, primaryTLI);
- /* Wait a while for data to arrive */
- if (walrcv_receive(NAPTIME_PER_CYCLE, &type, &buf, &len))
+ /*
+ * Start streaming.
+ *
+ * We'll try to start at the requested starting point and timeline,
+ * even if it's different from the server's latest timeline. In case
+ * we've already reached the end of the old timeline, the server will
+ * finish the streaming immediately, and we will go back to await
+ * orders from the startup process. If recovery_target_timeline is
+ * 'latest', the startup process will scan pg_xlog and find the new
+ * history file, bump recovery target timeline, and ask us to restart
+ * on the new timeline.
+ */
+ ThisTimeLineID = startpointTLI;
+ if (walrcv_startstreaming(startpointTLI, startpoint))
{
- /* Something was received from master, so reset timeout */
+ bool endofwal = false;
+
+ if (first_stream)
+ ereport(LOG,
+ (errmsg("started streaming WAL from primary at %X/%X on timeline %u",
+ (uint32) (startpoint >> 32), (uint32) startpoint,
+ startpointTLI)));
+ else
+ ereport(LOG,
+ (errmsg("restarted WAL streaming at %X/%X on timeline %u",
+ (uint32) (startpoint >> 32), (uint32) startpoint,
+ startpointTLI)));
+ first_stream = false;
+
+ /* Initialize LogstreamResult and buffers for processing messages */
+ LogstreamResult.Write = LogstreamResult.Flush = GetXLogReplayRecPtr();
+ initStringInfo(&reply_message);
+ initStringInfo(&incoming_message);
+
+ /* Initialize the last recv timestamp */
last_recv_timestamp = GetCurrentTimestamp();
ping_sent = false;
- /* Accept the received data, and process it */
- XLogWalRcvProcessMsg(type, buf, len);
-
- /* Receive any more data we can without sleeping */
- while (walrcv_receive(0, &type, &buf, &len))
+ /* Loop until end-of-streaming or error */
+ while (!endofwal)
{
- last_recv_timestamp = GetCurrentTimestamp();
- ping_sent = false;
- XLogWalRcvProcessMsg(type, buf, len);
- }
+ char *buf;
+ int len;
- /* Let the master know that we received some data. */
- XLogWalRcvSendReply(false, false);
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid
+ * the necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive())
+ exit(1);
+
+ /*
+ * Exit walreceiver if we're not in recovery. This should not
+ * happen, but cross-check the status here.
+ */
+ if (!RecoveryInProgress())
+ ereport(FATAL,
+ (errmsg("cannot continue WAL streaming, recovery has already ended")));
+
+ /* Process any requests or signals received recently */
+ ProcessWalRcvInterrupts();
+
+ if (got_SIGHUP)
+ {
+ got_SIGHUP = false;
+ ProcessConfigFile(PGC_SIGHUP);
+ }
+
+ /* Wait a while for data to arrive */
+ len = walrcv_receive(NAPTIME_PER_CYCLE, &buf);
+ if (len != 0)
+ {
+ /*
+ * Process the received data, and any subsequent data we
+ * can read without blocking.
+ */
+ for (;;)
+ {
+ if (len > 0)
+ {
+ /* Something was received from master, so reset timeout */
+ last_recv_timestamp = GetCurrentTimestamp();
+ ping_sent = false;
+ XLogWalRcvProcessMsg(buf[0], &buf[1], len - 1);
+ }
+ else if (len == 0)
+ break;
+ else if (len < 0)
+ {
+ ereport(LOG,
+ (errmsg("replication terminated by primary server"),
+ errdetail("End of WAL reached on timeline %u", startpointTLI)));
+ endofwal = true;
+ break;
+ }
+ len = walrcv_receive(0, &buf);
+ }
+
+ /* Let the master know that we received some data. */
+ XLogWalRcvSendReply(false, false);
+
+ /*
+ * If we've written some records, flush them to disk and
+ * let the startup process and primary server know about
+ * them.
+ */
+ XLogWalRcvFlush(false);
+ }
+ else
+ {
+ /*
+ * We didn't receive anything new. If we haven't heard
+ * anything from the server for more than
+ * wal_receiver_timeout / 2, ping the server. Also, if it's
+ * been longer than wal_receiver_status_interval since the
+ * last update we sent, send a status update to the master
+ * anyway, to report any progress in applying WAL.
+ */
+ bool requestReply = false;
+
+ /*
+ * Check if time since last receive from standby has
+ * reached the configured limit.
+ */
+ if (wal_receiver_timeout > 0)
+ {
+ TimestampTz now = GetCurrentTimestamp();
+ TimestampTz timeout;
+
+ timeout =
+ TimestampTzPlusMilliseconds(last_recv_timestamp,
+ wal_receiver_timeout);
+
+ if (now >= timeout)
+ ereport(ERROR,
+ (errmsg("terminating walreceiver due to timeout")));
+
+ /*
+ * We didn't receive anything new, for half of receiver
+ * replication timeout. Ping the server.
+ */
+ if (!ping_sent)
+ {
+ timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
+ (wal_receiver_timeout/2));
+ if (now >= timeout)
+ {
+ requestReply = true;
+ ping_sent = true;
+ }
+ }
+ }
+
+ XLogWalRcvSendReply(requestReply, requestReply);
+ XLogWalRcvSendHSFeedback();
+ }
+ }
/*
- * If we've written some records, flush them to disk and let the
- * startup process and primary server know about them.
+ * The backend finished streaming. Exit streaming COPY-mode from
+ * our side, too.
*/
+ EnableWalRcvImmediateExit();
+ walrcv_endstreaming();
+ DisableWalRcvImmediateExit();
+ }
+ else
+ ereport(LOG,
+ (errmsg("primary server contains no more WAL on requested timeline %u",
+ startpointTLI)));
+
+ /*
+ * End of WAL reached on the requested timeline. Close the last
+ * segment, and await for new orders from the startup process.
+ */
+ if (recvFile >= 0)
+ {
XLogWalRcvFlush(false);
+ if (close(recvFile) != 0)
+ ereport(PANIC,
+ (errcode_for_file_access(),
+ errmsg("could not close log segment %s: %m",
+ XLogFileNameP(recvFileTLI, recvSegNo))));
}
+ recvFile = -1;
+
+ elog(DEBUG1, "walreceiver ended streaming and awaits new instructions");
+ WalRcvWaitForStartPosition(&startpoint, &startpointTLI);
+ }
+ /* not reached */
+}
+
+/*
+ * Wait for startup process to set receiveStart and receiveStartTLI.
+ */
+static void
+WalRcvWaitForStartPosition(XLogRecPtr *startpoint, TimeLineID *startpointTLI)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ int state;
+
+ SpinLockAcquire(&walrcv->mutex);
+ state = walrcv->walRcvState;
+ if (state != WALRCV_STREAMING)
+ {
+ SpinLockRelease(&walrcv->mutex);
+ if (state == WALRCV_STOPPING)
+ proc_exit(0);
else
+ elog(FATAL, "unexpected walreceiver state");
+ }
+ walrcv->walRcvState = WALRCV_WAITING;
+ walrcv->receiveStart = InvalidXLogRecPtr;
+ walrcv->receiveStartTLI = 0;
+ SpinLockRelease(&walrcv->mutex);
+
+ if (update_process_title)
+ set_ps_display("idle", false);
+
+ /*
+ * nudge startup process to notice that we've stopped streaming and are
+ * now waiting for instructions.
+ */
+ WakeupRecovery();
+ for (;;)
+ {
+ ResetLatch(&walrcv->latch);
+
+ /*
+ * Emergency bailout if postmaster has died. This is to avoid the
+ * necessity for manual cleanup of all postmaster children.
+ */
+ if (!PostmasterIsAlive())
+ exit(1);
+
+ ProcessWalRcvInterrupts();
+
+ SpinLockAcquire(&walrcv->mutex);
+ Assert(walrcv->walRcvState == WALRCV_RESTARTING ||
+ walrcv->walRcvState == WALRCV_WAITING ||
+ walrcv->walRcvState == WALRCV_STOPPING);
+ if (walrcv->walRcvState == WALRCV_RESTARTING)
+ {
+ /* we don't expect primary_conninfo to change */
+ *startpoint = walrcv->receiveStart;
+ *startpointTLI = walrcv->receiveStartTLI;
+ walrcv->walRcvState = WALRCV_STREAMING;
+ SpinLockRelease(&walrcv->mutex);
+ break;
+ }
+ if (walrcv->walRcvState == WALRCV_STOPPING)
{
/*
- * We didn't receive anything new. If we haven't heard anything
- * from the server for more than wal_receiver_timeout / 2,
- * ping the server. Also, if it's been longer than
- * wal_receiver_status_interval since the last update we sent,
- * send a status update to the master anyway, to report any
- * progress in applying WAL.
+ * We should've received SIGTERM if the startup process wants
+ * us to die, but might as well check it here too.
*/
- bool requestReply = false;
+ SpinLockRelease(&walrcv->mutex);
+ exit(1);
+ }
+ SpinLockRelease(&walrcv->mutex);
- /*
- * Check if time since last receive from standby has reached the
- * configured limit.
- */
- if (wal_receiver_timeout > 0)
- {
- TimestampTz now = GetCurrentTimestamp();
- TimestampTz timeout;
+ WaitLatch(&walrcv->latch, WL_LATCH_SET | WL_POSTMASTER_DEATH, 0);
+ }
+
+ if (update_process_title)
+ {
+ char activitymsg[50];
- timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
- wal_receiver_timeout);
+ snprintf(activitymsg, sizeof(activitymsg), "restarting at %X/%X",
+ (uint32) (*startpoint >> 32),
+ (uint32) *startpoint);
+ set_ps_display(activitymsg, false);
+ }
+}
- if (now >= timeout)
- ereport(ERROR,
- (errmsg("terminating walreceiver due to timeout")));
+/*
+ * Fetch any missing timeline history files between 'first' and 'last'
+ * (inclusive) from the server.
+ */
+static void
+WalRcvFetchTimeLineHistoryFiles(TimeLineID first, TimeLineID last)
+{
+ TimeLineID tli;
- /*
- * We didn't receive anything new, for half of receiver
- * replication timeout. Ping the server.
- */
- if (!ping_sent)
- {
- timeout = TimestampTzPlusMilliseconds(last_recv_timestamp,
- (wal_receiver_timeout/2));
- if (now >= timeout)
- {
- requestReply = true;
- ping_sent = true;
- }
- }
- }
+ for (tli = first; tli <= last; tli++)
+ {
+ if (!existsTimeLineHistory(tli))
+ {
+ char *fname;
+ char *content;
+ int len;
+ char expectedfname[MAXFNAMELEN];
- XLogWalRcvSendReply(requestReply, requestReply);
- XLogWalRcvSendHSFeedback();
+ ereport(LOG,
+ (errmsg("fetching timeline history file for timeline %u from primary server",
+ tli)));
+
+ EnableWalRcvImmediateExit();
+ walrcv_readtimelinehistoryfile(tli, &fname, &content, &len);
+ DisableWalRcvImmediateExit();
+
+ /*
+ * Check that the filename on the master matches what we calculated
+ * ourselves. This is just a sanity check, it should always match.
+ */
+ TLHistoryFileName(expectedfname, tli);
+ if (strcmp(fname, expectedfname) != 0)
+ ereport(ERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg_internal("primary reported unexpected filename for timeline history file of timeline %u",
+ tli)));
+
+ /*
+ * Write the file to pg_xlog.
+ */
+ writeTimeLineHistoryFile(tli, content, len);
+
+ pfree(fname);
+ pfree(content);
}
}
}
/* Ensure that all WAL records received are flushed to disk */
XLogWalRcvFlush(true);
+ DisownLatch(&walrcv->latch);
+
SpinLockAcquire(&walrcv->mutex);
- Assert(walrcv->walRcvState == WALRCV_RUNNING ||
+ Assert(walrcv->walRcvState == WALRCV_STREAMING ||
+ walrcv->walRcvState == WALRCV_RESTARTING ||
+ walrcv->walRcvState == WALRCV_STARTING ||
+ walrcv->walRcvState == WALRCV_WAITING ||
walrcv->walRcvState == WALRCV_STOPPING);
+ Assert(walrcv->pid == MyProcPid);
walrcv->walRcvState = WALRCV_STOPPED;
walrcv->pid = 0;
SpinLockRelease(&walrcv->mutex);
/* Terminate the connection gracefully. */
if (walrcv_disconnect != NULL)
walrcv_disconnect();
+
+ /* Wake up the startup process to notice promptly that we're gone */
+ WakeupRecovery();
}
/* SIGHUP: set flag to re-read config file at next convenient time */
got_SIGHUP = true;
}
+
+/* SIGUSR1: used by latch mechanism */
+static void
+WalRcvSigUsr1Handler(SIGNAL_ARGS)
+{
+ latch_sigusr1_handler();
+}
+
/* SIGTERM: set flag for main loop, or shutdown immediately if safe */
static void
WalRcvShutdownHandler(SIGNAL_ARGS)
got_SIGTERM = true;
+ SetLatch(&WalRcv->latch);
+
/* Don't joggle the elbow of proc_exit */
if (!proc_exit_inprogress && WalRcvImmediateInterruptOK)
ProcessWalRcvInterrupts();
{
walrcv->latestChunkStart = walrcv->receivedUpto;
walrcv->receivedUpto = LogstreamResult.Flush;
+ walrcv->receivedTLI = ThisTimeLineID;
}
SpinLockRelease(&walrcv->mutex);
/* Construct a new message */
writePtr = LogstreamResult.Write;
flushPtr = LogstreamResult.Flush;
- applyPtr = GetXLogReplayRecPtr(NULL);
+ applyPtr = GetXLogReplayRecPtr();
resetStringInfo(&reply_message);
pq_sendbyte(&reply_message, 'r');
MemSet(WalRcv, 0, WalRcvShmemSize());
WalRcv->walRcvState = WALRCV_STOPPED;
SpinLockInit(&WalRcv->mutex);
+ InitSharedLatch(&WalRcv->latch);
}
}
-/* Is walreceiver in progress (or starting up)? */
+/* Is walreceiver running (or starting up)? */
bool
-WalRcvInProgress(void)
+WalRcvRunning(void)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
return false;
}
+/*
+ * Is walreceiver running and streaming (or at least attempting to connect,
+ * or starting up)?
+ */
+bool
+WalRcvStreaming(void)
+{
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalRcvData *walrcv = WalRcv;
+ WalRcvState state;
+ pg_time_t startTime;
+
+ SpinLockAcquire(&walrcv->mutex);
+
+ state = walrcv->walRcvState;
+ startTime = walrcv->startTime;
+
+ SpinLockRelease(&walrcv->mutex);
+
+ /*
+ * If it has taken too long for walreceiver to start up, give up. Setting
+ * the state to STOPPED ensures that if walreceiver later does start up
+ * after all, it will see that it's not supposed to be running and die
+ * without doing anything.
+ */
+ if (state == WALRCV_STARTING)
+ {
+ pg_time_t now = (pg_time_t) time(NULL);
+
+ if ((now - startTime) > WALRCV_STARTUP_TIMEOUT)
+ {
+ SpinLockAcquire(&walrcv->mutex);
+
+ if (walrcv->walRcvState == WALRCV_STARTING)
+ state = walrcv->walRcvState = WALRCV_STOPPED;
+
+ SpinLockRelease(&walrcv->mutex);
+ }
+ }
+
+ if (state == WALRCV_STREAMING || state == WALRCV_STARTING ||
+ state == WALRCV_RESTARTING)
+ return true;
+ else
+ return false;
+}
+
/*
* Stop walreceiver (if running) and wait for it to die.
* Executed by the Startup process.
walrcv->walRcvState = WALRCV_STOPPED;
break;
- case WALRCV_RUNNING:
+ case WALRCV_STREAMING:
+ case WALRCV_WAITING:
+ case WALRCV_RESTARTING:
walrcv->walRcvState = WALRCV_STOPPING;
/* fall through */
case WALRCV_STOPPING:
* Wait for walreceiver to acknowledge its death by setting state to
* WALRCV_STOPPED.
*/
- while (WalRcvInProgress())
+ while (WalRcvRunning())
{
/*
* This possibly-long loop needs to handle interrupts of startup
* is a libpq connection string to use.
*/
void
-RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo)
+RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
+ bool launch = false;
pg_time_t now = (pg_time_t) time(NULL);
/*
SpinLockAcquire(&walrcv->mutex);
- /* It better be stopped before we try to restart it */
- Assert(walrcv->walRcvState == WALRCV_STOPPED);
+ /* It better be stopped if we try to restart it */
+ Assert(walrcv->walRcvState == WALRCV_STOPPED ||
+ walrcv->walRcvState == WALRCV_WAITING);
if (conninfo != NULL)
strlcpy((char *) walrcv->conninfo, conninfo, MAXCONNINFO);
else
walrcv->conninfo[0] = '\0';
- walrcv->walRcvState = WALRCV_STARTING;
+
+ if (walrcv->walRcvState == WALRCV_STOPPED)
+ {
+ launch = true;
+ walrcv->walRcvState = WALRCV_STARTING;
+ }
+ else
+ walrcv->walRcvState = WALRCV_RESTARTING;
walrcv->startTime = now;
/*
walrcv->latestChunkStart = recptr;
}
walrcv->receiveStart = recptr;
+ walrcv->receiveStartTLI = tli;
SpinLockRelease(&walrcv->mutex);
- SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
+ if (launch)
+ SendPostmasterSignal(PMSIGNAL_START_WALRECEIVER);
+ else
+ SetLatch(&walrcv->latch);
}
/*
*
* Optionally, returns the previous chunk start, that is the first byte
* written in the most recent walreceiver flush cycle. Callers not
- * interested in that value may pass NULL for latestChunkStart.
+ * interested in that value may pass NULL for latestChunkStart. Same for
+ * receiveTLI.
*/
XLogRecPtr
-GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart)
+GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI)
{
/* use volatile pointer to prevent code rearrangement */
volatile WalRcvData *walrcv = WalRcv;
recptr = walrcv->receivedUpto;
if (latestChunkStart)
*latestChunkStart = walrcv->latestChunkStart;
+ if (receiveTLI)
+ *receiveTLI = walrcv->receivedTLI;
SpinLockRelease(&walrcv->mutex);
return recptr;
receivePtr = walrcv->receivedUpto;
SpinLockRelease(&walrcv->mutex);
- replayPtr = GetXLogReplayRecPtr(NULL);
+ replayPtr = GetXLogReplayRecPtr();
if (XLByteEQ(receivePtr, replayPtr))
return 0;
* (Note that there can be more than one walsender process concurrently.)
* It is started by the postmaster when the walreceiver of a standby server
* connects to the primary server and requests XLOG streaming replication.
- * It attempts to keep reading XLOG records from the disk and sending them
- * to the standby server, as long as the connection is alive (i.e., like
- * any backend, there is a one-to-one relationship between a connection
- * and a walsender process).
+ *
+ * A walsender is similar to a regular backend, ie. there is a one-to-one
+ * relationship between a connection and a walsender process, but instead
+ * of processing SQL queries, it understands a small set of special
+ * replication-mode commands. The START_REPLICATION command begins streaming
+ * WAL to the client. While streaming, the walsender keeps reading XLOG
+ * records from the disk and sends them to the standby server over the
+ * COPY protocol, until the either side ends the replication by exiting COPY
+ * mode (or until the connection is closed).
*
* Normal termination is by SIGTERM, which instructs the walsender to
* close the connection and exit(0) at next convenient moment. Emergency
#include <signal.h>
#include <unistd.h>
+#include "access/timeline.h"
#include "access/transam.h"
#include "access/xlog_internal.h"
#include "catalog/pg_type.h"
bool am_cascading_walsender = false; /* Am I cascading WAL to
* another standby ? */
-static bool replication_started = false; /* Started streaming yet? */
-
/* User-settable parameters for walsender */
int max_wal_senders = 0; /* the maximum number of concurrent walsenders */
int wal_sender_timeout = 60 * 1000; /* maximum time to send one
static XLogSegNo sendSegNo = 0;
static uint32 sendOff = 0;
+/*
+ * These variables keep track of the state of the timeline we're currently
+ * sending. sendTimeLine identifies the timeline. If sendTimeLineIsHistoric,
+ * the timeline is not the latest timeline on this server, and the server's
+ * history forked off from that timeline at sendTimeLineValidUpto.
+ */
+static TimeLineID sendTimeLine = 0;
+static bool sendTimeLineIsHistoric = false;
+static XLogRecPtr sendTimeLineValidUpto = InvalidXLogRecPtr;
+
/*
* How far have we sent WAL already? This is also advertised in
* MyWalSnd->sentPtr. (Actually, this is the next WAL location to send.)
/* Have we sent a heartbeat message asking for reply, since last reply? */
static bool ping_sent = false;
+/*
+ * While streaming WAL in Copy mode, streamingDoneSending is set to true
+ * after we have sent CopyDone. We should not send any more CopyData messages
+ * after that. streamingDoneReceiving is set to true when we receive CopyDone
+ * from the other end. When both become true, it's time to exit Copy mode.
+ */
+static bool streamingDoneSending;
+static bool streamingDoneReceiving;
+
/* Flags set by signal handlers for later service in main loop */
static volatile sig_atomic_t got_SIGHUP = false;
-volatile sig_atomic_t walsender_ready_to_stop = false;
+static volatile sig_atomic_t walsender_ready_to_stop = false;
+
+/*
+ * This is set while we are streaming. When not set, SIGUSR2 signal will be
+ * handled like SIGTERM. When set, the main loop is responsible for checking
+ * walsender_ready_to_stop and terminating when it's set (after streaming any
+ * remaining WAL).
+ */
+static volatile sig_atomic_t replication_active = false;
/* Signal handlers */
static void WalSndSigHupHandler(SIGNAL_ARGS);
static void WalSndLastCycleHandler(SIGNAL_ARGS);
/* Prototypes for private functions */
-static void WalSndLoop(void) __attribute__((noreturn));
+static void WalSndLoop(void);
static void InitWalSenderSlot(void);
static void WalSndKill(int code, Datum arg);
static void XLogSend(bool *caughtup);
*/
if (am_cascading_walsender)
ThisTimeLineID = GetRecoveryTargetTLI();
+
+ /*
+ * Let postmaster know that we're a WAL sender. Once we've declared us as
+ * a WAL sender process, postmaster will let us outlive the bgwriter and
+ * kill us last in the shutdown sequence, so we get a chance to stream all
+ * remaining WAL at shutdown, including the shutdown checkpoint. Note that
+ * there's no going back, and we mustn't write any WAL records after this.
+ */
+ MarkPostmasterChildWalSender();
+ SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
}
/*
sendFile = -1;
}
- /*
- * Don't return back to the command loop after we've started replicating.
- * We've already marked us as an actively streaming WAL sender in the
- * PMSignal slot, and there's currently no way to undo that.
- */
- if (replication_started)
+ replication_active = false;
+ if (walsender_ready_to_stop)
proc_exit(0);
+
+ /* Revert back to startup state */
+ WalSndSetState(WALSNDSTATE_STARTUP);
}
/*
- * IDENTIFY_SYSTEM
+ * Handle the IDENTIFY_SYSTEM command.
*/
static void
IdentifySystem(void)
snprintf(sysid, sizeof(sysid), UINT64_FORMAT,
GetSystemIdentifier());
- snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
- logptr = am_cascading_walsender ? GetStandbyFlushRecPtr(NULL) : GetInsertRecPtr();
+ am_cascading_walsender = RecoveryInProgress();
+ if (am_cascading_walsender)
+ {
+ logptr = GetStandbyFlushRecPtr();
+ ThisTimeLineID = GetRecoveryTargetTLI();
+ }
+ else
+ logptr = GetInsertRecPtr();
+
+ snprintf(tli, sizeof(tli), "%u", ThisTimeLineID);
snprintf(xpos, sizeof(xpos), "%X/%X", (uint32) (logptr >> 32), (uint32) logptr);
pq_endmessage(&buf);
}
+
/*
- * Handle START_REPLICATION command.
- *
- * At the moment, this never returns, but an ereport(ERROR) will take us back
- * to the main loop.
+ * Handle TIMELINE_HISTORY command.
*/
static void
-StartReplication(StartReplicationCmd *cmd)
+SendTimeLineHistory(TimeLineHistoryCmd *cmd)
{
StringInfoData buf;
+ char histfname[MAXFNAMELEN];
+ char path[MAXPGPATH];
+ int fd;
+ size_t histfilelen;
+ size_t bytesleft;
/*
- * Let postmaster know that we're streaming. Once we've declared us as a
- * WAL sender process, postmaster will let us outlive the bgwriter and
- * kill us last in the shutdown sequence, so we get a chance to stream all
- * remaining WAL at shutdown, including the shutdown checkpoint. Note that
- * there's no going back, and we mustn't write any WAL records after this.
+ * Reply with a result set with one row, and two columns. The first col
+ * is the name of the history file, 2nd is the contents.
*/
- MarkPostmasterChildWalSender();
- SendPostmasterSignal(PMSIGNAL_ADVANCE_STATE_MACHINE);
- replication_started = true;
- /*
- * When promoting a cascading standby, postmaster sends SIGUSR2 to any
- * cascading walsenders to kill them. But there is a corner-case where
- * such walsender fails to receive SIGUSR2 and survives a standby
- * promotion unexpectedly. This happens when postmaster sends SIGUSR2
- * before the walsender marks itself as a WAL sender, because postmaster
- * sends SIGUSR2 to only the processes marked as a WAL sender.
- *
- * To avoid this corner-case, if recovery is NOT in progress even though
- * the walsender is cascading one, we do the same thing as SIGUSR2 signal
- * handler does, i.e., set walsender_ready_to_stop to true. Which causes
- * the walsender to end later.
- *
- * When terminating cascading walsenders, usually postmaster writes the
- * log message announcing the terminations. But there is a race condition
- * here. If there is no walsender except this process before reaching
- * here, postmaster thinks that there is no walsender and suppresses that
- * log message. To handle this case, we always emit that log message here.
- * This might cause duplicate log messages, but which is less likely to
- * happen, so it's not worth writing some code to suppress them.
- */
- if (am_cascading_walsender && !RecoveryInProgress())
+ TLHistoryFileName(histfname, cmd->timeline);
+ TLHistoryFilePath(path, cmd->timeline);
+
+ /* Send a RowDescription message */
+ pq_beginmessage(&buf, 'T');
+ pq_sendint(&buf, 2, 2); /* 2 fields */
+
+ /* first field */
+ pq_sendstring(&buf, "filename"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, TEXTOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+
+ /* second field */
+ pq_sendstring(&buf, "content"); /* col name */
+ pq_sendint(&buf, 0, 4); /* table oid */
+ pq_sendint(&buf, 0, 2); /* attnum */
+ pq_sendint(&buf, BYTEAOID, 4); /* type oid */
+ pq_sendint(&buf, -1, 2); /* typlen */
+ pq_sendint(&buf, 0, 4); /* typmod */
+ pq_sendint(&buf, 0, 2); /* format code */
+ pq_endmessage(&buf);
+
+ /* Send a DataRow message */
+ pq_beginmessage(&buf, 'D');
+ pq_sendint(&buf, 2, 2); /* # of columns */
+ pq_sendint(&buf, strlen(histfname), 4); /* col1 len */
+ pq_sendbytes(&buf, histfname, strlen(histfname));
+
+ fd = OpenTransientFile(path, O_RDONLY | PG_BINARY, 0666);
+ if (fd < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not open file \"%s\": %m", path)));
+
+ /* Determine file length and send it to client */
+ histfilelen = lseek(fd, 0, SEEK_END);
+ if (histfilelen < 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek to end of file \"%s\": %m", path)));
+ if (lseek(fd, 0, SEEK_SET) != 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not seek to beginning of file \"%s\": %m", path)));
+
+ pq_sendint(&buf, histfilelen, 4); /* col2 len */
+
+ bytesleft = histfilelen;
+ while (bytesleft > 0)
{
- ereport(LOG,
- (errmsg("terminating walsender process to force cascaded standby "
- "to update timeline and reconnect")));
- walsender_ready_to_stop = true;
+ char rbuf[BLCKSZ];
+ int nread;
+
+ nread = read(fd, rbuf, sizeof(rbuf));
+ if (nread <= 0)
+ ereport(ERROR,
+ (errcode_for_file_access(),
+ errmsg("could not read file \"%s\": %m",
+ path)));
+ pq_sendbytes(&buf, rbuf, nread);
+ bytesleft -= nread;
}
+ CloseTransientFile(fd);
+
+ pq_endmessage(&buf);
+}
+
+/*
+ * Handle START_REPLICATION command.
+ *
+ * At the moment, this never returns, but an ereport(ERROR) will take us back
+ * to the main loop.
+ */
+static void
+StartReplication(StartReplicationCmd *cmd)
+{
+ StringInfoData buf;
/*
* We assume here that we're logging enough information in the WAL for
*/
/*
- * When we first start replication the standby will be behind the primary.
- * For some applications, for example, synchronous replication, it is
- * important to have a clear state for this initial catchup mode, so we
- * can trigger actions when we change streaming state later. We may stay
- * in this state for a long time, which is exactly why we want to be able
- * to monitor whether or not we are still here.
+ * Select the timeline. If it was given explicitly by the client, use
+ * that. Otherwise use the current ThisTimeLineID.
*/
- WalSndSetState(WALSNDSTATE_CATCHUP);
+ if (cmd->timeline != 0)
+ {
+ XLogRecPtr switchpoint;
- /* Send a CopyBothResponse message, and start streaming */
- pq_beginmessage(&buf, 'W');
- pq_sendbyte(&buf, 0);
- pq_sendint(&buf, 0, 2);
- pq_endmessage(&buf);
- pq_flush();
+ sendTimeLine = cmd->timeline;
+ if (sendTimeLine == ThisTimeLineID)
+ {
+ sendTimeLineIsHistoric = false;
+ sendTimeLineValidUpto = InvalidXLogRecPtr;
+ }
+ else
+ {
+ List *timeLineHistory;
- /*
- * Initialize position to the received one, then the xlog records begin to
- * be shipped from that position
- */
- sentPtr = cmd->startpoint;
+ sendTimeLineIsHistoric = true;
- /* Also update the start position status in shared memory */
- {
- /* use volatile pointer to prevent code rearrangement */
- volatile WalSnd *walsnd = MyWalSnd;
+ /*
+ * Check that the timeline the client requested for exists, and the
+ * requested start location is on that timeline.
+ */
+ timeLineHistory = readTimeLineHistory(ThisTimeLineID);
+ switchpoint = tliSwitchPoint(cmd->timeline, timeLineHistory);
+ list_free_deep(timeLineHistory);
- SpinLockAcquire(&walsnd->mutex);
- walsnd->sentPtr = sentPtr;
- SpinLockRelease(&walsnd->mutex);
+ /*
+ * Found the requested timeline in the history. Check that
+ * requested startpoint is on that timeline in our history.
+ *
+ * This is quite loose on purpose. We only check that we didn't
+ * fork off the requested timeline before the switchpoint. We don't
+ * check that we switched *to* it before the requested starting
+ * point. This is because the client can legitimately request to
+ * start replication from the beginning of the WAL segment that
+ * contains switchpoint, but on the new timeline, so that it
+ * doesn't end up with a partial segment. If you ask for a too old
+ * starting point, you'll get an error later when we fail to find
+ * the requested WAL segment in pg_xlog.
+ *
+ * XXX: we could be more strict here and only allow a startpoint
+ * that's older than the switchpoint, if it it's still in the same
+ * WAL segment.
+ */
+ if (!XLogRecPtrIsInvalid(switchpoint) &&
+ XLByteLT(switchpoint, cmd->startpoint))
+ {
+ ereport(ERROR,
+ (errmsg("requested starting point %X/%X on timeline %u is not in this server's history",
+ (uint32) (cmd->startpoint >> 32),
+ (uint32) (cmd->startpoint),
+ cmd->timeline),
+ errdetail("This server's history forked from timeline %u at %X/%X",
+ cmd->timeline,
+ (uint32) (switchpoint >> 32),
+ (uint32) (switchpoint))));
+ }
+ sendTimeLineValidUpto = switchpoint;
+ }
+ }
+ else
+ {
+ sendTimeLine = ThisTimeLineID;
+ sendTimeLineValidUpto = InvalidXLogRecPtr;
+ sendTimeLineIsHistoric = false;
}
- SyncRepInitConfig();
+ streamingDoneSending = streamingDoneReceiving = false;
+
+ /* If there is nothing to stream, don't even enter COPY mode */
+ if (!sendTimeLineIsHistoric ||
+ XLByteLT(cmd->startpoint, sendTimeLineValidUpto))
+ {
+ XLogRecPtr FlushPtr;
+ /*
+ * When we first start replication the standby will be behind the primary.
+ * For some applications, for example, synchronous replication, it is
+ * important to have a clear state for this initial catchup mode, so we
+ * can trigger actions when we change streaming state later. We may stay
+ * in this state for a long time, which is exactly why we want to be able
+ * to monitor whether or not we are still here.
+ */
+ WalSndSetState(WALSNDSTATE_CATCHUP);
+
+ /* Send a CopyBothResponse message, and start streaming */
+ pq_beginmessage(&buf, 'W');
+ pq_sendbyte(&buf, 0);
+ pq_sendint(&buf, 0, 2);
+ pq_endmessage(&buf);
+ pq_flush();
+
+ /*
+ * Don't allow a request to stream from a future point in WAL that
+ * hasn't been flushed to disk in this server yet.
+ */
+ if (am_cascading_walsender)
+ FlushPtr = GetStandbyFlushRecPtr();
+ else
+ FlushPtr = GetFlushRecPtr();
+ if (XLByteLT(FlushPtr, cmd->startpoint))
+ {
+ ereport(ERROR,
+ (errmsg("requested starting point %X/%X is ahead of the WAL flush position of this server %X/%X",
+ (uint32) (cmd->startpoint >> 32),
+ (uint32) (cmd->startpoint),
+ (uint32) (FlushPtr >> 32),
+ (uint32) (FlushPtr))));
+ }
+
+ /* Start streaming from the requested point */
+ sentPtr = cmd->startpoint;
- /* Main loop of walsender */
- WalSndLoop();
+ /* Initialize shared memory status, too */
+ {
+ /* use volatile pointer to prevent code rearrangement */
+ volatile WalSnd *walsnd = MyWalSnd;
+
+ SpinLockAcquire(&walsnd->mutex);
+ walsnd->sentPtr = sentPtr;
+ SpinLockRelease(&walsnd->mutex);
+ }
+
+ SyncRepInitConfig();
+
+ /* Main loop of walsender */
+ replication_active = true;
+
+ WalSndLoop();
+
+ replication_active = false;
+ if (walsender_ready_to_stop)
+ proc_exit(0);
+ WalSndSetState(WALSNDSTATE_STARTUP);
+ }
+
+ /* Get out of COPY mode (CommandComplete). */
+ EndCommand("COPY 0", DestRemote);
}
/*
SendBaseBackup((BaseBackupCmd *) cmd_node);
break;
+ case T_TimeLineHistoryCmd:
+ SendTimeLineHistory((TimeLineHistoryCmd *) cmd_node);
+ break;
+
default:
- ereport(FATAL,
- (errcode(ERRCODE_PROTOCOL_VIOLATION),
- errmsg("invalid standby query string: %s", cmd_string)));
+ elog(ERROR, "unrecognized replication command node tag: %u",
+ cmd_node->type);
}
/* done */
}
/*
- * Check if the remote end has closed the connection.
+ * Process any incoming messages while streaming. Also checks if the remote
+ * end has closed the connection.
*/
static void
ProcessRepliesIfAny(void)
int r;
bool received = false;
- for (;;)
+ /*
+ * If we already received a CopyDone from the frontend, any subsequent
+ * message is the beginning of a new command, and should be processed in
+ * the main processing loop.
+ */
+ while (!streamingDoneReceiving)
{
r = pq_getbyte_if_available(&firstchar);
if (r < 0)
received = true;
break;
+ /*
+ * CopyDone means the standby requested to finish streaming.
+ * Reply with CopyDone, if we had not sent that already.
+ */
+ case 'c':
+ if (!streamingDoneSending)
+ {
+ pq_putmessage_noblock('c', NULL, 0);
+ streamingDoneSending = true;
+ }
+
+ /* consume the CopyData message */
+ resetStringInfo(&reply_message);
+ if (pq_getmessage(&reply_message, 0))
+ {
+ ereport(COMMERROR,
+ (errcode(ERRCODE_PROTOCOL_VIOLATION),
+ errmsg("unexpected EOF on standby connection")));
+ proc_exit(0);
+ }
+
+ streamingDoneReceiving = true;
+ received = true;
+ break;
+
/*
* 'X' means that the standby is closing down the socket.
*/
last_reply_timestamp = GetCurrentTimestamp();
ping_sent = false;
- /* Loop forever, unless we get an error */
+ /*
+ * Loop until we reach the end of this timeline or the client requests
+ * to stop streaming.
+ */
for (;;)
{
/* Clear any already-pending wakeups */
/* Check for input from the client */
ProcessRepliesIfAny();
+ /*
+ * If we have received CopyDone from the client, sent CopyDone
+ * ourselves, and the output buffer is empty, it's time to exit
+ * streaming.
+ */
+ if (!pq_is_send_pending() && streamingDoneSending && streamingDoneReceiving)
+ break;
+
/*
* If we don't have any pending data in the output buffer, try to send
* some more. If there is some, we don't bother to call XLogSend
/* Try to flush pending output to the client */
if (pq_flush_if_writable() != 0)
- break;
+ goto send_failure;
/* If nothing remains to be sent right now ... */
if (caughtup && !pq_is_send_pending())
if (caughtup && !pq_is_send_pending())
{
/* Inform the standby that XLOG streaming is done */
- pq_puttextmessage('C', "COPY 0");
+ EndCommand("COPY 0", DestRemote);
pq_flush();
proc_exit(0);
* loaded a subset of the available data but then pq_flush_if_writable
* flushed it all --- we should immediately try to send more.
*/
- if (caughtup || pq_is_send_pending())
+ if ((caughtup && !streamingDoneSending) || pq_is_send_pending())
{
TimestampTz timeout = 0;
long sleeptime = 10000; /* 10 s */
int wakeEvents;
- wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH |
- WL_SOCKET_READABLE | WL_TIMEOUT;
+ wakeEvents = WL_LATCH_SET | WL_POSTMASTER_DEATH | WL_TIMEOUT;
+
+ if (!streamingDoneReceiving)
+ wakeEvents |= WL_SOCKET_READABLE;
if (pq_is_send_pending())
wakeEvents |= WL_SOCKET_WRITEABLE;
*/
ereport(COMMERROR,
(errmsg("terminating walsender process due to replication timeout")));
- break;
+ goto send_failure;
}
}
}
+ return;
+send_failure:
/*
* Get here on send failure. Clean up and exit.
*
* more than one.
*/
void
-XLogRead(char *buf, XLogRecPtr startptr, Size count)
+XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count)
{
char *p;
XLogRecPtr recptr;
startoff = recptr % XLogSegSize;
- if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo))
+ if (sendFile < 0 || !XLByteInSeg(recptr, sendSegNo) || sendTimeLine != tli)
{
char path[MAXPGPATH];
if (sendFile >= 0)
close(sendFile);
+ sendTimeLine = tli;
XLByteToSeg(recptr, sendSegNo);
- XLogFilePath(path, ThisTimeLineID, sendSegNo);
+ XLogFilePath(path, sendTimeLine, sendSegNo);
sendFile = BasicOpenFile(path, O_RDONLY | PG_BINARY, 0);
if (sendFile < 0)
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- XLogFileNameP(ThisTimeLineID, sendSegNo))));
+ XLogFileNameP(sendTimeLine, sendSegNo))));
else
ereport(ERROR,
(errcode_for_file_access(),
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not seek in log segment %s to offset %u: %m",
- XLogFileNameP(ThisTimeLineID, sendSegNo),
+ XLogFileNameP(sendTimeLine, sendSegNo),
startoff)));
sendOff = startoff;
}
ereport(ERROR,
(errcode_for_file_access(),
errmsg("could not read from log segment %s, offset %u, length %lu: %m",
- XLogFileNameP(ThisTimeLineID, sendSegNo),
+ XLogFileNameP(sendTimeLine, sendSegNo),
sendOff, (unsigned long) segbytes)));
}
ereport(ERROR,
(errcode_for_file_access(),
errmsg("requested WAL segment %s has already been removed",
- XLogFileNameP(ThisTimeLineID, segno))));
+ XLogFileNameP(sendTimeLine, segno))));
/*
* During recovery, the currently-open WAL file might be replaced with the
XLogSend(bool *caughtup)
{
XLogRecPtr SendRqstPtr;
+ XLogRecPtr FlushPtr;
XLogRecPtr startptr;
XLogRecPtr endptr;
Size nbytes;
+ if (streamingDoneSending)
+ {
+ *caughtup = true;
+ return;
+ }
+
/*
* Attempt to send all data that's already been written out and fsync'd to
* disk. We cannot go further than what's been written out given the
* that gets lost on the master.
*/
if (am_cascading_walsender)
+ FlushPtr = GetStandbyFlushRecPtr();
+ else
+ FlushPtr = GetFlushRecPtr();
+
+ /*
+ * In a cascading standby, the current recovery target timeline can
+ * change, or we can be promoted. In either case, the current timeline
+ * becomes historic. We need to detect that so that we don't try to stream
+ * past the point where we switched to another timeline. It's checked
+ * after calculating FlushPtr, to avoid a race condition: if the timeline
+ * becomes historic just after we checked that it was still current, it
+ * should still be OK to stream it up to the FlushPtr that was calculated
+ * before it became historic.
+ */
+ if (!sendTimeLineIsHistoric && am_cascading_walsender)
{
- TimeLineID currentTargetTLI;
- SendRqstPtr = GetStandbyFlushRecPtr(¤tTargetTLI);
+ bool becameHistoric = false;
+ TimeLineID targetTLI;
- /*
- * If the recovery target timeline changed, bail out. It's a bit
- * unfortunate that we have to just disconnect, but there is no way
- * to tell the client that the timeline changed. We also don't know
- * exactly where the switch happened, so we cannot safely try to send
- * up to the switchover point before disconnecting.
- */
- if (currentTargetTLI != ThisTimeLineID)
+ if (!RecoveryInProgress())
{
- if (!walsender_ready_to_stop)
- ereport(LOG,
- (errmsg("terminating walsender process to force cascaded standby "
- "to update timeline and reconnect")));
- walsender_ready_to_stop = true;
- *caughtup = true;
- return;
+ /*
+ * We have been promoted. RecoveryInProgress() updated
+ * ThisTimeLineID to the new current timeline.
+ */
+ targetTLI = ThisTimeLineID;
+ am_cascading_walsender = false;
+ becameHistoric = true;
+ }
+ else
+ {
+ /*
+ * Still a cascading standby. But is the timeline we're sending
+ * still the recovery target timeline?
+ */
+ targetTLI = GetRecoveryTargetTLI();
+
+ if (targetTLI != sendTimeLine)
+ becameHistoric = true;
+ }
+
+ if (becameHistoric)
+ {
+ /*
+ * The timeline we were sending has become historic. Read the
+ * timeline history file of the new timeline to see where exactly
+ * we forked off from the timeline we were sending.
+ */
+ List *history;
+
+ history = readTimeLineHistory(targetTLI);
+ sendTimeLineValidUpto = tliSwitchPoint(sendTimeLine, history);
+ Assert(XLByteLE(sentPtr, sendTimeLineValidUpto));
+ list_free_deep(history);
+
+ /* the switchpoint should be >= current send pointer */
+ if (!XLByteLE(sentPtr, sendTimeLineValidUpto))
+ elog(ERROR, "server switched off timeline %u at %X/%X, but walsender already streamed up to %X/%X",
+ sendTimeLine,
+ (uint32) (sendTimeLineValidUpto >> 32),
+ (uint32) sendTimeLineValidUpto,
+ (uint32) (sentPtr >> 32),
+ (uint32) sentPtr);
+
+ sendTimeLineIsHistoric = true;
}
}
+
+ /*
+ * If this is a historic timeline and we've reached the point where we
+ * forked to the next timeline, stop streaming.
+ */
+ if (sendTimeLineIsHistoric && XLByteLE(sendTimeLineValidUpto, sentPtr))
+ {
+ /* close the current file. */
+ if (sendFile >= 0)
+ close(sendFile);
+ sendFile = -1;
+
+ /* Send CopyDone */
+ pq_putmessage_noblock('c', NULL, 0);
+ streamingDoneSending = true;
+
+ *caughtup = true;
+ return;
+ }
+
+ /*
+ * Stream up to the point known to be flushed to disk, or to the end of
+ * this timeline, whichever comes first.
+ */
+ if (sendTimeLineIsHistoric && XLByteLT(sendTimeLineValidUpto, FlushPtr))
+ SendRqstPtr = sendTimeLineValidUpto;
else
- SendRqstPtr = GetFlushRecPtr();
+ SendRqstPtr = FlushPtr;
- /* Quick exit if nothing to do */
+ Assert(XLByteLE(sentPtr, SendRqstPtr));
if (XLByteLE(SendRqstPtr, sentPtr))
{
*caughtup = true;
if (XLByteLE(SendRqstPtr, endptr))
{
endptr = SendRqstPtr;
- *caughtup = true;
+ if (sendTimeLineIsHistoric)
+ *caughtup = false;
+ else
+ *caughtup = true;
}
else
{
* calls.
*/
enlargeStringInfo(&output_message, nbytes);
- XLogRead(&output_message.data[output_message.len], startptr, nbytes);
+ XLogRead(&output_message.data[output_message.len], sendTimeLine, startptr, nbytes);
output_message.len += nbytes;
output_message.data[output_message.len] = '\0';
{
int save_errno = errno;
+ /*
+ * If replication has not yet started, die like with SIGTERM. If
+ * replication is active, only set a flag and wake up the main loop. It
+ * will send any outstanding WAL, and then exit gracefully.
+ */
+ if (!replication_active)
+ kill(MyProcPid, SIGTERM);
+
walsender_ready_to_stop = true;
if (MyWalSnd)
SetLatch(&MyWalSnd->latch);
extern TimeLineID findNewestTimeLine(TimeLineID startTLI);
extern void writeTimeLineHistory(TimeLineID newTLI, TimeLineID parentTLI,
XLogRecPtr switchpoint, char *reason);
+extern void writeTimeLineHistoryFile(TimeLineID tli, char *content, int size);
extern bool tliInHistory(TimeLineID tli, List *expectedTLIs);
extern TimeLineID tliOfPointInHistory(XLogRecPtr ptr, List *history);
extern XLogRecPtr tliSwitchPoint(TimeLineID tli, List *history);
extern bool HotStandbyActive(void);
extern bool XLogInsertAllowed(void);
extern void GetXLogReceiptTime(TimestampTz *rtime, bool *fromStream);
-extern XLogRecPtr GetXLogReplayRecPtr(TimeLineID *targetTLI);
-extern XLogRecPtr GetStandbyFlushRecPtr(TimeLineID *targetTLI);
+extern XLogRecPtr GetXLogReplayRecPtr(void);
+extern XLogRecPtr GetStandbyFlushRecPtr(void);
extern XLogRecPtr GetXLogInsertRecPtr(void);
extern XLogRecPtr GetXLogWriteRecPtr(void);
extern bool RecoveryIsPaused(void);
T_IdentifySystemCmd,
T_BaseBackupCmd,
T_StartReplicationCmd,
+ T_TimeLineHistoryCmd,
/*
* TAGS FOR RANDOM OTHER STUFF
typedef struct StartReplicationCmd
{
NodeTag type;
+ TimeLineID timeline;
XLogRecPtr startpoint;
} StartReplicationCmd;
+
+/* ----------------------
+ * TIMELINE_HISTORY command
+ * ----------------------
+ */
+typedef struct TimeLineHistoryCmd
+{
+ NodeTag type;
+ TimeLineID timeline;
+} TimeLineHistoryCmd;
+
#endif /* REPLNODES_H */
#include "access/xlog.h"
#include "access/xlogdefs.h"
+#include "storage/latch.h"
#include "storage/spin.h"
#include "pgtime.h"
WALRCV_STOPPED, /* stopped and mustn't start up again */
WALRCV_STARTING, /* launched, but the process hasn't
* initialized yet */
- WALRCV_RUNNING, /* walreceiver is running */
+ WALRCV_STREAMING, /* walreceiver is streaming */
+ WALRCV_WAITING, /* stopped streaming, waiting for orders */
+ WALRCV_RESTARTING, /* asked to restart streaming */
WALRCV_STOPPING /* requested to stop, but still running */
} WalRcvState;
pg_time_t startTime;
/*
- * receiveStart is the first byte position that will be received. When
- * startup process starts the walreceiver, it sets receiveStart to the
- * point where it wants the streaming to begin.
+ * receiveStart and receiveStartTLI indicate the first byte position
+ * and timeline that will be received. When startup process starts the
+ * walreceiver, it sets these to the point where it wants the streaming
+ * to begin.
*/
XLogRecPtr receiveStart;
+ TimeLineID receiveStartTLI;
/*
* receivedUpto-1 is the last byte position that has already been
- * received. At the first startup of walreceiver, receivedUpto is set to
- * receiveStart. After that, walreceiver updates this whenever it flushes
- * the received WAL to disk.
+ * received, and receivedTLI is the timeline it came from. At the first
+ * startup of walreceiver, these are set to receiveStart and
+ * receiveStartTLI. After that, walreceiver updates these whenever it
+ * flushes the received WAL to disk.
*/
XLogRecPtr receivedUpto;
+ TimeLineID receivedTLI;
/*
* latestChunkStart is the starting byte position of the current "batch"
char conninfo[MAXCONNINFO];
slock_t mutex; /* locks shared variables shown above */
+
+ /*
+ * Latch used by startup process to wake up walreceiver after telling it
+ * where to start streaming (after setting receiveStart and
+ * receiveStartTLI).
+ */
+ Latch latch;
} WalRcvData;
extern WalRcvData *WalRcv;
/* libpqwalreceiver hooks */
-typedef bool (*walrcv_connect_type) (char *conninfo, XLogRecPtr startpoint);
+typedef void (*walrcv_connect_type) (char *conninfo);
extern PGDLLIMPORT walrcv_connect_type walrcv_connect;
-typedef bool (*walrcv_receive_type) (int timeout, unsigned char *type,
- char **buffer, int *len);
+typedef void (*walrcv_identify_system_type) (TimeLineID *primary_tli);
+extern PGDLLIMPORT walrcv_identify_system_type walrcv_identify_system;
+
+typedef void (*walrcv_readtimelinehistoryfile_type) (TimeLineID tli, char **filename, char **content, int *size);
+extern PGDLLIMPORT walrcv_readtimelinehistoryfile_type walrcv_readtimelinehistoryfile;
+
+typedef bool (*walrcv_startstreaming_type) (TimeLineID tli, XLogRecPtr startpoint);
+extern PGDLLIMPORT walrcv_startstreaming_type walrcv_startstreaming;
+
+typedef void (*walrcv_endstreaming_type) (void);
+extern PGDLLIMPORT walrcv_endstreaming_type walrcv_endstreaming;
+
+typedef int (*walrcv_receive_type) (int timeout, char **buffer);
extern PGDLLIMPORT walrcv_receive_type walrcv_receive;
typedef void (*walrcv_send_type) (const char *buffer, int nbytes);
extern Size WalRcvShmemSize(void);
extern void WalRcvShmemInit(void);
extern void ShutdownWalRcv(void);
-extern bool WalRcvInProgress(void);
-extern void RequestXLogStreaming(XLogRecPtr recptr, const char *conninfo);
-extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart);
+extern bool WalRcvStreaming(void);
+extern bool WalRcvRunning(void);
+extern void RequestXLogStreaming(TimeLineID tli, XLogRecPtr recptr, const char *conninfo);
+extern XLogRecPtr GetWalRcvWriteRecPtr(XLogRecPtr *latestChunkStart, TimeLineID *receiveTLI);
extern int GetReplicationApplyDelay(void);
extern int GetReplicationTransferLatency(void);
/* global state */
extern bool am_walsender;
extern bool am_cascading_walsender;
-extern volatile sig_atomic_t walsender_ready_to_stop;
extern bool wake_wal_senders;
/* user-settable parameters */
extern void WalSndSetState(WalSndState state);
-extern void XLogRead(char *buf, XLogRecPtr startptr, Size count);
+extern void XLogRead(char *buf, TimeLineID tli, XLogRecPtr startptr, Size count);
/*
* Internal functions for parsing the replication grammar, in repl_gram.y and
{
if (!conn)
return -1;
- if (conn->asyncStatus != PGASYNC_COPY_IN)
+ if (conn->asyncStatus != PGASYNC_COPY_IN &&
+ conn->asyncStatus != PGASYNC_COPY_BOTH)
{
printfPQExpBuffer(&conn->errorMessage,
libpq_gettext("no COPY in progress\n"));
}
/* Return to active duty */
- conn->asyncStatus = PGASYNC_BUSY;
+ if (conn->asyncStatus == PGASYNC_COPY_BOTH)
+ conn->asyncStatus = PGASYNC_COPY_OUT;
+ else
+ conn->asyncStatus = PGASYNC_BUSY;
resetPQExpBuffer(&conn->errorMessage);
/* Try to flush data */
* expect the state was already changed.
*/
if (msgLength == -1)
- conn->asyncStatus = PGASYNC_BUSY;
+ {
+ if (conn->asyncStatus == PGASYNC_COPY_BOTH)
+ conn->asyncStatus = PGASYNC_COPY_IN;
+ else
+ conn->asyncStatus = PGASYNC_BUSY;
+ }
return msgLength; /* end-of-copy or error */
}
if (msgLength == 0)