3 Commits

Author SHA1 Message Date
  LookAround0301 bb7b74c14f
add ut for model runner (#4991) 1 day ago
  wangxiyuan 8090914d69
[CI] CI refactor (#4928) 1 day ago
  AlvisGong ba28d54f35
[Perf]enable prefill flashcommon3 (#4065) 1 day ago
28 changed files with 546 additions and 307 deletions
Split View
  1. +0
    -0
      .github/workflows/bot_merge_conflict.yml
  2. +0
    -0
      .github/workflows/bot_pr_create.yaml
  3. +0
    -0
      .github/workflows/labled_doctest.yaml
  4. +0
    -0
      .github/workflows/labled_test_310.yaml
  5. +0
    -0
      .github/workflows/nightly_test_a2.yaml
  6. +0
    -0
      .github/workflows/nightly_test_a3.yaml
  7. +0
    -0
      .github/workflows/pr_tag_image_build_and_push.yaml
  8. +0
    -0
      .github/workflows/pr_tag_release_code_and_wheel.yml
  9. +0
    -0
      .github/workflows/pr_test_full.yaml
  10. +0
    -0
      .github/workflows/pr_test_light.yaml
  11. +0
    -0
      .github/workflows/schedule_test_benchmarks.yaml
  12. +2
    -2
      .github/workflows/schedule_test_vllm_main.yaml
  13. +0
    -172
      .github/workflows/vllm_ascend_test_report.yaml
  14. +0
    -20
      docs/source/developer_guide/evaluation/accuracy_report/DeepSeek-V2-Lite.md
  15. +0
    -19
      docs/source/developer_guide/evaluation/accuracy_report/Qwen2.5-VL-7B-Instruct.md
  16. +0
    -21
      docs/source/developer_guide/evaluation/accuracy_report/Qwen3-30B-A3B.md
  17. +0
    -21
      docs/source/developer_guide/evaluation/accuracy_report/Qwen3-8B-Base.md
  18. +0
    -10
      docs/source/developer_guide/evaluation/accuracy_report/index.md
  19. +0
    -1
      docs/source/developer_guide/evaluation/index.md
  20. +4
    -0
      tests/ut/ops/test_prepare_finalize.py
  21. +304
    -0
      tests/ut/worker/test_model_runner_v1.py
  22. +2
    -0
      vllm_ascend/ascend_config.py
  23. +21
    -1
      vllm_ascend/distributed/parallel_state.py
  24. +32
    -1
      vllm_ascend/distributed/utils.py
  25. +42
    -0
      vllm_ascend/flash_common3_context.py
  26. +87
    -21
      vllm_ascend/ops/fused_moe/fused_moe.py
  27. +29
    -5
      vllm_ascend/ops/fused_moe/prepare_finalize.py
  28. +23
    -13
      vllm_ascend/quantization/w8a8_dynamic.py

.github/workflows/label_merge_conflict.yml → .github/workflows/bot_merge_conflict.yml View File


.github/workflows/pr_create.yaml → .github/workflows/bot_pr_create.yaml View File


.github/workflows/vllm_ascend_doctest.yaml → .github/workflows/labled_doctest.yaml View File


.github/workflows/vllm_ascend_test_310p.yaml → .github/workflows/labled_test_310.yaml View File


.github/workflows/vllm_ascend_test_nightly_a2.yaml → .github/workflows/nightly_test_a2.yaml View File


.github/workflows/vllm_ascend_test_nightly_a3.yaml → .github/workflows/nightly_test_a3.yaml View File


.github/workflows/image_build_and_push.yaml → .github/workflows/pr_tag_image_build_and_push.yaml View File


.github/workflows/release_code_and_wheel.yml → .github/workflows/pr_tag_release_code_and_wheel.yml View File


.github/workflows/vllm_ascend_test_pr_full.yaml → .github/workflows/pr_test_full.yaml View File


.github/workflows/vllm_ascend_test_pr_light.yaml → .github/workflows/pr_test_light.yaml View File


.github/workflows/nightly_benchmarks.yaml → .github/workflows/schedule_test_benchmarks.yaml View File


.github/workflows/vllm_ascend_test_full_vllm_main.yaml → .github/workflows/schedule_test_vllm_main.yaml View File

@@ -17,9 +17,9 @@
name: 'ascend test / vllm main'

on:
# Run full e2e tests per 2h
# Run full e2e tests per 4h
schedule:
- cron: '0 */2 * * *'
- cron: '0 */4 * * *'
workflow_dispatch:

# Bash shells do not use ~/.profile or ~/.bashrc so these shells need to be explicitly

+ 0
- 172
.github/workflows/vllm_ascend_test_report.yaml View File

@@ -1,172 +0,0 @@
#
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.
#

# This test will be triggered:
# 1. schedule
# 2. pull_request change the related files
# 3. workflow_dispatch with models input

name: ascend test / accuracy report

on:
pull_request:
branches:
- 'main'
- '*-dev'
paths:
- '.github/workflows/vllm_ascend_test_report.yaml'
- 'tests/e2e/models/test_lm_eval_correctness.py'
workflow_dispatch:
inputs:
vllm-ascend-version:
description: 'vllm-ascend:'
required: true
type: choice
# Current supported vLLM versions
options:
- latest
- main
default: main

# Bash shells do not use ~/.profile or ~/.bashrc so these shells need to be explicitly
# declared as "shell: bash -el {0}" on steps that need to be properly activated.
# It's used to activate ascend-toolkit environment variables.
defaults:
run:
shell: bash -el {0}

# only cancel in-progress runs of the same workflow
concurrency:
group: ${{ github.workflow }}-${{ github.ref }}
cancel-in-progress: true

jobs:
run:
strategy:
fail-fast: false
matrix:
include:
- runner: linux-aarch64-a2-1
model_list:
- Qwen3-8B
- Qwen2.5-VL-7B-Instruct
- Qwen2-Audio-7B-Instruct
- runner: linux-aarch64-a2-2
model_list:
- Qwen3-30B-A3B
- Qwen3-VL-30B-A3B-Instruct
- DeepSeek-V2-Lite
uses: ./.github/workflows/_e2e_nightly_single_node_models.yaml
with:
vllm: v0.12.0
runner: ${{ matrix.runner }}
image: swr.cn-southwest-2.myhuaweicloud.com/base_image/ascend-ci/cann:8.3.rc2-910b-ubuntu22.04-py3.11
model_list: ${{ toJson(matrix.model_list) }}
upload: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.vllm-ascend-version == 'latest' }}

create_pr:
runs-on: ubuntu-latest
needs: run
if: ${{ github.event_name == 'workflow_dispatch' && github.event.inputs.vllm-ascend-version == 'latest' }}
env:
UPSTREAM_REPO: vllm-project/vllm-ascend
steps:
- name: Checkout repository
uses: actions/checkout@v6.0.1
with:
repository: vllm-ascend-ci/vllm-ascend
token: ${{ secrets.PAT_TOKEN }}
ref: main
- name: Add upstream remote
run: |
git remote add upstream https://github.com/${{ env.UPSTREAM_REPO }}.git
git fetch upstream
git remote -v

- name: Set Git user info dynamically
run: |
git config user.name "${{ github.actor }}"
git config user.email "${{ github.actor }}@users.noreply.github.com"

- name: Create or switch to branch
run: |
TIMESTAMP=$(date +%Y%m%d%H%M%S)
BRANCH_NAME="auto-pr/accuracy-report-${TIMESTAMP}"
echo "BRANCH_NAME=${BRANCH_NAME}" >> $GITHUB_ENV
git checkout -B "${BRANCH_NAME}" upstream/main

- name: Download only current run reports
uses: actions/download-artifact@v6
with:
path: ./docs/source/developer_guide/evaluation/accuracy_report
pattern: report-*
github-token: ${{ secrets.GITHUB_TOKEN }}
run-id: ${{ github.run_id }}

- name: Delete old report
run: |
find ./docs/source/developer_guide/evaluation/accuracy_report -maxdepth 1 -type f -name '*.md' ! -name 'index.md' -delete
find ./docs/source/developer_guide/evaluation/accuracy_report -mindepth 2 -type f -name '*.md' -exec mv -f {} ./docs/source/developer_guide/evaluation/accuracy_report \;
find ./docs/source/developer_guide/evaluation/accuracy_report -mindepth 1 -type d -empty -delete
- name: Update accuracy_report/index.md
run: |
REPORT_DIR="./docs/source/developer_guide/evaluation/accuracy_report"
INDEX_MD="$REPORT_DIR/index.md"
{
echo "# Accuracy Report"
echo ""
echo ":::{toctree}"
echo ":caption: Accuracy Report"
echo ":maxdepth: 1"
for report in "$REPORT_DIR"/*.md; do
filename="$(basename "$report" .md)"
if [ "$filename" != "index" ]; then
echo "$filename"
fi
done
echo ":::"
} > "$INDEX_MD"

- name: push accuracy report
env:
GITHUB_TOKEN: ${{ secrets.PAT_TOKEN }}
run: |
git add ./docs/source/developer_guide/evaluation/accuracy_report/*.md
git commit -s -m "[Doc] Update accuracy reports for ${{ env.BRANCH_NAME }}"
git push -f origin "${{ env.BRANCH_NAME }}"

- name: Create PR in upstream via API
uses: actions/github-script@v8
with:
github-token: ${{ secrets.PAT_TOKEN }}
script: |
const pr = await github.rest.pulls.create({
owner: 'vllm-project',
repo: 'vllm-ascend',
head: `vllm-ascend-ci:${{ env.BRANCH_NAME }}`,
base: 'main',
title: `[Doc] Update accuracy reports for ${{ env.BRANCH_NAME }}`,
body: `The accuracy results running on NPU Atlas A2 have changed, updating reports for: All models
- [Workflow run][1]
[1]: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}`
});
core.info(`Created PR #${pr.data.number}`);

+ 0
- 20
docs/source/developer_guide/evaluation/accuracy_report/DeepSeek-V2-Lite.md View File

@@ -1,20 +0,0 @@
# deepseek-ai/DeepSeek-V2-Lite

- **vLLM Version**: vLLM: 0.10.1.1 ([1da94e6](https://github.com/vllm-project/vllm/commit/1da94e6)), **vLLM Ascend Version**: v0.10.1rc1 ([7e16b4a](https://github.com/vllm-project/vllm-ascend/commit/7e16b4a))
- **Software Environment**: **CANN**: 8.2.RC1, **PyTorch**: 2.7.1, **torch-npu**: 2.7.1.dev20250724
- **Hardware Environment**: Atlas A2 Series
- **Parallel mode**: TP2
- **Execution mode**: ACLGraph

**Command**:

```bash
export MODEL_ARGS='pretrained=deepseek-ai/DeepSeek-V2-Lite,tensor_parallel_size=2,dtype=auto,trust_remote_code=True,max_model_len=4096,enforce_eager=True'
lm_eval --model vllm --model_args $MODEL_ARGS --tasks gsm8k \
--batch_size auto
```

| Task | Metric | Value | Stderr |
|-----------------------|-------------|----------:|-------:|
| gsm8k | exact_match,strict-match | ✅0.3813 | ± 0.0134 |
| gsm8k | exact_match,flexible-extract | ✅0.3836 | ± 0.0134 |

+ 0
- 19
docs/source/developer_guide/evaluation/accuracy_report/Qwen2.5-VL-7B-Instruct.md View File

@@ -1,19 +0,0 @@
# Qwen/Qwen2.5-VL-7B-Instruct

- **vLLM Version**: vLLM: 0.10.1.1 ([1da94e6](https://github.com/vllm-project/vllm/commit/1da94e6)), **vLLM Ascend Version**: v0.10.1rc1 ([7e16b4a](https://github.com/vllm-project/vllm-ascend/commit/7e16b4a))
- **Software Environment**: **CANN**: 8.2.RC1, **PyTorch**: 2.7.1, **torch-npu**: 2.7.1.dev20250724
- **Hardware Environment**: Atlas A2 Series
- **Parallel mode**: TP1
- **Execution mode**: ACLGraph

**Command**:

```bash
export MODEL_ARGS='pretrained=Qwen/Qwen2.5-VL-7B-Instruct,tensor_parallel_size=1,dtype=auto,trust_remote_code=False,max_model_len=8192'
lm_eval --model vllm-vlm --model_args $MODEL_ARGS --tasks mmmu_val \
--apply_chat_template True --fewshot_as_multiturn True --batch_size auto
```

| Task | Metric | Value | Stderr |
|-----------------------|-------------|----------:|-------:|
| mmmu_val | acc,none | ✅0.52 | ± 0.0162 |

+ 0
- 21
docs/source/developer_guide/evaluation/accuracy_report/Qwen3-30B-A3B.md View File

@@ -1,21 +0,0 @@
# Qwen/Qwen3-30B-A3B

- **vLLM Version**: vLLM: 0.10.1.1 ([1da94e6](https://github.com/vllm-project/vllm/commit/1da94e6)), **vLLM Ascend Version**: v0.10.1rc1 ([7e16b4a](https://github.com/vllm-project/vllm-ascend/commit/7e16b4a))
- **Software Environment**: **CANN**: 8.2.RC1, **PyTorch**: 2.7.1, **torch-npu**: 2.7.1.dev20250724
- **Hardware Environment**: Atlas A2 Series
- **Parallel mode**: TP2 + EP
- **Execution mode**: ACLGraph

**Command**:

```bash
export MODEL_ARGS='pretrained=Qwen/Qwen3-30B-A3B,tensor_parallel_size=2,dtype=auto,trust_remote_code=False,max_model_len=4096,gpu_memory_utilization=0.6,enable_expert_parallel=True'
lm_eval --model vllm --model_args $MODEL_ARGS --tasks gsm8k,ceval-valid \
--num_fewshot 5 --batch_size auto
```

| Task | Metric | Value | Stderr |
|-----------------------|-------------|----------:|-------:|
| gsm8k | exact_match,strict-match | ✅0.8923 | ± 0.0085 |
| gsm8k | exact_match,flexible-extract | ✅0.8506 | ± 0.0098 |
| ceval-valid | acc,none | ✅0.8358 | ± 0.0099 |

+ 0
- 21
docs/source/developer_guide/evaluation/accuracy_report/Qwen3-8B-Base.md View File

@@ -1,21 +0,0 @@
# Qwen/Qwen3-8B-Base

- **vLLM Version**: vLLM: 0.10.1.1 ([1da94e6](https://github.com/vllm-project/vllm/commit/1da94e6)), **vLLM Ascend Version**: v0.10.1rc1 ([7e16b4a](https://github.com/vllm-project/vllm-ascend/commit/7e16b4a))
- **Software Environment**: **CANN**: 8.2.RC1, **PyTorch**: 2.7.1, **torch-npu**: 2.7.1.dev20250724
- **Hardware Environment**: Atlas A2 Series
- **Parallel mode**: TP1
- **Execution mode**: ACLGraph

**Command**:

```bash
export MODEL_ARGS='pretrained=Qwen/Qwen3-8B-Base,tensor_parallel_size=1,dtype=auto,trust_remote_code=False,max_model_len=4096'
lm_eval --model vllm --model_args $MODEL_ARGS --tasks gsm8k,ceval-valid \
--apply_chat_template True --fewshot_as_multiturn True --num_fewshot 5 --batch_size auto
```

| Task | Metric | Value | Stderr |
|-----------------------|-------------|----------:|-------:|
| gsm8k | exact_match,strict-match | ✅0.8271 | ± 0.0104 |
| gsm8k | exact_match,flexible-extract | ✅0.8294 | ± 0.0104 |
| ceval-valid | acc,none | ✅0.815 | ± 0.0103 |

+ 0
- 10
docs/source/developer_guide/evaluation/accuracy_report/index.md View File

@@ -1,10 +0,0 @@
# Accuracy Report

:::{toctree}
:caption: Accuracy Report
:maxdepth: 1
DeepSeek-V2-Lite
Qwen2.5-VL-7B-Instruct
Qwen3-30B-A3B
Qwen3-8B-Base
:::

+ 0
- 1
docs/source/developer_guide/evaluation/index.md View File

@@ -7,5 +7,4 @@ using_evalscope
using_lm_eval
using_ais_bench
using_opencompass
accuracy_report/index
:::

+ 4
- 0
tests/ut/ops/test_prepare_finalize.py View File

@@ -13,6 +13,10 @@ class TestPrepareAndFinalize(unittest.TestCase):
def setUp(self):
# Mock FusedMoEConfig
fake_stream = MagicMock()
patcher = patch("torch.npu.Stream", return_value=fake_stream)
patcher.start()
self.addCleanup(patcher.stop)
self.moe_config = MagicMock(spec=FusedMoEConfig)
self.moe_config.tp_group = MagicMock()
self.moe_config.tp_group.device_group = MagicMock()


+ 304
- 0
tests/ut/worker/test_model_runner_v1.py View File

@@ -0,0 +1,304 @@
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# This file is a part of the vllm-ascend project.

from unittest.mock import MagicMock

import numpy as np
import pytest
import torch

from vllm_ascend.worker.model_runner_v1 import NPUModelRunner


@pytest.mark.parametrize(
    "pcp_size, dcp_size, num_reqs, query_lens, num_decodes, use_mla, total_tokens, expect_not_none",
    [
        (1, 1, 5, [10, 20, 30, 40, 50], 2, False, 100, False),
        (1, 2, 3, [20, 30, 40], 1, False, 50, True),
        (2, 1, 4, [5, 10, 40, 60], 2, False, 100, True),
        (2, 1, 4, [5, 10, 40, 60], 2, True, 100, True),
        (2, 1, 3, [5, 10, 15], 3, False, 50, True),
        (2, 1, 3, [40, 50, 60], 0, False, 150, True),
    ])
def test_generate_pcp_metadata_basic(pcp_size, dcp_size, num_reqs, query_lens,
                                     num_decodes, use_mla, total_tokens,
                                     expect_not_none):
    """Exercise NPUModelRunner._generate_pcp_metadata across pcp/dcp configs.

    The runner is a MagicMock carrying only the attributes the method reads;
    the real method is bound onto it so no NPU device is needed.  When both
    pcp_size and dcp_size are 1 the method is expected to return None,
    otherwise a metadata object whose fields are spot-checked.
    """
    mock_runner = MagicMock(spec=NPUModelRunner)
    mock_runner.pcp_size = pcp_size
    mock_runner.dcp_size = dcp_size
    mock_runner.decode_threshold = 4
    mock_runner.pcp_rank = 0
    mock_runner.device = torch.device('cpu')
    mock_runner.dtype = torch.float32

    mock_runner.parallel_config = MagicMock()
    mock_runner.parallel_config.cp_kv_cache_interleave_size = 64

    mock_runner.vllm_config = MagicMock()
    mock_runner.vllm_config.model_config = MagicMock()
    mock_runner.vllm_config.model_config.use_mla = use_mla

    mock_runner.input_batch = MagicMock()
    mock_runner.input_batch.num_reqs = num_reqs

    # Decode requests (i < num_decodes): fully computed, prompt is half the
    # query length.  Prefill requests: nothing computed yet.
    num_computed_tokens = []
    num_prompt_tokens = []
    num_tokens = []
    for i in range(num_reqs):
        if i < num_decodes:
            num_computed_tokens.append(query_lens[i])
            num_prompt_tokens.append(query_lens[i] // 2)
        else:
            num_computed_tokens.append(0)
            num_prompt_tokens.append(query_lens[i])
        num_tokens.append(query_lens[i])

    mock_runner.input_batch.num_computed_tokens_cpu = torch.tensor(
        num_computed_tokens)
    mock_runner.input_batch.num_prompt_tokens = torch.tensor(num_prompt_tokens)
    mock_runner.input_batch.num_tokens = torch.tensor(num_tokens)

    mock_runner.query_lens = torch.tensor(query_lens)

    # Bind the real helper onto the mocked runner.
    mock_runner._get_cp_local_seq_lens = NPUModelRunner._get_cp_local_seq_lens.__get__(
        mock_runner, NPUModelRunner)

    mock_runner.pcp_allgather_restore_idx = torch.arange(total_tokens * 2)
    mock_runner.cp_kv_recover_idx_for_chunk = torch.arange(total_tokens)

    mock_runner.long_seq_metadata = None
    mock_runner.num_actual_tokens_pcp_padded = 0
    mock_runner.kv_idx_names = {}
    mock_runner.extra_long_seq_kwargs = {}
    mock_runner.attn_mask = None
    mock_runner.q_head_idx_tensor = None
    mock_runner.q_tail_idx_tensor = None
    mock_runner.q_full_idx = None

    method = NPUModelRunner._generate_pcp_metadata.__get__(
        mock_runner, NPUModelRunner)
    result = method(total_tokens)

    if not expect_not_none:
        assert result is None, f"Expected to return None, but got {type(result)}"
        return

    assert result is not None, "Expected to return a metadata object, but got None."

    assert hasattr(result, 'num_actual_tokens_pcp_padded')
    assert hasattr(result, 'num_computed_tokens_of_pcp_dcp')

    if pcp_size > 1:
        assert hasattr(result, 'pcp_allgather_restore_idx')

    if (num_reqs - num_decodes) > 0:
        # Prefill requests require the head/tail split index tensors.
        for attr in ('q_head_idx_tensor', 'q_tail_idx_tensor', 'q_full_idx',
                     'kv_with_q_head_nomask_idx_tensor',
                     'kv_with_q_head_mask_idx_tensor',
                     'kv_with_q_tail_nomask_idx_tensor',
                     'kv_with_q_tail_mask_idx_tensor', 'attn_mask_seqlens',
                     'head_attn_nomask_seqlens', 'tail_attn_nomask_seqlens'):
            assert hasattr(result, attr)

    # The mask-shape check was previously duplicated verbatim in both the
    # prefill and the decode-only branch; it does not depend on the branch,
    # so it is hoisted here.
    if getattr(result, 'pcp_prefill_mask', None) is not None:
        expected_shape = (512, 512) if use_mla else (2048, 2048)
        assert result.pcp_prefill_mask.shape == expected_shape


def test_generate_pcp_metadata_edge_cases():
    """Sanity-check edge-case bookkeeping used around _generate_pcp_metadata."""
    runner = MagicMock()
    runner.pcp_size = 2
    runner.dcp_size = 1
    runner.input_batch = MagicMock()
    runner.input_batch.num_reqs = 0
    runner.query_lens = torch.tensor([10, 20, 30])

    # With zero scheduled requests, fall back to the query_lens length.
    fallback = runner.input_batch.num_reqs or runner.query_lens.size(0)
    assert fallback == 3

    runner.input_batch.num_reqs = 100
    runner.query_lens = torch.ones(100) * 1000

    # Head/tail chunk ids per pcp rank (pcp_size == 2 -> 4 chunks total).
    for rank in (0, 1):
        runner.pcp_rank = rank
        head_chunk = rank
        tail_chunk = 2 * 2 - 1 - rank
        assert head_chunk == rank
        assert tail_chunk == 3 - rank


def test_pcp_allgather_restore_idx_slicing():
    """Slicing the restore index to the padded token count keeps the prefix."""
    runner = MagicMock()
    runner.pcp_size = 2
    runner.pcp_allgather_restore_idx = torch.arange(1000)

    scheduled_tokens = 200
    # pcp_size == 2 doubles the scheduled token count after all-gather.
    padded_count = scheduled_tokens * 2

    prefix = runner.pcp_allgather_restore_idx[:padded_count]
    assert len(prefix) == 400
    assert prefix[0] == 0
    assert prefix[-1] == 399


@pytest.mark.parametrize(
    "tokens, num_reqs, num_computed_tokens, num_prompt_tokens, pcp_size, pcp_rank, expected_pcp_tokens",
    [
        # Case 1: prefill only (nothing computed yet)
        ([8, 12, 16], 3, [0, 0, 0], [8, 12, 16], 4, 0, [2, 4, 4]),

        # Case 2: mix of prefill and decode (first two requests computed)
        ([8, 4, 12], 3, [8, 4, 0], [8, 4, 12], 4, 0, [8, 4, 4]),

        # Case 3: requests whose token counts need padding before splitting
        ([3, 7, 9], 3, [0, 0, 0], [3, 7, 9], 4, 0, [2, 2, 4]),

        # Case 4: single request
        ([10], 1, [0], [10], 4, 0, [4]),
    ])
def test_update_tokens_for_pcp_basic(tokens, num_reqs, num_computed_tokens,
                                     num_prompt_tokens, pcp_size, pcp_rank,
                                     expected_pcp_tokens):
    """Check NPUModelRunner._update_tokens_for_pcp on a mocked runner.

    The real methods are bound onto a MagicMock, so only the attributes they
    read need to be provided.  Verifies the per-rank token split, the length
    of the returned positions, and the length of the unpad mask.
    """
    mock_runner = MagicMock(spec=NPUModelRunner)
    mock_runner.pcp_size = pcp_size
    mock_runner.pcp_rank = pcp_rank

    mock_runner.input_batch = MagicMock()
    mock_runner.input_batch.num_reqs = num_reqs
    mock_runner.input_batch.num_computed_tokens_cpu = np.array(
        num_computed_tokens, dtype=np.int32)
    mock_runner.input_batch.num_prompt_tokens = np.array(num_prompt_tokens,
                                                         dtype=np.int32)

    mock_runner.pcp_allgather_restore_idx = torch.zeros(1000, dtype=torch.long)

    mock_runner.num_pcp_pads = [0] * num_reqs
    mock_runner.arange_np = np.arange(10000)

    # Bind the real (unmocked) implementations under test onto the mock.
    mock_runner._update_tokens_for_pcp = NPUModelRunner._update_tokens_for_pcp.__get__(
        mock_runner, NPUModelRunner)
    mock_runner._get_cumsum_and_arange = NPUModelRunner._get_cumsum_and_arange.__get__(
        mock_runner, NPUModelRunner)

    pcp_tokens_result, positions_result, unpad_mask_result = mock_runner._update_tokens_for_pcp(
        tokens)

    assert np.array_equal(pcp_tokens_result, expected_pcp_tokens), \
        f"Expected pcp_tokens: {expected_pcp_tokens}, got: {pcp_tokens_result}"

    # Positions come back flattened: one entry per local (post-split) token.
    total_pcp_tokens: int = np.sum(pcp_tokens_result)
    assert positions_result.shape == (total_pcp_tokens,), \
        f"Positions shape mismatch. Expected length {total_pcp_tokens}, got {positions_result.shape}"

    # Per the formula below: prefill requests (num_computed == 0) are padded
    # up to a multiple of 2 * pcp_size, decode requests contribute
    # t * pcp_size entries.
    padded_tokens = [
        (t + 2 * pcp_size - 1) // (2 * pcp_size) *
        (2 * pcp_size) if num_computed_tokens[i] == 0 else t * pcp_size
        for i, t in enumerate(tokens)
    ]
    total_padded_tokens: int = np.sum(padded_tokens)
    assert unpad_mask_result.shape[0] == total_padded_tokens, \
        f"unpad_mask size mismatch: expected {total_padded_tokens}, got {unpad_mask_result.shape[0]}"


def test_update_tokens_for_pcp_with_padding():
    """Prefill token counts are padded to a multiple of 2 * pcp_size.

    With pcp_size == 4 every prefill request is padded to a multiple of 8,
    so prompts of 5/9/13 tokens record 3/7/3 pad tokens respectively and
    split into 2/4/4 local tokens on rank 0.
    """
    runner = MagicMock(spec=NPUModelRunner)
    runner.pcp_size = 4
    runner.pcp_rank = 0
    runner.arange_np = np.arange(10000)
    runner.num_pcp_pads = [0, 0, 0]
    runner.pcp_allgather_restore_idx = torch.zeros(1000, dtype=torch.long)

    batch = MagicMock()
    batch.num_reqs = 3
    batch.num_computed_tokens_cpu = np.array([0, 0, 0], dtype=np.int32)
    batch.num_prompt_tokens = np.array([5, 9, 13], dtype=np.int32)
    runner.input_batch = batch

    # Bind the real implementations onto the mocked runner.
    for name in ("_update_tokens_for_pcp", "_get_cumsum_and_arange"):
        setattr(runner, name,
                getattr(NPUModelRunner, name).__get__(runner, NPUModelRunner))

    scheduled = [5, 9, 13]
    split_tokens, _, _ = runner._update_tokens_for_pcp(scheduled)

    want_split = [2, 4, 4]
    assert np.array_equal(split_tokens, want_split), \
        f"Expected {want_split}, got {split_tokens}"

    want_pads = [3, 7, 3]
    assert np.array_equal(runner.num_pcp_pads, want_pads), \
        f"Expected padding {want_pads}, got {runner.num_pcp_pads}"


def test_update_tokens_for_pcp_unpad_mask():
    """The unpad mask marks real tokens True and pad slots False, per request."""
    runner = MagicMock(spec=NPUModelRunner)
    runner.pcp_size = 4
    runner.pcp_rank = 0
    runner.arange_np = np.arange(10000)
    runner.num_pcp_pads = [0, 0]
    runner.pcp_allgather_restore_idx = torch.zeros(1000, dtype=torch.long)

    batch = MagicMock()
    batch.num_reqs = 2
    batch.num_computed_tokens_cpu = np.array([0, 0], dtype=np.int32)
    batch.num_prompt_tokens = np.array([5, 7], dtype=np.int32)
    runner.input_batch = batch

    # Bind the real implementations onto the mocked runner.
    for name in ("_update_tokens_for_pcp", "_get_cumsum_and_arange"):
        setattr(runner, name,
                getattr(NPUModelRunner, name).__get__(runner, NPUModelRunner))

    _, _, mask = runner._update_tokens_for_pcp([5, 7])

    assert mask.dtype == torch.bool, \
        f"unpad_mask should be bool, got {mask.dtype}"

    # Both requests are padded to 8 tokens (the next multiple of 2 * pcp_size).
    expected_length = 8 + 8
    assert mask.shape[0] == expected_length, \
        f"unpad_mask length mismatch: expected {expected_length}, got {mask.shape[0]}"

    expected_mask = [True] * 5 + [False] * 3 + [True] * 7 + [False] * 1
    actual_mask = mask.numpy().tolist()
    assert actual_mask == expected_mask, \
        f"unpad_mask incorrect. Expected {expected_mask}, got {actual_mask}"

+ 2
- 0
vllm_ascend/ascend_config.py View File

@@ -106,6 +106,8 @@ class AscendConfig:
enable_shared_expert_dp=True)
self.multistream_overlap_shared_expert = additional_config.get(
"multistream_overlap_shared_expert", False)
self.multistream_overlap_gate = additional_config.get(
"multistream_overlap_gate", False)
self.recompute_scheduler_enable = additional_config.get(
"recompute_scheduler_enable", False)
self.enable_cpu_binding = additional_config.get(


+ 21
- 1
vllm_ascend/distributed/parallel_state.py View File

@@ -20,9 +20,10 @@ _OTP: Optional[GroupCoordinator] = None
_LMTP: Optional[GroupCoordinator] = None
_EMBED_TP: Optional[GroupCoordinator] = None

# flashcomm2 specific groups
# flashcomm specific groups
_FLASHCOMM2_OTP: Optional[GroupCoordinator] = None
_FLASHCOMM2_ODP: Optional[GroupCoordinator] = None
_FC3_QUANT_X: Optional[GroupCoordinator] = None

# shared_weight across rank groups
_SHARED_WEIGHT: Optional[GroupCoordinator] = None
@@ -241,6 +242,15 @@ def init_ascend_model_parallel(parallel_config: ParallelConfig, ):
assert flashcomm2_otp_size == 1, "flashcomm2_o_shared is only supported when flashcomm2_otp_size is 1"
_SHARED_WEIGHT = _create_shared_weight_group("flashcomm2_o_shared")

if get_ascend_config().multistream_overlap_gate:
global _FC3_QUANT_X
group_ranks = all_ranks.unbind(0)
group_ranks = [x.tolist() for x in group_ranks]
_FC3_QUANT_X = init_model_parallel_group(group_ranks,
get_world_group().local_rank,
backend,
group_name="fc3_quant_x")


def model_parallel_initialized():
    """Return True if the ascend model-parallel groups have been set up.

    The presence of the _MC2 group coordinator is used as the readiness
    marker for the whole set of groups created by init_ascend_model_parallel.
    """
    return (_MC2 is not None)
@@ -296,6 +306,11 @@ def get_p_tp_group() -> GroupCoordinator:
return _P_TP


def get_fc3_quant_x_group() -> GroupCoordinator:
    """Return the flashcomm3 quant-x communication group.

    Raises:
        AssertionError: if the group has not been created; it is only built
            by init_ascend_model_parallel when multistream_overlap_gate is on.
    """
    assert _FC3_QUANT_X is not None, ("fc3 quant x group is not initialized")
    return _FC3_QUANT_X


def destroy_ascend_model_parallel():
global _MC2
if _MC2:
@@ -343,3 +358,8 @@ def destroy_ascend_model_parallel():
if _SHARED_WEIGHT:
_SHARED_WEIGHT.destroy()
_SHARED_WEIGHT = None

global _FC3_QUANT_X
if _FC3_QUANT_X:
_FC3_QUANT_X.destroy()
_FC3_QUANT_X = None

+ 32
- 1
vllm_ascend/distributed/utils.py View File

@@ -2,8 +2,11 @@ import os

import torch
import torch.distributed as dist
from vllm.forward_context import get_forward_context

from vllm_ascend.distributed.parallel_state import get_p_tp_group
from vllm_ascend.distributed.parallel_state import (get_dp_group,
get_fc3_quant_x_group,
get_p_tp_group)


def kv_alltoall_and_rearrange(pd_tp_ratio: int, key: torch.Tensor,
@@ -59,3 +62,31 @@ def get_transfer_timeout_value():
'7')) # type: ignore
return int((4.096 * (2**hccl_rdma_timeout)) * hccl_rdma_retry_cnt // 1000 +
3000)


def fc3_all_gather_and_maybe_unpad_impl(x: torch.Tensor, ) -> torch.Tensor:
    """All-gather x across the fc3 quant-x group and strip alignment padding.

    Outside a forward context (get_forward_context raises AssertionError),
    x is returned unchanged.  Otherwise x is gathered along dim 0 and any
    rows added as padding are removed:

    * without dp_metadata, a single trailing pad of forward_context.pad_size
      rows is sliced off;
    * with dp_metadata, each DP rank's slice is truncated to its real token
      count and the per-rank slices are concatenated in rank order.
    """
    try:
        forward_context = get_forward_context()
    except AssertionError:
        # Not inside a model forward pass (e.g. during init or warm-up):
        # nothing to gather, return the input untouched.
        return x
    x = get_fc3_quant_x_group().all_gather(x, 0)
    dp_metadata = forward_context.dp_metadata
    if dp_metadata is None:
        pad_size = forward_context.pad_size
        if pad_size > 0:
            x = x[:-pad_size]
    else:
        # unpad: each DP rank contributed forward_context.padded_length rows;
        # keep only the first num_tokens_across_dp_cpu[idx] rows per rank.
        num_tokens_across_dp_cpu = dp_metadata.num_tokens_across_dp_cpu
        result = torch.empty((num_tokens_across_dp_cpu.sum(), *x.shape[1:]),
                             device=x.device,
                             dtype=x.dtype)
        dp_size = get_dp_group().world_size
        x = x.view(dp_size, forward_context.padded_length, *x.shape[1:])
        offset = 0
        for idx in range(dp_size):
            num_tokens_dp = num_tokens_across_dp_cpu[idx]
            result[offset:offset + num_tokens_dp] = x[idx, :num_tokens_dp]
            offset += num_tokens_dp
        x = result
    return x

+ 42
- 0
vllm_ascend/flash_common3_context.py View File

@@ -0,0 +1,42 @@
from dataclasses import dataclass
from typing import Optional

import torch
from vllm.model_executor.layers.linear import LinearBase


@dataclass
class FlashCommon3Context:
    """Scratch state shared between the MoE gate / shared-expert computation
    and the fused-MoE forward when flashcomm3 overlap is enabled.

    All fields default to None and are filled in incrementally through
    set_flash_common3_context.
    """
    # Router gate linear layer for the MoE block.
    gate: Optional[LinearBase] = None
    # Expert routing weights (per token).
    topk_weights: Optional[torch.Tensor] = None
    # Selected expert ids (per token).
    topk_ids: Optional[torch.Tensor] = None
    # Row index tensor used alongside the routing results.
    row_idx: Optional[torch.Tensor] = None
    # Shared-expert module to run overlapped with routing.
    shared_experts: Optional[torch.nn.Module] = None
    # Output of the shared experts, consumed later in the MoE forward.
    shared_out: Optional[torch.Tensor] = None


# Process-wide singleton; created lazily on the first setter call.
_flash_common3_context: Optional[FlashCommon3Context] = None


def get_flash_common3_context() -> Optional[FlashCommon3Context]:
    """Return the process-wide FlashCommon3Context, or None if never set."""
    return _flash_common3_context


def set_flash_common3_context(
    topk_weights: Optional[torch.Tensor] = None,
    topk_ids: Optional[torch.Tensor] = None,
    shared_experts: Optional[torch.nn.Module] = None,
    shared_out: Optional[torch.Tensor] = None,
    gate: Optional["LinearBase"] = None,
    row_idx: Optional[torch.Tensor] = None,
):
    """Update fields of the process-wide FlashCommon3Context.

    The context is created lazily on first use.  Only arguments that are
    not None overwrite the corresponding field, so callers can update
    fields incrementally.  `gate` and `row_idx` are accepted as well so
    every field of FlashCommon3Context can be populated through this
    setter (previously only four of the six fields were settable); the
    new parameters default to None, keeping existing call sites intact.
    """
    global _flash_common3_context
    if _flash_common3_context is None:
        _flash_common3_context = FlashCommon3Context()

    # None means "leave this field unchanged".
    if topk_weights is not None:
        _flash_common3_context.topk_weights = topk_weights
    if topk_ids is not None:
        _flash_common3_context.topk_ids = topk_ids
    if shared_experts is not None:
        _flash_common3_context.shared_experts = shared_experts
    if shared_out is not None:
        _flash_common3_context.shared_out = shared_out
    if gate is not None:
        _flash_common3_context.gate = gate
    if row_idx is not None:
        _flash_common3_context.row_idx = row_idx

+ 87
- 21
vllm_ascend/ops/fused_moe/fused_moe.py View File

@@ -37,9 +37,12 @@ from vllm_ascend.ascend_forward_context import MoECommType
from vllm_ascend.distributed.parallel_state import get_mc2_group
from vllm_ascend.eplb.core.eplb_utils import determine_default_log2phy_map
from vllm_ascend.eplb.utils import moe_load_async_stream
from vllm_ascend.flash_common3_context import (get_flash_common3_context,
set_flash_common3_context)
from vllm_ascend.ops.expert_load_balancer import ExpertLoadBalancer
from vllm_ascend.ops.fused_moe.experts_selector import select_experts
from vllm_ascend.ops.fused_moe.moe_comm_method import setup_moe_comm_method
from vllm_ascend.ops.fused_moe.moe_comm_method import (AllGatherCommImpl,
setup_moe_comm_method)
from vllm_ascend.ops.fused_moe.prepare_finalize import QuantType
from vllm_ascend.quantization.w4a8_dynamic import \
AscendW4A8DynamicFusedMoEMethod
@@ -139,6 +142,7 @@ class AscendUnquantizedFusedMoEMethod(UnquantizedFusedMoEMethod):

class AscendFusedMoE(FusedMoE):
moe_counter = -1
gate_stream: Optional[torch.npu.Stream] = None

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
@@ -170,6 +174,10 @@ class AscendFusedMoE(FusedMoE):
self.expert_map_path = ascend_config.expert_map_path
self.global_redundant_expert_num = ascend_config.init_redundancy_expert
self.global_num_experts = num_experts + self.global_redundant_expert_num
# flashcommon3 gate stream
self.multistream_overlap_gate = ascend_config.multistream_overlap_gate
if self.multistream_overlap_gate and AscendFusedMoE.gate_stream is None:
AscendFusedMoE.gate_stream = torch.npu.Stream()
if self.custom_routing_function is None and self.e_score_correction_bias is not None:
vllm_config = get_current_vllm_config()
self.e_score_correction_bias.data = self.e_score_correction_bias.data.to(
@@ -332,6 +340,47 @@ class AscendFusedMoE(FusedMoE):
enable_force_load_balance = forward_context.in_profile_run

forward_context = get_forward_context()
if self.multistream_overlap_gate:
assert AscendFusedMoE.gate_stream is not None
fc3_context = get_flash_common3_context()
assert fc3_context is not None
AscendFusedMoE.gate_stream.wait_stream(torch.npu.current_stream())
with npu_stream_switch(AscendFusedMoE.gate_stream,
enabled=self.multistream_overlap_gate):
# share_expert
assert fc3_context.shared_experts is not None
shared_out = fc3_context.shared_experts(hidden_states)
# NOTE: This is exactly the opposite of `maybe_all_reduce_tensor_model_parallel`
moe_comm_type = forward_context.moe_comm_type
if moe_comm_type in {MoECommType.ALLTOALL, MoECommType.MC2} \
and not shared_expert_dp_enabled():
shared_out = tensor_model_parallel_all_reduce(shared_out)
set_flash_common3_context(shared_out=shared_out)

topk_weights, topk_ids = select_experts(
hidden_states=hidden_states,
router_logits=router_logits,
top_k=self.top_k,
use_grouped_topk=self.use_grouped_topk,
renormalize=self.renormalize,
topk_group=self.topk_group,
num_expert_group=self.num_expert_group,
custom_routing_function=self.custom_routing_function,
scoring_func=self.scoring_func,
routed_scaling_factor=self.routed_scaling_factor,
e_score_correction_bias=self.e_score_correction_bias,
global_num_experts=self.global_num_experts)

if isinstance(forward_context.moe_comm_method,
AllGatherCommImpl):
topk_weights = torch.ops.vllm.maybe_all_gather_and_maybe_unpad(
topk_weights, True, True)
topk_ids = torch.ops.vllm.maybe_all_gather_and_maybe_unpad(
topk_ids, True, True)

set_flash_common3_context(topk_weights=topk_weights,
topk_ids=topk_ids)

hidden_states, router_logits, mc2_mask, context_metadata = forward_context.moe_comm_method.prepare(
hidden_states=hidden_states,
router_logits=router_logits,
@@ -339,6 +388,10 @@ class AscendFusedMoE(FusedMoE):
enable_shared_expert_dp=self.enable_shared_expert_dp,
quant_type=self.quant_type)

# Make sure the default stream waits for the gate stream to finish.
if self.multistream_overlap_gate:
torch.npu.current_stream().wait_stream(AscendFusedMoE.gate_stream)

if isinstance(hidden_states, tuple):
hidden_states, pertoken_scale = hidden_states
else:
@@ -407,6 +460,7 @@ class AscendSharedFusedMoE(SharedFusedMoE, AscendFusedMoE):
self.shared_expert_stream = None
ascend_config = get_ascend_config()
self.multistream_overlap_shared_expert = ascend_config.multistream_overlap_shared_expert
self.multistream_overlap_gate = ascend_config.multistream_overlap_gate
if enable_sp():
logger.info_once(
"Sequence parallelism is enabled, shared experts are replicated for best performance."
@@ -443,30 +497,42 @@ class AscendSharedFusedMoE(SharedFusedMoE, AscendFusedMoE):

# NOTE(review): this span is a unified diff rendered without +/- markers —
# the removed (pre-change) and added (post-change) bodies of this method are
# interleaved below, so the text is NOT valid Python as-is. Comments only;
# code left byte-identical. Recover the real method from the repository
# before editing.
def forward_impl(self, hidden_states: torch.Tensor,
router_logits: torch.Tensor):
# NOTE(review): the lines from here down to the first `shared_out = None`
# appear to be the pre-change (removed) variant, which always ran the
# shared experts on a side stream — TODO confirm against the actual diff.
# Make sure the shared experts stream begins after hidden_states are ready.
if self.multistream_overlap_shared_expert:
shared_experts_calculation_stream().wait_stream( # type: ignore
torch.npu.current_stream())
with npu_stream_switch(shared_experts_calculation_stream(),
enabled=self.multistream_overlap_shared_expert):
# Use a separate stream to run shared experts.
# Note that currently we only support calculations in separate streams with aclgraph.
# Communication operations in another stream might cause unknown errors.
shared_out = self._shared_experts(hidden_states)
# NOTE(review): post-change (added) variant starts here — when
# multistream_overlap_gate is set, the shared experts are presumably
# deferred via set_flash_common3_context instead of being run inline;
# verify against vllm_ascend/flash_common3_context.py.
shared_out = None
if not self.multistream_overlap_gate:
# Make sure the shared experts stream begins after hidden_states are ready.
if self.multistream_overlap_shared_expert:
shared_experts_calculation_stream(
).wait_stream( # type: ignore
torch.npu.current_stream())
with npu_stream_switch(
shared_experts_calculation_stream(),
enabled=self.multistream_overlap_shared_expert):
# Use a separate stream to run shared experts.
shared_out = self._shared_experts(hidden_states)
else:
set_flash_common3_context(shared_experts=self._shared_experts)

fused_output = AscendFusedMoE.forward_impl(
self,
hidden_states=hidden_states,
router_logits=router_logits,
)
# NOTE(review): pre-change (removed) epilogue below — it all-reduces for
# MoECommType.FUSED_ALLTOALL as well, which the added epilogue further down
# drops; confirm whether that comm type was intentionally removed.
# Make sure the default stream waits for the shared experts stream to finish.
if self.multistream_overlap_shared_expert:
torch.npu.current_stream().wait_stream(
shared_experts_calculation_stream())
# NOTE: This is exactly the opposite of `maybe_all_reduce_tensor_model_parallel`
forward_context = get_forward_context()
moe_comm_type = forward_context.moe_comm_type
if moe_comm_type in {MoECommType.ALLTOALL, MoECommType.MC2, MoECommType.FUSED_ALLTOALL} \
and not shared_expert_dp_enabled():
shared_out = tensor_model_parallel_all_reduce(shared_out)

# NOTE(review): post-change (added) epilogue — with gate overlap the
# shared-experts output is presumably fetched from the flash-common3
# context rather than computed here; TODO confirm.
if not self.multistream_overlap_gate:
# Make sure the default stream waits for the shared experts stream to finish.
if self.multistream_overlap_shared_expert:
torch.npu.current_stream().wait_stream(
shared_experts_calculation_stream())

# NOTE: This is exactly the opposite of `maybe_all_reduce_tensor_model_parallel`
forward_context = get_forward_context()
moe_comm_type = forward_context.moe_comm_type
if moe_comm_type in {MoECommType.ALLTOALL, MoECommType.MC2} \
and not shared_expert_dp_enabled():
shared_out = tensor_model_parallel_all_reduce(shared_out)
else:
fc3_context = get_flash_common3_context()
assert fc3_context is not None
shared_out = fc3_context.shared_out

return shared_out, fused_output

+ 29
- 5
vllm_ascend/ops/fused_moe/prepare_finalize.py View File

@@ -29,7 +29,10 @@ from vllm.distributed.parallel_state import (
from vllm.forward_context import get_forward_context
from vllm.model_executor.layers.fused_moe import FusedMoEConfig
from vllm_ascend.utils import enable_sp, prefill_context_parallel_enable
from vllm_ascend.ascend_config import get_ascend_config
from vllm_ascend.distributed.utils import fc3_all_gather_and_maybe_unpad_impl
from vllm_ascend.utils import (enable_sp, npu_stream_switch,
prefill_context_parallel_enable)
class QuantType(Enum):
@@ -49,9 +52,14 @@ class PrepareAndFinalize(ABC):
moe_config (FusedMoEConfig): Configuration object containing TP/DP/EP group info,
sizes, ranks, and communication settings.
"""
quant_stream: Optional[torch.npu.Stream] = None
def __init__(self, moe_config: FusedMoEConfig):
    """Store the MoE configuration and set up gate-overlap state.

    Reads ``multistream_overlap_gate`` from the Ascend config and, when it
    is enabled, lazily allocates the class-level ``quant_stream`` singleton
    the first time any instance needs it; later instances reuse it.
    """
    self.moe_config = moe_config
    cfg = get_ascend_config()
    self.multistream_overlap_gate = cfg.multistream_overlap_gate
    stream_missing = PrepareAndFinalize.quant_stream is None
    if self.multistream_overlap_gate and stream_missing:
        # First construction with gate overlap enabled creates the
        # shared NPU stream used for quantization overlap.
        PrepareAndFinalize.quant_stream = torch.npu.Stream()
@abstractmethod
def prepare(
@@ -335,12 +343,28 @@ class PrepareAndFinalizeWithAllGather(PrepareAndFinalize):
if quant_type == QuantType.W8A8:
hidden_states, pertoken_scale = torch_npu.npu_dynamic_quant(
hidden_states)
if self.multistream_overlap_gate:
assert PrepareAndFinalize.quant_stream is not None
PrepareAndFinalize.quant_stream.wait_stream(
torch.npu.current_stream())
with npu_stream_switch(PrepareAndFinalize.quant_stream,
enabled=self.multistream_overlap_gate):
hidden_states = fc3_all_gather_and_maybe_unpad_impl(
hidden_states)
else:
hidden_states = torch.ops.vllm.maybe_all_gather_and_maybe_unpad(
hidden_states, True, True)
router_logits = torch.ops.vllm.maybe_all_gather_and_maybe_unpad(
router_logits, True, True)
if pertoken_scale is not None:
pertoken_scale = torch.ops.vllm.maybe_all_gather_and_maybe_unpad(
pertoken_scale, True, True)
hidden_states = torch.ops.vllm.maybe_all_gather_and_maybe_unpad(
hidden_states, True, True)
router_logits = torch.ops.vllm.maybe_all_gather_and_maybe_unpad(
router_logits, True, True)
if self.multistream_overlap_gate:
torch.npu.current_stream().wait_stream(
PrepareAndFinalize.quant_stream)
if pertoken_scale is not None:
return (hidden_states, pertoken_scale), router_logits, None, None


+ 23
- 13
vllm_ascend/quantization/w8a8_dynamic.py View File

@@ -26,6 +26,7 @@ from vllm.forward_context import get_forward_context
from vllm_ascend.ascend_config import get_ascend_config
from vllm_ascend.ascend_forward_context import MoECommType
from vllm_ascend.distributed.parallel_state import get_mc2_group
from vllm_ascend.flash_common3_context import get_flash_common3_context
from vllm_ascend.ops.fused_moe.experts_selector import select_experts
from vllm_ascend.utils import ACL_FORMAT_FRACTAL_NZ, is_enable_nz

@@ -114,6 +115,7 @@ class AscendW8A8DynamicFusedMoEMethod:
self.use_aclgraph = (vllm_config.compilation_config.mode
== CompilationMode.VLLM_COMPILE
and not vllm_config.model_config.enforce_eager)
self.multistream_overlap_gate = ascend_config.multistream_overlap_gate

self.dynamic_eplb = ascend_config.dynamic_eplb or ascend_config.expert_map_record_path
self.in_dtype = vllm_config.model_config.dtype
@@ -198,19 +200,26 @@ class AscendW8A8DynamicFusedMoEMethod:
assert router_logits.shape[
1] == global_num_experts - global_redundant_expert_num, "Number of global experts mismatch (excluding redundancy)"

topk_weights, topk_ids = select_experts(
hidden_states=x,
router_logits=router_logits,
top_k=top_k,
use_grouped_topk=use_grouped_topk,
renormalize=renormalize,
topk_group=topk_group,
num_expert_group=num_expert_group,
custom_routing_function=custom_routing_function,
scoring_func=scoring_func,
e_score_correction_bias=e_score_correction_bias,
global_num_experts=global_num_experts)

if self.multistream_overlap_gate:
fc3_context = get_flash_common3_context()
assert fc3_context is not None
topk_weights = fc3_context.topk_weights
topk_ids = fc3_context.topk_ids
else:
topk_weights, topk_ids = select_experts(
hidden_states=x,
router_logits=router_logits,
top_k=top_k,
use_grouped_topk=use_grouped_topk,
renormalize=renormalize,
topk_group=topk_group,
num_expert_group=num_expert_group,
custom_routing_function=custom_routing_function,
scoring_func=scoring_func,
e_score_correction_bias=e_score_correction_bias,
global_num_experts=global_num_experts)
assert topk_ids is not None
assert topk_weights is not None
# this is a naive implementation for experts load balance so as
# to avoid accumulating too much tokens on a single rank.
# currently it is only activated when doing profile runs.
@@ -222,6 +231,7 @@ class AscendW8A8DynamicFusedMoEMethod:
topk_ids = torch.argsort(
random_matrix, dim=1)[:, :topk_ids.size(1)].to(topk_ids.dtype)

assert topk_weights is not None
topk_weights = topk_weights.to(self.in_dtype)

moe_comm_method = get_forward_context().moe_comm_method


Loading…
Cancel
Save
Baidu
map