from typing import Iterator, List, Optional, Union
from collections import Counter
import logging
from operator import itemgetter
import random

import numpy as np
from torch.utils.data import Dataset, DistributedSampler
from torch.utils.data.sampler import Sampler

LOGGER = logging.getLogger(__name__)
class DatasetFromSampler(Dataset):
    """Dataset to create indexes from `Sampler`.

    Args:
        sampler: PyTorch sampler
    """

    def __init__(self, sampler: Sampler):
        """Initialisation for DatasetFromSampler."""
        self.sampler = sampler
        self.sampler_list = None

    def __getitem__(self, index: int):
        """Gets element of the dataset.

        Args:
            index: index of the element in the dataset

        Returns:
            Single element by index
        """
        if self.sampler_list is None:
            self.sampler_list = list(self.sampler)
        return self.sampler_list[index]

    def __len__(self) -> int:
        """
        Returns:
            int: length of the dataset
        """
        return len(self.sampler)
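
# Usage sketch (illustrative only): DatasetFromSampler is mainly a helper for
# DistributedSamplerWrapper below -- it turns the index sequence produced by
# any sampler into a map-style dataset that DistributedSampler can shard.
#
#   >>> from torch.utils.data import SequentialSampler
#   >>> index_dataset = DatasetFromSampler(SequentialSampler(range(5)))
#   >>> len(index_dataset), index_dataset[2]
#   (5, 2)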
class BalanceClassSampler(Sampler):
    """Allows you to create a stratified sample from unbalanced classes.

    Args:
        labels: list of class labels for each element in the dataset
        mode: strategy to balance classes.
            Must be one of ["downsampling", "upsampling"]
            or an int, the number of samples per class.

    Python API examples:

    .. code-block:: python

        import os
        from torch import nn, optim
        from torch.utils.data import DataLoader
        from catalyst import dl
        from catalyst.data import ToTensor, BalanceClassSampler
        from catalyst.contrib.datasets import MNIST

        train_data = MNIST(os.getcwd(), train=True, download=True, transform=ToTensor())
        train_labels = train_data.targets.cpu().numpy().tolist()
        train_sampler = BalanceClassSampler(train_labels, mode=5000)
        valid_data = MNIST(os.getcwd(), train=False, transform=ToTensor())

        loaders = {
            "train": DataLoader(train_data, sampler=train_sampler, batch_size=32),
            "valid": DataLoader(valid_data, batch_size=32),
        }

        model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.02)
        runner = dl.SupervisedRunner()

        # model training
        runner.train(
            model=model,
            criterion=criterion,
            optimizer=optimizer,
            loaders=loaders,
            num_epochs=1,
            logdir="./logs",
            valid_loader="valid",
            valid_metric="loss",
            minimize_valid_metric=True,
            verbose=True,
        )
    """

    def __init__(self, labels: List[int], mode: Union[str, int] = "downsampling"):
        """Sampler initialisation."""
        super().__init__(labels)
        labels = np.array(labels)
        samples_per_class = {label: (labels == label).sum() for label in set(labels)}
        self.lbl2idx = {
            label: np.arange(len(labels))[labels == label].tolist()
            for label in set(labels)
        }
        if isinstance(mode, str):
            assert mode in ["downsampling", "upsampling"]
        if isinstance(mode, int) or mode == "upsampling":
            samples_per_class = (
                mode if isinstance(mode, int) else max(samples_per_class.values())
            )
        else:
            samples_per_class = min(samples_per_class.values())
        self.labels = labels
        self.samples_per_class = samples_per_class
        self.length = self.samples_per_class * len(set(labels))

    def __iter__(self) -> Iterator[int]:
        """
        Returns:
            iterator of indices of stratified sample
        """
        indices = []
        for key in sorted(self.lbl2idx):
            replace_flag = self.samples_per_class > len(self.lbl2idx[key])
            indices += np.random.choice(
                self.lbl2idx[key], self.samples_per_class, replace=replace_flag
            ).tolist()
        assert len(indices) == self.length
        np.random.shuffle(indices)
        return iter(indices)

    def __len__(self) -> int:
        """
        Returns:
            length of result sample
        """
        return self.length
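
# A minimal sketch of the three `mode` options (toy labels, illustrative only;
# the exact indices drawn are random):
#
#   >>> toy_labels = [0, 0, 0, 0, 1, 1]                            # 4 vs 2 examples
#   >>> len(BalanceClassSampler(toy_labels, mode="downsampling"))  # min class size per class
#   4
#   >>> len(BalanceClassSampler(toy_labels, mode="upsampling"))    # max class size per class
#   8
#   >>> len(BalanceClassSampler(toy_labels, mode=3))               # fixed 3 per class
#   6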
class BatchBalanceClassSampler(Sampler):
    """
    This kind of sampler can be used for both metric learning and classification tasks.

    BatchSampler with the given strategy for a dataset with C unique classes:
    - Selection of `num_classes` of the C classes for each batch
    - Selection of `num_samples` instances for each class in the batch
    The epoch ends after `num_batches`.
    So, the batch size is `num_classes` * `num_samples`.

    One of the purposes of this sampler is to be used for
    forming triplets and pos/neg pairs inside the batch.
    To guarantee the existence of these pairs in the batch,
    `num_classes` and `num_samples` should be > 1. (1)

    This type of sampling can be found in the classic person re-identification paper,
    where P (`num_classes`) equals 32 and K (`num_samples`) equals 4:
    `In Defense of the Triplet Loss for Person Re-Identification`_.

    Args:
        labels: list of class labels for each elem in the dataset
        num_classes: number of classes in a batch, should be > 1
        num_samples: number of instances of each class in a batch, should be > 1
        num_batches: number of batches in epoch
            (default = len(labels) // (num_classes * num_samples))

    .. _In Defense of the Triplet Loss for Person Re-Identification:
        https://arxiv.org/abs/1703.07737

    Python API examples:

    .. code-block:: python

        import os
        from torch import nn, optim
        from torch.utils.data import DataLoader
        from catalyst import dl
        from catalyst.data import ToTensor, BatchBalanceClassSampler
        from catalyst.contrib.datasets import MNIST

        train_data = MNIST(os.getcwd(), train=True, download=True, transform=ToTensor())
        train_labels = train_data.targets.cpu().numpy().tolist()
        train_sampler = BatchBalanceClassSampler(
            train_labels, num_classes=10, num_samples=4)
        valid_data = MNIST(os.getcwd(), train=False, transform=ToTensor())

        loaders = {
            "train": DataLoader(train_data, batch_sampler=train_sampler),
            "valid": DataLoader(valid_data, batch_size=32),
        }

        model = nn.Sequential(nn.Flatten(), nn.Linear(28 * 28, 10))
        criterion = nn.CrossEntropyLoss()
        optimizer = optim.Adam(model.parameters(), lr=0.02)
        runner = dl.SupervisedRunner()

        # model training
        runner.train(
            model=model,
            criterion=criterion,
            optimizer=optimizer,
            loaders=loaders,
            num_epochs=1,
            logdir="./logs",
            valid_loader="valid",
            valid_metric="loss",
            minimize_valid_metric=True,
            verbose=True,
        )
    """

    def __init__(
        self,
        labels: Union[List[int], np.ndarray],
        num_classes: int,
        num_samples: int,
        num_batches: int = None,
    ):
        """Sampler initialisation."""
        super().__init__(labels)
        classes = set(labels)

        assert isinstance(num_classes, int) and isinstance(num_samples, int)
        assert (1 < num_classes <= len(classes)) and (1 < num_samples)
        assert all(
            n > 1 for n in Counter(labels).values()
        ), "Each class should contain at least 2 instances to fit (1)"

        labels = np.array(labels)
        self._labels = list(set(labels.tolist()))
        self._num_classes = num_classes
        self._num_samples = num_samples
        self._batch_size = self._num_classes * self._num_samples
        self._num_batches = num_batches or len(labels) // self._batch_size
        self.lbl2idx = {
            label: np.arange(len(labels))[labels == label].tolist()
            for label in set(labels)
        }
    @property
    def batch_size(self) -> int:
        """
        Returns:
            this value should be used in DataLoader as batch size
        """
        return self._batch_size

    @property
    def batches_in_epoch(self) -> int:
        """
        Returns:
            number of batches in an epoch
        """
        return self._num_batches

    def __len__(self) -> int:
        """
        Returns:
            number of batches in an epoch
        """
        return self._num_batches  # * self._batch_size

    def __iter__(self) -> Iterator[List[int]]:
        """
        Returns:
            iterator over lists of indices, one list per batch, for an epoch
        """
        indices = []
        for _ in range(self._num_batches):
            batch_indices = []
            classes_for_batch = random.sample(self._labels, self._num_classes)
            while self._num_classes != len(set(classes_for_batch)):
                classes_for_batch = random.sample(self._labels, self._num_classes)
            for cls_id in classes_for_batch:
                replace_flag = self._num_samples > len(self.lbl2idx[cls_id])
                batch_indices += np.random.choice(
                    self.lbl2idx[cls_id], self._num_samples, replace=replace_flag
                ).tolist()
            indices.append(batch_indices)
        return iter(indices)
class DynamicBalanceClassSampler(Sampler):
    """
    This kind of sampler can be used for classification tasks with significant
    class imbalance.

    The idea of this sampler is that we start with the original class distribution
    and gradually move to a uniform class distribution, as with downsampling.

    Let's define :math:`D_i = |C_i| / |C_{min}|`, where :math:`|C_i|` is the size
    of class :math:`i` and :math:`|C_{min}|` is the size of the rarest class, so
    :math:`D_i` defines the class distribution. Also define :math:`g(n_{epoch})`,
    an exponential scheduler (``exp_lambda ** n_epoch`` in the code). On each
    epoch the current :math:`D_i` is calculated as
    :math:`D_i^{current} = D_i ^ {g(n_{epoch})}`,
    and the data is then sampled according to this distribution.

    Notes:
        By the end of training, each epoch will contain only
        min_class_size * n_classes examples, so it may not be necessary
        to run validation on every epoch. For this reason, use
        ControlFlowCallback.

    Examples:

        >>> import torch
        >>> import numpy as np
        >>> from catalyst.data import DynamicBalanceClassSampler
        >>> from torch.utils import data

        >>> features = torch.Tensor(np.random.random((200, 100)))
        >>> labels = np.random.randint(0, 4, size=(200,))
        >>> sampler = DynamicBalanceClassSampler(labels)
        >>> labels = torch.LongTensor(labels)
        >>> dataset = data.TensorDataset(features, labels)
        >>> loader = data.DataLoader(dataset, sampler=sampler, batch_size=8)

        >>> for batch in loader:
        >>>     b_features, b_labels = batch

    Sampler was inspired by https://arxiv.org/abs/1901.06783
    """
    def __init__(
        self,
        labels: List[Union[int, str]],
        exp_lambda: float = 0.9,
        start_epoch: int = 0,
        max_d: Optional[int] = None,
        mode: Union[str, int] = "downsampling",
        ignore_warning: bool = False,
    ):
        """
        Args:
            labels: list of labels for each elem in the dataset
            exp_lambda: base of the exponential schedule
            start_epoch: start epoch number, can be useful for multi-stage
                experiments
            max_d: if not None, limit on the difference between the most
                frequent and the rarest classes, heuristic
            mode: number of samples per class at the end of training. Must be
                "downsampling" or a number. Before changing it, make sure you
                understand how it works
            ignore_warning: ignore warning about min class size
        """
        assert isinstance(start_epoch, int)
        assert 0 < exp_lambda < 1, "exp_lambda must be in (0, 1)"
        super().__init__(labels)
        self.exp_lambda = exp_lambda
        if max_d is None:
            max_d = np.inf
        self.max_d = max_d
        self.epoch = start_epoch
        labels = np.array(labels)
        samples_per_class = Counter(labels)
        self.min_class_size = min(samples_per_class.values())

        if self.min_class_size < 100 and not ignore_warning:
            LOGGER.warning(
                f"the smallest class contains only"
                f" {self.min_class_size} examples. At the end of"
                f" training, epochs will contain only"
                f" {self.min_class_size * len(samples_per_class)}"
                f" examples"
            )

        self.original_d = {
            key: value / self.min_class_size for key, value in samples_per_class.items()
        }
        self.label2idxes = {
            label: np.arange(len(labels))[labels == label].tolist()
            for label in set(labels)
        }

        if isinstance(mode, int):
            self.min_class_size = mode
        else:
            assert mode == "downsampling"

        self.labels = labels
        self._update()

    def _update(self) -> None:
        """Update d coefficients."""
        current_d = {
            key: min(value ** self._exp_scheduler(), self.max_d)
            for key, value in self.original_d.items()
        }
        samples_per_classes = {
            key: int(value * self.min_class_size) for key, value in current_d.items()
        }
        self.samples_per_classes = samples_per_classes
        self.length = np.sum(list(samples_per_classes.values()))
        self.epoch += 1

    def _exp_scheduler(self) -> float:
        return self.exp_lambda ** self.epoch

    def __iter__(self) -> Iterator[int]:
        """
        Returns:
            iterator of indices of stratified sample
        """
        indices = []
        for key in sorted(self.label2idxes):
            samples_per_class = self.samples_per_classes[key]
            replace_flag = samples_per_class > len(self.label2idxes[key])
            indices += np.random.choice(
                self.label2idxes[key], samples_per_class, replace=replace_flag
            ).tolist()
        assert len(indices) == self.length
        np.random.shuffle(indices)
        self._update()
        return iter(indices)

    def __len__(self) -> int:
        """
        Returns:
            length of result sample
        """
        return self.length
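
# A sketch of the schedule with toy numbers (illustrative only): with the
# default exp_lambda=0.9, a class that starts 10x larger than the rarest one
# shrinks towards it as D_i ** (exp_lambda ** epoch).
#
#   >>> d_i = 10.0
#   >>> [round(d_i ** (0.9 ** epoch), 2) for epoch in (0, 5, 20, 50)]
#   [10.0, 3.89, 1.32, 1.01]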
class MiniEpochSampler(Sampler):
    """
    Sampler that iterates mini epochs of ``mini_epoch_len`` samples from the dataset.

    Args:
        data_len: Size of the dataset
        mini_epoch_len: Num samples from the dataset used in one
            mini epoch.
        drop_last: If ``True``, sampler will drop the last, incomplete
            mini epoch if its size would be less than ``mini_epoch_len``
        shuffle: one of ``"per_mini_epoch"``, ``"per_epoch"``, or ``None``.
            The sampler will shuffle indices
            > "per_mini_epoch" -- every mini epoch (every ``__iter__`` call)
            > "per_epoch" -- every real epoch
            > None -- don't shuffle

    Example:
        >>> MiniEpochSampler(len(dataset), mini_epoch_len=100)
        >>> MiniEpochSampler(len(dataset), mini_epoch_len=100, drop_last=True)
        >>> MiniEpochSampler(len(dataset), mini_epoch_len=100,
        >>>     shuffle="per_epoch")
    """

    def __init__(
        self,
        data_len: int,
        mini_epoch_len: int,
        drop_last: bool = False,
        shuffle: str = None,
    ):
        """Sampler initialisation."""
        super().__init__(None)
        self.data_len = int(data_len)
        self.mini_epoch_len = int(mini_epoch_len)

        self.steps = int(data_len / self.mini_epoch_len)
        self.state_i = 0

        has_remainder = data_len - self.steps * mini_epoch_len > 0
        if self.steps == 0:
            self.divider = 1
        elif has_remainder and not drop_last:
            self.divider = self.steps + 1
        else:
            self.divider = self.steps

        self._indices = np.arange(self.data_len)
        self.indices = self._indices
        self.end_pointer = max(self.data_len, self.mini_epoch_len)

        if not (shuffle is None or shuffle in ["per_mini_epoch", "per_epoch"]):
            raise ValueError(
                "Shuffle must be one of ['per_mini_epoch', 'per_epoch']. "
                + f"Got {shuffle}"
            )
        self.shuffle_type = shuffle

    def shuffle(self) -> None:
        """Shuffle sampler indices."""
        if self.shuffle_type == "per_mini_epoch" or (
            self.shuffle_type == "per_epoch" and self.state_i == 0
        ):
            if self.data_len >= self.mini_epoch_len:
                self.indices = self._indices
                np.random.shuffle(self.indices)
            else:
                self.indices = np.random.choice(
                    self._indices, self.mini_epoch_len, replace=True
                )

    def __iter__(self) -> Iterator[int]:
        """Iterate over sampler.

        Returns:
            python iterator
        """
        self.state_i = self.state_i % self.divider
        self.shuffle()

        start = self.state_i * self.mini_epoch_len
        stop = (
            self.end_pointer
            if (self.state_i == self.steps)
            else (self.state_i + 1) * self.mini_epoch_len
        )
        indices = self.indices[start:stop].tolist()

        self.state_i += 1
        return iter(indices)

    def __len__(self) -> int:
        """
        Returns:
            int: length of the mini-epoch
        """
        return self.mini_epoch_len
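
# A minimal sketch (toy sizes, illustrative only): with data_len=10 and
# mini_epoch_len=4, consecutive ``__iter__`` calls walk through slices of
# 4, 4 and 2 indices and then wrap around to the start of the next real epoch.
#
#   >>> sampler = MiniEpochSampler(data_len=10, mini_epoch_len=4)
#   >>> [len(list(sampler)) for _ in range(4)]
#   [4, 4, 2, 4]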
class DistributedSamplerWrapper(DistributedSampler):
    """
    Wrapper over `Sampler` for distributed training.
    Allows you to use any sampler in distributed mode.

    It is especially useful in conjunction with
    `torch.nn.parallel.DistributedDataParallel`. In such a case, each
    process can pass a DistributedSamplerWrapper instance as a DataLoader
    sampler, and load a subset of subsampled data of the original dataset
    that is exclusive to it.

    .. note::
        Sampler is assumed to be of constant size.
    """

    def __init__(
        self,
        sampler,
        num_replicas: Optional[int] = None,
        rank: Optional[int] = None,
        shuffle: bool = True,
    ):
        """
        Args:
            sampler: Sampler used for subsampling
            num_replicas (int, optional): Number of processes participating in
                distributed training
            rank (int, optional): Rank of the current process
                within ``num_replicas``
            shuffle (bool, optional): If true (default),
                sampler will shuffle the indices
        """
        super(DistributedSamplerWrapper, self).__init__(
            DatasetFromSampler(sampler),
            num_replicas=num_replicas,
            rank=rank,
            shuffle=shuffle,
        )
        self.sampler = sampler

    def __iter__(self) -> Iterator[int]:
        """Iterate over sampler.

        Returns:
            python iterator
        """
        self.dataset = DatasetFromSampler(self.sampler)
        indexes_of_indexes = super().__iter__()
        subsampler_indexes = self.dataset
        return iter(itemgetter(*indexes_of_indexes)(subsampler_indexes))
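
# Usage sketch for distributed training (illustrative only; `rank`,
# `world_size`, `train_data`, `train_labels`, `num_epochs` and the DataLoader
# import are assumed to come from your own script or launcher): wrap any
# sampler, then call `set_epoch` once per epoch, exactly as with a plain
# DistributedSampler, so that every process reshuffles consistently.
#
#   >>> base_sampler = BalanceClassSampler(train_labels, mode="upsampling")
#   >>> dist_sampler = DistributedSamplerWrapper(
#   ...     base_sampler, num_replicas=world_size, rank=rank
#   ... )
#   >>> loader = DataLoader(train_data, sampler=dist_sampler, batch_size=32)
#   >>> for epoch in range(num_epochs):
#   ...     dist_sampler.set_epoch(epoch)
#   ...     for batch in loader:
#   ...         ...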
__all__ = [
    "BalanceClassSampler",
    "BatchBalanceClassSampler",
    "DistributedSamplerWrapper",
    "DynamicBalanceClassSampler",
    "MiniEpochSampler",
]