Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion crates/arkflow-plugin/src/processor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ pub mod batch;
pub mod json;
pub mod protobuf;
pub mod sql;
mod udf;
pub mod udf;

lazy_static::lazy_static! {
static ref INITIALIZED: OnceLock<()> = OnceLock::new();
Expand Down
4 changes: 3 additions & 1 deletion crates/arkflow-plugin/src/processor/sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
//!
//! DataFusion is used to process data with SQL queries.

use crate::processor::udf;
use arkflow_core::processor::{register_processor_builder, Processor, ProcessorBuilder};
use arkflow_core::{Error, MessageBatch};
use async_trait::async_trait;
Expand Down Expand Up @@ -117,9 +118,10 @@ impl SqlProcessor {
ctx.execute_logical_plan(plan).await
}

/// Create a new session context with JSON functions registered
/// Create a new session context with UDFs and JSON functions registered
fn create_session_context() -> Result<SessionContext, Error> {
let mut ctx = SessionContext::new();
udf::init(&mut ctx)?;
datafusion_functions_json::register_all(&mut ctx)
.map_err(|e| Error::Process(format!("Registration JSON function failed: {}", e)))?;
Ok(ctx)
Expand Down
50 changes: 50 additions & 0 deletions crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use arkflow_core::Error;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::AggregateUDF;
use std::sync::{Arc, RwLock};
use tracing::debug;

lazy_static::lazy_static! {
static ref UDFS: RwLock<Vec<Arc<AggregateUDF>>> = RwLock::new(Vec::new());
}

/// Register a new aggregate UDF (User Defined Function).
///
/// This function wraps the provided AggregateUDF instance in an Arc and stores it in the global UDFS list,
/// so it can later be registered with the FunctionRegistry.
///
/// # Arguments
/// * `udf` - The AggregateUDF instance to register.
pub fn register(udf: AggregateUDF) {
let mut udfs = UDFS.write().expect("Failed to acquire write lock for UDFS");
udfs.push(Arc::new(udf));
}

pub(crate) fn init<T: FunctionRegistry>(registry: &mut T) -> Result<(), Error> {
let aggregate_udfs = UDFS
.read()
.expect("Failed to acquire read lock for aggregate UDFS");
aggregate_udfs
.iter()
.try_for_each(|udf| {
let existing_udf = registry.register_udaf(Arc::clone(udf))?;
if let Some(existing_udf) = existing_udf {
debug!("Overwrite existing aggregate UDF: {}", existing_udf.name());
}
Ok(()) as datafusion::common::Result<()>
})
.map_err(|e| Error::Config(format!("Failed to register aggregate UDFs: {}", e)))
}
28 changes: 28 additions & 0 deletions crates/arkflow-plugin/src/processor/udf/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,32 @@
* limitations under the License.
*/

/// Module for managing scalar user-defined functions (UDFs) for SQL processing.
///
/// This module provides functionality to register and initialize UDFs in a thread-safe manner.
/// UDFs are registered globally and then added to the SQL function registry during context initialization.
use arkflow_core::Error;
use datafusion::execution::FunctionRegistry;

pub mod aggregate_udf;
pub mod scalar_udf;
pub mod window_udf;

/// Initializes and registers all user-defined functions (UDFs).
///
/// This function calls the `init` function of each UDF module (aggregate, scalar, window)
/// to register their respective functions with the provided `FunctionRegistry`.
///
/// # Arguments
///
/// * `registry` - A mutable reference to a type implementing `FunctionRegistry` where the UDFs will be registered.
///
/// # Errors
///
/// Returns an `Error` if any of the underlying `init` calls fail during registration.
pub(crate) fn init<T: FunctionRegistry>(registry: &mut T) -> Result<(), Error> {
aggregate_udf::init(registry)?;
scalar_udf::init(registry)?;
window_udf::init(registry)?;
Ok(())
}
51 changes: 51 additions & 0 deletions crates/arkflow-plugin/src/processor/udf/scalar_udf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use arkflow_core::Error;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::ScalarUDF;
use std::sync::{Arc, RwLock};
use tracing::debug;

lazy_static::lazy_static! {
static ref UDFS: RwLock<Vec<Arc<ScalarUDF>>> = RwLock::new(Vec::new());
}

/// Register a new scalar UDF.
///
/// This function adds a UDF to the global registry. The UDF will be available for use
/// in SQL queries after the next call to `init`.
///
/// # Arguments
///
/// * `udf` - The UDF to register, wrapped in an Arc for shared ownership.
pub fn register(udf: ScalarUDF) {
let mut udfs = UDFS.write().expect("Failed to acquire write lock for UDFS");
udfs.push(Arc::new(udf));
}

pub(crate) fn init<T: FunctionRegistry>(registry: &mut T) -> Result<(), Error> {
let scalar_udfs = UDFS
.read()
.expect("Failed to acquire read lock for scalar UDFS");
scalar_udfs
.iter()
.try_for_each(|udf| {
let existing_udf = registry.register_udf(Arc::clone(udf))?;
if let Some(existing_udf) = existing_udf {
debug!("Overwrite existing scalar UDF: {}", existing_udf.name());
}
Ok(()) as datafusion::common::Result<()>
})
.map_err(|e| Error::Config(format!("Failed to register scalar UDFs: {}", e)))
}
50 changes: 50 additions & 0 deletions crates/arkflow-plugin/src/processor/udf/window_udf.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
use arkflow_core::Error;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::WindowUDF;
use std::sync::{Arc, RwLock};
use tracing::debug;

lazy_static::lazy_static! {
static ref UDFS: RwLock<Vec<Arc<WindowUDF>>> = RwLock::new(Vec::new());
}

/// Register a new window UDF (User Defined Function).
///
/// This function wraps the provided WindowUDF instance in an Arc and stores it in the global UDFS list,
/// so it can later be registered with the FunctionRegistry.
///
/// # Arguments
/// * `udf` - The WindowUDF instance to register.
pub fn register(udf: WindowUDF) {
let mut udfs = UDFS.write().expect("Failed to acquire write lock for UDFS");
udfs.push(Arc::new(udf));
}

pub(crate) fn init<T: FunctionRegistry>(registry: &mut T) -> Result<(), Error> {
let window_udfs = UDFS
.read()
.expect("Failed to acquire read lock for window UDFS");
window_udfs
.iter()
.try_for_each(|udf| {
let existing_udf = registry.register_udwf(Arc::clone(udf))?;
if let Some(existing_udf) = existing_udf {
debug!("Overwrite existing window UDF: {}", existing_udf.name());
}
Ok(()) as datafusion::common::Result<()>
})
.map_err(|e| Error::Config(format!("Failed to register window UDFs: {}", e)))
}
1 change: 0 additions & 1 deletion crates/arkflow/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
output::init();
processor::init();
buffer::init();

let mut cli = Cli::default();
cli.parse()?;
cli.run().await
Expand Down