@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
# ============================================================================
"""Trainer API For Import ."""
"""LLMTrainer For Graph Mode ."""
import os
import subprocess
from pprint import pprint
@@ -95,6 +95,8 @@ class LLMTrainer:
self.common_restore_info = None
self.network_delay_inited = False
self.optimizer_delay_inited = False
self.lr_scheduler = None
self.grouped_lr_scheduler = None
def _setup_config(self, config: MindFormerConfig, is_train: bool = True) -> None:
"""Initialize and setup configuration for training or inference.
@@ -286,7 +288,7 @@ class LLMTrainer:
self.config.model.model_config = self.config.model_config
del self.config.model_config
def _set_and_logging_training_step(self, dataset ) -> None:
def _set_and_logging_training_step(self) -> None:
"""Check runner config and set training step parameters.
This method calculates and configures the training steps based on the dataset size,
@@ -294,9 +296,6 @@ class LLMTrainer:
is enabled and sink_size is specified. It also sets initial epoch and step values
for training resumption.
Args:
dataset: Training dataset used to determine data size for calculations.
The method performs the following operations:
1. Gets the training dataset size
2. Sets original epochs value
@@ -305,7 +304,7 @@ class LLMTrainer:
5. Adjusts epochs calculation based on sink mode and sink size
6. Updates configuration with dataset size and training parameters
"""
data_size = self._get_train_dataset_size(dataset )
data_size = self._get_train_dataset_size()
new_epochs = self.config.training_args.epochs
self.config.training_args.origin_epochs = new_epochs
@@ -683,6 +682,61 @@ class LLMTrainer:
logger.info(f'load_checkpoint config is None, which is invalid for run_mode: {self.config.run_mode}')
return load_checkpoint_path_or_file
def _set_learning_rate_scheduler(
self,
lr_scheduler: Optional[nn.learning_rate_schedule.LearningRateSchedule] = None,
grouped_lr_scheduler: List[dict] = None) -> None:
"""Set learning rate scheduler(s) for model training.
This method configures the learning rate scheduling strategy for the optimizer.
It supports two modes:
1. Standard mode: A single learning rate scheduler applied to all model parameters
2. Grouped mode: Different learning rate schedulers for different parameter groups
The grouped learning rate scheduler allows fine-grained control over learning rates
for different parts of the model (e.g., different layers, embeddings, or attention
mechanisms), which is useful for transfer learning, fine-tuning, or when different
components require different learning rate schedules.
Args:
lr_scheduler (Optional[nn.learning_rate_schedule.LearningRateSchedule]):
A single learning rate scheduler instance to be applied uniformly to all
model parameters. This is used when all parameters should follow the same
learning rate schedule. Examples include CosineAnnealingLR, PolynomialDecayLR, etc.
If None, the scheduler will not be set (useful when only grouped schedulers are used).
grouped_lr_scheduler (List[dict], optional):
A list of dictionaries, where each dictionary represents a parameter group
with its own learning rate scheduler. Each dictionary should contain:
- 'params': List[str] - Parameter name patterns to match (supports wildcards)
- 'lr_scheduler': LearningRateSchedule - The scheduler instance for this group
- 'lr_config': MindFormerConfig - Configuration used to create the scheduler
This allows different parameter groups to have independent learning rate schedules.
For example, embeddings might use a different schedule than transformer layers.
If None, grouped learning rate scheduling will not be used.
Note:
- Both schedulers can be set simultaneously. When both are provided, the grouped
scheduler takes precedence for matching parameters, while the standard scheduler
applies to unmatched parameters.
- This method is typically called during optimizer creation in
`create_optimizer_scheduler` method.
- The method will log informational messages when schedulers are successfully set.
Side Effects:
- Sets `self.lr_scheduler` if lr_scheduler is provided
- Sets `self.grouped_lr_scheduler` if grouped_lr_scheduler is provided
- Logs configuration information for debugging purposes
"""
if lr_scheduler is not None:
self.lr_scheduler = lr_scheduler
logger.info("The default learning rate scheduler has been set.")
if grouped_lr_scheduler is not None:
self.grouped_lr_scheduler = grouped_lr_scheduler
logger.info("The group learning rate scheduler has been set.")
def _train_dataset_restore_from_checkpoint(self, dataset: GeneratorDataset,
load_checkpoint_path_or_file: Optional[str]) -> None:
"""Restore training dataset state from checkpoint.
@@ -1137,7 +1191,7 @@ class LLMTrainer:
return model
def create_optimizer_scheduler(self, network: nn.Cell, dataset: GeneratorDataset ) -> nn.Optimizer:
def create_optimizer_scheduler(self, network: nn.Cell) -> nn.Optimizer:
"""Create optimizer and learning rate scheduler for model training.
This method constructs an optimizer with grouped parameters and learning rate schedule
@@ -1146,7 +1200,6 @@ class LLMTrainer:
Args:
network (nn.Cell): The neural network model to optimize.
dataset (GeneratorDataset): Training dataset used for learning rate scheduling.
Returns:
nn.Optimizer: Configured optimizer with learning rate scheduler.
@@ -1165,7 +1218,13 @@ class LLMTrainer:
logger.info("Building optimizer and learning rate scheduler from configuration...")
# Build learning rate schedule
lr_schedule = self.create_lr_scheduler(dataset)
is_grouped_lr_scheduler = self.config.grouped_lr_schedule is not None
grouped_lr_scheduler = None
if is_grouped_lr_scheduler:
lr_scheduler, grouped_lr_scheduler = self.create_grouped_lr_scheduler()
else:
lr_scheduler = self.create_lr_scheduler()
self._set_learning_rate_scheduler(lr_scheduler, grouped_lr_scheduler)
optimizer_type = self.config.optimizer.type
@@ -1181,7 +1240,10 @@ class LLMTrainer:
# Group parameters with weight decay
weight_decay = self.config.optimizer.weight_decay if self.config.optimizer.weight_decay else 0.
group_params = get_optimizer_grouped_parameters(
network, weight_decay, optimizer_type=optimizer_type, model_params=model_params)
network, weight_decay,
optimizer_type=optimizer_type,
grouped_lr_schedule=grouped_lr_scheduler,
model_params=model_params)
def create_optimizer() -> nn.Optimizer:
"""Internal function to create optimizer instance.
@@ -1193,8 +1255,8 @@ class LLMTrainer:
ValueError: If learning_rate is not set in optimizer config.
"""
optimizer_kwargs = {"params": group_params}
if lr_schedule is not None:
optimizer_kwargs["learning_rate"] = lr_schedule
if lr_scheduler is not None:
optimizer_kwargs["learning_rate"] = lr_scheduler
else:
if self.config.optimizer.learning_rate is None:
raise ValueError("lr_schedule is None, please set learning_rate in optimizer config.")
@@ -1221,68 +1283,81 @@ class LLMTrainer:
logger.info("Optimizer and learning rate scheduler created successfully")
return optimizer
def create_lr_scheduler(self, dataset) -> nn.learning_rate_schedule.LearningRateSchedule:
def create_lr_scheduler(
self, lr_schedule_config: MindFormerConfig = None) -> nn.learning_rate_schedule.LearningRateSchedule:
"""Create learning rate scheduler based on configuration.
This method builds a learning rate scheduler by processing the configuration settings
and calculating the appropriate parameters for the learning rate schedule.
and calculating the appropriate parameters for the learning rate schedule. It handles
warmup configuration, total steps calculation, and scheduler instantiation.
Args:
dataset: Training dataset used to calculate total training steps.
lr_schedule_config (MindFormerConfig, optional):
Learning rate scheduler configuration. If None, uses `self.config.lr_schedule`.
The configuration should contain:
- `type`: str - Scheduler type (e.g., "CosineWithWarmUpLR", "PolynomialWithWarmUpLR",
"ConstantWarmUpLR", "CosineAnnealingLR", etc.)
- `learning_rate`: float - Base learning rate value
- `warmup_ratio`: float, optional - Warmup ratio (required if warmup_epochs is set)
- `warmup_steps`: int, optional - Number of warmup steps (can be set directly)
- `warmup_lr_init`: float, optional - Initial learning rate during warmup
- `total_steps`: int, optional - Total training steps (-1 means auto-calculate)
- Other scheduler-specific parameters (e.g., `min_lr`, `max_lr`, `decay_steps`, etc.)
Returns:
object: Built learning rate scheduler instance.
Raises:
ValueError: If warmup_epochs is not a non-negative integer.
The method performs the following operations:
- Calculates total training steps based on dataset size and training configuration
- Processes warmup settings (epochs or ratio) and converts warmup_epochs to warmup_steps
- Handles conflicts between warmup_epochs and warmup_ratio settings
- Builds the learning rate scheduler using the `build_lr` function
- Applies default warmup_lr_init value if not explicitly set
nn.learning_rate_schedule.LearningRateSchedule:
Built learning rate scheduler instance. The scheduler will dynamically adjust
the learning rate during training based on the current step. Returns None if
no valid configuration is provided (both lr_schedule_config and self.config.lr_schedule
are None or empty).
Note:
- Total steps calculation:
- If `sink_mode` is False: total_steps = epochs * train_dataset_size
- If `sink_mode` is True: total_steps = epochs * sink_size
- If `total_steps` is None or -1 in config, it will be auto-calculated
- `warmup_epochs` is converted to `warmup_steps` using: warmup_steps = warmup_epochs * train_dataset_size
- The method modifies the input config by popping `warmup_epochs` (if present)
- If `warmup_lr_init` is not set and the scheduler supports it, a default value will be used
- Supported scheduler types include: CosineWithWarmUpLR, PolynomialWithWarmUpLR,
ConstantWarmUpLR, CosineAnnealingLR, CosineWithRestartsAndWarmUpLR, etc.
Example:
Configuration example:
```
lr_schedule:
type: "CosineWithWarmUpLR"
learning_rate: 1e-4
warmup_ratio: 0.1
total_steps: -1 # Auto-calculate
```
"""
logger.info("Building learning rate scheduler from configuration...")
train_data_size = self._get_train_dataset_size(dataset)
train_dataset _size = self._get_train_dataset_size()
warmup_lr_init = None
lr_schedule_config = self.config.lr_schedule if lr_schedule_config is None else lr_schedule_config
if self.config.lr_schedule:
warmup_epochs = self.config.lr_schedule.pop("warmup_epochs", None)
warmup_lr_init = self.config.lr_schedule.get("warmup_lr_init", None)
if warmup_epochs is not None:
if not isinstance(warmup_epochs, int):
raise ValueError(f"The type of warmup_epochs must be int, but got type {type(warmup_epochs)}.")
if warmup_epochs < 0:
raise ValueError(f"The value of warmup_epochs must be non-negative integer, "
f"but got {warmup_epochs}.")
if lr_schedule_config:
warmup_lr_init = lr_schedule_config.get("warmup_lr_init", 0.)
warmup_steps = lr_schedule_config.get("warmup_steps", 0)
warmup_ratio = lr_schedule_config.get("warmup_ratio", 0.)
lr_schedule_config.warmup_steps = warmup_steps
lr_schedule_config.warmup_ratio = warmup_ratio
# Calculate total training steps
if not self.config.training_args.sink_mode:
total_steps = int(self.config.training_args.epochs * train_data_size)
total_steps = int(self.config.training_args.epochs * train_dataset_size)
else:
total_steps = int(self.config.training_args.epochs * self.config.training_args.sink_size)
# Handle conflicts between warmup_epochs and warmup_ratio
if warmup_epochs is not None and self.config.lr_schedule.warmup_ratio is not None:
logger.warning("warmup_epochs and warmup_ratio are set simultaneously, "
"warmup_ratio takes precedence.")
warmup_epochs = None
# Convert warmup_epochs to warmup_steps
if warmup_epochs is not None:
logger.info("warmup_epochs was set in lr_schedule, "
"converting to warmup_steps based on dataset size")
self.config.lr_schedule.warmup_steps = int(warmup_epochs * train_data_size)
# Set total_steps in lr_schedule
self.config.lr_schedule.total_steps = total_steps \
if self.config.lr_schedule.total_steps is None or self.config.lr_schedule.total_steps == -1 \
else int(self.config.lr_schedule.total_steps)
# Set total_steps in `lr_schedule` if not explicitly defined
if lr_schedule_config.total_steps is None or lr_schedule_config.total_steps == -1:
lr_schedule_config.total_steps = total_steps
else:
lr_schedule_config.total_steps = int(lr_schedule_config.total_steps)
# Build learning rate scheduler
lr_schedule = build_lr(self.config. lr_schedule)
lr_schedule = build_lr(lr_schedule_config)
# Apply default warmup_lr_init if not set
if lr_schedule and hasattr(lr_schedule, "warmup_lr_init") and warmup_lr_init is None:
@@ -1291,6 +1366,95 @@ class LLMTrainer:
logger.info("Learning rate scheduler created successfully")
return lr_schedule
def create_grouped_lr_scheduler(self) -> tuple[nn.learning_rate_schedule.LearningRateSchedule, list[dict]]:
"""Create grouped learning rate schedulers from configuration.
This method builds a set of learning rate schedulers for different parameter groups,
allowing fine-grained control over learning rates for different parts of the model.
It creates a default scheduler for unmatched parameters and multiple group-specific
schedulers based on parameter name patterns.
The configuration structure (`self.config.grouped_lr_schedule`) should contain:
- `default`: Configuration for the default learning rate scheduler (applied to all
parameters that don't match any group patterns)
- `grouped`: A list of dictionaries, each containing:
- `params`: List[str] - Parameter name patterns to match (supports wildcards via fnmatch)
- Other LR scheduler configuration keys (e.g., `type`, `learning_rate`, `warmup_steps`, etc.)
This is particularly useful for:
- Transfer learning: Different learning rates for pretrained vs. new layers
- Fine-tuning: Lower learning rates for embeddings, higher for task-specific layers
- Layer-wise decay: Gradually decreasing learning rates from top to bottom layers
- Component-specific schedules: Different schedules for attention, MLP, embeddings, etc.
Returns:
tuple[nn.learning_rate_schedule.LearningRateSchedule, list[dict]]:
A tuple containing:
- Default learning rate scheduler: Applied to parameters that don't match
any group patterns in the grouped configuration
- Grouped learning rate scheduler list: A list of dictionaries, each containing:
- 'params': List[str] - Parameter name patterns for this group
- 'lr_scheduler': LearningRateSchedule - The scheduler instance for this group
- 'lr_config': MindFormerConfig - The configuration used to create this scheduler
Raises:
ValueError: If any group configuration in `grouped` is missing the 'params' field
or if 'params' is empty.
Note:
- Parameter matching is done by name patterns, supporting wildcards (e.g., "*.embedding*")
- The default scheduler is always created and will be used for unmatched parameters
- Each group can have its own independent learning rate schedule configuration
- The method modifies the input configuration dictionaries by popping 'params'
Example:
Configuration structure:
```
grouped_lr_schedule:
default:
type: "ConstantWarmUpLR"
learning_rate: 1.e-4
warmup_ratio: 0.
total_steps: -1
grouped:
- params: ["embedding*"]
type: "CosineWithWarmUpLR"
learning_rate: 1.e-5
warmup_ratio: 0.
total_steps: -1
- params: ["*.self_attention*"]
type: "PolynomialWithWarmUpLR"
learning_rate: 2.e-4
warmup_ratio: 0.
total_steps: -1
```
"""
logger.info("Building grouped learning rate scheduler from configuration...")
default_lr_schedule_config = self.config.grouped_lr_schedule.default
default_lr_scheduler = self.create_lr_scheduler(default_lr_schedule_config)
grouped_lr_scheduler = []
grouped_config = self.config.grouped_lr_schedule.grouped
# Iterate over each grouped LR configuration
for lr_config in grouped_config:
params = lr_config.pop('params', None)
if not params or not isinstance(params, list):
raise ValueError(
"Got invalid 'params' in grouped_lr_schedule.grouped: each item must include "
"a non-empty 'params' list."
)
lr_config = MindFormerConfig(**lr_config)
lr_scheduler = self.create_lr_scheduler(lr_config)
grouped_lr_scheduler.append({
'params': params,
'lr_scheduler': lr_scheduler,
'lr_config': lr_config
})
return default_lr_scheduler, grouped_lr_scheduler
def create_model_wrapper(self, network: nn.Cell, optimizer: nn.Optimizer) -> Union[
MFPipelineWithLossScaleCell, MFTrainOneStepCell]:
"""Create model wrapper with training tools and configurations.
@@ -1339,6 +1503,8 @@ class LLMTrainer:
use_skip_data_by_global_norm=use_skip_data_by_global_norm,
global_norm_spike_threshold=global_norm_spike_threshold,
print_separate_loss=self.config.training_args.get("print_separate_loss", True),
lr_scheduler=self.lr_scheduler,
grouped_lr_scheduler=self.grouped_lr_scheduler,
)
logger.info("Created MFPipelineWithLossScaleCell model wrapper for pipeline parallel training")
else:
@@ -1355,6 +1521,8 @@ class LLMTrainer:
use_skip_data_by_global_norm=use_skip_data_by_global_norm,
global_norm_spike_threshold=global_norm_spike_threshold,
print_separate_loss=self.config.training_args.get("print_separate_loss", True),
lr_scheduler=self.lr_scheduler,
grouped_lr_scheduler=self.grouped_lr_scheduler,
)
logger.info("Created MFTrainOneStepCell model wrapper for standard training")
@@ -1489,7 +1657,7 @@ class LLMTrainer:
self._train_dataset_restore_from_checkpoint(train_dataset, load_checkpoint_path_or_file)
# Configure training steps and epochs
self._set_and_logging_training_step(train_dataset )
self._set_and_logging_training_step()
# Record configuration to global environment
self._record_config_to_global_envs()
@@ -1507,7 +1675,7 @@ class LLMTrainer:
self._count_parameters(network, run_mode=self.config.run_mode)
# Create optimizer and learning rate scheduler
optimizer = self.create_optimizer_scheduler(network, train_dataset )
optimizer = self.create_optimizer_scheduler(network)
# Wrap model with training tools
model_forward_and_backward_wrapper = self.create_model_wrapper(network, optimizer)
@@ -1807,7 +1975,7 @@ class LLMTrainer:
if units == 'M':
count_params_m = sum(total_params) / 1e6
trainable_params_m = sum(trainable_params) / 1e6
if run_mode in ['train', 'finetune']:
if run_mode in ['train', 'finetune']:
logger.info(f"Network Parameters: {count_params_m:.0f} M.")
logger.info(f"Network Trainable Parameters: {trainable_params_m:.0f} M.")
else:
@@ -1815,7 +1983,7 @@ class LLMTrainer:
elif units == 'B':
count_params_m = sum(total_params) / 1e9
trainable_params_m = sum(trainable_params) / 1e9
if run_mode in ['train', 'finetune']:
if run_mode in ['train', 'finetune']:
logger.info(f"Network Parameters: {count_params_m:.1f} B.")
logger.info(f"Network Trainable Parameters: {trainable_params_m:.1f} B.")
else:
@@ -1894,21 +2062,19 @@ class LLMTrainer:
raise ValueError(f"{checkpoint} does not exist, please check load_checkpoint in yaml and set a correct value.")
@staticmethod
def _get_train_dataset_size(train_dataset: GeneratorDataset) -> int:
def _get_train_dataset_size(self) -> int:
"""Get the size of the training dataset.
This static method retrieves the total number of samples in the
training dataset, which is used for calculating training steps
and epochs.
Args:
train_dataset (GeneratorDataset): Training dataset instance.
Returns:
int: Total number of samples in the training dataset.
"""
return train_dataset.get_dataset_size()
if self.train_dataset is None:
raise RuntimeError("Please set train_dataset in yaml.")
return self.train_dataset.get_dataset_size()
@staticmethod
def _check_auto_parallel_mode_valid() -> bool: