Skip to content

Commit f35eff9

Browse files
Bugfix / Tweak: Coroutine server implementation (#52)
* Forgot to terminate the object grant instance * Try the new coroutine server strategy * Self-review cleanup
1 parent 2f72941 commit f35eff9

File tree

4 files changed

+274
-429
lines changed

4 files changed

+274
-429
lines changed

server/src/main/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayServer.kt

Lines changed: 93 additions & 129 deletions
Original file line numberDiff line numberDiff line change
@@ -2,24 +2,33 @@ package tech.figure.objectstore.gateway.server
22

33
import io.grpc.Status
44
import io.grpc.StatusRuntimeException
5-
import io.grpc.stub.StreamObserver
65
import io.provenance.scope.encryption.model.KeyRef
76
import io.provenance.scope.encryption.util.getAddress
87
import io.provenance.scope.encryption.util.toPublicKey
8+
import kotlinx.coroutines.flow.Flow
99
import mu.KLogging
1010
import org.lognet.springboot.grpc.GRpcService
1111
import org.springframework.beans.factory.annotation.Qualifier
12-
import tech.figure.objectstore.gateway.GatewayGrpc
12+
import tech.figure.objectstore.gateway.GatewayGrpcKt
1313
import tech.figure.objectstore.gateway.GatewayOuterClass
14+
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest
15+
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantObjectPermissionsRequest.GrantTargetCase
1416
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantScopePermissionRequest
1517
import tech.figure.objectstore.gateway.GatewayOuterClass.BatchGrantScopePermissionResponse
18+
import tech.figure.objectstore.gateway.GatewayOuterClass.FetchObjectByHashResponse
19+
import tech.figure.objectstore.gateway.GatewayOuterClass.FetchObjectRequest
20+
import tech.figure.objectstore.gateway.GatewayOuterClass.FetchObjectResponse
1621
import tech.figure.objectstore.gateway.GatewayOuterClass.GrantObjectPermissionsRequest
1722
import tech.figure.objectstore.gateway.GatewayOuterClass.GrantObjectPermissionsResponse
1823
import tech.figure.objectstore.gateway.GatewayOuterClass.GrantScopePermissionResponse
24+
import tech.figure.objectstore.gateway.GatewayOuterClass.PutObjectResponse
25+
import tech.figure.objectstore.gateway.GatewayOuterClass.RegisterExistingObjectResponse
26+
import tech.figure.objectstore.gateway.GatewayOuterClass.RevokeObjectPermissionsResponse
1927
import tech.figure.objectstore.gateway.GatewayOuterClass.RevokeScopePermissionResponse
2028
import tech.figure.objectstore.gateway.address
2129
import tech.figure.objectstore.gateway.configuration.BeanQualifiers
2230
import tech.figure.objectstore.gateway.configuration.ProvenanceProperties
31+
import tech.figure.objectstore.gateway.exception.InvalidInputException
2332
import tech.figure.objectstore.gateway.publicKey
2433
import tech.figure.objectstore.gateway.server.interceptor.JwtServerInterceptor
2534
import tech.figure.objectstore.gateway.service.GrantResponse
@@ -35,139 +44,101 @@ class ObjectStoreGatewayServer(
3544
private val scopePermissionsService: ScopePermissionsService,
3645
private val objectService: ObjectService,
3746
private val provenanceProperties: ProvenanceProperties,
38-
) : GatewayGrpc.GatewayImplBase() {
47+
) : GatewayGrpcKt.GatewayCoroutineImplBase() {
3948

4049
companion object : KLogging() {
4150
const val DEFAULT_UNKNOWN_DESCRIPTION: String = "An unexpected error occurred. Please try again later"
4251
}
4352

44-
override fun fetchObject(
45-
request: GatewayOuterClass.FetchObjectRequest,
46-
responseObserver: StreamObserver<GatewayOuterClass.FetchObjectResponse>
47-
) {
48-
scopeFetchService.fetchScopeForGrantee(request.scopeAddress, publicKey(), request.granterAddress.takeIf { it.isNotBlank() }).let {
49-
responseObserver.onNext(
50-
GatewayOuterClass.FetchObjectResponse.newBuilder()
51-
.setScopeId(request.scopeAddress)
52-
.addAllRecords(it)
53-
.build()
54-
)
53+
override suspend fun fetchObject(request: FetchObjectRequest): FetchObjectResponse = scopeFetchService
54+
.fetchScopeForGrantee(request.scopeAddress, publicKey(), request.granterAddress.takeIf { it.isNotBlank() })
55+
.let {
56+
FetchObjectResponse.newBuilder()
57+
.setScopeId(request.scopeAddress)
58+
.addAllRecords(it)
59+
.build()
5560
}
56-
responseObserver.onCompleted()
57-
}
5861

59-
override fun putObject(
62+
override suspend fun putObject(
6063
request: GatewayOuterClass.PutObjectRequest,
61-
responseObserver: StreamObserver<GatewayOuterClass.PutObjectResponse>
62-
) {
63-
objectService.putObject(request.`object`, publicKey(), request.additionalAudienceKeysList.map { it.toPublicKey() }, useRequesterKey = request.useRequesterKey).let {
64-
responseObserver.onNext(
65-
GatewayOuterClass.PutObjectResponse.newBuilder()
66-
.setHash(it)
67-
.build()
68-
)
69-
}
70-
responseObserver.onCompleted()
71-
}
64+
): PutObjectResponse = objectService
65+
.putObject(request.`object`, publicKey(), request.additionalAudienceKeysList.map { it.toPublicKey() }, useRequesterKey = request.useRequesterKey)
66+
.let { PutObjectResponse.newBuilder().setHash(it).build() }
7267

73-
override fun registerExistingObject(
68+
override suspend fun registerExistingObject(
7469
request: GatewayOuterClass.RegisterExistingObjectRequest,
75-
responseObserver: StreamObserver<GatewayOuterClass.RegisterExistingObjectResponse>
76-
) {
77-
objectService.registerExistingObject(request.hash, publicKey(), request.granteeAddressList).let {
78-
responseObserver.onNext(
79-
GatewayOuterClass.RegisterExistingObjectResponse.newBuilder()
80-
.setRequest(request)
81-
.build()
82-
)
83-
}
84-
responseObserver.onCompleted()
85-
}
70+
): RegisterExistingObjectResponse = objectService
71+
.registerExistingObject(request.hash, publicKey(), request.granteeAddressList)
72+
.let { RegisterExistingObjectResponse.newBuilder().setRequest(request).build() }
8673

87-
override fun fetchObjectByHash(
74+
override suspend fun fetchObjectByHash(
8875
request: GatewayOuterClass.FetchObjectByHashRequest,
89-
responseObserver: StreamObserver<GatewayOuterClass.FetchObjectByHashResponse>
90-
) {
91-
objectService.getObject(request.hash, address()).let { obj ->
92-
responseObserver.onNext(
93-
GatewayOuterClass.FetchObjectByHashResponse.newBuilder()
94-
.setObject(obj)
95-
.build()
96-
)
97-
responseObserver.onCompleted()
98-
}
99-
}
76+
): FetchObjectByHashResponse = objectService.getObject(request.hash, address())
77+
.let { obj -> FetchObjectByHashResponse.newBuilder().setObject(obj).build() }
10078

101-
override fun grantObjectPermissions(
79+
override suspend fun grantObjectPermissions(
10280
request: GrantObjectPermissionsRequest,
103-
responseObserver: StreamObserver<GrantObjectPermissionsResponse>,
104-
) {
105-
objectService.grantAccess(request.hash, request.granteeAddress, address()).let {
106-
responseObserver.onNext(
107-
GrantObjectPermissionsResponse.newBuilder()
108-
.setHash(request.hash)
109-
.setGranteeAddress(request.granteeAddress)
110-
.build()
111-
)
112-
responseObserver.onCompleted()
81+
): GrantObjectPermissionsResponse = objectService
82+
.grantAccess(request.hash, request.granteeAddress, address())
83+
.let {
84+
GrantObjectPermissionsResponse.newBuilder()
85+
.setHash(request.hash)
86+
.setGranteeAddress(request.granteeAddress)
87+
.build()
11388
}
114-
}
11589

116-
override fun revokeObjectPermissions(
117-
request: GatewayOuterClass.RevokeObjectPermissionsRequest,
118-
responseObserver: StreamObserver<GatewayOuterClass.RevokeObjectPermissionsResponse>
119-
) {
120-
objectService.revokeAccess(request.hash, address(), request.granteeAddressList).let {
121-
responseObserver.onNext(
122-
GatewayOuterClass.RevokeObjectPermissionsResponse.newBuilder()
123-
.setRequest(request)
124-
.build()
125-
)
90+
override fun batchGrantObjectPermissions(request: BatchGrantObjectPermissionsRequest): Flow<GrantObjectPermissionsResponse> {
91+
val (granteeAddress, targetHashes) = when (request.grantTargetCase) {
92+
GrantTargetCase.ALL_HASHES -> request.allHashes.granteeAddress to null
93+
GrantTargetCase.SPECIFIED_HASHES -> request.specifiedHashes.let { it.granteeAddress to it.targetHashesList }
94+
else -> throw InvalidInputException("A grant target must be supplied")
12695
}
127-
responseObserver.onCompleted()
96+
return objectService.batchGrantAccess(
97+
granteeAddress = granteeAddress,
98+
granterAddress = address(),
99+
targetHashes = targetHashes,
100+
)
128101
}
129102

130-
override fun grantScopePermission(
103+
override suspend fun revokeObjectPermissions(
104+
request: GatewayOuterClass.RevokeObjectPermissionsRequest,
105+
): RevokeObjectPermissionsResponse = objectService
106+
.revokeAccess(request.hash, address(), request.granteeAddressList)
107+
.let { RevokeObjectPermissionsResponse.newBuilder().setRequest(request).build() }
108+
109+
override suspend fun grantScopePermission(
131110
request: GatewayOuterClass.GrantScopePermissionRequest,
132-
responseObserver: StreamObserver<GrantScopePermissionResponse>,
133-
) {
111+
): GrantScopePermissionResponse {
134112
val (grantResponse, sourceDetails) = processScopeGrant(
135113
requesterAddress = address(),
136114
scopeAddress = request.scopeAddress,
137115
granteeAddress = request.granteeAddress,
138116
grantId = request.grantId,
139117
requestType = "Manual Grant",
140118
)
141-
val respond: (accepted: Boolean, granterAddress: String?) -> Unit = { accepted, granterAddress ->
142-
responseObserver.onNext(
143-
GrantScopePermissionResponse.newBuilder().also { rpcResp ->
144-
rpcResp.request = request
145-
rpcResp.grantAccepted = accepted
146-
granterAddress?.also { rpcResp.granterAddress = it }
147-
}.build()
148-
)
149-
responseObserver.onCompleted()
119+
val getResponse: (accepted: Boolean, granterAddress: String?) -> GrantScopePermissionResponse = { accepted, granterAddress ->
120+
GrantScopePermissionResponse.newBuilder().also { rpcResp ->
121+
rpcResp.request = request
122+
rpcResp.grantAccepted = accepted
123+
granterAddress?.also { rpcResp.granterAddress = it }
124+
}.build()
150125
}
151-
when (grantResponse) {
152-
is GrantResponse.Accepted -> respond(true, grantResponse.granterAddress)
126+
return when (grantResponse) {
127+
is GrantResponse.Accepted -> getResponse(true, grantResponse.granterAddress)
153128
is GrantResponse.Rejected -> {
154129
logger.warn("REJECTED $sourceDetails: ${grantResponse.message}")
155-
respond(false, null)
130+
getResponse(false, null)
156131
}
157132
is GrantResponse.Error -> {
158133
logger.error("ERROR $sourceDetails", grantResponse.cause)
159-
responseObserver.onError(StatusRuntimeException(Status.UNKNOWN.withDescription(DEFAULT_UNKNOWN_DESCRIPTION)))
134+
throw StatusRuntimeException(Status.UNKNOWN.withDescription(DEFAULT_UNKNOWN_DESCRIPTION))
160135
}
161136
}
162137
}
163138

164-
override fun batchGrantScopePermission(
165-
request: BatchGrantScopePermissionRequest,
166-
responseObserver: StreamObserver<BatchGrantScopePermissionResponse>,
167-
) {
139+
override suspend fun batchGrantScopePermission(request: BatchGrantScopePermissionRequest): BatchGrantScopePermissionResponse {
168140
if (request.granteesList.isEmpty()) {
169-
responseObserver.onError(StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("At least one grantee is required")))
170-
return
141+
throw StatusRuntimeException(Status.INVALID_ARGUMENT.withDescription("At least one grantee is required"))
171142
}
172143
val requesterAddress = address()
173144
val completedGrantDetails = request.granteesList.mapNotNull { grantee ->
@@ -179,7 +150,7 @@ class ObjectStoreGatewayServer(
179150
requestType = "Batch Grant",
180151
)
181152
// The response should only include grantee information when the grantees did not throw exceptions. The
182-
// caller can cross-reference the sent request versus what was produced in that list to determine any failures,
153+
// caller can cross-reference the intercepted request versus what was produced in that list to determine any failures,
183154
// if they care.
184155
when (grantResponse) {
185156
is GrantResponse.Accepted -> grantee to grantResponse.granterAddress
@@ -193,27 +164,23 @@ class ObjectStoreGatewayServer(
193164
}
194165
}
195166
}
196-
responseObserver.onNext(
197-
BatchGrantScopePermissionResponse.newBuilder().also { batchResponse ->
198-
batchResponse.request = request
199-
completedGrantDetails.map { (grantee, granterAddress) ->
200-
GrantScopePermissionResponse.newBuilder().also { grantResponse ->
201-
grantResponse.requestBuilder.scopeAddress = request.scopeAddress
202-
grantResponse.requestBuilder.granteeAddress = grantee.granteeAddress
203-
grantResponse.requestBuilder.grantId = grantee.grantId
204-
granterAddress?.also { grantResponse.granterAddress = it }
205-
grantResponse.grantAccepted = granterAddress != null
206-
}.build()
207-
}.also(batchResponse::addAllGrantResponses)
208-
}.build()
209-
)
210-
responseObserver.onCompleted()
167+
return BatchGrantScopePermissionResponse.newBuilder().also { batchResponse ->
168+
batchResponse.request = request
169+
completedGrantDetails.map { (grantee, granterAddress) ->
170+
GrantScopePermissionResponse.newBuilder().also { grantResponse ->
171+
grantResponse.requestBuilder.scopeAddress = request.scopeAddress
172+
grantResponse.requestBuilder.granteeAddress = grantee.granteeAddress
173+
grantResponse.requestBuilder.grantId = grantee.grantId
174+
granterAddress?.also { grantResponse.granterAddress = it }
175+
grantResponse.grantAccepted = granterAddress != null
176+
}.build()
177+
}.also(batchResponse::addAllGrantResponses)
178+
}.build()
211179
}
212180

213-
override fun revokeScopePermission(
181+
override suspend fun revokeScopePermission(
214182
request: GatewayOuterClass.RevokeScopePermissionRequest,
215-
responseObserver: StreamObserver<RevokeScopePermissionResponse>,
216-
) {
183+
): RevokeScopePermissionResponse {
217184
val requesterAddress = address()
218185
val grantId = request.grantId.takeIf { it.isNotBlank() }
219186
val sourceDetails = "Main revoke request by $requesterAddress for scope ${request.scopeAddress}, grantee ${request.granteeAddress}${if (grantId != null) ", grantId $grantId" else ""}"
@@ -230,25 +197,22 @@ class ObjectStoreGatewayServer(
230197
grantId = grantId,
231198
sourceDetails = sourceDetails,
232199
)
233-
val respond: (accepted: Boolean, revokedGrants: Int?) -> Unit = { accepted, revokedGrants ->
234-
responseObserver.onNext(
235-
RevokeScopePermissionResponse.newBuilder().also { rpcResp ->
236-
rpcResp.request = request
237-
rpcResp.revokeAccepted = accepted
238-
revokedGrants?.also { rpcResp.revokedGrantsCount = it }
239-
}.build()
240-
)
241-
responseObserver.onCompleted()
200+
val getResponse: (accepted: Boolean, revokedGrants: Int?) -> RevokeScopePermissionResponse = { accepted, revokedGrants ->
201+
RevokeScopePermissionResponse.newBuilder().also { rpcResp ->
202+
rpcResp.request = request
203+
rpcResp.revokeAccepted = accepted
204+
revokedGrants?.also { rpcResp.revokedGrantsCount = it }
205+
}.build()
242206
}
243-
when (revokeResponse) {
244-
is RevokeResponse.Accepted -> respond(true, revokeResponse.revokedGrantsCount)
207+
return when (revokeResponse) {
208+
is RevokeResponse.Accepted -> getResponse(true, revokeResponse.revokedGrantsCount)
245209
is RevokeResponse.Rejected -> {
246210
logger.warn("REJECTED $sourceDetails: ${revokeResponse.message}")
247-
respond(false, null)
211+
getResponse(false, null)
248212
}
249213
is RevokeResponse.Error -> {
250214
logger.error("ERROR $sourceDetails", revokeResponse.cause)
251-
responseObserver.onError(StatusRuntimeException(Status.UNKNOWN.withDescription(DEFAULT_UNKNOWN_DESCRIPTION)))
215+
throw StatusRuntimeException(Status.UNKNOWN.withDescription(DEFAULT_UNKNOWN_DESCRIPTION))
252216
}
253217
}
254218
}

server/src/main/kotlin/tech/figure/objectstore/gateway/server/ObjectStoreGatewayStreamServer.kt

Lines changed: 0 additions & 28 deletions
This file was deleted.

0 commit comments

Comments
 (0)