Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 22 additions & 7 deletions internal/api/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,8 @@ const (
)

type options struct {
httpClient apiclient.HttpRequestDoer
httpClient apiclient.HttpRequestDoer
streamHTTPClient apiclient.HttpRequestDoer
}

// Option configures a cloud API client.
Expand All @@ -32,12 +33,16 @@ type Option func(*options)
func WithHTTPClient(httpClient apiclient.HttpRequestDoer) Option {
return func(opts *options) {
opts.httpClient = httpClient
opts.streamHTTPClient = httpClient
}
}

// Client is a thin wrapper around the generated OpenAPI client.
type Client struct {
client *apiclient.ClientWithResponses
client *apiclient.ClientWithResponses
baseURL string
token string
streamHTTPClient apiclient.HttpRequestDoer
}

// NewClient constructs a generated client with auth and error-normalization helpers.
Expand All @@ -54,22 +59,32 @@ func NewClient(apiURL, token string, opts ...Option) (*Client, error) {
return nil, errors.New("api url must use http:// or https:// scheme")
}

cfg := options{
httpClient: &http.Client{Timeout: defaultTimeout},
}
cfg := options{}
for _, opt := range opts {
opt(&cfg)
}
if cfg.httpClient == nil {
cfg.httpClient = &http.Client{Timeout: defaultTimeout}
}
if cfg.streamHTTPClient == nil {
cfg.streamHTTPClient = &http.Client{}
}

baseURL := generatedClientBaseURL(parsed)
clientOpts := []apiclient.ClientOption{
apiclient.WithHTTPClient(cfg.httpClient),
apiclient.WithRequestEditorFn(authorizationEditor(token)),
}
generated, err := apiclient.NewClientWithResponses(generatedClientBaseURL(parsed), clientOpts...)
generated, err := apiclient.NewClientWithResponses(baseURL, clientOpts...)
if err != nil {
return nil, fmt.Errorf("failed to create api client: %w", err)
}
return &Client{client: generated}, nil
return &Client{
client: generated,
baseURL: baseURL,
token: token,
streamHTTPClient: cfg.streamHTTPClient,
}, nil
}

func generatedClientBaseURL(parsed *url.URL) string {
Expand Down
22 changes: 22 additions & 0 deletions internal/api/frontends.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,17 @@ func (c *Client) GetFrontendLogs(ctx context.Context, projectID, frontendID uuid
return c.searchProjectLogs(ctx, projectID, body)
}

// StreamFrontendLogs opens a runtime log stream for a frontend.
func (c *Client) StreamFrontendLogs(ctx context.Context, projectID, frontendID uuid.UUID, limit int, lastEventID string) (*ProjectLogStream, error) {
body := logSearchRequest{
Resource: logResource(logResourceTypeFrontend, frontendID),
}
if limit > 0 {
body.Limit = &limit
}
return c.streamProjectLogs(ctx, projectID, body, lastEventID)
}

// GetFrontendDeploymentLogs returns one build log search page for a frontend deployment.
func (c *Client) GetFrontendDeploymentLogs(ctx context.Context, projectID, frontendID, deploymentID uuid.UUID, limit int, cursor string) (*apiclient.LogSearchResponse, error) {
body := logSearchRequest{
Expand All @@ -145,6 +156,17 @@ func (c *Client) GetFrontendDeploymentLogs(ctx context.Context, projectID, front
return c.searchProjectLogs(ctx, projectID, body)
}

// StreamFrontendDeploymentLogs opens a build log stream for a frontend deployment.
func (c *Client) StreamFrontendDeploymentLogs(ctx context.Context, projectID, frontendID, deploymentID uuid.UUID, limit int, lastEventID string) (*ProjectLogStream, error) {
body := logSearchRequest{
Resource: logDeploymentResource(logResourceTypeFrontend, frontendID, deploymentID),
}
if limit > 0 {
body.Limit = &limit
}
return c.streamProjectLogs(ctx, projectID, body, lastEventID)
}

// CreateFrontendCustomDomain attaches a BYOC custom domain to a frontend.
func (c *Client) CreateFrontendCustomDomain(ctx context.Context, projectID, frontendID uuid.UUID, input FrontendCustomDomainInput) (*apiclient.FrontendCustomDomainResponse, error) {
body := apiclient.CreateFrontendCustomDomainJSONRequestBody{
Expand Down
22 changes: 22 additions & 0 deletions internal/api/functions.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,17 @@ func (c *Client) GetFunctionLogs(ctx context.Context, projectID, functionID uuid
return c.searchProjectLogs(ctx, projectID, body)
}

// StreamFunctionLogs opens a runtime log stream for a function.
func (c *Client) StreamFunctionLogs(ctx context.Context, projectID, functionID uuid.UUID, limit int, lastEventID string) (*ProjectLogStream, error) {
body := logSearchRequest{
Resource: logResource(logResourceTypeFunction, functionID),
}
if limit > 0 {
body.Limit = &limit
}
return c.streamProjectLogs(ctx, projectID, body, lastEventID)
}

func buildFunctionDeployMultipart(fn FunctionDeployInput) (*bytes.Buffer, string, error) {
body := &bytes.Buffer{}
writer := multipart.NewWriter(body)
Expand Down Expand Up @@ -309,3 +320,14 @@ func (c *Client) GetFunctionDeploymentLogs(ctx context.Context, projectID, funct
}
return c.searchProjectLogs(ctx, projectID, body)
}

// StreamFunctionDeploymentLogs opens a build log stream for a function deployment.
func (c *Client) StreamFunctionDeploymentLogs(ctx context.Context, projectID, functionID, deploymentID uuid.UUID, limit int, lastEventID string) (*ProjectLogStream, error) {
body := logSearchRequest{
Resource: logDeploymentResource(logResourceTypeFunction, functionID, deploymentID),
}
if limit > 0 {
body.Limit = &limit
}
return c.streamProjectLogs(ctx, projectID, body, lastEventID)
}
180 changes: 180 additions & 0 deletions internal/api/log_stream.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
package api

import (
"bufio"
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"

"github.com/google/uuid"

"github.com/Kong/volcano-cli/internal/apiclient"
)

const logStreamScannerMaxTokenSize = 1024 * 1024

// ProjectLogStream reads project log Server-Sent Events.
type ProjectLogStream struct {
body io.ReadCloser
scanner *bufio.Scanner
}

// ProjectLogStreamEvent is one parsed project log stream event.
type ProjectLogStreamEvent struct {
ID string
Type string
Data string
Log *apiclient.LogSearchEvent
Warning string
}

// Close closes the underlying response body.
func (s *ProjectLogStream) Close() error {
if s == nil || s.body == nil {
return nil
}
return s.body.Close()
}

// Next returns the next non-comment SSE event.
func (s *ProjectLogStream) Next() (*ProjectLogStreamEvent, error) {
if s == nil || s.scanner == nil {
return nil, io.EOF
}
for {
event, err := s.nextRaw()
if err != nil {
return nil, err
}
if event == nil {
continue
}
return parseProjectLogStreamEvent(event)
}
}

func (s *ProjectLogStream) nextRaw() (*ProjectLogStreamEvent, error) {
var event ProjectLogStreamEvent
var data []string
hasField := false
for s.scanner.Scan() {
line := strings.TrimSuffix(s.scanner.Text(), "\r")
if line == "" {
if !hasField {
continue
}
event.Data = strings.Join(data, "\n")
if event.Type == "" {
event.Type = "message"
}
return &event, nil
}
if strings.HasPrefix(line, ":") {
continue
}
field, value, ok := strings.Cut(line, ":")
if !ok {
continue
}
hasField = true
value = strings.TrimPrefix(value, " ")
switch field {
case "id":
event.ID = value
case "event":
event.Type = value
case "data":
data = append(data, value)
}
}
if err := s.scanner.Err(); err != nil {
return nil, err
}
if hasField {
event.Data = strings.Join(data, "\n")
if event.Type == "" {
event.Type = "message"
}
return &event, nil
}
return nil, io.EOF
}

func parseProjectLogStreamEvent(event *ProjectLogStreamEvent) (*ProjectLogStreamEvent, error) {
switch event.Type {
case "log":
var logEvent apiclient.LogSearchEvent
if err := json.Unmarshal([]byte(event.Data), &logEvent); err != nil {
return nil, fmt.Errorf("failed to decode log stream event: %w", err)
}
event.Log = &logEvent
case "warning":
var warning struct {
Error string `json:"error"`
}
if err := json.Unmarshal([]byte(event.Data), &warning); err != nil {
return nil, fmt.Errorf("failed to decode log stream warning: %w", err)
}
event.Warning = strings.TrimSpace(warning.Error)
}
return event, nil
}

func (c *Client) streamProjectLogs(ctx context.Context, projectID uuid.UUID, body logSearchRequest, lastEventID string) (*ProjectLogStream, error) {
normalizeLogSearchRequest(&body)
body.Cursor = nil
payload, err := json.Marshal(body)
if err != nil {
return nil, fmt.Errorf("failed to marshal log stream request: %w", err)
}

streamURL, err := c.projectLogsStreamURL(projectID)
if err != nil {
return nil, err
}
req, err := http.NewRequestWithContext(ctx, http.MethodPost, streamURL, bytes.NewReader(payload))
if err != nil {
return nil, err
}
req.Header.Set("Accept", "text/event-stream")
req.Header.Set("Content-Type", "application/json")
if c.token != "" {
req.Header.Set("Authorization", "Bearer "+c.token)
}
if lastEventID = strings.TrimSpace(lastEventID); lastEventID != "" {
req.Header.Set("Last-Event-ID", lastEventID)
}

resp, err := c.streamHTTPClient.Do(req)
if err != nil {
return nil, err
}
if resp.StatusCode < 200 || resp.StatusCode >= 300 {
defer func() {
_ = resp.Body.Close()
}()
respBody, _ := io.ReadAll(resp.Body)
return nil, apiError(resp.StatusCode, respBody)
}

scanner := bufio.NewScanner(resp.Body)
scanner.Buffer(make([]byte, 0, 64*1024), logStreamScannerMaxTokenSize)
return &ProjectLogStream{body: resp.Body, scanner: scanner}, nil
}

func (c *Client) projectLogsStreamURL(projectID uuid.UUID) (string, error) {
base, err := url.Parse(c.baseURL)
if err != nil {
return "", fmt.Errorf("failed to parse api base url: %w", err)
}
base.Path = strings.TrimRight(base.Path, "/") + "/projects/" + projectID.String() + "/logs/stream"
base.RawPath = ""
base.RawQuery = ""
base.Fragment = ""
return base.String(), nil
}
Loading
Loading