diff --git a/.github/workflows/python-app.yaml b/.github/workflows/python-app.yaml
index c1c8176c..3ba2edd8 100644
--- a/.github/workflows/python-app.yaml
+++ b/.github/workflows/python-app.yaml
@@ -1,22 +1,28 @@
-name: Python application
+name: MONSDA CI

-on: [pull_request]
+on:
+  pull_request:
+    types: [opened, synchronize, reopened, labeled, unlabeled]
+  workflow_dispatch:
+    inputs:
+      run_full_pipeline:
+        description: "Run full pipeline integration test"
+        required: false
+        default: false
+        type: boolean

 jobs:
-  monsdatest:
-    name: monsda-test
-    runs-on: "ubuntu-latest"
+  tests:
+    name: Unit tests (pytest)
+    runs-on: ubuntu-latest
     defaults:
       run:
         shell: bash -el {0}
-    # Docker Hub image that `postgres-job` executes in
-    container: node:latest
-    # service containers to run with `postgres-job`
-    steps:
-      - uses: actions/checkout@master
+    steps:
+      - uses: actions/checkout@v4
         with:
           submodules: recursive
-      - uses: conda-incubator/setup-miniconda@master
+      - uses: conda-incubator/setup-miniconda@v3
         with:
           miniconda-version: "latest"
           activate-environment: monsda-test
@@ -28,25 +34,70 @@ jobs:
           show-channel-urls: true
           use-only-tar-bz2: false
           auto-activate-base: false
-      - name: build and install monsda
+      - name: build and install MONSDA
        run: |
          cd ${{ github.workspace }}
-          echo "Build start"
-          python -m pip install --upgrade pip
-          python -m pip install build
-          python -m build
-          pip install dist/*.whl
-      - name: preparing test
+          python -m pip install --upgrade pip
+          python -m pip install -e .
+          python -m pip install pytest
+      - name: run unit tests
        run: |
-          echo "Build finished, preparing tests"
-          export a=$(monsda --version 2>&1 >/dev/null |sed 's/MONSDA version //g')
-          sed -i "s/\"VERSION\": \"FIXME\"/\"VERSION\": \"$a\"/g" tests/data/config_Test.json
-      - name: running test
-        run: |
-          echo "Running tests"
+          cd ${{ github.workspace }}
+          pytest -q tests/test_Utils.py
+
+  full-pipeline:
+    name: Full pipeline integration test
+    runs-on: ubuntu-latest
+    needs: tests
+    if: >-
+      (github.event_name == 'workflow_dispatch' && inputs.run_full_pipeline) ||
+      (github.event_name == 'pull_request' && contains(github.event.pull_request.labels.*.name, 'full-pipeline'))
+    defaults:
+      run:
+        shell: bash -el {0}
+    steps:
+      - uses: actions/checkout@v4
+        with:
+          submodules: recursive
+      - uses: conda-incubator/setup-miniconda@v3
+        with:
+          miniconda-version: "latest"
+          activate-environment: monsda-test
+          environment-file: environment.yml
+          python-version: 3.12.2
+          channels: conda-forge,bioconda
+          allow-softlinks: true
+          channel-priority: flexible
+          show-channel-urls: true
+          use-only-tar-bz2: false
+          auto-activate-base: false
+      - name: build and install MONSDA
+        run: |
+          cd ${{ github.workspace }}
+          python -m pip install --upgrade pip
+          python -m pip install -e .
+      - name: prepare integration test config
+        run: |
+          cd ${{ github.workspace }}
+          VERSION=$(monsda --version 2>&1 | sed 's/MONSDA version //g')
+          sed -i "s/\"VERSION\": \"FIXME\"/\"VERSION\": \"${VERSION}\"/g" tests/data/config_Test.json
+      - name: run full pipeline test
+        run: |
         cd ${{ github.workspace }}/tests
-        ln -fs data/* .
+        ln -fs data/* .
         mkdir -p CONDALIB
-        monsda -j 6 -c config_Test.json --directory ${PWD} --use-conda --conda-prefix CONDALIB --save && echo "MONSDA test passed" || echo "MONSDA test failed"
-        #chmod +x cicd_test.sh
-        #./cicd_test.sh
\ No newline at end of file
+        monsda -j 6 -c config_Test.json --directory ${PWD} --use-conda --conda-prefix CONDALIB --save
+      - name: upload test logs on failure
+        if: failure()
+        uses: actions/upload-artifact@v4
+        with:
+          name: full-pipeline-debug-artifacts
+          path: |
+            ${{ github.workspace }}/tests/LOGS
+            ${{ github.workspace }}/tests/SUBSNAKES
+            ${{ github.workspace }}/tests/FASTQ
+            ${{ github.workspace }}/tests/MAPPED
+            ${{ github.workspace }}/tests/QC
+            ${{ github.workspace }}/tests/COUNTING
+            ${{ github.workspace }}/tests/PEAKS
+            ${{ github.workspace }}/tests/TRACKS
\ No newline at end of file
diff --git a/MONSDA/Params.py b/MONSDA/Params.py
index 7f5346a1..71182733 100644
--- a/MONSDA/Params.py
+++ b/MONSDA/Params.py
@@ -1504,14 +1504,18 @@ def comparable_as_string(config: dict, subwork: str) -> str:
             logid + "no comparables found in " + subwork + ". Compare All vs. All."
         )
         groups_by_condition = list(mu.yield_from_dict("GROUPS", config))
+        log.debug(logid + "Groups by condition: " + str(groups_by_condition))
         flattened = sorted(
             set(val for sublist in groups_by_condition for val in sublist)
         )
+        log.debug(logid + "Flattened groups: " + str(flattened))
        combined = list(set(itertools.permutations(flattened, 2)))
+        log.debug(logid + "Combined groups: " + str(combined))
         complist = []
         for key, value in combined:
             complist.append(f"{key}-vs-{value}:{key}-vs-{value}")
         compstr = ",".join(complist)
+        log.debug(logid + "Comparables string: " + compstr)
         return compstr
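For orientation, the All-vs-All fallback that these new debug lines trace builds every ordered pair of group labels and joins them into the `A-vs-B:A-vs-B` comparable string the rest of the code expects. A minimal standalone sketch of that transformation (the group labels are invented for illustration):

```python
import itertools

# Hypothetical GROUPS entries as yielded from a MONSDA config
groups_by_condition = [["KO", "WT"], ["WT"]]

flattened = sorted(set(val for sublist in groups_by_condition for val in sublist))
combined = list(set(itertools.permutations(flattened, 2)))

# Every ordered pair becomes one "A-vs-B:A-vs-B" token
complist = [f"{k}-vs-{v}:{k}-vs-{v}" for k, v in combined]
print(",".join(sorted(complist)))
# KO-vs-WT:KO-vs-WT,WT-vs-KO:WT-vs-KO
```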
diff --git a/MONSDA/Utils.py b/MONSDA/Utils.py
index 2efc714e..29a8c4d2 100644
--- a/MONSDA/Utils.py
+++ b/MONSDA/Utils.py
@@ -125,6 +125,10 @@ def setup_logger(scriptname):
             exc_tb,
         )
         print("".join(tbe.format()), file=sys.stderr)
+        # Fallback logger if initialization fails (e.g., during testing)
+        scriptname = "MONSDA"
+        log = logging.getLogger(scriptname)
+        log.setLevel(logging.INFO)


 # NestedDefaultDict
@@ -142,7 +146,14 @@ class NestedDefaultDict(collections.defaultdict):
     NestedDefaultDict
     """

     def __init__(self, *args, **kwargs):
-        super(NestedDefaultDict, self).__init__(NestedDefaultDict, *args, **kwargs)
+        default_factory = NestedDefaultDict
+        remaining_args = args
+        if args and callable(args[0]):
+            default_factory = args[0]
+            remaining_args = args[1:]
+        super(NestedDefaultDict, self).__init__(
+            default_factory, *remaining_args, **kwargs
+        )

     def __repr__(self):
         return repr(dict(self))
@@ -197,11 +208,7 @@ def rmempty(check: list) -> list:
     list
         list of non-empty files
     """
-    ret = list()
-    for f in check:
-        if os.path.isfile(f):
-            ret.append(f)
-    return ret
+    return [x for x in check if os.path.isfile(x)]


 @check_run
@@ -231,7 +238,8 @@ def replacer(match):
         r'//.*?$|/\*.*?\*/|\'(?:\\.|[^\\\'])*\'|"(?:\\.|[^\\"])*"',
         re.DOTALL | re.MULTILINE,
     )
-    return [re.sub(pattern, replacer, x) for x in textlist]
+    cleaned = [re.sub(pattern, replacer, x) for x in textlist]
+    return [line for line in cleaned if line.strip()]


 ##############################
@@ -673,8 +681,14 @@ def find_key_for_value(val:str, dictionary:dict) -> dict.keys:
             if dict_inst(v):
                 log.debug(logid + "item" + str(v))
                 yield from find_key_for_value(val, v)
-            elif v == val or val in v:
-                yield k
+            else:
+                contains = False
+                try:
+                    contains = val in v
+                except TypeError:
+                    contains = False
+                if v == val or contains:
+                    yield k
     else:
         return dictionary
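The reworked `find_key_for_value` branch exists because values in a nested config can be non-containers (ints, booleans, None), for which `val in v` raises `TypeError`. A small standalone sketch of just the guarded membership test, lifted out of the generator for illustration:

```python
def value_matches(val, v):
    # "val in v" raises TypeError for non-container values,
    # which the guarded version treats as "no match"
    contains = False
    try:
        contains = val in v
    except TypeError:
        contains = False
    return v == val or contains

print(value_matches("fastqc", ["fastqc", "rustqc"]))  # True: containment
print(value_matches(42, 42))                          # True: equality
print(value_matches("x", 42))                         # False: TypeError swallowed
```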
@@ -1053,9 +1067,13 @@ def idfromfa(id:str) -> list:
     """
     goi, chrom, strand = [None, None, None]
     try:
-        goi, chrom = id.split(":")[::2]
-        strand = str(id.split(":")[3].split("(")[1][0])
-    except:
+        parts = id.split(":", 1)
+        goi = parts[0]
+        chrom_info = parts[1]
+        chrom = chrom_info.split(".", 1)[0]
+        sm = re.search(r"\(([+-])\)", id)
+        strand = sm.group(1) if sm else "na"
+    except Exception:
         print(
             "Fasta header is not in expected format, you will loose information on strand and chromosome"
         )
@@ -1153,12 +1171,11 @@ def multi_replace(repl:str, text:str) -> str:
     str
         string with replacements
     """
-    print("MULTI: " + str(repl) + str(text))
-    # Create a regular expression from the dictionary keys
-    regex = re.compile("(%s)" % "|".join(map(re.escape, repl.keys())))
+    # Create a regular expression from the dictionary keys
+    regex = re.compile(r"\b(%s)\b" % "|".join(map(re.escape, repl.keys())))

     # For each match, look-up corresponding value in dictionary
-    return regex.sub(lambda mo: dict[mo.string[mo.start() : mo.end()]], text)
+    return regex.sub(lambda mo: repl[mo.string[mo.start() : mo.end()]], text)


 @check_run
@@ -1224,16 +1241,9 @@ def add_to_innermost_key_by_list(addto:dict, toadd:str, keylist:list) -> dict:
     logid = scriptname + ".add_to_innermost_key_by_list: "
     log.debug(logid + str(addto) + ", " + str(toadd))

-    tconf = {}
-    for i in range(
-        len(keylist)
-    ):  # need to add options as last element to dict of unknown depth
-        tconf[keylist[i]] = {}
-        tconf = tconf[keylist[i]]
-        if i == len(keylist) - 1:
-            tconf = toadd
-
-    addto.update(tconf)
+    if not keylist:
+        return addto
+    nested_set(addto, keylist, toadd)
     return addto
diff --git a/MONSDA/Workflows.py b/MONSDA/Workflows.py
index 2bffa091..88aaf92a 100755
--- a/MONSDA/Workflows.py
+++ b/MONSDA/Workflows.py
@@ -570,6 +570,12 @@ def make_pre(
                 subname = toolenv + ".smk"

                 if works[j] == "QC":
+                    if toolenv == "rustqc":
+                        log.warning(
+                            logid
+                            + "RustQC is post-alignment QC and requires MAPPING in the workflow, skipping preprocess QC"
+                        )
+                        continue
                     subname = toolenv + "_raw.smk"

                 smkf = os.path.abspath(os.path.join(workflowpath, subname))
@@ -690,6 +696,12 @@ def make_pre(
                 subname = toolenv + ".smk"

                 if subwork == "QC":
+                    if toolenv == "rustqc":
+                        log.warning(
+                            logid
+                            + "RustQC is post-alignment QC and requires MAPPING in the workflow, skipping preprocess QC"
+                        )
+                        continue
                     subname = toolenv + "_raw.smk"

             # Add rulethemall based on chosen workflows
@@ -813,6 +825,9 @@ def make_sub(
             works = worklist[i].split("-")
             envs = envlist[i].split("-")
             subjobs = list()
+            use_rustqc_multiqc = any(
+                works[j] == "QC" and envs[j] == "rustqc" for j in range(len(works))
+            )

             # Add variable for combination string
             subjobs.append(
@@ -869,10 +884,22 @@ def make_sub(
                     subconf.update(sconf)
                     log.debug(logid + f"SCONF:{sconf}, SUBCONF:{subconf}")

+                    # RustQC is post-alignment QC only; skip if no MAPPING
+                    if (
+                        works[j] == "QC"
+                        and toolenv == "rustqc"
+                        and "MAPPING" not in works
+                    ):
+                        log.warning(
+                            logid
+                            + "RustQC requires mapped BAM files, skipping QC step as MAPPING is not in the workflow"
+                        )
+                        continue
                     if (
                         works[j] == "QC"
                         and "TRIMMING" in works
                         and "MAPPING" not in works
+                        and toolenv != "rustqc"
                     ):
                         if "DEDUP" in works and "umitools" in envs:
                             subname = toolenv + "_dedup_trim.smk"
@@ -883,6 +910,7 @@ def make_sub(
                         works[j] == "QC"
                         and "TRIMMING" not in works
                         and "MAPPING" not in works
+                        and toolenv != "rustqc"
                     ):
                         if "DEDUP" in subworkflows and "umitools" in envs:
                             subname = toolenv + "_dedup.smk"
@@ -929,7 +957,13 @@ def make_sub(
                         subjobs.append(line)
                     subjobs.append("\n\n")
             if
"QC" in subworkflows: - smkf = os.path.abspath(os.path.join(workflowpath, "multiqc.smk")) + # Use rustqc-specific multiqc whenever rustqc is part of this combo + qc_mqc = ( + "multiqc_rustqc.smk" + if use_rustqc_multiqc + else "multiqc.smk" + ) + smkf = os.path.abspath(os.path.join(workflowpath, qc_mqc)) with open(smkf, "r") as smk: for line in mu.comment_remover(smk.readlines()): line = re.sub(condapath, 'conda: "' + envpath, line) @@ -1032,10 +1066,22 @@ def make_sub( sconf[subwork + "BIN"] = toolbin subconf.update(sconf) + # RustQC is post-alignment QC only; skip if no MAPPING + if ( + subwork == "QC" + and toolenv == "rustqc" + and "MAPPING" not in subworkflows + ): + log.warning( + logid + + "RustQC requires mapped BAM files, skipping QC step as MAPPING is not in the workflow" + ) + continue if ( subwork == "QC" and "TRIMMING" in subworkflows and "MAPPING" not in subworkflows + and toolenv != "rustqc" ): if "DEDUP" in subworkflows and "picard" not in any( [x for x in listoftools] @@ -1047,7 +1093,7 @@ def make_sub( # Picard tools can be extended here if subwork == "DEDUP" and toolenv == "picard": subname = toolenv + "_dedup.smk" - subconf.poop("PREDEDUP", None) + subconf.pop("PREDEDUP", None) elif works[j] == "DEDUP" and toolenv == "umitools": subconf["PREDEDUP"] = "enabled" # Add rulethemall based on chosen workflows @@ -1092,7 +1138,15 @@ def make_sub( subjobs.append(line) subjobs.append("\n\n") if "QC" in subworkflows: - smkf = os.path.abspath(os.path.join(workflowpath, "multiqc.smk")) + # Use rustqc-specific multiqc whenever rustqc is part of this combo + _qc_tools, _ = create_subworkflow(config, "QC", [condition]) + qc_mqc = ( + "multiqc_rustqc.smk" + if _qc_tools is not None + and any(str(t[0]) == "rustqc" for t in _qc_tools) + else "multiqc.smk" + ) + smkf = os.path.abspath(os.path.join(workflowpath, qc_mqc)) with open(smkf, "r") as smk: for line in mu.comment_remover(smk.readlines()): line = re.sub(condapath, 'conda: "' + envpath, line) @@ -1159,197 +1213,20 @@ def make_post( if combinations: combname = mp.get_combo_name(combinations) - for condition in combname: - envlist = list(combname[condition].get("envs")) - log.debug(logid + f"POSTLISTS:{condition}, {postworkflow}, {envlist}") - - subconf = mu.NestedDefaultDict() - add = list() - - smkf = os.path.abspath(os.path.join(workflowpath, "header.smk")) - with open(smkf, "r") as smk: - for line in mu.comment_remover(smk.readlines()): - line = re.sub(logfix, "loglevel='" + loglevel + "'", line) - line = re.sub(condapath, 'conda: "' + envpath, line) - if "include: " in line: - line = fixinclude( - line, loglevel, condapath, envpath, workflowpath, logfix - ) - add.append(line) - - for i in range(len(envlist)): - envs = envlist[i].split("-") - flowlist = list() - listoftools, listofconfigs = create_subworkflow( - config, postworkflow, [condition], stage="POST" - ) - - if listoftools is None: - log.warning( - logid - + "No entry fits condition " - + str(condition) - + " for processing step " - + str(postworkflow) - ) - continue - - sconf = listofconfigs[0] - sconf.pop("PREDEDUP", None) # cleanup - - for c in range(1, len(listofconfigs)): - sconf = mu.merge_dicts(sconf, listofconfigs[c]) - - for a in range(0, len(listoftools)): - subjobs = list() - toolenv, toolbin = map(str, listoftools[a]) - - log.debug(logid + "toolenv: " + str(toolenv)) - if postworkflow == "CIRCS": - if toolenv == "ciri2" and "bwa" not in envs: - log.warning( - "CIRI2 needs BWA mapped files, will skip input produced otherwise" - ) - continue - - tc = 
list(condition) - tc.append(toolenv) - sconf[postworkflow].update(mu.subset_dict(config[postworkflow], tc)) - - if sconf[postworkflow].get("TOOLS"): - sconf[postworkflow]["TOOLS"] = mu.sub_dict( - sconf[postworkflow]["TOOLS"], [toolenv] - ) - - if postworkflow in ["DE", "DEU", "DAS", "DTU"] and toolbin not in [ - "deseq", - "diego", - ]: # for all other postprocessing tools we have more than one defined subworkflow - toolenv = toolenv + "_" + postworkflow - - sconf[postworkflow + "ENV"] = toolenv - sconf[postworkflow + "BIN"] = toolbin - - log.debug( - logid - + "POSTPROCESS: " - + str(postworkflow) - + " CONDITION: " - + str(condition) - + " TOOL: " - + str(toolenv) - ) - - scombo = str(envlist[i]) if envlist[i] != "" else "" - combo = ( - str.join(os.sep, [str(envlist[i]), toolenv]) - if envlist[i] != "" - else toolenv - ) - - # Add variable for combination string - subjobs.append( - "\ncombo = '" - + combo - + "'\n" - + "\nscombo = '" - + scombo - + "'\n" - + '\nwildcard_constraints:\n combo = combo,\n scombo = scombo,\n read = "R1|R2",\n type = "sorted|sorted_unique" if not rundedup else "sorted|sorted_unique|sorted_dedup|sorted_unique_dedup"' - ) - subjobs.append("\n\n") - subconf.update(sconf) - - subname = toolenv + ".smk" - smkf = os.path.abspath(os.path.join(workflowpath, subname)) - - if ( - toolbin in ["salmon", "kallisto"] - and "TRIMMING" not in config["WORKFLOWS"] - ): - log.debug(logid + "Simulated read trimming only!") - mu.makeoutdir("TRIMMED_FASTQ") - smkf = ( - os.path.abspath(os.path.join(workflowpath, toolenv)) - + "_trim.smk" - ) - with open(smkf, "r") as smk: - for line in mu.comment_remover(smk.readlines()): - line = re.sub(logfix, "loglevel='" + loglevel + "'", line) - line = re.sub(condapath, 'conda: "' + envpath, line) - if "include: " in line: - line = fixinclude( - line, - loglevel, - condapath, - envpath, - workflowpath, - logfix, - ) - subjobs.append(line) - subjobs.append("\n\n") - - # Append footer and write out subsnake and subconf per condition - smkf = os.path.abspath(os.path.join(workflowpath, "footer.smk")) - with open(smkf, "r") as smk: - for line in mu.comment_remover(smk.readlines()): - line = re.sub(condapath, 'conda: "../', line) - if "include: " in line: - line = fixinclude( - line, - loglevel, - condapath, - envpath, - workflowpath, - logfix, - ) - subjobs.append(line) - subjobs.append("\n\n") - - te = ( - toolenv.split("_")[0] if "_" in toolenv else toolenv - ) # shorten toolenv if subwork is already added - smko = os.path.abspath( - os.path.join( - subdir, - "_".join( - [ - "_".join(condition), - envlist[i], - postworkflow, - te, - "subsnake.smk", - ] - ), - ) - ) - - write_if_different(smko, "".join(add) + "".join(subjobs)) - - confo = os.path.abspath( - os.path.join( - subdir, - "_".join( - [ - "_".join(condition), - envlist[i], - postworkflow, - te, - "subconfig.json", - ] - ), - ) - ) - - dump_if_different(confo, subconf) - - jobs.append([smko, confo]) + # Postprocessing runs across all conditions - collect all conditions at once + all_conditions = list(combname.keys()) + # Use the envlist from the first condition; all conditions are expected to share + # the same upstream tool chain (e.g. 
same aligner) for cross-condition analysis + first_condition = all_conditions[0] + envlist = list(combname[first_condition].get("envs")) + log.debug( + logid + + f"POSTLISTS (all conditions):{all_conditions}, {postworkflow}, {envlist}" + ) - else: - subwork = postworkflow - add = list() subconf = mu.NestedDefaultDict() - toollist = list() + add = list() + smkf = os.path.abspath(os.path.join(workflowpath, "header.smk")) with open(smkf, "r") as smk: for line in mu.comment_remover(smk.readlines()): @@ -1361,45 +1238,81 @@ def make_post( ) add.append(line) - for condition in conditions: - # subconf = mu.NestedDefaultDict() - + for i in range(len(envlist)): + envs = envlist[i].split("-") + # Collect configs from ALL conditions at once for cross-condition postprocessing listoftools, listofconfigs = create_subworkflow( - config, subwork, [condition], stage="POST" + config, postworkflow, all_conditions, stage="POST" ) if listoftools is None: log.warning( logid - + "No entry fits condition " - + str(condition) + + "No entry fits conditions " + + str(all_conditions) + " for processing step " - + str(subwork) + + str(postworkflow) ) continue sconf = listofconfigs[0] sconf.pop("PREDEDUP", None) # cleanup - subconf.update(sconf) - toollist.extend(listoftools) - listoftools = [list(x) for x in set(tuple(x) for x in toollist)] + # Merge configs from all conditions + for c in range(1, len(listofconfigs)): + if listofconfigs[c] is not None: + sconf = mu.merge_dicts(sconf, listofconfigs[c]) + for a in range(0, len(listoftools)): subjobs = list() - toolenv, toolbin = map(str, listoftools[a]) - if subwork in ["DE", "DEU", "DAS", "DTU"] and toolbin not in [ + + log.debug(logid + "toolenv: " + str(toolenv)) + if postworkflow == "CIRCS": + if toolenv == "ciri2" and "bwa" not in envs: + log.warning( + "CIRI2 needs BWA mapped files, will skip input produced otherwise" + ) + continue + + # Deep-merge postworkflow config from all conditions (preserves all condition paths) + for cond in all_conditions: + tc = list(cond) + tc.append(toolenv) + sconf[postworkflow] = mu.merge_dicts( + sconf[postworkflow], mu.subset_dict(config[postworkflow], tc) + ) + + if sconf[postworkflow].get("TOOLS"): + sconf[postworkflow]["TOOLS"] = mu.sub_dict( + sconf[postworkflow]["TOOLS"], [toolenv] + ) + + if postworkflow in ["DE", "DEU", "DAS", "DTU"] and toolbin not in [ "deseq", "diego", ]: # for all other postprocessing tools we have more than one defined subworkflow - toolenv = toolenv + "_" + subwork - log.debug(logid + "toolenv: " + str(toolenv)) + toolenv = toolenv + "_" + postworkflow - subconf[subwork + "ENV"] = toolenv - subconf[subwork + "BIN"] = toolbin + sconf[postworkflow + "ENV"] = toolenv + sconf[postworkflow + "BIN"] = toolbin - scombo = "" - combo = toolenv + log.debug( + logid + + "POSTPROCESS: " + + str(postworkflow) + + " ALL CONDITIONS: " + + str(all_conditions) + + " TOOL: " + + str(toolenv) + ) + + scombo = str(envlist[i]) if envlist[i] != "" else "" + combo = ( + str.join(os.sep, [str(envlist[i]), toolenv]) + if envlist[i] != "" + else toolenv + ) # Add variable for combination string subjobs.append( @@ -1412,7 +1325,7 @@ def make_post( + '\nwildcard_constraints:\n combo = combo,\n scombo = scombo,\n read = "R1|R2",\n type = "sorted|sorted_unique" if not rundedup else "sorted|sorted_unique|sorted_dedup|sorted_unique_dedup"' ) subjobs.append("\n\n") - # subconf.update(sconf) + subconf.update(sconf) subname = toolenv + ".smk" smkf = os.path.abspath(os.path.join(workflowpath, subname)) @@ -1424,26 +1337,38 @@ def 
make_post( log.debug(logid + "Simulated read trimming only!") mu.makeoutdir("TRIMMED_FASTQ") smkf = ( - os.path.abspath(os.path.join(workflowpath, toolenv)) + "_trim.smk" + os.path.abspath(os.path.join(workflowpath, toolenv)) + + "_trim.smk" ) with open(smkf, "r") as smk: for line in mu.comment_remover(smk.readlines()): - line = re.sub(condapath, 'conda: "' + envpath, line) + line = re.sub(logfix, "loglevel='" + loglevel + "'", line) + line = re.sub(condapath, 'conda: "' + envpath, line) if "include: " in line: line = fixinclude( - line, loglevel, condapath, envpath, workflowpath, logfix + line, + loglevel, + condapath, + envpath, + workflowpath, + logfix, ) subjobs.append(line) - subjobs.append("\n\n") + subjobs.append("\n\n") - # Append footer and write out subsnake and subconf per condition + # Append footer and write out subsnake and subconf for all conditions smkf = os.path.abspath(os.path.join(workflowpath, "footer.smk")) with open(smkf, "r") as smk: for line in mu.comment_remover(smk.readlines()): - line = re.sub(condapath, 'conda: "' + envpath, line) + line = re.sub(condapath, 'conda: "../', line) if "include: " in line: line = fixinclude( - line, loglevel, condapath, envpath, workflowpath, logfix + line, + loglevel, + condapath, + envpath, + workflowpath, + logfix, ) subjobs.append(line) subjobs.append("\n\n") @@ -1454,7 +1379,15 @@ def make_post( smko = os.path.abspath( os.path.join( subdir, - "_".join(["_".join(condition), subwork, te, "subsnake.smk"]), + "_".join( + [ + "allconditions", + envlist[i], + postworkflow, + te, + "subsnake.smk", + ] + ), ) ) @@ -1463,7 +1396,15 @@ def make_post( confo = os.path.abspath( os.path.join( subdir, - "_".join(["_".join(condition), subwork, te, "subconfig.json"]), + "_".join( + [ + "allconditions", + envlist[i], + postworkflow, + te, + "subconfig.json", + ] + ), ) ) @@ -1471,62 +1412,187 @@ def make_post( jobs.append([smko, confo]) - return jobs + else: + subwork = postworkflow + add = list() + subconf = mu.NestedDefaultDict() + smkf = os.path.abspath(os.path.join(workflowpath, "header.smk")) + with open(smkf, "r") as smk: + for line in mu.comment_remover(smk.readlines()): + line = re.sub(logfix, "loglevel='" + loglevel + "'", line) + line = re.sub(condapath, 'conda: "' + envpath, line) + if "include: " in line: + line = fixinclude( + line, loglevel, condapath, envpath, workflowpath, logfix + ) + add.append(line) + # Postprocessing runs across all conditions - collect configs from all conditions at once + listoftools, listofconfigs = create_subworkflow( + config, subwork, conditions, stage="POST" + ) -@check_run -def make_summary(config, subdir, loglevel, combinations=None): - logid = scriptname + ".Workflows_make_summary: " + if listoftools is None: + log.warning( + logid + + "No entry fits any condition for processing step " + + str(subwork) + ) + return jobs - output = "REPORTS/SUMMARY/summary.Rmd" - jobs = list() - lines = list() - condapath = re.compile(r'conda:\s+"') - logfix = re.compile(r'loglevel="INFO"') + sconf = listofconfigs[0] + sconf.pop("PREDEDUP", None) # cleanup - envlist = list() - if combinations: - combname = mp.get_combo_name(combinations) - for condition in combname: - envlist.extend(combname[condition]["envs"]) - envlist = list(set(envlist)) - # Add Header - sum_path = os.path.join(installpath, "MONSDA", "scripts", "Analysis", "SUMMARY") - rmd_header = os.path.abspath(os.path.join(sum_path, "header_summary.Rmd")) + # Merge configs from all conditions + for c in range(1, len(listofconfigs)): + if listofconfigs[c] is 
not None: + sconf = mu.merge_dicts(sconf, listofconfigs[c]) - with open(rmd_header, "r") as read_file: - for line in read_file.readlines(): - lines.append(line) - lines.append("\n\n") + subconf.update(sconf) + toollist = [list(x) for x in set(tuple(x) for x in listoftools)] - # Add rMarkdown snippets - if len(envlist) == 0: - envlist.append("") - for scombo in envlist: - scombo = str(scombo) + os.sep - snippath = str.join(os.sep, ["REPORTS", "SUMMARY", "RmdSnippets", scombo + "*"]) - snippets = [x for x in glob.glob(snippath) if os.path.isfile(x)] + for a in range(0, len(toollist)): + subjobs = list() - log.debug(f"{logid} snippets found: {snippets} for path: {snippath}") - for snippet in snippets: - with open(snippet, "r") as read_file: - for line in read_file.readlines(): - if line.startswith("# "): - lines = [[l] for l in "@$@".join(lines).split(line)] - lines[0].append(line) - lines = "".join(sum(lines, [])).split("@$@") + toolenv, toolbin = map(str, toollist[a]) + if subwork in ["DE", "DEU", "DAS", "DTU"] and toolbin not in [ + "deseq", + "diego", + ]: # for all other postprocessing tools we have more than one defined subworkflow + toolenv = toolenv + "_" + subwork + log.debug(logid + "toolenv: " + str(toolenv)) - if os.path.exists(output): - os.rename(output, output + ".bak") - with open(output, "a") as writefile: - for line in lines: - writefile.write(line) - writefile.write("\n\n") + subconf[subwork + "ENV"] = toolenv + subconf[subwork + "BIN"] = toolbin - subjobs = list() + scombo = "" + combo = toolenv - smkf = os.path.abspath(os.path.join(workflowpath, "header.smk")) - with open(smkf, "r") as smk: + # Add variable for combination string + subjobs.append( + "\ncombo = '" + + combo + + "'\n" + + "\nscombo = '" + + scombo + + "'\n" + + '\nwildcard_constraints:\n combo = combo,\n scombo = scombo,\n read = "R1|R2",\n type = "sorted|sorted_unique" if not rundedup else "sorted|sorted_unique|sorted_dedup|sorted_unique_dedup"' + ) + subjobs.append("\n\n") + + subname = toolenv + ".smk" + smkf = os.path.abspath(os.path.join(workflowpath, subname)) + + if ( + toolbin in ["salmon", "kallisto"] + and "TRIMMING" not in config["WORKFLOWS"] + ): + log.debug(logid + "Simulated read trimming only!") + mu.makeoutdir("TRIMMED_FASTQ") + smkf = ( + os.path.abspath(os.path.join(workflowpath, toolenv)) + "_trim.smk" + ) + with open(smkf, "r") as smk: + for line in mu.comment_remover(smk.readlines()): + line = re.sub(condapath, 'conda: "' + envpath, line) + if "include: " in line: + line = fixinclude( + line, loglevel, condapath, envpath, workflowpath, logfix + ) + subjobs.append(line) + subjobs.append("\n\n") + + # Append footer and write out subsnake and subconf for all conditions + smkf = os.path.abspath(os.path.join(workflowpath, "footer.smk")) + with open(smkf, "r") as smk: + for line in mu.comment_remover(smk.readlines()): + line = re.sub(condapath, 'conda: "' + envpath, line) + if "include: " in line: + line = fixinclude( + line, loglevel, condapath, envpath, workflowpath, logfix + ) + subjobs.append(line) + subjobs.append("\n\n") + + te = ( + toolenv.split("_")[0] if "_" in toolenv else toolenv + ) # shorten toolenv if subwork is already added + smko = os.path.abspath( + os.path.join( + subdir, + "_".join(["allconditions", subwork, te, "subsnake.smk"]), + ) + ) + + write_if_different(smko, "".join(add) + "".join(subjobs)) + + confo = os.path.abspath( + os.path.join( + subdir, + "_".join(["allconditions", subwork, te, "subconfig.json"]), + ) + ) + + dump_if_different(confo, subconf) + + 
jobs.append([smko, confo]) + + return jobs + + +@check_run +def make_summary(config, subdir, loglevel, combinations=None): + logid = scriptname + ".Workflows_make_summary: " + + output = "REPORTS/SUMMARY/summary.Rmd" + jobs = list() + lines = list() + condapath = re.compile(r'conda:\s+"') + logfix = re.compile(r'loglevel="INFO"') + + envlist = list() + if combinations: + combname = mp.get_combo_name(combinations) + for condition in combname: + envlist.extend(combname[condition]["envs"]) + envlist = list(set(envlist)) + # Add Header + sum_path = os.path.join(installpath, "MONSDA", "scripts", "Analysis", "SUMMARY") + rmd_header = os.path.abspath(os.path.join(sum_path, "header_summary.Rmd")) + + with open(rmd_header, "r") as read_file: + for line in read_file.readlines(): + lines.append(line) + lines.append("\n\n") + + # Add rMarkdown snippets + if len(envlist) == 0: + envlist.append("") + for scombo in envlist: + scombo = str(scombo) + os.sep + snippath = str.join(os.sep, ["REPORTS", "SUMMARY", "RmdSnippets", scombo + "*"]) + snippets = [x for x in glob.glob(snippath) if os.path.isfile(x)] + + log.debug(f"{logid} snippets found: {snippets} for path: {snippath}") + for snippet in snippets: + with open(snippet, "r") as read_file: + for line in read_file.readlines(): + if line.startswith("# "): + lines = [[l] for l in "@$@".join(lines).split(line)] + lines[0].append(line) + lines = "".join(sum(lines, [])).split("@$@") + + if os.path.exists(output): + os.rename(output, output + ".bak") + with open(output, "a") as writefile: + for line in lines: + writefile.write(line) + writefile.write("\n\n") + + subjobs = list() + + smkf = os.path.abspath(os.path.join(workflowpath, "header.smk")) + with open(smkf, "r") as smk: for line in mu.comment_remover(smk.readlines()): subjobs.append(line) subjobs.append("\n\n") @@ -1731,7 +1797,11 @@ def nf_fetch_params( log.error(logid + "No samples found, please check config file") sys.exit(logid + "ERROR: No samples found, please check config file") - SETUP = mu.keysets_from_dict(config["SETTINGS"], "SAMPLES")[0] + SETUP = ( + tuple(condition) + if condition + else mu.keysets_from_dict(config["SETTINGS"], "SAMPLES")[0] + ) SETS = os.sep.join(SETUP) SETTINGS = mu.sub_dict(config["SETTINGS"], SETUP) @@ -1800,11 +1870,14 @@ def nf_fetch_params( MAPCONF = mu.sub_dict(config["MAPPING"], SETUP) MAPPERBIN, MAPPERENV = mp.env_bin_from_config(config, "MAPPING") MAPPERENV = MAPPERENV.split("_")[0] - MAPOPT = MAPCONF.get(MAPPERENV).get("OPTIONS") + # Subconfigs (from nf_make_sub) have the toolenv key stripped out; + # fall back to MAPCONF itself when the toolenv key is absent. 
+ mapper_dict = MAPCONF.get(MAPPERENV) or MAPCONF + MAPOPT = mapper_dict.get("OPTIONS", {}) log.debug(logid + "MAPPINGCONFIG: " + str(SETUP) + "\t" + str(MAPCONF)) - REF = MAPCONF[MAPPERENV].get("REFERENCE", MAPCONF.get("REFERENCE")) - MANNO = MAPCONF[MAPPERENV].get("ANNOTATION", MAPCONF.get("ANNOTATION")) - MDECOY = MAPCONF[MAPPERENV].get("DECOY", MAPCONF.get("DECOY")) + REF = mapper_dict.get("REFERENCE", MAPCONF.get("REFERENCE")) + MANNO = mapper_dict.get("ANNOTATION", MAPCONF.get("ANNOTATION")) + MDECOY = mapper_dict.get("DECOY", MAPCONF.get("DECOY")) if REF: REFERENCE = REF REFDIR = str(os.path.dirname(REFERENCE)) @@ -1830,13 +1903,13 @@ def nf_fetch_params( PREFIX = PRE if not PREFIX or PREFIX is None: PREFIX = MAPPERENV - IDX = MAPCONF.get("INDEX", MAPCONF[MAPPERENV].get("INDEX")) + IDX = MAPCONF.get("INDEX", mapper_dict.get("INDEX")) if IDX: INDEX = IDX if not INDEX: INDEX = str.join(os.sep, [REFDIR, "INDICES", MAPPERENV]) + ".idx" keydict = mu.sub_dict( - mp.tool_params(SAMPLES[0], None, config, "MAPPING", MAPPERENV)["OPTIONS"], + MAPOPT, ["INDEX"], ) keydict["REF"] = REFERENCE @@ -1854,7 +1927,7 @@ def nf_fetch_params( INDEX2 = None UIDX2 = None UIDX2NAME = None - IDX2 = MAPCONF.get("INDEX2", MAPCONF[MAPPERENV].get("INDEX2")) + IDX2 = MAPCONF.get("INDEX2", mapper_dict.get("INDEX2")) if IDX2: INDEX2 = IDX2 UIDX2 = f"{REFDIR}/INDICES/{MAPPERENV}_{unikey}_bs" @@ -2412,6 +2485,12 @@ def nf_make_pre( ) if works[j] == "QC": + if toolenv == "rustqc": + log.warning( + logid + + "RustQC is post-alignment QC and requires MAPPING in the workflow, skipping preprocess QC" + ) + continue subname = toolenv + "_raw.nf" flowlist.append("QC_RAW") flowlist.append("MULTIQC") @@ -2597,6 +2676,12 @@ def nf_make_pre( ) if subwork == "QC": + if toolenv == "rustqc": + log.warning( + logid + + "RustQC is post-alignment QC and requires MAPPING in the workflow, skipping preprocess QC" + ) + continue subname = toolenv + "_raw.nf" flowlist.append("QC_RAW") elif subwork == "FETCH": @@ -2809,7 +2894,17 @@ def nf_make_sub( ) if works[j] == "QC": - if "TRIMMING" in works: + if toolenv == "rustqc": + # RustQC is post-alignment QC only + if "MAPPING" in works: + flowlist.append("QC_MAPPING") + else: + log.warning( + logid + + "RustQC requires mapped BAM files, skipping QC step as MAPPING is not in the workflow" + ) + continue + elif "TRIMMING" in works: flowlist.append("QC_RAW") flowlist.append("TRIMMING") flowlist.append("QC_TRIMMING") @@ -3021,7 +3116,12 @@ def nf_make_sub( + "(POSTMAPPING.out.postmap.concat(POSTMAPPING.out.postmapuni))\n" ) elif w == "MULTIQC": - if "DEDUPBAM" in flowlist and "QC_TRIMMING" in flowlist: + if "QC_RAW" not in flowlist and "QC_MAPPING" in flowlist: + # RustQC: only BAM-level QC, no FASTQ QC + subjobs.append( + " " * 4 + w + "(QC_MAPPING.out.qc.collect())\n" + ) + elif "DEDUPBAM" in flowlist and "QC_TRIMMING" in flowlist: subjobs.append( " " * 4 + w @@ -3147,7 +3247,17 @@ def nf_make_sub( ) if subwork == "QC": - if "TRIMMING" in subworkflows: + if toolenv == "rustqc": + # RustQC is post-alignment QC only + if "MAPPING" in subworkflows: + flowlist.append("QC_MAPPING") + else: + log.warning( + logid + + "RustQC requires mapped BAM files, skipping QC step as MAPPING is not in the workflow" + ) + continue + elif "TRIMMING" in subworkflows: if "DEDUP" in subworkflows: flowlist.append("QC_RAW") flowlist.append("DEDUP_TRIM") @@ -3322,7 +3432,12 @@ def nf_make_sub( + "(POSTMAPPING.out.postmap.concat(POSTMAPPING.out.postmapuni))\n" ) elif w == "MULTIQC": - if "DEDUPBAM" in flowlist and 
"QC_TRIMMING" in flowlist: + if "QC_RAW" not in flowlist and "QC_MAPPING" in flowlist: + # RustQC: only BAM-level QC, no FASTQ QC + subjobs.append( + " " * 4 + w + "(QC_MAPPING.out.qc.collect())\n" + ) + elif "DEDUPBAM" in flowlist and "QC_TRIMMING" in flowlist: subjobs.append( " " * 4 + w @@ -3416,345 +3531,16 @@ def nf_make_post( log.debug(f"{logid} COMBINATIONS: {combname}") subwork = postworkflow - if subwork in ["DE", "DEU", "DAS", "DTU"]: - condition = list(combname.keys())[0] - envlist = combname[condition].get("envs") - subconf = mu.NestedDefaultDict() - add = list() - - nfi = os.path.abspath(os.path.join(workflowpath, "header.nf")) - with open(nfi, "r") as nf: - for line in mu.comment_remover(nf.readlines()): - line = re.sub(logfix, "loglevel='" + loglevel + "'", line) - line = re.sub(condapath, 'conda "' + envpath, line) - if "include {" in line: - line = fixinclude( - line, - loglevel, - condapath, - envpath, - workflowpath, - logfix, - "nfmode", - ) - add.append(line) - add.append("\n\n") - - for i in range(len(envlist)): - flowlist = list() - listoftools, listofconfigs = create_subworkflow( - config, subwork, combname, stage="POST" - ) - - if listoftools is None: - log.error( - logid + "No entry in config fits processing step" + str(subwork) - ) - - sconf = listofconfigs[0] - sconf.pop("PREDEDUP", None) # cleanup - - for c in range(1, len(listofconfigs)): - sconf = mu.merge_dicts(sconf, listofconfigs[c]) - flowlist.append(subwork) - - for a in range(0, len(listoftools)): - tp = list() - subjobs = list() - toolenv, toolbin = map(str, listoftools[a]) - for cond in combname.keys(): - tc = list(cond) - tc.append(toolenv) - sconf[subwork] = mu.merge_dicts( - sconf[subwork], mu.subset_dict(config[subwork], tc) - ) - - if sconf[subwork].get("TOOLS"): - sconf[subwork]["TOOLS"] = mu.sub_dict( - sconf[subwork]["TOOLS"], [toolenv] - ) - - toolenv = toolenv + "_" + subwork - sconf[subwork + "ENV"] = toolenv - sconf[subwork + "BIN"] = toolbin - subsamples = mp.get_samples(sconf) - - log.debug( - logid - + "POSTPROCESS: " - + str(subwork) - + " TOOL: " - + str(toolenv) - ) - - scombo = str(envlist[i]) if envlist[i] != "" else "" - combo = ( - str.join(os.sep, [str(envlist[i]), toolenv]) - if envlist[i] != "" - else toolenv - ) - - subconf.update(sconf) - - subname = toolenv + ".nf" - nfi = os.path.abspath(os.path.join(workflowpath, subname)) - with open(nfi, "r") as nf: - for line in mu.comment_remover(nf.readlines()): - line = re.sub(condapath, 'conda "' + envpath, line) - if "include {" in line: - line = fixinclude( - line, - loglevel, - condapath, - envpath, - workflowpath, - logfix, - "nfmode", - ) - subjobs.append(line) - subjobs.append("\n\n") - - tp.append( - nf_tool_params( - subsamples[0], - None, - sconf, - subwork, - toolenv, - toolbin, - None, - condition, - ) - ) - - subjobs.append("\n\n" + "workflow {\n") - for w in flowlist: - subjobs.append(" " * 4 + w + "(dummy)\n") - subjobs.append("}\n\n") - - te = toolenv.split("_")[0] if "_" in toolenv else toolenv - nfo = os.path.abspath( - os.path.join( - subdir, - "_".join( - [ - "_".join(condition), - envlist[i], - subwork, - te, - "subflow.nf", - ] - ), - ) - ) - if writeout: - write_if_different(nfo, "".join(add) + "".join(subjobs)) - - confo = os.path.abspath( - os.path.join( - subdir, - "_".join( - [ - "_".join(condition), - envlist[i], - subwork, - te, - "subconfig.json", - ] - ), - ) - ) - if writeout: - dump_if_different(confo, subconf) - - tpl = " ".join(tp) - combi = list((str(envlist[i]), toolenv)) - para = 
nf_fetch_params(confo, condition, combi) - jobs.append([nfo, confo, tpl, para]) - else: - for condition in combname: - envlist = list(combname[condition].get("envs")) - log.debug(logid + f"POSTLISTS:{condition}, {subwork}, {envlist}") - - subconf = mu.NestedDefaultDict() - add = list() - - nfi = os.path.abspath(os.path.join(workflowpath, "header.nf")) - with open(nfi, "r") as nf: - for line in mu.comment_remover(nf.readlines()): - line = re.sub(logfix, "loglevel='" + loglevel + "'", line) - line = re.sub(condapath, 'conda "' + envpath, line) - if "include {" in line: - line = fixinclude( - line, - loglevel, - condapath, - envpath, - workflowpath, - logfix, - "nfmode", - ) - add.append(line) - add.append("\n\n") - - for i in range(len(envlist)): - envs = envlist[i].split("-") - flowlist = list() - listoftools, listofconfigs = create_subworkflow( - config, subwork, [condition], stage="POST" - ) - - if listoftools is None: - log.error( - logid - + "No entry in config fits processing step" - + str(subwork) - ) - - sconf = listofconfigs[0] - sconf.pop("PREDEDUP", None) # cleanup - - for c in range(1, len(listofconfigs)): - sconf = mu.merge_dicts(sconf, listofconfigs[c]) - flowlist.append(subwork) - - for a in range(0, len(listoftools)): - tp = list() - subjobs = list() - toolenv, toolbin = map(str, listoftools[a]) - - if subwork == "CIRCS": - if toolenv == "ciri2" and "bwa" not in envs: - log.warning( - "CIRI2 needs BWA mapped files, will skip input produced otherwise" - ) - continue - - tc = list(condition) - tc.append(toolenv) - sconf[subwork] = mu.merge_dicts( - sconf[subwork], mu.subset_dict(config[subwork], tc) - ) - - if sconf[subwork].get("TOOLS"): - sconf[subwork]["TOOLS"] = mu.sub_dict( - sconf[subwork]["TOOLS"], [toolenv] - ) - - subsamples = mp.get_samples(sconf) - sconf[subwork + "ENV"] = toolenv + "_" + subwork - sconf[subwork + "BIN"] = toolbin - - log.debug( - logid - + "POSTPROCESS: " - + str(subwork) - + " CONDITION: " - + str(condition) - + " TOOL: " - + str(toolenv) - ) - - scombo = str(envlist[i]) if envlist[i] != "" else "" - combo = ( - str.join(os.sep, [str(envlist[i]), toolenv]) - if envlist[i] != "" - else toolenv - ) - - subconf.update(sconf) - - subname = toolenv + ".nf" - nfi = os.path.abspath(os.path.join(workflowpath, subname)) - with open(nfi, "r") as nf: - for line in mu.comment_remover(nf.readlines()): - line = re.sub(condapath, 'conda "' + envpath, line) - if "include {" in line: - line = fixinclude( - line, - loglevel, - condapath, - envpath, - workflowpath, - logfix, - "nfmode", - ) - subjobs.append(line) - subjobs.append("\n\n") - - tp.append( - nf_tool_params( - subsamples[0], - None, - sconf, - subwork, - toolenv, - toolbin, - None, - condition, - ) - ) - - subjobs.append("\n\n" + "workflow {\n") - for w in flowlist: - subjobs.append(" " * 4 + w + "(dummy)\n") - subjobs.append("}\n\n") - - te = toolenv.split("_")[0] if "_" in toolenv else toolenv - nfo = os.path.abspath( - os.path.join( - subdir, - "_".join( - [ - "_".join(condition), - envlist[i], - subwork, - te, - "subflow.nf", - ] - ), - ) - ) - if writeout: - write_if_different(nfo, "".join(add) + "".join(subjobs)) - - confo = os.path.abspath( - os.path.join( - subdir, - "_".join( - [ - "_".join(condition), - envlist[i], - subwork, - te, - "subconfig.json", - ] - ), - ) - ) - if writeout: - dump_if_different(confo, subconf) - - tpl = " ".join(tp) - combi = list((str(envlist[i]), toolenv)) - para = nf_fetch_params(confo, condition, combi) - """ - NOTE: Workaround for multi-feature featurecount, we can 
not run for loops for feature lists in nextflow so we need to rerun the jobs for single features and featuremaps (feature->id). This could break reproducibility for manual runs, could be better to loop at generation of nfo and confo and add feature name to output files, but this is inconsistent with snakemake runs so we choose this as workaround - """ - log.debug(logid + f"PARAMS: {para}") - if para.get("COUNTINGFEATLIST"): - fl = para.pop("COUNTINGFEATLIST").split(",") - il = para.pop("COUNTINGIDLIST").split(",") - for i in range(len(fl)): - para["COUNTINGFEAT"] = fl[i] - para["COUNTINGMAP"] = f"'-t {fl[i]} -g {il[i]}'" - log.debug(logid + f"NEWPARAMS: {para}") - jobs.append([nfo, confo, tpl, para]) - else: - jobs.append([nfo, confo, tpl, para]) - else: - subwork = postworkflow + # Postprocessing runs across all conditions regardless of workflow type + all_conditions = list(combname.keys()) + first_condition = all_conditions[0] + envlist = combname[first_condition].get("envs") + log.debug( + logid + f"POSTLISTS (all conditions):{all_conditions}, {subwork}, {envlist}" + ) + subconf = mu.NestedDefaultDict() add = list() + nfi = os.path.abspath(os.path.join(workflowpath, "header.nf")) with open(nfi, "r") as nf: for line in mu.comment_remover(nf.readlines()): @@ -3773,45 +3559,80 @@ def nf_make_post( add.append(line) add.append("\n\n") - for condition in conditions: + for i in range(len(envlist)): + envs = envlist[i].split("-") flowlist = list() - subjobs = list() - subconf = mu.NestedDefaultDict() - + # Collect configs from ALL conditions at once listoftools, listofconfigs = create_subworkflow( - config, subwork, [condition], stage="POST" + config, subwork, all_conditions, stage="POST" ) if listoftools is None: log.error( logid + "No entry in config fits processing step" + str(subwork) ) + continue sconf = listofconfigs[0] sconf.pop("PREDEDUP", None) # cleanup + + # Merge configs from all conditions + for c in range(1, len(listofconfigs)): + if listofconfigs[c] is not None: + sconf = mu.merge_dicts(sconf, listofconfigs[c]) flowlist.append(subwork) for a in range(0, len(listoftools)): tp = list() subjobs = list() - toolenv, toolbin = map(str, listoftools[a]) - if subwork in [ - "DE", - "DEU", - "DAS", - "DTU", - ]: # and toolbin not in ["deseq", "diego"]: # for all other postprocessing tools we have more than one defined subworkflow - toolenv = toolenv + "_" + subwork - log.debug(logid + "toolenv: " + str(toolenv)) + if subwork == "CIRCS": + if toolenv == "ciri2" and "bwa" not in envs: + log.warning( + "CIRI2 needs BWA mapped files, will skip input produced otherwise" + ) + continue + + # Merge subwork config from all conditions + for cond in all_conditions: + tc = list(cond) + tc.append(toolenv) + sconf[subwork] = mu.merge_dicts( + sconf[subwork], mu.subset_dict(config[subwork], tc) + ) + + if sconf[subwork].get("TOOLS"): + sconf[subwork]["TOOLS"] = mu.sub_dict( + sconf[subwork]["TOOLS"], [toolenv] + ) + + # Append subwork suffix for all applicable postprocess workflows + toolenv_orig = toolenv + toolenv = toolenv + "_" + subwork sconf[subwork + "ENV"] = toolenv sconf[subwork + "BIN"] = toolbin subsamples = mp.get_samples(sconf) - scombo = "" - combo = toolenv + log.debug( + logid + + "POSTPROCESS: " + + str(subwork) + + " ALL CONDITIONS: " + + str(all_conditions) + + " TOOL: " + + str(toolenv) + ) + + scombo = str(envlist[i]) if envlist[i] != "" else "" + combo = ( + str.join(os.sep, [str(envlist[i]), toolenv]) + if envlist[i] != "" + else toolenv + ) + subconf.update(sconf) + subname = 
toolenv + ".nf" nfi = os.path.abspath(os.path.join(workflowpath, subname)) with open(nfi, "r") as nf: @@ -3839,7 +3660,7 @@ def nf_make_post( toolenv, toolbin, None, - condition, + first_condition, ) ) @@ -3848,13 +3669,14 @@ def nf_make_post( subjobs.append(" " * 4 + w + "(dummy)\n") subjobs.append("}\n\n") - te = toolenv.split("_")[0] if "_" in toolenv else toolenv + te = toolenv_orig # use original (pre-suffix) name for filename nfo = os.path.abspath( os.path.join( subdir, "_".join( [ - "_".join(condition), + "allconditions", + envlist[i], subwork, te, "subflow.nf", @@ -3870,7 +3692,8 @@ def nf_make_post( subdir, "_".join( [ - "_".join(condition), + "allconditions", + envlist[i], subwork, te, "subconfig.json", @@ -3882,9 +3705,8 @@ def nf_make_post( dump_if_different(confo, subconf) tpl = " ".join(tp) - # combi = list((str(envlist[i]), toolenv)) - para = nf_fetch_params(confo, condition, None) - + combi = list((str(envlist[i]), toolenv)) + para = nf_fetch_params(confo, first_condition, combi) """ NOTE: Workaround for multi-feature featurecount, we can not run for loops for feature lists in nextflow so we need to rerun the jobs for single features and featuremaps (feature->id). This could break reproducibility for manual runs, could be better to loop at generation of nfo and confo and add feature name to output files, but this is inconsistent with snakemake runs so we choose this as workaround """ @@ -3899,6 +3721,151 @@ def nf_make_post( jobs.append([nfo, confo, tpl, para]) else: jobs.append([nfo, confo, tpl, para]) + else: + subwork = postworkflow + add = list() + nfi = os.path.abspath(os.path.join(workflowpath, "header.nf")) + with open(nfi, "r") as nf: + for line in mu.comment_remover(nf.readlines()): + line = re.sub(logfix, "loglevel='" + loglevel + "'", line) + line = re.sub(condapath, 'conda "' + envpath, line) + if "include {" in line: + line = fixinclude( + line, + loglevel, + condapath, + envpath, + workflowpath, + logfix, + "nfmode", + ) + add.append(line) + add.append("\n\n") + + # Postprocessing runs across all conditions - collect configs from all conditions at once + flowlist = list() + subjobs = list() + subconf = mu.NestedDefaultDict() + + listoftools, listofconfigs = create_subworkflow( + config, subwork, conditions, stage="POST" + ) + + if listoftools is None: + log.error(logid + "No entry in config fits processing step" + str(subwork)) + return jobs + + sconf = listofconfigs[0] + sconf.pop("PREDEDUP", None) # cleanup + + # Merge configs from all conditions + for c in range(1, len(listofconfigs)): + if listofconfigs[c] is not None: + sconf = mu.merge_dicts(sconf, listofconfigs[c]) + flowlist.append(subwork) + + for a in range(0, len(listoftools)): + tp = list() + subjobs = list() + + toolenv, toolbin = map(str, listoftools[a]) + # Append subwork suffix for all postprocessing workflows + toolenv = toolenv + "_" + subwork + + log.debug(logid + "toolenv: " + str(toolenv)) + sconf[subwork + "ENV"] = toolenv + sconf[subwork + "BIN"] = toolbin + subsamples = mp.get_samples(sconf) + + scombo = "" + combo = toolenv + subconf.update(sconf) + subname = toolenv + ".nf" + nfi = os.path.abspath(os.path.join(workflowpath, subname)) + with open(nfi, "r") as nf: + for line in mu.comment_remover(nf.readlines()): + line = re.sub(condapath, 'conda "' + envpath, line) + if "include {" in line: + line = fixinclude( + line, + loglevel, + condapath, + envpath, + workflowpath, + logfix, + "nfmode", + ) + subjobs.append(line) + subjobs.append("\n\n") + + tp.append( + nf_tool_params( + 
subsamples[0], + None, + sconf, + subwork, + toolenv, + toolbin, + None, + conditions[0], + ) + ) + + subjobs.append("\n\n" + "workflow {\n") + for w in flowlist: + subjobs.append(" " * 4 + w + "(dummy)\n") + subjobs.append("}\n\n") + + te = toolenv.split("_")[0] if "_" in toolenv else toolenv + nfo = os.path.abspath( + os.path.join( + subdir, + "_".join( + [ + "allconditions", + subwork, + te, + "subflow.nf", + ] + ), + ) + ) + if writeout: + write_if_different(nfo, "".join(add) + "".join(subjobs)) + + confo = os.path.abspath( + os.path.join( + subdir, + "_".join( + [ + "allconditions", + subwork, + te, + "subconfig.json", + ] + ), + ) + ) + if writeout: + dump_if_different(confo, subconf) + + tpl = " ".join(tp) + para = nf_fetch_params(confo, conditions[0], None) + + """ + NOTE: Workaround for multi-feature featurecount, we can not run for loops for feature lists in nextflow so we need to rerun the jobs for single features and featuremaps (feature->id). This could break reproducibility for manual runs, could be better to loop at generation of nfo and confo and add feature name to output files, but this is inconsistent with snakemake runs so we choose this as workaround + """ + log.debug(logid + f"PARAMS: {para}") + if para.get("COUNTINGFEATLIST"): + fl = para.pop("COUNTINGFEATLIST").split(",") + il = para.pop("COUNTINGIDLIST").split(",") + for i in range(len(fl)): + para["COUNTINGFEAT"] = fl[i] + para["COUNTINGMAP"] = f"'-t {fl[i]} -g {il[i]}'" + log.debug(logid + f"NEWPARAMS: {para}") + jobs.append([nfo, confo, tpl, para]) + else: + jobs.append([nfo, confo, tpl, para]) return jobs diff --git a/README.md b/README.md index 7b9d4597..b3dfedfc 100644 --- a/README.md +++ b/README.md @@ -105,6 +105,30 @@ respectively. For other workload managers please refer to the documentation of ```Snakemake``` and ```Nextflow```. +## Testing + +### Quick local unit tests (same path as CI) + +From the repository root run: + +``` +pytest -q tests/test_Utils.py +``` + +or use the helper script: + +``` +bash tests/cicd_test.sh +``` + +### Optional local integration smoke test + +To also run the heavier pipeline smoke test from the helper script: + +``` +RUN_INTEGRATION_TESTS=1 bash tests/cicd_test.sh +``` + ## Contribute If you like this project, are missing features, want to contribute or diff --git a/configs/template.json b/configs/template.json index d18e062f..9ab0bb95 100644 --- a/configs/template.json +++ b/configs/template.json @@ -2,7 +2,7 @@ "WORKFLOWS": "", #which workflows do you want to run "MAPPING, QC, DEDUP, TRIMMING, COUNTING, TRACKS, PEAKS, DE, DEU, DAS, DTU, CIRCS" "BINS": "", #where to find customscripts used in the workflow !!!ADVANCED USAGE ONLY!!! 
"MAXTHREADS": "20", #maximum number of cores to use, make sure this fits your needs - "VERSION": "1.3.0", #needs to be identical to the installed version for reproducibility reasons + "VERSION": "1.4.0", #needs to be identical to the installed version for reproducibility reasons "SETTINGS": { "id": { "condition": { diff --git a/configs/template_clean.json b/configs/template_clean.json index 1a72e49e..825e7335 100644 --- a/configs/template_clean.json +++ b/configs/template_clean.json @@ -2,7 +2,7 @@ "WORKFLOWS": "", "BINS": "", "MAXTHREADS": "20", - "VERSION": "1.3.0", + "VERSION": "1.4.0", "SETTINGS": { "id": { "condition": { diff --git a/configs/tutorial_exhaustive.json b/configs/tutorial_exhaustive.json index 91363b37..e811dec3 100644 --- a/configs/tutorial_exhaustive.json +++ b/configs/tutorial_exhaustive.json @@ -1,6 +1,6 @@ { "WORKFLOWS": "FETCH, QC, TRIMMING, MAPPING, DEDUP, TRACKS, PEAKS, DE, DEU, DAS, DTU, COUNTING", - "VERSION": "1.3.0", + "VERSION": "1.4.0", "BINS": "", "MAXTHREADS": "16", "SETTINGS": { @@ -84,7 +84,8 @@ "QC": { "TOOLS" : { - "fastqc" : "fastqc" + "fastqc" : "fastqc", + "rustqc" : "rustqc" }, "Ecoli": { "KO": { @@ -94,6 +95,13 @@ "QC": "", #QC options here if any, KO is not required, will be resolved by rules "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules } + }, + "rustqc": { + "OPTIONS": + { + "QC": "", #QC options here if any, KO is not required, will be resolved by rules + "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules + } } }, "WT": { @@ -104,6 +112,13 @@ "QC": "", #QC options here if any, KO is not required, will be resolved by rules "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules } + }, + "rustqc": { + "OPTIONS": + { + "QC": "", #QC options here if any, KO is not required, will be resolved by rules + "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules + } } } } diff --git a/configs/tutorial_postprocess.json b/configs/tutorial_postprocess.json index a745c4a0..aa5ba370 100644 --- a/configs/tutorial_postprocess.json +++ b/configs/tutorial_postprocess.json @@ -1,6 +1,6 @@ { "WORKFLOWS": "FETCH, QC, TRIMMING, MAPPING, DEDUP, TRACKS, PEAKS, DE, DEU, COUNTING", - "VERSION": "1.3.0", + "VERSION": "1.4.0", "BINS": "", "MAXTHREADS": "16", "SETTINGS": { @@ -78,7 +78,8 @@ "QC": { "TOOLS" : { - "fastqc" : "fastqc" + "fastqc" : "fastqc", + "rustqc" : "rustqc" }, "Ecoli": { "KO": { @@ -88,6 +89,13 @@ "QC": "", #QC options here if any, KO is not required, will be resolved by rules "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules } + }, + "rustqc": { + "OPTIONS": + { + "QC": "", #QC options here if any, KO is not required, will be resolved by rules + "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules + } } }, "WT": { @@ -98,6 +106,13 @@ "QC": "", #QC options here if any, KO is not required, will be resolved by rules "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules } + }, + "rustqc": { + "OPTIONS": + { + "QC": "", #QC options here if any, KO is not required, will be resolved by rules + "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules + } } } } diff --git a/configs/tutorial_quick.json b/configs/tutorial_quick.json index 2225d2e9..5e712ad4 100644 --- a/configs/tutorial_quick.json +++ b/configs/tutorial_quick.json @@ -2,7 +2,7 @@ "WORKFLOWS": "FETCH,MAPPING", "BINS": "", 
"MAXTHREADS": "4", - "VERSION": "1.3.0", + "VERSION": "1.4.0", "SETTINGS": { "SIMPLE": { "SAMPLES": [ diff --git a/configs/tutorial_toolmix.json b/configs/tutorial_toolmix.json index 429ceede..e3b080fa 100644 --- a/configs/tutorial_toolmix.json +++ b/configs/tutorial_toolmix.json @@ -1,6 +1,6 @@ { "WORKFLOWS": "FETCH, QC, TRIMMING, MAPPING, DEDUP", - "VERSION": "1.3.0", + "VERSION": "1.4.0", "BINS": "", "MAXTHREADS": "16", "SETTINGS": { @@ -76,7 +76,8 @@ "QC": { "TOOLS" : { - "fastqc" : "fastqc" + "fastqc" : "fastqc", + "rustqc" : "rustqc" }, "Ecoli": { "KO": { @@ -86,6 +87,13 @@ "QC": "", #QC options here if any, KO is not required, will be resolved by rules "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules } + }, + "rustqc": { + "OPTIONS": + { + "QC": "", #QC options here if any, KO is not required, will be resolved by rules + "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules + } } }, "WT": { @@ -96,6 +104,13 @@ "QC": "", #QC options here if any, KO is not required, will be resolved by rules "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules } + }, + "rustqc": { + "OPTIONS": + { + "QC": "", #QC options here if any, KO is not required, will be resolved by rules + "MULTI": "" #MultiQC options here if any, KO is not required, will be resolved by rules + } } } } diff --git a/envs/monsda.yaml b/envs/monsda.yaml index e55801cc..39538a1d 100644 --- a/envs/monsda.yaml +++ b/envs/monsda.yaml @@ -8,7 +8,7 @@ dependencies: - fastapi =0.128.5 - grep >=3.4 - isort =7.0.0 - - monsda =1.3.0 + - monsda =1.4.0 - natsort =8.4.0 - nextflow =25.10.3 - numpy =2.4.2 diff --git a/envs/rustqc.yaml b/envs/rustqc.yaml new file mode 100644 index 00000000..5f268a0f --- /dev/null +++ b/envs/rustqc.yaml @@ -0,0 +1,7 @@ +name: rustqc +channels: +- bioconda +- conda-forge +dependencies: +- rustqc =0.2.1 +- multiqc =1.21 diff --git a/setup.cfg b/setup.cfg index 143bf72f..b3c29c7f 100644 --- a/setup.cfg +++ b/setup.cfg @@ -15,7 +15,7 @@ max-complexity = 18 select = B,C,E,F,W,T4 [tool:pytest] -testpaths=test +testpaths=tests [versioneer] VCS = git diff --git a/tests/cicd_test.sh b/tests/cicd_test.sh index 442625cd..cb765242 100644 --- a/tests/cicd_test.sh +++ b/tests/cicd_test.sh @@ -1,10 +1,27 @@ -#!/usr/bin/env bash -el -conda activate monsda-test +#!/usr/bin/env bash +set -euo pipefail + +# Optional: activate pre-created conda env if available +if command -v conda >/dev/null 2>&1; then + eval "$(conda shell.bash hook)" + if conda env list | awk '{print $1}' | grep -qx monsda-test; then + conda activate monsda-test + fi +fi # Get the directory of the current script SCRIPT_DIR=$(dirname "$(realpath "$0")") echo ${SCRIPT_DIR} cd ${SCRIPT_DIR} -mkdir -p CONDALIB -# Run the MONSDA command with the updated -c parameter -monsda -j 6 -c config_Test.json --directory ${PWD} --use-conda --conda-prefix CONDALIB --save && echo "MONSDA test passed" || echo "MONSDA test failed" \ No newline at end of file + +# Keep local execution aligned with CI +python -m pip install --upgrade pip +python -m pip install -e .. 
+python -m pip install pytest
+pytest -q test_Utils.py
+
+# Optional integration smoke test (opt-in)
+if [[ "${RUN_INTEGRATION_TESTS:-0}" == "1" ]]; then
+    mkdir -p CONDALIB
+    monsda -j 6 -c config_Test.json --directory ${PWD} --use-conda --conda-prefix CONDALIB --save
+fi
\ No newline at end of file
diff --git a/tests/conftest.py b/tests/conftest.py
new file mode 100644
index 00000000..8b132812
--- /dev/null
+++ b/tests/conftest.py
@@ -0,0 +1,7 @@
+import sys
+from pathlib import Path
+
+# Ensure repository root is importable during local pytest runs
+ROOT = Path(__file__).resolve().parents[1]
+if str(ROOT) not in sys.path:
+    sys.path.insert(0, str(ROOT))
diff --git a/tests/test_Utils.py b/tests/test_Utils.py
index 5bf0f62c..0706af65 100755
--- a/tests/test_Utils.py
+++ b/tests/test_Utils.py
@@ -1,4 +1,11 @@
+import hashlib
+import io
+import os
+import shutil
 import unittest
+from contextlib import redirect_stdout
+
+import numpy as np
 
 from MONSDA.Utils import (
     NestedDefaultDict,
@@ -49,8 +56,14 @@ def test_NestedDefaultDict(self):
         self.assertEqual(nested_dict['c']['d'], 0)
 
     def test_rmempty(self):
-        files = ["file1.txt", "file2.txt"]
-        self.assertEqual(rmempty(files), files)
+        existing = "file1.txt"
+        missing = "file2.txt"
+        with open(existing, "w") as f:
+            f.write("x")
+        try:
+            self.assertEqual(rmempty([existing, missing]), [existing])
+        finally:
+            os.remove(existing)
 
     def test_comment_remover(self):
         input_text = [
@@ -99,8 +112,8 @@ def test_merge_dicts(self):
         self.assertEqual(merge_dicts(d1, d2), {"a": 1, "b": {"c": 2, "d": 3}, "e": 4})
 
     def test_keysets_from_dict(self):
-        data_dict = {"a": {"b": {"c": "d"}}}
-        self.assertEqual(keysets_from_dict(data_dict), [("a", "b", "c")])
+        data_dict = {"a": {"b": {"c": {"SAMPLES": "test"}}}}
+        self.assertEqual(keysets_from_dict(data_dict, "SAMPLES"), [("a", "b", "c")])
 
     def test_keys_from_dict(self):
         data_dict = {"a": {"b": {"c": "d"}}}
@@ -192,9 +205,10 @@ def test_parseseq(self):
     def test_npprint(self):
         a = np.array([1.23, 4.56])
         expected_output = "1\t1.2300000\n2\t4.5600000\n"
-        with self.assertLogs(level="INFO") as log:
+        buf = io.StringIO()
+        with redirect_stdout(buf):
             npprint(a)
-        self.assertIn(expected_output.strip(), log.output[0])
+        self.assertIn(expected_output, buf.getvalue())
 
     def test_idfromfa(self):
         id = "cluster1:chr19.tRNA5-LysCTT(+)"
diff --git a/workflows/countreads.nf b/workflows/countreads.nf
index 6931e526..b542cfab 100644
--- a/workflows/countreads.nf
+++ b/workflows/countreads.nf
@@ -147,6 +147,7 @@ process count_mappers{
     conda "samtools.yaml"
    container "oras://jfallmann/monsda:"+"samtools"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
@@ -177,6 +178,7 @@ process featurecount{
     conda "$COUNTENV"+".yaml"
     container "oras://jfallmann/monsda:"+"$COUNTENV"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
diff --git a/workflows/deseq2_DE.nf b/workflows/deseq2_DE.nf
index 1ab4ce44..e39a645c 100644
--- a/workflows/deseq2_DE.nf
+++ b/workflows/deseq2_DE.nf
@@ -22,6 +22,7 @@ process featurecount_deseq{
     conda "$COUNTENV"+".yaml"
     container "oras://jfallmann/monsda:"+"$COUNTENV"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
@@ -29,7 +30,7 @@ process featurecount_deseq{
       saveAs: {filename ->
            if (filename.indexOf(".counts.gz") > 0) "DE/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
            else if (filename.indexOf(".counts.summary") > 0) "DE/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
-           else if (filename.indexOf(".log") > 0) "LOGS/DE/${SCOMBO}/${file(filename).getSimpleName()}/featurecounts_deseq2_unique.log"
+           else if (filename.indexOf(".log") > 0) "LOGS/DE/${SCOMBO}/${CONDITION}/${file(filename).getSimpleName()}/featurecounts_deseq2_unique.log"
       }
 
     input:
@@ -211,6 +212,7 @@ workflow DE{
     if (RUNDEDUP){
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique_dedup.bam"
+        }
     }else{
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique.bam"
diff --git a/workflows/dexseq_DEU.nf b/workflows/dexseq_DEU.nf
index c70f6f9a..8c954541 100644
--- a/workflows/dexseq_DEU.nf
+++ b/workflows/dexseq_DEU.nf
@@ -21,6 +21,7 @@ process prepare_deu_annotation{
     conda "$COUNTENV"+".yaml"
     container "oras://jfallmann/monsda:"+"$COUNTENV"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
@@ -60,6 +61,7 @@ process featurecount_dexseq{
     conda "$COUNTENV"+".yaml"
     container "oras://jfallmann/monsda:"+"$COUNTENV"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
@@ -254,6 +256,7 @@ workflow DEU{
     if (RUNDEDUP){
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique_dedup.bam"
+        }
     }else{
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique.bam"
diff --git a/workflows/diego_DAS.nf b/workflows/diego_DAS.nf
index f5b5b9da..a2a43a73 100644
--- a/workflows/diego_DAS.nf
+++ b/workflows/diego_DAS.nf
@@ -24,13 +24,14 @@ process featurecount_diego{
     conda "$COUNTENV"+".yaml"
     container "oras://jfallmann/monsda:"+"$COUNTENV"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
 
     publishDir "${workflow.workDir}/../" , mode: 'link',
         saveAs: {filename ->
-            if (filename.indexOf(".counts.gz") > 0) "DAS/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
-            else if (filename.indexOf(".counts.summary") > 0) "DAS/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
+            if (filename.indexOf(".counts.gz") > 0) "DAS/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
+            else if (filename.indexOf(".counts.summary") > 0) "DAS/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
             else if (filename.indexOf(".log") > 0) "LOGS/DAS/${SCOMBO}/${file(filename).getSimpleName()}/featurecounts_diego_unique.log"
       }
@@ -287,6 +288,7 @@ workflow DAS{
     if (RUNDEDUP){
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique_dedup.bam"
+        }
     }else{
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique.bam"
diff --git a/workflows/edger_DAS.nf b/workflows/edger_DAS.nf
index d51e71a4..9fa62d2b 100644
--- a/workflows/edger_DAS.nf
+++ b/workflows/edger_DAS.nf
@@ -22,13 +22,14 @@ process featurecount_edger{
     conda "$COUNTENV"+".yaml"
     container "oras://jfallmann/monsda:"+"$COUNTENV"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
 
     publishDir "${workflow.workDir}/../" , mode: 'link',
         saveAs: {filename ->
             if (filename.indexOf(".counts.gz") > 0) "DAS/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
-            else if (filename.indexOf(".counts.summary") > 0) "DAS/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
+            else if (filename.indexOf(".counts.summary") > 0) "DAS/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
             else if (filename.indexOf(".log") > 0) "LOGS/DAS/${SCOMBO}/${file(filename).getSimpleName()}/featurecounts_edger_unique.log"
       }
@@ -212,6 +213,7 @@ workflow DAS{
     if (RUNDEDUP){
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique_dedup.bam"
+        }
     }else{
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique.bam"
diff --git a/workflows/edger_DE.nf b/workflows/edger_DE.nf
index b5115091..43e0160e 100644
--- a/workflows/edger_DE.nf
+++ b/workflows/edger_DE.nf
@@ -22,6 +22,7 @@ process featurecount_edger{
     conda "$COUNTENV"+".yaml"
     container "oras://jfallmann/monsda:"+"$COUNTENV"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
@@ -211,6 +212,7 @@ workflow DE{
     if (RUNDEDUP){
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique_dedup.bam"
+        }
     }else{
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique.bam"
diff --git a/workflows/edger_DEU.nf b/workflows/edger_DEU.nf
index 8bbb55a2..7891d95d 100644
--- a/workflows/edger_DEU.nf
+++ b/workflows/edger_DEU.nf
@@ -22,13 +22,14 @@ process featurecount_edger{
     conda "$COUNTENV"+".yaml"
     container "oras://jfallmann/monsda:"+"$COUNTENV"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
 
     publishDir "${workflow.workDir}/../" , mode: 'link',
         saveAs: {filename ->
             if (filename.indexOf(".counts.gz") > 0) "DEU/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
-            else if (filename.indexOf(".counts.summary") > 0) "DEU/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
+            else if (filename.indexOf(".counts.summary") > 0) "DEU/${SCOMBO}/Featurecounts/${CONDITION}/${file(filename).getName()}"
             else if (filename.indexOf(".log") > 0) "LOGS/DEU/${SCOMBO}/${file(filename).getSimpleName()}/featurecounts_edger_unique.log"
       }
@@ -211,6 +212,7 @@ workflow DEU{
     if (RUNDEDUP){
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique_dedup.bam"
+        }
     }else{
         MAPPEDSAMPLES = LONGSAMPLES.collect{ element ->
             return "${workflow.workDir}/../MAPPED/${COMBO}/"+element+"_mapped_sorted_unique.bam"
diff --git a/workflows/mapping.nf b/workflows/mapping.nf
index 78752fca..62b06ffa 100644
--- a/workflows/mapping.nf
+++ b/workflows/mapping.nf
@@ -4,6 +4,7 @@ process sortsam{
     conda "samtools.yaml"
     container "oras://jfallmann/monsda:"+"samtools"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
@@ -25,7 +26,7 @@ process sortsam{
     //No Maxthread in nextflow
     def sortmem = Math.ceil(task.memory.giga as double) as int
     """
-    set +o pipefail; mkdir -p TMP; samtools view -H $map|grep -P '^@HD' |pigz -p ${task.cpus} -f > tmphead; samtools view -H $map|grep -P '^@SQ'|sort -t\$'\\t' -k1,1 -k2,2V |pigz -p ${task.cpus} -f >> tmphead ; samtools view -H $map|grep -P '^@RG'|pigz -p ${task.cpus} -f >> tmphead ; samtools view -H $map|grep -P '^@PG'|pigz -p ${task.cpus} -f >> tmphead ; export LC_ALL=C;zcat $map | grep -v \"^@\"|sort --parallel=${task.cpus} -S $sortmem -T TMP -t\$'\\t' -k3,3V -k4,4n - |pigz -p ${task.cpus} -f > tmpfile; cat tmphead tmpfile > $sorted && rm -f tmphead tmpfile ${workflow.workDir}/../MAPPED/${COMBO}/${CONDITION}/$map
+    set +o pipefail; mkdir -p TMP; samtools view -H $map|grep -P '^@HD' |pigz -p ${task.cpus} -f > tmphead; samtools view -H $map|grep -P '^@SQ'|sort -t\$'\\t' -k1,1 -k2,2V |pigz -p ${task.cpus} -f >> tmphead ; samtools view -H $map|grep -P '^@RG'|pigz -p ${task.cpus} -f >> tmphead ; samtools view -H $map|grep -P '^@PG'|pigz -p ${task.cpus} -f >> tmphead ; export LC_ALL=C;zcat $map | grep -v \"^@\"|sort --parallel=${task.cpus} -S ${sortmem}G -T TMP -t\$'\\t' -k3,3V -k4,4n - |pigz -p ${task.cpus} -f > tmpfile; cat tmphead tmpfile > $sorted && rm -f tmphead tmpfile
     """
 }
 
@@ -33,6 +34,7 @@ process sam2bam{
     conda "samtools.yaml"
     container "oras://jfallmann/monsda:"+"samtools"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
@@ -64,6 +66,7 @@ process uniqsam{
     conda "samtools.yaml"
     container "oras://jfallmann/monsda:"+"samtools"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
@@ -98,6 +101,7 @@ process sam2bamuniq{
     conda "samtools.yaml"
     container "oras://jfallmann/monsda:"+"samtools"
     cpus THREADS
+    memory 16.GB
     cache 'lenient'
 
     //validExitStatus 0,1
diff --git a/workflows/multiqc.nf b/workflows/multiqc.nf
index 18a9eb5e..e82010ce 100644
--- a/workflows/multiqc.nf
+++ b/workflows/multiqc.nf
@@ -17,7 +17,7 @@ process mqc{
     }
 
     input:
-    path others
+    path others, stageAs: 'mqc_input??/*'
     //path samples
 
     output:
@@ -26,7 +26,33 @@ process mqc{
 
     script:
     """
-    touch $others; export LC_ALL=en_US.utf8; export LC_ALL=C.UTF-8; multiqc -f --exclude picard --exclude gatk -k json -z -o \${PWD} .
+    touch $others
+    OUT=\${PWD}
+    LIST=multiqc_inputs.txt
+    TMP_LIST=multiqc_inputs_unique.txt
+    BASE_QC_DIR="${workflow.workDir}/../QC"
+    COMBO_VAL="${COMBO}"
+    CONDITION_VAL="${CONDITION}"
+
+    for i in $others; do
+        dirname "\$i" >> "\$LIST"
+    done
+
+    # If this is a rustqc combo and the corresponding fastqc combo exists,
+    # include the fastqc output directory in the same MultiQC report.
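+    # e.g. a combo named "rustqc-star" (name purely illustrative) would pull
+    # in the matching "fastqc-star" QC directory via the substitution below.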
+ if [[ "\$COMBO_VAL" == *"rustqc"* ]]; then + FQ_COMBO="\${COMBO_VAL/rustqc/fastqc}" + FQ_DIR="\${BASE_QC_DIR}/\${FQ_COMBO}/\${CONDITION_VAL}" + if [[ -d "\$FQ_DIR" ]]; then + echo "\$FQ_DIR" >> "\$LIST" + fi + fi + + sort -u "\$LIST" > "\$TMP_LIST" + export LC_ALL=en_US.utf8 + export LC_ALL=C.UTF-8 + multiqc -f --exclude picard --exclude gatk -k json -z -s -o "\$OUT" -l "\$TMP_LIST" """ } diff --git a/workflows/multiqc_rustqc.smk b/workflows/multiqc_rustqc.smk new file mode 100644 index 00000000..1ecd2b1f --- /dev/null +++ b/workflows/multiqc_rustqc.smk @@ -0,0 +1,109 @@ +if rundedup: + if paired == 'paired': + if prededup: + rule multiqc: + input: expand(rules.rustqc_mapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_uniquemapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_dedupmapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_uniquededup.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bam.output.bam, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bamuniq.output.uniqbam, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.dedupbam.output.bam, file=samplecond(SAMPLES, config), combo=combo, type=["sorted", "sorted_unique"]) + output: html = report("QC/Multi/{combo}/{condition}/multiqc_report.html", category="QC"), + tmp = temp("QC/Multi/{combo}/{condition}/tmp"), + lst = "QC/Multi/{combo}/{condition}/qclist.txt" + log: "LOGS/{combo}/{condition}_multiqc.log" + conda: "rustqc.yaml" + container: "oras://jfallmann/monsda:rustqc" + threads: 1 + params: qpara = lambda wildcards: tool_params(SAMPLES[0], None, config, 'QC', QCENV)['OPTIONS'].get('MULTI', "") + shell: "OUT=$(dirname {output.html}); for i in {input}; do echo $(dirname \"${{i}}\") >> {output.tmp}; done; FQ_COMBO=$(echo {wildcards.combo} | sed 's/rustqc/fastqc/g'); FQ_DIR=QC/${{FQ_COMBO}}/{wildcards.condition}; if [ -d \"${{FQ_DIR}}\" ]; then echo ${{FQ_DIR}} >> {output.tmp}; fi; cat {output.tmp} | sort -u > {output.lst}; export LC_ALL=C.UTF-8; multiqc -f {params.qpara} --exclude picard --exclude gatk -k json -z -s -o ${{OUT}} -l {output.lst} 2> {log}" + else: + rule multiqc: + input: expand(rules.rustqc_mapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_uniquemapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_dedupmapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_uniquededup.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bam.output.bam, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bamuniq.output.uniqbam, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.dedupbam.output.bam, file=samplecond(SAMPLES, config), combo=combo, type=["sorted", "sorted_unique"]) + output: html = report("QC/Multi/{combo}/{condition}/multiqc_report.html", category="QC"), + tmp = temp("QC/Multi/{combo}/{condition}/tmp"), + lst = "QC/Multi/{combo}/{condition}/qclist.txt" + log: "LOGS/{combo}/{condition}_multiqc.log" + conda: "rustqc.yaml" + container: "oras://jfallmann/monsda:rustqc" + threads: 1 + params: qpara = lambda wildcards: tool_params(SAMPLES[0], None, config, 'QC', QCENV)['OPTIONS'].get('MULTI', "") + shell: "OUT=$(dirname {output.html}); for i in {input}; do echo $(dirname \"${{i}}\") >> {output.tmp}; done; FQ_COMBO=$(echo {wildcards.combo} | sed 's/rustqc/fastqc/g'); 
FQ_DIR=QC/${{FQ_COMBO}}/{wildcards.condition}; if [ -d \"${{FQ_DIR}}\" ]; then echo ${{FQ_DIR}} >> {output.tmp}; fi; cat {output.tmp} | sort -u > {output.lst}; export LC_ALL=C.UTF-8; multiqc -f {params.qpara} --exclude picard --exclude gatk -k json -z -s -o ${{OUT}} -l {output.lst} 2> {log}" + + else: + if prededup: + rule multiqc: + input: expand(rules.rustqc_mapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_uniquemapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_dedupmapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_uniquededup.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bam.output.bam, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bamuniq.output.uniqbam, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.dedupbam.output.bam, file=samplecond(SAMPLES, config), combo=combo, type=["sorted", "sorted_unique"]) + output: html = report("QC/Multi/{combo}/{condition}/multiqc_report.html", category="QC"), + tmp = temp("QC/Multi/{combo}/{condition}/tmp"), + lst = "QC/Multi/{combo}/{condition}/qclist.txt" + log: "LOGS/{combo}/{condition}_multiqc.log" + conda: "rustqc.yaml" + container: "oras://jfallmann/monsda:rustqc" + threads: 1 + params: qpara = lambda wildcards: tool_params(SAMPLES[0], None, config, 'QC', QCENV)['OPTIONS'].get('MULTI', "") + shell: "OUT=$(dirname {output.html}); for i in {input}; do echo $(dirname \"${{i}}\") >> {output.tmp}; done; FQ_COMBO=$(echo {wildcards.combo} | sed 's/rustqc/fastqc/g'); FQ_DIR=QC/${{FQ_COMBO}}/{wildcards.condition}; if [ -d \"${{FQ_DIR}}\" ]; then echo ${{FQ_DIR}} >> {output.tmp}; fi; cat {output.tmp} | sort -u > {output.lst}; export LC_ALL=C.UTF-8; multiqc -f {params.qpara} --exclude picard --exclude gatk -k json -z -s -o ${{OUT}} -l {output.lst} 2> {log}" + else: + rule multiqc: + input: expand(rules.rustqc_mapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_uniquemapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_dedupmapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_uniquededup.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bam.output.bam, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bamuniq.output.uniqbam, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.dedupbam.output.bam, file=samplecond(SAMPLES, config), combo=combo, type=["sorted", "sorted_unique"]) + output: html = report("QC/Multi/{combo}/{condition}/multiqc_report.html", category="QC"), + tmp = temp("QC/Multi/{combo}/{condition}/tmp"), + lst = "QC/Multi/{combo}/{condition}/qclist.txt" + log: "LOGS/{combo}/{condition}_multiqc.log" + conda: "rustqc.yaml" + container: "oras://jfallmann/monsda:rustqc" + threads: 1 + params: qpara = lambda wildcards: tool_params(SAMPLES[0], None, config, 'QC', QCENV)['OPTIONS'].get('MULTI', "") + shell: "OUT=$(dirname {output.html}); for i in {input}; do echo $(dirname \"${{i}}\") >> {output.tmp}; done; FQ_COMBO=$(echo {wildcards.combo} | sed 's/rustqc/fastqc/g'); FQ_DIR=QC/${{FQ_COMBO}}/{wildcards.condition}; if [ -d \"${{FQ_DIR}}\" ]; then echo ${{FQ_DIR}} >> {output.tmp}; fi; cat {output.tmp} | sort -u > {output.lst}; export LC_ALL=C.UTF-8; multiqc -f {params.qpara} --exclude picard --exclude gatk -k json -z -s -o ${{OUT}} -l {output.lst} 2> {log}" + +else: + if paired == 'paired': + rule multiqc: + input: 
expand(rules.rustqc_mapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_uniquemapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bam.output.bam, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bamuniq.output.uniqbam, file=samplecond(SAMPLES, config), combo=combo) + output: html = report("QC/Multi/{combo}/{condition}/multiqc_report.html", category="QC"), + tmp = temp("QC/Multi/{combo}/{condition}/tmp"), + lst = "QC/Multi/{combo}/{condition}/qclist.txt" + log: "LOGS/{combo}/{condition}_multiqc.log" + conda: "rustqc.yaml" + container: "oras://jfallmann/monsda:rustqc" + threads: 1 + params: qpara = lambda wildcards: tool_params(SAMPLES[0], None, config, 'QC', QCENV)['OPTIONS'].get('MULTI', "") + shell: "OUT=$(dirname {output.html}); for i in {input}; do echo $(dirname \"${{i}}\") >> {output.tmp}; done; FQ_COMBO=$(echo {wildcards.combo} | sed 's/rustqc/fastqc/g'); FQ_DIR=QC/${{FQ_COMBO}}/{wildcards.condition}; if [ -d \"${{FQ_DIR}}\" ]; then echo ${{FQ_DIR}} >> {output.tmp}; fi; cat {output.tmp} | sort -u > {output.lst}; export LC_ALL=C.UTF-8; multiqc -f {params.qpara} --exclude picard --exclude gatk -k json -z -s -o ${{OUT}} -l {output.lst} 2> {log}" + + else: + rule multiqc: + input: expand(rules.rustqc_mapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.rustqc_uniquemapped.output.js, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bam.output.bam, file=samplecond(SAMPLES, config), combo=combo), + expand(rules.sam2bamuniq.output.uniqbam, file=samplecond(SAMPLES, config), combo=combo) + output: html = report("QC/Multi/{combo}/{condition}/multiqc_report.html", category="QC"), + tmp = temp("QC/Multi/{combo}/{condition}/tmp"), + lst = "QC/Multi/{combo}/{condition}/qclist.txt" + log: "LOGS/{combo}/{condition}_multiqc.log" + conda: "rustqc.yaml" + container: "oras://jfallmann/monsda:rustqc" + threads: 1 + params: qpara = lambda wildcards: tool_params(SAMPLES[0], None, config, 'QC', QCENV)['OPTIONS'].get('MULTI', "") + shell: "OUT=$(dirname {output.html}); for i in {input}; do echo $(dirname \"${{i}}\") >> {output.tmp}; done; FQ_COMBO=$(echo {wildcards.combo} | sed 's/rustqc/fastqc/g'); FQ_DIR=QC/${{FQ_COMBO}}/{wildcards.condition}; if [ -d \"${{FQ_DIR}}\" ]; then echo ${{FQ_DIR}} >> {output.tmp}; fi; cat {output.tmp} | sort -u > {output.lst}; export LC_ALL=C.UTF-8; multiqc -f {params.qpara} --exclude picard --exclude gatk -k json -z -s -o ${{OUT}} -l {output.lst} 2> {log}" diff --git a/workflows/rustqc.nf b/workflows/rustqc.nf new file mode 100644 index 00000000..1505b0a3 --- /dev/null +++ b/workflows/rustqc.nf @@ -0,0 +1,56 @@ +QCENV = get_always('QCENV') +QCBIN = get_always('QCBIN') +QCPARAMS = get_always('rustqc_params_QC') ?: '' + +MAPANNO = get_always('MAPPINGANNO') + +// Map MONSDA strandedness to RustQC strandedness +def rustqc_stranded(stranded) { + if (stranded == 'fr') return 'forward' + else if (stranded == 'rf') return 'reverse' + else return 'unstranded' +} + +RUSTQC_STRANDED = rustqc_stranded(STRANDED ?: '') +RUSTQC_PAIRED = (PAIRED == 'paired') ? 
'-p' : '' + +//RUSTQC on mapped BAMs + +process rustqc_mapped{ + conda "$QCENV"+".yaml" + container "oras://jfallmann/monsda:"+"$QCENV" + cpus THREADS + cache 'lenient' + label 'big_mem' + + publishDir "${workflow.workDir}/../" , mode: 'link', + saveAs: {filename -> + if (filename.indexOf("rustqc_summary.json") > 0) "QC/${COMBO}/${CONDITION}/${file(filename).getParent().getName()}/rustqc_summary.json" + else "QC/${COMBO}/${CONDITION}/${filename}" + } + + input: + path bam + + output: + path "results/**", emit: rustqc_results + path "results/rustqc_summary.json", emit: rustqc_json + + script: + fn = file(bam).getSimpleName() + anno = file("${workflow.workDir}/../${MAPANNO}") + """ + rustqc rna $bam --gtf $anno -t ${task.cpus} $RUSTQC_PAIRED -s $RUSTQC_STRANDED --skip-dup-check -j results/rustqc_summary.json -o results/$fn $QCPARAMS + """ +} + +workflow QC_MAPPING{ + take: collection + + main: + + rustqc_mapped(collection) + + emit: + qc = rustqc_mapped.out.rustqc_results +} diff --git a/workflows/rustqc.smk b/workflows/rustqc.smk new file mode 100644 index 00000000..67fa3e93 --- /dev/null +++ b/workflows/rustqc.smk @@ -0,0 +1,72 @@ +QCBIN, QCENV = env_bin_from_config(config, 'QC') + +# Map MONSDA strandedness to RustQC strandedness +def rustqc_stranded(stranded): + if stranded == 'fr': + return 'forward' + elif stranded == 'rf': + return 'reverse' + else: + return 'unstranded' + +RUSTQC_STRANDED = rustqc_stranded(stranded) +RUSTQC_PAIRED = '-p' if paired == 'paired' else '' + +rule rustqc_mapped: + input: r1 = "MAPPED/{combo}/{file}_mapped_sorted.bam" + output: o1 = directory("QC/{combo}/{file}_mapped_sorted"), + js = "QC/{combo}/{file}_mapped_sorted/rustqc_summary.json" + log: "LOGS/{combo}/{file}_rustqc_mapped.log" + conda: ""+QCENV+".yaml" + container: "oras://jfallmann/monsda:"+QCENV+"" + threads: MAXTHREAD + params: qpara = lambda wildcards: tool_params(SAMPLES[0], None, config, 'QC', QCENV)['OPTIONS'].get('QC', ""), + anno = ANNOTATION, + paired = RUSTQC_PAIRED, + stranded = RUSTQC_STRANDED + shell: "rustqc rna {input.r1} --gtf {params.anno} -t {threads} {params.paired} -s {params.stranded} --skip-dup-check -j {output.js} -o {output.o1} {params.qpara} 2> {log}" + +rule rustqc_uniquemapped: + input: r1 = "MAPPED/{combo}/{file}_mapped_sorted_unique.bam", + r2 = "MAPPED/{combo}/{file}_mapped_sorted_unique.bam.bai" + output: o1 = directory("QC/{combo}/{file}_mapped_sorted_unique"), + js = "QC/{combo}/{file}_mapped_sorted_unique/rustqc_summary.json" + log: "LOGS/{combo}/{file}_rustqc_uniquemapped.log" + conda: ""+QCENV+".yaml" + container: "oras://jfallmann/monsda:"+QCENV+"" + threads: MAXTHREAD + params: qpara = lambda wildcards: tool_params(SAMPLES[0], None, config, 'QC', QCENV)['OPTIONS'].get('QC', ""), + anno = ANNOTATION, + paired = RUSTQC_PAIRED, + stranded = RUSTQC_STRANDED + shell: "rustqc rna {input.r1} --gtf {params.anno} -t {threads} {params.paired} -s {params.stranded} --skip-dup-check -j {output.js} -o {output.o1} {params.qpara} 2> {log}" + +rule rustqc_dedupmapped: + input: r1 = "MAPPED/{combo}/{file}_mapped_sorted_dedup.bam", + r2 = "MAPPED/{combo}/{file}_mapped_sorted_dedup.bam.bai" + output: o1 = directory("QC/{combo}/{file}_mapped_sorted_dedup"), + js = "QC/{combo}/{file}_mapped_sorted_dedup/rustqc_summary.json" + log: "LOGS/{combo}/{file}_rustqc_dedupmapped.log" + conda: ""+QCENV+".yaml" + container: "oras://jfallmann/monsda:"+QCENV+"" + threads: MAXTHREAD + params: qpara = lambda wildcards: tool_params(SAMPLES[0], None, config, 'QC', QCENV)['OPTIONS'].get('QC', ""), + 
anno = ANNOTATION, + paired = RUSTQC_PAIRED, + stranded = RUSTQC_STRANDED + shell: "rustqc rna {input.r1} --gtf {params.anno} -t {threads} {params.paired} -s {params.stranded} -j {output.js} -o {output.o1} {params.qpara} 2> {log}" + +rule rustqc_uniquededup: + input: r1 = "MAPPED/{combo}/{file}_mapped_sorted_unique_dedup.bam", + r2 = "MAPPED/{combo}/{file}_mapped_sorted_unique_dedup.bam.bai" + output: o1 = directory("QC/{combo}/{file}_mapped_sorted_unique_dedup"), + js = "QC/{combo}/{file}_mapped_sorted_unique_dedup/rustqc_summary.json" + log: "LOGS/{combo}/{file}_rustqc_uniquededup.log" + conda: ""+QCENV+".yaml" + container: "oras://jfallmann/monsda:"+QCENV+"" + threads: MAXTHREAD + params: qpara = lambda wildcards: tool_params(SAMPLES[0], None, config, 'QC', QCENV)['OPTIONS'].get('QC', ""), + anno = ANNOTATION, + paired = RUSTQC_PAIRED, + stranded = RUSTQC_STRANDED + shell: "rustqc rna {input.r1} --gtf {params.anno} -t {threads} {params.paired} -s {params.stranded} -j {output.js} -o {output.o1} {params.qpara} 2> {log}"
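
A quick sanity check for the strandedness mapping that rustqc.nf and rustqc.smk both implement: MONSDA's 'fr'/'rf' codes become RustQC's 'forward'/'reverse', and anything else falls back to 'unstranded'. The following minimal pytest sketch is not part of the diff above; the file name and placement (e.g. tests/test_rustqc.py) are hypothetical, while the function body is copied verbatim from workflows/rustqc.smk:

# Hypothetical tests/test_rustqc.py; mirrors the mapping in workflows/rustqc.smk
def rustqc_stranded(stranded):
    if stranded == 'fr':
        return 'forward'
    elif stranded == 'rf':
        return 'reverse'
    else:
        return 'unstranded'


def test_rustqc_stranded():
    assert rustqc_stranded('fr') == 'forward'
    assert rustqc_stranded('rf') == 'reverse'
    # Unknown, empty, or unset strandedness must fall back to unstranded
    assert rustqc_stranded('') == 'unstranded'
    assert rustqc_stranded(None) == 'unstranded'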