formatter.py 4.88 KB
Newer Older
chenych's avatar
chenych committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
# Copyright 2024 the LlamaFactory team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
luopl's avatar
luopl committed
19
20
21
from typing import TYPE_CHECKING, List, Optional, Tuple, Union

from typing_extensions import override
chenych's avatar
chenych committed
22
23

from .data_utils import SLOTS
luopl's avatar
luopl committed
24
25
26
27
28
from .tool_utils import get_tool_utils


if TYPE_CHECKING:
    from .tool_utils import FunctionCall
chenych's avatar
chenych committed
29
30
31
32
33


@dataclass
class Formatter(ABC):
    slots: SLOTS = field(default_factory=list)
luopl's avatar
luopl committed
34
    tool_format: Optional[str] = None
chenych's avatar
chenych committed
35
36

    @abstractmethod
luopl's avatar
luopl committed
37
38
39
40
41
42
43
44
45
    def apply(self, **kwargs) -> SLOTS:
        r"""
        Forms a list of slots according to the inputs to encode.
        """
        ...

    def extract(self, content: str) -> Union[str, List["FunctionCall"]]:
        r"""
        Extract a list of tuples from the response message if using tools.
chenych's avatar
chenych committed
46

luopl's avatar
luopl committed
47
48
        Each tuple consists of function name and function arguments.
        """
chenych's avatar
chenych committed
49
50
51
52
53
54
55
56
57
58
59
60
61
62
        raise NotImplementedError


@dataclass
class EmptyFormatter(Formatter):
    def __post_init__(self):
        has_placeholder = False
        for slot in filter(lambda s: isinstance(s, str), self.slots):
            if re.search(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}", slot):
                has_placeholder = True

        if has_placeholder:
            raise ValueError("Empty formatter should not contain any placeholder.")

luopl's avatar
luopl committed
63
    @override
chenych's avatar
chenych committed
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
    def apply(self, **kwargs) -> SLOTS:
        return self.slots


@dataclass
class StringFormatter(Formatter):
    def __post_init__(self):
        has_placeholder = False
        for slot in filter(lambda s: isinstance(s, str), self.slots):
            if re.search(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}", slot):
                has_placeholder = True

        if not has_placeholder:
            raise ValueError("A placeholder is required in the string formatter.")

luopl's avatar
luopl committed
79
    @override
chenych's avatar
chenych committed
80
81
82
83
84
85
    def apply(self, **kwargs) -> SLOTS:
        elements = []
        for slot in self.slots:
            if isinstance(slot, str):
                for name, value in kwargs.items():
                    if not isinstance(value, str):
luopl's avatar
luopl committed
86
                        raise RuntimeError(f"Expected a string, got {value}")
chenych's avatar
chenych committed
87
88
89
90
91
92

                    slot = slot.replace("{{" + name + "}}", value, 1)
                elements.append(slot)
            elif isinstance(slot, (dict, set)):
                elements.append(slot)
            else:
luopl's avatar
luopl committed
93
                raise RuntimeError(f"Input must be string, set[str] or dict[str, str], got {type(slot)}")
chenych's avatar
chenych committed
94
95
96
97
98
99
100

        return elements


@dataclass
class FunctionFormatter(Formatter):
    def __post_init__(self):
luopl's avatar
luopl committed
101
        self.slots = get_tool_utils(self.tool_format).get_function_slots() + self.slots
chenych's avatar
chenych committed
102

luopl's avatar
luopl committed
103
    @override
chenych's avatar
chenych committed
104
105
106
107
108
109
110
111
112
113
114
115
    def apply(self, **kwargs) -> SLOTS:
        content = kwargs.pop("content")
        functions: List[Tuple[str, str]] = []
        try:
            tool_calls = json.loads(content)
            if not isinstance(tool_calls, list):  # parallel function call
                tool_calls = [tool_calls]

            for tool_call in tool_calls:
                functions.append((tool_call["name"], json.dumps(tool_call["arguments"], ensure_ascii=False)))

        except json.JSONDecodeError:
luopl's avatar
luopl committed
116
            raise RuntimeError(f"Invalid JSON format in function message: {str([content])}")  # flat string
chenych's avatar
chenych committed
117
118
119
120
121
122
123
124
125
126

        elements = []
        for name, arguments in functions:
            for slot in self.slots:
                if isinstance(slot, str):
                    slot = slot.replace("{{name}}", name).replace("{{arguments}}", arguments)
                    elements.append(slot)
                elif isinstance(slot, (dict, set)):
                    elements.append(slot)
                else:
luopl's avatar
luopl committed
127
                    raise RuntimeError(f"Input must be string, set[str] or dict[str, str], got {type(slot)}")
chenych's avatar
chenych committed
128
129
130
131
132
133
134

        return elements


@dataclass
class ToolFormatter(Formatter):
    def __post_init__(self):
luopl's avatar
luopl committed
135
        self.tool_utils = get_tool_utils(self.tool_format)
chenych's avatar
chenych committed
136

luopl's avatar
luopl committed
137
    @override
chenych's avatar
chenych committed
138
139
140
141
    def apply(self, **kwargs) -> SLOTS:
        content = kwargs.pop("content")
        try:
            tools = json.loads(content)
luopl's avatar
luopl committed
142
            return [self.tool_utils.tool_formatter(tools) if len(tools) != 0 else ""]
chenych's avatar
chenych committed
143
        except json.JSONDecodeError:
luopl's avatar
luopl committed
144
            raise RuntimeError(f"Invalid JSON format in tool description: {str([content])}")  # flat string
chenych's avatar
chenych committed
145

luopl's avatar
luopl committed
146
147
148
    @override
    def extract(self, content: str) -> Union[str, List["FunctionCall"]]:
        return self.tool_utils.tool_extractor(content)