from typing import Any
import numpy as np
from standard_e2e.caching.adapters.abstract_adapter import AbstractAdapter
from standard_e2e.data_structures import LidarPointCloud, StandardFrameData
from standard_e2e.enums import LidarComponent, Modality, StandardFrameDataField
[docs]
class LidarAdapter(AbstractAdapter):
"""Lidar adapter: extracts XYZ from ``StandardFrameData.lidar`` into a
:class:`LidarPointCloud`, with optional deterministic stride subsampling.
Args:
max_points: optional cap on the number of output points. When set and
the input cloud has more points, a stride subsample is taken
(``points[::N // max_points][:max_points]``) producing at most
``max_points`` rows. No randomness is involved, so preprocessing
is fully reproducible without seeding. Otherwise the full cloud
is passed through.
"""
_COMPONENTS = [LidarComponent.X, LidarComponent.Y, LidarComponent.Z]
def __init__(self, max_points: int | None = None) -> None:
super().__init__()
if max_points is not None and max_points <= 0:
raise ValueError("max_points must be a positive integer when set")
self._max_points = max_points
@property
def name(self) -> str:
return "LidarAdapter"
@property
def consumes_attrs(self) -> set[StandardFrameDataField]:
return {StandardFrameDataField.LIDAR}
def _transform(self, standard_frame_data: StandardFrameData) -> dict[Modality, Any]:
if standard_frame_data.lidar is None:
return {}
df = standard_frame_data.lidar.points
points = df[[c.value for c in self._COMPONENTS]].to_numpy(dtype=np.float32)
if self._max_points is not None and points.shape[0] > self._max_points:
step = points.shape[0] // self._max_points
points = points[::step][: self._max_points]
return {
Modality.LIDAR_PC: LidarPointCloud(
points=points, components=self._COMPONENTS
)
}
[docs]
class LidarBEVAdapter(AbstractAdapter):
"""Top-down 2-D histogram of lidar points in vehicle frame.
Implements the Transfuser / CARLA-Garage BEV recipe: drop points above
``max_height``, optionally split into ground / obstacle channels at
``split_height``, then 2-D bin into a regular grid over
``[min_x, max_x] x [min_y, max_y]`` at ``pixels_per_meter``. Each
channel is the per-cell point count, clipped at ``count_cap`` and
divided into ``[0, 1]``.
Output (``Modality.LIDAR_BEV``): ``np.float32`` array of shape
``(C, H, W)`` where ``C = 2`` if ``use_ground_plane`` else ``1``.
Axis order matches ``np.histogramdd((x, y), ...)`` so ``H`` is the
number of x-bins and ``W`` the number of y-bins; with the ground
plane channel order is ``(below, above)``.
Args:
min_x, max_x: BEV x extent in meters (vehicle x is forward).
min_y, max_y: BEV y extent in meters (vehicle y is left).
pixels_per_meter: grid resolution; cast to ``int`` internally
(matches Transfuser).
max_height: drop points with ``z >= max_height``.
split_height: ground / obstacle threshold (``z <=`` -> ground).
use_ground_plane: include the ground channel.
count_cap: per-cell saturation; histogram clipped at this value
before being divided into ``[0, 1]``.
"""
_COMPONENTS = [LidarComponent.X, LidarComponent.Y, LidarComponent.Z]
def __init__(
self,
min_x: float = -32.0,
max_x: float = 32.0,
min_y: float = -32.0,
max_y: float = 32.0,
pixels_per_meter: float = 4.0,
max_height: float = 100.0,
split_height: float = 0.2,
use_ground_plane: bool = False,
count_cap: int = 5,
) -> None:
super().__init__()
if max_x <= min_x or max_y <= min_y:
raise ValueError("max_{x,y} must be greater than min_{x,y}")
if int(pixels_per_meter) < 1:
raise ValueError("pixels_per_meter must be >= 1 after int cast")
if count_cap <= 0:
raise ValueError("count_cap must be a positive integer")
self._min_x = min_x
self._max_x = max_x
self._min_y = min_y
self._max_y = max_y
self._pixels_per_meter = pixels_per_meter
self._max_height = max_height
self._split_height = split_height
self._use_ground_plane = use_ground_plane
self._count_cap = count_cap
@property
def name(self) -> str:
return "LidarBEVAdapter"
@property
def consumes_attrs(self) -> set[StandardFrameDataField]:
return {StandardFrameDataField.LIDAR}
@property
def output_shape(self) -> tuple[int, int, int]:
"""Shape ``(C, H, W)`` of the BEV tensor this adapter emits."""
ppm = int(self._pixels_per_meter)
h = int((self._max_x - self._min_x) * ppm)
w = int((self._max_y - self._min_y) * ppm)
c = 2 if self._use_ground_plane else 1
return (c, h, w)
def _splat(self, xy: np.ndarray) -> np.ndarray:
ppm = int(self._pixels_per_meter)
n_x_edges = int((self._max_x - self._min_x) * ppm) + 1
n_y_edges = int((self._max_y - self._min_y) * ppm) + 1
xbins = np.linspace(self._min_x, self._max_x, n_x_edges)
ybins = np.linspace(self._min_y, self._max_y, n_y_edges)
hist, _ = np.histogramdd(xy, bins=(xbins, ybins))
np.clip(hist, 0, self._count_cap, out=hist)
return (hist / self._count_cap).astype(np.float32)
def _transform(self, standard_frame_data: StandardFrameData) -> dict[Modality, Any]:
if standard_frame_data.lidar is None:
return {}
df = standard_frame_data.lidar.points
pts = df[[c.value for c in self._COMPONENTS]].to_numpy(dtype=np.float32)
pts = pts[pts[:, 2] < self._max_height]
above = self._splat(pts[pts[:, 2] > self._split_height][:, :2])
if self._use_ground_plane:
below = self._splat(pts[pts[:, 2] <= self._split_height][:, :2])
bev = np.stack([below, above])
else:
bev = above[None]
return {Modality.LIDAR_BEV: bev}