-
Notifications
You must be signed in to change notification settings - Fork 362
feat: transaction batcher module #1348
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
dariuszkuc
merged 25 commits into
ExpediaGroup:master
from
samuelAndalon:feat/transaction-batcher
Feb 23, 2022
Merged
Changes from all commits
Commits
Show all changes
25 commits
Select commit
Hold shift + click to select a range
586a943
feat: transaction batcher module
18d8f49
feat: resolve from cache and transaction
6c159bb
feat: unused import
be0656b
feat: address ktlint warnings
3f0da49
feat: TriggeredPublisher as an interface
d1b3259
feat: rename module
8c48c7f
feat: add comments
3d471eb
feat: update javadocs
e63ad23
feat: address comments
e35e4cb
feat: simplify example usages
eddea02
feat: pull master
bade1ab
feat: use functional interface for TriggeredPublisher
dca15eb
feat: merge master
80c3d1e
feat: renaming batch key
1175148
feat: use atomic methods of ConcurrentHashMap
ddda975
feat: rename queue key
8e668f9
Merge branch 'master' of https://github.com/ExpediaGroup/graphql-kotl…
434e071
feat: fix test
03d8e7b
feat: Synchroninzed dispatch method
d4560aa
feat: address comments and improve tests readability
30bce0a
feat: simplify TriggerePublisher onComplete
694bce6
feat: use toList to get transactions
8bbad78
feat: use computeIfAbsent for deduping and adding existing transactions
ba39eed
feat: add most simple test use case
fd29a98
feat: avoid using scope function
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
17 changes: 17 additions & 0 deletions
17
executions/graphql-kotlin-transaction-batcher/build.gradle.kts
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
description = "Transaction Batcher" | ||
|
||
val reactiveStreamsVersion: String by project | ||
val junitVersion: String by project | ||
val slf4jVersion: String by project | ||
val reactorVersion: String by project | ||
val reactorExtensionsVersion: String by project | ||
|
||
dependencies { | ||
implementation("org.reactivestreams:reactive-streams:$reactiveStreamsVersion") | ||
implementation("org.slf4j:slf4j-api:$slf4jVersion") | ||
testImplementation("io.projectreactor.kotlin:reactor-kotlin-extensions:$reactorExtensionsVersion") | ||
testImplementation("io.projectreactor:reactor-core:$reactorVersion") | ||
testImplementation("io.projectreactor:reactor-test:$reactorVersion") | ||
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion") | ||
testImplementation("org.junit.jupiter:junit-jupiter-engine:$junitVersion") | ||
} |
94 changes: 94 additions & 0 deletions
94
...c/main/kotlin/com/expediagroup/graphql/transactionbatcher/publisher/TriggeredPublisher.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,94 @@ | ||
/* | ||
* Copyright 2022 Expedia, Inc | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.expediagroup.graphql.transactionbatcher.publisher | ||
|
||
import com.expediagroup.graphql.transactionbatcher.transaction.BatchableTransaction | ||
import com.expediagroup.graphql.transactionbatcher.transaction.cache.TransactionBatcherCache | ||
import org.reactivestreams.Publisher | ||
import org.reactivestreams.Subscriber | ||
import org.reactivestreams.Subscription | ||
import org.slf4j.LoggerFactory | ||
|
||
/** | ||
* Interface representing a publisher with input [TInput] type and output [TOutput] type | ||
*/ | ||
@Suppress( | ||
"ReactiveStreamsSubscriberImplementation", | ||
"UNCHECKED_CAST" | ||
) | ||
fun interface TriggeredPublisher<TInput, TOutput> { | ||
/** | ||
* Given an input of type [TInput] create a cold [Publisher] that will produce a [Publisher] of type [TOutput] of n elements | ||
* that maps to the size of the input [List] of [TInput] | ||
* order is important so make sure to produce elements in the same order of [input] | ||
*/ | ||
fun produce(input: List<TInput>): Publisher<TOutput> | ||
|
||
/** | ||
* Attempts to collect values from [cache] first and then [produce] | ||
* | ||
* Example: | ||
* if [TriggeredPublisher] is of type <Int, Int> and [cache] resolves [1, null, 3, null, 5, 6] | ||
* we will attempt to produce elements for index 1 and 3 | ||
* when [produce] stream completes we will complete futures from either values resolved from [cache] or from [produce] | ||
*/ | ||
fun trigger( | ||
batchableTransactions: List<BatchableTransaction<TInput, TOutput>>, | ||
cache: TransactionBatcherCache | ||
) { | ||
val transactionsNotInCache = batchableTransactions | ||
.filterNot { transaction -> cache.contains(transaction.key) } | ||
.map(BatchableTransaction<TInput, TOutput>::input) | ||
|
||
produce( | ||
transactionsNotInCache | ||
).subscribe( | ||
object : Subscriber<TOutput> { | ||
private lateinit var subscription: Subscription | ||
private val transactionResults = mutableListOf<TOutput>() | ||
|
||
override fun onSubscribe(subscription: Subscription) { | ||
this.subscription = subscription | ||
this.subscription.request(1) | ||
} | ||
|
||
override fun onNext(result: TOutput) { | ||
transactionResults += result | ||
this.subscription.request(1) | ||
} | ||
|
||
override fun onError(throwable: Throwable) { | ||
logger.error("Error while producing data", throwable) | ||
} | ||
|
||
override fun onComplete() { | ||
val resultsIterator = transactionResults.iterator() | ||
batchableTransactions.forEach { transaction -> | ||
val result = cache.get(transaction.key) as? TOutput ?: resultsIterator.next().also { result -> | ||
cache.set(transaction.key, result as Any) | ||
} | ||
transaction.future.complete(result) | ||
} | ||
} | ||
} | ||
) | ||
} | ||
|
||
companion object { | ||
private val logger = LoggerFactory.getLogger(TriggeredPublisher::class.java) | ||
} | ||
} |
30 changes: 30 additions & 0 deletions
30
...in/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/BatchableTransaction.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,30 @@ | ||
/* | ||
* Copyright 2022 Expedia, Inc | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.expediagroup.graphql.transactionbatcher.transaction | ||
|
||
import java.util.concurrent.CompletableFuture | ||
|
||
/** | ||
* convenient class to store the reference of a [future] of type [TOutput] | ||
* that will be resolved asynchronously at later point in time by using [input] as source | ||
* it supports deduplication by using the [key] field | ||
*/ | ||
data class BatchableTransaction<TInput, TOutput>( | ||
val input: TInput, | ||
val future: CompletableFuture<TOutput>, | ||
val key: String | ||
) |
90 changes: 90 additions & 0 deletions
90
...main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/TransactionBatcher.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,90 @@ | ||
/* | ||
* Copyright 2022 Expedia, Inc | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.expediagroup.graphql.transactionbatcher.transaction | ||
|
||
import com.expediagroup.graphql.transactionbatcher.publisher.TriggeredPublisher | ||
import com.expediagroup.graphql.transactionbatcher.transaction.cache.DefaultTransactionBatcherCache | ||
import com.expediagroup.graphql.transactionbatcher.transaction.cache.TransactionBatcherCache | ||
import java.util.concurrent.CompletableFuture | ||
import java.util.concurrent.ConcurrentHashMap | ||
import kotlin.reflect.KClass | ||
|
||
/** | ||
* Holds logic to apply batching, deduplication and caching of [BatchableTransaction] | ||
* if no [TransactionBatcherCache] implementation is provided it will use [DefaultTransactionBatcherCache] | ||
*/ | ||
class TransactionBatcher( | ||
private val cache: TransactionBatcherCache = DefaultTransactionBatcherCache() | ||
) { | ||
|
||
val batch = ConcurrentHashMap< | ||
KClass<out TriggeredPublisher<Any, Any>>, | ||
TriggeredPublisherTransactions | ||
>() | ||
|
||
/** | ||
* adds a transaction [input] to the batch along with the [triggeredPublisher] instance that will receive the [BatchableTransaction] | ||
* deduplication will be based on [transactionKey] which by default is the toString() representation of [input] | ||
* batching will be based on the implementation of [TriggeredPublisher] | ||
* this method returns a reference to a [CompletableFuture] which is a field of the [BatchableTransaction] that was just | ||
* added into the queue | ||
*/ | ||
@Suppress("UNCHECKED_CAST") | ||
fun <TInput : Any, TOutput : Any> batch( | ||
input: TInput, | ||
transactionKey: String = input.toString(), | ||
triggeredPublisher: TriggeredPublisher<TInput, TOutput> | ||
): CompletableFuture<TOutput> { | ||
val publisherClass = (triggeredPublisher as TriggeredPublisher<Any, Any>)::class | ||
var future = CompletableFuture<TOutput>() | ||
batch.computeIfPresent(publisherClass) { _, publisherTransactions -> | ||
val transaction = publisherTransactions.transactions.computeIfAbsent(transactionKey) { | ||
BatchableTransaction( | ||
input, | ||
future as CompletableFuture<Any>, | ||
transactionKey | ||
) | ||
} | ||
future = transaction.future as CompletableFuture<TOutput> | ||
publisherTransactions | ||
} | ||
batch.computeIfAbsent(publisherClass) { | ||
TriggeredPublisherTransactions( | ||
triggeredPublisher, | ||
linkedMapOf( | ||
transactionKey to BatchableTransaction( | ||
input, | ||
future as CompletableFuture<Any>, | ||
transactionKey | ||
) | ||
) | ||
) | ||
} | ||
return future | ||
} | ||
|
||
/** | ||
* Trigger concurrently and asynchronously the instances of [TriggeredPublisher] that the [batch] holds | ||
* at the end clear the queue | ||
*/ | ||
@Synchronized fun dispatch() { | ||
batch.values.forEach { (triggeredPublisher, transactions) -> | ||
triggeredPublisher.trigger(transactions.values.toList(), cache) | ||
} | ||
batch.clear() | ||
} | ||
} |
28 changes: 28 additions & 0 deletions
28
...com/expediagroup/graphql/transactionbatcher/transaction/TriggeredPublisherTransactions.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,28 @@ | ||
/* | ||
* Copyright 2022 Expedia, Inc | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.expediagroup.graphql.transactionbatcher.transaction | ||
|
||
import com.expediagroup.graphql.transactionbatcher.publisher.TriggeredPublisher | ||
|
||
/** | ||
* Type for [TransactionBatcher.batch] value, storing the [triggeredPublisher] instance | ||
* and the [transactions] that need to be executed by it | ||
*/ | ||
data class TriggeredPublisherTransactions( | ||
val triggeredPublisher: TriggeredPublisher<Any, Any>, | ||
val transactions: LinkedHashMap<String, BatchableTransaction<Any, Any>> | ||
) |
35 changes: 35 additions & 0 deletions
35
...pediagroup/graphql/transactionbatcher/transaction/cache/DefaultTransactionBatcherCache.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,35 @@ | ||
/* | ||
* Copyright 2022 Expedia, Inc | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.expediagroup.graphql.transactionbatcher.transaction.cache | ||
|
||
import java.util.concurrent.ConcurrentHashMap | ||
|
||
/** | ||
* Default implementation of [TransactionBatcherCache] using an in memory [cache] | ||
* without eviction | ||
*/ | ||
class DefaultTransactionBatcherCache : TransactionBatcherCache { | ||
private val cache = ConcurrentHashMap<String, Any>() | ||
|
||
override fun set(key: String, value: Any) { | ||
cache[key] = value | ||
} | ||
|
||
override fun get(key: String): Any? = cache[key] | ||
|
||
override fun contains(key: String): Boolean = cache.containsKey(key) | ||
} |
27 changes: 27 additions & 0 deletions
27
.../com/expediagroup/graphql/transactionbatcher/transaction/cache/TransactionBatcherCache.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,27 @@ | ||
/* | ||
* Copyright 2022 Expedia, Inc | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.expediagroup.graphql.transactionbatcher.transaction.cache | ||
|
||
/** | ||
* Interface that allows any cache implementation that can be used by the TransactionBatcher | ||
* by default will use [DefaultTransactionBatcherCache] | ||
*/ | ||
interface TransactionBatcherCache { | ||
fun set(key: String, value: Any) | ||
fun get(key: String): Any? | ||
fun contains(key: String): Boolean | ||
} |
59 changes: 59 additions & 0 deletions
59
...src/test/kotlin/com/expediagroup/graphql/transactionbatcher/publisher/AstronautService.kt
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,59 @@ | ||
/* | ||
* Copyright 2022 Expedia, Inc | ||
* | ||
* Licensed under the Apache License, Version 2.0 (the "License"); | ||
* you may not use this file except in compliance with the License. | ||
* You may obtain a copy of the License at | ||
* | ||
* https://www.apache.org/licenses/LICENSE-2.0 | ||
* | ||
* Unless required by applicable law or agreed to in writing, software | ||
* distributed under the License is distributed on an "AS IS" BASIS, | ||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
* See the License for the specific language governing permissions and | ||
* limitations under the License. | ||
*/ | ||
|
||
package com.expediagroup.graphql.transactionbatcher.publisher | ||
|
||
import com.expediagroup.graphql.transactionbatcher.transaction.TransactionBatcher | ||
import reactor.core.publisher.Mono | ||
import reactor.kotlin.core.publisher.toFlux | ||
import reactor.kotlin.core.publisher.toMono | ||
import java.time.Duration | ||
import java.util.concurrent.atomic.AtomicInteger | ||
|
||
data class AstronautServiceRequest(val id: Int) | ||
data class Astronaut(val id: Int, val name: String) | ||
|
||
class AstronautService( | ||
private val transactionBatcher: TransactionBatcher | ||
) { | ||
|
||
val produceArguments: MutableList<List<AstronautServiceRequest>> = mutableListOf() | ||
val getAstronautCallCount: AtomicInteger = AtomicInteger(0) | ||
|
||
fun getAstronaut(request: AstronautServiceRequest): Mono<Astronaut> { | ||
getAstronautCallCount.incrementAndGet() | ||
val future = this.transactionBatcher.batch(request) { input: List<AstronautServiceRequest> -> | ||
produceArguments.add(input) | ||
input.toFlux() | ||
.flatMapSequential { request -> | ||
{ astronauts[request.id] } | ||
.toMono() | ||
.flatMap { (astronaut, delay) -> | ||
astronaut.toMono().delayElement(delay) | ||
} | ||
} | ||
} | ||
return future.toMono() | ||
} | ||
|
||
companion object { | ||
samuelAndalon marked this conversation as resolved.
Show resolved
Hide resolved
|
||
private val astronauts = mapOf( | ||
1 to Pair(Astronaut(1, "Buzz Aldrin"), Duration.ofMillis(300)), | ||
2 to Pair(Astronaut(2, "William Anders"), Duration.ofMillis(600)), | ||
3 to Pair(Astronaut(3, "Neil Armstrong"), Duration.ofMillis(200)) | ||
) | ||
} | ||
} |
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
why are we suppressing this warning? is this valid?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
it is valid, given that
Subscriber
is an interface, so it's valid to implement it, However a warning appears in the IDE when trying to do it.i can see a lot of
Subscriber
implementations that need to add that suppress as wellhttps://github.com/search?p=2&q=%22ReactiveStreamsSubscriberImplementation%22&type=Code
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unsure, I am not getting any warning in my local
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.