bool transactional, const char *prefix,
Size sz, const char *message);
static bool pg_decode_filter_prepare(LogicalDecodingContext *ctx,
+ TransactionId xid,
const char *gid);
static void pg_decode_begin_prepare_txn(LogicalDecodingContext *ctx,
ReorderBufferTXN *txn);
* substring, then we filter it out.
*/
static bool
-pg_decode_filter_prepare(LogicalDecodingContext *ctx, const char *gid)
+pg_decode_filter_prepare(LogicalDecodingContext *ctx, TransactionId xid,
+ const char *gid)
{
if (strstr(gid, "_nodecode") != NULL)
return true;
<command>COMMIT PREPARED</command> time. To signal that
decoding should be skipped, return <literal>true</literal>;
<literal>false</literal> otherwise. When the callback is not
- defined, <literal>false</literal> is assumed (i.e. nothing is
- filtered).
+ defined, <literal>false</literal> is assumed (i.e. no filtering, all
+ transactions using two-phase commit are decoded in two phases as well).
<programlisting>
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ TransactionId xid,
const char *gid);
</programlisting>
- The <parameter>ctx</parameter> parameter has the same contents as for the
- other callbacks. The <parameter>gid</parameter> is the identifier that later
- identifies this transaction for <command>COMMIT PREPARED</command> or
- <command>ROLLBACK PREPARED</command>.
+ The <parameter>ctx</parameter> parameter has the same contents as for
+ the other callbacks. The parameters <parameter>xid</parameter>
+ and <parameter>gid</parameter> provide two different ways to identify
+ the transaction. The later <command>COMMIT PREPARED</command> or
+ <command>ROLLBACK PREPARED</command> carries both identifiers,
+ providing an output plugin the choice of what to use.
</para>
<para>
- The callback has to provide the same static answer for a given
- <parameter>gid</parameter> every time it is called.
+ The callback may be invoked multiple times per transaction to decode
+ and must provide the same static answer for a given pair of
+ <parameter>xid</parameter> and <parameter>gid</parameter> every time
+ it is called.
</para>
</sect3>
</para>
<para>
- Optionally the output plugin can specify a name pattern in the
- <function>filter_prepare_cb</function> and transactions with gid containing
- that name pattern will not be decoded as a two-phase commit transaction.
+ Optionally the output plugin can define filtering rules via
+ <function>filter_prepare_cb</function> to decode only specific transaction
+ in two phases. This can be achieved by pattern matching on the
+ <parameter>gid</parameter> or via lookups using the
+ <parameter>xid</parameter>.
</para>
<para>
static void DecodeXLogTuple(char *data, Size len, ReorderBufferTupleBuf *tup);
/* helper functions for decoding transactions */
-static inline bool FilterPrepare(LogicalDecodingContext *ctx, const char *gid);
+static inline bool FilterPrepare(LogicalDecodingContext *ctx,
+ TransactionId xid, const char *gid);
static bool DecodeTXNNeedSkip(LogicalDecodingContext *ctx,
XLogRecordBuffer *buf, Oid dbId,
RepOriginId origin_id);
* doesn't filter the transaction at prepare time.
*/
if (info == XLOG_XACT_COMMIT_PREPARED)
- two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+ two_phase = !(FilterPrepare(ctx, xid,
+ parsed.twophase_gid));
DecodeCommit(ctx, buf, &parsed, xid, two_phase);
break;
* doesn't filter the transaction at prepare time.
*/
if (info == XLOG_XACT_ABORT_PREPARED)
- two_phase = !(FilterPrepare(ctx, parsed.twophase_gid));
+ two_phase = !(FilterPrepare(ctx, xid,
+ parsed.twophase_gid));
DecodeAbort(ctx, buf, &parsed, xid, two_phase);
break;
* manner iff output plugin supports two-phase commits and
* doesn't filter the transaction at prepare time.
*/
- if (FilterPrepare(ctx, parsed.twophase_gid))
+ if (FilterPrepare(ctx, parsed.twophase_xid,
+ parsed.twophase_gid))
{
ReorderBufferProcessXid(reorder, parsed.twophase_xid,
buf->origptr);
* this transaction as a regular commit later.
*/
static inline bool
-FilterPrepare(LogicalDecodingContext *ctx, const char *gid)
+FilterPrepare(LogicalDecodingContext *ctx, TransactionId xid,
+ const char *gid)
{
/*
* Skip if decoding of two-phase transactions at PREPARE time is not
if (ctx->callbacks.filter_prepare_cb == NULL)
return false;
- return filter_prepare_cb_wrapper(ctx, gid);
+ return filter_prepare_cb_wrapper(ctx, xid, gid);
}
static inline bool
}
bool
-filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid)
+filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, TransactionId xid,
+ const char *gid)
{
LogicalErrorCallbackState state;
ErrorContextCallback errcallback;
ctx->accept_writes = false;
/* do the actual work: call callback */
- ret = ctx->callbacks.filter_prepare_cb(ctx, gid);
+ ret = ctx->callbacks.filter_prepare_cb(ctx, xid, gid);
/* Pop the error context stack */
error_context_stack = errcallback.previous;
XLogRecPtr restart_lsn);
extern void LogicalConfirmReceivedLocation(XLogRecPtr lsn);
-extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx, const char *gid);
+extern bool filter_prepare_cb_wrapper(LogicalDecodingContext *ctx,
+ TransactionId xid, const char *gid);
extern bool filter_by_origin_cb_wrapper(LogicalDecodingContext *ctx, RepOriginId origin_id);
extern void ResetLogicalStreamingState(void);
extern void UpdateDecodingStats(LogicalDecodingContext *ctx);
* and sent as usual transaction.
*/
typedef bool (*LogicalDecodeFilterPrepareCB) (struct LogicalDecodingContext *ctx,
+ TransactionId xid,
const char *gid);
/*