diff --git a/code/SoS/pecotmr_integration/SuSiE_enloc.ipynb b/code/SoS/pecotmr_integration/SuSiE_enloc.ipynb index 2ecd9ae2..fe5db87f 100644 --- a/code/SoS/pecotmr_integration/SuSiE_enloc.ipynb +++ b/code/SoS/pecotmr_integration/SuSiE_enloc.ipynb @@ -561,19 +561,31 @@ "input: regional_data[\"data\"], group_by = lambda x: group_by_region(x, regional_data[\"data\"]), group_with = \"meta\"\n", "output: f'{cwd:a}/{name}.{_meta[0]}.enrichment.rds'\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'\n", - "R: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", - " library(tidyverse)\n", - " library(pecotmr)\n", - " # RDS files for GWAS data\n", - " gwas_finemapped_data = c(${paths([x for x in _meta[1:len(_meta)]]):r,})\n", - " # RDS files for xQTL data\n", - " xqtl_finemapped_data = c(${paths([x for x in _input]):r,})\n", - " enrich_result = xqtl_enrichment_wrapper(gwas_files = gwas_finemapped_data, xqtl_files = xqtl_finemapped_data, \n", - " xqtl_finemapping_obj = c(\"${_meta[0]}\",${\",\".join(['\"%s\"' % x for x in xqtl_finemapping_obj]) if len(xqtl_finemapping_obj) != 0 else \"NULL\"}), \n", - " xqtl_varname_obj = c(\"${_meta[0]}\",${\",\".join(['\"%s\"' % x for x in xqtl_varname_obj]) if len(xqtl_varname_obj) != 0 else \"NULL\"}), \n", - " gwas_finemapping_obj = c(${\",\".join(['\"%s\"' % x for x in gwas_finemapping_obj]) if len(gwas_finemapping_obj) != 0 else \"NULL\"}), \n", - " gwas_varname_obj = c(${\",\".join(['\"%s\"' % x for x in gwas_varname_obj]) if len(gwas_varname_obj) != 0 else \"NULL\"}))\n", - " saveRDS(enrich_result, ${_output:ar})" + "bash: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", + " set -e\n", + " # Per-region xQTL files for this group, and the GWAS rds entries in _meta[1:].\n", + " # Convert each side of this region to a pecotmr S4 collection, then run\n", + " # qtlEnrichmentPipeline on the pair.\n", + " Rscript code/script/pecotmr_integration/legacy_enloc_finemap_convert.R \\\n", + " --mode qtl \\\n", + " --rds-files ${\",\".join([str(x) for x in _input])} \\\n", + " --finemapping-obj '${\" \".join(xqtl_finemapping_obj)}' \\\n", + " --varname-obj '${\" \".join(xqtl_varname_obj)}' \\\n", + " --study ${name} \\\n", + " --output ${_output:n}.qtl.rds\n", + "\n", + " Rscript code/script/pecotmr_integration/legacy_enloc_finemap_convert.R \\\n", + " --mode gwas \\\n", + " --rds-files ${\",\".join([str(x) for x in _meta[1:] if str(x).endswith(\".rds\")])} \\\n", + " --finemapping-obj '${\" \".join(gwas_finemapping_obj)}' \\\n", + " --varname-obj '${\" \".join(gwas_varname_obj)}' \\\n", + " --output ${_output:n}.gwas.rds\n", + "\n", + " Rscript code/script/pecotmr_integration/qtl_enrichment.R \\\n", + " --qtl-fine-mapping ${_output:n}.qtl.rds \\\n", + " --gwas-fine-mapping ${_output:n}.gwas.rds \\\n", + " --ncore ${numThreads} \\\n", + " --output ${_output}\n" ] }, { @@ -600,79 +612,32 @@ "# output: f'{cwd:a}/{step_name[:-2]}/{name}.{_meta[0]}.coloc.rds'\n", "output: f'{cwd:a}/{step_name}/{name}.{_meta[0]}.coloc.rds'\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output:bn}'\n", - "R: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", - " library(tidyverse)\n", - " library(pecotmr)\n", - " library(coloc) \n", - " # RDS files for xQTL data\n", - " # RDS files for xQTL data\n", - " xqtl_finemapped_datas = c(${paths([x for x in _input]):r,})\n", - " coloc_res <- list()\n", - " # are we doing something like this? that means results are by each context, and each one has \n", - " for(xqtl_finemapped_data in xqtl_finemapped_datas){\n", - " qtl_dat <- readRDS(xqtl_finemapped_data)\n", - " gene = names(qtl_dat)\n", - " context = \"${_meta[0]}\" %>% str_split(.,\"@\",simplify = T) %>% .[,1] \n", - " gene_region = pecotmr:::get_nested_element(qtl_dat[[1]], c(context,\"region_info\", \"grange\"))\n", - " \n", - " # Step 1: find relevant GWAS regions that overlap each the xQTL region of interest\n", - " gwas_finemapped_datas <- c(${ \",\".join(['\"%s\"' % x for x in _meta[1:]])}) \n", - " gwas_finemapped_datas <- gwas_finemapped_datas[grep('rds$',gwas_finemapped_datas)]\n", - " \n", - " gwas_regions <- gwas_finemapped_datas %>% basename %>% str_extract(., \"chr\\\\d+_\\\\d+_\\\\d+\")\n", - " overlap_index <- NULL\n", - " for (i in 1:length(gwas_regions)) {\n", - " region <- gwas_regions[i]\n", - " split_region <- unlist(strsplit(region, \"_\"))\n", - " block_chrom <- as.numeric(split_region[1] %>% gsub(\"chr\",\"\",.))\n", - " block_start <- as.numeric(split_region[2] %>% strsplit(., \"-\") %>% unlist %>% .[1])\n", - " block_end <- as.numeric(split_region[2] %>% strsplit(., \"-\") %>% unlist %>% .[2])\n", - " if (gene_region$chrom == block_chrom && (gene_region$start <= block_end | gene_region$end >= block_start)) {\n", - " overlap_index <- c(overlap_index, i)\n", - " }\n", - " }\n", - " \n", - " if (!is.null(overlap_index)) {\n", - " gwas_finemapped_data <- gwas_finemapped_datas[overlap_index]\n", - " \n", - " # Step 2: load enrichment analysis results\n", - " # Extract values for p1, p2, and p12\n", - " if(${\"FALSE\" if skip_enrich else \"TRUE\"}){\n", - " enrich_file = paste0('${cwd:a}','/', '${name}', '.' ,context,'.enrichment.rds')\n", - " p1 <- readRDS(enrich_file)[[1]]$`Alternative (coloc) p1`\n", - " p2 <- readRDS(enrich_file)[[1]]$`Alternative (coloc) p2`\n", - " p12 <- readRDS(enrich_file)[[1]]$`Alternative (coloc) p12`\n", - " \n", - " message(\"Priors are P1:\", p1, \"; p2: \", p2, \"; p12: \", p12)\n", - " } else {\n", - " p1 = 1e-4\n", - " p2 = 1e-4\n", - " p12 = 5e-6\n", - " message(\"Priors are using default, P1: 1e-4, P2: 1e-4, P12: 5e-6\" )\n", - " }\n", - " # Step 3: Apply colocalization analysis between each condition and GWAS\n", - " coloc_res[[gene]] <- coloc_wrapper(xqtl_file = xqtl_finemapped_data, gwas_files = gwas_finemapped_data, \n", - " xqtl_finemapping_obj = c(context,${\",\".join(['\"%s\"' % x for x in xqtl_finemapping_obj]) if len(xqtl_finemapping_obj) != 0 else \"NULL\"}), \n", - " xqtl_varname_obj = c(context,${\",\".join(['\"%s\"' % x for x in xqtl_varname_obj]) if len(xqtl_varname_obj) != 0 else \"NULL\"}), \n", - " gwas_finemapping_obj = c(${\",\".join(['\"%s\"' % x for x in gwas_finemapping_obj]) if len(gwas_finemapping_obj) != 0 else \"NULL\"}), \n", - " gwas_varname_obj = c(${\",\".join(['\"%s\"' % x for x in gwas_varname_obj]) if len(gwas_varname_obj) != 0 else \"NULL\"}),\n", - " xqtl_region_obj = c(context,${\",\".join(['\"%s\"' % x for x in xqtl_region_obj]) if len(xqtl_region_obj) != 0 else \"NULL\"}), \n", - " gwas_region_obj = c(${\",\".join(['\"%s\"' % x for x in gwas_region_obj]) if len(gwas_region_obj) != 0 else \"NULL\"}),\n", - " p1 = p1, p2 = p2, p12 = p12, filter_lbf_cs = ${\"FALSE\" if skip_enrich else \"TRUE\"})\n", - "\n", - " if (${\"TRUE\" if ld_meta_file_path.is_file() else \"FALSE\"}) {\n", - " if(length(coloc_res[[gene]]) > 2) {\n", - " coloc_res[[gene]] <- coloc_post_processor(coloc_res[[gene]], LD_meta_file_path = ${ld_meta_file_path:r}, analysis_region = coloc_res[[gene]]$analysis_region)\n", - " if(!is.null(coloc_res[[gene]]$sets$cs)) writeLines(coloc_res[[gene]]$sets$cs %>% unlist, gsub(\"rds$\",\"coloc_res\",\"${_output}\"))\n", - " } else { message(\"No coloc results were generated.\") }\n", - " }\n", - " \n", - " } else {\n", - " message(\"No overlap was found between GWAS blocks and QTL analysis region.\")\n", - " coloc_res <- \"No overlap was found between GWAS blocks and QTL analysis region.\"\n", - " }\n", - " }\n", - " saveRDS(coloc_res, ${_output:r})" + "bash: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", + " set -e\n", + " # context (before '@' in _meta[0]) selects the per-region enrichment file the\n", + " # enrichment step wrote: {cwd}/{name}.{context}.enrichment.rds (legacy naming).\n", + " # Convert this region's xQTL + GWAS sides, then run colocPipeline (enloc\n", + " # variant when --enrichment is supplied).\n", + " Rscript code/script/pecotmr_integration/legacy_enloc_finemap_convert.R \\\n", + " --mode qtl \\\n", + " --rds-files ${\",\".join([str(x) for x in _input])} \\\n", + " --finemapping-obj '${\" \".join(xqtl_finemapping_obj)}' \\\n", + " --varname-obj '${\" \".join(xqtl_varname_obj)}' \\\n", + " --study ${name} \\\n", + " --output ${_output:n}.qtl.rds\n", + "\n", + " Rscript code/script/pecotmr_integration/legacy_enloc_finemap_convert.R \\\n", + " --mode gwas \\\n", + " --rds-files ${\",\".join([str(x) for x in _meta[1:] if str(x).endswith(\".rds\")])} \\\n", + " --finemapping-obj '${\" \".join(gwas_finemapping_obj)}' \\\n", + " --varname-obj '${\" \".join(gwas_varname_obj)}' \\\n", + " --output ${_output:n}.gwas.rds\n", + "\n", + " Rscript code/script/pecotmr_integration/coloc.R \\\n", + " --qtl-fine-mapping ${_output:n}.qtl.rds \\\n", + " --gwas-input ${_output:n}.gwas.rds \\\n", + " ${('--enrichment '+cwd.absolute().joinpath(name + \".\" + _meta[0].split(\"@\")[0] + \".enrichment.rds\").as_posix()) if not skip_enrich else ''} \\\n", + " --output ${_output}\n" ] }, { @@ -722,4 +687,4 @@ }, "nbformat": 4, "nbformat_minor": 5 -} \ No newline at end of file +} diff --git a/code/SoS/pecotmr_integration/twas_ctwas.ipynb b/code/SoS/pecotmr_integration/twas_ctwas.ipynb index cc3ef4a5..63e36095 100644 --- a/code/SoS/pecotmr_integration/twas_ctwas.ipynb +++ b/code/SoS/pecotmr_integration/twas_ctwas.ipynb @@ -293,312 +293,90 @@ "outputs": [], "source": [ "[twas]\n", + "# Per-region TWAS-Z + Mendelian Randomization, bridged to the pecotmr S4\n", + "# wrappers (no inline analysis R; no Python helpers). Restores the legacy\n", + "# per-region fan-out: one task per analysis region, looping the region's genes\n", + "# inside. Per region it (1) builds one multi-study GwasSumStats over the region\n", + "# via gwas_sumstats_construct.R (its LD sketch spans whatever LD-reference\n", + "# blocks the region overlaps), then per gene (2) converts the legacy\n", + "# univariate_twas_weights.rds to a pecotmr TwasWeights via\n", + "# legacy_twas_weights_convert.R and (3) runs twas.R (causalInferencePipeline)\n", + "# to emit a per-gene .twas.rds. MR is deferred (no fine-mapping result is\n", + "# passed), so only TWAS Z + p-value are produced.\n", "depends: sos_variable(\"filtered_regional_xqtl_files\")\n", + "# Legacy CLI retained. Wired to the pecotmr selection knobs: rsq_cutoff,\n", + "# rsq_pval_cutoff, rsq_option, rsq_pval_option (CV weight selection). MR is\n", + "# deferred, so mr_pval_cutoff and the remaining legacy params are kept declared\n", + "# for CLI stability but are not consumed by the S4 path.\n", "parameter: coverage = \"cs_coverage_0.95\"\n", - "# Threshold for rsq and pvalue for imputability determination for a model \n", + "# Thresholds for rsq and CV p-value for imputability/best-model selection\n", "parameter: rsq_cutoff = 0.01\n", "parameter: rsq_pval_cutoff = 0.05\n", "parameter: mr_pval_cutoff = 0.05\n", "parameter: save_ctwas_data = True\n", "parameter: save_mr_result = True\n", - "parameter: rsq_option=\"rsq\"\n", - "parameter: rsq_pval_option=[\"adj_rsq_pval\", \"pval\"]\n", - "# load by batches if memory resource is limited, default to load all at once \n", + "parameter: rsq_option = \"rsq\"\n", + "parameter: rsq_pval_option = [\"adj_rsq_pval\", \"pval\"]\n", + "# load by batches if memory resource is limited, default to load all at once\n", "parameter: batch_load_memory = 500\n", "parameter: event_filter_rules = path()\n", "parameter: comment_string = \"NULL\"\n", "parameter: rename_column = False\n", - "stop_if(len(filtered_regional_xqtl_files) == 0, \n", + "stop_if(len(filtered_regional_xqtl_files) == 0,\n", " \"No regions with overlapping xQTL weights found; skipping TWAS step.\")\n", "input: filtered_regional_xqtl_files, group_by = lambda x: group_by_region(x, filtered_regional_xqtl_files), group_with = \"filtered_region_info\"\n", - "output_files = [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas.tsv.gz']\n", - "if save_ctwas_data:\n", - " output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.twas_data.rds')\n", - "if save_mr_result:\n", - " output_files.append(f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.mr_result.tsv.gz')\n", - "output: output_files\n", + "output: [f'{cwd:a}/{step_name}/{name}.{_filtered_region_info[3]}.{gene}.twas.rds' for gene in sorted(set(_filtered_region_info[4]))]\n", "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f'{step_name}_{_output[0]:bn}'\n", - "R: expand = '${ }', stdout = f\"{_output[0]:n}.stdout\", stderr = f\"{_output[0]:n}.stderr\", container = container\n", - "\n", - " library(dplyr)\n", - " library(data.table)\n", - " library(pecotmr)\n", - " library(readr)\n", - "\n", - "\n", - " # function to update method selection\n", - " update_twas_method <- function(twas_df, rsq_cutoff =${rsq_cutoff}, rsq_pval_cutoff = ${rsq_pval_cutoff}){\n", - " if (nrow(twas_df) == 0) { \n", - " return(twas_df)\n", - " } \n", - " twas_df$method_selected_original <- twas_df$is_selected_method\n", - " twas_df_colnames <- colnames(twas_df)\n", - " twas_df$gene_context <- paste0(twas_df$molecular_id, \"_\", twas_df$context)\n", - " fix_sel <- twas_df$is_selected_method & (is.na(twas_df$twas_z) | is.na(twas_df$twas_pval) | is.infinite(twas_df$twas_pval) |\n", - " is.null(twas_df$twas_z) | is.null(twas_df$twas_pval))\n", - " if (!any(fix_sel)) return(twas_df) # If nothing to fix, return fast\n", - " all_group <- paste(twas_df$gene_context, twas_df$gwas_study, sep = \" - \") #gene-context-study group\n", - " fix_groups <- unique(all_group[fix_sel])\n", - "\n", - " for (fixgroup in fix_groups) {\n", - " df_idxs <- which(all_group == fixgroup) # idx of gene-context-study group that needs replacement of method selected\n", - " if (length(df_idxs) == 1) next\n", - " candidate_idxs <- df_idxs[!is.na(twas_df$twas_z[df_idxs]) & !is.na(twas_df$twas_pval[df_idxs]) & !is.infinite(twas_df$twas_z[df_idxs]) & \n", - " !is.null(twas_df$twas_z[df_idxs]) & !is.null(twas_df$twas_pval[df_idxs]) &\n", - " twas_df$rsq_cv[df_idxs] >= rsq_cutoff & twas_df$pval_cv[df_idxs] <= rsq_pval_cutoff] \n", - " if (length(candidate_idxs) <= 0 || all(is.na(candidate_idxs))) next\n", - " best_i <- candidate_idxs[which.max(twas_df$rsq_cv[candidate_idxs])]\n", - " best_method <- twas_df$method[best_i]\n", - " twas_df$is_selected_method[df_idxs] <- (twas_df$method[df_idxs] == best_method)\n", - " message (\"Updating method selection for \", fixgroup, \".\")\n", - " }\n", - " return(twas_df[, twas_df_colnames])\n", - " }\n", - "\n", - " # Load metadata and configuration - let these fail if there are issues\n", - " if (${\"TRUE\" if rename_column else \"FALSE\"}) {\n", - " gwas_meta_data = fread(\"${gwas_meta_data}\", data.table=FALSE)\n", - " column_file_path = gwas_meta_data$column_mapping[1]\n", - " } else {\n", - " column_file_path = NULL\n", - " }\n", - " \n", - " xqtl_meta_df <- fread(\"${xqtl_meta_data}\", data.table=FALSE)\n", - " xqtl_meta_df <- xqtl_meta_df[!duplicated(xqtl_meta_df[, c(\"region_id\", \"TSS\")]), ]\n", - " xqtl_type_table <- if (isTRUE(file.exists(\"${xqtl_type_table}\"))) fread(\"${xqtl_type_table}\") else NULL\n", - " gene_list <- c(${', '.join([f\"'{gene}'\" for gene in _filtered_region_info[4]])})\n", - " \n", - " event_filter_rules = ${\"NULL\" if not event_filter_rules.is_file() else \"'%s'\" % event_filter_rules}\n", - " if (!is.null(event_filter_rules)) {\n", - " event_filters <- read.table(\"${event_filter_rules}\")\n", - " event_filters <- lapply(1:nrow(event_filters), function(ii) as.list(event_filters[ii,] %>% unlist))\n", - " } else { \n", - " event_filters <- NULL \n", - " }\n", - "\n", - " # Process weights with targeted error handling for file operations\n", - " weight_db_list <- c(${_input:r,})\n", - " names(weight_db_list) <- gene_list\n", - " weight_db_list <- split(weight_db_list, names(weight_db_list))\n", - " \n", - " # Filter out empty/invalid weight files with error handling\n", - " weight_db_list_update <- tryCatch({\n", - " Filter(Negate(is.null), lapply(weight_db_list, function(file_list) {\n", - " do.call(c, lapply(file_list, function(file) {\n", - " if (file.exists(file) && file.size(file) > 200) file else NULL\n", - " }))\n", - " }))\n", - " }, error = function(e) {\n", - " message(paste(\"Error checking weight files:\", e$message))\n", - " return(list())\n", - " })\n", - "\n", - " if(length(weight_db_list_update) <= 0) {\n", - " message(paste0(\"No valid weight files for region ${_filtered_region_info[3]}. Creating empty output files.\"))\n", - " # Define TWAS result columns (same as in final_results)\n", - " twas_header <- data.frame(\n", - " gene = character(),\n", - " molecular_id = character(),\n", - " TSS = numeric(),\n", - " start = numeric(),\n", - " end = numeric(),\n", - " beta = numeric(),\n", - " se = numeric(),\n", - " z = numeric(),\n", - " pval = numeric(),\n", - " qval = numeric(),\n", - " rsq = numeric(),\n", - " mr_beta = numeric(),\n", - " mr_se = numeric(),\n", - " mr_pval = numeric()\n", - " )\n", - "\n", - " # Write empty file with header, gzipped\n", - " fwrite(twas_header, file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n", - " if (${\"TRUE\" if save_ctwas_data else \"FALSE\"}) {\n", - " saveRDS(list(), \"${_output[0]:nnn}.twas_data.rds\", compress='xz')\n", - " }\n", - " if (${\"TRUE\" if save_mr_result else \"FALSE\"}) {\n", - " fwrite(data.frame(), file = \"${_output[0]:nnn}.mr_result.tsv.gz\", sep = \"\\t\", compress = \"gzip\")\n", - " }\n", - " quit(save = \"no\", status = 0)\n", - " }\n", - "\n", - " # Load TWAS weights - allow this to fail with informative errors\n", - " message(paste(\"Loading TWAS weights for\", length(weight_db_list_update), \"genes\"))\n", - " twas_weights_results <- list()\n", - " \n", - " for (gene_db in names(weight_db_list_update)) {\n", - " weight_dbs <- weight_db_list_update[[gene_db]]\n", - " message(paste(\"Processing gene:\", gene_db, \"with\", length(weight_dbs), \"weight files\"))\n", - " conditions = xqtl_meta_df |> filter(region_id == gene_db) |> pull(contexts)\n", - "\n", - " # Load weights for this gene - let it fail if there are real issues\n", - " twas_weights_results[[gene_db]] = load_twas_weights(\n", - " weight_dbs, \n", - " conditions = if(is.na(conditions)) NULL else conditions,\n", - " variable_name_obj = \"variant_names\", \n", - " susie_obj = \"susie_weights_intermediate\",\n", - " twas_weights_table = \"twas_weights\"\n", - " )\n", - "\n", - " if (length(twas_weights_results[[gene_db]]) > 1) {\n", - " twas_weights_results[[gene_db]]$data_type <- setNames(\n", - " lapply(names(twas_weights_results[[gene_db]]$weights), function(context) {\n", - " xqtl_type_table$type[sapply(xqtl_type_table$context, function(x) grepl(x, context))]\n", - " }), \n", - " names(twas_weights_results[[gene_db]]$weights)\n", - " )\n", - " no_type_value <- sapply(names(twas_weights_results[[gene_db]]$data_type), function(x) length(twas_weights_results[[gene_db]]$data_type[[x]])==0)\n", - " if (any(no_type_value)){\n", - " aff_contexts <- names(twas_weights_results[[gene_db]]$data_type)[no_type_value]\n", - " warning(paste(aff_contexts, collapse=\", \"), \" data type information if not documented in ${xqtl_meta_data}, assigning as undefined. \")\n", - " for (i in which(no_type_value)) {\n", - " if (length(twas_weights_results[[gene_db]]$data_type[[i]])==0) twas_weights_results[[gene_db]]$data_type[[i]] <- \"undefined\"\n", - " }\n", - " }\n", - " } else {\n", - " message(paste(\"No valid weights loaded for gene:\", gene_db))\n", - " twas_weights_results[[gene_db]] <- NULL\n", - " } \n", - " }\n", - " \n", - " # Clean up and check if we have any valid results\n", - " rm(weight_db_list, weight_db_list_update, xqtl_type_table, gene_list)\n", - " gc()\n", - " \n", - " # Filter out NULL results and report\n", - " valid_genes <- names(Filter(Negate(is.null), twas_weights_results))\n", - " message(paste(\"Valid weights loaded for\", length(valid_genes), \"genes:\", paste(valid_genes, collapse = \", \")))\n", - " \n", - " if (length(valid_genes) == 0) {\n", - " message(paste0(\"No valid TWAS weights loaded for region ${_filtered_region_info[3]}. Creating empty output files.\"))\n", - " fwrite(data.frame(), file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n", - " if (${\"TRUE\" if save_ctwas_data else \"FALSE\"}) {\n", - " saveRDS(list(), \"${_output[0]:nnn}.twas_data.rds\", compress='xz')\n", - " }\n", - " if (${\"TRUE\" if save_mr_result else \"FALSE\"}) {\n", - " fwrite(data.frame(), file = \"${_output[0]:nnn}.mr_result.tsv.gz\", sep = \"\\t\", compress = \"gzip\")\n", - " }\n", - " quit(save = \"no\", status = 0)\n", - " }\n", - "\n", - " # Batch load weights - allow this to fail\n", - " twas_weights_results <- batch_load_twas_weights(\n", - " twas_weights_results = twas_weights_results,\n", - " meta_data_df = xqtl_meta_df,\n", - " max_memory_per_batch = ${\"Inf\" if batch_load_memory == \"inf\" else batch_load_memory}\n", - " )\n", - " \n", - " message(paste(\"Proceeding with TWAS analysis for\", length(twas_weights_results), \"batches\"))\n", - "\n", - " # TWAS analysis - allow this to fail with informative errors\n", - " twas_results_db <- vector(\"list\", length(twas_weights_results)) \n", - " for (batch in seq_along(twas_weights_results)) {\n", - " message(paste(\"Processing batch\", batch, \"of\", length(twas_weights_results)))\n", - " res <- tryCatch(\n", - " twas_pipeline(\n", - " twas_weights_results[[batch]], \n", - " \"${ld_meta_data}\", \n", - " \"${gwas_meta_data}\", \n", - " region_block = \"${_filtered_region_info[3]}\",\n", - " ld_reference_sample_size = ${ld_reference_sample_size},\n", - " rsq_cutoff = ${rsq_cutoff}, \n", - " rsq_option = \"${rsq_option}\", \n", - " rsq_pval_cutoff = ${rsq_pval_cutoff}, \n", - " rsq_pval_option = c(${\", \".join([f'\"{x}\"' for x in rsq_pval_option])}), \n", - " mr_pval_cutoff = ${mr_pval_cutoff},\n", - " mr_coverage_column = \"${coverage}\", \n", - " output_twas_data = ${\"TRUE\" if save_ctwas_data else \"FALSE\"},\n", - " event_filters = event_filters,\n", - " column_file_path = column_file_path,\n", - " comment_string = ${\"NULL\" if comment_string == \"NULL\" else f\"'{comment_string}'\"}\n", - " ),\n", - " error = function(e) {\n", - " message(paste(\"Batch\", batch, \"ERROR:\", e$message))\n", - " return(NULL)\n", - " }\n", - " )\n", - " \n", - " # res is NULL → skip safely\n", - " if (is.null(res) || is.null(res$twas_result)) {\n", - " message(paste(\"Batch\", batch, \"returned no results\"))\n", - " twas_results_db[[batch]] <- NULL\n", - " next\n", - " }\n", - " \n", - " message(paste(\"Batch\", batch, \"produced\", nrow(res$twas_result), \"TWAS results\"))\n", - " twas_results_db[[batch]] <- res\n", - " }\n", - " \n", - " # Now safely drop NULLs\n", - " twas_results_db <- Filter(Negate(is.null), twas_results_db)\n", - " \n", - " rm(twas_weights_results)\n", - " gc()\n", - " \n", - " # Report final results\n", - " message(paste(\"Final valid batches:\", length(twas_results_db)))\n", - "\n", - " if(length(twas_results_db) != 0){\n", - " # Merge with metadata\n", - " for (batch in 1:length(twas_results_db)){\n", - " if (!is.null(twas_results_db[[batch]]$twas_result)) {\n", - " twas_results_db[[batch]]$twas_result <- merge(\n", - " twas_results_db[[batch]]$twas_result, \n", - " xqtl_meta_df[, c(\"region_id\",\"TSS\",\"start\",\"end\")], \n", - " by.x = \"molecular_id\", by.y = \"region_id\"\n", - " )\n", - " }\n", - " }\n", - " \n", - " # Combine and write results\n", - " final_results <- do.call(rbind, lapply(twas_results_db, function(x) {\n", - " if (!is.null(x$twas_result)) x$twas_result[, c(2,1,14:16,3:13)] else data.frame()\n", - " }))\n", - "\n", - " # update methods if twas z-score is NA \n", - " final_results <- update_twas_method(final_results) \n", - " if (nrow(final_results)==0) {\n", - " message(\"No valid TWAS results to write\")\n", - " fwrite(data.frame(), file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n", - " } else {\n", - " message(paste(\"Writing\", nrow(final_results), \"final TWAS results\"))\n", - " fwrite(final_results, ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n", - " }\n", - " } else {\n", - " message(\"No valid TWAS results to write\")\n", - " fwrite(data.frame(), file = ${_output[0]:r}, sep = \"\\t\", compress = \"gzip\")\n", - " }\n", - "\n", - " # Save cTWAS data if requested\n", - " if (${\"TRUE\" if save_ctwas_data else \"FALSE\"} & length(twas_results_db) != 0) {\n", - " if (length(twas_results_db) > 1){\n", - " # Combine data from multiple batches\n", - " twas_results_db[[1]]$twas_data$weights <- do.call(c, lapply(twas_results_db, function(x) x$twas_data$weights))\n", - " twas_results_db[[1]]$twas_data$susie_weights_intermediate_qced <- do.call(c, lapply(twas_results_db, function(x) x$twas_data$susie_weights_intermediate_qced))\n", - " studies <- unique(names(find_data(twas_results_db, c(3, \"z_gene\"))))\n", - " for (study in studies){\n", - " twas_results_db[[1]][[\"twas_data\"]][[\"z_gene\"]][[study]] <- do.call(rbind, lapply(twas_results_db, function(x) x$twas_data$z_gene[[study]]))\n", - " }\n", - " }\n", - " saveRDS(twas_results_db[[1]]$twas_data, \"${_output[0]:nnn}.twas_data.rds\", compress='xz')\n", - " } else if (${\"TRUE\" if save_ctwas_data else \"FALSE\"}) {\n", - " saveRDS(list(), \"${_output[0]:nnn}.twas_data.rds\", compress='xz')\n", - " }\n", - "\n", - " # Save MR results if requested\n", - " if (${\"TRUE\" if save_mr_result else \"FALSE\"}) {\n", - " if(length(twas_results_db) != 0) {\n", - " mr_results <- do.call(rbind, lapply(twas_results_db, function(x) x$mr_result))\n", - " message(paste(\"Writing\", nrow(mr_results), \"MR results\"))\n", - " fwrite(mr_results, file = \"${_output[0]:nnn}.mr_result.tsv.gz\", sep = \"\\t\", compress = \"gzip\")\n", - " } else {\n", - " fwrite(data.frame(), file = \"${_output[0]:nnn}.mr_result.tsv.gz\", sep = \"\\t\", compress = \"gzip\")\n", - " }\n", - " }\n", - "\n", - " message(\"TWAS analysis completed successfully\")" + "bash: expand = '${ }', stdout = f\"{_output[0]:n}.stdout\", stderr = f\"{_output[0]:n}.stderr\", container = container\n", + " set -e\n", + " # SoS expands ${...}; bare $shellvars (no braces) are left for bash.\n", + " outdir=${cwd:a}/${step_name}\n", + " mkdir -p \"$outdir\"\n", + " region_id=${_filtered_region_info[3]}\n", + " ld_block=${_filtered_region_info[0]}:${_filtered_region_info[1]}-${_filtered_region_info[2]}\n", + "\n", + " # GWAS studies covering this region's chromosome (paths already resolved by\n", + " # get_analysis_regions). studies and gwas_tsvs iterate the same dict order.\n", + " studies=\"${\",\".join([s for s in regional_data['GWAS'] if _filtered_region_info[0] in regional_data['GWAS'][s]])}\"\n", + " gwas_tsvs=\"${\",\".join([regional_data['GWAS'][s][_filtered_region_info[0]][0] for s in regional_data['GWAS'] if _filtered_region_info[0] in regional_data['GWAS'][s]])}\"\n", + "\n", + " # (1) One multi-study GwasSumStats over the whole region.\n", + " gss=\"$outdir/${name}.$region_id.gwas_sumstats.rds\"\n", + " Rscript code/script/pecotmr_integration/gwas_sumstats_construct.R \\\n", + " --study \"$studies\" \\\n", + " --gwas-tsv \"$gwas_tsvs\" \\\n", + " --ld-block \"$ld_block\" \\\n", + " --ld-meta \"${ld_meta_data}\" \\\n", + " --output \"$gss\"\n", + "\n", + " # (2)+(3) Per unique gene in this region (first weight file per gene):\n", + " # convert legacy weights (+FMR for MR), then run TWAS-Z + MR. The expansion\n", + " # below emits a flat \"gene file gene file ...\" token list (first file per\n", + " # gene); the loop consumes it two tokens at a time.\n", + " n_files=${len(_input)}\n", + " n_genes=${len(set([str(g) for g in _filtered_region_info[4]]))}\n", + " if [ \"$n_files\" -ne \"$n_genes\" ]; then\n", + " echo \"NOTE: $n_files weight file(s) across $n_genes gene(s) in region $region_id; using the first file per gene.\" >&2\n", + " fi\n", + " set -- ${\" \".join([tok for i, (g, f) in enumerate(zip(_filtered_region_info[4], _input)) if str(g) not in [str(h) for h in _filtered_region_info[4][:i]] for tok in (str(g), str(f))])}\n", + " while [ \"$#\" -ge 2 ]; do\n", + " gene=\"$1\"\n", + " weight=\"$2\"\n", + " shift 2\n", + " tw=\"$outdir/${name}.$region_id.$gene.twas_weights.rds\"\n", + " Rscript code/script/pecotmr_integration/legacy_twas_weights_convert.R \\\n", + " --legacy \"$weight\" \\\n", + " --study \"${name}\" \\\n", + " --output \"$tw\"\n", + " Rscript code/script/pecotmr_integration/twas.R \\\n", + " --twas-weights \"$tw\" \\\n", + " --gwas-sumstats \"$gss\" \\\n", + " --rsq-cutoff ${rsq_cutoff} \\\n", + " --rsq-pval-cutoff ${rsq_pval_cutoff} \\\n", + " --rsq-option ${rsq_option} \\\n", + " --rsq-pval-option \"${\",\".join(rsq_pval_option)}\" \\\n", + " --output \"$outdir/${name}.$region_id.$gene.twas.rds\"\n", + " done" ] }, { @@ -614,235 +392,78 @@ "outputs": [], "source": [ "[ctwas_1]\n", - "# ctwas_1: merge regions by each chromosome \n", + "# cTWAS step 1 (assemble): build the per-LD-block inputs for one gene-bearing\n", + "# chromosome and assemble them via the pecotmr S4 wrappers (no inline analysis\n", + "# R; no Python helpers). Per chromosome: (1) ctwas_manifest.R enumerates the\n", + "# chromosome's LD blocks from ld_meta_data, maps each gene (by TSS in\n", + "# xqtl_meta_data) to its home block, and pairs each block with a per-block\n", + "# GwasSumStats path + the gene's TwasWeights; (2) gwas_sumstats_construct.R\n", + "# builds one GwasSumStats per block (SNP background); (3) ctwas_assemble.R runs\n", + "# assembleCtwasInputs -> {name}.ctwas_inputs.rds. cTWAS runs over the whole\n", + "# chromosome's LD blocks, NOT the coarse twas analysis region.\n", "depends: sos_variable(\"filtered_region_info\")\n", - "# chromosome number to merge for region data, can be an integer or a string, i.e. 'chr1'\n", - "parameter: chrom=\"\"\n", - "# twas weight cutoff for ctwas variant selection\n", + "# chromosome to assemble (integer or 'chrN'); default: the lone gene-bearing chrom\n", + "parameter: chrom = \"\"\n", + "# weight pre-filters passed to assembleCtwasInputs\n", "parameter: twas_weight_cutoff = 0.0\n", - "parameter: max_num_variants =\"Inf\"\n", - "# cs_min_cor cutoff in susie finemapping result for variant selection\n", - "parameter: cs_min_cor = 0\n", - "# minimum pip cutoff in susie finemapping result for variant selection\n", - "parameter: min_pip_cutoff =0 \n", - "# A scalar in (0,1]. The proportion of SNPs, reduce runtime at the cost of accuracy \n", + "parameter: max_num_variants = \"Inf\"\n", + "parameter: cs_min_cor = 0.0\n", + "parameter: min_pip_cutoff = 0.0\n", + "# declared for CLI stability; consumed downstream / not by assembleCtwasInputs\n", "parameter: thin = 1.0\n", - "# Maximum number of SNPs in a region.\n", "parameter: maxSNP = 20000\n", - "# skip this step if True\n", - "parameter: skip_assembly = False\n", - "skip_if(skip_assembly == True, \" Skip [ctwas_1] assemble regions. \" )\n", "parameter: multi_group = True\n", - "# A string character add to the end of name variable\n", "parameter: numThreads = 4\n", - "\n", - "twas_output_files = {}\n", - "region_blocks_per_chrom = []\n", - "chromosome_list = []\n", - "chrom = f\"chr{int(chrom)}\" if str(chrom).isdigit() else chrom\n", - "for region_info in filtered_region_info:\n", - " chrom_info = region_info[0]\n", - " if chrom_info != chrom:\n", - " continue # Skip chromosomes other than chrom input \n", - " file_path = f\"{cwd}/twas/{name}.{region_info[3]}.twas_data.rds\"\n", - " if chrom_info not in twas_output_files:\n", - " twas_output_files[chrom_info] = [[], []] # Structure: {\"chrX\": [[files], [region_names]]}\n", - " chromosome_list.append(chrom_info)\n", - " twas_output_files[chrom_info][0].append(file_path) # File paths\n", - " twas_output_files[chrom_info][1].append(region_info[3]) # Region names\n", - "twas_files=[twas_output_files[chr][0] for chr in chromosome_list]\n", - "\n", - "region_blocks_per_chrom = [twas_output_files[chr][1] for chr in chromosome_list]\n", - "region_blocks_per_chrom = [f\"\"\"c(\"{'\", \"'.join(regions)}\")\"\"\" for regions in region_blocks_per_chrom]\n", - "outdir = f\"{cwd}/{step_name.split('_')[0]}\"\n", - "if not os.path.exists(outdir):\n", - " os.makedirs(outdir)\n", - "gwas_study = 'c(' + ', '.join(f'\"{x}\"' for x in gwas_study) + ')'\n", - "name_suffix = f'\"{name_suffix}\"' if name_suffix else 'NULL'\n", - "\n", - "input: twas_files, group_by = lambda x: group_by_region(x, twas_files), group_with=dict(region_names=region_blocks_per_chrom)\n", - "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand = '${ }', stdout = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.{chrom}.thin{thin}.stdout\", \n", - "stderr = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.{chrom}.thin{thin}.stderr\", container = container\n", - "\n", - " library(data.table)\n", - " library(ctwas) # multigroup_ctwas\n", - " library(pecotmr)\n", - " library(stringr)\n", - " outputdir = \"${cwd:a}/${step_name.split('_')[0]}\"\n", - " \n", - " # load genome-wide data across all regions\n", - " gwas_meta_data <- fread(\"${gwas_meta_data}\",data.table=FALSE)\n", - " if(!'sample_size' %in% colnames(gwas_meta_data)) stop('Please add column `sample_size` information for each GWAS study in --gwas_meta_data. ')\n", - " gwas_studies <- if (length(${gwas_study})!=0) ${gwas_study} else unique(gwas_meta_data$study_id)\n", - " gwas_files <- gwas_meta_data[gwas_meta_data$chrom == readr::parse_number(\"${chrom}\") & gwas_meta_data$study_id %in% gwas_studies,, drop=FALSE]\n", - " gwas_files <- setNames(gwas_files$file_path, gwas_files$study_id)\n", - "\n", - " merge_context_names <- function(ctwas_dat_file){\n", - " ctwas_dat<- readRDS(ctwas_dat_file)\n", - " idxs <- which(grepl(\"_chr[0-9XY]+_[^_]+\", names(ctwas_dat$weights)))\n", - " if (length(idxs)>0) {\n", - " names(ctwas_dat$weights) <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", names(ctwas_dat$weights))\n", - " for (idx in idxs) {\n", - " for (study in names(ctwas_dat$weights[[idx]])){\n", - " ctwas_dat$weights[[idx]][[study]]$weight_name <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", ctwas_dat$weights[[idx]][[study]]$weight_name)\n", - " ctwas_dat$weights[[idx]][[study]]$context <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", ctwas_dat$weights[[idx]][[study]]$context)\n", - " }\n", - " }\n", - " for (study in names(ctwas_dat$z_gene)){\n", - " ctwas_dat$z_gene[[study]]$id <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", ctwas_dat$z_gene[[study]]$id)\n", - " ctwas_dat$z_gene[[study]]$context <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", ctwas_dat$z_gene[[study]]$context)\n", - " ctwas_dat$z_gene[[study]]$group <- gsub(\"_chr[0-9XY]+_[A-Za-z0-9]+\", \"\", ctwas_dat$z_gene[[study]]$group)\n", - " }\n", - " for (gene in names(ctwas_dat$susie_weights_intermediate_qced)){\n", - " names(ctwas_dat$susie_weights_intermediate_qced[[gene]]) <- gsub(\"_chr[0-9XY]+_[^_]+\", \"\", names(ctwas_dat$susie_weights_intermediate_qced[[gene]]))\n", - " }\n", - " }\n", - " idxs <- which(grepl(\"monocyte\", names(ctwas_dat$weights)))\n", - " if (length(idxs)>0) {\n", - " names(ctwas_dat$weights) <- gsub(\"monocyte_eQTL\", \"eQTL\", names(ctwas_dat$weights))\n", - " for (idx in idxs) {\n", - " for (study in names(ctwas_dat$weights[[idx]])){\n", - " ctwas_dat$weights[[idx]][[study]]$weight_name <- gsub(\"monocyte_eQTL\", \"eQTL\", ctwas_dat$weights[[idx]][[study]]$weight_name)\n", - " ctwas_dat$weights[[idx]][[study]]$type <- \"eQTL\"\n", - " }\n", - " }\n", - " for (study in names(ctwas_dat$z_gene)){\n", - " ctwas_dat$z_gene[[study]]$id <- gsub(\"monocyte_eQTL\", \"eQTL\", ctwas_dat$z_gene[[study]]$id)\n", - " ctwas_dat$z_gene[[study]]$type <- gsub(\"monocyte_\", \"\", ctwas_dat$z_gene[[study]]$type)\n", - " ctwas_dat$z_gene[[study]]$group <- gsub(\"monocyte_eQTL\", \"eQTL\", ctwas_dat$z_gene[[study]]$group)\n", - " }\n", - " }\n", - " return(ctwas_dat)\n", - " }\n", - " weight_list = vector(\"list\", length(gwas_studies))\n", - " names(weight_list) <- gwas_studies\n", - " z_gene <- weight_list # empty list \n", - " z_snp <- weight_list\n", - " \n", - " # get LD snp info table (snp_map) and ld variants \n", - " max_pos <- fread(\"${ld_meta_data}\", data.table=FALSE)\n", - " max_pos <- max(max_pos$end[max_pos[,1]==\"${chrom}\"])\n", - " region_of_interest <- data.frame(chrom = \"${chrom}\", start = 0, end = max_pos+1)\n", - " snp_map <- get_ref_variant_info(\"${ld_meta_data}\", region_of_interest)\n", - " ld_variants <- ifelse(grepl(\"^chr\", snp_map$id), snp_map$id, paste0(\"chr\", snp_map$id))\n", - " ld_variants <- ld_variants[!duplicated(ld_variants)]\n", - "\n", - " regions_data <- get_ctwas_meta_data(\"${ld_meta_data}\")\n", - " region_info <- regions_data$region_info\n", - " LD_map <- regions_data$LD_info\n", - " if (!is.null(${name_suffix})) name <- paste0(\"${name}_\", ${name_suffix}) else name <- \"${name}\"\n", - " if (!file.exists(file.path(outputdir, paste0(name, \".LD_map.rds\")))) saveRDS(LD_map, file.path(outputdir, paste0(name, \".LD_map.rds\")), compress='xz')\n", - " saveRDS(snp_map, file.path(outputdir, paste0(name, \".snp_map.${chrom}.rds\")), compress='xz')\n", - " rm(regions_data, LD_map)\n", - " gc()\n", - " region_map <- c()\n", - " # merge chromosome wide data, weight, z_gene, z_snp from the twas pipeline output of twas.data.rds files \n", - " for (region_dat in c(${_input:r,})){\n", - " ctwas_dat <- merge_context_names(region_dat)\n", - " if (is.null(ctwas_dat)) next \n", - " weight_tmp <- trim_ctwas_variants(ctwas_dat, twas_weight_cutoff=${twas_weight_cutoff}, cs_min_cor=${cs_min_cor},\n", - " min_pip_cutoff=${min_pip_cutoff}, max_num_variants=${max_num_variants})\n", - " if (length(weight_tmp)==0) next \n", - " for (study in names(weight_tmp)) {\n", - " if (!study %in% gwas_studies) next\n", - " weight_list[[study]] <- c(weight_list[[study]], weight_tmp[[study]])\n", - " z_gene[[study]] <- rbind(z_gene[[study]], ctwas_dat$z_gene[[study]])\n", - " z_gene[[study]] <- z_gene[[study]][!is.na(z_gene[[study]]$z) & !is.infinite(z_gene[[study]]$z) & z_gene[[study]]$id %in% names(weight_list[[study]]),] \n", - " }\n", - " region <- str_extract(region_dat, \"chr[0-9XY]+_[0-9]+_[0-9]+\")\n", - " z_genes <- unique(find_data(weight_tmp, c(3, \"molecular_id\")))\n", - " names(z_genes) <- rep(str_extract(region_dat, \"chr[0-9XY]+_[0-9]+_[0-9]+\"), length(z_genes))\n", - " region_map <- c(region_map, z_genes)\n", - " rm(ctwas_dat, weight_tmp, z_genes, region)\n", - " gc()\n", - " }\n", - " z_gene <- Filter(Negate(is.null), z_gene)\n", - " weight_list <- Filter(Negate(is.null), weight_list)\n", - " if (length(z_gene)==0 | length(weight_list)==0) stop(\"Please check input data. No twas z-score value available. \")\n", - " \n", - " for (study in names(weight_list)){\n", - " z_snp[[study]] <- harmonize_gwas(gwas_files[study], query_region=paste0(readr::parse_number(\"${chrom}\"), \":0-\", max_pos+1), ld_variants, c(\"beta\", \"z\"), match_min_prop = 0)\n", - " z_snp[[study]] <- z_snp[[study]][, colnames(z_snp[[study]])[colnames(z_snp[[study]]) %in% c(\"chrom\", \"pos\", \"variant_id\", \"A1\", \"A2\", \"z\", \"beta\", \"n_sample\",\"n_case\", \"n_control\", \"effect_allele_frequency\")]]\n", - " z_snp[[study]] <- z_snp[[study]][, c(\"chrom\", \"pos\", colnames(z_snp[[study]])[!colnames(z_snp[[study]]) %in% c(\"chrom\", \"pos\")])]\n", - " z_snp[['sample_size']][[study]]<- as.integer(unique(na.omit(gwas_meta_data$sample_size[gwas_meta_data$study_id==study])))\n", - " if(length(z_snp[['sample_size']][[study]]!=1) | z_snp[['sample_size']][[study]] <=0 ) {\n", - " stop(\"Please check sample size provided for \", study, \" at --gwas_meta_data. \")\n", - " }\n", - " }\n", - " # if weight variants are trimmed, load LD and re-compute twas z score \n", - " if (${twas_weight_cutoff}!=0 | ${cs_min_cor}!=0 | ${min_pip_cutoff}!=0 | ${max_num_variants}!=Inf ) {\n", - " region_map_list <- split(region_map, names(region_map))\n", - " for (region in names(region_map_list)){\n", - " region_genes <- unique(region_map_list[[region]])\n", - " weight_list_sub <- find_data(weight_list, 2) \n", - " weight_list_sub <- weight_list_sub[gsub( \"\\\\|.*$\", \"\", names(weight_list_sub)) %in% region_genes]\n", - " region_of_interest <- data.frame(chrom = \"${chrom}\", start = min(find_data(weight_list_sub, c(2, \"p0\"))), end = max(find_data(weight_list_sub, c(2, \"p1\"))))\n", - " rm(weight_list_sub);gc()\n", - " LD_list <- load_LD_matrix(\"${ld_meta_data}\", region_of_interest)\n", - " dup_idx <- which(duplicated(LD_list$LD_variants))\n", - " LD_list <- LD_list['LD_matrix'] \n", - " if (length(dup_idx) >= 1) LD_list$LD_matrix <- LD_list$LD_matrix[-dup_idx, -dup_idx]\n", - " for (study in names(weight_list)){\n", - " sub_idx <- which(gsub(\"\\\\|.*$\", \"\", names(weight_list[[study]])) %in% region_genes)\n", - " for (context in names(weight_list[[study]][sub_idx])){\n", - " twas_df <- twas_analysis(weight_list[[study]][[context]]$wgt, z_snp[[study]], LD_list$LD_matrix, rownames(weight_list[[study]][[context]]$wgt))[[1]]\n", - " z_gene[[study]]$z[z_gene[[study]]$id==context &z_gene[[study]]$gwas_study==study] <- twas_df$z \n", - " }\n", - " }\n", - " rm(LD_list);gc() \n", - " }\n", - " rm(LD_list);gc()\n", - " }\n", - "\n", - " # for each gwas study - merge ctwas input data \n", - " for (study in names(weight_list)){\n", - " if (is.null(z_gene[[study]])) next\n", - " if (${\"TRUE\" if multi_group else \"FALSE\"}) {\n", - " outname <- paste0(name, \".ctwas_region_data.\", study, \".${chrom}\")\n", - " } else {\n", - " contexts <- unique(do.call(c,lapply(z_gene, function(x) unique(x$context))))\n", - " outname <- paste0(name, \".ctwas_region_data.\", study, \".\", contexts, \".${chrom}\")\n", - " names(outname) <- contexts\n", - " }\n", - "\n", - " colnames(z_snp[[study]])[match(\"variant_id\", colnames(z_snp[[study]]))] <-\"id\" \n", - " z_snp[[study]] <- z_snp[[study]][, c(\"id\", \"A1\", \"A2\", \"z\")]\n", - "\n", - " # iterate by study-context pair (single group) or by study itself (multigroup) \n", - " for (group_name in outname){ \n", - " # assemble regions \n", - " if (${\"TRUE\" if multi_group else \"FALSE\"}) {\n", - " subset_inx <- names(weight_list[[study]])\n", - " } else {\n", - " context <- names(outname)[which(outname==group_name)]\n", - " if (length(weight_list[[study]][grepl(context, names(weight_list[[study]]))])==0) next\n", - " subset_inx <- which(grepl(context, names(weight_list[[study]])))\n", - " }\n", - "\n", - " res <- assemble_region_data(region_info,\n", - " z_snp[[study]],\n", - " z_gene[[study]],\n", - " weight_list[[study]][subset_inx],\n", - " snp_map,\n", - " thin = ${thin},\n", - " maxSNP = ${maxSNP},\n", - " min_group_size = 1,\n", - " ncore = ${numThreads})\n", - " region_data <- res$region_data\n", - " boundary_genes <- res$boundary_genes\n", - " region_data_file <- file.path(outputdir, paste0(group_name, \".thin\", ${thin}, \".rds\"))\n", - " boundary_gene_file <- gsub(\"region_data\",\"boundary_genes\", region_data_file)\n", - " saveRDS(region_data, region_data_file, compress='xz')\n", - " saveRDS(boundary_genes, boundary_gene_file, compress='xz') \n", - " }\n", - " saveRDS(weight_list[[study]], file.path(outputdir, paste0(name,\".ctwas_weights.\", study, \".${chrom}.rds\")), compress='xz')\n", - " saveRDS(list(z_snp=z_snp[[study]], z_gene=z_gene[[study]], gwas_n=z_snp[['sample_size']][[study]]), \n", - " file.path(outputdir, paste0(name, \".z_gene_snp.\", study, \".${chrom}.rds\")), compress='xz')\n", - " weight_list[[study]] <- NULL\n", - " z_gene[[study]] <- NULL\n", - " z_snp[[study]] <- NULL\n", - " }" + "parameter: skip_assembly = False\n", + "# TwasWeights source (option c): empty -> the upstream twas step's per-gene\n", + "# weights ({cwd}/twas/{name}.{region}.{gene}.twas_weights.rds); override with a\n", + "# list of prebuilt TwasWeights RDS (e.g. the blessed ctwas weights extracted by\n", + "# legacy_ctwas_weights_to_s4.R) when the toy twas weights carry no signal.\n", + "parameter: twas_weights = []\n", + "skip_if(skip_assembly == True, \"Skip [ctwas_1] assemble.\")\n", + "gene_chroms = sorted(set(str(ri[0]) for ri in filtered_region_info if ri[4]))\n", + "ctwas_chrom = (f\"chr{int(chrom)}\" if str(chrom).isdigit() else str(chrom)) if str(chrom) else (gene_chroms[0] if len(gene_chroms) == 1 else \"\")\n", + "stop_if(not ctwas_chrom, f\"Specify --chrom: expected one gene-bearing chromosome, found {gene_chroms}.\")\n", + "ctwas_weights = list(twas_weights) if twas_weights else [f\"{cwd:a}/twas/{name}.{ri[3]}.{g}.twas_weights.rds\" for ri in filtered_region_info if str(ri[0]) == ctwas_chrom for g in sorted(set(ri[4]))]\n", + "ctwas_studies = [s for s in regional_data['GWAS'] if ctwas_chrom in regional_data['GWAS'][s]]\n", + "stop_if(len(ctwas_studies) == 0, f\"No GWAS study covers {ctwas_chrom}.\")\n", + "output: f\"{cwd:a}/ctwas/{name}.ctwas_inputs.rds\"\n", + "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f\"{step_name}_{_output:bn}\"\n", + "bash: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", + " set -e\n", + " outdir=${cwd:a}/ctwas\n", + " mkdir -p \"$outdir\"\n", + " manifest=\"$outdir/${name}.ctwas_manifest.${ctwas_chrom}.tsv\"\n", + "\n", + " # (1) per-LD-block manifest (block enumeration + gene->home-block mapping).\n", + " Rscript code/script/pecotmr_integration/ctwas_manifest.R \\\n", + " --ld-meta \"${ld_meta_data}\" \\\n", + " --chrom \"${ctwas_chrom}\" \\\n", + " --xqtl-meta \"${xqtl_meta_data}\" \\\n", + " --twas-weights \"${\",\".join([str(w) for w in ctwas_weights])}\" \\\n", + " --gwas-sumstats-dir \"$outdir\" \\\n", + " --output \"$manifest\"\n", + "\n", + " # (2) one GwasSumStats per LD block (all studies on this chromosome).\n", + " studies=\"${\",\".join(ctwas_studies)}\"\n", + " gwas_tsvs=\"${\",\".join([regional_data['GWAS'][s][ctwas_chrom][0] for s in ctwas_studies])}\"\n", + " tail -n +2 \"$manifest\" | while IFS=$'\\t' read -r region_id region gwas_rds twas_w; do\n", + " Rscript code/script/pecotmr_integration/gwas_sumstats_construct.R \\\n", + " --study \"$studies\" \\\n", + " --gwas-tsv \"$gwas_tsvs\" \\\n", + " --ld-block \"$region\" \\\n", + " --ld-meta \"${ld_meta_data}\" \\\n", + " --output \"$gwas_rds\"\n", + " done\n", + "\n", + " # (3) assemble cTWAS inputs.\n", + " Rscript code/script/pecotmr_integration/ctwas_assemble.R \\\n", + " --manifest \"$manifest\" \\\n", + " --twas-weight-cutoff ${twas_weight_cutoff} \\\n", + " --cs-min-cor ${cs_min_cor} \\\n", + " --min-pip-cutoff ${min_pip_cutoff} \\\n", + " --max-num-variants ${max_num_variants} \\\n", + " --output ${_output}" ] }, { @@ -857,67 +478,34 @@ "outputs": [], "source": [ "[ctwas_2]\n", - "## ctwas_2: estimate global group prior using from all regions \n", + "# cTWAS step 2 (estimate priors): estCtwasParam via ctwas_est.R. Reads the\n", + "# assembled inputs from [ctwas_1] on disk so it can be run independently.\n", + "# --fallback-to-prefit recovers the prefit estimate when the accurate EM\n", + "# cannot be estimated (e.g. the single-gene MWE), mirroring the legacy ctwas_2\n", + "# workaround.\n", "parameter: run_param_est = False\n", "parameter: skip_assembly = False\n", - "parameter: thin=1.0\n", + "parameter: thin = 1.0\n", "parameter: prior_var_structure = \"shared_all\"\n", + "parameter: niter = 50\n", + "# declared for CLI stability; not consumed by the S4 path\n", "parameter: multi_group = True\n", "parameter: numThreads = 8\n", - "parameter: niter = 50\n", - "\n", - "thin=int(thin) if thin == 1.0 else thin\n", - "skip_if(run_param_est == False, \" Skip [ctwas_2] parameter estimation. \" )\n", - "name = f\"{name}_{name_suffix}\" if name_suffix else name\n", - "\n", - "gwas_study = 'c(' + ', '.join(f'\"{x}\"' for x in gwas_study) + ')'\n", - "if multi_group:\n", - " input_pattern = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.*.chr*.thin{thin}.rds\" # by study\n", - "else:\n", - " input_pattern = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_region_data.*.*.chr*.thin{thin}.rds\" # by study-context pair\n", - "\n", - "input: input_pattern, group_by=\"all\"\n", - "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand = '${ }', stdout = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_param_est.{prior_var_structure}.thin{thin}.stdout\", \n", - "stderr = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_param_est.{prior_var_structure}.thin{thin}.stderr\", container = container\n", - " \n", - " library(ctwas)\n", - " library(data.table)\n", - " library(pecotmr)\n", - " outputdir = \"${cwd:a}/${step_name.split('_')[0]}\"\n", - " region_data_files <- c(${_input:r,})\n", - " names(region_data_files) <- gsub('^.*${name}.ctwas_region_data.\\\\s*|\\\\s*.chr*.*$', '', region_data_files) \n", - " gwas_studies <- if (length(${gwas_study})!=0 & ${\"TRUE\" if multi_group else \"FALSE\"}) ${gwas_study} else unique(names(region_data_files))\n", - "\n", - " # assess global parameters\n", - " for (study in gwas_studies){\n", - " region_files <- region_data_files[grepl(paste0(study, \".chr\"), region_data_files)]\n", - " region_data <- setNames(lapply(region_files, readRDS), names(region_files))\n", - " contexts <- unique(find_data(region_data, c(4, \"context\")))\n", - " # subset either study specific or study-context specific region data based on multigroup/single group\n", - " if (${\"TRUE\" if multi_group else \"FALSE\"}) {\n", - " ## subset region data files that are not context-specific but study-specific \n", - " region_data <- region_data[!sapply(names(region_data), function(x) any(sapply(contexts, function(i) grepl(i, x))))]\n", - " group_name <- 'multi'\n", - " } else {\n", - " region_data <- region_data[sapply(names(region_data), function(x) any(sapply(contexts, function(i) grepl(i, x))))]\n", - " group_name <- 'single' \n", - " }\n", - " region_data <- lapply(split(region_data, names(region_data)), function(x) do.call(c, unname(x)))\n", - " saveRDS(region_data, file.path(outputdir, paste0(\"${name}.region_data_merged.\", group_name,\".\", study,\".thin${thin}.rds\")), compress='xz') #merge and save into one file to be used later\n", - " if (length(region_data[[study]])==0) stop(\"Study \", study, \" not present in region_data \", file.path(outputdir, paste0(\"${name}.region_data_merged.\", group_name, \".thin${thin}.rds\")))\n", - " group_prior_file <- file.path(outputdir, paste0(\"${name}.ctwas_param_${prior_var_structure}.\", study, \".thin${thin}.rds\"))\n", - " message(\"Start global parameter estimation for \", study, \". \")\n", - " param <- est_param(region_data[[study]],\n", - " group_prior_var_structure = \"${prior_var_structure}\", \n", - " niter_prefit = 3,\n", - " niter = ${niter},\n", - " min_group_size = 10,\n", - " ncore = ${numThreads},\n", - " verbose = TRUE)\n", - " saveRDS(param, group_prior_file, compress='xz')\n", - " region_data[[study]] <- NULL\n", - " }" + "skip_if(run_param_est == False, \"Skip [ctwas_2] parameter estimation.\")\n", + "input: f\"{cwd:a}/ctwas/{name}.ctwas_inputs.rds\"\n", + "output: f\"{cwd:a}/ctwas/{name}.ctwas_est.rds\"\n", + "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f\"{step_name}_{_output:bn}\"\n", + "bash: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", + " set -e\n", + " Rscript code/script/pecotmr_integration/ctwas_est.R \\\n", + " --inputs ${_input} \\\n", + " --thin ${thin} \\\n", + " --niter ${niter} \\\n", + " --group-prior-var-structure ${prior_var_structure} \\\n", + " --min-group-size 1 \\\n", + " --fallback-to-prefit \\\n", + " --ncore ${numThreads} \\\n", + " --output ${_output}" ] }, { @@ -934,245 +522,42 @@ "outputs": [], "source": [ "[ctwas_3]\n", - "## ctwas_3: perform finemapping for each region \n", - "parameter: thin=1.0\n", + "# cTWAS step 3 (screen + finemap): screenCtwasRegions + finemapCtwasRegions via\n", + "# ctwas_finemap.R. Reads the estimated params from [ctwas_2] on disk so it can\n", + "# be run independently. Emits the per-gene cTWAS fine-mapping result.\n", + "parameter: run_finemapping = False\n", + "parameter: L = 5\n", + "parameter: min_nonSNP_PIP = 0.5\n", + "# Boundary-gene region merging (legacy ctwas_3 merge_regions, default-off): when\n", + "# True, after fine-mapping, each high-PIP boundary gene's adjacent LD blocks are\n", + "# merged and re-fine-mapped via mergeCtwasBoundaryRegions (require-in-CS, like\n", + "# the legacy susie_pip>0.5 & !is.na(cs) selection); maxSNP caps SNPs per merged\n", + "# region.\n", + "parameter: merge_regions = False\n", "parameter: maxSNP = 20000\n", + "# declared for CLI stability; not consumed by the S4 path\n", + "parameter: thin = 1.0\n", "parameter: max_iter = 0\n", - "parameter: run_finemapping = False\n", "parameter: prior_var_structure = \"shared_all\"\n", - "# A list of regions to be subset for screening and fine-mapping, for example: \"10_80126158_82231647\"\n", - "parameter: region_name =[]\n", - "parameter: subset_context=[] # only process specified list of contexts for single group cTWAS\n", - "parameter: numThreads = 4\n", + "parameter: region_name = []\n", + "parameter: subset_context = []\n", "parameter: multi_group = True\n", - "parameter: merge_regions=False\n", - "parameter: L=5\n", - "# p_diff_thresh: p-value cutoff for identifying problematic SNPs with significant difference between observed z-scores and estimated values.\n", - "parameter: p_diff_thresh=5e-8\n", - "# sum of gene PIPs in the region should be larger than this threshold to get selected for finemapping\n", - "parameter: min_nonSNP_PIP=0.5\n", - "# additional name specification for fine-mapping result file names \n", - "parameter: alias=\"NULL\"\n", - "import glob\n", - "\n", - "skip_if(run_finemapping == False, \" Skip [ctwas_3] fine-mapping. \" )\n", - "thin=int(thin) if thin == 1.0 else thin\n", - "name = f\"{name}_{name_suffix}\" if name_suffix else name\n", - "base = f\"{cwd}/{step_name.split('_')[0]}/{name}\"\n", - "\n", - "region_data_file = (\n", - " [f\"{base}.region_data_merged.multi.{study}.thin{thin}.rds\" for study in gwas_study]\n", - " if multi_group\n", - " else [x for x in glob.glob(f\"{cwd:a}/{step_name.split('_')[0]}/{name}.region_data_merged.single.*.thin{thin}.rds\")]\n", - ")\n", - "\n", - "region_info_list = []\n", - "for region in region_name:\n", - " chrom = region.split(\"_\")[0]\n", - " if gwas_study:\n", - " weights = [f\"{base}.ctwas_weights.{study}.{chrom}.rds\" for study in gwas_study]\n", - " zfiles = [f\"{base}.z_gene_snp.{study}.{chrom}.rds\" for study in gwas_study]\n", - " params = [x for study in gwas_study for x in glob.glob(f\"{base}.ctwas_param_{prior_var_structure}.{study}*.thin{thin}.rds\")]\n", - " else:\n", - " weights = [x for x in glob.glob(f\"{base}.ctwas_weights.*.{chrom}.rds\")]\n", - " zfiles = [x for x in glob.glob(f\"{base}.z_gene_snp.*.{chrom}.rds\")]\n", - " params = [x for x in glob.glob(f\"{base}.ctwas_param_{prior_var_structure}.*.thin{thin}.rds\")]\n", - " region_info_list.append({\n", - " \"region_name\": region,\n", - " \"weights\": weights,\n", - " \"zfiles\": zfiles,\n", - " \"params\": params\n", - " })\n", - "gwas_study = 'c(' + ', '.join(f'\"{x}\"' for x in gwas_study) + ')'\n", - "subset_context = 'c(' + ', '.join(f'\"{x}\"' for x in subset_context) + ')'\n", - "input: region_info_list[_index][\"params\"], for_each = \"region_info_list\"\n", - "region_name = region_info_list[_index]['region_name']\n", - "weight_files = region_info_list[_index]['weights']\n", - "z_snp_file = region_info_list[_index]['zfiles']\n", - "params = region_info_list[_index]['params']\n", - "chrom = region_name.split(\"_\")[0]\n", - "depends: region_data_file, f\"{cwd}/{step_name.split('_')[0]}/{name}.snp_map.{chrom}.rds\",f\"{cwd}/{step_name.split('_')[0]}/{name}.LD_map.rds\"\n", - "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads\n", - "R: expand = '${ }', stdout = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_finemap_res.{prior_var_structure}.{region_name}.thin{thin}.stdout\", \n", - "stderr = f\"{cwd:a}/{step_name.split('_')[0]}/{name}.ctwas_finemap_res.{prior_var_structure}.{region_name}.thin{thin}.stderr\", container = container\n", - "\n", - " library(ctwas)\n", - " library(pecotmr)\n", - " library(data.table)\n", - " outputdir = \"${cwd:a}/${step_name.split('_')[0]}\"\n", - "\n", - " gwas_meta_data <- fread(\"${gwas_meta_data}\",data.table=FALSE)\n", - " gwas_studies <- if (length(${gwas_study})!=0) ${gwas_study} else unique(gwas_meta_data$study_id)[,1]\n", - "\n", - " weight_files <- ${'c(' + ', '.join(f'\"{x}\"' for x in weight_files) + ')'}\n", - " z_snp_files <- ${'c(' + ', '.join(f'\"{x}\"' for x in z_snp_file) + ')'}\n", - " param_study_files <- ${'c(' + ', '.join(f'\"{x}\"' for x in params) + ')'}\n", - " names(param_study_files) <- gsub('^.*.ctwas_param_${prior_var_structure}.\\\\s*|\\\\s*.thin${thin}.*$', '', param_study_files)\n", - " param <- setNames(lapply(unname(param_study_files), readRDS), names(param_study_files))\n", - " if (${\"TRUE\" if multi_group else \"FALSE\"}){\n", - " param <- param[!sapply(names(param), function(x) any(sapply(paste0(gwas_studies, \".\"), function(c) grepl(c, x))))] # gwas_study per each prior \n", - " } else {\n", - " param <- param[sapply(names(param), function(x) any(sapply(paste0(gwas_studies, \".\"), function(c) grepl(c, x))))] # gwas_study x context pair per each prior\n", - " param <- param[grepl(${subset_context},names(param))]\n", - " }\n", - "\n", - " region_data_files <- ${'c(' + ', '.join(f'\"{x}\"' for x in region_data_file) + ')'}\n", - " names(weight_files) <- gsub('^.*.ctwas_weights.\\\\s*|\\\\s*.${chrom}.*$', '', weight_files) # gwas study names regardless of single/multigroup\n", - " LD_map <- readRDS(\"${cwd}/${step_name.split('_')[0]}/${name}.LD_map.rds\")\n", - " snp_map <- readRDS(\"${cwd}/${step_name.split('_')[0]}/${name}.snp_map.${chrom}.rds\")\n", - " names(z_snp_files) <- gsub('^.*.z_gene_snp.\\\\s*|\\\\s*.${chrom}.*$', '', z_snp_files)\n", - " alias = ${\"NULL\" if alias == \"NULL\" else f\"'{alias}'\"}\n", - " \n", - " ## loop through gwas studies (multigroup) / gwas_study_context groups (single_group)\n", - " for (study in names(param)){\n", - " region_data <- readRDS(region_data_files[grepl(study, region_data_files)])\n", - " finemap_res_file <- file.path(outputdir, paste0(\"${name}.ctwas_finemap_res.${prior_var_structure}.${region_name}.\", study, \".thin${thin}.tsv.gz\"))\n", - " if (length(alias)!=0) finemap_res_file <- gsub(\"${name}\", \"${name}_${alias}\", finemap_res_file)\n", - " susie_alpha_file <- gsub(\".tsv.gz\", \".rds\", gsub(\"ctwas_finemap_res\", \"ctwas_susie_alpha_res\", finemap_res_file))\n", - " if (nrow(region_data[[study]][[gsub(\"chr\", \"\", \"${region_name}\")]]$z_gene)==0) {\n", - " message(\"No z_gene data available for \", study, \" in ${region_name}. \")\n", - " fwrite(data.frame(), finemap_res_file, sep = \"\\t\", compress = \"gzip\")\n", - " next\n", - " }\n", - " \n", - " gwas_study <- study \n", - " if (${\"TRUE\" if multi_group else \"FALSE\"}){\n", - " weights <- readRDS(weight_files[gwas_study])\n", - " } else { # single group \n", - " context <- gsub( \"\\\\|.*$\", \"\", names(param[[study]][['group_prior']])[1])\n", - " gwas_study <- gsub(paste0(\".\", context),\"\", study)\n", - " weights <- readRDS(weight_files[gwas_study])\n", - " if (length(weights[grepl(context, names(weights))])==0) next # no context specific weight data available \n", - " weights <- weights[which(grepl(context, names(weights)))] # subset\n", - " }\n", - " group_prior <- param[[study]]$group_prior\n", - " group_prior_var <- param[[study]]$group_prior_var \n", - " z_snp_list <- readRDS(z_snp_files[gwas_study])\n", - " z_snp <- z_snp_list$z_snp\n", - " z_gene <- z_snp_list$z_gene\n", - "\n", - " if (${thin} < 1){\n", - " region_data[[study]][gsub(\"chr\", \"\", \"${region_name}\")] <- expand_region_data(region_data[[study]][[gsub(\"chr\", \"\", \"${region_name}\")]],\n", - " snp_map,\n", - " z_snp,\n", - " maxSNP = ${maxSNP},\n", - " ncore = ${numThreads})\n", - " }\n", - " screen_res <- screen_regions(region_data[[study]][gsub(\"chr\", \"\", \"${region_name}\")],\n", - " group_prior = group_prior, group_prior_var = group_prior_var, min_nonSNP_PIP = ${min_nonSNP_PIP}, \n", - " ncore = ${numThreads}, verbose = FALSE, logfile = file.path(outputdir, \n", - " paste0(\"${name}.screen_regions.${prior_var_structure}.${region_name}.\", study, \".thin${thin}.log\")))\n", - " screened_region_data <- screen_res$screened_region_data\n", - " screen_res_file <- paste0(\"${name}.screen_regions.${prior_var_structure}.${region_name}.\", study, \".thin${thin}.rds\")\n", - " if (length(alias)!=0) screen_res_file <- gsub(\"${name}\", \"${name}_${alias}\",screen_res_file)\n", - " saveRDS(screen_res, file.path(outputdir, screen_res_file ))\n", - " if (length(screened_region_data)==0) {\n", - " message(\"No region selected for \", study, \" in ${region_name}. \")\n", - " fwrite(data.frame(), finemap_res_file, sep = \"\\t\", compress = \"gzip\")\n", - " next\n", - " }\n", - " message(\"Screening region completed for \", study, \". \")\n", - "\n", - " # finemap_regions() argument input \n", - " args <- list(screened_region_data, LD_map = LD_map, weights = weights, group_prior = group_prior, \n", - " group_prior_var = group_prior_var, L = ${L}, ncore = ${numThreads}, save_cor = FALSE, LD_format = \"custom\", \n", - " LD_loader_fun = ctwas_ld_loader, snpinfo_loader_fun = ctwas_bimfile_loader, verbose = TRUE, logfile = file.path(outputdir, \n", - " paste0(\"${name}.finemap_res.${prior_var_structure}.${region_name}.\", study, \".thin${thin}.log\")))\n", - " if (as.logical(${max_iter})){\n", - " finemap_res_file <- gsub(study, paste0(study,\"_max_iter${max_iter}\"),finemap_res_file)\n", - " susie_alpha_file <- gsub(study, paste0(study,\"_max_iter${max_iter}\"),susie_alpha_file)\n", - " args$max_iter <- ${max_iter}\n", - " }\n", - " # ctwas finemapping \n", - " res <- do.call(finemap_regions, args)\n", - " finemap_res <- res$finemap_res\n", - " finemap_res$pval <- z2p(finemap_res$z)\n", - " susie_alpha_res <- res$susie_alpha_res\n", - " fwrite(finemap_res, finemap_res_file, sep = \"\\t\", compress = \"gzip\")\n", - " saveRDS(susie_alpha_res, susie_alpha_file, compress='xz')\n", - "\n", - " # Get LD diagnosis \n", - " ld_diag <- diagnose_LD_mismatch_susie(region_ids = gsub(\"chr\", \"\", \"${region_name}\"), \n", - " z_snp = z_snp,\n", - " LD_map = LD_map, \n", - " gwas_n = z_snp_list$gwas_n, # gwas sample size \n", - " p_diff_thresh = ${p_diff_thresh},\n", - " ncore = ${numThreads},\n", - " LD_format = \"custom\",\n", - " LD_loader_fun = ctwas_ld_loader,\n", - " snpinfo_loader_fun = ctwas_bimfile_loader)\n", - " problematic_genes <- get_problematic_genes(ld_diag$problematic_snps, \n", - " weights, \n", - " finemap_res, \n", - " pip_thresh = 0.5)\n", - " # finemapping with no LD for problematic genes \n", - " problematic_region_ids <- unlist(unique(finemap_res[finemap_res$id %in% problematic_genes, \"region_id\"]))\n", - " if (length(problematic_region_ids) > 0) {\n", - " message(problematic_region_ids)\n", - " rerun_region_data <- screened_region_data[problematic_region_ids] # using already expanded screened_region_data\n", - " res <- finemap_regions_noLD(rerun_region_data, \n", - " group_prior = group_prior,\n", - " group_prior_var = group_prior_var)\n", - " finemap_res <- res$finemap_res\n", - " susie_alpha_res <- res$susie_alpha_res\n", - " message(\"Fine-mapping without LD for region ${region_name} with \", study, \". \")\n", - " finemap_res_file <- gsub(study, paste0(study, \"_update\"), finemap_res_file)\n", - " susie_alpha_file <- gsub(study, paste0(study, \"_update\"), susie_alpha_file)\n", - " fwrite(finemap_res, finemap_res_file, sep = \"\\t\", compress = \"gzip\")\n", - " saveRDS(susie_alpha_res, susie_alpha_file, compress='xz')\n", - " }\n", - " ld_diag_file <- paste0(\"${name}.ctwas_ld_diag.${prior_var_structure}.${region_name}.\", study,\".thin${thin}.rds\")\n", - " if (length(alias)!=0) ld_diag_file <- gsub(\"${name}\", \"${name}_${alias}\",ld_diag_file)\n", - " ld_diag_plot_file <- gsub(\".ctwas_ld_diag.\", \".ctwas_ld_diag_plot.\", ld_diag_file )\n", - " ld_diag_plot_file <- gsub(\".rds\", \".pdf\", ld_diag_plot_file )\n", - " saveRDS(ld_diag[1:3], file.path(outputdir, ld_diag_file))\n", - " pdf(file.path(outputdir,ld_diag_plot_file), width = 7, height = 7)\n", - " print(ld_diag$plots)\n", - " dev.off()\n", - " message(\"Fine-mapping completed for region ${region_name} with \", study, \". \")\n", - " rm(screened_region_data, ld_diag, susie_alpha_res, res, screen_res)\n", - " gc()\n", - "\n", - " ## region merging for genes in multiple regions \n", - " boundary_genes <- file.path(outputdir, paste0(\"${name}.ctwas_boundary_genes.\", study, \".${chrom}.thin${thin}.rds\"))\n", - " if (file.exists(boundary_genes) & ${\"TRUE\" if merge_regions else \"FALSE\"}){\n", - " boundary_genes <- readRDS(boundary_genes)\n", - " high_PIP_finemap_gene_res <- subset(finemap_res, group != \"SNP\" & susie_pip > 0.5 & !is.na(cs))\n", - " high_PIP_genes <- unique(high_PIP_finemap_gene_res$id)\n", - " selected_boundary_genes <- boundary_genes[boundary_genes$id %in% high_PIP_genes, , drop=FALSE]\n", - " if (nrow(selected_boundary_genes)!=0){\n", - " message(\"Fine-mapping for merged region for ${region_name} with \", study, \". \")\n", - " region_info <- get_ctwas_meta_data(\"${ld_meta_data}\")$region_info\n", - " merge_region_res <- merge_region_data(selected_boundary_genes,\n", - " region_data[[study]],\n", - " region_info = region_info,\n", - " LD_map = LD_map,\n", - " snp_map = snp_map,\n", - " z_snp = z_snp,\n", - " z_gene = z_gene,\n", - " maxSNP = ${maxSNP})\n", - " finemap_merged_regions_res <- finemap_regions(merge_region_res$merged_region_data,\n", - " LD_map = merge_region_res$merged_LD_map,\n", - " weights = weights,\n", - " group_prior = group_prior,\n", - " group_prior_var = group_prior_var,\n", - " save_cor = FALSE,\n", - " LD_format = \"custom\", \n", - " LD_loader_fun = ctwas_ld_loader, \n", - " snpinfo_loader_fun = ctwas_bimfile_loader)\n", - " finemap_res_file <- gsub(study, paste0(study,\"_merged\"),finemap_res_file)\n", - " susie_alpha_file <- gsub(study, paste0(study, \"_merged\"), susie_alpha_file)\n", - " finemap_res <- finemap_merged_regions_res$finemap_res\n", - " finemap_res$pval <- z2p(finemap_res$z)\n", - " fwrite(finemap_res, finemap_res_file, sep = \"\\t\", compress = \"gzip\")\n", - " saveRDS(finemap_merged_regions_res$susie_alpha_res, susie_alpha_file, compress='xz')\n", - " } else {\n", - " message(\"No high pip genes found for merged region for fine-mapping. \")\n", - " }\n", - " }\n", - " rm(merge_region_res,boundary_genes,finemap_res, weights, screen_res)\n", - " gc()\n", - " }" + "parameter: numThreads = 4\n", + "parameter: p_diff_thresh = 5e-8\n", + "parameter: alias = \"NULL\"\n", + "skip_if(run_finemapping == False, \"Skip [ctwas_3] fine-mapping.\")\n", + "input: f\"{cwd:a}/ctwas/{name}.ctwas_est.rds\"\n", + "output: f\"{cwd:a}/ctwas/{name}.ctwas_finemap.rds\"\n", + "task: trunk_workers = 1, trunk_size = job_size, walltime = walltime, mem = mem, cores = numThreads, tags = f\"{step_name}_{_output:bn}\"\n", + "bash: expand = '${ }', stdout = f\"{_output:n}.stdout\", stderr = f\"{_output:n}.stderr\", container = container\n", + " set -e\n", + " Rscript code/script/pecotmr_integration/ctwas_finemap.R \\\n", + " --est ${_input} \\\n", + " --L ${L} \\\n", + " --min-nonsnp-pip ${min_nonSNP_PIP} \\\n", + " ${('--merge-regions --merge-filter-cs --max-snp ' + str(maxSNP)) if merge_regions else ''} \\\n", + " --ncore ${numThreads} \\\n", + " --output ${_output}" ] }, { @@ -1368,4 +753,4 @@ }, "nbformat": 4, "nbformat_minor": 4 -} \ No newline at end of file +} diff --git a/code/script/pecotmr_integration/ctwas_finemap.R b/code/script/pecotmr_integration/ctwas_finemap.R index f3779c4f..1eb95bc3 100644 --- a/code/script/pecotmr_integration/ctwas_finemap.R +++ b/code/script/pecotmr_integration/ctwas_finemap.R @@ -17,6 +17,14 @@ # --L Pass-through (default 5) # --no-filter-L Flag: disable ctwas's internal L >= 1 region screen # --min-nonsnp-pip Pass-through (default 0.5) +# --merge-regions Flag: after fine-mapping, merge boundary genes' +# adjacent LD blocks and re-fine-map the merged +# regions (legacy ctwas_3 merge_regions; default-off) +# --merge-pip-cutoff PIP threshold for selecting which boundary genes to +# merge (default 0.5) +# --merge-filter-cs Flag: require a boundary gene to be in a credible +# set to be selected for merging +# --max-snp Per-merged-region SNP cap (default Inf) # --ncore Pass-through (default 1) # --output Output RDS path (ctwas_sumstats-shape result) @@ -41,6 +49,18 @@ parser <- add_argument(parser, "--no-filter-L", parser <- add_argument(parser, "--min-nonsnp-pip", help = "min_nonSNP_PIP threshold for screen_regions", type = "numeric", default = 0.5) +parser <- add_argument(parser, "--merge-regions", + help = "Flag: merge boundary genes' adjacent regions and re-fine-map (legacy ctwas_3 merge_regions)", + flag = TRUE) +parser <- add_argument(parser, "--merge-pip-cutoff", + help = "PIP threshold for selecting boundary genes to merge", + type = "numeric", default = 0.5) +parser <- add_argument(parser, "--merge-filter-cs", + help = "Flag: require a boundary gene to be in a credible set to be merged", + flag = TRUE) +parser <- add_argument(parser, "--max-snp", + help = "Per-merged-region SNP cap", + type = "numeric", default = Inf) parser <- add_argument(parser, "--ncore", help = "Number of cores", type = "integer", default = 1L) @@ -74,6 +94,18 @@ final <- finemapCtwasRegions( L = argv$L, ncore = argv$ncore) +# Optional step 4: boundary-gene region merging + re-fine-map. +if (argv$merge_regions) { + final <- mergeCtwasBoundaryRegions( + final, + pipThresh = argv$merge_pip_cutoff, + filterCs = argv$merge_filter_cs, + maxSNP = argv$max_snp, + L = argv$L, + ncore = argv$ncore) + cat("Applied boundary-region merging (merge_regions).\n") +} + dir.create(dirname(argv$output), showWarnings = FALSE, recursive = TRUE) saveRDS(final, argv$output) cat(sprintf("Wrote finemapCtwasRegions result to %s\n", argv$output)) diff --git a/code/script/pecotmr_integration/ctwas_manifest.R b/code/script/pecotmr_integration/ctwas_manifest.R new file mode 100644 index 00000000..4a7a0dd1 --- /dev/null +++ b/code/script/pecotmr_integration/ctwas_manifest.R @@ -0,0 +1,152 @@ +#!/usr/bin/env Rscript +# ctwas_manifest.R +# +# Build the per-LD-block manifest consumed by ctwas_assemble.R. cTWAS runs +# over the whole-chromosome LD-block grid (every LD-reference block is a SNP +# background "region"; only a few carry gene weights), so this worker: +# +# 1. reads each per-gene TwasWeights RDS to learn which genes have weights +# (the `trait` field) and which file holds each, +# 2. looks up each gene's chromosome + TSS in the xQTL meta table, +# 3. enumerates every LD block on the relevant chromosome(s) from the +# LD-meta TSV, and +# 4. assigns each gene's TwasWeights to its HOME block (the block whose +# [start, end) contains the gene's TSS) — assembleCtwasInputs uses a +# global GWAS-variant union so weight variants that straddle the block +# boundary still survive. +# +# The emitted manifest has one row per LD block, with the per-block +# GwasSumStats RDS path the caller is expected to build (region column drives +# that fan-out) and a (possibly empty) comma-separated TwasWeights list. +# +# NOTE: no data-layout path is hardcoded. The TwasWeights paths, meta tables, +# and the GwasSumStats output directory all come from arguments; the caller +# decides whether the weights are the upstream twas-step output or a +# substituted set. +# +# Inputs: +# --ld-meta LD-meta TSV (#chr/start/end/path); rows are LD blocks +# --xqtl-meta xQTL meta TSV (region_id = gene, #chr, TSS, ...) +# --twas-weights Comma-separated per-gene TwasWeights RDS paths +# --gwas-sumstats-dir Directory the per-block GwasSumStats RDS live in +# (path = /.gwas_sumstats.rds) +# --chrom Optional chromosome filter (e.g. "22" or "chr22"); +# default: every chromosome that carries a weighted gene +# --output Output manifest TSV +# +# Output columns: region_id, region, gwas_sumstats_rds, twas_weights_rds + +suppressPackageStartupMessages({ + library(argparser) + library(pecotmr) +}) + +p <- arg_parser("Build the per-LD-block cTWAS manifest") +p <- add_argument(p, "--ld-meta", type = "character", + help = "LD-meta TSV (#chr/start/end/path)") +p <- add_argument(p, "--xqtl-meta", type = "character", + help = "xQTL meta TSV (region_id = gene, #chr, TSS)") +p <- add_argument(p, "--twas-weights", type = "character", + help = "Comma-separated per-gene TwasWeights RDS paths") +p <- add_argument(p, "--gwas-sumstats-dir", type = "character", + help = "Directory holding the per-block GwasSumStats RDS") +p <- add_argument(p, "--chrom", type = "character", default = "", + help = "Optional chromosome filter (default: chroms with genes)") +p <- add_argument(p, "--output", type = "character", help = "Output manifest TSV") +argv <- parse_args(p) + +splitCsv <- function(s) { + if (is.null(s) || is.na(s) || !nzchar(s)) return(character(0)) + trimws(strsplit(s, ",", fixed = TRUE)[[1L]]) +} +normChr <- function(x) sub("^chr", "", as.character(x), ignore.case = TRUE) + +weightPaths <- splitCsv(argv$twas_weights) +if (length(weightPaths) == 0L) + stop("--twas-weights lists no TwasWeights RDS paths.") + +# ---- gene -> weights file (read each RDS; trait field is the gene id) ------- +geneToWeight <- list() +for (wp in weightPaths) { + if (!file.exists(wp)) stop("TwasWeights RDS not found: ", wp) + tw <- readRDS(wp) + if (!methods::is(tw, "TwasWeights")) + stop("Not a TwasWeights RDS: ", wp) + for (g in unique(as.character(tw$trait))) { + geneToWeight[[g]] <- union(geneToWeight[[g]], wp) + } +} +genes <- names(geneToWeight) + +# ---- gene -> (chrom, TSS) from the xQTL meta ------------------------------- +xqtl <- read.table(argv$xqtl_meta, header = TRUE, sep = "\t", + stringsAsFactors = FALSE, check.names = FALSE, + comment.char = "") +xchr <- intersect(c("#chr", "#chrom", "chr", "chrom"), names(xqtl))[1L] +if (is.na(xchr) || !all(c("region_id", "TSS") %in% names(xqtl))) + stop("--xqtl-meta needs region_id, TSS and a chromosome column; got: ", + paste(names(xqtl), collapse = ", ")) +xqtl <- xqtl[!duplicated(xqtl$region_id), , drop = FALSE] +geneChr <- setNames(normChr(xqtl[[xchr]]), xqtl$region_id) +geneTss <- setNames(suppressWarnings(as.integer(xqtl$TSS)), xqtl$region_id) + +missing <- setdiff(genes, names(geneChr)) +if (length(missing) > 0L) + stop("Genes have TwasWeights but are absent from --xqtl-meta: ", + paste(missing, collapse = ", ")) + +# Chromosomes in scope: the --chrom filter, else every chrom carrying a gene. +chromsWithGenes <- unique(geneChr[genes]) +chroms <- if (nzchar(argv$chrom)) normChr(argv$chrom) else chromsWithGenes +if (nzchar(argv$chrom)) + genes <- genes[geneChr[genes] %in% chroms] + +# ---- enumerate LD blocks for the in-scope chromosomes ---------------------- +ld <- read.table(argv$ld_meta, header = TRUE, sep = "\t", + stringsAsFactors = FALSE, check.names = FALSE, + comment.char = "") +ldChrCol <- intersect(c("#chr", "#chrom", "chr", "chrom"), names(ld))[1L] +if (is.na(ldChrCol)) + stop("--ld-meta needs a chromosome column; got: ", + paste(names(ld), collapse = ", ")) +ld$.chr <- normChr(ld[[ldChrCol]]) +ld$.start <- suppressWarnings(as.integer(ld$start)) +ld$.end <- suppressWarnings(as.integer(ld$end)) +ld <- ld[ld$.chr %in% chroms & !is.na(ld$.start) & !is.na(ld$.end), , drop = FALSE] +ld <- ld[!duplicated(ld[, c(".chr", ".start", ".end")]), , drop = FALSE] +ld <- ld[order(ld$.chr, ld$.start), , drop = FALSE] +if (nrow(ld) < 2L) + stop("Fewer than two LD blocks in scope (got ", nrow(ld), + "); cTWAS's EM needs multi-block context.") + +region <- sprintf("chr%s:%d-%d", ld$.chr, ld$.start, ld$.end) +region_id <- gsub("[:-]", "_", region) +gwas_rds <- file.path(argv$gwas_sumstats_dir, + paste0(region_id, ".gwas_sumstats.rds")) + +# ---- assign each gene to its home block (TSS in [start, end)) -------------- +twas_weights_rds <- rep("", nrow(ld)) +for (g in genes) { + hit <- which(ld$.chr == geneChr[[g]] & + geneTss[[g]] >= ld$.start & geneTss[[g]] < ld$.end) + if (length(hit) == 0L) { + warning("Gene ", g, " (TSS ", geneTss[[g]], ") falls in no LD block; skipped.") + next + } + h <- hit[[1L]] + cur <- splitCsv(twas_weights_rds[[h]]) + twas_weights_rds[[h]] <- paste(union(cur, geneToWeight[[g]]), collapse = ",") +} + +placed <- sum(nzchar(twas_weights_rds)) +if (placed == 0L) + stop("No gene was placed into any LD block; check --xqtl-meta TSS vs --ld-meta.") + +out <- data.frame(region_id = region_id, region = region, + gwas_sumstats_rds = gwas_rds, + twas_weights_rds = twas_weights_rds, + stringsAsFactors = FALSE) +dir.create(dirname(argv$output), showWarnings = FALSE, recursive = TRUE) +write.table(out, file = argv$output, sep = "\t", quote = FALSE, row.names = FALSE) +cat(sprintf("Wrote cTWAS manifest: %d LD block(s), %d gene-bearing, to %s\n", + nrow(out), placed, argv$output)) diff --git a/code/script/pecotmr_integration/gwas_sumstats_construct.R b/code/script/pecotmr_integration/gwas_sumstats_construct.R index 2abdd353..360df60a 100644 --- a/code/script/pecotmr_integration/gwas_sumstats_construct.R +++ b/code/script/pecotmr_integration/gwas_sumstats_construct.R @@ -1,41 +1,52 @@ #!/usr/bin/env Rscript # gwas_sumstats_construct.R # -# Build one pecotmr::GwasSumStats over a single LD block from a GWAS -# summary-statistics TSV and an LD-reference meta file, run +# Build a pecotmr::GwasSumStats over a single analysis region from one or +# more GWAS summary-statistics TSVs and an LD-reference meta file, run # summaryStatsQc(), and serialize to RDS. The resulting RDS is the -# per-(study, block) input to twasWeightsPipeline downstream consumers, +# per-(region) GWAS input to twasWeightsPipeline downstream consumers, # causalInferencePipeline, and ctwasPipeline. # +# Multiple studies: pass --study / --gwas-tsv (and optionally +# --column-mapping) as comma-separated lists of equal length. One +# GwasSumStats entry is built per study; all studies share the region's +# LD sketch (a population LD panel is not study-specific). +# causalInferencePipeline then loops the studies and emits one row per +# (qtl tuple, gwasStudy). +# +# LD resolution: the region need NOT coincide with a single LD-meta row. +# All LD-meta rows overlapping the region are collected and their genotype +# payloads de-duplicated. The common one-file-per-chromosome layout (every +# overlapping row points at the same prefix) resolves to a single handle +# that already spans the region; genuinely separate per-block payloads are +# a hard error (multi-file LD merge is not done here). +# # Inputs: -# --study Single study identifier -# --gwas-tsv Path to a tabix-indexed (or plain) GWAS TSV +# --study Study identifier(s), comma-separated +# --gwas-tsv GWAS TSV path(s), comma-separated, one per study. # Standard columns (with hardcoded aliases when no # --column-mapping is supplied): # #chrom|chrom|chr, pos, variant_id|SNP, # A1, A2, z|Z, n_sample|N # Optional: effect_allele_frequency, p, beta, se -# --column-mapping Optional YAML file mapping standard column names -# to this study's actual column names. Keys are -# the standard names (chrom, pos, variant_id, A1, -# A2, z, n_sample, and optionally beta, se, p, maf, -# info); values are the column name as it appears -# in the TSV. Required entries: chrom, pos, -# variant_id, A1, A2, z, n_sample. Used in place -# of the hardcoded alias fallback. -# --ld-block Genomic interval as chr:start-end (one LD block) +# --column-mapping Optional YAML file(s) mapping standard column names +# to a study's actual column names. Comma-separated: +# empty (none), one (applied to every study), or one +# per study. Keys are the standard names (chrom, pos, +# variant_id, A1, A2, z, n_sample, and optionally beta, +# se, p, maf, info); values are the column name as it +# appears in the TSV. Required: chrom, pos, variant_id, +# A1, A2, z, n_sample. +# --ld-block Analysis region as chr:start-end # --ld-meta LD-meta TSV (#chr, start, end, path) # --genome Genome build label (e.g. "GRCh38"). Default "GRCh38" # --qc-method LD-mismatch QC method passed to # summaryStatsQc(zMismatchQc = ...). One of "none" # (default), "slalom", "dentist". # --impute Flag: run RAISS sumstat imputation in -# summaryStatsQc(impute = TRUE) against the LD -# sketch. -# --qc-args Optional JSON object of extra named kwargs -# spliced into summaryStatsQc(). Anything in the -# summaryStatsQc() signature is reachable here -# (e.g. '{"mafCutoff":0.01,"imputeOpts":{"rcond":0.005}}'). +# summaryStatsQc(impute = TRUE) against the LD sketch. +# --qc-args Optional JSON object of extra named kwargs spliced +# into summaryStatsQc(). # --output Output RDS path suppressPackageStartupMessages({ @@ -48,15 +59,15 @@ suppressPackageStartupMessages({ library(yaml) }) -parser <- arg_parser("Build a per-LD-block GwasSumStats RDS") +parser <- arg_parser("Build a per-region (multi-study) GwasSumStats RDS") parser <- add_argument(parser, "--study", - help = "Single study identifier", + help = "Study identifier(s), comma-separated", type = "character") parser <- add_argument(parser, "--gwas-tsv", - help = "Path to GWAS summary-statistics TSV", + help = "GWAS TSV path(s), comma-separated (one per study)", type = "character") parser <- add_argument(parser, "--ld-block", - help = "LD block as chr:start-end", + help = "Analysis region as chr:start-end", type = "character") parser <- add_argument(parser, "--ld-meta", help = "Path to LD-meta TSV", @@ -65,7 +76,7 @@ parser <- add_argument(parser, "--genome", help = "Genome build label", type = "character", default = "GRCh38") parser <- add_argument(parser, "--column-mapping", - help = "Optional YAML file mapping standard column names to this study's column names", + help = "Optional YAML mapping file(s), comma-separated (none / one-for-all / one-per-study)", type = "character", default = "") parser <- add_argument(parser, "--qc-method", help = "LD-mismatch QC: 'none' (default), 'slalom', or 'dentist'", @@ -83,17 +94,34 @@ parser <- add_argument(parser, "--output", help = "Output RDS path", type = "character") argv <- parse_args(parser) -# ----- Resolve column-name lookup ------------------------------------------- -# Either pull from a user-supplied YAML mapping (per-study explicit names) -# or fall back to the hardcoded alias lists. The mapping must cover every -# required standard name; optional names (BETA/SE/P/MAF/INFO) pass through -# when present, regardless of source. -column_mapping <- if (nzchar(argv$column_mapping) && argv$column_mapping != ".") { - if (!file.exists(argv$column_mapping)) - stop("--column-mapping file not found: ", argv$column_mapping) - yaml::read_yaml(argv$column_mapping) +splitCsv <- function(s) { + if (is.null(s) || !nzchar(s)) return(character(0)) + trimws(strsplit(s, ",", fixed = TRUE)[[1L]]) +} + +# ----- Resolve study / gwas-tsv / column-mapping lists ---------------------- +studies <- splitCsv(argv$study) +gwasTsvs <- splitCsv(argv$gwas_tsv) +if (length(studies) == 0L || length(gwasTsvs) == 0L) + stop("--study and --gwas-tsv are both required.") +if (length(gwasTsvs) != length(studies)) + stop("--gwas-tsv supplies ", length(gwasTsvs), " path(s) but --study lists ", + length(studies), " study/ies; they must match.") +if (anyDuplicated(studies)) + stop("--study has duplicate identifiers: ", + paste(studies[duplicated(studies)], collapse = ", ")) + +mappings <- splitCsv(argv$column_mapping) +mappings <- if (length(mappings) == 0L) { + rep("", length(studies)) +} else if (length(mappings) == 1L) { + rep(mappings, length(studies)) +} else if (length(mappings) == length(studies)) { + mappings } else { - NULL + stop("--column-mapping must be empty, a single file (applied to every ", + "study), or one file per study (got ", length(mappings), + " for ", length(studies), " studies).") } # ----- Parse --qc-args JSON ------------------------------------------------- @@ -110,8 +138,7 @@ qc_extra <- if (nzchar(argv$qc_args) && argv$qc_args != "." && } else { list() } -# Reject collisions between explicit flags and --qc-args (zMismatchQc / -# impute) to avoid silent override surprises. +# Reject collisions between explicit flags and --qc-args. clash <- intersect(names(qc_extra), c("zMismatchQc", "impute")) if (length(clash) > 0L) stop("--qc-args sets ", paste(clash, collapse = ", "), @@ -129,136 +156,148 @@ parse_region <- function(s) { block <- parse_region(argv$ld_block) -# ----- Read GWAS TSV and subset to the LD block ----------------------------- -gwas <- read.table(if (grepl("\\.gz$", argv$gwas_tsv)) gzfile(argv$gwas_tsv) - else argv$gwas_tsv, - header = TRUE, sep = "\t", - stringsAsFactors = FALSE, check.names = FALSE, - comment.char = "") - -pick <- function(opts, where) intersect(opts, names(where))[1L] +# ----- Build the LD-reference GenotypeHandle spanning the region ------------ +# Resolve every LD-meta row overlapping the region and de-duplicate the +# genotype payloads. The one-file-per-chromosome layout (all overlapping +# rows share a prefix) gives a single handle that already covers the region; +# pecotmr's GenotypeHandle(ldMeta=…, region=…) round-trips the meta path as +# the data path (an upstream bug) and rejects multi-row regions, so we read +# the meta TSV ourselves and point a constructor at the resolved payload. +buildLdHandle <- function(ld_meta, block) { + ld_meta_dir <- dirname(normalizePath(ld_meta)) + ld_meta_df <- read.table(ld_meta, header = TRUE, sep = "\t", + stringsAsFactors = FALSE, check.names = FALSE, + comment.char = "") + ld_chr_col <- intersect(c("#chr", "#chrom", "chr", "chrom"), + names(ld_meta_df))[1L] + if (is.na(ld_chr_col)) + stop("Could not find a chromosome column in ", ld_meta, + " (expected one of '#chr' / '#chrom' / 'chr' / 'chrom'); got: ", + paste(names(ld_meta_df), collapse = ", ")) + ld_chr_norm <- sub("^chr", "", as.character(ld_meta_df[[ld_chr_col]]), + ignore.case = TRUE) + block_chr_norm <- sub("^chr", "", block$chr, ignore.case = TRUE) + ld_start <- suppressWarnings(as.integer(ld_meta_df$start)) + ld_end <- suppressWarnings(as.integer(ld_meta_df$end)) + # `start = 0, end = 0` is the meta convention for "whole chromosome". + whole_chrom <- !is.na(ld_start) & !is.na(ld_end) & + ld_start == 0L & ld_end == 0L + # Strict overlap (no shared-endpoint match) so adjacent blocks that abut + # the region are not pulled in. + overlaps <- which(ld_chr_norm == block_chr_norm & + (whole_chrom | + (ld_start < block$end & ld_end > block$start))) + if (length(overlaps) == 0L) + stop("No LD-meta row overlaps ", argv$ld_block, " in ", ld_meta, ".") + ld_prefixes <- unique(ld_meta_df$path[overlaps]) + if (length(ld_prefixes) > 1L) + stop("Region ", argv$ld_block, " overlaps LD-meta rows pointing at ", + length(ld_prefixes), " distinct genotype payloads (", + paste(ld_prefixes, collapse = ", "), "). Multi-file LD merge is ", + "not supported here; restrict the region to a single payload.") + ld_prefix <- ld_prefixes[[1L]] + if (!startsWith(ld_prefix, "/")) + ld_prefix <- file.path(ld_meta_dir, ld_prefix) -# When the user provides a column mapping, use it directly; otherwise -# fall back to the hardcoded alias lists below. -resolve_col <- function(std, fallback) { - if (!is.null(column_mapping) && !is.null(column_mapping[[std]])) { - named <- column_mapping[[std]] - if (!(named %in% names(gwas))) - stop("--column-mapping['", std, "'] = '", named, - "' is not a column in ", argv$gwas_tsv) - return(named) + # Detect data format from companion file extensions. + if (file.exists(paste0(ld_prefix, ".pgen"))) { + GenotypeHandle(plink2Prefix = ld_prefix) + } else if (file.exists(paste0(ld_prefix, ".bed"))) { + GenotypeHandle(plink1Prefix = ld_prefix) + } else if (file.exists(paste0(ld_prefix, ".gds"))) { + GenotypeHandle(path = paste0(ld_prefix, ".gds")) + } else if (file.exists(paste0(ld_prefix, ".vcf.gz"))) { + GenotypeHandle(path = paste0(ld_prefix, ".vcf.gz")) + } else { + stop("Could not find a recognised genotype payload at LD-meta prefix: ", + ld_prefix, " (looked for .pgen / .bed / .gds / .vcf.gz)") } - pick(fallback, gwas) } -chr_col <- resolve_col("chrom", c("#chrom", "chrom", "chr")) -pos_col <- resolve_col("pos", c("pos", "position", "BP")) -snp_col <- resolve_col("variant_id", c("variant_id", "SNP", "rsid")) -a1_col <- resolve_col("A1", c("A1", "a1")) -a2_col <- resolve_col("A2", c("A2", "a2")) -z_col <- resolve_col("z", c("z", "Z")) -n_col <- resolve_col("n_sample", c("n_sample", "N", "n")) -if (any(is.na(c(chr_col, pos_col, snp_col, a1_col, a2_col, z_col, n_col)))) - stop("--gwas-tsv missing one of required columns ", - "(chrom/pos/variant_id/A1/A2/z/n_sample) in: ", argv$gwas_tsv, - if (!is.null(column_mapping)) - " — check that every required key is present in --column-mapping" - else "") +ld_handle <- buildLdHandle(argv$ld_meta, block) -# Normalise chromosome label to match the LD block's -chrom_vals <- as.character(gwas[[chr_col]]) -if (!startsWith(chrom_vals[[1L]], "chr")) - chrom_vals <- paste0("chr", chrom_vals) +# ----- Build one GwasSumStats entry GRanges per study ---------------------- +buildEntryGr <- function(gwas_tsv, mapping_path, block) { + column_mapping <- if (nzchar(mapping_path) && mapping_path != ".") { + if (!file.exists(mapping_path)) + stop("--column-mapping file not found: ", mapping_path) + yaml::read_yaml(mapping_path) + } else { + NULL + } -pos_vals <- as.integer(gwas[[pos_col]]) -keep <- chrom_vals == block$chr & pos_vals >= block$start & pos_vals <= block$end -sub <- gwas[keep, , drop = FALSE] -if (nrow(sub) == 0L) - stop("No GWAS variants fall in LD block ", argv$ld_block, - " (after chromosome normalisation).") + gwas <- read.table(if (grepl("\\.gz$", gwas_tsv)) gzfile(gwas_tsv) + else gwas_tsv, + header = TRUE, sep = "\t", + stringsAsFactors = FALSE, check.names = FALSE, + comment.char = "") -# ----- Build the LD-reference GenotypeHandle for the block ------------------ -# Resolve the LD-meta row covering this block directly: pecotmr's -# GenotypeHandle(ldMeta=…, region=…) currently round-trips the meta file -# path as the data path (an upstream bug), so we read the meta TSV -# ourselves and call the appropriate GenotypeHandle constructor. -ld_meta_dir <- dirname(normalizePath(argv$ld_meta)) -ld_meta_df <- read.table(argv$ld_meta, header = TRUE, sep = "\t", - stringsAsFactors = FALSE, check.names = FALSE, - comment.char = "") -# Header may be "#chr", "#chrom", "chr", or "chrom". -ld_chr_col <- intersect(c("#chr", "#chrom", "chr", "chrom"), - names(ld_meta_df))[1L] -if (is.na(ld_chr_col)) - stop("Could not find a chromosome column in ", argv$ld_meta, - " (expected one of '#chr' / '#chrom' / 'chr' / 'chrom'); got: ", - paste(names(ld_meta_df), collapse = ", ")) -# Normalize both sides to the bare chromosome label (no "chr" prefix). -ld_chr_norm <- sub("^chr", "", - as.character(ld_meta_df[[ld_chr_col]]), - ignore.case = TRUE) -block_chr_norm <- sub("^chr", "", block$chr, ignore.case = TRUE) -# `start = 0, end = 0` is the meta convention for "whole chromosome" — any -# block on that chromosome is covered. -ld_start <- suppressWarnings(as.integer(ld_meta_df$start)) -ld_end <- suppressWarnings(as.integer(ld_meta_df$end)) -whole_chrom <- !is.na(ld_start) & !is.na(ld_end) & - ld_start == 0L & ld_end == 0L -covers <- which(ld_chr_norm == block_chr_norm & - (whole_chrom | - (ld_start <= block$start & ld_end >= block$end))) -if (length(covers) == 0L) - stop("No LD-meta row in ", argv$ld_meta, " fully covers ", argv$ld_block) -if (length(covers) > 1L) - stop("Multiple LD-meta rows cover ", argv$ld_block, - "; restrict to a single LD block.") -ld_prefix <- ld_meta_df$path[covers[[1L]]] -if (!startsWith(ld_prefix, "/")) - ld_prefix <- file.path(ld_meta_dir, ld_prefix) + pick <- function(opts, where) intersect(opts, names(where))[1L] + resolve_col <- function(std, fallback) { + if (!is.null(column_mapping) && !is.null(column_mapping[[std]])) { + named <- column_mapping[[std]] + if (!(named %in% names(gwas))) + stop("--column-mapping['", std, "'] = '", named, + "' is not a column in ", gwas_tsv) + return(named) + } + pick(fallback, gwas) + } + chr_col <- resolve_col("chrom", c("#chrom", "chrom", "chr")) + pos_col <- resolve_col("pos", c("pos", "position", "BP")) + snp_col <- resolve_col("variant_id", c("variant_id", "SNP", "rsid")) + a1_col <- resolve_col("A1", c("A1", "a1")) + a2_col <- resolve_col("A2", c("A2", "a2")) + z_col <- resolve_col("z", c("z", "Z")) + n_col <- resolve_col("n_sample", c("n_sample", "N", "n")) -# Detect data format from companion file extensions -if (file.exists(paste0(ld_prefix, ".pgen"))) { - ld_handle <- GenotypeHandle(plink2Prefix = ld_prefix) -} else if (file.exists(paste0(ld_prefix, ".bed"))) { - ld_handle <- GenotypeHandle(plink1Prefix = ld_prefix) -} else if (file.exists(paste0(ld_prefix, ".gds"))) { - ld_handle <- GenotypeHandle(path = paste0(ld_prefix, ".gds")) -} else if (file.exists(paste0(ld_prefix, ".vcf.gz"))) { - ld_handle <- GenotypeHandle(path = paste0(ld_prefix, ".vcf.gz")) -} else { - stop("Could not find a recognised genotype payload at LD-meta prefix: ", ld_prefix, - " (looked for .pgen / .bed / .gds / .vcf.gz)") -} + if (any(is.na(c(chr_col, pos_col, snp_col, a1_col, a2_col, z_col, n_col)))) + stop("--gwas-tsv missing one of required columns ", + "(chrom/pos/variant_id/A1/A2/z/n_sample) in: ", gwas_tsv, + if (!is.null(column_mapping)) + " — check that every required key is present in --column-mapping" + else "") + + # Normalise chromosome label to match the region's. + chrom_vals <- as.character(gwas[[chr_col]]) + if (!startsWith(chrom_vals[[1L]], "chr")) + chrom_vals <- paste0("chr", chrom_vals) + pos_vals <- as.integer(gwas[[pos_col]]) + keep <- chrom_vals == block$chr & pos_vals >= block$start & + pos_vals <= block$end + sub <- gwas[keep, , drop = FALSE] + if (nrow(sub) == 0L) + stop("No GWAS variants from ", gwas_tsv, " fall in region ", + argv$ld_block, " (after chromosome normalisation).") -# ----- Build GwasSumStats entry GRanges ------------------------------------- -entry_gr <- GRanges( - seqnames = chrom_vals[keep], - ranges = IRanges(start = pos_vals[keep], width = 1L)) -mcols(entry_gr) <- DataFrame( - SNP = as.character(sub[[snp_col]]), - A1 = as.character(sub[[a1_col]]), - A2 = as.character(sub[[a2_col]]), - Z = as.numeric(sub[[z_col]]), - N = as.integer(sub[[n_col]])) -# Optional columns when present -opt_cols <- c(beta = "beta", se = "se", p = c("p", "pvalue"), - maf = c("effect_allele_frequency", "maf", "MAF"), - info = c("info", "INFO")) -for (slot in c("BETA", "SE", "P", "MAF", "INFO")) { - src <- intersect(c(slot, tolower(slot), - if (slot == "MAF") c("effect_allele_frequency"), - if (slot == "P") c("pvalue", "p")), - names(sub))[1L] - if (!is.na(src)) { - mcols(entry_gr)[[slot]] <- if (slot == "N") as.integer(sub[[src]]) - else as.numeric(sub[[src]]) + entry_gr <- GRanges( + seqnames = chrom_vals[keep], + ranges = IRanges(start = pos_vals[keep], width = 1L)) + mcols(entry_gr) <- DataFrame( + SNP = as.character(sub[[snp_col]]), + A1 = as.character(sub[[a1_col]]), + A2 = as.character(sub[[a2_col]]), + Z = as.numeric(sub[[z_col]]), + N = as.integer(sub[[n_col]])) + # Optional columns when present (hardcoded aliases). + for (slot in c("BETA", "SE", "P", "MAF", "INFO")) { + src <- intersect(c(slot, tolower(slot), + if (slot == "MAF") c("effect_allele_frequency"), + if (slot == "P") c("pvalue", "p")), + names(sub))[1L] + if (!is.na(src)) + mcols(entry_gr)[[slot]] <- as.numeric(sub[[src]]) } + entry_gr } +entries <- lapply(seq_along(studies), function(k) + buildEntryGr(gwasTsvs[[k]], mappings[[k]], block)) + # ----- Construct + QC + save ------------------------------------------------ gss <- GwasSumStats( - study = argv$study, - entry = list(entry_gr), + study = studies, + entry = entries, genome = argv$genome, ldSketch = ld_handle) gss_out <- if (argv$skip_qc) { @@ -274,6 +313,7 @@ gss_out <- if (argv$skip_qc) { dir.create(dirname(argv$output), showWarnings = FALSE, recursive = TRUE) saveRDS(gss_out, argv$output) -cat(sprintf("Wrote %sGwasSumStats for study '%s' over %s (%d variants in) to %s\n", +cat(sprintf("Wrote %sGwasSumStats (%d stud%s: %s) over %s to %s\n", if (argv$skip_qc) "(skip-QC) " else "QC'd ", - argv$study, argv$ld_block, length(entry_gr), argv$output)) + length(studies), if (length(studies) == 1L) "y" else "ies", + paste(studies, collapse = ","), argv$ld_block, argv$output)) diff --git a/code/script/pecotmr_integration/legacy_enloc_finemap_convert.R b/code/script/pecotmr_integration/legacy_enloc_finemap_convert.R new file mode 100644 index 00000000..41c3c04b --- /dev/null +++ b/code/script/pecotmr_integration/legacy_enloc_finemap_convert.R @@ -0,0 +1,346 @@ +#!/usr/bin/env Rscript +# legacy_enloc_finemap_convert.R +# +# Bridge for migrating the legacy SuSiE-enloc fine-mapping RDS files to the +# new pecotmr S4 path, so the enrichment + coloc cells of SuSiE_enloc.ipynb +# can call qtl_enrichment.R / coloc.R (which consume S4 +# QtlFineMappingResult / GwasFineMappingResult collections). +# +# Two modes, each emitting ONE combined collection RDS: +# --mode qtl : a QtlFineMappingResult spanning every (study, context) +# fit found in the per-study `*.univariate_susie_twas_weights.rds` +# files (one entry per (study, context, trait=gene, method="susie")). +# --mode gwas : a GwasFineMappingResult spanning every (gwas_study, block) +# fit found in the per-block `*.univariate_susie_rss.rds` files +# (one entry per (gwas_study, region_id=block, method="susie_rss")). +# +# Legacy inner shape (shared by both modes), per fit: +# $variant_names all variant ids (e.g. "chr1:20520132:G:C" for qtl, +# "1:17351816:A:C" for gwas), one per column of the fit. +# $susie_result_trimmed a SuSiE object: $pip (over all variants, UNNAMED), +# $sets ($cs list of per-effect variant-index vectors, +# NULL when no credible set), $alpha / $mu / $mu2 / +# $lbf_variable (L x nvariants), $V (per-effect prior +# variance). +# $top_loci a small canonical-top-loci df (NOT used here; we +# build topLoci over ALL variants so getTopLoci's +# posterior projection has the full per-variant view). +# $sumstats betahat/sebetahat (qtl) over all variants (UNNAMED, +# aligned by index to variant_names); GWAS carries z. +# +# qtl files: RDS[[gene]][[context]] = inner +# gwas files: RDS[[block]][[gwas_study]]$RSS_QC_RAISS_imputed = inner +# (some [[block]][[gwas_study]] are empty list() -> skipped) +# +# FIELD-MAPPING into the canonical topLoci (consumed via getTopLoci / +# .projectPosteriorView, which sources variant_id/chrom/pos/A1/A2, pip, +# beta<-posterior_mean, se<-posterior_sd, and passes cs_95 through): +# variant_id <- variant_names (verbatim; chr-prefix convention preserved; +# colocPipeline/qtlEnrichmentPipeline reconcile qtl vs gwas +# conventions via alignVariantNames) +# chrom/pos/A1/A2 <- parsed from chr:pos:A1:A2 +# pip <- susie_result_trimmed$pip (set as names on the fit too, +# so the FineMappingEntry drift check passes) +# posterior_mean <- colSums(alpha * mu) [proper SuSiE posterior] +# posterior_sd <- sqrt(pmax(colSums(alpha*mu2) - posterior_mean^2, 0)) +# cs_95 <- "L" for variants in credible set k, "0" otherwise +# (from susie_result_trimmed$sets$cs; "0" everywhere when NULL) +# susieFit is kept as the full susie_result_trimmed (coloc needs lbf_variable/ +# sets/V; enrichment needs alpha/pip/V). +# +# LD-SKETCH: the QtlFineMappingResult is emitted with ldSketch = NULL +# (individual-level fits). The GwasFineMappingResult MUST carry a non-NULL +# GenotypeHandle ldSketch -- qtlEnrichmentPipeline hard-requires it -- so a +# minimal placeholder GenotypeHandle is attached. It is never inspected for +# content: .requireMatchingLdSketches short-circuits the moment the QTL side +# is NULL, so no real genotype panel is needed for this MWE bridge. +# +# OBJECT PATHS: the fit / variant-names live at configurable nested paths +# inside `inner` (qtl) or `rds[[block]]` (gwas), supplied via --finemapping-obj +# / --varname-obj (each a single string of space-separated path components). +# QTL : inner = rds[[gene]][[context]]; the obj path is applied to `inner`, +# e.g. "preset_variants_result susie_result_trimmed" reaches the legacy +# PRESET-subset fit (NOT inner$susie_result_trimmed, the full fit). +# GWAS : the obj's FIRST element is the specific GWAS study; the WHOLE path is +# applied to rds[[block]], e.g. +# "AD_Bellenguez_2022 RSS_QC_RAISS_imputed susie_result_trimmed" +# reaches that one study's fit. The GwasFMR study label = obj[[1]]. +# +# Inputs: +# --mode {qtl, gwas} +# --meta xqtl_meta tsv (qtl mode) or gwas_meta tsv (gwas mode); used to +# discover the per-study/per-block RDS basenames + labels. When +# absent / unreadable the converter falls back to globbing +# --data-dir for the matching RDS pattern. +# --data-dir directory holding the legacy RDS files (and the meta tsv) +# --rds-files comma-separated explicit RDS paths; when given, bypasses +# --meta/--data-dir discovery and uses exactly those files +# (per-region mode). +# --finemapping-obj space-separated path to the fit inside inner (qtl) or +# rds[[block]] (gwas; first component = study). Defaults to the +# verified legacy QTL/GWAS paths when empty. +# --varname-obj space-separated path to the variant_names vector. +# --region-obj space-separated path to the region grange (accepted for +# CLI parity with the legacy interface; not consumed here). +# --method fine-mapping method label (default: susie for qtl, susie_rss for gwas) +# --output output collection RDS + +suppressPackageStartupMessages({ library(argparser); library(pecotmr) }) + +p <- arg_parser("Convert legacy SuSiE-enloc fine-mapping RDS to pecotmr S4") +p <- add_argument(p, "--mode", type = "character", + help = "qtl or gwas") +p <- add_argument(p, "--meta", type = "character", default = "", + help = "xqtl_meta / gwas_meta tsv (optional; else glob --data-dir)") +p <- add_argument(p, "--data-dir", type = "character", default = "", + help = "directory of the legacy *.rds files") +p <- add_argument(p, "--rds-files", type = "character", default = "", + help = "comma-separated explicit RDS paths (per-region; bypasses --meta/--data-dir)") +p <- add_argument(p, "--finemapping-obj", type = "character", default = "", + help = "space-separated object path to the fit (default: verified legacy path)") +p <- add_argument(p, "--varname-obj", type = "character", default = "", + help = "space-separated object path to variant_names (default: verified legacy path)") +p <- add_argument(p, "--region-obj", type = "character", default = "", + help = "space-separated object path to region grange (CLI parity; unused here)") +p <- add_argument(p, "--study", type = "character", default = "", + help = "QTL study label override (default: parsed from filename)") +p <- add_argument(p, "--method", type = "character", default = "", + help = "method label (default susie [qtl] / susie_rss [gwas])") +p <- add_argument(p, "--output", type = "character", + help = "output collection RDS") +argv <- parse_args(p) + +mode <- match.arg(argv$mode, c("qtl", "gwas")) +method <- if (nzchar(argv$method)) argv$method else + if (mode == "qtl") "susie" else "susie_rss" + +# Parse a single space-separated CLI string into a character path vector. +parseObjPath <- function(x) { + if (is.null(x) || !nzchar(trimws(x))) return(character(0)) + strsplit(trimws(x), "\\s+")[[1L]] +} + +# Object paths into the RDS -- REQUIRED, supplied by the caller (the SoS cell +# forwards the legacy --*-finemapping-obj / --*-varname-obj values). They are +# deliberately NOT defaulted: the fit / variant-name locations are a property of +# the input data, not of this script, so hardcoding them here would silently +# bind the converter to one dataset's layout. For GWAS the first path element is +# the study (which becomes the GwasFMR study label). +finemappingObj <- parseObjPath(argv$finemapping_obj) +varnameObj <- parseObjPath(argv$varname_obj) +if (length(finemappingObj) == 0L) + stop("--finemapping-obj is required (space-separated object path to the fit).") +if (length(varnameObj) == 0L) + stop("--varname-obj is required (space-separated object path to variant_names).") + +# ---- shared helpers -------------------------------------------------------- + +# Walk a nested list `x` along the character path components in `path`. An empty +# / NULL path returns `x` unchanged; a missing intermediate short-circuits NULL. +getNested <- function(x, path) { + if (is.null(path) || length(path) == 0L) return(x) + for (p in path) { + if (is.null(x)) return(NULL) + x <- x[[p]] + } + x +} + +# Parse "chr:pos:A1:A2" ids into the canonical identity columns. Tolerant of a +# missing "chr" prefix (gwas ids) and of malformed ids (NA-filled). +parseIds <- function(vids) { + vp <- strsplit(as.character(vids), ":", fixed = TRUE) + g <- function(i) vapply(vp, function(x) + if (length(x) >= i) x[[i]] else NA_character_, character(1)) + list(chrom = sub("^chr", "", g(1L)), + pos = suppressWarnings(as.integer(g(2L))), + A1 = g(3L), + A2 = g(4L)) +} + +# Per-variant posterior mean / sd from a (trimmed) SuSiE fit's single-effect +# matrices: E[b] = sum_l alpha_lj mu_lj ; Var[b] = sum_l alpha_lj mu2_lj - E[b]^2. +# Falls back to 0 / a small placeholder when the matrices are unavailable. +posteriorMeanSd <- function(fit, n) { + alpha <- if (!is.null(fit$alpha)) as.matrix(fit$alpha) else NULL + mu <- if (!is.null(fit$mu)) as.matrix(fit$mu) else NULL + mu2 <- if (!is.null(fit$mu2)) as.matrix(fit$mu2) else NULL + if (!is.null(alpha) && !is.null(mu) && all(dim(alpha) == dim(mu)) && + ncol(alpha) == n) { + pm <- as.numeric(colSums(alpha * mu)) + ps <- if (!is.null(mu2) && all(dim(alpha) == dim(mu2))) + as.numeric(sqrt(pmax(colSums(alpha * mu2) - pm^2, 0))) + else pmax(abs(pm) * 0.5, 0.05) + return(list(mean = pm, sd = ps)) + } + list(mean = rep(0, n), sd = rep(0.05, n)) +} + +# Credible-set membership label per variant: "L" for variants in the k-th +# credible set of susie_result_trimmed$sets$cs, "0" otherwise. Index vectors in +# $sets$cs are 1-based positions into the variant axis. +csLabels <- function(fit, n) { + out <- rep("0", n) + cs <- fit$sets$cs + if (is.null(cs) || length(cs) == 0L) return(out) + csNames <- names(cs) + for (k in seq_along(cs)) { + idx <- cs[[k]] + idx <- idx[!is.na(idx) & idx >= 1L & idx <= n] + if (length(idx) == 0L) next + lab <- if (!is.null(csNames) && nzchar(csNames[[k]])) csNames[[k]] + else paste0("L", k) + out[idx] <- lab + } + out +} + +# Build one FineMappingEntry from a legacy inner fit list. The fit and its +# variant-names are sourced via configurable object paths (relative to `inner`). +buildEntry <- function(inner, finemappingObj, varnameObj) { + fit <- getNested(inner, finemappingObj) + vids <- as.character(getNested(inner, varnameObj)) + n <- length(vids) + if (is.null(fit) || is.null(fit$pip) || n == 0L) return(NULL) + pip <- as.numeric(fit$pip) + if (length(pip) != n) return(NULL) + names(fit$pip) <- vids # name the fit's pip (drift check + pipelines) + pms <- posteriorMeanSd(fit, n) + ids <- parseIds(vids) + topLoci <- data.frame( + variant_id = vids, + chrom = ids$chrom, + pos = ids$pos, + A1 = ids$A1, + A2 = ids$A2, + pip = pip, + posterior_mean = pms$mean, + posterior_sd = pms$sd, + cs_95 = csLabels(fit, n), + stringsAsFactors = FALSE) + FineMappingEntry(variantIds = vids, susieFit = fit, topLoci = topLoci) +} + +# Discover the basenames of the legacy RDS to read, preferring the meta tsv's +# `original_data` column (comma-separated), falling back to a directory glob. +discoverFiles <- function(meta, dataDir, pattern) { + files <- character(0) + if (nzchar(meta) && file.exists(meta)) { + md <- tryCatch( + utils::read.delim(meta, header = TRUE, sep = "\t", + check.names = FALSE, comment.char = "", + stringsAsFactors = FALSE), + error = function(e) NULL) + if (!is.null(md) && "original_data" %in% colnames(md)) { + bn <- unlist(strsplit(as.character(md$original_data), ",", fixed = TRUE)) + bn <- trimws(bn[nzchar(trimws(bn))]) + files <- file.path(dataDir, unique(bn)) + files <- files[file.exists(files)] + } + } + if (length(files) == 0L) { + files <- list.files(dataDir, pattern = pattern, full.names = TRUE) + } + unique(files) +} + +# ---- QTL mode -------------------------------------------------------------- +# RDS[[gene]][[context]] = inner. study label comes from --study (the SoS cell +# forwards ${name}); context = the inner key, trait = the gene. The fit/varname +# obj paths are the tail applied to `inner`. +convertQtl <- function(files, finemappingObj, varnameObj, studyOverride = "") { + if (!nzchar(studyOverride)) + stop("--study is required in QTL mode (the study label is a property of ", + "the analysis, not encoded generically in the filename).") + fs <- character(0); fc <- character(0); ft <- character(0) + fm <- character(0); fe <- list() + for (f in files) { + study <- studyOverride + rds <- readRDS(f) + for (gene in names(rds)) { + for (context in names(rds[[gene]])) { + entry <- buildEntry(rds[[gene]][[context]], finemappingObj, varnameObj) + if (is.null(entry)) next + fe[[length(fe) + 1L]] <- entry + fs <- c(fs, study); fc <- c(fc, context) + ft <- c(ft, gene); fm <- c(fm, method) + } + } + } + if (length(fe) == 0L) stop("No QTL fine-mapping entries built from inputs.") + QtlFineMappingResult(study = fs, context = fc, trait = ft, + method = fm, entry = fe, ldSketch = NULL) +} + +# ---- GWAS mode ------------------------------------------------------------- +# RDS[[block]] holds per-study nodes. The obj's FIRST element is the specific +# GWAS study; the WHOLE obj path applied to rds[[block]] reaches that one +# study's fit (getNested(rds[[block]], gwasObj)). study = obj[[1]], region_id = +# block. We iterate blocks (files) for that one study, skipping empty nodes. +convertGwas <- function(files, finemappingObj, varnameObj) { + gwasStudy <- finemappingObj[[1L]] + fitTail <- finemappingObj[-1L] # path inside rds[[block]][[study]] + varTail <- varnameObj[-1L] + gs <- character(0); gm <- character(0); gr <- character(0); ge <- list() + for (f in files) { + rds <- readRDS(f) + for (block in names(rds)) { + node <- rds[[block]][[gwasStudy]] + if (is.null(node) || length(node) == 0L) next # empty study in block + entry <- buildEntry(node, fitTail, varTail) + if (is.null(entry)) next + ge[[length(ge) + 1L]] <- entry + gs <- c(gs, gwasStudy); gm <- c(gm, method); gr <- c(gr, block) + } + } + if (length(ge) == 0L) stop("No GWAS fine-mapping entries built from inputs.") + # qtlEnrichmentPipeline hard-requires a non-NULL GenotypeHandle ldSketch on + # the GWAS side. It is never inspected for content here (the matching check + # short-circuits because the QTL ldSketch is NULL), so a minimal placeholder + # handle suffices for this MWE bridge. + ldSketch <- new("GenotypeHandle", + path = "", + format = "vcf", + snpInfo = data.frame(SNP = character(0), CHR = character(0), + BP = integer(0), A1 = character(0), + A2 = character(0), + stringsAsFactors = FALSE), + nSamples = 0L, sampleIds = character(0), + pgenPtr = NULL, chromPaths = character(0)) + GwasFineMappingResult(study = gs, method = gm, entry = ge, + region_id = gr, ldSketch = ldSketch) +} + +# ---- dispatch -------------------------------------------------------------- +# Per-region mode: --rds-files lists exactly which RDS to read (bypasses the +# whole-collection --meta/--data-dir discovery). Otherwise discover via meta/glob. +if (nzchar(trimws(argv$rds_files))) { + files <- trimws(strsplit(argv$rds_files, ",", fixed = TRUE)[[1L]]) + files <- files[nzchar(files)] + missing <- files[!file.exists(files)] + if (length(missing) > 0L) + stop("These --rds-files do not exist: ", paste(missing, collapse = ", ")) +} else { + pattern <- if (mode == "qtl") { + "univariate_susie_twas_weights\\.rds$" + } else { + "univariate_susie_rss\\.rds$" + } + files <- discoverFiles(argv$meta, argv$data_dir, pattern) +} +if (length(files) == 0L) + stop("No legacy RDS files found (rds-files='", argv$rds_files, "', meta='", + argv$meta, "', data-dir='", argv$data_dir, "').") +cat(sprintf("[%s mode] reading %d legacy RDS file(s)\n", mode, length(files))) + +res <- if (mode == "qtl") { + convertQtl(files, finemappingObj, varnameObj, argv$study) +} else { + convertGwas(files, finemappingObj, varnameObj) +} + +dir.create(dirname(argv$output), showWarnings = FALSE, recursive = TRUE) +saveRDS(res, argv$output) +cat(sprintf("Wrote %s (%d entries) to %s\n", + class(res)[[1L]], nrow(res), argv$output)) +print(res) diff --git a/code/script/pecotmr_integration/legacy_twas_weights_convert.R b/code/script/pecotmr_integration/legacy_twas_weights_convert.R new file mode 100644 index 00000000..1050621d --- /dev/null +++ b/code/script/pecotmr_integration/legacy_twas_weights_convert.R @@ -0,0 +1,118 @@ +#!/usr/bin/env Rscript +# legacy_twas_weights_convert.R +# +# Bridge for migrating legacy TWAS analyses to the new pecotmr S4 path: convert +# a legacy `*.univariate_twas_weights.rds` (the nested list a legacy +# load_twas_weights() consumed: gene > context > {twas_weights, twas_cv_result, +# susie_weights_intermediate, variant_names, region_info, ...}) into: +# +# * a pecotmr::TwasWeights S4 RDS (one row per (study, context, trait, +# method); cvPerformance carried over so causalInferencePipeline's +# rsqCutoff weight-selection works), and +# * optionally a pecotmr::QtlFineMappingResult S4 RDS built from the SuSiE +# intermediate fit, so causalInferencePipeline can run MR. +# +# The pair is the input contract of twas.R (--twas-weights / --fine-mapping-result). +# +# CAVEAT (FMR): the legacy SuSiE intermediate fit carries pip + the coefficient +# (used as the per-variant QTL effect, topLoci$posterior_mean) but NO marginal +# standard error, so topLoci$posterior_sd is a magnitude-scaled placeholder. MR +# is only run by causalInferencePipeline for TWAS-significant tuples +# (mrPvalCutoff gate), so this placeholder only matters for genes that pass it. +# +# Inputs: +# --legacy Legacy univariate_twas_weights.rds +# --study Study identifier for the emitted rows +# --output Output TwasWeights RDS +# --output-fmr Optional output QtlFineMappingResult RDS (from the SuSiE fit) + +suppressPackageStartupMessages({ library(argparser); library(pecotmr) }) + +p <- arg_parser("Convert a legacy univariate_twas_weights.rds to pecotmr S4") +p <- add_argument(p, "--legacy", type = "character", + help = "legacy univariate_twas_weights.rds") +p <- add_argument(p, "--study", type = "character", default = "study", + help = "study identifier for the emitted rows") +p <- add_argument(p, "--output", type = "character", + help = "output TwasWeights RDS") +p <- add_argument(p, "--output-fmr", type = "character", default = "", + help = "optional output QtlFineMappingResult RDS (from the SuSiE fit)") +argv <- parse_args(p) + +legacy <- readRDS(argv$legacy) + +# Legacy per-method CV performance (1 x 6 matrix: corr,rsq,adj_rsq,pval,RMSE,MAE) +# -> the new cvPerformance shape list(metrics = named vector). +perfToCv <- function(perf) { + if (is.null(perf)) return(NULL) + v <- as.numeric(perf) + nms <- colnames(perf) + if (is.null(nms) || length(nms) != length(v)) + nms <- c("corr", "rsq", "adj_rsq", "pval", "RMSE", "MAE")[seq_along(v)] + list(metrics = stats::setNames(v, nms)) +} + +# ---- TwasWeights ----------------------------------------------------------- +rs <- character(0); rc <- character(0); rt <- character(0) +rm <- character(0); entries <- list() +for (trait in names(legacy)) { + for (ctxKey in names(legacy[[trait]])) { + inner <- legacy[[trait]][[ctxKey]] + context <- sub(paste0("_", trait, "$"), "", ctxKey) # bulk_rnaseq_ENSG... -> bulk_rnaseq + perfL <- inner$twas_cv_result$performance + for (wnm in names(inner$twas_weights)) { + tok <- sub("_weights$", "", wnm) # susie_weights -> susie + wmat <- inner$twas_weights[[wnm]] + vids <- if (!is.null(rownames(wmat))) rownames(wmat) else inner$variant_names + wvec <- stats::setNames(as.numeric(wmat), vids) + cv <- perfToCv(perfL[[paste0(tok, "_performance")]]) + entries[[length(entries) + 1L]] <- TwasWeightsEntry( + variantIds = vids, weights = wvec, cvPerformance = cv, + standardized = FALSE, dataType = context) + rs <- c(rs, argv$study); rc <- c(rc, context) + rt <- c(rt, trait); rm <- c(rm, tok) + } + } +} +tw <- TwasWeights(study = rs, context = rc, trait = rt, method = rm, entry = entries) +saveRDS(tw, argv$output) +cat(sprintf("Wrote TwasWeights (%d rows: %s) to %s\n", + nrow(tw), paste(unique(rm), collapse = ","), argv$output)) + +# ---- QtlFineMappingResult from the SuSiE intermediate fit (optional) ------- +if (nzchar(argv$output_fmr)) { + fs <- character(0); fc <- character(0); ft <- character(0) + fm <- character(0); fe <- list() + for (trait in names(legacy)) { + for (ctxKey in names(legacy[[trait]])) { + inner <- legacy[[trait]][[ctxKey]] + context <- sub(paste0("_", trait, "$"), "", ctxKey) + s <- inner$susie_weights_intermediate + if (is.null(s) || is.null(s$pip)) next + vids <- names(s$pip) + sw <- inner$twas_weights[["susie_weights"]] + bx <- as.numeric(sw)[match(vids, rownames(sw))]; bx[is.na(bx)] <- 0 + sx <- pmax(abs(bx) * 0.5, 0.05) # placeholder SE (legacy fit has none) + # getTopLoci()/.projectPosteriorView re-derives variant_id from + # chrom:pos:A1:A2 and sources beta<-posterior_mean, se<-posterior_sd, so + # supply those identity + effect columns under the canonical names. + vp <- strsplit(vids, ":", fixed = TRUE) + g <- function(i) vapply(vp, function(x) + if (length(x) >= i) x[[i]] else NA_character_, character(1)) + topLoci <- data.frame( + variant_id = vids, chrom = g(1L), + pos = suppressWarnings(as.integer(g(2L))), A1 = g(3L), A2 = g(4L), + pip = as.numeric(s$pip), posterior_mean = bx, posterior_sd = sx, + stringsAsFactors = FALSE) + fe[[length(fe) + 1L]] <- FineMappingEntry( + variantIds = vids, susieFit = s, topLoci = topLoci) + fs <- c(fs, argv$study); fc <- c(fc, context) + ft <- c(ft, trait); fm <- c(fm, "susie") + } + } + fmr <- QtlFineMappingResult(study = fs, context = fc, trait = ft, + method = fm, entry = fe) + saveRDS(fmr, argv$output_fmr) + cat(sprintf("Wrote QtlFineMappingResult (%d rows) to %s\n", + nrow(fmr), argv$output_fmr)) +} diff --git a/code/script/pecotmr_integration/twas.R b/code/script/pecotmr_integration/twas.R index bbffce9c..1fd20da4 100644 --- a/code/script/pecotmr_integration/twas.R +++ b/code/script/pecotmr_integration/twas.R @@ -8,11 +8,16 @@ # # Inputs: # --twas-weights Per-gene TwasWeights RDS (output of twas_weights.ipynb) -# --gwas-sumstats Per-LD-block GwasSumStats RDS +# --gwas-sumstats Per-region GwasSumStats RDS (one or more studies) # --fine-mapping-result Optional per-gene FineMappingResult RDS +# --rsq-cutoff CV-R^2 weight-selection cutoff (legacy rsq_cutoff) +# --rsq-pval-cutoff CV-p-value gate for selection (legacy rsq_pval_cutoff) +# --rsq-option CV r-squared metric: rsq / adj_rsq (legacy rsq_option) +# --rsq-pval-option CV p-value metric candidates (legacy rsq_pval_option) # --mr-pip-cutoff Pass-through (default 0.5) # --mr-method Pass-through; "ivwPerVariant" or "csAware". # Default "ivwPerVariant". +# --mr-pval-cutoff Run MR only where TWAS p < this (legacy mr_pval_cutoff) # --output Output RDS path suppressPackageStartupMessages({ @@ -36,10 +41,38 @@ parser <- add_argument(parser, "--mr-pip-cutoff", parser <- add_argument(parser, "--mr-method", help = "MR method: ivwPerVariant or csAware", type = "character", default = "ivwPerVariant") +parser <- add_argument(parser, "--rsq-cutoff", + help = "CV weight selection: per (study,context,trait,gwasStudy) keep only the best method whose cvPerformance rsq >= this; 0 disables (legacy twas_pipeline rsq_cutoff)", + type = "numeric", default = 0.01) +parser <- add_argument(parser, "--rsq-pval-cutoff", + help = "CV-p-value gate for weight selection: a method is eligible only when its cvPerformance p-value < this; Inf disables the gate (legacy rsq_pval_cutoff)", + type = "numeric", default = Inf) +parser <- add_argument(parser, "--rsq-option", + help = "cvPerformance metric used as 'r-squared' for the cutoff/ranking: 'rsq' or 'adj_rsq' (legacy rsq_option)", + type = "character", default = "rsq") +parser <- add_argument(parser, "--rsq-pval-option", + help = "Comma-separated candidate cvPerformance p-value metric names; the first present is used (legacy rsq_pval_option)", + type = "character", default = "adj_rsq_pval,pval") +parser <- add_argument(parser, "--mr-pval-cutoff", + help = "Run MR only where TWAS p-value < this; 1 disables the gate (legacy twas_pipeline mr_pval_cutoff)", + type = "numeric", default = 0.05) +parser <- add_argument(parser, "--mr-cpip-cutoff", + help = "Cumulative-PIP cutoff for csAware MR (causalInferencePipeline mrCpipCutoff)", + type = "numeric", default = 0.5) +parser <- add_argument(parser, "--combine-methods", + help = "Comma-separated method tokens for cross-method p-value combination (combinePValues); empty = none", + type = "character", default = "") parser <- add_argument(parser, "--output", help = "Output RDS path", type = "character") argv <- parse_args(parser) +# Cross-method combination: empty / "." -> NULL (skip), else split on comma. +combine_methods <- if (nzchar(argv$combine_methods) && argv$combine_methods != ".") + trimws(strsplit(argv$combine_methods, ",", fixed = TRUE)[[1L]]) else NULL + +# CV-p-value gate metric candidates (first present in cvPerformance is used). +rsq_pval_option <- trimws(strsplit(argv$rsq_pval_option, ",", fixed = TRUE)[[1L]]) + tw <- readRDS(argv$twas_weights) gss <- readRDS(argv$gwas_sumstats) @@ -54,8 +87,15 @@ res <- causalInferencePipeline( gwasSumStats = gss, twasWeights = tw, fineMappingResult = fmr, + rsqCutoff = argv$rsq_cutoff, + rsqPvalCutoff = argv$rsq_pval_cutoff, + rsqOption = argv$rsq_option, + rsqPvalOption = rsq_pval_option, mrPipCutoff = argv$mr_pip_cutoff, - mrMethod = argv$mr_method) + mrMethod = argv$mr_method, + mrCpipCutoff = argv$mr_cpip_cutoff, + mrPvalCutoff = argv$mr_pval_cutoff, + combineMethods = combine_methods) dir.create(dirname(argv$output), showWarnings = FALSE, recursive = TRUE) saveRDS(res, argv$output)