</para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>subretaindeadtuples</structfield> <type>bool</type>
+      </para>
+      <para>
+       If true, the information (e.g., dead tuples, commit timestamps, and
+       origins) that is useful for conflict detection is retained on the
+       subscriber.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>subconninfo</structfield> <type>text</type>
 
           new setting.
           This setting has no effect if <varname>primary_conninfo</varname> is not
           set or the server is not in standby mode.
+          The name cannot be <literal>pg_conflict_detection</literal> as it is
+          reserved for the conflict detection slot.
          </para>
         </listitem>
        </varlistentry>
 
        </para>
        <para>
         Creates a new physical replication slot named
-        <parameter>slot_name</parameter>. The optional second parameter,
+        <parameter>slot_name</parameter>. The name cannot be
+        <literal>pg_conflict_detection</literal> as it is reserved for the
+        conflict detection slot. The optional second parameter,
         when <literal>true</literal>, specifies that the <acronym>LSN</acronym> for this
         replication slot be reserved immediately; otherwise
         the <acronym>LSN</acronym> is reserved on first connection from a streaming
        <para>
         Creates a new logical (decoding) replication slot named
         <parameter>slot_name</parameter> using the output plugin
-        <parameter>plugin</parameter>. The optional third
+        <parameter>plugin</parameter>. The name cannot be
+        <literal>pg_conflict_detection</literal> as it is reserved for
+        the conflict detection slot. The optional third
         parameter, <parameter>temporary</parameter>, when set to true, specifies that
         the slot should not be permanently stored to disk and is only meant
         for use by the current session. Temporary slots are also
        <para>
         Copies an existing physical replication slot named <parameter>src_slot_name</parameter>
         to a physical replication slot named <parameter>dst_slot_name</parameter>.
+        The new slot name cannot be <literal>pg_conflict_detection</literal>,
+        as it is reserved for the conflict detection slot.
         The copied physical slot starts to reserve WAL from the same <acronym>LSN</acronym> as the
         source slot.
         <parameter>temporary</parameter> is optional. If <parameter>temporary</parameter>
         Copies an existing logical replication slot
         named <parameter>src_slot_name</parameter> to a logical replication
         slot named <parameter>dst_slot_name</parameter>, optionally changing
-        the output plugin and persistence.  The copied logical slot starts
-        from the same <acronym>LSN</acronym> as the source logical slot.  Both
+        the output plugin and persistence.  The new slot name cannot be
+        <literal>pg_conflict_detection</literal> as it is reserved for
+        the conflict detection slot.  The copied logical slot starts from
+        the same <acronym>LSN</acronym> as the source logical slot.  Both
         <parameter>temporary</parameter> and <parameter>plugin</parameter> are
         optional; if they are omitted, the values of the source slot are used.
         The <literal>failover</literal> option of the source logical slot
 
     the subscriber, plus some reserve for table synchronization.
    </para>
 
+   <para>
+    <link linkend="guc-max-replication-slots"><varname>max_replication_slots</varname></link>
+    must be set to at least 1 when <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+    is enabled for any subscription.
+   </para>
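+
+   <para>
+    For example, a subscriber that enables
+    <literal>retain_dead_tuples</literal> might be configured along these
+    lines (the values shown are illustrative only):
+<programlisting>
+max_replication_slots = 10
+wal_level = replica
+track_commit_timestamp = on
+</programlisting>
+   </para>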
+
    <para>
     <link linkend="guc-max-logical-replication-workers"><varname>max_logical_replication_workers</varname></link>
     must be set to at least the number of subscriptions (for leader apply
     dependencies on clusters before version 17.0 will silently be ignored.
    </para>
 
+   <note>
+    <para>
+     Commit timestamps and origin data are not preserved during the upgrade.
+     As a result, even if
+     <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+     is enabled, the upgraded subscriber may be unable to detect conflicts or
+     log relevant commit timestamps and origins when applying changes from
+     the publisher that occurred before the upgrade. Additionally,
+     immediately after the upgrade, vacuum may remove the deleted rows that
+     are required for conflict detection. This can affect changes that were
+     not replicated before the upgrade. To ensure consistent conflict
+     tracking, users should ensure that all potentially conflicting changes
+     are replicated to the subscriber before initiating the upgrade.
+    </para>
+   </note>
+
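+   <para>
+    As a rough pre-upgrade check (a sketch; adapt it to your environment),
+    compare the publisher's current WAL position with the position the
+    subscriber has confirmed:
+<programlisting>
+-- on the publisher
+SELECT pg_current_wal_lsn();
+
+-- on the subscriber
+SELECT subname, latest_end_lsn FROM pg_stat_subscription;
+</programlisting>
+   </para>
+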
    <para>
     There are some prerequisites for <application>pg_upgrade</application> to
     be able to upgrade the subscriptions. If these are not met an error
       subscriptions present in the old cluster.
      </para>
     </listitem>
+    <listitem>
+     <para>
+      If there are subscriptions with <literal>retain_dead_tuples</literal>
+      enabled, the reserved
+      replication slot <quote><literal>pg_conflict_detection</literal></quote>
+      must not exist on the new cluster. Additionally, the
+      <link linkend="guc-wal-level"><varname>wal_level</varname></link> on the
+      new cluster must be set to <literal>replica</literal> or
+      <literal>logical</literal>.
+     </para>
+    </listitem>
    </itemizedlist>
   </sect2>
 
 
          <para>
           The name of the slot to create. Must be a valid replication slot
           name (see <xref linkend="streaming-replication-slots-manipulation"/>).
+          The name cannot be <literal>pg_conflict_detection</literal> as it
+          is reserved for the conflict detection slot.
          </para>
         </listitem>
        </varlistentry>
          </variablelist>
         </listitem>
        </varlistentry>
+
+       <varlistentry id="protocol-replication-primary-status-update">
+        <term>Primary status update (B)</term>
+        <listitem>
+         <variablelist>
+          <varlistentry>
+           <term>Byte1('s')</term>
+           <listitem>
+            <para>
+             Identifies the message as a primary status update.
+            </para>
+           </listitem>
+          </varlistentry>
+
+          <varlistentry>
+           <term>Int64</term>
+           <listitem>
+            <para>
+             The latest WAL write position on the server.
+            </para>
+           </listitem>
+          </varlistentry>
+
+          <varlistentry>
+           <term>Int64</term>
+           <listitem>
+            <para>
+             The oldest transaction ID that is currently in the commit phase on
+             the server, along with its epoch. The most significant 32 bits are
+             the epoch. The least significant 32 bits are the transaction ID.
+             If no transactions are active on the server, this number will be
+             the next transaction ID to be assigned.
+            </para>
+           </listitem>
+          </varlistentry>
+
+          <varlistentry>
+           <term>Int64</term>
+           <listitem>
+            <para>
+             The next transaction ID to be assigned on the server, along with
+             its epoch. The most significant 32 bits are the epoch. The least
+             significant 32 bits are the transaction ID.
+            </para>
+           </listitem>
+          </varlistentry>
+
+          <varlistentry>
+           <term>Int64</term>
+           <listitem>
+            <para>
+             The server's system clock at the time of transmission, as
+             microseconds since midnight on 2000-01-01.
+            </para>
+           </listitem>
+          </varlistentry>
+         </variablelist>
+        </listitem>
+       </varlistentry>
       </variablelist>
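+
+      <para>
+       As an illustration, a receiver might decode the body of the primary
+       status update message ('s') along these lines (a minimal sketch, not
+       part of the server API; it assumes the PostgreSQL-style integer
+       typedefs and that all integers arrive in network byte order):
+<programlisting>
+/* read a 64-bit big-endian value from p (sketch) */
+static uint64
+get_be64(const unsigned char *p)
+{
+    uint64      v = 0;
+    int         i;
+
+    for (i = 0; i &lt; 8; i++)
+        v = (v * 256) + p[i];
+    return v;
+}
+
+typedef struct PrimaryStatus
+{
+    uint64      write_lsn;              /* latest WAL write position */
+    uint32      oldest_commit_epoch;    /* second Int64, high 32 bits */
+    uint32      oldest_commit_xid;      /* second Int64, low 32 bits */
+    uint32      next_epoch;             /* third Int64, high 32 bits */
+    uint32      next_xid;               /* third Int64, low 32 bits */
+    int64       send_time;              /* microseconds since 2000-01-01 */
+} PrimaryStatus;
+
+static void
+decode_primary_status(const unsigned char *buf, PrimaryStatus *st)
+{
+    uint64      val;
+
+    /* buf points just past the 's' message-type byte */
+    st->write_lsn = get_be64(buf);
+
+    val = get_be64(buf + 8);
+    st->oldest_commit_epoch = (uint32) (val >> 32);
+    st->oldest_commit_xid = (uint32) val;
+
+    val = get_be64(buf + 16);
+    st->next_epoch = (uint32) (val >> 32);
+    st->next_xid = (uint32) val;
+
+    st->send_time = (int64) get_be64(buf + 24);
+}
+</programlisting>
+      </para>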
 
       <para>
          </variablelist>
         </listitem>
        </varlistentry>
+
+       <varlistentry id="protocol-replication-standby-wal-status-request">
+        <term>Request primary status update (F)</term>
+        <listitem>
+         <variablelist>
+          <varlistentry>
+           <term>Byte1('p')</term>
+           <listitem>
+            <para>
+             Identifies the message as a request for a primary status update.
+            </para>
+           </listitem>
+          </varlistentry>
+
+          <varlistentry>
+           <term>Int64</term>
+           <listitem>
+            <para>
+             The client's system clock at the time of transmission, as
+             microseconds since midnight on 2000-01-01.
+            </para>
+           </listitem>
+          </varlistentry>
+         </variablelist>
+        </listitem>
+       </varlistentry>
+
       </variablelist>
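+
+      <para>
+       For example, a standby could build the request-for-primary-status
+       message ('p') as follows (a sketch;
+       <function>current_time_us</function> is a hypothetical clock helper,
+       and the finished bytes are sent wrapped in a CopyData message):
+<programlisting>
+unsigned char msg[9];       /* one byte for 'p' plus an Int64 timestamp */
+int64       now_us;
+int         i;
+
+now_us = current_time_us();     /* microseconds since midnight 2000-01-01 */
+msg[0] = 'p';
+for (i = 0; i &lt; 8; i++)
+    msg[1 + i] = (unsigned char) (now_us >> (8 * (7 - i)));
+</programlisting>
+      </para>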
      </listitem>
     </varlistentry>
 
       <link linkend="sql-createsubscription-params-with-password-required"><literal>password_required</literal></link>,
       <link linkend="sql-createsubscription-params-with-run-as-owner"><literal>run_as_owner</literal></link>,
       <link linkend="sql-createsubscription-params-with-origin"><literal>origin</literal></link>,
-      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>, and
-      <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>.
+      <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
+      <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and
+      <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>.
       Only a superuser can set <literal>password_required = false</literal>.
      </para>
 
      </para>
 
      <para>
-      The <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>
-      and <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>
+      The <link linkend="sql-createsubscription-params-with-failover"><literal>failover</literal></link>,
+      <link linkend="sql-createsubscription-params-with-two-phase"><literal>two_phase</literal></link>, and
+      <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
       parameters can only be altered when the subscription is disabled.
      </para>
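+
+     <para>
+      For example, to change <literal>retain_dead_tuples</literal> on an
+      existing subscription (a sketch using a hypothetical subscription
+      name):
+<programlisting>
+ALTER SUBSCRIPTION mysub DISABLE;
+ALTER SUBSCRIPTION mysub SET (retain_dead_tuples = true);
+ALTER SUBSCRIPTION mysub ENABLE;
+</programlisting>
+     </para>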
 
       option is changed from <literal>true</literal> to <literal>false</literal>,
       the publisher will replicate the transactions again when they are committed.
      </para>
+
+     <para>
+      If the <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+      option is altered to <literal>false</literal> and no other subscription
+      has this option enabled, the replication slot named
+      <quote><literal>pg_conflict_detection</literal></quote>, created to retain
+      dead tuples for conflict detection, will be dropped.
+     </para>
     </listitem>
    </varlistentry>
 
 
         <listitem>
          <para>
           Name of the publisher's replication slot to use.  The default is
-          to use the name of the subscription for the slot name.
+          to use the name of the subscription for the slot name. The name
+          cannot be <literal>pg_conflict_detection</literal> as it is
+          reserved for the conflict detection slot.
          </para>
 
          <para>
          </para>
         </listitem>
        </varlistentry>
+
+      <varlistentry id="sql-createsubscription-params-with-retain-dead-tuples">
+        <term><literal>retain_dead_tuples</literal> (<type>boolean</type>)</term>
+        <listitem>
+         <para>
+          Specifies whether the information (e.g., dead tuples, commit
+          timestamps, and origins) required for conflict detection on the
+          subscriber is retained. The default is <literal>false</literal>.
+          If set to <literal>true</literal>, a physical replication slot named
+          <quote><literal>pg_conflict_detection</literal></quote> will be
+          created on the subscriber to prevent the conflict information from
+          being removed.
+         </para>
+
+         <para>
+          Note that the information useful for conflict detection is retained
+          only after the slot has been created. You can verify the existence
+          of this slot by querying <link linkend="view-pg-replication-slots">pg_replication_slots</link>.
+          Even if multiple subscriptions on one node enable this option, only
+          one replication slot is created. Also,
+          <varname>wal_level</varname> must be set to <literal>replica</literal>
+          or higher to allow the replication slot to be used.
+         </para>
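+
+         <para>
+          For example, the presence of the slot can be checked with a query
+          like:
+<programlisting>
+SELECT slot_name, slot_type, xmin
+FROM pg_replication_slots
+WHERE slot_name = 'pg_conflict_detection';
+</programlisting>
+         </para>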
+
+         <caution>
+          <para>
+           Note that the information for conflict detection cannot be purged if
+           the subscription is disabled; thus, the information will accumulate
+           until the subscription is enabled. To prevent excessive accumulation,
+           it is recommended to disable <literal>retain_dead_tuples</literal>
+           if the subscription will be inactive for an extended period.
+          </para>
+
+          <para>
+           Additionally, when enabling <literal>retain_dead_tuples</literal> for
+           conflict detection in logical replication, it is important to design the
+           replication topology to balance data retention requirements with
+           overall system performance. This option provides minimal performance
+           overhead when applied appropriately. The following scenarios illustrate
+           effective usage patterns when enabling this option.
+          </para>
+
+          <para>
+           a. Large Tables with Bidirectional Writes:
+           For large tables subject to concurrent writes on both publisher and
+           subscriber nodes, publishers can define row filters when creating
+           publications to segment data. This allows multiple subscriptions
+           to replicate exclusive subsets of the table in parallel, optimizing
+           the throughput.
+          </para>
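+
+          <para>
+           For instance, the split described above could be set up with row
+           filters like these (a hypothetical sketch):
+<programlisting>
+CREATE PUBLICATION pub_even FOR TABLE big_table WHERE (id % 2 = 0);
+CREATE PUBLICATION pub_odd  FOR TABLE big_table WHERE (id % 2 = 1);
+</programlisting>
+          </para>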
+
+          <para>
+           b. Write-Enabled Subscribers:
+           If a subscriber node is expected to perform write operations, replication
+           can be structured using multiple publications and subscriptions. By
+           distributing tables across these publications, the workload is spread among
+           several apply workers, improving concurrency and reducing contention.
+          </para>
+
+          <para>
+           c. Read-Only Subscribers:
+           In configurations involving single or multiple publisher nodes
+           performing concurrent write operations, read-only subscriber nodes
+           may replicate changes without a performance impact as long as they
+           use index scans. However, if the subscriber is impacted by
+           replication lag or scan performance (say, due to sequential scans),
+           it needs to follow one of the two previous strategies to distribute
+           the workload on the subscriber.
+          </para>
+         </caution>
+
+         <para>
+          This option cannot be enabled if the publisher is a physical standby.
+         </para>
+
+         <para>
+          Enabling this option ensures retention of information useful for
+          conflict detection solely for changes occurring locally on the
+          publisher. For changes originating from different origins,
+          reliable conflict detection cannot be guaranteed.
+         </para>
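+
+         <para>
+          For example, a subscription with this option enabled could be
+          created as follows (a sketch with hypothetical connection
+          details):
+<programlisting>
+CREATE SUBSCRIPTION mysub
+    CONNECTION 'host=pub.example.com dbname=postgres'
+    PUBLICATION mypub
+    WITH (retain_dead_tuples = true);
+</programlisting>
+         </para>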
+        </listitem>
+       </varlistentry>
       </variablelist></para>
 
     </listitem>
 
     * starting immediately after the WAL record is inserted could complete
     * without fsync'ing our state file.  (This is essentially the same kind
     * of race condition as the COMMIT-to-clog-write case that
-    * RecordTransactionCommit uses DELAY_CHKPT_START for; see notes there.)
+    * RecordTransactionCommit uses DELAY_CHKPT_IN_COMMIT for; see notes
+    * there.) Note that DELAY_CHKPT_IN_COMMIT is used to find transactions in
+    * the critical commit section. We need to know about such transactions
+    * for conflict detection in logical replication. See
+    * GetOldestActiveTransactionId(true, false) and its use.
     *
     * We save the PREPARE record's location in the gxact for later use by
     * CheckPointTwoPhase.
  * RecordTransactionCommitPrepared
  *
  * This is basically the same as RecordTransactionCommit (q.v. if you change
- * this function): in particular, we must set DELAY_CHKPT_START to avoid a
+ * this function): in particular, we must set DELAY_CHKPT_IN_COMMIT to avoid a
  * race condition.
  *
  * We know the transaction made at least one XLOG entry (its PREPARE),
                                const char *gid)
 {
    XLogRecPtr  recptr;
-   TimestampTz committs = GetCurrentTimestamp();
+   TimestampTz committs;
    bool        replorigin;
 
    /*
    START_CRIT_SECTION();
 
    /* See notes in RecordTransactionCommit */
-   Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
-   MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+   Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
+   MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
+
+   /*
+    * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible before
+    * commit time is written.
+    */
+   pg_write_barrier();
+
+   /*
+    * Note it is important to set committs value after marking ourselves as
+    * in the commit critical section (DELAY_CHKPT_IN_COMMIT). This is because
+    * we want to ensure all transactions that have acquired commit timestamp
+    * are finished before we allow the logical replication client to advance
+    * its xid which is used to hold back dead rows for conflict detection.
+    * See comments atop worker.c.
+    */
+   committs = GetCurrentTimestamp();
 
    /*
     * Emit the XLOG commit record. Note that we mark 2PC commits as
    TransactionIdCommitTree(xid, nchildren, children);
 
    /* Checkpoint can proceed now */
-   MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+   MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
 
    END_CRIT_SECTION();
 
 
         * without holding the ProcArrayLock, since we're the only one
         * modifying it.  This makes checkpoint's determination of which xacts
         * are delaying the checkpoint a bit fuzzy, but it doesn't matter.
+        *
+        * Note, it is important to get the commit timestamp after marking the
+        * transaction in the commit critical section. See
+        * RecordTransactionCommitPrepared.
         */
-       Assert((MyProc->delayChkptFlags & DELAY_CHKPT_START) == 0);
+       Assert((MyProc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0);
        START_CRIT_SECTION();
-       MyProc->delayChkptFlags |= DELAY_CHKPT_START;
+       MyProc->delayChkptFlags |= DELAY_CHKPT_IN_COMMIT;
+
+       Assert(xactStopTimestamp == 0);
+
+       /*
+        * Ensures the DELAY_CHKPT_IN_COMMIT flag write is globally visible
+        * before commit time is written.
+        */
+       pg_write_barrier();
 
        /*
         * Insert the commit XLOG record.
     */
    if (markXidCommitted)
    {
-       MyProc->delayChkptFlags &= ~DELAY_CHKPT_START;
+       MyProc->delayChkptFlags &= ~DELAY_CHKPT_IN_COMMIT;
        END_CRIT_SECTION();
    }
 
 
     * starting snapshot of locks and transactions.
     */
    if (!shutdown && XLogStandbyInfoActive())
-       checkPoint.oldestActiveXid = GetOldestActiveTransactionId();
+       checkPoint.oldestActiveXid = GetOldestActiveTransactionId(false, true);
    else
        checkPoint.oldestActiveXid = InvalidTransactionId;
 
 
 check_primary_slot_name(char **newval, void **extra, GucSource source)
 {
    if (*newval && strcmp(*newval, "") != 0 &&
-       !ReplicationSlotValidateName(*newval, WARNING))
+       !ReplicationSlotValidateName(*newval, false, WARNING))
        return false;
 
    return true;
 
    sub->passwordrequired = subform->subpasswordrequired;
    sub->runasowner = subform->subrunasowner;
    sub->failover = subform->subfailover;
+   sub->retaindeadtuples = subform->subretaindeadtuples;
 
    /* Get conninfo */
    datum = SysCacheGetAttrNotNull(SUBSCRIPTIONOID,
 
 GRANT SELECT (oid, subdbid, subskiplsn, subname, subowner, subenabled,
               subbinary, substream, subtwophasestate, subdisableonerr,
              subpasswordrequired, subrunasowner, subfailover,
-              subslotname, subsynccommit, subpublications, suborigin)
+             subretaindeadtuples, subslotname, subsynccommit,
+             subpublications, suborigin)
     ON pg_subscription TO public;
 
 CREATE VIEW pg_stat_subscription_stats AS
 
 
 #include "postgres.h"
 
+#include "access/commit_ts.h"
 #include "access/htup_details.h"
 #include "access/table.h"
 #include "access/twophase.h"
 #define SUBOPT_PASSWORD_REQUIRED   0x00000800
 #define SUBOPT_RUN_AS_OWNER            0x00001000
 #define SUBOPT_FAILOVER                0x00002000
-#define SUBOPT_LSN                 0x00004000
-#define SUBOPT_ORIGIN              0x00008000
+#define SUBOPT_RETAIN_DEAD_TUPLES  0x00004000
+#define SUBOPT_LSN                 0x00008000
+#define SUBOPT_ORIGIN              0x00010000
 
 /* check if the 'val' has 'bits' set */
 #define IsSet(val, bits)  (((val) & (bits)) == (bits))
    bool        passwordrequired;
    bool        runasowner;
    bool        failover;
+   bool        retaindeadtuples;
    char       *origin;
    XLogRecPtr  lsn;
 } SubOpts;
 static List *fetch_table_list(WalReceiverConn *wrconn, List *publications);
 static void check_publications_origin(WalReceiverConn *wrconn,
                                      List *publications, bool copydata,
-                                     char *origin, Oid *subrel_local_oids,
-                                     int subrel_count, char *subname);
+                                     bool retain_dead_tuples, char *origin,
+                                     Oid *subrel_local_oids, int subrel_count,
+                                     char *subname);
+static void check_pub_dead_tuple_retention(WalReceiverConn *wrconn);
 static void check_duplicates_in_publist(List *publist, Datum *datums);
 static List *merge_publications(List *oldpublist, List *newpublist, bool addpub, const char *subname);
 static void ReportSlotConnectionError(List *rstates, Oid subid, char *slotname, char *err);
        opts->runasowner = false;
    if (IsSet(supported_opts, SUBOPT_FAILOVER))
        opts->failover = false;
+   if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+       opts->retaindeadtuples = false;
    if (IsSet(supported_opts, SUBOPT_ORIGIN))
        opts->origin = pstrdup(LOGICALREP_ORIGIN_ANY);
 
            if (strcmp(opts->slot_name, "none") == 0)
                opts->slot_name = NULL;
            else
-               ReplicationSlotValidateName(opts->slot_name, ERROR);
+               ReplicationSlotValidateName(opts->slot_name, false, ERROR);
        }
        else if (IsSet(supported_opts, SUBOPT_COPY_DATA) &&
                 strcmp(defel->defname, "copy_data") == 0)
            opts->specified_opts |= SUBOPT_FAILOVER;
            opts->failover = defGetBoolean(defel);
        }
+       else if (IsSet(supported_opts, SUBOPT_RETAIN_DEAD_TUPLES) &&
+                strcmp(defel->defname, "retain_dead_tuples") == 0)
+       {
+           if (IsSet(opts->specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+               errorConflictingDefElem(defel, pstate);
+
+           opts->specified_opts |= SUBOPT_RETAIN_DEAD_TUPLES;
+           opts->retaindeadtuples = defGetBoolean(defel);
+       }
        else if (IsSet(supported_opts, SUBOPT_ORIGIN) &&
                 strcmp(defel->defname, "origin") == 0)
        {
                      SUBOPT_SYNCHRONOUS_COMMIT | SUBOPT_BINARY |
                      SUBOPT_STREAMING | SUBOPT_TWOPHASE_COMMIT |
                      SUBOPT_DISABLE_ON_ERR | SUBOPT_PASSWORD_REQUIRED |
-                     SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER | SUBOPT_ORIGIN);
+                     SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
+                     SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
    parse_subscription_options(pstate, stmt->options, supported_opts, &opts);
 
    /*
                        stmt->subname)));
    }
 
+   /* Ensure that we can enable retain_dead_tuples */
+   if (opts.retaindeadtuples)
+       CheckSubDeadTupleRetention(true, !opts.enabled, WARNING);
+
    if (!IsSet(opts.specified_opts, SUBOPT_SLOT_NAME) &&
        opts.slot_name == NULL)
        opts.slot_name = stmt->subname;
    values[Anum_pg_subscription_subpasswordrequired - 1] = BoolGetDatum(opts.passwordrequired);
    values[Anum_pg_subscription_subrunasowner - 1] = BoolGetDatum(opts.runasowner);
    values[Anum_pg_subscription_subfailover - 1] = BoolGetDatum(opts.failover);
+   values[Anum_pg_subscription_subretaindeadtuples - 1] =
+       BoolGetDatum(opts.retaindeadtuples);
    values[Anum_pg_subscription_subconninfo - 1] =
        CStringGetTextDatum(conninfo);
    if (opts.slot_name)
        {
            check_publications(wrconn, publications);
            check_publications_origin(wrconn, publications, opts.copy_data,
-                                     opts.origin, NULL, 0, stmt->subname);
+                                     opts.retaindeadtuples, opts.origin,
+                                     NULL, 0, stmt->subname);
+
+           if (opts.retaindeadtuples)
+               check_pub_dead_tuple_retention(wrconn);
 
            /*
             * Set sync state based on if we were asked to do data copy or
              sizeof(Oid), oid_cmp);
 
        check_publications_origin(wrconn, sub->publications, copy_data,
-                                 sub->origin, subrel_local_oids,
-                                 subrel_count, sub->name);
+                                 sub->retaindeadtuples, sub->origin,
+                                 subrel_local_oids, subrel_count, sub->name);
 
        /*
         * Rels that we want to remove from subscription and drop any slots
 }
 
 /*
- * Common checks for altering failover and two_phase options.
+ * Common checks for altering failover, two_phase, and retain_dead_tuples
+ * options.
  */
 static void
 CheckAlterSubOption(Subscription *sub, const char *option,
                    bool slot_needs_update, bool isTopLevel)
 {
+   Assert(strcmp(option, "failover") == 0 ||
+          strcmp(option, "two_phase") == 0 ||
+          strcmp(option, "retain_dead_tuples") == 0);
+
    /*
-    * The checks in this function are required only for failover and
-    * two_phase options.
+    * Altering the retain_dead_tuples option does not update the slot on the
+    * publisher.
     */
-   Assert(strcmp(option, "failover") == 0 ||
-          strcmp(option, "two_phase") == 0);
+   Assert(!slot_needs_update || strcmp(option, "retain_dead_tuples") != 0);
 
    /*
     * Do not allow changing the option if the subscription is enabled. This
     * the publisher by the existing walsender, so we could have allowed that
     * even when the subscription is enabled. But we kept this restriction for
     * the sake of consistency and simplicity.
+    *
+    * Additionally, do not allow changing the retain_dead_tuples option when
+    * the subscription is enabled to prevent race conditions arising from the
+    * new option value being acknowledged asynchronously by the launcher and
+    * apply workers.
+    *
+    * Without the restriction, a race condition may arise when a user
+    * disables and immediately re-enables the retain_dead_tuples option. In
+    * this case, the launcher might drop the slot upon noticing the disabled
+    * action, while the apply worker may keep maintaining
+    * oldest_nonremovable_xid without noticing the option change. During this
+    * period, a transaction ID wraparound could falsely make this ID appear
+    * as if it originates from the future w.r.t. the transaction ID stored
+    * in the slot maintained by the launcher.
+    *
+    * Similarly, if the user enables retain_dead_tuples concurrently with the
+    * launcher starting the worker, the apply worker may start calculating
+    * oldest_nonremovable_xid before the launcher notices the enable action.
+    * Consequently, the launcher may update slot.xmin to a newer value than
+    * that maintained by the worker. In subsequent cycles, upon integrating
+    * the worker's oldest_nonremovable_xid, the launcher might detect a
+    * retreat in the calculated xmin, necessitating additional handling.
+    *
+    * XXX To address the above race conditions, we can define
+    * oldest_nonremovable_xid as FullTransactionId and add a check to
+    * disallow retreating the conflict slot's xmin. For now, we keep the
+    * implementation simple by disallowing changes to retain_dead_tuples,
+    * but in the future we can revisit this after some more analysis.
+    *
+    * Note that we could restrict only the enabling of retain_dead_tuples to
+    * avoid the race conditions described above, but we maintain the
+    * restriction for both enable and disable operations for the sake of
+    * consistency.
     */
    if (sub->enabled)
        ereport(ERROR,
    bool        update_tuple = false;
    bool        update_failover = false;
    bool        update_two_phase = false;
+   bool        check_pub_rdt = false;
+   bool        retain_dead_tuples;
+   char       *origin;
    Subscription *sub;
    Form_pg_subscription form;
    bits32      supported_opts;
 
    sub = GetSubscription(subid, false);
 
+   retain_dead_tuples = sub->retaindeadtuples;
+   origin = sub->origin;
+
    /*
     * Don't allow non-superuser modification of a subscription with
     * password_required=false.
                                  SUBOPT_DISABLE_ON_ERR |
                                  SUBOPT_PASSWORD_REQUIRED |
                                  SUBOPT_RUN_AS_OWNER | SUBOPT_FAILOVER |
-                                 SUBOPT_ORIGIN);
+                                 SUBOPT_RETAIN_DEAD_TUPLES | SUBOPT_ORIGIN);
 
                parse_subscription_options(pstate, stmt->options,
                                           supported_opts, &opts);
                    replaces[Anum_pg_subscription_subfailover - 1] = true;
                }
 
+               if (IsSet(opts.specified_opts, SUBOPT_RETAIN_DEAD_TUPLES))
+               {
+                   values[Anum_pg_subscription_subretaindeadtuples - 1] =
+                       BoolGetDatum(opts.retaindeadtuples);
+                   replaces[Anum_pg_subscription_subretaindeadtuples - 1] = true;
+
+                   CheckAlterSubOption(sub, "retain_dead_tuples", false, isTopLevel);
+
+                   /*
+                    * Workers may continue running even after the
+                    * subscription has been disabled.
+                    *
+                    * To prevent race conditions (as described in
+                    * CheckAlterSubOption()), ensure that all worker
+                    * processes have already exited before proceeding.
+                    */
+                   if (logicalrep_workers_find(subid, true, true))
+                       ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+                                errmsg("cannot alter retain_dead_tuples when logical replication worker is still running"),
+                                errhint("Try again after some time.")));
+
+                   /*
+                    * Remind the user that enabling the subscription will
+                    * prevent the accumulation of dead tuples.
+                    */
+                   if (opts.retaindeadtuples)
+                       CheckSubDeadTupleRetention(true, !sub->enabled, NOTICE);
+
+                   /*
+                    * Notify the launcher to manage the replication slot for
+                    * conflict detection. This ensures that replication slot
+                    * is efficiently handled (created, updated, or dropped)
+                    * in response to any configuration changes.
+                    */
+                   ApplyLauncherWakeupAtCommit();
+
+                   check_pub_rdt = opts.retaindeadtuples;
+                   retain_dead_tuples = opts.retaindeadtuples;
+               }
+
                if (IsSet(opts.specified_opts, SUBOPT_ORIGIN))
                {
                    values[Anum_pg_subscription_suborigin - 1] =
                        CStringGetTextDatum(opts.origin);
                    replaces[Anum_pg_subscription_suborigin - 1] = true;
+
+                   /*
+                    * Check if changes from different origins may be received
+                    * from the publisher when the origin is changed to ANY
+                    * and retain_dead_tuples is enabled.
+                    */
+                   check_pub_rdt = retain_dead_tuples &&
+                       pg_strcasecmp(opts.origin, LOGICALREP_ORIGIN_ANY) == 0;
+
+                   origin = opts.origin;
                }
 
                update_tuple = true;
                            (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                             errmsg("cannot enable subscription that does not have a slot name")));
 
+               /*
+                * Check track_commit_timestamp only when enabling the
+                * subscription in case it was disabled after creation. See
+                * comments atop CheckSubDeadTupleRetention() for details.
+                */
+               if (sub->retaindeadtuples)
+                   CheckSubDeadTupleRetention(opts.enabled, !opts.enabled,
+                                              WARNING);
+
                values[Anum_pg_subscription_subenabled - 1] =
                    BoolGetDatum(opts.enabled);
                replaces[Anum_pg_subscription_subenabled - 1] = true;
                    ApplyLauncherWakeupAtCommit();
 
                update_tuple = true;
+
+               /*
+                * The subscription might be initially created with
+                * connect=false and retain_dead_tuples=true, meaning the
+                * remote server's status may not be checked. Ensure this
+                * check is conducted now.
+                */
+               check_pub_rdt = sub->retaindeadtuples && opts.enabled;
                break;
            }
 
                CStringGetTextDatum(stmt->conninfo);
            replaces[Anum_pg_subscription_subconninfo - 1] = true;
            update_tuple = true;
+
+           /*
+            * Since the remote server configuration might have changed,
+            * perform a check to ensure it permits enabling
+            * retain_dead_tuples.
+            */
+           check_pub_rdt = sub->retaindeadtuples;
            break;
 
        case ALTER_SUBSCRIPTION_SET_PUBLICATION:
    }
 
    /*
-    * Try to acquire the connection necessary for altering the slot, if
-    * needed.
+    * Try to acquire the connection necessary either for modifying the slot
+    * or for checking if the remote server permits enabling
+    * retain_dead_tuples.
     *
     * This has to be at the end because otherwise if there is an error while
     * doing the database operations we won't be able to rollback altered
     * slot.
     */
-   if (update_failover || update_two_phase)
+   if (update_failover || update_two_phase || check_pub_rdt)
    {
        bool        must_use_password;
        char       *err;
        /* Load the library providing us libpq calls. */
        load_file("libpqwalreceiver", false);
 
-       /* Try to connect to the publisher. */
+       /*
+        * Try to connect to the publisher, using the new connection string if
+        * available.
+        */
        must_use_password = sub->passwordrequired && !sub->ownersuperuser;
-       wrconn = walrcv_connect(sub->conninfo, true, true, must_use_password,
-                               sub->name, &err);
+       wrconn = walrcv_connect(stmt->conninfo ? stmt->conninfo : sub->conninfo,
+                               true, true, must_use_password, sub->name,
+                               &err);
        if (!wrconn)
            ereport(ERROR,
                    (errcode(ERRCODE_CONNECTION_FAILURE),
 
        PG_TRY();
        {
-           walrcv_alter_slot(wrconn, sub->slotname,
-                             update_failover ? &opts.failover : NULL,
-                             update_two_phase ? &opts.twophase : NULL);
+           if (retain_dead_tuples)
+               check_pub_dead_tuple_retention(wrconn);
+
+           check_publications_origin(wrconn, sub->publications, false,
+                                     retain_dead_tuples, origin, NULL, 0,
+                                     sub->name);
+
+           if (update_failover || update_two_phase)
+               walrcv_alter_slot(wrconn, sub->slotname,
+                                 update_failover ? &opts.failover : NULL,
+                                 update_two_phase ? &opts.twophase : NULL);
        }
        PG_FINALLY();
        {
  * Check and log a warning if the publisher has subscribed to the same table,
  * its partition ancestors (if it's a partition), or its partition children (if
  * it's a partitioned table), from some other publishers. This check is
- * required only if "copy_data = true" and "origin = none" for CREATE
- * SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements to notify the
- * user that data having origin might have been copied.
+ * required in the following scenarios:
  *
- * This check need not be performed on the tables that are already added
- * because incremental sync for those tables will happen through WAL and the
- * origin of the data can be identified from the WAL records.
+ * 1) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
+ *    with "copy_data = true" and "origin = none":
+ *    - Warn the user that data with an origin might have been copied.
+ *    - This check is skipped for tables already added, as incremental sync via
+ *      WAL allows origin tracking. The list of such tables is in
+ *      subrel_local_oids.
  *
- * subrel_local_oids contains the list of relation oids that are already
- * present on the subscriber.
+ * 2) For CREATE SUBSCRIPTION and ALTER SUBSCRIPTION ... REFRESH statements
+ *    with "retain_dead_tuples = true" and "origin = any", and for ALTER
+ *    SUBSCRIPTION statements that modify retain_dead_tuples or origin, or
+ *    when the publisher's status changes (e.g., due to a connection string
+ *    update):
+ *    - Warn the user that only conflict detection info for local changes on
+ *      the publisher is retained. Data from other origins may lack sufficient
+ *      details for reliable conflict detection.
+ *    - See comments atop worker.c for more details.
  */
 static void
 check_publications_origin(WalReceiverConn *wrconn, List *publications,
-                         bool copydata, char *origin, Oid *subrel_local_oids,
+                         bool copydata, bool retain_dead_tuples,
+                         char *origin, Oid *subrel_local_oids,
                          int subrel_count, char *subname)
 {
    WalRcvExecResult *res;
    Oid         tableRow[1] = {TEXTOID};
    List       *publist = NIL;
    int         i;
+   bool        check_rdt;
+   bool        check_table_sync;
+   bool        origin_none = origin &&
+       pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) == 0;
+
+   /*
+    * Enable retain_dead_tuples checks only when origin is set to 'any',
+    * since with origin='none' only local changes are replicated to the
+    * subscriber.
+    */
+   check_rdt = retain_dead_tuples && !origin_none;
+
+   /*
+    * Enable table synchronization checks only when origin is 'none', to
+    * ensure that data from other origins is not inadvertently copied.
+    */
+   check_table_sync = copydata && origin_none;
 
-   if (!copydata || !origin ||
-       (pg_strcasecmp(origin, LOGICALREP_ORIGIN_NONE) != 0))
+   /* retain_dead_tuples and table sync checks occur separately */
+   Assert(!(check_rdt && check_table_sync));
+
+   /* Return if no checks are required */
+   if (!check_rdt && !check_table_sync)
        return;
 
    initStringInfo(&cmd);
    /*
     * In case of ALTER SUBSCRIPTION ... REFRESH, subrel_local_oids contains
     * the list of relation oids that are already present on the subscriber.
-    * This check should be skipped for these tables.
+    * This check should be skipped for these tables when checking for the
+    * table sync scenario. However, when handling the retain_dead_tuples
+    * scenario, ensure all tables are checked, as some existing tables may
+    * now include changes from other origins due to newly created
+    * subscriptions on the publisher.
     */
-   for (i = 0; i < subrel_count; i++)
+   if (check_table_sync)
    {
-       Oid         relid = subrel_local_oids[i];
-       char       *schemaname = get_namespace_name(get_rel_namespace(relid));
-       char       *tablename = get_rel_name(relid);
+       for (i = 0; i < subrel_count; i++)
+       {
+           Oid         relid = subrel_local_oids[i];
+           char       *schemaname = get_namespace_name(get_rel_namespace(relid));
+           char       *tablename = get_rel_name(relid);
 
-       appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
-                        schemaname, tablename);
+           appendStringInfo(&cmd, "AND NOT (N.nspname = '%s' AND C.relname = '%s')\n",
+                            schemaname, tablename);
+       }
    }
 
    res = walrcv_exec(wrconn, cmd.data, 1, tableRow);
     * XXX: For simplicity, we don't check whether the table has any data or
     * not. If the table doesn't have any data then we don't need to
     * distinguish between data having origin and data not having origin so we
-    * can avoid logging a warning in that case.
+    * can avoid logging a warning for the table sync scenario.
     */
    if (publist)
    {
        StringInfo  pubnames = makeStringInfo();
+       StringInfo  err_msg = makeStringInfo();
+       StringInfo  err_hint = makeStringInfo();
 
        /* Prepare the list of publication(s) for warning message. */
        GetPublicationsStr(publist, pubnames, false);
+
+       if (check_table_sync)
+       {
+           appendStringInfo(err_msg, _("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin"),
+                            subname);
+           appendStringInfoString(err_hint, _("Verify that initial data copied from the publisher tables did not come from other origins."));
+       }
+       else
+       {
+           appendStringInfo(err_msg, _("subscription \"%s\" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins"),
+                            subname);
+           appendStringInfoString(err_hint, _("Consider using origin = NONE or disabling retain_dead_tuples."));
+       }
+
        ereport(WARNING,
                errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
-               errmsg("subscription \"%s\" requested copy_data with origin = NONE but might copy data that had a different origin",
-                      subname),
-               errdetail_plural("The subscription being created subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
-                                "The subscription being created subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
+               errmsg_internal("%s", err_msg->data),
+               errdetail_plural("The subscription subscribes to a publication (%s) that contains tables that are written to by other subscriptions.",
+                                "The subscription subscribes to publications (%s) that contain tables that are written to by other subscriptions.",
                                 list_length(publist), pubnames->data),
-               errhint("Verify that initial data copied from the publisher tables did not come from other origins."));
+               errhint_internal("%s", err_hint->data));
    }
 
    ExecDropSingleTupleTableSlot(slot);
    walrcv_clear_result(res);
 }
 
+/*
+ * Determine whether retain_dead_tuples can be enabled based on the
+ * publisher's status.
+ *
+ * This option is disallowed if the publisher is running a version earlier
+ * than PostgreSQL 19, or if the publisher is in recovery (i.e., it is a
+ * standby server).
+ *
+ * See comments atop worker.c for a detailed explanation.
+ */
+static void
+check_pub_dead_tuple_retention(WalReceiverConn *wrconn)
+{
+   WalRcvExecResult *res;
+   Oid         RecoveryRow[1] = {BOOLOID};
+   TupleTableSlot *slot;
+   bool        isnull;
+   bool        remote_in_recovery;
+
+   if (walrcv_server_version(wrconn) < 19000)
+       ereport(ERROR,
+               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+               errmsg("cannot enable retain_dead_tuples if the publisher is running a version earlier than PostgreSQL 19"));
+
+   res = walrcv_exec(wrconn, "SELECT pg_is_in_recovery()", 1, RecoveryRow);
+
+   if (res->status != WALRCV_OK_TUPLES)
+       ereport(ERROR,
+               (errcode(ERRCODE_CONNECTION_FAILURE),
+                errmsg("could not obtain recovery progress from the publisher: %s",
+                       res->err)));
+
+   slot = MakeSingleTupleTableSlot(res->tupledesc, &TTSOpsMinimalTuple);
+   if (!tuplestore_gettupleslot(res->tuplestore, true, false, slot))
+       elog(ERROR, "failed to fetch tuple for the recovery progress");
+
+   remote_in_recovery = DatumGetBool(slot_getattr(slot, 1, &isnull));
+
+   if (remote_in_recovery)
+       ereport(ERROR,
+               errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+               errmsg("cannot enable retain_dead_tuples if the publisher is in recovery."));
+
+   ExecDropSingleTupleTableSlot(slot);
+
+   walrcv_clear_result(res);
+}
+
+/*
+ * Check if the subscriber's configuration is adequate to enable the
+ * retain_dead_tuples option.
+ *
+ * Issue an ERROR if the wal_level does not support the use of replication
+ * slots when check_guc is set to true.
+ *
+ * Issue a WARNING if track_commit_timestamp is not enabled when check_guc is
+ * set to true. This is only to highlight the importance of enabling
+ * track_commit_timestamp instead of catching all the misconfigurations, as
+ * this setting can be adjusted after subscription creation. Without it, the
+ * apply worker will simply skip conflict detection.
+ *
+ * Issue a WARNING or NOTICE if the subscription is disabled. Do not raise an
+ * ERROR since users can only modify retain_dead_tuples for disabled
+ * subscriptions. As long as the subscription is enabled promptly, this will
+ * not pose issues.
+ */
+void
+CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
+                          int elevel_for_sub_disabled)
+{
+   Assert(elevel_for_sub_disabled == NOTICE ||
+          elevel_for_sub_disabled == WARNING);
+
+   if (check_guc && wal_level < WAL_LEVEL_REPLICA)
+       ereport(ERROR,
+               errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
+               errmsg("\"wal_level\" is insufficient to create the replication slot required by retain_dead_tuples"),
+               errhint("\"wal_level\" must be set to \"replica\" or \"logical\" at server start."));
+
+   if (check_guc && !track_commit_timestamp)
+       ereport(WARNING,
+               errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+               errmsg("commit timestamp and origin data required for detecting conflicts won't be retained"),
+               errhint("Consider setting \"%s\" to true.",
+                       "track_commit_timestamp"));
+
+   if (sub_disabled)
+       ereport(elevel_for_sub_disabled,
+               errcode(ERRCODE_INVALID_PARAMETER_VALUE),
+               errmsg("deleted rows to detect conflicts would not be removed until the subscription is enabled"),
+               (elevel_for_sub_disabled > NOTICE)
+               ? errhint("Consider setting %s to false.",
+                         "retain_dead_tuples") : 0);
+}
+
 /*
  * Get the list of tables which belong to specified publications on the
  * publisher connection.
 
                                        MySubscription->name,
                                        MyLogicalRepWorker->userid,
                                        InvalidOid,
-                                       dsm_segment_handle(winfo->dsm_seg));
+                                       dsm_segment_handle(winfo->dsm_seg),
+                                       false);
 
    if (launched)
    {
 
 #include "postmaster/interrupt.h"
 #include "replication/logicallauncher.h"
 #include "replication/origin.h"
+#include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "storage/ipc.h"
 static bool on_commit_launcher_wakeup = false;
 
 
-static void ApplyLauncherWakeup(void);
 static void logicalrep_launcher_onexit(int code, Datum arg);
 static void logicalrep_worker_onexit(int code, Datum arg);
 static void logicalrep_worker_detach(void);
 static void logicalrep_launcher_attach_dshmem(void);
 static void ApplyLauncherSetWorkerStartTime(Oid subid, TimestampTz start_time);
 static TimestampTz ApplyLauncherGetWorkerStartTime(Oid subid);
+static void compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin);
+static bool acquire_conflict_slot_if_exists(void);
+static void advance_conflict_slot_xmin(TransactionId new_xmin);
 
 
 /*
        sub->owner = subform->subowner;
        sub->enabled = subform->subenabled;
        sub->name = pstrdup(NameStr(subform->subname));
+       sub->retaindeadtuples = subform->subretaindeadtuples;
        /* We don't fill fields we are not interested in. */
 
        res = lappend(res, sub);
 bool
 logicalrep_worker_launch(LogicalRepWorkerType wtype,
                         Oid dbid, Oid subid, const char *subname, Oid userid,
-                        Oid relid, dsm_handle subworker_dsm)
+                        Oid relid, dsm_handle subworker_dsm,
+                        bool retain_dead_tuples)
 {
    BackgroundWorker bgw;
    BackgroundWorkerHandle *bgw_handle;
     * - must be valid worker type
     * - tablesync workers are only ones to have relid
     * - parallel apply worker is the only kind of subworker
+    * - The replication slot used in conflict detection is created when
+    *   retain_dead_tuples is enabled
     */
    Assert(wtype != WORKERTYPE_UNKNOWN);
    Assert(is_tablesync_worker == OidIsValid(relid));
    Assert(is_parallel_apply_worker == (subworker_dsm != DSM_HANDLE_INVALID));
+   Assert(!retain_dead_tuples || MyReplicationSlot);
 
    ereport(DEBUG1,
            (errmsg_internal("starting logical replication worker for subscription \"%s\"",
    worker->stream_fileset = NULL;
    worker->leader_pid = is_parallel_apply_worker ? MyProcPid : InvalidPid;
    worker->parallel_apply = is_parallel_apply_worker;
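+
+   /*
+    * Initialize the worker's non-removable XID from the conflict slot's
+    * xmin, so that the dead tuples required for conflict detection are
+    * protected from the moment the worker starts.
+    */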
+   worker->oldest_nonremovable_xid = retain_dead_tuples
+       ? MyReplicationSlot->data.xmin
+       : InvalidTransactionId;
    worker->last_lsn = InvalidXLogRecPtr;
    TIMESTAMP_NOBEGIN(worker->last_send_time);
    TIMESTAMP_NOBEGIN(worker->last_recv_time);
        on_commit_launcher_wakeup = true;
 }
 
-static void
+/*
+ * Wake up the launcher immediately.
+ */
+void
 ApplyLauncherWakeup(void)
 {
    if (LogicalRepCtx->launcher_pid != 0)
     */
    BackgroundWorkerInitializeConnection(NULL, NULL, 0);
 
+   /*
+    * Acquire the conflict detection slot at startup to ensure it can be
+    * dropped if no longer needed after a restart.
+    */
+   acquire_conflict_slot_if_exists();
+
    /* Enter main loop */
    for (;;)
    {
        MemoryContext subctx;
        MemoryContext oldctx;
        long        wait_time = DEFAULT_NAPTIME_PER_CYCLE;
+       bool        can_advance_xmin = true;
+       bool        retain_dead_tuples = false;
+       TransactionId xmin = InvalidTransactionId;
 
        CHECK_FOR_INTERRUPTS();
 
                                       ALLOCSET_DEFAULT_SIZES);
        oldctx = MemoryContextSwitchTo(subctx);
 
-       /* Start any missing workers for enabled subscriptions. */
+       /*
+        * Start any missing workers for enabled subscriptions.
+        *
+        * Also, during the iteration through all subscriptions, we compute
+        * the minimum XID required to protect deleted tuples for conflict
+        * detection if any of the subscriptions enables the
+        * retain_dead_tuples option.
+        */
        sublist = get_subscription_list();
        foreach(lc, sublist)
        {
            TimestampTz now;
            long        elapsed;
 
+           if (sub->retaindeadtuples)
+           {
+               retain_dead_tuples = true;
+
+               /*
+                * Can't advance xmin of the slot unless all the subscriptions
+                * with retain_dead_tuples are enabled. This is required to
+                * ensure that we don't advance the xmin of
+                * CONFLICT_DETECTION_SLOT if one of the subscriptions is not
+                * enabled. Otherwise, we won't be able to detect conflicts
+                * reliably for such a subscription even though it has set the
+                * retain_dead_tuples option.
+                */
+               can_advance_xmin &= sub->enabled;
+
+               /*
+                * Create a replication slot to retain information necessary
+                * for conflict detection such as dead tuples, commit
+                * timestamps, and origins.
+                *
+                * The slot is created before starting the apply worker to
+                * prevent it from unnecessarily maintaining its
+                * oldest_nonremovable_xid.
+                *
+                * The slot is created even for a disabled subscription to
+                * ensure that conflict-related information is available when
+                * applying remote changes that occurred before the
+                * subscription was enabled.
+                */
+               CreateConflictDetectionSlot();
+           }
+
            if (!sub->enabled)
                continue;
 
            LWLockRelease(LogicalRepWorkerLock);
 
            if (w != NULL)
-               continue;       /* worker is running already */
+           {
+               /*
+                * Compute the minimum xmin required to protect the dead
+                * tuples needed for conflict detection among all running
+                * apply workers that enable retain_dead_tuples.
+                */
+               if (sub->retaindeadtuples && can_advance_xmin)
+                   compute_min_nonremovable_xid(w, &xmin);
+
+               /* worker is running already */
+               continue;
+           }
+
+           /*
+            * Can't advance the xmin of the slot unless all the workers
+            * corresponding to subscriptions with retain_dead_tuples are
+            * running, so disable further computation of the minimum
+            * nonremovable xid.
+            */
+           if (sub->retaindeadtuples)
+               can_advance_xmin = false;
 
            /*
             * If the worker is eligible to start now, launch it.  Otherwise,
                if (!logicalrep_worker_launch(WORKERTYPE_APPLY,
                                              sub->dbid, sub->oid, sub->name,
                                              sub->owner, InvalidOid,
-                                             DSM_HANDLE_INVALID))
+                                             DSM_HANDLE_INVALID,
+                                             sub->retaindeadtuples))
                {
                    /*
                     * We get here either if we failed to launch a worker
            }
        }
 
+       /*
+        * Drop the CONFLICT_DETECTION_SLOT if there is no subscription that
+        * requires us to retain dead tuples. Otherwise, if required, advance
+        * the slot's xmin to protect the dead tuples needed for conflict
+        * detection.
+        */
+       if (MyReplicationSlot)
+       {
+           if (!retain_dead_tuples)
+               ReplicationSlotDropAcquired();
+           else if (can_advance_xmin)
+               advance_conflict_slot_xmin(xmin);
+       }
+
        /* Switch back to original memory context. */
        MemoryContextSwitchTo(oldctx);
        /* Clean the temporary memory. */
    /* Not reachable */
 }
 
+/*
+ * Determine the minimum non-removable transaction ID across all apply workers
+ * for subscriptions that have retain_dead_tuples enabled. Store the result
+ * in *xmin.
+ */
+static void
+compute_min_nonremovable_xid(LogicalRepWorker *worker, TransactionId *xmin)
+{
+   TransactionId nonremovable_xid;
+
+   Assert(worker != NULL);
+
+   /*
+    * The replication slot for conflict detection must be created before the
+    * worker starts.
+    */
+   Assert(MyReplicationSlot);
+
+   SpinLockAcquire(&worker->relmutex);
+   nonremovable_xid = worker->oldest_nonremovable_xid;
+   SpinLockRelease(&worker->relmutex);
+
+   Assert(TransactionIdIsValid(nonremovable_xid));
+
+   if (!TransactionIdIsValid(*xmin) ||
+       TransactionIdPrecedes(nonremovable_xid, *xmin))
+       *xmin = nonremovable_xid;
+}
+
+/*
+ * Acquire the replication slot used to retain information for conflict
+ * detection, if it exists.
+ *
+ * Return true if successfully acquired, otherwise return false.
+ */
+static bool
+acquire_conflict_slot_if_exists(void)
+{
+   if (!SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true))
+       return false;
+
+   ReplicationSlotAcquire(CONFLICT_DETECTION_SLOT, true, false);
+   return true;
+}
+
+/*
+ * Advance the xmin of the replication slot used to retain information
+ * required for conflict detection.
+ */
+static void
+advance_conflict_slot_xmin(TransactionId new_xmin)
+{
+   Assert(MyReplicationSlot);
+   Assert(TransactionIdIsValid(new_xmin));
+   Assert(TransactionIdPrecedesOrEquals(MyReplicationSlot->data.xmin, new_xmin));
+
+   /* Return if the xmin value of the slot cannot be advanced */
+   if (TransactionIdEquals(MyReplicationSlot->data.xmin, new_xmin))
+       return;
+
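+   /* Update the in-memory effective xmin and the persisted value together */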
+   SpinLockAcquire(&MyReplicationSlot->mutex);
+   MyReplicationSlot->effective_xmin = new_xmin;
+   MyReplicationSlot->data.xmin = new_xmin;
+   SpinLockRelease(&MyReplicationSlot->mutex);
+
+   elog(DEBUG1, "updated xmin: %u", MyReplicationSlot->data.xmin);
+
+   ReplicationSlotMarkDirty();
+   ReplicationSlotsComputeRequiredXmin(false);
+
+   /*
+    * Like PhysicalConfirmReceivedLocation(), do not save slot information
+    * each time. This is acceptable because all concurrent transactions on
+    * the publisher that require the data preceding the slot's xmin should
+    * have already been applied and flushed on the subscriber before the xmin
+    * is advanced. So, even if the slot's xmin regresses after a restart, it
+    * will be advanced again in the next cycle. Therefore, no data required
+    * for conflict detection will be prematurely removed.
+    */
+   return;
+}
+
+/*
+ * Create and acquire the replication slot used to retain information for
+ * conflict detection, if it has not been created yet.
+ */
+void
+CreateConflictDetectionSlot(void)
+{
+   TransactionId xmin_horizon;
+
+   /* Exit early, if the replication slot is already created and acquired */
+   if (MyReplicationSlot)
+       return;
+
+   ereport(LOG,
+           errmsg("creating replication conflict detection slot"));
+
+   ReplicationSlotCreate(CONFLICT_DETECTION_SLOT, false, RS_PERSISTENT, false,
+                         false, false);
+
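+   /*
+    * Compute and install the slot's initial xmin while holding
+    * ProcArrayLock, so that the global xmin horizon cannot advance past it
+    * concurrently, in the same way as when creating a logical decoding
+    * slot.
+    */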
+   LWLockAcquire(ProcArrayLock, LW_EXCLUSIVE);
+
+   xmin_horizon = GetOldestSafeDecodingTransactionId(false);
+
+   SpinLockAcquire(&MyReplicationSlot->mutex);
+   MyReplicationSlot->effective_xmin = xmin_horizon;
+   MyReplicationSlot->data.xmin = xmin_horizon;
+   SpinLockRelease(&MyReplicationSlot->mutex);
+
+   ReplicationSlotsComputeRequiredXmin(true);
+
+   LWLockRelease(ProcArrayLock);
+
+   /* Write this slot to disk */
+   ReplicationSlotMarkDirty();
+   ReplicationSlotSave();
+}
+
 /*
  * Is current process the logical replication launcher?
  */
 
            continue;
 
        /* if it cannot be a slot, skip the directory */
-       if (!ReplicationSlotValidateName(logical_de->d_name, DEBUG2))
+       if (!ReplicationSlotValidateName(logical_de->d_name, true, DEBUG2))
            continue;
 
        /*
 
                                                        MySubscription->name,
                                                        MyLogicalRepWorker->userid,
                                                        rstate->relid,
-                                                       DSM_HANDLE_INVALID);
+                                                       DSM_HANDLE_INVALID,
+                                                       false);
                    }
                }
            }
 
  * failover = true when creating the subscription. Enabling failover allows us
  * to smoothly transition to the promoted standby, ensuring that we can
  * subscribe to the new primary without losing any data.
+ *
+ * RETAIN DEAD TUPLES
+ * ----------------------
+ * Each apply worker that has enabled the retain_dead_tuples option maintains
+ * a non-removable transaction ID (oldest_nonremovable_xid) in shared memory
+ * to prevent dead rows from being removed prematurely when the apply worker
+ * still needs them to detect conflicts reliably. This also helps to retain
+ * the required commit_ts module information, which in turn helps to detect
+ * update_origin_differs and delete_origin_differs conflicts reliably, as
+ * otherwise, vacuum freeze could remove the required information.
+ *
+ * The logical replication launcher manages an internal replication slot named
+ * "pg_conflict_detection". It asynchronously aggregates the non-removable
+ * transaction ID from all apply workers to determine the appropriate xmin for
+ * the slot, thereby retaining necessary tuples.
+ *
+ * The non-removable transaction ID in the apply worker is advanced to the
+ * oldest running transaction ID once all concurrent transactions on the
+ * publisher have been applied and flushed locally. The process involves:
+ *
+ * - RDT_GET_CANDIDATE_XID:
+ *   Call GetOldestActiveTransactionId() to take oldestRunningXid as the
+ *   candidate xid.
+ *
+ * - RDT_REQUEST_PUBLISHER_STATUS:
+ *   Send a message to the walsender requesting the publisher status, which
+ *   includes the latest WAL write position and information about transactions
+ *   that are in the commit phase.
+ *
+ * - RDT_WAIT_FOR_PUBLISHER_STATUS:
+ *   Wait for the status from the walsender. After receiving the first status,
+ *   do not proceed if there are concurrent remote transactions that are still
+ *   in the commit phase. These transactions might have been assigned an
+ *   earlier commit timestamp but have not yet written the commit WAL record.
+ *   Continue to request the publisher status (RDT_REQUEST_PUBLISHER_STATUS)
+ *   until all these transactions have completed.
+ *
+ * - RDT_WAIT_FOR_LOCAL_FLUSH:
+ *   Advance the non-removable transaction ID if the current flush location has
+ *   reached or surpassed the last received WAL position.
+ *
+ * The overall state progression is: GET_CANDIDATE_XID ->
+ * REQUEST_PUBLISHER_STATUS -> WAIT_FOR_PUBLISHER_STATUS -> (loop to
+ * REQUEST_PUBLISHER_STATUS till concurrent remote transactions end) ->
+ * WAIT_FOR_LOCAL_FLUSH -> loop back to GET_CANDIDATE_XID.
+ *
+ * Retaining the dead tuples for this period is sufficient for ensuring
+ * eventual consistency using the last-update-wins strategy, as dead tuples are
+ * useful for detecting conflicts only during the application of concurrent
+ * transactions from remote nodes. After applying and flushing all remote
+ * transactions that occurred concurrently with the tuple DELETE, any
+ * subsequent UPDATE from a remote node should have a later timestamp. In such
+ * cases, it is acceptable to detect an update_missing scenario and convert the
+ * UPDATE to an INSERT when applying it. But, detecting concurrent remote
+ * transactions with earlier timestamps than the DELETE is necessary, as the
+ * UPDATEs in remote transactions should be ignored if their timestamp is
+ * earlier than that of the dead tuples.
+ *
+ * Note that advancing the non-removable transaction ID is not supported if the
+ * publisher is also a physical standby. This is because the logical walsender
+ * on the standby can only get the WAL replay position, but there may be more
+ * WAL still being replicated from the primary, and that WAL could contain
+ * earlier commit timestamps.
+ *
+ * Similarly, when the publisher has subscribed to another publisher,
+ * information necessary for conflict detection cannot be retained for
+ * changes from origins other than the publisher. This is because the
+ * publisher lacks information on the concurrent transactions of the other
+ * publishers to which it subscribes. As the information on concurrent
+ * transactions is unavailable beyond the subscriber's immediate publishers,
+ * the non-removable transaction ID might be advanced prematurely, before
+ * changes from other origins have been fully applied.
+ *
+ * XXX Retaining information for changes from other origins might be possible
+ * by requesting the subscription on that origin to enable retain_dead_tuples
+ * and fetching the conflict detection slot.xmin along with the publisher's
+ * status. In the RDT_WAIT_FOR_PUBLISHER_STATUS phase, the apply worker could
+ * wait for the remote slot's xmin to reach the oldest active transaction ID,
+ * ensuring that all transactions from other origins have been applied on the
+ * publisher, thereby getting the latest WAL position that includes all
+ * concurrent changes. However, this approach may impact performance, so it
+ * might not be worth the effort.
+ *
+ * XXX It seems feasible to get the latest commit's WAL location from the
+ * publisher and wait till that is applied. However, we can't do that
+ * because commit timestamps can regress as a commit with a later LSN is not
+ * guaranteed to have a later timestamp than those with earlier LSNs. Having
+ * said that, even if that were possible, it won't improve performance much,
+ * as the apply always lags and moves slowly compared with the transactions
+ * on the publisher.
  *-------------------------------------------------------------------------
  */
 
 #include <sys/stat.h>
 #include <unistd.h>
 
+#include "access/commit_ts.h"
 #include "access/table.h"
 #include "access/tableam.h"
 #include "access/twophase.h"
 #include "catalog/pg_inherits.h"
 #include "catalog/pg_subscription.h"
 #include "catalog/pg_subscription_rel.h"
+#include "commands/subscriptioncmds.h"
 #include "commands/tablecmds.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 #include "replication/logicalrelation.h"
 #include "replication/logicalworker.h"
 #include "replication/origin.h"
+#include "replication/slot.h"
 #include "replication/walreceiver.h"
 #include "replication/worker_internal.h"
 #include "rewrite/rewriteHandler.h"
 #include "storage/buffile.h"
 #include "storage/ipc.h"
 #include "storage/lmgr.h"
+#include "storage/procarray.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
 #include "utils/dynahash.h"
    TRANS_PARALLEL_APPLY,
 } TransApplyAction;
 
+/*
+ * The phases involved in advancing the non-removable transaction ID.
+ *
+ * See comments atop worker.c for details of the transition between these
+ * phases.
+ */
+typedef enum
+{
+   RDT_GET_CANDIDATE_XID,
+   RDT_REQUEST_PUBLISHER_STATUS,
+   RDT_WAIT_FOR_PUBLISHER_STATUS,
+   RDT_WAIT_FOR_LOCAL_FLUSH
+} RetainDeadTuplesPhase;
+
+/*
+ * Critical information for managing phase transitions within the
+ * RetainDeadTuplesPhase.
+ */
+typedef struct RetainDeadTuplesData
+{
+   RetainDeadTuplesPhase phase;    /* current phase */
+   XLogRecPtr  remote_lsn;     /* WAL write position on the publisher */
+
+   /*
+    * Oldest transaction ID that was in the commit phase on the publisher.
+    * Use FullTransactionId to prevent issues with transaction ID wraparound,
+    * where a new remote_oldestxid could falsely appear to originate from the
+    * past and block advancement.
+    */
+   FullTransactionId remote_oldestxid;
+
+   /*
+    * Next transaction ID to be assigned on the publisher. Use
+    * FullTransactionId for consistency and to allow straightforward
+    * comparisons with remote_oldestxid.
+    */
+   FullTransactionId remote_nextxid;
+
+   TimestampTz reply_time;     /* when the publisher responds with status */
+
+   /*
+    * Publisher transaction ID whose completion must be awaited before
+    * entering the final phase (RDT_WAIT_FOR_LOCAL_FLUSH). Use
+    * FullTransactionId for the same reason as remote_nextxid.
+    */
+   FullTransactionId remote_wait_for;
+
+   TransactionId candidate_xid;    /* candidate for the non-removable
+                                    * transaction ID */
+   TimestampTz flushpos_update_time;   /* when the remote flush position was
+                                        * updated in final phase
+                                        * (RDT_WAIT_FOR_LOCAL_FLUSH) */
+
+   /*
+    * The following fields are used to determine the timing for the next
+    * round of transaction ID advancement.
+    */
+   TimestampTz last_recv_time; /* when the last message was received */
+   TimestampTz candidate_xid_time; /* when the candidate_xid is decided */
+   int         xid_advance_interval;   /* how much time (ms) to wait before
+                                        * attempting to advance the
+                                        * non-removable transaction ID */
+} RetainDeadTuplesData;
+
+/*
+ * The minimum (100ms) and maximum (3 minutes) intervals for advancing
+ * non-removable transaction IDs. The maximum interval is a bit arbitrary but
+ * is sufficient to not cause any undue network traffic.
+ */
+#define MIN_XID_ADVANCE_INTERVAL 100
+#define MAX_XID_ADVANCE_INTERVAL 180000
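+
+/*
+ * For example, with no activity on the node, the interval doubles from 100ms
+ * to 200ms, 400ms, and so on, up to the cap applied in
+ * adjust_xid_advance_interval().
+ */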
+
 /* errcontext tracker */
 static ApplyErrorCallbackArg apply_error_callback_arg =
 {
 /* BufFile handle of the current streaming file */
 static BufFile *stream_fd = NULL;
 
+/*
+ * The remote WAL position that has been applied and flushed locally. We record
+ * and use this information both while sending feedback to the server and
+ * while advancing oldest_nonremovable_xid.
+ */
+static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
+
 typedef struct SubXactInfo
 {
    TransactionId xid;          /* XID of the subxact */
 
 static void send_feedback(XLogRecPtr recvpos, bool force, bool requestReply);
 
+static void maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
+                                          bool status_received);
+static bool can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data);
+static void process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
+                                        bool status_received);
+static void get_candidate_xid(RetainDeadTuplesData *rdt_data);
+static void request_publisher_status(RetainDeadTuplesData *rdt_data);
+static void wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
+                                     bool status_received);
+static void wait_for_local_flush(RetainDeadTuplesData *rdt_data);
+static void adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data,
+                                       bool new_xid_found);
+
 static void apply_handle_commit_internal(LogicalRepCommitData *commit_data);
 static void apply_handle_insert_internal(ApplyExecutionData *edata,
                                         ResultRelInfo *relinfo,
    bool        ping_sent = false;
    TimeLineID  tli;
    ErrorContextCallback errcallback;
+   RetainDeadTuplesData rdt_data = {0};
 
    /*
     * Init the ApplyMessageContext which we clean up after each replication
                    last_recv_timestamp = GetCurrentTimestamp();
                    ping_sent = false;
 
+                   rdt_data.last_recv_time = last_recv_timestamp;
+
                    /* Ensure we are reading the data into our memory context. */
                    MemoryContextSwitchTo(ApplyMessageContext);
 
                        UpdateWorkerStats(last_received, send_time, false);
 
                        apply_dispatch(&s);
+
+                       maybe_advance_nonremovable_xid(&rdt_data, false);
                    }
                    else if (c == 'k')
                    {
                            last_received = end_lsn;
 
                        send_feedback(last_received, reply_requested, false);
+
+                       maybe_advance_nonremovable_xid(&rdt_data, false);
+
                        UpdateWorkerStats(last_received, timestamp, true);
                    }
+                   else if (c == 's')  /* Primary status update */
+                   {
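+                       /*
+                        * The field order mirrors what
+                        * ProcessStandbyPSRequestMessage() sends: the WAL
+                        * write position, the oldest transaction ID in the
+                        * commit phase, the next transaction ID, and the
+                        * publisher's reply timestamp.
+                        */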
+                       rdt_data.remote_lsn = pq_getmsgint64(&s);
+                       rdt_data.remote_oldestxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
+                       rdt_data.remote_nextxid = FullTransactionIdFromU64((uint64) pq_getmsgint64(&s));
+                       rdt_data.reply_time = pq_getmsgint64(&s);
+
+                       /*
+                        * This should never happen, see
+                        * ProcessStandbyPSRequestMessage. But if it happens
+                        * due to a bug, we don't want to proceed as it can
+                        * incorrectly advance oldest_nonremovable_xid.
+                        */
+                       if (XLogRecPtrIsInvalid(rdt_data.remote_lsn))
+                           elog(ERROR, "cannot get the latest WAL position from the publisher");
+
+                       maybe_advance_nonremovable_xid(&rdt_data, true);
+
+                       UpdateWorkerStats(last_received, rdt_data.reply_time, false);
+                   }
                    /* other message types are purposefully ignored */
 
                    MemoryContextReset(ApplyMessageContext);
        /* confirm all writes so far */
        send_feedback(last_received, false, false);
 
+       /* Reset the timestamp if no message was received */
+       rdt_data.last_recv_time = 0;
+
+       maybe_advance_nonremovable_xid(&rdt_data, false);
+
        if (!in_remote_transaction && !in_streamed_transaction)
        {
            /*
        else
            wait_time = NAPTIME_PER_CYCLE;
 
+       /*
+        * Ensure to wake up when it's possible to advance the non-removable
+        * transaction ID.
+        */
+       if (rdt_data.phase == RDT_GET_CANDIDATE_XID &&
+           rdt_data.xid_advance_interval)
+           wait_time = Min(wait_time, rdt_data.xid_advance_interval);
+
        rc = WaitLatchOrSocket(MyLatch,
                               WL_SOCKET_READABLE | WL_LATCH_SET |
                               WL_TIMEOUT | WL_EXIT_ON_PM_DEATH,
 
            send_feedback(last_received, requestReply, requestReply);
 
+           maybe_advance_nonremovable_xid(&rdt_data, false);
+
            /*
             * Force reporting to ensure long idle periods don't lead to
             * arbitrarily delayed stats. Stats can only be reported outside
 
    static XLogRecPtr last_recvpos = InvalidXLogRecPtr;
    static XLogRecPtr last_writepos = InvalidXLogRecPtr;
-   static XLogRecPtr last_flushpos = InvalidXLogRecPtr;
 
    XLogRecPtr  writepos;
    XLogRecPtr  flushpos;
        last_flushpos = flushpos;
 }
 
+/*
+ * Attempt to advance the non-removable transaction ID.
+ *
+ * See comments atop worker.c for details.
+ */
+static void
+maybe_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data,
+                              bool status_received)
+{
+   if (!can_advance_nonremovable_xid(rdt_data))
+       return;
+
+   process_rdt_phase_transition(rdt_data, status_received);
+}
+
+/*
+ * Preliminary check to determine if advancing the non-removable transaction ID
+ * is allowed.
+ */
+static bool
+can_advance_nonremovable_xid(RetainDeadTuplesData *rdt_data)
+{
+   /*
+    * It is sufficient for the leader apply worker alone to manage the
+    * non-removable transaction ID for a subscription; this detects
+    * conflicts reliably even for changes applied by table sync or parallel
+    * apply workers.
+    */
+   if (!am_leader_apply_worker())
+       return false;
+
+   /* No need to advance if retaining dead tuples is not required */
+   if (!MySubscription->retaindeadtuples)
+       return false;
+
+   return true;
+}
+
+/*
+ * Process phase transitions during the non-removable transaction ID
+ * advancement. See comments atop worker.c for details of the transition.
+ */
+static void
+process_rdt_phase_transition(RetainDeadTuplesData *rdt_data,
+                            bool status_received)
+{
+   switch (rdt_data->phase)
+   {
+       case RDT_GET_CANDIDATE_XID:
+           get_candidate_xid(rdt_data);
+           break;
+       case RDT_REQUEST_PUBLISHER_STATUS:
+           request_publisher_status(rdt_data);
+           break;
+       case RDT_WAIT_FOR_PUBLISHER_STATUS:
+           wait_for_publisher_status(rdt_data, status_received);
+           break;
+       case RDT_WAIT_FOR_LOCAL_FLUSH:
+           wait_for_local_flush(rdt_data);
+           break;
+   }
+}
+
+/*
+ * Workhorse for the RDT_GET_CANDIDATE_XID phase.
+ */
+static void
+get_candidate_xid(RetainDeadTuplesData *rdt_data)
+{
+   TransactionId oldest_running_xid;
+   TimestampTz now;
+
+   /*
+    * Use last_recv_time when applying changes in the loop to avoid
+    * unnecessary system time retrieval. If last_recv_time is not available,
+    * obtain the current timestamp.
+    */
+   now = rdt_data->last_recv_time ? rdt_data->last_recv_time : GetCurrentTimestamp();
+
+   /*
+    * Compute the candidate_xid and request the publisher status at most once
+    * per xid_advance_interval. Refer to adjust_xid_advance_interval() for
+    * details on how this value is dynamically adjusted. This is to avoid
+    * using CPU and network resources without making much progress.
+    */
+   if (!TimestampDifferenceExceeds(rdt_data->candidate_xid_time, now,
+                                   rdt_data->xid_advance_interval))
+       return;
+
+   /*
+    * Immediately update the timer, even if the function returns later
+    * without setting candidate_xid due to inactivity on the subscriber. This
+    * avoids frequent calls to GetOldestActiveTransactionId.
+    */
+   rdt_data->candidate_xid_time = now;
+
+   /*
+    * Consider transactions in the current database, as only dead tuples from
+    * this database are required for conflict detection.
+    */
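+   /*
+    * Unlike the publisher-side computation in
+    * ProcessStandbyPSRequestMessage(), inCommitOnly is passed as false here:
+    * all active transactions in the current database are considered.
+    */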
+   oldest_running_xid = GetOldestActiveTransactionId(false, false);
+
+   /*
+    * The oldest active transaction ID (oldest_running_xid) can't be older
+    * than any of its previously computed values.
+    */
+   Assert(TransactionIdPrecedesOrEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+                                        oldest_running_xid));
+
+   /* Return if the oldest_nonremovable_xid cannot be advanced */
+   if (TransactionIdEquals(MyLogicalRepWorker->oldest_nonremovable_xid,
+                           oldest_running_xid))
+   {
+       adjust_xid_advance_interval(rdt_data, false);
+       return;
+   }
+
+   adjust_xid_advance_interval(rdt_data, true);
+
+   rdt_data->candidate_xid = oldest_running_xid;
+   rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
+
+   /* process the next phase */
+   process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Workhorse for the RDT_REQUEST_PUBLISHER_STATUS phase.
+ */
+static void
+request_publisher_status(RetainDeadTuplesData *rdt_data)
+{
+   static StringInfo request_message = NULL;
+
+   if (!request_message)
+   {
+       MemoryContext oldctx = MemoryContextSwitchTo(ApplyContext);
+
+       request_message = makeStringInfo();
+       MemoryContextSwitchTo(oldctx);
+   }
+   else
+       resetStringInfo(request_message);
+
+   /*
+    * Send the current time to update the remote walsender's latest reply
+    * message received time.
+    */
+   pq_sendbyte(request_message, 'p');
+   pq_sendint64(request_message, GetCurrentTimestamp());
+
+   elog(DEBUG2, "sending publisher status request message");
+
+   /* Send a request for the publisher status */
+   walrcv_send(LogRepWorkerWalRcvConn,
+               request_message->data, request_message->len);
+
+   rdt_data->phase = RDT_WAIT_FOR_PUBLISHER_STATUS;
+
+   /*
+    * Skip calling maybe_advance_nonremovable_xid() since further transition
+    * is possible only once we receive the publisher status message.
+    */
+}
+
+/*
+ * Workhorse for the RDT_WAIT_FOR_PUBLISHER_STATUS phase.
+ */
+static void
+wait_for_publisher_status(RetainDeadTuplesData *rdt_data,
+                         bool status_received)
+{
+   /*
+    * Return if we have requested but not yet received the publisher status.
+    */
+   if (!status_received)
+       return;
+
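+   /*
+    * On the first status response of this cycle, latch onto the publisher's
+    * next transaction ID: every transaction that was in the commit phase at
+    * that time must precede it, so it bounds the set of transactions we
+    * have to wait for.
+    */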
+   if (!FullTransactionIdIsValid(rdt_data->remote_wait_for))
+       rdt_data->remote_wait_for = rdt_data->remote_nextxid;
+
+   /*
+    * Check if all remote concurrent transactions that were active at the
+    * first status request have now completed. If completed, proceed to the
+    * next phase; otherwise, continue checking the publisher status until
+    * these transactions finish.
+    *
+    * It's possible that transactions in the commit phase during the last
+    * cycle have now finished committing, but remote_oldestxid remains older
+    * than remote_wait_for. This can happen if some old transaction entered
+    * the commit phase when we requested status in this cycle. We do not
+    * handle this case explicitly as it's rare and the benefit doesn't
+    * justify the required complexity. Tracking would require either caching
+    * all xids at the publisher or sending them to subscribers. The condition
+    * will resolve naturally once the remaining transactions are finished.
+    *
+    * Directly advancing the non-removable transaction ID is possible if
+    * there has been no activity on the publisher since the last advancement
+    * cycle. However, it requires maintaining two fields, last_remote_nextxid
+    * and last_remote_lsn, within the structure for comparison with the
+    * current cycle's values. Considering the minimal cost of continuing in
+    * RDT_WAIT_FOR_LOCAL_FLUSH without awaiting changes, we opted not to
+    * advance the transaction ID here.
+    */
+   if (FullTransactionIdPrecedesOrEquals(rdt_data->remote_wait_for,
+                                         rdt_data->remote_oldestxid))
+       rdt_data->phase = RDT_WAIT_FOR_LOCAL_FLUSH;
+   else
+       rdt_data->phase = RDT_REQUEST_PUBLISHER_STATUS;
+
+   /* process the next phase */
+   process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Workhorse for the RDT_WAIT_FOR_LOCAL_FLUSH phase.
+ */
+static void
+wait_for_local_flush(RetainDeadTuplesData *rdt_data)
+{
+   Assert(!XLogRecPtrIsInvalid(rdt_data->remote_lsn) &&
+          TransactionIdIsValid(rdt_data->candidate_xid));
+
+   /*
+    * We expect the publisher and subscriber clocks to be in sync using a
+    * time sync service like NTP. Otherwise, we will advance this worker's
+    * oldest_nonremovable_xid prematurely, leading to the removal of rows
+    * required to detect conflicts reliably. This check primarily addresses
+    * scenarios where the publisher's clock falls behind; if the publisher's
+    * clock is ahead, subsequent transactions will naturally bear later
+    * commit timestamps, conforming to the design outlined atop worker.c.
+    *
+    * XXX Consider waiting for the publisher's clock to catch up with the
+    * subscriber's before proceeding to the next phase.
+    */
+   if (TimestampDifferenceExceeds(rdt_data->reply_time,
+                                  rdt_data->candidate_xid_time, 0))
+       ereport(ERROR,
+               errmsg_internal("oldest_nonremovable_xid transaction ID could be advanced prematurely"),
+               errdetail_internal("The clock on the publisher is behind that of the subscriber."));
+
+   /*
+    * Do not attempt to advance the non-removable transaction ID when table
+    * sync is in progress. During this time, changes from a single
+    * transaction may be applied by multiple table sync workers corresponding
+    * to the target tables. So, it's necessary for all table sync workers to
+    * apply and flush the corresponding changes before advancing the
+    * transaction ID; otherwise, dead tuples that are still needed for
+    * conflict detection in table sync workers could be removed prematurely.
+    * However, confirming the apply and flush progress across all table sync
+    * workers is complex and not worth the effort, so we simply return if not
+    * all tables are in the READY state.
+    *
+    * It is safe to add new tables with initial states to the subscription
+    * after this check because any changes applied to these tables should
+    * have a WAL position greater than rdt_data->remote_lsn.
+    */
+   if (!AllTablesyncsReady())
+       return;
+
+   /*
+    * Update and check the remote flush position if we are applying changes
+    * in a loop. This is done at most once per WalWriterDelay to avoid
+    * performing costly operations in get_flush_position() too frequently
+    * during change application.
+    */
+   if (last_flushpos < rdt_data->remote_lsn && rdt_data->last_recv_time &&
+       TimestampDifferenceExceeds(rdt_data->flushpos_update_time,
+                                  rdt_data->last_recv_time, WalWriterDelay))
+   {
+       XLogRecPtr  writepos;
+       XLogRecPtr  flushpos;
+       bool        have_pending_txes;
+
+       /* Fetch the latest remote flush position */
+       get_flush_position(&writepos, &flushpos, &have_pending_txes);
+
+       if (flushpos > last_flushpos)
+           last_flushpos = flushpos;
+
+       rdt_data->flushpos_update_time = rdt_data->last_recv_time;
+   }
+
+   /* Return to wait for the changes to be applied */
+   if (last_flushpos < rdt_data->remote_lsn)
+       return;
+
+   /*
+    * Reaching here means the remote WAL position has been received, and all
+    * transactions up to that position on the publisher have been applied and
+    * flushed locally. So, we can advance the non-removable transaction ID.
+    */
+   SpinLockAcquire(&MyLogicalRepWorker->relmutex);
+   MyLogicalRepWorker->oldest_nonremovable_xid = rdt_data->candidate_xid;
+   SpinLockRelease(&MyLogicalRepWorker->relmutex);
+
+   elog(DEBUG2, "confirmed flush up to remote lsn %X/%X: new oldest_nonremovable_xid %u",
+        LSN_FORMAT_ARGS(rdt_data->remote_lsn),
+        rdt_data->candidate_xid);
+
+   /* Notify launcher to update the xmin of the conflict slot */
+   ApplyLauncherWakeup();
+
+   /*
+    * Reset all data fields except those used to determine the timing for the
+    * next round of transaction ID advancement. We can even use
+    * flushpos_update_time in the next round to decide whether to get the
+    * latest flush position.
+    */
+   rdt_data->phase = RDT_GET_CANDIDATE_XID;
+   rdt_data->remote_lsn = InvalidXLogRecPtr;
+   rdt_data->remote_oldestxid = InvalidFullTransactionId;
+   rdt_data->remote_nextxid = InvalidFullTransactionId;
+   rdt_data->reply_time = 0;
+   rdt_data->remote_wait_for = InvalidFullTransactionId;
+   rdt_data->candidate_xid = InvalidTransactionId;
+
+   /* process the next phase */
+   process_rdt_phase_transition(rdt_data, false);
+}
+
+/*
+ * Adjust the interval for advancing non-removable transaction IDs.
+ *
+ * We double the interval to try advancing the non-removable transaction IDs
+ * if there is no activity on the node. The maximum value of the interval is
+ * capped by wal_receiver_status_interval if it is not zero, otherwise to
+ * 3 minutes, which should be sufficient to avoid using CPU or network
+ * resources without much benefit.
+ *
+ * The interval is reset to a minimum value of 100ms once there is some
+ * activity on the node.
+ *
+ * XXX The use of wal_receiver_status_interval is a bit arbitrary so we can
+ * consider the other interval or a separate GUC if the need arises.
+ */
+static void
+adjust_xid_advance_interval(RetainDeadTuplesData *rdt_data, bool new_xid_found)
+{
+   if (!new_xid_found && rdt_data->xid_advance_interval)
+   {
+       int         max_interval = wal_receiver_status_interval
+           ? wal_receiver_status_interval * 1000
+           : MAX_XID_ADVANCE_INTERVAL;
+
+       /*
+        * No new transaction ID has been assigned since the last check, so
+        * double the interval, but not beyond the maximum allowable value.
+        */
+       rdt_data->xid_advance_interval = Min(rdt_data->xid_advance_interval * 2,
+                                            max_interval);
+   }
+   else
+   {
+       /*
+        * A new transaction ID was found or the interval is not yet
+        * initialized, so set the interval to the minimum value.
+        */
+       rdt_data->xid_advance_interval = MIN_XID_ADVANCE_INTERVAL;
+   }
+}
+
 /*
  * Exit routine for apply workers due to subscription parameter changes.
  */
        apply_worker_exit();
    }
 
+   /*
+    * Restart the worker if retain_dead_tuples was enabled during startup.
+    *
+    * At this point, the replication slot used for conflict detection might
+    * not exist yet, or could be dropped soon if the launcher perceives
+    * retain_dead_tuples as disabled. To avoid unnecessary tracking of
+    * oldest_nonremovable_xid when the slot is absent or at risk of being
+    * dropped, a restart is initiated.
+    *
+    * The oldest_nonremovable_xid should be initialized only when
+    * retain_dead_tuples is enabled before launching the worker. See
+    * logicalrep_worker_launch.
+    */
+   if (am_leader_apply_worker() &&
+       MySubscription->retaindeadtuples &&
+       !TransactionIdIsValid(MyLogicalRepWorker->oldest_nonremovable_xid))
+   {
+       ereport(LOG,
+               errmsg("logical replication worker for subscription \"%s\" will restart because the option %s was enabled during startup",
+                      MySubscription->name, "retain_dead_tuples"));
+
+       apply_worker_exit();
+   }
+
    /* Setup synchronous commit according to the user's wishes */
    SetConfigOption("synchronous_commit", MySubscription->synccommit,
                    PGC_BACKEND, PGC_S_OVERRIDE);
            errmsg("subscription \"%s\" has been disabled because of an error",
                   MySubscription->name));
 
+   /*
+    * Skip the track_commit_timestamp check when disabling the worker due to
+    * an error, as verifying commit timestamps is unnecessary in this
+    * context.
+    */
+   if (MySubscription->retaindeadtuples)
+       CheckSubDeadTupleRetention(false, true, WARNING);
+
    proc_exit(0);
 }
 
 
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "postmaster/interrupt.h"
+#include "replication/logicallauncher.h"
 #include "replication/slotsync.h"
 #include "replication/slot.h"
 #include "replication/walsender_private.h"
 static XLogRecPtr ss_oldest_flush_lsn = InvalidXLogRecPtr;
 
 static void ReplicationSlotShmemExit(int code, Datum arg);
+static bool IsSlotForConflictCheck(const char *name);
 static void ReplicationSlotDropPtr(ReplicationSlot *slot);
 
 /* internal persistency functions */
 /*
  * Check whether the passed slot name is valid and report errors at elevel.
  *
+ * An error will be reported for a reserved replication slot name if
+ * allow_reserved_name is set to false.
+ *
  * Slot names may consist out of [a-z0-9_]{1,NAMEDATALEN-1} which should allow
  * the name to be used as a directory name on every supported OS.
  *
  * Returns whether the directory name is valid or not if elevel < ERROR.
  */
 bool
-ReplicationSlotValidateName(const char *name, int elevel)
+ReplicationSlotValidateName(const char *name, bool allow_reserved_name,
+                           int elevel)
 {
    const char *cp;
 
            return false;
        }
    }
+
+   if (!allow_reserved_name && IsSlotForConflictCheck(name))
+   {
+       ereport(elevel,
+               errcode(ERRCODE_RESERVED_NAME),
+               errmsg("replication slot name \"%s\" is reserved",
+                      name),
+               errdetail("The name \"%s\" is reserved for the conflict detection slot.",
+                         CONFLICT_DETECTION_SLOT));
+
+       return false;
+   }
+
    return true;
 }
 
+/*
+ * Return true if the replication slot name is "pg_conflict_detection".
+ */
+static bool
+IsSlotForConflictCheck(const char *name)
+{
+   return (strcmp(name, CONFLICT_DETECTION_SLOT) == 0);
+}
+
 /*
  * Create a new replication slot and mark it as used by this backend.
  *
 
    Assert(MyReplicationSlot == NULL);
 
-   ReplicationSlotValidateName(name, ERROR);
+   /*
+    * The logical launcher or pg_upgrade may create or migrate an internal
+    * slot, so using a reserved name is allowed in these cases.
+    */
+   ReplicationSlotValidateName(name, IsBinaryUpgrade || IsLogicalLauncher(),
+                               ERROR);
 
    if (failover)
    {
                        name)));
    }
 
+   /*
+    * Do not allow users to acquire the reserved slot. This scenario may
+    * occur if the launcher that owns the slot has terminated unexpectedly
+    * due to an error, and a backend process attempts to reuse the slot.
+    */
+   if (!IsLogicalLauncher() && IsSlotForConflictCheck(name))
+       ereport(ERROR,
+               errcode(ERRCODE_UNDEFINED_OBJECT),
+               errmsg("cannot acquire replication slot \"%s\"", name),
+               errdetail("The slot is reserved for conflict detection and can only be acquired by logical replication launcher."));
+
    /*
     * This is the slot we want; check if it's active under some other
     * process.  In single user mode, we don't need this check.
 
 #include "storage/ipc.h"
 #include "storage/pmsignal.h"
 #include "storage/proc.h"
+#include "storage/procarray.h"
 #include "tcop/dest.h"
 #include "tcop/tcopprot.h"
 #include "utils/acl.h"
 static void ProcessStandbyMessage(void);
 static void ProcessStandbyReplyMessage(void);
 static void ProcessStandbyHSFeedbackMessage(void);
+static void ProcessStandbyPSRequestMessage(void);
 static void ProcessRepliesIfAny(void);
 static void ProcessPendingWrites(void);
 static void WalSndKeepalive(bool requestReply, XLogRecPtr writePtr);
            ProcessStandbyHSFeedbackMessage();
            break;
 
+       case 'p':
+           ProcessStandbyPSRequestMessage();
+           break;
+
        default:
            ereport(COMMERROR,
                    (errcode(ERRCODE_PROTOCOL_VIOLATION),
    }
 }
 
+/*
+ * Process the request for a primary status update message.
+ */
+static void
+ProcessStandbyPSRequestMessage(void)
+{
+   XLogRecPtr  lsn = InvalidXLogRecPtr;
+   TransactionId oldestXidInCommit;
+   FullTransactionId nextFullXid;
+   FullTransactionId fullOldestXidInCommit;
+   WalSnd     *walsnd = MyWalSnd;
+   TimestampTz replyTime;
+
+   /*
+    * This shouldn't happen because we don't support getting the primary
+    * status message from a standby.
+    */
+   if (RecoveryInProgress())
+       elog(ERROR, "the primary status is unavailable during recovery");
+
+   replyTime = pq_getmsgint64(&reply_message);
+
+   /*
+    * Update shared state for this WalSender process based on reply data from
+    * standby.
+    */
+   SpinLockAcquire(&walsnd->mutex);
+   walsnd->replyTime = replyTime;
+   SpinLockRelease(&walsnd->mutex);
+
+   /*
+    * Consider only transactions in the current database, as only these are
+    * replicated.
+    */
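+   /*
+    * Pass inCommitOnly as true: only transactions already in the commit
+    * critical section matter here, since they may have been assigned an
+    * earlier commit timestamp without yet having written their commit WAL
+    * record.
+    */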
+   oldestXidInCommit = GetOldestActiveTransactionId(true, false);
+   nextFullXid = ReadNextFullTransactionId();
+   fullOldestXidInCommit = FullTransactionIdFromAllowableAt(nextFullXid,
+                                                            oldestXidInCommit);
+   lsn = GetXLogWriteRecPtr();
+
+   elog(DEBUG2, "sending primary status");
+
+   /* construct the message... */
+   resetStringInfo(&output_message);
+   pq_sendbyte(&output_message, 's');
+   pq_sendint64(&output_message, lsn);
+   pq_sendint64(&output_message, (int64) U64FromFullTransactionId(fullOldestXidInCommit));
+   pq_sendint64(&output_message, (int64) U64FromFullTransactionId(nextFullXid));
+   pq_sendint64(&output_message, GetCurrentTimestamp());
+
+   /* ... and send it wrapped in CopyData */
+   pq_putmessage_noblock('d', output_message.data, output_message.len);
+}
+
 /*
  * Compute how long send/receive loops should sleep.
  *
 
  *
  * Similar to GetSnapshotData but returns just oldestActiveXid. We include
  * all PGPROCs with an assigned TransactionId, even VACUUM processes.
- * We look at all databases, though there is no need to include WALSender
- * since this has no effect on hot standby conflicts.
+ *
+ * If allDbs is true, we look at all databases, though there is no need to
+ * include WALSender since this has no effect on hot standby conflicts. If
+ * allDbs is false, skip processes attached to other databases.
  *
  * This is never executed during recovery so there is no need to look at
  * KnownAssignedXids.
  * We don't worry about updating other counters, we want to keep this as
  * simple as possible and leave GetSnapshotData() as the primary code for
  * that bookkeeping.
+ *
+ * If inCommitOnly is true, compute the oldestActiveXid only among the
+ * transactions that are in the commit critical section.
  */
 TransactionId
-GetOldestActiveTransactionId(void)
+GetOldestActiveTransactionId(bool inCommitOnly, bool allDbs)
 {
    ProcArrayStruct *arrayP = procArray;
    TransactionId *other_xids = ProcGlobal->xids;
    for (index = 0; index < arrayP->numProcs; index++)
    {
        TransactionId xid;
+       int         pgprocno = arrayP->pgprocnos[index];
+       PGPROC     *proc = &allProcs[pgprocno];
 
        /* Fetch xid just once - see GetNewTransactionId */
        xid = UINT32_ACCESS_ONCE(other_xids[index]);
        if (!TransactionIdIsNormal(xid))
            continue;
 
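+       /*
+        * When only transactions in the commit critical section are
+        * requested, skip processes that have not set DELAY_CHKPT_IN_COMMIT.
+        */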
+       if (inCommitOnly &&
+           (proc->delayChkptFlags & DELAY_CHKPT_IN_COMMIT) == 0)
+           continue;
+
+       if (!allDbs && proc->databaseId != MyDatabaseId)
+           continue;
+
        if (TransactionIdPrecedes(xid, oldestRunningXid))
            oldestRunningXid = xid;
 
 
 #include "commands/extension.h"
 #include "miscadmin.h"
 #include "replication/logical.h"
+#include "replication/logicallauncher.h"
 #include "replication/origin.h"
 #include "replication/worker_internal.h"
 #include "storage/lmgr.h"
 
    PG_RETURN_VOID();
 }
+
+/*
+ * binary_upgrade_create_conflict_detection_slot
+ *
+ * Create a replication slot to retain information necessary for conflict
+ * detection such as dead tuples, commit timestamps, and origins.
+ */
+Datum
+binary_upgrade_create_conflict_detection_slot(PG_FUNCTION_ARGS)
+{
+   CHECK_IS_BINARY_UPGRADE;
+
+   CreateConflictDetectionSlot();
+
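+   /*
+    * Release the slot so that it is not owned by this backend; the logical
+    * replication launcher can acquire it once the upgraded cluster starts.
+    */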
+   ReplicationSlotRelease();
+
+   PG_RETURN_VOID();
+}
 
    int         i_suboriginremotelsn;
    int         i_subenabled;
    int         i_subfailover;
+   int         i_subretaindeadtuples;
    int         i,
                ntups;
 
 
    if (fout->remoteVersion >= 170000)
        appendPQExpBufferStr(query,
-                            " s.subfailover\n");
+                            " s.subfailover,\n");
    else
        appendPQExpBufferStr(query,
-                            " false AS subfailover\n");
+                            " false AS subfailover,\n");
+
+   if (fout->remoteVersion >= 190000)
+       appendPQExpBufferStr(query,
+                            " s.subretaindeadtuples\n");
+   else
+       appendPQExpBufferStr(query,
+                            " false AS subretaindeadtuples\n");
 
    appendPQExpBufferStr(query,
                         "FROM pg_subscription s\n");
    i_subpasswordrequired = PQfnumber(res, "subpasswordrequired");
    i_subrunasowner = PQfnumber(res, "subrunasowner");
    i_subfailover = PQfnumber(res, "subfailover");
+   i_subretaindeadtuples = PQfnumber(res, "subretaindeadtuples");
    i_subconninfo = PQfnumber(res, "subconninfo");
    i_subslotname = PQfnumber(res, "subslotname");
    i_subsynccommit = PQfnumber(res, "subsynccommit");
            (strcmp(PQgetvalue(res, i, i_subrunasowner), "t") == 0);
        subinfo[i].subfailover =
            (strcmp(PQgetvalue(res, i, i_subfailover), "t") == 0);
+       subinfo[i].subretaindeadtuples =
+           (strcmp(PQgetvalue(res, i, i_subretaindeadtuples), "t") == 0);
        subinfo[i].subconninfo =
            pg_strdup(PQgetvalue(res, i, i_subconninfo));
        if (PQgetisnull(res, i, i_subslotname))
    if (subinfo->subfailover)
        appendPQExpBufferStr(query, ", failover = true");
 
+   if (subinfo->subretaindeadtuples)
+       appendPQExpBufferStr(query, ", retain_dead_tuples = true");
+
    if (strcmp(subinfo->subsynccommit, "off") != 0)
        appendPQExpBuffer(query, ", synchronous_commit = %s", fmtId(subinfo->subsynccommit));
 
 
    bool        subpasswordrequired;
    bool        subrunasowner;
    bool        subfailover;
+   bool        subretaindeadtuples;
    char       *subconninfo;
    char       *subslotname;
    char       *subsynccommit;
 
 static void check_for_new_tablespace_dir(void);
 static void check_for_user_defined_encoding_conversions(ClusterInfo *cluster);
 static void check_for_unicode_update(ClusterInfo *cluster);
-static void check_new_cluster_logical_replication_slots(void);
+static void check_new_cluster_replication_slots(void);
 static void check_new_cluster_subscription_configuration(void);
 static void check_old_cluster_for_valid_slots(void);
 static void check_old_cluster_subscription_state(void);
         * Before that the logical slots are not upgraded, so we will not be
         * able to upgrade the logical replication clusters completely.
         */
-       get_subscription_count(&old_cluster);
+       get_subscription_info(&old_cluster);
        check_old_cluster_subscription_state();
    }
 
 
    check_for_new_tablespace_dir();
 
-   check_new_cluster_logical_replication_slots();
+   check_new_cluster_replication_slots();
 
    check_new_cluster_subscription_configuration();
 }
 }
 
 /*
- * check_new_cluster_logical_replication_slots()
+ * check_new_cluster_replication_slots()
  *
- * Verify that there are no logical replication slots on the new cluster and
- * that the parameter settings necessary for creating slots are sufficient.
+ * Validate the new cluster's readiness for migrating replication slots:
+ * - Ensure no logical replication slots exist on the new cluster when
+ *   migrating logical slots.
+ * - Ensure the conflict detection slot does not exist on the new cluster
+ *   when migrating subscriptions with retain_dead_tuples enabled.
+ * - Ensure that the parameter settings on the new cluster necessary for
+ *   creating slots are sufficient.
  */
 static void
-check_new_cluster_logical_replication_slots(void)
+check_new_cluster_replication_slots(void)
 {
    PGresult   *res;
    PGconn     *conn;
    int         nslots_on_old;
    int         nslots_on_new;
+   int         rdt_slot_on_new;
    int         max_replication_slots;
    char       *wal_level;
+   int         i_nslots_on_new;
+   int         i_rdt_slot_on_new;
 
-   /* Logical slots can be migrated since PG17. */
+   /*
+    * Logical slots can be migrated since PG17, and the physical slot
+    * CONFLICT_DETECTION_SLOT can be migrated since PG19.
+    */
    if (GET_MAJOR_VERSION(old_cluster.major_version) <= 1600)
        return;
 
    nslots_on_old = count_old_cluster_logical_slots();
 
-   /* Quick return if there are no logical slots to be migrated. */
-   if (nslots_on_old == 0)
+   /*
+    * Quick return if there are no slots to be migrated and no subscriptions
+    * have the retain_dead_tuples option enabled.
+    */
+   if (nslots_on_old == 0 && !old_cluster.sub_retain_dead_tuples)
        return;
 
    conn = connectToServer(&new_cluster, "template1");
 
-   prep_status("Checking for new cluster logical replication slots");
+   prep_status("Checking for new cluster replication slots");
 
-   res = executeQueryOrDie(conn, "SELECT count(*) "
-                           "FROM pg_catalog.pg_replication_slots "
-                           "WHERE slot_type = 'logical' AND "
-                           "temporary IS FALSE;");
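+   /*
+    * A single query counts both the regular logical slots and the conflict
+    * detection slot; each count is computed only when the corresponding
+    * check is needed, and a constant 0 is selected otherwise.
+    */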
+   res = executeQueryOrDie(conn, "SELECT %s AS nslots_on_new, %s AS rdt_slot_on_new "
+                           "FROM pg_catalog.pg_replication_slots",
+                           nslots_on_old > 0
+                           ? "COUNT(*) FILTER (WHERE slot_type = 'logical' AND temporary IS FALSE)"
+                           : "0",
+                           old_cluster.sub_retain_dead_tuples
+                           ? "COUNT(*) FILTER (WHERE slot_name = 'pg_conflict_detection')"
+                           : "0");
 
    if (PQntuples(res) != 1)
-       pg_fatal("could not count the number of logical replication slots");
+       pg_fatal("could not count the number of replication slots");
 
-   nslots_on_new = atoi(PQgetvalue(res, 0, 0));
+   i_nslots_on_new = PQfnumber(res, "nslots_on_new");
+   i_rdt_slot_on_new = PQfnumber(res, "rdt_slot_on_new");
+
+   nslots_on_new = atoi(PQgetvalue(res, 0, i_nslots_on_new));
 
    if (nslots_on_new)
+   {
+       Assert(nslots_on_old);
        pg_fatal("expected 0 logical replication slots but found %d",
                 nslots_on_new);
+   }
+
+   rdt_slot_on_new = atoi(PQgetvalue(res, 0, i_rdt_slot_on_new));
+
+   if (rdt_slot_on_new)
+   {
+       Assert(old_cluster.sub_retain_dead_tuples);
+       pg_fatal("The replication slot \"pg_conflict_detection\" already exists on the new cluster");
+   }
 
    PQclear(res);
 
 
    wal_level = PQgetvalue(res, 0, 0);
 
-   if (strcmp(wal_level, "logical") != 0)
+   if (nslots_on_old > 0 && strcmp(wal_level, "logical") != 0)
        pg_fatal("\"wal_level\" must be \"logical\" but is set to \"%s\"",
                 wal_level);
 
+   if (old_cluster.sub_retain_dead_tuples &&
+       strcmp(wal_level, "minimal") == 0)
+       pg_fatal("\"wal_level\" must be \"replica\" or \"logical\" but is set to \"%s\"",
+                wal_level);
+
    max_replication_slots = atoi(PQgetvalue(res, 1, 0));
 
+   if (old_cluster.sub_retain_dead_tuples &&
+       nslots_on_old + 1 > max_replication_slots)
+       pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of "
+                "logical replication slots on the old cluster plus one additional slot required "
+                "for retaining conflict detection information (%d)",
+                max_replication_slots, nslots_on_old + 1);
+
    if (nslots_on_old > max_replication_slots)
        pg_fatal("\"max_replication_slots\" (%d) must be greater than or equal to the number of "
                 "logical replication slots (%d) on the old cluster",
                        "The slot \"%s\" has not consumed the WAL yet\n",
                        slot->slotname);
            }
+
+           /*
+            * The name "pg_conflict_detection" (defined as
+            * CONFLICT_DETECTION_SLOT) has been reserved for logical
+            * replication conflict detection slot since PG19.
+            */
+           if (strcmp(slot->slotname, "pg_conflict_detection") == 0)
+           {
+               if (script == NULL &&
+                   (script = fopen_priv(output_path, "w")) == NULL)
+                   pg_fatal("could not open file \"%s\": %m", output_path);
+
+               fprintf(script,
+                       "The slot name \"%s\" is reserved\n",
+                       slot->slotname);
+           }
        }
    }
 
 
 }
 
 /*
- * get_subscription_count()
+ * get_subscription_info()
  *
- * Gets the number of subscriptions in the cluster.
+ * Gets information about the subscriptions in the cluster.
  */
 void
-get_subscription_count(ClusterInfo *cluster)
+get_subscription_info(ClusterInfo *cluster)
 {
    PGconn     *conn;
    PGresult   *res;
+   int         i_nsub;
+   int         i_retain_dead_tuples;
 
    conn = connectToServer(cluster, "template1");
-   res = executeQueryOrDie(conn, "SELECT count(*) "
-                           "FROM pg_catalog.pg_subscription");
-   cluster->nsubs = atoi(PQgetvalue(res, 0, 0));
+   if (GET_MAJOR_VERSION(cluster->major_version) >= 1900)
+       res = executeQueryOrDie(conn, "SELECT count(*) AS nsub,"
+                               "COUNT(CASE WHEN subretaindeadtuples THEN 1 END) > 0 AS retain_dead_tuples "
+                               "FROM pg_catalog.pg_subscription");
+   else
+       res = executeQueryOrDie(conn, "SELECT count(*) AS nsub,"
+                               "'f' AS retain_dead_tuples "
+                               "FROM pg_catalog.pg_subscription");
+
+   i_nsub = PQfnumber(res, "nsub");
+   i_retain_dead_tuples = PQfnumber(res, "retain_dead_tuples");
+
+   cluster->nsubs = atoi(PQgetvalue(res, 0, i_nsub));
+   cluster->sub_retain_dead_tuples = (strcmp(PQgetvalue(res, 0, i_retain_dead_tuples), "t") == 0);
 
    PQclear(res);
    PQfinish(conn);
 
 static void make_outputdirs(char *pgdata);
 static void setup(char *argv0);
 static void create_logical_replication_slots(void);
+static void create_conflict_detection_slot(void);
 
 ClusterInfo old_cluster,
            new_cluster;
 main(int argc, char **argv)
 {
    char       *deletion_script_file_name = NULL;
+   bool        migrate_logical_slots;
 
    /*
     * pg_upgrade doesn't currently use common/logging.c, but initialize it
              new_cluster.pgdata);
    check_ok();
 
+   migrate_logical_slots = count_old_cluster_logical_slots();
+
    /*
-    * Migrate the logical slots to the new cluster.  Note that we need to do
-    * this after resetting WAL because otherwise the required WAL would be
-    * removed and slots would become unusable.  There is a possibility that
-    * background processes might generate some WAL before we could create the
-    * slots in the new cluster but we can ignore that WAL as that won't be
-    * required downstream.
+    * Migrate replication slots to the new cluster.
+    *
+    * Note that we must migrate logical slots after resetting WAL because
+    * otherwise the required WAL would be removed and slots would become
+    * unusable.  There is a possibility that background processes might
+    * generate some WAL before we could create the slots in the new cluster
+    * but we can ignore that WAL as that won't be required downstream.
+    *
+    * The conflict detection slot is not affected by concerns related to WAL,
+    * as it only retains dead tuples. It is created here for consistency.
+    * Note that the new conflict detection slot uses the latest transaction
+    * ID as xmin, so it cannot protect dead tuples that existed before the
+    * upgrade. Additionally, commit timestamps and origin data are not
+    * preserved during the upgrade. So, even after creating the slot, the
+    * upgraded subscriber may be unable to detect conflicts or log relevant
+    * commit timestamps and origins when applying changes from the publisher
+    * that occurred before the upgrade, especially if those changes were not
+    * replicated. It can only protect tuples that might be deleted after the
+    * new cluster starts.
     */
-   if (count_old_cluster_logical_slots())
+   if (migrate_logical_slots || old_cluster.sub_retain_dead_tuples)
    {
        start_postmaster(&new_cluster, true);
-       create_logical_replication_slots();
+
+       if (migrate_logical_slots)
+           create_logical_replication_slots();
+
+       if (old_cluster.sub_retain_dead_tuples)
+           create_conflict_detection_slot();
+
        stop_postmaster(false);
    }
 
 
    return;
 }
+
+/*
+ * create_conflict_detection_slot()
+ *
+ * Create a replication slot to retain information necessary for conflict
+ * detection such as dead tuples, commit timestamps, and origins, for migrated
+ * subscriptions with retain_dead_tuples enabled.
+ */
+static void
+create_conflict_detection_slot(void)
+{
+   PGconn     *conn_new_template1;
+
+   prep_status("Creating the replication conflict detection slot");
+
+   conn_new_template1 = connectToServer(&new_cluster, "template1");
+   PQclear(executeQueryOrDie(conn_new_template1, "SELECT pg_catalog.binary_upgrade_create_conflict_detection_slot()"));
+   PQfinish(conn_new_template1);
+
+   check_ok();
+}
 
    uint32      bin_version;    /* version returned from pg_ctl */
    const char *tablespace_suffix;  /* directory specification */
    int         nsubs;          /* number of subscriptions */
+   bool        sub_retain_dead_tuples; /* true if any subscription has
+                                        * retain_dead_tuples enabled */
 } ClusterInfo;
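For context, a plausible shape for the query that get_subscription_info() could run to fill both fields; this is a sketch under the assumption that the old cluster's pg_subscription already has the subretaindeadtuples column, not a quote of the actual implementation:

    -- Hypothetical catalog probe: the subscription count plus whether any
    -- subscription retains dead tuples for conflict detection.
    SELECT count(*) AS nsubs,
           count(*) FILTER (WHERE subretaindeadtuples) > 0 AS retain
    FROM pg_catalog.pg_subscription;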
 
 
                              const char *new_pgdata);
 void       get_db_rel_and_slot_infos(ClusterInfo *cluster);
 int            count_old_cluster_logical_slots(void);
-void       get_subscription_count(ClusterInfo *cluster);
+void       get_subscription_info(ClusterInfo *cluster);
 
 /* option.c */
 
 
 
 # Initialize the old subscriber node
 my $old_sub = PostgreSQL::Test::Cluster->new('old_sub');
-$old_sub->init;
+$old_sub->init(allows_streaming => 'physical');
 $old_sub->start;
 my $oldbindir = $old_sub->config_data('--bindir');
 
 # Initialize the new subscriber
 my $new_sub = PostgreSQL::Test::Cluster->new('new_sub');
-$new_sub->init;
+$new_sub->init(allows_streaming => 'physical');
 my $newbindir = $new_sub->config_data('--bindir');
 
 # In a VPATH build, we'll be started in the source directory, but we want
 $old_sub->start;
 $old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;");
 
+# ------------------------------------------------------
+# Check that pg_upgrade fails when max_replication_slots configured in the new
+# cluster is less than the number of logical slots in the old cluster plus
+# one, as required when a subscription's retain_dead_tuples option is enabled.
+# ------------------------------------------------------
+# It is sufficient to use a disabled subscription to test upgrade failure.
+
+$publisher->safe_psql('postgres', "CREATE PUBLICATION regress_pub1");
+$old_sub->safe_psql('postgres',
+   "CREATE SUBSCRIPTION regress_sub1 CONNECTION '$connstr' PUBLICATION regress_pub1 WITH (enabled = false, retain_dead_tuples = true)"
+);
+
+$old_sub->stop;
+
+$new_sub->append_conf('postgresql.conf', 'max_replication_slots = 0');
+
+# pg_upgrade will fail because the new cluster has insufficient
+# max_replication_slots.
+command_checks_all(
+   [
+       'pg_upgrade',
+       '--no-sync',
+       '--old-datadir' => $old_sub->data_dir,
+       '--new-datadir' => $new_sub->data_dir,
+       '--old-bindir' => $oldbindir,
+       '--new-bindir' => $newbindir,
+       '--socketdir' => $new_sub->host,
+       '--old-port' => $old_sub->port,
+       '--new-port' => $new_sub->port,
+       $mode,
+       '--check',
+   ],
+   1,
+   [
+       qr/"max_replication_slots" \(0\) must be greater than or equal to the number of logical replication slots on the old cluster plus one additional slot required for retaining conflict detection information \(1\)/
+   ],
+   [qr//],
+   'run of pg_upgrade where the new cluster has insufficient max_replication_slots'
+);
+
+# Reset max_replication_slots
+$new_sub->append_conf('postgresql.conf', 'max_replication_slots = 10');
+
+# Cleanup
+$publisher->safe_psql('postgres', "DROP PUBLICATION regress_pub1");
+$old_sub->start;
+$old_sub->safe_psql('postgres', "DROP SUBSCRIPTION regress_sub1;");
+
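Administrators can estimate the required setting ahead of the upgrade with a similar calculation on the old cluster. A rough sketch, assuming every logical slot will be migrated and that the subretaindeadtuples column exists there:

    -- Minimum max_replication_slots needed on the new cluster: one per
    -- migrated logical slot, plus one for the conflict detection slot if
    -- any subscription enables retain_dead_tuples.
    SELECT (SELECT count(*) FROM pg_replication_slots
            WHERE slot_type = 'logical')
           + (SELECT CASE WHEN EXISTS (SELECT 1 FROM pg_subscription
                                       WHERE subretaindeadtuples)
                          THEN 1 ELSE 0 END) AS min_max_replication_slots;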
 # ------------------------------------------------------
 # Check that pg_upgrade refuses to run if:
 # a) there's a subscription with tables in a state other than 'r' (ready) or
 rmtree($new_sub->data_dir . "/pg_upgrade_output.d");
 
 # Verify that the upgrade should be successful with tables in 'ready'/'init'
-# state along with retaining the replication origin's remote lsn, subscription's
-# running status, and failover option.
+# state along with retaining the replication origin's remote lsn,
+# subscription's running status, failover option, and retain_dead_tuples
+# option.
 $publisher->safe_psql(
    'postgres', qq[
        CREATE TABLE tab_upgraded1(id int);
 $old_sub->safe_psql(
    'postgres', qq[
        CREATE TABLE tab_upgraded1(id int);
-       CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub4 WITH (failover = true);
+       CREATE SUBSCRIPTION regress_sub4 CONNECTION '$connstr' PUBLICATION regress_pub4 WITH (failover = true, retain_dead_tuples = true);
 ]);
 
 # Wait till the table tab_upgraded1 reaches 'ready' state
 # Check that pg_upgrade is successful when all tables are in ready or in
 # init state (tab_upgraded1 table is in ready state and tab_upgraded2 table is
 # in init state) along with retaining the replication origin's remote lsn,
-# subscription's running status, and failover option.
+# subscription's running status, failover option, and retain_dead_tuples
+# option.
 # ------------------------------------------------------
 command_ok(
    [
 # ------------------------------------------------------
 # Check that the data inserted to the publisher when the new subscriber is down
 # will be replicated once it is started. Also check that the old subscription
-# states and relations origins are all preserved.
+# states and relation origins are all preserved, and that the conflict
+# detection slot is created.
 # ------------------------------------------------------
 $publisher->safe_psql(
    'postgres', qq[
 
 $new_sub->start;
 
-# The subscription's running status and failover option should be preserved
-# in the upgraded instance. So regress_sub4 should still have subenabled and
-# subfailover set to true, while regress_sub5 should have both set to false.
+# The subscription's running status, failover option, and retain_dead_tuples
+# option should be preserved in the upgraded instance. So regress_sub4 should
+# still have subenabled, subfailover, and subretaindeadtuples set to true,
+# while regress_sub5 should have all of them set to false.
 $result = $new_sub->safe_psql('postgres',
-   "SELECT subname, subenabled, subfailover FROM pg_subscription ORDER BY subname"
+   "SELECT subname, subenabled, subfailover, subretaindeadtuples FROM pg_subscription ORDER BY subname"
 );
-is( $result, qq(regress_sub4|t|t
-regress_sub5|f|f),
-   "check that the subscription's running status and failover are preserved"
+is( $result, qq(regress_sub4|t|t|t
+regress_sub5|f|f|f),
+   "check that the subscription's running status, failover, and retain_dead_tuples are preserved"
 );
 
 # Subscription relations should be preserved
 );
 is($result, qq($remote_lsn), "remote_lsn should have been preserved");
 
+# The conflict detection slot should be created
+$result = $new_sub->safe_psql('postgres',
+   "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'");
+is($result, qq(t), "conflict detection slot exists");
+
 # Resume the initial sync and wait until all tables of subscription
 # 'regress_sub5' are synchronized
 $new_sub->append_conf('postgresql.conf',
 
    printQueryOpt myopt = pset.popt;
    static const bool translate_columns[] = {false, false, false, false,
        false, false, false, false, false, false, false, false, false, false,
-   false};
+   false, false};
 
    if (pset.sversion < 100000)
    {
            appendPQExpBuffer(&buf,
                              ", subfailover AS \"%s\"\n",
                              gettext_noop("Failover"));
+       if (pset.sversion >= 190000)
+           appendPQExpBuffer(&buf,
+                             ", subretaindeadtuples AS \"%s\"\n",
+                             gettext_noop("Retain dead tuples"));
 
        appendPQExpBuffer(&buf,
                          ",  subsynccommit AS \"%s\"\n"
 
    /* ALTER SUBSCRIPTION <name> SET ( */
    else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SET", "("))
        COMPLETE_WITH("binary", "disable_on_error", "failover", "origin",
-                     "password_required", "run_as_owner", "slot_name",
-                     "streaming", "synchronous_commit", "two_phase");
+                     "password_required", "retain_dead_tuples",
+                     "run_as_owner", "slot_name", "streaming",
+                     "synchronous_commit", "two_phase");
    /* ALTER SUBSCRIPTION <name> SKIP ( */
    else if (Matches("ALTER", "SUBSCRIPTION", MatchAny, MatchAnyN, "SKIP", "("))
        COMPLETE_WITH("lsn");
    else if (Matches("CREATE", "SUBSCRIPTION", MatchAnyN, "WITH", "("))
        COMPLETE_WITH("binary", "connect", "copy_data", "create_slot",
                      "disable_on_error", "enabled", "failover", "origin",
-                     "password_required", "run_as_owner", "slot_name",
-                     "streaming", "synchronous_commit", "two_phase");
+                     "password_required", "retain_dead_tuples",
+                     "run_as_owner", "slot_name", "streaming",
+                     "synchronous_commit", "two_phase");
 
 /* CREATE TRIGGER --- is allowed inside CREATE SCHEMA, so use TailMatches */
 
 
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202507091
+#define CATALOG_VERSION_NO 202507231
 
 #endif
 
   proname => 'binary_upgrade_replorigin_advance', proisstrict => 'f',
   provolatile => 'v', proparallel => 'u', prorettype => 'void',
   proargtypes => 'text pg_lsn', prosrc => 'binary_upgrade_replorigin_advance' },
+{ oid => '9159', descr => 'for use by pg_upgrade (conflict detection slot)',
+  proname => 'binary_upgrade_create_conflict_detection_slot', proisstrict => 'f',
+  provolatile => 'v', proparallel => 'u', prorettype => 'void',
+  proargtypes => '', prosrc => 'binary_upgrade_create_conflict_detection_slot' },
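Like the other binary_upgrade_* support functions, this one is presumably callable only while the server runs in binary-upgrade mode; a direct call on a normally started server is expected to fail:

    -- Sketch only: expected to error outside binary-upgrade mode, following
    -- the convention of the existing binary_upgrade_* functions.
    SELECT pg_catalog.binary_upgrade_create_conflict_detection_slot();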
 
 # conversion functions
 { oid => '4302',
 
                                 * slots) in the upstream database are enabled
                                 * to be synchronized to the standbys. */
 
+   bool        subretaindeadtuples;    /* True if dead tuples useful for
+                                        * conflict detection are retained */
+
 #ifdef CATALOG_VARLEN          /* variable-length fields start here */
    /* Connection string to the publisher */
    text        subconninfo BKI_FORCE_NOT_NULL;
                                 * (i.e. the main slot and the table sync
                                 * slots) in the upstream database are enabled
                                 * to be synchronized to the standbys. */
+   bool        retaindeadtuples;   /* True if dead tuples useful for conflict
+                                    * detection are retained */
    char       *conninfo;       /* Connection string to the publisher */
    char       *slotname;       /* Name of the replication slot */
    char       *synccommit;     /* Synchronous commit setting for worker */
 
 
 extern char defGetStreamingMode(DefElem *def);
 
+extern ObjectAddress AlterSubscription(ParseState *pstate, AlterSubscriptionStmt *stmt, bool isTopLevel);
+
+extern void CheckSubDeadTupleRetention(bool check_guc, bool sub_disabled,
+                                      int elevel_for_sub_disabled);
+
 #endif                         /* SUBSCRIPTIONCMDS_H */
 
 extern void ApplyLauncherForgetWorkerStartTime(Oid subid);
 
 extern void ApplyLauncherWakeupAtCommit(void);
+extern void ApplyLauncherWakeup(void);
 extern void AtEOXact_ApplyLauncher(bool isCommit);
 
+extern void CreateConflictDetectionSlot(void);
+
 extern bool IsLogicalLauncher(void);
 
 extern pid_t GetLeaderApplyWorkerPid(pid_t pid);
 
 /* directory to store replication slot data in */
 #define PG_REPLSLOT_DIR     "pg_replslot"
 
+/*
+ * The reserved name for a replication slot used to retain dead tuples for
+ * conflict detection in logical replication. See
+ * maybe_advance_nonremovable_xid() for details.
+ */
+#define CONFLICT_DETECTION_SLOT "pg_conflict_detection"
+
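Because the name is reserved, user-facing slot creation is expected to reject it. A sketch of the visible effect, assuming the reservation applies to both physical and logical slots:

    -- Both calls should fail with a reserved-name error:
    SELECT pg_create_physical_replication_slot('pg_conflict_detection');
    SELECT pg_create_logical_replication_slot('pg_conflict_detection',
                                              'pgoutput');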
 /*
  * Behaviour of replication slots, upon release or crash.
  *
 
 /* misc stuff */
 extern void ReplicationSlotInitialize(void);
-extern bool ReplicationSlotValidateName(const char *name, int elevel);
+extern bool ReplicationSlotValidateName(const char *name,
+                                       bool allow_reserved_name,
+                                       int elevel);
 extern void ReplicationSlotReserveWal(void);
 extern void ReplicationSlotsComputeRequiredXmin(bool already_locked);
 extern void ReplicationSlotsComputeRequiredLSN(void);
 
    /* Indicates whether apply can be performed in parallel. */
    bool        parallel_apply;
 
+   /*
+    * The changes made by this and later transactions must be retained to
+    * ensure reliable conflict detection during the apply phase.
+    *
+    * The logical replication launcher manages an internal replication slot
+    * named "pg_conflict_detection". It asynchronously collects this ID to
+    * decide when to advance the xmin value of the slot.
+    */
+   TransactionId oldest_nonremovable_xid;
+
    /* Stats. */
    XLogRecPtr  last_lsn;
    TimestampTz last_send_time;
 extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
                                     Oid dbid, Oid subid, const char *subname,
                                     Oid userid, Oid relid,
-                                    dsm_handle subworker_dsm);
+                                    dsm_handle subworker_dsm,
+                                    bool retain_dead_tuples);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
 extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 
  * the checkpoint are actually destroyed on disk. Replay can cope with a file
  * or block that doesn't exist, but not with a block that has the wrong
  * contents.
+ *
+ * Setting DELAY_CHKPT_IN_COMMIT is similar to setting DELAY_CHKPT_START, but
+ * it explicitly indicates that the reason for delaying the checkpoint is
+ * that a transaction is within a critical commit section. We need this new
+ * flag to ensure that all transactions that have acquired a commit timestamp
+ * are finished before we allow the logical replication client to advance its
+ * xid, which is used to hold back dead rows for conflict detection.
  */
 #define DELAY_CHKPT_START      (1<<0)
 #define DELAY_CHKPT_COMPLETE   (1<<1)
+#define DELAY_CHKPT_IN_COMMIT  (DELAY_CHKPT_START | 1<<2)
 
 typedef enum
 {
 
 extern bool TransactionIdIsInProgress(TransactionId xid);
 extern TransactionId GetOldestNonRemovableTransactionId(Relation rel);
 extern TransactionId GetOldestTransactionIdConsideredRunning(void);
-extern TransactionId GetOldestActiveTransactionId(void);
+extern TransactionId GetOldestActiveTransactionId(bool inCommitOnly,
+                                                 bool allDbs);
 extern TransactionId GetOldestSafeDecodingTransactionId(bool catalogOnly);
 extern void GetReplicationHorizons(TransactionId *xmin, TransactionId *catalog_xmin);
 
 
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+ regress_testsub4
-                                                                                                                  List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | none   | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub4 SET (origin = any);
 \dRs+ regress_testsub4
-                                                                                                                  List of subscriptions
-       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
-------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+       Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub4 | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub3;
 ERROR:  invalid connection string syntax: missing "=" after "foobar" in connection info string
 
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET PUBLICATION testpub2, testpub3 WITH (refresh = false);
 ALTER SUBSCRIPTION regress_testsub SET (password_required = false);
 ALTER SUBSCRIPTION regress_testsub SET (run_as_owner = true);
 \dRs+
-                                                                                                                      List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           |  Skip LSN  
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        | off                | dbname=regress_doesnotexist2 | 0/00000000
+                                                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |           Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | f                 | t             | f        | f                  | off                | dbname=regress_doesnotexist2 | 0/00000000
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (password_required = true);
 -- ok
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/12345');
 \dRs+
-                                                                                                                      List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           |  Skip LSN  
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/00012345
+                                                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |           Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist2 | 0/00012345
 (1 row)
 
 -- ok - with lsn = NONE
 ALTER SUBSCRIPTION regress_testsub SKIP (lsn = '0/0');
 ERROR:  invalid WAL location (LSN): 0/0
 \dRs+
-                                                                                                                      List of subscriptions
-      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           |  Skip LSN  
------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist2 | 0/00000000
+                                                                                                                                List of subscriptions
+      Name       |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |           Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist2 | 0/00000000
 (1 row)
 
 BEGIN;
 ERROR:  invalid value for parameter "synchronous_commit": "foobar"
 HINT:  Available values: local, remote_write, remote_apply, on, off.
 \dRs+
-                                                                                                                        List of subscriptions
-        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |           Conninfo           |  Skip LSN  
----------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+------------------------------+------------
- regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | local              | dbname=regress_doesnotexist2 | 0/00000000
+                                                                                                                                  List of subscriptions
+        Name         |           Owner           | Enabled |     Publication     | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |           Conninfo           |  Skip LSN  
+---------------------+---------------------------+---------+---------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+------------------------------+------------
+ regress_testsub_foo | regress_subscription_user | f       | {testpub2,testpub3} | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  | local              | dbname=regress_doesnotexist2 | 0/00000000
 (1 row)
 
 -- rename back to keep the rest simple
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | t      | parallel  | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (binary = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = parallel);
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (streaming = false);
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 -- fail - publication already exists
 ALTER SUBSCRIPTION regress_testsub ADD PUBLICATION testpub1, testpub2 WITH (refresh = false);
 ERROR:  publication "testpub1" is already in subscription "regress_testsub"
 \dRs+
-                                                                                                                         List of subscriptions
-      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                                    List of subscriptions
+      Name       |           Owner           | Enabled |         Publication         | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-----------------------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub,testpub1,testpub2} | f      | off       | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 -- fail - publication used more than once
 -- ok - delete publications
 ALTER SUBSCRIPTION regress_testsub DROP PUBLICATION testpub1, testpub2 WITH (refresh = false);
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | off       | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 DROP SUBSCRIPTION regress_testsub;
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | p                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 -- we can alter streaming when two_phase enabled
 ALTER SUBSCRIPTION regress_testsub SET (streaming = true);
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | on        | p                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 WARNING:  subscription was created, but is not connected
 HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (disable_on_error = true);
 \dRs+
-                                                                                                                 List of subscriptions
-      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Synchronous commit |          Conninfo           |  Skip LSN  
------------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+-----------------------------+------------
- regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        | off                | dbname=regress_doesnotexist | 0/00000000
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | t                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
+(1 row)
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+-- fail - retain_dead_tuples must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = foo);
+ERROR:  retain_dead_tuples requires a Boolean value
+-- ok
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = false);
+WARNING:  subscription was created, but is not connected
+HINT:  To initiate replication, you must manually create the replication slot, enable the subscription, and refresh the subscription.
+\dRs+
+                                                                                                                            List of subscriptions
+      Name       |           Owner           | Enabled | Publication | Binary | Streaming | Two-phase commit | Disable on error | Origin | Password required | Run as owner? | Failover | Retain dead tuples | Synchronous commit |          Conninfo           |  Skip LSN  
+-----------------+---------------------------+---------+-------------+--------+-----------+------------------+------------------+--------+-------------------+---------------+----------+--------------------+--------------------+-----------------------------+------------
+ regress_testsub | regress_subscription_user | f       | {testpub}   | f      | parallel  | d                | f                | any    | t                 | f             | f        | f                  | off                | dbname=regress_doesnotexist | 0/00000000
 (1 row)
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 
 ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
 DROP SUBSCRIPTION regress_testsub;
 
+-- fail - retain_dead_tuples must be boolean
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = foo);
+
+-- ok
+CREATE SUBSCRIPTION regress_testsub CONNECTION 'dbname=regress_doesnotexist' PUBLICATION testpub WITH (connect = false, retain_dead_tuples = false);
+
+\dRs+
+
+ALTER SUBSCRIPTION regress_testsub SET (slot_name = NONE);
+DROP SUBSCRIPTION regress_testsub;
+
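The option can only be changed while a subscription is disabled, as the TAP test further below verifies against the exact ERROR and NOTICE texts. A sketch of the expected interaction, using a hypothetical subscription name:

    -- Fails while the subscription is enabled:
    ALTER SUBSCRIPTION some_sub SET (retain_dead_tuples = true);
    -- Succeeds, with a NOTICE that deleted rows are not removed until the
    -- subscription is re-enabled:
    ALTER SUBSCRIPTION some_sub DISABLE;
    ALTER SUBSCRIPTION some_sub SET (retain_dead_tuples = true);
    ALTER SUBSCRIPTION some_sub ENABLE;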
 -- let's do some tests with pg_create_subscription rather than superuser
 SET SESSION AUTHORIZATION regress_subscription_user3;
 
 
 # Copyright (c) 2025, PostgreSQL Global Development Group
 
-# Test the conflict detection of conflict type 'multiple_unique_conflicts'.
+# Test conflicts in logical replication
 use strict;
 use warnings FATAL => 'all';
 use PostgreSQL::Test::Cluster;
 
 # Create a subscriber node
 my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
-$node_subscriber->init;
+$node_subscriber->init(allows_streaming => 'logical');
 $node_subscriber->start;
 
 # Create a table on publisher
 
 pass('multiple_unique_conflicts detected on a leaf partition during insert');
 
+###############################################################################
+# Setup a bidirectional logical replication between node_A & node_B
+###############################################################################
+
+# Initialize nodes.
+
+# node_A. Increase the log_min_messages setting to DEBUG2 to debug test
+# failures. Disable autovacuum to avoid generating xids that could affect the
+# replication slot's xmin value.
+my $node_A = $node_publisher;
+$node_A->append_conf(
+   'postgresql.conf',
+   qq{autovacuum = off
+   log_min_messages = 'debug2'});
+$node_A->restart;
+
+# node_B
+my $node_B = $node_subscriber;
+$node_B->append_conf('postgresql.conf', "track_commit_timestamp = on");
+$node_B->restart;
+
+# Create table on node_A
+$node_A->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)");
+
+# Create the same table on node_B
+$node_B->safe_psql('postgres', "CREATE TABLE tab (a int PRIMARY KEY, b int)");
+
+my $subname_AB = 'tap_sub_a_b';
+my $subname_BA = 'tap_sub_b_a';
+
+# Set up logical replication
+# node_A (pub) -> node_B (sub)
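+# origin = none keeps changes from looping back and forth between the two
+# nodes; retain_dead_tuples lets node_B reliably detect conflicts.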
+my $node_A_connstr = $node_A->connstr . ' dbname=postgres';
+$node_A->safe_psql('postgres', "CREATE PUBLICATION tap_pub_A FOR TABLE tab");
+$node_B->safe_psql(
+   'postgres', "
+   CREATE SUBSCRIPTION $subname_BA
+   CONNECTION '$node_A_connstr application_name=$subname_BA'
+   PUBLICATION tap_pub_A
+   WITH (origin = none, retain_dead_tuples = true)");
+
+# node_B (pub) -> node_A (sub)
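+# copy_data is off for the second subscription of the bidirectional pair so
+# that rows replicated from node_A are not copied back.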
+my $node_B_connstr = $node_B->connstr . ' dbname=postgres';
+$node_B->safe_psql('postgres', "CREATE PUBLICATION tap_pub_B FOR TABLE tab");
+$node_A->safe_psql(
+   'postgres', "
+   CREATE SUBSCRIPTION $subname_AB
+   CONNECTION '$node_B_connstr application_name=$subname_AB'
+   PUBLICATION tap_pub_B
+   WITH (origin = none, copy_data = off)");
+
+# Wait for initial table sync to finish
+$node_A->wait_for_subscription_sync($node_B, $subname_AB);
+$node_B->wait_for_subscription_sync($node_A, $subname_BA);
+
+is(1, 1, 'Bidirectional replication setup is complete');
+
+# Confirm that the conflict detection slot is created on Node B and the xmin
+# value is valid.
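+# A valid xmin means VACUUM will keep dead tuples that might still be needed
+# to detect conflicts for changes that have not yet been applied.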
+ok( $node_B->poll_query_until(
+       'postgres',
+       "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+   ),
+   "the xmin value of slot 'pg_conflict_detection' is valid on Node B");
+
+##################################################
+# Check that the retain_dead_tuples option can be enabled only for disabled
+# subscriptions. Validate the NOTICE message during the subscription DDL, and
+# ensure the conflict detection slot is created upon enabling the
+# retain_dead_tuples option.
+##################################################
+
+# Alter retain_dead_tuples for an enabled subscription (should fail)
+my ($cmdret, $stdout, $stderr) = $node_A->psql('postgres',
+   "ALTER SUBSCRIPTION $subname_AB SET (retain_dead_tuples = true)");
+ok( $stderr =~
+     /ERROR:  cannot set option \"retain_dead_tuples\" for enabled subscription/,
+   "altering retain_dead_tuples is not allowed for enabled subscription");
+
+# Disable the subscription
+$node_A->psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE;");
+
+# Enable retain_dead_tuples for a disabled subscription
+($cmdret, $stdout, $stderr) = $node_A->psql('postgres',
+   "ALTER SUBSCRIPTION $subname_AB SET (retain_dead_tuples = true);");
+ok( $stderr =~
+     /NOTICE:  deleted rows to detect conflicts would not be removed until the subscription is enabled/,
+   "altering retain_dead_tuples is allowed for disabled subscription");
+
+# Re-enable the subscription
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
+
+# Confirm that the conflict detection slot is created on Node A and the xmin
+# value is valid.
+ok( $node_A->poll_query_until(
+       'postgres',
+       "SELECT xmin IS NOT NULL from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+   ),
+   "the xmin value of slot 'pg_conflict_detection' is valid on Node A");
+
+##################################################
+# Check the WARNING emitted when changing the origin to ANY while
+# retain_dead_tuples is enabled, since the subscription might then receive
+# changes from origins other than the publisher.
+##################################################
+
+($cmdret, $stdout, $stderr) = $node_A->psql('postgres',
+   "ALTER SUBSCRIPTION $subname_AB SET (origin = any);");
+ok( $stderr =~
+     /WARNING:  subscription "tap_sub_a_b" enabled retain_dead_tuples but might not reliably detect conflicts for changes from different origins/,
+   "warn of the possibility of receiving changes from origins other than the publisher");
+
+# Reset the origin to none
+$node_A->psql('postgres',
+   "ALTER SUBSCRIPTION $subname_AB SET (origin = none);");
+
+###############################################################################
+# Check that dead tuples on Node A cannot be cleaned by VACUUM until the
+# concurrent transactions on Node B have been applied and flushed on Node A.
+###############################################################################
+
+# Insert a record
+$node_A->safe_psql('postgres', "INSERT INTO tab VALUES (1, 1), (2, 2);");
+$node_A->wait_for_catchup($subname_BA);
+
+my $result = $node_B->safe_psql('postgres', "SELECT * FROM tab;");
+is($result, qq(1|1
+2|2), 'check replicated insert on node B');
+
+# Disable the logical replication from node B to node A
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE");
+
+$node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;");
+$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;");
+
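+# The deleted tuple must be retained here: node_B's concurrent UPDATE has not
+# yet been applied on node_A, so the conflict detection slot's xmin prevents
+# VACUUM from removing it.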
+($cmdret, $stdout, $stderr) = $node_A->psql(
+   'postgres', qq(VACUUM (verbose) public.tab;)
+);
+
+ok( $stderr =~
+     qr/1 are dead but not yet removable/,
+   'the deleted row is not yet removable');
+
+$node_A->safe_psql(
+   'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
+$node_B->wait_for_catchup($subname_AB);
+
+# Remember the next transaction ID to be assigned
+my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
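+# txid_current() assigns an xid to its own transaction, so the next xid to be
+# assigned is one past the returned value (with autovacuum off, no other xid
+# assignment is expected in between).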
+
+# Confirm that the xmin value is advanced to the latest nextXid. If no
+# transactions are running, the apply worker selects nextXid as the candidate
+# for the non-removable xid. See GetOldestActiveTransactionId().
+ok( $node_A->poll_query_until(
+       'postgres',
+       "SELECT xmin = $next_xid from pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+   ),
+   "the xmin value of slot 'pg_conflict_detection' is updated on Node A");
+
+# Confirm that the dead tuple can be removed now
+($cmdret, $stdout, $stderr) = $node_A->psql(
+   'postgres', qq(VACUUM (verbose) public.tab;)
+);
+
+ok( $stderr =~
+     qr/1 removed, 1 remain, 0 are dead but not yet removable/,
+   'the deleted row is removed');
+
+###############################################################################
+# Check that the replication slot pg_conflict_detection is dropped after
+# removing all the subscriptions.
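+# The slot is dropped asynchronously once no subscription with
+# retain_dead_tuples remains, hence the polling below.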
+###############################################################################
+
+$node_B->safe_psql(
+   'postgres', "DROP SUBSCRIPTION $subname_BA");
+
+ok( $node_B->poll_query_until(
+       'postgres',
+       "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+   ),
+   "the slot 'pg_conflict_detection' has been dropped on Node B");
+
+$node_A->safe_psql(
+   'postgres', "DROP SUBSCRIPTION $subname_AB");
+
+ok( $node_A->poll_query_until(
+       'postgres',
+       "SELECT count(*) = 0 FROM pg_replication_slots WHERE slot_name = 'pg_conflict_detection'"
+   ),
+   "the slot 'pg_conflict_detection' has been dropped on Node A");
+
 done_testing();
 
 Result
 ResultRelInfo
 ResultState
+RetainDeadTuplesData
+RetainDeadTuplesPhase
 ReturnSetInfo
 ReturnStmt
 ReturningClause