Skip to content

vllm.v1.worker.gpu.async_utils

AsyncOutput

Bases: AsyncModelRunnerOutput

Source code in vllm/v1/worker/gpu/async_utils.py
class AsyncOutput(AsyncModelRunnerOutput):
    """Model-runner output whose device-to-host copies run on a side stream.

    The constructor enqueues non-blocking CPU copies on ``copy_stream`` and
    records ``copy_event`` after them; ``get_output`` waits on that event and
    then fills the wrapped ``ModelRunnerOutput`` with Python-native values.
    """

    def __init__(
        self,
        model_runner_output: ModelRunnerOutput,
        sampler_output: SamplerOutput,
        num_sampled_tokens: torch.Tensor,
        copy_stream: torch.cuda.Stream,
        copy_event: torch.cuda.Event,
    ):
        """Enqueue the host copies and record ``copy_event`` behind them.

        Args:
            model_runner_output: Output object that ``get_output`` fills in
                and returns.
            sampler_output: Source of ``sampled_token_ids`` and the optional
                ``logprobs_tensors`` / ``num_nans`` fields.
            num_sampled_tokens: Tensor later used as the per-row count when
                truncating the sampled token ids.
            copy_stream: CUDA stream on which the copies are issued.
            copy_event: CUDA event recorded on ``copy_stream`` once every
                copy has been enqueued.
        """
        # NOTE(woosuk): The copies below run on a different CUDA stream than
        # the one that produced these tensors, so references to the GPU
        # tensors must be kept on the instance.
        self.model_runner_output = model_runner_output
        self.sampler_output = sampler_output
        self.num_sampled_tokens = num_sampled_tokens
        self.copy_event = copy_event

        producer_stream = torch.cuda.current_stream()
        with torch.cuda.stream(copy_stream):
            # Order the copy stream after the work already queued on the
            # stream that was current when the constructor was entered.
            copy_stream.wait_stream(producer_stream)

            self.sampled_token_ids = async_copy_to_np(sampler_output.sampled_token_ids)

            device_logprobs = sampler_output.logprobs_tensors
            self.logprobs_tensors: LogprobsTensors | None = (
                None
                if device_logprobs is None
                else device_logprobs.to_cpu_nonblocking()
            )

            device_num_nans = sampler_output.num_nans
            self.num_nans: np.ndarray | None = (
                None if device_num_nans is None else async_copy_to_np(device_num_nans)
            )

            self.num_sampled_tokens_np = async_copy_to_np(num_sampled_tokens)

            # Entries that are None are carried over unchanged.
            host_prompt_logprobs = {}
            source_dict = self.model_runner_output.prompt_logprobs_dict
            for key, entry in source_dict.items():
                host_prompt_logprobs[key] = (
                    None if entry is None else entry.to_cpu_nonblocking()
                )
            self.prompt_logprobs_dict = host_prompt_logprobs

            # Recorded last so the event covers every copy issued above.
            self.copy_event.record(copy_stream)

    def get_output(self) -> ModelRunnerOutput:
        """Wait for the copies, populate the wrapped output, and return it.

        Returns:
            The ``model_runner_output`` passed to the constructor, with
            ``sampled_token_ids`` and ``prompt_logprobs_dict`` always set and
            ``num_nans_in_logits`` / ``logprobs`` set when the corresponding
            data is present.
        """
        self.copy_event.synchronize()
        result = self.model_runner_output

        # NOTE(woosuk): Converting to Python lists keeps compatibility with
        # the existing model runner. Going forward, these structures should
        # stay as NumPy arrays rather than Python lists.
        rows: list[list[int]] = self.sampled_token_ids.tolist()
        keep_counts: list[int] = self.num_sampled_tokens_np.tolist()
        for row, keep in zip(rows, keep_counts):
            # Drop everything past the first `keep` entries, in place.
            row[keep:] = []
        result.sampled_token_ids = rows

        if self.num_nans is not None:
            request_ids = result.req_ids
            nan_counts = self.num_nans.tolist()
            result.num_nans_in_logits = dict(zip(request_ids, nan_counts))

        host_logprobs = self.logprobs_tensors
        if host_logprobs is not None:
            result.logprobs = host_logprobs.tolists()

        result.prompt_logprobs_dict = self.prompt_logprobs_dict
        return result

copy_event instance-attribute

copy_event = copy_event

logprobs_tensors instance-attribute

logprobs_tensors: LogprobsTensors | None = None

model_runner_output instance-attribute

model_runner_output = model_runner_output

num_nans instance-attribute

num_nans: ndarray | None = None

num_sampled_tokens instance-attribute

num_sampled_tokens = num_sampled_tokens

num_sampled_tokens_np instance-attribute

num_sampled_tokens_np = async_copy_to_np(num_sampled_tokens)

prompt_logprobs_dict instance-attribute

prompt_logprobs_dict = {
    k: (v.to_cpu_nonblocking() if v is not None else None)
    for k, v in self.model_runner_output.prompt_logprobs_dict.items()
}

sampled_token_ids instance-attribute

sampled_token_ids = async_copy_to_np(sampler_output.sampled_token_ids)

sampler_output instance-attribute

sampler_output = sampler_output

__init__

__init__(
    model_runner_output: ModelRunnerOutput,
    sampler_output: SamplerOutput,
    num_sampled_tokens: Tensor,
    copy_stream: Stream,
    copy_event: Event,
)
Source code in vllm/v1/worker/gpu/async_utils.py
def __init__(
    self,
    model_runner_output: ModelRunnerOutput,
    sampler_output: SamplerOutput,
    num_sampled_tokens: torch.Tensor,
    copy_stream: torch.cuda.Stream,
    copy_event: torch.cuda.Event,
):
    """Enqueue the host copies on ``copy_stream`` and record ``copy_event``.

    Args:
        model_runner_output: Output object stored on the instance; its
            ``prompt_logprobs_dict`` is the source of the per-key copies.
        sampler_output: Source of ``sampled_token_ids`` and the optional
            ``logprobs_tensors`` / ``num_nans`` fields.
        num_sampled_tokens: Tensor copied to ``num_sampled_tokens_np``.
        copy_stream: CUDA stream on which the copies are issued.
        copy_event: CUDA event recorded on ``copy_stream`` after all copies
            have been enqueued.
    """
    # NOTE(woosuk): The copies below run on a different CUDA stream than the
    # one that produced these tensors, so references to the GPU tensors must
    # be kept on the instance.
    self.model_runner_output = model_runner_output
    self.sampler_output = sampler_output
    self.num_sampled_tokens = num_sampled_tokens
    self.copy_event = copy_event

    producer_stream = torch.cuda.current_stream()
    with torch.cuda.stream(copy_stream):
        # Order the copy stream after the work already queued on the stream
        # that was current when this method was entered.
        copy_stream.wait_stream(producer_stream)

        self.sampled_token_ids = async_copy_to_np(sampler_output.sampled_token_ids)

        device_logprobs = sampler_output.logprobs_tensors
        self.logprobs_tensors: LogprobsTensors | None = (
            None if device_logprobs is None else device_logprobs.to_cpu_nonblocking()
        )

        device_num_nans = sampler_output.num_nans
        self.num_nans: np.ndarray | None = (
            None if device_num_nans is None else async_copy_to_np(device_num_nans)
        )

        self.num_sampled_tokens_np = async_copy_to_np(num_sampled_tokens)

        # Entries that are None are carried over unchanged.
        host_prompt_logprobs = {}
        source_dict = self.model_runner_output.prompt_logprobs_dict
        for key, entry in source_dict.items():
            host_prompt_logprobs[key] = (
                None if entry is None else entry.to_cpu_nonblocking()
            )
        self.prompt_logprobs_dict = host_prompt_logprobs

        # Recorded last so the event covers every copy issued above.
        self.copy_event.record(copy_stream)

get_output

get_output() -> ModelRunnerOutput
Source code in vllm/v1/worker/gpu/async_utils.py
def get_output(self) -> ModelRunnerOutput:
    """Wait for the recorded copies, populate the wrapped output, return it.

    Returns:
        ``self.model_runner_output`` with ``sampled_token_ids`` and
        ``prompt_logprobs_dict`` always set, and ``num_nans_in_logits`` /
        ``logprobs`` set when ``self.num_nans`` / ``self.logprobs_tensors``
        are not None.
    """
    self.copy_event.synchronize()
    result = self.model_runner_output

    # NOTE(woosuk): Converting to Python lists keeps compatibility with the
    # existing model runner. Going forward, these structures should stay as
    # NumPy arrays rather than Python lists.
    rows: list[list[int]] = self.sampled_token_ids.tolist()
    keep_counts: list[int] = self.num_sampled_tokens_np.tolist()
    for row, keep in zip(rows, keep_counts):
        # Drop everything past the first `keep` entries, in place.
        row[keep:] = []
    result.sampled_token_ids = rows

    if self.num_nans is not None:
        request_ids = result.req_ids
        nan_counts = self.num_nans.tolist()
        result.num_nans_in_logits = dict(zip(request_ids, nan_counts))

    host_logprobs = self.logprobs_tensors
    if host_logprobs is not None:
        result.logprobs = host_logprobs.tolists()

    result.prompt_logprobs_dict = self.prompt_logprobs_dict
    return result

async_copy_to_np

async_copy_to_np(x: Tensor) -> ndarray
Source code in vllm/v1/worker/gpu/async_utils.py
def async_copy_to_np(x: torch.Tensor) -> np.ndarray:
    """Request a non-blocking transfer of ``x`` to the CPU as a NumPy array.

    Because the transfer is issued with ``non_blocking=True``, it may not have
    finished when this function returns; the caller is responsible for any
    synchronization needed before reading the array.

    Args:
        x: Tensor to transfer.

    Returns:
        The NumPy array obtained from the CPU-side tensor via ``.numpy()``.
    """
    host_tensor = x.to("cpu", non_blocking=True)
    return host_tensor.numpy()