Commit cabba33
[SPARK-43061][CORE][SQL] Introduce PartitionEvaluator for SQL operator execution
### What changes were proposed in this pull request?

This PR adds a new API `PartitionEvaluator` to define the computing logic and requires the caller side to explicitly list what needs to be serialized and sent to executors via `PartitionEvaluatorFactory`.

Two new RDD APIs are added to use `PartitionEvaluator`:

```scala
/**
 * Return a new RDD by applying an evaluator to each partition of this RDD. The given evaluator
 * factory will be serialized and sent to executors, and each task will create an evaluator with
 * the factory, and use the evaluator to transform the data of the input partition.
 */
@DeveloperApi
@Since("3.5.0")
def mapPartitionsWithEvaluator[U: ClassTag](
    partitionEvaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U] = withScope {
  new MapPartitionsWithEvaluatorRDD(this, partitionEvaluatorFactory)
}

/**
 * Zip this RDD's partitions with another RDD and return a new RDD by applying an evaluator to
 * the zipped partitions. Assumes that the two RDDs have the *same number of partitions*, but
 * does *not* require them to have the same number of elements in each partition.
 */
@DeveloperApi
@Since("3.5.0")
def zipPartitionsWithEvaluator[U: ClassTag](
    rdd2: RDD[T],
    partitionEvaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U] = withScope {
  new ZippedPartitionsWithEvaluatorRDD(this, rdd2, partitionEvaluatorFactory)
}
```

Three SQL operators are updated to use the new API for execution, as a showcase: Project, Filter, and WholeStageCodegen. More operators can be migrated later. A config is added so that the old code path is still used by default.

### Why are the changes needed?

Using a lambda to define the computing logic is a bit tricky:
1. It is easy to mistakenly reference objects in the closure, which increases the time to serialize the lambda and send it to executors. `ProjectExec` and `FilterExec` use `child.output` in the lambda, which means the entire `child` will be serialized. Other places try to work around this problem, e.g. https://github.com/apache/spark/blob/v3.3.2/sql/core/src/main/scala/org/apache/spark/sql/execution/Columnar.scala#L90-L92
2. Serializing lambdas is strongly discouraged by the [official Java guide](https://docs.oracle.com/javase/tutorial/java/javaOO/lambdaexpressions.html#serialization).

We should eventually get rid of lambdas during distributed execution to make Spark more robust.

### Does this PR introduce _any_ user-facing change?

No

### How was this patch tested?

Existing tests.

Closes apache#40697 from cloud-fan/serde.

Lead-authored-by: Wenchen Fan <[email protected]>
Co-authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Wenchen Fan <[email protected]>
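For illustration, here is a minimal end-to-end sketch of the new API. The factory name and data are hypothetical (not part of this commit); it assumes a `SparkContext` named `sc`:

```scala
import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}

// Hypothetical factory: the fields it holds (here, just `suffix`) are exactly
// what gets serialized and shipped to executors -- no accidental closure capture.
class AppendSuffixEvaluatorFactory(suffix: String)
  extends PartitionEvaluatorFactory[String, String] {

  override def createEvaluator(): PartitionEvaluator[String, String] =
    new PartitionEvaluator[String, String] {
      override def eval(partitionIndex: Int, inputs: Iterator[String]*): Iterator[String] = {
        assert(inputs.length == 1)
        inputs.head.map(_ + suffix)
      }
    }
}

// Driver side: no lambda is serialized, only the factory instance.
// val out = sc.parallelize(Seq("a", "b")).mapPartitionsWithEvaluator(new AppendSuffixEvaluatorFactory("!"))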
1 parent 99d2932 commit cabba33

11 files changed: +360 −37 lines changed
core/src/main/scala/org/apache/spark/PartitionEvaluator.scala

Lines changed: 36 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import org.apache.spark.annotation.{DeveloperApi, Since}

/**
 * An evaluator for computing RDD partitions. Spark serializes and sends
 * [[PartitionEvaluatorFactory]] to executors, and then creates [[PartitionEvaluator]] via the
 * factory at the executor side.
 */
@DeveloperApi
@Since("3.5.0")
trait PartitionEvaluator[T, U] {

  /**
   * Evaluates the RDD partition at the given index. There can be more than one input iterator,
   * if the RDD was zipped from multiple RDDs.
   */
  def eval(partitionIndex: Int, inputs: Iterator[T]*): Iterator[U]
}
```
core/src/main/scala/org/apache/spark/PartitionEvaluatorFactory.scala

Lines changed: 38 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark

import java.io.Serializable

import org.apache.spark.annotation.{DeveloperApi, Since}

/**
 * A factory to create [[PartitionEvaluator]]. Spark serializes and sends
 * [[PartitionEvaluatorFactory]] to executors, and then creates [[PartitionEvaluator]] via the
 * factory at the executor side.
 */
@DeveloperApi
@Since("3.5.0")
trait PartitionEvaluatorFactory[T, U] extends Serializable {

  /**
   * Creates a partition evaluator. Each RDD partition will create one evaluator instance, which
   * means one evaluator instance will be used by only one thread.
   */
  def createEvaluator(): PartitionEvaluator[T, U]
}
```
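Because `createEvaluator()` is called once per partition and each evaluator instance is used by a single thread, an evaluator can safely hold per-partition mutable state. A hypothetical sketch (names are illustrative only, not part of this commit):

```scala
import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}

// Hypothetical pass-through evaluator that counts the rows of its partition.
class CountingEvaluatorFactory extends PartitionEvaluatorFactory[Long, Long] {
  override def createEvaluator(): PartitionEvaluator[Long, Long] =
    new PartitionEvaluator[Long, Long] {
      // Safe: this instance serves exactly one partition on one thread.
      private var count = 0L

      override def eval(partitionIndex: Int, inputs: Iterator[Long]*): Iterator[Long] =
        inputs.head.map { v => count += 1; v }
    }
}
```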
core/src/main/scala/org/apache/spark/rdd/MapPartitionsWithEvaluatorRDD.scala

Lines changed: 41 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partition, PartitionEvaluatorFactory, TaskContext}

private[spark] class MapPartitionsWithEvaluatorRDD[T : ClassTag, U : ClassTag](
    var prev: RDD[T],
    evaluatorFactory: PartitionEvaluatorFactory[T, U])
  extends RDD[U](prev) {

  override def getPartitions: Array[Partition] = firstParent[T].partitions

  override def compute(split: Partition, context: TaskContext): Iterator[U] = {
    val evaluator = evaluatorFactory.createEvaluator()
    val input = firstParent[T].iterator(split, context)
    evaluator.eval(split.index, input)
  }

  override def clearDependencies(): Unit = {
    super.clearDependencies()
    prev = null
  }
}
```

core/src/main/scala/org/apache/spark/rdd/RDD.scala

Lines changed: 25 additions & 0 deletions

```diff
@@ -908,6 +908,31 @@ abstract class RDD[T: ClassTag](
       preservesPartitioning)
   }
 
+  /**
+   * Return a new RDD by applying an evaluator to each partition of this RDD. The given evaluator
+   * factory will be serialized and sent to executors, and each task will create an evaluator with
+   * the factory, and use the evaluator to transform the data of the input partition.
+   */
+  @DeveloperApi
+  @Since("3.5.0")
+  def mapPartitionsWithEvaluator[U: ClassTag](
+      evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U] = withScope {
+    new MapPartitionsWithEvaluatorRDD(this, evaluatorFactory)
+  }
+
+  /**
+   * Zip this RDD's partitions with another RDD and return a new RDD by applying an evaluator to
+   * the zipped partitions. Assumes that the two RDDs have the *same number of partitions*, but
+   * does *not* require them to have the same number of elements in each partition.
+   */
+  @DeveloperApi
+  @Since("3.5.0")
+  def zipPartitionsWithEvaluator[U: ClassTag](
+      rdd2: RDD[T],
+      evaluatorFactory: PartitionEvaluatorFactory[T, U]): RDD[U] = withScope {
+    new ZippedPartitionsWithEvaluatorRDD(this, rdd2, evaluatorFactory)
+  }
+
   /**
    * Return a new RDD by applying a function to each partition of this RDD, while tracking the index
    * of the original partition.
```
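As a usage sketch for the two-input variant (hypothetical factory, not part of this commit), the evaluator receives one iterator per zipped RDD, in order:

```scala
import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}

// Hypothetical evaluator that reduces each pair of zipped partitions to one value.
class SumZippedPartitionsFactory extends PartitionEvaluatorFactory[Int, Int] {
  override def createEvaluator(): PartitionEvaluator[Int, Int] =
    new PartitionEvaluator[Int, Int] {
      override def eval(partitionIndex: Int, inputs: Iterator[Int]*): Iterator[Int] = {
        assert(inputs.length == 2)  // one iterator per zipped RDD
        Iterator.single(inputs(0).sum + inputs(1).sum)
      }
    }
}

// Driver side, assuming rdd1 and rdd2 have the same number of partitions:
// val sums = rdd1.zipPartitionsWithEvaluator(rdd2, new SumZippedPartitionsFactory)
```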
core/src/main/scala/org/apache/spark/rdd/ZippedPartitionsWithEvaluatorRDD.scala

Lines changed: 38 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.rdd

import scala.reflect.ClassTag

import org.apache.spark.{Partition, PartitionEvaluatorFactory, TaskContext}

private[spark] class ZippedPartitionsWithEvaluatorRDD[T : ClassTag, U : ClassTag](
    var rdd1: RDD[T],
    var rdd2: RDD[T],
    evaluatorFactory: PartitionEvaluatorFactory[T, U])
  extends ZippedPartitionsBaseRDD[U](rdd1.context, List(rdd1, rdd2)) {

  override def compute(split: Partition, context: TaskContext): Iterator[U] = {
    val evaluator = evaluatorFactory.createEvaluator()
    val partitions = split.asInstanceOf[ZippedPartitionsPartition].partitions
    evaluator.eval(
      split.index,
      rdd1.iterator(partitions(0), context),
      rdd2.iterator(partitions(1), context))
  }
}
```

sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala

Lines changed: 9 additions & 0 deletions

```diff
@@ -1808,6 +1808,13 @@ object SQLConf {
       .booleanConf
       .createWithDefault(false)
 
+  val USE_PARTITION_EVALUATOR = buildConf("spark.sql.execution.usePartitionEvaluator")
+    .internal()
+    .doc("When true, use PartitionEvaluator to execute SQL operators.")
+    .version("3.5.0")
+    .booleanConf
+    .createWithDefault(false)
+
   val STATE_STORE_PROVIDER_CLASS =
     buildConf("spark.sql.streaming.stateStore.providerClass")
       .internal()
@@ -5028,6 +5035,8 @@ class SQLConf extends Serializable with Logging {
   def allowsTempViewCreationWithMultipleNameparts: Boolean =
     getConf(SQLConf.ALLOW_TEMP_VIEW_CREATION_WITH_MULTIPLE_NAME_PARTS)
 
+  def usePartitionEvaluator: Boolean = getConf(SQLConf.USE_PARTITION_EVALUATOR)
+
   /** ********************** SQLConf functionality methods ************ */
 
   /** Set Spark SQL configuration properties. */
```
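Since the flag defaults to false, the evaluator-based path must be opted into explicitly, for example (assuming a SparkSession named `spark`):

```scala
// The config is internal; it can still be set programmatically or via SQL.
spark.conf.set("spark.sql.execution.usePartitionEvaluator", "true")
// or: spark.sql("SET spark.sql.execution.usePartitionEvaluator=true")
```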
sql/core/src/main/scala/org/apache/spark/sql/execution/FilterEvaluatorFactory.scala

Lines changed: 48 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, Predicate}
import org.apache.spark.sql.execution.metric.SQLMetric

class FilterEvaluatorFactory(
    condition: Expression,
    childOutput: Seq[Attribute],
    numOutputRows: SQLMetric) extends PartitionEvaluatorFactory[InternalRow, InternalRow] {

  override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] = {
    new FilterPartitionEvaluator
  }

  class FilterPartitionEvaluator extends PartitionEvaluator[InternalRow, InternalRow] {
    override def eval(
        partitionIndex: Int,
        inputs: Iterator[InternalRow]*): Iterator[InternalRow] = {
      assert(inputs.length == 1)
      val predicate = Predicate.create(condition, childOutput)
      predicate.initialize(partitionIndex)
      inputs.head.filter { row =>
        val r = predicate.eval(row)
        if (r) numOutputRows += 1
        r
      }
    }
  }
}
```
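Note that `Predicate.create` runs inside `eval`, i.e. on the executor once per partition, so only `condition`, `childOutput`, and the metric are serialized. A hypothetical direct use of the factory (normally `FilterExec` constructs it; assumes a SparkContext `sc`):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{AttributeReference, GreaterThan, Literal}
import org.apache.spark.sql.execution.metric.SQLMetrics
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val numOutputRows = SQLMetrics.createMetric(sc, "number of output rows")
val factory = new FilterEvaluatorFactory(GreaterThan(a, Literal(1)), Seq(a), numOutputRows)

// Executor side: one evaluator per partition.
val evaluator = factory.createEvaluator()
val out = evaluator.eval(0, Iterator(InternalRow(0), InternalRow(2)))
// `out` yields only the row with a = 2, and numOutputRows is incremented once.
```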
sql/core/src/main/scala/org/apache/spark/sql/execution/ProjectEvaluatorFactory.scala

Lines changed: 41 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Attribute, Expression, UnsafeProjection}

class ProjectEvaluatorFactory(projectList: Seq[Expression], childOutput: Seq[Attribute])
  extends PartitionEvaluatorFactory[InternalRow, InternalRow] {

  override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] = {
    new ProjectPartitionEvaluator
  }

  class ProjectPartitionEvaluator extends PartitionEvaluator[InternalRow, InternalRow] {
    override def eval(
        partitionIndex: Int,
        inputs: Iterator[InternalRow]*): Iterator[InternalRow] = {
      assert(inputs.length == 1)
      val project = UnsafeProjection.create(projectList, childOutput)
      project.initialize(partitionIndex)
      inputs.head.map(project)
    }
  }
}
```
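Similarly, `UnsafeProjection.create` is invoked per partition on the executor, so only `projectList` and `childOutput` travel over the wire rather than a whole child plan. A hypothetical direct use (normally `ProjectExec` constructs the factory):

```scala
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.{Add, AttributeReference, Literal}
import org.apache.spark.sql.types.IntegerType

val a = AttributeReference("a", IntegerType)()
val factory = new ProjectEvaluatorFactory(Seq(Add(a, Literal(1))), Seq(a))

val evaluator = factory.createEvaluator()
val out = evaluator.eval(0, Iterator(InternalRow(41)))
// out.next().getInt(0) == 42
```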
sql/core/src/main/scala/org/apache/spark/sql/execution/WholeStageCodegenEvaluatorFactory.scala

Lines changed: 51 additions & 0 deletions

```scala
/*
 * Licensed to the Apache Software Foundation (ASF) under one or more
 * contributor license agreements. See the NOTICE file distributed with
 * this work for additional information regarding copyright ownership.
 * The ASF licenses this file to You under the Apache License, Version 2.0
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

package org.apache.spark.sql.execution

import org.apache.spark.{PartitionEvaluator, PartitionEvaluatorFactory}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.codegen.{CodeAndComment, CodeGenerator}
import org.apache.spark.sql.execution.metric.SQLMetric

class WholeStageCodegenEvaluatorFactory(
    cleanedSource: CodeAndComment,
    durationMs: SQLMetric,
    references: Array[Any]) extends PartitionEvaluatorFactory[InternalRow, InternalRow] {

  override def createEvaluator(): PartitionEvaluator[InternalRow, InternalRow] = {
    new WholeStageCodegenPartitionEvaluator()
  }

  class WholeStageCodegenPartitionEvaluator extends PartitionEvaluator[InternalRow, InternalRow] {
    override def eval(
        partitionIndex: Int,
        inputs: Iterator[InternalRow]*): Iterator[InternalRow] = {
      val (clazz, _) = CodeGenerator.compile(cleanedSource)
      val buffer = clazz.generate(references).asInstanceOf[BufferedRowIterator]
      buffer.init(partitionIndex, inputs.toArray)
      new Iterator[InternalRow] {
        override def hasNext: Boolean = {
          val v = buffer.hasNext
          if (!v) durationMs += buffer.durationMs()
          v
        }
        override def next: InternalRow = buffer.next()
      }
    }
  }
}
```
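Here `CodeGenerator.compile` also runs inside `eval`, so only the generated source, `references`, and the metric are serialized; compilation happens on the executor per task (and is cached by the code generator). A hedged end-to-end sketch, assuming a SparkSession `spark` built from a branch containing this commit:

```scala
// With the flag on and whole-stage codegen enabled (the default),
// WholeStageCodegenExec routes execution through this factory.
spark.conf.set("spark.sql.execution.usePartitionEvaluator", "true")
spark.range(0, 10).selectExpr("id + 1").collect()
```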
