--- /dev/null
+/*
+ * pg_ringflush.c
+ *
+ * Background worker process to flush ring buffers
+ */
+
+#include "postgres.h"
+
+#include "fmgr.h"
+#include "access/hio.h"
+#include "postmaster/bgworker.h"
+#include "storage/buf.h"
+#include "storage/bufmgr.h"
+#include "storage/ipc.h"
+#include "storage/lwlock.h"
+#include "storage/proc.h"
+#include "storage/shmem.h"
+
+#define REQUEST_QUEUE_SIZE 8192
+
+typedef struct
+{
+ LWLockId lock;
+ PGPROC *flushproc;
+ uint64 nrequest;
+ uint64 ndone;
+ uint64 ndropped;
+ Buffer data[REQUEST_QUEUE_SIZE];
+} RingFlushQueue;
+
+/* Hey, look, I'm a loadable module! */
+PG_MODULE_MAGIC;
+
+/* Variable declarations. */
+static shmem_startup_hook_type prev_shmem_startup_hook;
+static ring_drop_hook_type prev_ring_drop_hook;
+static bool got_sigterm;
+static RingFlushQueue *pg_ringflush_queue;
+static LWLockId lwlockid;
+
+/* Function prototypes. */
+extern void _PG_init(void);
+static void pg_ringflush_sigusr1(SIGNAL_ARGS);
+
+/*
+ * Entrypoint for background process.
+ */
+static void
+pg_ringflush_main(void *main_arg)
+{
+ Buffer *current;
+
+ /*
+ * The bgworker code sets SIGUSR1 to be ignored; without the following,
+ * latches don't work.
+ */
+ pqsignal(SIGUSR1, pg_ringflush_sigusr1);
+
+ /* We're now ready to receive signals */
+ BackgroundWorkerUnblockSignals();
+
+ /* Allocate working space for flush requests. */
+ current = palloc(REQUEST_QUEUE_SIZE * sizeof(Buffer));
+
+ /* Advertise availability of flushing. */
+ pg_ringflush_queue->flushproc = MyProc;
+
+ /* Main loop. */
+ while (!got_sigterm)
+ {
+ int rc;
+ uint64 npending;
+ uint64 nrequest;
+ uint64 i;
+
+ /* Wait until signaled. */
+ rc = WaitLatch(&MyProc->procLatch,
+ WL_LATCH_SET | WL_POSTMASTER_DEATH,
+ 0L);
+ ResetLatch(&MyProc->procLatch);
+
+ /* Die immediately if postmaster exited. */
+ if (rc & WL_POSTMASTER_DEATH)
+ proc_exit(1);
+
+ /* Lock the queue and extract the data we need. */
+ LWLockAcquire(lwlockid, LW_EXCLUSIVE);
+ nrequest = pg_ringflush_queue->nrequest;
+ npending = nrequest - pg_ringflush_queue->ndone;
+ if (npending > REQUEST_QUEUE_SIZE)
+ {
+ /* Crap, the queue wrapped around. */
+ pg_ringflush_queue->ndropped += npending - REQUEST_QUEUE_SIZE;
+ pg_ringflush_queue->ndone += npending - REQUEST_QUEUE_SIZE;
+ npending = REQUEST_QUEUE_SIZE;
+ }
+ for (i = 0; i < npending; ++i)
+ {
+ uint64 offset = (nrequest - i - 1) % REQUEST_QUEUE_SIZE;
+ current[i] = pg_ringflush_queue->data[offset];
+ }
+ pg_ringflush_queue->ndone += npending;
+ LWLockRelease(lwlockid);
+
+ /* OK, we've got a list. Flush 'em. */
+ for (i = 0; i < npending && !got_sigterm; ++i)
+ SyncOneBuffer(current[i], false);
+ }
+
+ /* Advertise unavailability of flushing. */
+ pg_ringflush_queue->flushproc = NULL;
+
+ proc_exit(0);
+}
+
+/*
+ * Standard SIGTERM handler: set a flag and our process latch so that we die,
+ * but make sure not to stomp on errno.
+ */
+static void
+pg_ringflush_sigterm(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ got_sigterm = true;
+ if (MyProc != NULL)
+ SetLatch(&MyProc->procLatch);
+
+ errno = save_errno;
+}
+
+/*
+ * Standard SIGHUP handler: set our process latch so that we notice the signal,
+ * but make sure not to stomp on errno.
+ */
+static void
+pg_ringflush_sighup(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ if (MyProc != NULL)
+ SetLatch(&MyProc->procLatch);
+
+ errno = save_errno;
+}
+
+/*
+ * SIGUSR1 handler: latch processing.
+ */
+static void
+pg_ringflush_sigusr1(SIGNAL_ARGS)
+{
+ int save_errno = errno;
+
+ latch_sigusr1_handler();
+
+ errno = save_errno;
+}
+
+/*
+ * Ring drop callback.
+ */
+static void
+pg_ringflush_ring_drop_hook(Buffer buffer)
+{
+ uint64 nrequest;
+ PGPROC *bgproc;
+
+ if (prev_ring_drop_hook != NULL)
+ prev_ring_drop_hook(buffer);
+
+ if (pg_ringflush_queue == NULL)
+ return;
+
+ Assert(lwlockid != 0);
+ if (!LWLockConditionalAcquire(lwlockid, LW_EXCLUSIVE))
+ return;
+
+ nrequest = pg_ringflush_queue->nrequest;
+ pg_ringflush_queue->data[nrequest % REQUEST_QUEUE_SIZE] = buffer;
+ pg_ringflush_queue->nrequest = nrequest + 1;
+ bgproc = pg_ringflush_queue->flushproc;
+
+ LWLockRelease(lwlockid);
+
+ /* Kick him. */
+ if (bgproc != NULL)
+ SetLatch(&bgproc->procLatch);
+}
+
+/*
+ * Shared memory startup callback.
+ */
+static void
+pg_ringflush_shmem_startup(void)
+{
+ bool found;
+
+ if (prev_shmem_startup_hook)
+ prev_shmem_startup_hook();
+
+ LWLockAcquire(AddinShmemInitLock, LW_EXCLUSIVE);
+ pg_ringflush_queue = ShmemInitStruct("pg_ringflush",
+ sizeof(RingFlushQueue), &found);
+ if (!found)
+ pg_ringflush_queue->lock = LWLockAssign();
+ lwlockid = pg_ringflush_queue->lock;
+ LWLockRelease(AddinShmemInitLock);
+}
+
+/*
+ * Module load callback.
+ */
+void
+_PG_init(void)
+{
+ BackgroundWorker worker;
+
+ /* Register background worker. */
+ worker.bgw_name = "pg_ringflush";
+ worker.bgw_flags = BGWORKER_SHMEM_ACCESS;
+ worker.bgw_start_time = BgWorkerStart_ConsistentState;
+ worker.bgw_restart_time = 10;
+ worker.bgw_main = pg_ringflush_main;
+ worker.bgw_main_arg = NULL;
+ worker.bgw_sighup = pg_ringflush_sighup;
+ worker.bgw_sigterm = pg_ringflush_sigterm;
+ RegisterBackgroundWorker(&worker);
+
+ /* Request shared memory space and lwlock. */
+ RequestAddinShmemSpace(sizeof(RingFlushQueue));
+ RequestAddinLWLocks(1);
+
+ /* Set shared-memory startup hook. */
+ prev_shmem_startup_hook = shmem_startup_hook;
+ shmem_startup_hook = pg_ringflush_shmem_startup;
+
+ /* Set ring drop hook. */
+ prev_ring_drop_hook = ring_drop_hook;
+ ring_drop_hook = pg_ringflush_ring_drop_hook;
+}