Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
48 changes: 41 additions & 7 deletions internal/rtsp/rtsp.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,13 +99,7 @@ func rtspHandler(rawURL string) (core.Producer, error) {
conn.Backchannel = true
conn.UserAgent = app.UserAgent

if rawQuery != "" {
query := streams.ParseQuery(rawQuery)
conn.Backchannel = query.Get("backchannel") == "1"
conn.Media = query.Get("media")
conn.Timeout = core.Atoi(query.Get("timeout"))
conn.Transport = query.Get("transport")
}
applyClientQuery(conn, rawURL, rawQuery)

if log.Trace().Enabled() {
conn.Listen(func(msg any) {
Expand Down Expand Up @@ -143,6 +137,27 @@ func rtspHandler(rawURL string) (core.Producer, error) {
return conn, nil
}

func applyClientQuery(conn *rtsp.Conn, rawURL, rawQuery string) {
query := url.Values{}

if uri, err := url.Parse(rawURL); err == nil && uri != nil {
for key, values := range uri.Query() {
query[key] = append([]string(nil), values...)
}
}

if extra := streams.ParseQuery(rawQuery); extra != nil {
for key, values := range extra {
query[key] = append([]string(nil), values...)
}
}

conn.Backchannel = query.Get("backchannel") == "1"
conn.Media = query.Get("media")
conn.Timeout = core.Atoi(query.Get("timeout"))
conn.Transport = query.Get("transport")
}

func tcpHandler(conn *rtsp.Conn) {
var name string
var closer func()
Expand Down Expand Up @@ -208,6 +223,8 @@ func tcpHandler(conn *rtsp.Conn) {
conn.PacketSize = uint16(core.Atoi(s))
}

conn.Repack = defaultConsumerRepack(conn.Connection.RemoteAddr, query.Get("repack"))

// param name like ffmpeg style https://ffmpeg.org/ffmpeg-protocols.html
if s := query.Get("log_level"); s != "" {
if lvl, err := zerolog.ParseLevel(s); err == nil {
Expand Down Expand Up @@ -287,6 +304,23 @@ func tcpHandler(conn *rtsp.Conn) {
_ = conn.Close()
}

func defaultConsumerRepack(remoteAddr, raw string) bool {
switch strings.ToLower(raw) {
case "":
return isLoopback(remoteAddr)
case "1", "true", "yes", "on":
return true
case "0", "false", "no", "off":
return false
default:
return isLoopback(remoteAddr)
}
}

func isLoopback(remoteAddr string) bool {
return strings.HasPrefix(remoteAddr, "127.") || strings.HasPrefix(remoteAddr, "[::1]") || strings.HasPrefix(remoteAddr, "localhost:")
}

func ParseQuery(query map[string][]string) []*core.Media {
if v := query["mp4"]; v != nil {
return []*core.Media{
Expand Down
55 changes: 55 additions & 0 deletions internal/rtsp/rtsp_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package rtsp

import (
"testing"

pkgrtsp "github.com/AlexxIT/go2rtc/pkg/rtsp"
"github.com/stretchr/testify/require"
)

func TestApplyClientQueryUsesURLQuery(t *testing.T) {
conn := &pkgrtsp.Conn{}

applyClientQuery(conn, "rtsp://127.0.0.1:8554/test?timeout=20&transport=tcp&media=video", "")

require.Equal(t, 20, conn.Timeout)
require.Equal(t, "tcp", conn.Transport)
require.Equal(t, "video", conn.Media)
}

func TestApplyClientQueryRawQueryOverridesURLQuery(t *testing.T) {
conn := &pkgrtsp.Conn{}

applyClientQuery(conn, "rtsp://127.0.0.1:8554/test?timeout=20&transport=tcp", "timeout=45#transport=udp#backchannel=1")

require.Equal(t, 45, conn.Timeout)
require.Equal(t, "udp", conn.Transport)
require.True(t, conn.Backchannel)
}

func TestApplyClientQueryAllowsEmptyURL(t *testing.T) {
conn := &pkgrtsp.Conn{}

require.NotPanics(t, func() {
applyClientQuery(conn, "", "")
})

require.False(t, conn.Backchannel)
require.Zero(t, conn.Timeout)
require.Empty(t, conn.Transport)
require.Empty(t, conn.Media)
}

func TestDefaultConsumerRepackLoopbackDefaultsOn(t *testing.T) {
require.True(t, defaultConsumerRepack("127.0.0.1:8554", ""))
require.True(t, defaultConsumerRepack("[::1]:8554", ""))
}

func TestDefaultConsumerRepackRemoteDefaultsOff(t *testing.T) {
require.False(t, defaultConsumerRepack("192.168.2.3:46980", ""))
}

func TestDefaultConsumerRepackAllowsOverride(t *testing.T) {
require.False(t, defaultConsumerRepack("127.0.0.1:8554", "off"))
require.True(t, defaultConsumerRepack("192.168.2.3:46980", "on"))
}
15 changes: 15 additions & 0 deletions internal/streams/stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,21 @@ producers:
s.mu.Unlock()
}

func (s *Stream) stopAll() {
s.mu.Lock()
consumers := append([]core.Consumer(nil), s.consumers...)
producers := append([]*Producer(nil), s.producers...)
s.consumers = nil
s.mu.Unlock()

for _, consumer := range consumers {
_ = consumer.Stop()
}
for _, producer := range producers {
producer.stop()
}
}

func (s *Stream) MarshalJSON() ([]byte, error) {
var info = struct {
Producers []*Producer `json:"producers"`
Expand Down
19 changes: 13 additions & 6 deletions internal/streams/stream_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,16 +9,20 @@ import (
)

func TestRecursion(t *testing.T) {
streams = map[string]*Stream{}
HandleFunc("rtsp", func(url string) (core.Producer, error) { return nil, nil })
HandleFunc("test", func(url string) (core.Producer, error) { return nil, nil })

// create stream with some source
stream1, err := New("from_yaml", "does_not_matter")
require.NoError(t, err)
stream1, err := New("from_yaml", "test://does_not_matter")
require.Nil(t, err)
require.Len(t, streams, 1)

// ask another unnamed stream that links go2rtc
query, err := url.ParseQuery("src=rtsp://localhost:8554/from_yaml?video")
require.NoError(t, err)
require.Nil(t, err)
stream2, err := GetOrPatch(query)
require.NoError(t, err)
require.Nil(t, err)

// check stream is same
require.Equal(t, stream1, stream2)
Expand All @@ -28,14 +32,17 @@ func TestRecursion(t *testing.T) {
}

func TestTempate(t *testing.T) {
streams = map[string]*Stream{}

HandleFunc("rtsp", func(url string) (core.Producer, error) { return nil, nil }) // bypass HasProducer
HandleFunc("ffmpeg", func(url string) (core.Producer, error) { return nil, nil })

// config from yaml
stream1, err := New("camera.from_hass", "ffmpeg:{input}#video=copy")
require.NoError(t, err)
require.Nil(t, err)
// request from hass
stream2, err := Patch("camera.from_hass", "rtsp://example.com")
require.NoError(t, err)
require.Nil(t, err)

require.Equal(t, stream1, stream2)
require.Equal(t, "ffmpeg:rtsp://example.com#video=copy", stream1.producers[0].url)
Expand Down
15 changes: 15 additions & 0 deletions internal/streams/streams.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,3 +174,18 @@ func GetAllSources() map[string][]string {
streamsMu.Unlock()
return sources
}

func StopAll() {
streamsMu.Lock()
unique := make(map[*Stream]struct{}, len(streams))
for _, stream := range streams {
if stream != nil {
unique[stream] = struct{}{}
}
}
streamsMu.Unlock()

for stream := range unique {
stream.stopAll()
}
}
4 changes: 3 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,5 +121,7 @@ func main() {
}
}

shell.RunUntilSignal()
sig := shell.WaitSignal()
println("exit with signal:", sig.String())
streams.StopAll()
}
45 changes: 21 additions & 24 deletions pkg/core/media.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,33 +177,30 @@ func UnmarshalMedia(md *sdp.MediaDescription) *Media {
}

func ParseQuery(query map[string][]string) (medias []*Media) {
// set media candidates from query list
for key, values := range query {
switch key {
case KindVideo, KindAudio:
for _, value := range values {
media := &Media{Kind: key, Direction: DirectionSendonly}

for _, name := range strings.Split(value, ",") {
name = strings.ToUpper(name)

// check aliases
switch name {
case "", "COPY":
name = CodecAny
case "MJPEG":
name = CodecJPEG
case "AAC":
name = CodecAAC
case "MP3":
name = CodecMP3
}

media.Codecs = append(media.Codecs, &Codec{Name: name})
for _, key := range []string{KindVideo, KindAudio} {
values := query[key]
for _, value := range values {
media := &Media{Kind: key, Direction: DirectionSendonly}

for _, name := range strings.Split(value, ",") {
name = strings.ToUpper(name)

// check aliases
switch name {
case "", "COPY":
name = CodecAny
case "MJPEG":
name = CodecJPEG
case "AAC":
name = CodecAAC
case "MP3":
name = CodecMP3
}

medias = append(medias, media)
media.Codecs = append(media.Codecs, &Codec{Name: name})
}

medias = append(medias, media)
}
}

Expand Down
11 changes: 11 additions & 0 deletions pkg/core/media_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,17 @@ func TestParseQuery(t *testing.T) {
}
}

func TestParseQueryMediaOrderIsStable(t *testing.T) {
query := url.Values{
"audio": {""},
"video": {""},
}
medias := ParseQuery(query)
require.Len(t, medias, 2)
require.Equal(t, KindVideo, medias[0].Kind)
require.Equal(t, KindAudio, medias[1].Kind)
}

func TestClone(t *testing.T) {
media1 := &Media{
Kind: KindVideo,
Expand Down
29 changes: 27 additions & 2 deletions pkg/rtsp/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ type Conn struct {
Media string
OnClose func() error
PacketSize uint16
Repack bool
SessionName string
Timeout int
Transport string // custom transport support, ex. RTSP over WebSocket
Expand Down Expand Up @@ -107,7 +108,7 @@ func (c *Conn) Handle() (err error) {

if c.Timeout == 0 {
// polling frames from remote RTSP Server (ex Camera)
timeout = time.Second * 5
timeout = defaultActiveProducerTimeout(c.URL)

if len(c.Receivers) == 0 || c.Transport == "udp" {
// if we only send audio to camera
Expand All @@ -121,7 +122,7 @@ func (c *Conn) Handle() (err error) {
case core.ModePassiveProducer:
// polling frames from remote RTSP Client (ex FFmpeg)
if c.Timeout == 0 {
timeout = time.Second * 15
timeout = defaultPassiveProducerTimeout(c.RemoteAddr)
} else {
timeout = time.Second * time.Duration(c.Timeout)
}
Expand Down Expand Up @@ -151,6 +152,30 @@ func (c *Conn) Handle() (err error) {
return
}

func defaultActiveProducerTimeout(uri *url.URL) time.Duration {
if uri != nil {
host := uri.Hostname()
switch host {
case "127.0.0.1", "::1", "localhost":
return 20 * time.Second
}
}

return 5 * time.Second
}

func defaultPassiveProducerTimeout(remoteAddr string) time.Duration {
host, _, err := net.SplitHostPort(remoteAddr)
if err == nil {
switch host {
case "127.0.0.1", "::1", "localhost":
return 60 * time.Second
}
}

return 15 * time.Second
}

func (c *Conn) handleKeepalive(ctx context.Context, d time.Duration) {
ticker := time.NewTicker(d)
for {
Expand Down
23 changes: 23 additions & 0 deletions pkg/rtsp/conn_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
package rtsp

import (
"net/url"
"testing"
"time"

"github.com/stretchr/testify/require"
)

func TestDefaultActiveProducerTimeout(t *testing.T) {
require.Equal(t, 20*time.Second, defaultActiveProducerTimeout(&url.URL{Host: "127.0.0.1:8554"}))
require.Equal(t, 20*time.Second, defaultActiveProducerTimeout(&url.URL{Host: "localhost:8554"}))
require.Equal(t, 5*time.Second, defaultActiveProducerTimeout(&url.URL{Host: "192.168.2.238:554"}))
require.Equal(t, 5*time.Second, defaultActiveProducerTimeout(nil))
}

func TestDefaultPassiveProducerTimeout(t *testing.T) {
require.Equal(t, 60*time.Second, defaultPassiveProducerTimeout("127.0.0.1:8554"))
require.Equal(t, 60*time.Second, defaultPassiveProducerTimeout("[::1]:8554"))
require.Equal(t, 15*time.Second, defaultPassiveProducerTimeout("192.168.2.238:554"))
require.Equal(t, 15*time.Second, defaultPassiveProducerTimeout("not-an-addr"))
}
Loading