Skip to content

Conversation

@chenquan
Copy link
Collaborator

@chenquan chenquan commented Apr 18, 2025

Summary by CodeRabbit

  • New Features
    • Added support for registering and managing custom scalar, aggregate, and window user-defined functions (UDFs) for SQL processing.
  • Documentation
    • Updated documentation to clarify that both UDFs and JSON functions are registered in the SQL session context.
  • Chores
    • Introduced new modules for scalar, aggregate, and window UDFs and updated module declarations for improved organization.

@chenquan chenquan marked this pull request as draft April 18, 2025 04:21
@coderabbitai
Copy link
Contributor

coderabbitai bot commented Apr 18, 2025

Caution

Review failed

The pull request is closed.

Walkthrough

The changes introduce a new module for managing scalar, aggregate, and window user-defined functions (UDFs) within the SQL processing component of the project. Each UDF type has a global, thread-safe registry with functions to register new UDFs and an init function to register all UDFs into a function registry with error handling and logging. The SqlProcessor::create_session_context function is updated to initialize these UDFs in addition to existing JSON functions. The udf module visibility is changed to public, and new submodules for each UDF type are added.

Changes

File(s) Change Summary
crates/arkflow-plugin/src/processor/sql.rs Imports udf module and updates SqlProcessor::create_session_context to initialize all UDF types before registering JSON functions. Doc comments updated.
crates/arkflow-plugin/src/processor/mod.rs Changed udf module visibility from private to public.
crates/arkflow-plugin/src/processor/udf/mod.rs Added public submodules: scalar_udf, aggregate_udf, and window_udf. Added generic init function delegating to submodules.
crates/arkflow-plugin/src/processor/udf/scalar_udf.rs New file: Defines global thread-safe registry for scalar UDFs with register and init functions including error handling and logging.
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs New file: Defines global thread-safe registry for aggregate UDFs with register and init functions including error handling and logging.
crates/arkflow-plugin/src/processor/udf/window_udf.rs New file: Defines global thread-safe registry for window UDFs with register and init functions including error handling and logging.
crates/arkflow/src/main.rs Removed a trailing blank line after buffer::init() call; no functional changes.

Sequence Diagram(s)

sequenceDiagram
    participant User
    participant SqlProcessor
    participant udf_mod
    participant SessionContext

    User->>SqlProcessor: create_session_context()
    SqlProcessor->>udf_mod: init(&mut ctx)
    udf_mod->>SessionContext: Register scalar UDFs
    udf_mod->>SessionContext: Register aggregate UDFs
    udf_mod->>SessionContext: Register window UDFs
    SqlProcessor->>SessionContext: Register JSON functions
Loading

Possibly related PRs

Poem

🐇 In code’s quiet burrow, functions awake,
Scalars, aggregates, windows all partake.
Locked and loaded, ready to run,
UDFs join the SQL fun.
Context enriched, the queries leap—
A rabbit’s joy, in code so deep! 🌱✨


📜 Recent review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 3be1ee9 and ed4c51d.

📒 Files selected for processing (2)
  • crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/udf/window_udf.rs (1 hunks)

🪧 Tips

Chat

There are 3 ways to chat with CodeRabbit:

  • Review comments: Directly reply to a review comment made by CodeRabbit. Example:
    • I pushed a fix in commit <commit_id>, please review it.
    • Generate unit testing code for this file.
    • Open a follow-up GitHub issue for this discussion.
  • Files and specific lines of code (under the "Files changed" tab): Tag @coderabbitai in a new review comment at the desired location with your query. Examples:
    • @coderabbitai generate unit testing code for this file.
    • @coderabbitai modularize this function.
  • PR comments: Tag @coderabbitai in a new PR comment to ask questions about the PR branch. For the best results, please provide a very specific query, as very limited context is provided in this mode. Examples:
    • @coderabbitai gather interesting stats about this repository and render them as a table. Additionally, render a pie chart showing the language distribution in the codebase.
    • @coderabbitai read src/utils.ts and generate unit testing code.
    • @coderabbitai read the files in the src/scheduler package and generate a class diagram using mermaid and a README in the markdown format.
    • @coderabbitai help me debug CodeRabbit configuration file.

Note: Be mindful of the bot's finite context window. It's strongly recommended to break down tasks such as reading entire modules into smaller chunks. For a focused discussion, use review comments to chat about specific files and their changes, instead of using the PR comments.

CodeRabbit Commands (Invoked using PR comments)

  • @coderabbitai pause to pause the reviews on a PR.
  • @coderabbitai resume to resume the paused reviews.
  • @coderabbitai review to trigger an incremental review. This is useful when automatic reviews are disabled for the repository.
  • @coderabbitai full review to do a full review from scratch and review all the files again.
  • @coderabbitai summary to regenerate the summary of the PR.
  • @coderabbitai generate docstrings to generate docstrings for this PR.
  • @coderabbitai resolve resolve all the CodeRabbit review comments.
  • @coderabbitai configuration to show the current CodeRabbit configuration for the repository.
  • @coderabbitai help to get help.

Other keywords and placeholders

  • Add @coderabbitai ignore anywhere in the PR description to prevent this PR from being reviewed.
  • Add @coderabbitai summary to generate the high-level summary at a specific location in the PR description.
  • Add @coderabbitai anywhere in the PR title to generate the title automatically.

CodeRabbit Configuration File (.coderabbit.yaml)

  • You can programmatically configure CodeRabbit by adding a .coderabbit.yaml file to the root of your repository.
  • Please see the configuration documentation for more information.
  • If your editor has YAML language server enabled, you can add the path at the top of this file to enable auto-completion and validation: # yaml-language-server: $schema=https://coderabbit.ai/integrations/schema.v2.json

Documentation and Community

  • Visit our Documentation for detailed information on how to use CodeRabbit.
  • Join our Discord Community to get help, request features, and share feedback.
  • Follow us on X/Twitter for updates and announcements.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (6)
crates/arkflow-plugin/src/processor/sql/scalar_udf.rs (3)

24-27: Consider handling potential lock poisoning.

While unwrap() is commonly used with locks in Rust, it can lead to panics if the lock is poisoned (which happens when a thread holding the lock panics). In a production system, it might be better to handle this case gracefully.

pub fn register(udf: Arc<ScalarUDF>) {
-    let mut udfs = SCALAR_UDFS.write().unwrap();
+    let mut udfs = SCALAR_UDFS.write().expect("Failed to acquire write lock for SCALAR_UDFS");
    udfs.push(udf);
}

29-40: Add explicit return statement for clarity.

The init function is missing an explicit return statement at the end. While Rust allows the last expression to be implicitly returned, adding a return statement would improve readability.

pub(crate) fn init(registry: &mut dyn FunctionRegistry) -> Result<(), Error> {
    let udfs = SCALAR_UDFS.read().unwrap();
    udfs.iter()
        .try_for_each(|udf| {
            let existing_udf = registry.register_udf(Arc::clone(&udf))?;
            if let Some(existing_udf) = existing_udf {
                debug!("Overwrite existing UDF: {}", existing_udf.name());
            }
            Ok(()) as datafusion::common::Result<()>
        })
-        .map_err(|e| Error::Config(format!("Failed to register UDFs: {}", e)))
+        .map_err(|e| Error::Config(format!("Failed to register UDFs: {}", e)))
}

Consider handling the lock poisoning case here as well:

pub(crate) fn init(registry: &mut dyn FunctionRegistry) -> Result<(), Error> {
-    let udfs = SCALAR_UDFS.read().unwrap();
+    let udfs = SCALAR_UDFS.read().expect("Failed to acquire read lock for SCALAR_UDFS");
    // Rest of the function remains the same

14-18: Consider adding documentation comments.

Since this is a new module with public functions, adding documentation comments (using ///) would help users understand how to use the module and its functions.

use arkflow_core::Error;
use datafusion::execution::FunctionRegistry;
use datafusion::logical_expr::ScalarUDF;
use std::sync::{Arc, RwLock};
use tracing::log::debug;

+/// Module for managing scalar user-defined functions (UDFs) for SQL processing.
+/// 
+/// This module provides functionality to register and initialize UDFs in a thread-safe manner.
+/// UDFs are registered globally and then added to the SQL function registry during context initialization.

lazy_static::lazy_static! {
    static ref SCALAR_UDFS: RwLock<Vec<Arc<ScalarUDF>>> = RwLock::new(Vec::new());
}

+/// Register a new scalar UDF.
+///
+/// This function adds a UDF to the global registry. The UDF will be available for use
+/// in SQL queries after the next call to `init`.
+///
+/// # Arguments
+///
+/// * `udf` - The UDF to register, wrapped in an Arc for shared ownership.
pub fn register(udf: Arc<ScalarUDF>) {
    // ...
}
crates/arkflow-plugin/src/processor/sql/mod.rs (3)

122-129: Consider updating function documentation.

The function comment now mentions only JSON functions, but it's also initializing scalar UDFs.

-    /// Create a new session context with JSON functions registered
+    /// Create a new session context with scalar UDFs and JSON functions registered
    fn create_session_context() -> Result<SessionContext, Error> {
        let mut ctx = SessionContext::new();
        scalar_udf::init(&mut ctx)?;
        datafusion_functions_json::register_all(&mut ctx)
            .map_err(|e| Error::Process(format!("Registration JSON function failed: {}", e)))?;
        Ok(ctx)
    }

123-127: Consider handling errors consistently.

The error handling for JSON function registration uses Error::Process, while the scalar UDF initialization presumably uses Error::Config (based on the implementation in scalar_udf.rs). It might be better to handle both error types consistently.

    fn create_session_context() -> Result<SessionContext, Error> {
        let mut ctx = SessionContext::new();
        scalar_udf::init(&mut ctx)?;
        datafusion_functions_json::register_all(&mut ctx)
-            .map_err(|e| Error::Process(format!("Registration JSON function failed: {}", e)))?;
+            .map_err(|e| Error::Config(format!("Registration JSON function failed: {}", e)))?;
        Ok(ctx)
    }

Alternatively, you could change the error type in scalar_udf.rs to be consistent with this one.


167-256: Consider adding tests for scalar UDFs.

The file includes tests for basic SQL processor functionality, but there are no tests specifically for the new scalar UDF functionality. Adding tests would ensure that UDFs work as expected.

You could add a test like this:

#[tokio::test]
async fn test_sql_processor_with_scalar_udf() {
    // Register a test UDF
    let test_udf = create_test_udf();
    scalar_udf::register(Arc::new(test_udf));
    
    let processor = SqlProcessor::new(SqlProcessorConfig {
        query: "SELECT test_udf(id) FROM flow".to_string(),
        table_name: None,
    })
    .unwrap();

    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
    ]));

    let batch = RecordBatch::try_new(
        schema,
        vec![
            Arc::new(Int64Array::from(vec![1, 2, 3])),
        ],
    )
    .unwrap();

    let result = processor
        .process(MessageBatch::new_arrow(batch))
        .await
        .unwrap();

    assert_eq!(result.len(), 1);
    // Add more assertions to verify the UDF behavior
}

fn create_test_udf() -> ScalarUDF {
    // Implementation to create a test UDF
    // ...
}
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between fda4279 and 3e4bbeb.

📒 Files selected for processing (2)
  • crates/arkflow-plugin/src/processor/sql/mod.rs (2 hunks)
  • crates/arkflow-plugin/src/processor/sql/scalar_udf.rs (1 hunks)
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/arkflow-plugin/src/processor/sql/mod.rs (1)
crates/arkflow-plugin/src/processor/sql/scalar_udf.rs (1)
  • init (29-40)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: build
  • GitHub Check: build
🔇 Additional comments (3)
crates/arkflow-plugin/src/processor/sql/scalar_udf.rs (1)

20-22: Thread-safe global UDF registry is well designed.

The use of lazy_static with RwLock is a good approach for a thread-safe global registry of scalar UDFs. This implementation ensures that UDFs can be registered from anywhere in the application and will be properly synchronized.

crates/arkflow-plugin/src/processor/sql/mod.rs (2)

19-19: Module declaration for scalar UDF is properly placed.

The module declaration is correctly placed at the beginning of the file with other imports and module declarations.


125-125: UDF initialization is properly integrated in the workflow.

The initialization of scalar UDFs is correctly placed right after creating a new session context and before registering JSON functions. The error propagation with ? is consistent with the rest of the function.

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (4)
crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (4)

1-27: Remove duplicate license header.

The Apache 2.0 license header appears twice at the beginning of the file.

 /*
  *    Licensed under the Apache License, Version 2.0 (the "License");
  *    you may not use this file except in compliance with the License.
  *    You may obtain a copy of the License at
  *
  *        http://www.apache.org/licenses/LICENSE-2.0
  *
  *    Unless required by applicable law or agreed to in writing, software
  *    distributed under the License is distributed on an "AS IS" BASIS,
  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  *    See the License for the specific language governing permissions and
  *    limitations under the License.
  */

-/*
- *    Licensed under the Apache License, Version 2.0 (the "License");
- *    you may not use this file except in compliance with the License.
- *    You may obtain a copy of the License at
- *
- *        http://www.apache.org/licenses/LICENSE-2.0
- *
- *    Unless required by applicable law or agreed to in writing, software
- *    distributed under the License is distributed on an "AS IS" BASIS,
- *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *    See the License for the specific language governing permissions and
- *    limitations under the License.
- */

57-70: Improve error messages and add documentation for the init function.

The init function lacks documentation explaining its purpose and intended usage. Additionally, the error messages for lock acquisition failures could be more informative.

+/// Initialize all registered scalar UDFs into the given function registry.
+///
+/// This function is meant for internal use by the SQL processor to register all UDFs
+/// that have been added via the `register` function.
+///
+/// # Arguments
+///
+/// * `registry` - The function registry where UDFs will be registered
+///
+/// # Returns
+///
+/// A Result indicating success or failure of the registration process
 pub(crate) fn init(registry: &mut dyn FunctionRegistry) -> Result<(), Error> {
     let udfs = SCALAR_UDFS
         .read()
-        .expect("Failed to acquire read lock for SCALAR_UDFS");
+        .expect("Failed to acquire read lock for SCALAR_UDFS - this might indicate a poisoned lock due to a previous panic");
     udfs.iter()
         .try_for_each(|udf| {
             let existing_udf = registry.register_udf(Arc::clone(&udf))?;
             if let Some(existing_udf) = existing_udf {
                 debug!("Overwrite existing UDF: {}", existing_udf.name());
             }
             Ok(()) as datafusion::common::Result<()>
         })
-        .map_err(|e| Error::Config(format!("Failed to register UDFs: {}", e)))
+        .map_err(|e| Error::Config(format!("Failed to register scalar UDFs in function registry: {}", e)))

50-54: Consider adding an improved error message for write lock acquisition.

While the current implementation works, providing a more informative error message for lock acquisition failures would help with debugging.

     let mut udfs = SCALAR_UDFS
         .write()
-        .expect("Failed to acquire write lock for SCALAR_UDFS");
+        .expect("Failed to acquire write lock for SCALAR_UDFS - this might indicate a poisoned lock due to a previous panic");

63-67: Consider adding UDF name to debug log during registration.

To improve debugging, it would be helpful to log both the name of the UDF being registered and the existing UDF being overwritten.

             let existing_udf = registry.register_udf(Arc::clone(&udf))?;
             if let Some(existing_udf) = existing_udf {
-                debug!("Overwrite existing UDF: {}", existing_udf.name());
+                debug!("Overwriting existing UDF '{}' with new implementation", existing_udf.name());
+            } else {
+                debug!("Registered new UDF: {}", udf.name());
             }
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between addd963 and 774e4c6.

📒 Files selected for processing (3)
  • crates/arkflow-plugin/src/processor/sql.rs (2 hunks)
  • crates/arkflow-plugin/src/processor/udf/mod.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (1 hunks)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: build
  • GitHub Check: build
🔇 Additional comments (6)
crates/arkflow-plugin/src/processor/udf/mod.rs (1)

14-14: LGTM! Proper module export for UDF functionality.

The addition of pub mod scalar_udf; correctly exposes the scalar UDF module as part of the public API. This change aligns with the PR objective to support custom SQL UDFs.

crates/arkflow-plugin/src/processor/sql.rs (2)

19-19: LGTM! Appropriate import added.

The import for scalar_udf is correctly added to bring the new module into scope for use in the SQL processor.


121-128: Good integration of UDF initialization.

The function now properly initializes scalar UDFs before registering JSON functions, and the docstring has been updated to reflect this addition. The error handling follows the existing pattern, maintaining consistency in the codebase.

crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (3)

34-40: LGTM! Good use of lazy_static for global UDF registry.

The use of RwLock with lazy_static provides a thread-safe global registry for scalar UDFs, which is appropriate for this use case. The module documentation clearly explains the purpose of this module.


42-55: LGTM! Well-documented public API for registering UDFs.

The register function has comprehensive documentation that clearly explains its purpose and usage. The implementation correctly acquires a write lock on the global registry before adding the new UDF.


28-33: Verify import usage with integration tests.

The imports look correct, but it would be beneficial to have integration tests to verify the UDF registration and initialization functionality works as expected.

Consider adding tests that:

  1. Register a custom UDF
  2. Initialize it in a session context
  3. Execute a SQL query that uses the UDF

Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 0

🧹 Nitpick comments (6)
crates/arkflow-plugin/src/processor/mod.rs (1)

25-25: Consider adding documentation for the newly public udf module.

Making the udf module public exposes it as part of the crate's API. It would be beneficial to add documentation comments explaining the purpose and usage of this module, similar to the existing comment on line 15 for the processor component.

- pub mod udf;
+ /// User-defined functions (UDFs) module for SQL processing
+ ///
+ /// This module provides functionality to register and manage scalar, aggregate, and window UDFs.
+ pub mod udf;
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (1)

21-24: Consider adding error handling instead of using expect.

The expect() call would cause the program to panic if the lock acquisition fails. In a library context, it's often better to handle this error gracefully by returning a Result instead of panicking.

- pub fn register(udf: Arc<AggregateUDF>) {
-     let mut udfs = UDFS.write().expect("Failed to acquire write lock for UDFS");
-     udfs.push(udf);
+ pub fn register(udf: Arc<AggregateUDF>) -> Result<(), String> {
+     let mut udfs = UDFS.write().map_err(|e| format!("Failed to acquire write lock for UDFS: {}", e))?;
+     udfs.push(udf);
+     Ok(())
}
crates/arkflow-plugin/src/processor/udf/window_udf.rs (1)

21-24: Consider adding error handling instead of using expect.

Similar to the aggregate_udf.rs file, the expect() call could cause a panic. Consider returning a Result to handle lock acquisition failures gracefully.

- pub fn register(udf: Arc<WindowUDF>) {
-     let mut udfs = UDFS.write().expect("Failed to acquire write lock for UDFS");
-     udfs.push(udf);
+ pub fn register(udf: Arc<WindowUDF>) -> Result<(), String> {
+     let mut udfs = UDFS.write().map_err(|e| format!("Failed to acquire write lock for UDFS: {}", e))?;
+     udfs.push(udf);
+     Ok(())
}
crates/arkflow-plugin/src/processor/udf/mod.rs (3)

52-65: There's a typo in the debug message for window UDFs.

The debug message refers to "windows UDF" but should be "window UDF" to maintain consistency with the naming elsewhere.

- debug!("Overwrite existing windows UDF: {}", existing_udf.name());
+ debug!("Overwrite existing window UDF: {}", existing_udf.name());

Also, in the error message:

- .map_err(|e| Error::Config(format!("Failed to register windows UDFs: {}", e)))?;
+ .map_err(|e| Error::Config(format!("Failed to register window UDFs: {}", e)))?;

23-66: Consider refactoring to reduce code duplication.

The registration code for scalar, aggregate, and window UDFs follows the same pattern with only slight differences. Consider refactoring using a generic function to avoid repetition.

Here's a possible implementation:

pub(crate) fn init(registry: &mut dyn FunctionRegistry) -> Result<(), Error> {
    // Helper function to register UDFs with appropriate error handling
    fn register_udfs<T, F>(
        udfs: &[Arc<T>], 
        registry_fn: F, 
        udf_type: &str
    ) -> Result<(), Error> 
    where 
        F: Fn(&mut dyn FunctionRegistry, Arc<T>) -> datafusion::common::Result<Option<Arc<T>>>
    {
        udfs.iter()
            .try_for_each(|udf| {
                let existing_udf = registry_fn(registry, Arc::clone(udf))?;
                if let Some(existing_udf) = existing_udf {
                    debug!("Overwrite existing {} UDF: {}", udf_type, existing_udf.name());
                }
                Ok(()) as datafusion::common::Result<()>
            })
            .map_err(|e| Error::Config(format!("Failed to register {} UDFs: {}", udf_type, e)))
    }

    // Register scalar UDFs
    let scalar_udfs = scalar_udf::UDFS
        .read()
        .expect("Failed to acquire read lock for scalar UDFS");
    register_udfs(
        &scalar_udfs, 
        |reg, udf| reg.register_udf(udf), 
        "scalar"
    )?;

    // Register aggregate UDFs
    let aggregate_udfs = aggregate_udf::UDFS
        .read()
        .expect("Failed to acquire read lock for aggregate UDFS");
    register_udfs(
        &aggregate_udfs, 
        |reg, udf| reg.register_udaf(udf), 
        "aggregate"
    )?;

    // Register window UDFs
    let window_udfs = window_udf::UDFS
        .read()
        .expect("Failed to acquire read lock for window UDFS");
    register_udfs(
        &window_udfs, 
        |reg, udf| reg.register_udwf(udf), 
        "window"
    )?;

    Ok(())
}

24-36: Consider handling lock acquisition failures more gracefully.

All three UDF registrations use expect() when acquiring read locks, which will panic if lock acquisition fails. Consider using map_err to convert the lock error to your Error::Config type.

For example:

- let scalar_udfs = crate::processor::udf::scalar_udf::UDFS
-     .read()
-     .expect("Failed to acquire read lock for scalar UDFS");
+ let scalar_udfs = crate::processor::udf::scalar_udf::UDFS
+     .read()
+     .map_err(|e| Error::Config(format!("Failed to acquire read lock for scalar UDFs: {}", e)))?;

Also applies to: 38-50, 52-64

📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 774e4c6 and 7cb2ca1.

📒 Files selected for processing (7)
  • crates/arkflow-plugin/src/processor/mod.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/sql.rs (2 hunks)
  • crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/udf/mod.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/udf/window_udf.rs (1 hunks)
  • crates/arkflow/src/main.rs (0 hunks)
💤 Files with no reviewable changes (1)
  • crates/arkflow/src/main.rs
🚧 Files skipped from review as they are similar to previous changes (2)
  • crates/arkflow-plugin/src/processor/sql.rs
  • crates/arkflow-plugin/src/processor/udf/scalar_udf.rs
🧰 Additional context used
🧬 Code Graph Analysis (1)
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (2)
crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (1)
  • register (48-51)
crates/arkflow-plugin/src/processor/udf/window_udf.rs (1)
  • register (21-24)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: build
  • GitHub Check: build
🔇 Additional comments (5)
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (1)

14-19: The implementation looks good and follows the established pattern.

The implementation for aggregate UDF registration follows the same pattern as the scalar and window UDF modules. It properly uses thread-safe access via RwLock.

crates/arkflow-plugin/src/processor/udf/window_udf.rs (1)

14-19: The implementation looks good and follows the established pattern.

The implementation for window UDF registration follows the same pattern as the scalar and aggregate UDF modules with proper thread-safe access.

crates/arkflow-plugin/src/processor/udf/mod.rs (3)

14-21: LGTM! Proper module organization and imports.

The module organization into scalar, aggregate, and window UDF submodules is clean and logically structured. All necessary imports are present.


23-37: The scalar UDF registration implementation looks good.

The implementation correctly:

  1. Acquires a read lock on the scalar UDFs collection
  2. Iterates through each UDF and registers it with the provided registry
  3. Properly handles and logs overwrites of existing UDFs
  4. Maps DataFusion errors to the application's Error type

23-66:

❓ Verification inconclusive

Verify error handling for UDF registration.

The code correctly handles errors when registering UDFs, but I recommend verifying the behavior when multiple UDFs need to be registered and an error occurs with one of them. Does the current implementation stop at the first error or continue registering the remaining UDFs?


🏁 Script executed:

#!/bin/bash

# Check if there are any tests for UDF registration error handling
rg -A 5 -B 5 "test.*register.*error|test.*udf.*error" --type rust

# Check how errors are handled in other similar registration functions
rg -A 5 "try_for_each.*map_err" --type rust

Length of output: 113


Confirm UDF registration error flow

I didn’t find any existing tests for UDF registration error cases, and by design try_for_each will stop on the first Err, so any subsequent UDFs won’t be registered. Please verify whether this early exit is intended or if you need to register all UDFs and surface all errors at once.

Action items:

  • Add unit tests in crates/arkflow-plugin/src/processor/udf that simulate a failing UDF registration and confirm when/if the loop aborts.
  • If you intend to register every UDF regardless of individual failures, consider replacing try_for_each with a loop that collects all errors and returns an aggregated result.

@chenquan chenquan force-pushed the feat/sql-udf branch 2 times, most recently from 26f9de6 to db2af20 Compare April 18, 2025 12:06
Copy link
Contributor

@coderabbitai coderabbitai bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actionable comments posted: 1

🧹 Nitpick comments (7)
crates/arkflow-plugin/src/processor/udf/window_udf.rs (2)

24-27: Consider preventing duplicate registrations.

register blindly pushes every UDF into the global Vec, so calling it twice with the same name stores two identical entries.
At init‑time the second entry silently overwrites the first in the FunctionRegistry, but the duplicate Arcs remain in memory.

If you don’t need multiple versions of the same window UDF, switching to a HashMap<String, Arc<WindowUDF>> keyed by udf.name() eliminates duplicates and reduces overhead.


30-32: Minor: unnecessary fully‑qualified path.

Inside the same module you can reference UDFS directly instead of
crate::processor::udf::window_udf::UDFS; this shortens the line and avoids refactor‑sensitive paths.

-    let window_udfs = crate::processor::udf::window_udf::UDFS
+    let window_udfs = UDFS
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (5)

24-27: Consider returning Result instead of using expect

Using expect will cause a panic if the lock can't be acquired, which might be problematic in production environments. Consider returning a Result instead to allow callers to handle the error gracefully.

-pub fn register(udf: AggregateUDF) {
-    let mut udfs = UDFS.write().expect("Failed to acquire write lock for UDFS");
-    udfs.push(Arc::new(udf));
+pub fn register(udf: AggregateUDF) -> Result<(), Error> {
+    let mut udfs = UDFS.write().map_err(|e| Error::Config(
+        format!("Failed to acquire write lock for UDFS: {}", e)
+    ))?;
+    udfs.push(Arc::new(udf));
+    Ok(())
}

30-32: Simplify path to UDFS

The fully qualified path to UDFS is unnecessary since UDFS is defined in this module.

-    let aggregate_udfs = crate::processor::udf::aggregate_udf::UDFS
+    let aggregate_udfs = UDFS
         .read()
         .expect("Failed to acquire read lock for aggregate UDFS");

31-32: Consider returning Result instead of using expect

Similar to the register function, using expect here will panic if the lock can't be acquired. Consider propagating the error instead.

-        .read()
-        .expect("Failed to acquire read lock for aggregate UDFS");
+        .read()
+        .map_err(|e| Error::Config(
+            format!("Failed to acquire read lock for aggregate UDFS: {}", e)
+        ))?;

14-43: Add documentation comments

Consider adding documentation comments to explain the purpose and usage of the module and its functions. This will help other developers understand how to use this module correctly.

+/// Module for managing aggregate User-Defined Functions (UDFs).
+/// Provides thread-safe global storage and registration of aggregate UDFs.
 use arkflow_core::Error;
 use datafusion::execution::FunctionRegistry;
 use datafusion::logical_expr::AggregateUDF;
 use std::sync::{Arc, RwLock};
 use tracing::debug;

 lazy_static::lazy_static! {
+   /// Thread-safe global registry for aggregate UDFs.
    static ref UDFS: RwLock<Vec<Arc<AggregateUDF>>> = RwLock::new(Vec::new());
 }

+/// Registers an aggregate UDF to the global registry.
+///
+/// # Arguments
+///
+/// * `udf` - The aggregate UDF to register.
+///
+/// # Panics
+///
+/// Panics if it fails to acquire the write lock for the UDF registry.
 pub fn register(udf: AggregateUDF) {
     let mut udfs = UDFS.write().expect("Failed to acquire write lock for UDFS");
     udfs.push(Arc::new(udf));
 }

+/// Initializes and registers all aggregate UDFs with the provided function registry.
+///
+/// # Arguments
+///
+/// * `registry` - The function registry to register UDFs with.
+///
+/// # Returns
+///
+/// A Result indicating success or containing an error if registration fails.
 pub(crate) fn init<T: FunctionRegistry>(registry: &mut T) -> Result<(), Error> {

34-42: Improve error context with specific UDF information

When registering fails, it would be helpful to include more context about which specific UDF failed to register.

     aggregate_udfs
         .iter()
-        .try_for_each(|udf| {
+        .try_fold((), |_, udf| {
             let existing_udf = registry.register_udaf(Arc::clone(udf))?;
             if let Some(existing_udf) = existing_udf {
                 debug!("Overwrite existing aggregate UDF: {}", existing_udf.name());
             }
             Ok(()) as datafusion::common::Result<()>
         })
-        .map_err(|e| Error::Config(format!("Failed to register aggregate UDFs: {}", e)))
+        .map_err(|e| Error::Config(format!("Failed to register aggregate UDFs: {}", e)))
📜 Review details

Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro

📥 Commits

Reviewing files that changed from the base of the PR and between 5360148 and a2fd659.

📒 Files selected for processing (4)
  • crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/udf/mod.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (1 hunks)
  • crates/arkflow-plugin/src/processor/udf/window_udf.rs (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
  • crates/arkflow-plugin/src/processor/udf/mod.rs
  • crates/arkflow-plugin/src/processor/udf/scalar_udf.rs
🧰 Additional context used
🧬 Code Graph Analysis (2)
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (3)
crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (2)
  • register (47-50)
  • init (52-66)
crates/arkflow-plugin/src/processor/udf/window_udf.rs (2)
  • register (24-27)
  • init (29-43)
crates/arkflow-plugin/src/processor/udf/mod.rs (1)
  • init (26-31)
crates/arkflow-plugin/src/processor/udf/window_udf.rs (5)
crates/arkflow-plugin/src/processor/sql.rs (2)
  • new (52-64)
  • init (162-164)
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (2)
  • register (24-27)
  • init (29-43)
crates/arkflow-plugin/src/processor/udf/scalar_udf.rs (2)
  • register (47-50)
  • init (52-66)
crates/arkflow-plugin/src/processor/udf/mod.rs (1)
  • init (26-31)
crates/arkflow-plugin/src/processor/mod.rs (1)
  • init (31-38)
⏰ Context from checks skipped due to timeout of 90000ms (2)
  • GitHub Check: build
  • GitHub Check: build
🔇 Additional comments (1)
crates/arkflow-plugin/src/processor/udf/aggregate_udf.rs (1)

1-44: Well-structured module for aggregate UDF management

The overall structure of this module follows a good design pattern for managing aggregate UDFs, consistent with the scalar and window UDF implementations. The thread-safe global registry with RwLock provides a clean way to register and access UDFs.

@chenquan chenquan marked this pull request as ready for review April 18, 2025 14:23
@chenquan chenquan merged commit 4085a0d into main Apr 18, 2025
3 of 4 checks passed
@chenquan chenquan added this to the v0.3.0 milestone Apr 19, 2025
@chenquan chenquan added the enhancement New feature or request label Apr 19, 2025
@chenquan chenquan deleted the feat/sql-udf branch April 19, 2025 03:34
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

enhancement New feature or request

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants