Cleaned up the code for by-interval traversals for Jared. The initialization code has been refactored and clarified. The by-loci and by-loci-by-interval traversals now share the same underlying code, and every traversal uses the same initialization code to set things up. It's a party in the TraversalEngine and everyone's invited...

git-svn-id: file:///humgen/gsa-scr1/gsa-engineering/svn_contents/trunk@179 348d0f76-0448-11de-a6fe-93d51630548a
This commit is contained in:
depristo 2009-03-24 22:32:45 +00:00
parent 28c1330b4b
commit 919a86e876
2 changed files with 101 additions and 112 deletions

View File

@ -149,8 +149,8 @@ public class GenomeAnalysisTK extends CommandLineProgram {
}
engine.setSafetyChecking(!UNSAFE);
engine.setSortOnFly(ENABLED_SORT_ON_FLY);
engine.initialize(ENABLED_THREADED_IO);
engine.setThreadedIO(ENABLED_THREADED_IO);
engine.initialize();
//engine.testReference();
//LocusWalker<Integer,Integer> walker = new PileupWalker();

View File

@ -86,6 +86,7 @@ public class TraversalEngine {
// NOTE(review): presumably controls whether out-of-order reads are dropped; not used in the visible code -- confirm
public boolean FILTER_UNSORTED_READS = false;
// Limit handed to SortSamIterator when reads are sorted on the fly
public int MAX_ON_FLY_SORTS = 100000;
// NOTE(review): looks like a progress-printing interval counted in records; not used in the visible code -- confirm
public long N_RECORDS_TO_PRINT = 100000;
// When true, the reads iterator is wrapped in a ThreadedIterator; assigned by setThreadedIO()
public boolean THREADED_IO = false;
// Buffer size, in reads, given to the ThreadedIterator when THREADED_IO is on
public int THREADED_IO_BUFFER_SIZE = 10000;
/**
@ -131,6 +132,10 @@ public class TraversalEngine {
this.maxReads = maxReads;
}
/**
 * Stores the threaded I/O setting in the THREADED_IO flag.
 *
 * @param threadedIO the new value of the flag
 */
public void setThreadedIO(final boolean threadedIO) {
    THREADED_IO = threadedIO;
}
/**
 * Stores the debugging setting in the DEBUGGING flag.
 *
 * @param d the new value of the flag
 */
public void setDebugging(final boolean d) {
    this.DEBUGGING = d;
}
@ -353,7 +358,7 @@ public class TraversalEngine {
*
* @return true on success
*/
public boolean initialize(final boolean THREADED_IO) {
public boolean initialize() {
lastProgressPrintTime = startTime = System.currentTimeMillis();
initializeReference();
// Initial the reference ordered data iterators
@ -362,57 +367,76 @@ public class TraversalEngine {
return true;
}
private void initializeReads(final boolean THREADED_IO) {
/**
 * Opens the reads file and builds the wrapped iterator over its records.
 * The opened reader is kept in samReader.
 *
 * @return the reads iterator after WrapReadsIterator has decorated it, with verification allowed
 */
private Iterator<SAMRecord> initializeReads() {
    samReader = initializeSAMFile(readsFile);
    final Iterator<SAMRecord> rawReads = getReadsIterator(samReader);
    return WrapReadsIterator(rawReads, true);
}
Iterator<SAMRecord> samIterator;
try {
samReadIter = loadSAMFile(readsFile, THREADED_IO);
}
catch (IOException ex) {
// TODO: IOException should be a checked exception in this case.
throw new RuntimeIOException(ex);
}
if (SORT_ON_FLY)
samReadIter = new SortSamIterator(samReadIter, MAX_ON_FLY_SORTS);
if (beSafeP)
samReadIter = new VerifyingSamIterator(samReadIter);
if (THREADED_IO) {
logger.info(String.format("Enabling threaded I/O with buffer of %d reads", THREADED_IO_BUFFER_SIZE));
samReadIter = new ThreadedIterator<SAMRecord>(samReadIter, THREADED_IO_BUFFER_SIZE);
/**
 * Chooses the raw iterator over the records of an opened reads file.
 *
 * @param samReader the opened reader to pull records from
 * @return a SamQueryIterator over locs when the file has an index, otherwise the reader's plain iterator
 */
private Iterator<SAMRecord> getReadsIterator(final SAMFileReader samReader) {
    // Querying functions are only available with an index; without one, scan the file front to back
    if (!samReader.hasIndex()) {
        return samReader.iterator();
    }
    return new SamQueryIterator(samReader, locs);
}
protected Iterator<SAMRecord> loadSAMFile(final File samFile, final boolean threadedIO)
throws IOException {
Iterator<SAMRecord> iterator = null;
/**
 * Layers the optional decorators on top of a raw reads iterator. Decorators are applied in
 * this order: on-the-fly sorting, order verification, threaded I/O buffering.
 *
 * This helper only decorates the iterator it is given. It does not open or replace the
 * SAM reader; the caller owns the reader that backs rawIterator.
 *
 * @param rawIterator        the undecorated iterator over the reads
 * @param enableVerification when false, the VerifyingSamIterator is skipped even if beSafeP is set
 * @return rawIterator wrapped in whichever decorators are enabled, or rawIterator itself if none are
 */
private Iterator<SAMRecord> WrapReadsIterator( final Iterator<SAMRecord> rawIterator, final boolean enableVerification ) {
    Iterator<SAMRecord> wrappedIterator = rawIterator;

    if (SORT_ON_FLY) {
        wrappedIterator = new SortSamIterator(wrappedIterator, MAX_ON_FLY_SORTS);
    }

    // Verification needs both the engine-wide safety flag and the caller's consent
    if (beSafeP && enableVerification) {
        wrappedIterator = new VerifyingSamIterator(wrappedIterator);
    }

    if (THREADED_IO) {
        logger.info(String.format("Enabling threaded I/O with buffer of %d reads", THREADED_IO_BUFFER_SIZE));
        wrappedIterator = new ThreadedIterator<SAMRecord>(wrappedIterator, THREADED_IO_BUFFER_SIZE);
    }

    return wrappedIterator;
}
/**
 * Opens a SAM file, applies the configured validation stringency, and logs its sort order.
 *
 * Choosing how to iterate over the file (indexed query versus plain scan) is left to
 * getReadsIterator; this method only returns the configured reader.
 *
 * @param samFile the reads file to open
 * @return the opened reader with validation stringency set to strictness
 */
private SAMFileReader initializeSAMFile(final File samFile) {
    final SAMFileReader reader = new SAMFileReader(samFile, true);
    reader.setValidationStringency(strictness);

    final SAMFileHeader header = reader.getFileHeader();
    // Pass the sort order as an argument so its text is never interpreted as a format pattern
    logger.info(String.format("Sort order is: %s", header.getSortOrder()));

    return reader;
}
// cleaning up past mistakes: the old loadSAMFile is kept below, commented out, for reference only
// private Iterator<SAMRecord> loadSAMFile(final File samFile)
// throws IOException {
// Iterator<SAMRecord> iterator = null;
//
// samReader = new SAMFileReader(samFile, true);
// samReader.setValidationStringency(strictness);
//
// final SAMFileHeader header = samReader.getFileHeader();
// logger.info(String.format("Sort order is: " + header.getSortOrder()));
//
// // If the file has an index, querying functions are available. Use them if possible...
// if (samReader.hasIndex()) {
// iterator = new SamQueryIterator(samReader, locs);
// } else {
// // Ugh. Close and reopen the file so that the file progress decorator can be assigned to the input stream.
// samReader.close();
//
// final FileInputStream samFileStream = new FileInputStream(readsFile);
// //final InputStream bufferedStream = new BufferedInputStream(samFileStream);
// samReader = new SAMFileReader(readsFile, true);
// samReader.setValidationStringency(strictness);
//
// samReadingTracker = new FileProgressTracker<SAMRecord>(readsFile, samReader.iterator(), samFileStream.getChannel(), 1000);
// iterator = samReadingTracker;
// }
//
// return iterator;
// }
/**
* Prepare the reference for stream processing
@ -544,30 +568,36 @@ public class TraversalEngine {
* @return 0 on success
*/
protected <M, T> int traverseByLoci(LocusWalker<M, T> walker) {
    // Open the reads file and keep the wrapped iterator in samReadIter
    samReadIter = initializeReads();
    verifySortOrder(true);

    // initialize the walker object
    walker.initialize();

    // We aren't locus oriented: no single interval drives this traversal, so null is passed
    // and the shared loop checks each locus against the configured locations instead.
    // NOTE(review): carryWalkerOverInterval calls interval.overlapsP(...) when a locus is
    // outside those locations -- confirm that a null interval is handled there.
    T sum = carryWalkerOverInterval(walker, samReadIter, walker.reduceInit(), null);

    printOnTraversalDone("loci", sum);
    walker.onTraversalDone();
    return 0;
}
private <M, T> T carryWalkerOverInterval( LocusWalker<M, T> walker, Iterator<SAMRecord> readIter, T sum, GenomeLoc interval ) {
// prepare the read filtering read iterator and provide it to a new locus iterator
FilteringIterator filterIter = new FilteringIterator(readIter, new locusStreamFilterFunc());
boolean done = false;
LocusIterator iter = new LocusIteratorByHanger(filterIter);
while (iter.hasNext() && !done) {
this.nRecords++;
// actually get the read and hand it to the walker
LocusContext locus = iter.next();
if ( inLocations(locus.getLocation()) ) {
// if we don't have a particular interval we're processing, check them all, otherwise only operate at this
// location
if ( ( interval == null && inLocations(locus.getLocation()) ) || interval.overlapsP(locus.getLocation()) ) {
//System.out.format("Working at %s\n", locus.getLocation().toString());
@ -601,13 +631,17 @@ public class TraversalEngine {
done = true;
}
}
printOnTraversalDone("loci", sum);
walker.onTraversalDone();
return 0;
return sum;
}
/**
* Same as the normal locus traverser, but oriented by locus, rather than implicitly
*
* @param walker A locus walker object
* @param <M> MapType -- the result of calling map() on walker
* @param <T> ReduceType -- the result of calling reduce() on the walker
* @return 0 on success
*/
protected <M, T> int traverseByLociByInterval(LocusWalker<M, T> walker) {
//verifySortOrder(true);
@ -616,62 +650,17 @@ public class TraversalEngine {
// Initialize the T sum using the walker
T sum = walker.reduceInit();
samReader = initializeSAMFile(readsFile);
for ( GenomeLoc interval : locs ) {
System.out.printf("Processing locus %s%n", interval.toString());
GenomeLoc[] intervalArray = { interval };
samReader = new SAMFileReader(readsFile, true);
samReader.setValidationStringency(strictness);
CloseableIterator<SAMRecord> readIters = samReader.queryOverlapping( interval.getContig(),
(int)interval.getStart(),
(int)interval.getStop() );
CloseableIterator<SAMRecord> readIter = samReader.queryOverlapping( interval.getContig(),
(int)interval.getStart(),
(int)interval.getStop() );
// prepare the read filtering read iterator and provide it to a new locus iterator
FilteringIterator filterIter = new FilteringIterator(readIters, new locusStreamFilterFunc());
boolean done = false;
LocusIterator iter = new LocusIteratorByHanger(filterIter);
while (iter.hasNext() && !done) {
this.nRecords++;
// actually get the read and hand it to the walker
LocusContext locus = iter.next();
if ( interval.overlapsP(locus.getLocation()) ) {
//System.out.format("Working at %s\n", locus.getLocation().toString());
// Jump forward in the reference to this locus location
final ReferenceIterator refSite;
refSite = refIter.seekForward(locus.getLocation());
final char refBase = refSite.getBaseAsChar();
locus.setReferenceContig(refSite.getCurrentContig());
// Iterate forward to get all reference ordered data covering this locus
final List<ReferenceOrderedDatum> rodData = getReferenceOrderedDataAtLocus(rodIters, locus.getLocation());
logger.debug(String.format(" Reference: %s:%d %c", refSite.getCurrentContig().getName(), refSite.getPosition(), refBase));
//
// Execute our contract with the walker. Call filter, map, and reduce
//
final boolean keepMeP = walker.filter(rodData, refBase, locus);
if (keepMeP) {
M x = walker.map(rodData, refBase, locus);
sum = walker.reduce(x, sum);
}
if (this.maxReads > 0 && this.nRecords > this.maxReads) {
logger.warn(String.format("Maximum number of reads encountered, terminating traversal " + this.nRecords));
done = true;
}
printProgress("loci", locus.getLocation());
if (pastFinalLocation(locus.getLocation()))
done = true;
}
}
readIters.close();
Iterator<SAMRecord> wrappedIter = WrapReadsIterator( readIter, false );
sum = carryWalkerOverInterval(walker, wrappedIter, sum, interval);
readIter.close();
}
printOnTraversalDone("loci", sum);
@ -691,7 +680,7 @@ public class TraversalEngine {
* @return 0 on success
*/
protected <M, R> int traverseByRead(ReadWalker<M, R> walker) {
initializeReads(false);
samReadIter = initializeReads();
if (refFileName == null && !walker.requiresOrderedReads() && verifyingSamReadIter != null) {
logger.warn(String.format("STATUS: No reference file provided and unordered reads are tolerated, enabling out of order read processing."));