Commit 0f9cca5

cloud-fan authored and HyukjinKwon committed
[SPARK-44287][SQL][FOLLOWUP] Do not trigger execution too early
### What changes were proposed in this pull request?

This is a followup of apache#41839, to fix an unintentional change. That PR added an optimization to return an empty iterator directly if the input iterator is empty. However, checking `inputIterator.hasNext` may trigger query execution, which is different from before. It should be completely lazy and wait for the root operator's iterator to trigger the execution.

### Why are the changes needed?

To fix an unintentional change.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Existing tests.

Closes apache#42226 from cloud-fan/fo.

Authored-by: Wenchen Fan <[email protected]>
Signed-off-by: Hyukjin Kwon <[email protected]>
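To make the laziness problem concrete, here is a minimal sketch in plain Scala (not the Spark code; `buildOutput`, `buildOutputLazily`, and `upstream` are illustrative names). Calling `hasNext` on the input iterator while constructing the result pulls from upstream right away, whereas wrapping the input in a new iterator defers every upstream access until the caller actually iterates:

```scala
// Illustrative only: `upstream` stands in for an input iterator whose first
// hasNext/next call would kick off upstream query execution.
def buildOutput(upstream: Iterator[Int]): Iterator[Int] = {
  // Eager: hasNext runs while the result iterator is being constructed,
  // so work starts before the caller asks for any element.
  if (upstream.hasNext) upstream.map(_ * 2) else Iterator.empty
}

def buildOutputLazily(upstream: Iterator[Int]): Iterator[Int] =
  // Lazy: `upstream` is not touched until the caller calls hasNext/next.
  new Iterator[Int] {
    override def hasNext: Boolean = upstream.hasNext
    override def next(): Int = upstream.next() * 2
  }
```

The followup keeps the second shape: the wrapping iterator is built unconditionally, and the empty-input case falls out naturally because its `hasNext` simply delegates to the input iterator.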
1 parent 9f03f43 commit 0f9cca5

1 file changed (+26, -31 lines)


sql/core/src/main/scala/org/apache/spark/sql/execution/ColumnarEvaluatorFactory.scala

Lines changed: 26 additions & 31 deletions
@@ -70,42 +70,37 @@ class RowToColumnarEvaluatorFactory(
         inputs: Iterator[InternalRow]*): Iterator[ColumnarBatch] = {
       assert(inputs.length == 1)
       val rowIterator = inputs.head
+      new Iterator[ColumnarBatch] {
+        private lazy val converters = new RowToColumnConverter(schema)
+        private lazy val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
+          OffHeapColumnVector.allocateColumns(numRows, schema)
+        } else {
+          OnHeapColumnVector.allocateColumns(numRows, schema)
+        }
+        private lazy val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
 
-      if (rowIterator.hasNext) {
-        new Iterator[ColumnarBatch] {
-          private val converters = new RowToColumnConverter(schema)
-          private val vectors: Seq[WritableColumnVector] = if (enableOffHeapColumnVector) {
-            OffHeapColumnVector.allocateColumns(numRows, schema)
-          } else {
-            OnHeapColumnVector.allocateColumns(numRows, schema)
-          }
-          private val cb: ColumnarBatch = new ColumnarBatch(vectors.toArray)
-
-          TaskContext.get().addTaskCompletionListener[Unit] { _ =>
-            cb.close()
-          }
+        TaskContext.get().addTaskCompletionListener[Unit] { _ =>
+          cb.close()
+        }
 
-          override def hasNext: Boolean = {
-            rowIterator.hasNext
-          }
+        override def hasNext: Boolean = {
+          rowIterator.hasNext
+        }
 
-          override def next(): ColumnarBatch = {
-            cb.setNumRows(0)
-            vectors.foreach(_.reset())
-            var rowCount = 0
-            while (rowCount < numRows && rowIterator.hasNext) {
-              val row = rowIterator.next()
-              converters.convert(row, vectors.toArray)
-              rowCount += 1
-            }
-            cb.setNumRows(rowCount)
-            numInputRows += rowCount
-            numOutputBatches += 1
-            cb
+        override def next(): ColumnarBatch = {
+          cb.setNumRows(0)
+          vectors.foreach(_.reset())
+          var rowCount = 0
+          while (rowCount < numRows && rowIterator.hasNext) {
+            val row = rowIterator.next()
+            converters.convert(row, vectors.toArray)
+            rowCount += 1
           }
+          cb.setNumRows(rowCount)
+          numInputRows += rowCount
+          numOutputBatches += 1
+          cb
         }
-      } else {
-        Iterator.empty
       }
     }
   }
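As a side note on the design choice above, the per-batch state (`converters`, `vectors`, `cb`) became `lazy val`s, so constructing the wrapping iterator does not allocate the vectors or the batch up front; they come into existence only when the first batch is actually requested. A minimal, self-contained sketch of that pattern (illustrative names, not the Spark classes):

```scala
// Sketch only: `buffer` stands in for the column vectors / ColumnarBatch.
def wrap(rows: Iterator[Int]): Iterator[Int] = new Iterator[Int] {
  // Deferred until the first next() call forces it.
  private lazy val buffer: Array[Int] = {
    println("buffer allocated") // side effect visible on first use only
    new Array[Int](1)
  }

  override def hasNext: Boolean = rows.hasNext
  override def next(): Int = {
    buffer(0) = rows.next()     // forces the lazy val on the first call
    buffer(0)
  }
}

// wrap(Iterator(1, 2)) prints nothing; the first next() prints "buffer allocated".
```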
