Implement state service on top of etcd.#6902
Conversation
Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>
|
|
||
| go s.repo.ActionRepo().WatchStateUpdates(ctx, updates, errs) | ||
| // Subscribe to updates | ||
| updateCh := s.k8sClient.Subscribe() |
There was a problem hiding this comment.
Should we subscribe first then call list API for the first batch of data? Otherwise some data might be missing between the time window?
| if err := s.repo.ActionRepo().NotifyStateUpdate(ctx, msg.ActionId); err != nil { | ||
| logger.Warnf(ctx, "Failed to send state update notification: %v", err) | ||
| // Continue anyway - the update was saved | ||
| } |
There was a problem hiding this comment.
I think here instead we should send a notification to run service so that the run service can update DB and show latest action status in the UI.
There was a problem hiding this comment.
It would be better to create a new goroutine to subscribe to the state client (like what we did in the state service Watch). Every time there's an update, we can then run UpdateActionState and NotifyStateUpdate
| // Must be same run | ||
| if actionID.Run.Org != parentActionID.Run.Org || | ||
| actionID.Run.Project != parentActionID.Run.Project || | ||
| actionID.Run.Domain != parentActionID.Run.Domain || | ||
| actionID.Run.Name != parentActionID.Run.Name { | ||
| return false | ||
| } | ||
|
|
||
| // For now, we'll include all actions in the run | ||
| // In production, you'd check the parent relationship | ||
| // For now, include all actions in the same run | ||
| // A more sophisticated implementation would check the parent-child relationship | ||
| return true |
There was a problem hiding this comment.
nit:
| // Must be same run | |
| if actionID.Run.Org != parentActionID.Run.Org || | |
| actionID.Run.Project != parentActionID.Run.Project || | |
| actionID.Run.Domain != parentActionID.Run.Domain || | |
| actionID.Run.Name != parentActionID.Run.Name { | |
| return false | |
| } | |
| // For now, we'll include all actions in the run | |
| // In production, you'd check the parent relationship | |
| // For now, include all actions in the same run | |
| // A more sophisticated implementation would check the parent-child relationship | |
| return true | |
| // Must be same run | |
| // For now, include all actions in the same run | |
| // A more sophisticated implementation would check the parent-child relationship | |
| return actionID.Run.Org == parentActionID.Run.Org && | |
| actionID.Run.Project == parentActionID.Run.Project && | |
| actionID.Run.Domain == parentActionID.Run.Domain && | |
| actionID.Run.Name == parentActionID.Run.Name |
| // Update action state in database | ||
| if err := s.repo.ActionRepo().UpdateActionState(ctx, msg.ActionId, msg.State); err != nil { | ||
| // Update TaskAction state in Kubernetes | ||
| if err := s.k8sClient.PutState(ctx, msg.ActionId, msg.State); err != nil { |
There was a problem hiding this comment.
nit: An idea that we can consider to implement in another PR. I suggest that we store the updated status in a local cache after k8s CR is updated. The Get/Watch request can retrieve data from cache first to prevent a possible bottleneck in k8s API.
There was a problem hiding this comment.
I think we can directly get from informer cache and do not need to implement this on our own. This can be improved in the future PR
| } | ||
|
|
||
| // Update state JSON | ||
| taskAction.Status.StateJSON = stateJSON |
There was a problem hiding this comment.
Should we check if current state equals to previous state, and skip update if true?
| defer c.mu.Unlock() | ||
|
|
||
| ch := make(chan *ActionUpdate, c.bufferSize) | ||
| c.subscribers[ch] = struct{}{} |
There was a problem hiding this comment.
The watch action API request was scoped by parent action ID. Should we maintain a {parent action ID: subscribers} map here here? The update should only be notified to subscribers listening on updated action parent ID
| return fmt.Errorf("failed to initialize scheme: %w", err) | ||
| } | ||
| // Create a client.Client from the WithWatch client for services that don't need watch | ||
| var regularK8sClient client.Client = k8sClient |
There was a problem hiding this comment.
| var regularK8sClient client.Client = k8sClient | |
| var k8sClientWithoutWatch client.Client = k8sClient |
nit: make it clear that this k8s client do not have watch support
| select { | ||
| case ch <- update: | ||
| default: | ||
| // Channel full, skip (non-blocking) |
There was a problem hiding this comment.
If the channel full, the update can lost. Based on the proto, Watch should guarantee at-least-once
flyte/flyteidl2/workflow/state_service.proto
Lines 21 to 22 in 78afd14
I think we can just leave a error log for now, add TODO, and can handle this in the future if needed
| logger.Infof(ctx, "Kubernetes client initialized for namespace: %s", cfg.Kubernetes.Namespace) | ||
|
|
||
| // Create state client (K8s-based, for watching TaskAction CRs) | ||
| stateK8sClient := statek8s.NewStateClient(k8sClient, cfg.Kubernetes.Namespace, 100) |
There was a problem hiding this comment.
Related to https://github.com/flyteorg/flyte/pull/6902/changes#r2845032983
What might be the best default value for buffer size? and should we make this configurable?
Signed-off-by: Haytham Abuelfutuh haytham@afutuh.com
Tracking issue
Why are the changes needed?
What changes were proposed in this pull request?
How was this patch tested?
Labels
Please add one or more of the following labels to categorize your PR:
This is important to improve the readability of release notes.
Setup process
Screenshots
Check all the applicable boxes
Related PRs
Docs link
main