4
4
import java .io .InputStream ;
5
5
import java .nio .file .FileAlreadyExistsException ;
6
6
import java .nio .file .NoSuchFileException ;
7
- import java .security . PrivilegedAction ;
8
- import java .util .ArrayList ;
9
- import java .util .List ;
10
- import java .util .Map ;
7
+ import java .util .* ;
8
+ import java .util .concurrent . atomic . AtomicLong ;
9
+ import java .util .function . Function ;
10
+ import java .util .stream . Collectors ;
11
11
12
+ import com .qcloud .cos .exception .MultiObjectDeleteException ;
12
13
import org .apache .lucene .util .SetOnce ;
14
+ import org .elasticsearch .ExceptionsHelper ;
13
15
import org .elasticsearch .common .Nullable ;
14
16
import org .elasticsearch .common .Strings ;
15
- import org .elasticsearch .common .blobstore .BlobMetaData ;
16
- import org .elasticsearch .common .blobstore .BlobPath ;
17
- import org .elasticsearch .common .blobstore .BlobStoreException ;
17
+ import org .elasticsearch .common .blobstore .*;
18
18
import org .elasticsearch .common .blobstore .support .AbstractBlobContainer ;
19
19
import org .elasticsearch .common .blobstore .support .PlainBlobMetaData ;
20
- import org .elasticsearch .common .collect .MapBuilder ;
21
20
import org .elasticsearch .common .collect .Tuple ;
22
21
23
22
import com .qcloud .cos .exception .CosClientException ;
29
28
*/
30
29
public class COSBlobContainer extends AbstractBlobContainer {
31
30
31
+ //TODO: 找cos开发确认一次删除最大值
32
+ private static final int MAX_BULK_DELETES = 500 ;
32
33
protected final COSBlobStore blobStore ;
33
34
protected final String keyPath ;
34
35
@@ -38,7 +39,6 @@ public class COSBlobContainer extends AbstractBlobContainer {
38
39
this .keyPath = path .buildAsString ();
39
40
}
40
41
41
- @ Override
42
42
public boolean blobExists (String blobName ) {
43
43
try {
44
44
SocketAccess .doPrivileged (() ->
@@ -68,9 +68,14 @@ public InputStream readBlob(String blobName) throws IOException {
68
68
}
69
69
70
70
@ Override
71
- public void writeBlob (String blobName , InputStream inputStream , long blobSize ) throws IOException {
72
- if (blobExists (blobName )) {
73
- throw new FileAlreadyExistsException ("blob [" + blobName + "] already exists, cannot overwrite" );
71
+ public void writeBlob (String blobName , InputStream inputStream , long blobSize , boolean failIfAlreadyExists ) throws IOException {
72
+ //TODO: 确认COS这里的逻辑,是否上传同名的object会报错
73
+ if (failIfAlreadyExists ) {
74
+ if (blobExists (blobName )) {
75
+ throw new FileAlreadyExistsException ("blob [" + blobName + "] already exists, cannot overwrite" );
76
+ }
77
+ } else {
78
+ this .deleteBlob (blobName );
74
79
}
75
80
76
81
if (blobSize <= COSService .MAX_SINGLE_FILE_SIZE .getBytes ()) {
@@ -80,6 +85,11 @@ public void writeBlob(String blobName, InputStream inputStream, long blobSize) t
80
85
}
81
86
}
82
87
88
+ @ Override
89
+ public void writeBlobAtomic (String blobName , InputStream inputStream , long blobSize , boolean failIfAlreadyExists ) throws IOException {
90
+ writeBlob (blobName , inputStream , blobSize , failIfAlreadyExists );
91
+ }
92
+
83
93
void doSingleUpload (String blobName , InputStream inputStream , long blobSize ) throws IOException {
84
94
if (blobSize > COSService .MAX_SINGLE_FILE_SIZE .getBytes ()) {
85
95
throw new IllegalArgumentException ("Upload request size [" + blobSize + "] can't be larger than max single file size" );
@@ -182,64 +192,164 @@ public void deleteBlob(String blobName) throws IOException {
182
192
}
183
193
184
194
@ Override
185
- public void move (String sourceBlobName , String targetBlobName ) throws IOException {
195
+ public DeleteResult delete () throws IOException {
196
+ final AtomicLong deletedBlobs = new AtomicLong ();
197
+ final AtomicLong deletedBytes = new AtomicLong ();
186
198
try {
187
- SocketAccess .doPrivileged (() ->
188
- this .blobStore .client ().copyObject (
189
- blobStore .bucket (),
190
- buildKey (sourceBlobName ),
191
- blobStore .bucket (),
192
- buildKey (targetBlobName )));
193
- SocketAccess .doPrivilegedVoid (() ->
194
- this .blobStore .client ().deleteObject (blobStore .bucket (), buildKey (sourceBlobName )));
195
- } catch (CosClientException e ) {
196
- throw new IOException ("Exception when copy blob from " + sourceBlobName + " to " + targetBlobName , e );
197
- }
198
- }
199
-
200
- @ Override
201
- public Map <String , BlobMetaData > listBlobsByPrefix (@ Nullable String blobNamePrefix ) throws IOException {
202
- return SocketAccess .doPrivileged ((PrivilegedAction <Map <String , BlobMetaData >>) () -> {
203
- MapBuilder <String , BlobMetaData > blobsBuilder = MapBuilder .newMapBuilder ();
204
199
ObjectListing prevListing = null ;
205
-
206
- while (true ) {
200
+ while (true ) {
207
201
ObjectListing list ;
208
202
if (prevListing != null ) {
209
- list = blobStore .client ().listNextBatchOfObjects (prevListing );
203
+ final ObjectListing finalPrevListing = prevListing ;
204
+ list = SocketAccess .doPrivileged (() -> blobStore .client ().listNextBatchOfObjects (finalPrevListing ));
210
205
} else {
211
- if (blobNamePrefix != null ) {
212
- list = blobStore .client ().listObjects (blobStore .bucket (), buildKey (blobNamePrefix ));
213
- } else {
214
- list = blobStore .client ().listObjects (blobStore .bucket (), keyPath );
215
- }
216
- }
217
- for (COSObjectSummary summary : list .getObjectSummaries ()) {
218
- /* TODO: 需要联系cos-sdk修改
219
- * 这里cos-sdk-v5有一些问题
220
- * summary.getKey() 返回的path路径缺少开头的路径分隔符"/"
221
- * 导致substring后path被错误截断
222
- */
223
- String oriName = "/" + summary .getKey ();
224
- //String name = summary.getKey().substring(keyPath.length());
225
- String name = oriName .substring (keyPath .length ());
226
- blobsBuilder .put (name , new PlainBlobMetaData (name , summary .getSize ()));
206
+ final ListObjectsRequest listObjectsRequest = new ListObjectsRequest ();
207
+ listObjectsRequest .setBucketName (blobStore .bucket ());
208
+ listObjectsRequest .setPrefix (keyPath );
209
+ list = SocketAccess .doPrivileged (() -> blobStore .client ().listObjects (listObjectsRequest ));
227
210
}
211
+ final List <String > blobsToDelete = new ArrayList <>();
212
+ list .getObjectSummaries ().forEach (cosObjectSummary -> {
213
+ deletedBlobs .incrementAndGet ();
214
+ deletedBytes .incrementAndGet ();
215
+ blobsToDelete .add (cosObjectSummary .getKey ());
216
+ });
228
217
if (list .isTruncated ()) {
229
218
prevListing = list ;
230
219
} else {
220
+ final List <String > lastBlobToDelete = new ArrayList <>(blobsToDelete );
221
+ lastBlobToDelete .add (keyPath );
231
222
break ;
232
223
}
233
224
}
234
- return blobsBuilder .immutableMap ();
235
- });
225
+ } catch (CosClientException e ) {
226
+ throw new IOException ("Exception when deleting blob container [\" + keyPath + \" ]" + e );
227
+ }
228
+ return new DeleteResult (deletedBlobs .get (), deletedBytes .get ());
229
+ }
230
+
231
+ @ Override
232
+ public void deleteBlobsIgnoringIfNotExists (List <String > blobNames ) throws IOException {
233
+ doDeleteBlobs (blobNames , true );
234
+ }
235
+
236
+ private void doDeleteBlobs (List <String > blobNames , boolean relative ) throws IOException {
237
+ if (blobNames .isEmpty ()) {
238
+ return ;
239
+ }
240
+ final Set <String > outstanding ;
241
+ if (relative ) {
242
+ outstanding = blobNames .stream ().map (this ::buildKey ).collect (Collectors .toSet ());
243
+ } else {
244
+ outstanding = new HashSet <>(blobNames );
245
+ }
246
+ try {
247
+ final List <DeleteObjectsRequest > deleteRequests = new ArrayList <>();
248
+ final List <String > partition = new ArrayList <>();
249
+ for (String key : outstanding ) {
250
+ partition .add (key );
251
+ if (partition .size () == MAX_BULK_DELETES ) {
252
+ deleteRequests .add (bulkDelete (blobStore .bucket (), partition ));
253
+ partition .clear ();
254
+ }
255
+ }
256
+ if (partition .isEmpty () == false ) {
257
+ deleteRequests .add (bulkDelete (blobStore .bucket (), partition ));
258
+ }
259
+ SocketAccess .doPrivilegedVoid ( () -> {
260
+ CosClientException aex = null ;
261
+ for (DeleteObjectsRequest deleteRequest : deleteRequests ) {
262
+ List <String > keyInRequest = deleteRequest .getKeys ().stream ().map (DeleteObjectsRequest .KeyVersion ::getKey ).collect (Collectors .toList ());
263
+ try {
264
+ blobStore .client ().deleteObjects (deleteRequest );
265
+ outstanding .removeAll (keyInRequest );
266
+ } catch (MultiObjectDeleteException e ) {
267
+ outstanding .removeAll (keyInRequest );
268
+ outstanding .addAll (
269
+ e .getErrors ().stream ().map (MultiObjectDeleteException .DeleteError ::getKey ).collect (Collectors .toList ())
270
+ );
271
+ aex = ExceptionsHelper .useOrSuppress (aex , e );
272
+ } catch (CosClientException e ) {
273
+ aex = ExceptionsHelper .useOrSuppress (aex , e );
274
+ }
275
+ }
276
+ if (aex != null ) {
277
+ throw aex ;
278
+ }
279
+ });
280
+ } catch (Exception e ) {
281
+ throw new IOException ("Failed to delete blobs [" + outstanding + "]" , e );
282
+ }
283
+ assert outstanding .isEmpty ();
284
+ }
285
+
286
+ private static DeleteObjectsRequest bulkDelete (String bucket , List <String > blobs ) {
287
+ return new DeleteObjectsRequest (bucket ).withKeys (blobs .toArray (Strings .EMPTY_ARRAY )).withQuiet (true );
288
+ }
289
+
290
+
291
+ @ Override
292
+ public Map <String , BlobMetaData > listBlobsByPrefix (@ Nullable String blobNamePrefix ) throws IOException {
293
+ try {
294
+ return SocketAccess .doPrivileged ( () ->
295
+ executeListing (generateListObjectsRequest (blobNamePrefix == null ? keyPath : buildKey (blobNamePrefix )))
296
+ .stream ()
297
+ .flatMap (listing -> listing .getObjectSummaries ().stream ())
298
+ .map (summary -> new PlainBlobMetaData (summary .getKey ().substring (keyPath .length ()), summary .getSize ()))
299
+ .collect (Collectors .toMap (PlainBlobMetaData ::name , Function .identity ()))
300
+ );
301
+ } catch (CosClientException e ) {
302
+ throw new IOException ("Exception when listing blobs by prefix [" + blobNamePrefix + "]" , e );
303
+ }
304
+
236
305
}
237
306
238
307
@ Override
239
308
public Map <String , BlobMetaData > listBlobs () throws IOException {
240
309
return listBlobsByPrefix (null );
241
310
}
242
311
312
+ @ Override
313
+ public Map <String , BlobContainer > children () throws IOException {
314
+ try {
315
+ return executeListing (generateListObjectsRequest (keyPath )).stream ()
316
+ .flatMap (listing -> listing .getCommonPrefixes ().stream ())
317
+ .map (prefix -> prefix .substring (keyPath .length ()))
318
+ .filter (name -> name .isEmpty () == false )
319
+ .map (name -> name .substring (0 , name .length () - 1 ))
320
+ .collect (Collectors .toMap (Function .identity (), name -> blobStore .blobContainer (path ().add (name ))));
321
+ } catch (CosClientException e ) {
322
+ throw new IOException ("Exception when listing children of [" + path ().buildAsString () + ']' , e );
323
+ }
324
+ }
325
+
326
+ private List <ObjectListing > executeListing (ListObjectsRequest listObjectsRequest ) {
327
+ final List <ObjectListing > results = new ArrayList <>();
328
+ ObjectListing prevListing = null ;
329
+ while (true ) {
330
+ ObjectListing list ;
331
+ if (prevListing != null ) {
332
+ final ObjectListing finalPrevListing = prevListing ;
333
+ list = SocketAccess .doPrivileged (() -> blobStore .client ().listNextBatchOfObjects (finalPrevListing ));
334
+ } else {
335
+ list = SocketAccess .doPrivileged (() -> blobStore .client ().listObjects (listObjectsRequest ));
336
+ }
337
+ results .add (list );
338
+ if (list .isTruncated ()) {
339
+ prevListing = list ;
340
+ } else {
341
+ break ;
342
+ }
343
+ }
344
+ return results ;
345
+ }
346
+
347
+ private ListObjectsRequest generateListObjectsRequest (String keyPath ) {
348
+ ListObjectsRequest listObjectsRequest = new ListObjectsRequest ();
349
+ listObjectsRequest .withBucketName (blobStore .bucket ()).withPrefix (keyPath ).withDelimiter ("/" );
350
+ return listObjectsRequest ;
351
+ }
352
+
243
353
protected String buildKey (String blobName ) {
244
354
return keyPath + blobName ;
245
355
}
0 commit comments