*
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.114 2008/01/03 21:23:15 tgl Exp $
+ *   $PostgreSQL: pgsql/src/backend/commands/analyze.c,v 1.114.2.1 2008/04/03 16:27:32 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
 #include "access/heapam.h"
 #include "access/transam.h"
 #include "access/tuptoaster.h"
+#include "access/xact.h"
 #include "catalog/index.h"
 #include "catalog/indexing.h"
 #include "catalog/namespace.h"
 #include "pgstat.h"
 #include "postmaster/autovacuum.h"
 #include "storage/proc.h"
+#include "storage/procarray.h"
 #include "utils/acl.h"
 #include "utils/datum.h"
 #include "utils/lsyscache.h"
         * zero-column table.
         */
        if (!vacstmt->vacuum)
-           pgstat_report_analyze(RelationGetRelid(onerel),
-                                 onerel->rd_rel->relisshared,
-                                 0, 0);
-
+           pgstat_report_analyze(onerel, 0, 0);
        goto cleanup;
    }
 
        }
 
        /* report results to the stats collector, too */
-       pgstat_report_analyze(RelationGetRelid(onerel),
-                             onerel->rd_rel->relisshared,
-                             totalrows, totaldeadrows);
+       pgstat_report_analyze(onerel, totalrows, totaldeadrows);
    }
 
    /* We skip to here if there were no analyzable columns */
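The matching prototype change in src/include/pgstat.h is not part of this
excerpt; given the new definition later in this patch, the declaration
presumably now reads:

extern void pgstat_report_analyze(Relation rel,
                                  PgStat_Counter livetuples,
                                  PgStat_Counter deadtuples);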
 acquire_sample_rows(Relation onerel, HeapTuple *rows, int targrows,
                    double *totalrows, double *totaldeadrows)
 {
-   int         numrows = 0;    /* # rows collected */
-   double      liverows = 0;   /* # rows seen */
+   int         numrows = 0;    /* # rows now in reservoir */
+   double      samplerows = 0; /* total # rows collected */
+   double      liverows = 0;   /* # live rows seen */
    double      deadrows = 0;   /* # dead rows seen */
    double      rowstoskip = -1;    /* -1 means not set yet */
    BlockNumber totalblocks;
+   TransactionId OldestXmin;
    BlockSamplerData bs;
    double      rstate;
 
 
    totalblocks = RelationGetNumberOfBlocks(onerel);
 
+   /*
+    * Need a cutoff xmin for HeapTupleSatisfiesVacuum.  Ask for a horizon
+    * across all databases when the relation is shared, and ignore
+    * concurrent lazy VACUUM transactions when computing it.
+    */
+   OldestXmin = GetOldestXmin(onerel->rd_rel->relisshared, true);
+
    /* Prepare for sampling block numbers */
    BlockSampler_Init(&bs, totalblocks, targrows);
    /* Prepare for sampling rows */
         * We must maintain a pin on the target page's buffer to ensure that
         * the maxoffset value stays good (else concurrent VACUUM might delete
         * tuples out from under us).  Hence, pin the page until we are done
-        * looking at it.  We don't maintain a lock on the page, so tuples
-        * could get added to it, but we ignore such tuples.
+        * looking at it.  We also choose to hold sharelock on the buffer
+        * throughout --- we could release and re-acquire sharelock for
+        * each tuple, but since we aren't doing much work per tuple, the
+        * extra lock traffic is probably better avoided.
         */
        targbuffer = ReadBufferWithStrategy(onerel, targblock, vac_strategy);
        LockBuffer(targbuffer, BUFFER_LOCK_SHARE);
        targpage = BufferGetPage(targbuffer);
        maxoffset = PageGetMaxOffsetNumber(targpage);
-       LockBuffer(targbuffer, BUFFER_LOCK_UNLOCK);
 
        /* Inner loop over all tuples on the selected page */
        for (targoffset = FirstOffsetNumber; targoffset <= maxoffset; targoffset++)
        {
+           ItemId      itemid;
            HeapTupleData targtuple;
+           bool        sample_it = false;
+
+           itemid = PageGetItemId(targpage, targoffset);
+
+           /*
+            * We ignore unused and redirect line pointers.  DEAD line
+            * pointers should be counted as dead, because we need vacuum
+            * to run to get rid of them.  Note that this rule agrees with
+            * the way that heap_page_prune() counts things.
+            */
+           if (!ItemIdIsNormal(itemid))
+           {
+               if (ItemIdIsDead(itemid))
+                   deadrows += 1;
+               continue;
+           }
 
            ItemPointerSet(&targtuple.t_self, targblock, targoffset);
-           /* We use heap_release_fetch to avoid useless bufmgr traffic */
-           if (heap_release_fetch(onerel, SnapshotNow,
-                                  &targtuple, &targbuffer,
-                                  true, NULL))
+
+           targtuple.t_data = (HeapTupleHeader) PageGetItem(targpage, itemid);
+           targtuple.t_len = ItemIdGetLength(itemid);
+
+           switch (HeapTupleSatisfiesVacuum(targtuple.t_data,
+                                            OldestXmin,
+                                            targbuffer))
+           {
+               case HEAPTUPLE_LIVE:
+                   sample_it = true;
+                   liverows += 1;
+                   break;
+
+               case HEAPTUPLE_DEAD:
+               case HEAPTUPLE_RECENTLY_DEAD:
+                   /* Count dead and recently-dead rows */
+                   deadrows += 1;
+                   break;
+
+               case HEAPTUPLE_INSERT_IN_PROGRESS:
+                   /*
+                    * Insert-in-progress rows are not counted.  We assume
+                    * that when the inserting transaction commits or aborts,
+                    * it will send a stats message to increment the proper
+                    * count.  This works right only if that transaction ends
+                    * after we finish analyzing the table; if things happen
+                    * in the other order, its stats update will be
+                    * overwritten by ours.  However, the error will be
+                    * large only if the other transaction runs long enough
+                    * to insert many tuples, so assuming it will finish
+                    * after us is the safer option.
+                    *
+                    * A special case is that the inserting transaction might
+                    * be our own.  In this case we should count and sample
+                    * the row, to accommodate users who load a table and
+                    * analyze it in one transaction.  (pgstat_report_analyze
+                    * has to adjust the numbers we send to the stats collector
+                    * to make this come out right.)
+                    */
+                   if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmin(targtuple.t_data)))
+                   {
+                       sample_it = true;
+                       liverows += 1;
+                   }
+                   break;
+
+               case HEAPTUPLE_DELETE_IN_PROGRESS:
+                   /*
+                    * We count delete-in-progress rows as still live, using
+                    * the same reasoning given above; but we don't bother to
+                    * include them in the sample.
+                    *
+                    * If the delete was done by our own transaction, however,
+                    * we must count the row as dead to make
+                    * pgstat_report_analyze's stats adjustments come out
+                    * right.  (Note: this works out properly when the row
+                    * was both inserted and deleted in our xact.)
+                    */
+                   if (TransactionIdIsCurrentTransactionId(HeapTupleHeaderGetXmax(targtuple.t_data)))
+                       deadrows += 1;
+                   else
+                       liverows += 1;
+                   break;
+
+               default:
+                   elog(ERROR, "unexpected HeapTupleSatisfiesVacuum result");
+                   break;
+           }
+
+           if (sample_it)
            {
                /*
-                * The first targrows live rows are simply copied into the
+                * The first targrows sample rows are simply copied into the
                 * reservoir. Then we start replacing tuples in the sample
                 * until we reach the end of the relation.  This algorithm is
                 * from Jeff Vitter's paper (see full citation below). It
                    /*
                     * t in Vitter's paper is the number of records already
                     * processed.  If we need to compute a new S value, we
-                    * must use the not-yet-incremented value of liverows as
-                    * t.
+                    * must use the not-yet-incremented value of samplerows
+                    * as t.
                     */
                    if (rowstoskip < 0)
-                       rowstoskip = get_next_S(liverows, targrows, &rstate);
+                       rowstoskip = get_next_S(samplerows, targrows, &rstate);
 
                    if (rowstoskip <= 0)
                    {
                    rowstoskip -= 1;
                }
 
-               liverows += 1;
-           }
-           else
-           {
-               /* Count dead rows, but not empty slots */
-               if (targtuple.t_data != NULL)
-                   deadrows += 1;
+               samplerows += 1;
            }
        }
 
-       /* Now release the pin on the page */
-       ReleaseBuffer(targbuffer);
+       /* Now release the lock and pin on the page */
+       UnlockReleaseBuffer(targbuffer);
    }
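For reference, the reservoir logic above (copy the first targrows candidate
rows, then have each later candidate replace a random slot with decreasing
probability) has the same output distribution as the classic Algorithm R
sketched below.  The real code drives Vitter's Algorithm Z through
get_next_S() so it can skip rowstoskip candidates at a time instead of
drawing a random number per row, and it is now fed samplerows rather than
liverows: delete-in-progress rows are counted live but are never offered to
the reservoir, so the two counters can legitimately differ.  A minimal,
self-contained illustration, not PostgreSQL code:

#include <stdlib.h>

/*
 * Fill sample[] with a uniform random sample of up to targrows values from
 * input[0..n-1]; returns the number of slots filled.  Any given input value
 * is retained with probability targrows/n, mirroring the fill-then-replace
 * behavior of acquire_sample_rows (modulo rand()'s slight modulo bias,
 * which the real code takes care to avoid).
 */
static int
reservoir_sample(const int *input, int n, int *sample, int targrows)
{
    int         numrows = 0;    /* slots filled in the reservoir */
    int         samplerows = 0; /* candidate rows seen so far */
    int         i;

    for (i = 0; i < n; i++)
    {
        if (numrows < targrows)
        {
            /* fill phase: the first targrows candidates are copied */
            sample[numrows++] = input[i];
        }
        else
        {
            /* replace a random slot with probability targrows/(t+1) */
            int         k = rand() % (samplerows + 1);

            if (k < targrows)
                sample[k] = input[i];
        }
        samplerows++;
    }
    return numrows;
}

Once the scan completes, liverows and deadrows (which also count rows the
reservoir never kept) are extrapolated from the bs.m blocks actually sampled
to all totalblocks blocks to produce *totalrows and *totaldeadrows; that
arithmetic lies just past the end of this excerpt.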
 
    /*
 
  *
  * Copyright (c) 2001-2008, PostgreSQL Global Development Group
  *
- * $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.169 2008/01/01 19:45:51 momjian Exp $
+ * $PostgreSQL: pgsql/src/backend/postmaster/pgstat.c,v 1.169.2.1 2008/04/03 16:27:32 tgl Exp $
  * ----------
  */
 #include "postgres.h"
  * --------
  */
 void
-pgstat_report_analyze(Oid tableoid, bool shared, PgStat_Counter livetuples,
+pgstat_report_analyze(Relation rel, PgStat_Counter livetuples,
                      PgStat_Counter deadtuples)
 {
    PgStat_MsgAnalyze msg;
    if (pgStatSock < 0 || !pgstat_track_counts)
        return;
 
+   /*
+    * Unlike VACUUM, ANALYZE might be running inside a transaction that
+    * has already inserted and/or deleted rows in the target table.
+    * ANALYZE will have counted such rows as live or dead respectively.
+    * Because we will report our counts of such rows at transaction end,
+    * we should subtract off these counts from what we send to the collector
+    * now, else they'll be double-counted after commit.  (This approach also
+    * ensures that the collector ends up with the right numbers if we abort
+    * instead of committing.)
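+    *
+    * A worked illustration: suppose our still-open transaction inserted
+    * 1000 rows and deleted 100 pre-existing ones before running ANALYZE.
+    * The sampler counted the 1000 inserts as live and the 100 deletes as
+    * dead, so livetuples and deadtuples arrive here 900 and 100 higher,
+    * respectively, than the table's pre-transaction state.  Subtracting
+    * (inserted - deleted) = 900 and deleted = 100 restores that baseline,
+    * and the transactional counts reported at commit then bring the
+    * collector to the right post-commit totals.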
+    */
+   if (rel->pgstat_info != NULL)
+   {
+       PgStat_TableXactStatus *trans;
+
+       for (trans = rel->pgstat_info->trans; trans; trans = trans->upper)
+       {
+           livetuples -= trans->tuples_inserted - trans->tuples_deleted;
+           deadtuples -= trans->tuples_deleted;
+       }
+       /* count stuff inserted by already-aborted subxacts, too */
+       deadtuples -= rel->pgstat_info->t_counts.t_new_dead_tuples;
+       /* Since ANALYZE's counts are estimates, we could have underflowed */
+       livetuples = Max(livetuples, 0);
+       deadtuples = Max(deadtuples, 0);
+   }
+
    pgstat_setheader(&msg.m_hdr, PGSTAT_MTYPE_ANALYZE);
-   msg.m_databaseid = shared ? InvalidOid : MyDatabaseId;
-   msg.m_tableoid = tableoid;
-   msg.m_autovacuum = IsAutoVacuumWorkerProcess();     /* is this autovacuum? */
+   msg.m_databaseid = rel->rd_rel->relisshared ? InvalidOid : MyDatabaseId;
+   msg.m_tableoid = RelationGetRelid(rel);
+   msg.m_autovacuum = IsAutoVacuumWorkerProcess(); /* is this autovacuum? */
    msg.m_analyzetime = GetCurrentTimestamp();
    msg.m_live_tuples = livetuples;
    msg.m_dead_tuples = deadtuples;
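The "overwritten by ours" reasoning in acquire_sample_rows depends on how
the collector consumes this message: an ANALYZE report replaces the stored
per-table counts instead of incrementing them.  A sketch of that receiving
side, with the function wrapper and field names written from memory and
therefore to be read as assumptions rather than as part of this patch:

/*
 * Sketch only: presumed handling in the collector's pgstat_recv_analyze().
 * The _sketch suffix marks this as an illustration; the tabentry lookup and
 * timestamp bookkeeping of the real function are omitted.
 */
static void
pgstat_recv_analyze_sketch(PgStat_MsgAnalyze *msg,
                           PgStat_StatTabEntry *tabentry)
{
    /* ANALYZE results replace, rather than add to, the stored counts */
    tabentry->n_live_tuples = msg->m_live_tuples;
    tabentry->n_dead_tuples = msg->m_dead_tuples;
}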