From eaa6c347612f07ef689475305bdb677a096c53a0 Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Thu, 26 Mar 2026 14:46:04 +0000 Subject: [PATCH 1/5] feat: add helpers for suspend and retry pipelines --- kratix_sdk/kratix_sdk.py | 31 +++++++++++++++++++++++++++ tests/test_kratix_sdk.py | 45 ++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/kratix_sdk/kratix_sdk.py b/kratix_sdk/kratix_sdk.py index ccd3eec..75debfc 100644 --- a/kratix_sdk/kratix_sdk.py +++ b/kratix_sdk/kratix_sdk.py @@ -174,3 +174,34 @@ def is_configure_action(self) -> bool: def is_delete_action(self) -> bool: """Returns true if the workflow is a delete action.""" return self.workflow_action() == "delete" + + def suspend(self, message: str = "") -> None: + """Suspends the pipeline by writing workflow-control.yaml with suspend: true. + + Kratix will stop further pipeline execution and set the workflow phase to + Suspended. If a message is provided, it will be surfaced in the object's status.""" + data: dict = {"suspend": True} + if message: + data["message"] = message + self._write_workflow_control(data) + + def retry_after(self, duration: str, message: str = "") -> None: + """Configures the pipeline to be retried after a given duration. + + The duration must be a valid Go duration string (e.g. "5m", "1h30m", "300ms"). + Kratix will requeue the pipeline after the specified duration and increment + the attempt counter in the object's status. + + If a message is provided, it will be surfaced in the object's status.""" + if not duration: + raise ValueError("duration must be a non-empty string") + data: dict = {"retryAfter": duration} + if message: + data["message"] = message + self._write_workflow_control(data) + + def _write_workflow_control(self, data: dict) -> None: + path = METADATA_DIR / "workflow-control.yaml" + path.parent.mkdir(parents=True, exist_ok=True) + with path.open("w") as f: + yaml.safe_dump(data, f) diff --git a/tests/test_kratix_sdk.py b/tests/test_kratix_sdk.py index e78795c..d9076d1 100644 --- a/tests/test_kratix_sdk.py +++ b/tests/test_kratix_sdk.py @@ -235,6 +235,51 @@ def test_is_delete_action(monkeypatch): assert sdk.is_delete_action() is False +# ---------- Suspend / Retry Tests ---------- + + +def test_suspend_writes_workflow_control(): + sdk = ks.KratixSDK() + + sdk.suspend() + + written = yaml.safe_load((ks.get_metadata_dir() / "workflow-control.yaml").read_text()) + assert written == {"suspend": True} + + +def test_suspend_with_message(): + sdk = ks.KratixSDK() + + sdk.suspend(message="waiting for dependency") + + written = yaml.safe_load((ks.get_metadata_dir() / "workflow-control.yaml").read_text()) + assert written == {"suspend": True, "message": "waiting for dependency"} + + +def test_retry_after_writes_workflow_control(): + sdk = ks.KratixSDK() + + sdk.retry_after("5m") + + written = yaml.safe_load((ks.get_metadata_dir() / "workflow-control.yaml").read_text()) + assert written == {"retryAfter": "5m"} + + +def test_retry_after_with_message(): + sdk = ks.KratixSDK() + + sdk.retry_after("1h30m", message="configmap not found yet") + + written = yaml.safe_load((ks.get_metadata_dir() / "workflow-control.yaml").read_text()) + assert written == {"retryAfter": "1h30m", "message": "configmap not found yet"} + + +def test_retry_after_empty_duration_raises(): + sdk = ks.KratixSDK() + with pytest.raises(ValueError): + sdk.retry_after("") + + # ---------- Write to Output Tests ---------- From 3b90f9ddd3b5121566931967c6941f0da04f0c2a Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Thu, 26 Mar 2026 15:10:25 +0000 Subject: [PATCH 2/5] chore: fmt --- kratix_sdk/kratix_sdk.py | 4 +++- tests/test_kratix_sdk.py | 16 ++++++++++++---- 2 files changed, 15 insertions(+), 5 deletions(-) diff --git a/kratix_sdk/kratix_sdk.py b/kratix_sdk/kratix_sdk.py index 75debfc..3c9af96 100644 --- a/kratix_sdk/kratix_sdk.py +++ b/kratix_sdk/kratix_sdk.py @@ -179,7 +179,9 @@ def suspend(self, message: str = "") -> None: """Suspends the pipeline by writing workflow-control.yaml with suspend: true. Kratix will stop further pipeline execution and set the workflow phase to - Suspended. If a message is provided, it will be surfaced in the object's status.""" + Suspended. + + If a message is provided, it will be surfaced in the object's status.""" data: dict = {"suspend": True} if message: data["message"] = message diff --git a/tests/test_kratix_sdk.py b/tests/test_kratix_sdk.py index d9076d1..f6639d9 100644 --- a/tests/test_kratix_sdk.py +++ b/tests/test_kratix_sdk.py @@ -243,7 +243,9 @@ def test_suspend_writes_workflow_control(): sdk.suspend() - written = yaml.safe_load((ks.get_metadata_dir() / "workflow-control.yaml").read_text()) + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) assert written == {"suspend": True} @@ -252,7 +254,9 @@ def test_suspend_with_message(): sdk.suspend(message="waiting for dependency") - written = yaml.safe_load((ks.get_metadata_dir() / "workflow-control.yaml").read_text()) + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) assert written == {"suspend": True, "message": "waiting for dependency"} @@ -261,7 +265,9 @@ def test_retry_after_writes_workflow_control(): sdk.retry_after("5m") - written = yaml.safe_load((ks.get_metadata_dir() / "workflow-control.yaml").read_text()) + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) assert written == {"retryAfter": "5m"} @@ -270,7 +276,9 @@ def test_retry_after_with_message(): sdk.retry_after("1h30m", message="configmap not found yet") - written = yaml.safe_load((ks.get_metadata_dir() / "workflow-control.yaml").read_text()) + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) assert written == {"retryAfter": "1h30m", "message": "configmap not found yet"} From e36c05e3be64d3173926c905ac74fdd1f791b208 Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Wed, 1 Apr 2026 12:00:07 +0100 Subject: [PATCH 3/5] chore: rename, final edition --- kratix_sdk/kratix_sdk.py | 8 ++++---- tests/test_kratix_sdk.py | 20 ++++++++++---------- 2 files changed, 14 insertions(+), 14 deletions(-) diff --git a/kratix_sdk/kratix_sdk.py b/kratix_sdk/kratix_sdk.py index 3c9af96..42fcf4b 100644 --- a/kratix_sdk/kratix_sdk.py +++ b/kratix_sdk/kratix_sdk.py @@ -175,8 +175,8 @@ def is_delete_action(self) -> bool: """Returns true if the workflow is a delete action.""" return self.workflow_action() == "delete" - def suspend(self, message: str = "") -> None: - """Suspends the pipeline by writing workflow-control.yaml with suspend: true. + def write_suspend(self, message: str = "") -> None: + """Writes workflow-control.yaml with suspend: true. Kratix will stop further pipeline execution and set the workflow phase to Suspended. @@ -187,8 +187,8 @@ def suspend(self, message: str = "") -> None: data["message"] = message self._write_workflow_control(data) - def retry_after(self, duration: str, message: str = "") -> None: - """Configures the pipeline to be retried after a given duration. + def write_retry_after(self, duration: str, message: str = "") -> None: + """Writes workflow-control.yaml with retryAfter set to the given duration. The duration must be a valid Go duration string (e.g. "5m", "1h30m", "300ms"). Kratix will requeue the pipeline after the specified duration and increment diff --git a/tests/test_kratix_sdk.py b/tests/test_kratix_sdk.py index f6639d9..f7e436e 100644 --- a/tests/test_kratix_sdk.py +++ b/tests/test_kratix_sdk.py @@ -238,10 +238,10 @@ def test_is_delete_action(monkeypatch): # ---------- Suspend / Retry Tests ---------- -def test_suspend_writes_workflow_control(): +def test_write_suspend_writes_workflow_control(): sdk = ks.KratixSDK() - sdk.suspend() + sdk.write_suspend() written = yaml.safe_load( (ks.get_metadata_dir() / "workflow-control.yaml").read_text() @@ -249,10 +249,10 @@ def test_suspend_writes_workflow_control(): assert written == {"suspend": True} -def test_suspend_with_message(): +def test_write_suspend_with_message(): sdk = ks.KratixSDK() - sdk.suspend(message="waiting for dependency") + sdk.write_suspend(message="waiting for dependency") written = yaml.safe_load( (ks.get_metadata_dir() / "workflow-control.yaml").read_text() @@ -260,10 +260,10 @@ def test_suspend_with_message(): assert written == {"suspend": True, "message": "waiting for dependency"} -def test_retry_after_writes_workflow_control(): +def test_write_retry_after_writes_workflow_control(): sdk = ks.KratixSDK() - sdk.retry_after("5m") + sdk.write_retry_after("5m") written = yaml.safe_load( (ks.get_metadata_dir() / "workflow-control.yaml").read_text() @@ -271,10 +271,10 @@ def test_retry_after_writes_workflow_control(): assert written == {"retryAfter": "5m"} -def test_retry_after_with_message(): +def test_write_retry_after_with_message(): sdk = ks.KratixSDK() - sdk.retry_after("1h30m", message="configmap not found yet") + sdk.write_retry_after("1h30m", message="configmap not found yet") written = yaml.safe_load( (ks.get_metadata_dir() / "workflow-control.yaml").read_text() @@ -282,10 +282,10 @@ def test_retry_after_with_message(): assert written == {"retryAfter": "1h30m", "message": "configmap not found yet"} -def test_retry_after_empty_duration_raises(): +def test_write_retry_after_empty_duration_raises(): sdk = ks.KratixSDK() with pytest.raises(ValueError): - sdk.retry_after("") + sdk.write_retry_after("") # ---------- Write to Output Tests ---------- From db8adb61d580d1cf67f4aba3add021998ec4656c Mon Sep 17 00:00:00 2001 From: Chunyi Lyu Date: Wed, 1 Apr 2026 14:35:07 +0100 Subject: [PATCH 4/5] chore: make retryafter input into a python timedelta --- kratix_sdk/__init__.py | 2 ++ kratix_sdk/kratix_sdk.py | 34 +++++++++++++++++++++++++++++----- tests/test_kratix_sdk.py | 37 ++++++++++++++++++++++++++++++------- 3 files changed, 61 insertions(+), 12 deletions(-) diff --git a/kratix_sdk/__init__.py b/kratix_sdk/__init__.py index 935df7c..db326cc 100644 --- a/kratix_sdk/__init__.py +++ b/kratix_sdk/__init__.py @@ -8,6 +8,7 @@ set_input_dir, set_metadata_dir, set_output_dir, + timedelta_to_go_duration, ) from .promise import Promise from .resource import Resource @@ -34,5 +35,6 @@ "get_output_dir", "get_metadata_dir", "set_metadata_dir", + "timedelta_to_go_duration", "__version__", ] diff --git a/kratix_sdk/kratix_sdk.py b/kratix_sdk/kratix_sdk.py index 42fcf4b..ee14bc1 100644 --- a/kratix_sdk/kratix_sdk.py +++ b/kratix_sdk/kratix_sdk.py @@ -1,4 +1,5 @@ import os +from datetime import timedelta from pathlib import Path import yaml @@ -42,6 +43,32 @@ def set_metadata_dir(path: Path | str) -> None: METADATA_DIR = Path(path) +def timedelta_to_go_duration(td: timedelta) -> str: + """Converts a Python timedelta to a Go duration string (e.g. "1h30m5s300ms").""" + total_microseconds = (td.days * 86400 + td.seconds) * 1_000_000 + td.microseconds + if total_microseconds <= 0: + raise ValueError("duration must be positive") + + hours, remainder = divmod(total_microseconds, 3_600_000_000) + minutes, remainder = divmod(remainder, 60_000_000) + seconds, remainder = divmod(remainder, 1_000_000) + milliseconds, microseconds = divmod(remainder, 1_000) + + parts = [] + if hours: + parts.append(f"{hours}h") + if minutes: + parts.append(f"{minutes}m") + if seconds: + parts.append(f"{seconds}s") + if milliseconds: + parts.append(f"{milliseconds}ms") + if microseconds: + parts.append(f"{microseconds}us") + + return "".join(parts) + + class KratixSDK: def read_resource_input(self) -> Resource: """Reads the file in /kratix/input/object.yaml and returns a Resource. @@ -187,17 +214,14 @@ def write_suspend(self, message: str = "") -> None: data["message"] = message self._write_workflow_control(data) - def write_retry_after(self, duration: str, message: str = "") -> None: + def write_retry_after(self, duration: timedelta, message: str = "") -> None: """Writes workflow-control.yaml with retryAfter set to the given duration. - The duration must be a valid Go duration string (e.g. "5m", "1h30m", "300ms"). Kratix will requeue the pipeline after the specified duration and increment the attempt counter in the object's status. If a message is provided, it will be surfaced in the object's status.""" - if not duration: - raise ValueError("duration must be a non-empty string") - data: dict = {"retryAfter": duration} + data: dict = {"retryAfter": timedelta_to_go_duration(duration)} if message: data["message"] = message self._write_workflow_control(data) diff --git a/tests/test_kratix_sdk.py b/tests/test_kratix_sdk.py index f7e436e..e669fdf 100644 --- a/tests/test_kratix_sdk.py +++ b/tests/test_kratix_sdk.py @@ -1,3 +1,4 @@ +from datetime import timedelta from pathlib import Path import pytest @@ -260,32 +261,54 @@ def test_write_suspend_with_message(): assert written == {"suspend": True, "message": "waiting for dependency"} -def test_write_retry_after_writes_workflow_control(): +def test_write_retry_after_minutes(): sdk = ks.KratixSDK() - sdk.write_retry_after("5m") + sdk.write_retry_after(timedelta(minutes=5, seconds=125)) written = yaml.safe_load( (ks.get_metadata_dir() / "workflow-control.yaml").read_text() ) - assert written == {"retryAfter": "5m"} + assert written == {"retryAfter": "7m5s"} + + +def test_write_retry_after_days(): + sdk = ks.KratixSDK() + + sdk.write_retry_after(timedelta(days=1, hours=3, minutes=66)) + + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) + assert written == {"retryAfter": "28h6m"} + + +def test_write_retry_after_hours(): + sdk = ks.KratixSDK() + + sdk.write_retry_after(timedelta(hours=3, seconds=65)) + + written = yaml.safe_load( + (ks.get_metadata_dir() / "workflow-control.yaml").read_text() + ) + assert written == {"retryAfter": "3h1m5s"} def test_write_retry_after_with_message(): sdk = ks.KratixSDK() - sdk.write_retry_after("1h30m", message="configmap not found yet") + sdk.write_retry_after(timedelta(hours=1, minutes=30), message="configmap not found") written = yaml.safe_load( (ks.get_metadata_dir() / "workflow-control.yaml").read_text() ) - assert written == {"retryAfter": "1h30m", "message": "configmap not found yet"} + assert written == {"retryAfter": "1h30m", "message": "configmap not found"} -def test_write_retry_after_empty_duration_raises(): +def test_write_retry_after_zero_duration_raises(): sdk = ks.KratixSDK() with pytest.raises(ValueError): - sdk.write_retry_after("") + sdk.write_retry_after(timedelta(0)) # ---------- Write to Output Tests ---------- From 7d9dfaa2e93e8d27b3068c1e2b69ab4a0faeaaad Mon Sep 17 00:00:00 2001 From: Shane Dowling Date: Wed, 1 Apr 2026 14:51:27 +0100 Subject: [PATCH 5/5] chore: adding table tests for python to go duration conversion --- tests/test_kratix_sdk.py | 37 +++++++++++++++++++++++++++++++++++++ 1 file changed, 37 insertions(+) diff --git a/tests/test_kratix_sdk.py b/tests/test_kratix_sdk.py index e669fdf..ac73b51 100644 --- a/tests/test_kratix_sdk.py +++ b/tests/test_kratix_sdk.py @@ -236,6 +236,43 @@ def test_is_delete_action(monkeypatch): assert sdk.is_delete_action() is False +# ---------- timedelta_to_go_duration Tests ---------- + + +@pytest.mark.parametrize( + "td,expected", + [ + (timedelta(hours=1, minutes=30, seconds=5, milliseconds=300), "1h30m5s300ms"), + (timedelta(hours=1), "1h"), + (timedelta(minutes=30), "30m"), + (timedelta(seconds=5), "5s"), + (timedelta(milliseconds=300), "300ms"), + (timedelta(microseconds=500), "500us"), + (timedelta(days=1), "24h"), + (timedelta(hours=2, microseconds=1), "2h1us"), + (timedelta(milliseconds=1, microseconds=500), "1ms500us"), + ( + timedelta( + hours=1, minutes=30, seconds=5, milliseconds=300, microseconds=123 + ), + "1h30m5s300ms123us", + ), + ], +) +def test_timedelta_to_go_duration(td, expected): + assert ks.timedelta_to_go_duration(td) == expected + + +def test_timedelta_to_go_duration_raises_on_zero(): + with pytest.raises(ValueError): + ks.timedelta_to_go_duration(timedelta(seconds=0)) + + +def test_timedelta_to_go_duration_raises_on_negative(): + with pytest.raises(ValueError): + ks.timedelta_to_go_duration(timedelta(seconds=-1)) + + # ---------- Suspend / Retry Tests ----------