Skip to content

Commit a599c63

Browse files
authored
chore: ensure ParallelCompositeUploadBlobWriteSessionConfig is serializable (#2240)
1 parent f8f4e22 commit a599c63

File tree

3 files changed

+111
-36
lines changed

3 files changed

+111
-36
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadBlobWriteSessionConfig.java

Lines changed: 67 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818

1919
import static com.google.common.base.Preconditions.checkArgument;
2020
import static com.google.common.base.Preconditions.checkNotNull;
21+
import static java.util.Objects.requireNonNull;
2122

2223
import com.google.api.core.ApiFuture;
2324
import com.google.api.core.ApiFutures;
@@ -37,6 +38,8 @@
3738
import com.google.common.util.concurrent.ThreadFactoryBuilder;
3839
import com.google.storage.v2.WriteObjectResponse;
3940
import java.io.IOException;
41+
import java.io.ObjectOutputStream;
42+
import java.io.Serializable;
4043
import java.nio.charset.StandardCharsets;
4144
import java.security.SecureRandom;
4245
import java.time.Clock;
@@ -257,7 +260,8 @@ WriterFactory createFactory(Clock clock) throws IOException {
257260
*/
258261
@BetaApi
259262
@Immutable
260-
public abstract static class BufferStrategy extends Factory<BufferHandlePool> {
263+
public abstract static class BufferStrategy extends Factory<BufferHandlePool>
264+
implements Serializable {
261265

262266
private BufferStrategy() {}
263267

@@ -289,6 +293,7 @@ public static BufferStrategy fixedPool(int bufferCount, int bufferCapacity) {
289293
}
290294

291295
private static class SimpleBufferStrategy extends BufferStrategy {
296+
private static final long serialVersionUID = 8884826090481043434L;
292297

293298
private final int capacity;
294299

@@ -303,6 +308,7 @@ BufferHandlePool get() {
303308
}
304309

305310
private static class FixedBufferStrategy extends BufferStrategy {
311+
private static final long serialVersionUID = 3288902741819257066L;
306312

307313
private final int bufferCount;
308314
private final int bufferCapacity;
@@ -328,7 +334,7 @@ BufferHandlePool get() {
328334
*/
329335
@BetaApi
330336
@Immutable
331-
public abstract static class ExecutorSupplier extends Factory<Executor> {
337+
public abstract static class ExecutorSupplier extends Factory<Executor> implements Serializable {
332338
private static final AtomicInteger INSTANCE_COUNTER = new AtomicInteger(1);
333339

334340
private ExecutorSupplier() {}
@@ -341,13 +347,7 @@ private ExecutorSupplier() {}
341347
*/
342348
@BetaApi
343349
public static ExecutorSupplier cachedPool() {
344-
return new ExecutorSupplier() {
345-
@Override
346-
Executor get() {
347-
ThreadFactory threadFactory = newThreadFactory();
348-
return Executors.newCachedThreadPool(threadFactory);
349-
}
350-
};
350+
return new CachedSupplier();
351351
}
352352

353353
/**
@@ -359,13 +359,7 @@ Executor get() {
359359
*/
360360
@BetaApi
361361
public static ExecutorSupplier fixedPool(int poolSize) {
362-
return new ExecutorSupplier() {
363-
@Override
364-
Executor get() {
365-
ThreadFactory threadFactory = newThreadFactory();
366-
return Executors.newFixedThreadPool(poolSize, threadFactory);
367-
}
368-
};
362+
return new FixedSupplier(poolSize);
369363
}
370364

371365
/**
@@ -380,6 +374,7 @@ Executor get() {
380374
*/
381375
@BetaApi
382376
public static ExecutorSupplier useExecutor(Executor executor) {
377+
requireNonNull(executor, "executor must be non null");
383378
return new SuppliedExecutorSupplier(executor);
384379
}
385380

@@ -403,6 +398,36 @@ public SuppliedExecutorSupplier(Executor executor) {
403398
Executor get() {
404399
return executor;
405400
}
401+
402+
private void writeObject(ObjectOutputStream out) throws IOException {
403+
throw new java.io.InvalidClassException(this.getClass().getName() + "; Not serializable");
404+
}
405+
}
406+
407+
private static class CachedSupplier extends ExecutorSupplier implements Serializable {
408+
private static final long serialVersionUID = 7768210719775319260L;
409+
410+
@Override
411+
Executor get() {
412+
ThreadFactory threadFactory = newThreadFactory();
413+
return Executors.newCachedThreadPool(threadFactory);
414+
}
415+
}
416+
417+
private static class FixedSupplier extends ExecutorSupplier implements Serializable {
418+
private static final long serialVersionUID = 7771825977551614347L;
419+
420+
private final int poolSize;
421+
422+
public FixedSupplier(int poolSize) {
423+
this.poolSize = poolSize;
424+
}
425+
426+
@Override
427+
Executor get() {
428+
ThreadFactory threadFactory = newThreadFactory();
429+
return Executors.newFixedThreadPool(poolSize, threadFactory);
430+
}
406431
}
407432
}
408433

@@ -415,7 +440,8 @@ Executor get() {
415440
*/
416441
@BetaApi
417442
@Immutable
418-
public abstract static class PartNamingStrategy {
443+
public abstract static class PartNamingStrategy implements Serializable {
444+
private static final long serialVersionUID = 8343436026774231869L;
419445
private static final String FIELD_SEPARATOR = ";";
420446
private static final Encoder B64 = Base64.getUrlEncoder().withoutPadding();
421447
private static final HashFunction OBJECT_NAME_HASH_FUNCTION = Hashing.goodFastHash(128);
@@ -496,6 +522,7 @@ public static PartNamingStrategy prefix(String prefixPattern) {
496522
}
497523

498524
static final class WithPrefix extends PartNamingStrategy {
525+
private static final long serialVersionUID = 5709330763161570411L;
499526

500527
private final String prefix;
501528

@@ -518,6 +545,8 @@ protected String fmtFields(String randomKey, String nameDigest, String partRange
518545
}
519546

520547
static final class NoPrefix extends PartNamingStrategy {
548+
private static final long serialVersionUID = 5202415556658566017L;
549+
521550
public NoPrefix(SecureRandom rand) {
522551
super(rand);
523552
}
@@ -548,34 +577,35 @@ protected String fmtFields(String randomKey, String nameDigest, String partRange
548577
*/
549578
@BetaApi
550579
@Immutable
551-
public static class PartCleanupStrategy {
552-
private final boolean deleteParts;
553-
private final boolean deleteOnError;
554-
555-
private PartCleanupStrategy(boolean deleteParts, boolean deleteOnError) {
556-
this.deleteParts = deleteParts;
557-
this.deleteOnError = deleteOnError;
580+
public static class PartCleanupStrategy implements Serializable {
581+
private static final long serialVersionUID = -1434253614347199051L;
582+
private final boolean deletePartsOnSuccess;
583+
private final boolean deleteAllOnError;
584+
585+
private PartCleanupStrategy(boolean deletePartsOnSuccess, boolean deleteAllOnError) {
586+
this.deletePartsOnSuccess = deletePartsOnSuccess;
587+
this.deleteAllOnError = deleteAllOnError;
558588
}
559589

560-
boolean isDeleteParts() {
561-
return deleteParts;
590+
public boolean isDeletePartsOnSuccess() {
591+
return deletePartsOnSuccess;
562592
}
563593

564-
boolean isDeleteOnError() {
565-
return deleteOnError;
594+
public boolean isDeleteAllOnError() {
595+
return deleteAllOnError;
566596
}
567597

568598
/**
569-
* If an unrecoverable error is encountered, define whether to attempt to delete any object
570-
* parts already uploaded.
599+
* If an unrecoverable error is encountered, define whether to attempt to delete any objects
600+
* already uploaded.
571601
*
572602
* <p><i>Default:</i> {@code true}
573603
*
574604
* @since 2.28.0 This new api is in preview and is subject to breaking changes.
575605
*/
576606
@BetaApi
577-
PartCleanupStrategy withDeleteOnError(boolean deleteOnError) {
578-
return new PartCleanupStrategy(deleteParts, deleteOnError);
607+
PartCleanupStrategy withDeleteAllOnError(boolean deleteAllOnError) {
608+
return new PartCleanupStrategy(deletePartsOnSuccess, deleteAllOnError);
579609
}
580610

581611
/**
@@ -615,7 +645,11 @@ public static PartCleanupStrategy never() {
615645
}
616646
}
617647

618-
private abstract static class Factory<T> {
648+
private abstract static class Factory<T> implements Serializable {
649+
private static final long serialVersionUID = 271806144227661056L;
650+
651+
private Factory() {}
652+
619653
abstract T get();
620654
}
621655

google-cloud-storage/src/main/java/com/google/cloud/storage/ParallelCompositeUploadWritableByteChannel.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -250,7 +250,7 @@ public synchronized void close() throws IOException {
250250
},
251251
exec);
252252

253-
if (partCleanupStrategy.isDeleteOnError()) {
253+
if (partCleanupStrategy.isDeleteAllOnError()) {
254254
ApiFuture<BlobInfo> cleaningFuture =
255255
ApiFutures.catchingAsync(
256256
validatingTransform, Throwable.class, this::asyncCleanupAfterFailure, exec);
@@ -316,7 +316,7 @@ private void internalFlush(ByteBuffer buf) {
316316

317317
Throwable cause = e.getCause();
318318
BaseServiceException storageException;
319-
if (partCleanupStrategy.isDeleteOnError()) {
319+
if (partCleanupStrategy.isDeleteAllOnError()) {
320320
storageException = StorageException.coalesce(cause);
321321
ApiFuture<Object> cleanupFutures = asyncCleanupAfterFailure(storageException);
322322
// asynchronously fail the finalObject future
@@ -394,7 +394,7 @@ private BlobInfo compose(ImmutableList<BlobInfo> parts) {
394394
}
395395

396396
private ApiFuture<BlobInfo> cleanupParts(BlobInfo finalInfo) {
397-
if (!partCleanupStrategy.isDeleteParts()) {
397+
if (!partCleanupStrategy.isDeletePartsOnSuccess()) {
398398
return ApiFutures.immediateFuture(finalInfo);
399399
}
400400
List<ApiFuture<Boolean>> deletes =

google-cloud-storage/src/test/java/com/google/cloud/storage/SerializationTest.java

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.common.base.MoreObjects.firstNonNull;
2020
import static com.google.common.truth.Truth.assertThat;
2121
import static org.junit.Assert.assertEquals;
22+
import static org.junit.Assert.assertThrows;
2223

2324
import com.google.api.services.storage.model.StorageObject;
2425
import com.google.cloud.BaseSerializationTest;
@@ -32,6 +33,10 @@
3233
import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext;
3334
import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelV2State;
3435
import com.google.cloud.storage.BlobWriteChannelV2.BlobWriteChannelV2State;
36+
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.BufferStrategy;
37+
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.ExecutorSupplier;
38+
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartCleanupStrategy;
39+
import com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig.PartNamingStrategy;
3540
import com.google.cloud.storage.Storage.BlobTargetOption;
3641
import com.google.cloud.storage.Storage.BucketField;
3742
import com.google.cloud.storage.Storage.ComposeRequest;
@@ -46,13 +51,15 @@
4651
import java.io.ByteArrayOutputStream;
4752
import java.io.IOException;
4853
import java.io.InputStream;
54+
import java.io.InvalidClassException;
4955
import java.io.ObjectInputStream;
5056
import java.io.ObjectOutputStream;
5157
import java.io.Serializable;
5258
import java.util.Base64;
5359
import java.util.Collections;
5460
import java.util.Map;
5561
import java.util.Properties;
62+
import java.util.concurrent.Executor;
5663
import org.junit.AfterClass;
5764
import org.junit.BeforeClass;
5865
import org.junit.Test;
@@ -368,4 +375,38 @@ public void testSerializableObjects() throws Exception {
368375
assertEquals(copy, copy);
369376
}
370377
}
378+
379+
@Test
380+
public void blobWriteSessionConfig_pcu() throws IOException, ClassNotFoundException {
381+
ParallelCompositeUploadBlobWriteSessionConfig pcu1 =
382+
BlobWriteSessionConfigs.parallelCompositeUpload();
383+
ParallelCompositeUploadBlobWriteSessionConfig pcu1copy = serializeAndDeserialize(pcu1);
384+
assertThat(pcu1copy).isNotNull();
385+
386+
ParallelCompositeUploadBlobWriteSessionConfig pcu2 =
387+
BlobWriteSessionConfigs.parallelCompositeUpload()
388+
.withBufferStrategy(BufferStrategy.fixedPool(1, 3))
389+
.withPartCleanupStrategy(PartCleanupStrategy.never())
390+
.withPartNamingStrategy(PartNamingStrategy.prefix("prefix"))
391+
.withExecutorSupplier(ExecutorSupplier.fixedPool(5));
392+
ParallelCompositeUploadBlobWriteSessionConfig pcu2copy = serializeAndDeserialize(pcu2);
393+
assertThat(pcu2copy).isNotNull();
394+
395+
InvalidClassException invalidClassException =
396+
assertThrows(
397+
InvalidClassException.class,
398+
() -> {
399+
Executor executor = command -> {};
400+
ParallelCompositeUploadBlobWriteSessionConfig pcu3 =
401+
BlobWriteSessionConfigs.parallelCompositeUpload()
402+
.withExecutorSupplier(ExecutorSupplier.useExecutor(executor));
403+
// executor is not serializable, this should throw an exception
404+
serializeAndDeserialize(pcu3);
405+
});
406+
407+
assertThat(invalidClassException)
408+
.hasMessageThat()
409+
.isEqualTo(
410+
"com.google.cloud.storage.ParallelCompositeUploadBlobWriteSessionConfig$ExecutorSupplier$SuppliedExecutorSupplier; Not serializable");
411+
}
371412
}

0 commit comments

Comments
 (0)