During exit, the logical replication apply worker tries to release session
level locks, if any. However, if the apply worker exits due to an error
before its connection is initialized, trying to release locks can lead to
assertion failure. The locks will be acquired once the worker is
initialized, so we don't need to release them till the worker
initialization is complete.
Reported-by: Alexander Lakhin
Author: Hou Zhijie based on inputs from Sawada Masahiko and Amit Kapila
Reviewed-by: Amit Kapila
Discussion: https://postgr.es/m/
2185d65f-5aae-3efa-c48f-
fb42b173ef5c@gmail.com
int worker_slot = DatumGetInt32(main_arg);
char originname[NAMEDATALEN];
+ InitializingApplyWorker = true;
+
/* Setup signal handling. */
pqsignal(SIGHUP, SignalHandlerForConfigReload);
pqsignal(SIGINT, SignalHandlerForShutdownRequest);
InitializeApplyWorker();
+ InitializingApplyWorker = false;
+
/* Setup replication origin tracking. */
StartTransactionCommand();
ReplicationOriginNameForLogicalRep(MySubscription->oid, InvalidOid,
* Session level locks may be acquired outside of a transaction in
* parallel apply mode and will not be released when the worker
* terminates, so manually release all locks before the worker exits.
+ *
+ * The locks will be acquired once the worker is initialized.
*/
- LockReleaseAll(DEFAULT_LOCKMETHOD, true);
+ if (!InitializingApplyWorker)
+ LockReleaseAll(DEFAULT_LOCKMETHOD, true);
ApplyLauncherWakeup();
}
*/
static uint32 parallel_stream_nchanges = 0;
+/* Are we initializing a apply worker? */
+bool InitializingApplyWorker = false;
+
/*
* We enable skipping all data modification changes (INSERT, UPDATE, etc.) for
* the subscription if the remote transaction's finish LSN matches the subskiplsn.
WalRcvStreamOptions options;
int server_version;
+ InitializingApplyWorker = true;
+
/* Attach to slot */
logicalrep_worker_attach(worker_slot);
InitializeApplyWorker();
+ InitializingApplyWorker = false;
+
/* Connect to the origin and start the replication. */
elog(DEBUG1, "connecting to publisher using connection string \"%s\"",
MySubscription->conninfo);
extern PGDLLIMPORT bool in_remote_transaction;
+extern PGDLLIMPORT bool InitializingApplyWorker;
+
extern void logicalrep_worker_attach(int slot);
extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
bool only_running);