Skip to content

PrefetchingResultSetIterator: fix bug w/r/t intermediate zero-row pages #1375

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 1 commit into
base: trunk
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
PrefetchingResultSetIterator: fix bug w/r/t intermediate zero-row pages
Zero row pages (that aren't the last) can sometimes happen, for example when
using "filtering".

The iterator has a logic bug which makes it throw an exception when encountering
this.

Fix this issue with minimal code changes and add associated unit test.
  • Loading branch information
eappere committed Feb 3, 2025
commit 93a0d4cd049c2e4a0ca240fbdcae44a5d9d3a8c7
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,10 @@ class PrefetchingResultSetIterator(resultSet: AsyncResultSet, timer: Option[Time
}

private def maybePrefetch(): Unit = {
if (!currentIterator.hasNext && currentResultSet.hasMorePages) {
// It is sometimes possible to see pages have zero elements that are
// not the last, hence iterating until we get at least one element is
// needed
while (!currentIterator.hasNext && currentResultSet.hasMorePages) {
currentResultSet = Await.result(nextResultSet.get, Duration.Inf)
currentIterator = currentResultSet.currentPage().iterator()
nextResultSet = fetchNextPage()
Expand All @@ -48,8 +51,16 @@ class PrefetchingResultSetIterator(resultSet: AsyncResultSet, timer: Option[Time
currentIterator.hasNext || currentResultSet.hasMorePages

override def next(): Row = {
val row = currentIterator.next() // let's try to exhaust the current iterator first
val row = currentIterator.next()

// This must be called after the call to next() and not before,
// so that hasNext returns is a correct result
maybePrefetch()
row
}

// It is possible to have empty pages at the start of the query result,
// for example when using "filtering"; discard those and ensure hasNext
// always returns the correct result
maybePrefetch()
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package com.datastax.spark.connector.rdd.reader

import com.datastax.oss.driver.api.core.cql.{AsyncResultSet, Row}
import org.mockito.Mockito._
import org.scalatest.{FlatSpec, Matchers}
import org.scalatestplus.mockito.MockitoSugar

import java.util.concurrent.CompletableFuture
import scala.jdk.CollectionConverters._

class PrefetchingResultSetIteratorSpec extends FlatSpec with Matchers with MockitoSugar {

"PrefetchingResultSetIterator" should "handle empty pages that are not the last" in {
val row1 = mock[Row]
val row2 = mock[Row]
val row3 = mock[Row]

val asyncResultSet1 = mock[AsyncResultSet]
val asyncResultSet2 = mock[AsyncResultSet]
val asyncResultSet3 = mock[AsyncResultSet]
val asyncResultSet4 = mock[AsyncResultSet]
val asyncResultSet5 = mock[AsyncResultSet]

// First page is empty
when(asyncResultSet1.currentPage()).thenReturn(Seq.empty[Row].asJava)
when(asyncResultSet1.hasMorePages()).thenReturn(true)
when(asyncResultSet1.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet2))

// Second page has data
when(asyncResultSet2.currentPage()).thenReturn(Seq(row1).asJava)
when(asyncResultSet2.hasMorePages()).thenReturn(true)
when(asyncResultSet2.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet3))

// Third page is empty
when(asyncResultSet3.currentPage()).thenReturn(Seq.empty[Row].asJava)
when(asyncResultSet3.hasMorePages()).thenReturn(true)
when(asyncResultSet3.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet4))

// Fourth page has data
when(asyncResultSet4.currentPage()).thenReturn(Seq(row2, row3).asJava)
when(asyncResultSet4.hasMorePages()).thenReturn(true)
when(asyncResultSet4.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet5))

// Last page is empty
when(asyncResultSet5.currentPage()).thenReturn(Seq.empty[Row].asJava)
when(asyncResultSet5.hasMorePages()).thenReturn(false)

val iterator = new PrefetchingResultSetIterator(asyncResultSet1)

val rows = iterator.toList

rows should contain theSameElementsInOrderAs Seq(row1, row2, row3)

verify(asyncResultSet1).fetchNextPage()
verify(asyncResultSet2).fetchNextPage()
verify(asyncResultSet3).fetchNextPage()
verify(asyncResultSet4).fetchNextPage()
verify(asyncResultSet5, never()).fetchNextPage()
}

it should "handle a result made of empty pages only" in {
val asyncResultSet1 = mock[AsyncResultSet]
val asyncResultSet2 = mock[AsyncResultSet]
val asyncResultSet3 = mock[AsyncResultSet]

// First page is empty
when(asyncResultSet1.currentPage()).thenReturn(Seq.empty[Row].asJava)
when(asyncResultSet1.hasMorePages()).thenReturn(true)
when(asyncResultSet1.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet2))

// Second page is empty
when(asyncResultSet2.currentPage()).thenReturn(Seq.empty[Row].asJava)
when(asyncResultSet2.hasMorePages()).thenReturn(true)
when(asyncResultSet2.fetchNextPage()).thenReturn(CompletableFuture.completedFuture(asyncResultSet3))

// Last page is empty
when(asyncResultSet3.currentPage()).thenReturn(Seq.empty[Row].asJava)
when(asyncResultSet3.hasMorePages()).thenReturn(false)

val iterator = new PrefetchingResultSetIterator(asyncResultSet1)

val rows = iterator.toList

rows should contain theSameElementsInOrderAs Seq.empty[Row]

verify(asyncResultSet1).fetchNextPage()
verify(asyncResultSet2).fetchNextPage()
verify(asyncResultSet3, never()).fetchNextPage()
}
}