batch.py 2.46 KB
Newer Older
1
2
from typing import Any, Deque, Hashable, List, Tuple

3
4
5
6
7
8
9
10
11
12
13
import torch
from energonai import BatchManager, SubmitEntry, TaskEntry


class BatchManagerForGeneration(BatchManager):
    def __init__(self, max_batch_size: int = 1, pad_token_id: int = 0) -> None:
        super().__init__()
        self.max_batch_size = max_batch_size
        self.pad_token_id = pad_token_id

    def _left_padding(self, batch_inputs):
14
15
        max_len = max(len(inputs["input_ids"]) for inputs in batch_inputs)
        outputs = {"input_ids": [], "attention_mask": []}
16
        for inputs in batch_inputs:
17
            input_ids, attention_mask = inputs["input_ids"], inputs["attention_mask"]
18
19
20
            padding_len = max_len - len(input_ids)
            input_ids = [self.pad_token_id] * padding_len + input_ids
            attention_mask = [0] * padding_len + attention_mask
21
22
            outputs["input_ids"].append(input_ids)
            outputs["attention_mask"].append(attention_mask)
23
24
25
26
27
28
29
        for k in outputs:
            outputs[k] = torch.tensor(outputs[k])
        return outputs, max_len

    @staticmethod
    def _make_batch_key(entry: SubmitEntry) -> tuple:
        data = entry.data
30
        return (data["top_k"], data["top_p"], data["temperature"])
31
32
33
34
35
36
37
38
39
40

    def make_batch(self, q: Deque[SubmitEntry]) -> Tuple[TaskEntry, dict]:
        entry = q.popleft()
        uids = [entry.uid]
        batch = [entry.data]
        while len(batch) < self.max_batch_size:
            if len(q) == 0:
                break
            if self._make_batch_key(entry) != self._make_batch_key(q[0]):
                break
41
            if q[0].data["max_tokens"] > entry.data["max_tokens"]:
42
43
44
45
46
47
48
                break
            e = q.popleft()
            batch.append(e.data)
            uids.append(e.uid)
        inputs, max_len = self._left_padding(batch)
        trunc_lens = []
        for data in batch:
49
50
51
52
53
54
            trunc_lens.append(max_len + data["max_tokens"])
        inputs["top_k"] = entry.data["top_k"]
        inputs["top_p"] = entry.data["top_p"]
        inputs["temperature"] = entry.data["temperature"]
        inputs["max_tokens"] = max_len + entry.data["max_tokens"]
        return TaskEntry(tuple(uids), inputs), {"trunc_lens": trunc_lens}
55
56
57
58
59
60

    def split_batch(self, task_entry: TaskEntry, trunc_lens: List[int] = []) -> List[Tuple[Hashable, Any]]:
        retval = []
        for uid, output, trunc_len in zip(task_entry.uids, task_entry.batch, trunc_lens):
            retval.append((uid, output[:trunc_len]))
        return retval