add more documentation #72

Open
timosachsenberg opened this issue Jun 24, 2024 · 0 comments

I used Claude to document the code (for reviewing purposes).
It is not tested, but I thought I'd leave it here in case it is considered useful.

import logging
import os
from pathlib import Path

import numpy as np
import pandas as pd

from ibaqpy.ibaq.ibaqpy_commons import load_feature, load_sdrf
from ibaqpy.ibaq.utils import (
    apply_batch_correction,
    compute_pca,
    fill_samples,
    filter_missing_value_by_group,
    folder_retrieval,
    generate_meta,
    get_batch_info_from_sample_names,
    impute_missing_values,
    iterative_outlier_removal,
    plot_pca,
    remove_single_sample_batches,
    split_df_by_column,
)

# Set up logging
logging.basicConfig(
    format="%(asctime)s [%(funcName)s] - %(message)s", level=logging.DEBUG
)
logger = logging.getLogger(__name__)

class Combiner:
    """
    A class for combining and processing proteomics data from multiple sources.
    
    This class handles data loading, imputation, outlier removal, and batch correction
    for proteomics data, particularly focusing on iBAQ (intensity-based absolute quantification) results.
    """

    def __init__(self, data_folder: os.PathLike, covariate: str = None, organism: str = "HUMAN"):
        """
        Initialize the Combiner object.

        Args:
            data_folder (os.PathLike): Path to the folder containing the data files.
            covariate (str, optional): The covariate to use for analysis. Defaults to None.
            organism (str, optional): The organism to filter for. Defaults to "HUMAN".
        """
        self.data_folder = Path(data_folder)
        if not self.data_folder.exists() or not self.data_folder.is_dir():
            raise FileNotFoundError(f"Data folder {self.data_folder} does not exist!")
        
        self.covariate = covariate
        self.organism = organism
        
        # Initialize other attributes
        self.df_pca = None
        self.df_corrected = None
        self.df_filtered_outliers = None
        self.batch_index = None
        self.samples_number = None
        self.datasets = None
        self.df = None
        self.metadata = None
        
        # Load and process data
        self._load_data()

    def _load_data(self):
        """Load and process SDRF and iBAQ data from the specified data folder."""
        logger.info("Combining SDRFs and ibaq results ...")
        files = folder_retrieval(str(self.data_folder))
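        # Assumption (inferred from the lookups below): folder_retrieval returns
        # a dict with "sdrf" and "ibaq" keys, each a list of file paths, e.g.
        #   {"sdrf": ["PXD000001.sdrf.tsv"], "ibaq": ["PXD000001.ibaq.csv"]}
        # (file names illustrative only).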
        
        # Load and combine metadata
        self.metadata = pd.concat([generate_meta(load_sdrf(sdrf)) for sdrf in files["sdrf"]])
        self.metadata = self.metadata.drop_duplicates()
        self.metadata.index = self.metadata["sample_id"]
        
        # Load and combine iBAQ data
        self.df = pd.concat([load_feature(ibaq) for ibaq in files["ibaq"]])
        self.df = self.df[self.df["ProteinName"].str.endswith(self.organism)]
        self.df.index = self.df["SampleID"]
        
        # Join metadata with iBAQ data
        self.df = self.df.join(self.metadata, how="left")
        
        # Set up additional attributes
        self.samples = self.df.columns.tolist()
        self.proteins = self.df["ProteinName"].unique().tolist()
        self.batch_index = get_batch_info_from_sample_names(self.df.columns)

    def read_data(self, meta: str, ibaq: str, organism="HUMAN", covariate=None):
        """
        Read metadata and iBAQ data from local files.

        Args:
            meta (str): Path to the metadata file.
            ibaq (str): Path to the iBAQ data file.
            organism (str, optional): Organism to filter for. Defaults to "HUMAN".
            covariate (str, optional): Covariate to use for analysis. Defaults to None.
        """
        self.covariate = covariate
        self.df = pd.read_csv(ibaq, index_col=0)
        self.metadata = pd.read_csv(meta)
        self.df = self.df[self.df["ProteinName"].str.endswith(organism)]
        self.df.index = self.df["SampleID"]
        self.metadata = self.metadata.drop_duplicates()
        self.df = self.df.join(self.metadata, how="left")

    def imputer(self, covariate_to_keep: list = None):
        """
        Impute missing values in the iBAQ data.

        Args:
            covariate_to_keep (list, optional): List of covariate values to keep. Defaults to None.
        """
        logger.info("Imputing merged ibaq results ...")

        if self.covariate and len(self.metadata[self.covariate].unique()) < 2:
            raise SystemExit(f"{self.covariate} should contain at least two distinct values!")

        # Filter data based on covariate_to_keep
        if covariate_to_keep:
            self.df = self.df[self.df[self.covariate].isin(covariate_to_keep)]

        # Filter out proteins with too many missing values
        self.df = filter_missing_value_by_group(
            self.df, col="ProteinName", non_missing_percent_to_keep=0.3
        )

        # Impute missing values
        if self.covariate:
            df_list = split_df_by_column(self.df, cov_index_col=self.covariate)
            df_list = [fill_samples(df, self.proteins) for df in df_list]
            df_list = impute_missing_values(df_list)
            self.df = pd.concat(df_list, axis=1)
        else:
            self.df = fill_samples(self.df, self.proteins)
            self.df = impute_missing_values(self.df)

        self.datasets = list(set([sample.split("-")[0] for sample in self.samples]))
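        # Illustrative example: samples ["PXD000001-S1", "PXD000001-S2",
        # "PXD000002-S1"] yield datasets ["PXD000001", "PXD000002"].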

    def outlier_removal(self, n_components: int = None, min_cluster_size: int = None,
                        min_samples_num: int = None, n_iter: int = None):
        """
        Remove outliers from the imputed data using iterative outlier removal.

        Args:
            n_components (int, optional): Number of PCA components to use. Defaults to round(number of batches / 3).
            min_cluster_size (int, optional): Minimum cluster size for HDBSCAN. Defaults to max(round(median per-dataset sample count), 2).
            min_samples_num (int, optional): Minimum number of samples for HDBSCAN. Defaults to the same value as min_cluster_size.
            n_iter (int, optional): Number of iterations for outlier removal. Defaults to 5.
        """
        logger.info("Removing outliers from imputed data ...")

        # Calculate sample numbers per dataset
        batches = [sample.split("-")[0] for sample in self.samples]
        self.samples_number = {dataset: batches.count(dataset) for dataset in self.datasets}
        min_samples = max(round(np.median(list(self.samples_number.values()))), 2)
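        # Illustrative arithmetic: with per-dataset sample counts [4, 6, 8],
        # np.median(...) is 6.0, so min_samples = max(round(6.0), 2) = 6.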

        # Apply iterative outlier removal
        self.df_filtered_outliers = iterative_outlier_removal(
            self.df,
            self.batch_index,
            n_components=n_components or round(len(set(self.batch_index)) / 3),
            min_cluster_size=min_cluster_size or min_samples,
            min_samples=min_samples_num or min_samples,
            n_iter=n_iter or 5,
        )

        # Compute and plot PCA of corrected data with outliers removed
        self._compute_and_plot_pca(self.df_filtered_outliers, "PCA plot of corrected data with outliers removed",
                                   "pca_corrected_outliers_removed.png", n_components)

    def batch_correction(self, n_components: int = None, tissue_parts_to_keep: list = None):
        """
        Apply batch correction to the data.

        Args:
            n_components (int, optional): Number of PCA components to use. Defaults to round(number of batches / 3).
            tissue_parts_to_keep (list, optional): Tissue parts to keep; samples from other tissue parts are dropped. Defaults to None.
        """
        logger.info("Applying batch effect correction ...")

        # Plot PCA of uncorrected data
        self._compute_and_plot_pca(self.df, "PCA plot of uncorrected data", "pca_uncorrected.png", n_components)

        # Filter samples based on tissue parts if specified
        if tissue_parts_to_keep:
            self._filter_samples_by_tissue(tissue_parts_to_keep)

        # Prepare data for batch correction
        self._prepare_for_batch_correction()

        # Apply batch correction
        self.df_corrected = apply_batch_correction(
            self.df, self.batch_index, 
            covs=self.metadata[self.covariate].tolist() if self.covariate else []
        )

        # Plot PCA of corrected data
        self._compute_and_plot_pca(self.df_corrected, "PCA plot of corrected data", "pca_corrected.png", n_components)

    def _compute_and_plot_pca(self, data, title, output_file, n_components):
        """
        Compute PCA and plot the results.

        Args:
            data (pd.DataFrame): Data to compute PCA on.
            title (str): Title for the PCA plot.
            output_file (str): Filename for saving the PCA plot.
            n_components (int, optional): Number of PCA components to compute. If None, defaults to round(number of batches / 3).
        """
        n_components = n_components or round(len(set(self.batch_index)) / 3)
        self.df_pca = compute_pca(data.T, n_components=n_components)
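        # Assumption: sample IDs encode the batch as the prefix before the
        # first "-", e.g. "PXD000001-Sample-1" -> batch "PXD000001".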
        self.df_pca["batch"] = self.df_pca.index.str.split("-").str[0]
        plot_pca(self.df_pca, title=title, output_file=output_file)

    def _filter_samples_by_tissue(self, tissue_parts_to_keep):
        """
        Filter samples based on specified tissue parts.

        Args:
            tissue_parts_to_keep (list): Tissue parts to keep; samples whose tissue_part is not in this list are dropped.
        """
        self.metadata = self.metadata[self.metadata["tissue_part"].isin(tissue_parts_to_keep)]
        samples_to_keep = self.metadata["sample_id"].tolist()
        self.df = self.df[[s for s in self.df.columns if s in samples_to_keep]]

    def _prepare_for_batch_correction(self):
        """Prepare data for batch correction by removing single sample batches and aligning metadata."""
        self.batch_index = get_batch_info_from_sample_names(self.df.columns.tolist())
        self.df = remove_single_sample_batches(self.df, self.batch_index)
        
        columns = self.df.columns.tolist()
        self.metadata = self.metadata[self.metadata["sample_id"].isin(columns)]
        self.metadata = self.metadata.reset_index(drop=True)
        self.metadata = (
            self.metadata.set_index("sample_id").reindex(columns, axis=0).reset_index()
        )
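
A minimal usage sketch of the class above (the folder path and covariate name are illustrative assumptions; like the documentation itself, this is untested):

combiner = Combiner(data_folder="ibaq_results/", covariate="tissue")
combiner.imputer()
combiner.outlier_removal()
combiner.batch_correction()
print(combiner.df_corrected.head())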