@@ -14,6 +14,10 @@
# ============================================================================
"""Test cases for BlendedMegatronDatasetBuilder"""
import os
import subprocess
import time
import glob
from unittest.mock import patch
import pytest
import numpy as np
@@ -23,8 +27,87 @@ from mindformers.dataset.blended_datasets.blended_megatron_dataset_builder impor
_get_size_per_split_per_dataset
)
from mindformers.dataset.blended_datasets.blended_megatron_dataset_config import BlendedMegatronDatasetConfig
from mindformers.dataset.blended_datasets.utils import Split, compile_helpers
from mindformers.dataset.blended_datasets.utils import Split
from mindformers.dataset.blended_datasets import utils as blended_utils_module
from mindformers.tools.logger import logger
try:
from filelock import FileLock
HAS_FILELOCK = True
except ImportError:
FileLock = None
HAS_FILELOCK = False
def _check_helpers_exists(helpers_dir):
"""Check if helpers.so exists and is valid."""
so_pattern = os.path.join(helpers_dir, "helpers*.so")
existing_so_files = glob.glob(so_pattern)
return existing_so_files and any(os.path.getsize(f) > 1000 for f in existing_so_files)
def _compile_helpers_safe(helpers_dir, worker_id):
"""Compile helpers if not already compiled."""
if _check_helpers_exists(helpers_dir):
return
logger.info(f"[{worker_id}] Starting compilation...")
result = subprocess.run(["make", "-C", helpers_dir], capture_output=True, text=True, check=False)
if result.returncode != 0 and not _check_helpers_exists(helpers_dir):
raise RuntimeError(f"Failed to compile helpers: {result.stderr}")
logger.info(f"[{worker_id}] Compilation completed")
@pytest.fixture(scope="session", autouse=True)
def ensure_helpers_compiled(request, tmp_path_factory):
"""Ensure helpers are compiled once with process-safe locking for pytest-xdist."""
# Get worker_id if running with pytest-xdist, otherwise use 'master'
worker_id = getattr(request.config, 'workerinput', {}).get('workerid', 'master')
helpers_dir = os.path.abspath(os.path.dirname(blended_utils_module.__file__))
# Quick check: if already compiled, all workers skip immediately
if _check_helpers_exists(helpers_dir):
logger.info(f"[{worker_id}] helpers.so already exists, using directly")
yield
return
# Single process mode - compile directly
if worker_id == "master":
_compile_helpers_safe(helpers_dir, worker_id)
yield
return
# Parallel mode - use file lock
lock_file = tmp_path_factory.getbasetemp().parent / "helpers_compile.lock"
if HAS_FILELOCK:
with FileLock(str(lock_file), timeout=300):
if not _check_helpers_exists(helpers_dir):
_compile_helpers_safe(helpers_dir, worker_id)
else:
# Fallback: simple atomic lock without filelock library
for _ in range(600): # 5 min timeout (600 * 0.5s)
try:
fd = os.open(str(lock_file), os.O_CREAT | os.O_EXCL | os.O_WRONLY)
try:
if not _check_helpers_exists(helpers_dir):
_compile_helpers_safe(helpers_dir, worker_id)
finally:
os.close(fd)
os.unlink(str(lock_file))
break
except FileExistsError:
time.sleep(0.5)
if _check_helpers_exists(helpers_dir):
break
else:
raise TimeoutError("Timeout waiting for helpers compilation")
yield
class DummyTokenizer:
"""A dummy tokenizer for testing purposes"""
@@ -166,7 +249,7 @@ def create_test_builder(config_kwargs=None, builder_kwargs=None):
class TestBlendedMegatronDatasetBuilder:
"""Test class for BlendedMegatronDatasetBuilder"""
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_builder_initialization(self):
@@ -191,7 +274,7 @@ class TestBlendedMegatronDatasetBuilder:
assert builder.is_built_on_rank is is_built_on_rank_func
assert builder.config == config
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_builder_initialization_assertion_error(self):
@@ -213,7 +296,7 @@ class TestBlendedMegatronDatasetBuilder:
config=config
)
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_get_size_per_split_per_dataset(self):
@@ -236,7 +319,7 @@ class TestBlendedMegatronDatasetBuilder:
assert result[0] == expected_0
assert result[1] == expected_1
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_generic_dataset(self):
@@ -270,7 +353,7 @@ class TestBlendedMegatronDatasetBuilder:
assert isinstance(dataset, DummyMegatronDataset)
assert len(dataset) == num_samples
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_generic_dataset_with_oserror(self):
@@ -293,7 +376,7 @@ class TestBlendedMegatronDatasetBuilder:
True
)
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_generic_dataset_distributed_rank_nonzero(self):
@@ -324,7 +407,7 @@ class TestBlendedMegatronDatasetBuilder:
assert isinstance(dataset, DummyMegatronDataset)
assert len(dataset) == 5
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_generic_dataset_distributed_rank_zero_not_built(self):
@@ -354,7 +437,7 @@ class TestBlendedMegatronDatasetBuilder:
assert dataset is None
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_method_mock_dataset(self):
@@ -380,7 +463,7 @@ class TestBlendedMegatronDatasetBuilder:
assert isinstance(datasets, list)
assert len(datasets) == len(Split)
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_method_mock_dataset_failure(self):
@@ -427,7 +510,7 @@ class TestBlendedMegatronDatasetBuilder:
match="FailingDummyMegatronDataset failed to build as a mock data generator"):
builder.build()
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_blended_dataset_single_prefix(self):
@@ -455,7 +538,7 @@ class TestBlendedMegatronDatasetBuilder:
assert isinstance(datasets, list)
assert len(datasets) == len(Split)
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_with_blend_per_split(self):
@@ -484,7 +567,7 @@ class TestBlendedMegatronDatasetBuilder:
assert isinstance(datasets, list)
assert len(datasets) == len(Split)
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_with_blend_per_split_single_prefix(self):
@@ -513,7 +596,7 @@ class TestBlendedMegatronDatasetBuilder:
assert isinstance(datasets, list)
assert len(datasets) == len(Split)
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_with_blend_weights_and_size(self):
@@ -522,7 +605,6 @@ class TestBlendedMegatronDatasetBuilder:
Description: Test build method works with blend configuration having weights and size
Expectation: Method builds datasets correctly with weights processing
"""
compile_helpers()
config = create_test_config()
config.mock = False
config.blend = (["prefix1", "prefix2"], [0.3, 0.7])
@@ -542,7 +624,7 @@ class TestBlendedMegatronDatasetBuilder:
assert isinstance(datasets, list)
assert len(datasets) == len(Split)
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_verification_logic(self):
@@ -592,7 +674,7 @@ class TestBlendedMegatronDatasetBuilder:
assert isinstance(datasets, list)
assert len(datasets) == 1
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_verification_logic_cached_no_check(self):
@@ -637,7 +719,7 @@ class TestBlendedMegatronDatasetBuilder:
assert isinstance(datasets, list)
assert len(datasets) == 1
@pytest.mark.level0
@pytest.mark.level1
@pytest.mark.platform_x86_cpu
@pytest.mark.env_onecard
def test_build_parallel_datasets(self):
@@ -646,7 +728,6 @@ class TestBlendedMegatronDatasetBuilder:
Description: Test parallel building of megatron datasets
Expectation: Method builds datasets in parallel correctly
"""
compile_helpers()
config = create_test_config()
config.mock = False
config.blend = (["prefix1", "prefix2"], [0.5, 0.5])