diff --git a/core/src/main/scala/dagr/core/tasksystem/EitherTask.scala b/core/src/main/scala/dagr/core/tasksystem/EitherTask.scala index e7a42b15..c051fcef 100644 --- a/core/src/main/scala/dagr/core/tasksystem/EitherTask.scala +++ b/core/src/main/scala/dagr/core/tasksystem/EitherTask.scala @@ -24,8 +24,11 @@ package dagr.core.tasksystem object EitherTask { + sealed trait Choice - object Left extends Choice + + object Left extends Choice + object Right extends Choice /** @@ -48,17 +51,33 @@ object EitherTask { * @param goLeft an expression that returns a Boolean, with true indicating Left and false indicating Right */ def of(left: Task, right: Task, goLeft: => Boolean): EitherTask = new EitherTask(left, right, () => if (goLeft) Left else Right) + + def of(left: Iterable[Task], right: Iterable[Task], goLeft: => Boolean): EitherTask = { + + val leftPipeline = new Pipeline { + def build(): Unit = { + left.foreach(root ==> _) + } + } + val rightPipeline = new Pipeline { + def build(): Unit = { + right.foreach(root ==> _) + } + } + + new EitherTask(leftPipeline, rightPipeline, () => if (goLeft) Left else Right) + } } /** A task that returns either the left or right task based on a deferred choice. The choice function is * not evaluated until all dependencies have been met and the `EitherTask` needs to make a decision about - * which task to return from [[getTasks]. + * which task to return from [[getTasks]]. * - * @param left the left task. - * @param right the right task + * @param left the left task. + * @param right the right task * @param choice an expression that returns either Left or Right when invoked */ -class EitherTask private (private val left: Task, private val right: Task, private val choice: () => EitherTask.Choice) extends Task { +class EitherTask private(private val left: Task, private val right: Task, private val choice: () => EitherTask.Choice) extends Task { /** Decides which task to return based on `choice` at execution time. */ override def getTasks: Iterable[Task] = Seq(if (choice() eq EitherTask.Left) left else right) } diff --git a/core/src/main/scala/dagr/core/tasksystem/ProcessTask.scala b/core/src/main/scala/dagr/core/tasksystem/ProcessTask.scala index 6b188324..a79b60b6 100644 --- a/core/src/main/scala/dagr/core/tasksystem/ProcessTask.scala +++ b/core/src/main/scala/dagr/core/tasksystem/ProcessTask.scala @@ -29,24 +29,30 @@ import java.nio.file.Path import scala.sys.process._ object ProcessTask { - val SpecialCharacters : String = """ `"\#()!~&<>|;*?""" + '\t' + '$' + val SpecialCharacters: String = """ `"\#()!~&<>|;*?""" + '\t' + '$' private val SpecialsAndSingleQuote = SpecialCharacters + "'" private val Escape = """\""" /** Determines if the argument has any special characters in it, and if so quotes the whole string. */ - def quoteIfNecessary(arg: String): String = { - val hasSingleQuotes = arg.contains("'") - val hasSpecials = arg.exists(ch => SpecialCharacters.contains(ch)) + def quoteIfNecessary(anyArg: Any ): String = { + anyArg match { + case Unquotable(x) => x.toString + case _ => { + val arg = anyArg.toString - (hasSingleQuotes, hasSpecials) match { - case (false, false) => arg - case (true, false) => arg.replace("'", """\'""") // just escape the single quotes - case (false, true ) => "'" + arg + "'" // just single quote the whole string - case (true, true ) => arg.map(ch => if (SpecialsAndSingleQuote.contains(ch)) Escape + ch else ch).mkString("") + val hasSingleQuotes = arg.contains("'") + val hasSpecials = arg.exists(ch => SpecialCharacters.contains(ch)) + + (hasSingleQuotes, hasSpecials) match { + case (false, false) => arg + case (true, false) => arg.replace("'", """\'""") // just escape the single quotes + case (false, true) => "'" + arg + "'" // just single quote the whole string + case (true, true) => arg.map(ch => if (SpecialsAndSingleQuote.contains(ch)) Escape + ch else ch).mkString("") + } + } } } } - /** A task that can execute a set of commands in its own process, and does not generate any new tasks. */ trait ProcessTask extends UnitTask { @@ -64,8 +70,8 @@ trait ProcessTask extends UnitTask { * @return the command string. */ private[core] def commandLine: String = { - if (quoteIfNecessary) args.map(arg => ProcessTask.quoteIfNecessary(arg.toString)).mkString(" ") - else args.mkString(" ") + val mapped = if (quoteIfNecessary) args.map(arg => ProcessTask.quoteIfNecessary(arg)) else args + mapped.mkString(" ") } /** Write the command to the script and get a process to run. diff --git a/core/src/main/scala/dagr/core/tasksystem/Unquotable.scala b/core/src/main/scala/dagr/core/tasksystem/Unquotable.scala new file mode 100644 index 00000000..4a0343af --- /dev/null +++ b/core/src/main/scala/dagr/core/tasksystem/Unquotable.scala @@ -0,0 +1,5 @@ +package dagr.core.tasksystem + +case class Unquotable(any: Any) { + override def toString: String = any.toString +} diff --git a/pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala b/pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala new file mode 100644 index 00000000..82b72fbc --- /dev/null +++ b/pipelines/src/main/scala/dagr/pipelines/ContaminateBams.scala @@ -0,0 +1,90 @@ +/* + * The MIT License + * + * Copyright (c) 2021 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.pipelines + +import java.nio.file.Files + +import com.fulcrumgenomics.sopt.{arg, clp} +import dagr.core.cmdline.Pipelines +import dagr.core.execsystem.{Cores, Memory} +import dagr.core.tasksystem._ +import dagr.tasks.DagrDef._ +import dagr.tasks.misc.LinkFile +import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles} +import htsjdk.samtools.SAMFileHeader.SortOrder + +/** + */ +@clp(description = "pipeline to contaminate bams.", group = classOf[Pipelines]) +class ContaminateBams( + @arg(flag = 'i', doc = "Input bams for generating bams.") val bams: Seq[PathToBam], + @arg(flag = 'o', doc = "Output directory.") val out: DirPath, + @arg(flag = 'p', doc = "Output file prefix.") val prefix: String, + @arg(flag = 'c', doc = "Contamination levels to evaluate.") val contaminations: Seq[Double], + @arg(flag = 'd', doc = "Depth values to evaluate.") val depths: Seq[Integer], + + ) extends Pipeline(Some(out)) { + + override def build(): Unit = { + val strat = Some(DownsamplingStrategy.HighAccuracy) + + Files.createDirectories(out) + + val pairsOfBam = for (x <- bams; y <- bams) yield (x, y) + + pairsOfBam.foreach { case (x, y) => { + contaminations.foreach(c => { + val dsX = out.resolve(prefix + x.getFileName + ".forCont." + c + ".tmp.bam") + val dsY = out.resolve(prefix + y.getFileName + ".forCont." + c + ".tmp.bam") + // create a mixture of c x + (1-c) y taking into account the depths of + // x and y + // D_x and D_y are the effective depths of x and y + // if we are to downsample x we need x to be downsampled to c/(1-c) + // if we are to downsample y we need y to be downsampled to (1-c)/c + // of of those two values will be less than 1. + val downsampleX = new DownsampleSam(in = x, out = dsX, proportion = c / (1 - c), strategy = strat).requires(Cores(1), Memory("2g")) + val downsampleY = new DownsampleSam(in = x, out = dsY, proportion = (1 - c) / c, strategy = strat).requires(Cores(1), Memory("2g")) + + val copyX = new LinkFile(x, dsX) + val copyY = new LinkFile(y, dsY) + val downsampleX_Q = c / (1 - c) <= 1 + val downsample = EitherTask.of( + downsampleX :: copyY :: Nil, + downsampleY :: copyX :: Nil, + downsampleX_Q) + + val xyContaminatedBam = out.resolve(prefix + "__" + x.getFileName + "__" + y.getFileName + + "__" + c + ".bam") + val mergedownsampled = new MergeSamFiles(in = dsX :: dsY :: Nil, out = xyContaminatedBam, sortOrder = SortOrder.coordinate) + + root ==> downsample ==> mergedownsampled + + }) + } + } + } +} + + + diff --git a/pipelines/src/main/scala/dagr/pipelines/EvaluateVBID.scala b/pipelines/src/main/scala/dagr/pipelines/EvaluateVBID.scala new file mode 100644 index 00000000..5dcf2089 --- /dev/null +++ b/pipelines/src/main/scala/dagr/pipelines/EvaluateVBID.scala @@ -0,0 +1,93 @@ +/* + * The MIT License + * + * Copyright (c) 2020 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.pipelines + +import java.nio.file.{Files, Path} + +import com.fulcrumgenomics.commons.io.PathUtil +import com.fulcrumgenomics.sopt.{arg, clp} +import dagr.core.cmdline.Pipelines +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory, ResourceSet} +import dagr.core.tasksystem.{EitherTask, FixedResources, Linker, NoOpInJvmTask, Pipeline, ProcessTask} +import dagr.tasks.DagrDef._ +import dagr.tasks.bwa.BwaMem +import dagr.tasks.gatk.{CountVariants, DownsampleVariants, Gatk4Task, GatkTask, IndexVariants, LeftAlignAndTrimVariants, VariantsToTable} +import dagr.tasks.misc.{DWGSim, LinkFile, VerifyBamId} +import dagr.tasks.picard.{DownsampleSam, DownsamplingStrategy, MergeSamFiles, SortSam} +import htsjdk.samtools.SAMFileHeader.SortOrder + +import scala.collection.mutable +import scala.collection.mutable.ListBuffer + +/** + * Downsample Bams to create composite contaminated bams, + * then downsample to various depths + * and run VBID with different number of target snps to evaluate + * dependance of VBID on contamination & depth & number of target snps + */ +@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) +class EvaluateVBID +(@arg(flag = 'i', doc = "Input bams to evaluate.") val bams: Seq[PathToBam], + @arg(flag = 'o', doc = "Output directory.") val out: DirPath, + @arg(flag = 'p', doc = "Output file prefix.") val prefix: String, + @arg(flag = 'm', doc = "Number of markers to use.") val markers: Seq[Integer], + @arg(flag = 'R', doc = "VBID vcf resource.") val vbidResource: PathToVcf, + +) extends Pipeline(Some(out)) { + + override def build(): Unit = { + + val countVariants = new CountVariants(vbidResource, out.resolve(prefix + ".vbid.count")) + root ==> countVariants + + bams.foreach(bam => { + markers.foreach { markerCount => { + // subset VBID resource files to smaller number of markers + + // this need to happen after the tool is run...need to figure out how to do that + val outDownsampledResource = out.resolve(prefix + "__" + vbidResource.getFileName + + "__" + markerCount + ".vcf") + + val downsample = new DownsampleVariants( + in = vbidResource, + output = outDownsampledResource, + proportion = 1) + + val vbid = new VerifyBamId( + vcf = outDownsampledResource, + bam = bam, + out = out.resolve(PathUtil.replaceExtension(bam, s".${markerCount}_markers.selfSM")) + ) + + countVariants ==> + Linker(countVariants, downsample)((f, c) => c.downsampleProportion = markerCount / f.count) ==> + vbid + } + } + + }) + + } +} diff --git a/pipelines/src/main/scala/dagr/pipelines/ExamplePipeline.scala b/pipelines/src/main/scala/dagr/pipelines/ExamplePipeline.scala index 804a9e37..745bb186 100644 --- a/pipelines/src/main/scala/dagr/pipelines/ExamplePipeline.scala +++ b/pipelines/src/main/scala/dagr/pipelines/ExamplePipeline.scala @@ -33,6 +33,8 @@ import dagr.tasks.DagrDef import dagr.tasks.bwa.BwaMem import dagr.tasks.picard.{CollectHsMetrics, DeleteBam, MarkDuplicates, SortSam} import DagrDef.{DirPath, PathToFasta, PathToFastq, PathToIntervals} +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory} import htsjdk.samtools.SAMFileHeader.SortOrder /** @@ -54,8 +56,10 @@ class ExamplePipeline Files.createDirectories(out) val bwa = new BwaMem(fastq=fastq, ref=ref) - val sort = new SortSam(in=Io.StdIn, out=tmpBam, sortOrder=SortOrder.coordinate) - val mark = new MarkDuplicates(inputs=Seq(tmpBam), out=Some(bam), prefix=metricsPrefix) + val sort = new SortSam(in=Io.StdIn, out=tmpBam, sortOrder=SortOrder.coordinate) with Configuration { + requires(Cores(2), Memory("2G"))} + val mark = new MarkDuplicates(inputs=Seq(tmpBam), out=Some(bam), prefix=metricsPrefix)with Configuration { + requires(Cores(1), Memory("2G"))} val rmtmp = new DeleteBam(tmpBam) root ==> (bwa | sort) ==> mark ==> rmtmp diff --git a/pipelines/src/main/scala/dagr/pipelines/MapAndSort.scala b/pipelines/src/main/scala/dagr/pipelines/MapAndSort.scala new file mode 100644 index 00000000..aefd09b7 --- /dev/null +++ b/pipelines/src/main/scala/dagr/pipelines/MapAndSort.scala @@ -0,0 +1,82 @@ +/* + * The MIT License + * + * Copyright (c) 2020 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.pipelines + +import java.nio.file.Files + +import com.fulcrumgenomics.sopt.{arg, clp} +import dagr.core.cmdline.Pipelines +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory} +import dagr.core.tasksystem._ +import dagr.tasks.DagrDef._ +import dagr.tasks.bwa.BwaMem +import dagr.tasks.picard.SortSam +import htsjdk.samtools.SAMFileHeader.SortOrder + +/** + * generate bams based on input vcfs + */ +@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) +class MapAndSort +(@arg(flag = 'i', doc = "Input fastq for mapping.") val fastqs: Seq[PathToFastq], + @arg(flag = 'r', doc = "Reference fasta.") val ref: PathToFasta, + @arg(flag = 'o', doc = "Output directory.") val out: DirPath, + @arg(flag = 'p', doc = "Output file prefix.") val prefix: String, + +) extends Pipeline(Some(out)) { + + override def build(): Unit = { + + Files.createDirectories(out) + + fastqs.foreach(f => { + + val bwa = new myBwaMem(fastq = f, out = Some(out.resolve(prefix + f.getFileName + ".tmp.sam")), ref = ref) + val sort = new mySortSam(inn = bwa.out.get, outt = out.resolve(prefix + f.getFileName + ".sorted.bam")) + + root ==> bwa ==> sort + + }) + } + + + private class mySortSam(val inn: PathToBam, val outt: PathToBam) extends + SortSam(in = inn, outt, SortOrder.coordinate) with + Configuration { + requires(Cores(1), Memory("2G")) + } + + private class myBwaMem(val fastq: PathToFastq, + val out: Option[PathToBam], + val ref: PathToFasta) extends + BwaMem( + fastq = fastq, + out = out, + ref = ref, + maxThreads = 2, + memory = Memory("4G")) + + +} \ No newline at end of file diff --git a/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala new file mode 100644 index 00000000..fe6ec2a9 --- /dev/null +++ b/pipelines/src/main/scala/dagr/pipelines/SimulateVariants.scala @@ -0,0 +1,281 @@ +/* + * The MIT License + * + * Copyright (c) 2020 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.pipelines + +import java.nio.file.{Files, Path} + +import dagr.core.tasksystem.Pipe +import com.fulcrumgenomics.commons.io.{Io, PathUtil} +import com.fulcrumgenomics.sopt.{arg, clp} +import dagr.core.cmdline.Pipelines +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory, ResourceSet} +import dagr.core.tasksystem._ +import dagr.tasks.DagrDef.{PathToIntervals, _} +import dagr.tasks.DataTypes.Bed +import dagr.tasks.bwa.BwaMem +import dagr.tasks.gatk._ +import dagr.tasks.misc.DWGSim +import dagr.tasks.picard.SortSam +import htsjdk.samtools.SAMFileHeader.SortOrder + +import scala.collection.mutable.ListBuffer + +/** + * generate bams based on input vcfs + */ +@clp(description = "Example FASTQ to BAM pipeline.", group = classOf[Pipelines]) +class SimulateVariants +(@arg(flag = 'i', doc = "Input vcfs for generating bams.") val vcfs: Seq[PathToVcf], + @arg(flag = 'r', doc = "Reference fasta.") val ref: PathToFasta, + @arg(flag = 'o', doc = "Output directory.") val out: DirPath, + @arg(flag = 't', doc = "Target regions.") val targets: PathToIntervals, + @arg(flag = 'p', doc = "Output file prefix.") val prefix: String, + @arg(flag = 'd', doc = "Depth value to simulate.") val depth: Integer, + @arg(flag = 'H', doc = "header line for dp.") val dpHeader: Path + +) extends Pipeline(Some(out)) { + + override def build(): Unit = { + + Files.createDirectories(out) + val coverage = depth + + val noop = new Noop() + + val beds: Seq[Path] = Range(0, 22).map { chr => { + val subsetBed = new Grep(in = targets, what = s"^chr${chr + 1}\t", out = out.resolve(prefix + PathUtil.removeExtension(targets.getFileName) + s".chr${chr + 1}.bed")) + subsetBed.name += s".chr${chr + 1}" + + root ==> subsetBed + + val slopBed = new BedtoolsSlop(in = subsetBed.out, + bases = 500, + ref = ref) + + val mergeBed = new BedtoolsMerge(out = Some(PathUtil.replaceExtension(subsetBed.out, ".slop.bed"))) + + val bedSlopMerge: Pipe[Bed, Bed] = slopBed | mergeBed + bedSlopMerge.name = s"BedSlopMerge.chr${chr + 1}" + + subsetBed ==> bedSlopMerge ==> noop + + mergeBed.out.get + } + } + + val bams = vcfs.map(vcf => { + + val toTable = new VariantsToTable(in = vcf, out = out.resolve(prefix + vcf.getFileName + ".table"), fields = "CHROM" :: "POS" :: "HET" :: "HOM-VAR" :: Nil, intervals = Some(targets)) + val makeBed = new MakePLBed(table = toTable.out, out = out.resolve(prefix + "PL.bed")) + + val rando = new CopyPS_FromBedToVcf(in = vcf, out = out.resolve(prefix + vcf.getFileName + ".with.pl.vcf"), + dpHeader = dpHeader, pathToBed = makeBed.out) + + val subsetToPL = new subsetToPL(in = rando.out, out = out.resolve(prefix + vcf.getFileName + ".subsetToPL.vcf")) + val normalize = new LeftAlignAndTrimVariants(in = subsetToPL.out, out = out.resolve(prefix + vcf.getFileName + ".normalized.vcf"), ref = ref, splitMultiAlleic = Some(true)) + val index = new IndexVariants(in = normalize.out) + + + noop ==> toTable ==> makeBed ==> rando ==> subsetToPL ==> normalize ==> index + + val simulate: Seq[DWGSim] = beds.zipWithIndex.map{ case (bed, chr) => { + + val sim = new DWGSim(vcf = normalize.out, + fasta = ref, + outPrefix = out.resolve(prefix + vcf.getFileName + s".chr${chr + 1}.sim"), + depth = coverage, + coverageTarget = bed) + + sim.name += s".chr${chr + 1}" + noop ==> sim + index ==> sim + + sim + } + } + + val mergeFastas = new Concatenate(ins = simulate map { fastq => out.resolve(fastq.outputPairedFastq) }, + out = out.resolve(prefix + vcf.getFileName + ".sim.bfast.fastq")) + + val bwa = new myBwaMem(fastq = mergeFastas.out, out = Some(out.resolve(prefix + vcf.getFileName + ".tmp.sam")), ref = ref) + + simulate.map(sim => sim ==> mergeFastas) + + val sort = new SortSam(in = bwa.out.get, out = out.resolve(prefix + vcf.getFileName + ".sorted.bam"), sortOrder = SortOrder.coordinate) with Configuration { + requires(Cores(1), Memory("2G")) + } + + mergeFastas ==> bwa ==> sort + sort.out + }) + } + + + // copies the value from the bed file to the vcf + // removed the PS field (since it's invalid) + private class CopyPS_FromBedToVcf(val in: PathToVcf, + val out: PathToVcf, + val pathToBed: Path, + val dpHeader: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") + + override def args: Seq[Any] = bcftools :: "annotate" :: + "-a" :: pathToBed :: + "-h" :: dpHeader :: + "-c" :: "CHROM,FROM,TO,pl" :: + "-x" :: "FORMAT" :: // remove the FORMAT Field + "-x" :: "^INFO/pl" :: // no need for all these annotations anyway + "--force" :: // needed since the input file is corrupt. + "-o" :: out :: + in :: Nil + } + + + // copies the value from the bed file to the vcf + // removed the PS field (since it's invalid) + private class subsetToPL(val in: PathToVcf, val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val bcftools: Path = configureExecutable("bcftools.executable", "bcftools") + + override def args: Seq[Any] = bcftools :: "view" :: + "-i" :: "pl!=\".\"" :: + "-o" :: out :: + in :: Nil + } + + + // copies the value from the bed file to the vcf + private class Grep(val in: Path, + val what: String, + val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val grep: Path = configureExecutable("grep.executable", "grep") + + override def args: Seq[Any] = grep :: + what :: + in.toAbsolutePath :: + Unquotable(">") :: + out.toAbsolutePath :: + Unquotable("||") :: + "touch" :: + out.toAbsolutePath :: + Nil + } + + // copies the value from the bed file to the vcf + private class Concatenate(val ins: Seq[Path], + val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val cat: Path = configureExecutable("cat.executable", "cat") + + override def args: Seq[Any] = cat :: + ins.map { in => in.toAbsolutePath }.toList ::: + Unquotable(">") :: + out.toAbsolutePath :: + Nil + } + + // adds bases to the end of bed intervals + abstract class Bedtools(val in: Path, + val tool: String, + val out: Option[Path] + ) extends ProcessTask with FixedResources with Configuration with Pipe[Bed, Bed] { + requires(Cores(1), Memory("1G")) + + private val bedtools: Path = configureExecutable("bedtools.executable", "bedtools") + + def otherArgs: List[Any] = Nil + + override def args: Seq[Any] = { + val buffer = ListBuffer[Any](bedtools, tool, "-i", in.toAbsolutePath) + + buffer.appendAll(otherArgs) + out.foreach(f => buffer.addOne(Unquotable(">")).addOne(f)) + + buffer.toList + } + } + + // adds bases to the end of bed intervals + private class BedtoolsSlop(override val in: Path, + val bases: Int, + val ref: PathToFasta, + override val out: Option[Path] = None + ) extends Bedtools(in = in, out = out, tool = "slop") { + + private val genome = PathUtil.replaceExtension(ref, ".genome") + + override def otherArgs: List[Any] = + "-b" :: bases :: + "-g" :: genome :: + Nil + } + + // merges bedfile + private class BedtoolsMerge(override val in: Path = Io.StdIn, + override val out: Option[Path] = None + ) extends Bedtools(in = in, out = out, tool = "merge") {} + + // copies the value from the bed file to the vcf + private class MakePLBed(val table: Path, + val out: Path + ) extends ProcessTask with FixedResources with Configuration { + requires(Cores(1), Memory("1G")) + + private val awk: Path = configureExecutable("awk.executable", "awk") + + override def args: Seq[Any] = awk :: Unquotable(raw"""'BEGIN{OFS="\t"; } NR>1{print $$1,$$2-1, $$2, $$3+2*$$4}'""") :: + table.toAbsolutePath :: Unquotable(">") :: out.toAbsolutePath :: Nil + } + + private class Noop() extends ProcessTask { + + override def args: Seq[Any] = Nil + + override def pickResources(availableResources: ResourceSet): Option[ResourceSet] = None + } + + private class myBwaMem(val fastq: PathToFastq, + val out: Option[PathToBam], + val ref: PathToFasta) extends + BwaMem( + fastq = fastq, + out = out, + ref = ref, + maxThreads = 2, + memory = Memory("4G")) { + quoteIfNecessary = false + } + +} diff --git a/src/main/resources/example.conf b/src/main/resources/example.conf index 2378aed0..0f72a79d 100644 --- a/src/main/resources/example.conf +++ b/src/main/resources/example.conf @@ -2,7 +2,7 @@ # for the default set of tasks. This should be edited for your local # setup. -# DAGR Configuration - this section can be omited entirely from application configuration +# DAGR Configuration - this section can be omitted entirely from application configuration # files as dagr's reference.conf will provide sensible defaults where they are needed. dagr = { command-line-name = "dagr" diff --git a/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala b/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala index c8322431..d8c323de 100644 --- a/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala +++ b/tasks/src/main/scala/dagr/tasks/bwa/BwaMem.scala @@ -38,9 +38,9 @@ class BwaMem(fastq: PathToFastq = Io.StdIn, minSeedLength: Option[Int] = None, matchScore: Option[Int] = None, mismatchPenalty: Option[Int] = None, - gapOpenPenalties: Option[(Int,Int)] = None, - gapExtensionPenalties: Option[(Int,Int)] = None, - clippingPenalties: Option[(Int,Int)] = None, + gapOpenPenalties: Option[(Int, Int)] = None, + gapExtensionPenalties: Option[(Int, Int)] = None, + clippingPenalties: Option[(Int, Int)] = None, minScore: Option[Int] = None, smartPairing: Boolean = true, basesPerBatch: Option[Int] = None, @@ -49,30 +49,30 @@ class BwaMem(fastq: PathToFastq = Io.StdIn, minThreads: Int = 1, maxThreads: Int = 32, memory: Memory = Memory("8G") - ) extends ProcessTask with VariableResources with Pipe[Fastq,Sam] { + ) extends ProcessTask with VariableResources with Pipe[Fastq, Sam] { name = "BwaMem" override def pickResources(resources: ResourceSet): Option[ResourceSet] = { - resources.subset(minCores=Cores(minThreads), maxCores=Cores(maxThreads), memory=memory) + resources.subset(minCores = Cores(minThreads), maxCores = Cores(maxThreads), memory = memory) } override def args: Seq[Any] = { val buffer = ListBuffer[Any](Bwa.findBwa, "mem", "-t", resources.cores.toInt) - if (smartPairing) buffer.append("-p") - if (outputAllAlignments) buffer.append("-a") - if (splitAlignTakeFivePrime) buffer.append("-5") - minSeedLength.foreach(l => buffer.append("-k", l)) - matchScore.foreach(s => buffer.append("-A", s)) - mismatchPenalty.foreach(p => buffer.append("-B", p)) - gapOpenPenalties.foreach { case (del, ins) => buffer.append("-O", s"$del,$ins") } - gapExtensionPenalties.foreach { case (del, ins) => buffer.append("-E", s"$del,$ins") } - clippingPenalties.foreach { case (five, three) => buffer.append("-L", s"$five,$three") } - minScore.foreach(s => buffer.append("-T", s)) - basesPerBatch.foreach(n => buffer.append("-K", n)) + if (smartPairing) buffer.addOne("-p") + if (outputAllAlignments) buffer.addOne("-a") + if (splitAlignTakeFivePrime) buffer.addOne("-5") + minSeedLength.foreach(l => buffer.appendAll("-k" :: l :: Nil)) + matchScore.foreach(s => buffer.appendAll("-A" :: s :: Nil)) + mismatchPenalty.foreach(p => buffer.appendAll("-B" :: p :: Nil)) + gapOpenPenalties.foreach { case (del, ins) => buffer.appendAll("-O" :: s"$del,$ins" :: Nil) } + gapExtensionPenalties.foreach { case (del, ins) => buffer.appendAll("-E" :: s"$del,$ins" :: Nil) } + clippingPenalties.foreach { case (five, three) => buffer.appendAll("-L" :: s"$five,$three" :: Nil) } + minScore.foreach(s => buffer.appendAll("-T" :: s :: Nil)) + basesPerBatch.foreach(n => buffer.appendAll("-K" :: n :: Nil)) - buffer.append(ref, fastq) - out.foreach(f => buffer.append("> " + f)) + buffer.appendAll(ref :: fastq :: Nil) + out.foreach(f => buffer.addOne(">").addOne(f)) buffer.toList } diff --git a/tasks/src/main/scala/dagr/tasks/gatk/CountVariants.scala b/tasks/src/main/scala/dagr/tasks/gatk/CountVariants.scala new file mode 100644 index 00000000..0f16fb3f --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/CountVariants.scala @@ -0,0 +1,71 @@ +/* + * The MIT License + * + * Copyright (c) 2021 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.gatk + +import java.lang.Long +import java.nio.file.Path + +import dagr.tasks.DagrDef.PathToVcf + +import scala.collection.mutable.ListBuffer +import scala.io.Source + +/** + * Runs the GATK walker that counts variants + */ +class CountVariants(val in: PathToVcf, val output: Path) + extends Gatk4Task(walker = "CountVariants") { + + var count = 0L + + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { + buffer.append("-V", in) + buffer.append("-O", output) + } + + /** Finalize anything after the task has been run. + * + * This method should be called after a task has been run. The intended use of this method + * is to allow for any modification of this task prior to any dependent tasks being run. This + * would allow any parameters that were passed to dependent tasks as call-by-name to be + * finalized here. For example, we could have passed an Option[String] that is None + * until make it Some(String) in this method. Then when the dependent task's getTasks + * method is called, it can call 'get' on the option and get something. + * + * @param exitCode the exit code of the task, which could also be 1 due to the system terminating this process + * @return true if we c + */ + override def onComplete(exitCode: Int): Boolean = { + + val source = Source.fromFile(output.toFile) + + try { + count = Long.parseLong(source.getLines.mkString) + } finally { + source.close() + } + + super.onComplete(exitCode) + } +} diff --git a/tasks/src/main/scala/dagr/tasks/gatk/DownsampleVariants.scala b/tasks/src/main/scala/dagr/tasks/gatk/DownsampleVariants.scala new file mode 100644 index 00000000..82af5d78 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/DownsampleVariants.scala @@ -0,0 +1,14 @@ +package dagr.tasks.gatk + +import dagr.tasks.DagrDef.PathToVcf +import scala.collection.mutable.ListBuffer + +class DownsampleVariants(in: PathToVcf, output: PathToVcf, proportion: Double) extends Gatk4Task(walker = "SelectVariants") { + + var downsampleProportion = proportion + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { + buffer.append("-V", in) + buffer.append("-O", output) + buffer.append("-factor", downsampleProportion) + } +} diff --git a/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala b/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala new file mode 100644 index 00000000..0ee7d572 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/Gatk4Task.scala @@ -0,0 +1,68 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.gatk + +import java.nio.file.Path + +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory} +import dagr.core.tasksystem.{FixedResources, ProcessTask} +import dagr.tasks.DagrDef.{PathToFasta, PathToIntervals} +import dagr.tasks.{DagrDef, JarTask} + +import scala.collection.mutable.ListBuffer + +object Gatk4Task { + val GatkJarPathConfigKey = "gatk4.jar" +} + +/** + * Abstract base class for tasks that involve running the GATK. + */ + + +abstract class Gatk4Task(val walker: String, + val ref: Option[PathToFasta] = None, + val intervals: Option[PathToIntervals] = None, + val bamCompression: Option[Int] = None) + extends ProcessTask with JarTask with FixedResources with Configuration { + requires(Cores(1), Memory("4g")) + + override def args: Seq[Any] = { + val buffer = ListBuffer[Any]() + buffer.appendAll(jarArgs(this.gatkJar, jvmMemory = this.resources.memory)) + buffer.addOne(this.walker) + ref.foreach { r => buffer.addOne("-R").addOne(r.toAbsolutePath.toString) } + intervals.foreach(il => buffer.addOne("-L").addOne(il.toAbsolutePath.toString)) + bamCompression.foreach(c => buffer.addOne("--bam_compression").addOne(c)) + addWalkerArgs(buffer) + + buffer.toSeq + } + + /** Can be overridden to use a specific GATK jar. */ + protected def gatkJar: Path = configure[Path](Gatk4Task.GatkJarPathConfigKey) + + protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit +} diff --git a/tasks/src/main/scala/dagr/tasks/gatk/IndexVariants.scala b/tasks/src/main/scala/dagr/tasks/gatk/IndexVariants.scala new file mode 100644 index 00000000..b8cc8934 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/IndexVariants.scala @@ -0,0 +1,39 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.gatk + +import dagr.tasks.DagrDef.{PathToFasta, PathToIntervals, PathToVcf} + +import scala.collection.mutable.ListBuffer + +/** + * Runs the GATK walker that normalizes variants and optionally splits multiallelic sites into multiple, biallelic ones + */ +class IndexVariants(val in: PathToVcf) + extends Gatk4Task(walker = "IndexFeatureFile") { + + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { + buffer.append("-I", in) + } +} diff --git a/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala b/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala new file mode 100644 index 00000000..6c47c6bd --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/LeftAlignAndTrimVariants.scala @@ -0,0 +1,41 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.gatk + +import dagr.tasks.DagrDef.{PathToFasta, PathToIntervals, PathToVcf} + +import scala.collection.mutable.ListBuffer + +/** + * Runs the GATK walker that normalizes variants and optionally splits multiallelic sites into multiple, biallelic ones + */ +class LeftAlignAndTrimVariants(val in: PathToVcf, val out: PathToVcf, ref: PathToFasta, splitMultiAlleic: Option[Boolean]) + extends Gatk4Task(walker = "LeftAlignAndTrimVariants", ref = Some(ref)) { + + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { + buffer.addOne("-V").addOne(in) + buffer.addOne("-O").addOne(out) + splitMultiAlleic foreach { _ => buffer.addOne("--split-multi-allelics") } + } +} diff --git a/tasks/src/main/scala/dagr/tasks/gatk/VariantsToTable.scala b/tasks/src/main/scala/dagr/tasks/gatk/VariantsToTable.scala new file mode 100644 index 00000000..961f83c3 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/gatk/VariantsToTable.scala @@ -0,0 +1,46 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.gatk + +import java.nio.file.Path + +import dagr.tasks.DagrDef.{PathToFasta, PathToIntervals, PathToVcf} + +import scala.collection.mutable.ListBuffer + +/** + * Runs the GATK walker that normalizes variants and optionally splits multiallelic sites into multiple, biallelic ones + */ +class VariantsToTable(val in: PathToVcf, val out: Path, fields: Seq[String], intervals: Option[PathToIntervals]) + extends Gatk4Task(walker = "VariantsToTable", intervals = intervals) { + + override protected def addWalkerArgs(buffer: ListBuffer[Any]): Unit = { + buffer.addOne("-V").addOne(in) + buffer.addOne("-O").addOne(out) + fields.foreach { f => { + buffer addOne "-F" addOne f + } + } + } +} diff --git a/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala b/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala new file mode 100644 index 00000000..e4062441 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/misc/DWGSim.scala @@ -0,0 +1,69 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.misc + +import java.nio.file.Path + +import dagr.core.config.Configuration +import dagr.core.execsystem.{Cores, Memory} +import dagr.core.tasksystem.{FixedResources, ProcessTask} +import dagr.tasks.DagrDef.{PathToFasta, PathToFastq, PathToIntervals, PathToVcf} + +import scala.collection.mutable.ListBuffer + +/** + * Class for running DWGSim to generate simulated sequencing data + */ +class DWGSim(val vcf: PathToVcf, + val fasta: PathToFasta, + val outPrefix: Path, + val depth: Int = 100, + val iSize: Int = 350, + val iSizeDev: Int = 100, + val readLength: Int = 151, + val randomSeed: Int = 42, // -1 to use current time + val coverageTarget: PathToIntervals + + ) extends ProcessTask with Configuration with FixedResources { + + requires(Cores(1), Memory("2G")) + private val dwgsim: Path = configureExecutable("dwgsim.executable", "dwgsim") + + val outputPairedFastq: PathToFastq = outPrefix.getParent.resolve(outPrefix.getFileName + ".bfast.fastq") + + override def args: Seq[Any] = { + val buffer = ListBuffer[Any]() + buffer ++= dwgsim :: Nil + buffer ++= "-v" :: vcf :: Nil + buffer ++= "-d" :: iSize :: Nil + buffer ++= "-s" :: iSizeDev :: Nil + buffer ++= "-C" :: depth :: Nil + buffer ++= "-1" :: readLength :: Nil + buffer ++= "-2" :: readLength :: Nil + buffer ++= "-z" :: randomSeed :: Nil + buffer ++= "-x" :: coverageTarget :: Nil + buffer ++= fasta :: outPrefix :: Nil //these two arguments must be last + buffer.toList + } +} diff --git a/tasks/src/main/scala/dagr/tasks/misc/GetYield.scala b/tasks/src/main/scala/dagr/tasks/misc/GetYield.scala new file mode 100644 index 00000000..c932a8e8 --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/misc/GetYield.scala @@ -0,0 +1,41 @@ +/* + * The MIT License + * + * Copyright (c) 2021 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.misc + +import java.nio.file.Path + +import dagr.core.tasksystem.SimpleInJvmTask +import htsjdk.samtools.metrics.MetricsFile +import picard.analysis.CollectQualityYieldMetrics.QualityYieldMetrics + +/** In JVM Task that pulls the total number of reads from the quality yield metrics file. */ +class GetYield(qualityYieldMetrics: Path) extends SimpleInJvmTask { + this.name = "GetYield" + var count: Long = 0 + + override def run(): Unit = { + val metrics = MetricsFile.readBeans(qualityYieldMetrics.toFile).get(0).asInstanceOf[QualityYieldMetrics] + count = metrics.PF_READS + } +} diff --git a/tasks/src/main/scala/dagr/tasks/misc/LinkFile.scala b/tasks/src/main/scala/dagr/tasks/misc/LinkFile.scala new file mode 100644 index 00000000..03ec9bef --- /dev/null +++ b/tasks/src/main/scala/dagr/tasks/misc/LinkFile.scala @@ -0,0 +1,33 @@ +/* + * The MIT License + * + * Copyright (c) 2016 Fulcrum Genomics LLC + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in + * all copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN + * THE SOFTWARE. + */ +package dagr.tasks.misc + +import java.nio.file.Path + +import dagr.core.tasksystem.ShellCommand + +/** + * Uses the shell to (soft) link a file or directory to another alias + */ +class LinkFile(val from: Path, val to: Path) extends ShellCommand("ln", "-sf", from.toString, to.toString) diff --git a/tasks/src/main/scala/dagr/tasks/package.scala b/tasks/src/main/scala/dagr/tasks/package.scala index d8dea9ae..771dfaff 100644 --- a/tasks/src/main/scala/dagr/tasks/package.scala +++ b/tasks/src/main/scala/dagr/tasks/package.scala @@ -49,5 +49,6 @@ package object tasks { class Fastq private() class Text private() class Binary private() + class Bed private() } }