[SPARK-52588][SQL] Approx_top_k: accumulate and estimate #51308
Conversation
  defaultCheck
} else if (!k.foldable) {
  TypeCheckFailure("K must be a constant literal")
} else {
Shall we also check the StructType of state?
Also, let's add a test for this.
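One possible shape for such a check is sketched below. It is only an illustration: the types are simplified local stand-ins rather than Spark's own classes, `checkStateType` and its failure message are hypothetical, and the field names follow the PR description (`Sketch`, `ItemTypeNull`, `MaxItemsTracked`). The INTEGER type of `MaxItemsTracked` is an assumption.

```scala
// Simplified local stand-ins, NOT Spark's org.apache.spark.sql.types classes.
object StateTypeCheckSketch {
  sealed trait DataType
  case object BinaryType extends DataType
  case object IntegerType extends DataType
  case object StringType extends DataType
  final case class StructField(name: String, dataType: DataType)
  final case class StructType(fields: List[StructField]) extends DataType

  sealed trait TypeCheckResult
  case object TypeCheckSuccess extends TypeCheckResult
  final case class TypeCheckFailure(message: String) extends TypeCheckResult

  // Hypothetical validation of the `state` argument: accept only a struct
  // with the three expected fields, in order. The item type is left open.
  def checkStateType(dt: DataType): TypeCheckResult = dt match {
    case StructType(List(
          StructField("Sketch", BinaryType),
          StructField("ItemTypeNull", _),
          StructField("MaxItemsTracked", IntegerType))) =>
      TypeCheckSuccess
    case _ =>
      // Hypothetical message, not taken from the PR.
      TypeCheckFailure(
        "State must be a struct with Sketch, ItemTypeNull and MaxItemsTracked fields")
  }
}
```

A test for it would then cover both a well-formed struct and a non-struct input such as a plain binary column.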
def getSketchStateDataType(itemDataType: DataType): StructType =
  StructType(
    StructField("Sketch", BinaryType, nullable = false) ::
Sketch => sketch. Let's use camelCase.
 * and the maximum number of items tracked by the sketch.
 *
 * @param expr the child expression to accumulate items from
 * @param maxItemsTracked the maximum number of items to track in the sketch
Let's also add doc for mutableAggBufferOffset and inputAggBufferOffset
  k = Literal(10),
  maxItemsTracked = Literal(10000)
)
assert(agg.checkInputDataTypes().isFailure)
let's also check the failure message
  k = Sum(BoundReference(1, IntegerType, nullable = true)),
  maxItemsTracked = Literal(10)
)
assert(badAgg.checkInputDataTypes().isFailure)
let's also check the failure message
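Asserting on the message as well as on `isFailure` could look roughly like the sketch below. It is self-contained and uses simplified stand-ins: `Expr` and `checkK` are hypothetical, and the result types only mimic the names seen in the diff. The message string is the one from the `!k.foldable` branch quoted earlier in this review.

```scala
// Simplified local stand-ins, NOT Spark's Catalyst classes.
object FailureMessageSketch {
  sealed trait TypeCheckResult { def isFailure: Boolean }
  case object TypeCheckSuccess extends TypeCheckResult {
    val isFailure: Boolean = false
  }
  final case class TypeCheckFailure(message: String) extends TypeCheckResult {
    val isFailure: Boolean = true
  }

  // Hypothetical expression model: only records whether it is foldable.
  final case class Expr(foldable: Boolean)

  // Mirrors the foldable check from the diff for the `k` argument.
  def checkK(k: Expr): TypeCheckResult =
    if (!k.foldable) TypeCheckFailure("K must be a constant literal")
    else TypeCheckSuccess

  def main(args: Array[String]): Unit = {
    val result = checkK(Expr(foldable = false))
    // Weaker check: passes for any failure, whatever its cause.
    assert(result.isFailure)
    // Stronger check: pins down which validation rejected the input.
    assert(result == TypeCheckFailure("K must be a constant literal"))
  }
}
```

Comparing against the full failure value guards against a test that passes because some unrelated validation rejected the input first.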
@@ -2931,6 +2904,112 @@ class DataFrameAggregateSuite extends QueryTest
      res,
      Row(LocalTime.of(22, 1, 0), LocalTime.of(3, 0, 0)))
  }

  test("SPARK-52588: accumulate and estimate of Integer with default parameters") {
shall we move the new tests related to approx_top_k_* to a new test suite?
@@ -2931,6 +2904,112 @@ class DataFrameAggregateSuite extends QueryTest
      res,
      Row(LocalTime.of(22, 1, 0), LocalTime.of(3, 0, 0)))
  }

  test("SPARK-52588: accumulate and estimate of Integer with default parameters") {
Let's also test inputs with different data types, similar to the approx_top_k function.
What changes were proposed in this pull request?
This PR adds two SQL functions:
approx_top_k_accumulate, an aggregation function that accumulates input data into a sketch, and
approx_top_k_estimate, an expression function that estimates the top k frequent items from a sketch.
1. approx_top_k_accumulate
Syntax
Arguments
expr: An expression of BOOLEAN, BINARY, STRING, DATE, TIMESTAMP or numeric type.
maxItemsTracked: An optional INTEGER literal. If maxItemsTracked is not specified, it defaults to 10000. This is the maximum number of distinct values that can be tracked by the sketch.
Returns
This function returns a STRUCT with three fields: (1) Sketch, the BINARY form of the sketch state; (2) ItemTypeNull, a null value that indicates the original type of expr; and (3) MaxItemsTracked, the value of the maxItemsTracked argument.
2. approx_top_k_estimate
Syntax
Arguments
state: An expression for the sketch STRUCT that is generated by approx_top_k_accumulate or approx_top_k_combine.
k: An optional INTEGER literal greater than 0. If k is not specified, it defaults to 5.
Returns
Results are returned as an ARRAY of type STRUCT, where each STRUCT contains an item field for the value (with its original input type) and a count field (of type LONG) with the approximate number of occurrences. The array is sorted by count descending.
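To make the accumulate-then-estimate contract described above concrete, here is a minimal plain-Scala model. It is not the PR's implementation: an exact count map stands in for the binary sketch, `State`, `accumulate` and `estimate` are hypothetical names, and `maxItemsTracked` is carried along but not enforced. Only the defaults (10000 and 5), the k > 0 requirement, and the count-descending ordering come from the description.

```scala
object TopKModel {
  // Stand-in for the sketch STRUCT: exact counts instead of a binary sketch.
  final case class State[A](counts: Map[A, Long], maxItemsTracked: Int)

  // Models approx_top_k_accumulate(expr, maxItemsTracked).
  def accumulate[A](items: Seq[A], maxItemsTracked: Int = 10000): State[A] = {
    val counts = items.groupBy(identity).map {
      case (item, group) => item -> group.size.toLong
    }
    State(counts, maxItemsTracked)
  }

  // Models approx_top_k_estimate(state, k): (item, count) pairs,
  // sorted by count descending and truncated to k entries.
  def estimate[A](state: State[A], k: Int = 5): Seq[(A, Long)] = {
    require(k > 0, "k must be greater than 0")
    state.counts.toSeq.sortBy { case (_, count) => -count }.take(k)
  }

  def main(args: Array[String]): Unit = {
    val state = accumulate(Seq("a", "b", "a", "c", "a", "b"))
    println(estimate(state, 2)) // the two most frequent items with their counts
  }
}
```

Splitting the work this way means the state can be stored once and queried later with different values of k, which is the reason for separating accumulation from estimation.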
Summary of changes:
Tests:
approx_top_k_estimate(approx_top_k_accumulate(expr, maxItemsTracked), k) together.
Implementation:
approx_top_k_accumulate
approx_top_k_estimation
Why are the changes needed?
They are useful sibling functions for approx_top_k queries.
Does this PR introduce any user-facing change?
Yes, this PR introduces two new user-facing SQL functions. See the user examples below.
How was this patch tested?
Unit tests for end-to-end SQL queries and invalid input for expressions.
Was this patch authored or co-authored using generative AI tooling?
N/A