from __future__ import annotations
import asyncio
import copy
import dataclasses
import warnings
from collections.abc import Coroutine
from typing import (
TYPE_CHECKING,
Any,
Callable,
Dict,
Generic,
Literal,
Optional,
Protocol,
Tuple,
TypeVar,
Union,
cast,
get_args,
overload,
)
import numpy as np
import numpy.typing as npt
from typing_extensions import Self, deprecated, override
from . import _messages
from ._assignable_props_api import AssignablePropsBase
from .infra._infra import WebsockClientConnection, WebsockServer
if TYPE_CHECKING:
from ._gui_api import GuiApi
from ._scene_api import SceneApi
from ._viser import ClientHandle
from .infra import ClientId
[docs]
@dataclasses.dataclass(frozen=True)
class SceneClickEvent:
"""Event passed to scene-level click callbacks (``SceneApi.on_click``)."""
client: ClientHandle
"""Client that triggered this event."""
client_id: int
"""ID of client that triggered this event."""
ray_origin: tuple[float, float, float]
"""Origin of the 3D ray corresponding to this click, in world coordinates."""
ray_direction: tuple[float, float, float]
"""Direction of the 3D ray corresponding to this click, in world coordinates."""
screen_pos: tuple[float, float]
"""Screen position of the click in OpenCV image coordinates (0 to 1).
(0, 0) is the upper-left corner, (1, 1) is the bottom-right corner."""
modifier: _messages.KeyModifier | None
"""Modifier-combo held at click time. ``None`` if no modifiers
were held; otherwise a canonical :data:`KeyModifier` string."""
[docs]
@dataclasses.dataclass(frozen=True)
class SceneRectSelectEvent:
"""Event passed to scene rectangle-select callbacks
(``SceneApi.on_rect_select``)."""
client: ClientHandle
"""Client that triggered this event."""
client_id: int
"""ID of client that triggered this event."""
screen_min: tuple[float, float]
"""Min-corner of the selection rectangle in OpenCV image coordinates
(0 to 1)."""
screen_max: tuple[float, float]
"""Max-corner of the selection rectangle."""
modifier: _messages.KeyModifier | None
"""Modifier-combo held at gesture start. ``None`` if no modifiers
were held; otherwise a canonical :data:`KeyModifier` string."""
[docs]
@dataclasses.dataclass(frozen=True)
class ScenePointerEvent:
"""Event passed to scene pointer callbacks (legacy ``on_pointer_event``).
.. deprecated::
Use :meth:`SceneApi.on_click` with :class:`SceneClickEvent` or
:meth:`SceneApi.on_rect_select` with :class:`SceneRectSelectEvent`
instead. This shape unions the click and rect-select cases into a
single dataclass with awkward Optional/variable-length fields.
"""
client: ClientHandle
"""Client that triggered this event."""
client_id: int
"""ID of client that triggered this event."""
event_type: _messages.ScenePointerEventType
"""Type of event that was triggered. Currently we only support clicks and box selections."""
ray_origin: tuple[float, float, float] | None
"""Origin of 3D ray corresponding to this click, in world coordinates."""
ray_direction: tuple[float, float, float] | None
"""Direction of 3D ray corresponding to this click, in world coordinates."""
screen_pos: tuple[tuple[float, float], ...]
"""Screen position of the click on the screen (OpenCV image coordinates, 0 to 1).
(0, 0) is the upper-left corner, (1, 1) is the bottom-right corner.
For a box selection, this includes the min- and max- corners of the box."""
modifier: _messages.KeyModifier | None
"""Modifier-combo held when this event fired. ``None`` if no
modifiers were held; otherwise a canonical :data:`KeyModifier`
string."""
@property
@deprecated("The `event` property is deprecated. Use `event_type` instead.")
def event(self):
"""Deprecated. Use `event_type` instead.
.. deprecated:: 0.2.23
The `event` property is deprecated. Use `event_type` instead.
"""
return self.event_type
TSceneNodeHandle = TypeVar("TSceneNodeHandle", bound="SceneNodeHandle")
DragPhase = Literal["start", "update", "end"]
"""Which point in a scene-node drag lifecycle a callback fires on."""
@dataclasses.dataclass(frozen=True)
class _DragInput:
"""Pointer input state at the moment of a drag event.
Private -- consolidates the button + modifier pair that would
otherwise move as separate positional args through every dispatch
function."""
button: Literal["left", "middle", "right"]
modifier: _messages.KeyModifier | None
@dataclasses.dataclass
class _DragCallbackEntry:
"""One registered drag callback + its filter.
Private to the handle; exposed to dispatch via ``_dispatch_drag``."""
callback: Callable[
[SceneNodeDragEvent[_RaycastSupportedSceneNodeHandle]], None | Coroutine
]
button: _messages.DragButton
modifier: _messages.KeyModifier | None
@dataclasses.dataclass
class _ClickCallbackEntry:
"""One registered click callback + its modifier filter.
Private to the handle; exposed to dispatch via ``_dispatch_click``."""
callback: Callable[
[SceneNodePointerEvent[_RaycastSupportedSceneNodeHandle]], None | Coroutine
]
modifier: _messages.KeyModifier | None
@dataclasses.dataclass
class _SceneNodeHandleState:
name: str
props: Any # _messages.*Prop object.
"""Message containing properties of this scene node that are sent to the
client."""
api: SceneApi
wxyz: np.ndarray = dataclasses.field(
default_factory=lambda: np.array([1.0, 0.0, 0.0, 0.0])
)
position: np.ndarray = dataclasses.field(
default_factory=lambda: np.array([0.0, 0.0, 0.0])
)
visible: bool = True
click_cb: list[_ClickCallbackEntry] = dataclasses.field(default_factory=list)
drag_cb: list[_DragCallbackEntry] = dataclasses.field(default_factory=list)
removed: bool = False
# Last bindings tuple published to the client. Used to dedup
# redundant ``SetSceneNodeClickBindingsMessage`` emits — without
# this, a no-op ``remove_click_callback("foo")`` for an
# unregistered callback resends an empty bindings tuple.
# ``None`` until the first publish.
_last_published_click_bindings: tuple[_messages.DragBinding, ...] | None = None
class _SceneNodeMessage(Protocol):
name: str
props: Any
[docs]
class SceneNodeHandle(AssignablePropsBase[_SceneNodeHandleState]):
"""Handle base class for interacting with scene nodes."""
@override
def _queue_update(self, name: str, value: Any) -> None:
self._impl.api._websock_interface.queue_message(
_messages.SceneNodeUpdateMessage(self._impl.name, {name: value})
)
@property
def name(self) -> str:
"""Read-only name of the scene node."""
return self._impl.name
@classmethod
def _make(
cls: type[TSceneNodeHandle],
api: SceneApi,
message: _SceneNodeMessage,
name: str,
wxyz: tuple[float, float, float, float] | np.ndarray,
position: tuple[float, float, float] | np.ndarray,
visible: bool,
) -> TSceneNodeHandle:
"""Create scene node: send state to client(s) and set up
server-side state."""
# Normalize name to always start with "/".
if not name.startswith("/"):
name = "/" + name
message.name = name
# Ensure all ancestor nodes exist (creates intermediate frames as needed).
api._ensure_ancestors_exist(name)
# Send message.
assert isinstance(message, _messages.Message)
api._websock_interface.queue_message(message)
out = cls(_SceneNodeHandleState(name, copy.deepcopy(message.props), api))
api._handle_from_node_name[name] = out
# Track parent -> child relationship.
parent = name.rsplit("/", 1)[0]
api._children_from_node_name.setdefault(parent, set()).add(name)
api._children_from_node_name.setdefault(name, set())
out.wxyz = wxyz
out.position = position
# Toggle visibility to make sure we send a
# SetSceneNodeVisibilityMessage to the client.
out._impl.visible = not visible
out.visible = visible
return out
@property
def wxyz(self) -> npt.NDArray[np.float64]:
"""Orientation of the scene node. This is the quaternion representation of the R
in `p_parent = [R | t] p_local`. Synchronized to clients automatically when assigned.
"""
return self._impl.wxyz
@wxyz.setter
def wxyz(self, wxyz: tuple[float, float, float, float] | np.ndarray) -> None:
from ._scene_api import cast_vector
wxyz_cast = cast_vector(wxyz, 4)
wxyz_array = np.asarray(wxyz)
if np.allclose(wxyz_cast, self._impl.wxyz):
return
self._impl.wxyz[:] = wxyz_array
self._impl.api._websock_interface.queue_message(
_messages.SetOrientationMessage(self._impl.name, wxyz_cast)
)
@property
def position(self) -> npt.NDArray[np.float64]:
"""Position of the scene node. This is equivalent to the t in
`p_parent = [R | t] p_local`. Synchronized to clients automatically when assigned.
"""
return self._impl.position
@position.setter
def position(self, position: tuple[float, float, float] | np.ndarray) -> None:
from ._scene_api import cast_vector
position_cast = cast_vector(position, 3)
position_array = np.asarray(position)
if np.allclose(position_array, self._impl.position):
return
self._impl.position[:] = position_array
self._impl.api._websock_interface.queue_message(
_messages.SetPositionMessage(self._impl.name, position_cast)
)
@property
def visible(self) -> bool:
"""Whether the scene node is visible or not. Synchronized to clients automatically when assigned."""
return self._impl.visible
@visible.setter
def visible(self, visible: bool) -> None:
if visible == self._impl.visible:
return
self._impl.api._websock_interface.queue_message(
_messages.SetSceneNodeVisibilityMessage(self._impl.name, visible)
)
self._impl.visible = visible
[docs]
def remove(self) -> None:
"""Remove the node from the scene."""
# Warn if already removed.
if self._impl.removed:
warnings.warn(f"Attempted to remove already removed node: {self.name}")
return
# Collect all descendants via BFS.
api = self._impl.api
to_remove = [self._impl.name]
i = 0
while i < len(to_remove):
children = api._children_from_node_name.get(to_remove[i], ())
to_remove.extend(children)
i += 1
# Clear stale per-node interaction state (click + drag) before
# we tear down handles. The bindings messages are name-keyed in
# the persistent buffer and aren't purged by
# ``RemoveSceneNodeMessage``, so without an empty replacement a
# future node created with the same name would inherit stale
# interaction state on late-joining clients. Has to run for
# every node in ``to_remove`` (not just ``self``) so cascading
# parent removal cleans up interactive descendants too.
#
# Drag callbacks are preserved when a drag is in flight: the
# client will send a final ``phase="end"`` message after it
# observes the removal (via ``stopIfNodeIs``), and the user's
# ``on_drag_end`` MUST fire so per-drag state can be released.
# ``_handle_node_drag`` looks the handle up in the active-drag
# registry for non-start phases, so preserving ``drag_cb`` on
# the handle keeps the dispatch path alive until end.
for node_name in to_remove:
handle = api._handle_from_node_name.get(node_name)
if handle is None:
continue
impl = handle._impl
if len(impl.click_cb) > 0:
# Empty the per-node click-binding set so a re-created
# node with the same name doesn't inherit the prior
# node's modifier filters from the persistent buffer.
api._websock_interface.queue_message(
_messages.SetSceneNodeClickBindingsMessage(node_name, ())
)
if impl.drag_cb:
if not api._is_drag_active_for(node_name):
impl.drag_cb.clear()
api._websock_interface.queue_message(
_messages.SetSceneNodeDragBindingsMessage(node_name, ())
)
# Clean up all descendants from both dicts.
for node_name in to_remove:
handle = api._handle_from_node_name.pop(node_name, None)
if handle is not None:
handle._impl.removed = True
api._children_from_node_name.pop(node_name, None)
# Remove from parent's children set.
parent = self._impl.name.rsplit("/", 1)[0]
parent_children = api._children_from_node_name.get(parent)
if parent_children is not None:
parent_children.discard(self._impl.name)
# Send a RemoveSceneNodeMessage per descendant so redundancy keys
# clean up their creation messages from the broadcast buffer.
for node_name in to_remove:
api._websock_interface.queue_message(
_messages.RemoveSceneNodeMessage(node_name)
)
[docs]
@dataclasses.dataclass(frozen=True)
class SceneNodePointerEvent(Generic[TSceneNodeHandle]):
"""Event passed to pointer callbacks for scene nodes (currently only clicks)."""
client: ClientHandle
"""Client that triggered this event."""
client_id: int
"""ID of client that triggered this event."""
event: Literal["click"]
"""Type of event that was triggered. Currently we only support clicks."""
target: TSceneNodeHandle
"""Scene node that was clicked."""
ray_origin: tuple[float, float, float]
"""Origin of 3D ray corresponding to this click, in world coordinates."""
ray_direction: tuple[float, float, float]
"""Direction of 3D ray corresponding to this click, in world coordinates."""
screen_pos: tuple[float, float]
"""Screen position of the click on the screen (OpenCV image coordinates, 0 to 1).
(0, 0) is the upper-left corner, (1, 1) is the bottom-right corner."""
instance_index: int | None
"""Instance ID of the clicked object, if applicable. Currently this is `None` for all objects except for the output of :meth:`SceneApi.add_batched_axes()`."""
modifier: _messages.KeyModifier | None
"""Modifier-combo held when this event fired. ``None`` if no
modifiers were held; otherwise a canonical :data:`KeyModifier`
string."""
NoneOrCoroutine = TypeVar("NoneOrCoroutine", None, Coroutine)
[docs]
@dataclasses.dataclass(frozen=True)
class SceneNodeDragEvent(Generic[TSceneNodeHandle]):
"""Event passed to scene-node drag callbacks."""
client: ClientHandle
"""Client that triggered this event."""
client_id: int
"""ID of client that triggered this event."""
target: TSceneNodeHandle
"""Scene node that is being dragged."""
phase: DragPhase
"""Drag lifecycle phase: ``"start"`` at press, ``"update"`` on
every throttled pointermove (~20Hz), ``"end"`` at release. A
single drag fires exactly one ``"start"``, zero or more
``"update"``s, and exactly one ``"end"``."""
instance_index: int | None
"""Instance index within a batched scene node (e.g. batched meshes,
batched GLBs, batched axes); ``None`` for non-batched nodes. Frozen
at drag-start -- the drag always refers to the instance that was
under the cursor when the gesture began."""
start_position: Tuple[float, float, float]
"""World-coords position of the click point on the object. *Live* --
updates each event as the object moves, so it always reflects where
the grab point currently is in world coords (useful for
rotate-around-grab gestures)."""
start_screen_pos: Tuple[float, float]
"""Live OpenCV screen-space projection of the click point."""
end_position: Tuple[float, float, float]
"""Current pointer projected onto the camera-aligned drag plane,
in world coords."""
end_screen_pos: Tuple[float, float]
"""Current pointer in OpenCV screen-space coordinates."""
button: Literal["left", "middle", "right"]
"""Mouse button that initiated the drag."""
modifier: _messages.KeyModifier | None
"""Modifier-combo held at drag-start (frozen for the drag's
lifetime). ``None`` if no modifiers were held; otherwise a
canonical :data:`KeyModifier` string."""
_VALID_DRAG_BUTTONS: Tuple[_messages.DragButton, ...] = get_args(_messages.DragButton)
class _RaycastSupportedSceneNodeHandle(SceneNodeHandle):
def _sync_drag_bindings(self) -> None:
"""Recompute the union of registered (button, modifiers) and
push it to the client as a full binding set."""
seen: set[Tuple[_messages.DragButton, _messages.KeyModifier | None]] = set()
bindings: list[_messages.DragBinding] = []
for entry in self._impl.drag_cb:
key = (entry.button, entry.modifier)
if key in seen:
continue
seen.add(key)
bindings.append(
_messages.DragBinding(button=entry.button, modifier=entry.modifier)
)
self._impl.api._websock_interface.queue_message(
_messages.SetSceneNodeDragBindingsMessage(self._impl.name, tuple(bindings))
)
def _has_any_drag_callbacks(self) -> bool:
return bool(self._impl.drag_cb)
def _dispatch_drag(
self, input: _DragInput
) -> list[
Callable[
[SceneNodeDragEvent[_RaycastSupportedSceneNodeHandle]], None | Coroutine
]
]:
"""Return the callbacks whose filter matches this input."""
from ._scene_api import _drag_input_matches_filter
return [
entry.callback
for entry in self._impl.drag_cb
if _drag_input_matches_filter(input, entry.button, entry.modifier)
]
@staticmethod
def _validate_button(button: _messages.DragButton) -> None:
if button not in _VALID_DRAG_BUTTONS:
raise ValueError(
f"Unknown drag button {button!r}. "
f"Valid buttons: {list(_VALID_DRAG_BUTTONS)!r}."
)
def _register_drag_callback(
self: Self,
button: _messages.DragButton,
modifier: _messages.KeyModifier | None = None,
) -> Callable[
[Callable[[SceneNodeDragEvent[Self]], NoneOrCoroutine]],
Callable[[SceneNodeDragEvent[Self]], NoneOrCoroutine],
]:
self._validate_button(button)
normalized = _messages._normalize_key_modifier(modifier)
def decorator(
func: Callable[[SceneNodeDragEvent[Self]], NoneOrCoroutine],
) -> Callable[[SceneNodeDragEvent[Self]], NoneOrCoroutine]:
entry = _DragCallbackEntry(
callback=cast(
Callable[
[SceneNodeDragEvent[_RaycastSupportedSceneNodeHandle]],
Union[None, Coroutine],
],
func,
),
button=button,
modifier=normalized,
)
# Skip duplicate registration -- without this, the same
# callback fires twice per matching event. Equality is by
# tuple/dataclass value.
if entry not in self._impl.drag_cb:
self._impl.drag_cb.append(entry)
self._sync_drag_bindings()
return func
return decorator
@overload
def on_drag(
self: Self,
button: Callable[[SceneNodeDragEvent[Self]], NoneOrCoroutine],
) -> Callable[[SceneNodeDragEvent[Self]], NoneOrCoroutine]: ...
@overload
def on_drag(
self: Self,
button: _messages.DragButton = ...,
*,
modifier: _messages.KeyModifier | None = ...,
) -> Callable[
[Callable[[SceneNodeDragEvent[Self]], NoneOrCoroutine]],
Callable[[SceneNodeDragEvent[Self]], NoneOrCoroutine],
]: ...
def on_drag(
self: Self,
button: Union[_messages.DragButton, Callable[..., Any]] = "left",
*,
modifier: _messages.KeyModifier | None = None,
) -> Any:
"""Attach a callback for the full drag lifecycle.
Fires three times per gesture: once with
``event.phase == "start"`` at press, zero or more times with
``"update"`` (throttled pointermove), once with ``"end"`` at
release. ``end`` fires even on cancellation paths (window
blur, pointer cancel, node removed mid-drag) so per-drag
state can be released.
Usable as a bare decorator (``@handle.on_drag``, defaults to
``button="left"`` and no modifiers) or with arguments
(``@handle.on_drag("left", modifier="cmd/ctrl")``).
Args:
button: Mouse button that triggers the drag. One of
``"left" | "middle" | "right"``. Defaults to ``"left"``.
modifier: Modifier keys that must be held, as a canonically
ordered ``"+"``-separated string like ``"cmd/ctrl"``,
``"shift"``, or ``"cmd/ctrl+shift"``. ``None`` matches
"no modifiers held". Matching is exact: listed modifiers
must be held and others must not be. Left-drag on this
node intercepts the gesture -- the camera only orbits on
empty-space drags.
Note on ordering: synchronous (``def``) callbacks are submitted
to a thread pool fire-and-forget and can run out of order -- an
``"update"`` phase may begin before ``"start"`` finishes,
leaving any state set in ``"start"`` ``None`` when ``"update"``
reads it. To get strict ordering, define your callback as
``async def``; async callbacks are awaited on the event loop,
which preserves phase order so long as you don't ``await``
inside them.
"""
if callable(button):
# Bare-decorator form: @handle.on_drag -- defaults to
# button="left" and no modifiers.
return self._register_drag_callback("left", modifier)(
button # type: ignore[arg-type]
)
return self._register_drag_callback(button, modifier)
def remove_drag_callback(self, callback: Literal["all"] | Callable = "all") -> None:
"""Remove drag callbacks from the scene node.
``callback="all"`` removes every drag callback; a specific
function removes only entries whose callback identity matches.
"""
if callback == "all":
self._impl.drag_cb.clear()
else:
self._impl.drag_cb = [
entry for entry in self._impl.drag_cb if entry.callback != callback
]
self._sync_drag_bindings()
@overload
def on_click(
self: Self,
func: Callable[[SceneNodePointerEvent[Self]], NoneOrCoroutine],
) -> Callable[[SceneNodePointerEvent[Self]], NoneOrCoroutine]: ...
@overload
def on_click(
self: Self,
*,
modifier: _messages.KeyModifier | None = ...,
) -> Callable[
[Callable[[SceneNodePointerEvent[Self]], NoneOrCoroutine]],
Callable[[SceneNodePointerEvent[Self]], NoneOrCoroutine],
]: ...
def on_click(
self: Self,
func: Optional[Callable[[SceneNodePointerEvent[Self]], NoneOrCoroutine]] = None,
*,
modifier: _messages.KeyModifier | None = None,
) -> Any:
"""Attach a callback for when a scene node is clicked.
Usable as a bare decorator (``@handle.on_click``) or with a
modifier filter (``@handle.on_click(modifier="cmd/ctrl")``).
The callback can be either a standard function or an async function:
- Standard functions (def) will be executed in a threadpool.
- Async functions (async def) will be executed in the event loop.
Using async functions can be useful for reducing race conditions.
Args:
modifier: Modifier-combo filter. Default ``None`` matches
"no modifiers held". ``"cmd/ctrl"``, ``"shift"``,
``"cmd/ctrl+shift"``, etc. are exact matches (listed
modifiers held, others not). ``cmd/ctrl`` matches
whenever either Cmd or Ctrl is held.
"""
# Validate eagerly so a bad string raises at the call site,
# not when the user later applies the returned decorator.
normalized_modifier = _messages._normalize_key_modifier(modifier)
def register(callback: Callable) -> Callable:
self._impl.click_cb.append(
_ClickCallbackEntry(
callback=cast(
Callable[
[SceneNodePointerEvent[_RaycastSupportedSceneNodeHandle]],
Union[None, Coroutine],
],
callback,
),
modifier=normalized_modifier,
)
)
self._publish_click_state()
return callback
if func is None:
return register
return register(func)
def remove_click_callback(
self, callback: Literal["all"] | Callable = "all"
) -> None:
"""Remove click callbacks from scene node.
Args:
callback: Either "all" to remove all callbacks, or a specific callback function to remove.
"""
if callback == "all":
self._impl.click_cb.clear()
else:
self._impl.click_cb = [
entry for entry in self._impl.click_cb if entry.callback != callback
]
self._publish_click_state()
def _publish_click_state(self) -> None:
"""Publish ``SetSceneNodeClickBindingsMessage`` to the client
only when the bindings tuple has changed since the last
publish. Without the dedup, a no-op
``remove_click_callback("nonexistent")`` would still emit an
empty bindings tuple.
The client derives `clickable` from `bindings.length > 0`; no
separate flag is sent.
"""
bindings = tuple(
_messages.DragBinding(button="left", modifier=entry.modifier)
for entry in self._impl.click_cb
)
if self._impl._last_published_click_bindings == bindings:
return
# Queue the message BEFORE committing the cache. If
# ``queue_message`` raises, the cache stays at its previous
# value so the next state change retries the publish.
self._impl.api._websock_interface.queue_message(
_messages.SetSceneNodeClickBindingsMessage(self._impl.name, bindings)
)
self._impl._last_published_click_bindings = bindings
[docs]
class CameraFrustumHandle(
_RaycastSupportedSceneNodeHandle,
_messages.CameraFrustumProps,
):
"""Handle for camera frustums."""
_image: np.ndarray | None
_jpeg_quality: int | None
_user_format: Literal["auto", "jpeg", "png"]
@property
def image(self) -> np.ndarray | None:
"""Current content of the image. Synchronized automatically when assigned."""
return self._image
@image.setter
def image(self, image: np.ndarray | None) -> None:
from ._scene_api import _encode_image_binary
if image is None:
self._image_data = None
return
self._image = image
resolved_format, data = _encode_image_binary(
image, self._user_format, jpeg_quality=self._jpeg_quality
)
self._format = resolved_format
self._image_data = data
@property
def format(self) -> Literal["auto", "jpeg", "png"]:
"""Image format. 'auto' will use PNG for RGBA images and JPEG for RGB."""
return self._user_format
@format.setter
def format(self, value: Literal["auto", "jpeg", "png"]) -> None:
import warnings
from ._scene_api import _encode_image_binary
# Skip if format isn't changing.
if self._user_format == value:
return
self._user_format = value
# Re-encode image. if we have one.
if self._image is not None:
if value == "jpeg" and self._image.shape[2] == 4:
warnings.warn(
"Converting RGBA image to JPEG will discard the alpha channel."
)
resolved_format, data = _encode_image_binary(
self._image, value, jpeg_quality=self._jpeg_quality
)
self._format = resolved_format
self._image_data = data
[docs]
def compute_canonical_frustum_size(self) -> tuple[float, float, float]:
"""Compute the X, Y, and Z dimensions of the frustum if it had
`.scale=1.0`. These dimensions will change whenever `.fov` or `.aspect`
are changed.
To set the distance between a frustum's origin and image plane to 1, we
can run:
.. code-block:: python
frustum.scale = 1.0 / frustum.compute_canonical_frustum_size()[2]
`.scale` can be a float for uniform scaling or a 3-tuple for per-axis
scaling of the X, Y, and Z dimensions. It aims to preserve the visual
volume of the frustum regardless of the aspect ratio or FOV. This
method allows more precise computation and control of the frustum's
dimensions.
"""
# Math used in the client implementation.
y = np.tan(self.fov / 2.0)
x = y * self.aspect
z = 1.0
volume_scale = np.cbrt((x * y * z) / 3.0)
z /= volume_scale
# x and y need to be doubled, since on the client they correspond to
# NDC-style spans [-1, 1].
return x * 2.0, y * 2.0, z
[docs]
class DirectionalLightHandle(
SceneNodeHandle,
_messages.DirectionalLightProps,
):
"""Handle for directional lights."""
[docs]
class AmbientLightHandle(
SceneNodeHandle,
_messages.AmbientLightProps,
):
"""Handle for ambient lights."""
[docs]
class HemisphereLightHandle(
SceneNodeHandle,
_messages.HemisphereLightProps,
):
"""Handle for hemisphere lights."""
[docs]
class PointLightHandle(
SceneNodeHandle,
_messages.PointLightProps,
):
"""Handle for point lights."""
[docs]
class RectAreaLightHandle(
SceneNodeHandle,
_messages.RectAreaLightProps,
):
"""Handle for rectangular area lights."""
[docs]
class SpotLightHandle(
SceneNodeHandle,
_messages.SpotLightProps,
):
"""Handle for spot lights."""
[docs]
class PointCloudHandle(
SceneNodeHandle,
_messages.PointCloudProps,
):
"""Handle for point clouds. Does not support click events."""
@override
def _cast_array_dtypes(
self,
prop_hints: Dict[str, Any],
prop_name: str,
value: np.ndarray,
) -> np.ndarray:
"""Casts assigned `points` based on the current value of `precision`."""
if prop_name == "points":
return value.astype(
{"float16": np.float16, "float32": np.float32}[self.precision]
)
return super()._cast_array_dtypes(prop_hints, prop_name, value)
[docs]
class BatchedAxesHandle(
_RaycastSupportedSceneNodeHandle,
_messages.BatchedAxesProps,
):
"""Handle for batched coordinate frames."""
[docs]
class FrameHandle(
_RaycastSupportedSceneNodeHandle,
_messages.FrameProps,
):
"""Handle for coordinate frames."""
[docs]
class MeshHandle(
_RaycastSupportedSceneNodeHandle,
_messages.MeshProps,
):
"""Handle for mesh objects."""
[docs]
class BoxHandle(
_RaycastSupportedSceneNodeHandle,
_messages.BoxProps,
):
"""Handle for box objects."""
[docs]
class IcosphereHandle(
_RaycastSupportedSceneNodeHandle,
_messages.IcosphereProps,
):
"""Handle for icosphere objects."""
[docs]
class CylinderHandle(
_RaycastSupportedSceneNodeHandle,
_messages.CylinderProps,
):
"""Handle for cylinder objects."""
[docs]
class BatchedMeshHandle(
_RaycastSupportedSceneNodeHandle,
_messages.BatchedMeshesProps,
):
"""Handle for batched mesh objects."""
[docs]
class BatchedGlbHandle(
_RaycastSupportedSceneNodeHandle,
_messages.BatchedGlbProps,
):
"""Handle for batched GLB objects."""
[docs]
class GaussianSplatHandle(
_RaycastSupportedSceneNodeHandle,
_messages.GaussianSplatsProps,
):
"""Handle for Gaussian splatting objects.
**Work-in-progress.** Gaussian rendering is still under development.
Buffer layout per Gaussian (8 uint32 elements = 32 bytes):
- [0:3]: centers (3x float32)
- [3]: reserved for renderer
- [4:7]: covariance upper-triangular (6x float16)
- [7]: RGBA (4x uint8)
"""
def _ensure_buffer_size(self, num_gaussians: int) -> None:
"""Ensure the internal buffer can hold the specified number of Gaussians.
If the buffer is already the correct size, this is a no-op. Otherwise,
a new buffer is allocated with default values (white color, full opacity,
small identity-like covariances, centers at origin).
"""
if self.buffer.shape[0] == num_gaussians:
return
# Create new buffer with default values.
new_buffer = np.zeros((num_gaussians, 8), dtype=np.uint32)
# Set default RGBA to white, fully opaque (255, 255, 255, 255).
new_buffer[:, 7] = 0xFFFFFFFF
# Set default covariances to small identity-like values.
# Store as 6 float16 values: [cov00, cov01, cov02, cov11, cov12, cov22].
default_cov = np.array([0.01, 0.0, 0.0, 0.01, 0.0, 0.01], dtype=np.float16)
new_buffer[:, 4:7] = np.tile(default_cov.view(np.uint32), (num_gaussians, 1))
self.buffer = new_buffer
@property
def centers(self) -> npt.NDArray[np.float32]:
"""Centers of the Gaussians. Shape: (N, 3). Synchronized automatically when assigned."""
return self.buffer[:, 0:3].view(np.float32)
@centers.setter
def centers(self, centers: np.ndarray) -> None:
assert centers.ndim == 2 and centers.shape[1] == 3, (
f"centers must have shape (N, 3), got {centers.shape}"
)
self._ensure_buffer_size(centers.shape[0])
self.buffer[:, 0:3] = centers.astype(np.float32).view(np.uint32)
self._queue_update("buffer", self.buffer)
@property
def rgbs(self) -> npt.NDArray[np.uint8]:
"""Colors of the Gaussians. Shape: (N, 3). Values in [0, 1]. Synchronized automatically when assigned."""
rgba = self.buffer[:, 7:8].view(np.uint8).reshape(-1, 4)
return rgba[:, :3]
@rgbs.setter
def rgbs(self, rgbs: np.ndarray) -> None:
from ._assignable_props_api import colors_to_uint8
assert rgbs.ndim == 2 and rgbs.shape[1] == 3, (
f"rgbs must have shape (N, 3), got {rgbs.shape}"
)
self._ensure_buffer_size(rgbs.shape[0])
rgba = self.buffer[:, 7:8].view(np.uint8).reshape(-1, 4)
rgba[:, :3] = colors_to_uint8(rgbs)
self.buffer[:, 7:8] = rgba.view(np.uint32)
self._queue_update("buffer", self.buffer)
@property
def opacities(self) -> npt.NDArray[np.uint8]:
"""Opacities of the Gaussians. Shape: (N, 1). Values in [0, 1]. Synchronized automatically when assigned."""
buffer = self.buffer
rgba = buffer[:, 7:8].view(np.uint8).reshape(-1, 4)
return rgba[:, 3:4]
@opacities.setter
def opacities(self, opacities: np.ndarray) -> None:
from ._assignable_props_api import colors_to_uint8
assert opacities.ndim == 2 and opacities.shape[1] == 1, (
f"opacities must have shape (N, 1), got {opacities.shape}"
)
self._ensure_buffer_size(opacities.shape[0])
rgba = self.buffer[:, 7:8].view(np.uint8).reshape(-1, 4)
rgba[:, 3:4] = colors_to_uint8(opacities)
self.buffer[:, 7:8] = rgba.view(np.uint32)
self._queue_update("buffer", self.buffer)
@property
def covariances(self) -> npt.NDArray[np.float32]:
"""Covariances of the Gaussians. Shape: (N, 3, 3). Synchronized automatically when assigned."""
# Extract upper-triangular terms stored as 6 float16 values.
cov_triu_f16 = self.buffer[:, 4:7].view(np.float16).reshape(-1, 6)
cov_triu = cov_triu_f16.astype(np.float32)
# Reconstruct symmetric 3x3 matrix.
n = cov_triu.shape[0]
cov = np.zeros((n, 3, 3), dtype=np.float32)
cov[:, 0, 0] = cov_triu[:, 0]
cov[:, 0, 1] = cov_triu[:, 1]
cov[:, 0, 2] = cov_triu[:, 2]
cov[:, 1, 0] = cov_triu[:, 1] # Symmetric.
cov[:, 1, 1] = cov_triu[:, 3]
cov[:, 1, 2] = cov_triu[:, 4]
cov[:, 2, 0] = cov_triu[:, 2] # Symmetric.
cov[:, 2, 1] = cov_triu[:, 4] # Symmetric.
cov[:, 2, 2] = cov_triu[:, 5]
return cov
@covariances.setter
def covariances(self, covariances: np.ndarray) -> None:
assert covariances.ndim == 3 and covariances.shape[1:] == (3, 3), (
f"covariances must have shape (N, 3, 3), got {covariances.shape}"
)
self._ensure_buffer_size(covariances.shape[0])
# Extract upper-triangular terms: indices [0,1,2,4,5,8] from flattened 3x3.
cov_triu = covariances.reshape((-1, 9))[:, np.array([0, 1, 2, 4, 5, 8])]
cov_triu_f16 = cov_triu.astype(np.float16)
self.buffer[:, 4:7] = np.ascontiguousarray(cov_triu_f16).view(np.uint32)
self._queue_update("buffer", self.buffer)
[docs]
class MeshSkinnedHandle(
_RaycastSupportedSceneNodeHandle,
_messages.SkinnedMeshProps,
):
"""Handle for skinned mesh objects."""
def __init__(
self, impl: _SceneNodeHandleState, bones: tuple[MeshSkinnedBoneHandle, ...]
):
super().__init__(impl)
self.bones = bones
@dataclasses.dataclass
class BoneState:
name: str
websock_interface: WebsockServer | WebsockClientConnection
bone_index: int
wxyz: np.ndarray
position: np.ndarray
[docs]
@dataclasses.dataclass
class MeshSkinnedBoneHandle:
"""Handle for reading and writing the poses of bones in a skinned mesh."""
_impl: BoneState
@property
def wxyz(self) -> npt.NDArray[np.float64]:
"""Orientation of the bone. This is the quaternion representation of the R
in `p_parent = [R | t] p_local`. Synchronized to clients automatically when assigned.
"""
return self._impl.wxyz
@wxyz.setter
def wxyz(self, wxyz: tuple[float, float, float, float] | np.ndarray) -> None:
from ._scene_api import cast_vector
wxyz_cast = cast_vector(wxyz, 4)
wxyz_array = np.asarray(wxyz)
if np.allclose(wxyz_cast, self._impl.wxyz):
return
self._impl.wxyz[:] = wxyz_array
self._impl.websock_interface.queue_message(
_messages.SetBoneOrientationMessage(
self._impl.name, self._impl.bone_index, wxyz_cast
)
)
@property
def position(self) -> npt.NDArray[np.float64]:
"""Position of the bone. This is equivalent to the t in
`p_parent = [R | t] p_local`. Synchronized to clients automatically when assigned.
"""
return self._impl.position
@position.setter
def position(self, position: tuple[float, float, float] | np.ndarray) -> None:
from ._scene_api import cast_vector
position_cast = cast_vector(position, 3)
position_array = np.asarray(position)
if np.allclose(position_array, self._impl.position):
return
self._impl.position[:] = position_array
self._impl.websock_interface.queue_message(
_messages.SetBonePositionMessage(
self._impl.name, self._impl.bone_index, position_cast
)
)
[docs]
class GridHandle(
SceneNodeHandle,
_messages.GridProps,
):
"""Handle for grid objects."""
[docs]
class LineSegmentsHandle(
SceneNodeHandle,
_messages.LineSegmentsProps,
):
"""Handle for line segments objects."""
[docs]
class ArrowsHandle(
SceneNodeHandle,
_messages.ArrowProps,
):
"""Handle for arrow objects."""
[docs]
class SplineCatmullRomHandle(
SceneNodeHandle,
_messages.CatmullRomSplineProps,
):
"""Handle for Catmull-Rom splines."""
@property
@deprecated("The 'positions' property is deprecated. Use 'points' instead.")
def positions(self) -> tuple[tuple[float, float, float], ...]:
"""Get the spline positions. Deprecated: use 'points' instead.
.. deprecated:: 1.0.0
"The 'positions' tuple property is deprecated. Use the 'points' numpy array instead.",
"""
import warnings
warnings.warn(
"The 'positions' tuple property is deprecated. Use the 'points' numpy array instead.",
DeprecationWarning,
stacklevel=2,
)
return tuple(tuple(x) for x in self.points.tolist()) # type: ignore
@positions.setter
@deprecated("The 'positions' property is deprecated. Use 'points' instead.")
def positions(self, positions: tuple[tuple[float, float, float], ...]) -> None:
import warnings
warnings.warn(
"The 'positions' tuple property is deprecated. Use the 'points' numpy array instead.",
DeprecationWarning,
stacklevel=2,
)
self.points = np.asarray(positions)
[docs]
class SplineCubicBezierHandle(
SceneNodeHandle,
_messages.CubicBezierSplineProps,
):
"""Handle for cubic Bezier splines."""
@property
@deprecated(
"The 'positions' tuple property is deprecated. Use 'points' numpy array instead."
)
def positions(self) -> tuple[tuple[float, float, float], ...]:
"""Get the spline positions. Deprecated: use 'points' instead.
.. deprecated:: 1.0.0
The 'positions' tuple property is deprecated. Use the 'points' numpy array instead.
"""
return tuple(tuple(p) for p in self.points.tolist()) # type: ignore
@positions.setter
@deprecated(
"The 'positions' tuple property is deprecated. Use the 'points' numpy array instead."
)
def positions(self, positions: tuple[tuple[float, float, float], ...]) -> None:
import warnings
warnings.warn(
"The 'positions' tuple property is deprecated. Use the 'points' numpy array instead.",
DeprecationWarning,
stacklevel=2,
)
self.points = np.asarray(positions)
[docs]
class GlbHandle(
_RaycastSupportedSceneNodeHandle,
_messages.GlbProps,
):
"""Handle for GLB objects."""
[docs]
class ImageHandle(
_RaycastSupportedSceneNodeHandle,
_messages.ImageProps,
):
"""Handle for 2D images, rendered in 3D."""
_image: np.ndarray
_jpeg_quality: int | None
_user_format: Literal["auto", "jpeg", "png"]
@property
def image(self) -> np.ndarray:
"""Current content of the image. Synchronized automatically when assigned."""
assert self._image is not None
return self._image
@image.setter
def image(self, image: np.ndarray) -> None:
from ._scene_api import _encode_image_binary
self._image = image
resolved_format, data = _encode_image_binary(
image, self._user_format, jpeg_quality=self._jpeg_quality
)
self._format = resolved_format
self._data = data
@property
def format(self) -> Literal["auto", "jpeg", "png"]:
"""Image format. 'auto' will use PNG for RGBA images and JPEG for RGB."""
return self._user_format
@format.setter
def format(self, value: Literal["auto", "jpeg", "png"]) -> None:
import warnings
from ._scene_api import _encode_image_binary
# Skip if format isn't changing.
if self._user_format == value:
return
self._user_format = value
# Re-encode image.
if value == "jpeg" and self._image.shape[2] == 4:
warnings.warn(
"Converting RGBA image to JPEG will discard the alpha channel."
)
resolved_format, data = _encode_image_binary(
self._image, value, jpeg_quality=self._jpeg_quality
)
self._format = resolved_format
self._data = data
[docs]
class LabelHandle(
SceneNodeHandle,
_messages.LabelProps,
):
"""Handle for 2D label objects. Does not support click events."""
@dataclasses.dataclass
class _TransformControlsState:
last_updated: float
update_cb: list[Callable[[TransformControlsEvent], None | Coroutine]]
sync_cb: None | Callable[[ClientId, TransformControlsHandle], None] = None
def _phase_filtered_wrapper(
phase: DragPhase,
func: Callable[[TransformControlsEvent], NoneOrCoroutine],
) -> Callable[[TransformControlsEvent], None | Coroutine]:
"""Build an ``update_cb`` entry for the deprecated
``on_drag_start`` / ``on_drag_end`` methods. Tagged so
``remove_*`` can locate it by the original ``func`` identity."""
if asyncio.iscoroutinefunction(func):
async def async_wrapper(event: TransformControlsEvent) -> None:
if event.phase == phase:
await func(event) # type: ignore[misc]
async_wrapper._wraps = func # type: ignore[attr-defined]
async_wrapper._phase_filter = phase # type: ignore[attr-defined]
return async_wrapper
def sync_wrapper(event: TransformControlsEvent) -> None:
if event.phase == phase:
func(event)
sync_wrapper._wraps = func # type: ignore[attr-defined]
sync_wrapper._phase_filter = phase # type: ignore[attr-defined]
return sync_wrapper
[docs]
class Gui3dContainerHandle(
SceneNodeHandle,
_messages.Gui3DProps,
):
"""Use as a context to place GUI elements into a 3D GUI container."""
def __init__(self, impl: _SceneNodeHandleState, gui_api: GuiApi, container_id: str):
super().__init__(impl)
self._gui_api = gui_api
self._container_id = container_id
self._container_id_restore = None
self._children = {}
self._gui_api._container_handle_from_uuid[self._container_id] = self
def __enter__(self) -> Gui3dContainerHandle:
self._container_id_restore = self._gui_api._get_container_uuid()
self._gui_api._set_container_uuid(self._container_id)
return self
def __exit__(self, *args) -> None:
del args
assert self._container_id_restore is not None
self._gui_api._set_container_uuid(self._container_id_restore)
self._container_id_restore = None
[docs]
def remove(self) -> None:
"""Permanently remove this GUI container from the visualizer."""
# Call scene node remove.
super().remove()
# Clean up contained GUI elements.
for child in tuple(self._children.values()):
child.remove()
self._gui_api._container_handle_from_uuid.pop(self._container_id)