Skip to content

Commit 38b1a42

Browse files
authored
Add new dispatchers for webclient in proxies (#499)
Add new dispatchers for webclient in proxies
1 parent 5f9d18d commit 38b1a42

File tree

9 files changed

+444
-345
lines changed

9 files changed

+444
-345
lines changed
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package co.nilin.opex.api.ports.proxy.config
2+
3+
import kotlinx.coroutines.reactor.asCoroutineDispatcher
4+
import reactor.core.scheduler.Schedulers
5+
6+
object ProxyDispatchers {
7+
8+
val general = Schedulers.newBoundedElastic(10, 20, "general").asCoroutineDispatcher()
9+
val market = Schedulers.newBoundedElastic(30, 60, "market").asCoroutineDispatcher()
10+
val wallet = Schedulers.newBoundedElastic(10, 20, "wallet").asCoroutineDispatcher()
11+
}

api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/AccountantProxyImpl.kt

Lines changed: 32 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,10 @@ package co.nilin.opex.api.ports.proxy.impl
33
import co.nilin.opex.api.core.inout.PairFeeResponse
44
import co.nilin.opex.api.core.inout.PairInfoResponse
55
import co.nilin.opex.api.core.spi.AccountantProxy
6+
import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers
67
import co.nilin.opex.common.utils.LoggerDelegate
78
import kotlinx.coroutines.reactive.awaitSingle
9+
import kotlinx.coroutines.withContext
810
import org.springframework.beans.factory.annotation.Value
911
import org.springframework.http.MediaType
1012
import org.springframework.stereotype.Component
@@ -22,36 +24,42 @@ class AccountantProxyImpl(private val webClient: WebClient) : AccountantProxy {
2224

2325
override suspend fun getPairConfigs(): List<PairInfoResponse> {
2426
logger.info("fetching pair configs")
25-
return webClient.get()
26-
.uri("$baseUrl/config/all")
27-
.accept(MediaType.APPLICATION_JSON)
28-
.retrieve()
29-
.onStatus({ t -> t.isError }, { it.createException() })
30-
.bodyToFlux<PairInfoResponse>()
31-
.collectList()
32-
.awaitSingle()
27+
return withContext(ProxyDispatchers.general) {
28+
webClient.get()
29+
.uri("$baseUrl/config/all")
30+
.accept(MediaType.APPLICATION_JSON)
31+
.retrieve()
32+
.onStatus({ t -> t.isError }, { it.createException() })
33+
.bodyToFlux<PairInfoResponse>()
34+
.collectList()
35+
.awaitSingle()
36+
}
3337
}
3438

3539
override suspend fun getFeeConfigs(): List<PairFeeResponse> {
3640
logger.info("fetching fee configs")
37-
return webClient.get()
38-
.uri("$baseUrl/config/fee")
39-
.accept(MediaType.APPLICATION_JSON)
40-
.retrieve()
41-
.onStatus({ t -> t.isError }, { it.createException() })
42-
.bodyToFlux<PairFeeResponse>()
43-
.collectList()
44-
.awaitSingle()
41+
return withContext(ProxyDispatchers.general) {
42+
webClient.get()
43+
.uri("$baseUrl/config/fee")
44+
.accept(MediaType.APPLICATION_JSON)
45+
.retrieve()
46+
.onStatus({ t -> t.isError }, { it.createException() })
47+
.bodyToFlux<PairFeeResponse>()
48+
.collectList()
49+
.awaitSingle()
50+
}
4551
}
4652

4753
override suspend fun getFeeConfig(symbol: String): PairFeeResponse {
48-
logger.info("fetching fee configs")
49-
return webClient.get()
50-
.uri("$baseUrl/config/fee/$symbol")
51-
.accept(MediaType.APPLICATION_JSON)
52-
.retrieve()
53-
.onStatus({ t -> t.isError }, { it.createException() })
54-
.bodyToMono<PairFeeResponse>()
55-
.awaitSingle()
54+
logger.info("fetching fee configs for $symbol")
55+
return withContext(ProxyDispatchers.general) {
56+
webClient.get()
57+
.uri("$baseUrl/config/fee/$symbol")
58+
.accept(MediaType.APPLICATION_JSON)
59+
.retrieve()
60+
.onStatus({ t -> t.isError }, { it.createException() })
61+
.bodyToMono<PairFeeResponse>()
62+
.awaitSingle()
63+
}
5664
}
5765
}

api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/BinanceGlobalMarketProxy.kt

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ package co.nilin.opex.api.ports.proxy.impl
22

33
import co.nilin.opex.api.core.inout.GlobalPrice
44
import co.nilin.opex.api.core.spi.GlobalMarketProxy
5+
import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers
56
import kotlinx.coroutines.reactive.awaitFirstOrElse
7+
import kotlinx.coroutines.withContext
68
import org.springframework.beans.factory.annotation.Value
79
import org.springframework.http.HttpHeaders
810
import org.springframework.http.MediaType
@@ -24,19 +26,21 @@ class BinanceGlobalMarketProxy(
2426
// Binance encoding requires to change some of the Java's encoding model
2527
// https://binance-docs.github.io/apidocs/spot/en/#symbol-price-ticker
2628
val param = symbols.map { s -> "\"$s\"" }.toString().replace(" ", "")
27-
val uri= UriComponentsBuilder.fromUriString("$baseUrl/api/v3/ticker/price")
29+
val uri = UriComponentsBuilder.fromUriString("$baseUrl/api/v3/ticker/price")
2830
.queryParam("symbols", URLEncoder.encode(param, Charsets.UTF_8).replace("%2C", ","))
2931
.build(true)
3032
.toUri()
3133

32-
return webClient.get()
33-
.uri(uri)
34-
.accept(MediaType.APPLICATION_JSON)
35-
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
36-
.retrieve()
37-
.onStatus({ t -> t.isError }, { it.createException() })
38-
.bodyToFlux<GlobalPrice>()
39-
.collectList()
40-
.awaitFirstOrElse { emptyList() }
34+
return withContext(ProxyDispatchers.general) {
35+
webClient.get()
36+
.uri(uri)
37+
.accept(MediaType.APPLICATION_JSON)
38+
.header(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
39+
.retrieve()
40+
.onStatus({ t -> t.isError }, { it.createException() })
41+
.bodyToFlux<GlobalPrice>()
42+
.collectList()
43+
.awaitFirstOrElse { emptyList() }
44+
}
4145
}
4246
}

api/api-ports/api-proxy-rest/src/main/kotlin/co/nilin/opex/api/ports/proxy/impl/BlockchainGatewayProxyImpl.kt

Lines changed: 37 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -4,11 +4,13 @@ import co.nilin.opex.api.core.inout.AssignResponse
44
import co.nilin.opex.api.core.inout.CurrencyImplementation
55
import co.nilin.opex.api.core.inout.DepositDetails
66
import co.nilin.opex.api.core.spi.BlockchainGatewayProxy
7+
import co.nilin.opex.api.ports.proxy.config.ProxyDispatchers
78
import co.nilin.opex.api.ports.proxy.data.AssignAddressRequest
89
import co.nilin.opex.api.ports.proxy.data.DepositDetailsRequest
910
import co.nilin.opex.common.utils.LoggerDelegate
1011
import kotlinx.coroutines.reactive.awaitFirstOrElse
1112
import kotlinx.coroutines.reactor.awaitSingleOrNull
13+
import kotlinx.coroutines.withContext
1214
import org.springframework.beans.factory.annotation.Value
1315
import org.springframework.http.MediaType
1416
import org.springframework.stereotype.Component
@@ -28,42 +30,48 @@ class BlockchainGatewayProxyImpl(private val client: WebClient) : BlockchainGate
2830

2931
override suspend fun assignAddress(uuid: String, currency: String, chain: String): AssignResponse? {
3032
logger.info("calling bc-gateway assign")
31-
return client.post()
32-
.uri(URI.create("$baseUrl/v1/address/assign"))
33-
.accept(MediaType.APPLICATION_JSON)
34-
.contentType(MediaType.APPLICATION_JSON)
35-
.body(Mono.just(AssignAddressRequest(uuid, currency, chain)))
36-
.retrieve()
37-
.onStatus({ t -> t.isError }, { it.createException() })
38-
.bodyToMono(AssignResponse::class.java)
39-
.awaitSingleOrNull()
33+
return withContext(ProxyDispatchers.general) {
34+
client.post()
35+
.uri(URI.create("$baseUrl/v1/address/assign"))
36+
.accept(MediaType.APPLICATION_JSON)
37+
.contentType(MediaType.APPLICATION_JSON)
38+
.body(Mono.just(AssignAddressRequest(uuid, currency, chain)))
39+
.retrieve()
40+
.onStatus({ t -> t.isError }, { it.createException() })
41+
.bodyToMono(AssignResponse::class.java)
42+
.awaitSingleOrNull()
43+
}
4044
}
4145

4246
override suspend fun getDepositDetails(refs: List<String>): List<DepositDetails> {
4347
logger.info("calling bc-gateway deposit details")
44-
return client.post()
45-
.uri(URI.create("$baseUrl/deposit/find/all"))
46-
.accept(MediaType.APPLICATION_JSON)
47-
.contentType(MediaType.APPLICATION_JSON)
48-
.body(Mono.just(DepositDetailsRequest(refs)))
49-
.retrieve()
50-
.onStatus({ t -> t.isError }, { it.createException() })
51-
.bodyToFlux<DepositDetails>()
52-
.collectList()
53-
.awaitFirstOrElse { emptyList() }
48+
return withContext(ProxyDispatchers.general) {
49+
client.post()
50+
.uri(URI.create("$baseUrl/deposit/find/all"))
51+
.accept(MediaType.APPLICATION_JSON)
52+
.contentType(MediaType.APPLICATION_JSON)
53+
.body(Mono.just(DepositDetailsRequest(refs)))
54+
.retrieve()
55+
.onStatus({ t -> t.isError }, { it.createException() })
56+
.bodyToFlux<DepositDetails>()
57+
.collectList()
58+
.awaitFirstOrElse { emptyList() }
59+
}
5460
}
5561

5662
override suspend fun getCurrencyImplementations(currency: String?): List<CurrencyImplementation> {
5763
logger.info("calling bc-gateway chain details")
58-
return client.get()
59-
.uri("$baseUrl/currency/chains") {
60-
it.queryParam("currency", currency)
61-
it.build()
62-
}.accept(MediaType.APPLICATION_JSON)
63-
.retrieve()
64-
.onStatus({ t -> t.isError }, { it.createException() })
65-
.bodyToFlux<CurrencyImplementation>()
66-
.collectList()
67-
.awaitFirstOrElse { emptyList() }
64+
return withContext(ProxyDispatchers.general) {
65+
client.get()
66+
.uri("$baseUrl/currency/chains") {
67+
it.queryParam("currency", currency)
68+
it.build()
69+
}.accept(MediaType.APPLICATION_JSON)
70+
.retrieve()
71+
.onStatus({ t -> t.isError }, { it.createException() })
72+
.bodyToFlux<CurrencyImplementation>()
73+
.collectList()
74+
.awaitFirstOrElse { emptyList() }
75+
}
6876
}
6977
}

0 commit comments

Comments
 (0)