Toy background worker, implementing flush-behind logic.
authorRobert Haas <[email protected]>
Wed, 10 Apr 2013 03:00:23 +0000 (23:00 -0400)
committerRobert Haas <[email protected]>
Wed, 10 Apr 2013 03:00:23 +0000 (23:00 -0400)
contrib/pg_ringflush/Makefile [new file with mode: 0644]
contrib/pg_ringflush/pg_ringflush.c [new file with mode: 0644]
src/backend/access/heap/hio.c
src/backend/storage/buffer/bufmgr.c
src/include/access/hio.h
src/include/storage/bufmgr.h

diff --git a/contrib/pg_ringflush/Makefile b/contrib/pg_ringflush/Makefile
new file mode 100644 (file)
index 0000000..3f34bc9
--- /dev/null
@@ -0,0 +1,14 @@
+# contrib/worker_spi/Makefile
+
+MODULES = pg_ringflush
+
+ifdef USE_PGXS
+PG_CONFIG = pg_config
+PGXS := $(shell $(PG_CONFIG) --pgxs)
+include $(PGXS)
+else
+subdir = contrib/pg_ringflush
+top_builddir = ../..
+include $(top_builddir)/src/Makefile.global
+include $(top_srcdir)/contrib/contrib-global.mk
+endif
diff --git a/contrib/pg_ringflush/pg_ringflush.c b/contrib/pg_ringflush/pg_ringflush.c
new file mode 100644 (file)
index 0000000..e3c4589
--- /dev/null
@@ -0,0 +1,241 @@
+/*
+ * pg_ringflush.c
+ *
+ * Background worker process to flush ring buffers
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "access/hio.h"
+#include "postmaster/bgworker.h"
+#include "storage/buf.h"
+#include "storage/bufmgr.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+
+#define REQUEST_QUEUE_SIZE                     8192
+
+typedef struct
+{
+       LWLockId                lock;
+       PGPROC             *flushproc;
+       uint64                  nrequest;
+       uint64                  ndone;
+       uint64                  ndropped;
+       Buffer                  data[REQUEST_QUEUE_SIZE];
+} RingFlushQueue;
+
+/* Hey, look, I'm a loadable module! */
+PG_MODULE_MAGIC;
+
+/* Variable declarations. */
+static shmem_startup_hook_type prev_shmem_startup_hook;
+static ring_drop_hook_type prev_ring_drop_hook;
+static bool                                    got_sigterm;
+static RingFlushQueue     *pg_ringflush_queue;
+static LWLockId                                lwlockid;
+
+/* Function prototypes. */
+extern void            _PG_init(void);
+static void            pg_ringflush_sigusr1(SIGNAL_ARGS);
+
+/*
+ * Entrypoint for background process.
+ */
+static void
+pg_ringflush_main(void *main_arg)
+{
+       Buffer             *current;
+
+       /*
+        * The bgworker code sets SIGUSR1 to be ignored; without the following,
+        * latches don't work.
+        */
+       pqsignal(SIGUSR1, pg_ringflush_sigusr1);
+
+       /* We're now ready to receive signals */
+       BackgroundWorkerUnblockSignals();
+
+       /* Allocate working space for flush requests. */
+       current = palloc(REQUEST_QUEUE_SIZE * sizeof(Buffer));
+
+       /* Advertise availability of flushing. */
+       pg_ringflush_queue->flushproc = MyProc;
+
+       /* Main loop. */
+       while (!got_sigterm)
+       {
+               int             rc;
+               uint64  npending;
+               uint64  nrequest;
+               uint64  i;
+
+               /* Wait until signaled. */
+               rc = WaitLatch(&MyProc->procLatch,
+                                          WL_LATCH_SET | WL_POSTMASTER_DEATH,
+                                          0L);
+               ResetLatch(&MyProc->procLatch);
+
+               /* Die immediately if postmaster exited. */
+               if (rc & WL_POSTMASTER_DEATH)
+                       proc_exit(1);
+
+               /* Lock the queue and extract the data we need. */
+               LWLockAcquire(lwlockid, LW_EXCLUSIVE);
+               nrequest = pg_ringflush_queue->nrequest;
+               npending = nrequest - pg_ringflush_queue->ndone;
+               if (npending > REQUEST_QUEUE_SIZE)
+               {
+                       /* Crap, the queue wrapped around. */
+                       pg_ringflush_queue->ndropped += npending - REQUEST_QUEUE_SIZE;
+                       pg_ringflush_queue->ndone += npending - REQUEST_QUEUE_SIZE;
+                       npending = REQUEST_QUEUE_SIZE;
+               }
+               for (i = 0; i < npending; ++i)
+               {
+                       uint64          offset = (nrequest - i - 1) % REQUEST_QUEUE_SIZE;
+                       current[i] = pg_ringflush_queue->data[offset];
+               }
+               pg_ringflush_queue->ndone += npending;
+               LWLockRelease(lwlockid);
+
+               /* OK, we've got a list.  Flush 'em. */
+               for (i = 0; i < npending && !got_sigterm; ++i)
+                       SyncOneBuffer(current[i], false);
+       }
+
+       /* Advertise unavailability of flushing. */
+       pg_ringflush_queue->flushproc = NULL;
+
+       proc_exit(0);
+}
+
+/*
+ * Standard SIGTERM handler: set a flag and our process latch so that we die,
+ * but make sure not to stomp on errno.
+ */
+static void
+pg_ringflush_sigterm(SIGNAL_ARGS)
+{
+       int             save_errno = errno;
+
+       got_sigterm = true;
+       if (MyProc != NULL)
+               SetLatch(&MyProc->procLatch);
+
+       errno = save_errno;
+}
+
+/*
+ * Standard SIGHUP handler: set our process latch so that we notice the signal,
+ * but make sure not to stomp on errno.
+ */
+static void
+pg_ringflush_sighup(SIGNAL_ARGS)
+{
+       int             save_errno = errno;
+
+       if (MyProc != NULL)
+               SetLatch(&MyProc->procLatch);
+
+       errno = save_errno;
+}
+
+/*
+ * SIGUSR1 handler: latch processing.
+ */
+static void
+pg_ringflush_sigusr1(SIGNAL_ARGS)
+{
+       int                     save_errno = errno;
+
+       latch_sigusr1_handler();
+
+       errno = save_errno;
+}
+
+/*
+ * Ring drop callback.
+ */
+static void
+pg_ringflush_ring_drop_hook(Buffer buffer)
+{
+       uint64          nrequest;
+       PGPROC     *bgproc;
+
+       if (prev_ring_drop_hook != NULL)
+               prev_ring_drop_hook(buffer);
+
+       if (pg_ringflush_queue == NULL)
+               return;
+
+       Assert(lwlockid != 0);
+       if (!LWLockConditionalAcquire(lwlockid, LW_EXCLUSIVE))
+               return;
+
+       nrequest = pg_ringflush_queue->nrequest;
+       pg_ringflush_queue->data[nrequest % REQUEST_QUEUE_SIZE] = buffer;
+       pg_ringflush_queue->nrequest = nrequest + 1;
+       bgproc = pg_ringflush_queue->flushproc;
+
+       LWLockRelease(lwlockid);
+
+       /* Kick him. */
+       if (bgproc != NULL)
+               SetLatch(&bgproc->procLatch);
+}
+
+/*
+ * Shared memory startup callback.
+ */
+static void
+pg_ringflush_shmem_startup(void)
+{
+       bool            found;
+
+       if (prev_shmem_startup_hook)
+               prev_shmem_startup_hook();
+
+       LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+       pg_ringflush_queue = ShmemInitStruct("pg_ringflush",
+                                                                                sizeof(RingFlushQueue), &found);
+       if (!found)
+               pg_ringflush_queue->lock = LWLockAssign();
+       lwlockid = pg_ringflush_queue->lock;
+       LWLockRelease(AddinShmemInitLock);
+}
+
+/*
+ * Module load callback.
+ */
+void
+_PG_init(void)
+{
+       BackgroundWorker        worker;
+
+       /* Register background worker. */
+       worker.bgw_name = "pg_ringflush";
+       worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
+       worker.bgw_start_time = BgWorkerStart_ConsistentState;
+       worker.bgw_restart_time = 10;
+       worker.bgw_main = pg_ringflush_main;
+       worker.bgw_main_arg = NULL;
+       worker.bgw_sighup = pg_ringflush_sighup;
+       worker.bgw_sigterm = pg_ringflush_sigterm;
+       RegisterBackgroundWorker(&worker);
+
+       /* Request shared memory space and lwlock. */
+       RequestAddinShmemSpace(sizeof(RingFlushQueue));
+       RequestAddinLWLocks(1);
+
+       /* Set shared-memory startup hook. */
+       prev_shmem_startup_hook = shmem_startup_hook;
+       shmem_startup_hook = pg_ringflush_shmem_startup;
+
+       /* Set ring drop hook. */
+       prev_ring_drop_hook = ring_drop_hook;
+       ring_drop_hook = pg_ringflush_ring_drop_hook;
+}
index 8da26908daff45a40dca113148fe9f3ad3430efc..d5d53d34c756187df6e9efb2c906e78dc434a2de 100644 (file)
@@ -24,6 +24,8 @@
 #include "storage/lmgr.h"
 #include "storage/smgr.h"
 
+ring_drop_hook_type ring_drop_hook;
+
 
 /*
  * RelationPutHeapTuple - place tuple at specified page
@@ -83,6 +85,8 @@ ReadBufferBI(Relation relation, BlockNumber targetBlock,
                }
                /* ... else drop the old buffer */
                ReleaseBuffer(bistate->current_buf);
+               if (ring_drop_hook != NULL)
+                       ring_drop_hook(bistate->current_buf);
                bistate->current_buf = InvalidBuffer;
        }
 
index 1c414281ae50270a2c8ca2572b5af66f96d6549f..c646b8ff1c6f0cbeeb4543d7cc08be39c50b4193 100644 (file)
@@ -95,7 +95,6 @@ static bool PinBuffer(volatile BufferDesc *buf, BufferAccessStrategy strategy);
 static void PinBuffer_Locked(volatile BufferDesc *buf);
 static void UnpinBuffer(volatile BufferDesc *buf, bool fixOwner);
 static void BufferSync(int flags);
-static int     SyncOneBuffer(int buf_id, bool skip_recently_used);
 static void WaitIO(volatile BufferDesc *buf);
 static bool StartBufferIO(volatile BufferDesc *buf, bool forInput);
 static void TerminateBufferIO(volatile BufferDesc *buf, bool clear_dirty,
@@ -1648,7 +1647,7 @@ BgBufferSync(void)
  *
  * Note: caller must have done ResourceOwnerEnlargeBuffers.
  */
-static int
+int
 SyncOneBuffer(int buf_id, bool skip_recently_used)
 {
        volatile BufferDesc *bufHdr = &BufferDescriptors[buf_id];
index 6d61b4226a39ea75066425318f98b4c07adfc560..f30b24c2b855ad2b884fde89b3ee840b7e1b6481 100644 (file)
@@ -19,6 +19,8 @@
 #include "utils/relcache.h"
 #include "storage/buf.h"
 
+typedef void (*ring_drop_hook_type)(Buffer);
+extern ring_drop_hook_type ring_drop_hook;
 
 /*
  * state for bulk inserts --- private to heapam.c and hio.c
index 9be18608426c0315c820b91438bc4f78f941859a..a9531fdee2fe40183a8936ee3008f89bf3399d3c 100644 (file)
@@ -214,6 +214,7 @@ extern bool ConditionalLockBufferForCleanup(Buffer buffer);
 extern bool HoldingBufferPinThatDelaysRecovery(void);
 
 extern void AbortBufferIO(void);
+extern int     SyncOneBuffer(int buf_id, bool skip_recently_used);
 
 extern void BufmgrCommit(void);
 extern bool BgBufferSync(void);