logicalrep_worker_stop(sub->oid, relid);
/*
- * For READY state and SYNCDONE state, we would have already
- * dropped the tablesync origin.
+ * For READY state, we would have already dropped the
+ * tablesync origin.
*/
- if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE)
+ if (state != SUBREL_STATE_READY)
{
char originname[NAMEDATALEN];
* Drop the tablesync's origin tracking if exists.
*
* It is possible that the origin is not yet created for
- * tablesync worker so passing missing_ok = true. This can
- * happen for the states before SUBREL_STATE_FINISHEDCOPY.
+ * tablesync worker; this can happen for the states before
+ * SUBREL_STATE_FINISHEDCOPY. The tablesync worker or
+ * apply worker can also concurrently try to drop the
+ * origin, so by this time the origin might already have
+ * been removed. For these reasons, we pass missing_ok = true.
*/
ReplicationOriginNameForTablesync(sub->oid, relid, originname,
sizeof(originname));
/*
* Drop the tablesync's origin tracking if exists.
*
- * For SYNCDONE/READY states, the tablesync origin tracking is known
- * to have already been dropped by the tablesync worker.
- *
* It is possible that the origin is not yet created for tablesync
* worker so passing missing_ok = true. This can happen for the states
* before SUBREL_STATE_FINISHEDCOPY.
*/
- if (rstate->state != SUBREL_STATE_SYNCDONE)
- {
- ReplicationOriginNameForTablesync(subid, relid, originname,
- sizeof(originname));
- replorigin_drop_by_name(originname, true, false);
- }
+ ReplicationOriginNameForTablesync(subid, relid, originname,
+ sizeof(originname));
+ replorigin_drop_by_name(originname, true, false);
}
/* Clean up dependencies */
/*
* UpdateSubscriptionRelState must be called within a transaction.
- * That transaction will be ended within the finish_sync_worker().
*/
if (!IsTransactionState())
StartTransactionCommand();
MyLogicalRepWorker->relstate,
MyLogicalRepWorker->relstate_lsn);
- /*
- * Cleanup the tablesync origin tracking.
- *
- * Resetting the origin session removes the ownership of the slot.
- * This is needed to allow the origin to be dropped.
- */
- ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
- MyLogicalRepWorker->relid,
- originname,
- sizeof(originname));
- replorigin_session_reset();
- replorigin_session_origin = InvalidRepOriginId;
- replorigin_session_origin_lsn = InvalidXLogRecPtr;
- replorigin_session_origin_timestamp = 0;
-
- /*
- * We expect that origin must be present. The concurrent operations
- * that remove origin like a refresh for the subscription take an
- * access exclusive lock on pg_subscription which prevent the previous
- * operation to update the rel state to SUBREL_STATE_SYNCDONE to
- * succeed.
- */
- replorigin_drop_by_name(originname, false, false);
-
/*
* End streaming so that LogRepWorkerWalRcvConn can be used to drop
* the slot.
/*
* Cleanup the tablesync slot.
*
- * This has to be done after the data changes because otherwise if
+ * This has to be done after updating the state because otherwise if
* there is an error while doing the database operations we won't be
* able to rollback dropped slot.
*/
*/
ReplicationSlotDropAtPubNode(LogRepWorkerWalRcvConn, syncslotname, false);
+ CommitTransactionCommand();
+ pgstat_report_stat(false);
+
+ /*
+ * Start a new transaction to clean up the tablesync origin tracking.
+ * This transaction will be ended within the finish_sync_worker().
+ * Even if we fail to remove the origin here, the apply worker
+ * will make sure it is cleaned up afterward.
+ *
+ * We need to do this after the table state is set to SYNCDONE.
+ * Otherwise, if an error occurs while performing the database
+ * operation, the worker will be restarted, but the in-memory state of
+ * replication progress (remote_lsn), which was cleared before the
+ * restart, is not restored by the rollback. The restarted worker would
+ * then use an invalid replication progress state, resulting in replay
+ * of transactions that have already been applied.
+ */
+ StartTransactionCommand();
+
+ ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
+ MyLogicalRepWorker->relid,
+ originname,
+ sizeof(originname));
+
+ /*
+ * Resetting the origin session removes the ownership of the slot.
+ * This is needed to allow the origin to be dropped.
+ */
+ replorigin_session_reset();
+ replorigin_session_origin = InvalidRepOriginId;
+ replorigin_session_origin_lsn = InvalidXLogRecPtr;
+ replorigin_session_origin_timestamp = 0;
+
+ /*
+ * Drop the tablesync's origin tracking if exists.
+ *
+ * The user may be concurrently refreshing the subscription, which
+ * removes the table state and its origin, or the apply worker may
+ * have already removed this origin. For these reasons, we pass
+ * missing_ok = true.
+ */
+ replorigin_drop_by_name(originname, true, false);
+
finish_sync_worker();
}
else
*/
if (current_lsn >= rstate->lsn)
{
+ char originname[NAMEDATALEN];
+
rstate->state = SUBREL_STATE_READY;
rstate->lsn = current_lsn;
if (!started_tx)
}
/*
- * Update the state to READY.
+ * Remove the tablesync origin tracking if exists.
+ *
+ * The user may be concurrently refreshing the subscription,
+ * which removes the table state and its origin, or the
+ * tablesync worker may have already removed this origin, so
+ * we pass missing_ok = true. We can't rely on the tablesync
+ * worker alone to remove the origin tracking: if it hits an
+ * error while dropping, it is not restarted to drop the
+ * origin.
+ */
+ ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
+ rstate->relid,
+ originname,
+ sizeof(originname));
+ replorigin_drop_by_name(originname, true, false);
+
+ /*
+ * Update the state to READY only after the origin cleanup.
*/
UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
rstate->relid, rstate->state,