# Source code for standard_e2e.caching.src_datasets.navsim.navsim_dataset_processor

"""Source-dataset processor for NAVSIM (OpenScene-v1.1).

NAVSIM ships scenes as per-log pickle files (lists of frame dicts) plus
a parallel ``sensor_blobs`` tree with the actual camera JPEGs and merged
lidar PCDs, and an HD-map archive at ``maps/<city>/<version>/map.gpkg``.
We read the dicts and PCDs directly (sidestepping NAVSIM's own data
classes), and call into ``nuplan-devkit``'s map factory for the HD map
— the maps subtree of nuplan-devkit imports cleanly without torch /
hydra / pytorch-lightning, so adding the dep is safe.

Modality coverage: cameras (8), lidar (merged sweep), HD map (vector
lanes, drivable area, intersections, stop lines, crosswalks, walkways
— see ``_navsim_map.py`` for the full translation table), 3D
detections, driving command (Intent), past/future ego trajectory via
segment-context aggregators.
"""

from __future__ import annotations

import logging
import os
import pickle
from pathlib import Path
from typing import Any, Optional, cast

import numpy as np
import pandas as pd
from PIL import Image
from scipy.spatial.transform import Rotation

from standard_e2e.caching import SourceDatasetProcessor
from standard_e2e.caching.adapters import (
    AbstractAdapter,
    CamerasIdentityAdapter,
    Detections3DIdentityAdapter,
    HDMapBEVAdapter,
    IntentIdentityAdapter,
    LidarAdapter,
)
from standard_e2e.caching.segment_context import (
    FutureDetectionsAggregator,
    FuturePastStatesFromMatricesAggregator,
    SegmentContextAggregator,
)
from standard_e2e.caching.src_datasets.navsim._navsim_map import build_navsim_hd_map
from standard_e2e.caching.src_datasets.navsim._pcd import read_navsim_pcd_xyz
from standard_e2e.data_structures import (
    CameraData,
    Detection3D,
    FrameDetections3D,
    HDMap,
    LidarData,
    StandardFrameData,
    Trajectory,
)
from standard_e2e.enums import (
    CameraDirection,
    DetectionType,
    Intent,
    LidarComponent,
    StandardFrameDataField,
)
from standard_e2e.enums import TrajectoryComponent as TC
from standard_e2e.indexing import IndexDataGenerator
from standard_e2e.utils import matrix_to_xyz_heading

# nuPlan ships maps under a fixed map-version folder name. NAVSIM's official
# install docs hardcode this version (see navsim/docs/install.md).
# Presumably this is the ``<version>`` component of the
# ``maps/<city>/<version>/map.gpkg`` layout — the use site is outside this
# section, so confirm there.
_NUPLAN_MAP_VERSION = "nuplan-maps-v1.0"
# ROI radius around ego for HD-map queries. Slightly larger than the default
# HDMapBEVAdapter extent (±32 m diagonal ≈ 45 m) so polygons that straddle the
# BEV boundary still rasterise correctly.
# NOTE(review): 64 m is roughly 1.4x the quoted 45 m diagonal — confirm that
# "slightly larger" still describes the intended margin.
_NAVSIM_MAP_QUERY_RADIUS_M = 64.0

# NAVSIM ships 8 cameras: front-centre, three on each side (front-of-side,
# side, rear-of-side), plus rear-centre. Mapping into our 8-direction
# CameraDirection enum is exact.
# Key naming as used in the table below: F0 is the front camera, B0 the rear
# one, and for the L*/R* (left/right) keys the index 0/1/2 runs
# front-of-side → side → rear-of-side.
_NAVSIM_CAM_TO_DIRECTION: dict[str, CameraDirection] = {
    "CAM_F0": CameraDirection.FRONT,
    "CAM_L0": CameraDirection.FRONT_LEFT,
    "CAM_R0": CameraDirection.FRONT_RIGHT,
    "CAM_L1": CameraDirection.SIDE_LEFT,
    "CAM_R1": CameraDirection.SIDE_RIGHT,
    "CAM_L2": CameraDirection.REAR_LEFT,
    "CAM_R2": CameraDirection.REAR_RIGHT,
    "CAM_B0": CameraDirection.REAR,
}

# NAVSIM detection labels. The dataset uses lowercase short strings
# (vehicle/pedestrian/bicycle/...). Anything we don't recognise lands in
# DetectionType.UNKNOWN so the framework still surfaces it.
# Note that traffic_cone, barrier and generic_object are listed explicitly
# yet also map to UNKNOWN. The fallback for labels absent from this table is
# presumably applied at the lookup site, which is outside this section.
_NAVSIM_CATEGORY_TO_DETECTION_TYPE: dict[str, DetectionType] = {
    "vehicle": DetectionType.VEHICLE,
    "pedestrian": DetectionType.PEDESTRIAN,
    "bicycle": DetectionType.BICYCLE,
    "traffic_cone": DetectionType.UNKNOWN,
    "barrier": DetectionType.UNKNOWN,
    "czone_sign": DetectionType.SIGN,
    "generic_object": DetectionType.UNKNOWN,
}

# NAVSIM driving_command is a (4,) one-hot. Index → Intent mapping is
# documented at navsim/docs/agents.md: "left, straight, right, unknown".
# Real-data distribution (~7%/90%/2%/0%) confirms this layout.
# Tuple position i holds the Intent for one-hot index i; the tuple is indexed
# with the argmax of the command in ``_driving_command_to_intent``.
_DRIVING_COMMAND_TO_INTENT: tuple[Intent, ...] = (
    Intent.GO_LEFT,
    Intent.GO_STRAIGHT,
    Intent.GO_RIGHT,
    Intent.UNKNOWN,
)


def _quat_to_rotmat_wxyz(qw: float, qx: float, qy: float, qz: float) -> np.ndarray:
    """Turn a scalar-first Hamilton quaternion into a float32 rotation matrix.

    NAVSIM (and nuplan) store quaternions as ``(qw, qx, qy, qz)``, while
    ``Rotation.from_quat`` is fed the components in ``(x, y, z, w)`` order,
    hence the reordering before the call.

    Args:
        qw: Scalar (real) component.
        qx: First vector component.
        qy: Second vector component.
        qz: Third vector component.

    Returns:
        The rotation matrix produced by SciPy, cast to ``float32``.
    """
    scalar_last = [qx, qy, qz, qw]
    rotation = Rotation.from_quat(scalar_last)
    matrix_f32 = rotation.as_matrix().astype(np.float32)
    return cast(np.ndarray, matrix_f32)


def _se3_from_rotation_translation(
    rotation: np.ndarray, translation: np.ndarray
) -> np.ndarray:
    """Assemble a 4x4 float32 homogeneous transform from a rotation and offset.

    Args:
        rotation: Values written into the upper-left 3x3 block.
        translation: Values written into the first three rows of the last
            column.

    Returns:
        A new 4x4 ``float32`` array; the bottom row stays ``[0, 0, 0, 1]``
        from the identity it starts as.
    """
    pose = np.identity(4, dtype=np.float32)
    # Fill the two sub-blocks in turn; both inputs are coerced to float32.
    rot_block = np.asarray(rotation, dtype=np.float32)
    pose[:3, :3] = rot_block
    offset = np.asarray(translation, dtype=np.float32)
    pose[:3, 3] = offset
    return pose


def _driving_command_to_intent(driving_command: np.ndarray) -> Intent:
    """Decode the 4-element one-hot into our Intent enum.

    Off-spec inputs (wrong element count, multiple ones, all zeros, negative
    or other non-0/1 entries, NaN/inf) fall back to UNKNOWN rather than
    asserting — preprocessing should keep going through rare malformed
    frames.

    Args:
        driving_command: Array-like holding the one-hot command; it is
            flattened, so any shape with exactly four elements is accepted.

    Returns:
        The Intent at the position of the single ``1`` in
        ``_DRIVING_COMMAND_TO_INTENT``, or ``Intent.UNKNOWN`` when the input
        is not a valid one-hot vector.
    """
    flat = np.asarray(driving_command).reshape(-1)
    if flat.shape != (4,):
        return Intent.UNKNOWN
    # Reject NaN/inf *before* the integer cast: casting them to int64 is
    # undefined, and the resulting garbage values can wrap around in the sum
    # and masquerade as a valid command.
    if not np.isfinite(flat.astype(np.float64)).all():
        return Intent.UNKNOWN
    # Fractional values are truncated toward zero by the integer cast.
    one_hot = flat.astype(np.int64)
    # A plain ``sum() == 1`` also accepts vectors such as [1, 1, -1, 0];
    # require exactly one nonzero entry, and that entry must be 1.
    if np.count_nonzero(one_hot) != 1 or np.count_nonzero(one_hot == 1) != 1:
        return Intent.UNKNOWN
    idx = int(np.argmax(one_hot))
    return _DRIVING_COMMAND_TO_INTENT[idx]