Allocate hash join files in a separate memory context
author Tomas Vondra <[email protected]>
Fri, 19 May 2023 14:31:11 +0000 (16:31 +0200)
committer Tomas Vondra <[email protected]>
Fri, 19 May 2023 15:17:58 +0000 (17:17 +0200)
Should a hash join exceed memory limit, the hashtable is split up into
multiple batches. The number of batches is doubled each time a given
batch is determined not to fit in memory. Each batch file is allocated
with a block-sized buffer for buffering tuples and parallel hash join
has additional sharedtuplestore accessor buffers.

In some pathological cases requiring a lot of batches, often with skewed
data, bad stats, or very large datasets, users can run out-of-memory
solely from the memory overhead of all the batch files' buffers.

Batch files were allocated in the ExecutorState memory context, making
it very hard to identify when this batch explosion was the source of an
OOM. This commit allocates the batch files in a dedicated memory
context, making it easier to identify the cause of an OOM and work to
avoid it.

Based on initial draft by Tomas Vondra, with significant reworks and
improvements by Jehan-Guillaume de Rorthais.

Author: Jehan-Guillaume de Rorthais <[email protected]>
Author: Tomas Vondra <[email protected]>
Reviewed-by: Melanie Plageman <[email protected]>
Discussion: https://postgr.es/m/20190421114618.z3mpgmimc3rmubi4@development
Discussion: https://postgr.es/m/20230504193006.1b5b9622%40karst#273020ff4061fc7a2fbb1ba96b281f17

src/backend/executor/nodeHash.c
src/backend/executor/nodeHashjoin.c
src/backend/utils/sort/sharedtuplestore.c
src/include/executor/hashjoin.h
src/include/executor/nodeHashjoin.h

index 5fd1c5553ba3f4591aec92dd827edaa2354910c6..301e4acba3c26b05ee62d684352c0b3318f861e6 100644 (file)
@@ -484,7 +484,7 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
         *
         * The hashtable control block is just palloc'd from the executor's
         * per-query memory context.  Everything else should be kept inside the
-        * subsidiary hashCxt or batchCxt.
+        * subsidiary hashCxt, batchCxt or spillCxt.
         */
        hashtable = palloc_object(HashJoinTableData);
        hashtable->nbuckets = nbuckets;
@@ -538,6 +538,10 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
                                                                                                "HashBatchContext",
                                                                                                ALLOCSET_DEFAULT_SIZES);
 
+       hashtable->spillCxt = AllocSetContextCreate(hashtable->hashCxt,
+                                                                                               "HashSpillContext",
+                                                                                               ALLOCSET_DEFAULT_SIZES);
+
        /* Allocate data that will live for the life of the hashjoin */
 
        oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
@@ -570,12 +574,19 @@ ExecHashTableCreate(HashState *state, List *hashOperators, List *hashCollations,
 
        if (nbatch > 1 && hashtable->parallel_state == NULL)
        {
+               MemoryContext oldctx;
+
                /*
                 * allocate and initialize the file arrays in hashCxt (not needed for
                 * parallel case which uses shared tuplestores instead of raw files)
                 */
+               oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
+
                hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
                hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+
+               MemoryContextSwitchTo(oldctx);
+
                /* The files will not be opened until needed... */
                /* ... but make sure we have temp tablespaces established for them */
                PrepareTempTablespaces();
@@ -913,7 +924,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
        int                     oldnbatch = hashtable->nbatch;
        int                     curbatch = hashtable->curbatch;
        int                     nbatch;
-       MemoryContext oldcxt;
        long            ninmemory;
        long            nfreed;
        HashMemoryChunk oldchunks;
@@ -934,13 +944,16 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
                   hashtable, nbatch, hashtable->spaceUsed);
 #endif
 
-       oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
-
        if (hashtable->innerBatchFile == NULL)
        {
+               MemoryContext oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
+
                /* we had no file arrays before */
                hashtable->innerBatchFile = palloc0_array(BufFile *, nbatch);
                hashtable->outerBatchFile = palloc0_array(BufFile *, nbatch);
+
+               MemoryContextSwitchTo(oldcxt);
+
                /* time to establish the temp tablespaces, too */
                PrepareTempTablespaces();
        }
@@ -951,8 +964,6 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
                hashtable->outerBatchFile = repalloc0_array(hashtable->outerBatchFile, BufFile *, oldnbatch, nbatch);
        }
 
-       MemoryContextSwitchTo(oldcxt);
-
        hashtable->nbatch = nbatch;
 
        /*
@@ -1024,7 +1035,8 @@ ExecHashIncreaseNumBatches(HashJoinTable hashtable)
                                Assert(batchno > curbatch);
                                ExecHashJoinSaveTuple(HJTUPLE_MINTUPLE(hashTuple),
                                                                          hashTuple->hashvalue,
-                                                                         &hashtable->innerBatchFile[batchno]);
+                                                                         &hashtable->innerBatchFile[batchno],
+                                                                         hashtable);
 
                                hashtable->spaceUsed -= hashTupleSize;
                                nfreed++;
@@ -1683,7 +1695,8 @@ ExecHashTableInsert(HashJoinTable hashtable,
                Assert(batchno > hashtable->curbatch);
                ExecHashJoinSaveTuple(tuple,
                                                          hashvalue,
-                                                         &hashtable->innerBatchFile[batchno]);
+                                                         &hashtable->innerBatchFile[batchno],
+                                                         hashtable);
        }
 
        if (shouldFree)
@@ -2664,7 +2677,8 @@ ExecHashRemoveNextSkewBucket(HashJoinTable hashtable)
                        /* Put the tuple into a temp file for later batches */
                        Assert(batchno > hashtable->curbatch);
                        ExecHashJoinSaveTuple(tuple, hashvalue,
-                                                                 &hashtable->innerBatchFile[batchno]);
+                                                                 &hashtable->innerBatchFile[batchno],
+                                                                 hashtable);
                        pfree(hashTuple);
                        hashtable->spaceUsed -= tupleSize;
                        hashtable->spaceUsedSkew -= tupleSize;
@@ -3093,8 +3107,11 @@ ExecParallelHashJoinSetUpBatches(HashJoinTable hashtable, int nbatch)
        pstate->nbatch = nbatch;
        batches = dsa_get_address(hashtable->area, pstate->batches);
 
-       /* Use hash join memory context. */
-       oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+       /*
+        * Use hash join spill memory context to allocate accessors, including
+        * buffers for the temporary files.
+        */
+       oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
 
        /* Allocate this backend's accessor array. */
        hashtable->nbatch = nbatch;
@@ -3196,8 +3213,11 @@ ExecParallelHashEnsureBatchAccessors(HashJoinTable hashtable)
         */
        Assert(DsaPointerIsValid(pstate->batches));
 
-       /* Use hash join memory context. */
-       oldcxt = MemoryContextSwitchTo(hashtable->hashCxt);
+       /*
+        * Use hash join spill memory context to allocate accessors, including
+        * buffers for the temporary files.
+        */
+       oldcxt = MemoryContextSwitchTo(hashtable->spillCxt);
 
        /* Allocate this backend's accessor array. */
        hashtable->nbatch = pstate->nbatch;
index 615d9980cf50bb5e9f42ae51488ad133057857b9..e40436db38ebbb8e21c1fc135bb533120e71aa4e 100644 (file)
@@ -495,7 +495,8 @@ ExecHashJoinImpl(PlanState *pstate, bool parallel)
                                        Assert(parallel_state == NULL);
                                        Assert(batchno > hashtable->curbatch);
                                        ExecHashJoinSaveTuple(mintuple, hashvalue,
-                                                                                 &hashtable->outerBatchFile[batchno]);
+                                                                                 &hashtable->outerBatchFile[batchno],
+                                                                                 hashtable);
 
                                        if (shouldFree)
                                                heap_free_minimal_tuple(mintuple);
@@ -1317,21 +1318,39 @@ ExecParallelHashJoinNewBatch(HashJoinState *hjstate)
  * The data recorded in the file for each tuple is its hash value,
  * then the tuple in MinimalTuple format.
  *
- * Note: it is important always to call this in the regular executor
- * context, not in a shorter-lived context; else the temp file buffers
- * will get messed up.
+ * fileptr points to a batch file in one of the hashtable arrays.
+ *
+ * The batch files (and their buffers) are allocated in the spill context
+ * created for the hashtable.
  */
 void
 ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
-                                         BufFile **fileptr)
+                                         BufFile **fileptr, HashJoinTable hashtable)
 {
        BufFile    *file = *fileptr;
 
+       /*
+        * The batch file is lazily created. If this is the first tuple
+        * written to this batch, the batch file is created and its buffer is
+        * allocated in the spillCxt context, NOT in the batchCxt.
+        *
+        * During the build phase, buffered files are created for inner
+        * batches. Each batch's buffered file is closed (and its buffer freed)
+        * after the batch is loaded into memory during the outer side scan.
+        * Therefore, it is necessary to allocate the batch file buffer in a
+        * memory context which outlives the batch itself.
+        *
+        * Also, we use spillCxt instead of hashCxt for a better accounting of
+        * the spilling memory consumption.
+        */
        if (file == NULL)
        {
-               /* First write to this batch file, so open it. */
+               MemoryContext   oldctx = MemoryContextSwitchTo(hashtable->spillCxt);
+
                file = BufFileCreateTemp(false);
                *fileptr = file;
+
+               MemoryContextSwitchTo(oldctx);
        }
 
        BufFileWrite(file, &hashvalue, sizeof(uint32));
index 08312491599d9abdab42fc6e6579744581a6c372..236be65f221761bb848bcec4327778c22f3273f3 100644 (file)
@@ -308,11 +308,15 @@ sts_puttuple(SharedTuplestoreAccessor *accessor, void *meta_data,
        {
                SharedTuplestoreParticipant *participant;
                char            name[MAXPGPATH];
+               MemoryContext oldcxt;
 
                /* Create one.  Only this backend will write into it. */
                sts_filename(name, accessor, accessor->participant);
+
+               oldcxt = MemoryContextSwitchTo(accessor->context);
                accessor->write_file =
                        BufFileCreateFileSet(&accessor->fileset->fs, name);
+               MemoryContextSwitchTo(oldcxt);
 
                /* Set up the shared state for this backend's file. */
                participant = &accessor->sts->participants[accessor->participant];
@@ -527,11 +531,15 @@ sts_parallel_scan_next(SharedTuplestoreAccessor *accessor, void *meta_data)
                        if (accessor->read_file == NULL)
                        {
                                char            name[MAXPGPATH];
+                               MemoryContext oldcxt;
 
                                sts_filename(name, accessor, accessor->read_participant);
+
+                               oldcxt = MemoryContextSwitchTo(accessor->context);
                                accessor->read_file =
                                        BufFileOpenFileSet(&accessor->fileset->fs, name, O_RDONLY,
                                                                           false);
+                               MemoryContextSwitchTo(oldcxt);
                        }
 
                        /* Seek and load the chunk header. */
index 8ee59d2c7107e3dcffaf5dbacea2966866d7b4d6..857ca58f6f265b9947afde4ecf4e85887caf856b 100644 (file)
 /* ----------------------------------------------------------------
  *                             hash-join hash table structures
  *
- * Each active hashjoin has a HashJoinTable control block, which is
- * palloc'd in the executor's per-query context.  All other storage needed
- * for the hashjoin is kept in private memory contexts, two for each hashjoin.
- * This makes it easy and fast to release the storage when we don't need it
- * anymore.  (Exception: data associated with the temp files lives in the
- * per-query context too, since we always call buffile.c in that context.)
+ * Each active hashjoin has a HashJoinTable structure, which is
+ * palloc'd in the executor's per-query context.  Other storage needed for
+ * each hashjoin is kept in child contexts, three for each hashjoin:
+ *   - HashTableContext (hashCxt): the parent hash table storage context
+ *   - HashSpillContext (spillCxt): storage for temp files buffers
+ *   - HashBatchContext (batchCxt): storage for a batch in serial hash join
  *
  * The hashtable contexts are made children of the per-query context, ensuring
 * that they will be discarded at end of statement even if the join is
 * aborted early by an error.  (Likewise, any temporary files we make will
 * be cleaned up by the virtual file manager in event of an error.)
  *
  * Storage that should live through the entire join is allocated from the
- * "hashCxt", while storage that is only wanted for the current batch is
- * allocated in the "batchCxt".  By resetting the batchCxt at the end of
- * each batch, we free all the per-batch storage reliably and without tedium.
+ * "hashCxt" (mainly the hashtable's metadata). Also, the "hashCxt" context is
+ * the parent of "spillCxt" and "batchCxt". It makes it easy and fast to
+ * release the storage when we don't need it anymore.
+ *
+ * Data associated with temp files is allocated in the "spillCxt" context
+ * which lives for the duration of the entire join as batch files'
+ * creation and usage may span batch execution. These files are
+ * explicitly destroyed by calling BufFileClose() when the code is done
+ * with them. The aim of this context is to help accounting for the
+ * memory allocated for temp files and their buffers.
+ *
+ * Finally, data used only during a single batch's execution is allocated
+ * in the "batchCxt". By resetting the batchCxt at the end of each batch,
+ * we free all the per-batch storage reliably and without tedium.
  *
  * During first scan of inner relation, we get its tuples from executor.
  * If nbatch > 1 then tuples that don't belong in first batch get saved
@@ -350,6 +361,7 @@ typedef struct HashJoinTableData
 
        MemoryContext hashCxt;          /* context for whole-hash-join storage */
        MemoryContext batchCxt;         /* context for this-batch-only storage */
+       MemoryContext spillCxt;         /* context for spilling to temp files */
 
        /* used for dense allocation of tuples (into linked chunks) */
        HashMemoryChunk chunks;         /* one list for the whole batch */
index d3670708836c52493f496df241e9a53cafed948b..ccb704ede1d01a5a3d94c68d32b675136b3afc57 100644 (file)
@@ -29,6 +29,6 @@ extern void ExecHashJoinInitializeWorker(HashJoinState *state,
                                                                                 ParallelWorkerContext *pwcxt);
 
 extern void ExecHashJoinSaveTuple(MinimalTuple tuple, uint32 hashvalue,
-                                                                 BufFile **fileptr);
+                                                                 BufFile **fileptr, HashJoinTable hashtable);
 
 #endif                                                 /* NODEHASHJOIN_H */