RealSense visualizer

Connect to a RealSense camera, then visualize RGB-D readings as a point cloud. Requires pyrealsense2.

import contextlib
from typing import Tuple

import numpy as np
import numpy.typing as npt
import pyrealsense2 as rs  # type: ignore
import viser
from tqdm.auto import tqdm


@contextlib.contextmanager
def realsense_pipeline(fps: int = 30):
    """Context manager that yields a RealSense pipeline."""

    # Configure depth and color streams.
    pipeline = rs.pipeline()  # type: ignore
    config = rs.config()  # type: ignore

    pipeline_wrapper = rs.pipeline_wrapper(pipeline)  # type: ignore
    config.resolve(pipeline_wrapper)

    config.enable_stream(rs.stream.depth, rs.format.z16, fps)  # type: ignore
    config.enable_stream(rs.stream.color, rs.format.rgb8, fps)  # type: ignore

    # Start streaming.
    pipeline.start(config)

    yield pipeline

    # Stop the pipeline when done.
    pipeline.stop()


def point_cloud_arrays_from_frames(
    depth_frame, color_frame
) -> Tuple[npt.NDArray[np.float32], npt.NDArray[np.uint8]]:
    """Maps RealSense frames to two arrays.

    Returns:
    - A point position array: (N, 3) float32.
    - A point color array: (N, 3) uint8.
    """
    # Processing blocks. Could be tuned.
    point_cloud = rs.pointcloud()  # type: ignore
    decimate = rs.decimation_filter()  # type: ignore
    decimate.set_option(rs.option.filter_magnitude, 3)  # type: ignore

    # Downsample depth frame.
    depth_frame = decimate.process(depth_frame)

    # Map texture and calculate points from frames. Uses frame intrinsics.
    point_cloud.map_to(color_frame)
    points = point_cloud.calculate(depth_frame)

    # Get color coordinates.
    texture_uv = (
        np.asanyarray(points.get_texture_coordinates())
        .view(np.float32)
        .reshape((-1, 2))
    )
    color_image = np.asanyarray(color_frame.get_data())
    color_h, color_w, _ = color_image.shape

    # Note: points that aren't in the view of our RGB camera are currently
    # clamped to the closest available RGB pixel. We could also just remove
    # these points.
    texture_uv = texture_uv.clip(0.0, 1.0)

    # Get positions and colors.
    positions = np.asanyarray(points.get_vertices()).view(np.float32)
    positions = positions.reshape((-1, 3))
    colors = color_image[
        (texture_uv[:, 1] * (color_h - 1.0)).astype(np.int32),
        (texture_uv[:, 0] * (color_w - 1.0)).astype(np.int32),
        :,
    ]
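    # Sketch of the removal alternative mentioned above, left commented out.
    # The mask would ideally be computed from texture_uv *before* the clip;
    # with strict inequalities it also works post-clip, at the cost of
    # dropping points that land exactly on the image border.
    # in_view = ((texture_uv > 0.0) & (texture_uv < 1.0)).all(axis=-1)
    # positions, colors = positions[in_view], colors[in_view]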
    N = positions.shape[0]

    assert positions.shape == (N, 3)
    assert positions.dtype == np.float32
    assert colors.shape == (N, 3)
    assert colors.dtype == np.uint8

    return positions, colors


def main():
    # Start visualization server.
    viser_server = viser.ViserServer()

    with realsense_pipeline() as pipeline:
        for i in tqdm(range(10000000)):
            # Wait for a coherent pair of frames: depth and color.
            frames = pipeline.wait_for_frames()
            depth_frame = frames.get_depth_frame()
            color_frame = frames.get_color_frame()

            # Compute point cloud from frames.
            positions, colors = point_cloud_arrays_from_frames(depth_frame, color_frame)

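            # Rotate the points from the camera frame (+Z forward, +Y down in
            # the RealSense convention) into a Z-up world frame for viewing.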
            R = np.array(
                [
                    [1.0, 0.0, 0.0],
                    [0.0, 0.0, 1.0],
                    [0.0, -1.0, 0.0],
                ],
                dtype=np.float32,
            )
            positions = positions @ R.T

            # Visualize.
            viser_server.add_point_cloud(
                "/realsense",
                points=positions * 10.0,
                colors=colors,
                point_size=0.1,
            )


if __name__ == "__main__":
    main()
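
A possible refinement, not part of the example above: point_cloud_arrays_from_frames re-creates its rs.pointcloud and rs.decimation_filter processing blocks on every call. A minimal sketch of constructing them once and reusing them across frames, using the same pyrealsense2 calls as above (the bare while-loop structure here is an assumption, standing in for the tqdm loop):

import pyrealsense2 as rs  # type: ignore

# Construct the processing blocks a single time...
point_cloud = rs.pointcloud()  # type: ignore
decimate = rs.decimation_filter()  # type: ignore
decimate.set_option(rs.option.filter_magnitude, 3)  # type: ignore

with realsense_pipeline() as pipeline:
    while True:
        frames = pipeline.wait_for_frames()
        # ...then reuse them for every incoming frame pair.
        depth_frame = decimate.process(frames.get_depth_frame())
        point_cloud.map_to(frames.get_color_frame())
        points = point_cloud.calculate(depth_frame)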