vllm.entrypoints.openai.realtime.serving

logger module-attribute

logger = init_logger(__name__)

OpenAIServingRealtime

Bases: OpenAIServing

Realtime audio transcription service via WebSocket streaming.

Provides streaming audio-to-text transcription by transforming audio chunks into StreamingInput objects that can be consumed by the engine.
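
A minimal feeding sketch, assuming raw float32 PCM arrives as binary WebSocket frames; the receive call shown is Starlette's, and the transport wiring and framing are assumptions rather than part of this module:

import numpy as np
from collections.abc import AsyncGenerator
from starlette.websockets import WebSocket

async def websocket_audio_stream(ws: WebSocket) -> AsyncGenerator[np.ndarray, None]:
    # Turn each binary WebSocket frame into the float32 numpy array
    # that transcribe_realtime() expects.
    while True:
        data = await ws.receive_bytes()  # assumed framing: raw float32 PCM bytes
        if not data:
            break
        yield np.frombuffer(data, dtype=np.float32)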

Source code in vllm/entrypoints/openai/realtime/serving.py
class OpenAIServingRealtime(OpenAIServing):
    """Realtime audio transcription service via WebSocket streaming.

    Provides streaming audio-to-text transcription by transforming audio chunks
    into StreamingInput objects that can be consumed by the engine.
    """

    def __init__(
        self,
        engine_client: EngineClient,
        models: OpenAIServingModels,
        *,
        request_logger: RequestLogger | None,
        log_error_stack: bool = False,
    ):
        super().__init__(
            engine_client=engine_client,
            models=models,
            request_logger=request_logger,
            log_error_stack=log_error_stack,
        )

        self.task_type: Literal["realtime"] = "realtime"

        logger.info("OpenAIServingRealtime initialized for task: %s", self.task_type)

    @cached_property
    def model_cls(self) -> type[SupportsRealtime]:
        """Get the model class that supports transcription."""
        from vllm.model_executor.model_loader import get_model_cls

        model_cls = get_model_cls(self.model_config)
        return cast(type[SupportsRealtime], model_cls)

    async def transcribe_realtime(
        self,
        audio_stream: AsyncGenerator[np.ndarray, None],
        input_stream: asyncio.Queue[list[int]],
    ) -> AsyncGenerator[StreamingInput, None]:
        """Transform audio stream into StreamingInput for engine.generate().

        Args:
            audio_stream: Async generator yielding float32 numpy audio arrays
            input_stream: Queue containing context token IDs from previous
                generation outputs. Used for autoregressive multi-turn
                processing where each generation's output becomes the context
                for the next iteration.

        Yields:
            StreamingInput objects containing audio prompts for the engine
        """

        # Cast needed: mypy cannot infer the yield type of the model's
        # audio-buffering generator here.
        # TODO(Patrick) - fix this
        stream_input_iter = cast(
            AsyncGenerator[PromptType, None],
            self.model_cls.buffer_realtime_audio(
                audio_stream, input_stream, self.model_config
            ),
        )

        async for prompt in stream_input_iter:
            yield StreamingInput(prompt=prompt)

model_cls cached property

model_cls: type[SupportsRealtime]

Get the model class that supports transcription.

task_type instance-attribute

task_type: Literal['realtime'] = 'realtime'

__init__

__init__(
    engine_client: EngineClient,
    models: OpenAIServingModels,
    *,
    request_logger: RequestLogger | None,
    log_error_stack: bool = False,
)
Source code in vllm/entrypoints/openai/realtime/serving.py
def __init__(
    self,
    engine_client: EngineClient,
    models: OpenAIServingModels,
    *,
    request_logger: RequestLogger | None,
    log_error_stack: bool = False,
):
    super().__init__(
        engine_client=engine_client,
        models=models,
        request_logger=request_logger,
        log_error_stack=log_error_stack,
    )

    self.task_type: Literal["realtime"] = "realtime"

    logger.info("OpenAIServingRealtime initialized for task: %s", self.task_type)
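
A hypothetical construction sketch; engine_client and models are assumed to come from the usual vLLM API-server startup path rather than being built here:

serving = OpenAIServingRealtime(
    engine_client=engine_client,   # existing EngineClient from server startup (assumed)
    models=models,                 # existing OpenAIServingModels instance (assumed)
    request_logger=None,           # or a RequestLogger to record incoming requests
)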

transcribe_realtime async

transcribe_realtime(
    audio_stream: AsyncGenerator[ndarray, None],
    input_stream: Queue[list[int]],
) -> AsyncGenerator[StreamingInput, None]

Transform audio stream into StreamingInput for engine.generate().

Parameters:

audio_stream (AsyncGenerator[ndarray, None], required)
    Async generator yielding float32 numpy audio arrays.

input_stream (Queue[list[int]], required)
    Queue containing context token IDs from previous generation outputs.
    Used for autoregressive multi-turn processing where each generation's
    output becomes the context for the next iteration.

Yields:

AsyncGenerator[StreamingInput, None]
    StreamingInput objects containing audio prompts for the engine.

Source code in vllm/entrypoints/openai/realtime/serving.py
async def transcribe_realtime(
    self,
    audio_stream: AsyncGenerator[np.ndarray, None],
    input_stream: asyncio.Queue[list[int]],
) -> AsyncGenerator[StreamingInput, None]:
    """Transform audio stream into StreamingInput for engine.generate().

    Args:
        audio_stream: Async generator yielding float32 numpy audio arrays
        input_stream: Queue containing context token IDs from previous
            generation outputs. Used for autoregressive multi-turn
            processing where each generation's output becomes the context
            for the next iteration.

    Yields:
        StreamingInput objects containing audio prompts for the engine
    """

    # Cast needed: mypy cannot infer the yield type of the model's
    # audio-buffering generator here.
    # TODO(Patrick) - fix this
    stream_input_iter = cast(
        AsyncGenerator[PromptType, None],
        self.model_cls.buffer_realtime_audio(
            audio_stream, input_stream, self.model_config
        ),
    )

    async for prompt in stream_input_iter:
        yield StreamingInput(prompt=prompt)
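
A minimal consumption sketch of the autoregressive loop described above. The call shape of the engine's streaming generate() and the structure of its outputs are assumptions for illustration, not a documented contract:

import asyncio

async def run_realtime(serving, engine_client, audio_stream, request_id):
    # Queue that carries each turn's output token IDs back in as context.
    input_stream: asyncio.Queue[list[int]] = asyncio.Queue()
    streaming_inputs = serving.transcribe_realtime(audio_stream, input_stream)
    # Hypothetical: the engine consumes StreamingInput objects and streams outputs.
    async for output in engine_client.generate(streaming_inputs, request_id=request_id):
        # Feed the generated token IDs back for the next iteration's context.
        await input_stream.put(list(output.outputs[0].token_ids))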