Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 159 additions & 0 deletions payload_format.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,159 @@
// SPDX-FileCopyrightText: 2026 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtp

// MediaFormat describes the bitstream or sample representation passed to a payload format.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not sure how much is applicable here, but I think whenever possible the RFC should be referenced for naming and rational behind particular concepts. Even if its just a comment with the RFC # + section.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is mostly application logic but I'll try to reference all my reasons, and specs behind every interface.

type MediaFormat string

// PacketizeContext contains the RTP state and negotiated parameters available
// to a payload-format packetizer.
type PacketizeContext struct {
MTU int
PayloadType uint8
SSRC uint32
Timestamp uint32
Sequencer Sequencer
Params any
}

// MediaSample is the media input passed to a payload-format packetizer.
type MediaSample struct {
Payload []byte
Duration uint32
Format MediaFormat
Metadata any
}

// ExtensionWrite applies a payload-format-owned RTP header extension.
type ExtensionWrite interface {
Apply(*Header) error
}

// PayloadFragment is an RTP payload fragment plus the RTP header semantics
// owned by the payload format.
type PayloadFragment struct {
Payload []byte
Marker bool
TimestampOffset uint32
Extensions []ExtensionWrite
MutateHeader func(*Header) error
}

// PayloadFormatPacketizer packetizes media samples into RTP payload fragments.
type PayloadFormatPacketizer interface {
Packetize(ctx PacketizeContext, sample MediaSample, emit func(PayloadFragment) error) error
Flush(ctx PacketizeContext, emit func(PayloadFragment) error) error
Reset()
}

// PacketView exposes the RTP header and payload to payload-format depacketizers.
type PacketView struct {
Header *Header
Payload []byte
}

// PacketInfo describes payload-format sample boundaries and parsed packet metadata.
type PacketInfo struct {
StartsSample bool
EndsSample bool
KeyFrame bool
Metadata any
Comment thread
jwetzell marked this conversation as resolved.
}

// PayloadFormatDepacketizer inspects RTP packets and appends their media bytes
// to a sample being assembled.
type PayloadFormatDepacketizer interface {
Inspect(packet PacketView) (PacketInfo, error)
AppendToSample(dst []byte, packet PacketView) ([]byte, error)
Reset()
}

// LegacyPayloaderAdapter adapts a legacy Payloader to PayloadFormatPacketizer.
type LegacyPayloaderAdapter struct {
Payloader Payloader
}

// Packetize adapts the legacy Payloader Payload method to PayloadFormatPacketizer.
func (a LegacyPayloaderAdapter) Packetize(
ctx PacketizeContext,
sample MediaSample,
emit func(PayloadFragment) error,
) error {
payloads := a.Payloader.Payload(legacyPayloaderMTU(ctx.MTU), sample.Payload)
for i, payload := range payloads {
if err := emit(PayloadFragment{
Payload: payload,
Marker: i == len(payloads)-1,
}); err != nil {
return err
}
}

return nil
}

// Flush adapts legacy payloaders, which do not buffer pending payload fragments.
func (a LegacyPayloaderAdapter) Flush(_ PacketizeContext, _ func(PayloadFragment) error) error {
return nil
}

// Reset adapts legacy payloaders, which do not expose reset behavior.
func (a LegacyPayloaderAdapter) Reset() {}

// LegacyDepacketizerAdapter adapts a legacy Depacketizer to PayloadFormatDepacketizer.
type LegacyDepacketizerAdapter struct {
Depacketizer Depacketizer
}

// Inspect adapts the legacy partition boundary methods to PayloadFormatDepacketizer.
func (a LegacyDepacketizerAdapter) Inspect(packet PacketView) (PacketInfo, error) {
marker := false
if packet.Header != nil {
marker = packet.Header.Marker
}

return PacketInfo{
StartsSample: a.Depacketizer.IsPartitionHead(packet.Payload),
EndsSample: a.Depacketizer.IsPartitionTail(marker, packet.Payload),
}, nil
}

// AppendToSample adapts the legacy Unmarshal method to PayloadFormatDepacketizer.
func (a LegacyDepacketizerAdapter) AppendToSample(dst []byte, packet PacketView) ([]byte, error) {
media, err := a.Depacketizer.Unmarshal(packet.Payload)
if err != nil {
return dst, err
}

return append(dst, media...), nil
}

// Reset resets the wrapped depacketizer when it exposes reset behavior.
func (a LegacyDepacketizerAdapter) Reset() {
if resetter, ok := a.Depacketizer.(interface{ Reset() }); ok {
resetter.Reset()
}
}

func legacyPayloaderMTU(mtu int) uint16 {
const (
baseRTPHeaderSize = csrcOffset
maxUint16 = 1<<16 - 1
)

payloadMTU := mtu - baseRTPHeaderSize
if payloadMTU <= 0 {
return 0
}
if payloadMTU > maxUint16 {
return maxUint16
}

return uint16(payloadMTU) //nolint:gosec // payloadMTU is clamped to uint16 above.
}

var (
_ PayloadFormatPacketizer = LegacyPayloaderAdapter{}
_ PayloadFormatDepacketizer = LegacyDepacketizerAdapter{}
)
101 changes: 101 additions & 0 deletions payload_format_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
// SPDX-FileCopyrightText: 2026 The Pion community <https://pion.ly>
// SPDX-License-Identifier: MIT

package rtp

import (
"errors"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

type testPayloader struct {
mtu uint16
payload []byte
}

func (p *testPayloader) Payload(mtu uint16, payload []byte) [][]byte {
p.mtu = mtu
p.payload = payload

return [][]byte{
{0x01},
{0x02},
}
}

func TestLegacyPayloaderAdapter(t *testing.T) {
payloader := &testPayloader{}
adapter := LegacyPayloaderAdapter{Payloader: payloader}
sample := MediaSample{Payload: []byte{0xAA, 0xBB}}

var fragments []PayloadFragment
err := adapter.Packetize(PacketizeContext{MTU: 112}, sample, func(fragment PayloadFragment) error {
fragments = append(fragments, fragment)

return nil
})

require.NoError(t, err)
assert.Equal(t, uint16(100), payloader.mtu)
assert.Equal(t, sample.Payload, payloader.payload)
require.Len(t, fragments, 2)
assert.Equal(t, []byte{0x01}, fragments[0].Payload)
assert.False(t, fragments[0].Marker)
assert.Equal(t, []byte{0x02}, fragments[1].Payload)
assert.True(t, fragments[1].Marker)
}

func TestLegacyPayloaderAdapterEmitError(t *testing.T) {
expectedErr := errors.New("emit failed") // nolint:err113
adapter := LegacyPayloaderAdapter{Payloader: &testPayloader{}}

err := adapter.Packetize(PacketizeContext{MTU: 112}, MediaSample{}, func(PayloadFragment) error {
return expectedErr
})

assert.ErrorIs(t, err, expectedErr)
}

type testDepacketizer struct {
reset bool
}

func (d *testDepacketizer) Unmarshal(packet []byte) ([]byte, error) {
return append([]byte{0x00}, packet...), nil
}

func (d *testDepacketizer) IsPartitionHead(payload []byte) bool {
return len(payload) > 0 && payload[0] == 0x01
}

func (d *testDepacketizer) IsPartitionTail(marker bool, _ []byte) bool {
return marker
}

func (d *testDepacketizer) Reset() {
d.reset = true
}

func TestLegacyDepacketizerAdapter(t *testing.T) {
depacketizer := &testDepacketizer{}
adapter := LegacyDepacketizerAdapter{Depacketizer: depacketizer}
packet := PacketView{
Header: &Header{Marker: true},
Payload: []byte{0x01, 0x02},
}

info, err := adapter.Inspect(packet)
require.NoError(t, err)
assert.True(t, info.StartsSample)
assert.True(t, info.EndsSample)

sample, err := adapter.AppendToSample([]byte{0xFF}, packet)
require.NoError(t, err)
assert.Equal(t, []byte{0xFF, 0x00, 0x01, 0x02}, sample)

adapter.Reset()
assert.True(t, depacketizer.reset)
}
Loading