Add connection pools #272
Changes from all commits: 7bbd0b7, 60da691, a8c0a95, 2de5c5a, b9ff511, 031d94d, 9372393
@@ -0,0 +1,3 @@ (new changelog entry)
```yaml
kind: Bugfix
body: 'We now correctly use client connection pools. '
time: 2026-01-19T09:53:52.637279-05:00
```
@@ -0,0 +1,102 @@ (new file: CLAUDE.md)

# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

OpsLevel Runner is a Kubernetes-based job processor for OpsLevel. It polls for jobs from the OpsLevel API (or Faktory queue), spins up Kubernetes pods to execute commands, and streams logs back to OpsLevel.

## Build and Development Commands

All commands use [Task](https://taskfile.dev/) and should be run from the repository root:

```bash
# Initial setup (installs tools, sets up go workspace)
task setup

# Build the binary (from src/ directory)
cd src && go build

# Run tests with coverage
task test

# Run a single test
cd src && go test -v ./pkg -run TestSanitizeLogProcessor

# Lint and format check
task lint

# Auto-fix formatting and linting issues
task fix

# Start local Faktory development environment
task start-faktory
```

## Testing Jobs Locally

Test a job against a local Kubernetes cluster:
```bash
cd src
OPSLEVEL_API_TOKEN=XXX go run main.go test -f job.yaml
# Or from stdin:
cat job.yaml | OPSLEVEL_API_TOKEN=XXX go run main.go test -f -
```

## End-to-End Testing with Faktory

Run an end-to-end test with Faktory and a local Kubernetes cluster:
```bash
# Terminal 1: Start Faktory server and worker
task start-faktory

# Terminal 2: Enqueue test jobs (requires Faktory running)
cd src && go run scripts/enqueue-test-jobs.go 50

# Monitor jobs at http://localhost:7420
```

## Architecture

### Entry Points (src/cmd/)
- `root.go` - CLI configuration with Cobra/Viper; initializes logging, Sentry, and the K8s client
- `run.go` - Main execution mode with two backends:
  - **API mode** (default): Polls the OpsLevel GraphQL API for pending jobs, uses a worker pool
  - **Faktory mode**: Consumes jobs from a Faktory queue
- `test.go` - Local job testing against a K8s cluster

### Core Components (src/pkg/)
- `k8s.go` - `JobRunner` creates K8s pods, ConfigMaps, and PDBs to execute jobs (see the pod-creation sketch after this list). Each job gets an ephemeral pod with:
  - Init container that copies the runner binary
  - Job container running the specified image with commands
  - ConfigMap mounting custom scripts from the job definition
- `api.go` - GraphQL client wrapper for the OpsLevel API
- `logs.go` - `LogStreamer` with a pluggable `LogProcessor` chain for stdout/stderr processing
- `leaderElection.go` - K8s lease-based leader election for pod scaling
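As a rough illustration of the pod-creation pattern (an assumed distillation, not the actual `JobRunner` code, which builds a richer PodSpec with the init container and ConfigMap mounts listed above):

```go
package pkg

import (
	"context"
	"strings"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// createJobPod is a hypothetical sketch of what JobRunner does: build a
// PodSpec for the job's image/commands and create it with the clientset.
func createJobPod(ctx context.Context, client *kubernetes.Clientset, ns, image string, commands []string) (*corev1.Pod, error) {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{GenerateName: "opslevel-job-", Namespace: ns},
		Spec: corev1.PodSpec{
			RestartPolicy: corev1.RestartPolicyNever,
			Containers: []corev1.Container{{
				Name:    "job",
				Image:   image,
				Command: []string{"/bin/sh", "-ec"},
				Args:    []string{strings.Join(commands, "\n")},
			}},
		},
	}
	return client.CoreV1().Pods(ns).Create(ctx, pod, metav1.CreateOptions{})
}
```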
### Log Processors (src/pkg/)
Log processors form a pipeline that transforms job output (a minimal sketch of the chain follows this list):
- `sanitizeLogProcessor.go` - Redacts sensitive variables
- `prefixLogProcessor.go` - Adds timestamps
- `setOutcomeVarLogProcessor.go` - Parses `::set-outcome-var key=value` directives
- `opslevelAppendLogProcessor.go` - Batches and sends logs to the OpsLevel API
- `faktoryAppendJobLogProcessor.go` / `faktorySetOutcomeProcessor.go` - Faktory-specific processors
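A minimal sketch of how such a chain composes; the interface name and shape are assumptions, not the actual definitions in `logs.go`:

```go
package pkg

// LogProcessor is a hypothetical single-method interface; the real one in
// src/pkg/logs.go may carry more context (stream name, flush hooks, etc.).
type LogProcessor interface {
	Process(line string) string // return "" to drop the line
}

// processLine pushes one line of job output through every processor in
// order (e.g. sanitize -> prefix -> outcome vars -> append to OpsLevel).
func processLine(chain []LogProcessor, line string) string {
	for _, p := range chain {
		if line = p.Process(line); line == "" {
			return "" // a processor consumed or redacted the whole line
		}
	}
	return line
}
```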
### Signal Handling (src/signal/)
- Graceful shutdown via context cancellation on SIGINT/SIGTERM (a minimal sketch follows)
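A minimal sketch of that shutdown pattern in Go (illustrative only; the actual wiring lives in `src/signal/`):

```go
package main

import (
	"context"
	"os/signal"
	"syscall"
)

func main() {
	// Cancel ctx on SIGINT/SIGTERM so in-flight work can unwind cleanly.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	<-ctx.Done() // block until a signal arrives (real code runs the pollers here)
}
```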
## Key Configuration

Environment variables (or CLI flags):
- `OPSLEVEL_API_TOKEN` - Required for API authentication
- `OPSLEVEL_API_URL` - API endpoint (default: https://api.opslevel.com)
- `OPSLEVEL_JOB_POD_NAMESPACE` - K8s namespace for job pods
- `OPSLEVEL_JOB_POD_MAX_WAIT` - Pod startup timeout
- `OPSLEVEL_JOB_POD_MAX_LIFETIME` - Max job duration
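For example, a local API-mode run might look like this (illustrative values; assumes the main mode is exposed as the `run` subcommand per `run.go` above):

```bash
export OPSLEVEL_API_TOKEN=XXX                     # required
export OPSLEVEL_API_URL=https://api.opslevel.com  # default shown
export OPSLEVEL_JOB_POD_NAMESPACE=opslevel-jobs   # hypothetical namespace
cd src && go run main.go run
```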
## Dependencies

- Uses the `opslevel-go` client library (available as a git submodule in `src/submodules/opslevel-go`)
- K8s client-go for cluster interaction
- Zerolog for structured logging
- Prometheus client for metrics
src/pkg/k8s.go:

```diff
@@ -6,6 +6,7 @@ import (
 	"io"
 	"path"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/rs/zerolog/log"
@@ -31,6 +32,11 @@ import (
 var (
 	ImageTagVersion string
 	k8sValidated    bool
+
+	k8sClientOnce   sync.Once
+	sharedK8sConfig *rest.Config
+	sharedK8sClient *kubernetes.Clientset
+	k8sInitError    error
 )
 
 type JobConfig struct {
@@ -57,13 +63,23 @@ type JobOutcome struct {
 	OutcomeVariables []opslevel.RunnerJobOutcomeVariable
 }
 
-func LoadK8SClient() {
-	// This function is used to ensure we can connect to k8s
-	// We don't cache the config or clients for goroutine parallel problems
-	if _, err := GetKubernetesConfig(); err != nil {
-		cobra.CheckErr(err)
+func GetSharedK8sClient() (*rest.Config, *kubernetes.Clientset, error) {
+	k8sClientOnce.Do(func() {
+		sharedK8sConfig, k8sInitError = GetKubernetesConfig()
+		if k8sInitError != nil {
+			return
```
Contributor: Suggested change

Author: this return is for the anonymous function from `k8sClientOnce.Do`; we have to handle the error outside of it.
```diff
+		}
+		sharedK8sClient, k8sInitError = kubernetes.NewForConfig(sharedK8sConfig)
+	})
+	if k8sInitError != nil {
+		return nil, nil, k8sInitError
 	}
-	if _, err := GetKubernetesClientset(); err != nil {
```
Contributor: this method is no longer used, if we wanna 🔪

Author: 🔪
```diff
+	return sharedK8sConfig, sharedK8sClient, nil
+}
+
+func LoadK8SClient() {
+	_, _, err := GetSharedK8sClient()
+	if err != nil {
 		cobra.CheckErr(err)
 	}
 	k8sValidated = true
@@ -74,9 +90,8 @@ func NewJobRunner(runnerId string, path string) *JobRunner {
 		// It's ok if this function panics because we wouldn't beable to run jobs anyway
 		LoadK8SClient()
 	}
-	// We recreate the config & clients here to ensure goroutine parallel problems don't raise their head
-	config, _ := GetKubernetesConfig()
-	client, _ := GetKubernetesClientset()
+	// kubernetes.Clientset is thread-safe and designed to be shared across goroutines
+	config, client, _ := GetSharedK8sClient() // Already validated by LoadK8SClient
```
Author: The removed comments were in general right about sharing clients. The client is thread-safe; we just need some synchronization around initializing it, as we load the k8s client per `NewJobRunner`. We don't manipulate the data from the clients at all, so this should be fine.
```diff
 
 	pod, err := ReadPodConfig(path)
 	if err != nil {
 		panic(err)
@@ -343,19 +358,6 @@ func CreateLabelSelector(labels map[string]string) (*metav1.LabelSelector, error
 	return labelSelector, err
 }
 
-func GetKubernetesClientset() (*kubernetes.Clientset, error) {
-	config, err := GetKubernetesConfig()
-	if err != nil {
-		return nil, err
-	}
-
-	client, err := kubernetes.NewForConfig(config)
-	if err != nil {
-		return nil, err
-	}
-	return client, nil
-}
-
 func GetKubernetesConfig() (*rest.Config, error) {
 	loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
 	configOverrides := &clientcmd.ConfigOverrides{}
```
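To illustrate the thread-safety claim above (a sketch, not code from this PR; assumes it sits in the same package as `GetSharedK8sClient`): concurrent callers all receive the same `Clientset`, so they share one underlying HTTP transport and its connection pool.

```go
package pkg

import (
	"sync"

	"k8s.io/client-go/kubernetes"
)

// demoSharedClient fans out ten goroutines; sync.Once runs initialization
// exactly once, and every call returns the same *kubernetes.Clientset.
func demoSharedClient() []*kubernetes.Clientset {
	var (
		mu      sync.Mutex
		wg      sync.WaitGroup
		clients []*kubernetes.Clientset
	)
	for i := 0; i < 10; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			_, client, err := GetSharedK8sClient() // safe to call concurrently
			if err != nil {
				return
			}
			mu.Lock()
			clients = append(clients, client) // every element is the same pointer
			mu.Unlock()
		}()
	}
	wg.Wait()
	return clients
}
```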
@@ -0,0 +1,70 @@ (new file: scripts/enqueue-test-jobs.go)

```go
//go:build ignore
// +build ignore

package main

import (
	"fmt"
	"os"
	"strconv"
	"time"

	faktory "github.com/contribsys/faktory/client"
)

func main() {
	numJobs := 10
	if len(os.Args) > 1 {
		if n, err := strconv.Atoi(os.Args[1]); err == nil {
			numJobs = n
		}
	}

	client, err := faktory.Open()
	if err != nil {
		fmt.Printf("Failed to connect to Faktory: %v\n", err)
		os.Exit(1)
	}
	defer client.Close()

	fmt.Printf("Enqueuing %d test jobs to Faktory...\n", numJobs)

	for i := 1; i <= numJobs; i++ {
		jobID := fmt.Sprintf("test-job-%d-%d", i, time.Now().UnixNano())

		// Create job args matching the opslevel.RunnerJob structure
		jobArgs := map[string]interface{}{
			"image": "alpine:latest",
			"commands": []string{
				fmt.Sprintf("echo Hello from job %d", i),
				fmt.Sprintf("echo Job ID: %s", jobID),
				"sleep 2",
				fmt.Sprintf("echo Job %d complete", i),
			},
			"variables": []map[string]interface{}{},
			"files": []map[string]interface{}{
				{
					"name":     "test.sh",
					"contents": "#!/bin/sh\necho Running test script",
				},
			},
		}

		job := faktory.NewJob("legacy", jobArgs)
		job.Queue = "runner"
		job.ReserveFor = 300
		job.SetCustom("opslevel-runner-job-id", jobID)

		if err := client.Push(job); err != nil {
			fmt.Printf("  Failed to enqueue job %d: %v\n", i, err)
			continue
		}

		fmt.Printf("  Enqueued job %d/%d (ID: %s)\n", i, numJobs, jobID)
	}

	fmt.Println()
	fmt.Printf("Done! Enqueued %d jobs to the 'runner' queue.\n", numJobs)
	fmt.Println()
	fmt.Println("Monitor at: http://localhost:7420")
}
```
@@ -0,0 +1,55 @@ (new file: scripts/enqueue-test-jobs.sh)

```bash
#!/bin/bash
#
# Enqueue N test jobs to Faktory for end-to-end testing
#
# Usage: ./scripts/enqueue-test-jobs.sh [NUM_JOBS]
#        NUM_JOBS defaults to 10
#

set -e

NUM_JOBS=${1:-10}
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SRC_DIR="$(dirname "$SCRIPT_DIR")"

echo "Enqueuing $NUM_JOBS test jobs to Faktory..."

for i in $(seq 1 $NUM_JOBS); do
  JOB_ID="test-job-${i}-$(date +%s)"

  # Create a temporary job file
  # Note: commands must be an array of strings, files must have name/contents
  JOB_FILE=$(mktemp)
  cat > "$JOB_FILE" <<ENDJOB
type: legacy
queue: runner
reserve_for: 300
retries: 0
args:
  - image: "alpine:latest"
    commands:
      - "echo Hello from job ${i}"
      - "echo Job ID: ${JOB_ID}"
      - "sleep 2"
      - "echo Job ${i} complete"
    variables: []
    files:
      - name: "test.sh"
        contents: "#!/bin/sh\necho Running test script"
custom:
  opslevel-runner-job-id: "${JOB_ID}"
ENDJOB

  # Enqueue the job
  cd "$SRC_DIR" && go run main.go enqueue -f "$JOB_FILE"

  # Clean up temp file
  rm -f "$JOB_FILE"

  echo "  Enqueued job $i/$NUM_JOBS (ID: $JOB_ID)"
done

echo ""
echo "Done! Enqueued $NUM_JOBS jobs to the 'runner' queue."
echo ""
echo "Monitor at: http://localhost:7420"
```
Contributor: The go faktory worker creates a connection pool already! We just needed to use it.

Contributor: slick
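For context on that remark, a hedged sketch of what using the worker's built-in pool typically looks like with `faktory_worker_go`: inside a job handler, `worker.HelperFor(ctx).With` borrows a pooled connection instead of dialing a fresh one with `faktory.Open()`. Treat this as an assumption about the approach, not this PR's verbatim change.

```go
package main

import (
	"context"

	faktory "github.com/contribsys/faktory/client"
	worker "github.com/contribsys/faktory_worker_go"
)

// jobHandler shows the pooled-connection pattern: rather than faktory.Open()
// (a brand-new connection per call), borrow one from the manager's pool.
func jobHandler(ctx context.Context, args ...interface{}) error {
	help := worker.HelperFor(ctx)
	return help.With(func(cl *faktory.Client) error {
		// Reuses a pooled connection for any client call made mid-job.
		return cl.Push(faktory.NewJob("follow-up", args...))
	})
}
```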