formatter.py 5.66 KB
Newer Older
chenych's avatar
chenych committed
1
# Copyright 2025 the LlamaFactory team.
chenych's avatar
chenych committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
luopl's avatar
luopl committed
19
20

from typing_extensions import override
chenych's avatar
chenych committed
21
22

from .data_utils import SLOTS
luopl's avatar
luopl committed
23
from .tool_utils import FunctionCall, get_tool_utils
chenych's avatar
chenych committed
24
25
26
27
28


@dataclass
class Formatter(ABC):
    slots: SLOTS = field(default_factory=list)
shihm's avatar
uodata  
shihm committed
29
    tool_format: str | None = None
chenych's avatar
chenych committed
30
31

    @abstractmethod
luopl's avatar
luopl committed
32
    def apply(self, **kwargs) -> SLOTS:
chenych's avatar
chenych committed
33
        r"""Forms a list of slots according to the inputs to encode."""
luopl's avatar
luopl committed
34
35
        ...

shihm's avatar
uodata  
shihm committed
36
    def extract(self, content: str) -> str | list["FunctionCall"]:
chenych's avatar
chenych committed
37
        r"""Extract a list of tuples from the response message if using tools.
chenych's avatar
chenych committed
38

luopl's avatar
luopl committed
39
40
        Each tuple consists of function name and function arguments.
        """
chenych's avatar
chenych committed
41
42
43
44
45
46
47
48
49
50
51
52
53
54
        raise NotImplementedError


@dataclass
class EmptyFormatter(Formatter):
    def __post_init__(self):
        has_placeholder = False
        for slot in filter(lambda s: isinstance(s, str), self.slots):
            if re.search(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}", slot):
                has_placeholder = True

        if has_placeholder:
            raise ValueError("Empty formatter should not contain any placeholder.")

luopl's avatar
luopl committed
55
    @override
chenych's avatar
chenych committed
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
    def apply(self, **kwargs) -> SLOTS:
        return self.slots


@dataclass
class StringFormatter(Formatter):
    def __post_init__(self):
        has_placeholder = False
        for slot in filter(lambda s: isinstance(s, str), self.slots):
            if re.search(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}", slot):
                has_placeholder = True

        if not has_placeholder:
            raise ValueError("A placeholder is required in the string formatter.")

luopl's avatar
luopl committed
71
    @override
chenych's avatar
chenych committed
72
73
74
75
76
77
    def apply(self, **kwargs) -> SLOTS:
        elements = []
        for slot in self.slots:
            if isinstance(slot, str):
                for name, value in kwargs.items():
                    if not isinstance(value, str):
luopl's avatar
luopl committed
78
                        raise RuntimeError(f"Expected a string, got {value}")
chenych's avatar
chenych committed
79
80
81
82
83
84

                    slot = slot.replace("{{" + name + "}}", value, 1)
                elements.append(slot)
            elif isinstance(slot, (dict, set)):
                elements.append(slot)
            else:
chenych's avatar
chenych committed
85
                raise RuntimeError(f"Input must be string, set[str] or dict[str, str], got {type(slot)}.")
chenych's avatar
chenych committed
86
87
88
89
90

        return elements


@dataclass
chenych's avatar
chenych committed
91
class FunctionFormatter(StringFormatter):
chenych's avatar
chenych committed
92
    def __post_init__(self):
chenych's avatar
chenych committed
93
        super().__post_init__()
luopl's avatar
luopl committed
94
        self.tool_utils = get_tool_utils(self.tool_format)
chenych's avatar
chenych committed
95

luopl's avatar
luopl committed
96
    @override
chenych's avatar
chenych committed
97
    def apply(self, **kwargs) -> SLOTS:
chenych's avatar
chenych committed
98
        content: str = kwargs.pop("content")
shihm's avatar
uodata  
shihm committed
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
        thought_words = kwargs.pop("thought_words", None)
        tool_call_words = kwargs.pop("tool_call_words", None)

        def _parse_functions(json_content: str) -> list["FunctionCall"]:
            try:
                tool_calls = json.loads(json_content)
                if not isinstance(tool_calls, list):  # parallel function call
                    tool_calls = [tool_calls]

                return [FunctionCall(tc["name"], json.dumps(tc["arguments"], ensure_ascii=False)) for tc in tool_calls]
            except json.JSONDecodeError:
                raise RuntimeError(f"Invalid JSON format in function message: {str([content])}.")

        tool_call_match = None
        if tool_call_words and len(tool_call_words) == 2:
            tool_call_regex = re.compile(
                rf"{re.escape(tool_call_words[0])}(.*?){re.escape(tool_call_words[1])}", re.DOTALL
            )
            tool_call_match = re.search(tool_call_regex, content)

        if tool_call_match is None:
            thought_match = None
            if thought_words and len(thought_words) == 2:
                regex = re.compile(rf"{re.escape(thought_words[0])}(.*?){re.escape(thought_words[1])}", re.DOTALL)
                thought_match = re.search(regex, content)

            if thought_match:
                json_part = content.replace(thought_match.group(0), "")
            else:
                json_part = content

            functions = _parse_functions(json_part)
            function_str = self.tool_utils.function_formatter(functions)
            if thought_match:
                function_str = thought_match.group(0) + function_str
        else:
            thought_content = content.replace(tool_call_match.group(0), "")
            functions = _parse_functions(tool_call_match.group(1))
            function_str = self.tool_utils.function_formatter(functions)
            function_str = thought_content + function_str
chenych's avatar
chenych committed
139

chenych's avatar
chenych committed
140
        return super().apply(content=function_str)
chenych's avatar
chenych committed
141
142
143
144
145


@dataclass
class ToolFormatter(Formatter):
    def __post_init__(self):
luopl's avatar
luopl committed
146
        self.tool_utils = get_tool_utils(self.tool_format)
chenych's avatar
chenych committed
147

luopl's avatar
luopl committed
148
    @override
chenych's avatar
chenych committed
149
150
151
152
    def apply(self, **kwargs) -> SLOTS:
        content = kwargs.pop("content")
        try:
            tools = json.loads(content)
luopl's avatar
luopl committed
153
            return [self.tool_utils.tool_formatter(tools) if len(tools) != 0 else ""]
chenych's avatar
chenych committed
154
        except json.JSONDecodeError:
chenych's avatar
chenych committed
155
            raise RuntimeError(f"Invalid JSON format in tool description: {str([content])}.")  # flat string
chenych's avatar
chenych committed
156

luopl's avatar
luopl committed
157
    @override
shihm's avatar
uodata  
shihm committed
158
    def extract(self, content: str) -> str | list["FunctionCall"]:
luopl's avatar
luopl committed
159
        return self.tool_utils.tool_extractor(content)