sub_remove_rels = lappend(sub_remove_rels, remove_rel);
- logicalrep_worker_stop(sub->oid, relid);
+ logicalrep_worker_stop(WORKERTYPE_TABLESYNC, sub->oid, relid);
/*
* For READY state, we would have already dropped the
{
LogicalRepWorker *w = (LogicalRepWorker *) lfirst(lc);
- logicalrep_worker_stop(w->subid, w->relid);
+ logicalrep_worker_stop(w->type, w->subid, w->relid);
}
list_free(subworkers);
}
/*
- * Walks the workers array and searches for one that matches given
- * subscription id and relid.
+ * Walks the workers array and searches for one that matches given worker type,
+ * subscription id, and relation id.
*
- * We are only interested in the leader apply worker or table sync worker.
+ * For apply workers, the relid should be set to InvalidOid, as they manage
+ * changes across all tables. For table sync workers, the relid should be set
+ * to the OID of the relation being synchronized.
*/
LogicalRepWorker *
-logicalrep_worker_find(Oid subid, Oid relid, bool only_running)
+logicalrep_worker_find(LogicalRepWorkerType wtype, Oid subid, Oid relid,
+ bool only_running)
{
int i;
LogicalRepWorker *res = NULL;
+ /* relid must be valid only for table sync workers */
+ Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
Assert(LWLockHeldByMe(LogicalRepWorkerLock));
- /* Search for attached worker for a given subscription id. */
+ /* Search for an attached worker that matches the specified criteria. */
for (i = 0; i < max_logical_replication_workers; i++)
{
LogicalRepWorker *w = &LogicalRepCtx->workers[i];
continue;
if (w->in_use && w->subid == subid && w->relid == relid &&
- (!only_running || w->proc))
+ w->type == wtype && (!only_running || w->proc))
{
res = w;
break;
}
/*
- * Stop the logical replication worker for subid/relid, if any.
+ * Stop the logical replication worker that matches the specified worker type,
+ * subscription id, and relation id.
*/
void
-logicalrep_worker_stop(Oid subid, Oid relid)
+logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid, Oid relid)
{
LogicalRepWorker *worker;
+ /* relid must be valid only for table sync workers */
+ Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(subid, relid, false);
+ worker = logicalrep_worker_find(wtype, subid, relid, false);
if (worker)
{
}
/*
- * Wake up (using latch) any logical replication worker for specified sub/rel.
+ * Wake up (using latch) any logical replication worker that matches the
+ * specified worker type, subscription id, and relation id.
*/
void
-logicalrep_worker_wakeup(Oid subid, Oid relid)
+logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid, Oid relid)
{
LogicalRepWorker *worker;
+ /* relid must be valid only for table sync workers */
+ Assert((wtype == WORKERTYPE_TABLESYNC) == OidIsValid(relid));
+
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(subid, relid, true);
+ worker = logicalrep_worker_find(wtype, subid, relid, true);
if (worker)
logicalrep_worker_wakeup_ptr(worker);
continue;
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- w = logicalrep_worker_find(sub->oid, InvalidOid, false);
+ w = logicalrep_worker_find(WORKERTYPE_APPLY, sub->oid, InvalidOid,
+ false);
if (w != NULL)
{
CommitTransactionCommand();
/* Find the leader apply worker and signal it. */
- logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+ logicalrep_worker_wakeup(WORKERTYPE_APPLY, MyLogicalRepWorker->subid,
+ InvalidOid);
/* Stop gracefully */
proc_exit(0);
/* Check if the sync worker is still running and bail if not. */
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(MyLogicalRepWorker->subid, relid,
+ worker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
+ MyLogicalRepWorker->subid, relid,
false);
LWLockRelease(LogicalRepWorkerLock);
if (!worker)
* waiting.
*/
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- worker = logicalrep_worker_find(MyLogicalRepWorker->subid,
- InvalidOid, false);
+ worker = logicalrep_worker_find(WORKERTYPE_APPLY,
+ MyLogicalRepWorker->subid, InvalidOid,
+ false);
if (worker && worker->proc)
logicalrep_worker_wakeup_ptr(worker);
LWLockRelease(LogicalRepWorkerLock);
*/
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- syncworker = logicalrep_worker_find(MyLogicalRepWorker->subid,
+ syncworker = logicalrep_worker_find(WORKERTYPE_TABLESYNC,
+ MyLogicalRepWorker->subid,
rstate->relid, false);
if (syncworker)
* Signal the leader apply worker, as it may be waiting for
* us.
*/
- logicalrep_worker_wakeup(MyLogicalRepWorker->subid, InvalidOid);
+ logicalrep_worker_wakeup(WORKERTYPE_APPLY,
+ MyLogicalRepWorker->subid, InvalidOid);
}
parallel_stream_nchanges = 0;
* maybe_advance_nonremovable_xid() for details).
*/
LWLockAcquire(LogicalRepWorkerLock, LW_SHARED);
- leader = logicalrep_worker_find(MyLogicalRepWorker->subid,
- InvalidOid, false);
+ leader = logicalrep_worker_find(WORKERTYPE_APPLY,
+ MyLogicalRepWorker->subid, InvalidOid,
+ false);
if (!leader)
{
ereport(ERROR,
extern PGDLLIMPORT List *table_states_not_ready;
extern void logicalrep_worker_attach(int slot);
-extern LogicalRepWorker *logicalrep_worker_find(Oid subid, Oid relid,
+extern LogicalRepWorker *logicalrep_worker_find(LogicalRepWorkerType wtype,
+ Oid subid, Oid relid,
bool only_running);
extern List *logicalrep_workers_find(Oid subid, bool only_running,
bool acquire_lock);
Oid userid, Oid relid,
dsm_handle subworker_dsm,
bool retain_dead_tuples);
-extern void logicalrep_worker_stop(Oid subid, Oid relid);
+extern void logicalrep_worker_stop(LogicalRepWorkerType wtype, Oid subid,
+ Oid relid);
extern void logicalrep_pa_worker_stop(ParallelApplyWorkerInfo *winfo);
-extern void logicalrep_worker_wakeup(Oid subid, Oid relid);
+extern void logicalrep_worker_wakeup(LogicalRepWorkerType wtype, Oid subid,
+ Oid relid);
extern void logicalrep_worker_wakeup_ptr(LogicalRepWorker *worker);
extern int logicalrep_sync_worker_count(Oid subid);