From 209f6a4fe1ae59daba6f0671e76cd169598df54e Mon Sep 17 00:00:00 2001 From: Christopher Date: Tue, 31 Mar 2026 11:24:21 -0500 Subject: [PATCH] fix(nest): fix stream expiry, token refresh, timeouts, and resource leaks MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - ExtendStream: replace one-shot *time.Timer with an extendMu/extendStop channel loop so streams are extended reliably every ~55 seconds instead of expiring after the first extension fires and is not rescheduled - refreshToken: store ClientID, ClientSecret, RefreshToken on the API struct so refreshToken() can re-authenticate directly; the previous cache-key lookup failed after the first token rotation because the cached entry had already been replaced with the new token - NewAPI/GetDevices/ExchangeSDP: fix HTTP client timeout of 5000 seconds (5000 * time.Second) to a sane 10 * time.Second - rtcConn: call pc.Close() on all three error paths (CreateCompleteOffer, ExchangeSDP, SetAnswer) to prevent WebRTC peer connection leaks - client.go/api.go: fix typos cliendID → clientID, cliendSecret → clientSecret throughout; fix "backwards compataiility" comment typo --- pkg/nest/api.go | 202 ++++++++++++++++++++++++++++++--------------- pkg/nest/client.go | 15 ++-- 2 files changed, 143 insertions(+), 74 deletions(-) diff --git a/pkg/nest/api.go b/pkg/nest/api.go index 4e9e4dbd5..10175086c 100644 --- a/pkg/nest/api.go +++ b/pkg/nest/api.go @@ -15,6 +15,12 @@ type API struct { Token string ExpiresAt time.Time + // Credentials stored so refreshToken() can call OAuth directly without + // searching the cache (which breaks after the first token rotation). + ClientID string + ClientSecret string + RefreshToken string + StreamProjectID string StreamDeviceID string StreamExpiresAt time.Time @@ -26,7 +32,8 @@ type API struct { StreamToken string StreamExtensionToken string - extendTimer *time.Timer + extendMu sync.Mutex + extendStop chan struct{} } type Auth struct { @@ -60,7 +67,7 @@ func NewAPI(clientID, clientSecret, refreshToken string) (*API, error) { "refresh_token": []string{refreshToken}, } - client := &http.Client{Timeout: time.Second * 5000} + client := &http.Client{Timeout: 10 * time.Second} res, err := client.PostForm("https://www.googleapis.com/oauth2/v4/token", data) if err != nil { return nil, err @@ -83,8 +90,11 @@ func NewAPI(clientID, clientSecret, refreshToken string) (*API, error) { } api := &API{ - Token: resv.AccessToken, - ExpiresAt: now.Add(resv.ExpiresIn * time.Second), + Token: resv.AccessToken, + ExpiresAt: now.Add(resv.ExpiresIn * time.Second), + ClientID: clientID, + ClientSecret: clientSecret, + RefreshToken: refreshToken, } cache[key] = api @@ -101,7 +111,7 @@ func (a *API) GetDevices(projectID string) ([]DeviceInfo, error) { req.Header.Set("Authorization", "Bearer "+a.Token) - client := &http.Client{Timeout: time.Second * 5000} + client := &http.Client{Timeout: 10 * time.Second} res, err := client.Do(req) if err != nil { return nil, err @@ -178,7 +188,7 @@ func (a *API) ExchangeSDP(projectID, deviceID, offer string) (string, error) { req.Header.Set("Authorization", "Bearer "+a.Token) - client := &http.Client{Timeout: time.Second * 5000} + client := &http.Client{Timeout: 10 * time.Second} res, err := client.Do(req) if err != nil { return "", err @@ -228,35 +238,38 @@ func (a *API) ExchangeSDP(projectID, deviceID, offer string) (string, error) { } func (a *API) refreshToken() error { - // Get the cached API with matching token to get credentials - var refreshKey string - cacheMu.Lock() - for key, api := range cache { - if api.Token == a.Token { - refreshKey = key - break + clientID := a.ClientID + clientSecret := a.ClientSecret + refreshToken := a.RefreshToken + + // Backward-compatible fallback: derive credentials from cache key if the + // struct was created before credential storage was added. + if clientID == "" || clientSecret == "" || refreshToken == "" { + var refreshKey string + cacheMu.Lock() + for key, api := range cache { + if api.Token == a.Token { + refreshKey = key + break + } } - } - cacheMu.Unlock() - - if refreshKey == "" { - return errors.New("nest: unable to find cached credentials") - } + cacheMu.Unlock() - // Parse credentials from cache key - parts := strings.Split(refreshKey, ":") - if len(parts) != 3 { - return errors.New("nest: invalid cache key format") + if refreshKey == "" { + return errors.New("nest: unable to find cached credentials") + } + parts := strings.Split(refreshKey, ":") + if len(parts) != 3 { + return errors.New("nest: invalid cache key format") + } + clientID, clientSecret, refreshToken = parts[0], parts[1], parts[2] } - clientID, clientSecret, refreshToken := parts[0], parts[1], parts[2] - // Get new API instance which will refresh the token newAPI, err := NewAPI(clientID, clientSecret, refreshToken) if err != nil { return err } - // Update current API with new token a.Token = newAPI.Token a.ExpiresAt = newAPI.ExpiresAt return nil @@ -272,11 +285,9 @@ func (a *API) ExtendStream() error { } if a.StreamToken != "" { - // RTSP reqv.Command = "sdm.devices.commands.CameraLiveStream.ExtendRtspStream" reqv.Params.StreamExtensionToken = a.StreamExtensionToken } else { - // WebRTC reqv.Command = "sdm.devices.commands.CameraLiveStream.ExtendWebRtcStream" reqv.Params.MediaSessionID = a.StreamSessionID } @@ -288,43 +299,72 @@ func (a *API) ExtendStream() error { uri := "https://smartdevicemanagement.googleapis.com/v1/enterprises/" + a.StreamProjectID + "/devices/" + a.StreamDeviceID + ":executeCommand" - req, err := http.NewRequest("POST", uri, bytes.NewReader(b)) - if err != nil { - return err - } - req.Header.Set("Authorization", "Bearer "+a.Token) + maxRetries := 3 + retryDelay := 30 * time.Second - client := &http.Client{Timeout: time.Second * 5000} - res, err := client.Do(req) - if err != nil { - return err - } - defer res.Body.Close() + for attempt := 0; attempt < maxRetries; attempt++ { + req, err := http.NewRequest("POST", uri, bytes.NewReader(b)) + if err != nil { + return err + } - if res.StatusCode != 200 { - return errors.New("nest: wrong status: " + res.Status) - } + req.Header.Set("Authorization", "Bearer "+a.Token) - var resv struct { - Results struct { - ExpiresAt time.Time `json:"expiresAt"` - MediaSessionID string `json:"mediaSessionId"` - StreamExtensionToken string `json:"streamExtensionToken"` - StreamToken string `json:"streamToken"` - } `json:"results"` - } + client := &http.Client{Timeout: 10 * time.Second} + res, err := client.Do(req) + if err != nil { + return err + } - if err = json.NewDecoder(res.Body).Decode(&resv); err != nil { - return err - } + if res.StatusCode == 401 { + res.Body.Close() + if attempt < maxRetries-1 { + if err := a.refreshToken(); err != nil { + return err + } + time.Sleep(time.Second) + continue + } + } - a.StreamSessionID = resv.Results.MediaSessionID - a.StreamExpiresAt = resv.Results.ExpiresAt - a.StreamExtensionToken = resv.Results.StreamExtensionToken - a.StreamToken = resv.Results.StreamToken + if res.StatusCode == 409 || res.StatusCode == 429 { + res.Body.Close() + if attempt < maxRetries-1 { + time.Sleep(retryDelay) + retryDelay *= 2 + continue + } + } - return nil + defer res.Body.Close() + + if res.StatusCode != 200 { + return errors.New("nest: wrong status: " + res.Status) + } + + var resv struct { + Results struct { + ExpiresAt time.Time `json:"expiresAt"` + MediaSessionID string `json:"mediaSessionId"` + StreamExtensionToken string `json:"streamExtensionToken"` + StreamToken string `json:"streamToken"` + } `json:"results"` + } + + if err = json.NewDecoder(res.Body).Decode(&resv); err != nil { + return err + } + + a.StreamSessionID = resv.Results.MediaSessionID + a.StreamExpiresAt = resv.Results.ExpiresAt + a.StreamExtensionToken = resv.Results.StreamExtensionToken + a.StreamToken = resv.Results.StreamToken + + return nil + } + + return errors.New("nest: max retries exceeded") } func (a *API) GenerateRtspStream(projectID, deviceID string) (string, error) { @@ -348,11 +388,12 @@ func (a *API) GenerateRtspStream(projectID, deviceID string) (string, error) { req.Header.Set("Authorization", "Bearer "+a.Token) - client := &http.Client{Timeout: time.Second * 5000} + client := &http.Client{Timeout: 10 * time.Second} res, err := client.Do(req) if err != nil { return "", err } + defer res.Body.Close() if res.StatusCode != 200 { return "", errors.New("nest: wrong status: " + res.Status) @@ -412,11 +453,12 @@ func (a *API) StopRTSPStream() error { req.Header.Set("Authorization", "Bearer "+a.Token) - client := &http.Client{Timeout: time.Second * 5000} + client := &http.Client{Timeout: 10 * time.Second} res, err := client.Do(req) if err != nil { return err } + defer res.Body.Close() if res.StatusCode != 200 { return errors.New("nest: wrong status: " + res.Status) @@ -464,23 +506,47 @@ type Device struct { } `json:"parentRelations"` } +// StartExtendStreamTimer runs a background loop that extends the Nest stream +// session before it expires. Unlike a one-shot timer, the loop reschedules +// itself after each successful extend and continues on transient errors, so +// the stream stays alive indefinitely rather than expiring after ~10 minutes. func (a *API) StartExtendStreamTimer() { - if a.extendTimer != nil { + a.extendMu.Lock() + defer a.extendMu.Unlock() + + if a.extendStop != nil { return } - a.extendTimer = time.NewTimer(time.Until(a.StreamExpiresAt) - time.Minute) + stop := make(chan struct{}) + a.extendStop = stop + go func() { - <-a.extendTimer.C - if err := a.ExtendStream(); err != nil { - return + for { + d := time.Until(a.StreamExpiresAt) - time.Minute + if d < 10*time.Second { + d = 10 * time.Second + } + t := time.NewTimer(d) + select { + case <-t.C: + // Keep looping even on error — a transient failure should not + // stop the loop and cause an avoidable stream expiry. + _ = a.ExtendStream() + case <-stop: + t.Stop() + return + } } }() } func (a *API) StopExtendStreamTimer() { - if a.extendTimer != nil { - a.extendTimer.Stop() - a.extendTimer = nil + a.extendMu.Lock() + defer a.extendMu.Unlock() + + if a.extendStop != nil { + close(a.extendStop) + a.extendStop = nil } } diff --git a/pkg/nest/client.go b/pkg/nest/client.go index 6a570913a..0624cb672 100644 --- a/pkg/nest/client.go +++ b/pkg/nest/client.go @@ -29,13 +29,13 @@ func Dial(rawURL string) (core.Producer, error) { } query := u.Query() - cliendID := query.Get("client_id") - cliendSecret := query.Get("client_secret") + clientID := query.Get("client_id") + clientSecret := query.Get("client_secret") refreshToken := query.Get("refresh_token") projectID := query.Get("project_id") deviceID := query.Get("device_id") - if cliendID == "" || cliendSecret == "" || refreshToken == "" || projectID == "" || deviceID == "" { + if clientID == "" || clientSecret == "" || refreshToken == "" || projectID == "" || deviceID == "" { return nil, errors.New("nest: wrong query") } @@ -46,7 +46,7 @@ func Dial(rawURL string) (core.Producer, error) { var lastErr error for attempt := 0; attempt < maxRetries; attempt++ { - nestAPI, err = NewAPI(cliendID, cliendSecret, refreshToken) + nestAPI, err = NewAPI(clientID, clientSecret, refreshToken) if err == nil { break } @@ -66,7 +66,7 @@ func Dial(rawURL string) (core.Producer, error) { return rtspConn(nestAPI, rawURL, projectID, deviceID) } - // Default to WEB_RTC for backwards compataiility + // Default to WEB_RTC for backwards compatibility return rtcConn(nestAPI, rawURL, projectID, deviceID) } @@ -129,12 +129,14 @@ func rtcConn(nestAPI *API, rawURL, projectID, deviceID string) (*WebRTCClient, e // 3. Create offer with candidates offer, err := conn.CreateCompleteOffer(medias) if err != nil { + _ = pc.Close() return nil, err } - // 4. Exchange SDP via Hass + // 4. Exchange SDP via Google SDM API answer, err := nestAPI.ExchangeSDP(projectID, deviceID, offer) if err != nil { + _ = pc.Close() lastErr = err if attempt < maxRetries-1 { time.Sleep(retryDelay) @@ -146,6 +148,7 @@ func rtcConn(nestAPI *API, rawURL, projectID, deviceID string) (*WebRTCClient, e // 5. Set answer with remote medias if err = conn.SetAnswer(answer); err != nil { + _ = pc.Close() return nil, err }