Skip to content

feat: transaction batcher module #1348

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 25 commits into from
Feb 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions executions/graphql-kotlin-transaction-batcher/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
description = "Transaction Batcher"

val reactiveStreamsVersion: String by project
val junitVersion: String by project
val slf4jVersion: String by project
val reactorVersion: String by project
val reactorExtensionsVersion: String by project

dependencies {
implementation("org.reactivestreams:reactive-streams:$reactiveStreamsVersion")
implementation("org.slf4j:slf4j-api:$slf4jVersion")
testImplementation("io.projectreactor.kotlin:reactor-kotlin-extensions:$reactorExtensionsVersion")
testImplementation("io.projectreactor:reactor-core:$reactorVersion")
testImplementation("io.projectreactor:reactor-test:$reactorVersion")
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
testImplementation("org.junit.jupiter:junit-jupiter-engine:$junitVersion")
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.publisher

import com.expediagroup.graphql.transactionbatcher.transaction.BatchableTransaction
import com.expediagroup.graphql.transactionbatcher.transaction.cache.TransactionBatcherCache
import org.reactivestreams.Publisher
import org.reactivestreams.Subscriber
import org.reactivestreams.Subscription
import org.slf4j.LoggerFactory

/**
* Interface representing a publisher with input [TInput] type and output [TOutput] type
*/
@Suppress(
"ReactiveStreamsSubscriberImplementation",
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why are we suppressing this warning? is this valid?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is valid, given that Subscriber is an interface, so it's valid to implement it, However a warning appears in the IDE when trying to do it.

i can see a lot of Subscriber implementations that need to add that suppress as well
https://github.com/search?p=2&q=%22ReactiveStreamsSubscriberImplementation%22&type=Code

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

unsure, I am not getting any warning in my local

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

image

"UNCHECKED_CAST"
)
fun interface TriggeredPublisher<TInput, TOutput> {
/**
* Given an input of type [TInput] create a cold [Publisher] that will produce a [Publisher] of type [TOutput] of n elements
* that maps to the size of the input [List] of [TInput]
* order is important so make sure to produce elements in the same order of [input]
*/
fun produce(input: List<TInput>): Publisher<TOutput>

/**
* Attempts to collect values from [cache] first and then [produce]
*
* Example:
* if [TriggeredPublisher] is of type <Int, Int> and [cache] resolves [1, null, 3, null, 5, 6]
* we will attempt to produce elements for index 1 and 3
* when [produce] stream completes we will complete futures from either values resolved from [cache] or from [produce]
*/
fun trigger(
batchableTransactions: List<BatchableTransaction<TInput, TOutput>>,
cache: TransactionBatcherCache
) {
val transactionsNotInCache = batchableTransactions
.filterNot { transaction -> cache.contains(transaction.key) }
.map(BatchableTransaction<TInput, TOutput>::input)

produce(
transactionsNotInCache
).subscribe(
object : Subscriber<TOutput> {
private lateinit var subscription: Subscription
private val transactionResults = mutableListOf<TOutput>()

override fun onSubscribe(subscription: Subscription) {
this.subscription = subscription
this.subscription.request(1)
}

override fun onNext(result: TOutput) {
transactionResults += result
this.subscription.request(1)
}

override fun onError(throwable: Throwable) {
logger.error("Error while producing data", throwable)
}

override fun onComplete() {
val resultsIterator = transactionResults.iterator()
batchableTransactions.forEach { transaction ->
val result = cache.get(transaction.key) as? TOutput ?: resultsIterator.next().also { result ->
cache.set(transaction.key, result as Any)
}
transaction.future.complete(result)
}
}
}
)
}

companion object {
private val logger = LoggerFactory.getLogger(TriggeredPublisher::class.java)
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.transaction

import java.util.concurrent.CompletableFuture

/**
* convenient class to store the reference of a [future] of type [TOutput]
* that will be resolved asynchronously at later point in time by using [input] as source
* it supports deduplication by using the [key] field
*/
data class BatchableTransaction<TInput, TOutput>(
val input: TInput,
val future: CompletableFuture<TOutput>,
val key: String
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.transaction

import com.expediagroup.graphql.transactionbatcher.publisher.TriggeredPublisher
import com.expediagroup.graphql.transactionbatcher.transaction.cache.DefaultTransactionBatcherCache
import com.expediagroup.graphql.transactionbatcher.transaction.cache.TransactionBatcherCache
import java.util.concurrent.CompletableFuture
import java.util.concurrent.ConcurrentHashMap
import kotlin.reflect.KClass

/**
* Holds logic to apply batching, deduplication and caching of [BatchableTransaction]
* if no [TransactionBatcherCache] implementation is provided it will use [DefaultTransactionBatcherCache]
*/
class TransactionBatcher(
private val cache: TransactionBatcherCache = DefaultTransactionBatcherCache()
) {

val batch = ConcurrentHashMap<
KClass<out TriggeredPublisher<Any, Any>>,
TriggeredPublisherTransactions
>()

/**
* adds a transaction [input] to the batch along with the [triggeredPublisher] instance that will receive the [BatchableTransaction]
* deduplication will be based on [transactionKey] which by default is the toString() representation of [input]
* batching will be based on the implementation of [TriggeredPublisher]
* this method returns a reference to a [CompletableFuture] which is a field of the [BatchableTransaction] that was just
* added into the queue
*/
@Suppress("UNCHECKED_CAST")
fun <TInput : Any, TOutput : Any> batch(
input: TInput,
transactionKey: String = input.toString(),
triggeredPublisher: TriggeredPublisher<TInput, TOutput>
): CompletableFuture<TOutput> {
val publisherClass = (triggeredPublisher as TriggeredPublisher<Any, Any>)::class
var future = CompletableFuture<TOutput>()
batch.computeIfPresent(publisherClass) { _, publisherTransactions ->
val transaction = publisherTransactions.transactions.computeIfAbsent(transactionKey) {
BatchableTransaction(
input,
future as CompletableFuture<Any>,
transactionKey
)
}
future = transaction.future as CompletableFuture<TOutput>
publisherTransactions
}
batch.computeIfAbsent(publisherClass) {
TriggeredPublisherTransactions(
triggeredPublisher,
linkedMapOf(
transactionKey to BatchableTransaction(
input,
future as CompletableFuture<Any>,
transactionKey
)
)
)
}
return future
}

/**
* Trigger concurrently and asynchronously the instances of [TriggeredPublisher] that the [batch] holds
* at the end clear the queue
*/
@Synchronized fun dispatch() {
batch.values.forEach { (triggeredPublisher, transactions) ->
triggeredPublisher.trigger(transactions.values.toList(), cache)
}
batch.clear()
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.transaction

import com.expediagroup.graphql.transactionbatcher.publisher.TriggeredPublisher

/**
* Type for [TransactionBatcher.batch] value, storing the [triggeredPublisher] instance
* and the [transactions] that need to be executed by it
*/
data class TriggeredPublisherTransactions(
val triggeredPublisher: TriggeredPublisher<Any, Any>,
val transactions: LinkedHashMap<String, BatchableTransaction<Any, Any>>
)
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.transaction.cache

import java.util.concurrent.ConcurrentHashMap

/**
* Default implementation of [TransactionBatcherCache] using an in memory [cache]
* without eviction
*/
class DefaultTransactionBatcherCache : TransactionBatcherCache {
private val cache = ConcurrentHashMap<String, Any>()

override fun set(key: String, value: Any) {
cache[key] = value
}

override fun get(key: String): Any? = cache[key]

override fun contains(key: String): Boolean = cache.containsKey(key)
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.transaction.cache

/**
* Interface that allows any cache implementation that can be used by the TransactionBatcher
* by default will use [DefaultTransactionBatcherCache]
*/
interface TransactionBatcherCache {
fun set(key: String, value: Any)
fun get(key: String): Any?
fun contains(key: String): Boolean
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
/*
* Copyright 2022 Expedia, Inc
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.expediagroup.graphql.transactionbatcher.publisher

import com.expediagroup.graphql.transactionbatcher.transaction.TransactionBatcher
import reactor.core.publisher.Mono
import reactor.kotlin.core.publisher.toFlux
import reactor.kotlin.core.publisher.toMono
import java.time.Duration
import java.util.concurrent.atomic.AtomicInteger

data class AstronautServiceRequest(val id: Int)
data class Astronaut(val id: Int, val name: String)

class AstronautService(
private val transactionBatcher: TransactionBatcher
) {

val produceArguments: MutableList<List<AstronautServiceRequest>> = mutableListOf()
val getAstronautCallCount: AtomicInteger = AtomicInteger(0)

fun getAstronaut(request: AstronautServiceRequest): Mono<Astronaut> {
getAstronautCallCount.incrementAndGet()
val future = this.transactionBatcher.batch(request) { input: List<AstronautServiceRequest> ->
produceArguments.add(input)
input.toFlux()
.flatMapSequential { request ->
{ astronauts[request.id] }
.toMono()
.flatMap { (astronaut, delay) ->
astronaut.toMono().delayElement(delay)
}
}
}
return future.toMono()
}

companion object {
private val astronauts = mapOf(
1 to Pair(Astronaut(1, "Buzz Aldrin"), Duration.ofMillis(300)),
2 to Pair(Astronaut(2, "William Anders"), Duration.ofMillis(600)),
3 to Pair(Astronaut(3, "Neil Armstrong"), Duration.ofMillis(200))
)
}
}
Loading