# Copyright 2024 the LlamaFactory team.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import json
import re
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from typing import List, Optional, Union

from typing_extensions import override

from .data_utils import SLOTS
from .tool_utils import FunctionCall, get_tool_utils


@dataclass
class Formatter(ABC):
    r"""
    Abstract base for objects that turn keyword inputs into a list of slots.

    Subclasses must implement `apply`. `extract` is optional: unless a subclass
    overrides it, calling it raises `NotImplementedError`.
    """

    # Template pieces held by the formatter; `default_factory` gives each instance its own list.
    slots: SLOTS = field(default_factory=list)
    # Tool format name; subclasses in this module pass it to `get_tool_utils`. None when not set.
    tool_format: Optional[str] = None

    @abstractmethod
    def apply(self, **kwargs) -> SLOTS:
        r"""
        Forms a list of slots according to the inputs to encode.
        """
        ...

    def extract(self, content: str) -> Union[str, List["FunctionCall"]]:
        r"""
        Extracts a list of tuples from the response message if using tools.

        Each tuple consists of function name and function arguments.

        Raises:
            NotImplementedError: always, in this base implementation.
        """
        raise NotImplementedError


@dataclass
class EmptyFormatter(Formatter):
    r"""
    Formatter whose slots are emitted as they are, without any substitution.
    """

    def __post_init__(self):
        r"""
        Validates that no string slot carries a `{{name}}` style placeholder.

        Non-string slots are ignored by the check.

        Raises:
            ValueError: if any string slot contains a placeholder.
        """
        placeholder = re.compile(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}")
        if any(isinstance(slot, str) and placeholder.search(slot) for slot in self.slots):
            raise ValueError("Empty formatter should not contain any placeholder.")

    @override
    def apply(self, **kwargs) -> SLOTS:
        r"""
        Returns the stored slots; keyword arguments are accepted but not used.

        The returned object is `self.slots` itself, not a copy.
        """
        return self.slots


@dataclass
class StringFormatter(Formatter):
    r"""
    Formatter that fills `{{name}}` placeholders in string slots from keyword arguments.
    """

    def __post_init__(self):
        r"""
        Validates that at least one string slot carries a `{{name}}` style placeholder.

        Raises:
            ValueError: if no string slot contains a placeholder.
        """
        placeholder = re.compile(r"\{\{[a-zA-Z_][a-zA-Z0-9_]*\}\}")
        if not any(isinstance(slot, str) and placeholder.search(slot) for slot in self.slots):
            raise ValueError("A placeholder is required in the string formatter.")

    @override
    def apply(self, **kwargs) -> SLOTS:
        r"""
        Builds a new slot list with placeholders in string slots substituted.

        For every string slot, each keyword argument `name=value` replaces the first
        occurrence of `{{name}}` in that slot. Substitutions run in keyword order on
        the progressively rewritten string. Dict and set slots are passed through untouched.

        Raises:
            RuntimeError: if a keyword value is not a string (checked while processing
                a string slot), or if a slot is neither a string, a dict nor a set.
        """
        elements = []
        for slot in self.slots:
            if isinstance(slot, str):
                for name, value in kwargs.items():
                    if not isinstance(value, str):
                        raise RuntimeError(f"Expected a string, got {value}")

                    # Only the first occurrence of each placeholder is replaced per slot.
                    slot = slot.replace("{{" + name + "}}", value, 1)

                elements.append(slot)
            elif isinstance(slot, (dict, set)):
                elements.append(slot)
            else:
                raise RuntimeError(f"Input must be string, set[str] or dict[str, str], got {type(slot)}")

        return elements


@dataclass
class FunctionFormatter(Formatter):
    r"""
    Formatter that renders JSON-encoded function calls through the configured tool utils.
    """

    def __post_init__(self):
        r"""
        Resolves the tool utils for `self.tool_format` and stores them on the instance.
        """
        self.tool_utils = get_tool_utils(self.tool_format)

    @override
    def apply(self, **kwargs) -> SLOTS:
        r"""
        Expands the `{{content}}` slot into the formatted function calls.

        The `content` keyword must be JSON text holding either one call object or a
        list of call objects. Each call is read through its `name` and `arguments`
        keys, and the arguments are re-serialized to a JSON string.

        Returns:
            A new list in which every slot equal to `{{content}}` is replaced by the
            output of `tool_utils.function_formatter`; other slots are copied as they are.

        Raises:
            KeyError: if `content` is not passed, or a call lacks `name` / `arguments`.
            RuntimeError: if `content` is not valid JSON; the decode error is attached
                as the cause.
        """
        content = kwargs.pop("content")
        try:
            tool_calls = json.loads(content)
        except json.JSONDecodeError as err:
            # The content is wrapped in a list so the message shows it as one flat, escaped string.
            raise RuntimeError(f"Invalid JSON format in function message: {str([content])}") from err

        # A single call object is wrapped so single and parallel calls share one code path.
        if not isinstance(tool_calls, list):
            tool_calls = [tool_calls]

        functions: List["FunctionCall"] = [
            FunctionCall(tool_call["name"], json.dumps(tool_call["arguments"], ensure_ascii=False))
            for tool_call in tool_calls
        ]

        elements = []
        for slot in self.slots:
            if slot == "{{content}}":
                elements += self.tool_utils.function_formatter(functions)
            else:
                elements.append(slot)

        return elements


@dataclass
class ToolFormatter(Formatter):
    r"""
    Formatter that renders JSON tool descriptions and extracts tool calls from responses.
    """

    def __post_init__(self):
        r"""
        Resolves the tool utils for `self.tool_format` and stores them on the instance.
        """
        self.tool_utils = get_tool_utils(self.tool_format)

    @override
    def apply(self, **kwargs) -> SLOTS:
        r"""
        Renders the tool descriptions passed as JSON text in the `content` keyword.

        Returns:
            A one-element list: the output of `tool_utils.tool_formatter` for the decoded
            tools, or an empty string when the decoded value has length zero.

        Raises:
            KeyError: if `content` is not passed.
            RuntimeError: if a `json.JSONDecodeError` is raised while decoding or
                formatting; the decode error is attached as the cause.
        """
        content = kwargs.pop("content")
        try:
            tools = json.loads(content)
            # Formatting stays inside the try so decode errors raised there are reported the same way.
            return [self.tool_utils.tool_formatter(tools) if len(tools) != 0 else ""]
        except json.JSONDecodeError as err:
            # The content is wrapped in a list so the message shows it as one flat, escaped string.
            raise RuntimeError(f"Invalid JSON format in tool description: {str([content])}") from err

    @override
    def extract(self, content: str) -> Union[str, List["FunctionCall"]]:
        r"""
        Delegates to `tool_utils.tool_extractor` and returns its result for `content`.
        """
        return self.tool_utils.tool_extractor(content)