Skip to content

vllm.v1.worker.gpu.spec_decode.utils

DraftTokensHandler

Source code in vllm/v1/worker/gpu/spec_decode/utils.py
class DraftTokensHandler:
    """Stage draft tokens on the host so they can be handed back later.

    Draft tokens are only staged for batches that contain structured
    output requests; for every other batch the handler is reset and
    ``get_draft_tokens`` reports ``None``.
    """

    def __init__(self, device: torch.device | None = None):
        """Create the handler together with its private copy stream/event.

        Args:
            device: Device passed to ``torch.cuda.Stream`` and used to look
                up the current stream when a copy is issued.
        """
        # Host-side state describing the most recently staged batch.
        self.req_ids: list[str] = []
        self.draft_tokens_np: np.ndarray | None = None

        # CUDA objects used to run the copy off the compute stream and to
        # find out when it has finished.
        self.device = device
        self.copy_stream = torch.cuda.Stream(device)
        self.copy_event = torch.cuda.Event()

    def set_draft_tokens(
        self, input_batch: InputBatch, draft_tokens: torch.Tensor
    ) -> None:
        """Stage ``draft_tokens`` for ``input_batch`` or reset the handler.

        Args:
            input_batch: Batch whose ``has_structured_output_reqs`` flag
                decides whether staging is needed and whose ``req_ids``
                are remembered alongside the tokens.
            draft_tokens: Tensor of draft tokens to copy.
        """
        needs_validation = input_batch.has_structured_output_reqs
        if not needs_validation:
            # The scheduler performs no draft token validation for this
            # batch, so drop whatever an earlier batch left behind.
            if self.req_ids:
                self.req_ids = []
            self.draft_tokens_np = None
            return

        # Spec decoding combined with structured outputs: the scheduler
        # needs the draft tokens back for grammar validation.
        self.req_ids = input_batch.req_ids

        # The copy stream must not start before the work already queued on
        # the current stream (which produces the draft tokens) is done.
        producer_stream = torch.cuda.current_stream(self.device)
        self.copy_stream.wait_stream(producer_stream)
        with torch.cuda.stream(self.copy_stream):
            # NOTE(review): async_copy_to_np is defined elsewhere;
            # presumably it enqueues a non-blocking copy into a NumPy
            # array -- confirm against its definition.
            self.draft_tokens_np = async_copy_to_np(draft_tokens)
            # Marks the point get_draft_tokens() has to wait for.
            self.copy_event.record()

    def get_draft_tokens(self) -> DraftTokenIds | None:
        """Return the staged draft tokens, or ``None`` if none are staged.

        Waits on the copy event before the host array is converted to
        Python lists.
        """
        staged = self.draft_tokens_np
        if staged is None:
            return None

        self.copy_event.synchronize()
        return DraftTokenIds(self.req_ids, staged.tolist())

copy_event instance-attribute

copy_event = Event()

copy_stream instance-attribute

copy_stream = Stream(device)

device instance-attribute

device = device

draft_tokens_np instance-attribute

draft_tokens_np: ndarray | None = None

req_ids instance-attribute

req_ids: list[str] = []

__init__

__init__(device: device | None = None)
Source code in vllm/v1/worker/gpu/spec_decode/utils.py
def __init__(self, device: torch.device | None = None):
    """Create the handler together with its private copy stream/event.

    Args:
        device: Device passed to ``torch.cuda.Stream``; also stored on
            the instance as ``self.device``.
    """
    # Host-side state: nothing is staged until draft tokens are set.
    self.req_ids: list[str] = []
    self.draft_tokens_np: np.ndarray | None = None

    # CUDA stream and event owned by this handler.
    self.device = device
    self.copy_stream = torch.cuda.Stream(device)
    self.copy_event = torch.cuda.Event()

get_draft_tokens

get_draft_tokens() -> DraftTokenIds | None
Source code in vllm/v1/worker/gpu/spec_decode/utils.py
def get_draft_tokens(self) -> DraftTokenIds | None:
    """Return the staged draft tokens, or ``None`` if none are staged.

    Waits on ``self.copy_event`` before converting the host array to
    Python lists and pairing it with ``self.req_ids``.
    """
    staged = self.draft_tokens_np
    if staged is None:
        return None

    # Block until the recorded copy event has completed before reading.
    self.copy_event.synchronize()
    return DraftTokenIds(self.req_ids, staged.tolist())

set_draft_tokens

set_draft_tokens(
    input_batch: InputBatch, draft_tokens: Tensor
) -> None
Source code in vllm/v1/worker/gpu/spec_decode/utils.py
def set_draft_tokens(
    self, input_batch: InputBatch, draft_tokens: torch.Tensor
) -> None:
    """Stage ``draft_tokens`` for ``input_batch`` or reset the handler.

    Args:
        input_batch: Batch whose ``has_structured_output_reqs`` flag
            decides whether staging is needed and whose ``req_ids`` are
            remembered alongside the tokens.
        draft_tokens: Tensor of draft tokens to copy.
    """
    needs_validation = input_batch.has_structured_output_reqs
    if not needs_validation:
        # The scheduler performs no draft token validation for this
        # batch, so drop whatever an earlier batch left behind.
        if self.req_ids:
            self.req_ids = []
        self.draft_tokens_np = None
        return

    # Spec decoding combined with structured outputs: the scheduler needs
    # the draft tokens back for grammar validation.
    self.req_ids = input_batch.req_ids

    # The copy stream must not start before the work already queued on the
    # current stream is done.
    producer_stream = torch.cuda.current_stream(self.device)
    self.copy_stream.wait_stream(producer_stream)
    with torch.cuda.stream(self.copy_stream):
        # NOTE(review): async_copy_to_np is defined elsewhere; presumably
        # it enqueues a non-blocking copy into a NumPy array -- confirm.
        self.draft_tokens_np = async_copy_to_np(draft_tokens)
        # Marks the point a later synchronize() has to wait for.
        self.copy_event.record()