Skip to content

Commit 4591fde

Browse files
feat: transaction batcher module (ExpediaGroup#1348)
### 📝 Description `transaction-batcher` module is an alternative to `data-loaders`, using reactive stream to apply batching and deduplication of transactions needed by data fetchers, the implementation is completely agnostic of GraphQL and compatible with any asynchronous approach used to resolve data. it also supports caching to avoid making a request to a previous completed transaction.
1 parent 5c8bf5b commit 4591fde

File tree

12 files changed

+669
-0
lines changed

12 files changed

+669
-0
lines changed
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
description = "Transaction Batcher"
2+
3+
val reactiveStreamsVersion: String by project
4+
val junitVersion: String by project
5+
val slf4jVersion: String by project
6+
val reactorVersion: String by project
7+
val reactorExtensionsVersion: String by project
8+
9+
dependencies {
10+
implementation("org.reactivestreams:reactive-streams:$reactiveStreamsVersion")
11+
implementation("org.slf4j:slf4j-api:$slf4jVersion")
12+
testImplementation("io.projectreactor.kotlin:reactor-kotlin-extensions:$reactorExtensionsVersion")
13+
testImplementation("io.projectreactor:reactor-core:$reactorVersion")
14+
testImplementation("io.projectreactor:reactor-test:$reactorVersion")
15+
testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion")
16+
testImplementation("org.junit.jupiter:junit-jupiter-engine:$junitVersion")
17+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,94 @@
1+
/*
2+
* Copyright 2022 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.transactionbatcher.publisher
18+
19+
import com.expediagroup.graphql.transactionbatcher.transaction.BatchableTransaction
20+
import com.expediagroup.graphql.transactionbatcher.transaction.cache.TransactionBatcherCache
21+
import org.reactivestreams.Publisher
22+
import org.reactivestreams.Subscriber
23+
import org.reactivestreams.Subscription
24+
import org.slf4j.LoggerFactory
25+
26+
/**
27+
* Interface representing a publisher with input [TInput] type and output [TOutput] type
28+
*/
29+
@Suppress(
30+
"ReactiveStreamsSubscriberImplementation",
31+
"UNCHECKED_CAST"
32+
)
33+
fun interface TriggeredPublisher<TInput, TOutput> {
34+
/**
35+
* Given an input of type [TInput] create a cold [Publisher] that will produce a [Publisher] of type [TOutput] of n elements
36+
* that maps to the size of the input [List] of [TInput]
37+
* order is important so make sure to produce elements in the same order of [input]
38+
*/
39+
fun produce(input: List<TInput>): Publisher<TOutput>
40+
41+
/**
42+
* Attempts to collect values from [cache] first and then [produce]
43+
*
44+
* Example:
45+
* if [TriggeredPublisher] is of type <Int, Int> and [cache] resolves [1, null, 3, null, 5, 6]
46+
* we will attempt to produce elements for index 1 and 3
47+
* when [produce] stream completes we will complete futures from either values resolved from [cache] or from [produce]
48+
*/
49+
fun trigger(
50+
batchableTransactions: List<BatchableTransaction<TInput, TOutput>>,
51+
cache: TransactionBatcherCache
52+
) {
53+
val transactionsNotInCache = batchableTransactions
54+
.filterNot { transaction -> cache.contains(transaction.key) }
55+
.map(BatchableTransaction<TInput, TOutput>::input)
56+
57+
produce(
58+
transactionsNotInCache
59+
).subscribe(
60+
object : Subscriber<TOutput> {
61+
private lateinit var subscription: Subscription
62+
private val transactionResults = mutableListOf<TOutput>()
63+
64+
override fun onSubscribe(subscription: Subscription) {
65+
this.subscription = subscription
66+
this.subscription.request(1)
67+
}
68+
69+
override fun onNext(result: TOutput) {
70+
transactionResults += result
71+
this.subscription.request(1)
72+
}
73+
74+
override fun onError(throwable: Throwable) {
75+
logger.error("Error while producing data", throwable)
76+
}
77+
78+
override fun onComplete() {
79+
val resultsIterator = transactionResults.iterator()
80+
batchableTransactions.forEach { transaction ->
81+
val result = cache.get(transaction.key) as? TOutput ?: resultsIterator.next().also { result ->
82+
cache.set(transaction.key, result as Any)
83+
}
84+
transaction.future.complete(result)
85+
}
86+
}
87+
}
88+
)
89+
}
90+
91+
companion object {
92+
private val logger = LoggerFactory.getLogger(TriggeredPublisher::class.java)
93+
}
94+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2022 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.transactionbatcher.transaction
18+
19+
import java.util.concurrent.CompletableFuture
20+
21+
/**
22+
* convenient class to store the reference of a [future] of type [TOutput]
23+
* that will be resolved asynchronously at later point in time by using [input] as source
24+
* it supports deduplication by using the [key] field
25+
*/
26+
data class BatchableTransaction<TInput, TOutput>(
27+
val input: TInput,
28+
val future: CompletableFuture<TOutput>,
29+
val key: String
30+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,90 @@
1+
/*
2+
* Copyright 2022 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.transactionbatcher.transaction
18+
19+
import com.expediagroup.graphql.transactionbatcher.publisher.TriggeredPublisher
20+
import com.expediagroup.graphql.transactionbatcher.transaction.cache.DefaultTransactionBatcherCache
21+
import com.expediagroup.graphql.transactionbatcher.transaction.cache.TransactionBatcherCache
22+
import java.util.concurrent.CompletableFuture
23+
import java.util.concurrent.ConcurrentHashMap
24+
import kotlin.reflect.KClass
25+
26+
/**
27+
* Holds logic to apply batching, deduplication and caching of [BatchableTransaction]
28+
* if no [TransactionBatcherCache] implementation is provided it will use [DefaultTransactionBatcherCache]
29+
*/
30+
class TransactionBatcher(
31+
private val cache: TransactionBatcherCache = DefaultTransactionBatcherCache()
32+
) {
33+
34+
val batch = ConcurrentHashMap<
35+
KClass<out TriggeredPublisher<Any, Any>>,
36+
TriggeredPublisherTransactions
37+
>()
38+
39+
/**
40+
* adds a transaction [input] to the batch along with the [triggeredPublisher] instance that will receive the [BatchableTransaction]
41+
* deduplication will be based on [transactionKey] which by default is the toString() representation of [input]
42+
* batching will be based on the implementation of [TriggeredPublisher]
43+
* this method returns a reference to a [CompletableFuture] which is a field of the [BatchableTransaction] that was just
44+
* added into the queue
45+
*/
46+
@Suppress("UNCHECKED_CAST")
47+
fun <TInput : Any, TOutput : Any> batch(
48+
input: TInput,
49+
transactionKey: String = input.toString(),
50+
triggeredPublisher: TriggeredPublisher<TInput, TOutput>
51+
): CompletableFuture<TOutput> {
52+
val publisherClass = (triggeredPublisher as TriggeredPublisher<Any, Any>)::class
53+
var future = CompletableFuture<TOutput>()
54+
batch.computeIfPresent(publisherClass) { _, publisherTransactions ->
55+
val transaction = publisherTransactions.transactions.computeIfAbsent(transactionKey) {
56+
BatchableTransaction(
57+
input,
58+
future as CompletableFuture<Any>,
59+
transactionKey
60+
)
61+
}
62+
future = transaction.future as CompletableFuture<TOutput>
63+
publisherTransactions
64+
}
65+
batch.computeIfAbsent(publisherClass) {
66+
TriggeredPublisherTransactions(
67+
triggeredPublisher,
68+
linkedMapOf(
69+
transactionKey to BatchableTransaction(
70+
input,
71+
future as CompletableFuture<Any>,
72+
transactionKey
73+
)
74+
)
75+
)
76+
}
77+
return future
78+
}
79+
80+
/**
81+
* Trigger concurrently and asynchronously the instances of [TriggeredPublisher] that the [batch] holds
82+
* at the end clear the queue
83+
*/
84+
@Synchronized fun dispatch() {
85+
batch.values.forEach { (triggeredPublisher, transactions) ->
86+
triggeredPublisher.trigger(transactions.values.toList(), cache)
87+
}
88+
batch.clear()
89+
}
90+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
/*
2+
* Copyright 2022 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.transactionbatcher.transaction
18+
19+
import com.expediagroup.graphql.transactionbatcher.publisher.TriggeredPublisher
20+
21+
/**
22+
* Type for [TransactionBatcher.batch] value, storing the [triggeredPublisher] instance
23+
* and the [transactions] that need to be executed by it
24+
*/
25+
data class TriggeredPublisherTransactions(
26+
val triggeredPublisher: TriggeredPublisher<Any, Any>,
27+
val transactions: LinkedHashMap<String, BatchableTransaction<Any, Any>>
28+
)
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,35 @@
1+
/*
2+
* Copyright 2022 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.transactionbatcher.transaction.cache
18+
19+
import java.util.concurrent.ConcurrentHashMap
20+
21+
/**
22+
* Default implementation of [TransactionBatcherCache] using an in memory [cache]
23+
* without eviction
24+
*/
25+
class DefaultTransactionBatcherCache : TransactionBatcherCache {
26+
private val cache = ConcurrentHashMap<String, Any>()
27+
28+
override fun set(key: String, value: Any) {
29+
cache[key] = value
30+
}
31+
32+
override fun get(key: String): Any? = cache[key]
33+
34+
override fun contains(key: String): Boolean = cache.containsKey(key)
35+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
/*
2+
* Copyright 2022 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.transactionbatcher.transaction.cache
18+
19+
/**
20+
* Interface that allows any cache implementation that can be used by the TransactionBatcher
21+
* by default will use [DefaultTransactionBatcherCache]
22+
*/
23+
interface TransactionBatcherCache {
24+
fun set(key: String, value: Any)
25+
fun get(key: String): Any?
26+
fun contains(key: String): Boolean
27+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,59 @@
1+
/*
2+
* Copyright 2022 Expedia, Inc
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.expediagroup.graphql.transactionbatcher.publisher
18+
19+
import com.expediagroup.graphql.transactionbatcher.transaction.TransactionBatcher
20+
import reactor.core.publisher.Mono
21+
import reactor.kotlin.core.publisher.toFlux
22+
import reactor.kotlin.core.publisher.toMono
23+
import java.time.Duration
24+
import java.util.concurrent.atomic.AtomicInteger
25+
26+
data class AstronautServiceRequest(val id: Int)
27+
data class Astronaut(val id: Int, val name: String)
28+
29+
class AstronautService(
30+
private val transactionBatcher: TransactionBatcher
31+
) {
32+
33+
val produceArguments: MutableList<List<AstronautServiceRequest>> = mutableListOf()
34+
val getAstronautCallCount: AtomicInteger = AtomicInteger(0)
35+
36+
fun getAstronaut(request: AstronautServiceRequest): Mono<Astronaut> {
37+
getAstronautCallCount.incrementAndGet()
38+
val future = this.transactionBatcher.batch(request) { input: List<AstronautServiceRequest> ->
39+
produceArguments.add(input)
40+
input.toFlux()
41+
.flatMapSequential { request ->
42+
{ astronauts[request.id] }
43+
.toMono()
44+
.flatMap { (astronaut, delay) ->
45+
astronaut.toMono().delayElement(delay)
46+
}
47+
}
48+
}
49+
return future.toMono()
50+
}
51+
52+
companion object {
53+
private val astronauts = mapOf(
54+
1 to Pair(Astronaut(1, "Buzz Aldrin"), Duration.ofMillis(300)),
55+
2 to Pair(Astronaut(2, "William Anders"), Duration.ofMillis(600)),
56+
3 to Pair(Astronaut(3, "Neil Armstrong"), Duration.ofMillis(200))
57+
)
58+
}
59+
}

0 commit comments

Comments
 (0)