# PicoAudio2/utils/lr_scheduler_utilities.py
from typing import Any
import math
import copy
from torch.utils.data import DataLoader


def get_warmup_steps(
    dataloader_one_pass_outside_steps: int,
    warmup_steps: int | None = None,
    warmup_epochs: float | None = None,
    epoch_length: int | None = None,
) -> int:
    """
    Derive the number of warmup steps from either an explicit step count or
    an epoch count.

    If `warmup_steps` is provided, it is returned as-is. Otherwise, the warmup
    steps are derived from the epoch length and the number of warmup epochs.
    """
    if warmup_steps is not None:
        return warmup_steps
    if epoch_length is None:
        epoch_length = dataloader_one_pass_outside_steps
    assert warmup_epochs is not None, \
        "warmup_steps and warmup_epochs cannot both be None"
    return int(epoch_length * warmup_epochs)
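
# Illustrative usage (the numbers are assumptions, not PicoAudio2 defaults):
#   get_warmup_steps(250, warmup_steps=100)                     -> 100 (explicit value wins)
#   get_warmup_steps(250, warmup_epochs=0.5)                    -> 125 (half of one pass)
#   get_warmup_steps(250, warmup_epochs=2.0, epoch_length=300)  -> 600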


def get_dataloader_one_pass_outside_steps(
    train_dataloader: DataLoader,
    num_processes: int = 1,
) -> int:
    """
    Dataloader length after DDP sharding, close to
    `original_length / num_processes`.
    """
    return math.ceil(len(train_dataloader) / num_processes)
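
# Illustrative (assumed numbers): a dataloader with len() == 1000 sharded over
# 4 processes yields math.ceil(1000 / 4) = 250 outside steps per pass:
#   get_dataloader_one_pass_outside_steps(train_dataloader, num_processes=4) -> 250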


def get_total_training_steps(
    train_dataloader: DataLoader,
    epochs: int,
    num_processes: int = 1,
    epoch_length: int | None = None,
) -> int:
    """
    Calculate the total number of "visible" training steps.

    If `epoch_length` is provided, it is used as the fixed length of each epoch.
    Otherwise, the epoch length is derived from `train_dataloader`.

    Args:
        train_dataloader:
            Training dataloader object.
        epochs:
            The total number of epochs to run.
        num_processes:
            The number of parallel processes used for distributed training.
        epoch_length:
            A fixed number of training steps per epoch. Defaults to None.

    Returns:
        int: The total number of training steps (i.e., `epochs * epoch_length`).
    """
    if epoch_length is None:
        # Fall back to the length of the DDP-wrapped `train_dataloader`.
        epoch_length = get_dataloader_one_pass_outside_steps(
            train_dataloader, num_processes
        )
    return epochs * epoch_length
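
# Illustrative (assumed numbers): with 250 outside steps per pass and epochs=10,
#   get_total_training_steps(train_dataloader, epochs=10, num_processes=4) -> 2500
# Passing epoch_length=300 instead would give 10 * 300 = 3000.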


def get_dataloader_one_pass_steps_inside_accelerator(
    dataloader_one_pass_steps: int,
    gradient_accumulation_steps: int,
    num_processes: int,
) -> int:
    """
    Calculate the number of `lr_scheduler.step()` calls made inside accelerate
    during a single pass over the dataloader, accounting for gradient
    accumulation and distributed training.

    Args:
        dataloader_one_pass_steps:
            The number of steps (batches) in one pass over the dataset.
        gradient_accumulation_steps:
            The number of steps to accumulate gradients before performing a
            parameter update.
        num_processes:
            The number of parallel processes used for distributed training.

    Returns:
        int: The number of optimizer updates in one pass over the dataset
        (batches divided by `gradient_accumulation_steps`, rounded up),
        multiplied by the number of processes.
    """
    return math.ceil(
        dataloader_one_pass_steps / gradient_accumulation_steps
    ) * num_processes
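
# Illustrative (assumed numbers): 250 outside steps per pass with
# gradient_accumulation_steps=2 and num_processes=4 gives
# math.ceil(250 / 2) * 4 = 500 scheduler steps per pass.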


def get_steps_inside_accelerator_from_outside_steps(
    outside_steps: int,
    dataloader_one_pass_outside_steps: int,
    dataloader_one_pass_steps_inside_accelerator: int,
    gradient_accumulation_steps: int,
    num_processes: int,
) -> int:
    """
    Convert "outside" steps (as observed in a wandb logger or similar context)
    to the corresponding number of "inside" steps (for the accelerate lr
    scheduler). Specifically, the accelerate lr scheduler calls `step()`
    `num_processes` times for every `gradient_accumulation_steps` outside steps.

    Args:
        outside_steps:
            The total number of steps counted outside the accelerate context.
        dataloader_one_pass_outside_steps:
            The number of steps (batches) to complete one pass of the dataloader
            outside accelerate.
        dataloader_one_pass_steps_inside_accelerator:
            The number of `lr_scheduler.step()` calls inside accelerate,
            calculated via `get_dataloader_one_pass_steps_inside_accelerator`.
        gradient_accumulation_steps:
            The number of steps to accumulate gradients.
        num_processes:
            The number of parallel processes (GPUs) used in distributed training.

    Returns:
        int: The total number of `lr_scheduler.step()` calls inside accelerate
        that correspond to the given `outside_steps`.
    """
    num_dataloader_epochs_passed = outside_steps // dataloader_one_pass_outside_steps
    remaining_outside_steps = outside_steps % dataloader_one_pass_outside_steps
    # The accelerate scheduler calls `step()` `num_processes` times every
    # `gradient_accumulation_steps` steps:
    # https://github.com/huggingface/accelerate/blob/main/src/accelerate/scheduler.py#L76
    remaining_inside_accelerator_steps = (
        remaining_outside_steps // gradient_accumulation_steps * num_processes
    )
    total_steps = (
        num_dataloader_epochs_passed
        * dataloader_one_pass_steps_inside_accelerator
        + remaining_inside_accelerator_steps
    )
    return total_steps
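
# Illustrative worked example (assumed numbers): with 250 outside steps per pass,
# 500 inside steps per pass, gradient_accumulation_steps=2 and num_processes=4,
# an outside step count of 620 converts to
#   620 // 250 = 2 full passes   -> 2 * 500 = 1000 inside steps
#   620 %  250 = 120 remaining   -> 120 // 2 * 4 = 240 inside steps
# for a total of 1240 inside steps.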


def lr_scheduler_param_adapter(
    config_dict: dict[str, Any], num_training_steps: int, num_warmup_steps: int
) -> dict[str, Any]:
    """
    If the config's `_target_` is `transformers.get_scheduler`, inject
    `num_training_steps` and `num_warmup_steps`; otherwise return a deep
    copy of the config unchanged.
    """
    target_class = config_dict["_target_"]
    return_dict = copy.deepcopy(config_dict)
    if target_class == "transformers.get_scheduler":
        return_dict.update({
            "num_training_steps": num_training_steps,
            "num_warmup_steps": num_warmup_steps
        })
    return return_dict
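

if __name__ == "__main__":
    # Minimal, illustrative sketch of how these helpers compose. The numbers,
    # the scheduler name, and the choice to express warmup in "outside" steps
    # are assumptions for demonstration, not values taken from PicoAudio2.
    dataloader_one_pass_outside = 250   # e.g. 1000 batches sharded over 4 GPUs
    num_processes = 4
    gradient_accumulation_steps = 2
    epochs = 10

    # In real code these two values would come from
    # get_dataloader_one_pass_outside_steps / get_total_training_steps with the
    # actual DataLoader; fixed numbers are used here to keep the sketch runnable.
    total_outside_steps = epochs * dataloader_one_pass_outside

    one_pass_inside = get_dataloader_one_pass_steps_inside_accelerator(
        dataloader_one_pass_outside, gradient_accumulation_steps, num_processes
    )
    total_inside_steps = get_steps_inside_accelerator_from_outside_steps(
        total_outside_steps, dataloader_one_pass_outside, one_pass_inside,
        gradient_accumulation_steps, num_processes
    )
    warmup_outside = get_warmup_steps(
        dataloader_one_pass_outside, warmup_epochs=1.0
    )

    # Whether the scheduler expects warmup in "outside" or "inside" units
    # depends on how the training loop drives it; outside units are used here
    # purely for illustration.
    scheduler_kwargs = lr_scheduler_param_adapter(
        {"_target_": "transformers.get_scheduler", "name": "cosine"},
        num_training_steps=total_inside_steps,
        num_warmup_steps=warmup_outside,
    )
    print(one_pass_inside, total_inside_steps, warmup_outside, scheduler_kwargs)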