@@ -46,12 +46,13 @@ public class OSMInputFile implements Sink, OSMInput {
46
46
private final InputStream bis ;
47
47
private final BlockingQueue <ReaderElement > itemQueue ;
48
48
private final Queue <ReaderElement > itemBatch ;
49
- Thread pbfReaderThread ;
50
49
private boolean eof ;
51
50
// for xml parsing
52
- private XMLStreamReader parser ;
51
+ private XMLStreamReader xmlParser ;
53
52
// for pbf parsing
54
53
private boolean binary = false ;
54
+ private PbfReader pbfReader ;
55
+ private Thread pbfReaderThread ;
55
56
private boolean hasIncomingData ;
56
57
private int workerThreads = -1 ;
57
58
private OSMFileHeader fileheader ;
@@ -138,17 +139,17 @@ private InputStream decode(File file) throws IOException {
138
139
private void openXMLStream (InputStream in )
139
140
throws XMLStreamException {
140
141
XMLInputFactory factory = XMLInputFactory .newInstance ();
141
- parser = factory .createXMLStreamReader (in , "UTF-8" );
142
+ xmlParser = factory .createXMLStreamReader (in , "UTF-8" );
142
143
143
- int event = parser .next ();
144
- if (event != XMLStreamConstants .START_ELEMENT || !parser .getLocalName ().equalsIgnoreCase ("osm" )) {
144
+ int event = xmlParser .next ();
145
+ if (event != XMLStreamConstants .START_ELEMENT || !xmlParser .getLocalName ().equalsIgnoreCase ("osm" )) {
145
146
throw new IllegalArgumentException ("File is not a valid OSM stream" );
146
147
}
147
148
// See https://wiki.openstreetmap.org/wiki/PBF_Format#Definition_of_the_OSMHeader_fileblock
148
- String timestamp = parser .getAttributeValue (null , "osmosis_replication_timestamp" );
149
+ String timestamp = xmlParser .getAttributeValue (null , "osmosis_replication_timestamp" );
149
150
150
151
if (timestamp == null )
151
- timestamp = parser .getAttributeValue (null , "timestamp" );
152
+ timestamp = xmlParser .getAttributeValue (null , "timestamp" );
152
153
153
154
if (timestamp != null ) {
154
155
try {
@@ -181,7 +182,7 @@ public ReaderElement getNext() throws XMLStreamException {
181
182
182
183
private ReaderElement getNextXML () throws XMLStreamException {
183
184
184
- int event = parser .next ();
185
+ int event = xmlParser .next ();
185
186
if (fileheader != null ) {
186
187
ReaderElement copyfileheader = fileheader ;
187
188
fileheader = null ;
@@ -190,32 +191,32 @@ private ReaderElement getNextXML() throws XMLStreamException {
190
191
191
192
while (event != XMLStreamConstants .END_DOCUMENT ) {
192
193
if (event == XMLStreamConstants .START_ELEMENT ) {
193
- String idStr = parser .getAttributeValue (null , "id" );
194
+ String idStr = xmlParser .getAttributeValue (null , "id" );
194
195
if (idStr != null ) {
195
- String name = parser .getLocalName ();
196
+ String name = xmlParser .getLocalName ();
196
197
long id = 0 ;
197
198
switch (name .charAt (0 )) {
198
199
case 'n' :
199
200
// note vs. node
200
201
if ("node" .equals (name )) {
201
202
id = Long .parseLong (idStr );
202
- return OSMXMLHelper .createNode (id , parser );
203
+ return OSMXMLHelper .createNode (id , xmlParser );
203
204
}
204
205
break ;
205
206
206
207
case 'w' : {
207
208
id = Long .parseLong (idStr );
208
- return OSMXMLHelper .createWay (id , parser );
209
+ return OSMXMLHelper .createWay (id , xmlParser );
209
210
}
210
211
case 'r' :
211
212
id = Long .parseLong (idStr );
212
- return OSMXMLHelper .createRelation (id , parser );
213
+ return OSMXMLHelper .createRelation (id , xmlParser );
213
214
}
214
215
}
215
216
}
216
- event = parser .next ();
217
+ event = xmlParser .next ();
217
218
}
218
- parser .close ();
219
+ xmlParser .close ();
219
220
return null ;
220
221
}
221
222
@@ -226,8 +227,10 @@ public boolean isEOF() {
226
227
@ Override
227
228
public void close () throws IOException {
228
229
try {
229
- if (!binary )
230
- parser .close ();
230
+ if (binary )
231
+ pbfReader .close ();
232
+ else
233
+ xmlParser .close ();
231
234
} catch (XMLStreamException ex ) {
232
235
throw new IOException (ex );
233
236
} finally {
@@ -244,8 +247,8 @@ private void openPBFReader(InputStream stream) {
244
247
if (workerThreads <= 0 )
245
248
workerThreads = 1 ;
246
249
247
- PbfReader reader = new PbfReader (stream , this , workerThreads );
248
- pbfReaderThread = new Thread (reader , "PBF Reader" );
250
+ pbfReader = new PbfReader (stream , this , workerThreads );
251
+ pbfReaderThread = new Thread (pbfReader , "PBF Reader" );
249
252
pbfReaderThread .start ();
250
253
}
251
254
@@ -273,7 +276,7 @@ private ReaderElement getNextPBF() {
273
276
if (!hasIncomingData && itemQueue .isEmpty ()) {
274
277
return null ; // signal EOF
275
278
}
276
-
279
+
277
280
if (itemQueue .drainTo (itemBatch , MAX_BATCH_SIZE ) == 0 ) {
278
281
try {
279
282
ReaderElement element = itemQueue .poll (100 , TimeUnit .MILLISECONDS );
@@ -286,7 +289,7 @@ private ReaderElement getNextPBF() {
286
289
}
287
290
}
288
291
}
289
-
292
+
290
293
return itemBatch .poll ();
291
294
}
292
295
}
0 commit comments