Add a test to verify that subscription to the standby works.
authorAmit Kapila <[email protected]>
Thu, 27 Apr 2023 08:52:53 +0000 (14:22 +0530)
committerAmit Kapila <[email protected]>
Thu, 27 Apr 2023 08:52:53 +0000 (14:22 +0530)
Author: Bertrand Drouvot
Reviewed-by: Vignesh C, Alvaro Herrera, Amit Kapila
Discussion: https://postgr.es/m/2fefa454-5a70-2174-ddbf-4a0e41537139@gmail.com

src/test/perl/PostgreSQL/Test/Cluster.pm
src/test/recovery/t/035_standby_logical_decoding.pl

index 6f7f4e5de4cb0f5148c7e31e2752208595b879a1..bc9b5dc6444cc438bb9b2987eec1613ed832685f 100644 (file)
@@ -2611,8 +2611,14 @@ When doing physical replication, the standby is usually identified by
 passing its PostgreSQL::Test::Cluster instance.  When doing logical
 replication, standby_name identifies a subscription.
 
-The default value of target_lsn is $node->lsn('write'), which ensures
-that the standby has caught up to what has been committed on the primary.
+When not in recovery, the default value of target_lsn is $node->lsn('write'),
+which ensures that the standby has caught up to what has been committed on
+the primary.
+
+When in recovery, the default value of target_lsn is $node->lsn('replay')
+instead which ensures that the cascaded standby has caught up to what has been
+replayed on the standby.
+
 If you pass an explicit value of target_lsn, it should almost always be
 the primary's write LSN; so this parameter is seldom needed except when
 querying some intermediate replication node rather than the primary.
@@ -2644,7 +2650,16 @@ sub wait_for_catchup
        }
        if (!defined($target_lsn))
        {
-               $target_lsn = $self->lsn('write');
+               my $isrecovery = $self->safe_psql('postgres', "SELECT pg_is_in_recovery()");
+               chomp($isrecovery);
+               if ($isrecovery eq 't')
+               {
+                       $target_lsn = $self->lsn('replay');
+               }
+               else
+               {
+                       $target_lsn = $self->lsn('write');
+               }
        }
        print "Waiting for replication conn "
          . $standby_name . "'s "
index b8f5311fe9026297c7ba1ec95c07d8bb84fccc24..f6d644741276d0b0994905a26b4f04d1d636c31f 100644 (file)
@@ -10,12 +10,17 @@ use PostgreSQL::Test::Cluster;
 use PostgreSQL::Test::Utils;
 use Test::More;
 
-my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot);
+my ($stdin,             $stdout,            $stderr,
+       $cascading_stdout,  $cascading_stderr,  $subscriber_stdin,
+       $subscriber_stdout, $subscriber_stderr, $ret,
+       $handle,            $slot);
 
 my $node_primary = PostgreSQL::Test::Cluster->new('primary');
 my $node_standby = PostgreSQL::Test::Cluster->new('standby');
 my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+my $node_subscriber = PostgreSQL::Test::Cluster->new('subscriber');
 my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $psql_timeout    = IPC::Run::timer($default_timeout);
 my $res;
 
 # Name for the physical slot on primary
@@ -267,7 +272,8 @@ $node_standby->init_from_backup(
        has_streaming => 1,
        has_restoring => 1);
 $node_standby->append_conf('postgresql.conf',
-       qq[primary_slot_name = '$primary_slotname']);
+       qq[primary_slot_name = '$primary_slotname'
+       max_replication_slots = 5]);
 $node_standby->start;
 $node_primary->wait_for_replay_catchup($node_standby);
 $node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]);
@@ -285,6 +291,26 @@ $node_cascading_standby->append_conf('postgresql.conf',
 $node_cascading_standby->start;
 $node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
 
+#######################
+# Initialize subscriber node
+#######################
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+my %psql_subscriber = (
+       'subscriber_stdin'  => '',
+       'subscriber_stdout' => '',
+       'subscriber_stderr' => '');
+$psql_subscriber{run} = IPC::Run::start(
+       [ 'psql', '-XA', '-f', '-', '-d', $node_subscriber->connstr('postgres') ],
+       '<',
+       \$psql_subscriber{subscriber_stdin},
+       '>',
+       \$psql_subscriber{subscriber_stdout},
+       '2>',
+       \$psql_subscriber{subscriber_stderr},
+       $psql_timeout);
+
 ##################################################
 # Test that logical decoding on the standby
 # behaves correctly.
@@ -365,6 +391,67 @@ is( $node_primary->psql(
     3,
     'replaying logical slot from another database fails');
 
+##################################################
+# Test that we can subscribe on the standby with the publication
+# created on the primary.
+##################################################
+
+# Create a table on the primary
+$node_primary->safe_psql('postgres',
+       "CREATE TABLE tab_rep (a int primary key)");
+
+# Create a table (same structure) on the subscriber node
+$node_subscriber->safe_psql('postgres',
+       "CREATE TABLE tab_rep (a int primary key)");
+
+# Create a publication on the primary
+$node_primary->safe_psql('postgres',
+       "CREATE PUBLICATION tap_pub for table tab_rep");
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Subscribe on the standby
+my $standby_connstr = $node_standby->connstr . ' dbname=postgres';
+
+# Not using safe_psql() here as it would wait for activity on the primary
+# and we wouldn't be able to launch pg_log_standby_snapshot() on the primary
+# while waiting.
+# psql_subscriber() allows to not wait synchronously.
+$psql_subscriber{subscriber_stdin} .=
+  qq[CREATE SUBSCRIPTION tap_sub
+     CONNECTION '$standby_connstr'
+     PUBLICATION tap_pub
+     WITH (copy_data = off);];
+$psql_subscriber{subscriber_stdin} .= "\n";
+
+$psql_subscriber{run}->pump_nb();
+
+# Speed up the subscription creation
+$node_primary->safe_psql('postgres', "SELECT pg_log_standby_snapshot()");
+
+# Explicitly shut down psql instance gracefully - to avoid hangs
+# or worse on windows
+$psql_subscriber{subscriber_stdin} .= "\\q\n";
+$psql_subscriber{run}->finish;
+
+$node_subscriber->wait_for_subscription_sync($node_standby, 'tap_sub');
+
+# Insert some rows on the primary
+$node_primary->safe_psql('postgres',
+       qq[INSERT INTO tab_rep select generate_series(1,10);]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->wait_for_catchup('tap_sub');
+
+# Check that the subscriber can see the rows inserted in the primary
+$result =
+  $node_subscriber->safe_psql('postgres', "SELECT count(*) FROM tab_rep");
+is($result, qq(10), 'check replicated inserts after subscription on standby');
+
+# We do not need the subscription and the subscriber anymore
+$node_subscriber->safe_psql('postgres', "DROP SUBSCRIPTION tap_sub");
+$node_subscriber->stop;
+
 ##################################################
 # Recovery conflict: Invalidate conflicting slots, including in-use slots
 # Scenario 1: hot_standby_feedback off and vacuum FULL