feat(sql): Custom SQL udf are supported #244
Caution: Review failed
The pull request is closed.
Walkthrough
The changes introduce a new module for managing scalar, aggregate, and window user-defined functions (UDFs) within the SQL processing component of the project. Each UDF type has a global, thread-safe registry with functions to register new UDFs and an
Changes
Sequence Diagram(s)
sequenceDiagram
participant User
participant SqlProcessor
participant udf_mod
participant SessionContext
User->>SqlProcessor: create_session_context()
SqlProcessor->>udf_mod: init(&mut ctx)
udf_mod->>SessionContext: Register scalar UDFs
udf_mod->>SessionContext: Register aggregate UDFs
udf_mod->>SessionContext: Register window UDFs
SqlProcessor->>SessionContext: Register JSON functions
Actionable comments posted: 0
🧹 Nitpick comments (6)
crates/arkflow-plugin/src/processor/sql/scalar_udf.rs (3)
24-27: Consider handling potential lock poisoning.
While unwrap() is commonly used with locks in Rust, it can lead to panics if the lock is poisoned (which happens when a thread holding the lock panics). In a production system, it might be better to handle this case gracefully.

     pub fn register(udf: Arc<ScalarUDF>) {
    -    let mut udfs = SCALAR_UDFS.write().unwrap();
    +    let mut udfs = SCALAR_UDFS.write().expect("Failed to acquire write lock for SCALAR_UDFS");
         udfs.push(udf);
     }
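Note that `expect` still panics on a poisoned lock; it only changes the panic message. As a minimal, std-only sketch of two non-panicking alternatives: the names `Registry`, `register_checked`, and `register_recovering` are hypothetical, and `Arc<String>` stands in for `Arc<ScalarUDF>` so the example compiles without DataFusion.

```rust
use std::sync::{Arc, PoisonError, RwLock};

/// Hypothetical stand-in for the global UDF list; `String` replaces `ScalarUDF`.
type Registry = RwLock<Vec<Arc<String>>>;

/// Option A: report a poisoned lock to the caller as an error value.
fn register_checked(registry: &Registry, name: &str) -> Result<(), String> {
    let mut entries = registry
        .write()
        .map_err(|e| format!("registry lock poisoned: {e}"))?;
    entries.push(Arc::new(name.to_string()));
    Ok(())
}

/// Option B: take the guard out of the poison error and keep going.
/// Only reasonable when a panic mid-update cannot leave the data inconsistent,
/// which holds for a single `Vec::push`.
fn register_recovering(registry: &Registry, name: &str) {
    let mut entries = registry.write().unwrap_or_else(PoisonError::into_inner);
    entries.push(Arc::new(name.to_string()));
}
```

Option A changes the signature of `register`, so every caller has to handle the `Result`; option B keeps the signature but silently accepts state left behind by a panicked writer.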
29-40: Add explicit return statement for clarity.
The init function is missing an explicit return statement at the end. While Rust allows the last expression to be implicitly returned, adding a return statement would improve readability.

     pub(crate) fn init(registry: &mut dyn FunctionRegistry) -> Result<(), Error> {
         let udfs = SCALAR_UDFS.read().unwrap();
         udfs.iter()
             .try_for_each(|udf| {
                 let existing_udf = registry.register_udf(Arc::clone(&udf))?;
                 if let Some(existing_udf) = existing_udf {
                     debug!("Overwrite existing UDF: {}", existing_udf.name());
                 }
                 Ok(()) as datafusion::common::Result<()>
             })
    -        .map_err(|e| Error::Config(format!("Failed to register UDFs: {}", e)))
    +        .map_err(|e| Error::Config(format!("Failed to register UDFs: {}", e)))
     }

Consider handling the lock poisoning case here as well:

     pub(crate) fn init(registry: &mut dyn FunctionRegistry) -> Result<(), Error> {
    -    let udfs = SCALAR_UDFS.read().unwrap();
    +    let udfs = SCALAR_UDFS.read().expect("Failed to acquire read lock for SCALAR_UDFS");
         // Rest of the function remains the same
14-18: Consider adding documentation comments.
Since this is a new module with public functions, adding documentation comments (using ///) would help users understand how to use the module and its functions.

     use arkflow_core::Error;
     use datafusion::execution::FunctionRegistry;
     use datafusion::logical_expr::ScalarUDF;
     use std::sync::{Arc, RwLock};
     use tracing::log::debug;

    +/// Module for managing scalar user-defined functions (UDFs) for SQL processing.
    +///
    +/// This module provides functionality to register and initialize UDFs in a thread-safe manner.
    +/// UDFs are registered globally and then added to the SQL function registry during context initialization.
     lazy_static::lazy_static! {
         static ref SCALAR_UDFS: RwLock<Vec<Arc<ScalarUDF>>> = RwLock::new(Vec::new());
     }

    +/// Register a new scalar UDF.
    +///
    +/// This function adds a UDF to the global registry. The UDF will be available for use
    +/// in SQL queries after the next call to `init`.
    +///
    +/// # Arguments
    +///
    +/// * `udf` - The UDF to register, wrapped in an Arc for shared ownership.
     pub fn register(udf: Arc<ScalarUDF>) {
         // ...
     }

crates/arkflow-plugin/src/processor/sql/mod.rs (3)
122-129: Consider updating function documentation.
The function comment now mentions only JSON functions, but it's also initializing scalar UDFs.

    -/// Create a new session context with JSON functions registered
    +/// Create a new session context with scalar UDFs and JSON functions registered
     fn create_session_context() -> Result<SessionContext, Error> {
         let mut ctx = SessionContext::new();
         scalar_udf::init(&mut ctx)?;
         datafusion_functions_json::register_all(&mut ctx)
             .map_err(|e| Error::Process(format!("Registration JSON function failed: {}", e)))?;
         Ok(ctx)
     }
123-127: Consider handling errors consistently.
The error handling for JSON function registration uses Error::Process, while the scalar UDF initialization presumably uses Error::Config (based on the implementation in scalar_udf.rs). It might be better to handle both error types consistently.

     fn create_session_context() -> Result<SessionContext, Error> {
         let mut ctx = SessionContext::new();
         scalar_udf::init(&mut ctx)?;
         datafusion_functions_json::register_all(&mut ctx)
    -        .map_err(|e| Error::Process(format!("Registration JSON function failed: {}", e)))?;
    +        .map_err(|e| Error::Config(format!("Registration JSON function failed: {}", e)))?;
         Ok(ctx)
     }

Alternatively, you could change the error type in scalar_udf.rs to be consistent with this one.
167-256: Consider adding tests for scalar UDFs.
The file includes tests for basic SQL processor functionality, but there are no tests specifically for the new scalar UDF functionality. Adding tests would ensure that UDFs work as expected.
You could add a test like this:

    #[tokio::test]
    async fn test_sql_processor_with_scalar_udf() {
        // Register a test UDF
        let test_udf = create_test_udf();
        scalar_udf::register(Arc::new(test_udf));

        let processor = SqlProcessor::new(SqlProcessorConfig {
            query: "SELECT test_udf(id) FROM flow".to_string(),
            table_name: None,
        })
        .unwrap();

        let schema = Arc::new(Schema::new(vec![
            Field::new("id", DataType::Int64, false),
        ]));
        let batch = RecordBatch::try_new(
            schema,
            vec![
                Arc::new(Int64Array::from(vec![1, 2, 3])),
            ],
        )
        .unwrap();

        let result = processor
            .process(MessageBatch::new_arrow(batch))
            .await
            .unwrap();

        assert_eq!(result.len(), 1);
        // Add more assertions to verify the UDF behavior
    }

    fn create_test_udf() -> ScalarUDF {
        // Implementation to create a test UDF
        // ...
    }
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (2)
- crates/arkflow-plugin/src/processor/sql/mod.rs (2 hunks)
- crates/arkflow-plugin/src/processor/sql/scalar_udf.rs (1 hunk)
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/arkflow-plugin/src/processor/sql/mod.rs (1)
crates/arkflow-plugin/src/processor/sql/scalar_udf.rs (1)
init(29-40)
⏰ Context from checks skipped due to timeout of 90000ms (2)
- GitHub Check: build
- GitHub Check: build
🔇 Additional comments (3)
crates/arkflow-plugin/src/processor/sql/scalar_udf.rs (1)
20-22: Thread-safe global UDF registry is well designed.
The use of lazy_static with RwLock is a good approach for a thread-safe global registry of scalar UDFs. This implementation ensures that UDFs can be registered from anywhere in the application and will be properly synchronized.
crates/arkflow-plugin/src/processor/sql/mod.rs (2)
19-19: Module declaration for scalar UDF is properly placed.
The module declaration is correctly placed at the beginning of the file with other imports and module declarations.
125-125: UDF initialization is properly integrated in the workflow.
The initialization of scalar UDFs is correctly placed right after creating a new session context and before registering JSON functions. The error propagation with ? is consistent with the rest of the function.
Actionable comments posted: 0
🧹 Nitpick comments (4)
crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (4)
1-27: Remove duplicate license header.
The Apache 2.0 license header appears twice at the beginning of the file.

     /*
      *    Licensed under the Apache License, Version 2.0 (the "License");
      *    you may not use this file except in compliance with the License.
      *    You may obtain a copy of the License at
      *
      *        http://www.apache.org/licenses/LICENSE-2.0
      *
      *    Unless required by applicable law or agreed to in writing, software
      *    distributed under the License is distributed on an "AS IS" BASIS,
      *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
      *    See the License for the specific language governing permissions and
      *    limitations under the License.
      */
    -/*
    - *    Licensed under the Apache License, Version 2.0 (the "License");
    - *    you may not use this file except in compliance with the License.
    - *    You may obtain a copy of the License at
    - *
    - *        http://www.apache.org/licenses/LICENSE-2.0
    - *
    - *    Unless required by applicable law or agreed to in writing, software
    - *    distributed under the License is distributed on an "AS IS" BASIS,
    - *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    - *    See the License for the specific language governing permissions and
    - *    limitations under the License.
    - */
57-70: Improve error messages and add documentation for the init function.
The init function lacks documentation explaining its purpose and intended usage. Additionally, the error messages for lock acquisition failures could be more informative.

    +/// Initialize all registered scalar UDFs into the given function registry.
    +///
    +/// This function is meant for internal use by the SQL processor to register all UDFs
    +/// that have been added via the `register` function.
    +///
    +/// # Arguments
    +///
    +/// * `registry` - The function registry where UDFs will be registered
    +///
    +/// # Returns
    +///
    +/// A Result indicating success or failure of the registration process
     pub(crate) fn init(registry: &mut dyn FunctionRegistry) -> Result<(), Error> {
         let udfs = SCALAR_UDFS
             .read()
    -        .expect("Failed to acquire read lock for SCALAR_UDFS");
    +        .expect("Failed to acquire read lock for SCALAR_UDFS - this might indicate a poisoned lock due to a previous panic");
         udfs.iter()
             .try_for_each(|udf| {
                 let existing_udf = registry.register_udf(Arc::clone(&udf))?;
                 if let Some(existing_udf) = existing_udf {
                     debug!("Overwrite existing UDF: {}", existing_udf.name());
                 }
                 Ok(()) as datafusion::common::Result<()>
             })
    -        .map_err(|e| Error::Config(format!("Failed to register UDFs: {}", e)))
    +        .map_err(|e| Error::Config(format!("Failed to register scalar UDFs in function registry: {}", e)))
50-54: Consider adding an improved error message for write lock acquisition.
While the current implementation works, providing a more informative error message for lock acquisition failures would help with debugging.

     let mut udfs = SCALAR_UDFS
         .write()
    -    .expect("Failed to acquire write lock for SCALAR_UDFS");
    +    .expect("Failed to acquire write lock for SCALAR_UDFS - this might indicate a poisoned lock due to a previous panic");
63-67: Consider adding UDF name to debug log during registration.
To improve debugging, it would be helpful to log both the name of the UDF being registered and the existing UDF being overwritten.

     let existing_udf = registry.register_udf(Arc::clone(&udf))?;
     if let Some(existing_udf) = existing_udf {
    -    debug!("Overwrite existing UDF: {}", existing_udf.name());
    +    debug!("Overwriting existing UDF '{}' with new implementation", existing_udf.name());
    +} else {
    +    debug!("Registered new UDF: {}", udf.name());
     }
📜 Review details
📒 Files selected for processing (3)
- crates/arkflow-plugin/src/processor/sql.rs (2 hunks)
- crates/arkflow-plugin/src/processor/udf/mod.rs (1 hunk)
- crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (1 hunk)
🔇 Additional comments (6)
crates/arkflow-plugin/src/processor/udf/mod.rs (1)
14-14: LGTM! Proper module export for UDF functionality.
The addition of pub mod scalar_udf; correctly exposes the scalar UDF module as part of the public API. This change aligns with the PR objective to support custom SQL UDFs.
crates/arkflow-plugin/src/processor/sql.rs (2)
19-19: LGTM! Appropriate import added.
The import for scalar_udf is correctly added to bring the new module into scope for use in the SQL processor.
121-128: Good integration of UDF initialization.
The function now properly initializes scalar UDFs before registering JSON functions, and the docstring has been updated to reflect this addition. The error handling follows the existing pattern, maintaining consistency in the codebase.
crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (3)
34-40: LGTM! Good use of lazy_static for global UDF registry.
The use of RwLock with lazy_static provides a thread-safe global registry for scalar UDFs, which is appropriate for this use case. The module documentation clearly explains the purpose of this module.
42-55: LGTM! Well-documented public API for registering UDFs.
The register function has comprehensive documentation that clearly explains its purpose and usage. The implementation correctly acquires a write lock on the global registry before adding the new UDF.
28-33: Verify import usage with integration tests.
The imports look correct, but it would be beneficial to have integration tests to verify the UDF registration and initialization functionality works as expected.
Consider adding tests that:
- Register a custom UDF
- Initialize it in a session context
- Execute a SQL query that uses the UDF
Actionable comments posted: 0
🧹 Nitpick comments (6)
crates/arkflow-plugin/src/processor/mod.rs (1)
25-25: Consider adding documentation for the newly public udf module.
Making the udf module public exposes it as part of the crate's API. It would be beneficial to add documentation comments explaining the purpose and usage of this module, similar to the existing comment on line 15 for the processor component.

    -pub mod udf;
    +/// User-defined functions (UDFs) module for SQL processing
    +///
    +/// This module provides functionality to register and manage scalar, aggregate, and window UDFs.
    +pub mod udf;

crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (1)
21-24: Consider adding error handling instead of using expect.
The expect() call would cause the program to panic if the lock acquisition fails. In a library context, it's often better to handle this error gracefully by returning a Result instead of panicking.

    -pub fn register(udf: Arc<AggregateUDF>) {
    -    let mut udfs = UDFS.write().expect("Failed to acquire write lock for UDFS");
    -    udfs.push(udf);
    +pub fn register(udf: Arc<AggregateUDF>) -> Result<(), String> {
    +    let mut udfs = UDFS.write().map_err(|e| format!("Failed to acquire write lock for UDFS: {}", e))?;
    +    udfs.push(udf);
    +    Ok(())
     }

crates/arkflow-plugin/src/processor/udf/window_udf.rs (1)
21-24: Consider adding error handling instead of using expect.
Similar to the aggregate_udf.rs file, the expect() call could cause a panic. Consider returning a Result to handle lock acquisition failures gracefully.

    -pub fn register(udf: Arc<WindowUDF>) {
    -    let mut udfs = UDFS.write().expect("Failed to acquire write lock for UDFS");
    -    udfs.push(udf);
    +pub fn register(udf: Arc<WindowUDF>) -> Result<(), String> {
    +    let mut udfs = UDFS.write().map_err(|e| format!("Failed to acquire write lock for UDFS: {}", e))?;
    +    udfs.push(udf);
    +    Ok(())
     }

crates/arkflow-plugin/src/processor/udf/mod.rs (3)
52-65: There's a typo in the debug message for window UDFs.
The debug message refers to "windows UDF" but should be "window UDF" to maintain consistency with the naming elsewhere.

    -    debug!("Overwrite existing windows UDF: {}", existing_udf.name());
    +    debug!("Overwrite existing window UDF: {}", existing_udf.name());

Also, in the error message:

    -    .map_err(|e| Error::Config(format!("Failed to register windows UDFs: {}", e)))?;
    +    .map_err(|e| Error::Config(format!("Failed to register window UDFs: {}", e)))?;
23-66: Consider refactoring to reduce code duplication.
The registration code for scalar, aggregate, and window UDFs follows the same pattern with only slight differences. Consider refactoring using a generic function to avoid repetition.
Here's a possible implementation:

    pub(crate) fn init(registry: &mut dyn FunctionRegistry) -> Result<(), Error> {
        // Helper to register UDFs with uniform error handling. A nested fn cannot
        // capture `registry`, so it is passed in; `T` has no trait bound that
        // provides `name()`, so a name accessor is passed in as well.
        fn register_udfs<T>(
            registry: &mut dyn FunctionRegistry,
            udfs: &[Arc<T>],
            registry_fn: impl Fn(&mut dyn FunctionRegistry, Arc<T>) -> datafusion::common::Result<Option<Arc<T>>>,
            name_of: impl Fn(&T) -> &str,
            udf_type: &str,
        ) -> Result<(), Error> {
            udfs.iter()
                .try_for_each(|udf| {
                    let existing_udf = registry_fn(&mut *registry, Arc::clone(udf))?;
                    if let Some(existing_udf) = existing_udf {
                        debug!("Overwrite existing {} UDF: {}", udf_type, name_of(&existing_udf));
                    }
                    Ok(()) as datafusion::common::Result<()>
                })
                .map_err(|e| Error::Config(format!("Failed to register {} UDFs: {}", udf_type, e)))
        }

        // Register scalar UDFs
        let scalar_udfs = scalar_udf::UDFS
            .read()
            .expect("Failed to acquire read lock for scalar UDFS");
        register_udfs(registry, &scalar_udfs, |reg, udf| reg.register_udf(udf), |udf| udf.name(), "scalar")?;

        // Register aggregate UDFs
        let aggregate_udfs = aggregate_udf::UDFS
            .read()
            .expect("Failed to acquire read lock for aggregate UDFS");
        register_udfs(registry, &aggregate_udfs, |reg, udf| reg.register_udaf(udf), |udf| udf.name(), "aggregate")?;

        // Register window UDFs
        let window_udfs = window_udf::UDFS
            .read()
            .expect("Failed to acquire read lock for window UDFS");
        register_udfs(registry, &window_udfs, |reg, udf| reg.register_udwf(udf), |udf| udf.name(), "window")?;

        Ok(())
    }
24-36: Consider handling lock acquisition failures more gracefully.
All three UDF registrations use expect() when acquiring read locks, which will panic if lock acquisition fails. Consider using map_err to convert the lock error to your Error::Config type.
For example:

    -    let scalar_udfs = crate::processor::udf::scalar_udf::UDFS
    -        .read()
    -        .expect("Failed to acquire read lock for scalar UDFS");
    +    let scalar_udfs = crate::processor::udf::scalar_udf::UDFS
    +        .read()
    +        .map_err(|e| Error::Config(format!("Failed to acquire read lock for scalar UDFs: {}", e)))?;

Also applies to: 38-50, 52-64
📜 Review details
📒 Files selected for processing (7)
- crates/arkflow-plugin/src/processor/mod.rs (1 hunk)
- crates/arkflow-plugin/src/processor/sql.rs (2 hunks)
- crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (1 hunk)
- crates/arkflow-plugin/src/processor/udf/mod.rs (1 hunk)
- crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (1 hunk)
- crates/arkflow-plugin/src/processor/udf/window_udf.rs (1 hunk)
- crates/arkflow/src/main.rs (0 hunks)
💤 Files with no reviewable changes (1)
- crates/arkflow/src/main.rs
🚧 Files skipped from review as they are similar to previous changes (2)
- crates/arkflow-plugin/src/processor/sql.rs
- crates/arkflow-plugin/src/processor/udf/scalar_udf.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (2)
crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (1)
register(48-51)
crates/arkflow-plugin/src/processor/udf/window_udf.rs (1)
register(21-24)
🔇 Additional comments (5)
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (1)
14-19: The implementation looks good and follows the established pattern.
The implementation for aggregate UDF registration follows the same pattern as the scalar and window UDF modules. It properly uses thread-safe access via RwLock.
crates/arkflow-plugin/src/processor/udf/window_udf.rs (1)
14-19: The implementation looks good and follows the established pattern.
The implementation for window UDF registration follows the same pattern as the scalar and aggregate UDF modules with proper thread-safe access.
crates/arkflow-plugin/src/processor/udf/mod.rs (3)
14-21: LGTM! Proper module organization and imports.
The module organization into scalar, aggregate, and window UDF submodules is clean and logically structured. All necessary imports are present.
23-37: The scalar UDF registration implementation looks good.
The implementation correctly:
- Acquires a read lock on the scalar UDFs collection
- Iterates through each UDF and registers it with the provided registry
- Properly handles and logs overwrites of existing UDFs
- Maps DataFusion errors to the application's Error type
23-66: ❓ Verification inconclusive
Verify error handling for UDF registration.
The code correctly handles errors when registering UDFs, but I recommend verifying the behavior when multiple UDFs need to be registered and an error occurs with one of them. Does the current implementation stop at the first error or continue registering the remaining UDFs?
🏁 Script executed:
    #!/bin/bash
    # Check if there are any tests for UDF registration error handling
    rg -A 5 -B 5 "test.*register.*error|test.*udf.*error" --type rust

    # Check how errors are handled in other similar registration functions
    rg -A 5 "try_for_each.*map_err" --type rust

Length of output: 113
Confirm UDF registration error flow
I didn’t find any existing tests for UDF registration error cases, and by design try_for_each will stop on the first Err, so any subsequent UDFs won’t be registered. Please verify whether this early exit is intended or if you need to register all UDFs and surface all errors at once.
Action items:
- Add unit tests in crates/arkflow-plugin/src/processor/udf that simulate a failing UDF registration and confirm when/if the loop aborts.
- If you intend to register every UDF regardless of individual failures, consider replacing try_for_each with a loop that collects all errors and returns an aggregated result.
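The difference between the two strategies can be seen in a small std-only sketch. Everything here is hypothetical: `try_register` stands in for a call such as `registry.register_udf(...)`, and names starting with `!` are treated as failing registrations purely for illustration.

```rust
/// Hypothetical registration step: names starting with '!' fail.
fn try_register(name: &str, registered: &mut Vec<String>) -> Result<(), String> {
    if name.starts_with('!') {
        return Err(format!("cannot register {name}"));
    }
    registered.push(name.to_string());
    Ok(())
}

/// Fail-fast: `try_for_each` stops at the first `Err`, so later names are skipped.
fn register_fail_fast(names: &[&str]) -> (Vec<String>, Result<(), String>) {
    let mut registered = Vec::new();
    let result = names
        .iter()
        .try_for_each(|name| try_register(name, &mut registered));
    (registered, result)
}

/// Collect-all: every name is attempted and all failures are reported together.
fn register_collect_errors(names: &[&str]) -> (Vec<String>, Result<(), Vec<String>>) {
    let mut registered = Vec::new();
    let mut errors = Vec::new();
    for name in names {
        if let Err(e) = try_register(name, &mut registered) {
            errors.push(e);
        }
    }
    let result = if errors.is_empty() { Ok(()) } else { Err(errors) };
    (registered, result)
}
```

With the input `["my_sum", "!broken", "my_avg"]`, the fail-fast variant registers only `my_sum`, while the collecting variant also registers `my_avg` and returns the single failure in a list.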
26f9de6 to db2af20
Actionable comments posted: 1
🧹 Nitpick comments (7)
crates/arkflow-plugin/src/processor/udf/window_udf.rs (2)
24-27: Consider preventing duplicate registrations.
register blindly pushes every UDF into the global Vec, so calling it twice with the same name stores two identical entries.
At init-time the second entry silently overwrites the first in the FunctionRegistry, but the duplicate Arcs remain in memory.
If you don’t need multiple versions of the same window UDF, switching to a HashMap<String, Arc<WindowUDF>> keyed by udf.name() eliminates duplicates and reduces overhead.
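A name-keyed registry might look roughly like the following std-only sketch. `FakeUdf` and `UdfRegistry` are hypothetical names; `FakeUdf` stands in for `WindowUDF`, and the `version` field exists only to show which entry survives.

```rust
use std::collections::HashMap;
use std::sync::{Arc, RwLock};

/// Hypothetical stand-in for `WindowUDF`; only the name matters for deduplication.
#[derive(Debug)]
struct FakeUdf {
    name: String,
    version: u32,
}

/// Registry keyed by function name, so re-registering replaces the old entry.
#[derive(Default)]
struct UdfRegistry {
    by_name: RwLock<HashMap<String, Arc<FakeUdf>>>,
}

impl UdfRegistry {
    /// Stores `udf` and returns the entry it replaced, if any.
    fn register(&self, udf: FakeUdf) -> Option<Arc<FakeUdf>> {
        let mut by_name = self.by_name.write().expect("UDF registry lock poisoned");
        by_name.insert(udf.name.clone(), Arc::new(udf))
    }

    /// Number of distinct function names currently registered.
    fn len(&self) -> usize {
        self.by_name.read().expect("UDF registry lock poisoned").len()
    }
}
```

Returning the replaced entry lets the caller log the overwrite at registration time rather than discovering it later during `init`. One trade-off is that a `HashMap` does not preserve registration order, which only matters if the order of registration into the `FunctionRegistry` is significant.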
30-32: Minor: unnecessary fully-qualified path.
Inside the same module you can reference UDFS directly instead of crate::processor::udf::window_udf::UDFS; this shortens the line and avoids refactor-sensitive paths.

    -    let window_udfs = crate::processor::udf::window_udf::UDFS
    +    let window_udfs = UDFS

crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (5)
24-27: Consider returning Result instead of using expect
Using expect will cause a panic if the lock can't be acquired, which might be problematic in production environments. Consider returning a Result instead to allow callers to handle the error gracefully.

    -pub fn register(udf: AggregateUDF) {
    -    let mut udfs = UDFS.write().expect("Failed to acquire write lock for UDFS");
    -    udfs.push(Arc::new(udf));
    +pub fn register(udf: AggregateUDF) -> Result<(), Error> {
    +    let mut udfs = UDFS.write().map_err(|e| Error::Config(
    +        format!("Failed to acquire write lock for UDFS: {}", e)
    +    ))?;
    +    udfs.push(Arc::new(udf));
    +    Ok(())
     }
30-32: Simplify path to UDFS
The fully qualified path to UDFS is unnecessary since UDFS is defined in this module.

    -    let aggregate_udfs = crate::processor::udf::aggregate_udf::UDFS
    +    let aggregate_udfs = UDFS
             .read()
             .expect("Failed to acquire read lock for aggregate UDFS");
31-32: Consider returning Result instead of using expect
Similar to the register function, using expect here will panic if the lock can't be acquired. Consider propagating the error instead.

    -        .read()
    -        .expect("Failed to acquire read lock for aggregate UDFS");
    +        .read()
    +        .map_err(|e| Error::Config(
    +            format!("Failed to acquire read lock for aggregate UDFS: {}", e)
    +        ))?;
14-43: Add documentation comments
Consider adding documentation comments to explain the purpose and usage of the module and its functions. This will help other developers understand how to use this module correctly.

    +/// Module for managing aggregate User-Defined Functions (UDFs).
    +/// Provides thread-safe global storage and registration of aggregate UDFs.
     use arkflow_core::Error;
     use datafusion::execution::FunctionRegistry;
     use datafusion::logical_expr::AggregateUDF;
     use std::sync::{Arc, RwLock};
     use tracing::debug;

     lazy_static::lazy_static! {
    +    /// Thread-safe global registry for aggregate UDFs.
         static ref UDFS: RwLock<Vec<Arc<AggregateUDF>>> = RwLock::new(Vec::new());
     }

    +/// Registers an aggregate UDF to the global registry.
    +///
    +/// # Arguments
    +///
    +/// * `udf` - The aggregate UDF to register.
    +///
    +/// # Panics
    +///
    +/// Panics if it fails to acquire the write lock for the UDF registry.
     pub fn register(udf: AggregateUDF) {
         let mut udfs = UDFS.write().expect("Failed to acquire write lock for UDFS");
         udfs.push(Arc::new(udf));
     }

    +/// Initializes and registers all aggregate UDFs with the provided function registry.
    +///
    +/// # Arguments
    +///
    +/// * `registry` - The function registry to register UDFs with.
    +///
    +/// # Returns
    +///
    +/// A Result indicating success or containing an error if registration fails.
     pub(crate) fn init<T: FunctionRegistry>(registry: &mut T) -> Result<(), Error> {
34-42: Improve error context with specific UDF information
When registering fails, it would be helpful to include more context about which specific UDF failed to register.

         aggregate_udfs
             .iter()
    -        .try_for_each(|udf| {
    +        .try_fold((), |_, udf| {
                 let existing_udf = registry.register_udaf(Arc::clone(udf))?;
                 if let Some(existing_udf) = existing_udf {
                     debug!("Overwrite existing aggregate UDF: {}", existing_udf.name());
                 }
                 Ok(()) as datafusion::common::Result<()>
             })
    -        .map_err(|e| Error::Config(format!("Failed to register aggregate UDFs: {}", e)))
    +        .map_err(|e| Error::Config(format!("Failed to register aggregate UDFs: {}", e)))
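One way to attach the name of the failing UDF is to map the error inside the per-item closure, where the item is still in scope. The following std-only sketch illustrates the idea; `register_one` is a hypothetical stand-in for `registry.register_udaf(...)`, and its rule that names must not start with a digit is invented purely to produce a failure.

```rust
/// Hypothetical stand-in for `registry.register_udaf(...)`:
/// rejects empty names and names that start with a digit.
fn register_one(name: &str) -> Result<(), String> {
    match name.chars().next() {
        Some(c) if !c.is_ascii_digit() => Ok(()),
        _ => Err("name must start with a non-digit character".to_string()),
    }
}

/// Registers every name, stopping at the first failure and naming the culprit.
fn register_all(names: &[&str]) -> Result<(), String> {
    names.iter().enumerate().try_for_each(|(index, name)| {
        register_one(name).map_err(|e| {
            // The item is in scope here, so the message can identify it.
            format!("Failed to register aggregate UDF #{index} ('{name}'): {e}")
        })
    })
}
```

A single `map_err` after the iterator finishes, as in the original code, only sees the underlying error and can no longer tell which element produced it.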
📜 Review details
📒 Files selected for processing (4)
- crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (1 hunk)
- crates/arkflow-plugin/src/processor/udf/mod.rs (1 hunk)
- crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (1 hunk)
- crates/arkflow-plugin/src/processor/udf/window_udf.rs (1 hunk)
🚧 Files skipped from review as they are similar to previous changes (2)
- crates/arkflow-plugin/src/processor/udf/mod.rs
- crates/arkflow-plugin/src/processor/udf/scalar_udf.rs
🧰 Additional context used
🧬 Code Graph Analysis (2)
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (3)
crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (2)
register(47-50)
init(52-66)
crates/arkflow-plugin/src/processor/udf/window_udf.rs (2)
register(24-27)
init(29-43)
crates/arkflow-plugin/src/processor/udf/mod.rs (1)
init(26-31)
crates/arkflow-plugin/src/processor/udf/window_udf.rs (5)
crates/arkflow-plugin/src/processor/sql.rs (2)
new(52-64)
init(162-164)
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (2)
register(24-27)
init(29-43)
crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (2)
register(47-50)
init(52-66)
crates/arkflow-plugin/src/processor/udf/mod.rs (1)
init(26-31)
crates/arkflow-plugin/src/processor/mod.rs (1)
init(31-38)
🔇 Additional comments (1)
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (1)
1-44: Well-structured module for aggregate UDF managementThe overall structure of this module follows a good design pattern for managing aggregate UDFs, consistent with the scalar and window UDF implementations. The thread-safe global registry with RwLock provides a clean way to register and access UDFs.