deepseek_ocr_server.py 15.1 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
DeepSeek OCR API Server (vLLM) - minimal, optimized edition
"""
import argparse
import asyncio
import io
import os
import re
import threading
from concurrent.futures import ThreadPoolExecutor
from io import BytesIO
from typing import List

import torch
from PIL import Image

try:
    import fitz
except Exception:
    fitz = None

from fastapi import FastAPI, File, UploadFile, HTTPException, Form
from fastapi.responses import JSONResponse
from fastapi.middleware.cors import CORSMiddleware
import uvicorn

from vllm import LLM, SamplingParams
from vllm.model_executor.models.registry import ModelRegistry
from deepseek_ocr import DeepseekOCRForCausalLM
from process.ngram_norepeat import NoRepeatNGramLogitsProcessor
from process.image_process import DeepseekOCRProcessor


app = FastAPI(title="DeepSeek OCR API (vLLM) - Optimized", version="2.0.0")
# Open CORS policy so browser clients on any host can call the API.
# NOTE(review): allow_credentials=True combined with allow_origins=["*"] is
# disallowed by the CORS spec for credentialed requests — confirm intent.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_credentials=True,
    allow_methods=["*"],
    allow_headers=["*"],
)


# Module-global vLLM engine; populated by initialize_model() before serving.
llm = None

# CPU pool for PDF rendering / tokenization / post-processing.
# Recreated (resized from CLI args) in main().
cpu_executor = ThreadPoolExecutor(max_workers=8, thread_name_prefix="CPU-Worker")
# Single worker so all GPU calls funnel through exactly one thread.
gpu_executor = ThreadPoolExecutor(max_workers=1, thread_name_prefix="GPU-Worker")

# Async-side guard: only one coroutine drives the GPU stage at a time.
vllm_lock = asyncio.Lock()

# Prompt templates: document-to-markdown OCR vs. free-form image description.
PROMPT_OCR = "<image>\n<|grounding|>Convert the document to markdown."
PROMPT_DESC = "<image>\nDescribe this image in detail."

# -----------------------
# Monkey Patch
# -----------------------
# Keep a reference to the unpatched implementation so the wrapper can delegate.
_original_tokenize = DeepseekOCRProcessor.tokenize_with_images

# The upstream processor reads its prompt from the module-global config.PROMPT.
# Tokenization runs on a multi-worker CPU thread pool, so two concurrent calls
# with different prompts could race on that global and corrupt each other's
# prompt; this lock serializes the swap-call-restore sequence.
_prompt_patch_lock = threading.Lock()


def _patched_tokenize(self, images, bos=True, eos=True, cropping=True, prompt=None):
    """tokenize_with_images wrapper that accepts an optional per-call prompt.

    When ``prompt`` is given, temporarily substitutes it into the global
    ``config.PROMPT`` (under a lock) and restores the previous value afterwards.
    When ``prompt`` is None, delegates unchanged — still under the lock so a
    concurrent swap in another thread cannot be observed mid-flight.
    """
    with _prompt_patch_lock:
        if prompt is None:
            return _original_tokenize(self, images, bos, eos, cropping)
        import config
        saved = config.PROMPT
        config.PROMPT = prompt
        try:
            return _original_tokenize(self, images, bos, eos, cropping)
        finally:
            config.PROMPT = saved


DeepseekOCRProcessor.tokenize_with_images = _patched_tokenize


def pdf_to_images_sync(pdf_bytes: bytes, dpi: int = 144) -> List[Image.Image]:
    """Render each page of a PDF to an RGB PIL image (synchronous, CPU-bound).

    Args:
        pdf_bytes: Raw PDF file contents.
        dpi: Render resolution; PDF native is 72, so dpi/72 is the zoom factor.

    Returns:
        One RGB image per page, in page order.

    Raises:
        RuntimeError: If PyMuPDF (fitz) is not installed.
    """
    if fitz is None:
        raise RuntimeError("Please install PyMuPDF")

    images: List[Image.Image] = []
    matrix = fitz.Matrix(dpi / 72.0, dpi / 72.0)

    doc = fitz.open(stream=pdf_bytes, filetype="pdf")
    try:
        for page in doc:
            pix = page.get_pixmap(matrix=matrix, alpha=False)
            img = Image.open(io.BytesIO(pix.tobytes("png")))

            if img.mode != "RGB":
                if img.mode in ("RGBA", "LA"):
                    # Flatten transparency onto white instead of letting
                    # convert() blend against black.
                    bg = Image.new("RGB", img.size, (255, 255, 255))
                    bg.paste(img, mask=img.split()[-1])
                    img = bg
                else:
                    img = img.convert("RGB")

            images.append(img)
    finally:
        # Release the document handle even if rendering raises
        # (the original leaked it on exception).
        doc.close()
    return images


def image_open_sync(image_bytes: bytes) -> Image.Image:
    """Decode raw image bytes into an RGB PIL image (synchronous)."""
    buffer = BytesIO(image_bytes)
    decoded = Image.open(buffer)
    return decoded.convert("RGB")


def clear_vllm_cache_sync():
    """Best-effort clear of vLLM's multimodal preprocessor cache (synchronous).

    Reaches into private engine attributes, so any failure is deliberately
    swallowed — a stale cache is preferable to failing a request. No-op when
    the model has not been loaded yet.
    """
    if llm is None:
        return
    try:
        engine = llm.llm_engine
        if hasattr(engine, 'input_preprocessor'):
            prep = engine.input_preprocessor
            if hasattr(prep, '_mm_processor_cache'):
                prep._mm_processor_cache.clear()
    except Exception:
        # Narrowed from a bare `except:`, which would also trap
        # KeyboardInterrupt/SystemExit and hinder clean shutdown.
        pass


def tokenize_image_sync(image: Image.Image, prompt: str):
    """Run DeepSeek-OCR image preprocessing/tokenization (synchronous).

    This is the heaviest CPU step of the pipeline — callers offload it to
    the CPU thread pool so it never blocks the event loop.
    """
    return DeepseekOCRProcessor().tokenize_with_images(images=[image], prompt=prompt)


def vllm_generate_sync(tokenized, prompt: str) -> str:
    """Run a single vLLM generation (synchronous, GPU-bound).

    `tokenized` must already be produced by the CPU-side tokenizer;
    this function performs only the GPU forward passes.
    """
    request = {
        "prompt": prompt,
        "multi_modal_data": {"image": tokenized},
    }

    common = dict(temperature=0.0, max_tokens=8192, skip_special_tokens=False)
    if prompt == PROMPT_OCR:
        # OCR output is long and prone to repetition: block repeated n-grams
        # (whitelisting token ids 128821/128822) and apply a mild penalty.
        params = SamplingParams(
            logits_processors=[NoRepeatNGramLogitsProcessor(20, 50, {128821, 128822})],
            repetition_penalty=1.05,
            **common,
        )
    else:
        params = SamplingParams(**common)

    outputs = llm.generate([request], params)
    return outputs[0].outputs[0].text


def clean_markdown_sync(text: str) -> str:
    """Strip model grounding/special markup from OCR output and tidy whitespace."""
    # Drop <|ref|>...<|/ref|> and <|det|>...<|/det|> grounding spans first,
    # then any leftover <|...|> special tokens and [[...]] coordinate blobs.
    for pattern in (r'<\|ref\|>.*?<\|/ref\|>',
                    r'<\|det\|>.*?<\|/det\|>',
                    r'<\|.*?\|>',
                    r'\[\[.*?\]\]'):
        text = re.sub(pattern, '', text)
    # Remove long ===...=== separator banners, possibly spanning lines.
    text = re.sub(r'={50,}.*?={50,}', '', text, flags=re.DOTALL)
    # Collapse runs of 3+ newlines into a single blank line.
    text = re.sub(r'\n{3,}', '\n\n', text)
    return text.strip()



async def pdf_to_images_async(pdf_bytes: bytes, dpi: int = 144) -> List[Image.Image]:
    """Render a PDF to page images without blocking the event loop.

    Offloads the CPU-bound rendering to the shared CPU thread pool.
    """
    # get_running_loop() is the correct call inside a coroutine;
    # get_event_loop() is deprecated in this context since Python 3.10.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(cpu_executor, pdf_to_images_sync, pdf_bytes, dpi)


async def image_open_async(image_bytes: bytes) -> Image.Image:
    """Decode image bytes to RGB without blocking the event loop."""
    # get_running_loop() replaces get_event_loop(), which is deprecated
    # inside coroutines since Python 3.10.
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(cpu_executor, image_open_sync, image_bytes)


async def tokenize_image_async(image: Image.Image, prompt: str):
    """Tokenize an image on the CPU thread pool (the pipeline's key CPU offload).

    Keeps the event loop responsive while the heavy preprocessing runs.
    """
    # get_running_loop() replaces the deprecated get_event_loop().
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(cpu_executor, tokenize_image_sync, image, prompt)


async def vllm_generate_async(image: Image.Image, prompt: str) -> str:
    """End-to-end single-image inference: CPU tokenize, then locked GPU generate.

    The CPU-bound tokenize stage may overlap across requests; the GPU stage is
    serialized by `vllm_lock` plus the single-worker GPU executor.
    """
    # Stage 1: tokenize on the CPU pool (concurrent with other requests).
    tokenized = await tokenize_image_async(image, prompt)

    # Stage 2: GPU inference, exclusive.
    async with vllm_lock:
        # get_running_loop() replaces the deprecated get_event_loop().
        loop = asyncio.get_running_loop()

        # Drop any cached multimodal preprocessor state before generating.
        await loop.run_in_executor(gpu_executor, clear_vllm_cache_sync)

        return await loop.run_in_executor(
            gpu_executor,
            vllm_generate_sync,
            tokenized,
            prompt,
        )


async def clean_markdown_async(text: str) -> str:
    """Run the markdown cleanup on the CPU pool without blocking the event loop."""
    # get_running_loop() replaces the deprecated get_event_loop().
    loop = asyncio.get_running_loop()
    return await loop.run_in_executor(cpu_executor, clean_markdown_sync, text)


async def generate_image_description_async(image: Image.Image) -> str:
    """Generate a short (<=200 char) description of an image.

    Returns an empty string on any failure so callers can fall back to a
    plain placeholder instead of failing the whole page.
    """
    try:
        raw = await vllm_generate_async(image, PROMPT_DESC)

        def _postprocess(text: str) -> str:
            # Strip grounding/special markup and collapse whitespace.
            desc = re.sub(r'<\|ref\|>.*?<\|/ref\|>', '', text)
            desc = re.sub(r'<\|det\|>.*?<\|/det\|>', '', desc)
            desc = re.sub(r'<\|.*?\|>', '', desc)
            desc = re.sub(r'\[\[.*?\]\]', '', desc)
            desc = re.sub(r'\s+', ' ', desc).strip()

            # Truncate to ~200 chars, preferring a sentence boundary; fall back
            # to a word boundary with an ellipsis.
            if len(desc) > 200:
                cutoff = desc[:200].rfind('.')
                if cutoff > 100:
                    desc = desc[:cutoff + 1]
                else:
                    desc = desc[:200].rsplit(' ', 1)[0] + '...'

            return desc

        # get_running_loop() replaces the deprecated get_event_loop().
        loop = asyncio.get_running_loop()
        return await loop.run_in_executor(cpu_executor, _postprocess, raw)

    except Exception as e:
        print(f"WARNING: 图片描述失败: {e}")
        return ""


# -----------------------
# 模型初始化
# -----------------------
def initialize_model(model_path: str, gpu_id: int):
    """Load the DeepSeek-OCR model into the module-global vLLM engine.

    Args:
        model_path: Local path (or hub id) of the model weights.
        gpu_id: GPU index exposed to the process via CUDA_VISIBLE_DEVICES.

    Side effects:
        Sets the global `llm`, registers the custom architecture with vLLM,
        and mutates process environment variables.
    """
    global llm

    # Let vLLM resolve the custom architecture name to our implementation.
    ModelRegistry.register_model("DeepseekOCRForCausalLM", DeepseekOCRForCausalLM)

    # NOTE(review): setting CUDA_VISIBLE_DEVICES this late only works if CUDA
    # has not been initialized yet in this process — confirm torch/vLLM have
    # not touched the GPU before this point.
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)

    # Opt out of the vLLM V1 engine (presumably required by the custom
    # model integration — verify against the vLLM version in use).
    os.environ['VLLM_USE_V1'] = '0'

    print(f"[INFO] 加载模型: {model_path}")

    llm = LLM(
        model=model_path,
        hf_overrides={"architectures": ["DeepseekOCRForCausalLM"]},
        block_size=64,
        enforce_eager=False,
        trust_remote_code=True,
        max_model_len=8192,
        tensor_parallel_size=1,
        gpu_memory_utilization=0.9,
        max_num_seqs=20,
        disable_mm_preprocessor_cache=True,
    )

    print("[SUCCESS] 模型加载完成")


# -----------------------
# API 路由
# -----------------------
@app.get("/")
async def root():
    """Service banner: name, version, and running status."""
    info = {
        "service": "DeepSeek OCR (vLLM) - Optimized",
        "version": "2.0.0",
        "status": "running",
    }
    return info


@app.get("/health")
async def health():
    """Health probe: model readiness plus thread-pool sizing."""
    model_loaded = llm is not None
    return {
        "status": "healthy",
        "model_ready": model_loaded,
        "cpu_workers": cpu_executor._max_workers,
        "gpu_workers": gpu_executor._max_workers,
    }


async def vllm_generate_batch_async(
    images: List[Image.Image],
    prompt: str,
    show_progress: bool = True
) -> List[str]:
    """Batched inference: tokenize all pages concurrently, then one GPU batch.

    Args:
        images: Pages to process.
        prompt: Prompt applied to every page.
        show_progress: Print stage progress to stdout.

    Returns:
        Generated text, one entry per input image, in input order.
    """
    total = len(images)

    # Stage 1: tokenize every page concurrently on the CPU pool
    # (image normalization -> vision encoder features).
    if show_progress:
        print(f"   [1/3] Tokenize {total} 页...")

    all_tokenized = await asyncio.gather(
        *(tokenize_image_async(img, prompt) for img in images)
    )

    if show_progress:
        print(f"   [1/3] Tokenize 完成")

    # Stage 2: one request dict per page for a single llm.generate() call.
    batch_inputs = [
        {"prompt": prompt, "multi_modal_data": {"image": tok}}
        for tok in all_tokenized
    ]

    # Stage 3: exclusive GPU batch inference.
    async with vllm_lock:
        if show_progress:
            print(f"   [2/3] GPU 批量推理 {total} 页...")

        # get_running_loop() replaces the deprecated get_event_loop().
        loop = asyncio.get_running_loop()

        # Drop cached multimodal preprocessor state before generating.
        await loop.run_in_executor(gpu_executor, clear_vllm_cache_sync)

        def batch_generate():
            # NOTE(review): this sampling-parameter construction duplicates
            # vllm_generate_sync — keep the two in sync.
            if prompt == PROMPT_OCR:
                params = SamplingParams(
                    temperature=0.0,
                    max_tokens=8192,
                    skip_special_tokens=False,
                    logits_processors=[
                        NoRepeatNGramLogitsProcessor(20, 50, {128821, 128822})
                    ],
                    repetition_penalty=1.05,
                )
            else:
                params = SamplingParams(
                    temperature=0.0,
                    max_tokens=8192,
                    skip_special_tokens=False,
                )

            # Key optimization: one batched generate call covers all pages.
            outputs = llm.generate(batch_inputs, params)
            return [out.outputs[0].text for out in outputs]

        results = await loop.run_in_executor(gpu_executor, batch_generate)

        if show_progress:
            print(f"   [2/3] GPU 推理完成")

        return results


@app.post("/ocr")
async def ocr(
    file: UploadFile = File(...),
    enable_description: bool = Form(False),
):
    """OCR endpoint: accepts a PDF or image upload, returns merged markdown.

    Pipeline: read upload -> render pages -> batched OCR -> per-page
    post-processing (optional image descriptions, markup cleanup) -> merge.
    """
    if llm is None:
        raise HTTPException(503, "模型未加载")

    import time
    start_time = time.time()

    try:
        # 1. Read the upload.
        contents = await file.read()

        # 2. Parse: PDFs become a list of page images; everything else is
        #    decoded as a single image. filename can be None for some clients,
        #    so guard before lowercasing (original would raise AttributeError).
        filename = (file.filename or "").lower()
        if filename.endswith('.pdf'):
            images = await pdf_to_images_async(contents)
        else:
            images = [await image_open_async(contents)]

        if not images:
            # An empty PDF would otherwise divide by zero in the timing
            # summary below.
            raise HTTPException(400, "文件不包含任何页面")

        t2 = time.time()

        # 3. Batched OCR over all pages.
        raw_results = await vllm_generate_batch_async(images, PROMPT_OCR)
        t3 = time.time()
        print(f"   OCR 耗时: {t3 - t2:.2f}s")

        # 4. Per-page post-processing.
        print(f"   [3/3] 后处理...")

        async def postprocess(idx: int, raw: str, img: Image.Image) -> str:
            # Optionally replace grounded image placeholders with generated
            # descriptions of the page image.
            if enable_description:
                img_pattern = r'<\|ref\|>image<\|/ref\|><\|det\|>\[\[.*?\]\]<\|/det\|>'
                matches = list(re.finditer(img_pattern, raw))

                for match in matches:
                    desc = await generate_image_description_async(img)
                    replacement = f"[图片: {desc}]" if desc else "[图片]"
                    raw = raw.replace(match.group(0), replacement)

            cleaned = await clean_markdown_async(raw)
            return cleaned if cleaned else ""

        tasks = [postprocess(i, raw, img) for i, (raw, img) in enumerate(zip(raw_results, images))]
        md_parts = await asyncio.gather(*tasks)

        t4 = time.time()
        print(f"   [3/3] 后处理完成 ({t4 - t3:.2f}s)")

        # 5. Merge non-empty pages.
        final_md = "\n\n".join([md for md in md_parts if md])

        total_time = time.time() - start_time
        print(f"{'='*60}")
        print(f"[SUCCESS] 全部完成")
        print(f"   总耗时: {total_time:.2f}s")
        print(f"   平均: {total_time / len(images):.2f}s/页")
        print(f"{'='*60}\n")

        return JSONResponse({
            "markdown": final_md,
            "page_count": len(images),
            "processing_time": round(total_time, 2),
        })

    except HTTPException:
        # Re-raise deliberate HTTP errors unchanged instead of converting
        # them to a 500 below.
        raise
    except Exception as e:
        import traceback
        print(f"[ERROR] 处理失败: {e}")
        print(traceback.format_exc())
        raise HTTPException(500, f"处理失败: {e}")


# -----------------------
# 优雅关闭
# -----------------------
@app.on_event("shutdown")
async def shutdown_event():
    """Drain both worker pools when the server shuts down."""
    print("[INFO] 关闭线程池...")
    for pool in (cpu_executor, gpu_executor):
        pool.shutdown(wait=True)
    print("[SUCCESS] 线程池已关闭")


# -----------------------
# 启动
# -----------------------
def main():
    """Parse CLI args, size the CPU pool, load the model, and serve."""
    parser = argparse.ArgumentParser()
    parser.add_argument("--model-path", required=True, help="模型路径")
    parser.add_argument("--gpu-id", type=int, default=0, help="GPU ID")
    parser.add_argument("--port", type=int, default=8002, help="端口")
    parser.add_argument("--host", default="0.0.0.0", help="监听地址")
    parser.add_argument("--cpu-workers", type=int, default=2, help="CPU 线程池大小")

    args = parser.parse_args()

    # Replace the module-level default pool with one sized per the CLI.
    # Shut the old pool down first so its idle worker threads don't linger
    # for the life of the process (the original leaked them).
    global cpu_executor
    cpu_executor.shutdown(wait=False)
    cpu_executor = ThreadPoolExecutor(
        max_workers=args.cpu_workers,
        thread_name_prefix="CPU-Worker"
    )

    initialize_model(args.model_path, args.gpu_id)

    print(f"\n[INFO] 服务启动: http://{args.host}:{args.port}")
    print(f"[INFO] 接口文档: http://{args.host}:{args.port}/docs\n")

    # workers=1: the vLLM engine and thread pools live in this process and
    # cannot be shared across multiple uvicorn worker processes.
    uvicorn.run(app, host=args.host, port=args.port, workers=1)


if __name__ == "__main__":
    main()