From 7bbd0b7e405e5a0879f12bca7522468f7ccc4d80 Mon Sep 17 00:00:00 2001 From: Jason Morrow Date: Fri, 16 Jan 2026 19:45:56 -0500 Subject: [PATCH 1/7] Add connection pooling for faktory. --- src/pkg/faktoryRunnerAppendJobLogProcessor.go | 7 +++---- src/pkg/faktorySetOutcomeProcessor.go | 7 +++---- 2 files changed, 6 insertions(+), 8 deletions(-) diff --git a/src/pkg/faktoryRunnerAppendJobLogProcessor.go b/src/pkg/faktoryRunnerAppendJobLogProcessor.go index 6e97fef..1478369 100644 --- a/src/pkg/faktoryRunnerAppendJobLogProcessor.go +++ b/src/pkg/faktoryRunnerAppendJobLogProcessor.go @@ -11,7 +11,6 @@ import ( ) type FaktoryAppendJobLogProcessor struct { - client *faktory.Client helper faktoryWorker.Helper logger zerolog.Logger jobId opslevel.ID @@ -25,9 +24,7 @@ type FaktoryAppendJobLogProcessor struct { } func NewFaktoryAppendJobLogProcessor(helper faktoryWorker.Helper, logger zerolog.Logger, jobId opslevel.ID, maxBytes int, maxTime time.Duration) *FaktoryAppendJobLogProcessor { - client, _ := faktory.Open() return &FaktoryAppendJobLogProcessor{ - client: client, helper: helper, logger: logger, jobId: jobId, @@ -104,7 +101,9 @@ func (s *FaktoryAppendJobLogProcessor) submit() { s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(s.logLines), s.jobId) } } else { - err := s.client.Push(job) + err := s.helper.With(func(cl *faktory.Client) error { + return cl.Push(job) + }) if err != nil { MetricEnqueueFailed.Inc() s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(s.logLines), s.jobId) diff --git a/src/pkg/faktorySetOutcomeProcessor.go b/src/pkg/faktorySetOutcomeProcessor.go index d41a84d..16f1476 100644 --- a/src/pkg/faktorySetOutcomeProcessor.go +++ b/src/pkg/faktorySetOutcomeProcessor.go @@ -11,7 +11,6 @@ import ( ) type FaktorySetOutcomeProcessor struct { - client *faktory.Client helper faktoryWorker.Helper logger zerolog.Logger jobId opslevel.ID @@ -20,9 
+19,7 @@ type FaktorySetOutcomeProcessor struct { } func NewFaktorySetOutcomeProcessor(helper faktoryWorker.Helper, logger zerolog.Logger, jobId opslevel.ID) *FaktorySetOutcomeProcessor { - client, _ := faktory.Open() return &FaktorySetOutcomeProcessor{ - client: client, helper: helper, logger: logger, jobId: jobId, @@ -99,7 +96,9 @@ func (s *FaktorySetOutcomeProcessor) Flush(outcome JobOutcome) { s.logger.Error().Err(err).Msgf("error when reporting outcome '%s' for job '%s'", outcome.Outcome, s.jobId) } } else { - err := s.client.Push(job) + err := s.helper.With(func(cl *faktory.Client) error { + return cl.Push(job) + }) if err != nil { MetricEnqueueFailed.Inc() s.logger.Error().Err(err).Msgf("error when reporting outcome '%s' for job '%s'", outcome.Outcome, s.jobId) From 60da691de8c62ff60ab8cbb89b8581c8d6d40a93 Mon Sep 17 00:00:00 2001 From: Jason Morrow Date: Fri, 16 Jan 2026 19:46:18 -0500 Subject: [PATCH 2/7] Add a k8s client singleton to use between goroutines --- src/pkg/k8s.go | 33 ++++++++++++++++++++++++--------- 1 file changed, 24 insertions(+), 9 deletions(-) diff --git a/src/pkg/k8s.go b/src/pkg/k8s.go index 7841a3b..064dc1b 100644 --- a/src/pkg/k8s.go +++ b/src/pkg/k8s.go @@ -6,6 +6,7 @@ import ( "io" "path" "strings" + "sync" "time" "github.com/rs/zerolog/log" @@ -31,6 +32,11 @@ import ( var ( ImageTagVersion string k8sValidated bool + + k8sClientOnce sync.Once + sharedK8sConfig *rest.Config + sharedK8sClient *kubernetes.Clientset + k8sInitError error ) type JobConfig struct { @@ -57,13 +63,23 @@ type JobOutcome struct { OutcomeVariables []opslevel.RunnerJobOutcomeVariable } -func LoadK8SClient() { - // This function is used to ensure we can connect to k8s - // We don't cache the config or clients for goroutine parallel problems - if _, err := GetKubernetesConfig(); err != nil { - cobra.CheckErr(err) +func GetSharedK8sClient() (*rest.Config, *kubernetes.Clientset, error) { + k8sClientOnce.Do(func() { + sharedK8sConfig, k8sInitError = 
GetKubernetesConfig() + if k8sInitError != nil { + return + } + sharedK8sClient, k8sInitError = kubernetes.NewForConfig(sharedK8sConfig) + }) + if k8sInitError != nil { + return nil, nil, k8sInitError } - if _, err := GetKubernetesClientset(); err != nil { + return sharedK8sConfig, sharedK8sClient, nil +} + +func LoadK8SClient() { + _, _, err := GetSharedK8sClient() + if err != nil { cobra.CheckErr(err) } k8sValidated = true @@ -74,9 +90,8 @@ func NewJobRunner(runnerId string, path string) *JobRunner { // It's ok if this function panics because we wouldn't beable to run jobs anyway LoadK8SClient() } - // We recreate the config & clients here to ensure goroutine parallel problems don't raise their head - config, _ := GetKubernetesConfig() - client, _ := GetKubernetesClientset() + // kubernetes.Clientset is thread-safe and designed to be shared across goroutines + config, client, _ := GetSharedK8sClient() // Already validated by LoadK8SClient pod, err := ReadPodConfig(path) if err != nil { panic(err) From a8c0a959f912b98e933d7e11ac1a775004cb29ed Mon Sep 17 00:00:00 2001 From: Jason Morrow Date: Fri, 16 Jan 2026 19:46:44 -0500 Subject: [PATCH 3/7] Add CLAUDE.md --- CLAUDE.md | 89 +++++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 89 insertions(+) create mode 100644 CLAUDE.md diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..7811ff3 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,89 @@ +# CLAUDE.md + +This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository. + +## Project Overview + +OpsLevel Runner is a Kubernetes-based job processor for OpsLevel. It polls for jobs from the OpsLevel API (or Faktory queue), spins up Kubernetes pods to execute commands, and streams logs back to OpsLevel. 
+ +## Build and Development Commands + +Most commands use [Task](https://taskfile.dev/) and should be run from the repository root; the `cd src && go ...` commands call the Go toolchain directly from `src/`: + +```bash +# Initial setup (installs tools, sets up go workspace) +task setup + +# Build the binary (from src/ directory) +cd src && go build + +# Run tests with coverage +task test + +# Run a single test +cd src && go test -v ./pkg -run TestSanitizeLogProcessor + +# Lint and format check +task lint + +# Auto-fix formatting and linting issues +task fix + +# Start local Faktory development environment +task start-faktory +``` + +## Testing Jobs Locally + +Test a job against a local Kubernetes cluster: +```bash +cd src +OPSLEVEL_API_TOKEN=XXX go run main.go test -f job.yaml +# Or from stdin: +cat job.yaml | OPSLEVEL_API_TOKEN=XXX go run main.go test -f - +``` + +## Architecture + +### Entry Points (src/cmd/) +- `root.go` - CLI configuration with Cobra/Viper, initializes logging, Sentry, and K8s client +- `run.go` - Main execution mode with two backends: + - **API mode** (default): Polls OpsLevel GraphQL API for pending jobs, uses worker pool + - **Faktory mode**: Consumes jobs from Faktory queue +- `test.go` - Local job testing against a K8s cluster + +### Core Components (src/pkg/) +- `k8s.go` - `JobRunner` creates K8s pods, ConfigMaps, and PDBs to execute jobs.
Each job gets an ephemeral pod with: + - Init container that copies the runner binary + - Job container running the specified image with commands + - ConfigMap mounting custom scripts from job definition +- `api.go` - GraphQL client wrapper for OpsLevel API +- `logs.go` - `LogStreamer` with pluggable `LogProcessor` chain for stdout/stderr processing +- `leaderElection.go` - K8s lease-based leader election for pod scaling + +### Log Processors (src/pkg/) +Log processors form a pipeline that transforms job output: +- `sanitizeLogProcessor.go` - Redacts sensitive variables +- `prefixLogProcessor.go` - Adds timestamps +- `setOutcomeVarLogProcessor.go` - Parses `::set-outcome-var key=value` directives +- `opslevelAppendLogProcessor.go` - Batches and sends logs to OpsLevel API +- `faktoryRunnerAppendJobLogProcessor.go` / `faktorySetOutcomeProcessor.go` - Faktory-specific processors + +### Signal Handling (src/signal/) +- Graceful shutdown via context cancellation on SIGINT/SIGTERM + +## Key Configuration + +Environment variables (or CLI flags): +- `OPSLEVEL_API_TOKEN` - Required for API authentication +- `OPSLEVEL_API_URL` - API endpoint (default: https://api.opslevel.com) +- `OPSLEVEL_JOB_POD_NAMESPACE` - K8s namespace for job pods +- `OPSLEVEL_JOB_POD_MAX_WAIT` - Pod startup timeout +- `OPSLEVEL_JOB_POD_MAX_LIFETIME` - Max job duration + +## Dependencies + +- Uses `opslevel-go` client library (available as git submodule in `src/submodules/opslevel-go`) +- K8s client-go for cluster interaction +- Zerolog for structured logging +- Prometheus client for metrics From 2de5c5a6a536ce22f2c11e16dec61e9d70ff170d Mon Sep 17 00:00:00 2001 From: Jason Morrow Date: Mon, 19 Jan 2026 09:21:55 -0500 Subject: [PATCH 4/7] Add script for end-to-end testing.
--- src/scripts/enqueue-test-jobs.go | 69 ++++++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) create mode 100644 src/scripts/enqueue-test-jobs.go diff --git a/src/scripts/enqueue-test-jobs.go b/src/scripts/enqueue-test-jobs.go new file mode 100644 index 0000000..120445c --- /dev/null +++ b/src/scripts/enqueue-test-jobs.go @@ -0,0 +1,69 @@ +// +build ignore + +package main + +import ( + "fmt" + "os" + "strconv" + "time" + + faktory "github.com/contribsys/faktory/client" +) + +func main() { + numJobs := 10 + if len(os.Args) > 1 { + if n, err := strconv.Atoi(os.Args[1]); err == nil { + numJobs = n + } + } + + client, err := faktory.Open() + if err != nil { + fmt.Printf("Failed to connect to Faktory: %v\n", err) + os.Exit(1) + } + defer client.Close() + + fmt.Printf("Enqueuing %d test jobs to Faktory...\n", numJobs) + + for i := 1; i <= numJobs; i++ { + jobID := fmt.Sprintf("test-job-%d-%d", i, time.Now().UnixNano()) + + // Create job args matching opslevel.RunnerJob structure + jobArgs := map[string]interface{}{ + "image": "alpine:latest", + "commands": []string{ + fmt.Sprintf("echo Hello from job %d", i), + fmt.Sprintf("echo Job ID: %s", jobID), + "sleep 2", + fmt.Sprintf("echo Job %d complete", i), + }, + "variables": []map[string]interface{}{}, + "files": []map[string]interface{}{ + { + "name": "test.sh", + "contents": "#!/bin/sh\necho Running test script", + }, + }, + } + + job := faktory.NewJob("legacy", jobArgs) + job.Queue = "runner" + job.ReserveFor = 300 + job.SetCustom("opslevel-runner-job-id", jobID) + + if err := client.Push(job); err != nil { + fmt.Printf(" Failed to enqueue job %d: %v\n", i, err) + continue + } + + fmt.Printf(" Enqueued job %d/%d (ID: %s)\n", i, numJobs, jobID) + } + + fmt.Println() + fmt.Printf("Done! 
Enqueued %d jobs to the 'runner' queue.\n", numJobs) + fmt.Println() + fmt.Println("Monitor at: http://localhost:7420") +} From b9ff5115d0ef627c00dd73be2b16b853abb154bc Mon Sep 17 00:00:00 2001 From: Jason Morrow Date: Mon, 19 Jan 2026 09:53:09 -0500 Subject: [PATCH 5/7] Add end-to-end test script --- CLAUDE.md | 13 ++++++++ src/scripts/enqueue-test-jobs.go | 1 + src/scripts/enqueue-test-jobs.sh | 55 ++++++++++++++++++++++++++++++++ 3 files changed, 69 insertions(+) create mode 100755 src/scripts/enqueue-test-jobs.sh diff --git a/CLAUDE.md b/CLAUDE.md index 7811ff3..7464fb4 100644 --- a/CLAUDE.md +++ b/CLAUDE.md @@ -43,6 +43,19 @@ OPSLEVEL_API_TOKEN=XXX go run main.go test -f job.yaml cat job.yaml | OPSLEVEL_API_TOKEN=XXX go run main.go test -f - ``` +## End-to-End Testing with Faktory + +Run an end-to-end test with Faktory and a local Kubernetes cluster: +```bash +# Terminal 1: Start Faktory server and worker +task start-faktory + +# Terminal 2: Enqueue test jobs (requires Faktory running) +cd src && go run scripts/enqueue-test-jobs.go 50 + +# Monitor jobs at http://localhost:7420 +``` + ## Architecture ### Entry Points (src/cmd/) diff --git a/src/scripts/enqueue-test-jobs.go b/src/scripts/enqueue-test-jobs.go index 120445c..8a5fc9d 100644 --- a/src/scripts/enqueue-test-jobs.go +++ b/src/scripts/enqueue-test-jobs.go @@ -1,3 +1,4 @@ +//go:build ignore // +build ignore package main diff --git a/src/scripts/enqueue-test-jobs.sh b/src/scripts/enqueue-test-jobs.sh new file mode 100755 index 0000000..ecdc32a --- /dev/null +++ b/src/scripts/enqueue-test-jobs.sh @@ -0,0 +1,55 @@ +#!/bin/bash +# +# Enqueue N test jobs to Faktory for end-to-end testing +# +# Usage: ./scripts/enqueue-test-jobs.sh [NUM_JOBS] +# NUM_JOBS defaults to 10 +# + +set -e + +NUM_JOBS=${1:-10} +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +SRC_DIR="$(dirname "$SCRIPT_DIR")" + +echo "Enqueuing $NUM_JOBS test jobs to Faktory..." 
+ +for i in $(seq 1 $NUM_JOBS); do + JOB_ID="test-job-${i}-$(date +%s)" + + # Create a temporary job file + # Note: commands must be an array of strings, files must have name/contents + JOB_FILE=$(mktemp) + cat > "$JOB_FILE" < Date: Mon, 19 Jan 2026 09:54:02 -0500 Subject: [PATCH 6/7] Add changie --- .changes/unreleased/Bugfix-20260119-095352.yaml | 3 +++ 1 file changed, 3 insertions(+) create mode 100644 .changes/unreleased/Bugfix-20260119-095352.yaml diff --git a/.changes/unreleased/Bugfix-20260119-095352.yaml b/.changes/unreleased/Bugfix-20260119-095352.yaml new file mode 100644 index 0000000..cb5e271 --- /dev/null +++ b/.changes/unreleased/Bugfix-20260119-095352.yaml @@ -0,0 +1,3 @@ +kind: Bugfix +body: 'We now correctly use client connection pools. ' +time: 2026-01-19T09:53:52.637279-05:00 From 9372393bededbbccfcf8475a98172a41e735aeca Mon Sep 17 00:00:00 2001 From: Jason Morrow Date: Mon, 19 Jan 2026 14:55:37 -0500 Subject: [PATCH 7/7] remove GetKubernetesClientset as it's no longer used --- src/pkg/k8s.go | 13 ------------- 1 file changed, 13 deletions(-) diff --git a/src/pkg/k8s.go b/src/pkg/k8s.go index 064dc1b..b856c04 100644 --- a/src/pkg/k8s.go +++ b/src/pkg/k8s.go @@ -358,19 +358,6 @@ func CreateLabelSelector(labels map[string]string) (*metav1.LabelSelector, error return labelSelector, err } -func GetKubernetesClientset() (*kubernetes.Clientset, error) { - config, err := GetKubernetesConfig() - if err != nil { - return nil, err - } - - client, err := kubernetes.NewForConfig(config) - if err != nil { - return nil, err - } - return client, nil -} - func GetKubernetesConfig() (*rest.Config, error) { loadingRules := clientcmd.NewDefaultClientConfigLoadingRules() configOverrides := &clientcmd.ConfigOverrides{}