Skip to content

vllm.engine.protocol

Classes:

EngineClient

Bases: ABC

Protocol class for clients of the engine.

Methods:

Source code in vllm/engine/protocol.py
class EngineClient(ABC):
    """Abstract interface that every engine client implements.

    Concrete clients provide request submission (``generate`` / ``encode``),
    request control (``abort``, pausing, sleeping), cache resets, profiling
    hooks and lifecycle management.  Methods that are NOT marked
    ``@abstractmethod`` ship with a default body that raises
    ``NotImplementedError``, so only clients supporting that optional
    feature need to override them.
    """

    # Annotation-only attributes (no values are assigned on this class);
    # concrete clients are expected to populate them.
    vllm_config: VllmConfig
    model_config: ModelConfig
    renderer: BaseRenderer
    input_processor: InputProcessor

    # ---- Status properties -------------------------------------------------

    @property
    @abstractmethod
    def is_running(self) -> bool:
        """Whether the engine is running."""

    @property
    @abstractmethod
    def is_stopped(self) -> bool:
        """Whether the engine has been stopped."""

    @property
    @abstractmethod
    def errored(self) -> bool:
        """Whether the engine is in an errored state."""

    @property
    @abstractmethod
    def dead_error(self) -> BaseException:
        """Exception object associated with the engine's failure.

        NOTE(review): when this is meaningful (presumably once ``errored``
        is True) is decided by the concrete implementation.
        """

    # ---- Request submission ------------------------------------------------

    @abstractmethod
    def generate(
        self,
        prompt: EngineCoreRequest
        | PromptType
        | EngineInput
        | AsyncGenerator[StreamingInput, None],
        sampling_params: SamplingParams,
        request_id: str,
        *,
        prompt_text: str | None = None,
        lora_request: LoRARequest | None = None,
        tokenization_kwargs: dict[str, Any] | None = None,
        trace_headers: Mapping[str, str] | None = None,
        priority: int = 0,
        data_parallel_rank: int | None = None,
        reasoning_ended: bool | None = None,
        reasoning_parser_kwargs: dict[str, Any] | None = None,
    ) -> AsyncGenerator[RequestOutput, None]:
        """Submit a generation request and stream back its outputs.

        ``prompt`` may also be an async generator of ``StreamingInput``
        items, which is how multi-turn streaming sessions feed inputs.

        Returns:
            An async generator yielding ``RequestOutput`` objects.
        """

    @abstractmethod
    def encode(
        self,
        prompt: PromptType | EngineInput,
        pooling_params: PoolingParams,
        request_id: str,
        lora_request: LoRARequest | None = None,
        trace_headers: Mapping[str, str] | None = None,
        priority: int = 0,
        tokenization_kwargs: dict[str, Any] | None = None,
        reasoning_ended: bool | None = None,
    ) -> AsyncGenerator[PoolingRequestOutput, None]:
        """Submit a request to a pooling model and stream back its outputs.

        Returns:
            An async generator yielding ``PoolingRequestOutput`` objects.
        """

    @abstractmethod
    async def abort(self, request_id: str | Iterable[str]) -> None:
        """Cancel one request or several requests.

        Args:
            request_id: Either a single request id or an iterable of ids.
        """

    @abstractmethod
    async def notify_kv_transfer_request_rejected(
        self,
        request_id: str,
        kv_transfer_params: dict[str, Any],
        *,
        data_parallel_rank: int | None = None,
    ) -> None:
        """Report a KV-transfer request that was rejected before admission.

        Lets connector-side cleanup still happen for a request the engine
        never admitted, e.g. releasing prefill blocks pinned on the P node.
        """

    # ---- Observability -----------------------------------------------------

    @abstractmethod
    async def is_tracing_enabled(self) -> bool:
        """Return whether tracing is enabled."""

    @abstractmethod
    async def do_log_stats(self) -> None:
        """Log engine statistics (what gets logged is implementation-defined)."""

    @abstractmethod
    async def check_health(self) -> None:
        """Raise an exception if the engine is unhealthy; return otherwise."""

    @abstractmethod
    async def start_profile(self) -> None:
        """Begin profiling the engine."""

    @abstractmethod
    async def stop_profile(self) -> None:
        """End profiling the engine."""

    # ---- Cache management --------------------------------------------------

    @abstractmethod
    async def reset_mm_cache(self) -> None:
        """Clear the multi-modal cache."""

    @abstractmethod
    async def reset_encoder_cache(self) -> None:
        """Clear the encoder cache."""

    @abstractmethod
    async def reset_prefix_cache(
        self, reset_running_requests: bool = False, reset_connector: bool = False
    ) -> bool:
        """Clear the prefix cache, and optionally any connector cache.

        Returns:
            A boolean status reported by the implementation.
        """

    # ---- Sleep / pause control ---------------------------------------------

    @abstractmethod
    async def sleep(self, level: int = 1, mode: "PauseMode" = "abort") -> None:
        """Put the engine to sleep.

        Args:
            level: Sleep level (default 1); meaning is implementation-defined.
            mode: A ``PauseMode`` value (default ``"abort"``); presumably
                interpreted as in ``pause_generation`` -- confirm.
        """

    @abstractmethod
    async def wake_up(self, tags: list[str] | None = None) -> None:
        """Wake the engine up.

        Args:
            tags: Optional list of tags (default ``None``); their meaning is
                implementation-defined.
        """

    @abstractmethod
    async def is_sleeping(self) -> bool:
        """Return whether the engine is currently sleeping."""

    @abstractmethod
    async def add_lora(self, lora_request: LoRARequest) -> bool:
        """Load a LoRA adapter so that future requests can use it.

        Returns:
            A boolean status reported by the implementation.
        """

    @abstractmethod
    async def pause_generation(
        self,
        *,
        mode: "PauseMode" = "abort",
        wait_for_inflight_requests: bool = False,
        clear_cache: bool = True,
    ) -> None:
        """Stop accepting new generation/encoding requests.

        Args:
            mode: Policy for requests that are already in flight:
                - ``"abort"`` (default): abort them immediately and return
                  partial results carrying the "abort" reason.
                - ``"wait"``: let them run to completion.
                - ``"keep"``: freeze them in the queue until
                  :meth:`resume_generation` is called.
            wait_for_inflight_requests: Deprecated; use ``mode="wait"``.
            clear_cache: Deprecated. Whether to clear the KV and prefix
                caches once draining has finished.
        """

    @abstractmethod
    async def resume_generation(self) -> None:
        """Start accepting generation/encoding requests again."""

    @abstractmethod
    async def is_paused(self) -> bool:
        """Return ``True`` when the engine is currently paused."""

    @abstractmethod
    def shutdown(self, timeout: float | None = None) -> None:
        """Shut the engine down, optionally bounded by ``timeout``."""

    # ---- Optional capabilities (default: NotImplementedError) --------------

    async def scale_elastic_ep(
        self, new_data_parallel_size: int, drain_timeout: int = 300
    ) -> None:
        """Scale the engine to ``new_data_parallel_size``.

        Unsupported by default: always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    async def collective_rpc(
        self,
        method: str,
        timeout: float | None = None,
        args: tuple = (),
        kwargs: dict | None = None,
    ):
        """Run ``method`` as a collective RPC with ``args`` / ``kwargs``.

        Unsupported by default: always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
        """Return the tasks this engine supports.

        Unsupported by default: always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    async def init_weight_transfer_engine(
        self, init_request: WeightTransferInitRequest
    ) -> None:
        """Set up weight transfer for RL training.

        Unsupported by default: always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    async def start_weight_update(self, is_checkpoint_format: bool = True) -> None:
        """Begin a new weight update.

        Unsupported by default: always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    async def update_weights(self, request: WeightTransferUpdateRequest) -> None:
        """Apply a batched weight update for RL training.

        Unsupported by default: always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

    async def finish_weight_update(self) -> None:
        """Complete the weight update currently in progress.

        Unsupported by default: always raises ``NotImplementedError``.
        """
        raise NotImplementedError()

abort(request_id) abstractmethod async

Abort a request.

Parameters:

  • request_id

    (str | Iterable[str]) –

    The unique id of the request, or an iterable of such ids.

Source code in vllm/engine/protocol.py
@abstractmethod
async def abort(self, request_id: str | Iterable[str]) -> None:
    """Abort a request.

    Args:
        request_id: The unique id of the request,
                    or an iterable of such ids.
    """
    ...

add_lora(lora_request) abstractmethod async

Load a new LoRA adapter into the engine for future requests.

Source code in vllm/engine/protocol.py
@abstractmethod
async def add_lora(self, lora_request: LoRARequest) -> bool:
    """Load a LoRA adapter so that future requests can use it.

    Returns:
        A boolean status reported by the implementation.
    """

check_health() abstractmethod async

Raise an exception if the engine is unhealthy.

Source code in vllm/engine/protocol.py
@abstractmethod
async def check_health(self) -> None:
    """Raise an exception if the engine is unhealthy; return otherwise."""

collective_rpc(method, timeout=None, args=(), kwargs=None) async

Perform a collective RPC call of the method named by `method`.

Source code in vllm/engine/protocol.py
async def collective_rpc(
    self,
    method: str,
    timeout: float | None = None,
    args: tuple = (),
    kwargs: dict | None = None,
):
    """Perform a collective RPC call to the given path."""
    raise NotImplementedError

encode(prompt, pooling_params, request_id, lora_request=None, trace_headers=None, priority=0, tokenization_kwargs=None, reasoning_ended=None) abstractmethod

Generate outputs for a request from a pooling model.

Source code in vllm/engine/protocol.py
@abstractmethod
def encode(
    self,
    prompt: PromptType | EngineInput,
    pooling_params: PoolingParams,
    request_id: str,
    lora_request: LoRARequest | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    tokenization_kwargs: dict[str, Any] | None = None,
    reasoning_ended: bool | None = None,
) -> AsyncGenerator[PoolingRequestOutput, None]:
    """Submit a request to a pooling model and stream back its outputs.

    Returns:
        An async generator yielding ``PoolingRequestOutput`` objects.
    """

finish_weight_update() async

Finish the current weight update.

Source code in vllm/engine/protocol.py
async def finish_weight_update(self) -> None:
    """Complete the weight update currently in progress.

    Unsupported by default: always raises ``NotImplementedError``.
    """
    raise NotImplementedError()

generate(prompt, sampling_params, request_id, *, prompt_text=None, lora_request=None, tokenization_kwargs=None, trace_headers=None, priority=0, data_parallel_rank=None, reasoning_ended=None, reasoning_parser_kwargs=None) abstractmethod

Generate outputs for a request.

Source code in vllm/engine/protocol.py
@abstractmethod
def generate(
    self,
    prompt: EngineCoreRequest
    | PromptType
    | EngineInput
    | AsyncGenerator[StreamingInput, None],
    sampling_params: SamplingParams,
    request_id: str,
    *,
    prompt_text: str | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
    reasoning_ended: bool | None = None,
    reasoning_parser_kwargs: dict[str, Any] | None = None,
) -> AsyncGenerator[RequestOutput, None]:
    """Submit a generation request and stream back its outputs.

    ``prompt`` may also be an async generator of ``StreamingInput`` items,
    which is how multi-turn streaming sessions feed inputs.

    Returns:
        An async generator yielding ``RequestOutput`` objects.
    """

get_supported_tasks() async

Get the tasks supported by the engine.

Source code in vllm/engine/protocol.py
async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
    """Return the tasks this engine supports.

    Unsupported by default: always raises ``NotImplementedError``.
    """
    raise NotImplementedError()

init_weight_transfer_engine(init_request) async

Initialize weight transfer for RL training.

Source code in vllm/engine/protocol.py
async def init_weight_transfer_engine(
    self, init_request: WeightTransferInitRequest
) -> None:
    """Set up weight transfer for RL training.

    Unsupported by default: always raises ``NotImplementedError``.
    """
    raise NotImplementedError()

is_paused() abstractmethod async

Return whether the engine is currently paused.

Source code in vllm/engine/protocol.py
@abstractmethod
async def is_paused(self) -> bool:
    """Return ``True`` when the engine is currently paused."""

is_sleeping() abstractmethod async

Check whether the engine is sleeping

Source code in vllm/engine/protocol.py
@abstractmethod
async def is_sleeping(self) -> bool:
    """Return whether the engine is currently sleeping."""

notify_kv_transfer_request_rejected(request_id, kv_transfer_params, *, data_parallel_rank=None) abstractmethod async

Notify the engine that a KV-transfer request was rejected before engine admission, so connector-side cleanup can run (e.g. free prefill blocks pinned on the P node).

Source code in vllm/engine/protocol.py
@abstractmethod
async def notify_kv_transfer_request_rejected(
    self,
    request_id: str,
    kv_transfer_params: dict[str, Any],
    *,
    data_parallel_rank: int | None = None,
) -> None:
    """Notify the engine that a KV-transfer request was rejected before
    engine admission, so connector-side cleanup can run (e.g. free
    prefill blocks pinned on the P node).
    """
    ...

pause_generation(*, mode='abort', wait_for_inflight_requests=False, clear_cache=True) abstractmethod async

Pause new generation/encoding requests.

Parameters:

  • mode

    (PauseMode, default: 'abort' ) –

    How to handle in-flight requests:

      - "abort" (default): abort all in-flight requests immediately and return partial results with the "abort" reason.
      - "wait": wait for in-flight requests to complete.
      - "keep": freeze requests in the queue; they resume when resume_generation() is called.

  • wait_for_inflight_requests

    (bool, default: False ) –

    DEPRECATED. Use mode="wait" instead.

  • clear_cache

    (bool, default: True ) –

    DEPRECATED. Whether to clear KV and prefix caches after draining.

Source code in vllm/engine/protocol.py
@abstractmethod
async def pause_generation(
    self,
    *,
    mode: "PauseMode" = "abort",
    wait_for_inflight_requests: bool = False,
    clear_cache: bool = True,
) -> None:
    """Stop accepting new generation/encoding requests.

    Args:
        mode: Policy for requests that are already in flight:
            - ``"abort"`` (default): abort them immediately and return
              partial results carrying the "abort" reason.
            - ``"wait"``: let them run to completion.
            - ``"keep"``: freeze them in the queue until
              :meth:`resume_generation` is called.
        wait_for_inflight_requests: Deprecated; use ``mode="wait"``.
        clear_cache: Deprecated. Whether to clear the KV and prefix caches
            once draining has finished.
    """

reset_encoder_cache() abstractmethod async

Reset the encoder cache

Source code in vllm/engine/protocol.py
@abstractmethod
async def reset_encoder_cache(self) -> None:
    """Clear the encoder cache."""

reset_mm_cache() abstractmethod async

Reset the multi-modal cache

Source code in vllm/engine/protocol.py
@abstractmethod
async def reset_mm_cache(self) -> None:
    """Clear the multi-modal cache."""

reset_prefix_cache(reset_running_requests=False, reset_connector=False) abstractmethod async

Reset the prefix cache and optionally any configured connector cache

Source code in vllm/engine/protocol.py
@abstractmethod
async def reset_prefix_cache(
    self, reset_running_requests: bool = False, reset_connector: bool = False
) -> bool:
    """Clear the prefix cache, and optionally any configured connector cache.

    Returns:
        A boolean status reported by the implementation.
    """

resume_generation() abstractmethod async

Resume accepting generation/encoding requests.

Source code in vllm/engine/protocol.py
@abstractmethod
async def resume_generation(self) -> None:
    """Start accepting generation/encoding requests again."""

scale_elastic_ep(new_data_parallel_size, drain_timeout=300) async

Scale the engine to a new data-parallel size.

Source code in vllm/engine/protocol.py
async def scale_elastic_ep(
    self, new_data_parallel_size: int, drain_timeout: int = 300
) -> None:
    """Scale the engine to ``new_data_parallel_size``.

    Unsupported by default: always raises ``NotImplementedError``.
    """
    raise NotImplementedError()

shutdown(timeout=None) abstractmethod

Shutdown the engine with optional timeout.

Source code in vllm/engine/protocol.py
@abstractmethod
def shutdown(self, timeout: float | None = None) -> None:
    """Shutdown the engine with optional timeout."""
    ...

sleep(level=1, mode='abort') abstractmethod async

Put the engine to sleep.

Source code in vllm/engine/protocol.py
@abstractmethod
async def sleep(self, level: int = 1, mode: "PauseMode" = "abort") -> None:
    """Put the engine to sleep.

    Args:
        level: Sleep level (default 1); meaning is implementation-defined.
        mode: A ``PauseMode`` value (default ``"abort"``); presumably
            interpreted as in ``pause_generation`` -- confirm.
    """

start_profile() abstractmethod async

Start profiling the engine

Source code in vllm/engine/protocol.py
@abstractmethod
async def start_profile(self) -> None:
    """Begin profiling the engine."""

start_weight_update(is_checkpoint_format=True) async

Start a new weight update.

Source code in vllm/engine/protocol.py
async def start_weight_update(self, is_checkpoint_format: bool = True) -> None:
    """Begin a new weight update.

    Unsupported by default: always raises ``NotImplementedError``.
    """
    raise NotImplementedError()

stop_profile() abstractmethod async

Stop profiling the engine

Source code in vllm/engine/protocol.py
@abstractmethod
async def stop_profile(self) -> None:
    """End profiling the engine."""

update_weights(request) async

Batched weight update for RL training.

Source code in vllm/engine/protocol.py
async def update_weights(self, request: WeightTransferUpdateRequest) -> None:
    """Apply a batched weight update for RL training.

    Unsupported by default: always raises ``NotImplementedError``.
    """
    raise NotImplementedError()

wake_up(tags=None) abstractmethod async

Wake up the engine

Source code in vllm/engine/protocol.py
@abstractmethod
async def wake_up(self, tags: list[str] | None = None) -> None:
    """Wake up the engine"""
    ...

StreamingInput dataclass

Input data for a streaming generation request.

This is used with generate() to support multi-turn streaming sessions where inputs are provided via an async generator.

Source code in vllm/engine/protocol.py
@dataclass()
class StreamingInput:
    """One input item of a streaming generation request.

    Instances are produced by the async generator passed to ``generate()``
    when running a multi-turn streaming session.
    """

    # The engine input for this turn.
    prompt: EngineInput
    # Optional sampling parameters for this turn; how ``None`` is handled is
    # decided by the consumer (NOTE(review): presumably "keep previous").
    sampling_params: SamplingParams | None = None