diff --git a/docs/docs/sidebar/architecture/job-architecture.md b/docs/docs/sidebar/architecture/job-architecture.md
index 056a8ebe..a338404d 100644
--- a/docs/docs/sidebar/architecture/job-architecture.md
+++ b/docs/docs/sidebar/architecture/job-architecture.md
@@ -37,7 +37,7 @@ into NATS JetStream:
- **Agents** — Processes jobs, updates status, stores results
All three use the **Job Client Layer** (`internal/job/client/`), which provides
-type-safe business logic operations (`CreateJob`, `GetQueueStats`,
+type-safe business logic operations (`CreateJob`, `GetQueueSummary`,
`GetJobStatus`, `ListJobs`) on top of NATS JetStream.
**NATS JetStream** provides three storage backends:
@@ -346,17 +346,12 @@ GET /api/v1/jobs/{job-id}
{
"total_jobs": 42,
"status_counts": {
- "unprocessed": 5,
+ "submitted": 5,
"processing": 2,
"completed": 30,
"failed": 5
},
- "operation_counts": {
- "node.hostname.get": 15,
- "node.status.get": 19,
- "network.dns.get": 8,
- "network.ping.do": 23
- }
+ "dlq_count": 0
}
```
@@ -663,7 +658,7 @@ The `internal/job/` package contains shared domain types and two subpackages:
| `client.go` | Publish-and-wait/collect with KV + stream |
| `query.go` | Query operations (system status, hostname, etc.) |
| `modify.go` | Modify operations (DNS updates) |
-| `jobs.go` | CreateJob, RetryJob, GetJobStatus, GetQueueStats |
+| `jobs.go` | CreateJob, RetryJob, GetJobStatus, ListJobs |
| `agent.go` | WriteStatusEvent, WriteJobResponse |
| `types.go` | Client-specific types and interfaces |
@@ -770,11 +765,64 @@ osapi agent start
## Performance Optimizations
+### Two-Pass Job Listing
+
+Job listing uses a two-pass approach to avoid reading every job payload:
+
+**Pass 1 — Key-name scan (fast):** Calls `kv.Keys()` once to get all key names
+in the bucket. Job status is derived purely from key name patterns
+(`status.{id}.{event}.{hostname}.{ts}`) without any `kv.Get()` calls. This
+produces ordered job IDs, per-job status, and aggregate status counts — all from
+string parsing in memory.
+
+**Pass 2 — Page fetch (bounded):** Fetches full job details (`kv.Get()`) only
+for the paginated page. With `limit=10`, this is ~10 reads regardless of total
+queue size.
+
+```
+Pass 1: kv.Keys() → 1 call → parse key names → status for all jobs
+Pass 2: kv.Get() → N calls → full details for page only (N = limit)
+```
+
+Queue statistics (`GetQueueSummary`) and the `ListJobs` status counts both use
+Pass 1 only — no `kv.Get()` calls at all.
+
+### Pagination Limits
+
+The API enforces a maximum page size of 100 (`MaxPageSize`). Requests with
+`limit=0` or `limit > 100` return 400. The default page size is 10.
+
+### Known Scalability Constraint: `kv.Keys()`
+
+The two-pass approach relies on `kv.Keys()`, which returns **all key names** in
+the bucket as a string slice. NATS JetStream does not support paginated key
+listing (`kv.Keys(prefix, limit, offset)`) — it is all or nothing.
+
+This is acceptable today because:
+
+- Key names are short strings (~80 bytes each)
+- The KV bucket has a 1-hour TTL, naturally bounding the key count
+- Even 100K keys as strings is only a few MB of memory
+
+However, at very large scale (millions of keys or longer/no TTL), `kv.Keys()`
+would become a bottleneck in both memory and latency. If this becomes a problem,
+potential approaches include:
+
+1. **Separate status index** — a dedicated KV key (e.g., `index.status.failed`)
+ maintaining a list of job IDs, updated on status transitions
+2. **External index** — move listing/filtering to a database (SQLite, Postgres)
+ while keeping NATS for job dispatch and processing
+3. **NATS KV watch** — use `kv.Watch()` to maintain an in-memory index
+ incrementally rather than scanning all keys on each request
+
+For now, the 1-hour TTL keeps the bucket bounded and `kv.Keys()` fast.
+
+### Other Optimizations
+
1. **Batch Operations**: Agents can fetch multiple jobs per poll
2. **Connection Pooling**: Reuse NATS connections
3. **KV Caching**: Local caching of frequently accessed jobs
4. **Stream Filtering**: Agents only receive relevant job types
-5. **Efficient Filtering**: Status-based key prefixes enable fast queries
## Error Handling
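
The Pass 1 key-name scan described above can be sketched as plain string parsing. This is an illustrative stand-in, not the project's actual implementation: the key layout `status.{id}.{event}.{hostname}.{ts}` is taken from the doc hunk, while the "latest timestamp wins" rule and lexically ordered timestamps are assumptions for the example.

```go
package main

import (
	"fmt"
	"strings"
)

// deriveStatuses sketches Pass 1: given raw key names from kv.Keys(),
// derive each job's current status and the aggregate counts without a
// single kv.Get(). Assumes hostnames contain no dots, and that the
// timestamp segment sorts lexically (e.g. zero-padded unix nanos), so a
// string compare picks the newest event per job.
func deriveStatuses(keys []string) (perJob map[string]string, counts map[string]int) {
	perJob = map[string]string{}
	latestTS := map[string]string{}
	for _, k := range keys {
		// status.{id}.{event}.{hostname}.{ts}
		parts := strings.SplitN(k, ".", 5)
		if len(parts) != 5 || parts[0] != "status" {
			continue
		}
		id, event, ts := parts[1], parts[2], parts[4]
		if ts >= latestTS[id] {
			latestTS[id] = ts
			perJob[id] = event
		}
	}
	counts = map[string]int{}
	for _, status := range perJob {
		counts[status]++
	}
	return perJob, counts
}

func main() {
	keys := []string{
		"status.job-1.submitted.server1.0001",
		"status.job-1.processing.server1.0002",
		"status.job-1.completed.server1.0003",
		"status.job-2.submitted.server1.0004",
	}
	perJob, counts := deriveStatuses(keys)
	fmt.Println(perJob["job-1"], perJob["job-2"], counts)
}
```

Pass 2 would then call `kv.Get()` only for the page of job IDs selected from `perJob`, which is what bounds reads at the page size.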
diff --git a/docs/docs/sidebar/usage/cli/client/job/list.md b/docs/docs/sidebar/usage/cli/client/job/list.md
index f72ddf24..a9173e4f 100644
--- a/docs/docs/sidebar/usage/cli/client/job/list.md
+++ b/docs/docs/sidebar/usage/cli/client/job/list.md
@@ -11,7 +11,7 @@ $ osapi client job list
Jobs:
- JOB ID STATUS CREATED TARGET OPERATION AGENTS
+ JOB ID STATUS CREATED TARGET OPERATION
550e8400-e29b-41d4-a716-446655440000 completed 2026-02-16 13:21 server1 node.hostname.get
661f9511-f30c-41d4-a716-557766551111 completed 2026-02-16 13:43 server1 network.dns.get
772a0622-a41d-52e5-b827-668877662222 failed 2026-02-16 15:48 server1 node.status.get
@@ -28,7 +28,7 @@ $ osapi client job list --status completed --limit 3
Jobs:
- JOB ID STATUS CREATED TARGET OPERATION AGENTS
+ JOB ID STATUS CREATED TARGET OPERATION
550e8400-e29b-41d4-a716-446655440000 completed 2026-02-16 13:21 server1 node.hostname.get
661f9511-f30c-41d4-a716-557766551111 completed 2026-02-16 13:43 server1 network.dns.get
772a0622-a41d-52e5-b827-668877662222 completed 2026-02-16 15:48 server1 node.status.get
@@ -36,8 +36,8 @@ $ osapi client job list --status completed --limit 3
## Flags
-| Flag | Description | Default |
-| ---------- | ----------------------------------------------- | ------- |
-| `--status` | Filter by status (submitted, processing, etc.) | |
-| `--limit` | Limit number of jobs displayed (0 for no limit) | 10 |
-| `--offset` | Skip the first N jobs (for pagination) | 0 |
+| Flag | Description | Default |
+| ---------- | ---------------------------------------------- | ------- |
+| `--status` | Filter by status (submitted, processing, etc.) | |
+| `--limit` | Maximum number of jobs per page (1-100) | 10 |
+| `--offset` | Skip the first N jobs (for pagination) | 0 |
diff --git a/docs/docs/sidebar/usage/cli/client/job/status.md b/docs/docs/sidebar/usage/cli/client/job/status.md
index 13e1a136..ad186ae9 100644
--- a/docs/docs/sidebar/usage/cli/client/job/status.md
+++ b/docs/docs/sidebar/usage/cli/client/job/status.md
@@ -5,14 +5,13 @@ Display the job queue status with live updates using a BubbleTea TUI:
```bash
$ osapi client job status --poll-interval-seconds 5
- Queue Status:
-
- STATUS COUNT
- Total 42
- Unprocessed 5
- Processing 2
- Completed 30
- Failed 5
+ Jobs Queue Status:
+ Total Jobs: 42
+   Submitted: 5
+ Processing: 2
+ Completed: 30
+ Failed: 5
+ Dead Letter Queue: 3
```
## Flags
@@ -22,4 +21,4 @@ $ osapi client job status --poll-interval-seconds 5
| `--poll-interval-seconds` | Interval between polling operations | 30 |
The status view auto-refreshes at the configured interval, showing job counts by
-status and operation type.
+status.
diff --git a/go.mod b/go.mod
index 9232de7c..73ebf8ab 100644
--- a/go.mod
+++ b/go.mod
@@ -18,7 +18,7 @@ require (
github.com/oapi-codegen/runtime v1.2.0
github.com/osapi-io/nats-client v0.0.0-20260306210421-d68b2a0f287b
github.com/osapi-io/nats-server v0.0.0-20260216201410-1f33dfc63848
- github.com/osapi-io/osapi-sdk v0.0.0-20260307073158-439e543a3013
+ github.com/osapi-io/osapi-sdk v0.0.0-20260307192743-857786ce1c9e
github.com/prometheus-community/pro-bing v0.8.0
github.com/prometheus/client_golang v1.23.2
github.com/samber/slog-echo v1.21.0
diff --git a/go.sum b/go.sum
index 4298d6a0..afd2e7ca 100644
--- a/go.sum
+++ b/go.sum
@@ -757,6 +757,8 @@ github.com/osapi-io/nats-server v0.0.0-20260216201410-1f33dfc63848 h1:ELW1sTVBn5
github.com/osapi-io/nats-server v0.0.0-20260216201410-1f33dfc63848/go.mod h1:4rzeY9jiJF/+Ej4WNwqK5HQ2sflZrEs60GxQpg3Iya8=
github.com/osapi-io/osapi-sdk v0.0.0-20260307073158-439e543a3013 h1:kcP1brAYrbrETk+8jgJKyGE8NI0zIvSg3hT5Y1oviT4=
github.com/osapi-io/osapi-sdk v0.0.0-20260307073158-439e543a3013/go.mod h1:i9g4jaIL6NVo9MRpz33lAEnY4L7u6aO97/5hN4W3hGE=
+github.com/osapi-io/osapi-sdk v0.0.0-20260307192743-857786ce1c9e h1:sCg9f0Undm5zUdZ+oKESdplMhRvAlqmnMqKlyOoInX0=
+github.com/osapi-io/osapi-sdk v0.0.0-20260307192743-857786ce1c9e/go.mod h1:i9g4jaIL6NVo9MRpz33lAEnY4L7u6aO97/5hN4W3hGE=
github.com/otiai10/copy v1.2.0/go.mod h1:rrF5dJ5F0t/EWSYODDu4j9/vEeYHMkc8jt0zJChqQWw=
github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU=
github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w=
diff --git a/internal/api/agent/agent_drain.go b/internal/api/agent/agent_drain.go
index ee3c1276..9bedd39a 100644
--- a/internal/api/agent/agent_drain.go
+++ b/internal/api/agent/agent_drain.go
@@ -34,6 +34,10 @@ func (a *Agent) DrainAgent(
ctx context.Context,
request gen.DrainAgentRequestObject,
) (gen.DrainAgentResponseObject, error) {
+ if errMsg, ok := validateHostname(request.Hostname); !ok {
+ return gen.DrainAgent400JSONResponse{Error: &errMsg}, nil
+ }
+
hostname := request.Hostname
agentInfo, err := a.JobClient.GetAgent(ctx, hostname)
diff --git a/internal/api/agent/agent_drain_public_test.go b/internal/api/agent/agent_drain_public_test.go
index 8bd310ae..002e8a3e 100644
--- a/internal/api/agent/agent_drain_public_test.go
+++ b/internal/api/agent/agent_drain_public_test.go
@@ -27,6 +27,7 @@ import (
"net/http"
"net/http/httptest"
"os"
+ "strings"
"testing"
"github.com/golang/mock/gomock"
@@ -69,6 +70,7 @@ func (s *AgentDrainPublicTestSuite) TestDrainAgent() {
tests := []struct {
name string
hostname string
+ skipMock bool
mockAgent *jobtypes.AgentInfo
mockGetErr error
mockWriteErr error
@@ -77,6 +79,26 @@ func (s *AgentDrainPublicTestSuite) TestDrainAgent() {
mockSetDrainErr error
validateFunc func(resp gen.DrainAgentResponseObject)
}{
+ {
+ name: "returns 400 when hostname is empty",
+ hostname: "",
+ skipMock: true,
+ skipWrite: true,
+ validateFunc: func(resp gen.DrainAgentResponseObject) {
+ _, ok := resp.(gen.DrainAgent400JSONResponse)
+ s.True(ok)
+ },
+ },
+ {
+ name: "returns 400 when hostname exceeds max length",
+ hostname: strings.Repeat("a", 256),
+ skipMock: true,
+ skipWrite: true,
+ validateFunc: func(resp gen.DrainAgentResponseObject) {
+ _, ok := resp.(gen.DrainAgent400JSONResponse)
+ s.True(ok)
+ },
+ },
{
name: "success drains agent",
hostname: "server1",
@@ -176,9 +198,11 @@ func (s *AgentDrainPublicTestSuite) TestDrainAgent() {
for _, tt := range tests {
s.Run(tt.name, func() {
- s.mockJobClient.EXPECT().
- GetAgent(gomock.Any(), tt.hostname).
- Return(tt.mockAgent, tt.mockGetErr)
+ if !tt.skipMock {
+ s.mockJobClient.EXPECT().
+ GetAgent(gomock.Any(), tt.hostname).
+ Return(tt.mockAgent, tt.mockGetErr)
+ }
if tt.mockSetDrain {
s.mockJobClient.EXPECT().
@@ -209,6 +233,15 @@ func (s *AgentDrainPublicTestSuite) TestDrainAgentHTTP() {
wantCode int
wantContains []string
}{
+ {
+ name: "when hostname exceeds max length returns 400",
+ hostname: strings.Repeat("a", 256),
+ setupJobMock: func() *jobmocks.MockJobClient {
+ return jobmocks.NewMockJobClient(s.mockCtrl)
+ },
+ wantCode: http.StatusBadRequest,
+ wantContains: []string{`"error"`},
+ },
{
name: "when agent exists returns 200",
hostname: "server1",
diff --git a/internal/api/agent/agent_get.go b/internal/api/agent/agent_get.go
index 3dc6d124..be5e733d 100644
--- a/internal/api/agent/agent_get.go
+++ b/internal/api/agent/agent_get.go
@@ -32,6 +32,10 @@ func (a *Agent) GetAgentDetails(
ctx context.Context,
request gen.GetAgentDetailsRequestObject,
) (gen.GetAgentDetailsResponseObject, error) {
+ if errMsg, ok := validateHostname(request.Hostname); !ok {
+ return gen.GetAgentDetails400JSONResponse{Error: &errMsg}, nil
+ }
+
agentInfo, err := a.JobClient.GetAgent(ctx, request.Hostname)
if err != nil {
errMsg := err.Error()
diff --git a/internal/api/agent/agent_get_public_test.go b/internal/api/agent/agent_get_public_test.go
index 516dd805..22e846a4 100644
--- a/internal/api/agent/agent_get_public_test.go
+++ b/internal/api/agent/agent_get_public_test.go
@@ -27,6 +27,7 @@ import (
"net/http"
"net/http/httptest"
"os"
+ "strings"
"testing"
"time"
@@ -73,10 +74,29 @@ func (s *AgentGetPublicTestSuite) TestGetAgentDetails() {
tests := []struct {
name string
hostname string
+ skipMock bool
mockAgent *jobtypes.AgentInfo
mockError error
validateFunc func(resp gen.GetAgentDetailsResponseObject)
}{
+ {
+ name: "returns 400 when hostname is empty",
+ hostname: "",
+ skipMock: true,
+ validateFunc: func(resp gen.GetAgentDetailsResponseObject) {
+ _, ok := resp.(gen.GetAgentDetails400JSONResponse)
+ s.True(ok)
+ },
+ },
+ {
+ name: "returns 400 when hostname exceeds max length",
+ hostname: strings.Repeat("a", 256),
+ skipMock: true,
+ validateFunc: func(resp gen.GetAgentDetailsResponseObject) {
+ _, ok := resp.(gen.GetAgentDetails400JSONResponse)
+ s.True(ok)
+ },
+ },
{
name: "success returns agent details",
hostname: "server1",
@@ -125,9 +145,11 @@ func (s *AgentGetPublicTestSuite) TestGetAgentDetails() {
for _, tt := range tests {
s.Run(tt.name, func() {
- s.mockJobClient.EXPECT().
- GetAgent(gomock.Any(), tt.hostname).
- Return(tt.mockAgent, tt.mockError)
+ if !tt.skipMock {
+ s.mockJobClient.EXPECT().
+ GetAgent(gomock.Any(), tt.hostname).
+ Return(tt.mockAgent, tt.mockError)
+ }
resp, err := s.handler.GetAgentDetails(s.ctx, gen.GetAgentDetailsRequestObject{
Hostname: tt.hostname,
@@ -146,6 +168,15 @@ func (s *AgentGetPublicTestSuite) TestGetAgentDetailsHTTP() {
wantCode int
wantContains []string
}{
+ {
+ name: "when hostname exceeds max length returns 400",
+ hostname: strings.Repeat("a", 256),
+ setupJobMock: func() *jobmocks.MockJobClient {
+ return jobmocks.NewMockJobClient(s.mockCtrl)
+ },
+ wantCode: http.StatusBadRequest,
+ wantContains: []string{`"error"`},
+ },
{
name: "when agent exists returns details",
hostname: "server1",
diff --git a/internal/api/agent/agent_undrain.go b/internal/api/agent/agent_undrain.go
index 67bad3cc..84b9e376 100644
--- a/internal/api/agent/agent_undrain.go
+++ b/internal/api/agent/agent_undrain.go
@@ -34,6 +34,10 @@ func (a *Agent) UndrainAgent(
ctx context.Context,
request gen.UndrainAgentRequestObject,
) (gen.UndrainAgentResponseObject, error) {
+ if errMsg, ok := validateHostname(request.Hostname); !ok {
+ return gen.UndrainAgent400JSONResponse{Error: &errMsg}, nil
+ }
+
hostname := request.Hostname
agentInfo, err := a.JobClient.GetAgent(ctx, hostname)
diff --git a/internal/api/agent/agent_undrain_public_test.go b/internal/api/agent/agent_undrain_public_test.go
index 8050aacb..3ebab922 100644
--- a/internal/api/agent/agent_undrain_public_test.go
+++ b/internal/api/agent/agent_undrain_public_test.go
@@ -27,6 +27,7 @@ import (
"net/http"
"net/http/httptest"
"os"
+ "strings"
"testing"
"github.com/golang/mock/gomock"
@@ -69,6 +70,7 @@ func (s *AgentUndrainPublicTestSuite) TestUndrainAgent() {
tests := []struct {
name string
hostname string
+ skipMock bool
mockAgent *jobtypes.AgentInfo
mockGetErr error
mockWriteErr error
@@ -77,6 +79,26 @@ func (s *AgentUndrainPublicTestSuite) TestUndrainAgent() {
mockDeleteDrainErr error
validateFunc func(resp gen.UndrainAgentResponseObject)
}{
+ {
+ name: "returns 400 when hostname is empty",
+ hostname: "",
+ skipMock: true,
+ skipWrite: true,
+ validateFunc: func(resp gen.UndrainAgentResponseObject) {
+ _, ok := resp.(gen.UndrainAgent400JSONResponse)
+ s.True(ok)
+ },
+ },
+ {
+ name: "returns 400 when hostname exceeds max length",
+ hostname: strings.Repeat("a", 256),
+ skipMock: true,
+ skipWrite: true,
+ validateFunc: func(resp gen.UndrainAgentResponseObject) {
+ _, ok := resp.(gen.UndrainAgent400JSONResponse)
+ s.True(ok)
+ },
+ },
{
name: "success undrains draining agent",
hostname: "server1",
@@ -190,9 +212,11 @@ func (s *AgentUndrainPublicTestSuite) TestUndrainAgent() {
for _, tt := range tests {
s.Run(tt.name, func() {
- s.mockJobClient.EXPECT().
- GetAgent(gomock.Any(), tt.hostname).
- Return(tt.mockAgent, tt.mockGetErr)
+ if !tt.skipMock {
+ s.mockJobClient.EXPECT().
+ GetAgent(gomock.Any(), tt.hostname).
+ Return(tt.mockAgent, tt.mockGetErr)
+ }
if tt.mockDeleteDrain {
s.mockJobClient.EXPECT().
@@ -223,6 +247,15 @@ func (s *AgentUndrainPublicTestSuite) TestUndrainAgentHTTP() {
wantCode int
wantContains []string
}{
+ {
+ name: "when hostname exceeds max length returns 400",
+ hostname: strings.Repeat("a", 256),
+ setupJobMock: func() *jobmocks.MockJobClient {
+ return jobmocks.NewMockJobClient(s.mockCtrl)
+ },
+ wantCode: http.StatusBadRequest,
+ wantContains: []string{`"error"`},
+ },
{
name: "when draining agent exists returns 200",
hostname: "server1",
diff --git a/internal/api/agent/gen/agent.gen.go b/internal/api/agent/gen/agent.gen.go
index 7eb96e5e..9c0f4d19 100644
--- a/internal/api/agent/gen/agent.gen.go
+++ b/internal/api/agent/gen/agent.gen.go
@@ -399,6 +399,15 @@ func (response GetAgentDetails200JSONResponse) VisitGetAgentDetailsResponse(w ht
return json.NewEncoder(w).Encode(response)
}
+type GetAgentDetails400JSONResponse externalRef0.ErrorResponse
+
+func (response GetAgentDetails400JSONResponse) VisitGetAgentDetailsResponse(w http.ResponseWriter) error {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(400)
+
+ return json.NewEncoder(w).Encode(response)
+}
+
type GetAgentDetails401JSONResponse externalRef0.ErrorResponse
func (response GetAgentDetails401JSONResponse) VisitGetAgentDetailsResponse(w http.ResponseWriter) error {
@@ -454,6 +463,15 @@ func (response DrainAgent200JSONResponse) VisitDrainAgentResponse(w http.Respons
return json.NewEncoder(w).Encode(response)
}
+type DrainAgent400JSONResponse externalRef0.ErrorResponse
+
+func (response DrainAgent400JSONResponse) VisitDrainAgentResponse(w http.ResponseWriter) error {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(400)
+
+ return json.NewEncoder(w).Encode(response)
+}
+
type DrainAgent401JSONResponse externalRef0.ErrorResponse
func (response DrainAgent401JSONResponse) VisitDrainAgentResponse(w http.ResponseWriter) error {
@@ -509,6 +527,15 @@ func (response UndrainAgent200JSONResponse) VisitUndrainAgentResponse(w http.Res
return json.NewEncoder(w).Encode(response)
}
+type UndrainAgent400JSONResponse externalRef0.ErrorResponse
+
+func (response UndrainAgent400JSONResponse) VisitUndrainAgentResponse(w http.ResponseWriter) error {
+ w.Header().Set("Content-Type", "application/json")
+ w.WriteHeader(400)
+
+ return json.NewEncoder(w).Encode(response)
+}
+
type UndrainAgent401JSONResponse externalRef0.ErrorResponse
func (response UndrainAgent401JSONResponse) VisitUndrainAgentResponse(w http.ResponseWriter) error {
diff --git a/internal/api/agent/gen/api.yaml b/internal/api/agent/gen/api.yaml
index 6fa564aa..1b56179c 100644
--- a/internal/api/agent/gen/api.yaml
+++ b/internal/api/agent/gen/api.yaml
@@ -76,8 +76,15 @@ paths:
- name: hostname
in: path
required: true
+ # NOTE: x-oapi-codegen-extra-tags on path params do not generate
+ # validate tags in strict-server mode. Validation is handled
+ # manually in the handler.
+ x-oapi-codegen-extra-tags:
+ validate: required,min=1,max=255
schema:
type: string
+ minLength: 1
+ maxLength: 255
description: The hostname of the agent to retrieve.
responses:
'200':
@@ -86,6 +93,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/AgentInfo'
+ '400':
+ description: Invalid hostname.
+ content:
+ application/json:
+ schema:
+ $ref: '../../common/gen/api.yaml#/components/schemas/ErrorResponse'
'401':
description: Unauthorized - API key required
content:
@@ -126,8 +139,15 @@ paths:
- name: hostname
in: path
required: true
+ # NOTE: x-oapi-codegen-extra-tags on path params do not generate
+ # validate tags in strict-server mode. Validation is handled
+ # manually in the handler.
+ x-oapi-codegen-extra-tags:
+ validate: required,min=1,max=255
schema:
type: string
+ minLength: 1
+ maxLength: 255
description: The hostname of the agent to drain.
responses:
'200':
@@ -141,6 +161,12 @@ paths:
type: string
required:
- message
+ '400':
+ description: Invalid hostname.
+ content:
+ application/json:
+ schema:
+ $ref: '../../common/gen/api.yaml#/components/schemas/ErrorResponse'
'401':
description: Unauthorized - API key required
content:
@@ -179,8 +205,15 @@ paths:
- name: hostname
in: path
required: true
+ # NOTE: x-oapi-codegen-extra-tags on path params do not generate
+ # validate tags in strict-server mode. Validation is handled
+ # manually in the handler.
+ x-oapi-codegen-extra-tags:
+ validate: required,min=1,max=255
schema:
type: string
+ minLength: 1
+ maxLength: 255
description: The hostname of the agent to undrain.
responses:
'200':
@@ -194,6 +227,12 @@ paths:
type: string
required:
- message
+ '400':
+ description: Invalid hostname.
+ content:
+ application/json:
+ schema:
+ $ref: '../../common/gen/api.yaml#/components/schemas/ErrorResponse'
'401':
description: Unauthorized - API key required
content:
diff --git a/internal/api/agent/validate.go b/internal/api/agent/validate.go
new file mode 100644
index 00000000..c9213137
--- /dev/null
+++ b/internal/api/agent/validate.go
@@ -0,0 +1,34 @@
+// Copyright (c) 2026 John Dewey
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
+// deal in the Software without restriction, including without limitation the
+// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+// sell copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+package agent
+
+import "github.com/retr0h/osapi/internal/validation"
+
+// validateHostname validates a hostname path parameter using the shared
+// validator. Returns the error message and false if invalid.
+//
+// This exists because oapi-codegen does not generate validate tags on
+// path parameters in strict-server mode (upstream limitation).
+func validateHostname(
+ hostname string,
+) (string, bool) {
+ return validation.Var(hostname, "required,min=1,max=255")
+}
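
The new `validateHostname` helper delegates to the shared `internal/validation` package, which this diff does not show. As a rough stand-in for what the `required,min=1,max=255` rule enforces, a stdlib-only equivalent looks like this (the error messages here are illustrative, not the validator's real output):

```go
package main

import "fmt"

// validateHostname is a stdlib sketch of the behavior the shared
// validator provides for the "required,min=1,max=255" rule: reject
// empty hostnames and hostnames longer than 255 characters.
func validateHostname(hostname string) (string, bool) {
	if hostname == "" {
		return "hostname is required", false
	}
	if len(hostname) > 255 {
		return "hostname must be at most 255 characters", false
	}
	return "", true
}

func main() {
	for _, h := range []string{"server1", ""} {
		msg, ok := validateHostname(h)
		fmt.Printf("%q valid=%v msg=%q\n", h, ok, msg)
	}
}
```

All three agent handlers (`GetAgentDetails`, `DrainAgent`, `UndrainAgent`) call this before touching the job client, which is why the 400 test cases can skip the `GetAgent` mock entirely.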
diff --git a/internal/api/file/file_upload.go b/internal/api/file/file_upload.go
index a6f9cb83..87a4cf7b 100644
--- a/internal/api/file/file_upload.go
+++ b/internal/api/file/file_upload.go
@@ -33,6 +33,7 @@ import (
"github.com/nats-io/nats.go/jetstream"
"github.com/retr0h/osapi/internal/api/file/gen"
+ "github.com/retr0h/osapi/internal/validation"
)
// PostFile upload a file to the Object Store via multipart/form-data.
@@ -40,6 +41,13 @@ func (f *File) PostFile(
ctx context.Context,
request gen.PostFileRequestObject,
) (gen.PostFileResponseObject, error) {
+ // Defense-in-depth: the OpenAPI validator handles param validation before
+ // the handler runs, but we validate here too so the plumbing is in place
+ // if a future param adds stricter tags.
+ if errMsg, ok := validation.Struct(request.Params); !ok {
+ return gen.PostFile400JSONResponse{Error: &errMsg}, nil
+ }
+
name, contentType, fileData, errResp := f.parseMultipart(request)
if errResp != nil {
return errResp, nil
diff --git a/internal/api/file/gen/api.yaml b/internal/api/file/gen/api.yaml
index 50637b35..582ee986 100644
--- a/internal/api/file/gen/api.yaml
+++ b/internal/api/file/gen/api.yaml
@@ -43,14 +43,8 @@ paths:
- name: force
in: query
required: false
- # x-oapi-codegen-extra-tags:
- # validate: omitempty
- # NOTE: The tags above are intentionally commented out. The
- # only param is an optional bool — validate:"omitempty" can
- # never fail, so validation.Struct(Params) would be dead
- # code. The middleware already type-validates the bool before
- # the handler runs. Uncomment if a future param needs a
- # stricter validate tag.
+ x-oapi-codegen-extra-tags:
+ validate: omitempty
description: >
When true, bypass the digest check and always write the
file. Returns changed=true regardless of whether the
diff --git a/internal/api/file/gen/file.gen.go b/internal/api/file/gen/file.gen.go
index 0377146e..3287df6f 100644
--- a/internal/api/file/gen/file.gen.go
+++ b/internal/api/file/gen/file.gen.go
@@ -114,7 +114,7 @@ type PostFileMultipartBody struct {
// PostFileParams defines parameters for PostFile.
type PostFileParams struct {
// Force When true, bypass the digest check and always write the file. Returns changed=true regardless of whether the content differs from the existing object.
- Force *bool `form:"force,omitempty" json:"force,omitempty"`
+ Force *bool `form:"force,omitempty" json:"force,omitempty" validate:"omitempty"`
}
// PostFileMultipartBodyContentType defines parameters for PostFile.
diff --git a/internal/api/gen/api.yaml b/internal/api/gen/api.yaml
index 36c5a29e..c8185209 100644
--- a/internal/api/gen/api.yaml
+++ b/internal/api/gen/api.yaml
@@ -87,8 +87,12 @@ paths:
- name: hostname
in: path
required: true
+ x-oapi-codegen-extra-tags:
+ validate: required,min=1,max=255
schema:
type: string
+ minLength: 1
+ maxLength: 255
description: The hostname of the agent to retrieve.
responses:
'200':
@@ -97,6 +101,12 @@ paths:
application/json:
schema:
$ref: '#/components/schemas/AgentInfo'
+ '400':
+ description: Invalid hostname.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ErrorResponse'
'401':
description: Unauthorized - API key required
content:
@@ -138,8 +148,12 @@ paths:
- name: hostname
in: path
required: true
+ x-oapi-codegen-extra-tags:
+ validate: required,min=1,max=255
schema:
type: string
+ minLength: 1
+ maxLength: 255
description: The hostname of the agent to drain.
responses:
'200':
@@ -153,6 +167,12 @@ paths:
type: string
required:
- message
+ '400':
+ description: Invalid hostname.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ErrorResponse'
'401':
description: Unauthorized - API key required
content:
@@ -192,8 +212,12 @@ paths:
- name: hostname
in: path
required: true
+ x-oapi-codegen-extra-tags:
+ validate: required,min=1,max=255
schema:
type: string
+ minLength: 1
+ maxLength: 255
description: The hostname of the agent to undrain.
responses:
'200':
@@ -207,6 +231,12 @@ paths:
type: string
required:
- message
+ '400':
+ description: Invalid hostname.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ErrorResponse'
'401':
description: Unauthorized - API key required
content:
@@ -412,6 +442,8 @@ paths:
- name: force
in: query
required: false
+ x-oapi-codegen-extra-tags:
+ validate: omitempty
description: >
When true, bypass the digest check and always write the file.
Returns changed=true regardless of whether the content differs from
@@ -770,12 +802,13 @@ paths:
in: query
required: false
x-oapi-codegen-extra-tags:
- validate: omitempty,min=0
+ validate: omitempty,min=1,max=100
schema:
type: integer
- minimum: 0
+ minimum: 1
+ maximum: 100
default: 10
- description: Maximum number of jobs to return. Use 0 for no limit.
+ description: Maximum number of jobs per page (1-100).
- name: offset
in: query
required: false
@@ -2505,6 +2538,20 @@ components:
type: integer
description: Total number of jobs matching the filter.
example: 42
+ status_counts:
+ type: object
+ additionalProperties:
+ type: integer
+ description: >
+ Count of all jobs by status (submitted, processing, completed,
+ failed, partial_failure). Derived from key names during the listing
+ pass — no extra reads.
+ example:
+ submitted: 5
+ processing: 2
+ completed: 30
+ failed: 3
+ partial_failure: 2
items:
type: array
items:
@@ -2598,11 +2645,6 @@ components:
additionalProperties:
type: integer
description: Count of jobs by status.
- operation_counts:
- type: object
- additionalProperties:
- type: integer
- description: Count of jobs by operation type.
dlq_count:
type: integer
description: Number of jobs in the dead letter queue.
diff --git a/internal/api/job/gen/api.yaml b/internal/api/job/gen/api.yaml
index 664cf1f8..d4e6ab10 100644
--- a/internal/api/job/gen/api.yaml
+++ b/internal/api/job/gen/api.yaml
@@ -102,12 +102,13 @@ paths:
in: query
required: false
x-oapi-codegen-extra-tags:
- validate: omitempty,min=0
+ validate: omitempty,min=1,max=100
schema:
type: integer
- minimum: 0
+ minimum: 1
+ maximum: 100
default: 10
- description: Maximum number of jobs to return. Use 0 for no limit.
+ description: Maximum number of jobs per page (1-100).
- name: offset
in: query
required: false
@@ -430,6 +431,20 @@ components:
type: integer
description: Total number of jobs matching the filter.
example: 42
+ status_counts:
+ type: object
+ additionalProperties:
+ type: integer
+ description: >
+ Count of all jobs by status (submitted, processing, completed,
+ failed, partial_failure). Derived from key names during the
+ listing pass — no extra reads.
+ example:
+ submitted: 5
+ processing: 2
+ completed: 30
+ failed: 3
+ partial_failure: 2
items:
type: array
items:
@@ -525,11 +540,6 @@ components:
additionalProperties:
type: integer
description: Count of jobs by status.
- operation_counts:
- type: object
- additionalProperties:
- type: integer
- description: Count of jobs by operation type.
dlq_count:
type: integer
description: Number of jobs in the dead letter queue.
diff --git a/internal/api/job/gen/job.gen.go b/internal/api/job/gen/job.gen.go
index a72f4382..4c89eb37 100644
--- a/internal/api/job/gen/job.gen.go
+++ b/internal/api/job/gen/job.gen.go
@@ -121,6 +121,9 @@ type JobDetailResponse struct {
type ListJobsResponse struct {
Items *[]JobDetailResponse `json:"items,omitempty"`
+ // StatusCounts Count of all jobs by status (submitted, processing, completed, failed, partial_failure). Derived from key names during the listing pass — no extra reads.
+ StatusCounts *map[string]int `json:"status_counts,omitempty"`
+
// TotalItems Total number of jobs matching the filter.
TotalItems *int `json:"total_items,omitempty"`
}
@@ -130,9 +133,6 @@ type QueueStatsResponse struct {
// DlqCount Number of jobs in the dead letter queue.
DlqCount *int `json:"dlq_count,omitempty"`
- // OperationCounts Count of jobs by operation type.
- OperationCounts *map[string]int `json:"operation_counts,omitempty"`
-
// StatusCounts Count of jobs by status.
StatusCounts *map[string]int `json:"status_counts,omitempty"`
@@ -151,8 +151,8 @@ type GetJobParams struct {
// Status Filter jobs by status.
Status *GetJobParamsStatus `form:"status,omitempty" json:"status,omitempty" validate:"omitempty,oneof=submitted processing completed failed partial_failure"`
- // Limit Maximum number of jobs to return. Use 0 for no limit.
- Limit *int `form:"limit,omitempty" json:"limit,omitempty" validate:"omitempty,min=0"`
+ // Limit Maximum number of jobs per page (1-100).
+ Limit *int `form:"limit,omitempty" json:"limit,omitempty" validate:"omitempty,min=1,max=100"`
// Offset Number of jobs to skip for pagination.
Offset *int `form:"offset,omitempty" json:"offset,omitempty" validate:"omitempty,min=0"`
diff --git a/internal/api/job/job_list.go b/internal/api/job/job_list.go
index 998e4e12..fca48399 100644
--- a/internal/api/job/job_list.go
+++ b/internal/api/job/job_list.go
@@ -27,6 +27,7 @@ import (
"github.com/google/uuid"
"github.com/retr0h/osapi/internal/api/job/gen"
+ "github.com/retr0h/osapi/internal/job/client"
"github.com/retr0h/osapi/internal/validation"
)
@@ -44,10 +45,16 @@ func (j *Job) GetJob(
statusFilter = string(*request.Params.Status)
}
- limit := 10
+ limit := client.DefaultPageSize
if request.Params.Limit != nil {
limit = *request.Params.Limit
}
+ // Defense-in-depth: the OpenAPI validator rejects out-of-range limits
+ // before the handler runs, but we cap here too in case validation is
+ // ever misconfigured. ListJobs also caps internally.
+ if limit <= 0 || limit > client.MaxPageSize {
+ limit = client.DefaultPageSize
+ }
offset := 0
if request.Params.Offset != nil {
@@ -92,8 +99,10 @@ func (j *Job) GetJob(
}
totalItems := result.TotalCount
+ statusCounts := result.StatusCounts
return gen.GetJob200JSONResponse{
- TotalItems: &totalItems,
- Items: &items,
+ TotalItems: &totalItems,
+ StatusCounts: &statusCounts,
+ Items: &items,
}, nil
}
diff --git a/internal/api/job/job_list_public_test.go b/internal/api/job/job_list_public_test.go
index 873dff7e..10489653 100644
--- a/internal/api/job/job_list_public_test.go
+++ b/internal/api/job/job_list_public_test.go
@@ -108,6 +108,36 @@ func (s *JobListPublicTestSuite) TestGetJob() {
s.NotNil(r.Error)
},
},
+ {
+ name: "returns 400 when limit is zero",
+ request: func() gen.GetJobRequestObject {
+ l := 0
+ return gen.GetJobRequestObject{
+ Params: gen.GetJobParams{Limit: &l},
+ }
+ }(),
+ expectMock: false,
+ validateFunc: func(resp gen.GetJobResponseObject) {
+ r, ok := resp.(gen.GetJob400JSONResponse)
+ s.True(ok)
+ s.NotNil(r.Error)
+ },
+ },
+ {
+ name: "returns 400 when limit exceeds max",
+ request: func() gen.GetJobRequestObject {
+ l := 101
+ return gen.GetJobRequestObject{
+ Params: gen.GetJobParams{Limit: &l},
+ }
+ }(),
+ expectMock: false,
+ validateFunc: func(resp gen.GetJobResponseObject) {
+ r, ok := resp.(gen.GetJob400JSONResponse)
+ s.True(ok)
+ s.NotNil(r.Error)
+ },
+ },
{
name: "returns 400 when offset is negative",
request: func() gen.GetJobRequestObject {
@@ -341,6 +371,24 @@ func (s *JobListPublicTestSuite) TestListJobsValidationHTTP() {
wantCode: http.StatusBadRequest,
wantContains: []string{`"error"`},
},
+ {
+ name: "when limit=0 returns 400",
+ query: "?limit=0",
+ setupJobMock: func() *jobmocks.MockJobClient {
+ return jobmocks.NewMockJobClient(s.mockCtrl)
+ },
+ wantCode: http.StatusBadRequest,
+ wantContains: []string{`"error"`},
+ },
+ {
+ name: "when limit exceeds max returns 400",
+ query: "?limit=101",
+ setupJobMock: func() *jobmocks.MockJobClient {
+ return jobmocks.NewMockJobClient(s.mockCtrl)
+ },
+ wantCode: http.StatusBadRequest,
+ wantContains: []string{`"error"`},
+ },
{
name: "when valid limit and offset",
query: "?limit=5&offset=10",
diff --git a/internal/api/job/job_status.go b/internal/api/job/job_status.go
index 6f40a333..bff4f7bd 100644
--- a/internal/api/job/job_status.go
+++ b/internal/api/job/job_status.go
@@ -31,7 +31,7 @@ func (j *Job) GetJobStatus(
ctx context.Context,
_ gen.GetJobStatusRequestObject,
) (gen.GetJobStatusResponseObject, error) {
- stats, err := j.JobClient.GetQueueStats(ctx)
+ stats, err := j.JobClient.GetQueueSummary(ctx)
if err != nil {
errMsg := err.Error()
return gen.GetJobStatus500JSONResponse{
@@ -40,9 +40,8 @@ func (j *Job) GetJobStatus(
}
return gen.GetJobStatus200JSONResponse{
- TotalJobs: &stats.TotalJobs,
- StatusCounts: &stats.StatusCounts,
- OperationCounts: &stats.OperationCounts,
- DlqCount: &stats.DLQCount,
+ TotalJobs: &stats.TotalJobs,
+ StatusCounts: &stats.StatusCounts,
+ DlqCount: &stats.DLQCount,
}, nil
}
diff --git a/internal/api/job/job_status_public_test.go b/internal/api/job/job_status_public_test.go
index 4f98ab26..6af77f6a 100644
--- a/internal/api/job/job_status_public_test.go
+++ b/internal/api/job/job_status_public_test.go
@@ -81,9 +81,6 @@ func (s *JobStatusPublicTestSuite) TestGetJobStatus() {
"completed": 30,
"failed": 5,
},
- OperationCounts: map[string]int{
- "node.hostname.get": 15,
- },
DLQCount: 2,
},
validateFunc: func(resp gen.GetJobStatusResponseObject) {
@@ -106,7 +103,7 @@ func (s *JobStatusPublicTestSuite) TestGetJobStatus() {
for _, tt := range tests {
s.Run(tt.name, func() {
s.mockJobClient.EXPECT().
- GetQueueStats(gomock.Any()).
+ GetQueueSummary(gomock.Any()).
Return(tt.mockStats, tt.mockError)
resp, err := s.handler.GetJobStatus(s.ctx, gen.GetJobStatusRequestObject{})
@@ -128,16 +125,13 @@ func (s *JobStatusPublicTestSuite) TestGetJobStatusHTTP() {
setupJobMock: func() *jobmocks.MockJobClient {
mock := jobmocks.NewMockJobClient(s.mockCtrl)
mock.EXPECT().
- GetQueueStats(gomock.Any()).
+ GetQueueSummary(gomock.Any()).
Return(&jobtypes.QueueStats{
TotalJobs: 42,
StatusCounts: map[string]int{
"completed": 30,
"failed": 5,
},
- OperationCounts: map[string]int{
- "node.hostname.get": 15,
- },
DLQCount: 2,
}, nil)
return mock
@@ -150,7 +144,7 @@ func (s *JobStatusPublicTestSuite) TestGetJobStatusHTTP() {
setupJobMock: func() *jobmocks.MockJobClient {
mock := jobmocks.NewMockJobClient(s.mockCtrl)
mock.EXPECT().
- GetQueueStats(gomock.Any()).
+ GetQueueSummary(gomock.Any()).
Return(nil, assert.AnError)
return mock
},
@@ -238,16 +232,13 @@ func (s *JobStatusPublicTestSuite) TestGetJobStatusRBACHTTP() {
setupJobMock: func() *jobmocks.MockJobClient {
mock := jobmocks.NewMockJobClient(s.mockCtrl)
mock.EXPECT().
- GetQueueStats(gomock.Any()).
+ GetQueueSummary(gomock.Any()).
Return(&jobtypes.QueueStats{
TotalJobs: 42,
StatusCounts: map[string]int{
"completed": 30,
"failed": 5,
},
- OperationCounts: map[string]int{
- "node.hostname.get": 15,
- },
DLQCount: 2,
}, nil)
return mock
diff --git a/internal/job/client/jobs.go b/internal/job/client/jobs.go
index 3ffa1843..c6b8d26c 100644
--- a/internal/job/client/jobs.go
+++ b/internal/job/client/jobs.go
@@ -150,32 +150,28 @@ func (c *Client) CreateJob(
}, nil
}
-// GetQueueStats returns statistics about the job queue.
-func (c *Client) GetQueueStats(
+// GetQueueSummary returns job queue statistics derived from KV key names
+// only — no entry reads.
+func (c *Client) GetQueueSummary(
ctx context.Context,
) (*job.QueueStats, error) {
- c.logger.Debug("kv.keys",
- slog.String("operation", "get_queue_stats"),
- )
- // Get all job keys from KV store
keys, err := c.kv.Keys(ctx)
if err != nil {
if errors.Is(err, jetstream.ErrNoKeysFound) {
return &job.QueueStats{
- TotalJobs: 0,
StatusCounts: map[string]int{
"submitted": 0,
"processing": 0,
"completed": 0,
"failed": 0,
},
- OperationCounts: map[string]int{},
- DLQCount: 0,
}, nil
}
- return nil, fmt.Errorf("error fetching jobs: %w", err)
+ return nil, fmt.Errorf("error fetching keys: %w", err)
}
+ _, jobStatuses := computeStatusFromKeyNames(keys)
+
statusCounts := map[string]int{
"submitted": 0,
"processing": 0,
@@ -184,129 +180,8 @@ func (c *Client) GetQueueStats(
"partial_failure": 0,
}
- operationCounts := map[string]int{}
- jobCount := 0
-
- // Process immutable jobs and compute status from events
- for _, key := range keys {
- // Only process "jobs." prefixed keys
- if !strings.HasPrefix(key, "jobs.") {
- continue
- }
-
- entry, err := c.kv.Get(ctx, key)
- if err != nil {
- continue
- }
-
- var jobData map[string]interface{}
- if err := json.Unmarshal(entry.Value(), &jobData); err != nil {
- continue
- }
-
- jobID := strings.TrimPrefix(key, "jobs.")
- jobCount++
-
- // Compute status from events (reuse already-fetched keys)
- computedStatus := c.computeStatusFromEvents(ctx, keys, jobID)
- statusCounts[computedStatus.Status]++
-
- // Track operation type
- if jobOperationData, ok := jobData["operation"].(map[string]interface{}); ok {
- if operationType, ok := jobOperationData["type"].(string); ok {
- operationCounts[operationType]++
- }
- }
- }
-
- // Get DLQ count
- dlqCount := 0
- dlqName := c.streamName + "-DLQ"
- dlqInfo, err := c.natsClient.GetStreamInfo(ctx, dlqName)
- if err != nil {
- // DLQ might not exist, which is fine
- c.logger.Debug("failed to get DLQ info", slog.String("error", err.Error()))
- } else {
- dlqCount = int(dlqInfo.State.Msgs)
- }
-
- return &job.QueueStats{
- TotalJobs: jobCount,
- StatusCounts: statusCounts,
- OperationCounts: operationCounts,
- DLQCount: dlqCount,
- }, nil
-}
-
-// GetQueueSummary returns lightweight job queue statistics derived from KV key
-// names only — no entry reads. This is much faster than GetQueueStats for large
-// queues and suitable for health/status dashboards.
-func (c *Client) GetQueueSummary(
- ctx context.Context,
-) (*job.QueueStats, error) {
- keys, err := c.kv.Keys(ctx)
- if err != nil {
- if errors.Is(err, jetstream.ErrNoKeysFound) {
- return &job.QueueStats{
- StatusCounts: map[string]int{
- "submitted": 0,
- "processing": 0,
- "completed": 0,
- "failed": 0,
- },
- }, nil
- }
- return nil, fmt.Errorf("error fetching keys: %w", err)
- }
-
- // Collect job IDs and their highest-priority status from key names.
- // Key format: status.<job-id>.<event>.<hostname>.<timestamp>
- // Status priority: completed > failed > started > acknowledged > submitted
- statusPriority := map[string]int{
- "submitted": 0,
- "acknowledged": 1,
- "started": 2,
- "failed": 3,
- "completed": 4,
- "retried": 4,
- }
-
- jobStatuses := make(map[string]string) // jobID -> highest status
- for _, key := range keys {
- if !strings.HasPrefix(key, "status.") {
- continue
- }
- parts := strings.SplitN(key, ".", 4)
- if len(parts) < 3 {
- continue
- }
- jobID := parts[1]
- event := parts[2]
-
- cur, exists := jobStatuses[jobID]
- if !exists || statusPriority[event] > statusPriority[cur] {
- jobStatuses[jobID] = event
- }
- }
-
- statusCounts := map[string]int{
- "submitted": 0,
- "processing": 0,
- "completed": 0,
- "failed": 0,
- }
-
- for _, status := range jobStatuses {
- switch status {
- case "completed", "retried":
- statusCounts["completed"]++
- case "failed":
- statusCounts["failed"]++
- case "started", "acknowledged":
- statusCounts["processing"]++
- default:
- statusCounts["submitted"]++
- }
+ for _, info := range jobStatuses {
+ statusCounts[info.Status]++
}
total := len(jobStatuses)
@@ -387,7 +262,8 @@ func (c *Client) GetJobStatus(
}
// ListJobs returns jobs filtered by status with server-side pagination.
-// limit=0 means no limit. offset=0 means start from beginning.
+// Uses a two-pass approach: Pass 1 derives status from key names only (fast),
+// Pass 2 fetches full details for the paginated page only.
// Jobs are returned newest-first (reverse insertion order).
func (c *Client) ListJobs(
ctx context.Context,
@@ -395,12 +271,18 @@ func (c *Client) ListJobs(
limit int,
offset int,
) (*ListJobsResult, error) {
+ // Normalize out-of-range limits (<= 0 or > MaxPageSize) to DefaultPageSize
+ if limit <= 0 || limit > MaxPageSize {
+ limit = DefaultPageSize
+ }
+
c.logger.Debug("kv.keys",
slog.String("operation", "list_jobs"),
slog.String("status_filter", statusFilter),
slog.Int("limit", limit),
slog.Int("offset", offset),
)
+
allKeys, err := c.kv.Keys(ctx)
if err != nil {
if errors.Is(err, jetstream.ErrNoKeysFound) {
@@ -412,124 +294,76 @@ func (c *Client) ListJobs(
return nil, fmt.Errorf("error fetching jobs: %w", err)
}
- // Extract job keys and reverse for newest-first ordering
- var jobKeys []string
- for _, key := range allKeys {
- if strings.HasPrefix(key, "jobs.") {
- jobID := strings.TrimPrefix(key, "jobs.")
- if jobID != "" {
- jobKeys = append(jobKeys, key)
- }
- }
- }
+ // Pass 1: Light — key names only, no kv.Get()
+ orderedJobIDs, jobStatuses := computeStatusFromKeyNames(allKeys)
- // Reverse for newest-first (KV insertion order is chronological)
- for i, j := 0, len(jobKeys)-1; i < j; i, j = i+1, j-1 {
- jobKeys[i], jobKeys[j] = jobKeys[j], jobKeys[i]
+ // Compute status counts from key-name-derived statuses (free — no extra reads)
+ statusCounts := map[string]int{
+ "submitted": 0,
+ "processing": 0,
+ "completed": 0,
+ "failed": 0,
+ "partial_failure": 0,
}
-
- c.logger.Debug("kv.keys",
- slog.Int("total_job_keys", len(jobKeys)),
- slog.Int("all_keys", len(allKeys)),
- )
-
- // No status filter: we know the total count immediately
- if statusFilter == "" {
- return c.listJobsNoFilter(ctx, allKeys, jobKeys, limit, offset), nil
+ for _, info := range jobStatuses {
+ statusCounts[info.Status]++
+ }
+ // Jobs with no status events are "submitted" but not in jobStatuses
+ submittedOnly := len(orderedJobIDs) - len(jobStatuses)
+ if submittedOnly > 0 {
+ statusCounts["submitted"] += submittedOnly
}
- // With status filter: scan all jobs to count matches and collect page
- return c.listJobsWithFilter(ctx, allKeys, jobKeys, statusFilter, limit, offset), nil
-}
-
-// listJobsNoFilter handles pagination when no status filter is applied.
-func (c *Client) listJobsNoFilter(
- ctx context.Context,
- allKeys []string,
- jobKeys []string,
- limit int,
- offset int,
-) *ListJobsResult {
- totalCount := len(jobKeys)
+ // Filter + count (fast, no reads)
+ var matchingIDs []string
+ for _, id := range orderedJobIDs {
+ if statusFilter != "" {
+ info, exists := jobStatuses[id]
+ if !exists {
+ info = lightJobInfo{Status: "submitted"}
+ }
+ if info.Status != statusFilter {
+ continue
+ }
+ }
+ matchingIDs = append(matchingIDs, id)
+ }
+ totalCount := len(matchingIDs)
// Apply offset
- if offset >= len(jobKeys) {
+ if offset >= len(matchingIDs) {
return &ListJobsResult{
- Jobs: []*job.QueuedJob{},
- TotalCount: totalCount,
- }
+ Jobs: []*job.QueuedJob{},
+ TotalCount: totalCount,
+ StatusCounts: statusCounts,
+ }, nil
}
- jobKeys = jobKeys[offset:]
+ matchingIDs = matchingIDs[offset:]
// Apply limit
- if limit > 0 && len(jobKeys) > limit {
- jobKeys = jobKeys[:limit]
- }
-
- var jobs []*job.QueuedJob
- for _, key := range jobKeys {
- jobID := strings.TrimPrefix(key, "jobs.")
- jobInfo, err := c.getJobStatusFromKeys(ctx, allKeys, key, jobID)
- if err != nil {
- c.logger.Debug("failed to get job status during list",
- slog.String("job_id", jobID),
- slog.String("error", err.Error()),
- )
- continue
- }
- jobs = append(jobs, jobInfo)
- }
-
- if jobs == nil {
- jobs = []*job.QueuedJob{}
+ if len(matchingIDs) > limit {
+ matchingIDs = matchingIDs[:limit]
}
- return &ListJobsResult{
- Jobs: jobs,
- TotalCount: totalCount,
- }
-}
+ c.logger.Debug("kv.keys",
+ slog.Int("total_matching", totalCount),
+ slog.Int("page_size", len(matchingIDs)),
+ slog.Int("all_keys", len(allKeys)),
+ )
-// listJobsWithFilter handles pagination when a status filter is applied.
-func (c *Client) listJobsWithFilter(
- ctx context.Context,
- allKeys []string,
- jobKeys []string,
- statusFilter string,
- limit int,
- offset int,
-) *ListJobsResult {
+ // Pass 2: Heavy — kv.Get() only for the page
var jobs []*job.QueuedJob
- totalCount := 0
- skipped := 0
-
- for _, key := range jobKeys {
- jobID := strings.TrimPrefix(key, "jobs.")
- jobInfo, err := c.getJobStatusFromKeys(ctx, allKeys, key, jobID)
+ for _, id := range matchingIDs {
+ jobKey := "jobs." + id
+ jobInfo, err := c.getJobStatusFromKeys(ctx, allKeys, jobKey, id)
if err != nil {
c.logger.Debug("failed to get job status during list",
- slog.String("job_id", jobID),
+ slog.String("job_id", id),
slog.String("error", err.Error()),
)
continue
}
-
- if jobInfo.Status != statusFilter {
- continue
- }
-
- totalCount++
-
- // Skip offset items
- if skipped < offset {
- skipped++
- continue
- }
-
- // Collect items up to limit
- if limit == 0 || len(jobs) < limit {
- jobs = append(jobs, jobInfo)
- }
+ jobs = append(jobs, jobInfo)
}
if jobs == nil {
@@ -537,9 +371,10 @@ func (c *Client) listJobsWithFilter(
}
return &ListJobsResult{
- Jobs: jobs,
- TotalCount: totalCount,
- }
+ Jobs: jobs,
+ TotalCount: totalCount,
+ StatusCounts: statusCounts,
+ }, nil
}
// getJobStatusFromKeys builds a QueuedJob using pre-fetched keys (no inner kv.Keys() call).
@@ -883,6 +718,113 @@ func (c *Client) DeleteJob(
return nil
}
+// computeStatusFromKeyNames derives job statuses from KV key names only — no
+// kv.Get() calls. It returns ordered job IDs (newest-first from "jobs.*" keys)
+// and a map of per-job status computed using the same multi-agent logic as
+// computeStatusFromEvents but parsed entirely from key names.
+//
+// Key format: status.<job-id>.<event>.<hostname>.<timestamp>
+func computeStatusFromKeyNames(
+ keys []string,
+) ([]string, map[string]lightJobInfo) {
+ // Extract job IDs from jobs.* keys
+ var orderedJobIDs []string
+ for _, key := range keys {
+ if strings.HasPrefix(key, "jobs.") {
+ jobID := strings.TrimPrefix(key, "jobs.")
+ if jobID != "" {
+ orderedJobIDs = append(orderedJobIDs, jobID)
+ }
+ }
+ }
+
+ // Reverse for newest-first (KV insertion order is chronological)
+ for i, j := 0, len(orderedJobIDs)-1; i < j; i, j = i+1, j-1 {
+ orderedJobIDs[i], orderedJobIDs[j] = orderedJobIDs[j], orderedJobIDs[i]
+ }
+
+ // Parse status keys and track highest-priority event per (jobID, hostname)
+ statusPriority := map[string]int{
+ "submitted": 0,
+ "acknowledged": 1,
+ "started": 2,
+ "failed": 3,
+ "completed": 4,
+ "retried": 4,
+ }
+
+ // agentStates[jobID][hostname] = highest-priority event
+ agentStates := make(map[string]map[string]string)
+
+ for _, key := range keys {
+ if !strings.HasPrefix(key, "status.") {
+ continue
+ }
+ parts := strings.SplitN(key, ".", 5)
+ if len(parts) < 4 {
+ continue
+ }
+ jobID := parts[1]
+ event := parts[2]
+ hostname := parts[3]
+
+ if _, exists := agentStates[jobID]; !exists {
+ agentStates[jobID] = make(map[string]string)
+ }
+
+ cur, exists := agentStates[jobID][hostname]
+ if !exists || statusPriority[event] > statusPriority[cur] {
+ agentStates[jobID][hostname] = event
+ }
+ }
+
+ // Compute overall status per job using multi-agent logic
+ jobStatuses := make(map[string]lightJobInfo, len(agentStates))
+
+ for jobID, agents := range agentStates {
+ completed := 0
+ failed := 0
+ processing := 0
+ acknowledged := 0
+
+ for hostname, state := range agents {
+ if hostname == "_api" {
+ continue
+ }
+ switch state {
+ case "completed", "retried":
+ completed++
+ case "failed":
+ failed++
+ case "started":
+ processing++
+ case "acknowledged":
+ acknowledged++
+ }
+ }
+
+ totalAgents := completed + failed + processing + acknowledged
+
+ var status string
+ switch {
+ case totalAgents == 0:
+ status = "submitted"
+ case processing > 0 || acknowledged > 0:
+ status = "processing"
+ case failed > 0 && completed > 0:
+ status = "partial_failure"
+ case failed > 0:
+ status = "failed"
+ default:
+ status = "completed"
+ }
+
+ jobStatuses[jobID] = lightJobInfo{Status: status}
+ }
+
+ return orderedJobIDs, jobStatuses
+}
+
// getJobResponses retrieves response data for a specific job
func (c *Client) getJobResponses(
ctx context.Context,
diff --git a/internal/job/client/jobs_public_test.go b/internal/job/client/jobs_public_test.go
index 268075af..25426416 100644
--- a/internal/job/client/jobs_public_test.go
+++ b/internal/job/client/jobs_public_test.go
@@ -800,225 +800,6 @@ func (s *JobsPublicTestSuite) TestGetJobStatus() {
}
}
-func (s *JobsPublicTestSuite) TestGetQueueStats() {
- tests := []struct {
- name string
- expectedErr string
- setupMocks func()
- expectedJobs int
- expectedDLQ int
- }{
- {
- name: "no keys found",
- setupMocks: func() {
- s.mockKV.EXPECT().Keys(gomock.Any()).Return(nil, jetstream.ErrNoKeysFound)
- },
- expectedJobs: 0,
- expectedDLQ: 0,
- },
- {
- name: "keys error",
- expectedErr: "error fetching jobs",
- setupMocks: func() {
- s.mockKV.EXPECT().Keys(gomock.Any()).Return(nil, errors.New("connection failed"))
- },
- },
- {
- name: "get job error skipped",
- setupMocks: func() {
- s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil)
- s.mockKV.EXPECT().
- Get(gomock.Any(), "jobs.job-1").
- Return(nil, errors.New("kv error"))
- s.mockNATSClient.EXPECT().
- GetStreamInfo(gomock.Any(), "JOBS-DLQ").
- Return(nil, errors.New("no stream"))
- },
- expectedJobs: 0,
- },
- {
- name: "invalid job JSON skipped",
- setupMocks: func() {
- s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil)
-
- mockEntry := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
- mockEntry.EXPECT().Value().Return([]byte(`not json`))
- s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry, nil)
-
- s.mockNATSClient.EXPECT().
- GetStreamInfo(gomock.Any(), "JOBS-DLQ").
- Return(nil, errors.New("no stream"))
- },
- expectedJobs: 0,
- },
- {
- name: "with DLQ info",
- setupMocks: func() {
- s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil)
-
- mockEntry := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
- mockEntry.EXPECT().Value().Return([]byte(
- `{"id":"job-1","operation":{"type":"node.hostname.get"}}`,
- ))
- s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry, nil)
-
- s.mockNATSClient.EXPECT().
- GetStreamInfo(gomock.Any(), "JOBS-DLQ").
- Return(&jetstream.StreamInfo{State: jetstream.StreamState{Msgs: 5}}, nil)
- },
- expectedJobs: 1,
- expectedDLQ: 5,
- },
- {
- name: "operation without type field",
- setupMocks: func() {
- s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil)
-
- mockEntry := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
- mockEntry.EXPECT().Value().Return([]byte(
- `{"id":"job-1","operation":{"data":"some value"}}`,
- ))
- s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry, nil)
-
- s.mockNATSClient.EXPECT().
- GetStreamInfo(gomock.Any(), "JOBS-DLQ").
- Return(nil, errors.New("no stream"))
- },
- expectedJobs: 1,
- },
- {
- name: "operation as non-map value",
- setupMocks: func() {
- s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil)
-
- mockEntry := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
- mockEntry.EXPECT().Value().Return([]byte(
- `{"id":"job-1","operation":"string-value"}`,
- ))
- s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry, nil)
-
- s.mockNATSClient.EXPECT().
- GetStreamInfo(gomock.Any(), "JOBS-DLQ").
- Return(nil, errors.New("no stream"))
- },
- expectedJobs: 1,
- },
- {
- name: "non-jobs prefix keys skipped",
- setupMocks: func() {
- s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{
- "status.job-1.submitted._api.100",
- "responses.job-1.agent1.200",
- }, nil)
-
- s.mockNATSClient.EXPECT().
- GetStreamInfo(gomock.Any(), "JOBS-DLQ").
- Return(nil, errors.New("no stream"))
- },
- expectedJobs: 0,
- },
- {
- name: "with jobs and DLQ error",
- setupMocks: func() {
- keys := []string{"jobs.job-1", "jobs.job-2"}
- s.mockKV.EXPECT().Keys(gomock.Any()).Return(keys, nil)
-
- mockEntry1 := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
- mockEntry1.EXPECT().
- Value().
- Return([]byte(`{"id":"job-1","operation":{"type":"node.hostname.get"}}`))
- s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry1, nil)
-
- mockEntry2 := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
- mockEntry2.EXPECT().
- Value().
- Return([]byte(`{"id":"job-2","operation":{"type":"node.status.get"}}`))
- s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-2").Return(mockEntry2, nil)
-
- s.mockNATSClient.EXPECT().
- GetStreamInfo(gomock.Any(), "JOBS-DLQ").
- Return(nil, errors.New("stream not found"))
- },
- expectedJobs: 2,
- expectedDLQ: 0,
- },
- }
-
- for _, tt := range tests {
- s.Run(tt.name, func() {
- tt.setupMocks()
-
- stats, err := s.jobsClient.GetQueueStats(s.ctx)
-
- if tt.expectedErr != "" {
- s.Error(err)
- s.Contains(err.Error(), tt.expectedErr)
- } else {
- s.NoError(err)
- s.NotNil(stats)
- s.Equal(tt.expectedJobs, stats.TotalJobs)
- if tt.expectedDLQ > 0 {
- s.Equal(tt.expectedDLQ, stats.DLQCount)
- }
- }
- })
- }
-}
-
-func (s *JobsPublicTestSuite) TestGetQueueStatsDLQNameDerivedFromStreamName() {
- tests := []struct {
- name string
- streamName string
- expectedDLQ string
- setupMocks func(dlqName string)
- expectedMsgs int
- }{
- {
- name: "when stream name is JOBS",
- streamName: "JOBS",
- expectedDLQ: "JOBS-DLQ",
- setupMocks: func(dlqName string) {
- s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{}, nil)
- s.mockNATSClient.EXPECT().
- GetStreamInfo(gomock.Any(), dlqName).
- Return(&jetstream.StreamInfo{State: jetstream.StreamState{Msgs: 5}}, nil)
- },
- expectedMsgs: 5,
- },
- {
- name: "when stream name is namespaced",
- streamName: "osapi-JOBS",
- expectedDLQ: "osapi-JOBS-DLQ",
- setupMocks: func(dlqName string) {
- s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{}, nil)
- s.mockNATSClient.EXPECT().
- GetStreamInfo(gomock.Any(), dlqName).
- Return(&jetstream.StreamInfo{State: jetstream.StreamState{Msgs: 3}}, nil)
- },
- expectedMsgs: 3,
- },
- }
-
- for _, tt := range tests {
- s.Run(tt.name, func() {
- tt.setupMocks(tt.expectedDLQ)
-
- opts := &client.Options{
- Timeout: 30 * time.Second,
- KVBucket: s.mockKV,
- StreamName: tt.streamName,
- }
- c, err := client.New(slog.Default(), s.mockNATSClient, opts)
- s.Require().NoError(err)
-
- stats, err := c.GetQueueStats(s.ctx)
- s.NoError(err)
- s.NotNil(stats)
- s.Equal(tt.expectedMsgs, stats.DLQCount)
- })
- }
-}
-
func (s *JobsPublicTestSuite) TestListJobs() {
tests := []struct {
name string
@@ -1047,7 +828,7 @@ func (s *JobsPublicTestSuite) TestListJobs() {
},
},
{
- name: "returns all jobs no limit",
+ name: "returns all jobs default limit",
setupMocks: func() {
s.mockKV.EXPECT().
Keys(gomock.Any()).
@@ -1072,13 +853,9 @@ func (s *JobsPublicTestSuite) TestListJobs() {
name: "filters out non matching status",
statusFilter: "completed",
setupMocks: func() {
+ // No status keys → default "submitted", doesn't match "completed"
+ // Two-pass: no kv.Get needed for filtering
s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil)
-
- mockEntry := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
- mockEntry.EXPECT().Value().Return([]byte(
- `{"id":"job-1","operation":{"type":"node.hostname.get"},"created":"2024-01-01T00:00:00Z"}`,
- ))
- s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry, nil)
},
expectedJobs: 0,
expectedTotalCount: 0,
@@ -1194,17 +971,13 @@ func (s *JobsPublicTestSuite) TestListJobs() {
statusFilter: "submitted",
offset: 1,
setupMocks: func() {
+ // No status keys → all default "submitted"
s.mockKV.EXPECT().
Keys(gomock.Any()).
Return([]string{"jobs.job-1", "jobs.job-2", "jobs.job-3"}, nil)
- // Reversed: job-3, job-2, job-1 — all submitted
- mockEntry3 := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
- mockEntry3.EXPECT().Value().Return([]byte(
- `{"id":"job-3","operation":{"type":"node.status.get"},"created":"2024-01-03T00:00:00Z"}`,
- ))
- s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-3").Return(mockEntry3, nil)
-
+ // Reversed: job-3, job-2, job-1. Offset 1 → page = [job-2, job-1]
+ // Only page items get kv.Get in Pass 2
mockEntry2 := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
mockEntry2.EXPECT().Value().Return([]byte(
`{"id":"job-2","operation":{"type":"node.status.get"},"created":"2024-01-02T00:00:00Z"}`,
@@ -1225,22 +998,18 @@ func (s *JobsPublicTestSuite) TestListJobs() {
statusFilter: "submitted",
limit: 1,
setupMocks: func() {
+ // No status keys → both default "submitted"
s.mockKV.EXPECT().
Keys(gomock.Any()).
Return([]string{"jobs.job-1", "jobs.job-2"}, nil)
- // Reversed: job-2, job-1 — both submitted
+ // Reversed: job-2, job-1. Limit 1 → page = [job-2]
+ // Only page item gets kv.Get in Pass 2
mockEntry2 := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
mockEntry2.EXPECT().Value().Return([]byte(
`{"id":"job-2","operation":{"type":"node.status.get"},"created":"2024-01-02T00:00:00Z"}`,
))
s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-2").Return(mockEntry2, nil)
-
- mockEntry1 := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
- mockEntry1.EXPECT().Value().Return([]byte(
- `{"id":"job-1","operation":{"type":"node.hostname.get"},"created":"2024-01-01T00:00:00Z"}`,
- ))
- s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry1, nil)
},
expectedJobs: 1,
expectedTotalCount: 2,
@@ -1249,11 +1018,13 @@ func (s *JobsPublicTestSuite) TestListJobs() {
name: "filter skips jobs with get error",
statusFilter: "submitted",
setupMocks: func() {
+ // No status keys → both default "submitted"
s.mockKV.EXPECT().
Keys(gomock.Any()).
Return([]string{"jobs.job-bad", "jobs.job-good"}, nil)
- // Reversed: job-good, job-bad
+ // Reversed: job-good, job-bad — both match filter
+ // Pass 2: job-good Get fails, job-bad Get succeeds
s.mockKV.EXPECT().
Get(gomock.Any(), "jobs.job-good").
Return(nil, errors.New("kv error"))
@@ -1264,8 +1035,10 @@ func (s *JobsPublicTestSuite) TestListJobs() {
))
s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-bad").Return(mockEntry, nil)
},
- expectedJobs: 1,
- expectedTotalCount: 1,
+ expectedJobs: 1,
+ // totalCount is 2 because key-name-based counting finds
+ // both jobs matching the filter before Pass 2 Get errors
+ expectedTotalCount: 2,
},
{
name: "getJobStatusFromKeys with invalid JSON",
@@ -1281,6 +1054,31 @@ func (s *JobsPublicTestSuite) TestListJobs() {
expectedJobs: 0,
expectedTotalCount: 1,
},
+ {
+ name: "limit exceeding max capped to default",
+ limit: 200,
+ setupMocks: func() {
+ s.mockKV.EXPECT().
+ Keys(gomock.Any()).
+ Return([]string{"jobs.job-1", "jobs.job-2"}, nil)
+
+ // Limit 200 → capped to DefaultPageSize (10)
+ // Both jobs are within page
+ mockEntry2 := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
+ mockEntry2.EXPECT().Value().Return([]byte(
+ `{"id":"job-2","operation":{"type":"node.status.get"},"created":"2024-01-02T00:00:00Z"}`,
+ ))
+ s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-2").Return(mockEntry2, nil)
+
+ mockEntry1 := jobmocks.NewMockKeyValueEntry(s.mockCtrl)
+ mockEntry1.EXPECT().Value().Return([]byte(
+ `{"id":"job-1","operation":{"type":"node.hostname.get"},"created":"2024-01-01T00:00:00Z"}`,
+ ))
+ s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry1, nil)
+ },
+ expectedJobs: 2,
+ expectedTotalCount: 2,
+ },
{
name: "newest first ordering",
setupMocks: func() {
diff --git a/internal/job/client/jobs_test.go b/internal/job/client/jobs_test.go
new file mode 100644
index 00000000..ba42ecdf
--- /dev/null
+++ b/internal/job/client/jobs_test.go
@@ -0,0 +1,232 @@
+// Copyright (c) 2026 John Dewey
+
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to
+// deal in the Software without restriction, including without limitation the
+// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
+// sell copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
+// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
+// DEALINGS IN THE SOFTWARE.
+
+package client
+
+import (
+ "testing"
+
+ "github.com/stretchr/testify/suite"
+)
+
+type JobsTestSuite struct {
+ suite.Suite
+}
+
+func (s *JobsTestSuite) TestComputeStatusFromKeyNames() {
+ tests := []struct {
+ name string
+ keys []string
+ expectedOrderIDs []string
+ expectedStatuses map[string]string
+ validateOrderFunc func(ids []string)
+ }{
+ {
+ name: "empty keys",
+ keys: []string{},
+ expectedOrderIDs: nil,
+ expectedStatuses: map[string]string{},
+ },
+ {
+ name: "only jobs keys no status events",
+ keys: []string{"jobs.job-1", "jobs.job-2"},
+ expectedOrderIDs: []string{"job-2", "job-1"},
+ expectedStatuses: map[string]string{},
+ },
+ {
+ name: "single agent completed",
+ keys: []string{
+ "jobs.job-1",
+ "status.job-1.submitted._api.100",
+ "status.job-1.acknowledged.agent1.101",
+ "status.job-1.started.agent1.102",
+ "status.job-1.completed.agent1.103",
+ },
+ expectedOrderIDs: []string{"job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "completed",
+ },
+ },
+ {
+ name: "single agent failed",
+ keys: []string{
+ "jobs.job-1",
+ "status.job-1.submitted._api.100",
+ "status.job-1.acknowledged.agent1.101",
+ "status.job-1.started.agent1.102",
+ "status.job-1.failed.agent1.103",
+ },
+ expectedOrderIDs: []string{"job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "failed",
+ },
+ },
+ {
+ name: "single agent processing",
+ keys: []string{
+ "jobs.job-1",
+ "status.job-1.submitted._api.100",
+ "status.job-1.acknowledged.agent1.101",
+ "status.job-1.started.agent1.102",
+ },
+ expectedOrderIDs: []string{"job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "processing",
+ },
+ },
+ {
+ name: "submitted only via api",
+ keys: []string{
+ "jobs.job-1",
+ "status.job-1.submitted._api.100",
+ },
+ expectedOrderIDs: []string{"job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "submitted",
+ },
+ },
+ {
+ name: "acknowledged only agent shows processing",
+ keys: []string{
+ "jobs.job-1",
+ "status.job-1.submitted._api.100",
+ "status.job-1.acknowledged.agent1.101",
+ },
+ expectedOrderIDs: []string{"job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "processing",
+ },
+ },
+ {
+ name: "multi-agent partial failure",
+ keys: []string{
+ "jobs.job-1",
+ "status.job-1.submitted._api.100",
+ "status.job-1.completed.agent1.101",
+ "status.job-1.failed.agent2.102",
+ },
+ expectedOrderIDs: []string{"job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "partial_failure",
+ },
+ },
+ {
+ name: "multi-agent all completed",
+ keys: []string{
+ "jobs.job-1",
+ "status.job-1.completed.agent1.101",
+ "status.job-1.completed.agent2.102",
+ },
+ expectedOrderIDs: []string{"job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "completed",
+ },
+ },
+ {
+ name: "multi-agent one still processing",
+ keys: []string{
+ "jobs.job-1",
+ "status.job-1.completed.agent1.101",
+ "status.job-1.started.agent2.102",
+ },
+ expectedOrderIDs: []string{"job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "processing",
+ },
+ },
+ {
+ name: "retried counts as completed",
+ keys: []string{
+ "jobs.job-1",
+ "status.job-1.submitted._api.100",
+ "status.job-1.retried.agent1.101",
+ },
+ expectedOrderIDs: []string{"job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "completed",
+ },
+ },
+ {
+ name: "multiple jobs mixed statuses",
+ keys: []string{
+ "jobs.job-1",
+ "jobs.job-2",
+ "jobs.job-3",
+ "status.job-1.completed.agent1.101",
+ "status.job-2.started.agent1.201",
+ "status.job-3.failed.agent1.301",
+ },
+ expectedOrderIDs: []string{"job-3", "job-2", "job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "completed",
+ "job-2": "processing",
+ "job-3": "failed",
+ },
+ },
+ {
+ name: "malformed status key skipped",
+ keys: []string{
+ "jobs.job-1",
+ "status.incomplete",
+ "status.job-1.completed.agent1.101",
+ },
+ expectedOrderIDs: []string{"job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "completed",
+ },
+ },
+ {
+ name: "non-job non-status keys ignored",
+ keys: []string{
+ "jobs.job-1",
+ "responses.job-1.agent1.100",
+ "status.job-1.completed.agent1.101",
+ },
+ expectedOrderIDs: []string{"job-1"},
+ expectedStatuses: map[string]string{
+ "job-1": "completed",
+ },
+ },
+ {
+ name: "empty job ID after trim skipped",
+ keys: []string{"jobs."},
+ expectedOrderIDs: nil,
+ expectedStatuses: map[string]string{},
+ },
+ }
+
+ for _, tt := range tests {
+ s.Run(tt.name, func() {
+ orderedIDs, jobStatuses := computeStatusFromKeyNames(tt.keys)
+
+ s.Equal(tt.expectedOrderIDs, orderedIDs)
+
+ actualStatuses := make(map[string]string, len(jobStatuses))
+ for id, info := range jobStatuses {
+ actualStatuses[id] = info.Status
+ }
+ s.Equal(tt.expectedStatuses, actualStatuses)
+ })
+ }
+}
+
+func TestJobsTestSuite(t *testing.T) {
+ suite.Run(t, new(JobsTestSuite))
+}
diff --git a/internal/job/client/types.go b/internal/job/client/types.go
index b0d20aa4..ef854803 100644
--- a/internal/job/client/types.go
+++ b/internal/job/client/types.go
@@ -36,6 +36,13 @@ import (
"github.com/retr0h/osapi/internal/provider/node/mem"
)
+const (
+ // DefaultPageSize is the default number of jobs per page.
+ DefaultPageSize = 10
+ // MaxPageSize is the maximum allowed page size.
+ MaxPageSize = 100
+)
+
// JobClient defines the interface for interacting with the jobs system.
type JobClient interface {
// Job queue management operations
@@ -44,9 +51,6 @@ type JobClient interface {
operationData map[string]interface{},
targetHostname string,
) (*CreateJobResult, error)
- GetQueueStats(
- ctx context.Context,
- ) (*job.QueueStats, error)
GetQueueSummary(
ctx context.Context,
) (*job.QueueStats, error)
@@ -329,8 +333,9 @@ type CreateJobResult struct {
// ListJobsResult represents the result of listing jobs with pagination.
type ListJobsResult struct {
- Jobs []*job.QueuedJob
- TotalCount int
+ Jobs []*job.QueuedJob
+ TotalCount int
+ StatusCounts map[string]int
}
// computedJobStatus represents the computed status from events
@@ -343,5 +348,10 @@ type computedJobStatus struct {
Timeline []job.TimelineEvent
}
+// lightJobInfo holds status derived from KV key names only (no reads).
+type lightJobInfo struct {
+ Status string
+}
+
// Ensure Client implements JobClient interface
var _ JobClient = (*Client)(nil)
diff --git a/internal/job/mocks/job_client.gen.go b/internal/job/mocks/job_client.gen.go
index ea8ad1d2..9e021854 100644
--- a/internal/job/mocks/job_client.gen.go
+++ b/internal/job/mocks/job_client.gen.go
@@ -190,21 +190,6 @@ func (mr *MockJobClientMockRecorder) GetJobStatus(arg0, arg1 interface{}) *gomoc
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetJobStatus", reflect.TypeOf((*MockJobClient)(nil).GetJobStatus), arg0, arg1)
}
-// GetQueueStats mocks base method.
-func (m *MockJobClient) GetQueueStats(arg0 context.Context) (*job.QueueStats, error) {
- m.ctrl.T.Helper()
- ret := m.ctrl.Call(m, "GetQueueStats", arg0)
- ret0, _ := ret[0].(*job.QueueStats)
- ret1, _ := ret[1].(error)
- return ret0, ret1
-}
-
-// GetQueueStats indicates an expected call of GetQueueStats.
-func (mr *MockJobClientMockRecorder) GetQueueStats(arg0 interface{}) *gomock.Call {
- mr.mock.ctrl.T.Helper()
- return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueStats", reflect.TypeOf((*MockJobClient)(nil).GetQueueStats), arg0)
-}
-
// GetQueueSummary mocks base method.
func (m *MockJobClient) GetQueueSummary(arg0 context.Context) (*job.QueueStats, error) {
m.ctrl.T.Helper()
diff --git a/internal/job/types.go b/internal/job/types.go
index 196bd095..a85edaee 100644
--- a/internal/job/types.go
+++ b/internal/job/types.go
@@ -191,10 +191,9 @@ type TimelineEvent struct {
// QueueStats represents statistics about the job queue.
type QueueStats struct {
- TotalJobs int `json:"total_jobs"`
- StatusCounts map[string]int `json:"status_counts"`
- OperationCounts map[string]int `json:"operation_counts"`
- DLQCount int `json:"dlq_count"`
+ TotalJobs int `json:"total_jobs"`
+ StatusCounts map[string]int `json:"status_counts"`
+ DLQCount int `json:"dlq_count"`
}
// Operation data structures for specific operations