naive.py 2.28 KB
Newer Older
Fazzie-Maqianli's avatar
Fazzie-Maqianli committed
1
2
3
4
5
6
import random
from typing import List

import torch
from coati.experience_maker.base import Experience

7
8
from colossalai.logging import get_dist_logger

9
from .base import ExperienceBuffer
Fazzie-Maqianli's avatar
Fazzie-Maqianli committed
10
11
from .utils import BufferItem, make_experience_batch, split_experience_batch

12
13
# Module-level logger; the buffer below uses it to warn when samples are evicted.
logger = get_dist_logger()

Fazzie-Maqianli's avatar
Fazzie-Maqianli committed
14

15
16
class NaiveExperienceBuffer(ExperienceBuffer):
    """Naive experience buffer class. It stores experience.

    Incoming experience batches are split into individual items on
    ``append`` and re-assembled into a batch on ``sample``.

    Args:
        sample_batch_size (int): Batch size when sampling.
        limit (int, optional): Limit of number of experience samples. A number <= 0 means unlimited. Defaults to 0.
        cpu_offload (bool, optional): Whether to keep stored experience on the CPU. When enabled,
            experience is moved to the CPU in ``append`` and moved to the target device in
            ``sample``. Defaults to True.
    """

    def __init__(self, sample_batch_size: int, limit: int = 0, cpu_offload: bool = True) -> None:
        # NOTE(review): ``self.sample_batch_size`` and ``self.limit`` are read below and are
        # presumably stored by ``ExperienceBuffer.__init__`` -- confirm against the base class.
        super().__init__(sample_batch_size, limit)
        self.cpu_offload = cpu_offload
        # Sampled batches are moved to the current CUDA device. Fall back to the CPU when
        # CUDA is unavailable so the buffer can still be constructed on CPU-only hosts
        # (``torch.cuda.current_device()`` raises there).
        if torch.cuda.is_available():
            self.target_device = torch.device(f"cuda:{torch.cuda.current_device()}")
        else:
            self.target_device = torch.device("cpu")
        # TODO(ver217): add prefetch
        self.items: List[BufferItem] = []

    @torch.no_grad()
    def append(self, experience: Experience) -> None:
        """Split an experience batch into items and add them to the buffer.

        When ``limit`` is positive and the buffer grows beyond it, the oldest
        items (those at the front of the list) are dropped and a warning is logged.

        Args:
            experience (Experience): Batch of experience to store. With ``cpu_offload``
                enabled, ``experience.to_device`` is called on it with the CPU device first.
        """
        if self.cpu_offload:
            # The return value is not used, so ``to_device`` presumably works in place.
            experience.to_device(torch.device("cpu"))
        self.items.extend(split_experience_batch(experience))

        if self.limit > 0:
            samples_to_remove = len(self.items) - self.limit
            if samples_to_remove > 0:
                logger.warning(f"Experience buffer is full. Removing {samples_to_remove} samples.")
                # Keep only the most recently appended ``limit`` items.
                self.items = self.items[samples_to_remove:]

    def clear(self) -> None:
        """Remove every item from the buffer."""
        self.items.clear()

    @torch.no_grad()
    def sample(self) -> Experience:
        """
        Randomly samples experiences from the buffer.

        ``sample_batch_size`` distinct items are drawn without replacement.

        Returns:
            A batch of sampled experiences. With ``cpu_offload`` enabled the batch is
            moved to ``self.target_device`` before being returned.

        Raises:
            ValueError: Raised by ``random.sample`` if the buffer holds fewer than
                ``sample_batch_size`` items.
        """
        items = random.sample(self.items, self.sample_batch_size)
        experience = make_experience_batch(items)
        if self.cpu_offload:
            experience.to_device(self.target_device)
        return experience

    def __len__(self) -> int:
        """Return the number of items currently stored."""
        return len(self.items)

    def __getitem__(self, idx: int) -> BufferItem:
        """Return the stored item at position ``idx``."""
        return self.items[idx]

    def collate_fn(self, batch) -> Experience:
        """Assemble ``batch`` into a single ``Experience`` via ``make_experience_batch``."""
        return make_experience_batch(batch)