Add libpq pipeline mode support to pgbench
author: Alvaro Herrera <[email protected]>
Mon, 15 Mar 2021 21:33:03 +0000 (18:33 -0300)
committer: Alvaro Herrera <[email protected]>
Mon, 15 Mar 2021 21:33:03 +0000 (18:33 -0300)
New metacommands \startpipeline and \endpipeline allow the user to run
queries in libpq pipeline mode.

Author: Daniel Vérité <[email protected]>
Reviewed-by: Álvaro Herrera <[email protected]>
Discussion: https://postgr.es/m/b4e34135-2bd9-4b8a-94ca-27d760da26d7@manitou-mail.org

doc/src/sgml/ref/pgbench.sgml
src/bin/pgbench/pgbench.c
src/bin/pgbench/t/001_pgbench_with_server.pl

index 299d93b24198ec3b6440ef76b230195b6444f28a..50cf22ba6baddf4c9efd08c95f139db0283382c5 100644 (file)
@@ -1110,6 +1110,12 @@ pgbench <optional> <replaceable>options</replaceable> </optional> <replaceable>d
       row, the last value is kept.
      </para>
 
+     <para>
+      <literal>\gset</literal> and <literal>\aset</literal> cannot be used in
+      pipeline mode, since the query results are not yet available by the time
+      the commands would need them.
+     </para>
+
      <para>
       The following example puts the final account balance from the first query
       into variable <replaceable>abalance</replaceable>, and fills variables
@@ -1270,6 +1276,22 @@ SELECT 4 AS four \; SELECT 5 AS five \aset
 </programlisting></para>
     </listitem>
    </varlistentry>
+
+   <varlistentry id='pgbench-metacommand-pipeline'>
+    <term><literal>\startpipeline</literal></term>
+    <term><literal>\endpipeline</literal></term>
+
+    <listitem>
+      <para>
+        These commands delimit the start and end of a pipeline of SQL
+        statements.  In pipeline mode, statements are sent to the server
+        without waiting for the results of previous statements.  See
+        <xref linkend="libpq-pipeline-mode"/> for more details.
+        Pipeline mode requires the use of the extended query protocol.
+     </para>
+    </listitem>
+   </varlistentry>
+
   </variablelist>
  </refsect2>
 
index f6a214669c1a312227b33891ea0ccd1742dba735..e69d43b26ba91d95fe332de74725e16577cc46b4 100644 (file)
@@ -395,10 +395,11 @@ typedef enum
     *
     * CSTATE_START_COMMAND starts the execution of a command.  On a SQL
     * command, the command is sent to the server, and we move to
-    * CSTATE_WAIT_RESULT state.  On a \sleep meta-command, the timer is set,
-    * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
-    * meta-commands are executed immediately.  If the command about to start
-    * is actually beyond the end of the script, advance to CSTATE_END_TX.
+    * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep
+    * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to
+    * wait for it to expire. Other meta-commands are executed immediately. If
+    * the command about to start is actually beyond the end of the script,
+    * advance to CSTATE_END_TX.
     *
     * CSTATE_WAIT_RESULT waits until we get a result set back from the server
     * for the current command.
@@ -530,7 +531,9 @@ typedef enum MetaCommand
    META_IF,                    /* \if */
    META_ELIF,                  /* \elif */
    META_ELSE,                  /* \else */
-   META_ENDIF                  /* \endif */
+   META_ENDIF,                 /* \endif */
+   META_STARTPIPELINE,         /* \startpipeline */
+   META_ENDPIPELINE            /* \endpipeline */
 } MetaCommand;
 
 typedef enum QueryMode
@@ -2568,6 +2571,10 @@ getMetaCommand(const char *cmd)
        mc = META_GSET;
    else if (pg_strcasecmp(cmd, "aset") == 0)
        mc = META_ASET;
+   else if (pg_strcasecmp(cmd, "startpipeline") == 0)
+       mc = META_STARTPIPELINE;
+   else if (pg_strcasecmp(cmd, "endpipeline") == 0)
+       mc = META_ENDPIPELINE;
    else
        mc = META_NONE;
    return mc;
@@ -2757,11 +2764,25 @@ sendCommand(CState *st, Command *command)
                if (commands[j]->type != SQL_COMMAND)
                    continue;
                preparedStatementName(name, st->use_file, j);
-               res = PQprepare(st->con, name,
-                               commands[j]->argv[0], commands[j]->argc - 1, NULL);
-               if (PQresultStatus(res) != PGRES_COMMAND_OK)
-                   pg_log_error("%s", PQerrorMessage(st->con));
-               PQclear(res);
+               if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
+               {
+                   res = PQprepare(st->con, name,
+                                   commands[j]->argv[0], commands[j]->argc - 1, NULL);
+                   if (PQresultStatus(res) != PGRES_COMMAND_OK)
+                       pg_log_error("%s", PQerrorMessage(st->con));
+                   PQclear(res);
+               }
+               else
+               {
+                   /*
+                    * In pipeline mode, we use asynchronous functions. If a
+                    * server-side error occurs, it will be processed later
+                    * among the other results.
+                    */
+                   if (!PQsendPrepare(st->con, name,
+                                      commands[j]->argv[0], commands[j]->argc - 1, NULL))
+                       pg_log_error("%s", PQerrorMessage(st->con));
+               }
            }
            st->prepared[st->use_file] = true;
        }
@@ -2802,10 +2823,11 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
    int         qrynum = 0;
 
    /*
-    * varprefix should be set only with \gset or \aset, and SQL commands do
-    * not need it.
+    * varprefix should be set only with \gset or \aset; \endpipeline and
+    * SQL commands do not need it.
     */
    Assert((meta == META_NONE && varprefix == NULL) ||
+          ((meta == META_ENDPIPELINE) && varprefix == NULL) ||
           ((meta == META_GSET || meta == META_ASET) && varprefix != NULL));
 
    res = PQgetResult(st->con);
@@ -2874,6 +2896,13 @@ readCommandResponse(CState *st, MetaCommand meta, char *varprefix)
                /* otherwise the result is simply thrown away by PQclear below */
                break;
 
+           case PGRES_PIPELINE_SYNC:
+               pg_log_debug("client %d pipeline ending", st->id);
+               if (PQexitPipelineMode(st->con) != 1)
+                   pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
+                                PQerrorMessage(st->con));
+               break;
+
            default:
                /* anything else is unexpected */
                pg_log_error("client %d script %d aborted in command %d query %d: %s",
@@ -3127,13 +3156,36 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
                /* Execute the command */
                if (command->type == SQL_COMMAND)
                {
+                   /* disallow \aset and \gset in pipeline mode */
+                   if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
+                   {
+                       if (command->meta == META_GSET)
+                       {
+                           commandFailed(st, "gset", "\\gset is not allowed in pipeline mode");
+                           st->state = CSTATE_ABORTED;
+                           break;
+                       }
+                       else if (command->meta == META_ASET)
+                       {
+                           commandFailed(st, "aset", "\\aset is not allowed in pipeline mode");
+                           st->state = CSTATE_ABORTED;
+                           break;
+                       }
+                   }
+
                    if (!sendCommand(st, command))
                    {
                        commandFailed(st, "SQL", "SQL command send failed");
                        st->state = CSTATE_ABORTED;
                    }
                    else
-                       st->state = CSTATE_WAIT_RESULT;
+                   {
+                       /* Wait for results, unless in pipeline mode */
+                       if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
+                           st->state = CSTATE_WAIT_RESULT;
+                       else
+                           st->state = CSTATE_END_COMMAND;
+                   }
                }
                else if (command->type == META_COMMAND)
                {
@@ -3273,7 +3325,15 @@ advanceConnectionState(TState *thread, CState *st, StatsData *agg)
                if (readCommandResponse(st,
                                        sql_script[st->use_file].commands[st->command]->meta,
                                        sql_script[st->use_file].commands[st->command]->varprefix))
-                   st->state = CSTATE_END_COMMAND;
+               {
+                   /*
+                    * outside of pipeline mode: stop reading results.
+                    * pipeline mode: continue reading results until an
+                    * end-of-pipeline response.
+                    */
+                   if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+                       st->state = CSTATE_END_COMMAND;
+               }
                else
                    st->state = CSTATE_ABORTED;
                break;
@@ -3516,6 +3576,45 @@ executeMetaCommand(CState *st, pg_time_usec_t *now)
            return CSTATE_ABORTED;
        }
    }
+   else if (command->meta == META_STARTPIPELINE)
+   {
+       /*
+        * In pipeline mode, we use a workflow based on libpq pipeline
+        * functions.
+        */
+       if (querymode == QUERY_SIMPLE)
+       {
+           commandFailed(st, "startpipeline", "cannot use pipeline mode with the simple query protocol");
+           return CSTATE_ABORTED;
+       }
+
+       if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
+       {
+           commandFailed(st, "startpipeline", "already in pipeline mode");
+           return CSTATE_ABORTED;
+       }
+       if (PQenterPipelineMode(st->con) == 0)
+       {
+           commandFailed(st, "startpipeline", "failed to enter pipeline mode");
+           return CSTATE_ABORTED;
+       }
+   }
+   else if (command->meta == META_ENDPIPELINE)
+   {
+       if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+       {
+           commandFailed(st, "endpipeline", "not in pipeline mode");
+           return CSTATE_ABORTED;
+       }
+       if (!PQpipelineSync(st->con))
+       {
+           commandFailed(st, "endpipeline", "failed to send a pipeline sync");
+           return CSTATE_ABORTED;
+       }
+       /* Collect the pending results until PGRES_PIPELINE_SYNC arrives; */
+       /* pipeline mode is exited when that sync result is read. */
+       return CSTATE_WAIT_RESULT;
+   }
 
    /*
     * executing the expression or shell command might have taken a
@@ -4725,7 +4824,9 @@ process_backslash_command(PsqlScanState sstate, const char *source)
            syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
                         "missing command", NULL, -1);
    }
-   else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF)
+   else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
+            my_command->meta == META_STARTPIPELINE ||
+            my_command->meta == META_ENDPIPELINE)
    {
        if (my_command->argc != 1)
            syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
index daffc18e521945cd6a953d6272edcae5b6073f52..d11c4e8c242e0e0772c28d78d24ac5e70b23538b 100644 (file)
@@ -41,7 +41,7 @@ sub pgbench
            # filenames are expected to be unique on a test
            if (-e $filename)
            {
-               ok(0, "$filename must not already exists");
+               ok(0, "$filename must not already exist");
                unlink $filename or die "cannot unlink $filename: $!";
            }
            append_to_file($filename, $$files{$fn});
@@ -755,6 +755,83 @@ pgbench(
 }
    });
 
+# Working \startpipeline
+pgbench(
+   '-t 1 -n -M extended',
+   0,
+   [ qr{type: .*/001_pgbench_pipeline}, qr{actually processed: 1/1} ],
+   [],
+   'working \startpipeline',
+   {
+       '001_pgbench_pipeline' => q{
+-- test startpipeline
+\startpipeline
+} . "select 1;\n" x 10 . q{
+\endpipeline
+}
+   });
+
+# Working \startpipeline in prepared query mode
+pgbench(
+   '-t 1 -n -M prepared',
+   0,
+   [ qr{type: .*/001_pgbench_pipeline_prep}, qr{actually processed: 1/1} ],
+   [],
+   'working \startpipeline',
+   {
+       '001_pgbench_pipeline_prep' => q{
+-- test startpipeline
+\startpipeline
+} . "select 1;\n" x 10 . q{
+\endpipeline
+}
+   });
+
+# Try \startpipeline twice
+pgbench(
+   '-t 1 -n -M extended',
+   2,
+   [],
+   [qr{already in pipeline mode}],
+   'error: call \startpipeline twice',
+   {
+       '001_pgbench_pipeline_2' => q{
+-- startpipeline twice
+\startpipeline
+\startpipeline
+}
+   });
+
+# Try to end a pipeline that hasn't started
+pgbench(
+   '-t 1 -n -M extended',
+   2,
+   [],
+   [qr{not in pipeline mode}],
+   'error: \endpipeline with no start',
+   {
+       '001_pgbench_pipeline_3' => q{
+-- pipeline not started
+\endpipeline
+}
+   });
+
+# Try \gset in pipeline mode
+pgbench(
+   '-t 1 -n -M extended',
+   2,
+   [],
+   [qr{gset is not allowed in pipeline mode}],
+   'error: \gset not allowed in pipeline mode',
+   {
+       '001_pgbench_pipeline_4' => q{
+\startpipeline
+select 1 \gset f
+\endpipeline
+}
+   });
+
+
 # trigger many expression errors
 my @errors = (