Fix the incorrect assertion introduced in commit 7f13ac8123.
author Amit Kapila <[email protected]>
Mon, 29 Aug 2022 02:40:10 +0000 (08:10 +0530)
committer Amit Kapila <[email protected]>
Mon, 29 Aug 2022 02:40:10 +0000 (08:10 +0530)
Commit 7f13ac8123 incorrectly assumed that either all or none of the xids
in the catalog modifying xids list retrieved from a serialized snapshot
can be purged. It is quite possible that some of the xids in that array
are old enough to be pruned while others are not.

As per buildfarm

Author: Amit Kapila and Masahiko Sawada
Reviewed-by: Masahiko Sawada
Discussion: https://postgr.es/m/CAA4eK1LBtv6ayE+TvCcPmC-xse=DVg=SmbyQD1nv_AaqcpUJEg@mail.gmail.com

contrib/test_decoding/expected/catalog_change_snapshot.out
contrib/test_decoding/specs/catalog_change_snapshot.spec
src/backend/replication/logical/snapbuild.c

index dc4f9b7018f70032beca0ef7749d492fefeb61f1..d2a4bdfcc131501fb4ce5410c7a4c02bce1cc291 100644 (file)
@@ -1,4 +1,4 @@
-Parsed test spec with 2 sessions
+Parsed test spec with 3 sessions
 
 starting permutation: s0_init s0_begin s0_savepoint s0_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s0_commit s1_get_changes
 step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
@@ -42,3 +42,57 @@ COMMIT
 stop    
 (1 row)
 
+
+starting permutation: s0_init s0_begin s0_truncate s2_begin s2_truncate s1_checkpoint s1_get_changes s0_commit s0_begin s0_insert s1_checkpoint s1_get_changes s2_commit s1_checkpoint s1_get_changes s0_commit s1_get_changes
+step s0_init: SELECT 'init' FROM pg_create_logical_replication_slot('isolation_slot', 'test_decoding');
+?column?
+--------
+init    
+(1 row)
+
+step s0_begin: BEGIN;
+step s0_truncate: TRUNCATE tbl1;
+step s2_begin: BEGIN;
+step s2_truncate: TRUNCATE tbl2;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data
+----
+(0 rows)
+
+step s0_commit: COMMIT;
+step s0_begin: BEGIN;
+step s0_insert: INSERT INTO tbl1 VALUES (1);
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                   
+---------------------------------------
+BEGIN                                  
+table public.tbl1: TRUNCATE: (no-flags)
+COMMIT                                 
+(3 rows)
+
+step s2_commit: COMMIT;
+step s1_checkpoint: CHECKPOINT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                   
+---------------------------------------
+BEGIN                                  
+table public.tbl2: TRUNCATE: (no-flags)
+COMMIT                                 
+(3 rows)
+
+step s0_commit: COMMIT;
+step s1_get_changes: SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0');
+data                                                         
+-------------------------------------------------------------
+BEGIN                                                        
+table public.tbl1: INSERT: val1[integer]:1 val2[integer]:null
+COMMIT                                                       
+(3 rows)
+
+?column?
+--------
+stop    
+(1 row)
+
index 2971ddc69cb618ad2e447f240931983d7306752b..ff8f68489bb3af66dc4280f41553a0f1bcae9c6d 100644 (file)
@@ -3,12 +3,15 @@
 setup
 {
     DROP TABLE IF EXISTS tbl1;
+    DROP TABLE IF EXISTS tbl2;
     CREATE TABLE tbl1 (val1 integer, val2 integer);
+    CREATE TABLE tbl2 (val1 integer, val2 integer);
 }
 
 teardown
 {
     DROP TABLE tbl1;
+    DROP TABLE tbl2;
     SELECT 'stop' FROM pg_drop_replication_slot('isolation_slot');
 }
 
@@ -26,6 +29,12 @@ setup { SET synchronous_commit=on; }
 step "s1_checkpoint" { CHECKPOINT; }
 step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_slot', NULL, NULL, 'skip-empty-xacts', '1', 'include-xids', '0'); }
 
+session "s2"
+setup { SET synchronous_commit=on; }
+step "s2_begin" { BEGIN; }
+step "s2_truncate" { TRUNCATE tbl2; }
+step "s2_commit" { COMMIT; }
+
 # For the transaction that TRUNCATEd the table tbl1, the last decoding decodes
 # only its COMMIT record, because it starts from the RUNNING_XACTS record emitted
 # during the first checkpoint execution.  This transaction must be marked as
@@ -37,3 +46,14 @@ step "s1_get_changes" { SELECT data FROM pg_logical_slot_get_changes('isolation_
 # record written by bgwriter.  One might think we can either stop the bgwriter or
 # increase LOG_SNAPSHOT_INTERVAL_MS but it's not practical via tests.
 permutation "s0_init" "s0_begin" "s0_savepoint" "s0_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
+
+# Test that we can purge the old catalog modifying transactions after restoring
+# them from the serialized snapshot. The first checkpoint will serialize the list
+# of two catalog modifying xacts. The purpose of the second checkpoint is to allow
+# partial pruning of the list of catalog modifying xacts. The third checkpoint
+# followed by get_changes establishes a restart_point at the first checkpoint LSN.
+# The last get_changes will start decoding from the first checkpoint which
+# restores the list of catalog modifying xacts and then while decoding the second
+# checkpoint record it prunes one of the xacts in that list and when decoding the
+# next checkpoint, it will completely prune that list.
+permutation "s0_init" "s0_begin" "s0_truncate" "s2_begin" "s2_truncate" "s1_checkpoint" "s1_get_changes" "s0_commit" "s0_begin" "s0_insert" "s1_checkpoint" "s1_get_changes" "s2_commit" "s1_checkpoint" "s1_get_changes" "s0_commit" "s1_get_changes"
index 1ff2c12240d58f5c4647316dd168e3feaa5d2f4c..bf72ad45ec7334dbbfb9195d4dd916ffd4f8e711 100644 (file)
@@ -969,28 +969,40 @@ SnapBuildPurgeOlderTxn(SnapBuild *builder)
        pfree(workspace);
 
        /*
-        * Either all the xacts got purged or none. It is only possible to
-        * partially remove the xids from this array if one or more of the xids
-        * are still running but not all. That can happen if we start decoding
-        * from a point (LSN where the snapshot state became consistent) where all
-        * the xacts in this were running and then at least one of those got
-        * committed and a few are still running. We will never start from such a
-        * point because we won't move the slot's restart_lsn past the point where
-        * the oldest running transaction's restart_decoding_lsn is.
+        * Purge xids in ->catchange as well. The purged array must also be sorted
+        * in xidComparator order.
         */
-       if (builder->catchange.xcnt == 0 ||
-               TransactionIdFollowsOrEquals(builder->catchange.xip[0],
-                                                                        builder->xmin))
-               return;
+       if (builder->catchange.xcnt > 0)
+       {
+               /*
+                * Since catchange.xip is sorted, we find the lower bound of xids that
+                * are still interesting.
+                */
+               for (off = 0; off < builder->catchange.xcnt; off++)
+               {
+                       if (TransactionIdFollowsOrEquals(builder->catchange.xip[off],
+                                                                                        builder->xmin))
+                               break;
+               }
 
-       Assert(TransactionIdFollows(builder->xmin,
-                                                               builder->catchange.xip[builder->catchange.xcnt - 1]));
-       pfree(builder->catchange.xip);
-       builder->catchange.xip = NULL;
-       builder->catchange.xcnt = 0;
+               surviving_xids = builder->catchange.xcnt - off;
 
-       elog(DEBUG3, "purged catalog modifying transactions, oldest running xid %u",
-                builder->xmin);
+               if (surviving_xids > 0)
+               {
+                       memmove(builder->catchange.xip, &(builder->catchange.xip[off]),
+                                       surviving_xids * sizeof(TransactionId));
+               }
+               else
+               {
+                       pfree(builder->catchange.xip);
+                       builder->catchange.xip = NULL;
+               }
+
+               elog(DEBUG3, "purged catalog modifying transactions from %u to %u, xmin: %u, xmax: %u",
+                        (uint32) builder->catchange.xcnt, (uint32) surviving_xids,
+                        builder->xmin, builder->xmax);
+               builder->catchange.xcnt = surviving_xids;
+       }
 }
 
 /*