Commit ebb81cc
HADOOP-9454. Support multipart uploads for s3native. Contributed by Jordan Mendelson and Akira AJISAKA.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1572235 13f79535-47bb-0310-9956-ffa450edef68
1 parent 48209ee commit ebb81cc

File tree

5 files changed: +281 -16 lines changed

hadoop-common-project/hadoop-common/CHANGES.txt

Lines changed: 3 additions & 0 deletions
@@ -343,6 +343,9 @@ Release 2.4.0 - UNRELEASED
     HADOOP-10348. Deprecate hadoop.ssl.configuration in branch-2, and remove
     it in trunk. (Haohui Mai via jing9)
 
+    HADOOP-9454. Support multipart uploads for s3native. (Jordan Mendelson and
+    Akira AJISAKA via atm)
+
   OPTIMIZATIONS
 
   BUG FIXES

hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/s3native/Jets3tNativeFileSystemStore.java

Lines changed: 111 additions & 16 deletions
@@ -28,6 +28,9 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -41,17 +44,26 @@
 import org.jets3t.service.ServiceException;
 import org.jets3t.service.StorageObjectsChunk;
 import org.jets3t.service.impl.rest.httpclient.RestS3Service;
+import org.jets3t.service.model.MultipartPart;
+import org.jets3t.service.model.MultipartUpload;
 import org.jets3t.service.model.S3Bucket;
 import org.jets3t.service.model.S3Object;
 import org.jets3t.service.model.StorageObject;
 import org.jets3t.service.security.AWSCredentials;
+import org.jets3t.service.utils.MultipartUtils;
 
 @InterfaceAudience.Private
 @InterfaceStability.Unstable
 class Jets3tNativeFileSystemStore implements NativeFileSystemStore {
 
   private S3Service s3Service;
   private S3Bucket bucket;
+
+  private long multipartBlockSize;
+  private boolean multipartEnabled;
+  private long multipartCopyBlockSize;
+  static final long MAX_PART_SIZE = (long)5 * 1024 * 1024 * 1024;
+
   public static final Log LOG =
       LogFactory.getLog(Jets3tNativeFileSystemStore.class);

@@ -67,13 +79,27 @@ public void initialize(URI uri, Configuration conf) throws IOException {
     } catch (S3ServiceException e) {
       handleS3ServiceException(e);
     }
+    multipartEnabled =
+        conf.getBoolean("fs.s3n.multipart.uploads.enabled", false);
+    multipartBlockSize = Math.min(
+        conf.getLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024),
+        MAX_PART_SIZE);
+    multipartCopyBlockSize = Math.min(
+        conf.getLong("fs.s3n.multipart.copy.block.size", MAX_PART_SIZE),
+        MAX_PART_SIZE);
+
     bucket = new S3Bucket(uri.getHost());
   }
 
   @Override
   public void storeFile(String key, File file, byte[] md5Hash)
       throws IOException {
-
+
+    if (multipartEnabled && file.length() >= multipartBlockSize) {
+      storeLargeFile(key, file, md5Hash);
+      return;
+    }
+
     BufferedInputStream in = null;
     try {
       in = new BufferedInputStream(new FileInputStream(file));
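
The hunk above reads two sizing knobs in initialize() and then branches in storeFile(). Both block sizes are clamped to the 5 GB S3 per-part ceiling, and a file only goes through the multipart path when the feature is enabled and the file is at least one block long. A minimal standalone sketch of that decision (class and method names here are illustrative, not part of the patch):

public class MultipartDecisionSketch {
  // S3 rejects parts larger than 5 GB, hence the clamp in initialize().
  static final long MAX_PART_SIZE = 5L * 1024 * 1024 * 1024;

  static long clampBlockSize(long configured) {
    // Applied to both fs.s3n.multipart.uploads.block.size and
    // fs.s3n.multipart.copy.block.size.
    return Math.min(configured, MAX_PART_SIZE);
  }

  static boolean useMultipartUpload(boolean enabled, long fileLength,
      long blockSize) {
    // Mirrors the guard at the top of storeFile().
    return enabled && fileLength >= blockSize;
  }
}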
@@ -98,6 +124,31 @@ public void storeFile(String key, File file, byte[] md5Hash)
     }
   }
 
+  public void storeLargeFile(String key, File file, byte[] md5Hash)
+      throws IOException {
+    S3Object object = new S3Object(key);
+    object.setDataInputFile(file);
+    object.setContentType("binary/octet-stream");
+    object.setContentLength(file.length());
+    if (md5Hash != null) {
+      object.setMd5Hash(md5Hash);
+    }
+
+    List<StorageObject> objectsToUploadAsMultipart =
+        new ArrayList<StorageObject>();
+    objectsToUploadAsMultipart.add(object);
+    MultipartUtils mpUtils = new MultipartUtils(multipartBlockSize);
+
+    try {
+      mpUtils.uploadObjects(bucket.getName(), s3Service,
+          objectsToUploadAsMultipart, null);
+    } catch (ServiceException e) {
+      handleServiceException(e);
+    } catch (Exception e) {
+      throw new S3Exception(e);
+    }
+  }
+
   @Override
   public void storeEmptyFile(String key) throws IOException {
     try {
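
The jets3t calls in storeLargeFile() can also be exercised outside the filesystem store. A self-contained sketch under the same API; the bucket name, credentials, and file path are placeholders, and the null listener argument matches the patch itself:

import java.io.File;
import java.util.ArrayList;
import java.util.List;
import org.jets3t.service.S3Service;
import org.jets3t.service.impl.rest.httpclient.RestS3Service;
import org.jets3t.service.model.S3Object;
import org.jets3t.service.model.StorageObject;
import org.jets3t.service.security.AWSCredentials;
import org.jets3t.service.utils.MultipartUtils;

public class MultipartUploadSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder credentials and file; real code reads these from config.
    S3Service s3 = new RestS3Service(
        new AWSCredentials("ACCESS_KEY", "SECRET_KEY"));
    File file = new File("/tmp/large.bin");

    S3Object object = new S3Object("large.bin");
    object.setDataInputFile(file);
    object.setContentLength(file.length());

    List<StorageObject> objects = new ArrayList<StorageObject>();
    objects.add(object);

    // Split into 64 MB parts, matching the default block size above.
    MultipartUtils mpUtils = new MultipartUtils(64 * 1024 * 1024);
    mpUtils.uploadObjects("my-bucket", s3, objects, null); // no event listener
  }
}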
@@ -152,11 +203,8 @@ public InputStream retrieve(String key) throws IOException {
       }
       S3Object object = s3Service.getObject(bucket.getName(), key);
       return object.getDataInputStream();
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
-      return null; //never returned - keep compiler happy
     } catch (ServiceException e) {
-      handleServiceException(e);
+      handleServiceException(key, e);
       return null; //return null if key not found
     }
   }
@@ -180,11 +228,8 @@ public InputStream retrieve(String key, long byteRangeStart)
       S3Object object = s3Service.getObject(bucket, key, null, null, null,
           null, byteRangeStart, null);
       return object.getDataInputStream();
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
-      return null; //never returned - keep compiler happy
     } catch (ServiceException e) {
-      handleServiceException(e);
+      handleServiceException(key, e);
       return null; //return null if key not found
     }
   }
@@ -244,8 +289,16 @@ public void delete(String key) throws IOException {
         LOG.debug("Deleting key:" + key + "from bucket" + bucket.getName());
       }
       s3Service.deleteObject(bucket, key);
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(key, e);
+    } catch (ServiceException e) {
+      handleServiceException(key, e);
+    }
+  }
+
+  public void rename(String srcKey, String dstKey) throws IOException {
+    try {
+      s3Service.renameObject(bucket.getName(), srcKey, new S3Object(dstKey));
+    } catch (ServiceException e) {
+      handleServiceException(e);
     }
   }

@@ -255,10 +308,52 @@ public void copy(String srcKey, String dstKey) throws IOException {
       if(LOG.isDebugEnabled()) {
         LOG.debug("Copying srcKey: " + srcKey + "to dstKey: " + dstKey + "in bucket: " + bucket.getName());
       }
+      if (multipartEnabled) {
+        S3Object object = s3Service.getObjectDetails(bucket, srcKey, null,
+            null, null, null);
+        if (multipartCopyBlockSize > 0 &&
+            object.getContentLength() > multipartCopyBlockSize) {
+          copyLargeFile(object, dstKey);
+          return;
+        }
+      }
       s3Service.copyObject(bucket.getName(), srcKey, bucket.getName(),
           new S3Object(dstKey), false);
-    } catch (S3ServiceException e) {
-      handleS3ServiceException(srcKey, e);
+    } catch (ServiceException e) {
+      handleServiceException(srcKey, e);
+    }
+  }
+
+  public void copyLargeFile(S3Object srcObject, String dstKey) throws IOException {
+    try {
+      long partCount = srcObject.getContentLength() / multipartCopyBlockSize +
+          (srcObject.getContentLength() % multipartCopyBlockSize > 0 ? 1 : 0);
+
+      MultipartUpload multipartUpload = s3Service.multipartStartUpload
+          (bucket.getName(), dstKey, srcObject.getMetadataMap());
+
+      List<MultipartPart> listedParts = new ArrayList<MultipartPart>();
+      for (int i = 0; i < partCount; i++) {
+        long byteRangeStart = i * multipartCopyBlockSize;
+        long byteLength;
+        if (i < partCount - 1) {
+          byteLength = multipartCopyBlockSize;
+        } else {
+          byteLength = srcObject.getContentLength() % multipartCopyBlockSize;
+          if (byteLength == 0) {
+            byteLength = multipartCopyBlockSize;
+          }
+        }
+
+        MultipartPart copiedPart = s3Service.multipartUploadPartCopy
+            (multipartUpload, i + 1, bucket.getName(), srcObject.getKey(),
+            null, null, null, null, byteRangeStart,
+            byteRangeStart + byteLength - 1, null);
+        listedParts.add(copiedPart);
+      }
+
+      Collections.reverse(listedParts);
+      s3Service.multipartCompleteUpload(multipartUpload, listedParts);
     } catch (ServiceException e) {
       handleServiceException(e);
     }
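
A worked example of the part arithmetic in copyLargeFile(), assuming the default 5 GB copy block size and a hypothetical 12 GB source object: the division plus remainder check yields two full 5 GB parts and a 2 GB tail. The variable names restate the loop above:

public class PartMathSketch {
  public static void main(String[] args) {
    long blockSize = 5L * 1024 * 1024 * 1024;      // default copy block size, 5 GB
    long contentLength = 12L * 1024 * 1024 * 1024; // hypothetical 12 GB object
    long partCount = contentLength / blockSize +
        (contentLength % blockSize > 0 ? 1 : 0);   // = 3 parts
    for (int i = 0; i < partCount; i++) {
      long byteRangeStart = i * blockSize;
      long byteLength = (i < partCount - 1)
          ? blockSize
          : (contentLength % blockSize == 0 ? blockSize
                                            : contentLength % blockSize);
      // Prints: part 1: 0-5368709119, part 2: 5368709120-10737418239,
      //         part 3: 10737418240-12884901887
      System.out.println("part " + (i + 1) + ": " + byteRangeStart + "-"
          + (byteRangeStart + byteLength - 1));
    }
  }
}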
@@ -291,11 +386,11 @@ public void dump() throws IOException {
     System.out.println(sb);
   }
 
-  private void handleS3ServiceException(String key, S3ServiceException e) throws IOException {
-    if ("NoSuchKey".equals(e.getS3ErrorCode())) {
+  private void handleServiceException(String key, ServiceException e) throws IOException {
+    if ("NoSuchKey".equals(e.getErrorCode())) {
       throw new FileNotFoundException("Key '" + key + "' does not exist in S3");
     } else {
-      handleS3ServiceException(e);
+      handleServiceException(e);
     }
   }

hadoop-common-project/hadoop-common/src/main/resources/core-default.xml

Lines changed: 25 additions & 0 deletions
@@ -532,6 +532,31 @@
   filesystem (s3n: URIs).</description>
 </property>
 
+<property>
+  <name>fs.s3n.multipart.uploads.enabled</name>
+  <value>false</value>
+  <description>Setting this property to true enables multipart uploads to the
+  native S3 filesystem. When uploading a file, it is split into blocks
+  if its size is larger than fs.s3n.multipart.uploads.block.size.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3n.multipart.uploads.block.size</name>
+  <value>67108864</value>
+  <description>The block size for multipart uploads to the native S3
+  filesystem. The default size is 64MB.
+  </description>
+</property>
+
+<property>
+  <name>fs.s3n.multipart.copy.block.size</name>
+  <value>5368709120</value>
+  <description>The block size for multipart copies in the native S3
+  filesystem. The default size is 5GB.
+  </description>
+</property>
+
 <property>
   <name>io.seqfile.compress.blocksize</name>
   <value>1000000</value>
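
Equivalently, a client could set these properties programmatically rather than in core-site.xml. A sketch using the property names and defaults from the entries above (the wrapper class is illustrative):

import org.apache.hadoop.conf.Configuration;

public class EnableMultipartSketch {
  public static Configuration multipartConf() {
    Configuration conf = new Configuration();
    conf.setBoolean("fs.s3n.multipart.uploads.enabled", true);
    conf.setLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024); // 64 MB
    conf.setLong("fs.s3n.multipart.copy.block.size", 5368709120L);         // 5 GB
    return conf;
  }
}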
hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/s3native/TestJets3tNativeFileSystemStore.java

Lines changed: 126 additions & 0 deletions

@@ -0,0 +1,126 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.fs.s3native;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+
+import static org.junit.Assert.*;
+import static org.junit.Assume.*;
+
+import org.junit.Before;
+import org.junit.After;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import java.io.BufferedInputStream;
+import java.io.BufferedOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.security.DigestInputStream;
+import java.security.DigestOutputStream;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+
+public class TestJets3tNativeFileSystemStore {
+  private Configuration conf;
+  private Jets3tNativeFileSystemStore store;
+  private NativeS3FileSystem fs;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new Configuration();
+    store = new Jets3tNativeFileSystemStore();
+    fs = new NativeS3FileSystem(store);
+    conf.setBoolean("fs.s3n.multipart.uploads.enabled", true);
+    conf.setLong("fs.s3n.multipart.uploads.block.size", 64 * 1024 * 1024);
+    fs.initialize(URI.create(conf.get("test.fs.s3n.name")), conf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    try {
+      store.purge("test");
+    } catch (Exception e) {}
+  }
+
+  @BeforeClass
+  public static void checkSettings() throws Exception {
+    Configuration conf = new Configuration();
+    assumeNotNull(conf.get("fs.s3n.awsAccessKeyId"));
+    assumeNotNull(conf.get("fs.s3n.awsSecretAccessKey"));
+    assumeNotNull(conf.get("test.fs.s3n.name"));
+  }
+
+  protected void writeRenameReadCompare(Path path, long len)
+      throws IOException, NoSuchAlgorithmException {
+    // If len > fs.s3n.multipart.uploads.block.size,
+    // we'll use a multipart upload copy
+    MessageDigest digest = MessageDigest.getInstance("MD5");
+    OutputStream out = new BufferedOutputStream(
+        new DigestOutputStream(fs.create(path, false), digest));
+    for (long i = 0; i < len; i++) {
+      out.write('Q');
+    }
+    out.flush();
+    out.close();
+
+    assertTrue("Exists", fs.exists(path));
+
+    // Depending on whether this file is over 5 GB or not,
+    // rename will cause a multipart upload copy
+    Path copyPath = path.suffix(".copy");
+    fs.rename(path, copyPath);
+
+    assertTrue("Copy exists", fs.exists(copyPath));
+
+    // Download file from S3 and compare the digest against the original
+    MessageDigest digest2 = MessageDigest.getInstance("MD5");
+    InputStream in = new BufferedInputStream(
+        new DigestInputStream(fs.open(copyPath), digest2));
+    long copyLen = 0;
+    while (in.read() != -1) {copyLen++;}
+    in.close();
+
+    assertEquals("Copy length matches original", len, copyLen);
+    assertArrayEquals("Digests match", digest.digest(), digest2.digest());
+  }
+
+  @Test
+  public void testSmallUpload() throws IOException, NoSuchAlgorithmException {
+    // Regular upload, regular copy
+    writeRenameReadCompare(new Path("/test/small"), 16384);
+  }
+
+  @Test
+  public void testMediumUpload() throws IOException, NoSuchAlgorithmException {
+    // Multipart upload, regular copy
+    writeRenameReadCompare(new Path("/test/medium"), 33554432); // 32 MB
+  }
+
+  @Test
+  public void testExtraLargeUpload()
+      throws IOException, NoSuchAlgorithmException {
+    // Multipart upload, multipart copy
+    writeRenameReadCompare(new Path("/test/xlarge"), 5368709121L); // 5GB+1byte
+  }
+}
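
The @BeforeClass guard above skips the whole suite unless credentials and a test bucket resolve from the configuration. A sketch of supplying them, with placeholder values; in practice these would live in a test resource such as core-site.xml rather than in code:

import org.apache.hadoop.conf.Configuration;

public class TestCredentialsSketch {
  public static Configuration testConf() {
    Configuration conf = new Configuration();
    // Placeholder values; real runs keep these out of source control.
    conf.set("fs.s3n.awsAccessKeyId", "YOUR_ACCESS_KEY_ID");
    conf.set("fs.s3n.awsSecretAccessKey", "YOUR_SECRET_ACCESS_KEY");
    conf.set("test.fs.s3n.name", "s3n://your-test-bucket/");
    return conf;
  }
}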
hadoop-common-project/hadoop-common/src/test/resources/jets3t.properties

Lines changed: 16 additions & 0 deletions

@@ -0,0 +1,16 @@
1+
# Licensed under the Apache License, Version 2.0 (the "License");
2+
# you may not use this file except in compliance with the License.
3+
# You may obtain a copy of the License at
4+
#
5+
# http://www.apache.org/licenses/LICENSE-2.0
6+
#
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
# Speed up the s3native jets3t test
14+
15+
s3service.max-thread-count=10
16+
threaded-service.max-thread-count=10
