Open
Description
Similar to groupedWithin
in Akka, I think this is a very useful utility to have.
Proposed solution
fun Flow<T>.groupedWithin(size: Int, limit: Duration): Flow<List<T>> { ... }
implementation can be based on Kotlin/kotlinx.coroutines#1290 (comment)
I have modified that code slightly, I can help work on a solution based on a channel flow.
Behavior
- Once flow reaches size items, it emits.
- If flow can't reach size items within limit time, it emits the items collected until now, unless there is none.
Why
This is very useful when we are bridging the gap between the regular APIs and streaming APIs. For example, assume you have an API to fetch SQS messages, traditionally you would implement it as
suspend fun main() {
val sqsClient = getSqsClient()
while (true) {
val items = sqsClient.poll(10)
process(items)
delay(10.seconds)
}
}
instead, we can use
suspend fun main() {
val sqsClient = getSqsClient()
flow {
while (true) {
val items = sqsClient.poll(10)
items.forEach { emit(it) }
delay(10.seconds)
}
}.groupedWithin(128, 30.seconds) {
process(it)
}
}