</listitem>
      </varlistentry>
 
+     <varlistentry id="guc-log-lock-failure" xreflabel="log_lock_failure">
+      <term><varname>log_lock_failure</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>log_lock_failure</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Controls whether a detailed log message is produced
+        when a lock acquisition fails.  This is useful for analyzing
+        the causes of lock failures.  Currently, only lock failures
+        due to <literal>SELECT NOWAIT</literal> is supported.
+        The default is <literal>off</literal>.  Only superusers and
+        users with the appropriate <literal>SET</literal> privilege
+        can change this setting.
+       </para>
+      </listitem>
+     </varlistentry>
+
+
      <varlistentry id="guc-log-recovery-conflict-waits" xreflabel="log_recovery_conflict_waits">
       <term><varname>log_recovery_conflict_waits</varname> (<type>boolean</type>)
       <indexterm>
 
                            Relation rel, ItemPointer ctid, XLTW_Oper oper,
                            int *remaining);
 static bool ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status,
-                                      uint16 infomask, Relation rel, int *remaining);
+                                      uint16 infomask, Relation rel, int *remaining,
+                                      bool logLockFailure);
 static void index_delete_sort(TM_IndexDeleteOp *delstate);
 static int bottomup_sort_and_shrink(TM_IndexDeleteOp *delstate);
 static XLogRecPtr log_heap_new_cid(Relation relation, HeapTuple tup);
    LockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
 #define UnlockTupleTuplock(rel, tup, mode) \
    UnlockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
-#define ConditionalLockTupleTuplock(rel, tup, mode) \
-   ConditionalLockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock)
+#define ConditionalLockTupleTuplock(rel, tup, mode, log) \
+   ConditionalLockTuple((rel), (tup), tupleLockExtraInfo[mode].hwlock, (log))
 
 #ifdef USE_PREFETCH
 /*
                    case LockWaitSkip:
                        if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
                                                        status, infomask, relation,
-                                                       NULL))
+                                                       NULL, false))
                        {
                            result = TM_WouldBlock;
                            /* recovery code expects to have buffer lock held */
                    case LockWaitError:
                        if (!ConditionalMultiXactIdWait((MultiXactId) xwait,
                                                        status, infomask, relation,
-                                                       NULL))
+                                                       NULL, log_lock_failure))
                            ereport(ERROR,
                                    (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
                                     errmsg("could not obtain lock on row in relation \"%s\"",
                                          XLTW_Lock);
                        break;
                    case LockWaitSkip:
-                       if (!ConditionalXactLockTableWait(xwait))
+                       if (!ConditionalXactLockTableWait(xwait, false))
                        {
                            result = TM_WouldBlock;
                            /* recovery code expects to have buffer lock held */
                        }
                        break;
                    case LockWaitError:
-                       if (!ConditionalXactLockTableWait(xwait))
+                       if (!ConditionalXactLockTableWait(xwait, log_lock_failure))
                            ereport(ERROR,
                                    (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
                                     errmsg("could not obtain lock on row in relation \"%s\"",
            break;
 
        case LockWaitSkip:
-           if (!ConditionalLockTupleTuplock(relation, tid, mode))
+           if (!ConditionalLockTupleTuplock(relation, tid, mode, false))
                return false;
            break;
 
        case LockWaitError:
-           if (!ConditionalLockTupleTuplock(relation, tid, mode))
+           if (!ConditionalLockTupleTuplock(relation, tid, mode, log_lock_failure))
                ereport(ERROR,
                        (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
                         errmsg("could not obtain lock on row in relation \"%s\"",
  * fail if lock is unavailable.  'rel', 'ctid' and 'oper' are used to set up
  * context information for error messages.  'remaining', if not NULL, receives
  * the number of members that are still running, including any (non-aborted)
- * subtransactions of our own transaction.
+ * subtransactions of our own transaction.  'logLockFailure' indicates whether
+ * to log details when a lock acquisition fails with 'nowait' enabled.
  *
  * We do this by sleeping on each member using XactLockTableWait.  Any
  * members that belong to the current backend are *not* waited for, however;
 Do_MultiXactIdWait(MultiXactId multi, MultiXactStatus status,
                   uint16 infomask, bool nowait,
                   Relation rel, ItemPointer ctid, XLTW_Oper oper,
-                  int *remaining)
+                  int *remaining, bool logLockFailure)
 {
    bool        result = true;
    MultiXactMember *members;
             */
            if (nowait)
            {
-               result = ConditionalXactLockTableWait(memxid);
+               result = ConditionalXactLockTableWait(memxid, logLockFailure);
                if (!result)
                    break;
            }
                int *remaining)
 {
    (void) Do_MultiXactIdWait(multi, status, infomask, false,
-                             rel, ctid, oper, remaining);
+                             rel, ctid, oper, remaining, false);
 }
 
 /*
  */
 static bool
 ConditionalMultiXactIdWait(MultiXactId multi, MultiXactStatus status,
-                          uint16 infomask, Relation rel, int *remaining)
+                          uint16 infomask, Relation rel, int *remaining,
+                          bool logLockFailure)
 {
    return Do_MultiXactIdWait(multi, status, infomask, true,
-                             rel, NULL, XLTW_None, remaining);
+                             rel, NULL, XLTW_None, remaining, logLockFailure);
 }
 
 /*
 
                                                  XLTW_FetchUpdated);
                                break;
                            case LockWaitSkip:
-                               if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
+                               if (!ConditionalXactLockTableWait(SnapshotDirty.xmax, false))
                                    /* skip instead of waiting */
                                    return TM_WouldBlock;
                                break;
                            case LockWaitError:
-                               if (!ConditionalXactLockTableWait(SnapshotDirty.xmax))
+                               if (!ConditionalXactLockTableWait(SnapshotDirty.xmax, log_lock_failure))
                                    ereport(ERROR,
                                            (errcode(ERRCODE_LOCK_NOT_AVAILABLE),
                                             errmsg("could not obtain lock on row in relation \"%s\"",
 
 
    SetLocktagRelationOid(&tag, relid);
 
-   res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
+   res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock,
+                             false);
 
    /*
     * Now that we have the lock, check for invalidation messages, so that we
 
    SetLocktagRelationOid(&tag, relid);
 
-   res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
+   res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
+                             false);
 
    if (res == LOCKACQUIRE_NOT_AVAIL)
        return false;
 
    SET_LOCKTAG_RELATION(tag, relid->dbId, relid->relId);
 
-   res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
+   res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock,
+                             false);
 
    /*
     * Now that we have the lock, check for invalidation messages; see notes
                         relation->rd_lockInfo.lockRelId.dbId,
                         relation->rd_lockInfo.lockRelId.relId);
 
-   res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock);
+   res = LockAcquireExtended(&tag, lockmode, false, false, true, &locallock,
+                             false);
 
    /*
     * Now that we have the lock, check for invalidation messages; see notes
                         relation->rd_lockInfo.lockRelId.dbId,
                         relation->rd_lockInfo.lockRelId.relId);
 
-   res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
+   res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
+                             false);
 
    if (res == LOCKACQUIRE_NOT_AVAIL)
        return false;
  * Returns true iff the lock was acquired.
  */
 bool
-ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode)
+ConditionalLockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode,
+                    bool logLockFailure)
 {
    LOCKTAG     tag;
 
                      ItemPointerGetBlockNumber(tid),
                      ItemPointerGetOffsetNumber(tid));
 
-   return (LockAcquire(&tag, lockmode, false, true) != LOCKACQUIRE_NOT_AVAIL);
+   return (LockAcquireExtended(&tag, lockmode, false, true, true, NULL,
+                               logLockFailure) != LOCKACQUIRE_NOT_AVAIL);
 }
 
 /*
  * Returns true if the lock was acquired.
  */
 bool
-ConditionalXactLockTableWait(TransactionId xid)
+ConditionalXactLockTableWait(TransactionId xid, bool logLockFailure)
 {
    LOCKTAG     tag;
    bool        first = true;
 
        SET_LOCKTAG_TRANSACTION(tag, xid);
 
-       if (LockAcquire(&tag, ShareLock, false, true) == LOCKACQUIRE_NOT_AVAIL)
+       if (LockAcquireExtended(&tag, ShareLock, false, true, true, NULL,
+                               logLockFailure)
+           == LOCKACQUIRE_NOT_AVAIL)
            return false;
 
        LockRelease(&tag, ShareLock, false);
                       objid,
                       objsubid);
 
-   res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
+   res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
+                             false);
 
    if (res == LOCKACQUIRE_NOT_AVAIL)
        return false;
                       objid,
                       objsubid);
 
-   res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock);
+   res = LockAcquireExtended(&tag, lockmode, false, true, true, &locallock,
+                             false);
 
    if (res == LOCKACQUIRE_NOT_AVAIL)
        return false;
 
 #include "access/xlogutils.h"
 #include "miscadmin.h"
 #include "pg_trace.h"
+#include "storage/lmgr.h"
 #include "storage/proc.h"
 #include "storage/procarray.h"
 #include "storage/spin.h"
 #include "utils/resowner.h"
 
 
-/* This configuration variable is used to set the lock table size */
-int            max_locks_per_xact; /* set by guc.c */
+/* GUC variables */
+int            max_locks_per_xact; /* used to set the lock table size */
+bool       log_lock_failure = false;
 
 #define NLOCKENTS() \
    mul_size(max_locks_per_xact, add_size(MaxBackends, max_prepared_xacts))
            bool dontWait)
 {
    return LockAcquireExtended(locktag, lockmode, sessionLock, dontWait,
-                              true, NULL);
+                              true, NULL, false);
 }
 
 /*
  *
  * If locallockp isn't NULL, *locallockp receives a pointer to the LOCALLOCK
  * table entry if a lock is successfully acquired, or NULL if not.
+ *
+ * logLockFailure indicates whether to log details when a lock acquisition
+ * fails with dontWait = true.
  */
 LockAcquireResult
 LockAcquireExtended(const LOCKTAG *locktag,
                    bool sessionLock,
                    bool dontWait,
                    bool reportMemoryError,
-                   LOCALLOCK **locallockp)
+                   LOCALLOCK **locallockp,
+                   bool logLockFailure)
 {
    LOCKMETHODID lockmethodid = locktag->locktag_lockmethodid;
    LockMethod  lockMethodTable;
 
        if (dontWait)
        {
+           /*
+            * Log lock holders and waiters as a detail log message if
+            * logLockFailure = true and lock acquisition fails with dontWait
+            * = true
+            */
+           if (logLockFailure)
+           {
+               StringInfoData buf,
+                           lock_waiters_sbuf,
+                           lock_holders_sbuf;
+               const char *modename;
+               int         lockHoldersNum = 0;
+
+               initStringInfo(&buf);
+               initStringInfo(&lock_waiters_sbuf);
+               initStringInfo(&lock_holders_sbuf);
+
+               DescribeLockTag(&buf, &locallock->tag.lock);
+               modename = GetLockmodeName(locallock->tag.lock.locktag_lockmethodid,
+                                          lockmode);
+
+               /* Gather a list of all lock holders and waiters */
+               LWLockAcquire(partitionLock, LW_SHARED);
+               GetLockHoldersAndWaiters(locallock, &lock_holders_sbuf,
+                                        &lock_waiters_sbuf, &lockHoldersNum);
+               LWLockRelease(partitionLock);
+
+               ereport(LOG,
+                       (errmsg("process %d could not obtain %s on %s",
+                               MyProcPid, modename, buf.data),
+                        errdetail_log_plural(
+                                             "Process holding the lock: %s, Wait queue: %s.",
+                                             "Processes holding the lock: %s, Wait queue: %s.",
+                                             lockHoldersNum,
+                                             lock_holders_sbuf.data,
+                                             lock_waiters_sbuf.data)));
+
+               pfree(buf.data);
+               pfree(lock_holders_sbuf.data);
+               pfree(lock_waiters_sbuf.data);
+           }
            if (locallockp)
                *locallockp = NULL;
            return LOCKACQUIRE_NOT_AVAIL;
 
            long        secs;
            int         usecs;
            long        msecs;
-           dlist_iter  proc_iter;
-           PROCLOCK   *curproclock;
-           bool        first_holder = true,
-                       first_waiter = true;
            int         lockHoldersNum = 0;
 
            initStringInfo(&buf);
            msecs = secs * 1000 + usecs / 1000;
            usecs = usecs % 1000;
 
-           /*
-            * we loop over the lock's procLocks to gather a list of all
-            * holders and waiters. Thus we will be able to provide more
-            * detailed information for lock debugging purposes.
-            *
-            * lock->procLocks contains all processes which hold or wait for
-            * this lock.
-            */
-
+           /* Gather a list of all lock holders and waiters */
            LWLockAcquire(partitionLock, LW_SHARED);
-
-           dlist_foreach(proc_iter, &lock->procLocks)
-           {
-               curproclock =
-                   dlist_container(PROCLOCK, lockLink, proc_iter.cur);
-
-               /*
-                * we are a waiter if myProc->waitProcLock == curproclock; we
-                * are a holder if it is NULL or something different
-                */
-               if (curproclock->tag.myProc->waitProcLock == curproclock)
-               {
-                   if (first_waiter)
-                   {
-                       appendStringInfo(&lock_waiters_sbuf, "%d",
-                                        curproclock->tag.myProc->pid);
-                       first_waiter = false;
-                   }
-                   else
-                       appendStringInfo(&lock_waiters_sbuf, ", %d",
-                                        curproclock->tag.myProc->pid);
-               }
-               else
-               {
-                   if (first_holder)
-                   {
-                       appendStringInfo(&lock_holders_sbuf, "%d",
-                                        curproclock->tag.myProc->pid);
-                       first_holder = false;
-                   }
-                   else
-                       appendStringInfo(&lock_holders_sbuf, ", %d",
-                                        curproclock->tag.myProc->pid);
-
-                   lockHoldersNum++;
-               }
-           }
-
+           GetLockHoldersAndWaiters(locallock, &lock_holders_sbuf,
+                                    &lock_waiters_sbuf, &lockHoldersNum);
            LWLockRelease(partitionLock);
 
            if (deadlock_state == DS_SOFT_DEADLOCK)
    errno = save_errno;
 }
 
+/*
+ * GetLockHoldersAndWaiters - get lock holders and waiters for a lock
+ *
+ * Fill lock_holders_sbuf and lock_waiters_sbuf with the PIDs of processes holding
+ * and waiting for the lock, and set lockHoldersNum to the number of lock holders.
+ *
+ * The lock table's partition lock must be held on entry and remains held on exit.
+ */
+void
+GetLockHoldersAndWaiters(LOCALLOCK *locallock, StringInfo lock_holders_sbuf,
+                        StringInfo lock_waiters_sbuf, int *lockHoldersNum)
+{
+   dlist_iter  proc_iter;
+   PROCLOCK   *curproclock;
+   LOCK       *lock = locallock->lock;
+   bool        first_holder = true,
+               first_waiter = true;
+
+#ifdef USE_ASSERT_CHECKING
+   {
+       uint32      hashcode = locallock->hashcode;
+       LWLock     *partitionLock = LockHashPartitionLock(hashcode);
+
+       Assert(LWLockHeldByMe(partitionLock));
+   }
+#endif
+
+   *lockHoldersNum = 0;
+
+   /*
+    * Loop over the lock's procLocks to gather a list of all holders and
+    * waiters. Thus we will be able to provide more detailed information for
+    * lock debugging purposes.
+    *
+    * lock->procLocks contains all processes which hold or wait for this
+    * lock.
+    */
+   dlist_foreach(proc_iter, &lock->procLocks)
+   {
+       curproclock =
+           dlist_container(PROCLOCK, lockLink, proc_iter.cur);
+
+       /*
+        * We are a waiter if myProc->waitProcLock == curproclock; we are a
+        * holder if it is NULL or something different.
+        */
+       if (curproclock->tag.myProc->waitProcLock == curproclock)
+       {
+           if (first_waiter)
+           {
+               appendStringInfo(lock_waiters_sbuf, "%d",
+                                curproclock->tag.myProc->pid);
+               first_waiter = false;
+           }
+           else
+               appendStringInfo(lock_waiters_sbuf, ", %d",
+                                curproclock->tag.myProc->pid);
+       }
+       else
+       {
+           if (first_holder)
+           {
+               appendStringInfo(lock_holders_sbuf, "%d",
+                                curproclock->tag.myProc->pid);
+               first_holder = false;
+           }
+           else
+               appendStringInfo(lock_holders_sbuf, ", %d",
+                                curproclock->tag.myProc->pid);
+
+           (*lockHoldersNum)++;
+       }
+   }
+}
+
 /*
  * ProcWaitForSignal - wait for a signal from another backend.
  *
 
        false,
        NULL, NULL, NULL
    },
+   {
+       {"log_lock_failure", PGC_SUSET, LOGGING_WHAT,
+           gettext_noop("Logs lock failures."),
+           NULL
+       },
+       &log_lock_failure,
+       false,
+       NULL, NULL, NULL
+   },
    {
        {"log_recovery_conflict_waits", PGC_SIGHUP, LOGGING_WHAT,
            gettext_noop("Logs standby recovery conflict waits."),
 
                    #   %% = '%'
                    # e.g. '<%u%%%d> '
 #log_lock_waits = off          # log lock waits >= deadlock_timeout
+#log_lock_failure = off        # log lock failures
 #log_recovery_conflict_waits = off # log standby recovery conflict waits
                    # >= deadlock_timeout
 #log_parameter_max_length = -1     # when logging statements, limit logged
 
 /* Lock a tuple (see heap_lock_tuple before assuming you understand this) */
 extern void LockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode);
 extern bool ConditionalLockTuple(Relation relation, ItemPointer tid,
-                                LOCKMODE lockmode);
+                                LOCKMODE lockmode, bool logLockFailure);
 extern void UnlockTuple(Relation relation, ItemPointer tid, LOCKMODE lockmode);
 
 /* Lock an XID (used to wait for a transaction to finish) */
 extern void XactLockTableDelete(TransactionId xid);
 extern void XactLockTableWait(TransactionId xid, Relation rel,
                              ItemPointer ctid, XLTW_Oper oper);
-extern bool ConditionalXactLockTableWait(TransactionId xid);
+extern bool ConditionalXactLockTableWait(TransactionId xid,
+                                        bool logLockFailure);
 
 /* Lock VXIDs, specified by conflicting locktags */
 extern void WaitForLockers(LOCKTAG heaplocktag, LOCKMODE lockmode, bool progress);
 
 
 /* GUC variables */
 extern PGDLLIMPORT int max_locks_per_xact;
+extern PGDLLIMPORT bool log_lock_failure;
 
 #ifdef LOCK_DEBUG
 extern PGDLLIMPORT int Trace_lock_oidmin;
                                             bool sessionLock,
                                             bool dontWait,
                                             bool reportMemoryError,
-                                            LOCALLOCK **locallockp);
+                                            LOCALLOCK **locallockp,
+                                            bool logLockFailure);
 extern void AbortStrongLockAcquire(void);
 extern void MarkLockClear(LOCALLOCK *locallock);
 extern bool LockRelease(const LOCKTAG *locktag,
 
 extern void ProcLockWakeup(LockMethod lockMethodTable, LOCK *lock);
 extern void CheckDeadLockAlert(void);
 extern void LockErrorCleanup(void);
+extern void GetLockHoldersAndWaiters(LOCALLOCK *locallock,
+                                    StringInfo lock_holders_sbuf,
+                                    StringInfo lock_waiters_sbuf,
+                                    int *lockHoldersNum);
 
 extern void ProcWaitForSignal(uint32 wait_event_info);
 extern void ProcSendSignal(ProcNumber procNumber);