Introduce the 'force' option for the Drop Database command.
authorAmit Kapila <[email protected]>
Tue, 12 Nov 2019 05:36:13 +0000 (11:06 +0530)
committerAmit Kapila <[email protected]>
Wed, 13 Nov 2019 02:55:33 +0000 (08:25 +0530)
This new option terminates the other sessions connected to the target
database and then drop it.  To terminate other sessions, the current user
must have desired permissions (same as pg_terminate_backend()).  We don't
allow to terminate the sessions if prepared transactions, active logical
replication slots or subscriptions are present in the target database.

Author: Pavel Stehule with changes by me
Reviewed-by: Dilip Kumar, Vignesh C, Ibrar Ahmed, Anthony Nowocien,
Ryan Lambert and Amit Kapila
Discussion: https://postgr.es/m/CAP_rwwmLJJbn70vLOZFpxGw3XD7nLB_7+NKz46H5EOO2k5H7OQ@mail.gmail.com

13 files changed:
doc/src/sgml/ref/drop_database.sgml
src/backend/commands/dbcommands.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/equalfuncs.c
src/backend/parser/gram.y
src/backend/storage/ipc/procarray.c
src/backend/tcop/utility.c
src/bin/psql/tab-complete.c
src/include/commands/dbcommands.h
src/include/nodes/parsenodes.h
src/include/storage/procarray.h
src/test/regress/expected/drop_if_exists.out
src/test/regress/sql/drop_if_exists.sql

index 3ac06c984a3a40a5e76b8c589296d3e6c5224f25..cef1b904219fccf401140fe037a6473b12506057 100644 (file)
@@ -21,7 +21,11 @@ PostgreSQL documentation
 
  <refsynopsisdiv>
 <synopsis>
-DROP DATABASE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
+DROP DATABASE [ IF EXISTS ] <replaceable class="parameter">name</replaceable> [ [ WITH ] ( <replaceable class="parameter">option</replaceable> [, ...] ) ] 
+
+<phrase>where <replaceable class="parameter">option</replaceable> can be:</phrase>
+
+    FORCE
 </synopsis>
  </refsynopsisdiv>
 
@@ -32,9 +36,11 @@ DROP DATABASE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
    <command>DROP DATABASE</command> drops a database. It removes the
    catalog entries for the database and deletes the directory
    containing the data.  It can only be executed by the database owner.
-   Also, it cannot be executed while you or anyone else are connected
-   to the target database.  (Connect to <literal>postgres</literal> or any
-   other database to issue this command.)
+   It cannot be executed while you are connected to the target database.
+   (Connect to <literal>postgres</literal> or any other database to issue this
+   command.)
+   Also, if anyone else is connected to the target database, this command will
+   fail unless you use the <literal>FORCE</literal> option described below.
   </para>
 
   <para>
@@ -64,6 +70,25 @@ DROP DATABASE [ IF EXISTS ] <replaceable class="parameter">name</replaceable>
      </para>
     </listitem>
    </varlistentry>
+
+   <varlistentry>
+    <term><literal>FORCE</literal></term>
+    <listitem>
+     <para>
+      Attempt to terminate all existing connections to the target database.
+      It doesn't terminate if prepared transactions, active logical replication
+      slots or subscriptions are present in the target database.
+     </para>
+     <para>
+      This will fail if the current user has no permissions to terminate other
+      connections. Required permissions are the same as with 
+      <literal>pg_terminate_backend</literal>, described in
+      <xref linkend="functions-admin-signal"/>.  This will also fail if we
+      are not able to terminate connections.
+     </para>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
  </refsect1>
 
index 6f28859f730639da33b78b4c8103eb8e7249ee5d..446813f0f0b2a0530c0bdc9b0d36585da9019ab1 100644 (file)
@@ -810,7 +810,7 @@ createdb_failure_callback(int code, Datum arg)
  * DROP DATABASE
  */
 void
-dropdb(const char *dbname, bool missing_ok)
+dropdb(const char *dbname, bool missing_ok, bool force)
 {
        Oid                     db_id;
        bool            db_istemplate;
@@ -910,6 +910,14 @@ dropdb(const char *dbname, bool missing_ok)
                                                                  "There are %d subscriptions.",
                                                                  nsubscriptions, nsubscriptions)));
 
+
+       /*
+        * Attempt to terminate all existing connections to the target database if
+        * the user has requested to do so.
+        */
+       if (force)
+               TerminateOtherDBBackends(db_id);
+
        /*
         * Check for other backends in the target database.  (Because we hold the
         * database lock, no new ones can start after this.)
@@ -1430,6 +1438,30 @@ movedb_failure_callback(int code, Datum arg)
        (void) rmtree(dstpath, true);
 }
 
+/*
+ * Process options and call dropdb function.
+ */
+void
+DropDatabase(ParseState *pstate, DropdbStmt *stmt)
+{
+       bool            force = false;
+       ListCell   *lc;
+
+       foreach(lc, stmt->options)
+       {
+               DefElem    *opt = (DefElem *) lfirst(lc);
+
+               if (strcmp(opt->defname, "force") == 0)
+                       force = true;
+               else
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_SYNTAX_ERROR),
+                                        errmsg("unrecognized DROP DATABASE option \"%s\"", opt->defname),
+                                        parser_errposition(pstate, opt->location)));
+       }
+
+       dropdb(stmt->dbname, stmt->missing_ok, force);
+}
 
 /*
  * ALTER DATABASE name ...
index 3432bb921dd2d116623c55929ca76d1209e0876f..2f267e4bb6538085113608f009cc03698b692078 100644 (file)
@@ -3868,6 +3868,7 @@ _copyDropdbStmt(const DropdbStmt *from)
 
        COPY_STRING_FIELD(dbname);
        COPY_SCALAR_FIELD(missing_ok);
+       COPY_NODE_FIELD(options);
 
        return newnode;
 }
index 18cb0143733c026ce7ac6a28f5bfad9fa6afe1b4..da0e1d139acbf2e566d5745250eebe654d335d4d 100644 (file)
@@ -1676,6 +1676,7 @@ _equalDropdbStmt(const DropdbStmt *a, const DropdbStmt *b)
 {
        COMPARE_STRING_FIELD(dbname);
        COMPARE_SCALAR_FIELD(missing_ok);
+       COMPARE_NODE_FIELD(options);
 
        return true;
 }
index 3f67aaf30eae9ff9c104f9a7a0e5df206d272e84..2f7bd662e8a8a91d80165252ba6d07fbc13ddc0e 100644 (file)
@@ -310,6 +310,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
 %type <defelt> vac_analyze_option_elem
 %type <list>   vac_analyze_option_list
 %type <node>   vac_analyze_option_arg
+%type <defelt> drop_option
 %type <boolean>        opt_or_replace
                                opt_grant_grant_option opt_grant_admin_option
                                opt_nowait opt_if_exists opt_with_data
@@ -406,6 +407,7 @@ static Node *makeRecursiveViewSelect(char *relname, List *aliases, Node *query);
                                TriggerTransitions TriggerReferencing
                                publication_name_list
                                vacuum_relation_list opt_vacuum_relation_list
+                               drop_option_list
 
 %type <list>   group_by_list
 %type <node>   group_by_item empty_grouping_set rollup_clause cube_clause
@@ -10213,7 +10215,7 @@ AlterDatabaseSetStmt:
 
 /*****************************************************************************
  *
- *             DROP DATABASE [ IF EXISTS ]
+ *             DROP DATABASE [ IF EXISTS ] dbname [ [ WITH ] ( options ) ]
  *
  * This is implicitly CASCADE, no need for drop behavior
  *****************************************************************************/
@@ -10223,6 +10225,7 @@ DropdbStmt: DROP DATABASE database_name
                                        DropdbStmt *n = makeNode(DropdbStmt);
                                        n->dbname = $3;
                                        n->missing_ok = false;
+                                       n->options = NULL;
                                        $$ = (Node *)n;
                                }
                        | DROP DATABASE IF_P EXISTS database_name
@@ -10230,10 +10233,48 @@ DropdbStmt: DROP DATABASE database_name
                                        DropdbStmt *n = makeNode(DropdbStmt);
                                        n->dbname = $5;
                                        n->missing_ok = true;
+                                       n->options = NULL;
                                        $$ = (Node *)n;
                                }
+                       | DROP DATABASE database_name opt_with '(' drop_option_list ')'
+                               {
+                                       DropdbStmt *n = makeNode(DropdbStmt);
+                                       n->dbname = $3;
+                                       n->missing_ok = false;
+                                       n->options = $6;
+                                       $$ = (Node *)n;
+                               }
+                       | DROP DATABASE IF_P EXISTS database_name opt_with '(' drop_option_list ')'
+                               {
+                                       DropdbStmt *n = makeNode(DropdbStmt);
+                                       n->dbname = $5;
+                                       n->missing_ok = true;
+                                       n->options = $8;
+                                       $$ = (Node *)n;
+                               }
+               ;
+
+drop_option_list:
+                       drop_option
+                               {
+                                       $$ = list_make1((Node *) $1);
+                               }
+                       | drop_option_list ',' drop_option
+                               {
+                                       $$ = lappend($1, (Node *) $3);
+                               }
                ;
 
+/*
+ * Currently only the FORCE option is supported, but the syntax is designed
+ * to be extensible so that we can add more options in the future if required.
+ */
+drop_option:
+                       FORCE
+                               {
+                                       $$ = makeDefElem("force", NULL, @1);
+                               }
+               ;
 
 /*****************************************************************************
  *
index 3da53074b18412a40b385338ef0bbbd6dce785f1..13bcbe77de74ea7cef0519b4d3c2478e9ae3ff8d 100644 (file)
@@ -52,6 +52,8 @@
 #include "access/xact.h"
 #include "access/xlog.h"
 #include "catalog/catalog.h"
+#include "catalog/pg_authid.h"
+#include "commands/dbcommands.h"
 #include "miscadmin.h"
 #include "pgstat.h"
 #include "storage/proc.h"
@@ -2970,6 +2972,118 @@ CountOtherDBBackends(Oid databaseId, int *nbackends, int *nprepared)
        return true;                            /* timed out, still conflicts */
 }
 
+/*
+ * Terminate existing connections to the specified database. This routine
+ * is used by the DROP DATABASE command when user has asked to forcefully
+ * drop the database.
+ *
+ * The current backend is always ignored; it is caller's responsibility to
+ * check whether the current backend uses the given DB, if it's important.
+ *
+ * It doesn't allow to terminate the connections even if there is a one
+ * backend with the prepared transaction in the target database.
+ */
+void
+TerminateOtherDBBackends(Oid databaseId)
+{
+       ProcArrayStruct *arrayP = procArray;
+       List       *pids = NIL;
+       int                     nprepared = 0;
+       int                     i;
+
+       LWLockAcquire(ProcArrayLock, LW_SHARED);
+
+       for (i = 0; i < procArray->numProcs; i++)
+       {
+               int                     pgprocno = arrayP->pgprocnos[i];
+               PGPROC     *proc = &allProcs[pgprocno];
+
+               if (proc->databaseId != databaseId)
+                       continue;
+               if (proc == MyProc)
+                       continue;
+
+               if (proc->pid != 0)
+                       pids = lappend_int(pids, proc->pid);
+               else
+                       nprepared++;
+       }
+
+       LWLockRelease(ProcArrayLock);
+
+       if (nprepared > 0)
+               ereport(ERROR,
+                               (errcode(ERRCODE_OBJECT_IN_USE),
+                                errmsg("database \"%s\" is being used by prepared transaction",
+                                               get_database_name(databaseId)),
+                                errdetail_plural("There is %d prepared transaction using the database.",
+                                                                 "There are %d prepared transactions using the database.",
+                                                                 nprepared,
+                                                                 nprepared)));
+
+       if (pids)
+       {
+               ListCell   *lc;
+
+               /*
+                * Check whether we have the necessary rights to terminate other
+                * sessions.  We don't terminate any session untill we ensure that we
+                * have rights on all the sessions to be terminated.  These checks are
+                * the same as we do in pg_terminate_backend.
+                *
+                * In this case we don't raise some warnings - like "PID %d is not a
+                * PostgreSQL server process", because for us already finished session
+                * is not a problem.
+                */
+               foreach(lc, pids)
+               {
+                       int                     pid = lfirst_int(lc);
+                       PGPROC     *proc = BackendPidGetProc(pid);
+
+                       if (proc != NULL)
+                       {
+                               /* Only allow superusers to signal superuser-owned backends. */
+                               if (superuser_arg(proc->roleId) && !superuser())
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                                        (errmsg("must be a superuser to terminate superuser process"))));
+
+                               /* Users can signal backends they have role membership in. */
+                               if (!has_privs_of_role(GetUserId(), proc->roleId) &&
+                                       !has_privs_of_role(GetUserId(), DEFAULT_ROLE_SIGNAL_BACKENDID))
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_INSUFFICIENT_PRIVILEGE),
+                                                        (errmsg("must be a member of the role whose process is being terminated or member of pg_signal_backend"))));
+                       }
+               }
+
+               /*
+                * There's a race condition here: once we release the ProcArrayLock,
+                * it's possible for the session to exit before we issue kill.  That
+                * race condition possibility seems too unlikely to worry about.  See
+                * pg_signal_backend.
+                */
+               foreach(lc, pids)
+               {
+                       int                     pid = lfirst_int(lc);
+                       PGPROC     *proc = BackendPidGetProc(pid);
+
+                       if (proc != NULL)
+                       {
+                               /*
+                                * If we have setsid(), signal the backend's whole process
+                                * group
+                                */
+#ifdef HAVE_SETSID
+                               (void) kill(-pid, SIGTERM);
+#else
+                               (void) kill(pid, SIGTERM);
+#endif
+                       }
+               }
+       }
+}
+
 /*
  * ProcArraySetReplicationSlotXmin
  *
index 6787d8e66d35635c1081c6f300afdd0885bc7acd..3a03ca7e2f0d3ac2b41bb32dde4501fff3f0177b 100644 (file)
@@ -595,13 +595,9 @@ standard_ProcessUtility(PlannedStmt *pstmt,
                        break;
 
                case T_DropdbStmt:
-                       {
-                               DropdbStmt *stmt = (DropdbStmt *) parsetree;
-
-                               /* no event triggers for global objects */
-                               PreventInTransactionBlock(isTopLevel, "DROP DATABASE");
-                               dropdb(stmt->dbname, stmt->missing_ok);
-                       }
+                       /* no event triggers for global objects */
+                       PreventInTransactionBlock(isTopLevel, "DROP DATABASE");
+                       DropDatabase(pstate, (DropdbStmt *) parsetree);
                        break;
 
                        /* Query-level asynchronous notification */
index 2b1e3cda4a40272925847ec920d9892590fd54a8..98c917bf7aefd98483296d52d7e5ca57d1d0ed81 100644 (file)
@@ -2844,6 +2844,10 @@ psql_completion(const char *text, int start, int end)
                COMPLETE_WITH_FUNCTION_ARG(prev2_wd);
        else if (Matches("DROP", "FOREIGN"))
                COMPLETE_WITH("DATA WRAPPER", "TABLE");
+       else if (Matches("DROP", "DATABASE", MatchAny))
+               COMPLETE_WITH("WITH (");
+       else if (HeadMatches("DROP", "DATABASE") && (ends_with(prev_wd, '(')))
+               COMPLETE_WITH("FORCE");
 
        /* DROP INDEX */
        else if (Matches("DROP", "INDEX"))
index 154c8157ee227b4a60f645fb5f69ad8447bbf948..d1e91a2455986ffdbaf56559c0e28a9b24c6c2fd 100644 (file)
@@ -20,7 +20,8 @@
 #include "nodes/parsenodes.h"
 
 extern Oid     createdb(ParseState *pstate, const CreatedbStmt *stmt);
-extern void dropdb(const char *dbname, bool missing_ok);
+extern void dropdb(const char *dbname, bool missing_ok, bool force);
+extern void DropDatabase(ParseState *pstate, DropdbStmt *stmt);
 extern ObjectAddress RenameDatabase(const char *oldname, const char *newname);
 extern Oid     AlterDatabase(ParseState *pstate, AlterDatabaseStmt *stmt, bool isTopLevel);
 extern Oid     AlterDatabaseSet(AlterDatabaseSetStmt *stmt);
index d93a79a55470055b1c4b66117774ccace4897119..ff626cbe61e55aa018d9b811863f3df5ac649efe 100644 (file)
@@ -3145,6 +3145,7 @@ typedef struct DropdbStmt
        NodeTag         type;
        char       *dbname;                     /* database to drop */
        bool            missing_ok;             /* skip error if db is missing? */
+       List       *options;            /* currently only FORCE is supported */
 } DropdbStmt;
 
 /* ----------------------
index da8b672096f3d90e95b06b8337d4456976b90125..8f67b860e7289b2bd4502f07ebb107ff13660ed6 100644 (file)
@@ -113,6 +113,7 @@ extern void CancelDBBackends(Oid databaseid, ProcSignalReason sigmode, bool conf
 extern int     CountUserBackends(Oid roleid);
 extern bool CountOtherDBBackends(Oid databaseId,
                                                                 int *nbackends, int *nprepared);
+extern void TerminateOtherDBBackends(Oid databaseId);
 
 extern void XidCacheRemoveRunningXids(TransactionId xid,
                                                                          int nxids, const TransactionId *xids,
index 6a17467717741931abbb6ac420e6757e27e0c848..5e44c2c3ceafbee1acd00a391e9f81535e419074 100644 (file)
@@ -330,3 +330,13 @@ HINT:  Specify the argument list to select the routine unambiguously.
 -- cleanup
 DROP PROCEDURE test_ambiguous_procname(int);
 DROP PROCEDURE test_ambiguous_procname(text);
+-- This test checks both the functionality of 'if exists' and the syntax
+-- of the drop database command.
+drop database test_database_exists (force);
+ERROR:  database "test_database_exists" does not exist
+drop database test_database_exists with (force);
+ERROR:  database "test_database_exists" does not exist
+drop database if exists test_database_exists (force);
+NOTICE:  database "test_database_exists" does not exist, skipping
+drop database if exists test_database_exists with (force);
+NOTICE:  database "test_database_exists" does not exist, skipping
index 8a791b1ef2dd93bdcdf41194b1f6576226711f92..ac6168b91f831c0df6d7fad2a68938ab2d5485c1 100644 (file)
@@ -295,3 +295,10 @@ DROP ROUTINE IF EXISTS test_ambiguous_procname;
 -- cleanup
 DROP PROCEDURE test_ambiguous_procname(int);
 DROP PROCEDURE test_ambiguous_procname(text);
+
+-- This test checks both the functionality of 'if exists' and the syntax
+-- of the drop database command.
+drop database test_database_exists (force);
+drop database test_database_exists with (force);
+drop database if exists test_database_exists (force);
+drop database if exists test_database_exists with (force);