
feat(workflow): add support for different workflows; workflow registry #41

Open
yashlamba wants to merge 4 commits into inveniosoftware:main from yashlamba:worflow-registry

Conversation

@yashlamba
Member

@yashlamba yashlamba commented Apr 21, 2026

This PR is to refactor a few things:

  1. Support multiple workflows, which we register on application start.
  2. Param validation for each workflow. We use a workflow "builder" for the request.

Following the PR, I'll create a separate PR to integrate ty, since it's time to be a bit stricter with types.
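
The two points above could be sketched roughly as follows. This is a minimal, standard-library-only illustration, not the code in this PR: plain dataclasses stand in for the Pydantic models, and `register_workflow`, `build_request`, and `build_extract_metadata_args` are hypothetical names chosen for the example.

```python
from dataclasses import dataclass, field
from typing import Any, Callable


@dataclass(frozen=True)
class WorkflowContext:
    """Identifiers shared by every workflow (assumed shape)."""

    workflow_id: str
    tenant_id: str


@dataclass(frozen=True)
class ExtractMetadataParams:
    """Stand-in for a Pydantic params model; validates in __post_init__."""

    pages: list[int] = field(default_factory=lambda: [1, 2])

    def __post_init__(self) -> None:
        if any(page < 1 for page in self.pages):
            raise ValueError("pages must be positive integers")


@dataclass(frozen=True)
class WorkflowSpec:
    """Bundles the params model and the builder for one workflow type."""

    params_model: type
    request_builder: Callable[[WorkflowContext, Any], dict[str, Any]]


# Hypothetical module-level registry, filled on application start.
_REGISTRY: dict[str, WorkflowSpec] = {}


def register_workflow(workflow_type: str, spec: WorkflowSpec) -> None:
    """Register a spec, refusing duplicate workflow types."""
    if workflow_type in _REGISTRY:
        raise ValueError(f"workflow type already registered: {workflow_type}")
    _REGISTRY[workflow_type] = spec


def build_request(
    workflow_type: str, ctx: WorkflowContext, raw_params: dict[str, Any]
) -> dict[str, Any]:
    """Validate raw params for the given type, then build the workflow input."""
    spec = _REGISTRY[workflow_type]  # KeyError for an unknown workflow type
    params = spec.params_model(**raw_params)
    return spec.request_builder(ctx, params)


def build_extract_metadata_args(
    ctx: WorkflowContext, params: ExtractMetadataParams
) -> dict[str, Any]:
    """Merge the shared context with the validated, workflow-specific params."""
    return {
        "workflow_id": ctx.workflow_id,
        "tenant_id": ctx.tenant_id,
        "pages": list(params.pages),
    }


register_workflow(
    "extract_metadata",
    WorkflowSpec(ExtractMetadataParams, build_extract_metadata_args),
)
```

With this shape, the router only needs the workflow type string and the raw params; validation errors surface before anything is dispatched.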

@yashlamba yashlamba marked this pull request as draft April 21, 2026 12:21
@yashlamba yashlamba force-pushed the worflow-registry branch 4 times, most recently from fac0afb to ba4cc90 Compare April 27, 2026 15:01
Collaborator

@mairasalazar mairasalazar left a comment

Just a couple of small comments.

Comment thread app/workflows/registry.py Outdated
Comment thread app/routers/workflows.py Outdated
@yashlamba yashlamba marked this pull request as ready for review April 28, 2026 13:29
@yashlamba yashlamba moved this to In review 🔍 in Sprint Q2 2026 ☀️ Apr 28, 2026
@yashlamba yashlamba requested a review from mairasalazar April 28, 2026 13:35
@yashlamba yashlamba requested a review from slint April 28, 2026 13:35
@yashlamba yashlamba requested a review from ptamarit April 28, 2026 13:35
Member

@slint slint left a comment

Happy with the registry plumbing, but I have some gripes still with naming and ergonomics, which I think we should get right/discuss.

Comment thread app/workflows/registry.py Outdated
Comment on lines +23 to +29
workflow_fn: Any
"""Entry-point method of the ``@workflow.defn`` class."""

params_model: type[BaseModel]
"""Pydantic model used to validate workflow-specific input params."""

request_builder: Callable[[WorkflowContext, BaseModel], BaseModel]
Member

minor: I left a comment in a past PR about a possible way to structure workflows so that some of their parts are encapsulated in the existing workflow class that Temporal more or less forces us to have.

I think this WorkflowSpec class is a good "breakdown" of this, but it feels a lot like we're passing a lot of things around that could just be attached in the Workflow class itself. Some things I'm skeptical about:

  • For workflow_fn we pass e.g. ExtractMetadata.run: why not just pass ExtractMetadata, which already has the Temporal decorators (@workflow.defn and @workflow.run) applied to the function? I know that Temporal docs recommend either passing the name of the workflow as a string (e.g. "ExtractMetadata") or the "run" func (e.g. ExtractMetadata.run), but this feels redundant...
  • For params_model, see my original comment in the PR. I feel like it becomes a very non-ergonomic way of "typing" a workflow's inputs, which are kind of already defined/implied in the signature.
  • For request_builder, from what I understand, this is to "bundle" our workflow ID and tenant ID into the input params. If we are going to define this kind of function for each workflow (and it basically always looks the same), it should just be a (class)method or similar in the Workflow class.

For me, the above boils down to this: composing these elements into the spec doesn't add value, since we won't ever want to swap them as part of testing or redefinitions of workflows.

We could make better use of the workflow class, and define a mixin or base class which allows us to enforce some of these fields.
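
One possible shape for such a base class is sketched below, using only the standard library. The names `RegisteredWorkflow`, `workflow_type`, and `build_args` are hypothetical, a dataclass stands in for the Pydantic params model, and in the real code the subclass would additionally carry the Temporal decorators.

```python
from dataclasses import dataclass, field
from typing import Any, ClassVar


@dataclass(frozen=True)
class WorkflowContext:
    """Identifiers shared by every workflow (assumed shape)."""

    workflow_id: str
    tenant_id: str


class RegisteredWorkflow:
    """Hypothetical base class that enforces required class attributes."""

    workflow_type: ClassVar[str]
    params_model: ClassVar[type]
    registry: ClassVar[dict[str, type]] = {}

    def __init_subclass__(cls, **kwargs: Any) -> None:
        super().__init_subclass__(**kwargs)
        # Fail at class-definition time if a required attribute is missing.
        for attr in ("workflow_type", "params_model"):
            if not hasattr(cls, attr):
                raise TypeError(f"{cls.__name__} must define {attr}")
        RegisteredWorkflow.registry[cls.workflow_type] = cls

    @classmethod
    def build_args(
        cls, ctx: WorkflowContext, raw_params: dict[str, Any]
    ) -> dict[str, Any]:
        """Validate params and bundle them with the shared context."""
        params = cls.params_model(**raw_params)
        return {
            "workflow_id": ctx.workflow_id,
            "tenant_id": ctx.tenant_id,
            "params": params,
        }


@dataclass(frozen=True)
class ExtractMetadataParams:
    """Stand-in for the Pydantic params model."""

    pages: list[int] = field(default_factory=lambda: [1, 2])


class ExtractMetadata(RegisteredWorkflow):
    """In the real code this class would also be a Temporal workflow."""

    workflow_type = "extract_metadata"
    params_model = ExtractMetadataParams

    async def run(self, args: dict[str, Any]) -> dict[str, Any]:
        return args
```

The trade-off is that registration again happens as a side effect of defining the class, so this approach would need to be reconciled with any explicit registration function.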

Comment thread app/routers/workflows.py
workflow_type=body.workflow_type,
status=WorkflowStatus.PROCESSING,
url=body.url,
params=params.model_dump(mode="json"),
Member

minor/question: do you need to serialize the Pydantic model to JSON here? Maybe we need to configure Pydantic support on the Temporal client?

Comment thread app/routers/workflows.py Outdated
raise HTTPException(status_code=500, detail="Could not create workflow")

try:
workflow_request = spec.request_builder(
Member

nit: in this FastAPI request context, naming another thing a "request" feels a bit confusing. Based on the Temporal types, this is either a workflow "handle" or maybe just "params" or workflow_args.

pages: list[int] | None = Field(default_factory=lambda: [1, 2])


class ExtractMetadataWorkflowRequest(BaseModel):
Member

minor: shouldn't this class be a subclass of WorkflowContext?

@yashlamba yashlamba requested a review from OliverGeneser May 12, 2026 08:20
Comment thread app/workflows/registry.py
class WorkflowSpec:
"""Describes a Temporal workflow that the API can dispatch."""

workflow_fn: Any
Collaborator

I guess Temporal doesn't expose a type for workflows? Maybe we could make it a bit stricter by using Callable and requiring that it returns an Awaitable?

Member Author

Let's make it workflow_cls instead, since we register those with Temporal. I'll move and see if that can be typed.
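
A rough sketch of what the stricter typing discussed in this thread might look like, combining the `Callable`/`Awaitable` idea with the move to `workflow_cls`. The `WorkflowRunFn` alias, the `run_method` field, and the `run_fn` helper are assumptions made for illustration, and the runtime check is an extra that static typing alone would not give.

```python
import inspect
from dataclasses import dataclass
from typing import Any, Awaitable, Callable

# Hypothetical alias: any callable that returns an awaitable.
WorkflowRunFn = Callable[..., Awaitable[Any]]


@dataclass(frozen=True)
class WorkflowSpec:
    """Holds the workflow class instead of an untyped entry-point function."""

    workflow_cls: type
    run_method: str = "run"

    def run_fn(self) -> WorkflowRunFn:
        """Return the entry point, checking at runtime that it is async."""
        fn = getattr(self.workflow_cls, self.run_method)
        if not inspect.iscoroutinefunction(fn):
            raise TypeError(
                f"{self.workflow_cls.__name__}.{self.run_method} must be async"
            )
        return fn
```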

Comment thread app/workflows/__init__.py

# Import workflow modules so their register_workflow() calls execute.
# To add a new workflow type, add an import here.
import app.workflows.extract_metadata_workflow # noqa: F401
Collaborator

Maybe we could add a register_all_workflows function in registry.py to make workflow registration explicit instead of relying on import side effects?

Member Author

Alex and I discussed this in person, and I think I'll do that now.

Also, we don't have support for task queues; we can only ship one kind of worker. I'll try to fix it and add a default queue for now.
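
The explicit registration suggested in this thread could look something like the sketch below. It assumes a module-level dict as the registry; `register_workflow`, `register_all_workflows`, and `registered_types` are illustrative names, and the empty `ExtractMetadata` class is a placeholder for the real workflow class.

```python
# Hypothetical registry mapping workflow type names to workflow classes.
_REGISTRY: dict[str, type] = {}


def register_workflow(workflow_type: str, workflow_cls: type) -> None:
    """Add one workflow class to the registry, refusing duplicates."""
    if workflow_type in _REGISTRY:
        raise ValueError(f"workflow type already registered: {workflow_type}")
    _REGISTRY[workflow_type] = workflow_cls


class ExtractMetadata:
    """Placeholder for the real workflow class."""


def register_all_workflows() -> None:
    """Register every known workflow; intended to be called once at startup.

    Clearing first keeps the call idempotent, which helps in tests.
    To add a new workflow type, add a register_workflow() call here.
    """
    _REGISTRY.clear()
    register_workflow("extract_metadata", ExtractMetadata)


def registered_types() -> list[str]:
    """Return the registered workflow type names in sorted order."""
    return sorted(_REGISTRY)
```

Nothing is registered at import time in this version, so both the API process and the worker would have to call `register_all_workflows()` explicitly during startup.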

Comment thread app/routers/workflows.py
Comment on lines 126 to +132
@@ -122,9 +129,7 @@ async def create(
 session.commit()
 except SQLAlchemyError:
 pass
-raise HTTPException(
-status_code=500, detail="Could not start extraction workflow"
-)
+raise HTTPException(status_code=500, detail="Could not start workflow")
Collaborator

We should add a logger so that we have proper production logs.
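
As a rough illustration of the logging suggestion, the failure path could record the traceback before raising the generic 500 error. This sketch uses only the standard library: `WorkflowStartError` is a stand-in for FastAPI's `HTTPException`, and `start_workflow_or_fail` and the logger name are hypothetical.

```python
import logging
from typing import Any, Callable

# Hypothetical logger name, mirroring the module path in this PR.
logger = logging.getLogger("app.routers.workflows")


class WorkflowStartError(Exception):
    """Stand-in for HTTPException(status_code=500, detail=...)."""

    def __init__(self, status_code: int, detail: str) -> None:
        super().__init__(detail)
        self.status_code = status_code
        self.detail = detail


def start_workflow_or_fail(start: Callable[[], Any], workflow_id: str) -> Any:
    """Run the start callable; log the full traceback if it fails."""
    try:
        return start()
    except Exception as exc:
        # logger.exception logs at ERROR level and attaches the traceback.
        logger.exception("Could not start workflow %s", workflow_id)
        raise WorkflowStartError(500, "Could not start workflow") from exc
```

The client still sees only the generic message, while the underlying cause ends up in the server logs.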

Member Author

Coming soon!

Member Author

#40


Projects

Status: In review 🔍

5 participants