From c8baca52079276d3beb6ada978611d7490b307af Mon Sep 17 00:00:00 2001 From: Gage Krumbach Date: Fri, 16 Jan 2026 10:42:26 -0600 Subject: [PATCH] Refactor workflow activation logic and improve error handling - Removed redundant workflow activation code from backend session handler. - Enhanced frontend workflow management hook to include retry logic with exponential backoff for activating workflows. - Updated backend to prevent duplicate workflow updates by checking current workflow state before reinitialization. - Improved logging and error handling for better observability during workflow activation. These changes streamline the workflow activation process and enhance the user experience by providing more robust error handling and retry mechanisms. --- components/backend/handlers/sessions.go | 39 ----------- .../hooks/use-workflow-management.ts | 20 ++++-- .../operator/internal/handlers/sessions.go | 3 +- components/runners/claude-code-runner/main.py | 69 +++++++++++-------- 4 files changed, 60 insertions(+), 71 deletions(-) diff --git a/components/backend/handlers/sessions.go b/components/backend/handlers/sessions.go index 8f72bc929..5caf1d3ef 100644 --- a/components/backend/handlers/sessions.go +++ b/components/backend/handlers/sessions.go @@ -1210,45 +1210,6 @@ func SelectWorkflow(c *gin.Context) { branch = "main" } - // Call runner to clone and activate the workflow (if session is running) - status, _ := item.Object["status"].(map[string]interface{}) - phase, _ := status["phase"].(string) - if phase == "Running" { - runnerURL := fmt.Sprintf("http://session-%s.%s.svc.cluster.local:8001/workflow", sessionName, project) - runnerReq := map[string]string{ - "gitUrl": req.GitURL, - "branch": branch, - "path": req.Path, - } - reqBody, _ := json.Marshal(runnerReq) - - log.Printf("Calling runner to activate workflow: %s@%s (path: %s) -> %s", req.GitURL, branch, req.Path, runnerURL) - httpReq, err := http.NewRequestWithContext(c.Request.Context(), "POST", runnerURL, bytes.NewReader(reqBody)) - if err != nil { - log.Printf("Failed to create runner request: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to create runner request"}) - return - } - httpReq.Header.Set("Content-Type", "application/json") - - client := &http.Client{Timeout: 120 * time.Second} // Allow time for clone - resp, err := client.Do(httpReq) - if err != nil { - log.Printf("Failed to call runner to activate workflow: %v", err) - c.JSON(http.StatusInternalServerError, gin.H{"error": "Failed to activate workflow (runner not reachable)"}) - return - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - log.Printf("Runner failed to activate workflow (status %d): %s", resp.StatusCode, string(body)) - c.JSON(resp.StatusCode, gin.H{"error": fmt.Sprintf("Failed to activate workflow: %s", string(body))}) - return - } - log.Printf("Runner successfully activated workflow %s@%s for session %s", req.GitURL, branch, sessionName) - } - // Update activeWorkflow in spec spec, ok := item.Object["spec"].(map[string]interface{}) if !ok { diff --git a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts index 391aeefe5..1bf7eb409 100644 --- a/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts +++ b/components/frontend/src/app/projects/[name]/sessions/[sessionName]/hooks/use-workflow-management.ts @@ -32,7 +32,7 @@ export function useWorkflowManagement({ }, []); // Activate the pending workflow (or a workflow passed directly) - const activateWorkflow = useCallback(async (workflowToActivate?: WorkflowConfig, currentPhase?: string) => { + const activateWorkflow = useCallback(async (workflowToActivate?: WorkflowConfig, currentPhase?: string, retryCount = 0) => { const workflow = workflowToActivate || pendingWorkflow; if (!workflow) return false; @@ -52,7 +52,10 @@ export function useWorkflowManagement({ return true; // Don't return false - we've queued it successfully } - setWorkflowActivating(true); + // Only set loading state on first attempt (not retries) + if (retryCount === 0) { + setWorkflowActivating(true); + } try { // Update CR with workflow configuration @@ -68,6 +71,16 @@ export function useWorkflowManagement({ if (!response.ok) { const errorData = await response.json(); + + // If runner not ready and we haven't retried too many times, retry with backoff + if (errorData.retryable && retryCount < 5) { + const delay = Math.min(1000 * Math.pow(1.5, retryCount), 5000); // Exponential backoff, max 5s + console.log(`Runner not ready, retrying in ${delay}ms (attempt ${retryCount + 1}/5)...`); + await new Promise(resolve => setTimeout(resolve, delay)); + // Retry without resetting loading state + return activateWorkflow(workflow, phase, retryCount + 1); + } + throw new Error(errorData.error || "Failed to update workflow"); } @@ -80,6 +93,7 @@ export function useWorkflowManagement({ onWorkflowActivated?.(); + setWorkflowActivating(false); return true; } catch (error) { console.error("Failed to activate workflow:", error); @@ -87,8 +101,6 @@ export function useWorkflowManagement({ sessionQueue.clearWorkflow(); setWorkflowActivating(false); return false; - } finally { - setWorkflowActivating(false); } }, [pendingWorkflow, projectName, sessionName, sessionPhase, sessionQueue, onWorkflowActivated]); diff --git a/components/operator/internal/handlers/sessions.go b/components/operator/internal/handlers/sessions.go index 28812b3a2..e70040b06 100644 --- a/components/operator/internal/handlers/sessions.go +++ b/components/operator/internal/handlers/sessions.go @@ -1662,9 +1662,10 @@ func reconcileActiveWorkflowWithPatch(sessionNamespace, sessionName string, spec reconciledWorkflowRaw, _, _ := unstructured.NestedMap(status, "reconciledWorkflow") reconciledGitURL, _ := reconciledWorkflowRaw["gitUrl"].(string) reconciledBranch, _ := reconciledWorkflowRaw["branch"].(string) + reconciledPath, _ := reconciledWorkflowRaw["path"].(string) // Detect drift: workflow changed - if reconciledGitURL == gitURL && reconciledBranch == branch { + if reconciledGitURL == gitURL && reconciledBranch == branch && reconciledPath == path { return nil } diff --git a/components/runners/claude-code-runner/main.py b/components/runners/claude-code-runner/main.py index 9bf99ca3e..56701208a 100644 --- a/components/runners/claude-code-runner/main.py +++ b/components/runners/claude-code-runner/main.py @@ -193,6 +193,8 @@ async def auto_execute_initial_prompt(prompt: str, session_id: str): # Track if adapter has been initialized _adapter_initialized = False +# Prevent duplicate workflow updates/greetings from concurrent calls +_workflow_change_lock = asyncio.Lock() @app.post("/") @@ -511,35 +513,48 @@ async def change_workflow(request: Request): raise HTTPException(status_code=503, detail="Adapter not initialized") body = await request.json() - git_url = body.get("gitUrl", "") - branch = body.get("branch", "main") - path = body.get("path", "") + git_url = (body.get("gitUrl") or "").strip() + branch = (body.get("branch") or "main").strip() or "main" + path = (body.get("path") or "").strip() logger.info(f"Workflow change request: {git_url}@{branch} (path: {path})") - - # Clone the workflow repository at runtime - # This is needed because the init container only runs once at pod startup - if git_url: - success, workflow_path = await clone_workflow_at_runtime(git_url, branch, path) - if not success: - logger.warning("Failed to clone workflow, will use default workflow directory") - - # Update environment variables - os.environ["ACTIVE_WORKFLOW_GIT_URL"] = git_url - os.environ["ACTIVE_WORKFLOW_BRANCH"] = branch - os.environ["ACTIVE_WORKFLOW_PATH"] = path - - # Reset adapter state to force reinitialization on next run - _adapter_initialized = False - adapter._first_run = True - - logger.info("Workflow updated, adapter will reinitialize on next run") - - # Trigger a new run to greet user with workflow context - # This runs in background via backend POST - asyncio.create_task(trigger_workflow_greeting(git_url, branch, path)) - - return {"message": "Workflow updated", "gitUrl": git_url, "branch": branch, "path": path} + + async with _workflow_change_lock: + current_git_url = os.getenv("ACTIVE_WORKFLOW_GIT_URL", "").strip() + current_branch = os.getenv("ACTIVE_WORKFLOW_BRANCH", "main").strip() or "main" + current_path = os.getenv("ACTIVE_WORKFLOW_PATH", "").strip() + + if ( + current_git_url == git_url + and current_branch == branch + and current_path == path + ): + logger.info("Workflow unchanged; skipping reinit and greeting") + return {"message": "Workflow already active", "gitUrl": git_url, "branch": branch, "path": path} + + # Clone the workflow repository at runtime + # This is needed because the init container only runs once at pod startup + if git_url: + success, workflow_path = await clone_workflow_at_runtime(git_url, branch, path) + if not success: + logger.warning("Failed to clone workflow, will use default workflow directory") + + # Update environment variables + os.environ["ACTIVE_WORKFLOW_GIT_URL"] = git_url + os.environ["ACTIVE_WORKFLOW_BRANCH"] = branch + os.environ["ACTIVE_WORKFLOW_PATH"] = path + + # Reset adapter state to force reinitialization on next run + _adapter_initialized = False + adapter._first_run = True + + logger.info("Workflow updated, adapter will reinitialize on next run") + + # Trigger a new run to greet user with workflow context + # This runs in background via backend POST + asyncio.create_task(trigger_workflow_greeting(git_url, branch, path)) + + return {"message": "Workflow updated", "gitUrl": git_url, "branch": branch, "path": path} async def get_default_branch(repo_path: str) -> str: