feat(workflow): add support for different workflows; workflow registry#41
feat(workflow): add support for different workflows; workflow registry#41yashlamba wants to merge 4 commits into
Conversation
fac0afb to
ba4cc90
Compare
mairasalazar
left a comment
There was a problem hiding this comment.
just a couple of small comments
ba4cc90 to
b49b303
Compare
slint
left a comment
There was a problem hiding this comment.
Happy with the registry plumbing, but I have some gripes still with naming and ergonomics, which I think we should get right/discuss.
| workflow_fn: Any | ||
| """Entry-point method of the ``@workflow.defn`` class.""" | ||
|
|
||
| params_model: type[BaseModel] | ||
| """Pydantic model used to validate workflow-specific input params.""" | ||
|
|
||
| request_builder: Callable[[WorkflowContext, BaseModel], BaseModel] |
There was a problem hiding this comment.
minor: I left a comment in a past PR about a possible way to structure workflows in a way that encapsulates some of its parts in the existing workflow class that Temporal sort of "enforces" us to have.
I think this WorkflowSpec class is a good "breakdown" of this, but it feels a lot like we're passing a lot of things around that could just be attached in the Workflow class itself. Some things I'm skeptical about:
- For
workflow_fnwe pass e.g.ExtractMetadata.run: why not just passExtractMetadata, which already has the Temporal decorators (@workflow.defnand@workflow.run) applied to the function? I know that Temporal docs recommend either passing the name of the workflow as a string (e.g."ExtractMetadata") or the "run" func (e.g.ExtractMetadata.run), but this feels redundant... - For
params_model, see my original comment in the PR. I feel like it becomes a very non-ergonomic way of "typing" a workflow's inputs, which are kind of already defined/implied in the signature. - For
request_builder, from what I understand, this is to "bundle" in the input params our workflow ID + tenant ID. If we are to be defining this kind of function for each workflow (and having it basically always look the same), it should just be a (class)method or similar in the Workflow class.
The above for me boil down to that composing these elements into the spec doesn't add value, since we won't ever want to swap them as part of testing or redefinitions of workflows.
We could make better use of the workflow class, and define a mixin or base class which allows us to enforce some of these fields.
| workflow_type=body.workflow_type, | ||
| status=WorkflowStatus.PROCESSING, | ||
| url=body.url, | ||
| params=params.model_dump(mode="json"), |
There was a problem hiding this comment.
minor/question: do you need to serialize here the Pydantic model to JSON? Maybe we need to configure Pydantic support on the Temporal client?
| raise HTTPException(status_code=500, detail="Could not create workflow") | ||
|
|
||
| try: | ||
| workflow_request = spec.request_builder( |
There was a problem hiding this comment.
nit: in this FastAPI request context, naming another thing a "request" feels a bit confusing. Based on the temporal types, this is either a workflow "handle" or maybe just "params" or workflow_args
| pages: list[int] | None = Field(default_factory=lambda: [1, 2]) | ||
|
|
||
|
|
||
| class ExtractMetadataWorkflowRequest(BaseModel): |
There was a problem hiding this comment.
minor: shouldn't this class be a subclass of WorkflowContext?
| class WorkflowSpec: | ||
| """Describes a Temporal workflow that the API can dispatch.""" | ||
|
|
||
| workflow_fn: Any |
There was a problem hiding this comment.
I guess Temporal doesn't expose the type for workflows? Maybe we could make it a bit stricter with Callable and that it need to return Awaitable?
There was a problem hiding this comment.
Let's make it workflow_cls instead, since we register those with Temporal. I'll move and see if that can be typed.
|
|
||
| # Import workflow modules so their register_workflow() calls execute. | ||
| # To add a new workflow type, add an import here. | ||
| import app.workflows.extract_metadata_workflow # noqa: F401 |
There was a problem hiding this comment.
Maybe we could add a register_all_workflows function in registry.py to make workflow registration explicit instead of relying on import side effects?
There was a problem hiding this comment.
We discussed this IRL, Alex and I, and I think I'll do that now.
Also, we don't have support for task queues; we can only ship one kind of worker. I'll try to fix it and add a default q for now.
| @@ -122,9 +129,7 @@ async def create( | |||
| session.commit() | |||
| except SQLAlchemyError: | |||
| pass | |||
| raise HTTPException( | |||
| status_code=500, detail="Could not start extraction workflow" | |||
| ) | |||
| raise HTTPException(status_code=500, detail="Could not start workflow") | |||
There was a problem hiding this comment.
We should add logger so we have proper production logs
This PR is to refactor a few things:
Following the PR, I'll create a separate PR to integrate
ty, since it's time to be a bit stricter with types.