From d1c98cb503ffc1e031183413464ac1dd3abed737 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Sat, 7 Mar 2026 10:50:10 -0800 Subject: [PATCH 1/3] perf(job): two-pass listing with pagination limits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace per-job kv.Get() calls during filtered listing with a key-name- only status derivation pass, then fetch full details only for the paginated page. Extract computeStatusFromKeyNames pure function shared by both ListJobs and GetQueueSummary. Enforce max page size (1-100) at the OpenAPI, handler, and client levels. Also add hostname validation to agent domain handlers (get, drain, undrain) matching the node domain pattern, restore defense-in-depth validation in the file upload handler, and add corresponding tests. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- cmd/client_job_list.go | 2 +- internal/api/agent/agent_drain.go | 4 + internal/api/agent/agent_drain_public_test.go | 39 ++- internal/api/agent/agent_get.go | 4 + internal/api/agent/agent_get_public_test.go | 37 ++- internal/api/agent/agent_undrain.go | 4 + .../api/agent/agent_undrain_public_test.go | 39 ++- internal/api/agent/gen/agent.gen.go | 27 ++ internal/api/agent/gen/api.yaml | 39 +++ internal/api/agent/validate.go | 34 ++ internal/api/file/file_upload.go | 8 + internal/api/file/gen/api.yaml | 10 +- internal/api/file/gen/file.gen.go | 2 +- internal/api/gen/api.yaml | 39 ++- internal/api/job/gen/api.yaml | 7 +- internal/api/job/gen/job.gen.go | 4 +- internal/api/job/job_list.go | 9 +- internal/api/job/job_list_public_test.go | 48 +++ internal/job/client/jobs.go | 298 +++++++++--------- internal/job/client/jobs_public_test.go | 65 ++-- internal/job/client/jobs_test.go | 232 ++++++++++++++ internal/job/client/types.go | 12 + 22 files changed, 768 insertions(+), 195 deletions(-) create mode 100644 internal/api/agent/validate.go create mode 100644 internal/job/client/jobs_test.go diff --git a/cmd/client_job_list.go b/cmd/client_job_list.go index abf9d105..d05fc21a 100644 --- a/cmd/client_job_list.go +++ b/cmd/client_job_list.go @@ -186,6 +186,6 @@ func init() { clientJobListCmd.Flags(). String("status", "", "Filter jobs by status (submitted, processing, completed, failed, partial_failure)") - clientJobListCmd.Flags().Int("limit", 10, "Limit number of jobs displayed (0 for no limit)") + clientJobListCmd.Flags().Int("limit", 10, "Maximum number of jobs per page (1-100, default 10)") clientJobListCmd.Flags().Int("offset", 0, "Skip the first N jobs (for pagination)") } diff --git a/internal/api/agent/agent_drain.go b/internal/api/agent/agent_drain.go index ee3c1276..9bedd39a 100644 --- a/internal/api/agent/agent_drain.go +++ b/internal/api/agent/agent_drain.go @@ -34,6 +34,10 @@ func (a *Agent) DrainAgent( ctx context.Context, request gen.DrainAgentRequestObject, ) (gen.DrainAgentResponseObject, error) { + if errMsg, ok := validateHostname(request.Hostname); !ok { + return gen.DrainAgent400JSONResponse{Error: &errMsg}, nil + } + hostname := request.Hostname agentInfo, err := a.JobClient.GetAgent(ctx, hostname) diff --git a/internal/api/agent/agent_drain_public_test.go b/internal/api/agent/agent_drain_public_test.go index 8bd310ae..002e8a3e 100644 --- a/internal/api/agent/agent_drain_public_test.go +++ b/internal/api/agent/agent_drain_public_test.go @@ -27,6 +27,7 @@ import ( "net/http" "net/http/httptest" "os" + "strings" "testing" "github.com/golang/mock/gomock" @@ -69,6 +70,7 @@ func (s *AgentDrainPublicTestSuite) TestDrainAgent() { tests := []struct { name string hostname string + skipMock bool mockAgent *jobtypes.AgentInfo mockGetErr error mockWriteErr error @@ -77,6 +79,26 @@ func (s *AgentDrainPublicTestSuite) TestDrainAgent() { mockSetDrainErr error validateFunc func(resp gen.DrainAgentResponseObject) }{ + { + name: "returns 400 when hostname is empty", + hostname: "", + skipMock: true, + skipWrite: true, + validateFunc: func(resp gen.DrainAgentResponseObject) { + _, ok := resp.(gen.DrainAgent400JSONResponse) + s.True(ok) + }, + }, + { + name: "returns 400 when hostname exceeds max length", + hostname: strings.Repeat("a", 256), + skipMock: true, + skipWrite: true, + validateFunc: func(resp gen.DrainAgentResponseObject) { + _, ok := resp.(gen.DrainAgent400JSONResponse) + s.True(ok) + }, + }, { name: "success drains agent", hostname: "server1", @@ -176,9 +198,11 @@ func (s *AgentDrainPublicTestSuite) TestDrainAgent() { for _, tt := range tests { s.Run(tt.name, func() { - s.mockJobClient.EXPECT(). - GetAgent(gomock.Any(), tt.hostname). - Return(tt.mockAgent, tt.mockGetErr) + if !tt.skipMock { + s.mockJobClient.EXPECT(). + GetAgent(gomock.Any(), tt.hostname). + Return(tt.mockAgent, tt.mockGetErr) + } if tt.mockSetDrain { s.mockJobClient.EXPECT(). @@ -209,6 +233,15 @@ func (s *AgentDrainPublicTestSuite) TestDrainAgentHTTP() { wantCode int wantContains []string }{ + { + name: "when hostname exceeds max length returns 400", + hostname: strings.Repeat("a", 256), + setupJobMock: func() *jobmocks.MockJobClient { + return jobmocks.NewMockJobClient(s.mockCtrl) + }, + wantCode: http.StatusBadRequest, + wantContains: []string{`"error"`}, + }, { name: "when agent exists returns 200", hostname: "server1", diff --git a/internal/api/agent/agent_get.go b/internal/api/agent/agent_get.go index 3dc6d124..be5e733d 100644 --- a/internal/api/agent/agent_get.go +++ b/internal/api/agent/agent_get.go @@ -32,6 +32,10 @@ func (a *Agent) GetAgentDetails( ctx context.Context, request gen.GetAgentDetailsRequestObject, ) (gen.GetAgentDetailsResponseObject, error) { + if errMsg, ok := validateHostname(request.Hostname); !ok { + return gen.GetAgentDetails400JSONResponse{Error: &errMsg}, nil + } + agentInfo, err := a.JobClient.GetAgent(ctx, request.Hostname) if err != nil { errMsg := err.Error() diff --git a/internal/api/agent/agent_get_public_test.go b/internal/api/agent/agent_get_public_test.go index 516dd805..22e846a4 100644 --- a/internal/api/agent/agent_get_public_test.go +++ b/internal/api/agent/agent_get_public_test.go @@ -27,6 +27,7 @@ import ( "net/http" "net/http/httptest" "os" + "strings" "testing" "time" @@ -73,10 +74,29 @@ func (s *AgentGetPublicTestSuite) TestGetAgentDetails() { tests := []struct { name string hostname string + skipMock bool mockAgent *jobtypes.AgentInfo mockError error validateFunc func(resp gen.GetAgentDetailsResponseObject) }{ + { + name: "returns 400 when hostname is empty", + hostname: "", + skipMock: true, + validateFunc: func(resp gen.GetAgentDetailsResponseObject) { + _, ok := resp.(gen.GetAgentDetails400JSONResponse) + s.True(ok) + }, + }, + { + name: "returns 400 when hostname exceeds max length", + hostname: strings.Repeat("a", 256), + skipMock: true, + validateFunc: func(resp gen.GetAgentDetailsResponseObject) { + _, ok := resp.(gen.GetAgentDetails400JSONResponse) + s.True(ok) + }, + }, { name: "success returns agent details", hostname: "server1", @@ -125,9 +145,11 @@ func (s *AgentGetPublicTestSuite) TestGetAgentDetails() { for _, tt := range tests { s.Run(tt.name, func() { - s.mockJobClient.EXPECT(). - GetAgent(gomock.Any(), tt.hostname). - Return(tt.mockAgent, tt.mockError) + if !tt.skipMock { + s.mockJobClient.EXPECT(). + GetAgent(gomock.Any(), tt.hostname). + Return(tt.mockAgent, tt.mockError) + } resp, err := s.handler.GetAgentDetails(s.ctx, gen.GetAgentDetailsRequestObject{ Hostname: tt.hostname, @@ -146,6 +168,15 @@ func (s *AgentGetPublicTestSuite) TestGetAgentDetailsHTTP() { wantCode int wantContains []string }{ + { + name: "when hostname exceeds max length returns 400", + hostname: strings.Repeat("a", 256), + setupJobMock: func() *jobmocks.MockJobClient { + return jobmocks.NewMockJobClient(s.mockCtrl) + }, + wantCode: http.StatusBadRequest, + wantContains: []string{`"error"`}, + }, { name: "when agent exists returns details", hostname: "server1", diff --git a/internal/api/agent/agent_undrain.go b/internal/api/agent/agent_undrain.go index 67bad3cc..84b9e376 100644 --- a/internal/api/agent/agent_undrain.go +++ b/internal/api/agent/agent_undrain.go @@ -34,6 +34,10 @@ func (a *Agent) UndrainAgent( ctx context.Context, request gen.UndrainAgentRequestObject, ) (gen.UndrainAgentResponseObject, error) { + if errMsg, ok := validateHostname(request.Hostname); !ok { + return gen.UndrainAgent400JSONResponse{Error: &errMsg}, nil + } + hostname := request.Hostname agentInfo, err := a.JobClient.GetAgent(ctx, hostname) diff --git a/internal/api/agent/agent_undrain_public_test.go b/internal/api/agent/agent_undrain_public_test.go index 8050aacb..3ebab922 100644 --- a/internal/api/agent/agent_undrain_public_test.go +++ b/internal/api/agent/agent_undrain_public_test.go @@ -27,6 +27,7 @@ import ( "net/http" "net/http/httptest" "os" + "strings" "testing" "github.com/golang/mock/gomock" @@ -69,6 +70,7 @@ func (s *AgentUndrainPublicTestSuite) TestUndrainAgent() { tests := []struct { name string hostname string + skipMock bool mockAgent *jobtypes.AgentInfo mockGetErr error mockWriteErr error @@ -77,6 +79,26 @@ func (s *AgentUndrainPublicTestSuite) TestUndrainAgent() { mockDeleteDrainErr error validateFunc func(resp gen.UndrainAgentResponseObject) }{ + { + name: "returns 400 when hostname is empty", + hostname: "", + skipMock: true, + skipWrite: true, + validateFunc: func(resp gen.UndrainAgentResponseObject) { + _, ok := resp.(gen.UndrainAgent400JSONResponse) + s.True(ok) + }, + }, + { + name: "returns 400 when hostname exceeds max length", + hostname: strings.Repeat("a", 256), + skipMock: true, + skipWrite: true, + validateFunc: func(resp gen.UndrainAgentResponseObject) { + _, ok := resp.(gen.UndrainAgent400JSONResponse) + s.True(ok) + }, + }, { name: "success undrains draining agent", hostname: "server1", @@ -190,9 +212,11 @@ func (s *AgentUndrainPublicTestSuite) TestUndrainAgent() { for _, tt := range tests { s.Run(tt.name, func() { - s.mockJobClient.EXPECT(). - GetAgent(gomock.Any(), tt.hostname). - Return(tt.mockAgent, tt.mockGetErr) + if !tt.skipMock { + s.mockJobClient.EXPECT(). + GetAgent(gomock.Any(), tt.hostname). + Return(tt.mockAgent, tt.mockGetErr) + } if tt.mockDeleteDrain { s.mockJobClient.EXPECT(). @@ -223,6 +247,15 @@ func (s *AgentUndrainPublicTestSuite) TestUndrainAgentHTTP() { wantCode int wantContains []string }{ + { + name: "when hostname exceeds max length returns 400", + hostname: strings.Repeat("a", 256), + setupJobMock: func() *jobmocks.MockJobClient { + return jobmocks.NewMockJobClient(s.mockCtrl) + }, + wantCode: http.StatusBadRequest, + wantContains: []string{`"error"`}, + }, { name: "when draining agent exists returns 200", hostname: "server1", diff --git a/internal/api/agent/gen/agent.gen.go b/internal/api/agent/gen/agent.gen.go index 7eb96e5e..9c0f4d19 100644 --- a/internal/api/agent/gen/agent.gen.go +++ b/internal/api/agent/gen/agent.gen.go @@ -399,6 +399,15 @@ func (response GetAgentDetails200JSONResponse) VisitGetAgentDetailsResponse(w ht return json.NewEncoder(w).Encode(response) } +type GetAgentDetails400JSONResponse externalRef0.ErrorResponse + +func (response GetAgentDetails400JSONResponse) VisitGetAgentDetailsResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(400) + + return json.NewEncoder(w).Encode(response) +} + type GetAgentDetails401JSONResponse externalRef0.ErrorResponse func (response GetAgentDetails401JSONResponse) VisitGetAgentDetailsResponse(w http.ResponseWriter) error { @@ -454,6 +463,15 @@ func (response DrainAgent200JSONResponse) VisitDrainAgentResponse(w http.Respons return json.NewEncoder(w).Encode(response) } +type DrainAgent400JSONResponse externalRef0.ErrorResponse + +func (response DrainAgent400JSONResponse) VisitDrainAgentResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(400) + + return json.NewEncoder(w).Encode(response) +} + type DrainAgent401JSONResponse externalRef0.ErrorResponse func (response DrainAgent401JSONResponse) VisitDrainAgentResponse(w http.ResponseWriter) error { @@ -509,6 +527,15 @@ func (response UndrainAgent200JSONResponse) VisitUndrainAgentResponse(w http.Res return json.NewEncoder(w).Encode(response) } +type UndrainAgent400JSONResponse externalRef0.ErrorResponse + +func (response UndrainAgent400JSONResponse) VisitUndrainAgentResponse(w http.ResponseWriter) error { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(400) + + return json.NewEncoder(w).Encode(response) +} + type UndrainAgent401JSONResponse externalRef0.ErrorResponse func (response UndrainAgent401JSONResponse) VisitUndrainAgentResponse(w http.ResponseWriter) error { diff --git a/internal/api/agent/gen/api.yaml b/internal/api/agent/gen/api.yaml index 6fa564aa..1b56179c 100644 --- a/internal/api/agent/gen/api.yaml +++ b/internal/api/agent/gen/api.yaml @@ -76,8 +76,15 @@ paths: - name: hostname in: path required: true + # NOTE: x-oapi-codegen-extra-tags on path params do not generate + # validate tags in strict-server mode. Validation is handled + # manually in the handler. + x-oapi-codegen-extra-tags: + validate: required,min=1,max=255 schema: type: string + minLength: 1 + maxLength: 255 description: The hostname of the agent to retrieve. responses: '200': @@ -86,6 +93,12 @@ paths: application/json: schema: $ref: '#/components/schemas/AgentInfo' + '400': + description: Invalid hostname. + content: + application/json: + schema: + $ref: '../../common/gen/api.yaml#/components/schemas/ErrorResponse' '401': description: Unauthorized - API key required content: @@ -126,8 +139,15 @@ paths: - name: hostname in: path required: true + # NOTE: x-oapi-codegen-extra-tags on path params do not generate + # validate tags in strict-server mode. Validation is handled + # manually in the handler. + x-oapi-codegen-extra-tags: + validate: required,min=1,max=255 schema: type: string + minLength: 1 + maxLength: 255 description: The hostname of the agent to drain. responses: '200': @@ -141,6 +161,12 @@ paths: type: string required: - message + '400': + description: Invalid hostname. + content: + application/json: + schema: + $ref: '../../common/gen/api.yaml#/components/schemas/ErrorResponse' '401': description: Unauthorized - API key required content: @@ -179,8 +205,15 @@ paths: - name: hostname in: path required: true + # NOTE: x-oapi-codegen-extra-tags on path params do not generate + # validate tags in strict-server mode. Validation is handled + # manually in the handler. + x-oapi-codegen-extra-tags: + validate: required,min=1,max=255 schema: type: string + minLength: 1 + maxLength: 255 description: The hostname of the agent to undrain. responses: '200': @@ -194,6 +227,12 @@ paths: type: string required: - message + '400': + description: Invalid hostname. + content: + application/json: + schema: + $ref: '../../common/gen/api.yaml#/components/schemas/ErrorResponse' '401': description: Unauthorized - API key required content: diff --git a/internal/api/agent/validate.go b/internal/api/agent/validate.go new file mode 100644 index 00000000..c9213137 --- /dev/null +++ b/internal/api/agent/validate.go @@ -0,0 +1,34 @@ +// Copyright (c) 2026 John Dewey + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to +// deal in the Software without restriction, including without limitation the +// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +// sell copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +package agent + +import "github.com/retr0h/osapi/internal/validation" + +// validateHostname validates a hostname path parameter using the shared +// validator. Returns the error message and false if invalid. +// +// This exists because oapi-codegen does not generate validate tags on +// path parameters in strict-server mode (upstream limitation). +func validateHostname( + hostname string, +) (string, bool) { + return validation.Var(hostname, "required,min=1,max=255") +} diff --git a/internal/api/file/file_upload.go b/internal/api/file/file_upload.go index a6f9cb83..87a4cf7b 100644 --- a/internal/api/file/file_upload.go +++ b/internal/api/file/file_upload.go @@ -33,6 +33,7 @@ import ( "github.com/nats-io/nats.go/jetstream" "github.com/retr0h/osapi/internal/api/file/gen" + "github.com/retr0h/osapi/internal/validation" ) // PostFile upload a file to the Object Store via multipart/form-data. @@ -40,6 +41,13 @@ func (f *File) PostFile( ctx context.Context, request gen.PostFileRequestObject, ) (gen.PostFileResponseObject, error) { + // Defense-in-depth: the OpenAPI validator handles param validation before + // the handler runs, but we validate here too so the plumbing is in place + // if a future param adds stricter tags. + if errMsg, ok := validation.Struct(request.Params); !ok { + return gen.PostFile400JSONResponse{Error: &errMsg}, nil + } + name, contentType, fileData, errResp := f.parseMultipart(request) if errResp != nil { return errResp, nil diff --git a/internal/api/file/gen/api.yaml b/internal/api/file/gen/api.yaml index 50637b35..582ee986 100644 --- a/internal/api/file/gen/api.yaml +++ b/internal/api/file/gen/api.yaml @@ -43,14 +43,8 @@ paths: - name: force in: query required: false - # x-oapi-codegen-extra-tags: - # validate: omitempty - # NOTE: The tags above are intentionally commented out. The - # only param is an optional bool — validate:"omitempty" can - # never fail, so validation.Struct(Params) would be dead - # code. The middleware already type-validates the bool before - # the handler runs. Uncomment if a future param needs a - # stricter validate tag. + x-oapi-codegen-extra-tags: + validate: omitempty description: > When true, bypass the digest check and always write the file. Returns changed=true regardless of whether the diff --git a/internal/api/file/gen/file.gen.go b/internal/api/file/gen/file.gen.go index 0377146e..3287df6f 100644 --- a/internal/api/file/gen/file.gen.go +++ b/internal/api/file/gen/file.gen.go @@ -114,7 +114,7 @@ type PostFileMultipartBody struct { // PostFileParams defines parameters for PostFile. type PostFileParams struct { // Force When true, bypass the digest check and always write the file. Returns changed=true regardless of whether the content differs from the existing object. - Force *bool `form:"force,omitempty" json:"force,omitempty"` + Force *bool `form:"force,omitempty" json:"force,omitempty" validate:"omitempty"` } // PostFileMultipartBodyContentType defines parameters for PostFile. diff --git a/internal/api/gen/api.yaml b/internal/api/gen/api.yaml index 36c5a29e..94e578f6 100644 --- a/internal/api/gen/api.yaml +++ b/internal/api/gen/api.yaml @@ -87,8 +87,12 @@ paths: - name: hostname in: path required: true + x-oapi-codegen-extra-tags: + validate: required,min=1,max=255 schema: type: string + minLength: 1 + maxLength: 255 description: The hostname of the agent to retrieve. responses: '200': @@ -97,6 +101,12 @@ paths: application/json: schema: $ref: '#/components/schemas/AgentInfo' + '400': + description: Invalid hostname. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '401': description: Unauthorized - API key required content: @@ -138,8 +148,12 @@ paths: - name: hostname in: path required: true + x-oapi-codegen-extra-tags: + validate: required,min=1,max=255 schema: type: string + minLength: 1 + maxLength: 255 description: The hostname of the agent to drain. responses: '200': @@ -153,6 +167,12 @@ paths: type: string required: - message + '400': + description: Invalid hostname. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '401': description: Unauthorized - API key required content: @@ -192,8 +212,12 @@ paths: - name: hostname in: path required: true + x-oapi-codegen-extra-tags: + validate: required,min=1,max=255 schema: type: string + minLength: 1 + maxLength: 255 description: The hostname of the agent to undrain. responses: '200': @@ -207,6 +231,12 @@ paths: type: string required: - message + '400': + description: Invalid hostname. + content: + application/json: + schema: + $ref: '#/components/schemas/ErrorResponse' '401': description: Unauthorized - API key required content: @@ -412,6 +442,8 @@ paths: - name: force in: query required: false + x-oapi-codegen-extra-tags: + validate: omitempty description: > When true, bypass the digest check and always write the file. Returns changed=true regardless of whether the content differs from @@ -770,12 +802,13 @@ paths: in: query required: false x-oapi-codegen-extra-tags: - validate: omitempty,min=0 + validate: omitempty,min=1,max=100 schema: type: integer - minimum: 0 + minimum: 1 + maximum: 100 default: 10 - description: Maximum number of jobs to return. Use 0 for no limit. + description: Maximum number of jobs per page (1-100). - name: offset in: query required: false diff --git a/internal/api/job/gen/api.yaml b/internal/api/job/gen/api.yaml index 664cf1f8..3803cbf7 100644 --- a/internal/api/job/gen/api.yaml +++ b/internal/api/job/gen/api.yaml @@ -102,12 +102,13 @@ paths: in: query required: false x-oapi-codegen-extra-tags: - validate: omitempty,min=0 + validate: omitempty,min=1,max=100 schema: type: integer - minimum: 0 + minimum: 1 + maximum: 100 default: 10 - description: Maximum number of jobs to return. Use 0 for no limit. + description: Maximum number of jobs per page (1-100). - name: offset in: query required: false diff --git a/internal/api/job/gen/job.gen.go b/internal/api/job/gen/job.gen.go index a72f4382..d760eb69 100644 --- a/internal/api/job/gen/job.gen.go +++ b/internal/api/job/gen/job.gen.go @@ -151,8 +151,8 @@ type GetJobParams struct { // Status Filter jobs by status. Status *GetJobParamsStatus `form:"status,omitempty" json:"status,omitempty" validate:"omitempty,oneof=submitted processing completed failed partial_failure"` - // Limit Maximum number of jobs to return. Use 0 for no limit. - Limit *int `form:"limit,omitempty" json:"limit,omitempty" validate:"omitempty,min=0"` + // Limit Maximum number of jobs per page (1-100). + Limit *int `form:"limit,omitempty" json:"limit,omitempty" validate:"omitempty,min=1,max=100"` // Offset Number of jobs to skip for pagination. Offset *int `form:"offset,omitempty" json:"offset,omitempty" validate:"omitempty,min=0"` diff --git a/internal/api/job/job_list.go b/internal/api/job/job_list.go index 998e4e12..dadc8359 100644 --- a/internal/api/job/job_list.go +++ b/internal/api/job/job_list.go @@ -27,6 +27,7 @@ import ( "github.com/google/uuid" "github.com/retr0h/osapi/internal/api/job/gen" + "github.com/retr0h/osapi/internal/job/client" "github.com/retr0h/osapi/internal/validation" ) @@ -44,10 +45,16 @@ func (j *Job) GetJob( statusFilter = string(*request.Params.Status) } - limit := 10 + limit := client.DefaultPageSize if request.Params.Limit != nil { limit = *request.Params.Limit } + // Defense-in-depth: the OpenAPI validator rejects out-of-range limits + // before the handler runs, but we cap here too in case validation is + // ever misconfigured. ListJobs also caps internally. + if limit <= 0 || limit > client.MaxPageSize { + limit = client.DefaultPageSize + } offset := 0 if request.Params.Offset != nil { diff --git a/internal/api/job/job_list_public_test.go b/internal/api/job/job_list_public_test.go index 873dff7e..10489653 100644 --- a/internal/api/job/job_list_public_test.go +++ b/internal/api/job/job_list_public_test.go @@ -108,6 +108,36 @@ func (s *JobListPublicTestSuite) TestGetJob() { s.NotNil(r.Error) }, }, + { + name: "returns 400 when limit is zero", + request: func() gen.GetJobRequestObject { + l := 0 + return gen.GetJobRequestObject{ + Params: gen.GetJobParams{Limit: &l}, + } + }(), + expectMock: false, + validateFunc: func(resp gen.GetJobResponseObject) { + r, ok := resp.(gen.GetJob400JSONResponse) + s.True(ok) + s.NotNil(r.Error) + }, + }, + { + name: "returns 400 when limit exceeds max", + request: func() gen.GetJobRequestObject { + l := 101 + return gen.GetJobRequestObject{ + Params: gen.GetJobParams{Limit: &l}, + } + }(), + expectMock: false, + validateFunc: func(resp gen.GetJobResponseObject) { + r, ok := resp.(gen.GetJob400JSONResponse) + s.True(ok) + s.NotNil(r.Error) + }, + }, { name: "returns 400 when offset is negative", request: func() gen.GetJobRequestObject { @@ -341,6 +371,24 @@ func (s *JobListPublicTestSuite) TestListJobsValidationHTTP() { wantCode: http.StatusBadRequest, wantContains: []string{`"error"`}, }, + { + name: "when limit=0 returns 400", + query: "?limit=0", + setupJobMock: func() *jobmocks.MockJobClient { + return jobmocks.NewMockJobClient(s.mockCtrl) + }, + wantCode: http.StatusBadRequest, + wantContains: []string{`"error"`}, + }, + { + name: "when limit exceeds max returns 400", + query: "?limit=101", + setupJobMock: func() *jobmocks.MockJobClient { + return jobmocks.NewMockJobClient(s.mockCtrl) + }, + wantCode: http.StatusBadRequest, + wantContains: []string{`"error"`}, + }, { name: "when valid limit and offset", query: "?limit=5&offset=10", diff --git a/internal/job/client/jobs.go b/internal/job/client/jobs.go index 3ffa1843..11202401 100644 --- a/internal/job/client/jobs.go +++ b/internal/job/client/jobs.go @@ -259,54 +259,18 @@ func (c *Client) GetQueueSummary( return nil, fmt.Errorf("error fetching keys: %w", err) } - // Collect job IDs and their highest-priority status from key names. - // Key format: status.... - // Status priority: completed > failed > started > acknowledged > submitted - statusPriority := map[string]int{ - "submitted": 0, - "acknowledged": 1, - "started": 2, - "failed": 3, - "completed": 4, - "retried": 4, - } - - jobStatuses := make(map[string]string) // jobID -> highest status - for _, key := range keys { - if !strings.HasPrefix(key, "status.") { - continue - } - parts := strings.SplitN(key, ".", 4) - if len(parts) < 3 { - continue - } - jobID := parts[1] - event := parts[2] - - cur, exists := jobStatuses[jobID] - if !exists || statusPriority[event] > statusPriority[cur] { - jobStatuses[jobID] = event - } - } + _, jobStatuses := computeStatusFromKeyNames(keys) statusCounts := map[string]int{ - "submitted": 0, - "processing": 0, - "completed": 0, - "failed": 0, + "submitted": 0, + "processing": 0, + "completed": 0, + "failed": 0, + "partial_failure": 0, } - for _, status := range jobStatuses { - switch status { - case "completed", "retried": - statusCounts["completed"]++ - case "failed": - statusCounts["failed"]++ - case "started", "acknowledged": - statusCounts["processing"]++ - default: - statusCounts["submitted"]++ - } + for _, info := range jobStatuses { + statusCounts[info.Status]++ } total := len(jobStatuses) @@ -387,7 +351,8 @@ func (c *Client) GetJobStatus( } // ListJobs returns jobs filtered by status with server-side pagination. -// limit=0 means no limit. offset=0 means start from beginning. +// Uses a two-pass approach: Pass 1 derives status from key names only (fast), +// Pass 2 fetches full details for the paginated page only. // Jobs are returned newest-first (reverse insertion order). func (c *Client) ListJobs( ctx context.Context, @@ -395,12 +360,18 @@ func (c *Client) ListJobs( limit int, offset int, ) (*ListJobsResult, error) { + // Cap limit to MaxPageSize + if limit <= 0 || limit > MaxPageSize { + limit = DefaultPageSize + } + c.logger.Debug("kv.keys", slog.String("operation", "list_jobs"), slog.String("status_filter", statusFilter), slog.Int("limit", limit), slog.Int("offset", offset), ) + allKeys, err := c.kv.Keys(ctx) if err != nil { if errors.Is(err, jetstream.ErrNoKeysFound) { @@ -412,124 +383,58 @@ func (c *Client) ListJobs( return nil, fmt.Errorf("error fetching jobs: %w", err) } - // Extract job keys and reverse for newest-first ordering - var jobKeys []string - for _, key := range allKeys { - if strings.HasPrefix(key, "jobs.") { - jobID := strings.TrimPrefix(key, "jobs.") - if jobID != "" { - jobKeys = append(jobKeys, key) + // Pass 1: Light — key names only, no kv.Get() + orderedJobIDs, jobStatuses := computeStatusFromKeyNames(allKeys) + + // Filter + count (fast, no reads) + var matchingIDs []string + for _, id := range orderedJobIDs { + if statusFilter != "" { + info, exists := jobStatuses[id] + if !exists { + info = lightJobInfo{Status: "submitted"} + } + if info.Status != statusFilter { + continue } } + matchingIDs = append(matchingIDs, id) } - - // Reverse for newest-first (KV insertion order is chronological) - for i, j := 0, len(jobKeys)-1; i < j; i, j = i+1, j-1 { - jobKeys[i], jobKeys[j] = jobKeys[j], jobKeys[i] - } - - c.logger.Debug("kv.keys", - slog.Int("total_job_keys", len(jobKeys)), - slog.Int("all_keys", len(allKeys)), - ) - - // No status filter: we know the total count immediately - if statusFilter == "" { - return c.listJobsNoFilter(ctx, allKeys, jobKeys, limit, offset), nil - } - - // With status filter: scan all jobs to count matches and collect page - return c.listJobsWithFilter(ctx, allKeys, jobKeys, statusFilter, limit, offset), nil -} - -// listJobsNoFilter handles pagination when no status filter is applied. -func (c *Client) listJobsNoFilter( - ctx context.Context, - allKeys []string, - jobKeys []string, - limit int, - offset int, -) *ListJobsResult { - totalCount := len(jobKeys) + totalCount := len(matchingIDs) // Apply offset - if offset >= len(jobKeys) { + if offset >= len(matchingIDs) { return &ListJobsResult{ Jobs: []*job.QueuedJob{}, TotalCount: totalCount, - } + }, nil } - jobKeys = jobKeys[offset:] + matchingIDs = matchingIDs[offset:] // Apply limit - if limit > 0 && len(jobKeys) > limit { - jobKeys = jobKeys[:limit] + if len(matchingIDs) > limit { + matchingIDs = matchingIDs[:limit] } - var jobs []*job.QueuedJob - for _, key := range jobKeys { - jobID := strings.TrimPrefix(key, "jobs.") - jobInfo, err := c.getJobStatusFromKeys(ctx, allKeys, key, jobID) - if err != nil { - c.logger.Debug("failed to get job status during list", - slog.String("job_id", jobID), - slog.String("error", err.Error()), - ) - continue - } - jobs = append(jobs, jobInfo) - } - - if jobs == nil { - jobs = []*job.QueuedJob{} - } - - return &ListJobsResult{ - Jobs: jobs, - TotalCount: totalCount, - } -} + c.logger.Debug("kv.keys", + slog.Int("total_matching", totalCount), + slog.Int("page_size", len(matchingIDs)), + slog.Int("all_keys", len(allKeys)), + ) -// listJobsWithFilter handles pagination when a status filter is applied. -func (c *Client) listJobsWithFilter( - ctx context.Context, - allKeys []string, - jobKeys []string, - statusFilter string, - limit int, - offset int, -) *ListJobsResult { + // Pass 2: Heavy — kv.Get() only for the page var jobs []*job.QueuedJob - totalCount := 0 - skipped := 0 - - for _, key := range jobKeys { - jobID := strings.TrimPrefix(key, "jobs.") - jobInfo, err := c.getJobStatusFromKeys(ctx, allKeys, key, jobID) + for _, id := range matchingIDs { + jobKey := "jobs." + id + jobInfo, err := c.getJobStatusFromKeys(ctx, allKeys, jobKey, id) if err != nil { c.logger.Debug("failed to get job status during list", - slog.String("job_id", jobID), + slog.String("job_id", id), slog.String("error", err.Error()), ) continue } - - if jobInfo.Status != statusFilter { - continue - } - - totalCount++ - - // Skip offset items - if skipped < offset { - skipped++ - continue - } - - // Collect items up to limit - if limit == 0 || len(jobs) < limit { - jobs = append(jobs, jobInfo) - } + jobs = append(jobs, jobInfo) } if jobs == nil { @@ -539,7 +444,7 @@ func (c *Client) listJobsWithFilter( return &ListJobsResult{ Jobs: jobs, TotalCount: totalCount, - } + }, nil } // getJobStatusFromKeys builds a QueuedJob using pre-fetched keys (no inner kv.Keys() call). @@ -883,6 +788,113 @@ func (c *Client) DeleteJob( return nil } +// computeStatusFromKeyNames derives job statuses from KV key names only — no +// kv.Get() calls. It returns ordered job IDs (newest-first from "jobs.*" keys) +// and a map of per-job status computed using the same multi-agent logic as +// computeStatusFromEvents but parsed entirely from key names. +// +// Key format: status.... +func computeStatusFromKeyNames( + keys []string, +) ([]string, map[string]lightJobInfo) { + // Extract job IDs from jobs.* keys + var orderedJobIDs []string + for _, key := range keys { + if strings.HasPrefix(key, "jobs.") { + jobID := strings.TrimPrefix(key, "jobs.") + if jobID != "" { + orderedJobIDs = append(orderedJobIDs, jobID) + } + } + } + + // Reverse for newest-first (KV insertion order is chronological) + for i, j := 0, len(orderedJobIDs)-1; i < j; i, j = i+1, j-1 { + orderedJobIDs[i], orderedJobIDs[j] = orderedJobIDs[j], orderedJobIDs[i] + } + + // Parse status keys and track highest-priority event per (jobID, hostname) + statusPriority := map[string]int{ + "submitted": 0, + "acknowledged": 1, + "started": 2, + "failed": 3, + "completed": 4, + "retried": 4, + } + + // agentStates[jobID][hostname] = highest-priority event + agentStates := make(map[string]map[string]string) + + for _, key := range keys { + if !strings.HasPrefix(key, "status.") { + continue + } + parts := strings.SplitN(key, ".", 5) + if len(parts) < 4 { + continue + } + jobID := parts[1] + event := parts[2] + hostname := parts[3] + + if _, exists := agentStates[jobID]; !exists { + agentStates[jobID] = make(map[string]string) + } + + cur, exists := agentStates[jobID][hostname] + if !exists || statusPriority[event] > statusPriority[cur] { + agentStates[jobID][hostname] = event + } + } + + // Compute overall status per job using multi-agent logic + jobStatuses := make(map[string]lightJobInfo, len(agentStates)) + + for jobID, agents := range agentStates { + completed := 0 + failed := 0 + processing := 0 + acknowledged := 0 + + for hostname, state := range agents { + if hostname == "_api" { + continue + } + switch state { + case "completed", "retried": + completed++ + case "failed": + failed++ + case "started": + processing++ + case "acknowledged": + acknowledged++ + } + } + + totalAgents := completed + failed + processing + acknowledged + + var status string + switch { + case totalAgents == 0: + status = "submitted" + case processing > 0 || acknowledged > 0: + status = "processing" + case failed > 0 && completed > 0: + status = "partial_failure" + case failed > 0: + status = "failed" + default: + status = "completed" + } + + jobStatuses[jobID] = lightJobInfo{Status: status} + } + + return orderedJobIDs, jobStatuses +} + // getJobResponses retrieves response data for a specific job func (c *Client) getJobResponses( ctx context.Context, diff --git a/internal/job/client/jobs_public_test.go b/internal/job/client/jobs_public_test.go index 268075af..89be0c14 100644 --- a/internal/job/client/jobs_public_test.go +++ b/internal/job/client/jobs_public_test.go @@ -1047,7 +1047,7 @@ func (s *JobsPublicTestSuite) TestListJobs() { }, }, { - name: "returns all jobs no limit", + name: "returns all jobs default limit", setupMocks: func() { s.mockKV.EXPECT(). Keys(gomock.Any()). @@ -1072,13 +1072,9 @@ func (s *JobsPublicTestSuite) TestListJobs() { name: "filters out non matching status", statusFilter: "completed", setupMocks: func() { + // No status keys → default "submitted", doesn't match "completed" + // Two-pass: no kv.Get needed for filtering s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil) - - mockEntry := jobmocks.NewMockKeyValueEntry(s.mockCtrl) - mockEntry.EXPECT().Value().Return([]byte( - `{"id":"job-1","operation":{"type":"node.hostname.get"},"created":"2024-01-01T00:00:00Z"}`, - )) - s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry, nil) }, expectedJobs: 0, expectedTotalCount: 0, @@ -1194,17 +1190,13 @@ func (s *JobsPublicTestSuite) TestListJobs() { statusFilter: "submitted", offset: 1, setupMocks: func() { + // No status keys → all default "submitted" s.mockKV.EXPECT(). Keys(gomock.Any()). Return([]string{"jobs.job-1", "jobs.job-2", "jobs.job-3"}, nil) - // Reversed: job-3, job-2, job-1 — all submitted - mockEntry3 := jobmocks.NewMockKeyValueEntry(s.mockCtrl) - mockEntry3.EXPECT().Value().Return([]byte( - `{"id":"job-3","operation":{"type":"node.status.get"},"created":"2024-01-03T00:00:00Z"}`, - )) - s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-3").Return(mockEntry3, nil) - + // Reversed: job-3, job-2, job-1. Offset 1 → page = [job-2, job-1] + // Only page items get kv.Get in Pass 2 mockEntry2 := jobmocks.NewMockKeyValueEntry(s.mockCtrl) mockEntry2.EXPECT().Value().Return([]byte( `{"id":"job-2","operation":{"type":"node.status.get"},"created":"2024-01-02T00:00:00Z"}`, @@ -1225,22 +1217,18 @@ func (s *JobsPublicTestSuite) TestListJobs() { statusFilter: "submitted", limit: 1, setupMocks: func() { + // No status keys → both default "submitted" s.mockKV.EXPECT(). Keys(gomock.Any()). Return([]string{"jobs.job-1", "jobs.job-2"}, nil) - // Reversed: job-2, job-1 — both submitted + // Reversed: job-2, job-1. Limit 1 → page = [job-2] + // Only page item gets kv.Get in Pass 2 mockEntry2 := jobmocks.NewMockKeyValueEntry(s.mockCtrl) mockEntry2.EXPECT().Value().Return([]byte( `{"id":"job-2","operation":{"type":"node.status.get"},"created":"2024-01-02T00:00:00Z"}`, )) s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-2").Return(mockEntry2, nil) - - mockEntry1 := jobmocks.NewMockKeyValueEntry(s.mockCtrl) - mockEntry1.EXPECT().Value().Return([]byte( - `{"id":"job-1","operation":{"type":"node.hostname.get"},"created":"2024-01-01T00:00:00Z"}`, - )) - s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry1, nil) }, expectedJobs: 1, expectedTotalCount: 2, @@ -1249,11 +1237,13 @@ func (s *JobsPublicTestSuite) TestListJobs() { name: "filter skips jobs with get error", statusFilter: "submitted", setupMocks: func() { + // No status keys → both default "submitted" s.mockKV.EXPECT(). Keys(gomock.Any()). Return([]string{"jobs.job-bad", "jobs.job-good"}, nil) - // Reversed: job-good, job-bad + // Reversed: job-good, job-bad — both match filter + // Pass 2: job-good Get fails, job-bad Get succeeds s.mockKV.EXPECT(). Get(gomock.Any(), "jobs.job-good"). Return(nil, errors.New("kv error")) @@ -1264,8 +1254,10 @@ func (s *JobsPublicTestSuite) TestListJobs() { )) s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-bad").Return(mockEntry, nil) }, - expectedJobs: 1, - expectedTotalCount: 1, + expectedJobs: 1, + // totalCount is 2 because key-name-based counting finds + // both jobs matching the filter before Pass 2 Get errors + expectedTotalCount: 2, }, { name: "getJobStatusFromKeys with invalid JSON", @@ -1281,6 +1273,31 @@ func (s *JobsPublicTestSuite) TestListJobs() { expectedJobs: 0, expectedTotalCount: 1, }, + { + name: "limit exceeding max capped to default", + limit: 200, + setupMocks: func() { + s.mockKV.EXPECT(). + Keys(gomock.Any()). + Return([]string{"jobs.job-1", "jobs.job-2"}, nil) + + // Limit 200 → capped to DefaultPageSize (10) + // Both jobs are within page + mockEntry2 := jobmocks.NewMockKeyValueEntry(s.mockCtrl) + mockEntry2.EXPECT().Value().Return([]byte( + `{"id":"job-2","operation":{"type":"node.status.get"},"created":"2024-01-02T00:00:00Z"}`, + )) + s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-2").Return(mockEntry2, nil) + + mockEntry1 := jobmocks.NewMockKeyValueEntry(s.mockCtrl) + mockEntry1.EXPECT().Value().Return([]byte( + `{"id":"job-1","operation":{"type":"node.hostname.get"},"created":"2024-01-01T00:00:00Z"}`, + )) + s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry1, nil) + }, + expectedJobs: 2, + expectedTotalCount: 2, + }, { name: "newest first ordering", setupMocks: func() { diff --git a/internal/job/client/jobs_test.go b/internal/job/client/jobs_test.go new file mode 100644 index 00000000..ba42ecdf --- /dev/null +++ b/internal/job/client/jobs_test.go @@ -0,0 +1,232 @@ +// Copyright (c) 2026 John Dewey + +// Permission is hereby granted, free of charge, to any person obtaining a copy +// of this software and associated documentation files (the "Software"), to +// deal in the Software without restriction, including without limitation the +// rights to use, copy, modify, merge, publish, distribute, sublicense, and/or +// sell copies of the Software, and to permit persons to whom the Software is +// furnished to do so, subject to the following conditions: + +// The above copyright notice and this permission notice shall be included in +// all copies or substantial portions of the Software. + +// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING +// FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER +// DEALINGS IN THE SOFTWARE. + +package client + +import ( + "testing" + + "github.com/stretchr/testify/suite" +) + +type JobsTestSuite struct { + suite.Suite +} + +func (s *JobsTestSuite) TestComputeStatusFromKeyNames() { + tests := []struct { + name string + keys []string + expectedOrderIDs []string + expectedStatuses map[string]string + validateOrderFunc func(ids []string) + }{ + { + name: "empty keys", + keys: []string{}, + expectedOrderIDs: nil, + expectedStatuses: map[string]string{}, + }, + { + name: "only jobs keys no status events", + keys: []string{"jobs.job-1", "jobs.job-2"}, + expectedOrderIDs: []string{"job-2", "job-1"}, + expectedStatuses: map[string]string{}, + }, + { + name: "single agent completed", + keys: []string{ + "jobs.job-1", + "status.job-1.submitted._api.100", + "status.job-1.acknowledged.agent1.101", + "status.job-1.started.agent1.102", + "status.job-1.completed.agent1.103", + }, + expectedOrderIDs: []string{"job-1"}, + expectedStatuses: map[string]string{ + "job-1": "completed", + }, + }, + { + name: "single agent failed", + keys: []string{ + "jobs.job-1", + "status.job-1.submitted._api.100", + "status.job-1.acknowledged.agent1.101", + "status.job-1.started.agent1.102", + "status.job-1.failed.agent1.103", + }, + expectedOrderIDs: []string{"job-1"}, + expectedStatuses: map[string]string{ + "job-1": "failed", + }, + }, + { + name: "single agent processing", + keys: []string{ + "jobs.job-1", + "status.job-1.submitted._api.100", + "status.job-1.acknowledged.agent1.101", + "status.job-1.started.agent1.102", + }, + expectedOrderIDs: []string{"job-1"}, + expectedStatuses: map[string]string{ + "job-1": "processing", + }, + }, + { + name: "submitted only via api", + keys: []string{ + "jobs.job-1", + "status.job-1.submitted._api.100", + }, + expectedOrderIDs: []string{"job-1"}, + expectedStatuses: map[string]string{ + "job-1": "submitted", + }, + }, + { + name: "acknowledged only agent shows processing", + keys: []string{ + "jobs.job-1", + "status.job-1.submitted._api.100", + "status.job-1.acknowledged.agent1.101", + }, + expectedOrderIDs: []string{"job-1"}, + expectedStatuses: map[string]string{ + "job-1": "processing", + }, + }, + { + name: "multi-agent partial failure", + keys: []string{ + "jobs.job-1", + "status.job-1.submitted._api.100", + "status.job-1.completed.agent1.101", + "status.job-1.failed.agent2.102", + }, + expectedOrderIDs: []string{"job-1"}, + expectedStatuses: map[string]string{ + "job-1": "partial_failure", + }, + }, + { + name: "multi-agent all completed", + keys: []string{ + "jobs.job-1", + "status.job-1.completed.agent1.101", + "status.job-1.completed.agent2.102", + }, + expectedOrderIDs: []string{"job-1"}, + expectedStatuses: map[string]string{ + "job-1": "completed", + }, + }, + { + name: "multi-agent one still processing", + keys: []string{ + "jobs.job-1", + "status.job-1.completed.agent1.101", + "status.job-1.started.agent2.102", + }, + expectedOrderIDs: []string{"job-1"}, + expectedStatuses: map[string]string{ + "job-1": "processing", + }, + }, + { + name: "retried counts as completed", + keys: []string{ + "jobs.job-1", + "status.job-1.submitted._api.100", + "status.job-1.retried.agent1.101", + }, + expectedOrderIDs: []string{"job-1"}, + expectedStatuses: map[string]string{ + "job-1": "completed", + }, + }, + { + name: "multiple jobs mixed statuses", + keys: []string{ + "jobs.job-1", + "jobs.job-2", + "jobs.job-3", + "status.job-1.completed.agent1.101", + "status.job-2.started.agent1.201", + "status.job-3.failed.agent1.301", + }, + expectedOrderIDs: []string{"job-3", "job-2", "job-1"}, + expectedStatuses: map[string]string{ + "job-1": "completed", + "job-2": "processing", + "job-3": "failed", + }, + }, + { + name: "malformed status key skipped", + keys: []string{ + "jobs.job-1", + "status.incomplete", + "status.job-1.completed.agent1.101", + }, + expectedOrderIDs: []string{"job-1"}, + expectedStatuses: map[string]string{ + "job-1": "completed", + }, + }, + { + name: "non-job non-status keys ignored", + keys: []string{ + "jobs.job-1", + "responses.job-1.agent1.100", + "status.job-1.completed.agent1.101", + }, + expectedOrderIDs: []string{"job-1"}, + expectedStatuses: map[string]string{ + "job-1": "completed", + }, + }, + { + name: "empty job ID after trim skipped", + keys: []string{"jobs."}, + expectedOrderIDs: nil, + expectedStatuses: map[string]string{}, + }, + } + + for _, tt := range tests { + s.Run(tt.name, func() { + orderedIDs, jobStatuses := computeStatusFromKeyNames(tt.keys) + + s.Equal(tt.expectedOrderIDs, orderedIDs) + + actualStatuses := make(map[string]string, len(jobStatuses)) + for id, info := range jobStatuses { + actualStatuses[id] = info.Status + } + s.Equal(tt.expectedStatuses, actualStatuses) + }) + } +} + +func TestJobsTestSuite(t *testing.T) { + suite.Run(t, new(JobsTestSuite)) +} diff --git a/internal/job/client/types.go b/internal/job/client/types.go index b0d20aa4..ec4c38c1 100644 --- a/internal/job/client/types.go +++ b/internal/job/client/types.go @@ -36,6 +36,13 @@ import ( "github.com/retr0h/osapi/internal/provider/node/mem" ) +const ( + // DefaultPageSize is the default number of jobs per page. + DefaultPageSize = 10 + // MaxPageSize is the maximum allowed page size. + MaxPageSize = 100 +) + // JobClient defines the interface for interacting with the jobs system. type JobClient interface { // Job queue management operations @@ -343,5 +350,10 @@ type computedJobStatus struct { Timeline []job.TimelineEvent } +// lightJobInfo holds status derived from KV key names only (no reads). +type lightJobInfo struct { + Status string +} + // Ensure Client implements JobClient interface var _ JobClient = (*Client)(nil) From 5e5ac86a472e60a824365f3b4be86abcc28f2158 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Sat, 7 Mar 2026 11:31:04 -0800 Subject: [PATCH 2/3] refactor(job): remove GetQueueStats and use fast path MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Replace the slow GetQueueStats (which read every job payload) with GetQueueSummary (key-name-only derivation) for the job status endpoint. Move status counts into the ListJobs response so the CLI makes a single API call instead of two. Remove OperationCounts entirely as it required reading every job payload. Fix job status TUI rendering with alt screen. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- cmd/client_job_list.go | 9 +- cmd/client_job_status.go | 9 +- .../docs/gen/api/get-queue-statistics.api.mdx | 52 +---- docs/docs/gen/api/list-jobs.api.mdx | 53 ++++- .../sidebar/architecture/job-architecture.md | 9 +- .../docs/sidebar/usage/cli/client/job/list.md | 6 +- .../sidebar/usage/cli/client/job/status.md | 17 +- go.mod | 2 +- go.sum | 2 + internal/api/gen/api.yaml | 19 +- internal/api/job/gen/api.yaml | 19 +- internal/api/job/gen/job.gen.go | 6 +- internal/api/job/job_list.go | 6 +- internal/api/job/job_status.go | 9 +- internal/api/job/job_status_public_test.go | 17 +- internal/job/client/jobs.go | 120 ++-------- internal/job/client/jobs_public_test.go | 219 ------------------ internal/job/client/types.go | 8 +- internal/job/mocks/job_client.gen.go | 15 -- internal/job/types.go | 7 +- 20 files changed, 145 insertions(+), 459 deletions(-) diff --git a/cmd/client_job_list.go b/cmd/client_job_list.go index d05fc21a..aad8e4d7 100644 --- a/cmd/client_job_list.go +++ b/cmd/client_job_list.go @@ -54,16 +54,9 @@ var clientJobListCmd = &cobra.Command{ return } - // Get queue stats for summary - statsResp, err := sdkClient.Job.QueueStats(ctx) - if err != nil { - cli.HandleError(err, logger) - return - } - jobs := jobsResp.Data.Items totalItems := jobsResp.Data.TotalItems - statusCounts := statsResp.Data.StatusCounts + statusCounts := jobsResp.Data.StatusCounts if jsonOutput { displayJobListJSON(jobs, totalItems, statusCounts, statusFilter, limitFlag, offsetFlag) diff --git a/cmd/client_job_status.go b/cmd/client_job_status.go index c4f71992..a3729d38 100644 --- a/cmd/client_job_status.go +++ b/cmd/client_job_status.go @@ -176,13 +176,6 @@ func fetchJobsStatus() string { statusDisplay += fmt.Sprintf(" Dead Letter Queue: %d\n", stats.DlqCount) } - if len(stats.OperationCounts) > 0 { - statusDisplay += "\nOperation Types:\n" - for opType, count := range stats.OperationCounts { - statusDisplay += fmt.Sprintf(" %s: %d\n", opType, count) - } - } - return statusDisplay } @@ -215,7 +208,7 @@ and operation types with live refresh.`, return } - p := tea.NewProgram(initialJobsModel(pollIntervalSeconds)) + p := tea.NewProgram(initialJobsModel(pollIntervalSeconds), tea.WithAltScreen()) _, err := p.Run() if err != nil { status := fetchJobsStatus() diff --git a/docs/docs/gen/api/get-queue-statistics.api.mdx b/docs/docs/gen/api/get-queue-statistics.api.mdx index ed104053..87b73628 100644 --- a/docs/docs/gen/api/get-queue-statistics.api.mdx +++ b/docs/docs/gen/api/get-queue-statistics.api.mdx @@ -5,7 +5,7 @@ description: "Retrieve statistics about the job queue." sidebar_label: "Get queue statistics" hide_title: true hide_table_of_contents: true -api: eJztVk1v1DAQ/SvWnEO75eOS24IoKhKoQBGHarVy4unG28ROx+PCEvm/o3G2S7bbQ5EAceC0SXbG7817Y3sGMBhqsj1b76CEj8hk8RZVYM02sK2D0pWPrLhBtfaVuokY8QgKYL0KUF7CW18t32mnV9ih4+X8/Gy59tXS90haVg2wKCBgHcnyBsrLAV6iJqR55Eby174qCbWBRVoUQBh67wIGKAd4OpvJzz7FD0Jgwk+41N4xOpZg3fetrTPy8TpIxgChbrDT8sSbHqEEX62xZiigJ+HJdsRjz7oV9mESax3jCgmKezwuJFi52FVIyl+JOEFZl4XaiYTfdNe3COXzp6kAIR3DsvbRcXiIjjbGyvK6Pd8ntk8l3efySlbckag2akQ6ksidEX8DdwemJDXjm/ZmRH6Epu8fVNOgNqpFZqQ7ZVMqgC2LsmNDfGLN4eO2eSAlCXg+Ozlsn89OR2482e9o1BM1Pz9T17hRhDfREprf10xI5GkSFpisWx2UPFeTd6lcKs65ihvNytd1JEKz101wqm2LRrFXNN2xW8sNsrZteAT4zne1zZns9kziQVgTUaAd8ldP14pthz5yhq69wcdsnl2RkrAH8mI2m7r7WqIOjH12aOypp8oag049UWcuxKsrW1t0rHqkzoaQj6L/7v777r546NTPgXdyWLcaD4I/cw389/YPeZsK6JAbb6CEFWbhtQwBcLz21fEoA8iwQLdIMlxMJodPYt7oz3R+2PFtmHvJzWFQQpWDoNg+nHrqNEMJb79c5NvDuiuf07ds5ys5K34OMnIzQAFCZCz85Gh2NBOheh+407mjnM5Yb5AP+vG+cMPP7vylKWusjvEbH/ettk4YRGplwVG8PEHB3XAhw1bjA8v3Yah0wM/UpiSfbyLSZhT1VpPVldR9uUgFNKgNUp7NrnEjYtQ19uLPrW6j4B9sJ5nVdl6+eX0hQ8S+IfcMyKtv/9JuM1l7GMaIC3+NLiUotiRY3iEtUko/ADwftuo= +api: eJztVk1v00AQ/SurOZs2LXDxLSBaFQlU2iIOVRSt7Um8qb3rzs4WgrX/Hc06TZ0mhyKBxIFTbGc+3ry3OzM9VOhLMh0bZyGHK2Qy+IDKs2bj2ZRe6cIFVlyjWrlC3QcMeAQZsF56yG/hoyvmn7TVS2zR8nx6eTFfuWLuOiQtUT3MMvBYBjK8hvy2h3eoCWkauBb/lStyQl3BLM4yIPSdsx495D2cTibyswvxiwAY4RMspbOMlsVYd11jypT5eOXFowdf1thqeeJ1h5CDK1ZYMmTQkeBkM+Rjx7oR9H5kayzjEgmyZzhuxFjZ0BZIyi2EHK+MTURtScIfuu0ahPzNacxAQAc/L12w7A/B0VVlJLxuLneB7UKJz7G8l4hbEMVaDZmOkmVzP2R8QU2fD1ZToa5Ug8xIj5XFmAEblsoGQa5Zs7/aiAcxisGbycm+fF+tDlw7Mj+xUq/U9PJC3eFaEd4HQ1j9OTGRyNHIzDMZu9wreapG71K5VJx8FdealSvLQITVjppwpk2DlWKnaHxjHilH1qbxL0i+1VttfEa3LYE4mLYKKKkt8ndHd4pNiy5wSl26Cl9yeLdFisNOkreTyVjdD2K1J+zrfWHPHBWmqtCqV+rC+rBYmNKgZdUhtcb71Ar+q/vvq/v2UNdNho90GLscGsHfacP/tf1L2sYMWuTaVZDDEhPxWoYwHK9ccTzQADKs6QFJhvtocl+LeIM+4/m9xVszd+KbzCCHIhlBtnk4c9Rqhhw+frtJ08PYhUvuG7TTpfSKp0VCJgNkIECGwk+OJkcTIapznludTpTVKdc58t55fE5c/3Q6f2vLGapj/MHHXaONFQSBGgk4kJc2GHgc7rLs1M6zfO/7Qnv8Sk2M8vk+IK0HUh80GV1I3bezmEGNukJKu9EdroWMssRO9HnQTZD8e9dJdqWtlucfbmR52BXkmQAp+uYvbdej2H0/WNy4O7QxQrYBwfIOcRZj/AXUzojN sidebar_class_name: "get api-method" info_path: gen/api/agent-management-api custom_edit_url: null @@ -138,54 +138,6 @@ Retrieve statistics about the job queue. Count of jobs by status. - - - - - - -
- - - - operation_counts - - object - - -
-
- - - Count of jobs by operation type. - -
diff --git a/docs/docs/gen/api/list-jobs.api.mdx b/docs/docs/gen/api/list-jobs.api.mdx index 10b4088d..0943ec3c 100644 --- a/docs/docs/gen/api/list-jobs.api.mdx +++ b/docs/docs/gen/api/list-jobs.api.mdx @@ -5,7 +5,7 @@ description: "Retrieve jobs, optionally filtered by status." sidebar_label: "List jobs" hide_title: true hide_table_of_contents: true -api: eJztWN9v20YM/lcOfNoA2XW6dhgM9MHbmi1FuwZtgj0UgUFJlH2OdKfcUWk8Q//7wDtb/pk4LdqhA/pkS+LxPvIjeTwuICefOV2ztgaG8I7YabolNbOpT5QN77Es56rQJZOjXKVz5Rm58X1IgHHiYfgBXtl0/AYNTqgiw+PR+dl4ZtOxrcmhqPBwlYCnrHGa5zD8sIBfCR25UcNTWT+z6dAR5nDVXiVQo8OKmJwPogYrgiHEXSEBLUhvGnJzSMDRTaMd5TAssPSUwF3PYq17mc1pQqZHd+ywF3Eu4BZLnSOLOltppqrmeWIN2eKFb9JKM1Ouamcz8l6bicpsVZckLwvUpXxDxxrLsTw2jqBNwGdTqlC087yOQJ02E0iATFOJdZ1qSGCtHBLo1EMCcQMI1m9tcdUmOyydBi4CR5t0tEnnq1JXmr+CqyptXgwOGq0N04QcJFBpoysxfCC4C2xKhuHJYM+KN3gncso0VUpO2SIaxFY54saZvrr0pAaqsE4Zq4JJW0baovD0TVm5b+Rfe8b5a10Hm2qcaBOyow8S9Y58bY2ngOvpYCA/28pGqtSeV8r6IYAMk2ERxboudRb0PZl5kV/s47fpjDKOYViTYx13Y8tYjsV8f9DYbRgXIr1LW4WcTSVleErLYiEA6Q4lxGH47GmbwO4O6BwKZ7vv74Gp80NZVlhXIcMQmkbne2Avjb5pSOmcDOtCR8SCcWbTfuA4lpUDircV/dY4R4aX2barJXOEksfH1YigtkaxrsgzVnVY31XKQ17Ycf+UVCeucmQUR2Oe61iszzd8xq6hNoRWiM/dgBJV8VvQo3SxrngBFjln3XGjXoqYqsh7nJBoicUsqJhazzFhj2kZTcS/K3nFU+RVLaZ8y9tNLTmbj5GPa32NnlVcsOPzrYR72Ofn5HoY4K0WRYdJHqfOYp7JLqusPMzFsei+LxClpiDjgXKwArTi7yHKHmSibds2gWDgWGB8mks2zsuw+L/yyv2W5s1eMm1ZKnFQakMHStFOtk6dNba0E51hqTzdNGQyWhY9VeqCsnlWkqJbMhysfGQp6yLxeACfvX+rfvl5cLKO3lXxCbtG0m+Xp8CRPBUxJTLqh64pSRRm18Z+LCmfyJNndOF1VwqSZT4ncjI7TfmPn5XZ1ilvG5ctU3tCRorYMrXXtizLyHGtfzYVmp60jZiWpDY+HnDQpxSynBh16aWQLU/VtKR+iJ0QPCwHmvS8vwfBd8uSAFufX2vPr2zqN7+2CTw7dLCfmdCHKOldyLNaN8Bf8JB/pAtGBx0Z/BJos1kmJ2G+dbrDaWyPY/MWrxDrthSWDn3E5l2N6EjA1Da8BnFw27wh2doQf7TuOiSKbSLx0vY9pqfpjJQFW5s8Hww2eQ0hskfqyT6plwYbnlqn/6Fc9dTo/Exd01x1/el3Yv8PxP60T+ypdanOczKqp86Mb4pCZzochOQq7X24735n99tn9/mhWhyPALloSUPzhS9a38n8SmSGvoGnNochTMJQoEYZLcGTmU1BZk/uNs6TNgZR74W1SMzmOKoDOmWuYTkFkOc0CEGy/HO6uni++vsCBIE2hY3NXYQZ2571XEzOAEhAgESLT/qDfhg01NZzhSGUlsMNaR9C9O26abGOxU8f10W7mO74SV2iNuEy5UrRGv0VRnEyrJPmTh4XixQ9XbqybeV1nLLIVC7XXtqi9ZzlXpT3jKvugXNN881h3y2WjQiFwc/j9/y84dKDkFYztc9E9MhJ0IMQuonXGsOVPDgtICS4pS0nzMkFjuKqUZZRvblqr4KJli59/nh5Ibe17VTYCf2gfXVzMvMN3YtFlLiw12TaFlbQWZ6hvWrb9l9R0dQD +api: eJztWF+P2zYM/yqCnlbASXNdOwwB+pCtve2Kbju0d9hDdwhom050Z0s+iUovCwzsQ+wT7pMMlGLn/+VatEMH9CmxTVE/8kdSFBcyR5dZVZMyWg7lGySrcIbi2qQuESa8h7Kci0KVhBZzkc6FIyDv+jKRBBMnh+/kK5OOfwENE6xQ03h0fja+NunY1GiBVTh5lUiHmbeK5nL4biF/QLBoR56mvP7apEOLkMur5iqRNViokNC6IKqhQjmUcVeZSMVIbz3auUykxVuvLOZyWEDpMJF3PQO16mUmxwnqHt6RhV7EuZAzKFUOxOpMpQirmuaJ0WiK586nlSLCXNTWZOic0hORmaoukV8WoEr+BpYUlGN+9BZlk0iXTbEC1k7zOgK1Sk9kIlH7iq3rVMtErpTLRHbqZSLjBjJYv7HFVZNssXQauAgcrdPRJJ2vSlUp+gyuqpR+PthrtNKEE7QykZXSqmLDB4y7AF+SHJ4Mdqz4Be5YTmhfpWiFKaJBZIRF8lb3xaVDMRCFsUIbEUzaMNIUhcMvyspdI3/dMc7dqDrYVMNE6ZAdfclRb9HVRjsMuJ4MBvyzqWwkSuWoVdYPAaQJNbEo1HWpsqDv8bVj+cUufpNeY0YxDGu0pOJuZAjKMZvv9hq7CeOCpbdpq4CyKacMTXFZLBgg3gGHuBw+fcL+DJE6zozX5PbBgjxXseScbwLcRLTj5h9ZI2OBstzKC/FNl37JWmonq9xOlsmdbGf3o754gVbNOP2tqcQNzgXHnhO5t62tTAn/r8E58c9ff3OshhgTXNBc/w/NcLd9C9YCR+v2+wMEqXxffSmMrYDkUHqv8h2aLrW69ShUjppUoSJXjPjapH3ZsbFP8ZZzvbWoqfXnlpbMInAFO66GBZXRglSFjqCqw/rujNjnha3Am6LoxEUOBP2DMUPWYxOSKmTmdiqxqvgt6BGqWMVDgIXWGnvcqJcsJip0DibIWmIoBRVT4yiWqmNaRhP2bysvaArUhirmG972NVerfAx0XOtrcCTigi2fb5Sa+31+jrYHAV67KDqMK1hqDeQZ79LWoyP5eyC6DwUipzkQ7CmELaCWv/sou5eJpmmaRAYDxwzjw1yy1imExf+VVw5bmvudZNqwlOOgVBr3lKKtbJ1ao01pJiqDUji89agzXJZ7UaoCs3lWosAZagpWPrCUdZF4PIDP3v4mvv9ucLKK3rb4hF0j6bPl+XckT1lMsMzGeQDZjTbvS8wn/OQIbHi952iwoTPOH31UZhsrnPE2W6b2BDUXsWVqr2xZlpHjWn/2Fegeny+QlijWPu5x0IcUshwJVOm4kC37ibTEfoidEDzERzl3+y+C4JtlSZAbn18rR69M6ta/Nol8uq+lOdOhAxPctaEjsWr9P2F780AXjPY6Mvgl0GayjE/CfKOvkafxYhDb1nh5WjXkcunQB2ze1YiOBEiNpxWIvdvmHnlrjfTe2JuQKMZH4rnhfUg31xnJCzY2eTYYrPMaQmSH1JNdUi81eJoaq/7EXPTE6PwstE5dZ/6V2P8Dsd/uEntqbKryHLXoiTPtfFGoTIWDEG2lnAs3/a/sfvnsPttXi+MR0N5nPvEV8yuZn4nM0DfQ1ORyKCdhHFIDD9Xk42uTSp662VmcpK2N4N4ya5GY9UFcB3RKVMvl/IOf0yAkk+Wf0/bi+er3C8kIlC5MbO4izNj2rCaCfAbIRDKQaPFJf9API5baOKoghNJyrMPtQ4i+bTctVrH44YPKaBfhHT2uS1DhUu5tyVqjv8IQkseU3Nzx42KRgsNLWzYNv47zJZ5H5spxW7SaMB1EeWBQdwDODc7Xx5wzKD0LhZHXw/f8uLHavZDaaeJHInrgDOxeCN2sb4Xhih+sYhAc3NyWI+RoA0dx1SjLsF5ftVPBWEuXPj+9vODb2mYqbIV+0N7enPR8TfdiESUuzA3qppEtdOJn2Vw1TfMvwukvHg== sidebar_class_name: "get api-method" info_path: gen/api/agent-management-api custom_edit_url: null @@ -145,6 +145,55 @@ Retrieve jobs, optionally filtered by status. schema={{"type":"integer","description":"Total number of jobs matching the filter.","example":42}} > + +
+ + + + status_counts + + object + + +
+
+ + + Count of all jobs by status (submitted, processing, completed, failed, partial_failure). Derived from key names during the listing pass — no extra reads. + + + +
+ + +
+
diff --git a/docs/docs/sidebar/architecture/job-architecture.md b/docs/docs/sidebar/architecture/job-architecture.md index 056a8ebe..d5dba328 100644 --- a/docs/docs/sidebar/architecture/job-architecture.md +++ b/docs/docs/sidebar/architecture/job-architecture.md @@ -346,17 +346,12 @@ GET /api/v1/jobs/{job-id} { "total_jobs": 42, "status_counts": { - "unprocessed": 5, + "submitted": 5, "processing": 2, "completed": 30, "failed": 5 }, - "operation_counts": { - "node.hostname.get": 15, - "node.status.get": 19, - "network.dns.get": 8, - "network.ping.do": 23 - } + "dlq_count": 0 } ``` diff --git a/docs/docs/sidebar/usage/cli/client/job/list.md b/docs/docs/sidebar/usage/cli/client/job/list.md index f72ddf24..cab2d6af 100644 --- a/docs/docs/sidebar/usage/cli/client/job/list.md +++ b/docs/docs/sidebar/usage/cli/client/job/list.md @@ -11,7 +11,7 @@ $ osapi client job list Jobs: - JOB ID STATUS CREATED TARGET OPERATION AGENTS + JOB ID STATUS CREATED TARGET OPERATION 550e8400-e29b-41d4-a716-446655440000 completed 2026-02-16 13:21 server1 node.hostname.get 661f9511-f30c-41d4-a716-557766551111 completed 2026-02-16 13:43 server1 network.dns.get 772a0622-a41d-52e5-b827-668877662222 failed 2026-02-16 15:48 server1 node.status.get @@ -28,7 +28,7 @@ $ osapi client job list --status completed --limit 3 Jobs: - JOB ID STATUS CREATED TARGET OPERATION AGENTS + JOB ID STATUS CREATED TARGET OPERATION 550e8400-e29b-41d4-a716-446655440000 completed 2026-02-16 13:21 server1 node.hostname.get 661f9511-f30c-41d4-a716-557766551111 completed 2026-02-16 13:43 server1 network.dns.get 772a0622-a41d-52e5-b827-668877662222 completed 2026-02-16 15:48 server1 node.status.get @@ -39,5 +39,5 @@ $ osapi client job list --status completed --limit 3 | Flag | Description | Default | | ---------- | ----------------------------------------------- | ------- | | `--status` | Filter by status (submitted, processing, etc.) | | -| `--limit` | Limit number of jobs displayed (0 for no limit) | 10 | +| `--limit` | Maximum number of jobs per page (1-100) | 10 | | `--offset` | Skip the first N jobs (for pagination) | 0 | diff --git a/docs/docs/sidebar/usage/cli/client/job/status.md b/docs/docs/sidebar/usage/cli/client/job/status.md index 13e1a136..ad186ae9 100644 --- a/docs/docs/sidebar/usage/cli/client/job/status.md +++ b/docs/docs/sidebar/usage/cli/client/job/status.md @@ -5,14 +5,13 @@ Display the job queue status with live updates using a BubbleTea TUI: ```bash $ osapi client job status --poll-interval-seconds 5 - Queue Status: - - STATUS COUNT - Total 42 - Unprocessed 5 - Processing 2 - Completed 30 - Failed 5 + Jobs Queue Status: + Total Jobs: 42 + Unprocessed: 5 + Processing: 2 + Completed: 30 + Failed: 5 + Dead Letter Queue: 3 ``` ## Flags @@ -22,4 +21,4 @@ $ osapi client job status --poll-interval-seconds 5 | `--poll-interval-seconds` | Interval between polling operations | 30 | The status view auto-refreshes at the configured interval, showing job counts by -status and operation type. +status. diff --git a/go.mod b/go.mod index 9232de7c..73ebf8ab 100644 --- a/go.mod +++ b/go.mod @@ -18,7 +18,7 @@ require ( github.com/oapi-codegen/runtime v1.2.0 github.com/osapi-io/nats-client v0.0.0-20260306210421-d68b2a0f287b github.com/osapi-io/nats-server v0.0.0-20260216201410-1f33dfc63848 - github.com/osapi-io/osapi-sdk v0.0.0-20260307073158-439e543a3013 + github.com/osapi-io/osapi-sdk v0.0.0-20260307192743-857786ce1c9e github.com/prometheus-community/pro-bing v0.8.0 github.com/prometheus/client_golang v1.23.2 github.com/samber/slog-echo v1.21.0 diff --git a/go.sum b/go.sum index 4298d6a0..afd2e7ca 100644 --- a/go.sum +++ b/go.sum @@ -757,6 +757,8 @@ github.com/osapi-io/nats-server v0.0.0-20260216201410-1f33dfc63848 h1:ELW1sTVBn5 github.com/osapi-io/nats-server v0.0.0-20260216201410-1f33dfc63848/go.mod h1:4rzeY9jiJF/+Ej4WNwqK5HQ2sflZrEs60GxQpg3Iya8= github.com/osapi-io/osapi-sdk v0.0.0-20260307073158-439e543a3013 h1:kcP1brAYrbrETk+8jgJKyGE8NI0zIvSg3hT5Y1oviT4= github.com/osapi-io/osapi-sdk v0.0.0-20260307073158-439e543a3013/go.mod h1:i9g4jaIL6NVo9MRpz33lAEnY4L7u6aO97/5hN4W3hGE= +github.com/osapi-io/osapi-sdk v0.0.0-20260307192743-857786ce1c9e h1:sCg9f0Undm5zUdZ+oKESdplMhRvAlqmnMqKlyOoInX0= +github.com/osapi-io/osapi-sdk v0.0.0-20260307192743-857786ce1c9e/go.mod h1:i9g4jaIL6NVo9MRpz33lAEnY4L7u6aO97/5hN4W3hGE= github.com/otiai10/copy v1.2.0/go.mod h1:rrF5dJ5F0t/EWSYODDu4j9/vEeYHMkc8jt0zJChqQWw= github.com/otiai10/copy v1.14.0 h1:dCI/t1iTdYGtkvCuBG2BgR6KZa83PTclw4U5n2wAllU= github.com/otiai10/copy v1.14.0/go.mod h1:ECfuL02W+/FkTWZWgQqXPWZgW9oeKCSQ5qVfSc4qc4w= diff --git a/internal/api/gen/api.yaml b/internal/api/gen/api.yaml index 94e578f6..c8185209 100644 --- a/internal/api/gen/api.yaml +++ b/internal/api/gen/api.yaml @@ -2538,6 +2538,20 @@ components: type: integer description: Total number of jobs matching the filter. example: 42 + status_counts: + type: object + additionalProperties: + type: integer + description: > + Count of all jobs by status (submitted, processing, completed, + failed, partial_failure). Derived from key names during the listing + pass — no extra reads. + example: + submitted: 5 + processing: 2 + completed: 30 + failed: 3 + partial_failure: 2 items: type: array items: @@ -2631,11 +2645,6 @@ components: additionalProperties: type: integer description: Count of jobs by status. - operation_counts: - type: object - additionalProperties: - type: integer - description: Count of jobs by operation type. dlq_count: type: integer description: Number of jobs in the dead letter queue. diff --git a/internal/api/job/gen/api.yaml b/internal/api/job/gen/api.yaml index 3803cbf7..d4e6ab10 100644 --- a/internal/api/job/gen/api.yaml +++ b/internal/api/job/gen/api.yaml @@ -431,6 +431,20 @@ components: type: integer description: Total number of jobs matching the filter. example: 42 + status_counts: + type: object + additionalProperties: + type: integer + description: > + Count of all jobs by status (submitted, processing, completed, + failed, partial_failure). Derived from key names during the + listing pass — no extra reads. + example: + submitted: 5 + processing: 2 + completed: 30 + failed: 3 + partial_failure: 2 items: type: array items: @@ -526,11 +540,6 @@ components: additionalProperties: type: integer description: Count of jobs by status. - operation_counts: - type: object - additionalProperties: - type: integer - description: Count of jobs by operation type. dlq_count: type: integer description: Number of jobs in the dead letter queue. diff --git a/internal/api/job/gen/job.gen.go b/internal/api/job/gen/job.gen.go index d760eb69..4c89eb37 100644 --- a/internal/api/job/gen/job.gen.go +++ b/internal/api/job/gen/job.gen.go @@ -121,6 +121,9 @@ type JobDetailResponse struct { type ListJobsResponse struct { Items *[]JobDetailResponse `json:"items,omitempty"` + // StatusCounts Count of all jobs by status (submitted, processing, completed, failed, partial_failure). Derived from key names during the listing pass — no extra reads. + StatusCounts *map[string]int `json:"status_counts,omitempty"` + // TotalItems Total number of jobs matching the filter. TotalItems *int `json:"total_items,omitempty"` } @@ -130,9 +133,6 @@ type QueueStatsResponse struct { // DlqCount Number of jobs in the dead letter queue. DlqCount *int `json:"dlq_count,omitempty"` - // OperationCounts Count of jobs by operation type. - OperationCounts *map[string]int `json:"operation_counts,omitempty"` - // StatusCounts Count of jobs by status. StatusCounts *map[string]int `json:"status_counts,omitempty"` diff --git a/internal/api/job/job_list.go b/internal/api/job/job_list.go index dadc8359..fca48399 100644 --- a/internal/api/job/job_list.go +++ b/internal/api/job/job_list.go @@ -99,8 +99,10 @@ func (j *Job) GetJob( } totalItems := result.TotalCount + statusCounts := result.StatusCounts return gen.GetJob200JSONResponse{ - TotalItems: &totalItems, - Items: &items, + TotalItems: &totalItems, + StatusCounts: &statusCounts, + Items: &items, }, nil } diff --git a/internal/api/job/job_status.go b/internal/api/job/job_status.go index 6f40a333..bff4f7bd 100644 --- a/internal/api/job/job_status.go +++ b/internal/api/job/job_status.go @@ -31,7 +31,7 @@ func (j *Job) GetJobStatus( ctx context.Context, _ gen.GetJobStatusRequestObject, ) (gen.GetJobStatusResponseObject, error) { - stats, err := j.JobClient.GetQueueStats(ctx) + stats, err := j.JobClient.GetQueueSummary(ctx) if err != nil { errMsg := err.Error() return gen.GetJobStatus500JSONResponse{ @@ -40,9 +40,8 @@ func (j *Job) GetJobStatus( } return gen.GetJobStatus200JSONResponse{ - TotalJobs: &stats.TotalJobs, - StatusCounts: &stats.StatusCounts, - OperationCounts: &stats.OperationCounts, - DlqCount: &stats.DLQCount, + TotalJobs: &stats.TotalJobs, + StatusCounts: &stats.StatusCounts, + DlqCount: &stats.DLQCount, }, nil } diff --git a/internal/api/job/job_status_public_test.go b/internal/api/job/job_status_public_test.go index 4f98ab26..6af77f6a 100644 --- a/internal/api/job/job_status_public_test.go +++ b/internal/api/job/job_status_public_test.go @@ -81,9 +81,6 @@ func (s *JobStatusPublicTestSuite) TestGetJobStatus() { "completed": 30, "failed": 5, }, - OperationCounts: map[string]int{ - "node.hostname.get": 15, - }, DLQCount: 2, }, validateFunc: func(resp gen.GetJobStatusResponseObject) { @@ -106,7 +103,7 @@ func (s *JobStatusPublicTestSuite) TestGetJobStatus() { for _, tt := range tests { s.Run(tt.name, func() { s.mockJobClient.EXPECT(). - GetQueueStats(gomock.Any()). + GetQueueSummary(gomock.Any()). Return(tt.mockStats, tt.mockError) resp, err := s.handler.GetJobStatus(s.ctx, gen.GetJobStatusRequestObject{}) @@ -128,16 +125,13 @@ func (s *JobStatusPublicTestSuite) TestGetJobStatusHTTP() { setupJobMock: func() *jobmocks.MockJobClient { mock := jobmocks.NewMockJobClient(s.mockCtrl) mock.EXPECT(). - GetQueueStats(gomock.Any()). + GetQueueSummary(gomock.Any()). Return(&jobtypes.QueueStats{ TotalJobs: 42, StatusCounts: map[string]int{ "completed": 30, "failed": 5, }, - OperationCounts: map[string]int{ - "node.hostname.get": 15, - }, DLQCount: 2, }, nil) return mock @@ -150,7 +144,7 @@ func (s *JobStatusPublicTestSuite) TestGetJobStatusHTTP() { setupJobMock: func() *jobmocks.MockJobClient { mock := jobmocks.NewMockJobClient(s.mockCtrl) mock.EXPECT(). - GetQueueStats(gomock.Any()). + GetQueueSummary(gomock.Any()). Return(nil, assert.AnError) return mock }, @@ -238,16 +232,13 @@ func (s *JobStatusPublicTestSuite) TestGetJobStatusRBACHTTP() { setupJobMock: func() *jobmocks.MockJobClient { mock := jobmocks.NewMockJobClient(s.mockCtrl) mock.EXPECT(). - GetQueueStats(gomock.Any()). + GetQueueSummary(gomock.Any()). Return(&jobtypes.QueueStats{ TotalJobs: 42, StatusCounts: map[string]int{ "completed": 30, "failed": 5, }, - OperationCounts: map[string]int{ - "node.hostname.get": 15, - }, DLQCount: 2, }, nil) return mock diff --git a/internal/job/client/jobs.go b/internal/job/client/jobs.go index 11202401..c6b8d26c 100644 --- a/internal/job/client/jobs.go +++ b/internal/job/client/jobs.go @@ -150,97 +150,8 @@ func (c *Client) CreateJob( }, nil } -// GetQueueStats returns statistics about the job queue. -func (c *Client) GetQueueStats( - ctx context.Context, -) (*job.QueueStats, error) { - c.logger.Debug("kv.keys", - slog.String("operation", "get_queue_stats"), - ) - // Get all job keys from KV store - keys, err := c.kv.Keys(ctx) - if err != nil { - if errors.Is(err, jetstream.ErrNoKeysFound) { - return &job.QueueStats{ - TotalJobs: 0, - StatusCounts: map[string]int{ - "submitted": 0, - "processing": 0, - "completed": 0, - "failed": 0, - }, - OperationCounts: map[string]int{}, - DLQCount: 0, - }, nil - } - return nil, fmt.Errorf("error fetching jobs: %w", err) - } - - statusCounts := map[string]int{ - "submitted": 0, - "processing": 0, - "completed": 0, - "failed": 0, - "partial_failure": 0, - } - - operationCounts := map[string]int{} - jobCount := 0 - - // Process immutable jobs and compute status from events - for _, key := range keys { - // Only process "jobs." prefixed keys - if !strings.HasPrefix(key, "jobs.") { - continue - } - - entry, err := c.kv.Get(ctx, key) - if err != nil { - continue - } - - var jobData map[string]interface{} - if err := json.Unmarshal(entry.Value(), &jobData); err != nil { - continue - } - - jobID := strings.TrimPrefix(key, "jobs.") - jobCount++ - - // Compute status from events (reuse already-fetched keys) - computedStatus := c.computeStatusFromEvents(ctx, keys, jobID) - statusCounts[computedStatus.Status]++ - - // Track operation type - if jobOperationData, ok := jobData["operation"].(map[string]interface{}); ok { - if operationType, ok := jobOperationData["type"].(string); ok { - operationCounts[operationType]++ - } - } - } - - // Get DLQ count - dlqCount := 0 - dlqName := c.streamName + "-DLQ" - dlqInfo, err := c.natsClient.GetStreamInfo(ctx, dlqName) - if err != nil { - // DLQ might not exist, which is fine - c.logger.Debug("failed to get DLQ info", slog.String("error", err.Error())) - } else { - dlqCount = int(dlqInfo.State.Msgs) - } - - return &job.QueueStats{ - TotalJobs: jobCount, - StatusCounts: statusCounts, - OperationCounts: operationCounts, - DLQCount: dlqCount, - }, nil -} - -// GetQueueSummary returns lightweight job queue statistics derived from KV key -// names only — no entry reads. This is much faster than GetQueueStats for large -// queues and suitable for health/status dashboards. +// GetQueueSummary returns job queue statistics derived from KV key names +// only — no entry reads. func (c *Client) GetQueueSummary( ctx context.Context, ) (*job.QueueStats, error) { @@ -386,6 +297,23 @@ func (c *Client) ListJobs( // Pass 1: Light — key names only, no kv.Get() orderedJobIDs, jobStatuses := computeStatusFromKeyNames(allKeys) + // Compute status counts from key-name-derived statuses (free — no extra reads) + statusCounts := map[string]int{ + "submitted": 0, + "processing": 0, + "completed": 0, + "failed": 0, + "partial_failure": 0, + } + for _, info := range jobStatuses { + statusCounts[info.Status]++ + } + // Jobs with no status events are "submitted" but not in jobStatuses + submittedOnly := len(orderedJobIDs) - len(jobStatuses) + if submittedOnly > 0 { + statusCounts["submitted"] += submittedOnly + } + // Filter + count (fast, no reads) var matchingIDs []string for _, id := range orderedJobIDs { @@ -405,8 +333,9 @@ func (c *Client) ListJobs( // Apply offset if offset >= len(matchingIDs) { return &ListJobsResult{ - Jobs: []*job.QueuedJob{}, - TotalCount: totalCount, + Jobs: []*job.QueuedJob{}, + TotalCount: totalCount, + StatusCounts: statusCounts, }, nil } matchingIDs = matchingIDs[offset:] @@ -442,8 +371,9 @@ func (c *Client) ListJobs( } return &ListJobsResult{ - Jobs: jobs, - TotalCount: totalCount, + Jobs: jobs, + TotalCount: totalCount, + StatusCounts: statusCounts, }, nil } diff --git a/internal/job/client/jobs_public_test.go b/internal/job/client/jobs_public_test.go index 89be0c14..25426416 100644 --- a/internal/job/client/jobs_public_test.go +++ b/internal/job/client/jobs_public_test.go @@ -800,225 +800,6 @@ func (s *JobsPublicTestSuite) TestGetJobStatus() { } } -func (s *JobsPublicTestSuite) TestGetQueueStats() { - tests := []struct { - name string - expectedErr string - setupMocks func() - expectedJobs int - expectedDLQ int - }{ - { - name: "no keys found", - setupMocks: func() { - s.mockKV.EXPECT().Keys(gomock.Any()).Return(nil, jetstream.ErrNoKeysFound) - }, - expectedJobs: 0, - expectedDLQ: 0, - }, - { - name: "keys error", - expectedErr: "error fetching jobs", - setupMocks: func() { - s.mockKV.EXPECT().Keys(gomock.Any()).Return(nil, errors.New("connection failed")) - }, - }, - { - name: "get job error skipped", - setupMocks: func() { - s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil) - s.mockKV.EXPECT(). - Get(gomock.Any(), "jobs.job-1"). - Return(nil, errors.New("kv error")) - s.mockNATSClient.EXPECT(). - GetStreamInfo(gomock.Any(), "JOBS-DLQ"). - Return(nil, errors.New("no stream")) - }, - expectedJobs: 0, - }, - { - name: "invalid job JSON skipped", - setupMocks: func() { - s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil) - - mockEntry := jobmocks.NewMockKeyValueEntry(s.mockCtrl) - mockEntry.EXPECT().Value().Return([]byte(`not json`)) - s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry, nil) - - s.mockNATSClient.EXPECT(). - GetStreamInfo(gomock.Any(), "JOBS-DLQ"). - Return(nil, errors.New("no stream")) - }, - expectedJobs: 0, - }, - { - name: "with DLQ info", - setupMocks: func() { - s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil) - - mockEntry := jobmocks.NewMockKeyValueEntry(s.mockCtrl) - mockEntry.EXPECT().Value().Return([]byte( - `{"id":"job-1","operation":{"type":"node.hostname.get"}}`, - )) - s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry, nil) - - s.mockNATSClient.EXPECT(). - GetStreamInfo(gomock.Any(), "JOBS-DLQ"). - Return(&jetstream.StreamInfo{State: jetstream.StreamState{Msgs: 5}}, nil) - }, - expectedJobs: 1, - expectedDLQ: 5, - }, - { - name: "operation without type field", - setupMocks: func() { - s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil) - - mockEntry := jobmocks.NewMockKeyValueEntry(s.mockCtrl) - mockEntry.EXPECT().Value().Return([]byte( - `{"id":"job-1","operation":{"data":"some value"}}`, - )) - s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry, nil) - - s.mockNATSClient.EXPECT(). - GetStreamInfo(gomock.Any(), "JOBS-DLQ"). - Return(nil, errors.New("no stream")) - }, - expectedJobs: 1, - }, - { - name: "operation as non-map value", - setupMocks: func() { - s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{"jobs.job-1"}, nil) - - mockEntry := jobmocks.NewMockKeyValueEntry(s.mockCtrl) - mockEntry.EXPECT().Value().Return([]byte( - `{"id":"job-1","operation":"string-value"}`, - )) - s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry, nil) - - s.mockNATSClient.EXPECT(). - GetStreamInfo(gomock.Any(), "JOBS-DLQ"). - Return(nil, errors.New("no stream")) - }, - expectedJobs: 1, - }, - { - name: "non-jobs prefix keys skipped", - setupMocks: func() { - s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{ - "status.job-1.submitted._api.100", - "responses.job-1.agent1.200", - }, nil) - - s.mockNATSClient.EXPECT(). - GetStreamInfo(gomock.Any(), "JOBS-DLQ"). - Return(nil, errors.New("no stream")) - }, - expectedJobs: 0, - }, - { - name: "with jobs and DLQ error", - setupMocks: func() { - keys := []string{"jobs.job-1", "jobs.job-2"} - s.mockKV.EXPECT().Keys(gomock.Any()).Return(keys, nil) - - mockEntry1 := jobmocks.NewMockKeyValueEntry(s.mockCtrl) - mockEntry1.EXPECT(). - Value(). - Return([]byte(`{"id":"job-1","operation":{"type":"node.hostname.get"}}`)) - s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-1").Return(mockEntry1, nil) - - mockEntry2 := jobmocks.NewMockKeyValueEntry(s.mockCtrl) - mockEntry2.EXPECT(). - Value(). - Return([]byte(`{"id":"job-2","operation":{"type":"node.status.get"}}`)) - s.mockKV.EXPECT().Get(gomock.Any(), "jobs.job-2").Return(mockEntry2, nil) - - s.mockNATSClient.EXPECT(). - GetStreamInfo(gomock.Any(), "JOBS-DLQ"). - Return(nil, errors.New("stream not found")) - }, - expectedJobs: 2, - expectedDLQ: 0, - }, - } - - for _, tt := range tests { - s.Run(tt.name, func() { - tt.setupMocks() - - stats, err := s.jobsClient.GetQueueStats(s.ctx) - - if tt.expectedErr != "" { - s.Error(err) - s.Contains(err.Error(), tt.expectedErr) - } else { - s.NoError(err) - s.NotNil(stats) - s.Equal(tt.expectedJobs, stats.TotalJobs) - if tt.expectedDLQ > 0 { - s.Equal(tt.expectedDLQ, stats.DLQCount) - } - } - }) - } -} - -func (s *JobsPublicTestSuite) TestGetQueueStatsDLQNameDerivedFromStreamName() { - tests := []struct { - name string - streamName string - expectedDLQ string - setupMocks func(dlqName string) - expectedMsgs int - }{ - { - name: "when stream name is JOBS", - streamName: "JOBS", - expectedDLQ: "JOBS-DLQ", - setupMocks: func(dlqName string) { - s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{}, nil) - s.mockNATSClient.EXPECT(). - GetStreamInfo(gomock.Any(), dlqName). - Return(&jetstream.StreamInfo{State: jetstream.StreamState{Msgs: 5}}, nil) - }, - expectedMsgs: 5, - }, - { - name: "when stream name is namespaced", - streamName: "osapi-JOBS", - expectedDLQ: "osapi-JOBS-DLQ", - setupMocks: func(dlqName string) { - s.mockKV.EXPECT().Keys(gomock.Any()).Return([]string{}, nil) - s.mockNATSClient.EXPECT(). - GetStreamInfo(gomock.Any(), dlqName). - Return(&jetstream.StreamInfo{State: jetstream.StreamState{Msgs: 3}}, nil) - }, - expectedMsgs: 3, - }, - } - - for _, tt := range tests { - s.Run(tt.name, func() { - tt.setupMocks(tt.expectedDLQ) - - opts := &client.Options{ - Timeout: 30 * time.Second, - KVBucket: s.mockKV, - StreamName: tt.streamName, - } - c, err := client.New(slog.Default(), s.mockNATSClient, opts) - s.Require().NoError(err) - - stats, err := c.GetQueueStats(s.ctx) - s.NoError(err) - s.NotNil(stats) - s.Equal(tt.expectedMsgs, stats.DLQCount) - }) - } -} - func (s *JobsPublicTestSuite) TestListJobs() { tests := []struct { name string diff --git a/internal/job/client/types.go b/internal/job/client/types.go index ec4c38c1..ef854803 100644 --- a/internal/job/client/types.go +++ b/internal/job/client/types.go @@ -51,9 +51,6 @@ type JobClient interface { operationData map[string]interface{}, targetHostname string, ) (*CreateJobResult, error) - GetQueueStats( - ctx context.Context, - ) (*job.QueueStats, error) GetQueueSummary( ctx context.Context, ) (*job.QueueStats, error) @@ -336,8 +333,9 @@ type CreateJobResult struct { // ListJobsResult represents the result of listing jobs with pagination. type ListJobsResult struct { - Jobs []*job.QueuedJob - TotalCount int + Jobs []*job.QueuedJob + TotalCount int + StatusCounts map[string]int } // computedJobStatus represents the computed status from events diff --git a/internal/job/mocks/job_client.gen.go b/internal/job/mocks/job_client.gen.go index ea8ad1d2..9e021854 100644 --- a/internal/job/mocks/job_client.gen.go +++ b/internal/job/mocks/job_client.gen.go @@ -190,21 +190,6 @@ func (mr *MockJobClientMockRecorder) GetJobStatus(arg0, arg1 interface{}) *gomoc return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetJobStatus", reflect.TypeOf((*MockJobClient)(nil).GetJobStatus), arg0, arg1) } -// GetQueueStats mocks base method. -func (m *MockJobClient) GetQueueStats(arg0 context.Context) (*job.QueueStats, error) { - m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "GetQueueStats", arg0) - ret0, _ := ret[0].(*job.QueueStats) - ret1, _ := ret[1].(error) - return ret0, ret1 -} - -// GetQueueStats indicates an expected call of GetQueueStats. -func (mr *MockJobClientMockRecorder) GetQueueStats(arg0 interface{}) *gomock.Call { - mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetQueueStats", reflect.TypeOf((*MockJobClient)(nil).GetQueueStats), arg0) -} - // GetQueueSummary mocks base method. func (m *MockJobClient) GetQueueSummary(arg0 context.Context) (*job.QueueStats, error) { m.ctrl.T.Helper() diff --git a/internal/job/types.go b/internal/job/types.go index 196bd095..a85edaee 100644 --- a/internal/job/types.go +++ b/internal/job/types.go @@ -191,10 +191,9 @@ type TimelineEvent struct { // QueueStats represents statistics about the job queue. type QueueStats struct { - TotalJobs int `json:"total_jobs"` - StatusCounts map[string]int `json:"status_counts"` - OperationCounts map[string]int `json:"operation_counts"` - DLQCount int `json:"dlq_count"` + TotalJobs int `json:"total_jobs"` + StatusCounts map[string]int `json:"status_counts"` + DLQCount int `json:"dlq_count"` } // Operation data structures for specific operations From bbbc3a3f13a0b7ed06dcbbd42c19dd912c8e730d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=D7=A0=CF=85=CE=B1=CE=B7=20=D7=A0=CF=85=CE=B1=CE=B7=D1=95?= =?UTF-8?q?=CF=83=CE=B7?= Date: Sat, 7 Mar 2026 11:39:03 -0800 Subject: [PATCH 3/3] docs(job): document two-pass listing and kv.Keys() limits MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add architecture documentation for the two-pass job listing approach, pagination limits, and the known scalability constraint around kv.Keys() returning all keys. Fix stale GetQueueStats references. 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- .../sidebar/architecture/job-architecture.md | 59 ++++++++++++++++++- .../docs/sidebar/usage/cli/client/job/list.md | 10 ++-- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/docs/docs/sidebar/architecture/job-architecture.md b/docs/docs/sidebar/architecture/job-architecture.md index d5dba328..a338404d 100644 --- a/docs/docs/sidebar/architecture/job-architecture.md +++ b/docs/docs/sidebar/architecture/job-architecture.md @@ -37,7 +37,7 @@ into NATS JetStream: - **Agents** — Processes jobs, updates status, stores results All three use the **Job Client Layer** (`internal/job/client/`), which provides -type-safe business logic operations (`CreateJob`, `GetQueueStats`, +type-safe business logic operations (`CreateJob`, `GetQueueSummary`, `GetJobStatus`, `ListJobs`) on top of NATS JetStream. **NATS JetStream** provides three storage backends: @@ -658,7 +658,7 @@ The `internal/job/` package contains shared domain types and two subpackages: | `client.go` | Publish-and-wait/collect with KV + stream | | `query.go` | Query operations (system status, hostname, etc.) | | `modify.go` | Modify operations (DNS updates) | -| `jobs.go` | CreateJob, RetryJob, GetJobStatus, GetQueueStats | +| `jobs.go` | CreateJob, RetryJob, GetJobStatus, ListJobs | | `agent.go` | WriteStatusEvent, WriteJobResponse | | `types.go` | Client-specific types and interfaces | @@ -765,11 +765,64 @@ osapi agent start ## Performance Optimizations +### Two-Pass Job Listing + +Job listing uses a two-pass approach to avoid reading every job payload: + +**Pass 1 — Key-name scan (fast):** Calls `kv.Keys()` once to get all key names +in the bucket. Job status is derived purely from key name patterns +(`status.{id}.{event}.{hostname}.{ts}`) without any `kv.Get()` calls. This +produces ordered job IDs, per-job status, and aggregate status counts — all from +string parsing in memory. + +**Pass 2 — Page fetch (bounded):** Fetches full job details (`kv.Get()`) only +for the paginated page. With `limit=10`, this is ~10 reads regardless of total +queue size. + +``` +Pass 1: kv.Keys() → 1 call → parse key names → status for all jobs +Pass 2: kv.Get() → N calls → full details for page only (N = limit) +``` + +Queue statistics (`GetQueueSummary`) and the `ListJobs` status counts both use +Pass 1 only — no `kv.Get()` calls at all. + +### Pagination Limits + +The API enforces a maximum page size of 100 (`MaxPageSize`). Requests with +`limit=0` or `limit > 100` return 400. The default page size is 10. + +### Known Scalability Constraint: `kv.Keys()` + +The two-pass approach relies on `kv.Keys()`, which returns **all key names** in +the bucket as a string slice. NATS JetStream does not support paginated key +listing (`kv.Keys(prefix, limit, offset)`) — it is all or nothing. + +This is acceptable today because: + +- Key names are short strings (~80 bytes each) +- The KV bucket has a 1-hour TTL, naturally bounding the key count +- Even 100K keys as strings is only a few MB of memory + +However, at very large scale (millions of keys or longer/no TTL), `kv.Keys()` +would become a bottleneck in both memory and latency. If this becomes a problem, +potential approaches include: + +1. **Separate status index** — a dedicated KV key (e.g., `index.status.failed`) + maintaining a list of job IDs, updated on status transitions +2. **External index** — move listing/filtering to a database (SQLite, Postgres) + while keeping NATS for job dispatch and processing +3. **NATS KV watch** — use `kv.Watch()` to maintain an in-memory index + incrementally rather than scanning all keys on each request + +For now, the 1-hour TTL keeps the bucket bounded and `kv.Keys()` fast. + +### Other Optimizations + 1. **Batch Operations**: Agents can fetch multiple jobs per poll 2. **Connection Pooling**: Reuse NATS connections 3. **KV Caching**: Local caching of frequently accessed jobs 4. **Stream Filtering**: Agents only receive relevant job types -5. **Efficient Filtering**: Status-based key prefixes enable fast queries ## Error Handling diff --git a/docs/docs/sidebar/usage/cli/client/job/list.md b/docs/docs/sidebar/usage/cli/client/job/list.md index cab2d6af..a9173e4f 100644 --- a/docs/docs/sidebar/usage/cli/client/job/list.md +++ b/docs/docs/sidebar/usage/cli/client/job/list.md @@ -36,8 +36,8 @@ $ osapi client job list --status completed --limit 3 ## Flags -| Flag | Description | Default | -| ---------- | ----------------------------------------------- | ------- | -| `--status` | Filter by status (submitted, processing, etc.) | | -| `--limit` | Maximum number of jobs per page (1-100) | 10 | -| `--offset` | Skip the first N jobs (for pagination) | 0 | +| Flag | Description | Default | +| ---------- | ---------------------------------------------- | ------- | +| `--status` | Filter by status (submitted, processing, etc.) | | +| `--limit` | Maximum number of jobs per page (1-100) | 10 | +| `--offset` | Skip the first N jobs (for pagination) | 0 |