#include <zlib.h>
 #endif
 
+#include "common/string.h"
 #include "getopt_long.h"
 #include "libpq-fe.h"
 #include "pqexpbuffer.h"
    if (!ReceiveXlogStream(param->bgconn, param->startptr, param->timeline,
                           param->sysidentifier, param->xlogdir,
                           reached_end_position, standby_message_timeout,
-                          NULL, false))
+                          NULL, false, true))
 
        /*
         * Any errors will already have been reported in the function process,
    logstreamer_param *param;
    uint32      hi,
                lo;
+   char        statusdir[MAXPGPATH];
 
    param = pg_malloc0(sizeof(logstreamer_param));
    param->timeline = timeline;
        /* Error message already written in GetConnection() */
        exit(1);
 
+   snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
+
    /*
-    * Always in plain format, so we can write to basedir/pg_xlog. But the
-    * directory entry in the tar file may arrive later, so make sure it's
-    * created before we start.
+    * Create pg_xlog/archive_status (and thus pg_xlog) so we can can write to
+    * basedir/pg_xlog as the directory entry in the tar file may arrive
+    * later.
     */
-   snprintf(param->xlogdir, sizeof(param->xlogdir), "%s/pg_xlog", basedir);
-   verify_dir_is_empty_or_create(param->xlogdir);
+   snprintf(statusdir, sizeof(statusdir), "%s/pg_xlog/archive_status",
+            basedir);
+
+   if (pg_mkdir_p(statusdir, S_IRWXU) != 0 && errno != EEXIST)
+   {
+       fprintf(stderr,
+               _("%s: could not create directory \"%s\": %s\n"),
+               progname, statusdir, strerror(errno));
+       disconnect_and_exit(1);
+   }
 
    /*
     * Start a child process and tell it to start streaming. On Unix, this is
                         * by the wal receiver process. Also, when transaction
                         * log directory location was specified, pg_xlog has
                         * already been created as a symbolic link before
-                        * starting the actual backup. So just ignore failure
-                        * on them.
+                        * starting the actual backup. So just ignore creation
+                        * failures on related directories.
                         */
-                       if ((!streamwal && (strcmp(xlog_dir, "") == 0))
-                           || strcmp(filename + strlen(filename) - 8, "/pg_xlog") != 0)
+                       if (!((pg_str_endswith(filename, "/pg_xlog") ||
+                              pg_str_endswith(filename, "/archive_status")) &&
+                             errno == EEXIST))
                        {
                            fprintf(stderr,
                            _("%s: could not create directory \"%s\": %s\n"),
 
                 uint32 timeline, char *basedir,
               stream_stop_callback stream_stop, int standby_message_timeout,
                  char *partial_suffix, XLogRecPtr *stoppos,
-                 bool synchronous);
+                 bool synchronous, bool mark_done);
 static int CopyStreamPoll(PGconn *conn, long timeout_ms);
 static int CopyStreamReceive(PGconn *conn, long timeout, char **buffer);
 static bool ProcessKeepaliveMsg(PGconn *conn, char *copybuf, int len,
 static bool ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
                               XLogRecPtr *blockpos, uint32 timeline,
                               char *basedir, stream_stop_callback stream_stop,
-                              char *partial_suffix);
+                              char *partial_suffix, bool mark_done);
 static PGresult *HandleEndOfCopyStream(PGconn *conn, char *copybuf,
                                       XLogRecPtr blockpos, char *basedir, char *partial_suffix,
-                                      XLogRecPtr *stoppos);
+                                      XLogRecPtr *stoppos, bool mark_done);
 static bool CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos,
                                uint32 timeline, char *basedir,
                                stream_stop_callback stream_stop,
-                               char *partial_suffix, XLogRecPtr *stoppos);
+                               char *partial_suffix, XLogRecPtr *stoppos,
+                               bool mark_done);
 static long CalculateCopyStreamSleeptime(int64 now, int standby_message_timeout,
                                         int64 last_status);
 
 static bool ReadEndOfStreamingResult(PGresult *res, XLogRecPtr *startpos,
                         uint32 *timeline);
 
+static bool
+mark_file_as_archived(const char *basedir, const char *fname)
+{
+   int fd;
+   static char tmppath[MAXPGPATH];
+
+   snprintf(tmppath, sizeof(tmppath), "%s/archive_status/%s.done",
+            basedir, fname);
+
+   fd = open(tmppath, O_WRONLY | O_CREAT | PG_BINARY, S_IRUSR | S_IWUSR);
+   if (fd < 0)
+   {
+       fprintf(stderr, _("%s: could not create archive status file \"%s\": %s\n"),
+               progname, tmppath, strerror(errno));
+       return false;
+   }
+
+   if (fsync(fd) != 0)
+   {
+       fprintf(stderr, _("%s: could not fsync file \"%s\": %s\n"),
+               progname, tmppath, strerror(errno));
+       return false;
+   }
+
+   close(fd);
+
+   return true;
+}
+
 /*
  * Open a new WAL file in the specified directory.
  *
  * and returns false, otherwise returns true.
  */
 static bool
-close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos)
+close_walfile(char *basedir, char *partial_suffix, XLogRecPtr pos, bool mark_done)
 {
    off_t       currpos;
 
                _("%s: not renaming \"%s%s\", segment is not complete\n"),
                progname, current_walfile_name, partial_suffix);
 
+   /*
+    * Mark file as archived if requested by the caller - pg_basebackup needs
+    * to do so as files can otherwise get archived again after promotion of a
+    * new node. This is in line with walreceiver.c always doing a
+    * XLogArchiveForceDone() after a complete segment.
+    */
+   if (currpos == XLOG_SEG_SIZE && mark_done)
+   {
+       /* writes error message if failed */
+       if (!mark_file_as_archived(basedir, current_walfile_name))
+           return false;
+   }
+
    lastFlushPosition = pos;
    return true;
 }
 }
 
 static bool
-writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename, char *content)
+writeTimeLineHistoryFile(char *basedir, TimeLineID tli, char *filename,
+                        char *content, bool mark_done)
 {
    int         size = strlen(content);
    char        path[MAXPGPATH];
        return false;
    }
 
+   /* Maintain archive_status, check close_walfile() for details. */
+   if (mark_done)
+   {
+       /* writes error message if failed */
+       if (!mark_file_as_archived(basedir, histfname))
+           return false;
+   }
+
    return true;
 }
 
                  char *sysidentifier, char *basedir,
                  stream_stop_callback stream_stop,
                  int standby_message_timeout, char *partial_suffix,
-                 bool synchronous)
+                 bool synchronous, bool mark_done)
 {
    char        query[128];
    char        slotcmd[128];
            /* Write the history file to disk */
            writeTimeLineHistoryFile(basedir, timeline,
                                     PQgetvalue(res, 0, 0),
-                                    PQgetvalue(res, 0, 1));
+                                    PQgetvalue(res, 0, 1),
+                                    mark_done);
 
            PQclear(res);
        }
        /* Stream the WAL */
        res = HandleCopyStream(conn, startpos, timeline, basedir, stream_stop,
                               standby_message_timeout, partial_suffix,
-                              &stoppos, synchronous);
+                              &stoppos, synchronous, mark_done);
        if (res == NULL)
            goto error;
 
 HandleCopyStream(PGconn *conn, XLogRecPtr startpos, uint32 timeline,
                 char *basedir, stream_stop_callback stream_stop,
                 int standby_message_timeout, char *partial_suffix,
-                XLogRecPtr *stoppos, bool synchronous)
+                XLogRecPtr *stoppos, bool synchronous, bool mark_done)
 {
    char       *copybuf = NULL;
    int64       last_status = -1;
         * Check if we should continue streaming, or abort at this point.
         */
        if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
-                               stream_stop, partial_suffix, stoppos))
+                               stream_stop, partial_suffix, stoppos,
+                               mark_done))
            goto error;
 
        now = feGetCurrentTimestamp();
            if (r == -2)
            {
                PGresult    *res = HandleEndOfCopyStream(conn, copybuf, blockpos,
-                                                        basedir, partial_suffix, stoppos);
+                                                        basedir, partial_suffix,
+                                                        stoppos, mark_done);
                if (res == NULL)
                    goto error;
                else
            else if (copybuf[0] == 'w')
            {
                if (!ProcessXLogDataMsg(conn, copybuf, r, &blockpos,
-                                       timeline, basedir, stream_stop, partial_suffix))
+                                       timeline, basedir, stream_stop,
+                                       partial_suffix, true))
                    goto error;
 
                /*
                 * Check if we should continue streaming, or abort at this point.
                 */
                if (!CheckCopyStreamStop(conn, blockpos, timeline, basedir,
-                                        stream_stop, partial_suffix, stoppos))
+                                        stream_stop, partial_suffix, stoppos,
+                                        mark_done))
                    goto error;
            }
            else
 ProcessXLogDataMsg(PGconn *conn, char *copybuf, int len,
                   XLogRecPtr *blockpos, uint32 timeline,
                   char *basedir, stream_stop_callback stream_stop,
-                  char *partial_suffix)
+                  char *partial_suffix, bool mark_done)
 {
    int         xlogoff;
    int         bytes_left;
        /* Did we reach the end of a WAL segment? */
        if (*blockpos % XLOG_SEG_SIZE == 0)
        {
-           if (!close_walfile(basedir, partial_suffix, *blockpos))
+           if (!close_walfile(basedir, partial_suffix, *blockpos, mark_done))
                /* Error message written in close_walfile() */
                return false;
 
 static PGresult *
 HandleEndOfCopyStream(PGconn *conn, char *copybuf,
                      XLogRecPtr blockpos, char *basedir, char *partial_suffix,
-                     XLogRecPtr *stoppos)
+                     XLogRecPtr *stoppos, bool mark_done)
 {
    PGresult   *res = PQgetResult(conn);
 
     */
    if (still_sending)
    {
-       if (!close_walfile(basedir, partial_suffix, blockpos))
+       if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
        {
            /* Error message written in close_walfile() */
            PQclear(res);
 static bool
 CheckCopyStreamStop(PGconn *conn, XLogRecPtr blockpos, uint32 timeline,
                    char *basedir, stream_stop_callback stream_stop,
-                   char *partial_suffix, XLogRecPtr *stoppos)
+                   char *partial_suffix, XLogRecPtr *stoppos, bool mark_done)
 {
    if (still_sending && stream_stop(blockpos, timeline, false))
    {
-       if (!close_walfile(basedir, partial_suffix, blockpos))
+       if (!close_walfile(basedir, partial_suffix, blockpos, mark_done))
        {
            /* Potential error message is written by close_walfile */
            return false;