Commit d344a08c authored by mashun1's avatar mashun1
Browse files

openmanus

parents
Pipeline #2480 failed with stages
in 0 seconds
from typing import Dict, List, Literal, Optional, Union
from openai import (
APIError,
AsyncOpenAI,
AuthenticationError,
OpenAIError,
RateLimitError,
)
from langchain_ollama import ChatOllama
from tenacity import retry, stop_after_attempt, wait_random_exponential
from app.config import LLMSettings, config
from app.logger import logger # Assuming a logger is set up in your app
from app.schema import Message
class LLM:
_instances: Dict[str, "LLM"] = {}
def __new__(
cls, config_name: str = "default", llm_config: Optional[LLMSettings] = None
):
if config_name not in cls._instances:
instance = super().__new__(cls)
instance.__init__(config_name, llm_config)
cls._instances[config_name] = instance
return cls._instances[config_name]
def __init__(
self, config_name: str = "default", llm_config: Optional[LLMSettings] = None
):
if not hasattr(self, "client"): # Only initialize if not already initialized
llm_config = llm_config or config.llm
llm_config = llm_config.get(config_name, llm_config["default"])
self.model = llm_config.model
self.max_tokens = llm_config.max_tokens
self.temperature = llm_config.temperature
self.client = AsyncOpenAI(
api_key=llm_config.api_key, base_url=llm_config.base_url
)
@staticmethod
def format_messages(messages: List[Union[dict, Message]]) -> List[dict]:
"""
Format messages for LLM by converting them to OpenAI message format.
Args:
messages: List of messages that can be either dict or Message objects
Returns:
List[dict]: List of formatted messages in OpenAI format
Raises:
ValueError: If messages are invalid or missing required fields
TypeError: If unsupported message types are provided
Examples:
>>> msgs = [
... Message.system_message("You are a helpful assistant"),
... {"role": "user", "content": "Hello"},
... Message.user_message("How are you?")
... ]
>>> formatted = LLM.format_messages(msgs)
"""
formatted_messages = []
for message in messages:
if isinstance(message, dict):
# If message is already a dict, ensure it has required fields
if "role" not in message:
raise ValueError("Message dict must contain 'role' field")
formatted_messages.append(message)
elif isinstance(message, Message):
# If message is a Message object, convert it to dict
formatted_messages.append(message.to_dict())
else:
raise TypeError(f"Unsupported message type: {type(message)}")
# Validate all messages have required fields
for msg in formatted_messages:
if msg["role"] not in ["system", "user", "assistant", "tool"]:
raise ValueError(f"Invalid role: {msg['role']}")
if "content" not in msg and "tool_calls" not in msg:
raise ValueError(
"Message must contain either 'content' or 'tool_calls'"
)
return formatted_messages
@retry(
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(6),
)
async def ask(
self,
messages: List[Union[dict, Message]],
system_msgs: Optional[List[Union[dict, Message]]] = None,
stream: bool = True,
temperature: Optional[float] = None,
) -> str:
"""
Send a prompt to the LLM and get the response.
Args:
messages: List of conversation messages
system_msgs: Optional system messages to prepend
stream (bool): Whether to stream the response
temperature (float): Sampling temperature for the response
Returns:
str: The generated response
Raises:
ValueError: If messages are invalid or response is empty
OpenAIError: If API call fails after retries
Exception: For unexpected errors
"""
try:
# Format system and user messages
if system_msgs:
system_msgs = self.format_messages(system_msgs)
messages = system_msgs + self.format_messages(messages)
else:
messages = self.format_messages(messages)
if not stream:
# Non-streaming request
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
max_tokens=self.max_tokens,
temperature=temperature or self.temperature,
stream=False,
)
if not response.choices or not response.choices[0].message.content:
raise ValueError("Empty or invalid response from LLM")
return response.choices[0].message.content
# Streaming request
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
max_tokens=self.max_tokens,
temperature=temperature or self.temperature,
stream=True,
)
collected_messages = []
async for chunk in response:
chunk_message = chunk.choices[0].delta.content or ""
collected_messages.append(chunk_message)
print(chunk_message, end="", flush=True)
print() # Newline after streaming
full_response = "".join(collected_messages).strip()
if not full_response:
raise ValueError("Empty response from streaming LLM")
return full_response
except ValueError as ve:
logger.error(f"Validation error: {ve}")
raise
except OpenAIError as oe:
logger.error(f"OpenAI API error: {oe}")
raise
except Exception as e:
logger.error(f"Unexpected error in ask: {e}")
raise
@retry(
wait=wait_random_exponential(min=1, max=60),
stop=stop_after_attempt(6),
)
async def ask_tool(
self,
messages: List[Union[dict, Message]],
system_msgs: Optional[List[Union[dict, Message]]] = None,
timeout: int = 60,
tools: Optional[List[dict]] = None,
tool_choice: Literal["none", "auto", "required"] = "auto",
temperature: Optional[float] = None,
**kwargs,
):
"""
Ask LLM using functions/tools and return the response.
Args:
messages: List of conversation messages
system_msgs: Optional system messages to prepend
timeout: Request timeout in seconds
tools: List of tools to use
tool_choice: Tool choice strategy
temperature: Sampling temperature for the response
**kwargs: Additional completion arguments
Returns:
ChatCompletionMessage: The model's response
Raises:
ValueError: If tools, tool_choice, or messages are invalid
OpenAIError: If API call fails after retries
Exception: For unexpected errors
"""
try:
# Validate tool_choice
if tool_choice not in ["none", "auto", "required"]:
raise ValueError(f"Invalid tool_choice: {tool_choice}")
# Format messages
if system_msgs:
system_msgs = self.format_messages(system_msgs)
messages = system_msgs + self.format_messages(messages)
else:
messages = self.format_messages(messages)
# Validate tools if provided
if tools:
for tool in tools:
if not isinstance(tool, dict) or "type" not in tool:
raise ValueError("Each tool must be a dict with 'type' field")
# Set up the completion request
response = await self.client.chat.completions.create(
model=self.model,
messages=messages,
temperature=temperature or self.temperature,
max_tokens=self.max_tokens,
tools=tools,
tool_choice=tool_choice,
timeout=timeout,
**kwargs,
)
# Check if response is valid
if not response.choices or not response.choices[0].message:
print(response)
raise ValueError("Invalid or empty response from LLM")
return response.choices[0].message
except ValueError as ve:
logger.error(f"Validation error in ask_tool: {ve}")
raise
except OpenAIError as oe:
if isinstance(oe, AuthenticationError):
logger.error("Authentication failed. Check API key.")
elif isinstance(oe, RateLimitError):
logger.error("Rate limit exceeded. Consider increasing retry attempts.")
elif isinstance(oe, APIError):
logger.error(f"API error: {oe}")
raise
except Exception as e:
logger.error(f"Unexpected error in ask_tool: {e}")
raise
import sys
from datetime import datetime
from loguru import logger as _logger
from app.config import PROJECT_ROOT
_print_level = "INFO"
def define_log_level(print_level="INFO", logfile_level="DEBUG", name: str = None):
"""Adjust the log level to above level"""
global _print_level
_print_level = print_level
current_date = datetime.now()
formatted_date = current_date.strftime("%Y%m%d")
log_name = (
f"{name}_{formatted_date}" if name else formatted_date
) # name a log with prefix name
_logger.remove()
_logger.add(sys.stderr, level=print_level)
_logger.add(PROJECT_ROOT / f"logs/{log_name}.log", level=logfile_level)
return _logger
logger = define_log_level()
if __name__ == "__main__":
logger.info("Starting application")
logger.debug("Debug message")
logger.warning("Warning message")
logger.error("Error message")
logger.critical("Critical message")
try:
raise ValueError("Test error")
except Exception as e:
logger.exception(f"An error occurred: {e}")
SYSTEM_PROMPT = "You are OpenManus, an all-capable AI assistant, aimed at solving any task presented by the user. You have various tools at your disposal that you can call upon to efficiently complete complex requests. Whether it's programming, information retrieval, file processing, or web browsing, you can handle it all."
NEXT_STEP_PROMPT = """You can interact with the computer using PythonExecute, save important content and information files through FileSaver, open browsers with BrowserUseTool, and retrieve information using BingSearch.
PythonExecute: Execute Python code to interact with the computer system, data processing, automation tasks, etc.
FileSaver: Save files locally, such as txt, py, html, etc.
BrowserUseTool: Open, browse, and use web browsers.If you open a local HTML file, you must provide the absolute path to the file.
BingSearch: Perform web information retrieval
Based on user needs, proactively select the most appropriate tool or combination of tools. For complex tasks, you can break down the problem and use different tools step by step to solve it. After using each tool, clearly explain the execution results and suggest the next steps.
"""
PLANNING_SYSTEM_PROMPT = """
You are an expert Planning Agent tasked with solving complex problems by creating and managing structured plans.
Your job is:
1. Analyze requests to understand the task scope
2. Create clear, actionable plans with the `planning` tool
3. Execute steps using available tools as needed
4. Track progress and adapt plans dynamically
5. Use `finish` to conclude when the task is complete
Available tools will vary by task but may include:
- `planning`: Create, update, and track plans (commands: create, update, mark_step, etc.)
- `finish`: End the task when complete
Break tasks into logical, sequential steps. Think about dependencies and verification methods.
"""
NEXT_STEP_PROMPT = """
Based on the current state, what's your next step?
Consider:
1. Do you need to create or refine a plan?
2. Are you ready to execute a specific step?
3. Have you completed the task?
Provide reasoning, then select the appropriate tool or action.
"""
SYSTEM_PROMPT = """SETTING: You are an autonomous programmer, and you're working directly in the command line with a special interface.
The special interface consists of a file editor that shows you {{WINDOW}} lines of a file at a time.
In addition to typical bash commands, you can also use specific commands to help you navigate and edit files.
To call a command, you need to invoke it with a function call/tool call.
Please note that THE EDIT COMMAND REQUIRES PROPER INDENTATION.
If you'd like to add the line ' print(x)' you must fully write that out, with all those spaces before the code! Indentation is important and code that is not indented correctly will fail and require fixing before it can be run.
RESPONSE FORMAT:
Your shell prompt is formatted as follows:
(Open file: <path>)
(Current directory: <cwd>)
bash-$
First, you should _always_ include a general thought about what you're going to do next.
Then, for every response, you must include exactly _ONE_ tool call/function call.
Remember, you should always include a _SINGLE_ tool call/function call and then wait for a response from the shell before continuing with more discussion and commands. Everything you include in the DISCUSSION section will be saved for future reference.
If you'd like to issue two commands at once, PLEASE DO NOT DO THAT! Please instead first submit just the first tool call, and then after receiving a response you'll be able to issue the second tool call.
Note that the environment does NOT support interactive session commands (e.g. python, vim), so please do not invoke them.
"""
NEXT_STEP_TEMPLATE = """{{observation}}
(Open file: {{open_file}})
(Current directory: {{working_dir}})
bash-$
"""
SYSTEM_PROMPT = "You are an agent that can execute tool calls"
NEXT_STEP_PROMPT = (
"If you want to stop interaction, use `terminate` tool/function call."
)
from enum import Enum
from typing import Any, List, Literal, Optional, Union
from pydantic import BaseModel, Field
class AgentState(str, Enum):
"""Agent execution states"""
IDLE = "IDLE"
RUNNING = "RUNNING"
FINISHED = "FINISHED"
ERROR = "ERROR"
class Function(BaseModel):
name: str
arguments: str
class ToolCall(BaseModel):
"""Represents a tool/function call in a message"""
id: str
type: str = "function"
function: Function
class Message(BaseModel):
"""Represents a chat message in the conversation"""
role: Literal["system", "user", "assistant", "tool"] = Field(...)
content: Optional[str] = Field(default=None)
tool_calls: Optional[List[ToolCall]] = Field(default=None)
name: Optional[str] = Field(default=None)
tool_call_id: Optional[str] = Field(default=None)
def __add__(self, other) -> List["Message"]:
"""支持 Message + list 或 Message + Message 的操作"""
if isinstance(other, list):
return [self] + other
elif isinstance(other, Message):
return [self, other]
else:
raise TypeError(
f"unsupported operand type(s) for +: '{type(self).__name__}' and '{type(other).__name__}'"
)
def __radd__(self, other) -> List["Message"]:
"""支持 list + Message 的操作"""
if isinstance(other, list):
return other + [self]
else:
raise TypeError(
f"unsupported operand type(s) for +: '{type(other).__name__}' and '{type(self).__name__}'"
)
def to_dict(self) -> dict:
"""Convert message to dictionary format"""
message = {"role": self.role}
if self.content is not None:
message["content"] = self.content
if self.tool_calls is not None:
message["tool_calls"] = [tool_call.dict() for tool_call in self.tool_calls]
if self.name is not None:
message["name"] = self.name
if self.tool_call_id is not None:
message["tool_call_id"] = self.tool_call_id
return message
@classmethod
def user_message(cls, content: str) -> "Message":
"""Create a user message"""
return cls(role="user", content=content)
@classmethod
def system_message(cls, content: str) -> "Message":
"""Create a system message"""
return cls(role="system", content=content)
@classmethod
def assistant_message(cls, content: Optional[str] = None) -> "Message":
"""Create an assistant message"""
return cls(role="assistant", content=content)
@classmethod
def tool_message(cls, content: str, name, tool_call_id: str) -> "Message":
"""Create a tool message"""
return cls(role="tool", content=content, name=name, tool_call_id=tool_call_id)
@classmethod
def from_tool_calls(
cls, tool_calls: List[Any], content: Union[str, List[str]] = "", **kwargs
):
"""Create ToolCallsMessage from raw tool calls.
Args:
tool_calls: Raw tool calls from LLM
content: Optional message content
"""
formatted_calls = [
{"id": call.id, "function": call.function.model_dump(), "type": "function"}
for call in tool_calls
]
return cls(
role="assistant", content=content, tool_calls=formatted_calls, **kwargs
)
class Memory(BaseModel):
messages: List[Message] = Field(default_factory=list)
max_messages: int = Field(default=100)
def add_message(self, message: Message) -> None:
"""Add a message to memory"""
self.messages.append(message)
# Optional: Implement message limit
if len(self.messages) > self.max_messages:
self.messages = self.messages[-self.max_messages :]
def add_messages(self, messages: List[Message]) -> None:
"""Add multiple messages to memory"""
self.messages.extend(messages)
def clear(self) -> None:
"""Clear all messages"""
self.messages.clear()
def get_recent_messages(self, n: int) -> List[Message]:
"""Get n most recent messages"""
return self.messages[-n:]
def to_dict_list(self) -> List[dict]:
"""Convert messages to list of dicts"""
return [msg.to_dict() for msg in self.messages]
from app.tool.base import BaseTool
from app.tool.bash import Bash
from app.tool.create_chat_completion import CreateChatCompletion
from app.tool.planning import PlanningTool
from app.tool.str_replace_editor import StrReplaceEditor
from app.tool.terminate import Terminate
from app.tool.tool_collection import ToolCollection
__all__ = [
"BaseTool",
"Bash",
"Terminate",
"StrReplaceEditor",
"ToolCollection",
"CreateChatCompletion",
"PlanningTool",
]
import asyncio
from typing import List
from baidusearch.baidusearch import search
from app.tool.base import BaseTool
class BaiduSearch(BaseTool):
name: str = "baidu_search"
description: str = """Perform a Baidu search and return a list of relevant links.
Use this tool when you need to find information on the web, get up-to-date data, or research specific topics.
The tool returns a list of URLs that match the search query.
"""
parameters: dict = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "(required) The search query to submit to Baidu.",
},
"num_results": {
"type": "integer",
"description": "(optional) The number of search results to return. Default is 10.",
"default": 10,
},
},
"required": ["query"],
}
async def execute(self, query: str, num_results: int = 10) -> List[str]:
"""
Execute a Baidu search and return a list of URLs.
Args:
query (str): The search query to submit to Baidu.
num_results (int, optional): The number of search results to return. Default is 10.
Returns:
List[str]: A list of URLs matching the search query.
"""
# Run the search in a thread pool to prevent blocking
loop = asyncio.get_event_loop()
links = await loop.run_in_executor(
None, lambda: [result['url'] for result in search(query, num_results=num_results)]
)
return links
if __name__ == '__main__':
result = search("测试", num_results=10)
print(result)
from abc import ABC, abstractmethod
from typing import Any, Dict, Optional
from pydantic import BaseModel, Field
class BaseTool(ABC, BaseModel):
name: str
description: str
parameters: Optional[dict] = None
class Config:
arbitrary_types_allowed = True
async def __call__(self, **kwargs) -> Any:
"""Execute the tool with given parameters."""
return await self.execute(**kwargs)
@abstractmethod
async def execute(self, **kwargs) -> Any:
"""Execute the tool with given parameters."""
def to_param(self) -> Dict:
"""Convert tool to function call format."""
return {
"type": "function",
"function": {
"name": self.name,
"description": self.description,
"parameters": self.parameters,
},
}
class ToolResult(BaseModel):
"""Represents the result of a tool execution."""
output: Any = Field(default=None)
error: Optional[str] = Field(default=None)
system: Optional[str] = Field(default=None)
class Config:
arbitrary_types_allowed = True
def __bool__(self):
return any(getattr(self, field) for field in self.__fields__)
def __add__(self, other: "ToolResult"):
def combine_fields(
field: Optional[str], other_field: Optional[str], concatenate: bool = True
):
if field and other_field:
if concatenate:
return field + other_field
raise ValueError("Cannot combine tool results")
return field or other_field
return ToolResult(
output=combine_fields(self.output, other.output),
error=combine_fields(self.error, other.error),
system=combine_fields(self.system, other.system),
)
def __str__(self):
return f"Error: {self.error}" if self.error else self.output
def replace(self, **kwargs):
"""Returns a new ToolResult with the given fields replaced."""
# return self.copy(update=kwargs)
return type(self)(**{**self.dict(), **kwargs})
class CLIResult(ToolResult):
"""A ToolResult that can be rendered as a CLI output."""
class ToolFailure(ToolResult):
"""A ToolResult that represents a failure."""
class AgentAwareTool:
agent: Optional = None
import asyncio
import os
from typing import Optional
from app.exceptions import ToolError
from app.tool.base import BaseTool, CLIResult, ToolResult
_BASH_DESCRIPTION = """Execute a bash command in the terminal.
* Long running commands: For commands that may run indefinitely, it should be run in the background and the output should be redirected to a file, e.g. command = `python3 app.py > server.log 2>&1 &`.
* Interactive: If a bash command returns exit code `-1`, this means the process is not yet finished. The assistant must then send a second call to terminal with an empty `command` (which will retrieve any additional logs), or it can send additional text (set `command` to the text) to STDIN of the running process, or it can send command=`ctrl+c` to interrupt the process.
* Timeout: If a command execution result says "Command timed out. Sending SIGINT to the process", the assistant should retry running the command in the background.
"""
class _BashSession:
"""A session of a bash shell."""
_started: bool
_process: asyncio.subprocess.Process
command: str = "/bin/bash"
_output_delay: float = 0.2 # seconds
_timeout: float = 120.0 # seconds
_sentinel: str = "<<exit>>"
def __init__(self):
self._started = False
self._timed_out = False
async def start(self):
if self._started:
return
self._process = await asyncio.create_subprocess_shell(
self.command,
preexec_fn=os.setsid,
shell=True,
bufsize=0,
stdin=asyncio.subprocess.PIPE,
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
self._started = True
def stop(self):
"""Terminate the bash shell."""
if not self._started:
raise ToolError("Session has not started.")
if self._process.returncode is not None:
return
self._process.terminate()
async def run(self, command: str):
"""Execute a command in the bash shell."""
if not self._started:
raise ToolError("Session has not started.")
if self._process.returncode is not None:
return ToolResult(
system="tool must be restarted",
error=f"bash has exited with returncode {self._process.returncode}",
)
if self._timed_out:
raise ToolError(
f"timed out: bash has not returned in {self._timeout} seconds and must be restarted",
)
# we know these are not None because we created the process with PIPEs
assert self._process.stdin
assert self._process.stdout
assert self._process.stderr
# send command to the process
self._process.stdin.write(
command.encode() + f"; echo '{self._sentinel}'\n".encode()
)
await self._process.stdin.drain()
# read output from the process, until the sentinel is found
try:
async with asyncio.timeout(self._timeout):
while True:
await asyncio.sleep(self._output_delay)
# if we read directly from stdout/stderr, it will wait forever for
# EOF. use the StreamReader buffer directly instead.
output = (
self._process.stdout._buffer.decode()
) # pyright: ignore[reportAttributeAccessIssue]
if self._sentinel in output:
# strip the sentinel and break
output = output[: output.index(self._sentinel)]
break
except asyncio.TimeoutError:
self._timed_out = True
raise ToolError(
f"timed out: bash has not returned in {self._timeout} seconds and must be restarted",
) from None
if output.endswith("\n"):
output = output[:-1]
error = (
self._process.stderr._buffer.decode()
) # pyright: ignore[reportAttributeAccessIssue]
if error.endswith("\n"):
error = error[:-1]
# clear the buffers so that the next output can be read correctly
self._process.stdout._buffer.clear() # pyright: ignore[reportAttributeAccessIssue]
self._process.stderr._buffer.clear() # pyright: ignore[reportAttributeAccessIssue]
return CLIResult(output=output, error=error)
class Bash(BaseTool):
"""A tool for executing bash commands"""
name: str = "bash"
description: str = _BASH_DESCRIPTION
parameters: dict = {
"type": "object",
"properties": {
"command": {
"type": "string",
"description": "The bash command to execute. Can be empty to view additional logs when previous exit code is `-1`. Can be `ctrl+c` to interrupt the currently running process.",
},
},
"required": ["command"],
}
_session: Optional[_BashSession] = None
async def execute(
self, command: str | None = None, restart: bool = False, **kwargs
) -> CLIResult:
if restart:
if self._session:
self._session.stop()
self._session = _BashSession()
await self._session.start()
return ToolResult(system="tool has been restarted.")
if self._session is None:
self._session = _BashSession()
await self._session.start()
if command is not None:
return await self._session.run(command)
raise ToolError("no command provided.")
if __name__ == "__main__":
bash = Bash()
rst = asyncio.run(bash.execute("ls -l"))
print(rst)
import asyncio
from typing import List
from urllib.parse import quote
import requests
from bs4 import BeautifulSoup
from app.tool.base import BaseTool
class BingSearch(BaseTool):
name: str = "bing_search"
description: str = """执行必应搜索并返回相关链接列表。
当需要获取国际信息或英文内容时建议使用此工具。
工具返回与搜索查询匹配的URL列表。"""
parameters: dict = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "(必填) 提交给必应的搜索关键词"
},
"num_results": {
"type": "integer",
"description": "(可选) 返回的搜索结果数量,默认10",
"default": 10
}
},
"required": ["query"]
}
async def execute(self, query: str, num_results: int = 10) -> List[str]:
"""
执行必应搜索并返回URL列表
Args:
query: 搜索关键词
num_results: 返回结果数量
Returns:
匹配搜索结果的URL列表
"""
def sync_search():
headers = {
'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
'Accept-Language': 'en-US,en;q=0.9'
}
url = f'https://www.bing.com/search?q={quote(query)}'
links = []
for page in range(0, num_results // 10 + 1):
resp = requests.get(
f'{url}&first={page * 10}',
headers=headers,
timeout=30
)
soup = BeautifulSoup(resp.text, 'html.parser')
for result in soup.select('.b_algo'):
link = result.find('a', href=True)
if link and 'href' in link.attrs:
links.append(link['href'])
if len(links) >= num_results:
return links
rst = links[:num_results]
return rst
loop = asyncio.get_event_loop()
return await loop.run_in_executor(None, sync_search)
import asyncio
import aiohttp
from bs4 import BeautifulSoup
from typing import List
from app.tool.base import BaseTool
class BingSearch(BaseTool):
name: str = "bing_search"
description: str = """使用必应搜索返回相关链接列表。
当需要查找网络信息、获取最新数据或研究特定主题时使用此工具。
该工具返回与搜索查询匹配的URL列表。
"""
parameters: dict = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "(必填) 提交给必应的搜索查询。",
},
"num_results": {
"type": "integer",
"description": "(可选) 要返回的搜索结果数量。默认为10。",
"default": 10,
},
},
"required": ["query"],
}
async def execute(self, query: str, num_results: int = 10) -> List[str]:
"""
执行必应搜索并返回URL列表。
参数:
query (str): 要提交给必应的搜索查询。
num_results (int, optional): 要返回的搜索结果数量。默认为10。
返回:
List[str]: 与搜索查询匹配的URL列表。
"""
headers = {
"User-Agent": "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/120.0.0.0 Safari/537.36",
"Accept-Language": "en-US,en;q=0.9,zh-CN;q=0.8,zh;q=0.7"
}
search_url = f"https://www.cn.bing.com/search?q={query}"
async with aiohttp.ClientSession() as session:
try:
async with session.get(search_url, headers=headers) as response:
response.raise_for_status()
html = await response.text()
except Exception as e:
raise RuntimeError(f"必应搜索请求失败: {str(e)}")
soup = BeautifulSoup(html, 'html.parser')
links = []
# 必应搜索结果链接通常在类名为"b_algo"的div内,具体选择器可能需要根据实际页面结构调整
for result in soup.select('.b_algo'):
a_tag = result.select_one('a')
if a_tag and 'href' in a_tag.attrs:
link = a_tag['href']
links.append(link)
if len(links) >= num_results:
break
return links[:num_results]
import asyncio
import json
from typing import Optional
from browser_use import Browser as BrowserUseBrowser
from browser_use import BrowserConfig
from browser_use.browser.context import BrowserContext
from browser_use.dom.service import DomService
from pydantic import Field, field_validator
from pydantic_core.core_schema import ValidationInfo
from app.tool.base import BaseTool, ToolResult
_BROWSER_DESCRIPTION = """
Interact with a web browser to perform various actions such as navigation, element interaction,
content extraction, and tab management. Supported actions include:
- 'navigate': Go to a specific URL
- 'click': Click an element by index
- 'input_text': Input text into an element
- 'screenshot': Capture a screenshot
- 'get_html': Get page HTML content
- 'execute_js': Execute JavaScript code
- 'scroll': Scroll the page
- 'switch_tab': Switch to a specific tab
- 'new_tab': Open a new tab
- 'close_tab': Close the current tab
- 'refresh': Refresh the current page
"""
class BrowserUseTool(BaseTool):
name: str = "browser_use"
description: str = _BROWSER_DESCRIPTION
parameters: dict = {
"type": "object",
"properties": {
"action": {
"type": "string",
"enum": [
"navigate",
"click",
"input_text",
"screenshot",
"get_html",
"execute_js",
"scroll",
"switch_tab",
"new_tab",
"close_tab",
"refresh",
],
"description": "The browser action to perform",
},
"url": {
"type": "string",
"description": "URL for 'navigate' or 'new_tab' actions",
},
"index": {
"type": "integer",
"description": "Element index for 'click' or 'input_text' actions",
},
"text": {"type": "string", "description": "Text for 'input_text' action"},
"script": {
"type": "string",
"description": "JavaScript code for 'execute_js' action",
},
"scroll_amount": {
"type": "integer",
"description": "Pixels to scroll (positive for down, negative for up) for 'scroll' action",
},
"tab_id": {
"type": "integer",
"description": "Tab ID for 'switch_tab' action",
},
},
"required": ["action"],
"dependencies": {
"navigate": ["url"],
"click": ["index"],
"input_text": ["index", "text"],
"execute_js": ["script"],
"switch_tab": ["tab_id"],
"new_tab": ["url"],
"scroll": ["scroll_amount"],
},
}
lock: asyncio.Lock = Field(default_factory=asyncio.Lock)
browser: Optional[BrowserUseBrowser] = Field(default=None, exclude=True)
context: Optional[BrowserContext] = Field(default=None, exclude=True)
dom_service: Optional[DomService] = Field(default=None, exclude=True)
@field_validator("parameters", mode="before")
def validate_parameters(cls, v: dict, info: ValidationInfo) -> dict:
if not v:
raise ValueError("Parameters cannot be empty")
return v
async def _ensure_browser_initialized(self) -> BrowserContext:
"""Ensure browser and context are initialized."""
if self.browser is None:
self.browser = BrowserUseBrowser(BrowserConfig(headless=True))
if self.context is None:
self.context = await self.browser.new_context()
self.dom_service = DomService(await self.context.get_current_page())
return self.context
async def execute(
self,
action: str,
url: Optional[str] = None,
index: Optional[int] = None,
text: Optional[str] = None,
script: Optional[str] = None,
scroll_amount: Optional[int] = None,
tab_id: Optional[int] = None,
**kwargs,
) -> ToolResult:
"""
Execute a specified browser action.
Args:
action: The browser action to perform
url: URL for navigation or new tab
index: Element index for click or input actions
text: Text for input action
script: JavaScript code for execution
scroll_amount: Pixels to scroll for scroll action
tab_id: Tab ID for switch_tab action
**kwargs: Additional arguments
Returns:
ToolResult with the action's output or error
"""
async with self.lock:
try:
context = await self._ensure_browser_initialized()
if action == "navigate":
if not url:
return ToolResult(error="URL is required for 'navigate' action")
await context.navigate_to(url)
return ToolResult(output=f"Navigated to {url}")
elif action == "click":
if index is None:
return ToolResult(error="Index is required for 'click' action")
element = await context.get_dom_element_by_index(index)
if not element:
return ToolResult(error=f"Element with index {index} not found")
download_path = await context._click_element_node(element)
output = f"Clicked element at index {index}"
if download_path:
output += f" - Downloaded file to {download_path}"
return ToolResult(output=output)
elif action == "input_text":
if index is None or not text:
return ToolResult(
error="Index and text are required for 'input_text' action"
)
element = await context.get_dom_element_by_index(index)
if not element:
return ToolResult(error=f"Element with index {index} not found")
await context._input_text_element_node(element, text)
return ToolResult(
output=f"Input '{text}' into element at index {index}"
)
elif action == "screenshot":
screenshot = await context.take_screenshot(full_page=True)
return ToolResult(
output=f"Screenshot captured (base64 length: {len(screenshot)})",
system=screenshot,
)
elif action == "get_html":
html = await context.get_page_html()
truncated = html[:2000] + "..." if len(html) > 2000 else html
return ToolResult(output=truncated)
elif action == "execute_js":
if not script:
return ToolResult(
error="Script is required for 'execute_js' action"
)
result = await context.execute_javascript(script)
return ToolResult(output=str(result))
elif action == "scroll":
if scroll_amount is None:
return ToolResult(
error="Scroll amount is required for 'scroll' action"
)
await context.execute_javascript(
f"window.scrollBy(0, {scroll_amount});"
)
direction = "down" if scroll_amount > 0 else "up"
return ToolResult(
output=f"Scrolled {direction} by {abs(scroll_amount)} pixels"
)
elif action == "switch_tab":
if tab_id is None:
return ToolResult(
error="Tab ID is required for 'switch_tab' action"
)
await context.switch_to_tab(tab_id)
return ToolResult(output=f"Switched to tab {tab_id}")
elif action == "new_tab":
if not url:
return ToolResult(error="URL is required for 'new_tab' action")
await context.create_new_tab(url)
return ToolResult(output=f"Opened new tab with URL {url}")
elif action == "close_tab":
await context.close_current_tab()
return ToolResult(output="Closed current tab")
elif action == "refresh":
await context.refresh_page()
return ToolResult(output="Refreshed current page")
else:
return ToolResult(error=f"Unknown action: {action}")
except Exception as e:
return ToolResult(error=f"Browser action '{action}' failed: {str(e)}")
async def get_current_state(self) -> ToolResult:
"""Get the current browser state as a ToolResult."""
async with self.lock:
try:
context = await self._ensure_browser_initialized()
state = await context.get_state()
state_info = {
"url": state.url,
"title": state.title,
"tabs": [tab.model_dump() for tab in state.tabs],
"interactive_elements": state.element_tree.clickable_elements_to_string(),
}
return ToolResult(output=json.dumps(state_info))
except Exception as e:
return ToolResult(error=f"Failed to get browser state: {str(e)}")
async def cleanup(self):
"""Clean up browser resources."""
async with self.lock:
if self.context is not None:
await self.context.close()
self.context = None
self.dom_service = None
if self.browser is not None:
await self.browser.close()
self.browser = None
def __del__(self):
"""Ensure cleanup when object is destroyed."""
if self.browser is not None or self.context is not None:
try:
asyncio.run(self.cleanup())
except RuntimeError:
loop = asyncio.new_event_loop()
loop.run_until_complete(self.cleanup())
loop.close()
from typing import Any, List, Optional, Type, Union, get_args, get_origin
from pydantic import BaseModel, Field
from app.tool import BaseTool
class CreateChatCompletion(BaseTool):
name: str = "create_chat_completion"
description: str = (
"Creates a structured completion with specified output formatting."
)
# Type mapping for JSON schema
type_mapping: dict = {
str: "string",
int: "integer",
float: "number",
bool: "boolean",
dict: "object",
list: "array",
}
response_type: Optional[Type] = None
required: List[str] = Field(default_factory=lambda: ["response"])
def __init__(self, response_type: Optional[Type] = str):
"""Initialize with a specific response type."""
super().__init__()
self.response_type = response_type
self.parameters = self._build_parameters()
def _build_parameters(self) -> dict:
"""Build parameters schema based on response type."""
if self.response_type == str:
return {
"type": "object",
"properties": {
"response": {
"type": "string",
"description": "The response text that should be delivered to the user.",
},
},
"required": self.required,
}
if isinstance(self.response_type, type) and issubclass(
self.response_type, BaseModel
):
schema = self.response_type.model_json_schema()
return {
"type": "object",
"properties": schema["properties"],
"required": schema.get("required", self.required),
}
return self._create_type_schema(self.response_type)
def _create_type_schema(self, type_hint: Type) -> dict:
"""Create a JSON schema for the given type."""
origin = get_origin(type_hint)
args = get_args(type_hint)
# Handle primitive types
if origin is None:
return {
"type": "object",
"properties": {
"response": {
"type": self.type_mapping.get(type_hint, "string"),
"description": f"Response of type {type_hint.__name__}",
}
},
"required": self.required,
}
# Handle List type
if origin is list:
item_type = args[0] if args else Any
return {
"type": "object",
"properties": {
"response": {
"type": "array",
"items": self._get_type_info(item_type),
}
},
"required": self.required,
}
# Handle Dict type
if origin is dict:
value_type = args[1] if len(args) > 1 else Any
return {
"type": "object",
"properties": {
"response": {
"type": "object",
"additionalProperties": self._get_type_info(value_type),
}
},
"required": self.required,
}
# Handle Union type
if origin is Union:
return self._create_union_schema(args)
return self._build_parameters()
def _get_type_info(self, type_hint: Type) -> dict:
"""Get type information for a single type."""
if isinstance(type_hint, type) and issubclass(type_hint, BaseModel):
return type_hint.model_json_schema()
return {
"type": self.type_mapping.get(type_hint, "string"),
"description": f"Value of type {getattr(type_hint, '__name__', 'any')}",
}
def _create_union_schema(self, types: tuple) -> dict:
"""Create schema for Union types."""
return {
"type": "object",
"properties": {
"response": {"anyOf": [self._get_type_info(t) for t in types]}
},
"required": self.required,
}
async def execute(self, required: list | None = None, **kwargs) -> Any:
"""Execute the chat completion with type conversion.
Args:
required: List of required field names or None
**kwargs: Response data
Returns:
Converted response based on response_type
"""
required = required or self.required
# Handle case when required is a list
if isinstance(required, list) and len(required) > 0:
if len(required) == 1:
required_field = required[0]
result = kwargs.get(required_field, "")
else:
# Return multiple fields as a dictionary
return {field: kwargs.get(field, "") for field in required}
else:
required_field = "response"
result = kwargs.get(required_field, "")
# Type conversion logic
if self.response_type == str:
return result
if isinstance(self.response_type, type) and issubclass(
self.response_type, BaseModel
):
return self.response_type(**kwargs)
if get_origin(self.response_type) in (list, dict):
return result # Assuming result is already in correct format
try:
return self.response_type(result)
except (ValueError, TypeError):
return result
import asyncio
import os
import aiofiles
from app.tool.base import BaseTool
class FileSaver(BaseTool):
name: str = "file_saver"
description: str = """Save content to a local file at a specified path.
Use this tool when you need to save text, code, or generated content to a file on the local filesystem.
The tool accepts content and a file path, and saves the content to that location.
"""
parameters: dict = {
"type": "object",
"properties": {
"content": {
"type": "string",
"description": "(required) The content to save to the file.",
},
"file_path": {
"type": "string",
"description": "(required) The path where the file should be saved, including filename and extension.",
},
"mode": {
"type": "string",
"description": "(optional) The file opening mode. Default is 'w' for write. Use 'a' for append.",
"enum": ["w", "a"],
"default": "w",
},
},
"required": ["content", "file_path"],
}
async def execute(self, content: str, file_path: str, mode: str = "w") -> str:
"""
Save content to a file at the specified path.
Args:
content (str): The content to save to the file.
file_path (str): The path where the file should be saved.
mode (str, optional): The file opening mode. Default is 'w' for write. Use 'a' for append.
Returns:
str: A message indicating the result of the operation.
"""
try:
# Ensure the directory exists
directory = os.path.dirname(file_path)
if directory and not os.path.exists(directory):
os.makedirs(directory)
# Write directly to the file
async with aiofiles.open(file_path, mode, encoding="utf-8") as file:
await file.write(content)
return f"Content successfully saved to {file_path}"
except Exception as e:
return f"Error saving file: {str(e)}"
import asyncio
from typing import List
from googlesearch import search
from app.tool.base import BaseTool
class GoogleSearch(BaseTool):
name: str = "google_search"
description: str = """Perform a Google search and return a list of relevant links.
Use this tool when you need to find information on the web, get up-to-date data, or research specific topics.
The tool returns a list of URLs that match the search query.
"""
parameters: dict = {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "(required) The search query to submit to Google.",
},
"num_results": {
"type": "integer",
"description": "(optional) The number of search results to return. Default is 10.",
"default": 10,
},
},
"required": ["query"],
}
async def execute(self, query: str, num_results: int = 10) -> List[str]:
"""
Execute a Google search and return a list of URLs.
Args:
query (str): The search query to submit to Google.
num_results (int, optional): The number of search results to return. Default is 10.
Returns:
List[str]: A list of URLs matching the search query.
"""
# Run the search in a thread pool to prevent blocking
loop = asyncio.get_event_loop()
links = await loop.run_in_executor(
None, lambda: list(search(query, num_results=num_results))
)
return links
# tool/planning.py
from typing import Dict, List, Literal, Optional
from app.exceptions import ToolError
from app.tool.base import BaseTool, ToolResult
_PLANNING_TOOL_DESCRIPTION = """
A planning tool that allows the agent to create and manage plans for solving complex tasks.
The tool provides functionality for creating plans, updating plan steps, and tracking progress.
"""
class PlanningTool(BaseTool):
"""
A planning tool that allows the agent to create and manage plans for solving complex tasks.
The tool provides functionality for creating plans, updating plan steps, and tracking progress.
"""
name: str = "planning"
description: str = _PLANNING_TOOL_DESCRIPTION
parameters: dict = {
"type": "object",
"properties": {
"command": {
"description": "The command to execute. Available commands: create, update, list, get, set_active, mark_step, delete.",
"enum": [
"create",
"update",
"list",
"get",
"set_active",
"mark_step",
"delete",
],
"type": "string",
},
"plan_id": {
"description": "Unique identifier for the plan. Required for create, update, set_active, and delete commands. Optional for get and mark_step (uses active plan if not specified).",
"type": "string",
},
"title": {
"description": "Title for the plan. Required for create command, optional for update command.",
"type": "string",
},
"steps": {
"description": "List of plan steps. Required for create command, optional for update command.",
"type": "array",
"items": {"type": "string"},
},
"step_index": {
"description": "Index of the step to update (0-based). Required for mark_step command.",
"type": "integer",
},
"step_status": {
"description": "Status to set for a step. Used with mark_step command.",
"enum": ["not_started", "in_progress", "completed", "blocked"],
"type": "string",
},
"step_notes": {
"description": "Additional notes for a step. Optional for mark_step command.",
"type": "string",
},
},
"required": ["command"],
"additionalProperties": False,
}
plans: dict = {} # Dictionary to store plans by plan_id
_current_plan_id: Optional[str] = None # Track the current active plan
async def execute(
self,
*,
command: Literal[
"create", "update", "list", "get", "set_active", "mark_step", "delete"
],
plan_id: Optional[str] = None,
title: Optional[str] = None,
steps: Optional[List[str]] = None,
step_index: Optional[int] = None,
step_status: Optional[
Literal["not_started", "in_progress", "completed", "blocked"]
] = None,
step_notes: Optional[str] = None,
**kwargs,
):
"""
Execute the planning tool with the given command and parameters.
Parameters:
- command: The operation to perform
- plan_id: Unique identifier for the plan
- title: Title for the plan (used with create command)
- steps: List of steps for the plan (used with create command)
- step_index: Index of the step to update (used with mark_step command)
- step_status: Status to set for a step (used with mark_step command)
- step_notes: Additional notes for a step (used with mark_step command)
"""
if command == "create":
return self._create_plan(plan_id, title, steps)
elif command == "update":
return self._update_plan(plan_id, title, steps)
elif command == "list":
return self._list_plans()
elif command == "get":
return self._get_plan(plan_id)
elif command == "set_active":
return self._set_active_plan(plan_id)
elif command == "mark_step":
return self._mark_step(plan_id, step_index, step_status, step_notes)
elif command == "delete":
return self._delete_plan(plan_id)
else:
raise ToolError(
f"Unrecognized command: {command}. Allowed commands are: create, update, list, get, set_active, mark_step, delete"
)
def _create_plan(
self, plan_id: Optional[str], title: Optional[str], steps: Optional[List[str]]
) -> ToolResult:
"""Create a new plan with the given ID, title, and steps."""
if not plan_id:
raise ToolError("Parameter `plan_id` is required for command: create")
if plan_id in self.plans:
raise ToolError(
f"A plan with ID '{plan_id}' already exists. Use 'update' to modify existing plans."
)
if not title:
raise ToolError("Parameter `title` is required for command: create")
if (
not steps
or not isinstance(steps, list)
or not all(isinstance(step, str) for step in steps)
):
raise ToolError(
"Parameter `steps` must be a non-empty list of strings for command: create"
)
# Create a new plan with initialized step statuses
plan = {
"plan_id": plan_id,
"title": title,
"steps": steps,
"step_statuses": ["not_started"] * len(steps),
"step_notes": [""] * len(steps),
}
self.plans[plan_id] = plan
self._current_plan_id = plan_id # Set as active plan
return ToolResult(
output=f"Plan created successfully with ID: {plan_id}\n\n{self._format_plan(plan)}"
)
def _update_plan(
self, plan_id: Optional[str], title: Optional[str], steps: Optional[List[str]]
) -> ToolResult:
"""Update an existing plan with new title or steps."""
if not plan_id:
raise ToolError("Parameter `plan_id` is required for command: update")
if plan_id not in self.plans:
raise ToolError(f"No plan found with ID: {plan_id}")
plan = self.plans[plan_id]
if title:
plan["title"] = title
if steps:
if not isinstance(steps, list) or not all(
isinstance(step, str) for step in steps
):
raise ToolError(
"Parameter `steps` must be a list of strings for command: update"
)
# Preserve existing step statuses for unchanged steps
old_steps = plan["steps"]
old_statuses = plan["step_statuses"]
old_notes = plan["step_notes"]
# Create new step statuses and notes
new_statuses = []
new_notes = []
for i, step in enumerate(steps):
# If the step exists at the same position in old steps, preserve status and notes
if i < len(old_steps) and step == old_steps[i]:
new_statuses.append(old_statuses[i])
new_notes.append(old_notes[i])
else:
new_statuses.append("not_started")
new_notes.append("")
plan["steps"] = steps
plan["step_statuses"] = new_statuses
plan["step_notes"] = new_notes
return ToolResult(
output=f"Plan updated successfully: {plan_id}\n\n{self._format_plan(plan)}"
)
def _list_plans(self) -> ToolResult:
"""List all available plans."""
if not self.plans:
return ToolResult(
output="No plans available. Create a plan with the 'create' command."
)
output = "Available plans:\n"
for plan_id, plan in self.plans.items():
current_marker = " (active)" if plan_id == self._current_plan_id else ""
completed = sum(
1 for status in plan["step_statuses"] if status == "completed"
)
total = len(plan["steps"])
progress = f"{completed}/{total} steps completed"
output += f"• {plan_id}{current_marker}: {plan['title']} - {progress}\n"
return ToolResult(output=output)
def _get_plan(self, plan_id: Optional[str]) -> ToolResult:
"""Get details of a specific plan."""
if not plan_id:
# If no plan_id is provided, use the current active plan
if not self._current_plan_id:
raise ToolError(
"No active plan. Please specify a plan_id or set an active plan."
)
plan_id = self._current_plan_id
if plan_id not in self.plans:
raise ToolError(f"No plan found with ID: {plan_id}")
plan = self.plans[plan_id]
return ToolResult(output=self._format_plan(plan))
def _set_active_plan(self, plan_id: Optional[str]) -> ToolResult:
"""Set a plan as the active plan."""
if not plan_id:
raise ToolError("Parameter `plan_id` is required for command: set_active")
if plan_id not in self.plans:
raise ToolError(f"No plan found with ID: {plan_id}")
self._current_plan_id = plan_id
return ToolResult(
output=f"Plan '{plan_id}' is now the active plan.\n\n{self._format_plan(self.plans[plan_id])}"
)
def _mark_step(
self,
plan_id: Optional[str],
step_index: Optional[int],
step_status: Optional[str],
step_notes: Optional[str],
) -> ToolResult:
"""Mark a step with a specific status and optional notes."""
if not plan_id:
# If no plan_id is provided, use the current active plan
if not self._current_plan_id:
raise ToolError(
"No active plan. Please specify a plan_id or set an active plan."
)
plan_id = self._current_plan_id
if plan_id not in self.plans:
raise ToolError(f"No plan found with ID: {plan_id}")
if step_index is None:
raise ToolError("Parameter `step_index` is required for command: mark_step")
plan = self.plans[plan_id]
if step_index < 0 or step_index >= len(plan["steps"]):
raise ToolError(
f"Invalid step_index: {step_index}. Valid indices range from 0 to {len(plan['steps'])-1}."
)
if step_status and step_status not in [
"not_started",
"in_progress",
"completed",
"blocked",
]:
raise ToolError(
f"Invalid step_status: {step_status}. Valid statuses are: not_started, in_progress, completed, blocked"
)
if step_status:
plan["step_statuses"][step_index] = step_status
if step_notes:
plan["step_notes"][step_index] = step_notes
return ToolResult(
output=f"Step {step_index} updated in plan '{plan_id}'.\n\n{self._format_plan(plan)}"
)
def _delete_plan(self, plan_id: Optional[str]) -> ToolResult:
"""Delete a plan."""
if not plan_id:
raise ToolError("Parameter `plan_id` is required for command: delete")
if plan_id not in self.plans:
raise ToolError(f"No plan found with ID: {plan_id}")
del self.plans[plan_id]
# If the deleted plan was the active plan, clear the active plan
if self._current_plan_id == plan_id:
self._current_plan_id = None
return ToolResult(output=f"Plan '{plan_id}' has been deleted.")
def _format_plan(self, plan: Dict) -> str:
"""Format a plan for display."""
output = f"Plan: {plan['title']} (ID: {plan['plan_id']})\n"
output += "=" * len(output) + "\n\n"
# Calculate progress statistics
total_steps = len(plan["steps"])
completed = sum(1 for status in plan["step_statuses"] if status == "completed")
in_progress = sum(
1 for status in plan["step_statuses"] if status == "in_progress"
)
blocked = sum(1 for status in plan["step_statuses"] if status == "blocked")
not_started = sum(
1 for status in plan["step_statuses"] if status == "not_started"
)
output += f"Progress: {completed}/{total_steps} steps completed "
if total_steps > 0:
percentage = (completed / total_steps) * 100
output += f"({percentage:.1f}%)\n"
else:
output += "(0%)\n"
output += f"Status: {completed} completed, {in_progress} in progress, {blocked} blocked, {not_started} not started\n\n"
output += "Steps:\n"
# Add each step with its status and notes
for i, (step, status, notes) in enumerate(
zip(plan["steps"], plan["step_statuses"], plan["step_notes"])
):
status_symbol = {
"not_started": "[ ]",
"in_progress": "[→]",
"completed": "[✓]",
"blocked": "[!]",
}.get(status, "[ ]")
output += f"{i}. {status_symbol} {step}\n"
if notes:
output += f" Notes: {notes}\n"
return output
import threading
from typing import Dict
from app.tool.base import BaseTool
class PythonExecute(BaseTool):
"""A tool for executing Python code with timeout and safety restrictions."""
name: str = "python_execute"
description: str = "Executes Python code string. Note: Only print outputs are visible, function return values are not captured. Use print statements to see results."
parameters: dict = {
"type": "object",
"properties": {
"code": {
"type": "string",
"description": "The Python code to execute.",
},
},
"required": ["code"],
}
async def execute(
self,
code: str,
timeout: int = 5,
) -> Dict:
"""
Executes the provided Python code with a timeout.
Args:
code (str): The Python code to execute.
timeout (int): Execution timeout in seconds.
Returns:
Dict: Contains 'output' with execution output or error message and 'success' status.
"""
result = {"observation": ""}
def run_code():
try:
safe_globals = {"__builtins__": dict(__builtins__)}
import sys
from io import StringIO
output_buffer = StringIO()
sys.stdout = output_buffer
exec(code, safe_globals, {})
sys.stdout = sys.__stdout__
result["observation"] = output_buffer.getvalue()
except Exception as e:
result["observation"] = str(e)
result["success"] = False
thread = threading.Thread(target=run_code)
thread.start()
thread.join(timeout)
if thread.is_alive():
return {
"observation": f"Execution timeout after {timeout} seconds",
"success": False,
}
return result
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment