vllm.model_executor.layers.fused_moe.deepep_ll_prepare_finalize

DEEPEP_QUANT_BLOCK_SHAPE module-attribute

DEEPEP_QUANT_BLOCK_SHAPE = [
    DEEPEP_QUANT_BLOCK_SIZE,
    DEEPEP_QUANT_BLOCK_SIZE,
]

DEEPEP_QUANT_BLOCK_SIZE module-attribute

DEEPEP_QUANT_BLOCK_SIZE = 128
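
DeepEP's low-latency fp8 dispatch quantizes activations in 128-element groups along the hidden dimension, which is why prepare_async below asserts that the hidden size is a multiple of 128. The following is a minimal sketch (illustrative only, not part of the module) of how many per-token block scales that implies for a given hidden size:

# Illustrative sketch: per-token scale count under 128-element activation groups.
# Assumes the group size matches DEEPEP_QUANT_BLOCK_SIZE above.
DEEPEP_QUANT_BLOCK_SIZE = 128

def num_block_scales(hidden_size: int) -> int:
    # DeepEP low-latency fp8 dispatch requires hidden_size % 128 == 0
    # (see the assertion in prepare_async).
    assert hidden_size % DEEPEP_QUANT_BLOCK_SIZE == 0
    return hidden_size // DEEPEP_QUANT_BLOCK_SIZE

assert num_block_scales(2048) == 16
assert num_block_scales(7168) == 56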

logger module-attribute

logger = init_logger(__name__)

DeepEPLLPrepareAndFinalize

Bases: FusedMoEPrepareAndFinalize

Prepare/Finalize using DeepEP low-latency kernels.

Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
class DeepEPLLPrepareAndFinalize(mk.FusedMoEPrepareAndFinalize):
    """
    Prepare/Finalize using DeepEP low-latency kernels.
    """

    # DeepEP low-latency kernels are compiled only for certain
    # specific hidden sizes.
    # NOTE: Keep this list sorted, maybe_roundup_layer_hidden_size depends
    # on it.
    SUPPORTED_HIDDEN_SIZES = [2048, 2560, 3072, 4096, 5120, 6144, 7168, 8192]

    @staticmethod
    def maybe_roundup_layer_hidden_size(hidden_size: int) -> int:
        # Round up hidden size to the closest supported hidden size.
        _supported_hs = DeepEPLLPrepareAndFinalize.SUPPORTED_HIDDEN_SIZES
        # Check sorted
        num_supported_hs = len(_supported_hs)
        assert all(
            [
                _supported_hs[i] < _supported_hs[i + 1]
                for i in range(num_supported_hs - 1)
            ]
        )

        for x in _supported_hs:
            if x >= hidden_size:
                return x

        raise ValueError(
            f"Hidden Size {hidden_size} is greater than the "
            f"maximum supported hidden size {_supported_hs[-1]}"
        )

    def __init__(
        self,
        buffer: deep_ep.Buffer,
        max_tokens_per_rank: int,
        num_dispatchers: int,
        use_fp8_dispatch: bool = False,
        global_to_physical: torch.Tensor | None = None,
        physical_to_global: torch.Tensor | None = None,
        local_expert_global_ids: torch.Tensor | None = None,
    ):
        super().__init__()

        self.buffer = buffer
        self.max_tokens_per_rank = max_tokens_per_rank
        self.use_fp8_dispatch = use_fp8_dispatch
        # The dispatch function returns a handle that the combine function
        # requires. We store the handle here so it is available to the
        # combine function.
        self.handles: list[tuple | None] = [None, None]
        self.num_dispatchers_ = num_dispatchers

        topk_indices_dtype = self.topk_indices_dtype()

        def _maybe_cast(tensor: torch.Tensor | None) -> torch.Tensor | None:
            if tensor is None or topk_indices_dtype is None:
                return tensor
            return tensor.to(dtype=topk_indices_dtype)

        self.global_to_physical = _maybe_cast(global_to_physical)
        self.physical_to_global = _maybe_cast(physical_to_global)
        self.local_expert_global_ids = _maybe_cast(local_expert_global_ids)

        # We don't have enough information to determine if we should dispatch
        # activation scales in a packed ue8m0 format during object construction
        # time. This setting is handled by post_init_setup.
        self.use_ue8m0_dispatch = False

    def post_init_setup(self, fused_experts: mk.FusedMoEPermuteExpertsUnpermute):
        if not fused_experts.supports_packed_ue8m0_act_scales():
            # Early exit.
            return

        if self.use_fp8_dispatch:
            logger.debug_once(
                "Update DeepEPLLPrepareFinalize to do packed ue8m0 scales dispatch."
            )
            self.use_ue8m0_dispatch = True
        else:
            logger.warning_once(
                "DeepEPLLPrepareAndFinalize is setup to dispatch raw/unquantized "
                f"activations despite ({fused_experts.__class__.__name__}) being able "
                "to support quantized activations.",
                scope="local",
            )

    def num_dispatchers(self) -> int:
        return self.num_dispatchers_

    def output_is_reduced(self) -> bool:
        return True

    @property
    def activation_format(self) -> mk.FusedMoEActivationFormat:
        return mk.FusedMoEActivationFormat.BatchedExperts

    def max_num_tokens_per_rank(self) -> int | None:
        return self.max_tokens_per_rank

    def topk_indices_dtype(self) -> torch.dtype | None:
        return torch.int64

    def _map_global_to_physical_ids(self, topk_ids: torch.Tensor) -> torch.Tensor:
        if self.global_to_physical is None:
            return topk_ids
        return self.global_to_physical[topk_ids]

    def _map_local_to_global_ids(self, expert_topk_ids: torch.Tensor) -> torch.Tensor:
        if self.local_expert_global_ids is None:
            return expert_topk_ids
        return self.local_expert_global_ids[expert_topk_ids]

    def _do_quant(
        self,
        x: torch.Tensor | tuple[torch.Tensor, torch.Tensor],
        a1_dtype: torch.dtype,
        quant_config: FusedMoEQuantConfig,
    ) -> tuple[torch.Tensor, torch.Tensor | None]:
        if self.use_fp8_dispatch:
            block_k = (
                quant_config.block_shape[1]
                if quant_config.block_shape is not None
                else None
            )
            if block_k == DEEPEP_QUANT_BLOCK_SIZE:
                # DeepEP kernels did the quantization for us.
                x, x_scales = x
                return x, x_scales

            # Dequant to get back the tokens in the datatype we dispatched in.
            x_fp8, x_scales = x
            x = dequant_fp8(x_fp8, x_scales).to(dtype=a1_dtype)

        assert isinstance(x, (torch.Tensor, tuple))
        q_dtype = quant_config.quant_dtype

        if q_dtype == "nvfp4" and envs.VLLM_DEEPEPLL_NVFP4_DISPATCH:
            logger.info_once(
                "Since VLLM_DEEPEPLL_NVFP4_DISPATCH==1, make sure "
                "using the hybrid-ep branch of DeepEP"
                "(https://github.com/deepseek-ai/DeepEP/tree/hybrid-ep)"
            )
            assert isinstance(x, tuple)
            x_scales = x[1]
            x = x[0].permute(2, 0, 1)
            num_experts, max_tokens, hidden_dim_by_2 = x.shape
            hidden_dim = hidden_dim_by_2 * 2
            assert envs.VLLM_FLASHINFER_MOE_BACKEND == "masked_gemm"
            logger.info_once(
                "Quantization is fused with DeepEP nvfp4 dispatch for "
                "FlashInfer CUTEDSL as VLLM_DEEPEPLL_NVFP4_DISPATCH==1"
            )
        else:
            if q_dtype == "nvfp4":
                q_dtype = None
                logger.info_once(
                    "Using DeepEP bfloat16 dispatch for FlashInfer CUTEDSL as "
                    "VLLM_DEEPEPLL_NVFP4_DISPATCH==0"
                )
            assert isinstance(x, torch.Tensor)
            num_experts, max_tokens, hidden_dim = x.size()

            # TODO (varun): Optimization - Use a batched version of quant
            x = x.view((-1, hidden_dim))
            x, x_scales = moe_kernel_quantize_input(
                x,
                quant_config.a1_scale,
                q_dtype,
                quant_config.per_act_token_quant,
                quant_config.block_shape,
            )
            x = x.view((num_experts, -1, hidden_dim))

        if q_dtype is not None and q_dtype != "nvfp4":
            assert x_scales is not None
            x_scales = normalize_batched_scales_shape(x_scales, num_experts)

        return x, x_scales

    def supports_async(self) -> bool:
        return True

    def prepare_async(
        self,
        a1: torch.Tensor,
        topk_weights: torch.Tensor,
        topk_ids: torch.Tensor,
        num_experts: int,
        expert_map: torch.Tensor | None,
        apply_router_weight_on_input: bool,
        quant_config: FusedMoEQuantConfig,
        defer_input_quant: bool = False,
    ) -> tuple[Callable, mk.ReceiverType]:
        if defer_input_quant:
            raise NotImplementedError(
                f"{self.__class__.__name__} does not support defer_input_quant=True. "
                "Please select an MoE kernel that accepts quantized inputs."
            )

        hidden_size = a1.size(1)
        assert hidden_size in self.SUPPORTED_HIDDEN_SIZES, (
            f"Hidden Size {hidden_size} not in supported list of hidden sizes"
            f"{self.SUPPORTED_HIDDEN_SIZES}"
        )

        a2a_idx = dbo_current_ubatch_id()

        if self.use_fp8_dispatch:
            assert hidden_size % 128 == 0, (
                "DeepEP kernels quantize the inputs in blocks of shape 128"
            )

        use_nvfp4 = False
        nvfp4_dispatch = (
            quant_config.quant_dtype == "nvfp4" and envs.VLLM_DEEPEPLL_NVFP4_DISPATCH
        )
        if nvfp4_dispatch:
            use_nvfp4 = True
        qc_a1_gscale_or_scale = (
            quant_config.a1_gscale if nvfp4_dispatch else quant_config.a1_scale
        )
        has_per_token_scales = (
            qc_a1_gscale_or_scale.numel() != 1
            if qc_a1_gscale_or_scale is not None
            else (
                quant_config.a2_scale.numel() != 1
                if quant_config.a2_scale is not None
                else False
            )
        )
        if not use_nvfp4:
            assert not has_per_token_scales, (
                "low_latency kernels doesn't support dispatching per-token scales"
            )

        if apply_router_weight_on_input:
            topk = topk_ids.size(1)
            # TODO: this only works for topK=1, will need to update for topK>1
            assert topk == 1, (
                "apply_router_weight_on_input is only implemented for topk=1"
            )
            a1 = a1 * topk_weights.to(a1.dtype)

        # Dispatch
        dispatch_topk_ids = self._map_global_to_physical_ids(topk_ids)
        expert_x, expert_num_tokens, handle, _, hook = self.buffer.low_latency_dispatch(
            a1,
            dispatch_topk_ids,
            self.max_tokens_per_rank,
            num_experts,
            use_fp8=self.use_fp8_dispatch,
            round_scale=self.use_ue8m0_dispatch,
            use_ue8m0=self.use_ue8m0_dispatch,
            **(dict(use_nvfp4=True) if use_nvfp4 else dict()),
            **(
                dict(x_global_scale=qc_a1_gscale_or_scale)
                if qc_a1_gscale_or_scale is not None
                else dict()
            ),
            async_finish=False,
            return_recv_hook=True,
        )
        self.handles[a2a_idx] = handle

        return (
            hook,
            lambda: self._receiver(
                expert_x,
                expert_num_tokens,
                quant_config.a1_scale,
                a1.dtype,
                quant_config,
            ),
        )

    def _receiver(
        self,
        expert_x: torch.Tensor | tuple[torch.Tensor, torch.Tensor],
        expert_num_tokens: torch.Tensor,
        a1_scale: torch.Tensor | None,
        a1_dtype: torch.dtype,
        quant_config: FusedMoEQuantConfig,
    ) -> mk.PrepareResultType:
        expert_x, expert_x_scale = self._do_quant(expert_x, a1_dtype, quant_config)

        expert_tokens_meta = mk.ExpertTokensMetadata(
            expert_num_tokens=expert_num_tokens, expert_num_tokens_cpu=None
        )

        return expert_x, expert_x_scale, expert_tokens_meta, None, None

    def prepare(
        self,
        a1: torch.Tensor,
        topk_weights: torch.Tensor,
        topk_ids: torch.Tensor,
        num_experts: int,
        expert_map: torch.Tensor | None,
        apply_router_weight_on_input: bool,
        quant_config: FusedMoEQuantConfig,
        defer_input_quant: bool = False,
    ) -> mk.PrepareResultType:
        if defer_input_quant:
            raise NotImplementedError(
                f"{self.__class__.__name__} does not support defer_input_quant=True. "
                "Please select an MoE kernel that accepts quantized inputs."
            )
        hook, receiver = self.prepare_async(
            a1,
            topk_weights,
            topk_ids,
            num_experts,
            expert_map,
            apply_router_weight_on_input,
            quant_config,
        )
        hook()
        return receiver()

    def _finalize(
        self,
        output: torch.Tensor,
        fused_expert_output: torch.Tensor,
        topk_weights: torch.Tensor,
        topk_ids: torch.Tensor,
        apply_router_weight_on_input: bool,
        weight_and_reduce_impl: mk.TopKWeightAndReduce,
        do_async: bool,
    ) -> tuple[Callable, Callable]:
        assert isinstance(weight_and_reduce_impl, TopKWeightAndReduceDelegate), (
            "Weight application and reduction happens in the combine kernel."
        )

        a2a_idx = dbo_current_ubatch_id()
        do_recv_hook = dbo_enabled() or do_async
        handle = self.handles[a2a_idx]
        assert handle is not None

        combine_topk_weights = topk_weights
        if apply_router_weight_on_input:
            # weights have already been applied.
            combine_topk_weights = torch.ones_like(topk_weights)

        combine_topk_ids = self._map_global_to_physical_ids(topk_ids)
        # TODO (varun) : Enable zero copy mode
        dbo_maybe_run_recv_hook()
        _, _, recv_hook = self.buffer.low_latency_combine(
            fused_expert_output,
            combine_topk_ids,
            combine_topk_weights,
            handle,
            async_finish=False,
            zero_copy=False,
            return_recv_hook=do_recv_hook,
            out=output,
        )

        return recv_hook, lambda: None

    def finalize_async(
        self,
        output: torch.Tensor,
        fused_expert_output: torch.Tensor,
        topk_weights: torch.Tensor,
        topk_ids: torch.Tensor,
        apply_router_weight_on_input: bool,
        weight_and_reduce_impl: mk.TopKWeightAndReduce,
    ) -> tuple[Callable, Callable]:
        return self._finalize(
            output,
            fused_expert_output,
            topk_weights,
            topk_ids,
            apply_router_weight_on_input,
            weight_and_reduce_impl,
            do_async=True,
        )

    def finalize(
        self,
        output: torch.Tensor,
        fused_expert_output: torch.Tensor,
        topk_weights: torch.Tensor,
        topk_ids: torch.Tensor,
        apply_router_weight_on_input: bool,
        weight_and_reduce_impl: mk.TopKWeightAndReduce,
    ) -> None:
        self._finalize(
            output,
            fused_expert_output,
            topk_weights,
            topk_ids,
            apply_router_weight_on_input,
            weight_and_reduce_impl,
            do_async=False,
        )

SUPPORTED_HIDDEN_SIZES class-attribute instance-attribute

SUPPORTED_HIDDEN_SIZES = [
    2048,
    2560,
    3072,
    4096,
    5120,
    6144,
    7168,
    8192,
]

activation_format property

activation_format: FusedMoEActivationFormat

buffer instance-attribute

buffer = buffer

global_to_physical instance-attribute

global_to_physical = _maybe_cast(global_to_physical)

handles instance-attribute

handles: list[tuple | None] = [None, None]

local_expert_global_ids instance-attribute

local_expert_global_ids = _maybe_cast(
    local_expert_global_ids
)

max_tokens_per_rank instance-attribute

max_tokens_per_rank = max_tokens_per_rank

num_dispatchers_ instance-attribute

num_dispatchers_ = num_dispatchers

physical_to_global instance-attribute

physical_to_global = _maybe_cast(physical_to_global)

use_fp8_dispatch instance-attribute

use_fp8_dispatch = use_fp8_dispatch

use_ue8m0_dispatch instance-attribute

use_ue8m0_dispatch = False

__init__

__init__(
    buffer: Buffer,
    max_tokens_per_rank: int,
    num_dispatchers: int,
    use_fp8_dispatch: bool = False,
    global_to_physical: Tensor | None = None,
    physical_to_global: Tensor | None = None,
    local_expert_global_ids: Tensor | None = None,
)
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def __init__(
    self,
    buffer: deep_ep.Buffer,
    max_tokens_per_rank: int,
    num_dispatchers: int,
    use_fp8_dispatch: bool = False,
    global_to_physical: torch.Tensor | None = None,
    physical_to_global: torch.Tensor | None = None,
    local_expert_global_ids: torch.Tensor | None = None,
):
    super().__init__()

    self.buffer = buffer
    self.max_tokens_per_rank = max_tokens_per_rank
    self.use_fp8_dispatch = use_fp8_dispatch
    # The dispatch function returns a handle that the combine function
    # requires. We store the handle here so it is available to the
    # combine function.
    self.handles: list[tuple | None] = [None, None]
    self.num_dispatchers_ = num_dispatchers

    topk_indices_dtype = self.topk_indices_dtype()

    def _maybe_cast(tensor: torch.Tensor | None) -> torch.Tensor | None:
        if tensor is None or topk_indices_dtype is None:
            return tensor
        return tensor.to(dtype=topk_indices_dtype)

    self.global_to_physical = _maybe_cast(global_to_physical)
    self.physical_to_global = _maybe_cast(physical_to_global)
    self.local_expert_global_ids = _maybe_cast(local_expert_global_ids)

    # We don't have enough information to determine if we should dispatch
    # activation scales in a packed ue8m0 format during object construction
    # time. This setting is handled by post_init_setup.
    self.use_ue8m0_dispatch = False

_do_quant

_do_quant(
    x: Tensor | tuple[Tensor, Tensor],
    a1_dtype: dtype,
    quant_config: FusedMoEQuantConfig,
) -> tuple[Tensor, Tensor | None]
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def _do_quant(
    self,
    x: torch.Tensor | tuple[torch.Tensor, torch.Tensor],
    a1_dtype: torch.dtype,
    quant_config: FusedMoEQuantConfig,
) -> tuple[torch.Tensor, torch.Tensor | None]:
    if self.use_fp8_dispatch:
        block_k = (
            quant_config.block_shape[1]
            if quant_config.block_shape is not None
            else None
        )
        if block_k == DEEPEP_QUANT_BLOCK_SIZE:
            # DeepEP kernels did the quantization for us.
            x, x_scales = x
            return x, x_scales

        # Dequant to get back the tokens in the datatype we dispatched in.
        x_fp8, x_scales = x
        x = dequant_fp8(x_fp8, x_scales).to(dtype=a1_dtype)

    assert isinstance(x, (torch.Tensor, tuple))
    q_dtype = quant_config.quant_dtype

    if q_dtype == "nvfp4" and envs.VLLM_DEEPEPLL_NVFP4_DISPATCH:
        logger.info_once(
            "Since VLLM_DEEPEPLL_NVFP4_DISPATCH==1, make sure "
            "using the hybrid-ep branch of DeepEP"
            "(https://github.com/deepseek-ai/DeepEP/tree/hybrid-ep)"
        )
        assert isinstance(x, tuple)
        x_scales = x[1]
        x = x[0].permute(2, 0, 1)
        num_experts, max_tokens, hidden_dim_by_2 = x.shape
        hidden_dim = hidden_dim_by_2 * 2
        assert envs.VLLM_FLASHINFER_MOE_BACKEND == "masked_gemm"
        logger.info_once(
            "Quantization is fused with DeepEP nvfp4 dispatch for "
            "FlashInfer CUTEDSL as VLLM_DEEPEPLL_NVFP4_DISPATCH==1"
        )
    else:
        if q_dtype == "nvfp4":
            q_dtype = None
            logger.info_once(
                "Using DeepEP bfloat16 dispatch for FlashInfer CUTEDSL as "
                "VLLM_DEEPEPLL_NVFP4_DISPATCH==0"
            )
        assert isinstance(x, torch.Tensor)
        num_experts, max_tokens, hidden_dim = x.size()

        # TODO (varun): Optimization - Use a batched version of quant
        x = x.view((-1, hidden_dim))
        x, x_scales = moe_kernel_quantize_input(
            x,
            quant_config.a1_scale,
            q_dtype,
            quant_config.per_act_token_quant,
            quant_config.block_shape,
        )
        x = x.view((num_experts, -1, hidden_dim))

    if q_dtype is not None and q_dtype != "nvfp4":
        assert x_scales is not None
        x_scales = normalize_batched_scales_shape(x_scales, num_experts)

    return x, x_scales
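
The branch structure above can be summarized as a small predicate: when fp8 dispatch is enabled and the quantization config uses a K-block of 128, the activations and scales produced by the DeepEP kernels are returned as-is; otherwise the fp8 payload is dequantized and re-quantized via moe_kernel_quantize_input. A standalone restatement of that condition (illustrative only):

# Illustrative restatement of the _do_quant fast path: DeepEP's fused fp8
# quantization is reused only when the config's K block size matches
# DEEPEP_QUANT_BLOCK_SIZE (128); otherwise a dequant/requant round-trip runs.
DEEPEP_QUANT_BLOCK_SIZE = 128

def reuses_deepep_quant(use_fp8_dispatch: bool, block_shape: list[int] | None) -> bool:
    block_k = block_shape[1] if block_shape is not None else None
    return use_fp8_dispatch and block_k == DEEPEP_QUANT_BLOCK_SIZE

assert reuses_deepep_quant(True, [128, 128]) is True
assert reuses_deepep_quant(True, [128, 64]) is False
assert reuses_deepep_quant(False, [128, 128]) is False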

_finalize

_finalize(
    output: Tensor,
    fused_expert_output: Tensor,
    topk_weights: Tensor,
    topk_ids: Tensor,
    apply_router_weight_on_input: bool,
    weight_and_reduce_impl: TopKWeightAndReduce,
    do_async: bool,
) -> tuple[Callable, Callable]
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def _finalize(
    self,
    output: torch.Tensor,
    fused_expert_output: torch.Tensor,
    topk_weights: torch.Tensor,
    topk_ids: torch.Tensor,
    apply_router_weight_on_input: bool,
    weight_and_reduce_impl: mk.TopKWeightAndReduce,
    do_async: bool,
) -> tuple[Callable, Callable]:
    assert isinstance(weight_and_reduce_impl, TopKWeightAndReduceDelegate), (
        "Weight application and reduction happens in the combine kernel."
    )

    a2a_idx = dbo_current_ubatch_id()
    do_recv_hook = dbo_enabled() or do_async
    handle = self.handles[a2a_idx]
    assert handle is not None

    combine_topk_weights = topk_weights
    if apply_router_weight_on_input:
        # weights have already been applied.
        combine_topk_weights = torch.ones_like(topk_weights)

    combine_topk_ids = self._map_global_to_physical_ids(topk_ids)
    # TODO (varun) : Enable zero copy mode
    dbo_maybe_run_recv_hook()
    _, _, recv_hook = self.buffer.low_latency_combine(
        fused_expert_output,
        combine_topk_ids,
        combine_topk_weights,
        handle,
        async_finish=False,
        zero_copy=False,
        return_recv_hook=do_recv_hook,
        out=output,
    )

    return recv_hook, lambda: None

_map_global_to_physical_ids

_map_global_to_physical_ids(topk_ids: Tensor) -> Tensor
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def _map_global_to_physical_ids(self, topk_ids: torch.Tensor) -> torch.Tensor:
    if self.global_to_physical is None:
        return topk_ids
    return self.global_to_physical[topk_ids]
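
The mapping is plain tensor indexing: global_to_physical acts as a lookup table indexed by the routed expert ids. A small self-contained illustration (the table values here are made up):

import torch

# Hypothetical 4-expert mapping table: global expert id -> physical expert id.
global_to_physical = torch.tensor([2, 0, 3, 1], dtype=torch.int64)
topk_ids = torch.tensor([[0, 3], [1, 2]], dtype=torch.int64)

# Same fancy-indexing pattern as _map_global_to_physical_ids.
mapped = global_to_physical[topk_ids]
assert mapped.tolist() == [[2, 1], [0, 3]]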

_map_local_to_global_ids

_map_local_to_global_ids(expert_topk_ids: Tensor) -> Tensor
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def _map_local_to_global_ids(self, expert_topk_ids: torch.Tensor) -> torch.Tensor:
    if self.local_expert_global_ids is None:
        return expert_topk_ids
    return self.local_expert_global_ids[expert_topk_ids]

_receiver

_receiver(
    expert_x: Tensor | tuple[Tensor, Tensor],
    expert_num_tokens: Tensor,
    a1_scale: Tensor | None,
    a1_dtype: dtype,
    quant_config: FusedMoEQuantConfig,
) -> PrepareResultType
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def _receiver(
    self,
    expert_x: torch.Tensor | tuple[torch.Tensor, torch.Tensor],
    expert_num_tokens: torch.Tensor,
    a1_scale: torch.Tensor | None,
    a1_dtype: torch.dtype,
    quant_config: FusedMoEQuantConfig,
) -> mk.PrepareResultType:
    expert_x, expert_x_scale = self._do_quant(expert_x, a1_dtype, quant_config)

    expert_tokens_meta = mk.ExpertTokensMetadata(
        expert_num_tokens=expert_num_tokens, expert_num_tokens_cpu=None
    )

    return expert_x, expert_x_scale, expert_tokens_meta, None, None

finalize

finalize(
    output: Tensor,
    fused_expert_output: Tensor,
    topk_weights: Tensor,
    topk_ids: Tensor,
    apply_router_weight_on_input: bool,
    weight_and_reduce_impl: TopKWeightAndReduce,
) -> None
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def finalize(
    self,
    output: torch.Tensor,
    fused_expert_output: torch.Tensor,
    topk_weights: torch.Tensor,
    topk_ids: torch.Tensor,
    apply_router_weight_on_input: bool,
    weight_and_reduce_impl: mk.TopKWeightAndReduce,
) -> None:
    self._finalize(
        output,
        fused_expert_output,
        topk_weights,
        topk_ids,
        apply_router_weight_on_input,
        weight_and_reduce_impl,
        do_async=False,
    )

finalize_async

finalize_async(
    output: Tensor,
    fused_expert_output: Tensor,
    topk_weights: Tensor,
    topk_ids: Tensor,
    apply_router_weight_on_input: bool,
    weight_and_reduce_impl: TopKWeightAndReduce,
) -> tuple[Callable, Callable]
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def finalize_async(
    self,
    output: torch.Tensor,
    fused_expert_output: torch.Tensor,
    topk_weights: torch.Tensor,
    topk_ids: torch.Tensor,
    apply_router_weight_on_input: bool,
    weight_and_reduce_impl: mk.TopKWeightAndReduce,
) -> tuple[Callable, Callable]:
    return self._finalize(
        output,
        fused_expert_output,
        topk_weights,
        topk_ids,
        apply_router_weight_on_input,
        weight_and_reduce_impl,
        do_async=True,
    )

max_num_tokens_per_rank

max_num_tokens_per_rank() -> int | None
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def max_num_tokens_per_rank(self) -> int | None:
    return self.max_tokens_per_rank

maybe_roundup_layer_hidden_size staticmethod

maybe_roundup_layer_hidden_size(hidden_size: int) -> int
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
@staticmethod
def maybe_roundup_layer_hidden_size(hidden_size: int) -> int:
    # Round up hidden size to the closest supported hidden size.
    _supported_hs = DeepEPLLPrepareAndFinalize.SUPPORTED_HIDDEN_SIZES
    # Check sorted
    num_supported_hs = len(_supported_hs)
    assert all(
        [
            _supported_hs[i] < _supported_hs[i + 1]
            for i in range(num_supported_hs - 1)
        ]
    )

    for x in _supported_hs:
        if x >= hidden_size:
            return x

    raise ValueError(
        f"Hidden Size {hidden_size} is greater than the "
        f"maximum supported hidden size {_supported_hs[-1]}"
    )
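
For reference, the rounding behaviour this implies, re-derived locally so the snippet runs without deep_ep installed (the list mirrors SUPPORTED_HIDDEN_SIZES above):

# Illustrative sketch of maybe_roundup_layer_hidden_size's behaviour,
# re-implemented against the same sorted list of supported sizes.
SUPPORTED_HIDDEN_SIZES = [2048, 2560, 3072, 4096, 5120, 6144, 7168, 8192]

def roundup(hidden_size: int) -> int:
    for x in SUPPORTED_HIDDEN_SIZES:
        if x >= hidden_size:
            return x
    raise ValueError(f"{hidden_size} exceeds {SUPPORTED_HIDDEN_SIZES[-1]}")

assert roundup(2048) == 2048   # already supported -> unchanged
assert roundup(4000) == 4096   # rounded up to the next supported size
assert roundup(6200) == 7168
# roundup(9000) raises ValueError: larger than the maximum supported size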

num_dispatchers

num_dispatchers() -> int
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def num_dispatchers(self) -> int:
    return self.num_dispatchers_

output_is_reduced

output_is_reduced() -> bool
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def output_is_reduced(self) -> bool:
    return True

post_init_setup

post_init_setup(
    fused_experts: FusedMoEPermuteExpertsUnpermute,
)
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def post_init_setup(self, fused_experts: mk.FusedMoEPermuteExpertsUnpermute):
    if not fused_experts.supports_packed_ue8m0_act_scales():
        # Early exit.
        return

    if self.use_fp8_dispatch:
        logger.debug_once(
            "Update DeepEPLLPrepareFinalize to do packed ue8m0 scales dispatch."
        )
        self.use_ue8m0_dispatch = True
    else:
        logger.warning_once(
            "DeepEPLLPrepareAndFinalize is setup to dispatch raw/unquantized "
            f"activations despite ({fused_experts.__class__.__name__}) being able "
            "to support quantized activations.",
            scope="local",
        )
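
The effective setting is a simple conjunction: packed ue8m0 scale dispatch is enabled only when the experts implementation supports it and fp8 dispatch is already on; otherwise the method either returns early or logs the warning above. A standalone restatement (illustrative only):

# Illustrative restatement of post_init_setup's decision.
def resolved_ue8m0_dispatch(
    experts_support_packed_ue8m0: bool, use_fp8_dispatch: bool
) -> bool:
    return experts_support_packed_ue8m0 and use_fp8_dispatch

assert resolved_ue8m0_dispatch(True, True) is True    # packed ue8m0 dispatch enabled
assert resolved_ue8m0_dispatch(True, False) is False  # warning path: raw dispatch kept
assert resolved_ue8m0_dispatch(False, True) is False  # early exit: experts can't use it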

prepare

prepare(
    a1: Tensor,
    topk_weights: Tensor,
    topk_ids: Tensor,
    num_experts: int,
    expert_map: Tensor | None,
    apply_router_weight_on_input: bool,
    quant_config: FusedMoEQuantConfig,
    defer_input_quant: bool = False,
) -> PrepareResultType
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def prepare(
    self,
    a1: torch.Tensor,
    topk_weights: torch.Tensor,
    topk_ids: torch.Tensor,
    num_experts: int,
    expert_map: torch.Tensor | None,
    apply_router_weight_on_input: bool,
    quant_config: FusedMoEQuantConfig,
    defer_input_quant: bool = False,
) -> mk.PrepareResultType:
    if defer_input_quant:
        raise NotImplementedError(
            f"{self.__class__.__name__} does not support defer_input_quant=True. "
            "Please select an MoE kernel that accepts quantized inputs."
        )
    hook, receiver = self.prepare_async(
        a1,
        topk_weights,
        topk_ids,
        num_experts,
        expert_map,
        apply_router_weight_on_input,
        quant_config,
    )
    hook()
    return receiver()

prepare_async

prepare_async(
    a1: Tensor,
    topk_weights: Tensor,
    topk_ids: Tensor,
    num_experts: int,
    expert_map: Tensor | None,
    apply_router_weight_on_input: bool,
    quant_config: FusedMoEQuantConfig,
    defer_input_quant: bool = False,
) -> tuple[Callable, ReceiverType]
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def prepare_async(
    self,
    a1: torch.Tensor,
    topk_weights: torch.Tensor,
    topk_ids: torch.Tensor,
    num_experts: int,
    expert_map: torch.Tensor | None,
    apply_router_weight_on_input: bool,
    quant_config: FusedMoEQuantConfig,
    defer_input_quant: bool = False,
) -> tuple[Callable, mk.ReceiverType]:
    if defer_input_quant:
        raise NotImplementedError(
            f"{self.__class__.__name__} does not support defer_input_quant=True. "
            "Please select an MoE kernel that accepts quantized inputs."
        )

    hidden_size = a1.size(1)
    assert hidden_size in self.SUPPORTED_HIDDEN_SIZES, (
        f"Hidden Size {hidden_size} not in supported list of hidden sizes"
        f"{self.SUPPORTED_HIDDEN_SIZES}"
    )

    a2a_idx = dbo_current_ubatch_id()

    if self.use_fp8_dispatch:
        assert hidden_size % 128 == 0, (
            "DeepEP kernels quantize the inputs in blocks of shape 128"
        )

    use_nvfp4 = False
    nvfp4_dispatch = (
        quant_config.quant_dtype == "nvfp4" and envs.VLLM_DEEPEPLL_NVFP4_DISPATCH
    )
    if nvfp4_dispatch:
        use_nvfp4 = True
    qc_a1_gscale_or_scale = (
        quant_config.a1_gscale if nvfp4_dispatch else quant_config.a1_scale
    )
    has_per_token_scales = (
        qc_a1_gscale_or_scale.numel() != 1
        if qc_a1_gscale_or_scale is not None
        else (
            quant_config.a2_scale.numel() != 1
            if quant_config.a2_scale is not None
            else False
        )
    )
    if not use_nvfp4:
        assert not has_per_token_scales, (
            "low_latency kernels doesn't support dispatching per-token scales"
        )

    if apply_router_weight_on_input:
        topk = topk_ids.size(1)
        # TODO: this only works for topK=1, will need to update for topK>1
        assert topk == 1, (
            "apply_router_weight_on_input is only implemented for topk=1"
        )
        a1 = a1 * topk_weights.to(a1.dtype)

    # Dispatch
    dispatch_topk_ids = self._map_global_to_physical_ids(topk_ids)
    expert_x, expert_num_tokens, handle, _, hook = self.buffer.low_latency_dispatch(
        a1,
        dispatch_topk_ids,
        self.max_tokens_per_rank,
        num_experts,
        use_fp8=self.use_fp8_dispatch,
        round_scale=self.use_ue8m0_dispatch,
        use_ue8m0=self.use_ue8m0_dispatch,
        **(dict(use_nvfp4=True) if use_nvfp4 else dict()),
        **(
            dict(x_global_scale=qc_a1_gscale_or_scale)
            if qc_a1_gscale_or_scale is not None
            else dict()
        ),
        async_finish=False,
        return_recv_hook=True,
    )
    self.handles[a2a_idx] = handle

    return (
        hook,
        lambda: self._receiver(
            expert_x,
            expert_num_tokens,
            quant_config.a1_scale,
            a1.dtype,
            quant_config,
        ),
    )
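
The returned pair is meant to be driven as hook-then-receiver, which is exactly what the synchronous prepare wrapper does. The sketch below shows the intended calling pattern; pf, a1, topk_weights, topk_ids and quant_config are placeholders for objects built elsewhere, and actually running it requires an initialized deep_ep.Buffer and expert-parallel process group, so treat it as illustrative rather than runnable:

# Illustrative calling pattern only (requires a live DeepEP setup to run).
hook, receiver = pf.prepare_async(
    a1,
    topk_weights,
    topk_ids,
    num_experts=num_experts,
    expert_map=None,
    apply_router_weight_on_input=False,
    quant_config=quant_config,
)
# Other work may overlap with the in-flight dispatch here.
hook()  # wait for the low-latency dispatch to land
expert_x, expert_x_scale, expert_tokens_meta, _, _ = receiver()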

supports_async

supports_async() -> bool
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def supports_async(self) -> bool:
    return True

topk_indices_dtype

topk_indices_dtype() -> dtype | None
Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def topk_indices_dtype(self) -> torch.dtype | None:
    return torch.int64

dequant_fp8

dequant_fp8(
    expert_x_fp8: Tensor, expert_x_scales: Tensor
) -> Tensor

Return dequantized tensor in fp32

Source code in vllm/model_executor/layers/fused_moe/deepep_ll_prepare_finalize.py
def dequant_fp8(
    expert_x_fp8: torch.Tensor, expert_x_scales: torch.Tensor
) -> torch.Tensor:
    """
    Return dequantized tensor in fp32
    """
    # TODO (varun) : Optimize leverage num_tokens_per_expert counts
    assert expert_x_fp8.is_contiguous()
    expert_x_scales = expert_x_scales.contiguous()
    num_experts = expert_x_fp8.size(0)

    expert_x_fp32 = expert_x_fp8.to(torch.float32).view(
        num_experts, -1, DEEPEP_QUANT_BLOCK_SIZE
    )
    expert_x_scales = expert_x_scales.view(num_experts, -1, 1)
    return (expert_x_fp32 * expert_x_scales).view(expert_x_fp8.size())
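
A self-contained numerical sketch of the same blockwise pattern (it re-implements the view/multiply steps locally so it runs without deep_ep; assumes a torch build with float8_e4m3fn support):

import torch

DEEPEP_QUANT_BLOCK_SIZE = 128
num_experts, tokens, hidden = 2, 4, 256

# Quantize a toy activation tensor in 128-element groups to fp8 e4m3
# (448 is the largest value representable in e4m3).
x = torch.randn(num_experts, tokens, hidden, dtype=torch.float32)
groups = x.view(
    num_experts, tokens, hidden // DEEPEP_QUANT_BLOCK_SIZE, DEEPEP_QUANT_BLOCK_SIZE
)
scales = groups.abs().amax(dim=-1, keepdim=True) / 448.0
x_fp8 = (groups / scales).to(torch.float8_e4m3fn).view(num_experts, tokens, hidden)

# Dequantize with the same view/multiply pattern as dequant_fp8:
# widen to fp32, view as (num_experts, -1, 128) blocks, scale per block.
x_fp32 = x_fp8.to(torch.float32).view(num_experts, -1, DEEPEP_QUANT_BLOCK_SIZE)
x_deq = (x_fp32 * scales.view(num_experts, -1, 1)).view(x_fp8.size())
assert torch.allclose(x_deq, x, rtol=0.1, atol=0.02)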