Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
52 changes: 35 additions & 17 deletions src/pypcd4/pypcd4.py
Original file line number Diff line number Diff line change
Expand Up @@ -616,32 +616,44 @@ def to_msg(self, header: Optional["Header"] = None) -> "PointCloud2":
"""
ROS_MSG_AVAILABLE = False
try:
from builtin_interfaces.msg import Time
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header

ROS_MSG_AVAILABLE = True
try:
from builtin_interfaces.msg import Time # ROS2
except ImportError:
from std_msgs.msg import Time # ROS1
except ImportError:
pass
try:
from rosbags.typesys.stores.latest import builtin_interfaces__msg__Time as Time
from rosbags.typesys.stores.latest import sensor_msgs__msg__PointCloud2 as PointCloud2
from rosbags.typesys.stores.latest import sensor_msgs__msg__PointField as PointField
from rosbags.typesys.stores.latest import std_msgs__msg__Header as Header

ROS_MSG_AVAILABLE = True
except ImportError:
pass
if not ROS_MSG_AVAILABLE:
try:
# Fallback to rosbags
from rosbags.typesys.stores.latest import (
builtin_interfaces__msg__Time as Time,
)
from rosbags.typesys.stores.latest import (
sensor_msgs__msg__PointCloud2 as PointCloud2,
)
from rosbags.typesys.stores.latest import (
sensor_msgs__msg__PointField as PointField,
)
from rosbags.typesys.stores.latest import (
std_msgs__msg__Header as Header,
)

ROS_MSG_AVAILABLE = True
except ImportError:
pass

if not ROS_MSG_AVAILABLE:
raise ImportError(
"The PointCloud.to_msg() method requires ROS sensor_msgs or rosbags installed.\n"
"Please install:\n"
" - ROS/ROS2 as appropriate (check official documentation), or\n"
" - rosbags via `pip install rosbags`. \n"
" Rosbags can be used to write to bags but not .publish(msg)"
"The PointCloud.to_msg() method requires ROS `sensor_msgs` or the `rosbags` pkg.\n"
"Please install either:\n"
" - ROS/ROS2 (check official documentation), or\n"
" - rosbags via `pip install rosbags`."
)

fields = []
itemsize = 0
row_step = 0
Expand All @@ -662,10 +674,16 @@ def to_msg(self, header: Optional["Header"] = None) -> "PointCloud2":
)
offset += type_.itemsize * count

data = self.pc_data.tobytes()
data = self.pc_data.reshape(-1).view(np.uint8)

if header is None:
try:
header = Header(stamp=Time(sec=0, nanosec=0), frame_id="")
except (TypeError, AttributeError):
header = Header(stamp=Time(0), frame_id="", seq=0)

msg = PointCloud2(
header=Header(stamp=Time(sec=0, nanosec=0), frame_id="") if header is None else header,
header=header,
height=1,
width=self.points,
is_dense=False,
Expand Down
7 changes: 6 additions & 1 deletion tests/test/test_pypcd4.py
Original file line number Diff line number Diff line change
Expand Up @@ -961,4 +961,9 @@ def test_list_pointcloud():
concatenated_pc = PointCloud.from_list(pc_list)
assert concatenated_pc.points == pc.points * num_pcs
assert concatenated_pc.fields == pc.fields
assert concatenated_pc.types == pc.types
assert concatenated_pc.types == pc.types

if __name__ == "__main__":
print("Testing PointCloud msg functionality...")
test_pointcloud_tomsg()
print("Msg test passed!")
Loading