$node_publisher->safe_psql('postgres',
"CREATE PUBLICATION tpub FOR ALL TABLES");
+# ------------------------------------------------------
+# Ensure binary mode also executes COPY in binary format
+# ------------------------------------------------------
+
+# Insert some content before creating a subscription
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ INSERT INTO public.test_numerical (a, b, c, d) VALUES
+ (1, 1.2, 1.3, 10),
+ (2, 2.2, 2.3, 20);
+ INSERT INTO public.test_arrays (a, b, c) VALUES
+ ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
+ ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
+ ));
+
my $publisher_connstring = $node_publisher->connstr . ' dbname=postgres';
$node_subscriber->safe_psql('postgres',
"CREATE SUBSCRIPTION tsub CONNECTION '$publisher_connstring' "
. "PUBLICATION tpub WITH (slot_name = tpub_slot, binary = true)");
+# Ensure the COPY command is executed in binary format on the publisher
+$node_publisher->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT WITH \(FORMAT binary\)/
+);
+
# Ensure nodes are in sync with each other
$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
+my $sync_check = qq(
+ SELECT a, b, c, d FROM test_numerical ORDER BY a;
+ SELECT a, b, c FROM test_arrays ORDER BY a;
+);
+
+# Check the synced data on the subscriber
+my $result = $node_subscriber->safe_psql('postgres', $sync_check);
+
+is( $result, '1|1.2|1.3|10
+2|2.2|2.3|20
+{1,2,3}|{1.1,1.2,1.3}|{one,two,three}
+{3,1,2}|{1.3,1.1,1.2}|{three,one,two}', 'check synced data on subscriber');
+
+# ----------------------------------
+# Ensure apply works in binary mode
+# ----------------------------------
+
# Insert some content and make sure it's replicated across
$node_publisher->safe_psql(
'postgres', qq(
INSERT INTO public.test_arrays (a, b, c) VALUES
- ('{1,2,3}', '{1.1, 1.2, 1.3}', '{"one", "two", "three"}'),
- ('{3,1,2}', '{1.3, 1.1, 1.2}', '{"three", "one", "two"}');
+ ('{2,1,3}', '{1.2, 1.1, 1.3}', '{"two", "one", "three"}'),
+ ('{1,3,2}', '{1.1, 1.3, 1.2}', '{"one", "three", "two"}');
INSERT INTO public.test_numerical (a, b, c, d) VALUES
- (1, 1.2, 1.3, 10),
- (2, 2.2, 2.3, 20),
- (3, 3.2, 3.3, 30);
+ (3, 3.2, 3.3, 30),
+ (4, 4.2, 4.3, 40);
));
$node_publisher->wait_for_catchup('tsub');
-my $result = $node_subscriber->safe_psql('postgres',
+$result = $node_subscriber->safe_psql('postgres',
"SELECT a, b, c, d FROM test_numerical ORDER BY a");
is( $result, '1|1.2|1.3|10
2|2.2|2.3|20
-3|3.2|3.3|30', 'check replicated data on subscriber');
+3|3.2|3.3|30
+4|4.2|4.3|40', 'check replicated data on subscriber');
# Test updates as well
$node_publisher->safe_psql(
"SELECT a, b, c FROM test_arrays ORDER BY a");
is( $result, '{1,2,3}|{42,1.2,1.3}|
+{1,3,2}|{42,1.3,1.2}|
+{2,1,3}|{42,1.1,1.3}|
{3,1,2}|{42,1.1,1.2}|', 'check updated replicated data on subscriber');
$result = $node_subscriber->safe_psql('postgres',
is( $result, '1|42||10
2|42||20
-3|42||30', 'check updated replicated data on subscriber');
+3|42||30
+4|42||40', 'check updated replicated data on subscriber');
+
+# ------------------------------------------------------------------------------
+# Use ALTER SUBSCRIPTION to change to text format and then back to binary format
+# ------------------------------------------------------------------------------
# Test to reset back to text formatting, and then to binary again
$node_subscriber->safe_psql('postgres',
$node_publisher->safe_psql(
'postgres', qq(
INSERT INTO public.test_numerical (a, b, c, d) VALUES
- (4, 4.2, 4.3, 40);
+ (5, 5.2, 5.3, 50);
));
$node_publisher->wait_for_catchup('tsub');
is( $result, '1|42||10
2|42||20
3|42||30
-4|4.2|4.3|40', 'check replicated data on subscriber');
+4|42||40
+5|5.2|5.3|50', 'check replicated data on subscriber');
$node_subscriber->safe_psql('postgres',
"ALTER SUBSCRIPTION tsub SET (binary = true);");
"SELECT a, b, c FROM test_arrays ORDER BY a");
is( $result, '{1,2,3}|{42,1.2,1.3}|
+{1,3,2}|{42,1.3,1.2}|
+{2,1,3}|{42,1.1,1.3}|
{2,3,1}|{1.2,1.3,1.1}|{two,three,one}
{3,1,2}|{42,1.1,1.2}|', 'check replicated data on subscriber');
+# ---------------------------------------------------------------
+# Test binary replication without and with send/receive functions
+# ---------------------------------------------------------------
+
+# Create a custom type without send/rcv functions
+$ddl = qq(
+ CREATE TYPE myvarchar;
+ CREATE FUNCTION myvarcharin(cstring, oid, integer) RETURNS myvarchar
+ LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharin';
+ CREATE FUNCTION myvarcharout(myvarchar) RETURNS cstring
+ LANGUAGE internal IMMUTABLE PARALLEL SAFE STRICT AS 'varcharout';
+ CREATE TYPE myvarchar (
+ input = myvarcharin,
+ output = myvarcharout);
+ CREATE TABLE public.test_myvarchar (
+ a myvarchar
+ ););
+
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Insert some initial data
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ INSERT INTO public.test_myvarchar (a) VALUES
+ ('a');
+ ));
+
+# Check the subscriber log from now on.
+my $offset = -s $node_subscriber->logfile;
+
+# Refresh the publication to trigger the tablesync
+$node_subscriber->safe_psql('postgres',
+ "ALTER SUBSCRIPTION tsub REFRESH PUBLICATION");
+
+# It should fail
+$node_subscriber->wait_for_log(
+ qr/ERROR: ( [A-Z0-9]+:)? no binary input function available for type/,
+ $offset);
+
+# Create and set send/rcv functions for the custom type
+$ddl = qq(
+ CREATE FUNCTION myvarcharsend(myvarchar) RETURNS bytea
+ LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharsend';
+ CREATE FUNCTION myvarcharrecv(internal, oid, integer) RETURNS myvarchar
+ LANGUAGE internal STABLE PARALLEL SAFE STRICT AS 'varcharrecv';
+ ALTER TYPE myvarchar SET (
+ send = myvarcharsend,
+ receive = myvarcharrecv
+ ););
+
+$node_publisher->safe_psql('postgres', $ddl);
+$node_subscriber->safe_psql('postgres', $ddl);
+
+# Now tablesync should succeed
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
+
+# Check the synced data on the subscriber
+$result =
+ $node_subscriber->safe_psql('postgres', 'SELECT a FROM test_myvarchar;');
+
+is($result, 'a', 'check synced data on subscriber with custom type');
+
+# -----------------------------------------------------
+# Test mismatched column types with/without binary mode
+# -----------------------------------------------------
+
+# Test syncing tables with mismatching column types
+$node_publisher->safe_psql(
+ 'postgres', qq(
+ CREATE TABLE public.test_mismatching_types (
+ a bigint PRIMARY KEY
+ );
+ INSERT INTO public.test_mismatching_types (a)
+ VALUES (1), (2);
+ ));
+
+# Check the subscriber log from now on.
+$offset = -s $node_subscriber->logfile;
+
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+ CREATE TABLE public.test_mismatching_types (
+ a int PRIMARY KEY
+ );
+ ALTER SUBSCRIPTION tsub REFRESH PUBLICATION;
+ ));
+
+# Cannot sync due to type mismatch
+$node_subscriber->wait_for_log(
+ qr/ERROR: ( [A-Z0-9]+:)? incorrect binary data format/, $offset);
+
+# Check the publisher log from now on.
+$offset = -s $node_publisher->logfile;
+
+# Setting binary to false should allow syncing
+$node_subscriber->safe_psql(
+ 'postgres', qq(
+ ALTER SUBSCRIPTION tsub SET (binary = false);));
+
+# Ensure the COPY command is executed in text format on the publisher
+$node_publisher->wait_for_log(
+ qr/LOG: ( [A-Z0-9]+:)? statement: COPY (.+)? TO STDOUT\n/, $offset);
+
+$node_subscriber->wait_for_subscription_sync($node_publisher, 'tsub');
+
+# Check the synced data on the subscriber
+$result = $node_subscriber->safe_psql('postgres',
+ 'SELECT a FROM test_mismatching_types ORDER BY a;');
+
+is( $result, '1
+2', 'check synced data on subscriber with binary = false');
+
$node_subscriber->stop('fast');
$node_publisher->stop('fast');