Skip to content

Commit

Permalink
Add support for reading from Unix/Posix pipes (fifos) (#57)
Browse files Browse the repository at this point in the history
* Add support for reading from Unix/Posix pipes (fifos)
Co-authored-by: Seth Stadick <sstadick@gmail.com>
Co-authored-by: tfenne <tfenne@tfenne.com>
  • Loading branch information
nh13 authored Mar 11, 2022
1 parent b21827d commit e49f15b
Show file tree
Hide file tree
Showing 2 changed files with 63 additions and 6 deletions.
13 changes: 12 additions & 1 deletion src/main/scala/com/fulcrumgenomics/commons/io/Io.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
package com.fulcrumgenomics.commons.io

import java.io._
import java.nio.file.attribute.BasicFileAttributes
import java.nio.file.{Files, Path}
import java.util.zip.{GZIPInputStream, GZIPOutputStream}

Expand Down Expand Up @@ -55,12 +56,22 @@ trait IoUtil {

/** Creates a new InputStream to read from the supplied path. Automatically handles gzipped files. */
def toInputStream(path: Path) : InputStream = {

PathUtil.extensionOf(path) match {
case Some(".gz") | Some(".bgz") | Some(".bgzip") =>
new GZIPInputStream(Files.newInputStream(path), bufferSize)
case _ => {
val stream = if (Files.isSameFile(path, Io.StdIn)) System.in else Files.newInputStream(path)
new BufferedInputStream(stream, bufferSize)
val attrs = Files.readAttributes(path.toRealPath(), classOf[BasicFileAttributes])

if (!attrs.isRegularFile && attrs.isOther) {
// Not a regular file, directory or symlink - which likely means a named pipe
// and for some reason buffering reading from named pipes goes badly
stream
}
else {
new BufferedInputStream(stream, bufferSize)
}
}
}
}
Expand Down
56 changes: 51 additions & 5 deletions src/test/scala/com/fulcrumgenomics/commons/io/IoTest.scala
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,18 @@
*/
package com.fulcrumgenomics.commons.io

import java.io.{BufferedOutputStream, BufferedReader, FileInputStream, FileOutputStream, InputStreamReader}
import java.nio.file.{Files, Path, Paths}
import java.io.{BufferedOutputStream, BufferedReader, FileInputStream, FileOutputStream, InputStreamReader, PrintStream}
import java.nio.file.{Files, FileSystems, Path, Paths}
import java.util.concurrent.TimeUnit
import java.util.zip.GZIPInputStream

import com.fulcrumgenomics.commons.util.UnitSpec

import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import java.io.BufferedInputStream

/**
* Tests for various methods in the Io class
*/
Expand All @@ -53,9 +59,13 @@ class IoTest extends UnitSpec {
path
}

/** Impl of IoUtil to test that compressionLevel can be overridden and set */
class FakeIo(var compressionLevel: Int = 5, override val bufferSize: Int = 128*1024) extends IoUtil {}
object FakeIo extends FakeIo(compressionLevel=5, bufferSize=128*1024)
private val hasMkfifo: Boolean = FileSystems.getDefault().supportedFileAttributeViews().contains("posix")

def makeNamedPipe(path: Path): Unit = {
val pipeProcess = new ProcessBuilder("mkfifo", s"${path.toAbsolutePath}").start()
pipeProcess.waitFor(5, TimeUnit.SECONDS) shouldBe true
pipeProcess.exitValue() shouldBe 0
}

"Io.assertReadable" should "not throw an exception for extent files" in {
val f1 = tmpfile(); val f2 = tmpfile(); val f3 = tmpfile()
Expand All @@ -67,6 +77,16 @@ class IoTest extends UnitSpec {
Io.assertReadable(Io.StdIn)
}

it should "not throw an exception for Unix fifos" in {
assume(hasMkfifo, "Canceling Posix specific test.")
val lines = Seq("foo", "bar")
val pipeDir = tmpdir()
val pipePath = pipeDir.resolve("pipe1")
makeNamedPipe(pipePath)
Io.assertReadable(pipePath)
Files.delete(pipePath)
}

it should "throw an exception for when file isn't readable" in {
val nullpath: Path = null
an[IllegalArgumentException] should be thrownBy { Io.assertReadable(nullpath) }
Expand Down Expand Up @@ -193,6 +213,19 @@ class IoTest extends UnitSpec {
an[IllegalArgumentException] should be thrownBy Io.readLinesFromResource("/path/does/not/exist.json")
}

"Io.readLines" should "read Unix fifos" in {
assume(hasMkfifo, "Canceling Posix specific test.")
val lines = Seq("This is a line of text right here!", "And this is another!!! Woo.")
val pipeDir = tmpdir()
val pipePath = pipeDir.resolve("pipe2")
Files.deleteIfExists(pipePath) // in case of a failed previous test that needs clean up
makeNamedPipe(pipePath)
val writeFuture = Future { Io.writeLines(pipePath, lines); true }
Io.readLines(pipePath).toList should contain theSameElementsInOrderAs lines
Await.result(writeFuture, Duration(3, TimeUnit.SECONDS)) shouldBe true
Files.delete(pipePath)
}

"Io.readBytesFromResource" should "correctly read binary data from a resource" in {
val expected = Range.inclusive(Byte.MinValue, Byte.MaxValue).map(_.toByte).toArray
val actual = Io.readBytesFromResource("/com/fulcrumgenomics/commons/io/to-bytes-from-resource-test.bin")
Expand Down Expand Up @@ -220,7 +253,20 @@ class IoTest extends UnitSpec {
val out = Io.readLines(f).toSeq
out shouldBe in
}

it should "return BufferedInputStream for both regular and symlinked files" in {
val tempDir = tmpdir()
val realFile = tmpfile(readable=true)
val link = Paths.get(tempDir.toString, "symbolic_file.txt")
val symFile = Files.createSymbolicLink(link, realFile)
Io.toInputStream(realFile) shouldBe an[BufferedInputStream]
Io.toInputStream(symFile) shouldBe an[BufferedInputStream]
}

/** Impl of IoUtil to test that compressionLevel can be overridden and set */
class FakeIo(var compressionLevel: Int = 5, override val bufferSize: Int = 128*1024) extends IoUtil {}
object FakeIo extends FakeIo(compressionLevel=5, bufferSize=128*1024)

"IoUtil.compressionLevel" should "be settable" in {
FakeIo.compressionLevel = 6
FakeIo.compressionLevel shouldBe 6
Expand Down

0 comments on commit e49f15b

Please sign in to comment.