detached_replay_buffer.py 2.6 KB
Newer Older
1
2
import asyncio
import copy
3
import random
4
5
6
from threading import Lock
from typing import Any, List

7
import ray
8
import torch
9
10
from coati.experience_buffer import ExperienceBuffer
from coati.experience_buffer.utils import BufferItem, make_experience_batch, split_experience_batch
11
from coati.experience_maker.base import Experience
12
13
14
# from torch.multiprocessing import Queue
from ray.util.queue import Queue

15
16
17

class DetachedReplayBuffer:
    '''
    Detached replay buffer. Shares Experience across workers on the same node.
    Therefore a trainer node is expected to have only one instance.
    It is ExperienceMakerHolder's duty to call the append(exp) method, remotely.

    Args:
        sample_batch_size: Batch size when sampling. Experience items won't enqueue
            until they form a full batch.
        limit: Limit of number of experience sample BATCHes held in the queue.
            A number <= 0 means unlimited. Defaults to 0.
    '''

    def __init__(self, sample_batch_size: int, limit: int = 0) -> None:
        self.sample_batch_size = sample_batch_size
        self.limit = limit
        # Ray-backed queue (its own actor) so producers on other processes can
        # block on put() and the trainer can block on get(). A limit of 0 makes
        # the queue unbounded, matching the documented "<= 0 means unlimited".
        self.items = Queue(self.limit, actor_options={"num_cpus": 1})
        # Staging area for incoming items until a full batch can be enqueued.
        self.batch_collector: List[BufferItem] = []

    @torch.no_grad()
    def append(self, experience: Experience) -> None:
        '''
        Expected to be called remotely.
        '''
        items = split_experience_batch(experience)
        self.extend(items)

    @torch.no_grad()
    def extend(self, items: List[BufferItem]) -> None:
        '''
        Expected to be called remotely.

        Accumulates items and enqueues one Experience per full batch of
        `sample_batch_size` items; any remainder stays in the collector.
        '''
        self.batch_collector.extend(items)
        while len(self.batch_collector) >= self.sample_batch_size:
            # Use a distinct local name: rebinding `items` here would shadow
            # the argument and confuse later maintenance.
            batch_items = self.batch_collector[:self.sample_batch_size]
            experience = make_experience_batch(batch_items)
            # Blocks when the queue is at its limit, applying back-pressure to
            # the experience maker.
            self.items.put(experience, block=True)
            self.batch_collector = self.batch_collector[self.sample_batch_size:]

    def clear(self) -> None:
        '''
        Drop all pending batches and staged items, recreating the queue actor.
        '''
        # Queue has no close(); shutdown() kills the underlying queue actor.
        self.items.shutdown()
        # Recreate with the same actor_options as __init__ so the replacement
        # queue actor requests the same resources as the original.
        # (Fixed: previously this also set self.worker_state from a
        # self.tp_world_size attribute that was never initialized, so clear()
        # always raised AttributeError; neither attribute is used elsewhere.)
        self.items = Queue(self.limit, actor_options={"num_cpus": 1})
        self.batch_collector = []

    @torch.no_grad()
    def sample(self, worker_rank=0, to_device="cpu") -> Experience:
        '''
        Pop one Experience batch and move it to `to_device`.

        Args:
            worker_rank: Currently unused; kept for caller compatibility.
            to_device: Device the sampled Experience is moved to. Defaults to "cpu".
        '''
        ret = self._sample_and_erase()
        ret.to_device(to_device)
        return ret

    @torch.no_grad()
    def _sample_and_erase(self) -> Experience:
        # Blocks until a batch is available; get() removes it from the queue.
        ret = self.items.get(block=True)
        return ret

    def get_length(self) -> int:
        # NOTE(review): qsize() on a Ray queue is a remote call and only an
        # instantaneous snapshot — it may be stale by the time it is used.
        ret = self.items.qsize()
        return ret