* Record the partial change for the streaming of in-progress transactions. We
* can stream only complete changes so if we have a partial change like toast
* table insert or speculative insert then we mark such a 'txn' so that it
- * can't be streamed. We also ensure that if the changes in such a 'txn' are
- * above logical_decoding_work_mem threshold then we stream them as soon as we
- * have a complete change.
+ * can't be streamed. We also ensure that if the changes in such a 'txn' can
+ * be streamed and are above logical_decoding_work_mem threshold then we stream
+ * them as soon as we have a complete change.
*/
static void
ReorderBufferProcessPartialChange(ReorderBuffer *rb, ReorderBufferTXN *txn,
*/
if (ReorderBufferCanStartStreaming(rb) &&
!(rbtxn_has_partial_change(toptxn)) &&
- rbtxn_is_serialized(txn))
+ rbtxn_is_serialized(txn) &&
+ rbtxn_has_streamable_change(toptxn))
ReorderBufferStreamTXN(rb, toptxn);
}
return;
}
+ /*
+ * The changes that are sent downstream are considered streamable. We
+ * remember such transactions so that only those will later be considered
+ * for streaming.
+ */
+ if (change->action == REORDER_BUFFER_CHANGE_INSERT ||
+ change->action == REORDER_BUFFER_CHANGE_UPDATE ||
+ change->action == REORDER_BUFFER_CHANGE_DELETE ||
+ change->action == REORDER_BUFFER_CHANGE_INTERNAL_SPEC_INSERT ||
+ change->action == REORDER_BUFFER_CHANGE_TRUNCATE ||
+ change->action == REORDER_BUFFER_CHANGE_MESSAGE)
+ {
+ ReorderBufferTXN *toptxn;
+
+ /* get the top transaction */
+ if (txn->toptxn != NULL)
+ toptxn = txn->toptxn;
+ else
+ toptxn = txn;
+
+ toptxn->txn_flags |= RBTXN_HAS_STREAMABLE_CHANGE;
+ }
+
change->lsn = lsn;
change->txn = txn;
if (txn == NULL)
return;
- /* For streamed transactions notify the remote node about the abort. */
- if (rbtxn_is_streamed(txn))
- rb->stream_abort(rb, txn, lsn);
+ /* this transaction mustn't be streamed */
+ Assert(!rbtxn_is_streamed(txn));
/* cosmetic... */
txn->final_lsn = lsn;
}
/*
- * Find the largest toplevel transaction to evict (by streaming).
+ * Find the largest streamable toplevel transaction to evict (by streaming).
*
* This can be seen as an optimized version of ReorderBufferLargestTXN, which
* should give us the same transaction (because we don't update memory account
* for subtransaction with streaming, so it's always 0). But we can simply
* iterate over the limited number of toplevel transactions that have a base
* snapshot. There is no use of selecting a transaction that doesn't have base
- * snapshot because we don't decode such transactions.
+ * snapshot because we don't decode such transactions. Also, we do not select
+ * the transaction which doesn't have any streamable change.
*
* Note that, we skip transactions that contains incomplete changes. There
* is a scope of optimization here such that we can select the largest
* the subxact from where we streamed the last change.
*/
static ReorderBufferTXN *
-ReorderBufferLargestTopTXN(ReorderBuffer *rb)
+ReorderBufferLargestStreamableTopTXN(ReorderBuffer *rb)
{
dlist_iter iter;
Size largest_size = 0;
Assert(txn->base_snapshot != NULL);
if ((largest == NULL || txn->total_size > largest_size) &&
- (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)))
+ (txn->total_size > 0) && !(rbtxn_has_partial_change(txn)) &&
+ rbtxn_has_streamable_change(txn))
{
largest = txn;
largest_size = txn->total_size;
* memory by streaming, if possible. Otherwise, spill to disk.
*/
if (ReorderBufferCanStartStreaming(rb) &&
- (txn = ReorderBufferLargestTopTXN(rb)) != NULL)
+ (txn = ReorderBufferLargestStreamableTopTXN(rb)) != NULL)
{
/* we know there has to be one, because the size is not zero */
Assert(txn && !txn->toptxn);
* restarting.
*/
if (ReorderBufferCanStream(rb) &&
- !SnapBuildXactNeedsSkip(builder, ctx->reader->EndRecPtr))
+ !SnapBuildXactNeedsSkip(builder, ctx->reader->ReadRecPtr))
return true;
return false;
} ReorderBufferChange;
/* ReorderBufferTXN txn_flags */
-#define RBTXN_HAS_CATALOG_CHANGES 0x0001
-#define RBTXN_IS_SUBXACT 0x0002
-#define RBTXN_IS_SERIALIZED 0x0004
-#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
-#define RBTXN_IS_STREAMED 0x0010
-#define RBTXN_HAS_PARTIAL_CHANGE 0x0020
-#define RBTXN_PREPARE 0x0040
-#define RBTXN_SKIPPED_PREPARE 0x0080
+#define RBTXN_HAS_CATALOG_CHANGES 0x0001
+#define RBTXN_IS_SUBXACT 0x0002
+#define RBTXN_IS_SERIALIZED 0x0004
+#define RBTXN_IS_SERIALIZED_CLEAR 0x0008
+#define RBTXN_IS_STREAMED 0x0010
+#define RBTXN_HAS_PARTIAL_CHANGE 0x0020
+#define RBTXN_PREPARE 0x0040
+#define RBTXN_SKIPPED_PREPARE 0x0080
+#define RBTXN_HAS_STREAMABLE_CHANGE 0x0100
/* Does the transaction have catalog changes? */
#define rbtxn_has_catalog_changes(txn) \
((txn)->txn_flags & RBTXN_HAS_PARTIAL_CHANGE) != 0 \
)
+/* Does this transaction contain streamable changes? */
+#define rbtxn_has_streamable_change(txn) \
+( \
+ ((txn)->txn_flags & RBTXN_HAS_STREAMABLE_CHANGE) != 0 \
+)
+
/*
* Has this transaction been streamed to downstream?
*