Allow pgoutput to send logical decoding messages.
authorAmit Kapila <[email protected]>
Tue, 6 Apr 2021 03:10:47 +0000 (08:40 +0530)
committerAmit Kapila <[email protected]>
Tue, 6 Apr 2021 03:10:47 +0000 (08:40 +0530)
The output plugin accepts a new parameter (messages) that controls if
logical decoding messages are written into the replication stream. It is
useful for those clients that use pgoutput as an output plugin and needs
to process messages that were written by pg_logical_emit_message().

Although logical streaming replication protocol supports logical
decoding messages now, logical replication does not use this feature yet.

Author: David Pirotte, Euler Taveira
Reviewed-by: Euler Taveira, Andres Freund, Ashutosh Bapat, Amit Kapila
Discussion: https://postgr.es/m/CADK3HHJ-+9SO7KuRLH=9Wa1rAo60Yreq1GFNkH_kd0=CdaWM+A@mail.gmail.com

doc/src/sgml/protocol.sgml
src/backend/replication/logical/proto.c
src/backend/replication/logical/worker.c
src/backend/replication/pgoutput/pgoutput.c
src/include/replication/logicalproto.h
src/include/replication/pgoutput.h
src/test/subscription/t/020_messages.pl [new file with mode: 0644]

index 43092fe62a61f076bd54e40670be671a7d413b22..380be5fb17c1db0f8d3512189c8491ca3326b1df 100644 (file)
@@ -6433,6 +6433,82 @@ Begin
 </listitem>
 </varlistentry>
 
+<varlistentry>
+<term>
+Message
+</term>
+<listitem>
+<para>
+
+<variablelist>
+<varlistentry>
+<term>
+        Byte1('M')
+</term>
+<listitem>
+<para>
+                Identifies the message as a logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int32
+</term>
+<listitem>
+<para>
+                Xid of the transaction. The XID is sent only when user has
+                requested streaming of in-progress transactions.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int8
+</term>
+<listitem>
+<para>
+                Flags; Either 0 for no flags or 1 if the logical decoding
+                message is transactional.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Int64
+</term>
+<listitem>
+<para>
+                The LSN of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        String
+</term>
+<listitem>
+<para>
+                The prefix of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+<varlistentry>
+<term>
+        Byte<replaceable>n</replaceable>
+</term>
+<listitem>
+<para>
+                The content of the logical decoding message.
+</para>
+</listitem>
+</varlistentry>
+
+</variablelist>
+</para>
+</listitem>
+</varlistentry>
+
 <varlistentry>
 <term>
 Commit
index f2c85cabb52337a25f6ae5672ae3e138308fa5d9..2a1f9830e05d3e1a9860549d0274d84ff2d3b05e 100644 (file)
@@ -25,6 +25,7 @@
  */
 #define LOGICALREP_IS_REPLICA_IDENTITY 1
 
+#define MESSAGE_TRANSACTIONAL (1<<0)
 #define TRUNCATE_CASCADE       (1<<0)
 #define TRUNCATE_RESTART_SEQS  (1<<1)
 
@@ -361,6 +362,33 @@ logicalrep_read_truncate(StringInfo in,
    return relids;
 }
 
+/*
+ * Write MESSAGE to stream
+ */
+void
+logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+                        bool transactional, const char *prefix, Size sz,
+                        const char *message)
+{
+   uint8       flags = 0;
+
+   pq_sendbyte(out, LOGICAL_REP_MSG_MESSAGE);
+
+   /* encode and send message flags */
+   if (transactional)
+       flags |= MESSAGE_TRANSACTIONAL;
+
+   /* transaction ID (if not valid, we're not streaming) */
+   if (TransactionIdIsValid(xid))
+       pq_sendint32(out, xid);
+
+   pq_sendint8(out, flags);
+   pq_sendint64(out, lsn);
+   pq_sendstring(out, prefix);
+   pq_sendint32(out, sz);
+   pq_sendbytes(out, message, sz);
+}
+
 /*
  * Write relation description to the output stream.
  */
index 354fbe4b4bc82e1b975f04422687281db88e12da..74d538b5e379a4ec99cce21a59ecbf015d25d870 100644 (file)
@@ -1939,6 +1939,15 @@ apply_dispatch(StringInfo s)
            apply_handle_origin(s);
            return;
 
+       case LOGICAL_REP_MSG_MESSAGE:
+
+           /*
+            * Logical replication does not use generic logical messages yet.
+            * Although, it could be used by other applications that use this
+            * output plugin.
+            */
+           return;
+
        case LOGICAL_REP_MSG_STREAM_START:
            apply_handle_stream_start(s);
            return;
index 6146c5acdb30ef7f3ad8c77921ba601156b6cdfc..f68348dcf45975f305b2a7bcbf7c95f01732ee08 100644 (file)
@@ -45,6 +45,10 @@ static void pgoutput_change(LogicalDecodingContext *ctx,
 static void pgoutput_truncate(LogicalDecodingContext *ctx,
                              ReorderBufferTXN *txn, int nrelations, Relation relations[],
                              ReorderBufferChange *change);
+static void pgoutput_message(LogicalDecodingContext *ctx,
+                            ReorderBufferTXN *txn, XLogRecPtr message_lsn,
+                            bool transactional, const char *prefix,
+                            Size sz, const char *message);
 static bool pgoutput_origin_filter(LogicalDecodingContext *ctx,
                                   RepOriginId origin_id);
 static void pgoutput_stream_start(struct LogicalDecodingContext *ctx,
@@ -142,6 +146,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
    cb->begin_cb = pgoutput_begin_txn;
    cb->change_cb = pgoutput_change;
    cb->truncate_cb = pgoutput_truncate;
+   cb->message_cb = pgoutput_message;
    cb->commit_cb = pgoutput_commit_txn;
    cb->filter_by_origin_cb = pgoutput_origin_filter;
    cb->shutdown_cb = pgoutput_shutdown;
@@ -152,6 +157,7 @@ _PG_output_plugin_init(OutputPluginCallbacks *cb)
    cb->stream_abort_cb = pgoutput_stream_abort;
    cb->stream_commit_cb = pgoutput_stream_commit;
    cb->stream_change_cb = pgoutput_change;
+   cb->stream_message_cb = pgoutput_message;
    cb->stream_truncate_cb = pgoutput_truncate;
 }
 
@@ -162,10 +168,12 @@ parse_output_parameters(List *options, PGOutputData *data)
    bool        protocol_version_given = false;
    bool        publication_names_given = false;
    bool        binary_option_given = false;
+   bool        messages_option_given = false;
    bool        streaming_given = false;
 
    data->binary = false;
    data->streaming = false;
+   data->messages = false;
 
    foreach(lc, options)
    {
@@ -221,6 +229,16 @@ parse_output_parameters(List *options, PGOutputData *data)
 
            data->binary = defGetBoolean(defel);
        }
+       else if (strcmp(defel->defname, "messages") == 0)
+       {
+           if (messages_option_given)
+               ereport(ERROR,
+                       (errcode(ERRCODE_SYNTAX_ERROR),
+                        errmsg("conflicting or redundant options")));
+           messages_option_given = true;
+
+           data->messages = defGetBoolean(defel);
+       }
        else if (strcmp(defel->defname, "streaming") == 0)
        {
            if (streaming_given)
@@ -689,6 +707,35 @@ pgoutput_truncate(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
    MemoryContextReset(data->context);
 }
 
+static void
+pgoutput_message(LogicalDecodingContext *ctx, ReorderBufferTXN *txn,
+                XLogRecPtr message_lsn, bool transactional, const char *prefix, Size sz,
+                const char *message)
+{
+   PGOutputData *data = (PGOutputData *) ctx->output_plugin_private;
+   TransactionId xid = InvalidTransactionId;
+
+   if (!data->messages)
+       return;
+
+   /*
+    * Remember the xid for the message in streaming mode. See
+    * pgoutput_change.
+    */
+   if (in_streaming)
+       xid = txn->xid;
+
+   OutputPluginPrepareWrite(ctx, true);
+   logicalrep_write_message(ctx->out,
+                            xid,
+                            message_lsn,
+                            transactional,
+                            prefix,
+                            sz,
+                            message);
+   OutputPluginWrite(ctx, true);
+}
+
 /*
  * Currently we always forward.
  */
index fa4c37277b134e9f18c63b2a776e0a6bc62a98ea..55b90c03eacec107b6c94332f9b85140243d4f75 100644 (file)
@@ -54,6 +54,7 @@ typedef enum LogicalRepMsgType
    LOGICAL_REP_MSG_TRUNCATE = 'T',
    LOGICAL_REP_MSG_RELATION = 'R',
    LOGICAL_REP_MSG_TYPE = 'Y',
+   LOGICAL_REP_MSG_MESSAGE = 'M',
    LOGICAL_REP_MSG_STREAM_START = 'S',
    LOGICAL_REP_MSG_STREAM_END = 'E',
    LOGICAL_REP_MSG_STREAM_COMMIT = 'c',
@@ -151,6 +152,8 @@ extern void logicalrep_write_truncate(StringInfo out, TransactionId xid,
                                      bool cascade, bool restart_seqs);
 extern List *logicalrep_read_truncate(StringInfo in,
                                      bool *cascade, bool *restart_seqs);
+extern void logicalrep_write_message(StringInfo out, TransactionId xid, XLogRecPtr lsn,
+                                    bool transactional, const char *prefix, Size sz, const char *message);
 extern void logicalrep_write_rel(StringInfo out, TransactionId xid,
                                 Relation rel);
 extern LogicalRepRelation *logicalrep_read_rel(StringInfo in);
index bb383d523eefc9150c0e14dda8a17ed399b3a4f5..51e7c0348da3c0f3d26bed3162af378dbee1b9d3 100644 (file)
@@ -26,6 +26,7 @@ typedef struct PGOutputData
    List       *publications;
    bool        binary;
    bool        streaming;
+   bool        messages;
 } PGOutputData;
 
 #endif                         /* PGOUTPUT_H */
diff --git a/src/test/subscription/t/020_messages.pl b/src/test/subscription/t/020_messages.pl
new file mode 100644 (file)
index 0000000..d9123ed
--- /dev/null
@@ -0,0 +1,148 @@
+# Tests that logical decoding messages
+use strict;
+use warnings;
+use PostgresNode;
+use TestLib;
+use Test::More tests => 5;
+
+# Create publisher node
+my $node_publisher = get_new_node('publisher');
+$node_publisher->init(allows_streaming => 'logical');
+$node_publisher->start;
+
+# Create subscriber node
+my $node_subscriber = get_new_node('subscriber');
+$node_subscriber->init(allows_streaming => 'logical');
+$node_subscriber->start;
+
+# Create some preexisting content on publisher
+$node_publisher->safe_psql('postgres',
+   "CREATE TABLE tab_test (a int primary key)");
+
+# Setup structure on subscriber
+$node_subscriber->safe_psql('postgres',
+   "CREATE TABLE tab_test (a int primary key)");
+
+# Setup logical replication
+my $publisher_connstr = $node_publisher->connstr . ' dbname=postgres';
+$node_publisher->safe_psql('postgres', "CREATE PUBLICATION tap_pub FOR TABLE tab_test");
+
+$node_subscriber->safe_psql('postgres',
+   "CREATE SUBSCRIPTION tap_sub CONNECTION '$publisher_connstr' PUBLICATION tap_pub"
+);
+
+# Ensure a transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+$node_publisher->safe_psql('postgres',
+   "SELECT pg_logical_emit_message(true, 'pgoutput', 'a transactional message')"
+);
+
+my $result = $node_publisher->safe_psql(
+   'postgres', qq(
+       SELECT get_byte(data, 0)
+       FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+           'proto_version', '1',
+           'publication_names', 'tap_pub',
+           'messages', 'true')
+));
+
+# 66 77 67 == B M C == BEGIN MESSAGE COMMIT
+is($result, qq(66
+77
+67),
+   'messages on slot are B M C with message option');
+
+$result = $node_publisher->safe_psql(
+   'postgres', qq(
+       SELECT get_byte(data, 1), encode(substr(data, 11, 8), 'escape')
+       FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+           'proto_version', '1',
+           'publication_names', 'tap_pub',
+           'messages', 'true')
+       OFFSET 1 LIMIT 1
+));
+
+is($result, qq(1|pgoutput),
+   "flag transactional is set to 1 and prefix is pgoutput");
+
+$result = $node_publisher->safe_psql(
+   'postgres', qq(
+       SELECT get_byte(data, 0)
+       FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+           'proto_version', '1',
+           'publication_names', 'tap_pub')
+));
+
+# 66 67 == B C == BEGIN COMMIT
+is($result, qq(66
+67),
+   'option messages defaults to false so message (M) is not available on slot');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
+
+# ensure a non-transactional logical decoding message shows up on the slot
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (1)");
+
+my $message_lsn = $node_publisher->safe_psql('postgres',
+   "SELECT pg_logical_emit_message(false, 'pgoutput', 'a non-transactional message')");
+
+$node_publisher->safe_psql('postgres', "INSERT INTO tab_test VALUES (2)");
+
+$result = $node_publisher->safe_psql(
+   'postgres', qq(
+       SELECT get_byte(data, 0), get_byte(data, 1)
+       FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+           'proto_version', '1',
+           'publication_names', 'tap_pub',
+           'messages', 'true')
+       WHERE lsn = '$message_lsn' AND xid = 0
+));
+
+is($result, qq(77|0), 'non-transactional message on slot is M');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub ENABLE");
+$node_publisher->wait_for_catchup('tap_sub');
+
+$node_subscriber->safe_psql('postgres', "ALTER SUBSCRIPTION tap_sub DISABLE");
+
+# wait for the replication connection to drop from the publisher
+$node_publisher->poll_query_until('postgres',
+   'SELECT COUNT(*) FROM pg_catalog.pg_stat_replication', 0);
+
+# Ensure a non-transactional logical decoding message shows up on the slot when
+# it is emitted within an aborted transaction. The message won't emit until
+# something advances the LSN, hence, we intentionally forces the server to
+# switch to a new WAL file.
+$node_publisher->safe_psql(
+   'postgres', qq(
+       BEGIN;
+       SELECT pg_logical_emit_message(false, 'pgoutput',
+           'a non-transactional message is available even if the transaction is aborted 1');
+       INSERT INTO tab_test VALUES (3);
+       SELECT pg_logical_emit_message(true, 'pgoutput',
+           'a transactional message is not available if the transaction is aborted');
+       SELECT pg_logical_emit_message(false, 'pgoutput',
+           'a non-transactional message is available even if the transaction is aborted 2');
+       ROLLBACK;
+       SELECT pg_switch_wal();
+));
+
+$result = $node_publisher->safe_psql(
+   'postgres', qq(
+       SELECT get_byte(data, 0), get_byte(data, 1)
+       FROM pg_logical_slot_peek_binary_changes('tap_sub', NULL, NULL,
+           'proto_version', '1',
+           'publication_names', 'tap_pub',
+           'messages', 'true')
+));
+
+is($result, qq(77|0
+77|0),
+   'non-transactional message on slot from aborted transaction is M');
+
+$node_subscriber->stop('fast');
+$node_publisher->stop('fast');