Skip to content

Commit

Permalink
ctwas loaders fand ormatting
Browse files Browse the repository at this point in the history
  • Loading branch information
Chunmingl committed Sep 25, 2024
1 parent b3fe7de commit 8894e68
Showing 1 changed file with 89 additions and 49 deletions.
138 changes: 89 additions & 49 deletions code/pecotmr_integration/twas.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,21 @@
"```"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
"```\n",
"sos run xqtl-protocol/code/pecotmr_integration/twas.ipynb ctwas \\\n",
" --cwd /output/ --name test \\\n",
" --gwas_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/GWAS/gwas_meta.tsv \\\n",
" --ld_meta_data /mnt/vast/hpc/csg/data_public/20240409_ADSP_LD_matrix/ld_meta_file.tsv \\\n",
" --regions /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_EUR_LD_blocks.bed \\\n",
" --xqtl_meta_data /mnt/vast/hpc/csg/cl4215/mrmash/workflow/pipeline_data/picalm_region_gene_meta_data_test.tsv \\\n",
" --twas_weight_cutoff 1e-5\n",
"```"
]
},
{
"cell_type": "code",
"execution_count": null,
Expand Down Expand Up @@ -262,7 +277,7 @@
},
"outputs": [],
"source": [
"[get_analysis_regions: shared = \"regional_data\"]\n",
"[get_analysis_regions: shared = [\"filtered_region_info\", \"filtered_regional_xqtl_files\", \"regional_data\"]]\n",
"from collections import OrderedDict\n",
"\n",
"def check_required_columns(df, required_columns):\n",
Expand Down Expand Up @@ -375,7 +390,26 @@
"\n",
"\n",
"gwas_dict, xqtl_dict, regions_dict = extract_regional_data(gwas_meta_data, xqtl_meta_data,regions,gwas_name, gwas_data, column_mapping)\n",
"regional_data = dict([(\"GWAS\", gwas_dict), (\"xQTL\", xqtl_dict), (\"Regions\", regions_dict)])"
"regional_data = dict([(\"GWAS\", gwas_dict), (\"xQTL\", xqtl_dict), (\"Regions\", regions_dict)])\n",
"\n",
"\n",
"# get regions data \n",
"region_info = [x[\"meta_info\"] for x in regional_data['Regions'].values()]\n",
"regional_xqtl_files = [x[\"files\"] for x in regional_data['Regions'].values()]\n",
"\n",
"# Filter out empty xQTL file paths\n",
"filtered_region_info = []\n",
"filtered_regional_xqtl_files = []\n",
"skipped_regions =[]\n",
"\n",
"for region, files in zip(region_info, regional_xqtl_files):\n",
" if files:\n",
" filtered_region_info.append(region)\n",
" filtered_regional_xqtl_files.append(files)\n",
" else:\n",
" skipped_regions.append(region)\n",
"print(f\"Skipping {len(skipped_regions)} out of {len(regional_xqtl_files)} regions, no overlapping xQTL weights found. \")\n",
"#sos_variable(\"filtered_region_info\", filtered_region_info)"
]
},
{
Expand All @@ -388,7 +422,7 @@
"outputs": [],
"source": [
"[twas]\n",
"depends: sos_variable(\"regional_data\")\n",
"depends: sos_variable(\"filtered_regional_xqtl_files\")\n",
"parameter: coverage = \"cs_coverage_0.95\"\n",
"# maximum number of top variant selection \n",
"# Threashold for rsq and pvalue for imputability determination for a model \n",
Expand All @@ -398,33 +432,14 @@
"parameter: save_ctwas_data = True\n",
"parameter: save_mr_result = True\n",
"\n",
"region_info = [x[\"meta_info\"] for x in regional_data['Regions'].values()]\n",
"regional_xqtl_files = [x[\"files\"] for x in regional_data['Regions'].values()]\n",
"\n",
"# Filter out empty xQTL file paths\n",
"filtered_region_info = []\n",
"filtered_regional_xqtl_files = []\n",
"skipped_regions =[]\n",
"\n",
"for region, files in zip(region_info, regional_xqtl_files):\n",
" if files:\n",
" filtered_region_info.append(region)\n",
" filtered_regional_xqtl_files.append(files)\n",
" else:\n",
" skipped_regions.append(region)\n",
"\n",
"print(f\"Skipping {len(skipped_regions)} out of {len(regional_xqtl_files)} regions, no overlapping xQTL weights found. \")\n",
"\n",
"input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = \"filtered_region_info\"\n",
"\n",
"output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas.tsv.gz']\n",
"if save_weights_db: \n",
" output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.merged_twas_weights.rds')\n",
"if save_ctwas_data:\n",
" output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas_data.rds')\n",
"if save_mr_result:\n",
" output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.mr_result.tsv.gz')\n",
"\n",
"output: output_files\n",
"task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'\n",
"R: expand = '${ }', stdout = f\"{_output[0]:n}.stdout\", stderr = f\"{_output[0]:n}.stderr\", container = container, entrypoint = entrypoint\n",
Expand All @@ -434,7 +449,6 @@
" library(pecotmr)\n",
" library(readr)\n",
" \n",
" LD_meta_file_path <- \"${ld_meta_data}\"\n",
" region_block <- unlist(strsplit(\"${_filtered_region_info[3]}\", \"_\\\\s*\"))\n",
" chrom <- as.integer(gsub(\"chr\", \"\", region_block[1]))\n",
"\n",
Expand Down Expand Up @@ -480,7 +494,7 @@
" }\n",
"\n",
" # Step 2: twas analysis for imputable genes across contexts\n",
" twas_results_db <- twas_pipeline(twas_weights_results, LD_meta_file_path, \"${gwas_meta_data}\", region_block=\"${_filtered_region_info[3]}\")\n",
" twas_results_db <- twas_pipeline(twas_weights_results, \"${ld_meta_data}\", \"${gwas_meta_data}\", region_block=\"${_filtered_region_info[3]}\")\n",
" fwrite(twas_results_db$twas_result, file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n",
"\n",
" # Step 3: reformat for follow up cTWAS analysis\n",
Expand All @@ -504,38 +518,64 @@
"outputs": [],
"source": [
"[ctwas]\n",
"depends: sos_variable(\"regional_data\")\n",
"depends: sos_variable(\"filtered_region_info\")\n",
"parameter: twas_weight_cutoff = 1e-5\n",
"\n",
"input: f\"{cwd:a}/twas/{name}.*.ctwas_input.rds\", group_by = \"all\"\n",
"twas_output_files = [f'{cwd:a}/twas/{name}.{region_info[3]}.twas_data.rds' for region_info in filtered_region_info]\n",
"input:twas_output_files, group_by = \"all\"\n",
"output: f\"{cwd:a}/{step_name}/{name}.ctwas_fine_mapping.tsv\"\n",
"\n",
"task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'\n",
"R: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container, entrypoint = entrypoint\n",
"\n",
" library(data.table)\n",
" z_snp <- lapply(${_input:r}, readRDS)\n",
" z_gene <- do.call(rbind, lapply(${_input:nnn}.twas.tsv.gz\", fread))\n",
"\n",
" # for (study in gwas_studies){\n",
" # ctwas_res[[study]] <- ctwas_res <- ctwas_sumstats(z_snp[[study]],\n",
" # ctwas_input_data$weights,\n",
" # region_info,\n",
" # LD_map=LD_info,\n",
" # snp_map=find_data(export_ctwas_data_db, c(2, \"snp_info\")),\n",
" # z_gene = z_gene[[study]],\n",
" # thin = 1,\n",
" # ncore = ${numThreads},\n",
" # outputdir = ${_output[0]:dr},\n",
" # outname = \"${name}\",\n",
" # logfile = file.path(${_output[0]:dr}, paste0(\"${name}\", \".ctwas_sumstats.log\")),\n",
" # verbose = FALSE, \n",
" # cor_dir = NULL,\n",
" # save_cor = TRUE,\n",
" # screen_method=\"nonSNP_PIP\",\n",
" # LD_format=\"custom\", \n",
" # LD_loader_fun = ctwas_ld_loader,\n",
" # scale_predictdb_weights = FALSE)\n",
" # }\n",
" library(ctwas) # multigroup_ctwas\n",
" library(pecotmr)\n",
"\n",
" regions_data <- get_ctwas_meta_data(\"${ld_meta_data}\", \"${regions}\")\n",
" gwas_studies <- unique(fread(\"${gwas_meta_data}\",data.table=FALSE, select = \"study_id\"))[,1]\n",
"\n",
" data <- lapply(c(${_input:r,}), readRDS)\n",
" snp_info <- do.call(c, lapply(data, function(x) x$snp_info))\n",
" snp_info <- snp_info[!duplicated(names(snp_info))]\n",
" weights <- do.call(c, lapply(data, function(x) x$weights))\n",
" if (!is.null(${twas_weight_cutoff})){\n",
" weights <- weights <- lapply(weights, function(x){ \n",
" x$wgt <- x$wgt[abs(x$wgt[,1]) > ${twas_weight_cutoff},,drop=FALSE]\n",
" if (nrow(x$wgt)<1) return(NULL)\n",
" x$n_wgt <- nrow(x$wgt)\n",
" return(x)\n",
" })\n",
" }\n",
" z_gene_dat <- do.call(c, lapply(data, function(x)x$z_gene))\n",
" z_snp_dat <- do.call(c, lapply(data, function(x)x$z_snp))\n",
" z_gene <- setNames(lapply(gwas_studies, function(study) do.call(rbind, z_gene_dat[names(z_gene_dat) == study])), gwas_studies)\n",
" z_snp <- setNames(lapply(gwas_studies, function(study) {\n",
" df <- do.call(rbind, z_snp_dat[names(z_snp_dat) == study])\n",
" df[!duplicated(df$id), ] \n",
" }), gwas_studies)\n",
"\n",
" ctwas_res <- list()\n",
" for (study in gwas_studies){\n",
" ctwas_res[[study]] <- ctwas_sumstats(z_snp[[study]],\n",
" weights,\n",
" region_info=regions_data$region_info,\n",
" LD_map=regions_data$LD_info,\n",
" snp_map=snp_info,\n",
" z_gene = z_gene[[study]],\n",
" thin = 0.1,\n",
" ncore = ${numThreads},\n",
" outputdir = ${_output[0]:dr},\n",
" outname = \"${name}\",\n",
" logfile = file.path(${_output[0]:dr}, paste0(\"${name}\", \".ctwas_sumstats.log\")),\n",
" verbose = FALSE, \n",
" cor_dir = NULL,\n",
" save_cor = TRUE,\n",
" group_prior_var_structure = c(\"shared_context\"),\n",
" LD_format=\"custom\", \n",
" LD_loader_fun = ctwas_ld_loader,\n",
" snpinfo_loader_fun = ctwas_bimfile_loader)\n",
" }\n",
"\n"
]
}
Expand Down

0 comments on commit 8894e68

Please sign in to comment.