Per-message allocation and copy in from_rmw_message causes large-message throughput to scale poorly #628

@azerupi

Summary

Every received message goes through an RmwMsg::default() allocation followed by an element-by-element conversion in from_rmw_message. For large payloads this is the dominant bottleneck: at 64 KB, rclrs throughput is ~18x lower than rclcpp's, even after eliminating the thread-per-spin overhead (see #627).

Minimal reproduction

Using the same benchmark structure as #627, but with a message size of 65536 bytes instead of 0 bytes.

rclrs (Rust)

use rclrs::{Context, CreateBasicExecutor, IntoPrimitiveOptions, QoSProfile, SpinOptions};
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::Arc;
use std::time::{Duration, Instant};

fn main() {
    let context = Context::default();
    let mut executor = context.create_basic_executor();
    let node = executor.create_node("bench").unwrap();

    let qos = QoSProfile::default().reliable().keep_last(1000);
    let recv_count = Arc::new(AtomicU64::new(0));

    let pub_handle = node
        .create_publisher::<std_msgs::msg::UInt8MultiArray>("bench_topic".qos(qos.clone()))
        .unwrap();

    let count = Arc::clone(&recv_count);
    let _sub = node
        .create_subscription::<std_msgs::msg::UInt8MultiArray, _>(
            "bench_topic".qos(qos.clone()),
            move |_msg: std_msgs::msg::UInt8MultiArray| {
                count.fetch_add(1, Ordering::Relaxed);
            },
        )
        .unwrap();

    let duration = Duration::from_secs(5);
    let mut sent: u64 = 0;
    let start = Instant::now();

    while start.elapsed() < duration {
        let mut msg = std_msgs::msg::UInt8MultiArray::default();
        msg.data = vec![0u8; 65536];
        pub_handle.publish(msg).ok();
        sent += 1;

        // Assumes a spin_some that runs on the calling thread (see issue #NNN)
        executor.spin_some(SpinOptions::new().timeout(Duration::ZERO));
    }

    let received = recv_count.load(Ordering::Relaxed);
    let elapsed = start.elapsed().as_secs_f64();
    eprintln!("Sent:       {sent}");
    eprintln!("Received:   {received}");
    eprintln!("Throughput: {:.0} msg/s", received as f64 / elapsed);
}

rclcpp (C++) equivalent

#include <rclcpp/rclcpp.hpp>
#include <std_msgs/msg/u_int8_multi_array.hpp>
#include <atomic>
#include <chrono>
#include <cstdio>

int main(int argc, char** argv) {
    rclcpp::init(argc, argv);

    auto executor = std::make_shared<rclcpp::executors::SingleThreadedExecutor>();
    auto node = std::make_shared<rclcpp::Node>("spin_bench");
    executor->add_node(node);

    auto qos = rclcpp::QoS(1000).reliable();
    std::atomic<uint64_t> recv_count{0};

    auto pub_handle = node->create_publisher<std_msgs::msg::UInt8MultiArray>("bench_topic", qos);

    auto sub = node->create_subscription<std_msgs::msg::UInt8MultiArray>(
        "bench_topic", qos,
        [&recv_count](const std_msgs::msg::UInt8MultiArray::SharedPtr) {
            recv_count.fetch_add(1, std::memory_order_relaxed);
        });

    uint64_t sent = 0;
    auto start = std::chrono::steady_clock::now();
    auto duration = std::chrono::seconds(5);

    while (std::chrono::steady_clock::now() - start < duration) {
        std_msgs::msg::UInt8MultiArray msg;
        msg.data.resize(65536, 0);
        pub_handle->publish(msg);
        sent++;

        executor->spin_some(std::chrono::milliseconds(0));
    }

    uint64_t received = recv_count.load();
    double elapsed = std::chrono::duration<double>(
        std::chrono::steady_clock::now() - start).count();

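    // %lu is not portable for uint64_t; cast to unsigned long long and use %llu
    fprintf(stderr, "Sent:       %llu\n", static_cast<unsigned long long>(sent));
    fprintf(stderr, "Received:   %llu\n", static_cast<unsigned long long>(received));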
    fprintf(stderr, "Throughput: %.0f msg/s\n", received / elapsed);

    rclcpp::shutdown();
    return 0;
}

Results (release build, 3 runs averaged, same machine)

Implementation      0 B payload    64 KB payload    Slowdown (64 KB vs 0 B)
rclcpp spin_some    ~450k msg/s    ~273k msg/s      1.6x
rclrs spin_some     ~373k msg/s    ~15k msg/s       25x

At 0 B, rclrs is within 1.2x of rclcpp (see #627). At 64 KB, it is 18x slower. rclcpp barely slows down with larger messages (1.6x), while rclrs degrades dramatically (25x).
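For reference, the ratios quoted above follow directly from the averaged throughputs in the table (each factor is the higher throughput divided by the lower one):

```latex
\begin{aligned}
\text{rclcpp, 0 B vs 64 KB:} &\quad \frac{450\text{k}}{273\text{k}} \approx 1.65 \approx 1.6\times \\
\text{rclrs, 0 B vs 64 KB:} &\quad \frac{373\text{k}}{15\text{k}} \approx 24.9 \approx 25\times \\
\text{rclcpp vs rclrs, 0 B:} &\quad \frac{450\text{k}}{373\text{k}} \approx 1.21 \approx 1.2\times \\
\text{rclcpp vs rclrs, 64 KB:} &\quad \frac{273\text{k}}{15\text{k}} \approx 18.2 \approx 18\times
\end{aligned}
```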

Most likely root cause

There are two sources of per-message overhead that scale with payload size.

1. Wasted RmwMsg::default() allocation

Every take() call creates a fresh RMW message via default() (subscription.rs:352):

let mut rmw_message = <T as Message>::RmwMsg::default();   // C heap alloc
Self::take_inner::<T>(self, &mut rmw_message)?;            // rcl_take fills it
Ok((T::from_rmw_message(rmw_message), message_info))       // convert to Rust

RmwMsg::default() calls the C init() function, which initializes every sequence and string field of the message. rcl_take then fills the C-side sequence with the 64 KB payload, from_rmw_message copies that data into a new Rust Vec, and the C-side buffer is freed as soon as the RMW message is dropped. Every received message therefore pays for an alloc/dealloc cycle on a buffer that exists only to be converted and thrown away.
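To make the cost pattern concrete, here is a minimal std-only sketch contrasting a fresh RMW message per take() with a scratch buffer owned by the subscription. FakeRmwMsg, fake_rcl_take, take_fresh and ReusingTaker are hypothetical stand-ins, not rclrs, rcl, or rosidl_runtime_rs APIs. The key assumption, labeled in the code, is that the take step can write into an existing buffer without reallocating it; whether a given RMW implementation actually does that is not established here.

```rust
// Hypothetical stand-ins only: not rclrs, rcl, or rosidl_runtime_rs APIs.

/// Stand-in for a C-side RMW message whose sequence field lives on the heap.
#[derive(Default)]
struct FakeRmwMsg {
    data: Vec<u8>,
}

/// Stand-in for rcl_take: overwrites `out` with the incoming payload.
/// ASSUMPTION: existing capacity is reused when it is large enough.
fn fake_rcl_take(incoming: &[u8], out: &mut FakeRmwMsg) {
    out.data.clear();
    out.data.extend_from_slice(incoming);
}

/// Current pattern: a fresh RMW message per take(), freed after conversion.
fn take_fresh(incoming: &[u8]) -> Vec<u8> {
    let mut rmw_message = FakeRmwMsg::default();
    fake_rcl_take(incoming, &mut rmw_message); // allocates a payload-sized buffer
    rmw_message.data.to_vec() // conversion allocates the Rust-side Vec
    // rmw_message and its buffer are dropped here, on every call
}

/// Proposed pattern: one scratch RMW message owned by the subscription.
struct ReusingTaker {
    scratch: FakeRmwMsg,
    /// Counts how many times the scratch buffer had to (re)allocate.
    buffer_allocations: usize,
}

impl ReusingTaker {
    fn new() -> Self {
        ReusingTaker { scratch: FakeRmwMsg::default(), buffer_allocations: 0 }
    }

    fn take(&mut self, incoming: &[u8]) -> Vec<u8> {
        let capacity_before = self.scratch.data.capacity();
        fake_rcl_take(incoming, &mut self.scratch);
        if self.scratch.data.capacity() != capacity_before {
            self.buffer_allocations += 1;
        }
        // The Rust-side copy is still needed: the callback receives an owned message.
        self.scratch.data.to_vec()
    }
}

fn main() {
    let payload = vec![0u8; 65536];

    // Fresh stand-in buffer each time: 1000 alloc/dealloc cycles.
    for _ in 0..1000 {
        let msg = take_fresh(&payload);
        assert_eq!(msg.len(), payload.len());
    }

    // Reused buffer: the scratch buffer grows once, on the first take.
    let mut taker = ReusingTaker::new();
    for _ in 0..1000 {
        let msg = taker.take(&payload);
        assert_eq!(msg.len(), payload.len());
    }
    println!("scratch buffer allocations: {}", taker.buffer_allocations); // prints "scratch buffer allocations: 1"
}
```

Note that buffer reuse only removes the RMW-side alloc/dealloc; the copy into the Rust Vec remains, which is why it pairs with the conversion fix below.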

2. Element-by-element sequence conversion in from_rmw_message

The generated from_rmw_message code converts Sequence<T> to Vec<T> via .into_iter().collect(). The SequenceIterator::next() implementation (rosidl_runtime_rs/src/sequence.rs) does this per element:

let elem = ptr.read();
ptr.write(std::mem::zeroed::<T>());  // writes zero back for EVERY element

For a 64 KB Sequence<u8>, this is 65,536 individual read + zero-write + insert cycles instead of a single memcpy.
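The difference can be reproduced outside ROS with a small std-only sketch. drain_elementwise is an illustrative re-implementation of the read-then-zero-write-then-push pattern described above (using the safe std::mem::replace instead of raw pointers), not the actual rosidl_runtime_rs code; copy_bulk uses slice::to_vec, which for Copy element types copies the whole slice at once. Timings depend heavily on the machine and on how well the optimizer handles the per-element loop, so no numbers are claimed here; run it with `cargo run --release`.

```rust
use std::hint::black_box;
use std::mem;
use std::time::Instant;

/// Mirrors the per-element pattern: move each element out, write a zero
/// back in its place, and push it onto the output Vec. Illustrative only.
fn drain_elementwise(buf: &mut [u8]) -> Vec<u8> {
    let mut out = Vec::with_capacity(buf.len());
    for slot in buf.iter_mut() {
        // Equivalent to `ptr.read()` followed by `ptr.write(0)`.
        out.push(mem::replace(slot, 0));
    }
    out
}

/// Bulk copy: for Copy element types, to_vec() copies the slice in one go.
fn copy_bulk(buf: &[u8]) -> Vec<u8> {
    buf.to_vec()
}

fn main() {
    const LEN: usize = 65536;
    const ITERS: u32 = 1_000;
    let source = vec![1u8; LEN];
    let mut scratch = vec![0u8; LEN];

    let start = Instant::now();
    for _ in 0..ITERS {
        scratch.fill(1); // refill, because the drain zeroes the buffer
        black_box(drain_elementwise(black_box(scratch.as_mut_slice())));
    }
    let elementwise = start.elapsed();

    let start = Instant::now();
    for _ in 0..ITERS {
        black_box(copy_bulk(black_box(source.as_slice())));
    }
    let bulk = start.elapsed();

    println!("element-wise: {elementwise:?}, bulk to_vec: {bulk:?}");
}
```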

How rclcpp avoids this

rclcpp uses C++ message types directly. The RMW layer writes into the final message object, so there is no temporary buffer, no type conversion step, and no element-by-element copying. The message exists in one place from rcl_take through callback invocation.

Possible improvements

  1. Efficient sequence conversion: For Copy types (all ROS 2 primitives), Sequence::as_slice().to_vec() compiles to a single memcpy. A From<Sequence<T>> for Vec<T> impl with a Copy bound, combined with a code generator change to emit .into() instead of .into_iter().collect() for BasicType sequences, would eliminate the per-element overhead.

  2. RMW message buffer reuse: Store a reusable RmwMsg buffer in the subscription instead of allocating a fresh one per take(). This avoids the wasted alloc/dealloc cycle from RmwMsg::default().
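Improvement 1 could look roughly like the following self-contained sketch. The Sequence type here is a hypothetical stand-in for rosidl_runtime_rs::Sequence<T> (the real one wraps a C-allocated buffer), and RmwUInt8MultiArray / UInt8MultiArray are simplified, hypothetical shapes of the generated types (the real UInt8MultiArray also has a `layout` field). The impl `From<Sequence<T>> for Vec<T>` is permitted by Rust's orphan rules in the crate that defines Sequence, because Sequence<T> is local there.

```rust
/// Hypothetical stand-in for rosidl_runtime_rs::Sequence<T>.
/// The real type owns a C-allocated buffer; a Vec plays that role here.
struct Sequence<T> {
    data: Vec<T>,
}

impl<T> Sequence<T> {
    fn as_slice(&self) -> &[T] {
        &self.data
    }
}

/// Proposed conversion: for Copy element types, a single bulk copy
/// instead of iterating element by element.
impl<T: Copy> From<Sequence<T>> for Vec<T> {
    fn from(seq: Sequence<T>) -> Vec<T> {
        // Copy the whole slice; the source buffer is freed when `seq` drops.
        seq.as_slice().to_vec()
    }
}

/// Simplified, hypothetical shapes of the generated message types.
struct RmwUInt8MultiArray {
    data: Sequence<u8>,
}

struct UInt8MultiArray {
    data: Vec<u8>,
}

/// What generated from_rmw_message code could emit for a BasicType sequence.
fn from_rmw_message(msg: RmwUInt8MultiArray) -> UInt8MultiArray {
    UInt8MultiArray {
        data: msg.data.into(), // instead of msg.data.into_iter().collect()
    }
}

fn main() {
    let rmw = RmwUInt8MultiArray {
        data: Sequence { data: vec![7u8; 65536] },
    };
    let msg = from_rmw_message(rmw);
    println!("converted {} bytes", msg.data.len()); // prints "converted 65536 bytes"
}
```

Restricting the impl to `T: Copy` keeps non-Copy sequences (strings, nested messages) on the existing per-element path, so only the code generator's output for BasicType fields would need to change.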

Impact

This affects any subscription receiving messages with variable-length fields (sequences, strings). The overhead scales linearly with payload size, making rclrs unsuitable for high-throughput large-message workloads (e.g. point clouds, images) without workarounds like ReadOnlyLoanedMessage.