Skip to content

Commit 3a4b3f2

Browse files
authored
Merge pull request #17 from mmolimar/feature/agnostic_freader
New file reader: AgnosticFileReader
2 parents 1f93c45 + 999dfe5 commit 3a4b3f2

16 files changed

+324
-41
lines changed

docs/source/config_options.rst

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -276,3 +276,36 @@ In order to configure custom properties for this reader, the name you must use i
276276
* Type: string
277277
* Default: null
278278
* Importance: low
279+
280+
Agnostic
281+
--------------------------------------------
282+
283+
In order to configure custom properties for this reader, the name you must use is ``agnostic``.
284+
285+
``file_reader.agnostic.extensions.parquet``
286+
A comma-separated string list with the accepted extensions for Parquet files.
287+
288+
* Type: string
289+
* Default: parquet
290+
* Importance: medium
291+
292+
``file_reader.agnostic.extensions.avro``
293+
A comma-separated string list with the accepted extensions for Avro files.
294+
295+
* Type: string
296+
* Default: avro
297+
* Importance: medium
298+
299+
``file_reader.agnostic.extensions.sequence``
300+
A comma-separated string list with the accepted extensions for Sequence files.
301+
302+
* Type: string
303+
* Default: seq
304+
* Importance: medium
305+
306+
``file_reader.agnostic.extensions.delimited``
307+
A comma-separated string list with the accepted extensions for Delimited text files.
308+
309+
* Type: string
310+
* Default: tsv,csv
311+
* Importance: medium

docs/source/filereaders.rst

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,10 +8,12 @@ to Kafka is created by transforming the record by means of
88
`Confluent avro-converter <https://github.com/confluentinc/schema-registry/tree/master/avro-converter>`__
99
API.
1010

11+
More information about properties of this file reader :ref:`here<config_options-filereaders-avro>`.
12+
1113
Parquet
1214
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
1315

14-
Read files with `Parquet <https://parquet.apache.org/>`__ format.
16+
Reads files with `Parquet <https://parquet.apache.org/>`__ format.
1517

1618
The reader takes advantage of the Parquet-Avro API and uses the Parquet file
1719
as if it were an Avro file, so the message sent to Kafka is built in the same
@@ -22,6 +24,8 @@ way as the Avro file reader does.
2224
over and over again and has to seek the file, the performance
2325
can be affected.
2426

27+
More information about properties of this file reader :ref:`here<config_options-filereaders-parquet>`.
28+
2529
SequenceFile
2630
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
2731

@@ -32,8 +36,7 @@ This reader can process this file format and build a Kafka message with the
3236
key/value pair. These two values are named ``key`` and ``value`` in the message
3337
by default but you can customize these field names.
3438

35-
More information about properties of this file reader
36-
:ref:`here<config_options-filereaders-sequencefile>`.
39+
More information about properties of this file reader :ref:`here<config_options-filereaders-sequencefile>`.
3740

3841
Text
3942
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
@@ -44,6 +47,8 @@ Each line represents one record which will be in a field
4447
named ``value`` in the message sent to Kafka by default but you can
4548
customize these field names.
4649

50+
More information about properties of this file reader :ref:`here<config_options-filereaders-text>`.
51+
4752
Delimited text
4853
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
4954

@@ -56,3 +61,20 @@ Also, the token delimiter for columns is configurable.
5661

5762
More information about properties of this file reader :ref:`here<config_options-filereaders-delimited>`.
5863

64+
Agnostic
65+
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
66+
67+
This reader is a wrapper around the readers listed above.
68+
69+
It tries to read any kind of file format using an internal reader based on the file extension,
70+
applying the proper one (Parquet, Avro, SequenceFile, Text or Delimited text). If no
71+
extension matches, the Text file reader will be applied.
72+
73+
Default extensions for each format:
74+
* Parquet: .parquet
75+
* Avro: .avro
76+
* SequenceFile: .seq
77+
* Delimited text: .tsv, .csv
78+
* Text: any other file extension.
79+
80+
More information about properties of this file reader :ref:`here<config_options-filereaders-agnostic>`.

src/main/java/com/github/mmolimar/kafka/connect/fs/file/reader/AbstractFileReader.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,4 +46,8 @@ public final Struct next() {
4646

4747
protected abstract T nextRecord();
4848

49+
protected ReaderAdapter<T> getAdapter() {
50+
return adapter;
51+
}
52+
4953
}
Lines changed: 123 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,123 @@
1+
package com.github.mmolimar.kafka.connect.fs.file.reader;
2+
3+
import com.github.mmolimar.kafka.connect.fs.file.Offset;
4+
import com.github.mmolimar.kafka.connect.fs.util.ReflectionUtils;
5+
import org.apache.hadoop.fs.FileSystem;
6+
import org.apache.hadoop.fs.Path;
7+
import org.apache.kafka.connect.data.Struct;
8+
9+
import java.io.IOException;
10+
import java.util.Arrays;
11+
import java.util.List;
12+
import java.util.Map;
13+
14+
import static com.github.mmolimar.kafka.connect.fs.FsSourceTaskConfig.FILE_READER_PREFIX;
15+
16+
public class AgnosticFileReader extends AbstractFileReader<AgnosticFileReader.AgnosticRecord> {
17+
18+
private static final String FILE_READER_AGNOSTIC = FILE_READER_PREFIX + "agnostic.";
19+
private static final String FILE_READER_AGNOSTIC_EXTENSIONS = FILE_READER_AGNOSTIC + "extensions.";
20+
public static final String FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET = FILE_READER_AGNOSTIC_EXTENSIONS + "parquet";
21+
public static final String FILE_READER_AGNOSTIC_EXTENSIONS_AVRO = FILE_READER_AGNOSTIC_EXTENSIONS + "avro";
22+
public static final String FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE = FILE_READER_AGNOSTIC_EXTENSIONS + "sequence";
23+
public static final String FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED = FILE_READER_AGNOSTIC_EXTENSIONS + "delimited";
24+
25+
private final AbstractFileReader reader;
26+
private List<String> parquetExtensions, avroExtensions, sequenceExtensions, delimitedExtensions;
27+
28+
public AgnosticFileReader(FileSystem fs, Path filePath, Map<String, Object> config) throws IOException {
29+
super(fs, filePath, new AgnosticAdapter(), config);
30+
31+
try {
32+
reader = (AbstractFileReader) readerByExtension(fs, filePath, config);
33+
} catch (RuntimeException | IOException e) {
34+
throw e;
35+
} catch (Throwable t) {
36+
throw new IOException("An error has ocurred when creating a concrete reader", t);
37+
}
38+
}
39+
40+
private FileReader readerByExtension(FileSystem fs, Path filePath, Map<String, Object> config)
41+
throws Throwable {
42+
int index = filePath.getName().lastIndexOf('.');
43+
String extension = index == -1 || index == filePath.getName().length() - 1 ? "" :
44+
filePath.getName().substring(index + 1).toLowerCase();
45+
46+
Class<? extends FileReader> clz;
47+
if (parquetExtensions.contains(extension)) {
48+
clz = ParquetFileReader.class;
49+
} else if (avroExtensions.contains(extension)) {
50+
clz = AvroFileReader.class;
51+
} else if (sequenceExtensions.contains(extension)) {
52+
clz = SequenceFileReader.class;
53+
} else if (delimitedExtensions.contains(extension)) {
54+
clz = DelimitedTextFileReader.class;
55+
} else {
56+
clz = TextFileReader.class;
57+
}
58+
59+
return ReflectionUtils.makeReader(clz, fs, filePath, config);
60+
}
61+
62+
@Override
63+
protected void configure(Map<String, Object> config) {
64+
this.parquetExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET) == null ?
65+
Arrays.asList("parquet") :
66+
Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_PARQUET).toString().toLowerCase().split(","));
67+
this.avroExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_AVRO) == null ?
68+
Arrays.asList("avro") :
69+
Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_AVRO).toString().toLowerCase().split(","));
70+
this.sequenceExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE) == null ?
71+
Arrays.asList("seq") :
72+
Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_SEQUENCE).toString().toLowerCase().split(","));
73+
this.delimitedExtensions = config.get(FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED) == null ?
74+
Arrays.asList("tsv", "csv") :
75+
Arrays.asList(config.get(FILE_READER_AGNOSTIC_EXTENSIONS_DELIMITED).toString().toLowerCase().split(","));
76+
}
77+
78+
@Override
79+
public boolean hasNext() {
80+
return reader.hasNext();
81+
}
82+
83+
@Override
84+
public void seek(Offset offset) {
85+
reader.seek(offset);
86+
}
87+
88+
@Override
89+
public Offset currentOffset() {
90+
return reader.currentOffset();
91+
}
92+
93+
@Override
94+
public void close() throws IOException {
95+
reader.close();
96+
}
97+
98+
@Override
99+
protected AgnosticRecord nextRecord() {
100+
return new AgnosticRecord(reader.getAdapter(), reader.nextRecord());
101+
}
102+
103+
static class AgnosticAdapter implements ReaderAdapter<AgnosticRecord> {
104+
105+
public AgnosticAdapter() {
106+
}
107+
108+
@Override
109+
public Struct apply(AgnosticRecord ag) {
110+
return ag.adapter.apply(ag.record);
111+
}
112+
}
113+
114+
static class AgnosticRecord {
115+
private final ReaderAdapter<Object> adapter;
116+
private final Object record;
117+
118+
public AgnosticRecord(ReaderAdapter<Object> adapter, Object record) {
119+
this.adapter = adapter;
120+
this.record = record;
121+
}
122+
}
123+
}

src/main/java/com/github/mmolimar/kafka/connect/fs/util/ReflectionUtils.java

Lines changed: 1 addition & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,11 @@ public static Policy makePolicy(Class<? extends Policy> clazz, FsSourceTaskConfi
2525

2626
private static <T> T make(Class<T> clazz, Object... args) throws Throwable {
2727
try {
28-
if (args == null || args.length == 0) {
29-
return (T) clazz.getConstructor().newInstance();
30-
}
3128
Class[] constClasses = Arrays.stream(args).map(arg -> arg.getClass()).toArray(Class[]::new);
3229

3330
Constructor constructor = ConstructorUtils.getMatchingAccessibleConstructor(clazz, constClasses);
3431
return (T) constructor.newInstance(args);
35-
} catch (NoSuchMethodException |
36-
IllegalAccessException |
32+
} catch (IllegalAccessException |
3733
InstantiationException |
3834
InvocationTargetException e) {
3935
throw e.getCause();

src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/FileReaderTestBase.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,15 +68,15 @@ public void fileDoesNotExist() throws Throwable {
6868

6969
@Test(expected = IOException.class)
7070
public void emptyFile() throws Throwable {
71-
File tmp = File.createTempFile("test-", "");
71+
File tmp = File.createTempFile("test-", "." + getFileExtension());
7272
Path path = new Path(new Path(fsUri), tmp.getName());
7373
fs.moveFromLocalFile(new Path(tmp.getAbsolutePath()), path);
7474
getReader(fs, path, readerConfig);
7575
}
7676

7777
@Test(expected = IOException.class)
7878
public void invalidFileFormat() throws Throwable {
79-
File tmp = File.createTempFile("test-", "");
79+
File tmp = File.createTempFile("test-", "." + getFileExtension());
8080
try (BufferedWriter writer = new BufferedWriter(new FileWriter(tmp))) {
8181
writer.write("test");
8282
}
@@ -150,5 +150,6 @@ protected final FileReader getReader(FileSystem fs, Path path, Map<String, Objec
150150

151151
protected abstract void checkData(Struct record, long index);
152152

153+
protected abstract String getFileExtension();
153154

154155
}

src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/AvroFileReaderTest.java

Lines changed: 9 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.github.mmolimar.kafka.connect.fs.file.reader.hdfs;
22

33
import com.github.mmolimar.kafka.connect.fs.file.Offset;
4+
import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
45
import com.github.mmolimar.kafka.connect.fs.file.reader.AvroFileReader;
56
import org.apache.avro.AvroTypeException;
67
import org.apache.avro.Schema;
@@ -29,19 +30,20 @@ public class AvroFileReaderTest extends HdfsFileReaderTestBase {
2930
private static final String FIELD_INDEX = "index";
3031
private static final String FIELD_NAME = "name";
3132
private static final String FIELD_SURNAME = "surname";
33+
private static final String FILE_EXTENSION = "avro";
3234

3335
private static Schema schema;
3436

3537
@BeforeClass
3638
public static void setUp() throws IOException {
3739
schema = new Schema.Parser().parse(AvroFileReaderTest.class.getResourceAsStream("/file/reader/schemas/people.avsc"));
38-
readerClass = AvroFileReader.class;
40+
readerClass = AgnosticFileReader.class;
3941
dataFile = createDataFile();
4042
readerConfig = new HashMap<>();
4143
}
4244

4345
private static Path createDataFile() throws IOException {
44-
File avroFile = File.createTempFile("test-", ".avro");
46+
File avroFile = File.createTempFile("test-", "." + FILE_EXTENSION);
4547
DatumWriter<GenericRecord> writer = new GenericDatumWriter<>(schema);
4648
try (DataFileWriter<GenericRecord> dataFileWriter = new DataFileWriter<>(writer)) {
4749
dataFileWriter.setFlushOnEveryBlock(true);
@@ -103,4 +105,9 @@ protected void checkData(Struct record, long index) {
103105
assertTrue(record.get(FIELD_NAME).toString().startsWith(index + "_"));
104106
assertTrue(record.get(FIELD_SURNAME).toString().startsWith(index + "_"));
105107
}
108+
109+
@Override
110+
protected String getFileExtension() {
111+
return FILE_EXTENSION;
112+
}
106113
}

src/test/java/com/github/mmolimar/kafka/connect/fs/file/reader/hdfs/DelimitedTextFileReaderTest.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
package com.github.mmolimar.kafka.connect.fs.file.reader.hdfs;
22

33
import com.github.mmolimar.kafka.connect.fs.file.Offset;
4+
import com.github.mmolimar.kafka.connect.fs.file.reader.AgnosticFileReader;
45
import com.github.mmolimar.kafka.connect.fs.file.reader.DelimitedTextFileReader;
56
import com.github.mmolimar.kafka.connect.fs.file.reader.FileReader;
67
import org.apache.hadoop.fs.FileSystem;
@@ -27,10 +28,11 @@ public class DelimitedTextFileReaderTest extends HdfsFileReaderTestBase {
2728
private static final String FIELD_COLUMN2 = "column_2";
2829
private static final String FIELD_COLUMN3 = "column_3";
2930
private static final String FIELD_COLUMN4 = "column_4";
31+
private static final String FILE_EXTENSION = "csv";
3032

3133
@BeforeClass
3234
public static void setUp() throws IOException {
33-
readerClass = DelimitedTextFileReader.class;
35+
readerClass = AgnosticFileReader.class;
3436
dataFile = createDataFile(true);
3537
readerConfig = new HashMap<String, Object>() {{
3638
put(DelimitedTextFileReader.FILE_READER_DELIMITED_TOKEN, ",");
@@ -39,7 +41,7 @@ public static void setUp() throws IOException {
3941
}
4042

4143
private static Path createDataFile(boolean header) throws IOException {
42-
File txtFile = File.createTempFile("test-", ".txt");
44+
File txtFile = File.createTempFile("test-", "." + FILE_EXTENSION);
4345
try (FileWriter writer = new FileWriter(txtFile)) {
4446

4547
if (header)
@@ -102,7 +104,7 @@ public void readAllDataWithoutHeader() throws Throwable {
102104

103105
@Test
104106
public void readAllDataWithMalformedRows() throws Throwable {
105-
File tmp = File.createTempFile("test-", "");
107+
File tmp = File.createTempFile("test-", "." + getFileExtension());
106108
try (FileWriter writer = new FileWriter(tmp)) {
107109
writer.append(FIELD_COLUMN1 + "," + FIELD_COLUMN2 + "," + FIELD_COLUMN3 + "," + FIELD_COLUMN4 + "\n");
108110
writer.append("dummy\n");
@@ -200,4 +202,9 @@ protected void checkData(Struct record, long index) {
200202
assertTrue(record.get(FIELD_COLUMN3).toString().startsWith(index + "_"));
201203
assertTrue(record.get(FIELD_COLUMN4).toString().startsWith(index + "_"));
202204
}
205+
206+
@Override
207+
protected String getFileExtension() {
208+
return FILE_EXTENSION;
209+
}
203210
}

0 commit comments

Comments
 (0)