Support INCLUDE'd columns in SP-GiST.
authorTom Lane <[email protected]>
Mon, 5 Apr 2021 22:41:09 +0000 (18:41 -0400)
committerTom Lane <[email protected]>
Mon, 5 Apr 2021 22:41:21 +0000 (18:41 -0400)
Not much to say here: does what it says on the tin.
We steal a previously-always-zero bit from the nextOffset
field of leaf index tuples in order to track whether there
is a nulls bitmap.  Otherwise it works about like included
columns in other index types.

Pavel Borisov, reviewed by Andrey Borodin and Anastasia Lubennikova,
and rather heavily editorialized on by me

Discussion: https://postgr.es/m/CALT9ZEFi-vMp4faht9f9Junb1nO3NOSjhpxTmbm1UGLMsLqiEQ@mail.gmail.com

21 files changed:
doc/src/sgml/indices.sgml
doc/src/sgml/ref/create_index.sgml
doc/src/sgml/spgist.sgml
doc/src/sgml/xindex.sgml
src/backend/access/common/indextuple.c
src/backend/access/spgist/README
src/backend/access/spgist/spgdoinsert.c
src/backend/access/spgist/spginsert.c
src/backend/access/spgist/spgscan.c
src/backend/access/spgist/spgutils.c
src/backend/access/spgist/spgvacuum.c
src/backend/access/spgist/spgxlog.c
src/include/access/itup.h
src/include/access/spgist_private.h
src/test/modules/spgist_name_ops/expected/spgist_name_ops.out
src/test/modules/spgist_name_ops/sql/spgist_name_ops.sql
src/test/regress/expected/amutils.out
src/test/regress/expected/create_index_spgist.out
src/test/regress/expected/index_including.out
src/test/regress/sql/create_index_spgist.sql
src/test/regress/sql/index_including.sql

index 623962d1d89bd89fc9dbbbb1159c70ddff5060b2..56fbd45178362a2ac43958058d6b43fee1c66b20 100644 (file)
@@ -409,9 +409,11 @@ CREATE INDEX test2_mm_idx ON test2 (major, minor);
   </para>
 
   <para>
-   Currently, only the B-tree, GiST, GIN, and BRIN
-   index types support multicolumn
-   indexes.  Up to 32 columns can be specified.  (This limit can be
+   Currently, only the B-tree, GiST, GIN, and BRIN index types support
+   multiple-key-column indexes.  Whether there can be multiple key
+   columns is independent of whether <literal>INCLUDE</literal> columns
+   can be added to the index.  Indexes can have up to 32 columns,
+   including <literal>INCLUDE</literal> columns.  (This limit can be
    altered when building <productname>PostgreSQL</productname>; see the
    file <filename>pg_config_manual.h</filename>.)
   </para>
@@ -1208,8 +1210,8 @@ CREATE UNIQUE INDEX tab_x_y ON tab(x) INCLUDE (y);
    likely to not need to access the heap.  If the heap tuple must be visited
    anyway, it costs nothing more to get the column's value from there.
    Other restrictions are that expressions are not currently supported as
-   included columns, and that only B-tree and GiST indexes currently support
-   included columns.
+   included columns, and that only B-tree, GiST and SP-GiST indexes currently
+   support included columns.
   </para>
 
   <para>
index 682af4bbe4f1d251c0fd05fa5ac1e996d69ea968..cc484d5b3980aa30e7df0230f12a40b69ebbd584 100644 (file)
@@ -187,8 +187,8 @@ CREATE [ UNIQUE ] INDEX [ CONCURRENTLY ] [ [ IF NOT EXISTS ] <replaceable class=
        </para>
 
        <para>
-        Currently, the B-tree and the GiST index access methods support this
-        feature.  In B-tree and the GiST indexes, the values of columns listed
+        Currently, the B-tree, GiST and SP-GiST index access methods support
+        this feature.  In these indexes, the values of columns listed
         in the <literal>INCLUDE</literal> clause are included in leaf tuples
         which correspond to heap tuples, but are not included in upper-level
         index entries used for tree navigation.
@@ -695,7 +695,10 @@ Indexes:
 
   <para>
    Currently, only the B-tree, GiST, GIN, and BRIN index methods support
-   multicolumn indexes. Up to 32 fields can be specified by default.
+   multiple-key-column indexes.  Whether there can be multiple key
+   columns is independent of whether <literal>INCLUDE</literal> columns
+   can be added to the index.  Indexes can have up to 32 columns,
+   including <literal>INCLUDE</literal> columns.
    (This limit can be altered when building
    <productname>PostgreSQL</productname>.)  Only B-tree currently
    supports unique indexes.
index 054234784fa1440a43b287ff6e890366e95fa4c2..cfb2b3c836f190979cc68503de01b44399ddd812 100644 (file)
   inner tuples that are passed through to reach the leaf level.
  </para>
 
+ <para>
+  When an <acronym>SP-GiST</acronym> index is created with
+  <literal>INCLUDE</literal> columns, the values of those columns are also
+  stored in leaf tuples.  The <literal>INCLUDE</literal> columns are of no
+  concern to the <acronym>SP-GiST</acronym> operator class, so they are
+  not discussed further here.
+ </para>
+
  <para>
   Inner tuples are more complex, since they are branching points in the
   search tree.  Each inner tuple contains a set of one or more
index 609fa35d4cad799d27c9ea300921542d776a047d..0a4fe9a776ab589e2b3861c941a5298b15ecec88 100644 (file)
@@ -1426,11 +1426,15 @@ CREATE OPERATOR CLASS polygon_ops
         STORAGE box;
 </programlisting>
 
-   At present, only the GiST, GIN and BRIN index methods support a
+   At present, only the GiST, SP-GiST, GIN and BRIN index methods support a
    <literal>STORAGE</literal> type that's different from the column data type.
    The GiST <function>compress</function> and <function>decompress</function> support
    routines must deal with data-type conversion when <literal>STORAGE</literal>
-   is used.  In GIN, the <literal>STORAGE</literal> type identifies the type of
+   is used.  SP-GiST likewise requires a <function>compress</function>
+   support function to convert to the storage type, when that is different;
+   if an SP-GiST opclass also supports retrieving data, the reverse
+   conversion must be handled by the <function>consistent</function> function.
+   In GIN, the <literal>STORAGE</literal> type identifies the type of
    the <quote>key</quote> values, which normally is different from the type
    of the indexed column &mdash; for example, an operator class for
    integer-array columns might have keys that are just integers.  The
index ae932691af84fedabd0205e36ef72762f9b8a3d5..a4cb8914cc611887ada0954fc0b1f3d78a363ef4 100644 (file)
@@ -446,22 +446,37 @@ void
 index_deform_tuple(IndexTuple tup, TupleDesc tupleDescriptor,
                   Datum *values, bool *isnull)
 {
-   int         hasnulls = IndexTupleHasNulls(tup);
-   int         natts = tupleDescriptor->natts; /* number of atts to extract */
-   int         attnum;
    char       *tp;             /* ptr to tuple data */
-   int         off;            /* offset in tuple data */
    bits8      *bp;             /* ptr to null bitmap in tuple */
-   bool        slow = false;   /* can we use/set attcacheoff? */
-
-   /* Assert to protect callers who allocate fixed-size arrays */
-   Assert(natts <= INDEX_MAX_KEYS);
 
    /* XXX "knows" t_bits are just after fixed tuple header! */
    bp = (bits8 *) ((char *) tup + sizeof(IndexTupleData));
 
    tp = (char *) tup + IndexInfoFindDataOffset(tup->t_info);
-   off = 0;
+
+   index_deform_tuple_internal(tupleDescriptor, values, isnull,
+                               tp, bp, IndexTupleHasNulls(tup));
+}
+
+/*
+ * Convert an index tuple into Datum/isnull arrays,
+ * without assuming any specific layout of the index tuple header.
+ *
+ * Caller must supply pointer to data area, pointer to nulls bitmap
+ * (which can be NULL if !hasnulls), and hasnulls flag.
+ */
+void
+index_deform_tuple_internal(TupleDesc tupleDescriptor,
+                           Datum *values, bool *isnull,
+                           char *tp, bits8 *bp, int hasnulls)
+{
+   int         natts = tupleDescriptor->natts; /* number of atts to extract */
+   int         attnum;
+   int         off = 0;        /* offset in tuple data */
+   bool        slow = false;   /* can we use/set attcacheoff? */
+
+   /* Assert to protect callers who allocate fixed-size arrays */
+   Assert(natts <= INDEX_MAX_KEYS);
 
    for (attnum = 0; attnum < natts; attnum++)
    {
index b55b07383206401aac9aadc5b51904b666b039ec..7117e02c7703fae6c638c835eac7e68f09c22dd7 100644 (file)
@@ -56,7 +56,7 @@ list and there is no free space on page, then SP-GiST creates a new inner
 tuple and distributes leaf tuples into a set of lists on, perhaps, several
 pages.
 
-Inner tuple consists of:
+An inner tuple consists of:
 
   optional prefix value - all successors must be consistent with it.
     Example:
@@ -67,14 +67,26 @@ Inner tuple consists of:
   list of nodes, where node is a (label, pointer) pair.
     Example of a label: a single character for radix tree
 
-Leaf tuple consists of:
+A leaf tuple consists of:
 
   a leaf value
     Example:
         radix tree - the rest of string (postfix)
         quad and k-d tree - the point itself
 
-  ItemPointer to the heap
+  ItemPointer to the corresponding heap tuple
+  nextOffset number of next leaf tuple in a chain on a leaf page
+
+  optional nulls bitmask
+  optional INCLUDE-column values
+
+For compatibility with pre-v14 indexes, a leaf tuple has a nulls bitmask
+only if there are null values (among the leaf value and the INCLUDE values)
+*and* there is at least one INCLUDE column.  The null-ness of the leaf
+value can be inferred from whether the tuple is on a "nulls page" (see below)
+so it is not necessary to represent it explicitly.  But we include it anyway
+in a bitmask used with INCLUDE values, so that standard tuple deconstruction
+code can be used.
 
 
 NULLS HANDLING
index e14c71abd07ca4f61b7c3b4796dba146084d1040..4d380c99f06f59f046491f769f0816d678282d88 100644 (file)
@@ -220,7 +220,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
        SpGistBlockIsRoot(current->blkno))
    {
        /* Tuple is not part of a chain */
-       leafTuple->nextOffset = InvalidOffsetNumber;
+       SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
        current->offnum = SpGistPageAddNewItem(state, current->page,
                                               (Item) leafTuple, leafTuple->size,
                                               NULL, false);
@@ -253,7 +253,7 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
                                             PageGetItemId(current->page, current->offnum));
        if (head->tupstate == SPGIST_LIVE)
        {
-           leafTuple->nextOffset = head->nextOffset;
+           SGLT_SET_NEXTOFFSET(leafTuple, SGLT_GET_NEXTOFFSET(head));
            offnum = SpGistPageAddNewItem(state, current->page,
                                          (Item) leafTuple, leafTuple->size,
                                          NULL, false);
@@ -264,14 +264,14 @@ addLeafTuple(Relation index, SpGistState *state, SpGistLeafTuple leafTuple,
             */
            head = (SpGistLeafTuple) PageGetItem(current->page,
                                                 PageGetItemId(current->page, current->offnum));
-           head->nextOffset = offnum;
+           SGLT_SET_NEXTOFFSET(head, offnum);
 
            xlrec.offnumLeaf = offnum;
            xlrec.offnumHeadLeaf = current->offnum;
        }
        else if (head->tupstate == SPGIST_DEAD)
        {
-           leafTuple->nextOffset = InvalidOffsetNumber;
+           SGLT_SET_NEXTOFFSET(leafTuple, InvalidOffsetNumber);
            PageIndexTupleDelete(current->page, current->offnum);
            if (PageAddItem(current->page,
                            (Item) leafTuple, leafTuple->size,
@@ -362,13 +362,13 @@ checkSplitConditions(Relation index, SpGistState *state,
        {
            /* We could see a DEAD tuple as first/only chain item */
            Assert(i == current->offnum);
-           Assert(it->nextOffset == InvalidOffsetNumber);
+           Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
            /* Don't count it in result, because it won't go to other page */
        }
        else
            elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
 
-       i = it->nextOffset;
+       i = SGLT_GET_NEXTOFFSET(it);
    }
 
    *nToSplit = n;
@@ -437,7 +437,7 @@ moveLeafs(Relation index, SpGistState *state,
        {
            /* We could see a DEAD tuple as first/only chain item */
            Assert(i == current->offnum);
-           Assert(it->nextOffset == InvalidOffsetNumber);
+           Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
            /* We don't want to move it, so don't count it in size */
            toDelete[nDelete] = i;
            nDelete++;
@@ -446,7 +446,7 @@ moveLeafs(Relation index, SpGistState *state,
        else
            elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
 
-       i = it->nextOffset;
+       i = SGLT_GET_NEXTOFFSET(it);
    }
 
    /* Find a leaf page that will hold them */
@@ -475,7 +475,7 @@ moveLeafs(Relation index, SpGistState *state,
             * don't care).  We're modifying the tuple on the source page
             * here, but it's okay since we're about to delete it.
             */
-           it->nextOffset = r;
+           SGLT_SET_NEXTOFFSET(it, r);
 
            r = SpGistPageAddNewItem(state, npage, (Item) it, it->size,
                                     &startOffset, false);
@@ -490,7 +490,7 @@ moveLeafs(Relation index, SpGistState *state,
    }
 
    /* add the new tuple as well */
-   newLeafTuple->nextOffset = r;
+   SGLT_SET_NEXTOFFSET(newLeafTuple, r);
    r = SpGistPageAddNewItem(state, npage,
                             (Item) newLeafTuple, newLeafTuple->size,
                             &startOffset, false);
@@ -690,14 +690,16 @@ doPickSplit(Relation index, SpGistState *state,
               *nodes;
    Buffer      newInnerBuffer,
                newLeafBuffer;
-   ItemPointerData *heapPtrs;
    uint8      *leafPageSelect;
    int        *leafSizes;
    OffsetNumber *toDelete;
    OffsetNumber *toInsert;
    OffsetNumber redirectTuplePos = InvalidOffsetNumber;
    OffsetNumber startOffsets[2];
+   SpGistLeafTuple *oldLeafs;
    SpGistLeafTuple *newLeafs;
+   Datum       leafDatums[INDEX_MAX_KEYS];
+   bool        leafIsnulls[INDEX_MAX_KEYS];
    int         spaceToDelete;
    int         currentFreeSpace;
    int         totalLeafSizes;
@@ -718,9 +720,9 @@ doPickSplit(Relation index, SpGistState *state,
    max = PageGetMaxOffsetNumber(current->page);
    n = max + 1;
    in.datums = (Datum *) palloc(sizeof(Datum) * n);
-   heapPtrs = (ItemPointerData *) palloc(sizeof(ItemPointerData) * n);
    toDelete = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
    toInsert = (OffsetNumber *) palloc(sizeof(OffsetNumber) * n);
+   oldLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
    newLeafs = (SpGistLeafTuple *) palloc(sizeof(SpGistLeafTuple) * n);
    leafPageSelect = (uint8 *) palloc(sizeof(uint8) * n);
 
@@ -752,7 +754,7 @@ doPickSplit(Relation index, SpGistState *state,
            {
                in.datums[nToInsert] =
                    isNulls ? (Datum) 0 : SGLTDATUM(it, state);
-               heapPtrs[nToInsert] = it->heapPtr;
+               oldLeafs[nToInsert] = it;
                nToInsert++;
                toDelete[nToDelete] = i;
                nToDelete++;
@@ -778,7 +780,7 @@ doPickSplit(Relation index, SpGistState *state,
            {
                in.datums[nToInsert] =
                    isNulls ? (Datum) 0 : SGLTDATUM(it, state);
-               heapPtrs[nToInsert] = it->heapPtr;
+               oldLeafs[nToInsert] = it;
                nToInsert++;
                toDelete[nToDelete] = i;
                nToDelete++;
@@ -790,7 +792,7 @@ doPickSplit(Relation index, SpGistState *state,
            {
                /* We could see a DEAD tuple as first/only chain item */
                Assert(i == current->offnum);
-               Assert(it->nextOffset == InvalidOffsetNumber);
+               Assert(SGLT_GET_NEXTOFFSET(it) == InvalidOffsetNumber);
                toDelete[nToDelete] = i;
                nToDelete++;
                /* replacing it with redirect will save no space */
@@ -798,7 +800,7 @@ doPickSplit(Relation index, SpGistState *state,
            else
                elog(ERROR, "unexpected SPGiST tuple state: %d", it->tupstate);
 
-           i = it->nextOffset;
+           i = SGLT_GET_NEXTOFFSET(it);
        }
    }
    in.nTuples = nToInsert;
@@ -811,7 +813,7 @@ doPickSplit(Relation index, SpGistState *state,
     */
    in.datums[in.nTuples] =
        isNulls ? (Datum) 0 : SGLTDATUM(newLeafTuple, state);
-   heapPtrs[in.nTuples] = newLeafTuple->heapPtr;
+   oldLeafs[in.nTuples] = newLeafTuple;
    in.nTuples++;
 
    memset(&out, 0, sizeof(out));
@@ -833,9 +835,19 @@ doPickSplit(Relation index, SpGistState *state,
        totalLeafSizes = 0;
        for (i = 0; i < in.nTuples; i++)
        {
-           newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
-                                          out.leafTupleDatums[i],
-                                          false);
+           if (state->leafTupDesc->natts > 1)
+               spgDeformLeafTuple(oldLeafs[i],
+                                  state->leafTupDesc,
+                                  leafDatums,
+                                  leafIsnulls,
+                                  isNulls);
+
+           leafDatums[spgKeyColumn] = out.leafTupleDatums[i];
+           leafIsnulls[spgKeyColumn] = false;
+
+           newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
+                                          leafDatums,
+                                          leafIsnulls);
            totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
        }
    }
@@ -856,9 +868,22 @@ doPickSplit(Relation index, SpGistState *state,
        totalLeafSizes = 0;
        for (i = 0; i < in.nTuples; i++)
        {
-           newLeafs[i] = spgFormLeafTuple(state, heapPtrs + i,
-                                          (Datum) 0,
-                                          true);
+           if (state->leafTupDesc->natts > 1)
+               spgDeformLeafTuple(oldLeafs[i],
+                                  state->leafTupDesc,
+                                  leafDatums,
+                                  leafIsnulls,
+                                  isNulls);
+
+           /*
+            * Nulls tree can contain only null key values.
+            */
+           leafDatums[spgKeyColumn] = (Datum) 0;
+           leafIsnulls[spgKeyColumn] = true;
+
+           newLeafs[i] = spgFormLeafTuple(state, &oldLeafs[i]->heapPtr,
+                                          leafDatums,
+                                          leafIsnulls);
            totalLeafSizes += newLeafs[i]->size + sizeof(ItemIdData);
        }
    }
@@ -1192,10 +1217,10 @@ doPickSplit(Relation index, SpGistState *state,
        if (ItemPointerIsValid(&nodes[n]->t_tid))
        {
            Assert(ItemPointerGetBlockNumber(&nodes[n]->t_tid) == leafBlock);
-           it->nextOffset = ItemPointerGetOffsetNumber(&nodes[n]->t_tid);
+           SGLT_SET_NEXTOFFSET(it, ItemPointerGetOffsetNumber(&nodes[n]->t_tid));
        }
        else
-           it->nextOffset = InvalidOffsetNumber;
+           SGLT_SET_NEXTOFFSET(it, InvalidOffsetNumber);
 
        /* Insert it on page */
        newoffset = SpGistPageAddNewItem(state, BufferGetPage(leafBuffer),
@@ -1885,10 +1910,12 @@ spgSplitNodeAction(Relation index, SpGistState *state,
  */
 bool
 spgdoinsert(Relation index, SpGistState *state,
-           ItemPointer heapPtr, Datum datum, bool isnull)
+           ItemPointer heapPtr, Datum *datums, bool *isnulls)
 {
+   TupleDesc   leafDescriptor = state->leafTupDesc;
+   bool        isnull = isnulls[spgKeyColumn];
    int         level = 0;
-   Datum       leafDatum;
+   Datum       leafDatums[INDEX_MAX_KEYS];
    int         leafSize;
    SPPageDesc  current,
                parent;
@@ -1905,8 +1932,8 @@ spgdoinsert(Relation index, SpGistState *state,
     * Prepare the leaf datum to insert.
     *
     * If an optional "compress" method is provided, then call it to form the
-    * leaf datum from the input datum.  Otherwise store the input datum as
-    * is.  Since we don't use index_form_tuple in this AM, we have to make
+    * leaf key datum from the input datum.  Otherwise, store the input datum
+    * as is.  Since we don't use index_form_tuple in this AM, we have to make
     * sure value to be inserted is not toasted; FormIndexDatum doesn't
     * guarantee that.  But we assume the "compress" method to return an
     * untoasted value.
@@ -1918,32 +1945,43 @@ spgdoinsert(Relation index, SpGistState *state,
            FmgrInfo   *compressProcinfo = NULL;
 
            compressProcinfo = index_getprocinfo(index, 1, SPGIST_COMPRESS_PROC);
-           leafDatum = FunctionCall1Coll(compressProcinfo,
-                                         index->rd_indcollation[0],
-                                         datum);
+           leafDatums[spgKeyColumn] =
+               FunctionCall1Coll(compressProcinfo,
+                                 index->rd_indcollation[spgKeyColumn],
+                                 datums[spgKeyColumn]);
        }
        else
        {
            Assert(state->attLeafType.type == state->attType.type);
 
            if (state->attType.attlen == -1)
-               leafDatum = PointerGetDatum(PG_DETOAST_DATUM(datum));
+               leafDatums[spgKeyColumn] =
+                   PointerGetDatum(PG_DETOAST_DATUM(datums[spgKeyColumn]));
            else
-               leafDatum = datum;
+               leafDatums[spgKeyColumn] = datums[spgKeyColumn];
        }
    }
    else
-       leafDatum = (Datum) 0;
+       leafDatums[spgKeyColumn] = (Datum) 0;
+
+   /* Likewise, ensure that any INCLUDE values are not toasted */
+   for (int i = spgFirstIncludeColumn; i < leafDescriptor->natts; i++)
+   {
+       if (!isnulls[i])
+       {
+           if (TupleDescAttr(leafDescriptor, i)->attlen == -1)
+               leafDatums[i] = PointerGetDatum(PG_DETOAST_DATUM(datums[i]));
+           else
+               leafDatums[i] = datums[i];
+       }
+       else
+           leafDatums[i] = (Datum) 0;
+   }
 
    /*
-    * Compute space needed for a leaf tuple containing the given datum. This
-    * must match spgFormLeafTuple.
+    * Compute space needed for a leaf tuple containing the given data.
     */
-   leafSize = SGLTHDRSZ;
-   if (!isnull)
-       leafSize += SpGistGetLeafTypeSize(&state->attLeafType, leafDatum);
-   if (leafSize < SGDTSIZE)
-       leafSize = SGDTSIZE;
+   leafSize = SpGistGetLeafTupleSize(leafDescriptor, leafDatums, isnulls);
    /* Account for an item pointer, too */
    leafSize += sizeof(ItemIdData);
 
@@ -2048,7 +2086,7 @@ spgdoinsert(Relation index, SpGistState *state,
            int         nToSplit,
                        sizeToSplit;
 
-           leafTuple = spgFormLeafTuple(state, heapPtr, leafDatum, isnull);
+           leafTuple = spgFormLeafTuple(state, heapPtr, leafDatums, isnulls);
            if (leafTuple->size + sizeof(ItemIdData) <=
                SpGistPageGetFreeSpace(current.page, 1))
            {
@@ -2110,8 +2148,8 @@ spgdoinsert(Relation index, SpGistState *state,
            innerTuple = (SpGistInnerTuple) PageGetItem(current.page,
                                                        PageGetItemId(current.page, current.offnum));
 
-           in.datum = datum;
-           in.leafDatum = leafDatum;
+           in.datum = datums[spgKeyColumn];
+           in.leafDatum = leafDatums[spgKeyColumn];
            in.level = level;
            in.allTheSame = innerTuple->allTheSame;
            in.hasPrefix = (innerTuple->prefixSize > 0);
@@ -2160,9 +2198,9 @@ spgdoinsert(Relation index, SpGistState *state,
                    /* Replace leafDatum and recompute leafSize */
                    if (!isnull)
                    {
-                       leafDatum = out.result.matchNode.restDatum;
-                       leafSize = SGLTHDRSZ +
-                           SpGistGetLeafTypeSize(&state->attLeafType, leafDatum);
+                       leafDatums[spgKeyColumn] = out.result.matchNode.restDatum;
+                       leafSize = SpGistGetLeafTupleSize(leafDescriptor,
+                                                         leafDatums, isnulls);
                        leafSize += sizeof(ItemIdData);
                    }
 
index 0ca621450e647e01d157785f87941e85392e3622..1af0af7da21f3491ce5a9884a378b13504ce9299 100644 (file)
@@ -56,7 +56,7 @@ spgistBuildCallback(Relation index, ItemPointer tid, Datum *values,
     * any temp data when retrying.
     */
    while (!spgdoinsert(index, &buildstate->spgstate, tid,
-                       *values, *isnull))
+                       values, isnull))
    {
        MemoryContextReset(buildstate->tmpCtx);
    }
@@ -227,7 +227,7 @@ spginsert(Relation index, Datum *values, bool *isnull,
     * to avoid cumulative memory consumption.  That means we also have to
     * redo initSpGistState(), but it's cheap enough not to matter.
     */
-   while (!spgdoinsert(index, &spgstate, ht_ctid, *values, *isnull))
+   while (!spgdoinsert(index, &spgstate, ht_ctid, values, isnull))
    {
        MemoryContextReset(insertCtx);
        initSpGistState(&spgstate, index);
index 24a13d89df8e6753f7ef8fb14c8bf893accf4421..e14b9fa573af42b73daac9d3bc9e0d895e0034bf 100644 (file)
@@ -27,7 +27,8 @@
 #include "utils/rel.h"
 
 typedef void (*storeRes_func) (SpGistScanOpaque so, ItemPointer heapPtr,
-                              Datum leafValue, bool isNull, bool recheck,
+                              Datum leafValue, bool isNull,
+                              SpGistLeafTuple leafTuple, bool recheck,
                               bool recheckDistances, double *distances);
 
 /*
@@ -88,6 +89,9 @@ spgFreeSearchItem(SpGistScanOpaque so, SpGistSearchItem *item)
        DatumGetPointer(item->value) != NULL)
        pfree(DatumGetPointer(item->value));
 
+   if (item->leafTuple)
+       pfree(item->leafTuple);
+
    if (item->traversalValue)
        pfree(item->traversalValue);
 
@@ -133,6 +137,7 @@ spgAddStartItem(SpGistScanOpaque so, bool isnull)
    startEntry->isLeaf = false;
    startEntry->level = 0;
    startEntry->value = (Datum) 0;
+   startEntry->leafTuple = NULL;
    startEntry->traversalValue = NULL;
    startEntry->recheck = false;
    startEntry->recheckDistances = false;
@@ -299,7 +304,6 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
 {
    IndexScanDesc scan;
    SpGistScanOpaque so;
-   TupleDesc   outTupDesc;
    int         i;
 
    scan = RelationGetIndexScan(rel, keysz, orderbysz);
@@ -319,20 +323,13 @@ spgbeginscan(Relation rel, int keysz, int orderbysz)
                                             ALLOCSET_DEFAULT_SIZES);
 
    /*
-    * Set up indexTupDesc and xs_hitupdesc in case it's an index-only scan.
+    * Set up reconTupDesc and xs_hitupdesc in case it's an index-only scan,
+    * making sure that the key column is shown as being of type attType.
     * (It's rather annoying to do this work when it might be wasted, but for
     * most opclasses we can re-use the index reldesc instead of making one.)
     */
-   if (so->state.attType.type ==
-       TupleDescAttr(RelationGetDescr(rel), 0)->atttypid)
-       outTupDesc = RelationGetDescr(rel);
-   else
-   {
-       outTupDesc = CreateTemplateTupleDesc(1);
-       TupleDescInitEntry(outTupDesc, 1, NULL,
-                          so->state.attType.type, -1, 0);
-   }
-   so->indexTupDesc = scan->xs_hitupdesc = outTupDesc;
+   so->reconTupDesc = scan->xs_hitupdesc =
+       getSpGistTupleDesc(rel, &so->state.attType);
 
    /* Allocate various arrays needed for order-by scans */
    if (scan->numberOfOrderBys > 0)
@@ -435,6 +432,10 @@ spgendscan(IndexScanDesc scan)
    if (so->keyData)
        pfree(so->keyData);
 
+   if (so->state.leafTupDesc &&
+       so->state.leafTupDesc != RelationGetDescr(so->state.index))
+       FreeTupleDesc(so->state.leafTupDesc);
+
    if (so->state.deadTupleStorage)
        pfree(so->state.deadTupleStorage);
 
@@ -455,14 +456,14 @@ spgendscan(IndexScanDesc scan)
  * Leaf SpGistSearchItem constructor, called in queue context
  */
 static SpGistSearchItem *
-spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
+spgNewHeapItem(SpGistScanOpaque so, int level, SpGistLeafTuple leafTuple,
               Datum leafValue, bool recheck, bool recheckDistances,
               bool isnull, double *distances)
 {
    SpGistSearchItem *item = spgAllocSearchItem(so, isnull, distances);
 
    item->level = level;
-   item->heapPtr = *heapPtr;
+   item->heapPtr = leafTuple->heapPtr;
 
    /*
     * If we need the reconstructed value, copy it to queue cxt out of tmp
@@ -471,11 +472,28 @@ spgNewHeapItem(SpGistScanOpaque so, int level, ItemPointer heapPtr,
     * the wrong type.  The correct leafValue type is attType not leafType.
     */
    if (so->want_itup)
+   {
        item->value = isnull ? (Datum) 0 :
            datumCopy(leafValue, so->state.attType.attbyval,
                      so->state.attType.attlen);
+
+       /*
+        * If we're going to need to reconstruct INCLUDE attributes, store the
+        * whole leaf tuple so we can get the INCLUDE attributes out of it.
+        */
+       if (so->state.leafTupDesc->natts > 1)
+       {
+           item->leafTuple = palloc(leafTuple->size);
+           memcpy(item->leafTuple, leafTuple, leafTuple->size);
+       }
+       else
+           item->leafTuple = NULL;
+   }
    else
+   {
        item->value = (Datum) 0;
+       item->leafTuple = NULL;
+   }
    item->traversalValue = NULL;
    item->isLeaf = true;
    item->recheck = recheck;
@@ -555,7 +573,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
            /* the scan is ordered -> add the item to the queue */
            MemoryContext oldCxt = MemoryContextSwitchTo(so->traversalCxt);
            SpGistSearchItem *heapItem = spgNewHeapItem(so, item->level,
-                                                       &leafTuple->heapPtr,
+                                                       leafTuple,
                                                        leafValue,
                                                        recheck,
                                                        recheckDistances,
@@ -571,7 +589,7 @@ spgLeafTest(SpGistScanOpaque so, SpGistSearchItem *item,
            /* non-ordered scan, so report the item right away */
            Assert(!recheckDistances);
            storeRes(so, &leafTuple->heapPtr, leafValue, isnull,
-                    recheck, false, NULL);
+                    leafTuple, recheck, false, NULL);
            *reportedSome = true;
        }
    }
@@ -624,6 +642,8 @@ spgMakeInnerItem(SpGistScanOpaque so,
                    so->state.attLeafType.attlen)
        : (Datum) 0;
 
+   item->leafTuple = NULL;
+
    /*
     * Elements of out.traversalValues should be allocated in
     * in.traversalMemoryContext, which is actually a long lived context of
@@ -765,7 +785,7 @@ spgTestLeafTuple(SpGistScanOpaque so,
                /* dead tuple should be first in chain */
                Assert(offset == ItemPointerGetOffsetNumber(&item->heapPtr));
                /* No live entries on this page */
-               Assert(leafTuple->nextOffset == InvalidOffsetNumber);
+               Assert(SGLT_GET_NEXTOFFSET(leafTuple) == InvalidOffsetNumber);
                return SpGistBreakOffsetNumber;
            }
        }
@@ -779,7 +799,7 @@ spgTestLeafTuple(SpGistScanOpaque so,
 
    spgLeafTest(so, item, leafTuple, isnull, reportedSome, storeRes);
 
-   return leafTuple->nextOffset;
+   return SGLT_GET_NEXTOFFSET(leafTuple);
 }
 
 /*
@@ -812,7 +832,8 @@ redirect:
            /* We store heap items in the queue only in case of ordered search */
            Assert(so->numberOfNonNullOrderBys > 0);
            storeRes(so, &item->heapPtr, item->value, item->isNull,
-                    item->recheck, item->recheckDistances, item->distances);
+                    item->leafTuple, item->recheck,
+                    item->recheckDistances, item->distances);
            reportedSome = true;
        }
        else
@@ -905,8 +926,9 @@ redirect:
 /* storeRes subroutine for getbitmap case */
 static void
 storeBitmap(SpGistScanOpaque so, ItemPointer heapPtr,
-           Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
-           double *distances)
+           Datum leafValue, bool isnull,
+           SpGistLeafTuple leafTuple, bool recheck,
+           bool recheckDistances, double *distances)
 {
    Assert(!recheckDistances && !distances);
    tbm_add_tuples(so->tbm, heapPtr, 1, recheck);
@@ -932,8 +954,9 @@ spggetbitmap(IndexScanDesc scan, TIDBitmap *tbm)
 /* storeRes subroutine for gettuple case */
 static void
 storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
-             Datum leafValue, bool isnull, bool recheck, bool recheckDistances,
-             double *nonNullDistances)
+             Datum leafValue, bool isnull,
+             SpGistLeafTuple leafTuple, bool recheck,
+             bool recheckDistances, double *nonNullDistances)
 {
    Assert(so->nPtrs < MaxIndexTuplesPerPage);
    so->heapPtrs[so->nPtrs] = *heapPtr;
@@ -978,9 +1001,20 @@ storeGettuple(SpGistScanOpaque so, ItemPointer heapPtr,
         * Reconstruct index data.  We have to copy the datum out of the temp
         * context anyway, so we may as well create the tuple here.
         */
-       so->reconTups[so->nPtrs] = heap_form_tuple(so->indexTupDesc,
-                                                  &leafValue,
-                                                  &isnull);
+       Datum       leafDatums[INDEX_MAX_KEYS];
+       bool        leafIsnulls[INDEX_MAX_KEYS];
+
+       /* We only need to deform the old tuple if it has INCLUDE attributes */
+       if (so->state.leafTupDesc->natts > 1)
+           spgDeformLeafTuple(leafTuple, so->state.leafTupDesc,
+                              leafDatums, leafIsnulls, isnull);
+
+       leafDatums[spgKeyColumn] = leafValue;
+       leafIsnulls[spgKeyColumn] = isnull;
+
+       so->reconTups[so->nPtrs] = heap_form_tuple(so->reconTupDesc,
+                                                  leafDatums,
+                                                  leafIsnulls);
    }
    so->nPtrs++;
 }
@@ -1048,6 +1082,10 @@ spgcanreturn(Relation index, int attno)
 {
    SpGistCache *cache;
 
+   /* INCLUDE attributes can always be fetched for index-only scans */
+   if (attno > 1)
+       return true;
+
    /* We can do it if the opclass config function says so */
    cache = spgGetCache(index);
 
index f2ba72b5aa39187831e9adaa77f23fdc8d9f2f19..72cbde7e0b6c5dcf705bbb2afd26a7eefcbda443 100644 (file)
@@ -19,6 +19,7 @@
 #include "access/htup_details.h"
 #include "access/reloptions.h"
 #include "access/spgist_private.h"
+#include "access/toast_compression.h"
 #include "access/transam.h"
 #include "access/xact.h"
 #include "catalog/pg_amop.h"
@@ -58,7 +59,7 @@ spghandler(PG_FUNCTION_ARGS)
    amroutine->amclusterable = false;
    amroutine->ampredlocks = false;
    amroutine->amcanparallel = false;
-   amroutine->amcaninclude = false;
+   amroutine->amcaninclude = true;
    amroutine->amusemaintenanceworkmem = false;
    amroutine->amparallelvacuumoptions =
        VACUUM_OPTION_PARALLEL_BULKDEL | VACUUM_OPTION_PARALLEL_COND_CLEANUP;
@@ -154,8 +155,19 @@ GetIndexInputType(Relation index, AttrNumber indexcol)
 static void
 fillTypeDesc(SpGistTypeDesc *desc, Oid type)
 {
+   HeapTuple   tp;
+   Form_pg_type typtup;
+
    desc->type = type;
-   get_typlenbyval(type, &desc->attlen, &desc->attbyval);
+   tp = SearchSysCache1(TYPEOID, ObjectIdGetDatum(type));
+   if (!HeapTupleIsValid(tp))
+       elog(ERROR, "cache lookup failed for type %u", type);
+   typtup = (Form_pg_type) GETSTRUCT(tp);
+   desc->attlen = typtup->typlen;
+   desc->attbyval = typtup->typbyval;
+   desc->attstorage = typtup->typstorage;
+   desc->attalign = typtup->typalign;
+   ReleaseSysCache(tp);
 }
 
 /*
@@ -178,22 +190,23 @@ spgGetCache(Relation index)
        cache = MemoryContextAllocZero(index->rd_indexcxt,
                                       sizeof(SpGistCache));
 
-       /* SPGiST doesn't support multi-column indexes */
-       Assert(index->rd_att->natts == 1);
+       /* SPGiST must have one key column and can also have INCLUDE columns */
+       Assert(IndexRelationGetNumberOfKeyAttributes(index) == 1);
+       Assert(IndexRelationGetNumberOfAttributes(index) <= INDEX_MAX_KEYS);
 
        /*
-        * Get the actual (well, nominal) data type of the column being
-        * indexed.  We pass this to the opclass config function so that
-        * polymorphic opclasses are possible.
+        * Get the actual (well, nominal) data type of the key column.  We
+        * pass this to the opclass config function so that polymorphic
+        * opclasses are possible.
         */
-       atttype = GetIndexInputType(index, 1);
+       atttype = GetIndexInputType(index, spgKeyColumn + 1);
 
        /* Call the config function to get config info for the opclass */
        in.attType = atttype;
 
        procinfo = index_getprocinfo(index, 1, SPGIST_CONFIG_PROC);
        FunctionCall2Coll(procinfo,
-                         index->rd_indcollation[0],
+                         index->rd_indcollation[spgKeyColumn],
                          PointerGetDatum(&in),
                          PointerGetDatum(&cache->config));
 
@@ -206,7 +219,7 @@ spgGetCache(Relation index)
         */
        if (!OidIsValid(cache->config.leafType))
            cache->config.leafType =
-               TupleDescAttr(RelationGetDescr(index), 0)->atttypid;
+               TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid;
 
        /* Get the information we need about each relevant datatype */
        fillTypeDesc(&cache->attType, atttype);
@@ -254,12 +267,60 @@ spgGetCache(Relation index)
    return cache;
 }
 
+/*
+ * Compute a tuple descriptor for leaf tuples or index-only-scan result tuples.
+ *
+ * We can use the relcache's tupdesc as-is in many cases, and it's always
+ * OK so far as any INCLUDE columns are concerned.  However, the entry for
+ * the key column has to match leafType in the first case or attType in the
+ * second case.  While the relcache's tupdesc *should* show leafType, this
+ * might not hold for legacy user-defined opclasses, since before v14 they
+ * were not allowed to declare their true storage type in CREATE OPCLASS.
+ * Also, attType can be different from what is in the relcache.
+ *
+ * This function gives back either a pointer to the relcache's tupdesc
+ * if that is suitable, or a palloc'd copy that's been adjusted to match
+ * the specified key column type.  We can avoid doing any catalog lookups
+ * here by insisting that the caller pass an SpGistTypeDesc not just an OID.
+ */
+TupleDesc
+getSpGistTupleDesc(Relation index, SpGistTypeDesc *keyType)
+{
+   TupleDesc   outTupDesc;
+   Form_pg_attribute att;
+
+   if (keyType->type ==
+       TupleDescAttr(RelationGetDescr(index), spgKeyColumn)->atttypid)
+       outTupDesc = RelationGetDescr(index);
+   else
+   {
+       outTupDesc = CreateTupleDescCopy(RelationGetDescr(index));
+       att = TupleDescAttr(outTupDesc, spgKeyColumn);
+       /* It's sufficient to update the type-dependent fields of the column */
+       att->atttypid = keyType->type;
+       att->atttypmod = -1;
+       att->attlen = keyType->attlen;
+       att->attbyval = keyType->attbyval;
+       att->attalign = keyType->attalign;
+       att->attstorage = keyType->attstorage;
+       /* We shouldn't need to bother with making these valid: */
+       att->attcollation = InvalidOid;
+       att->attcompression = InvalidCompressionMethod;
+       /* In case we changed typlen, we'd better reset following offsets */
+       for (int i = spgFirstIncludeColumn; i < outTupDesc->natts; i++)
+           TupleDescAttr(outTupDesc, i)->attcacheoff = -1;
+   }
+   return outTupDesc;
+}
+
 /* Initialize SpGistState for working with the given index */
 void
 initSpGistState(SpGistState *state, Relation index)
 {
    SpGistCache *cache;
 
+   state->index = index;
+
    /* Get cached static information about index */
    cache = spgGetCache(index);
 
@@ -269,6 +330,9 @@ initSpGistState(SpGistState *state, Relation index)
    state->attPrefixType = cache->attPrefixType;
    state->attLabelType = cache->attLabelType;
 
+   /* Ensure we have a valid descriptor for leaf tuples */
+   state->leafTupDesc = getSpGistTupleDesc(state->index, &state->attLeafType);
+
    /* Make workspace for constructing dead tuples */
    state->deadTupleStorage = palloc0(SGDTSIZE);
 
@@ -696,24 +760,6 @@ SpGistGetInnerTypeSize(SpGistTypeDesc *att, Datum datum)
    return MAXALIGN(size);
 }
 
-/*
- * Get the space needed to store a non-null datum of the indicated type
- * in a leaf tuple.  This is just the usual storage space for the type,
- * but rounded up to a MAXALIGN boundary.
- */
-unsigned int
-SpGistGetLeafTypeSize(SpGistTypeDesc *att, Datum datum)
-{
-   unsigned int size;
-
-   if (att->attlen > 0)
-       size = att->attlen;
-   else
-       size = VARSIZE_ANY(datum);
-
-   return MAXALIGN(size);
-}
-
 /*
  * Copy the given non-null datum to *target, in the inner-tuple case
  */
@@ -734,42 +780,111 @@ memcpyInnerDatum(void *target, SpGistTypeDesc *att, Datum datum)
 }
 
 /*
- * Copy the given non-null datum to *target, in the leaf-tuple case
+ * Compute space required for a leaf tuple holding the given data.
+ *
+ * This must match the size-calculation portion of spgFormLeafTuple.
  */
-static void
-memcpyLeafDatum(void *target, SpGistTypeDesc *att, Datum datum)
+Size
+SpGistGetLeafTupleSize(TupleDesc tupleDescriptor,
+                      Datum *datums, bool *isnulls)
 {
-   unsigned int size;
+   Size        size;
+   Size        data_size;
+   bool        needs_null_mask = false;
+   int         natts = tupleDescriptor->natts;
 
-   if (att->attbyval)
-   {
-       store_att_byval(target, datum, att->attlen);
-   }
-   else
+   /*
+    * Decide whether we need a nulls bitmask.
+    *
+    * If there is only a key attribute (natts == 1), never use a bitmask, for
+    * compatibility with the pre-v14 layout of leaf tuples.  Otherwise, we
+    * need one if any attribute is null.
+    */
+   if (natts > 1)
    {
-       size = (att->attlen > 0) ? att->attlen : VARSIZE_ANY(datum);
-       memcpy(target, DatumGetPointer(datum), size);
+       for (int i = 0; i < natts; i++)
+       {
+           if (isnulls[i])
+           {
+               needs_null_mask = true;
+               break;
+           }
+       }
    }
+
+   /*
+    * Calculate size of the data part; same as for heap tuples.
+    */
+   data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls);
+
+   /*
+    * Compute total size.
+    */
+   size = SGLTHDRSZ(needs_null_mask);
+   size += data_size;
+   size = MAXALIGN(size);
+
+   /*
+    * Ensure that we can replace the tuple with a dead tuple later. This test
+    * is unnecessary when there are any non-null attributes, but be safe.
+    */
+   if (size < SGDTSIZE)
+       size = SGDTSIZE;
+
+   return size;
 }
 
 /*
- * Construct a leaf tuple containing the given heap TID and datum value
+ * Construct a leaf tuple containing the given heap TID and datum values
  */
 SpGistLeafTuple
 spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
-                Datum datum, bool isnull)
+                Datum *datums, bool *isnulls)
 {
    SpGistLeafTuple tup;
-   unsigned int size;
+   TupleDesc   tupleDescriptor = state->leafTupDesc;
+   Size        size;
+   Size        hoff;
+   Size        data_size;
+   bool        needs_null_mask = false;
+   int         natts = tupleDescriptor->natts;
+   char       *tp;             /* ptr to tuple data */
+   uint16      tupmask = 0;    /* unused heap_fill_tuple output */
 
-   /* compute space needed (note result is already maxaligned) */
-   size = SGLTHDRSZ;
-   if (!isnull)
-       size += SpGistGetLeafTypeSize(&state->attLeafType, datum);
+   /*
+    * Decide whether we need a nulls bitmask.
+    *
+    * If there is only a key attribute (natts == 1), never use a bitmask, for
+    * compatibility with the pre-v14 layout of leaf tuples.  Otherwise, we
+    * need one if any attribute is null.
+    */
+   if (natts > 1)
+   {
+       for (int i = 0; i < natts; i++)
+       {
+           if (isnulls[i])
+           {
+               needs_null_mask = true;
+               break;
+           }
+       }
+   }
 
    /*
-    * Ensure that we can replace the tuple with a dead tuple later.  This
-    * test is unnecessary when !isnull, but let's be safe.
+    * Calculate size of the data part; same as for heap tuples.
+    */
+   data_size = heap_compute_data_size(tupleDescriptor, datums, isnulls);
+
+   /*
+    * Compute total size.
+    */
+   hoff = SGLTHDRSZ(needs_null_mask);
+   size = hoff + data_size;
+   size = MAXALIGN(size);
+
+   /*
+    * Ensure that we can replace the tuple with a dead tuple later. This test
+    * is unnecessary when there are any non-null attributes, but be safe.
     */
    if (size < SGDTSIZE)
        size = SGDTSIZE;
@@ -778,10 +893,29 @@ spgFormLeafTuple(SpGistState *state, ItemPointer heapPtr,
    tup = (SpGistLeafTuple) palloc0(size);
 
    tup->size = size;
-   tup->nextOffset = InvalidOffsetNumber;
+   SGLT_SET_NEXTOFFSET(tup, InvalidOffsetNumber);
    tup->heapPtr = *heapPtr;
-   if (!isnull)
-       memcpyLeafDatum(SGLTDATAPTR(tup), &state->attLeafType, datum);
+
+   tp = (char *) tup + hoff;
+
+   if (needs_null_mask)
+   {
+       bits8      *bp;         /* ptr to null bitmap in tuple */
+
+       /* Set nullmask presence bit in SpGistLeafTuple header */
+       SGLT_SET_HASNULLMASK(tup, true);
+       /* Fill the data area and null mask */
+       bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData));
+       heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size,
+                       &tupmask, bp);
+   }
+   else if (natts > 1 || !isnulls[spgKeyColumn])
+   {
+       /* Fill data area only */
+       heap_fill_tuple(tupleDescriptor, datums, isnulls, tp, data_size,
+                       &tupmask, (bits8 *) NULL);
+   }
+   /* otherwise we have no data, nor a bitmap, to fill */
 
    return tup;
 }
@@ -925,7 +1059,7 @@ spgFormDeadTuple(SpGistState *state, int tupstate,
 
    tuple->tupstate = tupstate;
    tuple->size = SGDTSIZE;
-   tuple->nextOffset = InvalidOffsetNumber;
+   SGLT_SET_NEXTOFFSET(tuple, InvalidOffsetNumber);
 
    if (tupstate == SPGIST_REDIRECT)
    {
@@ -942,6 +1076,52 @@ spgFormDeadTuple(SpGistState *state, int tupstate,
    return tuple;
 }
 
+/*
+ * Convert an SPGiST leaf tuple into Datum/isnull arrays.
+ *
+ * The caller must allocate sufficient storage for the output arrays.
+ * (INDEX_MAX_KEYS entries should be enough.)
+ */
+void
+spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor,
+                  Datum *datums, bool *isnulls, bool keyColumnIsNull)
+{
+   bool        hasNullsMask = SGLT_GET_HASNULLMASK(tup);
+   char       *tp;             /* ptr to tuple data */
+   bits8      *bp;             /* ptr to null bitmap in tuple */
+
+   if (keyColumnIsNull && tupleDescriptor->natts == 1)
+   {
+       /*
+        * Trivial case: there is only the key attribute and we're in a nulls
+        * tree.  The hasNullsMask bit in the tuple header should not be set
+        * (and thus we can't use index_deform_tuple_internal), but
+        * nonetheless the result is NULL.
+        *
+        * Note: currently this is dead code, because noplace calls this when
+        * there is only the key attribute.  But we should cover the case.
+        */
+       Assert(!hasNullsMask);
+
+       datums[spgKeyColumn] = (Datum) 0;
+       isnulls[spgKeyColumn] = true;
+       return;
+   }
+
+   tp = (char *) tup + SGLTHDRSZ(hasNullsMask);
+   bp = (bits8 *) ((char *) tup + sizeof(SpGistLeafTupleData));
+
+   index_deform_tuple_internal(tupleDescriptor,
+                               datums, isnulls,
+                               tp, bp, hasNullsMask);
+
+   /*
+    * Key column isnull value from the tuple should be consistent with
+    * keyColumnIsNull flag from the caller.
+    */
+   Assert(keyColumnIsNull == isnulls[spgKeyColumn]);
+}
+
 /*
  * Extract the label datums of the nodes within innerTuple
  *
index a9ffca5183bd2b30fd54cbc99e7fcd68e70ceb6f..76fb0374c42a9252a4b889f3eff2c2af04d3597a 100644 (file)
@@ -168,23 +168,23 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
            }
 
            /* Form predecessor map, too */
-           if (lt->nextOffset != InvalidOffsetNumber)
+           if (SGLT_GET_NEXTOFFSET(lt) != InvalidOffsetNumber)
            {
                /* paranoia about corrupted chain links */
-               if (lt->nextOffset < FirstOffsetNumber ||
-                   lt->nextOffset > max ||
-                   predecessor[lt->nextOffset] != InvalidOffsetNumber)
+               if (SGLT_GET_NEXTOFFSET(lt) < FirstOffsetNumber ||
+                   SGLT_GET_NEXTOFFSET(lt) > max ||
+                   predecessor[SGLT_GET_NEXTOFFSET(lt)] != InvalidOffsetNumber)
                    elog(ERROR, "inconsistent tuple chain links in page %u of index \"%s\"",
                         BufferGetBlockNumber(buffer),
                         RelationGetRelationName(index));
-               predecessor[lt->nextOffset] = i;
+               predecessor[SGLT_GET_NEXTOFFSET(lt)] = i;
            }
        }
        else if (lt->tupstate == SPGIST_REDIRECT)
        {
            SpGistDeadTuple dt = (SpGistDeadTuple) lt;
 
-           Assert(dt->nextOffset == InvalidOffsetNumber);
+           Assert(SGLT_GET_NEXTOFFSET(dt) == InvalidOffsetNumber);
            Assert(ItemPointerIsValid(&dt->pointer));
 
            /*
@@ -201,7 +201,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
        }
        else
        {
-           Assert(lt->nextOffset == InvalidOffsetNumber);
+           Assert(SGLT_GET_NEXTOFFSET(lt) == InvalidOffsetNumber);
        }
    }
 
@@ -250,7 +250,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
        prevLive = deletable[i] ? InvalidOffsetNumber : i;
 
        /* scan down the chain ... */
-       j = head->nextOffset;
+       j = SGLT_GET_NEXTOFFSET(head);
        while (j != InvalidOffsetNumber)
        {
            SpGistLeafTuple lt;
@@ -301,7 +301,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
                interveningDeletable = false;
            }
 
-           j = lt->nextOffset;
+           j = SGLT_GET_NEXTOFFSET(lt);
        }
 
        if (prevLive == InvalidOffsetNumber)
@@ -366,7 +366,7 @@ vacuumLeafPage(spgBulkDeleteState *bds, Relation index, Buffer buffer,
        lt = (SpGistLeafTuple) PageGetItem(page,
                                           PageGetItemId(page, chainSrc[i]));
        Assert(lt->tupstate == SPGIST_LIVE);
-       lt->nextOffset = chainDest[i];
+       SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
    }
 
    MarkBufferDirty(buffer);
index d40c7b58776d769a2364bbc51d13a1f1b7c5cbca..3dfd2aa317b50eb0cf8d6773ac16fb9f9d8f5e58 100644 (file)
@@ -122,8 +122,8 @@ spgRedoAddLeaf(XLogReaderState *record)
 
                head = (SpGistLeafTuple) PageGetItem(page,
                                                     PageGetItemId(page, xldata->offnumHeadLeaf));
-               Assert(head->nextOffset == leafTupleHdr.nextOffset);
-               head->nextOffset = xldata->offnumLeaf;
+               Assert(SGLT_GET_NEXTOFFSET(head) == SGLT_GET_NEXTOFFSET(&leafTupleHdr));
+               SGLT_SET_NEXTOFFSET(head, xldata->offnumLeaf);
            }
        }
        else
@@ -822,7 +822,7 @@ spgRedoVacuumLeaf(XLogReaderState *record)
            lt = (SpGistLeafTuple) PageGetItem(page,
                                               PageGetItemId(page, chainSrc[i]));
            Assert(lt->tupstate == SPGIST_LIVE);
-           lt->nextOffset = chainDest[i];
+           SGLT_SET_NEXTOFFSET(lt, chainDest[i]);
        }
 
        PageSetLSN(page, lsn);
index b6813707d0b7a674c3dce5c546e2a1eb417f5055..1917375cde1d439e13c1afd6ff6b12ffa2c189a9 100644 (file)
@@ -154,6 +154,9 @@ extern Datum nocache_index_getattr(IndexTuple tup, int attnum,
                                   TupleDesc tupleDesc);
 extern void index_deform_tuple(IndexTuple tup, TupleDesc tupleDescriptor,
                               Datum *values, bool *isnull);
+extern void index_deform_tuple_internal(TupleDesc tupleDescriptor,
+                                       Datum *values, bool *isnull,
+                                       char *tp, bits8 *bp, int hasnulls);
 extern IndexTuple CopyIndexTuple(IndexTuple source);
 extern IndexTuple index_truncate_tuple(TupleDesc sourceDescriptor,
                                       IndexTuple source, int leavenatts);
index 980aa7766baa479e56ce9b0d95ba9bca7679300d..ba3da5b5404b04aedc6f9d42b413a10e6b0c4c19 100644 (file)
@@ -39,6 +39,10 @@ typedef struct SpGistOptions
    (BLCKSZ * (100 - SpGistGetFillFactor(relation)) / 100)
 
 
+/* SPGiST leaf tuples have one key column, optionally have included columns */
+#define spgKeyColumn 0
+#define spgFirstIncludeColumn 1
+
 /* Page numbers of fixed-location pages */
 #define SPGIST_METAPAGE_BLKNO   (0)    /* metapage */
 #define SPGIST_ROOT_BLKNO       (1)    /* root for normal entries */
@@ -125,16 +129,22 @@ typedef struct SpGistMetaPageData
  * search code; SpGistScanOpaque is for searches only.
  */
 
+typedef struct SpGistLeafTupleData *SpGistLeafTuple;   /* forward reference */
+
 /* Per-datatype info needed in SpGistState */
 typedef struct SpGistTypeDesc
 {
    Oid         type;
-   bool        attbyval;
    int16       attlen;
+   bool        attbyval;
+   char        attstorage;
+   char        attalign;
 } SpGistTypeDesc;
 
 typedef struct SpGistState
 {
+   Relation    index;          /* index we're working with */
+
    spgConfigOut config;        /* filled in by opclass config method */
 
    SpGistTypeDesc attType;     /* type of values to be indexed/restored */
@@ -142,17 +152,22 @@ typedef struct SpGistState
    SpGistTypeDesc attPrefixType;   /* type of inner-tuple prefix values */
    SpGistTypeDesc attLabelType;    /* type of node label values */
 
+   /* leafTupDesc typically points to index's tupdesc, but not always */
+   TupleDesc   leafTupDesc;    /* descriptor for leaf-level tuples */
+
    char       *deadTupleStorage;   /* workspace for spgFormDeadTuple */
 
    TransactionId myXid;        /* XID to use when creating a redirect tuple */
    bool        isBuild;        /* true if doing index build */
 } SpGistState;
 
+/* Item to be re-examined later during a search */
 typedef struct SpGistSearchItem
 {
    pairingheap_node phNode;    /* pairing heap node */
    Datum       value;          /* value reconstructed from parent, or
                                 * leafValue if isLeaf */
+   SpGistLeafTuple leafTuple;  /* whole leaf tuple, if needed */
    void       *traversalValue; /* opclass-specific traverse value */
    int         level;          /* level of items on this page */
    ItemPointerData heapPtr;    /* heap info, if heap tuple */
@@ -208,7 +223,7 @@ typedef struct SpGistScanOpaqueData
 
    /* These fields are only used in amgettuple scans: */
    bool        want_itup;      /* are we reconstructing tuples? */
-   TupleDesc   indexTupDesc;   /* if so, descriptor for reconstructed tuples */
+   TupleDesc   reconTupDesc;   /* if so, descriptor for reconstructed tuples */
    int         nPtrs;          /* number of TIDs found on current page */
    int         iPtr;           /* index for scanning through same */
    ItemPointerData heapPtrs[MaxIndexTuplesPerPage];    /* TIDs from cur page */
@@ -329,23 +344,36 @@ typedef SpGistNodeTupleData *SpGistNodeTuple;
                             PointerGetDatum(SGNTDATAPTR(x)))
 
 /*
- * SPGiST leaf tuple: carries a leaf datum and a heap tuple TID
+ * SPGiST leaf tuple: carries a leaf datum and a heap tuple TID,
+ * and optionally some "included" columns.
  *
  * In the simplest case, the leaf datum is the same as the indexed value;
  * but it could also be a suffix or some other sort of delta that permits
  * reconstruction given knowledge of the prefix path traversed to get here.
+ * Any included columns are stored without modification.
  *
- * If the leaf datum is NULL, it's not stored.  This is not represented
- * explicitly; we infer it from the tuple being stored on a nulls page.
+ * A nulls bitmap is present if there are included columns AND any of the
+ * datums are NULL.  We do not need a nulls bitmap for the case of a null
+ * leaf datum without included columns, as we can infer whether the leaf
+ * datum is null from whether the tuple is stored on a nulls page.  (This
+ * provision is mostly for backwards compatibility, but it does save space
+ * on 32-bit machines.)  As with other PG index tuple designs, if the nulls
+ * bitmap exists then it's of size INDEX_MAX_KEYS bits regardless of the
+ * actual number of attributes.  For the usual choice of INDEX_MAX_KEYS,
+ * this costs nothing because of alignment considerations.
  *
  * The size field is wider than could possibly be needed for an on-disk leaf
  * tuple, but this allows us to form leaf tuples even when the datum is too
  * wide to be stored immediately, and it costs nothing because of alignment
  * considerations.
  *
+ * t_info holds the nextOffset field (14 bits wide, enough for supported
+ * page sizes) plus the has-nulls-bitmap flag bit; another flag bit is free.
+ *
  * Normally, nextOffset links to the next tuple belonging to the same parent
- * node (which must be on the same page).  But when the root page is a leaf
- * page, we don't chain its tuples, so nextOffset is always 0 on the root.
+ * node (which must be on the same page), or it's 0 if there is no next tuple.
+ * But when the root page is a leaf page, we don't chain its tuples,
+ * so nextOffset is always 0 on the root.
  *
  * size must be a multiple of MAXALIGN; also, it must be at least SGDTSIZE
  * so that the tuple can be converted to REDIRECT status later.  (This
@@ -356,15 +384,30 @@ typedef struct SpGistLeafTupleData
 {
    unsigned int tupstate:2,    /* LIVE/REDIRECT/DEAD/PLACEHOLDER */
                size:30;        /* large enough for any palloc'able value */
-   OffsetNumber nextOffset;    /* next tuple in chain, or InvalidOffsetNumber */
+   uint16      t_info;         /* nextOffset, which links to the next tuple
+                                * in chain, plus two flag bits */
    ItemPointerData heapPtr;    /* TID of represented heap tuple */
-   /* leaf datum follows on a MAXALIGN boundary */
+   /* nulls bitmap follows if the flag bit for it is set */
+   /* leaf datum, then any included datums, follows on a MAXALIGN boundary */
 } SpGistLeafTupleData;
 
-typedef SpGistLeafTupleData *SpGistLeafTuple;
-
-#define SGLTHDRSZ          MAXALIGN(sizeof(SpGistLeafTupleData))
-#define SGLTDATAPTR(x)     (((char *) (x)) + SGLTHDRSZ)
+/* Macros to access nextOffset and bit fields inside t_info */
+#define SGLT_GET_NEXTOFFSET(spgLeafTuple) \
+   ((spgLeafTuple)->t_info & 0x3FFF)
+#define SGLT_GET_HASNULLMASK(spgLeafTuple) \
+   (((spgLeafTuple)->t_info & 0x8000) ? true : false)
+#define SGLT_SET_NEXTOFFSET(spgLeafTuple, offsetNumber) \
+   ((spgLeafTuple)->t_info = \
+    ((spgLeafTuple)->t_info & 0xC000) | ((offsetNumber) & 0x3FFF))
+#define SGLT_SET_HASNULLMASK(spgLeafTuple, hasnulls) \
+   ((spgLeafTuple)->t_info = \
+    ((spgLeafTuple)->t_info & 0x7FFF) | ((hasnulls) ? 0x8000 : 0))
+
+#define SGLTHDRSZ(hasnulls) \
+   ((hasnulls) ? MAXALIGN(sizeof(SpGistLeafTupleData) + \
+                          sizeof(IndexAttributeBitMapData)) : \
+    MAXALIGN(sizeof(SpGistLeafTupleData)))
+#define SGLTDATAPTR(x)     (((char *) (x)) + SGLTHDRSZ(SGLT_GET_HASNULLMASK(x)))
 #define SGLTDATUM(x, s)        fetch_att(SGLTDATAPTR(x), \
                                      (s)->attLeafType.attbyval, \
                                      (s)->attLeafType.attlen)
@@ -377,14 +420,14 @@ typedef SpGistLeafTupleData *SpGistLeafTuple;
  * Also, the pointer field must be in the same place as a leaf tuple's heapPtr
  * field, to satisfy some Asserts that we make when replacing a leaf tuple
  * with a dead tuple.
- * We don't use nextOffset, but it's needed to align the pointer field.
+ * We don't use t_info, but it's needed to align the pointer field.
  * pointer and xid are only valid when tupstate = REDIRECT.
  */
 typedef struct SpGistDeadTupleData
 {
    unsigned int tupstate:2,    /* LIVE/REDIRECT/DEAD/PLACEHOLDER */
                size:30;
-   OffsetNumber nextOffset;    /* not used in dead tuples */
+   uint16      t_info;         /* not used in dead tuples */
    ItemPointerData pointer;    /* redirection inside index */
    TransactionId xid;          /* ID of xact that inserted this tuple */
 } SpGistDeadTupleData;
@@ -451,6 +494,7 @@ typedef SpGistDeadTupleData *SpGistDeadTuple;
 #define SPGIST_DEFAULT_FILLFACTOR      80
 
 extern SpGistCache *spgGetCache(Relation index);
+extern TupleDesc getSpGistTupleDesc(Relation index, SpGistTypeDesc *keyType);
 extern void initSpGistState(SpGistState *state, Relation index);
 extern Buffer SpGistNewBuffer(Relation index);
 extern void SpGistUpdateMetaPage(Relation index);
@@ -461,10 +505,11 @@ extern void SpGistInitPage(Page page, uint16 f);
 extern void SpGistInitBuffer(Buffer b, uint16 f);
 extern void SpGistInitMetapage(Page page);
 extern unsigned int SpGistGetInnerTypeSize(SpGistTypeDesc *att, Datum datum);
-extern unsigned int SpGistGetLeafTypeSize(SpGistTypeDesc *att, Datum datum);
+extern Size SpGistGetLeafTupleSize(TupleDesc tupleDescriptor,
+                                  Datum *datums, bool *isnulls);
 extern SpGistLeafTuple spgFormLeafTuple(SpGistState *state,
                                        ItemPointer heapPtr,
-                                       Datum datum, bool isnull);
+                                       Datum *datums, bool *isnulls);
 extern SpGistNodeTuple spgFormNodeTuple(SpGistState *state,
                                        Datum label, bool isnull);
 extern SpGistInnerTuple spgFormInnerTuple(SpGistState *state,
@@ -472,6 +517,9 @@ extern SpGistInnerTuple spgFormInnerTuple(SpGistState *state,
                                          int nNodes, SpGistNodeTuple *nodes);
 extern SpGistDeadTuple spgFormDeadTuple(SpGistState *state, int tupstate,
                                        BlockNumber blkno, OffsetNumber offnum);
+extern void spgDeformLeafTuple(SpGistLeafTuple tup, TupleDesc tupleDescriptor,
+                              Datum *datums, bool *isnulls,
+                              bool keyColumnIsNull);
 extern Datum *spgExtractNodeLabels(SpGistState *state,
                                   SpGistInnerTuple innerTuple);
 extern OffsetNumber SpGistPageAddNewItem(SpGistState *state, Page page,
@@ -490,7 +538,7 @@ extern void spgPageIndexMultiDelete(SpGistState *state, Page page,
                                    int firststate, int reststate,
                                    BlockNumber blkno, OffsetNumber offnum);
 extern bool spgdoinsert(Relation index, SpGistState *state,
-                       ItemPointer heapPtr, Datum datum, bool isnull);
+                       ItemPointer heapPtr, Datum *datums, bool *isnulls);
 
 /* spgproc.c */
 extern double *spg_key_orderbys_distances(Datum key, bool isLeaf,
index 87b8bd0d5e934fd0973147453c7d936c58ac8fc2..0ac99d08fbab431735a992682da55138c264bcd3 100644 (file)
@@ -17,16 +17,21 @@ INFO:  SP-GiST leaf data type text does not match declared type name
  name_ops_old | f
 (1 row)
 
-create table t(f1 name);
-create index on t using spgist(f1);
-\d+ t_f1_idx
-                   Index "public.t_f1_idx"
- Column | Type | Key? | Definition | Storage  | Stats target 
---------+------+------+------------+----------+--------------
- f1     | text | yes  | f1         | extended | 
+create table t(f1 name, f2 integer, f3 text);
+create index on t using spgist(f1) include(f2, f3);
+\d+ t_f1_f2_f3_idx
+                 Index "public.t_f1_f2_f3_idx"
+ Column |  Type   | Key? | Definition | Storage  | Stats target 
+--------+---------+------+------------+----------+--------------
+ f1     | text    | yes  | f1         | extended | 
+ f2     | integer | no   | f2         | plain    | 
+ f3     | text    | no   | f3         | extended | 
 spgist, for table "public.t"
 
-insert into t select proname from pg_proc;
+insert into t select
+  proname,
+  case when length(proname) % 2 = 0 then pronargs else null end,
+  prosrc from pg_proc;
 vacuum analyze t;
 explain (costs off)
 select * from t
@@ -36,33 +41,35 @@ select * from t
 ---------------------------------------------------------------------------------------------------
  Sort
    Sort Key: f1
-   ->  Index Only Scan using t_f1_idx on t
+   ->  Index Only Scan using t_f1_f2_f3_idx on t
          Index Cond: ((f1 > 'binary_upgrade_set_n'::name) AND (f1 < 'binary_upgrade_set_p'::name))
 (4 rows)
 
 select * from t
   where f1 > 'binary_upgrade_set_n' and f1 < 'binary_upgrade_set_p'
   order by 1;
-                          f1                          
-------------------------------------------------------
- binary_upgrade_set_next_array_pg_type_oid
- binary_upgrade_set_next_heap_pg_class_oid
- binary_upgrade_set_next_index_pg_class_oid
- binary_upgrade_set_next_multirange_array_pg_type_oid
- binary_upgrade_set_next_multirange_pg_type_oid
- binary_upgrade_set_next_pg_authid_oid
- binary_upgrade_set_next_pg_enum_oid
- binary_upgrade_set_next_pg_type_oid
- binary_upgrade_set_next_toast_pg_class_oid
+                          f1                          | f2 |                          f3                          
+------------------------------------------------------+----+------------------------------------------------------
+ binary_upgrade_set_next_array_pg_type_oid            |    | binary_upgrade_set_next_array_pg_type_oid
+ binary_upgrade_set_next_heap_pg_class_oid            |    | binary_upgrade_set_next_heap_pg_class_oid
+ binary_upgrade_set_next_index_pg_class_oid           |  1 | binary_upgrade_set_next_index_pg_class_oid
+ binary_upgrade_set_next_multirange_array_pg_type_oid |  1 | binary_upgrade_set_next_multirange_array_pg_type_oid
+ binary_upgrade_set_next_multirange_pg_type_oid       |  1 | binary_upgrade_set_next_multirange_pg_type_oid
+ binary_upgrade_set_next_pg_authid_oid                |    | binary_upgrade_set_next_pg_authid_oid
+ binary_upgrade_set_next_pg_enum_oid                  |    | binary_upgrade_set_next_pg_enum_oid
+ binary_upgrade_set_next_pg_type_oid                  |    | binary_upgrade_set_next_pg_type_oid
+ binary_upgrade_set_next_toast_pg_class_oid           |  1 | binary_upgrade_set_next_toast_pg_class_oid
 (9 rows)
 
-drop index t_f1_idx;
-create index on t using spgist(f1 name_ops_old);
-\d+ t_f1_idx
-                  Index "public.t_f1_idx"
- Column | Type | Key? | Definition | Storage | Stats target 
---------+------+------+------------+---------+--------------
- f1     | name | yes  | f1         | plain   | 
+drop index t_f1_f2_f3_idx;
+create index on t using spgist(f1 name_ops_old) include(f2, f3);
+\d+ t_f1_f2_f3_idx
+                 Index "public.t_f1_f2_f3_idx"
+ Column |  Type   | Key? | Definition | Storage  | Stats target 
+--------+---------+------+------------+----------+--------------
+ f1     | name    | yes  | f1         | plain    | 
+ f2     | integer | no   | f2         | plain    | 
+ f3     | text    | no   | f3         | extended | 
 spgist, for table "public.t"
 
 explain (costs off)
@@ -73,23 +80,23 @@ select * from t
 ---------------------------------------------------------------------------------------------------
  Sort
    Sort Key: f1
-   ->  Index Only Scan using t_f1_idx on t
+   ->  Index Only Scan using t_f1_f2_f3_idx on t
          Index Cond: ((f1 > 'binary_upgrade_set_n'::name) AND (f1 < 'binary_upgrade_set_p'::name))
 (4 rows)
 
 select * from t
   where f1 > 'binary_upgrade_set_n' and f1 < 'binary_upgrade_set_p'
   order by 1;
-                          f1                          
-------------------------------------------------------
- binary_upgrade_set_next_array_pg_type_oid
- binary_upgrade_set_next_heap_pg_class_oid
- binary_upgrade_set_next_index_pg_class_oid
- binary_upgrade_set_next_multirange_array_pg_type_oid
- binary_upgrade_set_next_multirange_pg_type_oid
- binary_upgrade_set_next_pg_authid_oid
- binary_upgrade_set_next_pg_enum_oid
- binary_upgrade_set_next_pg_type_oid
- binary_upgrade_set_next_toast_pg_class_oid
+                          f1                          | f2 |                          f3                          
+------------------------------------------------------+----+------------------------------------------------------
+ binary_upgrade_set_next_array_pg_type_oid            |    | binary_upgrade_set_next_array_pg_type_oid
+ binary_upgrade_set_next_heap_pg_class_oid            |    | binary_upgrade_set_next_heap_pg_class_oid
+ binary_upgrade_set_next_index_pg_class_oid           |  1 | binary_upgrade_set_next_index_pg_class_oid
+ binary_upgrade_set_next_multirange_array_pg_type_oid |  1 | binary_upgrade_set_next_multirange_array_pg_type_oid
+ binary_upgrade_set_next_multirange_pg_type_oid       |  1 | binary_upgrade_set_next_multirange_pg_type_oid
+ binary_upgrade_set_next_pg_authid_oid                |    | binary_upgrade_set_next_pg_authid_oid
+ binary_upgrade_set_next_pg_enum_oid                  |    | binary_upgrade_set_next_pg_enum_oid
+ binary_upgrade_set_next_pg_type_oid                  |    | binary_upgrade_set_next_pg_type_oid
+ binary_upgrade_set_next_toast_pg_class_oid           |  1 | binary_upgrade_set_next_toast_pg_class_oid
 (9 rows)
 
index 5c97e248a48a4839962827ed75603a4e2d7d9257..76e78ba41c7799b8ecaad942beb73abc47a5d871 100644 (file)
@@ -9,11 +9,14 @@ select opcname, amvalidate(opc.oid)
 from pg_opclass opc join pg_am am on am.oid = opcmethod
 where amname = 'spgist' and opcname = 'name_ops_old';
 
-create table t(f1 name);
-create index on t using spgist(f1);
-\d+ t_f1_idx
-
-insert into t select proname from pg_proc;
+create table t(f1 name, f2 integer, f3 text);
+create index on t using spgist(f1) include(f2, f3);
+\d+ t_f1_f2_f3_idx
+
+insert into t select
+  proname,
+  case when length(proname) % 2 = 0 then pronargs else null end,
+  prosrc from pg_proc;
 vacuum analyze t;
 
 explain (costs off)
@@ -24,10 +27,10 @@ select * from t
   where f1 > 'binary_upgrade_set_n' and f1 < 'binary_upgrade_set_p'
   order by 1;
 
-drop index t_f1_idx;
+drop index t_f1_f2_f3_idx;
 
-create index on t using spgist(f1 name_ops_old);
-\d+ t_f1_idx
+create index on t using spgist(f1 name_ops_old) include(f2, f3);
+\d+ t_f1_f2_f3_idx
 
 explain (costs off)
 select * from t
index d92a6d12c622c2c8117992c0ef58d7c5903b6cc7..7ab6113c61914f1c88609a0dd9db6accb53ddc8a 100644 (file)
@@ -171,7 +171,7 @@ select amname, prop, pg_indexam_has_property(a.oid, prop) as p
  spgist | can_unique    | f
  spgist | can_multi_col | f
  spgist | can_exclude   | t
- spgist | can_include   | f
+ spgist | can_include   | t
  spgist | bogus         | 
 (36 rows)
 
index afe38e16a2baead3ec2ffdeeaa14f687c8785f10..f4fb08a289ac7b6181b2c96f8c8d316219050b1e 100644 (file)
@@ -555,6 +555,35 @@ WHERE seq.dist IS DISTINCT FROM idx.dist;
 ---+------+---+---+------+---
 (0 rows)
 
+-- test KNN scan with included columns
+-- the distance numbers are not exactly the same across platforms
+SET extra_float_digits = 0;
+CREATE INDEX ON quad_point_tbl_ord_seq1 USING spgist(p) INCLUDE(dist);
+EXPLAIN (COSTS OFF)
+SELECT p, dist FROM quad_point_tbl_ord_seq1 ORDER BY p <-> '0,0' LIMIT 10;
+                                        QUERY PLAN                                         
+-------------------------------------------------------------------------------------------
+ Limit
+   ->  Index Only Scan using quad_point_tbl_ord_seq1_p_dist_idx on quad_point_tbl_ord_seq1
+         Order By: (p <-> '(0,0)'::point)
+(3 rows)
+
+SELECT p, dist FROM quad_point_tbl_ord_seq1 ORDER BY p <-> '0,0' LIMIT 10;
+     p     |       dist       
+-----------+------------------
+ (59,21)   | 62.6258732474047
+ (88,104)  | 136.235090927411
+ (39,143)  | 148.222805262888
+ (139,160) | 211.945747775227
+ (209,38)  |  212.42645786248
+ (157,156) | 221.325552072055
+ (175,150) | 230.488611432322
+ (236,34)  | 238.436574375661
+ (263,28)  | 264.486294540946
+ (322,53)  |  326.33265236565
+(10 rows)
+
+RESET extra_float_digits;
 -- check ORDER BY distance to NULL
 SELECT (SELECT p FROM kd_point_tbl ORDER BY p <-> pt, p <-> '0,0' LIMIT 1)
 FROM (VALUES (point '1,2'), (NULL), ('1234,5678')) pts(pt);
index 8e5d53e712acec68479a269c81a3f8dd25d140e3..86510687c743f54fd49f508d09176f10a8c52cd3 100644 (file)
@@ -349,14 +349,13 @@ SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname;
 
 DROP TABLE tbl;
 /*
- * 7. Check various AMs. All but btree and gist must fail.
+ * 7. Check various AMs. All but btree, gist and spgist must fail.
  */
 CREATE TABLE tbl (c1 int,c2 int, c3 box, c4 box);
 CREATE INDEX on tbl USING brin(c1, c2) INCLUDE (c3, c4);
 ERROR:  access method "brin" does not support included columns
 CREATE INDEX on tbl USING gist(c3) INCLUDE (c1, c4);
 CREATE INDEX on tbl USING spgist(c3) INCLUDE (c4);
-ERROR:  access method "spgist" does not support included columns
 CREATE INDEX on tbl USING gin(c1, c2) INCLUDE (c3, c4);
 ERROR:  access method "gin" does not support included columns
 CREATE INDEX on tbl USING hash(c1, c2) INCLUDE (c3, c4);
index bff5f2d0922fd4b83dcb6aeb272281d36569437e..b126dae6299ef11edbc15109686e3db9622ec011 100644 (file)
@@ -225,6 +225,15 @@ SELECT * FROM quad_point_tbl_ord_seq3 seq FULL JOIN kd_point_tbl_ord_idx3 idx
 ON seq.n = idx.n
 WHERE seq.dist IS DISTINCT FROM idx.dist;
 
+-- test KNN scan with included columns
+-- the distance numbers are not exactly the same across platforms
+SET extra_float_digits = 0;
+CREATE INDEX ON quad_point_tbl_ord_seq1 USING spgist(p) INCLUDE(dist);
+EXPLAIN (COSTS OFF)
+SELECT p, dist FROM quad_point_tbl_ord_seq1 ORDER BY p <-> '0,0' LIMIT 10;
+SELECT p, dist FROM quad_point_tbl_ord_seq1 ORDER BY p <-> '0,0' LIMIT 10;
+RESET extra_float_digits;
+
 -- check ORDER BY distance to NULL
 SELECT (SELECT p FROM kd_point_tbl ORDER BY p <-> pt, p <-> '0,0' LIMIT 1)
 FROM (VALUES (point '1,2'), (NULL), ('1234,5678')) pts(pt);
index 7e517483adf49b8e691b660dac294d7a9c8b051f..44b340053b75b2b2bdc500bd0a9472d368d1ab7b 100644 (file)
@@ -182,7 +182,7 @@ SELECT indexdef FROM pg_indexes WHERE tablename = 'tbl' ORDER BY indexname;
 DROP TABLE tbl;
 
 /*
- * 7. Check various AMs. All but btree and gist must fail.
+ * 7. Check various AMs. All but btree, gist and spgist must fail.
  */
 CREATE TABLE tbl (c1 int,c2 int, c3 box, c4 box);
 CREATE INDEX on tbl USING brin(c1, c2) INCLUDE (c3, c4);