|
25 | 25 | import java.nio.file.Paths;
|
26 | 26 | import java.text.ParsePosition;
|
27 | 27 | import java.text.SimpleDateFormat;
|
| 28 | +import java.util.ArrayDeque; |
28 | 29 | import java.util.Date;
|
| 30 | +import java.util.Deque; |
29 | 31 | import java.util.Locale;
|
| 32 | +import java.util.concurrent.ArrayBlockingQueue; |
| 33 | +import java.util.concurrent.TimeUnit; |
| 34 | +import java.util.concurrent.atomic.AtomicBoolean; |
30 | 35 | import java.util.concurrent.atomic.AtomicInteger;
|
31 | 36 |
|
32 | 37 | import org.apache.lucene.analysis.standard.StandardAnalyzer;
|
|
40 | 45 | import org.apache.lucene.document.SortedDocValuesField;
|
41 | 46 | import org.apache.lucene.document.StringField;
|
42 | 47 | import org.apache.lucene.document.TextField;
|
43 |
| -import org.apache.lucene.index.DirectoryReader; |
44 | 48 | //import org.apache.lucene.index.IndexReader;
|
45 | 49 | import org.apache.lucene.index.IndexWriter;
|
46 | 50 | import org.apache.lucene.index.IndexWriterConfig;
|
47 |
| -import org.apache.lucene.index.NoMergePolicy; |
48 | 51 | import org.apache.lucene.index.IndexWriterConfig.OpenMode;
|
| 52 | +import org.apache.lucene.index.NoMergePolicy; |
49 | 53 | import org.apache.lucene.store.Directory;
|
50 | 54 | import org.apache.lucene.store.FSDirectory;
|
51 | 55 | import org.apache.lucene.util.BytesRef;
|
|
59 | 63 |
|
60 | 64 | public class IndexGeoNames {
|
61 | 65 |
|
| 66 | + private static final int BATCH_SIZE = 128; |
| 67 | + |
62 | 68 | final static boolean normal = false;
|
63 | 69 |
|
64 | 70 | public static void main(String[] args) throws Exception {
|
@@ -97,6 +103,9 @@ public static void main(String[] args) throws Exception {
|
97 | 103 | // With reuse it's ~ 38% faster (41.8 sec vs 67.0 sec):
|
98 | 104 | final boolean reuseDocAndFields = false;
|
99 | 105 |
|
| 106 | + final AtomicBoolean done = new AtomicBoolean(); |
| 107 | + final ArrayBlockingQueue<Deque<String>> workQueue = new ArrayBlockingQueue<>(1000); |
| 108 | + |
100 | 109 | for(int i=0;i<numThreads;i++) {
|
101 | 110 | threads[i] = new Thread() {
|
102 | 111 | @Override
|
@@ -137,11 +146,22 @@ public void run() {
|
137 | 146 | SortedDocValuesField tzDV = new SortedDocValuesField("timezone", new BytesRef());
|
138 | 147 | doc.add(tzDV);
|
139 | 148 |
|
| 149 | + Deque<String> batch = null; |
140 | 150 | while (true) {
|
141 | 151 | try {
|
142 | 152 |
|
143 |
| - // Curiously BufferedReader.readLine seems to be thread-safe... |
144 |
| - String line = reader.readLine(); |
| 153 | + if (batch == null || batch.isEmpty()) { |
| 154 | + batch = workQueue.poll(100, TimeUnit.MILLISECONDS); |
| 155 | + if (batch == null) { |
| 156 | + if (done.get()) { |
| 157 | + break; |
| 158 | + } else { |
| 159 | + continue; |
| 160 | + } |
| 161 | + } |
| 162 | + } |
| 163 | + |
| 164 | + String line = batch.poll(); |
145 | 165 | if (line == null) {
|
146 | 166 | break;
|
147 | 167 | }
|
@@ -211,11 +231,22 @@ public void run() {
|
211 | 231 | }
|
212 | 232 | }
|
213 | 233 | } else {
|
| 234 | + Deque<String> batch = null; |
214 | 235 | while (true) {
|
215 | 236 | try {
|
216 | 237 |
|
217 |
| - // Curiously BufferedReader.readLine seems to be thread-safe... |
218 |
| - String line = reader.readLine(); |
| 238 | + if (batch == null || batch.isEmpty()) { |
| 239 | + batch = workQueue.poll(100, TimeUnit.MILLISECONDS); |
| 240 | + if (batch == null) { |
| 241 | + if (done.get()) { |
| 242 | + break; |
| 243 | + } else { |
| 244 | + continue; |
| 245 | + } |
| 246 | + } |
| 247 | + } |
| 248 | + |
| 249 | + String line = batch.poll(); |
219 | 250 | if (line == null) {
|
220 | 251 | break;
|
221 | 252 | }
|
@@ -284,10 +315,27 @@ public void run() {
|
284 | 315 | }
|
285 | 316 | }
|
286 | 317 | };
|
287 |
| - threads[i].start(); |
288 | 318 | }
|
289 |
| - for(int i=0;i<numThreads;i++) { |
290 |
| - threads[i].join(); |
| 319 | + for (Thread thread : threads) { |
| 320 | + thread.start(); |
| 321 | + } |
| 322 | + Deque<String> batch = new ArrayDeque<>(); |
| 323 | + for (String line = reader.readLine(); ; line = reader.readLine()) { |
| 324 | + if (line == null) { |
| 325 | + if (batch.isEmpty() == false) { |
| 326 | + workQueue.put(batch); |
| 327 | + } |
| 328 | + break; |
| 329 | + } |
| 330 | + batch.add(line); |
| 331 | + if (batch.size() == BATCH_SIZE) { |
| 332 | + workQueue.put(batch); |
| 333 | + batch = new ArrayDeque<>(); |
| 334 | + } |
| 335 | + } |
| 336 | + done.set(true); |
| 337 | + for (Thread thread : threads) { |
| 338 | + thread.join(); |
291 | 339 | }
|
292 | 340 | long ms = System.currentTimeMillis();
|
293 | 341 | System.out.println(docsIndexed + ": " + ((ms - startMS)/1000.0) + " sec");
|
|
0 commit comments