diff --git a/executions/graphql-kotlin-transaction-batcher/build.gradle.kts b/executions/graphql-kotlin-transaction-batcher/build.gradle.kts new file mode 100644 index 0000000000..3b34f7392f --- /dev/null +++ b/executions/graphql-kotlin-transaction-batcher/build.gradle.kts @@ -0,0 +1,17 @@ +description = "Transaction Batcher" + +val reactiveStreamsVersion: String by project +val junitVersion: String by project +val slf4jVersion: String by project +val reactorVersion: String by project +val reactorExtensionsVersion: String by project + +dependencies { + implementation("org.reactivestreams:reactive-streams:$reactiveStreamsVersion") + implementation("org.slf4j:slf4j-api:$slf4jVersion") + testImplementation("io.projectreactor.kotlin:reactor-kotlin-extensions:$reactorExtensionsVersion") + testImplementation("io.projectreactor:reactor-core:$reactorVersion") + testImplementation("io.projectreactor:reactor-test:$reactorVersion") + testImplementation("org.junit.jupiter:junit-jupiter-api:$junitVersion") + testImplementation("org.junit.jupiter:junit-jupiter-engine:$junitVersion") +} diff --git a/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/publisher/TriggeredPublisher.kt b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/publisher/TriggeredPublisher.kt new file mode 100644 index 0000000000..cd26f05441 --- /dev/null +++ b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/publisher/TriggeredPublisher.kt @@ -0,0 +1,94 @@ +/* + * Copyright 2022 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.transactionbatcher.publisher + +import com.expediagroup.graphql.transactionbatcher.transaction.BatchableTransaction +import com.expediagroup.graphql.transactionbatcher.transaction.cache.TransactionBatcherCache +import org.reactivestreams.Publisher +import org.reactivestreams.Subscriber +import org.reactivestreams.Subscription +import org.slf4j.LoggerFactory + +/** + * Interface representing a publisher with input [TInput] type and output [TOutput] type + */ +@Suppress( + "ReactiveStreamsSubscriberImplementation", + "UNCHECKED_CAST" +) +fun interface TriggeredPublisher { + /** + * Given an input of type [TInput] create a cold [Publisher] that will produce a [Publisher] of type [TOutput] of n elements + * that maps to the size of the input [List] of [TInput] + * order is important so make sure to produce elements in the same order of [input] + */ + fun produce(input: List): Publisher + + /** + * Attempts to collect values from [cache] first and then [produce] + * + * Example: + * if [TriggeredPublisher] is of type and [cache] resolves [1, null, 3, null, 5, 6] + * we will attempt to produce elements for index 1 and 3 + * when [produce] stream completes we will complete futures from either values resolved from [cache] or from [produce] + */ + fun trigger( + batchableTransactions: List>, + cache: TransactionBatcherCache + ) { + val transactionsNotInCache = batchableTransactions + .filterNot { transaction -> cache.contains(transaction.key) } + .map(BatchableTransaction::input) + + produce( + transactionsNotInCache + ).subscribe( + object : Subscriber { + private lateinit var subscription: Subscription + private val transactionResults = mutableListOf() + + override fun onSubscribe(subscription: Subscription) { + this.subscription = subscription + this.subscription.request(1) + } + + override fun onNext(result: TOutput) { + transactionResults += result + this.subscription.request(1) + } + + override fun onError(throwable: Throwable) { + logger.error("Error while producing data", throwable) + } + + override fun onComplete() { + val resultsIterator = transactionResults.iterator() + batchableTransactions.forEach { transaction -> + val result = cache.get(transaction.key) as? TOutput ?: resultsIterator.next().also { result -> + cache.set(transaction.key, result as Any) + } + transaction.future.complete(result) + } + } + } + ) + } + + companion object { + private val logger = LoggerFactory.getLogger(TriggeredPublisher::class.java) + } +} diff --git a/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/BatchableTransaction.kt b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/BatchableTransaction.kt new file mode 100644 index 0000000000..b4d7a77fad --- /dev/null +++ b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/BatchableTransaction.kt @@ -0,0 +1,30 @@ +/* + * Copyright 2022 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.transactionbatcher.transaction + +import java.util.concurrent.CompletableFuture + +/** + * convenient class to store the reference of a [future] of type [TOutput] + * that will be resolved asynchronously at later point in time by using [input] as source + * it supports deduplication by using the [key] field + */ +data class BatchableTransaction( + val input: TInput, + val future: CompletableFuture, + val key: String +) diff --git a/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/TransactionBatcher.kt b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/TransactionBatcher.kt new file mode 100644 index 0000000000..97aa19dd7b --- /dev/null +++ b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/TransactionBatcher.kt @@ -0,0 +1,90 @@ +/* + * Copyright 2022 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.transactionbatcher.transaction + +import com.expediagroup.graphql.transactionbatcher.publisher.TriggeredPublisher +import com.expediagroup.graphql.transactionbatcher.transaction.cache.DefaultTransactionBatcherCache +import com.expediagroup.graphql.transactionbatcher.transaction.cache.TransactionBatcherCache +import java.util.concurrent.CompletableFuture +import java.util.concurrent.ConcurrentHashMap +import kotlin.reflect.KClass + +/** + * Holds logic to apply batching, deduplication and caching of [BatchableTransaction] + * if no [TransactionBatcherCache] implementation is provided it will use [DefaultTransactionBatcherCache] + */ +class TransactionBatcher( + private val cache: TransactionBatcherCache = DefaultTransactionBatcherCache() +) { + + val batch = ConcurrentHashMap< + KClass>, + TriggeredPublisherTransactions + >() + + /** + * adds a transaction [input] to the batch along with the [triggeredPublisher] instance that will receive the [BatchableTransaction] + * deduplication will be based on [transactionKey] which by default is the toString() representation of [input] + * batching will be based on the implementation of [TriggeredPublisher] + * this method returns a reference to a [CompletableFuture] which is a field of the [BatchableTransaction] that was just + * added into the queue + */ + @Suppress("UNCHECKED_CAST") + fun batch( + input: TInput, + transactionKey: String = input.toString(), + triggeredPublisher: TriggeredPublisher + ): CompletableFuture { + val publisherClass = (triggeredPublisher as TriggeredPublisher)::class + var future = CompletableFuture() + batch.computeIfPresent(publisherClass) { _, publisherTransactions -> + val transaction = publisherTransactions.transactions.computeIfAbsent(transactionKey) { + BatchableTransaction( + input, + future as CompletableFuture, + transactionKey + ) + } + future = transaction.future as CompletableFuture + publisherTransactions + } + batch.computeIfAbsent(publisherClass) { + TriggeredPublisherTransactions( + triggeredPublisher, + linkedMapOf( + transactionKey to BatchableTransaction( + input, + future as CompletableFuture, + transactionKey + ) + ) + ) + } + return future + } + + /** + * Trigger concurrently and asynchronously the instances of [TriggeredPublisher] that the [batch] holds + * at the end clear the queue + */ + @Synchronized fun dispatch() { + batch.values.forEach { (triggeredPublisher, transactions) -> + triggeredPublisher.trigger(transactions.values.toList(), cache) + } + batch.clear() + } +} diff --git a/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/TriggeredPublisherTransactions.kt b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/TriggeredPublisherTransactions.kt new file mode 100644 index 0000000000..91da72aa89 --- /dev/null +++ b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/TriggeredPublisherTransactions.kt @@ -0,0 +1,28 @@ +/* + * Copyright 2022 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.transactionbatcher.transaction + +import com.expediagroup.graphql.transactionbatcher.publisher.TriggeredPublisher + +/** + * Type for [TransactionBatcher.batch] value, storing the [triggeredPublisher] instance + * and the [transactions] that need to be executed by it + */ +data class TriggeredPublisherTransactions( + val triggeredPublisher: TriggeredPublisher, + val transactions: LinkedHashMap> +) diff --git a/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/cache/DefaultTransactionBatcherCache.kt b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/cache/DefaultTransactionBatcherCache.kt new file mode 100644 index 0000000000..5e680fc333 --- /dev/null +++ b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/cache/DefaultTransactionBatcherCache.kt @@ -0,0 +1,35 @@ +/* + * Copyright 2022 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.transactionbatcher.transaction.cache + +import java.util.concurrent.ConcurrentHashMap + +/** + * Default implementation of [TransactionBatcherCache] using an in memory [cache] + * without eviction + */ +class DefaultTransactionBatcherCache : TransactionBatcherCache { + private val cache = ConcurrentHashMap() + + override fun set(key: String, value: Any) { + cache[key] = value + } + + override fun get(key: String): Any? = cache[key] + + override fun contains(key: String): Boolean = cache.containsKey(key) +} diff --git a/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/cache/TransactionBatcherCache.kt b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/cache/TransactionBatcherCache.kt new file mode 100644 index 0000000000..94df56a41b --- /dev/null +++ b/executions/graphql-kotlin-transaction-batcher/src/main/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/cache/TransactionBatcherCache.kt @@ -0,0 +1,27 @@ +/* + * Copyright 2022 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.transactionbatcher.transaction.cache + +/** + * Interface that allows any cache implementation that can be used by the TransactionBatcher + * by default will use [DefaultTransactionBatcherCache] + */ +interface TransactionBatcherCache { + fun set(key: String, value: Any) + fun get(key: String): Any? + fun contains(key: String): Boolean +} diff --git a/executions/graphql-kotlin-transaction-batcher/src/test/kotlin/com/expediagroup/graphql/transactionbatcher/publisher/AstronautService.kt b/executions/graphql-kotlin-transaction-batcher/src/test/kotlin/com/expediagroup/graphql/transactionbatcher/publisher/AstronautService.kt new file mode 100644 index 0000000000..8b4fc6502e --- /dev/null +++ b/executions/graphql-kotlin-transaction-batcher/src/test/kotlin/com/expediagroup/graphql/transactionbatcher/publisher/AstronautService.kt @@ -0,0 +1,59 @@ +/* + * Copyright 2022 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.transactionbatcher.publisher + +import com.expediagroup.graphql.transactionbatcher.transaction.TransactionBatcher +import reactor.core.publisher.Mono +import reactor.kotlin.core.publisher.toFlux +import reactor.kotlin.core.publisher.toMono +import java.time.Duration +import java.util.concurrent.atomic.AtomicInteger + +data class AstronautServiceRequest(val id: Int) +data class Astronaut(val id: Int, val name: String) + +class AstronautService( + private val transactionBatcher: TransactionBatcher +) { + + val produceArguments: MutableList> = mutableListOf() + val getAstronautCallCount: AtomicInteger = AtomicInteger(0) + + fun getAstronaut(request: AstronautServiceRequest): Mono { + getAstronautCallCount.incrementAndGet() + val future = this.transactionBatcher.batch(request) { input: List -> + produceArguments.add(input) + input.toFlux() + .flatMapSequential { request -> + { astronauts[request.id] } + .toMono() + .flatMap { (astronaut, delay) -> + astronaut.toMono().delayElement(delay) + } + } + } + return future.toMono() + } + + companion object { + private val astronauts = mapOf( + 1 to Pair(Astronaut(1, "Buzz Aldrin"), Duration.ofMillis(300)), + 2 to Pair(Astronaut(2, "William Anders"), Duration.ofMillis(600)), + 3 to Pair(Astronaut(3, "Neil Armstrong"), Duration.ofMillis(200)) + ) + } +} diff --git a/executions/graphql-kotlin-transaction-batcher/src/test/kotlin/com/expediagroup/graphql/transactionbatcher/publisher/MissionService.kt b/executions/graphql-kotlin-transaction-batcher/src/test/kotlin/com/expediagroup/graphql/transactionbatcher/publisher/MissionService.kt new file mode 100644 index 0000000000..a5f17f6c7b --- /dev/null +++ b/executions/graphql-kotlin-transaction-batcher/src/test/kotlin/com/expediagroup/graphql/transactionbatcher/publisher/MissionService.kt @@ -0,0 +1,59 @@ +/* + * Copyright 2022 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.transactionbatcher.publisher + +import com.expediagroup.graphql.transactionbatcher.transaction.TransactionBatcher +import reactor.core.publisher.Mono +import reactor.kotlin.core.publisher.toFlux +import reactor.kotlin.core.publisher.toMono +import java.time.Duration +import java.util.concurrent.atomic.AtomicInteger + +data class MissionServiceRequest(val id: Int) +data class Mission(val id: Int, val designation: String, val crew: List) + +class MissionService( + private val transactionBatcher: TransactionBatcher +) { + + val produceArguments: MutableList> = mutableListOf() + val getMissionCallCount: AtomicInteger = AtomicInteger(0) + + fun getMission(request: MissionServiceRequest): Mono { + getMissionCallCount.incrementAndGet() + val future = this.transactionBatcher.batch(request) { input: List -> + produceArguments.add(input) + input.toFlux() + .flatMapSequential { request -> + { missions[request.id] } + .toMono() + .flatMap { (astronaut, delay) -> + astronaut.toMono().delayElement(delay) + } + } + } + return future.toMono() + } + + companion object { + private val missions = mapOf( + 2 to Pair(Mission(2, "Apollo 4", listOf(14, 30, 7)), Duration.ofMillis(100)), + 3 to Pair(Mission(3, "Apollo 5", listOf(23, 10, 12)), Duration.ofMillis(400)), + 4 to Pair(Mission(4, "Apollo 6", listOf(1, 28, 31, 6)), Duration.ofMillis(300)) + ) + } +} diff --git a/executions/graphql-kotlin-transaction-batcher/src/test/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/TransactionBatcherTest.kt b/executions/graphql-kotlin-transaction-batcher/src/test/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/TransactionBatcherTest.kt new file mode 100644 index 0000000000..8e62ad74c6 --- /dev/null +++ b/executions/graphql-kotlin-transaction-batcher/src/test/kotlin/com/expediagroup/graphql/transactionbatcher/transaction/TransactionBatcherTest.kt @@ -0,0 +1,223 @@ +/* + * Copyright 2022 Expedia, Inc + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * https://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.expediagroup.graphql.transactionbatcher.transaction + +import com.expediagroup.graphql.transactionbatcher.publisher.Astronaut +import com.expediagroup.graphql.transactionbatcher.publisher.AstronautService +import com.expediagroup.graphql.transactionbatcher.publisher.AstronautServiceRequest +import com.expediagroup.graphql.transactionbatcher.publisher.Mission +import com.expediagroup.graphql.transactionbatcher.publisher.MissionService +import com.expediagroup.graphql.transactionbatcher.publisher.MissionServiceRequest +import org.junit.jupiter.api.Test +import reactor.kotlin.core.publisher.toFlux +import kotlin.test.assertEquals + +class TransactionBatcherTest { + @Test + fun `TransactionBatcher should batch transactions`() { + val transactionBatcher = TransactionBatcher() + + val astronautService = AstronautService(transactionBatcher) + val astronautRequest1 = AstronautServiceRequest(1) + val astronautRequest2 = AstronautServiceRequest(2) + val astronauts = mutableListOf() + + listOf(astronautRequest1, astronautRequest2).toFlux() + .flatMapSequential(astronautService::getAstronaut) + .collectList() + .subscribe(astronauts::addAll) + + transactionBatcher.dispatch() + Thread.sleep(1000) + + assertEquals("Buzz Aldrin", astronauts[0].name) + assertEquals("William Anders", astronauts[1].name) + } + + @Test + fun `TransactionBatcher should batch multiple transaction types`() { + val transactionBatcher = TransactionBatcher() + + val astronautService = AstronautService(transactionBatcher) + val astronautRequest1 = AstronautServiceRequest(3) + val astronautRequest2 = AstronautServiceRequest(2) + val astronautRequest3 = AstronautServiceRequest(1) + val astronauts = mutableListOf() + + val missionService = MissionService(transactionBatcher) + val missionRequest1 = MissionServiceRequest(4) + val missionRequest2 = MissionServiceRequest(3) + val missionRequest3 = MissionServiceRequest(2) + val missions = mutableListOf() + + listOf(astronautRequest1, astronautRequest2, astronautRequest3).toFlux() + .flatMapSequential(astronautService::getAstronaut) + .collectList() + .subscribe(astronauts::addAll) + + listOf(missionRequest1, missionRequest2, missionRequest3).toFlux() + .flatMapSequential(missionService::getMission) + .collectList() + .subscribe(missions::addAll) + + transactionBatcher.dispatch() + Thread.sleep(1000) + + assertEquals(3, astronautService.getAstronautCallCount.get()) + assertEquals(3, astronautService.produceArguments[0][0].id) + assertEquals(2, astronautService.produceArguments[0][1].id) + assertEquals(1, astronautService.produceArguments[0][2].id) + assertEquals("Neil Armstrong", astronauts[0].name) + assertEquals("William Anders", astronauts[1].name) + assertEquals("Buzz Aldrin", astronauts[2].name) + + assertEquals(3, missionService.getMissionCallCount.get()) + assertEquals(4, missionService.produceArguments[0][0].id) + assertEquals(3, missionService.produceArguments[0][1].id) + assertEquals(2, missionService.produceArguments[0][2].id) + assertEquals("Apollo 6", missions[0].designation) + assertEquals("Apollo 5", missions[1].designation) + assertEquals("Apollo 4", missions[2].designation) + } + + @Test + fun `TransactionBatcher should deduplicate transactions`() { + val transactionBatcher = TransactionBatcher() + + val astronautService = AstronautService(transactionBatcher) + val astronautRequest1 = AstronautServiceRequest(3) + val astronautRequest1Repeated = AstronautServiceRequest(3) + val astronautRequest2 = AstronautServiceRequest(1) + val astronauts = mutableListOf() + + val missionService = MissionService(transactionBatcher) + val missionRequest1 = MissionServiceRequest(4) + val missionRequest1Repeated = MissionServiceRequest(4) + val missionRequest2 = MissionServiceRequest(2) + val missions = mutableListOf() + + listOf(astronautRequest1, astronautRequest1Repeated, astronautRequest2).toFlux() + .flatMapSequential(astronautService::getAstronaut) + .collectList() + .subscribe(astronauts::addAll) + + listOf(missionRequest1, missionRequest1Repeated, missionRequest2).toFlux() + .flatMapSequential(missionService::getMission) + .collectList() + .subscribe(missions::addAll) + + transactionBatcher.dispatch() + Thread.sleep(1000) + + assertEquals(3, astronautService.getAstronautCallCount.get()) + assertEquals(2, astronautService.produceArguments[0].size) + assertEquals(3, astronautService.produceArguments[0][0].id) + assertEquals(1, astronautService.produceArguments[0][1].id) + assertEquals("Neil Armstrong", astronauts[0].name) + assertEquals("Neil Armstrong", astronauts[1].name) + assertEquals("Buzz Aldrin", astronauts[2].name) + + assertEquals(3, missionService.getMissionCallCount.get()) + assertEquals(2, missionService.produceArguments[0].size) + assertEquals(4, missionService.produceArguments[0][0].id) + assertEquals(2, missionService.produceArguments[0][1].id) + assertEquals("Apollo 6", missions[0].designation) + assertEquals("Apollo 6", missions[1].designation) + assertEquals("Apollo 4", missions[2].designation) + } + + @Test + fun `TransactionBatcher should cache transaction results`() { + val transactionBatcher = TransactionBatcher() + + val astronautService = AstronautService(transactionBatcher) + val astronautRequest1 = AstronautServiceRequest(3) + val astronautRequest2 = AstronautServiceRequest(2) + val astronautRequest3 = AstronautServiceRequest(1) + val astronauts = mutableListOf() + + listOf(astronautRequest1, astronautRequest2, astronautRequest3).toFlux() + .flatMapSequential(astronautService::getAstronaut) + .collectList() + .subscribe(astronauts::addAll) + + transactionBatcher.dispatch() + Thread.sleep(1000) + + assertEquals(3, astronautService.getAstronautCallCount.get()) + assertEquals(3, astronautService.produceArguments[0][0].id) + assertEquals(2, astronautService.produceArguments[0][1].id) + assertEquals(1, astronautService.produceArguments[0][2].id) + assertEquals("Neil Armstrong", astronauts[0].name) + assertEquals("William Anders", astronauts[1].name) + assertEquals("Buzz Aldrin", astronauts[2].name) + astronauts.clear() + + listOf(astronautRequest3, astronautRequest1, astronautRequest2).toFlux() + .flatMapSequential(astronautService::getAstronaut) + .collectList() + .subscribe(astronauts::addAll) + + transactionBatcher.dispatch() + Thread.sleep(1000) + assertEquals(6, astronautService.getAstronautCallCount.get()) + assertEquals(0, astronautService.produceArguments[1].size) + assertEquals("Buzz Aldrin", astronauts[0].name) + assertEquals("Neil Armstrong", astronauts[1].name) + assertEquals("William Anders", astronauts[2].name) + } + + @Test + fun `TransactionBatcher should resolve from cache and apply transaction`() { + val transactionBatcher = TransactionBatcher() + + val astronautService = AstronautService(transactionBatcher) + val astronautRequest1 = AstronautServiceRequest(3) + val astronautRequest2 = AstronautServiceRequest(2) + val astronauts = mutableListOf() + + listOf(astronautRequest1, astronautRequest2).toFlux() + .flatMapSequential(astronautService::getAstronaut) + .collectList() + .subscribe(astronauts::addAll) + + transactionBatcher.dispatch() + Thread.sleep(1000) + + assertEquals(2, astronautService.getAstronautCallCount.get()) + assertEquals(3, astronautService.produceArguments[0][0].id) + assertEquals(2, astronautService.produceArguments[0][1].id) + assertEquals("Neil Armstrong", astronauts[0].name) + assertEquals("William Anders", astronauts[1].name) + astronauts.clear() + + val astronautRequest3 = AstronautServiceRequest(1) + + listOf(astronautRequest3, astronautRequest1, astronautRequest2).toFlux() + .flatMapSequential(astronautService::getAstronaut) + .collectList() + .subscribe(astronauts::addAll) + + transactionBatcher.dispatch() + Thread.sleep(1000) + assertEquals(5, astronautService.getAstronautCallCount.get()) + assertEquals(1, astronautService.produceArguments[1].size) + assertEquals("Buzz Aldrin", astronauts[0].name) + assertEquals("Neil Armstrong", astronauts[1].name) + assertEquals("William Anders", astronauts[2].name) + } +} diff --git a/gradle.properties b/gradle.properties index 1d693fd9a6..a9700039b8 100644 --- a/gradle.properties +++ b/gradle.properties @@ -28,6 +28,7 @@ kotlinPoetVersion = 1.10.2 ktorVersion = 1.6.3 reactorVersion = 3.4.14 reactorExtensionsVersion = 1.1.5 +reactiveStreamsVersion = 1.0.3 slf4jVersion = 1.7.33 springBootVersion = 2.6.3 springVersion = 5.3.15 diff --git a/settings.gradle.kts b/settings.gradle.kts index 547838fbb1..6580e59fe6 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -50,6 +50,9 @@ include(":graphql-kotlin-federated-hooks-provider") include(":graphql-kotlin-server") include(":graphql-kotlin-spring-server") +// Executions +include(":graphql-kotlin-transaction-batcher") + // // Project mappings so we don't need to create projects that group subprojects // @@ -76,3 +79,6 @@ project(":graphql-kotlin-federated-hooks-provider").projectDir = file("plugins/s // Servers project(":graphql-kotlin-server").projectDir = file("servers/graphql-kotlin-server") project(":graphql-kotlin-spring-server").projectDir = file("servers/graphql-kotlin-spring-server") + +// Executions +project(":graphql-kotlin-transaction-batcher").projectDir = file("executions/graphql-kotlin-transaction-batcher")