Skip to content

Commit 8ef7125

Browse files
committed
Add ConcatenatingRecordReader
1 parent 92bd272 commit 8ef7125

File tree

3 files changed

+187
-1
lines changed

3 files changed

+187
-1
lines changed

datavec-api/src/main/java/org/datavec/api/records/reader/impl/ComposableRecordReader.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ public List<Writable> next() {
7272

7373
@Override
7474
public boolean hasNext() {
75-
Boolean readersHasNext = true;
75+
boolean readersHasNext = true;
7676
for (RecordReader reader : readers) {
7777
readersHasNext = readersHasNext && reader.hasNext();
7878
}
Lines changed: 138 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,138 @@
1+
/*
2+
* * Copyright 2017 Skymind, Inc.
3+
* *
4+
* * Licensed under the Apache License, Version 2.0 (the "License");
5+
* * you may not use this file except in compliance with the License.
6+
* * You may obtain a copy of the License at
7+
* *
8+
* * http://www.apache.org/licenses/LICENSE-2.0
9+
* *
10+
* * Unless required by applicable law or agreed to in writing, software
11+
* * distributed under the License is distributed on an "AS IS" BASIS,
12+
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* * See the License for the specific language governing permissions and
14+
* * limitations under the License.
15+
*/
16+
17+
package org.datavec.api.records.reader.impl;
18+
19+
import org.datavec.api.conf.Configuration;
20+
import org.datavec.api.records.Record;
21+
import org.datavec.api.records.metadata.RecordMetaData;
22+
import org.datavec.api.records.reader.BaseRecordReader;
23+
import org.datavec.api.records.reader.RecordReader;
24+
import org.datavec.api.split.InputSplit;
25+
import org.datavec.api.writable.Writable;
26+
27+
import java.io.DataInputStream;
28+
import java.io.IOException;
29+
import java.net.URI;
30+
import java.util.ArrayList;
31+
import java.util.List;
32+
33+
/**
34+
* Combine multiple readers into a single reader. Records are read sequentially
35+
*/
36+
public class ConcatenatingRecordReader extends BaseRecordReader {
37+
38+
private RecordReader[] readers;
39+
40+
public ConcatenatingRecordReader(RecordReader... readers) {
41+
this.readers = readers;
42+
}
43+
44+
@Override
45+
public void initialize(InputSplit split) throws IOException, InterruptedException {
46+
47+
}
48+
49+
@Override
50+
public void initialize(Configuration conf, InputSplit split) throws IOException, InterruptedException {
51+
52+
}
53+
54+
@Override
55+
public List<Writable> next() {
56+
List<Writable> out = null;
57+
for( RecordReader rr : readers){
58+
if(rr.hasNext()){
59+
out = rr.next();
60+
break;
61+
}
62+
}
63+
invokeListeners(out);
64+
return out;
65+
}
66+
67+
@Override
68+
public boolean hasNext() {
69+
for (RecordReader reader : readers) {
70+
if(reader.hasNext()){
71+
return true;
72+
}
73+
}
74+
return false;
75+
}
76+
77+
@Override
78+
public List<String> getLabels() {
79+
return null;
80+
}
81+
82+
@Override
83+
public void close() throws IOException {
84+
for (RecordReader reader : readers)
85+
reader.close();
86+
}
87+
88+
@Override
89+
public void setConf(Configuration conf) {
90+
for (RecordReader reader : readers) {
91+
reader.setConf(conf);
92+
}
93+
}
94+
95+
@Override
96+
public Configuration getConf() {
97+
return readers[0].getConf();
98+
}
99+
100+
@Override
101+
public void reset() {
102+
for (RecordReader reader : readers)
103+
reader.reset();
104+
}
105+
106+
@Override
107+
public boolean resetSupported() {
108+
for(RecordReader rr : readers){
109+
if(!rr.resetSupported()){
110+
return false;
111+
}
112+
}
113+
return true;
114+
}
115+
116+
@Override
117+
public List<Writable> record(URI uri, DataInputStream dataInputStream) throws IOException {
118+
throw new UnsupportedOperationException(
119+
"Generating records from DataInputStream not supported for ComposableRecordReader");
120+
}
121+
122+
@Override
123+
public Record nextRecord() {
124+
return new org.datavec.api.records.impl.Record(next(), null);
125+
}
126+
127+
@Override
128+
public Record loadFromMetaData(RecordMetaData recordMetaData) throws IOException {
129+
throw new UnsupportedOperationException("Loading from metadata not yet implemented");
130+
}
131+
132+
@Override
133+
public List<Record> loadFromMetaData(List<RecordMetaData> recordMetaDatas) throws IOException {
134+
throw new UnsupportedOperationException("Loading from metadata not yet implemented");
135+
}
136+
137+
138+
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* * Copyright 2017 Skymind, Inc.
3+
* *
4+
* * Licensed under the Apache License, Version 2.0 (the "License");
5+
* * you may not use this file except in compliance with the License.
6+
* * You may obtain a copy of the License at
7+
* *
8+
* * http://www.apache.org/licenses/LICENSE-2.0
9+
* *
10+
* * Unless required by applicable law or agreed to in writing, software
11+
* * distributed under the License is distributed on an "AS IS" BASIS,
12+
* * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* * See the License for the specific language governing permissions and
14+
* * limitations under the License.
15+
*/
16+
17+
package org.datavec.api.records.reader.impl;
18+
19+
import org.datavec.api.records.reader.RecordReader;
20+
import org.datavec.api.records.reader.impl.csv.CSVRecordReader;
21+
import org.datavec.api.split.FileSplit;
22+
import org.datavec.api.util.ClassPathResource;
23+
import org.junit.Test;
24+
25+
import static org.junit.Assert.assertEquals;
26+
27+
public class TestConcatenatingRecordReader {
28+
29+
@Test
30+
public void test() throws Exception {
31+
32+
CSVRecordReader rr = new CSVRecordReader(0, ',');
33+
rr.initialize(new FileSplit(new ClassPathResource("iris.dat").getFile()));
34+
35+
CSVRecordReader rr2 = new CSVRecordReader(0, ',');
36+
rr2.initialize(new FileSplit(new ClassPathResource("iris.dat").getFile()));
37+
38+
RecordReader rrC = new ConcatenatingRecordReader(rr, rr2);
39+
40+
int count = 0;
41+
while(rrC.hasNext()){
42+
rrC.next();
43+
count++;
44+
}
45+
46+
assertEquals(300, count);
47+
}
48+
}

0 commit comments

Comments
 (0)