diff --git a/code/SoS/pecotmr_integration/SuSiE_enloc.ipynb b/code/SoS/pecotmr_integration/SuSiE_enloc.ipynb index 2ecd9ae2..fe5db87f 100644 --- a/code/SoS/pecotmr_integration/SuSiE_enloc.ipynb +++ b/code/SoS/pecotmr_integration/SuSiE_enloc.ipynb @@ -561,19 +561,31 @@ "input: regional_data[\"data\"], group_by = lambda x: group_by_region(x, regional_data[\"data\"]), group_with = \"meta\"\n", "output: f'{cwd:a}/{name}.{_meta[0]}.enrichment.rds'\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'\n", - "R: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", - " library(tidyverse)\n", - " library(pecotmr)\n", - " # RDS files for GWAS data\n", - " gwas_finemapped_data = c(${paths([x for x in _meta[1:len(_meta)]]):r,})\n", - " # RDS files for xQTL data\n", - " xqtl_finemapped_data = c(${paths([x for x in _input]):r,})\n", - " enrich_result = xqtl_enrichment_wrapper(gwas_files = gwas_finemapped_data, xqtl_files = xqtl_finemapped_data, \n", - " xqtl_finemapping_obj = c(\"${_meta[0]}\",${\",\".join(['\"%s\"' % x for x in xqtl_finemapping_obj]) if len(xqtl_finemapping_obj) != 0 else \"NULL\"}), \n", - " xqtl_varname_obj = c(\"${_meta[0]}\",${\",\".join(['\"%s\"' % x for x in xqtl_varname_obj]) if len(xqtl_varname_obj) != 0 else \"NULL\"}), \n", - " gwas_finemapping_obj = c(${\",\".join(['\"%s\"' % x for x in gwas_finemapping_obj]) if len(gwas_finemapping_obj) != 0 else \"NULL\"}), \n", - " gwas_varname_obj = c(${\",\".join(['\"%s\"' % x for x in gwas_varname_obj]) if len(gwas_varname_obj) != 0 else \"NULL\"}))\n", - " saveRDS(enrich_result, ${_output:ar})" + "bash: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", + " set -e\n", + " # Per-region xQTL files for this group, and the GWAS rds entries in _meta[1:].\n", + " # Convert each side of this region to a pecotmr S4 collection, then run\n", + " # qtlEnrichmentPipeline on the pair.\n", + " Rscript code/script/pecotmr_integration/legacy_enloc_finemap_convert.R \\\n", + " --mode qtl \\\n", + " --rds-files ${\",\".join([str(x) for x in _input])} \\\n", + " --finemapping-obj '${\" \".join(xqtl_finemapping_obj)}' \\\n", + " --varname-obj '${\" \".join(xqtl_varname_obj)}' \\\n", + " --study ${name} \\\n", + " --output ${_output:n}.qtl.rds\n", + "\n", + " Rscript code/script/pecotmr_integration/legacy_enloc_finemap_convert.R \\\n", + " --mode gwas \\\n", + " --rds-files ${\",\".join([str(x) for x in _meta[1:] if str(x).endswith(\".rds\")])} \\\n", + " --finemapping-obj '${\" \".join(gwas_finemapping_obj)}' \\\n", + " --varname-obj '${\" \".join(gwas_varname_obj)}' \\\n", + " --output ${_output:n}.gwas.rds\n", + "\n", + " Rscript code/script/pecotmr_integration/qtl_enrichment.R \\\n", + " --qtl-fine-mapping ${_output:n}.qtl.rds \\\n", + " --gwas-fine-mapping ${_output:n}.gwas.rds \\\n", + " --ncore ${numThreads} \\\n", + " --output ${_output}\n" ] }, { @@ -600,79 +612,32 @@ "# output: f'{cwd:a}/{step_name[:-2]}/{name}.{_meta[0]}.coloc.rds'\n", "output: f'{cwd:a}/{step_name}/{name}.{_meta[0]}.coloc.rds'\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'\n", - "R: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", - " library(tidyverse)\n", - " library(pecotmr)\n", - " library(coloc) \n", - " # RDS files for xQTL data\n", - " # RDS files for xQTL data\n", - " xqtl_finemapped_datas = c(${paths([x for x in _input]):r,})\n", - " coloc_res <- list()\n", - " # are we doing something like this? that means results are by each context, and each one has \n", - " for(xqtl_finemapped_data in xqtl_finemapped_datas){\n", - " qtl_dat <- readRDS(xqtl_finemapped_data)\n", - " gene = names(qtl_dat)\n", - " context = \"${_meta[0]}\" %>% str_split(.,\"@\",simplify = T) %>% .[,1] \n", - " gene_region = pecotmr:::get_nested_element(qtl_dat[[1]], c(context,\"region_info\", \"grange\"))\n", - " \n", - " # Step 1: find relevant GWAS regions that overlap each the xQTL region of interest\n", - " gwas_finemapped_datas <- c(${ \",\".join(['\"%s\"' % x for x in _meta[1:]])}) \n", - " gwas_finemapped_datas <- gwas_finemapped_datas[grep('rds$',gwas_finemapped_datas)]\n", - " \n", - " gwas_regions <- gwas_finemapped_datas %>% basename %>% str_extract(., \"chr\\\\d+_\\\\d+_\\\\d+\")\n", - " overlap_index <- NULL\n", - " for (i in 1:length(gwas_regions)) {\n", - " region <- gwas_regions[i]\n", - " split_region <- unlist(strsplit(region, \"_\"))\n", - " block_chrom <- as.numeric(split_region[1] %>% gsub(\"chr\",\"\",.))\n", - " block_start <- as.numeric(split_region[2] %>% strsplit(., \"-\") %>% unlist %>% .[1])\n", - " block_end <- as.numeric(split_region[2] %>% strsplit(., \"-\") %>% unlist %>% .[2])\n", - " if (gene_region$chrom == block_chrom && (gene_region$start <= block_end | gene_region$end >= block_start)) {\n", - " overlap_index <- c(overlap_index, i)\n", - " }\n", - " }\n", - " \n", - " if (!is.null(overlap_index)) {\n", - " gwas_finemapped_data <- gwas_finemapped_datas[overlap_index]\n", - " \n", - " # Step 2: load enrichment analysis results\n", - " # Extract values for p1, p2, and p12\n", - " if(${\"FALSE\" if skip_enrich else \"TRUE\"}){\n", - " enrich_file = paste0('${cwd:a}','/', '${name}', '.' ,context,'.enrichment.rds')\n", - " p1 <- readRDS(enrich_file)[[1]]$`Alternative (coloc) p1`\n", - " p2 <- readRDS(enrich_file)[[1]]$`Alternative (coloc) p2`\n", - " p12 <- readRDS(enrich_file)[[1]]$`Alternative (coloc) p12`\n", - " \n", - " message(\"Priors are P1:\", p1, \"; p2: \", p2, \"; p12: \", p12)\n", - " } else {\n", - " p1 = 1e-4\n", - " p2 = 1e-4\n", - " p12 = 5e-6\n", - " message(\"Priors are using default, P1: 1e-4, P2: 1e-4, P12: 5e-6\" )\n", - " }\n", - " # Step 3: Apply colocalization analysis between each condition and GWAS\n", - " coloc_res[[gene]] <- coloc_wrapper(xqtl_file = xqtl_finemapped_data, gwas_files = gwas_finemapped_data, \n", - " xqtl_finemapping_obj = c(context,${\",\".join(['\"%s\"' % x for x in xqtl_finemapping_obj]) if len(xqtl_finemapping_obj) != 0 else \"NULL\"}), \n", - " xqtl_varname_obj = c(context,${\",\".join(['\"%s\"' % x for x in xqtl_varname_obj]) if len(xqtl_varname_obj) != 0 else \"NULL\"}), \n", - " gwas_finemapping_obj = c(${\",\".join(['\"%s\"' % x for x in gwas_finemapping_obj]) if len(gwas_finemapping_obj) != 0 else \"NULL\"}), \n", - " gwas_varname_obj = c(${\",\".join(['\"%s\"' % x for x in gwas_varname_obj]) if len(gwas_varname_obj) != 0 else \"NULL\"}),\n", - " xqtl_region_obj = c(context,${\",\".join(['\"%s\"' % x for x in xqtl_region_obj]) if len(xqtl_region_obj) != 0 else \"NULL\"}), \n", - " gwas_region_obj = c(${\",\".join(['\"%s\"' % x for x in gwas_region_obj]) if len(gwas_region_obj) != 0 else \"NULL\"}),\n", - " p1 = p1, p2 = p2, p12 = p12, filter_lbf_cs = ${\"FALSE\" if skip_enrich else \"TRUE\"})\n", - "\n", - " if (${\"TRUE\" if ld_meta_file_path.is_file() else \"FALSE\"}) {\n", - " if(length(coloc_res[[gene]]) > 2) {\n", - " coloc_res[[gene]] <- coloc_post_processor(coloc_res[[gene]], LD_meta_file_path = ${ld_meta_file_path:r}, analysis_region = coloc_res[[gene]]$analysis_region)\n", - " if(!is.null(coloc_res[[gene]]$sets$cs)) writeLines(coloc_res[[gene]]$sets$cs %>% unlist, gsub(\"rds$\",\"coloc_res\",\"${_output}\"))\n", - " } else { message(\"No coloc results were generated.\") }\n", - " }\n", - " \n", - " } else {\n", - " message(\"No overlap was found between GWAS blocks and QTL analysis region.\")\n", - " coloc_res <- \"No overlap was found between GWAS blocks and QTL analysis region.\"\n", - " }\n", - " }\n", - " saveRDS(coloc_res, ${_output:r})" + "bash: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", + " set -e\n", + " # context (before '@' in _meta[0]) selects the per-region enrichment file the\n", + " # enrichment step wrote: {cwd}/{name}.{context}.enrichment.rds (legacy naming).\n", + " # Convert this region's xQTL + GWAS sides, then run colocPipeline (enloc\n", + " # variant when --enrichment is supplied).\n", + " Rscript code/script/pecotmr_integration/legacy_enloc_finemap_convert.R \\\n", + " --mode qtl \\\n", + " --rds-files ${\",\".join([str(x) for x in _input])} \\\n", + " --finemapping-obj '${\" \".join(xqtl_finemapping_obj)}' \\\n", + " --varname-obj '${\" \".join(xqtl_varname_obj)}' \\\n", + " --study ${name} \\\n", + " --output ${_output:n}.qtl.rds\n", + "\n", + " Rscript code/script/pecotmr_integration/legacy_enloc_finemap_convert.R \\\n", + " --mode gwas \\\n", + " --rds-files ${\",\".join([str(x) for x in _meta[1:] if str(x).endswith(\".rds\")])} \\\n", + " --finemapping-obj '${\" \".join(gwas_finemapping_obj)}' \\\n", + " --varname-obj '${\" \".join(gwas_varname_obj)}' \\\n", + " --output ${_output:n}.gwas.rds\n", + "\n", + " Rscript code/script/pecotmr_integration/coloc.R \\\n", + " --qtl-fine-mapping ${_output:n}.qtl.rds \\\n", + " --gwas-input ${_output:n}.gwas.rds \\\n", + " ${('--enrichment '+cwd.absolute().joinpath(name + \".\" + _meta[0].split(\"@\")[0] + \".enrichment.rds\").as_posix()) if not skip_enrich else ''} \\\n", + " --output ${_output}\n" ] }, { @@ -722,4 +687,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/code/SoS/pecotmr_integration/twas_ctwas.ipynb b/code/SoS/pecotmr_integration/twas_ctwas.ipynb index cc3ef4a5..63e36095 100644 --- a/code/SoS/pecotmr_integration/twas_ctwas.ipynb +++ b/code/SoS/pecotmr_integration/twas_ctwas.ipynb @@ -293,312 +293,90 @@ "outputs": [], "source": [ "[twas]\n", + "# Per-region TWAS-Z + Mendelian Randomization, bridged to the pecotmr S4\n", + "# wrappers (no inline analysis R; no Python helpers). Restores the legacy\n", + "# per-region fan-out: one task per analysis region, looping the region's genes\n", + "# inside. Per region it (1) builds one multi-study GwasSumStats over the region\n", + "# via gwas_sumstats_construct.R (its LD sketch spans whatever LD-reference\n", + "# blocks the region overlaps), then per gene (2) converts the legacy\n", + "# univariate_twas_weights.rds to a pecotmr TwasWeights via\n", + "# legacy_twas_weights_convert.R and (3) runs twas.R (causalInferencePipeline)\n", + "# to emit a per-gene .twas.rds. MR is deferred (no fine-mapping result is\n", + "# passed), so only TWAS Z + p-value are produced.\n", "depends: sos_variable(\"filtered_regional_xqtl_files\")\n", + "# Legacy CLI retained. Wired to the pecotmr selection knobs: rsq_cutoff,\n", + "# rsq_pval_cutoff, rsq_option, rsq_pval_option (CV weight selection). MR is\n", + "# deferred, so mr_pval_cutoff and the remaining legacy params are kept declared\n", + "# for CLI stability but are not consumed by the S4 path.\n", "parameter: coverage = \"cs_coverage_0.95\"\n", - "# Threshold for rsq and pvalue for imputability determination for a model \n", + "# Thresholds for rsq and CV p-value for imputability/best-model selection\n", "parameter: rsq_cutoff = 0.01\n", "parameter: rsq_pval_cutoff = 0.05\n", "parameter: mr_pval_cutoff = 0.05\n", "parameter: save_ctwas_data = True\n", "parameter: save_mr_result = True\n", - "parameter: rsq_option=\"rsq\"\n", - "parameter: rsq_pval_option=[\"adj_rsq_pval\", \"pval\"]\n", - "# load by batches if memory resource is limited, default to load all at once \n", + "parameter: rsq_option = \"rsq\"\n", + "parameter: rsq_pval_option = [\"adj_rsq_pval\", \"pval\"]\n", + "# load by batches if memory resource is limited, default to load all at once\n", "parameter: batch_load_memory = 500\n", "parameter: event_filter_rules = path()\n", "parameter: comment_string = \"NULL\"\n", "parameter: rename_column = False\n", - "stop_if(len(filtered_regional_xqtl_files) == 0, \n", + "stop_if(len(filtered_regional_xqtl_files) == 0,\n", " \"No regions with overlapping xQTL weights found; skipping TWAS step.\")\n", "input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = \"filtered_region_info\"\n", - "output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas.tsv.gz']\n", - "if save_ctwas_data:\n", - " output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas_data.rds')\n", - "if save_mr_result:\n", - " output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.mr_result.tsv.gz')\n", - "output: output_files\n", + "output: [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.{gene}.twas.rds' for gene in sorted(set(_filtered_region_info[4]))]\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'\n", - "R: expand = '${ }', stdout = f\"{_output[0]:n}.stdout\", stderr = f\"{_output[0]:n}.stderr\", container = container\n", - "\n", - " library(dplyr)\n", - " library(data.table)\n", - " library(pecotmr)\n", - " library(readr)\n", - "\n", - "\n", - " # function to update method selection\n", - " update_twas_method <- function(twas_df, rsq_cutoff =${rsq_cutoff}, rsq_pval_cutoff = ${rsq_pval_cutoff}){\n", - " if (nrow(twas_df) == 0) { \n", - " return(twas_df)\n", - " } \n", - " twas_df$method_selected_original <- twas_df$is_selected_method\n", - " twas_df_colnames <- colnames(twas_df)\n", - " twas_df$gene_context <- paste0(twas_df$molecular_id, \"_\", twas_df$context)\n", - " fix_sel <- twas_df$is_selected_method & (is.na(twas_df$twas_z) | is.na(twas_df$twas_pval) | is.infinite(twas_df$twas_pval) |\n", - " is.null(twas_df$twas_z) | is.null(twas_df$twas_pval))\n", - " if (!any(fix_sel)) return(twas_df) # If nothing to fix, return fast\n", - " all_group <- paste(twas_df$gene_context, twas_df$gwas_study, sep = \" - \") #gene-context-study group\n", - " fix_groups <- unique(all_group[fix_sel])\n", - "\n", - " for (fixgroup in fix_groups) {\n", - " df_idxs <- which(all_group == fixgroup) # idx of gene-context-study group that needs replacement of method selected\n", - " if (length(df_idxs) == 1) next\n", - " candidate_idxs <- df_idxs[!is.na(twas_df$twas_z[df_idxs]) & !is.na(twas_df$twas_pval[df_idxs]) & !is.infinite(twas_df$twas_z[df_idxs]) & \n", - " !is.null(twas_df$twas_z[df_idxs]) & !is.null(twas_df$twas_pval[df_idxs]) &\n", - " twas_df$rsq_cv[df_idxs] >= rsq_cutoff & twas_df$pval_cv[df_idxs] <= rsq_pval_cutoff] \n", - " if (length(candidate_idxs) <= 0 || all(is.na(candidate_idxs))) next\n", - " best_i <- candidate_idxs[which.max(twas_df$rsq_cv[candidate_idxs])]\n", - " best_method <- twas_df$method[best_i]\n", - " twas_df$is_selected_method[df_idxs] <- (twas_df$method[df_idxs] == best_method)\n", - " message (\"Updating method selection for \", fixgroup, \".\")\n", - " }\n", - " return(twas_df[, twas_df_colnames])\n", - " }\n", - "\n", - " # Load metadata and configuration - let these fail if there are issues\n", - " if (${\"TRUE\" if rename_column else \"FALSE\"}) {\n", - " gwas_meta_data = fread(\"${gwas_meta_data}\", data.table=FALSE)\n", - " column_file_path = gwas_meta_data$column_mapping[1]\n", - " } else {\n", - " column_file_path = NULL\n", - " }\n", - " \n", - " xqtl_meta_df <- fread(\"${xqtl_meta_data}\", data.table=FALSE)\n", - " xqtl_meta_df <- xqtl_meta_df[!duplicated(xqtl_meta_df[, c(\"region_id\", \"TSS\")]), ]\n", - " xqtl_type_table <- if (isTRUE(file.exists(\"${xqtl_type_table}\"))) fread(\"${xqtl_type_table}\") else NULL\n", - " gene_list <- c(${', '.join([f\"'{gene}'\" for gene in _filtered_region_info[4]])})\n", - " \n", - " event_filter_rules = ${\"NULL\" if not event_filter_rules.is_file() else \"'%s'\" % event_filter_rules}\n", - " if (!is.null(event_filter_rules)) {\n", - " event_filters <- read.table(\"${event_filter_rules}\")\n", - " event_filters <- lapply(1:nrow(event_filters), function(ii) as.list(event_filters[ii,] %>% unlist))\n", - " } else { \n", - " event_filters <- NULL \n", - " }\n", - "\n", - " # Process weights with targeted error handling for file operations\n", - " weight_db_list <- c(${_input:r,})\n", - " names(weight_db_list) <- gene_list\n", - " weight_db_list <- split(weight_db_list, names(weight_db_list))\n", - " \n", - " # Filter out empty/invalid weight files with error handling\n", - " weight_db_list_update <- tryCatch({\n", - " Filter(Negate(is.null), lapply(weight_db_list, function(file_list) {\n", - " do.call(c, lapply(file_list, function(file) {\n", - " if (file.exists(file) && file.size(file) > 200) file else NULL\n", - " }))\n", - " }))\n", - " }, error = function(e) {\n", - " message(paste(\"Error checking weight files:\", e$message))\n", - " return(list())\n", - " })\n", - "\n", - " if(length(weight_db_list_update) <= 0) {\n", - " message(paste0(\"No valid weight files for region ${_filtered_region_info[3]}. Creating empty output files.\"))\n", - " # Define TWAS result columns (same as in final_results)\n", - " twas_header <- data.frame(\n", - " gene = character(),\n", - " molecular_id = character(),\n", - " TSS = numeric(),\n", - " start = numeric(),\n", - " end = numeric(),\n", - " beta = numeric(),\n", - " se = numeric(),\n", - " z = numeric(),\n", - " pval = numeric(),\n", - " qval = numeric(),\n", - " rsq = numeric(),\n", - " mr_beta = numeric(),\n", - " mr_se = numeric(),\n", - " mr_pval = numeric()\n", - " )\n", - "\n", - " # Write empty file with header, gzipped\n", - " fwrite(twas_header, file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n", - " if (${\"TRUE\" if save_ctwas_data else \"FALSE\"}) {\n", - " saveRDS(list(), \"${_output[0]:nnn}.twas_data.rds\", compress='xz')\n", - " }\n", - " if (${\"TRUE\" if save_mr_result else \"FALSE\"}) {\n", - " fwrite(data.frame(), file = \"${_output[0]:nnn}.mr_result.tsv.gz\", sep = \"\\t\", compress = \"gzip\")\n", - " }\n", - " quit(save = \"no\", status = 0)\n", - " }\n", - "\n", - " # Load TWAS weights - allow this to fail with informative errors\n", - " message(paste(\"Loading TWAS weights for\", length(weight_db_list_update), \"genes\"))\n", - " twas_weights_results <- list()\n", - " \n", - " for (gene_db in names(weight_db_list_update)) {\n", - " weight_dbs <- weight_db_list_update[[gene_db]]\n", - " message(paste(\"Processing gene:\", gene_db, \"with\", length(weight_dbs), \"weight files\"))\n", - " conditions = xqtl_meta_df |> filter(region_id == gene_db) |> pull(contexts)\n", - "\n", - " # Load weights for this gene - let it fail if there are real issues\n", - " twas_weights_results[[gene_db]] = load_twas_weights(\n", - " weight_dbs, \n", - " conditions = if(is.na(conditions)) NULL else conditions,\n", - " variable_name_obj = \"variant_names\", \n", - " susie_obj = \"susie_weights_intermediate\",\n", - " twas_weights_table = \"twas_weights\"\n", - " )\n", - "\n", - " if (length(twas_weights_results[[gene_db]]) > 1) {\n", - " twas_weights_results[[gene_db]]$data_type <- setNames(\n", - " lapply(names(twas_weights_results[[gene_db]]$weights), function(context) {\n", - " xqtl_type_table$type[sapply(xqtl_type_table$context, function(x) grepl(x, context))]\n", - " }), \n", - " names(twas_weights_results[[gene_db]]$weights)\n", - " )\n", - " no_type_value <- sapply(names(twas_weights_results[[gene_db]]$data_type), function(x) length(twas_weights_results[[gene_db]]$data_type[[x]])==0)\n", - " if (any(no_type_value)){\n", - " aff_contexts <- names(twas_weights_results[[gene_db]]$data_type)[no_type_value]\n", - " warning(paste(aff_contexts, collapse=\", \"), \" data type information if not documented in ${xqtl_meta_data}, assigning as undefined. \")\n", - " for (i in which(no_type_value)) {\n", - " if (length(twas_weights_results[[gene_db]]$data_type[[i]])==0) twas_weights_results[[gene_db]]$data_type[[i]] <- \"undefined\"\n", - " }\n", - " }\n", - " } else {\n", - " message(paste(\"No valid weights loaded for gene:\", gene_db))\n", - " twas_weights_results[[gene_db]] <- NULL\n", - " } \n", - " }\n", - " \n", - " # Clean up and check if we have any valid results\n", - " rm(weight_db_list, weight_db_list_update, xqtl_type_table, gene_list)\n", - " gc()\n", - " \n", - " # Filter out NULL results and report\n", - " valid_genes <- names(Filter(Negate(is.null), twas_weights_results))\n", - " message(paste(\"Valid weights loaded for\", length(valid_genes), \"genes:\", paste(valid_genes, collapse = \", \")))\n", - " \n", - " if (length(valid_genes) == 0) {\n", - " message(paste0(\"No valid TWAS weights loaded for region ${_filtered_region_info[3]}. Creating empty output files.\"))\n", - " fwrite(data.frame(), file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n", - " if (${\"TRUE\" if save_ctwas_data else \"FALSE\"}) {\n", - " saveRDS(list(), \"${_output[0]:nnn}.twas_data.rds\", compress='xz')\n", - " }\n", - " if (${\"TRUE\" if save_mr_result else \"FALSE\"}) {\n", - " fwrite(data.frame(), file = \"${_output[0]:nnn}.mr_result.tsv.gz\", sep = \"\\t\", compress = \"gzip\")\n", - " }\n", - " quit(save = \"no\", status = 0)\n", - " }\n", - "\n", - " # Batch load weights - allow this to fail\n", - " twas_weights_results <- batch_load_twas_weights(\n", - " twas_weights_results = twas_weights_results,\n", - " meta_data_df = xqtl_meta_df,\n", - " max_memory_per_batch = ${\"Inf\" if batch_load_memory == \"inf\" else batch_load_memory}\n", - " )\n", - " \n", - " message(paste(\"Proceeding with TWAS analysis for\", length(twas_weights_results), \"batches\"))\n", - "\n", - " # TWAS analysis - allow this to fail with informative errors\n", - " twas_results_db <- vector(\"list\", length(twas_weights_results)) \n", - " for (batch in seq_along(twas_weights_results)) {\n", - " message(paste(\"Processing batch\", batch, \"of\", length(twas_weights_results)))\n", - " res <- tryCatch(\n", - " twas_pipeline(\n", - " twas_weights_results[[batch]], \n", - " \"${ld_meta_data}\", \n", - " \"${gwas_meta_data}\", \n", - " region_block = \"${_filtered_region_info[3]}\",\n", - " ld_reference_sample_size = ${ld_reference_sample_size},\n", - " rsq_cutoff = ${rsq_cutoff}, \n", - " rsq_option = \"${rsq_option}\", \n", - " rsq_pval_cutoff = ${rsq_pval_cutoff}, \n", - " rsq_pval_option = c(${\", \".join([f'\"{x}\"' for x in rsq_pval_option])}), \n", - " mr_pval_cutoff = ${mr_pval_cutoff},\n", - " mr_coverage_column = \"${coverage}\", \n", - " output_twas_data = ${\"TRUE\" if save_ctwas_data else \"FALSE\"},\n", - " event_filters = event_filters,\n", - " column_file_path = column_file_path,\n", - " comment_string = ${\"NULL\" if comment_string == \"NULL\" else f\"'{comment_string}'\"}\n", - " ),\n", - " error = function(e) {\n", - " message(paste(\"Batch\", batch, \"ERROR:\", e$message))\n", - " return(NULL)\n", - " }\n", - " )\n", - " \n", - " # res is NULL → skip safely\n", - " if (is.null(res) || is.null(res$twas_result)) {\n", - " message(paste(\"Batch\", batch, \"returned no results\"))\n", - " twas_results_db[[batch]] <- NULL\n", - " next\n", - " }\n", - " \n", - " message(paste(\"Batch\", batch, \"produced\", nrow(res$twas_result), \"TWAS results\"))\n", - " twas_results_db[[batch]] <- res\n", - " }\n", - " \n", - " # Now safely drop NULLs\n", - " twas_results_db <- Filter(Negate(is.null), twas_results_db)\n", - " \n", - " rm(twas_weights_results)\n", - " gc()\n", - " \n", - " # Report final results\n", - " message(paste(\"Final valid batches:\", length(twas_results_db)))\n", - "\n", - " if(length(twas_results_db) != 0){\n", - " # Merge with metadata\n", - " for (batch in 1:length(twas_results_db)){\n", - " if (!is.null(twas_results_db[[batch]]$twas_result)) {\n", - " twas_results_db[[batch]]$twas_result <- merge(\n", - " twas_results_db[[batch]]$twas_result, \n", - " xqtl_meta_df[, c(\"region_id\",\"TSS\",\"start\",\"end\")], \n", - " by.x = \"molecular_id\", by.y = \"region_id\"\n", - " )\n", - " }\n", - " }\n", - " \n", - " # Combine and write results\n", - " final_results <- do.call(rbind, lapply(twas_results_db, function(x) {\n", - " if (!is.null(x$twas_result)) x$twas_result[, c(2,1,14:16,3:13)] else data.frame()\n", - " }))\n", - "\n", - " # update methods if twas z-score is NA \n", - " final_results <- update_twas_method(final_results) \n", - " if (nrow(final_results)==0) {\n", - " message(\"No valid TWAS results to write\")\n", - " fwrite(data.frame(), file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n", - " } else {\n", - " message(paste(\"Writing\", nrow(final_results), \"final TWAS results\"))\n", - " fwrite(final_results, ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n", - " }\n", - " } else {\n", - " message(\"No valid TWAS results to write\")\n", - " fwrite(data.frame(), file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n", - " }\n", - "\n", - " # Save cTWAS data if requested\n", - " if (${\"TRUE\" if save_ctwas_data else \"FALSE\"} & length(twas_results_db) != 0) {\n", - " if (length(twas_results_db) > 1){\n", - " # Combine data from multiple batches\n", - " twas_results_db[[1]]$twas_data$weights <- do.call(c, lapply(twas_results_db, function(x) x$twas_data$weights))\n", - " twas_results_db[[1]]$twas_data$susie_weights_intermediate_qced <- do.call(c, lapply(twas_results_db, function(x) x$twas_data$susie_weights_intermediate_qced))\n", - " studies <- unique(names(find_data(twas_results_db, c(3, \"z_gene\"))))\n", - " for (study in studies){\n", - " twas_results_db[[1]][[\"twas_data\"]][[\"z_gene\"]][[study]] <- do.call(rbind, lapply(twas_results_db, function(x) x$twas_data$z_gene[[study]]))\n", - " }\n", - " }\n", - " saveRDS(twas_results_db[[1]]$twas_data, \"${_output[0]:nnn}.twas_data.rds\", compress='xz')\n", - " } else if (${\"TRUE\" if save_ctwas_data else \"FALSE\"}) {\n", - " saveRDS(list(), \"${_output[0]:nnn}.twas_data.rds\", compress='xz')\n", - " }\n", - "\n", - " # Save MR results if requested\n", - " if (${\"TRUE\" if save_mr_result else \"FALSE\"}) {\n", - " if(length(twas_results_db) != 0) {\n", - " mr_results <- do.call(rbind, lapply(twas_results_db, function(x) x$mr_result))\n", - " message(paste(\"Writing\", nrow(mr_results), \"MR results\"))\n", - " fwrite(mr_results, file = \"${_output[0]:nnn}.mr_result.tsv.gz\", sep = \"\\t\", compress = \"gzip\")\n", - " } else {\n", - " fwrite(data.frame(), file = \"${_output[0]:nnn}.mr_result.tsv.gz\", sep = \"\\t\", compress = \"gzip\")\n", - " }\n", - " }\n", - "\n", - " message(\"TWAS analysis completed successfully\")" + "bash: expand = '${ }', stdout = f\"{_output[0]:n}.stdout\", stderr = f\"{_output[0]:n}.stderr\", container = container\n", + " set -e\n", + " # SoS expands ${...}; bare $shellvars (no braces) are left for bash.\n", + " outdir=${cwd:a}/${step_name}\n", + " mkdir -p \"$outdir\"\n", + " region_id=${_filtered_region_info[3]}\n", + " ld_block=${_filtered_region_info[0]}:${_filtered_region_info[1]}-${_filtered_region_info[2]}\n", + "\n", + " # GWAS studies covering this region's chromosome (paths already resolved by\n", + " # get_analysis_regions). studies and gwas_tsvs iterate the same dict order.\n", + " studies=\"${\",\".join([s for s in regional_data['GWAS'] if _filtered_region_info[0] in regional_data['GWAS'][s]])}\"\n", + " gwas_tsvs=\"${\",\".join([regional_data['GWAS'][s][_filtered_region_info[0]][0] for s in regional_data['GWAS'] if _filtered_region_info[0] in regional_data['GWAS'][s]])}\"\n", + "\n", + " # (1) One multi-study GwasSumStats over the whole region.\n", + " gss=\"$outdir/${name}.$region_id.gwas_sumstats.rds\"\n", + " Rscript code/script/pecotmr_integration/gwas_sumstats_construct.R \\\n", + " --study \"$studies\" \\\n", + " --gwas-tsv \"$gwas_tsvs\" \\\n", + " --ld-block \"$ld_block\" \\\n", + " --ld-meta \"${ld_meta_data}\" \\\n", + " --output \"$gss\"\n", + "\n", + " # (2)+(3) Per unique gene in this region (first weight file per gene):\n", + " # convert legacy weights (+FMR for MR), then run TWAS-Z + MR. The expansion\n", + " # below emits a flat \"gene file gene file ...\" token list (first file per\n", + " # gene); the loop consumes it two tokens at a time.\n", + " n_files=${len(_input)}\n", + " n_genes=${len(set([str(g) for g in _filtered_region_info[4]]))}\n", + " if [ \"$n_files\" -ne \"$n_genes\" ]; then\n", + " echo \"NOTE: $n_files weight file(s) across $n_genes gene(s) in region $region_id; using the first file per gene.\" >&2\n", + " fi\n", + " set -- ${\" \".join([tok for i, (g, f) in enumerate(zip(_filtered_region_info[4], _input)) if str(g) not in [str(h) for h in _filtered_region_info[4][:i]] for tok in (str(g), str(f))])}\n", + " while [ \"$#\" -ge 2 ]; do\n", + " gene=\"$1\"\n", + " weight=\"$2\"\n", + " shift 2\n", + " tw=\"$outdir/${name}.$region_id.$gene.twas_weights.rds\"\n", + " Rscript code/script/pecotmr_integration/legacy_twas_weights_convert.R \\\n", + " --legacy \"$weight\" \\\n", + " --study \"${name}\" \\\n", + " --output \"$tw\"\n", + " Rscript code/script/pecotmr_integration/twas.R \\\n", + " --twas-weights \"$tw\" \\\n", + " --gwas-sumstats \"$gss\" \\\n", + " --rsq-cutoff ${rsq_cutoff} \\\n", + " --rsq-pval-cutoff ${rsq_pval_cutoff} \\\n", + " --rsq-option ${rsq_option} \\\n", + " --rsq-pval-option \"${\",\".join(rsq_pval_option)}\" \\\n", + " --output \"$outdir/${name}.$region_id.$gene.twas.rds\"\n", + " done" ] }, { @@ -614,235 +392,78 @@ "outputs": [], "source": [ "[ctwas_1]\n", - "# ctwas_1: merge regions by each chromosome \n", + "# cTWAS step 1 (assemble): build the per-LD-block inputs for one gene-bearing\n", + "# chromosome and assemble them via the pecotmr S4 wrappers (no inline analysis\n", + "# R; no Python helpers). Per chromosome: (1) ctwas_manifest.R enumerates the\n", + "# chromosome's LD blocks from ld_meta_data, maps each gene (by TSS in\n", + "# xqtl_meta_data) to its home block, and pairs each block with a per-block\n", + "# GwasSumStats path + the gene's TwasWeights; (2) gwas_sumstats_construct.R\n", + "# builds one GwasSumStats per block (SNP background); (3) ctwas_assemble.R runs\n", + "# assembleCtwasInputs -> {name}.ctwas_inputs.rds. cTWAS runs over the whole\n", + "# chromosome's LD blocks, NOT the coarse twas analysis region.\n", "depends: sos_variable(\"filtered_region_info\")\n", - "# chromosome number to merge for region data, can be an integer or a string, i.e. 'chr1'\n", - "parameter: chrom=\"\"\n", - "# twas weight cutoff for ctwas variant selection\n", + "# chromosome to assemble (integer or 'chrN'); default: the lone gene-bearing chrom\n", + "parameter: chrom = \"\"\n", + "# weight pre-filters passed to assembleCtwasInputs\n", "parameter: twas_weight_cutoff = 0.0\n", - "parameter: max_num_variants =\"Inf\"\n", - "# cs_min_cor cutoff in susie finemapping result for variant selection\n", - "parameter: cs_min_cor = 0\n", - "# minimum pip cutoff in susie finemapping result for variant selection\n", - "parameter: min_pip_cutoff =0 \n", - "# A scalar in (0,1]. The proportion of SNPs, reduce runtime at the cost of accuracy \n", + "parameter: max_num_variants = \"Inf\"\n", + "parameter: cs_min_cor = 0.0\n", + "parameter: min_pip_cutoff = 0.0\n", + "# declared for CLI stability; consumed downstream / not by assembleCtwasInputs\n", "parameter: thin = 1.0\n", - "# Maximum number of SNPs in a region.\n", "parameter: maxSNP = 20000\n", - "# skip this step if True\n", - "parameter: skip_assembly = False\n", - "skip_if(skip_assembly == True, \" Skip [ctwas_1] assemble regions. \" )\n", "parameter: multi_group = True\n", - "# A string character add to the end of name variable\n", "parameter: numThreads = 4\n", - "\n", - "twas_output_files = {}\n", - "region_blocks_per_chrom = []\n", - "chromosome_list = []\n", - "chrom = f\"chr{int(chrom)}\" if str(chrom).isdigit() else chrom\n", - "for region_info in filtered_region_info:\n", - " chrom_info = region_info[0]\n", - " if chrom_info != chrom:\n", - " continue # Skip chromosomes other than chrom input \n", - " file_path = f\"{cwd}/twas/{name}.{region_info[3]}.twas_data.rds\"\n", - " if chrom_info not in twas_output_files:\n", - " twas_output_files[chrom_info] = [[], []] # Structure: {\"chrX\": [[files], [region_names]]}\n", - " chromosome_list.append(chrom_info)\n", - " twas_output_files[chrom_info][0].append(file_path) # File paths\n", - " twas_output_files[chrom_info][1].append(region_info[3]) # Region names\n", - "twas_files=[twas_output_files[chr][0] for chr in chromosome_list]\n", - "\n", - "region_blocks_per_chrom = [twas_output_files[chr][1] for chr in chromosome_list]\n", - "region_blocks_per_chrom = [f\"\"\"c(\"{'\", \"'.join(regions)}\")\"\"\" for regions in region_blocks_per_chrom]\n", - "outdir = f\"{cwd}/{step_name.split('_')[0]}\"\n", - "if not os.path.exists(outdir):\n", - " os.makedirs(outdir)\n", - "gwas_study = 'c(' + ', '.join(f'\"{x}\"' for x in gwas_study) + ')'\n", - "name_suffix = f'\"{name_suffix}\"' if name_suffix else 'NULL'\n", - "\n", - "input: twas_files, group_by = lambda x: group_by_region(x, twas_files), group_with=dict(region_names=region_blocks_per_chrom)\n", - "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand = '${ }', stdout = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.{chrom}.thin{thin}.stdout\", \n", - "stderr = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.{chrom}.thin{thin}.stderr\", container = container\n", - "\n", - " library(data.table)\n", - " library(ctwas) # multigroup_ctwas\n", - " library(pecotmr)\n", - " library(stringr)\n", - " outputdir = \"${cwd:a}/${step_name.split('_')[0]}\"\n", - " \n", - " # load genome-wide data across all regions\n", - " gwas_meta_data <- fread(\"${gwas_meta_data}\",data.table=FALSE)\n", - " if(!'sample_size' %in% colnames(gwas_meta_data)) stop('Please add column `sample_size` information for each GWAS study in --gwas_meta_data. ')\n", - " gwas_studies <- if (length(${gwas_study})!=0) ${gwas_study} else unique(gwas_meta_data$study_id)\n", - " gwas_files <- gwas_meta_data[gwas_meta_data$chrom == readr::parse_number(\"${chrom}\") & gwas_meta_data$study_id %in% gwas_studies,, drop=FALSE]\n", - " gwas_files <- setNames(gwas_files$file_path, gwas_files$study_id)\n", - "\n", - " merge_context_names <- function(ctwas_dat_file){\n", - " ctwas_dat<- readRDS(ctwas_dat_file)\n", - " idxs <- which(grepl(\"_chr[0-9XY]+_[^_]+\", names(ctwas_dat$weights)))\n", - " if (length(idxs)>0) {\n", - " names(ctwas_dat$weights) <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", names(ctwas_dat$weights))\n", - " for (idx in idxs) {\n", - " for (study in names(ctwas_dat$weights[[idx]])){\n", - " ctwas_dat$weights[[idx]][[study]]$weight_name <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", ctwas_dat$weights[[idx]][[study]]$weight_name)\n", - " ctwas_dat$weights[[idx]][[study]]$context <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", ctwas_dat$weights[[idx]][[study]]$context)\n", - " }\n", - " }\n", - " for (study in names(ctwas_dat$z_gene)){\n", - " ctwas_dat$z_gene[[study]]$id <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", ctwas_dat$z_gene[[study]]$id)\n", - " ctwas_dat$z_gene[[study]]$context <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", ctwas_dat$z_gene[[study]]$context)\n", - " ctwas_dat$z_gene[[study]]$group <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", ctwas_dat$z_gene[[study]]$group)\n", - " }\n", - " for (gene in names(ctwas_dat$susie_weights_intermediate_qced)){\n", - " names(ctwas_dat$susie_weights_intermediate_qced[[gene]]) <- gsub(\"_chr[0-9XY]+_[^_]+\", \"\", names(ctwas_dat$susie_weights_intermediate_qced[[gene]]))\n", - " }\n", - " }\n", - " idxs <- which(grepl(\"monocyte\", names(ctwas_dat$weights)))\n", - " if (length(idxs)>0) {\n", - " names(ctwas_dat$weights) <- gsub(\"monocyte_eQTL\", \"eQTL\", names(ctwas_dat$weights))\n", - " for (idx in idxs) {\n", - " for (study in names(ctwas_dat$weights[[idx]])){\n", - " ctwas_dat$weights[[idx]][[study]]$weight_name <- gsub(\"monocyte_eQTL\", \"eQTL\", ctwas_dat$weights[[idx]][[study]]$weight_name)\n", - " ctwas_dat$weights[[idx]][[study]]$type <- \"eQTL\"\n", - " }\n", - " }\n", - " for (study in names(ctwas_dat$z_gene)){\n", - " ctwas_dat$z_gene[[study]]$id <- gsub(\"monocyte_eQTL\", \"eQTL\", ctwas_dat$z_gene[[study]]$id)\n", - " ctwas_dat$z_gene[[study]]$type <- gsub(\"monocyte_\", \"\", ctwas_dat$z_gene[[study]]$type)\n", - " ctwas_dat$z_gene[[study]]$group <- gsub(\"monocyte_eQTL\", \"eQTL\", ctwas_dat$z_gene[[study]]$group)\n", - " }\n", - " }\n", - " return(ctwas_dat)\n", - " }\n", - " weight_list = vector(\"list\", length(gwas_studies))\n", - " names(weight_list) <- gwas_studies\n", - " z_gene <- weight_list # empty list \n", - " z_snp <- weight_list\n", - " \n", - " # get LD snp info table (snp_map) and ld variants \n", - " max_pos <- fread(\"${ld_meta_data}\", data.table=FALSE)\n", - " max_pos <- max(max_pos$end[max_pos[,1]==\"${chrom}\"])\n", - " region_of_interest <- data.frame(chrom = \"${chrom}\", start = 0, end = max_pos+1)\n", - " snp_map <- get_ref_variant_info(\"${ld_meta_data}\", region_of_interest)\n", - " ld_variants <- ifelse(grepl(\"^chr\", snp_map$id), snp_map$id, paste0(\"chr\", snp_map$id))\n", - " ld_variants <- ld_variants[!duplicated(ld_variants)]\n", - "\n", - " regions_data <- get_ctwas_meta_data(\"${ld_meta_data}\")\n", - " region_info <- regions_data$region_info\n", - " LD_map <- regions_data$LD_info\n", - " if (!is.null(${name_suffix})) name <- paste0(\"${name}_\", ${name_suffix}) else name <- \"${name}\"\n", - " if (!file.exists(file.path(outputdir, paste0(name, \".LD_map.rds\")))) saveRDS(LD_map, file.path(outputdir, paste0(name, \".LD_map.rds\")), compress='xz')\n", - " saveRDS(snp_map, file.path(outputdir, paste0(name, \".snp_map.${chrom}.rds\")), compress='xz')\n", - " rm(regions_data, LD_map)\n", - " gc()\n", - " region_map <- c()\n", - " # merge chromosome wide data, weight, z_gene, z_snp from the twas pipeline output of twas.data.rds files \n", - " for (region_dat in c(${_input:r,})){\n", - " ctwas_dat <- merge_context_names(region_dat)\n", - " if (is.null(ctwas_dat)) next \n", - " weight_tmp <- trim_ctwas_variants(ctwas_dat, twas_weight_cutoff=${twas_weight_cutoff}, cs_min_cor=${cs_min_cor},\n", - " min_pip_cutoff=${min_pip_cutoff}, max_num_variants=${max_num_variants})\n", - " if (length(weight_tmp)==0) next \n", - " for (study in names(weight_tmp)) {\n", - " if (!study %in% gwas_studies) next\n", - " weight_list[[study]] <- c(weight_list[[study]], weight_tmp[[study]])\n", - " z_gene[[study]] <- rbind(z_gene[[study]], ctwas_dat$z_gene[[study]])\n", - " z_gene[[study]] <- z_gene[[study]][!is.na(z_gene[[study]]$z) & !is.infinite(z_gene[[study]]$z) & z_gene[[study]]$id %in% names(weight_list[[study]]),] \n", - " }\n", - " region <- str_extract(region_dat, \"chr[0-9XY]+_[0-9]+_[0-9]+\")\n", - " z_genes <- unique(find_data(weight_tmp, c(3, \"molecular_id\")))\n", - " names(z_genes) <- rep(str_extract(region_dat, \"chr[0-9XY]+_[0-9]+_[0-9]+\"), length(z_genes))\n", - " region_map <- c(region_map, z_genes)\n", - " rm(ctwas_dat, weight_tmp, z_genes, region)\n", - " gc()\n", - " }\n", - " z_gene <- Filter(Negate(is.null), z_gene)\n", - " weight_list <- Filter(Negate(is.null), weight_list)\n", - " if (length(z_gene)==0 | length(weight_list)==0) stop(\"Please check input data. No twas z-score value available. \")\n", - " \n", - " for (study in names(weight_list)){\n", - " z_snp[[study]] <- harmonize_gwas(gwas_files[study], query_region=paste0(readr::parse_number(\"${chrom}\"), \":0-\", max_pos+1), ld_variants, c(\"beta\", \"z\"), match_min_prop = 0)\n", - " z_snp[[study]] <- z_snp[[study]][, colnames(z_snp[[study]])[colnames(z_snp[[study]]) %in% c(\"chrom\", \"pos\", \"variant_id\", \"A1\", \"A2\", \"z\", \"beta\", \"n_sample\",\"n_case\", \"n_control\", \"effect_allele_frequency\")]]\n", - " z_snp[[study]] <- z_snp[[study]][, c(\"chrom\", \"pos\", colnames(z_snp[[study]])[!colnames(z_snp[[study]]) %in% c(\"chrom\", \"pos\")])]\n", - " z_snp[['sample_size']][[study]]<- as.integer(unique(na.omit(gwas_meta_data$sample_size[gwas_meta_data$study_id==study])))\n", - " if(length(z_snp[['sample_size']][[study]]!=1) | z_snp[['sample_size']][[study]] <=0 ) {\n", - " stop(\"Please check sample size provided for \", study, \" at --gwas_meta_data. \")\n", - " }\n", - " }\n", - " # if weight variants are trimmed, load LD and re-compute twas z score \n", - " if (${twas_weight_cutoff}!=0 | ${cs_min_cor}!=0 | ${min_pip_cutoff}!=0 | ${max_num_variants}!=Inf ) {\n", - " region_map_list <- split(region_map, names(region_map))\n", - " for (region in names(region_map_list)){\n", - " region_genes <- unique(region_map_list[[region]])\n", - " weight_list_sub <- find_data(weight_list, 2) \n", - " weight_list_sub <- weight_list_sub[gsub( \"\\\\|.*$\", \"\", names(weight_list_sub)) %in% region_genes]\n", - " region_of_interest <- data.frame(chrom = \"${chrom}\", start = min(find_data(weight_list_sub, c(2, \"p0\"))), end = max(find_data(weight_list_sub, c(2, \"p1\"))))\n", - " rm(weight_list_sub);gc()\n", - " LD_list <- load_LD_matrix(\"${ld_meta_data}\", region_of_interest)\n", - " dup_idx <- which(duplicated(LD_list$LD_variants))\n", - " LD_list <- LD_list['LD_matrix'] \n", - " if (length(dup_idx) >= 1) LD_list$LD_matrix <- LD_list$LD_matrix[-dup_idx, -dup_idx]\n", - " for (study in names(weight_list)){\n", - " sub_idx <- which(gsub(\"\\\\|.*$\", \"\", names(weight_list[[study]])) %in% region_genes)\n", - " for (context in names(weight_list[[study]][sub_idx])){\n", - " twas_df <- twas_analysis(weight_list[[study]][[context]]$wgt, z_snp[[study]], LD_list$LD_matrix, rownames(weight_list[[study]][[context]]$wgt))[[1]]\n", - " z_gene[[study]]$z[z_gene[[study]]$id==context &z_gene[[study]]$gwas_study==study] <- twas_df$z \n", - " }\n", - " }\n", - " rm(LD_list);gc() \n", - " }\n", - " rm(LD_list);gc()\n", - " }\n", - "\n", - " # for each gwas study - merge ctwas input data \n", - " for (study in names(weight_list)){\n", - " if (is.null(z_gene[[study]])) next\n", - " if (${\"TRUE\" if multi_group else \"FALSE\"}) {\n", - " outname <- paste0(name, \".ctwas_region_data.\", study, \".${chrom}\")\n", - " } else {\n", - " contexts <- unique(do.call(c,lapply(z_gene, function(x) unique(x$context))))\n", - " outname <- paste0(name, \".ctwas_region_data.\", study, \".\", contexts, \".${chrom}\")\n", - " names(outname) <- contexts\n", - " }\n", - "\n", - " colnames(z_snp[[study]])[match(\"variant_id\", colnames(z_snp[[study]]))] <-\"id\" \n", - " z_snp[[study]] <- z_snp[[study]][, c(\"id\", \"A1\", \"A2\", \"z\")]\n", - "\n", - " # iterate by study-context pair (single group) or by study itself (multigroup) \n", - " for (group_name in outname){ \n", - " # assemble regions \n", - " if (${\"TRUE\" if multi_group else \"FALSE\"}) {\n", - " subset_inx <- names(weight_list[[study]])\n", - " } else {\n", - " context <- names(outname)[which(outname==group_name)]\n", - " if (length(weight_list[[study]][grepl(context, names(weight_list[[study]]))])==0) next\n", - " subset_inx <- which(grepl(context, names(weight_list[[study]])))\n", - " }\n", - "\n", - " res <- assemble_region_data(region_info,\n", - " z_snp[[study]],\n", - " z_gene[[study]],\n", - " weight_list[[study]][subset_inx],\n", - " snp_map,\n", - " thin = ${thin},\n", - " maxSNP = ${maxSNP},\n", - " min_group_size = 1,\n", - " ncore = ${numThreads})\n", - " region_data <- res$region_data\n", - " boundary_genes <- res$boundary_genes\n", - " region_data_file <- file.path(outputdir, paste0(group_name, \".thin\", ${thin}, \".rds\"))\n", - " boundary_gene_file <- gsub(\"region_data\",\"boundary_genes\", region_data_file)\n", - " saveRDS(region_data, region_data_file, compress='xz')\n", - " saveRDS(boundary_genes, boundary_gene_file, compress='xz') \n", - " }\n", - " saveRDS(weight_list[[study]], file.path(outputdir, paste0(name,\".ctwas_weights.\", study, \".${chrom}.rds\")), compress='xz')\n", - " saveRDS(list(z_snp=z_snp[[study]], z_gene=z_gene[[study]], gwas_n=z_snp[['sample_size']][[study]]), \n", - " file.path(outputdir, paste0(name, \".z_gene_snp.\", study, \".${chrom}.rds\")), compress='xz')\n", - " weight_list[[study]] <- NULL\n", - " z_gene[[study]] <- NULL\n", - " z_snp[[study]] <- NULL\n", - " }" + "parameter: skip_assembly = False\n", + "# TwasWeights source (option c): empty -> the upstream twas step's per-gene\n", + "# weights ({cwd}/twas/{name}.{region}.{gene}.twas_weights.rds); override with a\n", + "# list of prebuilt TwasWeights RDS (e.g. the blessed ctwas weights extracted by\n", + "# legacy_ctwas_weights_to_s4.R) when the toy twas weights carry no signal.\n", + "parameter: twas_weights = []\n", + "skip_if(skip_assembly == True, \"Skip [ctwas_1] assemble.\")\n", + "gene_chroms = sorted(set(str(ri[0]) for ri in filtered_region_info if ri[4]))\n", + "ctwas_chrom = (f\"chr{int(chrom)}\" if str(chrom).isdigit() else str(chrom)) if str(chrom) else (gene_chroms[0] if len(gene_chroms) == 1 else \"\")\n", + "stop_if(not ctwas_chrom, f\"Specify --chrom: expected one gene-bearing chromosome, found {gene_chroms}.\")\n", + "ctwas_weights = list(twas_weights) if twas_weights else [f\"{cwd:a}/twas/{name}.{ri[3]}.{g}.twas_weights.rds\" for ri in filtered_region_info if str(ri[0]) == ctwas_chrom for g in sorted(set(ri[4]))]\n", + "ctwas_studies = [s for s in regional_data['GWAS'] if ctwas_chrom in regional_data['GWAS'][s]]\n", + "stop_if(len(ctwas_studies) == 0, f\"No GWAS study covers {ctwas_chrom}.\")\n", + "output: f\"{cwd:a}/ctwas/{name}.ctwas_inputs.rds\"\n", + "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f\"{step_name}_{_output:bn}\"\n", + "bash: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", + " set -e\n", + " outdir=${cwd:a}/ctwas\n", + " mkdir -p \"$outdir\"\n", + " manifest=\"$outdir/${name}.ctwas_manifest.${ctwas_chrom}.tsv\"\n", + "\n", + " # (1) per-LD-block manifest (block enumeration + gene->home-block mapping).\n", + " Rscript code/script/pecotmr_integration/ctwas_manifest.R \\\n", + " --ld-meta \"${ld_meta_data}\" \\\n", + " --chrom \"${ctwas_chrom}\" \\\n", + " --xqtl-meta \"${xqtl_meta_data}\" \\\n", + " --twas-weights \"${\",\".join([str(w) for w in ctwas_weights])}\" \\\n", + " --gwas-sumstats-dir \"$outdir\" \\\n", + " --output \"$manifest\"\n", + "\n", + " # (2) one GwasSumStats per LD block (all studies on this chromosome).\n", + " studies=\"${\",\".join(ctwas_studies)}\"\n", + " gwas_tsvs=\"${\",\".join([regional_data['GWAS'][s][ctwas_chrom][0] for s in ctwas_studies])}\"\n", + " tail -n +2 \"$manifest\" | while IFS=$'\\t' read -r region_id region gwas_rds twas_w; do\n", + " Rscript code/script/pecotmr_integration/gwas_sumstats_construct.R \\\n", + " --study \"$studies\" \\\n", + " --gwas-tsv \"$gwas_tsvs\" \\\n", + " --ld-block \"$region\" \\\n", + " --ld-meta \"${ld_meta_data}\" \\\n", + " --output \"$gwas_rds\"\n", + " done\n", + "\n", + " # (3) assemble cTWAS inputs.\n", + " Rscript code/script/pecotmr_integration/ctwas_assemble.R \\\n", + " --manifest \"$manifest\" \\\n", + " --twas-weight-cutoff ${twas_weight_cutoff} \\\n", + " --cs-min-cor ${cs_min_cor} \\\n", + " --min-pip-cutoff ${min_pip_cutoff} \\\n", + " --max-num-variants ${max_num_variants} \\\n", + " --output ${_output}" ] }, { @@ -857,67 +478,34 @@ "outputs": [], "source": [ "[ctwas_2]\n", - "## ctwas_2: estimate global group prior using from all regions \n", + "# cTWAS step 2 (estimate priors): estCtwasParam via ctwas_est.R. Reads the\n", + "# assembled inputs from [ctwas_1] on disk so it can be run independently.\n", + "# --fallback-to-prefit recovers the prefit estimate when the accurate EM\n", + "# cannot be estimated (e.g. the single-gene MWE), mirroring the legacy ctwas_2\n", + "# workaround.\n", "parameter: run_param_est = False\n", "parameter: skip_assembly = False\n", - "parameter: thin=1.0\n", + "parameter: thin = 1.0\n", "parameter: prior_var_structure = \"shared_all\"\n", + "parameter: niter = 50\n", + "# declared for CLI stability; not consumed by the S4 path\n", "parameter: multi_group = True\n", "parameter: numThreads = 8\n", - "parameter: niter = 50\n", - "\n", - "thin=int(thin) if thin == 1.0 else thin\n", - "skip_if(run_param_est == False, \" Skip [ctwas_2] parameter estimation. \" )\n", - "name = f\"{name}_{name_suffix}\" if name_suffix else name\n", - "\n", - "gwas_study = 'c(' + ', '.join(f'\"{x}\"' for x in gwas_study) + ')'\n", - "if multi_group:\n", - " input_pattern = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.*.chr*.thin{thin}.rds\" # by study\n", - "else:\n", - " input_pattern = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.*.*.chr*.thin{thin}.rds\" # by study-context pair\n", - "\n", - "input: input_pattern, group_by=\"all\"\n", - "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand = '${ }', stdout = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_param_est.{prior_var_structure}.thin{thin}.stdout\", \n", - "stderr = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_param_est.{prior_var_structure}.thin{thin}.stderr\", container = container\n", - " \n", - " library(ctwas)\n", - " library(data.table)\n", - " library(pecotmr)\n", - " outputdir = \"${cwd:a}/${step_name.split('_')[0]}\"\n", - " region_data_files <- c(${_input:r,})\n", - " names(region_data_files) <- gsub('^.*${name}.ctwas_region_data.\\\\s*|\\\\s*.chr*.*$', '', region_data_files) \n", - " gwas_studies <- if (length(${gwas_study})!=0 & ${\"TRUE\" if multi_group else \"FALSE\"}) ${gwas_study} else unique(names(region_data_files))\n", - "\n", - " # assess global parameters\n", - " for (study in gwas_studies){\n", - " region_files <- region_data_files[grepl(paste0(study, \".chr\"), region_data_files)]\n", - " region_data <- setNames(lapply(region_files, readRDS), names(region_files))\n", - " contexts <- unique(find_data(region_data, c(4, \"context\")))\n", - " # subset either study specific or study-context specific region data based on multigroup/single group\n", - " if (${\"TRUE\" if multi_group else \"FALSE\"}) {\n", - " ## subset region data files that are not context-specific but study-specific \n", - " region_data <- region_data[!sapply(names(region_data), function(x) any(sapply(contexts, function(i) grepl(i, x))))]\n", - " group_name <- 'multi'\n", - " } else {\n", - " region_data <- region_data[sapply(names(region_data), function(x) any(sapply(contexts, function(i) grepl(i, x))))]\n", - " group_name <- 'single' \n", - " }\n", - " region_data <- lapply(split(region_data, names(region_data)), function(x) do.call(c, unname(x)))\n", - " saveRDS(region_data, file.path(outputdir, paste0(\"${name}.region_data_merged.\", group_name,\".\", study,\".thin${thin}.rds\")), compress='xz') #merge and save into one file to be used later\n", - " if (length(region_data[[study]])==0) stop(\"Study \", study, \" not present in region_data \", file.path(outputdir, paste0(\"${name}.region_data_merged.\", group_name, \".thin${thin}.rds\")))\n", - " group_prior_file <- file.path(outputdir, paste0(\"${name}.ctwas_param_${prior_var_structure}.\", study, \".thin${thin}.rds\"))\n", - " message(\"Start global parameter estimation for \", study, \". \")\n", - " param <- est_param(region_data[[study]],\n", - " group_prior_var_structure = \"${prior_var_structure}\", \n", - " niter_prefit = 3,\n", - " niter = ${niter},\n", - " min_group_size = 10,\n", - " ncore = ${numThreads},\n", - " verbose = TRUE)\n", - " saveRDS(param, group_prior_file, compress='xz')\n", - " region_data[[study]] <- NULL\n", - " }" + "skip_if(run_param_est == False, \"Skip [ctwas_2] parameter estimation.\")\n", + "input: f\"{cwd:a}/ctwas/{name}.ctwas_inputs.rds\"\n", + "output: f\"{cwd:a}/ctwas/{name}.ctwas_est.rds\"\n", + "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f\"{step_name}_{_output:bn}\"\n", + "bash: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", + " set -e\n", + " Rscript code/script/pecotmr_integration/ctwas_est.R \\\n", + " --inputs ${_input} \\\n", + " --thin ${thin} \\\n", + " --niter ${niter} \\\n", + " --group-prior-var-structure ${prior_var_structure} \\\n", + " --min-group-size 1 \\\n", + " --fallback-to-prefit \\\n", + " --ncore ${numThreads} \\\n", + " --output ${_output}" ] }, { @@ -934,245 +522,42 @@ "outputs": [], "source": [ "[ctwas_3]\n", - "## ctwas_3: perform finemapping for each region \n", - "parameter: thin=1.0\n", + "# cTWAS step 3 (screen + finemap): screenCtwasRegions + finemapCtwasRegions via\n", + "# ctwas_finemap.R. Reads the estimated params from [ctwas_2] on disk so it can\n", + "# be run independently. Emits the per-gene cTWAS fine-mapping result.\n", + "parameter: run_finemapping = False\n", + "parameter: L = 5\n", + "parameter: min_nonSNP_PIP = 0.5\n", + "# Boundary-gene region merging (legacy ctwas_3 merge_regions, default-off): when\n", + "# True, after fine-mapping, each high-PIP boundary gene's adjacent LD blocks are\n", + "# merged and re-fine-mapped via mergeCtwasBoundaryRegions (require-in-CS, like\n", + "# the legacy susie_pip>0.5 & !is.na(cs) selection); maxSNP caps SNPs per merged\n", + "# region.\n", + "parameter: merge_regions = False\n", "parameter: maxSNP = 20000\n", + "# declared for CLI stability; not consumed by the S4 path\n", + "parameter: thin = 1.0\n", "parameter: max_iter = 0\n", - "parameter: run_finemapping = False\n", "parameter: prior_var_structure = \"shared_all\"\n", - "# A list of regions to be subset for screening and fine-mapping, for example: \"10_80126158_82231647\"\n", - "parameter: region_name =[]\n", - "parameter: subset_context=[] # only process specified list of contexts for single group cTWAS\n", - "parameter: numThreads = 4\n", + "parameter: region_name = []\n", + "parameter: subset_context = []\n", "parameter: multi_group = True\n", - "parameter: merge_regions=False\n", - "parameter: L=5\n", - "# p_diff_thresh: p-value cutoff for identifying problematic SNPs with significant difference between observed z-scores and estimated values.\n", - "parameter: p_diff_thresh=5e-8\n", - "# sum of gene PIPs in the region should be larger than this threshold to get selected for finemapping\n", - "parameter: min_nonSNP_PIP=0.5\n", - "# additional name specification for fine-mapping result file names \n", - "parameter: alias=\"NULL\"\n", - "import glob\n", - "\n", - "skip_if(run_finemapping == False, \" Skip [ctwas_3] fine-mapping. \" )\n", - "thin=int(thin) if thin == 1.0 else thin\n", - "name = f\"{name}_{name_suffix}\" if name_suffix else name\n", - "base = f\"{cwd}/{step_name.split('_')[0]}/{name}\"\n", - "\n", - "region_data_file = (\n", - " [f\"{base}.region_data_merged.multi.{study}.thin{thin}.rds\" for study in gwas_study]\n", - " if multi_group\n", - " else [x for x in glob.glob(f\"{cwd:a}/{step_name.split('_')[0]}/{name}.region_data_merged.single.*.thin{thin}.rds\")]\n", - ")\n", - "\n", - "region_info_list = []\n", - "for region in region_name:\n", - " chrom = region.split(\"_\")[0]\n", - " if gwas_study:\n", - " weights = [f\"{base}.ctwas_weights.{study}.{chrom}.rds\" for study in gwas_study]\n", - " zfiles = [f\"{base}.z_gene_snp.{study}.{chrom}.rds\" for study in gwas_study]\n", - " params = [x for study in gwas_study for x in glob.glob(f\"{base}.ctwas_param_{prior_var_structure}.{study}*.thin{thin}.rds\")]\n", - " else:\n", - " weights = [x for x in glob.glob(f\"{base}.ctwas_weights.*.{chrom}.rds\")]\n", - " zfiles = [x for x in glob.glob(f\"{base}.z_gene_snp.*.{chrom}.rds\")]\n", - " params = [x for x in glob.glob(f\"{base}.ctwas_param_{prior_var_structure}.*.thin{thin}.rds\")]\n", - " region_info_list.append({\n", - " \"region_name\": region,\n", - " \"weights\": weights,\n", - " \"zfiles\": zfiles,\n", - " \"params\": params\n", - " })\n", - "gwas_study = 'c(' + ', '.join(f'\"{x}\"' for x in gwas_study) + ')'\n", - "subset_context = 'c(' + ', '.join(f'\"{x}\"' for x in subset_context) + ')'\n", - "input: region_info_list[_index][\"params\"], for_each = \"region_info_list\"\n", - "region_name = region_info_list[_index]['region_name']\n", - "weight_files = region_info_list[_index]['weights']\n", - "z_snp_file = region_info_list[_index]['zfiles']\n", - "params = region_info_list[_index]['params']\n", - "chrom = region_name.split(\"_\")[0]\n", - "depends: region_data_file, f\"{cwd}/{step_name.split('_')[0]}/{name}.snp_map.{chrom}.rds\",f\"{cwd}/{step_name.split('_')[0]}/{name}.LD_map.rds\"\n", - "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand = '${ }', stdout = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_finemap_res.{prior_var_structure}.{region_name}.thin{thin}.stdout\", \n", - "stderr = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_finemap_res.{prior_var_structure}.{region_name}.thin{thin}.stderr\", container = container\n", - "\n", - " library(ctwas)\n", - " library(pecotmr)\n", - " library(data.table)\n", - " outputdir = \"${cwd:a}/${step_name.split('_')[0]}\"\n", - "\n", - " gwas_meta_data <- fread(\"${gwas_meta_data}\",data.table=FALSE)\n", - " gwas_studies <- if (length(${gwas_study})!=0) ${gwas_study} else unique(gwas_meta_data$study_id)[,1]\n", - "\n", - " weight_files <- ${'c(' + ', '.join(f'\"{x}\"' for x in weight_files) + ')'}\n", - " z_snp_files <- ${'c(' + ', '.join(f'\"{x}\"' for x in z_snp_file) + ')'}\n", - " param_study_files <- ${'c(' + ', '.join(f'\"{x}\"' for x in params) + ')'}\n", - " names(param_study_files) <- gsub('^.*.ctwas_param_${prior_var_structure}.\\\\s*|\\\\s*.thin${thin}.*$', '', param_study_files)\n", - " param <- setNames(lapply(unname(param_study_files), readRDS), names(param_study_files))\n", - " if (${\"TRUE\" if multi_group else \"FALSE\"}){\n", - " param <- param[!sapply(names(param), function(x) any(sapply(paste0(gwas_studies, \".\"), function(c) grepl(c, x))))] # gwas_study per each prior \n", - " } else {\n", - " param <- param[sapply(names(param), function(x) any(sapply(paste0(gwas_studies, \".\"), function(c) grepl(c, x))))] # gwas_study x context pair per each prior\n", - " param <- param[grepl(${subset_context},names(param))]\n", - " }\n", - "\n", - " region_data_files <- ${'c(' + ', '.join(f'\"{x}\"' for x in region_data_file) + ')'}\n", - " names(weight_files) <- gsub('^.*.ctwas_weights.\\\\s*|\\\\s*.${chrom}.*$', '', weight_files) # gwas study names regardless of single/multigroup\n", - " LD_map <- readRDS(\"${cwd}/${step_name.split('_')[0]}/${name}.LD_map.rds\")\n", - " snp_map <- readRDS(\"${cwd}/${step_name.split('_')[0]}/${name}.snp_map.${chrom}.rds\")\n", - " names(z_snp_files) <- gsub('^.*.z_gene_snp.\\\\s*|\\\\s*.${chrom}.*$', '', z_snp_files)\n", - " alias = ${\"NULL\" if alias == \"NULL\" else f\"'{alias}'\"}\n", - " \n", - " ## loop through gwas studies (multigroup) / gwas_study_context groups (single_group)\n", - " for (study in names(param)){\n", - " region_data <- readRDS(region_data_files[grepl(study, region_data_files)])\n", - " finemap_res_file <- file.path(outputdir, paste0(\"${name}.ctwas_finemap_res.${prior_var_structure}.${region_name}.\", study, \".thin${thin}.tsv.gz\"))\n", - " if (length(alias)!=0) finemap_res_file <- gsub(\"${name}\", \"${name}_${alias}\", finemap_res_file)\n", - " susie_alpha_file <- gsub(\".tsv.gz\", \".rds\", gsub(\"ctwas_finemap_res\", \"ctwas_susie_alpha_res\", finemap_res_file))\n", - " if (nrow(region_data[[study]][[gsub(\"chr\", \"\", \"${region_name}\")]]$z_gene)==0) {\n", - " message(\"No z_gene data available for \", study, \" in ${region_name}. \")\n", - " fwrite(data.frame(), finemap_res_file, sep = \"\\t\", compress = \"gzip\")\n", - " next\n", - " }\n", - " \n", - " gwas_study <- study \n", - " if (${\"TRUE\" if multi_group else \"FALSE\"}){\n", - " weights <- readRDS(weight_files[gwas_study])\n", - " } else { # single group \n", - " context <- gsub( \"\\\\|.*$\", \"\", names(param[[study]][['group_prior']])[1])\n", - " gwas_study <- gsub(paste0(\".\", context),\"\", study)\n", - " weights <- readRDS(weight_files[gwas_study])\n", - " if (length(weights[grepl(context, names(weights))])==0) next # no context specific weight data available \n", - " weights <- weights[which(grepl(context, names(weights)))] # subset\n", - " }\n", - " group_prior <- param[[study]]$group_prior\n", - " group_prior_var <- param[[study]]$group_prior_var \n", - " z_snp_list <- readRDS(z_snp_files[gwas_study])\n", - " z_snp <- z_snp_list$z_snp\n", - " z_gene <- z_snp_list$z_gene\n", - "\n", - " if (${thin} < 1){\n", - " region_data[[study]][gsub(\"chr\", \"\", \"${region_name}\")] <- expand_region_data(region_data[[study]][[gsub(\"chr\", \"\", \"${region_name}\")]],\n", - " snp_map,\n", - " z_snp,\n", - " maxSNP = ${maxSNP},\n", - " ncore = ${numThreads})\n", - " }\n", - " screen_res <- screen_regions(region_data[[study]][gsub(\"chr\", \"\", \"${region_name}\")],\n", - " group_prior = group_prior, group_prior_var = group_prior_var, min_nonSNP_PIP = ${min_nonSNP_PIP}, \n", - " ncore = ${numThreads}, verbose = FALSE, logfile = file.path(outputdir, \n", - " paste0(\"${name}.screen_regions.${prior_var_structure}.${region_name}.\", study, \".thin${thin}.log\")))\n", - " screened_region_data <- screen_res$screened_region_data\n", - " screen_res_file <- paste0(\"${name}.screen_regions.${prior_var_structure}.${region_name}.\", study, \".thin${thin}.rds\")\n", - " if (length(alias)!=0) screen_res_file <- gsub(\"${name}\", \"${name}_${alias}\",screen_res_file)\n", - " saveRDS(screen_res, file.path(outputdir, screen_res_file ))\n", - " if (length(screened_region_data)==0) {\n", - " message(\"No region selected for \", study, \" in ${region_name}. \")\n", - " fwrite(data.frame(), finemap_res_file, sep = \"\\t\", compress = \"gzip\")\n", - " next\n", - " }\n", - " message(\"Screening region completed for \", study, \". \")\n", - "\n", - " # finemap_regions() argument input \n", - " args <- list(screened_region_data, LD_map = LD_map, weights = weights, group_prior = group_prior, \n", - " group_prior_var = group_prior_var, L = ${L}, ncore = ${numThreads}, save_cor = FALSE, LD_format = \"custom\", \n", - " LD_loader_fun = ctwas_ld_loader, snpinfo_loader_fun = ctwas_bimfile_loader, verbose = TRUE, logfile = file.path(outputdir, \n", - " paste0(\"${name}.finemap_res.${prior_var_structure}.${region_name}.\", study, \".thin${thin}.log\")))\n", - " if (as.logical(${max_iter})){\n", - " finemap_res_file <- gsub(study, paste0(study,\"_max_iter${max_iter}\"),finemap_res_file)\n", - " susie_alpha_file <- gsub(study, paste0(study,\"_max_iter${max_iter}\"),susie_alpha_file)\n", - " args$max_iter <- ${max_iter}\n", - " }\n", - " # ctwas finemapping \n", - " res <- do.call(finemap_regions, args)\n", - " finemap_res <- res$finemap_res\n", - " finemap_res$pval <- z2p(finemap_res$z)\n", - " susie_alpha_res <- res$susie_alpha_res\n", - " fwrite(finemap_res, finemap_res_file, sep = \"\\t\", compress = \"gzip\")\n", - " saveRDS(susie_alpha_res, susie_alpha_file, compress='xz')\n", - "\n", - " # Get LD diagnosis \n", - " ld_diag <- diagnose_LD_mismatch_susie(region_ids = gsub(\"chr\", \"\", \"${region_name}\"), \n", - " z_snp = z_snp,\n", - " LD_map = LD_map, \n", - " gwas_n = z_snp_list$gwas_n, # gwas sample size \n", - " p_diff_thresh = ${p_diff_thresh},\n", - " ncore = ${numThreads},\n", - " LD_format = \"custom\",\n", - " LD_loader_fun = ctwas_ld_loader,\n", - " snpinfo_loader_fun = ctwas_bimfile_loader)\n", - " problematic_genes <- get_problematic_genes(ld_diag$problematic_snps, \n", - " weights, \n", - " finemap_res, \n", - " pip_thresh = 0.5)\n", - " # finemapping with no LD for problematic genes \n", - " problematic_region_ids <- unlist(unique(finemap_res[finemap_res$id %in% problematic_genes, \"region_id\"]))\n", - " if (length(problematic_region_ids) > 0) {\n", - " message(problematic_region_ids)\n", - " rerun_region_data <- screened_region_data[problematic_region_ids] # using already expanded screened_region_data\n", - " res <- finemap_regions_noLD(rerun_region_data, \n", - " group_prior = group_prior,\n", - " group_prior_var = group_prior_var)\n", - " finemap_res <- res$finemap_res\n", - " susie_alpha_res <- res$susie_alpha_res\n", - " message(\"Fine-mapping without LD for region ${region_name} with \", study, \". \")\n", - " finemap_res_file <- gsub(study, paste0(study, \"_update\"), finemap_res_file)\n", - " susie_alpha_file <- gsub(study, paste0(study, \"_update\"), susie_alpha_file)\n", - " fwrite(finemap_res, finemap_res_file, sep = \"\\t\", compress = \"gzip\")\n", - " saveRDS(susie_alpha_res, susie_alpha_file, compress='xz')\n", - " }\n", - " ld_diag_file <- paste0(\"${name}.ctwas_ld_diag.${prior_var_structure}.${region_name}.\", study,\".thin${thin}.rds\")\n", - " if (length(alias)!=0) ld_diag_file <- gsub(\"${name}\", \"${name}_${alias}\",ld_diag_file)\n", - " ld_diag_plot_file <- gsub(\".ctwas_ld_diag.\", \".ctwas_ld_diag_plot.\", ld_diag_file )\n", - " ld_diag_plot_file <- gsub(\".rds\", \".pdf\", ld_diag_plot_file )\n", - " saveRDS(ld_diag[1:3], file.path(outputdir, ld_diag_file))\n", - " pdf(file.path(outputdir,ld_diag_plot_file), width = 7, height = 7)\n", - " print(ld_diag$plots)\n", - " dev.off()\n", - " message(\"Fine-mapping completed for region ${region_name} with \", study, \". \")\n", - " rm(screened_region_data, ld_diag, susie_alpha_res, res, screen_res)\n", - " gc()\n", - "\n", - " ## region merging for genes in multiple regions \n", - " boundary_genes <- file.path(outputdir, paste0(\"${name}.ctwas_boundary_genes.\", study, \".${chrom}.thin${thin}.rds\"))\n", - " if (file.exists(boundary_genes) & ${\"TRUE\" if merge_regions else \"FALSE\"}){\n", - " boundary_genes <- readRDS(boundary_genes)\n", - " high_PIP_finemap_gene_res <- subset(finemap_res, group != \"SNP\" & susie_pip > 0.5 & !is.na(cs))\n", - " high_PIP_genes <- unique(high_PIP_finemap_gene_res$id)\n", - " selected_boundary_genes <- boundary_genes[boundary_genes$id %in% high_PIP_genes, , drop=FALSE]\n", - " if (nrow(selected_boundary_genes)!=0){\n", - " message(\"Fine-mapping for merged region for ${region_name} with \", study, \". \")\n", - " region_info <- get_ctwas_meta_data(\"${ld_meta_data}\")$region_info\n", - " merge_region_res <- merge_region_data(selected_boundary_genes,\n", - " region_data[[study]],\n", - " region_info = region_info,\n", - " LD_map = LD_map,\n", - " snp_map = snp_map,\n", - " z_snp = z_snp,\n", - " z_gene = z_gene,\n", - " maxSNP = ${maxSNP})\n", - " finemap_merged_regions_res <- finemap_regions(merge_region_res$merged_region_data,\n", - " LD_map = merge_region_res$merged_LD_map,\n", - " weights = weights,\n", - " group_prior = group_prior,\n", - " group_prior_var = group_prior_var,\n", - " save_cor = FALSE,\n", - " LD_format = \"custom\", \n", - " LD_loader_fun = ctwas_ld_loader, \n", - " snpinfo_loader_fun = ctwas_bimfile_loader)\n", - " finemap_res_file <- gsub(study, paste0(study,\"_merged\"),finemap_res_file)\n", - " susie_alpha_file <- gsub(study, paste0(study, \"_merged\"), susie_alpha_file)\n", - " finemap_res <- finemap_merged_regions_res$finemap_res\n", - " finemap_res$pval <- z2p(finemap_res$z)\n", - " fwrite(finemap_res, finemap_res_file, sep = \"\\t\", compress = \"gzip\")\n", - " saveRDS(finemap_merged_regions_res$susie_alpha_res, susie_alpha_file, compress='xz')\n", - " } else {\n", - " message(\"No high pip genes found for merged region for fine-mapping. \")\n", - " }\n", - " }\n", - " rm(merge_region_res,boundary_genes,finemap_res, weights, screen_res)\n", - " gc()\n", - " }" + "parameter: numThreads = 4\n", + "parameter: p_diff_thresh = 5e-8\n", + "parameter: alias = \"NULL\"\n", + "skip_if(run_finemapping == False, \"Skip [ctwas_3] fine-mapping.\")\n", + "input: f\"{cwd:a}/ctwas/{name}.ctwas_est.rds\"\n", + "output: f\"{cwd:a}/ctwas/{name}.ctwas_finemap.rds\"\n", + "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f\"{step_name}_{_output:bn}\"\n", + "bash: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", + " set -e\n", + " Rscript code/script/pecotmr_integration/ctwas_finemap.R \\\n", + " --est ${_input} \\\n", + " --L ${L} \\\n", + " --min-nonsnp-pip ${min_nonSNP_PIP} \\\n", + " ${('--merge-regions --merge-filter-cs --max-snp ' + str(maxSNP)) if merge_regions else ''} \\\n", + " --ncore ${numThreads} \\\n", + " --output ${_output}" ] }, { @@ -1368,4 +753,4 @@ }, "nbformat": 4, "nbformat_minor": 4 -} \ No newline at end of file +} diff --git a/code/script/pecotmr_integration/ctwas_finemap.R b/code/script/pecotmr_integration/ctwas_finemap.R index f3779c4f..1eb95bc3 100644 --- a/code/script/pecotmr_integration/ctwas_finemap.R +++ b/code/script/pecotmr_integration/ctwas_finemap.R @@ -17,6 +17,14 @@ # --L Pass-through (default 5) # --no-filter-L Flag: disable ctwas's internal L >= 1 region screen # --min-nonsnp-pip Pass-through (default 0.5) +# --merge-regions Flag: after fine-mapping, merge boundary genes' +# adjacent LD blocks and re-fine-map the merged +# regions (legacy ctwas_3 merge_regions; default-off) +# --merge-pip-cutoff PIP threshold for selecting which boundary genes to +# merge (default 0.5) +# --merge-filter-cs Flag: require a boundary gene to be in a credible +# set to be selected for merging +# --max-snp Per-merged-region SNP cap (default Inf) # --ncore Pass-through (default 1) # --output Output RDS path (ctwas_sumstats-shape result) @@ -41,6 +49,18 @@ parser <- add_argument(parser, "--no-filter-L", parser <- add_argument(parser, "--min-nonsnp-pip", help = "min_nonSNP_PIP threshold for screen_regions", type = "numeric", default = 0.5) +parser <- add_argument(parser, "--merge-regions", + help = "Flag: merge boundary genes' adjacent regions and re-fine-map (legacy ctwas_3 merge_regions)", + flag = TRUE) +parser <- add_argument(parser, "--merge-pip-cutoff", + help = "PIP threshold for selecting boundary genes to merge", + type = "numeric", default = 0.5) +parser <- add_argument(parser, "--merge-filter-cs", + help = "Flag: require a boundary gene to be in a credible set to be merged", + flag = TRUE) +parser <- add_argument(parser, "--max-snp", + help = "Per-merged-region SNP cap", + type = "numeric", default = Inf) parser <- add_argument(parser, "--ncore", help = "Number of cores", type = "integer", default = 1L) @@ -74,6 +94,18 @@ final <- finemapCtwasRegions( L = argv$L, ncore = argv$ncore) +# Optional step 4: boundary-gene region merging + re-fine-map. +if (argv$merge_regions) { + final <- mergeCtwasBoundaryRegions( + final, + pipThresh = argv$merge_pip_cutoff, + filterCs = argv$merge_filter_cs, + maxSNP = argv$max_snp, + L = argv$L, + ncore = argv$ncore) + cat("Applied boundary-region merging (merge_regions).\n") +} + dir.create(dirname(argv$output), showWarnings = FALSE, recursive = TRUE) saveRDS(final, argv$output) cat(sprintf("Wrote finemapCtwasRegions result to %s\n", argv$output)) diff --git a/code/script/pecotmr_integration/ctwas_manifest.R b/code/script/pecotmr_integration/ctwas_manifest.R new file mode 100644 index 00000000..4a7a0dd1 --- /dev/null +++ b/code/script/pecotmr_integration/ctwas_manifest.R @@ -0,0 +1,152 @@ +#!/usr/bin/env Rscript +# ctwas_manifest.R +# +# Build the per-LD-block manifest consumed by ctwas_assemble.R. cTWAS runs +# over the whole-chromosome LD-block grid (every LD-reference block is a SNP +# background "region"; only a few carry gene weights), so this worker: +# +# 1. reads each per-gene TwasWeights RDS to learn which genes have weights +# (the `trait` field) and which file holds each, +# 2. looks up each gene's chromosome + TSS in the xQTL meta table, +# 3. enumerates every LD block on the relevant chromosome(s) from the +# LD-meta TSV, and +# 4. assigns each gene's TwasWeights to its HOME block (the block whose +# [start, end) contains the gene's TSS) — assembleCtwasInputs uses a +# global GWAS-variant union so weight variants that straddle the block +# boundary still survive. +# +# The emitted manifest has one row per LD block, with the per-block +# GwasSumStats RDS path the caller is expected to build (region column drives +# that fan-out) and a (possibly empty) comma-separated TwasWeights list. +# +# NOTE: no data-layout path is hardcoded. The TwasWeights paths, meta tables, +# and the GwasSumStats output directory all come from arguments; the caller +# decides whether the weights are the upstream twas-step output or a +# substituted set. +# +# Inputs: +# --ld-meta LD-meta TSV (#chr/start/end/path); rows are LD blocks +# --xqtl-meta xQTL meta TSV (region_id = gene, #chr, TSS, ...) +# --twas-weights Comma-separated per-gene TwasWeights RDS paths +# --gwas-sumstats-dir Directory the per-block GwasSumStats RDS live in +# (path =