# Copyright 2025 the LlamaFactory team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
chenych's avatar
chenych committed
19
from typing import Optional, Union
luopl's avatar
luopl committed
20
21

from typing_extensions import override
chenych's avatar
chenych committed
22
23

from .data_utils import SLOTS
luopl's avatar
luopl committed
24
from .tool_utils import FunctionCall, get_tool_utils
chenych's avatar
chenych committed
25
26
27
28
29


@dataclass
class Formatter(ABC):
    r"""Abstract base class for formatters that turn keyword inputs into slots.

    Subclasses must implement ``apply``. ``extract`` is optional: the base
    implementation raises ``NotImplementedError``.
    """

    # Slot templates held by the formatter; every instance gets its own empty list by default.
    slots: SLOTS = field(default_factory=list)
    # Tool format name; tool-aware subclasses in this file pass it to ``get_tool_utils``.
    tool_format: Optional[str] = None

    @abstractmethod
    def apply(self, **kwargs) -> SLOTS:
        r"""Build the list of slots to encode from the given keyword inputs."""

    def extract(self, content: str) -> Union[str, list["FunctionCall"]]:
        r"""Extract tool calls from a response message when tools are in use.

        The declared result is either a plain string or a list of function
        calls, each made of a function name and its arguments.

        Raises:
            NotImplementedError: Always, unless a subclass overrides this method.
        """
        raise NotImplementedError


@dataclass
class EmptyFormatter(Formatter):
    r"""Formatter that returns its slots verbatim and therefore allows no placeholders."""

    def __post_init__(self):
        r"""Validate that no string slot contains a ``{{name}}`` placeholder.

        Non-string slots are not inspected.

        Raises:
            ValueError: If any string slot contains a placeholder.
        """
        placeholder = re.compile(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}")
        text_slots = (slot for slot in self.slots if isinstance(slot, str))
        if any(placeholder.search(slot) for slot in text_slots):
            raise ValueError("Empty formatter should not contain any placeholder.")

    @override
    def apply(self, **kwargs) -> SLOTS:
        r"""Return the configured slots unchanged; keyword inputs are ignored."""
        return self.slots


@dataclass
class StringFormatter(Formatter):
    r"""Formatter that substitutes ``{{name}}`` placeholders in its string slots."""

    def __post_init__(self):
        r"""Validate that at least one string slot contains a ``{{name}}`` placeholder.

        Raises:
            ValueError: If no string slot contains a placeholder.
        """
        placeholder = re.compile(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}")
        found = any(isinstance(slot, str) and placeholder.search(slot) for slot in self.slots)
        if not found:
            raise ValueError("A placeholder is required in the string formatter.")

    @override
    def apply(self, **kwargs) -> SLOTS:
        r"""Render the slots, filling placeholders from the keyword arguments.

        For every string slot, each keyword argument ``name=value`` replaces the
        first occurrence of ``{{name}}`` with ``value``. Dict and set slots are
        passed through untouched.

        Raises:
            RuntimeError: If a keyword value is not a string (checked while
                rendering a string slot), or if a slot is neither a string,
                a dict nor a set.
        """
        rendered = []
        for template in self.slots:
            if isinstance(template, (dict, set)):
                rendered.append(template)
                continue

            if not isinstance(template, str):
                raise RuntimeError(f"Input must be string, set[str] or dict[str, str], got {type(template)}.")

            text = template
            for key, replacement in kwargs.items():
                if not isinstance(replacement, str):
                    raise RuntimeError(f"Expected a string, got {replacement}")

                # Only the first occurrence of each placeholder is substituted.
                text = text.replace("{{" + key + "}}", replacement, 1)

            rendered.append(text)

        return rendered


@dataclass
class FunctionFormatter(StringFormatter):
    r"""String formatter that renders JSON-encoded tool calls through the tool utils."""

    def __post_init__(self):
        r"""Run the placeholder validation and resolve the tool utils for ``tool_format``."""
        super().__post_init__()
        self.tool_utils = get_tool_utils(self.tool_format)

    @override
    def apply(self, **kwargs) -> SLOTS:
        r"""Format a function-call message into slots.

        ``content`` must be a JSON object, or a JSON list of objects for parallel
        calls, where each object has ``name`` and ``arguments`` keys. An optional
        ``<think>...</think>`` section is stripped before parsing and prepended
        again to the formatted function string. Keyword arguments other than
        ``content`` are not forwarded to the string formatter.

        Raises:
            KeyError: If ``content`` is not supplied.
            RuntimeError: If the remaining content is not valid JSON, or if a
                tool call is not an object carrying ``name`` and ``arguments``.
        """
        content: str = kwargs.pop("content")

        # Greedy match with DOTALL: spans from the first <think> to the last </think>.
        thought = re.search(r"<think>(.*)</think>", content, re.DOTALL)
        if thought:
            content = content.replace(thought.group(0), "")

        # Keep the try body limited to the parse so only real JSON errors are reported as such.
        try:
            tool_calls = json.loads(content)
        except json.JSONDecodeError as err:
            raise RuntimeError(f"Invalid JSON format in function message: {str([content])}.") from err  # flat string

        if not isinstance(tool_calls, list):  # a single call; lists denote parallel function calls
            tool_calls = [tool_calls]

        functions: list[FunctionCall] = []
        for tool_call in tool_calls:
            # A missing key raises KeyError; a non-object entry (str, number, null, list) raises TypeError.
            try:
                name, arguments = tool_call["name"], tool_call["arguments"]
            except (KeyError, TypeError) as err:
                raise RuntimeError(
                    f"Invalid tool call in function message, expected `name` and `arguments`: {str([content])}."
                ) from err

            functions.append(FunctionCall(name, json.dumps(arguments, ensure_ascii=False)))

        function_str = self.tool_utils.function_formatter(functions)
        if thought:
            function_str = thought.group(0) + function_str

        return super().apply(content=function_str)
chenych's avatar
chenych committed
124
125
126
127
128


@dataclass
class ToolFormatter(Formatter):
    r"""Formatter that renders JSON tool descriptions and extracts tool calls from responses."""

    def __post_init__(self):
        r"""Resolve the tool utils for the configured ``tool_format``."""
        self.tool_utils = get_tool_utils(self.tool_format)

    @override
    def apply(self, **kwargs) -> SLOTS:
        r"""Format a JSON-encoded tool description into a single-slot list.

        Returns a one-element list: the formatted tools, or an empty string
        when the decoded value is empty.

        Raises:
            KeyError: If ``content`` is not supplied.
            RuntimeError: If ``content`` is not valid JSON.
        """
        content = kwargs.pop("content")

        # Only the parse is guarded, so errors raised by the tool formatter are
        # not misreported as invalid input JSON.
        try:
            tools = json.loads(content)
        except json.JSONDecodeError as err:
            raise RuntimeError(f"Invalid JSON format in tool description: {str([content])}.") from err  # flat string

        return [self.tool_utils.tool_formatter(tools) if len(tools) != 0 else ""]

    @override
    def extract(self, content: str) -> Union[str, list["FunctionCall"]]:
        r"""Delegate tool-call extraction for ``content`` to the tool utils."""
        return self.tool_utils.tool_extractor(content)