From 8f97cba70d71d688072e578b93e6b15ba179ed64 Mon Sep 17 00:00:00 2001 From: Jacob Date: Wed, 21 May 2025 10:10:03 +0900 Subject: [PATCH 1/2] fix ROS1/ROS2/Rosbags --- src/pypcd4/pypcd4.py | 52 +++++++++++++++++++++++++++++--------------- 1 file changed, 35 insertions(+), 17 deletions(-) diff --git a/src/pypcd4/pypcd4.py b/src/pypcd4/pypcd4.py index c8fc042..f1bfaf4 100644 --- a/src/pypcd4/pypcd4.py +++ b/src/pypcd4/pypcd4.py @@ -616,32 +616,44 @@ def to_msg(self, header: Optional["Header"] = None) -> "PointCloud2": """ ROS_MSG_AVAILABLE = False try: - from builtin_interfaces.msg import Time from sensor_msgs.msg import PointCloud2, PointField from std_msgs.msg import Header ROS_MSG_AVAILABLE = True + try: + from builtin_interfaces.msg import Time # ROS2 + except ImportError: + from std_msgs.msg import Time # ROS1 except ImportError: pass - try: - from rosbags.typesys.stores.latest import builtin_interfaces__msg__Time as Time - from rosbags.typesys.stores.latest import sensor_msgs__msg__PointCloud2 as PointCloud2 - from rosbags.typesys.stores.latest import sensor_msgs__msg__PointField as PointField - from rosbags.typesys.stores.latest import std_msgs__msg__Header as Header - ROS_MSG_AVAILABLE = True - except ImportError: - pass + if not ROS_MSG_AVAILABLE: + try: + # Fallback to rosbags + from rosbags.typesys.stores.latest import ( + builtin_interfaces__msg__Time as Time, + ) + from rosbags.typesys.stores.latest import ( + sensor_msgs__msg__PointCloud2 as PointCloud2, + ) + from rosbags.typesys.stores.latest import ( + sensor_msgs__msg__PointField as PointField, + ) + from rosbags.typesys.stores.latest import ( + std_msgs__msg__Header as Header, + ) + + ROS_MSG_AVAILABLE = True + except ImportError: + pass if not ROS_MSG_AVAILABLE: raise ImportError( - "The PointCloud.to_msg() method requires ROS sensor_msgs or rosbags installed.\n" - "Please install:\n" - " - ROS/ROS2 as appropriate (check official documentation), or\n" - " - rosbags via `pip install rosbags`. \n" - " Rosbags can be used to write to bags but not .publish(msg)" + "The PointCloud.to_msg() method requires ROS `sensor_msgs` or the `rosbags` pkg.\n" + "Please install either:\n" + " - ROS/ROS2 (check official documentation), or\n" + " - rosbags via `pip install rosbags`." ) - fields = [] itemsize = 0 row_step = 0 @@ -662,10 +674,16 @@ def to_msg(self, header: Optional["Header"] = None) -> "PointCloud2": ) offset += type_.itemsize * count - data = self.pc_data.tobytes() + data = self.pc_data.reshape(-1).view(np.uint8) + + if header is None: + try: + header = Header(stamp=Time(sec=0, nanosec=0), frame_id="") + except (TypeError, AttributeError): + header = Header(stamp=Time(0), frame_id="", seq=0) msg = PointCloud2( - header=Header(stamp=Time(sec=0, nanosec=0), frame_id="") if header is None else header, + header=header, height=1, width=self.points, is_dense=False, From b3069e19b22669959d0f92c850d44c918967f398 Mon Sep 17 00:00:00 2001 From: Jacob Date: Wed, 21 May 2025 10:10:56 +0900 Subject: [PATCH 2/2] add manual test for ROS1/ROS2 --- tests/test/test_pypcd4.py | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/tests/test/test_pypcd4.py b/tests/test/test_pypcd4.py index 56b223c..c1b22aa 100644 --- a/tests/test/test_pypcd4.py +++ b/tests/test/test_pypcd4.py @@ -961,4 +961,9 @@ def test_list_pointcloud(): concatenated_pc = PointCloud.from_list(pc_list) assert concatenated_pc.points == pc.points * num_pcs assert concatenated_pc.fields == pc.fields - assert concatenated_pc.types == pc.types \ No newline at end of file + assert concatenated_pc.types == pc.types + +if __name__ == "__main__": + print("Testing PointCloud msg functionality...") + test_pointcloud_tomsg() + print("Msg test passed!") \ No newline at end of file