*   ENHANCEMENTS, OR MODIFICATIONS.
  *
  * IDENTIFICATION
- *   $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.103 2006/02/28 23:38:13 neilc Exp $
+ *   $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.104 2006/03/05 16:40:51 adunstan Exp $
  *
  **********************************************************************/
 
 #include "utils/typcache.h"
 #include "miscadmin.h"
 #include "mb/pg_wchar.h"
+#include "parser/parse_type.h"
 
 /* define this before the perl headers get a chance to mangle DLLIMPORT */
 extern DLLIMPORT bool check_function_bodies;
    MemoryContext     tmp_cxt;
 } plperl_call_data;
 
+/**********************************************************************
+ * The information we cache about prepared and saved plans
+ **********************************************************************/
+typedef struct plperl_query_desc
+{
+   char        qname[sizeof(long) * 2 + 1];
+   void       *plan;
+   int         nargs;
+   Oid        *argtypes;
+   FmgrInfo   *arginfuncs;
+   Oid        *argtypioparams;
+} plperl_query_desc;
 
 /**********************************************************************
  * Global data
 static bool plperl_safe_init_done = false;
 static PerlInterpreter *plperl_interp = NULL;
 static HV  *plperl_proc_hash = NULL;
+static HV  *plperl_query_hash = NULL;
 
 static bool plperl_use_strict = false;
 
    "$PLContainer->permit_only(':default');" \
    "$PLContainer->permit(qw[:base_math !:base_io sort time]);" \
    "$PLContainer->share(qw[&elog &spi_exec_query &return_next " \
-   "&spi_query &spi_fetchrow " \
+   "&spi_query &spi_fetchrow &spi_cursor_close " \
+   "&spi_prepare &spi_exec_prepared &spi_query_prepared &spi_freeplan " \
    "&_plperl_to_pg_array " \
    "&DEBUG &LOG &INFO &NOTICE &WARNING &ERROR %_SHARED ]);" \
    "sub ::mksafefunc {" \
    perl_run(plperl_interp);
 
    plperl_proc_hash = newHV();
+   plperl_query_hash = newHV();
 
 #ifdef WIN32
 
    {
        bool        uptodate;
 
-       prodesc = (plperl_proc_desc *) SvIV(*svp);
+       prodesc = INT2PTR( plperl_proc_desc *, SvUV(*svp));
 
        /************************************************************
         * If it's present, must check whether it's still up to date.
        }
 
        hv_store(plperl_proc_hash, internal_proname, proname_len,
-                newSViv((IV) prodesc), 0);
+                newSVuv( PTR2UV( prodesc)), 0);
    }
 
    ReleaseSysCache(procTup);
    PG_TRY();
    {
        void       *plan;
-       Portal      portal = NULL;
+       Portal      portal;
 
        /* Create a cursor for the query */
        plan = SPI_prepare(query, 0, NULL);
-       if (plan)
-           portal = SPI_cursor_open(NULL, plan, NULL, NULL, false);
-       if (portal)
-           cursor = newSVpv(portal->name, 0);
-       else
-           cursor = newSV(0);
+       if ( plan == NULL)
+           elog(ERROR, "SPI_prepare() failed:%s",
+               SPI_result_code_string(SPI_result));
+
+       portal = SPI_cursor_open(NULL, plan, NULL, NULL, false);
+       SPI_freeplan( plan);
+       if ( portal == NULL) 
+           elog(ERROR, "SPI_cursor_open() failed:%s",
+               SPI_result_code_string(SPI_result));
+       cursor = newSVpv(portal->name, 0);
 
        /* Commit the inner transaction, return to outer xact context */
        ReleaseCurrentSubTransaction();
        Portal      p = SPI_cursor_find(cursor);
 
        if (!p)
-           row = newSV(0);
+       {
+           row = &PL_sv_undef;
+       }
        else
        {
            SPI_cursor_fetch(p, true, 1);
            if (SPI_processed == 0)
            {
                SPI_cursor_close(p);
-               row = newSV(0);
+               row = &PL_sv_undef;
            }
            else
            {
 
    return row;
 }
+
+void
+plperl_spi_cursor_close(char *cursor)
+{
+   Portal p = SPI_cursor_find(cursor);
+   if (p)
+       SPI_cursor_close(p);
+}
+
+SV *
+plperl_spi_prepare(char* query, int argc, SV ** argv)
+{
+   plperl_query_desc *qdesc;
+   void       *plan;
+   int         i;
+   HeapTuple   typeTup;
+
+   MemoryContext oldcontext = CurrentMemoryContext;
+   ResourceOwner oldowner = CurrentResourceOwner;
+
+   BeginInternalSubTransaction(NULL);
+   MemoryContextSwitchTo(oldcontext);
+
+   /************************************************************
+    * Allocate the new querydesc structure
+    ************************************************************/
+   qdesc = (plperl_query_desc *) malloc(sizeof(plperl_query_desc));
+   MemSet(qdesc, 0, sizeof(plperl_query_desc));
+   snprintf(qdesc-> qname, sizeof(qdesc-> qname), "%lx", (long) qdesc);
+   qdesc-> nargs = argc;
+   qdesc-> argtypes = (Oid *) malloc(argc * sizeof(Oid));
+   qdesc-> arginfuncs = (FmgrInfo *) malloc(argc * sizeof(FmgrInfo));
+   qdesc-> argtypioparams = (Oid *) malloc(argc * sizeof(Oid));
+
+   PG_TRY();
+   {
+       /************************************************************
+        * Lookup the argument types by name in the system cache
+        * and remember the required information for input conversion
+        ************************************************************/
+       for (i = 0; i < argc; i++)
+       {
+           char       *argcopy;
+           List       *names = NIL;
+           ListCell   *l;
+           TypeName   *typename;
+
+           /************************************************************
+            * Use SplitIdentifierString() on a copy of the type name,
+            * turn the resulting pointer list into a TypeName node
+            * and call typenameType() to get the pg_type tuple.
+            ************************************************************/
+           argcopy = pstrdup(SvPV(argv[i],PL_na));
+           SplitIdentifierString(argcopy, '.', &names);
+           typename = makeNode(TypeName);
+           foreach(l, names)
+               typename->names = lappend(typename->names, makeString(lfirst(l)));
+
+           typeTup = typenameType(typename);
+           qdesc->argtypes[i] = HeapTupleGetOid(typeTup);
+           perm_fmgr_info(((Form_pg_type) GETSTRUCT(typeTup))->typinput,
+                          &(qdesc->arginfuncs[i]));
+           qdesc->argtypioparams[i] = getTypeIOParam(typeTup);
+           ReleaseSysCache(typeTup);
+
+           list_free(typename->names);
+           pfree(typename);
+           list_free(names);
+           pfree(argcopy);
+       }
+
+       /************************************************************
+        * Prepare the plan and check for errors
+        ************************************************************/
+       plan = SPI_prepare(query, argc, qdesc->argtypes);
+
+       if (plan == NULL)
+           elog(ERROR, "SPI_prepare() failed:%s",
+               SPI_result_code_string(SPI_result));
+
+       /************************************************************
+        * Save the plan into permanent memory (right now it's in the
+        * SPI procCxt, which will go away at function end).
+        ************************************************************/
+       qdesc->plan = SPI_saveplan(plan);
+       if (qdesc->plan == NULL)
+           elog(ERROR, "SPI_saveplan() failed: %s", 
+               SPI_result_code_string(SPI_result));
+
+       /* Release the procCxt copy to avoid within-function memory leak */
+       SPI_freeplan(plan);
+
+       /* Commit the inner transaction, return to outer xact context */
+       ReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+       /*
+        * AtEOSubXact_SPI() should not have popped any SPI context,
+        * but just in case it did, make sure we remain connected.
+        */
+       SPI_restore_connection();
+   }
+   PG_CATCH();
+   {
+       ErrorData  *edata;
+       
+       free(qdesc-> argtypes);
+       free(qdesc-> arginfuncs);
+       free(qdesc-> argtypioparams);
+       free(qdesc);
+
+       /* Save error info */
+       MemoryContextSwitchTo(oldcontext);
+       edata = CopyErrorData();
+       FlushErrorState();
+
+       /* Abort the inner transaction */
+       RollbackAndReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+
+       /*
+        * If AtEOSubXact_SPI() popped any SPI context of the subxact,
+        * it will have left us in a disconnected state.  We need this
+        * hack to return to connected state.
+        */
+       SPI_restore_connection();
+
+       /* Punt the error to Perl */
+       croak("%s", edata->message);
+
+       /* Can't get here, but keep compiler quiet */
+       return NULL;
+   }
+   PG_END_TRY();
+
+   /************************************************************
+    * Insert a hashtable entry for the plan and return
+    * the key to the caller.
+    ************************************************************/
+   hv_store( plperl_query_hash, qdesc->qname, strlen(qdesc->qname), newSVuv( PTR2UV( qdesc)), 0);
+
+   return newSVpv( qdesc->qname, strlen(qdesc->qname));
+}  
+
+HV *
+plperl_spi_exec_prepared(char* query, HV * attr, int argc, SV ** argv)
+{
+   HV         *ret_hv;
+   SV **sv;
+   int i, limit, spi_rv;
+   char * nulls;
+   Datum      *argvalues;
+   plperl_query_desc *qdesc;
+
+   /*
+    * Execute the query inside a sub-transaction, so we can cope with
+    * errors sanely
+    */
+   MemoryContext oldcontext = CurrentMemoryContext;
+   ResourceOwner oldowner = CurrentResourceOwner;
+
+   BeginInternalSubTransaction(NULL);
+   /* Want to run inside function's memory context */
+   MemoryContextSwitchTo(oldcontext);
+
+   PG_TRY();
+   {
+       /************************************************************
+        * Fetch the saved plan descriptor, see if it's o.k.
+        ************************************************************/
+       sv = hv_fetch(plperl_query_hash, query, strlen(query), 0);
+       if ( sv == NULL) 
+           elog(ERROR, "spi_exec_prepared: Invalid prepared query passed");
+       if ( *sv == NULL || !SvOK( *sv))
+           elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value corrupted");
+
+       qdesc = INT2PTR( plperl_query_desc *, SvUV(*sv));
+       if ( qdesc == NULL)
+           elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value vanished");
+
+       if ( qdesc-> nargs != argc) 
+           elog(ERROR, "spi_exec_prepared: expected %d argument(s), %d passed", 
+               qdesc-> nargs, argc);
+       
+       /************************************************************
+        * Parse eventual attributes
+        ************************************************************/
+       limit = 0;
+       if ( attr != NULL) 
+       {
+           sv = hv_fetch( attr, "limit", 5, 0);
+           if ( *sv && SvIOK( *sv))
+               limit = SvIV( *sv);
+       }
+       /************************************************************
+        * Set up arguments
+        ************************************************************/
+       if ( argc > 0) 
+       {
+           nulls = (char *)palloc( argc);
+           argvalues = (Datum *) palloc(argc * sizeof(Datum));
+           if ( nulls == NULL || argvalues == NULL) 
+               elog(ERROR, "spi_exec_prepared: not enough memory");
+       } 
+       else 
+       {
+           nulls = NULL;
+           argvalues = NULL;
+       }
+
+       for ( i = 0; i < argc; i++) 
+       {
+           if ( SvTYPE( argv[i]) != SVt_NULL) 
+           {
+               argvalues[i] =
+                   FunctionCall3( &qdesc->arginfuncs[i],
+                         CStringGetDatum( SvPV( argv[i], PL_na)),
+                         ObjectIdGetDatum( qdesc->argtypioparams[i]),
+                         Int32GetDatum(-1)
+                   );
+               nulls[i] = ' ';
+           } 
+           else 
+           {
+               argvalues[i] = (Datum) 0;
+               nulls[i] = 'n';
+           }
+       }
+
+       /************************************************************
+        * go
+        ************************************************************/
+       spi_rv = SPI_execute_plan(qdesc-> plan, argvalues, nulls, 
+                            current_call_data->prodesc->fn_readonly, limit);
+       ret_hv = plperl_spi_execute_fetch_result(SPI_tuptable, SPI_processed,
+                                                spi_rv);
+       if ( argc > 0) 
+       {
+           pfree( argvalues);
+           pfree( nulls);
+       }
+
+       /* Commit the inner transaction, return to outer xact context */
+       ReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+       /*
+        * AtEOSubXact_SPI() should not have popped any SPI context,
+        * but just in case it did, make sure we remain connected.
+        */
+       SPI_restore_connection();
+   }
+   PG_CATCH();
+   {
+       ErrorData  *edata;
+
+       /* Save error info */
+       MemoryContextSwitchTo(oldcontext);
+       edata = CopyErrorData();
+       FlushErrorState();
+
+       /* Abort the inner transaction */
+       RollbackAndReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+
+       /*
+        * If AtEOSubXact_SPI() popped any SPI context of the subxact,
+        * it will have left us in a disconnected state.  We need this
+        * hack to return to connected state.
+        */
+       SPI_restore_connection();
+
+       /* Punt the error to Perl */
+       croak("%s", edata->message);
+
+       /* Can't get here, but keep compiler quiet */
+       return NULL;
+   }
+   PG_END_TRY();
+
+   return ret_hv;
+}
+
+SV *
+plperl_spi_query_prepared(char* query, int argc, SV ** argv)
+{
+   SV **sv;
+   int i;
+   char * nulls;
+   Datum      *argvalues;
+   plperl_query_desc *qdesc;
+   SV *cursor;
+   Portal portal = NULL;
+
+   /*
+    * Execute the query inside a sub-transaction, so we can cope with
+    * errors sanely
+    */
+   MemoryContext oldcontext = CurrentMemoryContext;
+   ResourceOwner oldowner = CurrentResourceOwner;
+
+   BeginInternalSubTransaction(NULL);
+   /* Want to run inside function's memory context */
+   MemoryContextSwitchTo(oldcontext);
+
+   PG_TRY();
+   {
+       /************************************************************
+        * Fetch the saved plan descriptor, see if it's o.k.
+        ************************************************************/
+       sv = hv_fetch(plperl_query_hash, query, strlen(query), 0);
+       if ( sv == NULL) 
+           elog(ERROR, "spi_query_prepared: Invalid prepared query passed");
+       if ( *sv == NULL || !SvOK( *sv))
+           elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value corrupted");
+
+       qdesc = INT2PTR( plperl_query_desc *, SvUV(*sv));
+       if ( qdesc == NULL)
+           elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value vanished");
+
+       if ( qdesc-> nargs != argc) 
+           elog(ERROR, "spi_query_prepared: expected %d argument(s), %d passed", 
+               qdesc-> nargs, argc);
+       
+       /************************************************************
+        * Set up arguments
+        ************************************************************/
+       if ( argc > 0) 
+       {
+           nulls = (char *)palloc( argc);
+           argvalues = (Datum *) palloc(argc * sizeof(Datum));
+           if ( nulls == NULL || argvalues == NULL) 
+               elog(ERROR, "spi_query_prepared: not enough memory");
+       } 
+       else 
+       {
+           nulls = NULL;
+           argvalues = NULL;
+       }
+
+       for ( i = 0; i < argc; i++) 
+       {
+           if ( SvTYPE( argv[i]) != SVt_NULL) 
+           {
+               argvalues[i] =
+                   FunctionCall3( &qdesc->arginfuncs[i],
+                         CStringGetDatum( SvPV( argv[i], PL_na)),
+                         ObjectIdGetDatum( qdesc->argtypioparams[i]),
+                         Int32GetDatum(-1)
+                   );
+               nulls[i] = ' ';
+           } 
+           else 
+           {
+               argvalues[i] = (Datum) 0;
+               nulls[i] = 'n';
+           }
+       }
+
+       /************************************************************
+        * go
+        ************************************************************/
+       portal = SPI_cursor_open(NULL, qdesc-> plan, argvalues, nulls, 
+                           current_call_data->prodesc->fn_readonly);
+       if ( argc > 0) 
+       {
+           pfree( argvalues);
+           pfree( nulls);
+       }
+       if ( portal == NULL) 
+           elog(ERROR, "SPI_cursor_open() failed:%s",
+               SPI_result_code_string(SPI_result));
+
+       cursor = newSVpv(portal->name, 0);
+
+       /* Commit the inner transaction, return to outer xact context */
+       ReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+       /*
+        * AtEOSubXact_SPI() should not have popped any SPI context,
+        * but just in case it did, make sure we remain connected.
+        */
+       SPI_restore_connection();
+   }
+   PG_CATCH();
+   {
+       ErrorData  *edata;
+
+       /* Save error info */
+       MemoryContextSwitchTo(oldcontext);
+       edata = CopyErrorData();
+       FlushErrorState();
+
+       /* Abort the inner transaction */
+       RollbackAndReleaseCurrentSubTransaction();
+       MemoryContextSwitchTo(oldcontext);
+       CurrentResourceOwner = oldowner;
+
+       /*
+        * If AtEOSubXact_SPI() popped any SPI context of the subxact,
+        * it will have left us in a disconnected state.  We need this
+        * hack to return to connected state.
+        */
+       SPI_restore_connection();
+
+       /* Punt the error to Perl */
+       croak("%s", edata->message);
+
+       /* Can't get here, but keep compiler quiet */
+       return NULL;
+   }
+   PG_END_TRY();
+
+   return cursor;
+}
+
+void
+plperl_spi_freeplan(char *query)
+{
+   SV ** sv;
+   void * plan;
+   plperl_query_desc *qdesc;
+
+   sv = hv_fetch(plperl_query_hash, query, strlen(query), 0);
+   if ( sv == NULL) 
+       elog(ERROR, "spi_exec_freeplan: Invalid prepared query passed");
+   if ( *sv == NULL || !SvOK( *sv))
+       elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value corrupted");
+
+   qdesc = INT2PTR( plperl_query_desc *, SvUV(*sv));
+   if ( qdesc == NULL)
+       elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value vanished");
+
+   /*
+   *   free all memory before SPI_freeplan, so if it dies, nothing will be left over
+   */
+   hv_delete(plperl_query_hash, query, strlen(query), G_DISCARD);
+   plan = qdesc-> plan;
+   free(qdesc-> argtypes);
+   free(qdesc-> arginfuncs);
+   free(qdesc-> argtypioparams);
+   free(qdesc);
+
+   SPI_freeplan( plan);
+}