COPY (INSERT/UPDATE/DELETE .. RETURNING ..)
authorTeodor Sigaev <[email protected]>
Fri, 27 Nov 2015 16:11:22 +0000 (19:11 +0300)
committerTeodor Sigaev <[email protected]>
Fri, 27 Nov 2015 16:11:22 +0000 (19:11 +0300)
Attached is a patch for being able to do COPY (query) without a CTE.

Author: Marko Tiikkaja
Review: Michael Paquier

doc/src/sgml/ref/copy.sgml
src/backend/commands/copy.c
src/backend/parser/gram.y
src/bin/psql/copy.c
src/include/nodes/parsenodes.h
src/test/regress/expected/copydml.out [new file with mode: 0644]
src/test/regress/parallel_schedule
src/test/regress/serial_schedule
src/test/regress/sql/copydml.sql [new file with mode: 0644]

index 2850b4763f26050d136ecd441198a1530392ae2a..07e2f45196f054a7be134b1b8b4b19089086de02 100644 (file)
@@ -112,10 +112,17 @@ COPY { <replaceable class="parameter">table_name</replaceable> [ ( <replaceable
     <term><replaceable class="parameter">query</replaceable></term>
     <listitem>
      <para>
-      A <xref linkend="sql-select"> or
-      <xref linkend="sql-values"> command
-      whose results are to be copied.
-      Note that parentheses are required around the query.
+      A <xref linkend="sql-select">, <xref linkend="sql-values">,
+      <xref linkend="sql-insert">, <xref linkend="sql-update"> or
+      <xref linkend="sql-delete"> command whose results are to be
+      copied.  Note that parentheses are required around the query.
+     </para>
+     <para>
+      For <command>INSERT</>, <command>UPDATE</> and
+      <command>DELETE</> queries a RETURNING clause must be provided,
+      and the target relation must not have a conditional rule, nor
+      an <literal>ALSO</> rule, nor an <literal>INSTEAD</> rule
+      that expands to multiple statements.
      </para>
     </listitem>
    </varlistentry>
index 47c6262ec2b58add29a2f7f7389a7fc4243dcb62..7dbe04e5138673706cd3958f9831bc1e66f0d53d 100644 (file)
@@ -201,7 +201,7 @@ typedef struct CopyStateData
        int                     raw_buf_len;    /* total # of bytes stored */
 } CopyStateData;
 
-/* DestReceiver for COPY (SELECT) TO */
+/* DestReceiver for COPY (query) TO */
 typedef struct
 {
        DestReceiver pub;                       /* publicly-known function pointers */
@@ -772,7 +772,8 @@ CopyLoadRawBuf(CopyState cstate)
  *
  * Either unload or reload contents of table <relation>, depending on <from>.
  * (<from> = TRUE means we are inserting into the table.)  In the "TO" case
- * we also support copying the output of an arbitrary SELECT query.
+ * we also support copying the output of an arbitrary SELECT, INSERT, UPDATE
+ * or DELETE query.
  *
  * If <pipe> is false, transfer is between the table and the file named
  * <filename>.  Otherwise, transfer is between the table and our regular
@@ -1374,11 +1375,11 @@ BeginCopy(bool is_from,
                Assert(!is_from);
                cstate->rel = NULL;
 
-               /* Don't allow COPY w/ OIDs from a select */
+               /* Don't allow COPY w/ OIDs from a query */
                if (cstate->oids)
                        ereport(ERROR,
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
-                                        errmsg("COPY (SELECT) WITH OIDS is not supported")));
+                                        errmsg("COPY (query) WITH OIDS is not supported")));
 
                /*
                 * Run parse analysis and rewrite.  Note this also acquires sufficient
@@ -1393,9 +1394,36 @@ BeginCopy(bool is_from,
                rewritten = pg_analyze_and_rewrite((Node *) copyObject(raw_query),
                                                                                   queryString, NULL, 0);
 
-               /* We don't expect more or less than one result query */
-               if (list_length(rewritten) != 1)
-                       elog(ERROR, "unexpected rewrite result");
+               /* check that we got back something we can work with */
+               if (rewritten == NIL)
+               {
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                       errmsg("DO INSTEAD NOTHING rules are not supported for COPY")));
+               }
+               else if (list_length(rewritten) > 1)
+               {
+                       ListCell *lc;
+
+                       /* examine queries to determine which error message to issue */
+                       foreach(lc, rewritten)
+                       {
+                               Query     *q = (Query *) lfirst(lc);
+
+                               if (q->querySource == QSRC_QUAL_INSTEAD_RULE)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                        errmsg("conditional DO INSTEAD rules are not supported for COPY")));
+                               if (q->querySource == QSRC_NON_INSTEAD_RULE)
+                                       ereport(ERROR,
+                                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                        errmsg("DO ALSO rules are not supported for the COPY")));
+                       }
+
+                       ereport(ERROR,
+                                       (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                        errmsg("multi-statement DO INSTEAD rules are not supported for COPY")));
+               }
 
                query = (Query *) linitial(rewritten);
 
@@ -1406,9 +1434,24 @@ BeginCopy(bool is_from,
                                        (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
                                         errmsg("COPY (SELECT INTO) is not supported")));
 
-               Assert(query->commandType == CMD_SELECT);
                Assert(query->utilityStmt == NULL);
 
+               /*
+                * Similarly the grammar doesn't enforce the presence of a RETURNING
+                * clause, but this is required here.
+                */
+               if (query->commandType != CMD_SELECT &&
+                       query->returningList == NIL)
+               {
+                       Assert(query->commandType == CMD_INSERT ||
+                                  query->commandType == CMD_UPDATE ||
+                                  query->commandType == CMD_DELETE);
+
+                       ereport(ERROR,
+                                               (errcode(ERRCODE_FEATURE_NOT_SUPPORTED),
+                                                errmsg("COPY query must have a RETURNING clause")));
+               }
+
                /* plan the query */
                plan = pg_plan_query(query, 0, NULL);
 
index fba91d53ac3f3e70e5b196ebfbd35efebf55eed3..7916df8b099aa513532d2f0c607caf2897159c93 100644 (file)
@@ -2561,9 +2561,12 @@ ClosePortalStmt:
  *
  *             QUERY :
  *                             COPY relname [(columnList)] FROM/TO file [WITH] [(options)]
- *                             COPY ( SELECT ... ) TO file     [WITH] [(options)]
+ *                             COPY ( query ) TO file  [WITH] [(options)]
  *
- *                             where 'file' can be one of:
+ *                             where 'query' can be one of:
+ *                             { SELECT | UPDATE | INSERT | DELETE }
+ *
+ *                             and 'file' can be one of:
  *                             { PROGRAM 'command' | STDIN | STDOUT | 'filename' }
  *
  *                             In the preferred syntax the options are comma-separated
@@ -2574,7 +2577,7 @@ ClosePortalStmt:
  *                             COPY [ BINARY ] table [ WITH OIDS ] FROM/TO file
  *                                     [ [ USING ] DELIMITERS 'delimiter' ] ]
  *                                     [ WITH NULL AS 'null string' ]
- *                             This option placement is not supported with COPY (SELECT...).
+ *                             This option placement is not supported with COPY (query...).
  *
  *****************************************************************************/
 
@@ -2607,16 +2610,16 @@ CopyStmt:       COPY opt_binary qualified_name opt_column_list opt_oids
                                                n->options = list_concat(n->options, $11);
                                        $$ = (Node *)n;
                                }
-                       | COPY select_with_parens TO opt_program copy_file_name opt_with copy_options
+                       | COPY '(' PreparableStmt ')' TO opt_program copy_file_name opt_with copy_options
                                {
                                        CopyStmt *n = makeNode(CopyStmt);
                                        n->relation = NULL;
-                                       n->query = $2;
+                                       n->query = $3;
                                        n->attlist = NIL;
                                        n->is_from = false;
-                                       n->is_program = $4;
-                                       n->filename = $5;
-                                       n->options = $7;
+                                       n->is_program = $6;
+                                       n->filename = $7;
+                                       n->options = $9;
 
                                        if (n->is_program && n->filename == NULL)
                                                ereport(ERROR,
index f1eb518de79e882831455f4a18a9975396a01a81..c0fc28373a93a7e7b9f13268eea67a4df18d05af 100644 (file)
  *
  * The documented syntax is:
  *     \copy tablename [(columnlist)] from|to filename [options]
- *     \copy ( select stmt ) to filename [options]
+ *     \copy ( query stmt ) to filename [options]
  *
  * where 'filename' can be one of the following:
  *     '<file path>' | PROGRAM '<command>' | stdin | stdout | pstdout | pstdout
+ * and 'query' can be one of the following:
+ *  SELECT | UPDATE | INSERT | DELETE
  *
  * An undocumented fact is that you can still write BINARY before the
  * tablename; this is a hangover from the pre-7.3 syntax.  The options
@@ -118,7 +120,7 @@ parse_slash_copy(const char *args)
                        goto error;
        }
 
-       /* Handle COPY (SELECT) case */
+       /* Handle COPY (query) case */
        if (token[0] == '(')
        {
                int                     parens = 1;
index 9e1c48d8ca4e93d1d6befef1b8e37aa8c47053bb..9142e94b070d9a16c0eacfe9b4b9231f5c4914c2 100644 (file)
@@ -1680,7 +1680,8 @@ typedef struct CopyStmt
 {
        NodeTag         type;
        RangeVar   *relation;           /* the relation to copy */
-       Node       *query;                      /* the SELECT query to copy */
+       Node       *query;                      /* the query (SELECT or DML statement with
+                                                                * RETURNING) to copy */
        List       *attlist;            /* List of column names (as Strings), or NIL
                                                                 * for all columns */
        bool            is_from;                /* TO or FROM */
diff --git a/src/test/regress/expected/copydml.out b/src/test/regress/expected/copydml.out
new file mode 100644 (file)
index 0000000..1b53396
--- /dev/null
@@ -0,0 +1,112 @@
+--
+-- Test cases for COPY (INSERT/UPDATE/DELETE) TO
+--
+create table copydml_test (id serial, t text);
+insert into copydml_test (t) values ('a');
+insert into copydml_test (t) values ('b');
+insert into copydml_test (t) values ('c');
+insert into copydml_test (t) values ('d');
+insert into copydml_test (t) values ('e');
+--
+-- Test COPY (insert/update/delete ...)
+--
+copy (insert into copydml_test (t) values ('f') returning id) to stdout;
+6
+copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
+6
+copy (delete from copydml_test where t = 'g' returning id) to stdout;
+6
+--
+-- Test \copy (insert/update/delete ...)
+--
+\copy (insert into copydml_test (t) values ('f') returning id) to stdout;
+7
+\copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
+7
+\copy (delete from copydml_test where t = 'g' returning id) to stdout;
+7
+-- Error cases
+copy (insert into copydml_test default values) to stdout;
+ERROR:  COPY query must have a RETURNING clause
+copy (update copydml_test set t = 'g') to stdout;
+ERROR:  COPY query must have a RETURNING clause
+copy (delete from copydml_test) to stdout;
+ERROR:  COPY query must have a RETURNING clause
+create rule qqq as on insert to copydml_test do instead nothing;
+copy (insert into copydml_test default values) to stdout;
+ERROR:  DO INSTEAD NOTHING rules are not supported for COPY
+drop rule qqq on copydml_test;
+create rule qqq as on insert to copydml_test do also delete from copydml_test;
+copy (insert into copydml_test default values) to stdout;
+ERROR:  DO ALSO rules are not supported for the COPY
+drop rule qqq on copydml_test;
+create rule qqq as on insert to copydml_test do instead (delete from copydml_test; delete from copydml_test);
+copy (insert into copydml_test default values) to stdout;
+ERROR:  multi-statement DO INSTEAD rules are not supported for COPY
+drop rule qqq on copydml_test;
+create rule qqq as on insert to copydml_test where new.t <> 'f' do instead delete from copydml_test;
+copy (insert into copydml_test default values) to stdout;
+ERROR:  conditional DO INSTEAD rules are not supported for COPY
+drop rule qqq on copydml_test;
+create rule qqq as on update to copydml_test do instead nothing;
+copy (update copydml_test set t = 'f') to stdout;
+ERROR:  DO INSTEAD NOTHING rules are not supported for COPY
+drop rule qqq on copydml_test;
+create rule qqq as on update to copydml_test do also delete from copydml_test;
+copy (update copydml_test set t = 'f') to stdout;
+ERROR:  DO ALSO rules are not supported for the COPY
+drop rule qqq on copydml_test;
+create rule qqq as on update to copydml_test do instead (delete from copydml_test; delete from copydml_test);
+copy (update copydml_test set t = 'f') to stdout;
+ERROR:  multi-statement DO INSTEAD rules are not supported for COPY
+drop rule qqq on copydml_test;
+create rule qqq as on update to copydml_test where new.t <> 'f' do instead delete from copydml_test;
+copy (update copydml_test set t = 'f') to stdout;
+ERROR:  conditional DO INSTEAD rules are not supported for COPY
+drop rule qqq on copydml_test;
+create rule qqq as on delete to copydml_test do instead nothing;
+copy (delete from copydml_test) to stdout;
+ERROR:  DO INSTEAD NOTHING rules are not supported for COPY
+drop rule qqq on copydml_test;
+create rule qqq as on delete to copydml_test do also insert into copydml_test default values;
+copy (delete from copydml_test) to stdout;
+ERROR:  DO ALSO rules are not supported for the COPY
+drop rule qqq on copydml_test;
+create rule qqq as on delete to copydml_test do instead (insert into copydml_test default values; insert into copydml_test default values);
+copy (delete from copydml_test) to stdout;
+ERROR:  multi-statement DO INSTEAD rules are not supported for COPY
+drop rule qqq on copydml_test;
+create rule qqq as on delete to copydml_test where old.t <> 'f' do instead insert into copydml_test default values;
+copy (delete from copydml_test) to stdout;
+ERROR:  conditional DO INSTEAD rules are not supported for COPY
+drop rule qqq on copydml_test;
+-- triggers
+create function qqq_trig() returns trigger as $$
+begin
+if tg_op in ('INSERT', 'UPDATE') then
+    raise notice '% %', tg_op, new.id;
+    return new;
+else
+    raise notice '% %', tg_op, old.id;
+    return old;
+end if;
+end
+$$ language plpgsql;
+create trigger qqqbef before insert or update or delete on copydml_test
+    for each row execute procedure qqq_trig();
+create trigger qqqaf after insert or update or delete on copydml_test
+    for each row execute procedure qqq_trig();
+copy (insert into copydml_test (t) values ('f') returning id) to stdout;
+NOTICE:  INSERT 8
+8
+NOTICE:  INSERT 8
+copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
+NOTICE:  UPDATE 8
+8
+NOTICE:  UPDATE 8
+copy (delete from copydml_test where t = 'g' returning id) to stdout;
+NOTICE:  DELETE 8
+8
+NOTICE:  DELETE 8
+drop table copydml_test;
+drop function qqq_trig();
index 3987b4c700fd1ca6dd3a2b62e31c69d7efa14a9c..b1bc7c716a3365e68dc6887a15b0b7599ab98484 100644 (file)
@@ -48,7 +48,7 @@ test: create_function_2
 # execute two copy tests parallel, to check that copy itself
 # is concurrent safe.
 # ----------
-test: copy copyselect
+test: copy copyselect copydml
 
 # ----------
 # More groups of parallel tests
index 379f2729be4951c097ddb20d002c13c1b50e6a19..ade9ef15530d442cabc669249e2acdf879394ad4 100644 (file)
@@ -57,6 +57,7 @@ test: create_table
 test: create_function_2
 test: copy
 test: copyselect
+test: copydml
 test: create_misc
 test: create_operator
 test: create_index
diff --git a/src/test/regress/sql/copydml.sql b/src/test/regress/sql/copydml.sql
new file mode 100644 (file)
index 0000000..9a29f9c
--- /dev/null
@@ -0,0 +1,91 @@
+--
+-- Test cases for COPY (INSERT/UPDATE/DELETE) TO
+--
+create table copydml_test (id serial, t text);
+insert into copydml_test (t) values ('a');
+insert into copydml_test (t) values ('b');
+insert into copydml_test (t) values ('c');
+insert into copydml_test (t) values ('d');
+insert into copydml_test (t) values ('e');
+
+--
+-- Test COPY (insert/update/delete ...)
+--
+copy (insert into copydml_test (t) values ('f') returning id) to stdout;
+copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
+copy (delete from copydml_test where t = 'g' returning id) to stdout;
+
+--
+-- Test \copy (insert/update/delete ...)
+--
+\copy (insert into copydml_test (t) values ('f') returning id) to stdout;
+\copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
+\copy (delete from copydml_test where t = 'g' returning id) to stdout;
+
+-- Error cases
+copy (insert into copydml_test default values) to stdout;
+copy (update copydml_test set t = 'g') to stdout;
+copy (delete from copydml_test) to stdout;
+
+create rule qqq as on insert to copydml_test do instead nothing;
+copy (insert into copydml_test default values) to stdout;
+drop rule qqq on copydml_test;
+create rule qqq as on insert to copydml_test do also delete from copydml_test;
+copy (insert into copydml_test default values) to stdout;
+drop rule qqq on copydml_test;
+create rule qqq as on insert to copydml_test do instead (delete from copydml_test; delete from copydml_test);
+copy (insert into copydml_test default values) to stdout;
+drop rule qqq on copydml_test;
+create rule qqq as on insert to copydml_test where new.t <> 'f' do instead delete from copydml_test;
+copy (insert into copydml_test default values) to stdout;
+drop rule qqq on copydml_test;
+
+create rule qqq as on update to copydml_test do instead nothing;
+copy (update copydml_test set t = 'f') to stdout;
+drop rule qqq on copydml_test;
+create rule qqq as on update to copydml_test do also delete from copydml_test;
+copy (update copydml_test set t = 'f') to stdout;
+drop rule qqq on copydml_test;
+create rule qqq as on update to copydml_test do instead (delete from copydml_test; delete from copydml_test);
+copy (update copydml_test set t = 'f') to stdout;
+drop rule qqq on copydml_test;
+create rule qqq as on update to copydml_test where new.t <> 'f' do instead delete from copydml_test;
+copy (update copydml_test set t = 'f') to stdout;
+drop rule qqq on copydml_test;
+
+create rule qqq as on delete to copydml_test do instead nothing;
+copy (delete from copydml_test) to stdout;
+drop rule qqq on copydml_test;
+create rule qqq as on delete to copydml_test do also insert into copydml_test default values;
+copy (delete from copydml_test) to stdout;
+drop rule qqq on copydml_test;
+create rule qqq as on delete to copydml_test do instead (insert into copydml_test default values; insert into copydml_test default values);
+copy (delete from copydml_test) to stdout;
+drop rule qqq on copydml_test;
+create rule qqq as on delete to copydml_test where old.t <> 'f' do instead insert into copydml_test default values;
+copy (delete from copydml_test) to stdout;
+drop rule qqq on copydml_test;
+
+-- triggers
+create function qqq_trig() returns trigger as $$
+begin
+if tg_op in ('INSERT', 'UPDATE') then
+    raise notice '% %', tg_op, new.id;
+    return new;
+else
+    raise notice '% %', tg_op, old.id;
+    return old;
+end if;
+end
+$$ language plpgsql;
+create trigger qqqbef before insert or update or delete on copydml_test
+    for each row execute procedure qqq_trig();
+create trigger qqqaf after insert or update or delete on copydml_test
+    for each row execute procedure qqq_trig();
+
+copy (insert into copydml_test (t) values ('f') returning id) to stdout;
+copy (update copydml_test set t = 'g' where t = 'f' returning id) to stdout;
+copy (delete from copydml_test where t = 'g' returning id) to stdout;
+
+drop table copydml_test;
+drop function qqq_trig();