RealSense visualizer

Connect to a RealSense camera, then visualize its RGB-D readings as a point cloud. Requires pyrealsense2.
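pyrealsense2 is Intel's Python wrapper around librealsense; prebuilt wheels are available on PyPI for common platforms (other platforms may need a source build):

pip install pyrealsense2

Once the script is running, open the URL that viser prints to the console (typically http://localhost:8080) to see the streamed point cloud.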

from __future__ import annotations

import contextlib

import numpy as np
import numpy.typing as npt
import pyrealsense2 as rs  # type: ignore
from tqdm.auto import tqdm

import viser


@contextlib.contextmanager
def realsense_pipeline(fps: int = 30):
    """Context manager that yields a RealSense pipeline."""

    # Configure depth and color streams.
    pipeline = rs.pipeline()  # type: ignore
    config = rs.config()  # type: ignore

    pipeline_wrapper = rs.pipeline_wrapper(pipeline)  # type: ignore
    config.resolve(pipeline_wrapper)

    config.enable_stream(rs.stream.depth, rs.format.z16, fps)  # type: ignore
    config.enable_stream(rs.stream.color, rs.format.rgb8, fps)  # type: ignore

    # Start streaming.
    pipeline.start(config)

    try:
        yield pipeline
    finally:
        # Stop the pipeline when done, even if an exception was raised.
        pipeline.stop()


def point_cloud_arrays_from_frames(
    depth_frame, color_frame
) -> tuple[npt.NDArray[np.float32], npt.NDArray[np.uint8]]:
    """Maps RealSense frames to two arrays.

    Returns:
    - A point position array: (N, 3) float32.
    - A point color array: (N, 3) uint8.
    """
    # Processing blocks; the decimation magnitude could be tuned.
    point_cloud = rs.pointcloud()  # type: ignore
    decimate = rs.decimation_filter()  # type: ignore
    decimate.set_option(rs.option.filter_magnitude, 3)  # type: ignore

    # Downsample depth frame.
    depth_frame = decimate.process(depth_frame)

    # Map texture and calculate points from frames. Uses frame intrinsics.
    point_cloud.map_to(color_frame)
    points = point_cloud.calculate(depth_frame)

    # Get color coordinates.
    texture_uv = (
        np.asanyarray(points.get_texture_coordinates())
        .view(np.float32)
        .reshape((-1, 2))
    )
    color_image = np.asanyarray(color_frame.get_data())
    color_h, color_w, _ = color_image.shape

    # Note: for points that aren't in the view of our RGB camera, we currently
    # clamp to the closest available RGB pixel. We could also just remove these
    # points; a sketch of that variant follows the listing.
    texture_uv = texture_uv.clip(0.0, 1.0)

    # Get positions and colors.
    positions = np.asanyarray(points.get_vertices()).view(np.float32)
    positions = positions.reshape((-1, 3))
    colors = color_image[
        (texture_uv[:, 1] * (color_h - 1.0)).astype(np.int32),
        (texture_uv[:, 0] * (color_w - 1.0)).astype(np.int32),
        :,
    ]
    N = positions.shape[0]

    assert positions.shape == (N, 3)
    assert positions.dtype == np.float32
    assert colors.shape == (N, 3)
    assert colors.dtype == np.uint8

    return positions, colors


def main():
    # Start visualization server.
    server = viser.ViserServer()

    with realsense_pipeline() as pipeline:
        for _ in tqdm(range(10000000)):
            # Wait for a coherent pair of frames: depth and color.
            frames = pipeline.wait_for_frames()
            depth_frame = frames.get_depth_frame()
            color_frame = frames.get_color_frame()

            # Compute point cloud from frames.
            positions, colors = point_cloud_arrays_from_frames(depth_frame, color_frame)

            # Rotate from the camera frame (+Z forward, +Y down) into a Z-up
            # world frame for visualization: (x, y, z) -> (x, z, -y).
            R = np.array(
                [
                    [1.0, 0.0, 0.0],
                    [0.0, 0.0, 1.0],
                    [0.0, -1.0, 0.0],
                ],
                dtype=np.float32,
            )
            positions = positions @ R.T

            # Visualize. Positions are scaled up to make the cloud easier to
            # navigate in the default view.
            server.scene.add_point_cloud(
                "/realsense",
                points=positions * 10.0,
                colors=colors,
                point_size=0.1,
            )


if __name__ == "__main__":
    main()
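
The clamping note in point_cloud_arrays_from_frames mentions that out-of-view points could be removed instead. A minimal sketch of that variant, assuming the mask is computed from the unclamped texture coordinates (the helper name and signature are illustrative, not part of viser or pyrealsense2):

from __future__ import annotations

import numpy as np
import numpy.typing as npt


def drop_out_of_view_points(
    positions: npt.NDArray[np.float32],
    texture_uv: npt.NDArray[np.float32],
) -> tuple[npt.NDArray[np.float32], npt.NDArray[np.float32]]:
    """Hypothetical helper: keep only points the RGB camera can see."""
    # UVs outside [0, 1] belong to depth points with no corresponding RGB
    # pixel; drop those points instead of clamping their UVs.
    in_view = np.all((texture_uv >= 0.0) & (texture_uv <= 1.0), axis=-1)
    return positions[in_view], texture_uv[in_view]

The surviving texture coordinates can then index color_image directly, with no clip() needed; the tradeoff is a point count that varies from frame to frame.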