Fix assertion failure in apply worker.
author Amit Kapila <[email protected]>
Wed, 3 May 2023 04:43:13 +0000 (10:13 +0530)
committer Amit Kapila <[email protected]>
Wed, 3 May 2023 04:47:49 +0000 (10:17 +0530)
During exit, the logical replication apply worker tries to release session
level locks, if any. However, if the apply worker exits due to an error
before its connection is initialized, trying to release locks can lead to
an assertion failure. The locks are only acquired once the worker is
initialized, so we don't need to release them until the worker
initialization is complete.

Reported-by: Alexander Lakhin
Author: Hou Zhijie based on inputs from Sawada Masahiko and Amit Kapila
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/2185d65f-5aae-3efa-c48f-fb42b173ef5c@gmail.com

src/backend/replication/logical/applyparallelworker.c
src/backend/replication/logical/launcher.c
src/backend/replication/logical/worker.c
src/include/replication/worker_internal.h

index 45186837795e63f1c53926dedc571cdf02bbdad4..ee7a18137fcee16d606d189ec8aa7a4b8babd76b 100644 (file)
@@ -873,6 +873,8 @@ ParallelApplyWorkerMain(Datum main_arg)
        int                     worker_slot = DatumGetInt32(main_arg);
        char            originname[NAMEDATALEN];
 
+       InitializingApplyWorker = true;
+
        /* Setup signal handling. */
        pqsignal(SIGHUP, SignalHandlerForConfigReload);
        pqsignal(SIGINT, SignalHandlerForShutdownRequest);
@@ -940,6 +942,8 @@ ParallelApplyWorkerMain(Datum main_arg)
 
        InitializeApplyWorker();
 
+       InitializingApplyWorker = false;
+
        /* Setup replication origin tracking. */
        StartTransactionCommand();
        ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
index 970d170e73aa18e7cd86f0206b0ec62978c397dd..ceea1262315243a937d4fd7a5ce109dee2a3c5ef 100644 (file)
@@ -797,8 +797,11 @@ logicalrep_worker_onexit(int code, Datum arg)
         * Session level locks may be acquired outside of a transaction in
         * parallel apply mode and will not be released when the worker
         * terminates, so manually release all locks before the worker exits.
+        *
+        * The locks will be acquired once the worker is initialized.
         */
-       LockReleaseAll(DEFAULT_LOCKMETHOD, true);
+       if (!InitializingApplyWorker)
+               LockReleaseAll(DEFAULT_LOCKMETHOD, true);
 
        ApplyLauncherWakeup();
 }
index dbf88c9553191b48c91dc9510ad2d5e55d0b2c53..879309b316c416fe78ed8152ab341956667e8d5b 100644 (file)
@@ -331,6 +331,9 @@ static TransactionId stream_xid = InvalidTransactionId;
  */
 static uint32 parallel_stream_nchanges = 0;
 
+/* Are we initializing an apply worker? */
+bool           InitializingApplyWorker = false;
+
 /*
  * We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
  * the subscription if the remote transaction's finish LSN matches the subskiplsn.
@@ -4526,6 +4529,8 @@ ApplyWorkerMain(Datum main_arg)
        WalRcvStreamOptions options;
        int                     server_version;
 
+       InitializingApplyWorker = true;
+
        /* Attach to slot */
        logicalrep_worker_attach(worker_slot);
 
@@ -4548,6 +4553,8 @@ ApplyWorkerMain(Datum main_arg)
 
        InitializeApplyWorker();
 
+       InitializingApplyWorker = false;
+
        /* Connect to the origin and start the replication. */
        elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
                 MySubscription->conninfo);
index dce71d2c501af81fdf3e6dc13a56aebff3013a90..b57eed052f631ffe57fcdc30f3231c7542382bcc 100644 (file)
@@ -225,6 +225,8 @@ extern PGDLLIMPORT LogicalRepWorker *MyLogicalRepWorker;
 
 extern PGDLLIMPORT bool in_remote_transaction;
 
+extern PGDLLIMPORT bool InitializingApplyWorker;
+
 extern void logicalrep_worker_attach(int slot);
 extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
                                                                                                bool only_running);