Commit f67c723f authored by Xiaomeng Zhao, committed by GitHub

Merge pull request #2727 from yuanjua/multi_gpu_v2

Example: update multi gpu example based on v2.0
parents c6881d83 d3f6736e
@@ -2,5 +2,8 @@
## Project List
- Projects compatible with version 2.0:
  - [multi_gpu_v2](./multi_gpu_v2/README.md): Multi-GPU parallel processing based on LitServe
- Projects not yet compatible with version 2.0:
  - [mcp](./mcp/README.md): MCP server based on the official API
@@ -2,5 +2,8 @@
## Project List
- Projects compatible with version 2.0:
  - [multi_gpu_v2](./multi_gpu_v2/README_zh.md): Multi-GPU parallel processing based on LitServe
- Projects not yet compatible with version 2.0:
  - [mcp](./mcp/README.md): MCP server based on the official API
# MinerU v2.0 Multi-GPU Server
[简体中文](README_zh.md)
A streamlined multi-GPU server implementation.
## Quick Start
### 1. Install MinerU
```bash
pip install --upgrade pip
pip install uv
uv pip install -U "mineru[core]"
uv pip install litserve aiohttp loguru
```
### 2. Start the Server
```bash
python server.py
```
### 3. Start the Client
```bash
python client.py
```
Now, PDF files under the [demo](../../demo/) folder will be processed in parallel. Assuming you have 2 GPUs, changing `workers_per_device` to `2` lets 4 PDF files be processed at the same time!
## Customize
### Server
Example showing how to start the server with custom settings:
```python
server = ls.LitServer(
    MinerUAPI(output_dir='/tmp/mineru_output'),
    accelerator='auto',    # or specify 'cuda' explicitly
    devices='auto',        # "auto" uses all available GPUs
    workers_per_device=1,  # one worker instance per GPU
    timeout=False          # disable timeout for long-running jobs
)
server.run(port=8000, generate_client_file=False)
```
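Note that each LitServe worker is a separate process that runs `setup()` and loads its own model instance, so raising `workers_per_device` increases parallel throughput at the cost of proportionally more VRAM on each GPU; size it to your cards.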
### Client
The client supports both synchronous and asynchronous processing:
```python
import asyncio

import aiohttp

from client import mineru_parse_async

async def process_documents():
    async with aiohttp.ClientSession() as session:
        # Basic usage
        result = await mineru_parse_async(session, 'document.pdf')

        # With custom options
        result = await mineru_parse_async(
            session,
            'document.pdf',
            backend='pipeline',
            lang='ch',
            formula_enable=True,
            table_enable=True
        )

# Run the async processing
asyncio.run(process_documents())
```
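For one-off synchronous calls, a minimal sketch using `requests` (the `mineru_parse_sync` helper here is illustrative, not part of `client.py`; the payload shape mirrors what `decode_request` in `server.py` expects):

```python
import base64

import requests

def mineru_parse_sync(file_path, server_url='http://127.0.0.1:8000/predict', **options):
    # Encode the file and POST it in the JSON shape the server expects
    with open(file_path, 'rb') as f:
        payload = {'file': base64.b64encode(f.read()).decode('utf-8'), 'options': options}
    response = requests.post(server_url, json=payload)
    response.raise_for_status()
    return response.json()  # e.g. {'output_dir': '/tmp/mineru_output/document'}

result = mineru_parse_sync('document.pdf', lang='ch')
```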
### Concurrent Processing
Process multiple files simultaneously:
```python
async def process_multiple_files():
    files = ['doc1.pdf', 'doc2.pdf', 'doc3.pdf']
    async with aiohttp.ClientSession() as session:
        tasks = [mineru_parse_async(session, file) for file in files]
        results = await asyncio.gather(*tasks)
    return results
```
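Because `asyncio.gather` preserves input order, `results` lines up with `files`. From synchronous code, run it with:

```python
results = asyncio.run(process_multiple_files())
```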
# MinerU v2.0 Multi-GPU Server
[English](README.md)
A streamlined multi-GPU server implementation.
## Quick Start
### 1. Install MinerU
```bash
pip install --upgrade pip
pip install uv
uv pip install -U "mineru[core]"
uv pip install litserve aiohttp loguru
```
### 2. Start the Server
```bash
python server.py
```
### 3. Start the Client
```bash
python client.py
```
Now, PDF files under the [demo](../../demo/) folder will be processed in parallel. Assuming you have 2 GPUs, changing `workers_per_device` to `2` lets 4 PDF files be processed at the same time!
## Customize
### Server
The example below shows how to start the server with custom settings:
```python
server = ls.LitServer(
    MinerUAPI(output_dir='/tmp/mineru_output'),  # custom output folder
    accelerator='auto',    # or specify 'cuda' explicitly
    devices='auto',        # "auto" uses all available GPUs
    workers_per_device=1,  # one worker instance per GPU
    timeout=False          # disable timeout for long-running jobs
)
server.run(port=8000, generate_client_file=False)
```
### Client
The client supports both synchronous and asynchronous processing:
```python
import asyncio

import aiohttp

from client import mineru_parse_async

async def process_documents():
    async with aiohttp.ClientSession() as session:
        # Basic usage
        result = await mineru_parse_async(session, 'document.pdf')

        # With custom options
        result = await mineru_parse_async(
            session,
            'document.pdf',
            backend='pipeline',
            lang='ch',
            formula_enable=True,
            table_enable=True
        )

# Run the async processing
asyncio.run(process_documents())
```
### Concurrent Processing
Process multiple files simultaneously:
```python
async def process_multiple_files():
    files = ['doc1.pdf', 'doc2.pdf', 'doc3.pdf']
    async with aiohttp.ClientSession() as session:
        tasks = [mineru_parse_async(session, file) for file in files]
        results = await asyncio.gather(*tasks)
    return results
```
# _config_endpoint.py
import logging
import os

import requests

logging.basicConfig(level=logging.INFO)

# Timeout (seconds) for the connectivity probes below
TIMEOUT = 3

def config_endpoint():
    """
    Checks for connectivity to Hugging Face and sets the model source accordingly.
    If the Hugging Face endpoint is reachable, it keeps MINERU_MODEL_SOURCE as 'huggingface'.
    Otherwise, it falls back to 'modelscope'.
    """
    os.environ.setdefault('MINERU_MODEL_SOURCE', 'huggingface')
    model_list_url = "https://huggingface.co/models"
    modelscope_url = "https://modelscope.cn/models"

    # Use a specific check for the Hugging Face source
    if os.environ['MINERU_MODEL_SOURCE'] == 'huggingface':
        try:
            response = requests.head(model_list_url, timeout=TIMEOUT)
            # `response.ok` is True for any status code below 400
            if response.ok:
                logging.info("Successfully connected to Hugging Face. Using 'huggingface' as model source.")
                return True
            else:
                logging.warning(f"Hugging Face endpoint returned a non-success status code: {response.status_code}")
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to connect to Hugging Face at {model_list_url}: {e}")
        # If the check above failed, switch to modelscope
        logging.info("Falling back to 'modelscope' as model source.")
        os.environ['MINERU_MODEL_SOURCE'] = 'modelscope'
    elif os.environ['MINERU_MODEL_SOURCE'] == 'modelscope':
        try:
            response = requests.head(modelscope_url, timeout=TIMEOUT)
            if response.ok:
                logging.info("Successfully connected to ModelScope. Using 'modelscope' as model source.")
                return True
        except requests.exceptions.RequestException as e:
            logging.error(f"Failed to connect to ModelScope at {modelscope_url}: {e}")
    elif os.environ['MINERU_MODEL_SOURCE'] == 'local':
        logging.info("Using 'local' as model source.")
        return True
    else:
        logging.info(f"Using custom model source: {os.environ['MINERU_MODEL_SOURCE']}")
        return True
    return False

if __name__ == '__main__':
    print(config_endpoint())
# client.py
import asyncio
import base64
import os

import aiohttp
from loguru import logger

async def mineru_parse_async(session, file_path, server_url='http://127.0.0.1:8000/predict', **options):
    """
    Asynchronously send a file to the MinerU server for parsing.
    """
    try:
        # Read and base64-encode the file (a blocking read; fine for small files)
        with open(file_path, 'rb') as f:
            file_b64 = base64.b64encode(f.read()).decode('utf-8')
        payload = {
            'file': file_b64,
            'options': options
        }
        # Use the shared aiohttp session to send the request
        async with session.post(server_url, json=payload) as response:
            if response.status == 200:
                result = await response.json()
                logger.info(f"✅ Processed: {file_path} -> {result.get('output_dir', 'N/A')}")
                return result
            else:
                error_text = await response.text()
                logger.error(f"❌ Server error for {file_path}: {error_text}")
                return {'error': error_text}
    except Exception as e:
        logger.error(f"❌ Failed to process {file_path}: {e}")
        return {'error': str(e)}

async def main():
    """
    Main function to run all parsing tasks concurrently.
    """
    test_files = [
        '../../demo/pdfs/demo1.pdf',
        '../../demo/pdfs/demo2.pdf',
        '../../demo/pdfs/demo3.pdf',
        '../../demo/pdfs/small_ocr.pdf',
    ]
    test_files = [os.path.join(os.path.dirname(__file__), f) for f in test_files]
    existing_files = [f for f in test_files if os.path.exists(f)]
    if not existing_files:
        logger.warning("No test files found.")
        return

    # Create one aiohttp session and reuse it across requests
    async with aiohttp.ClientSession() as session:
        # === Basic processing ===
        basic_tasks = [mineru_parse_async(session, file_path) for file_path in existing_files[:2]]

        # === Custom options ===
        # Note: 'backend': 'sglang-engine' requires 24+ GB VRAM per worker
        custom_options = {
            'backend': 'pipeline', 'lang': 'ch', 'method': 'auto',
            'formula_enable': True, 'table_enable': True
        }
        custom_tasks = [mineru_parse_async(session, file_path, **custom_options) for file_path in existing_files[2:]]

        # Run all tasks concurrently
        all_tasks = basic_tasks + custom_tasks
        all_results = await asyncio.gather(*all_tasks)
        logger.info(f"All Results: {all_results}")
        logger.info("🎉 All processing completed!")

if __name__ == '__main__':
    # Run the async main function
    asyncio.run(main())
# server.py
import base64
import os
import tempfile
from pathlib import Path

import litserve as ls
from fastapi import HTTPException
from loguru import logger
from mineru.cli.common import do_parse, read_fn
from mineru.utils.config_reader import get_device
from mineru.utils.model_utils import get_vram

from _config_endpoint import config_endpoint

class MinerUAPI(ls.LitAPI):
    def __init__(self, output_dir='/tmp'):
        super().__init__()
        self.output_dir = output_dir

    def setup(self, device):
        """Set up environment variables exactly like the MinerU CLI does"""
        logger.info(f"Setting up on device: {device}")

        if os.getenv('MINERU_DEVICE_MODE') is None:
            os.environ['MINERU_DEVICE_MODE'] = device if device != 'auto' else get_device()
        device_mode = os.environ['MINERU_DEVICE_MODE']

        if os.getenv('MINERU_VIRTUAL_VRAM_SIZE') is None:
            if device_mode.startswith("cuda") or device_mode.startswith("npu"):
                vram = round(get_vram(device_mode))
                os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = str(vram)
            else:
                os.environ['MINERU_VIRTUAL_VRAM_SIZE'] = '1'
        logger.info(f"MINERU_VIRTUAL_VRAM_SIZE: {os.environ['MINERU_VIRTUAL_VRAM_SIZE']}")

        if os.getenv('MINERU_MODEL_SOURCE') in ['huggingface', None]:
            config_endpoint()
        logger.info(f"MINERU_MODEL_SOURCE: {os.environ['MINERU_MODEL_SOURCE']}")

    def decode_request(self, request):
        """Decode the file and options from the request"""
        file_b64 = request['file']
        options = request.get('options', {})
        file_bytes = base64.b64decode(file_b64)
        with tempfile.NamedTemporaryFile(suffix='.pdf', delete=False) as temp:
            temp.write(file_bytes)
            temp_file = Path(temp.name)
        return {
            'input_path': str(temp_file),
            'backend': options.get('backend', 'pipeline'),
            'method': options.get('method', 'auto'),
            'lang': options.get('lang', 'ch'),
            'formula_enable': options.get('formula_enable', True),
            'table_enable': options.get('table_enable', True),
            'start_page_id': options.get('start_page_id', 0),
            'end_page_id': options.get('end_page_id', None),
            'server_url': options.get('server_url', None),
        }

    def predict(self, inputs):
        """Call MinerU's do_parse - same as the CLI"""
        input_path = inputs['input_path']
        output_dir = Path(self.output_dir) / Path(input_path).stem
        try:
            os.makedirs(output_dir, exist_ok=True)
            file_name = Path(input_path).stem
            pdf_bytes = read_fn(Path(input_path))
            do_parse(
                output_dir=str(output_dir),
                pdf_file_names=[file_name],
                pdf_bytes_list=[pdf_bytes],
                p_lang_list=[inputs['lang']],
                backend=inputs['backend'],
                parse_method=inputs['method'],
                formula_enable=inputs['formula_enable'],
                table_enable=inputs['table_enable'],
                server_url=inputs['server_url'],
                start_page_id=inputs['start_page_id'],
                end_page_id=inputs['end_page_id']
            )
            return str(output_dir)
        except Exception as e:
            logger.error(f"Processing failed: {e}")
            raise HTTPException(status_code=500, detail=str(e))
        finally:
            # Clean up the temp file
            if Path(input_path).exists():
                Path(input_path).unlink()

    def encode_response(self, response):
        return {'output_dir': response}

if __name__ == '__main__':
    server = ls.LitServer(
        MinerUAPI(output_dir='/tmp/mineru_output'),
        accelerator='auto',
        devices='auto',
        workers_per_device=1,
        timeout=False
    )
    logger.info("Starting MinerU server on port 8000")
    server.run(port=8000, generate_client_file=False)