Add worker type argument to logical replication worker functions.
authorAmit Kapila <[email protected]>
Tue, 28 Oct 2025 05:47:50 +0000 (05:47 +0000)
committerAmit Kapila <[email protected]>
Tue, 28 Oct 2025 05:47:50 +0000 (05:47 +0000)
Extend logicalrep_worker_stop, logicalrep_worker_wakeup, and
logicalrep_worker_find to accept a worker type argument. This change
enables differentiation between logical replication worker types, such as
apply workers and table sync workers. While preserving existing behavior,
it lays the groundwork for upcoming patch to add sequence synchronization
workers.

Author: Vignesh C <[email protected]>
Reviewed-by: shveta malik <[email protected]>
Reviewed-by: Peter Smith <[email protected]>
Reviewed-by: Chao Li <[email protected]>
Reviewed-by: Hayato Kuroda <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Discussion: https://postgr.es/m/CAA4eK1LC+KJiAkSrpE_NwvNdidw9F2os7GERUeSxSKv71gXysQ@mail.gmail.com

src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/launcher.c
src/backend/replication/logical/syncutils.c
src/backend/replication/logical/tablesync.c
src/backend/replication/logical/worker.c
src/include/replication/worker_internal.h

index a0974d71de1f6476fea3aa412de37abde750b300..1f45444b4992b4f80e93a3d08c196dbc9fcfd67f 100644 (file)
@@ -1082,7 +1082,7 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
 
                sub_remove_rels = lappend(sub_remove_rels, remove_rel);
 
-               logicalrep_worker_stop(sub->oid, relid);
+               logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
 
                /*
                 * For READY state, we would have already dropped the
@@ -2134,7 +2134,7 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
    {
        LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
 
-       logicalrep_worker_stop(w->subid, w->relid);
+       logicalrep_worker_stop(w->type, w->subid, w->relid);
    }
    list_free(subworkers);
 
index 218cefe86e20e2b473bbf50fa58438b933a052e9..95b5cae9a5521146c2a728d4691023b5ed7f5b5d 100644 (file)
@@ -245,20 +245,25 @@ WaitForReplicationWorkerAttach(LogicalRepWorker *worker,
 }
 
 /*
- * Walks the workers array and searches for one that matches given
- * subscription id and relid.
+ * Walks the workers array and searches for one that matches given worker type,
+ * subscription id, and relation id.
  *
- * We are only interested in the leader apply worker or table sync worker.
+ * For apply workers, the relid should be set to InvalidOid, as they manage
+ * changes across all tables. For table sync workers, the relid should be set
+ * to the OID of the relation being synchronized.
  */
 LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
+                      bool only_running)
 {
    int         i;
    LogicalRepWorker *res = NULL;
 
+   /* relid must be valid only for table sync workers */
+   Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
    Assert(LWLockHeldByMe(LogicalRepWorkerLock));
 
-   /* Search for attached worker for a given subscription id. */
+   /* Search for an attached worker that matches the specified criteria. */
    for (i = 0; i < max_logical_replication_workers; i++)
    {
        LogicalRepWorker *w = &LogicalRepCtx->workers[i];
@@ -268,7 +273,7 @@ logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
            continue;
 
        if (w->in_use && w->subid == subid && w->relid == relid &&
-           (!only_running || w->proc))
+           w->type == wtype && (!only_running || w->proc))
        {
            res = w;
            break;
@@ -627,16 +632,20 @@ logicalrep_worker_stop_internal(LogicalRepWorker *worker, int signo)
 }
 
 /*
- * Stop the logical replication worker for subid/relid, if any.
+ * Stop the logical replication worker that matches the specified worker type,
+ * subscription id, and relation id.
  */
 void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
 {
    LogicalRepWorker *worker;
 
+   /* relid must be valid only for table sync workers */
+   Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
+
    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-   worker = logicalrep_worker_find(subid, relid, false);
+   worker = logicalrep_worker_find(wtype, subid, relid, false);
 
    if (worker)
    {
@@ -694,16 +703,20 @@ logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo)
 }
 
 /*
- * Wake up (using latch) any logical replication worker for specified sub/rel.
+ * Wake up (using latch) any logical replication worker that matches the
+ * specified worker type, subscription id, and relation id.
  */
 void
-logicalrep_worker_wakeup(Oid subid, Oid relid)
+logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
 {
    LogicalRepWorker *worker;
 
+   /* relid must be valid only for table sync workers */
+   Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
+
    LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-   worker = logicalrep_worker_find(subid, relid, true);
+   worker = logicalrep_worker_find(wtype, subid, relid, true);
 
    if (worker)
        logicalrep_worker_wakeup_ptr(worker);
@@ -1260,7 +1273,8 @@ ApplyLauncherMain(Datum main_arg)
                continue;
 
            LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-           w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+           w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
+                                      false);
 
            if (w != NULL)
            {
index e452a1e78d4c156d6f75d9c96d1ed29a6a833536..ae8c93859168ee2d6545e3814dd31ec8c20cad1b 100644 (file)
@@ -69,7 +69,8 @@ FinishSyncWorker(void)
    CommitTransactionCommand();
 
    /* Find the leader apply worker and signal it. */
-   logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+   logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
+                            InvalidOid);
 
    /* Stop gracefully */
    proc_exit(0);
index 40e1ed3c20eaa3b99632b6c1508e95706142f4a3..58c98488d7b7bf5f620e69d93f41604fff5fa00e 100644 (file)
@@ -160,7 +160,8 @@ wait_for_table_state_change(Oid relid, char expected_state)
 
        /* Check if the sync worker is still running and bail if not. */
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-       worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
+       worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
+                                       MyLogicalRepWorker->subid, relid,
                                        false);
        LWLockRelease(LogicalRepWorkerLock);
        if (!worker)
@@ -207,8 +208,9 @@ wait_for_worker_state_change(char expected_state)
         * waiting.
         */
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-       worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
-                                       InvalidOid, false);
+       worker = logicalrep_worker_find(WORKERTYPE_APPLY,
+                                       MyLogicalRepWorker->subid, InvalidOid,
+                                       false);
        if (worker && worker->proc)
            logicalrep_worker_wakeup_ptr(worker);
        LWLockRelease(LogicalRepWorkerLock);
@@ -476,7 +478,8 @@ ProcessSyncingTablesForApply(XLogRecPtr current_lsn)
             */
            LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
 
-           syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+           syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
+                                               MyLogicalRepWorker->subid,
                                                rstate->relid, false);
 
            if (syncworker)
index 5df5a4612b6628ac20da41f8acc217fd4cbd9711..7edd1c9cf060f951a702f6bd2312c090bff315d0 100644 (file)
@@ -1817,7 +1817,8 @@ apply_handle_stream_start(StringInfo s)
                 * Signal the leader apply worker, as it may be waiting for
                 * us.
                 */
-               logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+               logicalrep_worker_wakeup(WORKERTYPE_APPLY,
+                                        MyLogicalRepWorker->subid, InvalidOid);
            }
 
            parallel_stream_nchanges = 0;
@@ -3284,8 +3285,9 @@ FindDeletedTupleInLocalRel(Relation localrel, Oid localidxoid,
         * maybe_advance_nonremovable_xid() for details).
         */
        LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
-       leader = logicalrep_worker_find(MyLogicalRepWorker->subid,
-                                       InvalidOid, false);
+       leader = logicalrep_worker_find(WORKERTYPE_APPLY,
+                                       MyLogicalRepWorker->subid, InvalidOid,
+                                       false);
        if (!leader)
        {
            ereport(ERROR,
index ae352f6e69121397cdfe0ce7fdd0fb673c86740b..e23fa9a4514efb2059c09e5c181baa2c1599835a 100644 (file)
@@ -254,7 +254,8 @@ extern PGDLLIMPORT bool InitializingApplyWorker;
 extern PGDLLIMPORT List *table_states_not_ready;
 
 extern void logicalrep_worker_attach(int slot);
-extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
+                                               Oid subid, Oid relid,
                                                bool only_running);
 extern List *logicalrep_workers_find(Oid subid, bool only_running,
                                     bool acquire_lock);
@@ -263,9 +264,11 @@ extern bool logicalrep_worker_launch(LogicalRepWorkerType wtype,
                                     Oid userid, Oid relid,
                                     dsm_handle subworker_dsm,
                                     bool retain_dead_tuples);
-extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid,
+                                  Oid relid);
 extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
-extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
+                                    Oid relid);
 extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
 
 extern int logicalrep_sync_worker_count(Oid subid);