Make view/rule permission checking behave properly with
authorTom Lane <[email protected]>
Sun, 9 Jul 2000 04:56:32 +0000 (04:56 +0000)
committerTom Lane <[email protected]>
Sun, 9 Jul 2000 04:56:32 +0000 (04:56 +0000)
subqueries in the rule.

src/backend/rewrite/locks.c

index 57cce96d901fad10d30e114433d1924fc0c77424..5054b215438ebd79ef28fedaa4c891cf5f20e685 100644 (file)
@@ -7,7 +7,7 @@
  *
  *
  * IDENTIFICATION
- *       $Header: /cvsroot/pgsql/src/backend/rewrite/Attic/locks.c,v 1.29 2000/05/30 00:49:51 momjian Exp $
+ *       $Header: /cvsroot/pgsql/src/backend/rewrite/Attic/locks.c,v 1.30 2000/07/09 04:56:32 tgl Exp $
  *
  *-------------------------------------------------------------------------
  */
@@ -17,9 +17,9 @@
 #include "catalog/pg_shadow.h"
 #include "optimizer/clauses.h"
 #include "rewrite/locks.h"
+#include "parser/parsetree.h"
 #include "utils/acl.h"
 #include "utils/syscache.h"
-#include "utils/syscache.h"
 
 
 /*
@@ -152,39 +152,148 @@ matchLocks(CmdType event,
 }
 
 
+/*
+ * Check the access permissions of tables that are referred to by a rule.
+ * We want to check the access permissions using the userid of the rule's
+ * owner, *not* of the current user (the one accessing the rule).  So, we
+ * do the permission check here and set skipAcl = TRUE in each of the rule's
+ * RTEs, to prevent the executor from running another check with the current
+ * user's ID.
+ *
+ * XXX This routine is called before the rule's query tree has been copied
+ * out of the relcache entry where it is kept.  Therefore, when we set
+ * skipAcl = TRUE, we are destructively modifying the relcache entry for
+ * the event relation!  This seems fairly harmless because the relcache
+ * querytree is only used as a source for the rewriter, but it's a tad
+ * unclean anyway.
+ *
+ * Note that we must check permissions every time, even if skipAcl was
+ * already set TRUE by a prior call.  This ensures that we enforce the
+ * current permission settings for each referenced table, even if they
+ * have changed since the relcache entry was loaded.
+ */
+
+typedef struct
+{
+       char       *evowner;
+} checkLockPerms_context;
+
+static bool
+checkLockPerms_walker(Node *node,
+                                         checkLockPerms_context *context)
+{
+       if (node == NULL)
+               return false;
+       if (IsA(node, SubLink))
+       {
+               /*
+                * Standard expression_tree_walker will not recurse into
+                * subselect, but here we must do so.
+                */
+               SubLink    *sub = (SubLink *) node;
+
+               if (checkLockPerms_walker((Node *) (sub->lefthand), context))
+                       return true;
+               if (checkLockPerms_walker((Node *) (sub->subselect), context))
+                       return true;
+               return false;
+       }
+       if (IsA(node, Query))
+       {
+               /* Reach here after recursing down into subselect above... */
+               Query      *qry = (Query *) node;
+               int                     rtablength = length(qry->rtable);
+               int                     i;
+
+               /* Check all the RTEs in this query node, except OLD and NEW */
+               for (i = 1; i <= rtablength; i++)
+               {
+                       RangeTblEntry *rte = rt_fetch(i, qry->rtable);
+                       int32           reqperm;
+                       int32           aclcheck_res;
+
+                       if (rte->ref != NULL)
+                       {
+                               if (strcmp(rte->ref->relname, "*NEW*") == 0)
+                                       continue;
+                               if (strcmp(rte->ref->relname, "*OLD*") == 0)
+                                       continue;
+                       }
+
+                       if (i == qry->resultRelation)
+                               switch (qry->commandType)
+                               {
+                                       case CMD_INSERT:
+                                               reqperm = ACL_AP;
+                                               break;
+                                       default:
+                                               reqperm = ACL_WR;
+                                               break;
+                               }
+                       else
+                               reqperm = ACL_RD;
+
+                       aclcheck_res = pg_aclcheck(rte->relname,
+                                                                          context->evowner,
+                                                                          reqperm);
+                       if (aclcheck_res != ACLCHECK_OK)
+                               elog(ERROR, "%s: %s",
+                                        rte->relname,
+                                        aclcheck_error_strings[aclcheck_res]);
+
+                       /*
+                        * Mark RTE to prevent executor from checking again with the
+                        * current user's ID...
+                        */
+                       rte->skipAcl = true;
+               }
+
+               /* If there are sublinks, search for them and check their RTEs */
+               if (qry->hasSubLinks)
+               {
+                       if (checkLockPerms_walker((Node *) (qry->targetList), context))
+                               return true;
+                       if (checkLockPerms_walker((Node *) (qry->qual), context))
+                               return true;
+                       if (checkLockPerms_walker((Node *) (qry->havingQual), context))
+                               return true;
+               }
+               return false;
+       }
+       return expression_tree_walker(node, checkLockPerms_walker,
+                                                                 (void *) context);
+}
+
 void
 checkLockPerms(List *locks, Query *parsetree, int rt_index)
 {
+       RangeTblEntry *rte;
        Relation        ev_rel;
        HeapTuple       usertup;
-       char       *evowner;
-       RangeTblEntry *rte;
-       int32           reqperm;
-       int32           aclcheck_res;
-       int                     i;
+       Form_pg_shadow userform;
+       checkLockPerms_context context;
        List       *l;
 
        if (locks == NIL)
-               return;
+               return;                                 /* nothing to check */
 
        /*
-        * Get the usename of the rules event relation owner
+        * Get the usename of the rule's event relation owner
         */
-       rte = (RangeTblEntry *) nth(rt_index - 1, parsetree->rtable);
+       rte = rt_fetch(rt_index, parsetree->rtable);
        ev_rel = heap_openr(rte->relname, AccessShareLock);
        usertup = SearchSysCacheTuple(SHADOWSYSID,
                                                          ObjectIdGetDatum(ev_rel->rd_rel->relowner),
                                                                  0, 0, 0);
        if (!HeapTupleIsValid(usertup))
-       {
                elog(ERROR, "cache lookup for userid %d failed",
                         ev_rel->rd_rel->relowner);
-       }
+       userform = (Form_pg_shadow) GETSTRUCT(usertup);
+       context.evowner = pstrdup(NameStr(userform->usename));
        heap_close(ev_rel, AccessShareLock);
-       evowner = pstrdup(NameStr(((Form_pg_shadow) GETSTRUCT(usertup))->usename));
 
        /*
-        * Check all the locks, that should get fired on this query
+        * Check all the locks that should get fired on this query
         */
        foreach(l, locks)
        {
@@ -192,53 +301,14 @@ checkLockPerms(List *locks, Query *parsetree, int rt_index)
                List       *action;
 
                /*
-                * In each lock check every action
+                * In each lock check every action.  We must scan the action
+                * recursively in case there are any sub-queries within it.
                 */
                foreach(action, onelock->actions)
                {
                        Query      *query = (Query *) lfirst(action);
 
-                       /*
-                        * In each action check every rangetable entry for read/write
-                        * permission of the event relations owner depending on if
-                        * it's the result relation (write) or not (read)
-                        */
-                       for (i = 2; i < length(query->rtable); i++)
-                       {
-                               if (i + 1 == query->resultRelation)
-                                       switch (query->resultRelation)
-                                       {
-                                               case CMD_INSERT:
-                                                       reqperm = ACL_AP;
-                                                       break;
-                                               default:
-                                                       reqperm = ACL_WR;
-                                                       break;
-                                       }
-                               else
-                                       reqperm = ACL_RD;
-
-                               rte = (RangeTblEntry *) nth(i, query->rtable);
-                               aclcheck_res = pg_aclcheck(rte->relname,
-                                                                                  evowner, reqperm);
-                               if (aclcheck_res != ACLCHECK_OK)
-                               {
-                                       elog(ERROR, "%s: %s",
-                                                rte->relname,
-                                                aclcheck_error_strings[aclcheck_res]);
-                               }
-
-                               /*
-                                * So this is allowed due to the permissions of the rules
-                                * event relation owner. But let's see if the next one too
-                                */
-                               rte->skipAcl = TRUE;
-                       }
+                       checkLockPerms_walker((Node *) query, &context);
                }
        }
-
-       /*
-        * Phew, that was close
-        */
-       return;
 }