
vllm.v1.engine.async_llm

logger module-attribute

logger = init_logger(__name__)

AsyncLLM

Bases: EngineClient

Source code in vllm/v1/engine/async_llm.py
class AsyncLLM(EngineClient):
    def __init__(
        self,
        vllm_config: VllmConfig,
        executor_class: type[Executor],
        log_stats: bool,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
        mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY,
        use_cached_outputs: bool = False,
        log_requests: bool = True,
        start_engine_loop: bool = True,
        stat_loggers: list[StatLoggerFactory] | None = None,
        aggregate_engine_logging: bool = False,
        client_addresses: dict[str, str] | None = None,
        client_count: int = 1,
        client_index: int = 0,
    ) -> None:
        """
        Create an AsyncLLM.

        Args:
            vllm_config: global configuration.
            executor_class: an Executor impl, e.g. MultiprocExecutor.
            log_stats: Whether to log stats.
            usage_context: Usage context of the LLM.
            mm_registry: Multi-modal registry.
            use_cached_outputs: Whether to use cached outputs.
            log_requests: Whether to log requests.
            start_engine_loop: Whether to start the engine loop.
            stat_loggers: customized stat loggers for the engine.
                If not provided, default stat loggers will be used.
                PLEASE BE AWARE THAT STAT LOGGER IS NOT STABLE
                IN V1, AND ITS BASE CLASS INTERFACE MIGHT CHANGE.

        Returns:
            None
        """
        # Ensure we can serialize custom transformer configs
        maybe_register_config_serialize_by_value()

        self.model_config = vllm_config.model_config
        self.vllm_config = vllm_config
        self.observability_config = vllm_config.observability_config
        self.log_requests = log_requests

        custom_stat_loggers = list(stat_loggers or [])
        custom_stat_loggers.extend(load_stat_logger_plugin_factories())

        has_custom_loggers = bool(custom_stat_loggers)
        self.log_stats = log_stats or has_custom_loggers
        if not log_stats and has_custom_loggers:
            logger.info(
                "AsyncLLM created with log_stats=False, "
                "but custom stat loggers were found; "
                "enabling logging without default stat loggers."
            )

        self.input_processor = InputProcessor(self.vllm_config)
        self.io_processor = get_io_processor(
            self.vllm_config,
            self.model_config.io_processor_plugin,
        )

        # OutputProcessor (converts EngineCoreOutputs --> RequestOutput).
        self.output_processor = OutputProcessor(
            self.tokenizer,
            log_stats=self.log_stats,
            stream_interval=self.vllm_config.scheduler_config.stream_interval,
        )
        endpoint = self.observability_config.otlp_traces_endpoint
        if endpoint is not None:
            tracer = init_tracer("vllm.llm_engine", endpoint)
            self.output_processor.tracer = tracer

        # EngineCore (starts the engine in background process).
        self.engine_core = EngineCoreClient.make_async_mp_client(
            vllm_config=vllm_config,
            executor_class=executor_class,
            log_stats=self.log_stats,
            client_addresses=client_addresses,
            client_count=client_count,
            client_index=client_index,
        )

        # Loggers.
        self.logger_manager: StatLoggerManager | None = None
        if self.log_stats:
            self.logger_manager = StatLoggerManager(
                vllm_config=vllm_config,
                engine_idxs=self.engine_core.engine_ranks_managed,
                custom_stat_loggers=custom_stat_loggers,
                enable_default_loggers=log_stats,
                client_count=client_count,
                aggregate_engine_logging=aggregate_engine_logging,
            )
            self.logger_manager.log_engine_initialized()

        # Pause / resume state for async RL workflows.
        self._pause_cond = asyncio.Condition()
        self._paused = False

        self.output_handler: asyncio.Task | None = None
        try:
            # Start output handler eagerly if we are in the asyncio eventloop.
            asyncio.get_running_loop()
            self._run_output_handler()
        except RuntimeError:
            pass

        if (
            vllm_config.profiler_config.profiler == "torch"
            and not vllm_config.profiler_config.ignore_frontend
        ):
            profiler_dir = vllm_config.profiler_config.torch_profiler_dir
            logger.info(
                "Torch profiler enabled. AsyncLLM CPU traces will be collected under %s",  # noqa: E501
                profiler_dir,
            )
            worker_name = f"{socket.gethostname()}_{os.getpid()}.async_llm"
            self.profiler = torch.profiler.profile(
                activities=[
                    torch.profiler.ProfilerActivity.CPU,
                ],
                with_stack=vllm_config.profiler_config.torch_profiler_with_stack,
                on_trace_ready=torch.profiler.tensorboard_trace_handler(
                    profiler_dir,
                    worker_name=worker_name,
                    use_gzip=vllm_config.profiler_config.torch_profiler_use_gzip,
                ),
            )
        else:
            self.profiler = None

    @classmethod
    def from_vllm_config(
        cls,
        vllm_config: VllmConfig,
        start_engine_loop: bool = True,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
        stat_loggers: list[StatLoggerFactory] | None = None,
        enable_log_requests: bool = False,
        aggregate_engine_logging: bool = False,
        disable_log_stats: bool = False,
        client_addresses: dict[str, str] | None = None,
        client_count: int = 1,
        client_index: int = 0,
    ) -> "AsyncLLM":
        # Create the LLMEngine.
        return cls(
            vllm_config=vllm_config,
            executor_class=Executor.get_class(vllm_config),
            start_engine_loop=start_engine_loop,
            stat_loggers=stat_loggers,
            log_requests=enable_log_requests,
            log_stats=not disable_log_stats,
            aggregate_engine_logging=aggregate_engine_logging,
            usage_context=usage_context,
            client_addresses=client_addresses,
            client_count=client_count,
            client_index=client_index,
        )

    @classmethod
    def from_engine_args(
        cls,
        engine_args: AsyncEngineArgs,
        start_engine_loop: bool = True,
        usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
        stat_loggers: list[StatLoggerFactory] | None = None,
    ) -> "AsyncLLM":
        """Create an AsyncLLM from the EngineArgs."""

        # Create the engine configs.
        vllm_config = engine_args.create_engine_config(usage_context)
        executor_class = Executor.get_class(vllm_config)

        # Create the AsyncLLM.
        return cls(
            vllm_config=vllm_config,
            executor_class=executor_class,
            log_requests=engine_args.enable_log_requests,
            log_stats=not engine_args.disable_log_stats,
            start_engine_loop=start_engine_loop,
            usage_context=usage_context,
            stat_loggers=stat_loggers,
        )

    def __del__(self):
        self.shutdown()

    def shutdown(self):
        """Shutdown, cleaning up the background proc and IPC."""

        shutdown_prometheus()

        if engine_core := getattr(self, "engine_core", None):
            engine_core.shutdown()

        if input_processor := getattr(self, "input_processor", None):
            input_processor.close()

        handler = getattr(self, "output_handler", None)
        if handler is not None:
            cancel_task_threadsafe(handler)

    async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
        return await self.engine_core.get_supported_tasks_async()

    async def add_request(
        self,
        request_id: str,
        prompt: EngineCoreRequest | PromptType | AsyncGenerator[StreamingInput, None],
        params: SamplingParams | PoolingParams,
        arrival_time: float | None = None,
        lora_request: LoRARequest | None = None,
        tokenization_kwargs: dict[str, Any] | None = None,
        trace_headers: Mapping[str, str] | None = None,
        priority: int = 0,
        data_parallel_rank: int | None = None,
        prompt_text: str | None = None,
    ) -> RequestOutputCollector:
        """Add new request to the AsyncLLM."""

        if self.errored:
            raise EngineDeadError()

        is_pooling = isinstance(params, PoolingParams)

        if (
            self.vllm_config.cache_config.kv_sharing_fast_prefill
            and not is_pooling
            and params.prompt_logprobs
        ):
            raise ValueError(
                "--kv-sharing-fast-prefill produces incorrect logprobs for "
                "prompt tokens, please disable it when the requests need "
                "prompt logprobs"
            )

        if params.truncate_prompt_tokens is not None:
            params_type = type(params).__name__
            warnings.warn(
                f"The `truncate_prompt_tokens` parameter in `{params_type}` "
                "is deprecated and will be removed in v0.16. "
                "Please pass it via `tokenization_kwargs` instead.",
                DeprecationWarning,
                stacklevel=2,
            )

            tokenization_kwargs = merge_kwargs(
                tokenization_kwargs,
                dict(truncate_prompt_tokens=params.truncate_prompt_tokens),
            )

        if isinstance(prompt, AsyncGenerator):
            # Streaming input case.
            return await self._add_streaming_input_request(
                request_id,
                prompt,
                params,
                arrival_time,
                lora_request,
                tokenization_kwargs,
                trace_headers,
                priority,
                data_parallel_rank,
            )

        # Convert Input --> Request.
        if isinstance(prompt, EngineCoreRequest):
            request = prompt
            if request_id != request.request_id:
                logger.warning_once(
                    "AsyncLLM.add_request() was passed a request_id parameter that "
                    "does not match the EngineCoreRequest.request_id attribute. The "
                    "latter will be used, and the former will be ignored."
                )
        else:
            if prompt_text is not None:
                raise ValueError(
                    "should only provide prompt_text with EngineCoreRequest"
                )
            request = self.input_processor.process_inputs(
                request_id,
                prompt,
                params,
                arrival_time=arrival_time,
                lora_request=lora_request,
                tokenization_kwargs=tokenization_kwargs,
                trace_headers=trace_headers,
                priority=priority,
                data_parallel_rank=data_parallel_rank,
            )
            prompt_text = get_prompt_text(prompt)

        self.input_processor.assign_request_id(request)

        # We start the output_handler on the first call to add_request() so
        # we can call __init__ before the event loop, which enables us
        # to handle startup failure gracefully in the OpenAI server.
        self._run_output_handler()

        # Respect pause state before accepting new requests.
        async with self._pause_cond:
            await self._pause_cond.wait_for(lambda: not self._paused)

        # Create a new output collector for the request.
        queue = RequestOutputCollector(params.output_kind, request.request_id)

        # Use cloned params that may have been updated in process_inputs()
        params = request.params

        if is_pooling or params.n == 1:
            await self._add_request(request, prompt_text, None, 0, queue)
            return queue

        parent_params = params
        assert isinstance(parent_params, SamplingParams)

        # Fan out child requests (for n>1).
        parent_request = ParentRequest(request)
        for idx in range(parent_params.n):
            request_id, child_params = parent_request.get_child_info(idx)
            child_request = request if idx == parent_params.n - 1 else copy(request)
            child_request.request_id = request_id
            child_request.sampling_params = child_params
            await self._add_request(
                child_request, prompt_text, parent_request, idx, queue
            )
        return queue

    async def _add_request(
        self,
        request: EngineCoreRequest,
        prompt: str | None,
        parent_req: ParentRequest | None,
        index: int,
        queue: RequestOutputCollector,
    ):
        # Add the request to OutputProcessor (this process).
        self.output_processor.add_request(request, prompt, parent_req, index, queue)

        # Add the EngineCoreRequest to EngineCore (separate process).
        await self.engine_core.add_request_async(request)

        if self.log_requests:
            logger.info("Added request %s.", request.request_id)

    async def _add_streaming_input_request(
        self,
        request_id: str,
        input_stream: AsyncGenerator[StreamingInput, None],
        sampling_params: SamplingParams | PoolingParams,
        arrival_time: float | None = None,
        lora_request: LoRARequest | None = None,
        tokenization_kwargs: dict[str, Any] | None = None,
        trace_headers: Mapping[str, str] | None = None,
        priority: int = 0,
        data_parallel_rank: int | None = None,
    ) -> RequestOutputCollector:
        self._validate_streaming_input_sampling_params(sampling_params)

        inputs = dict(
            arrival_time=arrival_time,
            lora_request=lora_request,
            tokenization_kwargs=tokenization_kwargs,
            trace_headers=trace_headers,
            priority=priority,
            data_parallel_rank=data_parallel_rank,
        )

        if not sampling_params.skip_clone:
            sampling_params = sampling_params.clone()
            sampling_params.skip_clone = True

        # Create request for validation, also used as the finished signal
        # once the input stream is closed.
        final_req = self.input_processor.process_inputs(
            request_id=request_id,
            prompt=TokensPrompt(prompt_token_ids=[0]),
            params=sampling_params,
            **inputs,  # type: ignore[arg-type]
        )
        self.input_processor.assign_request_id(final_req)
        internal_req_id = final_req.request_id

        queue = RequestOutputCollector(sampling_params.output_kind, internal_req_id)

        async def handle_inputs():
            cancelled = False
            try:
                async for input_chunk in input_stream:
                    sp = input_chunk.sampling_params
                    if sp:
                        self._validate_streaming_input_sampling_params(sp)
                    else:
                        sp = sampling_params
                    req = self.input_processor.process_inputs(
                        request_id=internal_req_id,
                        prompt=input_chunk.prompt,
                        params=sp,
                        resumable=True,
                        **inputs,  # type: ignore[arg-type]
                    )
                    req.external_req_id = request_id
                    if req.prompt_embeds is not None:
                        raise ValueError(
                            "prompt_embeds not supported for streaming inputs"
                        )
                    prompt_text = get_prompt_text(input_chunk.prompt)
                    await self._add_request(req, prompt_text, None, 0, queue)
            except (asyncio.CancelledError, GeneratorExit):
                cancelled = True
            except Exception as error:
                # Wrap in InputStreamError so generate() can propagate it
                # without wrapping in EngineGenerateError.
                queue.put(InputStreamError(error))
            finally:
                queue._input_stream_task = None
                if not cancelled:
                    # Send empty final request to indicate that inputs have
                    # finished. Don't send if cancelled (session was aborted).
                    await self._add_request(final_req, None, None, 0, queue)

        # Ensure output handler is running.
        self._run_output_handler()

        queue._input_stream_task = asyncio.create_task(handle_inputs())
        return queue

    @staticmethod
    def _validate_streaming_input_sampling_params(
        params: SamplingParams | PoolingParams,
    ):
        if (
            not isinstance(params, SamplingParams)
            or params.n > 1
            or params.output_kind == RequestOutputKind.FINAL_ONLY
            or params.stop
        ):
            raise ValueError(
                "Input streaming not currently supported "
                "for pooling models, n > 1, request_kind = FINAL_ONLY "
                "or with stop strings."
            )

    # TODO: we should support multiple prompts in one call, as you
    # can do with LLM.generate. So that for multi-prompt completion
    # requests we don't need to send multiple messages to core proc,
    # and so we don't need multiple streams which then get
    # re-multiplexed in the API server anyhow.
    async def generate(
        self,
        prompt: EngineCoreRequest | PromptType | AsyncGenerator[StreamingInput, None],
        sampling_params: SamplingParams,
        request_id: str,
        *,
        prompt_text: str | None = None,
        lora_request: LoRARequest | None = None,
        tokenization_kwargs: dict[str, Any] | None = None,
        trace_headers: Mapping[str, str] | None = None,
        priority: int = 0,
        data_parallel_rank: int | None = None,
    ) -> AsyncGenerator[RequestOutput, None]:
        """
        Main function called by the API server to kick off a request
            * 1) Making an AsyncStream corresponding to the Request.
            * 2) Processing the Input.
            * 3) Adding the Request to the Detokenizer.
            * 4) Adding the Request to the EngineCore (separate process).

        A separate output_handler loop runs in a background AsyncIO task,
        pulling outputs from EngineCore and putting them into the
        per-request AsyncStream.

        The caller of generate() iterates the returned AsyncGenerator,
        returning the RequestOutput back to the caller.
        """

        q: RequestOutputCollector | None = None
        try:
            q = await self.add_request(
                request_id,
                prompt,
                sampling_params,
                lora_request=lora_request,
                tokenization_kwargs=tokenization_kwargs,
                trace_headers=trace_headers,
                priority=priority,
                data_parallel_rank=data_parallel_rank,
                prompt_text=prompt_text,
            )

            # The output_handler task pushes items into the queue.
            # This task pulls from the queue and yields to caller.
            finished = False
            while not finished:
                # Note: drain queue without await if possible (avoids
                # task switching under load which helps performance).
                out = q.get_nowait() or await q.get()

                # Note: both OutputProcessor and EngineCore handle their
                # own request cleanup based on finished.
                assert isinstance(out, RequestOutput)
                finished = out.finished
                if out is not STREAM_FINISHED:
                    yield out

        # If the request is disconnected by the client, generate()
        # is cancelled or the generator is garbage collected. So,
        # we abort the request if we end up here.
        except (asyncio.CancelledError, GeneratorExit):
            if q is not None:
                await self.abort(q.request_id, internal=True)
            if self.log_requests:
                logger.info("Request %s aborted.", request_id)
            raise

        # Engine is dead. Do not abort since we shut down.
        except EngineDeadError:
            if self.log_requests:
                logger.info("Request %s failed (engine dead).", request_id)
            raise

        # Request validation error.
        except ValueError as e:
            if self.log_requests:
                logger.info("Request %s failed (bad request): %s.", request_id, e)
            raise

        # Error from input stream generator - propagate directly.
        except InputStreamError as e:
            if q is not None:
                await self.abort(q.request_id, internal=True)
            if self.log_requests:
                logger.info("Request %s failed (input error): %s.", request_id, e)
            raise e.cause from e

        # Unexpected error in the generate() task (possibly recoverable).
        except Exception as e:
            if q is not None:
                await self.abort(q.request_id, internal=True)
            if self.log_requests:
                try:
                    s = f"{e.__class__.__name__}: {e}"
                except Exception as e2:
                    s = (
                        f"{e.__class__.__name__}: "
                        "error during printing an exception of class"
                        + e2.__class__.__name__
                    )
                logger.info("Request %s failed due to %s.", request_id, s)
            raise EngineGenerateError() from e
        finally:
            if q is not None:
                q.close()

    def _run_output_handler(self):
        """Background loop: pulls from EngineCore and pushes to AsyncStreams."""

        if self.output_handler is not None:
            return

        # Ensure that the task doesn't have a circular ref back to the AsyncLLM
        # object, or else it won't be garbage collected and cleaned up properly.
        engine_core = self.engine_core
        output_processor = self.output_processor
        log_stats = self.log_stats
        logger_manager = self.logger_manager
        input_processor = self.input_processor
        chunk_size = envs.VLLM_V1_OUTPUT_PROC_CHUNK_SIZE

        async def output_handler():
            try:
                while True:
                    # 1) Pull EngineCoreOutputs from the EngineCore.
                    outputs = await engine_core.get_output_async()
                    num_outputs = len(outputs.outputs)

                    iteration_stats = (
                        IterationStats() if (log_stats and num_outputs) else None
                    )

                    # Split outputs into chunks of at most
                    # VLLM_V1_OUTPUT_PROC_CHUNK_SIZE, so that we don't block the
                    # event loop for too long.
                    engine_core_outputs = outputs.outputs
                    for start in range(0, num_outputs, chunk_size):
                        end = start + chunk_size
                        outputs_slice = engine_core_outputs[start:end]
                        # 2) Process EngineCoreOutputs.
                        processed_outputs = output_processor.process_outputs(
                            outputs_slice, outputs.timestamp, iteration_stats
                        )
                        # NOTE: RequestOutputs are pushed to their queues.
                        assert not processed_outputs.request_outputs

                        # Allow other asyncio tasks to run between chunks
                        if end < num_outputs:
                            await asyncio.sleep(0)

                        # 3) Abort any reqs that finished due to stop strings.
                        if processed_outputs.reqs_to_abort:
                            await engine_core.abort_requests_async(
                                processed_outputs.reqs_to_abort
                            )

                    output_processor.update_scheduler_stats(outputs.scheduler_stats)

                    # 4) Logging.
                    # TODO(rob): make into a coroutine and launch it in
                    # background thread once Prometheus overhead is non-trivial.
                    if logger_manager:
                        logger_manager.record(
                            engine_idx=outputs.engine_index,
                            scheduler_stats=outputs.scheduler_stats,
                            iteration_stats=iteration_stats,
                            mm_cache_stats=input_processor.stat_mm_cache(),
                        )
            except Exception as e:
                logger.exception("AsyncLLM output_handler failed.")
                output_processor.propagate_error(e)

        self.output_handler = asyncio.create_task(output_handler())

    async def abort(
        self, request_id: str | Iterable[str], internal: bool = False
    ) -> None:
        """Abort RequestId in OutputProcessor and EngineCore."""

        request_ids = (
            (request_id,) if isinstance(request_id, str) else as_list(request_id)
        )
        all_request_ids = self.output_processor.abort_requests(request_ids, internal)
        await self.engine_core.abort_requests_async(all_request_ids)

        if self.log_requests:
            logger.info("Aborted request(s) %s.", ",".join(request_ids))

    async def pause_generation(
        self,
        *,
        wait_for_inflight_requests: bool = False,
        clear_cache: bool = True,
    ) -> None:
        """
        Pause generation to allow model weight updates.

        New generation/encoding requests are blocked until resume.

        Args:
            wait_for_inflight_requests: When ``True`` waits for in-flight
                requests to finish before pausing. When ``False`` (default),
                immediately aborts any in-flight requests.
            clear_cache: Whether to clear KV cache and prefix cache after
                draining. Set to ``False`` to preserve cache for faster resume.
                Default is ``True`` (clear caches).
        """

        async with self._pause_cond:
            if self._paused:
                return
            self._paused = True

        if not wait_for_inflight_requests:
            request_ids = list(self.output_processor.request_states.keys())
            if request_ids:
                await self.abort(request_ids, internal=True)

        # Wait for running requests to drain before clearing cache.
        if self.output_processor.has_unfinished_requests():
            await self.output_processor.wait_for_requests_to_drain()

        # Clear cache
        if clear_cache:
            await self.reset_prefix_cache()
            await self.reset_mm_cache()
            await self.reset_encoder_cache()

    async def resume_generation(self) -> None:
        """Resume generation after :meth:`pause_generation`."""

        async with self._pause_cond:
            self._paused = False
            self._pause_cond.notify_all()  # Wake up all waiting requests

    async def is_paused(self) -> bool:
        """Return whether the engine is currently paused."""

        async with self._pause_cond:
            return self._paused

    async def encode(
        self,
        prompt: PromptType,
        pooling_params: PoolingParams,
        request_id: str,
        lora_request: LoRARequest | None = None,
        trace_headers: Mapping[str, str] | None = None,
        priority: int = 0,
        tokenization_kwargs: dict[str, Any] | None = None,
    ) -> AsyncGenerator[PoolingRequestOutput, None]:
        """
        Main function called by the API server to kick off a request
            * 1) Making an AsyncStream corresponding to the Request.
            * 2) Processing the Input.
            * 3) Adding the Request to the EngineCore (separate process).

        A separate output_handler loop runs in a background AsyncIO task,
        pulling outputs from EngineCore and putting them into the
        per-request AsyncStream.

        The caller of encode() iterates the returned AsyncGenerator,
        returning the PoolingRequestOutput back to the caller.
        """

        q: RequestOutputCollector | None = None
        try:
            q = await self.add_request(
                request_id,
                prompt,
                pooling_params,
                lora_request=lora_request,
                tokenization_kwargs=tokenization_kwargs,
                trace_headers=trace_headers,
                priority=priority,
            )

            # The output_handler task pushes items into the queue.
            # This task pulls from the queue and yields to caller.
            finished = False
            while not finished:
                # Note: drain queue without await if possible (avoids
                # task switching under load which helps performance).
                out = q.get_nowait() or await q.get()
                assert isinstance(out, PoolingRequestOutput)
                # Note: both OutputProcessor and EngineCore handle their
                # own request cleanup based on finished.
                finished = out.finished
                yield out

        # If the request is disconnected by the client, generate()
        # is cancelled. So, we abort the request if we end up here.
        except asyncio.CancelledError:
            if q is not None:
                await self.abort(q.request_id, internal=True)
            if self.log_requests:
                logger.info("Request %s aborted.", request_id)
            raise

        # Engine is dead. Do not abort since we shut down.
        except EngineDeadError:
            if self.log_requests:
                logger.info("Request %s failed (engine dead).", request_id)
            raise

        # Request validation error.
        except ValueError:
            if self.log_requests:
                logger.info("Request %s failed (bad request).", request_id)
            raise

        # Unexpected error in the generate() task (possibly recoverable).
        except Exception as e:
            if q is not None:
                await self.abort(q.request_id, internal=True)
            if self.log_requests:
                logger.info("Request %s failed.", request_id)
            raise EngineGenerateError() from e
        finally:
            if q is not None:
                q.close()

    @property
    def tokenizer(self) -> TokenizerLike | None:
        return self.input_processor.tokenizer

    def get_tokenizer(self) -> TokenizerLike:
        return self.input_processor.get_tokenizer()

    @property
    def renderer(self) -> BaseRenderer:
        return self.input_processor.renderer

    async def is_tracing_enabled(self) -> bool:
        return self.observability_config.otlp_traces_endpoint is not None  # type: ignore

    async def do_log_stats(self) -> None:
        if self.logger_manager:
            self.logger_manager.log()

    async def check_health(self) -> None:
        logger.debug("Called check_health.")
        if self.errored:
            raise self.dead_error

    async def start_profile(self) -> None:
        coros = [self.engine_core.profile_async(True)]
        if self.profiler is not None:
            coros.append(asyncio.to_thread(self.profiler.start))
        await asyncio.gather(*coros)

    async def stop_profile(self) -> None:
        coros = [self.engine_core.profile_async(False)]
        if self.profiler is not None:
            coros.append(asyncio.to_thread(self.profiler.stop))
        await asyncio.gather(*coros)

    async def reset_mm_cache(self) -> None:
        self.input_processor.clear_mm_cache()
        await self.engine_core.reset_mm_cache_async()

    async def reset_prefix_cache(
        self, reset_running_requests: bool = False, reset_connector: bool = False
    ) -> bool:
        return await self.engine_core.reset_prefix_cache_async(
            reset_running_requests, reset_connector
        )

    async def reset_encoder_cache(self) -> None:
        await self.engine_core.reset_encoder_cache_async()

    async def sleep(self, level: int = 1) -> None:
        await self.reset_prefix_cache()
        await self.engine_core.sleep_async(level)

        if self.logger_manager is not None:
            self.logger_manager.record_sleep_state(1, level)

    async def wake_up(self, tags: list[str] | None = None) -> None:
        await self.engine_core.wake_up_async(tags)

        if self.logger_manager is not None:
            self.logger_manager.record_sleep_state(0, 0)

    async def is_sleeping(self) -> bool:
        return await self.engine_core.is_sleeping_async()

    async def add_lora(self, lora_request: LoRARequest) -> bool:
        """Load a new LoRA adapter into the engine for future requests."""
        return await self.engine_core.add_lora_async(lora_request)

    async def remove_lora(self, lora_id: int) -> bool:
        """Remove an already loaded LoRA adapter."""
        return await self.engine_core.remove_lora_async(lora_id)

    async def list_loras(self) -> set[int]:
        """List all registered adapters."""
        return await self.engine_core.list_loras_async()

    async def pin_lora(self, lora_id: int) -> bool:
        """Prevent an adapter from being evicted."""
        return await self.engine_core.pin_lora_async(lora_id)

    async def collective_rpc(
        self,
        method: str,
        timeout: float | None = None,
        args: tuple = (),
        kwargs: dict | None = None,
    ):
        """
        Perform a collective RPC call to the given path.
        """
        return await self.engine_core.collective_rpc_async(
            method, timeout, args, kwargs
        )

    async def wait_for_requests_to_drain(self, drain_timeout: int = 300):
        """Wait for all requests to be drained."""
        start_time = time.time()
        while time.time() - start_time < drain_timeout:
            if not self.engine_core.dp_engines_running():
                logger.info("Engines are idle, requests have been drained")
                return

            logger.info("Engines are still running, waiting for requests to drain...")
            await asyncio.sleep(1)  # Wait 1 second before checking again

        raise TimeoutError(
            f"Timeout reached after {drain_timeout} seconds "
            "waiting for requests to drain."
        )

    async def scale_elastic_ep(
        self, new_data_parallel_size: int, drain_timeout: int = 300
    ):
        """
        Scale up or down the data parallel size by adding or removing
        engine cores.
        Args:
            new_data_parallel_size: The new number of data parallel workers
            drain_timeout:
                Maximum time to wait for requests to drain (seconds)
        """
        old_data_parallel_size = self.vllm_config.parallel_config.data_parallel_size
        if old_data_parallel_size == new_data_parallel_size:
            logger.info(
                "Data parallel size is already %s, skipping scale",
                new_data_parallel_size,
            )
            return
        logger.info(
            "Waiting for requests to drain before scaling up to %s engines...",
            new_data_parallel_size,
        )
        await self.wait_for_requests_to_drain(drain_timeout)
        logger.info(
            "Requests have been drained, proceeding with scale to %s engines",
            new_data_parallel_size,
        )
        await self.engine_core.scale_elastic_ep(new_data_parallel_size)
        self.vllm_config.parallel_config.data_parallel_size = new_data_parallel_size

        # recreate stat loggers
        if new_data_parallel_size > old_data_parallel_size and self.log_stats:
            # TODO(rob): fix this after talking with Ray team.
            # This resets all the prometheus metrics since we
            # unregister during initialization. Need to understand
            # the intended behavior here better.
            self.logger_manager = StatLoggerManager(
                vllm_config=self.vllm_config,
                engine_idxs=list(range(new_data_parallel_size)),
                custom_stat_loggers=None,
            )

    @property
    def is_running(self) -> bool:
        # Is None before the loop is started.
        return self.output_handler is None or not self.output_handler.done()

    @property
    def is_stopped(self) -> bool:
        return self.errored

    @property
    def errored(self) -> bool:
        return self.engine_core.resources.engine_dead or not self.is_running

    @property
    def dead_error(self) -> BaseException:
        return EngineDeadError()

_pause_cond instance-attribute

_pause_cond = Condition()

_paused instance-attribute

_paused = False

dead_error property

dead_error: BaseException

engine_core instance-attribute

engine_core = make_async_mp_client(
    vllm_config=vllm_config,
    executor_class=executor_class,
    log_stats=log_stats,
    client_addresses=client_addresses,
    client_count=client_count,
    client_index=client_index,
)

errored property

errored: bool

input_processor instance-attribute

input_processor = InputProcessor(vllm_config)

io_processor instance-attribute

io_processor = get_io_processor(
    vllm_config, io_processor_plugin
)

is_running property

is_running: bool

is_stopped property

is_stopped: bool

log_requests instance-attribute

log_requests = log_requests

log_stats instance-attribute

log_stats = log_stats or has_custom_loggers

logger_manager instance-attribute

logger_manager: StatLoggerManager | None = None

model_config instance-attribute

model_config = model_config

observability_config instance-attribute

observability_config = observability_config

output_handler instance-attribute

output_handler: Task | None = None

output_processor instance-attribute

output_processor = OutputProcessor(
    tokenizer,
    log_stats=log_stats,
    stream_interval=stream_interval,
)

profiler instance-attribute

profiler = profile(
    activities=[CPU],
    with_stack=torch_profiler_with_stack,
    on_trace_ready=tensorboard_trace_handler(
        profiler_dir,
        worker_name=worker_name,
        use_gzip=torch_profiler_use_gzip,
    ),
)

renderer property

renderer: BaseRenderer

tokenizer property

tokenizer: TokenizerLike | None

vllm_config instance-attribute

vllm_config = vllm_config

__del__

__del__()
Source code in vllm/v1/engine/async_llm.py
def __del__(self):
    self.shutdown()

__init__

__init__(
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    usage_context: UsageContext = ENGINE_CONTEXT,
    mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY,
    use_cached_outputs: bool = False,
    log_requests: bool = True,
    start_engine_loop: bool = True,
    stat_loggers: list[StatLoggerFactory] | None = None,
    aggregate_engine_logging: bool = False,
    client_addresses: dict[str, str] | None = None,
    client_count: int = 1,
    client_index: int = 0,
) -> None

Create an AsyncLLM.

Parameters:

    vllm_config (VllmConfig, required): global configuration.
    executor_class (type[Executor], required): an Executor impl, e.g. MultiprocExecutor.
    log_stats (bool, required): Whether to log stats.
    usage_context (UsageContext, default: ENGINE_CONTEXT): Usage context of the LLM.
    mm_registry (MultiModalRegistry, default: MULTIMODAL_REGISTRY): Multi-modal registry.
    use_cached_outputs (bool, default: False): Whether to use cached outputs.
    log_requests (bool, default: True): Whether to log requests.
    start_engine_loop (bool, default: True): Whether to start the engine loop.
    stat_loggers (list[StatLoggerFactory] | None, default: None): customized stat loggers for the engine. If not provided, default stat loggers will be used. PLEASE BE AWARE THAT STAT LOGGER IS NOT STABLE IN V1, AND ITS BASE CLASS INTERFACE MIGHT CHANGE.

Returns:

    None

Source code in vllm/v1/engine/async_llm.py
def __init__(
    self,
    vllm_config: VllmConfig,
    executor_class: type[Executor],
    log_stats: bool,
    usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
    mm_registry: MultiModalRegistry = MULTIMODAL_REGISTRY,
    use_cached_outputs: bool = False,
    log_requests: bool = True,
    start_engine_loop: bool = True,
    stat_loggers: list[StatLoggerFactory] | None = None,
    aggregate_engine_logging: bool = False,
    client_addresses: dict[str, str] | None = None,
    client_count: int = 1,
    client_index: int = 0,
) -> None:
    """
    Create an AsyncLLM.

    Args:
        vllm_config: global configuration.
        executor_class: an Executor impl, e.g. MultiprocExecutor.
        log_stats: Whether to log stats.
        usage_context: Usage context of the LLM.
        mm_registry: Multi-modal registry.
        use_cached_outputs: Whether to use cached outputs.
        log_requests: Whether to log requests.
        start_engine_loop: Whether to start the engine loop.
        stat_loggers: customized stat loggers for the engine.
            If not provided, default stat loggers will be used.
            PLEASE BE AWARE THAT STAT LOGGER IS NOT STABLE
            IN V1, AND ITS BASE CLASS INTERFACE MIGHT CHANGE.

    Returns:
        None
    """
    # Ensure we can serialize custom transformer configs
    maybe_register_config_serialize_by_value()

    self.model_config = vllm_config.model_config
    self.vllm_config = vllm_config
    self.observability_config = vllm_config.observability_config
    self.log_requests = log_requests

    custom_stat_loggers = list(stat_loggers or [])
    custom_stat_loggers.extend(load_stat_logger_plugin_factories())

    has_custom_loggers = bool(custom_stat_loggers)
    self.log_stats = log_stats or has_custom_loggers
    if not log_stats and has_custom_loggers:
        logger.info(
            "AsyncLLM created with log_stats=False, "
            "but custom stat loggers were found; "
            "enabling logging without default stat loggers."
        )

    self.input_processor = InputProcessor(self.vllm_config)
    self.io_processor = get_io_processor(
        self.vllm_config,
        self.model_config.io_processor_plugin,
    )

    # OutputProcessor (converts EngineCoreOutputs --> RequestOutput).
    self.output_processor = OutputProcessor(
        self.tokenizer,
        log_stats=self.log_stats,
        stream_interval=self.vllm_config.scheduler_config.stream_interval,
    )
    endpoint = self.observability_config.otlp_traces_endpoint
    if endpoint is not None:
        tracer = init_tracer("vllm.llm_engine", endpoint)
        self.output_processor.tracer = tracer

    # EngineCore (starts the engine in background process).
    self.engine_core = EngineCoreClient.make_async_mp_client(
        vllm_config=vllm_config,
        executor_class=executor_class,
        log_stats=self.log_stats,
        client_addresses=client_addresses,
        client_count=client_count,
        client_index=client_index,
    )

    # Loggers.
    self.logger_manager: StatLoggerManager | None = None
    if self.log_stats:
        self.logger_manager = StatLoggerManager(
            vllm_config=vllm_config,
            engine_idxs=self.engine_core.engine_ranks_managed,
            custom_stat_loggers=custom_stat_loggers,
            enable_default_loggers=log_stats,
            client_count=client_count,
            aggregate_engine_logging=aggregate_engine_logging,
        )
        self.logger_manager.log_engine_initialized()

    # Pause / resume state for async RL workflows.
    self._pause_cond = asyncio.Condition()
    self._paused = False

    self.output_handler: asyncio.Task | None = None
    try:
        # Start output handler eagerly if we are in the asyncio eventloop.
        asyncio.get_running_loop()
        self._run_output_handler()
    except RuntimeError:
        pass

    if (
        vllm_config.profiler_config.profiler == "torch"
        and not vllm_config.profiler_config.ignore_frontend
    ):
        profiler_dir = vllm_config.profiler_config.torch_profiler_dir
        logger.info(
            "Torch profiler enabled. AsyncLLM CPU traces will be collected under %s",  # noqa: E501
            profiler_dir,
        )
        worker_name = f"{socket.gethostname()}_{os.getpid()}.async_llm"
        self.profiler = torch.profiler.profile(
            activities=[
                torch.profiler.ProfilerActivity.CPU,
            ],
            with_stack=vllm_config.profiler_config.torch_profiler_with_stack,
            on_trace_ready=torch.profiler.tensorboard_trace_handler(
                profiler_dir,
                worker_name=worker_name,
                use_gzip=vllm_config.profiler_config.torch_profiler_use_gzip,
            ),
        )
    else:
        self.profiler = None
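
A hedged construction sketch to complement the parameter table: rather than calling __init__ directly, the from_vllm_config() classmethod resolves the executor class for you. The AsyncEngineArgs values, the model name, and the flag settings below are assumptions for illustration.

from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.usage.usage_lib import UsageContext
from vllm.v1.engine.async_llm import AsyncLLM

# Placeholder model; create_engine_config() builds the VllmConfig that
# from_vllm_config() forwards to __init__ together with
# Executor.get_class(vllm_config).
engine_args = AsyncEngineArgs(model="facebook/opt-125m")
vllm_config = engine_args.create_engine_config(UsageContext.ENGINE_CONTEXT)

async_llm = AsyncLLM.from_vllm_config(
    vllm_config,
    enable_log_requests=True,   # forwarded as log_requests
    disable_log_stats=False,    # forwarded as log_stats=not disable_log_stats
)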

_add_request async

_add_request(
    request: EngineCoreRequest,
    prompt: str | None,
    parent_req: ParentRequest | None,
    index: int,
    queue: RequestOutputCollector,
)
Source code in vllm/v1/engine/async_llm.py
async def _add_request(
    self,
    request: EngineCoreRequest,
    prompt: str | None,
    parent_req: ParentRequest | None,
    index: int,
    queue: RequestOutputCollector,
):
    # Add the request to OutputProcessor (this process).
    self.output_processor.add_request(request, prompt, parent_req, index, queue)

    # Add the EngineCoreRequest to EngineCore (separate process).
    await self.engine_core.add_request_async(request)

    if self.log_requests:
        logger.info("Added request %s.", request.request_id)

_add_streaming_input_request async

_add_streaming_input_request(
    request_id: str,
    input_stream: AsyncGenerator[StreamingInput, None],
    sampling_params: SamplingParams | PoolingParams,
    arrival_time: float | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
) -> RequestOutputCollector
Source code in vllm/v1/engine/async_llm.py
async def _add_streaming_input_request(
    self,
    request_id: str,
    input_stream: AsyncGenerator[StreamingInput, None],
    sampling_params: SamplingParams | PoolingParams,
    arrival_time: float | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
) -> RequestOutputCollector:
    self._validate_streaming_input_sampling_params(sampling_params)

    inputs = dict(
        arrival_time=arrival_time,
        lora_request=lora_request,
        tokenization_kwargs=tokenization_kwargs,
        trace_headers=trace_headers,
        priority=priority,
        data_parallel_rank=data_parallel_rank,
    )

    if not sampling_params.skip_clone:
        sampling_params = sampling_params.clone()
        sampling_params.skip_clone = True

    # Create request for validation, also used as the finished signal
    # once the input stream is closed.
    final_req = self.input_processor.process_inputs(
        request_id=request_id,
        prompt=TokensPrompt(prompt_token_ids=[0]),
        params=sampling_params,
        **inputs,  # type: ignore[arg-type]
    )
    self.input_processor.assign_request_id(final_req)
    internal_req_id = final_req.request_id

    queue = RequestOutputCollector(sampling_params.output_kind, internal_req_id)

    async def handle_inputs():
        cancelled = False
        try:
            async for input_chunk in input_stream:
                sp = input_chunk.sampling_params
                if sp:
                    self._validate_streaming_input_sampling_params(sp)
                else:
                    sp = sampling_params
                req = self.input_processor.process_inputs(
                    request_id=internal_req_id,
                    prompt=input_chunk.prompt,
                    params=sp,
                    resumable=True,
                    **inputs,  # type: ignore[arg-type]
                )
                req.external_req_id = request_id
                if req.prompt_embeds is not None:
                    raise ValueError(
                        "prompt_embeds not supported for streaming inputs"
                    )
                prompt_text = get_prompt_text(input_chunk.prompt)
                await self._add_request(req, prompt_text, None, 0, queue)
        except (asyncio.CancelledError, GeneratorExit):
            cancelled = True
        except Exception as error:
            # Wrap in InputStreamError so generate() can propagate it
            # without wrapping in EngineGenerateError.
            queue.put(InputStreamError(error))
        finally:
            queue._input_stream_task = None
            if not cancelled:
                # Send empty final request to indicate that inputs have
                # finished. Don't send if cancelled (session was aborted).
                await self._add_request(final_req, None, None, 0, queue)

    # Ensure output handler is running.
    self._run_output_handler()

    queue._input_stream_task = asyncio.create_task(handle_inputs())
    return queue

_run_output_handler

_run_output_handler()

Background loop: pulls from EngineCore and pushes to AsyncStreams.

Source code in vllm/v1/engine/async_llm.py
def _run_output_handler(self):
    """Background loop: pulls from EngineCore and pushes to AsyncStreams."""

    if self.output_handler is not None:
        return

    # Ensure that the task doesn't have a circular ref back to the AsyncLLM
    # object, or else it won't be garbage collected and cleaned up properly.
    engine_core = self.engine_core
    output_processor = self.output_processor
    log_stats = self.log_stats
    logger_manager = self.logger_manager
    input_processor = self.input_processor
    chunk_size = envs.VLLM_V1_OUTPUT_PROC_CHUNK_SIZE

    async def output_handler():
        try:
            while True:
                # 1) Pull EngineCoreOutputs from the EngineCore.
                outputs = await engine_core.get_output_async()
                num_outputs = len(outputs.outputs)

                iteration_stats = (
                    IterationStats() if (log_stats and num_outputs) else None
                )

                # Split outputs into chunks of at most
                # VLLM_V1_OUTPUT_PROC_CHUNK_SIZE, so that we don't block the
                # event loop for too long.
                engine_core_outputs = outputs.outputs
                for start in range(0, num_outputs, chunk_size):
                    end = start + chunk_size
                    outputs_slice = engine_core_outputs[start:end]
                    # 2) Process EngineCoreOutputs.
                    processed_outputs = output_processor.process_outputs(
                        outputs_slice, outputs.timestamp, iteration_stats
                    )
                    # NOTE: RequestOutputs are pushed to their queues.
                    assert not processed_outputs.request_outputs

                    # Allow other asyncio tasks to run between chunks
                    if end < num_outputs:
                        await asyncio.sleep(0)

                    # 3) Abort any reqs that finished due to stop strings.
                    if processed_outputs.reqs_to_abort:
                        await engine_core.abort_requests_async(
                            processed_outputs.reqs_to_abort
                        )

                output_processor.update_scheduler_stats(outputs.scheduler_stats)

                # 4) Logging.
                # TODO(rob): make into a coroutine and launch it in
                # background thread once Prometheus overhead is non-trivial.
                if logger_manager:
                    logger_manager.record(
                        engine_idx=outputs.engine_index,
                        scheduler_stats=outputs.scheduler_stats,
                        iteration_stats=iteration_stats,
                        mm_cache_stats=input_processor.stat_mm_cache(),
                    )
        except Exception as e:
            logger.exception("AsyncLLM output_handler failed.")
            output_processor.propagate_error(e)

    self.output_handler = asyncio.create_task(output_handler())
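
The chunk-and-yield structure above is a general asyncio fairness pattern rather than anything specific to vLLM. A minimal standalone sketch (illustrative only; CHUNK_SIZE stands in for VLLM_V1_OUTPUT_PROC_CHUNK_SIZE):

import asyncio

CHUNK_SIZE = 128  # stand-in for VLLM_V1_OUTPUT_PROC_CHUNK_SIZE

async def process_in_chunks(items, handle):
    # Process items in bounded chunks so a single large batch cannot
    # monopolize the event loop.
    for start in range(0, len(items), CHUNK_SIZE):
        for item in items[start:start + CHUNK_SIZE]:
            handle(item)
        # Yield control so other tasks (e.g. request handlers) can run
        # before the next chunk is processed.
        if start + CHUNK_SIZE < len(items):
            await asyncio.sleep(0)

asyncio.run(process_in_chunks(list(range(1000)), handle=lambda item: None))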

_validate_streaming_input_sampling_params staticmethod

_validate_streaming_input_sampling_params(
    params: SamplingParams | PoolingParams,
)
Source code in vllm/v1/engine/async_llm.py
@staticmethod
def _validate_streaming_input_sampling_params(
    params: SamplingParams | PoolingParams,
):
    if (
        not isinstance(params, SamplingParams)
        or params.n > 1
        or params.output_kind == RequestOutputKind.FINAL_ONLY
        or params.stop
    ):
        raise ValueError(
            "Input streaming not currently supported "
            "for pooling models, n > 1, request_kind = FINAL_ONLY "
            "or with stop strings."
        )
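
An illustrative sketch of which parameters pass this check (assuming the usual SamplingParams defaults of n=1, no stop strings, and a non-FINAL_ONLY output kind; the import paths should be verified against your vLLM version):

from vllm import SamplingParams
from vllm.sampling_params import RequestOutputKind
from vllm.v1.engine.async_llm import AsyncLLM

# Accepted for streaming inputs: single sequence, streamed outputs, no stop strings.
ok = SamplingParams(max_tokens=64, output_kind=RequestOutputKind.DELTA)
AsyncLLM._validate_streaming_input_sampling_params(ok)  # no exception

# Each of the following would raise ValueError:
#   SamplingParams(n=2)
#   SamplingParams(output_kind=RequestOutputKind.FINAL_ONLY)
#   SamplingParams(stop=["\n\n"])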

abort async

abort(
    request_id: str | Iterable[str], internal: bool = False
) -> None

Abort RequestId in OutputProcessor and EngineCore.

Source code in vllm/v1/engine/async_llm.py
async def abort(
    self, request_id: str | Iterable[str], internal: bool = False
) -> None:
    """Abort RequestId in OutputProcessor and EngineCore."""

    request_ids = (
        (request_id,) if isinstance(request_id, str) else as_list(request_id)
    )
    all_request_ids = self.output_processor.abort_requests(request_ids, internal)
    await self.engine_core.abort_requests_async(all_request_ids)

    if self.log_requests:
        logger.info("Aborted request(s) %s.", ",".join(request_ids))

add_lora async

add_lora(lora_request: LoRARequest) -> bool

Load a new LoRA adapter into the engine for future requests.

Source code in vllm/v1/engine/async_llm.py
async def add_lora(self, lora_request: LoRARequest) -> bool:
    """Load a new LoRA adapter into the engine for future requests."""
    return await self.engine_core.add_lora_async(lora_request)
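
A hedged sketch of the adapter-management calls on this class (see also list_loras, pin_lora and remove_lora below), assuming an AsyncLLM instance named engine; the adapter name, id and path are placeholders, and the LoRARequest keyword arguments should be checked against your vLLM version:

from vllm.lora.request import LoRARequest

async def manage_adapters(engine) -> None:
    lora = LoRARequest(
        lora_name="my-adapter",        # placeholder adapter name
        lora_int_id=1,                 # unique integer id for this adapter
        lora_path="/path/to/adapter",  # placeholder local checkpoint path
    )
    await engine.add_lora(lora)    # load the adapter for future requests
    await engine.list_loras()      # -> set of registered adapter ids
    await engine.pin_lora(1)       # keep the adapter from being evicted
    await engine.remove_lora(1)    # unload it when no longer needed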

add_request async

add_request(
    request_id: str,
    prompt: EngineCoreRequest
    | PromptType
    | AsyncGenerator[StreamingInput, None],
    params: SamplingParams | PoolingParams,
    arrival_time: float | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
    prompt_text: str | None = None,
) -> RequestOutputCollector

Add new request to the AsyncLLM.

Source code in vllm/v1/engine/async_llm.py
async def add_request(
    self,
    request_id: str,
    prompt: EngineCoreRequest | PromptType | AsyncGenerator[StreamingInput, None],
    params: SamplingParams | PoolingParams,
    arrival_time: float | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
    prompt_text: str | None = None,
) -> RequestOutputCollector:
    """Add new request to the AsyncLLM."""

    if self.errored:
        raise EngineDeadError()

    is_pooling = isinstance(params, PoolingParams)

    if (
        self.vllm_config.cache_config.kv_sharing_fast_prefill
        and not is_pooling
        and params.prompt_logprobs
    ):
        raise ValueError(
            "--kv-sharing-fast-prefill produces incorrect logprobs for "
            "prompt tokens, please disable it when the requests need "
            "prompt logprobs"
        )

    if params.truncate_prompt_tokens is not None:
        params_type = type(params).__name__
        warnings.warn(
            f"The `truncate_prompt_tokens` parameter in `{params_type}` "
            "is deprecated and will be removed in v0.16. "
            "Please pass it via `tokenization_kwargs` instead.",
            DeprecationWarning,
            stacklevel=2,
        )

        tokenization_kwargs = merge_kwargs(
            tokenization_kwargs,
            dict(truncate_prompt_tokens=params.truncate_prompt_tokens),
        )

    if isinstance(prompt, AsyncGenerator):
        # Streaming input case.
        return await self._add_streaming_input_request(
            request_id,
            prompt,
            params,
            arrival_time,
            lora_request,
            tokenization_kwargs,
            trace_headers,
            priority,
            data_parallel_rank,
        )

    # Convert Input --> Request.
    if isinstance(prompt, EngineCoreRequest):
        request = prompt
        if request_id != request.request_id:
            logger.warning_once(
                "AsyncLLM.add_request() was passed a request_id parameter that "
                "does not match the EngineCoreRequest.request_id attribute. The "
                "latter will be used, and the former will be ignored."
            )
    else:
        if prompt_text is not None:
            raise ValueError(
                "should only provide prompt_text with EngineCoreRequest"
            )
        request = self.input_processor.process_inputs(
            request_id,
            prompt,
            params,
            arrival_time=arrival_time,
            lora_request=lora_request,
            tokenization_kwargs=tokenization_kwargs,
            trace_headers=trace_headers,
            priority=priority,
            data_parallel_rank=data_parallel_rank,
        )
        prompt_text = get_prompt_text(prompt)

    self.input_processor.assign_request_id(request)

    # We start the output_handler on the first call to add_request() so
    # we can call __init__ before the event loop, which enables us
    # to handle startup failure gracefully in the OpenAI server.
    self._run_output_handler()

    # Respect pause state before accepting new requests.
    async with self._pause_cond:
        await self._pause_cond.wait_for(lambda: not self._paused)

    # Create a new output collector for the request.
    queue = RequestOutputCollector(params.output_kind, request.request_id)

    # Use cloned params that may have been updated in process_inputs()
    params = request.params

    if is_pooling or params.n == 1:
        await self._add_request(request, prompt_text, None, 0, queue)
        return queue

    parent_params = params
    assert isinstance(parent_params, SamplingParams)

    # Fan out child requests (for n>1).
    parent_request = ParentRequest(request)
    for idx in range(parent_params.n):
        request_id, child_params = parent_request.get_child_info(idx)
        child_request = request if idx == parent_params.n - 1 else copy(request)
        child_request.request_id = request_id
        child_request.sampling_params = child_params
        await self._add_request(
            child_request, prompt_text, parent_request, idx, queue
        )
    return queue
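
A hedged sketch of calling add_request() directly and draining the returned RequestOutputCollector; generate() wraps essentially this loop, so most applications should use generate() instead. Assumes an AsyncLLM instance named engine and a placeholder prompt and request id:

from vllm import SamplingParams

async def run_one(engine) -> str:
    params = SamplingParams(max_tokens=32)
    queue = await engine.add_request("req-1", "Hello, my name is", params)
    text = ""
    finished = False
    while not finished:
        # Drain without awaiting when possible, mirroring generate().
        out = queue.get_nowait() or await queue.get()
        finished = out.finished
        text = out.outputs[0].text  # cumulative text under the default output kind
    return text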

check_health async

check_health() -> None
Source code in vllm/v1/engine/async_llm.py
async def check_health(self) -> None:
    logger.debug("Called check_health.")
    if self.errored:
        raise self.dead_error

collective_rpc async

collective_rpc(
    method: str,
    timeout: float | None = None,
    args: tuple = (),
    kwargs: dict | None = None,
)

Perform a collective RPC call to the given path.

Source code in vllm/v1/engine/async_llm.py
async def collective_rpc(
    self,
    method: str,
    timeout: float | None = None,
    args: tuple = (),
    kwargs: dict | None = None,
):
    """
    Perform a collective RPC call to the given path.
    """
    return await self.engine_core.collective_rpc_async(
        method, timeout, args, kwargs
    )
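
A hedged sketch of the call shape only; "report_device_stats" is a hypothetical worker-side method used for illustration and must be replaced with a method your workers actually implement:

async def broadcast_call(engine):
    results = await engine.collective_rpc(
        "report_device_stats",      # hypothetical worker method name
        timeout=5.0,                # seconds to wait for all workers
        kwargs={"detailed": True},  # forwarded to the worker method
    )
    return results  # one result per worker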

do_log_stats async

do_log_stats() -> None
Source code in vllm/v1/engine/async_llm.py
async def do_log_stats(self) -> None:
    if self.logger_manager:
        self.logger_manager.log()

encode async

encode(
    prompt: PromptType,
    pooling_params: PoolingParams,
    request_id: str,
    lora_request: LoRARequest | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    tokenization_kwargs: dict[str, Any] | None = None,
) -> AsyncGenerator[PoolingRequestOutput, None]

Main function called by the API server to kick off a request:

* 1) Making an AsyncStream corresponding to the Request.
* 2) Processing the Input.
* 3) Adding the Request to the EngineCore (separate process).

A separate output_handler loop runs in a background AsyncIO task, pulling outputs from EngineCore and putting them into the per-request AsyncStream.

The caller of encode() iterates the returned AsyncGenerator, receiving each PoolingRequestOutput as it becomes available.

Source code in vllm/v1/engine/async_llm.py
async def encode(
    self,
    prompt: PromptType,
    pooling_params: PoolingParams,
    request_id: str,
    lora_request: LoRARequest | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    tokenization_kwargs: dict[str, Any] | None = None,
) -> AsyncGenerator[PoolingRequestOutput, None]:
    """
    Main function called by the API server to kick off a request
        * 1) Making an AsyncStream corresponding to the Request.
        * 2) Processing the Input.
        * 3) Adding the Request to the EngineCore (separate process).

    A separate output_handler loop runs in a background AsyncIO task,
    pulling outputs from EngineCore and putting them into the
    per-request AsyncStream.

    The caller of generate() iterates the returned AsyncGenerator,
    returning the RequestOutput back to the caller.
    """

    q: RequestOutputCollector | None = None
    try:
        q = await self.add_request(
            request_id,
            prompt,
            pooling_params,
            lora_request=lora_request,
            tokenization_kwargs=tokenization_kwargs,
            trace_headers=trace_headers,
            priority=priority,
        )

        # The output_handler task pushes items into the queue.
        # This task pulls from the queue and yields to caller.
        finished = False
        while not finished:
            # Note: drain queue without await if possible (avoids
            # task switching under load which helps performance).
            out = q.get_nowait() or await q.get()
            assert isinstance(out, PoolingRequestOutput)
            # Note: both OutputProcessor and EngineCore handle their
            # own request cleanup based on finished.
            finished = out.finished
            yield out

    # If the request is disconnected by the client, generate()
    # is cancelled. So, we abort the request if we end up here.
    except asyncio.CancelledError:
        if q is not None:
            await self.abort(q.request_id, internal=True)
        if self.log_requests:
            logger.info("Request %s aborted.", request_id)
        raise

    # Engine is dead. Do not abort since we shut down.
    except EngineDeadError:
        if self.log_requests:
            logger.info("Request %s failed (engine dead).", request_id)
        raise

    # Request validation error.
    except ValueError:
        if self.log_requests:
            logger.info("Request %s failed (bad request).", request_id)
        raise

    # Unexpected error in the generate() task (possibly recoverable).
    except Exception as e:
        if q is not None:
            await self.abort(q.request_id, internal=True)
        if self.log_requests:
            logger.info("Request %s failed.", request_id)
        raise EngineGenerateError() from e
    finally:
        if q is not None:
            q.close()
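
A hedged usage sketch for pooling (e.g. embedding) models, assuming an AsyncLLM instance named engine that was built with a pooling model and a placeholder request id:

from vllm import PoolingParams

async def embed(engine, text: str):
    async for output in engine.encode(
        prompt=text,
        pooling_params=PoolingParams(),
        request_id="embed-1",
    ):
        if output.finished:
            return output.outputs  # the pooled result for the prompt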

from_engine_args classmethod

from_engine_args(
    engine_args: AsyncEngineArgs,
    start_engine_loop: bool = True,
    usage_context: UsageContext = ENGINE_CONTEXT,
    stat_loggers: list[StatLoggerFactory] | None = None,
) -> AsyncLLM

Create an AsyncLLM from the EngineArgs.

Source code in vllm/v1/engine/async_llm.py
@classmethod
def from_engine_args(
    cls,
    engine_args: AsyncEngineArgs,
    start_engine_loop: bool = True,
    usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
    stat_loggers: list[StatLoggerFactory] | None = None,
) -> "AsyncLLM":
    """Create an AsyncLLM from the EngineArgs."""

    # Create the engine configs.
    vllm_config = engine_args.create_engine_config(usage_context)
    executor_class = Executor.get_class(vllm_config)

    # Create the AsyncLLM.
    return cls(
        vllm_config=vllm_config,
        executor_class=executor_class,
        log_requests=engine_args.enable_log_requests,
        log_stats=not engine_args.disable_log_stats,
        start_engine_loop=start_engine_loop,
        usage_context=usage_context,
        stat_loggers=stat_loggers,
    )
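
A hedged construction sketch; the model name is a placeholder and the engine arguments should be adapted to your deployment:

from vllm.engine.arg_utils import AsyncEngineArgs
from vllm.v1.engine.async_llm import AsyncLLM

def build_engine() -> AsyncLLM:
    engine_args = AsyncEngineArgs(
        model="facebook/opt-125m",  # placeholder model name
        enable_log_requests=True,
    )
    return AsyncLLM.from_engine_args(engine_args)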

from_vllm_config classmethod

from_vllm_config(
    vllm_config: VllmConfig,
    start_engine_loop: bool = True,
    usage_context: UsageContext = ENGINE_CONTEXT,
    stat_loggers: list[StatLoggerFactory] | None = None,
    enable_log_requests: bool = False,
    aggregate_engine_logging: bool = False,
    disable_log_stats: bool = False,
    client_addresses: dict[str, str] | None = None,
    client_count: int = 1,
    client_index: int = 0,
) -> AsyncLLM
Source code in vllm/v1/engine/async_llm.py
@classmethod
def from_vllm_config(
    cls,
    vllm_config: VllmConfig,
    start_engine_loop: bool = True,
    usage_context: UsageContext = UsageContext.ENGINE_CONTEXT,
    stat_loggers: list[StatLoggerFactory] | None = None,
    enable_log_requests: bool = False,
    aggregate_engine_logging: bool = False,
    disable_log_stats: bool = False,
    client_addresses: dict[str, str] | None = None,
    client_count: int = 1,
    client_index: int = 0,
) -> "AsyncLLM":
    # Create the LLMEngine.
    return cls(
        vllm_config=vllm_config,
        executor_class=Executor.get_class(vllm_config),
        start_engine_loop=start_engine_loop,
        stat_loggers=stat_loggers,
        log_requests=enable_log_requests,
        log_stats=not disable_log_stats,
        aggregate_engine_logging=aggregate_engine_logging,
        usage_context=usage_context,
        client_addresses=client_addresses,
        client_count=client_count,
        client_index=client_index,
    )

generate async

generate(
    prompt: EngineCoreRequest
    | PromptType
    | AsyncGenerator[StreamingInput, None],
    sampling_params: SamplingParams,
    request_id: str,
    *,
    prompt_text: str | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
) -> AsyncGenerator[RequestOutput, None]

Main function called by the API server to kick off a request:

* 1) Making an AsyncStream corresponding to the Request.
* 2) Processing the Input.
* 3) Adding the Request to the Detokenizer.
* 4) Adding the Request to the EngineCore (separate process).

A separate output_handler loop runs in a background AsyncIO task, pulling outputs from EngineCore and putting them into the per-request AsyncStream.

The caller of generate() iterates the returned AsyncGenerator, returning the RequestOutput back to the caller.

Source code in vllm/v1/engine/async_llm.py
async def generate(
    self,
    prompt: EngineCoreRequest | PromptType | AsyncGenerator[StreamingInput, None],
    sampling_params: SamplingParams,
    request_id: str,
    *,
    prompt_text: str | None = None,
    lora_request: LoRARequest | None = None,
    tokenization_kwargs: dict[str, Any] | None = None,
    trace_headers: Mapping[str, str] | None = None,
    priority: int = 0,
    data_parallel_rank: int | None = None,
) -> AsyncGenerator[RequestOutput, None]:
    """
    Main function called by the API server to kick off a request
        * 1) Making an AsyncStream corresponding to the Request.
        * 2) Processing the Input.
        * 3) Adding the Request to the Detokenizer.
        * 4) Adding the Request to the EngineCore (separate process).

    A separate output_handler loop runs in a background AsyncIO task,
    pulling outputs from EngineCore and putting them into the
    per-request AsyncStream.

    The caller of generate() iterates the returned AsyncGenerator,
    returning the RequestOutput back to the caller.
    """

    q: RequestOutputCollector | None = None
    try:
        q = await self.add_request(
            request_id,
            prompt,
            sampling_params,
            lora_request=lora_request,
            tokenization_kwargs=tokenization_kwargs,
            trace_headers=trace_headers,
            priority=priority,
            data_parallel_rank=data_parallel_rank,
            prompt_text=prompt_text,
        )

        # The output_handler task pushes items into the queue.
        # This task pulls from the queue and yields to caller.
        finished = False
        while not finished:
            # Note: drain queue without await if possible (avoids
            # task switching under load which helps performance).
            out = q.get_nowait() or await q.get()

            # Note: both OutputProcessor and EngineCore handle their
            # own request cleanup based on finished.
            assert isinstance(out, RequestOutput)
            finished = out.finished
            if out is not STREAM_FINISHED:
                yield out

    # If the request is disconnected by the client, generate()
    # is cancelled or the generator is garbage collected. So,
    # we abort the request if we end up here.
    except (asyncio.CancelledError, GeneratorExit):
        if q is not None:
            await self.abort(q.request_id, internal=True)
        if self.log_requests:
            logger.info("Request %s aborted.", request_id)
        raise

    # Engine is dead. Do not abort since we shut down.
    except EngineDeadError:
        if self.log_requests:
            logger.info("Request %s failed (engine dead).", request_id)
        raise

    # Request validation error.
    except ValueError as e:
        if self.log_requests:
            logger.info("Request %s failed (bad request): %s.", request_id, e)
        raise

    # Error from input stream generator - propagate directly.
    except InputStreamError as e:
        if q is not None:
            await self.abort(q.request_id, internal=True)
        if self.log_requests:
            logger.info("Request %s failed (input error): %s.", request_id, e)
        raise e.cause from e

    # Unexpected error in the generate() task (possibly recoverable).
    except Exception as e:
        if q is not None:
            await self.abort(q.request_id, internal=True)
        if self.log_requests:
            try:
                s = f"{e.__class__.__name__}: {e}"
            except Exception as e2:
                s = (
                    f"{e.__class__.__name__}: "
                    "error during printing an exception of class"
                    + e2.__class__.__name__
                )
            logger.info("Request %s failed due to %s.", request_id, s)
        raise EngineGenerateError() from e
    finally:
        if q is not None:
            q.close()
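
A hedged end-to-end sketch, assuming an AsyncLLM instance named engine (built, for example, as sketched under from_engine_args above) and a placeholder prompt and request id:

from vllm import SamplingParams

async def stream_completion(engine) -> None:
    params = SamplingParams(temperature=0.8, max_tokens=64)
    async for output in engine.generate(
        prompt="The capital of France is",
        sampling_params=params,
        request_id="req-42",
    ):
        # Under the default cumulative output kind, .text grows each iteration.
        print(output.outputs[0].text)
        if output.finished:
            break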

get_supported_tasks async

get_supported_tasks() -> tuple[SupportedTask, ...]
Source code in vllm/v1/engine/async_llm.py
async def get_supported_tasks(self) -> tuple[SupportedTask, ...]:
    return await self.engine_core.get_supported_tasks_async()

get_tokenizer

get_tokenizer() -> TokenizerLike
Source code in vllm/v1/engine/async_llm.py
def get_tokenizer(self) -> TokenizerLike:
    return self.input_processor.get_tokenizer()

is_paused async

is_paused() -> bool

Return whether the engine is currently paused.

Source code in vllm/v1/engine/async_llm.py
async def is_paused(self) -> bool:
    """Return whether the engine is currently paused."""

    async with self._pause_cond:
        return self._paused

is_sleeping async

is_sleeping() -> bool
Source code in vllm/v1/engine/async_llm.py
async def is_sleeping(self) -> bool:
    return await self.engine_core.is_sleeping_async()

is_tracing_enabled async

is_tracing_enabled() -> bool
Source code in vllm/v1/engine/async_llm.py
async def is_tracing_enabled(self) -> bool:
    return self.observability_config.otlp_traces_endpoint is not None  # type: ignore

list_loras async

list_loras() -> set[int]

List all registered adapters.

Source code in vllm/v1/engine/async_llm.py
async def list_loras(self) -> set[int]:
    """List all registered adapters."""
    return await self.engine_core.list_loras_async()

pause_generation async

pause_generation(
    *,
    wait_for_inflight_requests: bool = False,
    clear_cache: bool = True,
) -> None

Pause generation to allow model weight updates.

New generation/encoding requests are blocked until resume.

Parameters:

wait_for_inflight_requests (bool, default False)
    When True, waits for in-flight requests to finish before pausing. When False (default), immediately aborts any in-flight requests.

clear_cache (bool, default True)
    Whether to clear the KV cache and prefix cache after draining. Set to False to preserve the cache for a faster resume.
Source code in vllm/v1/engine/async_llm.py
async def pause_generation(
    self,
    *,
    wait_for_inflight_requests: bool = False,
    clear_cache: bool = True,
) -> None:
    """
    Pause generation to allow model weight updates.

    New generation/encoding requests are blocked until resume.

    Args:
        wait_for_inflight_requests: When ``True`` waits for in-flight
            requests to finish before pausing. When ``False`` (default),
            immediately aborts any in-flight requests.
        clear_cache: Whether to clear KV cache and prefix cache after
            draining. Set to ``False`` to preserve cache for faster resume.
            Default is ``True`` (clear caches).
    """

    async with self._pause_cond:
        if self._paused:
            return
        self._paused = True

    if not wait_for_inflight_requests:
        request_ids = list(self.output_processor.request_states.keys())
        if request_ids:
            await self.abort(request_ids, internal=True)

    # Wait for running requests to drain before clearing cache.
    if self.output_processor.has_unfinished_requests():
        await self.output_processor.wait_for_requests_to_drain()

    # Clear cache
    if clear_cache:
        await self.reset_prefix_cache()
        await self.reset_mm_cache()
        await self.reset_encoder_cache()

pin_lora async

pin_lora(lora_id: int) -> bool

Prevent an adapter from being evicted.

Source code in vllm/v1/engine/async_llm.py
async def pin_lora(self, lora_id: int) -> bool:
    """Prevent an adapter from being evicted."""
    return await self.engine_core.pin_lora_async(lora_id)

remove_lora async

remove_lora(lora_id: int) -> bool

Remove an already loaded LoRA adapter.

Source code in vllm/v1/engine/async_llm.py
async def remove_lora(self, lora_id: int) -> bool:
    """Remove an already loaded LoRA adapter."""
    return await self.engine_core.remove_lora_async(lora_id)

reset_encoder_cache async

reset_encoder_cache() -> None
Source code in vllm/v1/engine/async_llm.py
async def reset_encoder_cache(self) -> None:
    await self.engine_core.reset_encoder_cache_async()

reset_mm_cache async

reset_mm_cache() -> None
Source code in vllm/v1/engine/async_llm.py
async def reset_mm_cache(self) -> None:
    self.input_processor.clear_mm_cache()
    await self.engine_core.reset_mm_cache_async()

reset_prefix_cache async

reset_prefix_cache(
    reset_running_requests: bool = False,
    reset_connector: bool = False,
) -> bool
Source code in vllm/v1/engine/async_llm.py
async def reset_prefix_cache(
    self, reset_running_requests: bool = False, reset_connector: bool = False
) -> bool:
    return await self.engine_core.reset_prefix_cache_async(
        reset_running_requests, reset_connector
    )

resume_generation async

resume_generation() -> None

Resume generation after pause_generation.

Source code in vllm/v1/engine/async_llm.py
async def resume_generation(self) -> None:
    """Resume generation after :meth:`pause_generation`."""

    async with self._pause_cond:
        self._paused = False
        self._pause_cond.notify_all()  # Wake up all waiting requests
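
A hedged sketch of a weight-update window using pause_generation() and resume_generation(), assuming an AsyncLLM instance named engine and an application-supplied update_weights() coroutine (hypothetical):

async def update_model_weights(engine, update_weights) -> None:
    # Block new requests and let in-flight requests finish first.
    await engine.pause_generation(wait_for_inflight_requests=True, clear_cache=True)
    try:
        await update_weights()  # hypothetical application-specific update step
    finally:
        # Unblock queued generate()/encode() callers.
        await engine.resume_generation()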

scale_elastic_ep async

scale_elastic_ep(
    new_data_parallel_size: int, drain_timeout: int = 300
)

Scale up or down the data parallel size by adding or removing engine cores.

Parameters:

new_data_parallel_size (int)
    The new number of data parallel workers.

drain_timeout (int, default 300)
    Maximum time to wait for requests to drain, in seconds.

Source code in vllm/v1/engine/async_llm.py
async def scale_elastic_ep(
    self, new_data_parallel_size: int, drain_timeout: int = 300
):
    """
    Scale up or down the data parallel size by adding or removing
    engine cores.
    Args:
        new_data_parallel_size: The new number of data parallel workers
        drain_timeout:
            Maximum time to wait for requests to drain (seconds)
    """
    old_data_parallel_size = self.vllm_config.parallel_config.data_parallel_size
    if old_data_parallel_size == new_data_parallel_size:
        logger.info(
            "Data parallel size is already %s, skipping scale",
            new_data_parallel_size,
        )
        return
    logger.info(
        "Waiting for requests to drain before scaling up to %s engines...",
        new_data_parallel_size,
    )
    await self.wait_for_requests_to_drain(drain_timeout)
    logger.info(
        "Requests have been drained, proceeding with scale to %s engines",
        new_data_parallel_size,
    )
    await self.engine_core.scale_elastic_ep(new_data_parallel_size)
    self.vllm_config.parallel_config.data_parallel_size = new_data_parallel_size

    # recreate stat loggers
    if new_data_parallel_size > old_data_parallel_size and self.log_stats:
        # TODO(rob): fix this after talking with Ray team.
        # This resets all the prometheus metrics since we
        # unregister during initialization. Need to understand
        # the intended behavior here better.
        self.logger_manager = StatLoggerManager(
            vllm_config=self.vllm_config,
            engine_idxs=list(range(new_data_parallel_size)),
            custom_stat_loggers=None,
        )
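
A hedged usage sketch; the target size is illustrative, and the call waits for in-flight requests to drain (raising TimeoutError if the timeout is exceeded):

async def scale_out(engine) -> None:
    # Grow to 4 data-parallel engine cores, allowing up to 10 minutes
    # for outstanding requests to drain before scaling.
    await engine.scale_elastic_ep(new_data_parallel_size=4, drain_timeout=600)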

shutdown

shutdown()

Shutdown, cleaning up the background proc and IPC.

Source code in vllm/v1/engine/async_llm.py
def shutdown(self):
    """Shutdown, cleaning up the background proc and IPC."""

    shutdown_prometheus()

    if engine_core := getattr(self, "engine_core", None):
        engine_core.shutdown()

    if input_processor := getattr(self, "input_processor", None):
        input_processor.close()

    handler = getattr(self, "output_handler", None)
    if handler is not None:
        cancel_task_threadsafe(handler)

sleep async

sleep(level: int = 1) -> None
Source code in vllm/v1/engine/async_llm.py
async def sleep(self, level: int = 1) -> None:
    await self.reset_prefix_cache()
    await self.engine_core.sleep_async(level)

    if self.logger_manager is not None:
        self.logger_manager.record_sleep_state(1, level)

start_profile async

start_profile() -> None
Source code in vllm/v1/engine/async_llm.py
async def start_profile(self) -> None:
    coros = [self.engine_core.profile_async(True)]
    if self.profiler is not None:
        coros.append(asyncio.to_thread(self.profiler.start))
    await asyncio.gather(*coros)

stop_profile async

stop_profile() -> None
Source code in vllm/v1/engine/async_llm.py
async def stop_profile(self) -> None:
    coros = [self.engine_core.profile_async(False)]
    if self.profiler is not None:
        coros.append(asyncio.to_thread(self.profiler.stop))
    await asyncio.gather(*coros)
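
A hedged sketch wrapping a profiled region with start_profile()/stop_profile(), assuming profiling was enabled when the engine was constructed (for example via VLLM_TORCH_PROFILER_DIR) and a caller-supplied workload() coroutine (hypothetical):

async def profile_workload(engine, workload) -> None:
    await engine.start_profile()
    try:
        await workload()  # the traffic to capture in the trace
    finally:
        await engine.stop_profile()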

wait_for_requests_to_drain async

wait_for_requests_to_drain(drain_timeout: int = 300)

Wait for all requests to be drained.

Source code in vllm/v1/engine/async_llm.py
async def wait_for_requests_to_drain(self, drain_timeout: int = 300):
    """Wait for all requests to be drained."""
    start_time = time.time()
    while time.time() - start_time < drain_timeout:
        if not self.engine_core.dp_engines_running():
            logger.info("Engines are idle, requests have been drained")
            return

        logger.info("Engines are still running, waiting for requests to drain...")
        await asyncio.sleep(1)  # Wait 1 second before checking again

    raise TimeoutError(
        f"Timeout reached after {drain_timeout} seconds "
        "waiting for requests to drain."
    )

wake_up async

wake_up(tags: list[str] | None = None) -> None
Source code in vllm/v1/engine/async_llm.py
async def wake_up(self, tags: list[str] | None = None) -> None:
    await self.engine_core.wake_up_async(tags)

    if self.logger_manager is not None:
        self.logger_manager.record_sleep_state(0, 0)
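
A hedged sketch of a sleep/wake cycle around an idle period, assuming the deployment has sleep mode enabled and an AsyncLLM instance named engine; the tags list mentioned in the comment is illustrative:

async def idle_and_resume(engine) -> None:
    # Level-1 sleep frees GPU memory while the engine is idle.
    await engine.sleep(level=1)
    # ... idle period ...
    # Restore state; a tags list (e.g. ["weights"]) can wake resources selectively.
    await engine.wake_up()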

InputStreamError

Bases: Exception

Wrapper for errors from the input stream generator.

This is used to propagate errors from the user's input generator without wrapping them in EngineGenerateError.

Source code in vllm/v1/engine/async_llm.py
class InputStreamError(Exception):
    """Wrapper for errors from the input stream generator.

    This is used to propagate errors from the user's input generator
    without wrapping them in EngineGenerateError.
    """

    def __init__(self, cause: Exception):
        self.cause = cause
        super().__init__(str(cause))

cause instance-attribute

cause = cause

__init__

__init__(cause: Exception)
Source code in vllm/v1/engine/async_llm.py
def __init__(self, cause: Exception):
    self.cause = cause
    super().__init__(str(cause))