Source code for standard_e2e.caching.adapters.hdmap_adapter

from typing import Any, Sequence, Union

import cv2
import numpy as np

from standard_e2e.caching.adapters.abstract_adapter import AbstractAdapter
from standard_e2e.data_structures import StandardFrameData
from standard_e2e.enums import MapElementType, Modality, StandardFrameDataField

HD_MAP_BEV_CHANNELS_AUX_KEY = "hd_map_bev_channels"

ChannelSpec = Union[MapElementType, str]


def _resolve_channels(
    channels: Sequence[ChannelSpec] | None,
) -> list[MapElementType]:
    """Coerce a YAML/string-friendly channel list to ``list[MapElementType]``.

    ``None`` → all members in declaration order. String entries must match a
    member ``value`` (e.g. ``"lane_center"``); unknown strings raise
    ``ValueError`` listing the valid options so misconfigurations fail fast at
    adapter construction rather than silently producing empty channels.
    """
    if channels is None:
        return list(MapElementType)
    out: list[MapElementType] = []
    valid_values = {t.value for t in MapElementType}
    for entry in channels:
        if isinstance(entry, MapElementType):
            out.append(entry)
        elif isinstance(entry, str):
            try:
                out.append(MapElementType(entry))
            except ValueError as exc:
                raise ValueError(
                    f"Unknown HD-map channel '{entry}'. "
                    f"Valid: {sorted(valid_values)}"
                ) from exc
        else:
            raise TypeError(
                "channels entries must be MapElementType or str, "
                f"got {type(entry).__name__}"
            )
    return out


[docs] class HDMapBEVAdapter(AbstractAdapter): """Rasterizes ``StandardFrameData.hd_map`` to a multi-channel BEV image. One channel per element type listed in ``channels`` (default: every member of :class:`MapElementType` in declaration order). Polygons are drawn filled; polylines are drawn with ``polyline_thickness``; points (single-row ``MapElement.points``) are drawn as filled circles of radius ``polyline_thickness``. Output is ``np.float32`` in ``[0, 1]``. Axis convention matches :class:`LidarBEVAdapter`: rows correspond to the vehicle x axis (forward), columns to the y axis (left). Output shape is ``(C, H, W)`` with ``C = len(channels)``, ``H = (max_x - min_x) * pixels_per_meter``, ``W = (max_y - min_y) * pixels_per_meter``. Args: min_x, max_x: BEV x extent in meters (vehicle x is forward). min_y, max_y: BEV y extent in meters (vehicle y is left). pixels_per_meter: grid resolution; cast to ``int`` internally. channels: ordered list of ``MapElementType`` (or their string ``value`` for YAML configs) determining channel composition and order. Defaults to all enum members in declaration order. The resolved list is exposed via :attr:`metadata` so downstream consumers can interpret the BEV without re-deriving the channel order. polyline_thickness: line thickness in pixels for polyline elements; also the radius of point-element circles. """ def __init__( self, min_x: float = -32.0, max_x: float = 32.0, min_y: float = -32.0, max_y: float = 32.0, pixels_per_meter: float = 4.0, channels: Sequence[ChannelSpec] | None = None, polyline_thickness: int = 1, ) -> None: super().__init__() if max_x <= min_x or max_y <= min_y: raise ValueError("max_{x,y} must be greater than min_{x,y}") if int(pixels_per_meter) < 1: raise ValueError("pixels_per_meter must be >= 1 after int cast") if polyline_thickness < 1: raise ValueError("polyline_thickness must be a positive integer") self._min_x = min_x self._max_x = max_x self._min_y = min_y self._max_y = max_y self._pixels_per_meter = pixels_per_meter self._channels = _resolve_channels(channels) self._type_to_channel = {t: i for i, t in enumerate(self._channels)} self._polyline_thickness = polyline_thickness @property def name(self) -> str: return "HDMapBEVAdapter" @property def consumes_attrs(self) -> set[StandardFrameDataField]: return {StandardFrameDataField.HD_MAP} @property def channels(self) -> list[MapElementType]: """Resolved ordered channel list (read-only view).""" return list(self._channels) @property def metadata(self) -> dict[str, Any]: """Expose the BEV channel order so the .npz remains self-describing.""" return {HD_MAP_BEV_CHANNELS_AUX_KEY: [t.value for t in self._channels]} @property def output_shape(self) -> tuple[int, int, int]: ppm = int(self._pixels_per_meter) h = int((self._max_x - self._min_x) * ppm) w = int((self._max_y - self._min_y) * ppm) return (len(self._channels), h, w) def _world_to_pixel(self, points_xy: np.ndarray) -> np.ndarray: """Map (N, 2) vehicle-frame xy to (N, 2) cv2 pixel coords (col, row). cv2 expects each point as ``(x_pixel, y_pixel)`` where ``x_pixel`` is the column index and ``y_pixel`` is the row index. We map vehicle x (forward) to row and vehicle y (left) to column to match the :class:`LidarBEVAdapter` axis convention. """ ppm = int(self._pixels_per_meter) col = ((points_xy[:, 1] - self._min_y) * ppm).astype(np.int32) row = ((points_xy[:, 0] - self._min_x) * ppm).astype(np.int32) return np.stack([col, row], axis=1) def _transform(self, standard_frame_data: StandardFrameData) -> dict[Modality, Any]: if standard_frame_data.hd_map is None: return {} c, h, w = self.output_shape canvas = np.zeros((c, h, w), dtype=np.uint8) for element in standard_frame_data.hd_map.elements: channel = self._type_to_channel.get(element.type) if channel is None: continue pts = self._world_to_pixel(element.points[:, :2]) if element.is_closed: cv2.fillPoly(canvas[channel], [pts], color=255, lineType=cv2.LINE_8) elif pts.shape[0] == 1: cv2.circle( canvas[channel], (int(pts[0, 0]), int(pts[0, 1])), radius=self._polyline_thickness, color=255, thickness=-1, ) else: cv2.polylines( canvas[channel], [pts], isClosed=False, color=255, thickness=self._polyline_thickness, lineType=cv2.LINE_8, ) return {Modality.HD_MAP_BEV: canvas.astype(np.float32) / 255.0}