1 change: 1 addition & 0 deletions .gitignore
@@ -1,4 +1,5 @@
.idea/
.nox/
__pycache__
*.egg-info/
bfabric/scripts/query_result.txt
bfabric_app_runner/src/bfabric_app_runner/output_registration/annotation_table.py
@@ -0,0 +1,73 @@
from __future__ import annotations

from typing import TYPE_CHECKING, assert_never

import polars as pl

if TYPE_CHECKING:
from pathlib import Path

from bfabric_app_runner.specs.outputs.annotations import BfabricOutputDataset, IncludeDatasetRef, IncludeResourceRef


def _validate_table_schema(df: pl.DataFrame) -> None:
"""Validate that required columns exist and have correct types."""
if "Resource" not in df.columns:
raise ValueError("Missing required column: Resource")

if not df["Resource"].dtype.is_integer():
raise ValueError(f"Column 'Resource' must be integer type, got {df['Resource'].dtype}")

if df["Resource"].null_count() > 0:
raise ValueError("Column 'Resource' cannot contain null values")

if "Anchor" in df.columns and df["Anchor"].dtype not in (pl.Null, pl.String):

⚠ GitHub Actions / Unit Tests warning (line 24): Expression will always evaluate to True since the types "DataType" and "type[Null] | type[String]" have no overlap (reportUnnecessaryContains)
raise ValueError(f"Column 'Anchor' must be String type, got {df['Anchor'].dtype}")


def _load_table(ref: IncludeDatasetRef) -> pl.DataFrame:
format = ref.get_format()

⚠ GitHub Actions / Unit Tests warning (line 29): Type of "get_format" is partially unknown: "() -> Unknown" (reportUnknownMemberType)

⚠ GitHub Actions / Unit Tests warning (line 29): Type of "format" is unknown (reportUnknownVariableType)
# TODO maybe this should be a generic function somewhere; it is probably duplicated in the input specs!
match format:
case "csv":
df = pl.read_csv(ref.local_path)
case "tsv":
df = pl.read_csv(ref.local_path, separator="\t")
case "parquet":
df = pl.read_parquet(ref.local_path)
case _:

⚠ GitHub Actions / Unit Tests warning (line 38): Type captured by wildcard pattern is unknown (reportUnknownVariableType)
assert_never(format)

⚠ GitHub Actions / Unit Tests warning (line 39): Argument type is unknown; it corresponds to parameter "arg" in function "assert_never" (reportUnknownArgumentType)
_validate_table_schema(df)
return df


def _get_resource_row(resource: IncludeResourceRef, resource_mapping: dict[Path, int]) -> dict[str, int | str | None]:
# TODO check if accessing store_entry_path like this is robust enough (see comment below)
resource_id = resource_mapping[resource.store_entry_path]
return {"Resource": resource_id, "Anchor": resource.anchor, **resource.metadata}


# TODO the resource mapping is easy to misuse with non-canonical paths as long as it is a plain dict rather than a model,
# e.g. keys should never start with `/`, but this could also be validated in the originating model


def generate_output_table(config: BfabricOutputDataset, resource_mapping: dict[Path, int]) -> pl.DataFrame:
"""Generates the output table contents for the specified output dataset configuration.

:param config: the output dataset configuration
:param resource_mapping: a mapping of store_entry_path to resource_id of the resources which were created
:return: the output table contents
"""
# read the input tables
tables_df = pl.concat([_load_table(ref) for ref in config.include_tables], how="diagonal_relaxed")

# generate the rows for the resources
resource_rows = []
for resource in config.include_resources:
resource_rows.append(_get_resource_row(resource, resource_mapping))

⚠ GitHub Actions / Unit Tests warning (line 67): Type of "append" is partially unknown: "(object: Unknown, /) -> None" (reportUnknownMemberType)
resources_df = pl.from_dicts(resource_rows, strict=False)

⚠ GitHub Actions / Unit Tests warning (line 68): Argument type is partially unknown: "list[Unknown]"; it corresponds to parameter "data" in function "from_dicts" (reportUnknownArgumentType)
_validate_table_schema(resources_df)

# TODO the schema handling could maybe be relaxed a tiny bit
# concatenate these two dataframes now
return pl.concat([tables_df, resources_df], how="diagonal_relaxed")
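
For orientation, a minimal usage sketch of generate_output_table (file names, ids, and anchors are illustrative; it assumes the annotation models validate as declared in annotations.py later in this diff):

    from pathlib import Path

    import polars as pl

    from bfabric_app_runner.output_registration.annotation_table import generate_output_table
    from bfabric_app_runner.specs.outputs.annotations import (
        BfabricOutputDataset,
        IncludeDatasetRef,
        IncludeResourceRef,
    )

    # An extra annotation table that references an already-known resource id.
    pl.DataFrame({"Resource": [101], "Anchor": ["sample_a"]}).write_csv("extra.csv")

    config = BfabricOutputDataset(
        type="bfabric_output_dataset",
        include_tables=[IncludeDatasetRef(local_path=Path("extra.csv"))],
        include_resources=[IncludeResourceRef(store_entry_path=Path("out/result.txt"), anchor="sample_b")],
    )

    # The mapping is what register_all produces: store_entry_path -> created resource id.
    table = generate_output_table(config=config, resource_mapping={Path("out/result.txt"): 102})
    print(table)  # two rows: one from extra.csv, one for the registered resource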
bfabric_app_runner/src/bfabric_app_runner/output_registration/register.py
@@ -1,26 +1,29 @@
from __future__ import annotations

from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, assert_never

import yaml
from bfabric.entities import Resource, Storage, Workunit
from bfabric.experimental.upload_dataset import bfabric_save_csv2dataset
from loguru import logger
from pydantic import BaseModel

from bfabric.entities import Resource
from bfabric.entities import Storage, Workunit
from bfabric.experimental.upload_dataset import bfabric_save_csv2dataset
from bfabric_app_runner.output_registration.annotation_table import generate_output_table
from bfabric_app_runner.specs.outputs.annotations import AnnotationType, BfabricOutputDataset
from bfabric_app_runner.specs.outputs_spec import (
CopyResourceSpec,
UpdateExisting,
OutputsSpec,
SpecType,
SaveDatasetSpec,
SaveLinkSpec,
SpecType,
UpdateExisting,
)
from bfabric_app_runner.util.checksums import md5sum
from bfabric_app_runner.util.scp import scp

if TYPE_CHECKING:
from pathlib import Path

from bfabric import Bfabric
from bfabric.experimental.workunit_definition import WorkunitDefinition

@@ -37,8 +40,8 @@
client: Bfabric,
workunit_definition: WorkunitDefinition,
resource_id: int | None = None,
) -> None:
"""Registers a file in the workunit."""
) -> int:
"""Registers a file in the workunit and returns the resource ID."""
existing_id = _identify_existing_resource_id(client, spec, workunit_definition)
if resource_id is not None and existing_id is not None and resource_id != existing_id:
raise ValueError(f"Resource id {resource_id} does not match existing resource id {existing_id}")
@@ -59,7 +62,8 @@
if existing_id is not None:
resource_data["id"] = existing_id

client.save("resource", resource_data)
result = client.save("resource", resource_data)
return result[0]["id"]

⚠ GitHub Actions / Unit Tests warning (line 66): Return type is Any (reportAny)
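The reportAny note could be silenced with an explicit cast at the return site, e.g. `return int(result[0]["id"])` (a sketch; it assumes the API returns an integral id).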


def _identify_existing_resource_id(
@@ -145,7 +149,7 @@
if existing_link_id is not None:
link_data["id"] = existing_link_id
res = client.save("link", link_data)
logger.info(f"Link {spec.name} saved with id {res['id']} for entity {entity_type} with id {entity_id}")
logger.info(f"Link {spec.name} saved with id {res[0]['id']} for entity {entity_type} with id {entity_id}")


def find_default_resource_id(workunit_definition: WorkunitDefinition, client: Bfabric) -> int | None:
@@ -160,20 +164,29 @@
return None


class RegisterAllOutputs(BaseModel):
resources_mapping: dict[Path, int] = {}
"""This maps the store_entry_path to the ids of the resources which were created or updated"""


def register_all(
client: Bfabric,
workunit_definition: WorkunitDefinition,
specs_list: list[SpecType],
ssh_user: str | None,
reuse_default_resource: bool,
force_storage: Path | None,
) -> None:
) -> RegisterAllOutputs:
"""Registers all the output specs to the workunit."""
default_resource_was_reused = not reuse_default_resource

storage = _get_storage(client, force_storage, specs_list, workunit_definition)
logger.info(f"Using storage: {storage}")

# TODO maybe the naming "created" is a bit confusing, because it could have existed beforehand
# created_resources_mapping: dict[Path, int] = {}
outputs = RegisterAllOutputs()

for spec in specs_list:
logger.debug(f"Registering {spec}")
if isinstance(spec, CopyResourceSpec):
@@ -188,7 +201,8 @@
default_resource_was_reused = True
else:
resource_id = None
register_file_in_workunit(

outputs.resources_mapping[spec.store_entry_path] = register_file_in_workunit(
spec,
client=client,
workunit_definition=workunit_definition,
@@ -199,7 +213,13 @@
elif isinstance(spec, SaveLinkSpec):
_save_link(spec, client, workunit_definition=workunit_definition)
else:
raise ValueError(f"Unknown spec type: {type(spec)}")
assert_never(type(spec))

return outputs

# TODO register the dataset... now this gets tricky: how should we handle the case where a dataset already exists?
# - it could still be modifiable, or it could no longer be modifiable
# -> recompute and then have logic for handling each case


def _get_storage(
Expand All @@ -215,6 +235,22 @@
return None


def _save_annotations(outputs: RegisterAllOutputs, annotations: list[AnnotationType], client: Bfabric) -> None:

⚠ GitHub Actions / Unit Tests warning (line 238): "client" is not accessed (reportUnusedParameter)
for annotation in annotations:
if isinstance(annotation, BfabricOutputDataset):
output_df = generate_output_table(config=annotation, resource_mapping=outputs.resources_mapping)

# TODO we should also check if there is an id already
existing_dataset = None
# TODO we may need to handle this with the old save/update logic from savedataset,
# TODO but it will need some logic for handling an already-existing dataset
# TODO the final step will be attaching this dataset to the workunit

_ = output_df
_ = existing_dataset
raise NotImplementedError


def register_outputs(
outputs_yaml: Path,
workunit_definition: WorkunitDefinition,
@@ -224,12 +260,16 @@
force_storage: Path | None,
) -> None:
"""Registers outputs to the workunit."""
specs_list = OutputsSpec.read_yaml(outputs_yaml)
register_all(
with outputs_yaml.open("r") as file:
spec = OutputsSpec.model_validate(yaml.safe_load(file))

outputs = register_all(
client=client,
workunit_definition=workunit_definition,
specs_list=specs_list,
specs_list=spec.outputs,
ssh_user=ssh_user,
reuse_default_resource=reuse_default_resource,
force_storage=force_storage,
)
# TODO and here we create the dataset mapping
_save_annotations(outputs, spec.annotations, client=client)
@@ -11,6 +11,7 @@ class StaticFileSpec(BaseModel):
type: Literal["static_file"] = "static_file"

content: str | bytes

"""The text or binary content to write."""
filename: str
"""The target filename to write to."""
Empty file.
bfabric_app_runner/src/bfabric_app_runner/specs/outputs/annotations.py
@@ -0,0 +1,52 @@
from __future__ import annotations

✗ GitHub Actions / Unit Tests failure (line 1): Cycle detected in import chain: bfabric_app_runner/src/bfabric_app_runner/specs/outputs/annotations.py -> bfabric_app_runner/src/bfabric_app_runner/specs/outputs_spec.py (reportImportCycles)

import typing
from pathlib import Path
from typing import ClassVar, Literal

from pydantic import BaseModel, model_validator

from bfabric_app_runner.specs.outputs_spec import UpdateExisting


class IncludeDatasetRef(BaseModel):
Formats: ClassVar = Literal["csv", "tsv", "parquet"]

local_path: Path
format: Formats | None = None

✗ GitHub Actions / Unit Tests failure (line 16): Variable not allowed in type expression (reportInvalidTypeForm)

# TODO decide if this is the correct place or it should be a level higher
update_existing: UpdateExisting = UpdateExisting.IF_EXISTS

def get_format(self) -> Formats:

✗ GitHub Actions / Unit Tests failure (line 21): Variable not allowed in type expression (reportInvalidTypeForm)
"""Returns the format inferring the type from the filename if not specified explicitly."""
if self.format is not None:
return self.format
return self.local_path.suffix.removeprefix(".")

@model_validator(mode="after")
def _check_format_or_correct_suffix(self) -> IncludeDatasetRef:
allowed_formats = typing.get_args(self.Formats)
if self.format is None and self.local_path.suffix.removeprefix(".") not in allowed_formats:
msg = f"When format is not specified, the file extension must be one of {allowed_formats}"
raise ValueError(msg)
return self


class IncludeResourceRef(BaseModel):
store_entry_path: Path
# TODO None vs empty string
anchor: str | None = None
metadata: dict[str, str] = {}


class BfabricOutputDataset(BaseModel):
# TODO since there is only one output annotation, we cannot set the default value yet, because
# adding more types later would be a breaking change otherwise.
# type: Literal["bfabric_output_dataset"] = "bfabric_output_dataset"
type: Literal["bfabric_output_dataset"]
include_tables: list[IncludeDatasetRef]
include_resources: list[IncludeResourceRef]


AnnotationType = BfabricOutputDataset
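
A short behavioral sketch of IncludeDatasetRef (paths are illustrative; it assumes pydantic surfaces the validator error as a ValidationError, as usual):

    from pathlib import Path

    from pydantic import ValidationError

    from bfabric_app_runner.specs.outputs.annotations import IncludeDatasetRef

    # The format is inferred from the file suffix when not given explicitly.
    ref = IncludeDatasetRef(local_path=Path("tables/summary.parquet"))
    assert ref.get_format() == "parquet"

    # An unrecognized suffix without an explicit format is rejected at validation time.
    try:
        IncludeDatasetRef(local_path=Path("tables/summary.txt"))
    except ValidationError as err:
        print(err)  # ... file extension must be one of ('csv', 'tsv', 'parquet')

As for the reportImportCycles failure above: one option (an assumption, not the author's stated plan) would be moving UpdateExisting into a small shared module that both annotations.py and outputs_spec.py import.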
29 changes: 24 additions & 5 deletions bfabric_app_runner/src/bfabric_app_runner/specs/outputs_spec.py
@@ -1,12 +1,15 @@
from __future__ import annotations

import enum
from collections import Counter
from pathlib import Path # noqa: TCH003
from typing import Literal, Annotated
from typing import Annotated, Literal

import yaml
from pydantic import BaseModel, ConfigDict, Field, model_validator

from bfabric_app_runner.specs.outputs.annotations import AnnotationType


class UpdateExisting(enum.Enum):
NO = "no"
@@ -45,6 +48,7 @@
invalid_characters: str = ""


# TODO deprecate!
class SaveLinkSpec(BaseModel):
"""Saves a link to the workunit, or, if desired to an arbitrary entity of type entity_type with id entity_id."""

@@ -74,13 +78,28 @@
class OutputsSpec(BaseModel):
model_config = ConfigDict(extra="forbid")
outputs: list[SpecType]
annotations: list[AnnotationType] = []

@classmethod
def read_yaml(cls, path: Path) -> list[SpecType]:
model = cls.model_validate(yaml.safe_load(path.read_text()))
return model.outputs
# @classmethod
# def read_yaml(cls, path: Path) -> list[SpecType]:
# model = cls.model_validate(yaml.safe_load(path.read_text()))
# return model.outputs

@classmethod
def write_yaml(cls, specs: list[SpecType], path: Path) -> None:
model = cls.model_validate(dict(outputs=specs))
path.write_text(yaml.dump(model.model_dump(mode="json")))

@model_validator(mode="after")
def _no_duplicate_store_entry_paths(self) -> OutputsSpec:
# arguably, this is a bit strict and might be relaxed in the future
# TODO although the main scenario where this could be a problem is when store_folder_path is used and maybe we
# should handle this differently?
resource_specs = list(filter(lambda x: isinstance(x, CopyResourceSpec), self.outputs))
store_entry_paths = [spec.store_entry_path for spec in resource_specs]

✗ GitHub Actions / Unit Tests failure (line 99): Cannot access attribute "store_entry_path" for class "SaveLinkSpec" (reportAttributeAccessIssue)
✗ GitHub Actions / Unit Tests failure (line 99): Cannot access attribute "store_entry_path" for class "SaveDatasetSpec" (reportAttributeAccessIssue)
# check for duplicates
duplicates = [path for path, count in Counter(store_entry_paths).items() if count > 1]
if duplicates:
msg = f"Duplicate store entry paths: {duplicates}"
raise ValueError(msg)
return self
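
The two reportAttributeAccessIssue failures above stem from filter() with a lambda, which pyright does not narrow; a comprehension-based variant (a sketch, not necessarily the intended fix) narrows each element to CopyResourceSpec:

    @model_validator(mode="after")
    def _no_duplicate_store_entry_paths(self) -> OutputsSpec:
        # isinstance() inside a comprehension narrows the element type,
        # so pyright knows store_entry_path exists on every spec in the list.
        resource_specs = [spec for spec in self.outputs if isinstance(spec, CopyResourceSpec)]
        store_entry_paths = [spec.store_entry_path for spec in resource_specs]
        duplicates = [path for path, count in Counter(store_entry_paths).items() if count > 1]
        if duplicates:
            raise ValueError(f"Duplicate store entry paths: {duplicates}")
        return self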