Implement state service on top of etcd. #6902

Draft
EngHabu wants to merge 5 commits into enghabu/vet from enghabu/state-etcd

EngHabu (Contributor) commented Feb 8, 2026

Signed-off-by: Haytham Abuelfutuh <haytham@afutuh.com>

Tracking issue

Why are the changes needed?

What changes were proposed in this pull request?

How was this patch tested?

Labels

Please add one or more of the following labels to categorize your PR:

  • added: For new features.
  • changed: For changes in existing functionality.
  • deprecated: For soon-to-be-removed features.
  • removed: For features being removed.
  • fixed: For any bug fixed.
  • security: In case of vulnerabilities

This is important to improve the readability of release notes.

Setup process

Screenshots

Check all the applicable boxes

  • I updated the documentation accordingly.
  • All new and existing tests passed.
  • All commits are signed-off.

Related PRs

Docs link


go s.repo.ActionRepo().WatchStateUpdates(ctx, updates, errs)
// Subscribe to updates
updateCh := s.k8sClient.Subscribe()
Contributor

Should we subscribe first and then call the list API for the first batch of data? Otherwise, updates that arrive between the list call and the subscription might be missed.
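
A minimal sketch of the subscribe-then-list ordering suggested above. The `Source` interface, the `Update` type, and the `Version` field used for de-duplication are assumptions made for illustration; they are not taken from the PR, and the real client may expose something different (for example a Kubernetes resourceVersion).

```go
package main

import "fmt"

// Update is a hypothetical stand-in for the PR's action update type.
type Update struct {
	ActionID string
	Version  int // assumed monotonically increasing per action
}

// Source is a hypothetical interface; the real state client may differ.
type Source interface {
	Subscribe() <-chan Update
	List() []Update
}

// snapshotThenStream subscribes first and lists second, so an update that is
// published between the two calls is still delivered on the channel.
// Duplicates and stale versions are dropped by tracking the last version seen.
func snapshotThenStream(src Source, emit func(Update)) {
	ch := src.Subscribe() // 1. open the subscription first
	seen := map[string]int{}
	for _, u := range src.List() { // 2. then take the snapshot
		seen[u.ActionID] = u.Version
		emit(u)
	}
	for u := range ch { // 3. drain buffered and live updates until the channel closes
		if v, ok := seen[u.ActionID]; ok && u.Version <= v {
			continue // already covered by the snapshot or an earlier update
		}
		seen[u.ActionID] = u.Version
		emit(u)
	}
}

// fakeSource simulates updates that land between Subscribe and List.
type fakeSource struct{ ch chan Update }

func (f *fakeSource) Subscribe() <-chan Update { return f.ch }

func (f *fakeSource) List() []Update {
	// The snapshot already reflects a1 at version 2; the same change is also queued.
	f.ch <- Update{"a1", 2}
	f.ch <- Update{"a2", 1}
	close(f.ch)
	return []Update{{"a1", 2}}
}

// collect runs the sketch against the fake source and returns what was emitted.
func collect() []Update {
	var got []Update
	src := &fakeSource{ch: make(chan Update, 4)}
	snapshotThenStream(src, func(u Update) { got = append(got, u) })
	return got
}

func main() {
	fmt.Println(collect()) // prints [{a1 2} {a2 1}]
}
```

With this ordering the worst case is a duplicate rather than a gap, which is why the sketch pairs it with a version check.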

if err := s.repo.ActionRepo().NotifyStateUpdate(ctx, msg.ActionId); err != nil {
logger.Warnf(ctx, "Failed to send state update notification: %v", err)
// Continue anyway - the update was saved
}
Contributor

I think we should instead notify the run service here, so that it can update the DB and show the latest action status in the UI.

Member

It would be better to start a new goroutine that subscribes to the state client (as we did in the state service Watch). Each time an update arrives, it can then run UpdateActionState and NotifyStateUpdate.
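
A rough sketch of that goroutine, assuming a hypothetical `Repo` interface that exposes the two calls named in the comment. The string-based signatures, the `ActionUpdate` type, and the in-memory `memRepo` are simplifications invented for illustration; the PR passes richer action ID and state types and uses its own logger.

```go
package main

import (
	"context"
	"fmt"
	"sync"
)

// ActionUpdate is a hypothetical, simplified update message.
type ActionUpdate struct {
	ActionID string
	State    string
}

// Repo is a hypothetical interface mirroring the two calls named in the review.
type Repo interface {
	UpdateActionState(ctx context.Context, actionID, state string) error
	NotifyStateUpdate(ctx context.Context, actionID string) error
}

// runStateSync persists every update and then notifies listeners.
// It returns when ctx is cancelled or the updates channel is closed.
func runStateSync(ctx context.Context, updates <-chan ActionUpdate, repo Repo) {
	for {
		select {
		case <-ctx.Done():
			return
		case u, ok := <-updates:
			if !ok {
				return
			}
			if err := repo.UpdateActionState(ctx, u.ActionID, u.State); err != nil {
				fmt.Printf("failed to persist state for %s: %v\n", u.ActionID, err)
				continue // nothing was saved, so do not notify
			}
			if err := repo.NotifyStateUpdate(ctx, u.ActionID); err != nil {
				// The update was saved; a failed notification is logged only.
				fmt.Printf("failed to notify for %s: %v\n", u.ActionID, err)
			}
		}
	}
}

// memRepo is an in-memory Repo used only to exercise the sketch.
type memRepo struct {
	mu       sync.Mutex
	states   map[string]string
	notified []string
}

func (m *memRepo) UpdateActionState(_ context.Context, id, state string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.states[id] = state
	return nil
}

func (m *memRepo) NotifyStateUpdate(_ context.Context, id string) error {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.notified = append(m.notified, id)
	return nil
}

// demo feeds two updates through the goroutine and waits for it to finish.
func demo() *memRepo {
	repo := &memRepo{states: map[string]string{}}
	updates := make(chan ActionUpdate, 2)
	done := make(chan struct{})
	go func() {
		defer close(done)
		runStateSync(context.Background(), updates, repo)
	}()
	updates <- ActionUpdate{"a1", "RUNNING"}
	updates <- ActionUpdate{"a1", "SUCCEEDED"}
	close(updates)
	<-done
	return repo
}

func main() {
	repo := demo()
	fmt.Println(repo.states["a1"], len(repo.notified)) // prints SUCCEEDED 2
}
```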

Comment on lines +261 to 271
// Must be same run
if actionID.Run.Org != parentActionID.Run.Org ||
actionID.Run.Project != parentActionID.Run.Project ||
actionID.Run.Domain != parentActionID.Run.Domain ||
actionID.Run.Name != parentActionID.Run.Name {
return false
}

// For now, we'll include all actions in the run
// In production, you'd check the parent relationship
// For now, include all actions in the same run
// A more sophisticated implementation would check the parent-child relationship
return true
Contributor

nit:

Suggested change
- // Must be same run
- if actionID.Run.Org != parentActionID.Run.Org ||
- actionID.Run.Project != parentActionID.Run.Project ||
- actionID.Run.Domain != parentActionID.Run.Domain ||
- actionID.Run.Name != parentActionID.Run.Name {
- return false
- }
- // For now, we'll include all actions in the run
- // In production, you'd check the parent relationship
- // For now, include all actions in the same run
- // A more sophisticated implementation would check the parent-child relationship
- return true
+ // Must be same run
+ // For now, include all actions in the same run
+ // A more sophisticated implementation would check the parent-child relationship
+ return actionID.Run.Org == parentActionID.Run.Org &&
+ actionID.Run.Project == parentActionID.Run.Project &&
+ actionID.Run.Domain == parentActionID.Run.Domain &&
+ actionID.Run.Name == parentActionID.Run.Name

- // Update action state in database
- if err := s.repo.ActionRepo().UpdateActionState(ctx, msg.ActionId, msg.State); err != nil {
+ // Update TaskAction state in Kubernetes
+ if err := s.k8sClient.PutState(ctx, msg.ActionId, msg.State); err != nil {
Contributor

nit: An idea we could implement in another PR: store the updated status in a local cache after the k8s CR is updated. Get/Watch requests can then read from the cache first, which avoids a possible bottleneck in the k8s API.

Member

I think we can read directly from the informer cache and don't need to implement this ourselves. This can be improved in a future PR.

}

// Update state JSON
taskAction.Status.StateJSON = stateJSON
Contributor

Should we check whether the new state equals the previous state, and skip the update if it does?
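
One way the check could look, as a sketch only. `TaskActionStatus` here is a cut-down, hypothetical version of the CR status, and `StateJSON` is assumed to be a string; the real field type in the PR is not shown in this thread.

```go
package main

import "fmt"

// TaskActionStatus is a hypothetical, reduced version of the CR status.
type TaskActionStatus struct {
	StateJSON string // assumed to be a string; the real type may differ
}

// setStateIfChanged writes stateJSON into status only when it differs from the
// stored value. It reports whether a write (and therefore a k8s update call)
// is needed.
func setStateIfChanged(status *TaskActionStatus, stateJSON string) bool {
	if status.StateJSON == stateJSON {
		return false // identical payload, skip the update
	}
	status.StateJSON = stateJSON
	return true
}

func main() {
	st := &TaskActionStatus{StateJSON: `{"phase":"RUNNING"}`}
	fmt.Println(setStateIfChanged(st, `{"phase":"RUNNING"}`))   // prints false
	fmt.Println(setStateIfChanged(st, `{"phase":"SUCCEEDED"}`)) // prints true
}
```

The comparison is byte-for-byte, so two payloads that differ only in key order or whitespace would still count as a change. That errs on the side of an extra write rather than a skipped one.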

defer c.mu.Unlock()

ch := make(chan *ActionUpdate, c.bufferSize)
c.subscribers[ch] = struct{}{}
Contributor

The watch action API request is scoped by parent action ID. Should we maintain a {parent action ID: subscribers} map here? An update should only be sent to subscribers listening on the updated action's parent ID.

Member

Makes sense to me!
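
A sketch of the parent-scoped subscriber map discussed in this thread. The `hub` type, its method names, and the string `ParentID` key are hypothetical; the PR's client keeps a flat `subscribers` set, and a real key would have to be derived from the full parent action identifier.

```go
package main

import (
	"fmt"
	"sync"
)

// ActionUpdate is a hypothetical, simplified update message.
type ActionUpdate struct {
	ParentID string
	ActionID string
}

// hub fans updates out only to subscribers of the matching parent ID.
type hub struct {
	mu          sync.RWMutex
	bufferSize  int
	subscribers map[string]map[chan *ActionUpdate]struct{}
}

func newHub(bufferSize int) *hub {
	return &hub{
		bufferSize:  bufferSize,
		subscribers: map[string]map[chan *ActionUpdate]struct{}{},
	}
}

// Subscribe registers a new buffered channel under parentID.
func (h *hub) Subscribe(parentID string) chan *ActionUpdate {
	h.mu.Lock()
	defer h.mu.Unlock()
	ch := make(chan *ActionUpdate, h.bufferSize)
	if h.subscribers[parentID] == nil {
		h.subscribers[parentID] = map[chan *ActionUpdate]struct{}{}
	}
	h.subscribers[parentID][ch] = struct{}{}
	return ch
}

// Unsubscribe removes and closes ch; calling it twice is a no-op.
func (h *hub) Unsubscribe(parentID string, ch chan *ActionUpdate) {
	h.mu.Lock()
	defer h.mu.Unlock()
	subs := h.subscribers[parentID]
	if _, ok := subs[ch]; !ok {
		return
	}
	delete(subs, ch)
	close(ch)
	if len(subs) == 0 {
		delete(h.subscribers, parentID)
	}
}

// Publish delivers u to subscribers of u.ParentID without blocking and
// returns how many subscribers accepted it.
func (h *hub) Publish(u *ActionUpdate) int {
	h.mu.RLock()
	defer h.mu.RUnlock()
	delivered := 0
	for ch := range h.subscribers[u.ParentID] {
		select {
		case ch <- u:
			delivered++
		default: // subscriber buffer is full; the update is skipped for it
		}
	}
	return delivered
}

func main() {
	h := newHub(1)
	a := h.Subscribe("parent-1")
	b := h.Subscribe("parent-2")
	n := h.Publish(&ActionUpdate{ParentID: "parent-1", ActionID: "a1"})
	fmt.Println(n, len(a), len(b)) // prints 1 1 0
}
```

Keying by parent ID means a publish only touches the relevant subscriber set instead of every open watch.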

return fmt.Errorf("failed to initialize scheme: %w", err)
}
// Create a client.Client from the WithWatch client for services that don't need watch
var regularK8sClient client.Client = k8sClient
Member

Suggested change
- var regularK8sClient client.Client = k8sClient
+ var k8sClientWithoutWatch client.Client = k8sClient

nit: make it clear that this k8s client does not have watch support

select {
case ch <- update:
default:
// Channel full, skip (non-blocking)
Member

If the channel is full, the update can be lost. Based on the proto, Watch should guarantee at-least-once delivery:

// watch for updates to the state of actions. this api guarantees at-least-once delivery semantics.
rpc Watch(WatchRequest) returns (stream WatchResponse) {}

I think we can just log an error for now, add a TODO, and handle this in the future if needed.
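
A small sketch of the interim approach described above: keep the non-blocking send, but log the drop and leave a TODO. The `trySend` helper and the reduced `ActionUpdate` type are hypothetical, and the standard `log` package stands in for the project's own logger.

```go
package main

import (
	"fmt"
	"log"
)

// ActionUpdate is a hypothetical, simplified update message.
type ActionUpdate struct{ ActionID string }

// trySend attempts a non-blocking send and reports whether the update was queued.
func trySend(ch chan<- *ActionUpdate, u *ActionUpdate) bool {
	select {
	case ch <- u:
		return true
	default:
		// TODO: the Watch RPC documents at-least-once delivery; dropping here
		// violates that. Revisit (for example by resyncing the subscriber).
		log.Printf("ERROR: subscriber channel full, dropping update for action %s", u.ActionID)
		return false
	}
}

func main() {
	ch := make(chan *ActionUpdate, 1)
	fmt.Println(trySend(ch, &ActionUpdate{"a1"})) // prints true
	fmt.Println(trySend(ch, &ActionUpdate{"a2"})) // prints false, after logging the drop
}
```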

logger.Infof(ctx, "Kubernetes client initialized for namespace: %s", cfg.Kubernetes.Namespace)

// Create state client (K8s-based, for watching TaskAction CRs)
stateK8sClient := statek8s.NewStateClient(k8sClient, cfg.Kubernetes.Namespace, 100)
Member

Related to https://github.com/flyteorg/flyte/pull/6902/changes#r2845032983

What would be the best default value for the buffer size? And should we make it configurable?
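
If the buffer size were made configurable, one possible shape is sketched below. The `WatchBufferSize` field, its JSON key, and the `EffectiveWatchBufferSize` helper are hypothetical names; only the value 100 comes from the current hard-coded call, and whether it is a good default is exactly the open question.

```go
package main

import "fmt"

// defaultWatchBufferSize matches the value currently hard-coded in the PR.
const defaultWatchBufferSize = 100

// KubernetesConfig is a hypothetical, reduced config section.
type KubernetesConfig struct {
	Namespace       string `json:"namespace"`
	WatchBufferSize int    `json:"watchBufferSize"` // hypothetical new setting
}

// EffectiveWatchBufferSize falls back to the default when the value is unset
// or not positive.
func (c KubernetesConfig) EffectiveWatchBufferSize() int {
	if c.WatchBufferSize <= 0 {
		return defaultWatchBufferSize
	}
	return c.WatchBufferSize
}

func main() {
	fmt.Println(KubernetesConfig{}.EffectiveWatchBufferSize())                     // prints 100
	fmt.Println(KubernetesConfig{WatchBufferSize: 512}.EffectiveWatchBufferSize()) // prints 512
}
```

The call site would then pass the effective value instead of the literal 100 when constructing the state client.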

3 participants