Commit 2ef7f9de authored by AdrianWang's avatar AdrianWang
Browse files

feat(mcp): 添加mineru的mcp-server

parent b1545466
MINERU_API_BASE = "https://mineru.net"
MINERU_API_KEY = "eyJ0eXB..."
OUTPUT_DIR=./downloads
USE_LOCAL_API=false
LOCAL_MINERU_API_BASE="http://localhost:8888"
\ No newline at end of file
downloads
.env
uv.lock
.venv
src/mineru/__pycache__
dist
.DS_Store
.cursor
build
*.lock
src/mineru_mcp.egg-info
test
\ No newline at end of file
# MinerU MCP-Server Docker 部署指南
## 1. 简介
本文档提供了使用 Docker 部署 MinerU MCP-Server 的详细指南。通过 Docker 部署,你可以在任何支持 Docker 的环境中快速启动 MinerU MCP 服务器,无需考虑复杂的环境配置和依赖管理。
Docker 部署的主要优势:
- **一致的运行环境**:确保在任何平台上都有相同的运行环境
- **简化部署流程**:一键启动,无需手动安装依赖
- **易于扩展和迁移**:便于在不同环境间迁移和扩展服务
- **资源隔离**:避免与宿主机其他服务产生冲突
## 2. 先决条件
在开始之前,请确保你的系统已安装以下软件:
- [Docker](https://www.docker.com/get-started) (19.03 或更高版本)
- [Docker Compose](https://docs.docker.com/compose/install/) (1.27.0 或更高版本)
你可以通过以下命令检查它们是否已正确安装:
```bash
docker --version
docker-compose --version
```
同时,你需要:
-[MinerU 官网](https://mineru.net) 获取的 API 密钥(如果需要使用远程 API)
- 充足的硬盘空间,用于存储转换后的文件
## 3. 使用 Docker Compose 部署(推荐)
Docker Compose 提供了最简单的部署方式,特别适合快速开始使用或开发环境。
### 3.1 准备配置文件
1. 克隆仓库(如果尚未克隆):
```bash
git clone <repository-url>
cd mineru-mcp
```
2. 创建环境变量文件:
```bash
cp .env.example .env
```
3. 编辑 `.env` 文件,设置必要的环境变量:
```
MINERU_API_BASE=https://mineru.net
MINERU_API_KEY=你的API密钥
OUTPUT_DIR=./downloads
USE_LOCAL_API=false
LOCAL_MINERU_API_BASE=http://localhost:8080
```
如果你计划使用本地 API,请将 `USE_LOCAL_API` 设置为 `true`,并确保 `LOCAL_MINERU_API_BASE` 指向你的本地 API 服务地址。
### 3.2 启动服务
在项目根目录下运行:
```bash
docker-compose up -d
```
这将会:
- 构建 Docker 镜像(如果尚未构建)
- 创建并启动容器
- 在后台运行服务 (`-d` 参数)
服务将在 `http://localhost:8001` 上启动。你可以通过 MCP 客户端连接此地址。
### 3.3 查看日志
要查看服务日志,运行:
```bash
docker-compose logs -f
```
`Ctrl+C` 退出日志查看。
### 3.4 停止服务
要停止服务,运行:
```bash
docker-compose down
```
如果你想同时删除构建的镜像,可以使用:
```bash
docker-compose down --rmi local
```
## 4. 手动构建和运行 Docker 镜像
如果你需要更多的控制或自定义,你可以手动构建和运行 Docker 镜像。
### 4.1 构建镜像
在项目根目录下运行:
```bash
docker build -t mineru-mcp:latest .
```
这将根据 Dockerfile 构建一个名为 `mineru-mcp` 的 Docker 镜像,标签为 `latest`
### 4.2 运行容器
使用环境变量文件运行容器:
```bash
docker run -p 8001:8001 --env-file .env mineru-mcp:latest
```
或者直接指定环境变量:
```bash
docker run -p 8001:8001 \
-e MINERU_API_BASE=https://mineru.net \
-e MINERU_API_KEY=你的API密钥 \
-e OUTPUT_DIR=/app/downloads \
-v $(pwd)/downloads:/app/downloads \
mineru-mcp:latest
```
### 4.3 挂载卷
为了持久化存储转换后的文件,你应该挂载宿主机目录到容器的输出目录:
```bash
docker run -p 8001:8001 --env-file .env \
-v $(pwd)/downloads:/app/downloads \
mineru-mcp:latest
```
这将挂载当前工作目录下的 `downloads` 文件夹到容器内的 `/app/downloads` 目录。
## 5. 环境变量配置
Docker 环境中支持的环境变量与标准环境相同:
| 环境变量 | 说明 | 默认值 |
| ------------------------- | -------------------------------------------------------------- | ------------------------- |
| `MINERU_API_BASE` | MinerU 远程 API 的基础 URL | `https://mineru.net` |
| `MINERU_API_KEY` | MinerU API 密钥,需要从官网申请 | - |
| `OUTPUT_DIR` | 转换后文件的保存路径 | `/app/downloads` |
| `USE_LOCAL_API` | 是否使用本地 API 进行解析(仅适用于 `local_parse_pdf` 工具) | `false` |
| `LOCAL_MINERU_API_BASE` | 本地 API 的基础 URL(当 `USE_LOCAL_API=true` 时有效) | `http://localhost:8080` |
在 Docker 环境中,你可以:
- 通过 `--env-file` 指定环境变量文件
- 通过 `-e` 参数直接指定环境变量
-`docker-compose.yml` 文件中的 `environment` 部分配置环境变量
FROM python:3.12-slim
# Set working directory
WORKDIR /app
# Configure pip to use Alibaba Cloud mirror
RUN pip config set global.index-url https://mirrors.aliyun.com/pypi/simple/
# Install dependencies
RUN pip install --no-cache-dir poetry
# Copy project files
COPY pyproject.toml .
COPY README.md .
COPY src/ ./src/
# Install the package
RUN poetry config virtualenvs.create false && \
poetry install
# Create downloads directory
RUN mkdir -p /app/downloads
# Set environment variables
ENV OUTPUT_DIR=/app/downloads
# MINERU_API_KEY should be provided at runtime
ENV MINERU_API_BASE=https://mineru.net
ENV USE_LOCAL_API=false
ENV LOCAL_MINERU_API_BASE=""
# Expose the port that SSE will run on
EXPOSE 8001
# Set command to start the service with SSE transport
CMD ["mineru-mcp", "--transport", "sse", "--output-dir", "/app/downloads"]
\ No newline at end of file
# MinerU MCP-Server
## 1. 概述
这个项目提供了一个 **MinerU MCP 服务器** (`mineru-mcp`),它基于 **FastMCP** 框架构建。其主要功能是作为 **MinerU API** 的接口,用于将文档转换为 Markdown格式。
该服务器通过 MCP 协议公开了以下主要工具:
1. `parse_documents`:统一接口,支持处理本地文件和URL,自动根据配置选择最合适的处理方式,并自动读取转换后的内容
2. `get_ocr_languages`:获取OCR支持的语言列表
这使得其他应用程序或 MCP 客户端能够轻松地集成 MinerU 的 文档 到 Markdown 转换功能。
## 2. 核心功能
* **文档提取**: 接收文档文件输入(单个或多个 URL、单个或多个本地路径,支持doc、ppt、pdf、图片多种格式),调用 MinerU API 进行内容提取和格式转换,最终生成 Markdown 文件。
* **批量处理**: 支持同时处理多个文档文件(通过提供由空格、逗号或换行符分隔的 URL 列表或本地文件路径列表)。
* **OCR 支持**: 可选启用 OCR 功能(默认不开启),以处理扫描版或图片型文档。
* **多语言支持**: 支持多种语言的识别,可以自动检测文档语言或手动指定。
* **自动化流程**: 自动处理与 MinerU API 的交互,包括任务提交、状态轮询、结果下载解压、结果文件读取。
* **本地解析**: 支持调用本地部署的mineru模型直接解析文档,不依赖远程 API,适用于隐私敏感场景或离线环境。
* **智能路径处理**: 自动识别URL和本地文件路径,根据USE_LOCAL_API配置选择最合适的处理方式。
## 3. 安装
在开始安装之前,请确保您的系统满足以下基本要求:
* Python >= 3.10
### 3.1 使用 pip 安装 (推荐)
如果你的包已发布到 PyPI 或其他 Python 包索引,可以直接使用 pip 安装:
```bash
pip install mineru-mcp
```
这种方式适用于不需要修改源代码的普通用户。
### 3.2 从源码安装
如果你需要修改源代码或进行开发,可以从源码安装。
克隆仓库并进入项目目录:
```bash
git clone <repository-url> # 替换为你的仓库 URL
cd mineru-mcp
```
推荐使用 `uv``pip` 配合虚拟环境进行安装:
**使用 uv (推荐):**
```bash
# 安装 uv (如果尚未安装)
# pip install uv
# 创建并激活虚拟环境
uv venv
# Linux/macOS
source .venv/bin/activate
# Windows
# .venv\\Scripts\\activate
# 安装依赖和项目
uv pip install -e .
```
**使用 pip:**
```bash
# 创建并激活虚拟环境
python -m venv .venv
# Linux/macOS
source .venv/bin/activate
# Windows
# .venv\\Scripts\\activate
# 安装依赖和项目
pip install -e .
```
## 4. 环境变量配置
本项目支持通过环境变量进行配置。你可以选择直接设置系统环境变量,或者在项目根目录创建 `.env` 文件(参考 `.env.example` 模板)。
### 4.1 支持的环境变量
| 环境变量 | 说明 | 默认值 |
| ------------------------- | --------------------------------------------------------------- | ------------------------- |
| `MINERU_API_BASE` | MinerU 远程 API 的基础 URL | `https://mineru.net` |
| `MINERU_API_KEY` | MinerU API 密钥,需要从[官网](https://mineru.net)申请 | - |
| `OUTPUT_DIR` | 转换后文件的保存路径 | `./downloads` |
| `USE_LOCAL_API` | 是否使用本地 API 进行解析 | `false` |
| `LOCAL_MINERU_API_BASE` | 本地 API 的基础 URL(当 `USE_LOCAL_API=true` 时有效) | `http://localhost:8080` |
### 4.2 远程 API 与本地 API
本项目支持两种 API 模式:
* **远程 API**:默认模式,通过 MinerU 官方提供的云服务进行文档解析。优点是无需本地部署复杂的模型和环境,但需要网络连接和 API 密钥。
* **本地 API**:在本地部署 MinerU 引擎进行文档解析,适用于对数据隐私有高要求或需要离线使用的场景。设置 `USE_LOCAL_API=true` 时生效。
### 4.3 获取 API 密钥
要获取 `MINERU_API_KEY`,请访问 [MinerU 官网](https://mineru.net) 注册账号并申请 API 密钥。
## 5. 使用方法
### 5.1 工具概览
本项目通过 MCP 协议提供以下工具:
1. **parse_documents**:统一接口,支持处理本地文件和URL,根据 `USE_LOCAL_API` 配置自动选择合适的处理方式,并自动读取转换后的文件内容
2. **get_ocr_languages**:获取 OCR 支持的语言列表
### 5.2 参数说明
#### 5.2.1 parse_documents
| 参数 | 类型 | 说明 | 默认值 | 适用模式 |
| ------------------- | ------- | ------------------------------------------------------------------- | -------- | -------- |
| `file_sources` | 字符串 | 文件路径或URL,多个可用逗号或换行符分隔 (支持pdf、ppt、pptx、doc、docx以及图片格式jpg、jpeg、png) | - | 全部 |
| `enable_ocr` | 布尔值 | 是否启用 OCR 功能 | `false` | 全部 |
| `language` | 字符串 | 文档语言,默认"ch"中文,可选"en"英文等 | `ch` | 全部 |
| `page_ranges` | 字符串 (可选) | 指定页码范围,格式为逗号分隔的字符串。例如:"2,4-6":表示选取第2页、第4页至第6页;"2--2":表示从第2页一直选取到倒数第二页。(远程API) | `None` | 远程API |
> **注意**:
> - 当 `USE_LOCAL_API=true` 时,如果提供了URL,这些URL会被过滤掉,只处理本地文件路径
> - 当 `USE_LOCAL_API=false` 时,会同时处理URL和本地文件路径
#### 5.2.2 get_ocr_languages
无需参数
## 6. MCP 客户端集成
你可以在任何支持 MCP 协议的客户端中使用 MinerU MCP 服务器。
### 6.1 在 Claude 中使用
将 MinerU MCP 服务器配置为 Claude 的工具,即可在 Claude 中直接使用文档转 Markdown 功能。配置工具时详情请参考 MCP 工具配置文档。根据不同的安装和使用场景,你可以选择以下两种配置方式:
#### 6.1.1 源码运行方式
如果你是从源码安装并运行 MinerU MCP,可以使用以下配置。这种方式适合你需要修改源码或者进行开发调试的场景:
```json
{
"mcpServers": {
"mineru-mcp": {
"command": "uv",
"args": ["--directory", "/Users/adrianwang/Documents/minerU-mcp", "run", "-m", "mineru.cli"],
"env": {
"MINERU_API_BASE": "https://mineru.net",
"MINERU_API_KEY": "ey...",
"OUTPUT_DIR": "./downloads",
"USE_LOCAL_API": "true",
"LOCAL_MINERU_API_BASE": "http://localhost:8080"
}
}
}
}
```
这种配置的特点:
- 使用 `uv` 命令
- 通过 `--directory` 参数指定源码所在目录
- 使用 `-m mineru.cli` 运行模块
- 适合开发调试和定制化需求
#### 6.1.2 安装包运行方式
如果你是通过 pip 或 uv 安装了 mineru-mcp 包,可以使用以下更简洁的配置。这种方式适合生产环境或日常使用:
```json
{
"mcpServers": {
"mineru-mcp": {
"command": "uvx",
"args": ["mineru-mcp"],
"env": {
"MINERU_API_BASE": "https://mineru.net",
"MINERU_API_KEY": "ey...",
"OUTPUT_DIR": "./downloads",
"USE_LOCAL_API": "true",
"LOCAL_MINERU_API_BASE": "http://localhost:8080"
}
}
}
}
```
这种配置的特点:
- 使用 `uvx` 命令直接运行已安装的包
- 配置更加简洁
- 不需要指定源码目录
- 适合稳定的生产环境使用
### 6.2 在 FastMCP 客户端中使用
```python
from fastmcp import FastMCP
# 初始化 FastMCP 客户端
client = FastMCP(server_url="http://localhost:8001")
# 使用 parse_documents 工具处理单个文档
result = await client.tool_call(
tool_name="parse_documents",
params={"file_sources": "/path/to/document.pdf"}
)
# 混合处理URLs和本地文件
result = await client.tool_call(
tool_name="parse_documents",
params={"file_sources": "/path/to/file.pdf, https://example.com/document.pdf"}
)
# 启用OCR
result = await client.tool_call(
tool_name="parse_documents",
params={"file_sources": "/path/to/file.pdf", "enable_ocr": True}
)
```
### 6.3 直接运行服务
你可以通过设置环境变量并直接运行命令的方式启动 MinerU MCP 服务器,这种方式特别适合快速测试和开发环境。
#### 6.3.1 设置环境变量
首先,确保设置了必要的环境变量。你可以通过创建 `.env` 文件(参考 `.env.example`)或直接在命令行中设置:
```bash
# Linux/macOS
export MINERU_API_BASE="https://mineru.net"
export MINERU_API_KEY="your-api-key"
export OUTPUT_DIR="./downloads"
export USE_LOCAL_API="true" # 可选,如果需要本地解析
export LOCAL_MINERU_API_BASE="http://localhost:8080" # 可选,如果启用本地 API
# Windows
set MINERU_API_BASE=https://mineru.net
set MINERU_API_KEY=your-api-key
set OUTPUT_DIR=./downloads
set USE_LOCAL_API=true
set LOCAL_MINERU_API_BASE=http://localhost:8080
```
#### 6.3.2 启动服务
使用以下命令启动 MinerU MCP 服务器,支持多种传输模式:
**SSE 传输模式**
```bash
uv run mineru-mcp --transport sse
```
**Streamable HTTP 传输模式**
```bash
uv run mineru-mcp --transport streamable-http
```
或者,如果你使用全局安装:
```bash
mineru-mcp --transport sse
# 或
mineru-mcp --transport streamable-http
```
服务默认在 `http://localhost:8001` 启动,使用的传输协议取决于你指定的 `--transport` 参数。
> **注意**:不同传输模式使用不同的路由路径:
> - SSE 模式:`/sse`(例如:`http://localhost:8001/sse`)
> - Streamable HTTP 模式:`/mcp`(例如:`http://localhost:8001/mcp`)
## 7. Docker 部署
本项目支持使用 Docker 进行部署,使你能在任何支持 Docker 的环境中快速启动 MinerU MCP 服务器。
### 7.1 使用 Docker Compose
1. 确保你已经安装了 Docker 和 Docker Compose
2. 复制项目根目录中的 `.env.example` 文件为 `.env`,并根据你的需求修改环境变量
3. 运行以下命令启动服务:
```bash
docker-compose up -d
```
服务默认会在 `http://localhost:8001` 启动。
### 7.2 手动构建 Docker 镜像
如果需要手动构建 Docker 镜像,可以使用以下命令:
```bash
docker build -t mineru-mcp:latest .
```
然后启动容器:
```bash
docker run -p 8001:8001 --env-file .env mineru-mcp:latest
```
更多 Docker 相关信息,请参考 `DOCKER_README.md` 文件。
## 8. 常见问题
### 8.1 API 密钥问题
**问题**:无法连接 MinerU API 或返回 401 错误。
**解决方案**:检查你的 API 密钥是否正确设置。在 `.env` 文件中确保 `MINERU_API_KEY` 环境变量包含有效的密钥。
### 8.2 如何优雅退出服务
**问题**:如何正确地停止 MinerU MCP 服务?
**解决方案**:服务运行时,可以通过按 `Ctrl+C` 来优雅地退出。系统会自动处理正在进行的操作,并确保所有资源得到正确释放。如果一次 `Ctrl+C` 没有响应,可以再次按下 `Ctrl+C` 强制退出。
### 8.3 文件路径问题
**问题**:使用 `parse_documents` 工具处理本地文件时报找不到文件错误。
**解决方案**:请确保使用绝对路径,或者相对于服务器运行目录的正确相对路径。
### 8.4 MCP 服务调用超时问题
**问题**:调用 `parse_documents` 工具时出现 `Error calling tool 'parse_documents': MCP error -32001: Request timed out` 错误。
**解决方案**:这个问题常见于处理大型文档或网络不稳定的情况。在某些 MCP 客户端(如 Cursor)中,超时后可能导致无法再次调用 MCP 服务,需要重启客户端。最新版本的 Cursor 中可能会显示正在调用 MCP,但实际上没有真正调用成功。建议:
1. **等待官方修复**:这是Cursor客户端的已知问题,建议等待Cursor官方修复
2. **处理小文件**:尽量只处理少量小文件,避免处理大型文档导致超时
3. **分批处理**:将多个文件分成多次请求处理,每次只处理一两个文件
4. 增加超时时间设置(如果客户端支持)
5. 对于超时后无法再次调用的问题,需要重启 MCP 客户端
6. 如果反复出现超时,请检查网络连接或考虑使用本地 API 模式
## 9. 许可证
本项目采用 MIT 许可证。详见 [LICENSE](LICENSE) 文件。
version: '3'
services:
mineru-mcp:
build:
context: .
dockerfile: Dockerfile
ports:
- "8001:8001"
environment:
- MINERU_API_KEY=${MINERU_API_KEY}
volumes:
- ./downloads:/app/downloads
restart: unless-stopped
\ No newline at end of file
[project]
name = "mineru-mcp"
version = "0.1.12"
description = "MinerU MCP Server for PDF to Markdown conversion"
authors = [
{name = "minerU",email = "OpenDataLab@pjlab.org.cn"}
]
readme = "README.md"
license = {text = "MIT"}
requires-python = ">=3.10,<4.0"
classifiers = [
"Programming Language :: Python :: 3",
"License :: OSI Approved :: MIT License",
"Operating System :: OS Independent",
]
dependencies = [
"fastmcp>=2.5.2",
"python-dotenv>=1.0.0",
"requests>=2.31.0",
"aiohttp>=3.9.0",
"httpx>=0.24.0",
"uvicorn>=0.20.0",
"starlette>=0.27.0",
]
[project.scripts]
mineru-mcp = "mineru.cli:main"
[tool.poetry]
packages = [{include = "mineru", from = "src"}]
[[tool.poetry.source]]
name = "aliyun"
url = "https://mirrors.aliyun.com/pypi/simple/"
priority = "primary"
[build-system]
requires = ["setuptools>=42.0", "wheel"]
build-backend = "setuptools.build_meta"
"""MinerU File转Markdown转换的API客户端。"""
import asyncio
import os
import zipfile
from pathlib import Path
from typing import Any, Dict, List, Optional, Union
import aiohttp
import requests
from . import config
def singleton_func(cls):
instance = {}
def _singleton(*args, **kwargs):
if cls not in instance:
instance[cls] = cls(*args, **kwargs)
return instance[cls]
return _singleton
@singleton_func
class MinerUClient:
"""
用于与 MinerU API 交互以将 File 转换为 Markdown 的客户端。
"""
def __init__(self, api_base: Optional[str] = None, api_key: Optional[str] = None):
"""
初始化 MinerU API 客户端。
Args:
api_base: MinerU API 的基础 URL (默认: 从环境变量获取)
api_key: 用于向 MinerU 进行身份验证的 API 密钥 (默认: 从环境变量获取)
"""
self.api_base = api_base or config.MINERU_API_BASE
self.api_key = api_key or config.MINERU_API_KEY
if not self.api_key:
# 提供更友好的错误消息
raise ValueError(
"错误: MinerU API 密钥 (MINERU_API_KEY) 未设置或为空。\n"
"请确保已设置 MINERU_API_KEY 环境变量,例如:\n"
" export MINERU_API_KEY='your_actual_api_key'\n"
"或者,在项目根目录的 `.env` 文件中定义该变量。"
)
async def _request(self, method: str, endpoint: str, **kwargs) -> Dict[str, Any]:
"""
向 MinerU API 发出请求。
Args:
method: HTTP 方法 (GET, POST 等)
endpoint: API 端点路径 (不含基础 URL)
**kwargs: 传递给 aiohttp 请求的其他参数
Returns:
dict: API 响应 (JSON 格式)
"""
url = f"{self.api_base}{endpoint}"
headers = {
"Authorization": f"Bearer {self.api_key}",
"Accept": "application/json",
}
if "headers" in kwargs:
kwargs["headers"].update(headers)
else:
kwargs["headers"] = headers
# 创建一个不包含授权信息的参数副本,用于日志记录
log_kwargs = kwargs.copy()
if "headers" in log_kwargs and "Authorization" in log_kwargs["headers"]:
log_kwargs["headers"] = log_kwargs["headers"].copy()
log_kwargs["headers"]["Authorization"] = "Bearer ****" # 隐藏API密钥
config.logger.debug(f"API请求: {method} {url}")
config.logger.debug(f"请求参数: {log_kwargs}")
async with aiohttp.ClientSession() as session:
async with session.request(method, url, **kwargs) as response:
response.raise_for_status()
response_json = await response.json()
config.logger.debug(f"API响应: {response_json}")
return response_json
async def submit_file_url_task(
self,
urls: Union[str, List[Union[str, Dict[str, Any]]], Dict[str, Any]],
enable_ocr: bool = True,
language: str = "ch",
page_ranges: Optional[str] = None,
) -> Dict[str, Any]:
"""
提交 File URL 以转换为 Markdown。支持单个URL或多个URL批量处理。
Args:
urls: 可以是以下形式之一:
1. 单个URL字符串
2. 多个URL的列表
3. 包含URL配置的字典列表,每个字典包含:
- url: File文件URL (必需)
- is_ocr: 是否启用OCR (可选)
- data_id: 文件数据ID (可选)
- page_ranges: 页码范围 (可选)
enable_ocr: 是否为转换启用 OCR(所有文件的默认值)
language: 指定文档语言,默认 ch,中文
page_ranges: 指定页码范围,格式为逗号分隔的字符串。例如:"2,4-6"表示选取第2页、第4页至第6页;"2--2"表示从第2页到倒数第2页。
Returns:
dict: 任务信息,包括batch_id
"""
# 统计URL数量
url_count = 1
if isinstance(urls, list):
url_count = len(urls)
config.logger.debug(
f"调用submit_file_url_task: {url_count}个URL, "
+ f"ocr={enable_ocr}, "
+ f"language={language}"
)
# 处理输入,确保我们有一个URL配置列表
urls_config = []
# 转换输入为标准格式
if isinstance(urls, str):
urls_config.append(
{"url": urls, "is_ocr": enable_ocr, "page_ranges": page_ranges}
)
elif isinstance(urls, list):
# 处理URL列表或URL配置列表
for i, url_item in enumerate(urls):
if isinstance(url_item, str):
# 简单的URL字符串
urls_config.append(
{
"url": url_item,
"is_ocr": enable_ocr,
"page_ranges": page_ranges,
}
)
elif isinstance(url_item, dict):
# 含有详细配置的URL字典
if "url" not in url_item:
raise ValueError(f"URL配置必须包含 'url' 字段: {url_item}")
url_is_ocr = url_item.get("is_ocr", enable_ocr)
url_page_ranges = url_item.get("page_ranges", page_ranges)
url_config = {"url": url_item["url"], "is_ocr": url_is_ocr}
if url_page_ranges is not None:
url_config["page_ranges"] = url_page_ranges
urls_config.append(url_config)
else:
raise TypeError(f"不支持的URL配置类型: {type(url_item)}")
elif isinstance(urls, dict):
# 单个URL配置字典
if "url" not in urls:
raise ValueError(f"URL配置必须包含 'url' 字段: {urls}")
url_is_ocr = urls.get("is_ocr", enable_ocr)
url_page_ranges = urls.get("page_ranges", page_ranges)
url_config = {"url": urls["url"], "is_ocr": url_is_ocr}
if url_page_ranges is not None:
url_config["page_ranges"] = url_page_ranges
urls_config.append(url_config)
else:
raise TypeError(f"urls 必须是字符串、列表或字典,而不是 {type(urls)}")
# 构建API请求payload
files_payload = urls_config # 与submit_file_task不同,这里直接使用URLs配置
payload = {
"language": language,
"files": files_payload,
}
# 调用批量API
response = await self._request(
"POST", "/api/v4/extract/task/batch", json=payload
)
# 检查响应
if "data" not in response or "batch_id" not in response["data"]:
raise ValueError(f"提交批量URL任务失败: {response}")
batch_id = response["data"]["batch_id"]
config.logger.info(f"开始处理 {len(urls_config)} 个文件URL")
config.logger.debug(f"批量URL任务提交成功,批次ID: {batch_id}")
# 返回包含batch_id的响应和URLs信息
result = {
"data": {
"batch_id": batch_id,
"uploaded_files": [url_config.get("url") for url_config in urls_config],
}
}
# 对于单个URL的情况,设置file_name以保持与原来返回格式的兼容性
if len(urls_config) == 1:
url = urls_config[0]["url"]
# 从URL中提取文件名
file_name = url.split("/")[-1]
result["data"]["file_name"] = file_name
return result
async def submit_file_task(
self,
files: Union[str, List[Union[str, Dict[str, Any]]], Dict[str, Any]],
enable_ocr: bool = True,
language: str = "ch",
page_ranges: Optional[str] = None,
) -> Dict[str, Any]:
"""
提交本地 File 文件以转换为 Markdown。支持单个文件路径或多个文件配置。
Args:
files: 可以是以下形式之一:
1. 单个文件路径字符串
2. 多个文件路径的列表
3. 包含文件配置的字典列表,每个字典包含:
- path/name: 文件路径或文件名
- is_ocr: 是否启用OCR (可选)
- data_id: 文件数据ID (可选)
- page_ranges: 页码范围 (可选)
enable_ocr: 是否为转换启用 OCR(所有文件的默认值)
language: 指定文档语言,默认 ch,中文
page_ranges: 指定页码范围,格式为逗号分隔的字符串。例如:"2,4-6"表示选取第2页、第4页至第6页;"2--2"表示从第2页到倒数第2页。
Returns:
dict: 任务信息,包括batch_id
"""
# 统计文件数量
file_count = 1
if isinstance(files, list):
file_count = len(files)
config.logger.debug(
f"调用submit_file_task: {file_count}个文件, "
+ f"ocr={enable_ocr}, "
+ f"language={language}"
)
# 处理输入,确保我们有一个文件配置列表
files_config = []
# 转换输入为标准格式
if isinstance(files, str):
# 单个文件路径
file_path = Path(files)
if not file_path.exists():
raise FileNotFoundError(f"未找到 File 文件: {file_path}")
files_config.append(
{
"path": file_path,
"name": file_path.name,
"is_ocr": enable_ocr,
"page_ranges": page_ranges,
}
)
elif isinstance(files, list):
# 处理文件路径列表或文件配置列表
for i, file_item in enumerate(files):
if isinstance(file_item, str):
# 简单的文件路径
file_path = Path(file_item)
if not file_path.exists():
raise FileNotFoundError(f"未找到 File 文件: {file_path}")
files_config.append(
{
"path": file_path,
"name": file_path.name,
"is_ocr": enable_ocr,
"page_ranges": page_ranges,
}
)
elif isinstance(file_item, dict):
# 含有详细配置的文件字典
if "path" not in file_item and "name" not in file_item:
raise ValueError(
f"文件配置必须包含 'path' 或 'name' 字段: {file_item}"
)
if "path" in file_item:
file_path = Path(file_item["path"])
if not file_path.exists():
raise FileNotFoundError(f"未找到 File 文件: {file_path}")
file_name = file_path.name
else:
file_name = file_item["name"]
file_path = None
file_is_ocr = file_item.get("is_ocr", enable_ocr)
file_page_ranges = file_item.get("page_ranges", page_ranges)
file_config = {
"path": file_path,
"name": file_name,
"is_ocr": file_is_ocr,
}
if file_page_ranges is not None:
file_config["page_ranges"] = file_page_ranges
files_config.append(file_config)
else:
raise TypeError(f"不支持的文件配置类型: {type(file_item)}")
elif isinstance(files, dict):
# 单个文件配置字典
if "path" not in files and "name" not in files:
raise ValueError(f"文件配置必须包含 'path' 或 'name' 字段: {files}")
if "path" in files:
file_path = Path(files["path"])
if not file_path.exists():
raise FileNotFoundError(f"未找到 File 文件: {file_path}")
file_name = file_path.name
else:
file_name = files["name"]
file_path = None
file_is_ocr = files.get("is_ocr", enable_ocr)
file_page_ranges = files.get("page_ranges", page_ranges)
file_config = {
"path": file_path,
"name": file_name,
"is_ocr": file_is_ocr,
}
if file_page_ranges is not None:
file_config["page_ranges"] = file_page_ranges
files_config.append(file_config)
else:
raise TypeError(f"files 必须是字符串、列表或字典,而不是 {type(files)}")
# 步骤1: 构建API请求payload
files_payload = []
for file_config in files_config:
file_payload = {
"name": file_config["name"],
"is_ocr": file_config["is_ocr"],
}
if "page_ranges" in file_config and file_config["page_ranges"] is not None:
file_payload["page_ranges"] = file_config["page_ranges"]
files_payload.append(file_payload)
payload = {
"language": language,
"files": files_payload,
}
# 步骤2: 获取文件上传URL
response = await self._request("POST", "/api/v4/file-urls/batch", json=payload)
# 检查响应
if (
"data" not in response
or "batch_id" not in response["data"]
or "file_urls" not in response["data"]
):
raise ValueError(f"获取上传URL失败: {response}")
batch_id = response["data"]["batch_id"]
file_urls = response["data"]["file_urls"]
if len(file_urls) != len(files_config):
raise ValueError(
f"上传URL数量 ({len(file_urls)}) 与文件数量 ({len(files_config)}) 不匹配"
)
config.logger.info(f"开始上传 {len(file_urls)} 个本地文件")
config.logger.debug(f"获取上传URL成功,批次ID: {batch_id}")
# 步骤3: 上传所有文件
uploaded_files = []
for i, (file_config, upload_url) in enumerate(zip(files_config, file_urls)):
file_path = file_config["path"]
if file_path is None:
raise ValueError(f"文件 {file_config['name']} 没有有效的路径")
try:
with open(file_path, "rb") as f:
# 重要:不设置Content-Type,让OSS自动处理
response = requests.put(upload_url, data=f)
if response.status_code != 200:
raise ValueError(
f"文件上传失败,状态码: {response.status_code}, 响应: {response.text}"
)
config.logger.debug(f"文件 {file_path.name} 上传成功")
uploaded_files.append(file_path.name)
except Exception as e:
raise ValueError(f"文件 {file_path.name} 上传失败: {str(e)}")
config.logger.info(f"文件上传完成,共 {len(uploaded_files)} 个文件")
# 返回包含batch_id的响应和已上传的文件信息
result = {"data": {"batch_id": batch_id, "uploaded_files": uploaded_files}}
# 对于单个文件的情况,保持与原来返回格式的兼容性
if len(uploaded_files) == 1:
result["data"]["file_name"] = uploaded_files[0]
return result
async def get_batch_task_status(self, batch_id: str) -> Dict[str, Any]:
"""
获取批量转换任务的状态。
Args:
batch_id: 批量任务的ID
Returns:
dict: 批量任务状态信息
"""
response = await self._request(
"GET", f"/api/v4/extract-results/batch/{batch_id}"
)
return response
async def process_file_to_markdown(
self,
task_fn,
task_arg: Union[str, List[Dict[str, Any]], Dict[str, Any]],
enable_ocr: bool = True,
output_dir: Optional[str] = None,
max_retries: int = 180,
retry_interval: int = 10,
) -> Union[str, Dict[str, Any]]:
"""
从开始到结束处理 File 到 Markdown 的转换。
Args:
task_fn: 提交任务的函数 (submit_file_url_task 或 submit_file_task)
task_arg: 任务函数的参数,可以是:
- URL字符串
- 文件路径字符串
- 包含文件配置的字典
- 包含多个文件配置的字典列表
enable_ocr: 是否启用 OCR
output_dir: 结果的输出目录
max_retries: 最大状态检查重试次数
retry_interval: 状态检查之间的时间间隔 (秒)
Returns:
Union[str, Dict[str, Any]]:
- 单文件: 包含提取的 Markdown 文件的目录路径
- 多文件: {
"results": [
{
"filename": str,
"status": str,
"content": str,
"error_message": str,
}
],
"extract_dir": str
}
"""
try:
# 提交任务 - 使用位置参数调用,而不是命名参数
task_info = await task_fn(task_arg, enable_ocr)
# 批量任务处理
batch_id = task_info["data"]["batch_id"]
# 获取所有上传文件的名称
uploaded_files = task_info["data"].get("uploaded_files", [])
if not uploaded_files and "file_name" in task_info["data"]:
uploaded_files = [task_info["data"]["file_name"]]
if not uploaded_files:
raise ValueError("无法获取上传文件的信息")
config.logger.debug(f"批量任务提交成功。Batch ID: {batch_id}")
# 跟踪所有文件的处理状态
files_status = {} # 将使用file_name作为键
files_download_urls = {}
failed_files = {} # 记录失败的文件和错误信息
# 准备输出路径
output_path = config.ensure_output_dir(output_dir)
# 轮询任务完成情况
for i in range(max_retries):
status_info = await self.get_batch_task_status(batch_id)
config.logger.debug(f"轮训结果:{status_info}")
if (
"data" not in status_info
or "extract_result" not in status_info["data"]
):
config.logger.error(f"获取批量任务状态失败: {status_info}")
await asyncio.sleep(retry_interval)
continue
# 检查所有文件的状态
all_done = True
has_progress = False
for result in status_info["data"]["extract_result"]:
file_name = result.get("file_name")
if not file_name:
continue
# 初始化状态,如果之前没有记录
if file_name not in files_status:
files_status[file_name] = "pending"
state = result.get("state")
files_status[file_name] = state
if state == "done":
# 保存下载链接
full_zip_url = result.get("full_zip_url")
if full_zip_url:
files_download_urls[file_name] = full_zip_url
config.logger.info(f"文件 {file_name} 处理完成")
else:
config.logger.debug(
f"文件 {file_name} 标记为完成但没有下载链接"
)
all_done = False
elif state in ["failed", "error"]:
err_msg = result.get("err_msg", "未知错误")
failed_files[file_name] = err_msg
config.logger.warning(f"文件 {file_name} 处理失败: {err_msg}")
# 不抛出异常,继续处理其他文件
else:
all_done = False
# 显示进度信息
if state == "running" and "extract_progress" in result:
has_progress = True
progress = result["extract_progress"]
extracted = progress.get("extracted_pages", 0)
total = progress.get("total_pages", 0)
if total > 0:
percent = (extracted / total) * 100
config.logger.info(
f"处理进度: {file_name} "
+ f"{extracted}/{total} 页 "
+ f"({percent:.1f}%)"
)
# 检查是否所有文件都已经处理完成
expected_file_count = len(uploaded_files)
processed_file_count = len(files_status)
completed_file_count = len(files_download_urls) + len(failed_files)
# 记录当前状态
config.logger.debug(
f"文件处理状态: all_done={all_done}, "
+ f"files_status数量={processed_file_count}, "
+ f"上传文件数量={expected_file_count}, "
+ f"下载链接数量={len(files_download_urls)}, "
+ f"失败文件数量={len(failed_files)}"
)
# 判断是否所有文件都已完成(包括成功和失败的)
if (
processed_file_count > 0
and processed_file_count >= expected_file_count
and completed_file_count >= processed_file_count
):
if files_download_urls or failed_files:
config.logger.info("文件处理完成")
if failed_files:
config.logger.warning(
f"有 {len(failed_files)} 个文件处理失败"
)
break
else:
# 这种情况不应该发生,但保险起见
all_done = False
# 如果没有进度信息,只显示简单的等待消息
if not has_progress:
config.logger.info(f"等待文件处理完成... ({i+1}/{max_retries})")
await asyncio.sleep(retry_interval)
else:
# 如果超过最大重试次数,检查是否有部分文件完成
if not files_download_urls and not failed_files:
raise TimeoutError(f"批量任务 {batch_id} 未在允许的时间内完成")
else:
config.logger.warning(
"警告: 部分文件未在允许的时间内完成," + "继续处理已完成的文件"
)
# 创建主提取目录
extract_dir = output_path / batch_id
extract_dir.mkdir(exist_ok=True)
# 准备结果列表
results = []
# 下载并解压每个成功的文件的结果
for file_name, download_url in files_download_urls.items():
try:
config.logger.debug
(f"下载文件处理结果: {file_name}")
# 从下载URL中提取zip文件名作为子目录名
zip_file_name = download_url.split("/")[-1]
# 去掉.zip扩展名
zip_dir_name = os.path.splitext(zip_file_name)[0]
file_extract_dir = extract_dir / zip_dir_name
file_extract_dir.mkdir(exist_ok=True)
# 下载ZIP文件
zip_path = output_path / f"{batch_id}_{zip_file_name}"
async with aiohttp.ClientSession() as session:
async with session.get(
download_url,
headers={"Authorization": f"Bearer {self.api_key}"},
) as response:
response.raise_for_status()
with open(zip_path, "wb") as f:
f.write(await response.read())
# 解压到子文件夹
with zipfile.ZipFile(zip_path, "r") as zip_ref:
zip_ref.extractall(file_extract_dir)
# 解压后删除ZIP文件
zip_path.unlink()
# 尝试读取Markdown内容
markdown_content = ""
markdown_files = list(file_extract_dir.glob("*.md"))
if markdown_files:
with open(markdown_files[0], "r", encoding="utf-8") as f:
markdown_content = f.read()
# 添加成功结果
results.append(
{
"filename": file_name,
"status": "success",
"content": markdown_content,
"extract_path": str(file_extract_dir),
}
)
config.logger.debug(
f"文件 {file_name} 的结果已解压到: {file_extract_dir}"
)
except Exception as e:
# 下载失败,添加错误结果
error_msg = f"下载结果失败: {str(e)}"
config.logger.error(f"文件 {file_name} {error_msg}")
results.append(
{
"filename": file_name,
"status": "error",
"error_message": error_msg,
}
)
# 添加处理失败的文件到结果
for file_name, error_msg in failed_files.items():
results.append(
{
"filename": file_name,
"status": "error",
"error_message": f"处理失败: {error_msg}",
}
)
# 输出处理结果统计
success_count = len(files_download_urls)
fail_count = len(failed_files)
total_count = success_count + fail_count
config.logger.info("\n=== 文件处理结果统计 ===")
config.logger.info(f"总文件数: {total_count}")
config.logger.info(f"成功处理: {success_count}")
config.logger.info(f"处理失败: {fail_count}")
if failed_files:
config.logger.info("\n失败文件详情:")
for file_name, error_msg in failed_files.items():
config.logger.info(f" - {file_name}: {error_msg}")
if success_count > 0:
config.logger.info(f"\n结果保存目录: {extract_dir}")
else:
config.logger.info(f"\n输出目录: {extract_dir}")
# 返回详细结果
return {
"results": results,
"extract_dir": str(extract_dir),
"success_count": success_count,
"fail_count": fail_count,
"total_count": total_count,
}
except Exception as e:
config.logger.error(f"处理 File 到 Markdown 失败: {str(e)}")
raise
"""MinerU File转Markdown服务的命令行界面。"""
import sys
import argparse
from . import config
from . import server
def main():
"""命令行界面的入口点。"""
parser = argparse.ArgumentParser(description="MinerU File转Markdown转换服务")
parser.add_argument(
"--output-dir", "-o", type=str, help="保存转换后文件的目录 (默认: ./downloads)"
)
parser.add_argument(
"--transport",
"-t",
type=str,
default="stdio",
help="协议类型 (默认: stdio,可选: sse,streamable-http)",
)
parser.add_argument(
"--port",
"-p",
type=int,
default=8001,
help="服务器端口 (默认: 8001, 仅在使用HTTP协议时有效)",
)
parser.add_argument(
"--host",
type=str,
default="127.0.0.1",
help="服务器主机地址 (默认: 127.0.0.1, 仅在使用HTTP协议时有效)",
)
args = parser.parse_args()
# 检查参数有效性
if args.transport == "stdio" and (args.host != "127.0.0.1" or args.port != 8001):
print("警告: 在STDIO模式下,--host和--port参数将被忽略", file=sys.stderr)
# 验证API密钥 - 移动到这里,以便 --help 等参数可以无密钥运行
if not config.MINERU_API_KEY:
print(
"错误: 启动服务需要 MINERU_API_KEY 环境变量。"
"\\n请检查是否已设置该环境变量,例如:"
"\\n export MINERU_API_KEY='your_actual_api_key'"
"\\n或者,确保在项目根目录的 `.env` 文件中定义了该变量。"
"\\n\\n您可以使用 --help 查看可用的命令行选项。",
file=sys.stderr, # 将错误消息输出到 stderr
)
sys.exit(1)
# 如果提供了输出目录,则进行设置
if args.output_dir:
server.set_output_dir(args.output_dir)
# 打印配置信息
print("MinerU File转Markdown转换服务启动...")
if args.transport in ["sse", "streamable-http"]:
print(f"服务器地址: {args.host}:{args.port}")
print("按 Ctrl+C 可以退出服务")
server.run_server(mode=args.transport, port=args.port, host=args.host)
if __name__ == "__main__":
main()
"""MinerU File转Markdown转换服务的配置工具。"""
import os
import logging
from pathlib import Path
from dotenv import load_dotenv
# 从 .env 文件加载环境变量
load_dotenv()
# API 配置
MINERU_API_BASE = os.getenv("MINERU_API_BASE", "https://mineru.net")
MINERU_API_KEY = os.getenv("MINERU_API_KEY", "")
# 本地API配置
USE_LOCAL_API = os.getenv("USE_LOCAL_API", "").lower() in ["true", "1", "yes"]
LOCAL_MINERU_API_BASE = os.getenv("LOCAL_MINERU_API_BASE", "http://localhost:8080")
# 转换后文件的默认输出目录
DEFAULT_OUTPUT_DIR = os.getenv("OUTPUT_DIR", "./downloads")
# 设置日志系统
def setup_logging():
"""
设置日志系统,根据环境变量配置日志级别。
Returns:
logging.Logger: 配置好的日志记录器。
"""
# 获取环境变量中的日志级别设置
log_level = os.getenv("MINERU_LOG_LEVEL", "INFO").upper()
debug_mode = os.getenv("MINERU_DEBUG", "").lower() in ["true", "1", "yes"]
# 如果设置了debug_mode,则覆盖log_level
if debug_mode:
log_level = "DEBUG"
# 确保log_level是有效的
valid_levels = ["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"]
if log_level not in valid_levels:
log_level = "INFO"
# 设置日志格式
log_format = "%(asctime)s - %(name)s - %(levelname)s - %(message)s"
# 配置日志
logging.basicConfig(level=getattr(logging, log_level), format=log_format)
logger = logging.getLogger("mineru")
logger.setLevel(getattr(logging, log_level))
# 输出日志级别信息
logger.info(f"日志级别设置为: {log_level}")
return logger
# 创建默认的日志记录器
logger = setup_logging()
# 如果输出目录不存在,则创建它
def ensure_output_dir(output_dir=None):
"""
确保输出目录存在。
Args:
output_dir: 输出目录的可选路径。如果为 None,则使用 DEFAULT_OUTPUT_DIR。
Returns:
表示输出目录的 Path 对象。
"""
output_path = Path(output_dir or DEFAULT_OUTPUT_DIR)
output_path.mkdir(parents=True, exist_ok=True)
return output_path
# 验证 API 配置
def validate_api_config():
"""
验证是否已设置所需的 API 配置。
Returns:
dict: 配置状态。
"""
return {
"api_base": MINERU_API_BASE,
"api_key_set": bool(MINERU_API_KEY),
"output_dir": DEFAULT_OUTPUT_DIR,
}
"""演示如何使用 MinerU File转Markdown客户端的示例。"""
import os
import asyncio
from mcp.client import MCPClient
async def convert_file_url_example():
"""从 URL 转换 File 的示例。"""
client = MCPClient("http://localhost:8000")
# 转换单个 File URL
result = await client.call(
"convert_file_url", url="https://example.com/sample.pdf", enable_ocr=True
)
print(f"转换结果: {result}")
# 转换多个 File URL
urls = """
https://example.com/doc1.pdf
https://example.com/doc2.pdf
"""
result = await client.call("convert_file_url", url=urls, enable_ocr=True)
print(f"多个转换结果: {result}")
async def convert_file_file_example():
"""转换本地 File 文件的示例。"""
client = MCPClient("http://localhost:8000")
# 获取测试 File 的绝对路径
script_dir = os.path.dirname(os.path.abspath(__file__))
project_root = os.path.dirname(os.path.dirname(os.path.dirname(script_dir)))
test_file_path = os.path.join(project_root, "test_files", "test.pdf")
# 转换单个 File 文件
result = await client.call(
"convert_file_file", file_path=test_file_path, enable_ocr=True
)
print(f"文件转换结果: {result}")
async def get_api_status_example():
"""获取 API 状态的示例。"""
client = MCPClient("http://localhost:8000")
# 获取 API 状态
status = await client.get_resource("status://api")
print(f"API 状态: {status}")
# 获取使用帮助
help_text = await client.get_resource("help://usage")
print(f"使用帮助: {help_text[:100]}...") # 显示前 100 个字符
async def main():
"""运行所有示例。"""
print("运行 File 到 Markdown 转换示例...")
# 检查是否设置了 API_KEY
if not os.environ.get("MINERU_API_KEY"):
print("警告: MINERU_API_KEY 环境变量未设置。")
print("使用以下命令设置: export MINERU_API_KEY=your_api_key")
print("跳过需要 API 访问的示例...")
# 仅获取 API 状态
await get_api_status_example()
else:
# 运行所有示例
await convert_file_url_example()
await convert_file_file_example()
await get_api_status_example()
if __name__ == "__main__":
asyncio.run(main())
"""MinerU支持的语言列表。"""
from typing import Dict, List
# 支持的语言列表
LANGUAGES: List[Dict[str, str]] = [
{"name": "中文", "description": "Chinese & English", "code": "ch"},
{"name": "英文", "description": "English", "code": "en"},
{"name": "法文", "description": "French", "code": "fr"},
{"name": "德文", "description": "German", "code": "german"},
{"name": "日文", "description": "Japanese", "code": "japan"},
{"name": "韩文", "description": "Korean", "code": "korean"},
{"name": "中文繁体", "description": "Chinese Traditional", "code": "chinese_cht"},
{"name": "意大利文", "description": "Italian", "code": "it"},
{"name": "西班牙文", "description": "Spanish", "code": "es"},
{"name": "葡萄牙文", "description": "Portuguese", "code": "pt"},
{"name": "俄罗斯文", "description": "Russian", "code": "ru"},
{"name": "阿拉伯文", "description": "Arabic", "code": "ar"},
{"name": "印地文", "description": "Hindi", "code": "hi"},
{"name": "维吾尔", "description": "Uyghur", "code": "ug"},
{"name": "波斯文", "description": "Persian", "code": "fa"},
{"name": "乌尔都文", "description": "Urdu", "code": "ur"},
{"name": "塞尔维亚文(latin)", "description": "Serbian(latin)", "code": "rs_latin"},
{"name": "欧西坦文", "description": "Occitan", "code": "oc"},
{"name": "马拉地文", "description": "Marathi", "code": "mr"},
{"name": "尼泊尔文", "description": "Nepali", "code": "ne"},
{
"name": "塞尔维亚文(cyrillic)",
"description": "Serbian(cyrillic)",
"code": "rs_cyrillic",
},
{"name": "毛利文", "description": "Maori", "code": "mi"},
{"name": "马来文", "description": "Malay", "code": "ms"},
{"name": "马耳他文", "description": "Maltese", "code": "mt"},
{"name": "荷兰文", "description": "Dutch", "code": "nl"},
{"name": "挪威文", "description": "Norwegian", "code": "no"},
{"name": "波兰文", "description": "Polish", "code": "pl"},
{"name": "罗马尼亚文", "description": "Romanian", "code": "ro"},
{"name": "斯洛伐克文", "description": "Slovak", "code": "sk"},
{"name": "斯洛文尼亚文", "description": "Slovenian", "code": "sl"},
{"name": "阿尔巴尼亚文", "description": "Albanian", "code": "sq"},
{"name": "瑞典文", "description": "Swedish", "code": "sv"},
{"name": "西瓦希里文", "description": "Swahili", "code": "sw"},
{"name": "塔加洛文", "description": "Tagalog", "code": "tl"},
{"name": "土耳其文", "description": "Turkish", "code": "tr"},
{"name": "乌兹别克文", "description": "Uzbek", "code": "uz"},
{"name": "越南文", "description": "Vietnamese", "code": "vi"},
{"name": "蒙古文", "description": "Mongolian", "code": "mn"},
{"name": "车臣文", "description": "Chechen", "code": "che"},
{"name": "哈里亚纳语", "description": "Haryanvi", "code": "bgc"},
{"name": "保加利亚文", "description": "Bulgarian", "code": "bg"},
{"name": "乌克兰文", "description": "Ukranian", "code": "uk"},
{"name": "白俄罗斯文", "description": "Belarusian", "code": "be"},
{"name": "泰卢固文", "description": "Telugu", "code": "te"},
{"name": "阿巴扎文", "description": "Abaza", "code": "abq"},
{"name": "泰米尔文", "description": "Tamil", "code": "ta"},
{"name": "南非荷兰文", "description": "Afrikaans", "code": "af"},
{"name": "阿塞拜疆文", "description": "Azerbaijani", "code": "az"},
{"name": "波斯尼亚文", "description": "Bosnian", "code": "bs"},
{"name": "捷克文", "description": "Czech", "code": "cs"},
{"name": "威尔士文", "description": "Welsh", "code": "cy"},
{"name": "丹麦文", "description": "Danish", "code": "da"},
{"name": "爱沙尼亚文", "description": "Estonian", "code": "et"},
{"name": "爱尔兰文", "description": "Irish", "code": "ga"},
{"name": "克罗地亚文", "description": "Croatian", "code": "hr"},
{"name": "匈牙利文", "description": "Hungarian", "code": "hu"},
{"name": "印尼文", "description": "Indonesian", "code": "id"},
{"name": "冰岛文", "description": "Icelandic", "code": "is"},
{"name": "库尔德文", "description": "Kurdish", "code": "ku"},
{"name": "立陶宛文", "description": "Lithuanian", "code": "lt"},
{"name": "拉脱维亚文", "description": "Latvian", "code": "lv"},
{"name": "达尔瓦文", "description": "Dargwa", "code": "dar"},
{"name": "因古什文", "description": "Ingush", "code": "inh"},
{"name": "拉克文", "description": "Lak", "code": "lbe"},
{"name": "莱兹甘文", "description": "Lezghian", "code": "lez"},
{"name": "塔巴萨兰文", "description": "Tabassaran", "code": "tab"},
{"name": "比尔哈文", "description": "Bihari", "code": "bh"},
{"name": "迈蒂利文", "description": "Maithili", "code": "mai"},
{"name": "昂加文", "description": "Angika", "code": "ang"},
{"name": "孟加拉文", "description": "Bhojpuri", "code": "bho"},
{"name": "摩揭陀文", "description": "Magahi", "code": "mah"},
{"name": "那格浦尔文", "description": "Nagpur", "code": "sck"},
{"name": "尼瓦尔文", "description": "Newari", "code": "new"},
{"name": "保加利亚文", "description": "Goan Konkani", "code": "gom"},
{"name": "梵文", "description": "Sanskrit", "code": "sa"},
{"name": "阿瓦尔文", "description": "Avar", "code": "ava"},
{"name": "阿瓦尔文", "description": "Avar", "code": "ava"},
{"name": "阿迪赫文", "description": "Adyghe", "code": "ady"},
{"name": "巴利文", "description": "Pali", "code": "pi"},
{"name": "拉丁文", "description": "Latin", "code": "la"},
]
# 构建语言代码到语言信息的映射字典,便于快速查找
LANGUAGES_DICT: Dict[str, Dict[str, str]] = {lang["code"]: lang for lang in LANGUAGES}
def get_language_list() -> List[Dict[str, str]]:
"""获取所有支持的语言列表。"""
return LANGUAGES
def get_language_by_code(code: str) -> Dict[str, str]:
"""根据语言代码获取语言信息。"""
return LANGUAGES_DICT.get(
code, {"name": "未知", "description": "Unknown", "code": code}
)
"""MinerU File转Markdown转换的FastMCP服务器实现。"""
import json
import re
import traceback
from pathlib import Path
from typing import Annotated, Any, Dict, List, Optional
import aiohttp
import uvicorn
from fastmcp import FastMCP
from mcp.server.sse import SseServerTransport
from pydantic import Field
from starlette.applications import Starlette
from starlette.requests import Request
from starlette.routing import Mount, Route
from . import config
from .api import MinerUClient
from .language import get_language_list
# 初始化 FastMCP 服务器
mcp = FastMCP(
name="MinerU File to Markdown Conversion",
instructions="""
一个将文档转化工具,可以将文档转化成Markdown、Json等格式,支持多种文件格式,包括
PDF、Word、PPT以及图片格式(JPG、PNG、JPEG)。
系统工具:
parse_documents: 解析文档(支持本地文件和URL,自动读取内容)
get_ocr_languages: 获取OCR支持的语言列表
""",
)
# 全局客户端实例
_client_instance: Optional[MinerUClient] = None
def create_starlette_app(mcp_server, *, debug: bool = False) -> Starlette:
"""创建用于SSE传输的Starlette应用。
Args:
mcp_server: MCP服务器实例
debug: 是否启用调试模式
Returns:
Starlette: 配置好的Starlette应用实例
"""
sse = SseServerTransport("/messages/")
async def handle_sse(request: Request) -> None:
"""处理SSE连接请求。"""
async with sse.connect_sse(
request.scope,
request.receive,
request._send,
) as (read_stream, write_stream):
await mcp_server.run(
read_stream,
write_stream,
mcp_server.create_initialization_options(),
)
return Starlette(
debug=debug,
routes=[
Route("/sse", endpoint=handle_sse),
Mount("/messages/", app=sse.handle_post_message),
],
)
def run_server(mode=None, port=8001, host="127.0.0.1"):
"""运行 FastMCP 服务器。
Args:
mode: 运行模式,支持stdio、sse、streamable-http
port: 服务器端口,默认为8001,仅在HTTP模式下有效
host: 服务器主机地址,默认为127.0.0.1,仅在HTTP模式下有效
"""
# 确保输出目录存在
config.ensure_output_dir(output_dir)
# 检查是否设置了 API 密钥
if not config.MINERU_API_KEY:
config.logger.warning("警告: MINERU_API_KEY 环境变量未设置。")
config.logger.warning("使用以下命令设置: export MINERU_API_KEY=your_api_key")
# 获取MCP服务器实例
mcp_server = mcp._mcp_server
try:
# 运行服务器
if mode == "sse":
config.logger.info(f"启动SSE服务器: {host}:{port}")
starlette_app = create_starlette_app(mcp_server, debug=True)
uvicorn.run(starlette_app, host=host, port=port)
elif mode == "streamable-http":
config.logger.info(f"启动Streamable HTTP服务器: {host}:{port}")
# 在HTTP模式下传递端口参数
mcp.run(mode, port=port)
else:
# 默认stdio模式
config.logger.info("启动STDIO服务器")
mcp.run(mode or "stdio")
except Exception as e:
config.logger.error(f"\n❌ 服务异常退出: {str(e)}")
traceback.print_exc()
finally:
# 清理资源
cleanup_resources()
def cleanup_resources():
"""清理全局资源。"""
global _client_instance
if _client_instance is not None:
try:
# 如果客户端有close方法,调用它
if hasattr(_client_instance, "close"):
_client_instance.close()
except Exception as e:
config.logger.error(f"清理客户端资源时出错: {str(e)}")
finally:
_client_instance = None
config.logger.info("资源清理完成")
def get_client() -> MinerUClient:
"""获取 MinerUClient 的单例实例。如果尚未初始化,则进行初始化。"""
global _client_instance
if _client_instance is None:
_client_instance = MinerUClient() # Initialization happens here
return _client_instance
# Markdown 文件的输出目录
output_dir = config.DEFAULT_OUTPUT_DIR
def set_output_dir(dir_path: str):
"""设置转换后文件的输出目录。"""
global output_dir
output_dir = dir_path
config.ensure_output_dir(output_dir)
return output_dir
def parse_list_input(input_str: str) -> List[str]:
"""
解析可能包含由逗号或换行符分隔的多个项目的字符串输入。
Args:
input_str: 可能包含多个项目的字符串
Returns:
解析出的项目列表
"""
if not input_str:
return []
# 按逗号、换行符或空格分割
items = re.split(r"[,\n\s]+", input_str)
# 移除空项目并处理带引号的项目
result = []
for item in items:
item = item.strip()
# 如果存在引号,则移除
if (item.startswith('"') and item.endswith('"')) or (
item.startswith("'") and item.endswith("'")
):
item = item[1:-1]
if item:
result.append(item)
return result
async def convert_file_url(
url: str,
enable_ocr: bool = False,
language: str = "ch",
page_ranges: str | None = None,
) -> Dict[str, Any]:
"""
从URL转换文件到Markdown格式。支持单个或多个URL处理。
返回:
成功: {"status": "success", "result_path": "输出目录路径"}
失败: {"status": "error", "error": "错误信息"}
"""
urls_to_process = None
# 检查是否为字典或字典列表格式的URL配置
if isinstance(url, dict):
# 单个URL配置字典
urls_to_process = url
elif isinstance(url, list) and len(url) > 0 and isinstance(url[0], dict):
# URL配置字典列表
urls_to_process = url
elif isinstance(url, str):
# 检查是否为 JSON 字符串格式的多URL配置
if url.strip().startswith("[") and url.strip().endswith("]"):
try:
# 尝试解析 JSON 字符串为URL配置列表
url_configs = json.loads(url)
if not isinstance(url_configs, list):
raise ValueError("JSON URL配置必须是列表格式")
urls_to_process = url_configs
except json.JSONDecodeError:
# 不是有效的 JSON,继续使用字符串解析方式
pass
if urls_to_process is None:
# 解析普通URL列表
urls = parse_list_input(url)
if not urls:
raise ValueError("未提供有效的 URL")
if len(urls) == 1:
# 单个URL处理
urls_to_process = {"url": urls[0], "is_ocr": enable_ocr}
else:
# 多个URL,转换为URL配置列表
urls_to_process = []
for url_item in urls:
urls_to_process.append(
{
"url": url_item,
"is_ocr": enable_ocr,
}
)
# 使用submit_file_url_task处理URLs
try:
result_path = await get_client().process_file_to_markdown(
lambda urls, o: get_client().submit_file_url_task(
urls,
o,
language=language,
page_ranges=page_ranges,
),
urls_to_process,
enable_ocr,
output_dir,
)
return {"status": "success", "result_path": result_path}
except Exception as e:
return {"status": "error", "error": str(e)}
async def convert_file_path(
file_path: str,
enable_ocr: bool = False,
language: str = "ch",
page_ranges: str | None = None,
) -> Dict[str, Any]:
"""
将本地文件转换为Markdown格式。支持单个或多个文件批量处理。
返回:
成功: {"status": "success", "result_path": "输出目录路径"}
失败: {"status": "error", "error": "错误信息"}
"""
files_to_process = None
# 检查是否为字典或字典列表格式的文件配置
if isinstance(file_path, dict):
# 单个文件配置字典
files_to_process = file_path
elif (
isinstance(file_path, list)
and len(file_path) > 0
and isinstance(file_path[0], dict)
):
# 文件配置字典列表
files_to_process = file_path
elif isinstance(file_path, str):
# 检查是否为 JSON 字符串格式的多文件配置
if file_path.strip().startswith("[") and file_path.strip().endswith("]"):
try:
# 尝试解析 JSON 字符串为文件配置列表
file_configs = json.loads(file_path)
if not isinstance(file_configs, list):
raise ValueError("JSON 文件配置必须是列表格式")
files_to_process = file_configs
except json.JSONDecodeError:
# 不是有效的 JSON,继续使用字符串解析方式
pass
if files_to_process is None:
# 解析普通文件路径列表
file_paths = parse_list_input(file_path)
if not file_paths:
raise ValueError("未提供有效的文件路径")
if len(file_paths) == 1:
# 单个文件处理
files_to_process = {
"path": file_paths[0],
"is_ocr": enable_ocr,
}
else:
# 多个文件路径,转换为文件配置列表
files_to_process = []
for i, path in enumerate(file_paths):
files_to_process.append(
{
"path": path,
"is_ocr": enable_ocr,
}
)
# 使用submit_file_task处理文件
try:
result_path = await get_client().process_file_to_markdown(
lambda files, o: get_client().submit_file_task(
files,
o,
language=language,
page_ranges=page_ranges,
),
files_to_process,
enable_ocr,
output_dir,
)
return {"status": "success", "result_path": result_path}
except Exception as e:
return {
"status": "error",
"error": str(e),
"params": {
"file_path": file_path,
"enable_ocr": enable_ocr,
"language": language,
},
}
async def local_parse_file(
file_path: str,
parse_method: str = "auto",
) -> Dict[str, Any]:
"""
根据环境变量设置使用本地或远程API解析文件。
返回:
成功: {"status": "success", "result": 处理结果} 或 {"status": "success", "result_path": "输出目录路径"}
失败: {"status": "error", "error": "错误信息"}
"""
file_path = Path(file_path)
# 检查文件是否存在
if not file_path.exists():
return {"status": "error", "error": f"文件不存在: {file_path}"}
try:
# 根据环境变量决定使用本地API还是远程API
if config.USE_LOCAL_API:
config.logger.debug(f"使用本地API: {config.LOCAL_MINERU_API_BASE}")
return await _parse_file_local(
file_path=str(file_path),
parse_method=parse_method,
)
else:
return {"status": "error", "error": "远程API未配置"}
except Exception as e:
config.logger.error(f"解析文件时出错: {str(e)}")
return {"status": "error", "error": str(e)}
async def read_converted_file(
file_path: str,
) -> Dict[str, Any]:
"""
读取解析后的文件内容。主要支持Markdown和其他文本文件格式。
返回:
成功: {"status": "success", "content": "文件内容"}
失败: {"status": "error", "error": "错误信息"}
"""
try:
target_file = Path(file_path)
parent_dir = target_file.parent
suffix = target_file.suffix.lower()
# 支持的文本文件格式
text_extensions = [".md", ".txt", ".json", ".html", ".tex", ".latex"]
if suffix not in text_extensions:
return {
"status": "error",
"error": f"不支持的文件格式: {suffix}。目前仅支持以下格式: {', '.join(text_extensions)}",
}
if not target_file.exists():
if not parent_dir.exists():
return {"status": "error", "error": f"目录 {parent_dir} 不存在"}
# 递归搜索所有子目录下的同后缀文件
similar_files_paths = [
str(f) for f in parent_dir.rglob(f"*{suffix}") if f.is_file()
]
if similar_files_paths:
if len(similar_files_paths) == 1:
# 如果只找到一个文件,直接读取并返回内容
alternative_file = similar_files_paths[0]
try:
with open(alternative_file, "r", encoding="utf-8") as f:
content = f.read()
return {
"status": "success",
"content": content,
"message": f"未找到文件 {target_file.name},但找到了 {Path(alternative_file).name},已返回其内容",
}
except Exception as e:
return {
"status": "error",
"error": f"尝试读取替代文件时出错: {str(e)}",
}
else:
# 如果找到多个文件,提供建议列表
suggestion = f"你是否在找: {', '.join(similar_files_paths)}?"
return {
"status": "error",
"error": f"文件 {target_file.name} 不存在。在 {parent_dir} 及其子目录下找到以下同类型文件。{suggestion}",
}
else:
return {
"status": "error",
"error": f"文件 {target_file.name} 不存在,且在目录 {parent_dir} 及其子目录下未找到其他 {suffix} 文件。",
}
# 以文本模式读取
with open(target_file, "r", encoding="utf-8") as f:
content = f.read()
return {"status": "success", "content": content}
except Exception as e:
config.logger.error(f"读取文件时出错: {str(e)}")
return {"status": "error", "error": str(e)}
async def find_and_read_markdown_content(result_path: str) -> Dict[str, Any]:
"""
在给定的路径中寻找并读取Markdown文件内容。
查找所有可能的文件位置,返回所有找到的有效内容。
Args:
result_path: 结果目录路径
Returns:
Dict[str, Any]: 包含所有文件内容或错误信息的字典
"""
if not result_path:
return {"status": "warning", "message": "未提供有效的结果路径"}
base_path = Path(result_path)
if not base_path.exists():
return {"status": "warning", "message": f"结果路径不存在: {result_path}"}
# 使用集合来存储文件路径,确保唯一性
unique_files = set()
# 添加常见文件名
common_files = [
base_path / "full.md",
base_path / "full.txt",
base_path / "output.md",
base_path / "result.md",
]
for f in common_files:
if f.exists():
unique_files.add(str(f))
# 添加子目录中的常见文件名
for subdir in base_path.iterdir():
if subdir.is_dir():
subdir_files = [
subdir / "full.md",
subdir / "full.txt",
subdir / "output.md",
subdir / "result.md",
]
for f in subdir_files:
if f.exists():
unique_files.add(str(f))
# 查找所有的.md和.txt文件
for md_file in base_path.glob("**/*.md"):
unique_files.add(str(md_file))
for txt_file in base_path.glob("**/*.txt"):
unique_files.add(str(txt_file))
# 将集合转换回Path对象列表
possible_files = [Path(f) for f in unique_files]
config.logger.debug(f"找到 {len(possible_files)} 个可能的文件")
# 收集所有找到的有效文件内容
found_contents = []
# 尝试读取每个可能的文件
for file_path in possible_files:
if file_path.exists():
result = await read_converted_file(str(file_path))
if result["status"] == "success":
config.logger.debug(f"成功读取文件内容: {file_path}")
found_contents.append(
{"file_path": str(file_path), "content": result["content"]}
)
# 如果找到了文件内容
if found_contents:
config.logger.debug(f"在结果目录中找到了 {len(found_contents)} 个可读取的文件")
# 如果只找到一个文件,保持向后兼容的返回格式
if len(found_contents) == 1:
return {
"status": "success",
"content": found_contents[0]["content"],
"file_path": found_contents[0]["file_path"],
}
# 如果找到多个文件,返回内容列表
else:
return {"status": "success", "contents": found_contents}
# 如果没有找到任何有效的文件
return {
"status": "warning",
"message": f"无法在结果目录中找到可读取的Markdown文件: {result_path}",
}
async def _process_conversion_result(
result: Dict[str, Any], source: str, is_url: bool = False
) -> Dict[str, Any]:
"""
处理转换结果,统一格式化输出。
Args:
result: 转换函数返回的结果
source: 源文件路径或URL
is_url: 是否为URL
Returns:
格式化后的结果字典
"""
filename = source.split("/")[-1]
if is_url and "?" in filename:
filename = filename.split("?")[0]
elif not is_url:
filename = Path(source).name
base_result = {
"filename": filename,
"source_url" if is_url else "source_path": source,
}
if result["status"] == "success":
# 获取result_path,可能是字符串或字典
result_path = result.get("result_path")
# 记录调试信息
config.logger.debug(f"处理结果 result_path 类型: {type(result_path)}")
if result_path:
# 情况1: result_path是字典且包含results字段(批量处理结果)
if isinstance(result_path, dict) and "results" in result_path:
config.logger.debug("检测到批量处理结果格式")
# 查找与当前源文件匹配的结果
for item in result_path.get("results", []):
if item.get("filename") == filename or (
not is_url and Path(source).name == item.get("filename")
):
# 直接返回匹配项的状态,无论是success还是error
if item.get("status") == "success" and "content" in item:
base_result.update(
{
"status": "success",
"content": item.get("content", ""),
}
)
# 如果有extract_path,也添加进去
if "extract_path" in item:
base_result["extract_path"] = item["extract_path"]
return base_result
elif item.get("status") == "error":
# 处理失败的文件,直接返回error状态
base_result.update(
{
"status": "error",
"error_message": item.get(
"error_message", "文件处理失败"
),
}
)
return base_result
# 如果没有找到匹配的结果,但有extract_dir,尝试从那里读取
if "extract_dir" in result_path:
config.logger.debug(
f"尝试从extract_dir读取: {result_path['extract_dir']}"
)
try:
content_result = await find_and_read_markdown_content(
result_path["extract_dir"]
)
if content_result.get("status") == "success":
base_result.update(
{
"status": "success",
"content": content_result.get("content", ""),
"extract_path": result_path["extract_dir"],
}
)
return base_result
except Exception as e:
config.logger.error(f"从extract_dir读取内容时出错: {str(e)}")
# 如果上述方法都失败,返回错误
base_result.update(
{
"status": "error",
"error_message": "未能在批量处理结果中找到匹配的内容",
}
)
# 情况2: result_path是字符串(传统格式)
elif isinstance(result_path, str):
config.logger.debug(f"处理传统格式结果路径: {result_path}")
content_result = await find_and_read_markdown_content(result_path)
if content_result.get("status") == "success":
base_result.update(
{
"status": "success",
"content": content_result.get("content", ""),
"extract_path": result_path,
}
)
else:
base_result.update(
{
"status": "error",
"error_message": f"无法读取转换结果: {content_result.get('message', '')}",
}
)
# 情况3: result_path是其他类型的字典(尝试处理)
elif isinstance(result_path, dict):
config.logger.debug(f"处理其他字典格式: {result_path}")
# 尝试从字典中提取可能的路径
extract_path = (
result_path.get("extract_dir")
or result_path.get("path")
or result_path.get("dir")
)
if extract_path and isinstance(extract_path, str):
try:
content_result = await find_and_read_markdown_content(
extract_path
)
if content_result.get("status") == "success":
base_result.update(
{
"status": "success",
"content": content_result.get("content", ""),
"extract_path": extract_path,
}
)
return base_result
except Exception as e:
config.logger.error(f"从extract_path读取内容时出错: {str(e)}")
# 如果没有找到有效路径,返回错误
base_result.update(
{"status": "error", "error_message": "转换结果格式无法识别"}
)
else:
# 情况4: result_path是其他类型(错误)
base_result.update(
{
"status": "error",
"error_message": f"无法识别的result_path类型: {type(result_path)}",
}
)
else:
base_result.update(
{"status": "error", "error_message": "转换成功但未返回结果路径"}
)
else:
base_result.update(
{"status": "error", "error_message": result.get("error", "未知错误")}
)
return base_result
@mcp.tool()
async def parse_documents(
file_sources: Annotated[
str,
Field(
description="""文件路径或URL,支持以下格式:
- 单个路径或URL: "/path/to/file.pdf" 或 "https://example.com/document.pdf"
- 多个路径或URL(逗号分隔): "/path/to/file1.pdf, /path/to/file2.pdf" 或
"https://example.com/doc1.pdf, https://example.com/doc2.pdf"
- 混合路径和URL: "/path/to/file.pdf, https://example.com/document.pdf"
(支持pdf、ppt、pptx、doc、docx以及图片格式jpg、jpeg、png)"""
),
],
# 通用参数
enable_ocr: Annotated[bool, Field(description="启用OCR识别,默认False")] = False,
language: Annotated[
str, Field(description='文档语言,默认"ch"中文,可选"en"英文等')
] = "ch",
# 远程API参数
page_ranges: Annotated[
str | None,
Field(
description='指定页码范围,格式为逗号分隔的字符串。例如:"2,4-6":表示选取第2页、第4页至第6页;"2--2":表示从第2页一直选取到倒数第二页。(远程API),默认None'
),
] = None,
) -> Dict[str, Any]:
"""
统一接口,将文件转换为Markdown格式。支持本地文件和URL,会根据USE_LOCAL_API配置自动选择合适的处理方式。
当USE_LOCAL_API=true时:
- 会过滤掉http/https开头的URL路径
- 对本地文件使用本地API进行解析
当USE_LOCAL_API=false时:
- 将http/https开头的路径使用convert_file_url处理
- 将其他路径使用convert_file_path处理
处理完成后,会自动尝试读取转换后的文件内容并返回。
返回:
成功: {"status": "success", "content": "文件内容"} 或 {"status": "success", "results": [处理结果列表]}
失败: {"status": "error", "error": "错误信息"}
"""
# 解析路径列表
sources = parse_list_input(file_sources)
if not sources:
return {"status": "error", "error": "未提供有效的文件路径或URL"}
# 去重处理,使用字典来保持原始顺序
sources = list(dict.fromkeys(sources))
config.logger.debug(f"去重后的文件路径: {sources}")
# 记录去重信息
original_count = len(parse_list_input(file_sources))
unique_count = len(sources)
if original_count > unique_count:
config.logger.debug(
f"检测到重复路径,已自动去重: {original_count} -> {unique_count}"
)
# 将路径分类
url_paths = []
file_paths = []
for source in sources:
if source.lower().startswith(("http://", "https://")):
url_paths.append(source)
else:
file_paths.append(source)
results = []
# 根据USE_LOCAL_API决定处理方式
if config.USE_LOCAL_API:
# 在本地API模式下,只处理本地文件路径
if not file_paths:
return {
"status": "warning",
"message": "在本地API模式下,无法处理URL,且未提供有效的本地文件路径",
}
config.logger.info(f"使用本地API处理 {len(file_paths)} 个文件")
# 逐个处理本地文件
for path in file_paths:
try:
# 跳过不存在的文件
if not Path(path).exists():
results.append(
{
"filename": Path(path).name,
"source_path": path,
"status": "error",
"error_message": f"文件不存在: {path}",
}
)
continue
result = await local_parse_file(
file_path=path,
parse_method=(
"ocr" if enable_ocr else "txt"
), # 如果启用OCR,使用ocr,否则使用txt
)
# 添加文件名信息
result_with_filename = {
"filename": Path(path).name,
"source_path": path,
**result,
}
results.append(result_with_filename)
except Exception as e:
# 处理文件时出现异常,记录错误但继续处理下一个文件
config.logger.error(f"处理文件 {path} 时出现错误: {str(e)}")
results.append(
{
"filename": Path(path).name,
"source_path": path,
"status": "error",
"error_message": f"处理文件时出现异常: {str(e)}",
}
)
else:
# 在远程API模式下,分别处理URL和本地文件路径
if url_paths:
config.logger.info(f"使用远程API处理 {len(url_paths)} 个文件URL")
try:
# 调用convert_file_url处理URLs
url_result = await convert_file_url(
url=",".join(url_paths),
enable_ocr=enable_ocr,
language=language,
page_ranges=page_ranges,
)
if url_result["status"] == "success":
# 为每个URL生成对应的结果
for url in url_paths:
result_item = await _process_conversion_result(
url_result, url, is_url=True
)
results.append(result_item)
else:
# 转换失败,为所有URL添加错误结果
for url in url_paths:
results.append(
{
"filename": url.split("/")[-1].split("?")[0],
"source_url": url,
"status": "error",
"error_message": url_result.get("error", "URL处理失败"),
}
)
except Exception as e:
config.logger.error(f"处理URL时出现错误: {str(e)}")
for url in url_paths:
results.append(
{
"filename": url.split("/")[-1].split("?")[0],
"source_url": url,
"status": "error",
"error_message": f"处理URL时出现异常: {str(e)}",
}
)
if file_paths:
config.logger.info(f"使用远程API处理 {len(file_paths)} 个本地文件")
# 过滤出存在的文件
existing_files = []
for file_path in file_paths:
if not Path(file_path).exists():
results.append(
{
"filename": Path(file_path).name,
"source_path": file_path,
"status": "error",
"error_message": f"文件不存在: {file_path}",
}
)
else:
existing_files.append(file_path)
if existing_files:
try:
# 调用convert_file_path处理本地文件
file_result = await convert_file_path(
file_path=",".join(existing_files),
enable_ocr=enable_ocr,
language=language,
page_ranges=page_ranges,
)
config.logger.debug(f"file_result: {file_result}")
if file_result["status"] == "success":
# 为每个文件生成对应的结果
for file_path in existing_files:
result_item = await _process_conversion_result(
file_result, file_path, is_url=False
)
results.append(result_item)
else:
# 转换失败,为所有文件添加错误结果
for file_path in existing_files:
results.append(
{
"filename": Path(file_path).name,
"source_path": file_path,
"status": "error",
"error_message": file_result.get(
"error", "文件处理失败"
),
}
)
except Exception as e:
config.logger.error(f"处理本地文件时出现错误: {str(e)}")
for file_path in existing_files:
results.append(
{
"filename": Path(file_path).name,
"source_path": file_path,
"status": "error",
"error_message": f"处理文件时出现异常: {str(e)}",
}
)
# 处理结果为空的情况
if not results:
return {"status": "error", "error": "未处理任何文件"}
# 计算成功和失败的统计信息
success_count = len([r for r in results if r.get("status") == "success"])
error_count = len([r for r in results if r.get("status") == "error"])
total_count = len(results)
# 只有一个结果时,直接返回该结果(保持向后兼容)
if len(results) == 1:
result = results[0].copy()
# 为了向后兼容,移除新增的字段
if "filename" in result:
del result["filename"]
if "source_path" in result:
del result["source_path"]
if "source_url" in result:
del result["source_url"]
return result
# 多个结果时,返回详细的结果列表
# 根据成功/失败情况决定整体状态
overall_status = "success"
if success_count == 0:
# 所有文件都失败
overall_status = "error"
elif error_count > 0:
# 有部分文件失败,但不是全部
overall_status = "partial_success"
return {
"status": overall_status,
"results": results,
"summary": {
"total_files": total_count,
"success_count": success_count,
"error_count": error_count,
},
}
@mcp.tool()
async def get_ocr_languages() -> Dict[str, Any]:
"""
获取 OCR 支持的语言列表。
Returns:
Dict[str, Any]: 包含所有支持的OCR语言列表的字典
"""
try:
# 从language模块获取语言列表
languages = get_language_list()
return {"status": "success", "languages": languages}
except Exception as e:
return {"status": "error", "error": str(e)}
async def _parse_file_local(
file_path: str,
parse_method: str = "auto",
) -> Dict[str, Any]:
"""
使用本地API解析文件。
Args:
file_path: 要解析的文件路径
parse_method: 解析方法
output_dir: 输出目录
Returns:
Dict[str, Any]: 包含解析结果的字典
"""
# API URL路径
api_url = f"{config.LOCAL_MINERU_API_BASE}/file_parse"
# 使用Path对象确保文件路径正确
file_path_obj = Path(file_path)
if not file_path_obj.exists():
raise FileNotFoundError(f"文件不存在: {file_path}")
# 读取文件二进制数据
with open(file_path_obj, "rb") as f:
file_data = f.read()
# 准备用于上传文件的表单数据
file_type = file_path_obj.suffix.lower()
form_data = aiohttp.FormData()
form_data.add_field(
"file", file_data, filename=file_path_obj.name, content_type=file_type
)
form_data.add_field("parse_method", parse_method)
config.logger.debug(f"发送本地API请求到: {api_url}")
config.logger.debug(f"上传文件: {file_path_obj.name} (大小: {len(file_data)} 字节)")
# 发送请求
try:
async with aiohttp.ClientSession() as session:
async with session.post(api_url, data=form_data) as response:
if response.status != 200:
error_text = await response.text()
config.logger.error(
f"API返回错误状态码: {response.status}, 错误信息: {error_text}"
)
raise RuntimeError(f"API返回错误: {response.status}, {error_text}")
result = await response.json()
config.logger.debug(f"本地API响应: {result}")
# 处理响应
if "error" in result:
return {"status": "error", "error": result["error"]}
return {"status": "success", "result": result}
except aiohttp.ClientError as e:
error_msg = f"与本地API通信时出错: {str(e)}"
config.logger.error(error_msg)
raise RuntimeError(error_msg)
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment