Fix invalid memory access during the shutdown of the parallel apply worker.
authorAmit Kapila <[email protected]>
Tue, 9 May 2023 03:58:06 +0000 (09:28 +0530)
committerAmit Kapila <[email protected]>
Tue, 9 May 2023 03:58:06 +0000 (09:28 +0530)
The callback function pa_shutdown() accesses MyLogicalRepWorker, which may
not be initialized if there is an error during the initialization of the
parallel apply worker. The other problem is that, even when the worker is
initialized successfully, MyLogicalRepWorker may already have been reset by
another on-exit callback (logicalrep_worker_onexit) by the time pa_shutdown()
is invoked, so it won't have the required information.

To fix this, register the shutdown callback after we are attached to the
worker slot.

After this fix, we observed another issue which is that sometimes the
leader apply worker tries to receive the message from the error queue that
might already be detached by the parallel apply worker leading to an
error. To prevent such an error, we ensure that the leader apply worker
detaches from the parallel apply worker's error queue before stopping it.

Reported-by: Sawada Masahiko
Author: Hou Zhijie
Reviewed-by: Sawada Masahiko, Amit Kapila
Discussion: https://postgr.es/m/CAD21AoDo+yUwNq6nTrvE2h9bB2vZfcag=jxWc7QxuWCmkDAqcA@mail.gmail.com

src/backend/replication/logical/applyparallelworker.c
src/backend/replication/logical/launcher.c
src/include/replication/worker_internal.h

index ee7a18137fcee16d606d189ec8aa7a4b8babd76b..82c1ddcdcbf83143d24ebaa397f661cc642a4eb1 100644 (file)
@@ -577,16 +577,7 @@ pa_free_worker(ParallelApplyWorkerInfo *winfo)
                list_length(ParallelApplyWorkerPool) >
                (max_parallel_apply_workers_per_subscription / 2))
        {
-               int                     slot_no;
-               uint16          generation;
-
-               SpinLockAcquire(&winfo->shared->mutex);
-               generation = winfo->shared->logicalrep_worker_generation;
-               slot_no = winfo->shared->logicalrep_worker_slot_no;
-               SpinLockRelease(&winfo->shared->mutex);
-
-               logicalrep_pa_worker_stop(slot_no, generation);
-
+               logicalrep_pa_worker_stop(winfo);
                pa_free_worker_info(winfo);
 
                return;
@@ -636,8 +627,11 @@ pa_detach_all_error_mq(void)
        {
                ParallelApplyWorkerInfo *winfo = (ParallelApplyWorkerInfo *) lfirst(lc);
 
-               shm_mq_detach(winfo->error_mq_handle);
-               winfo->error_mq_handle = NULL;
+               if (winfo->error_mq_handle)
+               {
+                       shm_mq_detach(winfo->error_mq_handle);
+                       winfo->error_mq_handle = NULL;
+               }
        }
 }
 
@@ -845,6 +839,9 @@ LogicalParallelApplyLoop(shm_mq_handle *mqh)
  * Make sure the leader apply worker tries to read from our error queue one more
  * time. This guards against the case where we exit uncleanly without sending
  * an ErrorResponse, for example because some code calls proc_exit directly.
+ *
+ * Also explicitly detach from dsm segment to invoke on_dsm_detach callbacks,
+ * if any. See ParallelWorkerShutdown for details.
  */
 static void
 pa_shutdown(int code, Datum arg)
@@ -901,8 +898,6 @@ ParallelApplyWorkerMain(Datum main_arg)
                                (errcode(ERRCODE_OBJECT_NOT_IN_PREREQUISITE_STATE),
                                 errmsg("bad magic number in dynamic shared memory segment")));
 
-       before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
-
        /* Look up the shared information. */
        shared = shm_toc_lookup(toc, PARALLEL_APPLY_KEY_SHARED, false);
        MyParallelShared = shared;
@@ -921,6 +916,13 @@ ParallelApplyWorkerMain(Datum main_arg)
         */
        logicalrep_worker_attach(worker_slot);
 
+       /*
+        * Register the shutdown callback after we are attached to the worker
+        * slot. This is to ensure that MyLogicalRepWorker remains valid when this
+        * callback is invoked.
+        */
+       before_shmem_exit(pa_shutdown, PointerGetDatum(seg));
+
        SpinLockAcquire(&MyParallelShared->mutex);
        MyParallelShared->logicalrep_worker_generation = MyLogicalRepWorker->generation;
        MyParallelShared->logicalrep_worker_slot_no = worker_slot;
index ceea1262315243a937d4fd7a5ce109dee2a3c5ef..87b5593d2db166c861a5bfd886a05d863e113346 100644 (file)
@@ -609,19 +609,37 @@ logicalrep_worker_stop(Oid subid, Oid relid)
 }
 
 /*
- * Stop the logical replication parallel apply worker corresponding to the
- * input slot number.
+ * Stop the given logical replication parallel apply worker.
  *
  * Note that the function sends SIGINT instead of SIGTERM to the parallel apply
  * worker so that the worker exits cleanly.
  */
 void
-logicalrep_pa_worker_stop(int slot_no, uint16 generation)
+logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
 {
+       int                     slot_no;
+       uint16          generation;
        LogicalRepWorker *worker;
 
+       SpinLockAcquire(&winfo->shared->mutex);
+       generation = winfo->shared->logicalrep_worker_generation;
+       slot_no = winfo->shared->logicalrep_worker_slot_no;
+       SpinLockRelease(&winfo->shared->mutex);
+
        Assert(slot_no >= 0 && slot_no < max_logical_replication_workers);
 
+       /*
+        * Detach from the error_mq_handle for the parallel apply worker before
+        * stopping it. This prevents the leader apply worker from trying to
+        * receive the message from the error queue that might already be detached
+        * by the parallel apply worker.
+        */
+       if (winfo->error_mq_handle)
+       {
+               shm_mq_detach(winfo->error_mq_handle);
+               winfo->error_mq_handle = NULL;
+       }
+
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
        worker = &LogicalRepCtx->workers[slot_no];
index b57eed052f631ffe57fcdc30f3231c7542382bcc..343e7818965aa94eedf95eb32ddd6cdb048ced92 100644 (file)
@@ -235,7 +235,7 @@ extern bool logicalrep_worker_launch(Oid dbid, Oid subid, const char *subname,
                                                                         Oid userid, Oid relid,
                                                                         dsm_handle subworker_dsm);
 extern void logicalrep_worker_stop(Oid subid, Oid relid);
-extern void logicalrep_pa_worker_stop(int slot_no, uint16 generation);
+extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
 extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);