Skip to content

Commit

Permalink
Fix CRAMFileReader stream management (#1533)
Browse files Browse the repository at this point in the history
* Prevent CRAMFileReader SeekableStreams from being closed by closeable iterators, so they can be
reused with multiple queries.
  • Loading branch information
cmnbroad authored Feb 4, 2021
1 parent 0b59514 commit bcc4777
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 2 deletions.
66 changes: 64 additions & 2 deletions src/main/java/htsjdk/samtools/CRAMFileReader.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ public class CRAMFileReader extends SamReader.ReaderImplementation implements Sa
private File cramFile;
private final CRAMReferenceSource referenceSource;
private InputStream inputStream;
// Wrapper with a no-op close() around a SeekableStream that was supplied as inputStream. It is
// created on demand by getSeekableStreamOrFailWithRTE(), and the wrapped delegate is closed
// when this reader's close() runs.
private DeferredCloseSeekableStream deferredCloseSeekableStream;
private CRAMIterator iterator;
private BAMIndex mIndex;
private File mIndexFile;
Expand Down Expand Up @@ -387,20 +388,81 @@ public CloseableIterator<SAMRecord> queryUnmapped() {

/**
 * Obtain a {@link SeekableStream} over the CRAM data for use by a query.
 *
 * <p>If this reader was given a File, a new {@link SeekableFileStream} is opened on every call;
 * the iterator that consumes it may close it freely because it can be recreated on demand.
 * If this reader was instead given a {@link SeekableStream} as its input stream, that stream
 * is wrapped once in a {@link DeferredCloseSeekableStream} (whose {@code close()} is a no-op)
 * and the same wrapper is returned on every call, so the underlying stream survives iterator
 * closes and can be reused by later queries.
 *
 * @return a seekable stream, or {@code null} if this reader has neither a File nor a
 *         SeekableStream input
 * @throws RuntimeException wrapping the {@link FileNotFoundException} if the CRAM file cannot
 *         be opened
 */
private SeekableStream getSeekableStreamOrFailWithRTE() {
    if (cramFile != null) {
        try {
            // If this reader was provided with a File, create a SeekableStream directly and
            // let it be closed by CloseableIterators, and then recreated on demand.
            return new SeekableFileStream(cramFile);
        } catch (final FileNotFoundException e) {
            throw new RuntimeException(e);
        }
    }

    // instanceof is false for null, so no separate null check is required.
    if (inputStream instanceof SeekableStream) {
        // For SeekableStreams that were provided to the reader constructor instead of a File, we
        // need to prevent CloseableIterators from closing the underlying stream since we can't
        // reconstitute a SeekableStream from an InputStream. So wrap the underlying SeekableStream
        // in a DeferredCloseSeekableStream with a no-op close implementation, and defer closing it
        // until the enclosing reader is closed.
        if (deferredCloseSeekableStream == null) {
            deferredCloseSeekableStream = new DeferredCloseSeekableStream((SeekableStream) inputStream);
        }
        return deferredCloseSeekableStream;
    }

    return null;
}

/**
 * A {@link SeekableStream} wrapper whose {@link #close()} does nothing.
 *
 * <p>In order to reuse a SeekableStream multiple times with CloseableIterators (which close the
 * underlying stream when they're done), the SeekableStream is wrapped in this class so the
 * actual close can be deferred until the enclosing reader is closed. Every operation other
 * than {@code close()} is forwarded unchanged to the delegate. The owner closes the real
 * stream by calling {@code getDelegate().close()}.
 */
private static class DeferredCloseSeekableStream extends SeekableStream {
    private final SeekableStream delegateStream;

    /**
     * @param delegateStream the stream to wrap; must be non-null and must not itself be a
     *                       {@code DeferredCloseSeekableStream}
     * @throws IllegalArgumentException if {@code delegateStream} is null or is already a
     *                                  {@code DeferredCloseSeekableStream}
     */
    public DeferredCloseSeekableStream(final SeekableStream delegateStream) {
        // Validate before assigning so a rejected argument never ends up stored in the field.
        if (delegateStream == null) {
            throw new IllegalArgumentException("The delegate SeekableStream cannot be null");
        }
        if (delegateStream instanceof DeferredCloseSeekableStream) {
            throw new IllegalArgumentException("DeferredCloseSeekableStream objects cannot be nested");
        }
        this.delegateStream = delegateStream;
    }

    /** @return the wrapped stream, which the caller is responsible for closing */
    public SeekableStream getDelegate() {
        return delegateStream;
    }

    @Override
    public long length() {
        return delegateStream.length();
    }

    @Override
    public long position() throws IOException {
        return delegateStream.position();
    }

    @Override
    public void seek(long position) throws IOException {
        delegateStream.seek(position);
    }

    @Override
    public int read() throws IOException {
        return delegateStream.read();
    }

    @Override
    public int read(byte[] buffer, int offset, int length) throws IOException {
        return delegateStream.read(buffer, offset, length);
    }

    @Override
    public void close() throws IOException {
        // Intentionally a no-op: defer the close, and let the owner close the delegate when
        // it's ready to by calling getDelegate().close().
    }

    @Override
    public boolean eof() throws IOException {
        return delegateStream.eof();
    }

    @Override
    public String getSource() {
        return delegateStream.getSource();
    }
}

@Override
public void close() {
// if at any point we created a deferredCloseSeekableStream, close the underlying delegate now
if (deferredCloseSeekableStream != null) {
CloserUtil.close(deferredCloseSeekableStream.getDelegate());
}
CloserUtil.close(iterator);
CloserUtil.close(inputStream);
CloserUtil.close(mIndex);
Expand Down
63 changes: 63 additions & 0 deletions src/test/java/htsjdk/samtools/CRAMIndexQueryTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
*/
package htsjdk.samtools;

import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;
import htsjdk.HtsjdkTest;
import htsjdk.samtools.seekablestream.SeekableFileStream;
import htsjdk.samtools.util.CloseableIterator;
Expand All @@ -34,7 +36,9 @@
import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.function.Function;

Expand Down Expand Up @@ -785,4 +789,63 @@ public void testUnmappedMultiSlice(
Assert.assertEquals(count, expectedCount);
}

/**
 * Supplies one test case per CRAM input (CRAI-indexed, locally CRAI-indexed, BAI-indexed).
 * Each case is: the CRAM file, the reference file, the interval to query, the read names
 * expected from that interval in order, and the expected number of unmapped reads.
 */
@DataProvider(name = "serialQueries")
public Object[][] serialQueries() {
    final File[] cramInputs = {cramQueryWithCRAI, cramQueryWithLocalCRAI, cramQueryWithBAI};
    final Object[][] testCases = new Object[cramInputs.length][];
    for (int i = 0; i < cramInputs.length; i++) {
        // Every row gets its own interval and name array, exactly as if written out literally.
        testCases[i] = new Object[]{
                cramInputs[i],
                cramQueryReference,
                new QueryInterval(0, 100009, 100009),
                new String[]{"a", "b", "c"},
                5
        };
    }
    return testCases;
}

/**
 * Copies a CRAM and its index into an in-memory Jimfs file system, opens the copy through a
 * Path-based SamReader, and runs two queries back to back on the same reader: an overlap
 * query, then (after that iterator has been closed) an unmapped-reads query.
 */
@Test(dataProvider = "serialQueries")
public void testSerialQueriesOnRemoteFile(
        final File cramFile,
        final File referenceFile,
        final QueryInterval interval,
        final String[] expectedNames,
        final int nUnmapped) throws IOException {
    final Path localIndex = SamFiles.findIndex(cramFile.toPath());

    try (final FileSystem inMemoryFileSystem = Jimfs.newFileSystem(Configuration.unix())) {
        final Path remoteCram = inMemoryFileSystem.getPath("remotecram.cram");
        final Path remoteIndex = inMemoryFileSystem.getPath("remotecram.crai");

        Files.copy(cramFile.toPath(), remoteCram);
        Files.copy(localIndex, remoteIndex);

        final SamReaderFactory lenientFactory =
                SamReaderFactory.makeDefault().validationStringency(ValidationStringency.LENIENT);
        final SamReaderFactory readerFactory = referenceFile == null
                ? lenientFactory
                : lenientFactory.referenceSequence(referenceFile);

        try (final SamReader reader = readerFactory.open(remoteCram)) {
            // First query: reads overlapping the interval must match the expected names in order.
            try (final CloseableIterator<SAMRecord> overlapping =
                         reader.queryOverlapping(new QueryInterval[]{interval})) {
                int seen = 0;
                for (; overlapping.hasNext(); seen++) {
                    final SAMRecord record = overlapping.next();
                    Assert.assertTrue(seen < expectedNames.length);
                    Assert.assertEquals(record.getReadName(), expectedNames[seen]);
                }
                Assert.assertEquals(seen, expectedNames.length);
            }

            // Second query on the same SamReader, issued only after the previous iterator has
            // been closed: this time for unmapped reads.
            try (final CloseableIterator<SAMRecord> unmapped = reader.queryUnmapped()) {
                int unmappedSeen = 0;
                while (unmapped.hasNext()) {
                    Assert.assertTrue(unmapped.next().getReadUnmappedFlag());
                    unmappedSeen++;
                }
                Assert.assertEquals(unmappedSeen, nUnmapped);
            }
        }
    }
}

}

0 comments on commit bcc4777

Please sign in to comment.