Skip to content

Commit 16d7363

Browse files
committed
feat: introduce new BlobWriteSession
When writing a new Blob to GCS, there are secondary session related state and actions which can't be represented by WriteChannel. BlobWriteSession provides a new construct to allow retrieving the resultant object which is created after the WritableByteChannel is closed. Along with the new session, configuration for this is now performed at the StorageOptions level where cross session considerations can influence the implementation of the returned session. The configurable option present for this new StorageWriterConfig is chunkSize. In the future new configurations will be added with their corresponding options. For example, in a future release it will be possible to change from in memory buffering to instead buffer to disk thereby reducing heap usage.
1 parent 31ecab7 commit 16d7363

18 files changed

+533
-31
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.BetaApi;
21+
import java.io.IOException;
22+
import java.nio.channels.WritableByteChannel;
23+
24+
@BetaApi
25+
public interface BlobWriteSession {
26+
27+
WritableByteChannel open() throws IOException;
28+
29+
ApiFuture<BlobInfo> getResult();
30+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.api.core.ApiFuture;
20+
import java.io.IOException;
21+
import java.nio.channels.WritableByteChannel;
22+
23+
final class BlobWriteSessions {
24+
25+
private BlobWriteSessions() {}
26+
27+
static BlobWriteSession of(WritableByteChannelSession<?, BlobInfo> s) {
28+
return new WritableByteChannelSessionAdapter(s);
29+
}
30+
31+
static final class WritableByteChannelSessionAdapter implements BlobWriteSession {
32+
private final WritableByteChannelSession<?, BlobInfo> delegate;
33+
34+
private WritableByteChannelSessionAdapter(WritableByteChannelSession<?, BlobInfo> delegate) {
35+
this.delegate = delegate;
36+
}
37+
38+
@Override
39+
public WritableByteChannel open() throws IOException {
40+
return delegate.open();
41+
}
42+
43+
@Override
44+
public ApiFuture<BlobInfo> getResult() {
45+
return delegate.getResult();
46+
}
47+
}
48+
}
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.cloud.storage.TransportCompatibility.Transport;
20+
import java.util.Arrays;
21+
import java.util.stream.Collectors;
22+
23+
final class CrossTransportUtils {
24+
25+
static <T> T throwHttpJsonOnly(String methodName) {
26+
return throwHttpJsonOnly(Storage.class, methodName);
27+
}
28+
29+
static <T> T throwHttpJsonOnly(Class<?> clazz, String methodName) {
30+
return throwTransportOnly(clazz, methodName, Transport.HTTP);
31+
}
32+
33+
static <T> T throwGrpcOnly(String methodName) {
34+
return throwGrpcOnly(Storage.class, methodName);
35+
}
36+
37+
static <T> T throwGrpcOnly(Class<?> clazz, String methodName) {
38+
return throwTransportOnly(clazz, methodName, Transport.GRPC);
39+
}
40+
41+
static <T> T throwTransportOnly(Class<?> clazz, String methodName, Transport transport) {
42+
String builder;
43+
switch (transport) {
44+
case HTTP:
45+
builder = "StorageOptions.http()";
46+
break;
47+
case GRPC:
48+
builder = "StorageOptions.grpc()";
49+
break;
50+
default:
51+
throw new IllegalStateException(
52+
String.format("Broken Java Enum: %s received value: '%s'", Transport.class, transport));
53+
}
54+
String message =
55+
String.format(
56+
"%s#%s is only supported for %s transport. Please use %s to construct a compatible instance.",
57+
clazz.getName(), methodName, transport, builder);
58+
throw new UnsupportedOperationException(message);
59+
}
60+
61+
static String fmtMethodName(String name, Class<?>... args) {
62+
return name
63+
+ "("
64+
+ Arrays.stream(args).map(Class::getName).collect(Collectors.joining(", "))
65+
+ ")";
66+
}
67+
}
Lines changed: 125 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,125 @@
1+
/*
2+
* Copyright 2023 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.api.core.ApiFuture;
20+
import com.google.api.core.ApiFutures;
21+
import com.google.api.core.BetaApi;
22+
import com.google.api.core.InternalApi;
23+
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
24+
import com.google.cloud.storage.Conversions.Decoder;
25+
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
26+
import com.google.cloud.storage.UnifiedOpts.Opts;
27+
import com.google.common.base.Preconditions;
28+
import com.google.common.util.concurrent.MoreExecutors;
29+
import com.google.storage.v2.WriteObjectResponse;
30+
import java.nio.channels.WritableByteChannel;
31+
import java.time.Clock;
32+
import javax.annotation.concurrent.Immutable;
33+
34+
/**
35+
* Configure a writer which is logically equivalent to the following:
36+
*
37+
* <pre>{@code
38+
* Storage storage = ...;
39+
* WriteChannel writeChannel = storage.writer(BlobInfo, BlobWriteOption);
40+
* writeChannel.setChunkSize(chunkSize);
41+
* }</pre>
42+
*/
43+
@Immutable
44+
@BetaApi
45+
public final class DefaultStorageWriterConfig extends StorageWriterConfig {
46+
47+
private final int chunkSize;
48+
49+
@InternalApi
50+
DefaultStorageWriterConfig(int chunkSize) {
51+
this.chunkSize = chunkSize;
52+
}
53+
54+
public int getChunkSize() {
55+
return chunkSize;
56+
}
57+
58+
@BetaApi
59+
public DefaultStorageWriterConfig withChunkSize(int chunkSize) {
60+
Preconditions.checkArgument(
61+
chunkSize >= ByteSizeConstants._256KiB,
62+
"chunkSize must be >= %d",
63+
ByteSizeConstants._256KiB);
64+
return new DefaultStorageWriterConfig(chunkSize);
65+
}
66+
67+
@Override
68+
@InternalApi
69+
WriterFactory createFactory(Clock clock) {
70+
return new Factory(chunkSize);
71+
}
72+
73+
@InternalApi
74+
private static final class Factory implements WriterFactory {
75+
76+
private final int chunkSize;
77+
78+
private Factory(int chunkSize) {
79+
this.chunkSize = chunkSize;
80+
}
81+
82+
@InternalApi
83+
@Override
84+
public WritableByteChannelSession<?, BlobInfo> writeSession(
85+
StorageInternal s,
86+
BlobInfo info,
87+
Opts<ObjectTargetOpt> opts,
88+
Decoder<WriteObjectResponse, BlobInfo> d) {
89+
// todo: invert this
90+
// make GrpcBlobWriteChannel use this factory to produce its WriteSession
91+
if (s instanceof GrpcStorageImpl) {
92+
GrpcStorageImpl g = (GrpcStorageImpl) s;
93+
GrpcBlobWriteChannel writer = g.writer(info);
94+
writer.setChunkSize(chunkSize);
95+
WritableByteChannelSession<BufferedWritableByteChannel, WriteObjectResponse> session =
96+
writer.newLazyWriteChannel().getSession();
97+
return new DecoratedWritableByteChannelSession<>(session, d);
98+
}
99+
return CrossTransportUtils.throwGrpcOnly(DefaultStorageWriterConfig.class, "");
100+
}
101+
}
102+
103+
private static final class DecoratedWritableByteChannelSession<WBC extends WritableByteChannel, T>
104+
implements WritableByteChannelSession<WBC, BlobInfo> {
105+
106+
private final WritableByteChannelSession<WBC, T> delegate;
107+
private final Decoder<T, BlobInfo> d;
108+
109+
private DecoratedWritableByteChannelSession(
110+
WritableByteChannelSession<WBC, T> delegate, Decoder<T, BlobInfo> d) {
111+
this.delegate = delegate;
112+
this.d = d;
113+
}
114+
115+
@Override
116+
public ApiFuture<WBC> openAsync() {
117+
return delegate.openAsync();
118+
}
119+
120+
@Override
121+
public ApiFuture<BlobInfo> getResult() {
122+
return ApiFutures.transform(delegate.getResult(), d::decode, MoreExecutors.directExecutor());
123+
}
124+
}
125+
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicCopyWriter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -87,6 +87,6 @@ public void copyChunk() {
8787

8888
@Override
8989
public RestorableState<CopyWriter> capture() {
90-
return GrpcStorageImpl.throwHttpJsonOnly(CopyWriter.class, "capture");
90+
return CrossTransportUtils.throwHttpJsonOnly(CopyWriter.class, "capture");
9191
}
9292
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobReadChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ final class GrpcBlobReadChannel extends BaseStorageReadChannel<Object> {
4343

4444
@Override
4545
public RestorableState<ReadChannel> capture() {
46-
return GrpcStorageImpl.throwHttpJsonOnly(ReadChannel.class, "capture");
46+
return CrossTransportUtils.throwHttpJsonOnly(ReadChannel.class, "capture");
4747
}
4848

4949
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcBlobWriteChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ final class GrpcBlobWriteChannel extends BaseStorageWriteChannel<WriteObjectResp
5050

5151
@Override
5252
public RestorableState<WriteChannel> capture() {
53-
return GrpcStorageImpl.throwHttpJsonOnly(WriteChannel.class, "capture");
53+
return CrossTransportUtils.throwHttpJsonOnly(WriteChannel.class, "capture");
5454
}
5555

5656
@Override

0 commit comments

Comments
 (0)