#include "commands/vacuum.h"
 #include "miscadmin.h"
 #include "pgstat.h"
+#include "portability/instr_time.h"
 #include "postmaster/autovacuum.h"
 #include "storage/bufmgr.h"
 #include "storage/freespace.h"
 #define REL_TRUNCATE_MINIMUM   1000
 #define REL_TRUNCATE_FRACTION  16
 
+/*
+ * Timing parameters for truncate locking heuristics.
+ *
+ * These were not exposed as user tunable GUC values because it didn't seem
+ * that the potential for improvement was great enough to merit the cost of
+ * supporting them.
+ */
+#define AUTOVACUUM_TRUNCATE_LOCK_CHECK_INTERVAL        20  /* ms */
+#define AUTOVACUUM_TRUNCATE_LOCK_WAIT_INTERVAL     50  /* ms */
+#define AUTOVACUUM_TRUNCATE_LOCK_TIMEOUT           5000        /* ms */
+
 /*
  * Guesstimation of number of dead tuples per page.  This is used to
  * provide an upper limit to memory allocated when vacuuming small
    ItemPointer dead_tuples;    /* array of ItemPointerData */
    int         num_index_scans;
    TransactionId latestRemovedXid;
+   bool        lock_waiter_detected;
 } LVRelStats;
 
 
    vacrelstats->old_rel_pages = onerel->rd_rel->relpages;
    vacrelstats->old_rel_tuples = onerel->rd_rel->reltuples;
    vacrelstats->num_index_scans = 0;
+   vacrelstats->pages_removed = 0;
+   vacrelstats->lock_waiter_detected = false;
 
    /* Open all indexes of the relation */
    vac_open_indexes(onerel, RowExclusiveLock, &nindexes, &Irel);
                        vacrelstats->hasindex,
                        new_frozen_xid);
 
-   /* report results to the stats collector, too */
-   pgstat_report_vacuum(RelationGetRelid(onerel),
-                        onerel->rd_rel->relisshared,
-                        new_rel_tuples);
+   /*
+    * Report results to the stats collector, too. An early terminated
+    * lazy_truncate_heap attempt suppresses the message and also cancels the
+    * execution of ANALYZE, if that was ordered.
+    */
+   if (!vacrelstats->lock_waiter_detected)
+       pgstat_report_vacuum(RelationGetRelid(onerel),
+                            onerel->rd_rel->relisshared,
+                            new_rel_tuples);
+   else
+       vacstmt->options &= ~VACOPT_ANALYZE;
 
    /* and log the action if appropriate */
    if (IsAutoVacuumWorkerProcess() && Log_autovacuum_min_duration >= 0)
    BlockNumber old_rel_pages = vacrelstats->rel_pages;
    BlockNumber new_rel_pages;
    PGRUsage    ru0;
+   int         lock_retry;
 
    pg_rusage_init(&ru0);
 
    /*
-    * We need full exclusive lock on the relation in order to do truncation.
-    * If we can't get it, give up rather than waiting --- we don't want to
-    * block other backends, and we don't want to deadlock (which is quite
-    * possible considering we already hold a lower-grade lock).
-    */
-   if (!ConditionalLockRelation(onerel, AccessExclusiveLock))
-       return;
-
-   /*
-    * Now that we have exclusive lock, look to see if the rel has grown
-    * whilst we were vacuuming with non-exclusive lock.  If so, give up; the
-    * newly added pages presumably contain non-deletable tuples.
+    * Loop until no more truncating can be done.
     */
-   new_rel_pages = RelationGetNumberOfBlocks(onerel);
-   if (new_rel_pages != old_rel_pages)
+   do
    {
        /*
-        * Note: we intentionally don't update vacrelstats->rel_pages with the
-        * new rel size here.  If we did, it would amount to assuming that the
-        * new pages are empty, which is unlikely.  Leaving the numbers alone
-        * amounts to assuming that the new pages have the same tuple density
-        * as existing ones, which is less unlikely.
+        * We need full exclusive lock on the relation in order to do
+        * truncation. If we can't get it, give up rather than waiting --- we
+        * don't want to block other backends, and we don't want to deadlock
+        * (which is quite possible considering we already hold a lower-grade
+        * lock).
         */
-       UnlockRelation(onerel, AccessExclusiveLock);
-       return;
-   }
+       vacrelstats->lock_waiter_detected = false;
+       lock_retry = 0;
+       while (true)
+       {
+           if (ConditionalLockRelation(onerel, AccessExclusiveLock))
+               break;
 
-   /*
-    * Scan backwards from the end to verify that the end pages actually
-    * contain no tuples.  This is *necessary*, not optional, because other
-    * backends could have added tuples to these pages whilst we were
-    * vacuuming.
-    */
-   new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);
+           /*
+            * Check for interrupts while trying to (re-)acquire the exclusive
+            * lock.
+            */
+           CHECK_FOR_INTERRUPTS();
 
-   if (new_rel_pages >= old_rel_pages)
-   {
-       /* can't do anything after all */
-       UnlockRelation(onerel, AccessExclusiveLock);
-       return;
-   }
+           if (++lock_retry > (AUTOVACUUM_TRUNCATE_LOCK_TIMEOUT /
+                               AUTOVACUUM_TRUNCATE_LOCK_WAIT_INTERVAL))
+           {
+               /*
+                * We failed to establish the lock in the specified number of
+                * retries. This means we give up truncating. Suppress the
+                * ANALYZE step. Doing an ANALYZE at this point will reset the
+                * dead_tuple_count in the stats collector, so we will not get
+                * called by the autovacuum launcher again to do the truncate.
+                */
+               vacrelstats->lock_waiter_detected = true;
+               ereport(LOG,
+                       (errmsg("automatic vacuum of table \"%s.%s.%s\": "
+                               "cannot (re)acquire exclusive "
+                               "lock for truncate scan",
+                               get_database_name(MyDatabaseId),
+                           get_namespace_name(RelationGetNamespace(onerel)),
+                               RelationGetRelationName(onerel))));
+               return;
+           }
 
-   /*
-    * Okay to truncate.
-    */
-   RelationTruncate(onerel, new_rel_pages);
+           pg_usleep(AUTOVACUUM_TRUNCATE_LOCK_WAIT_INTERVAL);
+       }
 
-   /*
-    * We can release the exclusive lock as soon as we have truncated.  Other
-    * backends can't safely access the relation until they have processed the
-    * smgr invalidation that smgrtruncate sent out ... but that should happen
-    * as part of standard invalidation processing once they acquire lock on
-    * the relation.
-    */
-   UnlockRelation(onerel, AccessExclusiveLock);
+       /*
+        * Now that we have exclusive lock, look to see if the rel has grown
+        * whilst we were vacuuming with non-exclusive lock.  If so, give up;
+        * the newly added pages presumably contain non-deletable tuples.
+        */
+       new_rel_pages = RelationGetNumberOfBlocks(onerel);
+       if (new_rel_pages != old_rel_pages)
+       {
+           /*
+            * Note: we intentionally don't update vacrelstats->rel_pages with
+            * the new rel size here.  If we did, it would amount to assuming
+            * that the new pages are empty, which is unlikely. Leaving the
+            * numbers alone amounts to assuming that the new pages have the
+            * same tuple density as existing ones, which is less unlikely.
+            */
+           UnlockRelation(onerel, AccessExclusiveLock);
+           return;
+       }
 
-   /*
-    * Update statistics.  Here, it *is* correct to adjust rel_pages without
-    * also touching reltuples, since the tuple count wasn't changed by the
-    * truncation.
-    */
-   vacrelstats->rel_pages = new_rel_pages;
-   vacrelstats->pages_removed = old_rel_pages - new_rel_pages;
+       /*
+        * Scan backwards from the end to verify that the end pages actually
+        * contain no tuples.  This is *necessary*, not optional, because
+        * other backends could have added tuples to these pages whilst we
+        * were vacuuming.
+        */
+       new_rel_pages = count_nondeletable_pages(onerel, vacrelstats);
 
-   ereport(elevel,
-           (errmsg("\"%s\": truncated %u to %u pages",
-                   RelationGetRelationName(onerel),
-                   old_rel_pages, new_rel_pages),
-            errdetail("%s.",
-                      pg_rusage_show(&ru0))));
+       if (new_rel_pages >= old_rel_pages)
+       {
+           /* can't do anything after all */
+           UnlockRelation(onerel, AccessExclusiveLock);
+           return;
+       }
+
+       /*
+        * Okay to truncate.
+        */
+       RelationTruncate(onerel, new_rel_pages);
+
+       /*
+        * We can release the exclusive lock as soon as we have truncated.
+        * Other backends can't safely access the relation until they have
+        * processed the smgr invalidation that smgrtruncate sent out ... but
+        * that should happen as part of standard invalidation processing once
+        * they acquire lock on the relation.
+        */
+       UnlockRelation(onerel, AccessExclusiveLock);
+
+       /*
+        * Update statistics.  Here, it *is* correct to adjust rel_pages
+        * without also touching reltuples, since the tuple count wasn't
+        * changed by the truncation.
+        */
+       vacrelstats->pages_removed += old_rel_pages - new_rel_pages;
+       vacrelstats->rel_pages = new_rel_pages;
+
+       ereport(elevel,
+               (errmsg("\"%s\": truncated %u to %u pages",
+                       RelationGetRelationName(onerel),
+                       old_rel_pages, new_rel_pages),
+                errdetail("%s.",
+                          pg_rusage_show(&ru0))));
+       old_rel_pages = new_rel_pages;
+   } while (new_rel_pages > vacrelstats->nonempty_pages &&
+            vacrelstats->lock_waiter_detected);
 }
 
 /*
 count_nondeletable_pages(Relation onerel, LVRelStats *vacrelstats)
 {
    BlockNumber blkno;
+   instr_time  starttime;
+   instr_time  currenttime;
+   instr_time  elapsed;
+
+   /* Initialize the starttime if we check for conflicting lock requests */
+   INSTR_TIME_SET_CURRENT(starttime);
 
    /* Strange coding of loop control is needed because blkno is unsigned */
    blkno = vacrelstats->rel_pages;
                    maxoff;
        bool        hastup;
 
+       /*
+        * Check if another process requests a lock on our relation. We are
+        * holding an AccessExclusiveLock here, so they will be waiting. We
+        * only do this in autovacuum_truncate_lock_check millisecond
+        * intervals, and we only check if that interval has elapsed once
+        * every 32 blocks to keep the number of system calls and actual
+        * shared lock table lookups to a minimum.
+        */
+       if ((blkno % 32) == 0)
+       {
+           INSTR_TIME_SET_CURRENT(currenttime);
+           elapsed = currenttime;
+           INSTR_TIME_SUBTRACT(elapsed, starttime);
+           if ((INSTR_TIME_GET_MICROSEC(elapsed) / 1000)
+               >= AUTOVACUUM_TRUNCATE_LOCK_CHECK_INTERVAL)
+           {
+               if (LockHasWaitersRelation(onerel, AccessExclusiveLock))
+               {
+                   ereport(elevel,
+                           (errmsg("\"%s\": suspending truncate "
+                                   "due to conflicting lock request",
+                                   RelationGetRelationName(onerel))));
+
+                   vacrelstats->lock_waiter_detected = true;
+                   return blkno;
+               }
+               starttime = currenttime;
+           }
+       }
+
        /*
         * We don't insert a vacuum delay point here, because we have an
         * exclusive lock on the table which we want to hold for as short a
 
    return lockhash;
 }
 
+/*
+ * LockHasWaiters -- look up 'locktag' and check if releasing this
+ *     lock would wake up other processes waiting for it.
+ */
+bool
+LockHasWaiters(const LOCKTAG *locktag, LOCKMODE lockmode, bool sessionLock)
+{
+   LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
+   LockMethod  lockMethodTable;
+   LOCALLOCKTAG localtag;
+   LOCALLOCK  *locallock;
+   LOCK       *lock;
+   PROCLOCK   *proclock;
+   LWLockId    partitionLock;
+   bool        hasWaiters = false;
+
+   if (lockmethodid <= 0 || lockmethodid >= lengthof(LockMethods))
+       elog(ERROR, "unrecognized lock method: %d", lockmethodid);
+   lockMethodTable = LockMethods[lockmethodid];
+   if (lockmode <= 0 || lockmode > lockMethodTable->numLockModes)
+       elog(ERROR, "unrecognized lock mode: %d", lockmode);
+
+#ifdef LOCK_DEBUG
+   if (LOCK_DEBUG_ENABLED(locktag))
+       elog(LOG, "LockHasWaiters: lock [%u,%u] %s",
+            locktag->locktag_field1, locktag->locktag_field2,
+            lockMethodTable->lockModeNames[lockmode]);
+#endif
+
+   /*
+    * Find the LOCALLOCK entry for this lock and lockmode
+    */
+   MemSet(&localtag, 0, sizeof(localtag));     /* must clear padding */
+   localtag.lock = *locktag;
+   localtag.mode = lockmode;
+
+   locallock = (LOCALLOCK *) hash_search(LockMethodLocalHash,
+                                         (void *) &localtag,
+                                         HASH_FIND, NULL);
+
+   /*
+    * let the caller print its own error message, too. Do not ereport(ERROR).
+    */
+   if (!locallock || locallock->nLocks <= 0)
+   {
+       elog(WARNING, "you don't own a lock of type %s",
+            lockMethodTable->lockModeNames[lockmode]);
+       return false;
+   }
+
+   /*
+    * Check the shared lock table.
+    */
+   partitionLock = LockHashPartitionLock(locallock->hashcode);
+
+   LWLockAcquire(partitionLock, LW_SHARED);
+
+   /*
+    * We don't need to re-find the lock or proclock, since we kept their
+    * addresses in the locallock table, and they couldn't have been removed
+    * while we were holding a lock on them.
+    */
+   lock = locallock->lock;
+   LOCK_PRINT("LockHasWaiters: found", lock, lockmode);
+   proclock = locallock->proclock;
+   PROCLOCK_PRINT("LockHasWaiters: found", proclock);
+
+   /*
+    * Double-check that we are actually holding a lock of the type we want to
+    * release.
+    */
+   if (!(proclock->holdMask & LOCKBIT_ON(lockmode)))
+   {
+       PROCLOCK_PRINT("LockHasWaiters: WRONGTYPE", proclock);
+       LWLockRelease(partitionLock);
+       elog(WARNING, "you don't own a lock of type %s",
+            lockMethodTable->lockModeNames[lockmode]);
+       RemoveLocalLock(locallock);
+       return false;
+   }
+
+   /*
+    * Do the checking.
+    */
+   if ((lockMethodTable->conflictTab[lockmode] & lock->waitMask) != 0)
+       hasWaiters = true;
+
+   LWLockRelease(partitionLock);
+
+   return hasWaiters;
+}
+
 
 /*
  * LockAcquire -- Check for lock conflicts, sleep if conflict found,