Skip to content

Commit 2b1bb28

Browse files
authored
feat(cubesql): SQL push down complex window expressions (#8788)
1 parent 044341e commit 2b1bb28

File tree

2 files changed

+219
-50
lines changed

2 files changed

+219
-50
lines changed

rust/cubesql/cubesql/src/compile/mod.rs

+33
Original file line numberDiff line numberDiff line change
@@ -16678,4 +16678,37 @@ LIMIT {{ limit }}{% endif %}"#.to_string(),
1667816678

1667916679
Ok(())
1668016680
}
16681+
16682+
#[tokio::test]
16683+
async fn test_double_window_aggr_sql_push_down() {
16684+
if !Rewriter::sql_push_down_enabled() {
16685+
return;
16686+
}
16687+
init_testing_logger();
16688+
16689+
let query_plan = convert_select_to_query_plan(
16690+
r#"
16691+
SELECT
16692+
customer_gender AS customer_gender,
16693+
notes AS notes,
16694+
SUM(SUM(taxful_total_price)) OVER (PARTITION BY customer_gender ORDER BY customer_gender) AS sum,
16695+
AVG(SUM(taxful_total_price)) OVER (PARTITION BY notes ORDER BY notes) AS avg
16696+
FROM KibanaSampleDataEcommerce
16697+
GROUP BY 1, 2
16698+
"#
16699+
.to_string(),
16700+
DatabaseProtocol::PostgreSQL,
16701+
)
16702+
.await;
16703+
16704+
let logical_plan = query_plan.as_logical_plan();
16705+
let sql = logical_plan.find_cube_scan_wrapped_sql().wrapped_sql.sql;
16706+
assert!(sql.contains("OVER (PARTITION BY"));
16707+
16708+
let physical_plan = query_plan.as_physical_plan().await.unwrap();
16709+
println!(
16710+
"Physical plan: {}",
16711+
displayable(physical_plan.as_ref()).indent()
16712+
);
16713+
}
1668116714
}
Original file line numberDiff line numberDiff line change
@@ -1,64 +1,131 @@
1-
use crate::compile::rewrite::{
2-
cube_scan_wrapper, rewrite, rewriter::CubeRewrite, rules::wrapper::WrapperRules, window,
3-
wrapped_select, wrapped_select_window_expr_empty_tail, wrapper_pullup_replacer,
4-
wrapper_pushdown_replacer, ListType,
1+
use egg::{EGraph, Subst};
2+
3+
use crate::{
4+
compile::rewrite::{
5+
cube_scan_wrapper, rewrite, rewriter::CubeRewrite, rules::wrapper::WrapperRules,
6+
transforming_rewrite, window, wrapped_select, wrapped_select_window_expr_empty_tail,
7+
wrapper_pullup_replacer, wrapper_pushdown_replacer, ListType, LogicalPlanAnalysis,
8+
LogicalPlanLanguage,
9+
},
10+
var,
511
};
612

713
impl WrapperRules {
814
pub fn window_rules(&self, rules: &mut Vec<CubeRewrite>) {
9-
rules.extend(vec![rewrite(
10-
"wrapper-push-down-window-to-cube-scan",
11-
window(
12-
cube_scan_wrapper(
13-
wrapper_pullup_replacer(
14-
wrapped_select(
15-
"?select_type",
16-
"?projection_expr",
17-
"?subqueries",
18-
"?group_expr",
19-
"?aggr_expr",
20-
wrapped_select_window_expr_empty_tail(),
21-
"?cube_scan_input",
22-
"?joins",
23-
"?filter_expr",
24-
"?having_expr",
25-
"?limit",
26-
"?offset",
27-
"?order_expr",
28-
"?select_alias",
29-
"?select_distinct",
30-
"?select_push_to_cube",
31-
"?select_ungrouped_scan",
15+
rules.extend(vec![
16+
rewrite(
17+
"wrapper-push-down-window-to-cube-scan",
18+
window(
19+
cube_scan_wrapper(
20+
wrapper_pullup_replacer(
21+
wrapped_select(
22+
"?select_type",
23+
"?projection_expr",
24+
"?subqueries",
25+
"?group_expr",
26+
"?aggr_expr",
27+
wrapped_select_window_expr_empty_tail(),
28+
"?cube_scan_input",
29+
"?joins",
30+
"?filter_expr",
31+
"?having_expr",
32+
"?limit",
33+
"?offset",
34+
"?order_expr",
35+
"?select_alias",
36+
"?select_distinct",
37+
"?select_push_to_cube",
38+
"?select_ungrouped_scan",
39+
),
40+
"?context",
3241
),
33-
"?context",
42+
"CubeScanWrapperFinalized:false",
43+
),
44+
"?window_expr",
45+
),
46+
cube_scan_wrapper(
47+
wrapped_select(
48+
"?select_type",
49+
wrapper_pullup_replacer("?projection_expr", "?context"),
50+
wrapper_pullup_replacer("?subqueries", "?context"),
51+
wrapper_pullup_replacer("?group_expr", "?context"),
52+
wrapper_pullup_replacer("?aggr_expr", "?context"),
53+
wrapper_pushdown_replacer("?window_expr", "?context"),
54+
wrapper_pullup_replacer("?cube_scan_input", "?context"),
55+
wrapper_pullup_replacer("?joins", "?context"),
56+
wrapper_pullup_replacer("?filter_expr", "?context"),
57+
"?having_expr",
58+
"?limit",
59+
"?offset",
60+
wrapper_pullup_replacer("?order_expr", "?context"),
61+
"?select_alias",
62+
"?select_distinct",
63+
"?select_push_to_cube",
64+
"?select_ungrouped_scan",
3465
),
3566
"CubeScanWrapperFinalized:false",
3667
),
37-
"?window_expr",
3868
),
39-
cube_scan_wrapper(
40-
wrapped_select(
41-
"?select_type",
42-
wrapper_pullup_replacer("?projection_expr", "?context"),
43-
wrapper_pullup_replacer("?subqueries", "?context"),
44-
wrapper_pullup_replacer("?group_expr", "?context"),
45-
wrapper_pullup_replacer("?aggr_expr", "?context"),
46-
wrapper_pushdown_replacer("?window_expr", "?context"),
47-
wrapper_pullup_replacer("?cube_scan_input", "?context"),
48-
wrapper_pullup_replacer("?joins", "?context"),
49-
wrapper_pullup_replacer("?filter_expr", "?context"),
50-
"?having_expr",
51-
"?limit",
52-
"?offset",
53-
wrapper_pullup_replacer("?order_expr", "?context"),
54-
"?select_alias",
55-
"?select_distinct",
56-
"?select_push_to_cube",
57-
"?select_ungrouped_scan",
69+
transforming_rewrite(
70+
"wrapper-push-down-window-combined-to-cube-scan",
71+
window(
72+
cube_scan_wrapper(
73+
wrapper_pullup_replacer(
74+
wrapped_select(
75+
"?select_type",
76+
"?projection_expr",
77+
"?subqueries",
78+
"?group_expr",
79+
"?aggr_expr",
80+
"?wrapped_window_expr",
81+
"?cube_scan_input",
82+
"?joins",
83+
"?filter_expr",
84+
"?having_expr",
85+
"?limit",
86+
"?offset",
87+
"?order_expr",
88+
"?select_alias",
89+
"?select_distinct",
90+
"?select_ungrouped",
91+
"?select_ungrouped_scan",
92+
),
93+
"?context",
94+
),
95+
"CubeScanWrapperFinalized:false",
96+
),
97+
"?window_expr",
98+
),
99+
cube_scan_wrapper(
100+
wrapped_select(
101+
"?select_type",
102+
wrapper_pullup_replacer("?projection_expr", "?context"),
103+
wrapper_pullup_replacer("?subqueries", "?context"),
104+
wrapper_pullup_replacer("?group_expr", "?context"),
105+
wrapper_pullup_replacer("?aggr_expr", "?context"),
106+
"?new_window_expr",
107+
wrapper_pullup_replacer("?cube_scan_input", "?context"),
108+
wrapper_pullup_replacer("?joins", "?context"),
109+
wrapper_pullup_replacer("?filter_expr", "?context"),
110+
"?having_expr",
111+
"?limit",
112+
"?offset",
113+
wrapper_pullup_replacer("?order_expr", "?context"),
114+
"?select_alias",
115+
"?select_distinct",
116+
"?select_ungrouped",
117+
"?select_ungrouped_scan",
118+
),
119+
"CubeScanWrapperFinalized:false",
120+
),
121+
self.transform_window_combined(
122+
"?wrapped_window_expr",
123+
"?window_expr",
124+
"?context",
125+
"?new_window_expr",
58126
),
59-
"CubeScanWrapperFinalized:false",
60127
),
61-
)]);
128+
]);
62129

63130
if self.config_obj.push_down_pull_up_split() {
64131
Self::flat_list_pushdown_pullup_rules(
@@ -76,4 +143,73 @@ impl WrapperRules {
76143
);
77144
}
78145
}
146+
147+
fn transform_window_combined(
148+
&self,
149+
wrapped_window_expr_var: &'static str,
150+
window_expr_var: &'static str,
151+
context_var: &'static str,
152+
new_window_expr_var: &'static str,
153+
) -> impl Fn(&mut EGraph<LogicalPlanLanguage, LogicalPlanAnalysis>, &mut Subst) -> bool {
154+
let wrapped_window_expr_var = var!(wrapped_window_expr_var);
155+
let window_expr_var = var!(window_expr_var);
156+
let context_var = var!(context_var);
157+
let new_window_expr_var = var!(new_window_expr_var);
158+
let push_down_pull_up_split = self.config_obj.push_down_pull_up_split();
159+
move |egraph, subst| {
160+
for wrapped_node in &egraph[subst[wrapped_window_expr_var]].nodes {
161+
let LogicalPlanLanguage::WrappedSelectWindowExpr(wrapped_ids) = wrapped_node else {
162+
continue;
163+
};
164+
if wrapped_ids.is_empty() {
165+
continue;
166+
}
167+
168+
for window_node in &egraph[subst[window_expr_var]].nodes {
169+
let LogicalPlanLanguage::WindowWindowExpr(window_ids) = window_node else {
170+
continue;
171+
};
172+
173+
if !push_down_pull_up_split {
174+
let left = egraph.add(LogicalPlanLanguage::WrapperPullupReplacer([
175+
subst[wrapped_window_expr_var],
176+
subst[context_var],
177+
]));
178+
let right = egraph.add(LogicalPlanLanguage::WrapperPushdownReplacer([
179+
subst[window_expr_var],
180+
subst[context_var],
181+
]));
182+
183+
subst.insert(
184+
new_window_expr_var,
185+
egraph.add(LogicalPlanLanguage::WindowWindowExpr(vec![left, right])),
186+
);
187+
return true;
188+
}
189+
190+
let wrapped_ids = wrapped_ids.clone();
191+
let window_ids = window_ids.clone();
192+
193+
let mut new_window_expr_ids = Vec::new();
194+
for id in wrapped_ids {
195+
new_window_expr_ids.push(egraph.add(
196+
LogicalPlanLanguage::WrapperPullupReplacer([id, subst[context_var]]),
197+
));
198+
}
199+
for id in window_ids {
200+
new_window_expr_ids.push(egraph.add(
201+
LogicalPlanLanguage::WrapperPushdownReplacer([id, subst[context_var]]),
202+
));
203+
}
204+
205+
subst.insert(
206+
new_window_expr_var,
207+
egraph.add(LogicalPlanLanguage::WindowWindowExpr(new_window_expr_ids)),
208+
);
209+
return true;
210+
}
211+
}
212+
false
213+
}
214+
}
79215
}

0 commit comments

Comments
 (0)