diff --git a/code/pecotmr_integration/ctwas_input_processing.ipynb b/code/pecotmr_integration/ctwas_input_processing.ipynb
deleted file mode 100644
index 18e91c3c..00000000
--- a/code/pecotmr_integration/ctwas_input_processing.ipynb
+++ /dev/null
@@ -1,350 +0,0 @@
-{
- "cells": [
-  {
-   "cell_type": "markdown",
-   "id": "30a9d03f-35de-423c-a8f4-b89c34b2cf07",
-   "metadata": {
-    "kernel": "SoS"
-   },
-   "source": [
-    "# Processing cTWAS Input Files\n",
-    "This pipeline formats and compiles the inputs required for running ctwas-multigroup as part of the pecotmr pipeline integration: TWAS weights, GWAS summary statistics, and LD matrices.\n",
-    "\n",
-    "### Resource-Intensive Steps\n",
-    "- Formatting LD files\n",
-    "- Compiling and formatting weight files\n",
-    "\n",
-    "### Input\n",
-    "- Summary table: From the twas_sparse pipeline, this metafile summarizes gene-context imputability, the best-performing method, and the selected variants.\n",
-    "- LD meta file: We assume that LD matrices are located in the same directory as the LD meta file, with the ld_meta_file and the chrX folders organized at the same hierarchical level.\n",
-    "- GWAS Summary Statistics: This file is post-quality control (QC) and harmonized against LD, with columns `chrom`, `A1`, `A2`, `variant_id`, and `z`.\n",
-    "  - The `variant_id` represents the harmonized ref and alt alleles after QC, formatted as {int(chr):pos:A2:A1}, where A2 is the reference allele and A1 is the alternate allele. Allele QC and harmonization are performed by Haochen Sun.\n",
-    "\n",
-    "### Output\n",
-    "- z_snp (dataframe): A dataframe containing GWAS summary statistics with header `id`, `A1`, `A2`, `z`.\n",
-    "- weights (list): A list of weights for each gene-context pair.\n",
-    "- region_info (dataframe): A meta file for the formatted LD files."
-   ]
-  },
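As a quick illustration of the `variant_id` convention described above, here is a minimal R sketch; the variant value is made up for demonstration, and the `chr` prefix on `id` follows the `format_gwas` step later in this notebook:

```r
# Toy example of the {int(chr):pos:A2:A1} convention described above,
# where A2 is the reference allele and A1 is the alternate allele.
variant_id <- "19:44908684:T:C"        # hypothetical variant
fields <- strsplit(variant_id, ":")[[1]]
chrom <- as.integer(fields[1])         # 19
pos   <- as.integer(fields[2])         # 44908684
A2    <- fields[3]                     # reference allele "T"
A1    <- fields[4]                     # alternate allele "C"
# The z_snp output uses a "chr"-prefixed form of the same identifier:
id <- paste0("chr", variant_id)        # "chr19:44908684:T:C"
```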
-  {
-   "cell_type": "markdown",
-   "id": "f3b7145f-ae33-4a85-9ef3-96d40f64947e",
-   "metadata": {
-    "kernel": "SoS"
-   },
-   "source": [
-    "### Example Command"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "0e2edb10-ce80-4248-a3a2-5cef5c15ed7e",
-   "metadata": {
-    "kernel": "Bash",
-    "tags": []
-   },
-   "outputs": [],
-   "source": [
-    "sos run xqtl-pipeline/code/pecotmr_integration/ctwas_input_processing.ipynb \\\n",
-    "    --cwd /mnt/vast/hpc/csg/cl4215/mrmash/workflow/ctwas_test \\\n",
-    "    --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_test.tsv \\\n",
-    "    --gwas_sumstat /mnt/vast/hpc/csg/cl4215/mrmash/workflow/genes_300/ctwas/gwas_wg.tsv \\\n",
-    "    --summary_table /mnt/vast/hpc/csg/cl4215/mrmash/workflow/twas_mr/pipeline/sparse/twas_sparse/TADB_enhanced_cis.coding.summary_table.tsv \\\n",
-    "    -s build"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "56c84caa-6bdb-40c2-a345-89c3f6159b0f",
-   "metadata": {
-    "kernel": "SoS"
-   },
-   "outputs": [],
-   "source": [
-    "[global]\n",
-    "# Parameter definitions\n",
-    "parameter: cwd = path(\"output/\")\n",
-    "parameter: container = \"\"\n",
-    "parameter: entrypoint = ('micromamba run -a \"\" -n' + ' ' + re.sub(r'(_apptainer:latest|_docker:latest|\\.sif)$', '', container.split('/')[-1])) if container else \"\"\n",
-    "parameter: customized_association_windows = path()\n",
-    "parameter: ld_meta_data = path()\n",
-    "parameter: summary_table = path()\n",
-    "parameter: gwas_sumstat = path()\n",
-    "parameter: weight_input = path()\n",
-    "parameter: job_size = 10\n",
-    "parameter: walltime = \"15h\"\n",
-    "parameter: mem = \"35G\"\n",
-    "parameter: numThreads = 2\n",
-    "ld_outdir = f\"{cwd}/LD\"\n",
-    "temp_dir = f\"{cwd}/temp\"\n",
-    "# Ensure the LD and temp output directories exist\n",
-    "import os\n",
-    "if not os.path.exists(ld_outdir):\n",
-    "    os.makedirs(ld_outdir)\n",
-    "if not os.path.exists(temp_dir):\n",
-    "    os.makedirs(temp_dir)\n",
-    "import pandas as pd"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "694b3b5e-2786-424e-bce5-e40e12934af9",
-   "metadata": {
-    "kernel": "SoS"
-   },
-   "outputs": [],
-   "source": [
-    "[default]\n",
-    "sos_run(['format_ld', 'format_gwas', 'format_weight'])\n",
-    "sos_run(['Variants_Update'])"
-   ]
-  },
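The `format_ld` steps that follow derive one region per row of the LD meta file and expect the per-chromosome `.cor.xz` matrices to sit next to it, per the directory assumption stated in the Input section. A small R sketch of that path convention; the column names mirror the `format_ld_1` code below, while the meta file path and its contents are assumptions:

```r
# Sketch: derive region IDs and matrix paths from the LD meta file,
# mirroring the ld_dir/chrXX/chrXX_start_end.cor.xz layout used in format_ld_1.
ld_meta_data <- "/path/to/ld_meta_file.tsv"  # hypothetical path
# The meta file carries #chrom/start/end columns; with comment.char = "" the
# header line is read, and check.names mangles "#chrom" to "X.chrom".
meta <- read.table(ld_meta_data, sep = "\t", header = TRUE, comment.char = "")
region <- paste(meta$X.chrom, meta$start, meta$end, sep = "_")  # e.g. "chr1_16103_2888443"
cor_files <- file.path(dirname(ld_meta_data),
                       sub("_.*$", "", region),   # chrXX folder beside the meta file
                       paste0(region, ".cor.xz"))
```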
\n", - " meta_file_path <- paste0('${ld_outdir}', \"/LD_region_info.txt\")\n", - " if (file.exists(meta_file_path)){\n", - " # Remove the file\n", - " file.remove(meta_file_path)\n", - " }" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "3b5303eb-7bdf-4a7f-804a-8460dc3b3991", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "[format_ld_2]\n", - "# This step will consolidate all processed LD information into a single file\n", - "depends: dynamic(f\"{ld_outdir}/region_info_*.stdout\") # Depends on completion of all format_ld tasks\n", - "input: group_by = 'all'\n", - "output: f\"{ld_outdir}/LD_region_info.txt\"\n", - "R: expand='${ }', container=container, entrypoint=entrypoint\n", - " library(pecotmr)\n", - " # Call the function to process all region files and save the result\n", - " processed_ld_info <- get_dir_region_info('${ld_outdir}')\n", - " write.table(processed_ld_info, ${_output:r}, sep=\"\\t\", col.names=TRUE, row.names=FALSE, quote=FALSE)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ff99f04f-d296-45a1-b604-8385af8a5d44", - "metadata": { - "kernel": "SoS", - "tags": [] - }, - "outputs": [], - "source": [ - "[format_weight_1]\n", - "# Read the summary report and prepare for processing\n", - "summary_df = pd.read_csv(summary_table, sep='\\t')\n", - "# Create a unique identifier by concatenating 'study' and 'gene' columns\n", - "summary_df['study_gene'] = summary_df['study'] + ':' + summary_df['gene']\n", - "study_gene_indices = summary_df['study_gene'].tolist()\n", - " \n", - "input: for_each='study_gene_indices'\n", - "output: f\"{temp_dir}/output_{_study_gene_indices}.rds\"\n", - "task: trunk_workers = 4, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand='${ }', stdout = f\"{temp_dir}/weight_processing_{_index}.stdout\", stderr = f\"{temp_dir}/weight_processing_{_index}.stderr\", container = container, entrypoint = entrypoint \n", - " library(pecotmr)\n", - " # get analysis unit, _index maps directly to xqtl_indices in sos\n", - " study <- strsplit('${_study_gene_indices}', ':')[[1]][1]\n", - " gene <- strsplit('${_study_gene_indices}', ':')[[1]][2]\n", - " summary_df <- read.table('${summary_table}', sep=\"\\t\", header=TRUE)\n", - " summary_report_unit <- summary_df[summary_df$study==study & summary_df$gene == gene, ] # Adding 1 because R is 1-indexed\n", - " processed_result <- get_ctwas_input(summary_report_unit, outdir=NULL, outname=NULL, \n", - " weights_input_file = ${\"NULL\" if weight_input=='.' or weight_input =='' else weight_input}, auto_save=FALSE)# This step will altomattically save results. 
\n", - " # save to temporary file \n", - " saveRDS(processed_result, ${_output:r})\n", - " \n", - " # in case of re-running, new will have meta data updated without being ignored.\n", - " merged_weight_file <- paste0(\"${cwd}\",\"/merged_weights_list.rds\")\n", - " if (file.exists(merged_weight_file)){\n", - " # Remove the file\n", - " file.remove(merged_weight_file)\n", - " }" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "5f1e59af-379c-4e69-adab-09543dbde0c2", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "[format_weight_2]\n", - "depends: sos_step('format_weight_1')\n", - "input: f\"{temp_dir}/output_*.rds\", group_by='all'\n", - "output: f\"{cwd}/merged_weights_list.rds\"\n", - "task: trunk_workers = 4, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand='${ }', container=container, entrypoint=entrypoint\n", - " library(purrr)\n", - " # Function to recursively merge nested lists\n", - " merge_nested_lists <- function(list1, list2) {\n", - " if (is.list(list1) && is.list(list2)) {\n", - " common_names <- intersect(names(list1), names(list2))\n", - " unique_names1 <- setdiff(names(list1), names(list2))\n", - " unique_names2 <- setdiff(names(list2), names(list1))\n", - "\n", - " # Combine common elements recursively\n", - " combined <- map(common_names, ~merge_nested_lists(list1[[.x]], list2[[.x]]))\n", - " names(combined) <- common_names # Preserve names for combined elements\n", - "\n", - " # Add unique elements from both lists, ensuring names are preserved\n", - " combined <- c(combined, setNames(list1[unique_names1], unique_names1), setNames(list2[unique_names2], unique_names2))\n", - "\n", - " # Special handling at the 'weights_list' level to concatenate lists\n", - " if (\"weights_list\" %in% common_names) {\n", - " # Concatenate and preserve names within 'weights_list'\n", - " combined_weights <- c(list1$weights_list, list2$weights_list)\n", - " names(combined_weights) <- c(names(list1$weights_list), names(list2$weights_list))\n", - " combined$weights_list <- combined_weights\n", - " }\n", - "\n", - " return(combined)\n", - " } else {\n", - " # For non-list items just concatenate\n", - " return(c(list1, list2))\n", - " }\n", - " }\n", - "\n", - " # Function to merge all RDS files\n", - " merge_all_rds <- function(file_paths) {\n", - " all_lists <- lapply(file_paths, readRDS)\n", - " Reduce(merge_nested_lists, all_lists)\n", - " }\n", - "\n", - " file_paths <- c(${_input:r,})\n", - " merged_list <- merge_all_rds(file_paths)\n", - " weights <- do.call(c, lapply(1:22, function(chr){merged_list[[1]][[paste0(\"chr\", chr)]][[\"weights_list\"]]}))\n", - " weights <- weights[!duplicated(names(weights))]\n", - " saveRDS(weights, ${_output:r})" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0c31f46f-ec19-4421-af35-12d32703921d", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "[format_gwas]\n", - "input: gwas_sumstat\n", - "output: f\"{cwd}/gwas_z_snp.tsv\"\n", - "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand = '${ }', stdout = f\"{cwd}/gwas_z_snp.stdout\", stderr = f\"{cwd}/gwas_z_snp.stderr\", container = container, entrypoint = entrypoint \n", - " z_snp <- read.table(${_input:r}, sep=\"\\t\", header=TRUE)\n", - " z_snp$A1 <- trimws(z_snp$variant_id, whitespace = \".*\\\\:\")\n", - " z_snp$A2 <- sapply(z_snp$variant_id, function(var) {strsplit(var, \"\\\\:\")[[1]][3]})\n", - " z_snp 
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "0c31f46f-ec19-4421-af35-12d32703921d",
-   "metadata": {
-    "kernel": "SoS"
-   },
-   "outputs": [],
-   "source": [
-    "[format_gwas]\n",
-    "input: gwas_sumstat\n",
-    "output: f\"{cwd}/gwas_z_snp.tsv\"\n",
-    "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n",
-    "R: expand = '${ }', stdout = f\"{cwd}/gwas_z_snp.stdout\", stderr = f\"{cwd}/gwas_z_snp.stderr\", container = container, entrypoint = entrypoint\n",
-    "    z_snp <- read.table(${_input:r}, sep=\"\\t\", header=TRUE)\n",
-    "    # variant_id is chr:pos:A2:A1; strip everything up to the last ':' to get A1\n",
-    "    z_snp$A1 <- trimws(z_snp$variant_id, whitespace = \".*\\\\:\")\n",
-    "    z_snp$A2 <- sapply(z_snp$variant_id, function(var) {strsplit(var, \"\\\\:\")[[1]][3]})\n",
-    "    z_snp <- z_snp[, c(\"variant_id\", \"A1\", \"A2\", \"z\")]\n",
-    "    colnames(z_snp)[1] <- \"id\"\n",
-    "    z_snp$id <- paste0(\"chr\", z_snp$id)\n",
-    "    write.table(z_snp, ${_output:r}, sep=\"\\t\", quote=FALSE, row.names=FALSE)\n",
-    "    # In case of re-running, remove the updated file so it is regenerated by Variants_Update\n",
-    "    if (file.exists('${_output:n}_updated.tsv')) {\n",
-    "        file.remove('${_output:n}_updated.tsv')\n",
-    "    }"
-   ]
-  },
-  {
-   "cell_type": "code",
-   "execution_count": null,
-   "id": "03ec6706-e942-4b8a-b3c0-73f25265dc4b",
-   "metadata": {
-    "kernel": "SoS"
-   },
-   "outputs": [],
-   "source": [
-    "[Variants_Update]\n",
-    "depends: sos_step('format_weight_1'), sos_step('format_weight_2'), sos_step('format_ld_1'), sos_step('format_ld_2'), sos_step('format_gwas')\n",
-    "input: f\"{cwd}/gwas_z_snp.tsv\", f\"{cwd}/merged_weights_list.rds\", f\"{ld_outdir}/LD_region_info.txt\", group_by='all'\n",
-    "output: f\"{cwd}/gwas_z_snp_updated.tsv\"\n",
-    "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n",
-    "R: expand = '${ }', stdout = f\"{cwd}/z_snp_variant_update.stdout\", stderr = f\"{cwd}/z_snp_variant_update.stderr\", container = container, entrypoint = entrypoint\n",
-    "    z_snp <- read.table('${_input[0]}', sep=\"\\t\", header=TRUE)\n",
-    "    weights <- readRDS('${_input[1]}')\n",
-    "    region_info <- read.table('${_input[2]}', sep=\"\\t\", header=TRUE)\n",
-    "\n",
-    "    all_wgt_var <- unique(do.call(c, lapply(names(weights), function(genes_con) {\n",
-    "        rownames(weights[[genes_con]]$wgt)\n",
-    "    })))\n",
-    "\n",
-    "    # Check that all SNP variants are present in the LD variants, then update z_snp\n",
-    "    all_ld_var <- do.call(c, lapply(region_info$SNP_info, function(rvar) {\n",
-    "        read.table(rvar, sep=\"\\t\", header=TRUE)$id\n",
-    "    }))\n",
-    "    z_snp <- z_snp[z_snp$id %in% all_ld_var, ]\n",
-    "\n",
-    "    if (!all(all_wgt_var %in% z_snp$id)) {\n",
-    "        stop(\"Weight file '${_input[1]}' includes variants that cannot be found in z_snp.\")\n",
-    "    }\n",
-    "    if (!all(z_snp$id %in% all_ld_var)) {\n",
-    "        stop(\"z_snp includes variants that cannot be found in the LD reference.\")\n",
-    "    }\n",
-    "    write.table(z_snp, '${_output}', sep=\"\\t\", row.names=FALSE, quote=FALSE)"
-   ]
-  }
- ],
- "metadata": {
-  "kernelspec": {
-   "display_name": "SoS",
-   "language": "sos",
-   "name": "sos"
-  },
-  "language_info": {
-   "codemirror_mode": "sos",
-   "file_extension": ".sos",
-   "mimetype": "text/x-sos",
-   "name": "sos",
-   "nbconvert_exporter": "sos_notebook.converter.SoS_Exporter",
-   "pygments_lexer": "sos"
-  },
-  "sos": {
-   "kernels": [
-    [
-     "SoS",
-     "sos",
-     "",
-     ""
-    ]
-   ],
-   "version": "0.24.3"
-  }
- },
- "nbformat": 4,
- "nbformat_minor": 5
-}
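Once the pipeline finishes, the three outputs named in the header can be sanity-checked before handing them to ctwas-multigroup. A minimal sketch; the path assumes the `--cwd` from the example command, and the column and list layouts follow the steps above:

```r
# Sketch: load and sanity-check the three pipeline outputs.
cwd <- "/mnt/vast/hpc/csg/cl4215/mrmash/workflow/ctwas_test"  # from the example command
z_snp <- read.table(file.path(cwd, "gwas_z_snp_updated.tsv"), sep = "\t", header = TRUE)
stopifnot(identical(colnames(z_snp), c("id", "A1", "A2", "z")))
weights <- readRDS(file.path(cwd, "merged_weights_list.rds"))  # one entry per gene-context pair
stopifnot(!any(duplicated(names(weights))))
region_info <- read.table(file.path(cwd, "LD", "LD_region_info.txt"), sep = "\t", header = TRUE)
# Every weight variant should carry a z-score (the Variants_Update invariant).
all_wgt_var <- unique(unlist(lapply(weights, function(w) rownames(w$wgt))))
stopifnot(all(all_wgt_var %in% z_snp$id))
```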