*
* CSTATE_START_COMMAND starts the execution of a command. On a SQL
* command, the command is sent to the server, and we move to
- * CSTATE_WAIT_RESULT state. On a \sleep meta-command, the timer is set,
- * and we enter the CSTATE_SLEEP state to wait for it to expire. Other
- * meta-commands are executed immediately. If the command about to start
- * is actually beyond the end of the script, advance to CSTATE_END_TX.
+ * CSTATE_WAIT_RESULT state unless in pipeline mode. On a \sleep
+ * meta-command, the timer is set, and we enter the CSTATE_SLEEP state to
+ * wait for it to expire. Other meta-commands are executed immediately. If
+ * the command about to start is actually beyond the end of the script,
+ * advance to CSTATE_END_TX.
*
* CSTATE_WAIT_RESULT waits until we get a result set back from the server
* for the current command.
META_IF, /* \if */
META_ELIF, /* \elif */
META_ELSE, /* \else */
- META_ENDIF /* \endif */
+ META_ENDIF, /* \endif */
+ META_STARTPIPELINE, /* \startpipeline */
+ META_ENDPIPELINE /* \endpipeline */
} MetaCommand;
typedef enum QueryMode
mc = META_GSET;
else if (pg_strcasecmp(cmd, "aset") == 0)
mc = META_ASET;
+ else if (pg_strcasecmp(cmd, "startpipeline") == 0)
+ mc = META_STARTPIPELINE;
+ else if (pg_strcasecmp(cmd, "endpipeline") == 0)
+ mc = META_ENDPIPELINE;
else
mc = META_NONE;
return mc;
if (commands[j]->type != SQL_COMMAND)
continue;
preparedStatementName(name, st->use_file, j);
- res = PQprepare(st->con, name,
- commands[j]->argv[0], commands[j]->argc - 1, NULL);
- if (PQresultStatus(res) != PGRES_COMMAND_OK)
- pg_log_error("%s", PQerrorMessage(st->con));
- PQclear(res);
+ if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
+ {
+ res = PQprepare(st->con, name,
+ commands[j]->argv[0], commands[j]->argc - 1, NULL);
+ if (PQresultStatus(res) != PGRES_COMMAND_OK)
+ pg_log_error("%s", PQerrorMessage(st->con));
+ PQclear(res);
+ }
+ else
+ {
+ /*
+ * In pipeline mode, we use asynchronous functions. If a
+ * server-side error occurs, it will be processed later
+ * among the other results.
+ */
+ if (!PQsendPrepare(st->con, name,
+ commands[j]->argv[0], commands[j]->argc - 1, NULL))
+ pg_log_error("%s", PQerrorMessage(st->con));
+ }
}
st->prepared[st->use_file] = true;
}
int qrynum = 0;
/*
- * varprefix should be set only with \gset or \aset, and SQL commands do
- * not need it.
+ * varprefix should be set only with \gset or \aset, and \endpipeline and
+ * SQL commands do not need it.
*/
Assert((meta == META_NONE && varprefix == NULL) ||
+ ((meta == META_ENDPIPELINE) && varprefix == NULL) ||
((meta == META_GSET || meta == META_ASET) && varprefix != NULL));
res = PQgetResult(st->con);
/* otherwise the result is simply thrown away by PQclear below */
break;
+ case PGRES_PIPELINE_SYNC:
+ pg_log_debug("client %d pipeline ending", st->id);
+ if (PQexitPipelineMode(st->con) != 1)
+ pg_log_error("client %d failed to exit pipeline mode: %s", st->id,
+ PQerrorMessage(st->con));
+ break;
+
default:
/* anything else is unexpected */
pg_log_error("client %d script %d aborted in command %d query %d: %s",
/* Execute the command */
if (command->type == SQL_COMMAND)
{
+ /* disallow \aset and \gset in pipeline mode */
+ if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
+ {
+ if (command->meta == META_GSET)
+ {
+ commandFailed(st, "gset", "\\gset is not allowed in pipeline mode");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ else if (command->meta == META_ASET)
+ {
+ commandFailed(st, "aset", "\\aset is not allowed in pipeline mode");
+ st->state = CSTATE_ABORTED;
+ break;
+ }
+ }
+
if (!sendCommand(st, command))
{
commandFailed(st, "SQL", "SQL command send failed");
st->state = CSTATE_ABORTED;
}
else
- st->state = CSTATE_WAIT_RESULT;
+ {
+ /* Wait for results, unless in pipeline mode */
+ if (PQpipelineStatus(st->con) == PQ_PIPELINE_OFF)
+ st->state = CSTATE_WAIT_RESULT;
+ else
+ st->state = CSTATE_END_COMMAND;
+ }
}
else if (command->type == META_COMMAND)
{
if (readCommandResponse(st,
sql_script[st->use_file].commands[st->command]->meta,
sql_script[st->use_file].commands[st->command]->varprefix))
- st->state = CSTATE_END_COMMAND;
+ {
+ /*
+ * outside of pipeline mode: stop reading results.
+ * pipeline mode: continue reading results until an
+ * end-of-pipeline response.
+ */
+ if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+ st->state = CSTATE_END_COMMAND;
+ }
else
st->state = CSTATE_ABORTED;
break;
return CSTATE_ABORTED;
}
}
+ else if (command->meta == META_STARTPIPELINE)
+ {
+ /*
+ * In pipeline mode, we use a workflow based on libpq pipeline
+ * functions.
+ */
+ if (querymode == QUERY_SIMPLE)
+ {
+ commandFailed(st, "startpipeline", "cannot use pipeline mode with the simple query protocol");
+ return CSTATE_ABORTED;
+ }
+
+ if (PQpipelineStatus(st->con) != PQ_PIPELINE_OFF)
+ {
+ commandFailed(st, "startpipeline", "already in pipeline mode");
+ return CSTATE_ABORTED;
+ }
+ if (PQenterPipelineMode(st->con) == 0)
+ {
+ commandFailed(st, "startpipeline", "failed to enter pipeline mode");
+ return CSTATE_ABORTED;
+ }
+ }
+ else if (command->meta == META_ENDPIPELINE)
+ {
+ if (PQpipelineStatus(st->con) != PQ_PIPELINE_ON)
+ {
+ commandFailed(st, "endpipeline", "not in pipeline mode");
+ return CSTATE_ABORTED;
+ }
+ if (!PQpipelineSync(st->con))
+ {
+ commandFailed(st, "endpipeline", "failed to send a pipeline sync");
+ return CSTATE_ABORTED;
+ }
+ /* Now wait for the PGRES_PIPELINE_SYNC and exit pipeline mode there */
+ /* collect pending results before getting out of pipeline mode */
+ return CSTATE_WAIT_RESULT;
+ }
/*
* executing the expression or shell command might have taken a
syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
"missing command", NULL, -1);
}
- else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF)
+ else if (my_command->meta == META_ELSE || my_command->meta == META_ENDIF ||
+ my_command->meta == META_STARTPIPELINE ||
+ my_command->meta == META_ENDPIPELINE)
{
if (my_command->argc != 1)
syntax_error(source, lineno, my_command->first_line, my_command->argv[0],
# filenames are expected to be unique on a test
if (-e $filename)
{
- ok(0, "$filename must not already exists");
+ ok(0, "$filename must not already exist");
unlink $filename or die "cannot unlink $filename: $!";
}
append_to_file($filename, $$files{$fn});
}
});
+# Working \startpipeline
+pgbench(
+ '-t 1 -n -M extended',
+ 0,
+ [ qr{type: .*/001_pgbench_pipeline}, qr{actually processed: 1/1} ],
+ [],
+ 'working \startpipeline',
+ {
+ '001_pgbench_pipeline' => q{
+-- test startpipeline
+\startpipeline
+} . "select 1;\n" x 10 . q{
+\endpipeline
+}
+ });
+
+# Working \startpipeline in prepared query mode
+pgbench(
+ '-t 1 -n -M prepared',
+ 0,
+ [ qr{type: .*/001_pgbench_pipeline_prep}, qr{actually processed: 1/1} ],
+ [],
+ 'working \startpipeline',
+ {
+ '001_pgbench_pipeline_prep' => q{
+-- test startpipeline
+\startpipeline
+} . "select 1;\n" x 10 . q{
+\endpipeline
+}
+ });
+
+# Try \startpipeline twice
+pgbench(
+ '-t 1 -n -M extended',
+ 2,
+ [],
+ [qr{already in pipeline mode}],
+ 'error: call \startpipeline twice',
+ {
+ '001_pgbench_pipeline_2' => q{
+-- startpipeline twice
+\startpipeline
+\startpipeline
+}
+ });
+
+# Try to end a pipeline that hasn't started
+pgbench(
+ '-t 1 -n -M extended',
+ 2,
+ [],
+ [qr{not in pipeline mode}],
+ 'error: \endpipeline with no start',
+ {
+ '001_pgbench_pipeline_3' => q{
+-- pipeline not started
+\endpipeline
+}
+ });
+
+# Try \gset in pipeline mode
+pgbench(
+ '-t 1 -n -M extended',
+ 2,
+ [],
+ [qr{gset is not allowed in pipeline mode}],
+ 'error: \gset not allowed in pipeline mode',
+ {
+ '001_pgbench_pipeline_4' => q{
+\startpipeline
+select 1 \gset f
+\endpipeline
+}
+ });
+
+
# trigger many expression errors
my @errors = (