Skip to content

vllm.multimodal.video

Classes:

DynamicVideoBackend

Bases: VideoBackend

Duration-aware dynamic-sampling video backend.

Samples at `fps` up to `max_duration` seconds, falling back to uniform sampling across the full duration when the video is longer than `max_duration`. The codec is selectable in the same way as for `VideoBackend`.

Source code in vllm/multimodal/video.py
@VIDEO_LOADER_REGISTRY.register(
    "opencv_dynamic",
    video_processor="Glm4vVideoProcessor",
)
class DynamicVideoBackend(VideoBackend):
    """Video backend whose sampling strategy depends on clip duration.

    Clips no longer than ``max_duration`` seconds are sampled at ``fps``;
    longer clips are sampled uniformly over their whole duration instead.
    The codec is chosen the same way as in :class:`VideoBackend`.
    """

    _sampling_suffix: ClassVar[str] = "_dynamic"

    @classmethod
    def _prepare_source(cls, source: VideoSourceMetadata) -> VideoSourceMetadata:
        """Return ``source`` with an estimated duration when none is reported.

        A truthy ``source.duration`` is returned untouched. Otherwise the
        duration is derived from the frame count and native fps (or set to
        ``0`` when the fps is not positive).
        """
        if source.duration:
            return source

        frame_count = source.total_frames_num
        native_fps = source.original_fps
        # The container gave no duration (common for WebM/streaming inputs),
        # so estimate it from the index of the last frame.
        if native_fps > 0:
            estimated = round((frame_count - 1) / native_fps) + 1
        else:
            estimated = 0
        return VideoSourceMetadata(frame_count, native_fps, estimated)

    @classmethod
    def compute_frames_index_to_sample(
        cls,
        source: VideoSourceMetadata,
        target: VideoTargetMetadata,
        **kwargs,
    ) -> list[int]:
        """Pick the sorted, de-duplicated frame indices to decode.

        Args:
            source: Frame count, native fps and duration of the input.
            target: Requested ``fps`` and ``max_duration``.
            **kwargs: Accepted for interface compatibility; not read here.

        Returns:
            Ascending list of unique frame indices, each clamped to the
            last frame index.
        """
        frame_count = source.total_frames_num
        clip_seconds = source.duration
        native_fps = source.original_fps
        cap_seconds = target.max_duration
        sample_fps = target.fps
        last_idx = frame_count - 1

        # Refer to:
        # https://github.com/huggingface/transformers/blob/v4.55.4/src/transformers/models/glm4v/video_processing_glm4v.py#L103-L140
        picked: set[int] = set()

        if clip_seconds <= cap_seconds:
            # Short clip: one sample every 1/fps seconds of source time.
            for step in range(int(math.floor(clip_seconds * sample_fps))):
                picked.add(
                    min(last_idx, int(math.ceil(step * native_fps / sample_fps)))
                )
            return sorted(picked)

        # Long clip: a fixed budget of samples spread over the whole duration.
        budget = int(cap_seconds * sample_fps)
        if budget >= frame_count:
            return list(range(frame_count))

        for second in np.linspace(0, clip_seconds, budget, endpoint=True):
            picked.add(min(last_idx, int(math.ceil(second * native_fps))))
        return sorted(picked)

    @classmethod
    def load_bytes(
        cls,
        data: bytes,
        num_frames: int = -1,
        fps: int = 2,
        max_duration: int = 300,
        frame_recovery: bool = False,
        *,
        backend: Literal["opencv", "pyav"] = "opencv",
        **kwargs,
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """Load a video via the parent implementation with this class's defaults.

        All arguments are forwarded unchanged to ``super().load_bytes``; only
        the default values (``fps=2``, ``max_duration=300``) are set here.
        """
        return super().load_bytes(
            data,
            backend=backend,
            frame_recovery=frame_recovery,
            max_duration=max_duration,
            fps=fps,
            num_frames=num_frames,
            **kwargs,
        )

GLM46VVideoBackend

Bases: VideoBackend

GLM-4.6V dynamic FPS video backend.

Faithfully replicates the frame sampling logic from transformers' Glm46VVideoProcessor.sample_frames:

  • Dynamic FPS thresholds based on effective video duration: {≤30s: 3fps, ≤300s: 1fps, >300s: 0.5fps}
  • temporal_patch_size multiplier (default 2) applied to extract count
  • Duration capped at 2400s, frame count capped at 640
  • Even frame count enforced (append last frame if odd)
Source code in vllm/multimodal/video.py
@VIDEO_LOADER_REGISTRY.register(
    "glm46v",
    video_processor="Glm46VVideoProcessor",
)
class GLM46VVideoBackend(VideoBackend):
    """GLM-4.6V dynamic FPS video backend.

    Faithfully replicates the frame sampling logic from transformers'
    ``Glm46VVideoProcessor.sample_frames``:

    - Dynamic FPS thresholds based on effective video duration:
      ``{≤30s: 3fps, ≤300s: 1fps, >300s: 0.5fps}``
    - ``temporal_patch_size`` multiplier (default 2) applied to extract count
    - Duration capped at 2400s, frame count capped at 640
    - Even frame count enforced (append last frame if odd)

    The sampling rate is derived from the clip duration alone; the ``fps``
    and ``max_duration`` carried by the target metadata are not read by
    :meth:`compute_frames_index_to_sample`.
    """

    # Match transformers defaults
    # Maps an upper duration bound (seconds) to the fps used up to that bound;
    # the 2400 entry is also used for anything longer than 300s.
    _DYNAMIC_FPS_THRESHOLDS: ClassVar[dict[int, float]] = {
        30: 3.0,
        300: 1.0,
        2400: 0.5,
    }
    # Upper bound on the number of frames requested from a single video.
    _MAX_FRAME_COUNT_DYNAMIC: ClassVar[int] = 640
    # Durations above this many seconds are clamped before choosing the fps.
    _MAX_DURATION: ClassVar[int] = 2400

    @classmethod
    def compute_frames_index_to_sample(
        cls,
        source: VideoSourceMetadata,
        target: VideoTargetMetadata,
        **kwargs,
    ) -> list[int]:
        """Compute the frame indices to decode for a GLM-4.6V input.

        Args:
            source: Frame count, native fps and duration of the input.
            target: Not read by this method.
            **kwargs: ``temporal_patch_size`` (default ``2``) multiplies the
                number of frames requested per second.

        Returns:
            Frame indices with duplicates removed (first occurrence kept).
            When the number of unique indices is odd, the last index is
            appended once more so that the length is even.
        """
        # Refer to:
        # https://github.com/huggingface/transformers/blob/v5.9.0/src/transformers/models/glm46v/video_processing_glm46v.py#L97-L102
        total_frames_num = source.total_frames_num
        original_fps = source.original_fps
        duration = source.duration
        temporal_patch_size = kwargs.get("temporal_patch_size", 2)

        max_frame_idx = total_frames_num - 1

        # Estimate duration from frame count and fps when not reported
        if not duration and original_fps > 0:
            duration = round(max_frame_idx / original_fps) + 1

        # Clamp very long clips before picking the fps tier.
        effective_duration = min(duration, cls._MAX_DURATION)

        # Select target_fps from dynamic thresholds
        if effective_duration <= 30:
            target_fps = cls._DYNAMIC_FPS_THRESHOLDS[30]
        elif effective_duration <= 300:
            target_fps = cls._DYNAMIC_FPS_THRESHOLDS[300]
        else:
            target_fps = cls._DYNAMIC_FPS_THRESHOLDS[2400]

        # Number of frames to extract, capped at the class-wide maximum.
        extract_t = int(effective_duration * target_fps * temporal_patch_size)
        extract_t = min(extract_t, cls._MAX_FRAME_COUNT_DYNAMIC)

        # Per-frame timestamps; all zero when the native fps is not positive.
        duration_per_frame = 1 / original_fps if original_fps > 0 else 0
        timestamps = [i * duration_per_frame for i in range(total_frames_num)]
        max_second = int(duration) if duration else 0

        if total_frames_num < extract_t:
            # Fewer source frames than requested: spread ``extract_t`` indices
            # over the whole clip, so some indices repeat.
            frame_indices = np.linspace(
                0, total_frames_num - 1, extract_t, dtype=int
            ).tolist()
        else:
            # Walk the frames in order, taking the first frame whose timestamp
            # has reached ``current_second`` and then advancing
            # ``current_second`` by one sampling interval. Stop once the next
            # sampling instant reaches ``max_second``.
            frame_indices = []
            current_second = 0.0
            inv_fps = 1 / (temporal_patch_size * target_fps)
            for frame_index in range(total_frames_num):
                if timestamps[frame_index] >= current_second:
                    current_second += inv_fps
                    frame_indices.append(frame_index)
                    if current_second >= max_second:
                        break

        if len(frame_indices) < extract_t:
            # Too few picks: resample ``extract_t`` indices between the first
            # and last picked frame (or over the whole clip if none picked).
            if len(frame_indices) == 0:
                start, end = 0, max(total_frames_num - 1, 0)
            else:
                start, end = frame_indices[0], frame_indices[-1]
            frame_indices = np.linspace(start, end, extract_t, dtype=int).tolist()
        elif len(frame_indices) > extract_t:
            # Too many picks: resample ``extract_t`` indices over the whole clip.
            frame_indices = np.linspace(
                0, total_frames_num - 1, extract_t, dtype=int
            ).tolist()

        # Deduplicate
        # (order-preserving: the first occurrence of each index is kept)
        seen: set[int] = set()
        uniq: list[int] = []
        for idx in frame_indices:
            if idx not in seen:
                seen.add(idx)
                uniq.append(idx)

        # Ensure even frame count
        # (an empty list has even length, so ``uniq[-1]`` is never read on it)
        if len(uniq) & 1:
            uniq.append(uniq[-1])

        return uniq

    @classmethod
    def load_bytes(
        cls,
        data: bytes,
        num_frames: int = -1,
        fps: int = -1,
        max_duration: int = 300,
        frame_recovery: bool = False,
        *,
        backend: Literal["opencv", "pyav"] = "opencv",
        **kwargs,
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """Load a video via the parent implementation with this class's defaults.

        Every argument is forwarded unchanged to ``super().load_bytes``; this
        override only changes the defaults (notably ``fps=-1``).
        """
        return super().load_bytes(
            data,
            num_frames=num_frames,
            fps=fps,
            max_duration=max_duration,
            frame_recovery=frame_recovery,
            backend=backend,
            **kwargs,
        )

Molmo2VideoBackend

Bases: VideoLoader, OpenCVVideoBackendMixin

Methods:

Source code in vllm/multimodal/video.py
@VIDEO_LOADER_REGISTRY.register(
    "molmo2",
    video_processor="Molmo2VideoProcessor",
)
class Molmo2VideoBackend(VideoLoader, OpenCVVideoBackendMixin):
    """OpenCV-based video loader registered under the name ``"molmo2"``.

    Frame selection is controlled by ``frame_sample_mode``: ``None`` keeps
    every frame, ``"uniform_last_frame"`` samples uniformly while always
    including the last frame, and ``"fps"`` picks a fixed frame step derived
    from the video's native fps.
    """

    @classmethod
    def get_candidate_target_fps(
        cls,
        video_fps: float,
        sampling_fps: float,
        max_fps: float = 8.0,
    ) -> list[float]:
        """
        Return the subset of `video_fps` factors that remain multiples
        of `sampling_fps`.

        All three arguments are truncated to integers before use, and
        candidates larger than `max_fps` are dropped.

        Examples:
            >>> Molmo2VideoBackend.get_candidate_target_fps(6, 2)
            [2.0, 6.0]
            >>> Molmo2VideoBackend.get_candidate_target_fps(5, 1)
            [1.0, 5.0]
            >>> Molmo2VideoBackend.get_candidate_target_fps(2, 2)
            [2.0]
            >>> Molmo2VideoBackend.get_candidate_target_fps(5, 2)
            Traceback (most recent call last):
                ...
            ValueError: sampling_fps=2 must divide video_fps=5.

        Raises:
            ValueError: If `sampling_fps` is None, if either fps is not
                positive after truncation, or if `sampling_fps` does not
                divide `video_fps`.
        """
        # Must run before the int() conversions below: int(None) raises
        # TypeError, which would make this check unreachable.
        if sampling_fps is None:
            raise ValueError("sampling_fps must be provided")

        video_fps = int(video_fps)
        sampling_fps = int(sampling_fps)
        max_fps = int(max_fps)

        if video_fps <= 0 or sampling_fps <= 0:
            raise ValueError(
                "video_fps and sampling_fps must be positive "
                f"(got {video_fps}, {sampling_fps})"
            )
        if video_fps % sampling_fps != 0:
            raise ValueError(
                f"sampling_fps={sampling_fps} must divide video_fps={video_fps}."
            )

        candidates = []
        for candidate in range(sampling_fps, video_fps + 1, sampling_fps):
            # Candidates are visited in ascending order, so nothing after the
            # first one above max_fps can qualify.
            if candidate > max_fps:
                break
            if video_fps % candidate == 0:
                candidates.append(float(candidate))

        return candidates

    @classmethod
    def get_target_fps(
        cls,
        video_fps: float,
        max_frames: int,
        total_frames: int,
        frame_sample_mode: str,
        candidate_target_fps: list[float],
    ) -> float | None:
        """
        Get the target fps that best spans the video and has the most frames
        sampled.

        Returns:
            The selected fps from `candidate_target_fps`, or None when no
            candidate was selected (the list is empty, or the first candidate
            already exceeds `max_frames` in a "uniform" mode).
        """
        num_frames_sampled = 0
        selected_target_fps = None
        for target_fps in candidate_target_fps:
            step_size = max(int(video_fps / target_fps), 1)
            num_frames_sampled_at_fps = int(total_frames / step_size)
            if num_frames_sampled == 0:
                if (
                    "uniform" in frame_sample_mode
                    and num_frames_sampled_at_fps > max_frames
                ):
                    break
                selected_target_fps = target_fps
                num_frames_sampled = num_frames_sampled_at_fps

            else:
                # the candidate sampling fps increases so frame count can't decrease
                assert num_frames_sampled <= num_frames_sampled_at_fps
                if num_frames_sampled_at_fps > max_frames:
                    # choose the sampling fps that spans the video
                    continue

                elif num_frames_sampled_at_fps > num_frames_sampled:
                    # both are less than max_frames; choose the one with higher
                    # density of frames sampled
                    selected_target_fps = target_fps
                    num_frames_sampled = num_frames_sampled_at_fps
        return selected_target_fps

    @classmethod
    def get_frame_times_and_chosen_fps(
        cls,
        selected_target_fps: float | None,
        total_frames: int,
        max_frames: int,
        video_fps: float,
    ) -> tuple[float | None, npt.NDArray]:
        """Turn a selected fps into frame indices.

        With no selected fps, `max_frames` indices are spread uniformly over
        ``[0, total_frames)``. Otherwise every ``step_size``-th frame is
        taken. The result is truncated to at most `max_frames` entries.

        Returns:
            ``(selected_target_fps, frame_indices)``; the fps is passed
            through unchanged.
        """
        if selected_target_fps is None:
            frame_indices = np.linspace(
                0, total_frames, max_frames, endpoint=False, dtype=int
            )
        else:
            step_size = max(int(video_fps / selected_target_fps), 1)
            frame_indices = np.arange(0, total_frames, step_size)
        if len(frame_indices) > max_frames:
            frame_indices = frame_indices[:max_frames]
        return selected_target_fps, frame_indices

    @classmethod
    def sample_times(
        cls,
        duration: float,
        max_frames: int,
        frame_sample_mode: str,
        max_fps: int | None,
        candidate_target_fps: list[float] | None = None,
        **kwargs,
    ) -> npt.NDArray:
        """Return sampling timestamps (in the unit of `duration`).

        Args:
            duration: Length of the clip.
            max_frames: Maximum number of timestamps to produce.
            frame_sample_mode: ``"fps"`` or ``"uniform_last_frame"``.
            max_fps: Sampling-rate cap used by ``"uniform_last_frame"``;
                None disables the cap.
            candidate_target_fps: Ascending fps candidates; required for
                ``"fps"`` mode.

        Raises:
            NotImplementedError: For any other `frame_sample_mode`.
        """
        if frame_sample_mode == "fps":
            assert candidate_target_fps is not None
            # Try larger and larger FPSs until we hit one that can't span the video
            sampling_fps = candidate_target_fps[0]
            for candidate_fps in candidate_target_fps[1:]:
                if max_frames / candidate_fps < duration:
                    break
                sampling_fps = candidate_fps
            times = np.arange(0, max_frames) / sampling_fps
            times = times[times < duration]
            return times
        elif frame_sample_mode == "uniform_last_frame":
            if max_fps is not None:
                max_duration = (
                    max_frames - 1
                ) / max_fps  # -1 to include the last frame
                if max_duration < duration:
                    # Clip too long for max_fps: fall back to uniform spacing.
                    times = np.linspace(
                        0, duration, num=max_frames, endpoint=True, dtype=np.float64
                    )
                else:
                    # Sample at max_fps and append the final timestamp.
                    times = np.arange(0.0, stop=duration, step=1 / max_fps)
                    times = np.concatenate([times, [duration]], axis=0)
                    assert len(times) <= max_frames
            else:
                times = np.linspace(
                    0, duration, num=max_frames, endpoint=True, dtype=np.float64
                )
            return times
        else:
            raise NotImplementedError(frame_sample_mode)

    @classmethod
    def compute_frames_index_to_sample(
        cls,
        source: VideoSourceMetadata,
        target: VideoTargetMetadata,
        **kwargs,
    ) -> list[int]:
        """Compute the frame indices to decode.

        Args:
            source: Frame count, native fps and duration of the input.
            target: Supplies ``num_frames`` (frame budget) and ``fps``
                (sampling fps for ``"fps"`` mode).
            **kwargs: ``frame_sample_mode`` (None, ``"uniform_last_frame"``
                or ``"fps"``) and ``max_fps``.

        Returns:
            Frame indices as a list of ints; every frame when
            ``frame_sample_mode`` is None.

        Raises:
            NotImplementedError: For an unsupported ``frame_sample_mode``.
        """
        max_fps = kwargs.get("max_fps")
        frame_sample_mode = kwargs.get("frame_sample_mode")
        if frame_sample_mode is None:
            return list(range(0, source.total_frames_num))

        if frame_sample_mode not in {"uniform_last_frame", "fps"}:
            raise NotImplementedError(
                f"Unsupported frame_sample_mode: {frame_sample_mode}"
            )

        duration = source.duration
        video_fps = source.original_fps
        total_num_frames = source.total_frames_num
        num_frames = target.num_frames
        sampling_fps = target.fps

        if frame_sample_mode == "uniform_last_frame" and max_fps is not None:
            if total_num_frames <= 2:
                indices = np.arange(total_num_frames).astype(int)
            elif duration > (num_frames - 1) / max_fps:  # -1 to include the last frame
                # uniform fallback
                indices = np.linspace(
                    0,
                    total_num_frames - 1,
                    num=min(num_frames, total_num_frames),
                    endpoint=True,
                ).astype(int)
            else:
                # Step through the clip at max_fps, then make sure the last
                # frame is included.
                float_indices = np.arange(
                    0.0,
                    stop=total_num_frames - 1,
                    step=float(video_fps / max_fps),
                )
                if np.round(float_indices[-1]) != total_num_frames - 1:
                    float_indices = np.concatenate(
                        [float_indices, [total_num_frames - 1]], axis=0
                    )
                indices = np.round(float_indices).astype(int)
                assert indices[-1] < total_num_frames
                assert len(float_indices) <= num_frames
        elif frame_sample_mode == "uniform_last_frame":
            indices = np.linspace(
                0,
                total_num_frames - 1,
                num=min(num_frames, total_num_frames),
                endpoint=True,
            ).astype(int)
        elif frame_sample_mode == "fps":
            candidate_target_fps = cls.get_candidate_target_fps(video_fps, sampling_fps)
            selected_target_fps = cls.get_target_fps(
                video_fps,
                num_frames,
                total_num_frames,
                frame_sample_mode,
                candidate_target_fps,
            )
            _, indices = cls.get_frame_times_and_chosen_fps(
                selected_target_fps,
                total_num_frames,
                num_frames,
                video_fps,
            )
        return indices.tolist()

    @classmethod
    def load_bytes_opencv(
        cls,
        data: bytes,
        frame_sample_mode: str | None = None,
        num_frames: int = -1,
        max_fps: int = 2,
        sampling_fps: int = 2,
        frame_recovery: bool = False,
        **kwargs,
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """Decode the selected frames of `data` with OpenCV.

        Args:
            data: Raw video bytes.
            frame_sample_mode: Sampling mode forwarded to
                :meth:`compute_frames_index_to_sample`.
            num_frames: Frame budget stored in the sampling target.
            max_fps: Sampling-rate cap forwarded to the index computation.
            sampling_fps: Stored as the sampling target's ``fps``.
            frame_recovery: Forwarded to ``read_frames``.
            **kwargs: Accepted and ignored.

        Returns:
            ``(frames, metadata)`` as produced by ``read_frames`` and
            ``create_hf_metadata``.
        """
        cap = cls.open_video_capture(data)

        source = OpenCVVideoBackendMixin.get_video_metadata(cap)
        # The target's max_duration is set to the clip's own duration.
        target = VideoTargetMetadata(
            num_frames=num_frames,
            fps=sampling_fps,
            max_duration=source.duration,
        )

        frame_idx = cls.compute_frames_index_to_sample(
            source=source,
            target=target,
            frame_sample_mode=frame_sample_mode,
            max_fps=max_fps,
        )

        frames, valid_frame_indices = cls.read_frames(
            cap,
            frame_idx,
            total_frames_num=source.total_frames_num,
            frame_recovery=frame_recovery,
        )

        metadata = cls.create_hf_metadata(
            source=source,
            video_backend="opencv",
            valid_frame_indices=valid_frame_indices,
        )

        return frames, metadata

    @classmethod
    def load_bytes(
        cls,
        data: bytes,
        num_frames: int = -1,
        **kwargs,
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """Load a video, reading the sampling options from `kwargs`.

        ``frame_sample_mode`` (default None), ``max_fps`` (default 2) and
        ``sampling_fps`` (default 2) are popped from `kwargs`; whatever
        remains is forwarded to :meth:`load_bytes_opencv`.
        """
        frame_sample_mode = cast(str | None, kwargs.pop("frame_sample_mode", None))
        max_fps = cast(int, kwargs.pop("max_fps", 2))
        sampling_fps = cast(int, kwargs.pop("sampling_fps", 2))
        return cls.load_bytes_opencv(
            data,
            frame_sample_mode,
            num_frames,
            max_fps,
            sampling_fps,
            **kwargs,
        )

get_candidate_target_fps(video_fps, sampling_fps, max_fps=8.0) classmethod

Return the subset of video_fps factors that remain multiples of sampling_fps.

Examples:

>>> get_candidate_target_fps(video_fps=6, sampling_fps=2)
[2, 6]
>>> get_candidate_target_fps(video_fps=5, sampling_fps=1)
[1, 5]
>>> get_candidate_target_fps(video_fps=2, sampling_fps=2)
[2]
>>> get_candidate_target_fps(video_fps=5, sampling_fps=2)
Traceback (most recent call last):
    ...
ValueError: sampling_fps=2 must divide video_fps=5 to produce
    consistent frame steps.
Source code in vllm/multimodal/video.py
@classmethod
def get_candidate_target_fps(
    cls,
    video_fps: float,
    sampling_fps: float,
    max_fps: float = 8.0,
) -> list[float]:
    """
    Return the subset of `video_fps` factors that remain multiples
    of `sampling_fps`.

    All three arguments are truncated to integers before use, and
    candidates larger than `max_fps` are dropped.

    Examples:
        >>> Molmo2VideoBackend.get_candidate_target_fps(6, 2)
        [2.0, 6.0]
        >>> Molmo2VideoBackend.get_candidate_target_fps(5, 1)
        [1.0, 5.0]
        >>> Molmo2VideoBackend.get_candidate_target_fps(2, 2)
        [2.0]
        >>> Molmo2VideoBackend.get_candidate_target_fps(5, 2)
        Traceback (most recent call last):
            ...
        ValueError: sampling_fps=2 must divide video_fps=5.

    Raises:
        ValueError: If `sampling_fps` is None, if either fps is not
            positive after truncation, or if `sampling_fps` does not
            divide `video_fps`.
    """
    # Must run before the int() conversions below: int(None) raises
    # TypeError, which would make this check unreachable.
    if sampling_fps is None:
        raise ValueError("sampling_fps must be provided")

    video_fps = int(video_fps)
    sampling_fps = int(sampling_fps)
    max_fps = int(max_fps)

    if video_fps <= 0 or sampling_fps <= 0:
        raise ValueError(
            "video_fps and sampling_fps must be positive "
            f"(got {video_fps}, {sampling_fps})"
        )
    if video_fps % sampling_fps != 0:
        raise ValueError(
            f"sampling_fps={sampling_fps} must divide video_fps={video_fps}."
        )

    candidates = []
    for candidate in range(sampling_fps, video_fps + 1, sampling_fps):
        # Candidates are visited in ascending order, so nothing after the
        # first one above max_fps can qualify.
        if candidate > max_fps:
            break
        if video_fps % candidate == 0:
            candidates.append(float(candidate))

    return candidates

get_target_fps(video_fps, max_frames, total_frames, frame_sample_mode, candidate_target_fps) classmethod

Get the target fps that best spans the video and has the most frames sampled.

Source code in vllm/multimodal/video.py
@classmethod
def get_target_fps(
    cls,
    video_fps: float,
    max_frames: int,
    total_frames: int,
    frame_sample_mode: str,
    candidate_target_fps: list[float],
) -> float | None:
    """
    Get the target fps that best spans the videoand has the most frames sampled
    """
    num_frames_sampled = 0
    selected_target_fps = None
    for target_fps in candidate_target_fps:
        step_size = max(int(video_fps / target_fps), 1)
        num_frames_sampled_at_fps = int(total_frames / step_size)
        if num_frames_sampled == 0:
            if (
                "uniform" in frame_sample_mode
                and num_frames_sampled_at_fps > max_frames
            ):
                break
            selected_target_fps = target_fps
            num_frames_sampled = num_frames_sampled_at_fps

        else:
            # the candidate sampling fps increases so frame count can't decrease
            assert num_frames_sampled <= num_frames_sampled_at_fps
            if num_frames_sampled_at_fps > max_frames:
                # choose the sampling fps that spans the video
                continue

            elif num_frames_sampled_at_fps > num_frames_sampled:
                # both are less than max_frames; choose the one with higher
                # density of frames sampled
                selected_target_fps = target_fps
                num_frames_sampled = num_frames_sampled_at_fps
    return selected_target_fps

OpenCVDynamicOpenPanguVideoBackend

Bases: VideoLoader, OpenCVVideoBackendMixin

Methods:

  • load_bytes

    Load video frames with dynamic sampling based on duration.

Source code in vllm/multimodal/video.py
@VIDEO_LOADER_REGISTRY.register("openpangu")
class OpenCVDynamicOpenPanguVideoBackend(VideoLoader, OpenCVVideoBackendMixin):
    """OpenCV video loader registered as ``"openpangu"``.

    Frames are picked at evenly spaced timestamps between the first and the
    last frame, bounded by a frame budget (``num_frames``) and, optionally,
    by a sampling rate (``fps``).
    """

    @classmethod
    def compute_frames_index_to_sample(
        cls,
        source: VideoSourceMetadata,
        target: VideoTargetMetadata,
        **kwargs,
    ) -> list[int]:
        """Compute the frame indices to decode.

        Args:
            source: Frame count and native fps of the input.
            target: ``num_frames`` is the maximum number of frames to sample
                (a negative value means no cap); ``fps`` is the sampling
                rate, where ``-1`` disables the rate limit.
            **kwargs: Accepted for interface compatibility; not read here.

        Returns:
            One frame index per sampled timestamp, each clamped to the last
            frame index.

        Raises:
            ValueError: If ``target.fps`` is neither ``-1`` nor positive.
        """
        total_frames_num = source.total_frames_num
        original_fps = source.original_fps
        num_frames = target.num_frames
        fps = target.fps

        # A negative budget (the loader default is -1) means "no explicit
        # cap". Without this, the negative value would reach np.linspace,
        # which rejects a negative sample count.
        if num_frames < 0:
            num_frames = total_frames_num

        # Timestamp of the last frame; frame 0 sits at t=0, so the span is
        # (N - 1) frame intervals rather than N.
        if total_frames_num >= 1 and original_fps > 0:
            total_duration = (total_frames_num - 1) / original_fps
        else:
            total_duration = 0

        # `fps` is the FPS parameter passed in for sampling,
        # -1 indicates that sampling can be performed directly without FPS limitation.
        if fps > 0:
            # Sampling at `fps` yields at most this many frames over the clip.
            max_frames_at_fps = int(total_duration * fps) + 1
            if num_frames >= max_frames_at_fps:
                # The budget is not the limiting factor: sample exactly at
                # `fps` and shrink the span to the last timestamp that lies
                # on the fps grid.
                num_frames = max_frames_at_fps
                total_duration = min(total_duration, (num_frames - 1) / fps)
        elif fps != -1:
            raise ValueError(
                f"requires dataset fps is -1 or greater than 0 but got {fps}"
            )

        sample_frame_timestamps = np.linspace(
            0, total_duration, num_frames, dtype=float
        )
        return [
            min(total_frames_num - 1, round(t * original_fps))
            for t in sample_frame_timestamps
        ]

    @classmethod
    def load_bytes(
        cls,
        data: bytes,
        num_frames: int = -1,
        fps: int = 2,
        max_duration: int = 300,
        frame_recovery: bool = False,
        **kwargs,
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """
        Load video frames with dynamic sampling based on duration.

        Args:
            data: Raw video bytes
            num_frames: Maximum number of frames to sample; a negative
                value (the default) applies no cap
            fps: Target FPS for sampling (default: 2); -1 disables the
                rate limit
            max_duration: Stored in the sampling target (default: 300s);
                not read by this class's index computation
            frame_recovery: Enable forward-scan recovery for failed frames

        Returns:
            Tuple of (frames_array, metadata_dict)
        """
        cap = cls.open_video_capture(data)

        source = OpenCVVideoBackendMixin.get_video_metadata(cap)

        target = VideoTargetMetadata(
            num_frames=num_frames,
            fps=fps,
            max_duration=max_duration,
        )

        frame_indices_list = cls.compute_frames_index_to_sample(
            source=source,
            target=target,
        )

        frames, valid_frame_indices = cls.read_frames(
            cap,
            frame_indices_list,
            total_frames_num=source.total_frames_num,
            frame_recovery=frame_recovery,
        )

        # Use transformers.video_utils.VideoMetadata format
        # NOTE(review): the backend name reported here is "opencv_dynamic"
        # although this loader registers as "openpangu" — confirm intended.
        metadata = cls.create_hf_metadata(
            source=source,
            video_backend="opencv_dynamic",
            valid_frame_indices=valid_frame_indices,
        )
        return frames, metadata

load_bytes(data, num_frames=-1, fps=2, max_duration=300, frame_recovery=False, **kwargs) classmethod

Load video frames with dynamic sampling based on duration.

Parameters:

  • data

    (bytes) –

    Raw video bytes

  • num_frames

    (int, default: -1 ) –

    Maximum number of frames to sample; forwarded to the sampling target

  • fps

    (int, default: 2 ) –

    Target FPS for sampling (default: 2)

  • max_duration

    (int, default: 300 ) –

    Maximum video duration to process (default: 300s)

  • frame_recovery

    (bool, default: False ) –

    Enable forward-scan recovery for failed frames

Returns:

Source code in vllm/multimodal/video.py
@classmethod
def load_bytes(
    cls,
    data: bytes,
    num_frames: int = -1,
    fps: int = 2,
    max_duration: int = 300,
    frame_recovery: bool = False,
    **kwargs,
) -> tuple[npt.NDArray, dict[str, Any]]:
    """
    Decode a video from bytes, sampling frames according to its metadata.

    Args:
        data: Raw video bytes
        num_frames: Passed through as the sampling target's ``num_frames``
        fps: Passed through as the sampling target's ``fps`` (default: 2)
        max_duration: Passed through as the sampling target's
            ``max_duration`` (default: 300)
        frame_recovery: Forwarded to ``read_frames`` to enable recovery of
            frames that fail to decode

    Returns:
        Tuple of (frames_array, metadata_dict)
    """
    capture = cls.open_video_capture(data)
    video_info = OpenCVVideoBackendMixin.get_video_metadata(capture)

    # Bundle the caller's sampling request and let the class decide which
    # frame indices to decode.
    wanted_indices = cls.compute_frames_index_to_sample(
        source=video_info,
        target=VideoTargetMetadata(
            num_frames=num_frames,
            fps=fps,
            max_duration=max_duration,
        ),
    )

    frames, decoded_indices = cls.read_frames(
        capture,
        wanted_indices,
        total_frames_num=video_info.total_frames_num,
        frame_recovery=frame_recovery,
    )

    # Metadata follows the transformers.video_utils.VideoMetadata format.
    return frames, cls.create_hf_metadata(
        source=video_info,
        video_backend="opencv_dynamic",
        valid_frame_indices=decoded_indices,
    )

OpenCVVideoBackendMixin

Source code in vllm/multimodal/video.py
class OpenCVVideoBackendMixin:
    """OpenCV helpers for opening a video from bytes and reading frames."""

    @staticmethod
    def get_cv2_video_api():
        """Return the first usable stream-buffered OpenCV backend.

        Backends are taken in the order reported by
        ``vr.getStreamBufferedBackends()``. A backend is skipped when it is
        not available, or when it is a plugin whose reported version is
        older than ABI 1 / API 2.

        Returns:
            The selected backend identifier, or ``None`` if none qualifies.
        """
        api_pref = None
        for backend in vr.getStreamBufferedBackends():
            if not vr.hasBackend(backend):
                continue
            if not vr.isBackendBuiltIn(backend):
                _, abi, api = vr.getStreamBufferedBackendPluginVersion(backend)
                if abi < 1 or (abi == 1 and api < 2):
                    continue
            api_pref = backend
            break
        return api_pref

    @classmethod
    def open_video_capture(cls, data: bytes) -> "cv2.VideoCapture":
        """Open ``data`` as an in-memory video stream.

        Args:
            data: Raw video bytes.

        Returns:
            An opened ``cv2.VideoCapture``.

        Raises:
            ValueError: If OpenCV reports that the stream is not opened.
        """
        backend = cls.get_cv2_video_api()
        cap = cv2.VideoCapture(BytesIO(data), backend, [])
        if not cap.isOpened():
            raise ValueError("Could not open video stream")
        return cap

    @staticmethod
    def get_video_metadata(cap: "cv2.VideoCapture") -> VideoSourceMetadata:
        """Read frame count and FPS from ``cap`` and derive the duration.

        The duration is ``total_frames_num / original_fps``, or ``0`` when
        the reported FPS is not positive.
        """
        total_frames_num = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
        original_fps = cap.get(cv2.CAP_PROP_FPS)
        duration = total_frames_num / original_fps if original_fps > 0 else 0
        return VideoSourceMetadata(
            total_frames_num=total_frames_num,
            original_fps=original_fps,
            duration=duration,
        )

    @classmethod
    def _can_use_for_recovery(
        cls,
        idx: int,
        failed_frames: list[int],
        next_target_map: dict[int, int],
        total_frames: int,
    ) -> bool:
        """Check if current frame can recover the oldest failed frame.

        The frame at ``idx`` qualifies when it lies before the target that
        follows the oldest failed frame (``total_frames`` is used as the
        limit when that frame has no entry in ``next_target_map``).
        """
        if not failed_frames:
            return False
        oldest_failed = failed_frames[0]
        limit = next_target_map.get(oldest_failed, total_frames)
        return idx < limit

    @classmethod
    def _read_frames_with_recovery(
        cls,
        cap: "cv2.VideoCapture",
        frame_indices: list[int],
        total_frames: int,
    ) -> tuple[npt.NDArray, list[int], dict[int, int]]:
        """
        Read frames with dynamic window forward-scan recovery.

        When a target frame fails to load, the next successfully grabbed
        frame (before the next target frame) will be used to recover it.
        A failure whose window has closed is dropped, so that it does not
        block the recovery of later failures.

        Args:
            cap: OpenCV VideoCapture object
            frame_indices: Sorted list of target frame indices to load
            total_frames: Total number of frames in the video

        Returns:
            Tuple of (frames_array, valid_frame_indices, recovered_map)
            - frames_array: Array of loaded frames
            - valid_frame_indices: List of frame indices that were loaded
              (a recovered target appears as the index of its substitute)
            - recovered_map: Dict mapping recovered_idx -> source_idx
        """
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

        assert width > 0 and height > 0, (
            f"Invalid video frame size: width={width}, height={height}"
        )

        frames_list: list[npt.NDArray] = []
        valid_frame_indices: list[int] = []
        failed_frames_idx: list[int] = []
        recovered_map: dict[int, int] = {}

        # Nothing requested: return an empty batch instead of indexing into
        # an empty list below.
        if not frame_indices:
            empty = np.empty((0, height, width, 3), dtype=np.uint8)
            return empty, valid_frame_indices, recovered_map

        frame_idx_set = set(frame_indices)
        max_frame_idx = frame_indices[-1]

        # Build map: target_idx -> next_target_idx (for recovery window)
        next_target_map: dict[int, int] = dict(
            zip(frame_indices, frame_indices[1:])
        )
        next_target_map[frame_indices[-1]] = total_frames

        for idx in range(max_frame_idx + 1):
            is_target_frame = idx in frame_idx_set

            # Drop failures whose recovery window has already closed;
            # otherwise a stale head entry would prevent every later
            # failure from being recovered.
            while failed_frames_idx and idx >= next_target_map.get(
                failed_frames_idx[0], total_frames
            ):
                expired_idx = failed_frames_idx.pop(0)
                logger.warning(
                    "Frame %d could not be recovered before the next "
                    "target frame.",
                    expired_idx,
                )

            # Attempt to grab the current frame
            ok = cap.grab()

            if not ok:
                if is_target_frame:
                    logger.warning(
                        "Failed to grab frame %d during video loading.",
                        idx,
                    )
                    failed_frames_idx.append(idx)
                continue

            # Check if we should retrieve: target frame OR can recover a failed one
            can_recover = cls._can_use_for_recovery(
                idx, failed_frames_idx, next_target_map, total_frames
            )

            if is_target_frame or can_recover:
                ret, frame = cap.retrieve()

                if ret and frame is not None and frame.size > 0:
                    rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    frames_list.append(rgb_frame)
                    valid_frame_indices.append(idx)

                    if can_recover:
                        recovered_idx = failed_frames_idx.pop(0)
                        recovered_map[recovered_idx] = idx
                        logger.info(
                            "Recovered frame %d using frame %d (delay: %d)",
                            recovered_idx,
                            idx,
                            idx - recovered_idx,
                        )
                elif is_target_frame:
                    logger.warning(
                        "Failed to retrieve frame %d during video loading.",
                        idx,
                    )
                    failed_frames_idx.append(idx)

        # Log any remaining failed frames
        for failed_idx in failed_frames_idx:
            logger.warning(
                "Frame %d could not be recovered (end of video).",
                failed_idx,
            )

        # Stack frames
        if frames_list:
            frames = np.stack(frames_list)
        else:
            frames = np.empty((0, height, width, 3), dtype=np.uint8)

        return frames, valid_frame_indices, recovered_map

    @classmethod
    def _read_frames_no_recovery(
        cls,
        cap,
        frame_indices: set[int],
        max_frame_idx: int,
    ) -> tuple[npt.NDArray, list[int]]:
        """Read the requested frames sequentially, skipping unreadable ones.

        Args:
            cap: OpenCV VideoCapture object.
            frame_indices: Set of target frame indices to load.
            max_frame_idx: Last frame index to scan (inclusive).

        Returns:
            Tuple of (frames_array, valid_frame_indices); the array holds
            only the frames that were actually retrieved.
        """
        num_expected_frames = len(frame_indices)
        width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # Preallocate for every requested frame; trimmed on return.
        frames = np.empty((num_expected_frames, height, width, 3), dtype=np.uint8)

        i = 0
        valid_frame_indices = []
        for idx in range(max_frame_idx + 1):
            ok = cap.grab()
            if not ok:
                # Frame is broken/unreadable, log warning
                if idx in frame_indices:
                    logger.warning(
                        "Failed to grab frame %d during video loading. "
                        "This frame will be skipped.",
                        idx,
                    )
                continue
            if idx in frame_indices:
                ret, frame = cap.retrieve()
                if ret:
                    frames[i] = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                    valid_frame_indices.append(idx)
                    i += 1
                else:
                    # retrieve() failed even though grab() succeeded
                    logger.warning(
                        "Failed to retrieve frame %d during video loading. "
                        "This frame will be skipped.",
                        idx,
                    )

        valid_num_frames = len(valid_frame_indices)
        if valid_num_frames < num_expected_frames:
            logger.warning(
                "Video loading completed with %d broken/unreadable frames. "
                "Expected %d frames but only loaded %d frames.",
                num_expected_frames - valid_num_frames,
                num_expected_frames,
                valid_num_frames,
            )

        return frames[:valid_num_frames], valid_frame_indices

    @classmethod
    def read_frames(
        cls,
        cap: "cv2.VideoCapture",
        frame_idx: list[int],
        total_frames_num: int,
        *,
        frame_recovery: bool = False,
    ) -> tuple[npt.NDArray, list[int]]:
        """Read the frames listed in ``frame_idx`` from ``cap``.

        Args:
            cap: OpenCV VideoCapture object.
            frame_idx: Frame indices to load. An empty list yields an empty
                result instead of raising.
            total_frames_num: Total number of frames in the video.
            frame_recovery: Use forward-scan recovery for failed frames.

        Returns:
            Tuple of (frames_array, valid_frame_indices).
        """
        if frame_recovery:
            num_frames_to_sample = len(frame_idx)
            frames, valid_frame_indices, recovered_map = cls._read_frames_with_recovery(
                cap, frame_idx, total_frames_num
            )

            if recovered_map:
                logger.info(
                    "Frame recovery: %d frames recovered using forward scan.",
                    len(recovered_map),
                )
        else:
            frame_idx_set = set(frame_idx)
            num_frames_to_sample = len(frame_idx_set)
            # default=-1 makes the scan range empty when nothing is requested.
            frames, valid_frame_indices = cls._read_frames_no_recovery(
                cap, frame_idx_set, max(frame_idx, default=-1)
            )
        valid_num_frames = len(valid_frame_indices)
        if valid_num_frames < num_frames_to_sample:
            logger.warning(
                "Video loading completed with %d broken/unreadable frames. "
                "Expected to sample %d frames but only loaded %d frames.",
                num_frames_to_sample - valid_num_frames,
                num_frames_to_sample,
                valid_num_frames,
            )
        return frames, valid_frame_indices

_can_use_for_recovery(idx, failed_frames, next_target_map, total_frames) classmethod

Check if current frame can recover the oldest failed frame.

Source code in vllm/multimodal/video.py
@classmethod
def _can_use_for_recovery(
    cls,
    idx: int,
    failed_frames: list[int],
    next_target_map: dict[int, int],
    total_frames: int,
) -> bool:
    """Tell whether frame ``idx`` may stand in for the oldest failed frame.

    Only the first entry of ``failed_frames`` is considered. Its recovery
    window ends at the value stored for it in ``next_target_map``, or at
    ``total_frames`` when it has no entry there. Returns ``False`` when no
    failure is pending.
    """
    if failed_frames:
        window_end = next_target_map.get(failed_frames[0], total_frames)
        return idx < window_end
    return False

_read_frames_with_recovery(cap, frame_indices, total_frames) classmethod

Read frames with dynamic window forward-scan recovery.

When a target frame fails to load, the next successfully grabbed frame (before the next target frame) will be used to recover it.

Parameters:

  • cap

    (VideoCapture) –

    OpenCV VideoCapture object

  • frame_indices

    (list[int]) –

    Sorted list of target frame indices to load

  • total_frames

    (int) –

    Total number of frames in the video

Returns:

  • tuple[NDArray, list[int], dict[int, int]]

    Tuple of (frames_array, valid_frame_indices, recovered_map)

    • frames_array: Array of loaded frames
    • valid_frame_indices: List of frame indices that were loaded
    • recovered_map: Dict mapping recovered_idx -> source_idx

Source code in vllm/multimodal/video.py
@classmethod
def _read_frames_with_recovery(
    cls,
    cap: "cv2.VideoCapture",
    frame_indices: list[int],
    total_frames: int,
) -> tuple[npt.NDArray, list[int], dict[int, int]]:
    """
    Read frames with dynamic window forward-scan recovery.

    When a target frame fails to load, the next successfully grabbed
    frame (before the next target frame) will be used to recover it.
    A failure whose window has closed is dropped, so that it does not
    block the recovery of later failures.

    Args:
        cap: OpenCV VideoCapture object
        frame_indices: Sorted list of target frame indices to load
        total_frames: Total number of frames in the video

    Returns:
        Tuple of (frames_array, valid_frame_indices, recovered_map)
        - frames_array: Array of loaded frames
        - valid_frame_indices: List of frame indices that were loaded
          (a recovered target appears as the index of its substitute)
        - recovered_map: Dict mapping recovered_idx -> source_idx
    """
    width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
    height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))

    assert width > 0 and height > 0, (
        f"Invalid video frame size: width={width}, height={height}"
    )

    frames_list: list[npt.NDArray] = []
    valid_frame_indices: list[int] = []
    failed_frames_idx: list[int] = []
    recovered_map: dict[int, int] = {}

    # Nothing requested: return an empty batch instead of indexing into
    # an empty list below.
    if not frame_indices:
        empty = np.empty((0, height, width, 3), dtype=np.uint8)
        return empty, valid_frame_indices, recovered_map

    frame_idx_set = set(frame_indices)
    max_frame_idx = frame_indices[-1]

    # Build map: target_idx -> next_target_idx (for recovery window)
    next_target_map: dict[int, int] = dict(
        zip(frame_indices, frame_indices[1:])
    )
    next_target_map[frame_indices[-1]] = total_frames

    for idx in range(max_frame_idx + 1):
        is_target_frame = idx in frame_idx_set

        # Drop failures whose recovery window has already closed;
        # otherwise a stale head entry would prevent every later
        # failure from being recovered.
        while failed_frames_idx and idx >= next_target_map.get(
            failed_frames_idx[0], total_frames
        ):
            expired_idx = failed_frames_idx.pop(0)
            logger.warning(
                "Frame %d could not be recovered before the next "
                "target frame.",
                expired_idx,
            )

        # Attempt to grab the current frame
        ok = cap.grab()

        if not ok:
            if is_target_frame:
                logger.warning(
                    "Failed to grab frame %d during video loading.",
                    idx,
                )
                failed_frames_idx.append(idx)
            continue

        # Check if we should retrieve: target frame OR can recover a failed one
        can_recover = cls._can_use_for_recovery(
            idx, failed_frames_idx, next_target_map, total_frames
        )

        if is_target_frame or can_recover:
            ret, frame = cap.retrieve()

            if ret and frame is not None and frame.size > 0:
                rgb_frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
                frames_list.append(rgb_frame)
                valid_frame_indices.append(idx)

                if can_recover:
                    recovered_idx = failed_frames_idx.pop(0)
                    recovered_map[recovered_idx] = idx
                    logger.info(
                        "Recovered frame %d using frame %d (delay: %d)",
                        recovered_idx,
                        idx,
                        idx - recovered_idx,
                    )
            elif is_target_frame:
                logger.warning(
                    "Failed to retrieve frame %d during video loading.",
                    idx,
                )
                failed_frames_idx.append(idx)

    # Log any remaining failed frames
    for failed_idx in failed_frames_idx:
        logger.warning(
            "Frame %d could not be recovered (end of video).",
            failed_idx,
        )

    # Stack frames
    if frames_list:
        frames = np.stack(frames_list)
    else:
        frames = np.empty((0, height, width, 3), dtype=np.uint8)

    return frames, valid_frame_indices, recovered_map

PyAVVideoBackendMixin

PyAV (in-process FFmpeg bindings) codec utilities.

Reads stream metadata and decodes target frames via per-frame container.seek(). The seek releases the GIL between frames and scales with the number of sampled frames rather than the video length, enabling concurrent decoding under serving load.

Methods:

  • decode_frames

    Decode target frames via per-frame seek + forward decode to PTS.

Source code in vllm/multimodal/video.py
class PyAVVideoBackendMixin:
    """PyAV (in-process FFmpeg bindings) codec utilities.

    Reads stream metadata and decodes target frames via per-frame
    ``container.seek()``. The seek releases the GIL between frames and
    scales with the number of sampled frames rather than the video
    length, enabling concurrent decoding under serving load.
    """

    @staticmethod
    def get_metadata(
        container: "av.container.InputContainer",
    ) -> VideoSourceMetadata:
        """Build source metadata from the first video stream of ``container``.

        Missing values default to zero. When the stream reports no frame
        count, it is estimated as ``int(duration * fps)`` provided both are
        positive.

        Raises:
            ValueError: If the container has no video stream.
        """
        if not container.streams.video:
            raise ValueError("No video streams found in container")
        video_stream = container.streams.video[0]

        frame_count = video_stream.frames or 0

        rate = 0.0
        if video_stream.average_rate:
            rate = float(video_stream.average_rate)

        seconds = 0.0
        if video_stream.duration:
            seconds = float(video_stream.duration * video_stream.time_base)

        # Fall back to an estimate when the container omits the frame count.
        if frame_count == 0 and seconds > 0 and rate > 0:
            frame_count = int(seconds * rate)
        return VideoSourceMetadata(frame_count, rate, seconds)

    @staticmethod
    def decode_frames(
        container: "av.container.InputContainer",
        frame_indices: list[int],
        fps: float,
        duration: float,
    ) -> tuple[npt.NDArray, list[int]]:
        """Decode target frames via per-frame seek + forward decode to PTS.

        Args:
            container: Opened PyAV input container.
            frame_indices: Frame indices to decode, in request order.
            fps: Source frame rate used to turn an index into a timestamp.
            duration: Source duration; when positive, timestamps are capped
                one frame interval before the end.

        Returns:
            Tuple of (frames_array, valid_indices). Indices for which no
            frame was found are left out. With no decoded frame at all the
            array is empty with shape ``(0,)``.
        """
        video_stream = container.streams.video[0]
        # SLICE parallelizes within a single frame without the
        # one-frame-per-thread latency penalty of FRAME threading.
        video_stream.thread_type = "SLICE"
        tick = video_stream.time_base

        has_fps = fps > 0
        step = 1.0 / fps if has_fps else 0.1
        if duration > 0:
            ts_cap = max(0.0, duration - step)
        else:
            ts_cap = float("inf")

        decoded: list[npt.NDArray] = []
        kept: list[int] = []
        frame_iter = None
        prev_pts = None
        for target_idx in frame_indices:
            seconds = min(target_idx / fps, ts_cap) if has_fps else 0.0
            target_pts = int(seconds / tick)

            # seek() lands on a keyframe at or before the target, so it is
            # only issued when the running decoder cannot be reused: there
            # is none, or the target does not lie ahead of the last frame.
            must_seek = (
                frame_iter is None or prev_pts is None or target_pts <= prev_pts
            )
            if must_seek:
                container.seek(target_pts, stream=video_stream)
                frame_iter = container.decode(video=0)

            hit = None
            for candidate in frame_iter:
                if candidate.pts is None or candidate.pts < target_pts:
                    continue
                hit = candidate
                break

            if hit is None:
                # Decoder ran dry; force a fresh seek for the next target.
                frame_iter = None
                continue

            prev_pts = hit.pts
            decoded.append(hit.to_ndarray(format="rgb24"))
            kept.append(target_idx)

        if decoded:
            return np.stack(decoded), kept
        return np.empty((0,), dtype=np.uint8), kept

decode_frames(container, frame_indices, fps, duration) staticmethod

Decode target frames via per-frame seek + forward decode to PTS.

Source code in vllm/multimodal/video.py
@staticmethod
def decode_frames(
    container: "av.container.InputContainer",
    frame_indices: list[int],
    fps: float,
    duration: float,
) -> tuple[npt.NDArray, list[int]]:
    """Decode target frames via per-frame seek + forward decode to PTS.

    Args:
        container: Opened PyAV input container.
        frame_indices: Frame indices to decode, in request order.
        fps: Source frame rate used to turn an index into a timestamp.
        duration: Source duration; when positive, timestamps are capped
            one frame interval before the end.

    Returns:
        Tuple of (frames_array, valid_indices). Indices for which no
        frame was found are left out. With no decoded frame at all the
        array is empty with shape ``(0,)``.
    """
    video_stream = container.streams.video[0]
    # SLICE parallelizes within a single frame without the
    # one-frame-per-thread latency penalty of FRAME threading.
    video_stream.thread_type = "SLICE"
    tick = video_stream.time_base

    has_fps = fps > 0
    step = 1.0 / fps if has_fps else 0.1
    if duration > 0:
        ts_cap = max(0.0, duration - step)
    else:
        ts_cap = float("inf")

    decoded: list[npt.NDArray] = []
    kept: list[int] = []
    frame_iter = None
    prev_pts = None
    for target_idx in frame_indices:
        seconds = min(target_idx / fps, ts_cap) if has_fps else 0.0
        target_pts = int(seconds / tick)

        # seek() lands on a keyframe at or before the target, so it is
        # only issued when the running decoder cannot be reused: there
        # is none, or the target does not lie ahead of the last frame.
        must_seek = (
            frame_iter is None or prev_pts is None or target_pts <= prev_pts
        )
        if must_seek:
            container.seek(target_pts, stream=video_stream)
            frame_iter = container.decode(video=0)

        hit = None
        for candidate in frame_iter:
            if candidate.pts is None or candidate.pts < target_pts:
                continue
            hit = candidate
            break

        if hit is None:
            # Decoder ran dry; force a fresh seek for the next target.
            frame_iter = None
            continue

        prev_pts = hit.pts
        decoded.append(hit.to_ndarray(format="rgb24"))
        kept.append(target_idx)

    if decoded:
        return np.stack(decoded), kept
    return np.empty((0,), dtype=np.uint8), kept

VideoBackend

Bases: VideoLoader, OpenCVVideoBackendMixin, PyAVVideoBackendMixin

Uniform-sampling video backend.

Samples num_frames uniformly across the video (or one frame every 1/fps seconds, whichever produces fewer frames). The decoding codec is selected via the backend kwarg ("opencv" or "pyav"), which can be passed through --media-io-kwargs. Defaults to "opencv", matching the load_bytes signature below; "pyav" enables concurrent decoding.

Methods:

  • load_bytes

    Load sampled frames from raw video bytes.

Source code in vllm/multimodal/video.py
@VIDEO_LOADER_REGISTRY.register("opencv")
class VideoBackend(VideoLoader, OpenCVVideoBackendMixin, PyAVVideoBackendMixin):
    """Uniform-sampling video backend.

    Samples ``num_frames`` uniformly across the video (or one frame every
    ``1/fps`` seconds, whichever produces fewer frames). The decoding codec
    is selected via the ``backend`` kwarg (``"opencv"`` or ``"pyav"``),
    which can be passed through ``--media-io-kwargs``. Defaults to
    ``"opencv"``; ``"pyav"`` enables concurrent decoding.
    """

    # Appended to the codec name to form the reported ``video_backend``.
    _sampling_suffix: ClassVar[str] = ""

    @classmethod
    def compute_frames_index_to_sample(
        cls,
        source: VideoSourceMetadata,
        target: VideoTargetMetadata,
        **kwargs,
    ) -> list[int]:
        """Return uniformly spaced frame indices for the requested target.

        The sample count is the smallest of the total frame count,
        ``target.num_frames`` (when positive) and
        ``floor(duration * target.fps)`` (when ``target.fps`` is positive),
        but never less than one.
        """
        total_frames_num = source.total_frames_num
        duration = source.duration
        num_frames = target.num_frames
        fps = target.fps
        # resample video to target num_frames and fps
        # - the minimum of the two will be used
        num_frames_to_sample = total_frames_num
        if num_frames > 0:
            num_frames_to_sample = min(num_frames, total_frames_num)
        if fps > 0:
            num_frames_to_sample = min(num_frames_to_sample, math.floor(duration * fps))
        num_frames_to_sample = max(1, num_frames_to_sample)

        # Every frame requested: skip linspace and return 0..N-1 directly.
        if num_frames_to_sample == total_frames_num:
            return list(range(num_frames_to_sample))
        return np.linspace(
            0, total_frames_num - 1, num_frames_to_sample, dtype=int
        ).tolist()

    @classmethod
    def _prepare_source(cls, source: VideoSourceMetadata) -> VideoSourceMetadata:
        """Sampling-algorithm-specific metadata adjustment hook.

        Returns ``source`` unchanged here; ``load_bytes`` applies this hook
        to the decoded metadata before computing the frame indices.
        """
        return source

    @classmethod
    def load_bytes(
        cls,
        data: bytes,
        num_frames: int = -1,
        fps: int = -1,
        max_duration: int = 300,
        frame_recovery: bool = False,
        *,
        backend: Literal["opencv", "pyav"] = "opencv",
        **kwargs,
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """Load sampled frames from raw video bytes.

        Args:
            data: Raw video bytes.
            num_frames: Target number of frames to sample (``-1`` for all).
            fps: Target FPS for sampling (``-1`` for original).
            max_duration: Maximum duration in seconds — only used by the
                dynamic subclass; ignored here.
            frame_recovery: Enable forward-scan recovery for failed frames.
                Only supported by the OpenCV codec; combining it with
                ``backend="pyav"`` fails an assertion.
            backend: Decoding codec — ``"opencv"`` (default) or ``"pyav"``.
            **kwargs: Forwarded to ``compute_frames_index_to_sample``.

        Returns:
            Tuple of ``(frames_array, metadata_dict)``.

        Raises:
            ValueError: If ``backend`` is not a known codec.
            AssertionError: If ``frame_recovery`` is set with ``"pyav"``.
        """
        target = VideoTargetMetadata(
            num_frames=num_frames, fps=fps, max_duration=max_duration
        )

        if backend == "opencv":
            cap = cls.open_video_capture(data)
            try:
                source = cls._prepare_source(cls.get_video_metadata(cap))
                frame_idx = cls.compute_frames_index_to_sample(
                    source=source, target=target, **kwargs
                )
                frames, valid = cls.read_frames(
                    cap,
                    frame_idx,
                    total_frames_num=source.total_frames_num,
                    frame_recovery=frame_recovery,
                )
            finally:
                # Free the decoder even when sampling or decoding raises.
                cap.release()
        elif backend == "pyav":
            assert not frame_recovery, (
                "frame_recovery is only available for `opencv` backend"
            )
            with av.open(BytesIO(data)) as container:
                source = cls._prepare_source(cls.get_metadata(container))
                frame_idx = cls.compute_frames_index_to_sample(
                    source=source, target=target, **kwargs
                )
                frames, valid = cls.decode_frames(
                    container, frame_idx, source.original_fps, source.duration
                )
        else:
            raise ValueError(
                f"Unknown video codec backend {backend!r}; "
                "valid options: 'opencv', 'pyav'."
            )

        if len(valid) < len(frame_idx):
            logger.warning(
                "%s video loading: expected %d frames but got %d.",
                backend,
                len(frame_idx),
                len(valid),
            )

        return frames, cls.create_hf_metadata(
            source=source,
            video_backend=f"{backend}{cls._sampling_suffix}",
            valid_frame_indices=valid,
        )

_prepare_source(source) classmethod

Sampling-algorithm-specific metadata adjustment hook.

Source code in vllm/multimodal/video.py
@classmethod
def _prepare_source(cls, source: VideoSourceMetadata) -> VideoSourceMetadata:
    """Sampling-algorithm-specific metadata adjustment hook.

    This implementation returns ``source`` unchanged. ``load_bytes`` applies
    the hook to the decoded metadata before computing the frame indices.
    """
    return source

load_bytes(data, num_frames=-1, fps=-1, max_duration=300, frame_recovery=False, *, backend='opencv', **kwargs) classmethod

Load sampled frames from raw video bytes.

Parameters:

  • data

    (bytes) –

    Raw video bytes.

  • num_frames

    (int, default: -1 ) –

    Target number of frames to sample (-1 for all).

  • fps

    (int, default: -1 ) –

    Target FPS for sampling (-1 for original).

  • max_duration

    (int, default: 300 ) –

    Maximum duration in seconds — only used by the dynamic subclass; ignored here.

  • frame_recovery

    (bool, default: False ) –

    Enable forward-scan recovery for failed frames. Only supported by the OpenCV codec; passing True together with backend="pyav" fails an assertion.

  • backend

    (Literal['opencv', 'pyav'], default: 'opencv' ) –

    Decoding codec — "opencv" or "pyav".

Returns:

  • tuple[NDArray, dict[str, Any]] –

    Tuple of (frames_array, metadata_dict).

Source code in vllm/multimodal/video.py
@classmethod
def load_bytes(
    cls,
    data: bytes,
    num_frames: int = -1,
    fps: int = -1,
    max_duration: int = 300,
    frame_recovery: bool = False,
    *,
    backend: Literal["opencv", "pyav"] = "opencv",
    **kwargs,
) -> tuple[npt.NDArray, dict[str, Any]]:
    """Load sampled frames from raw video bytes.

    Args:
        data: Raw video bytes.
        num_frames: Target number of frames to sample (``-1`` for all).
        fps: Target FPS for sampling (``-1`` for original).
        max_duration: Maximum duration in seconds — only used by the
            dynamic subclass; ignored here.
        frame_recovery: Enable forward-scan recovery for failed frames.
            Only supported by the OpenCV codec; combining it with
            ``backend="pyav"`` fails an assertion.
        backend: Decoding codec — ``"opencv"`` (default) or ``"pyav"``.
        **kwargs: Forwarded to ``compute_frames_index_to_sample``.

    Returns:
        Tuple of ``(frames_array, metadata_dict)``.

    Raises:
        ValueError: If ``backend`` is not a known codec.
        AssertionError: If ``frame_recovery`` is set with ``"pyav"``.
    """
    target = VideoTargetMetadata(
        num_frames=num_frames, fps=fps, max_duration=max_duration
    )

    if backend == "opencv":
        cap = cls.open_video_capture(data)
        try:
            source = cls._prepare_source(cls.get_video_metadata(cap))
            frame_idx = cls.compute_frames_index_to_sample(
                source=source, target=target, **kwargs
            )
            frames, valid = cls.read_frames(
                cap,
                frame_idx,
                total_frames_num=source.total_frames_num,
                frame_recovery=frame_recovery,
            )
        finally:
            # Free the decoder even when sampling or decoding raises.
            cap.release()
    elif backend == "pyav":
        assert not frame_recovery, (
            "frame_recovery is only available for `opencv` backend"
        )
        with av.open(BytesIO(data)) as container:
            source = cls._prepare_source(cls.get_metadata(container))
            frame_idx = cls.compute_frames_index_to_sample(
                source=source, target=target, **kwargs
            )
            frames, valid = cls.decode_frames(
                container, frame_idx, source.original_fps, source.duration
            )
    else:
        raise ValueError(
            f"Unknown video codec backend {backend!r}; "
            "valid options: 'opencv', 'pyav'."
        )

    if len(valid) < len(frame_idx):
        logger.warning(
            "%s video loading: expected %d frames but got %d.",
            backend,
            len(frame_idx),
            len(valid),
        )

    return frames, cls.create_hf_metadata(
        source=source,
        video_backend=f"{backend}{cls._sampling_suffix}",
        valid_frame_indices=valid,
    )

VideoLoader

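Methods:

  • compute_frames_index_to_sample

    Return the list of frame indices to sample from the video.

  • load_bytes

    Load video frames from bytes and return (frames_array, metadata_dict).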

Source code in vllm/multimodal/video.py
class VideoLoader:
    """Base interface for video loaders.

    Subclasses supply the sampling policy and the decoding entry point;
    the metadata dictionary layout is shared through
    ``create_hf_metadata``.
    """

    @classmethod
    def compute_frames_index_to_sample(
        cls,
        source: VideoSourceMetadata,
        target: VideoTargetMetadata,
        **kwargs,
    ) -> list[int]:
        """Return the frame indices to sample; must be overridden.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @classmethod
    @abstractmethod
    def load_bytes(
        cls,
        data: bytes,
        **kwargs,
    ) -> tuple[npt.NDArray, dict[str, Any]]:
        """Decode ``data`` into ``(frames_array, metadata_dict)``; must be overridden.

        Raises:
            NotImplementedError: Always, in this base implementation.
        """
        raise NotImplementedError

    @classmethod
    def create_hf_metadata(
        cls,
        source: VideoSourceMetadata,
        valid_frame_indices: list[int],
        video_backend: str,
    ):
        """Assemble the metadata dictionary returned next to the frames.

        Args:
            source: Metadata of the source video.
            valid_frame_indices: Indices of the frames that were loaded.
            video_backend: Name reported under ``"video_backend"``.

        Returns:
            A dict with the source frame count, FPS and duration, the
            backend name, the loaded indices, and ``"do_sample_frames"``.
        """
        frame_total = source.total_frames_num
        # The flag is set only when as many frames were loaded as the
        # source contains.
        loaded_every_frame = len(valid_frame_indices) == frame_total
        metadata = {
            "total_num_frames": frame_total,
            "fps": source.original_fps,
            "duration": source.duration,
            "video_backend": video_backend,
            "frames_indices": valid_frame_indices,
            "do_sample_frames": loaded_every_frame,
        }
        return metadata

compute_frames_index_to_sample(source, target, **kwargs) classmethod

Return the list of frame indices to sample from the video.

Source code in vllm/multimodal/video.py
@classmethod
def compute_frames_index_to_sample(
    cls,
    source: VideoSourceMetadata,
    target: VideoTargetMetadata,
    **kwargs,
) -> list[int]:
    """Return the list of frame indices to sample from the video.

    Args:
        source: Metadata of the source video.
        target: Requested sampling parameters.
        **kwargs: Extra options for concrete implementations.

    Raises:
        NotImplementedError: Always; this base version has no sampling
            policy and is meant to be overridden.
    """
    raise NotImplementedError

load_bytes(data, **kwargs) abstractmethod classmethod

Load video frames from bytes and return (frames_array, metadata_dict).

Source code in vllm/multimodal/video.py
@classmethod
@abstractmethod
def load_bytes(
    cls,
    data: bytes,
    **kwargs,
) -> tuple[npt.NDArray, dict[str, Any]]:
    """Load video frames from bytes and return (frames_array, metadata_dict).

    Args:
        data: Raw video bytes.
        **kwargs: Loader-specific options.

    Raises:
        NotImplementedError: Always; this base version is meant to be
            overridden.
    """
    raise NotImplementedError

VideoSourceMetadata

Bases: NamedTuple

Metadata describing the source video.

Source code in vllm/multimodal/video.py
class VideoSourceMetadata(NamedTuple):
    """Metadata describing the source video."""

    # Number of frames in the source video.
    total_frames_num: int
    # Frame rate of the source video; 0 when the decoder reports none.
    original_fps: float
    # Length of the source video in seconds; 0 when it is unknown.
    duration: float

VideoTargetMetadata

Bases: NamedTuple

Metadata describing the requested (target) sampling.

Source code in vllm/multimodal/video.py
class VideoTargetMetadata(NamedTuple):
    """Metadata describing the requested (target) sampling."""

    # Number of frames to sample; the loaders document -1 as "all frames".
    num_frames: int
    # Sampling rate in frames per second; the uniform loader documents -1
    # as "original rate".
    fps: float
    # Maximum duration to process, in seconds (per the loader docstrings).
    max_duration: float