Oid serverid; /* foreign server OID used to get server name */
uint32 server_hashvalue; /* hash value of foreign server OID */
uint32 mapping_hashvalue; /* hash value of user mapping OID */
+ PgFdwConnState state; /* extra per-connection state */
} ConnCacheEntry;
/*
* will_prep_stmt must be true if caller intends to create any prepared
* statements. Since those don't go away automatically at transaction end
* (not even on error), we need this flag to cue manual cleanup.
+ *
+ * If state is not NULL, *state receives the per-connection state associated
+ * with the PGconn.
*/
PGconn *
-GetConnection(UserMapping *user, bool will_prep_stmt)
+GetConnection(UserMapping *user, bool will_prep_stmt, PgFdwConnState **state)
{
bool found;
bool retry = false;
*/
PG_TRY();
{
+ /* Process a pending asynchronous request if any. */
+ if (entry->state.pendingAreq)
+ process_pending_request(entry->state.pendingAreq);
/* Start a new transaction or subtransaction if needed. */
begin_remote_xact(entry);
}
/* Remember if caller will prepare statements */
entry->have_prep_stmt |= will_prep_stmt;
+ /* If caller needs access to the per-connection state, return it. */
+ if (state)
+ *state = &entry->state;
+
return entry->conn;
}
entry->mapping_hashvalue =
GetSysCacheHashValue1(USERMAPPINGOID,
ObjectIdGetDatum(user->umid));
+ memset(&entry->state, 0, sizeof(entry->state));
/* Now try to make the connection */
entry->conn = connect_pg_server(server, user);
* Caller is responsible for the error handling on the result.
*/
PGresult *
-pgfdw_exec_query(PGconn *conn, const char *query)
+pgfdw_exec_query(PGconn *conn, const char *query, PgFdwConnState *state)
{
+ /* First, process a pending asynchronous request, if any. */
+ if (state && state->pendingAreq)
+ process_pending_request(state->pendingAreq);
+
/*
* Submit a query. Since we don't use non-blocking mode, this also can
* block. But its risk is relatively small, so we ignore that for now.
{
entry->have_prep_stmt = false;
entry->have_error = false;
+ /* Also reset per-connection state */
+ memset(&entry->state, 0, sizeof(entry->state));
}
/* Disarm changing_xact_state if it all worked. */
* Cancel the currently-in-progress query (whose query text we do not have)
* and ignore the result. Returns true if we successfully cancel the query
* and discard any pending result, and false if not.
+ *
+ * XXX: if the query was one sent by fetch_more_data_begin(), we could get the
+ * query text from the pendingAreq saved in the per-connection state, then
+ * report the query using it.
*/
static bool
pgfdw_cancel_query(PGconn *conn)
END;
$d$;
ERROR: invalid option "password"
-HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size
+HINT: Valid options in this context are: service, passfile, channel_binding, connect_timeout, dbname, host, hostaddr, port, options, application_name, keepalives, keepalives_idle, keepalives_interval, keepalives_count, tcp_user_timeout, sslmode, sslcompression, sslcert, sslkey, sslrootcert, sslcrl, sslcrldir, requirepeer, ssl_min_protocol_version, ssl_max_protocol_version, gssencmode, krbsrvname, gsslib, target_session_attrs, use_remote_estimate, fdw_startup_cost, fdw_tuple_cost, extensions, updatable, fetch_size, batch_size, async_capable
CONTEXT: SQL statement "ALTER SERVER loopback_nopw OPTIONS (ADD password 'dummypw')"
PL/pgSQL function inline_code_block line 3 at EXECUTE
-- If we add a password for our user mapping instead, we should get a different
-- Clean up
DROP TABLE batch_table, batch_cp_upd_test CASCADE;
+-- ===================================================================
+-- test asynchronous execution
+-- ===================================================================
+ALTER SERVER loopback OPTIONS (DROP extensions);
+ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
+ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true');
+CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a);
+CREATE TABLE base_tbl1 (a int, b int, c text);
+CREATE TABLE base_tbl2 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000)
+ SERVER loopback OPTIONS (table_name 'base_tbl1');
+CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000)
+ SERVER loopback2 OPTIONS (table_name 'base_tbl2');
+INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+-- simple queries
+CREATE TABLE result_tbl (a int, b int, c text);
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+ QUERY PLAN
+----------------------------------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (((b % 100) = 0))
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (((b % 100) = 0))
+(8 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1000 | 0 | 0000
+ 1100 | 100 | 0100
+ 1200 | 200 | 0200
+ 1300 | 300 | 0300
+ 1400 | 400 | 0400
+ 1500 | 500 | 0500
+ 1600 | 600 | 0600
+ 1700 | 700 | 0700
+ 1800 | 800 | 0800
+ 1900 | 900 | 0900
+ 2000 | 0 | 0000
+ 2100 | 100 | 0100
+ 2200 | 200 | 0200
+ 2300 | 300 | 0300
+ 2400 | 400 | 0400
+ 2500 | 500 | 0500
+ 2600 | 600 | 0600
+ 2700 | 700 | 0700
+ 2800 | 800 | 0800
+ 2900 | 900 | 0900
+(20 rows)
+
+DELETE FROM result_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+ QUERY PLAN
+----------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+(10 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+(2 rows)
+
+DELETE FROM result_tbl;
+-- Check case where multiple partitions use the same connection
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
+ SERVER loopback2 OPTIONS (table_name 'base_tbl3');
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+ QUERY PLAN
+----------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Async Foreign Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl3
+(14 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
+DELETE FROM result_tbl;
+DROP FOREIGN TABLE async_p3;
+DROP TABLE base_tbl3;
+-- Check case where the partitioned table has local/remote partitions
+CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000);
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+ QUERY PLAN
+----------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+(13 rows)
+
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+ 3505 | 505 | 0505
+(3 rows)
+
+DELETE FROM result_tbl;
+-- partitionwise joins
+SET enable_partitionwise_join TO true;
+CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+ QUERY PLAN
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
+ Insert on public.join_tbl
+ -> Append
+ -> Async Foreign Scan
+ Output: t1_1.a, t1_1.b, t1_1.c, t2_1.a, t2_1.b, t2_1.c
+ Relations: (public.async_p1 t1_1) INNER JOIN (public.async_p1 t2_1)
+ Remote SQL: SELECT r5.a, r5.b, r5.c, r8.a, r8.b, r8.c FROM (public.base_tbl1 r5 INNER JOIN public.base_tbl1 r8 ON (((r5.a = r8.a)) AND ((r5.b = r8.b)) AND (((r5.b % 100) = 0))))
+ -> Async Foreign Scan
+ Output: t1_2.a, t1_2.b, t1_2.c, t2_2.a, t2_2.b, t2_2.c
+ Relations: (public.async_p2 t1_2) INNER JOIN (public.async_p2 t2_2)
+ Remote SQL: SELECT r6.a, r6.b, r6.c, r9.a, r9.b, r9.c FROM (public.base_tbl2 r6 INNER JOIN public.base_tbl2 r9 ON (((r6.a = r9.a)) AND ((r6.b = r9.b)) AND (((r6.b % 100) = 0))))
+ -> Hash Join
+ Output: t1_3.a, t1_3.b, t1_3.c, t2_3.a, t2_3.b, t2_3.c
+ Hash Cond: ((t2_3.a = t1_3.a) AND (t2_3.b = t1_3.b))
+ -> Seq Scan on public.async_p3 t2_3
+ Output: t2_3.a, t2_3.b, t2_3.c
+ -> Hash
+ Output: t1_3.a, t1_3.b, t1_3.c
+ -> Seq Scan on public.async_p3 t1_3
+ Output: t1_3.a, t1_3.b, t1_3.c
+ Filter: ((t1_3.b % 100) = 0)
+(20 rows)
+
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+SELECT * FROM join_tbl ORDER BY a1;
+ a1 | b1 | c1 | a2 | b2 | c2
+------+-----+------+------+-----+------
+ 1000 | 0 | 0000 | 1000 | 0 | 0000
+ 1100 | 100 | 0100 | 1100 | 100 | 0100
+ 1200 | 200 | 0200 | 1200 | 200 | 0200
+ 1300 | 300 | 0300 | 1300 | 300 | 0300
+ 1400 | 400 | 0400 | 1400 | 400 | 0400
+ 1500 | 500 | 0500 | 1500 | 500 | 0500
+ 1600 | 600 | 0600 | 1600 | 600 | 0600
+ 1700 | 700 | 0700 | 1700 | 700 | 0700
+ 1800 | 800 | 0800 | 1800 | 800 | 0800
+ 1900 | 900 | 0900 | 1900 | 900 | 0900
+ 2000 | 0 | 0000 | 2000 | 0 | 0000
+ 2100 | 100 | 0100 | 2100 | 100 | 0100
+ 2200 | 200 | 0200 | 2200 | 200 | 0200
+ 2300 | 300 | 0300 | 2300 | 300 | 0300
+ 2400 | 400 | 0400 | 2400 | 400 | 0400
+ 2500 | 500 | 0500 | 2500 | 500 | 0500
+ 2600 | 600 | 0600 | 2600 | 600 | 0600
+ 2700 | 700 | 0700 | 2700 | 700 | 0700
+ 2800 | 800 | 0800 | 2800 | 800 | 0800
+ 2900 | 900 | 0900 | 2900 | 900 | 0900
+ 3000 | 0 | 0000 | 3000 | 0 | 0000
+ 3100 | 100 | 0100 | 3100 | 100 | 0100
+ 3200 | 200 | 0200 | 3200 | 200 | 0200
+ 3300 | 300 | 0300 | 3300 | 300 | 0300
+ 3400 | 400 | 0400 | 3400 | 400 | 0400
+ 3500 | 500 | 0500 | 3500 | 500 | 0500
+ 3600 | 600 | 0600 | 3600 | 600 | 0600
+ 3700 | 700 | 0700 | 3700 | 700 | 0700
+ 3800 | 800 | 0800 | 3800 | 800 | 0800
+ 3900 | 900 | 0900 | 3900 | 900 | 0900
+(30 rows)
+
+DELETE FROM join_tbl;
+RESET enable_partitionwise_join;
+-- Test interaction of async execution with plan-time partition pruning
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 3000;
+ QUERY PLAN
+-----------------------------------------------------------------------------
+ Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 3000))
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < 3000))
+(7 rows)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 2000;
+ QUERY PLAN
+-----------------------------------------------------------------------
+ Foreign Scan on public.async_p1 async_pt
+ Output: async_pt.a, async_pt.b, async_pt.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < 2000))
+(3 rows)
+
+-- Test interaction of async execution with run-time partition pruning
+SET plan_cache_mode TO force_generic_plan;
+PREPARE async_pt_query (int, int) AS
+ INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2;
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (3000, 505);
+ QUERY PLAN
+------------------------------------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ Subplans Removed: 1
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === $2)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer))
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === $2)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE ((a < $1::integer))
+(11 rows)
+
+EXECUTE async_pt_query (3000, 505);
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+ 2505 | 505 | 0505
+(2 rows)
+
+DELETE FROM result_tbl;
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (2000, 505);
+ QUERY PLAN
+------------------------------------------------------------------------------------------
+ Insert on public.result_tbl
+ -> Append
+ Subplans Removed: 2
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === $2)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE ((a < $1::integer))
+(7 rows)
+
+EXECUTE async_pt_query (2000, 505);
+SELECT * FROM result_tbl ORDER BY a;
+ a | b | c
+------+-----+------
+ 1505 | 505 | 0505
+(1 row)
+
+DELETE FROM result_tbl;
+RESET plan_cache_mode;
+CREATE TABLE local_tbl(a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar');
+ANALYZE local_tbl;
+CREATE INDEX base_tbl1_idx ON base_tbl1 (a);
+CREATE INDEX base_tbl2_idx ON base_tbl2 (a);
+CREATE INDEX async_p3_idx ON async_p3 (a);
+ANALYZE base_tbl1;
+ANALYZE base_tbl2;
+ANALYZE async_p3;
+ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true');
+ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true');
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+ QUERY PLAN
+------------------------------------------------------------------------------------------
+ Nested Loop
+ Output: local_tbl.a, local_tbl.b, local_tbl.c, async_pt.a, async_pt.b, async_pt.c
+ -> Seq Scan on public.local_tbl
+ Output: local_tbl.a, local_tbl.b, local_tbl.c
+ Filter: (local_tbl.c = 'bar'::text)
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1 WHERE (($1::integer = a))
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2 WHERE (($1::integer = a))
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (local_tbl.a = async_pt_3.a)
+(15 rows)
+
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+ QUERY PLAN
+-------------------------------------------------------------------------------
+ Nested Loop (actual rows=1 loops=1)
+ -> Seq Scan on local_tbl (actual rows=1 loops=1)
+ Filter: (c = 'bar'::text)
+ Rows Removed by Filter: 1
+ -> Append (actual rows=1 loops=1)
+ -> Async Foreign Scan on async_p1 async_pt_1 (never executed)
+ -> Async Foreign Scan on async_p2 async_pt_2 (actual rows=1 loops=1)
+ -> Seq Scan on async_p3 async_pt_3 (never executed)
+ Filter: (local_tbl.a = a)
+(9 rows)
+
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+ a | b | c | a | b | c
+------+-----+-----+------+-----+------
+ 2505 | 505 | bar | 2505 | 505 | 0505
+(1 row)
+
+ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
+ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
+DROP TABLE local_tbl;
+DROP INDEX base_tbl1_idx;
+DROP INDEX base_tbl2_idx;
+DROP INDEX async_p3_idx;
+-- Test that pending requests are processed properly
+SET enable_mergejoin TO false;
+SET enable_hashjoin TO false;
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+ QUERY PLAN
+----------------------------------------------------------------
+ Nested Loop
+ Output: t1.a, t1.b, t1.c, t2.a, t2.b, t2.c
+ Join Filter: (t1.a = t2.a)
+ -> Append
+ -> Async Foreign Scan on public.async_p1 t1_1
+ Output: t1_1.a, t1_1.b, t1_1.c
+ Filter: (t1_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 t1_2
+ Output: t1_2.a, t1_2.b, t1_2.c
+ Filter: (t1_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Seq Scan on public.async_p3 t1_3
+ Output: t1_3.a, t1_3.b, t1_3.c
+ Filter: (t1_3.b === 505)
+ -> Materialize
+ Output: t2.a, t2.b, t2.c
+ -> Foreign Scan on public.async_p2 t2
+ Output: t2.a, t2.b, t2.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+(20 rows)
+
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+ a | b | c | a | b | c
+------+-----+------+------+-----+------
+ 2505 | 505 | 0505 | 2505 | 505 | 0505
+(1 row)
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+ QUERY PLAN
+----------------------------------------------------------------
+ Limit
+ Output: t1.a, t1.b, t1.c
+ -> Append
+ -> Async Foreign Scan on public.async_p1 t1_1
+ Output: t1_1.a, t1_1.b, t1_1.c
+ Filter: (t1_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 t1_2
+ Output: t1_2.a, t1_2.b, t1_2.c
+ Filter: (t1_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Seq Scan on public.async_p3 t1_3
+ Output: t1_3.a, t1_3.b, t1_3.c
+ Filter: (t1_3.b === 505)
+(14 rows)
+
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+ a | b | c
+------+-----+------
+ 3505 | 505 | 0505
+(1 row)
+
+-- Check with foreign modify
+CREATE TABLE local_tbl (a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo');
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
+ SERVER loopback OPTIONS (table_name 'base_tbl3');
+INSERT INTO remote_tbl VALUES (2505, 505, 'bar');
+CREATE TABLE base_tbl4 (a int, b int, c text);
+CREATE FOREIGN TABLE insert_tbl (a int, b int, c text)
+ SERVER loopback OPTIONS (table_name 'base_tbl4');
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+ QUERY PLAN
+-------------------------------------------------------------------------
+ Insert on public.insert_tbl
+ Remote SQL: INSERT INTO public.base_tbl4(a, b, c) VALUES ($1, $2, $3)
+ Batch Size: 1
+ -> Append
+ -> Seq Scan on public.local_tbl
+ Output: local_tbl.a, local_tbl.b, local_tbl.c
+ -> Async Foreign Scan on public.remote_tbl
+ Output: remote_tbl.a, remote_tbl.b, remote_tbl.c
+ Remote SQL: SELECT a, b, c FROM public.base_tbl3
+(9 rows)
+
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+SELECT * FROM insert_tbl ORDER BY a;
+ a | b | c
+------+-----+-----
+ 1505 | 505 | foo
+ 2505 | 505 | bar
+(2 rows)
+
+-- Check with direct modify
+EXPLAIN (VERBOSE, COSTS OFF)
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+ QUERY PLAN
+----------------------------------------------------------------------------------------
+ Insert on public.join_tbl
+ CTE t
+ -> Update on public.remote_tbl
+ Output: remote_tbl.a, remote_tbl.b, remote_tbl.c
+ -> Foreign Update on public.remote_tbl
+ Remote SQL: UPDATE public.base_tbl3 SET c = (c || c) RETURNING a, b, c
+ -> Nested Loop Left Join
+ Output: async_pt.a, async_pt.b, async_pt.c, t.a, t.b, t.c
+ Join Filter: ((async_pt.a = t.a) AND (async_pt.b = t.b))
+ -> Append
+ -> Async Foreign Scan on public.async_p1 async_pt_1
+ Output: async_pt_1.a, async_pt_1.b, async_pt_1.c
+ Filter: (async_pt_1.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl1
+ -> Async Foreign Scan on public.async_p2 async_pt_2
+ Output: async_pt_2.a, async_pt_2.b, async_pt_2.c
+ Filter: (async_pt_2.b === 505)
+ Remote SQL: SELECT a, b, c FROM public.base_tbl2
+ -> Seq Scan on public.async_p3 async_pt_3
+ Output: async_pt_3.a, async_pt_3.b, async_pt_3.c
+ Filter: (async_pt_3.b === 505)
+ -> CTE Scan on t
+ Output: t.a, t.b, t.c
+(23 rows)
+
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+SELECT * FROM join_tbl ORDER BY a1;
+ a1 | b1 | c1 | a2 | b2 | c2
+------+-----+------+------+-----+--------
+ 1505 | 505 | 0505 | | |
+ 2505 | 505 | 0505 | 2505 | 505 | barbar
+ 3505 | 505 | 0505 | | |
+(3 rows)
+
+DELETE FROM join_tbl;
+RESET enable_mergejoin;
+RESET enable_hashjoin;
+-- Clean up
+DROP TABLE async_pt;
+DROP TABLE base_tbl1;
+DROP TABLE base_tbl2;
+DROP TABLE result_tbl;
+DROP TABLE local_tbl;
+DROP FOREIGN TABLE remote_tbl;
+DROP FOREIGN TABLE insert_tbl;
+DROP TABLE base_tbl3;
+DROP TABLE base_tbl4;
+DROP TABLE join_tbl;
+ALTER SERVER loopback OPTIONS (DROP async_capable);
+ALTER SERVER loopback2 OPTIONS (DROP async_capable);
* Validate option value, when we can do so without any context.
*/
if (strcmp(def->defname, "use_remote_estimate") == 0 ||
- strcmp(def->defname, "updatable") == 0)
+ strcmp(def->defname, "updatable") == 0 ||
+ strcmp(def->defname, "async_capable") == 0)
{
/* these accept only boolean values */
(void) defGetBoolean(def);
/* batch_size is available on both server and table */
{"batch_size", ForeignServerRelationId, false},
{"batch_size", ForeignTableRelationId, false},
+ /* async_capable is available on both server and table */
+ {"async_capable", ForeignServerRelationId, false},
+ {"async_capable", ForeignTableRelationId, false},
{"password_required", UserMappingRelationId, false},
/*
#include "commands/defrem.h"
#include "commands/explain.h"
#include "commands/vacuum.h"
+#include "executor/execAsync.h"
#include "foreign/fdwapi.h"
#include "funcapi.h"
#include "miscadmin.h"
#include "optimizer/tlist.h"
#include "parser/parsetree.h"
#include "postgres_fdw.h"
+#include "storage/latch.h"
#include "utils/builtins.h"
#include "utils/float.h"
#include "utils/guc.h"
/* for remote query execution */
PGconn *conn; /* connection for the scan */
+ PgFdwConnState *conn_state; /* extra per-connection state */
unsigned int cursor_number; /* quasi-unique ID for my cursor */
bool cursor_exists; /* have we created the cursor? */
int numParams; /* number of parameters passed to query */
int fetch_ct_2; /* Min(# of fetches done, 2) */
bool eof_reached; /* true if last fetch reached EOF */
+ /* for asynchronous execution */
+ bool async_capable; /* engage asynchronous-capable logic? */
+
/* working memory contexts */
MemoryContext batch_cxt; /* context holding current batch of tuples */
MemoryContext temp_cxt; /* context for per-tuple temporary data */
/* for remote query execution */
PGconn *conn; /* connection for the scan */
+ PgFdwConnState *conn_state; /* extra per-connection state */
char *p_name; /* name of prepared statement, if created */
/* extracted fdw_private data */
/* for remote query execution */
PGconn *conn; /* connection for the update */
+ PgFdwConnState *conn_state; /* extra per-connection state */
int numParams; /* number of parameters passed to query */
FmgrInfo *param_flinfo; /* output conversion functions for them */
List *param_exprs; /* executable expressions for param values */
RelOptInfo *input_rel,
RelOptInfo *output_rel,
void *extra);
+static bool postgresIsForeignPathAsyncCapable(ForeignPath *path);
+static void postgresForeignAsyncRequest(AsyncRequest *areq);
+static void postgresForeignAsyncConfigureWait(AsyncRequest *areq);
+static void postgresForeignAsyncNotify(AsyncRequest *areq);
/*
* Helper functions
void *arg);
static void create_cursor(ForeignScanState *node);
static void fetch_more_data(ForeignScanState *node);
-static void close_cursor(PGconn *conn, unsigned int cursor_number);
+static void close_cursor(PGconn *conn, unsigned int cursor_number,
+ PgFdwConnState *conn_state);
static PgFdwModifyState *create_foreign_modify(EState *estate,
RangeTblEntry *rte,
ResultRelInfo *resultRelInfo,
double *totaldeadrows);
static void analyze_row_processor(PGresult *res, int row,
PgFdwAnalyzeState *astate);
+static void produce_tuple_asynchronously(AsyncRequest *areq, bool fetch);
+static void fetch_more_data_begin(AsyncRequest *areq);
static HeapTuple make_tuple_from_result_row(PGresult *res,
int row,
Relation rel,
/* Support functions for upper relation push-down */
routine->GetForeignUpperPaths = postgresGetForeignUpperPaths;
+ /* Support functions for asynchronous execution */
+ routine->IsForeignPathAsyncCapable = postgresIsForeignPathAsyncCapable;
+ routine->ForeignAsyncRequest = postgresForeignAsyncRequest;
+ routine->ForeignAsyncConfigureWait = postgresForeignAsyncConfigureWait;
+ routine->ForeignAsyncNotify = postgresForeignAsyncNotify;
+
PG_RETURN_POINTER(routine);
}
/*
* Extract user-settable option values. Note that per-table settings of
- * use_remote_estimate and fetch_size override per-server settings of
- * them, respectively.
+ * use_remote_estimate, fetch_size and async_capable override per-server
+ * settings of them, respectively.
*/
fpinfo->use_remote_estimate = false;
fpinfo->fdw_startup_cost = DEFAULT_FDW_STARTUP_COST;
fpinfo->fdw_tuple_cost = DEFAULT_FDW_TUPLE_COST;
fpinfo->shippable_extensions = NIL;
fpinfo->fetch_size = 100;
+ fpinfo->async_capable = false;
apply_server_options(fpinfo);
apply_table_options(fpinfo);
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- fsstate->conn = GetConnection(user, false);
+ fsstate->conn = GetConnection(user, false, &fsstate->conn_state);
/* Assign a unique ID for my cursor */
fsstate->cursor_number = GetCursorNumber(fsstate->conn);
&fsstate->param_flinfo,
&fsstate->param_exprs,
&fsstate->param_values);
+
+ /* Set the async-capable flag */
+ fsstate->async_capable = node->ss.ps.plan->async_capable;
}
/*
TupleTableSlot *slot = node->ss.ss_ScanTupleSlot;
/*
- * If this is the first call after Begin or ReScan, we need to create the
- * cursor on the remote side.
+ * In sync mode, if this is the first call after Begin or ReScan, we need
+ * to create the cursor on the remote side. In async mode, we would have
+ * already created the cursor before we get here, even if this is the
+ * first call after Begin or ReScan.
*/
if (!fsstate->cursor_exists)
create_cursor(node);
*/
if (fsstate->next_tuple >= fsstate->num_tuples)
{
+ /* In async mode, just clear tuple slot. */
+ if (fsstate->async_capable)
+ return ExecClearTuple(slot);
/* No point in another fetch if we already detected EOF, though. */
if (!fsstate->eof_reached)
fetch_more_data(node);
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fsstate->conn, sql);
+ res = pgfdw_exec_query(fsstate->conn, sql, fsstate->conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fsstate->conn, true, sql);
PQclear(res);
/* Close the cursor if open, to prevent accumulation of cursors */
if (fsstate->cursor_exists)
- close_cursor(fsstate->conn, fsstate->cursor_number);
+ close_cursor(fsstate->conn, fsstate->cursor_number,
+ fsstate->conn_state);
/* Release remote connection */
ReleaseConnection(fsstate->conn);
* Get connection to the foreign server. Connection manager will
* establish new connection if necessary.
*/
- dmstate->conn = GetConnection(user, false);
+ dmstate->conn = GetConnection(user, false, &dmstate->conn_state);
/* Update the foreign-join-related fields. */
if (fsplan->scan.scanrelid == 0)
false, &retrieved_attrs, NULL);
/* Get the remote estimate */
- conn = GetConnection(fpinfo->user, false);
+ conn = GetConnection(fpinfo->user, false, NULL);
get_remote_estimate(sql.data, conn, &rows, &width,
&startup_cost, &total_cost);
ReleaseConnection(conn);
/*
* Execute EXPLAIN remotely.
*/
- res = pgfdw_exec_query(conn, sql);
+ res = pgfdw_exec_query(conn, sql, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql);
StringInfoData buf;
PGresult *res;
+ /* First, process a pending asynchronous request, if any. */
+ if (fsstate->conn_state->pendingAreq)
+ process_pending_request(fsstate->conn_state->pendingAreq);
+
/*
* Construct array of query parameter values in text format. We do the
* conversions in the short-lived per-tuple context, so as not to cause a
PG_TRY();
{
PGconn *conn = fsstate->conn;
- char sql[64];
int numrows;
int i;
- snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
- fsstate->fetch_size, fsstate->cursor_number);
+ if (fsstate->async_capable)
+ {
+ Assert(fsstate->conn_state->pendingAreq);
- res = pgfdw_exec_query(conn, sql);
- /* On error, report the original query, not the FETCH. */
- if (PQresultStatus(res) != PGRES_TUPLES_OK)
- pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ /*
+ * The query was already sent by an earlier call to
+ * fetch_more_data_begin. So now we just fetch the result.
+ */
+ res = pgfdw_get_result(conn, fsstate->query);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+
+ /* Reset per-connection state */
+ fsstate->conn_state->pendingAreq = NULL;
+ }
+ else
+ {
+ char sql[64];
+
+ /* This is a regular synchronous fetch. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ res = pgfdw_exec_query(conn, sql, fsstate->conn_state);
+ /* On error, report the original query, not the FETCH. */
+ if (PQresultStatus(res) != PGRES_TUPLES_OK)
+ pgfdw_report_error(ERROR, res, conn, false, fsstate->query);
+ }
/* Convert the data into HeapTuples */
numrows = PQntuples(res);
* Utility routine to close a cursor.
*/
static void
-close_cursor(PGconn *conn, unsigned int cursor_number)
+close_cursor(PGconn *conn, unsigned int cursor_number,
+ PgFdwConnState *conn_state)
{
char sql[64];
PGresult *res;
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(conn, sql);
+ res = pgfdw_exec_query(conn, sql, conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, true, sql);
PQclear(res);
user = GetUserMapping(userid, table->serverid);
/* Open connection; report that we'll create a prepared statement. */
- fmstate->conn = GetConnection(user, true);
+ fmstate->conn = GetConnection(user, true, &fmstate->conn_state);
fmstate->p_name = NULL; /* prepared statement not made yet */
/* Set up remote query information. */
operation == CMD_UPDATE ||
operation == CMD_DELETE);
+ /* First, process a pending asynchronous request, if any. */
+ if (fmstate->conn_state->pendingAreq)
+ process_pending_request(fmstate->conn_state->pendingAreq);
+
/*
* If the existing query was deparsed and prepared for a different number
* of rows, rebuild it for the proper number.
char *p_name;
PGresult *res;
+ /*
+ * The caller would already have processed a pending asynchronous request
+ * if any, so no need to do it here.
+ */
+
/* Construct name we'll use for the prepared statement. */
snprintf(prep_name, sizeof(prep_name), "pgsql_fdw_prep_%u",
GetPrepStmtNumber(fmstate->conn));
* We don't use a PG_TRY block here, so be careful not to throw error
* without releasing the PGresult.
*/
- res = pgfdw_exec_query(fmstate->conn, sql);
+ res = pgfdw_exec_query(fmstate->conn, sql, fmstate->conn_state);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, fmstate->conn, true, sql);
PQclear(res);
int numParams = dmstate->numParams;
const char **values = dmstate->param_values;
+ /* First, process a pending asynchronous request, if any. */
+ if (dmstate->conn_state->pendingAreq)
+ process_pending_request(dmstate->conn_state->pendingAreq);
+
/*
* Construct array of query parameter values in text format.
*/
*/
table = GetForeignTable(RelationGetRelid(relation));
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct command to get page count for relation.
/* In what follows, do not risk leaking any PGresults. */
PG_TRY();
{
- res = pgfdw_exec_query(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
table = GetForeignTable(RelationGetRelid(relation));
server = GetForeignServer(table->serverid);
user = GetUserMapping(relation->rd_rel->relowner, table->serverid);
- conn = GetConnection(user, false);
+ conn = GetConnection(user, false, NULL);
/*
* Construct cursor that retrieves whole rows from remote.
int fetch_size;
ListCell *lc;
- res = pgfdw_exec_query(conn, sql.data);
+ res = pgfdw_exec_query(conn, sql.data, NULL);
if (PQresultStatus(res) != PGRES_COMMAND_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
PQclear(res);
*/
/* Fetch some rows */
- res = pgfdw_exec_query(conn, fetch_sql);
+ res = pgfdw_exec_query(conn, fetch_sql, NULL);
/* On error, report the original query, not the FETCH. */
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, sql.data);
}
/* Close the cursor, just to be tidy. */
- close_cursor(conn, cursor_number);
+ close_cursor(conn, cursor_number, NULL);
}
PG_CATCH();
{
*/
server = GetForeignServer(serverOid);
mapping = GetUserMapping(GetUserId(), server->serverid);
- conn = GetConnection(mapping, false);
+ conn = GetConnection(mapping, false, NULL);
/* Don't attempt to import collation if remote server hasn't got it */
if (PQserverVersion(conn) < 90100)
appendStringInfoString(&buf, "SELECT 1 FROM pg_catalog.pg_namespace WHERE nspname = ");
deparseStringLiteral(&buf, stmt->remote_schema);
- res = pgfdw_exec_query(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
appendStringInfoString(&buf, " ORDER BY c.relname, a.attnum");
/* Fetch the data */
- res = pgfdw_exec_query(conn, buf.data);
+ res = pgfdw_exec_query(conn, buf.data, NULL);
if (PQresultStatus(res) != PGRES_TUPLES_OK)
pgfdw_report_error(ERROR, res, conn, false, buf.data);
ExtractExtensionList(defGetString(def), false);
else if (strcmp(def->defname, "fetch_size") == 0)
fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+ else if (strcmp(def->defname, "async_capable") == 0)
+ fpinfo->async_capable = defGetBoolean(def);
}
}
fpinfo->use_remote_estimate = defGetBoolean(def);
else if (strcmp(def->defname, "fetch_size") == 0)
fpinfo->fetch_size = strtol(defGetString(def), NULL, 10);
+ else if (strcmp(def->defname, "async_capable") == 0)
+ fpinfo->async_capable = defGetBoolean(def);
}
}
fpinfo->shippable_extensions = fpinfo_o->shippable_extensions;
fpinfo->use_remote_estimate = fpinfo_o->use_remote_estimate;
fpinfo->fetch_size = fpinfo_o->fetch_size;
+ fpinfo->async_capable = fpinfo_o->async_capable;
/* Merge the table level options from either side of the join. */
if (fpinfo_i)
* relation sizes.
*/
fpinfo->fetch_size = Max(fpinfo_o->fetch_size, fpinfo_i->fetch_size);
+
+ /*
+ * We'll prefer to consider this join async-capable if any table from
+ * either side of the join is considered async-capable.
+ */
+ fpinfo->async_capable = fpinfo_o->async_capable ||
+ fpinfo_i->async_capable;
}
}
add_path(final_rel, (Path *) final_path);
}
+/*
+ * postgresIsForeignPathAsyncCapable
+ * Check whether a given ForeignPath node is async-capable.
+ */
+static bool
+postgresIsForeignPathAsyncCapable(ForeignPath *path)
+{
+ RelOptInfo *rel = ((Path *) path)->parent;
+ PgFdwRelationInfo *fpinfo = (PgFdwRelationInfo *) rel->fdw_private;
+
+ return fpinfo->async_capable;
+}
+
+/*
+ * postgresForeignAsyncRequest
+ * Asynchronously request next tuple from a foreign PostgreSQL table.
+ */
+static void
+postgresForeignAsyncRequest(AsyncRequest *areq)
+{
+ produce_tuple_asynchronously(areq, true);
+}
+
+/*
+ * postgresForeignAsyncConfigureWait
+ * Configure a file descriptor event for which we wish to wait.
+ */
+static void
+postgresForeignAsyncConfigureWait(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
+ AppendState *requestor = (AppendState *) areq->requestor;
+ WaitEventSet *set = requestor->as_eventset;
+
+ /* This should not be called unless callback_pending */
+ Assert(areq->callback_pending);
+
+ /* The core code would have registered postmaster death event */
+ Assert(GetNumRegisteredWaitEvents(set) >= 1);
+
+ /* Begin an asynchronous data fetch if not already done */
+ if (!pendingAreq)
+ fetch_more_data_begin(areq);
+ else if (pendingAreq->requestor != areq->requestor)
+ {
+ /*
+ * This is the case when the in-process request was made by another
+ * Append. Note that it might be useless to process the request,
+ * because the query might not need tuples from that Append anymore.
+ * Skip the given request if there are any configured events other
+ * than the postmaster death event; otherwise process the request,
+ * then begin a fetch to configure the event below, because otherwise
+ * we might end up with no configured events other than the postmaster
+ * death event.
+ */
+ if (GetNumRegisteredWaitEvents(set) > 1)
+ return;
+ process_pending_request(pendingAreq);
+ fetch_more_data_begin(areq);
+ }
+ else if (pendingAreq->requestee != areq->requestee)
+ {
+ /*
+ * This is the case when the in-process request was made by the same
+ * parent but for a different child. Since we configure only the
+ * event for the request made for that child, skip the given request.
+ */
+ return;
+ }
+ else
+ Assert(pendingAreq == areq);
+
+ AddWaitEventToSet(set, WL_SOCKET_READABLE, PQsocket(fsstate->conn),
+ NULL, areq);
+}
+
+/*
+ * postgresForeignAsyncNotify
+ * Fetch some more tuples from a file descriptor that becomes ready,
+ * requesting next tuple.
+ */
+static void
+postgresForeignAsyncNotify(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+
+ /* The request should be currently in-process */
+ Assert(fsstate->conn_state->pendingAreq == areq);
+
+ /* The core code would have initialized the callback_pending flag */
+ Assert(!areq->callback_pending);
+
+ /* On error, report the original query, not the FETCH. */
+ if (!PQconsumeInput(fsstate->conn))
+ pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+
+ fetch_more_data(node);
+
+ produce_tuple_asynchronously(areq, true);
+}
+
+/*
+ * Asynchronously produce next tuple from a foreign PostgreSQL table.
+ */
+static void
+produce_tuple_asynchronously(AsyncRequest *areq, bool fetch)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ AsyncRequest *pendingAreq = fsstate->conn_state->pendingAreq;
+ TupleTableSlot *result;
+
+ /* This should not be called if the request is currently in-process */
+ Assert(areq != pendingAreq);
+
+ /* Fetch some more tuples, if we've run out */
+ if (fsstate->next_tuple >= fsstate->num_tuples)
+ {
+ /* No point in another fetch if we already detected EOF, though */
+ if (!fsstate->eof_reached)
+ {
+ /* Mark the request as pending for a callback */
+ ExecAsyncRequestPending(areq);
+ /* Begin another fetch if requested and if no pending request */
+ if (fetch && !pendingAreq)
+ fetch_more_data_begin(areq);
+ }
+ else
+ {
+ /* There's nothing more to do; just return a NULL pointer */
+ result = NULL;
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+ }
+ return;
+ }
+
+ /* Get a tuple from the ForeignScan node */
+ result = ExecProcNode((PlanState *) node);
+ if (!TupIsNull(result))
+ {
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+ return;
+ }
+ Assert(fsstate->next_tuple >= fsstate->num_tuples);
+
+ /* Fetch some more tuples, if we've not detected EOF yet */
+ if (!fsstate->eof_reached)
+ {
+ /* Mark the request as pending for a callback */
+ ExecAsyncRequestPending(areq);
+ /* Begin another fetch if requested and if no pending request */
+ if (fetch && !pendingAreq)
+ fetch_more_data_begin(areq);
+ }
+ else
+ {
+ /* There's nothing more to do; just return a NULL pointer */
+ result = NULL;
+ /* Mark the request as complete */
+ ExecAsyncRequestDone(areq, result);
+ }
+}
+
+/*
+ * Begin an asynchronous data fetch.
+ *
+ * Note: fetch_more_data must be called to fetch the result.
+ */
+static void
+fetch_more_data_begin(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ char sql[64];
+
+ Assert(!fsstate->conn_state->pendingAreq);
+
+ /* Create the cursor synchronously. */
+ if (!fsstate->cursor_exists)
+ create_cursor(node);
+
+ /* We will send this query, but not wait for the response. */
+ snprintf(sql, sizeof(sql), "FETCH %d FROM c%u",
+ fsstate->fetch_size, fsstate->cursor_number);
+
+ if (PQsendQuery(fsstate->conn, sql) < 0)
+ pgfdw_report_error(ERROR, NULL, fsstate->conn, false, fsstate->query);
+
+ /* Remember that the request is in process */
+ fsstate->conn_state->pendingAreq = areq;
+}
+
+/*
+ * Process a pending asynchronous request.
+ */
+void
+process_pending_request(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ PgFdwScanState *fsstate = (PgFdwScanState *) node->fdw_state;
+ EState *estate = node->ss.ps.state;
+ MemoryContext oldcontext;
+
+ /* The request should be currently in-process */
+ Assert(fsstate->conn_state->pendingAreq == areq);
+
+ oldcontext = MemoryContextSwitchTo(estate->es_query_cxt);
+
+ /* The request would have been pending for a callback */
+ Assert(areq->callback_pending);
+
+ /* Unlike AsyncNotify, we unset callback_pending ourselves */
+ areq->callback_pending = false;
+
+ fetch_more_data(node);
+
+ /* We need to send a new query afterwards; don't fetch */
+ produce_tuple_asynchronously(areq, false);
+
+ /* Unlike AsyncNotify, we call ExecAsyncResponse ourselves */
+ ExecAsyncResponse(areq);
+
+ MemoryContextSwitchTo(oldcontext);
+}
+
/*
* Create a tuple from the specified row of the PGresult.
*
#include "foreign/foreign.h"
#include "lib/stringinfo.h"
#include "libpq-fe.h"
+#include "nodes/execnodes.h"
#include "nodes/pathnodes.h"
#include "utils/relcache.h"
Cost fdw_startup_cost;
Cost fdw_tuple_cost;
List *shippable_extensions; /* OIDs of shippable extensions */
+ bool async_capable;
/* Cached catalog information. */
ForeignTable *table;
int relation_index;
} PgFdwRelationInfo;
+/*
+ * Extra control information relating to a connection.
+ */
+typedef struct PgFdwConnState
+{
+ AsyncRequest *pendingAreq; /* pending async request */
+} PgFdwConnState;
+
/* in postgres_fdw.c */
extern int set_transmission_modes(void);
extern void reset_transmission_modes(int nestlevel);
+extern void process_pending_request(AsyncRequest *areq);
/* in connection.c */
-extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt);
+extern PGconn *GetConnection(UserMapping *user, bool will_prep_stmt,
+ PgFdwConnState **state);
extern void ReleaseConnection(PGconn *conn);
extern unsigned int GetCursorNumber(PGconn *conn);
extern unsigned int GetPrepStmtNumber(PGconn *conn);
extern PGresult *pgfdw_get_result(PGconn *conn, const char *query);
-extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query);
+extern PGresult *pgfdw_exec_query(PGconn *conn, const char *query,
+ PgFdwConnState *state);
extern void pgfdw_report_error(int elevel, PGresult *res, PGconn *conn,
bool clear, const char *sql);
-- Clean up
DROP TABLE batch_table, batch_cp_upd_test CASCADE;
+
+-- ===================================================================
+-- test asynchronous execution
+-- ===================================================================
+
+ALTER SERVER loopback OPTIONS (DROP extensions);
+ALTER SERVER loopback OPTIONS (ADD async_capable 'true');
+ALTER SERVER loopback2 OPTIONS (ADD async_capable 'true');
+
+CREATE TABLE async_pt (a int, b int, c text) PARTITION BY RANGE (a);
+CREATE TABLE base_tbl1 (a int, b int, c text);
+CREATE TABLE base_tbl2 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p1 PARTITION OF async_pt FOR VALUES FROM (1000) TO (2000)
+ SERVER loopback OPTIONS (table_name 'base_tbl1');
+CREATE FOREIGN TABLE async_p2 PARTITION OF async_pt FOR VALUES FROM (2000) TO (3000)
+ SERVER loopback2 OPTIONS (table_name 'base_tbl2');
+INSERT INTO async_p1 SELECT 1000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+INSERT INTO async_p2 SELECT 2000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+
+-- simple queries
+CREATE TABLE result_tbl (a int, b int, c text);
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b % 100 = 0;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+-- Check case where multiple partitions use the same connection
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000)
+ SERVER loopback2 OPTIONS (table_name 'base_tbl3');
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+DROP FOREIGN TABLE async_p3;
+DROP TABLE base_tbl3;
+
+-- Check case where the partitioned table has local/remote partitions
+CREATE TABLE async_p3 PARTITION OF async_pt FOR VALUES FROM (3000) TO (4000);
+INSERT INTO async_p3 SELECT 3000 + i, i, to_char(i, 'FM0000') FROM generate_series(0, 999, 5) i;
+ANALYZE async_pt;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+INSERT INTO result_tbl SELECT * FROM async_pt WHERE b === 505;
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+-- partitionwise joins
+SET enable_partitionwise_join TO true;
+
+CREATE TABLE join_tbl (a1 int, b1 int, c1 text, a2 int, b2 int, c2 text);
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+INSERT INTO join_tbl SELECT * FROM async_pt t1, async_pt t2 WHERE t1.a = t2.a AND t1.b = t2.b AND t1.b % 100 = 0;
+
+SELECT * FROM join_tbl ORDER BY a1;
+DELETE FROM join_tbl;
+
+RESET enable_partitionwise_join;
+
+-- Test interaction of async execution with plan-time partition pruning
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 3000;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt WHERE a < 2000;
+
+-- Test interaction of async execution with run-time partition pruning
+SET plan_cache_mode TO force_generic_plan;
+
+PREPARE async_pt_query (int, int) AS
+ INSERT INTO result_tbl SELECT * FROM async_pt WHERE a < $1 AND b === $2;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (3000, 505);
+EXECUTE async_pt_query (3000, 505);
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+EXECUTE async_pt_query (2000, 505);
+EXECUTE async_pt_query (2000, 505);
+
+SELECT * FROM result_tbl ORDER BY a;
+DELETE FROM result_tbl;
+
+RESET plan_cache_mode;
+
+CREATE TABLE local_tbl(a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo'), (2505, 505, 'bar');
+ANALYZE local_tbl;
+
+CREATE INDEX base_tbl1_idx ON base_tbl1 (a);
+CREATE INDEX base_tbl2_idx ON base_tbl2 (a);
+CREATE INDEX async_p3_idx ON async_p3 (a);
+ANALYZE base_tbl1;
+ANALYZE base_tbl2;
+ANALYZE async_p3;
+
+ALTER FOREIGN TABLE async_p1 OPTIONS (use_remote_estimate 'true');
+ALTER FOREIGN TABLE async_p2 OPTIONS (use_remote_estimate 'true');
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+EXPLAIN (ANALYZE, COSTS OFF, SUMMARY OFF, TIMING OFF)
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+SELECT * FROM local_tbl, async_pt WHERE local_tbl.a = async_pt.a AND local_tbl.c = 'bar';
+
+ALTER FOREIGN TABLE async_p1 OPTIONS (DROP use_remote_estimate);
+ALTER FOREIGN TABLE async_p2 OPTIONS (DROP use_remote_estimate);
+
+DROP TABLE local_tbl;
+DROP INDEX base_tbl1_idx;
+DROP INDEX base_tbl2_idx;
+DROP INDEX async_p3_idx;
+
+-- Test that pending requests are processed properly
+SET enable_mergejoin TO false;
+SET enable_hashjoin TO false;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+SELECT * FROM async_pt t1, async_p2 t2 WHERE t1.a = t2.a AND t1.b === 505;
+
+EXPLAIN (VERBOSE, COSTS OFF)
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+SELECT * FROM async_pt t1 WHERE t1.b === 505 LIMIT 1;
+
+-- Check with foreign modify
+CREATE TABLE local_tbl (a int, b int, c text);
+INSERT INTO local_tbl VALUES (1505, 505, 'foo');
+
+CREATE TABLE base_tbl3 (a int, b int, c text);
+CREATE FOREIGN TABLE remote_tbl (a int, b int, c text)
+ SERVER loopback OPTIONS (table_name 'base_tbl3');
+INSERT INTO remote_tbl VALUES (2505, 505, 'bar');
+
+CREATE TABLE base_tbl4 (a int, b int, c text);
+CREATE FOREIGN TABLE insert_tbl (a int, b int, c text)
+ SERVER loopback OPTIONS (table_name 'base_tbl4');
+
+EXPLAIN (VERBOSE, COSTS OFF)
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+INSERT INTO insert_tbl (SELECT * FROM local_tbl UNION ALL SELECT * FROM remote_tbl);
+
+SELECT * FROM insert_tbl ORDER BY a;
+
+-- Check with direct modify
+EXPLAIN (VERBOSE, COSTS OFF)
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+WITH t AS (UPDATE remote_tbl SET c = c || c RETURNING *)
+INSERT INTO join_tbl SELECT * FROM async_pt LEFT JOIN t ON (async_pt.a = t.a AND async_pt.b = t.b) WHERE async_pt.b === 505;
+
+SELECT * FROM join_tbl ORDER BY a1;
+DELETE FROM join_tbl;
+
+RESET enable_mergejoin;
+RESET enable_hashjoin;
+
+-- Clean up
+DROP TABLE async_pt;
+DROP TABLE base_tbl1;
+DROP TABLE base_tbl2;
+DROP TABLE result_tbl;
+DROP TABLE local_tbl;
+DROP FOREIGN TABLE remote_tbl;
+DROP FOREIGN TABLE insert_tbl;
+DROP TABLE base_tbl3;
+DROP TABLE base_tbl4;
+DROP TABLE join_tbl;
+
+ALTER SERVER loopback OPTIONS (DROP async_capable);
+ALTER SERVER loopback2 OPTIONS (DROP async_capable);
</para>
<variablelist>
+ <varlistentry id="guc-enable-async-append" xreflabel="enable_async_append">
+ <term><varname>enable_async_append</varname> (<type>boolean</type>)
+ <indexterm>
+ <primary><varname>enable_async_append</varname> configuration parameter</primary>
+ </indexterm>
+ </term>
+ <listitem>
+ <para>
+ Enables or disables the query planner's use of async-aware
+ append plan types. The default is <literal>on</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
<varlistentry id="guc-enable-bitmapscan" xreflabel="enable_bitmapscan">
<term><varname>enable_bitmapscan</varname> (<type>boolean</type>)
<indexterm>
</para>
</sect2>
+ <sect2 id="fdw-callbacks-async">
+ <title>FDW Routines for Asynchronous Execution</title>
+ <para>
+ A <structname>ForeignScan</structname> node can, optionally, support
+ asynchronous execution as described in
+ <filename>src/backend/executor/README</filename>. The following
+ functions are all optional, but are all required if asynchronous
+ execution is to be supported.
+ </para>
+
+ <para>
+<programlisting>
+bool
+IsForeignPathAsyncCapable(ForeignPath *path);
+</programlisting>
+ Test whether a given <structname>ForeignPath</structname> path can scan
+ the underlying foreign relation asynchronously.
+ This function will only be called at the end of query planning when the
+ given path is a direct child of an <structname>AppendPath</structname>
+ path and when the planner believes that asynchronous execution improves
+ performance, and should return true if the given path is able to scan the
+ foreign relation asynchronously.
+ </para>
+
+ <para>
+ If this function is not defined, it is assumed that the given path scans
+ the foreign relation using <function>IterateForeignScan</function>.
+ (This implies that the callback functions described below will never be
+ called, so they need not be provided either.)
+ </para>
+
+ <para>
+<programlisting>
+void
+ForeignAsyncRequest(AsyncRequest *areq);
+</programlisting>
+ Produce one tuple asynchronously from the
+ <structname>ForeignScan</structname> node. <literal>areq</literal> is
+ the <structname>AsyncRequest</structname> struct describing the
+ <structname>ForeignScan</structname> node and the parent
+ <structname>Append</structname> node that requested the tuple from it.
+ This function should store the tuple into the slot specified by
+ <literal>areq->result</literal>, and set
+ <literal>areq->request_complete</literal> to <literal>true</literal>;
+ or if it needs to wait on an event external to the core server such as
+ network I/O, and cannot produce any tuple immediately, set the flag to
+ <literal>false</literal>, and set
+ <literal>areq->callback_pending</literal> to <literal>true</literal>
+ for the <structname>ForeignScan</structname> node to get a callback from
+ the callback functions described below. If no more tuples are available,
+ set the slot to NULL, and the
+ <literal>areq->request_complete</literal> flag to
+ <literal>true</literal>. It's recommended to use
+ <function>ExecAsyncRequestDone</function> or
+ <function>ExecAsyncRequestPending</function> to set the output parameters
+ in the <literal>areq</literal>.
+ </para>
+
+ <para>
+<programlisting>
+void
+ForeignAsyncConfigureWait(AsyncRequest *areq);
+</programlisting>
+ Configure a file descriptor event for which the
+ <structname>ForeignScan</structname> node wishes to wait.
+ This function will only be called when the
+ <structname>ForeignScan</structname> node has the
+ <literal>areq->callback_pending</literal> flag set, and should add
+ the event to the <structfield>as_eventset</structfield> of the parent
+ <structname>Append</structname> node described by the
+ <literal>areq</literal>. See the comments for
+ <function>ExecAsyncConfigureWait</function> in
+ <filename>src/backend/executor/execAsync.c</filename> for additional
+ information. When the file descriptor event occurs,
+ <function>ForeignAsyncNotify</function> will be called.
+ </para>
+
+ <para>
+<programlisting>
+void
+ForeignAsyncNotify(AsyncRequest *areq);
+</programlisting>
+ Process a relevant event that has occurred, then produce one tuple
+ asynchronously from the <structname>ForeignScan</structname> node.
+ This function should set the output parameters in the
+ <literal>areq</literal> in the same way as
+ <function>ForeignAsyncRequest</function>.
+ </para>
+ </sect2>
+
<sect2 id="fdw-callbacks-reparameterize-paths">
<title>FDW Routines for Reparameterization of Paths</title>
</thead>
<tbody>
+ <row>
+ <entry><literal>AppendReady</literal></entry>
+ <entry>Waiting for subplan nodes of an <literal>Append</literal> plan
+ node to be ready.</entry>
+ </row>
<row>
<entry><literal>BackupWaitWalArchive</literal></entry>
<entry>Waiting for WAL files required for a backup to be successfully
</sect3>
+ <sect3>
+ <title>Asynchronous Execution Options</title>
+
+ <para>
+ <filename>postgres_fdw</filename> supports asynchronous execution, which
+ runs multiple parts of an <structname>Append</structname> node
+ concurrently rather than serially to improve performance.
+ This execution can be controled using the following option:
+ </para>
+
+ <variablelist>
+
+ <varlistentry>
+ <term><literal>async_capable</literal></term>
+ <listitem>
+ <para>
+ This option controls whether <filename>postgres_fdw</filename> allows
+ foreign tables to be scanned concurrently for asynchronous execution.
+ It can be specified for a foreign table or a foreign server.
+ A table-level option overrides a server-level option.
+ The default is <literal>false</literal>.
+ </para>
+ </listitem>
+ </varlistentry>
+
+ </variablelist>
+ </sect3>
+
<sect3>
<title>Updatability Options</title>
}
if (plan->parallel_aware)
appendStringInfoString(es->str, "Parallel ");
+ if (plan->async_capable)
+ appendStringInfoString(es->str, "Async ");
appendStringInfoString(es->str, pname);
es->indent++;
}
if (custom_name)
ExplainPropertyText("Custom Plan Provider", custom_name, es);
ExplainPropertyBool("Parallel Aware", plan->parallel_aware, es);
+ ExplainPropertyBool("Async Capable", plan->async_capable, es);
}
switch (nodeTag(plan))
OBJS = \
execAmi.o \
+ execAsync.o \
execCurrent.o \
execExpr.o \
execExprInterp.o \
SRFs are disallowed in an UPDATE's targetlist. There, they would have the
effect of the same row being updated multiple times, which is not very
useful --- and updates after the first would have no effect anyway.
+
+
+Asynchronous Execution
+----------------------
+
+In cases where a node is waiting on an event external to the database system,
+such as a ForeignScan awaiting network I/O, it's desirable for the node to
+indicate that it cannot return any tuple immediately but may be able to do so
+at a later time. A process which discovers this type of situation can always
+handle it simply by blocking, but this may waste time that could be spent
+executing some other part of the plan tree where progress could be made
+immediately. This is particularly likely to occur when the plan tree contains
+an Append node. Asynchronous execution runs multiple parts of an Append node
+concurrently rather than serially to improve performance.
+
+For asynchronous execution, an Append node must first request a tuple from an
+async-capable child node using ExecAsyncRequest. Next, it must execute the
+asynchronous event loop using ExecAppendAsyncEventWait. Eventually, when a
+child node to which an asynchronous request has been made produces a tuple,
+the Append node will receive it from the event loop via ExecAsyncResponse. In
+the current implementation of asynchronous execution, the only node type that
+requests tuples from an async-capable child node is an Append, while the only
+node type that might be async-capable is a ForeignScan.
+
+Typically, the ExecAsyncResponse callback is the only one required for nodes
+that wish to request tuples asynchronously. On the other hand, async-capable
+nodes generally need to implement three methods:
+
+1. When an asynchronous request is made, the node's ExecAsyncRequest callback
+ will be invoked; it should use ExecAsyncRequestPending to indicate that the
+ request is pending for a callback described below. Alternatively, it can
+ instead use ExecAsyncRequestDone if a result is available immediately.
+
+2. When the event loop wishes to wait or poll for file descriptor events, the
+ node's ExecAsyncConfigureWait callback will be invoked to configure the
+ file descriptor event for which the node wishes to wait.
+
+3. When the file descriptor becomes ready, the node's ExecAsyncNotify callback
+ will be invoked; like #1, it should use ExecAsyncRequestPending for another
+ callback or ExecAsyncRequestDone to return a result immediately.
{
ListCell *l;
+ /* With async, tuples may be interleaved, so can't back up. */
+ if (((Append *) node)->nasyncplans > 0)
+ return false;
+
foreach(l, ((Append *) node)->appendplans)
{
if (!ExecSupportsBackwardScan((Plan *) lfirst(l)))
--- /dev/null
+/*-------------------------------------------------------------------------
+ *
+ * execAsync.c
+ * Support routines for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/backend/executor/execAsync.c
+ *
+ *-------------------------------------------------------------------------
+ */
+
+#include "postgres.h"
+
+#include "executor/execAsync.h"
+#include "executor/nodeAppend.h"
+#include "executor/nodeForeignscan.h"
+
+/*
+ * Asynchronously request a tuple from a designed async-capable node.
+ */
+void
+ExecAsyncRequest(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanRequest(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+
+ ExecAsyncResponse(areq);
+}
+
+/*
+ * Give the asynchronous node a chance to configure the file descriptor event
+ * for which it wishes to wait. We expect the node-type specific callback to
+ * make a single call of the following form:
+ *
+ * AddWaitEventToSet(set, WL_SOCKET_READABLE, fd, NULL, areq);
+ */
+void
+ExecAsyncConfigureWait(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanConfigureWait(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+}
+
+/*
+ * Call the asynchronous node back when a relevant event has occurred.
+ */
+void
+ExecAsyncNotify(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestee))
+ {
+ case T_ForeignScanState:
+ ExecAsyncForeignScanNotify(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestee));
+ }
+
+ ExecAsyncResponse(areq);
+}
+
+/*
+ * Call the requestor back when an asynchronous node has produced a result.
+ */
+void
+ExecAsyncResponse(AsyncRequest *areq)
+{
+ switch (nodeTag(areq->requestor))
+ {
+ case T_AppendState:
+ ExecAsyncAppendResponse(areq);
+ break;
+ default:
+ /* If the node doesn't support async, caller messed up. */
+ elog(ERROR, "unrecognized node type: %d",
+ (int) nodeTag(areq->requestor));
+ }
+}
+
+/*
+ * A requestee node should call this function to deliver the tuple to its
+ * requestor node. The requestee node can call this from its ExecAsyncRequest
+ * or ExecAsyncNotify callback.
+ */
+void
+ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result)
+{
+ areq->request_complete = true;
+ areq->result = result;
+}
+
+/*
+ * A requestee node should call this function to indicate that it is pending
+ * for a callback. The requestee node can call this from its ExecAsyncRequest
+ * or ExecAsyncNotify callback.
+ */
+void
+ExecAsyncRequestPending(AsyncRequest *areq)
+{
+ areq->callback_pending = true;
+ areq->request_complete = false;
+ areq->result = NULL;
+}
#include "postgres.h"
+#include "executor/execAsync.h"
#include "executor/execdebug.h"
#include "executor/execPartition.h"
#include "executor/nodeAppend.h"
#include "miscadmin.h"
+#include "pgstat.h"
+#include "storage/latch.h"
/* Shared state for parallel-aware Append. */
struct ParallelAppendState
};
#define INVALID_SUBPLAN_INDEX -1
+#define EVENT_BUFFER_SIZE 16
static TupleTableSlot *ExecAppend(PlanState *pstate);
static bool choose_next_subplan_locally(AppendState *node);
static bool choose_next_subplan_for_leader(AppendState *node);
static bool choose_next_subplan_for_worker(AppendState *node);
static void mark_invalid_subplans_as_finished(AppendState *node);
+static void ExecAppendAsyncBegin(AppendState *node);
+static bool ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result);
+static bool ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result);
+static void ExecAppendAsyncEventWait(AppendState *node);
+static void classify_matching_subplans(AppendState *node);
/* ----------------------------------------------------------------
* ExecInitAppend
AppendState *appendstate = makeNode(AppendState);
PlanState **appendplanstates;
Bitmapset *validsubplans;
+ Bitmapset *asyncplans;
int nplans;
+ int nasyncplans;
int firstvalid;
int i,
j;
/* Let choose_next_subplan_* function handle setting the first subplan */
appendstate->as_whichplan = INVALID_SUBPLAN_INDEX;
+ appendstate->as_syncdone = false;
+ appendstate->as_begun = false;
/* If run-time partition pruning is enabled, then set that up now */
if (node->part_prune_info != NULL)
* While at it, find out the first valid partial plan.
*/
j = 0;
+ asyncplans = NULL;
+ nasyncplans = 0;
firstvalid = nplans;
i = -1;
while ((i = bms_next_member(validsubplans, i)) >= 0)
{
Plan *initNode = (Plan *) list_nth(node->appendplans, i);
+ /*
+ * Record async subplans. When executing EvalPlanQual, we treat them
+ * as sync ones; don't do this when initializing an EvalPlanQual plan
+ * tree.
+ */
+ if (initNode->async_capable && estate->es_epq_active == NULL)
+ {
+ asyncplans = bms_add_member(asyncplans, j);
+ nasyncplans++;
+ }
+
/*
* Record the lowest appendplans index which is a valid partial plan.
*/
appendstate->appendplans = appendplanstates;
appendstate->as_nplans = nplans;
+ /* Initialize async state */
+ appendstate->as_asyncplans = asyncplans;
+ appendstate->as_nasyncplans = nasyncplans;
+ appendstate->as_asyncrequests = NULL;
+ appendstate->as_asyncresults = (TupleTableSlot **)
+ palloc0(nasyncplans * sizeof(TupleTableSlot *));
+ appendstate->as_needrequest = NULL;
+ appendstate->as_eventset = NULL;
+
+ if (nasyncplans > 0)
+ {
+ appendstate->as_asyncrequests = (AsyncRequest **)
+ palloc0(nplans * sizeof(AsyncRequest *));
+
+ i = -1;
+ while ((i = bms_next_member(asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq;
+
+ areq = palloc(sizeof(AsyncRequest));
+ areq->requestor = (PlanState *) appendstate;
+ areq->requestee = appendplanstates[i];
+ areq->request_index = i;
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+
+ appendstate->as_asyncrequests[i] = areq;
+ }
+ }
+
/*
* Miscellaneous initialization
*/
ExecAppend(PlanState *pstate)
{
AppendState *node = castNode(AppendState, pstate);
+ TupleTableSlot *result;
- if (node->as_whichplan < 0)
+ /*
+ * If this is the first call after Init or ReScan, we need to do the
+ * initialization work.
+ */
+ if (!node->as_begun)
{
+ Assert(node->as_whichplan == INVALID_SUBPLAN_INDEX);
+ Assert(!node->as_syncdone);
+
/* Nothing to do if there are no subplans */
if (node->as_nplans == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ /* If there are any async subplans, begin executing them. */
+ if (node->as_nasyncplans > 0)
+ ExecAppendAsyncBegin(node);
+
/*
- * If no subplan has been chosen, we must choose one before
+ * If no sync subplan has been chosen, we must choose one before
* proceeding.
*/
- if (node->as_whichplan == INVALID_SUBPLAN_INDEX &&
- !node->choose_next_subplan(node))
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
+
+ Assert(node->as_syncdone ||
+ (node->as_whichplan >= 0 &&
+ node->as_whichplan < node->as_nplans));
+
+ /* And we're initialized. */
+ node->as_begun = true;
}
for (;;)
{
PlanState *subnode;
- TupleTableSlot *result;
CHECK_FOR_INTERRUPTS();
/*
- * figure out which subplan we are currently processing
+ * try to get a tuple from an async subplan if any
+ */
+ if (node->as_syncdone || !bms_is_empty(node->as_needrequest))
+ {
+ if (ExecAppendAsyncGetNext(node, &result))
+ return result;
+ Assert(!node->as_syncdone);
+ Assert(bms_is_empty(node->as_needrequest));
+ }
+
+ /*
+ * figure out which sync subplan we are currently processing
*/
Assert(node->as_whichplan >= 0 && node->as_whichplan < node->as_nplans);
subnode = node->appendplans[node->as_whichplan];
return result;
}
- /* choose new subplan; if none, we're done */
- if (!node->choose_next_subplan(node))
+ /*
+ * wait or poll async events if any. We do this before checking for
+ * the end of iteration, because it might drain the remaining async
+ * subplans.
+ */
+ if (node->as_nasyncremain > 0)
+ ExecAppendAsyncEventWait(node);
+
+ /* choose new sync subplan; if no sync/async subplans, we're done */
+ if (!node->choose_next_subplan(node) && node->as_nasyncremain == 0)
return ExecClearTuple(node->ps.ps_ResultTupleSlot);
}
}
void
ExecReScanAppend(AppendState *node)
{
+ int nasyncplans = node->as_nasyncplans;
int i;
/*
{
bms_free(node->as_valid_subplans);
node->as_valid_subplans = NULL;
+ if (nasyncplans > 0)
+ {
+ bms_free(node->as_valid_asyncplans);
+ node->as_valid_asyncplans = NULL;
+ }
}
for (i = 0; i < node->as_nplans; i++)
ExecReScan(subnode);
}
+ /* Reset async state */
+ if (nasyncplans > 0)
+ {
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ areq->callback_pending = false;
+ areq->request_complete = false;
+ areq->result = NULL;
+ }
+
+ bms_free(node->as_needrequest);
+ node->as_needrequest = NULL;
+ }
+
/* Let choose_next_subplan_* function handle setting the first subplan */
node->as_whichplan = INVALID_SUBPLAN_INDEX;
+ node->as_syncdone = false;
+ node->as_begun = false;
}
/* ----------------------------------------------------------------
/* ----------------------------------------------------------------
* choose_next_subplan_locally
*
- * Choose next subplan for a non-parallel-aware Append,
+ * Choose next sync subplan for a non-parallel-aware Append,
* returning false if there are no more.
* ----------------------------------------------------------------
*/
/* We should never be called when there are no subplans */
Assert(node->as_nplans > 0);
+ /* Nothing to do if syncdone */
+ if (node->as_syncdone)
+ return false;
+
/*
* If first call then have the bms member function choose the first valid
- * subplan by initializing whichplan to -1. If there happen to be no
- * valid subplans then the bms member function will handle that by
- * returning a negative number which will allow us to exit returning a
+ * sync subplan by initializing whichplan to -1. If there happen to be
+ * no valid sync subplans then the bms member function will handle that
+ * by returning a negative number which will allow us to exit returning a
* false value.
*/
if (whichplan == INVALID_SUBPLAN_INDEX)
{
- if (node->as_valid_subplans == NULL)
+ if (node->as_nasyncplans > 0)
+ {
+ /* We'd have filled as_valid_subplans already */
+ Assert(node->as_valid_subplans);
+ }
+ else if (node->as_valid_subplans == NULL)
node->as_valid_subplans =
ExecFindMatchingSubPlans(node->as_prune_state);
nextplan = bms_prev_member(node->as_valid_subplans, whichplan);
if (nextplan < 0)
+ {
+ /* Set as_syncdone if in async mode */
+ if (node->as_nasyncplans > 0)
+ node->as_syncdone = true;
return false;
+ }
node->as_whichplan = nextplan;
node->as_pstate->pa_finished[i] = true;
}
}
+
+/* ----------------------------------------------------------------
+ * Asynchronous Append Support
+ * ----------------------------------------------------------------
+ */
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncBegin
+ *
+ * Begin executing designed async-capable subplans.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncBegin(AppendState *node)
+{
+ int i;
+
+ /* Backward scan is not supported by async-aware Appends. */
+ Assert(ScanDirectionIsForward(node->ps.state->es_direction));
+
+ /* We should never be called when there are no async subplans. */
+ Assert(node->as_nasyncplans > 0);
+
+ /* If we've yet to determine the valid subplans then do so now. */
+ if (node->as_valid_subplans == NULL)
+ node->as_valid_subplans =
+ ExecFindMatchingSubPlans(node->as_prune_state);
+
+ classify_matching_subplans(node);
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (node->as_nasyncremain == 0)
+ return;
+
+ /* Make a request for each of the valid async subplans. */
+ i = -1;
+ while ((i = bms_next_member(node->as_valid_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ Assert(areq->request_index == i);
+ Assert(!areq->callback_pending);
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncGetNext
+ *
+ * Get the next tuple from any of the asynchronous subplans.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncGetNext(AppendState *node, TupleTableSlot **result)
+{
+ *result = NULL;
+
+ /* We should never be called when there are no valid async subplans. */
+ Assert(node->as_nasyncremain > 0);
+
+ /* Request a tuple asynchronously. */
+ if (ExecAppendAsyncRequest(node, result))
+ return true;
+
+ while (node->as_nasyncremain > 0)
+ {
+ CHECK_FOR_INTERRUPTS();
+
+ /* Wait or poll async events. */
+ ExecAppendAsyncEventWait(node);
+
+ /* Request a tuple asynchronously. */
+ if (ExecAppendAsyncRequest(node, result))
+ return true;
+
+ /* Break from loop if there's any sync subplan that isn't complete. */
+ if (!node->as_syncdone)
+ break;
+ }
+
+ /*
+ * If all sync subplans are complete, we're totally done scanning the
+ * given node. Otherwise, we're done with the asynchronous stuff but
+ * must continue scanning the sync subplans.
+ */
+ if (node->as_syncdone)
+ {
+ Assert(node->as_nasyncremain == 0);
+ *result = ExecClearTuple(node->ps.ps_ResultTupleSlot);
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncRequest
+ *
+ * Request a tuple asynchronously.
+ * ----------------------------------------------------------------
+ */
+static bool
+ExecAppendAsyncRequest(AppendState *node, TupleTableSlot **result)
+{
+ Bitmapset *needrequest;
+ int i;
+
+ /* Nothing to do if there are no async subplans needing a new request. */
+ if (bms_is_empty(node->as_needrequest))
+ return false;
+
+ /*
+ * If there are any asynchronously-generated results that have not yet
+ * been returned, we have nothing to do; just return one of them.
+ */
+ if (node->as_nasyncresults > 0)
+ {
+ --node->as_nasyncresults;
+ *result = node->as_asyncresults[node->as_nasyncresults];
+ return true;
+ }
+
+ /* Make a new request for each of the async subplans that need it. */
+ needrequest = node->as_needrequest;
+ node->as_needrequest = NULL;
+ i = -1;
+ while ((i = bms_next_member(needrequest, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ /* Do the actual work. */
+ ExecAsyncRequest(areq);
+ }
+ bms_free(needrequest);
+
+ /* Return one of the asynchronously-generated results if any. */
+ if (node->as_nasyncresults > 0)
+ {
+ --node->as_nasyncresults;
+ *result = node->as_asyncresults[node->as_nasyncresults];
+ return true;
+ }
+
+ return false;
+}
+
+/* ----------------------------------------------------------------
+ * ExecAppendAsyncEventWait
+ *
+ * Wait or poll for file descriptor events and fire callbacks.
+ * ----------------------------------------------------------------
+ */
+static void
+ExecAppendAsyncEventWait(AppendState *node)
+{
+ long timeout = node->as_syncdone ? -1 : 0;
+ WaitEvent occurred_event[EVENT_BUFFER_SIZE];
+ int noccurred;
+ int i;
+
+ /* We should never be called when there are no valid async subplans. */
+ Assert(node->as_nasyncremain > 0);
+
+ node->as_eventset = CreateWaitEventSet(CurrentMemoryContext,
+ node->as_nasyncplans + 1);
+ AddWaitEventToSet(node->as_eventset, WL_EXIT_ON_PM_DEATH, PGINVALID_SOCKET,
+ NULL, NULL);
+
+ /* Give each waiting subplan a chance to add an event. */
+ i = -1;
+ while ((i = bms_next_member(node->as_asyncplans, i)) >= 0)
+ {
+ AsyncRequest *areq = node->as_asyncrequests[i];
+
+ if (areq->callback_pending)
+ ExecAsyncConfigureWait(areq);
+ }
+
+ /* Wait for at least one event to occur. */
+ noccurred = WaitEventSetWait(node->as_eventset, timeout, occurred_event,
+ EVENT_BUFFER_SIZE, WAIT_EVENT_APPEND_READY);
+ FreeWaitEventSet(node->as_eventset);
+ node->as_eventset = NULL;
+ if (noccurred == 0)
+ return;
+
+ /* Deliver notifications. */
+ for (i = 0; i < noccurred; i++)
+ {
+ WaitEvent *w = &occurred_event[i];
+
+ /*
+ * Each waiting subplan should have registered its wait event with
+ * user_data pointing back to its AsyncRequest.
+ */
+ if ((w->events & WL_SOCKET_READABLE) != 0)
+ {
+ AsyncRequest *areq = (AsyncRequest *) w->user_data;
+
+ /*
+ * Mark it as no longer needing a callback. We must do this
+ * before dispatching the callback in case the callback resets
+ * the flag.
+ */
+ Assert(areq->callback_pending);
+ areq->callback_pending = false;
+
+ /* Do the actual work. */
+ ExecAsyncNotify(areq);
+ }
+ }
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncAppendResponse
+ *
+ * Receive a response from an asynchronous request we made.
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncAppendResponse(AsyncRequest *areq)
+{
+ AppendState *node = (AppendState *) areq->requestor;
+ TupleTableSlot *slot = areq->result;
+
+ /* The result should be a TupleTableSlot or NULL. */
+ Assert(slot == NULL || IsA(slot, TupleTableSlot));
+
+ /* Nothing to do if the request is pending. */
+ if (!areq->request_complete)
+ {
+ /* The request would have been pending for a callback */
+ Assert(areq->callback_pending);
+ return;
+ }
+
+ /* If the result is NULL or an empty slot, there's nothing more to do. */
+ if (TupIsNull(slot))
+ {
+ /* The ending subplan wouldn't have been pending for a callback. */
+ Assert(!areq->callback_pending);
+ --node->as_nasyncremain;
+ return;
+ }
+
+ /* Save result so we can return it. */
+ Assert(node->as_nasyncresults < node->as_nasyncplans);
+ node->as_asyncresults[node->as_nasyncresults++] = slot;
+
+ /*
+ * Mark the subplan that returned a result as ready for a new request. We
+ * don't launch another one here immediately because it might complete.
+ */
+ node->as_needrequest = bms_add_member(node->as_needrequest,
+ areq->request_index);
+}
+
+/* ----------------------------------------------------------------
+ * classify_matching_subplans
+ *
+ * Classify the node's as_valid_subplans into sync ones and
+ * async ones, adjust it to contain sync ones only, and save
+ * async ones in the node's as_valid_asyncplans.
+ * ----------------------------------------------------------------
+ */
+static void
+classify_matching_subplans(AppendState *node)
+{
+ Bitmapset *valid_asyncplans;
+
+ Assert(node->as_valid_asyncplans == NULL);
+
+ /* Nothing to do if there are no valid subplans. */
+ if (bms_is_empty(node->as_valid_subplans))
+ {
+ node->as_syncdone = true;
+ node->as_nasyncremain = 0;
+ return;
+ }
+
+ /* Nothing to do if there are no valid async subplans. */
+ if (!bms_overlap(node->as_valid_subplans, node->as_asyncplans))
+ {
+ node->as_nasyncremain = 0;
+ return;
+ }
+
+ /* Get valid async subplans. */
+ valid_asyncplans = bms_copy(node->as_asyncplans);
+ valid_asyncplans = bms_int_members(valid_asyncplans,
+ node->as_valid_subplans);
+
+ /* Adjust the valid subplans to contain sync subplans only. */
+ node->as_valid_subplans = bms_del_members(node->as_valid_subplans,
+ valid_asyncplans);
+ node->as_syncdone = bms_is_empty(node->as_valid_subplans);
+
+ /* Save valid async subplans. */
+ node->as_valid_asyncplans = valid_asyncplans;
+ node->as_nasyncremain = bms_num_members(valid_asyncplans);
+}
if (fdwroutine->ShutdownForeignScan)
fdwroutine->ShutdownForeignScan(node);
}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanRequest
+ *
+ * Asynchronously request a tuple from a designed async-capable node
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanRequest(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncRequest != NULL);
+ fdwroutine->ForeignAsyncRequest(areq);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanConfigureWait
+ *
+ * In async mode, configure for a wait
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanConfigureWait(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncConfigureWait != NULL);
+ fdwroutine->ForeignAsyncConfigureWait(areq);
+}
+
+/* ----------------------------------------------------------------
+ * ExecAsyncForeignScanNotify
+ *
+ * Callback invoked when a relevant event has occurred
+ * ----------------------------------------------------------------
+ */
+void
+ExecAsyncForeignScanNotify(AsyncRequest *areq)
+{
+ ForeignScanState *node = (ForeignScanState *) areq->requestee;
+ FdwRoutine *fdwroutine = node->fdwroutine;
+
+ Assert(fdwroutine->ForeignAsyncNotify != NULL);
+ fdwroutine->ForeignAsyncNotify(areq);
+}
COPY_SCALAR_FIELD(plan_width);
COPY_SCALAR_FIELD(parallel_aware);
COPY_SCALAR_FIELD(parallel_safe);
+ COPY_SCALAR_FIELD(async_capable);
COPY_SCALAR_FIELD(plan_node_id);
COPY_NODE_FIELD(targetlist);
COPY_NODE_FIELD(qual);
*/
COPY_BITMAPSET_FIELD(apprelids);
COPY_NODE_FIELD(appendplans);
+ COPY_SCALAR_FIELD(nasyncplans);
COPY_SCALAR_FIELD(first_partial_plan);
COPY_NODE_FIELD(part_prune_info);
WRITE_INT_FIELD(plan_width);
WRITE_BOOL_FIELD(parallel_aware);
WRITE_BOOL_FIELD(parallel_safe);
+ WRITE_BOOL_FIELD(async_capable);
WRITE_INT_FIELD(plan_node_id);
WRITE_NODE_FIELD(targetlist);
WRITE_NODE_FIELD(qual);
WRITE_BITMAPSET_FIELD(apprelids);
WRITE_NODE_FIELD(appendplans);
+ WRITE_INT_FIELD(nasyncplans);
WRITE_INT_FIELD(first_partial_plan);
WRITE_NODE_FIELD(part_prune_info);
}
READ_INT_FIELD(plan_width);
READ_BOOL_FIELD(parallel_aware);
READ_BOOL_FIELD(parallel_safe);
+ READ_BOOL_FIELD(async_capable);
READ_INT_FIELD(plan_node_id);
READ_NODE_FIELD(targetlist);
READ_NODE_FIELD(qual);
READ_BITMAPSET_FIELD(apprelids);
READ_NODE_FIELD(appendplans);
+ READ_INT_FIELD(nasyncplans);
READ_INT_FIELD(first_partial_plan);
READ_NODE_FIELD(part_prune_info);
bool enable_parallel_append = true;
bool enable_parallel_hash = true;
bool enable_partition_pruning = true;
+bool enable_async_append = true;
typedef struct
{
static Plan *create_gating_plan(PlannerInfo *root, Path *path, Plan *plan,
List *gating_quals);
static Plan *create_join_plan(PlannerInfo *root, JoinPath *best_path);
+static bool is_async_capable_path(Path *path);
static Plan *create_append_plan(PlannerInfo *root, AppendPath *best_path,
int flags);
static Plan *create_merge_append_plan(PlannerInfo *root, MergeAppendPath *best_path,
return plan;
}
+/*
+ * is_async_capable_path
+ * Check whether a given Path node is async-capable.
+ */
+static bool
+is_async_capable_path(Path *path)
+{
+ switch (nodeTag(path))
+ {
+ case T_ForeignPath:
+ {
+ FdwRoutine *fdwroutine = path->parent->fdwroutine;
+
+ Assert(fdwroutine != NULL);
+ if (fdwroutine->IsForeignPathAsyncCapable != NULL &&
+ fdwroutine->IsForeignPathAsyncCapable((ForeignPath *) path))
+ return true;
+ }
+ break;
+ default:
+ break;
+ }
+ return false;
+}
+
/*
* create_append_plan
* Create an Append plan for 'best_path' and (recursively) plans
List *pathkeys = best_path->path.pathkeys;
List *subplans = NIL;
ListCell *subpaths;
+ int nasyncplans = 0;
RelOptInfo *rel = best_path->path.parent;
PartitionPruneInfo *partpruneinfo = NULL;
int nodenumsortkeys = 0;
Oid *nodeSortOperators = NULL;
Oid *nodeCollations = NULL;
bool *nodeNullsFirst = NULL;
+ bool consider_async = false;
/*
* The subpaths list could be empty, if every child was proven empty by
tlist_was_changed = (orig_tlist_length != list_length(plan->plan.targetlist));
}
+ /* If appropriate, consider async append */
+ consider_async = (enable_async_append && pathkeys == NIL &&
+ !best_path->path.parallel_safe &&
+ list_length(best_path->subpaths) > 1);
+
/* Build the plan for each child */
foreach(subpaths, best_path->subpaths)
{
}
subplans = lappend(subplans, subplan);
+
+ /* Check to see if subplan can be executed asynchronously */
+ if (consider_async && is_async_capable_path(subpath))
+ {
+ subplan->async_capable = true;
+ ++nasyncplans;
+ }
}
/*
}
plan->appendplans = subplans;
+ plan->nasyncplans = nasyncplans;
plan->first_partial_plan = best_path->first_partial_path;
plan->part_prune_info = partpruneinfo;
switch (w)
{
+ case WAIT_EVENT_APPEND_READY:
+ event_name = "AppendReady";
+ break;
case WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE:
event_name = "BackupWaitWalArchive";
break;
}
#endif
+/*
+ * Get the number of wait events registered in a given WaitEventSet.
+ */
+int
+GetNumRegisteredWaitEvents(WaitEventSet *set)
+{
+ return set->nevents;
+}
+
#if defined(WAIT_USE_POLL)
/*
true,
NULL, NULL, NULL
},
+ {
+ {"enable_async_append", PGC_USERSET, QUERY_TUNING_METHOD,
+ gettext_noop("Enables the planner's use of async append plans."),
+ NULL,
+ GUC_EXPLAIN
+ },
+ &enable_async_append,
+ true,
+ NULL, NULL, NULL
+ },
{
{"geqo", PGC_USERSET, QUERY_TUNING_GEQO,
gettext_noop("Enables genetic query optimization."),
#enable_partitionwise_aggregate = off
#enable_parallel_hash = on
#enable_partition_pruning = on
+#enable_async_append = on
# - Planner Cost Constants -
--- /dev/null
+/*-------------------------------------------------------------------------
+ * execAsync.h
+ * Support functions for asynchronous execution
+ *
+ * Portions Copyright (c) 1996-2021, PostgreSQL Global Development Group
+ * Portions Copyright (c) 1994, Regents of the University of California
+ *
+ * IDENTIFICATION
+ * src/include/executor/execAsync.h
+ *-------------------------------------------------------------------------
+ */
+
+#ifndef EXECASYNC_H
+#define EXECASYNC_H
+
+#include "nodes/execnodes.h"
+
+extern void ExecAsyncRequest(AsyncRequest *areq);
+extern void ExecAsyncConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncNotify(AsyncRequest *areq);
+extern void ExecAsyncResponse(AsyncRequest *areq);
+extern void ExecAsyncRequestDone(AsyncRequest *areq, TupleTableSlot *result);
+extern void ExecAsyncRequestPending(AsyncRequest *areq);
+
+#endif /* EXECASYNC_H */
extern void ExecAppendReInitializeDSM(AppendState *node, ParallelContext *pcxt);
extern void ExecAppendInitializeWorker(AppendState *node, ParallelWorkerContext *pwcxt);
+extern void ExecAsyncAppendResponse(AsyncRequest *areq);
+
#endif /* NODEAPPEND_H */
ParallelWorkerContext *pwcxt);
extern void ExecShutdownForeignScan(ForeignScanState *node);
+extern void ExecAsyncForeignScanRequest(AsyncRequest *areq);
+extern void ExecAsyncForeignScanConfigureWait(AsyncRequest *areq);
+extern void ExecAsyncForeignScanNotify(AsyncRequest *areq);
+
#endif /* NODEFOREIGNSCAN_H */
List *fdw_private,
RelOptInfo *child_rel);
+typedef bool (*IsForeignPathAsyncCapable_function) (ForeignPath *path);
+
+typedef void (*ForeignAsyncRequest_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncConfigureWait_function) (AsyncRequest *areq);
+
+typedef void (*ForeignAsyncNotify_function) (AsyncRequest *areq);
+
/*
* FdwRoutine is the struct returned by a foreign-data wrapper's handler
* function. It provides pointers to the callback functions needed by the
/* Support functions for path reparameterization. */
ReparameterizeForeignPathByChild_function ReparameterizeForeignPathByChild;
+
+ /* Support functions for asynchronous execution */
+ IsForeignPathAsyncCapable_function IsForeignPathAsyncCapable;
+ ForeignAsyncRequest_function ForeignAsyncRequest;
+ ForeignAsyncConfigureWait_function ForeignAsyncConfigureWait;
+ ForeignAsyncNotify_function ForeignAsyncNotify;
} FdwRoutine;
struct CopyMultiInsertBuffer *ri_CopyMultiInsertBuffer;
} ResultRelInfo;
+/* ----------------
+ * AsyncRequest
+ *
+ * State for an asynchronous tuple request.
+ * ----------------
+ */
+typedef struct AsyncRequest
+{
+ struct PlanState *requestor; /* Node that wants a tuple */
+ struct PlanState *requestee; /* Node from which a tuple is wanted */
+ int request_index; /* Scratch space for requestor */
+ bool callback_pending; /* Callback is needed */
+ bool request_complete; /* Request complete, result valid */
+ TupleTableSlot *result; /* Result (NULL if no more tuples) */
+} AsyncRequest;
+
/* ----------------
* EState information
*
* AppendState information
*
* nplans how many plans are in the array
- * whichplan which plan is being executed (0 .. n-1), or a
- * special negative value. See nodeAppend.c.
+ * whichplan which synchronous plan is being executed (0 .. n-1)
+ * or a special negative value. See nodeAppend.c.
* prune_state details required to allow partitions to be
* eliminated from the scan, or NULL if not possible.
- * valid_subplans for runtime pruning, valid appendplans indexes to
- * scan.
+ * valid_subplans for runtime pruning, valid synchronous appendplans
+ * indexes to scan.
* ----------------
*/
PlanState **appendplans; /* array of PlanStates for my inputs */
int as_nplans;
int as_whichplan;
+ bool as_begun; /* false means need to initialize */
+ Bitmapset *as_asyncplans; /* asynchronous plans indexes */
+ int as_nasyncplans; /* # of asynchronous plans */
+ AsyncRequest **as_asyncrequests; /* array of AsyncRequests */
+ TupleTableSlot **as_asyncresults; /* unreturned results of async plans */
+ int as_nasyncresults; /* # of valid entries in as_asyncresults */
+ bool as_syncdone; /* true if all synchronous plans done in
+ * asynchronous mode, else false */
+ int as_nasyncremain; /* # of remaining asynchronous plans */
+ Bitmapset *as_needrequest; /* asynchronous plans needing a new request */
+ struct WaitEventSet *as_eventset; /* WaitEventSet used to configure
+ * file descriptor wait events */
int as_first_partial_plan; /* Index of 'appendplans' containing
* the first partial plan */
ParallelAppendState *as_pstate; /* parallel coordination info */
Size pstate_len; /* size of parallel coordination info */
struct PartitionPruneState *as_prune_state;
Bitmapset *as_valid_subplans;
+ Bitmapset *as_valid_asyncplans; /* valid asynchronous plans indexes */
bool (*choose_next_subplan) (AppendState *);
};
bool parallel_aware; /* engage parallel-aware logic? */
bool parallel_safe; /* OK to use as part of parallel plan? */
+ /*
+ * information needed for asynchronous execution
+ */
+ bool async_capable; /* engage asynchronous-capable logic? */
+
/*
* Common structural data for all Plan types.
*/
Plan plan;
Bitmapset *apprelids; /* RTIs of appendrel(s) formed by this node */
List *appendplans;
+ int nasyncplans; /* # of asynchronous plans */
/*
* All 'appendplans' preceding this index are non-partial plans. All
extern PGDLLIMPORT bool enable_parallel_append;
extern PGDLLIMPORT bool enable_parallel_hash;
extern PGDLLIMPORT bool enable_partition_pruning;
+extern PGDLLIMPORT bool enable_async_append;
extern PGDLLIMPORT int constraint_exclusion;
extern double index_pages_fetched(double tuples_fetched, BlockNumber pages,
*/
typedef enum
{
- WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE = PG_WAIT_IPC,
+ WAIT_EVENT_APPEND_READY = PG_WAIT_IPC,
+ WAIT_EVENT_BACKUP_WAIT_WAL_ARCHIVE,
WAIT_EVENT_BGWORKER_SHUTDOWN,
WAIT_EVENT_BGWORKER_STARTUP,
WAIT_EVENT_BTREE_PAGE,
extern int WaitLatchOrSocket(Latch *latch, int wakeEvents,
pgsocket sock, long timeout, uint32 wait_event_info);
extern void InitializeLatchWaitSet(void);
+extern int GetNumRegisteredWaitEvents(WaitEventSet *set);
#endif /* LATCH_H */
"Plan": { +
"Node Type": "Seq Scan", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "int8_tbl",+
"Alias": "i8", +
"Startup Cost": N.N, +
<Plan> +
<Node-Type>Seq Scan</Node-Type> +
<Parallel-Aware>false</Parallel-Aware> +
+ <Async-Capable>false</Async-Capable> +
<Relation-Name>int8_tbl</Relation-Name> +
<Alias>i8</Alias> +
<Startup-Cost>N.N</Startup-Cost> +
- Plan: +
Node Type: "Seq Scan" +
Parallel Aware: false +
+ Async Capable: false +
Relation Name: "int8_tbl"+
Alias: "i8" +
Startup Cost: N.N +
"Plan": { +
"Node Type": "Seq Scan", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "int8_tbl",+
"Alias": "i8", +
"Startup Cost": N.N, +
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Relation Name": "tenk1", +
"Parallel Aware": true, +
"Local Hit Blocks": 0, +
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Parallel Aware": false, +
"Sort Space Used": 0, +
"Local Hit Blocks": 0, +
"Actual Rows": 0, +
"Actual Loops": 0, +
"Startup Cost": 0.0, +
+ "Async Capable": false, +
"Parallel Aware": false, +
"Workers Planned": 0, +
"Local Hit Blocks": 0, +
"Node Type": "Incremental Sort", +
"Actual Rows": 55, +
"Actual Loops": 1, +
+ "Async Capable": false, +
"Presorted Key": [ +
"t.a" +
], +
"Node Type": "Incremental Sort", +
"Actual Rows": 70, +
"Actual Loops": 1, +
+ "Async Capable": false, +
"Presorted Key": [ +
"t.a" +
], +
"Node Type": "ModifyTable", +
"Operation": "Insert", +
"Parallel Aware": false, +
+ "Async Capable": false, +
"Relation Name": "insertconflicttest", +
"Alias": "insertconflicttest", +
"Conflict Resolution": "UPDATE", +
{ +
"Node Type": "Result", +
"Parent Relationship": "Member", +
- "Parallel Aware": false +
+ "Parallel Aware": false, +
+ "Async Capable": false +
} +
] +
} +
select name, setting from pg_settings where name like 'enable%';
name | setting
--------------------------------+---------
+ enable_async_append | on
enable_bitmapscan | on
enable_gathermerge | on
enable_hashagg | on
enable_seqscan | on
enable_sort | on
enable_tidscan | on
-(18 rows)
+(19 rows)
-- Test that the pg_timezone_names and pg_timezone_abbrevs views are
-- more-or-less working. We can't test their contents in any great detail