+++ /dev/null
-/*-------------------------------------------------------------------------
- *
- * postgresql_fdw.c
- * foreign-data wrapper for PostgreSQL
- *
- * Portions Copyright (c) 1996-2010, PostgreSQL Global Development Group
- *
- * IDENTIFICATION
- * $PostgreSQL$
- *
- *-------------------------------------------------------------------------
- */
-#include "postgres.h"
-
-#include "catalog/pg_operator.h"
-#include "catalog/pg_proc.h"
-#include "funcapi.h"
-#include "libpq-fe.h"
-#include "mb/pg_wchar.h"
-#include "miscadmin.h"
-#include "nodes/makefuncs.h"
-#include "nodes/nodeFuncs.h"
-#include "nodes/relation.h"
-#include "optimizer/clauses.h"
-#include "optimizer/cost.h"
-#include "parser/parsetree.h"
-#include "parser/scansup.h"
-#include "utils/builtins.h"
-#include "utils/lsyscache.h"
-#include "utils/memutils.h"
-#include "utils/resowner.h"
-#include "utils/syscache.h"
-
-#include "postgresql_fdw.h"
-
-PG_MODULE_MAGIC;
-
-extern Datum postgresql_fdw_handler(PG_FUNCTION_ARGS);
-
-/*
- * FDW routines
- */
-static FSConnection* pgConnectServer(ForeignServer *server, UserMapping *user);
-static void pgFreeFSConnection(FSConnection *conn);
-static void pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel);
-static void pgOpen(ForeignScanState *scanstate);
-static void pgBeginScan(ForeignScanState *scanstate);
-static void pgIterate(ForeignScanState *scanstate);
-static void pgClose(ForeignScanState *scanstate);
-static void pgReOpen(ForeignScanState *scanstate);
-
-/* helper for deparsing a request into SQL statement */
-static bool is_foreign_qual(Expr *expr);
-static bool foreign_qual_walker(Node *node, void *context);
-static void deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc, const char *aliasname, bool prefix);
-static void deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix);
-static char *deparseSql(ForeignScanState *scanstate);
-
-/* helper for handling result tuples */
-static void storeResult(Tuplestorestate *tupstore, bool is_sql_cmd,
- TupleDesc tupdesc, PGresult *res);
-
-/*
- * Connection management
- */
-static PGconn *GetConnection(ForeignServer *server, UserMapping *user);
-static void ReleaseConnection(PGconn *conn);
-static void check_conn_params(const char **keywords, const char **values);
-static PGconn *connect_pg_server(ForeignServer *server, UserMapping *user);
-static void cleanup_connection(ResourceReleasePhase phase,
- bool isCommit,
- bool isTopLevel,
- void *arg);
-
-/*
- * PostgreSQL specific portion of a foreign query request
- */
-typedef struct pgFdwReply
-{
- char *sql; /* SQL text to be sent to foreign server */
- Tuplestorestate *tupstore; /* store all of result tuples */
-} pgFdwReply;
-
-/*
- * FdwRoutine of PostgreSQL wrapper
- */
-static FdwRoutine postgresql_fdw_routine =
-{
- pgConnectServer,
- pgFreeFSConnection,
- pgEstimateCosts,
- pgOpen,
- pgBeginScan,
- pgIterate,
- pgClose,
- pgReOpen,
-};
-
-/*
- * return foreign-data wrapper handler object to execute foreign-data wrapper
- * routines.
- */
-PG_FUNCTION_INFO_V1(postgresql_fdw_handler);
-Datum
-postgresql_fdw_handler(PG_FUNCTION_ARGS)
-{
- PG_RETURN_POINTER(&postgresql_fdw_routine);
-}
-
-/*
- * Connect to a foreign PostgreSQL server with libpq if necessary.
- */
-static FSConnection *
-pgConnectServer(ForeignServer *server, UserMapping *user)
-{
- elog(DEBUG3, "%s() called for \"%s\"", __FUNCTION__, server->servername);
-
- return (FSConnection *) GetConnection(server, user);
-}
-
-
-/*
- * Disconnect from the foreign server if the connection is not referenced by
- * any other scan.
- */
-static void
-pgFreeFSConnection(FSConnection *conn)
-{
- elog(DEBUG3, "%s() called", __FUNCTION__);
-
- if (conn == NULL)
- return;
-
- ReleaseConnection((PGconn *)conn);
-}
-
-/*
- * Check whether the ExprState node can be evaluated in foreign server.
- *
- * An expression which consists of expressions below can be evaluated in
- * the foreign server.
- * - constant value
- * - variable (foreign table column)
- * - external parameter (parameter of prepared statement)
- * - array
- * - bool expression (AND/OR/NOT)
- * - NULL test (IS [NOT] NULL)
- * - operator
- * - IMMUTABLE only
- * - It is required that the meaning of the operator be the same as the
- * local server in the foreign server.
- * - function
- * - IMMUTABLE only
- * - It is required that the meaning of the operator be the same as the
- * local server in the foreign server.
- * - scalar array operator (ANY/ALL)
- */
-static bool
-is_foreign_qual(Expr *expr)
-{
- return !foreign_qual_walker((Node *) expr, NULL);
-}
-
-/*
- * return true if node can NOT be evaluatated in foreign server.
- */
-static bool
-foreign_qual_walker(Node *node, void *context)
-{
- if (node == NULL)
- return false;
-
- switch (nodeTag(node))
- {
- case T_Param:
- /* TODO: pass internal parameters to the foreign server */
- {
- ParamKind paramkind = ((Param *) node)->paramkind;
- elog(DEBUG1, "%s() param=%s", __FUNCTION__,
- paramkind == PARAM_EXTERN ? "PARAM_EXTERN" :
- paramkind == PARAM_EXEC ? "PARAM_EXEC" :
- paramkind == PARAM_SUBLINK ? "PARAM_SUBLINK" : "unkown");
- }
- if (((Param *) node)->paramkind != PARAM_EXTERN)
- return true;
- break;
- case T_DistinctExpr:
- case T_OpExpr:
- case T_ScalarArrayOpExpr:
- case T_FuncExpr:
- /*
- * If the qual contains any mutable function, the whole expression
- * should be evaluated on local side.
- */
- if (contain_mutable_functions(node))
- return true;
- break;
- case T_TargetEntry:
- case T_PlaceHolderVar:
- case T_AppendRelInfo:
- case T_PlaceHolderInfo:
- /* TODO: research whether those complex nodes are evaluatable. */
- return true;
- default:
- break;
- }
-
- return expression_tree_walker(node, foreign_qual_walker, context);
-}
-
-/*
- * Deparse the passed TupleDesc into SELECT clauses and append to the buffer
- * 'sql'.
- */
-static void
-deparseSelectClause(StringInfo sql, ForeignTable *table, TupleDesc tupdesc,
- const char *aliasname, bool prefix)
-{
- bool first;
- int i;
- const char *aliasname_q;
-
- /* The alias of relation is used in both SELECT clause and FROM clause. */
- aliasname_q = quote_identifier(aliasname);
-
- /* deparse SELECT clause */
- appendStringInfoString(sql, "SELECT ");
-
- /*
- * TODO: omit (deparse to "NULL") columns which are not used in the
- * original SQL.
- *
- * We must parse nodes parents of this ForeignScan node to determine unused
- * columns because some columns may be used only in parent Sort/Agg/Limit
- * nodes.
- */
- first = true;
- for (i = 0; i < tupdesc->natts; i++)
- {
- List *options;
- ListCell *lc;
- char *colname = NULL;
-
- /* skip dropped attributes */
- if (tupdesc->attrs[i]->attisdropped)
- continue;
-
- /* Determine column name to be used */
- options = GetGenericOptionsPerColumn(table->relid, i + 1);
- foreach (lc, options)
- {
- DefElem *def = (DefElem *) lfirst(lc);
- if (strcmp(def->defname, "colname") == 0)
- {
- colname = strVal(def->arg);
- break;
- }
- }
- if (!colname)
- colname = tupdesc->attrs[i]->attname.data;
-
- if (!first)
- appendStringInfoString(sql, ", ");
-
- if (prefix)
- appendStringInfo(sql, "%s.%s", aliasname_q, colname);
- else
- appendStringInfo(sql, "%s", colname);
-
- first = false;
- }
-
- /* Add oid system attribute if any on local side. */
- if (tupdesc->tdhasoid)
- {
- if (!first)
- appendStringInfo(sql, ", oid");
- else
- appendStringInfo(sql, "oid");
- first = false;
- }
-
- /* if target list is composed only of system attributes, add dummy column */
- if (first)
- appendStringInfoString(sql, "NULL");
-
- if (aliasname_q != aliasname)
- pfree((char *) aliasname_q);
-}
-
-/*
- * Deparse the passed information into FROM clauses and append to the buffer
- * 'sql'.
- */
-static void
-deparseFromClause(StringInfo sql, ForeignTable *table, const char *aliasname, bool prefix)
-{
- char *nspname = NULL;
- char *relname = NULL;
- const char *nspname_q;
- const char *relname_q;
- const char *aliasname_q;
- ListCell *lc;
-
- /* The alias of relation is used in both SELECT clause and FROM clause. */
- aliasname_q = quote_identifier(aliasname);
-
- /*
- * If the foreign table has generic option "nspname" and/or "relname", use
- * them in the foreign query. Otherwise, use local catalog names.
- * Each identifier should be quoted because they might be case sensitive.
- */
- foreach(lc, table->options)
- {
- DefElem *opt = lfirst(lc);
- if (strcmp(opt->defname, "nspname") == 0)
- nspname = pstrdup(strVal(opt->arg));
- else if (strcmp(opt->defname, "relname") == 0)
- relname = pstrdup(strVal(opt->arg));
- }
- if (nspname == NULL)
- nspname = get_namespace_name(get_rel_namespace(table->relid));
- if (relname == NULL)
- relname = get_rel_name(table->relid);
- nspname_q = quote_identifier(nspname);
- relname_q = quote_identifier(relname);
- appendStringInfo(sql, " FROM %s.%s %s", nspname_q, relname_q, aliasname_q);
- pfree(nspname);
- pfree(relname);
- if (nspname_q != nspname)
- pfree((char *) nspname_q);
- if (relname_q != relname)
- pfree((char * ) relname_q);
- if (aliasname_q != aliasname)
- pfree((char *) aliasname_q);
-}
-
-/*
- * Deparse query request into SQL statement.
- *
- * If an expression in PlanState.qual list satisfies is_foreign_qual(), the
- * expression is:
- * - deparsed into WHERE clause of remote SQL statement to evaluate that
- * expression on remote side
- * - removed from PlanState.qual list to avoid duplicate evaluation, on
- * remote side and local side
- */
-static char *
-deparseSql(ForeignScanState *scanstate)
-{
- EState *estate = scanstate->ss.ps.state;
- bool prefix;
- List *context;
- StringInfoData sql;
- ForeignScan *scan;
- RangeTblEntry *rte;
- ForeignTable *table = scanstate->table;
-
- /* extract ForeignScan and RangeTblEntry */
- scan = (ForeignScan *)scanstate->ss.ps.plan;
- rte = list_nth(estate->es_range_table, scan->scan.scanrelid - 1);
-
- /* prepare to deparse plan */
- initStringInfo(&sql);
- context = deparse_context_for_planstate((Node *)&scanstate->ss.ps, NULL,
- estate->es_range_table);
-
- /*
- * XXX: Prefix is set to false always because setting prefix to true makes
- * the SQL invalid when this is a child scan for an inherited table and qual
- * is not empty (need to generate WHERE clause). It might be needed to fix
- * deparse_expression() to deparse column references in the qual into
- * name of the child table, instead of name of the parent table, or table
- * alias.
- */
- prefix = false;
-
- /* deparse SELECT and FROM clauses */
- deparseSelectClause(&sql, table, scanstate->ss.ss_currentRelation->rd_att,
- rte->eref->aliasname, prefix);
- deparseFromClause(&sql, table, rte->eref->aliasname, prefix);
-
- /*
- * deparse WHERE cluase
- *
- * The expressions which satisfy is_foreign_qual() are deparsed into WHERE
- * clause of result SQL string, and they could be removed from qual of
- * PlanState to avoid duplicate evaluation at ExecScan().
- *
- * We never change the qual in the Plan node which was made by PREPARE
- * statement to make following EXECUTE statements work properly. The Plan
- * node is used repeatedly to create PlanState for each EXECUTE statement.
- */
- if (scanstate->ss.ps.plan->qual)
- {
- List *local_qual = NIL;
- List *foreign_qual = NIL;
- List *foreign_expr = NIL;
- ListCell *lc;
-
- /*
- * Divide qual of PlanState into two lists, one for local evaluation
- * and one for foreign evaluation.
- */
- foreach (lc, scanstate->ss.ps.qual)
- {
- ExprState *state = lfirst(lc);
-
- if (is_foreign_qual(state->expr))
- { foreign_qual = lappend(foreign_qual, state);
- foreign_expr = lappend(foreign_expr, state->expr);
- }
- else
- local_qual = lappend(local_qual, state);
- }
- /*
- * XXX: If the remote side is not reliable enough, we can keep the qual
- * in PlanState as is and evaluate them on local side too. If so, just
- * omit replacement below.
- */
- scanstate->ss.ps.qual = local_qual;
-
- /*
- * Deparse quals to be evaluated in the foreign server if any.
- * TODO: modify deparse_expression() to deparse conditions which use
- * internal parameters.
- */
- if (foreign_expr != NIL)
- {
- Node *node;
- node = (Node *) make_ands_explicit(foreign_expr);
- appendStringInfo(&sql, " WHERE %s",
- deparse_expression(node, context, prefix, false));
- /*
- * The contents of the list MUST NOT be free-ed because they are
- * referenced from Plan.qual list.
- */
- list_free(foreign_expr);
- }
- }
-
- elog(DEBUG1, "deparsed SQL is \"%s\"", sql.data);
-
- return sql.data;
-}
-
-/*
- * Deparse the request into SQL statement and keep it for future execution.
- *
- * XXX: deparsing should be done in pgEstimateCosts to estimate the costs by
- * executing EXPLAIN on remote side?
- */
-static void
-pgOpen(ForeignScanState *scanstate)
-{
- pgFdwReply *reply;
-
- elog(DEBUG3, "%s() called ", __FUNCTION__);
-
- /* FWD-specific portion */
- reply = (pgFdwReply *) palloc0(sizeof(*reply));
- reply->sql = deparseSql(scanstate);
- scanstate->reply = (FdwReply *) reply;
-}
-
-/*
- * Initiate actual scan on a foreign table.
- * This function is called just after pgOpen() if the ForeignScan was executed
- * for a real query or EXPLAIN statement with ANALYZE option.
- */
-static void
-pgBeginScan(ForeignScanState *scanstate)
-{
- pgFdwReply *reply = (pgFdwReply *) scanstate->reply;
- PGconn *conn = (PGconn *) scanstate->conn;
- PGresult *res;
- ParamListInfo info = scanstate->ss.ps.state->es_param_list_info;
- int numParams = info ? info->numParams : 0;
- Oid *types = NULL;
- const char **values = NULL;
-
- elog(DEBUG3, "%s() called", __FUNCTION__);
-
- /* construct parameter array in text format */
- /* TODO: omit unused parameter */
- if (numParams > 0)
- {
- int i;
-
- types = palloc0(sizeof(Oid) * numParams);
- values = palloc0(sizeof(char *) * numParams);
- for (i = 0; i < numParams; i++)
- {
- types[i] = info->params[i].ptype;
- if (info->params[i].isnull)
- values[i] = NULL;
- else
- {
- Oid out_func_oid;
- bool isvarlena;
- FmgrInfo func;
-
- /* TODO: cache FmgrInfo to use it again after pgReOpen() */
- /* TODO: send parameters in binary format rather than text */
- getTypeOutputInfo(types[i], &out_func_oid, &isvarlena);
- fmgr_info(out_func_oid, &func);
- values[i] =
- OutputFunctionCall(&func, info->params[i].value);
- }
- }
- }
-
- /*
- * Execute query with the parameters.
- * TODO: support internal parameters(PARAM_EXTERN)
- * TODO: support cursor mode for huge result sets.
- */
- res = PQexecParams(conn, reply->sql,
- numParams, types, values, NULL, NULL, 0);
- if (numParams > 0)
- {
- int i;
- pfree(types);
- for (i = 0; i < numParams; i++)
- pfree((char *) values[i]);
- pfree(values);
- }
-
- /*
- * If the query has failed, reporting details is enough here.
- * Connections which are used by this query (including other scans) will
- * be cleaned up by the foreign connection manager.
- */
- if (!res || PQresultStatus(res) != PGRES_TUPLES_OK)
- {
- char *msg;
-
- PQclear(res);
- msg = pstrdup(PQerrorMessage(conn));
- ereport(ERROR, (
- errmsg("could not execute foreign query"),
- errdetail("%s", msg), errhint("%s", reply->sql)));
- }
-
- /* Note: use PG_TRY to ensure freeing PGresult. */
- PG_TRY();
- {
- TupleDesc tupdesc = ExecGetScanType((ScanState *) scanstate);
-
- /* create tuplestore to store results */
- reply->tupstore = tuplestore_begin_heap(true, false, work_mem);
-
- storeResult(reply->tupstore, false, tupdesc, res);
-
- PQclear(res);
- }
- PG_CATCH();
- {
- PQclear(res);
- PG_RE_THROW();
- }
- PG_END_TRY();
-}
-
-/*
- * return tuples one by one.
- * - execute SQL statement which was deparsed in pgOpen()
- *
- * The all of result are fetched at once when pgIterate() is called first after
- * pgOpen() or pgReOpen().
- * pgIterate() moves the next tuple from tupstore to TupleTableSlot in
- * ScanState.
- */
-static void
-pgIterate(ForeignScanState *scanstate)
-{
- pgFdwReply *reply = (pgFdwReply *) scanstate->reply;
- TupleTableSlot *slot = scanstate->ss.ss_ScanTupleSlot;
-
- elog(DEBUG3, "%s() called", __FUNCTION__);
-
- /* store the next tuple into the slot from the tuplestore */
- if (tuplestore_gettupleslot(reply->tupstore, true, false, slot))
- {
- /*
- * Because the tuples stored in the tupstore are minimal tuples,
- * they have to be materialized to retrieve system attributes.
- */
- ExecMaterializeSlot(slot);
- }
- else
- {
- /* TODO: if cursor mode, reset tuple slot and fetch the next batch. */
- }
-}
-
-/*
- * Finish scanning foreign table and dispose objects used for this scan.
- */
-static void
-pgClose(ForeignScanState *scanstate)
-{
- pgFdwReply *reply = (pgFdwReply *) scanstate->reply;
-
- elog(DEBUG3, "%s() called", __FUNCTION__);
-
- if (reply == NULL)
- return;
-
- if (reply->tupstore != NULL)
- tuplestore_end(reply->tupstore);
-
- /*
- * reply->conn is not freed here because foreign connections are
- * released by executor via FreeFSConnection.
- */
- pfree(reply);
- scanstate->reply = NULL;
-}
-
-/*
- * Execute query with new parameter.
- */
-static void
-pgReOpen(ForeignScanState *scanstate)
-{
- pgFdwReply *reply = (pgFdwReply *) scanstate->reply;
-
- elog(DEBUG3, "%s() called", __FUNCTION__);
-
- /* Rewind tuplestore to retrieve all tuples again */
- if (reply->tupstore)
- tuplestore_rescan(reply->tupstore);
-}
-
-/*
- * Store a PGresult into tuplestore.
- */
-static void
-storeResult(Tuplestorestate *tupstore,
- bool is_sql_cmd,
- TupleDesc tupdesc,
- PGresult *res)
-{
- int i;
- int row;
- int ntuples;
- int nfields;
- int attnum; /* number of non-dropped columns */
- char **values;
- AttInMetadata *attinmeta;
- Form_pg_attribute *attrs;
-
- ntuples = PQntuples(res);
- nfields = is_sql_cmd ? 1 : PQnfields(res);
- attrs = tupdesc->attrs;
-
- /* count non-dropped columns */
- for (attnum = 0, i = 0; i < tupdesc->natts; i++)
- if (!attrs[i]->attisdropped)
- attnum++;
- if (tupdesc->tdhasoid)
- attnum++;
-
- /* check result and tuple descriptor have the same number of columns */
- if (attnum > 0 && attnum != nfields)
- ereport(ERROR,
- (errcode(ERRCODE_DATATYPE_MISMATCH),
- errmsg("remote query result rowtype does not match "
- "the specified FROM clause rowtype")));
-
- /* buffer should include dropped columns */
- values = palloc(sizeof(char *) * tupdesc->natts);
-
- /* put all tuples into the tuplestore */
- attinmeta = TupleDescGetAttInMetadata(tupdesc);
- for (row = 0; row < ntuples; row++)
- {
- int j;
- HeapTuple tuple;
- Oid oid = InvalidOid; /* oid of the tuple */
-
- CHECK_FOR_INTERRUPTS();
-
- if (!is_sql_cmd)
- {
- for (i = 0, j = 0; i < tupdesc->natts; i++)
- {
- /* skip dropped columns. */
- if (attrs[i]->attisdropped)
- {
- values[i] = NULL;
- continue;
- }
-
- if (PQgetisnull(res, row, j))
- values[i] = NULL;
- else
- values[i] = PQgetvalue(res, row, j);
- j++;
- }
-
- /* Get oid if any. Now j points the oid field of PGresult. */
- if (tupdesc->tdhasoid)
- {
- if (!PQgetisnull(res, row, j))
- {
- oid = DatumGetObjectId(DirectFunctionCall1(oidin,
- CStringGetDatum(PQgetvalue(res, row, j))));
- }
- }
- }
- else
- {
- values[0] = PQcmdStatus(res);
- }
-
- /* build the tuple, set oid if any, and put it into the tuplestore. */
- tuple = BuildTupleFromCStrings(attinmeta, values);
- if (tupdesc->tdhasoid)
- HeapTupleSetOid(tuple, oid);
- tuplestore_puttuple(tupstore, tuple);
- }
-
- /* clean up and return the tuplestore */
- tuplestore_donestoring(tupstore);
- pfree(values);
-}
-
-/*
- * Retrieve cost-factors of the foreign server from catalog.
- */
-static void
-get_server_costs(Oid relid, double *connection_cost, double *transfer_cost)
-{
- ForeignTable *table;
- ForeignServer *server;
- int n;
- const char **keywords;
- const char **values;
- int i;
-
- /*
- * Retrieve generic options from the target table and its server to correct
- * costs.
- */
- table = GetForeignTable(relid);
- server = GetForeignServer(table->serverid);
- n = list_length(table->options) + list_length(server->options) + 1;
- keywords = (const char **) palloc(sizeof(char *) * n);
- values = (const char **) palloc(sizeof(char *) * n);
- n = 0;
- n += flatten_generic_options(server->options, keywords + n, values + n);
- n += flatten_generic_options(table->options, keywords + n, values + n);
- keywords[n] = values[n] = NULL;
-
- for (i = 0; keywords[i]; i++)
- {
- if (pg_strcasecmp(keywords[i], "connection_cost") == 0)
- *connection_cost = strtod(values[i], NULL);
- else if (pg_strcasecmp(keywords[i], "transfer_cost") == 0)
- *transfer_cost = strtod(values[i], NULL);
- }
-
- pfree(keywords);
- pfree(values);
-}
-
-
-/*
- * Estimate costs of scanning on a foreign table.
- *
- * baserel->baserestrictinfo can be used to examine quals on the relation.
- */
-static void
-pgEstimateCosts(ForeignPath *path, PlannerInfo *root, RelOptInfo *baserel)
-{
- RangeTblEntry *rte;
- double connection_cost = 0.0;
- double transfer_cost = 0.0;
-
- elog(DEBUG3, "%s() called", __FUNCTION__);
-
- /*
- * Use cost_seqscan() to get approximate value.
- */
- cost_seqscan(&path->path, root, baserel);
-
- /* Get cost factor from catalog to correct costs. */
- rte = planner_rt_fetch(baserel->relid, root);
- get_server_costs(rte->relid, &connection_cost, &transfer_cost);
- path->path.startup_cost += connection_cost;
- path->path.total_cost += connection_cost;
- path->path.total_cost += transfer_cost *
- path->path.parent->width * path->path.parent->rows;
-}
-
-/* ============================================================================
- * Connection management functions
- * ==========================================================================*/
-
-/*
- * Connection cache entry managed with hash table.
- */
-typedef struct ConnCacheEntry
-{
- /* hash key must be first */
- char name[NAMEDATALEN]; /* connection name; used as hash key */
- int refs; /* reference counter */
- PGconn *conn; /* foreign server connection */
-} ConnCacheEntry;
-
-/*
- * Hash table which is used to cache connection to PostgreSQL servers, will be
- * initialized before first attempt to connect PostgreSQL server by the backend.
- */
-static HTAB *FSConnectionHash;
-
-/*
- * Get a PGconn which can be used to execute foreign query on the remote
- * PostgreSQL server with the user's authorization. If this was the first
- * request for the server, new connection is established.
- */
-static PGconn *
-GetConnection(ForeignServer *server, UserMapping *user)
-{
- const char *conname = server->servername;
- bool found;
- ConnCacheEntry *entry;
- PGconn *conn = NULL;
-
- /* initialize connection cache if it isn't */
- if (FSConnectionHash == NULL)
- {
- HASHCTL ctl;
-
- /* hash key is the name of the connection */
- MemSet(&ctl, 0, sizeof(ctl));
- ctl.keysize = NAMEDATALEN;
- ctl.entrysize = sizeof(ConnCacheEntry);
- /* allocate FSConnectionHash in the cache context */
- ctl.hcxt = CacheMemoryContext;
- FSConnectionHash = hash_create("Foreign Connections", 32,
- &ctl,
- HASH_ELEM | HASH_CONTEXT);
- }
-
- /* Is there any cached and valid connection with such name? */
- entry = hash_search(FSConnectionHash, conname, HASH_ENTER, &found);
- if (found)
- {
- if (entry->conn != NULL)
- {
- entry->refs++;
- elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
- return entry->conn;
- }
-
- /*
- * Connection cache entry was found but connection in it is invalid.
- * We reuse entry to store newly established connection later.
- */
- }
- else
- {
- /*
- * Use ResourceOner to clean the connection up on error including
- * user interrupt.
- */
- entry->refs = 0;
- entry->conn = NULL;
- RegisterResourceReleaseCallback(cleanup_connection, entry);
- }
-
- /*
- * Here we have to establish new connection.
- * Use PG_TRY block to ensure closing connection on error.
- */
- PG_TRY();
- {
- /* Connect to the foreign PostgreSQL server */
- conn = connect_pg_server(server, user);
-
- /*
- * Initialize the cache entry to keep new connection.
- * Note: entry->name has been initialized in hash_search(HASH_ENTER).
- */
- entry->refs = 1;
- entry->conn = conn;
- elog(DEBUG3, "connected to %s (%d)", entry->name, entry->refs);
- }
- PG_CATCH();
- {
- PQfinish(conn);
- entry->refs = 0;
- entry->conn = NULL;
- PG_RE_THROW();
- }
- PG_END_TRY();
-
- return conn;
-}
-
-/*
- * For non-superusers, insist that the connstr specify a password. This
- * prevents a password from being picked up from .pgpass, a service file,
- * the environment, etc. We don't want the postgres user's passwords
- * to be accessible to non-superusers.
- */
-static void
-check_conn_params(const char **keywords, const char **values)
-{
- int i;
-
- /* no check required if superuser */
- if (superuser())
- return;
-
- /* ok if params contain a non-empty password */
- for (i = 0; keywords[i] != NULL; i++)
- {
- if (strcmp(keywords[i], "password") == 0 && values[i][0] != '\0')
- return;
- }
-
- ereport(ERROR,
- (errcode(ERRCODE_S_R_E_PROHIBITED_SQL_STATEMENT_ATTEMPTED),
- errmsg("password is required"),
- errdetail("Non-superusers must provide a password in the connection string.")));
-}
-
-static PGconn *
-connect_pg_server(ForeignServer *server, UserMapping *user)
-{
- const char *conname = server->servername;
- PGconn *conn;
- const char **all_keywords;
- const char **all_values;
- const char **keywords;
- const char **values;
- int n;
- int i, j;
-
- /*
- * Construct connection params from generic options of ForeignServer and
- * UserMapping. Generic options might not be a one of connection options.
- */
- n = list_length(server->options) + list_length(user->options) + 1;
- all_keywords = (const char **) palloc(sizeof(char *) * n);
- all_values = (const char **) palloc(sizeof(char *) * n);
- keywords = (const char **) palloc(sizeof(char *) * n);
- values = (const char **) palloc(sizeof(char *) * n);
- n = 0;
- n += flatten_generic_options(server->options,
- all_keywords + n, all_values + n);
- n += flatten_generic_options(user->options,
- all_keywords + n, all_values + n);
- all_keywords[n] = all_values[n] = NULL;
-
- for (i = 0, j = 0; all_keywords[i]; i++)
- {
- /* Use only libpq connection options. */
- if (!is_libpq_connection_option(all_keywords[i]))
- continue;
- keywords[j] = all_keywords[i];
- values[j] = all_values[i];
- j++;
- }
- keywords[j] = values[j] = NULL;
- pfree(all_keywords);
- pfree(all_values);
-
- /* verify connection parameters and do connect */
- check_conn_params(keywords, values);
- conn = PQconnectdbParams(keywords, values, 0);
- if (!conn || PQstatus(conn) != CONNECTION_OK)
- ereport(ERROR,
- (errcode(ERRCODE_SQLCLIENT_UNABLE_TO_ESTABLISH_SQLCONNECTION),
- errmsg("could not connect to server \"%s\"", conname),
- errdetail("%s", PQerrorMessage(conn))));
- pfree(keywords);
- pfree(values);
-
- return conn;
-}
-
-/*
- * Mark the connection as "unused", and close it if the caller was the last
- * user of the connection.
- */
-static void
-ReleaseConnection(PGconn *conn)
-{
- HASH_SEQ_STATUS scan;
- ConnCacheEntry *entry;
-
- if (conn == NULL)
- return;
-
- /*
- * We need to scan seqencially since we use the address to find appropriate
- * PGconn from the hash table.
- */
- hash_seq_init(&scan, FSConnectionHash);
- while ((entry = (ConnCacheEntry *) hash_seq_search(&scan)))
- {
- if (entry->conn == conn)
- break;
- }
- hash_seq_term(&scan);
-
- /*
- * If the released connection was an orphan, just close it.
- */
- if (entry == NULL)
- {
- PQfinish(conn);
- return;
- }
-
- /* If the caller was the last referer, unregister it from cache. */
- entry->refs--;
- elog(DEBUG3, "ref %d for %s", entry->refs, entry->name);
- if (entry->refs == 0)
- {
- elog(DEBUG3, "closing connection \"%s\"", entry->name);
- PQfinish(entry->conn);
- entry->refs = 0;
- entry->conn = NULL;
- }
-}
-
-/*
- * Clean the connection up via ResourceOwner when pgClose couldn't close the
- * connection gracefully.
- */
-static void
-cleanup_connection(ResourceReleasePhase phase,
- bool isCommit,
- bool isTopLevel,
- void *arg)
-{
- ConnCacheEntry *entry = (ConnCacheEntry *) arg;
-
- /*
- * If the transaction was committed, the connection has been closed via
- * pgClose() and ReleaseConnection().
- */
- if (isCommit)
- return;
-
- /*
- * We clean the connection up on post-lock because foreign connections are
- * backend-internal resource.
- */
- if (phase != RESOURCE_RELEASE_AFTER_LOCKS)
- return;
-
- /*
- * We ignore cleanup for ResourceOwners other than transaction. At this
- * point, such a ResourceOwner is only Portal.
- */
- if (CurrentResourceOwner != CurTransactionResourceOwner)
- return;
-
- /*
- * We don't care whether we are in TopTransaction or Subtransaction.
- * Anyway, we close the connection and reset the reference counter.
- */
- if (entry->conn != NULL)
- {
- elog(DEBUG3, "closing connection to %s", entry->name);
- PQfinish(entry->conn);
- entry->refs = 0;
- entry->conn = NULL;
- }
- else
- elog(DEBUG3, "connection to %s already closed", entry->name);
-}
/*
* Describes the valid options for postgresql FDW, server, and user mapping.
*/
-struct PgFdwOption
+struct ConnectionOption
{
const char *optname;
Oid optcontext; /* Oid of catalog in which option may appear */
- bool is_conn_opt; /* True if the option is a connection option */
};
/*
- * Valid options for postgresql_fdw.
- * Connection options are copied from fe-connect.c PQconninfoOptions.
+ * Copied from fe-connect.c PQconninfoOptions.
+ *
* The list is small - don't bother with bsearch if it stays so.
*/
-static struct PgFdwOption valid_options[] = {
- /* Connection Options */
- {"authtype", ForeignServerRelationId, true},
- {"service", ForeignServerRelationId, true},
- {"user", UserMappingRelationId, true},
- {"password", UserMappingRelationId, true},
- {"connect_timeout", ForeignServerRelationId, true},
- {"dbname", ForeignServerRelationId, true},
- {"host", ForeignServerRelationId, true},
- {"hostaddr", ForeignServerRelationId, true},
- {"port", ForeignServerRelationId, true},
- {"tty", ForeignServerRelationId, true},
- {"options", ForeignServerRelationId, true},
- {"requiressl", ForeignServerRelationId, true},
- {"sslmode", ForeignServerRelationId, true},
- {"gsslib", ForeignServerRelationId, true},
-
- /* Catalog options */
- {"nspname", ForeignTableRelationId, false},
- {"relname", ForeignTableRelationId, false},
- {"colname", AttributeRelationId, false},
-
- /* Planner cost options */
- {"connection_cost", ForeignServerRelationId, false},
- {"transfer_cost", ForeignServerRelationId, false},
-
- /* Centinel */
- {NULL, InvalidOid, false}
+static struct ConnectionOption libpq_conninfo_options[] = {
+ {"authtype", ForeignServerRelationId},
+ {"service", ForeignServerRelationId},
+ {"user", UserMappingRelationId},
+ {"password", UserMappingRelationId},
+ {"connect_timeout", ForeignServerRelationId},
+ {"dbname", ForeignServerRelationId},
+ {"host", ForeignServerRelationId},
+ {"hostaddr", ForeignServerRelationId},
+ {"port", ForeignServerRelationId},
+ {"tty", ForeignServerRelationId},
+ {"options", ForeignServerRelationId},
+ {"requiressl", ForeignServerRelationId},
+ {"sslmode", ForeignServerRelationId},
+ {"gsslib", ForeignServerRelationId},
+ {NULL, InvalidOid}
};
+
/*
- * Check if the provided option is one of valid options.
+ * Check if the provided option is one of libpq conninfo options.
* context is the Oid of the catalog the option came from, or 0 if we
* don't care.
*/
static bool
-is_valid_option(const char *option, Oid context)
+is_conninfo_option(const char *option, Oid context)
{
- struct PgFdwOption *opt;
+ struct ConnectionOption *opt;
- for (opt = valid_options; opt->optname; opt++)
+ for (opt = libpq_conninfo_options; opt->optname; opt++)
if (context == opt->optcontext && strcmp(opt->optname, option) == 0)
return true;
return false;
}
-/*
- * Check if the provided option is one of libpq conninfo options.
- * XXX: Should be moved to interface/libpq or backend/libpq?
- */
-bool
-is_libpq_connection_option(const char *option)
-{
- struct PgFdwOption *opt;
-
- for (opt = valid_options; opt->optname; opt++)
- if (opt->is_conn_opt && strcmp(opt->optname, option) == 0)
- return true;
- return false;
-}
/*
- * Validate the generic option given to FOREIGN DATA WRAPPER, SERVER, USER
- * MAPPING or FOREIGN TABLE.
+ * Validate the generic option given to SERVER or USER MAPPING.
* Raise an ERROR if the option or its value is considered
* invalid.
*
{
DefElem *def = lfirst(cell);
- if (!is_valid_option(def->defname, catalog))
+ if (!is_conninfo_option(def->defname, catalog))
{
- struct PgFdwOption *opt;
+ struct ConnectionOption *opt;
StringInfoData buf;
/*
* with list of valid options for the object.
*/
initStringInfo(&buf);
- for (opt = valid_options; opt->optname; opt++)
+ for (opt = libpq_conninfo_options; opt->optname; opt++)
if (catalog == opt->optcontext)
appendStringInfo(&buf, "%s%s", (buf.len > 0) ? ", " : "",
opt->optname);