
Commit 044bcda

Authored by Jay Carey
Use AsyncWriterPool for Fastq writing. (broadinstitute#1645)
This adds the new AsyncWriterPool from htsjdk 2.24.0 for FASTQ writing. It also removes some of the write threading from the basecalls converters, since that work is now handled by the async writer and would cause thread thrashing if left in place.
1 parent eee472f commit 044bcda
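
As context for the change (not part of the commit itself): the htsjdk pattern is to wrap each synchronous Writer in a shared AsyncWriterPool so that writes are buffered and executed on the pool's threads instead of the reading threads. The sketch below illustrates that pattern; the AsyncWriterPool constructor and the pool(writer, queue, bufferSize) signature are assumptions based on htsjdk 2.24.0 and do not appear in this diff.

    import htsjdk.io.AsyncWriterPool;
    import htsjdk.io.Writer;

    import java.io.IOException;
    import java.util.concurrent.ArrayBlockingQueue;

    public class AsyncWriterPoolSketch {
        // Trivial synchronous Writer<String>; real code would write FASTQ records.
        static final class ConsoleWriter implements Writer<String> {
            @Override
            public void write(final String item) {
                System.out.println(item);
            }

            @Override
            public void close() {
                // nothing to release in this sketch
            }
        }

        public static void main(final String[] args) throws IOException {
            // Assumed constructor: one pool whose threads are shared by all pooled writers.
            final AsyncWriterPool pool = new AsyncWriterPool(4);

            // Assumed pool() signature: wrapping returns another Writer<T>; writes are
            // queued and performed on the pool's threads rather than the caller's thread.
            final Writer<String> async = pool.pool(new ConsoleWriter(), new ArrayBlockingQueue<>(10_000), 1_000);
            async.write("record 1");
            async.write("record 2");

            // Closing the pool drains outstanding writes and closes every pooled writer;
            // this is what the new BasecallsConverter.closeWriters() relies on.
            pool.close();
        }
    }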

File tree

7 files changed: +194 -247 lines changed

src/main/java/picard/illumina/BasecallsConverter.java

Lines changed: 29 additions & 120 deletions
@@ -1,22 +1,21 @@
 package picard.illumina;
 
+import htsjdk.io.AsyncWriterPool;
+import htsjdk.io.Writer;
 import htsjdk.samtools.util.IOUtil;
-import htsjdk.samtools.util.Log;
-import htsjdk.samtools.util.ProgressLogger;
 import picard.PicardException;
 import picard.illumina.parser.ClusterData;
 import picard.illumina.parser.IlluminaDataProviderFactory;
 import picard.illumina.parser.IlluminaDataType;
 import picard.illumina.parser.ReadStructure;
 import picard.illumina.parser.readers.BclQualityEvaluationStrategy;
-import picard.util.ThreadPoolExecutorUtil;
 import picard.util.ThreadPoolExecutorWithExceptions;
 
 import java.io.File;
-import java.time.Duration;
+import java.io.IOException;
 import java.util.*;
-import java.util.concurrent.TimeUnit;
 import java.util.regex.Pattern;
+import java.util.stream.Collectors;
 
 /**
  * BasecallsConverter utilizes an underlying IlluminaDataProvider to convert parsed and decoded sequencing data
@@ -46,23 +45,16 @@ public abstract class BasecallsConverter<CLUSTER_OUTPUT_RECORD> {
             IlluminaDataType.QualityScores,
             IlluminaDataType.Position,
             IlluminaDataType.PF));
-    protected static final Log log = Log.getInstance(UnsortedBasecallsConverter.class);
 
     protected final IlluminaDataProviderFactory factory;
     protected final boolean demultiplex;
     protected final boolean ignoreUnexpectedBarcodes;
-    protected final Map<String, ? extends ConvertedClusterDataWriter<CLUSTER_OUTPUT_RECORD>> barcodeRecordWriterMap;
+    protected final Map<String, ? extends Writer<CLUSTER_OUTPUT_RECORD>> barcodeRecordWriterMap;
     protected final boolean includeNonPfReads;
-    protected final int numThreads;
-    protected final ProgressLogger readProgressLogger = new ProgressLogger(log, 1000000, "Read");
-    protected final ProgressLogger writeProgressLogger = new ProgressLogger(log, 1000000, "Write");
-    protected final Map<Integer, List<? extends Runnable>> completedWork = new HashMap<>();
-    protected final ThreadPoolExecutorWithExceptions tileWriteExecutor;
-    protected final ThreadPoolExecutorWithExceptions tileReadExecutor;
-    protected final ThreadPoolExecutorWithExceptions completedWorkExecutor = new ThreadPoolExecutorWithExceptions(1);
+    protected final AsyncWriterPool writerPool;
     protected ClusterDataConverter<CLUSTER_OUTPUT_RECORD> converter = null;
     protected List<Integer> tiles;
-    protected boolean tileProcessingComplete = false;
+
 
     /**
      * Constructs a new BasecallsConverter object.
@@ -74,7 +66,6 @@ public abstract class BasecallsConverter<CLUSTER_OUTPUT_RECORD> {
      * @param barcodeRecordWriterMap Map from barcode to CLUSTER_OUTPUT_RECORD writer. If demultiplex is false, must contain
      *                               one writer stored with key=null.
      * @param demultiplex            If true, output is split by barcode, otherwise all are written to the same output stream.
-     * @param numThreads             Controls number of threads.
      * @param firstTile              (For debugging) If non-null, start processing at this tile.
      * @param tileLimit              (For debugging) If non-null, process no more than this many tiles.
      * @param bclQualityEvaluationStrategy The basecall quality evaluation strategy that is applyed to decoded base calls.
@@ -88,35 +79,28 @@ public BasecallsConverter(
             final File barcodesDir,
             final int lane,
             final ReadStructure readStructure,
-            final Map<String, ? extends ConvertedClusterDataWriter<CLUSTER_OUTPUT_RECORD>> barcodeRecordWriterMap,
+            final Map<String, ? extends Writer<CLUSTER_OUTPUT_RECORD>> barcodeRecordWriterMap,
             final boolean demultiplex,
-            final int numThreads,
             final Integer firstTile,
             final Integer tileLimit,
             final BclQualityEvaluationStrategy bclQualityEvaluationStrategy,
             final boolean ignoreUnexpectedBarcodes,
             final boolean applyEamssFiltering,
             final boolean includeNonPfReads,
-            final int numWriteThreads
+            final AsyncWriterPool writerPool
     ) {
         this.barcodeRecordWriterMap = barcodeRecordWriterMap;
         this.ignoreUnexpectedBarcodes = ignoreUnexpectedBarcodes;
         this.demultiplex = demultiplex;
-        this.numThreads = numThreads;
 
+        this.writerPool = writerPool;
         this.factory = new IlluminaDataProviderFactory(basecallsDir,
                 barcodesDir, lane, readStructure, bclQualityEvaluationStrategy, getDataTypesFromReadStructure(readStructure, demultiplex));
         this.factory.setApplyEamssFiltering(applyEamssFiltering);
         this.includeNonPfReads = includeNonPfReads;
         this.tiles = factory.getAvailableTiles();
         tiles.sort(TILE_NUMBER_COMPARATOR);
         setTileLimits(firstTile, tileLimit);
-        tileWriteExecutor = new ThreadPoolExecutorWithExceptions(numWriteThreads);
-        tileWriteExecutor.setKeepAliveTime(500, TimeUnit.MILLISECONDS);
-        tileReadExecutor = new ThreadPoolExecutorWithExceptions(numThreads);
-        final CompletedWorkChecker workChecker = new CompletedWorkChecker();
-        completedWorkExecutor.submit(workChecker);
-        completedWorkExecutor.shutdown();
     }
 
     /**
@@ -125,7 +109,22 @@ public BasecallsConverter(
      * @param barcodes The barcodes used optionally for demultiplexing. Must contain at least a single null value if
      *                 no demultiplexing is being done.
      */
-    public abstract void processTilesAndWritePerSampleOutputs(final Set<String> barcodes);
+    public abstract void processTilesAndWritePerSampleOutputs(final Set<String> barcodes) throws IOException;
+
+    /**
+     * Closes all writers. If an AsycnWriterPool is used call close on that, otherwise iterate each writer and close it.
+     *
+     * @throws IOException throw if there is an error closing the writer.
+     */
+    public void closeWriters() throws IOException {
+        if (writerPool != null) {
+            writerPool.close();
+        } else {
+            for (Writer<CLUSTER_OUTPUT_RECORD> writer : barcodeRecordWriterMap.values()) {
+                writer.close();
+            }
+        }
+    }
 
     /**
      * Interface that defines a converter that takes ClusterData and returns OUTPUT_RECORD type objects.
@@ -144,7 +143,7 @@ protected interface ClusterDataConverter<OUTPUT_RECORD> {
      *
      * @param <OUTPUT_RECORD> The recode type to convert to.
      */
-    protected interface ConvertedClusterDataWriter<OUTPUT_RECORD> {
+    protected interface ConvertedClusterDataWriter<OUTPUT_RECORD> extends Writer<OUTPUT_RECORD> {
         /**
          * Write out a single record of type OUTPUT_RECORD.
         *
@@ -158,97 +157,6 @@ protected interface ConvertedClusterDataWriter<OUTPUT_RECORD> {
         void close();
     }
 
-    protected void awaitTileProcessingCompletion() {
-        tileReadExecutor.shutdown();
-        // Wait for all the read threads to complete before checking for errors
-        ThreadPoolExecutorUtil.awaitThreadPoolTermination("Reading executor", tileReadExecutor, Duration.ofMinutes(5));
-        tileProcessingComplete = true;
-
-        try {
-            // Check for reading errors
-            if (tileReadExecutor.hasError()) {
-                interruptAndShutdownExecutors(tileReadExecutor, completedWorkExecutor, tileWriteExecutor);
-            }
-
-            synchronized (completedWork) {
-                log.debug("Final notification of work complete.");
-                completedWork.notifyAll();
-            }
-
-            // Wait for tile processing synchronization to complete
-            ThreadPoolExecutorUtil.awaitThreadPoolTermination("Tile completion executor", completedWorkExecutor, Duration.ofMinutes(5));
-
-            // Check for tile work synchronization errors
-            if (completedWorkExecutor.hasError()) {
-                interruptAndShutdownExecutors(tileReadExecutor, completedWorkExecutor, tileWriteExecutor);
-            }
-        } finally {
-            // We are all done scheduling work. Now close the writers.
-            barcodeRecordWriterMap.values().forEach(ConvertedClusterDataWriter::close);
-        }
-    }
-
-    protected void notifyWorkComplete(int tileNum, List<? extends Runnable> pumpList) {
-        synchronized (completedWork) {
-            log.debug("Notifying completed work. Tile: " + tileNum);
-            completedWork.put(tileNum, pumpList);
-            completedWork.notifyAll();
-        }
-    }
-
-    /**
-     * CompletedWorkChecker is notified by the TileProcessor threads as work on a tile is complete and the
-     * records are ready for writing. It also ensures that tiles are written out in the proper order according
-     * by keep track of the current tile index in the sorted list of all tiles to be processed.
-     * <p>
-     * If a tile is finished and it is next in line to be written the CompletedWorkChecker thread will call
-     * writeRecords on the SortedRecordToWriterPump.
-     */
-    protected class CompletedWorkChecker implements Runnable {
-        private int currentTileIndex = 0;
-
-        @Override
-        public void run() {
-            try {
-                checkCompletedWork();
-            } catch (InterruptedException e) {
-                e.printStackTrace();
-            }
-        }
-
-        private void checkCompletedWork() throws InterruptedException {
-            synchronized (completedWork) {
-                while (currentTileIndex < tiles.size()) {
-                    // Wait only if tile processing is still occurring
-                    if (!tileProcessingComplete) {
-                        log.debug("Waiting for completed work.");
-                        completedWork.wait();
-                    }
-                    final Integer currentTile = tiles.get(currentTileIndex);
-                    if (completedWork.containsKey(currentTile)) {
-                        if (tileWriteExecutor.getQueue().size() == 0
-                                && tileWriteExecutor.getActiveCount() == 0
-                                && tileWriteExecutor.getTaskCount() == tileWriteExecutor.getCompletedTaskCount()) {
-                            // tileWriteExecutor will report 0 active workers even though the worker is still tidying up
-                            // so we add a small sleep to ensure it is finished before moving on to the next tile
-                            Thread.sleep(100);
-                            log.debug("Writing out tile. Tile: " + currentTile);
-                            completedWork.get(currentTile).forEach(tileWriteExecutor::submit);
-                            currentTileIndex++;
-                        }
-                    }
-                }
-                tileWriteExecutor.shutdown();
-                ThreadPoolExecutorUtil.awaitThreadPoolTermination("Tile completion executor", tileWriteExecutor, Duration.ofMinutes(5));
-
-                // Check for writing errors
-                if (tileWriteExecutor.hasError()) {
-                    interruptAndShutdownExecutors(tileReadExecutor, completedWorkExecutor, tileWriteExecutor);
-                }
-            }
-        }
-    }
-
     /**
      * A comparator used to sort Illumina tiles in their proper order.
      * Because the tile number is followed by a colon, a tile number that is a prefix of another tile number
@@ -341,7 +249,8 @@ protected void setTileLimits(final Integer firstTile, final Integer tileLimit) {
 
     protected void interruptAndShutdownExecutors(ThreadPoolExecutorWithExceptions... executors) {
         int tasksRunning = Arrays.stream(executors).mapToInt(test -> test.shutdownNow().size()).sum();
+        String errorMessages = Arrays.stream(executors).map(e -> e.exception.toString()).collect(Collectors.joining(","));
         throw new PicardException("Exceptions in tile processing. There were " + tasksRunning
-                + " tasks were still running or queued and have been cancelled.");
+                + " tasks were still running or queued and have been cancelled. Errors: " + errorMessages);
     }
 }

src/main/java/picard/illumina/BasecallsConverterBuilder.java

Lines changed: 20 additions & 18 deletions
@@ -1,5 +1,7 @@
 package picard.illumina;
 
+import htsjdk.io.AsyncWriterPool;
+import htsjdk.io.Writer;
 import htsjdk.samtools.SAMFileWriterImpl;
 import htsjdk.samtools.util.SortingCollection;
 import picard.PicardException;
@@ -22,21 +24,22 @@ public class BasecallsConverterBuilder<CLUSTER_OUTPUT_RECORD> {
     private final File basecallsDir;
     private final int lane;
     private final ReadStructure readStructure;
-    private final Map<String, ? extends BasecallsConverter.ConvertedClusterDataWriter<CLUSTER_OUTPUT_RECORD>> barcodeRecordWriterMap;
+    private final Map<String, ? extends Writer<CLUSTER_OUTPUT_RECORD>> barcodeRecordWriterMap;
     private Comparator<CLUSTER_OUTPUT_RECORD> outputRecordComparator;
     private SortingCollection.Codec<CLUSTER_OUTPUT_RECORD> codecPrototype;
     private Class<CLUSTER_OUTPUT_RECORD> outputRecordClass;
     private int maxReadsInRamPerThread = SAMFileWriterImpl.getDefaultMaxRecordsInRam();
     private List<File> tmpDirs = Collections.singletonList(new File(System.getProperty("java.io.tmpdir")));
     private File barcodesDir;
     private boolean demultiplex = false;
-    private int numThreads = Runtime.getRuntime().availableProcessors();
+    private int numProcessors = Runtime.getRuntime().availableProcessors();
     private Integer firstTile = null;
     private Integer tileLimit = null;
     private BclQualityEvaluationStrategy bclQualityEvaluationStrategy = new BclQualityEvaluationStrategy(BclQualityEvaluationStrategy.ILLUMINA_ALLEGED_MINIMUM_QUALITY);
     private boolean ignoreUnexpectedBarcodes = false;
     private boolean applyEamssFiltering = false;
     private boolean includeNonPfReads = false;
+    private AsyncWriterPool writerPool = null;
 
     /**
      * Constructs a new builder used for creating BasecallsConverter objects.
@@ -48,7 +51,7 @@ public class BasecallsConverterBuilder<CLUSTER_OUTPUT_RECORD> {
      *                               one writer stored with key=null.
      */
     public BasecallsConverterBuilder(final File basecallsDir, final Integer lane, final ReadStructure readStructure,
-                                     Map<String, ? extends BasecallsConverter.ConvertedClusterDataWriter<CLUSTER_OUTPUT_RECORD>> barcodeRecordWriterMap) {
+                                     Map<String, ? extends Writer<CLUSTER_OUTPUT_RECORD>> barcodeRecordWriterMap) {
         this.basecallsDir = basecallsDir;
         this.lane = lane;
         this.readStructure = readStructure;
@@ -68,7 +71,7 @@ public BasecallsConverterBuilder<CLUSTER_OUTPUT_RECORD> withSorting(Comparator<C
                                                                     SortingCollection.Codec<CLUSTER_OUTPUT_RECORD> codecPrototype,
                                                                     Class<CLUSTER_OUTPUT_RECORD> outputRecordClass,
                                                                     List<File> tmpDirs) {
-        if(outputRecordComparator == null || codecPrototype == null || outputRecordClass == null){
+        if (outputRecordComparator == null || codecPrototype == null || outputRecordClass == null) {
             throw new PicardException("outputRecordComparator, codecPrototype and outputRecordClasse can not be null.");
         }
         this.outputRecordComparator = outputRecordComparator;
@@ -84,18 +87,19 @@ public BasecallsConverterBuilder<CLUSTER_OUTPUT_RECORD> withSorting(Comparator<C
      * @return A basecalls converter that will output records according to the parameters set.
      */
     public BasecallsConverter<CLUSTER_OUTPUT_RECORD> build() {
-
         if (outputRecordComparator != null && codecPrototype != null && outputRecordClass != null && tmpDirs != null) {
             return new SortedBasecallsConverter<>(basecallsDir, barcodesDir, lane, readStructure,
                     barcodeRecordWriterMap, demultiplex, maxReadsInRamPerThread,
-                    tmpDirs, numThreads,
+                    tmpDirs, numProcessors,
                     firstTile, tileLimit, outputRecordComparator,
                     codecPrototype,
-                    outputRecordClass, bclQualityEvaluationStrategy, ignoreUnexpectedBarcodes, applyEamssFiltering, includeNonPfReads);
+                    outputRecordClass, bclQualityEvaluationStrategy, ignoreUnexpectedBarcodes, applyEamssFiltering,
+                    includeNonPfReads, writerPool);
         } else {
             return new UnsortedBasecallsConverter<>(basecallsDir, barcodesDir, lane, readStructure,
-                    barcodeRecordWriterMap, demultiplex, numThreads, firstTile, tileLimit,
-                    bclQualityEvaluationStrategy, ignoreUnexpectedBarcodes, applyEamssFiltering, includeNonPfReads);
+                    barcodeRecordWriterMap, demultiplex, firstTile, tileLimit,
+                    bclQualityEvaluationStrategy, ignoreUnexpectedBarcodes, applyEamssFiltering, includeNonPfReads,
+                    writerPool);
         }
     }
 
@@ -174,14 +178,7 @@ public BasecallsConverterBuilder<CLUSTER_OUTPUT_RECORD> firstTile(Integer firstT
      * @return A builder that will create a converter with numProcessors set.
      */
    public BasecallsConverterBuilder<CLUSTER_OUTPUT_RECORD> numProcessors(int numProcessors) {
-        if (numProcessors == 0) {
-            this.numThreads = Runtime.getRuntime().availableProcessors();
-        } else if (numProcessors < 0) {
-            this.numThreads = Runtime.getRuntime().availableProcessors() + numProcessors;
-        } else {
-            this.numThreads = numProcessors;
-        }
-
+        this.numProcessors = numProcessors;
         return this;
     }
 
@@ -216,7 +213,12 @@ public BasecallsConverterBuilder<CLUSTER_OUTPUT_RECORD> barcodesDir(File barcode
      * @return A builder that will create a converter with the maximum records in RAM set to `maxReadsInRam/numThreads`
      */
     public BasecallsConverterBuilder<CLUSTER_OUTPUT_RECORD> withMaxRecordsInRam(int maxReadsInRam) {
-        this.maxReadsInRamPerThread = Math.max(1, maxReadsInRam / this.numThreads);
+        this.maxReadsInRamPerThread = Math.max(1, maxReadsInRam / this.numProcessors);
+        return this;
+    }
+
+    public BasecallsConverterBuilder<CLUSTER_OUTPUT_RECORD> withAsyncWriterPool(AsyncWriterPool writerPool) {
+        this.writerPool = writerPool;
         return this;
     }
 }
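
For reference, a hedged sketch of how a caller might wire these pieces together. The AsyncWriterPool(int) constructor and the pool(writer, queue, bufferSize) call are assumptions based on htsjdk 2.24.0; the builder and converter calls are the ones added or changed in this commit, and the variable names are placeholders.

    import htsjdk.io.AsyncWriterPool;
    import htsjdk.io.Writer;
    import picard.illumina.BasecallsConverter;
    import picard.illumina.BasecallsConverterBuilder;
    import picard.illumina.parser.ReadStructure;

    import java.io.File;
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;
    import java.util.concurrent.ArrayBlockingQueue;

    public class PooledConversionSketch {
        static <R> void convertLane(final File basecallsDir,
                                    final int lane,
                                    final ReadStructure readStructure,
                                    final Map<String, Writer<R>> writerMap,
                                    final Set<String> barcodes,
                                    final int threads) throws IOException {
            final AsyncWriterPool writerPool = new AsyncWriterPool(threads); // assumed constructor

            // Wrap each per-barcode writer in the pool so writes happen on pool threads
            // rather than on the tile-reading threads (assumed pool() signature).
            final Map<String, Writer<R>> pooledWriters = new HashMap<>();
            writerMap.forEach((barcode, writer) ->
                    pooledWriters.put(barcode, writerPool.pool(writer, new ArrayBlockingQueue<>(100_000), 10_000)));

            final BasecallsConverter<R> converter =
                    new BasecallsConverterBuilder<>(basecallsDir, lane, readStructure, pooledWriters)
                            .withAsyncWriterPool(writerPool) // new builder option from this commit
                            .numProcessors(threads)
                            .build();                        // unsorted converter unless withSorting(...) is configured

            converter.processTilesAndWritePerSampleOutputs(barcodes);
            converter.closeWriters();                        // closes the pool, which closes every pooled writer
        }
    }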

0 commit comments
