From d1fc7243bc9bbe51cd940e96d9fcf3209f67826d Mon Sep 17 00:00:00 2001 From: GlassOfWhiskey Date: Mon, 7 Apr 2025 23:50:47 +0200 Subject: [PATCH 1/2] Added `provenance_manager` extension point This commit adds the possibility to upload custom `ProvenanceManager` classes through StreamFlow plugins and to visualize them through the `streamflow ext list` and `streamflow ext show` commands --- pyproject.toml | 1 + streamflow/ext/plugin.py | 6 ++++++ streamflow/main.py | 12 ++++++------ streamflow/provenance/__init__.py | 4 ++-- streamflow/provenance/run_crate.py | 13 ++++++++++++- streamflow/provenance/schemas/run_crate/cwl.json | 7 +++++++ 6 files changed, 34 insertions(+), 9 deletions(-) create mode 100644 streamflow/provenance/schemas/run_crate/cwl.json diff --git a/pyproject.toml b/pyproject.toml index c97fae5f4..20c407487 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -140,6 +140,7 @@ zip-safe = true "streamflow.deployment.connector" = ["schemas/base/*.json", "schemas/*.json"] "streamflow.deployment.filter" = ["schemas/*.json"] "streamflow.persistence" = ["schemas/*.sql", "schemas/*.json"] +"streamflow.provenance" = ["schemas/**/*.json"] "streamflow.recovery" = ["schemas/*.json"] "streamflow.scheduling" = ["schemas/*.json"] "streamflow.scheduling.policy" = ["schemas/*.json"] diff --git a/streamflow/ext/plugin.py b/streamflow/ext/plugin.py index a9f7f2811..20c066b6d 100644 --- a/streamflow/ext/plugin.py +++ b/streamflow/ext/plugin.py @@ -8,6 +8,7 @@ from streamflow.core.data import DataManager from streamflow.core.deployment import BindingFilter, Connector, DeploymentManager from streamflow.core.persistence import Database +from streamflow.core.provenance import ProvenanceManager from streamflow.core.recovery import CheckpointManager, FailureManager from streamflow.core.scheduling import Policy, Scheduler from streamflow.cwl.requirement.docker import cwl_docker_translator_classes @@ -18,6 +19,7 @@ from streamflow.deployment.filter import binding_filter_classes from streamflow.log_handler import logger from streamflow.persistence import database_classes +from streamflow.provenance import provenance_manager_classes from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes from streamflow.scheduling import scheduler_classes from streamflow.scheduling.policy import policy_classes @@ -38,6 +40,7 @@ "deployment_manager": deployment_manager_classes, "failure_manager": failure_manager_classes, "policy": policy_classes, + "provenance_manager": provenance_manager_classes, "scheduler": scheduler_classes, } @@ -98,6 +101,9 @@ def register_failure_manager(self, name: str, cls: type[FailureManager]) -> None def register_policy(self, name: str, cls: type[Policy]) -> None: self._register(name, cls, "policy") + def register_provenance_manager(self, name: str, cls: type[ProvenanceManager]) -> None: + self._register(name, cls, "provenance_manager") + def register_scheduler(self, name: str, cls: type[Scheduler]) -> None: self._register(name, cls, "scheduler") diff --git a/streamflow/main.py b/streamflow/main.py index 48f8860f7..99692d752 100644 --- a/streamflow/main.py +++ b/streamflow/main.py @@ -32,7 +32,7 @@ from streamflow.parser import parser from streamflow.persistence import database_classes from streamflow.persistence.loading_context import DefaultDatabaseLoadingContext -from streamflow.provenance import prov_classes +from streamflow.provenance import provenance_manager_classes from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes from streamflow.scheduling import scheduler_classes from streamflow.version import VERSION @@ -137,20 +137,20 @@ async def _async_prov(args: argparse.Namespace) -> None: f"Workflow {args.workflow} is associated to the following types: {','.join(wf_type)}" ) wf_type = next(iter(wf_type)) - if args.type not in prov_classes: + if args.type not in provenance_manager_classes: raise WorkflowProvenanceException( f"{args.type} provenance format is not supported." ) - elif wf_type not in prov_classes[args.type]: + elif wf_type not in provenance_manager_classes[args.type]: raise WorkflowProvenanceException( "{} provenance format is not supported for workflows of type {}.".format( args.type, wf_type ) ) else: - provenance_manager: ProvenanceManager = prov_classes[args.type][wf_type]( - context, db_context, workflows - ) + provenance_manager: ProvenanceManager = provenance_manager_classes[ + args.type + ][wf_type](context, db_context, workflows) await provenance_manager.create_archive( outdir=args.outdir, filename=args.name, diff --git a/streamflow/provenance/__init__.py b/streamflow/provenance/__init__.py index 420c1337f..4ed62c19f 100644 --- a/streamflow/provenance/__init__.py +++ b/streamflow/provenance/__init__.py @@ -5,6 +5,6 @@ from streamflow.core.provenance import ProvenanceManager from streamflow.provenance.run_crate import CWLRunCrateProvenanceManager -prov_classes: MutableMapping[str, MutableMapping[str, type[ProvenanceManager]]] = { - "run_crate": {"cwl": CWLRunCrateProvenanceManager} +provenance_manager_classes: MutableMapping[str, type[ProvenanceManager]] = { + "run_crate/cwl": CWLRunCrateProvenanceManager } diff --git a/streamflow/provenance/run_crate.py b/streamflow/provenance/run_crate.py index a9e9f1acc..c1c371fd6 100644 --- a/streamflow/provenance/run_crate.py +++ b/streamflow/provenance/run_crate.py @@ -12,7 +12,8 @@ import uuid from abc import ABC, abstractmethod from collections.abc import Container, MutableMapping, MutableSequence -from typing import Any, cast +from importlib.resources import files +from typing import Any, cast, get_args from zipfile import ZipFile import cwl_utils.parser @@ -1471,6 +1472,16 @@ async def get_main_entity(self) -> MutableMapping[str, Any]: ) return main_entity + @classmethod + def get_schema(cls) -> str: + return ( + files(__package__) + .joinpath("schemas") + .joinpath("run_crate") + .joinpath("cwl.json") + .read_text("utf-8") + ) + async def get_property_value( self, name: str, token: Token ) -> MutableMapping[str, Any] | None: diff --git a/streamflow/provenance/schemas/run_crate/cwl.json b/streamflow/provenance/schemas/run_crate/cwl.json new file mode 100644 index 000000000..cf8ab6dfd --- /dev/null +++ b/streamflow/provenance/schemas/run_crate/cwl.json @@ -0,0 +1,7 @@ +{ + "$schema": "https://json-schema.org/draft/2019-09/schema", + "$id": "https://streamflow.di.unito.it/schemas/provenance/run_crate/cwl.json", + "type": "object", + "properties": {}, + "additionalProperties": false +} \ No newline at end of file From 8854d5af5df676b71312f7f5e6c99e4587a33d2f Mon Sep 17 00:00:00 2001 From: LanderOtto Date: Wed, 10 Jun 2026 10:41:14 +0200 Subject: [PATCH 2/2] Update parser --- CHANGELOG.md | 1 + streamflow/ext/plugin.py | 4 +++- streamflow/main.py | 13 +++---------- streamflow/parser.py | 5 ++--- streamflow/provenance/run_crate.py | 2 +- 5 files changed, 10 insertions(+), 15 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index bb54b530d..c0e2521e9 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Added +- Added `provenance_manager` extension point ([#714](https://github.com/alpha-unito/streamflow/pull/714)) - Enforce `CHANGELOG.md` updates in PRs ([#1066](https://github.com/alpha-unito/streamflow/pull/1066)) - Add MPI application tutorial and docs agent skills ([#1042](https://github.com/alpha-unito/streamflow/pull/1042)) - Add fault tolerance paper reference in the documentation ([#1049](https://github.com/alpha-unito/streamflow/pull/1049)) diff --git a/streamflow/ext/plugin.py b/streamflow/ext/plugin.py index 20c066b6d..f8ec008d1 100644 --- a/streamflow/ext/plugin.py +++ b/streamflow/ext/plugin.py @@ -101,7 +101,9 @@ def register_failure_manager(self, name: str, cls: type[FailureManager]) -> None def register_policy(self, name: str, cls: type[Policy]) -> None: self._register(name, cls, "policy") - def register_provenance_manager(self, name: str, cls: type[ProvenanceManager]) -> None: + def register_provenance_manager( + self, name: str, cls: type[ProvenanceManager] + ) -> None: self._register(name, cls, "provenance_manager") def register_scheduler(self, name: str, cls: type[Scheduler]) -> None: diff --git a/streamflow/main.py b/streamflow/main.py index 99692d752..8801bda7d 100644 --- a/streamflow/main.py +++ b/streamflow/main.py @@ -114,6 +114,7 @@ async def _async_plugin(args: argparse.Namespace) -> None: async def _async_prov(args: argparse.Namespace) -> None: + load_extensions() context = _get_context_from_config(args.file) try: db_context = DefaultDatabaseLoadingContext(database=context.database) @@ -130,27 +131,19 @@ async def _async_prov(args: argparse.Namespace) -> None: ) ) ) - wf_type = {w.type for w in workflows} - if len(wf_type) != 1: + if len(wf_type := {w.type for w in workflows}) != 1: raise WorkflowProvenanceException( "Cannot mix different provenance types in the same file. " f"Workflow {args.workflow} is associated to the following types: {','.join(wf_type)}" ) - wf_type = next(iter(wf_type)) if args.type not in provenance_manager_classes: raise WorkflowProvenanceException( f"{args.type} provenance format is not supported." ) - elif wf_type not in provenance_manager_classes[args.type]: - raise WorkflowProvenanceException( - "{} provenance format is not supported for workflows of type {}.".format( - args.type, wf_type - ) - ) else: provenance_manager: ProvenanceManager = provenance_manager_classes[ args.type - ][wf_type](context, db_context, workflows) + ](context, db_context, workflows) await provenance_manager.create_archive( outdir=args.outdir, filename=args.name, diff --git a/streamflow/parser.py b/streamflow/parser.py index 74b360b0d..72926c002 100644 --- a/streamflow/parser.py +++ b/streamflow/parser.py @@ -166,10 +166,9 @@ def __call__( prov_parser.add_argument( "--type", "-t", - default="run_crate", + default="run_crate/cwl", type=str, - choices=["run_crate"], - help="The type of provenance archive to generate (default: run_crate)", + help="The type of provenance archive to generate (default: run_crate/cwl)", ) # streamflow report diff --git a/streamflow/provenance/run_crate.py b/streamflow/provenance/run_crate.py index c1c371fd6..cf4eb51eb 100644 --- a/streamflow/provenance/run_crate.py +++ b/streamflow/provenance/run_crate.py @@ -13,7 +13,7 @@ from abc import ABC, abstractmethod from collections.abc import Container, MutableMapping, MutableSequence from importlib.resources import files -from typing import Any, cast, get_args +from typing import Any, cast from zipfile import ZipFile import cwl_utils.parser