From f8c2c2687d2c0cc65f5980f842180c071534b38e Mon Sep 17 00:00:00 2001
From: Chunmingl
Date: Wed, 25 Sep 2024 02:22:00 -0400
Subject: [PATCH 1/2] merge pipelines, clean up, ctwas data formatting

---
 code/pecotmr_integration/ctwas.ipynb          | 236 ------------
 .../ctwas_input_processing.ipynb              | 350 ------------------
 code/pecotmr_integration/twas.ipynb           | 159 ++++++--
 3 files changed, 125 insertions(+), 620 deletions(-)
 delete mode 100644 code/pecotmr_integration/ctwas.ipynb
 delete mode 100644 code/pecotmr_integration/ctwas_input_processing.ipynb

diff --git a/code/pecotmr_integration/ctwas.ipynb b/code/pecotmr_integration/ctwas.ipynb
deleted file mode 100644
index f1ec9009..00000000
--- a/code/pecotmr_integration/ctwas.ipynb
+++ /dev/null
@@ -1,236 +0,0 @@
-{
- "cells": [
- {
- "cell_type": "markdown",
- "id": "996383b6-1060-4c66-8314-f8b33b1b5f6e",
- "metadata": {
- "kernel": "SoS",
- "tags": []
- },
- "source": [
- "# cTWAS pipeline\n",
- "This module provides an interface for multigroup cTWAS (causal TWAS) analysis using multi-context TWAS results across all regions.\n",
- "\n",
- "```\n",
- "Zhao S, Crouse W, Qian S, Luo K, Stephens M, He X. Adjusting for genetic confounders in transcriptome-wide association studies improves discovery of risk genes of complex traits. Nat Genet (2024). https://doi.org/10.1038/s41588-023-01648-9\n",
- "```\n",
- "#### Main Steps\n",
- "1. Harmonize TWAS weights and GWAS summary statistics against the LD reference panel. \n",
- "2. Pre-process the harmonized data into the cTWAS input format. \n",
- "3. Perform cTWAS analysis with LD."
- ]
- },
- {
- "cell_type": "markdown",
- "id": "f3921b69-c069-4419-8aa0-76a7f0920a3c",
- "metadata": {
- "kernel": "SoS"
- },
- "source": [
- "## Input\n",
- "- **LD metadata**: metadata from LD processing, with column names `#chrom`, `start`, `end`, `path`. \n",
- "- **regions data**: dataframe of LD block metadata, with column names `chr`, `start`, `stop`. \n",
- "- **xqtl_region_data**: a dataframe pairing each region with the file path of the corresponding `refined_twas_weights_data` (`*.twas.rds`) output from the TWAS pipeline. \n",
- "- **gwas_meta_file**: a dataframe of GWAS summary statistics file paths, organized by chromosome. \n",
- "\n",
- "## Output\n",
- "- A dataframe of cTWAS fine-mapping results for SNPs and genes.\n",
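- "\n",
- "A minimal sketch of loading the saved results in R (the path and the `demo` name are hypothetical; the per-study list structure follows this module's output step):\n",
- "\n",
- "```r\n",
- "# hypothetical path: {cwd}/ctwas_1/{name}_ctwas.rds with name = demo\n",
- "ctwas_res <- readRDS(\"output/ctwas_1/demo_ctwas.rds\")\n",
- "names(ctwas_res)                    # one element per GWAS study\n",
- "str(ctwas_res[[1]], max.level = 1)  # cTWAS fine-mapping results for one study\n",
- "```"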
- ]
- },
- {
- "cell_type": "markdown",
- "id": "1145df9c-f481-453d-89b1-71ec6c41ae65",
- "metadata": {
- "kernel": "SoS"
- },
- "source": [
- "## Example\n",
- "```\n",
- "sos run ~/githubrepo/xqtl-protocol/code/pecotmr_integration/ctwas.ipynb ctwas \\\n",
- "--cwd /mnt/vast/hpc/csg/cl4215/mrmash/workflow/ \\\n",
- "--xqtl_region_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/susie_twas/regional_xqtl_data.tsv \\\n",
- "--regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/twas_mr/pipeline/EUR_LD_blocks_test.bed \\\n",
- "--ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \\\n",
- "--gwas_meta_file /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv\n",
- "```"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "d978d4d5-c0c9-470b-b733-ab83028bfafa",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "[global]\n",
- "parameter: cwd = path(\"output/\")\n",
- "parameter: ld_meta_data = path()\n",
- "parameter: gwas_meta_file = path()\n",
- "# region info for the input refined_twas_weights_data\n",
- "parameter: xqtl_region_data = path()\n",
- "# LD regions for the input data\n",
- "parameter: regions = path()\n",
- "parameter: name = f\"{xqtl_region_data:bn}\"\n",
- "parameter: container = ''\n",
- "parameter: entrypoint= ('micromamba run -a \"\" -n' + ' ' + re.sub(r'(_apptainer:latest|_docker:latest|\\.sif)$', '', container.split('/')[-1])) if container else \"\"\n",
- "parameter: job_size = 100\n",
- "parameter: walltime = \"3h\"\n",
- "parameter: mem = \"20G\"\n",
- "parameter: numThreads = 1\n",
- "import os\n",
- "import pandas as pd\n",
- "from collections import OrderedDict\n",
- "\n",
- "def check_required_columns(df, required_columns):\n",
- "    \"\"\"Check if the required columns are present in the dataframe.\"\"\"\n",
- "    missing_columns = [col for col in required_columns if col not in list(df.columns)]\n",
- "    if missing_columns:\n",
- "        raise ValueError(f\"Missing required columns: {', '.join(missing_columns)}\")\n",
- "required_xqtl_region_data_columns = ['chrom','start','end','file_path']\n",
- "required_ld_columns = ['chr', 'start', 'stop']"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "be63c87f-a6d1-45d8-8af7-e35ee0f5597f",
- "metadata": {
- "kernel": "SoS",
- "tags": []
- },
- "outputs": [],
- "source": [
- "[ctwas_1]\n",
- "# In this step we load and format the input data for each LD region\n",
- "regions_df = pd.read_csv(regions, sep=\"\\t\", skipinitialspace=True)\n",
- "regions_df.columns = [col.strip() for col in regions_df.columns]\n",
- "regions_df['chr'] = regions_df['chr'].str.strip()\n",
- "xqtl_region_data = pd.read_csv(xqtl_region_data, sep=\"\\t\")\n",
- "# check for required columns\n",
- "check_required_columns(regions_df, required_ld_columns)\n",
- "check_required_columns(xqtl_region_data, required_xqtl_region_data_columns)\n",
- "\n",
- "# Create a dictionary to map each LD region to its corresponding xQTL file paths\n",
- "region_xqtl_dict = OrderedDict()\n",
- "for _, region in regions_df.iterrows():\n",
- "    region_id = f\"{region['chr']}_{region['start']}_{region['stop']}\"\n",
- "    matching_files = xqtl_region_data[\n",
- "        (xqtl_region_data['chrom'] == region['chr']) & (xqtl_region_data['start'] == region['start']) & \n",
- "        (xqtl_region_data['end'] == region['stop'])]['file_path'].tolist()\n",
- "    region_xqtl_dict[region_id] = matching_files\n",
- "# Generate inputs for the next steps\n",
- "region_files = [file for files in region_xqtl_dict.values() for file in 
files]\n", - "region_ids = [region_id for region_id, files in region_xqtl_dict.items() for file in files]\n", - "region_ids_str = ','.join(f'\"{region_id}\"' for region_id in region_ids)\n", - "\n", - "if len(region_files) != len(region_ids):\n", - " raise ValueError(\"Mismatch between region_files and region_ids lengths\")\n", - "\n", - "input: region_files, paired_with=['region_ids_str']\n", - "output: f'{cwd:a}/{step_name}/{name}_ctwas.rds', f'{cwd:a}/{step_name}/{name}_weights.rds'\n", - "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'\n", - "R: expand = '${ }', stdout = f\"{_output[0]:n}.stdout\", stderr = f\"{_output[0]:n}.stderr\", container = container, entrypoint = entrypoint\n", - "\n", - " library(IRanges)\n", - " library(R6)\n", - " library(data.table)\n", - " devtools::load_all(\"/home/cl4215/githubrepo/pecotmr/\") #library(pecotmr)\n", - " devtools::load_all(\"/home/cl4215/githubrepo/ctwas_multigroup/ctwas/\")#library(ctwas)\n", - " \n", - " # Step1: Harmonize weights and gwas\n", - " twas_weights_data <- lapply(c(${_input:r,}), readRDS)\n", - " post_qc_data <- lapply(twas_weights_data, function(twas_data){\n", - " harmonize_twas(twas_data, \"${ld_meta_data}\", \"${gwas_meta_file}\", refined_twas_weights_loader, scale_weights=TRUE)\n", - " })\n", - " gwas_studies <- unique(names(find_data(post_qc_data, c(3, \"gwas_qced\"))))\n", - " saveRDS(post_qc_data, ${_output[1]:r})\n", - " \n", - " # Step2: Preprocess weights, LD variants data. \n", - " weights <- do.call(c, lapply(post_qc_data, function(twas_data){\n", - " get_ctwas_weights(twas_data, \"${ld_meta_data}\")# reshape weights for all gene-context pairs in the region for cTWAS analysis\n", - " }))\n", - " weights <- weights[!sapply(weights, is.null)]\n", - " \n", - " # get region_info and snp_info: LD block meta info and variant - bim file data.\n", - " region_of_interest <- region_to_df(c(${region_ids_str}))\n", - " meta_data <- get_ctwas_meta_data(\"${ld_meta_data}\", \"${regions}\")\n", - " region_info <- meta_data$region_info\n", - " LD_info <- meta_data$LD_info\n", - " \n", - " # load LD variants\n", - " bim_file_paths <- unique(do.call(c, lapply(1:nrow(region_of_interest), function(region_row){\n", - " get_regional_ld_meta(\"${ld_meta_data}\", region_of_interest[region_row,,drop=FALSE])$intersections$bim_file_paths})))\n", - " snp_info <- lapply(bim_file_paths, function(file){\n", - " bimfile <- read.table(file, header = FALSE, sep=\"\\t\")[, c(1,2,4:8)]# original colnames: \"chrom\", \"variants\", \"GD\", \"pos\", \"A1\", \"A2\", \"variance\", \"allele_freq\", \"n_nomiss\"\n", - " bimfile$V2 <- gsub(\"chr\", \"\", gsub(\"_\", \":\", bimfile$V2))\n", - " colnames(bimfile) <- c(\"chrom\", \"id\", \"pos\", \"alt\", \"ref\", \"variance\", \"allele_freq\") # A1:alt, A2: ref\n", - " return(bimfile)})\n", - " names(snp_info)<- do.call(c, lapply(bim_file_paths, function(x) { parts <- strsplit(basename(x), \"[_:/.]\")[[1]][1:3]\n", - " gsub(\"chr\", \"\", paste(parts, collapse=\"_\"))}))\n", - " \n", - " # Step3: Simple cTWAS with LD for all regions\n", - " ctwas_res <- list()\n", - " for (study in gwas_studies){\n", - " gwas_z <- do.call(rbind, lapply(post_qc_data, function(x) find_data(x, c(2, \"gwas_qced\", study), docall=rbind)))\n", - " colnames(gwas_z)[which(colnames(gwas_z)==\"variant_id\")] <- \"id\"\n", - " z_snp <- gwas_z[, c(\"id\", \"A1\", \"A2\", \"z\")]\n", - " z_snp <- z_snp[!duplicated(z_snp$id), ]\n", - " # compute 
gene z-scores for each study from its z_snp \n",
- "    z_gene <- compute_gene_z(z_snp, weights, ncore = ${numThreads}, logfile = file.path(${_output[0]:dr}, paste0(\"${name}\", \".compute_gene_z.log\")))\n",
- "    weights_update <- weights[names(weights)[!names(weights) %in% z_gene$id[is.na(z_gene$z)]]] # remove gene-context pairs with NA results\n",
- "    z_gene <- z_gene[!is.na(z_gene$z),] \n",
- "    ctwas_res[[study]] <- ctwas_sumstats(z_snp,\n",
- "                                  weights_update,\n",
- "                                  region_info,\n",
- "                                  snp_info,\n",
- "                                  LD_info,\n",
- "                                  z_gene = z_gene,\n",
- "                                  thin = 1,\n",
- "                                  ncore = ${numThreads},\n",
- "                                  outputdir = ${_output[0]:dr},\n",
- "                                  outname = \"${name}\",\n",
- "                                  logfile = file.path(${_output[0]:dr}, paste0(\"${name}\", \".ctwas_sumstats.log\")),\n",
- "                                  verbose = FALSE, \n",
- "                                  cor_dir = NULL,\n",
- "                                  save_cor = TRUE,\n",
- "                                  screen_method=\"nonSNP_PIP\",\n",
- "                                  LD_format=\"custom\", \n",
- "                                  LD_loader_fun = ctwas_ld_loader)\n",
- "    }\n",
- "\n",
- "    # Step 4: save results \n",
- "    saveRDS(ctwas_res, ${_output[0]:r})"
- ]
- }
- ],
- "metadata": {
- "kernelspec": {
- "display_name": "SoS",
- "language": "sos",
- "name": "sos"
- },
- "language_info": {
- "codemirror_mode": "sos",
- "file_extension": ".sos",
- "mimetype": "text/x-sos",
- "name": "sos",
- "nbconvert_exporter": "sos_notebook.converter.SoS_Exporter",
- "pygments_lexer": "sos"
- },
- "sos": {
- "kernels": [
- [
- "SoS",
- "sos",
- "",
- ""
- ]
- ],
- "version": "0.24.3"
- }
- },
- "nbformat": 4,
- "nbformat_minor": 5
-}

diff --git a/code/pecotmr_integration/ctwas_input_processing.ipynb b/code/pecotmr_integration/ctwas_input_processing.ipynb
deleted file mode 100644
index 18e91c3c..00000000
--- a/code/pecotmr_integration/ctwas_input_processing.ipynb
+++ /dev/null
@@ -1,350 +0,0 @@
-{
- "cells": [
- {
- "cell_type": "markdown",
- "id": "30a9d03f-35de-423c-a8f4-b89c34b2cf07",
- "metadata": {
- "kernel": "SoS"
- },
- "source": [
- "# Processing cTWAS Input Files\n",
- "This pipeline formats and compiles the inputs required for running ctwas-multigroup as part of the pecotmr pipeline integration: TWAS weights, GWAS summary statistics, and LD matrices. \n",
- "\n",
- "### Resource-Intensive Steps\n",
- "- Formatting LD files. \n",
- "- Compiling and formatting weight files. \n",
- "\n",
- "### Input\n",
- "- Summary table: From the twas_sparse pipeline, this metafile summarizes gene-context imputability, the best-performing method, and the selected variants.\n",
- "- LD meta file: Assumes that LD matrices are located in the same directory as the LD meta file, with the ld_meta_file and the chrX folders at the same hierarchical level. \n",
- "- GWAS Summary Statistics: This file is post-quality control (QC) and harmonized against the LD reference, with columns `chrom`, `A1`, `A2`, `variant_id`, `z`. \n",
- "    - The `variant_id` represents the harmonized ref and alt alleles after QC, formatted as {int(chr):pos:A2:A1}, where A2 is the reference allele and A1 is the alternate allele. Allele QC and harmonization are performed by Haochen Sun. \n",
- "\n",
- "### Output\n",
- "- z_snp (dataframe): A dataframe containing GWAS summary statistics with columns `id`, `A1`, `A2`, `z`. \n",
- "- weights (list): A list of weights for each gene-context pair.\n",
- "- region_info (dataframe): A meta file for formatted LD files.\n",
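- "\n",
- "For example, under the {int(chr):pos:A2:A1} convention a `variant_id` of `11:85867875:G:A` gives A2 = G (ref) and A1 = A (alt). A minimal parsing sketch (a simplified stand-in for the `format_gwas` step below; the toy variant and dataframe are illustrative only):\n",
- "\n",
- "```r\n",
- "z_snp <- data.frame(variant_id = \"11:85867875:G:A\", z = 1.5)\n",
- "parts <- strsplit(z_snp$variant_id, \":\")\n",
- "z_snp$A2 <- sapply(parts, `[`, 3)  # reference allele\n",
- "z_snp$A1 <- sapply(parts, `[`, 4)  # alternate allele\n",
- "z_snp$id <- paste0(\"chr\", z_snp$variant_id)  # id format used downstream\n",
- "```"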
- ]
- },
- {
- "cell_type": "markdown",
- "id": "f3b7145f-ae33-4a85-9ef3-96d40f64947e",
- "metadata": {
- "kernel": "SoS"
- },
- "source": [
- "### Example Command"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "0e2edb10-ce80-4248-a3a2-5cef5c15ed7e",
- "metadata": {
- "kernel": "Bash",
- "tags": []
- },
- "outputs": [],
- "source": [
- "sos run xqtl-pipeline/code/pecotmr_integration/ctwas_input_processing.ipynb \\\n",
- "    --cwd /mnt/vast/hpc/csg/cl4215/mrmash/workflow/ctwas_test \\\n",
- "    --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_test.tsv \\\n",
- "    --gwas_sumstat /mnt/vast/hpc/csg/cl4215/mrmash/workflow/genes_300/ctwas/gwas_wg.tsv \\\n",
- "    --summary_table /mnt/vast/hpc/csg/cl4215/mrmash/workflow/twas_mr/pipeline/sparse/twas_sparse/TADB_enhanced_cis.coding.summary_table.tsv \\\n",
- "    -s build"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "56c84caa-6bdb-40c2-a345-89c3f6159b0f",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "[global]\n",
- "# Parameter definitions\n",
- "parameter: cwd = path(\"output/\")\n",
- "parameter: container = \"\"\n",
- "parameter: entrypoint = ('micromamba run -a \"\" -n' + ' ' + re.sub(r'(_apptainer:latest|_docker:latest|\\.sif)$', '', container.split('/')[-1])) if container else \"\"\n",
- "parameter: customized_association_windows = path()\n",
- "parameter: ld_meta_data = path()\n",
- "parameter: summary_table = path()\n",
- "parameter: gwas_sumstat = path()\n",
- "parameter: weight_input = path()\n",
- "parameter: job_size = 10\n",
- "parameter: walltime = \"15h\"\n",
- "parameter: mem = \"35G\"\n",
- "parameter: numThreads = 2\n",
- "ld_outdir = f\"{cwd}/LD\"\n",
- "temp_dir = f\"{cwd}/temp\"\n",
- "# ensure LD and temp output directories exist\n",
- "import os\n",
- "if not os.path.exists(ld_outdir):\n",
- "    os.makedirs(ld_outdir)\n",
- "if not os.path.exists(temp_dir):\n",
- "    os.makedirs(temp_dir)\n",
- "import pandas as pd"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "694b3b5e-2786-424e-bce5-e40e12934af9",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "[default]\n",
- "sos_run(['format_ld', 'format_gwas', 'format_weight'])\n",
- "sos_run(['Variants_Update'])"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "80ec964d-6043-448f-b37b-edf12dad120e",
- "metadata": {
- "kernel": "SoS",
- "tags": []
- },
- "outputs": [],
- "source": [
- "[format_ld_1]\n",
- "# Read the metadata and generate the regions to be processed: chrXX_start_end\n",
- "region = [f\"{x['#chrom']}_{x['start']}_{x['end']}\" for x in pd.read_csv(ld_meta_data, sep=\"\\t\").to_dict(orient='records')]\n",
- "input: ld_meta_data, for_each='region', group_by=1\n",
- "task: trunk_workers = 5, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n",
- "R: expand='${ }', stdout = f\"{ld_outdir}/region_info_{_region}.stdout\", stderr = f\"{ld_outdir}/region_info_{_region}.stderr\", container = container, entrypoint = entrypoint \n",
- "    library(pecotmr)\n",
- "    cat(\"Formatting LD region: ${_region} ...\\n\")\n",
- "    region <- paste0(dirname(\"${ld_meta_data}\"), \"/\", gsub(\"_.*$\", \"\", \"${_region}\"), \"/${_region}.cor.xz\") # ld_dir/chrXX/chrXX_start_end.cor.xz\n",
- "    format_ctwas_ld(region, \"${ld_outdir}\")\n",
- "    # remove the existing meta table so that it is regenerated each time the pipeline runs. 
\n", - " meta_file_path <- paste0('${ld_outdir}', \"/LD_region_info.txt\")\n", - " if (file.exists(meta_file_path)){\n", - " # Remove the file\n", - " file.remove(meta_file_path)\n", - " }" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "3b5303eb-7bdf-4a7f-804a-8460dc3b3991", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "[format_ld_2]\n", - "# This step will consolidate all processed LD information into a single file\n", - "depends: dynamic(f\"{ld_outdir}/region_info_*.stdout\") # Depends on completion of all format_ld tasks\n", - "input: group_by = 'all'\n", - "output: f\"{ld_outdir}/LD_region_info.txt\"\n", - "R: expand='${ }', container=container, entrypoint=entrypoint\n", - " library(pecotmr)\n", - " # Call the function to process all region files and save the result\n", - " processed_ld_info <- get_dir_region_info('${ld_outdir}')\n", - " write.table(processed_ld_info, ${_output:r}, sep=\"\\t\", col.names=TRUE, row.names=FALSE, quote=FALSE)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "ff99f04f-d296-45a1-b604-8385af8a5d44", - "metadata": { - "kernel": "SoS", - "tags": [] - }, - "outputs": [], - "source": [ - "[format_weight_1]\n", - "# Read the summary report and prepare for processing\n", - "summary_df = pd.read_csv(summary_table, sep='\\t')\n", - "# Create a unique identifier by concatenating 'study' and 'gene' columns\n", - "summary_df['study_gene'] = summary_df['study'] + ':' + summary_df['gene']\n", - "study_gene_indices = summary_df['study_gene'].tolist()\n", - " \n", - "input: for_each='study_gene_indices'\n", - "output: f\"{temp_dir}/output_{_study_gene_indices}.rds\"\n", - "task: trunk_workers = 4, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand='${ }', stdout = f\"{temp_dir}/weight_processing_{_index}.stdout\", stderr = f\"{temp_dir}/weight_processing_{_index}.stderr\", container = container, entrypoint = entrypoint \n", - " library(pecotmr)\n", - " # get analysis unit, _index maps directly to xqtl_indices in sos\n", - " study <- strsplit('${_study_gene_indices}', ':')[[1]][1]\n", - " gene <- strsplit('${_study_gene_indices}', ':')[[1]][2]\n", - " summary_df <- read.table('${summary_table}', sep=\"\\t\", header=TRUE)\n", - " summary_report_unit <- summary_df[summary_df$study==study & summary_df$gene == gene, ] # Adding 1 because R is 1-indexed\n", - " processed_result <- get_ctwas_input(summary_report_unit, outdir=NULL, outname=NULL, \n", - " weights_input_file = ${\"NULL\" if weight_input=='.' or weight_input =='' else weight_input}, auto_save=FALSE)# This step will altomattically save results. 
\n", - " # save to temporary file \n", - " saveRDS(processed_result, ${_output:r})\n", - " \n", - " # in case of re-running, new will have meta data updated without being ignored.\n", - " merged_weight_file <- paste0(\"${cwd}\",\"/merged_weights_list.rds\")\n", - " if (file.exists(merged_weight_file)){\n", - " # Remove the file\n", - " file.remove(merged_weight_file)\n", - " }" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "5f1e59af-379c-4e69-adab-09543dbde0c2", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "[format_weight_2]\n", - "depends: sos_step('format_weight_1')\n", - "input: f\"{temp_dir}/output_*.rds\", group_by='all'\n", - "output: f\"{cwd}/merged_weights_list.rds\"\n", - "task: trunk_workers = 4, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand='${ }', container=container, entrypoint=entrypoint\n", - " library(purrr)\n", - " # Function to recursively merge nested lists\n", - " merge_nested_lists <- function(list1, list2) {\n", - " if (is.list(list1) && is.list(list2)) {\n", - " common_names <- intersect(names(list1), names(list2))\n", - " unique_names1 <- setdiff(names(list1), names(list2))\n", - " unique_names2 <- setdiff(names(list2), names(list1))\n", - "\n", - " # Combine common elements recursively\n", - " combined <- map(common_names, ~merge_nested_lists(list1[[.x]], list2[[.x]]))\n", - " names(combined) <- common_names # Preserve names for combined elements\n", - "\n", - " # Add unique elements from both lists, ensuring names are preserved\n", - " combined <- c(combined, setNames(list1[unique_names1], unique_names1), setNames(list2[unique_names2], unique_names2))\n", - "\n", - " # Special handling at the 'weights_list' level to concatenate lists\n", - " if (\"weights_list\" %in% common_names) {\n", - " # Concatenate and preserve names within 'weights_list'\n", - " combined_weights <- c(list1$weights_list, list2$weights_list)\n", - " names(combined_weights) <- c(names(list1$weights_list), names(list2$weights_list))\n", - " combined$weights_list <- combined_weights\n", - " }\n", - "\n", - " return(combined)\n", - " } else {\n", - " # For non-list items just concatenate\n", - " return(c(list1, list2))\n", - " }\n", - " }\n", - "\n", - " # Function to merge all RDS files\n", - " merge_all_rds <- function(file_paths) {\n", - " all_lists <- lapply(file_paths, readRDS)\n", - " Reduce(merge_nested_lists, all_lists)\n", - " }\n", - "\n", - " file_paths <- c(${_input:r,})\n", - " merged_list <- merge_all_rds(file_paths)\n", - " weights <- do.call(c, lapply(1:22, function(chr){merged_list[[1]][[paste0(\"chr\", chr)]][[\"weights_list\"]]}))\n", - " weights <- weights[!duplicated(names(weights))]\n", - " saveRDS(weights, ${_output:r})" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "0c31f46f-ec19-4421-af35-12d32703921d", - "metadata": { - "kernel": "SoS" - }, - "outputs": [], - "source": [ - "[format_gwas]\n", - "input: gwas_sumstat\n", - "output: f\"{cwd}/gwas_z_snp.tsv\"\n", - "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand = '${ }', stdout = f\"{cwd}/gwas_z_snp.stdout\", stderr = f\"{cwd}/gwas_z_snp.stderr\", container = container, entrypoint = entrypoint \n", - " z_snp <- read.table(${_input:r}, sep=\"\\t\", header=TRUE)\n", - " z_snp$A1 <- trimws(z_snp$variant_id, whitespace = \".*\\\\:\")\n", - " z_snp$A2 <- sapply(z_snp$variant_id, function(var) {strsplit(var, \"\\\\:\")[[1]][3]})\n", - " z_snp 
<- z_snp[, c(\"variant_id\", \"A1\", \"A2\", \"z\")]\n",
- "    colnames(z_snp)[1] <- \"id\"\n",
- "    z_snp$id <- paste0(\"chr\", z_snp$id)\n",
- "    write.table(z_snp, ${_output:r}, sep=\"\\t\", quote=FALSE, row.names=FALSE)\n",
- "    # when re-running, remove the updated file so that it is regenerated rather than ignored\n",
- "    if (file.exists('${_output:n}_updated.tsv')){\n",
- "        # Remove the file\n",
- "        file.remove('${_output:n}_updated.tsv')\n",
- "    }"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": null,
- "id": "03ec6706-e942-4b8a-b3c0-73f25265dc4b",
- "metadata": {
- "kernel": "SoS"
- },
- "outputs": [],
- "source": [
- "[Variants_Update]\n",
- "depends: sos_step('format_weight_1'), sos_step('format_weight_2'), sos_step('format_ld_1'), sos_step('format_ld_2'), sos_step('format_gwas')\n",
- "input: f\"{cwd}/gwas_z_snp.tsv\", f\"{cwd}/merged_weights_list.rds\", f\"{ld_outdir}/LD_region_info.txt\", group_by='all'\n",
- "output: f\"{cwd}/gwas_z_snp_updated.tsv\"\n",
- "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n",
- "R: expand = '${ }', stdout = f\"{cwd}/z_snp_variant_update.stdout\", stderr = f\"{cwd}/z_snp_variant_update.stderr\", container = container, entrypoint = entrypoint \n",
- "\n",
- "    z_snp <- read.table('${_input[0]}', sep=\"\\t\", header=TRUE)\n",
- "    weights <- readRDS('${_input[1]}')\n",
- "    region_info <- read.table('${_input[2]}', sep=\"\\t\", header=TRUE)\n",
- "\n",
- "    all_wgt_var <- unique(do.call(c, lapply(names(weights), function(genes_con){\n",
- "        rownames(weights[[genes_con]]$wgt)\n",
- "    })))\n",
- "\n",
- "    ### Check that all SNP variants are in the LD reference, and update z_snp accordingly\n",
- "    all_ld_var <- do.call(c, lapply(region_info$SNP_info, function(rvar){\n",
- "        read.table(rvar, sep=\"\\t\", header=TRUE)$id\n",
- "    }))\n",
- "    ### update z_snp\n",
- "    z_snp <- z_snp[z_snp$id %in% all_ld_var,]\n",
- "    \n",
- "    if (!all(all_wgt_var %in% z_snp$id)){\n",
- "        stop(\"Weight file '${_input[1]}' includes variants that cannot be found in z_snp. \")\n",
- "    }\n",
- "    if (!all(z_snp$id %in% all_ld_var)){\n",
- "        stop(\"z_snp includes variants that cannot be found in the LD reference. 
\")\n", - " }\n", - " write.table(z_snp, '${_output}', sep=\"\\t\", row.names=FALSE, quote=FALSE)" - ] - } - ], - "metadata": { - "kernelspec": { - "display_name": "SoS", - "language": "sos", - "name": "sos" - }, - "language_info": { - "codemirror_mode": "sos", - "file_extension": ".sos", - "mimetype": "text/x-sos", - "name": "sos", - "nbconvert_exporter": "sos_notebook.converter.SoS_Exporter", - "pygments_lexer": "sos" - }, - "sos": { - "kernels": [ - [ - "SoS", - "sos", - "", - "" - ] - ], - "version": "0.24.3" - } - }, - "nbformat": 4, - "nbformat_minor": 5 -} diff --git a/code/pecotmr_integration/twas.ipynb b/code/pecotmr_integration/twas.ipynb index ccf69a71..9d75dba3 100644 --- a/code/pecotmr_integration/twas.ipynb +++ b/code/pecotmr_integration/twas.ipynb @@ -163,13 +163,28 @@ "## Example\n", "```\n", "sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb twas \\\n", - " --cwd output/ \\\n", + " --cwd /output/ --name test \\\n", " --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \\\n", " --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \\\n", - " --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/twas_mr/pipeline/EUR_LD_blocks_test.bed \\\n", - " --xqtl_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/susie_twas/Fungen_xQTL.cis_results_db_TSS.tsv \\\n", - " --xqtl_type_table /mnt/vast/hpc/csg/cl4215/mrmash/workflow/susie_twas/Fungen_xQTL.cis_results_db_TSS_xqtl_type.tsv \\\n", - " --p_value_cutoff 0.05 --rsq_threshold 0.01 --twas\n", + " --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_EUR_LD_blocks.bed \\\n", + " --xqtl_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_region_gene_meta_data_test.tsv \\\n", + " --xqtl_type_table /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/data_type_table.txt \\\n", + " --p_value_cutoff 0.05 --rsq_threshold 0.01\n", + "```" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```\n", + "sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb ctwas \\\n", + " --cwd /output/ --name test \\\n", + " --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \\\n", + " --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \\\n", + " --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_EUR_LD_blocks.bed \\\n", + " --xqtl_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_region_gene_meta_data_test.tsv \\\n", + " --twas_weight_cutoff 1e-5\n", "```" ] }, @@ -262,7 +277,7 @@ }, "outputs": [], "source": [ - "[get_analysis_regions: shared = \"regional_data\"]\n", + "[get_analysis_regions: shared = [\"filtered_region_info\", \"filtered_regional_xqtl_files\", \"regional_data\"]]\n", "from collections import OrderedDict\n", "\n", "def check_required_columns(df, required_columns):\n", @@ -375,7 +390,25 @@ "\n", "\n", "gwas_dict, xqtl_dict, regions_dict = extract_regional_data(gwas_meta_data, xqtl_meta_data,regions,gwas_name, gwas_data, column_mapping)\n", - "regional_data = dict([(\"GWAS\", gwas_dict), (\"xQTL\", xqtl_dict), (\"Regions\", regions_dict)])" + "regional_data = dict([(\"GWAS\", gwas_dict), (\"xQTL\", xqtl_dict), (\"Regions\", regions_dict)])\n", + "\n", + "\n", + "# get regions data \n", + "region_info = [x[\"meta_info\"] for x in regional_data['Regions'].values()]\n", + "regional_xqtl_files = [x[\"files\"] for x in regional_data['Regions'].values()]\n", + "\n", + "# Filter out empty xQTL file paths\n", + 
"filtered_region_info = []\n", + "filtered_regional_xqtl_files = []\n", + "skipped_regions =[]\n", + "\n", + "for region, files in zip(region_info, regional_xqtl_files):\n", + " if files:\n", + " filtered_region_info.append(region)\n", + " filtered_regional_xqtl_files.append(files)\n", + " else:\n", + " skipped_regions.append(region)\n", + "print(f\"Skipping {len(skipped_regions)} out of {len(regional_xqtl_files)} regions, no overlapping xQTL weights found. \")" ] }, { @@ -388,7 +421,7 @@ "outputs": [], "source": [ "[twas]\n", - "depends: sos_variable(\"regional_data\")\n", + "depends: sos_variable(\"filtered_regional_xqtl_files\")\n", "parameter: coverage = \"cs_coverage_0.95\"\n", "# maximum number of top variant selection \n", "# Threashold for rsq and pvalue for imputability determination for a model \n", @@ -396,32 +429,16 @@ "parameter: rsq_threshold = 0.01\n", "parameter: save_weights_db = False\n", "parameter: save_ctwas_data = True\n", - "\n", - "region_info = [x[\"meta_info\"] for x in regional_data['Regions'].values()]\n", - "regional_xqtl_files = [x[\"files\"] for x in regional_data['Regions'].values()]\n", - "\n", - "# Filter out empty xQTL file paths\n", - "filtered_region_info = []\n", - "filtered_regional_xqtl_files = []\n", - "skipped_regions =[]\n", - "\n", - "for region, files in zip(region_info, regional_xqtl_files):\n", - " if files:\n", - " filtered_region_info.append(region)\n", - " filtered_regional_xqtl_files.append(files)\n", - " else:\n", - " skipped_regions.append(region)\n", - "\n", - "print(f\"Skipping {len(skipped_regions)} out of {len(regional_xqtl_files)} regions, no overlapping xQTL weights found. \")\n", + "parameter: save_mr_result = True\n", "\n", "input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = \"filtered_region_info\"\n", - "\n", "output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas.tsv.gz']\n", "if save_weights_db: \n", " output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.merged_twas_weights.rds')\n", "if save_ctwas_data:\n", - " output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.ctwas_input.rds')\n", - "\n", + " output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas_data.rds')\n", + "if save_mr_result:\n", + " output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.mr_result.tsv.gz')\n", "output: output_files\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'\n", "R: expand = '${ }', stdout = f\"{_output[0]:n}.stdout\", stderr = f\"{_output[0]:n}.stderr\", container = container, entrypoint = entrypoint\n", @@ -429,8 +446,8 @@ " library(dplyr)\n", " library(data.table)\n", " library(pecotmr)\n", + " library(readr)\n", " \n", - " LD_meta_file_path <- \"${ld_meta_data}\"\n", " region_block <- unlist(strsplit(\"${_filtered_region_info[3]}\", \"_\\\\s*\"))\n", " chrom <- as.integer(gsub(\"chr\", \"\", region_block[1]))\n", "\n", @@ -476,16 +493,90 @@ " }\n", "\n", " # Step 2: twas analysis for imputable genes across contexts\n", - " twas_table <- twas_pipeline(twas_weights_results, LD_meta_file_path, \"${gwas_meta_data}\", twas_weights_loader=twas_weights_loader,\n", - " region_block=\"${_filtered_region_info[3]}\", no_skip_twas=${\"TRUE\" if twas else \"FALSE\"})\n", - " fwrite(twas_table, file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\", compression_level = 
9)\n",
+   "    twas_results_db <- twas_pipeline(twas_weights_results, \"${ld_meta_data}\", \"${gwas_meta_data}\", region_block=\"${_filtered_region_info[3]}\")\n",
+   "    fwrite(twas_results_db$twas_result, file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n",
    "\n",
    "    # Step 3: reformat for follow-up cTWAS analysis\n",
    "    if (${\"TRUE\" if save_ctwas_data else \"FALSE\"}) {\n",
-   "        # FIXME: let's implement this now\n",
-   "        saveRDS(NULL,\"${_output[0]:nnn}.ctwas_input.rds\", compress='xz')\n",
+   "        saveRDS(twas_results_db$twas_data, \"${_output[0]:nnn}.twas_data.rds\", compress='xz')\n",
+   "    }\n",
+   "    if (${\"TRUE\" if save_mr_result else \"FALSE\"}) {\n",
+   "        fwrite(twas_results_db$mr_result, file = \"${_output[0]:nnn}.mr_result.tsv\", sep = \"\\t\", compress = \"gzip\")\n",
    "    }"
   ]
  },
+ {
+  "cell_type": "code",
+  "execution_count": null,
+  "metadata": {
+   "kernel": "SoS",
+   "vscode": {
+    "languageId": "plaintext"
+   }
+  },
+  "outputs": [],
+  "source": [
+   "[ctwas]\n",
+   "depends: sos_variable(\"filtered_region_info\")\n",
+   "parameter: twas_weight_cutoff = 1e-5\n",
+   "\n",
+   "twas_output_files = [f'{cwd:a}/twas/{name}.{region_info[3]}.twas_data.rds' for region_info in filtered_region_info]\n",
+   "input: twas_output_files, group_by = \"all\"\n",
+   "output: f\"{cwd:a}/{step_name}/{name}.ctwas_fine_mapping.tsv\"\n",
+   "\n",
+   "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'\n",
+   "R: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container, entrypoint = entrypoint\n",
+   "\n",
+   "    library(data.table)\n",
+   "    library(ctwas) # multigroup_ctwas\n",
+   "    library(pecotmr)\n",
+   "\n",
+   "    regions_data <- get_ctwas_meta_data(\"${ld_meta_data}\", \"${regions}\", \"${xqtl_meta_data}\")\n",
+   "    gwas_studies <- unique(fread(\"${gwas_meta_data}\", data.table=FALSE, select = \"study_id\"))[,1]\n",
+   "\n",
+   "    data <- lapply(c(${_input:r,}), readRDS)\n",
+   "    snp_info <- do.call(c, lapply(data, function(x) x$snp_info))\n",
+   "    snp_info <- snp_info[!duplicated(names(snp_info))]\n",
+   "    weights <- do.call(c, lapply(data, function(x) x$weights))\n",
+   "    if (!is.null(${twas_weight_cutoff})){\n",
+   "    weights <- lapply(weights, function(x){ \n",
+   "        x$wgt <- x$wgt[abs(x$wgt[,1]) > ${twas_weight_cutoff},,drop=FALSE]\n",
+   "        if (nrow(x$wgt)<1) return(NULL)\n",
+   "        x$n_wgt <- nrow(x$wgt)\n",
+   "        return(x)\n",
+   "    })\n",
+   "    }\n",
+   "    z_gene_dat <- do.call(c, lapply(data, function(x) x$z_gene))\n",
+   "    z_snp_dat <- do.call(c, lapply(data, function(x) x$z_snp))\n",
+   "    z_gene <- setNames(lapply(gwas_studies, function(study) do.call(rbind, z_gene_dat[names(z_gene_dat) == study])), gwas_studies)\n",
+   "    z_snp <- setNames(lapply(gwas_studies, function(study) {\n",
+   "        df <- do.call(rbind, z_snp_dat[names(z_snp_dat) == study])\n",
+   "        df[!duplicated(df$id), ] \n",
+   "    }), gwas_studies)\n",
+   "\n",
+   "    ctwas_res <- list()\n",
+   "    for (study in gwas_studies){\n",
+   "        ctwas_res[[study]] <- ctwas_sumstats(z_snp[[study]],\n",
+   "                                 weights,\n",
+   "                                 region_info=regions_data$region_info,\n",
+   "                                 LD_map=regions_data$LD_info,\n",
+   "                                 snp_map=snp_info,\n",
+   "                                 z_gene = z_gene[[study]],\n",
+   "                                 thin = 0.1,\n",
+   "                                 ncore = ${numThreads},\n",
+   "                                 outputdir = ${_output[0]:dr},\n",
+   "                                 outname = \"${name}\",\n",
+   "                                 logfile = file.path(${_output[0]:dr}, paste0(\"${name}\", \".ctwas_sumstats.log\")),\n",
+   "                                 verbose = FALSE, \n",
+   "                                 cor_dir = NULL,\n",
+   "                                 save_cor = FALSE,\n",
+   "                                 
group_prior_var_structure = c(\"shared_context\"),\n",
+   "                                 LD_format=\"custom\", \n",
+   "                                 LD_loader_fun = ctwas_ld_loader,\n",
+   "                                 snpinfo_loader_fun = ctwas_bimfile_loader)\n",
+   "    }\n",
+   "\n"
  ]
 }
],
"metadata": {

From 698e0a8a982ba07402f2235c922ae4fe4f4d82a6 Mon Sep 17 00:00:00 2001
From: Chunmingl
Date: Wed, 25 Sep 2024 03:31:40 -0400
Subject: [PATCH 2/2] add message

---
 code/pecotmr_integration/twas.ipynb | 2 ++
 1 file changed, 2 insertions(+)

diff --git a/code/pecotmr_integration/twas.ipynb b/code/pecotmr_integration/twas.ipynb
index 9d75dba3..82e6645b 100644
--- a/code/pecotmr_integration/twas.ipynb
+++ b/code/pecotmr_integration/twas.ipynb
@@ -540,7 +540,9 @@
    "    weights <- do.call(c, lapply(data, function(x) x$weights))\n",
    "    if (!is.null(${twas_weight_cutoff})){\n",
    "    weights <- lapply(weights, function(x){ \n",
+   "        allvariants <- nrow(x$wgt)\n",
    "        x$wgt <- x$wgt[abs(x$wgt[,1]) > ${twas_weight_cutoff},,drop=FALSE]\n",
+   "        message(paste0(\"Dropped \", allvariants-nrow(x$wgt), \" variants by TWAS weight cutoff \", ${twas_weight_cutoff}, \". \"))\n",
    "        if (nrow(x$wgt)<1) return(NULL)\n",
    "        x$n_wgt <- nrow(x$wgt)\n",
    "        return(x)\n",