formatter.py 4.74 KB
Newer Older
chenych's avatar
chenych committed
1
# Copyright 2025 the LlamaFactory team.
chenych's avatar
chenych committed
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
luopl's avatar
luopl committed
19
from typing import List, Optional, Union
luopl's avatar
luopl committed
20
21

from typing_extensions import override
chenych's avatar
chenych committed
22
23

from .data_utils import SLOTS
luopl's avatar
luopl committed
24
from .tool_utils import FunctionCall, get_tool_utils
chenych's avatar
chenych committed
25
26
27
28
29


@dataclass
class Formatter(ABC):
    slots: SLOTS = field(default_factory=list)
luopl's avatar
luopl committed
30
    tool_format: Optional[str] = None
chenych's avatar
chenych committed
31
32

    @abstractmethod
luopl's avatar
luopl committed
33
34
35
36
37
38
39
40
41
    def apply(self, **kwargs) -> SLOTS:
        r"""
        Forms a list of slots according to the inputs to encode.
        """
        ...

    def extract(self, content: str) -> Union[str, List["FunctionCall"]]:
        r"""
        Extract a list of tuples from the response message if using tools.
chenych's avatar
chenych committed
42

luopl's avatar
luopl committed
43
44
        Each tuple consists of function name and function arguments.
        """
chenych's avatar
chenych committed
45
46
47
48
49
50
51
52
53
54
55
56
57
58
        raise NotImplementedError


@dataclass
class EmptyFormatter(Formatter):
    def __post_init__(self):
        has_placeholder = False
        for slot in filter(lambda s: isinstance(s, str), self.slots):
            if re.search(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}", slot):
                has_placeholder = True

        if has_placeholder:
            raise ValueError("Empty formatter should not contain any placeholder.")

luopl's avatar
luopl committed
59
    @override
chenych's avatar
chenych committed
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
    def apply(self, **kwargs) -> SLOTS:
        return self.slots


@dataclass
class StringFormatter(Formatter):
    def __post_init__(self):
        has_placeholder = False
        for slot in filter(lambda s: isinstance(s, str), self.slots):
            if re.search(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}", slot):
                has_placeholder = True

        if not has_placeholder:
            raise ValueError("A placeholder is required in the string formatter.")

luopl's avatar
luopl committed
75
    @override
chenych's avatar
chenych committed
76
77
78
79
80
81
    def apply(self, **kwargs) -> SLOTS:
        elements = []
        for slot in self.slots:
            if isinstance(slot, str):
                for name, value in kwargs.items():
                    if not isinstance(value, str):
luopl's avatar
luopl committed
82
                        raise RuntimeError(f"Expected a string, got {value}")
chenych's avatar
chenych committed
83
84
85
86
87
88

                    slot = slot.replace("{{" + name + "}}", value, 1)
                elements.append(slot)
            elif isinstance(slot, (dict, set)):
                elements.append(slot)
            else:
chenych's avatar
chenych committed
89
                raise RuntimeError(f"Input must be string, set[str] or dict[str, str], got {type(slot)}.")
chenych's avatar
chenych committed
90
91
92
93
94

        return elements


@dataclass
chenych's avatar
chenych committed
95
class FunctionFormatter(StringFormatter):
chenych's avatar
chenych committed
96
    def __post_init__(self):
chenych's avatar
chenych committed
97
        super().__post_init__()
luopl's avatar
luopl committed
98
        self.tool_utils = get_tool_utils(self.tool_format)
chenych's avatar
chenych committed
99

luopl's avatar
luopl committed
100
    @override
chenych's avatar
chenych committed
101
    def apply(self, **kwargs) -> SLOTS:
chenych's avatar
chenych committed
102
103
104
105
106
107
        content: str = kwargs.pop("content")
        regex = re.compile(r"<think>(.*)</think>", re.DOTALL)
        thought = re.search(regex, content)
        if thought:
            content = content.replace(thought.group(0), "")

luopl's avatar
luopl committed
108
        functions: List["FunctionCall"] = []
chenych's avatar
chenych committed
109
110
111
112
113
114
        try:
            tool_calls = json.loads(content)
            if not isinstance(tool_calls, list):  # parallel function call
                tool_calls = [tool_calls]

            for tool_call in tool_calls:
luopl's avatar
luopl committed
115
116
117
                functions.append(
                    FunctionCall(tool_call["name"], json.dumps(tool_call["arguments"], ensure_ascii=False))
                )
chenych's avatar
chenych committed
118
119

        except json.JSONDecodeError:
chenych's avatar
chenych committed
120
            raise RuntimeError(f"Invalid JSON format in function message: {str([content])}.")  # flat string
chenych's avatar
chenych committed
121

chenych's avatar
chenych committed
122
123
124
        function_str = self.tool_utils.function_formatter(functions)
        if thought:
            function_str = thought.group(1) + function_str
chenych's avatar
chenych committed
125

chenych's avatar
chenych committed
126
        return super().apply(content=function_str)
chenych's avatar
chenych committed
127
128
129
130
131


@dataclass
class ToolFormatter(Formatter):
    def __post_init__(self):
luopl's avatar
luopl committed
132
        self.tool_utils = get_tool_utils(self.tool_format)
chenych's avatar
chenych committed
133

luopl's avatar
luopl committed
134
    @override
chenych's avatar
chenych committed
135
136
137
138
    def apply(self, **kwargs) -> SLOTS:
        content = kwargs.pop("content")
        try:
            tools = json.loads(content)
luopl's avatar
luopl committed
139
            return [self.tool_utils.tool_formatter(tools) if len(tools) != 0 else ""]
chenych's avatar
chenych committed
140
        except json.JSONDecodeError:
chenych's avatar
chenych committed
141
            raise RuntimeError(f"Invalid JSON format in tool description: {str([content])}.")  # flat string
chenych's avatar
chenych committed
142

luopl's avatar
luopl committed
143
144
145
    @override
    def extract(self, content: str) -> Union[str, List["FunctionCall"]]:
        return self.tool_utils.tool_extractor(content)