Skip to content

Commit 42dfd27

Browse files
authored
HADOOP-16859: ABFS: Add unbuffer support to ABFS connector.
Contributed by Sahil Takiar
1 parent 7f8685f commit 42dfd27

File tree

5 files changed

+241
-21
lines changed

5 files changed

+241
-21
lines changed

hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/fs/contract/AbstractContractUnbufferTest.java

Lines changed: 49 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -18,12 +18,13 @@
1818

1919
package org.apache.hadoop.fs.contract;
2020

21-
import org.apache.hadoop.fs.FSDataInputStream;
22-
import org.apache.hadoop.fs.Path;
23-
2421
import org.junit.Test;
2522

2623
import java.io.IOException;
24+
import java.util.Arrays;
25+
26+
import org.apache.hadoop.fs.FSDataInputStream;
27+
import org.apache.hadoop.fs.Path;
2728

2829
import static org.apache.hadoop.fs.contract.ContractTestUtils.createFile;
2930
import static org.apache.hadoop.fs.contract.ContractTestUtils.dataset;
@@ -34,21 +35,22 @@
3435
public abstract class AbstractContractUnbufferTest extends AbstractFSContractTestBase {
3536

3637
private Path file;
38+
private byte[] fileBytes;
3739

3840
@Override
public void setup() throws Exception {
  super.setup();
  // Skip the whole suite unless the contract declares unbuffer support
  // (fs.contract.supports-unbuffer).
  skipIfUnsupported(SUPPORTS_UNBUFFER);
  file = path("unbufferFile");
  // Keep the dataset in a field so tests can compare what they read back
  // against the bytes that were written.
  fileBytes = dataset(TEST_FILE_LEN, 0, 255);
  createFile(getFileSystem(), file, true, fileBytes);
}
4648

4749
@Test
public void testUnbufferAfterRead() throws IOException {
  describe("unbuffer a file after a single read");
  try (FSDataInputStream stream = getFileSystem().open(file)) {
    // Read the whole file first so the stream has buffered state to drop.
    validateFullFileContents(stream);
    unbuffer(stream);
  }
}
@@ -58,15 +60,14 @@ public void testUnbufferBeforeRead() throws IOException {
5860
describe("unbuffer a file before a read");
5961
try (FSDataInputStream stream = getFileSystem().open(file)) {
6062
unbuffer(stream);
61-
assertEquals(128, stream.read(new byte[128]));
63+
validateFullFileContents(stream);
6264
}
6365
}
6466

6567
@Test
6668
public void testUnbufferEmptyFile() throws IOException {
6769
Path emptyFile = path("emptyUnbufferFile");
68-
createFile(getFileSystem(), emptyFile, true,
69-
dataset(TEST_FILE_LEN, 0, 255));
70+
getFileSystem().create(emptyFile, true).close();
7071
describe("unbuffer an empty file");
7172
try (FSDataInputStream stream = getFileSystem().open(emptyFile)) {
7273
unbuffer(stream);
@@ -79,13 +80,15 @@ public void testUnbufferOnClosedFile() throws IOException {
7980
FSDataInputStream stream = null;
8081
try {
8182
stream = getFileSystem().open(file);
82-
assertEquals(128, stream.read(new byte[128]));
83+
validateFullFileContents(stream);
8384
} finally {
8485
if (stream != null) {
8586
stream.close();
8687
}
8788
}
88-
unbuffer(stream);
89+
if (stream != null) {
90+
unbuffer(stream);
91+
}
8992
}
9093

9194
@Test
@@ -94,32 +97,58 @@ public void testMultipleUnbuffers() throws IOException {
9497
try (FSDataInputStream stream = getFileSystem().open(file)) {
9598
unbuffer(stream);
9699
unbuffer(stream);
97-
assertEquals(128, stream.read(new byte[128]));
100+
validateFullFileContents(stream);
98101
unbuffer(stream);
99102
unbuffer(stream);
100103
}
101104
}
102105

103-
@Test
public void testUnbufferMultipleReads() throws IOException {
  describe("unbuffer a file multiple times");
  try (FSDataInputStream stream = getFileSystem().open(file)) {
    unbuffer(stream);
    // The reads below consume 1/8 + 1/8 + 1/4 + 1/2 of the file, i.e. the
    // entire file, interleaved with unbuffer() calls.
    validateFileContents(stream, TEST_FILE_LEN / 8, 0);
    unbuffer(stream);
    validateFileContents(stream, TEST_FILE_LEN / 8, TEST_FILE_LEN / 8);
    validateFileContents(stream, TEST_FILE_LEN / 4, TEST_FILE_LEN / 4);
    unbuffer(stream);
    validateFileContents(stream, TEST_FILE_LEN / 2, TEST_FILE_LEN / 2);
    unbuffer(stream);
    // Everything was read, so the position must now be at EOF.
    assertEquals("stream should be at end of file", TEST_FILE_LEN,
        stream.getPos());
  }
}
119122

120123
private void unbuffer(FSDataInputStream stream) throws IOException {
121124
long pos = stream.getPos();
122125
stream.unbuffer();
123-
assertEquals(pos, stream.getPos());
126+
assertEquals("unbuffer unexpectedly changed the stream position", pos,
127+
stream.getPos());
128+
}
129+
130+
/**
 * Reads the entire test file from the stream and verifies the bytes match
 * the dataset written in {@link #setup()}.
 */
protected void validateFullFileContents(FSDataInputStream stream)
    throws IOException {
  validateFileContents(stream, TEST_FILE_LEN, 0);
}
134+
135+
protected void validateFileContents(FSDataInputStream stream, int length,
136+
int startIndex)
137+
throws IOException {
138+
byte[] streamData = new byte[length];
139+
assertEquals("failed to read expected number of bytes from "
140+
+ "stream", length, stream.read(streamData));
141+
byte[] validateFileBytes;
142+
if (startIndex == 0 && length == fileBytes.length) {
143+
validateFileBytes = fileBytes;
144+
} else {
145+
validateFileBytes = Arrays.copyOfRange(fileBytes, startIndex,
146+
startIndex + length);
147+
}
148+
assertArrayEquals("invalid file contents", validateFileBytes, streamData);
149+
}
150+
151+
protected Path getFile() {
152+
return file;
124153
}
125154
}

hadoop-tools/hadoop-azure/src/main/java/org/apache/hadoop/fs/azurebfs/services/AbfsInputStream.java

Lines changed: 26 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -25,16 +25,22 @@
2525

2626
import com.google.common.base.Preconditions;
2727

28+
import org.apache.hadoop.fs.CanUnbuffer;
2829
import org.apache.hadoop.fs.FSExceptionMessages;
2930
import org.apache.hadoop.fs.FSInputStream;
3031
import org.apache.hadoop.fs.FileSystem.Statistics;
32+
import org.apache.hadoop.fs.StreamCapabilities;
3133
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AbfsRestOperationException;
3234
import org.apache.hadoop.fs.azurebfs.contracts.exceptions.AzureBlobFileSystemException;
3335

36+
import static org.apache.hadoop.util.StringUtils.toLowerCase;
37+
3438
/**
3539
* The AbfsInputStream for AbfsClient.
3640
*/
37-
public class AbfsInputStream extends FSInputStream {
41+
public class AbfsInputStream extends FSInputStream implements CanUnbuffer,
42+
StreamCapabilities {
43+
3844
private final AbfsClient client;
3945
private final Statistics statistics;
4046
private final String path;
@@ -390,4 +396,23 @@ public synchronized void reset() throws IOException {
390396
public boolean markSupported() {
391397
return false;
392398
}
399+
400+
@Override
public synchronized void unbuffer() {
  // Drop the read buffer so its memory can be reclaimed.
  buffer = null;
  // Preserve the original position returned by getPos(): roll fCursor back
  // by the unread portion of the discarded buffer (limit - bCursor) so that
  // the externally visible position is unchanged once bCursor/limit are
  // zeroed below.
  fCursor = fCursor - limit + bCursor;
  // -1 marks that no read-ahead state survives the unbuffer.
  fCursorAfterLastRead = -1;
  bCursor = 0;
  limit = 0;
}
409+
410+
@Override
411+
public boolean hasCapability(String capability) {
412+
return StreamCapabilities.UNBUFFER.equals(toLowerCase(capability));
413+
}
414+
415+
// Package-private accessor exposing the internal read buffer so tests can
// verify that unbuffer() released it (becomes null after unbuffer()).
byte[] getBuffer() {
  return buffer;
}
393418
}
Lines changed: 77 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,77 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.contract;
20+
21+
import java.io.IOException;
22+
23+
import org.apache.hadoop.conf.Configuration;
24+
import org.apache.hadoop.fs.FSDataInputStream;
25+
import org.apache.hadoop.fs.contract.AbstractContractUnbufferTest;
26+
27+
/**
28+
* Contract test for unbuffer operation.
29+
*/
30+
public class ITestAbfsContractUnbuffer extends AbstractContractUnbufferTest {
31+
private final boolean isSecure;
32+
private final ABFSContractTestBinding binding;
33+
34+
public ITestAbfsContractUnbuffer() throws Exception {
35+
binding = new ABFSContractTestBinding();
36+
this.isSecure = binding.isSecureMode();
37+
}
38+
39+
@Override
40+
public void setup() throws Exception {
41+
binding.setup();
42+
super.setup();
43+
}
44+
45+
@Override
46+
protected Configuration createConfiguration() {
47+
return binding.getRawConfiguration();
48+
}
49+
50+
@Override
51+
protected AbfsFileSystemContract createContract(Configuration conf) {
52+
return new AbfsFileSystemContract(conf, isSecure);
53+
}
54+
55+
/**
56+
* {@link org.apache.hadoop.fs.azurebfs.services.AbfsInputStream} does not
57+
* allow calling {@link org.apache.hadoop.fs.Seekable#getPos()} on a closed
58+
* stream, so this test needs to be overridden so that it does not call
59+
* getPos() after the stream has been closed.
60+
*/
61+
@Override
62+
public void testUnbufferOnClosedFile() throws IOException {
63+
describe("unbuffer a file before a read");
64+
FSDataInputStream stream = null;
65+
try {
66+
stream = getFileSystem().open(getFile());
67+
validateFullFileContents(stream);
68+
} finally {
69+
if (stream != null) {
70+
stream.close();
71+
}
72+
}
73+
if (stream != null) {
74+
stream.unbuffer();
75+
}
76+
}
77+
}
Lines changed: 84 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,84 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hadoop.fs.azurebfs.services;
20+
21+
import java.io.IOException;
22+
23+
import org.junit.Test;
24+
25+
import org.apache.hadoop.fs.FSDataInputStream;
26+
import org.apache.hadoop.fs.Path;
27+
import org.apache.hadoop.fs.azurebfs.AbstractAbfsIntegrationTest;
28+
import org.apache.hadoop.fs.contract.ContractTestUtils;
29+
30+
/**
31+
* Integration test for calling
32+
* {@link org.apache.hadoop.fs.CanUnbuffer#unbuffer} on {@link AbfsInputStream}.
33+
* Validates that the underlying stream's buffer is null.
34+
*/
35+
public class ITestAbfsUnbuffer extends AbstractAbfsIntegrationTest {
36+
37+
private Path dest;
38+
39+
public ITestAbfsUnbuffer() throws Exception {
40+
}
41+
42+
@Override
43+
public void setup() throws Exception {
44+
super.setup();
45+
dest = path("ITestAbfsUnbuffer");
46+
47+
byte[] data = ContractTestUtils.dataset(16, 'a', 26);
48+
ContractTestUtils.writeDataset(getFileSystem(), dest, data, data.length,
49+
16, true);
50+
}
51+
52+
@Test
53+
public void testUnbuffer() throws IOException {
54+
// Open file, read half the data, and then call unbuffer
55+
try (FSDataInputStream inputStream = getFileSystem().open(dest)) {
56+
assertTrue("unexpected stream type "
57+
+ inputStream.getWrappedStream().getClass().getSimpleName(),
58+
inputStream.getWrappedStream() instanceof AbfsInputStream);
59+
readAndAssertBytesRead(inputStream, 8);
60+
assertFalse("AbfsInputStream buffer should not be null",
61+
isBufferNull(inputStream));
62+
inputStream.unbuffer();
63+
64+
// Check the the underlying buffer is null
65+
assertTrue("AbfsInputStream buffer should be null",
66+
isBufferNull(inputStream));
67+
}
68+
}
69+
70+
private boolean isBufferNull(FSDataInputStream inputStream) {
71+
return ((AbfsInputStream) inputStream.getWrappedStream()).getBuffer() == null;
72+
}
73+
74+
/**
75+
* Read the specified number of bytes from the given
76+
* {@link FSDataInputStream} and assert that
77+
* {@link FSDataInputStream#read(byte[])} read the specified number of bytes.
78+
*/
79+
private static void readAndAssertBytesRead(FSDataInputStream inputStream,
80+
int bytesToRead) throws IOException {
81+
assertEquals("AbfsInputStream#read did not read the correct number of "
82+
+ "bytes", bytesToRead, inputStream.read(new byte[bytesToRead]));
83+
}
84+
}

hadoop-tools/hadoop-azure/src/test/resources/abfs.xml

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -61,4 +61,9 @@
6161
<name>fs.contract.supports-getfilestatus</name>
6262
<value>true</value>
6363
</property>
64+
65+
<property>
66+
<name>fs.contract.supports-unbuffer</name>
67+
<value>true</value>
68+
</property>
6469
</configuration>

0 commit comments

Comments
 (0)