Drop replication origin slots before tablesync worker exits.
authorAmit Kapila <[email protected]>
Tue, 30 Aug 2022 03:21:41 +0000 (08:51 +0530)
committerAmit Kapila <[email protected]>
Tue, 30 Aug 2022 03:21:41 +0000 (08:51 +0530)
Currently, the replication origin tracking of the tablesync worker is
dropped by the apply worker. So, there will be a small lag between the
tablesync worker exit and its origin tracking got removed. In the
meantime, new tablesync workers can be launched and will try to set up
a new origin tracking. This can lead the system to reach max configured
limit (max_replication_slots) even if the user has configured the max
limit considering the number of tablesync workers required in the system.

We decided not to back-patch as this can occur in very narrow
circumstances and users have to option to increase the configured limit by
increasing max_replication_slots.

Reported-by: Hubert Depesz Lubaczewski
Author: Ajin Cherian
Reviwed-by: Masahiko Sawada, Peter Smith, Hou Zhijie, Amit Kapila
Discussion: https://postgr.es/m/20220714115155[email protected]

src/backend/commands/subscriptioncmds.c
src/backend/replication/logical/tablesync.c

index 670b219c8d4c1c8e36883e2a668e1c27eaa2c612..f87796e5afefde307ab8af3805c6a94f278843ff 100644 (file)
@@ -919,10 +919,10 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
                                logicalrep_worker_stop(sub->oid, relid);
 
                                /*
-                                * For READY state, we would have already dropped the
-                                * tablesync origin.
+                                * For READY state and SYNCDONE state, we would have already
+                                * dropped the tablesync origin.
                                 */
-                               if (state != SUBREL_STATE_READY)
+                               if (state != SUBREL_STATE_READY && state != SUBREL_STATE_SYNCDONE)
                                {
                                        char            originname[NAMEDATALEN];
 
@@ -930,11 +930,8 @@ AlterSubscription_refresh(Subscription *sub, bool copy_data,
                                         * Drop the tablesync's origin tracking if exists.
                                         *
                                         * It is possible that the origin is not yet created for
-                                        * tablesync worker, this can happen for the states before
-                                        * SUBREL_STATE_FINISHEDCOPY. The apply worker can also
-                                        * concurrently try to drop the origin and by this time
-                                        * the origin might be already removed. For these reasons,
-                                        * passing missing_ok = true.
+                                        * tablesync worker so passing missing_ok = true. This can
+                                        * happen for the states before SUBREL_STATE_FINISHEDCOPY.
                                         */
                                        ReplicationOriginNameForTablesync(sub->oid, relid, originname,
                                                                                                          sizeof(originname));
@@ -1507,13 +1504,19 @@ DropSubscription(DropSubscriptionStmt *stmt, bool isTopLevel)
                /*
                 * Drop the tablesync's origin tracking if exists.
                 *
+                * For SYNCDONE/READY states, the tablesync origin tracking is known
+                * to have already been dropped by the tablesync worker.
+                *
                 * It is possible that the origin is not yet created for tablesync
                 * worker so passing missing_ok = true. This can happen for the states
                 * before SUBREL_STATE_FINISHEDCOPY.
                 */
-               ReplicationOriginNameForTablesync(subid, relid, originname,
-                                                                                 sizeof(originname));
-               replorigin_drop_by_name(originname, true, false);
+               if (rstate->state != SUBREL_STATE_SYNCDONE)
+               {
+                       ReplicationOriginNameForTablesync(subid, relid, originname,
+                                                                                         sizeof(originname));
+                       replorigin_drop_by_name(originname, true, false);
+               }
        }
 
        /* Clean up dependencies */
index d37d8a0d74a450b7e294cecacbd8832c8cf9bfd0..91ba49a14bd4ee51b618008fb9b38cfbfc51819a 100644 (file)
@@ -291,6 +291,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
        {
                TimeLineID      tli;
                char            syncslotname[NAMEDATALEN] = {0};
+               char            originname[NAMEDATALEN] = {0};
 
                MyLogicalRepWorker->relstate = SUBREL_STATE_SYNCDONE;
                MyLogicalRepWorker->relstate_lsn = current_lsn;
@@ -309,6 +310,30 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                                                                   MyLogicalRepWorker->relstate,
                                                                   MyLogicalRepWorker->relstate_lsn);
 
+               /*
+                * Cleanup the tablesync origin tracking.
+                *
+                * Resetting the origin session removes the ownership of the slot.
+                * This is needed to allow the origin to be dropped.
+                */
+               ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
+                                                                                 MyLogicalRepWorker->relid,
+                                                                                 originname,
+                                                                                 sizeof(originname));
+               replorigin_session_reset();
+               replorigin_session_origin = InvalidRepOriginId;
+               replorigin_session_origin_lsn = InvalidXLogRecPtr;
+               replorigin_session_origin_timestamp = 0;
+
+               /*
+                * We expect that origin must be present. The concurrent operations
+                * that remove origin like a refresh for the subscription take an
+                * access exclusive lock on pg_subscription which prevent the previous
+                * operation to update the rel state to SUBREL_STATE_SYNCDONE to
+                * succeed.
+                */
+               replorigin_drop_by_name(originname, false, false);
+
                /*
                 * End streaming so that LogRepWorkerWalRcvConn can be used to drop
                 * the slot.
@@ -318,7 +343,7 @@ process_syncing_tables_for_sync(XLogRecPtr current_lsn)
                /*
                 * Cleanup the tablesync slot.
                 *
-                * This has to be done after updating the state because otherwise if
+                * This has to be done after the data changes because otherwise if
                 * there is an error while doing the database operations we won't be
                 * able to rollback dropped slot.
                 */
@@ -441,8 +466,6 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                         */
                        if (current_lsn >= rstate->lsn)
                        {
-                               char            originname[NAMEDATALEN];
-
                                rstate->state = SUBREL_STATE_READY;
                                rstate->lsn = current_lsn;
                                if (!started_tx)
@@ -452,26 +475,7 @@ process_syncing_tables_for_apply(XLogRecPtr current_lsn)
                                }
 
                                /*
-                                * Remove the tablesync origin tracking if exists.
-                                *
-                                * The normal case origin drop is done here instead of in the
-                                * process_syncing_tables_for_sync function because we don't
-                                * allow to drop the origin till the process owning the origin
-                                * is alive.
-                                *
-                                * There is a chance that the user is concurrently performing
-                                * refresh for the subscription where we remove the table
-                                * state and its origin and by this time the origin might be
-                                * already removed. So passing missing_ok = true.
-                                */
-                               ReplicationOriginNameForTablesync(MyLogicalRepWorker->subid,
-                                                                                                 rstate->relid,
-                                                                                                 originname,
-                                                                                                 sizeof(originname));
-                               replorigin_drop_by_name(originname, true, false);
-
-                               /*
-                                * Update the state to READY only after the origin cleanup.
+                                * Update the state to READY.
                                 */
                                UpdateSubscriptionRelState(MyLogicalRepWorker->subid,
                                                                                   rstate->relid, rstate->state,