#include "postgres.h"
 
-#include "fmgr.h"
-#include "funcapi.h"
 #include "access/heapam.h"
-#include "access/itup.h"
 #include "access/nbtree.h"
-#include "access/transam.h"
 #include "catalog/namespace.h"
 #include "catalog/pg_type.h"
+#include "funcapi.h"
+#include "miscadmin.h"
 #include "utils/builtins.h"
-#include "utils/inval.h"
 
-PG_FUNCTION_INFO_V1(bt_metap);
-PG_FUNCTION_INFO_V1(bt_page_items);
-PG_FUNCTION_INFO_V1(bt_page_stats);
 
 extern Datum bt_metap(PG_FUNCTION_ARGS);
 extern Datum bt_page_items(PG_FUNCTION_ARGS);
 extern Datum bt_page_stats(PG_FUNCTION_ARGS);
 
-#define BTMETAP_TYPE "public.bt_metap_type"
-#define BTMETAP_NCOLUMNS 6
-
-#define BTPAGEITEMS_TYPE "public.bt_page_items_type"
-#define BTPAGEITEMS_NCOLUMNS 6
-
-#define BTPAGESTATS_TYPE "public.bt_page_stats_type"
-#define BTPAGESTATS_NCOLUMNS 11
-
+PG_FUNCTION_INFO_V1(bt_metap);
+PG_FUNCTION_INFO_V1(bt_page_items);
+PG_FUNCTION_INFO_V1(bt_page_stats);
 
-#define IS_INDEX(r) ((r)->rd_rel->relkind == 'i')
+#define IS_INDEX(r) ((r)->rd_rel->relkind == RELKIND_INDEX)
 #define IS_BTREE(r) ((r)->rd_rel->relam == BTREE_AM_OID)
 
 #define CHECK_PAGE_OFFSET_RANGE(pg, offnum) { \
    BTCycleId   btpo_cycleid;
 }  BTPageStat;
 
-/* ------------------------------------------------
- * A structure for a whole btree index statistics
- * used by pgstatindex().
- * ------------------------------------------------
- */
-typedef struct BTIndexStat
-{
-   uint32      magic;
-   uint32      version;
-   BlockNumber root_blkno;
-   uint32      level;
-
-   BlockNumber fastroot;
-   uint32      fastlevel;
-
-   uint32      live_items;
-   uint32      dead_items;
-
-   uint32      root_pages;
-   uint32      internal_pages;
-   uint32      leaf_pages;
-   uint32      empty_pages;
-   uint32      deleted_pages;
-
-   uint32      page_size;
-   uint32      avg_item_size;
-
-   uint32      max_avail;
-   uint32      free_space;
-}  BTIndexStat;
-
 
 /* -------------------------------------------------
  * GetBTPageStatistics()
  *
- * Collect statistics of single b-tree leaf page
+ * Collect statistics of single b-tree page
  * -------------------------------------------------
  */
 static void
 /* -----------------------------------------------
  * bt_page()
  *
- * Usage: SELECT * FROM bt_page('t1_pkey', 0);
+ * Usage: SELECT * FROM bt_page('t1_pkey', 1);
  * -----------------------------------------------
  */
 Datum
    text       *relname = PG_GETARG_TEXT_P(0);
    uint32      blkno = PG_GETARG_UINT32(1);
    Buffer      buffer;
-
    Relation    rel;
    RangeVar   *relrv;
    Datum       result;
+   HeapTuple   tuple;
+   TupleDesc   tupleDesc;
+   int         j;
+   char       *values[11];
+   BTPageStat  stat;
+
+   if (!superuser())
+       ereport(ERROR,
+               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                (errmsg("must be superuser to use pageinspect functions"))));
 
    relrv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
    rel = relation_openrv(relrv, AccessShareLock);
 
-   CHECK_RELATION_BLOCK_RANGE(rel, blkno);
-
-   buffer = ReadBuffer(rel, blkno);
-
    if (!IS_INDEX(rel) || !IS_BTREE(rel))
-       elog(ERROR, "bt_page_stats() can only be used on b-tree index");
+       elog(ERROR, "relation \"%s\" is not a btree index",
+            RelationGetRelationName(rel));
 
    if (blkno == 0)
        elog(ERROR, "block 0 is a meta page");
 
-   {
-       HeapTuple   tuple;
-       TupleDesc   tupleDesc;
-       int         j;
-       char       *values[BTPAGESTATS_NCOLUMNS];
-
-       BTPageStat  stat;
-
-       /* keep compiler quiet */
-       stat.btpo_prev = stat.btpo_next = InvalidBlockNumber;
-       stat.btpo_flags = stat.free_size = stat.avg_item_size = 0;
-
-       GetBTPageStatistics(blkno, buffer, &stat);
-
-       tupleDesc = RelationNameGetTupleDesc(BTPAGESTATS_TYPE);
-
-       j = 0;
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", stat.blkno);
-
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%c", stat.type);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", stat.live_items);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", stat.dead_items);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", stat.avg_item_size);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", stat.page_size);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", stat.free_size);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", stat.btpo_prev);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", stat.btpo_next);
+   CHECK_RELATION_BLOCK_RANGE(rel, blkno);
 
-       values[j] = palloc(32);
-       if (stat.type == 'd')
-           snprintf(values[j++], 32, "%d", stat.btpo.xact);
-       else
-           snprintf(values[j++], 32, "%d", stat.btpo.level);
+   buffer = ReadBuffer(rel, blkno);
 
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", stat.btpo_flags);
+   /* keep compiler quiet */
+   stat.btpo_prev = stat.btpo_next = InvalidBlockNumber;
+   stat.btpo_flags = stat.free_size = stat.avg_item_size = 0;
+
+   GetBTPageStatistics(blkno, buffer, &stat);
+
+   /* Build a tuple descriptor for our result type */
+   if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
+       elog(ERROR, "return type must be a row type");
+
+   j = 0;
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", stat.blkno);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%c", stat.type);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", stat.live_items);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", stat.dead_items);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", stat.avg_item_size);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", stat.page_size);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", stat.free_size);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", stat.btpo_prev);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", stat.btpo_next);
+   values[j] = palloc(32);
+   if (stat.type == 'd')
+       snprintf(values[j++], 32, "%d", stat.btpo.xact);
+   else
+       snprintf(values[j++], 32, "%d", stat.btpo.level);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", stat.btpo_flags);
 
-       tuple = BuildTupleFromCStrings(TupleDescGetAttInMetadata(tupleDesc),
-                                      values);
+   tuple = BuildTupleFromCStrings(TupleDescGetAttInMetadata(tupleDesc),
+                                  values);
 
-       result = TupleGetDatum(TupleDescGetSlot(tupleDesc), tuple);
-   }
+   result = HeapTupleGetDatum(tuple);
 
    ReleaseBuffer(buffer);
 
 /*-------------------------------------------------------
  * bt_page_items()
  *
- * Get IndexTupleData set in a leaf page
+ * Get IndexTupleData set in a btree page
  *
- * Usage: SELECT * FROM bt_page_items('t1_pkey', 0);
+ * Usage: SELECT * FROM bt_page_items('t1_pkey', 1);
  *-------------------------------------------------------
  */
-/* ---------------------------------------------------
- * data structure for SRF to hold a scan information
- * ---------------------------------------------------
+
+/*
+ * cross-call data structure for SRF
  */
 struct user_args
 {
-   TupleDesc   tupd;
-   Relation    rel;
-   Buffer      buffer;
    Page        page;
-   uint16      offset;
+   OffsetNumber offset;
 };
 
 Datum
 {
    text       *relname = PG_GETARG_TEXT_P(0);
    uint32      blkno = PG_GETARG_UINT32(1);
-
-   RangeVar   *relrv;
    Datum       result;
-   char       *values[BTPAGEITEMS_NCOLUMNS];
-   BTPageOpaque opaque;
+   char       *values[6];
    HeapTuple   tuple;
-   ItemId      id;
-
    FuncCallContext *fctx;
    MemoryContext mctx;
-   struct user_args *uargs = NULL;
+   struct user_args *uargs;
 
-   if (blkno == 0)
-       elog(ERROR, "block 0 is a meta page");
+   if (!superuser())
+       ereport(ERROR,
+               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                (errmsg("must be superuser to use pageinspect functions"))));
 
    if (SRF_IS_FIRSTCALL())
    {
+       RangeVar   *relrv;
+       Relation    rel;
+       Buffer      buffer;
+       BTPageOpaque opaque;
+       TupleDesc   tupleDesc;
+
        fctx = SRF_FIRSTCALL_INIT();
-       mctx = MemoryContextSwitchTo(fctx->multi_call_memory_ctx);
 
-       uargs = palloc(sizeof(struct user_args));
+       relrv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
+       rel = relation_openrv(relrv, AccessShareLock);
 
-       uargs->tupd = RelationNameGetTupleDesc(BTPAGEITEMS_TYPE);
-       uargs->offset = FirstOffsetNumber;
+       if (!IS_INDEX(rel) || !IS_BTREE(rel))
+           elog(ERROR, "relation \"%s\" is not a btree index",
+                RelationGetRelationName(rel));
 
-       relrv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
-       uargs->rel = relation_openrv(relrv, AccessShareLock);
+       if (blkno == 0)
+           elog(ERROR, "block 0 is a meta page");
+
+       CHECK_RELATION_BLOCK_RANGE(rel, blkno);
 
-       CHECK_RELATION_BLOCK_RANGE(uargs->rel, blkno);
+       buffer = ReadBuffer(rel, blkno);
 
-       uargs->buffer = ReadBuffer(uargs->rel, blkno);
+       /*
+        * We copy the page into local storage to avoid holding pin on
+        * the buffer longer than we must, and possibly failing to
+        * release it at all if the calling query doesn't fetch all rows.
+        */
+       mctx = MemoryContextSwitchTo(fctx->multi_call_memory_ctx);
+
+       uargs = palloc(sizeof(struct user_args));
+
+       uargs->page = palloc(BLCKSZ);
+       memcpy(uargs->page, BufferGetPage(buffer), BLCKSZ);
 
-       if (!IS_INDEX(uargs->rel) || !IS_BTREE(uargs->rel))
-           elog(ERROR, "bt_page_items() can only be used on b-tree index");
+       ReleaseBuffer(buffer);
+       relation_close(rel, AccessShareLock);
 
-       uargs->page = BufferGetPage(uargs->buffer);
+       uargs->offset = FirstOffsetNumber;
 
        opaque = (BTPageOpaque) PageGetSpecialPointer(uargs->page);
 
            elog(NOTICE, "page is deleted");
 
        fctx->max_calls = PageGetMaxOffsetNumber(uargs->page);
+
+       /* Build a tuple descriptor for our result type */
+       if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
+           elog(ERROR, "return type must be a row type");
+
+       fctx->attinmeta = TupleDescGetAttInMetadata(tupleDesc);
+
        fctx->user_fctx = uargs;
 
        MemoryContextSwitchTo(mctx);
 
    if (fctx->call_cntr < fctx->max_calls)
    {
+       ItemId      id;
        IndexTuple  itup;
+       int         j;
+       int         off;
+       int         dlen;
+       char       *dump;
+       char       *ptr;
 
        id = PageGetItemId(uargs->page, uargs->offset);
 
 
        itup = (IndexTuple) PageGetItem(uargs->page, id);
 
+       j = 0;
+       values[j] = palloc(32);
+       snprintf(values[j++], 32, "%d", uargs->offset);
+       values[j] = palloc(32);
+       snprintf(values[j++], 32, "(%u,%u)",
+                BlockIdGetBlockNumber(&(itup->t_tid.ip_blkid)),
+                itup->t_tid.ip_posid);
+       values[j] = palloc(32);
+       snprintf(values[j++], 32, "%d", (int) IndexTupleSize(itup));
+       values[j] = palloc(32);
+       snprintf(values[j++], 32, "%c", IndexTupleHasNulls(itup) ? 't' : 'f');
+       values[j] = palloc(32);
+       snprintf(values[j++], 32, "%c", IndexTupleHasVarwidths(itup) ? 't' : 'f');
+
+       ptr = (char *) itup + IndexInfoFindDataOffset(itup->t_info);
+       dlen = IndexTupleSize(itup) - IndexInfoFindDataOffset(itup->t_info);
+       dump = palloc0(dlen * 3 + 1);
+       values[j] = dump;
+       for (off = 0; off < dlen; off++)
        {
-           int         j = 0;
-
-           BlockNumber blkno = BlockIdGetBlockNumber(&(itup->t_tid.ip_blkid));
-
-           values[j] = palloc(32);
-           snprintf(values[j++], 32, "%d", uargs->offset);
-           values[j] = palloc(32);
-           snprintf(values[j++], 32, "(%u,%u)", blkno, itup->t_tid.ip_posid);
-           values[j] = palloc(32);
-           snprintf(values[j++], 32, "%d", (int) IndexTupleSize(itup));
-           values[j] = palloc(32);
-           snprintf(values[j++], 32, "%c", IndexTupleHasNulls(itup) ? 't' : 'f');
-           values[j] = palloc(32);
-           snprintf(values[j++], 32, "%c", IndexTupleHasVarwidths(itup) ? 't' : 'f');
-
-           {
-               int         off;
-               char       *dump;
-               char       *ptr = (char *) itup + IndexInfoFindDataOffset(itup->t_info);
-
-               dump = palloc(IndexTupleSize(itup) * 3);
-               memset(dump, 0, IndexTupleSize(itup) * 3);
-
-               for (off = 0;
-                    off < IndexTupleSize(itup) - IndexInfoFindDataOffset(itup->t_info);
-                    off++)
-               {
-                   if (dump[0] == '\0')
-                       sprintf(dump, "%02x", *(ptr + off) & 0xff);
-                   else
-                   {
-                       char        buf[4];
-
-                       sprintf(buf, " %02x", *(ptr + off) & 0xff);
-                       strcat(dump, buf);
-                   }
-               }
-               values[j] = dump;
-           }
-
-           tuple = BuildTupleFromCStrings(TupleDescGetAttInMetadata(uargs->tupd), values);
-           result = TupleGetDatum(TupleDescGetSlot(uargs->tupd), tuple);
+           if (off > 0)
+               *dump++ = ' ';
+           sprintf(dump, "%02x", *(ptr + off) & 0xff);
+           dump += 2;
        }
 
+       tuple = BuildTupleFromCStrings(fctx->attinmeta, values);
+       result = HeapTupleGetDatum(tuple);
+
        uargs->offset = uargs->offset + 1;
 
        SRF_RETURN_NEXT(fctx, result);
    }
    else
    {
-       ReleaseBuffer(uargs->buffer);
-       relation_close(uargs->rel, AccessShareLock);
-
+       pfree(uargs->page);
+       pfree(uargs);
        SRF_RETURN_DONE(fctx);
    }
 }
 /* ------------------------------------------------
  * bt_metap()
  *
- * Get a btree meta-page information
+ * Get a btree's meta-page information
  *
  * Usage: SELECT * FROM bt_metap('t1_pkey')
  * ------------------------------------------------
 bt_metap(PG_FUNCTION_ARGS)
 {
    text       *relname = PG_GETARG_TEXT_P(0);
-   Buffer      buffer;
-
+   Datum       result;
    Relation    rel;
    RangeVar   *relrv;
-   Datum       result;
+   BTMetaPageData *metad;
+   TupleDesc   tupleDesc;
+   int         j;
+   char       *values[6];
+   Buffer      buffer;
+   Page        page;
+   HeapTuple   tuple;
+
+   if (!superuser())
+       ereport(ERROR,
+               (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                (errmsg("must be superuser to use pageinspect functions"))));
 
    relrv = makeRangeVarFromNameList(textToQualifiedNameList(relname));
    rel = relation_openrv(relrv, AccessShareLock);
 
    if (!IS_INDEX(rel) || !IS_BTREE(rel))
-       elog(ERROR, "bt_metap() can only be used on b-tree index");
+       elog(ERROR, "relation \"%s\" is not a btree index",
+            RelationGetRelationName(rel));
 
    buffer = ReadBuffer(rel, 0);
-
-   {
-       BTMetaPageData *metad;
-
-       TupleDesc   tupleDesc;
-       int         j;
-       char       *values[BTMETAP_NCOLUMNS];
-       HeapTuple   tuple;
-
-       Page        page = BufferGetPage(buffer);
-
-       metad = BTPageGetMeta(page);
-
-       tupleDesc = RelationNameGetTupleDesc(BTMETAP_TYPE);
-
-       j = 0;
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", metad->btm_magic);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", metad->btm_version);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", metad->btm_root);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", metad->btm_level);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", metad->btm_fastroot);
-       values[j] = palloc(32);
-       snprintf(values[j++], 32, "%d", metad->btm_fastlevel);
-
-       tuple = BuildTupleFromCStrings(TupleDescGetAttInMetadata(tupleDesc),
-                                      values);
-
-       result = TupleGetDatum(TupleDescGetSlot(tupleDesc), tuple);
-   }
+   page = BufferGetPage(buffer);
+   metad = BTPageGetMeta(page);
+
+   /* Build a tuple descriptor for our result type */
+   if (get_call_result_type(fcinfo, NULL, &tupleDesc) != TYPEFUNC_COMPOSITE)
+       elog(ERROR, "return type must be a row type");
+
+   j = 0;
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", metad->btm_magic);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", metad->btm_version);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", metad->btm_root);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", metad->btm_level);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", metad->btm_fastroot);
+   values[j] = palloc(32);
+   snprintf(values[j++], 32, "%d", metad->btm_fastlevel);
+
+   tuple = BuildTupleFromCStrings(TupleDescGetAttInMetadata(tupleDesc),
+                                  values);
+
+   result = HeapTupleGetDatum(tuple);
 
    ReleaseBuffer(buffer);