--- /dev/null
+
+# Copyright (c) 2021, PostgreSQL Global Development Group
+
+# Test CREATE INDEX CONCURRENTLY with concurrent prepared-xact modifications
+use strict;
+use warnings;
+
+use Config;
+use PostgresNode;
+use TestLib;
+
+use Test::More tests => 6;
+
+my ($node, $result);
+
+#
+# Test set-up
+#
+$node = get_new_node('CIC_2PC_test');
+$node->init;
+$node->append_conf('postgresql.conf', 'max_prepared_transactions = 10');
+$node->append_conf('postgresql.conf', 'lock_timeout = 180000');
+$node->start;
+$node->safe_psql('postgres', q(CREATE EXTENSION amcheck));
+$node->safe_psql('postgres', q(CREATE TABLE tbl(i int)));
+
+
+#
+# Run 3 overlapping 2PC transactions with CIC
+#
+# We have two concurrent background psql processes: $main_h for INSERTs and
+# $cic_h for CIC.  Also, we use non-background psql for some COMMIT PREPARED
+# statements.
+#
+
+my $main_in    = '';
+my $main_out   = '';
+my $main_timer = IPC::Run::timeout(180);
+
+my $main_h =
+  $node->background_psql('postgres', \$main_in, \$main_out,
+   $main_timer, on_error_stop => 1);
+$main_in .= q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint1
+);
+pump $main_h until $main_out =~ /syncpoint1/ || $main_timer->is_expired;
+
+my $cic_in    = '';
+my $cic_out   = '';
+my $cic_timer = IPC::Run::timeout(180);
+my $cic_h =
+  $node->background_psql('postgres', \$cic_in, \$cic_out,
+   $cic_timer, on_error_stop => 1);
+$cic_in .= q(
+\echo start
+CREATE INDEX CONCURRENTLY idx ON tbl(i);
+);
+pump $cic_h until $cic_out =~ /start/ || $cic_timer->is_expired;
+
+$main_in .= q(
+PREPARE TRANSACTION 'a';
+);
+
+$main_in .= q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint2
+);
+pump $main_h until $main_out =~ /syncpoint2/ || $main_timer->is_expired;
+
+$node->safe_psql('postgres', q(COMMIT PREPARED 'a';));
+
+$main_in .= q(
+PREPARE TRANSACTION 'b';
+BEGIN;
+INSERT INTO tbl VALUES(0);
+\echo syncpoint3
+);
+pump $main_h until $main_out =~ /syncpoint3/ || $main_timer->is_expired;
+
+$node->safe_psql('postgres', q(COMMIT PREPARED 'b';));
+
+$main_in .= q(
+PREPARE TRANSACTION 'c';
+COMMIT PREPARED 'c';
+);
+$main_h->pump_nb;
+
+$main_h->finish;
+$cic_h->finish;
+
+$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
+is($result, '0', 'bt_index_check after overlapping 2PC');
+
+
+#
+# Server restart shall not change whether prepared xact blocks CIC
+#
+
+$node->safe_psql(
+   'postgres', q(
+BEGIN;
+INSERT INTO tbl VALUES(0);
+PREPARE TRANSACTION 'spans_restart';
+BEGIN;
+CREATE TABLE unused ();
+PREPARE TRANSACTION 'persists_forever';
+));
+$node->restart;
+
+my $reindex_in    = '';
+my $reindex_out   = '';
+my $reindex_timer = IPC::Run::timeout(180);
+my $reindex_h =
+  $node->background_psql('postgres', \$reindex_in, \$reindex_out,
+   $reindex_timer, on_error_stop => 1);
+$reindex_in .= q(
+\echo start
+DROP INDEX CONCURRENTLY idx;
+CREATE INDEX CONCURRENTLY idx ON tbl(i);
+);
+pump $reindex_h until $reindex_out =~ /start/ || $reindex_timer->is_expired;
+
+$node->safe_psql('postgres', "COMMIT PREPARED 'spans_restart'");
+$reindex_h->finish;
+$result = $node->psql('postgres', q(SELECT bt_index_check('idx',true)));
+is($result, '0', 'bt_index_check after 2PC and restart');
+
+
+#
+# Stress CIC+2PC with pgbench
+#
+
+# Fix broken index first
+$node->safe_psql('postgres', q(REINDEX TABLE tbl;));
+
+# Run background pgbench with CIC. We cannot mix-in this script into single
+# pgbench: CIC will deadlock with itself occasionally.
+my $pgbench_out   = '';
+my $pgbench_timer = IPC::Run::timeout(180);
+my $pgbench_h     = $node->background_pgbench(
+   '--no-vacuum --client=1 --transactions=100',
+   {
+       '002_pgbench_concurrent_cic' => q(
+           DROP INDEX CONCURRENTLY idx;
+           CREATE INDEX CONCURRENTLY idx ON tbl(i);
+           SELECT bt_index_check('idx',true);
+          )
+   },
+   \$pgbench_out,
+   $pgbench_timer);
+
+# Run pgbench.
+$node->pgbench(
+   '--no-vacuum --client=5 --transactions=100',
+   0,
+   [qr{actually processed}],
+   [qr{^$}],
+   'concurrent INSERTs w/ 2PC',
+   {
+       '002_pgbench_concurrent_2pc' => q(
+           BEGIN;
+           INSERT INTO tbl VALUES(0);
+           PREPARE TRANSACTION 'c:client_id';
+           COMMIT PREPARED 'c:client_id';
+         ),
+       '002_pgbench_concurrent_2pc_savepoint' => q(
+           BEGIN;
+           SAVEPOINT s1;
+           INSERT INTO tbl VALUES(0);
+           PREPARE TRANSACTION 'c:client_id';
+           COMMIT PREPARED 'c:client_id';
+         )
+   });
+
+$pgbench_h->pump_nb;
+$pgbench_h->finish();
+$result =
+    ($Config{osname} eq "MSWin32")
+  ? ($pgbench_h->full_results)[0]
+  : $pgbench_h->result(0);
+is($result, 0, "pgbench with CIC works");
+
+# done
+$node->stop;
+done_testing();
 
    proc->pgprocno = gxact->pgprocno;
    SHMQueueElemInit(&(proc->links));
    proc->waitStatus = PROC_WAIT_STATUS_OK;
-   /* We set up the gxact's VXID as InvalidBackendId/XID */
-   proc->lxid = (LocalTransactionId) xid;
+   if (LocalTransactionIdIsValid(MyProc->lxid))
+   {
+       /* clone VXID, for TwoPhaseGetXidByVirtualXID() to find */
+       proc->lxid = MyProc->lxid;
+       proc->backendId = MyBackendId;
+   }
+   else
+   {
+       Assert(AmStartupProcess() || !IsPostmasterEnvironment);
+       /* GetLockConflicts() uses this to specify a wait on the XID */
+       proc->lxid = xid;
+       proc->backendId = InvalidBackendId;
+   }
    proc->xid = xid;
    Assert(proc->xmin == InvalidTransactionId);
    proc->delayChkpt = false;
    proc->statusFlags = 0;
    proc->pid = 0;
-   proc->backendId = InvalidBackendId;
    proc->databaseId = databaseid;
    proc->roleId = owner;
    proc->tempNamespaceId = InvalidOid;
    return result;
 }
 
+/*
+ * TwoPhaseGetXidByVirtualXID
+ *     Lookup VXID among xacts prepared since last startup.
+ *
+ * (This won't find recovered xacts.)  If more than one matches, return any
+ * and set "have_more" to true.  To witness multiple matches, a single
+ * BackendId must consume 2^32 LXIDs, with no intervening database restart.
+ */
+TransactionId
+TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
+                          bool *have_more)
+{
+   int         i;
+   TransactionId result = InvalidTransactionId;
+
+   Assert(VirtualTransactionIdIsValid(vxid));
+   LWLockAcquire(TwoPhaseStateLock, LW_SHARED);
+
+   for (i = 0; i < TwoPhaseState->numPrepXacts; i++)
+   {
+       GlobalTransaction gxact = TwoPhaseState->prepXacts[i];
+       PGPROC     *proc;
+       VirtualTransactionId proc_vxid;
+
+       if (!gxact->valid)
+           continue;
+       proc = &ProcGlobal->allProcs[gxact->pgprocno];
+       GET_VXID_FROM_PGPROC(proc_vxid, *proc);
+       if (VirtualTransactionIdEquals(vxid, proc_vxid))
+       {
+           /* Startup process sets proc->backendId to InvalidBackendId. */
+           Assert(!gxact->inredo);
+
+           if (result != InvalidTransactionId)
+           {
+               *have_more = true;
+               break;
+           }
+           result = gxact->xid;
+       }
+   }
+
+   LWLockRelease(TwoPhaseStateLock);
+
+   return result;
+}
+
 /*
  * TwoPhaseGetDummyBackendId
  *     Get the dummy backend ID for prepared transaction specified by XID
 
    /* Reset XactLastRecEnd until the next transaction writes something */
    XactLastRecEnd = 0;
 
+   /*
+    * Transfer our locks to a dummy PGPROC.  This has to be done before
+    * ProcArrayClearTransaction().  Otherwise, a GetLockConflicts() would
+    * conclude "xact already committed or aborted" for our locks.
+    */
+   PostPrepare_Locks(xid);
+
    /*
     * Let others know about no transaction in progress by me.  This has to be
     * done *after* the prepared transaction has been marked valid, else
 
    PostPrepare_MultiXact(xid);
 
-   PostPrepare_Locks(xid);
    PostPrepare_PredicateLocks(xid);
 
    ResourceOwnerRelease(TopTransactionResourceOwner,
 
  * To do this, obtain the current list of lockers, and wait on their VXIDs
  * until they are finished.
  *
- * Note we don't try to acquire the locks on the given locktags, only the VXIDs
- * of its lock holders; if somebody grabs a conflicting lock on the objects
- * after we obtained our initial list of lockers, we will not wait for them.
+ * Note we don't try to acquire the locks on the given locktags, only the
+ * VXIDs and XIDs of their lock holders; if somebody grabs a conflicting lock
+ * on the objects after we obtained our initial list of lockers, we will not
+ * wait for them.
  */
 void
 WaitForLockersMultiple(List *locktags, LOCKMODE lockmode, bool progress)
 
  * The result array is palloc'd and is terminated with an invalid VXID.
  * *countp, if not null, is updated to the number of items set.
  *
- * Of course, the result could be out of date by the time it's returned,
- * so use of this function has to be thought about carefully.
+ * Of course, the result could be out of date by the time it's returned, so
+ * use of this function has to be thought about carefully.  Similarly, a
+ * PGPROC with no "lxid" will be considered non-conflicting regardless of any
+ * lock it holds.  Existing callers don't care about a locker after that
+ * locker's pg_xact updates complete.  CommitTransaction() clears "lxid" after
+ * pg_xact updates and before releasing locks.
  *
  * Note we never include the current xact's vxid in the result array,
  * since an xact never blocks itself.
    }
 }
 
+/*
+ *     XactLockForVirtualXact
+ *
+ * If TransactionIdIsValid(xid), this is essentially XactLockTableWait(xid,
+ * NULL, NULL, XLTW_None) or ConditionalXactLockTableWait(xid).  Unlike those
+ * functions, it assumes "xid" is never a subtransaction and that "xid" is
+ * prepared, committed, or aborted.
+ *
+ * If !TransactionIdIsValid(xid), this locks every prepared XID having been
+ * known as "vxid" before its PREPARE TRANSACTION.
+ */
+static bool
+XactLockForVirtualXact(VirtualTransactionId vxid,
+                      TransactionId xid, bool wait)
+{
+   bool        more = false;
+
+   /* There is no point to wait for 2PCs if you have no 2PCs. */
+   if (max_prepared_xacts == 0)
+       return true;
+
+   do
+   {
+       LockAcquireResult lar;
+       LOCKTAG     tag;
+
+       /* Clear state from previous iterations. */
+       if (more)
+       {
+           xid = InvalidTransactionId;
+           more = false;
+       }
+
+       /* If we have no xid, try to find one. */
+       if (!TransactionIdIsValid(xid))
+           xid = TwoPhaseGetXidByVirtualXID(vxid, &more);
+       if (!TransactionIdIsValid(xid))
+       {
+           Assert(!more);
+           return true;
+       }
+
+       /* Check or wait for XID completion. */
+       SET_LOCKTAG_TRANSACTION(tag, xid);
+       lar = LockAcquire(&tag, ShareLock, false, !wait);
+       if (lar == LOCKACQUIRE_NOT_AVAIL)
+           return false;
+       LockRelease(&tag, ShareLock, false);
+   } while (more);
+
+   return true;
+}
+
 /*
  *     VirtualXactLock
  *
- * If wait = true, wait until the given VXID has been released, and then
- * return true.
+ * If wait = true, wait as long as the given VXID or any XID acquired by the
+ * same transaction is still running.  Then, return true.
  *
- * If wait = false, just check whether the VXID is still running, and return
- * true or false.
+ * If wait = false, just check whether that VXID or one of those XIDs is still
+ * running, and return true or false.
  */
 bool
 VirtualXactLock(VirtualTransactionId vxid, bool wait)
 {
    LOCKTAG     tag;
    PGPROC     *proc;
+   TransactionId xid = InvalidTransactionId;
 
    Assert(VirtualTransactionIdIsValid(vxid));
 
-   if (VirtualTransactionIdIsPreparedXact(vxid))
-   {
-       LockAcquireResult lar;
-
-       /*
-        * Prepared transactions don't hold vxid locks.  The
-        * LocalTransactionId is always a normal, locked XID.
-        */
-       SET_LOCKTAG_TRANSACTION(tag, vxid.localTransactionId);
-       lar = LockAcquire(&tag, ShareLock, false, !wait);
-       if (lar != LOCKACQUIRE_NOT_AVAIL)
-           LockRelease(&tag, ShareLock, false);
-       return lar != LOCKACQUIRE_NOT_AVAIL;
-   }
+   if (VirtualTransactionIdIsRecoveredPreparedXact(vxid))
+       /* no vxid lock; localTransactionId is a normal, locked XID */
+       return XactLockForVirtualXact(vxid, vxid.localTransactionId, wait);
 
    SET_LOCKTAG_VIRTUALTRANSACTION(tag, vxid);
 
     */
    proc = BackendIdGetProc(vxid.backendId);
    if (proc == NULL)
-       return true;
+       return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
 
    /*
     * We must acquire this lock before checking the backendId and lxid
     */
    LWLockAcquire(&proc->fpInfoLock, LW_EXCLUSIVE);
 
-   /* If the transaction has ended, our work here is done. */
    if (proc->backendId != vxid.backendId
        || proc->fpLocalTransactionId != vxid.localTransactionId)
    {
+       /* VXID ended */
        LWLockRelease(&proc->fpInfoLock);
-       return true;
+       return XactLockForVirtualXact(vxid, InvalidTransactionId, wait);
    }
 
    /*
        proc->fpVXIDLock = false;
    }
 
+   /*
+    * If the proc has an XID now, we'll avoid a TwoPhaseGetXidByVirtualXID()
+    * search.  The proc might have assigned this XID but not yet locked it,
+    * in which case the proc will lock this XID before releasing the VXID.
+    * The fpInfoLock critical section excludes VirtualXactLockTableCleanup(),
+    * so we won't save an XID of a different VXID.  It doesn't matter whether
+    * we save this before or after setting up the primary lock table entry.
+    */
+   xid = proc->xid;
+
    /* Done with proc->fpLockBits */
    LWLockRelease(&proc->fpInfoLock);
 
    (void) LockAcquire(&tag, ShareLock, false, false);
 
    LockRelease(&tag, ShareLock, false);
-   return true;
+   return XactLockForVirtualXact(vxid, xid, wait);
 }
 
 /*
 
  * (XXX is it worth testing likewise for duplicate catcache flush entries?
  * Probably not.)
  *
+ * Many subsystems own higher-level caches that depend on relcache and/or
+ * catcache, and they register callbacks here to invalidate their caches.
+ * While building a higher-level cache entry, a backend may receive a
+ * callback for the being-built entry or one of its dependencies.  This
+ * implies the new higher-level entry would be born stale, and it might
+ * remain stale for the life of the backend.  Many caches do not prevent
+ * that.  They rely on DDL for can't-miss catalog changes taking
+ * AccessExclusiveLock on suitable objects.  (For a change made with less
+ * locking, backends might never read the change.)  The relation cache,
+ * however, needs to reflect changes from CREATE INDEX CONCURRENTLY no later
+ * than the beginning of the next transaction.  Hence, when a relevant
+ * invalidation callback arrives during a build, relcache.c reattempts that
+ * build.  Caches with similar needs could do likewise.
+ *
  * If a relcache flush is issued for a system relation that we preload
  * from the relcache init file, we must also delete the init file so that
  * it will be rebuilt during the next backend restart.  The actual work of
 
 extern void AtAbort_Twophase(void);
 extern void PostPrepare_Twophase(void);
 
+extern TransactionId TwoPhaseGetXidByVirtualXID(VirtualTransactionId vxid,
+                                               bool *have_more);
 extern PGPROC *TwoPhaseGetDummyProc(TransactionId xid, bool lock_held);
 extern BackendId TwoPhaseGetDummyBackendId(TransactionId xid, bool lock_held);
 
 
 
 /*
  * Top-level transactions are identified by VirtualTransactionIDs comprising
- * PGPROC fields backendId and lxid.  For prepared transactions, the
- * LocalTransactionId is an ordinary XID.  These are guaranteed unique over
- * the short term, but will be reused after a database restart or XID
- * wraparound; hence they should never be stored on disk.
+ * PGPROC fields backendId and lxid.  For recovered prepared transactions, the
+ * LocalTransactionId is an ordinary XID; LOCKTAG_VIRTUALTRANSACTION never
+ * refers to that kind.  These are guaranteed unique over the short term, but
+ * will be reused after a database restart or XID wraparound; hence they
+ * should never be stored on disk.
  *
  * Note that struct VirtualTransactionId can not be assumed to be atomically
  * assignable as a whole.  However, type LocalTransactionId is assumed to
 #define LocalTransactionIdIsValid(lxid) ((lxid) != InvalidLocalTransactionId)
 #define VirtualTransactionIdIsValid(vxid) \
    (LocalTransactionIdIsValid((vxid).localTransactionId))
-#define VirtualTransactionIdIsPreparedXact(vxid) \
+#define VirtualTransactionIdIsRecoveredPreparedXact(vxid) \
    ((vxid).backendId == InvalidBackendId)
 #define VirtualTransactionIdEquals(vxid1, vxid2) \
    ((vxid1).backendId == (vxid2).backendId && \