Skip to content

Commit b586e59

Browse files
committed
Handle new Arroyo syntax forms
1 parent 7551d50 commit b586e59

File tree

14 files changed

+91
-30
lines changed

14 files changed

+91
-30
lines changed

Cargo.lock

Lines changed: 2 additions & 4 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -166,11 +166,12 @@ recursive = "0.1.1"
166166
regex = "1.8"
167167
rstest = "0.25.0"
168168
serde_json = "1"
169-
sqlparser = { version = "0.55.0", features = ["visitor"] }
169+
sqlparser = { git = "https://github.com/ArroyoSystems/sqlparser-rs", branch = "0.55.0/arroyo", features = ["visitor"] }
170170
tempfile = "3"
171171
tokio = { version = "1.45", features = ["macros", "rt", "sync"] }
172172
url = "2.5.4"
173173

174+
174175
[profile.release]
175176
codegen-units = 1
176177
lto = true

datafusion-examples/examples/advanced_udaf.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ impl Accumulator for GeometricMean {
134134
// This function serializes our state to `ScalarValue`, which DataFusion uses
135135
// to pass this state between execution stages.
136136
// Note that this can be arbitrary data.
137-
fn state(&mut self) -> Result<Vec<ScalarValue>> {
137+
fn state(&self) -> Result<Vec<ScalarValue>> {
138138
Ok(vec![
139139
ScalarValue::from(self.prod),
140140
ScalarValue::from(self.n),
@@ -143,7 +143,7 @@ impl Accumulator for GeometricMean {
143143

144144
// DataFusion expects this function to return the final value of this aggregator.
145145
// in this case, this is the formula of the geometric mean
146-
fn evaluate(&mut self) -> Result<ScalarValue> {
146+
fn evaluate(&self) -> Result<ScalarValue> {
147147
let value = self.prod.powf(1.0 / self.n as f64);
148148
Ok(ScalarValue::from(value))
149149
}

datafusion-examples/examples/simple_udaf.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ impl Accumulator for GeometricMean {
7171
// This function serializes our state to `ScalarValue`, which DataFusion uses
7272
// to pass this state between execution stages.
7373
// Note that this can be arbitrary data.
74-
fn state(&mut self) -> Result<Vec<ScalarValue>> {
74+
fn state(&self) -> Result<Vec<ScalarValue>> {
7575
Ok(vec![
7676
ScalarValue::from(self.prod),
7777
ScalarValue::from(self.n),
@@ -80,7 +80,7 @@ impl Accumulator for GeometricMean {
8080

8181
// DataFusion expects this function to return the final value of this aggregator.
8282
// in this case, this is the formula of the geometric mean
83-
fn evaluate(&mut self) -> Result<ScalarValue> {
83+
fn evaluate(&self) -> Result<ScalarValue> {
8484
let value = self.prod.powf(1.0 / self.n as f64);
8585
Ok(ScalarValue::from(value))
8686
}

datafusion/core/tests/user_defined/user_defined_aggregates.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ impl TimeSum {
596596
}
597597

598598
impl Accumulator for TimeSum {
599-
fn state(&mut self) -> Result<Vec<ScalarValue>> {
599+
fn state(&self) -> Result<Vec<ScalarValue>> {
600600
Ok(vec![self.evaluate()?])
601601
}
602602

@@ -617,7 +617,7 @@ impl Accumulator for TimeSum {
617617
self.update_batch(states)
618618
}
619619

620-
fn evaluate(&mut self) -> Result<ScalarValue> {
620+
fn evaluate(&self) -> Result<ScalarValue> {
621621
Ok(ScalarValue::TimestampNanosecond(Some(self.sum), None))
622622
}
623623

@@ -740,12 +740,12 @@ impl FirstSelector {
740740
}
741741

742742
impl Accumulator for FirstSelector {
743-
fn state(&mut self) -> Result<Vec<ScalarValue>> {
743+
fn state(&self) -> Result<Vec<ScalarValue>> {
744744
self.evaluate().map(|s| vec![s])
745745
}
746746

747747
/// produce the output structure
748-
fn evaluate(&mut self) -> Result<ScalarValue> {
748+
fn evaluate(&self) -> Result<ScalarValue> {
749749
self.to_state()
750750
}
751751

@@ -838,15 +838,15 @@ impl Accumulator for TestGroupsAccumulator {
838838
Ok(())
839839
}
840840

841-
fn evaluate(&mut self) -> Result<ScalarValue> {
841+
fn evaluate(&self) -> Result<ScalarValue> {
842842
Ok(ScalarValue::from(self.result))
843843
}
844844

845845
fn size(&self) -> usize {
846846
size_of::<u64>()
847847
}
848848

849-
fn state(&mut self) -> Result<Vec<ScalarValue>> {
849+
fn state(&self) -> Result<Vec<ScalarValue>> {
850850
Ok(vec![ScalarValue::from(self.result)])
851851
}
852852

@@ -977,7 +977,7 @@ impl Accumulator for MetadataBasedAccumulator {
977977
Ok(())
978978
}
979979

980-
fn evaluate(&mut self) -> Result<ScalarValue> {
980+
fn evaluate(&self) -> Result<ScalarValue> {
981981
let v = match self.double_output {
982982
true => self.curr_sum * 2,
983983
false => self.curr_sum,
@@ -990,7 +990,7 @@ impl Accumulator for MetadataBasedAccumulator {
990990
9
991991
}
992992

993-
fn state(&mut self) -> Result<Vec<ScalarValue>> {
993+
fn state(&self) -> Result<Vec<ScalarValue>> {
994994
Ok(vec![ScalarValue::from(self.curr_sum)])
995995
}
996996

datafusion/core/tests/user_defined/user_defined_scalar_functions.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -345,7 +345,7 @@ async fn udaf_as_window_func() -> Result<()> {
345345
struct MyAccumulator;
346346

347347
impl Accumulator for MyAccumulator {
348-
fn state(&mut self) -> Result<Vec<ScalarValue>> {
348+
fn state(&self) -> Result<Vec<ScalarValue>> {
349349
unimplemented!()
350350
}
351351

@@ -357,7 +357,7 @@ async fn udaf_as_window_func() -> Result<()> {
357357
unimplemented!()
358358
}
359359

360-
fn evaluate(&mut self) -> Result<ScalarValue> {
360+
fn evaluate(&self) -> Result<ScalarValue> {
361361
unimplemented!()
362362
}
363363

datafusion/functions-aggregate-common/src/aggregate/count_distinct/bytes.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,8 @@ impl<O: OffsetSizeTrait> Accumulator for BytesDistinctCountAccumulator<O> {
5252
}
5353

5454
fn state(&self) -> datafusion_common::Result<Vec<ScalarValue>> {
55-
exec_err!("immutable state not supported for BytesDistinctCount")
55+
let arr = self.0.as_state();
56+
Ok(vec![SingleRowListArrayBuilder::new(arr).build_list_scalar()])
5657
}
5758

5859
fn update_batch(&mut self, values: &[ArrayRef]) -> datafusion_common::Result<()> {

datafusion/functions-aggregate/src/array_agg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1076,7 +1076,7 @@ mod tests {
10761076

10771077
fn merge(
10781078
mut acc1: Box<dyn Accumulator>,
1079-
mut acc2: Box<dyn Accumulator>,
1079+
acc2: Box<dyn Accumulator>,
10801080
) -> Result<Box<dyn Accumulator>> {
10811081
let intermediate_state = acc2.state().and_then(|e| {
10821082
e.iter()

datafusion/functions-aggregate/src/string_agg.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -499,7 +499,7 @@ mod tests {
499499

500500
fn merge(
501501
mut acc1: Box<dyn Accumulator>,
502-
mut acc2: Box<dyn Accumulator>,
502+
acc2: Box<dyn Accumulator>,
503503
) -> Result<Box<dyn Accumulator>> {
504504
let intermediate_state = acc2.state().and_then(|e| {
505505
e.iter()

datafusion/physical-expr-common/src/binary_map.rs

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -79,6 +79,10 @@ impl<O: OffsetSizeTrait> ArrowBytesSet<O> {
7979
self.0.into_state()
8080
}
8181

82+
pub fn as_state(&self) -> ArrayRef {
83+
self.0.as_state()
84+
}
85+
8286
/// Returns the total number of distinct values (including nulls) seen so far
8387
pub fn len(&self) -> usize {
8488
self.0.len()
@@ -526,6 +530,59 @@ where
526530
}
527531
}
528532

533+
/// Returns the contents of this set as a `StringArray`, `LargeStringArray`,
534+
/// `BinaryArray`, or `LargeBinaryArray` containing each distinct value
535+
/// that was inserted. The offsets and value bytes are copied, so the set itself is left untouched.
536+
///
537+
/// The values are guaranteed to be returned in the same order in which
538+
/// they were first seen.
539+
pub fn as_state(&self) -> ArrayRef {
540+
let Self {
541+
output_type,
542+
map: _,
543+
map_size: _,
544+
offsets,
545+
buffer,
546+
random_state: _,
547+
hashes_buffer: _,
548+
null,
549+
} = self;
550+
551+
// Only make a `NullBuffer` if there was a null value
552+
let nulls = null.map(|(_payload, null_index)| {
553+
let num_values = offsets.len() - 1;
554+
single_null_buffer(num_values, null_index)
555+
});
556+
// SAFETY: the offsets were constructed correctly in `insert_if_new` --
557+
// monotonically increasing, overflows were checked.
558+
let offsets = unsafe { OffsetBuffer::new_unchecked(ScalarBuffer::from(offsets.clone())) };
559+
560+
561+
let values = buffer.as_slice().iter().map(|b| *b).collect();
562+
563+
match output_type {
564+
OutputType::Binary => {
565+
// SAFETY: the offsets were constructed correctly
566+
Arc::new(unsafe {
567+
GenericBinaryArray::new_unchecked(offsets, values, nulls)
568+
})
569+
}
570+
OutputType::Utf8 => {
571+
// SAFETY:
572+
// 1. the offsets were constructed safely
573+
//
574+
// 2. we asserted the input arrays were all the correct type and
575+
// thus since all the values that went in were valid (e.g. utf8)
576+
// so are all the values that come out
577+
Arc::new(unsafe {
578+
GenericStringArray::new_unchecked(offsets, values, nulls)
579+
})
580+
}
581+
_ => unreachable!("View types should use `ArrowBytesViewMap`"),
582+
}
583+
}
584+
585+
529586
/// Total number of entries (including null, if present)
530587
pub fn len(&self) -> usize {
531588
self.non_null_len() + self.null.map(|_| 1).unwrap_or(0)

0 commit comments

Comments
 (0)