EXTRA_INSTALL = contrib/test_decoding
 
+REGRESS = pg_logicalinspect
 ISOLATION = logical_inspect
 
 ISOLATION_OPTS = --temp-config $(top_srcdir)/contrib/pg_logicalinspect/logicalinspect.conf
 
--- /dev/null
+CREATE EXTENSION pg_logicalinspect;
+-- ===================================================================
+-- Tests for input validation
+-- ===================================================================
+SELECT pg_get_logical_snapshot_info('0-40796E18.foo');
+ERROR:  invalid snapshot file name "0-40796E18.foo"
+SELECT pg_get_logical_snapshot_info('0--40796E18.snap');
+ERROR:  invalid snapshot file name "0--40796E18.snap"
+SELECT pg_get_logical_snapshot_info('-1--40796E18.snap');
+ERROR:  invalid snapshot file name "-1--40796E18.snap"
+SELECT pg_get_logical_snapshot_info('0/40796E18.foo.snap');
+ERROR:  invalid snapshot file name "0/40796E18.foo.snap"
+SELECT pg_get_logical_snapshot_info('0/40796E18.snap');
+ERROR:  invalid snapshot file name "0/40796E18.snap"
+SELECT pg_get_logical_snapshot_info('');
+ERROR:  invalid snapshot file name ""
+SELECT pg_get_logical_snapshot_info(NULL);
+ pg_get_logical_snapshot_info 
+------------------------------
+ 
+(1 row)
+
+SELECT pg_get_logical_snapshot_info('../snapshots');
+ERROR:  invalid snapshot file name "../snapshots"
+SELECT pg_get_logical_snapshot_info('../snapshots/0-40796E18.snap');
+ERROR:  invalid snapshot file name "../snapshots/0-40796E18.snap"
+SELECT pg_get_logical_snapshot_meta('0-40796E18.foo');
+ERROR:  invalid snapshot file name "0-40796E18.foo"
+SELECT pg_get_logical_snapshot_meta('0-40796E18.foo.snap');
+ERROR:  invalid snapshot file name "0-40796E18.foo.snap"
+SELECT pg_get_logical_snapshot_meta('0/40796E18.snap');
+ERROR:  invalid snapshot file name "0/40796E18.snap"
+SELECT pg_get_logical_snapshot_meta('');
+ERROR:  invalid snapshot file name ""
+SELECT pg_get_logical_snapshot_meta(NULL);
+ pg_get_logical_snapshot_meta 
+------------------------------
+ 
+(1 row)
+
+SELECT pg_get_logical_snapshot_meta('../snapshots');
+ERROR:  invalid snapshot file name "../snapshots"
+-- ===================================================================
+-- Tests for permissions
+-- ===================================================================
+CREATE ROLE regress_pg_logicalinspect;
+SELECT has_function_privilege('regress_pg_logicalinspect',
+  'pg_get_logical_snapshot_info(text)', 'EXECUTE'); -- no
+ has_function_privilege 
+------------------------
+ f
+(1 row)
+
+SELECT has_function_privilege('regress_pg_logicalinspect',
+  'pg_get_logical_snapshot_meta(text)', 'EXECUTE'); -- no
+ has_function_privilege 
+------------------------
+ f
+(1 row)
+
+-- Functions accessible by users with role pg_read_server_files.
+GRANT pg_read_server_files TO regress_pg_logicalinspect;
+SELECT has_function_privilege('regress_pg_logicalinspect',
+  'pg_get_logical_snapshot_info(text)', 'EXECUTE'); -- yes
+ has_function_privilege 
+------------------------
+ t
+(1 row)
+
+SELECT has_function_privilege('regress_pg_logicalinspect',
+  'pg_get_logical_snapshot_meta(text)', 'EXECUTE'); -- yes
+ has_function_privilege 
+------------------------
+ t
+(1 row)
+
+-- ===================================================================
+-- Clean up
+-- ===================================================================
+DROP ROLE regress_pg_logicalinspect;
+DROP EXTENSION pg_logicalinspect;
 
    return stateDesc;
 }
 
+/*
+ * Extract the LSN from the given serialized snapshot file name.
+ */
+static XLogRecPtr
+parse_snapshot_filename(const char *filename)
+{
+   uint32      hi;
+   uint32      lo;
+   XLogRecPtr  lsn;
+   char        tmpfname[MAXPGPATH];
+
+   /*
+    * Extract the values to build the LSN.
+    *
+    * Note: Including ".snap" doesn't mean that sscanf() also does the format
+    * check including the suffix. The subsequent check validates if the given
+    * filename has the expected suffix.
+    */
+   if (sscanf(filename, "%X-%X.snap", &hi, &lo) != 2)
+       goto parse_error;
+
+   /*
+    * Bring back the extracted LSN to the snapshot file format and compare it
+    * to the given filename. This check strictly checks if the given filename
+    * follows the format of the snapshot filename.
+    */
+   sprintf(tmpfname, "%X-%X.snap", hi, lo);
+   if (strcmp(tmpfname, filename) != 0)
+       goto parse_error;
+
+   lsn = ((uint64) hi) << 32 | lo;
+
+   return lsn;
+
+parse_error:
+   ereport(ERROR,
+           errmsg("invalid snapshot file name \"%s\"", filename));
+}
+
 /*
  * Retrieve the logical snapshot file metadata.
  */
    Datum       values[PG_GET_LOGICAL_SNAPSHOT_META_COLS] = {0};
    bool        nulls[PG_GET_LOGICAL_SNAPSHOT_META_COLS] = {0};
    TupleDesc   tupdesc;
-   char        path[MAXPGPATH];
+   XLogRecPtr  lsn;
    int         i = 0;
    text       *filename_t = PG_GETARG_TEXT_PP(0);
 
    if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
        elog(ERROR, "return type must be a row type");
 
-   sprintf(path, "%s/%s",
-           PG_LOGICAL_SNAPSHOTS_DIR,
-           text_to_cstring(filename_t));
+   lsn = parse_snapshot_filename(text_to_cstring(filename_t));
 
    /* Validate and restore the snapshot to 'ondisk' */
-   SnapBuildRestoreSnapshot(&ondisk, path, CurrentMemoryContext, false);
+   SnapBuildRestoreSnapshot(&ondisk, lsn, CurrentMemoryContext, false);
 
    values[i++] = UInt32GetDatum(ondisk.magic);
    values[i++] = Int64GetDatum((int64) ondisk.checksum);
    Datum       values[PG_GET_LOGICAL_SNAPSHOT_INFO_COLS] = {0};
    bool        nulls[PG_GET_LOGICAL_SNAPSHOT_INFO_COLS] = {0};
    TupleDesc   tupdesc;
-   char        path[MAXPGPATH];
+   XLogRecPtr  lsn;
    int         i = 0;
    text       *filename_t = PG_GETARG_TEXT_PP(0);
 
    if (get_call_result_type(fcinfo, NULL, &tupdesc) != TYPEFUNC_COMPOSITE)
        elog(ERROR, "return type must be a row type");
 
-   sprintf(path, "%s/%s",
-           PG_LOGICAL_SNAPSHOTS_DIR,
-           text_to_cstring(filename_t));
+   lsn = parse_snapshot_filename(text_to_cstring(filename_t));
 
    /* Validate and restore the snapshot to 'ondisk' */
-   SnapBuildRestoreSnapshot(&ondisk, path, CurrentMemoryContext, false);
+   SnapBuildRestoreSnapshot(&ondisk, lsn, CurrentMemoryContext, false);
 
    values[i++] = CStringGetTextDatum(get_snapbuild_state_desc(ondisk.builder.state));
    values[i++] = TransactionIdGetDatum(ondisk.builder.xmin);
 
--- /dev/null
+CREATE EXTENSION pg_logicalinspect;
+
+-- ===================================================================
+-- Tests for input validation
+-- ===================================================================
+
+SELECT pg_get_logical_snapshot_info('0-40796E18.foo');
+SELECT pg_get_logical_snapshot_info('0--40796E18.snap');
+SELECT pg_get_logical_snapshot_info('-1--40796E18.snap');
+SELECT pg_get_logical_snapshot_info('0/40796E18.foo.snap');
+SELECT pg_get_logical_snapshot_info('0/40796E18.snap');
+SELECT pg_get_logical_snapshot_info('');
+SELECT pg_get_logical_snapshot_info(NULL);
+SELECT pg_get_logical_snapshot_info('../snapshots');
+SELECT pg_get_logical_snapshot_info('../snapshots/0-40796E18.snap');
+
+SELECT pg_get_logical_snapshot_meta('0-40796E18.foo');
+SELECT pg_get_logical_snapshot_meta('0-40796E18.foo.snap');
+SELECT pg_get_logical_snapshot_meta('0/40796E18.snap');
+SELECT pg_get_logical_snapshot_meta('');
+SELECT pg_get_logical_snapshot_meta(NULL);
+SELECT pg_get_logical_snapshot_meta('../snapshots');
+
+-- ===================================================================
+-- Tests for permissions
+-- ===================================================================
+CREATE ROLE regress_pg_logicalinspect;
+
+SELECT has_function_privilege('regress_pg_logicalinspect',
+  'pg_get_logical_snapshot_info(text)', 'EXECUTE'); -- no
+SELECT has_function_privilege('regress_pg_logicalinspect',
+  'pg_get_logical_snapshot_meta(text)', 'EXECUTE'); -- no
+
+-- Functions accessible by users with role pg_read_server_files.
+GRANT pg_read_server_files TO regress_pg_logicalinspect;
+
+SELECT has_function_privilege('regress_pg_logicalinspect',
+  'pg_get_logical_snapshot_info(text)', 'EXECUTE'); -- yes
+SELECT has_function_privilege('regress_pg_logicalinspect',
+  'pg_get_logical_snapshot_meta(text)', 'EXECUTE'); -- yes
+
+-- ===================================================================
+-- Clean up
+-- ===================================================================
+
+DROP ROLE regress_pg_logicalinspect;
+
+DROP EXTENSION pg_logicalinspect;
 
  * If 'missing_ok' is true, will not throw an error if the file is not found.
  */
 bool
-SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, const char *path,
+SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
                         MemoryContext context, bool missing_ok)
 {
    int         fd;
    pg_crc32c   checksum;
    Size        sz;
+   char        path[MAXPGPATH];
+
+   sprintf(path, "%s/%X-%X.snap",
+           PG_LOGICAL_SNAPSHOTS_DIR,
+           LSN_FORMAT_ARGS(lsn));
 
    fd = OpenTransientFile(path, O_RDONLY | PG_BINARY);
 
 SnapBuildRestore(SnapBuild *builder, XLogRecPtr lsn)
 {
    SnapBuildOnDisk ondisk;
-   char        path[MAXPGPATH];
 
    /* no point in loading a snapshot if we're already there */
    if (builder->state == SNAPBUILD_CONSISTENT)
        return false;
 
-   sprintf(path, "%s/%X-%X.snap",
-           PG_LOGICAL_SNAPSHOTS_DIR,
-           LSN_FORMAT_ARGS(lsn));
-
    /* validate and restore the snapshot to 'ondisk' */
-   if (!SnapBuildRestoreSnapshot(&ondisk, path, builder->context, true))
+   if (!SnapBuildRestoreSnapshot(&ondisk, lsn, builder->context, true))
        return false;
 
    /*
 
    /* variable amount of TransactionIds follows */
 } SnapBuildOnDisk;
 
-extern bool SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, const char *path,
+extern bool SnapBuildRestoreSnapshot(SnapBuildOnDisk *ondisk, XLogRecPtr lsn,
                                     MemoryContext context, bool missing_ok);
 
 #endif                         /* SNAPBUILD_INTERNAL_H */