/**********************************************************************
  * plperl.c - perl as a procedural language for PostgreSQL
  *
- *   $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.121 2006/10/19 18:32:47 tgl Exp $
+ *   $PostgreSQL: pgsql/src/pl/plperl/plperl.c,v 1.122 2006/11/13 17:13:57 adunstan Exp $
  *
  **********************************************************************/
 
 #include "utils/lsyscache.h"
 #include "utils/memutils.h"
 #include "utils/typcache.h"
+#include "utils/hsearch.h"
 
 /* perl stuff */
 #include "plperl.h"
    SV         *reference;
 } plperl_proc_desc;
 
+/* hash table entry for proc desc  */
+
+typedef struct plperl_proc_entry
+{
+   char proc_name[NAMEDATALEN];
+   plperl_proc_desc *proc_data;
+} plperl_proc_entry;
+
 /*
  * The information we cache for the duration of a single call to a
  * function.
    Oid        *argtypioparams;
 } plperl_query_desc;
 
+/* hash table entry for query desc  */
+
+typedef struct plperl_query_entry
+{
+   char query_name[NAMEDATALEN];
+   plperl_query_desc *query_data;
+} plperl_query_entry;
+
 /**********************************************************************
  * Global data
  **********************************************************************/
+
+typedef enum
+{
+   INTERP_NONE,
+   INTERP_HELD,
+   INTERP_TRUSTED,
+   INTERP_UNTRUSTED,
+   INTERP_BOTH
+} InterpState;
+
+static InterpState interp_state = INTERP_NONE;
+static bool can_run_two = false;
+
 static bool plperl_safe_init_done = false;
-static PerlInterpreter *plperl_interp = NULL;
-static HV  *plperl_proc_hash = NULL;
-static HV  *plperl_query_hash = NULL;
+static PerlInterpreter *plperl_trusted_interp = NULL;
+static PerlInterpreter *plperl_untrusted_interp = NULL;
+static PerlInterpreter *plperl_held_interp = NULL;
+static bool can_run_two;
+static bool trusted_context;
+static HTAB  *plperl_proc_hash = NULL;
+static HTAB  *plperl_query_hash = NULL;
 
 static bool plperl_use_strict = false;
 
 {
    /* Be sure we do initialization only once (should be redundant now) */
    static bool inited = false;
+    HASHCTL     hash_ctl;
 
    if (inited)
        return;
 
    EmitWarningsOnPlaceholders("plperl");
 
+   MemSet(&hash_ctl, 0, sizeof(hash_ctl));
+
+   hash_ctl.keysize = NAMEDATALEN;
+   hash_ctl.entrysize = sizeof(plperl_proc_entry);
+
+   plperl_proc_hash = hash_create("PLPerl Procedures",
+                                  32,
+                                  &hash_ctl,
+                                  HASH_ELEM);
+
+   hash_ctl.entrysize = sizeof(plperl_query_entry);
+   plperl_query_hash = hash_create("PLPerl Queries",
+                                   32,
+                                   &hash_ctl,
+                                   HASH_ELEM);
+
    plperl_init_interp();
 
    inited = true;
    "      elog(ERROR,'trusted Perl functions disabled - " \
    "      please upgrade Perl Safe module to version 2.09 or later');}]); }"
 
+#define TEST_FOR_MULTI \
+   "use Config; " \
+   "$Config{usemultiplicity} eq 'define' or "  \
+    "($Config{usethreads} eq 'define' " \
+   " and $Config{useithreads} eq 'define')"
+
+
+/********************************************************************
+ *
+ * We start out by creating a "held" interpreter that we can use in
+ * trusted or untrusted mode (but not both) as the need arises. Later, we
+ * assign that interpreter if it is available to either the trusted or 
+ * untrusted interpreter. If it has already been assigned, and we need to
+ * create the other interpreter, we do that if we can, or error out.
+ * We detect if it is safe to run two interpreters during the setup of the
+ * dummy interpreter.
+ */
+
+
+static void 
+check_interp(bool trusted)
+{
+   if (interp_state == INTERP_HELD)
+   {
+       if (trusted)
+       {
+           plperl_trusted_interp = plperl_held_interp;
+           interp_state = INTERP_TRUSTED;
+       }
+       else
+       {
+           plperl_untrusted_interp = plperl_held_interp;
+           interp_state = INTERP_UNTRUSTED;
+       }
+       plperl_held_interp = NULL;
+       trusted_context = trusted;
+   }
+   else if (interp_state == INTERP_BOTH || 
+            (trusted && interp_state == INTERP_TRUSTED) ||
+            (!trusted && interp_state == INTERP_UNTRUSTED))
+   {
+       if (trusted_context != trusted)
+       {
+           if (trusted)
+               PERL_SET_CONTEXT(plperl_trusted_interp);
+           else
+               PERL_SET_CONTEXT(plperl_untrusted_interp);
+           trusted_context = trusted;
+       }
+   }
+   else if (can_run_two)
+   {
+       PERL_SET_CONTEXT(plperl_held_interp);
+       plperl_init_interp();
+       if (trusted)
+           plperl_trusted_interp = plperl_held_interp;
+       else
+           plperl_untrusted_interp = plperl_held_interp;
+       interp_state = INTERP_BOTH;
+       plperl_held_interp = NULL;
+       trusted_context = trusted;
+   }
+   else
+   {
+       elog(ERROR, 
+            "can not allocate second Perl interpreter on this platform");
+
+   }
+   
+}
+
+
+static void
+restore_context (bool old_context)
+{
+   if (trusted_context != old_context)
+   {
+       if (old_context)
+           PERL_SET_CONTEXT(plperl_trusted_interp);
+       else
+           PERL_SET_CONTEXT(plperl_untrusted_interp);
+       trusted_context = old_context;
+   }
+}
 
 static void
 plperl_init_interp(void)
    save_time = loc ? pstrdup(loc) : NULL;
 #endif
 
-   plperl_interp = perl_alloc();
-   if (!plperl_interp)
+
+   plperl_held_interp = perl_alloc();
+   if (!plperl_held_interp)
        elog(ERROR, "could not allocate Perl interpreter");
 
-   perl_construct(plperl_interp);
-   perl_parse(plperl_interp, plperl_init_shared_libs, 3, embedding, NULL);
-   perl_run(plperl_interp);
+   perl_construct(plperl_held_interp);
+   perl_parse(plperl_held_interp, plperl_init_shared_libs, 
+              3, embedding, NULL);
+   perl_run(plperl_held_interp);
 
-   plperl_proc_hash = newHV();
-   plperl_query_hash = newHV();
+   if (interp_state == INTERP_NONE)
+   {
+       SV *res;
+
+       res = eval_pv(TEST_FOR_MULTI,TRUE);
+       can_run_two = SvIV(res); 
+       interp_state = INTERP_HELD;
+   }
 
 #ifdef WIN32
 
    Datum       retval;
    ReturnSetInfo *rsi;
    SV         *array_ret = NULL;
+   bool       oldcontext = trusted_context;
 
    /*
     * Create the call_data beforing connecting to SPI, so that it is not
                            "cannot accept a set")));
    }
 
+   check_interp(prodesc->lanpltrusted);
+
    perlret = plperl_call_perl_func(prodesc, fcinfo);
 
    /************************************************************
        SvREFCNT_dec(perlret);
 
    current_call_data = NULL;
+   restore_context(oldcontext);
+
    return retval;
 }
 
    Datum       retval;
    SV         *svTD;
    HV         *hvTD;
+   bool       oldcontext = trusted_context;
 
    /*
     * Create the call_data beforing connecting to SPI, so that it is not
    prodesc = compile_plperl_function(fcinfo->flinfo->fn_oid, true);
    current_call_data->prodesc = prodesc;
 
+   check_interp(prodesc->lanpltrusted);
+
    svTD = plperl_trigger_build_args(fcinfo);
    perlret = plperl_call_perl_trigger_func(prodesc, fcinfo, svTD);
    hvTD = (HV *) SvRV(svTD);
        SvREFCNT_dec(perlret);
 
    current_call_data = NULL;
+   restore_context(oldcontext);
    return retval;
 }
 
    char        internal_proname[64];
    plperl_proc_desc *prodesc = NULL;
    int         i;
-   SV        **svp;
+   plperl_proc_entry *hash_entry;
+   bool found;
+   bool oldcontext = trusted_context;
 
    /* We'll need the pg_proc tuple in any case... */
    procTup = SearchSysCache(PROCOID,
    /************************************************************
     * Lookup the internal proc name in the hashtable
     ************************************************************/
-   svp = hv_fetch_string(plperl_proc_hash, internal_proname);
-   if (svp)
+   hash_entry = hash_search(plperl_proc_hash, internal_proname, 
+                            HASH_FIND, NULL);
+
+   if (hash_entry)
    {
        bool        uptodate;
 
-       prodesc = INT2PTR(plperl_proc_desc *, SvUV(*svp));
+       prodesc = hash_entry->proc_data;
 
        /************************************************************
         * If it's present, must check whether it's still up to date.
 
        if (!uptodate)
        {
-           /* need we delete old entry? */
+           free(prodesc); /* are we leaking memory here? */
            prodesc = NULL;
+           hash_search(plperl_proc_hash, internal_proname,
+                       HASH_REMOVE,NULL);
        }
    }
 
        /************************************************************
         * Create the procedure in the interpreter
         ************************************************************/
+
+       check_interp(prodesc->lanpltrusted);
+
        prodesc->reference = plperl_create_sub(proc_source, prodesc->lanpltrusted);
+
+       restore_context(oldcontext);
+
        pfree(proc_source);
        if (!prodesc->reference)    /* can this happen? */
        {
                 internal_proname);
        }
 
-       hv_store_string(plperl_proc_hash, internal_proname,
-                       newSVuv(PTR2UV(prodesc)));
+       hash_entry = hash_search(plperl_proc_hash, internal_proname,
+                                HASH_ENTER, &found);
+       hash_entry->proc_data = prodesc;
    }
 
    ReleaseSysCache(procTup);
 plperl_spi_prepare(char *query, int argc, SV **argv)
 {
    plperl_query_desc *qdesc;
+   plperl_query_entry *hash_entry;
+   bool        found;
    void       *plan;
    int         i;
 
     * Insert a hashtable entry for the plan and return
     * the key to the caller.
     ************************************************************/
-   hv_store_string(plperl_query_hash, qdesc->qname, newSVuv(PTR2UV(qdesc)));
+
+   hash_entry = hash_search(plperl_query_hash, qdesc->qname,
+                            HASH_ENTER,&found);
+   hash_entry->query_data = qdesc;
 
    return newSVstring(qdesc->qname);
 }
    char       *nulls;
    Datum      *argvalues;
    plperl_query_desc *qdesc;
+   plperl_query_entry *hash_entry;
 
    /*
     * Execute the query inside a sub-transaction, so we can cope with errors
        /************************************************************
         * Fetch the saved plan descriptor, see if it's o.k.
         ************************************************************/
-       sv = hv_fetch_string(plperl_query_hash, query);
-       if (sv == NULL)
+
+       hash_entry = hash_search(plperl_query_hash, query,
+                                        HASH_FIND,NULL);
+       if (hash_entry == NULL)
            elog(ERROR, "spi_exec_prepared: Invalid prepared query passed");
-       if (*sv == NULL || !SvOK(*sv))
-           elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value corrupted");
 
-       qdesc = INT2PTR(plperl_query_desc *, SvUV(*sv));
+       qdesc = hash_entry->query_data;
+
        if (qdesc == NULL)
            elog(ERROR, "spi_exec_prepared: panic - plperl_query_hash value vanished");
 
 SV *
 plperl_spi_query_prepared(char *query, int argc, SV **argv)
 {
-   SV        **sv;
    int         i;
    char       *nulls;
    Datum      *argvalues;
    plperl_query_desc *qdesc;
+   plperl_query_entry *hash_entry;
    SV         *cursor;
    Portal      portal = NULL;
 
        /************************************************************
         * Fetch the saved plan descriptor, see if it's o.k.
         ************************************************************/
-       sv = hv_fetch_string(plperl_query_hash, query);
-       if (sv == NULL)
-           elog(ERROR, "spi_query_prepared: Invalid prepared query passed");
-       if (*sv == NULL || !SvOK(*sv))
-           elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value corrupted");
+       hash_entry = hash_search(plperl_query_hash, query,
+                                        HASH_FIND,NULL);
+       if (hash_entry == NULL)
+           elog(ERROR, "spi_exec_prepared: Invalid prepared query passed");
+
+       qdesc = hash_entry->query_data;
 
-       qdesc = INT2PTR(plperl_query_desc *, SvUV(*sv));
        if (qdesc == NULL)
            elog(ERROR, "spi_query_prepared: panic - plperl_query_hash value vanished");
 
 void
 plperl_spi_freeplan(char *query)
 {
-   SV        **sv;
    void       *plan;
    plperl_query_desc *qdesc;
+   plperl_query_entry *hash_entry;
 
-   sv = hv_fetch_string(plperl_query_hash, query);
-   if (sv == NULL)
-       elog(ERROR, "spi_exec_freeplan: Invalid prepared query passed");
-   if (*sv == NULL || !SvOK(*sv))
-       elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value corrupted");
+   hash_entry = hash_search(plperl_query_hash, query,
+                                        HASH_FIND,NULL);
+   if (hash_entry == NULL)
+       elog(ERROR, "spi_exec_prepared: Invalid prepared query passed");
+
+   qdesc = hash_entry->query_data;
 
-   qdesc = INT2PTR(plperl_query_desc *, SvUV(*sv));
    if (qdesc == NULL)
        elog(ERROR, "spi_exec_freeplan: panic - plperl_query_hash value vanished");
 
     * free all memory before SPI_freeplan, so if it dies, nothing will be
     * left over
     */
-   hv_delete(plperl_query_hash, query, strlen(query), G_DISCARD);
+   hash_search(plperl_query_hash, query, 
+               HASH_REMOVE,NULL);
+
    plan = qdesc->plan;
    free(qdesc->argtypes);
    free(qdesc->arginfuncs);