Skip to content

Kinematic-ICP on sparsely sampled LiDAR data #42

@sendeniz

Description

@sendeniz

Description:
We are running kinematic-ICP on sparsely sampled Livox Mid 360 LiDAR data (~30K points per cloud) with wheel odometry (ϕ: rotation around z axis) and one timestamp per cloud (x, y, z) when entire point cloud was written. Data is collected when the robot moves 0.5 m or rotates 60°.

We converted the our collected data from .txt format as close as possible to kitti format (here) and convert it to a ROS bag (here) using a python script (see below) and replicated the TF tree and frame names using ros2 run tf2_tools view_frames.

Image

Fig1. : Working sample bag from you.

Image

Fig 2.: Our bag.

Problem:
Using base_link throws an error. Renaming it to base_footprint removes the error but only shows a black screen. We suspect base_footprint is expected in the TF tree or point cloud header, but could not locate it.

Error message when using same tree structure as original bag:
[kinematic_icp_offline_node-1] [WARN] [1756312916.914347488] [LookupTransform]: Could not find a connection between 'odom' and 'base_footprint' because they are not part of the same tree. Tf has two or more unconnected trees.

When renaming base_link to base_footprint no errors emerge but we observe a black screen.

Goal:
Ensure our bag matches the working sample structure so kinematic-ICP runs correctly.

Considerations are however, also about data, given our sample regime is kinematic-ICP suitable ? Is there a more convenient way than writing it into a bag to execute kinematic-icp ? One of your colleagues suggested kinematic icp instead of kiss-slam as kiss-slam does not support odometry.

Bag creation: Data to Ros Bag:

Terminal 1:
ros2 bag record /lidar_points /odom /tf /tf_static /joint_states /diagnostics -o rosbag15 --storage mcap

Terminal 2:
python3 data_to_ros_bag.py

Python script:
data_to_ros_bag.py

import rclpy
from rclpy.node import Node
import struct
from std_msgs.msg import Header
from sensor_msgs.msg import PointCloud2, PointField
from sensor_msgs_py import point_cloud2
from nav_msgs.msg import Odometry
from geometry_msgs.msg import Quaternion, TransformStamped
from tf2_ros import TransformBroadcaster
import tf_transformations
from datetime import datetime
from pathlib import Path

class DataPublisher(Node):
    def __init__(self):
        super().__init__('data_publisher')

        # Publishers
        self.point_pub = self.create_publisher(PointCloud2, '/lidar_points', 10)
        self.odom_pub = self.create_publisher(Odometry, '/odom', 10)
        self.tf_broadcaster = TransformBroadcaster(self)

        # Load data
        self.base_path = Path("~/Downloads/2025_07_01/2025_07_01_drive_0001_sync").expanduser()
        self.load_data()

        # Timer for publishing at 10Hz, maybe incorrect ?
        self.timer = self.create_timer(0.1, self.publish_next)
        self.index = 0

    def load_data(self):
        self.timestamps = []
        with open(self.base_path / 'livox_points/timestamps.txt') as f:
            for line in f:
                dt = datetime.strptime(line.strip(), '%Y-%m-%d %H:%M:%S.%f')
                self.timestamps.append(dt)

        self.point_files = sorted((self.base_path / 'livox_points/data').glob('*.bin'))
        self.odom_files = sorted((self.base_path / 'oxts/data').glob('*.txt'))

        assert len(self.timestamps) == len(self.point_files) == len(self.odom_files)

    def publish_next(self):
        if self.index >= len(self.timestamps):
            self.get_logger().info("Finished publishing all data.")
            rclpy.shutdown()
            return

        ros_stamp = self.get_clock().now().to_msg()

        # Read odometry
        x, y, phi = self.read_odom(self.odom_files[self.index])
        quat = tf_transformations.quaternion_from_euler(0, 0, phi)

        # Header for lidar
        header = Header()
        header.stamp = ros_stamp
        header.frame_id = "hesai_lidar"  # only in message, no TF ?

        # Read and enhance point cloud
        points = self.read_bin_points(self.point_files[self.index])
        enhanced_points = [(px, py, pz, 1.0, 0, float(self.timestamps[self.index].timestamp()))
                           for px, py, pz in points]

        fields = [
            PointField(name='x', offset=0,  datatype=PointField.FLOAT32, count=1),
            PointField(name='y', offset=4,  datatype=PointField.FLOAT32, count=1),
            PointField(name='z', offset=8,  datatype=PointField.FLOAT32, count=1),
            PointField(name='intensity', offset=12, datatype=PointField.FLOAT32, count=1),
            PointField(name='ring', offset=16, datatype=PointField.UINT16, count=1),
            PointField(name='timestamp', offset=18, datatype=PointField.FLOAT64, count=1),
        ]

        cloud_msg = point_cloud2.create_cloud(header, fields, enhanced_points)
        self.point_pub.publish(cloud_msg)

        # Publish odometry: odom -> base_link
        odom_msg = Odometry()
        odom_msg.header.stamp = ros_stamp
        odom_msg.header.frame_id = "odom"
        odom_msg.child_frame_id = "base_link"
        odom_msg.pose.pose.position.x = x
        odom_msg.pose.pose.position.y = y
        odom_msg.pose.pose.position.z = 0.0
        odom_msg.pose.pose.orientation = Quaternion(x=quat[0], y=quat[1], z=quat[2], w=quat[3])
        self.odom_pub.publish(odom_msg)

        # TF: odom -> base_link
        t_odom = TransformStamped()
        t_odom.header.stamp = ros_stamp
        t_odom.header.frame_id = "odom"
        t_odom.child_frame_id = "base_link"
        t_odom.transform.translation.x = x
        t_odom.transform.translation.y = y
        t_odom.transform.translation.z = 0.0
        t_odom.transform.rotation.x = quat[0]
        t_odom.transform.rotation.y = quat[1]
        t_odom.transform.rotation.z = quat[2]
        t_odom.transform.rotation.w = quat[3]
        self.tf_broadcaster.sendTransform(t_odom)

        # Wheel frames: base_link -> wheels
        wheel_frames = ['front_left_wheel', 'front_right_wheel', 'rear_left_wheel', 'rear_right_wheel']
        for wheel_frame in wheel_frames:
            t_wheel = TransformStamped()
            t_wheel.header.stamp = ros_stamp
            t_wheel.header.frame_id = "base_link"
            t_wheel.child_frame_id = wheel_frame
            t_wheel.transform.translation.x = 0.0
            t_wheel.transform.translation.y = 0.0
            t_wheel.transform.translation.z = 0.0
            t_wheel.transform.rotation.x = quat[0]
            t_wheel.transform.rotation.y = quat[1]
            t_wheel.transform.rotation.z = quat[2]
            t_wheel.transform.rotation.w = quat[3]
            self.tf_broadcaster.sendTransform(t_wheel)

        self.index += 1

    def read_bin_points(self, file_path):
        with open(file_path, 'rb') as f:
            data = f.read()
            num_points = len(data) // (3 * 4)
            points = struct.unpack('f' * num_points * 3, data)
            return [(points[i], points[i+1], points[i+2]) for i in range(0, len(points), 3)]

    def read_odom(self, file_path):
        with open(file_path, 'r') as f:
            line = f.readline().strip()
            x, y, phi = map(float, line.split())
            return (x, y, phi)

def main():
    rclpy.init()
    node = DataPublisher()
    rclpy.spin(node)

if __name__ == '__main__':
    main()

Metadata

Metadata

Assignees

No one assigned

    Labels

    No labels
    No labels

    Type

    No type

    Projects

    No projects

    Milestone

    No milestone

    Relationships

    None yet

    Development

    No branches or pull requests

    Issue actions