From e0aab001054da689ca01feaa9564e9828c52f38e Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Tue, 1 Dec 2020 23:25:35 -0500 Subject: [PATCH 01/13] WIP. getting coverage, downsampling and getting ready to merge and call vbid --- .../ContaminateAndEvaluateVBID.scala | 143 ++++++++++++++++++ .../dagr/pipelines/ExamplePipeline.scala | 8 +- .../main/scala/dagr/tasks/misc/GetYield.scala | 41 +++++ .../main/scala/dagr/tasks/misc/LinkFile.scala | 33 ++++ 4 files changed, 223 insertions(+), 2 deletions(-) create mode 100644 pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala create mode 100644 tasks/src/main/scala/dagr/tasks/misc/GetYield.scala create mode 100644 tasks/src/main/scala/dagr/tasks/misc/LinkFile.scala diff --git a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala new file mode 100644 index 00000000..90631e15 --- /dev/null +++ b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala @@ -0,0 +1,143 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.pipelines + +import java.nio.file.{Files, Path} + +import dagr.commons.io.Io +import dagr.core.cmdline.Pipelines +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory} +import dagr.core.tasksystem.{EitherTask, Linker, NoOpInJvmTask, NoOpTask, Pipeline, SimpleInJvmTask} +import dagr.sopt.{arg, clp} +import dagr.tasks.DagrDef.{DirPath, FilePath, PathToBam, PathToFasta, PathToFastq, PathToIntervals} +import dagr.tasks.bwa.BwaMem +import dagr.tasks.misc.{LinkFile, MoveFile} +import dagr.tasks.picard.{CollectHsMetrics, CollectMultipleMetrics, DeleteBam, DownsampleSam, DownsamplingStrategy, MarkDuplicates, SortSam} +import htsjdk.samtools.SAMFileHeader.SortOrder +import htsjdk.samtools.metrics.MetricsFile +import picard.analysis.CollectMultipleMetrics.Program +import picard.analysis.directed.HsMetrics + +import scala.collection.mutable + +/** + * Downsample Bams to create composite contaminated bams, + * then downsample to various depths + * and run VBID with different number of target snps to evaluate + * dependance of VBID on contamination & depth & number of target snps + */ +@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) +class ContaminateAndEvaluateVBID +(@arg(flag = "i", doc = "Input bams.") val bams: Seq[PathToBam], + @arg(flag = "r", doc = "Reference fasta.") val ref: PathToFasta, + @arg(flag = "o", doc = "Output directory.") val out: DirPath, + @arg(flag = "t", doc = "Target regions.") val targets: PathToIntervals, + @arg(flag = "p", doc = "Output file prefix.") val prefix: String, + @arg(flag = "c", doc = "Contamination levels to evaluate.") val contaminations: Seq[Double], + @arg(flag = "d", doc = "Depth values to evaluate.") val depths: Seq[Integer], + @arg(flag = "m", doc = "Number of markers to use.") val markers: Seq[Integer] + +) extends Pipeline(Some(out)) { + + override def build(): Unit = { + val strat = Some(DownsamplingStrategy.HighAccuracy) + + val bam = out.resolve(prefix + ".bam") + val tmpBam = out.resolve(prefix + ".tmp.bam") + val metricsPrefix: Some[DirPath] = Some(out.resolve(prefix)) + Files.createDirectories(out) + val bamYield = new mutable.HashMap[PathToBam, Integer] + + // Collect the median target coverage in each of the inputs and put result in a map + val yieldsCollected = new NoOpTask() + root ==> yieldsCollected + + bams.map(bam => { + val prefixPath: Path = out.resolve(prefix + bam.getFileName) + val metricPath: Path = out.resolve(prefix + bam.getFileName + ".qualityYieldMetrics") + val hsMetrics = new CollectHsMetrics(in = bam, ref = ref, targets = targets) + + val collectYield = new CollectMultipleMetrics(in = bam, prefix = Some(prefixPath), + ref = ref, programs = Seq(Program.CollectQualityYieldMetrics)) + val fetchMedian = new FetchMedianCoverage(metricPath) + root ==> hsMetrics ==> fetchMedian ==> SimpleInJvmTask(bamYield.put(bam, fetchMedian.medianCoverage.getOrElse(0))) ==> yieldsCollected + }) + + val pairsOfBam = for (x <- bams; y <- bams) yield (x, y) + + pairsOfBam.foreach { case (x, y) => { + contaminations.foreach(c => { + // create a mixture of c x + (1-c) y taking into account the depths of + // x and y + // D_x and D_y are the effective depths of x and y + // if we are to downsample x we need x to be downsampled to c/(1-c) + // if we are to downsample y we need y to be downsampled to (1-c)/c + // of of those two values will be less than 1. + val downsampleX = new DownsampleSam(in = x, out = dsX, proportion = c / (1 - c), strategy = strat).requires(Cores(1), Memory("32g")) + val downsampleY = new DownsampleSam(in = x, out = dsY, proportion = (1 - c) / c, strategy = strat).requires(Cores(1), Memory("32g")) + + val copyX = new LinkFile(x, dsX) + val copyY = new LinkFile(y, dsY) + + val downsample = EitherTask.of(downsampleX, downsampleY, c / (1 - c) <= 1) :: + EitherTask.of(copyY, copyX, c / (1 - c) <= 1) + + }) + } + } + + depths.foreach(depth => { + con + + + }) + val bwa = new BwaMem(fastq = fastq, ref = ref) + val sort = new SortSam(in = Io.StdIn, out = tmpBam, sortOrder = SortOrder.coordinate) with Configuration { + requires(Cores(2), Memory("2G")) + } + val mark = new MarkDuplicates(inputs = Seq(tmpBam), out = Some(bam), prefix = metricsPrefix) with Configuration { + requires(Cores(1), Memory("2G")) + } + val rmtmp = new DeleteBam(tmpBam) + + root ==> (bwa | sort) ==> mark ==> rmtmp + targets.foreach(path => root ==> new CollectHsMetrics(in = bam, prefix = metricsPrefix, targets = path, ref = ref)) + } + +} + +/** Pops open the HS metrics file and reads out the median coverage. */ +private class FetchMedianCoverage(hsMetrics: FilePath) extends SimpleInJvmTask { + var medianCoverage: Option[Int] = None + + override def run(): Unit = { + val mfile = new MetricsFile[HsMetrics, java.lang.Integer] + mfile.read(Io.toReader(hsMetrics)) + val median = mfile.getMetrics.map(hs => hs.MEDIAN_TARGET_COVERAGE).max + this.medianCoverage = Some(median.toInt) + } +} + + diff --git a/pipelines/src/main/scala/dagr/pipelines/ExamplePipeline.scala b/pipelines/src/main/scala/dagr/pipelines/ExamplePipeline.scala index 804a9e37..745bb186 100644 --- a/pipelines/src/main/scala/dagr/pipelines/ExamplePipeline.scala +++ b/pipelines/src/main/scala/dagr/pipelines/ExamplePipeline.scala @@ -33,6 +33,8 @@ import dagr.tasks.DagrDef import dagr.tasks.bwa.BwaMem import dagr.tasks.picard.{CollectHsMetrics, DeleteBam, MarkDuplicates, SortSam} import DagrDef.{DirPath, PathToFasta, PathToFastq, PathToIntervals} +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory} import htsjdk.samtools.SAMFileHeader.SortOrder /** @@ -54,8 +56,10 @@ class ExamplePipeline Files.createDirectories(out) val bwa = new BwaMem(fastq=fastq, ref=ref) - val sort = new SortSam(in=Io.StdIn, out=tmpBam, sortOrder=SortOrder.coordinate) - val mark = new MarkDuplicates(inputs=Seq(tmpBam), out=Some(bam), prefix=metricsPrefix) + val sort = new SortSam(in=Io.StdIn, out=tmpBam, sortOrder=SortOrder.coordinate) with Configuration { + requires(Cores(2), Memory("2G"))} + val mark = new MarkDuplicates(inputs=Seq(tmpBam), out=Some(bam), prefix=metricsPrefix)with Configuration { + requires(Cores(1), Memory("2G"))} val rmtmp = new DeleteBam(tmpBam) root ==> (bwa | sort) ==> mark ==> rmtmp diff --git a/tasks/src/main/scala/dagr/tasks/misc/GetYield.scala b/tasks/src/main/scala/dagr/tasks/misc/GetYield.scala new file mode 100644 index 00000000..c9b62153 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/misc/GetYield.scala @@ -0,0 +1,41 @@ +/* + * The MIT License + * + * Copyright (c) 2015 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.misc + +import java.nio.file.Path + +import dagr.core.tasksystem.SimpleInJvmTask +import htsjdk.samtools.metrics.MetricsFile +import picard.analysis.CollectQualityYieldMetrics.QualityYieldMetrics + +/** In JVM Task that pulls the total number of reads from the quality yield metrics file. */ +class GetYield(qualityYieldMetrics: Path) extends SimpleInJvmTask { + this.name = "GetYield" + var count: Integer = 0 + + override def run(): Unit = { + val metrics = MetricsFile.readBeans(qualityYieldMetrics.toFile).get(0).asInstanceOf[QualityYieldMetrics] + count = metrics.PF_READS + } +} diff --git a/tasks/src/main/scala/dagr/tasks/misc/LinkFile.scala b/tasks/src/main/scala/dagr/tasks/misc/LinkFile.scala new file mode 100644 index 00000000..79df9fb8 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/misc/LinkFile.scala @@ -0,0 +1,33 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.misc + +import java.nio.file.Path + +import dagr.core.tasksystem.ShellCommand + +/** + * Uses the shell to (soft) link a file or directory to another alias + */ +class LinkFile(val from: Path, val to: Path) extends ShellCommand("ln", "-s", from.toString, to.toString) From d828e56b7aa54b38f6751a8de6ed0f098c4d08e5 Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Thu, 3 Dec 2020 23:37:04 -0500 Subject: [PATCH 02/13] WIP. nearly there. need to hook up vcf for VBID and figure out how to appropriately downsample it using the targets. --- .../dagr/core/tasksystem/EitherTask.scala | 29 ++++- .../ContaminateAndEvaluateVBID.scala | 103 ++++++++++-------- 2 files changed, 79 insertions(+), 53 deletions(-) diff --git a/core/src/main/scala/dagr/core/tasksystem/EitherTask.scala b/core/src/main/scala/dagr/core/tasksystem/EitherTask.scala index e7a42b15..c051fcef 100644 --- a/core/src/main/scala/dagr/core/tasksystem/EitherTask.scala +++ b/core/src/main/scala/dagr/core/tasksystem/EitherTask.scala @@ -24,8 +24,11 @@ package dagr.core.tasksystem object EitherTask { + sealed trait Choice - object Left extends Choice + + object Left extends Choice + object Right extends Choice /** @@ -48,17 +51,33 @@ object EitherTask { * @param goLeft an expression that returns a Boolean, with true indicating Left and false indicating Right */ def of(left: Task, right: Task, goLeft: => Boolean): EitherTask = new EitherTask(left, right, () => if (goLeft) Left else Right) + + def of(left: Iterable[Task], right: Iterable[Task], goLeft: => Boolean): EitherTask = { + + val leftPipeline = new Pipeline { + def build(): Unit = { + left.foreach(root ==> _) + } + } + val rightPipeline = new Pipeline { + def build(): Unit = { + right.foreach(root ==> _) + } + } + + new EitherTask(leftPipeline, rightPipeline, () => if (goLeft) Left else Right) + } } /** A task that returns either the left or right task based on a deferred choice. The choice function is * not evaluated until all dependencies have been met and the `EitherTask` needs to make a decision about - * which task to return from [[getTasks]. + * which task to return from [[getTasks]]. * - * @param left the left task. - * @param right the right task + * @param left the left task. + * @param right the right task * @param choice an expression that returns either Left or Right when invoked */ -class EitherTask private (private val left: Task, private val right: Task, private val choice: () => EitherTask.Choice) extends Task { +class EitherTask private(private val left: Task, private val right: Task, private val choice: () => EitherTask.Choice) extends Task { /** Decides which task to return based on `choice` at execution time. */ override def getTasks: Iterable[Task] = Seq(if (choice() eq EitherTask.Left) left else right) } diff --git a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala index 90631e15..f7a5a270 100644 --- a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala +++ b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala @@ -25,21 +25,19 @@ package dagr.pipelines import java.nio.file.{Files, Path} -import dagr.commons.io.Io +//import dagr.commons.io.Io import dagr.core.cmdline.Pipelines -import dagr.core.config.Configuration import dagr.core.execsystem.{Cores, Memory} -import dagr.core.tasksystem.{EitherTask, Linker, NoOpInJvmTask, NoOpTask, Pipeline, SimpleInJvmTask} +import dagr.core.tasksystem.{EitherTask, Pipeline, SimpleInJvmTask} import dagr.sopt.{arg, clp} -import dagr.tasks.DagrDef.{DirPath, FilePath, PathToBam, PathToFasta, PathToFastq, PathToIntervals} -import dagr.tasks.bwa.BwaMem -import dagr.tasks.misc.{LinkFile, MoveFile} -import dagr.tasks.picard.{CollectHsMetrics, CollectMultipleMetrics, DeleteBam, DownsampleSam, DownsamplingStrategy, MarkDuplicates, SortSam} +import dagr.tasks.DagrDef._ +import dagr.tasks.misc.LinkFile +import dagr.tasks.picard.{CollectHsMetrics, DownsampleSam, DownsamplingStrategy, MergeSamFiles} +//import htsjdk.samtools.metrics.MetricsFile +//import _root_.picard.analysis.directed.HsMetrics import htsjdk.samtools.SAMFileHeader.SortOrder -import htsjdk.samtools.metrics.MetricsFile -import picard.analysis.CollectMultipleMetrics.Program -import picard.analysis.directed.HsMetrics +//import scala.collection.JavaConversions._ import scala.collection.mutable /** @@ -68,27 +66,22 @@ class ContaminateAndEvaluateVBID val tmpBam = out.resolve(prefix + ".tmp.bam") val metricsPrefix: Some[DirPath] = Some(out.resolve(prefix)) Files.createDirectories(out) - val bamYield = new mutable.HashMap[PathToBam, Integer] - - // Collect the median target coverage in each of the inputs and put result in a map - val yieldsCollected = new NoOpTask() - root ==> yieldsCollected + val bamYield = new mutable.HashMap[PathToBam, Int] bams.map(bam => { - val prefixPath: Path = out.resolve(prefix + bam.getFileName) val metricPath: Path = out.resolve(prefix + bam.getFileName + ".qualityYieldMetrics") - val hsMetrics = new CollectHsMetrics(in = bam, ref = ref, targets = targets) + val hsMetrics = new CollectHsMetrics(in = bam, ref = ref, targets = targets, prefix = Some(out.resolve(prefix))) - val collectYield = new CollectMultipleMetrics(in = bam, prefix = Some(prefixPath), - ref = ref, programs = Seq(Program.CollectQualityYieldMetrics)) val fetchMedian = new FetchMedianCoverage(metricPath) - root ==> hsMetrics ==> fetchMedian ==> SimpleInJvmTask(bamYield.put(bam, fetchMedian.medianCoverage.getOrElse(0))) ==> yieldsCollected + root ==> hsMetrics ==> fetchMedian ==> SimpleInJvmTask(bamYield.put(bam, fetchMedian.medianCoverage.getOrElse(0))) }) val pairsOfBam = for (x <- bams; y <- bams) yield (x, y) pairsOfBam.foreach { case (x, y) => { contaminations.foreach(c => { + val dsX = out.resolve(prefix + x.getFileName + ".forCont." + c + ".tmp.bam") + val dsY = out.resolve(prefix + y.getFileName + ".forCont." + c + ".tmp.bam") // create a mixture of c x + (1-c) y taking into account the depths of // x and y // D_x and D_y are the effective depths of x and y @@ -100,44 +93,58 @@ class ContaminateAndEvaluateVBID val copyX = new LinkFile(x, dsX) val copyY = new LinkFile(y, dsY) + val downsampleX_Q = c / (1 - c) <= 1 + val downsample = EitherTask.of( + downsampleX :: copyY :: Nil, + downsampleY :: copyX :: Nil, + downsampleX_Q) - val downsample = EitherTask.of(downsampleX, downsampleY, c / (1 - c) <= 1) :: - EitherTask.of(copyY, copyX, c / (1 - c) <= 1) + val xFactor = if (downsampleX_Q) c / (1 - c) else 1 + val yFactor = if (downsampleX_Q) 1 else (1 - c) / c - }) - } - } + val resultantDepth: Double = bamYield.getOrElse(x, 0) * xFactor + bamYield.getOrElse(y, 0) * yFactor - depths.foreach(depth => { - con + val xyContaminatedBam = out.resolve(prefix + "__" + x.getFileName + "__" + y.getFileName + + "__" + c + ".bam") + val mergedownsampled = new MergeSamFiles(in = dsX :: dsY :: Nil, out = xyContaminatedBam, sortOrder = SortOrder.coordinate) + root ==> downsample ==> mergedownsampled - }) - val bwa = new BwaMem(fastq = fastq, ref = ref) - val sort = new SortSam(in = Io.StdIn, out = tmpBam, sortOrder = SortOrder.coordinate) with Configuration { - requires(Cores(2), Memory("2G")) - } - val mark = new MarkDuplicates(inputs = Seq(tmpBam), out = Some(bam), prefix = metricsPrefix) with Configuration { - requires(Cores(1), Memory("2G")) - } - val rmtmp = new DeleteBam(tmpBam) + depths filter (d => d <= resultantDepth) foreach { d => { + val outDownsampled = out.resolve(prefix + "__" + x.getFileName + "__" + y.getFileName + + "__" + c + "__target__" + d + ".bam") + val downsampleMerged = new DownsampleSam(in = xyContaminatedBam, out = outDownsampled, proportion = d / resultantDepth) - root ==> (bwa | sort) ==> mark ==> rmtmp - targets.foreach(path => root ==> new CollectHsMetrics(in = bam, prefix = metricsPrefix, targets = path, ref = ref)) - } + mergedownsampled ==> downsampleMerged + markers.foreach { markerCount => {} -} + // subset VBID resource files to smaller number of markers + + // call VBID using subsetted markers -/** Pops open the HS metrics file and reads out the median coverage. */ -private class FetchMedianCoverage(hsMetrics: FilePath) extends SimpleInJvmTask { - var medianCoverage: Option[Int] = None + } + + } + } + }) - override def run(): Unit = { - val mfile = new MetricsFile[HsMetrics, java.lang.Integer] - mfile.read(Io.toReader(hsMetrics)) - val median = mfile.getMetrics.map(hs => hs.MEDIAN_TARGET_COVERAGE).max - this.medianCoverage = Some(median.toInt) + } + } } } +// +///** Pops open the HS metrics file and reads out the median coverage. */ +//private class FetchMedianCoverage(hsMetrics: FilePath) extends SimpleInJvmTask { +// var medianCoverage: Option[Int] = None +// +// override def run(): Unit = { +// val mfile = new MetricsFile[HsMetrics, java.lang.Integer] +// mfile.read(Io.toReader(hsMetrics)) +// val median = mfile.getMetrics.map(hs => hs.MEDIAN_TARGET_COVERAGE).max +// this.medianCoverage = Some(median.toInt) +// } +//} + + From c20cc4f5371769637a9edb94e6660cfb1997cebb Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Thu, 3 Dec 2020 23:37:54 -0500 Subject: [PATCH 03/13] cleanup --- .../pipelines/ContaminateAndEvaluateVBID.scala | 17 +---------------- 1 file changed, 1 insertion(+), 16 deletions(-) diff --git a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala index f7a5a270..eb002750 100644 --- a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala +++ b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala @@ -25,7 +25,6 @@ package dagr.pipelines import java.nio.file.{Files, Path} -//import dagr.commons.io.Io import dagr.core.cmdline.Pipelines import dagr.core.execsystem.{Cores, Memory} import dagr.core.tasksystem.{EitherTask, Pipeline, SimpleInJvmTask} @@ -33,11 +32,8 @@ import dagr.sopt.{arg, clp} import dagr.tasks.DagrDef._ import dagr.tasks.misc.LinkFile import dagr.tasks.picard.{CollectHsMetrics, DownsampleSam, DownsamplingStrategy, MergeSamFiles} -//import htsjdk.samtools.metrics.MetricsFile -//import _root_.picard.analysis.directed.HsMetrics import htsjdk.samtools.SAMFileHeader.SortOrder -//import scala.collection.JavaConversions._ import scala.collection.mutable /** @@ -132,18 +128,7 @@ class ContaminateAndEvaluateVBID } } } -// -///** Pops open the HS metrics file and reads out the median coverage. */ -//private class FetchMedianCoverage(hsMetrics: FilePath) extends SimpleInJvmTask { -// var medianCoverage: Option[Int] = None -// -// override def run(): Unit = { -// val mfile = new MetricsFile[HsMetrics, java.lang.Integer] -// mfile.read(Io.toReader(hsMetrics)) -// val median = mfile.getMetrics.map(hs => hs.MEDIAN_TARGET_COVERAGE).max -// this.medianCoverage = Some(median.toInt) -// } -//} + From f8b2db37914c3be4a644c5a68a1975d89a17b698 Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Sat, 5 Dec 2020 02:20:17 -0500 Subject: [PATCH 04/13] added simulation of reads --- .../ContaminateAndEvaluateVBID.scala | 26 ++++--- .../main/scala/dagr/tasks/misc/DWGSim.scala | 69 +++++++++++++++++++ 2 files changed, 86 insertions(+), 9 deletions(-) create mode 100644 tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala diff --git a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala index eb002750..80b1a489 100644 --- a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala +++ b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala @@ -25,13 +25,16 @@ package dagr.pipelines import java.nio.file.{Files, Path} +import dagr.commons.io.Io import dagr.core.cmdline.Pipelines +import dagr.core.config.Configuration import dagr.core.execsystem.{Cores, Memory} import dagr.core.tasksystem.{EitherTask, Pipeline, SimpleInJvmTask} import dagr.sopt.{arg, clp} import dagr.tasks.DagrDef._ -import dagr.tasks.misc.LinkFile -import dagr.tasks.picard.{CollectHsMetrics, DownsampleSam, DownsamplingStrategy, MergeSamFiles} +import dagr.tasks.bwa.BwaMem +import dagr.tasks.misc.{DWGSim, LinkFile} +import dagr.tasks.picard.{CollectHsMetrics, DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam} import htsjdk.samtools.SAMFileHeader.SortOrder import scala.collection.mutable @@ -44,7 +47,7 @@ import scala.collection.mutable */ @clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) class ContaminateAndEvaluateVBID -(@arg(flag = "i", doc = "Input bams.") val bams: Seq[PathToBam], +(@arg(flag = "i", doc = "Input vcfs for generating bams.") val vcfs: Seq[PathToVcf], @arg(flag = "r", doc = "Reference fasta.") val ref: PathToFasta, @arg(flag = "o", doc = "Output directory.") val out: DirPath, @arg(flag = "t", doc = "Target regions.") val targets: PathToIntervals, @@ -59,17 +62,22 @@ class ContaminateAndEvaluateVBID val strat = Some(DownsamplingStrategy.HighAccuracy) val bam = out.resolve(prefix + ".bam") - val tmpBam = out.resolve(prefix + ".tmp.bam") val metricsPrefix: Some[DirPath] = Some(out.resolve(prefix)) Files.createDirectories(out) val bamYield = new mutable.HashMap[PathToBam, Int] + val coverage = 10000 + val bams:Seq[PathToBam]=vcfs.map(vcf => { - bams.map(bam => { - val metricPath: Path = out.resolve(prefix + bam.getFileName + ".qualityYieldMetrics") - val hsMetrics = new CollectHsMetrics(in = bam, ref = ref, targets = targets, prefix = Some(out.resolve(prefix))) + val simulate = new DWGSim(vcf = vcf, fasta = ref, outPrefix = out.resolve(prefix + vcf.getFileName + ".sim"), depth = coverage, coverageTarget = targets) + val bwa = new BwaMem(fastq = simulate.outputPairedFastq, ref = ref) + val tmpBam = out.resolve(prefix + vcf.getFileName + ".tmp.bam") + val sort = new SortSam(in = Io.StdIn, out = tmpBam, sortOrder = SortOrder.coordinate) with Configuration { + requires(Cores(2), Memory("2G")) + } - val fetchMedian = new FetchMedianCoverage(metricPath) - root ==> hsMetrics ==> fetchMedian ==> SimpleInJvmTask(bamYield.put(bam, fetchMedian.medianCoverage.getOrElse(0))) + root ==> simulate ==> (bwa | sort) + + tmpBam }) val pairsOfBam = for (x <- bams; y <- bams) yield (x, y) diff --git a/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala b/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala new file mode 100644 index 00000000..a7f4bd78 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala @@ -0,0 +1,69 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.misc + +import java.nio.file.Path + +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory} +import dagr.core.tasksystem.{FixedResources, ProcessTask} +import dagr.tasks.DagrDef.{PathToFasta, PathToFastq, PathToIntervals, PathToVcf} + +import scala.collection.mutable.ListBuffer + +/** + * Class for running DWGSim to generate simulated sequencing data + */ +class DWGSim(val vcf: PathToVcf, + val fasta: PathToFasta, + val outPrefix: Path, + val depth: Int = 100, + val iSize: Int = 350, + val iSizeDev: Int = 100, + val readLength: Double = 151, + val randomSeed: Int = 42, // -1 to use current time + val coverageTarget: PathToIntervals, + + ) extends ProcessTask with Configuration with FixedResources { + + requires(Cores(1), Memory("2G")) + private val dwgsim: Path = configureExecutable("wgsim.executable", "wgsim") + + val outputPairedFastq: PathToFastq = outPrefix.getParent.resolve(outPrefix.getFileName + ".bfast.fastq") + + override def args: Seq[Any] = { + val buffer = ListBuffer[Any]() + buffer ++= dwgsim :: Nil + buffer ++= "-v" :: vcf :: Nil + buffer ++= "-d" :: iSize :: Nil + buffer ++= "-s" :: iSizeDev :: Nil + buffer ++= "-C" :: depth :: Nil + buffer ++= "-1" :: readLength :: Nil + buffer ++= "-2" :: readLength :: Nil + buffer ++= "-z" :: randomSeed :: Nil + buffer ++= "-x" :: coverageTarget :: Nil // this might need to be converted to a bed file + buffer ++= fasta :: outPrefix :: Nil + buffer.toList + } +} From f0e498f9ec73b6947c1366adeffd247209de46a7 Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Sun, 6 Dec 2020 01:55:00 -0500 Subject: [PATCH 05/13] getting closer! still waiting to generate reads. --- .../ContaminateAndEvaluateVBID.scala | 69 ++++++++++++++----- src/main/resources/example.conf | 2 +- .../scala/dagr/tasks/gatk/Gatk4Task.scala | 65 +++++++++++++++++ .../tasks/gatk/LeftAlignAndTrimVariants.scala | 41 +++++++++++ .../main/scala/dagr/tasks/misc/DWGSim.scala | 11 +-- .../main/scala/dagr/tasks/misc/LinkFile.scala | 2 +- 6 files changed, 166 insertions(+), 24 deletions(-) create mode 100644 tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala create mode 100644 tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala diff --git a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala index 80b1a489..c5825f2e 100644 --- a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala +++ b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala @@ -1,7 +1,7 @@ /* * The MIT License * - * Copyright (c) 2016 Fulcrum Genomics LLC + * Copyright (c) 2020 Fulcrum Genomics LLC * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -28,13 +28,14 @@ import java.nio.file.{Files, Path} import dagr.commons.io.Io import dagr.core.cmdline.Pipelines import dagr.core.config.Configuration -import dagr.core.execsystem.{Cores, Memory} -import dagr.core.tasksystem.{EitherTask, Pipeline, SimpleInJvmTask} +import dagr.core.execsystem.{Cores, Memory, ResourceSet} +import dagr.core.tasksystem.{EitherTask, FixedResources, NoOpInJvmTask, Pipeline, ProcessTask} import dagr.sopt.{arg, clp} import dagr.tasks.DagrDef._ import dagr.tasks.bwa.BwaMem +import dagr.tasks.gatk.LeftAlignAndTrimVariants import dagr.tasks.misc.{DWGSim, LinkFile} -import dagr.tasks.picard.{CollectHsMetrics, DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam} +import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam} import htsjdk.samtools.SAMFileHeader.SortOrder import scala.collection.mutable @@ -54,28 +55,62 @@ class ContaminateAndEvaluateVBID @arg(flag = "p", doc = "Output file prefix.") val prefix: String, @arg(flag = "c", doc = "Contamination levels to evaluate.") val contaminations: Seq[Double], @arg(flag = "d", doc = "Depth values to evaluate.") val depths: Seq[Integer], - @arg(flag = "m", doc = "Number of markers to use.") val markers: Seq[Integer] + @arg(flag = "m", doc = "Number of markers to use.") val markers: Seq[Integer], + @arg(name = "header", doc = "header line for dp.") val dpHeader: Path, + @arg(name = "dp-bed", doc = "bed file with dp values") val pathToBed: Path ) extends Pipeline(Some(out)) { override def build(): Unit = { val strat = Some(DownsamplingStrategy.HighAccuracy) - val bam = out.resolve(prefix + ".bam") - val metricsPrefix: Some[DirPath] = Some(out.resolve(prefix)) Files.createDirectories(out) val bamYield = new mutable.HashMap[PathToBam, Int] - val coverage = 10000 + val coverage = 100 + val makeBamsLoop = new NoOpInJvmTask("made bams") + val bams:Seq[PathToBam]=vcfs.map(vcf => { - val simulate = new DWGSim(vcf = vcf, fasta = ref, outPrefix = out.resolve(prefix + vcf.getFileName + ".sim"), depth = coverage, coverageTarget = targets) - val bwa = new BwaMem(fastq = simulate.outputPairedFastq, ref = ref) + val normalize = new LeftAlignAndTrimVariants(in=vcf, out=out.resolve(prefix + vcf.getFileName + ".normalized.vcf"), ref=ref, intervals = Some(targets), splitMultiAlleic = Some(true)) + + val rando = new ProcessTask with FixedResources { + requires(Cores(1), Memory("1G")) + + override val name = "move dp over to vcf" + + val outVcf=out.resolve(prefix + vcf.getFileName + ".normalized.withdp.vcf") + /** + * Abstract method that must be implemented by child classes to return a list or similar traversable + * list of command line elements (command name and arguments) that form the command line to be run. + * Individual values will be converted to Strings before being used by calling toString. + */ + override def args: Seq[Any] = "bcftools" :: "annotate" :: + "-a" :: pathToBed :: + "-h" :: dpHeader :: + "-c" :: "CHROM,FROM,TO,dp" :: + "-O" :: outVcf :: + normalize.out :: Nil + + /** + * Given a non-null ResourceSet representing the available resources at this moment in time + * return either a ResourceSet that is a subset of the available resources in which the task + * can run, or None if the task cannot run in the available resources. + * + * @param availableResources The system resources available to the task + * @return Either a ResourceSet of the desired subset of resources to run with, or None + */ + override def pickResources(availableResources: ResourceSet): Option[ResourceSet] = None + } + + + val simulate = new DWGSim(vcf = rando.outVcf, fasta = ref, outPrefix = out.resolve(prefix + vcf.getFileName + ".sim"), depth = coverage, coverageTarget = targets) + val bwa = new BwaMem(fastq = simulate.outputPairedFastq, ref = ref,maxThreads = 1, memory = Memory("2G")) val tmpBam = out.resolve(prefix + vcf.getFileName + ".tmp.bam") val sort = new SortSam(in = Io.StdIn, out = tmpBam, sortOrder = SortOrder.coordinate) with Configuration { - requires(Cores(2), Memory("2G")) + requires(Cores(1), Memory("2G")) } - root ==> simulate ==> (bwa | sort) + root ==> normalize ==> rando ==> simulate ==> (bwa | sort) ==> makeBamsLoop tmpBam }) @@ -92,8 +127,8 @@ class ContaminateAndEvaluateVBID // if we are to downsample x we need x to be downsampled to c/(1-c) // if we are to downsample y we need y to be downsampled to (1-c)/c // of of those two values will be less than 1. - val downsampleX = new DownsampleSam(in = x, out = dsX, proportion = c / (1 - c), strategy = strat).requires(Cores(1), Memory("32g")) - val downsampleY = new DownsampleSam(in = x, out = dsY, proportion = (1 - c) / c, strategy = strat).requires(Cores(1), Memory("32g")) + val downsampleX = new DownsampleSam(in = x, out = dsX, proportion = c / (1 - c), strategy = strat).requires(Cores(1), Memory("2g")) + val downsampleY = new DownsampleSam(in = x, out = dsY, proportion = (1 - c) / c, strategy = strat).requires(Cores(1), Memory("2g")) val copyX = new LinkFile(x, dsX) val copyY = new LinkFile(y, dsY) @@ -112,7 +147,7 @@ class ContaminateAndEvaluateVBID "__" + c + ".bam") val mergedownsampled = new MergeSamFiles(in = dsX :: dsY :: Nil, out = xyContaminatedBam, sortOrder = SortOrder.coordinate) - root ==> downsample ==> mergedownsampled + makeBamsLoop ==> downsample ==> mergedownsampled depths filter (d => d <= resultantDepth) foreach { d => { val outDownsampled = out.resolve(prefix + "__" + x.getFileName + "__" + y.getFileName + @@ -120,13 +155,13 @@ class ContaminateAndEvaluateVBID val downsampleMerged = new DownsampleSam(in = xyContaminatedBam, out = outDownsampled, proportion = d / resultantDepth) mergedownsampled ==> downsampleMerged - markers.foreach { markerCount => {} +// markers.foreach { markerCount => {} // subset VBID resource files to smaller number of markers // call VBID using subsetted markers - } +// } } } diff --git a/src/main/resources/example.conf b/src/main/resources/example.conf index 2378aed0..0f72a79d 100644 --- a/src/main/resources/example.conf +++ b/src/main/resources/example.conf @@ -2,7 +2,7 @@ # for the default set of tasks. This should be edited for your local # setup. -# DAGR Configuration - this section can be omited entirely from application configuration +# DAGR Configuration - this section can be omitted entirely from application configuration # files as dagr's reference.conf will provide sensible defaults where they are needed. dagr = { command-line-name = "dagr" diff --git a/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala b/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala new file mode 100644 index 00000000..ec5ac573 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala @@ -0,0 +1,65 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.gatk + +import java.nio.file.Path + +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory} +import dagr.core.tasksystem.{FixedResources, ProcessTask} +import dagr.tasks.DagrDef.{PathToFasta, PathToIntervals} +import dagr.tasks.{DagrDef, JarTask} + +import scala.collection.mutable.ListBuffer + +object Gatk4Task { + val GatkJarPathConfigKey = "gatk4.jar" +} + +/** + * Abstract base class for tasks that involve running the GATK. + */ +abstract class Gatk4Task(val walker: String, + val ref: PathToFasta, + val intervals: Option[PathToIntervals] = None, + val bamCompression: Option[Int] = None) + extends ProcessTask with JarTask with FixedResources with Configuration { + requires(Cores(1), Memory("4g")) + + override def args: Seq[Any] = { + val buffer = ListBuffer[Any]() + buffer.appendAll(jarArgs(this.gatkJar, jvmMemory=this.resources.memory)) + buffer.append(this.walker) + buffer.append("-R", this.ref.toAbsolutePath.toString) + intervals.foreach(il => buffer.append("-L", il.toAbsolutePath.toString)) + bamCompression.foreach(c => buffer.append("--bam_compression", c)) + addWalkerArgs(buffer) + buffer + } + + /** Can be overridden to use a specific GATK jar. */ + protected def gatkJar: Path = configure[Path](Gatk4Task.GatkJarPathConfigKey) + + protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit +} diff --git a/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala b/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala new file mode 100644 index 00000000..6ffa28b0 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala @@ -0,0 +1,41 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.gatk + +import dagr.tasks.DagrDef.{PathToFasta, PathToIntervals, PathToVcf} + +import scala.collection.mutable.ListBuffer + +/** + * Runs the GATK walker that normalizes variants and optionally splits multiallelic sites into multiple, biallelic ones + */ +class LeftAlignAndTrimVariants(val in: PathToVcf, val out: PathToVcf, ref: PathToFasta, intervals: Option[PathToIntervals], splitMultiAlleic: Option[Boolean]) + extends Gatk4Task(walker = "LeftAlignAndTrimVariants", ref = ref, intervals = intervals) { + + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { + buffer.append("-V", in) + buffer.append("-O", out) + splitMultiAlleic foreach { _ => buffer.append("--split-multi-allelics")} + } +} diff --git a/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala b/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala index a7f4bd78..424deee7 100644 --- a/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala +++ b/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala @@ -41,14 +41,14 @@ class DWGSim(val vcf: PathToVcf, val depth: Int = 100, val iSize: Int = 350, val iSizeDev: Int = 100, - val readLength: Double = 151, + val readLength: Int = 151, val randomSeed: Int = 42, // -1 to use current time - val coverageTarget: PathToIntervals, + val coverageTarget: PathToIntervals - ) extends ProcessTask with Configuration with FixedResources { + ) extends ProcessTask with Configuration with FixedResources { requires(Cores(1), Memory("2G")) - private val dwgsim: Path = configureExecutable("wgsim.executable", "wgsim") + private val dwgsim: Path = configureExecutable("dwgsim.executable", "dwgsim") val outputPairedFastq: PathToFastq = outPrefix.getParent.resolve(outPrefix.getFileName + ".bfast.fastq") @@ -62,7 +62,8 @@ class DWGSim(val vcf: PathToVcf, buffer ++= "-1" :: readLength :: Nil buffer ++= "-2" :: readLength :: Nil buffer ++= "-z" :: randomSeed :: Nil - buffer ++= "-x" :: coverageTarget :: Nil // this might need to be converted to a bed file + buffer ++= "-x" :: coverageTarget :: Nil + buffer ++= "-y" :: "1.0" :: Nil buffer ++= fasta :: outPrefix :: Nil buffer.toList } diff --git a/tasks/src/main/scala/dagr/tasks/misc/LinkFile.scala b/tasks/src/main/scala/dagr/tasks/misc/LinkFile.scala index 79df9fb8..03ec9bef 100644 --- a/tasks/src/main/scala/dagr/tasks/misc/LinkFile.scala +++ b/tasks/src/main/scala/dagr/tasks/misc/LinkFile.scala @@ -30,4 +30,4 @@ import dagr.core.tasksystem.ShellCommand /** * Uses the shell to (soft) link a file or directory to another alias */ -class LinkFile(val from: Path, val to: Path) extends ShellCommand("ln", "-s", from.toString, to.toString) +class LinkFile(val from: Path, val to: Path) extends ShellCommand("ln", "-sf", from.toString, to.toString) From 70d6732c116bee6af081f1c1cc5617f83e8f93b2 Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Thu, 11 Feb 2021 00:46:34 -0500 Subject: [PATCH 06/13] working out the kinks in this pipeline..... --- .../ContaminateAndEvaluateVBID.scala | 125 +++++++++++------- .../scala/dagr/tasks/gatk/CountVariants.scala | 71 ++++++++++ .../dagr/tasks/gatk/DownsampleVariants.scala | 14 ++ .../scala/dagr/tasks/gatk/Gatk4Task.scala | 12 +- .../scala/dagr/tasks/gatk/IndexVariants.scala | 39 ++++++ .../tasks/gatk/LeftAlignAndTrimVariants.scala | 6 +- .../dagr/tasks/gatk/VariantsToTable.scala | 47 +++++++ .../main/scala/dagr/tasks/misc/DWGSim.scala | 3 +- 8 files changed, 260 insertions(+), 57 deletions(-) create mode 100644 tasks/src/main/scala/dagr/tasks/gatk/CountVariants.scala create mode 100644 tasks/src/main/scala/dagr/tasks/gatk/DownsampleVariants.scala create mode 100644 tasks/src/main/scala/dagr/tasks/gatk/IndexVariants.scala create mode 100644 tasks/src/main/scala/dagr/tasks/gatk/VariantsToTable.scala diff --git a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala index c5825f2e..210c4512 100644 --- a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala +++ b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala @@ -25,20 +25,21 @@ package dagr.pipelines import java.nio.file.{Files, Path} -import dagr.commons.io.Io +import dagr.commons.io.{Io, PathUtil} import dagr.core.cmdline.Pipelines import dagr.core.config.Configuration import dagr.core.execsystem.{Cores, Memory, ResourceSet} -import dagr.core.tasksystem.{EitherTask, FixedResources, NoOpInJvmTask, Pipeline, ProcessTask} +import dagr.core.tasksystem.{EitherTask, FixedResources, Linker, NoOpInJvmTask, Pipeline, ProcessTask} import dagr.sopt.{arg, clp} import dagr.tasks.DagrDef._ import dagr.tasks.bwa.BwaMem -import dagr.tasks.gatk.LeftAlignAndTrimVariants -import dagr.tasks.misc.{DWGSim, LinkFile} +import dagr.tasks.gatk.{CountVariants, DownsampleVariants, Gatk4Task, GatkTask, IndexVariants, LeftAlignAndTrimVariants, VariantsToTable} +import dagr.tasks.misc.{DWGSim, LinkFile, VerifyBamId} import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam} import htsjdk.samtools.SAMFileHeader.SortOrder import scala.collection.mutable +import scala.collection.mutable.ListBuffer /** * Downsample Bams to create composite contaminated bams, @@ -56,8 +57,8 @@ class ContaminateAndEvaluateVBID @arg(flag = "c", doc = "Contamination levels to evaluate.") val contaminations: Seq[Double], @arg(flag = "d", doc = "Depth values to evaluate.") val depths: Seq[Integer], @arg(flag = "m", doc = "Number of markers to use.") val markers: Seq[Integer], - @arg(name = "header", doc = "header line for dp.") val dpHeader: Path, - @arg(name = "dp-bed", doc = "bed file with dp values") val pathToBed: Path + @arg(flag = "R", doc = "VBID vcf resource.") val vbidResource: PathToVcf, + @arg(flag = "H", doc = "header line for dp.") val dpHeader: Path ) extends Pipeline(Some(out)) { @@ -69,52 +70,34 @@ class ContaminateAndEvaluateVBID val coverage = 100 val makeBamsLoop = new NoOpInJvmTask("made bams") - val bams:Seq[PathToBam]=vcfs.map(vcf => { - - val normalize = new LeftAlignAndTrimVariants(in=vcf, out=out.resolve(prefix + vcf.getFileName + ".normalized.vcf"), ref=ref, intervals = Some(targets), splitMultiAlleic = Some(true)) - - val rando = new ProcessTask with FixedResources { - requires(Cores(1), Memory("1G")) - - override val name = "move dp over to vcf" - - val outVcf=out.resolve(prefix + vcf.getFileName + ".normalized.withdp.vcf") - /** - * Abstract method that must be implemented by child classes to return a list or similar traversable - * list of command line elements (command name and arguments) that form the command line to be run. - * Individual values will be converted to Strings before being used by calling toString. - */ - override def args: Seq[Any] = "bcftools" :: "annotate" :: - "-a" :: pathToBed :: - "-h" :: dpHeader :: - "-c" :: "CHROM,FROM,TO,dp" :: - "-O" :: outVcf :: - normalize.out :: Nil - - /** - * Given a non-null ResourceSet representing the available resources at this moment in time - * return either a ResourceSet that is a subset of the available resources in which the task - * can run, or None if the task cannot run in the available resources. - * - * @param availableResources The system resources available to the task - * @return Either a ResourceSet of the desired subset of resources to run with, or None - */ - override def pickResources(availableResources: ResourceSet): Option[ResourceSet] = None - } + val bams: Seq[PathToBam] = vcfs.map(vcf => { + + val toTable = new VariantsToTable(in = vcf, out = out.resolve(prefix + vcf.getFileName + ".table"), fields = "CHROM" :: "POS" :: "HET" :: "HOM-VAR" :: Nil, intervals = Some(targets)) + val makeBed = new MakePLBed(table = toTable.out, out = out.resolve(prefix + "PL.bed")) + val rando = new CopyPS_FromBedToVcf(in = vcf, out = out.resolve(prefix + vcf.getFileName + ".with.pl.vcf"), + dpHeader = dpHeader, pathToBed = makeBed.out) - val simulate = new DWGSim(vcf = rando.outVcf, fasta = ref, outPrefix = out.resolve(prefix + vcf.getFileName + ".sim"), depth = coverage, coverageTarget = targets) - val bwa = new BwaMem(fastq = simulate.outputPairedFastq, ref = ref,maxThreads = 1, memory = Memory("2G")) + val normalize = new LeftAlignAndTrimVariants(in = rando.out, out = out.resolve(prefix + vcf.getFileName + ".normalized.vcf"), ref = ref, splitMultiAlleic = Some(true)) + val index = new IndexVariants(in = normalize.out) + val simulate = new DWGSim(vcf = normalize.out, fasta = ref, outPrefix = out.resolve(prefix + vcf.getFileName + ".sim"), depth = coverage, coverageTarget = targets) + val bwa = new BwaMem(fastq = simulate.outputPairedFastq, ref = ref, maxThreads = 1, memory = Memory("2G")) val tmpBam = out.resolve(prefix + vcf.getFileName + ".tmp.bam") val sort = new SortSam(in = Io.StdIn, out = tmpBam, sortOrder = SortOrder.coordinate) with Configuration { requires(Cores(1), Memory("2G")) } - root ==> normalize ==> rando ==> simulate ==> (bwa | sort) ==> makeBamsLoop + root ==> toTable ==> makeBed ==> rando ==> normalize ==> index ==> simulate ==> (bwa | sort) ==> makeBamsLoop + + + // ~/gatk/gatk SelectVariants -V test2HG003_GRCh38_1_22_v4.2_benchmark.vcf.gz.normalized.vcf -O test2.vcf --sites-only-vcf-output tmpBam }) + val countVariants = new CountVariants(vbidResource, out.resolve(prefix + ".vbid.count")) + root ==> countVariants + val pairsOfBam = for (x <- bams; y <- bams) yield (x, y) pairsOfBam.foreach { case (x, y) => { @@ -155,24 +138,72 @@ class ContaminateAndEvaluateVBID val downsampleMerged = new DownsampleSam(in = xyContaminatedBam, out = outDownsampled, proportion = d / resultantDepth) mergedownsampled ==> downsampleMerged -// markers.foreach { markerCount => {} + markers.foreach { markerCount => { // subset VBID resource files to smaller number of markers - // call VBID using subsetted markers - -// } - + // this need to happen after the tool is run...need to figure out how to do that + val outDownsampledResource = out.resolve(prefix + "__" + vbidResource.getFileName + + "__" + markerCount + ".vcf") + + val downsample = new DownsampleVariants( + in = vbidResource, + output = outDownsampledResource, + proportion = 1) + + val vbid = new VerifyBamId( + vcf = outDownsampledResource, + bam = xyContaminatedBam, + out = out.resolve(PathUtil.replaceExtension(xyContaminatedBam, s".${markerCount}_markers.selfSM")) + ) + + (downsampleMerged :: countVariants) ==> + Linker(countVariants, downsample)((f, c) => c.downsampleProportion = markerCount / f.count) ==> + vbid + } + } } } }) - } } } } +// copies the value from the bed file to the vcf +// removed the PS field (since it's invalid) +private class CopyPS_FromBedToVcf(val in: PathToVcf, + val out: PathToVcf, + val pathToBed: Path, + val dpHeader: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") + + override def args: Seq[Any] = bcftools :: "annotate" :: + "-a" :: pathToBed :: + "-h" :: dpHeader :: + "-c" :: "CHROM,FROM,TO,pl" :: + "-x" :: "FORMAT/PS" :: // remove the PS field since it's invalid + "--force" :: // needed since the input file is corrupt. + "-o" :: out :: + in :: Nil +} + +// copies the value from the bed file to the vcf +private class MakePLBed(val table: Path, + val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + private val awk: Path = configureExecutable("awk.executable", "awk") + + quoteIfNecessary = false + + override def args: Seq[Any] = awk :: raw"""'BEGIN{OFS="\t"; } NR>1{print $$1,$$2-1, $$2, $$3+2*$$4}'""" :: + table.toAbsolutePath :: ">" :: out.toAbsolutePath :: Nil +} diff --git a/tasks/src/main/scala/dagr/tasks/gatk/CountVariants.scala b/tasks/src/main/scala/dagr/tasks/gatk/CountVariants.scala new file mode 100644 index 00000000..0f16fb3f --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/CountVariants.scala @@ -0,0 +1,71 @@ +/* + * The MIT License + * + * Copyright (c) 2021 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.gatk + +import java.lang.Long +import java.nio.file.Path + +import dagr.tasks.DagrDef.PathToVcf + +import scala.collection.mutable.ListBuffer +import scala.io.Source + +/** + * Runs the GATK walker that counts variants + */ +class CountVariants(val in: PathToVcf, val output: Path) + extends Gatk4Task(walker = "CountVariants") { + + var count = 0L + + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { + buffer.append("-V", in) + buffer.append("-O", output) + } + + /** Finalize anything after the task has been run. + * + * This method should be called after a task has been run. The intended use of this method + * is to allow for any modification of this task prior to any dependent tasks being run. This + * would allow any parameters that were passed to dependent tasks as call-by-name to be + * finalized here. For example, we could have passed an Option[String] that is None + * until make it Some(String) in this method. Then when the dependent task's getTasks + * method is called, it can call 'get' on the option and get something. + * + * @param exitCode the exit code of the task, which could also be 1 due to the system terminating this process + * @return true if we c + */ + override def onComplete(exitCode: Int): Boolean = { + + val source = Source.fromFile(output.toFile) + + try { + count = Long.parseLong(source.getLines.mkString) + } finally { + source.close() + } + + super.onComplete(exitCode) + } +} diff --git a/tasks/src/main/scala/dagr/tasks/gatk/DownsampleVariants.scala b/tasks/src/main/scala/dagr/tasks/gatk/DownsampleVariants.scala new file mode 100644 index 00000000..82af5d78 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/DownsampleVariants.scala @@ -0,0 +1,14 @@ +package dagr.tasks.gatk + +import dagr.tasks.DagrDef.PathToVcf +import scala.collection.mutable.ListBuffer + +class DownsampleVariants(in: PathToVcf, output: PathToVcf, proportion: Double) extends Gatk4Task(walker = "SelectVariants") { + + var downsampleProportion = proportion + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { + buffer.append("-V", in) + buffer.append("-O", output) + buffer.append("-factor", downsampleProportion) + } +} diff --git a/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala b/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala index ec5ac573..cec96ff5 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala @@ -40,18 +40,20 @@ object Gatk4Task { /** * Abstract base class for tasks that involve running the GATK. */ + + abstract class Gatk4Task(val walker: String, - val ref: PathToFasta, - val intervals: Option[PathToIntervals] = None, - val bamCompression: Option[Int] = None) + val ref: Option[PathToFasta] = None, + val intervals: Option[PathToIntervals] = None, + val bamCompression: Option[Int] = None) extends ProcessTask with JarTask with FixedResources with Configuration { requires(Cores(1), Memory("4g")) override def args: Seq[Any] = { val buffer = ListBuffer[Any]() - buffer.appendAll(jarArgs(this.gatkJar, jvmMemory=this.resources.memory)) + buffer.appendAll(jarArgs(this.gatkJar, jvmMemory = this.resources.memory)) buffer.append(this.walker) - buffer.append("-R", this.ref.toAbsolutePath.toString) + ref.foreach { r => buffer.append("-R", r.toAbsolutePath.toString) } intervals.foreach(il => buffer.append("-L", il.toAbsolutePath.toString)) bamCompression.foreach(c => buffer.append("--bam_compression", c)) addWalkerArgs(buffer) diff --git a/tasks/src/main/scala/dagr/tasks/gatk/IndexVariants.scala b/tasks/src/main/scala/dagr/tasks/gatk/IndexVariants.scala new file mode 100644 index 00000000..b8cc8934 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/IndexVariants.scala @@ -0,0 +1,39 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.gatk + +import dagr.tasks.DagrDef.{PathToFasta, PathToIntervals, PathToVcf} + +import scala.collection.mutable.ListBuffer + +/** + * Runs the GATK walker that normalizes variants and optionally splits multiallelic sites into multiple, biallelic ones + */ +class IndexVariants(val in: PathToVcf) + extends Gatk4Task(walker = "IndexFeatureFile") { + + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { + buffer.append("-I", in) + } +} diff --git a/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala b/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala index 6ffa28b0..a74d5284 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala @@ -30,12 +30,12 @@ import scala.collection.mutable.ListBuffer /** * Runs the GATK walker that normalizes variants and optionally splits multiallelic sites into multiple, biallelic ones */ -class LeftAlignAndTrimVariants(val in: PathToVcf, val out: PathToVcf, ref: PathToFasta, intervals: Option[PathToIntervals], splitMultiAlleic: Option[Boolean]) - extends Gatk4Task(walker = "LeftAlignAndTrimVariants", ref = ref, intervals = intervals) { +class LeftAlignAndTrimVariants(val in: PathToVcf, val out: PathToVcf, ref: PathToFasta, splitMultiAlleic: Option[Boolean]) + extends Gatk4Task(walker = "LeftAlignAndTrimVariants", ref = Some(ref)) { override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { buffer.append("-V", in) buffer.append("-O", out) - splitMultiAlleic foreach { _ => buffer.append("--split-multi-allelics")} + splitMultiAlleic foreach { _ => buffer.append("--split-multi-allelics") } } } diff --git a/tasks/src/main/scala/dagr/tasks/gatk/VariantsToTable.scala b/tasks/src/main/scala/dagr/tasks/gatk/VariantsToTable.scala new file mode 100644 index 00000000..f5e3169b --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/VariantsToTable.scala @@ -0,0 +1,47 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.gatk + +import java.nio.file.Path + +import dagr.tasks.DagrDef.{PathToFasta, PathToIntervals, PathToVcf} + +import scala.collection.mutable.ListBuffer + +/** + * Runs the GATK walker that normalizes variants and optionally splits multiallelic sites into multiple, biallelic ones + */ +class VariantsToTable(val in: PathToVcf, val out: Path, fields: Seq[String], intervals: Option[PathToIntervals]) + extends Gatk4Task(walker = "VariantsToTable", intervals = intervals) { + + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { + buffer.append("-V", in) + buffer.append("-O", out) + fields.foreach { f => { + buffer append "-F" + buffer append f + } + } + } +} diff --git a/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala b/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala index 424deee7..e4062441 100644 --- a/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala +++ b/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala @@ -63,8 +63,7 @@ class DWGSim(val vcf: PathToVcf, buffer ++= "-2" :: readLength :: Nil buffer ++= "-z" :: randomSeed :: Nil buffer ++= "-x" :: coverageTarget :: Nil - buffer ++= "-y" :: "1.0" :: Nil - buffer ++= fasta :: outPrefix :: Nil + buffer ++= fasta :: outPrefix :: Nil //these two arguments must be last buffer.toList } } From 2cc1c83562711f21ba0290e79aade482caddb105 Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Thu, 11 Feb 2021 22:53:53 -0500 Subject: [PATCH 07/13] - since there is no "call cacheing" I need to split this into several parts: -- simulate and generate bams -- downsample bam and contaminate bams -- downsample markers and estimate contamination --- .../ContaminateAndEvaluateVBID.scala | 35 ++++++++++++++----- 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala index 210c4512..1b619ff1 100644 --- a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala +++ b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala @@ -78,21 +78,21 @@ class ContaminateAndEvaluateVBID val rando = new CopyPS_FromBedToVcf(in = vcf, out = out.resolve(prefix + vcf.getFileName + ".with.pl.vcf"), dpHeader = dpHeader, pathToBed = makeBed.out) - val normalize = new LeftAlignAndTrimVariants(in = rando.out, out = out.resolve(prefix + vcf.getFileName + ".normalized.vcf"), ref = ref, splitMultiAlleic = Some(true)) + val subsetToPL = new subsetToPL(in=rando.out, out=out.resolve(prefix + vcf.getFileName + ".subsetToPL.vcf")) + val normalize = new LeftAlignAndTrimVariants(in = subsetToPL.out, out = out.resolve(prefix + vcf.getFileName + ".normalized.vcf"), ref = ref, splitMultiAlleic = Some(true)) val index = new IndexVariants(in = normalize.out) val simulate = new DWGSim(vcf = normalize.out, fasta = ref, outPrefix = out.resolve(prefix + vcf.getFileName + ".sim"), depth = coverage, coverageTarget = targets) - val bwa = new BwaMem(fastq = simulate.outputPairedFastq, ref = ref, maxThreads = 1, memory = Memory("2G")) - val tmpBam = out.resolve(prefix + vcf.getFileName + ".tmp.bam") - val sort = new SortSam(in = Io.StdIn, out = tmpBam, sortOrder = SortOrder.coordinate) with Configuration { + val bwa = new BwaMem(fastq = simulate.outputPairedFastq, out=Some(out.resolve(prefix + vcf.getFileName + ".tmp.bam")) , + ref = ref, maxThreads = 1, memory = Memory("2G")) + val sort = new SortSam(in = bwa.out.get, out = out.resolve(prefix + vcf.getFileName + ".sorted.bam"), sortOrder = SortOrder.coordinate) with Configuration { requires(Cores(1), Memory("2G")) } - root ==> toTable ==> makeBed ==> rando ==> normalize ==> index ==> simulate ==> (bwa | sort) ==> makeBamsLoop + root ==> toTable ==> makeBed ==> rando ==> subsetToPL ==> + normalize ==> index ==> simulate ==> bwa ==> sort ==> makeBamsLoop - // ~/gatk/gatk SelectVariants -V test2HG003_GRCh38_1_22_v4.2_benchmark.vcf.gz.normalized.vcf -O test2.vcf --sites-only-vcf-output - - tmpBam + sort.out }) val countVariants = new CountVariants(vbidResource, out.resolve(prefix + ".vbid.count")) @@ -185,12 +185,29 @@ private class CopyPS_FromBedToVcf(val in: PathToVcf, "-a" :: pathToBed :: "-h" :: dpHeader :: "-c" :: "CHROM,FROM,TO,pl" :: - "-x" :: "FORMAT/PS" :: // remove the PS field since it's invalid + "-x" :: "FORMAT" :: // remove the FORMAT Field + "-x" :: "^INFO/pl" :: // no need for all these annotations anyway "--force" :: // needed since the input file is corrupt. "-o" :: out :: in :: Nil } + +// copies the value from the bed file to the vcf +// removed the PS field (since it's invalid) +private class subsetToPL(val in: PathToVcf, val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") + + override def args: Seq[Any] = bcftools :: "view" :: + "-i" :: "pl!=\".\"" :: + "-o" :: out :: + in :: Nil +} + + // copies the value from the bed file to the vcf private class MakePLBed(val table: Path, val out: Path From f48ab5cd21b22463248c25696de34e3d9049c018 Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Thu, 11 Feb 2021 23:22:09 -0500 Subject: [PATCH 08/13] WIP --- .../dagr/pipelines/ContaminateBams.scala | 108 +++++++++++++ .../dagr/pipelines/SimulateVariants.scala | 144 ++++++++++++++++++ 2 files changed, 252 insertions(+) create mode 100644 pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala create mode 100644 pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala diff --git a/pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala b/pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala new file mode 100644 index 00000000..12e6f7da --- /dev/null +++ b/pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala @@ -0,0 +1,108 @@ +/* + * The MIT License + * + * Copyright (c) 2020 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.pipelines + +import java.nio.file.{Files, Path} + +import dagr.commons.io.PathUtil +import dagr.core.cmdline.Pipelines +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory} +import dagr.core.tasksystem._ +import dagr.sopt.{arg, clp} +import dagr.tasks.DagrDef._ +import dagr.tasks.bwa.BwaMem +import dagr.tasks.gatk._ +import dagr.tasks.misc.{DWGSim, LinkFile, VerifyBamId} +import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam} +import htsjdk.samtools.SAMFileHeader.SortOrder + +import scala.collection.mutable + +/** + * Downsample Bams to create composite contaminated bams, + * then downsample to various depths + * and run VBID with different number of target snps to evaluate + * dependance of VBID on contamination & depth & number of target snps + */ +@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) +class ContaminateAndEvaluateVBID +(@arg(flag = "i", doc = "Input bams for generating bams.") val bams: Seq[PathToBam], +// @arg(flag = "r", doc = "Reference fasta.") val ref: PathToFasta, + @arg(flag = "o", doc = "Output directory.") val out: DirPath, +// @arg(flag = "t", doc = "Target regions.") val targets: PathToIntervals, + @arg(flag = "p", doc = "Output file prefix.") val prefix: String, + @arg(flag = "c", doc = "Contamination levels to evaluate.") val contaminations: Seq[Double], + @arg(flag = "d", doc = "Depth values to evaluate.") val depths: Seq[Integer], + +) extends Pipeline(Some(out)) { + + override def build(): Unit = { + val strat = Some(DownsamplingStrategy.HighAccuracy) + + Files.createDirectories(out) + val bamYield = new mutable.HashMap[PathToBam, Int] + + val pairsOfBam = for (x <- bams; y <- bams) yield (x, y) + + pairsOfBam.foreach { case (x, y) => { + contaminations.foreach(c => { + val dsX = out.resolve(prefix + x.getFileName + ".forCont." + c + ".tmp.bam") + val dsY = out.resolve(prefix + y.getFileName + ".forCont." + c + ".tmp.bam") + // create a mixture of c x + (1-c) y taking into account the depths of + // x and y + // D_x and D_y are the effective depths of x and y + // if we are to downsample x we need x to be downsampled to c/(1-c) + // if we are to downsample y we need y to be downsampled to (1-c)/c + // of of those two values will be less than 1. + val downsampleX = new DownsampleSam(in = x, out = dsX, proportion = c / (1 - c), strategy = strat).requires(Cores(1), Memory("2g")) + val downsampleY = new DownsampleSam(in = x, out = dsY, proportion = (1 - c) / c, strategy = strat).requires(Cores(1), Memory("2g")) + + val copyX = new LinkFile(x, dsX) + val copyY = new LinkFile(y, dsY) + val downsampleX_Q = c / (1 - c) <= 1 + val downsample = EitherTask.of( + downsampleX :: copyY :: Nil, + downsampleY :: copyX :: Nil, + downsampleX_Q) + + val xFactor = if (downsampleX_Q) c / (1 - c) else 1 + val yFactor = if (downsampleX_Q) 1 else (1 - c) / c + + val resultantDepth: Double = bamYield.getOrElse(x, 0) * xFactor + bamYield.getOrElse(y, 0) * yFactor + + val xyContaminatedBam = out.resolve(prefix + "__" + x.getFileName + "__" + y.getFileName + + "__" + c + ".bam") + val mergedownsampled = new MergeSamFiles(in = dsX :: dsY :: Nil, out = xyContaminatedBam, sortOrder = SortOrder.coordinate) + + root ==> downsample ==> mergedownsampled + + }) + } + } + } +} + + + diff --git a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala new file mode 100644 index 00000000..f9ad235d --- /dev/null +++ b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala @@ -0,0 +1,144 @@ +/* + * The MIT License + * + * Copyright (c) 2020 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.pipelines + +import java.nio.file.{Files, Path} + +import dagr.commons.io.PathUtil +import dagr.core.cmdline.Pipelines +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory} +import dagr.core.tasksystem._ +import dagr.sopt.{arg, clp} +import dagr.tasks.DagrDef._ +import dagr.tasks.bwa.BwaMem +import dagr.tasks.gatk._ +import dagr.tasks.misc.{DWGSim, LinkFile, VerifyBamId} +import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam} +import htsjdk.samtools.SAMFileHeader.SortOrder + +import scala.collection.mutable + +/** + * generate bams based on input vcfs + */ +@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) +class SimulateVariants +(@arg(flag = "i", doc = "Input vcfs for generating bams.") val vcfs: Seq[PathToVcf], + @arg(flag = "r", doc = "Reference fasta.") val ref: PathToFasta, + @arg(flag = "o", doc = "Output directory.") val out: DirPath, + @arg(flag = "t", doc = "Target regions.") val targets: PathToIntervals, + @arg(flag = "p", doc = "Output file prefix.") val prefix: String, + @arg(flag = "d", doc = "Depth value to simulate.") val depth: Integer, + @arg(flag = "H", doc = "header line for dp.") val dpHeader: Path + +) extends Pipeline(Some(out)) { + + var bams:Seq[PathToBam] = Nil + + override def build(): Unit = { + + Files.createDirectories(out) + val coverage = depth + + bams = vcfs.map(vcf => { + + val toTable = new VariantsToTable(in = vcf, out = out.resolve(prefix + vcf.getFileName + ".table"), fields = "CHROM" :: "POS" :: "HET" :: "HOM-VAR" :: Nil, intervals = Some(targets)) + val makeBed = new MakePLBed(table = toTable.out, out = out.resolve(prefix + "PL.bed")) + + val rando = new CopyPS_FromBedToVcf(in = vcf, out = out.resolve(prefix + vcf.getFileName + ".with.pl.vcf"), + dpHeader = dpHeader, pathToBed = makeBed.out) + + val subsetToPL = new subsetToPL(in=rando.out, out=out.resolve(prefix + vcf.getFileName + ".subsetToPL.vcf")) + val normalize = new LeftAlignAndTrimVariants(in = subsetToPL.out, out = out.resolve(prefix + vcf.getFileName + ".normalized.vcf"), ref = ref, splitMultiAlleic = Some(true)) + val index = new IndexVariants(in = normalize.out) + val simulate = new DWGSim(vcf = normalize.out, fasta = ref, outPrefix = out.resolve(prefix + vcf.getFileName + ".sim"), depth = coverage, coverageTarget = targets) + val bwa = new BwaMem(fastq = simulate.outputPairedFastq, out=Some(out.resolve(prefix + vcf.getFileName + ".tmp.bam")) , + ref = ref, maxThreads = 1, memory = Memory("2G")) + val sort = new SortSam(in = bwa.out.get, out = out.resolve(prefix + vcf.getFileName + ".sorted.bam"), sortOrder = SortOrder.coordinate) with Configuration { + requires(Cores(1), Memory("2G")) + } + + root ==> toTable ==> makeBed ==> rando ==> subsetToPL ==> + normalize ==> index ==> simulate ==> bwa ==> sort + + sort.out + }) + } +} + +// copies the value from the bed file to the vcf +// removed the PS field (since it's invalid) +private class CopyPS_FromBedToVcf(val in: PathToVcf, + val out: PathToVcf, + val pathToBed: Path, + val dpHeader: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") + + override def args: Seq[Any] = bcftools :: "annotate" :: + "-a" :: pathToBed :: + "-h" :: dpHeader :: + "-c" :: "CHROM,FROM,TO,pl" :: + "-x" :: "FORMAT" :: // remove the FORMAT Field + "-x" :: "^INFO/pl" :: // no need for all these annotations anyway + "--force" :: // needed since the input file is corrupt. + "-o" :: out :: + in :: Nil +} + + +// copies the value from the bed file to the vcf +// removed the PS field (since it's invalid) +private class subsetToPL(val in: PathToVcf, val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") + + override def args: Seq[Any] = bcftools :: "view" :: + "-i" :: "pl!=\".\"" :: + "-o" :: out :: + in :: Nil +} + + +// copies the value from the bed file to the vcf +private class MakePLBed(val table: Path, + val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val awk: Path = configureExecutable("awk.executable", "awk") + + quoteIfNecessary = false + + override def args: Seq[Any] = awk :: raw"""'BEGIN{OFS="\t"; } NR>1{print $$1,$$2-1, $$2, $$3+2*$$4}'""" :: + table.toAbsolutePath :: ">" :: out.toAbsolutePath :: Nil +} + + + From 4e3086b34e337bc76ea3ffd9448d10a6f85c084f Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Tue, 16 Feb 2021 21:59:24 -0500 Subject: [PATCH 09/13] WIP --- .../ContaminateAndEvaluateVBID.scala | 226 ------------------ .../dagr/pipelines/ContaminateBams.scala | 46 ++-- .../scala/dagr/pipelines/EvaluateVBID.scala | 93 +++++++ .../scala/dagr/pipelines/MapAndSort.scala | 82 +++++++ .../dagr/pipelines/SimulateVariants.scala | 126 +++++----- .../main/scala/dagr/tasks/bwa/BwaMem.scala | 2 +- .../scala/dagr/tasks/gatk/Gatk4Task.scala | 11 +- .../tasks/gatk/LeftAlignAndTrimVariants.scala | 6 +- .../dagr/tasks/gatk/VariantsToTable.scala | 7 +- .../main/scala/dagr/tasks/misc/GetYield.scala | 4 +- 10 files changed, 270 insertions(+), 333 deletions(-) delete mode 100644 pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala create mode 100644 pipelines/src/main/scala/dagr/pipelines/EvaluateVBID.scala create mode 100644 pipelines/src/main/scala/dagr/pipelines/MapAndSort.scala diff --git a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala b/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala deleted file mode 100644 index 1b619ff1..00000000 --- a/pipelines/src/main/scala/dagr/pipelines/ContaminateAndEvaluateVBID.scala +++ /dev/null @@ -1,226 +0,0 @@ -/* - * The MIT License - * - * Copyright (c) 2020 Fulcrum Genomics LLC - * - * Permission is hereby granted, free of charge, to any person obtaining a copy - * of this software and associated documentation files (the "Software"), to deal - * in the Software without restriction, including without limitation the rights - * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell - * copies of the Software, and to permit persons to whom the Software is - * furnished to do so, subject to the following conditions: - * - * The above copyright notice and this permission notice shall be included in - * all copies or substantial portions of the Software. - * - * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR - * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, - * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE - * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER - * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, - * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN - * THE SOFTWARE. - */ -package dagr.pipelines - -import java.nio.file.{Files, Path} - -import dagr.commons.io.{Io, PathUtil} -import dagr.core.cmdline.Pipelines -import dagr.core.config.Configuration -import dagr.core.execsystem.{Cores, Memory, ResourceSet} -import dagr.core.tasksystem.{EitherTask, FixedResources, Linker, NoOpInJvmTask, Pipeline, ProcessTask} -import dagr.sopt.{arg, clp} -import dagr.tasks.DagrDef._ -import dagr.tasks.bwa.BwaMem -import dagr.tasks.gatk.{CountVariants, DownsampleVariants, Gatk4Task, GatkTask, IndexVariants, LeftAlignAndTrimVariants, VariantsToTable} -import dagr.tasks.misc.{DWGSim, LinkFile, VerifyBamId} -import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam} -import htsjdk.samtools.SAMFileHeader.SortOrder - -import scala.collection.mutable -import scala.collection.mutable.ListBuffer - -/** - * Downsample Bams to create composite contaminated bams, - * then downsample to various depths - * and run VBID with different number of target snps to evaluate - * dependance of VBID on contamination & depth & number of target snps - */ -@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) -class ContaminateAndEvaluateVBID -(@arg(flag = "i", doc = "Input vcfs for generating bams.") val vcfs: Seq[PathToVcf], - @arg(flag = "r", doc = "Reference fasta.") val ref: PathToFasta, - @arg(flag = "o", doc = "Output directory.") val out: DirPath, - @arg(flag = "t", doc = "Target regions.") val targets: PathToIntervals, - @arg(flag = "p", doc = "Output file prefix.") val prefix: String, - @arg(flag = "c", doc = "Contamination levels to evaluate.") val contaminations: Seq[Double], - @arg(flag = "d", doc = "Depth values to evaluate.") val depths: Seq[Integer], - @arg(flag = "m", doc = "Number of markers to use.") val markers: Seq[Integer], - @arg(flag = "R", doc = "VBID vcf resource.") val vbidResource: PathToVcf, - @arg(flag = "H", doc = "header line for dp.") val dpHeader: Path - -) extends Pipeline(Some(out)) { - - override def build(): Unit = { - val strat = Some(DownsamplingStrategy.HighAccuracy) - - Files.createDirectories(out) - val bamYield = new mutable.HashMap[PathToBam, Int] - val coverage = 100 - val makeBamsLoop = new NoOpInJvmTask("made bams") - - val bams: Seq[PathToBam] = vcfs.map(vcf => { - - val toTable = new VariantsToTable(in = vcf, out = out.resolve(prefix + vcf.getFileName + ".table"), fields = "CHROM" :: "POS" :: "HET" :: "HOM-VAR" :: Nil, intervals = Some(targets)) - val makeBed = new MakePLBed(table = toTable.out, out = out.resolve(prefix + "PL.bed")) - - val rando = new CopyPS_FromBedToVcf(in = vcf, out = out.resolve(prefix + vcf.getFileName + ".with.pl.vcf"), - dpHeader = dpHeader, pathToBed = makeBed.out) - - val subsetToPL = new subsetToPL(in=rando.out, out=out.resolve(prefix + vcf.getFileName + ".subsetToPL.vcf")) - val normalize = new LeftAlignAndTrimVariants(in = subsetToPL.out, out = out.resolve(prefix + vcf.getFileName + ".normalized.vcf"), ref = ref, splitMultiAlleic = Some(true)) - val index = new IndexVariants(in = normalize.out) - val simulate = new DWGSim(vcf = normalize.out, fasta = ref, outPrefix = out.resolve(prefix + vcf.getFileName + ".sim"), depth = coverage, coverageTarget = targets) - val bwa = new BwaMem(fastq = simulate.outputPairedFastq, out=Some(out.resolve(prefix + vcf.getFileName + ".tmp.bam")) , - ref = ref, maxThreads = 1, memory = Memory("2G")) - val sort = new SortSam(in = bwa.out.get, out = out.resolve(prefix + vcf.getFileName + ".sorted.bam"), sortOrder = SortOrder.coordinate) with Configuration { - requires(Cores(1), Memory("2G")) - } - - root ==> toTable ==> makeBed ==> rando ==> subsetToPL ==> - normalize ==> index ==> simulate ==> bwa ==> sort ==> makeBamsLoop - - - sort.out - }) - - val countVariants = new CountVariants(vbidResource, out.resolve(prefix + ".vbid.count")) - root ==> countVariants - - val pairsOfBam = for (x <- bams; y <- bams) yield (x, y) - - pairsOfBam.foreach { case (x, y) => { - contaminations.foreach(c => { - val dsX = out.resolve(prefix + x.getFileName + ".forCont." + c + ".tmp.bam") - val dsY = out.resolve(prefix + y.getFileName + ".forCont." + c + ".tmp.bam") - // create a mixture of c x + (1-c) y taking into account the depths of - // x and y - // D_x and D_y are the effective depths of x and y - // if we are to downsample x we need x to be downsampled to c/(1-c) - // if we are to downsample y we need y to be downsampled to (1-c)/c - // of of those two values will be less than 1. - val downsampleX = new DownsampleSam(in = x, out = dsX, proportion = c / (1 - c), strategy = strat).requires(Cores(1), Memory("2g")) - val downsampleY = new DownsampleSam(in = x, out = dsY, proportion = (1 - c) / c, strategy = strat).requires(Cores(1), Memory("2g")) - - val copyX = new LinkFile(x, dsX) - val copyY = new LinkFile(y, dsY) - val downsampleX_Q = c / (1 - c) <= 1 - val downsample = EitherTask.of( - downsampleX :: copyY :: Nil, - downsampleY :: copyX :: Nil, - downsampleX_Q) - - val xFactor = if (downsampleX_Q) c / (1 - c) else 1 - val yFactor = if (downsampleX_Q) 1 else (1 - c) / c - - val resultantDepth: Double = bamYield.getOrElse(x, 0) * xFactor + bamYield.getOrElse(y, 0) * yFactor - - val xyContaminatedBam = out.resolve(prefix + "__" + x.getFileName + "__" + y.getFileName + - "__" + c + ".bam") - val mergedownsampled = new MergeSamFiles(in = dsX :: dsY :: Nil, out = xyContaminatedBam, sortOrder = SortOrder.coordinate) - - makeBamsLoop ==> downsample ==> mergedownsampled - - depths filter (d => d <= resultantDepth) foreach { d => { - val outDownsampled = out.resolve(prefix + "__" + x.getFileName + "__" + y.getFileName + - "__" + c + "__target__" + d + ".bam") - val downsampleMerged = new DownsampleSam(in = xyContaminatedBam, out = outDownsampled, proportion = d / resultantDepth) - - mergedownsampled ==> downsampleMerged - - markers.foreach { markerCount => { - // subset VBID resource files to smaller number of markers - - // this need to happen after the tool is run...need to figure out how to do that - val outDownsampledResource = out.resolve(prefix + "__" + vbidResource.getFileName + - "__" + markerCount + ".vcf") - - val downsample = new DownsampleVariants( - in = vbidResource, - output = outDownsampledResource, - proportion = 1) - - val vbid = new VerifyBamId( - vcf = outDownsampledResource, - bam = xyContaminatedBam, - out = out.resolve(PathUtil.replaceExtension(xyContaminatedBam, s".${markerCount}_markers.selfSM")) - ) - - (downsampleMerged :: countVariants) ==> - Linker(countVariants, downsample)((f, c) => c.downsampleProportion = markerCount / f.count) ==> - vbid - } - } - } - } - }) - } - } - } -} - -// copies the value from the bed file to the vcf -// removed the PS field (since it's invalid) -private class CopyPS_FromBedToVcf(val in: PathToVcf, - val out: PathToVcf, - val pathToBed: Path, - val dpHeader: Path - ) extends ProcessTask with FixedResources with Configuration { - requires(Cores(1), Memory("1G")) - - private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") - - override def args: Seq[Any] = bcftools :: "annotate" :: - "-a" :: pathToBed :: - "-h" :: dpHeader :: - "-c" :: "CHROM,FROM,TO,pl" :: - "-x" :: "FORMAT" :: // remove the FORMAT Field - "-x" :: "^INFO/pl" :: // no need for all these annotations anyway - "--force" :: // needed since the input file is corrupt. - "-o" :: out :: - in :: Nil -} - - -// copies the value from the bed file to the vcf -// removed the PS field (since it's invalid) -private class subsetToPL(val in: PathToVcf, val out: Path - ) extends ProcessTask with FixedResources with Configuration { - requires(Cores(1), Memory("1G")) - - private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") - - override def args: Seq[Any] = bcftools :: "view" :: - "-i" :: "pl!=\".\"" :: - "-o" :: out :: - in :: Nil -} - - -// copies the value from the bed file to the vcf -private class MakePLBed(val table: Path, - val out: Path - ) extends ProcessTask with FixedResources with Configuration { - requires(Cores(1), Memory("1G")) - - private val awk: Path = configureExecutable("awk.executable", "awk") - - quoteIfNecessary = false - - override def args: Seq[Any] = awk :: raw"""'BEGIN{OFS="\t"; } NR>1{print $$1,$$2-1, $$2, $$3+2*$$4}'""" :: - table.toAbsolutePath :: ">" :: out.toAbsolutePath :: Nil -} - - - diff --git a/pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala b/pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala index 12e6f7da..82b72fbc 100644 --- a/pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala +++ b/pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala @@ -1,7 +1,7 @@ /* * The MIT License * - * Copyright (c) 2020 Fulcrum Genomics LLC + * Copyright (c) 2021 Fulcrum Genomics LLC * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -23,46 +23,33 @@ */ package dagr.pipelines -import java.nio.file.{Files, Path} +import java.nio.file.Files -import dagr.commons.io.PathUtil +import com.fulcrumgenomics.sopt.{arg, clp} import dagr.core.cmdline.Pipelines -import dagr.core.config.Configuration import dagr.core.execsystem.{Cores, Memory} import dagr.core.tasksystem._ -import dagr.sopt.{arg, clp} import dagr.tasks.DagrDef._ -import dagr.tasks.bwa.BwaMem -import dagr.tasks.gatk._ -import dagr.tasks.misc.{DWGSim, LinkFile, VerifyBamId} -import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam} +import dagr.tasks.misc.LinkFile +import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles} import htsjdk.samtools.SAMFileHeader.SortOrder -import scala.collection.mutable - /** - * Downsample Bams to create composite contaminated bams, - * then downsample to various depths - * and run VBID with different number of target snps to evaluate - * dependance of VBID on contamination & depth & number of target snps - */ -@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) -class ContaminateAndEvaluateVBID -(@arg(flag = "i", doc = "Input bams for generating bams.") val bams: Seq[PathToBam], -// @arg(flag = "r", doc = "Reference fasta.") val ref: PathToFasta, - @arg(flag = "o", doc = "Output directory.") val out: DirPath, -// @arg(flag = "t", doc = "Target regions.") val targets: PathToIntervals, - @arg(flag = "p", doc = "Output file prefix.") val prefix: String, - @arg(flag = "c", doc = "Contamination levels to evaluate.") val contaminations: Seq[Double], - @arg(flag = "d", doc = "Depth values to evaluate.") val depths: Seq[Integer], + */ +@clp(description = "pipeline to contaminate bams.", group = classOf[Pipelines]) +class ContaminateBams( + @arg(flag = 'i', doc = "Input bams for generating bams.") val bams: Seq[PathToBam], + @arg(flag = 'o', doc = "Output directory.") val out: DirPath, + @arg(flag = 'p', doc = "Output file prefix.") val prefix: String, + @arg(flag = 'c', doc = "Contamination levels to evaluate.") val contaminations: Seq[Double], + @arg(flag = 'd', doc = "Depth values to evaluate.") val depths: Seq[Integer], -) extends Pipeline(Some(out)) { + ) extends Pipeline(Some(out)) { override def build(): Unit = { val strat = Some(DownsamplingStrategy.HighAccuracy) Files.createDirectories(out) - val bamYield = new mutable.HashMap[PathToBam, Int] val pairsOfBam = for (x <- bams; y <- bams) yield (x, y) @@ -87,11 +74,6 @@ class ContaminateAndEvaluateVBID downsampleY :: copyX :: Nil, downsampleX_Q) - val xFactor = if (downsampleX_Q) c / (1 - c) else 1 - val yFactor = if (downsampleX_Q) 1 else (1 - c) / c - - val resultantDepth: Double = bamYield.getOrElse(x, 0) * xFactor + bamYield.getOrElse(y, 0) * yFactor - val xyContaminatedBam = out.resolve(prefix + "__" + x.getFileName + "__" + y.getFileName + "__" + c + ".bam") val mergedownsampled = new MergeSamFiles(in = dsX :: dsY :: Nil, out = xyContaminatedBam, sortOrder = SortOrder.coordinate) diff --git a/pipelines/src/main/scala/dagr/pipelines/EvaluateVBID.scala b/pipelines/src/main/scala/dagr/pipelines/EvaluateVBID.scala new file mode 100644 index 00000000..5dcf2089 --- /dev/null +++ b/pipelines/src/main/scala/dagr/pipelines/EvaluateVBID.scala @@ -0,0 +1,93 @@ +/* + * The MIT License + * + * Copyright (c) 2020 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.pipelines + +import java.nio.file.{Files, Path} + +import com.fulcrumgenomics.commons.io.PathUtil +import com.fulcrumgenomics.sopt.{arg, clp} +import dagr.core.cmdline.Pipelines +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory, ResourceSet} +import dagr.core.tasksystem.{EitherTask, FixedResources, Linker, NoOpInJvmTask, Pipeline, ProcessTask} +import dagr.tasks.DagrDef._ +import dagr.tasks.bwa.BwaMem +import dagr.tasks.gatk.{CountVariants, DownsampleVariants, Gatk4Task, GatkTask, IndexVariants, LeftAlignAndTrimVariants, VariantsToTable} +import dagr.tasks.misc.{DWGSim, LinkFile, VerifyBamId} +import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam} +import htsjdk.samtools.SAMFileHeader.SortOrder + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +/** + * Downsample Bams to create composite contaminated bams, + * then downsample to various depths + * and run VBID with different number of target snps to evaluate + * dependance of VBID on contamination & depth & number of target snps + */ +@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) +class EvaluateVBID +(@arg(flag = 'i', doc = "Input bams to evaluate.") val bams: Seq[PathToBam], + @arg(flag = 'o', doc = "Output directory.") val out: DirPath, + @arg(flag = 'p', doc = "Output file prefix.") val prefix: String, + @arg(flag = 'm', doc = "Number of markers to use.") val markers: Seq[Integer], + @arg(flag = 'R', doc = "VBID vcf resource.") val vbidResource: PathToVcf, + +) extends Pipeline(Some(out)) { + + override def build(): Unit = { + + val countVariants = new CountVariants(vbidResource, out.resolve(prefix + ".vbid.count")) + root ==> countVariants + + bams.foreach(bam => { + markers.foreach { markerCount => { + // subset VBID resource files to smaller number of markers + + // this need to happen after the tool is run...need to figure out how to do that + val outDownsampledResource = out.resolve(prefix + "__" + vbidResource.getFileName + + "__" + markerCount + ".vcf") + + val downsample = new DownsampleVariants( + in = vbidResource, + output = outDownsampledResource, + proportion = 1) + + val vbid = new VerifyBamId( + vcf = outDownsampledResource, + bam = bam, + out = out.resolve(PathUtil.replaceExtension(bam, s".${markerCount}_markers.selfSM")) + ) + + countVariants ==> + Linker(countVariants, downsample)((f, c) => c.downsampleProportion = markerCount / f.count) ==> + vbid + } + } + + }) + + } +} diff --git a/pipelines/src/main/scala/dagr/pipelines/MapAndSort.scala b/pipelines/src/main/scala/dagr/pipelines/MapAndSort.scala new file mode 100644 index 00000000..aefd09b7 --- /dev/null +++ b/pipelines/src/main/scala/dagr/pipelines/MapAndSort.scala @@ -0,0 +1,82 @@ +/* + * The MIT License + * + * Copyright (c) 2020 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.pipelines + +import java.nio.file.Files + +import com.fulcrumgenomics.sopt.{arg, clp} +import dagr.core.cmdline.Pipelines +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory} +import dagr.core.tasksystem._ +import dagr.tasks.DagrDef._ +import dagr.tasks.bwa.BwaMem +import dagr.tasks.picard.SortSam +import htsjdk.samtools.SAMFileHeader.SortOrder + +/** + * generate bams based on input vcfs + */ +@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) +class MapAndSort +(@arg(flag = 'i', doc = "Input fastq for mapping.") val fastqs: Seq[PathToFastq], + @arg(flag = 'r', doc = "Reference fasta.") val ref: PathToFasta, + @arg(flag = 'o', doc = "Output directory.") val out: DirPath, + @arg(flag = 'p', doc = "Output file prefix.") val prefix: String, + +) extends Pipeline(Some(out)) { + + override def build(): Unit = { + + Files.createDirectories(out) + + fastqs.foreach(f => { + + val bwa = new myBwaMem(fastq = f, out = Some(out.resolve(prefix + f.getFileName + ".tmp.sam")), ref = ref) + val sort = new mySortSam(inn = bwa.out.get, outt = out.resolve(prefix + f.getFileName + ".sorted.bam")) + + root ==> bwa ==> sort + + }) + } + + + private class mySortSam(val inn: PathToBam, val outt: PathToBam) extends + SortSam(in = inn, outt, SortOrder.coordinate) with + Configuration { + requires(Cores(1), Memory("2G")) + } + + private class myBwaMem(val fastq: PathToFastq, + val out: Option[PathToBam], + val ref: PathToFasta) extends + BwaMem( + fastq = fastq, + out = out, + ref = ref, + maxThreads = 2, + memory = Memory("4G")) + + +} \ No newline at end of file diff --git a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala index f9ad235d..6d943da1 100644 --- a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala +++ b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala @@ -25,37 +25,34 @@ package dagr.pipelines import java.nio.file.{Files, Path} -import dagr.commons.io.PathUtil +import com.fulcrumgenomics.sopt.{arg, clp} import dagr.core.cmdline.Pipelines import dagr.core.config.Configuration import dagr.core.execsystem.{Cores, Memory} import dagr.core.tasksystem._ -import dagr.sopt.{arg, clp} import dagr.tasks.DagrDef._ import dagr.tasks.bwa.BwaMem import dagr.tasks.gatk._ -import dagr.tasks.misc.{DWGSim, LinkFile, VerifyBamId} -import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam} +import dagr.tasks.misc.DWGSim +import dagr.tasks.picard.SortSam import htsjdk.samtools.SAMFileHeader.SortOrder -import scala.collection.mutable - /** * generate bams based on input vcfs */ @clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) class SimulateVariants -(@arg(flag = "i", doc = "Input vcfs for generating bams.") val vcfs: Seq[PathToVcf], - @arg(flag = "r", doc = "Reference fasta.") val ref: PathToFasta, - @arg(flag = "o", doc = "Output directory.") val out: DirPath, - @arg(flag = "t", doc = "Target regions.") val targets: PathToIntervals, - @arg(flag = "p", doc = "Output file prefix.") val prefix: String, - @arg(flag = "d", doc = "Depth value to simulate.") val depth: Integer, - @arg(flag = "H", doc = "header line for dp.") val dpHeader: Path +(@arg(flag = 'i', doc = "Input vcfs for generating bams.") val vcfs: Seq[PathToVcf], + @arg(flag = 'r', doc = "Reference fasta.") val ref: PathToFasta, + @arg(flag = 'o', doc = "Output directory.") val out: DirPath, + @arg(flag = 't', doc = "Target regions.") val targets: PathToIntervals, + @arg(flag = 'p', doc = "Output file prefix.") val prefix: String, + @arg(flag = 'd', doc = "Depth value to simulate.") val depth: Integer, + @arg(flag = 'H', doc = "header line for dp.") val dpHeader: Path ) extends Pipeline(Some(out)) { - var bams:Seq[PathToBam] = Nil + var bams: Seq[PathToBam] = Nil override def build(): Unit = { @@ -70,12 +67,11 @@ class SimulateVariants val rando = new CopyPS_FromBedToVcf(in = vcf, out = out.resolve(prefix + vcf.getFileName + ".with.pl.vcf"), dpHeader = dpHeader, pathToBed = makeBed.out) - val subsetToPL = new subsetToPL(in=rando.out, out=out.resolve(prefix + vcf.getFileName + ".subsetToPL.vcf")) + val subsetToPL = new subsetToPL(in = rando.out, out = out.resolve(prefix + vcf.getFileName + ".subsetToPL.vcf")) val normalize = new LeftAlignAndTrimVariants(in = subsetToPL.out, out = out.resolve(prefix + vcf.getFileName + ".normalized.vcf"), ref = ref, splitMultiAlleic = Some(true)) val index = new IndexVariants(in = normalize.out) val simulate = new DWGSim(vcf = normalize.out, fasta = ref, outPrefix = out.resolve(prefix + vcf.getFileName + ".sim"), depth = coverage, coverageTarget = targets) - val bwa = new BwaMem(fastq = simulate.outputPairedFastq, out=Some(out.resolve(prefix + vcf.getFileName + ".tmp.bam")) , - ref = ref, maxThreads = 1, memory = Memory("2G")) + val bwa = new myBwaMem(fastq = simulate.outputPairedFastq, out = Some(out.resolve(prefix + vcf.getFileName + ".tmp.sam")), ref = ref) val sort = new SortSam(in = bwa.out.get, out = out.resolve(prefix + vcf.getFileName + ".sorted.bam"), sortOrder = SortOrder.coordinate) with Configuration { requires(Cores(1), Memory("2G")) } @@ -86,59 +82,69 @@ class SimulateVariants sort.out }) } -} -// copies the value from the bed file to the vcf -// removed the PS field (since it's invalid) -private class CopyPS_FromBedToVcf(val in: PathToVcf, - val out: PathToVcf, - val pathToBed: Path, - val dpHeader: Path - ) extends ProcessTask with FixedResources with Configuration { - requires(Cores(1), Memory("1G")) - - private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") - - override def args: Seq[Any] = bcftools :: "annotate" :: - "-a" :: pathToBed :: - "-h" :: dpHeader :: - "-c" :: "CHROM,FROM,TO,pl" :: - "-x" :: "FORMAT" :: // remove the FORMAT Field - "-x" :: "^INFO/pl" :: // no need for all these annotations anyway - "--force" :: // needed since the input file is corrupt. - "-o" :: out :: - in :: Nil -} + // copies the value from the bed file to the vcf + // removed the PS field (since it's invalid) + private class CopyPS_FromBedToVcf(val in: PathToVcf, + val out: PathToVcf, + val pathToBed: Path, + val dpHeader: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") + + override def args: Seq[Any] = bcftools :: "annotate" :: + "-a" :: pathToBed :: + "-h" :: dpHeader :: + "-c" :: "CHROM,FROM,TO,pl" :: + "-x" :: "FORMAT" :: // remove the FORMAT Field + "-x" :: "^INFO/pl" :: // no need for all these annotations anyway + "--force" :: // needed since the input file is corrupt. + "-o" :: out :: + in :: Nil + } -// copies the value from the bed file to the vcf -// removed the PS field (since it's invalid) -private class subsetToPL(val in: PathToVcf, val out: Path - ) extends ProcessTask with FixedResources with Configuration { - requires(Cores(1), Memory("1G")) - private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") + // copies the value from the bed file to the vcf + // removed the PS field (since it's invalid) + private class subsetToPL(val in: PathToVcf, val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) - override def args: Seq[Any] = bcftools :: "view" :: - "-i" :: "pl!=\".\"" :: - "-o" :: out :: - in :: Nil -} + private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") + override def args: Seq[Any] = bcftools :: "view" :: + "-i" :: "pl!=\".\"" :: + "-o" :: out :: + in :: Nil + } -// copies the value from the bed file to the vcf -private class MakePLBed(val table: Path, - val out: Path - ) extends ProcessTask with FixedResources with Configuration { - requires(Cores(1), Memory("1G")) - private val awk: Path = configureExecutable("awk.executable", "awk") + // copies the value from the bed file to the vcf + private class MakePLBed(val table: Path, + val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) - quoteIfNecessary = false + private val awk: Path = configureExecutable("awk.executable", "awk") - override def args: Seq[Any] = awk :: raw"""'BEGIN{OFS="\t"; } NR>1{print $$1,$$2-1, $$2, $$3+2*$$4}'""" :: - table.toAbsolutePath :: ">" :: out.toAbsolutePath :: Nil -} + quoteIfNecessary = false + override def args: Seq[Any] = awk :: raw"""'BEGIN{OFS="\t"; } NR>1{print $$1,$$2-1, $$2, $$3+2*$$4}'""" :: + table.toAbsolutePath :: ">" :: out.toAbsolutePath :: Nil + } + private class myBwaMem(val fastq: PathToFastq, + val out: Option[PathToBam], + val ref: PathToFasta) extends + BwaMem( + fastq = fastq, + out = out, + ref = ref, + maxThreads = 2, + memory = Memory("4G")) + +} diff --git a/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala b/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala index c8322431..0dbf7a9c 100644 --- a/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala +++ b/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala @@ -72,7 +72,7 @@ class BwaMem(fastq: PathToFastq = Io.StdIn, basesPerBatch.foreach(n => buffer.append("-K", n)) buffer.append(ref, fastq) - out.foreach(f => buffer.append("> " + f)) + out.foreach(f => buffer.addOne(">").addOne(f)) buffer.toList } diff --git a/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala b/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala index cec96ff5..0ee7d572 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala @@ -52,12 +52,13 @@ abstract class Gatk4Task(val walker: String, override def args: Seq[Any] = { val buffer = ListBuffer[Any]() buffer.appendAll(jarArgs(this.gatkJar, jvmMemory = this.resources.memory)) - buffer.append(this.walker) - ref.foreach { r => buffer.append("-R", r.toAbsolutePath.toString) } - intervals.foreach(il => buffer.append("-L", il.toAbsolutePath.toString)) - bamCompression.foreach(c => buffer.append("--bam_compression", c)) + buffer.addOne(this.walker) + ref.foreach { r => buffer.addOne("-R").addOne(r.toAbsolutePath.toString) } + intervals.foreach(il => buffer.addOne("-L").addOne(il.toAbsolutePath.toString)) + bamCompression.foreach(c => buffer.addOne("--bam_compression").addOne(c)) addWalkerArgs(buffer) - buffer + + buffer.toSeq } /** Can be overridden to use a specific GATK jar. */ diff --git a/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala b/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala index a74d5284..6c47c6bd 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala @@ -34,8 +34,8 @@ class LeftAlignAndTrimVariants(val in: PathToVcf, val out: PathToVcf, ref: PathT extends Gatk4Task(walker = "LeftAlignAndTrimVariants", ref = Some(ref)) { override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { - buffer.append("-V", in) - buffer.append("-O", out) - splitMultiAlleic foreach { _ => buffer.append("--split-multi-allelics") } + buffer.addOne("-V").addOne(in) + buffer.addOne("-O").addOne(out) + splitMultiAlleic foreach { _ => buffer.addOne("--split-multi-allelics") } } } diff --git a/tasks/src/main/scala/dagr/tasks/gatk/VariantsToTable.scala b/tasks/src/main/scala/dagr/tasks/gatk/VariantsToTable.scala index f5e3169b..961f83c3 100644 --- a/tasks/src/main/scala/dagr/tasks/gatk/VariantsToTable.scala +++ b/tasks/src/main/scala/dagr/tasks/gatk/VariantsToTable.scala @@ -36,11 +36,10 @@ class VariantsToTable(val in: PathToVcf, val out: Path, fields: Seq[String], int extends Gatk4Task(walker = "VariantsToTable", intervals = intervals) { override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { - buffer.append("-V", in) - buffer.append("-O", out) + buffer.addOne("-V").addOne(in) + buffer.addOne("-O").addOne(out) fields.foreach { f => { - buffer append "-F" - buffer append f + buffer addOne "-F" addOne f } } } diff --git a/tasks/src/main/scala/dagr/tasks/misc/GetYield.scala b/tasks/src/main/scala/dagr/tasks/misc/GetYield.scala index c9b62153..c932a8e8 100644 --- a/tasks/src/main/scala/dagr/tasks/misc/GetYield.scala +++ b/tasks/src/main/scala/dagr/tasks/misc/GetYield.scala @@ -1,7 +1,7 @@ /* * The MIT License * - * Copyright (c) 2015 Fulcrum Genomics LLC + * Copyright (c) 2021 Fulcrum Genomics LLC * * Permission is hereby granted, free of charge, to any person obtaining a copy * of this software and associated documentation files (the "Software"), to deal @@ -32,7 +32,7 @@ import picard.analysis.CollectQualityYieldMetrics.QualityYieldMetrics /** In JVM Task that pulls the total number of reads from the quality yield metrics file. */ class GetYield(qualityYieldMetrics: Path) extends SimpleInJvmTask { this.name = "GetYield" - var count: Integer = 0 + var count: Long = 0 override def run(): Unit = { val metrics = MetricsFile.readBeans(qualityYieldMetrics.toFile).get(0).asInstanceOf[QualityYieldMetrics] From f6d545ac420ea2613f25ce02eb42d62b4adfe9d1 Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Tue, 23 Mar 2021 23:05:33 -0400 Subject: [PATCH 10/13] fixed pipeline by scattering dwgsim over contigs --- .../dagr/pipelines/SimulateVariants.scala | 79 +++++++++++++++++-- 1 file changed, 72 insertions(+), 7 deletions(-) diff --git a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala index 6d943da1..22539481 100644 --- a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala +++ b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala @@ -25,12 +25,13 @@ package dagr.pipelines import java.nio.file.{Files, Path} +import com.fulcrumgenomics.commons.io.PathUtil import com.fulcrumgenomics.sopt.{arg, clp} import dagr.core.cmdline.Pipelines import dagr.core.config.Configuration import dagr.core.execsystem.{Cores, Memory} import dagr.core.tasksystem._ -import dagr.tasks.DagrDef._ +import dagr.tasks.DagrDef.{PathToIntervals, _} import dagr.tasks.bwa.BwaMem import dagr.tasks.gatk._ import dagr.tasks.misc.DWGSim @@ -70,15 +71,37 @@ class SimulateVariants val subsetToPL = new subsetToPL(in = rando.out, out = out.resolve(prefix + vcf.getFileName + ".subsetToPL.vcf")) val normalize = new LeftAlignAndTrimVariants(in = subsetToPL.out, out = out.resolve(prefix + vcf.getFileName + ".normalized.vcf"), ref = ref, splitMultiAlleic = Some(true)) val index = new IndexVariants(in = normalize.out) - val simulate = new DWGSim(vcf = normalize.out, fasta = ref, outPrefix = out.resolve(prefix + vcf.getFileName + ".sim"), depth = coverage, coverageTarget = targets) - val bwa = new myBwaMem(fastq = simulate.outputPairedFastq, out = Some(out.resolve(prefix + vcf.getFileName + ".tmp.sam")), ref = ref) + + + root ==> toTable ==> makeBed ==> rando ==> subsetToPL ==> normalize ==> index + + val simulate:Seq[DWGSim] = Range(0, 22).map {chr => { + val subsetBed = new Grep(in = targets, what = s"'^chr${chr + 1}\t'", out = out.resolve(prefix + PathUtil.removeExtension(targets.getFileName) + s".chr${chr + 1}.bed")) + root ==> subsetBed + + val sim = new DWGSim(vcf = normalize.out, + fasta = ref, + outPrefix = out.resolve(prefix + vcf.getFileName + s".chr${chr + 1}.sim"), + depth = coverage, + coverageTarget = subsetBed.out) + + (index::subsetBed) ==> sim + + sim + }} + + val mergeFastas = new Concatenate(ins = simulate map { fastq => out.resolve(fastq.outputPairedFastq) }, + out = out.resolve(prefix + vcf.getFileName + ".sim.bfast.fastq")) + + val bwa = new myBwaMem(fastq = mergeFastas.out, out = Some(out.resolve(prefix + vcf.getFileName + ".tmp.sam")), ref = ref) + + simulate.map(sim => sim ==> mergeFastas) + val sort = new SortSam(in = bwa.out.get, out = out.resolve(prefix + vcf.getFileName + ".sorted.bam"), sortOrder = SortOrder.coordinate) with Configuration { requires(Cores(1), Memory("2G")) } - root ==> toTable ==> makeBed ==> rando ==> subsetToPL ==> - normalize ==> index ==> simulate ==> bwa ==> sort - + mergeFastas ==> bwa ==> sort sort.out }) } @@ -122,6 +145,46 @@ class SimulateVariants } + // copies the value from the bed file to the vcf + private class Grep(val in: Path, + val what: String, + val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val grep: Path = configureExecutable("grep.executable", "grep") + + quoteIfNecessary = false + + override def args: Seq[Any] = grep :: + what :: + in.toAbsolutePath :: + ">" :: + out.toAbsolutePath :: + "||" :: + "touch" :: + out.toAbsolutePath :: + Nil + } + + // copies the value from the bed file to the vcf + private class Concatenate(val ins: Seq[Path], + val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + quoteIfNecessary = false + + private val cat: Path = configureExecutable("cat.executable", "cat") + + override def args: Seq[Any] = cat :: + ins.map{in => in.toAbsolutePath} :: + ">" :: + out.toAbsolutePath :: + Nil + } + + // copies the value from the bed file to the vcf private class MakePLBed(val table: Path, val out: Path @@ -145,6 +208,8 @@ class SimulateVariants out = out, ref = ref, maxThreads = 2, - memory = Memory("4G")) + memory = Memory("4G")) { + quoteIfNecessary = false + } } From b52eba7a032aa482f0db8dafef37cde052054e5c Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Tue, 23 Mar 2021 23:37:14 -0400 Subject: [PATCH 11/13] fixed concat by useing mkString --- pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala index 22539481..90de227d 100644 --- a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala +++ b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala @@ -178,7 +178,7 @@ class SimulateVariants private val cat: Path = configureExecutable("cat.executable", "cat") override def args: Seq[Any] = cat :: - ins.map{in => in.toAbsolutePath} :: + ins.map{in => in.toAbsolutePath}.mkString :: ">" :: out.toAbsolutePath :: Nil From e34d7c028799638485e2ad1babbb25fb8924b93a Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Tue, 23 Mar 2021 23:38:20 -0400 Subject: [PATCH 12/13] fixed concat by useing mkString --- pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala index 90de227d..0c9f8acc 100644 --- a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala +++ b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala @@ -178,7 +178,7 @@ class SimulateVariants private val cat: Path = configureExecutable("cat.executable", "cat") override def args: Seq[Any] = cat :: - ins.map{in => in.toAbsolutePath}.mkString :: + ins.map{in => in.toAbsolutePath} mkString " " :: ">" :: out.toAbsolutePath :: Nil From 62b11962c08e903dd80a63945fb2251ecf929180 Mon Sep 17 00:00:00 2001 From: Yossi Farjoun Date: Sat, 27 Mar 2021 21:17:39 -0400 Subject: [PATCH 13/13] SimulateVariants pipeline seems to be working now. --- .../dagr/core/tasksystem/ProcessTask.scala | 30 +++-- .../dagr/core/tasksystem/Unquotable.scala | 5 + .../dagr/pipelines/SimulateVariants.scala | 120 ++++++++++++++---- .../main/scala/dagr/tasks/bwa/BwaMem.scala | 34 ++--- tasks/src/main/scala/dagr/tasks/package.scala | 1 + 5 files changed, 134 insertions(+), 56 deletions(-) create mode 100644 core/src/main/scala/dagr/core/tasksystem/Unquotable.scala diff --git a/core/src/main/scala/dagr/core/tasksystem/ProcessTask.scala b/core/src/main/scala/dagr/core/tasksystem/ProcessTask.scala index 6b188324..a79b60b6 100644 --- a/core/src/main/scala/dagr/core/tasksystem/ProcessTask.scala +++ b/core/src/main/scala/dagr/core/tasksystem/ProcessTask.scala @@ -29,24 +29,30 @@ import java.nio.file.Path import scala.sys.process._ object ProcessTask { - val SpecialCharacters : String = """ `"\#()!~&<>|;*?""" + '\t' + '$' + val SpecialCharacters: String = """ `"\#()!~&<>|;*?""" + '\t' + '$' private val SpecialsAndSingleQuote = SpecialCharacters + "'" private val Escape = """\""" /** Determines if the argument has any special characters in it, and if so quotes the whole string. */ - def quoteIfNecessary(arg: String): String = { - val hasSingleQuotes = arg.contains("'") - val hasSpecials = arg.exists(ch => SpecialCharacters.contains(ch)) + def quoteIfNecessary(anyArg: Any ): String = { + anyArg match { + case Unquotable(x) => x.toString + case _ => { + val arg = anyArg.toString - (hasSingleQuotes, hasSpecials) match { - case (false, false) => arg - case (true, false) => arg.replace("'", """\'""") // just escape the single quotes - case (false, true ) => "'" + arg + "'" // just single quote the whole string - case (true, true ) => arg.map(ch => if (SpecialsAndSingleQuote.contains(ch)) Escape + ch else ch).mkString("") + val hasSingleQuotes = arg.contains("'") + val hasSpecials = arg.exists(ch => SpecialCharacters.contains(ch)) + + (hasSingleQuotes, hasSpecials) match { + case (false, false) => arg + case (true, false) => arg.replace("'", """\'""") // just escape the single quotes + case (false, true) => "'" + arg + "'" // just single quote the whole string + case (true, true) => arg.map(ch => if (SpecialsAndSingleQuote.contains(ch)) Escape + ch else ch).mkString("") + } + } } } } - /** A task that can execute a set of commands in its own process, and does not generate any new tasks. */ trait ProcessTask extends UnitTask { @@ -64,8 +70,8 @@ trait ProcessTask extends UnitTask { * @return the command string. */ private[core] def commandLine: String = { - if (quoteIfNecessary) args.map(arg => ProcessTask.quoteIfNecessary(arg.toString)).mkString(" ") - else args.mkString(" ") + val mapped = if (quoteIfNecessary) args.map(arg => ProcessTask.quoteIfNecessary(arg)) else args + mapped.mkString(" ") } /** Write the command to the script and get a process to run. diff --git a/core/src/main/scala/dagr/core/tasksystem/Unquotable.scala b/core/src/main/scala/dagr/core/tasksystem/Unquotable.scala new file mode 100644 index 00000000..4a0343af --- /dev/null +++ b/core/src/main/scala/dagr/core/tasksystem/Unquotable.scala @@ -0,0 +1,5 @@ +package dagr.core.tasksystem + +case class Unquotable(any: Any) { + override def toString: String = any.toString +} diff --git a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala index 0c9f8acc..fe6ec2a9 100644 --- a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala +++ b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala @@ -25,19 +25,23 @@ package dagr.pipelines import java.nio.file.{Files, Path} -import com.fulcrumgenomics.commons.io.PathUtil +import dagr.core.tasksystem.Pipe +import com.fulcrumgenomics.commons.io.{Io, PathUtil} import com.fulcrumgenomics.sopt.{arg, clp} import dagr.core.cmdline.Pipelines import dagr.core.config.Configuration -import dagr.core.execsystem.{Cores, Memory} +import dagr.core.execsystem.{Cores, Memory, ResourceSet} import dagr.core.tasksystem._ import dagr.tasks.DagrDef.{PathToIntervals, _} +import dagr.tasks.DataTypes.Bed import dagr.tasks.bwa.BwaMem import dagr.tasks.gatk._ import dagr.tasks.misc.DWGSim import dagr.tasks.picard.SortSam import htsjdk.samtools.SAMFileHeader.SortOrder +import scala.collection.mutable.ListBuffer + /** * generate bams based on input vcfs */ @@ -53,14 +57,35 @@ class SimulateVariants ) extends Pipeline(Some(out)) { - var bams: Seq[PathToBam] = Nil - override def build(): Unit = { Files.createDirectories(out) val coverage = depth - bams = vcfs.map(vcf => { + val noop = new Noop() + + val beds: Seq[Path] = Range(0, 22).map { chr => { + val subsetBed = new Grep(in = targets, what = s"^chr${chr + 1}\t", out = out.resolve(prefix + PathUtil.removeExtension(targets.getFileName) + s".chr${chr + 1}.bed")) + subsetBed.name += s".chr${chr + 1}" + + root ==> subsetBed + + val slopBed = new BedtoolsSlop(in = subsetBed.out, + bases = 500, + ref = ref) + + val mergeBed = new BedtoolsMerge(out = Some(PathUtil.replaceExtension(subsetBed.out, ".slop.bed"))) + + val bedSlopMerge: Pipe[Bed, Bed] = slopBed | mergeBed + bedSlopMerge.name = s"BedSlopMerge.chr${chr + 1}" + + subsetBed ==> bedSlopMerge ==> noop + + mergeBed.out.get + } + } + + val bams = vcfs.map(vcf => { val toTable = new VariantsToTable(in = vcf, out = out.resolve(prefix + vcf.getFileName + ".table"), fields = "CHROM" :: "POS" :: "HET" :: "HOM-VAR" :: Nil, intervals = Some(targets)) val makeBed = new MakePLBed(table = toTable.out, out = out.resolve(prefix + "PL.bed")) @@ -73,22 +98,23 @@ class SimulateVariants val index = new IndexVariants(in = normalize.out) - root ==> toTable ==> makeBed ==> rando ==> subsetToPL ==> normalize ==> index + noop ==> toTable ==> makeBed ==> rando ==> subsetToPL ==> normalize ==> index - val simulate:Seq[DWGSim] = Range(0, 22).map {chr => { - val subsetBed = new Grep(in = targets, what = s"'^chr${chr + 1}\t'", out = out.resolve(prefix + PathUtil.removeExtension(targets.getFileName) + s".chr${chr + 1}.bed")) - root ==> subsetBed + val simulate: Seq[DWGSim] = beds.zipWithIndex.map{ case (bed, chr) => { val sim = new DWGSim(vcf = normalize.out, fasta = ref, outPrefix = out.resolve(prefix + vcf.getFileName + s".chr${chr + 1}.sim"), depth = coverage, - coverageTarget = subsetBed.out) + coverageTarget = bed) - (index::subsetBed) ==> sim + sim.name += s".chr${chr + 1}" + noop ==> sim + index ==> sim sim - }} + } + } val mergeFastas = new Concatenate(ins = simulate map { fastq => out.resolve(fastq.outputPairedFastq) }, out = out.resolve(prefix + vcf.getFileName + ".sim.bfast.fastq")) @@ -147,21 +173,19 @@ class SimulateVariants // copies the value from the bed file to the vcf private class Grep(val in: Path, - val what: String, - val out: Path - ) extends ProcessTask with FixedResources with Configuration { + val what: String, + val out: Path + ) extends ProcessTask with FixedResources with Configuration { requires(Cores(1), Memory("1G")) private val grep: Path = configureExecutable("grep.executable", "grep") - quoteIfNecessary = false - override def args: Seq[Any] = grep :: what :: in.toAbsolutePath :: - ">" :: + Unquotable(">") :: out.toAbsolutePath :: - "||" :: + Unquotable("||") :: "touch" :: out.toAbsolutePath :: Nil @@ -173,17 +197,55 @@ class SimulateVariants ) extends ProcessTask with FixedResources with Configuration { requires(Cores(1), Memory("1G")) - quoteIfNecessary = false - private val cat: Path = configureExecutable("cat.executable", "cat") override def args: Seq[Any] = cat :: - ins.map{in => in.toAbsolutePath} mkString " " :: - ">" :: + ins.map { in => in.toAbsolutePath }.toList ::: + Unquotable(">") :: out.toAbsolutePath :: Nil } + // adds bases to the end of bed intervals + abstract class Bedtools(val in: Path, + val tool: String, + val out: Option[Path] + ) extends ProcessTask with FixedResources with Configuration with Pipe[Bed, Bed] { + requires(Cores(1), Memory("1G")) + + private val bedtools: Path = configureExecutable("bedtools.executable", "bedtools") + + def otherArgs: List[Any] = Nil + + override def args: Seq[Any] = { + val buffer = ListBuffer[Any](bedtools, tool, "-i", in.toAbsolutePath) + + buffer.appendAll(otherArgs) + out.foreach(f => buffer.addOne(Unquotable(">")).addOne(f)) + + buffer.toList + } + } + + // adds bases to the end of bed intervals + private class BedtoolsSlop(override val in: Path, + val bases: Int, + val ref: PathToFasta, + override val out: Option[Path] = None + ) extends Bedtools(in = in, out = out, tool = "slop") { + + private val genome = PathUtil.replaceExtension(ref, ".genome") + + override def otherArgs: List[Any] = + "-b" :: bases :: + "-g" :: genome :: + Nil + } + + // merges bedfile + private class BedtoolsMerge(override val in: Path = Io.StdIn, + override val out: Option[Path] = None + ) extends Bedtools(in = in, out = out, tool = "merge") {} // copies the value from the bed file to the vcf private class MakePLBed(val table: Path, @@ -193,12 +255,16 @@ class SimulateVariants private val awk: Path = configureExecutable("awk.executable", "awk") - quoteIfNecessary = false - - override def args: Seq[Any] = awk :: raw"""'BEGIN{OFS="\t"; } NR>1{print $$1,$$2-1, $$2, $$3+2*$$4}'""" :: - table.toAbsolutePath :: ">" :: out.toAbsolutePath :: Nil + override def args: Seq[Any] = awk :: Unquotable(raw"""'BEGIN{OFS="\t"; } NR>1{print $$1,$$2-1, $$2, $$3+2*$$4}'""") :: + table.toAbsolutePath :: Unquotable(">") :: out.toAbsolutePath :: Nil } + private class Noop() extends ProcessTask { + + override def args: Seq[Any] = Nil + + override def pickResources(availableResources: ResourceSet): Option[ResourceSet] = None + } private class myBwaMem(val fastq: PathToFastq, val out: Option[PathToBam], diff --git a/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala b/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala index 0dbf7a9c..d8c323de 100644 --- a/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala +++ b/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala @@ -38,9 +38,9 @@ class BwaMem(fastq: PathToFastq = Io.StdIn, minSeedLength: Option[Int] = None, matchScore: Option[Int] = None, mismatchPenalty: Option[Int] = None, - gapOpenPenalties: Option[(Int,Int)] = None, - gapExtensionPenalties: Option[(Int,Int)] = None, - clippingPenalties: Option[(Int,Int)] = None, + gapOpenPenalties: Option[(Int, Int)] = None, + gapExtensionPenalties: Option[(Int, Int)] = None, + clippingPenalties: Option[(Int, Int)] = None, minScore: Option[Int] = None, smartPairing: Boolean = true, basesPerBatch: Option[Int] = None, @@ -49,29 +49,29 @@ class BwaMem(fastq: PathToFastq = Io.StdIn, minThreads: Int = 1, maxThreads: Int = 32, memory: Memory = Memory("8G") - ) extends ProcessTask with VariableResources with Pipe[Fastq,Sam] { + ) extends ProcessTask with VariableResources with Pipe[Fastq, Sam] { name = "BwaMem" override def pickResources(resources: ResourceSet): Option[ResourceSet] = { - resources.subset(minCores=Cores(minThreads), maxCores=Cores(maxThreads), memory=memory) + resources.subset(minCores = Cores(minThreads), maxCores = Cores(maxThreads), memory = memory) } override def args: Seq[Any] = { val buffer = ListBuffer[Any](Bwa.findBwa, "mem", "-t", resources.cores.toInt) - if (smartPairing) buffer.append("-p") - if (outputAllAlignments) buffer.append("-a") - if (splitAlignTakeFivePrime) buffer.append("-5") - minSeedLength.foreach(l => buffer.append("-k", l)) - matchScore.foreach(s => buffer.append("-A", s)) - mismatchPenalty.foreach(p => buffer.append("-B", p)) - gapOpenPenalties.foreach { case (del, ins) => buffer.append("-O", s"$del,$ins") } - gapExtensionPenalties.foreach { case (del, ins) => buffer.append("-E", s"$del,$ins") } - clippingPenalties.foreach { case (five, three) => buffer.append("-L", s"$five,$three") } - minScore.foreach(s => buffer.append("-T", s)) - basesPerBatch.foreach(n => buffer.append("-K", n)) + if (smartPairing) buffer.addOne("-p") + if (outputAllAlignments) buffer.addOne("-a") + if (splitAlignTakeFivePrime) buffer.addOne("-5") + minSeedLength.foreach(l => buffer.appendAll("-k" :: l :: Nil)) + matchScore.foreach(s => buffer.appendAll("-A" :: s :: Nil)) + mismatchPenalty.foreach(p => buffer.appendAll("-B" :: p :: Nil)) + gapOpenPenalties.foreach { case (del, ins) => buffer.appendAll("-O" :: s"$del,$ins" :: Nil) } + gapExtensionPenalties.foreach { case (del, ins) => buffer.appendAll("-E" :: s"$del,$ins" :: Nil) } + clippingPenalties.foreach { case (five, three) => buffer.appendAll("-L" :: s"$five,$three" :: Nil) } + minScore.foreach(s => buffer.appendAll("-T" :: s :: Nil)) + basesPerBatch.foreach(n => buffer.appendAll("-K" :: n :: Nil)) - buffer.append(ref, fastq) + buffer.appendAll(ref :: fastq :: Nil) out.foreach(f => buffer.addOne(">").addOne(f)) buffer.toList diff --git a/tasks/src/main/scala/dagr/tasks/package.scala b/tasks/src/main/scala/dagr/tasks/package.scala index d8dea9ae..771dfaff 100644 --- a/tasks/src/main/scala/dagr/tasks/package.scala +++ b/tasks/src/main/scala/dagr/tasks/package.scala @@ -49,5 +49,6 @@ package object tasks { class Fastq private() class Text private() class Binary private() + class Bed private() } }