Optimize the origin drop functionality.
authorAmit Kapila <[email protected]>
Fri, 3 Feb 2023 02:59:08 +0000 (08:29 +0530)
committerAmit Kapila <[email protected]>
Fri, 3 Feb 2023 02:59:08 +0000 (08:29 +0530)
To interlock against concurrent drops, we use to hold ExclusiveLock on
pg_replication_origin till xact commit. This blocks even concurrent drops
of different origins by tablesync workers. So, instead, lock the specific
origin to interlock against concurrent drops.

This reduces the test time variability in src/test/subscription where
multiple tables are being synced.

Author: Vignesh C
Reviewed-by: Hou Zhijie, Amit Kapila
Discussion: https://postgr.es/m/1412708.1674417574@sss.pgh.pa.us

src/backend/replication/logical/origin.c

index b754c43840f1890c4eb02b82381324a8f1c79e72..2c04c8707dc29d8a8be0d756f483cae55d454519 100644 (file)
@@ -338,16 +338,14 @@ replorigin_create(const char *roname)
  * Helper function to drop a replication origin.
  */
 static void
-replorigin_drop_guts(Relation rel, RepOriginId roident, bool nowait)
+replorigin_state_clear(RepOriginId roident, bool nowait)
 {
-   HeapTuple   tuple;
    int         i;
 
    /*
-    * First, clean up the slot state info, if there is any matching slot.
+    * Clean up the slot state info, if there is any matching slot.
     */
 restart:
-   tuple = NULL;
    LWLockAcquire(ReplicationOriginLock, LW_EXCLUSIVE);
 
    for (i = 0; i < max_replication_slots; i++)
@@ -402,19 +400,6 @@ restart:
    }
    LWLockRelease(ReplicationOriginLock);
    ConditionVariableCancelSleep();
-
-   /*
-    * Now, we can delete the catalog entry.
-    */
-   tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
-   if (!HeapTupleIsValid(tuple))
-       elog(ERROR, "cache lookup failed for replication origin with ID %d",
-            roident);
-
-   CatalogTupleDelete(rel, &tuple->t_self);
-   ReleaseSysCache(tuple);
-
-   CommandCounterIncrement();
 }
 
 /*
@@ -427,24 +412,43 @@ replorigin_drop_by_name(const char *name, bool missing_ok, bool nowait)
 {
    RepOriginId roident;
    Relation    rel;
+   HeapTuple   tuple;
 
    Assert(IsTransactionState());
 
-   /*
-    * To interlock against concurrent drops, we hold ExclusiveLock on
-    * pg_replication_origin till xact commit.
-    *
-    * XXX We can optimize this by acquiring the lock on a specific origin by
-    * using LockSharedObject if required. However, for that, we first to
-    * acquire a lock on ReplicationOriginRelationId, get the origin_id, lock
-    * the specific origin and then re-check if the origin still exists.
-    */
-   rel = table_open(ReplicationOriginRelationId, ExclusiveLock);
+   rel = table_open(ReplicationOriginRelationId, RowExclusiveLock);
 
    roident = replorigin_by_name(name, missing_ok);
 
-   if (OidIsValid(roident))
-       replorigin_drop_guts(rel, roident, nowait);
+   /* Lock the origin to prevent concurrent drops. */
+   LockSharedObject(ReplicationOriginRelationId, roident, 0,
+                    AccessExclusiveLock);
+
+   tuple = SearchSysCache1(REPLORIGIDENT, ObjectIdGetDatum(roident));
+   if (!HeapTupleIsValid(tuple))
+   {
+       if (!missing_ok)
+           elog(ERROR, "cache lookup failed for replication origin with ID %d",
+                roident);
+
+       /*
+        * We don't need to retain the locks if the origin is already dropped.
+        */
+       UnlockSharedObject(ReplicationOriginRelationId, roident, 0,
+                          AccessExclusiveLock);
+       table_close(rel, RowExclusiveLock);
+       return;
+   }
+
+   replorigin_state_clear(roident, nowait);
+
+   /*
+    * Now, we can delete the catalog entry.
+    */
+   CatalogTupleDelete(rel, &tuple->t_self);
+   ReleaseSysCache(tuple);
+
+   CommandCounterIncrement();
 
    /* We keep the lock on pg_replication_origin until commit */
    table_close(rel, NoLock);