File tree Expand file tree Collapse file tree 2 files changed +6
-28
lines changed
java/au/csiro/variantspark/utils
scala/au/csiro/variantspark/cli/args Expand file tree Collapse file tree 2 files changed +6
-28
lines changed Original file line number Diff line number Diff line change @@ -20,37 +20,11 @@ public static boolean isBGZFile(String filePath) {
20
20
* A .vcf is not a GZIP file and gets htsjdk.samtools.SAMFormatException: at header from java.io.BufferedReader.readLine(BufferedReader.java:389)
21
21
*/
22
22
try (BufferedInputStream bufferedInputStream = new BufferedInputStream (new FileInputStream (filePath ))) {
23
- //bufferedInputStream.mark(100); // mark the current position
24
23
boolean isValid = BlockCompressedInputStream .isValidFile (bufferedInputStream );
25
- //bufferedInputStream.reset(); // reset back to the marked position
26
24
return isValid ;
27
25
} catch (IOException e ) {
28
- // Handle the exception
26
+ // handle the exception raised for an improper BGZIP file
29
27
return false ;
30
28
}
31
29
}
32
-
33
- /**
34
- *
35
- * @param file: an input file
36
- * @return true if the input file is GZIP, determined by checking the first two bytes of the input file
37
- * @throws IOException
38
- */
39
- public static boolean isInputGZip (final File file ) throws IOException {
40
- //final PushbackInputStream pb = new PushbackInputStream(input, 2);
41
-
42
- try (final InputStream input = new FileInputStream (file )){
43
- int header = input .read (); //read ID1
44
- if (header == -1 ) return false ;
45
-
46
- int b = input .read (); //read ID2
47
- if (b == -1 ) return false ;
48
-
49
- //ID2 * 256 + ID1 = 35615
50
- if ( ( (b << 8 ) | header ) == GZIPInputStream .GZIP_MAGIC )
51
- return true ;
52
- }
53
-
54
- return false ;
55
- }
56
30
}
Original file line number Diff line number Diff line change @@ -2,7 +2,7 @@ package au.csiro.variantspark.cli.args
2
2
3
3
import org .kohsuke .args4j .Option
4
4
import au .csiro .pbdava .ssparkle .spark .SparkApp
5
- import au .csiro .variantspark .utils ._
5
+ import au .csiro .variantspark .utils .FileUtils
6
6
import org .apache .spark .rdd .RDD
7
7
import htsjdk .samtools .util .BlockCompressedInputStream
8
8
import org .apache .hadoop .fs .Path
@@ -18,11 +18,15 @@ trait SparkArgs extends SparkApp {
18
18
val isBGZ = FileUtils .isBGZFile(inputFile)
19
19
println(inputFile + " is loading to spark RDD, isBGZFile: " + isBGZ)
20
20
if (isBGZ) {
21
+ // A BGZIP file is compressed in blocks and requires a specialized library (htsjdk)
21
22
val path = new Path (inputFile)
22
23
val fs = path.getFileSystem(sc.hadoopConfiguration)
23
24
val bgzInputStream = new BlockCompressedInputStream (fs.open(path))
25
+ // each block can be decompressed independently and read in parallel
24
26
sc.parallelize(Stream .continually(bgzInputStream.readLine()).takeWhile(_ != null ).toList)
25
27
} else {
28
+ // The standard GZIP libraries can handle files compressed as a whole
29
+ // load .vcf, .vcf.gz or .vcf.bz2 into an RDD
26
30
sc.textFile(inputFile, if (sparkPar > 0 ) sparkPar else sc.defaultParallelism)
27
31
}
28
32
}
You can’t perform that action at this time.
0 commit comments