pg_dump: Use only LZ4 frame format for compression
authorTomas Vondra <[email protected]>
Fri, 31 Mar 2023 22:54:17 +0000 (00:54 +0200)
committerTomas Vondra <[email protected]>
Fri, 31 Mar 2023 22:54:50 +0000 (00:54 +0200)
After 0da243fed0 got committed, it was reported that in some cases the
compression ratio is rather poor - particularly for custom format with
narrow tables - due to writing the LZ4 header/footer for each row.

This commit switches to LZ4F (LZ4 frame format), eliminating most of the
overhead and greatly improving the compression ratio. This makes the
compressed size about the same for plain and custom formats (just like
for gzip, for example).

LZ4F is now used by both compression APIs, which allowed refactoring and
reusing more of the code. For consistency this also renames the LZ4File
struct to LZ4State, and a number of functions are now prefixed with
LZ4Stream_ (instead of LZ4File_).

Patch by Georgios Kokolatos, based on report and initial patch by Justin
Pryzby. Review and minor cleanups by me.

Author: Georgios Kokolatos, Justin Pryzby
Reported-by: Justin Pryzby
Reviewed-by: Tomas Vondra
Discussion: https://postgr.es/m/20230227044910.GO1653%40telsasoft.com

src/bin/pg_dump/compress_lz4.c

index fc2f4e116ddeb3180f99100e4b5cd625e64675e4..5aca9c1f0619d6c38a6413ff5b61c6ae9f709a47 100644 (file)
@@ -17,7 +17,6 @@
 #include "compress_lz4.h"
 
 #ifdef USE_LZ4
-#include <lz4.h>
 #include <lz4frame.h>
 
 /*
 #define LZ4F_HEADER_SIZE_MAX   32
 #endif
 
+/*---------------------------------
+ * Common to both compression APIs
+ *---------------------------------
+ */
+
+/*
+ * (de)compression state used by both the Compressor and Stream APIs.
+ */
+typedef struct LZ4State
+{
+       /*
+        * Used by the Stream API to keep track of the file stream.
+        */
+       FILE       *fp;
+
+       LZ4F_preferences_t prefs;
+
+       LZ4F_compressionContext_t       ctx;
+       LZ4F_decompressionContext_t     dtx;
+
+       /*
+        * Used by the Stream API's lazy initialization.
+        */
+       bool            inited;
+
+       /*
+        * Used by the Stream API to distinguish between compression and
+        * decompression operations.
+        */
+       bool            compressing;
+
+       /*
+        * Used by the Compressor API to mark if the compression headers have been
+        * written after initialization.
+        */
+       bool            needs_header_flush;
+
+       size_t          buflen;
+       char       *buffer;
+
+       /*
+        * Used by the Stream API to store already uncompressed data that the
+        * caller has not consumed.
+        */
+       size_t          overflowalloclen;
+       size_t          overflowlen;
+       char       *overflowbuf;
+
+       /*
+        * Used by both APIs to keep track of the compressed data length stored in
+        * the buffer.
+        */
+       size_t          compressedlen;
+
+       /*
+        * Used by both APIs to keep track of error codes.
+        */
+       size_t          errcode;
+} LZ4State;
+
+/*
+ * LZ4State_compression_init
+ *             Initialize the required LZ4State members for compression.
+ *
+ * Write the LZ4 frame header in a buffer keeping track of its length. Users of
+ * this function can choose when and how to write the header to a file stream.
+ *
+ * Returns true on success. In case of a failure returns false, and stores the
+ * error code in state->errcode.
+ */
+static bool
+LZ4State_compression_init(LZ4State *state)
+{
+       size_t          status;
+
+       state->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &state->prefs);
+
+       /*
+        * LZ4F_compressBegin requires a buffer that is greater or equal to
+        * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
+        */
+       if (state->buflen < LZ4F_HEADER_SIZE_MAX)
+               state->buflen = LZ4F_HEADER_SIZE_MAX;
+
+       status = LZ4F_createCompressionContext(&state->ctx, LZ4F_VERSION);
+       if (LZ4F_isError(status))
+       {
+               state->errcode = status;
+               return false;
+       }
+
+       state->buffer = pg_malloc(state->buflen);
+       status = LZ4F_compressBegin(state->ctx,
+                                                               state->buffer, state->buflen,
+                                                               &state->prefs);
+       if (LZ4F_isError(status))
+       {
+               state->errcode = status;
+               return false;
+       }
+
+       state->compressedlen = status;
+
+       return true;
+}
+
 /*----------------------
  * Compressor API
  *----------------------
  */
 
-typedef struct LZ4CompressorState
-{
-       char       *outbuf;
-       size_t          outsize;
-} LZ4CompressorState;
-
 /* Private routines that support LZ4 compressed data I/O */
-static void ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs);
-static void WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
-                                                                 const void *data, size_t dLen);
-static void EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs);
 
 static void
 ReadDataFromArchiveLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
-       LZ4_streamDecode_t lz4StreamDecode;
-       char       *buf;
-       char       *decbuf;
-       size_t          buflen;
-       size_t          cnt;
-
-       buflen = DEFAULT_IO_BUFFER_SIZE;
-       buf = pg_malloc(buflen);
-       decbuf = pg_malloc(buflen);
+       size_t          r;
+       size_t          readbuflen;
+       char       *outbuf;
+       char       *readbuf;
+       LZ4F_decompressionContext_t ctx = NULL;
+       LZ4F_decompressOptions_t        dec_opt;
+       LZ4F_errorCode_t                        status;
+
+       memset(&dec_opt, 0, sizeof(dec_opt));
+       status = LZ4F_createDecompressionContext(&ctx, LZ4F_VERSION);
+       if (LZ4F_isError(status))
+               pg_fatal("could not create LZ4 decompression context: %s",
+                                LZ4F_getErrorName(status));
+
+       outbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
+       readbuf = pg_malloc0(DEFAULT_IO_BUFFER_SIZE);
+       readbuflen = DEFAULT_IO_BUFFER_SIZE;
+       while ((r = cs->readF(AH, &readbuf, &readbuflen)) > 0)
+       {
+               char       *readp;
+               char       *readend;
 
-       LZ4_setStreamDecode(&lz4StreamDecode, NULL, 0);
+               /* Process one chunk */
+               readp = readbuf;
+               readend = readbuf + r;
+               while (readp < readend)
+               {
+                       size_t          out_size = DEFAULT_IO_BUFFER_SIZE;
+                       size_t          read_size = readend - readp;
 
-       while ((cnt = cs->readF(AH, &buf, &buflen)))
-       {
-               int                     decBytes = LZ4_decompress_safe_continue(&lz4StreamDecode,
-                                                                                                                       buf, decbuf,
-                                                                                                                       cnt, buflen);
+                       memset(outbuf, 0, DEFAULT_IO_BUFFER_SIZE);
+                       status = LZ4F_decompress(ctx, outbuf, &out_size,
+                                                                        readp, &read_size, &dec_opt);
+                       if (LZ4F_isError(status))
+                               pg_fatal("could not decompress: %s",
+                                                LZ4F_getErrorName(status));
 
-               ahwrite(decbuf, 1, decBytes, AH);
+                       ahwrite(outbuf, 1, out_size, AH);
+                       readp += read_size;
+               }
        }
 
-       pg_free(buf);
-       pg_free(decbuf);
+       pg_free(outbuf);
+       pg_free(readbuf);
+
+       status = LZ4F_freeDecompressionContext(ctx);
+       if (LZ4F_isError(status))
+               pg_fatal("could not free LZ4 decompression context: %s",
+                                LZ4F_getErrorName(status));
 }
 
 static void
 WriteDataToArchiveLZ4(ArchiveHandle *AH, CompressorState *cs,
                                          const void *data, size_t dLen)
 {
-       LZ4CompressorState *LZ4cs = (LZ4CompressorState *) cs->private_data;
-       size_t          compressed;
-       size_t          requiredsize = LZ4_compressBound(dLen);
+       LZ4State   *state = (LZ4State *) cs->private_data;
+       size_t          remaining = dLen;
+       size_t          status;
+       size_t          chunk;
 
-       if (requiredsize > LZ4cs->outsize)
+       /* Write the header if not yet written. */
+       if (state->needs_header_flush)
        {
-               LZ4cs->outbuf = pg_realloc(LZ4cs->outbuf, requiredsize);
-               LZ4cs->outsize = requiredsize;
+               cs->writeF(AH, state->buffer, state->compressedlen);
+               state->needs_header_flush = false;
        }
 
-       compressed = LZ4_compress_default(data, LZ4cs->outbuf,
-                                                                         dLen, LZ4cs->outsize);
+       while (remaining > 0)
+       {
 
-       if (compressed <= 0)
-               pg_fatal("failed to LZ4 compress data");
+               if (remaining > DEFAULT_IO_BUFFER_SIZE)
+                       chunk = DEFAULT_IO_BUFFER_SIZE;
+               else
+                       chunk = remaining;
 
-       cs->writeF(AH, LZ4cs->outbuf, compressed);
+               remaining -= chunk;
+               status = LZ4F_compressUpdate(state->ctx,
+                                                                        state->buffer, state->buflen,
+                                                                        data, chunk, NULL);
+
+               if (LZ4F_isError(status))
+                       pg_fatal("failed to LZ4 compress data: %s",
+                                        LZ4F_getErrorName(status));
+
+               cs->writeF(AH, state->buffer, status);
+
+               data = ((char *) data) + chunk;
+       }
 }
 
 static void
 EndCompressorLZ4(ArchiveHandle *AH, CompressorState *cs)
 {
-       LZ4CompressorState *LZ4cs;
+       LZ4State   *state = (LZ4State *) cs->private_data;
+       size_t          status;
 
-       LZ4cs = (LZ4CompressorState *) cs->private_data;
-       if (LZ4cs)
-       {
-               pg_free(LZ4cs->outbuf);
-               pg_free(LZ4cs);
-               cs->private_data = NULL;
-       }
+       /* Nothing needs to be done */
+       if (!state)
+               return;
+
+       /*
+        * Write the header if not yet written. The caller is not required to call
+        * writeData if the relation does not contain any data. Thus it is
+        * possible to reach here without having flushed the header. Do it before
+        * ending the compression.
+        */
+       if (state->needs_header_flush)
+               cs->writeF(AH, state->buffer, state->compressedlen);
+
+       status = LZ4F_compressEnd(state->ctx,
+                                                         state->buffer, state->buflen,
+                                                         NULL);
+       if (LZ4F_isError(status))
+               pg_fatal("failed to end compression: %s",
+                                LZ4F_getErrorName(status));
+
+       cs->writeF(AH, state->buffer, status);
+
+       status = LZ4F_freeCompressionContext(state->ctx);
+       if (LZ4F_isError(status))
+               pg_fatal("failed to end compression: %s",
+                                LZ4F_getErrorName(status));
+
+       pg_free(state->buffer);
+       pg_free(state);
+
+       cs->private_data = NULL;
 }
 
-
 /*
  * Public routines that support LZ4 compressed data I/O
  */
 void
 InitCompressorLZ4(CompressorState *cs, const pg_compress_specification compression_spec)
 {
+       LZ4State   *state;
+
        cs->readData = ReadDataFromArchiveLZ4;
        cs->writeData = WriteDataToArchiveLZ4;
        cs->end = EndCompressorLZ4;
 
        cs->compression_spec = compression_spec;
 
-       /* Will be lazy init'd */
-       cs->private_data = pg_malloc0(sizeof(LZ4CompressorState));
+       /*
+        * Read operations have access to the whole input. No state needs to be
+        * carried between calls.
+        */
+       if (cs->readF)
+               return;
+
+       state = pg_malloc0(sizeof(*state));
+       if (cs->compression_spec.level >= 0)
+               state->prefs.compressionLevel = cs->compression_spec.level;
+
+       if (!LZ4State_compression_init(state))
+               pg_fatal("could not initialize LZ4 compression: %s",
+                                LZ4F_getErrorName(state->errcode));
+
+       /* Remember that the header has not been written. */
+       state->needs_header_flush = true;
+       cs->private_data = state;
 }
 
 /*----------------------
- * Compress File API
+ * Compress Stream API
  *----------------------
  */
 
-/*
- * State needed for LZ4 (de)compression using the CompressFileHandle API.
- */
-typedef struct LZ4File
-{
-       FILE       *fp;
-
-       LZ4F_preferences_t prefs;
-
-       LZ4F_compressionContext_t ctx;
-       LZ4F_decompressionContext_t dtx;
-
-       bool            inited;
-       bool            compressing;
-
-       size_t          buflen;
-       char       *buffer;
-
-       size_t          overflowalloclen;
-       size_t          overflowlen;
-       char       *overflowbuf;
-
-       size_t          errcode;
-} LZ4File;
 
 /*
  * LZ4 equivalent to feof() or gzeof().  Return true iff there is no
@@ -163,21 +318,21 @@ typedef struct LZ4File
  * is reached.
  */
 static bool
-LZ4File_eof(CompressFileHandle *CFH)
+LZ4Stream_eof(CompressFileHandle *CFH)
 {
-       LZ4File    *fs = (LZ4File *) CFH->private_data;
+       LZ4State   *state = (LZ4State *) CFH->private_data;
 
-       return fs->overflowlen == 0 && feof(fs->fp);
+       return state->overflowlen == 0 && feof(state->fp);
 }
 
 static const char *
-LZ4File_get_error(CompressFileHandle *CFH)
+LZ4Stream_get_error(CompressFileHandle *CFH)
 {
-       LZ4File    *fs = (LZ4File *) CFH->private_data;
+       LZ4State   *state = (LZ4State *) CFH->private_data;
        const char *errmsg;
 
-       if (LZ4F_isError(fs->errcode))
-               errmsg = LZ4F_getErrorName(fs->errcode);
+       if (LZ4F_isError(state->errcode))
+               errmsg = LZ4F_getErrorName(state->errcode);
        else
                errmsg = strerror(errno);
 
@@ -185,57 +340,34 @@ LZ4File_get_error(CompressFileHandle *CFH)
 }
 
 /*
- * Prepare an already alloc'ed LZ4File struct for subsequent calls (either
- * compression or decompression).
+ * Initialize an already alloc'ed LZ4State struct for subsequent calls.
  *
- * It creates the necessary contexts for the operations. When compressing data
- * (indicated by compressing=true), it additionally writes the LZ4 header in the
- * output stream.
+ * Creates the necessary contexts for either compresion or decompression. When
+ * compressing data (indicated by compressing=true), it additionally writes the
+ * LZ4 header in the output stream.
  *
  * Returns true on success. In case of a failure returns false, and stores the
- * error code in fs->errcode.
+ * error code in state->errcode.
  */
 static bool
-LZ4File_init(LZ4File *fs, int size, bool compressing)
+LZ4Stream_init(LZ4State *state, int size, bool compressing)
 {
        size_t          status;
 
-       if (fs->inited)
+       if (state->inited)
                return true;
 
-       fs->compressing = compressing;
-       fs->inited = true;
+       state->compressing = compressing;
+       state->inited = true;
 
        /* When compressing, write LZ4 header to the output stream. */
-       if (fs->compressing)
+       if (state->compressing)
        {
-               fs->buflen = LZ4F_compressBound(DEFAULT_IO_BUFFER_SIZE, &fs->prefs);
-
-               /*
-                * LZ4F_compressBegin requires a buffer that is greater or equal to
-                * LZ4F_HEADER_SIZE_MAX. Verify that the requirement is met.
-                */
-               if (fs->buflen < LZ4F_HEADER_SIZE_MAX)
-                       fs->buflen = LZ4F_HEADER_SIZE_MAX;
 
-               status = LZ4F_createCompressionContext(&fs->ctx, LZ4F_VERSION);
-               if (LZ4F_isError(status))
-               {
-                       fs->errcode = status;
+               if (!LZ4State_compression_init(state))
                        return false;
-               }
-
-               fs->buffer = pg_malloc(fs->buflen);
-               status = LZ4F_compressBegin(fs->ctx, fs->buffer, fs->buflen,
-                                                                       &fs->prefs);
-
-               if (LZ4F_isError(status))
-               {
-                       fs->errcode = status;
-                       return false;
-               }
 
-               if (fwrite(fs->buffer, 1, status, fs->fp) != status)
+               if (fwrite(state->buffer, 1, state->compressedlen, state->fp) != state->compressedlen)
                {
                        errno = (errno) ? errno : ENOSPC;
                        return false;
@@ -243,19 +375,19 @@ LZ4File_init(LZ4File *fs, int size, bool compressing)
        }
        else
        {
-               status = LZ4F_createDecompressionContext(&fs->dtx, LZ4F_VERSION);
+               status = LZ4F_createDecompressionContext(&state->dtx, LZ4F_VERSION);
                if (LZ4F_isError(status))
                {
-                       fs->errcode = status;
+                       state->errcode = status;
                        return false;
                }
 
-               fs->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
-               fs->buffer = pg_malloc(fs->buflen);
+               state->buflen = Max(size, DEFAULT_IO_BUFFER_SIZE);
+               state->buffer = pg_malloc(state->buflen);
 
-               fs->overflowalloclen = fs->buflen;
-               fs->overflowbuf = pg_malloc(fs->overflowalloclen);
-               fs->overflowlen = 0;
+               state->overflowalloclen = state->buflen;
+               state->overflowbuf = pg_malloc(state->overflowalloclen);
+               state->overflowlen = 0;
        }
 
        return true;
@@ -272,28 +404,28 @@ LZ4File_init(LZ4File *fs, int size, bool compressing)
  * the 'ptr' buffer), or 0 if the overflow buffer is empty.
  */
 static int
-LZ4File_read_overflow(LZ4File *fs, void *ptr, int size, bool eol_flag)
+LZ4Stream_read_overflow(LZ4State *state, void *ptr, int size, bool eol_flag)
 {
        char       *p;
        int                     readlen = 0;
 
-       if (fs->overflowlen == 0)
+       if (state->overflowlen == 0)
                return 0;
 
-       if (fs->overflowlen >= size)
+       if (state->overflowlen >= size)
                readlen = size;
        else
-               readlen = fs->overflowlen;
+               readlen = state->overflowlen;
 
-       if (eol_flag && (p = memchr(fs->overflowbuf, '\n', readlen)))
+       if (eol_flag && (p = memchr(state->overflowbuf, '\n', readlen)))
                /* Include the line terminating char */
-               readlen = p - fs->overflowbuf + 1;
+               readlen = p - state->overflowbuf + 1;
 
-       memcpy(ptr, fs->overflowbuf, readlen);
-       fs->overflowlen -= readlen;
+       memcpy(ptr, state->overflowbuf, readlen);
+       state->overflowlen -= readlen;
 
-       if (fs->overflowlen > 0)
-               memmove(fs->overflowbuf, fs->overflowbuf + readlen, fs->overflowlen);
+       if (state->overflowlen > 0)
+               memmove(state->overflowbuf, state->overflowbuf + readlen, state->overflowlen);
 
        return readlen;
 }
@@ -306,7 +438,7 @@ LZ4File_read_overflow(LZ4File *fs, void *ptr, int size, bool eol_flag)
  * char if found first when the eol_flag is set. It is possible that the
  * decompressed output generated by reading any compressed input via the
  * LZ4F API, exceeds 'ptrsize'. Any exceeding decompressed content is stored
- * at an overflow buffer within LZ4File. Of course, when the function is
+ * at an overflow buffer within LZ4State. Of course, when the function is
  * called, it will first try to consume any decompressed content already
  * present in the overflow buffer, before decompressing new content.
  *
@@ -314,7 +446,7 @@ LZ4File_read_overflow(LZ4File *fs, void *ptr, int size, bool eol_flag)
  * buffer, or -1 in case of error.
  */
 static int
-LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
+LZ4Stream_read_internal(LZ4State *state, void *ptr, int ptrsize, bool eol_flag)
 {
        int                     dsize = 0;
        int                     rsize;
@@ -324,18 +456,18 @@ LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
        void       *readbuf;
 
        /* Lazy init */
-       if (!LZ4File_init(fs, size, false /* decompressing */ ))
+       if (!LZ4Stream_init(state, size, false /* decompressing */ ))
                return -1;
 
        /* Verify that there is enough space in the outbuf */
-       if (size > fs->buflen)
+       if (size > state->buflen)
        {
-               fs->buflen = size;
-               fs->buffer = pg_realloc(fs->buffer, size);
+               state->buflen = size;
+               state->buffer = pg_realloc(state->buffer, size);
        }
 
        /* use already decompressed content if available */
-       dsize = LZ4File_read_overflow(fs, ptr, size, eol_flag);
+       dsize = LZ4Stream_read_overflow(state, ptr, size, eol_flag);
        if (dsize == size || (eol_flag && memchr(ptr, '\n', dsize)))
                return dsize;
 
@@ -346,8 +478,8 @@ LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
                char       *rp;
                char       *rend;
 
-               rsize = fread(readbuf, 1, size, fs->fp);
-               if (rsize < size && !feof(fs->fp))
+               rsize = fread(readbuf, 1, size, state->fp);
+               if (rsize < size && !feof(state->fp))
                        return -1;
 
                rp = (char *) readbuf;
@@ -356,15 +488,15 @@ LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
                while (rp < rend)
                {
                        size_t          status;
-                       size_t          outlen = fs->buflen;
+                       size_t          outlen = state->buflen;
                        size_t          read_remain = rend - rp;
 
-                       memset(fs->buffer, 0, outlen);
-                       status = LZ4F_decompress(fs->dtx, fs->buffer, &outlen,
+                       memset(state->buffer, 0, outlen);
+                       status = LZ4F_decompress(state->dtx, state->buffer, &outlen,
                                                                         rp, &read_remain, NULL);
                        if (LZ4F_isError(status))
                        {
-                               fs->errcode = status;
+                               state->errcode = status;
                                return -1;
                        }
 
@@ -382,34 +514,34 @@ LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
                                size_t          len = outlen < lib ? outlen : lib;
 
                                if (eol_flag &&
-                                       (p = memchr(fs->buffer, '\n', outlen)) &&
-                                       (size_t) (p - fs->buffer + 1) <= len)
+                                       (p = memchr(state->buffer, '\n', outlen)) &&
+                                       (size_t) (p - state->buffer + 1) <= len)
                                {
-                                       len = p - fs->buffer + 1;
+                                       len = p - state->buffer + 1;
                                        eol_found = true;
                                }
 
-                               memcpy((char *) ptr + dsize, fs->buffer, len);
+                               memcpy((char *) ptr + dsize, state->buffer, len);
                                dsize += len;
 
                                /* move what did not fit, if any, at the beginning of the buf */
                                if (len < outlen)
-                                       memmove(fs->buffer, fs->buffer + len, outlen - len);
+                                       memmove(state->buffer, state->buffer + len, outlen - len);
                                outlen -= len;
                        }
 
                        /* if there is available output, save it */
                        if (outlen > 0)
                        {
-                               while (fs->overflowlen + outlen > fs->overflowalloclen)
+                               while (state->overflowlen + outlen > state->overflowalloclen)
                                {
-                                       fs->overflowalloclen *= 2;
-                                       fs->overflowbuf = pg_realloc(fs->overflowbuf,
-                                                                                                fs->overflowalloclen);
+                                       state->overflowalloclen *= 2;
+                                       state->overflowbuf = pg_realloc(state->overflowbuf,
+                                                                                                       state->overflowalloclen);
                                }
 
-                               memcpy(fs->overflowbuf + fs->overflowlen, fs->buffer, outlen);
-                               fs->overflowlen += outlen;
+                               memcpy(state->overflowbuf + state->overflowlen, state->buffer, outlen);
+                               state->overflowlen += outlen;
                        }
                }
        } while (rsize == size && dsize < size && eol_found == false);
@@ -423,14 +555,14 @@ LZ4File_read_internal(LZ4File *fs, void *ptr, int ptrsize, bool eol_flag)
  * Compress size bytes from ptr and write them to the stream.
  */
 static bool
-LZ4File_write(const void *ptr, size_t size, CompressFileHandle *CFH)
+LZ4Stream_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 {
-       LZ4File    *fs = (LZ4File *) CFH->private_data;
+       LZ4State   *state = (LZ4State *) CFH->private_data;
        size_t          status;
        int                     remaining = size;
 
        /* Lazy init */
-       if (!LZ4File_init(fs, size, true))
+       if (!LZ4Stream_init(state, size, true))
                return false;
 
        while (remaining > 0)
@@ -439,15 +571,15 @@ LZ4File_write(const void *ptr, size_t size, CompressFileHandle *CFH)
 
                remaining -= chunk;
 
-               status = LZ4F_compressUpdate(fs->ctx, fs->buffer, fs->buflen,
+               status = LZ4F_compressUpdate(state->ctx, state->buffer, state->buflen,
                                                                         ptr, chunk, NULL);
                if (LZ4F_isError(status))
                {
-                       fs->errcode = status;
+                       state->errcode = status;
                        return false;
                }
 
-               if (fwrite(fs->buffer, 1, status, fs->fp) != status)
+               if (fwrite(state->buffer, 1, status, state->fp) != status)
                {
                        errno = (errno) ? errno : ENOSPC;
                        return false;
@@ -461,13 +593,13 @@ LZ4File_write(const void *ptr, size_t size, CompressFileHandle *CFH)
  * fread() equivalent implementation for LZ4 compressed files.
  */
 static bool
-LZ4File_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
+LZ4Stream_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
 {
-       LZ4File    *fs = (LZ4File *) CFH->private_data;
+       LZ4State   *state = (LZ4State *) CFH->private_data;
        int                     ret;
 
-       if ((ret = LZ4File_read_internal(fs, ptr, size, false)) < 0)
-               pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+       if ((ret = LZ4Stream_read_internal(state, ptr, size, false)) < 0)
+               pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
 
        if (rsize)
                *rsize = (size_t) ret;
@@ -479,15 +611,15 @@ LZ4File_read(void *ptr, size_t size, size_t *rsize, CompressFileHandle *CFH)
  * fgetc() equivalent implementation for LZ4 compressed files.
  */
 static int
-LZ4File_getc(CompressFileHandle *CFH)
+LZ4Stream_getc(CompressFileHandle *CFH)
 {
-       LZ4File    *fs = (LZ4File *) CFH->private_data;
+       LZ4State   *state = (LZ4State *) CFH->private_data;
        unsigned char c;
 
-       if (LZ4File_read_internal(fs, &c, 1, false) <= 0)
+       if (LZ4Stream_read_internal(state, &c, 1, false) <= 0)
        {
-               if (!LZ4File_eof(CFH))
-                       pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+               if (!LZ4Stream_eof(CFH))
+                       pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
                else
                        pg_fatal("could not read from input file: end of file");
        }
@@ -499,14 +631,14 @@ LZ4File_getc(CompressFileHandle *CFH)
  * fgets() equivalent implementation for LZ4 compressed files.
  */
 static char *
-LZ4File_gets(char *ptr, int size, CompressFileHandle *CFH)
+LZ4Stream_gets(char *ptr, int size, CompressFileHandle *CFH)
 {
-       LZ4File    *fs = (LZ4File *) CFH->private_data;
+       LZ4State   *state = (LZ4State *) CFH->private_data;
        int                     ret;
 
-       ret = LZ4File_read_internal(fs, ptr, size, true);
-       if (ret < 0 || (ret == 0 && !LZ4File_eof(CFH)))
-               pg_fatal("could not read from input file: %s", LZ4File_get_error(CFH));
+       ret = LZ4Stream_read_internal(state, ptr, size, true);
+       if (ret < 0 || (ret == 0 && !LZ4Stream_eof(CFH)))
+               pg_fatal("could not read from input file: %s", LZ4Stream_get_error(CFH));
 
        /* Done reading */
        if (ret == 0)
@@ -520,55 +652,55 @@ LZ4File_gets(char *ptr, int size, CompressFileHandle *CFH)
  * remaining content and/or generated footer from the LZ4 API.
  */
 static bool
-LZ4File_close(CompressFileHandle *CFH)
+LZ4Stream_close(CompressFileHandle *CFH)
 {
        FILE       *fp;
-       LZ4File    *fs = (LZ4File *) CFH->private_data;
+       LZ4State   *state = (LZ4State *) CFH->private_data;
        size_t          status;
 
-       fp = fs->fp;
-       if (fs->inited)
+       fp = state->fp;
+       if (state->inited)
        {
-               if (fs->compressing)
+               if (state->compressing)
                {
-                       status = LZ4F_compressEnd(fs->ctx, fs->buffer, fs->buflen, NULL);
+                       status = LZ4F_compressEnd(state->ctx, state->buffer, state->buflen, NULL);
                        if (LZ4F_isError(status))
                                pg_fatal("failed to end compression: %s",
                                                 LZ4F_getErrorName(status));
-                       else if (fwrite(fs->buffer, 1, status, fs->fp) != status)
+                       else if (fwrite(state->buffer, 1, status, state->fp) != status)
                        {
                                errno = (errno) ? errno : ENOSPC;
                                WRITE_ERROR_EXIT;
                        }
 
-                       status = LZ4F_freeCompressionContext(fs->ctx);
+                       status = LZ4F_freeCompressionContext(state->ctx);
                        if (LZ4F_isError(status))
                                pg_fatal("failed to end compression: %s",
                                                 LZ4F_getErrorName(status));
                }
                else
                {
-                       status = LZ4F_freeDecompressionContext(fs->dtx);
+                       status = LZ4F_freeDecompressionContext(state->dtx);
                        if (LZ4F_isError(status))
                                pg_fatal("failed to end decompression: %s",
                                                 LZ4F_getErrorName(status));
-                       pg_free(fs->overflowbuf);
+                       pg_free(state->overflowbuf);
                }
 
-               pg_free(fs->buffer);
+               pg_free(state->buffer);
        }
 
-       pg_free(fs);
+       pg_free(state);
 
        return fclose(fp) == 0;
 }
 
 static bool
-LZ4File_open(const char *path, int fd, const char *mode,
-                        CompressFileHandle *CFH)
+LZ4Stream_open(const char *path, int fd, const char *mode,
+                          CompressFileHandle *CFH)
 {
        FILE       *fp;
-       LZ4File    *lz4fp = (LZ4File *) CFH->private_data;
+       LZ4State   *state = (LZ4State *) CFH->private_data;
 
        if (fd >= 0)
                fp = fdopen(fd, mode);
@@ -576,17 +708,17 @@ LZ4File_open(const char *path, int fd, const char *mode,
                fp = fopen(path, mode);
        if (fp == NULL)
        {
-               lz4fp->errcode = errno;
+               state->errcode = errno;
                return false;
        }
 
-       lz4fp->fp = fp;
+       state->fp = fp;
 
        return true;
 }
 
 static bool
-LZ4File_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
+LZ4Stream_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
 {
        char       *fname;
        int                     save_errno;
@@ -609,24 +741,24 @@ void
 InitCompressFileHandleLZ4(CompressFileHandle *CFH,
                                                  const pg_compress_specification compression_spec)
 {
-       LZ4File    *lz4fp;
-
-       CFH->open_func = LZ4File_open;
-       CFH->open_write_func = LZ4File_open_write;
-       CFH->read_func = LZ4File_read;
-       CFH->write_func = LZ4File_write;
-       CFH->gets_func = LZ4File_gets;
-       CFH->getc_func = LZ4File_getc;
-       CFH->eof_func = LZ4File_eof;
-       CFH->close_func = LZ4File_close;
-       CFH->get_error_func = LZ4File_get_error;
+       LZ4State   *state;
+
+       CFH->open_func = LZ4Stream_open;
+       CFH->open_write_func = LZ4Stream_open_write;
+       CFH->read_func = LZ4Stream_read;
+       CFH->write_func = LZ4Stream_write;
+       CFH->gets_func = LZ4Stream_gets;
+       CFH->getc_func = LZ4Stream_getc;
+       CFH->eof_func = LZ4Stream_eof;
+       CFH->close_func = LZ4Stream_close;
+       CFH->get_error_func = LZ4Stream_get_error;
 
        CFH->compression_spec = compression_spec;
-       lz4fp = pg_malloc0(sizeof(*lz4fp));
+       state = pg_malloc0(sizeof(*state));
        if (CFH->compression_spec.level >= 0)
-               lz4fp->prefs.compressionLevel = CFH->compression_spec.level;
+               state->prefs.compressionLevel = CFH->compression_spec.level;
 
-       CFH->private_data = lz4fp;
+       CFH->private_data = state;
 }
 #else                                                  /* USE_LZ4 */
 void