TAP test for logical decoding on standby
authorAndres Freund <[email protected]>
Sat, 8 Apr 2023 09:24:50 +0000 (02:24 -0700)
committerAndres Freund <[email protected]>
Sat, 8 Apr 2023 09:24:50 +0000 (02:24 -0700)
Author: "Drouvot, Bertrand" <[email protected]>
Author: Amit Khandekar <[email protected]>
Author: Craig Ringer <[email protected]> (in an older version)
Author: Andres Freund <[email protected]>
Reviewed-by: "Drouvot, Bertrand" <[email protected]>
Reviewed-by: Andres Freund <[email protected]>
Reviewed-by: Robert Haas <[email protected]>
Reviewed-by: Amit Kapila <[email protected]>
Reviewed-by: Fabrízio de Royes Mello <[email protected]>
src/test/perl/PostgreSQL/Test/Cluster.pm
src/test/recovery/meson.build
src/test/recovery/t/035_standby_logical_decoding.pl [new file with mode: 0644]

index f3e36538d762c1226d978f745828eb153d908b80..6f7f4e5de4cb0f5148c7e31e2752208595b879a1 100644 (file)
@@ -3029,6 +3029,43 @@ $SIG{TERM} = $SIG{INT} = sub {
 
 =pod
 
+=item $node->create_logical_slot_on_standby(self, primary, slot_name, dbname)
+
+Create logical replication slot on given standby
+
+=cut
+
+sub create_logical_slot_on_standby
+{
+       my ($self, $primary, $slot_name, $dbname) = @_;
+       my ($stdout, $stderr);
+
+       my $handle;
+
+       $handle = IPC::Run::start(['pg_recvlogical', '-d', $self->connstr($dbname), '-P', 'test_decoding', '-S', $slot_name, '--create-slot'], '>', \$stdout, '2>', \$stderr);
+
+       # Once the slot's restart_lsn is determined, the standby looks for
+       # xl_running_xacts WAL record from the restart_lsn onwards. First wait
+       # until the slot restart_lsn is determined.
+
+       $self->poll_query_until(
+               'postgres', qq[
+               SELECT restart_lsn IS NOT NULL
+               FROM pg_catalog.pg_replication_slots WHERE slot_name = '$slot_name'
+       ]) or die "timed out waiting for logical slot to calculate its restart_lsn";
+
+       # Then arrange for the xl_running_xacts record for which pg_recvlogical is
+       # waiting.
+       $primary->safe_psql('postgres', 'SELECT pg_log_standby_snapshot()');
+
+       $handle->finish();
+
+       is($self->slot($slot_name)->{'slot_type'}, 'logical', $slot_name . ' on standby created')
+               or die "could not create slot" . $slot_name;
+}
+
+=pod
+
 =back
 
 =cut
index 59465b97f3f19dee9edb19f6326bb22cb76e5f19..e834ad5e0dce215943f6548ffbca6555637fce28 100644 (file)
@@ -40,6 +40,7 @@ tests += {
       't/032_relfilenode_reuse.pl',
       't/033_replay_tsp_drops.pl',
       't/034_create_database.pl',
+      't/035_standby_logical_decoding.pl',
     ],
   },
 }
diff --git a/src/test/recovery/t/035_standby_logical_decoding.pl b/src/test/recovery/t/035_standby_logical_decoding.pl
new file mode 100644 (file)
index 0000000..150290b
--- /dev/null
@@ -0,0 +1,734 @@
+# Copyright (c) 2023, PostgreSQL Global Development Group
+
+# logical decoding on standby : test logical decoding,
+# recovery conflict and standby promotion.
+
+use strict;
+use warnings;
+
+use PostgreSQL::Test::Cluster;
+use PostgreSQL::Test::Utils;
+use Test::More;
+
+my ($stdin, $stdout, $stderr, $cascading_stdout, $cascading_stderr, $ret, $handle, $slot);
+
+my $node_primary = PostgreSQL::Test::Cluster->new('primary');
+my $node_standby = PostgreSQL::Test::Cluster->new('standby');
+my $node_cascading_standby = PostgreSQL::Test::Cluster->new('cascading_standby');
+my $default_timeout = $PostgreSQL::Test::Utils::timeout_default;
+my $res;
+
+# Name for the physical slot on primary
+my $primary_slotname = 'primary_physical';
+my $standby_physical_slotname = 'standby_physical';
+
+# find $pat in logfile of $node after $off-th byte
+sub find_in_log
+{
+       my ($node, $pat, $off) = @_;
+
+       $off = 0 unless defined $off;
+       my $log = PostgreSQL::Test::Utils::slurp_file($node->logfile);
+       return 0 if (length($log) <= $off);
+
+       $log = substr($log, $off);
+
+       return $log =~ m/$pat/;
+}
+
+# Fetch xmin columns from slot's pg_replication_slots row, after waiting for
+# given boolean condition to be true to ensure we've reached a quiescent state.
+sub wait_for_xmins
+{
+       my ($node, $slotname, $check_expr) = @_;
+
+       $node->poll_query_until(
+               'postgres', qq[
+               SELECT $check_expr
+               FROM pg_catalog.pg_replication_slots
+               WHERE slot_name = '$slotname';
+       ]) or die "Timed out waiting for slot xmins to advance";
+}
+
+# Create the required logical slots on standby.
+sub create_logical_slots
+{
+       my ($node, $slot_prefix) = @_;
+
+       my $active_slot = $slot_prefix . 'activeslot';
+       my $inactive_slot = $slot_prefix . 'inactiveslot';
+       $node->create_logical_slot_on_standby($node_primary, qq($inactive_slot), 'testdb');
+       $node->create_logical_slot_on_standby($node_primary, qq($active_slot), 'testdb');
+}
+
+# Drop the logical slots on standby.
+sub drop_logical_slots
+{
+       my ($slot_prefix) = @_;
+       my $active_slot = $slot_prefix . 'activeslot';
+       my $inactive_slot = $slot_prefix . 'inactiveslot';
+
+       $node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$inactive_slot')]);
+       $node_standby->psql('postgres', qq[SELECT pg_drop_replication_slot('$active_slot')]);
+}
+
+# Acquire one of the standby logical slots created by create_logical_slots().
+# In case wait is true we are waiting for an active pid on the 'activeslot' slot.
+# If wait is not true it means we are testing a known failure scenario.
+sub make_slot_active
+{
+       my ($node, $slot_prefix, $wait, $to_stdout, $to_stderr) = @_;
+       my $slot_user_handle;
+
+       my $active_slot = $slot_prefix . 'activeslot';
+       $slot_user_handle = IPC::Run::start(['pg_recvlogical', '-d', $node->connstr('testdb'), '-S', qq($active_slot), '-o', 'include-xids=0', '-o', 'skip-empty-xacts=1', '--no-loop', '--start', '-f', '-'], '>', $to_stdout, '2>', $to_stderr);
+
+       if ($wait)
+       {
+               # make sure activeslot is in use
+               $node->poll_query_until('testdb',
+                       qq[SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = '$active_slot' AND active_pid IS NOT NULL)]
+               ) or die "slot never became active";
+       }
+       return $slot_user_handle;
+}
+
+# Check pg_recvlogical stderr
+sub check_pg_recvlogical_stderr
+{
+       my ($slot_user_handle, $check_stderr) = @_;
+       my $return;
+
+       # our client should've terminated in response to the walsender error
+       $slot_user_handle->finish;
+       $return = $?;
+       cmp_ok($return, "!=", 0, "pg_recvlogical exited non-zero");
+       if ($return) {
+               like($stderr, qr/$check_stderr/, 'slot has been invalidated');
+       }
+
+       return 0;
+}
+
+# Check if all the slots on standby are dropped. These include the 'activeslot'
+# that was acquired by make_slot_active(), and the non-active 'inactiveslot'.
+sub check_slots_dropped
+{
+       my ($slot_prefix, $slot_user_handle) = @_;
+
+       is($node_standby->slot($slot_prefix . 'inactiveslot')->{'slot_type'}, '', 'inactiveslot on standby dropped');
+       is($node_standby->slot($slot_prefix . 'activeslot')->{'slot_type'}, '', 'activeslot on standby dropped');
+
+       check_pg_recvlogical_stderr($slot_user_handle, "conflict with recovery");
+}
+
+# Change hot_standby_feedback and check xmin and catalog_xmin values.
+sub change_hot_standby_feedback_and_wait_for_xmins
+{
+       my ($hsf, $invalidated) = @_;
+
+       $node_standby->append_conf('postgresql.conf',qq[
+       hot_standby_feedback = $hsf
+       ]);
+
+       $node_standby->reload;
+
+       if ($hsf && $invalidated)
+       {
+               # With hot_standby_feedback on, xmin should advance,
+               # but catalog_xmin should still remain NULL since there is no logical slot.
+               wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NOT NULL AND catalog_xmin IS NULL");
+       }
+       elsif ($hsf)
+       {
+               # With hot_standby_feedback on, xmin and catalog_xmin should advance.
+               wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NOT NULL AND catalog_xmin IS NOT NULL");
+       }
+       else
+       {
+               # Both should be NULL since hs_feedback is off
+               wait_for_xmins($node_primary, $primary_slotname,
+                          "xmin IS NULL AND catalog_xmin IS NULL");
+
+       }
+}
+
+# Check conflicting status in pg_replication_slots.
+sub check_slots_conflicting_status
+{
+       my ($conflicting) = @_;
+
+       if ($conflicting)
+       {
+               $res = $node_standby->safe_psql(
+                               'postgres', qq(
+                                select bool_and(conflicting) from pg_replication_slots;));
+
+               is($res, 't',
+                       "Logical slots are reported as conflicting");
+       }
+       else
+       {
+               $res = $node_standby->safe_psql(
+                               'postgres', qq(
+                               select bool_or(conflicting) from pg_replication_slots;));
+
+               is($res, 'f',
+                       "Logical slots are reported as non conflicting");
+       }
+}
+
+# Drop the slots, re-create them, change hot_standby_feedback,
+# check xmin and catalog_xmin values, make slot active and reset stat.
+sub reactive_slots_change_hfs_and_wait_for_xmins
+{
+       my ($previous_slot_prefix, $slot_prefix, $hsf, $invalidated) = @_;
+
+       # drop the logical slots
+       drop_logical_slots($previous_slot_prefix);
+
+       # create the logical slots
+       create_logical_slots($node_standby, $slot_prefix);
+
+       change_hot_standby_feedback_and_wait_for_xmins($hsf, $invalidated);
+
+       $handle = make_slot_active($node_standby, $slot_prefix, 1, \$stdout, \$stderr);
+
+       # reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
+       $node_standby->psql('testdb', q[select pg_stat_reset();]);
+}
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+sub check_for_invalidation
+{
+       my ($slot_prefix, $log_start, $test_name) = @_;
+
+       my $active_slot = $slot_prefix . 'activeslot';
+       my $inactive_slot = $slot_prefix . 'inactiveslot';
+
+       # message should be issued
+       ok( find_in_log(
+               $node_standby,
+               "invalidating obsolete replication slot \"$inactive_slot\"", $log_start),
+               "inactiveslot slot invalidation is logged $test_name");
+
+       ok( find_in_log(
+               $node_standby,
+               "invalidating obsolete replication slot \"$active_slot\"", $log_start),
+               "activeslot slot invalidation is logged $test_name");
+
+       # Verify that pg_stat_database_conflicts.confl_active_logicalslot has been updated
+       ok( $node_standby->poll_query_until(
+               'postgres',
+               "select (confl_active_logicalslot = 1) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+               'confl_active_logicalslot updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+}
+
+########################
+# Initialize primary node
+########################
+
+$node_primary->init(allows_streaming => 1, has_archiving => 1);
+$node_primary->append_conf('postgresql.conf', q{
+wal_level = 'logical'
+max_replication_slots = 4
+max_wal_senders = 4
+log_min_messages = 'debug2'
+log_error_verbosity = verbose
+});
+$node_primary->dump_info;
+$node_primary->start;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+
+$node_primary->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$primary_slotname');]);
+
+# Check conflicting is NULL for physical slot
+$res = $node_primary->safe_psql(
+               'postgres', qq[
+                SELECT conflicting is null FROM pg_replication_slots where slot_name = '$primary_slotname';]);
+
+is($res, 't',
+       "Physical slot reports conflicting as NULL");
+
+my $backup_name = 'b1';
+$node_primary->backup($backup_name);
+
+# Some tests need to wait for VACUUM to be replayed. But vacuum does not flush
+# WAL. An insert into flush_wal outside transaction does guarantee a flush.
+$node_primary->psql('testdb', q[CREATE TABLE flush_wal();]);
+
+#######################
+# Initialize standby node
+#######################
+
+$node_standby->init_from_backup(
+       $node_primary, $backup_name,
+       has_streaming => 1,
+       has_restoring => 1);
+$node_standby->append_conf('postgresql.conf',
+       qq[primary_slot_name = '$primary_slotname']);
+$node_standby->start;
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->safe_psql('testdb', qq[SELECT * FROM pg_create_physical_replication_slot('$standby_physical_slotname');]);
+
+#######################
+# Initialize cascading standby node
+#######################
+$node_standby->backup($backup_name);
+$node_cascading_standby->init_from_backup(
+       $node_standby, $backup_name,
+       has_streaming => 1,
+       has_restoring => 1);
+$node_cascading_standby->append_conf('postgresql.conf',
+       qq[primary_slot_name = '$standby_physical_slotname']);
+$node_cascading_standby->start;
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+##################################################
+# Test that logical decoding on the standby
+# behaves correctly.
+##################################################
+
+# create the logical slots
+create_logical_slots($node_standby, 'behaves_ok_');
+
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,10) s;]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $result = $node_standby->safe_psql('testdb',
+       qq[SELECT pg_logical_slot_get_changes('behaves_ok_activeslot', NULL, NULL);]);
+
+# test if basic decoding works
+is(scalar(my @foobar = split /^/m, $result),
+       14, 'Decoding produced 14 rows (2 BEGIN/COMMIT and 10 rows)');
+
+# Insert some rows and verify that we get the same results from pg_recvlogical
+# and the SQL interface.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+my $expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT};
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+my $stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session');
+
+my $endpos = $node_standby->safe_psql('testdb',
+       "SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+);
+
+# Insert some rows after $endpos, which we won't read.
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,50) s;]
+);
+
+$node_primary->wait_for_catchup($node_standby);
+
+my $stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+$node_standby->poll_query_until('testdb',
+       "SELECT EXISTS (SELECT 1 FROM pg_replication_slots WHERE slot_name = 'behaves_ok_activeslot' AND active_pid IS NULL)"
+) or die "slot never became inactive";
+
+$stdout_recv = $node_standby->pg_recvlogical_upto(
+    'testdb', 'behaves_ok_activeslot', $endpos, $default_timeout,
+    'include-xids'     => '0',
+    'skip-empty-xacts' => '1');
+chomp($stdout_recv);
+is($stdout_recv, '', 'pg_recvlogical acknowledged changes');
+
+$node_primary->safe_psql('postgres', 'CREATE DATABASE otherdb');
+
+is( $node_primary->psql(
+        'otherdb',
+        "SELECT lsn FROM pg_logical_slot_peek_changes('behaves_ok_activeslot', NULL, NULL) ORDER BY lsn DESC LIMIT 1;"
+    ),
+    3,
+    'replaying logical slot from another database fails');
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 1: hot_standby_feedback off and vacuum FULL
+##################################################
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum full on pg_class with hot_standby_feedback turned off on
+# the standby.
+reactive_slots_change_hfs_and_wait_for_xmins('behaves_ok_', 'vacuum_full_', 0, 1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[
+  CREATE TABLE conflict_test(x integer, y text);
+  DROP TABLE conflict_test;
+  VACUUM full pg_class;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('vacuum_full_', 1, 'with vacuum FULL on pg_class');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'vacuum_full_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "can no longer get changes from replication slot \"vacuum_full_activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1,1);
+
+##################################################
+# Verify that invalidated logical slots stay invalidated across a restart.
+##################################################
+$node_standby->restart;
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+##################################################
+# Verify that invalidated logical slots do not lead to retaining WAL
+##################################################
+# XXXXX TODO
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 2: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+my $logstart = -s $node_standby->logfile;
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the standby.
+reactive_slots_change_hfs_and_wait_for_xmins('vacuum_full_', 'row_removal_', 0, 1);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[
+  CREATE TABLE conflict_test(x integer, y text);
+  DROP TABLE conflict_test;
+  VACUUM pg_class;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('row_removal_', $logstart, 'with vacuum on pg_class');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'row_removal_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "can no longer get changes from replication slot \"row_removal_activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a shared catalog table
+# Scenario 3: conflict due to row removal with hot_standby_feedback off.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# One way to produce recovery conflict is to create/drop a relation and
+# launch a vacuum on pg_class with hot_standby_feedback turned off on the standby.
+reactive_slots_change_hfs_and_wait_for_xmins('row_removal_', 'shared_row_removal_', 0, 1);
+
+# Trigger the conflict
+diag $node_primary->safe_psql('testdb', qq[
+  CREATE ROLE create_trash;
+  DROP ROLE create_trash;
+  VACUUM pg_authid;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('shared_row_removal_', $logstart, 'with vacuum on pg_authid');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'shared_row_removal_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "can no longer get changes from replication slot \"shared_row_removal_activeslot\"");
+
+##################################################
+# Recovery conflict: Same as Scenario 2 but on a non catalog table
+# Scenario 4: No conflict expected.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+reactive_slots_change_hfs_and_wait_for_xmins('shared_row_removal_', 'no_conflict_', 0, 1);
+
+# This should not trigger a conflict
+$node_primary->safe_psql('testdb', qq[
+  CREATE TABLE conflict_test(x integer, y text);
+  INSERT INTO conflict_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;
+  UPDATE conflict_test set x=1, y=1;
+  VACUUM conflict_test;
+  INSERT INTO flush_wal DEFAULT VALUES; -- see create table flush_wal
+]);
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# message should not be issued
+ok( !find_in_log(
+   $node_standby,
+  "invalidating obsolete slot \"no_conflict_inactiveslot\"", $logstart),
+  'inactiveslot slot invalidation is not logged with vacuum on conflict_test');
+
+ok( !find_in_log(
+   $node_standby,
+  "invalidating obsolete slot \"no_conflict_activeslot\"", $logstart),
+  'activeslot slot invalidation is not logged with vacuum on conflict_test');
+
+# Verify that pg_stat_database_conflicts.confl_active_logicalslot has not been updated
+ok( $node_standby->poll_query_until(
+       'postgres',
+       "select (confl_active_logicalslot = 0) from pg_stat_database_conflicts where datname = 'testdb'", 't'),
+       'confl_active_logicalslot not updated') or die "Timed out waiting confl_active_logicalslot to be updated";
+
+# Verify slots are reported as non conflicting in pg_replication_slots
+check_slots_conflicting_status(0);
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 0);
+
+# Restart the standby node to ensure no slots are still active
+$node_standby->restart;
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 4: conflict due to on-access pruning.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# One way to produce recovery conflict is to trigger an on-access pruning
+# on a relation marked as user_catalog_table.
+reactive_slots_change_hfs_and_wait_for_xmins('no_conflict_', 'pruning_', 0, 0);
+
+# This should trigger the conflict
+$node_primary->safe_psql('testdb', qq[CREATE TABLE prun(id integer, s char(2000)) WITH (fillfactor = 75, user_catalog_table = true);]);
+$node_primary->safe_psql('testdb', qq[INSERT INTO prun VALUES (1, 'A');]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'B';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'C';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'D';]);
+$node_primary->safe_psql('testdb', qq[UPDATE prun SET s = 'E';]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('pruning_', $logstart, 'with on-access pruning');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'pruning_', 0, \$stdout, \$stderr);
+
+# We are not able to read from the slot as it has been invalidated
+check_pg_recvlogical_stderr($handle, "can no longer get changes from replication slot \"pruning_activeslot\"");
+
+# Turn hot_standby_feedback back on
+change_hot_standby_feedback_and_wait_for_xmins(1, 1);
+
+##################################################
+# Recovery conflict: Invalidate conflicting slots, including in-use slots
+# Scenario 5: incorrect wal_level on primary.
+##################################################
+
+# get the position to search from in the standby logfile
+$logstart = -s $node_standby->logfile;
+
+# drop the logical slots
+drop_logical_slots('pruning_');
+
+# create the logical slots
+create_logical_slots($node_standby, 'wal_level_');
+
+$handle = make_slot_active($node_standby, 'wal_level_', 1, \$stdout, \$stderr);
+
+# reset stat: easier to check for confl_active_logicalslot in pg_stat_database_conflicts
+$node_standby->psql('testdb', q[select pg_stat_reset();]);
+
+# Make primary wal_level replica. This will trigger slot conflict.
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'replica'
+]);
+$node_primary->restart;
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+# Check invalidation in the logfile and in pg_stat_database_conflicts
+check_for_invalidation('wal_level_', $logstart, 'due to wal_level');
+
+# Verify slots are reported as conflicting in pg_replication_slots
+check_slots_conflicting_status(1);
+
+$handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
+# We are not able to read from the slot as it requires wal_level at least logical on the primary server
+check_pg_recvlogical_stderr($handle, "logical decoding on a standby requires wal_level to be at least logical on the primary");
+
+# Restore primary wal_level
+$node_primary->append_conf('postgresql.conf',q[
+wal_level = 'logical'
+]);
+$node_primary->restart;
+$node_primary->wait_for_replay_catchup($node_standby);
+
+$handle = make_slot_active($node_standby, 'wal_level_', 0, \$stdout, \$stderr);
+# as the slot has been invalidated we should not be able to read
+check_pg_recvlogical_stderr($handle, "can no longer get changes from replication slot \"wal_level_activeslot\"");
+
+##################################################
+# DROP DATABASE should drops it's slots, including active slots.
+##################################################
+
+# drop the logical slots
+drop_logical_slots('wal_level_');
+
+# create the logical slots
+create_logical_slots($node_standby, 'drop_db_');
+
+$handle = make_slot_active($node_standby, 'drop_db_', 1, \$stdout, \$stderr);
+
+# Create a slot on a database that would not be dropped. This slot should not
+# get dropped.
+$node_standby->create_logical_slot_on_standby($node_primary, 'otherslot', 'postgres');
+
+# dropdb on the primary to verify slots are dropped on standby
+$node_primary->safe_psql('postgres', q[DROP DATABASE testdb]);
+
+$node_primary->wait_for_replay_catchup($node_standby);
+
+is($node_standby->safe_psql('postgres',
+       q[SELECT EXISTS(SELECT 1 FROM pg_database WHERE datname = 'testdb')]), 'f',
+       'database dropped on standby');
+
+check_slots_dropped('drop_db', $handle);
+
+is($node_standby->slot('otherslot')->{'slot_type'}, 'logical',
+       'otherslot on standby not dropped');
+
+# Cleanup : manually drop the slot that was not dropped.
+$node_standby->psql('postgres', q[SELECT pg_drop_replication_slot('otherslot')]);
+
+##################################################
+# Test standby promotion and logical decoding behavior
+# after the standby gets promoted.
+##################################################
+
+$node_standby->reload;
+
+$node_primary->psql('postgres', q[CREATE DATABASE testdb]);
+$node_primary->safe_psql('testdb', qq[CREATE TABLE decoding_test(x integer, y text);]);
+
+# create the logical slots
+create_logical_slots($node_standby, 'promotion_');
+
+# create the logical slots on the cascading standby too
+create_logical_slots($node_cascading_standby, 'promotion_');
+
+# Make slots actives
+$handle = make_slot_active($node_standby, 'promotion_', 1, \$stdout, \$stderr);
+my $cascading_handle = make_slot_active($node_cascading_standby, 'promotion_', 1, \$cascading_stdout, \$cascading_stderr);
+
+# Insert some rows before the promotion
+$node_primary->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(1,4) s;]
+);
+
+# Wait for both standbys to catchup
+$node_primary->wait_for_replay_catchup($node_standby);
+$node_standby->wait_for_replay_catchup($node_cascading_standby, $node_primary);
+
+# promote
+$node_standby->promote;
+
+# insert some rows on promoted standby
+$node_standby->safe_psql('testdb',
+       qq[INSERT INTO decoding_test(x,y) SELECT s, s::text FROM generate_series(5,7) s;]
+);
+
+# Wait for the cascading standby to catchup
+$node_standby->wait_for_replay_catchup($node_cascading_standby);
+
+$expected = q{BEGIN
+table public.decoding_test: INSERT: x[integer]:1 y[text]:'1'
+table public.decoding_test: INSERT: x[integer]:2 y[text]:'2'
+table public.decoding_test: INSERT: x[integer]:3 y[text]:'3'
+table public.decoding_test: INSERT: x[integer]:4 y[text]:'4'
+COMMIT
+BEGIN
+table public.decoding_test: INSERT: x[integer]:5 y[text]:'5'
+table public.decoding_test: INSERT: x[integer]:6 y[text]:'6'
+table public.decoding_test: INSERT: x[integer]:7 y[text]:'7'
+COMMIT};
+
+# check that we are decoding pre and post promotion inserted rows
+$stdout_sql = $node_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on promoted standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion
+my $pump_timeout = IPC::Run::timer($PostgreSQL::Test::Utils::timeout_default);
+
+ok( pump_until(
+        $handle, $pump_timeout, \$stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($stdout);
+is($stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session');
+
+# check that we are decoding pre and post promotion inserted rows on the cascading standby
+$stdout_sql = $node_cascading_standby->safe_psql('testdb',
+       qq[SELECT data FROM pg_logical_slot_peek_changes('promotion_inactiveslot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1');]
+);
+
+is($stdout_sql, $expected, 'got expected output from SQL decoding session on cascading standby');
+
+# check that we are decoding pre and post promotion inserted rows
+# with pg_recvlogical that has started before the promotion on the cascading standby
+ok( pump_until(
+        $cascading_handle, $pump_timeout, \$cascading_stdout, qr/^.*COMMIT.*COMMIT$/s),
+    'got 2 COMMIT from pg_recvlogical output');
+
+chomp($cascading_stdout);
+is($cascading_stdout, $expected,
+    'got same expected output from pg_recvlogical decoding session on cascading standby');
+
+done_testing();