Skip to content
This repository has been archived by the owner on Jun 20, 2024. It is now read-only.

Yf contaminate and estimate bid #384

Draft
wants to merge 13 commits into
base: main
Choose a base branch
from
29 changes: 24 additions & 5 deletions core/src/main/scala/dagr/core/tasksystem/EitherTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@
package dagr.core.tasksystem

object EitherTask {

sealed trait Choice
object Left extends Choice

object Left extends Choice

object Right extends Choice

/**
Expand All @@ -48,17 +51,33 @@ object EitherTask {
* @param goLeft an expression that returns a Boolean, with true indicating Left and false indicating Right
*/
def of(left: Task, right: Task, goLeft: => Boolean): EitherTask = new EitherTask(left, right, () => if (goLeft) Left else Right)

def of(left: Iterable[Task], right: Iterable[Task], goLeft: => Boolean): EitherTask = {

val leftPipeline = new Pipeline {
def build(): Unit = {
left.foreach(root ==> _)
}
}
val rightPipeline = new Pipeline {
def build(): Unit = {
right.foreach(root ==> _)
}
}

new EitherTask(leftPipeline, rightPipeline, () => if (goLeft) Left else Right)
}
}

/** A task that returns either the left or right task based on a deferred choice. The choice function is
* not evaluated until all dependencies have been met and the `EitherTask` needs to make a decision about
* which task to return from [[getTasks].
* which task to return from [[getTasks]].
*
* @param left the left task.
* @param right the right task
* @param left the left task.
* @param right the right task
* @param choice an expression that returns either Left or Right when invoked
*/
class EitherTask private (private val left: Task, private val right: Task, private val choice: () => EitherTask.Choice) extends Task {
class EitherTask private(private val left: Task, private val right: Task, private val choice: () => EitherTask.Choice) extends Task {
/** Decides which task to return based on `choice` at execution time. */
override def getTasks: Iterable[Task] = Seq(if (choice() eq EitherTask.Left) left else right)
}
30 changes: 18 additions & 12 deletions core/src/main/scala/dagr/core/tasksystem/ProcessTask.scala
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,30 @@ import java.nio.file.Path
import scala.sys.process._

object ProcessTask {
val SpecialCharacters : String = """ `"\#()!~&<>|;*?""" + '\t' + '$'
val SpecialCharacters: String = """ `"\#()!~&<>|;*?""" + '\t' + '$'
private val SpecialsAndSingleQuote = SpecialCharacters + "'"
private val Escape = """\"""

/** Determines if the argument has any special characters in it, and if so quotes the whole string. */
def quoteIfNecessary(arg: String): String = {
val hasSingleQuotes = arg.contains("'")
val hasSpecials = arg.exists(ch => SpecialCharacters.contains(ch))
def quoteIfNecessary(anyArg: Any ): String = {
anyArg match {
case Unquotable(x) => x.toString
case _ => {
val arg = anyArg.toString

(hasSingleQuotes, hasSpecials) match {
case (false, false) => arg
case (true, false) => arg.replace("'", """\'""") // just escape the single quotes
case (false, true ) => "'" + arg + "'" // just single quote the whole string
case (true, true ) => arg.map(ch => if (SpecialsAndSingleQuote.contains(ch)) Escape + ch else ch).mkString("")
val hasSingleQuotes = arg.contains("'")
val hasSpecials = arg.exists(ch => SpecialCharacters.contains(ch))

(hasSingleQuotes, hasSpecials) match {
case (false, false) => arg
case (true, false) => arg.replace("'", """\'""") // just escape the single quotes
case (false, true) => "'" + arg + "'" // just single quote the whole string
case (true, true) => arg.map(ch => if (SpecialsAndSingleQuote.contains(ch)) Escape + ch else ch).mkString("")
}
}
}
}
}

/** A task that can execute a set of commands in its own process, and does not generate any new tasks.
*/
trait ProcessTask extends UnitTask {
Expand All @@ -64,8 +70,8 @@ trait ProcessTask extends UnitTask {
* @return the command string.
*/
private[core] def commandLine: String = {
if (quoteIfNecessary) args.map(arg => ProcessTask.quoteIfNecessary(arg.toString)).mkString(" ")
else args.mkString(" ")
val mapped = if (quoteIfNecessary) args.map(arg => ProcessTask.quoteIfNecessary(arg)) else args
mapped.mkString(" ")
}

/** Write the command to the script and get a process to run.
Expand Down
5 changes: 5 additions & 0 deletions core/src/main/scala/dagr/core/tasksystem/Unquotable.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package dagr.core.tasksystem

case class Unquotable(any: Any) {
override def toString: String = any.toString
}
90 changes: 90 additions & 0 deletions pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
/*
* The MIT License
*
* Copyright (c) 2021 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package dagr.pipelines

import java.nio.file.Files

import com.fulcrumgenomics.sopt.{arg, clp}
import dagr.core.cmdline.Pipelines
import dagr.core.execsystem.{Cores, Memory}
import dagr.core.tasksystem._
import dagr.tasks.DagrDef._
import dagr.tasks.misc.LinkFile
import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles}
import htsjdk.samtools.SAMFileHeader.SortOrder

/**
*/
@clp(description = "pipeline to contaminate bams.", group = classOf[Pipelines])
class ContaminateBams(
@arg(flag = 'i', doc = "Input bams for generating bams.") val bams: Seq[PathToBam],
@arg(flag = 'o', doc = "Output directory.") val out: DirPath,
@arg(flag = 'p', doc = "Output file prefix.") val prefix: String,
@arg(flag = 'c', doc = "Contamination levels to evaluate.") val contaminations: Seq[Double],
@arg(flag = 'd', doc = "Depth values to evaluate.") val depths: Seq[Integer],

) extends Pipeline(Some(out)) {

override def build(): Unit = {
val strat = Some(DownsamplingStrategy.HighAccuracy)

Files.createDirectories(out)

val pairsOfBam = for (x <- bams; y <- bams) yield (x, y)

pairsOfBam.foreach { case (x, y) => {
contaminations.foreach(c => {
val dsX = out.resolve(prefix + x.getFileName + ".forCont." + c + ".tmp.bam")
val dsY = out.resolve(prefix + y.getFileName + ".forCont." + c + ".tmp.bam")
// create a mixture of c x + (1-c) y taking into account the depths of
// x and y
// D_x and D_y are the effective depths of x and y
// if we are to downsample x we need x to be downsampled to c/(1-c)
// if we are to downsample y we need y to be downsampled to (1-c)/c
// of of those two values will be less than 1.
val downsampleX = new DownsampleSam(in = x, out = dsX, proportion = c / (1 - c), strategy = strat).requires(Cores(1), Memory("2g"))
val downsampleY = new DownsampleSam(in = x, out = dsY, proportion = (1 - c) / c, strategy = strat).requires(Cores(1), Memory("2g"))

val copyX = new LinkFile(x, dsX)
val copyY = new LinkFile(y, dsY)
val downsampleX_Q = c / (1 - c) <= 1
val downsample = EitherTask.of(
downsampleX :: copyY :: Nil,
downsampleY :: copyX :: Nil,
downsampleX_Q)

val xyContaminatedBam = out.resolve(prefix + "__" + x.getFileName + "__" + y.getFileName +
"__" + c + ".bam")
val mergedownsampled = new MergeSamFiles(in = dsX :: dsY :: Nil, out = xyContaminatedBam, sortOrder = SortOrder.coordinate)

root ==> downsample ==> mergedownsampled

})
}
}
}
}



93 changes: 93 additions & 0 deletions pipelines/src/main/scala/dagr/pipelines/EvaluateVBID.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
/*
* The MIT License
*
* Copyright (c) 2020 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package dagr.pipelines

import java.nio.file.{Files, Path}

import com.fulcrumgenomics.commons.io.PathUtil
import com.fulcrumgenomics.sopt.{arg, clp}
import dagr.core.cmdline.Pipelines
import dagr.core.config.Configuration
import dagr.core.execsystem.{Cores, Memory, ResourceSet}
import dagr.core.tasksystem.{EitherTask, FixedResources, Linker, NoOpInJvmTask, Pipeline, ProcessTask}
import dagr.tasks.DagrDef._
import dagr.tasks.bwa.BwaMem
import dagr.tasks.gatk.{CountVariants, DownsampleVariants, Gatk4Task, GatkTask, IndexVariants, LeftAlignAndTrimVariants, VariantsToTable}
import dagr.tasks.misc.{DWGSim, LinkFile, VerifyBamId}
import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam}
import htsjdk.samtools.SAMFileHeader.SortOrder

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/**
* Downsample Bams to create composite contaminated bams,
* then downsample to various depths
* and run VBID with different number of target snps to evaluate
* dependance of VBID on contamination & depth & number of target snps
*/
@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines])
class EvaluateVBID
(@arg(flag = 'i', doc = "Input bams to evaluate.") val bams: Seq[PathToBam],
@arg(flag = 'o', doc = "Output directory.") val out: DirPath,
@arg(flag = 'p', doc = "Output file prefix.") val prefix: String,
@arg(flag = 'm', doc = "Number of markers to use.") val markers: Seq[Integer],
@arg(flag = 'R', doc = "VBID vcf resource.") val vbidResource: PathToVcf,

) extends Pipeline(Some(out)) {

override def build(): Unit = {

val countVariants = new CountVariants(vbidResource, out.resolve(prefix + ".vbid.count"))
root ==> countVariants

bams.foreach(bam => {
markers.foreach { markerCount => {
// subset VBID resource files to smaller number of markers

// this need to happen after the tool is run...need to figure out how to do that
val outDownsampledResource = out.resolve(prefix + "__" + vbidResource.getFileName +
"__" + markerCount + ".vcf")

val downsample = new DownsampleVariants(
in = vbidResource,
output = outDownsampledResource,
proportion = 1)

val vbid = new VerifyBamId(
vcf = outDownsampledResource,
bam = bam,
out = out.resolve(PathUtil.replaceExtension(bam, s".${markerCount}_markers.selfSM"))
)

countVariants ==>
Linker(countVariants, downsample)((f, c) => c.downsampleProportion = markerCount / f.count) ==>
vbid
}
}

})

}
}
8 changes: 6 additions & 2 deletions pipelines/src/main/scala/dagr/pipelines/ExamplePipeline.scala
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ import dagr.tasks.DagrDef
import dagr.tasks.bwa.BwaMem
import dagr.tasks.picard.{CollectHsMetrics, DeleteBam, MarkDuplicates, SortSam}
import DagrDef.{DirPath, PathToFasta, PathToFastq, PathToIntervals}
import dagr.core.config.Configuration
import dagr.core.execsystem.{Cores, Memory}
import htsjdk.samtools.SAMFileHeader.SortOrder

/**
Expand All @@ -54,8 +56,10 @@ class ExamplePipeline
Files.createDirectories(out)

val bwa = new BwaMem(fastq=fastq, ref=ref)
val sort = new SortSam(in=Io.StdIn, out=tmpBam, sortOrder=SortOrder.coordinate)
val mark = new MarkDuplicates(inputs=Seq(tmpBam), out=Some(bam), prefix=metricsPrefix)
val sort = new SortSam(in=Io.StdIn, out=tmpBam, sortOrder=SortOrder.coordinate) with Configuration {
requires(Cores(2), Memory("2G"))}
val mark = new MarkDuplicates(inputs=Seq(tmpBam), out=Some(bam), prefix=metricsPrefix)with Configuration {
requires(Cores(1), Memory("2G"))}
val rmtmp = new DeleteBam(tmpBam)

root ==> (bwa | sort) ==> mark ==> rmtmp
Expand Down
82 changes: 82 additions & 0 deletions pipelines/src/main/scala/dagr/pipelines/MapAndSort.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/*
* The MIT License
*
* Copyright (c) 2020 Fulcrum Genomics LLC
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in
* all copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
* THE SOFTWARE.
*/
package dagr.pipelines

import java.nio.file.Files

import com.fulcrumgenomics.sopt.{arg, clp}
import dagr.core.cmdline.Pipelines
import dagr.core.config.Configuration
import dagr.core.execsystem.{Cores, Memory}
import dagr.core.tasksystem._
import dagr.tasks.DagrDef._
import dagr.tasks.bwa.BwaMem
import dagr.tasks.picard.SortSam
import htsjdk.samtools.SAMFileHeader.SortOrder

/**
* generate bams based on input vcfs
*/
@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines])
class MapAndSort
(@arg(flag = 'i', doc = "Input fastq for mapping.") val fastqs: Seq[PathToFastq],
@arg(flag = 'r', doc = "Reference fasta.") val ref: PathToFasta,
@arg(flag = 'o', doc = "Output directory.") val out: DirPath,
@arg(flag = 'p', doc = "Output file prefix.") val prefix: String,

) extends Pipeline(Some(out)) {

override def build(): Unit = {

Files.createDirectories(out)

fastqs.foreach(f => {

val bwa = new myBwaMem(fastq = f, out = Some(out.resolve(prefix + f.getFileName + ".tmp.sam")), ref = ref)
val sort = new mySortSam(inn = bwa.out.get, outt = out.resolve(prefix + f.getFileName + ".sorted.bam"))

root ==> bwa ==> sort

})
}


private class mySortSam(val inn: PathToBam, val outt: PathToBam) extends
SortSam(in = inn, outt, SortOrder.coordinate) with
Configuration {
requires(Cores(1), Memory("2G"))
}

private class myBwaMem(val fastq: PathToFastq,
val out: Option[PathToBam],
val ref: PathToFasta) extends
BwaMem(
fastq = fastq,
out = out,
ref = ref,
maxThreads = 2,
memory = Memory("4G"))


}
Loading