Skip to content

Commit 6f82601

Browse files
committed
Add support for partitioned by in DDL
1 parent 75cb966 commit 6f82601

File tree

16 files changed, with 65 additions and 85 deletions.

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -166,7 +166,7 @@ recursive = "0.1.1"
166166
regex = "1.8"
167167
rstest = "0.25.0"
168168
serde_json = "1"
169-
sqlparser = { git = "https://github.com/ArroyoSystems/sqlparser-rs", branch = "0.55.0/arroyo", features = ["visitor"] }
169+
sqlparser = { git = "https://github.com/ArroyoSystems/sqlparser-rs", branch = "partitioned_by", features = ["visitor"] }
170170
tempfile = "3"
171171
tokio = { version = "1.45", features = ["macros", "rt", "sync"] }
172172
url = "2.5.4"

datafusion/datasource/src/source.rs

Lines changed: 4 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -17,16 +17,16 @@
1717

1818
//! [`DataSource`] and [`DataSourceExec`]
1919
20-
use std::any::Any;
21-
use std::fmt;
22-
use std::fmt::{Debug, Formatter};
23-
use std::sync::Arc;
2420
use datafusion_physical_plan::execution_plan::{Boundedness, EmissionType};
2521
use datafusion_physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet};
2622
use datafusion_physical_plan::projection::ProjectionExec;
2723
use datafusion_physical_plan::{
2824
DisplayAs, DisplayFormatType, ExecutionPlan, PlanProperties,
2925
};
26+
use std::any::Any;
27+
use std::fmt;
28+
use std::fmt::{Debug, Formatter};
29+
use std::sync::Arc;
3030

3131
use crate::file_scan_config::FileScanConfig;
3232
use datafusion_common::config::ConfigOptions;

datafusion/ffi/src/udaf/accumulator.rs

Lines changed: 2 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -44,8 +44,7 @@ pub struct FFI_Accumulator {
4444
) -> RResult<(), RString>,
4545

4646
// Evaluate and return a ScalarValues as protobuf bytes
47-
pub evaluate:
48-
unsafe extern "C" fn(accumulator: &Self) -> RResult<RVec<u8>, RString>,
47+
pub evaluate: unsafe extern "C" fn(accumulator: &Self) -> RResult<RVec<u8>, RString>,
4948

5049
pub size: unsafe extern "C" fn(accumulator: &Self) -> usize,
5150

@@ -256,8 +255,7 @@ impl Accumulator for ForeignAccumulator {
256255

257256
fn state(&self) -> Result<Vec<ScalarValue>> {
258257
unsafe {
259-
let state_protos =
260-
df_result!((self.accumulator.state)(&self.accumulator))?;
258+
let state_protos = df_result!((self.accumulator.state)(&self.accumulator))?;
261259

262260
state_protos
263261
.into_iter()

datafusion/functions-aggregate/src/median.rs

Lines changed: 3 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -250,11 +250,9 @@ impl<T: ArrowNumericType> Accumulator for MedianAccumulator<T> {
250250
OffsetBuffer::new(ScalarBuffer::from(vec![0, self.all_values.len() as i32]));
251251

252252
// Build inner array
253-
let values_array = PrimitiveArray::<T>::new(
254-
ScalarBuffer::from(self.all_values.clone()),
255-
None,
256-
)
257-
.with_data_type(self.data_type.clone());
253+
let values_array =
254+
PrimitiveArray::<T>::new(ScalarBuffer::from(self.all_values.clone()), None)
255+
.with_data_type(self.data_type.clone());
258256

259257
// Build the result list array
260258
let list_array = ListArray::new(

datafusion/functions-aggregate/src/sum.rs

Lines changed: 8 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -219,11 +219,14 @@ impl AggregateUDFImpl for Sum {
219219
.into()];
220220

221221
if args.for_sliding {
222-
fields.push(Field::new(
223-
format_state_name(args.name, "count"),
224-
DataType::UInt64,
225-
true,
226-
).into())
222+
fields.push(
223+
Field::new(
224+
format_state_name(args.name, "count"),
225+
DataType::UInt64,
226+
true,
227+
)
228+
.into(),
229+
)
227230
}
228231
Ok(fields)
229232
}

datafusion/physical-expr-common/src/binary_map.rs

Lines changed: 2 additions & 3 deletions
Original file line number | Diff line number | Diff line change
@@ -555,8 +555,8 @@ where
555555
});
556556
// SAFETY: the offsets were constructed correctly in `insert_if_new` --
557557
// monotonically increasing, overflows were checked.
558-
let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets.clone())) };
559-
558+
let offsets =
559+
unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets.clone())) };
560560

561561
let values = buffer.as_slice().iter().map(|b| *b).collect();
562562

@@ -582,7 +582,6 @@ where
582582
}
583583
}
584584

585-
586585
/// Total number of entries (including null, if present)
587586
pub fn len(&self) -> usize {
588587
self.non_null_len() + self.null.map(|_| 1).unwrap_or(0)

datafusion/physical-plan/src/aggregates/mod.rs

Lines changed: 6 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -130,7 +130,9 @@ impl AggregateMode {
130130
AggregateMode::Partial
131131
| AggregateMode::Single
132132
| AggregateMode::SinglePartitioned => true,
133-
AggregateMode::Final | AggregateMode::FinalPartitioned | AggregateMode::CombinePartial => false,
133+
AggregateMode::Final
134+
| AggregateMode::FinalPartitioned
135+
| AggregateMode::CombinePartial => false,
134136
}
135137
}
136138
}
@@ -1254,7 +1256,9 @@ pub fn aggregate_expressions(
12541256
})
12551257
.collect()),
12561258
// In this mode, we build the merge expressions of the aggregation.
1257-
AggregateMode::Final | AggregateMode::FinalPartitioned | AggregateMode::CombinePartial => {
1259+
AggregateMode::Final
1260+
| AggregateMode::FinalPartitioned
1261+
| AggregateMode::CombinePartial => {
12581262
let mut col_idx_base = col_idx_base;
12591263
aggr_expr
12601264
.iter()

datafusion/physical-plan/src/aggregates/no_grouping.rs

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -234,7 +234,9 @@ fn aggregate_batch(
234234
AggregateMode::Partial
235235
| AggregateMode::Single
236236
| AggregateMode::SinglePartitioned => accum.update_batch(&values),
237-
AggregateMode::Final | AggregateMode::FinalPartitioned | AggregateMode::CombinePartial => accum.merge_batch(&values),
237+
AggregateMode::Final
238+
| AggregateMode::FinalPartitioned
239+
| AggregateMode::CombinePartial => accum.merge_batch(&values),
238240
};
239241
let size_post = accum.size();
240242
allocated += size_post.saturating_sub(size_pre);

datafusion/physical-plan/src/aggregates/row_hash.rs

Lines changed: 3 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -955,8 +955,9 @@ impl GroupedHashAggregateStream {
955955
// Next output each aggregate value
956956
for acc in self.accumulators.iter_mut() {
957957
match self.mode {
958-
AggregateMode::Partial
959-
| AggregateMode::CombinePartial => output.extend(acc.state(emit_to)?),
958+
AggregateMode::Partial | AggregateMode::CombinePartial => {
959+
output.extend(acc.state(emit_to)?)
960+
}
960961
_ if spilling => {
961962
// If spilling, output partial state because the spilled data will be
962963
// merged and re-evaluated later.

0 commit comments

Comments
 (0)