* Copyright (c) 2000-2006, PostgreSQL Global Development Group
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/access/transam/varsup.c,v 1.72 2006/07/14 14:52:17 momjian Exp $
+ *   $PostgreSQL: pgsql/src/backend/access/transam/varsup.c,v 1.73 2006/09/03 15:59:38 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
     */
    if (MyProc != NULL)
    {
+       /*
+        * Use volatile pointer to prevent code rearrangement; other backends
+        * could be examining my subxids info concurrently, and we don't
+        * want them to see an invalid intermediate state, such as
+        * incrementing nxids before filling the array entry.  Note we are
+        * assuming that TransactionId and int fetch/store are atomic.
+        */
+       volatile PGPROC *myproc = MyProc;
+
        if (!isSubXact)
-           MyProc->xid = xid;
+           myproc->xid = xid;
        else
        {
-           if (MyProc->subxids.nxids < PGPROC_MAX_CACHED_SUBXIDS)
+           int     nxids = myproc->subxids.nxids;
+
+           if (nxids < PGPROC_MAX_CACHED_SUBXIDS)
            {
-               MyProc->subxids.xids[MyProc->subxids.nxids] = xid;
-               MyProc->subxids.nxids++;
+               myproc->subxids.xids[nxids] = xid;
+               myproc->subxids.nxids = nxids + 1;
            }
            else
-               MyProc->subxids.overflowed = true;
+               myproc->subxids.overflowed = true;
        }
    }
 
 
  *
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.16 2006/07/30 20:17:11 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/storage/ipc/procarray.c,v 1.17 2006/09/03 15:59:38 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
  * This ensures that the set of transactions seen as "running" by the
  * current xact will not change after it takes the snapshot.
  *
- * Note that only top-level XIDs are included in the snapshot. We can
- * still apply the xmin and xmax limits to subtransaction XIDs, but we
- * need to work a bit harder to see if XIDs in [xmin..xmax) are running.
+ * All running top-level XIDs are included in the snapshot.  We also try
+ * to include running subtransaction XIDs, but since PGPROC has only a
+ * limited cache area for subxact XIDs, full information may not be
+ * available.  If we find any overflowed subxid arrays, we have to mark
+ * the snapshot's subxid data as overflowed, and extra work will need to
+ * be done to determine what's running (see XidInSnapshot() in tqual.c).
  *
  * We also update the following backend-global variables:
  *     TransactionXmin: the oldest xmin of any snapshot in use in the
    TransactionId globalxmin;
    int         index;
    int         count = 0;
+   int         subcount = 0;
 
    Assert(snapshot != NULL);
 
    /*
     * Allocating space for maxProcs xids is usually overkill; numProcs would
     * be sufficient.  But it seems better to do the malloc while not holding
-    * the lock, so we can't look at numProcs.
+    * the lock, so we can't look at numProcs.  Likewise, we allocate much
+    * more subxip storage than is probably needed.
     *
     * This does open a possibility for avoiding repeated malloc/free: since
     * maxProcs does not change at runtime, we can simply reuse the previous
-    * xip array if any.  (This relies on the fact that all callers pass
+    * xip arrays if any.  (This relies on the fact that all callers pass
     * static SnapshotData structs.)
     */
    if (snapshot->xip == NULL)
            ereport(ERROR,
                    (errcode(ERRCODE_OUT_OF_MEMORY),
                     errmsg("out of memory")));
+       Assert(snapshot->subxip == NULL);
+       snapshot->subxip = (TransactionId *)
+           malloc(arrayP->maxProcs * PGPROC_MAX_CACHED_SUBXIDS * sizeof(TransactionId));
+       if (snapshot->subxip == NULL)
+           ereport(ERROR,
+                   (errcode(ERRCODE_OUT_OF_MEMORY),
+                    errmsg("out of memory")));
    }
 
    globalxmin = xmin = GetTopTransactionId();
 
    /*
-    * If we are going to set MyProc->xmin then we'd better get exclusive
-    * lock; if not, this is a read-only operation so it can be shared.
+    * It is sufficient to get shared lock on ProcArrayLock, even if we
+    * are computing a serializable snapshot and therefore will be setting
+    * MyProc->xmin.  This is because any two backends that have overlapping
+    * shared holds on ProcArrayLock will certainly compute the same xmin
+    * (since no xact, in particular not the oldest, can exit the set of
+    * running transactions while we hold ProcArrayLock --- see further
+    * discussion just below).  So it doesn't matter whether another backend
+    * concurrently doing GetSnapshotData or GetOldestXmin sees our xmin as
+    * set or not; he'd compute the same xmin for himself either way.
     */
-   LWLockAcquire(ProcArrayLock, serializable ? LW_EXCLUSIVE : LW_SHARED);
+   LWLockAcquire(ProcArrayLock, LW_SHARED);
 
    /*--------------------
     * Unfortunately, we have to call ReadNewTransactionId() after acquiring
        if (TransactionIdIsNormal(xid))
            if (TransactionIdPrecedes(xid, globalxmin))
                globalxmin = xid;
+
+       /*
+        * Save subtransaction XIDs if possible (if we've already overflowed,
+        * there's no point).  Note that the subxact XIDs must be later than
+        * their parent, so no need to check them against xmin.
+        *
+        * The other backend can add more subxids concurrently, but cannot
+        * remove any.  Hence it's important to fetch nxids just once.
+        * Should be safe to use memcpy, though.  (We needn't worry about
+        * missing any xids added concurrently, because they must postdate
+        * xmax.)
+        */
+       if (subcount >= 0)
+       {
+           if (proc->subxids.overflowed)
+               subcount = -1;                  /* overflowed */
+           else
+           {
+               int     nxids = proc->subxids.nxids;
+
+               if (nxids > 0)
+               {
+                   memcpy(snapshot->subxip + subcount,
+                          proc->subxids.xids,
+                          nxids * sizeof(TransactionId));
+                   subcount += nxids;
+               }
+           }
+       }
    }
 
    if (serializable)
    snapshot->xmin = xmin;
    snapshot->xmax = xmax;
    snapshot->xcnt = count;
+   snapshot->subxcnt = subcount;
 
    snapshot->curcid = GetCurrentCommandId();
 
    int         i,
                j;
 
-   Assert(!TransactionIdEquals(xid, InvalidTransactionId));
+   Assert(TransactionIdIsValid(xid));
 
    /*
     * We must hold ProcArrayLock exclusively in order to remove transactions
 
  * Portions Copyright (c) 1994, Regents of the University of California
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/utils/time/tqual.c,v 1.95 2006/07/13 17:47:01 momjian Exp $
+ *   $PostgreSQL: pgsql/src/backend/utils/time/tqual.c,v 1.96 2006/09/03 15:59:39 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 TransactionId RecentXmin = InvalidTransactionId;
 TransactionId RecentGlobalXmin = InvalidTransactionId;
 
+/* local functions */
+static bool XidInSnapshot(TransactionId xid, Snapshot snapshot);
+
 
 /*
  * HeapTupleSatisfiesItself
    /*
     * By here, the inserting transaction has committed - have to check
     * when...
-    *
-    * Note that the provided snapshot contains only top-level XIDs, so we
-    * have to convert a subxact XID to its parent for comparison. However, we
-    * can make first-pass range checks with the given XID, because a subxact
-    * with XID < xmin has surely also got a parent with XID < xmin, while one
-    * with XID >= xmax must belong to a parent that was not yet committed at
-    * the time of this snapshot.
     */
-   if (TransactionIdFollowsOrEquals(HeapTupleHeaderGetXmin(tuple),
-                                    snapshot->xmin))
-   {
-       TransactionId parentXid;
-
-       if (TransactionIdFollowsOrEquals(HeapTupleHeaderGetXmin(tuple),
-                                        snapshot->xmax))
-           return false;
-
-       parentXid = SubTransGetTopmostTransaction(HeapTupleHeaderGetXmin(tuple));
-
-       if (TransactionIdFollowsOrEquals(parentXid, snapshot->xmin))
-       {
-           uint32      i;
-
-           /* no point in checking parentXid against xmax here */
-
-           for (i = 0; i < snapshot->xcnt; i++)
-           {
-               if (TransactionIdEquals(parentXid, snapshot->xip[i]))
-                   return false;
-           }
-       }
-   }
+   if (XidInSnapshot(HeapTupleHeaderGetXmin(tuple), snapshot))
+       return false;           /* treat as still in progress */
 
    if (tuple->t_infomask & HEAP_XMAX_INVALID)  /* xid invalid or aborted */
        return true;
 
    /*
     * OK, the deleting transaction committed too ... but when?
-    *
-    * See notes for the similar tests on tuple xmin, above.
     */
-   if (TransactionIdFollowsOrEquals(HeapTupleHeaderGetXmax(tuple),
-                                    snapshot->xmin))
-   {
-       TransactionId parentXid;
+   if (XidInSnapshot(HeapTupleHeaderGetXmax(tuple), snapshot))
+       return true;            /* treat as still in progress */
 
-       if (TransactionIdFollowsOrEquals(HeapTupleHeaderGetXmax(tuple),
-                                        snapshot->xmax))
-           return true;
-
-       parentXid = SubTransGetTopmostTransaction(HeapTupleHeaderGetXmax(tuple));
-
-       if (TransactionIdFollowsOrEquals(parentXid, snapshot->xmin))
-       {
-           uint32      i;
-
-           /* no point in checking parentXid against xmax here */
-
-           for (i = 0; i < snapshot->xcnt; i++)
-           {
-               if (TransactionIdEquals(parentXid, snapshot->xip[i]))
-                   return true;
-           }
-       }
-   }
-
-/* This is to be used only for disaster recovery and requires serious analysis. */
-#ifndef MAKE_EXPIRED_TUPLES_VISIBLE
    return false;
-#else
-   return true;
-#endif
 }
 
 
 CopySnapshot(Snapshot snapshot)
 {
    Snapshot    newsnap;
+   Size        subxipoff;
+   Size        size;
 
-   /* We allocate any XID array needed in the same palloc block. */
-   newsnap = (Snapshot) palloc(sizeof(SnapshotData) +
-                               snapshot->xcnt * sizeof(TransactionId));
+   /* We allocate any XID arrays needed in the same palloc block. */
+   size = subxipoff = sizeof(SnapshotData) +
+       snapshot->xcnt * sizeof(TransactionId);
+   if (snapshot->subxcnt > 0)
+       size += snapshot->subxcnt * sizeof(TransactionId);
+
+   newsnap = (Snapshot) palloc(size);
    memcpy(newsnap, snapshot, sizeof(SnapshotData));
+
+   /* setup XID array */
    if (snapshot->xcnt > 0)
    {
        newsnap->xip = (TransactionId *) (newsnap + 1);
    else
        newsnap->xip = NULL;
 
+   /* setup subXID array */
+   if (snapshot->subxcnt > 0)
+   {
+       newsnap->subxip = (TransactionId *) ((char *) newsnap + subxipoff);
+       memcpy(newsnap->subxip, snapshot->subxip,
+              snapshot->subxcnt * sizeof(TransactionId));
+   }
+   else
+       newsnap->subxip = NULL;
+
    return newsnap;
 }
 
    LatestSnapshot = NULL;
    ActiveSnapshot = NULL;      /* just for cleanliness */
 }
+
+/*
+ * XidInSnapshot
+ *     Is the given XID still-in-progress according to the snapshot?
+ *
+ * Note: GetSnapshotData never stores either top xid or subxids of our own
+ * backend into a snapshot, so these xids will not be reported as "running"
+ * by this function.  This is OK for current uses, because we actually only
+ * apply this for known-committed XIDs.
+ */
+static bool
+XidInSnapshot(TransactionId xid, Snapshot snapshot)
+{
+   uint32      i;
+
+   /*
+    * Make a quick range check to eliminate most XIDs without looking at the
+    * xip arrays.  Note that this is OK even if we convert a subxact XID to
+    * its parent below, because a subxact with XID < xmin has surely also got
+    * a parent with XID < xmin, while one with XID >= xmax must belong to a
+    * parent that was not yet committed at the time of this snapshot.
+    */
+
+   /* Any xid < xmin is not in-progress */
+   if (TransactionIdPrecedes(xid, snapshot->xmin))
+       return false;
+   /* Any xid >= xmax is in-progress */
+   if (TransactionIdFollowsOrEquals(xid, snapshot->xmax))
+       return true;
+
+   /*
+    * If the snapshot contains full subxact data, the fastest way to check
+    * things is just to compare the given XID against both subxact XIDs and
+    * top-level XIDs.  If the snapshot overflowed, we have to use pg_subtrans
+    * to convert a subxact XID to its parent XID, but then we need only look
+    * at top-level XIDs not subxacts.
+    */
+   if (snapshot->subxcnt >= 0)
+   {
+       /* full data, so search subxip */
+       int32       j;
+
+       for (j = 0; j < snapshot->subxcnt; j++)
+       {
+           if (TransactionIdEquals(xid, snapshot->subxip[j]))
+               return true;
+       }
+
+       /* not there, fall through to search xip[] */
+   }
+   else
+   {
+       /* overflowed, so convert xid to top-level */
+       xid = SubTransGetTopmostTransaction(xid);
+
+       /*
+        * If xid was indeed a subxact, we might now have an xid < xmin,
+        * so recheck to avoid an array scan.  No point in rechecking xmax.
+        */
+       if (TransactionIdPrecedes(xid, snapshot->xmin))
+           return false;
+   }
+
+   for (i = 0; i < snapshot->xcnt; i++)
+   {
+       if (TransactionIdEquals(xid, snapshot->xip[i]))
+           return true;
+   }
+
+   return false;
+}
 
  * Portions Copyright (c) 1996-2006, PostgreSQL Global Development Group
  * Portions Copyright (c) 1994, Regents of the University of California
  *
- * $PostgreSQL: pgsql/src/include/utils/tqual.h,v 1.62 2006/07/13 16:49:20 momjian Exp $
+ * $PostgreSQL: pgsql/src/include/utils/tqual.h,v 1.63 2006/09/03 15:59:39 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
    uint32      xcnt;           /* # of xact ids in xip[] */
    TransactionId *xip;         /* array of xact IDs in progress */
    /* note: all ids in xip[] satisfy xmin <= xip[i] < xmax */
+   int32       subxcnt;        /* # of xact ids in subxip[], -1 if overflow */
+   TransactionId *subxip;      /* array of subxact IDs in progress */
+   /*
+    * note: all ids in subxip[] are >= xmin, but we don't bother filtering
+    * out any that are >= xmax
+    */
    CommandId   curcid;         /* in my xact, CID < curcid are visible */
 } SnapshotData;