Skip to content

vllm.distributed.kv_transfer.kv_connector.v1.mooncake.mooncake_utils

WorkerAddr module-attribute

WorkerAddr = str

logger module-attribute

logger = init_logger(__name__)

EngineEntry dataclass

Source code in vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_utils.py
@dataclass
class EngineEntry:
    """Worker addresses registered under a single engine ID.

    The bootstrap server stores one of these per ``dp_rank`` and fills
    ``worker_addr`` as workers register.
    """

    # Engine ID supplied by the first worker registered for this entry.
    engine_id: EngineId
    # {tp_rank: {pp_rank: worker_addr}}
    worker_addr: dict[int, dict[int, WorkerAddr]]

engine_id instance-attribute

engine_id: EngineId

worker_addr instance-attribute

worker_addr: dict[int, dict[int, WorkerAddr]]

__init__

__init__(
    engine_id: EngineId,
    worker_addr: dict[int, dict[int, WorkerAddr]],
) -> None

MooncakeBootstrapServer

A centralized server running on the global rank 0 prefiller worker. Prefiller workers register their connection info (IP, port, ranks) here.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_utils.py
class MooncakeBootstrapServer:
    """
    A centralized server running on the global rank 0 prefiller worker.
    Prefiller workers register their connection info (IP, port, ranks) here.

    The HTTP API is served by uvicorn on a background daemon thread and
    exposes two routes:

    * ``POST /register`` -- record the address of one prefiller worker.
    * ``GET /query`` -- return every registration received so far.
    """

    def __init__(self, vllm_config: VllmConfig, host: str, port: int):
        """Build the FastAPI app; nothing listens until ``start()`` is called.

        Args:
            vllm_config: Accepted for interface compatibility; not read here.
            host: Host/interface passed to uvicorn.
            port: TCP port passed to uvicorn.
        """
        # Assigned first so that ``shutdown()`` (also reached through
        # ``__del__``) always finds them, even if app construction raises.
        self.server_thread: threading.Thread | None = None
        self.server: uvicorn.Server | None = None

        # {dp_rank: EngineEntry}
        self.workers: dict[int, EngineEntry] = {}

        self.host = host
        self.port = port
        self.app = FastAPI()
        self._register_routes()

    def __del__(self):
        """Stop the background server when the object is garbage collected."""
        self.shutdown()

    def _register_routes(self):
        """Attach the ``/register`` and ``/query`` handlers to the app."""
        # All methods are async and contain no ``await``, so a handler is
        # never interleaved with another one. No need to use lock to
        # protect data.
        self.app.post("/register")(self.register_worker)
        self.app.get("/query", response_model=dict[int, EngineEntry])(self.query)

    def start(self):
        """Start the HTTP server on a daemon thread and wait until it is up.

        Does nothing if a server thread already exists.

        Raises:
            RuntimeError: If the server thread exits before uvicorn reports
                that startup completed (for example because the port cannot
                be bound).
        """
        if self.server_thread:
            return

        config = uvicorn.Config(app=self.app, host=self.host, port=self.port)
        server = uvicorn.Server(config=config)
        thread = threading.Thread(
            target=server.run, name="mooncake_bootstrap_server", daemon=True
        )
        self.server = server
        self.server_thread = thread
        thread.start()
        while not server.started:
            if not thread.is_alive():
                # The thread died during startup, so ``started`` will never
                # flip to True; fail loudly instead of spinning forever.
                self.server = None
                self.server_thread = None
                raise RuntimeError(
                    "Mooncake Bootstrap Server failed to start at "
                    f"{self.host}:{self.port}"
                )
            time.sleep(0.1)  # Wait for the server to start
        logger.info("Mooncake Bootstrap Server started at %s:%d", self.host, self.port)

    def shutdown(self):
        """Ask uvicorn to exit and wait for the server thread to finish.

        Safe to call when the server was never started, and safe to call
        repeatedly.
        """
        if self.server_thread is None or self.server is None or not self.server.started:
            return

        self.server.should_exit = True
        self.server_thread.join()
        # Drop the references so a second shutdown() is a no-op and a later
        # start() launches a fresh server instead of returning early.
        self.server_thread = None
        self.server = None
        logger.info("Mooncake Bootstrap Server stopped.")

    async def register_worker(self, payload: RegisterWorkerPayload):
        """Handles registration of a prefiller worker.

        Raises:
            HTTPException: 400 if ``payload.engine_id`` differs from the
                engine ID already stored for ``payload.dp_rank``, or if the
                (dp_rank, tp_rank, pp_rank) slot is already registered.
        """
        if payload.dp_rank not in self.workers:
            self.workers[payload.dp_rank] = EngineEntry(
                engine_id=payload.engine_id,
                worker_addr={},
            )

        dp_entry = self.workers[payload.dp_rank]
        if dp_entry.engine_id != payload.engine_id:
            raise HTTPException(
                status_code=400,
                detail=(
                    f"Engine ID mismatch for dp_rank={payload.dp_rank}: "
                    f"expected {dp_entry.engine_id}, got {payload.engine_id}"
                ),
            )

        tp_entry = dp_entry.worker_addr.setdefault(payload.tp_rank, {})
        if payload.pp_rank in tp_entry:
            raise HTTPException(
                status_code=400,
                detail=(
                    f"Worker with dp_rank={payload.dp_rank}, "
                    f"tp_rank={payload.tp_rank}, pp_rank={payload.pp_rank} "
                    f"is already registered at "
                    f"{tp_entry[payload.pp_rank]}, "
                    f"but still want to register at {payload.addr}"
                ),
            )

        tp_entry[payload.pp_rank] = payload.addr
        logger.debug(
            "Registered worker: engine_id=%s, dp_rank=%d, tp_rank=%d, pp_rank=%d at %s",
            payload.engine_id,
            payload.dp_rank,
            payload.tp_rank,
            payload.pp_rank,
            payload.addr,
        )

        return {"status": "ok"}

    async def query(self) -> dict[int, EngineEntry]:
        """Return all registrations received so far, keyed by dp_rank."""
        return self.workers

app instance-attribute

app = FastAPI()

host instance-attribute

host = host

port instance-attribute

port = port

server instance-attribute

server: Server | None = None

server_thread instance-attribute

server_thread: Thread | None = None

workers instance-attribute

workers: dict[int, EngineEntry] = {}

__del__

__del__()
Source code in vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_utils.py
def __del__(self):
    """Stop the background server when the object is garbage collected.

    The finalizer also runs for an instance whose ``__init__`` raised part
    way through. In that case the attributes ``shutdown()`` reads may not
    exist yet and there is no server to stop, so the call is skipped.
    """
    if not hasattr(self, "server_thread") or not hasattr(self, "server"):
        return
    self.shutdown()

__init__

__init__(vllm_config: VllmConfig, host: str, port: int)
Source code in vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_utils.py
def __init__(self, vllm_config: VllmConfig, host: str, port: int):
    """Build the FastAPI app; nothing listens until ``start()`` is called.

    Args:
        vllm_config: Accepted for interface compatibility; not read here.
        host: Host/interface passed to uvicorn.
        port: TCP port passed to uvicorn.
    """
    # Assigned first so that ``shutdown()`` (also reached through
    # ``__del__``) always finds them, even if app construction raises.
    self.server_thread: threading.Thread | None = None
    self.server: uvicorn.Server | None = None

    # {dp_rank: EngineEntry}
    self.workers: dict[int, EngineEntry] = {}

    self.host = host
    self.port = port
    self.app = FastAPI()
    self._register_routes()

_register_routes

_register_routes()
Source code in vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_utils.py
def _register_routes(self):
    """Attach the ``/register`` and ``/query`` handlers to the app."""
    # Every handler is async, so no lock is needed to protect the data.
    bind_register = self.app.post("/register")
    bind_register(self.register_worker)

    bind_query = self.app.get("/query", response_model=dict[int, EngineEntry])
    bind_query(self.query)

query async

query() -> dict[int, EngineEntry]
Source code in vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_utils.py
async def query(self) -> dict[int, EngineEntry]:
    """Return all registrations received so far, keyed by dp_rank.

    The live ``self.workers`` mapping is returned, not a copy.
    """
    return self.workers

register_worker async

register_worker(payload: RegisterWorkerPayload)

Handles registration of a prefiller worker.

Source code in vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_utils.py
async def register_worker(self, payload: RegisterWorkerPayload):
    """Handles registration of a prefiller worker.

    Answers with HTTP 400 when the engine ID differs from the one already
    stored for the dp_rank, or when the (dp_rank, tp_rank, pp_rank) slot
    already holds an address. Otherwise stores ``payload.addr`` and
    returns ``{"status": "ok"}``.
    """
    # First registration for this dp_rank creates its entry.
    dp_entry = self.workers.get(payload.dp_rank)
    if dp_entry is None:
        dp_entry = EngineEntry(engine_id=payload.engine_id, worker_addr={})
        self.workers[payload.dp_rank] = dp_entry

    if dp_entry.engine_id != payload.engine_id:
        mismatch_detail = (
            f"Engine ID mismatch for dp_rank={payload.dp_rank}: "
            f"expected {dp_entry.engine_id}, got {payload.engine_id}"
        )
        raise HTTPException(status_code=400, detail=mismatch_detail)

    # {pp_rank: worker_addr} for this tp_rank, created on first use.
    tp_entry = dp_entry.worker_addr.setdefault(payload.tp_rank, {})
    if payload.pp_rank in tp_entry:
        duplicate_detail = (
            f"Worker with dp_rank={payload.dp_rank}, "
            f"tp_rank={payload.tp_rank}, pp_rank={payload.pp_rank} "
            f"is already registered at "
            f"{tp_entry[payload.pp_rank]}, "
            f"but still want to register at {payload.addr}"
        )
        raise HTTPException(status_code=400, detail=duplicate_detail)

    tp_entry[payload.pp_rank] = payload.addr
    logger.debug(
        "Registered worker: engine_id=%s, dp_rank=%d, tp_rank=%d, pp_rank=%d at %s",
        payload.engine_id,
        payload.dp_rank,
        payload.tp_rank,
        payload.pp_rank,
        payload.addr,
    )

    return {"status": "ok"}

shutdown

shutdown()
Source code in vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_utils.py
def shutdown(self):
    """Ask uvicorn to exit and wait for the server thread to finish.

    Safe to call when the server was never started, and safe to call
    repeatedly.
    """
    if self.server_thread is None or self.server is None or not self.server.started:
        return

    self.server.should_exit = True
    self.server_thread.join()
    # Drop the references so a second shutdown() is a no-op and a later
    # start() launches a fresh server instead of returning early.
    self.server_thread = None
    self.server = None
    logger.info("Mooncake Bootstrap Server stopped.")

start

start()
Source code in vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_utils.py
def start(self):
    """Start the HTTP server on a daemon thread and wait until it is up.

    Does nothing if a server thread already exists.

    Raises:
        RuntimeError: If the server thread exits before uvicorn reports
            that startup completed (for example because the port cannot
            be bound).
    """
    if self.server_thread:
        return

    config = uvicorn.Config(app=self.app, host=self.host, port=self.port)
    server = uvicorn.Server(config=config)
    thread = threading.Thread(
        target=server.run, name="mooncake_bootstrap_server", daemon=True
    )
    self.server = server
    self.server_thread = thread
    thread.start()
    while not server.started:
        if not thread.is_alive():
            # The thread died during startup, so ``started`` will never
            # flip to True; fail loudly instead of spinning forever.
            self.server = None
            self.server_thread = None
            raise RuntimeError(
                "Mooncake Bootstrap Server failed to start at "
                f"{self.host}:{self.port}"
            )
        time.sleep(0.1)  # Wait for the server to start
    logger.info("Mooncake Bootstrap Server started at %s:%d", self.host, self.port)

RegisterWorkerPayload

Bases: BaseModel

Source code in vllm/distributed/kv_transfer/kv_connector/v1/mooncake/mooncake_utils.py
class RegisterWorkerPayload(BaseModel):
    """Request body accepted by the bootstrap server's ``/register`` route."""

    # Must equal the engine ID already stored for the same dp_rank, if any.
    engine_id: EngineId
    # The (dp_rank, tp_rank, pp_rank) triple selects the slot the address is
    # stored in; each slot can be registered only once.
    dp_rank: int
    tp_rank: int
    pp_rank: int
    # Address recorded for this worker.
    addr: WorkerAddr

addr instance-attribute

addr: WorkerAddr

dp_rank instance-attribute

dp_rank: int

engine_id instance-attribute

engine_id: EngineId

pp_rank instance-attribute

pp_rank: int

tp_rank instance-attribute

tp_rank: int