Skip to content

Commit b9c2521

Browse files
zsxwingdongjoon-hyun
authored andcommitted
[SPARK-28489][SS] Fix a bug that KafkaOffsetRangeCalculator.getRanges may drop offsets
## What changes were proposed in this pull request? `KafkaOffsetRangeCalculator.getRanges` may drop offsets due to round off errors. The test added in this PR is one example. This PR rewrites the logic in `KafkaOffsetRangeCalculator.getRanges` to ensure it never drops offsets. ## How was this patch tested? The regression test. Closes apache#25237 from zsxwing/fix-range. Authored-by: Shixiong Zhu <[email protected]> Signed-off-by: Dongjoon Hyun <[email protected]>
1 parent c93d2dd commit b9c2521

File tree

2 files changed

+30
-11
lines changed

2 files changed

+30
-11
lines changed

external/kafka-0-10-sql/src/main/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculator.scala

Lines changed: 15 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -61,19 +61,23 @@ private[kafka010] class KafkaOffsetRangeCalculator(val minPartitions: Option[Int
6161

6262
// Splits offset ranges with relatively large amount of data to smaller ones.
6363
val totalSize = offsetRanges.map(_.size).sum
64-
val idealRangeSize = totalSize.toDouble / minPartitions.get
65-
6664
offsetRanges.flatMap { range =>
67-
// Split the current range into subranges as close to the ideal range size
68-
val numSplitsInRange = math.round(range.size.toDouble / idealRangeSize).toInt
69-
70-
(0 until numSplitsInRange).map { i =>
71-
val splitStart = range.fromOffset + range.size * (i.toDouble / numSplitsInRange)
72-
val splitEnd = range.fromOffset + range.size * ((i.toDouble + 1) / numSplitsInRange)
73-
KafkaOffsetRange(
74-
range.topicPartition, splitStart.toLong, splitEnd.toLong, preferredLoc = None)
65+
val tp = range.topicPartition
66+
val size = range.size
67+
// number of partitions to divvy up this topic partition to
68+
val parts = math.max(math.round(size.toDouble / totalSize * minPartitions.get), 1).toInt
69+
var remaining = size
70+
var startOffset = range.fromOffset
71+
(0 until parts).map { part =>
72+
// Fine to do integer division. Last partition will consume all the round off errors
73+
val thisPartition = remaining / (parts - part)
74+
remaining -= thisPartition
75+
val endOffset = math.min(startOffset + thisPartition, range.untilOffset)
76+
val offsetRange = KafkaOffsetRange(tp, startOffset, endOffset, None)
77+
startOffset = endOffset
78+
offsetRange
7579
}
76-
}
80+
}.filter(_.size > 0)
7781
}
7882
}
7983

external/kafka-0-10-sql/src/test/scala/org/apache/spark/sql/kafka010/KafkaOffsetRangeCalculatorSuite.scala

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,21 @@ class KafkaOffsetRangeCalculatorSuite extends SparkFunSuite {
141141
KafkaOffsetRange(tp2, 14, 21, None)))
142142
}
143143

144+
testWithMinPartitions("SPARK-28489: never drop offsets", 6) { calc =>
145+
assert(
146+
calc.getRanges(
147+
fromOffsets = Map(tp1 -> 0, tp2 -> 0, tp3 -> 0),
148+
untilOffsets = Map(tp1 -> 10, tp2 -> 10, tp3 -> 1)) ==
149+
Seq(
150+
KafkaOffsetRange(tp1, 0, 3, None),
151+
KafkaOffsetRange(tp1, 3, 6, None),
152+
KafkaOffsetRange(tp1, 6, 10, None),
153+
KafkaOffsetRange(tp2, 0, 3, None),
154+
KafkaOffsetRange(tp2, 3, 6, None),
155+
KafkaOffsetRange(tp2, 6, 10, None),
156+
KafkaOffsetRange(tp3, 0, 1, None)))
157+
}
158+
144159
private val tp1 = new TopicPartition("t1", 1)
145160
private val tp2 = new TopicPartition("t2", 1)
146161
private val tp3 = new TopicPartition("t3", 1)

0 commit comments

Comments
 (0)