Bases: AsyncModelRunnerOutput
Source code in vllm/v1/worker/gpu/async_utils.py
| class AsyncOutput(AsyncModelRunnerOutput):
def __init__(
self,
model_runner_output: ModelRunnerOutput,
sampler_output: SamplerOutput,
num_sampled_tokens: torch.Tensor,
copy_stream: torch.cuda.Stream,
copy_event: torch.cuda.Event,
):
# NOTE(woosuk): We must retain references to the GPU tensors,
# as the copy operations are performed on a different CUDA stream than
# the one where the tensors were created.
self.model_runner_output = model_runner_output
self.sampler_output = sampler_output
self.num_sampled_tokens = num_sampled_tokens
self.copy_event = copy_event
default_stream = torch.cuda.current_stream()
with torch.cuda.stream(copy_stream):
copy_stream.wait_stream(default_stream)
self.sampled_token_ids = async_copy_to_np(sampler_output.sampled_token_ids)
self.logprobs_tensors: LogprobsTensors | None = None
if sampler_output.logprobs_tensors is not None:
self.logprobs_tensors = (
sampler_output.logprobs_tensors.to_cpu_nonblocking()
)
self.num_nans: np.ndarray | None = None
if sampler_output.num_nans is not None:
self.num_nans = async_copy_to_np(sampler_output.num_nans)
self.num_sampled_tokens_np = async_copy_to_np(num_sampled_tokens)
self.prompt_logprobs_dict = {
k: v.to_cpu_nonblocking() if v is not None else None
for k, v in self.model_runner_output.prompt_logprobs_dict.items()
}
self.copy_event.record(copy_stream)
def get_output(self) -> ModelRunnerOutput:
self.copy_event.synchronize()
# NOTE(woosuk): The following code is to ensure compatibility with
# the existing model runner.
# Going forward, we should keep the data structures as NumPy arrays
# rather than Python lists.
sampled_token_ids: list[list[int]] = self.sampled_token_ids.tolist()
num_sampled_tokens: list[int] = self.num_sampled_tokens_np.tolist()
for token_ids, num_tokens in zip(sampled_token_ids, num_sampled_tokens):
del token_ids[num_tokens:]
self.model_runner_output.sampled_token_ids = sampled_token_ids
if self.num_nans is not None:
self.model_runner_output.num_nans_in_logits = dict(
zip(self.model_runner_output.req_ids, self.num_nans.tolist())
)
if self.logprobs_tensors is not None:
self.model_runner_output.logprobs = self.logprobs_tensors.tolists()
self.model_runner_output.prompt_logprobs_dict = self.prompt_logprobs_dict
return self.model_runner_output
|
copy_event instance-attribute
logprobs_tensors instance-attribute
model_runner_output instance-attribute
model_runner_output = model_runner_output
num_nans instance-attribute
num_sampled_tokens instance-attribute
num_sampled_tokens = num_sampled_tokens
num_sampled_tokens_np instance-attribute
prompt_logprobs_dict instance-attribute
prompt_logprobs_dict = {
k: (to_cpu_nonblocking() if v is not None else None)
for k, v in (items())
}
sampled_token_ids instance-attribute
sampler_output instance-attribute
sampler_output = sampler_output
__init__
Source code in vllm/v1/worker/gpu/async_utils.py
| def __init__(
self,
model_runner_output: ModelRunnerOutput,
sampler_output: SamplerOutput,
num_sampled_tokens: torch.Tensor,
copy_stream: torch.cuda.Stream,
copy_event: torch.cuda.Event,
):
# NOTE(woosuk): We must retain references to the GPU tensors,
# as the copy operations are performed on a different CUDA stream than
# the one where the tensors were created.
self.model_runner_output = model_runner_output
self.sampler_output = sampler_output
self.num_sampled_tokens = num_sampled_tokens
self.copy_event = copy_event
default_stream = torch.cuda.current_stream()
with torch.cuda.stream(copy_stream):
copy_stream.wait_stream(default_stream)
self.sampled_token_ids = async_copy_to_np(sampler_output.sampled_token_ids)
self.logprobs_tensors: LogprobsTensors | None = None
if sampler_output.logprobs_tensors is not None:
self.logprobs_tensors = (
sampler_output.logprobs_tensors.to_cpu_nonblocking()
)
self.num_nans: np.ndarray | None = None
if sampler_output.num_nans is not None:
self.num_nans = async_copy_to_np(sampler_output.num_nans)
self.num_sampled_tokens_np = async_copy_to_np(num_sampled_tokens)
self.prompt_logprobs_dict = {
k: v.to_cpu_nonblocking() if v is not None else None
for k, v in self.model_runner_output.prompt_logprobs_dict.items()
}
self.copy_event.record(copy_stream)
|
get_output
Source code in vllm/v1/worker/gpu/async_utils.py
| def get_output(self) -> ModelRunnerOutput:
self.copy_event.synchronize()
# NOTE(woosuk): The following code is to ensure compatibility with
# the existing model runner.
# Going forward, we should keep the data structures as NumPy arrays
# rather than Python lists.
sampled_token_ids: list[list[int]] = self.sampled_token_ids.tolist()
num_sampled_tokens: list[int] = self.num_sampled_tokens_np.tolist()
for token_ids, num_tokens in zip(sampled_token_ids, num_sampled_tokens):
del token_ids[num_tokens:]
self.model_runner_output.sampled_token_ids = sampled_token_ids
if self.num_nans is not None:
self.model_runner_output.num_nans_in_logits = dict(
zip(self.model_runner_output.req_ids, self.num_nans.tolist())
)
if self.logprobs_tensors is not None:
self.model_runner_output.logprobs = self.logprobs_tensors.tolists()
self.model_runner_output.prompt_logprobs_dict = self.prompt_logprobs_dict
return self.model_runner_output
|