Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0

### Added

- Add `provenance_manager` extension point ([#714](https://github.com/alpha-unito/streamflow/pull/714))
- Enforce `CHANGELOG.md` updates in PRs ([#1066](https://github.com/alpha-unito/streamflow/pull/1066))
- Add MPI application tutorial and docs agent skills ([#1042](https://github.com/alpha-unito/streamflow/pull/1042))
- Add fault tolerance paper reference in the documentation ([#1049](https://github.com/alpha-unito/streamflow/pull/1049))
Expand Down
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ zip-safe = true
"streamflow.deployment.connector" = ["schemas/base/*.json", "schemas/*.json"]
"streamflow.deployment.filter" = ["schemas/*.json"]
"streamflow.persistence" = ["schemas/*.sql", "schemas/*.json"]
"streamflow.provenance" = ["schemas/**/*.json"]
"streamflow.recovery" = ["schemas/*.json"]
"streamflow.scheduling" = ["schemas/*.json"]
"streamflow.scheduling.policy" = ["schemas/*.json"]
Expand Down
8 changes: 8 additions & 0 deletions streamflow/ext/plugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from streamflow.core.data import DataManager
from streamflow.core.deployment import BindingFilter, Connector, DeploymentManager
from streamflow.core.persistence import Database
from streamflow.core.provenance import ProvenanceManager
from streamflow.core.recovery import CheckpointManager, FailureManager
from streamflow.core.scheduling import Policy, Scheduler
from streamflow.cwl.requirement.docker import cwl_docker_translator_classes
Expand All @@ -18,6 +19,7 @@
from streamflow.deployment.filter import binding_filter_classes
from streamflow.log_handler import logger
from streamflow.persistence import database_classes
from streamflow.provenance import provenance_manager_classes
from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes
from streamflow.scheduling import scheduler_classes
from streamflow.scheduling.policy import policy_classes
Expand All @@ -38,6 +40,7 @@
"deployment_manager": deployment_manager_classes,
"failure_manager": failure_manager_classes,
"policy": policy_classes,
"provenance_manager": provenance_manager_classes,
"scheduler": scheduler_classes,
}

Expand Down Expand Up @@ -98,6 +101,11 @@ def register_failure_manager(self, name: str, cls: type[FailureManager]) -> None
def register_policy(self, name: str, cls: type[Policy]) -> None:
    """Register ``cls`` under ``name`` using the ``"policy"`` category key.

    The call is forwarded unchanged to ``self._register``.

    :param name: the name under which the class is registered
    :param cls: the class to register
    """
    category = "policy"
    self._register(name, cls, category)

def register_provenance_manager(
    self, name: str, cls: type[ProvenanceManager]
) -> None:
    """Register ``cls`` under ``name`` using the ``"provenance_manager"`` category key.

    The call is forwarded unchanged to ``self._register``.

    :param name: the name under which the class is registered
    :param cls: the class to register
    """
    category = "provenance_manager"
    self._register(name, cls, category)

def register_scheduler(self, name: str, cls: type[Scheduler]) -> None:
    """Register ``cls`` under ``name`` using the ``"scheduler"`` category key.

    The call is forwarded unchanged to ``self._register``.

    :param name: the name under which the class is registered
    :param cls: the class to register
    """
    category = "scheduler"
    self._register(name, cls, category)

Expand Down
21 changes: 7 additions & 14 deletions streamflow/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
from streamflow.parser import parser
from streamflow.persistence import database_classes
from streamflow.persistence.loading_context import DefaultDatabaseLoadingContext
from streamflow.provenance import prov_classes
from streamflow.provenance import provenance_manager_classes
from streamflow.recovery import checkpoint_manager_classes, failure_manager_classes
from streamflow.scheduling import scheduler_classes
from streamflow.version import VERSION
Expand Down Expand Up @@ -114,6 +114,7 @@ async def _async_plugin(args: argparse.Namespace) -> None:


async def _async_prov(args: argparse.Namespace) -> None:
load_extensions()
context = _get_context_from_config(args.file)
try:
db_context = DefaultDatabaseLoadingContext(database=context.database)
Expand All @@ -130,27 +131,19 @@ async def _async_prov(args: argparse.Namespace) -> None:
)
)
)
wf_type = {w.type for w in workflows}
if len(wf_type) != 1:
if len(wf_type := {w.type for w in workflows}) != 1:
raise WorkflowProvenanceException(
"Cannot mix different provenance types in the same file. "
f"Workflow {args.workflow} is associated to the following types: {','.join(wf_type)}"
)
wf_type = next(iter(wf_type))
if args.type not in prov_classes:
if args.type not in provenance_manager_classes:
raise WorkflowProvenanceException(
f"{args.type} provenance format is not supported."
)
elif wf_type not in prov_classes[args.type]:
raise WorkflowProvenanceException(
"{} provenance format is not supported for workflows of type {}.".format(
args.type, wf_type
)
)
else:
provenance_manager: ProvenanceManager = prov_classes[args.type][wf_type](
context, db_context, workflows
)
provenance_manager: ProvenanceManager = provenance_manager_classes[
args.type
](context, db_context, workflows)
await provenance_manager.create_archive(
outdir=args.outdir,
filename=args.name,
Expand Down
5 changes: 2 additions & 3 deletions streamflow/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -166,10 +166,9 @@ def __call__(
prov_parser.add_argument(
"--type",
"-t",
default="run_crate",
default="run_crate/cwl",
type=str,
choices=["run_crate"],
help="The type of provenance archive to generate (default: run_crate)",
help="The type of provenance archive to generate (default: run_crate/cwl)",
)

# streamflow report
Expand Down
4 changes: 2 additions & 2 deletions streamflow/provenance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@
from streamflow.core.provenance import ProvenanceManager
from streamflow.provenance.run_crate import CWLRunCrateProvenanceManager

prov_classes: MutableMapping[str, MutableMapping[str, type[ProvenanceManager]]] = {
"run_crate": {"cwl": CWLRunCrateProvenanceManager}
provenance_manager_classes: MutableMapping[str, type[ProvenanceManager]] = {
"run_crate/cwl": CWLRunCrateProvenanceManager
}
11 changes: 11 additions & 0 deletions streamflow/provenance/run_crate.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import uuid
from abc import ABC, abstractmethod
from collections.abc import Container, MutableMapping, MutableSequence
from importlib.resources import files
from typing import Any, cast
from zipfile import ZipFile

Expand Down Expand Up @@ -1471,6 +1472,16 @@ async def get_main_entity(self) -> MutableMapping[str, Any]:
)
return main_entity

@classmethod
def get_schema(cls) -> str:
    """Return the text of the packaged ``schemas/run_crate/cwl.json`` resource.

    The resource is resolved relative to this module's package and decoded
    as UTF-8.

    :return: the content of the JSON schema file as a string
    """
    # Walk the resource tree one segment at a time, starting from the package root.
    schema_dir = files(__package__).joinpath("schemas")
    run_crate_dir = schema_dir.joinpath("run_crate")
    schema_file = run_crate_dir.joinpath("cwl.json")
    return schema_file.read_text("utf-8")

async def get_property_value(
self, name: str, token: Token
) -> MutableMapping[str, Any] | None:
Expand Down
7 changes: 7 additions & 0 deletions streamflow/provenance/schemas/run_crate/cwl.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
{
"$schema": "https://json-schema.org/draft/2019-09/schema",
"$id": "https://streamflow.di.unito.it/schemas/provenance/run_crate/cwl.json",
"type": "object",
"properties": {},
"additionalProperties": false
}
Loading