diff --git a/code/pecotmr_integration/twas.ipynb b/code/pecotmr_integration/twas.ipynb
index 1d82cbff..fa058acc 100644
--- a/code/pecotmr_integration/twas.ipynb
+++ b/code/pecotmr_integration/twas.ipynb
@@ -173,6 +173,21 @@
     "```"
    ]
   },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "```\n",
+    "sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb ctwas \\\n",
+    "    --cwd /output/ --name test \\\n",
+    "    --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \\\n",
+    "    --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \\\n",
+    "    --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_EUR_LD_blocks.bed \\\n",
+    "    --xqtl_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_region_gene_meta_data_test.tsv \\\n",
+    "    --twas_weight_cutoff 1e-5\n",
+    "```"
+   ]
+  },
   {
    "cell_type": "code",
    "execution_count": null,
@@ -262,7 +277,7 @@
    },
    "outputs": [],
    "source": [
-    "[get_analysis_regions: shared = \"regional_data\"]\n",
+    "[get_analysis_regions: shared = [\"filtered_region_info\", \"filtered_regional_xqtl_files\", \"regional_data\"]]\n",
    "from collections import OrderedDict\n",
    "\n",
    "def check_required_columns(df, required_columns):\n",
@@ -375,7 +390,26 @@
    "\n",
    "\n",
    "gwas_dict, xqtl_dict, regions_dict = extract_regional_data(gwas_meta_data, xqtl_meta_data,regions,gwas_name, gwas_data, column_mapping)\n",
-    "regional_data = dict([(\"GWAS\", gwas_dict), (\"xQTL\", xqtl_dict), (\"Regions\", regions_dict)])"
+    "regional_data = dict([(\"GWAS\", gwas_dict), (\"xQTL\", xqtl_dict), (\"Regions\", regions_dict)])\n",
+    "\n",
+    "\n",
+    "# Get region metadata and the xQTL weight files overlapping each region\n",
+    "region_info = [x[\"meta_info\"] for x in regional_data['Regions'].values()]\n",
+    "regional_xqtl_files = [x[\"files\"] for x in regional_data['Regions'].values()]\n",
+    "\n",
+    "# Filter out empty xQTL file paths\n",
+    "filtered_region_info = []\n",
+    "filtered_regional_xqtl_files = []\n",
+    "skipped_regions = []\n",
+    "\n",
+    "for region, files in zip(region_info, regional_xqtl_files):\n",
+    "    if files:\n",
+    "        filtered_region_info.append(region)\n",
+    "        filtered_regional_xqtl_files.append(files)\n",
+    "    else:\n",
+    "        skipped_regions.append(region)\n",
+    "print(f\"Skipping {len(skipped_regions)} out of {len(regional_xqtl_files)} regions, no overlapping xQTL weights found.\")\n",
\")\n", + "#sos_variable(\"filtered_region_info\", filtered_region_info)" ] }, { @@ -388,7 +422,7 @@ "outputs": [], "source": [ "[twas]\n", - "depends: sos_variable(\"regional_data\")\n", + "depends: sos_variable(\"filtered_regional_xqtl_files\")\n", "parameter: coverage = \"cs_coverage_0.95\"\n", "# maximum number of top variant selection \n", "# Threashold for rsq and pvalue for imputability determination for a model \n", @@ -398,25 +432,7 @@ "parameter: save_ctwas_data = True\n", "parameter: save_mr_result = True\n", "\n", - "region_info = [x[\"meta_info\"] for x in regional_data['Regions'].values()]\n", - "regional_xqtl_files = [x[\"files\"] for x in regional_data['Regions'].values()]\n", - "\n", - "# Filter out empty xQTL file paths\n", - "filtered_region_info = []\n", - "filtered_regional_xqtl_files = []\n", - "skipped_regions =[]\n", - "\n", - "for region, files in zip(region_info, regional_xqtl_files):\n", - " if files:\n", - " filtered_region_info.append(region)\n", - " filtered_regional_xqtl_files.append(files)\n", - " else:\n", - " skipped_regions.append(region)\n", - "\n", - "print(f\"Skipping {len(skipped_regions)} out of {len(regional_xqtl_files)} regions, no overlapping xQTL weights found. \")\n", - "\n", "input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = \"filtered_region_info\"\n", - "\n", "output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas.tsv.gz']\n", "if save_weights_db: \n", " output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.merged_twas_weights.rds')\n", @@ -424,7 +440,6 @@ " output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas_data.rds')\n", "if save_mr_result:\n", " output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.mr_result.tsv.gz')\n", - "\n", "output: output_files\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'\n", "R: expand = '${ }', stdout = f\"{_output[0]:n}.stdout\", stderr = f\"{_output[0]:n}.stderr\", container = container, entrypoint = entrypoint\n", @@ -434,7 +449,6 @@ " library(pecotmr)\n", " library(readr)\n", " \n", - " LD_meta_file_path <- \"${ld_meta_data}\"\n", " region_block <- unlist(strsplit(\"${_filtered_region_info[3]}\", \"_\\\\s*\"))\n", " chrom <- as.integer(gsub(\"chr\", \"\", region_block[1]))\n", "\n", @@ -480,7 +494,7 @@ " }\n", "\n", " # Step 2: twas analysis for imputable genes across contexts\n", - " twas_results_db <- twas_pipeline(twas_weights_results, LD_meta_file_path, \"${gwas_meta_data}\", region_block=\"${_filtered_region_info[3]}\")\n", + " twas_results_db <- twas_pipeline(twas_weights_results, \"${ld_meta_data}\", \"${gwas_meta_data}\", region_block=\"${_filtered_region_info[3]}\")\n", " fwrite(twas_results_db$twas_result, file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n", "\n", " # Step 3: reformat for follow up cTWAS analysis\n", @@ -504,38 +518,64 @@ "outputs": [], "source": [ "[ctwas]\n", - "depends: sos_variable(\"regional_data\")\n", + "depends: sos_variable(\"filtered_region_info\")\n", + "parameter: twas_weight_cutoff = 1e-5\n", "\n", - "input: f\"{cwd:a}/twas/{name}.*.ctwas_input.rds\", group_by = \"all\"\n", + "twas_output_files = [f'{cwd:a}/twas/{name}.{region_info[3]}.twas_data.rds' for region_info in filtered_region_info]\n", + "input:twas_output_files, group_by = \"all\"\n", "output: 
f\"{cwd:a}/{step_name}/{name}.ctwas_fine_mapping.tsv\"\n", "\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'\n", "R: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container, entrypoint = entrypoint\n", "\n", " library(data.table)\n", - " z_snp <- lapply(${_input:r}, readRDS)\n", - " z_gene <- do.call(rbind, lapply(${_input:nnn}.twas.tsv.gz\", fread))\n", - "\n", - " # for (study in gwas_studies){\n", - " # ctwas_res[[study]] <- ctwas_res <- ctwas_sumstats(z_snp[[study]],\n", - " # ctwas_input_data$weights,\n", - " # region_info,\n", - " # LD_map=LD_info,\n", - " # snp_map=find_data(export_ctwas_data_db, c(2, \"snp_info\")),\n", - " # z_gene = z_gene[[study]],\n", - " # thin = 1,\n", - " # ncore = ${numThreads},\n", - " # outputdir = ${_output[0]:dr},\n", - " # outname = \"${name}\",\n", - " # logfile = file.path(${_output[0]:dr}, paste0(\"${name}\", \".ctwas_sumstats.log\")),\n", - " # verbose = FALSE, \n", - " # cor_dir = NULL,\n", - " # save_cor = TRUE,\n", - " # screen_method=\"nonSNP_PIP\",\n", - " # LD_format=\"custom\", \n", - " # LD_loader_fun = ctwas_ld_loader,\n", - " # scale_predictdb_weights = FALSE)\n", - " # }\n", + " library(ctwas) # multigroup_ctwas\n", + " library(pecotmr)\n", + "\n", + " regions_data <- get_ctwas_meta_data(\"${ld_meta_data}\", \"${regions}\")\n", + " gwas_studies <- unique(fread(\"${gwas_meta_data}\",data.table=FALSE, select = \"study_id\"))[,1]\n", + "\n", + " data <- lapply(c(${_input:r,}), readRDS)\n", + " snp_info <- do.call(c, lapply(data, function(x) x$snp_info))\n", + " snp_info <- snp_info[!duplicated(names(snp_info))]\n", + " weights <- do.call(c, lapply(data, function(x) x$weights))\n", + " if (!is.null(${twas_weight_cutoff})){\n", + " weights <- weights <- lapply(weights, function(x){ \n", + " x$wgt <- x$wgt[abs(x$wgt[,1]) > ${twas_weight_cutoff},,drop=FALSE]\n", + " if (nrow(x$wgt)<1) return(NULL)\n", + " x$n_wgt <- nrow(x$wgt)\n", + " return(x)\n", + " })\n", + " }\n", + " z_gene_dat <- do.call(c, lapply(data, function(x)x$z_gene))\n", + " z_snp_dat <- do.call(c, lapply(data, function(x)x$z_snp))\n", + " z_gene <- setNames(lapply(gwas_studies, function(study) do.call(rbind, z_gene_dat[names(z_gene_dat) == study])), gwas_studies)\n", + " z_snp <- setNames(lapply(gwas_studies, function(study) {\n", + " df <- do.call(rbind, z_snp_dat[names(z_snp_dat) == study])\n", + " df[!duplicated(df$id), ] \n", + " }), gwas_studies)\n", + "\n", + " ctwas_res <- list()\n", + " for (study in gwas_studies){\n", + " ctwas_res[[study]] <- ctwas_sumstats(z_snp[[study]],\n", + " weights,\n", + " region_info=regions_data$region_info,\n", + " LD_map=regions_data$LD_info,\n", + " snp_map=snp_info,\n", + " z_gene = z_gene[[study]],\n", + " thin = 0.1,\n", + " ncore = ${numThreads},\n", + " outputdir = ${_output[0]:dr},\n", + " outname = \"${name}\",\n", + " logfile = file.path(${_output[0]:dr}, paste0(\"${name}\", \".ctwas_sumstats.log\")),\n", + " verbose = FALSE, \n", + " cor_dir = NULL,\n", + " save_cor = TRUE,\n", + " group_prior_var_structure = c(\"shared_context\"),\n", + " LD_format=\"custom\", \n", + " LD_loader_fun = ctwas_ld_loader,\n", + " snpinfo_loader_fun = ctwas_bimfile_loader)\n", + " }\n", "\n" ] }