diff --git a/src/main/java/htsjdk/samtools/CRAMFileReader.java b/src/main/java/htsjdk/samtools/CRAMFileReader.java index fe1eeba9b1..43e66711ff 100644 --- a/src/main/java/htsjdk/samtools/CRAMFileReader.java +++ b/src/main/java/htsjdk/samtools/CRAMFileReader.java @@ -39,6 +39,7 @@ public class CRAMFileReader extends SamReader.ReaderImplementation implements Sa private File cramFile; private final CRAMReferenceSource referenceSource; private InputStream inputStream; + private DeferredCloseSeekableStream deferredCloseSeekableStream; private CRAMIterator iterator; private BAMIndex mIndex; private File mIndexFile; @@ -387,20 +388,81 @@ public CloseableIterator queryUnmapped() { private SeekableStream getSeekableStreamOrFailWithRTE() { SeekableStream seekableStream = null; + if (cramFile != null) { try { + // If this reader was provided with a File, create a SeekableStream directly and + // let it be closed by CloseableIterators, and then recreated on demand. seekableStream = new SeekableFileStream(cramFile); } catch (final FileNotFoundException e) { throw new RuntimeException(e); } - } else if (inputStream instanceof SeekableStream) { - seekableStream = (SeekableStream) inputStream; + } else if (inputStream != null && inputStream instanceof SeekableStream) { + // For SeekableStreams that were provided to the reader constructor instead of a File, we + // need to prevent CloseableIterators from closing the underlying stream since we can't + // reconstitute a SeekableStream from an InputStream. So wrap the underlying SeekableStream + // in a DeferredCloseSeekableStream with a no-op close implementation, and defer closing it + // until enclosing reader is closed. + if (deferredCloseSeekableStream == null) { + deferredCloseSeekableStream = new DeferredCloseSeekableStream((SeekableStream) inputStream); + } + seekableStream = deferredCloseSeekableStream; } return seekableStream; } + // In order to reuse a SeekableStream multiple times with CloseableIterators (which close the + // underlying stream when they're done), we need to wrap the SeekableStream in an object that + // has a no-op close implementation so the actual close can be deferred until the enclosing + // reader is closed. + private static class DeferredCloseSeekableStream extends SeekableStream { + private final SeekableStream delegateStream; + + public DeferredCloseSeekableStream(final SeekableStream delegateStream) { + this.delegateStream = delegateStream; + if (delegateStream instanceof DeferredCloseSeekableStream) { + throw new IllegalArgumentException("ReuseableSeekableStream objects cannot be nested"); + } + } + + public SeekableStream getDelegate() { return delegateStream; } + + @Override + public long length() { return delegateStream.length(); } + + @Override + public long position() throws IOException { return delegateStream.position(); } + + @Override + public void seek(long position) throws IOException { delegateStream.seek(position); } + + @Override + public int read() throws IOException { return delegateStream.read(); } + + @Override + public int read(byte[] buffer, int offset, int length) throws IOException { + return delegateStream.read(buffer, offset, length); + } + + @Override + public void close() throws IOException { + // defer close, and let the caller close the delegate when its ready to by calling + // getDelegate().close() + } + + @Override + public boolean eof() throws IOException { return delegateStream.eof(); } + + @Override + public String getSource() { return delegateStream.getSource(); } + } + @Override public void close() { + // if at any point we created a deferredCloseSeekableStream, close the underlying delegate now + if (deferredCloseSeekableStream != null) { + CloserUtil.close(deferredCloseSeekableStream.getDelegate()); + } CloserUtil.close(iterator); CloserUtil.close(inputStream); CloserUtil.close(mIndex); diff --git a/src/test/java/htsjdk/samtools/CRAMIndexQueryTest.java b/src/test/java/htsjdk/samtools/CRAMIndexQueryTest.java index 7b98a9b4df..027fc22cbc 100644 --- a/src/test/java/htsjdk/samtools/CRAMIndexQueryTest.java +++ b/src/test/java/htsjdk/samtools/CRAMIndexQueryTest.java @@ -23,6 +23,8 @@ */ package htsjdk.samtools; +import com.google.common.jimfs.Configuration; +import com.google.common.jimfs.Jimfs; import htsjdk.HtsjdkTest; import htsjdk.samtools.seekablestream.SeekableFileStream; import htsjdk.samtools.util.CloseableIterator; @@ -34,7 +36,9 @@ import java.io.File; import java.io.FileOutputStream; import java.io.IOException; +import java.nio.file.FileSystem; import java.nio.file.Files; +import java.nio.file.Path; import java.nio.file.StandardCopyOption; import java.util.function.Function; @@ -785,4 +789,63 @@ public void testUnmappedMultiSlice( Assert.assertEquals(count, expectedCount); } + @DataProvider(name = "serialQueries") + public Object[][] serialQueries() { + return new Object[][]{ + {cramQueryWithCRAI, cramQueryReference, + new QueryInterval(0, 100009, 100009), new String[]{"a", "b", "c"}, 5}, + {cramQueryWithLocalCRAI, cramQueryReference, + new QueryInterval(0, 100009, 100009), new String[]{"a", "b", "c"}, 5}, + {cramQueryWithBAI, cramQueryReference, + new QueryInterval(0, 100009, 100009), new String[]{"a", "b", "c"}, 5}, + }; + } + + @Test(dataProvider = "serialQueries") + public void testSerialQueriesOnRemoteFile( + final File cramFile, + final File referenceFile, + final QueryInterval interval, + final String[] expectedNames, + final int nUnmapped) throws IOException { + final Path cramIndex = SamFiles.findIndex(cramFile.toPath()); + + try (final FileSystem jimfs = Jimfs.newFileSystem(Configuration.unix())) { + final Path jimfsCRAM = jimfs.getPath("remotecram.cram"); + final Path jimfsCRAI = jimfs.getPath("remotecram.crai"); + + Files.copy(cramFile.toPath(), jimfsCRAM); + Files.copy(cramIndex, jimfsCRAI); + + SamReaderFactory factory = SamReaderFactory.makeDefault().validationStringency(ValidationStringency.LENIENT); + if (referenceFile != null) { + factory = factory.referenceSequence(referenceFile); + } + try (final SamReader reader = factory.open(jimfsCRAM)) { + try (final CloseableIterator it = reader.queryOverlapping(new QueryInterval[]{interval})) { + int count = 0; + while (it.hasNext()) { + final SAMRecord samRec = it.next(); + Assert.assertTrue(count < expectedNames.length); + Assert.assertEquals(samRec.getReadName(), expectedNames[count]); + count++; + } + Assert.assertEquals(count, expectedNames.length); + } + + // now execute a second query on the same SamReader, this time for unmapped reads, after having + // closed the previous iterator + try (final CloseableIterator it = reader.queryUnmapped()) { + int count = 0; + while (it.hasNext()) { + final SAMRecord samRec = it.next(); + Assert.assertTrue(samRec.getReadUnmappedFlag()); + count++; + } + Assert.assertEquals(count, nUnmapped); + } + } + } + } + }