Skip to content

Add time windowed chunking, groupedWithin #185

Open
@Dogacel

Description

@Dogacel

Similar to groupedWithin in Akka, I think this is a very useful utility to have.

Akka Docs

Proposed solution

fun Flow<T>.groupedWithin(size: Int, limit: Duration): Flow<List<T>> { ... }

implementation can be based on Kotlin/kotlinx.coroutines#1290 (comment)

I have modified that code slightly, I can help work on a solution based on a channel flow.

Behavior

  • Once flow reaches size items, it emits.
  • If flow can't reach size items within limit time, it emits the items collected until now, unless there is none.

Why
This is very useful when we are bridging the gap between the regular APIs and streaming APIs. For example, assume you have an API to fetch SQS messages, traditionally you would implement it as

suspend fun main() {
  val sqsClient = getSqsClient()


  while (true) {
    val items = sqsClient.poll(10)
    process(items)
    delay(10.seconds)
  }
}

instead, we can use

suspend fun main() {
  val sqsClient = getSqsClient()

  flow {
    while (true) {
      val items = sqsClient.poll(10)
      items.forEach { emit(it) }
      delay(10.seconds)
    }
  }.groupedWithin(128, 30.seconds) {
    process(it)
  }
}

Metadata

Metadata

Assignees

No one assigned

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions