Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
202 changes: 134 additions & 68 deletions pkg/nest/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,12 @@ type API struct {
Token string
ExpiresAt time.Time

// Credentials stored so refreshToken() can call OAuth directly without
// searching the cache (which breaks after the first token rotation).
ClientID string
ClientSecret string
RefreshToken string

StreamProjectID string
StreamDeviceID string
StreamExpiresAt time.Time
Expand All @@ -26,7 +32,8 @@ type API struct {
StreamToken string
StreamExtensionToken string

extendTimer *time.Timer
extendMu sync.Mutex
extendStop chan struct{}
}

type Auth struct {
Expand Down Expand Up @@ -60,7 +67,7 @@ func NewAPI(clientID, clientSecret, refreshToken string) (*API, error) {
"refresh_token": []string{refreshToken},
}

client := &http.Client{Timeout: time.Second * 5000}
client := &http.Client{Timeout: 10 * time.Second}
res, err := client.PostForm("https://www.googleapis.com/oauth2/v4/token", data)
if err != nil {
return nil, err
Expand All @@ -83,8 +90,11 @@ func NewAPI(clientID, clientSecret, refreshToken string) (*API, error) {
}

api := &API{
Token: resv.AccessToken,
ExpiresAt: now.Add(resv.ExpiresIn * time.Second),
Token: resv.AccessToken,
ExpiresAt: now.Add(resv.ExpiresIn * time.Second),
ClientID: clientID,
ClientSecret: clientSecret,
RefreshToken: refreshToken,
}

cache[key] = api
Expand All @@ -101,7 +111,7 @@ func (a *API) GetDevices(projectID string) ([]DeviceInfo, error) {

req.Header.Set("Authorization", "Bearer "+a.Token)

client := &http.Client{Timeout: time.Second * 5000}
client := &http.Client{Timeout: 10 * time.Second}
res, err := client.Do(req)
if err != nil {
return nil, err
Expand Down Expand Up @@ -178,7 +188,7 @@ func (a *API) ExchangeSDP(projectID, deviceID, offer string) (string, error) {

req.Header.Set("Authorization", "Bearer "+a.Token)

client := &http.Client{Timeout: time.Second * 5000}
client := &http.Client{Timeout: 10 * time.Second}
res, err := client.Do(req)
if err != nil {
return "", err
Expand Down Expand Up @@ -228,35 +238,38 @@ func (a *API) ExchangeSDP(projectID, deviceID, offer string) (string, error) {
}

func (a *API) refreshToken() error {
// Get the cached API with matching token to get credentials
var refreshKey string
cacheMu.Lock()
for key, api := range cache {
if api.Token == a.Token {
refreshKey = key
break
clientID := a.ClientID
clientSecret := a.ClientSecret
refreshToken := a.RefreshToken

// Backward-compatible fallback: derive credentials from cache key if the
// struct was created before credential storage was added.
if clientID == "" || clientSecret == "" || refreshToken == "" {
var refreshKey string
cacheMu.Lock()
for key, api := range cache {
if api.Token == a.Token {
refreshKey = key
break
}
}
}
cacheMu.Unlock()

if refreshKey == "" {
return errors.New("nest: unable to find cached credentials")
}
cacheMu.Unlock()

// Parse credentials from cache key
parts := strings.Split(refreshKey, ":")
if len(parts) != 3 {
return errors.New("nest: invalid cache key format")
if refreshKey == "" {
return errors.New("nest: unable to find cached credentials")
}
parts := strings.Split(refreshKey, ":")
if len(parts) != 3 {
return errors.New("nest: invalid cache key format")
}
clientID, clientSecret, refreshToken = parts[0], parts[1], parts[2]
}
clientID, clientSecret, refreshToken := parts[0], parts[1], parts[2]

// Get new API instance which will refresh the token
newAPI, err := NewAPI(clientID, clientSecret, refreshToken)
if err != nil {
return err
}

// Update current API with new token
a.Token = newAPI.Token
a.ExpiresAt = newAPI.ExpiresAt
return nil
Expand All @@ -272,11 +285,9 @@ func (a *API) ExtendStream() error {
}

if a.StreamToken != "" {
// RTSP
reqv.Command = "sdm.devices.commands.CameraLiveStream.ExtendRtspStream"
reqv.Params.StreamExtensionToken = a.StreamExtensionToken
} else {
// WebRTC
reqv.Command = "sdm.devices.commands.CameraLiveStream.ExtendWebRtcStream"
reqv.Params.MediaSessionID = a.StreamSessionID
}
Expand All @@ -288,43 +299,72 @@ func (a *API) ExtendStream() error {

uri := "https://smartdevicemanagement.googleapis.com/v1/enterprises/" +
a.StreamProjectID + "/devices/" + a.StreamDeviceID + ":executeCommand"
req, err := http.NewRequest("POST", uri, bytes.NewReader(b))
if err != nil {
return err
}

req.Header.Set("Authorization", "Bearer "+a.Token)
maxRetries := 3
retryDelay := 30 * time.Second

client := &http.Client{Timeout: time.Second * 5000}
res, err := client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()
for attempt := 0; attempt < maxRetries; attempt++ {
req, err := http.NewRequest("POST", uri, bytes.NewReader(b))
if err != nil {
return err
}

if res.StatusCode != 200 {
return errors.New("nest: wrong status: " + res.Status)
}
req.Header.Set("Authorization", "Bearer "+a.Token)

var resv struct {
Results struct {
ExpiresAt time.Time `json:"expiresAt"`
MediaSessionID string `json:"mediaSessionId"`
StreamExtensionToken string `json:"streamExtensionToken"`
StreamToken string `json:"streamToken"`
} `json:"results"`
}
client := &http.Client{Timeout: 10 * time.Second}
res, err := client.Do(req)
if err != nil {
return err
}

if err = json.NewDecoder(res.Body).Decode(&resv); err != nil {
return err
}
if res.StatusCode == 401 {
res.Body.Close()
if attempt < maxRetries-1 {
if err := a.refreshToken(); err != nil {
return err
}
time.Sleep(time.Second)
continue
}
}

a.StreamSessionID = resv.Results.MediaSessionID
a.StreamExpiresAt = resv.Results.ExpiresAt
a.StreamExtensionToken = resv.Results.StreamExtensionToken
a.StreamToken = resv.Results.StreamToken
if res.StatusCode == 409 || res.StatusCode == 429 {
res.Body.Close()
if attempt < maxRetries-1 {
time.Sleep(retryDelay)
retryDelay *= 2
continue
}
}

return nil
defer res.Body.Close()

if res.StatusCode != 200 {
return errors.New("nest: wrong status: " + res.Status)
}

var resv struct {
Results struct {
ExpiresAt time.Time `json:"expiresAt"`
MediaSessionID string `json:"mediaSessionId"`
StreamExtensionToken string `json:"streamExtensionToken"`
StreamToken string `json:"streamToken"`
} `json:"results"`
}

if err = json.NewDecoder(res.Body).Decode(&resv); err != nil {
return err
}

a.StreamSessionID = resv.Results.MediaSessionID
a.StreamExpiresAt = resv.Results.ExpiresAt
a.StreamExtensionToken = resv.Results.StreamExtensionToken
a.StreamToken = resv.Results.StreamToken

return nil
}

return errors.New("nest: max retries exceeded")
}

func (a *API) GenerateRtspStream(projectID, deviceID string) (string, error) {
Expand All @@ -348,11 +388,12 @@ func (a *API) GenerateRtspStream(projectID, deviceID string) (string, error) {

req.Header.Set("Authorization", "Bearer "+a.Token)

client := &http.Client{Timeout: time.Second * 5000}
client := &http.Client{Timeout: 10 * time.Second}
res, err := client.Do(req)
if err != nil {
return "", err
}
defer res.Body.Close()

if res.StatusCode != 200 {
return "", errors.New("nest: wrong status: " + res.Status)
Expand Down Expand Up @@ -412,11 +453,12 @@ func (a *API) StopRTSPStream() error {

req.Header.Set("Authorization", "Bearer "+a.Token)

client := &http.Client{Timeout: time.Second * 5000}
client := &http.Client{Timeout: 10 * time.Second}
res, err := client.Do(req)
if err != nil {
return err
}
defer res.Body.Close()

if res.StatusCode != 200 {
return errors.New("nest: wrong status: " + res.Status)
Expand Down Expand Up @@ -464,23 +506,47 @@ type Device struct {
} `json:"parentRelations"`
}

// StartExtendStreamTimer runs a background loop that extends the Nest stream
// session before it expires. Unlike a one-shot timer, the loop reschedules
// itself after each successful extend and continues on transient errors, so
// the stream stays alive indefinitely rather than expiring after ~10 minutes.
func (a *API) StartExtendStreamTimer() {
if a.extendTimer != nil {
a.extendMu.Lock()
defer a.extendMu.Unlock()

if a.extendStop != nil {
return
}

a.extendTimer = time.NewTimer(time.Until(a.StreamExpiresAt) - time.Minute)
stop := make(chan struct{})
a.extendStop = stop

go func() {
<-a.extendTimer.C
if err := a.ExtendStream(); err != nil {
return
for {
d := time.Until(a.StreamExpiresAt) - time.Minute
if d < 10*time.Second {
d = 10 * time.Second
}
t := time.NewTimer(d)
select {
case <-t.C:
// Keep looping even on error — a transient failure should not
// stop the loop and cause an avoidable stream expiry.
_ = a.ExtendStream()
case <-stop:
t.Stop()
return
}
}
}()
}

func (a *API) StopExtendStreamTimer() {
if a.extendTimer != nil {
a.extendTimer.Stop()
a.extendTimer = nil
a.extendMu.Lock()
defer a.extendMu.Unlock()

if a.extendStop != nil {
close(a.extendStop)
a.extendStop = nil
}
}
15 changes: 9 additions & 6 deletions pkg/nest/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,13 +29,13 @@ func Dial(rawURL string) (core.Producer, error) {
}

query := u.Query()
cliendID := query.Get("client_id")
cliendSecret := query.Get("client_secret")
clientID := query.Get("client_id")
clientSecret := query.Get("client_secret")
refreshToken := query.Get("refresh_token")
projectID := query.Get("project_id")
deviceID := query.Get("device_id")

if cliendID == "" || cliendSecret == "" || refreshToken == "" || projectID == "" || deviceID == "" {
if clientID == "" || clientSecret == "" || refreshToken == "" || projectID == "" || deviceID == "" {
return nil, errors.New("nest: wrong query")
}

Expand All @@ -46,7 +46,7 @@ func Dial(rawURL string) (core.Producer, error) {
var lastErr error

for attempt := 0; attempt < maxRetries; attempt++ {
nestAPI, err = NewAPI(cliendID, cliendSecret, refreshToken)
nestAPI, err = NewAPI(clientID, clientSecret, refreshToken)
if err == nil {
break
}
Expand All @@ -66,7 +66,7 @@ func Dial(rawURL string) (core.Producer, error) {
return rtspConn(nestAPI, rawURL, projectID, deviceID)
}

// Default to WEB_RTC for backwards compataiility
// Default to WEB_RTC for backwards compatibility
return rtcConn(nestAPI, rawURL, projectID, deviceID)
}

Expand Down Expand Up @@ -129,12 +129,14 @@ func rtcConn(nestAPI *API, rawURL, projectID, deviceID string) (*WebRTCClient, e
// 3. Create offer with candidates
offer, err := conn.CreateCompleteOffer(medias)
if err != nil {
_ = pc.Close()
return nil, err
}

// 4. Exchange SDP via Hass
// 4. Exchange SDP via Google SDM API
answer, err := nestAPI.ExchangeSDP(projectID, deviceID, offer)
if err != nil {
_ = pc.Close()
lastErr = err
if attempt < maxRetries-1 {
time.Sleep(retryDelay)
Expand All @@ -146,6 +148,7 @@ func rtcConn(nestAPI *API, rawURL, projectID, deviceID string) (*WebRTCClient, e

// 5. Set answer with remote medias
if err = conn.SetAnswer(answer); err != nil {
_ = pc.Close()
return nil, err
}

Expand Down