committing streamed transaction
 (17 rows)
 
+/*
+ * Test concurrent abort with toast data. When streaming the second insertion, we
+ * detect that the subtransaction was aborted, and reset the transaction while having
+ * the TOAST changes in memory, resulting in deallocating both decoded changes and
+ * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ */
+BEGIN;
+INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+ALTER TABLE stream_test ADD COLUMN i INT;
+SAVEPOINT s1;
+INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
+ROLLBACK TO s1;
+COMMIT;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+ count 
+-------
+     5
+(1 row)
+
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
  pg_drop_replication_slot 
 
 
 SELECT data FROM pg_logical_slot_get_changes('regression_slot', NULL,NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
 
+/*
+ * Test concurrent abort with toast data. When streaming the second insertion, we
+ * detect that the subtransaction was aborted, and reset the transaction while having
+ * the TOAST changes in memory, resulting in deallocating both decoded changes and
+ * TOAST reconstruction data. Memory usage counters must be updated correctly.
+ */
+BEGIN;
+INSERT INTO stream_test SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50) FROM generate_series(1, 500) g(i);
+ALTER TABLE stream_test ADD COLUMN i INT;
+SAVEPOINT s1;
+INSERT INTO stream_test(data, i) SELECT repeat(string_agg(to_char(g.i, 'FM0000'), ''), 50), 1 FROM generate_series(1, 500) g(i);
+ROLLBACK TO s1;
+COMMIT;
+SELECT count(*) FROM pg_logical_slot_get_changes('regression_slot', NULL, NULL, 'include-xids', '0', 'skip-empty-xacts', '1', 'stream-changes', '1');
+
 DROP TABLE stream_test;
 SELECT pg_drop_replication_slot('regression_slot');
 
    /* Reset the toast hash */
    ReorderBufferToastReset(rb, txn);
 
+   /* All changes must be deallocated */
+   Assert(txn->size == 0);
+
    pfree(txn);
 }
 
 {
    bool        found;
    dlist_mutable_iter iter;
+   Size        mem_freed = 0;
 
    /* cleanup subtransactions & their changes */
    dlist_foreach_modify(iter, &txn->subtxns)
        /* Check we're not mixing changes from different transactions. */
        Assert(change->txn == txn);
 
+       /*
+        * Instead of updating the memory counter for individual changes,
+        * we sum up the size of memory to free so we can update the memory
+        * counter all together below. This saves costs of maintaining
+        * the max-heap.
+        */
+       mem_freed += ReorderBufferChangeSize(change);
+
        ReorderBufferReturnChange(rb, change, false);
    }
 
+   /* Update the memory counter */
+   ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
+
    /*
     * Cleanup the tuplecids we stored for decoding catalog snapshot access.
     * They are always stored in the toplevel transaction.
    if (rbtxn_is_serialized(txn))
        ReorderBufferRestoreCleanup(rb, txn);
 
-   /* Update the memory counter */
-   ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
-
    /* deallocate */
    ReorderBufferReturnTXN(rb, txn);
 }
 ReorderBufferTruncateTXN(ReorderBuffer *rb, ReorderBufferTXN *txn, bool txn_prepared)
 {
    dlist_mutable_iter iter;
+   Size    mem_freed = 0;
 
    /* cleanup subtransactions & their changes */
    dlist_foreach_modify(iter, &txn->subtxns)
        /* remove the change from it's containing list */
        dlist_delete(&change->node);
 
+       /*
+        * Instead of updating the memory counter for individual changes,
+        * we sum up the size of memory to free so we can update the memory
+        * counter all together below. This saves costs of maintaining
+        * the max-heap.
+        */
+       mem_freed += ReorderBufferChangeSize(change);
+
        ReorderBufferReturnChange(rb, change, false);
    }
 
    /* Update the memory counter */
-   ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, txn->size);
+   ReorderBufferChangeMemoryUpdate(rb, NULL, txn, false, mem_freed);
 
    /*
     * Mark the transaction as streamed.
        rb->stream_stop(rb, txn, last_lsn);
        ReorderBufferSaveTXNSnapshot(rb, txn, snapshot_now, command_id);
    }
+
+   /* All changes must be deallocated */
+   Assert(txn->size == 0);
 }
 
 /*