3 changes: 3 additions & 0 deletions .changes/unreleased/Bugfix-20260119-095352.yaml
@@ -0,0 +1,3 @@
kind: Bugfix
body: 'We now correctly use client connection pools. '
time: 2026-01-19T09:53:52.637279-05:00
102 changes: 102 additions & 0 deletions CLAUDE.md
@@ -0,0 +1,102 @@
# CLAUDE.md

This file provides guidance to Claude Code (claude.ai/code) when working with code in this repository.

## Project Overview

OpsLevel Runner is a Kubernetes-based job processor for OpsLevel. It pulls jobs from the OpsLevel API (by polling) or from a Faktory queue, spins up Kubernetes pods to execute each job's commands, and streams the logs back to OpsLevel.

## Build and Development Commands

All commands use [Task](https://taskfile.dev/) and should be run from the repository root:

```bash
# Initial setup (installs tools, sets up go workspace)
task setup

# Build the binary (from src/ directory)
cd src && go build

# Run tests with coverage
task test

# Run a single test
cd src && go test -v ./pkg -run TestSanitizeLogProcessor

# Lint and format check
task lint

# Auto-fix formatting and linting issues
task fix

# Start local Faktory development environment
task start-faktory
```

## Testing Jobs Locally

Test a job against a local Kubernetes cluster:
```bash
cd src
OPSLEVEL_API_TOKEN=XXX go run main.go test -f job.yaml
# Or from stdin:
cat job.yaml | OPSLEVEL_API_TOKEN=XXX go run main.go test -f -
```

## End-to-End Testing with Faktory

Run an end-to-end test with Faktory and a local Kubernetes cluster:
```bash
# Terminal 1: Start Faktory server and worker
task start-faktory

# Terminal 2: Enqueue test jobs (requires Faktory running)
cd src && go run scripts/enqueue-test-jobs.go 50

# Monitor jobs at http://localhost:7420
```

## Architecture

### Entry Points (src/cmd/)
- `root.go` - CLI configuration with Cobra/Viper, initializes logging, Sentry, and K8s client
- `run.go` - Main execution mode with two backends (a rough polling sketch follows this list):
- **API mode** (default): Polls OpsLevel GraphQL API for pending jobs, uses worker pool
- **Faktory mode**: Consumes jobs from Faktory queue
- `test.go` - Local job testing against a K8s cluster
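
The exact implementation lives in `run.go`; the snippet below is only a hedged sketch of the API-mode flow (all names are illustrative placeholders, not the real types or functions): poll on a ticker and hand jobs to a bounded worker pool.

```go
package main

import (
	"context"
	"fmt"
	"time"
)

// Job stands in for an OpsLevel runner job; the real type comes from opslevel-go.
type Job struct{ ID string }

// pollPendingJobs is a placeholder for the GraphQL query that lists pending jobs.
func pollPendingJobs(ctx context.Context) ([]Job, error) {
	return []Job{{ID: "example"}}, nil
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
	defer cancel()

	jobs := make(chan Job)
	for i := 0; i < 4; i++ { // bounded worker pool
		go func() {
			for job := range jobs {
				fmt.Println("running job", job.ID) // the real worker spins up a K8s pod here
			}
		}()
	}

	ticker := time.NewTicker(time.Second)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			close(jobs)
			return
		case <-ticker.C:
			pending, err := pollPendingJobs(ctx)
			if err != nil {
				continue // the real loop logs the error and retries on the next tick
			}
			for _, job := range pending {
				jobs <- job
			}
		}
	}
}
```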

### Core Components (src/pkg/)
- `k8s.go` - `JobRunner` creates K8s pods, ConfigMaps, and PDBs to execute jobs (a pod-creation sketch follows this list). Each job gets an ephemeral pod with:
- Init container that copies the runner binary
- Job container running the specified image with commands
- ConfigMap mounting custom scripts from job definition
- `api.go` - GraphQL client wrapper for OpsLevel API
- `logs.go` - `LogStreamer` with pluggable `LogProcessor` chain for stdout/stderr processing
- `leaderElection.go` - K8s lease-based leader election for pod scaling
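
The full pod spec lives in `k8s.go`; the snippet below is only a stripped-down sketch of the pod-creation step with client-go (the helper name and field values are illustrative), omitting the init container, ConfigMap mounts, and PDB that the real `JobRunner` also sets up.

```go
package sketch

import (
	"context"
	"strings"

	corev1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
)

// createJobPod creates one ephemeral pod that runs the job's commands.
// The real spec also injects an init container that copies the runner binary
// and mounts a ConfigMap holding any custom scripts from the job definition.
func createJobPod(ctx context.Context, client kubernetes.Interface, namespace, name, image string, commands []string) (*corev1.Pod, error) {
	pod := &corev1.Pod{
		ObjectMeta: metav1.ObjectMeta{
			Name:      name,
			Namespace: namespace,
			Labels:    map[string]string{"app": "opslevel-runner-job"},
		},
		Spec: corev1.PodSpec{
			RestartPolicy: corev1.RestartPolicyNever,
			Containers: []corev1.Container{{
				Name:    "job",
				Image:   image,
				Command: []string{"/bin/sh", "-c"},
				Args:    []string{strings.Join(commands, "\n")},
			}},
		},
	}
	return client.CoreV1().Pods(namespace).Create(ctx, pod, metav1.CreateOptions{})
}
```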

### Log Processors (src/pkg/)
Log processors form a pipeline that transforms job output (a minimal chain sketch follows the list):
- `sanitizeLogProcessor.go` - Redacts sensitive variables
- `prefixLogProcessor.go` - Adds timestamps
- `setOutcomeVarLogProcessor.go` - Parses `::set-outcome-var key=value` directives
- `opslevelAppendLogProcessor.go` - Batches and sends logs to OpsLevel API
- `faktoryAppendJobLogProcessor.go` / `faktorySetOutcomeProcessor.go` - Faktory-specific processors
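
The file names above are real; the interface below is only an assumed shape (the actual `LogProcessor` definition in `logs.go` may differ), meant to show how a chain like sanitize → prefix → append composes.

```go
package sketch

import (
	"fmt"
	"strings"
	"time"
)

// LogProcessor is an assumed shape for one stage of the pipeline; each stage
// receives a line and returns the (possibly transformed) line to pass along.
type LogProcessor interface {
	Process(line string) string
}

// sanitize redacts known-sensitive values, mirroring sanitizeLogProcessor.go.
type sanitize struct{ secrets []string }

func (s sanitize) Process(line string) string {
	for _, secret := range s.secrets {
		line = strings.ReplaceAll(line, secret, "[REDACTED]")
	}
	return line
}

// prefix adds a timestamp, mirroring prefixLogProcessor.go.
type prefix struct{}

func (prefix) Process(line string) string {
	return fmt.Sprintf("[%s] %s", time.Now().UTC().Format(time.RFC3339), line)
}

// runChain applies each processor in order to a single line of job output.
func runChain(processors []LogProcessor, line string) string {
	for _, p := range processors {
		line = p.Process(line)
	}
	return line
}
```

The outcome-variable stage works the same way, except it scans each line for the `::set-outcome-var key=value` directive instead of rewriting it.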

### Signal Handling (src/signal/)
- Graceful shutdown via context cancellation on SIGINT/SIGTERM
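
A minimal sketch of this pattern (not necessarily the exact code in `src/signal/`), using `signal.NotifyContext` from the standard library:

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
)

func main() {
	// Tie a context to SIGINT/SIGTERM; cancelling it lets in-flight work wind down.
	ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
	defer stop()

	<-ctx.Done() // block until a signal arrives (real code runs the job loop here)
	fmt.Println("shutting down gracefully")
}
```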

## Key Configuration

Environment variables (or the equivalent CLI flags; a Viper binding sketch follows the list):
- `OPSLEVEL_API_TOKEN` - Required for API authentication
- `OPSLEVEL_API_URL` - API endpoint (default: https://api.opslevel.com)
- `OPSLEVEL_JOB_POD_NAMESPACE` - K8s namespace for job pods
- `OPSLEVEL_JOB_POD_MAX_WAIT` - Pod startup timeout
- `OPSLEVEL_JOB_POD_MAX_LIFETIME` - Max job duration
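
A hedged sketch of how these variables could be bound with Viper (the real wiring in `root.go` may differ; the key names below are assumptions based on the env/flag pairs above):

```go
package sketch

import (
	"strings"

	"github.com/spf13/viper"
)

// bindRunnerEnv shows one way the OPSLEVEL_* variables could be wired up.
func bindRunnerEnv() {
	viper.SetEnvPrefix("OPSLEVEL")                         // OPSLEVEL_API_TOKEN, OPSLEVEL_API_URL, ...
	viper.SetEnvKeyReplacer(strings.NewReplacer("-", "_")) // lets the key "api-url" match OPSLEVEL_API_URL
	viper.AutomaticEnv()

	viper.SetDefault("api-url", "https://api.opslevel.com")
}

// Usage: viper.GetString("api-token") reads OPSLEVEL_API_TOKEN.
```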

## Dependencies

- Uses the `opslevel-go` client library (available as a git submodule in `src/submodules/opslevel-go`)
- K8s client-go for cluster interaction
- Zerolog for structured logging
- Prometheus client for metrics
7 changes: 3 additions & 4 deletions src/pkg/faktoryRunnerAppendJobLogProcessor.go
@@ -11,7 +11,6 @@ import (
)

type FaktoryAppendJobLogProcessor struct {
client *faktory.Client
helper faktoryWorker.Helper
logger zerolog.Logger
jobId opslevel.ID
@@ -25,9 +24,7 @@ type FaktoryAppendJobLogProcessor struct {
}

func NewFaktoryAppendJobLogProcessor(helper faktoryWorker.Helper, logger zerolog.Logger, jobId opslevel.ID, maxBytes int, maxTime time.Duration) *FaktoryAppendJobLogProcessor {
client, _ := faktory.Open()
return &FaktoryAppendJobLogProcessor{
client: client,
helper: helper,
logger: logger,
jobId: jobId,
@@ -104,7 +101,9 @@ func (s *FaktoryAppendJobLogProcessor) submit() {
s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(s.logLines), s.jobId)
}
} else {
err := s.client.Push(job)
err := s.helper.With(func(cl *faktory.Client) error {

Comment (Contributor Author): The go faktory worker creates a connection pool already! We just needed to use it.

Comment (Contributor): slick

return cl.Push(job)
})
if err != nil {
MetricEnqueueFailed.Inc()
s.logger.Error().Err(err).Msgf("error while enqueuing append logs for '%d' log line(s) for job '%s'", len(s.logLines), s.jobId)
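
For readers outside the diff context: the pattern adopted above, pushing through the worker's built-in connection pool via `Helper.With` instead of holding a dedicated `*faktory.Client`, looks roughly like this in isolation (the job type, queue, and function name are illustrative):

```go
package sketch

import (
	faktory "github.com/contribsys/faktory/client"
	faktoryWorker "github.com/contribsys/faktory_worker_go"
)

// enqueueViaPool pushes a job using the worker's built-in connection pool via
// Helper.With, instead of opening a dedicated *faktory.Client per processor.
func enqueueViaPool(helper faktoryWorker.Helper, payload map[string]interface{}) error {
	job := faktory.NewJob("runner:append_job_log", payload) // job type is illustrative
	job.Queue = "default"
	return helper.With(func(cl *faktory.Client) error {
		return cl.Push(job)
	})
}
```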
7 changes: 3 additions & 4 deletions src/pkg/faktorySetOutcomeProcessor.go
@@ -11,7 +11,6 @@ import (
)

type FaktorySetOutcomeProcessor struct {
client *faktory.Client
helper faktoryWorker.Helper
logger zerolog.Logger
jobId opslevel.ID
@@ -20,9 +19,7 @@ type FaktorySetOutcomeProcessor struct {
}

func NewFaktorySetOutcomeProcessor(helper faktoryWorker.Helper, logger zerolog.Logger, jobId opslevel.ID) *FaktorySetOutcomeProcessor {
client, _ := faktory.Open()
return &FaktorySetOutcomeProcessor{
client: client,
helper: helper,
logger: logger,
jobId: jobId,
@@ -99,7 +96,9 @@ func (s *FaktorySetOutcomeProcessor) Flush(outcome JobOutcome) {
s.logger.Error().Err(err).Msgf("error when reporting outcome '%s' for job '%s'", outcome.Outcome, s.jobId)
}
} else {
err := s.client.Push(job)
err := s.helper.With(func(cl *faktory.Client) error {
return cl.Push(job)
})
if err != nil {
MetricEnqueueFailed.Inc()
s.logger.Error().Err(err).Msgf("error when reporting outcome '%s' for job '%s'", outcome.Outcome, s.jobId)
46 changes: 24 additions & 22 deletions src/pkg/k8s.go
@@ -6,6 +6,7 @@ import (
"io"
"path"
"strings"
"sync"
"time"

"github.com/rs/zerolog/log"
@@ -31,6 +32,11 @@ import (
var (
ImageTagVersion string
k8sValidated bool

k8sClientOnce sync.Once
sharedK8sConfig *rest.Config
sharedK8sClient *kubernetes.Clientset
k8sInitError error
)

type JobConfig struct {
@@ -57,13 +63,23 @@ type JobOutcome struct {
OutcomeVariables []opslevel.RunnerJobOutcomeVariable
}

func LoadK8SClient() {
// This function is used to ensure we can connect to k8s
// We don't cache the config or clients for goroutine parallel problems
if _, err := GetKubernetesConfig(); err != nil {
cobra.CheckErr(err)
func GetSharedK8sClient() (*rest.Config, *kubernetes.Clientset, error) {
k8sClientOnce.Do(func() {
sharedK8sConfig, k8sInitError = GetKubernetesConfig()
if k8sInitError != nil {
return

Comment (Contributor), suggested change: replace the bare `return` with `return nil, nil, k8sInitError`.

Comment (Contributor Author): this return is for the anonymous function from Do() - so I think the empty return is correct here.

}
sharedK8sClient, k8sInitError = kubernetes.NewForConfig(sharedK8sConfig)
})
if k8sInitError != nil {
return nil, nil, k8sInitError
}
if _, err := GetKubernetesClientset(); err != nil {

Comment (Contributor): this method is no longer used if we wanna 🔪

Comment (Contributor Author): 🔪

return sharedK8sConfig, sharedK8sClient, nil
}

func LoadK8SClient() {
_, _, err := GetSharedK8sClient()
if err != nil {
cobra.CheckErr(err)
}
k8sValidated = true
@@ -74,9 +90,8 @@ func NewJobRunner(runnerId string, path string) *JobRunner {
// It's ok if this function panics because we wouldn't be able to run jobs anyway
LoadK8SClient()
}
// We recreate the config & clients here to ensure goroutine parallel problems don't raise their head
config, _ := GetKubernetesConfig()
client, _ := GetKubernetesClientset()
// kubernetes.Clientset is thread-safe and designed to be shared across goroutines
config, client, _ := GetSharedK8sClient() // Already validated by LoadK8SClient

Comment (Contributor Author): The removed comments were in general right about sharing clients. The client is thread-safe; however, we just need some synchronization around initializing it, since we load the k8s client per NewJobRunner. We don't manipulate the data from the clients at all, so this should be fine.

pod, err := ReadPodConfig(path)
if err != nil {
panic(err)
@@ -343,19 +358,6 @@ func CreateLabelSelector(labels map[string]string) (*metav1.LabelSelector, error
return labelSelector, err
}

func GetKubernetesClientset() (*kubernetes.Clientset, error) {
config, err := GetKubernetesConfig()
if err != nil {
return nil, err
}

client, err := kubernetes.NewForConfig(config)
if err != nil {
return nil, err
}
return client, nil
}

func GetKubernetesConfig() (*rest.Config, error) {
loadingRules := clientcmd.NewDefaultClientConfigLoadingRules()
configOverrides := &clientcmd.ConfigOverrides{}
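
On the review thread above about the bare `return` inside `k8sClientOnce.Do`: returning there only exits the anonymous closure, and callers still receive the failure through the captured error variable. A stripped-down sketch of the same pattern, with names that only stand in for the real ones:

```go
package sketch

import "sync"

type Config struct{ host string }
type Client struct{ cfg *Config }

// loadConfig and newClient stand in for GetKubernetesConfig and kubernetes.NewForConfig.
func loadConfig() (*Config, error)         { return &Config{host: "example"}, nil }
func newClient(c *Config) (*Client, error) { return &Client{cfg: c}, nil }

var (
	initOnce     sync.Once
	sharedConfig *Config
	sharedClient *Client
	initError    error
)

// getShared initializes the config and client exactly once; every caller,
// including the first, sees the same values and the same initialization error.
func getShared() (*Config, *Client, error) {
	initOnce.Do(func() {
		sharedConfig, initError = loadConfig()
		if initError != nil {
			return // exits only this closure; callers still get initError below
		}
		sharedClient, initError = newClient(sharedConfig)
	})
	if initError != nil {
		return nil, nil, initError
	}
	return sharedConfig, sharedClient, nil
}
```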
70 changes: 70 additions & 0 deletions src/scripts/enqueue-test-jobs.go
@@ -0,0 +1,70 @@
//go:build ignore
// +build ignore

package main

import (
"fmt"
"os"
"strconv"
"time"

faktory "github.com/contribsys/faktory/client"
)

func main() {
numJobs := 10
if len(os.Args) > 1 {
if n, err := strconv.Atoi(os.Args[1]); err == nil {
numJobs = n
}
}

client, err := faktory.Open()
if err != nil {
fmt.Printf("Failed to connect to Faktory: %v\n", err)
os.Exit(1)
}
defer client.Close()

fmt.Printf("Enqueuing %d test jobs to Faktory...\n", numJobs)

for i := 1; i <= numJobs; i++ {
jobID := fmt.Sprintf("test-job-%d-%d", i, time.Now().UnixNano())

// Create job args matching opslevel.RunnerJob structure
jobArgs := map[string]interface{}{
"image": "alpine:latest",
"commands": []string{
fmt.Sprintf("echo Hello from job %d", i),
fmt.Sprintf("echo Job ID: %s", jobID),
"sleep 2",
fmt.Sprintf("echo Job %d complete", i),
},
"variables": []map[string]interface{}{},
"files": []map[string]interface{}{
{
"name": "test.sh",
"contents": "#!/bin/sh\necho Running test script",
},
},
}

job := faktory.NewJob("legacy", jobArgs)
job.Queue = "runner"
job.ReserveFor = 300
job.SetCustom("opslevel-runner-job-id", jobID)

if err := client.Push(job); err != nil {
fmt.Printf(" Failed to enqueue job %d: %v\n", i, err)
continue
}

fmt.Printf(" Enqueued job %d/%d (ID: %s)\n", i, numJobs, jobID)
}

fmt.Println()
fmt.Printf("Done! Enqueued %d jobs to the 'runner' queue.\n", numJobs)
fmt.Println()
fmt.Println("Monitor at: http://localhost:7420")
}
55 changes: 55 additions & 0 deletions src/scripts/enqueue-test-jobs.sh
@@ -0,0 +1,55 @@
#!/bin/bash
#
# Enqueue N test jobs to Faktory for end-to-end testing
#
# Usage: ./scripts/enqueue-test-jobs.sh [NUM_JOBS]
# NUM_JOBS defaults to 10
#

set -e

NUM_JOBS=${1:-10}
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
SRC_DIR="$(dirname "$SCRIPT_DIR")"

echo "Enqueuing $NUM_JOBS test jobs to Faktory..."

for i in $(seq 1 $NUM_JOBS); do
JOB_ID="test-job-${i}-$(date +%s)"

# Create a temporary job file
# Note: commands must be an array of strings, files must have name/contents
JOB_FILE=$(mktemp)
cat > "$JOB_FILE" <<ENDJOB
type: legacy
queue: runner
reserve_for: 300
retries: 0
args:
- image: "alpine:latest"
commands:
- "echo Hello from job ${i}"
- "echo Job ID: ${JOB_ID}"
- "sleep 2"
- "echo Job ${i} complete"
variables: []
files:
- name: "test.sh"
contents: "#!/bin/sh\necho Running test script"
custom:
opslevel-runner-job-id: "${JOB_ID}"
ENDJOB

# Enqueue the job
cd "$SRC_DIR" && go run main.go enqueue -f "$JOB_FILE"

# Clean up temp file
rm -f "$JOB_FILE"

echo " Enqueued job $i/$NUM_JOBS (ID: $JOB_ID)"
done

echo ""
echo "Done! Enqueued $NUM_JOBS jobs to the 'runner' queue."
echo ""
echo "Monitor at: http://localhost:7420"