Skip to content

vllm.v1.worker.xpu_worker

logger module-attribute

logger = init_logger(__name__)

XPUWorker

Bases: Worker

A XPU worker class.

Source code in vllm/v1/worker/xpu_worker.py
class XPUWorker(Worker):
    """A XPU worker class."""

    def __init__(
        self,
        vllm_config: VllmConfig,
        local_rank: int,
        rank: int,
        distributed_init_method: str,
        is_driver_worker: bool = False,
    ):
        super().__init__(
            vllm_config, local_rank, rank, distributed_init_method, is_driver_worker
        )
        device_config = self.device_config
        assert device_config.device_type == "xpu"
        assert current_platform.is_xpu()

        # Torch profiler. Enabled and configured through profiler_config.
        self.profiler: Any | None = None
        profiler_config = vllm_config.profiler_config
        if profiler_config.profiler == "torch":
            worker_name = f"{vllm_config.instance_id}-rank-{self.rank}"
            self.profiler = TorchProfilerWrapper(
                profiler_config,
                worker_name=worker_name,
                local_rank=self.local_rank,
                activities=["CPU", "XPU"],
            )

    def init_device(self):
        device = self.device_config.device
        if (
            isinstance(device, torch.device)
            and device.type == "xpu"
            and current_platform.is_xpu()
        ):
            self.device = torch.device(f"xpu:{self.local_rank}")
            current_platform.set_device(self.device)
            current_platform.check_if_supports_dtype(self.model_config.dtype)
            torch.xpu.empty_cache()
            self.init_gpu_memory = torch.xpu.get_device_properties(
                self.local_rank
            ).total_memory
        else:
            raise RuntimeError(f"Not support device type: {self.device_config.device}")

        ENV_CCL_ATL_TRANSPORT = os.getenv("CCL_ATL_TRANSPORT", "ofi")
        ENV_LOCAL_WORLD_SIZE = os.getenv(
            "LOCAL_WORLD_SIZE", str(self.parallel_config.world_size)
        )
        os.environ["CCL_ATL_TRANSPORT"] = ENV_CCL_ATL_TRANSPORT
        os.environ["LOCAL_WORLD_SIZE"] = ENV_LOCAL_WORLD_SIZE
        os.environ["LOCAL_RANK"] = str(self.local_rank)

        init_worker_distributed_environment(
            self.vllm_config,
            self.rank,
            self.distributed_init_method,
            self.local_rank,
            current_platform.dist_backend,
        )

        torch.xpu.empty_cache()
        self.init_snapshot = init_snapshot = MemorySnapshot(device=self.device)
        self.requested_memory = request_memory(init_snapshot, self.cache_config)
        logger.debug("worker init memory snapshot: %r", self.init_snapshot)
        logger.debug(
            "worker requested memory: %sGiB", format_gib(self.requested_memory)
        )

        # Set random seed.
        set_random_seed(self.model_config.seed)

        # Initialize workspace manager
        num_ubatches = 2 if self.vllm_config.parallel_config.enable_dbo else 1
        init_workspace_manager(self.device, num_ubatches)

        # Construct the model runner
        self.model_runner = XPUModelRunner(  # type: ignore
            self.vllm_config, self.device
        )

        if self.rank == 0:
            # If usage stat is enabled, collect relevant info.
            report_usage_stats(self.vllm_config)

profiler instance-attribute

profiler: Any | None = None

__init__

__init__(
    vllm_config: VllmConfig,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    is_driver_worker: bool = False,
)
Source code in vllm/v1/worker/xpu_worker.py
def __init__(
    self,
    vllm_config: VllmConfig,
    local_rank: int,
    rank: int,
    distributed_init_method: str,
    is_driver_worker: bool = False,
):
    super().__init__(
        vllm_config, local_rank, rank, distributed_init_method, is_driver_worker
    )
    device_config = self.device_config
    assert device_config.device_type == "xpu"
    assert current_platform.is_xpu()

    # Torch profiler. Enabled and configured through profiler_config.
    self.profiler: Any | None = None
    profiler_config = vllm_config.profiler_config
    if profiler_config.profiler == "torch":
        worker_name = f"{vllm_config.instance_id}-rank-{self.rank}"
        self.profiler = TorchProfilerWrapper(
            profiler_config,
            worker_name=worker_name,
            local_rank=self.local_rank,
            activities=["CPU", "XPU"],
        )

init_device

init_device()
Source code in vllm/v1/worker/xpu_worker.py
def init_device(self):
    device = self.device_config.device
    if (
        isinstance(device, torch.device)
        and device.type == "xpu"
        and current_platform.is_xpu()
    ):
        self.device = torch.device(f"xpu:{self.local_rank}")
        current_platform.set_device(self.device)
        current_platform.check_if_supports_dtype(self.model_config.dtype)
        torch.xpu.empty_cache()
        self.init_gpu_memory = torch.xpu.get_device_properties(
            self.local_rank
        ).total_memory
    else:
        raise RuntimeError(f"Not support device type: {self.device_config.device}")

    ENV_CCL_ATL_TRANSPORT = os.getenv("CCL_ATL_TRANSPORT", "ofi")
    ENV_LOCAL_WORLD_SIZE = os.getenv(
        "LOCAL_WORLD_SIZE", str(self.parallel_config.world_size)
    )
    os.environ["CCL_ATL_TRANSPORT"] = ENV_CCL_ATL_TRANSPORT
    os.environ["LOCAL_WORLD_SIZE"] = ENV_LOCAL_WORLD_SIZE
    os.environ["LOCAL_RANK"] = str(self.local_rank)

    init_worker_distributed_environment(
        self.vllm_config,
        self.rank,
        self.distributed_init_method,
        self.local_rank,
        current_platform.dist_backend,
    )

    torch.xpu.empty_cache()
    self.init_snapshot = init_snapshot = MemorySnapshot(device=self.device)
    self.requested_memory = request_memory(init_snapshot, self.cache_config)
    logger.debug("worker init memory snapshot: %r", self.init_snapshot)
    logger.debug(
        "worker requested memory: %sGiB", format_gib(self.requested_memory)
    )

    # Set random seed.
    set_random_seed(self.model_config.seed)

    # Initialize workspace manager
    num_ubatches = 2 if self.vllm_config.parallel_config.enable_dbo else 1
    init_workspace_manager(self.device, num_ubatches)

    # Construct the model runner
    self.model_runner = XPUModelRunner(  # type: ignore
        self.vllm_config, self.device
    )

    if self.rank == 0:
        # If usage stat is enabled, collect relevant info.
        report_usage_stats(self.vllm_config)