diff --git a/internal/api/client.go b/internal/api/client.go index 08e83b3..07f25f8 100644 --- a/internal/api/client.go +++ b/internal/api/client.go @@ -22,7 +22,8 @@ const ( ) type options struct { - httpClient apiclient.HttpRequestDoer + httpClient apiclient.HttpRequestDoer + streamHTTPClient apiclient.HttpRequestDoer } // Option configures a cloud API client. @@ -32,12 +33,16 @@ type Option func(*options) func WithHTTPClient(httpClient apiclient.HttpRequestDoer) Option { return func(opts *options) { opts.httpClient = httpClient + opts.streamHTTPClient = httpClient } } // Client is a thin wrapper around the generated OpenAPI client. type Client struct { - client *apiclient.ClientWithResponses + client *apiclient.ClientWithResponses + baseURL string + token string + streamHTTPClient apiclient.HttpRequestDoer } // NewClient constructs a generated client with auth and error-normalization helpers. @@ -54,22 +59,32 @@ func NewClient(apiURL, token string, opts ...Option) (*Client, error) { return nil, errors.New("api url must use http:// or https:// scheme") } - cfg := options{ - httpClient: &http.Client{Timeout: defaultTimeout}, - } + cfg := options{} for _, opt := range opts { opt(&cfg) } + if cfg.httpClient == nil { + cfg.httpClient = &http.Client{Timeout: defaultTimeout} + } + if cfg.streamHTTPClient == nil { + cfg.streamHTTPClient = &http.Client{} + } + baseURL := generatedClientBaseURL(parsed) clientOpts := []apiclient.ClientOption{ apiclient.WithHTTPClient(cfg.httpClient), apiclient.WithRequestEditorFn(authorizationEditor(token)), } - generated, err := apiclient.NewClientWithResponses(generatedClientBaseURL(parsed), clientOpts...) + generated, err := apiclient.NewClientWithResponses(baseURL, clientOpts...) if err != nil { return nil, fmt.Errorf("failed to create api client: %w", err) } - return &Client{client: generated}, nil + return &Client{ + client: generated, + baseURL: baseURL, + token: token, + streamHTTPClient: cfg.streamHTTPClient, + }, nil } func generatedClientBaseURL(parsed *url.URL) string { diff --git a/internal/api/frontends.go b/internal/api/frontends.go index a14802f..66e8bef 100644 --- a/internal/api/frontends.go +++ b/internal/api/frontends.go @@ -131,6 +131,17 @@ func (c *Client) GetFrontendLogs(ctx context.Context, projectID, frontendID uuid return c.searchProjectLogs(ctx, projectID, body) } +// StreamFrontendLogs opens a runtime log stream for a frontend. +func (c *Client) StreamFrontendLogs(ctx context.Context, projectID, frontendID uuid.UUID, limit int, lastEventID string) (*ProjectLogStream, error) { + body := logSearchRequest{ + Resource: logResource(logResourceTypeFrontend, frontendID), + } + if limit > 0 { + body.Limit = &limit + } + return c.streamProjectLogs(ctx, projectID, body, lastEventID) +} + // GetFrontendDeploymentLogs returns one build log search page for a frontend deployment. func (c *Client) GetFrontendDeploymentLogs(ctx context.Context, projectID, frontendID, deploymentID uuid.UUID, limit int, cursor string) (*apiclient.LogSearchResponse, error) { body := logSearchRequest{ @@ -145,6 +156,17 @@ func (c *Client) GetFrontendDeploymentLogs(ctx context.Context, projectID, front return c.searchProjectLogs(ctx, projectID, body) } +// StreamFrontendDeploymentLogs opens a build log stream for a frontend deployment. +func (c *Client) StreamFrontendDeploymentLogs(ctx context.Context, projectID, frontendID, deploymentID uuid.UUID, limit int, lastEventID string) (*ProjectLogStream, error) { + body := logSearchRequest{ + Resource: logDeploymentResource(logResourceTypeFrontend, frontendID, deploymentID), + } + if limit > 0 { + body.Limit = &limit + } + return c.streamProjectLogs(ctx, projectID, body, lastEventID) +} + // CreateFrontendCustomDomain attaches a BYOC custom domain to a frontend. func (c *Client) CreateFrontendCustomDomain(ctx context.Context, projectID, frontendID uuid.UUID, input FrontendCustomDomainInput) (*apiclient.FrontendCustomDomainResponse, error) { body := apiclient.CreateFrontendCustomDomainJSONRequestBody{ diff --git a/internal/api/functions.go b/internal/api/functions.go index 20846cd..ca4075e 100644 --- a/internal/api/functions.go +++ b/internal/api/functions.go @@ -168,6 +168,17 @@ func (c *Client) GetFunctionLogs(ctx context.Context, projectID, functionID uuid return c.searchProjectLogs(ctx, projectID, body) } +// StreamFunctionLogs opens a runtime log stream for a function. +func (c *Client) StreamFunctionLogs(ctx context.Context, projectID, functionID uuid.UUID, limit int, lastEventID string) (*ProjectLogStream, error) { + body := logSearchRequest{ + Resource: logResource(logResourceTypeFunction, functionID), + } + if limit > 0 { + body.Limit = &limit + } + return c.streamProjectLogs(ctx, projectID, body, lastEventID) +} + func buildFunctionDeployMultipart(fn FunctionDeployInput) (*bytes.Buffer, string, error) { body := &bytes.Buffer{} writer := multipart.NewWriter(body) @@ -309,3 +320,14 @@ func (c *Client) GetFunctionDeploymentLogs(ctx context.Context, projectID, funct } return c.searchProjectLogs(ctx, projectID, body) } + +// StreamFunctionDeploymentLogs opens a build log stream for a function deployment. +func (c *Client) StreamFunctionDeploymentLogs(ctx context.Context, projectID, functionID, deploymentID uuid.UUID, limit int, lastEventID string) (*ProjectLogStream, error) { + body := logSearchRequest{ + Resource: logDeploymentResource(logResourceTypeFunction, functionID, deploymentID), + } + if limit > 0 { + body.Limit = &limit + } + return c.streamProjectLogs(ctx, projectID, body, lastEventID) +} diff --git a/internal/api/log_stream.go b/internal/api/log_stream.go new file mode 100644 index 0000000..ba85d89 --- /dev/null +++ b/internal/api/log_stream.go @@ -0,0 +1,180 @@ +package api + +import ( + "bufio" + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "net/url" + "strings" + + "github.com/google/uuid" + + "github.com/Kong/volcano-cli/internal/apiclient" +) + +const logStreamScannerMaxTokenSize = 1024 * 1024 + +// ProjectLogStream reads project log Server-Sent Events. +type ProjectLogStream struct { + body io.ReadCloser + scanner *bufio.Scanner +} + +// ProjectLogStreamEvent is one parsed project log stream event. +type ProjectLogStreamEvent struct { + ID string + Type string + Data string + Log *apiclient.LogSearchEvent + Warning string +} + +// Close closes the underlying response body. +func (s *ProjectLogStream) Close() error { + if s == nil || s.body == nil { + return nil + } + return s.body.Close() +} + +// Next returns the next non-comment SSE event. +func (s *ProjectLogStream) Next() (*ProjectLogStreamEvent, error) { + if s == nil || s.scanner == nil { + return nil, io.EOF + } + for { + event, err := s.nextRaw() + if err != nil { + return nil, err + } + if event == nil { + continue + } + return parseProjectLogStreamEvent(event) + } +} + +func (s *ProjectLogStream) nextRaw() (*ProjectLogStreamEvent, error) { + var event ProjectLogStreamEvent + var data []string + hasField := false + for s.scanner.Scan() { + line := strings.TrimSuffix(s.scanner.Text(), "\r") + if line == "" { + if !hasField { + continue + } + event.Data = strings.Join(data, "\n") + if event.Type == "" { + event.Type = "message" + } + return &event, nil + } + if strings.HasPrefix(line, ":") { + continue + } + field, value, ok := strings.Cut(line, ":") + if !ok { + continue + } + hasField = true + value = strings.TrimPrefix(value, " ") + switch field { + case "id": + event.ID = value + case "event": + event.Type = value + case "data": + data = append(data, value) + } + } + if err := s.scanner.Err(); err != nil { + return nil, err + } + if hasField { + event.Data = strings.Join(data, "\n") + if event.Type == "" { + event.Type = "message" + } + return &event, nil + } + return nil, io.EOF +} + +func parseProjectLogStreamEvent(event *ProjectLogStreamEvent) (*ProjectLogStreamEvent, error) { + switch event.Type { + case "log": + var logEvent apiclient.LogSearchEvent + if err := json.Unmarshal([]byte(event.Data), &logEvent); err != nil { + return nil, fmt.Errorf("failed to decode log stream event: %w", err) + } + event.Log = &logEvent + case "warning": + var warning struct { + Error string `json:"error"` + } + if err := json.Unmarshal([]byte(event.Data), &warning); err != nil { + return nil, fmt.Errorf("failed to decode log stream warning: %w", err) + } + event.Warning = strings.TrimSpace(warning.Error) + } + return event, nil +} + +func (c *Client) streamProjectLogs(ctx context.Context, projectID uuid.UUID, body logSearchRequest, lastEventID string) (*ProjectLogStream, error) { + normalizeLogSearchRequest(&body) + body.Cursor = nil + payload, err := json.Marshal(body) + if err != nil { + return nil, fmt.Errorf("failed to marshal log stream request: %w", err) + } + + streamURL, err := c.projectLogsStreamURL(projectID) + if err != nil { + return nil, err + } + req, err := http.NewRequestWithContext(ctx, http.MethodPost, streamURL, bytes.NewReader(payload)) + if err != nil { + return nil, err + } + req.Header.Set("Accept", "text/event-stream") + req.Header.Set("Content-Type", "application/json") + if c.token != "" { + req.Header.Set("Authorization", "Bearer "+c.token) + } + if lastEventID = strings.TrimSpace(lastEventID); lastEventID != "" { + req.Header.Set("Last-Event-ID", lastEventID) + } + + resp, err := c.streamHTTPClient.Do(req) + if err != nil { + return nil, err + } + if resp.StatusCode < 200 || resp.StatusCode >= 300 { + defer func() { + _ = resp.Body.Close() + }() + respBody, _ := io.ReadAll(resp.Body) + return nil, apiError(resp.StatusCode, respBody) + } + + scanner := bufio.NewScanner(resp.Body) + scanner.Buffer(make([]byte, 0, 64*1024), logStreamScannerMaxTokenSize) + return &ProjectLogStream{body: resp.Body, scanner: scanner}, nil +} + +func (c *Client) projectLogsStreamURL(projectID uuid.UUID) (string, error) { + base, err := url.Parse(c.baseURL) + if err != nil { + return "", fmt.Errorf("failed to parse api base url: %w", err) + } + base.Path = strings.TrimRight(base.Path, "/") + "/projects/" + projectID.String() + "/logs/stream" + base.RawPath = "" + base.RawQuery = "" + base.Fragment = "" + return base.String(), nil +} diff --git a/internal/api/log_stream_test.go b/internal/api/log_stream_test.go new file mode 100644 index 0000000..ca957e3 --- /dev/null +++ b/internal/api/log_stream_test.go @@ -0,0 +1,87 @@ +package api + +import ( + "context" + "encoding/json" + "net/http" + "net/http/httptest" + "testing" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestStreamProjectLogsRequestAndEvents(t *testing.T) { + projectID := uuid.MustParse("11111111-1111-4111-8111-111111111111") + functionID := uuid.MustParse("22222222-2222-4222-8222-222222222222") + deploymentID := uuid.MustParse("33333333-3333-4333-8333-333333333333") + var body map[string]any + var lastEventID string + + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + assert.Equal(t, http.MethodPost, r.Method) + assert.Equal(t, "/volcano-api/projects/"+projectID.String()+"/logs/stream", r.URL.Path) + assert.Equal(t, "Bearer token", r.Header.Get("Authorization")) + assert.Equal(t, "text/event-stream", r.Header.Get("Accept")) + lastEventID = r.Header.Get("Last-Event-ID") + require.NoError(t, json.NewDecoder(r.Body).Decode(&body)) + + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(": connected\n\n")) + _, _ = w.Write([]byte("id: next-id\n")) + _, _ = w.Write([]byte("event: log\n")) + _, _ = w.Write([]byte(`data: {"id":"log-1","message":"build finished","timestamp":1760000000000,"resource":{"type":"function","id":"` + functionID.String() + `"},"deployment":{"id":"` + deploymentID.String() + `"}}` + "\n\n")) + _, _ = w.Write([]byte("event: warning\n")) + _, _ = w.Write([]byte(`data: {"error":"temporary read failure"}` + "\n\n")) + })) + defer server.Close() + + client, err := NewClient(server.URL+"/volcano-api", "token", WithHTTPClient(server.Client())) + require.NoError(t, err) + + stream, err := client.StreamFunctionDeploymentLogs(context.Background(), projectID, functionID, deploymentID, 50, "previous-id") + require.NoError(t, err) + defer stream.Close() + + assert.Equal(t, "previous-id", lastEventID) + resource, ok := body["resource"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "function", resource["type"]) + assert.Equal(t, []any{functionID.String()}, resource["ids"]) + deployments, ok := resource["deployments"].(map[string]any) + require.True(t, ok) + assert.Equal(t, []any{deploymentID.String()}, deployments["ids"]) + assert.InEpsilon(t, 50, body["limit"], 0) + assert.NotContains(t, body, "cursor") + + event, err := stream.Next() + require.NoError(t, err) + require.NotNil(t, event.Log) + assert.Equal(t, "next-id", event.ID) + assert.Equal(t, "log-1", event.Log.Id) + assert.Equal(t, "build finished", event.Log.Message) + + event, err = stream.Next() + require.NoError(t, err) + assert.Equal(t, "temporary read failure", event.Warning) +} + +func TestStreamProjectLogsNon2xxReturnsAPIError(t *testing.T) { + projectID := uuid.MustParse("11111111-1111-4111-8111-111111111111") + frontendID := uuid.MustParse("44444444-4444-4444-8444-444444444444") + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + writeAPIJSON(t, w, http.StatusServiceUnavailable, map[string]string{"error": "logs unavailable"}) + })) + defer server.Close() + + client, err := NewClient(server.URL, "", WithHTTPClient(server.Client())) + require.NoError(t, err) + + stream, err := client.StreamFrontendLogs(context.Background(), projectID, frontendID, 100, "") + require.Nil(t, stream) + var apiErr *Error + require.ErrorAs(t, err, &apiErr) + assert.Equal(t, http.StatusServiceUnavailable, apiErr.StatusCode) + assert.Contains(t, apiErr.Message, "logs unavailable") +} diff --git a/internal/api/logs.go b/internal/api/logs.go index 00ce077..c01e631 100644 --- a/internal/api/logs.go +++ b/internal/api/logs.go @@ -34,17 +34,7 @@ type logDeploymentRequestSelector struct { } func (c *Client) searchProjectLogs(ctx context.Context, projectID uuid.UUID, body logSearchRequest) (*apiclient.LogSearchResponse, error) { - if body.Limit != nil && *body.Limit <= 0 { - body.Limit = nil - } - if body.Cursor != nil { - cursor := strings.TrimSpace(*body.Cursor) - if cursor == "" { - body.Cursor = nil - } else { - body.Cursor = &cursor - } - } + normalizeLogSearchRequest(&body) payload, err := json.Marshal(body) if err != nil { @@ -58,6 +48,23 @@ func (c *Client) searchProjectLogs(ctx context.Context, projectID uuid.UUID, bod return apiResult(resp.StatusCode(), resp.Body, resp.JSON200, resp.JSON400, resp.JSON401, resp.JSON403, resp.JSON404, resp.JSON503) } +func normalizeLogSearchRequest(body *logSearchRequest) { + if body == nil { + return + } + if body.Limit != nil && *body.Limit <= 0 { + body.Limit = nil + } + if body.Cursor != nil { + cursor := strings.TrimSpace(*body.Cursor) + if cursor == "" { + body.Cursor = nil + } else { + body.Cursor = &cursor + } + } +} + func logResource(resourceType string, resourceID uuid.UUID) logRequestResource { return logRequestResource{ Type: resourceType, diff --git a/internal/cmd/frontends/helpers_test.go b/internal/cmd/frontends/helpers_test.go index f79317f..3f4a1b1 100644 --- a/internal/cmd/frontends/helpers_test.go +++ b/internal/cmd/frontends/helpers_test.go @@ -2,8 +2,10 @@ package frontends import ( "bytes" + "context" "encoding/json" "net/http" + "sync" "testing" "github.com/spf13/cobra" @@ -28,6 +30,41 @@ func executeFrontendsCommand(t *testing.T, cmd *cobra.Command, args ...string) ( return out.String(), err } +// syncBuffer is an io.Writer safe for concurrent writes and reads, used to +// capture command output while a follow command streams on another goroutine. +type syncBuffer struct { + mu sync.Mutex + buf bytes.Buffer +} + +func (b *syncBuffer) Write(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.Write(p) +} + +func (b *syncBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.String() +} + +// streamFrontendsCommand runs cmd with a cancelable context on its own +// goroutine, returning the captured output and a channel that receives the +// command's error when it exits. Use it for --follow commands, which run until +// the context is canceled. +func streamFrontendsCommand(ctx context.Context, cmd *cobra.Command, args ...string) (*syncBuffer, <-chan error) { + out := &syncBuffer{} + cmd.SetOut(out) + cmd.SetErr(out) + cmd.SetArgs(args) + errCh := make(chan error, 1) + go func() { + errCh <- cmd.ExecuteContext(ctx) + }() + return out, errCh +} + func setFrontendCommandTestHome(t *testing.T) { t.Helper() t.Setenv("HOME", t.TempDir()) diff --git a/internal/cmd/frontends/logs.go b/internal/cmd/frontends/logs.go index dc7b2ac..33fb284 100644 --- a/internal/cmd/frontends/logs.go +++ b/internal/cmd/frontends/logs.go @@ -7,11 +7,14 @@ import ( "io" "strings" + "github.com/google/uuid" "github.com/spf13/cobra" "github.com/Kong/volcano-cli/internal/api" "github.com/Kong/volcano-cli/internal/apiclient" + apicommon "github.com/Kong/volcano-cli/internal/apiclient/common" clifrontend "github.com/Kong/volcano-cli/internal/frontend" + "github.com/Kong/volcano-cli/internal/logfollow" "github.com/Kong/volcano-cli/internal/output" cliruntime "github.com/Kong/volcano-cli/internal/runtime" ) @@ -28,12 +31,14 @@ type frontendLogsOptions struct { deploymentID string logsType string limit int + follow bool out io.Writer } func newLogs(deps cliruntime.Deps) *cobra.Command { var limit int var logsType string + var follow bool cmd := &cobra.Command{ Use: "logs [deployment-id]", Short: "Show frontend build or runtime logs", @@ -49,11 +54,13 @@ func newLogs(deps cliruntime.Deps) *cobra.Command { deploymentID: deploymentID, logsType: logsType, limit: limit, + follow: follow, out: cmd.OutOrStdout(), }) }, } cmd.Flags().IntVarP(&limit, "limit", "l", defaultFrontendLogLimit, "Maximum logs per API page") + cmd.Flags().BoolVarP(&follow, "follow", "f", false, "Stream logs as new events arrive") cmd.Flags().StringVar(&logsType, "type", "", "Log type to fetch: build or runtime") if err := cmd.MarkFlagRequired("type"); err != nil { panic(err) @@ -77,6 +84,12 @@ func runLogs(ctx context.Context, opts frontendLogsOptions) error { } if logsType == frontendLogsTypeRuntime { + if opts.follow { + fmt.Fprintf(opts.out, "Following runtime logs for frontend %s\n\n", frontend.Name) + return logfollow.Runtime(ctx, opts.deps, opts.out, func(ctx context.Context, lastEventID string) (*api.ProjectLogStream, error) { + return service.StreamRuntimeLogs(ctx, frontend.Id, opts.limit, lastEventID) + }) + } fmt.Fprintf(opts.out, "Fetching runtime logs for frontend %s\n\n", frontend.Name) return output.PrintSearchLogs(opts.out, func(cursor string) (*apiclient.LogSearchResponse, error) { return service.RuntimeLogs(ctx, frontend.Id, opts.limit, cursor) @@ -109,12 +122,51 @@ func runLogs(ctx context.Context, opts frontendLogsOptions) error { deploymentID = &deployment.Id } + if opts.follow { + fmt.Fprintf(opts.out, "Following build logs for frontend %s deployment %s\n\n", frontend.Name, deploymentID.String()) + return followDeploymentLogs(ctx, opts, service, frontend.Id, *deploymentID) + } fmt.Fprintf(opts.out, "Fetching build logs for frontend %s deployment %s\n\n", frontend.Name, deploymentID.String()) return output.PrintSearchLogs(opts.out, func(cursor string) (*apiclient.LogSearchResponse, error) { return service.DeploymentLogs(ctx, frontend.Id, *deploymentID, opts.limit, cursor) }) } +func followDeploymentLogs(ctx context.Context, opts frontendLogsOptions, service clifrontend.Service, frontendID, deploymentID uuid.UUID) error { + streamCtx, cancel := context.WithCancel(ctx) + stream, err := service.StreamDeploymentLogs(streamCtx, frontendID, deploymentID, opts.limit) + if err != nil { + cancel() + return err + } + return logfollow.Deployment(ctx, opts.deps, opts.out, stream, cancel, func(ctx context.Context) (bool, error) { + deployment, err := service.ResolveDeployment(ctx, frontendID, deploymentID.String()) + if err != nil { + return false, err + } + return frontendDeploymentTerminal(deployment), nil + }, func(ctx context.Context, printed map[string]struct{}) error { + return output.PrintSearchLogsSkipping(opts.out, func(cursor string) (*apiclient.LogSearchResponse, error) { + return service.DeploymentLogs(ctx, frontendID, deploymentID, opts.limit, cursor) + }, printed) + }) +} + +func frontendDeploymentTerminal(deployment *apiclient.FrontendDeployment) bool { + if deployment == nil { + return false + } + switch deployment.Status { + case apicommon.FrontendDeploymentStatusActive, + apicommon.FrontendDeploymentStatusDeleted, + apicommon.FrontendDeploymentStatusDeleting, + apicommon.FrontendDeploymentStatusFailed: + return true + default: + return false + } +} + func normalizeLogsType(value string) (string, error) { logsType := strings.ToLower(strings.TrimSpace(value)) switch logsType { diff --git a/internal/cmd/frontends/logs_test.go b/internal/cmd/frontends/logs_test.go index 605a236..b6d98c4 100644 --- a/internal/cmd/frontends/logs_test.go +++ b/internal/cmd/frontends/logs_test.go @@ -1,10 +1,13 @@ package frontends import ( + "context" "encoding/json" "net/http" "net/http/httptest" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -60,6 +63,54 @@ func TestFrontendsLogs(t *testing.T) { assert.Contains(t, out, "second runtime") }) + t.Run("runtime follow streams", func(t *testing.T) { + setFrontendCommandTestHome(t) + saveFrontendCommandTestConfig(t) + var streamBody map[string]any + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/projects/"+frontendProjectID+"/frontends": + writeFrontendCommandJSON(t, w, http.StatusOK, map[string]any{ + "data": []any{frontendCommandPayload(frontendID, "web")}, + "has_more": false, + "page": 1, + "limit": 100, + "total": 1, + }) + case r.Method == http.MethodPost && r.URL.Path == "/projects/"+frontendProjectID+"/logs/stream": + require.NoError(t, json.NewDecoder(r.Body).Decode(&streamBody)) + writeFrontendLogStream(t, w, "runtime follow") + // A healthy backend holds the connection open and tails new + // events, so keep it open until the client cancels. + <-r.Context().Done() + case r.Method == http.MethodPost && r.URL.Path == "/projects/"+frontendProjectID+"/logs/search": + t.Errorf("runtime --follow should use logs/stream") + http.NotFound(w, r) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + out, errCh := streamFrontendsCommand(ctx, New(cliruntime.Deps{HTTPClient: server.Client(), APIBaseURL: server.URL}), "logs", "web", "--type", "runtime", "--follow", "--limit", "2") + + require.Eventually(t, func() bool { + return strings.Contains(out.String(), "runtime follow") + }, 2*time.Second, 10*time.Millisecond) + cancel() + require.NoError(t, <-errCh) + + resource, ok := streamBody["resource"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "frontend", resource["type"]) + assert.Equal(t, []any{frontendID}, resource["ids"]) + assert.NotContains(t, resource, "deployments") + assert.InEpsilon(t, 2, streamBody["limit"], 0) + assert.Contains(t, out.String(), "Following runtime logs for frontend web") + }) + t.Run("build defaults current deployment", func(t *testing.T) { setFrontendCommandTestHome(t) saveFrontendCommandTestConfig(t) @@ -84,6 +135,61 @@ func TestFrontendsLogs(t *testing.T) { assert.Contains(t, out, "build log") }) + t.Run("build follow streams deployment", func(t *testing.T) { + setFrontendCommandTestHome(t) + saveFrontendCommandTestConfig(t) + var streamBody map[string]any + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/projects/"+frontendProjectID+"/frontends": + frontend := frontendCommandPayload(frontendID, "web") + delete(frontend, "current_deployment_id") + writeFrontendCommandJSON(t, w, http.StatusOK, map[string]any{ + "data": []any{frontend}, + "has_more": false, + "page": 1, + "limit": 100, + "total": 1, + }) + case r.Method == http.MethodGet && r.URL.Path == "/projects/"+frontendProjectID+"/frontends/"+frontendID+"/deployments": + writeFrontendCommandJSON(t, w, http.StatusOK, map[string]any{ + "data": []any{frontendDeploymentCommandPayload(otherFrontendDeploymentID)}, + "has_more": false, + "page": 1, + "limit": 100, + "total": 1, + }) + case r.Method == http.MethodPost && r.URL.Path == "/projects/"+frontendProjectID+"/logs/stream": + require.NoError(t, json.NewDecoder(r.Body).Decode(&streamBody)) + writeFrontendLogStream(t, w, "build follow") + case r.Method == http.MethodPost && r.URL.Path == "/projects/"+frontendProjectID+"/logs/search": + // After the stream ends and the deployment is terminal, the + // follow loop runs a catch-up search that must suppress logs + // already printed from the stream (id "stream-log"). + writeFrontendCommandJSON(t, w, http.StatusOK, catchUpLogResponse()) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + out, err := executeFrontendsCommand(t, New(cliruntime.Deps{HTTPClient: server.Client(), APIBaseURL: server.URL}), "logs", "web", otherFrontendDeploymentID, "--type", "build", "--follow") + require.NoError(t, err) + resource, ok := streamBody["resource"].(map[string]any) + require.True(t, ok) + deployments, ok := resource["deployments"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "frontend", resource["type"]) + assert.Equal(t, []any{frontendID}, resource["ids"]) + assert.Equal(t, []any{otherFrontendDeploymentID}, deployments["ids"]) + assert.Contains(t, out, "Following build logs for frontend web deployment "+otherFrontendDeploymentID) + assert.Contains(t, out, "build follow") + // The catch-up search backfills logs not seen on the stream... + assert.Contains(t, out, "catch up log") + // ...without reprinting the streamed log. + assert.Equal(t, 1, strings.Count(out, "build follow")) + }) + t.Run("validates type", func(t *testing.T) { _, err := executeFrontendsCommand(t, New(cliruntime.Deps{}), "logs", "web", "--type", "deploy") require.ErrorContains(t, err, "--type must be one of: build, runtime") @@ -170,3 +276,38 @@ func frontendLogCommandResponse(message string, hasMore bool, next string) map[s } return response } + +func catchUpLogResponse() map[string]any { + return map[string]any{ + "data": []any{ + map[string]any{ + "id": "stream-log", + "message": "build follow", + "region": "aws-us-east-1", + "timestamp": int64(1760000000000), + }, + map[string]any{ + "id": "catch-up-log", + "message": "catch up log", + "region": "aws-us-east-1", + "timestamp": int64(1760000000001), + }, + }, + "has_more": false, + "limit": 100, + "page": 1, + "total": 2, + } +} + +func writeFrontendLogStream(t *testing.T, w http.ResponseWriter, message string) { + t.Helper() + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(": connected\n\n")) + _, _ = w.Write([]byte("id: stream-cursor\n")) + _, _ = w.Write([]byte("event: log\n")) + _, _ = w.Write([]byte(`data: {"id":"stream-log","message":"` + message + `","timestamp":1760000000000,"resource":{"type":"frontend","id":"` + frontendID + `"}}` + "\n\n")) + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } +} diff --git a/internal/cmd/functions/helpers_test.go b/internal/cmd/functions/helpers_test.go index b7f0318..bfbf51b 100644 --- a/internal/cmd/functions/helpers_test.go +++ b/internal/cmd/functions/helpers_test.go @@ -2,8 +2,10 @@ package functions import ( "bytes" + "context" "encoding/json" "net/http" + "sync" "testing" "github.com/spf13/cobra" @@ -28,6 +30,41 @@ func executeFunctionsCommand(t *testing.T, cmd *cobra.Command, args ...string) ( return out.String(), err } +// syncBuffer is an io.Writer safe for concurrent writes and reads, used to +// capture command output while a follow command streams on another goroutine. +type syncBuffer struct { + mu sync.Mutex + buf bytes.Buffer +} + +func (b *syncBuffer) Write(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.Write(p) +} + +func (b *syncBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.String() +} + +// streamFunctionsCommand runs cmd with a cancelable context on its own +// goroutine, returning the captured output and a channel that receives the +// command's error when it exits. Use it for --follow commands, which run until +// the context is canceled. +func streamFunctionsCommand(ctx context.Context, cmd *cobra.Command, args ...string) (*syncBuffer, <-chan error) { + out := &syncBuffer{} + cmd.SetOut(out) + cmd.SetErr(out) + cmd.SetArgs(args) + errCh := make(chan error, 1) + go func() { + errCh <- cmd.ExecuteContext(ctx) + }() + return out, errCh +} + func setFunctionCommandTestHome(t *testing.T) { t.Helper() t.Setenv("HOME", t.TempDir()) diff --git a/internal/cmd/functions/logs.go b/internal/cmd/functions/logs.go index 0409877..2a94318 100644 --- a/internal/cmd/functions/logs.go +++ b/internal/cmd/functions/logs.go @@ -7,11 +7,14 @@ import ( "io" "strings" + "github.com/google/uuid" "github.com/spf13/cobra" "github.com/Kong/volcano-cli/internal/api" "github.com/Kong/volcano-cli/internal/apiclient" + apicommon "github.com/Kong/volcano-cli/internal/apiclient/common" clifunction "github.com/Kong/volcano-cli/internal/function" + "github.com/Kong/volcano-cli/internal/logfollow" "github.com/Kong/volcano-cli/internal/output" cliruntime "github.com/Kong/volcano-cli/internal/runtime" ) @@ -28,12 +31,14 @@ type logsOptions struct { deploymentID string logsType string limit int + follow bool out io.Writer } func newLogs(deps cliruntime.Deps) *cobra.Command { var limit int var logsType string + var follow bool cmd := &cobra.Command{ Use: "logs [deployment-id]", Short: "Show function build or runtime logs", @@ -49,11 +54,13 @@ func newLogs(deps cliruntime.Deps) *cobra.Command { deploymentID: deploymentID, logsType: logsType, limit: limit, + follow: follow, out: cmd.OutOrStdout(), }) }, } cmd.Flags().IntVarP(&limit, "limit", "l", defaultLogLimit, "Maximum logs per API page") + cmd.Flags().BoolVarP(&follow, "follow", "f", false, "Stream logs as new events arrive") cmd.Flags().StringVar(&logsType, "type", "", "Log type to fetch: build or runtime") if err := cmd.MarkFlagRequired("type"); err != nil { panic(err) @@ -77,6 +84,12 @@ func runLogs(ctx context.Context, opts logsOptions) error { } if logsType == logsTypeRuntime { + if opts.follow { + fmt.Fprintf(opts.out, "Following runtime logs for function %s\n\n", function.Name) + return logfollow.Runtime(ctx, opts.deps, opts.out, func(ctx context.Context, lastEventID string) (*api.ProjectLogStream, error) { + return service.StreamRuntimeLogs(ctx, function.Id, opts.limit, lastEventID) + }) + } fmt.Fprintf(opts.out, "Fetching runtime logs for function %s\n\n", function.Name) return output.PrintSearchLogs(opts.out, func(cursor string) (*apiclient.LogSearchResponse, error) { return service.RuntimeLogs(ctx, function.Id, opts.limit, cursor) @@ -100,12 +113,51 @@ func runLogs(ctx context.Context, opts logsOptions) error { deploymentID = &deployment.Id } + if opts.follow { + fmt.Fprintf(opts.out, "Following build logs for function %s deployment %s\n\n", function.Name, deploymentID.String()) + return followDeploymentLogs(ctx, opts, service, function.Id, *deploymentID) + } fmt.Fprintf(opts.out, "Fetching build logs for function %s deployment %s\n\n", function.Name, deploymentID.String()) return output.PrintSearchLogs(opts.out, func(cursor string) (*apiclient.LogSearchResponse, error) { return service.DeploymentLogs(ctx, function.Id, *deploymentID, opts.limit, cursor) }) } +func followDeploymentLogs(ctx context.Context, opts logsOptions, service clifunction.Service, functionID, deploymentID uuid.UUID) error { + streamCtx, cancel := context.WithCancel(ctx) + stream, err := service.StreamDeploymentLogs(streamCtx, functionID, deploymentID, opts.limit) + if err != nil { + cancel() + return err + } + return logfollow.Deployment(ctx, opts.deps, opts.out, stream, cancel, func(ctx context.Context) (bool, error) { + deployment, err := service.ResolveDeployment(ctx, functionID, deploymentID.String()) + if err != nil { + return false, err + } + return functionDeploymentTerminal(deployment), nil + }, func(ctx context.Context, printed map[string]struct{}) error { + return output.PrintSearchLogsSkipping(opts.out, func(cursor string) (*apiclient.LogSearchResponse, error) { + return service.DeploymentLogs(ctx, functionID, deploymentID, opts.limit, cursor) + }, printed) + }) +} + +func functionDeploymentTerminal(deployment *apiclient.FunctionDeployment) bool { + if deployment == nil { + return false + } + switch deployment.Status { + case apicommon.FunctionDeploymentStatusActive, + apicommon.FunctionDeploymentStatusDeleted, + apicommon.FunctionDeploymentStatusDeleting, + apicommon.FunctionDeploymentStatusFailed: + return true + default: + return false + } +} + func normalizeLogsType(value string) (string, error) { logsType := strings.ToLower(strings.TrimSpace(value)) switch logsType { diff --git a/internal/cmd/functions/logs_test.go b/internal/cmd/functions/logs_test.go index 1a2b356..ef1adcf 100644 --- a/internal/cmd/functions/logs_test.go +++ b/internal/cmd/functions/logs_test.go @@ -1,10 +1,13 @@ package functions import ( + "context" "encoding/json" "net/http" "net/http/httptest" + "strings" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -63,6 +66,54 @@ func TestFunctionsLogs(t *testing.T) { assert.Contains(t, out, "second runtime") }) + t.Run("runtime follow streams", func(t *testing.T) { + setFunctionCommandTestHome(t) + saveFunctionCommandTestConfig(t) + var streamBody map[string]any + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/projects/"+functionProjectID+"/functions": + writeFunctionCommandJSON(t, w, http.StatusOK, map[string]any{ + "data": []any{functionCommandPayload(functionID, "hello")}, + "has_more": false, + "page": 1, + "limit": 100, + "total": 1, + }) + case r.Method == http.MethodPost && r.URL.Path == "/projects/"+functionProjectID+"/logs/stream": + require.NoError(t, json.NewDecoder(r.Body).Decode(&streamBody)) + writeFunctionLogStream(t, w, "runtime follow") + // A healthy backend holds the connection open and tails new + // events, so keep it open until the client cancels. + <-r.Context().Done() + case r.Method == http.MethodPost && r.URL.Path == "/projects/"+functionProjectID+"/logs/search": + t.Errorf("runtime --follow should use logs/stream") + http.NotFound(w, r) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + out, errCh := streamFunctionsCommand(ctx, New(cliruntime.Deps{HTTPClient: server.Client(), APIBaseURL: server.URL}), "logs", "hello", "--type", "runtime", "--follow", "--limit", "2") + + require.Eventually(t, func() bool { + return strings.Contains(out.String(), "runtime follow") + }, 2*time.Second, 10*time.Millisecond) + cancel() + require.NoError(t, <-errCh) + + resource, ok := streamBody["resource"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "function", resource["type"]) + assert.Equal(t, []any{functionID}, resource["ids"]) + assert.NotContains(t, resource, "deployments") + assert.InEpsilon(t, 2, streamBody["limit"], 0) + assert.Contains(t, out.String(), "Following runtime logs for function hello") + }) + t.Run("build defaults latest deployment", func(t *testing.T) { setFunctionCommandTestHome(t) saveFunctionCommandTestConfig(t) @@ -87,6 +138,59 @@ func TestFunctionsLogs(t *testing.T) { assert.Contains(t, out, "build log") }) + t.Run("build follow streams deployment", func(t *testing.T) { + setFunctionCommandTestHome(t) + saveFunctionCommandTestConfig(t) + var streamBody map[string]any + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + switch { + case r.Method == http.MethodGet && r.URL.Path == "/projects/"+functionProjectID+"/functions": + writeFunctionCommandJSON(t, w, http.StatusOK, map[string]any{ + "data": []any{functionCommandPayload(functionID, "hello")}, + "has_more": false, + "page": 1, + "limit": 100, + "total": 1, + }) + case r.Method == http.MethodGet && r.URL.Path == "/projects/"+functionProjectID+"/functions/"+functionID+"/deployments": + writeFunctionCommandJSON(t, w, http.StatusOK, map[string]any{ + "data": []any{deploymentCommandPayload(otherDeploymentID)}, + "has_more": false, + "page": 1, + "limit": 100, + "total": 1, + }) + case r.Method == http.MethodPost && r.URL.Path == "/projects/"+functionProjectID+"/logs/stream": + require.NoError(t, json.NewDecoder(r.Body).Decode(&streamBody)) + writeFunctionLogStream(t, w, "build follow") + case r.Method == http.MethodPost && r.URL.Path == "/projects/"+functionProjectID+"/logs/search": + // After the stream ends and the deployment is terminal, the + // follow loop runs a catch-up search that must suppress logs + // already printed from the stream (id "stream-log"). + writeFunctionCommandJSON(t, w, http.StatusOK, catchUpLogResponse()) + default: + http.NotFound(w, r) + } + })) + defer server.Close() + + out, err := executeFunctionsCommand(t, New(cliruntime.Deps{HTTPClient: server.Client(), APIBaseURL: server.URL}), "logs", "hello", otherDeploymentID, "--type", "build", "--follow") + require.NoError(t, err) + resource, ok := streamBody["resource"].(map[string]any) + require.True(t, ok) + deployments, ok := resource["deployments"].(map[string]any) + require.True(t, ok) + assert.Equal(t, "function", resource["type"]) + assert.Equal(t, []any{functionID}, resource["ids"]) + assert.Equal(t, []any{otherDeploymentID}, deployments["ids"]) + assert.Contains(t, out, "Following build logs for function hello deployment "+otherDeploymentID) + assert.Contains(t, out, "build follow") + // The catch-up search backfills logs not seen on the stream... + assert.Contains(t, out, "catch up log") + // ...without reprinting the streamed log. + assert.Equal(t, 1, strings.Count(out, "build follow")) + }) + t.Run("validates type", func(t *testing.T) { _, err := executeFunctionsCommand(t, New(cliruntime.Deps{}), "logs", "hello", "--type", "deploy") require.ErrorContains(t, err, "--type must be one of: build, runtime") @@ -183,3 +287,38 @@ func logCommandResponse(message string, hasMore bool, next string) map[string]an } return response } + +func catchUpLogResponse() map[string]any { + return map[string]any{ + "data": []any{ + map[string]any{ + "id": "stream-log", + "message": "build follow", + "region": "aws-us-east-1", + "timestamp": int64(1760000000000), + }, + map[string]any{ + "id": "catch-up-log", + "message": "catch up log", + "region": "aws-us-east-1", + "timestamp": int64(1760000000001), + }, + }, + "has_more": false, + "limit": 100, + "page": 1, + "total": 2, + } +} + +func writeFunctionLogStream(t *testing.T, w http.ResponseWriter, message string) { + t.Helper() + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte(": connected\n\n")) + _, _ = w.Write([]byte("id: stream-cursor\n")) + _, _ = w.Write([]byte("event: log\n")) + _, _ = w.Write([]byte(`data: {"id":"stream-log","message":"` + message + `","timestamp":1760000000000,"resource":{"type":"function","id":"` + functionID + `"}}` + "\n\n")) + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } +} diff --git a/internal/frontend/frontend.go b/internal/frontend/frontend.go index 923275e..f47cab8 100644 --- a/internal/frontend/frontend.go +++ b/internal/frontend/frontend.go @@ -197,6 +197,21 @@ func (s Service) RuntimeLogs(ctx context.Context, frontendID uuid.UUID, limit in return logs, nil } +// StreamRuntimeLogs opens a runtime log stream for a frontend, resuming after +// lastEventID when it is set. +func (s Service) StreamRuntimeLogs(ctx context.Context, frontendID uuid.UUID, limit int, lastEventID string) (*api.ProjectLogStream, error) { + authenticated, err := s.sessions.CurrentProject() + if err != nil { + return nil, err + } + + stream, err := authenticated.API.StreamFrontendLogs(ctx, authenticated.ProjectID, frontendID, limit, lastEventID) + if err != nil { + return nil, fmt.Errorf("failed to stream runtime logs: %w", err) + } + return stream, nil +} + // DeploymentLogs returns one build log search page for a frontend deployment. func (s Service) DeploymentLogs(ctx context.Context, frontendID, deploymentID uuid.UUID, limit int, cursor string) (*apiclient.LogSearchResponse, error) { authenticated, err := s.sessions.CurrentProject() @@ -211,6 +226,20 @@ func (s Service) DeploymentLogs(ctx context.Context, frontendID, deploymentID uu return logs, nil } +// StreamDeploymentLogs opens a build log stream for a frontend deployment. +func (s Service) StreamDeploymentLogs(ctx context.Context, frontendID, deploymentID uuid.UUID, limit int) (*api.ProjectLogStream, error) { + authenticated, err := s.sessions.CurrentProject() + if err != nil { + return nil, err + } + + stream, err := authenticated.API.StreamFrontendDeploymentLogs(ctx, authenticated.ProjectID, frontendID, deploymentID, limit, "") + if err != nil { + return nil, fmt.Errorf("failed to stream deployment logs: %w", err) + } + return stream, nil +} + // CreateCustomDomain attaches a custom domain to a frontend. func (s Service) CreateCustomDomain(ctx context.Context, identifier string, input api.FrontendCustomDomainInput) (*apiclient.Frontend, *apiclient.FrontendCustomDomainResponse, error) { authenticated, err := s.sessions.CurrentProject() diff --git a/internal/function/function.go b/internal/function/function.go index b22d723..70e4ec5 100644 --- a/internal/function/function.go +++ b/internal/function/function.go @@ -444,6 +444,21 @@ func (s Service) RuntimeLogs(ctx context.Context, functionID uuid.UUID, limit in return logs, nil } +// StreamRuntimeLogs opens a runtime log stream for a function, resuming after +// lastEventID when it is set. +func (s Service) StreamRuntimeLogs(ctx context.Context, functionID uuid.UUID, limit int, lastEventID string) (*api.ProjectLogStream, error) { + authenticated, err := s.sessions.CurrentProject() + if err != nil { + return nil, err + } + + stream, err := authenticated.API.StreamFunctionLogs(ctx, authenticated.ProjectID, functionID, limit, lastEventID) + if err != nil { + return nil, fmt.Errorf("failed to stream runtime logs: %w", err) + } + return stream, nil +} + // DeploymentLogs returns one build log search page for a function deployment. func (s Service) DeploymentLogs(ctx context.Context, functionID, deploymentID uuid.UUID, limit int, cursor string) (*apiclient.LogSearchResponse, error) { authenticated, err := s.sessions.CurrentProject() @@ -458,6 +473,20 @@ func (s Service) DeploymentLogs(ctx context.Context, functionID, deploymentID uu return logs, nil } +// StreamDeploymentLogs opens a build log stream for a function deployment. +func (s Service) StreamDeploymentLogs(ctx context.Context, functionID, deploymentID uuid.UUID, limit int) (*api.ProjectLogStream, error) { + authenticated, err := s.sessions.CurrentProject() + if err != nil { + return nil, err + } + + stream, err := authenticated.API.StreamFunctionDeploymentLogs(ctx, authenticated.ProjectID, functionID, deploymentID, limit, "") + if err != nil { + return nil, fmt.Errorf("failed to stream deployment logs: %w", err) + } + return stream, nil +} + func resolveInvokeFunctionID(ctx context.Context, authenticated *clisession.ProjectSession, identifier string) (uuid.UUID, error) { target := normalizeTargetFunction(identifier) if target == "" { diff --git a/internal/logfollow/follow.go b/internal/logfollow/follow.go new file mode 100644 index 0000000..bf758c8 --- /dev/null +++ b/internal/logfollow/follow.go @@ -0,0 +1,256 @@ +// Package logfollow contains shared CLI log stream follow loops. +package logfollow + +import ( + "context" + "errors" + "io" + "sync" + "time" + + "github.com/Kong/volcano-cli/internal/api" + "github.com/Kong/volcano-cli/internal/output" + cliruntime "github.com/Kong/volcano-cli/internal/runtime" +) + +const ( + deploymentPollInterval = 2 * time.Second + reconnectBackoff = 1 * time.Second +) + +// TerminalCheck reports whether a followed deployment has reached a terminal status. +type TerminalCheck func(context.Context) (bool, error) + +// CatchUp runs after a followed deployment reaches a terminal status. +type CatchUp func(context.Context, map[string]struct{}) error + +// StreamOpener opens a project log stream, resuming after lastEventID when it is +// set. logfollow re-invokes it to reconnect a dropped stream from its last +// cursor. +type StreamOpener func(ctx context.Context, lastEventID string) (*api.ProjectLogStream, error) + +// Runtime follows a runtime log stream until the context is canceled. The +// backend holds a healthy connection open and tails new events, so a closed +// stream is treated as a transient disconnect: Runtime reconnects from the last +// stream cursor — resuming without replaying recent events — until the context +// is canceled. A failure to open the stream is surfaced to the caller. +func Runtime(ctx context.Context, deps cliruntime.Deps, w io.Writer, open StreamOpener) error { + stream := newReconnectingStream(ctx, deps, open) + defer func() { + _ = stream.Close() + }() + for { + event, err := stream.Next() + if err != nil { + if cleanStreamShutdown(ctx, err) { + return nil + } + return err + } + output.PrintLogStreamEvent(w, event) + } +} + +// Deployment prints a deployment log stream until the deployment reaches a +// terminal status, then runs catchUp with the IDs already printed from the +// stream. If the stream closes before the deployment is terminal, it keeps +// polling so the catch-up search still runs. +func Deployment(ctx context.Context, deps cliruntime.Deps, w io.Writer, stream *api.ProjectLogStream, cancel context.CancelFunc, terminal TerminalCheck, catchUp CatchUp) error { + if cancel == nil { + cancel = func() {} + } + defer cancel() + + printed := newPrintedLogIDs() + errCh := make(chan error, 1) + go func() { + defer func() { + _ = stream.Close() + }() + for { + event, err := stream.Next() + if err != nil { + errCh <- err + return + } + if event.Log != nil { + printed.add(event.Log.Id) + } + output.PrintLogStreamEvent(w, event) + } + }() + + ticker := cliruntime.NewTicker(deps, deploymentPollInterval) + defer ticker.Stop() + + // streamCh is set to nil once the stream goroutine has exited so the select + // stops waiting on it; the loop then keeps polling for terminal status. + var streamCh <-chan error = errCh + + // finish stops the stream goroutine (if still running) and runs the + // duplicate-suppressed catch-up search. + finish := func() error { + cancel() + if streamErr := awaitStream(streamCh); !cleanStreamShutdown(ctx, streamErr) { + return streamErr + } + if catchUp == nil { + return nil + } + return catchUp(ctx, printed.snapshot()) + } + + for { + select { + case err := <-streamCh: + // The stream ended on its own. Surface unexpected failures; on a + // clean shutdown stop waiting on the stream and check terminal status + // now so the catch-up search runs without waiting for the next poll — + // the stream can close before the deployment is terminal. + if !cleanStreamShutdown(ctx, err) { + return err + } + streamCh = nil + done, err := terminal(ctx) + if err != nil { + return err + } + if done { + return finish() + } + case <-ctx.Done(): + cancel() + if err := awaitStream(streamCh); !cleanStreamShutdown(ctx, err) { + return err + } + return nil + case <-ticker.C(): + done, err := terminal(ctx) + if err != nil { + cancel() + if streamErr := awaitStream(streamCh); !cleanStreamShutdown(ctx, streamErr) { + return streamErr + } + return err + } + if !done { + continue + } + return finish() + } + } +} + +// awaitStream waits for the streaming goroutine to send its final error, +// returning nil if it has already exited (streamCh is nil). +func awaitStream(streamCh <-chan error) error { + if streamCh == nil { + return nil + } + return <-streamCh +} + +func cleanStreamShutdown(ctx context.Context, err error) bool { + if err == nil || errors.Is(err, io.EOF) || errors.Is(err, context.Canceled) { + return true + } + return ctx.Err() != nil && errors.Is(ctx.Err(), context.Canceled) +} + +// reconnectingStream reads project log stream events, reconnecting from the last +// delivered cursor when the underlying stream ends. A failure to (re)open the +// stream is surfaced to the caller; a mid-stream read failure backs off and +// reconnects so the tail survives a dropped connection. +type reconnectingStream struct { + ctx context.Context + deps cliruntime.Deps + open StreamOpener + stream *api.ProjectLogStream + lastID string +} + +func newReconnectingStream(ctx context.Context, deps cliruntime.Deps, open StreamOpener) *reconnectingStream { + return &reconnectingStream{ctx: ctx, deps: deps, open: open} +} + +// Next returns the next stream event, transparently reconnecting from the last +// cursor when the stream drops. It only returns an error when the context is +// canceled or the stream cannot be opened. +func (s *reconnectingStream) Next() (*api.ProjectLogStreamEvent, error) { + for { + if s.stream == nil { + stream, err := s.open(s.ctx, s.lastID) + if err != nil { + return nil, err + } + s.stream = stream + } + event, err := s.stream.Next() + if err != nil { + _ = s.stream.Close() + s.stream = nil + if s.ctx.Err() != nil { + return nil, err + } + // A healthy backend stream stays open, so any close is a + // disconnect: back off and reconnect from the last cursor. + if waitErr := s.wait(); waitErr != nil { + return nil, waitErr + } + continue + } + if event != nil && event.ID != "" { + s.lastID = event.ID + } + return event, nil + } +} + +// Close closes the current underlying stream, if any. +func (s *reconnectingStream) Close() error { + if s.stream == nil { + return nil + } + err := s.stream.Close() + s.stream = nil + return err +} + +func (s *reconnectingStream) wait() error { + ticker := cliruntime.NewTicker(s.deps, reconnectBackoff) + defer ticker.Stop() + select { + case <-s.ctx.Done(): + return s.ctx.Err() + case <-ticker.C(): + return nil + } +} + +type printedLogIDs struct { + mu sync.Mutex + ids map[string]struct{} +} + +func newPrintedLogIDs() *printedLogIDs { + return &printedLogIDs{ids: make(map[string]struct{})} +} + +func (p *printedLogIDs) add(id string) { + if id == "" { + return + } + p.mu.Lock() + defer p.mu.Unlock() + p.ids[id] = struct{}{} +} + +func (p *printedLogIDs) snapshot() map[string]struct{} { + p.mu.Lock() + defer p.mu.Unlock() + snapshot := make(map[string]struct{}, len(p.ids)) + for id := range p.ids { + snapshot[id] = struct{}{} + } + return snapshot +} diff --git a/internal/logfollow/follow_test.go b/internal/logfollow/follow_test.go new file mode 100644 index 0000000..c503457 --- /dev/null +++ b/internal/logfollow/follow_test.go @@ -0,0 +1,254 @@ +package logfollow_test + +import ( + "bytes" + "context" + "errors" + "net/http" + "net/http/httptest" + "strings" + "sync" + "testing" + "time" + + "github.com/google/uuid" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/Kong/volcano-cli/internal/api" + "github.com/Kong/volcano-cli/internal/logfollow" + cliruntime "github.com/Kong/volcano-cli/internal/runtime" +) + +func TestDeploymentStopsAtTerminalStatusAndRunsCatchUp(t *testing.T) { + projectID := uuid.MustParse("11111111-1111-4111-8111-111111111111") + functionID := uuid.MustParse("22222222-2222-4222-8222-222222222222") + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + flusher, ok := w.(http.Flusher) + require.True(t, ok) + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte("id: stream-cursor\n")) + _, _ = w.Write([]byte("event: log\n")) + _, _ = w.Write([]byte(`data: {"id":"streamed-id","message":"streamed log","timestamp":1760000000000,"resource":{"type":"function","id":"` + functionID.String() + `"}}` + "\n\n")) + flusher.Flush() + <-r.Context().Done() + })) + defer server.Close() + + client, err := api.NewClient(server.URL, "", api.WithHTTPClient(server.Client())) + require.NoError(t, err) + ctx := context.Background() + streamCtx, cancel := context.WithCancel(ctx) + stream, err := client.StreamFunctionLogs(streamCtx, projectID, functionID, 100, "") + require.NoError(t, err) + + ticker := newFakeTicker() + var out syncBuffer + var printed map[string]struct{} + done := make(chan error, 1) + go func() { + done <- logfollow.Deployment(ctx, cliruntime.Deps{ + NewTicker: func(time.Duration) cliruntime.Ticker { + return ticker + }, + }, &out, stream, cancel, func(context.Context) (bool, error) { + return true, nil + }, func(_ context.Context, ids map[string]struct{}) error { + printed = ids + return nil + }) + }() + + require.Eventually(t, func() bool { + return strings.Contains(out.String(), "streamed log") + }, time.Second, 10*time.Millisecond) + ticker.tick() + + require.NoError(t, <-done) + assert.Contains(t, printed, "streamed-id") +} + +func TestDeploymentRunsCatchUpWhenStreamEndsBeforeTerminal(t *testing.T) { + projectID := uuid.MustParse("11111111-1111-4111-8111-111111111111") + functionID := uuid.MustParse("22222222-2222-4222-8222-222222222222") + // The server delivers one event and then closes the stream (EOF) while the + // deployment is still building. + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "text/event-stream") + _, _ = w.Write([]byte("id: stream-cursor\n")) + _, _ = w.Write([]byte("event: log\n")) + _, _ = w.Write([]byte(`data: {"id":"streamed-id","message":"streamed log","timestamp":1760000000000,"resource":{"type":"function","id":"` + functionID.String() + `"}}` + "\n\n")) + })) + defer server.Close() + + client, err := api.NewClient(server.URL, "", api.WithHTTPClient(server.Client())) + require.NoError(t, err) + ctx := context.Background() + streamCtx, cancel := context.WithCancel(ctx) + stream, err := client.StreamFunctionLogs(streamCtx, projectID, functionID, 100, "") + require.NoError(t, err) + + ticker := newFakeTicker() + var out syncBuffer + terminalReady := make(chan struct{}) + var printed map[string]struct{} + done := make(chan error, 1) + go func() { + done <- logfollow.Deployment(ctx, cliruntime.Deps{ + NewTicker: func(time.Duration) cliruntime.Ticker { + return ticker + }, + }, &out, stream, cancel, func(context.Context) (bool, error) { + select { + case <-terminalReady: + return true, nil + default: + return false, nil + } + }, func(_ context.Context, ids map[string]struct{}) error { + printed = ids + return nil + }) + }() + + // Wait until the streamed event is printed; the stream then closes on its + // own, but the follow loop must keep polling instead of exiting. + require.Eventually(t, func() bool { + return strings.Contains(out.String(), "streamed log") + }, time.Second, 10*time.Millisecond) + + // Mark the deployment terminal and drive the poll ticker until the loop + // returns. The catch-up search must still run even though the stream ended. + close(terminalReady) + for { + select { + case followErr := <-done: + require.NoError(t, followErr) + assert.Contains(t, printed, "streamed-id") + return + case ticker.ch <- time.Now(): + case <-time.After(10 * time.Millisecond): + } + } +} + +func TestRuntimeReconnectsFromCursor(t *testing.T) { + projectID := uuid.MustParse("11111111-1111-4111-8111-111111111111") + functionID := uuid.MustParse("22222222-2222-4222-8222-222222222222") + + var mu sync.Mutex + var lastEventIDs []string + // The first connection delivers one event and then closes (EOF) while the + // resource keeps running; the resumed connection delivers a second event and + // stays open like a healthy tail. + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + mu.Lock() + attempt := len(lastEventIDs) + lastEventIDs = append(lastEventIDs, r.Header.Get("Last-Event-ID")) + mu.Unlock() + + w.Header().Set("Content-Type", "text/event-stream") + if attempt == 0 { + writeStreamLog(w, "cursor-1", "first-id", "first event") + return + } + writeStreamLog(w, "cursor-2", "second-id", "second event") + <-r.Context().Done() + })) + defer server.Close() + + client, err := api.NewClient(server.URL, "", api.WithHTTPClient(server.Client())) + require.NoError(t, err) + + open := func(ctx context.Context, lastEventID string) (*api.ProjectLogStream, error) { + return client.StreamFunctionLogs(ctx, projectID, functionID, 100, lastEventID) + } + + ticker := newFakeTicker() + deps := cliruntime.Deps{NewTicker: func(time.Duration) cliruntime.Ticker { return ticker }} + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + var out syncBuffer + done := make(chan error, 1) + go func() { + done <- logfollow.Runtime(ctx, deps, &out, open) + }() + + require.Eventually(t, func() bool { + return strings.Contains(out.String(), "first event") + }, time.Second, 10*time.Millisecond) + + // Release the reconnect backoff until the resumed event arrives. + require.Eventually(t, func() bool { + select { + case ticker.ch <- time.Now(): + default: + } + return strings.Contains(out.String(), "second event") + }, time.Second, 10*time.Millisecond) + + cancel() + require.NoError(t, <-done) + + mu.Lock() + defer mu.Unlock() + require.GreaterOrEqual(t, len(lastEventIDs), 2) + assert.Empty(t, lastEventIDs[0]) + assert.Equal(t, "cursor-1", lastEventIDs[1]) +} + +func TestRuntimeSurfacesOpenError(t *testing.T) { + wantErr := errors.New("open failed") + open := func(context.Context, string) (*api.ProjectLogStream, error) { + return nil, wantErr + } + err := logfollow.Runtime(context.Background(), cliruntime.Deps{}, &syncBuffer{}, open) + require.ErrorIs(t, err, wantErr) +} + +func writeStreamLog(w http.ResponseWriter, cursor, logID, message string) { + _, _ = w.Write([]byte("id: " + cursor + "\n")) + _, _ = w.Write([]byte("event: log\n")) + _, _ = w.Write([]byte(`data: {"id":"` + logID + `","message":"` + message + `","timestamp":1760000000000,"resource":{"type":"function","id":"22222222-2222-4222-8222-222222222222"}}` + "\n\n")) + if flusher, ok := w.(http.Flusher); ok { + flusher.Flush() + } +} + +type syncBuffer struct { + mu sync.Mutex + buf bytes.Buffer +} + +func (b *syncBuffer) Write(p []byte) (int, error) { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.Write(p) +} + +func (b *syncBuffer) String() string { + b.mu.Lock() + defer b.mu.Unlock() + return b.buf.String() +} + +type fakeTicker struct { + ch chan time.Time +} + +func newFakeTicker() *fakeTicker { + return &fakeTicker{ch: make(chan time.Time, 1)} +} + +func (t *fakeTicker) C() <-chan time.Time { + return t.ch +} + +func (t *fakeTicker) Reset(time.Duration) {} + +func (t *fakeTicker) Stop() {} + +func (t *fakeTicker) tick() { + t.ch <- time.Now() +} diff --git a/internal/output/logs.go b/internal/output/logs.go index 3c8d639..e86599a 100644 --- a/internal/output/logs.go +++ b/internal/output/logs.go @@ -1,9 +1,11 @@ package output import ( + "fmt" "io" "strings" + "github.com/Kong/volcano-cli/internal/api" "github.com/Kong/volcano-cli/internal/apiclient" ) @@ -13,6 +15,12 @@ type SearchLogsFetcher func(cursor string) (*apiclient.LogSearchResponse, error) // PrintSearchLogs renders paginated searched log events from fetch until the // response signals no more pages or the cursor cannot be advanced. func PrintSearchLogs(w io.Writer, fetch SearchLogsFetcher) error { + return PrintSearchLogsSkipping(w, fetch, nil) +} + +// PrintSearchLogsSkipping renders paginated searched log events, excluding IDs +// already present in skip. +func PrintSearchLogsSkipping(w io.Writer, fetch SearchLogsFetcher, skip map[string]struct{}) error { cursor := "" for { resp, err := fetch(cursor) @@ -22,7 +30,7 @@ func PrintSearchLogs(w io.Writer, fetch SearchLogsFetcher) error { if resp == nil { return nil } - LogSearchEvents(w, resp.Data) + LogSearchEvents(w, filterSkippedLogSearchEvents(resp.Data, skip)) if !resp.HasMore || resp.NextCursor == nil { return nil } @@ -32,3 +40,30 @@ func PrintSearchLogs(w io.Writer, fetch SearchLogsFetcher) error { } } } + +func filterSkippedLogSearchEvents(events []apiclient.LogSearchEvent, skip map[string]struct{}) []apiclient.LogSearchEvent { + if len(events) == 0 || len(skip) == 0 { + return events + } + filtered := make([]apiclient.LogSearchEvent, 0, len(events)) + for _, event := range events { + if _, ok := skip[event.Id]; ok && event.Id != "" { + continue + } + filtered = append(filtered, event) + } + return filtered +} + +// PrintLogStreamEvent renders one parsed project log stream event. +func PrintLogStreamEvent(w io.Writer, event *api.ProjectLogStreamEvent) { + if event == nil { + return + } + switch { + case event.Log != nil: + LogSearchEvents(w, []apiclient.LogSearchEvent{*event.Log}) + case event.Warning != "": + fmt.Fprintf(w, "Warning: %s\n", event.Warning) + } +}