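"""
Async client example for a MinerU-style parse service: reads files, base64-encodes
them, posts them to the server's /predict endpoint, and gathers the results
concurrently with asyncio + aiohttp.

Requires: pip install aiohttp loguru
"""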
import asyncio
import base64
import os

import aiohttp
from loguru import logger

async def mineru_parse_async(session, file_path, server_url='http://127.0.0.1:8000/predict', **options):
    """
    Asynchronous version of the parse function.
    """
    try:
        # Asynchronously read and encode the file
        with open(file_path, 'rb') as f:
            file_b64 = base64.b64encode(f.read()).decode('utf-8')

        payload = {
            'file': file_b64,
            'options': options
        }

        # Use the aiohttp session to send the request
        async with session.post(server_url, json=payload) as response:
            if response.status == 200:
                result = await response.json()
                logger.info(f"✅ Processed: {file_path} -> {result.get('output_dir', 'N/A')}")
                return result
            else:
                error_text = await response.text()
                logger.error(f"❌ Server error for {file_path}: {error_text}")
                return {'error': error_text}

    except Exception as e:
        logger.error(f"❌ Failed to process {file_path}: {e}")
        return {'error': str(e)}
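

# A minimal optional sketch (not part of the original example): if the server
# should only handle a few documents at a time, wrap each call in an
# asyncio.Semaphore. The helper name and the limit shown in the usage note
# below are illustrative assumptions, not part of the MinerU API.
async def mineru_parse_limited(semaphore, session, file_path, server_url='http://127.0.0.1:8000/predict', **options):
    """Run mineru_parse_async while holding one slot of `semaphore`."""
    async with semaphore:
        return await mineru_parse_async(session, file_path, server_url, **options)
# Usage inside main():
#   sem = asyncio.Semaphore(4)
#   tasks = [mineru_parse_limited(sem, session, f) for f in existing_files]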


async def main():
    """
    Main function to run all parsing tasks concurrently.
    """
    test_files = [
        '../../demo/pdfs/demo1.pdf',
        '../../demo/pdfs/demo2.pdf',
        '../../demo/pdfs/demo3.pdf',
        '../../demo/pdfs/small_ocr.pdf',
    ]

    test_files = [os.path.join(os.path.dirname(__file__), f) for f in test_files]
    
    existing_files = [f for f in test_files if os.path.exists(f)]
    if not existing_files:
        logger.warning("No test files found.")
        return

    # Create an aiohttp session to be reused across requests
    async with aiohttp.ClientSession() as session:
        # === Basic Processing ===
        basic_tasks = [mineru_parse_async(session, file_path) for file_path in existing_files[:2]]

        # === Custom Options ===
        # Note: the 'sglang-engine' backend requires 24+ GB of VRAM per worker.
        custom_options = {
            'backend': 'pipeline', 'lang': 'ch', 'method': 'auto',
            'formula_enable': True, 'table_enable': True
        }

        custom_tasks = [mineru_parse_async(session, file_path, **custom_options) for file_path in existing_files[2:]]

        # Run all tasks concurrently and collect their results
        all_tasks = basic_tasks + custom_tasks
        all_results = await asyncio.gather(*all_tasks)

        logger.info(f"All Results: {all_results}")

    logger.info("🎉 All processing completed!")

if __name__ == '__main__':
    # Run the async main function
    asyncio.run(main())