pg_dump: Add support for zstd compression
author Tomas Vondra <[email protected]>
Wed, 5 Apr 2023 19:38:04 +0000 (21:38 +0200)
committer Tomas Vondra <[email protected]>
Wed, 5 Apr 2023 19:39:33 +0000 (21:39 +0200)
Allow pg_dump to use zstd compression, in addition to gzip/lz4. The bulk
of the new compression method is implemented in compress_zstd.{c,h},
covering the pg_dump compression APIs. The rest of the patch adds tests
and makes various places aware of the new compression method.

The zstd library (which this patch relies on) supports multithreaded
compression since version 1.5. We however disallow that feature for now,
as it might interfere with parallel backups on platforms that rely on
threads (e.g. Windows). This can be improved / relaxed in the future.

This also fixes a minor issue in InitDiscoverCompressFileHandle(), which
was not updated to check if the file already has the .lz4 extension.

Adding zstd compression was originally proposed in 2020 (see the second
thread), but then was reworked to use the new compression API introduced
in e9960732a9. I've considered both threads when compiling the list of
reviewers.

Author: Justin Pryzby
Reviewed-by: Tomas Vondra, Jacob Champion, Andreas Karlsson
Discussion: https://postgr.es/m/20230224191840[email protected]
Discussion: https://postgr.es/m/20201221194924[email protected]

12 files changed:
doc/src/sgml/ref/pg_dump.sgml
src/bin/pg_dump/Makefile
src/bin/pg_dump/compress_io.c
src/bin/pg_dump/compress_zstd.c [new file with mode: 0644]
src/bin/pg_dump/compress_zstd.h [new file with mode: 0644]
src/bin/pg_dump/meson.build
src/bin/pg_dump/pg_backup_archiver.c
src/bin/pg_dump/pg_backup_directory.c
src/bin/pg_dump/pg_dump.c
src/bin/pg_dump/t/002_pg_dump.pl
src/tools/pginclude/cpluspluscheck
src/tools/pgindent/typedefs.list

index 77299878e02d793295e3caa16a07657a1f4d01d5..8de38e0fd0dec3375e2d7391742c530c0c6d2e58 100644 (file)
@@ -330,8 +330,9 @@ PostgreSQL documentation
            machine-readable format that <application>pg_restore</application>
            can read. A directory format archive can be manipulated with
            standard Unix tools; for example, files in an uncompressed archive
-           can be compressed with the <application>gzip</application> or
-           <application>lz4</application> tools.
+           can be compressed with the <application>gzip</application>,
+           <application>lz4</application>, or
+           <application>zstd</application> tools.
            This format is compressed by default using <literal>gzip</literal>
            and also supports parallel dumps.
           </para>
@@ -655,7 +656,8 @@ PostgreSQL documentation
        <para>
         Specify the compression method and/or the compression level to use.
         The compression method can be set to <literal>gzip</literal>,
-        <literal>lz4</literal>, or <literal>none</literal> for no compression.
+        <literal>lz4</literal>, <literal>zstd</literal>,
+        or <literal>none</literal> for no compression.
         A compression detail string can optionally be specified.  If the
         detail string is an integer, it specifies the compression level.
         Otherwise, it should be a comma-separated list of items, each of the
@@ -676,8 +678,9 @@ PostgreSQL documentation
         individual table-data segments, and the default is to compress using
         <literal>gzip</literal> at a moderate level. For plain text output,
         setting a nonzero compression level causes the entire output file to be compressed,
-        as though it had been fed through <application>gzip</application> or
-        <application>lz4</application>; but the default is not to compress.
+        as though it had been fed through <application>gzip</application>,
+        <application>lz4</application>, or <application>zstd</application>;
+        but the default is not to compress.
        </para>
        <para>
         The tar archive format currently does not support compression at all.
index eb8f59459a13c9d6456b10ec157fbaa5a2b28c72..24de7593a6ab63e9f756b897da82a8a4a888d085 100644 (file)
@@ -18,6 +18,7 @@ include $(top_builddir)/src/Makefile.global
 
 export GZIP_PROGRAM=$(GZIP)
 export LZ4
+export ZSTD
 export with_icu
 
 override CPPFLAGS := -I$(libpq_srcdir) $(CPPFLAGS)
@@ -29,6 +30,7 @@ OBJS = \
    compress_io.o \
    compress_lz4.o \
    compress_none.o \
+   compress_zstd.o \
    dumputils.o \
    parallel.o \
    pg_backup_archiver.o \
index 0972a4f934ad154b4dd62a3e7c0e06ff76e565b8..db19058354df647148351b6cde2b7d58f43821c8 100644 (file)
@@ -52,8 +52,8 @@
  *
  * InitDiscoverCompressFileHandle tries to infer the compression by the
  * filename suffix. If the suffix is not yet known then it tries to simply
- * open the file and if it fails, it tries to open the same file with the .gz
- * suffix, and then again with the .lz4 suffix.
+ * open the file and if it fails, it tries to open the same file with
+ * compressed suffixes (.gz, .lz4 and .zst, in this order).
  *
  * IDENTIFICATION
  *    src/bin/pg_dump/compress_io.c
@@ -69,6 +69,7 @@
 #include "compress_io.h"
 #include "compress_lz4.h"
 #include "compress_none.h"
+#include "compress_zstd.h"
 #include "pg_backup_utils.h"
 
 /*----------------------
@@ -77,7 +78,8 @@
  */
 
 /*
- * Checks whether a compression algorithm is supported.
+ * Checks whether support for a compression algorithm is implemented in
+ * pg_dump/restore.
  *
  * On success returns NULL, otherwise returns a malloc'ed string which can be
  * used by the caller in an error message.
@@ -98,6 +100,10 @@ supports_compression(const pg_compress_specification compression_spec)
    if (algorithm == PG_COMPRESSION_LZ4)
        supported = true;
 #endif
+#ifdef USE_ZSTD
+   if (algorithm == PG_COMPRESSION_ZSTD)
+       supported = true;
+#endif
 
    if (!supported)
        return psprintf("this build does not support compression with %s",
@@ -130,6 +136,8 @@ AllocateCompressor(const pg_compress_specification compression_spec,
        InitCompressorGzip(cs, compression_spec);
    else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
        InitCompressorLZ4(cs, compression_spec);
+   else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+       InitCompressorZstd(cs, compression_spec);
 
    return cs;
 }
@@ -196,20 +204,36 @@ InitCompressFileHandle(const pg_compress_specification compression_spec)
        InitCompressFileHandleGzip(CFH, compression_spec);
    else if (compression_spec.algorithm == PG_COMPRESSION_LZ4)
        InitCompressFileHandleLZ4(CFH, compression_spec);
+   else if (compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+       InitCompressFileHandleZstd(CFH, compression_spec);
 
    return CFH;
 }
 
+/*
+ * Checks if a compressed file (with the specified extension) exists.
+ *
+ * 'path' is the file name without a compression suffix, and 'ext' is the
+ * suffix to try, without the leading dot (e.g. "gz").
+ *
+ * The filename of the tested file ("path.ext") is stored to the fname
+ * buffer: the existing buffer is freed, and a newly allocated one is
+ * returned through the pointer, whether or not the file exists.
+ *
+ * Returns true if access() reports that the file exists.
+ */
+static bool
+check_compressed_file(const char *path, char **fname, const char *ext)
+{
+   free_keep_errno(*fname);
+   *fname = psprintf("%s.%s", path, ext);
+   return (access(*fname, F_OK) == 0);
+}
+
 /*
  * Open a file for reading. 'path' is the file to open, and 'mode' should
  * be either "r" or "rb".
  *
  * If the file at 'path' contains the suffix of a supported compression method,
- * currently this includes ".gz" and ".lz4", then this compression will be used
+ * currently this includes ".gz", ".lz4" and ".zst", then this compression will be used
  * throughout. Otherwise the compression will be inferred by iteratively trying
  * to open the file at 'path', first as is, then by appending known compression
  * suffixes. So if you pass "foo" as 'path', this will open either "foo" or
- * "foo.gz" or "foo.lz4", trying in that order.
+ * "foo.{gz,lz4,zst}", trying in that order.
  *
  * On failure, return NULL with an error code in errno.
  */
@@ -229,36 +253,20 @@ InitDiscoverCompressFileHandle(const char *path, const char *mode)
 
    if (hasSuffix(fname, ".gz"))
        compression_spec.algorithm = PG_COMPRESSION_GZIP;
+   else if (hasSuffix(fname, ".lz4"))
+       compression_spec.algorithm = PG_COMPRESSION_LZ4;
+   else if (hasSuffix(fname, ".zst"))
+       compression_spec.algorithm = PG_COMPRESSION_ZSTD;
    else
    {
-       bool        exists;
-
-       exists = (stat(path, &st) == 0);
-       /* avoid unused warning if it is not built with compression */
-       if (exists)
+       if (stat(path, &st) == 0)
            compression_spec.algorithm = PG_COMPRESSION_NONE;
-#ifdef HAVE_LIBZ
-       if (!exists)
-       {
-           free_keep_errno(fname);
-           fname = psprintf("%s.gz", path);
-           exists = (stat(fname, &st) == 0);
-
-           if (exists)
-               compression_spec.algorithm = PG_COMPRESSION_GZIP;
-       }
-#endif
-#ifdef USE_LZ4
-       if (!exists)
-       {
-           free_keep_errno(fname);
-           fname = psprintf("%s.lz4", path);
-           exists = (stat(fname, &st) == 0);
-
-           if (exists)
-               compression_spec.algorithm = PG_COMPRESSION_LZ4;
-       }
-#endif
+       else if (check_compressed_file(path, &fname, "gz"))
+           compression_spec.algorithm = PG_COMPRESSION_GZIP;
+       else if (check_compressed_file(path, &fname, "lz4"))
+           compression_spec.algorithm = PG_COMPRESSION_LZ4;
+       else if (check_compressed_file(path, &fname, "zst"))
+           compression_spec.algorithm = PG_COMPRESSION_ZSTD;
    }
 
    CFH = InitCompressFileHandle(compression_spec);
diff --git a/src/bin/pg_dump/compress_zstd.c b/src/bin/pg_dump/compress_zstd.c
new file mode 100644 (file)
index 0000000..aa16822
--- /dev/null
@@ -0,0 +1,537 @@
+/*-------------------------------------------------------------------------
+ *
+ * compress_zstd.c
+ *  Routines for archivers to write a Zstd compressed data stream.
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/bin/pg_dump/compress_zstd.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres_fe.h"
+
+#include "pg_backup_utils.h"
+#include "compress_zstd.h"
+
+#ifndef USE_ZSTD
+
+/*
+ * Stub used when USE_ZSTD is not defined: any attempt to set up a zstd
+ * compressor is reported as a fatal error.
+ */
+void
+InitCompressorZstd(CompressorState *cs,
+                  const pg_compress_specification compression_spec)
+{
+   pg_fatal("this build does not support compression with %s", "ZSTD");
+}
+
+/*
+ * Stub used when USE_ZSTD is not defined: any attempt to set up a zstd
+ * compressed file handle is reported as a fatal error.
+ */
+void
+InitCompressFileHandleZstd(CompressFileHandle *CFH,
+                          const pg_compress_specification compression_spec)
+{
+   pg_fatal("this build does not support compression with %s", "ZSTD");
+}
+
+#else
+
+#include <zstd.h>
+
+typedef struct ZstdCompressorState
+{
+   /* This is a normal file to which we read/write compressed data */
+   FILE       *fp;
+
+   /* compression stream (when writing) / decompression stream (when reading) */
+   ZSTD_CStream *cstream;
+   ZSTD_DStream *dstream;
+   /* buffers handed to the zstd streaming calls */
+   ZSTD_outBuffer output;
+   ZSTD_inBuffer input;
+
+   /*
+    * pointer to a static string like from strerror() or ZSTD_getErrorName(),
+    * set by Zstd_write() and Zstd_close(), returned by Zstd_get_error()
+    */
+   const char *zstderror;
+} ZstdCompressorState;
+
+static ZSTD_CStream *_ZstdCStreamParams(pg_compress_specification compress);
+static void EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs);
+static void WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+                                  const void *data, size_t dLen);
+static void ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs);
+
+/*
+ * Set one zstd compression parameter on 'cstream', exiting with a fatal
+ * error if the library rejects it.
+ *
+ * 'paramname' is only used to identify the parameter in the error message,
+ * so it is taken as a pointer to const.
+ */
+static void
+_Zstd_CCtx_setParam_or_die(ZSTD_CStream *cstream,
+                          ZSTD_cParameter param, int value,
+                          const char *paramname)
+{
+   size_t      res;
+
+   res = ZSTD_CCtx_setParameter(cstream, param, value);
+   if (ZSTD_isError(res))
+       pg_fatal("could not set compression parameter: \"%s\": %s",
+                paramname, ZSTD_getErrorName(res));
+}
+
+/*
+ * Create a zstd compression stream configured from 'compress'.
+ *
+ * Only the compression level is applied.  Failing to create the stream or
+ * to set the level is reported with pg_fatal().
+ */
+static ZSTD_CStream *
+_ZstdCStreamParams(pg_compress_specification compress)
+{
+   ZSTD_CStream *stream = ZSTD_createCStream();
+
+   if (stream == NULL)
+       pg_fatal("could not initialize compression library");
+
+   _Zstd_CCtx_setParam_or_die(stream, ZSTD_c_compressionLevel,
+                              compress.level, "level");
+
+   return stream;
+}
+
+/*
+ * Helper function for WriteDataToArchiveZstd and EndCompressorZstd
+ *
+ * Feeds zstdcs->input to the compressor and passes every non-empty chunk of
+ * compressed output to cs->writeF.  With 'flush' set, the frame is ended
+ * (ZSTD_e_end) and the loop keeps running until zstd returns 0.
+ * Compression errors are fatal.
+ */
+static void
+_ZstdWriteCommon(ArchiveHandle *AH, CompressorState *cs, bool flush)
+{
+   ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+   ZSTD_inBuffer *input = &zstdcs->input;
+   ZSTD_outBuffer *output = &zstdcs->output;
+
+   /* Loop while there's any input or until flushed */
+   while (input->pos != input->size || flush)
+   {
+       size_t      res;
+
+       /* reuse the whole output buffer for each call */
+       output->pos = 0;
+       res = ZSTD_compressStream2(zstdcs->cstream, output,
+                                  input, flush ? ZSTD_e_end : ZSTD_e_continue);
+
+       if (ZSTD_isError(res))
+           pg_fatal("could not compress data: %s", ZSTD_getErrorName(res));
+
+       /*
+        * Extra paranoia: avoid zero-length chunks, since a zero length chunk
+        * is the EOF marker in the custom format. This should never happen
+        * but...
+        */
+       if (output->pos > 0)
+           cs->writeF(AH, output->dst, output->pos);
+
+       if (res == 0)
+           break;              /* End of frame or all input consumed */
+   }
+}
+
+/*
+ * Finish zstd (de)compression for the compressor API and release everything
+ * that InitCompressorZstd() allocated.
+ *
+ * When writing, data still buffered by zstd is flushed through cs->writeF
+ * before the compression stream is freed.
+ */
+static void
+EndCompressorZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+   ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+
+   if (cs->readF != NULL)
+   {
+       Assert(zstdcs->cstream == NULL);
+       ZSTD_freeDStream(zstdcs->dstream);
+       /* the input buffer is only allocated by us when reading */
+       pg_free(unconstify(void *, zstdcs->input.src));
+   }
+   else if (cs->writeF != NULL)
+   {
+       Assert(zstdcs->dstream == NULL);
+       _ZstdWriteCommon(AH, cs, true);
+       ZSTD_freeCStream(zstdcs->cstream);
+   }
+
+   /*
+    * The output buffer is allocated in either mode, so free it here rather
+    * than only in the write branch (which leaked it when reading).
+    */
+   pg_free(zstdcs->output.dst);
+   pg_free(zstdcs);
+}
+
+/*
+ * Compress 'dLen' bytes at 'data' and emit the result via cs->writeF.
+ *
+ * The caller's buffer is installed as the zstd input buffer for the
+ * _ZstdWriteCommon() call; it is neither copied nor freed here.
+ */
+static void
+WriteDataToArchiveZstd(ArchiveHandle *AH, CompressorState *cs,
+                      const void *data, size_t dLen)
+{
+   ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+   ZSTD_inBuffer *in = &zstdcs->input;
+
+   in->pos = 0;
+   in->size = dLen;
+   in->src = data;
+
+   _ZstdWriteCommon(AH, cs, false);
+}
+
+/*
+ * Read compressed data through cs->readF until it returns zero bytes,
+ * decompress it, and pass the decompressed data to ahwrite().
+ * Decompression errors are fatal.
+ */
+static void
+ReadDataFromArchiveZstd(ArchiveHandle *AH, CompressorState *cs)
+{
+   ZstdCompressorState *zstdcs = (ZstdCompressorState *) cs->private_data;
+   ZSTD_outBuffer *output = &zstdcs->output;
+   ZSTD_inBuffer *input = &zstdcs->input;
+   size_t      input_allocated_size = ZSTD_DStreamInSize();
+   size_t      res;
+
+   for (;;)
+   {
+       size_t      cnt;
+
+       /*
+        * Read compressed data.  Note that readF can resize the buffer; the
+        * new size is tracked and used for future loops.
+        */
+       input->size = input_allocated_size;
+       cnt = cs->readF(AH, (char **) unconstify(void **, &input->src), &input->size);
+
+       /* ensure that readF didn't *shrink* the buffer */
+       Assert(input->size >= input_allocated_size);
+       input_allocated_size = input->size;
+       input->size = cnt;
+       input->pos = 0;
+
+       if (cnt == 0)
+           break;
+
+       /* Now decompress */
+       while (input->pos < input->size)
+       {
+           output->pos = 0;
+           res = ZSTD_decompressStream(zstdcs->dstream, output, input);
+           if (ZSTD_isError(res))
+               pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+           /*
+            * then write the decompressed data to the output handle,
+            * null-terminated (output->pos never exceeds output->size, and
+            * the buffer was allocated with one byte more than that)
+            */
+           ((char *) output->dst)[output->pos] = '\0';
+           ahwrite(output->dst, 1, output->pos, AH);
+
+           if (res == 0)
+               break;          /* End of frame */
+       }
+   }
+}
+
+/*
+ * Public routine that supports Zstd compressed data I/O
+ *
+ * Installs the zstd callbacks in 'cs' and allocates the private state: a
+ * decompression stream with input and output buffers when cs->readF is set,
+ * or a compression stream with an output buffer when cs->writeF is set.
+ */
+void
+InitCompressorZstd(CompressorState *cs,
+                  const pg_compress_specification compression_spec)
+{
+   ZstdCompressorState *state;
+
+   cs->compression_spec = compression_spec;
+
+   cs->end = EndCompressorZstd;
+   cs->writeData = WriteDataToArchiveZstd;
+   cs->readData = ReadDataFromArchiveZstd;
+
+   state = (ZstdCompressorState *) pg_malloc0(sizeof(*state));
+   cs->private_data = state;
+
+   /* We expect that exactly one of readF/writeF is specified */
+   Assert((cs->readF == NULL) != (cs->writeF == NULL));
+
+   if (cs->readF != NULL)
+   {
+       state->dstream = ZSTD_createDStream();
+       if (state->dstream == NULL)
+           pg_fatal("could not initialize compression library");
+
+       state->input.size = ZSTD_DStreamInSize();
+       state->input.src = pg_malloc(state->input.size);
+
+       /*
+        * output.size is the buffer size we tell zstd it can output to.
+        * One extra byte is allocated beyond that, so that
+        * ReadDataFromArchiveZstd() can null-terminate the data it hands to
+        * ahwrite(), which is an optimized case in ExecuteSqlCommandBuf().
+        */
+       state->output.size = ZSTD_DStreamOutSize();
+       state->output.dst = pg_malloc(state->output.size + 1);
+       return;
+   }
+
+   if (cs->writeF != NULL)
+   {
+       state->cstream = _ZstdCStreamParams(cs->compression_spec);
+
+       state->output.pos = 0;
+       state->output.size = ZSTD_CStreamOutSize();
+       state->output.dst = pg_malloc(state->output.size);
+   }
+}
+
+/*
+ * Compressed stream API
+ */
+
+/*
+ * Read up to 'size' decompressed bytes into 'ptr'.
+ *
+ * Compressed data is read from the file with fread() as needed.  The number
+ * of bytes produced is stored in *rdsize (if not NULL); it is less than
+ * 'size' only when fread() returned no more data.  Always returns true;
+ * decompression errors are fatal.
+ */
+static bool
+Zstd_read(void *ptr, size_t size, size_t *rdsize, CompressFileHandle *CFH)
+{
+   ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+   ZSTD_inBuffer *input = &zstdcs->input;
+   ZSTD_outBuffer *output = &zstdcs->output;
+   size_t      input_allocated_size = ZSTD_DStreamInSize();
+   size_t      res,
+               cnt;
+
+   /* decompress directly into the caller's buffer */
+   output->size = size;
+   output->dst = ptr;
+   output->pos = 0;
+
+   for (;;)
+   {
+       Assert(input->pos <= input->size);
+       Assert(input->size <= input_allocated_size);
+
+       /*
+        * If the input is completely consumed, start back at the beginning
+        */
+       if (input->pos == input->size)
+       {
+           /* input->size is size produced by "fread" */
+           input->size = 0;
+           /* input->pos is position consumed by decompress */
+           input->pos = 0;
+       }
+
+       /* read compressed data if we must produce more input */
+       if (input->pos == input->size)
+       {
+           cnt = fread(unconstify(void *, input->src), 1, input_allocated_size, zstdcs->fp);
+           input->size = cnt;
+
+           Assert(cnt <= input_allocated_size);
+
+           /* If we have no more input to consume, we're done */
+           if (cnt == 0)
+               break;
+       }
+
+       while (input->pos < input->size)
+       {
+           /* now decompress */
+           res = ZSTD_decompressStream(zstdcs->dstream, output, input);
+
+           if (ZSTD_isError(res))
+               pg_fatal("could not decompress data: %s", ZSTD_getErrorName(res));
+
+           if (output->pos == output->size)
+               break;          /* No more room for output */
+
+           if (res == 0)
+               break;          /* End of frame */
+       }
+
+       if (output->pos == output->size)
+           break;              /* We read all the data that fits */
+   }
+
+   if (rdsize != NULL)
+       *rdsize = output->pos;
+
+   return true;
+}
+
+/*
+ * Compress 'size' bytes at 'ptr' and write the compressed output to the
+ * file.
+ *
+ * Data may remain buffered inside zstd afterwards; Zstd_close() ends the
+ * frame and writes out the rest.
+ *
+ * Returns true on success (including for a zero-length write).  On failure
+ * returns false and records a message in zstdcs->zstderror.
+ */
+static bool
+Zstd_write(const void *ptr, size_t size, CompressFileHandle *CFH)
+{
+   ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+   ZSTD_inBuffer *input = &zstdcs->input;
+   ZSTD_outBuffer *output = &zstdcs->output;
+   size_t      res,
+               cnt;
+
+   input->src = ptr;
+   input->size = size;
+   input->pos = 0;
+
+   /* Consume all input, to be flushed later */
+   while (input->pos != input->size)
+   {
+       output->pos = 0;
+       res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_continue);
+       if (ZSTD_isError(res))
+       {
+           zstdcs->zstderror = ZSTD_getErrorName(res);
+           return false;
+       }
+
+       cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
+       if (cnt != output->pos)
+       {
+           zstdcs->zstderror = strerror(errno);
+           return false;
+       }
+   }
+
+   /*
+    * This function returns a success flag, not a byte count: returning
+    * 'size' would report failure for an empty write.
+    */
+   return true;
+}
+
+/*
+ * Read a single decompressed byte and return it as an unsigned char
+ * converted to int.  Failing to obtain a byte is a fatal error.
+ */
+static int
+Zstd_getc(CompressFileHandle *CFH)
+{
+   ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+   unsigned char c;
+   size_t      readsz = 0;
+
+   /*
+    * Read into a single byte rather than an int, so no uninitialized bytes
+    * end up in the result.  read_func only returns a success flag, so the
+    * number of bytes actually produced must be checked to notice that the
+    * input ran out.
+    */
+   if (!CFH->read_func(&c, 1, &readsz, CFH) || readsz != 1)
+   {
+       if (feof(zstdcs->fp))
+           pg_fatal("could not read from input file: end of file");
+       else
+           pg_fatal("could not read from input file: %m");
+   }
+   return c;
+}
+
+/*
+ * Read one line of decompressed data into 'buf', which holds 'len' bytes.
+ *
+ * At most len - 1 bytes are stored, followed by a terminating null byte; a
+ * newline ends the line and is kept.  Returns 'buf', or NULL if not a
+ * single byte could be read.
+ */
+static char *
+Zstd_gets(char *buf, int len, CompressFileHandle *CFH)
+{
+   int         nread = 0;
+
+   Assert(len > 0);
+
+   /*
+    * Read one byte at a time until newline or EOF. This is only used to read
+    * the list of LOs, and the I/O is buffered anyway.
+    */
+   while (nread < len - 1)
+   {
+       size_t      got;
+
+       if (!CFH->read_func(&buf[nread], 1, &got, CFH))
+           break;
+       if (got != 1)
+           break;
+
+       /* count the byte just stored; stop once it was a newline */
+       if (buf[nread++] == '\n')
+           break;
+   }
+
+   buf[nread] = '\0';
+
+   if (nread == 0)
+       return NULL;
+   return buf;
+}
+
+/*
+ * Finish the stream and close the underlying file.
+ *
+ * If a compression stream exists, the zstd frame is ended and the remaining
+ * compressed data is written out first.  Returns false if that step or the
+ * fclose() fails; in those cases the private state struct is not freed here.
+ */
+static bool
+Zstd_close(CompressFileHandle *CFH)
+{
+   ZstdCompressorState *zstdcs = (ZstdCompressorState *) CFH->private_data;
+
+   if (zstdcs->cstream)
+   {
+       size_t      res,
+                   cnt;
+       ZSTD_inBuffer *input = &zstdcs->input;
+       ZSTD_outBuffer *output = &zstdcs->output;
+
+       /* Loop until the compression buffers are fully consumed */
+       for (;;)
+       {
+           output->pos = 0;
+           res = ZSTD_compressStream2(zstdcs->cstream, output, input, ZSTD_e_end);
+           if (ZSTD_isError(res))
+           {
+               zstdcs->zstderror = ZSTD_getErrorName(res);
+               return false;
+           }
+
+           cnt = fwrite(output->dst, 1, output->pos, zstdcs->fp);
+           if (cnt != output->pos)
+           {
+               zstdcs->zstderror = strerror(errno);
+               return false;
+           }
+
+           if (res == 0)
+               break;          /* End of frame */
+       }
+
+       ZSTD_freeCStream(zstdcs->cstream);
+       pg_free(zstdcs->output.dst);
+   }
+
+   if (zstdcs->dstream)
+   {
+       ZSTD_freeDStream(zstdcs->dstream);
+       pg_free(unconstify(void *, zstdcs->input.src));
+   }
+
+   /* zstderror is not set on this path */
+   if (fclose(zstdcs->fp) != 0)
+       return false;
+
+   pg_free(zstdcs);
+   return true;
+}
+
+/*
+ * Report whether the end-of-file indicator is set on the underlying file
+ * (the one holding the compressed data).
+ */
+static bool
+Zstd_eof(CompressFileHandle *CFH)
+{
+   ZstdCompressorState *state = (ZstdCompressorState *) CFH->private_data;
+   FILE       *compressed_fp = state->fp;
+
+   return feof(compressed_fp);
+}
+
+/*
+ * Open a zstd compressed file and set up the private state in 'CFH'.
+ *
+ * If 'fd' is >= 0 it is wrapped with fdopen(), otherwise 'path' is opened
+ * with fopen().  A mode starting with 'r' prepares a decompression stream;
+ * one starting with 'w' or 'a' prepares a compression stream using
+ * CFH->compression_spec.  Any other mode is a fatal error.
+ *
+ * Returns false if the file could not be opened, true otherwise.
+ */
+static bool
+Zstd_open(const char *path, int fd, const char *mode,
+         CompressFileHandle *CFH)
+{
+   FILE       *fp;
+   ZstdCompressorState *zstdcs;
+
+   if (fd >= 0)
+       fp = fdopen(fd, mode);
+   else
+       fp = fopen(path, mode);
+
+   if (fp == NULL)
+       return false;
+
+   zstdcs = (ZstdCompressorState *) pg_malloc0(sizeof(*zstdcs));
+   CFH->private_data = zstdcs;
+   zstdcs->fp = fp;
+
+   if (mode[0] == 'r')
+   {
+       /* input.size and input.pos start at zero, i.e. the buffer is empty */
+       zstdcs->input.src = pg_malloc0(ZSTD_DStreamInSize());
+       zstdcs->dstream = ZSTD_createDStream();
+       if (zstdcs->dstream == NULL)
+           pg_fatal("could not initialize compression library");
+   }
+   else if (mode[0] == 'w' || mode[0] == 'a')
+   {
+       zstdcs->output.size = ZSTD_CStreamOutSize();
+       zstdcs->output.dst = pg_malloc0(zstdcs->output.size);
+
+       /*
+        * _ZstdCStreamParams() itself reports a fatal error if the stream
+        * cannot be created, so no NULL check is needed here.
+        */
+       zstdcs->cstream = _ZstdCStreamParams(CFH->compression_spec);
+   }
+   else
+       pg_fatal("unhandled mode");
+
+   return true;
+}
+
+/*
+ * Open 'path' with the ".zst" suffix appended, using CFH->open_func.
+ *
+ * Returns false with errno set to ENAMETOOLONG if the resulting name does
+ * not fit in MAXPGPATH bytes; otherwise returns whatever open_func returns.
+ */
+static bool
+Zstd_open_write(const char *path, const char *mode, CompressFileHandle *CFH)
+{
+   char        fname[MAXPGPATH];
+   int         len;
+
+   /* bounded formatting: never write past the end of fname */
+   len = snprintf(fname, sizeof(fname), "%s.zst", path);
+   if (len < 0 || (size_t) len >= sizeof(fname))
+   {
+       errno = ENAMETOOLONG;
+       return false;
+   }
+
+   return CFH->open_func(fname, -1, mode, CFH);
+}
+
+/*
+ * Return the error string stored in the private state (the zstderror
+ * field), which is NULL if nothing has been recorded.
+ */
+static const char *
+Zstd_get_error(CompressFileHandle *CFH)
+{
+   const ZstdCompressorState *state;
+
+   state = (const ZstdCompressorState *) CFH->private_data;
+   return state->zstderror;
+}
+
+/*
+ * Set up 'CFH' to use the zstd implementations of the compressed file
+ * callbacks.  No file is opened here; private_data stays NULL until
+ * Zstd_open() fills it in.
+ */
+void
+InitCompressFileHandleZstd(CompressFileHandle *CFH,
+                          const pg_compress_specification compression_spec)
+{
+   CFH->private_data = NULL;
+   CFH->compression_spec = compression_spec;
+
+   /* opening and closing */
+   CFH->open_func = Zstd_open;
+   CFH->open_write_func = Zstd_open_write;
+   CFH->close_func = Zstd_close;
+
+   /* data transfer */
+   CFH->read_func = Zstd_read;
+   CFH->write_func = Zstd_write;
+   CFH->getc_func = Zstd_getc;
+   CFH->gets_func = Zstd_gets;
+
+   /* status reporting */
+   CFH->eof_func = Zstd_eof;
+   CFH->get_error_func = Zstd_get_error;
+}
+
+#endif                         /* USE_ZSTD */
diff --git a/src/bin/pg_dump/compress_zstd.h b/src/bin/pg_dump/compress_zstd.h
new file mode 100644 (file)
index 0000000..2aaa6b1
--- /dev/null
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ *
+ * compress_zstd.h
+ *  Zstd interface to compress_io.c routines
+ *
+ * Portions Copyright (c) 1996-2023, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *    src/bin/pg_dump/compress_zstd.h
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef COMPRESS_ZSTD_H
+#define COMPRESS_ZSTD_H
+
+#include "compress_io.h"
+
+extern void InitCompressorZstd(CompressorState *cs,
+       const pg_compress_specification compression_spec);
+extern void InitCompressFileHandleZstd(CompressFileHandle *CFH,
+       const pg_compress_specification compression_spec);
+
+#endif /* COMPRESS_ZSTD_H */
index b2fb7ac77fd6f50db05ab8c0f8d946d6761a603b..9d59a106f369aef93007651e67acf88a7ae54901 100644 (file)
@@ -5,6 +5,7 @@ pg_dump_common_sources = files(
   'compress_io.c',
   'compress_lz4.c',
   'compress_none.c',
+  'compress_zstd.c',
   'dumputils.c',
   'parallel.c',
   'pg_backup_archiver.c',
@@ -19,7 +20,7 @@ pg_dump_common_sources = files(
 pg_dump_common = static_library('libpgdump_common',
   pg_dump_common_sources,
   c_pch: pch_postgres_fe_h,
-  dependencies: [frontend_code, libpq, lz4, zlib],
+  dependencies: [frontend_code, libpq, lz4, zlib, zstd],
   kwargs: internal_lib_args,
 )
 
@@ -90,6 +91,7 @@ tests += {
     'env': {
       'GZIP_PROGRAM': gzip.path(),
       'LZ4': program_lz4.found() ? program_lz4.path() : '',
+      'ZSTD': program_zstd.found() ? program_zstd.path() : '',
       'with_icu': icu.found() ? 'yes' : 'no',
     },
     'tests': [
index ab77e373e91258829d7abcffb5a03866a922f912..d518349e100b9246e873bf6c6dc43bf118d01cc3 100644 (file)
@@ -2120,7 +2120,7 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 
        /*
         * Check if the specified archive is a directory. If so, check if
-        * there's a "toc.dat" (or "toc.dat.{gz,lz4}") file in it.
+        * there's a "toc.dat" (or "toc.dat.{gz,lz4,zst}") file in it.
         */
        if (stat(AH->fSpec, &st) == 0 && S_ISDIR(st.st_mode))
        {
@@ -2134,6 +2134,10 @@ _discoverArchiveFormat(ArchiveHandle *AH)
 #ifdef USE_LZ4
            if (_fileExistsInDirectory(AH->fSpec, "toc.dat.lz4"))
                return AH->format;
+#endif
+#ifdef USE_ZSTD
+           if (_fileExistsInDirectory(AH->fSpec, "toc.dat.zst"))
+               return AH->format;
 #endif
            pg_fatal("directory \"%s\" does not appear to be a valid archive (\"toc.dat\" does not exist)",
                     AH->fSpec);
index abaaa3b10e3fabbcdafbab7222224af378f66955..2177d5ff425b782136d972eb6796f596f7000028 100644 (file)
@@ -785,6 +785,8 @@ _PrepParallelRestore(ArchiveHandle *AH)
                strlcat(fname, ".gz", sizeof(fname));
            else if (AH->compression_spec.algorithm == PG_COMPRESSION_LZ4)
                strlcat(fname, ".lz4", sizeof(fname));
+           else if (AH->compression_spec.algorithm == PG_COMPRESSION_ZSTD)
+               strlcat(fname, ".zst", sizeof(fname));
 
            if (stat(fname, &st) == 0)
                te->dataLength = st.st_size;
index 6abbcff6834d2e541a96a117ea4116d04167684b..7a504dfe25bc575c8256b9a6036e5ba9ec6cf3e2 100644 (file)
@@ -56,6 +56,7 @@
 #include "catalog/pg_type_d.h"
 #include "common/connect.h"
 #include "common/relpath.h"
+#include "compress_io.h"
 #include "dumputils.h"
 #include "fe_utils/option_utils.h"
 #include "fe_utils/string_utils.h"
@@ -735,18 +736,18 @@ main(int argc, char **argv)
        pg_fatal("invalid compression specification: %s",
                 error_detail);
 
-   switch (compression_algorithm)
-   {
-       case PG_COMPRESSION_NONE:
-           /* fallthrough */
-       case PG_COMPRESSION_GZIP:
-           /* fallthrough */
-       case PG_COMPRESSION_LZ4:
-           break;
-       case PG_COMPRESSION_ZSTD:
-           pg_fatal("compression with %s is not yet supported", "ZSTD");
-           break;
-   }
+   error_detail = supports_compression(compression_spec);
+   if (error_detail != NULL)
+       pg_fatal("%s", error_detail);
+
+   /*
+    * Disable support for zstd workers for now - these are based on threading,
+    * and it's unclear how it interacts with parallel dumps on platforms where
+    * that relies on threads too (e.g. Windows).
+    */
+   if (compression_spec.options & PG_COMPRESSION_OPTION_WORKERS)
+       pg_log_warning("compression option \"%s\" is not currently supported by pg_dump",
+                      "workers");
 
    /*
     * Custom and directory formats are compressed by default with gzip when
index df26ba42d60b91801d4c34196758a7dde38e4e37..b5c97694e32cb1e45f354187d9bd7bccd011f726 100644 (file)
@@ -54,8 +54,9 @@ my $tempdir = PostgreSQL::Test::Utils::tempdir;
 # those lines) to validate that part of the process.
 
 my $supports_icu  = ($ENV{with_icu} eq 'yes');
-my $supports_lz4  = check_pg_config("#define USE_LZ4 1");
 my $supports_gzip = check_pg_config("#define HAVE_LIBZ 1");
+my $supports_lz4  = check_pg_config("#define USE_LZ4 1");
+my $supports_zstd  = check_pg_config("#define USE_ZSTD 1");
 
 my %pgdump_runs = (
    binary_upgrade => {
@@ -213,6 +214,77 @@ my %pgdump_runs = (
        },
    },
 
+   compression_zstd_custom => {
+       test_key       => 'compression',
+       compile_option => 'zstd',
+       dump_cmd       => [
+           'pg_dump',      '--format=custom',
+           '--compress=zstd', "--file=$tempdir/compression_zstd_custom.dump",
+           'postgres',
+       ],
+       restore_cmd => [
+           'pg_restore',
+           "--file=$tempdir/compression_zstd_custom.sql",
+           "$tempdir/compression_zstd_custom.dump",
+       ],
+       command_like => {
+           command => [
+               'pg_restore',
+               '-l', "$tempdir/compression_zstd_custom.dump",
+           ],
+           expected => qr/Compression: zstd/,
+           name => 'data content is zstd compressed'
+       },
+   },
+
+   compression_zstd_dir => {
+       test_key       => 'compression',
+       compile_option => 'zstd',
+       dump_cmd       => [
+           'pg_dump',                              '--jobs=2',
+           '--format=directory',                   '--compress=zstd:1',
+           "--file=$tempdir/compression_zstd_dir", 'postgres',
+       ],
+       # Give coverage for manually compressed blob.toc files during
+       # restore.
+       compress_cmd => {
+           program => $ENV{'ZSTD'},
+           args    => [
+               '-z', '-f', '--rm',
+               "$tempdir/compression_zstd_dir/blobs.toc",
+               "-o", "$tempdir/compression_zstd_dir/blobs.toc.zst",
+           ],
+       },
+       # Verify that data files were compressed
+       glob_patterns => [
+           "$tempdir/compression_zstd_dir/toc.dat",
+           "$tempdir/compression_zstd_dir/*.dat.zst",
+       ],
+       restore_cmd => [
+           'pg_restore', '--jobs=2',
+           "--file=$tempdir/compression_zstd_dir.sql",
+           "$tempdir/compression_zstd_dir",
+       ],
+   },
+
+   compression_zstd_plain => {
+       test_key       => 'compression',
+       compile_option => 'zstd',
+       dump_cmd       => [
+           'pg_dump', '--format=plain', '--compress=zstd',
+           "--file=$tempdir/compression_zstd_plain.sql.zst", 'postgres',
+       ],
+       # Decompress the generated file to run through the tests.
+       compress_cmd => {
+           program => $ENV{'ZSTD'},
+           args    => [
+               '-d', '-f',
+               "$tempdir/compression_zstd_plain.sql.zst",
+               "-o", "$tempdir/compression_zstd_plain.sql",
+           ],
+       },
+   },
+
    clean => {
        dump_cmd => [
            'pg_dump',
@@ -4648,10 +4720,11 @@ foreach my $run (sort keys %pgdump_runs)
    my $test_key = $run;
    my $run_db   = 'postgres';
 
-   # Skip command-level tests for gzip/lz4 if there is no support for it.
+   # Skip command-level tests for gzip/lz4/zstd if the tool is not supported
    if ($pgdump_runs{$run}->{compile_option} &&
        (($pgdump_runs{$run}->{compile_option} eq 'gzip' && !$supports_gzip) ||
-       ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4)))
+       ($pgdump_runs{$run}->{compile_option} eq 'lz4' && !$supports_lz4) ||
+       ($pgdump_runs{$run}->{compile_option} eq 'zstd' && !$supports_zstd)))
    {
        note "$run: skipped due to no $pgdump_runs{$run}->{compile_option} support";
        next;
index b0e9aa99a2cc87b90140dfb193189f5b403d1794..4e09c4686b30b973fd4ca2443d04b5228e904f4b 100755 (executable)
@@ -154,6 +154,7 @@ do
    test "$f" = src/bin/pg_dump/compress_io.h && continue
    test "$f" = src/bin/pg_dump/compress_lz4.h && continue
    test "$f" = src/bin/pg_dump/compress_none.h && continue
+   test "$f" = src/bin/pg_dump/compress_zstd.h && continue
    test "$f" = src/bin/pg_dump/parallel.h && continue
    test "$f" = src/bin/pg_dump/pg_backup_archiver.h && continue
    test "$f" = src/bin/pg_dump/pg_dump.h && continue
index 5c0410869f739308d3c4848d1515328f833328ba..065acb6f50b8f28ee2d50a530916380cd63560c7 100644 (file)
@@ -3937,3 +3937,4 @@ yyscan_t
 z_stream
 z_streamp
 zic_t
+ZSTD_CStream