Add BEGIN/COMMIT for transactional messages during decoding.
authorAmit Kapila <[email protected]>
Tue, 11 Jul 2023 03:01:11 +0000 (08:31 +0530)
committerAmit Kapila <[email protected]>
Tue, 11 Jul 2023 03:01:11 +0000 (08:31 +0530)
In test_decoding module, when skip_empty_xacts option was specified, add
BEGIN/COMMIT for transactional messages. This makes the handling of
transactional messages consistent irrespective of whether skip_empty_xacts
option was specified.

We decided not to backpatch this change because skip_empty_xacts is
primarily used to have consistent test results across different runs and
this change won't help with that.

Author: Vignesh C
Reviewed-by: Ashutosh Bapat, Hou Zhijie
Discussion: https://postgr.es/m/CAExHW5ujRhbOz6_aTq_jQA8NjeFqq9d_8G9viShWvXx8gdSXiQ@mail.gmail.com

contrib/test_decoding/expected/messages.out
contrib/test_decoding/sql/messages.sql
contrib/test_decoding/test_decoding.c

index c75d40190b6e990757babeefc8e4d6450b2bb0f2..0fd70036bd5fc2cc6ba67fdfe3a019bbf5bdc77c 100644 (file)
@@ -58,17 +58,23 @@ SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
  ignorethis
 (1 row)
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
                                 data                                
 --------------------------------------------------------------------
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg1
+ COMMIT
  message: transactional: 0 prefix: test, sz: 4 content:msg2
  message: transactional: 0 prefix: test, sz: 4 content:msg4
  message: transactional: 0 prefix: test, sz: 4 content:msg6
+ BEGIN
  message: transactional: 1 prefix: test, sz: 4 content:msg5
  message: transactional: 1 prefix: test, sz: 4 content:msg7
+ COMMIT
+ BEGIN
  message: transactional: 1 prefix: test, sz: 11 content:czechtastic
-(7 rows)
+ COMMIT
+(13 rows)
 
 -- test db filtering
 \set prevdb :DBNAME
index cf3f7738e57880ece0773269ae878e00b20ebe93..3d8500f99cb5bf7647e819db67195a54a1db2bfe 100644 (file)
@@ -19,7 +19,7 @@ COMMIT;
 
 SELECT 'ignorethis' FROM pg_logical_emit_message(true, 'test', 'czechtastic');
 
-SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1');
+SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'force-binary', '0', 'skip-empty-xacts', '1', 'include-xids', '0');
 
 -- test db filtering
 \set prevdb :DBNAME
index 12d1d0505d77b43c347aa9940016f0c0e5ffafe2..ab870d9e4dc505a58a50a7dd49c8a556fec62c09 100644 (file)
@@ -743,6 +743,18 @@ pg_decode_message(LogicalDecodingContext *ctx,
                                  ReorderBufferTXN *txn, XLogRecPtr lsn, bool transactional,
                                  const char *prefix, Size sz, const char *message)
 {
+       TestDecodingData *data = ctx->output_plugin_private;
+       TestDecodingTxnData *txndata;
+
+       txndata = transactional ? txn->output_plugin_private : NULL;
+
+       /* output BEGIN if we haven't yet for transactional messages */
+       if (transactional && data->skip_empty_xacts && !txndata->xact_wrote_changes)
+               pg_output_begin(ctx, data, txn, false);
+
+       if (transactional)
+               txndata->xact_wrote_changes = true;
+
        OutputPluginPrepareWrite(ctx, true);
        appendStringInfo(ctx->out, "message: transactional: %d prefix: %s, sz: %zu content:",
                                         transactional, prefix, sz);