Add support for asynchronous execution.
authorEtsuro Fujita <[email protected]>
Wed, 31 Mar 2021 09:45:00 +0000 (18:45 +0900)
committerEtsuro Fujita <[email protected]>
Wed, 31 Mar 2021 09:45:00 +0000 (18:45 +0900)
This implements asynchronous execution, which runs multiple parts of a
non-parallel-aware Append concurrently rather than serially to improve
performance when possible.  Currently, the only node type that can be
run concurrently is a ForeignScan that is an immediate child of such an
Append.  In the case where such ForeignScans access data on different
remote servers, this would run those ForeignScans concurrently, and
overlap the remote operations to be performed simultaneously, so it'll
improve the performance especially when the operations involve
time-consuming ones such as remote join and remote aggregation.

We may extend this to other node types such as joins or aggregates over
ForeignScans in the future.

This also adds the support for postgres_fdw, which is enabled by the
table-level/server-level option "async_capable".  The default is false.

Robert Haas, Kyotaro Horiguchi, Thomas Munro, and myself.  This commit
is mostly based on the patch proposed by Robert Haas, but also uses
stuff from the patch proposed by Kyotaro Horiguchi and from the patch
proposed by Thomas Munro.  Reviewed by Kyotaro Horiguchi, Konstantin
Knizhnik, Andrey Lepikhov, Movead Li, Thomas Munro, Justin Pryzby, and
others.

Discussion: https://postgr.es/m/CA%2BTgmoaXQEt4tZ03FtQhnzeDEMzBck%2BLrni0UWHVVgOTnA6C1w%40mail.gmail.com
Discussion: https://postgr.es/m/CA%2BhUKGLBRyu0rHrDCMC4%3DRn3252gogyp1SjOgG8SEKKZv%3DFwfQ%40mail.gmail.com
Discussion: https://postgr.es/m/20200228.170650.667613673625155850.horikyota.ntt%40gmail.com

39 files changed:
contrib/postgres_fdw/connection.c
contrib/postgres_fdw/expected/postgres_fdw.out
contrib/postgres_fdw/option.c
contrib/postgres_fdw/postgres_fdw.c
contrib/postgres_fdw/postgres_fdw.h
contrib/postgres_fdw/sql/postgres_fdw.sql
doc/src/sgml/config.sgml
doc/src/sgml/fdwhandler.sgml
doc/src/sgml/monitoring.sgml
doc/src/sgml/postgres-fdw.sgml
src/backend/commands/explain.c
src/backend/executor/Makefile
src/backend/executor/README
src/backend/executor/execAmi.c
src/backend/executor/execAsync.c [new file with mode: 0644]
src/backend/executor/nodeAppend.c
src/backend/executor/nodeForeignscan.c
src/backend/nodes/copyfuncs.c
src/backend/nodes/outfuncs.c
src/backend/nodes/readfuncs.c
src/backend/optimizer/path/costsize.c
src/backend/optimizer/plan/createplan.c
src/backend/postmaster/pgstat.c
src/backend/storage/ipc/latch.c
src/backend/utils/misc/guc.c
src/backend/utils/misc/postgresql.conf.sample
src/include/executor/execAsync.h [new file with mode: 0644]
src/include/executor/nodeAppend.h
src/include/executor/nodeForeignscan.h
src/include/foreign/fdwapi.h
src/include/nodes/execnodes.h
src/include/nodes/plannodes.h
src/include/optimizer/cost.h
src/include/pgstat.h
src/include/storage/latch.h
src/test/regress/expected/explain.out
src/test/regress/expected/incremental_sort.out
src/test/regress/expected/insert_conflict.out
src/test/regress/expected/sysviews.out

index ee0b4acf0bad6b717f428610dc038765ded7a305..54ab8edfab6013c97d6de9e72ae2b28261eb606a 100644 (file)
@@ -62,6 +62,7 @@ typedef struct ConnCacheEntry
    Oid         serverid;       /* foreign server OID used to get server name */
    uint32      server_hashvalue;   /* hash value of foreign server OID */
    uint32      mapping_hashvalue;  /* hash value of user mapping OID */
+   PgFdwConnState state;       /* extra per-connection state */
 } ConnCacheEntry;
 
 /*
@@ -115,9 +116,12 @@ static bool disconnect_cached_connections(Oid serverid);
  * will_prep_stmt must be true if caller intends to create any prepared
  * statements.  Since those don't go away automatically at transaction end
  * (not even on error), we need this flag to cue manual cleanup.
+ *
+ * If state is not NULL, *state receives the per-connection state associated
+ * with the PGconn.
  */
 PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
 {
    bool        found;
    bool        retry = false;
@@ -196,6 +200,9 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
     */
    PG_TRY();
    {
+       /* Process a pending asynchronous request if any. */
+       if (entry->state.pendingAreq)
+           process_pending_request(entry->state.pendingAreq);
        /* Start a new transaction or subtransaction if needed. */
        begin_remote_xact(entry);
    }
@@ -264,6 +271,10 @@ GetConnection(UserMapping *user, bool will_prep_stmt)
    /* Remember if caller will prepare statements */
    entry->have_prep_stmt |= will_prep_stmt;
 
+   /* If caller needs access to the per-connection state, return it. */
+   if (state)
+       *state = &entry->state;
+
    return entry->conn;
 }
 
@@ -291,6 +302,7 @@ make_new_connection(ConnCacheEntry *entry, UserMapping *user)
    entry->mapping_hashvalue =
        GetSysCacheHashValue1(USERMAPPINGOID,
                              ObjectIdGetDatum(user->umid));
+   memset(&entry->state, 0, sizeof(entry->state));
 
    /* Now try to make the connection */
    entry->conn = connect_pg_server(server, user);
@@ -648,8 +660,12 @@ GetPrepStmtNumber(PGconn *conn)
  * Caller is responsible for the error handling on the result.
  */
 PGresult *
-pgfdw_exec_query(PGconn *conn, const char *query)
+pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
 {
+   /* First, process a pending asynchronous request, if any. */
+   if (state && state->pendingAreq)
+       process_pending_request(state->pendingAreq);
+
    /*
     * Submit a query.  Since we don't use non-blocking mode, this also can
     * block.  But its risk is relatively small, so we ignore that for now.
@@ -940,6 +956,8 @@ pgfdw_xact_callback(XactEvent event, void *arg)
                    {
                        entry->have_prep_stmt = false;
                        entry->have_error = false;
+                       /* Also reset per-connection state */
+                       memset(&entry->state, 0, sizeof(entry->state));
                    }
 
                    /* Disarm changing_xact_state if it all worked. */
@@ -1172,6 +1190,10 @@ pgfdw_reject_incomplete_xact_state_change(ConnCacheEntry *entry)
  * Cancel the currently-in-progress query (whose query text we do not have)
  * and ignore the result.  Returns true if we successfully cancel the query
  * and discard any pending result, and false if not.
+ *
+ * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
+ * query text from the pendingAreq saved in the per-connection state, then
+ * report the query using it.
  */
 static bool
 pgfdw_cancel_query(PGconn *conn)
index f2c91c4782756cc4a115b3581b537a78b04f1599..f61e59cd200c31448d2a28631287361d6e04c989 100644 (file)
@@ -8946,7 +8946,7 @@ DO $d$
     END;
 $d$;
 ERROR:  invalid option "password"
-HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size
+HINT:  Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size, async_capable
 CONTEXT:  SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
 PL/pgSQL function inline_code_block line 3 at EXECUTE
 -- If we add a password for our user mapping instead, we should get a different
@@ -9437,3 +9437,510 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test;
 
 -- Clean up
 DROP TABLE batch_table, batch_cp_upd_test CASCADE;
+-- ===================================================================
+-- test asynchronous execution
+-- ===================================================================
+ALTER SERVER loopback OPTIONS (DROP extensions);
+ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
+ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true');
+CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a);
+CREATE TABLE base_tbl1 (a int, b int, c text);
+CREATE TABLE base_tbl2 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000)
+  SERVER loopback OPTIONS (table_name 'base_tbl1');
+CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000)
+  SERVER loopback2 OPTIONS (table_name 'base_tbl2');
+INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+-- simple queries
+CREATE TABLE result_tbl (a int, b int, c text);
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+                                       QUERY PLAN                                       
+----------------------------------------------------------------------------------------
+ Insert on public.result_tbl
+   ->  Append
+         ->  Async Foreign Scan on public.async_p1 async_pt_1
+               Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+               Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0))
+         ->  Async Foreign Scan on public.async_p2 async_pt_2
+               Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+               Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0))
+(8 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+SELECT * FROM result_tbl ORDER BY a;
+  a   |  b  |  c   
+------+-----+------
+ 1000 |   0 | 0000
+ 1100 | 100 | 0100
+ 1200 | 200 | 0200
+ 1300 | 300 | 0300
+ 1400 | 400 | 0400
+ 1500 | 500 | 0500
+ 1600 | 600 | 0600
+ 1700 | 700 | 0700
+ 1800 | 800 | 0800
+ 1900 | 900 | 0900
+ 2000 |   0 | 0000
+ 2100 | 100 | 0100
+ 2200 | 200 | 0200
+ 2300 | 300 | 0300
+ 2400 | 400 | 0400
+ 2500 | 500 | 0500
+ 2600 | 600 | 0600
+ 2700 | 700 | 0700
+ 2800 | 800 | 0800
+ 2900 | 900 | 0900
+(20 rows)
+
+DELETE FROM result_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Insert on public.result_tbl
+   ->  Append
+         ->  Async Foreign Scan on public.async_p1 async_pt_1
+               Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+               Filter: (async_pt_1.b === 505)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl1
+         ->  Async Foreign Scan on public.async_p2 async_pt_2
+               Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+               Filter: (async_pt_2.b === 505)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl2
+(10 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+SELECT * FROM result_tbl ORDER BY a;
+  a   |  b  |  c   
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+(2 rows)
+
+DELETE FROM result_tbl;
+-- Check case where multiple partitions use the same connection
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
+  SERVER loopback2 OPTIONS (table_name 'base_tbl3');
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Insert on public.result_tbl
+   ->  Append
+         ->  Async Foreign Scan on public.async_p1 async_pt_1
+               Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+               Filter: (async_pt_1.b === 505)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl1
+         ->  Async Foreign Scan on public.async_p2 async_pt_2
+               Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+               Filter: (async_pt_2.b === 505)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl2
+         ->  Async Foreign Scan on public.async_p3 async_pt_3
+               Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+               Filter: (async_pt_3.b === 505)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl3
+(14 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+SELECT * FROM result_tbl ORDER BY a;
+  a   |  b  |  c   
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
+DELETE FROM result_tbl;
+DROP FOREIGN TABLE async_p3;
+DROP TABLE base_tbl3;
+-- Check case where the partitioned table has local/remote partitions
+CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000);
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Insert on public.result_tbl
+   ->  Append
+         ->  Async Foreign Scan on public.async_p1 async_pt_1
+               Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+               Filter: (async_pt_1.b === 505)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl1
+         ->  Async Foreign Scan on public.async_p2 async_pt_2
+               Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+               Filter: (async_pt_2.b === 505)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl2
+         ->  Seq Scan on public.async_p3 async_pt_3
+               Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+               Filter: (async_pt_3.b === 505)
+(13 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+SELECT * FROM result_tbl ORDER BY a;
+  a   |  b  |  c   
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
+DELETE FROM result_tbl;
+-- partitionwise joins
+SET enable_partitionwise_join TO true;
+CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+                                                                                           QUERY PLAN                                                                                            
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Insert on public.join_tbl
+   ->  Append
+         ->  Async Foreign Scan
+               Output: t1_1.a, t1_1.b, t1_1.c, t2_1.a, t2_1.b, t2_1.c
+               Relations: (public.async_p1 t1_1) INNER JOIN (public.async_p1 t2_1)
+               Remote SQL: SELECT r5.a, r5.b, r5.c, r8.a, r8.b, r8.c FROM (public.base_tbl1 r5 INNER JOIN public.base_tbl1 r8 ON (((r5.a = r8.a)) AND ((r5.b = r8.b)) AND (((r5.b % 100) = 0))))
+         ->  Async Foreign Scan
+               Output: t1_2.a, t1_2.b, t1_2.c, t2_2.a, t2_2.b, t2_2.c
+               Relations: (public.async_p2 t1_2) INNER JOIN (public.async_p2 t2_2)
+               Remote SQL: SELECT r6.a, r6.b, r6.c, r9.a, r9.b, r9.c FROM (public.base_tbl2 r6 INNER JOIN public.base_tbl2 r9 ON (((r6.a = r9.a)) AND ((r6.b = r9.b)) AND (((r6.b % 100) = 0))))
+         ->  Hash Join
+               Output: t1_3.a, t1_3.b, t1_3.c, t2_3.a, t2_3.b, t2_3.c
+               Hash Cond: ((t2_3.a = t1_3.a) AND (t2_3.b = t1_3.b))
+               ->  Seq Scan on public.async_p3 t2_3
+                     Output: t2_3.a, t2_3.b, t2_3.c
+               ->  Hash
+                     Output: t1_3.a, t1_3.b, t1_3.c
+                     ->  Seq Scan on public.async_p3 t1_3
+                           Output: t1_3.a, t1_3.b, t1_3.c
+                           Filter: ((t1_3.b % 100) = 0)
+(20 rows)
+
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+SELECT * FROM join_tbl ORDER BY a1;
+  a1  | b1  |  c1  |  a2  | b2  |  c2  
+------+-----+------+------+-----+------
+ 1000 |   0 | 0000 | 1000 |   0 | 0000
+ 1100 | 100 | 0100 | 1100 | 100 | 0100
+ 1200 | 200 | 0200 | 1200 | 200 | 0200
+ 1300 | 300 | 0300 | 1300 | 300 | 0300
+ 1400 | 400 | 0400 | 1400 | 400 | 0400
+ 1500 | 500 | 0500 | 1500 | 500 | 0500
+ 1600 | 600 | 0600 | 1600 | 600 | 0600
+ 1700 | 700 | 0700 | 1700 | 700 | 0700
+ 1800 | 800 | 0800 | 1800 | 800 | 0800
+ 1900 | 900 | 0900 | 1900 | 900 | 0900
+ 2000 |   0 | 0000 | 2000 |   0 | 0000
+ 2100 | 100 | 0100 | 2100 | 100 | 0100
+ 2200 | 200 | 0200 | 2200 | 200 | 0200
+ 2300 | 300 | 0300 | 2300 | 300 | 0300
+ 2400 | 400 | 0400 | 2400 | 400 | 0400
+ 2500 | 500 | 0500 | 2500 | 500 | 0500
+ 2600 | 600 | 0600 | 2600 | 600 | 0600
+ 2700 | 700 | 0700 | 2700 | 700 | 0700
+ 2800 | 800 | 0800 | 2800 | 800 | 0800
+ 2900 | 900 | 0900 | 2900 | 900 | 0900
+ 3000 |   0 | 0000 | 3000 |   0 | 0000
+ 3100 | 100 | 0100 | 3100 | 100 | 0100
+ 3200 | 200 | 0200 | 3200 | 200 | 0200
+ 3300 | 300 | 0300 | 3300 | 300 | 0300
+ 3400 | 400 | 0400 | 3400 | 400 | 0400
+ 3500 | 500 | 0500 | 3500 | 500 | 0500
+ 3600 | 600 | 0600 | 3600 | 600 | 0600
+ 3700 | 700 | 0700 | 3700 | 700 | 0700
+ 3800 | 800 | 0800 | 3800 | 800 | 0800
+ 3900 | 900 | 0900 | 3900 | 900 | 0900
+(30 rows)
+
+DELETE FROM join_tbl;
+RESET enable_partitionwise_join;
+-- Test interaction of async execution with plan-time partition pruning
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 3000;
+                                 QUERY PLAN                                  
+-----------------------------------------------------------------------------
+ Append
+   ->  Async Foreign Scan on public.async_p1 async_pt_1
+         Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+         Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000))
+   ->  Async Foreign Scan on public.async_p2 async_pt_2
+         Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+         Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000))
+(7 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 2000;
+                              QUERY PLAN                               
+-----------------------------------------------------------------------
+ Foreign Scan on public.async_p1 async_pt
+   Output: async_pt.a, async_pt.b, async_pt.c
+   Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 2000))
+(3 rows)
+
+-- Test interaction of async execution with run-time partition pruning
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+  INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2;
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (3000, 505);
+                                        QUERY PLAN                                        
+------------------------------------------------------------------------------------------
+ Insert on public.result_tbl
+   ->  Append
+         Subplans Removed: 1
+         ->  Async Foreign Scan on public.async_p1 async_pt_1
+               Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+               Filter: (async_pt_1.b === $2)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer))
+         ->  Async Foreign Scan on public.async_p2 async_pt_2
+               Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+               Filter: (async_pt_2.b === $2)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < $1::integer))
+(11 rows)
+
+EXECUTE async_pt_query (3000, 505);
+SELECT * FROM result_tbl ORDER BY a;
+  a   |  b  |  c   
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+(2 rows)
+
+DELETE FROM result_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (2000, 505);
+                                        QUERY PLAN                                        
+------------------------------------------------------------------------------------------
+ Insert on public.result_tbl
+   ->  Append
+         Subplans Removed: 2
+         ->  Async Foreign Scan on public.async_p1 async_pt_1
+               Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+               Filter: (async_pt_1.b === $2)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer))
+(7 rows)
+
+EXECUTE async_pt_query (2000, 505);
+SELECT * FROM result_tbl ORDER BY a;
+  a   |  b  |  c   
+------+-----+------
+ 1505 | 505 | 0505
+(1 row)
+
+DELETE FROM result_tbl;
+RESET plan_cache_mode;
+CREATE TABLE local_tbl(a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar');
+ANALYZE local_tbl;
+CREATE INDEX base_tbl1_idx ON base_tbl1 (a);
+CREATE INDEX base_tbl2_idx ON base_tbl2 (a);
+CREATE INDEX async_p3_idx ON async_p3 (a);
+ANALYZE base_tbl1;
+ANALYZE base_tbl2;
+ANALYZE async_p3;
+ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true');
+ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true');
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+                                        QUERY PLAN                                        
+------------------------------------------------------------------------------------------
+ Nested Loop
+   Output: local_tbl.a, local_tbl.b, local_tbl.c, async_pt.a, async_pt.b, async_pt.c
+   ->  Seq Scan on public.local_tbl
+         Output: local_tbl.a, local_tbl.b, local_tbl.c
+         Filter: (local_tbl.c = 'bar'::text)
+   ->  Append
+         ->  Async Foreign Scan on public.async_p1 async_pt_1
+               Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+               Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (($1::integer = a))
+         ->  Async Foreign Scan on public.async_p2 async_pt_2
+               Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+               Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (($1::integer = a))
+         ->  Seq Scan on public.async_p3 async_pt_3
+               Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+               Filter: (local_tbl.a = async_pt_3.a)
+(15 rows)
+
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+                                  QUERY PLAN                                   
+-------------------------------------------------------------------------------
+ Nested Loop (actual rows=1 loops=1)
+   ->  Seq Scan on local_tbl (actual rows=1 loops=1)
+         Filter: (c = 'bar'::text)
+         Rows Removed by Filter: 1
+   ->  Append (actual rows=1 loops=1)
+         ->  Async Foreign Scan on async_p1 async_pt_1 (never executed)
+         ->  Async Foreign Scan on async_p2 async_pt_2 (actual rows=1 loops=1)
+         ->  Seq Scan on async_p3 async_pt_3 (never executed)
+               Filter: (local_tbl.a = a)
+(9 rows)
+
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+  a   |  b  |  c  |  a   |  b  |  c   
+------+-----+-----+------+-----+------
+ 2505 | 505 | bar | 2505 | 505 | 0505
+(1 row)
+
+ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
+ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
+DROP TABLE local_tbl;
+DROP INDEX base_tbl1_idx;
+DROP INDEX base_tbl2_idx;
+DROP INDEX async_p3_idx;
+-- Test that pending requests are processed properly
+SET enable_mergejoin TO false;
+SET enable_hashjoin TO false;
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Nested Loop
+   Output: t1.a, t1.b, t1.c, t2.a, t2.b, t2.c
+   Join Filter: (t1.a = t2.a)
+   ->  Append
+         ->  Async Foreign Scan on public.async_p1 t1_1
+               Output: t1_1.a, t1_1.b, t1_1.c
+               Filter: (t1_1.b === 505)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl1
+         ->  Async Foreign Scan on public.async_p2 t1_2
+               Output: t1_2.a, t1_2.b, t1_2.c
+               Filter: (t1_2.b === 505)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl2
+         ->  Seq Scan on public.async_p3 t1_3
+               Output: t1_3.a, t1_3.b, t1_3.c
+               Filter: (t1_3.b === 505)
+   ->  Materialize
+         Output: t2.a, t2.b, t2.c
+         ->  Foreign Scan on public.async_p2 t2
+               Output: t2.a, t2.b, t2.c
+               Remote SQL: SELECT a, b, c FROM public.base_tbl2
+(20 rows)
+
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+  a   |  b  |  c   |  a   |  b  |  c   
+------+-----+------+------+-----+------
+ 2505 | 505 | 0505 | 2505 | 505 | 0505
+(1 row)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+                           QUERY PLAN                           
+----------------------------------------------------------------
+ Limit
+   Output: t1.a, t1.b, t1.c
+   ->  Append
+         ->  Async Foreign Scan on public.async_p1 t1_1
+               Output: t1_1.a, t1_1.b, t1_1.c
+               Filter: (t1_1.b === 505)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl1
+         ->  Async Foreign Scan on public.async_p2 t1_2
+               Output: t1_2.a, t1_2.b, t1_2.c
+               Filter: (t1_2.b === 505)
+               Remote SQL: SELECT a, b, c FROM public.base_tbl2
+         ->  Seq Scan on public.async_p3 t1_3
+               Output: t1_3.a, t1_3.b, t1_3.c
+               Filter: (t1_3.b === 505)
+(14 rows)
+
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+  a   |  b  |  c   
+------+-----+------
+ 3505 | 505 | 0505
+(1 row)
+
+-- Check with foreign modify
+CREATE TABLE local_tbl (a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo');
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
+  SERVER loopback OPTIONS (table_name 'base_tbl3');
+INSERT INTO remote_tbl VALUES (2505, 505, 'bar');
+CREATE TABLE base_tbl4 (a int, b int, c text);
+CREATE FOREIGN TABLE insert_tbl (a int, b int, c text)
+  SERVER loopback OPTIONS (table_name 'base_tbl4');
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+                               QUERY PLAN                                
+-------------------------------------------------------------------------
+ Insert on public.insert_tbl
+   Remote SQL: INSERT INTO public.base_tbl4(a, b, c) VALUES ($1, $2, $3)
+   Batch Size: 1
+   ->  Append
+         ->  Seq Scan on public.local_tbl
+               Output: local_tbl.a, local_tbl.b, local_tbl.c
+         ->  Async Foreign Scan on public.remote_tbl
+               Output: remote_tbl.a, remote_tbl.b, remote_tbl.c
+               Remote SQL: SELECT a, b, c FROM public.base_tbl3
+(9 rows)
+
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+SELECT * FROM insert_tbl ORDER BY a;
+  a   |  b  |  c  
+------+-----+-----
+ 1505 | 505 | foo
+ 2505 | 505 | bar
+(2 rows)
+
+-- Check with direct modify
+EXPLAIN (VERBOSE, COSTS OFF)
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+                                       QUERY PLAN                                       
+----------------------------------------------------------------------------------------
+ Insert on public.join_tbl
+   CTE t
+     ->  Update on public.remote_tbl
+           Output: remote_tbl.a, remote_tbl.b, remote_tbl.c
+           ->  Foreign Update on public.remote_tbl
+                 Remote SQL: UPDATE public.base_tbl3 SET c = (c || c) RETURNING a, b, c
+   ->  Nested Loop Left Join
+         Output: async_pt.a, async_pt.b, async_pt.c, t.a, t.b, t.c
+         Join Filter: ((async_pt.a = t.a) AND (async_pt.b = t.b))
+         ->  Append
+               ->  Async Foreign Scan on public.async_p1 async_pt_1
+                     Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+                     Filter: (async_pt_1.b === 505)
+                     Remote SQL: SELECT a, b, c FROM public.base_tbl1
+               ->  Async Foreign Scan on public.async_p2 async_pt_2
+                     Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+                     Filter: (async_pt_2.b === 505)
+                     Remote SQL: SELECT a, b, c FROM public.base_tbl2
+               ->  Seq Scan on public.async_p3 async_pt_3
+                     Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+                     Filter: (async_pt_3.b === 505)
+         ->  CTE Scan on t
+               Output: t.a, t.b, t.c
+(23 rows)
+
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+SELECT * FROM join_tbl ORDER BY a1;
+  a1  | b1  |  c1  |  a2  | b2  |   c2   
+------+-----+------+------+-----+--------
+ 1505 | 505 | 0505 |      |     | 
+ 2505 | 505 | 0505 | 2505 | 505 | barbar
+ 3505 | 505 | 0505 |      |     | 
+(3 rows)
+
+DELETE FROM join_tbl;
+RESET enable_mergejoin;
+RESET enable_hashjoin;
+-- Clean up
+DROP TABLE async_pt;
+DROP TABLE base_tbl1;
+DROP TABLE base_tbl2;
+DROP TABLE result_tbl;
+DROP TABLE local_tbl;
+DROP FOREIGN TABLE remote_tbl;
+DROP FOREIGN TABLE insert_tbl;
+DROP TABLE base_tbl3;
+DROP TABLE base_tbl4;
+DROP TABLE join_tbl;
+ALTER SERVER loopback OPTIONS (DROP async_capable);
+ALTER SERVER loopback2 OPTIONS (DROP async_capable);
index 64698c4da3a50834318dad45c0672f30ea50eddc..530d7a66d408557f96f408b4010ec9bc238d2a89 100644 (file)
@@ -107,7 +107,8 @@ postgres_fdw_validator(PG_FUNCTION_ARGS)
         * Validate option value, when we can do so without any context.
         */
        if (strcmp(def->defname, "use_remote_estimate") == 0 ||
-           strcmp(def->defname, "updatable") == 0)
+           strcmp(def->defname, "updatable") == 0 ||
+           strcmp(def->defname, "async_capable") == 0)
        {
            /* these accept only boolean values */
            (void) defGetBoolean(def);
@@ -217,6 +218,9 @@ InitPgFdwOptions(void)
        /* batch_size is available on both server and table */
        {"batch_size", ForeignServerRelationId, false},
        {"batch_size", ForeignTableRelationId, false},
+       /* async_capable is available on both server and table */
+       {"async_capable", ForeignServerRelationId, false},
+       {"async_capable", ForeignTableRelationId, false},
        {"password_required", UserMappingRelationId, false},
 
        /*
index 20b25935ce68c4431a1a6704348c0ca4c0f7b3cf..cc73a6902f5c12781df5f7346bab93cf8bd574ad 100644 (file)
@@ -21,6 +21,7 @@
 #include "commands/defrem.h"
 #include "commands/explain.h"
 #include "commands/vacuum.h"
+#include "executor/execAsync.h"
 #include "foreign/fdwapi.h"
 #include "funcapi.h"
 #include "miscadmin.h"
@@ -37,6 +38,7 @@
 #include "optimizer/tlist.h"
 #include "parser/parsetree.h"
 #include "postgres_fdw.h"
+#include "storage/latch.h"
 #include "utils/builtins.h"
 #include "utils/float.h"
 #include "utils/guc.h"
@@ -143,6 +145,7 @@ typedef struct PgFdwScanState
 
    /* for remote query execution */
    PGconn     *conn;           /* connection for the scan */
+   PgFdwConnState *conn_state; /* extra per-connection state */
    unsigned int cursor_number; /* quasi-unique ID for my cursor */
    bool        cursor_exists;  /* have we created the cursor? */
    int         numParams;      /* number of parameters passed to query */
@@ -159,6 +162,9 @@ typedef struct PgFdwScanState
    int         fetch_ct_2;     /* Min(# of fetches done, 2) */
    bool        eof_reached;    /* true if last fetch reached EOF */
 
+   /* for asynchronous execution */
+   bool        async_capable;  /* engage asynchronous-capable logic? */
+
    /* working memory contexts */
    MemoryContext batch_cxt;    /* context holding current batch of tuples */
    MemoryContext temp_cxt;     /* context for per-tuple temporary data */
@@ -176,6 +182,7 @@ typedef struct PgFdwModifyState
 
    /* for remote query execution */
    PGconn     *conn;           /* connection for the scan */
+   PgFdwConnState *conn_state; /* extra per-connection state */
    char       *p_name;         /* name of prepared statement, if created */
 
    /* extracted fdw_private data */
@@ -219,6 +226,7 @@ typedef struct PgFdwDirectModifyState
 
    /* for remote query execution */
    PGconn     *conn;           /* connection for the update */
+   PgFdwConnState *conn_state; /* extra per-connection state */
    int         numParams;      /* number of parameters passed to query */
    FmgrInfo   *param_flinfo;   /* output conversion functions for them */
    List       *param_exprs;    /* executable expressions for param values */
@@ -408,6 +416,10 @@ static void postgresGetForeignUpperPaths(PlannerInfo *root,
                                         RelOptInfo *input_rel,
                                         RelOptInfo *output_rel,
                                         void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static void postgresForeignAsyncRequest(AsyncRequest *areq);
+static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
+static void postgresForeignAsyncNotify(AsyncRequest *areq);
 
 /*
  * Helper functions
@@ -437,7 +449,8 @@ static bool ec_member_matches_foreign(PlannerInfo *root, RelOptInfo *rel,
                                      void *arg);
 static void create_cursor(ForeignScanState *node);
 static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void close_cursor(PGconn *conn, unsigned int cursor_number,
+                        PgFdwConnState *conn_state);
 static PgFdwModifyState *create_foreign_modify(EState *estate,
                                               RangeTblEntry *rte,
                                               ResultRelInfo *resultRelInfo,
@@ -491,6 +504,8 @@ static int  postgresAcquireSampleRowsFunc(Relation relation, int elevel,
                                          double *totaldeadrows);
 static void analyze_row_processor(PGresult *res, int row,
                                  PgFdwAnalyzeState *astate);
+static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
+static void fetch_more_data_begin(AsyncRequest *areq);
 static HeapTuple make_tuple_from_result_row(PGresult *res,
                                            int row,
                                            Relation rel,
@@ -583,6 +598,12 @@ postgres_fdw_handler(PG_FUNCTION_ARGS)
    /* Support functions for upper relation push-down */
    routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
 
+   /* Support functions for asynchronous execution */
+   routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+   routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
+   routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+   routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
+
    PG_RETURN_POINTER(routine);
 }
 
@@ -618,14 +639,15 @@ postgresGetForeignRelSize(PlannerInfo *root,
 
    /*
     * Extract user-settable option values.  Note that per-table settings of
-    * use_remote_estimate and fetch_size override per-server settings of
-    * them, respectively.
+    * use_remote_estimate, fetch_size and async_capable override per-server
+    * settings of them, respectively.
     */
    fpinfo->use_remote_estimate = false;
    fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
    fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
    fpinfo->shippable_extensions = NIL;
    fpinfo->fetch_size = 100;
+   fpinfo->async_capable = false;
 
    apply_server_options(fpinfo);
    apply_table_options(fpinfo);
@@ -1459,7 +1481,7 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
     * Get connection to the foreign server.  Connection manager will
     * establish new connection if necessary.
     */
-   fsstate->conn = GetConnection(user, false);
+   fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
 
    /* Assign a unique ID for my cursor */
    fsstate->cursor_number = GetCursorNumber(fsstate->conn);
@@ -1510,6 +1532,9 @@ postgresBeginForeignScan(ForeignScanState *node, int eflags)
                             &fsstate->param_flinfo,
                             &fsstate->param_exprs,
                             &fsstate->param_values);
+
+   /* Set the async-capable flag */
+   fsstate->async_capable = node->ss.ps.plan->async_capable;
 }
 
 /*
@@ -1524,8 +1549,10 @@ postgresIterateForeignScan(ForeignScanState *node)
    TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
 
    /*
-    * If this is the first call after Begin or ReScan, we need to create the
-    * cursor on the remote side.
+    * In sync mode, if this is the first call after Begin or ReScan, we need
+    * to create the cursor on the remote side.  In async mode, we would have
+    * already created the cursor before we get here, even if this is the
+    * first call after Begin or ReScan.
     */
    if (!fsstate->cursor_exists)
        create_cursor(node);
@@ -1535,6 +1562,9 @@ postgresIterateForeignScan(ForeignScanState *node)
     */
    if (fsstate->next_tuple >= fsstate->num_tuples)
    {
+       /* In async mode, just clear tuple slot. */
+       if (fsstate->async_capable)
+           return ExecClearTuple(slot);
        /* No point in another fetch if we already detected EOF, though. */
        if (!fsstate->eof_reached)
            fetch_more_data(node);
@@ -1596,7 +1626,7 @@ postgresReScanForeignScan(ForeignScanState *node)
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
-   res = pgfdw_exec_query(fsstate->conn, sql);
+   res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
        pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
    PQclear(res);
@@ -1624,7 +1654,8 @@ postgresEndForeignScan(ForeignScanState *node)
 
    /* Close the cursor if open, to prevent accumulation of cursors */
    if (fsstate->cursor_exists)
-       close_cursor(fsstate->conn, fsstate->cursor_number);
+       close_cursor(fsstate->conn, fsstate->cursor_number,
+                    fsstate->conn_state);
 
    /* Release remote connection */
    ReleaseConnection(fsstate->conn);
@@ -2501,7 +2532,7 @@ postgresBeginDirectModify(ForeignScanState *node, int eflags)
     * Get connection to the foreign server.  Connection manager will
     * establish new connection if necessary.
     */
-   dmstate->conn = GetConnection(user, false);
+   dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
 
    /* Update the foreign-join-related fields. */
    if (fsplan->scan.scanrelid == 0)
@@ -2882,7 +2913,7 @@ estimate_path_cost_size(PlannerInfo *root,
                                false, &retrieved_attrs, NULL);
 
        /* Get the remote estimate */
-       conn = GetConnection(fpinfo->user, false);
+       conn = GetConnection(fpinfo->user, false, NULL);
        get_remote_estimate(sql.data, conn, &rows, &width,
                            &startup_cost, &total_cost);
        ReleaseConnection(conn);
@@ -3328,7 +3359,7 @@ get_remote_estimate(const char *sql, PGconn *conn,
        /*
         * Execute EXPLAIN remotely.
         */
-       res = pgfdw_exec_query(conn, sql);
+       res = pgfdw_exec_query(conn, sql, NULL);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, false, sql);
 
@@ -3452,6 +3483,10 @@ create_cursor(ForeignScanState *node)
    StringInfoData buf;
    PGresult   *res;
 
+   /* First, process a pending asynchronous request, if any. */
+   if (fsstate->conn_state->pendingAreq)
+       process_pending_request(fsstate->conn_state->pendingAreq);
+
    /*
     * Construct array of query parameter values in text format.  We do the
     * conversions in the short-lived per-tuple context, so as not to cause a
@@ -3532,17 +3567,38 @@ fetch_more_data(ForeignScanState *node)
    PG_TRY();
    {
        PGconn     *conn = fsstate->conn;
-       char        sql[64];
        int         numrows;
        int         i;
 
-       snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
-                fsstate->fetch_size, fsstate->cursor_number);
+       if (fsstate->async_capable)
+       {
+           Assert(fsstate->conn_state->pendingAreq);
 
-       res = pgfdw_exec_query(conn, sql);
-       /* On error, report the original query, not the FETCH. */
-       if (PQresultStatus(res) != PGRES_TUPLES_OK)
-           pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+           /*
+            * The query was already sent by an earlier call to
+            * fetch_more_data_begin.  So now we just fetch the result.
+            */
+           res = pgfdw_get_result(conn, fsstate->query);
+           /* On error, report the original query, not the FETCH. */
+           if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+
+           /* Reset per-connection state */
+           fsstate->conn_state->pendingAreq = NULL;
+       }
+       else
+       {
+           char        sql[64];
+
+           /* This is a regular synchronous fetch. */
+           snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+                    fsstate->fetch_size, fsstate->cursor_number);
+
+           res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
+           /* On error, report the original query, not the FETCH. */
+           if (PQresultStatus(res) != PGRES_TUPLES_OK)
+               pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+       }
 
        /* Convert the data into HeapTuples */
        numrows = PQntuples(res);
@@ -3634,7 +3690,8 @@ reset_transmission_modes(int nestlevel)
  * Utility routine to close a cursor.
  */
 static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PGconn *conn, unsigned int cursor_number,
+            PgFdwConnState *conn_state)
 {
    char        sql[64];
    PGresult   *res;
@@ -3645,7 +3702,7 @@ close_cursor(PGconn *conn, unsigned int cursor_number)
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
-   res = pgfdw_exec_query(conn, sql);
+   res = pgfdw_exec_query(conn, sql, conn_state);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
        pgfdw_report_error(ERROR, res, conn, true, sql);
    PQclear(res);
@@ -3694,7 +3751,7 @@ create_foreign_modify(EState *estate,
    user = GetUserMapping(userid, table->serverid);
 
    /* Open connection; report that we'll create a prepared statement. */
-   fmstate->conn = GetConnection(user, true);
+   fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
    fmstate->p_name = NULL;     /* prepared statement not made yet */
 
    /* Set up remote query information. */
@@ -3793,6 +3850,10 @@ execute_foreign_modify(EState *estate,
           operation == CMD_UPDATE ||
           operation == CMD_DELETE);
 
+   /* First, process a pending asynchronous request, if any. */
+   if (fmstate->conn_state->pendingAreq)
+       process_pending_request(fmstate->conn_state->pendingAreq);
+
    /*
     * If the existing query was deparsed and prepared for a different number
     * of rows, rebuild it for the proper number.
@@ -3894,6 +3955,11 @@ prepare_foreign_modify(PgFdwModifyState *fmstate)
    char       *p_name;
    PGresult   *res;
 
+   /*
+    * The caller would already have processed a pending asynchronous request
+    * if any, so no need to do it here.
+    */
+
    /* Construct name we'll use for the prepared statement. */
    snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
             GetPrepStmtNumber(fmstate->conn));
@@ -4079,7 +4145,7 @@ deallocate_query(PgFdwModifyState *fmstate)
     * We don't use a PG_TRY block here, so be careful not to throw error
     * without releasing the PGresult.
     */
-   res = pgfdw_exec_query(fmstate->conn, sql);
+   res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
    if (PQresultStatus(res) != PGRES_COMMAND_OK)
        pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
    PQclear(res);
@@ -4227,6 +4293,10 @@ execute_dml_stmt(ForeignScanState *node)
    int         numParams = dmstate->numParams;
    const char **values = dmstate->param_values;
 
+   /* First, process a pending asynchronous request, if any. */
+   if (dmstate->conn_state->pendingAreq)
+       process_pending_request(dmstate->conn_state->pendingAreq);
+
    /*
     * Construct array of query parameter values in text format.
     */
@@ -4628,7 +4698,7 @@ postgresAnalyzeForeignTable(Relation relation,
     */
    table = GetForeignTable(RelationGetRelid(relation));
    user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
-   conn = GetConnection(user, false);
+   conn = GetConnection(user, false, NULL);
 
    /*
     * Construct command to get page count for relation.
@@ -4639,7 +4709,7 @@ postgresAnalyzeForeignTable(Relation relation,
    /* In what follows, do not risk leaking any PGresults. */
    PG_TRY();
    {
-       res = pgfdw_exec_query(conn, sql.data);
+       res = pgfdw_exec_query(conn, sql.data, NULL);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, false, sql.data);
 
@@ -4714,7 +4784,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
    table = GetForeignTable(RelationGetRelid(relation));
    server = GetForeignServer(table->serverid);
    user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
-   conn = GetConnection(user, false);
+   conn = GetConnection(user, false, NULL);
 
    /*
     * Construct cursor that retrieves whole rows from remote.
@@ -4731,7 +4801,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
        int         fetch_size;
        ListCell   *lc;
 
-       res = pgfdw_exec_query(conn, sql.data);
+       res = pgfdw_exec_query(conn, sql.data, NULL);
        if (PQresultStatus(res) != PGRES_COMMAND_OK)
            pgfdw_report_error(ERROR, res, conn, false, sql.data);
        PQclear(res);
@@ -4783,7 +4853,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
             */
 
            /* Fetch some rows */
-           res = pgfdw_exec_query(conn, fetch_sql);
+           res = pgfdw_exec_query(conn, fetch_sql, NULL);
            /* On error, report the original query, not the FETCH. */
            if (PQresultStatus(res) != PGRES_TUPLES_OK)
                pgfdw_report_error(ERROR, res, conn, false, sql.data);
@@ -4802,7 +4872,7 @@ postgresAcquireSampleRowsFunc(Relation relation, int elevel,
        }
 
        /* Close the cursor, just to be tidy. */
-       close_cursor(conn, cursor_number);
+       close_cursor(conn, cursor_number, NULL);
    }
    PG_CATCH();
    {
@@ -4942,7 +5012,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
     */
    server = GetForeignServer(serverOid);
    mapping = GetUserMapping(GetUserId(), server->serverid);
-   conn = GetConnection(mapping, false);
+   conn = GetConnection(mapping, false, NULL);
 
    /* Don't attempt to import collation if remote server hasn't got it */
    if (PQserverVersion(conn) < 90100)
@@ -4958,7 +5028,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
        appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
        deparseStringLiteral(&buf, stmt->remote_schema);
 
-       res = pgfdw_exec_query(conn, buf.data);
+       res = pgfdw_exec_query(conn, buf.data, NULL);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -5070,7 +5140,7 @@ postgresImportForeignSchema(ImportForeignSchemaStmt *stmt, Oid serverOid)
        appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
 
        /* Fetch the data */
-       res = pgfdw_exec_query(conn, buf.data);
+       res = pgfdw_exec_query(conn, buf.data, NULL);
        if (PQresultStatus(res) != PGRES_TUPLES_OK)
            pgfdw_report_error(ERROR, res, conn, false, buf.data);
 
@@ -5530,6 +5600,8 @@ apply_server_options(PgFdwRelationInfo *fpinfo)
                ExtractExtensionList(defGetString(def), false);
        else if (strcmp(def->defname, "fetch_size") == 0)
            fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+       else if (strcmp(def->defname, "async_capable") == 0)
+           fpinfo->async_capable = defGetBoolean(def);
    }
 }
 
@@ -5551,6 +5623,8 @@ apply_table_options(PgFdwRelationInfo *fpinfo)
            fpinfo->use_remote_estimate = defGetBoolean(def);
        else if (strcmp(def->defname, "fetch_size") == 0)
            fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+       else if (strcmp(def->defname, "async_capable") == 0)
+           fpinfo->async_capable = defGetBoolean(def);
    }
 }
 
@@ -5585,6 +5659,7 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
    fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
    fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
    fpinfo->fetch_size = fpinfo_o->fetch_size;
+   fpinfo->async_capable = fpinfo_o->async_capable;
 
    /* Merge the table level options from either side of the join. */
    if (fpinfo_i)
@@ -5606,6 +5681,13 @@ merge_fdw_options(PgFdwRelationInfo *fpinfo,
         * relation sizes.
         */
        fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
+
+       /*
+        * We'll prefer to consider this join async-capable if any table from
+        * either side of the join is considered async-capable.
+        */
+       fpinfo->async_capable = fpinfo_o->async_capable ||
+           fpinfo_i->async_capable;
    }
 }
 
@@ -6489,6 +6571,236 @@ add_foreign_final_paths(PlannerInfo *root, RelOptInfo *input_rel,
    add_path(final_rel, (Path *) final_path);
 }
 
+/*
+ * postgresIsForeignPathAsyncCapable
+ *     Check whether a given ForeignPath node is async-capable.
+ */
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+   RelOptInfo *rel = ((Path *) path)->parent;
+   PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
+
+   return fpinfo->async_capable;
+}
+
+/*
+ * postgresForeignAsyncRequest
+ *     Asynchronously request next tuple from a foreign PostgreSQL table.
+ */
+static void
+postgresForeignAsyncRequest(AsyncRequest *areq)
+{
+   produce_tuple_asynchronously(areq, true);
+}
+
+/*
+ * postgresForeignAsyncConfigureWait
+ *     Configure a file descriptor event for which we wish to wait.
+ */
+static void
+postgresForeignAsyncConfigureWait(AsyncRequest *areq)
+{
+   ForeignScanState *node = (ForeignScanState *) areq->requestee;
+   PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+   AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
+   AppendState *requestor = (AppendState *) areq->requestor;
+   WaitEventSet *set = requestor->as_eventset;
+
+   /* This should not be called unless callback_pending */
+   Assert(areq->callback_pending);
+
+   /* The core code would have registered postmaster death event */
+   Assert(GetNumRegisteredWaitEvents(set) >= 1);
+
+   /* Begin an asynchronous data fetch if not already done */
+   if (!pendingAreq)
+       fetch_more_data_begin(areq);
+   else if (pendingAreq->requestor != areq->requestor)
+   {
+       /*
+        * This is the case when the in-process request was made by another
+        * Append.  Note that it might be useless to process the request,
+        * because the query might not need tuples from that Append anymore.
+        * Skip the given request if there are any configured events other
+        * than the postmaster death event; otherwise process the request,
+        * then begin a fetch to configure the event below, because otherwise
+        * we might end up with no configured events other than the postmaster
+        * death event.
+        */
+       if (GetNumRegisteredWaitEvents(set) > 1)
+           return;
+       process_pending_request(pendingAreq);
+       fetch_more_data_begin(areq);
+   }
+   else if (pendingAreq->requestee != areq->requestee)
+   {
+       /*
+        * This is the case when the in-process request was made by the same
+        * parent but for a different child.  Since we configure only the
+        * event for the request made for that child, skip the given request.
+        */
+       return;
+   }
+   else
+       Assert(pendingAreq == areq);
+
+   AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
+                     NULL, areq);
+}
+
+/*
+ * postgresForeignAsyncNotify
+ *     Fetch some more tuples from a file descriptor that becomes ready,
+ *     requesting next tuple.
+ */
+static void
+postgresForeignAsyncNotify(AsyncRequest *areq)
+{
+   ForeignScanState *node = (ForeignScanState *) areq->requestee;
+   PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+
+   /* The request should be currently in-process */
+   Assert(fsstate->conn_state->pendingAreq == areq);
+
+   /* The core code would have initialized the callback_pending flag */
+   Assert(!areq->callback_pending);
+
+   /* On error, report the original query, not the FETCH. */
+   if (!PQconsumeInput(fsstate->conn))
+       pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+
+   fetch_more_data(node);
+
+   produce_tuple_asynchronously(areq, true);
+}
+
+/*
+ * Asynchronously produce next tuple from a foreign PostgreSQL table.
+ */
+static void
+produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
+{
+   ForeignScanState *node = (ForeignScanState *) areq->requestee;
+   PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+   AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
+   TupleTableSlot *result;
+
+   /* This should not be called if the request is currently in-process */
+   Assert(areq != pendingAreq);
+
+   /* Fetch some more tuples, if we've run out */
+   if (fsstate->next_tuple >= fsstate->num_tuples)
+   {
+       /* No point in another fetch if we already detected EOF, though */
+       if (!fsstate->eof_reached)
+       {
+           /* Mark the request as pending for a callback */
+           ExecAsyncRequestPending(areq);
+           /* Begin another fetch if requested and if no pending request */
+           if (fetch && !pendingAreq)
+               fetch_more_data_begin(areq);
+       }
+       else
+       {
+           /* There's nothing more to do; just return a NULL pointer */
+           result = NULL;
+           /* Mark the request as complete */
+           ExecAsyncRequestDone(areq, result);
+       }
+       return;
+   }
+
+   /* Get a tuple from the ForeignScan node */
+   result = ExecProcNode((PlanState *) node);
+   if (!TupIsNull(result))
+   {
+       /* Mark the request as complete */
+       ExecAsyncRequestDone(areq, result);
+       return;
+   }
+   Assert(fsstate->next_tuple >= fsstate->num_tuples);
+
+   /* Fetch some more tuples, if we've not detected EOF yet */
+   if (!fsstate->eof_reached)
+   {
+       /* Mark the request as pending for a callback */
+       ExecAsyncRequestPending(areq);
+       /* Begin another fetch if requested and if no pending request */
+       if (fetch && !pendingAreq)
+           fetch_more_data_begin(areq);
+   }
+   else
+   {
+       /* There's nothing more to do; just return a NULL pointer */
+       result = NULL;
+       /* Mark the request as complete */
+       ExecAsyncRequestDone(areq, result);
+   }
+}
+
+/*
+ * Begin an asynchronous data fetch.
+ *
+ * Note: fetch_more_data must be called to fetch the result.
+ */
+static void
+fetch_more_data_begin(AsyncRequest *areq)
+{
+   ForeignScanState *node = (ForeignScanState *) areq->requestee;
+   PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+   char        sql[64];
+
+   Assert(!fsstate->conn_state->pendingAreq);
+
+   /* Create the cursor synchronously. */
+   if (!fsstate->cursor_exists)
+       create_cursor(node);
+
+   /* We will send this query, but not wait for the response. */
+   snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+            fsstate->fetch_size, fsstate->cursor_number);
+
+   if (PQsendQuery(fsstate->conn, sql) < 0)
+       pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+
+   /* Remember that the request is in process */
+   fsstate->conn_state->pendingAreq = areq;
+}
+
+/*
+ * Process a pending asynchronous request.
+ */
+void
+process_pending_request(AsyncRequest *areq)
+{
+   ForeignScanState *node = (ForeignScanState *) areq->requestee;
+   PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+   EState     *estate = node->ss.ps.state;
+   MemoryContext oldcontext;
+
+   /* The request should be currently in-process */
+   Assert(fsstate->conn_state->pendingAreq == areq);
+
+   oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+   /* The request would have been pending for a callback */
+   Assert(areq->callback_pending);
+
+   /* Unlike AsyncNotify, we unset callback_pending ourselves */
+   areq->callback_pending = false;
+
+   fetch_more_data(node);
+
+   /* We need to send a new query afterwards; don't fetch */
+   produce_tuple_asynchronously(areq, false);
+
+   /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
+   ExecAsyncResponse(areq);
+
+   MemoryContextSwitchTo(oldcontext);
+}
+
 /*
  * Create a tuple from the specified row of the PGresult.
  *
index 1f67b4d9fd27f3c374ceaa861f9b4a0e4d49195a..88d94da6f6bdcf1774356c6af757ccaa1f14ecbe 100644 (file)
@@ -16,6 +16,7 @@
 #include "foreign/foreign.h"
 #include "lib/stringinfo.h"
 #include "libpq-fe.h"
+#include "nodes/execnodes.h"
 #include "nodes/pathnodes.h"
 #include "utils/relcache.h"
 
@@ -78,6 +79,7 @@ typedef struct PgFdwRelationInfo
    Cost        fdw_startup_cost;
    Cost        fdw_tuple_cost;
    List       *shippable_extensions;   /* OIDs of shippable extensions */
+   bool        async_capable;
 
    /* Cached catalog information. */
    ForeignTable *table;
@@ -124,17 +126,28 @@ typedef struct PgFdwRelationInfo
    int         relation_index;
 } PgFdwRelationInfo;
 
+/*
+ * Extra control information relating to a connection.
+ */
+typedef struct PgFdwConnState
+{
+   AsyncRequest *pendingAreq;  /* pending async request */
+} PgFdwConnState;
+
 /* in postgres_fdw.c */
 extern int set_transmission_modes(void);
 extern void reset_transmission_modes(int nestlevel);
+extern void process_pending_request(AsyncRequest *areq);
 
 /* in connection.c */
-extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
+                            PgFdwConnState **state);
 extern void ReleaseConnection(PGconn *conn);
 extern unsigned int GetCursorNumber(PGconn *conn);
 extern unsigned int GetPrepStmtNumber(PGconn *conn);
 extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
-extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
+extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
+                                 PgFdwConnState *state);
 extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
                               bool clear, const char *sql);
 
index e9b30517a514317ec88a76666fd2d6853c46d0d1..806a5bca28c4088361b33e4b28ba34335cf2a48d 100644 (file)
@@ -2928,3 +2928,198 @@ SELECT tableoid::regclass, * FROM batch_cp_upd_test;
 
 -- Clean up
 DROP TABLE batch_table, batch_cp_upd_test CASCADE;
+
+-- ===================================================================
+-- test asynchronous execution
+-- ===================================================================
+
+ALTER SERVER loopback OPTIONS (DROP extensions);
+ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
+ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true');
+
+CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a);
+CREATE TABLE base_tbl1 (a int, b int, c text);
+CREATE TABLE base_tbl2 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000)
+  SERVER loopback OPTIONS (table_name 'base_tbl1');
+CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000)
+  SERVER loopback2 OPTIONS (table_name 'base_tbl2');
+INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+
+-- simple queries
+CREATE TABLE result_tbl (a int, b int, c text);
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+-- Check case where multiple partitions use the same connection
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
+  SERVER loopback2 OPTIONS (table_name 'base_tbl3');
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+DROP FOREIGN TABLE async_p3;
+DROP TABLE base_tbl3;
+
+-- Check case where the partitioned table has local/remote partitions
+CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000);
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+-- partitionwise joins
+SET enable_partitionwise_join TO true;
+
+CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+
+SELECT * FROM join_tbl ORDER BY a1;
+DELETE FROM join_tbl;
+
+RESET enable_partitionwise_join;
+
+-- Test interaction of async execution with plan-time partition pruning
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 3000;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 2000;
+
+-- Test interaction of async execution with run-time partition pruning
+SET plan_cache_mode TO force_generic_plan;
+
+PREPARE async_pt_query (int, int) AS
+  INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (3000, 505);
+EXECUTE async_pt_query (3000, 505);
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (2000, 505);
+EXECUTE async_pt_query (2000, 505);
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+RESET plan_cache_mode;
+
+CREATE TABLE local_tbl(a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar');
+ANALYZE local_tbl;
+
+CREATE INDEX base_tbl1_idx ON base_tbl1 (a);
+CREATE INDEX base_tbl2_idx ON base_tbl2 (a);
+CREATE INDEX async_p3_idx ON async_p3 (a);
+ANALYZE base_tbl1;
+ANALYZE base_tbl2;
+ANALYZE async_p3;
+
+ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true');
+ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true');
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+
+ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
+ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
+
+DROP TABLE local_tbl;
+DROP INDEX base_tbl1_idx;
+DROP INDEX base_tbl2_idx;
+DROP INDEX async_p3_idx;
+
+-- Test that pending requests are processed properly
+SET enable_mergejoin TO false;
+SET enable_hashjoin TO false;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+
+-- Check with foreign modify
+CREATE TABLE local_tbl (a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo');
+
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
+  SERVER loopback OPTIONS (table_name 'base_tbl3');
+INSERT INTO remote_tbl VALUES (2505, 505, 'bar');
+
+CREATE TABLE base_tbl4 (a int, b int, c text);
+CREATE FOREIGN TABLE insert_tbl (a int, b int, c text)
+  SERVER loopback OPTIONS (table_name 'base_tbl4');
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+
+SELECT * FROM insert_tbl ORDER BY a;
+
+-- Check with direct modify
+EXPLAIN (VERBOSE, COSTS OFF)
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+
+SELECT * FROM join_tbl ORDER BY a1;
+DELETE FROM join_tbl;
+
+RESET enable_mergejoin;
+RESET enable_hashjoin;
+
+-- Clean up
+DROP TABLE async_pt;
+DROP TABLE base_tbl1;
+DROP TABLE base_tbl2;
+DROP TABLE result_tbl;
+DROP TABLE local_tbl;
+DROP FOREIGN TABLE remote_tbl;
+DROP FOREIGN TABLE insert_tbl;
+DROP TABLE base_tbl3;
+DROP TABLE base_tbl4;
+DROP TABLE join_tbl;
+
+ALTER SERVER loopback OPTIONS (DROP async_capable);
+ALTER SERVER loopback2 OPTIONS (DROP async_capable);
index ddc6d789d8aeeb1d8f35cc4f976a29d337a2daa6..701cb65cc7b150df1131b6b2a4805d7252138932 100644 (file)
@@ -4787,6 +4787,20 @@ ANY <replaceable class="parameter">num_sync</replaceable> ( <replaceable class="
       </para>
 
      <variablelist>
+     <varlistentry id="guc-enable-async-append" xreflabel="enable_async_append">
+      <term><varname>enable_async_append</varname> (<type>boolean</type>)
+      <indexterm>
+       <primary><varname>enable_async_append</varname> configuration parameter</primary>
+      </indexterm>
+      </term>
+      <listitem>
+       <para>
+        Enables or disables the query planner's use of async-aware
+        append plan types. The default is <literal>on</literal>.
+       </para>
+      </listitem>
+     </varlistentry>
+
      <varlistentry id="guc-enable-bitmapscan" xreflabel="enable_bitmapscan">
       <term><varname>enable_bitmapscan</varname> (<type>boolean</type>)
       <indexterm>
index 04bc052ee837539432ab7839dd4fa6478dbc7699..635c9ec559a604001154900c38df595f7258fc96 100644 (file)
@@ -1483,6 +1483,96 @@ ShutdownForeignScan(ForeignScanState *node);
    </para>
    </sect2>
 
+   <sect2 id="fdw-callbacks-async">
+    <title>FDW Routines for Asynchronous Execution</title>
+    <para>
+     A <structname>ForeignScan</structname> node can, optionally, support
+     asynchronous execution as described in
+     <filename>src/backend/executor/README</filename>.  The following
+     functions are all optional, but are all required if asynchronous
+     execution is to be supported.
+    </para>
+
+    <para>
+<programlisting>
+bool
+IsForeignPathAsyncCapable(ForeignPath *path);
+</programlisting>
+     Test whether a given <structname>ForeignPath</structname> path can scan
+     the underlying foreign relation asynchronously.
+     This function will only be called at the end of query planning when the
+     given path is a direct child of an <structname>AppendPath</structname>
+     path and when the planner believes that asynchronous execution improves
+     performance, and should return true if the given path is able to scan the
+     foreign relation asynchronously.
+    </para>
+
+    <para>
+     If this function is not defined, it is assumed that the given path scans
+     the foreign relation using <function>IterateForeignScan</function>.
+     (This implies that the callback functions described below will never be
+     called, so they need not be provided either.)
+    </para>
+
+    <para>
+<programlisting>
+void
+ForeignAsyncRequest(AsyncRequest *areq);
+</programlisting>
+     Produce one tuple asynchronously from the
+     <structname>ForeignScan</structname> node.  <literal>areq</literal> is
+     the <structname>AsyncRequest</structname> struct describing the
+     <structname>ForeignScan</structname> node and the parent
+     <structname>Append</structname> node that requested the tuple from it.
+     This function should store the tuple into the slot specified by
+     <literal>areq-&gt;result</literal>, and set
+     <literal>areq-&gt;request_complete</literal> to <literal>true</literal>;
+     or if it needs to wait on an event external to the core server such as
+     network I/O, and cannot produce any tuple immediately, set the flag to
+     <literal>false</literal>, and set
+     <literal>areq-&gt;callback_pending</literal> to <literal>true</literal>
+     for the <structname>ForeignScan</structname> node to get a callback from
+     the callback functions described below.  If no more tuples are available,
+     set the slot to NULL, and the
+     <literal>areq-&gt;request_complete</literal> flag to
+     <literal>true</literal>.  It's recommended to use
+     <function>ExecAsyncRequestDone</function> or
+     <function>ExecAsyncRequestPending</function> to set the output parameters
+     in the <literal>areq</literal>.
+    </para>
+
+    <para>
+<programlisting>
+void
+ForeignAsyncConfigureWait(AsyncRequest *areq);
+</programlisting>
+     Configure a file descriptor event for which the 
+     <structname>ForeignScan</structname> node wishes to wait.
+     This function will only be called when the
+     <structname>ForeignScan</structname> node has the
+     <literal>areq-&gt;callback_pending</literal> flag set, and should add
+     the event to the <structfield>as_eventset</structfield> of the parent
+     <structname>Append</structname> node described by the
+     <literal>areq</literal>.  See the comments for
+     <function>ExecAsyncConfigureWait</function> in
+     <filename>src/backend/executor/execAsync.c</filename> for additional
+     information.  When the file descriptor event occurs,
+     <function>ForeignAsyncNotify</function> will be called.
+    </para>
+
+    <para>
+<programlisting>
+void
+ForeignAsyncNotify(AsyncRequest *areq);
+</programlisting>
+     Process a relevant event that has occurred, then produce one tuple
+     asynchronously from the <structname>ForeignScan</structname> node.
+     This function should set the output parameters in the
+     <literal>areq</literal> in the same way as
+     <function>ForeignAsyncRequest</function>.
+    </para>
+   </sect2>
+
    <sect2 id="fdw-callbacks-reparameterize-paths">
     <title>FDW Routines for Reparameterization of Paths</title>
 
index 43c07da20ea9556e9c6e5580d1f98b93ebd731f5..af540fb02f2f12733c5c861e10a541cf739baa93 100644 (file)
@@ -1564,6 +1564,11 @@ postgres   27093  0.0  0.0  30096  2752 ?        Ss   11:34   0:00 postgres: ser
     </thead>
 
     <tbody>
+     <row>
+      <entry><literal>AppendReady</literal></entry>
+      <entry>Waiting for subplan nodes of an <literal>Append</literal> plan
+       node to be ready.</entry>
+     </row>
      <row>
       <entry><literal>BackupWaitWalArchive</literal></entry>
       <entry>Waiting for WAL files required for a backup to be successfully
index 07aa25799daf17133113d490abb02ccfc8369a83..a1b426c50ba50a9ce6cc8a121e621e522329e5f0 100644 (file)
@@ -371,6 +371,34 @@ OPTIONS (ADD password_required 'false');
 
   </sect3>
 
+  <sect3>
+   <title>Asynchronous Execution Options</title>
+
+   <para>
+    <filename>postgres_fdw</filename> supports asynchronous execution, which
+    runs multiple parts of an <structname>Append</structname> node
+    concurrently rather than serially to improve performance.
+    This execution can be controled using the following option:
+   </para>
+
+   <variablelist>
+
+    <varlistentry>
+     <term><literal>async_capable</literal></term>
+     <listitem>
+      <para>
+       This option controls whether <filename>postgres_fdw</filename> allows
+       foreign tables to be scanned concurrently for asynchronous execution.
+       It can be specified for a foreign table or a foreign server.
+       A table-level option overrides a server-level option.
+       The default is <literal>false</literal>.
+      </para>
+     </listitem>
+    </varlistentry>
+
+   </variablelist>
+  </sect3>
+
   <sect3>
    <title>Updatability Options</title>
 
index afc45429ba4560e987dc909ca2eccdddd1a34d83..fe75cabdcc023771fee062225e416cd7eac976ce 100644 (file)
@@ -1394,6 +1394,8 @@ ExplainNode(PlanState *planstate, List *ancestors,
        }
        if (plan->parallel_aware)
            appendStringInfoString(es->str, "Parallel ");
+       if (plan->async_capable)
+           appendStringInfoString(es->str, "Async ");
        appendStringInfoString(es->str, pname);
        es->indent++;
    }
@@ -1413,6 +1415,7 @@ ExplainNode(PlanState *planstate, List *ancestors,
        if (custom_name)
            ExplainPropertyText("Custom Plan Provider", custom_name, es);
        ExplainPropertyBool("Parallel Aware", plan->parallel_aware, es);
+       ExplainPropertyBool("Async Capable", plan->async_capable, es);
    }
 
    switch (nodeTag(plan))
index 74ac59faa138b33cfd6c541dd35a9dd860bd7659..680fd69151b24c43acee25cf50f40ff9cd81fe3d 100644 (file)
@@ -14,6 +14,7 @@ include $(top_builddir)/src/Makefile.global
 
 OBJS = \
    execAmi.o \
+   execAsync.o \
    execCurrent.o \
    execExpr.o \
    execExprInterp.o \
index 18b2ac1865954de15d76462df4ca08072264dc9c..3726048c4a703c2fbdde305c3744f72543420334 100644 (file)
@@ -359,3 +359,43 @@ query returning the same set of scan tuples multiple times.  Likewise,
 SRFs are disallowed in an UPDATE's targetlist.  There, they would have the
 effect of the same row being updated multiple times, which is not very
 useful --- and updates after the first would have no effect anyway.
+
+
+Asynchronous Execution
+----------------------
+
+In cases where a node is waiting on an event external to the database system,
+such as a ForeignScan awaiting network I/O, it's desirable for the node to
+indicate that it cannot return any tuple immediately but may be able to do so
+at a later time.  A process which discovers this type of situation can always
+handle it simply by blocking, but this may waste time that could be spent
+executing some other part of the plan tree where progress could be made
+immediately.  This is particularly likely to occur when the plan tree contains
+an Append node.  Asynchronous execution runs multiple parts of an Append node
+concurrently rather than serially to improve performance.
+
+For asynchronous execution, an Append node must first request a tuple from an
+async-capable child node using ExecAsyncRequest.  Next, it must execute the
+asynchronous event loop using ExecAppendAsyncEventWait.  Eventually, when a
+child node to which an asynchronous request has been made produces a tuple,
+the Append node will receive it from the event loop via ExecAsyncResponse.  In
+the current implementation of asynchronous execution, the only node type that
+requests tuples from an async-capable child node is an Append, while the only
+node type that might be async-capable is a ForeignScan.
+
+Typically, the ExecAsyncResponse callback is the only one required for nodes
+that wish to request tuples asynchronously.  On the other hand, async-capable
+nodes generally need to implement three methods:
+
+1. When an asynchronous request is made, the node's ExecAsyncRequest callback
+   will be invoked; it should use ExecAsyncRequestPending to indicate that the
+   request is pending for a callback described below.  Alternatively, it can
+   instead use ExecAsyncRequestDone if a result is available immediately.
+
+2. When the event loop wishes to wait or poll for file descriptor events, the
+   node's ExecAsyncConfigureWait callback will be invoked to configure the
+   file descriptor event for which the node wishes to wait.
+
+3. When the file descriptor becomes ready, the node's ExecAsyncNotify callback
+   will be invoked; like #1, it should use ExecAsyncRequestPending for another
+   callback or ExecAsyncRequestDone to return a result immediately.
index 4543ac79edfb40c346542ac6f86365c24458cc2d..58a8aa5ab75d4e9c577ba46d7a7a687ebcedd5d6 100644 (file)
@@ -531,6 +531,10 @@ ExecSupportsBackwardScan(Plan *node)
            {
                ListCell   *l;
 
+               /* With async, tuples may be interleaved, so can't back up. */
+               if (((Append *) node)->nasyncplans > 0)
+                   return false;
+
                foreach(l, ((Append *) node)->appendplans)
                {
                    if (!ExecSupportsBackwardScan((Plan *) lfirst(l)))
diff --git a/src/backend/executor/execAsync.c b/src/backend/executor/execAsync.c
new file mode 100644 (file)
index 0000000..f1985e6
--- /dev/null
@@ -0,0 +1,124 @@
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ *   Support routines for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *   src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+
+/*
+ * Asynchronously request a tuple from a designed async-capable node.
+ */
+void
+ExecAsyncRequest(AsyncRequest *areq)
+{
+   switch (nodeTag(areq->requestee))
+   {
+       case T_ForeignScanState:
+           ExecAsyncForeignScanRequest(areq);
+           break;
+       default:
+           /* If the node doesn't support async, caller messed up. */
+           elog(ERROR, "unrecognized node type: %d",
+                (int) nodeTag(areq->requestee));
+   }
+
+   ExecAsyncResponse(areq);
+}
+
+/*
+ * Give the asynchronous node a chance to configure the file descriptor event
+ * for which it wishes to wait.  We expect the node-type specific callback to
+ * make a single call of the following form:
+ *
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ */
+void
+ExecAsyncConfigureWait(AsyncRequest *areq)
+{
+   switch (nodeTag(areq->requestee))
+   {
+       case T_ForeignScanState:
+           ExecAsyncForeignScanConfigureWait(areq);
+           break;
+       default:
+           /* If the node doesn't support async, caller messed up. */
+           elog(ERROR, "unrecognized node type: %d",
+                (int) nodeTag(areq->requestee));
+   }
+}
+
+/*
+ * Call the asynchronous node back when a relevant event has occurred.
+ */
+void
+ExecAsyncNotify(AsyncRequest *areq)
+{
+   switch (nodeTag(areq->requestee))
+   {
+       case T_ForeignScanState:
+           ExecAsyncForeignScanNotify(areq);
+           break;
+       default:
+           /* If the node doesn't support async, caller messed up. */
+           elog(ERROR, "unrecognized node type: %d",
+                (int) nodeTag(areq->requestee));
+   }
+
+   ExecAsyncResponse(areq);
+}
+
+/*
+ * Call the requestor back when an asynchronous node has produced a result.
+ */
+void
+ExecAsyncResponse(AsyncRequest *areq)
+{
+   switch (nodeTag(areq->requestor))
+   {
+       case T_AppendState:
+           ExecAsyncAppendResponse(areq);
+           break;
+       default:
+           /* If the node doesn't support async, caller messed up. */
+           elog(ERROR, "unrecognized node type: %d",
+               (int) nodeTag(areq->requestor));
+   }
+}
+
+/*
+ * A requestee node should call this function to deliver the tuple to its
+ * requestor node.  The requestee node can call this from its ExecAsyncRequest
+ * or ExecAsyncNotify callback.
+ */
+void
+ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
+{
+   areq->request_complete = true;
+   areq->result = result;
+}
+
+/*
+ * A requestee node should call this function to indicate that it is pending
+ * for a callback.  The requestee node can call this from its ExecAsyncRequest
+ * or ExecAsyncNotify callback.
+ */
+void
+ExecAsyncRequestPending(AsyncRequest *areq)
+{
+   areq->callback_pending = true;
+   areq->request_complete = false;
+   areq->result = NULL;
+}
index 15e4115bd6df4b433fb3666abf9aedaaeb5c021e..7da8ffe0652073895cd8902eee5d9a78012f1aaa 100644 (file)
 
 #include "postgres.h"
 
+#include "executor/execAsync.h"
 #include "executor/execdebug.h"
 #include "executor/execPartition.h"
 #include "executor/nodeAppend.h"
 #include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
 
 /* Shared state for parallel-aware Append. */
 struct ParallelAppendState
@@ -78,12 +81,18 @@ struct ParallelAppendState
 };
 
 #define INVALID_SUBPLAN_INDEX      -1
+#define EVENT_BUFFER_SIZE          16
 
 static TupleTableSlot *ExecAppend(PlanState *pstate);
 static bool choose_next_subplan_locally(AppendState *node);
 static bool choose_next_subplan_for_leader(AppendState *node);
 static bool choose_next_subplan_for_worker(AppendState *node);
 static void mark_invalid_subplans_as_finished(AppendState *node);
+static void ExecAppendAsyncBegin(AppendState *node);
+static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
+static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
+static void ExecAppendAsyncEventWait(AppendState *node);
+static void classify_matching_subplans(AppendState *node);
 
 /* ----------------------------------------------------------------
  *     ExecInitAppend
@@ -102,7 +111,9 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
    AppendState *appendstate = makeNode(AppendState);
    PlanState **appendplanstates;
    Bitmapset  *validsubplans;
+   Bitmapset  *asyncplans;
    int         nplans;
+   int         nasyncplans;
    int         firstvalid;
    int         i,
                j;
@@ -119,6 +130,8 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
 
    /* Let choose_next_subplan_* function handle setting the first subplan */
    appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+   appendstate->as_syncdone = false;
+   appendstate->as_begun = false;
 
    /* If run-time partition pruning is enabled, then set that up now */
    if (node->part_prune_info != NULL)
@@ -191,12 +204,25 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
     * While at it, find out the first valid partial plan.
     */
    j = 0;
+   asyncplans = NULL;
+   nasyncplans = 0;
    firstvalid = nplans;
    i = -1;
    while ((i = bms_next_member(validsubplans, i)) >= 0)
    {
        Plan       *initNode = (Plan *) list_nth(node->appendplans, i);
 
+       /*
+        * Record async subplans.  When executing EvalPlanQual, we treat them
+        * as sync ones; don't do this when initializing an EvalPlanQual plan
+        * tree.
+        */
+       if (initNode->async_capable && estate->es_epq_active == NULL)
+       {
+           asyncplans = bms_add_member(asyncplans, j);
+           nasyncplans++;
+       }
+
        /*
         * Record the lowest appendplans index which is a valid partial plan.
         */
@@ -210,6 +236,37 @@ ExecInitAppend(Append *node, EState *estate, int eflags)
    appendstate->appendplans = appendplanstates;
    appendstate->as_nplans = nplans;
 
+   /* Initialize async state */
+   appendstate->as_asyncplans = asyncplans;
+   appendstate->as_nasyncplans = nasyncplans;
+   appendstate->as_asyncrequests = NULL;
+   appendstate->as_asyncresults = (TupleTableSlot **)
+       palloc0(nasyncplans * sizeof(TupleTableSlot *));
+   appendstate->as_needrequest = NULL;
+   appendstate->as_eventset = NULL;
+
+   if (nasyncplans > 0)
+   {
+       appendstate->as_asyncrequests = (AsyncRequest **)
+           palloc0(nplans * sizeof(AsyncRequest *));
+
+       i = -1;
+       while ((i = bms_next_member(asyncplans, i)) >= 0)
+       {
+           AsyncRequest *areq;
+
+           areq = palloc(sizeof(AsyncRequest));
+           areq->requestor = (PlanState *) appendstate;
+           areq->requestee = appendplanstates[i];
+           areq->request_index = i;
+           areq->callback_pending = false;
+           areq->request_complete = false;
+           areq->result = NULL;
+
+           appendstate->as_asyncrequests[i] = areq;
+       }
+   }
+
    /*
     * Miscellaneous initialization
     */
@@ -232,31 +289,59 @@ static TupleTableSlot *
 ExecAppend(PlanState *pstate)
 {
    AppendState *node = castNode(AppendState, pstate);
+   TupleTableSlot *result;
 
-   if (node->as_whichplan < 0)
+   /*
+    * If this is the first call after Init or ReScan, we need to do the
+    * initialization work.
+    */
+   if (!node->as_begun)
    {
+       Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
+       Assert(!node->as_syncdone);
+
        /* Nothing to do if there are no subplans */
        if (node->as_nplans == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
 
+       /* If there are any async subplans, begin executing them. */
+       if (node->as_nasyncplans > 0)
+           ExecAppendAsyncBegin(node);
+
        /*
-        * If no subplan has been chosen, we must choose one before
+        * If no sync subplan has been chosen, we must choose one before
         * proceeding.
         */
-       if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
-           !node->choose_next_subplan(node))
+       if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
+       Assert(node->as_syncdone ||
+              (node->as_whichplan >= 0 &&
+               node->as_whichplan < node->as_nplans));
+
+       /* And we're initialized. */
+       node->as_begun = true;
    }
 
    for (;;)
    {
        PlanState  *subnode;
-       TupleTableSlot *result;
 
        CHECK_FOR_INTERRUPTS();
 
        /*
-        * figure out which subplan we are currently processing
+        * try to get a tuple from an async subplan if any
+        */
+       if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
+       {
+           if (ExecAppendAsyncGetNext(node, &result))
+               return result;
+           Assert(!node->as_syncdone);
+           Assert(bms_is_empty(node->as_needrequest));
+       }
+
+       /*
+        * figure out which sync subplan we are currently processing
         */
        Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
        subnode = node->appendplans[node->as_whichplan];
@@ -276,8 +361,16 @@ ExecAppend(PlanState *pstate)
            return result;
        }
 
-       /* choose new subplan; if none, we're done */
-       if (!node->choose_next_subplan(node))
+       /*
+        * wait or poll async events if any. We do this before checking for
+        * the end of iteration, because it might drain the remaining async
+        * subplans.
+        */
+       if (node->as_nasyncremain > 0)
+           ExecAppendAsyncEventWait(node);
+
+       /* choose new sync subplan; if no sync/async subplans, we're done */
+       if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
            return ExecClearTuple(node->ps.ps_ResultTupleSlot);
    }
 }
@@ -313,6 +406,7 @@ ExecEndAppend(AppendState *node)
 void
 ExecReScanAppend(AppendState *node)
 {
+   int         nasyncplans = node->as_nasyncplans;
    int         i;
 
    /*
@@ -326,6 +420,11 @@ ExecReScanAppend(AppendState *node)
    {
        bms_free(node->as_valid_subplans);
        node->as_valid_subplans = NULL;
+       if (nasyncplans > 0)
+       {
+           bms_free(node->as_valid_asyncplans);
+           node->as_valid_asyncplans = NULL;
+       }
    }
 
    for (i = 0; i < node->as_nplans; i++)
@@ -347,8 +446,27 @@ ExecReScanAppend(AppendState *node)
            ExecReScan(subnode);
    }
 
+   /* Reset async state */
+   if (nasyncplans > 0)
+   {
+       i = -1;
+       while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+       {
+           AsyncRequest *areq = node->as_asyncrequests[i];
+
+           areq->callback_pending = false;
+           areq->request_complete = false;
+           areq->result = NULL;
+       }
+
+       bms_free(node->as_needrequest);
+       node->as_needrequest = NULL;
+   }
+
    /* Let choose_next_subplan_* function handle setting the first subplan */
    node->as_whichplan = INVALID_SUBPLAN_INDEX;
+   node->as_syncdone = false;
+   node->as_begun = false;
 }
 
 /* ----------------------------------------------------------------
@@ -429,7 +547,7 @@ ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt)
 /* ----------------------------------------------------------------
  *     choose_next_subplan_locally
  *
- *     Choose next subplan for a non-parallel-aware Append,
+ *     Choose next sync subplan for a non-parallel-aware Append,
  *     returning false if there are no more.
  * ----------------------------------------------------------------
  */
@@ -442,16 +560,25 @@ choose_next_subplan_locally(AppendState *node)
    /* We should never be called when there are no subplans */
    Assert(node->as_nplans > 0);
 
+   /* Nothing to do if syncdone */
+   if (node->as_syncdone)
+       return false;
+
    /*
     * If first call then have the bms member function choose the first valid
-    * subplan by initializing whichplan to -1.  If there happen to be no
-    * valid subplans then the bms member function will handle that by
-    * returning a negative number which will allow us to exit returning a
+    * sync subplan by initializing whichplan to -1.  If there happen to be
+    * no valid sync subplans then the bms member function will handle that
+    * by returning a negative number which will allow us to exit returning a
     * false value.
     */
    if (whichplan == INVALID_SUBPLAN_INDEX)
    {
-       if (node->as_valid_subplans == NULL)
+       if (node->as_nasyncplans > 0)
+       {
+           /* We'd have filled as_valid_subplans already */
+           Assert(node->as_valid_subplans);
+       }
+       else if (node->as_valid_subplans == NULL)
            node->as_valid_subplans =
                ExecFindMatchingSubPlans(node->as_prune_state);
 
@@ -467,7 +594,12 @@ choose_next_subplan_locally(AppendState *node)
        nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
 
    if (nextplan < 0)
+   {
+       /* Set as_syncdone if in async mode */
+       if (node->as_nasyncplans > 0)
+           node->as_syncdone = true;
        return false;
+   }
 
    node->as_whichplan = nextplan;
 
@@ -709,3 +841,306 @@ mark_invalid_subplans_as_finished(AppendState *node)
            node->as_pstate->pa_finished[i] = true;
    }
 }
+
+/* ----------------------------------------------------------------
+ *                     Asynchronous Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ *     ExecAppendAsyncBegin
+ *
+ *     Begin executing designed async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncBegin(AppendState *node)
+{
+   int         i;
+
+   /* Backward scan is not supported by async-aware Appends. */
+   Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+   /* We should never be called when there are no async subplans. */
+   Assert(node->as_nasyncplans > 0);
+
+   /* If we've yet to determine the valid subplans then do so now. */
+   if (node->as_valid_subplans == NULL)
+       node->as_valid_subplans =
+           ExecFindMatchingSubPlans(node->as_prune_state);
+
+   classify_matching_subplans(node);
+
+   /* Nothing to do if there are no valid async subplans. */
+   if (node->as_nasyncremain == 0)
+       return;
+
+   /* Make a request for each of the valid async subplans. */
+   i = -1;
+   while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
+   {
+       AsyncRequest *areq = node->as_asyncrequests[i];
+
+       Assert(areq->request_index == i);
+       Assert(!areq->callback_pending);
+
+       /* Do the actual work. */
+       ExecAsyncRequest(areq);
+   }
+}
+
+/* ----------------------------------------------------------------
+ *     ExecAppendAsyncGetNext
+ *
+ *     Get the next tuple from any of the asynchronous subplans.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
+{
+   *result = NULL;
+
+   /* We should never be called when there are no valid async subplans. */
+   Assert(node->as_nasyncremain > 0);
+
+   /* Request a tuple asynchronously. */
+   if (ExecAppendAsyncRequest(node, result))
+       return true;
+
+   while (node->as_nasyncremain > 0)
+   {
+       CHECK_FOR_INTERRUPTS();
+
+       /* Wait or poll async events. */
+       ExecAppendAsyncEventWait(node);
+
+       /* Request a tuple asynchronously. */
+       if (ExecAppendAsyncRequest(node, result))
+           return true;
+
+       /* Break from loop if there's any sync subplan that isn't complete. */
+       if (!node->as_syncdone)
+           break;
+   }
+
+   /*
+    * If all sync subplans are complete, we're totally done scanning the
+    * given node.  Otherwise, we're done with the asynchronous stuff but
+    * must continue scanning the sync subplans.
+    */
+   if (node->as_syncdone)
+   {
+       Assert(node->as_nasyncremain == 0);
+       *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+       return true;
+   }
+
+   return false;
+}
+
+/* ----------------------------------------------------------------
+ *     ExecAppendAsyncRequest
+ *
+ *     Request a tuple asynchronously.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
+{
+   Bitmapset  *needrequest;
+   int         i;
+
+   /* Nothing to do if there are no async subplans needing a new request. */
+   if (bms_is_empty(node->as_needrequest))
+       return false;
+
+   /*
+    * If there are any asynchronously-generated results that have not yet
+    * been returned, we have nothing to do; just return one of them.
+    */
+   if (node->as_nasyncresults > 0)
+   {
+       --node->as_nasyncresults;
+       *result = node->as_asyncresults[node->as_nasyncresults];
+       return true;
+   }
+
+   /* Make a new request for each of the async subplans that need it. */
+   needrequest = node->as_needrequest;
+   node->as_needrequest = NULL;
+   i = -1;
+   while ((i = bms_next_member(needrequest, i)) >= 0)
+   {
+       AsyncRequest *areq = node->as_asyncrequests[i];
+
+       /* Do the actual work. */
+       ExecAsyncRequest(areq);
+   }
+   bms_free(needrequest);
+
+   /* Return one of the asynchronously-generated results if any. */
+   if (node->as_nasyncresults > 0)
+   {
+       --node->as_nasyncresults;
+       *result = node->as_asyncresults[node->as_nasyncresults];
+       return true;
+   }
+
+   return false;
+}
+
+/* ----------------------------------------------------------------
+ *     ExecAppendAsyncEventWait
+ *
+ *     Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncEventWait(AppendState *node)
+{
+   long        timeout = node->as_syncdone ? -1 : 0;
+   WaitEvent   occurred_event[EVENT_BUFFER_SIZE];
+   int         noccurred;
+   int         i;
+
+   /* We should never be called when there are no valid async subplans. */
+   Assert(node->as_nasyncremain > 0);
+
+   node->as_eventset = CreateWaitEventSet(CurrentMemoryContext,
+                                          node->as_nasyncplans + 1);
+   AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+                     NULL, NULL);
+
+   /* Give each waiting subplan a chance to add an event. */
+   i = -1;
+   while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+   {
+       AsyncRequest *areq = node->as_asyncrequests[i];
+
+       if (areq->callback_pending)
+           ExecAsyncConfigureWait(areq);
+   }
+
+   /* Wait for at least one event to occur. */
+   noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
+                                EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY);
+   FreeWaitEventSet(node->as_eventset);
+   node->as_eventset = NULL;
+   if (noccurred == 0)
+       return;
+
+   /* Deliver notifications. */
+   for (i = 0; i < noccurred; i++)
+   {
+       WaitEvent  *w = &occurred_event[i];
+
+       /*
+        * Each waiting subplan should have registered its wait event with
+        * user_data pointing back to its AsyncRequest.
+        */
+       if ((w->events & WL_SOCKET_READABLE) != 0)
+       {
+           AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+           /*
+            * Mark it as no longer needing a callback.  We must do this
+            * before dispatching the callback in case the callback resets
+            * the flag.
+            */
+           Assert(areq->callback_pending);
+           areq->callback_pending = false;
+
+           /* Do the actual work. */
+           ExecAsyncNotify(areq);
+       }
+   }
+}
+
+/* ----------------------------------------------------------------
+ *     ExecAsyncAppendResponse
+ *
+ *     Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncAppendResponse(AsyncRequest *areq)
+{
+   AppendState *node = (AppendState *) areq->requestor;
+   TupleTableSlot *slot = areq->result;
+
+   /* The result should be a TupleTableSlot or NULL. */
+   Assert(slot == NULL || IsA(slot, TupleTableSlot));
+
+   /* Nothing to do if the request is pending. */
+   if (!areq->request_complete)
+   {
+       /* The request would have been pending for a callback */
+       Assert(areq->callback_pending);
+       return;
+   }
+
+   /* If the result is NULL or an empty slot, there's nothing more to do. */
+   if (TupIsNull(slot))
+   {
+       /* The ending subplan wouldn't have been pending for a callback. */
+       Assert(!areq->callback_pending);
+       --node->as_nasyncremain;
+       return;
+   }
+
+   /* Save result so we can return it. */
+   Assert(node->as_nasyncresults < node->as_nasyncplans);
+   node->as_asyncresults[node->as_nasyncresults++] = slot;
+
+   /*
+    * Mark the subplan that returned a result as ready for a new request.  We
+    * don't launch another one here immediately because it might complete.
+    */
+   node->as_needrequest = bms_add_member(node->as_needrequest,
+                                         areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ *     classify_matching_subplans
+ *
+ *     Classify the node's as_valid_subplans into sync ones and
+ *     async ones, adjust it to contain sync ones only, and save
+ *     async ones in the node's as_valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(AppendState *node)
+{
+   Bitmapset  *valid_asyncplans;
+
+   Assert(node->as_valid_asyncplans == NULL);
+
+   /* Nothing to do if there are no valid subplans. */
+   if (bms_is_empty(node->as_valid_subplans))
+   {
+       node->as_syncdone = true;
+       node->as_nasyncremain = 0;
+       return;
+   }
+
+   /* Nothing to do if there are no valid async subplans. */
+   if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
+   {
+       node->as_nasyncremain = 0;
+       return;
+   }
+
+   /* Get valid async subplans. */
+   valid_asyncplans = bms_copy(node->as_asyncplans);
+   valid_asyncplans = bms_int_members(valid_asyncplans,
+                                      node->as_valid_subplans);
+
+   /* Adjust the valid subplans to contain sync subplans only. */
+   node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
+                                             valid_asyncplans);
+   node->as_syncdone = bms_is_empty(node->as_valid_subplans);
+
+   /* Save valid async subplans. */
+   node->as_valid_asyncplans = valid_asyncplans;
+   node->as_nasyncremain = bms_num_members(valid_asyncplans);
+}
index 0969e53c3a44dcb9e567b2d000b2cff80767dba7..898890fb08faf595d65eae46d6c8178998c24f36 100644 (file)
@@ -391,3 +391,51 @@ ExecShutdownForeignScan(ForeignScanState *node)
    if (fdwroutine->ShutdownForeignScan)
        fdwroutine->ShutdownForeignScan(node);
 }
+
+/* ----------------------------------------------------------------
+ *     ExecAsyncForeignScanRequest
+ *
+ *     Asynchronously request a tuple from a designed async-capable node
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanRequest(AsyncRequest *areq)
+{
+   ForeignScanState *node = (ForeignScanState *) areq->requestee;
+   FdwRoutine *fdwroutine = node->fdwroutine;
+
+   Assert(fdwroutine->ForeignAsyncRequest != NULL);
+   fdwroutine->ForeignAsyncRequest(areq);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecAsyncForeignScanConfigureWait
+ *
+ *     In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
+{
+   ForeignScanState *node = (ForeignScanState *) areq->requestee;
+   FdwRoutine *fdwroutine = node->fdwroutine;
+
+   Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+   fdwroutine->ForeignAsyncConfigureWait(areq);
+}
+
+/* ----------------------------------------------------------------
+ *     ExecAsyncForeignScanNotify
+ *
+ *     Callback invoked when a relevant event has occurred
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanNotify(AsyncRequest *areq)
+{
+   ForeignScanState *node = (ForeignScanState *) areq->requestee;
+   FdwRoutine *fdwroutine = node->fdwroutine;
+
+   Assert(fdwroutine->ForeignAsyncNotify != NULL);
+   fdwroutine->ForeignAsyncNotify(areq);
+}
index 1d0bb6e2e745582c05897f840f1bccc039726b29..d58b79d525cbad67c4ebac7e153c3d9371376766 100644 (file)
@@ -120,6 +120,7 @@ CopyPlanFields(const Plan *from, Plan *newnode)
    COPY_SCALAR_FIELD(plan_width);
    COPY_SCALAR_FIELD(parallel_aware);
    COPY_SCALAR_FIELD(parallel_safe);
+   COPY_SCALAR_FIELD(async_capable);
    COPY_SCALAR_FIELD(plan_node_id);
    COPY_NODE_FIELD(targetlist);
    COPY_NODE_FIELD(qual);
@@ -241,6 +242,7 @@ _copyAppend(const Append *from)
     */
    COPY_BITMAPSET_FIELD(apprelids);
    COPY_NODE_FIELD(appendplans);
+   COPY_SCALAR_FIELD(nasyncplans);
    COPY_SCALAR_FIELD(first_partial_plan);
    COPY_NODE_FIELD(part_prune_info);
 
index 301fa3049020354ddfe01d7df1d0ae67230bba67..ff127a19adf420615cc4636c5458740e0e90de1f 100644 (file)
@@ -333,6 +333,7 @@ _outPlanInfo(StringInfo str, const Plan *node)
    WRITE_INT_FIELD(plan_width);
    WRITE_BOOL_FIELD(parallel_aware);
    WRITE_BOOL_FIELD(parallel_safe);
+   WRITE_BOOL_FIELD(async_capable);
    WRITE_INT_FIELD(plan_node_id);
    WRITE_NODE_FIELD(targetlist);
    WRITE_NODE_FIELD(qual);
@@ -431,6 +432,7 @@ _outAppend(StringInfo str, const Append *node)
 
    WRITE_BITMAPSET_FIELD(apprelids);
    WRITE_NODE_FIELD(appendplans);
+   WRITE_INT_FIELD(nasyncplans);
    WRITE_INT_FIELD(first_partial_plan);
    WRITE_NODE_FIELD(part_prune_info);
 }
index 377185f7c679986651d1a847e22ab60c34d45e19..6a563e99033efac96f419ef6ef113b0c11dcec77 100644 (file)
@@ -1615,6 +1615,7 @@ ReadCommonPlan(Plan *local_node)
    READ_INT_FIELD(plan_width);
    READ_BOOL_FIELD(parallel_aware);
    READ_BOOL_FIELD(parallel_safe);
+   READ_BOOL_FIELD(async_capable);
    READ_INT_FIELD(plan_node_id);
    READ_NODE_FIELD(targetlist);
    READ_NODE_FIELD(qual);
@@ -1711,6 +1712,7 @@ _readAppend(void)
 
    READ_BITMAPSET_FIELD(apprelids);
    READ_NODE_FIELD(appendplans);
+   READ_INT_FIELD(nasyncplans);
    READ_INT_FIELD(first_partial_plan);
    READ_NODE_FIELD(part_prune_info);
 
index b92c94858821e9b2142b1d0c380e0b41274bb9aa..0c016a03dd95a4df286ecea7fb05a8dcec8cc7bc 100644 (file)
@@ -147,6 +147,7 @@ bool        enable_partitionwise_aggregate = false;
 bool       enable_parallel_append = true;
 bool       enable_parallel_hash = true;
 bool       enable_partition_pruning = true;
+bool       enable_async_append = true;
 
 typedef struct
 {
index 906cab7053229011fba6fe7f2bcd7ae15279c73b..78ef068fb7b4a356dd67ef6b8205abcae694f404 100644 (file)
@@ -81,6 +81,7 @@ static List *get_gating_quals(PlannerInfo *root, List *quals);
 static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan,
                                List *gating_quals);
 static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path);
+static bool is_async_capable_path(Path *path);
 static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path,
                                int flags);
 static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
@@ -1080,6 +1081,31 @@ create_join_plan(PlannerInfo *root, JoinPath *best_path)
    return plan;
 }
 
+/*
+ * is_async_capable_path
+ *     Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+   switch (nodeTag(path))
+   {
+       case T_ForeignPath:
+           {
+               FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+               Assert(fdwroutine != NULL);
+               if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+                   fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+                   return true;
+           }
+           break;
+       default:
+           break;
+   }
+   return false;
+}
+
 /*
  * create_append_plan
  *   Create an Append plan for 'best_path' and (recursively) plans
@@ -1097,6 +1123,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
    List       *pathkeys = best_path->path.pathkeys;
    List       *subplans = NIL;
    ListCell   *subpaths;
+   int         nasyncplans = 0;
    RelOptInfo *rel = best_path->path.parent;
    PartitionPruneInfo *partpruneinfo = NULL;
    int         nodenumsortkeys = 0;
@@ -1104,6 +1131,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
    Oid        *nodeSortOperators = NULL;
    Oid        *nodeCollations = NULL;
    bool       *nodeNullsFirst = NULL;
+   bool        consider_async = false;
 
    /*
     * The subpaths list could be empty, if every child was proven empty by
@@ -1167,6 +1195,11 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
        tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist));
    }
 
+   /* If appropriate, consider async append */
+   consider_async = (enable_async_append && pathkeys == NIL &&
+                     !best_path->path.parallel_safe &&
+                     list_length(best_path->subpaths) > 1);
+
    /* Build the plan for each child */
    foreach(subpaths, best_path->subpaths)
    {
@@ -1234,6 +1267,13 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
        }
 
        subplans = lappend(subplans, subplan);
+
+       /* Check to see if subplan can be executed asynchronously */
+       if (consider_async && is_async_capable_path(subpath))
+       {
+           subplan->async_capable = true;
+           ++nasyncplans;
+       }
    }
 
    /*
@@ -1266,6 +1306,7 @@ create_append_plan(PlannerInfo *root, AppendPath *best_path, int flags)
    }
 
    plan->appendplans = subplans;
+   plan->nasyncplans = nasyncplans;
    plan->first_partial_plan = best_path->first_partial_path;
    plan->part_prune_info = partpruneinfo;
 
index 60f45ccc4ea0d30b0e50cf1b4bd9780475f5c45e..4b9bcd2b41a44d349382f3039ae9e1a81c3fc8d4 100644 (file)
@@ -3995,6 +3995,9 @@ pgstat_get_wait_ipc(WaitEventIPC w)
 
    switch (w)
    {
+       case WAIT_EVENT_APPEND_READY:
+           event_name = "AppendReady";
+           break;
        case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE:
            event_name = "BackupWaitWalArchive";
            break;
index 43a5fded103bf392281cea1578ef4b2c40009502..5f3318fa8f1c79e932adfed7a953f4a8859abf3e 100644 (file)
@@ -2020,6 +2020,15 @@ WaitEventSetWaitBlock(WaitEventSet *set, int cur_timeout,
 }
 #endif
 
+/*
+ * Get the number of wait events registered in a given WaitEventSet.
+ */
+int
+GetNumRegisteredWaitEvents(WaitEventSet *set)
+{
+   return set->nevents;
+}
+
 #if defined(WAIT_USE_POLL)
 
 /*
index 0c5dc4d3e8494a01c0fc3375c38440894355d2cf..03daec9a085559766f9fa9a65224671e06f62ea2 100644 (file)
@@ -1128,6 +1128,16 @@ static struct config_bool ConfigureNamesBool[] =
        true,
        NULL, NULL, NULL
    },
+   {
+       {"enable_async_append", PGC_USERSET, QUERY_TUNING_METHOD,
+           gettext_noop("Enables the planner's use of async append plans."),
+           NULL,
+           GUC_EXPLAIN
+       },
+       &enable_async_append,
+       true,
+       NULL, NULL, NULL
+   },
    {
        {"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
            gettext_noop("Enables genetic query optimization."),
index b234a6bfe64f4a1a26c82724814b894d40337f74..791d39cf0784efb2d63e149a71e85c59ea9b2ab0 100644 (file)
 #enable_partitionwise_aggregate = off
 #enable_parallel_hash = on
 #enable_partition_pruning = on
+#enable_async_append = on
 
 # - Planner Cost Constants -
 
diff --git a/src/include/executor/execAsync.h b/src/include/executor/execAsync.h
new file mode 100644 (file)
index 0000000..724034f
--- /dev/null
@@ -0,0 +1,25 @@
+/*-------------------------------------------------------------------------
+ * execAsync.h
+ *     Support functions for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ *     src/include/executor/execAsync.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecAsyncRequest(AsyncRequest *areq);
+extern void ExecAsyncConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncNotify(AsyncRequest *areq);
+extern void ExecAsyncResponse(AsyncRequest *areq);
+extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result);
+extern void ExecAsyncRequestPending(AsyncRequest *areq);
+
+#endif   /* EXECASYNC_H */
index cafd410a5daebd5ad6376e4a501c023a36642a29..fa54ac6ad23a5783be7a779f990bf39433c72491 100644 (file)
@@ -25,4 +25,6 @@ extern void ExecAppendInitializeDSM(AppendState *node, ParallelContext *pcxt);
 extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt);
 extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt);
 
+extern void ExecAsyncAppendResponse(AsyncRequest *areq);
+
 #endif                         /* NODEAPPEND_H */
index 6ae7733e25b004ecd32fca4063bb0992f31b6cbc..8ffc0ca5bf3de541d5d577358d26150052aeddf6 100644 (file)
@@ -31,4 +31,8 @@ extern void ExecForeignScanInitializeWorker(ForeignScanState *node,
                                            ParallelWorkerContext *pwcxt);
 extern void ExecShutdownForeignScan(ForeignScanState *node);
 
+extern void ExecAsyncForeignScanRequest(AsyncRequest *areq);
+extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncForeignScanNotify(AsyncRequest *areq);
+
 #endif                         /* NODEFOREIGNSCAN_H */
index 248f78da4520289d323599021ed64716659cac1a..7c89d081c766a8bcf97b2a271fab59aa95c61444 100644 (file)
@@ -178,6 +178,14 @@ typedef List *(*ReparameterizeForeignPathByChild_function) (PlannerInfo *root,
                                                            List *fdw_private,
                                                            RelOptInfo *child_rel);
 
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+
+typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq);
+
 /*
  * FdwRoutine is the struct returned by a foreign-data wrapper's handler
  * function.  It provides pointers to the callback functions needed by the
@@ -256,6 +264,12 @@ typedef struct FdwRoutine
 
    /* Support functions for path reparameterization. */
    ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild;
+
+   /* Support functions for asynchronous execution */
+   IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+   ForeignAsyncRequest_function ForeignAsyncRequest;
+   ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+   ForeignAsyncNotify_function ForeignAsyncNotify;
 } FdwRoutine;
 
 
index e31ad6204e64b9f576efb131ed2ea170f6024740..09ea7ef6a6b985bb29ad23c44294630df413b8f5 100644 (file)
@@ -515,6 +515,22 @@ typedef struct ResultRelInfo
    struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
 } ResultRelInfo;
 
+/* ----------------
+ *   AsyncRequest
+ *
+ * State for an asynchronous tuple request.
+ * ----------------
+ */
+typedef struct AsyncRequest
+{
+   struct PlanState *requestor;    /* Node that wants a tuple */
+   struct PlanState *requestee;    /* Node from which a tuple is wanted */
+   int         request_index;  /* Scratch space for requestor */
+   bool        callback_pending;   /* Callback is needed */
+   bool        request_complete;   /* Request complete, result valid */
+   TupleTableSlot *result;     /* Result (NULL if no more tuples) */
+} AsyncRequest;
+
 /* ----------------
  *   EState information
  *
@@ -1199,12 +1215,12 @@ typedef struct ModifyTableState
  *  AppendState information
  *
  *     nplans              how many plans are in the array
- *     whichplan           which plan is being executed (0 .. n-1), or a
- *                         special negative value. See nodeAppend.c.
+ *     whichplan           which synchronous plan is being executed (0 .. n-1)
+ *                         or a special negative value. See nodeAppend.c.
  *     prune_state         details required to allow partitions to be
  *                         eliminated from the scan, or NULL if not possible.
- *     valid_subplans      for runtime pruning, valid appendplans indexes to
- *                         scan.
+ *     valid_subplans      for runtime pruning, valid synchronous appendplans
+ *                         indexes to scan.
  * ----------------
  */
 
@@ -1220,12 +1236,25 @@ struct AppendState
    PlanState **appendplans;    /* array of PlanStates for my inputs */
    int         as_nplans;
    int         as_whichplan;
+   bool        as_begun;       /* false means need to initialize */
+   Bitmapset  *as_asyncplans;  /* asynchronous plans indexes */
+   int         as_nasyncplans; /* # of asynchronous plans */
+   AsyncRequest **as_asyncrequests;    /* array of AsyncRequests */
+   TupleTableSlot **as_asyncresults;   /* unreturned results of async plans */
+   int         as_nasyncresults;   /* # of valid entries in as_asyncresults */
+   bool        as_syncdone;    /* true if all synchronous plans done in
+                                * asynchronous mode, else false */
+   int         as_nasyncremain;    /* # of remaining asynchronous plans */
+   Bitmapset  *as_needrequest; /* asynchronous plans needing a new request */
+   struct WaitEventSet *as_eventset;   /* WaitEventSet used to configure
+                                        * file descriptor wait events */
    int         as_first_partial_plan;  /* Index of 'appendplans' containing
                                         * the first partial plan */
    ParallelAppendState *as_pstate; /* parallel coordination info */
    Size        pstate_len;     /* size of parallel coordination info */
    struct PartitionPruneState *as_prune_state;
    Bitmapset  *as_valid_subplans;
+   Bitmapset  *as_valid_asyncplans;    /* valid asynchronous plans indexes */
    bool        (*choose_next_subplan) (AppendState *);
 };
 
index 6e62104d0b77b4f75278b33b8bebeaed7ae12001..24ca616740bc0ae265b1331cde9b734d9421b117 100644 (file)
@@ -129,6 +129,11 @@ typedef struct Plan
    bool        parallel_aware; /* engage parallel-aware logic? */
    bool        parallel_safe;  /* OK to use as part of parallel plan? */
 
+   /*
+    * information needed for asynchronous execution
+    */
+   bool        async_capable;  /* engage asynchronous-capable logic? */
+
    /*
     * Common structural data for all Plan types.
     */
@@ -245,6 +250,7 @@ typedef struct Append
    Plan        plan;
    Bitmapset  *apprelids;      /* RTIs of appendrel(s) formed by this node */
    List       *appendplans;
+   int         nasyncplans;    /* # of asynchronous plans */
 
    /*
     * All 'appendplans' preceding this index are non-partial plans. All
index 1be93be09836c7d05c369b782ebd380e3b7e8c27..a3fd93fe07f012d98b99f3dade1508026c7a83ea 100644 (file)
@@ -65,6 +65,7 @@ extern PGDLLIMPORT bool enable_partitionwise_aggregate;
 extern PGDLLIMPORT bool enable_parallel_append;
 extern PGDLLIMPORT bool enable_parallel_hash;
 extern PGDLLIMPORT bool enable_partition_pruning;
+extern PGDLLIMPORT bool enable_async_append;
 extern PGDLLIMPORT int constraint_exclusion;
 
 extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
index 87672e6f3025a56c5447bf1d693f6b7752f2c91b..d699502cd9a5b7743342d4b97b01ced00b347558 100644 (file)
@@ -966,7 +966,8 @@ typedef enum
  */
 typedef enum
 {
-   WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC,
+   WAIT_EVENT_APPEND_READY = PG_WAIT_IPC,
+   WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE,
    WAIT_EVENT_BGWORKER_SHUTDOWN,
    WAIT_EVENT_BGWORKER_STARTUP,
    WAIT_EVENT_BTREE_PAGE,
index 9e94fcaec24f4fcdddcc88d57bc89bdebf6d1c0e..44f9368c64471e282b4694351ecc96768afdb41f 100644 (file)
@@ -179,5 +179,6 @@ extern int  WaitLatch(Latch *latch, int wakeEvents, long timeout,
 extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
                              pgsocket sock, long timeout, uint32 wait_event_info);
 extern void InitializeLatchWaitSet(void);
+extern int GetNumRegisteredWaitEvents(WaitEventSet *set);
 
 #endif                         /* LATCH_H */
index 791eba85111fcb7a5117b3b978538552b8eef04b..b89b99fb020de193d16ea026dcd6c4f97aeb1e18 100644 (file)
@@ -87,6 +87,7 @@ select explain_filter('explain (analyze, buffers, format json) select * from int
      "Plan": {                     +
        "Node Type": "Seq Scan",    +
        "Parallel Aware": false,    +
+       "Async Capable": false,     +
        "Relation Name": "int8_tbl",+
        "Alias": "i8",              +
        "Startup Cost": N.N,        +
@@ -136,6 +137,7 @@ select explain_filter('explain (analyze, buffers, format xml) select * from int8
      <Plan>                                            +
        <Node-Type>Seq Scan</Node-Type>                 +
        <Parallel-Aware>false</Parallel-Aware>          +
+       <Async-Capable>false</Async-Capable>            +
        <Relation-Name>int8_tbl</Relation-Name>         +
        <Alias>i8</Alias>                               +
        <Startup-Cost>N.N</Startup-Cost>                +
@@ -183,6 +185,7 @@ select explain_filter('explain (analyze, buffers, format yaml) select * from int
  - Plan:                      +
      Node Type: "Seq Scan"    +
      Parallel Aware: false    +
+     Async Capable: false     +
      Relation Name: "int8_tbl"+
      Alias: "i8"              +
      Startup Cost: N.N        +
@@ -233,6 +236,7 @@ select explain_filter('explain (buffers, format json) select * from int8_tbl i8'
      "Plan": {                     +
        "Node Type": "Seq Scan",    +
        "Parallel Aware": false,    +
+       "Async Capable": false,     +
        "Relation Name": "int8_tbl",+
        "Alias": "i8",              +
        "Startup Cost": N.N,        +
@@ -346,6 +350,7 @@ select jsonb_pretty(
                              "Actual Rows": 0,              +
                              "Actual Loops": 0,             +
                              "Startup Cost": 0.0,           +
+                             "Async Capable": false,        +
                              "Relation Name": "tenk1",      +
                              "Parallel Aware": true,        +
                              "Local Hit Blocks": 0,         +
@@ -391,6 +396,7 @@ select jsonb_pretty(
                      "Actual Rows": 0,                      +
                      "Actual Loops": 0,                     +
                      "Startup Cost": 0.0,                   +
+                     "Async Capable": false,                +
                      "Parallel Aware": false,               +
                      "Sort Space Used": 0,                  +
                      "Local Hit Blocks": 0,                 +
@@ -433,6 +439,7 @@ select jsonb_pretty(
              "Actual Rows": 0,                              +
              "Actual Loops": 0,                             +
              "Startup Cost": 0.0,                           +
+             "Async Capable": false,                        +
              "Parallel Aware": false,                       +
              "Workers Planned": 0,                          +
              "Local Hit Blocks": 0,                         +
index 68ca321163b523795f5145a9e95859ab5f17fd4d..a417b566d95abc0e138bba459575f70fd1ea18d8 100644 (file)
@@ -558,6 +558,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
          "Node Type": "Incremental Sort",       +
          "Actual Rows": 55,                     +
          "Actual Loops": 1,                     +
+         "Async Capable": false,                +
          "Presorted Key": [                     +
              "t.a"                              +
          ],                                     +
@@ -760,6 +761,7 @@ select jsonb_pretty(explain_analyze_inc_sort_nodes_without_memory('select * from
          "Node Type": "Incremental Sort",       +
          "Actual Rows": 70,                     +
          "Actual Loops": 1,                     +
+         "Async Capable": false,                +
          "Presorted Key": [                     +
              "t.a"                              +
          ],                                     +
index ff157ceb1c19fb18e5b1ab238a5ea3341154c711..499245068a55807f2c5b86b5fc642f1c5b117131 100644 (file)
@@ -204,6 +204,7 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
        "Node Type": "ModifyTable",                                     +
        "Operation": "Insert",                                          +
        "Parallel Aware": false,                                        +
+       "Async Capable": false,                                         +
        "Relation Name": "insertconflicttest",                          +
        "Alias": "insertconflicttest",                                  +
        "Conflict Resolution": "UPDATE",                                +
@@ -213,7 +214,8 @@ explain (costs off, format json) insert into insertconflicttest values (0, 'Bilb
          {                                                             +
            "Node Type": "Result",                                      +
            "Parent Relationship": "Member",                            +
-           "Parallel Aware": false                                     +
+           "Parallel Aware": false,                                    +
+           "Async Capable": false                                      +
          }                                                             +
        ]                                                               +
      }                                                                 +
index 6d048e309cb093efb1d05ecc5e60d441891e411a..98dde452e628f5d7f976a87238c1ecfb6a3e9030 100644 (file)
@@ -95,6 +95,7 @@ select count(*) = 0 as ok from pg_stat_wal_receiver;
 select name, setting from pg_settings where name like 'enable%';
               name              | setting 
 --------------------------------+---------
+ enable_async_append            | on
  enable_bitmapscan              | on
  enable_gathermerge             | on
  enable_hashagg                 | on
@@ -113,7 +114,7 @@ select name, setting from pg_settings where name like 'enable%';
  enable_seqscan                 | on
  enable_sort                    | on
  enable_tidscan                 | on
-(18 rows)
+(19 rows)
 
 -- Test that the pg_timezone_names and pg_timezone_abbrevs views are
 -- more-or-less working.  We can't test their contents in any great detail