RealSense visualizer

Connect to a RealSense camera, then visualize RGB-D readings as a point cloud. Requires pyrealsense2.
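pyrealsense2 and viser are both installable from PyPI. With a RealSense device attached, the script below can be run directly; on startup, viser prints a local URL (http://localhost:8080 by default) where the streaming point cloud can be viewed in a browser.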

from __future__ import annotations

import contextlib

import numpy as np
import numpy.typing as npt
import pyrealsense2 as rs  # type: ignore
import viser
from tqdm.auto import tqdm


@contextlib.contextmanager
def realsense_pipeline(fps: int = 30):
    """Context manager that yields a RealSense pipeline."""

    # Configure depth and color streams.
    pipeline = rs.pipeline()  # type: ignore
    config = rs.config()  # type: ignore

    pipeline_wrapper = rs.pipeline_wrapper(pipeline)  # type: ignore
    config.resolve(pipeline_wrapper)

    config.enable_stream(rs.stream.depth, rs.format.z16, fps)  # type: ignore
    config.enable_stream(rs.stream.color, rs.format.rgb8, fps)  # type: ignore

    # Start streaming.
    pipeline.start(config)

    yield pipeline

    # Stop streaming when done.
    pipeline.stop()

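The enable_stream calls above let the SDK choose default stream resolutions. As a variant, the five-argument overload of enable_stream takes an explicit width and height; below is a sketch of what the two calls inside realsense_pipeline could look like with pinned resolutions (640x480 is an assumption, not from the original):

    config.enable_stream(rs.stream.depth, 640, 480, rs.format.z16, fps)
    config.enable_stream(rs.stream.color, 640, 480, rs.format.rgb8, fps)
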
def point_cloud_arrays_from_frames(
    depth_frame, color_frame
) -> tuple[npt.NDArray[np.float32], npt.NDArray[np.uint8]]:
    """Maps RealSense frames to two arrays.

    Returns:
    - A point position array: (N, 3) float32.
    - A point color array: (N, 3) uint8.
    """
    # Processing blocks. Could be tuned.
    point_cloud = rs.pointcloud()  # type: ignore
    decimate = rs.decimation_filter()  # type: ignore
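    # A decimation magnitude of 3 downsamples the depth image by 3x along each
    # axis, which shrinks the point cloud and reduces per-frame work.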
    decimate.set_option(rs.option.filter_magnitude, 3)  # type: ignore

    # Downsample depth frame.
    depth_frame = decimate.process(depth_frame)

    # Map texture and calculate points from frames. Uses frame intrinsics.
    point_cloud.map_to(color_frame)
    points = point_cloud.calculate(depth_frame)

    # Get color coordinates.
    texture_uv = (
        np.asanyarray(points.get_texture_coordinates())
        .view(np.float32)
        .reshape((-1, 2))
    )
    color_image = np.asanyarray(color_frame.get_data())
    color_h, color_w, _ = color_image.shape

    # Note: for points that aren't in the view of our RGB camera, we currently clamp
    # to the closest available RGB pixel. We could also just remove these points; a
    # sketch of that variant follows this function.
    texture_uv = texture_uv.clip(0.0, 1.0)

    # Get positions and colors.
    positions = np.asanyarray(points.get_vertices()).view(np.float32)
    positions = positions.reshape((-1, 3))
    colors = color_image[
        (texture_uv[:, 1] * (color_h - 1.0)).astype(np.int32),
        (texture_uv[:, 0] * (color_w - 1.0)).astype(np.int32),
        :,
    ]
    N = positions.shape[0]

    assert positions.shape == (N, 3)
    assert positions.dtype == np.float32
    assert colors.shape == (N, 3)
    assert colors.dtype == np.uint8

    return positions, colors

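A sketch of the removal variant mentioned in the comments above: instead of clamping texture_uv, points that project outside the RGB image can be dropped by masking both arrays before the color lookup (this assumes positions has already been extracted and reshaped, so the two arrays stay aligned):

    # Keep only points whose texture coordinates land inside the RGB image.
    in_view = ((texture_uv >= 0.0) & (texture_uv <= 1.0)).all(axis=1)
    positions = positions[in_view]
    texture_uv = texture_uv[in_view]
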
def main():
    # Start visualization server.
    server = viser.ViserServer()

    with realsense_pipeline() as pipeline:
        for i in tqdm(range(10000000)):
            # Wait for a coherent pair of frames: depth and color.
            frames = pipeline.wait_for_frames()
            depth_frame = frames.get_depth_frame()
            color_frame = frames.get_color_frame()

            # Compute point cloud from frames.
            positions, colors = point_cloud_arrays_from_frames(depth_frame, color_frame)

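            # Rotate the points from the RealSense camera frame (+X right, +Y down,
            # +Z forward) into a Z-up world frame: (x, y, z) -> (x, z, -y).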
            R = np.array(
                [
                    [1.0, 0.0, 0.0],
                    [0.0, 0.0, 1.0],
                    [0.0, -1.0, 0.0],
                ],
                dtype=np.float32,
            )
            positions = positions @ R.T

            # Visualize.
            server.scene.add_point_cloud(
                "/realsense",
                points=positions * 10.0,
                colors=colors,
                point_size=0.1,
            )


if __name__ == "__main__":
    main()
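
Because re-adding a scene node at the same name ("/realsense") replaces the previous one, each loop iteration overwrites the last point cloud, which is what makes the live stream work. As one possible extension, here is a minimal sketch (assuming viser's GUI slider API, as used in viser's other examples) that exposes the point size as a live control, reusing the helpers defined above:

    def main_with_slider():
        server = viser.ViserServer()
        # The slider shows up in the GUI panel of the same browser window.
        point_size = server.gui.add_slider(
            "Point size", min=0.01, max=0.5, step=0.01, initial_value=0.1
        )
        with realsense_pipeline() as pipeline:
            while True:
                frames = pipeline.wait_for_frames()
                positions, colors = point_cloud_arrays_from_frames(
                    frames.get_depth_frame(), frames.get_color_frame()
                )
                # Re-adding at the same name replaces the previous cloud.
                server.scene.add_point_cloud(
                    "/realsense",
                    points=positions,
                    colors=colors,
                    point_size=point_size.value,
                )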