Skip to content

Commit 665cf21

Browse files
committed
fixes
1 parent e4ac424 commit 665cf21

5 files changed

Lines changed: 155 additions & 14 deletions

File tree

cmd/sigsentinel/main.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -665,7 +665,7 @@ func startAudioPipeline(ctx context.Context, runtime *Runtime) (*ingest.Session,
665665
Address: doc.Config.Scanner.IP,
666666
RTSPPort: doc.Config.Scanner.RTSPPort,
667667
ReconnectDelay: 2 * time.Second,
668-
MaxReconnectFails: 5,
668+
MaxReconnectFails: 0, // retry indefinitely; avoid hard-failing app on transient scanner/network loss
669669
OnFrame: func(frame ingest.Frame) {
670670
if err := rec.PushPCM(frame.Samples, frame.ReceivedAt); err != nil {
671671
select {

internal/audio/ingest/session.go

Lines changed: 34 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
// Tunables for the ingest session.
const (
1717
// frameBufferSize is the capacity of the Session's buffered frames channel.
frameBufferSize = 256
1818
// NOTE(review): presumably how long a session must run before it counts as
// healthy and the consecutive-failure counter resets; the use site is outside
// this hunk, so confirm against loop().
healthySessionDuration = 30 * time.Second
19+
// maxReconnectDelay is the upper bound on the wait computed by
// reconnectBackoffDelay between reconnect attempts.
maxReconnectDelay = 30 * time.Second
1920
)
2021

2122
// Frame is one decoded PCM frame from the RTP stream.
@@ -27,11 +28,13 @@ type Frame struct {
2728

2829
// Config defines RTSP/RTP ingest behavior.
2930
type Config struct {
30-
Address string
31-
RTSPPort int
32-
RTSPPath string
33-
RTPReadTimeout time.Duration
34-
ReconnectDelay time.Duration
31+
Address string
32+
RTSPPort int
33+
RTSPPath string
34+
RTPReadTimeout time.Duration
35+
ReconnectDelay time.Duration
36+
// MaxReconnectFails controls consecutive reconnect failures allowed before fatal.
37+
// Values <= 0 disable the failure budget and retry indefinitely.
3538
MaxReconnectFails int
3639
Logger *log.Logger
3740
OnFrame func(Frame)
@@ -61,9 +64,6 @@ func NewSession(parent context.Context, cfg Config) (*Session, error) {
6164
if cfg.ReconnectDelay == 0 {
6265
cfg.ReconnectDelay = 2 * time.Second
6366
}
64-
if cfg.MaxReconnectFails <= 0 {
65-
cfg.MaxReconnectFails = 5
66-
}
6767
ctx, cancel := context.WithCancel(parent)
6868
s := &Session{cfg: cfg, ctx: ctx, cancel: cancel, fatal: make(chan error, 1), frames: make(chan Frame, frameBufferSize)}
6969
s.wg.Add(2)
@@ -106,20 +106,42 @@ func (s *Session) loop() {
106106
fails = 0
107107
}
108108
fails++
109-
s.logf("audio ingest reconnect attempt %d/%d after error: %v", fails, s.cfg.MaxReconnectFails, err)
110-
if fails >= s.cfg.MaxReconnectFails {
109+
delay := reconnectBackoffDelay(s.cfg.ReconnectDelay, fails)
110+
if s.cfg.MaxReconnectFails > 0 {
111+
s.logf("audio ingest reconnect attempt %d/%d in %s after error: %v", fails, s.cfg.MaxReconnectFails, delay, err)
112+
} else {
113+
s.logf("audio ingest reconnect attempt %d/unbounded in %s after error: %v", fails, delay, err)
114+
}
115+
if s.cfg.MaxReconnectFails > 0 && fails >= s.cfg.MaxReconnectFails {
111116
select {
112-
case s.fatal <- fmt.Errorf("audio ingest reconnect budget exceeded: %w", err):
117+
case s.fatal <- fmt.Errorf("audio ingest reconnect budget exceeded after %d consecutive failures: %w", fails, err):
113118
default:
114119
}
115120
return
116121
}
117122
select {
118123
case <-s.ctx.Done():
119124
return
120-
case <-time.After(s.cfg.ReconnectDelay):
125+
case <-time.After(delay):
126+
}
127+
}
128+
}
129+
130+
// reconnectBackoffDelay returns how long to wait before the next reconnect
// attempt. The delay starts at base and doubles once for every consecutive
// failure after the first, and is never larger than maxReconnectDelay.
//
// base is the initial delay; a non-positive value falls back to 2 seconds.
// fails is the count of consecutive failures so far; values <= 1 yield base
// itself (still subject to the cap).
func reconnectBackoffDelay(base time.Duration, fails int) time.Duration {
131+
if base <= 0 {
132+
base = 2 * time.Second
133+
}
134+
delay := base
135+
// The loop stops as soon as the cap is reached, so a very large fails value
// costs only a handful of iterations.
for attempt := 1; attempt < fails && delay < maxReconnectDelay; attempt++ {
136+
// Doubling from here would pass the cap, so return the cap directly; this
// also keeps delay from ever growing beyond maxReconnectDelay in the loop.
if delay > maxReconnectDelay/2 {
137+
return maxReconnectDelay
121138
}
139+
delay *= 2
140+
}
141+
// Covers a base that is itself larger than the cap (the loop never ran).
if delay > maxReconnectDelay {
142+
return maxReconnectDelay
122143
}
144+
return delay
123145
}
124146

125147
func (s *Session) runOnce() error {

internal/audio/ingest/session_test.go

Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,15 @@ import (
1414
"github.com/stretchr/testify/require"
1515
)
1616

17+
// unavailableRTSPPort returns a loopback TCP port that was just bound and then
// released, so that connection attempts to it are expected to fail. It fails
// the test if the listener cannot be opened or closed.
// NOTE(review): nothing reserves the port after Close, so another process
// could bind it before the test dials; treat the result as best-effort.
func unavailableRTSPPort(t *testing.T) int {
18+
t.Helper()
19+
// Port 0 asks the OS to pick any free port.
ln, err := net.Listen("tcp", "127.0.0.1:0")
20+
require.NoError(t, err)
21+
port := ln.Addr().(*net.TCPAddr).Port
22+
// Release the port immediately; only its number is needed.
require.NoError(t, ln.Close())
23+
return port
24+
}
25+
1726
func startIngestRTSPMockServer(t *testing.T) (host string, port int, serverRTPPort int) {
1827
t.Helper()
1928
ln, err := net.Listen("tcp", "127.0.0.1:0")
@@ -282,3 +291,62 @@ func TestIngestSessionReceivesFrame(t *testing.T) {
282291
require.FailNow(t, "expected decoded frame")
283292
}
284293
}
294+
295+
// TestReconnectBackoffDelay pins the backoff schedule produced by
// reconnectBackoffDelay: doubling per consecutive failure from the base,
// a 30-second ceiling, and the 2-second fallback for a zero base.
func TestReconnectBackoffDelay(t *testing.T) {
296+
t.Parallel()
297+
298+
// First failure waits exactly the base; each further failure doubles it.
assert.Equal(t, 2*time.Second, reconnectBackoffDelay(2*time.Second, 1))
299+
assert.Equal(t, 4*time.Second, reconnectBackoffDelay(2*time.Second, 2))
300+
assert.Equal(t, 8*time.Second, reconnectBackoffDelay(2*time.Second, 3))
301+
assert.Equal(t, 16*time.Second, reconnectBackoffDelay(2*time.Second, 4))
302+
// 32s would exceed the ceiling, so the result is clamped to 30s from here on.
assert.Equal(t, 30*time.Second, reconnectBackoffDelay(2*time.Second, 5))
303+
assert.Equal(t, 30*time.Second, reconnectBackoffDelay(2*time.Second, 20))
304+
// A zero base falls back to the 2-second default.
assert.Equal(t, 2*time.Second, reconnectBackoffDelay(0, 1))
305+
}
306+
307+
// TestIngestSessionReconnectBudgetHandling checks both reconnect-budget modes
// of a Session pointed at a port where nothing is expected to be listening:
// a positive MaxReconnectFails must surface a fatal error, while zero must
// not surface one during the observation window.
func TestIngestSessionReconnectBudgetHandling(t *testing.T) {
308+
t.Parallel()
309+
310+
// Bounded mode: a fatal error is expected on Fatal() once the budget of
// 2 consecutive failures is reached.
t.Run("bounded_budget_emits_fatal", func(t *testing.T) {
311+
ctx, cancel := context.WithCancel(t.Context())
312+
defer cancel()
313+
314+
s, err := NewSession(ctx, Config{
315+
Address: "127.0.0.1",
316+
RTSPPort: unavailableRTSPPort(t),
317+
ReconnectDelay: 5 * time.Millisecond,
318+
MaxReconnectFails: 2,
319+
})
320+
require.NoError(t, err)
321+
// Close error is deliberately ignored; this is cleanup only.
defer func() { _ = s.Close() }()
322+
323+
select {
324+
case fatalErr := <-s.Fatal():
325+
require.Error(t, fatalErr)
326+
assert.Contains(t, fatalErr.Error(), "audio ingest reconnect budget exceeded")
327+
// The message must report the failure count that exhausted the budget.
assert.Contains(t, fatalErr.Error(), "after 2 consecutive failures")
328+
// Generous timeout: the test fails if no fatal arrives within 2 seconds.
case <-time.After(2 * time.Second):
329+
require.FailNow(t, "expected reconnect budget fatal")
330+
}
331+
})
332+
333+
// Unbounded mode: MaxReconnectFails of 0 disables the failure budget, so no
// fatal error may arrive while the session keeps failing to connect.
t.Run("unbounded_budget_keeps_retrying", func(t *testing.T) {
334+
ctx, cancel := context.WithCancel(t.Context())
335+
defer cancel()
336+
337+
s, err := NewSession(ctx, Config{
338+
Address: "127.0.0.1",
339+
RTSPPort: unavailableRTSPPort(t),
340+
ReconnectDelay: 5 * time.Millisecond,
341+
MaxReconnectFails: 0,
342+
})
343+
require.NoError(t, err)
344+
// Close error is deliberately ignored; this is cleanup only.
defer func() { _ = s.Close() }()
345+
346+
select {
347+
case fatalErr := <-s.Fatal():
348+
require.FailNowf(t, "unexpected fatal", "received fatal in unbounded mode: %v", fatalErr)
349+
// Absence of a fatal within this short window is the only assertion here;
// the test does not count how many retries actually happened.
case <-time.After(150 * time.Millisecond):
350+
}
351+
})
352+
}

internal/audio/recording/manager.go

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,9 +134,14 @@ func (m *Manager) UpdateTelemetry(status sds200.RuntimeStatus, at time.Time) err
134134
}
135135
m.resetDetectorLocked()
136136
if active {
137-
if err := m.handleAutoStart(status, active, at); err != nil {
137+
// Frequency splits happen mid-activity; start the follow-on clip immediately
138+
// instead of waiting for debounce + another telemetry update.
139+
m.clearAutoStartPendingLocked()
140+
if err := m.begin(at, status, recordTriggerTelemetry); err != nil {
138141
return err
139142
}
143+
m.resetDetectorLocked()
144+
m.detector.Evaluate(true, at)
140145
}
141146
return nil
142147
}

internal/audio/recording/manager_test.go

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -170,6 +170,52 @@ func TestManagerUpdateTelemetry(t *testing.T) {
170170
assert.Equal(t, "telemetry", snap.Trigger)
171171
})
172172

173+
t.Run("frequency_change_restarts_immediately_even_with_start_debounce", func(t *testing.T) {
174+
t0 := time.Date(2026, 3, 8, 10, 0, 0, 0, time.UTC)
175+
var clips []Metadata
176+
m := NewManager(Config{
177+
OutputDir: filepath.Join(t.TempDir(), "clips"),
178+
HangTime: 10 * time.Second,
179+
StartDebounce: 10 * time.Second,
180+
OnFinalized: func(meta Metadata) error {
181+
clips = append(clips, meta)
182+
return nil
183+
},
184+
})
185+
186+
first := sds200.RuntimeStatus{
187+
Connected: true,
188+
SquelchOpen: true,
189+
Frequency: "155.2200",
190+
System: "County",
191+
Channel: "Ops 1",
192+
}
193+
second := sds200.RuntimeStatus{
194+
Connected: true,
195+
SquelchOpen: true,
196+
Frequency: "460.0000",
197+
System: "Metro",
198+
Channel: "Dispatch 9",
199+
}
200+
201+
// Prime auto-start with debounce using two active updates.
202+
require.NoError(t, m.UpdateTelemetry(first, t0))
203+
require.NoError(t, m.UpdateTelemetry(first, t0.Add(10*time.Second)))
204+
require.NoError(t, m.PushPCM([]int16{1, 2, 3}, t0.Add(11*time.Second)))
205+
206+
// Frequency split should finalize old clip and immediately begin new clip.
207+
require.NoError(t, m.UpdateTelemetry(second, t0.Add(12*time.Second)))
208+
209+
require.Len(t, clips, 1)
210+
assert.Equal(t, "155.2200", clips[0].Frequency)
211+
212+
snap := m.Snapshot()
213+
assert.True(t, snap.Active)
214+
assert.False(t, snap.Manual)
215+
assert.Equal(t, t0.Add(12*time.Second), snap.StartedAt)
216+
assert.Equal(t, "telemetry", snap.Trigger)
217+
})
218+
173219
t.Run("avoid_stops_auto_recording_immediately", func(t *testing.T) {
174220
t0 := time.Date(2026, 3, 8, 10, 0, 0, 0, time.UTC)
175221
var clips []Metadata

0 commit comments

Comments
 (0)