Skip to content

Commit 3483485

Browse files
adding more rpc / audio latency stress tests
1 parent 43b7301 commit 3483485

2 files changed

Lines changed: 877 additions & 1 deletion

File tree

Lines changed: 373 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,373 @@
1+
/*
2+
* Copyright 2026 LiveKit
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
#include "../common/test_common.h"
18+
#include <atomic>
19+
#include <chrono>
20+
#include <cmath>
21+
#include <condition_variable>
22+
#include <cstdint>
23+
#include <mutex>
24+
#include <set>
25+
#include <thread>
26+
#include <vector>
27+
28+
namespace livekit {
29+
namespace test {
30+
31+
using namespace std::chrono_literals;
32+
33+
namespace {
34+
35+
// Test-pattern media geometry: a small 640x360 frame keeps encode cost low.
constexpr int kVideoWidth = 640;
constexpr int kVideoHeight = 360;
// Mono 48 kHz audio pushed in 10 ms frames (a typical WebRTC frame size).
constexpr int kAudioSampleRate = 48000;
constexpr int kAudioChannels = 1;
constexpr int kAudioFrameMs = 10;
// Samples per channel per audio frame: 48000 * 10 / 1000 = 480.
constexpr int kSamplesPerChannel = kAudioSampleRate * kAudioFrameMs / 1000;
41+
42+
// Shared state written from the room-delegate callback thread and read by the
// test thread. All fields below `mutex` are guarded by it; `cv` is notified
// on every subscription event so waiters can re-check their predicate.
struct MediaSubscriptionState {
  std::mutex mutex;
  std::condition_variable cv;
  // Publication names observed via onTrackSubscribed (set => deduplicated).
  std::set<std::string> subscribed_track_names;
  // Per-kind counters; only incremented when the event carries a track.
  int audio_tracks = 0;
  int video_tracks = 0;
};
49+
50+
class MediaTrackCollectorDelegate : public RoomDelegate {
51+
public:
52+
explicit MediaTrackCollectorDelegate(MediaSubscriptionState &state)
53+
: state_(state) {}
54+
55+
void onTrackSubscribed(Room &, const TrackSubscribedEvent &event) override {
56+
std::lock_guard<std::mutex> lock(state_.mutex);
57+
std::string name = "<unknown>";
58+
std::string sid = "<unknown>";
59+
if (event.track) {
60+
if (event.track->kind() == TrackKind::KIND_AUDIO) {
61+
state_.audio_tracks++;
62+
} else if (event.track->kind() == TrackKind::KIND_VIDEO) {
63+
state_.video_tracks++;
64+
}
65+
sid = event.track->sid();
66+
}
67+
if (event.publication) {
68+
name = event.publication->name();
69+
state_.subscribed_track_names.insert(name);
70+
}
71+
std::cerr << "[MediaMultiStream] onTrackSubscribed name=" << name
72+
<< " sid=" << sid << " audio_count=" << state_.audio_tracks
73+
<< " video_count=" << state_.video_tracks << std::endl;
74+
state_.cv.notify_all();
75+
}
76+
77+
private:
78+
MediaSubscriptionState &state_;
79+
};
80+
81+
// Paints a flat magenta-ish test pattern whose blue channel drifts with the
// frame index, so consecutive frames remain distinguishable downstream.
void fillWebcamLikeFrame(VideoFrame &frame, std::uint64_t frame_index) {
  // ARGB layout: [A, R, G, B]
  std::uint8_t *pixels = frame.data();
  const std::size_t byte_count = frame.dataSize();
  const auto blue = static_cast<std::uint8_t>((frame_index * 3) % 255);
  for (std::size_t offset = 0; offset < byte_count; offset += 4) {
    pixels[offset + 0] = 255;   // A
    pixels[offset + 1] = 0;     // R
    pixels[offset + 2] = 170;   // G
    pixels[offset + 3] = blue;  // B: varies per frame
  }
}
93+
94+
// Paints a solid red frame, then stamps the frame counter and timestamp
// (little-endian, 8 bytes each) into the first 16 pixels so a receiver can
// recover them for latency debugging.
void fillRedFrameWithMetadata(VideoFrame &frame, std::uint64_t frame_index,
                              std::uint64_t timestamp_us) {
  // ARGB layout: [A, R, G, B]
  std::uint8_t *pixels = frame.data();
  const std::size_t byte_count = frame.dataSize();

  // Solid red background.
  for (std::size_t offset = 0; offset < byte_count; offset += 4) {
    pixels[offset + 0] = 255;  // A
    pixels[offset + 1] = 255;  // R
    pixels[offset + 2] = 0;    // G
    pixels[offset + 3] = 0;    // B
  }

  // Encode frame counter + timestamp into first 16 pixels for easy debugging.
  std::uint8_t meta[16];
  for (int b = 0; b < 8; ++b) {
    meta[b] = static_cast<std::uint8_t>((frame_index >> (b * 8)) & 0xFF);
    meta[8 + b] = static_cast<std::uint8_t>((timestamp_us >> (b * 8)) & 0xFF);
  }
  // G channel carries meta forward, B channel carries it mirrored.
  for (int pixel = 0; pixel < 16; ++pixel) {
    const std::size_t px = static_cast<std::size_t>(pixel) * 4;
    if (px + 3 < byte_count) {
      pixels[px + 0] = 255;
      pixels[px + 1] = 255;
      pixels[px + 2] = meta[pixel];
      pixels[px + 3] = meta[15 - pixel];
    }
  }
}
122+
123+
void runVideoLoop(const std::shared_ptr<VideoSource> &source,
124+
std::atomic<bool> &running,
125+
void (*fill_fn)(VideoFrame &, std::uint64_t, std::uint64_t)) {
126+
VideoFrame frame =
127+
VideoFrame::create(kVideoWidth, kVideoHeight, VideoBufferType::ARGB);
128+
std::uint64_t frame_index = 0;
129+
while (running.load(std::memory_order_relaxed)) {
130+
const auto now = std::chrono::steady_clock::now().time_since_epoch();
131+
const auto ts_us = static_cast<std::uint64_t>(
132+
std::chrono::duration_cast<std::chrono::microseconds>(now).count());
133+
fill_fn(frame, frame_index, ts_us);
134+
try {
135+
source->captureFrame(frame, static_cast<std::int64_t>(ts_us),
136+
VideoRotation::VIDEO_ROTATION_0);
137+
} catch (...) {
138+
break;
139+
}
140+
frame_index++;
141+
std::this_thread::sleep_for(33ms);
142+
}
143+
}
144+
145+
void fillWebcamWrapper(VideoFrame &frame, std::uint64_t frame_index,
146+
std::uint64_t) {
147+
fillWebcamLikeFrame(frame, frame_index);
148+
}
149+
150+
void fillRedWrapper(VideoFrame &frame, std::uint64_t frame_index,
151+
std::uint64_t timestamp_us) {
152+
fillRedFrameWithMetadata(frame, frame_index, timestamp_us);
153+
}
154+
155+
void runToneLoop(const std::shared_ptr<AudioSource> &source,
156+
std::atomic<bool> &running, double base_freq_hz,
157+
bool siren_mode) {
158+
double phase = 0.0;
159+
constexpr double kTwoPi = 6.283185307179586;
160+
while (running.load(std::memory_order_relaxed)) {
161+
AudioFrame frame = AudioFrame::create(kAudioSampleRate, kAudioChannels,
162+
kSamplesPerChannel);
163+
auto &samples = frame.data();
164+
165+
const double time_sec =
166+
static_cast<double>(
167+
std::chrono::duration_cast<std::chrono::milliseconds>(
168+
std::chrono::steady_clock::now().time_since_epoch())
169+
.count()) /
170+
1000.0;
171+
const double freq =
172+
siren_mode ? (700.0 + 250.0 * std::sin(time_sec * 2.0)) : base_freq_hz;
173+
174+
const double phase_inc =
175+
kTwoPi * freq / static_cast<double>(kAudioSampleRate);
176+
for (int i = 0; i < kSamplesPerChannel; ++i) {
177+
samples[static_cast<std::size_t>(i)] =
178+
static_cast<std::int16_t>(std::sin(phase) * 12000.0);
179+
phase += phase_inc;
180+
if (phase > kTwoPi) {
181+
phase -= kTwoPi;
182+
}
183+
}
184+
185+
try {
186+
source->captureFrame(frame);
187+
} catch (...) {
188+
break;
189+
}
190+
191+
std::this_thread::sleep_for(std::chrono::milliseconds(kAudioFrameMs));
192+
}
193+
}
194+
195+
} // namespace
196+
197+
// Integration fixture: one room publishes many media tracks while a second
// room verifies it subscribes to all of them. The shared body is
// parameterized on the peer-connection topology.
class MediaMultiStreamIntegrationTest : public LiveKitTestBase {
 protected:
  // NOTE(review): despite the name, this publishes kVideoTrackCount (10)
  // video and kAudioTrackCount (10) audio tracks, not two of each — consider
  // renaming in a follow-up (callers are the two TEST_F bodies below).
  void runPublishTwoVideoAndTwoAudioTracks(bool single_peer_connection);
};
201+
202+
// Publishes 10 video + 10 audio tracks from a sender room, drives live
// frames into each from dedicated threads, and asserts that a receiver room
// (auto_subscribe on) subscribes to every one of them. `single_peer_connection`
// selects the transport topology under test.
void MediaMultiStreamIntegrationTest::runPublishTwoVideoAndTwoAudioTracks(
    bool single_peer_connection) {
  // Requires live-server credentials from the environment; skip otherwise.
  if (!config_.available) {
    GTEST_SKIP() << "LIVEKIT_URL, LIVEKIT_CALLER_TOKEN, and "
                    "LIVEKIT_RECEIVER_TOKEN not set";
  }

  RoomOptions options;
  options.auto_subscribe = true;
  options.single_peer_connection = single_peer_connection;

  MediaSubscriptionState receiver_state;
  MediaTrackCollectorDelegate receiver_delegate(receiver_state);

  // Connect the receiver first so its delegate observes every publication.
  auto receiver_room = std::make_unique<Room>();
  receiver_room->setDelegate(&receiver_delegate);
  ASSERT_TRUE(
      receiver_room->Connect(config_.url, config_.receiver_token, options))
      << "Receiver failed to connect";

  auto sender_room = std::make_unique<Room>();
  ASSERT_TRUE(sender_room->Connect(config_.url, config_.caller_token, options))
      << "Sender failed to connect";

  // NOTE(review): receiver_identity is captured but currently unused.
  const std::string receiver_identity =
      receiver_room->localParticipant()->identity();
  const std::string sender_identity =
      sender_room->localParticipant()->identity();

  // NOTE(review): the method name says "Two", but the stress test publishes
  // 10 of each kind.
  constexpr int kVideoTrackCount = 10;
  constexpr int kAudioTrackCount = 10;

  // Sources/tracks/publications are kept alive for the whole test; threads
  // feed the sources until `running` is cleared below.
  std::vector<std::shared_ptr<VideoSource>> video_sources;
  std::vector<std::shared_ptr<LocalVideoTrack>> video_tracks;
  std::vector<std::shared_ptr<LocalTrackPublication>> video_pubs;
  std::vector<std::shared_ptr<AudioSource>> audio_sources;
  std::vector<std::shared_ptr<LocalAudioTrack>> audio_tracks;
  std::vector<std::shared_ptr<LocalTrackPublication>> audio_pubs;
  std::vector<std::thread> threads;
  std::set<std::string> expected_names;

  video_sources.reserve(kVideoTrackCount);
  video_tracks.reserve(kVideoTrackCount);
  video_pubs.reserve(kVideoTrackCount);
  audio_sources.reserve(kAudioTrackCount);
  audio_tracks.reserve(kAudioTrackCount);
  audio_pubs.reserve(kAudioTrackCount);
  threads.reserve(kVideoTrackCount + kAudioTrackCount);

  // Publish video tracks, alternating camera / screenshare source types.
  for (int i = 0; i < kVideoTrackCount; ++i) {
    auto source = std::make_shared<VideoSource>(kVideoWidth, kVideoHeight);
    const std::string name = "video-track-" + std::to_string(i);
    auto track = LocalVideoTrack::createLocalVideoTrack(name, source);
    TrackPublishOptions opts;
    opts.source = (i % 2 == 0) ? TrackSource::SOURCE_CAMERA
                               : TrackSource::SOURCE_SCREENSHARE;
    auto pub = sender_room->localParticipant()->publishTrack(track, opts);
    std::cerr << "[MediaMultiStream] published video " << name
              << " sid=" << pub->sid() << std::endl;
    video_sources.push_back(source);
    video_tracks.push_back(track);
    video_pubs.push_back(pub);
    expected_names.insert(name);
  }

  // Publish audio tracks, alternating microphone / screenshare-audio.
  for (int i = 0; i < kAudioTrackCount; ++i) {
    auto source =
        std::make_shared<AudioSource>(kAudioSampleRate, kAudioChannels, 0);
    const std::string name = "audio-track-" + std::to_string(i);
    auto track = LocalAudioTrack::createLocalAudioTrack(name, source);
    TrackPublishOptions opts;
    opts.source = (i % 2 == 0) ? TrackSource::SOURCE_MICROPHONE
                               : TrackSource::SOURCE_SCREENSHARE_AUDIO;
    auto pub = sender_room->localParticipant()->publishTrack(track, opts);
    std::cerr << "[MediaMultiStream] published audio " << name
              << " sid=" << pub->sid() << std::endl;
    audio_sources.push_back(source);
    audio_tracks.push_back(track);
    audio_pubs.push_back(pub);
    expected_names.insert(name);
  }

  // Spawn one capture thread per track. Odd indices use the red metadata
  // frame / siren tone variants; even indices use the static webcam-like
  // pattern / fixed tone. Threads stop when `running` is cleared.
  std::atomic<bool> running{true};
  for (int i = 0; i < kVideoTrackCount; ++i) {
    auto source = video_sources[static_cast<std::size_t>(i)];
    const bool red_mode = (i % 2 == 1);
    threads.emplace_back([source, &running, red_mode]() {
      runVideoLoop(source, running,
                   red_mode ? fillRedWrapper : fillWebcamWrapper);
    });
  }
  for (int i = 0; i < kAudioTrackCount; ++i) {
    auto source = audio_sources[static_cast<std::size_t>(i)];
    const bool siren_mode = (i % 2 == 1);
    const double base_freq = 320.0 + static_cast<double>(i) * 40.0;
    threads.emplace_back([source, &running, base_freq, siren_mode]() {
      runToneLoop(source, running, base_freq, siren_mode);
    });
  }

  // Wait up to 20 s for the receiver to report every expected track name;
  // on timeout, dump what did arrive to ease debugging.
  {
    std::unique_lock<std::mutex> lock(receiver_state.mutex);
    const bool all_received = receiver_state.cv.wait_for(lock, 20s, [&]() {
      return receiver_state.subscribed_track_names.size() >=
             expected_names.size();
    });
    EXPECT_TRUE(all_received) << "Timed out waiting for all subscribed tracks";
    if (!all_received) {
      std::cerr << "[MediaMultiStream] timeout waiting subscriptions; received "
                   "names:";
      for (const auto &n : receiver_state.subscribed_track_names) {
        std::cerr << " " << n;
      }
      std::cerr << " (audio=" << receiver_state.audio_tracks
                << " video=" << receiver_state.video_tracks << ")" << std::endl;
    }
  }

  // Verify every expected name arrived and the per-kind counts are complete.
  {
    std::lock_guard<std::mutex> lock(receiver_state.mutex);
    for (const auto &expected_name : expected_names) {
      EXPECT_TRUE(receiver_state.subscribed_track_names.count(expected_name) >
                  0)
          << "Missing subscribed track: " << expected_name;
    }
    EXPECT_GE(receiver_state.video_tracks, kVideoTrackCount);
    EXPECT_GE(receiver_state.audio_tracks, kAudioTrackCount);
  }

  // Cross-check via the receiver's remote-participant view of the sender.
  auto *sender_on_receiver = receiver_room->remoteParticipant(sender_identity);
  ASSERT_NE(sender_on_receiver, nullptr);
  std::cerr << "[MediaMultiStream] receiver sees sender publications="
            << sender_on_receiver->trackPublications().size() << std::endl;
  for (const auto &kv : sender_on_receiver->trackPublications()) {
    const auto &pub = kv.second;
    std::cerr << "[MediaMultiStream] remote publication sid=" << kv.first
              << " name=" << (pub ? pub->name() : "<null>") << " kind="
              << (pub && pub->kind() == TrackKind::KIND_AUDIO ? "audio"
                                                              : "video")
              << " source=" << (pub ? static_cast<int>(pub->source()) : -1)
              << std::endl;
  }
  EXPECT_GE(sender_on_receiver->trackPublications().size(),
            static_cast<std::size_t>(kVideoTrackCount + kAudioTrackCount));

  // Stop all capture threads before unpublishing so no thread pushes frames
  // into a track that is being torn down.
  running.store(false, std::memory_order_relaxed);
  for (auto &t : threads) {
    if (t.joinable()) {
      t.join();
    }
  }

  for (const auto &pub : video_pubs) {
    sender_room->localParticipant()->unpublishTrack(pub->sid());
  }
  for (const auto &pub : audio_pubs) {
    sender_room->localParticipant()->unpublishTrack(pub->sid());
  }
}
361+
362+
// Exercises the multi-track publish/subscribe flow with separate publisher
// and subscriber peer connections (single_peer_connection = false).
TEST_F(MediaMultiStreamIntegrationTest,
       PublishTwoVideoAndTwoAudioTracks_DualPeerConnection) {
  runPublishTwoVideoAndTwoAudioTracks(false);
}
366+
367+
// Same flow, but with publisher and subscriber sharing one peer connection
// (single_peer_connection = true).
TEST_F(MediaMultiStreamIntegrationTest,
       PublishTwoVideoAndTwoAudioTracks_SinglePeerConnection) {
  runPublishTwoVideoAndTwoAudioTracks(true);
}
371+
372+
} // namespace test
373+
} // namespace livekit

0 commit comments

Comments
 (0)