Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions kratix_sdk/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
set_input_dir,
set_metadata_dir,
set_output_dir,
timedelta_to_go_duration,
)
from .promise import Promise
from .resource import Resource
Expand All @@ -34,5 +35,6 @@
"get_output_dir",
"get_metadata_dir",
"set_metadata_dir",
"timedelta_to_go_duration",
"__version__",
]
57 changes: 57 additions & 0 deletions kratix_sdk/kratix_sdk.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
from datetime import timedelta
from pathlib import Path

import yaml
Expand Down Expand Up @@ -42,6 +43,32 @@ def set_metadata_dir(path: Path | str) -> None:
METADATA_DIR = Path(path)


def timedelta_to_go_duration(td: timedelta) -> str:
"""Converts a Python timedelta to a Go duration string (e.g. "1h30m5s300ms")."""
total_microseconds = (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds
if total_microseconds <= 0:
raise ValueError("duration must be positive")

hours, remainder = divmod(total_microseconds, 3_600_000_000)
minutes, remainder = divmod(remainder, 60_000_000)
seconds, remainder = divmod(remainder, 1_000_000)
milliseconds, microseconds = divmod(remainder, 1_000)

parts = []
if hours:
parts.append(f"{hours}h")
if minutes:
parts.append(f"{minutes}m")
if seconds:
parts.append(f"{seconds}s")
if milliseconds:
parts.append(f"{milliseconds}ms")
if microseconds:
parts.append(f"{microseconds}us")

return "".join(parts)


class KratixSDK:
def read_resource_input(self) -> Resource:
"""Reads the file in /kratix/input/object.yaml and returns a Resource.
Expand Down Expand Up @@ -174,3 +201,33 @@ def is_configure_action(self) -> bool:
def is_delete_action(self) -> bool:
"""Returns true if the workflow is a delete action."""
return self.workflow_action() == "delete"

def write_suspend(self, message: str = "") -> None:
"""Writes workflow-control.yaml with suspend: true.

Kratix will stop further pipeline execution and set the workflow phase to
Suspended.

If a message is provided, it will be surfaced in the object's status."""
data: dict = {"suspend": True}
if message:
data["message"] = message
self._write_workflow_control(data)

def write_retry_after(self, duration: timedelta, message: str = "") -> None:
"""Writes workflow-control.yaml with retryAfter set to the given duration.

Kratix will requeue the pipeline after the specified duration and increment
the attempt counter in the object's status.

If a message is provided, it will be surfaced in the object's status."""
data: dict = {"retryAfter": timedelta_to_go_duration(duration)}
if message:
data["message"] = message
self._write_workflow_control(data)

def _write_workflow_control(self, data: dict) -> None:
path = METADATA_DIR / "workflow-control.yaml"
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w") as f:
yaml.safe_dump(data, f)
113 changes: 113 additions & 0 deletions tests/test_kratix_sdk.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import timedelta
from pathlib import Path

import pytest
Expand Down Expand Up @@ -235,6 +236,118 @@ def test_is_delete_action(monkeypatch):
assert sdk.is_delete_action() is False


# ---------- timedelta_to_go_duration Tests ----------


@pytest.mark.parametrize(
"td,expected",
[
(timedelta(hours=1, minutes=30, seconds=5, milliseconds=300), "1h30m5s300ms"),
(timedelta(hours=1), "1h"),
(timedelta(minutes=30), "30m"),
(timedelta(seconds=5), "5s"),
(timedelta(milliseconds=300), "300ms"),
(timedelta(microseconds=500), "500us"),
(timedelta(days=1), "24h"),
(timedelta(hours=2, microseconds=1), "2h1us"),
(timedelta(milliseconds=1, microseconds=500), "1ms500us"),
(
timedelta(
hours=1, minutes=30, seconds=5, milliseconds=300, microseconds=123
),
"1h30m5s300ms123us",
),
],
)
def test_timedelta_to_go_duration(td, expected):
assert ks.timedelta_to_go_duration(td) == expected


def test_timedelta_to_go_duration_raises_on_zero():
with pytest.raises(ValueError):
ks.timedelta_to_go_duration(timedelta(seconds=0))


def test_timedelta_to_go_duration_raises_on_negative():
with pytest.raises(ValueError):
ks.timedelta_to_go_duration(timedelta(seconds=-1))


# ---------- Suspend / Retry Tests ----------


def test_write_suspend_writes_workflow_control():
sdk = ks.KratixSDK()

sdk.write_suspend()

written = yaml.safe_load(
(ks.get_metadata_dir() / "workflow-control.yaml").read_text()
)
assert written == {"suspend": True}


def test_write_suspend_with_message():
sdk = ks.KratixSDK()

sdk.write_suspend(message="waiting for dependency")

written = yaml.safe_load(
(ks.get_metadata_dir() / "workflow-control.yaml").read_text()
)
assert written == {"suspend": True, "message": "waiting for dependency"}


def test_write_retry_after_minutes():
sdk = ks.KratixSDK()

sdk.write_retry_after(timedelta(minutes=5, seconds=125))

written = yaml.safe_load(
(ks.get_metadata_dir() / "workflow-control.yaml").read_text()
)
assert written == {"retryAfter": "7m5s"}


def test_write_retry_after_days():
sdk = ks.KratixSDK()

sdk.write_retry_after(timedelta(days=1, hours=3, minutes=66))

written = yaml.safe_load(
(ks.get_metadata_dir() / "workflow-control.yaml").read_text()
)
assert written == {"retryAfter": "28h6m"}


def test_write_retry_after_hours():
sdk = ks.KratixSDK()

sdk.write_retry_after(timedelta(hours=3, seconds=65))

written = yaml.safe_load(
(ks.get_metadata_dir() / "workflow-control.yaml").read_text()
)
assert written == {"retryAfter": "3h1m5s"}


def test_write_retry_after_with_message():
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it worth a table test like so?

from datetime import timedelta
import pytest

def test_timedelta_to_go_duration():
    cases = [
        # (input, expected, description)
        (timedelta(hours=1, minutes=30, seconds=5, milliseconds=300), "1h30m5s300ms", "full combo"),
        (timedelta(hours=1),                                           "1h",           "hours only"),
        (timedelta(minutes=30),                                        "30m",          "minutes only"),
        (timedelta(seconds=5),                                         "5s",           "seconds only"),
        (timedelta(milliseconds=300),                                  "300ms",        "milliseconds only"),
        (timedelta(microseconds=500),                                  "500us",        "microseconds only"),
        (timedelta(days=1),                                            "24h",          "days converted to hours"),
        (timedelta(hours=2, microseconds=1),                           "2h1us",        "hours + microseconds, skips middle units"),
        (timedelta(milliseconds=1, microseconds=500),                  "1ms500us",     "ms and us combo"),
        (timedelta(hours=1, minutes=30, seconds=5, milliseconds=300,
                   microseconds=123),                                  "1h30m5s300ms123us", "all units"),
    ]

    print(f"\n{'Description':<45} {'Expected':<25} {'Got':<25} {'Pass'}")
    print("-" * 100)
    for td, expected, desc in cases:
        result = timedelta_to_go_duration(td)
        passed = "✅" if result == expected else "❌"
        print(f"{desc:<45} {expected:<25} {result:<25} {passed}")

def test_timedelta_to_go_duration_raises():
    with pytest.raises(ValueError):
        timedelta_to_go_duration(timedelta(seconds=0))
    with pytest.raises(ValueError):
        timedelta_to_go_duration(timedelta(seconds=-1))

sdk = ks.KratixSDK()

sdk.write_retry_after(timedelta(hours=1, minutes=30), message="configmap not found")

written = yaml.safe_load(
(ks.get_metadata_dir() / "workflow-control.yaml").read_text()
)
assert written == {"retryAfter": "1h30m", "message": "configmap not found"}


def test_write_retry_after_zero_duration_raises():
sdk = ks.KratixSDK()
with pytest.raises(ValueError):
sdk.write_retry_after(timedelta(0))


# ---------- Write to Output Tests ----------


Expand Down
Loading