#include "utils/builtins.h"
 #include "utils/memutils.h"
 #include "utils/ps_status.h"
+#include "utils/snapmgr.h"
 #include "utils/timestamp.h"
+#include "utils/tqual.h"
 
 
 /*
 static void asyncQueueReadAllNotifications(void);
 static bool asyncQueueProcessPageEntries(volatile QueuePosition *current,
                             QueuePosition stop,
-                            char *page_buffer);
+                            char *page_buffer,
+                            Snapshot snapshot);
 static void asyncQueueAdvanceTail(void);
 static void ProcessIncomingNotify(void);
 static void NotifyMyFrontEnd(const char *channel,
        }
    }
 
-   /* Queue any pending notifies */
+   /* Queue any pending notifies (must happen after the above) */
    if (pendingNotifies)
    {
        ListCell   *nextNotify;
     * have already committed before we started to LISTEN.
     *
     * Note that we are not yet listening on anything, so we won't deliver any
-    * notification to the frontend.
+    * notification to the frontend.  Also, although our transaction might
+    * have executed NOTIFY, those message(s) aren't queued yet so we can't
+    * see them in the queue.
     *
     * This will also advance the global tail pointer if possible.
     */
    volatile QueuePosition pos;
    QueuePosition oldpos;
    QueuePosition head;
+   Snapshot    snapshot;
    bool        advanceTail;
 
    /* page_buffer must be adequately aligned, so use a union */
        return;
    }
 
+   /* Get snapshot we'll use to decide which xacts are still in progress */
+   snapshot = RegisterSnapshot(GetLatestSnapshot());
+
    /*----------
     * Note that we deliver everything that we see in the queue and that
     * matches our _current_ listening state.
             * while sending the notifications to the frontend.
             */
            reachedStop = asyncQueueProcessPageEntries(&pos, head,
-                                                      page_buffer.buf);
+                                                      page_buffer.buf,
+                                                      snapshot);
        } while (!reachedStop);
    }
    PG_CATCH();
    /* If we were the laziest backend, try to advance the tail pointer */
    if (advanceTail)
        asyncQueueAdvanceTail();
+
+   /* Done with snapshot */
+   UnregisterSnapshot(snapshot);
 }
 
 /*
 static bool
 asyncQueueProcessPageEntries(volatile QueuePosition *current,
                             QueuePosition stop,
-                            char *page_buffer)
+                            char *page_buffer,
+                            Snapshot snapshot)
 {
    bool        reachedStop = false;
    bool        reachedEndOfPage;
        /* Ignore messages destined for other databases */
        if (qe->dboid == MyDatabaseId)
        {
-           if (TransactionIdIsInProgress(qe->xid))
+           if (XidInMVCCSnapshot(qe->xid, snapshot))
            {
                /*
                 * The source transaction is still in progress, so we can't
                 * this advance-then-back-up behavior when dealing with an
                 * uncommitted message.)
                 *
-                * Note that we must test TransactionIdIsInProgress before we
-                * test TransactionIdDidCommit, else we might return a message
-                * from a transaction that is not yet visible to snapshots;
-                * compare the comments at the head of tqual.c.
+                * Note that we must test XidInMVCCSnapshot before we test
+                * TransactionIdDidCommit, else we might return a message from
+                * a transaction that is not yet visible to snapshots; compare
+                * the comments at the head of tqual.c.
+                *
+                * Also, while our own xact won't be listed in the snapshot,
+                * we need not check for TransactionIdIsCurrentTransactionId
+                * because our transaction cannot (yet) have queued any
+                * messages.
                 */
                *current = thisentry;
                reachedStop = true;
 
 SnapshotData SnapshotAnyData = {HeapTupleSatisfiesAny};
 SnapshotData SnapshotToastData = {HeapTupleSatisfiesToast};
 
-/* local functions */
-static bool XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot);
 
 
 /*
  *
  * Note: GetSnapshotData never stores either top xid or subxids of our own
  * backend into a snapshot, so these xids will not be reported as "running"
- * by this function.  This is OK for current uses, because we actually only
- * apply this for known-committed XIDs.
+ * by this function.  This is OK for current uses, because we always check
+ * TransactionIdIsCurrentTransactionId first, except when it's known the
+ * XID could not be ours anyway.
  */
-static bool
+bool
 XidInMVCCSnapshot(TransactionId xid, Snapshot snapshot)
 {
    uint32      i;