1616
1717package com .google .cloud .storage ;
1818
19+ import static com .google .cloud .storage .GrpcUtils .contextWithBucketName ;
20+
1921import com .google .api .core .SettableApiFuture ;
22+ import com .google .api .gax .grpc .GrpcCallContext ;
23+ import com .google .api .gax .retrying .ResultRetryAlgorithm ;
24+ import com .google .api .gax .rpc .ApiStreamObserver ;
25+ import com .google .api .gax .rpc .BidiStreamingCallable ;
2026import com .google .cloud .storage .ChunkSegmenter .ChunkSegment ;
27+ import com .google .cloud .storage .Conversions .Decoder ;
2128import com .google .cloud .storage .Crc32cValue .Crc32cLengthKnown ;
29+ import com .google .cloud .storage .Retrying .RetryingDependencies ;
2230import com .google .cloud .storage .UnbufferedWritableByteChannelSession .UnbufferedWritableByteChannel ;
2331import com .google .common .annotations .VisibleForTesting ;
2432import com .google .protobuf .ByteString ;
3139import java .nio .channels .ClosedChannelException ;
3240import java .util .ArrayList ;
3341import java .util .List ;
42+ import java .util .concurrent .Semaphore ;
43+ import java .util .function .Supplier ;
3444import org .checkerframework .checker .nullness .qual .NonNull ;
3545
36- final class GapicBidiUnbufferedWritableByteChannel <
37- RequestFactoryT extends BidiWriteCtx .BidiWriteObjectRequestBuilderFactory >
38- implements UnbufferedWritableByteChannel {
39-
46+ final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {
47+ private final BidiStreamingCallable <BidiWriteObjectRequest , BidiWriteObjectResponse > write ;
48+ private final RetryingDependencies deps ;
49+ private final ResultRetryAlgorithm <?> alg ;
50+ private final String bucketName ;
51+ private final Supplier <GrpcCallContext > baseContextSupplier ;
4052 private final SettableApiFuture <BidiWriteObjectResponse > resultFuture ;
4153 private final ChunkSegmenter chunkSegmenter ;
4254
43- private final BidiWriteCtx <RequestFactoryT > writeCtx ;
44- private final WriteFlushStrategy . BidiFlusher flusher ;
55+ private final BidiWriteCtx <BidiResumableWrite > writeCtx ;
56+ private final BidiObserver responseObserver ;
4557
58+ private volatile ApiStreamObserver <BidiWriteObjectRequest > stream ;
4659 private boolean open = true ;
60+ private boolean first = true ;
4761 private boolean finished = false ;
4862
4963 GapicBidiUnbufferedWritableByteChannel (
64+ BidiStreamingCallable <BidiWriteObjectRequest , BidiWriteObjectResponse > write ,
65+ RetryingDependencies deps ,
66+ ResultRetryAlgorithm <?> alg ,
5067 SettableApiFuture <BidiWriteObjectResponse > resultFuture ,
5168 ChunkSegmenter chunkSegmenter ,
52- RequestFactoryT requestFactory ,
53- WriteFlushStrategy .BidiFlusherFactory flusherFactory ) {
69+ BidiResumableWrite requestFactory ,
70+ Supplier <GrpcCallContext > baseContextSupplier ) {
71+ this .write = write ;
72+ this .deps = deps ;
73+ this .alg = alg ;
74+ this .baseContextSupplier = baseContextSupplier ;
75+ this .bucketName = requestFactory .bucketName ();
5476 this .resultFuture = resultFuture ;
5577 this .chunkSegmenter = chunkSegmenter ;
5678
5779 this .writeCtx = new BidiWriteCtx <>(requestFactory );
58- this .flusher =
59- flusherFactory .newFlusher (
60- requestFactory .bucketName (), writeCtx .getConfirmedBytes ()::set , resultFuture ::set );
80+ this .responseObserver = new BidiObserver ();
6181 }
6282
6383 @ Override
6484 public long write (ByteBuffer [] srcs , int srcsOffset , int srcsLength ) throws IOException {
6585 return internalWrite (srcs , srcsOffset , srcsLength , false );
6686 }
6787
88+ @ Override
89+ public long writeAndClose (ByteBuffer [] srcs , int offset , int length ) throws IOException {
90+ long written = internalWrite (srcs , offset , length , true );
91+ close ();
92+ return written ;
93+ }
94+
6895 @ Override
6996 public boolean isOpen () {
7097 return open ;
7198 }
7299
73100 @ Override
74101 public void close () throws IOException {
102+ if (!open ) {
103+ return ;
104+ }
105+ ApiStreamObserver <BidiWriteObjectRequest > openedStream = openedStream ();
75106 if (!finished ) {
76107 BidiWriteObjectRequest message = finishMessage ();
77108 try {
78- flusher . close (message );
109+ openedStream . onNext (message );
79110 finished = true ;
111+ openedStream .onCompleted ();
80112 } catch (RuntimeException e ) {
81113 resultFuture .setException (e );
82114 throw e ;
83115 }
84116 } else {
85- flusher . close ( null );
117+ openedStream . onCompleted ( );
86118 }
119+ responseObserver .await ();
87120 open = false ;
88121 }
89122
90123 @ VisibleForTesting
91- BidiWriteCtx <RequestFactoryT > getWriteCtx () {
124+ BidiWriteCtx <BidiResumableWrite > getWriteCtx () {
92125 return writeCtx ;
93126 }
94127
@@ -130,7 +163,8 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
130163 finished = true ;
131164 }
132165
133- BidiWriteObjectRequest build = builder .build ();
166+ BidiWriteObjectRequest build = possiblyPairDownBidiRequest (builder , first ).build ();
167+ first = false ;
134168 messages .add (build );
135169 bytesConsumed += contentSize ;
136170 }
@@ -140,7 +174,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
140174 }
141175
142176 try {
143- flusher . flush (messages );
177+ flush (messages );
144178 } catch (RuntimeException e ) {
145179 resultFuture .setException (e );
146180 throw e ;
@@ -162,4 +196,123 @@ private BidiWriteObjectRequest finishMessage() {
162196 BidiWriteObjectRequest message = b .build ();
163197 return message ;
164198 }
199+
200+ private ApiStreamObserver <BidiWriteObjectRequest > openedStream () {
201+ if (stream == null ) {
202+ synchronized (this ) {
203+ if (stream == null ) {
204+ GrpcCallContext internalContext =
205+ contextWithBucketName (bucketName , baseContextSupplier .get ());
206+ stream =
207+ this .write
208+ .withDefaultCallContext (internalContext )
209+ .bidiStreamingCall (responseObserver );
210+ responseObserver .sem .drainPermits ();
211+ }
212+ }
213+ }
214+ return stream ;
215+ }
216+
217+ private void flush (@ NonNull List <BidiWriteObjectRequest > segments ) {
218+ Retrying .run (
219+ deps ,
220+ alg ,
221+ () -> {
222+ try {
223+ ApiStreamObserver <BidiWriteObjectRequest > opened = openedStream ();
224+ for (BidiWriteObjectRequest message : segments ) {
225+ opened .onNext (message );
226+ }
227+ if (!finished ) {
228+ BidiWriteObjectRequest message =
229+ BidiWriteObjectRequest .newBuilder ().setFlush (true ).setStateLookup (true ).build ();
230+ opened .onNext (message );
231+ }
232+ responseObserver .await ();
233+ return null ;
234+ } catch (Exception e ) {
235+ stream = null ;
236+ first = true ;
237+ throw e ;
238+ }
239+ },
240+ Decoder .identity ());
241+ }
242+
243+ private static BidiWriteObjectRequest .Builder possiblyPairDownBidiRequest (
244+ BidiWriteObjectRequest .Builder b , boolean firstMessageOfStream ) {
245+ if (firstMessageOfStream && b .getWriteOffset () == 0 ) {
246+ return b ;
247+ }
248+
249+ if (!firstMessageOfStream ) {
250+ b .clearUploadId ();
251+ }
252+
253+ if (b .getWriteOffset () > 0 ) {
254+ b .clearWriteObjectSpec ();
255+ }
256+
257+ if (b .getWriteOffset () > 0 && !b .getFinishWrite ()) {
258+ b .clearObjectChecksums ();
259+ }
260+ return b ;
261+ }
262+
263+ private class BidiObserver implements ApiStreamObserver <BidiWriteObjectResponse > {
264+
265+ private final Semaphore sem ;
266+ private volatile BidiWriteObjectResponse last ;
267+ private volatile RuntimeException previousError ;
268+
269+ private BidiObserver () {
270+ this .sem = new Semaphore (0 );
271+ }
272+
273+ @ Override
274+ public void onNext (BidiWriteObjectResponse value ) {
275+ // incremental update
276+ if (value .hasPersistedSize ()) {
277+ writeCtx .getConfirmedBytes ().set ((value .getPersistedSize ()));
278+ } else if (value .hasResource ()) {
279+ writeCtx .getConfirmedBytes ().set (value .getResource ().getSize ());
280+ }
281+ sem .release ();
282+ last = value ;
283+ }
284+
285+ @ Override
286+ public void onError (Throwable t ) {
287+ if (t instanceof RuntimeException ) {
288+ previousError = (RuntimeException ) t ;
289+ }
290+ sem .release ();
291+ }
292+
293+ @ Override
294+ public void onCompleted () {
295+ if (last != null && last .hasResource ()) {
296+ resultFuture .set (last );
297+ }
298+ sem .release ();
299+ }
300+
301+ void await () {
302+ try {
303+ sem .acquire ();
304+ } catch (InterruptedException e ) {
305+ if (e .getCause () instanceof RuntimeException ) {
306+ throw (RuntimeException ) e .getCause ();
307+ } else {
308+ throw new RuntimeException (e );
309+ }
310+ }
311+ RuntimeException err = previousError ;
312+ if (err != null ) {
313+ previousError = null ;
314+ throw err ;
315+ }
316+ }
317+ }
165318}
0 commit comments