<structfield>subretaindeadtuples</structfield> <type>bool</type>
       </para>
       <para>
-       If true, the information (e.g., dead tuples, commit timestamps, and
+       If true, the detection of <xref linkend="conflict-update-deleted"/> is
+       enabled and the information (e.g., dead tuples, commit timestamps, and
        origins) on the subscriber that is useful for conflict detection is
        retained.
       </para></entry>
 
       </para>
      </listitem>
     </varlistentry>
+    <varlistentry id="conflict-update-deleted" xreflabel="update_deleted">
+     <term><literal>update_deleted</literal></term>
+     <listitem>
+      <para>
+       The tuple to be updated was concurrently deleted by another origin. The
+       update will simply be skipped in this scenario. Note that this conflict
+       can only be detected when
+       <link linkend="guc-track-commit-timestamp"><varname>track_commit_timestamp</varname></link>
+       and <link linkend="sql-createsubscription-params-with-retain-dead-tuples"><literal>retain_dead_tuples</literal></link>
+       are enabled. If a tuple cannot be found because the table was
+       truncated, only an <literal>update_missing</literal> conflict will
+       arise. Additionally, if the tuple was deleted by the same origin, an
+       <literal>update_missing</literal> conflict will arise.
+      </para>
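+      <para>
+       For example, assuming a subscription named <literal>mysub</literal>
+       (the subscription name, connection string, and publication name below
+       are placeholders), the prerequisites for detecting this conflict could
+       be configured on the subscriber as follows:
+<programlisting>
+-- requires a server restart to take effect
+ALTER SYSTEM SET track_commit_timestamp = on;
+
+CREATE SUBSCRIPTION mysub
+    CONNECTION 'host=publisher dbname=postgres'
+    PUBLICATION mypub
+    WITH (retain_dead_tuples = true);
+
+-- occurrences of this conflict are counted in
+-- pg_stat_subscription_stats.confl_update_deleted
+SELECT subname, confl_update_deleted FROM pg_stat_subscription_stats;
+</programlisting>
+      </para>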
+     </listitem>
+    </varlistentry>
     <varlistentry id="conflict-update-missing" xreflabel="update_missing">
      <term><literal>update_missing</literal></term>
      <listitem>
 
       </para></entry>
      </row>
 
+     <row>
+      <entry role="catalog_table_entry"><para role="column_definition">
+       <structfield>confl_update_deleted</structfield> <type>bigint</type>
+      </para>
+      <para>
+       Number of times the tuple to be updated was concurrently deleted by
+       another source during the application of changes. See <xref linkend="conflict-update-deleted"/>
+       for details about this conflict.
+      </para></entry>
+     </row>
+
      <row>
       <entry role="catalog_table_entry"><para role="column_definition">
        <structfield>confl_update_missing</structfield> <type>bigint</type>
 
           Specifies whether the information (e.g., dead tuples, commit
           timestamps, and origins) required for conflict detection on the
           subscriber is retained. The default is <literal>false</literal>.
-          If set to <literal>true</literal>, a physical replication slot named
-          <quote><literal>pg_conflict_detection</literal></quote> will be
-          created on the subscriber to prevent the conflict information from
-          being removed.
+          If set to <literal>true</literal>, the detection of
+          <xref linkend="conflict-update-deleted"/> is enabled, and a physical
+          replication slot named <quote><literal>pg_conflict_detection</literal></quote>
+          will be created on the subscriber to prevent the information needed
+          for conflict detection from being removed.
          </para>
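+         <para>
+          For example, once the subscription has been created with this
+          option enabled, the presence of the slot can be verified on the
+          subscriber with a query such as:
+<programlisting>
+SELECT slot_name, slot_type, xmin
+FROM pg_replication_slots
+WHERE slot_name = 'pg_conflict_detection';
+</programlisting>
+         </para>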
 
          <para>
 
         ss.confl_insert_exists,
         ss.confl_update_origin_differs,
         ss.confl_update_exists,
+        ss.confl_update_deleted,
         ss.confl_update_missing,
         ss.confl_delete_origin_differs,
         ss.confl_delete_missing,
 
 
 #include "postgres.h"
 
+#include "access/commit_ts.h"
 #include "access/genam.h"
 #include "access/gist.h"
 #include "access/relscan.h"
 #include "access/tableam.h"
 #include "access/transam.h"
 #include "access/xact.h"
+#include "access/heapam.h"
 #include "catalog/pg_am_d.h"
 #include "commands/trigger.h"
 #include "executor/executor.h"
 
 
 static bool tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
-                        TypeCacheEntry **eq);
+                        TypeCacheEntry **eq, Bitmapset *columns);
 
 /*
  * Setup a ScanKey for a search in the relation 'rel' for a tuple 'key' that
            if (eq == NULL)
                eq = palloc0(sizeof(*eq) * outslot->tts_tupleDescriptor->natts);
 
-           if (!tuples_equal(outslot, searchslot, eq))
+           if (!tuples_equal(outslot, searchslot, eq, NULL))
                continue;
        }
 
 
 /*
  * Compare the tuples in the slots by checking if they have equal values.
+ *
+ * If 'columns' is not null, only the columns specified within it will be
+ * considered for the equality check, ignoring all other columns.
  */
 static bool
 tuples_equal(TupleTableSlot *slot1, TupleTableSlot *slot2,
-            TypeCacheEntry **eq)
+            TypeCacheEntry **eq, Bitmapset *columns)
 {
    int         attrnum;
 
        if (att->attisdropped || att->attgenerated)
            continue;
 
+       /*
+        * Ignore columns that are not listed for checking.
+        */
+       if (columns &&
+           !bms_is_member(att->attnum - FirstLowInvalidHeapAttributeNumber,
+                          columns))
+           continue;
+
        /*
         * If one value is NULL and other is not, then they are certainly not
         * equal
    /* Try to find the tuple */
    while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
    {
-       if (!tuples_equal(scanslot, searchslot, eq))
+       if (!tuples_equal(scanslot, searchslot, eq, NULL))
            continue;
 
        found = true;
    }
 }
 
+/*
+ * If the tuple is recently dead and was deleted by a transaction with a newer
+ * commit timestamp than previously recorded, update the associated transaction
+ * ID, commit time, and origin. This helps ensure that conflict detection uses
+ * the most recent and relevant deletion metadata.
+ */
+static void
+update_most_recent_deletion_info(TupleTableSlot *scanslot,
+                                TransactionId oldestxmin,
+                                TransactionId *delete_xid,
+                                TimestampTz *delete_time,
+                                RepOriginId *delete_origin)
+{
+   BufferHeapTupleTableSlot *hslot;
+   HeapTuple   tuple;
+   Buffer      buf;
+   bool        recently_dead = false;
+   TransactionId xmax;
+   TimestampTz localts;
+   RepOriginId localorigin;
+
+   hslot = (BufferHeapTupleTableSlot *) scanslot;
+
+   tuple = ExecFetchSlotHeapTuple(scanslot, false, NULL);
+   buf = hslot->buffer;
+
+   LockBuffer(buf, BUFFER_LOCK_SHARE);
+
+   /*
+    * We do not consider HEAPTUPLE_DEAD status because it indicates either
+    * tuples whose inserting transaction was aborted (meaning there is no
+    * commit timestamp or origin), or tuples deleted by a transaction older
+    * than oldestxmin, making it safe to ignore them during conflict
+    * detection (See comments atop worker.c for details).
+    */
+   if (HeapTupleSatisfiesVacuum(tuple, oldestxmin, buf) == HEAPTUPLE_RECENTLY_DEAD)
+       recently_dead = true;
+
+   LockBuffer(buf, BUFFER_LOCK_UNLOCK);
+
+   if (!recently_dead)
+       return;
+
+   xmax = HeapTupleHeaderGetUpdateXid(tuple->t_data);
+   if (!TransactionIdIsValid(xmax))
+       return;
+
+   /* Select the dead tuple with the most recent commit timestamp */
+   if (TransactionIdGetCommitTsData(xmax, &localts, &localorigin) &&
+       TimestampDifferenceExceeds(*delete_time, localts, 0))
+   {
+       *delete_xid = xmax;
+       *delete_time = localts;
+       *delete_origin = localorigin;
+   }
+}
+
+/*
+ * Searches the relation 'rel' for the most recently deleted tuple that matches
+ * the values in 'searchslot' and is not yet removable by VACUUM. The function
+ * returns the transaction ID, origin, and commit timestamp of the transaction
+ * that deleted this tuple.
+ *
+ * 'oldestxmin' acts as a cutoff transaction ID. Tuples deleted by transactions
+ * with IDs >= 'oldestxmin' are considered recently dead and are eligible for
+ * conflict detection.
+ *
+ * Instead of stopping at the first match, we scan all matching dead tuples to
+ * identify the most recent deletion. This is crucial because only the latest
+ * deletion is relevant for resolving conflicts.
+ *
+ * For example, consider a scenario where a row is deleted, re-inserted, and
+ * then deleted again, all on the subscriber:
+ *
+ *   - (pk, 1) - deleted at 9:00,
+ *   - (pk, 1) - deleted at 9:02,
+ *
+ * Now, a remote update arrives: (pk, 1) -> (pk, 2), timestamped at 9:01.
+ *
+ * If we mistakenly return the older deletion (9:00), the system may wrongly
+ * apply the remote update using a last-update-wins strategy. Instead, we must
+ * recognize the more recent deletion at 9:02 and skip the update. See
+ * comments atop worker.c for details. Note, as of now, conflict resolution
+ * is not implemented. Consequently, the system may incorrectly report the
+ * older tuple as the conflicted one, leading to misleading results.
+ *
+ * The commit timestamp of the deleting transaction is used to determine which
+ * tuple was deleted most recently.
+ */
+bool
+RelationFindDeletedTupleInfoSeq(Relation rel, TupleTableSlot *searchslot,
+                               TransactionId oldestxmin,
+                               TransactionId *delete_xid,
+                               RepOriginId *delete_origin,
+                               TimestampTz *delete_time)
+{
+   TupleTableSlot *scanslot;
+   TableScanDesc scan;
+   TypeCacheEntry **eq;
+   Bitmapset  *indexbitmap;
+   TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
+
+   Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
+
+   *delete_xid = InvalidTransactionId;
+   *delete_origin = InvalidRepOriginId;
+   *delete_time = 0;
+
+   /*
+    * If the relation has a replica identity key or a primary key that is
+    * unusable for locating deleted tuples (see
+    * IsIndexUsableForFindingDeletedTuple), a full table scan becomes
+    * necessary. In such cases, comparing the entire tuple is not required,
+    * since the remote tuple might not include all column values. Instead,
+    * the indexed columns alone are sufficient to identify the target tuple
+    * (see logicalrep_rel_mark_updatable).
+    */
+   indexbitmap = RelationGetIndexAttrBitmap(rel,
+                                            INDEX_ATTR_BITMAP_IDENTITY_KEY);
+
+   /* fallback to PK if no replica identity */
+   if (!indexbitmap)
+       indexbitmap = RelationGetIndexAttrBitmap(rel,
+                                                INDEX_ATTR_BITMAP_PRIMARY_KEY);
+
+   eq = palloc0(sizeof(*eq) * searchslot->tts_tupleDescriptor->natts);
+
+   /*
+    * Start a heap scan using SnapshotAny to identify dead tuples that are
+    * not visible under a standard MVCC snapshot. Tuples from transactions
+    * not yet committed or those just committed prior to the scan are
+    * excluded in update_most_recent_deletion_info().
+    */
+   scan = table_beginscan(rel, SnapshotAny, 0, NULL);
+   scanslot = table_slot_create(rel, NULL);
+
+   table_rescan(scan, NULL);
+
+   /* Try to find the tuple */
+   while (table_scan_getnextslot(scan, ForwardScanDirection, scanslot))
+   {
+       if (!tuples_equal(scanslot, searchslot, eq, indexbitmap))
+           continue;
+
+       update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
+                                        delete_time, delete_origin);
+   }
+
+   table_endscan(scan);
+   ExecDropSingleTupleTableSlot(scanslot);
+
+   return *delete_time != 0;
+}
+
+/*
+ * Similar to RelationFindDeletedTupleInfoSeq() but uses an index scan to
+ * locate the deleted tuple.
+ */
+bool
+RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
+                                   TupleTableSlot *searchslot,
+                                   TransactionId oldestxmin,
+                                   TransactionId *delete_xid,
+                                   RepOriginId *delete_origin,
+                                   TimestampTz *delete_time)
+{
+   Relation    idxrel;
+   ScanKeyData skey[INDEX_MAX_KEYS];
+   int         skey_attoff;
+   IndexScanDesc scan;
+   TupleTableSlot *scanslot;
+   TypeCacheEntry **eq = NULL;
+   bool        isIdxSafeToSkipDuplicates;
+   TupleDesc   desc PG_USED_FOR_ASSERTS_ONLY = RelationGetDescr(rel);
+
+   Assert(equalTupleDescs(desc, searchslot->tts_tupleDescriptor));
+   Assert(OidIsValid(idxoid));
+
+   *delete_xid = InvalidTransactionId;
+   *delete_time = 0;
+   *delete_origin = InvalidRepOriginId;
+
+   isIdxSafeToSkipDuplicates = (GetRelationIdentityOrPK(rel) == idxoid);
+
+   scanslot = table_slot_create(rel, NULL);
+
+   idxrel = index_open(idxoid, RowExclusiveLock);
+
+   /* Build scan key. */
+   skey_attoff = build_replindex_scan_key(skey, rel, idxrel, searchslot);
+
+   /*
+    * Start an index scan using SnapshotAny to identify dead tuples that are
+    * not visible under a standard MVCC snapshot. Tuples from transactions
+    * not yet committed or those just committed prior to the scan are
+    * excluded in update_most_recent_deletion_info().
+    */
+   scan = index_beginscan(rel, idxrel, SnapshotAny, NULL, skey_attoff, 0);
+
+   index_rescan(scan, skey, skey_attoff, NULL, 0);
+
+   /* Try to find the tuple */
+   while (index_getnext_slot(scan, ForwardScanDirection, scanslot))
+   {
+       /*
+        * Avoid expensive equality check if the index is primary key or
+        * replica identity index.
+        */
+       if (!isIdxSafeToSkipDuplicates)
+       {
+           if (eq == NULL)
+               eq = palloc0(sizeof(*eq) * scanslot->tts_tupleDescriptor->natts);
+
+           if (!tuples_equal(scanslot, searchslot, eq, NULL))
+               continue;
+       }
+
+       update_most_recent_deletion_info(scanslot, oldestxmin, delete_xid,
+                                        delete_time, delete_origin);
+   }
+
+   index_endscan(scan);
+
+   index_close(idxrel, NoLock);
+
+   ExecDropSingleTupleTableSlot(scanslot);
+
+   return *delete_time != 0;
+}
+
 /*
  * Find the tuple that violates the passed unique index (conflictindex).
  *
 
    [CT_UPDATE_EXISTS] = "update_exists",
    [CT_UPDATE_MISSING] = "update_missing",
    [CT_DELETE_ORIGIN_DIFFERS] = "delete_origin_differs",
+   [CT_UPDATE_DELETED] = "update_deleted",
    [CT_DELETE_MISSING] = "delete_missing",
    [CT_MULTIPLE_UNIQUE_CONFLICTS] = "multiple_unique_conflicts"
 };
        case CT_UPDATE_ORIGIN_DIFFERS:
+       case CT_UPDATE_DELETED:
        case CT_UPDATE_MISSING:
        case CT_DELETE_ORIGIN_DIFFERS:
        case CT_DELETE_MISSING:
            return errcode(ERRCODE_T_R_SERIALIZATION_FAILURE);
    }
 
            break;
 
+       case CT_UPDATE_DELETED:
+           if (localts)
+           {
+               if (localorigin == InvalidRepOriginId)
+                   appendStringInfo(&err_detail, _("The row to be updated was deleted locally in transaction %u at %s."),
+                                    localxmin, timestamptz_to_str(localts));
+               else if (replorigin_by_oid(localorigin, true, &origin_name))
+                   appendStringInfo(&err_detail, _("The row to be updated was deleted by a different origin \"%s\" in transaction %u at %s."),
+                                    origin_name, localxmin, timestamptz_to_str(localts));
+
+               /* The origin that modified this row has been removed. */
+               else
+                   appendStringInfo(&err_detail, _("The row to be updated was deleted by a non-existent origin in transaction %u at %s."),
+                                    localxmin, timestamptz_to_str(localts));
+           }
+           else
+               appendStringInfo(&err_detail, _("The row to be updated was deleted."));
+
+           break;
+
        case CT_UPDATE_MISSING:
            appendStringInfoString(&err_detail, _("Could not find the row to be updated."));
            break;
 
  * Each apply worker that enabled retain_dead_tuples option maintains a
  * non-removable transaction ID (oldest_nonremovable_xid) in shared memory to
  * prevent dead rows from being removed prematurely when the apply worker still
- * needs them to detect conflicts reliably. This helps to retain the required
- * commit_ts module information, which further helps to detect
- * update_origin_differs and delete_origin_differs conflicts reliably, as
+ * needs them to detect update_deleted conflicts. Additionally, this helps to
+ * retain the required commit_ts module information, which further helps to
+ * detect update_origin_differs and delete_origin_differs conflicts reliably, as
  * otherwise, vacuum freeze could remove the required information.
  *
  * The logical replication launcher manages an internal replication slot named
  * transactions that occurred concurrently with the tuple DELETE, any
  * subsequent UPDATE from a remote node should have a later timestamp. In such
  * cases, it is acceptable to detect an update_missing scenario and convert the
- * UPDATE to an INSERT when applying it. But, detecting concurrent remote
- * transactions with earlier timestamps than the DELETE is necessary, as the
- * UPDATEs in remote transactions should be ignored if their timestamp is
- * earlier than that of the dead tuples.
+ * UPDATE to an INSERT when applying it. But, for concurrent remote
+ * transactions with earlier timestamps than the DELETE, detecting
+ * update_deleted is necessary, as the UPDATEs in remote transactions should be
+ * ignored if their timestamp is earlier than that of the dead tuples.
  *
  * Note that advancing the non-removable transaction ID is not supported if the
  * publisher is also a physical standby. This is because the logical walsender
                                    Oid localidxoid,
                                    TupleTableSlot *remoteslot,
                                    TupleTableSlot **localslot);
+static bool FindDeletedTupleInLocalRel(Relation localrel,
+                                      Oid localidxoid,
+                                      TupleTableSlot *remoteslot,
+                                      TransactionId *delete_xid,
+                                      RepOriginId *delete_origin,
+                                      TimestampTz *delete_time);
 static void apply_handle_tuple_routing(ApplyExecutionData *edata,
                                       TupleTableSlot *remoteslot,
                                       LogicalRepTupleData *newtup,
    }
    else
    {
+       ConflictType type;
        TupleTableSlot *newslot = localslot;
 
+       /*
+        * Detecting whether the tuple was recently deleted or never existed
+        * is crucial to avoid misleading the user during conflict handling.
+        */
+       if (FindDeletedTupleInLocalRel(localrel, localindexoid, remoteslot,
+                                      &conflicttuple.xmin,
+                                      &conflicttuple.origin,
+                                      &conflicttuple.ts) &&
+           conflicttuple.origin != replorigin_session_origin)
+           type = CT_UPDATE_DELETED;
+       else
+           type = CT_UPDATE_MISSING;
+
        /* Store the new tuple for conflict reporting */
        slot_store_data(newslot, relmapentry, newtup);
 
        /*
-        * The tuple to be updated could not be found.  Do nothing except for
-        * emitting a log message.
+        * The tuple to be updated could not be found or was deleted.  Do
+        * nothing except for emitting a log message.
         */
-       ReportApplyConflict(estate, relinfo, LOG, CT_UPDATE_MISSING,
-                           remoteslot, newslot, list_make1(&conflicttuple));
+       ReportApplyConflict(estate, relinfo, LOG, type, remoteslot, newslot,
+                           list_make1(&conflicttuple));
    }
 
    /* Cleanup. */
    return found;
 }
 
+/*
+ * Determine whether the index can reliably locate the deleted tuple in the
+ * local relation.
+ *
+ * An index may exclude deleted tuples if it was re-indexed or re-created during
+ * change application. Therefore, an index is considered usable only if the
+ * conflict detection slot.xmin (conflict_detection_xmin) is greater than the
+ * index tuple's xmin. This ensures that any tuples deleted prior to the index
+ * creation or re-indexing are not relevant for conflict detection in the
+ * current apply worker.
+ *
+ * Note that indexes may also be excluded if they were modified by other DDL
+ * operations, such as ALTER INDEX. However, this is acceptable, as the
+ * likelihood of such DDL changes coinciding with the need to scan dead
+ * tuples for update_deleted detection is low.
+ */
+static bool
+IsIndexUsableForFindingDeletedTuple(Oid localindexoid,
+                                   TransactionId conflict_detection_xmin)
+{
+   HeapTuple   index_tuple;
+   TransactionId index_xmin;
+
+   index_tuple = SearchSysCache1(INDEXRELID, ObjectIdGetDatum(localindexoid));
+
+   if (!HeapTupleIsValid(index_tuple)) /* should not happen */
+       elog(ERROR, "cache lookup failed for index %u", localindexoid);
+
+   /*
+    * No need to check for a frozen transaction ID, as
+    * TransactionIdPrecedes() manages it internally, treating it as falling
+    * behind the conflict_detection_xmin.
+    */
+   index_xmin = HeapTupleHeaderGetXmin(index_tuple->t_data);
+
+   ReleaseSysCache(index_tuple);
+
+   return TransactionIdPrecedes(index_xmin, conflict_detection_xmin);
+}
+
+/*
+ * Attempts to locate a deleted tuple in the local relation that matches the
+ * values of the tuple received from the publication side (in 'remoteslot').
+ * The search is performed using the replica identity index, the primary key,
+ * another usable index, or a sequential scan if necessary.
+ *
+ * Returns true if the deleted tuple is found. If found, the transaction ID,
+ * origin, and commit timestamp of the deletion are stored in '*delete_xid',
+ * '*delete_origin', and '*delete_time' respectively.
+ */
+static bool
+FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
+                          TupleTableSlot *remoteslot,
+                          TransactionId *delete_xid, RepOriginId *delete_origin,
+                          TimestampTz *delete_time)
+{
+   TransactionId oldestxmin;
+   ReplicationSlot *slot;
+
+   /*
+    * Return false if either dead tuples are not retained or commit timestamp
+    * data is not available.
+    */
+   if (!MySubscription->retaindeadtuples || !track_commit_timestamp)
+       return false;
+
+   /*
+    * For conflict detection, we use the conflict slot's xmin value instead
+    * of invoking GetOldestNonRemovableTransactionId(). The slot.xmin acts as
+    * a threshold to identify tuples that were recently deleted. These tuples
+    * are not visible to concurrent transactions, but we log an
+    * update_deleted conflict if such a tuple matches the remote update being
+    * applied.
+    *
+    * Although GetOldestNonRemovableTransactionId() can return a value older
+    * than the slot's xmin, for our current purpose it is acceptable to treat
+    * tuples deleted by transactions prior to slot.xmin as update_missing
+    * conflicts.
+    *
+    * Ideally, we would use oldest_nonremovable_xid, which is directly
+    * maintained by the leader apply worker. However, this value is not
+    * available to table synchronization or parallel apply workers, making
+    * slot.xmin a practical alternative in those contexts.
+    */
+   slot = SearchNamedReplicationSlot(CONFLICT_DETECTION_SLOT, true);
+
+   Assert(slot);
+
+   SpinLockAcquire(&slot->mutex);
+   oldestxmin = slot->data.xmin;
+   SpinLockRelease(&slot->mutex);
+
+   Assert(TransactionIdIsValid(oldestxmin));
+
+   if (OidIsValid(localidxoid) &&
+       IsIndexUsableForFindingDeletedTuple(localidxoid, oldestxmin))
+       return RelationFindDeletedTupleInfoByIndex(localrel, localidxoid,
+                                                  remoteslot, oldestxmin,
+                                                  delete_xid, delete_origin,
+                                                  delete_time);
+   else
+       return RelationFindDeletedTupleInfoSeq(localrel, remoteslot,
+                                              oldestxmin, delete_xid,
+                                              delete_origin, delete_time);
+}
+
 /*
  * This handles insert, update, delete on a partitioned table.
  */
                                                remoteslot_part, &localslot);
                if (!found)
                {
+                   ConflictType type;
                    TupleTableSlot *newslot = localslot;
 
+                   /*
+                    * Detecting whether the tuple was recently deleted or
+                    * never existed is crucial to avoid misleading the user
+                    * during conflict handling.
+                    */
+                   if (FindDeletedTupleInLocalRel(partrel,
+                                                  part_entry->localindexoid,
+                                                  remoteslot_part,
+                                                  &conflicttuple.xmin,
+                                                  &conflicttuple.origin,
+                                                  &conflicttuple.ts) &&
+                       conflicttuple.origin != replorigin_session_origin)
+                       type = CT_UPDATE_DELETED;
+                   else
+                       type = CT_UPDATE_MISSING;
+
                    /* Store the new tuple for conflict reporting */
                    slot_store_data(newslot, part_entry, newtup);
 
                    /*
-                    * The tuple to be updated could not be found.  Do nothing
-                    * except for emitting a log message.
+                    * The tuple to be updated could not be found or was
+                    * deleted.  Do nothing except for emitting a log message.
                     */
                    ReportApplyConflict(estate, partrelinfo, LOG,
-                                       CT_UPDATE_MISSING, remoteslot_part,
-                                       newslot, list_make1(&conflicttuple));
+                                       type, remoteslot_part, newslot,
+                                       list_make1(&conflicttuple));
 
                    return;
                }
 {
    /*
     * It is sufficient to manage non-removable transaction ID for a
-    * subscription by the main apply worker to detect conflicts reliably even
-    * for table sync or parallel apply workers.
+    * subscription by the main apply worker to detect update_deleted reliably
+    * even for table sync or parallel apply workers.
     */
    if (!am_leader_apply_worker())
        return false;
     * We expect the publisher and subscriber clocks to be in sync using time
     * sync service like NTP. Otherwise, we will advance this worker's
     * oldest_nonremovable_xid prematurely, leading to the removal of rows
-    * required to detect conflicts reliably. This check primarily addresses
-    * scenarios where the publisher's clock falls behind; if the publisher's
-    * clock is ahead, subsequent transactions will naturally bear later
-    * commit timestamps, conforming to the design outlined atop worker.c.
+    * required to detect update_deleted reliably. This check primarily
+    * addresses scenarios where the publisher's clock falls behind; if the
+    * publisher's clock is ahead, subsequent transactions will naturally bear
+    * later commit timestamps, conforming to the design outlined atop
+    * worker.c.
     *
     * XXX Consider waiting for the publisher's clock to catch up with the
     * subscriber's before proceeding to the next phase.
 
 Datum
 pg_stat_get_subscription_stats(PG_FUNCTION_ARGS)
 {
-#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS    11
+#define PG_STAT_GET_SUBSCRIPTION_STATS_COLS    12
    Oid         subid = PG_GETARG_OID(0);
    TupleDesc   tupdesc;
    Datum       values[PG_STAT_GET_SUBSCRIPTION_STATS_COLS] = {0};
                       INT8OID, -1, 0);
    TupleDescInitEntry(tupdesc, (AttrNumber) 6, "confl_update_exists",
                       INT8OID, -1, 0);
-   TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_missing",
+   TupleDescInitEntry(tupdesc, (AttrNumber) 7, "confl_update_deleted",
                       INT8OID, -1, 0);
-   TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_delete_origin_differs",
+   TupleDescInitEntry(tupdesc, (AttrNumber) 8, "confl_update_missing",
                       INT8OID, -1, 0);
-   TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_missing",
+   TupleDescInitEntry(tupdesc, (AttrNumber) 9, "confl_delete_origin_differs",
                       INT8OID, -1, 0);
-   TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_multiple_unique_conflicts",
+   TupleDescInitEntry(tupdesc, (AttrNumber) 10, "confl_delete_missing",
                       INT8OID, -1, 0);
-   TupleDescInitEntry(tupdesc, (AttrNumber) 11, "stats_reset",
+   TupleDescInitEntry(tupdesc, (AttrNumber) 11, "confl_multiple_unique_conflicts",
+                      INT8OID, -1, 0);
+   TupleDescInitEntry(tupdesc, (AttrNumber) 12, "stats_reset",
                       TIMESTAMPTZOID, -1, 0);
    BlessTupleDesc(tupdesc);
 
 
  */
 
 /*                         yyyymmddN */
-#define CATALOG_VERSION_NO 202507231
+#define CATALOG_VERSION_NO 202508041
 
 #endif
 
 { oid => '6231', descr => 'statistics: information about subscription stats',
   proname => 'pg_stat_get_subscription_stats', provolatile => 's',
   proparallel => 'r', prorettype => 'record', proargtypes => 'oid',
-  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
-  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o}',
-  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
+  proallargtypes => '{oid,oid,int8,int8,int8,int8,int8,int8,int8,int8,int8,int8,timestamptz}',
+  proargmodes => '{i,o,o,o,o,o,o,o,o,o,o,o,o}',
+  proargnames => '{subid,subid,apply_error_count,sync_error_count,confl_insert_exists,confl_update_origin_differs,confl_update_exists,confl_update_deleted,confl_update_missing,confl_delete_origin_differs,confl_delete_missing,confl_multiple_unique_conflicts,stats_reset}',
   prosrc => 'pg_stat_get_subscription_stats' },
 { oid => '6118', descr => 'statistics: information about subscription',
   proname => 'pg_stat_get_subscription', prorows => '10', proisstrict => 'f',
 
 #ifndef EXECUTOR_H
 #define EXECUTOR_H
 
+#include "datatype/timestamp.h"
 #include "executor/execdesc.h"
 #include "fmgr.h"
 #include "nodes/lockoptions.h"
                                         TupleTableSlot *outslot);
 extern bool RelationFindReplTupleSeq(Relation rel, LockTupleMode lockmode,
                                     TupleTableSlot *searchslot, TupleTableSlot *outslot);
-
+extern bool RelationFindDeletedTupleInfoSeq(Relation rel,
+                                           TupleTableSlot *searchslot,
+                                           TransactionId oldestxmin,
+                                           TransactionId *delete_xid,
+                                           RepOriginId *delete_origin,
+                                           TimestampTz *delete_time);
+extern bool RelationFindDeletedTupleInfoByIndex(Relation rel, Oid idxoid,
+                                               TupleTableSlot *searchslot,
+                                               TransactionId oldestxmin,
+                                               TransactionId *delete_xid,
+                                               RepOriginId *delete_origin,
+                                               TimestampTz *delete_time);
 extern void ExecSimpleRelationInsert(ResultRelInfo *resultRelInfo,
                                     EState *estate, TupleTableSlot *slot);
 extern void ExecSimpleRelationUpdate(ResultRelInfo *resultRelInfo,
 
    /* The updated row value violates unique constraint */
    CT_UPDATE_EXISTS,
 
+   /* The row to be updated was concurrently deleted by a different origin */
+   CT_UPDATE_DELETED,
+
    /* The row to be updated is missing */
    CT_UPDATE_MISSING,
 
 
    bool        parallel_apply;
 
    /*
-    * The changes made by this and later transactions must be retained to
-    * ensure reliable conflict detection during the apply phase.
+    * Changes made by this transaction and subsequent ones must be preserved.
+    * This ensures that update_deleted conflicts can be accurately detected
+    * during the apply phase of logical replication by this worker.
     *
     * The logical replication launcher manages an internal replication slot
     * named "pg_conflict_detection". It asynchronously collects this ID to
 
     ss.confl_insert_exists,
     ss.confl_update_origin_differs,
     ss.confl_update_exists,
+    ss.confl_update_deleted,
     ss.confl_update_missing,
     ss.confl_delete_origin_differs,
     ss.confl_delete_missing,
     ss.confl_multiple_unique_conflicts,
     ss.stats_reset
    FROM pg_subscription s,
-    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
+    LATERAL pg_stat_get_subscription_stats(s.oid) ss(subid, apply_error_count, sync_error_count, confl_insert_exists, confl_update_origin_differs, confl_update_exists, confl_update_deleted, confl_update_missing, confl_delete_origin_differs, confl_delete_missing, confl_multiple_unique_conflicts, stats_reset);
 pg_stat_sys_indexes| SELECT relid,
     indexrelid,
     schemaname,
 
 # Setup a bidirectional logical replication between node_A & node_B
 ###############################################################################
 
-# Initialize nodes.
+# Initialize nodes. Enable track_commit_timestamp on both nodes to detect
+# conflicts when attempting to update a row that was previously modified by a
+# different origin.
 
 # node_A. Increase the log_min_messages setting to DEBUG2 to debug test
 # failures. Disable autovacuum to avoid generating xid that could affect the
 my $node_A = $node_publisher;
 $node_A->append_conf(
    'postgresql.conf',
-   qq{autovacuum = off
+   qq{track_commit_timestamp = on
+   autovacuum = off
    log_min_messages = 'debug2'});
 $node_A->restart;
 
 ###############################################################################
 # Check that dead tuples on node A cannot be cleaned by VACUUM until the
 # concurrent transactions on Node B have been applied and flushed on Node A.
+# Also, check that an update_deleted conflict is detected when updating a row
+# that was deleted by a different origin.
 ###############################################################################
 
 # Insert a record
    "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
 );
 
+my $log_location = -s $node_B->logfile;
+
 $node_B->safe_psql('postgres', "UPDATE tab SET b = 3 WHERE a = 1;");
 $node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 1;");
 
      qr/1 are dead but not yet removable/,
    'the deleted column is non-removable');
 
+# Ensure the DELETE is replayed on Node B
+$node_A->wait_for_catchup($subname_BA);
+
+# Check the conflict detected on Node B
+my $logfile = slurp_file($node_B->logfile(), $log_location);
+ok( $logfile =~
+     qr/conflict detected on relation "public.tab": conflict=delete_origin_differs.*
+.*DETAIL:.* Deleting the row that was modified locally in transaction [0-9]+ at .*
+.*Existing local tuple \(1, 3\); replica identity \(a\)=\(1\)/,
+   'delete target row was modified in tab');
+
+$log_location = -s $node_A->logfile;
+
 $node_A->safe_psql(
    'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
 $node_B->wait_for_catchup($subname_AB);
 
+$logfile = slurp_file($node_A->logfile(), $log_location);
+ok( $logfile =~
+     qr/conflict detected on relation "public.tab": conflict=update_deleted.*
+.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*
+.*Remote tuple \(1, 3\); replica identity \(a\)=\(1\)/,
+   'update target row was deleted in tab');
+
 # Remember the next transaction ID to be assigned
 my $next_xid = $node_A->safe_psql('postgres', "SELECT txid_current() + 1;");
 
      qr/1 removed, 1 remain, 0 are dead but not yet removable/,
    'the deleted column is removed');
 
+###############################################################################
+# Ensure that the deleted tuple needed to detect an update_deleted conflict is
+# accessible via a sequential table scan.
+###############################################################################
+
+# Drop the primary key from tab on node A and set REPLICA IDENTITY to FULL to
+# enforce sequential scanning of the table.
+$node_A->safe_psql('postgres', "ALTER TABLE tab REPLICA IDENTITY FULL");
+$node_B->safe_psql('postgres', "ALTER TABLE tab REPLICA IDENTITY FULL");
+$node_A->safe_psql('postgres', "ALTER TABLE tab DROP CONSTRAINT tab_pkey;");
+
+# Disable the logical replication from node B to node A
+$node_A->safe_psql('postgres', "ALTER SUBSCRIPTION $subname_AB DISABLE");
+
+# Wait for the apply worker to stop
+$node_A->poll_query_until('postgres',
+   "SELECT count(*) = 0 FROM pg_stat_activity WHERE backend_type = 'logical replication apply worker'"
+);
+
+$node_B->safe_psql('postgres', "UPDATE tab SET b = 4 WHERE a = 2;");
+$node_A->safe_psql('postgres', "DELETE FROM tab WHERE a = 2;");
+
+$log_location = -s $node_A->logfile;
+
+$node_A->safe_psql(
+   'postgres', "ALTER SUBSCRIPTION $subname_AB ENABLE;");
+$node_B->wait_for_catchup($subname_AB);
+
+$logfile = slurp_file($node_A->logfile(), $log_location);
+ok( $logfile =~
+     qr/conflict detected on relation "public.tab": conflict=update_deleted.*
+.*DETAIL:.* The row to be updated was deleted locally in transaction [0-9]+ at .*
+.*Remote tuple \(2, 4\); replica identity full \(2, 2\)/,
+   'update target row was deleted in tab');
+
 ###############################################################################
 # Check that the replication slot pg_conflict_detection is dropped after
 # removing all the subscriptions.