*
  *
  * IDENTIFICATION
- *   $Header: /cvsroot/pgsql/src/backend/catalog/aclchk.c,v 1.90 2003/10/29 22:20:54 tgl Exp $
+ *   $Header: /cvsroot/pgsql/src/backend/catalog/aclchk.c,v 1.91 2003/10/31 20:00:49 tgl Exp $
  *
  * NOTES
  *   See acl.h.
 #endif   /* ACLDEBUG */
 
 
+/*
+ * Determine the effective grantor ID for a GRANT or REVOKE operation.
+ *
+ * Ordinarily this is just the current user, but when a superuser does
+ * GRANT or REVOKE, we pretend he is the object owner.  This ensures that
+ * all granted privileges appear to flow from the object owner, and there
+ * are never multiple "original sources" of a privilege.
+ */
+static AclId
+select_grantor(AclId ownerId)
+{
+   AclId       grantorId;
+
+   grantorId = GetUserId();
+
+   /* fast path if no difference */
+   if (grantorId == ownerId)
+       return grantorId;
+
+   if (superuser())
+       grantorId = ownerId;
+
+   return grantorId;
+}
+
+
 /*
  * If is_grant is true, adds the given privileges for the list of
  * grantees to the existing old_acl.  If is_grant is false, the
  */
 static Acl *
 merge_acl_with_grant(Acl *old_acl, bool is_grant,
-                    List *grantees, AclMode privileges,
                     bool grant_option, DropBehavior behavior,
-                    AclId owner_uid)
+                    List *grantees, AclMode privileges,
+                    AclId grantor_uid, AclId owner_uid)
 {
    unsigned    modechg;
    List       *j;
         * and later the user is removed from the group, the situation is
         * impossible to clean up.
         */
-       if (is_grant && idtype != ACL_IDTYPE_UID && grant_option)
+       if (is_grant && grant_option && idtype != ACL_IDTYPE_UID)
            ereport(ERROR,
                    (errcode(ERRCODE_INVALID_GRANT_OPERATION),
                     errmsg("grant options can only be granted to individual users")));
                    (errcode(ERRCODE_INVALID_GRANT_OPERATION),
                     errmsg("cannot revoke grant options from owner")));
 
-       aclitem.ai_grantor = GetUserId();
+       aclitem.ai_grantor = grantor_uid;
 
        ACLITEM_SET_PRIVS_IDTYPE(aclitem,
                                 (is_grant || !grant_option) ? privileges : ACL_NO_RIGHTS,
        bool        isNull;
        Acl        *old_acl;
        Acl        *new_acl;
+       AclId       grantorId;
+       AclId       ownerId;
        HeapTuple   newtuple;
        Datum       values[Natts_pg_class];
        char        nulls[Natts_pg_class];
            elog(ERROR, "cache lookup failed for relation %u", relOid);
        pg_class_tuple = (Form_pg_class) GETSTRUCT(tuple);
 
+       ownerId = pg_class_tuple->relowner;
+       grantorId = select_grantor(ownerId);
+
        if (stmt->is_grant
            && !pg_class_ownercheck(relOid, GetUserId())
-           && pg_class_aclcheck(relOid, GetUserId(), ACL_GRANT_OPTION_FOR(privileges)) != ACLCHECK_OK)
+           && pg_class_aclcheck(relOid, GetUserId(),
+                                ACL_GRANT_OPTION_FOR(privileges)) != ACLCHECK_OK)
            aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_CLASS, relvar->relname);
 
        /* Not sensible to grant on an index */
                            relvar->relname)));
 
        /*
-        * If there's no ACL, create a default using the pg_class.relowner
-        * field.
+        * If there's no ACL, substitute the proper default.
         */
        aclDatum = SysCacheGetAttr(RELOID, tuple, Anum_pg_class_relacl,
                                   &isNull);
        if (isNull)
-           old_acl = acldefault(ACL_OBJECT_RELATION,
-                                pg_class_tuple->relowner);
+           old_acl = acldefault(ACL_OBJECT_RELATION, ownerId);
        else
            /* get a detoasted copy of the ACL */
            old_acl = DatumGetAclPCopy(aclDatum);
 
        new_acl = merge_acl_with_grant(old_acl, stmt->is_grant,
-                                      stmt->grantees, privileges,
                                       stmt->grant_option, stmt->behavior,
-                                      pg_class_tuple->relowner);
+                                      stmt->grantees, privileges,
+                                      grantorId, ownerId);
 
        /* finished building new ACL value, now insert it */
        MemSet(values, 0, sizeof(values));
        bool        isNull;
        Acl        *old_acl;
        Acl        *new_acl;
+       AclId       grantorId;
+       AclId       ownerId;
        HeapTuple   newtuple;
        Datum       values[Natts_pg_database];
        char        nulls[Natts_pg_database];
                     errmsg("database \"%s\" does not exist", dbname)));
        pg_database_tuple = (Form_pg_database) GETSTRUCT(tuple);
 
+       ownerId = pg_database_tuple->datdba;
+       grantorId = select_grantor(ownerId);
+
        if (stmt->is_grant
-           && pg_database_tuple->datdba != GetUserId()
-           && pg_database_aclcheck(HeapTupleGetOid(tuple), GetUserId(), ACL_GRANT_OPTION_FOR(privileges)) != ACLCHECK_OK)
+           && !pg_database_ownercheck(HeapTupleGetOid(tuple), GetUserId())
+           && pg_database_aclcheck(HeapTupleGetOid(tuple), GetUserId(),
+                                   ACL_GRANT_OPTION_FOR(privileges)) != ACLCHECK_OK)
            aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_DATABASE,
                           NameStr(pg_database_tuple->datname));
 
        /*
-        * If there's no ACL, create a default.
+        * If there's no ACL, substitute the proper default.
         */
        aclDatum = heap_getattr(tuple, Anum_pg_database_datacl,
                                RelationGetDescr(relation), &isNull);
        if (isNull)
-           old_acl = acldefault(ACL_OBJECT_DATABASE,
-                                pg_database_tuple->datdba);
+           old_acl = acldefault(ACL_OBJECT_DATABASE, ownerId);
        else
            /* get a detoasted copy of the ACL */
            old_acl = DatumGetAclPCopy(aclDatum);
 
        new_acl = merge_acl_with_grant(old_acl, stmt->is_grant,
-                                      stmt->grantees, privileges,
                                       stmt->grant_option, stmt->behavior,
-                                      pg_database_tuple->datdba);
+                                      stmt->grantees, privileges,
+                                      grantorId, ownerId);
 
        /* finished building new ACL value, now insert it */
        MemSet(values, 0, sizeof(values));
        bool        isNull;
        Acl        *old_acl;
        Acl        *new_acl;
+       AclId       grantorId;
+       AclId       ownerId;
        HeapTuple   newtuple;
        Datum       values[Natts_pg_proc];
        char        nulls[Natts_pg_proc];
            elog(ERROR, "cache lookup failed for function %u", oid);
        pg_proc_tuple = (Form_pg_proc) GETSTRUCT(tuple);
 
+       ownerId = pg_proc_tuple->proowner;
+       grantorId = select_grantor(ownerId);
+
        if (stmt->is_grant
            && !pg_proc_ownercheck(oid, GetUserId())
-           && pg_proc_aclcheck(oid, GetUserId(), ACL_GRANT_OPTION_FOR(privileges)) != ACLCHECK_OK)
+           && pg_proc_aclcheck(oid, GetUserId(),
+                               ACL_GRANT_OPTION_FOR(privileges)) != ACLCHECK_OK)
            aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_PROC,
                           NameStr(pg_proc_tuple->proname));
 
        /*
-        * If there's no ACL, create a default using the pg_proc.proowner
-        * field.
+        * If there's no ACL, substitute the proper default.
         */
        aclDatum = SysCacheGetAttr(PROCOID, tuple, Anum_pg_proc_proacl,
                                   &isNull);
        if (isNull)
-           old_acl = acldefault(ACL_OBJECT_FUNCTION,
-                                pg_proc_tuple->proowner);
+           old_acl = acldefault(ACL_OBJECT_FUNCTION, ownerId);
        else
            /* get a detoasted copy of the ACL */
            old_acl = DatumGetAclPCopy(aclDatum);
 
        new_acl = merge_acl_with_grant(old_acl, stmt->is_grant,
-                                      stmt->grantees, privileges,
                                       stmt->grant_option, stmt->behavior,
-                                      pg_proc_tuple->proowner);
+                                      stmt->grantees, privileges,
+                                      grantorId, ownerId);
 
        /* finished building new ACL value, now insert it */
        MemSet(values, 0, sizeof(values));
        bool        isNull;
        Acl        *old_acl;
        Acl        *new_acl;
+       AclId       grantorId;
+       AclId       ownerId;
        HeapTuple   newtuple;
        Datum       values[Natts_pg_language];
        char        nulls[Natts_pg_language];
                     errmsg("language \"%s\" does not exist", langname)));
        pg_language_tuple = (Form_pg_language) GETSTRUCT(tuple);
 
-       if (!pg_language_tuple->lanpltrusted && stmt->is_grant)
-           ereport(ERROR,
-                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
-                    errmsg("language \"%s\" is not trusted", langname)));
+       /*
+        * Note: for now, languages are treated as owned by the bootstrap
+        * user.  We should add an owner column to pg_language instead.
+        */
+       ownerId = BOOTSTRAP_USESYSID;
+       grantorId = select_grantor(ownerId);
 
        if (stmt->is_grant
-           && !superuser()
-           && pg_language_aclcheck(HeapTupleGetOid(tuple), GetUserId(), ACL_GRANT_OPTION_FOR(privileges)) != ACLCHECK_OK)
+           && !superuser()     /* XXX no ownercheck() available */
+           && pg_language_aclcheck(HeapTupleGetOid(tuple), GetUserId(),
+                                   ACL_GRANT_OPTION_FOR(privileges)) != ACLCHECK_OK)
            aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_LANGUAGE,
                           NameStr(pg_language_tuple->lanname));
 
+       if (!pg_language_tuple->lanpltrusted && stmt->is_grant)
+           ereport(ERROR,
+                   (errcode(ERRCODE_WRONG_OBJECT_TYPE),
+                    errmsg("language \"%s\" is not trusted", langname)));
+
        /*
-        * If there's no ACL, create a default.
-        *
-        * Note: for now, languages are treated as owned by the bootstrap
-        * user.  We should add an owner column to pg_language instead.
+        * If there's no ACL, substitute the proper default.
         */
        aclDatum = SysCacheGetAttr(LANGNAME, tuple, Anum_pg_language_lanacl,
                                   &isNull);
        if (isNull)
-           old_acl = acldefault(ACL_OBJECT_LANGUAGE,
-                                BOOTSTRAP_USESYSID);
+           old_acl = acldefault(ACL_OBJECT_LANGUAGE, ownerId);
        else
            /* get a detoasted copy of the ACL */
            old_acl = DatumGetAclPCopy(aclDatum);
 
        new_acl = merge_acl_with_grant(old_acl, stmt->is_grant,
-                                      stmt->grantees, privileges,
                                       stmt->grant_option, stmt->behavior,
-                                      BOOTSTRAP_USESYSID);
+                                      stmt->grantees, privileges,
+                                      grantorId, ownerId);
 
        /* finished building new ACL value, now insert it */
        MemSet(values, 0, sizeof(values));
        bool        isNull;
        Acl        *old_acl;
        Acl        *new_acl;
+       AclId       grantorId;
+       AclId       ownerId;
        HeapTuple   newtuple;
        Datum       values[Natts_pg_namespace];
        char        nulls[Natts_pg_namespace];
                     errmsg("schema \"%s\" does not exist", nspname)));
        pg_namespace_tuple = (Form_pg_namespace) GETSTRUCT(tuple);
 
+       ownerId = pg_namespace_tuple->nspowner;
+       grantorId = select_grantor(ownerId);
+
        if (stmt->is_grant
-        && !pg_namespace_ownercheck(HeapTupleGetOid(tuple), GetUserId())
-           && pg_namespace_aclcheck(HeapTupleGetOid(tuple), GetUserId(), ACL_GRANT_OPTION_FOR(privileges)) != ACLCHECK_OK)
+           && !pg_namespace_ownercheck(HeapTupleGetOid(tuple), GetUserId())
+           && pg_namespace_aclcheck(HeapTupleGetOid(tuple), GetUserId(),
+                                    ACL_GRANT_OPTION_FOR(privileges)) != ACLCHECK_OK)
            aclcheck_error(ACLCHECK_NO_PRIV, ACL_KIND_NAMESPACE,
                           nspname);
 
        /*
-        * If there's no ACL, create a default using the
-        * pg_namespace.nspowner field.
+        * If there's no ACL, substitute the proper default.
         */
        aclDatum = SysCacheGetAttr(NAMESPACENAME, tuple,
                                   Anum_pg_namespace_nspacl,
                                   &isNull);
        if (isNull)
-           old_acl = acldefault(ACL_OBJECT_NAMESPACE,
-                                pg_namespace_tuple->nspowner);
+           old_acl = acldefault(ACL_OBJECT_NAMESPACE, ownerId);
        else
            /* get a detoasted copy of the ACL */
            old_acl = DatumGetAclPCopy(aclDatum);
 
        new_acl = merge_acl_with_grant(old_acl, stmt->is_grant,
-                                      stmt->grantees, privileges,
                                       stmt->grant_option, stmt->behavior,
-                                      pg_namespace_tuple->nspowner);
+                                      stmt->grantees, privileges,
+                                      grantorId, ownerId);
 
        /* finished building new ACL value, now insert it */
        MemSet(values, 0, sizeof(values));
                               &isNull);
    if (isNull)
    {
-       /* No ACL, so build default ACL for rel */
+       /* No ACL, so build default ACL */
        AclId       ownerId;
 
        ownerId = ((Form_pg_class) GETSTRUCT(tuple))->relowner;