Skip to content

Commit

Permalink
feat(lib): return number of no mapping reads
Browse files Browse the repository at this point in the history
  • Loading branch information
mbhall88 committed Nov 21, 2024
1 parent 9ac48ec commit 917b786
Show file tree
Hide file tree
Showing 5 changed files with 116 additions and 40 deletions.
68 changes: 46 additions & 22 deletions liblrge/src/ava.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::BufWriter;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU32;
use std::sync::{Arc, Mutex};

use crossbeam_channel as channel;
Expand Down Expand Up @@ -54,6 +55,11 @@ impl AvaStrategy {
builder.build(input)
}

/// The number of reads being overlapped.
pub fn num_reads(&self) -> usize {
self.num_reads
}

/// Subsample the reads in the input file to `num_reads`.
fn subsample_reads(&mut self) -> crate::Result<(PathBuf, usize)> {
debug!("Counting records in FASTQ file...");
Expand Down Expand Up @@ -127,7 +133,7 @@ impl AvaStrategy {
aln_wrapper: AlignerWrapper,
reads_file: PathBuf,
sum_len: usize,
) -> crate::Result<Vec<f32>> {
) -> crate::Result<(Vec<f32>, u32)> {
// Bounded channel to control memory usage - i.e., 10000 records in the channel at a time
let (sender, receiver) = channel::bounded(25_000);
let aligner = Arc::clone(&aln_wrapper.aligner); // Shared reference for the producer thread
Expand Down Expand Up @@ -203,7 +209,7 @@ impl AvaStrategy {
.par_bridge() // Parallelize the processing
.try_for_each(|record| -> Result<(), LrgeError> {
let io::Message::Data((rid, seq)) = record;
trace!("Processing read: {:?}", String::from_utf8_lossy(&rid));
trace!("Processing read: {}", String::from_utf8_lossy(&rid));

let mut qname = rid.to_owned();
if qname.last() != Some(&0) {
Expand Down Expand Up @@ -233,7 +239,8 @@ impl AvaStrategy {
let tname = &mapping.target_name;

if &rid == tname {
// Skip self-overlaps
// Skip self-overlaps. if the qname is not in the ovlap_counter, we insert it with 0 overlaps
ovlap_counter_lock.entry(rid.clone()).or_insert(0);
continue;
}

Expand All @@ -252,10 +259,6 @@ impl AvaStrategy {
*ovlap_counter_lock.entry(rid.clone()).or_insert(0) += 1;
}
} else {
debug!(
"No mappings found for read: {:?}",
String::from_utf8_lossy(&rid)
);
// if the qname is not in the ovlap_counter, we insert it with 0 overlaps
ovlap_counter_lock.entry(rid.clone()).or_insert(0);
}
Expand All @@ -271,37 +274,59 @@ impl AvaStrategy {
LrgeError::ThreadError(format!("Thread panicked when joining: {:?}", e))
})??;

debug!("Overlaps written to: {:?}", paf_path);
debug!("Overlaps written to: {}", paf_path.to_string_lossy());

let ovlap_counter = Arc::try_unwrap(ovlap_counter)
.unwrap()
.into_inner()
.unwrap();
let read_lengths = Arc::try_unwrap(read_lengths).unwrap().into_inner().unwrap();
let no_mapping_count = AtomicU32::new(0);
let estimates = ovlap_counter
.par_iter()
.map(|(rid, n_ovlaps)| {
// safe to unwrap the Option here because we know the key exists
let read_len = read_lengths.get(rid).unwrap();
let avg_read_len = sum_len as f32 / (self.num_reads - 1) as f32;
let est = per_read_estimate(
*read_len,
avg_read_len,
self.num_reads - 1,
*n_ovlaps,
overlap_threshold,
);
let est = if *n_ovlaps == 0 {
no_mapping_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
trace!(
"No overlaps found for read: {}",
String::from_utf8_lossy(rid)
);
f32::INFINITY
} else {
// safe to unwrap the Option here because we know the key exists
let read_len = read_lengths.get(rid).unwrap();
let avg_read_len = sum_len as f32 / (self.num_reads - 1) as f32;
per_read_estimate(
*read_len,
avg_read_len,
self.num_reads - 1,
*n_ovlaps,
overlap_threshold,
)
};
trace!("Estimate for {}: {}", String::from_utf8_lossy(rid), est);
est
})
.collect();

Ok(estimates)
let no_mapping_count = no_mapping_count.load(std::sync::atomic::Ordering::Relaxed);

if no_mapping_count > 0 {
let percent = (no_mapping_count as f32 / self.num_reads as f32) * 100.0;
warn!(
"{} ({:.2}%) read(s) did not overlap any other reads",
no_mapping_count, percent
);
} else {
debug!("All reads had at least one overlap");
}

Ok((estimates, no_mapping_count))
}
}

impl Estimate for AvaStrategy {
fn generate_estimates(&mut self) -> crate::Result<Vec<f32>> {
fn generate_estimates(&mut self) -> crate::Result<(Vec<f32>, u32)> {
let (reads_file, sum_len) = self.subsample_reads()?;

let preset = match self.platform {
Expand All @@ -310,8 +335,7 @@ impl Estimate for AvaStrategy {
};

let aligner = AlignerWrapper::new(&reads_file, self.threads, preset, false)?;
let estimates = self.align_reads(aligner, reads_file, sum_len)?;

Ok(estimates)
self.align_reads(aligner, reads_file, sum_len)
}
}
26 changes: 22 additions & 4 deletions liblrge/src/estimate.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,17 @@ pub const LOWER_QUANTILE: f32 = 0.15;
/// The upper quantile we found to give the highest confidence in our analysis.
pub const UPPER_QUANTILE: f32 = 0.65;

pub struct EstimateResult {
/// The lower quantile of the estimates
pub lower: Option<f32>,
/// The median of the estimates - this is the genome size estimate
pub median: Option<f32>,
/// The upper quantile of the estimates
pub upper: Option<f32>,
/// The number of reads that did not have an overlap
pub no_mapping_count: u32,
}

/// This trait provides methods to generate estimates and calculate the median
/// of those estimates, both with and without considering infinite values.
pub trait Estimate {
Expand All @@ -13,7 +24,7 @@ pub trait Estimate {
/// # Returns
///
/// A `Vec<f32>` containing the generated estimates. These estimates may be finite or infinite.
fn generate_estimates(&mut self) -> crate::Result<Vec<f32>>;
fn generate_estimates(&mut self) -> crate::Result<(Vec<f32>, u32)>;

// todo add link to paper
/// Generate an estimate of the genome size, taking the median of the per-read estimates.
Expand Down Expand Up @@ -43,16 +54,23 @@ pub trait Estimate {
finite: bool,
lower_quant: Option<f32>,
upper_quant: Option<f32>,
) -> crate::Result<(Option<f32>, Option<f32>, Option<f32>)> {
let estimates = self.generate_estimates()?;
) -> crate::Result<EstimateResult> {
let (estimates, no_mapping_count) = self.generate_estimates()?;

let iter: Box<dyn Iterator<Item = f32>> = if finite {
Box::new(estimates.iter().filter(|&x| x.is_finite()).copied())
} else {
Box::new(estimates.iter().copied())
};

Ok(median(iter, lower_quant, upper_quant))
let (lower, median, upper) = median(iter, lower_quant, upper_quant);

Ok(EstimateResult {
lower,
median,
upper,
no_mapping_count,
})
}
}

Expand Down
41 changes: 32 additions & 9 deletions liblrge/src/twoset.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ use std::collections::HashSet;
use std::fs::File;
use std::io::BufWriter;
use std::path::{Path, PathBuf};
use std::sync::atomic::AtomicU32;
use std::sync::{Arc, Mutex};

use crossbeam_channel as channel;
Expand Down Expand Up @@ -53,6 +54,16 @@ impl TwoSetStrategy {
builder.build(input)
}

/// The number of target reads
pub fn target_num_reads(&self) -> usize {
self.target_num_reads
}

/// The number of query reads
pub fn query_num_reads(&self) -> usize {
self.query_num_reads
}

fn split_fastq(&mut self) -> crate::Result<(PathBuf, PathBuf, f32)> {
debug!("Counting records in FASTQ file...");
let n_fq_reads = {
Expand Down Expand Up @@ -141,7 +152,7 @@ impl TwoSetStrategy {
aln_wrapper: AlignerWrapper,
query_file: PathBuf,
avg_target_len: f32,
) -> Result<Vec<f32>, LrgeError> {
) -> Result<(Vec<f32>, u32), LrgeError> {
// Bounded channel to control memory usage - i.e., 10000 records in the channel at a time
let (sender, receiver) = channel::bounded(10000);
let aligner = Arc::clone(&aln_wrapper.aligner); // Shared reference for the producer thread
Expand Down Expand Up @@ -195,6 +206,7 @@ impl TwoSetStrategy {

let estimates = Vec::with_capacity(self.query_num_reads);
let estimates = Arc::new(Mutex::new(estimates));
let no_mapping_count = AtomicU32::new(0);

debug!("Aligning reads and writing overlaps to PAF file...");
// Consumer: Process records from the channel in parallel
Expand All @@ -204,7 +216,7 @@ impl TwoSetStrategy {
.par_bridge() // Parallelize the processing
.try_for_each(|record| -> Result<(), LrgeError> {
let io::Message::Data((rid, seq)) = record;
trace!("Processing read: {:?}", String::from_utf8_lossy(&rid));
trace!("Processing read: {}", String::from_utf8_lossy(&rid));

let mut qname = rid.to_owned();
if qname.last() != Some(&0) {
Expand Down Expand Up @@ -233,10 +245,11 @@ impl TwoSetStrategy {
}
}
} else {
debug!(
"No mappings found for read: {:?}",
trace!(
"No overlaps found for read: {}",
String::from_utf8_lossy(&rid)
);
no_mapping_count.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
}

let est = per_read_estimate(
Expand Down Expand Up @@ -265,7 +278,18 @@ impl TwoSetStrategy {
LrgeError::ThreadError(format!("Thread panicked when joining: {:?}", e))
})??;

debug!("Overlaps written to: {:?}", paf_path);
debug!("Overlaps written to: {}", paf_path.to_string_lossy());

let no_mapping_count = no_mapping_count.load(std::sync::atomic::Ordering::Relaxed);
if no_mapping_count > 0 {
let percent = (no_mapping_count as f32 / self.query_num_reads as f32) * 100.0;
warn!(
"{} ({:.2}%) query read(s) did not overlap any target reads",
no_mapping_count, percent
);
} else {
debug!("All query reads overlapped with target reads");
}

// we extract the estimates from the Arc and Mutex
let estimates = Arc::try_unwrap(estimates)
Expand All @@ -279,12 +303,12 @@ impl TwoSetStrategy {
LrgeError::ThreadError("Error unwrapping estimates Mutex<Vec<f32>>".to_string())
})?;

Ok(estimates)
Ok((estimates, no_mapping_count))
}
}

impl Estimate for TwoSetStrategy {
fn generate_estimates(&mut self) -> crate::Result<Vec<f32>> {
fn generate_estimates(&mut self) -> crate::Result<(Vec<f32>, u32)> {
let (target_file, query_file, avg_target_len) = self.split_fastq()?;

let preset = match self.platform {
Expand All @@ -293,9 +317,8 @@ impl Estimate for TwoSetStrategy {
};

let aligner = AlignerWrapper::new(&target_file, self.threads, preset, true)?;
let estimates = self.align_reads(aligner, query_file, avg_target_len)?;

Ok(estimates)
self.align_reads(aligner, query_file, avg_target_len)
}
}

Expand Down
2 changes: 1 addition & 1 deletion lrge/src/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,7 @@ fn check_path_exists<S: AsRef<OsStr> + ?Sized>(s: &S) -> Result<PathBuf, String>
if path.exists() {
Ok(path)
} else {
Err(format!("{:?} does not exist", path))
Err(format!("{} does not exist", path.to_string_lossy()))
}
}

Expand Down
19 changes: 15 additions & 4 deletions lrge/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,8 @@ mod cli;
mod utils;

fn setup_logging(quiet: u8, verbose: u8) {
let sum = (verbose - quiet) as i16;
let sum = verbose as i8 - quiet as i8;

let lvl = match sum {
1 => LevelFilter::Debug,
2.. => LevelFilter::Trace,
Expand All @@ -35,9 +36,15 @@ fn main() -> Result<()> {

let tmpdir = create_temp_dir(args.temp_dir.as_ref(), args.keep_temp)?;
if args.keep_temp {
info!("Created temporary directory at {:?}", tmpdir.path());
info!(
"Created temporary directory at {}",
tmpdir.path().to_string_lossy()
);
} else {
debug!("Created temporary directory at {:?}", tmpdir.path());
debug!(
"Created temporary directory at {}",
tmpdir.path().to_string_lossy()
);
}

let mut output: Box<dyn Write> = if args.output == "-" {
Expand Down Expand Up @@ -74,10 +81,14 @@ fn main() -> Result<()> {
unreachable!("No strategy could be determined. Please raise an issue at <https://github.com/mbhall88/lrge/issues>")
};

let (low_q, estimate, upper_q) = strategy
let est_result = strategy
.estimate(!args.with_infinity, Some(args.lower_q), Some(args.upper_q))
.context("Failed to generate estimate")?;

let estimate = est_result.median;
let low_q = est_result.lower;
let upper_q = est_result.upper;

match estimate {
Some(est) => {
let formatted_est = format_estimate(est);
Expand Down

0 comments on commit 917b786

Please sign in to comment.