@@ -24,7 +24,8 @@ import org.apache.spark.{SparkEnv, SparkSQLException}
2424import org .apache .spark .internal .Logging
2525import org .apache .spark .sql .connect .common .ProtoUtils
2626import org .apache .spark .sql .connect .config .Connect .{CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_DURATION , CONNECT_EXECUTE_REATTACHABLE_SENDER_MAX_STREAM_SIZE }
27- import org .apache .spark .sql .connect .service .ExecuteHolder
27+ import org .apache .spark .sql .connect .service .{ExecuteHolder , SparkConnectService }
28+ import org .apache .spark .sql .connect .utils .ErrorUtils
2829
2930/**
3031 * ExecuteGrpcResponseSender sends responses to the GRPC stream. It consumes responses from
@@ -37,12 +38,14 @@ import org.apache.spark.sql.connect.service.ExecuteHolder
3738private [connect] class ExecuteGrpcResponseSender [T <: Message ](
3839 val executeHolder : ExecuteHolder ,
3940 grpcObserver : StreamObserver [T ])
40- extends Logging {
41+ extends Logging { self =>
4142
43+ // the executionObserver object is used as a synchronization lock between the
44+ // ExecuteGrpcResponseSender consumer and ExecuteResponseObserver producer.
4245 private var executionObserver = executeHolder.responseObserver
4346 .asInstanceOf [ExecuteResponseObserver [T ]]
4447
45- private var detached = false
48+ private var interrupted = false
4649
4750 // Signal to wake up when grpcCallObserver.isReady()
4851 private val grpcCallObserverReadySignal = new Object
@@ -51,42 +54,50 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
5154 private var consumeSleep = 0L
5255 private var sendSleep = 0L
5356
57+ // Thread handling the processing, in case it's done in the background.
58+ private var backgroundThread : Option [Thread ] = None
59+
5460 /**
55- * Detach this sender from executionObserver. Called only from executionObserver that this
56- * sender is attached to. Lock on executionObserver is held, and notifyAll will wake up this
57- * sender if sleeping.
61+ * Interrupt this sender and make it exit.
5862 */
59- def detach (): Unit = executionObserver.synchronized {
60- if (detached == true ) {
61- throw new IllegalStateException (" ExecuteGrpcResponseSender already detached!" )
62- }
63- detached = true
63+ def interrupt (): Unit = executionObserver.synchronized {
64+ interrupted = true
6465 executionObserver.notifyAll()
6566 }
6667
6768 def run (lastConsumedStreamIndex : Long ): Unit = {
6869 if (executeHolder.reattachable) {
69- // In reattachable execution, check if grpcObserver is ready for sending, by using
70- // setOnReadyHandler of the ServerCallStreamObserver. Otherwise, calling grpcObserver.onNext
71- // can queue the responses without sending them, and it is unknown how far behind it is, and
72- // hence how much the executionObserver needs to buffer.
70+ // In reattachable execution we use setOnReadyHandler and grpcCallObserver.isReady to control
71+ // backpressure. See sendResponse.
7372 //
74- // Because OnReady events get queued on the same GRPC inboud queue as the executePlan or
75- // reattachExecute RPC handler that this is executing in, OnReady events will not arrive and
76- // not trigger the OnReadyHandler unless this thread returns from executePlan/reattachExecute.
7374 // Because calls to OnReadyHandler get queued on the same GRPC inbound queue as the executePlan
74+ // or reattachExecute RPC handler that this is executing in, they will not arrive and not
75+ // trigger the OnReadyHandler unless this thread returns from executePlan/reattachExecute.
7776 // Therefore, we launch another thread to operate on the grpcObserver and send the responses,
7877 // while this thread will exit from the executePlan/reattachExecute call, allowing GRPC
7978 // to send the OnReady events.
8079 // See https://github.com/grpc/grpc-java/issues/7361
8180
82- val t = new Thread (
83- s " SparkConnectGRPCSender_ " +
84- s " opId= ${executeHolder.operationId}_startIndex= $lastConsumedStreamIndex" ) {
85- override def run (): Unit = {
86- execute(lastConsumedStreamIndex)
87- }
88- }
89- executeHolder.grpcSenderThreads += t
81+ backgroundThread = Some (
82+ new Thread (
83+ s " SparkConnectGRPCSender_ " +
84+ s " opId= ${executeHolder.operationId}_startIndex= $lastConsumedStreamIndex" ) {
85+ override def run (): Unit = {
86+ try {
87+ execute(lastConsumedStreamIndex)
88+ } catch {
89+ // This is executing in its own thread, so we need to handle RPC errors like the
90+ // SparkConnectService handlers do.
91+ ErrorUtils .handleError(
92+ " async-grpc-response-sender" ,
93+ observer = grpcObserver,
94+ userId = executeHolder.request.getUserContext.getUserId,
95+ sessionId = executeHolder.request.getSessionId)
96+ } finally {
97+ executeHolder.removeGrpcResponseSender(self)
98+ }
99+ }
100+ })
90101
91102 val grpcCallObserver = grpcObserver.asInstanceOf [ServerCallStreamObserver [T ]]
92103 grpcCallObserver.setOnReadyHandler(() => {
@@ -97,16 +108,17 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
97108 })
98109
99110 // Start the thread and exit
100- t. start()
111+ backgroundThread.foreach(_. start() )
101112 } else {
102113 // Non reattachable execute runs directly in the GRPC thread.
103114 try {
104115 execute(lastConsumedStreamIndex)
105116 } finally {
117+ executeHolder.removeGrpcResponseSender(this )
106118 if (! executeHolder.reattachable) {
107119 // Non reattachable executions release here immediately.
108120 // (Reattachable executions release with ReleaseExecute RPC.)
109- executeHolder.close( )
121+ SparkConnectService .executionManager.removeExecuteHolder(executeHolder.key )
110122 }
111123 }
112124 }
@@ -159,9 +171,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
159171 while (! finished) {
160172 var response : Option [CachedStreamResponse [T ]] = None
161173
162- // Conditions for exiting the inner loop:
163- // 1. was detached from response observer
164- def detachedFromObserver = detached
174+ // Conditions for exiting the inner loop (and helpers to compute them):
175+ // 1. was interrupted
165176 // 2. has a response to send
166177 def gotResponse = response.nonEmpty
167178 // 3. sent everything from the stream and the stream is finished
@@ -170,24 +181,21 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
170181 def deadlineLimitReached =
171182 sentResponsesSize > maximumResponseSize || deadlineTimeMillis < System .currentTimeMillis()
172183
173- // Get next available response.
174- // Wait until either this sender got detached or next response is ready,
175- // or the stream is complete and it had already sent all responses.
176184 logTrace(s " Trying to get next response with index= $nextIndex. " )
177185 executionObserver.synchronized {
178186 logTrace(s " Acquired executionObserver lock. " )
179187 val sleepStart = System .nanoTime()
180188 var sleepEnd = 0L
181- while (! detachedFromObserver &&
189+ while (! interrupted &&
182190 ! gotResponse &&
183191 ! streamFinished &&
184192 ! deadlineLimitReached) {
185193 logTrace(s " Try to get response with index= $nextIndex from observer. " )
186194 response = executionObserver.consumeResponse(nextIndex)
187195 logTrace(s " Response index= $nextIndex from observer: ${response.isDefined}" )
188- // If response is empty, release executionObserver lock and wait to get notified.
189- // The state of detached , response and lastIndex are change under lock in
190- // executionObserver , and will notify upon state change.
196+ // If response is empty, release executionObserver monitor and wait to get notified.
197+ // The state of interrupted, response and lastIndex is changed under the executionObserver
198+ // monitor, and a notification is sent upon state change.
191199 if (response.isEmpty) {
192200 val timeout = Math .max(1 , deadlineTimeMillis - System .currentTimeMillis())
193201 logTrace(s " Wait for response to become available with timeout= $timeout ms. " )
@@ -197,7 +205,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
197205 }
198206 }
199207 logTrace(
200- s " Exiting loop: detached= $detached , " +
208+ s " Exiting loop: interrupted= $interrupted , " +
201209 s " response= ${response.map(r => ProtoUtils .abbreviate(r.response))}, " +
202210 s " lastIndex= ${executionObserver.getLastResponseIndex()}, " +
203211 s " deadline= ${deadlineLimitReached}" )
@@ -208,10 +216,8 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
208216 }
209217
210218 // Process the outcome of the inner loop.
211- if (detachedFromObserver) {
212- // This sender got detached by the observer.
213- // This only happens if this RPC is actually dead, and the client already came back with
214- // a ReattachExecute RPC. Kill this RPC.
219+ if (interrupted) {
220+ // This sender got interrupted. Kill this RPC.
215221 logWarning(
216222 s " Got detached from opId= ${executeHolder.operationId} at index ${nextIndex - 1 }. " +
217223 s " totalTime= ${System .nanoTime - startTime}ns " +
@@ -256,12 +262,11 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
256262 }
257263
258264 /**
259- * Send the response to the grpcCallObserver. In reattachable execution, we control the flow,
260- * and only pass the response to the grpcCallObserver when it's ready to send. Otherwise,
261- * grpcCallObserver.onNext() would return in a non-blocking way, but could queue responses
262- * without sending them if the client doesn't keep up receiving them. When pushing more
263- * responses to onNext(), there is no insight how far behind the service is in actually sending
264- * them out.
265+ * Send the response to the grpcCallObserver.
266+ *
267+ * In reattachable execution, we control the backpressure and only send when the
268+ * grpcCallObserver is in fact ready to send.
269+ *
265270 * @param deadlineTimeMillis
266271 * when reattachable, wait for ready stream until this deadline.
267272 * @return
@@ -278,12 +283,12 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
278283 grpcObserver.onNext(response.response)
279284 true
280285 } else {
281- // In reattachable execution, we control the flow , and only pass the response to the
286+ // In reattachable execution, we control the backpressure, and only pass the response to the
282287 // grpcCallObserver when it's ready to send.
283288 // Otherwise, grpcCallObserver.onNext() would return in a non-blocking way, but could queue
284289 // responses without sending them if the client doesn't keep up receiving them.
285290 // When pushing more responses to onNext(), there is no insight how far behind the service is
286- // in actually sending them out.
291+ // in actually sending them out. See https://github.com/grpc/grpc-java/issues/1549
287292 // By sending responses only when grpcCallObserver.isReady(), we control that the actual
288293 // sending doesn't fall behind what we push from here.
289294 // By using the deadline, we exit the RPC if the responses aren't picked up by the client.
@@ -295,15 +300,21 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
295300 logTrace(s " Acquired grpcCallObserverReadySignal lock. " )
296301 val sleepStart = System .nanoTime()
297302 var sleepEnd = 0L
298- while (! grpcCallObserver.isReady() && deadlineTimeMillis >= System .currentTimeMillis()) {
303+ // Conditions for exiting the inner loop
304+ // 1. was interrupted
305+ // 2. grpcCallObserver is ready to send more data
306+ // 3. time deadline is reached
307+ while (! interrupted &&
308+ ! grpcCallObserver.isReady() &&
309+ deadlineTimeMillis >= System .currentTimeMillis()) {
299310 val timeout = Math .max(1 , deadlineTimeMillis - System .currentTimeMillis())
300311 var sleepStart = System .nanoTime()
301312 logTrace(s " Wait for grpcCallObserver to become ready with timeout= $timeout ms. " )
302313 grpcCallObserverReadySignal.wait(timeout)
303314 logTrace(s " Reacquired grpcCallObserverReadySignal lock after waiting. " )
304315 sleepEnd = System .nanoTime()
305316 }
306- if (grpcCallObserver.isReady()) {
317+ if (! interrupted && grpcCallObserver.isReady()) {
307318 val sleepTime = if (sleepEnd > 0L ) sleepEnd - sleepStart else 0L
308319 logDebug(
309320 s " SEND opId= ${executeHolder.operationId} responseId= ${response.responseId} " +
@@ -313,7 +324,7 @@ private[connect] class ExecuteGrpcResponseSender[T <: Message](
313324 grpcCallObserver.onNext(response.response)
314325 true
315326 } else {
316- logTrace(s " grpcCallObserver is not ready, exiting. " )
327+ logTrace(s " exiting sendResponse without sending " )
317328 false
318329 }
319330 }
0 commit comments