batch_build_dataset.py 4.71 KB
Newer Older
icecraft's avatar
icecraft committed
1
import concurrent.futures
icecraft's avatar
icecraft committed
2
import glob
icecraft's avatar
icecraft committed
3
import os
icecraft's avatar
icecraft committed
4
import threading
icecraft's avatar
icecraft committed
5

icecraft's avatar
icecraft committed
6
import fitz
icecraft's avatar
icecraft committed
7

icecraft's avatar
icecraft committed
8
from magic_pdf.data.dataset import PymuDocDataset
icecraft's avatar
icecraft committed
9
10
from magic_pdf.data.utils import fitz_doc_to_image  # PyMuPDF

icecraft's avatar
icecraft committed
11
12

def partition_array_greedy(arr, k):
    """Partition an array into k weight-balanced parts using a greedy approach.

    Items are assigned heaviest-first to the partition whose running weight
    total is currently smallest (the classic longest-processing-time
    heuristic), so the k partitions end up with roughly equal total weight.

    Parameters:
    -----------
    arr : list of tuples
        Each item must be indexable, with the integer weight at position 1
        (e.g. ``(pdf_path, num_pages)``). Only ``item[1]`` is read here.
    k : int
        Number of partitions to create. Silently reduced to ``len(arr)``
        when larger, so no partition is ever empty.

    Returns:
    --------
    partitions : list of lists
        k (or fewer) lists of indices into ``arr``; every index appears
        exactly once across all partitions.

    Raises:
    -------
    ValueError
        If k is not a positive integer.
    """
    # Handle edge cases
    if k <= 0:
        raise ValueError('k must be a positive integer')
    if k > len(arr):
        k = len(arr)  # Adjust k if it's too large
    if k == 1:
        return [list(range(len(arr)))]
    if k == len(arr):
        return [[i] for i in range(len(arr))]

    # Sort indices by item weight in descending order (heaviest first)
    sorted_indices = sorted(range(len(arr)), key=lambda i: arr[i][1], reverse=True)

    # Initialize k empty partitions and their running weight totals
    partitions = [[] for _ in range(k)]
    partition_sums = [0] * k

    # Assign each element to the partition with the smallest current sum
    for idx in sorted_indices:
        min_sum_idx = partition_sums.index(min(partition_sums))
        # Store the original index, not the element itself
        partitions[min_sum_idx].append(idx)
        partition_sums[min_sum_idx] += arr[idx][1]

    return partitions


def process_pdf_batch(pdf_jobs, idx):
    """Render every page of every PDF in one partition to images.

    Runs inside a worker process submitted by ``batch_build_dataset``. The
    second element of each job tuple (the page count used for balancing) is
    ignored here — all pages of each document are rendered.

    Parameters:
    -----------
    pdf_jobs : list of tuples
        List of (pdf_path, page_count) tuples belonging to this partition.
    idx : int
        Partition index, echoed back so the caller can map results to the
        partition they came from regardless of completion order.

    Returns:
    --------
    tuple
        (idx, images) where images has one entry per PDF in pdf_jobs, each
        entry being the list of rendered page images for that document.
    """
    images = []

    for pdf_path, _ in pdf_jobs:
        doc = fitz.open(pdf_path)
        try:
            # Render every page of this document
            images.append([fitz_doc_to_image(doc[page_num]) for page_num in range(len(doc))])
        finally:
            # Close explicitly so file handles are not leaked when a
            # partition contains many PDFs (or rendering raises).
            doc.close()
    return (idx, images)

def batch_build_dataset(pdf_paths, k, lang=None):
    """Build a PymuDocDataset for every PDF, rendering pages in parallel.

    The PDFs are partitioned into k groups balanced by total page count and
    each group's pages are rendered in its own worker process.

    Parameters:
    -----------
    pdf_paths : list
        List of paths to PDF files.
    k : int
        Number of partitions / parallel worker processes.
    lang : str or None
        Optional language hint forwarded to PymuDocDataset.

    Returns:
    --------
    results : list
        One PymuDocDataset per entry of pdf_paths, in the same order as
        pdf_paths. Entries for PDFs that could not be opened (or whose
        partition failed to render) are left as None.
    """
    # Get page counts for each PDF. Unreadable files are skipped, so we
    # also record each readable file's original position in pdf_paths —
    # indexing results by pdf_info position alone would misalign every
    # dataset after a skipped file.
    pdf_info = []
    orig_positions = []

    for position, pdf_path in enumerate(pdf_paths):
        try:
            doc = fitz.open(pdf_path)
            pdf_info.append((pdf_path, len(doc)))
            orig_positions.append(position)
            doc.close()
        except Exception as e:
            # Best-effort: report the failure and continue with the rest
            print(f'Error opening {pdf_path}: {e}')

    # Partition the PDFs so each worker gets roughly the same page total
    partitions = partition_array_greedy(pdf_info, k)

    for i, partition in enumerate(partitions):
        print(f'Partition {i+1}: {len(partition)} pdfs')

    # Process each partition in parallel; keyed by partition index because
    # as_completed yields futures in completion order, not submission order.
    all_images_h = {}

    with concurrent.futures.ProcessPoolExecutor(max_workers=k) as executor:
        # Submit one rendering task per partition
        futures = []
        for sn, partition in enumerate(partitions):
            # Get the jobs for this partition
            partition_jobs = [pdf_info[idx] for idx in partition]
            futures.append(executor.submit(process_pdf_batch, partition_jobs, sn))
        # Collect results as they complete
        for i, future in enumerate(concurrent.futures.as_completed(futures)):
            try:
                idx, images = future.result()
                print(f'Partition {i+1} completed: processed {len(images)} images')
                all_images_h[idx] = images
            except Exception as e:
                print(f'Error processing partition: {e}')

    # Rebuild datasets, placing each at its original pdf_paths position
    results = [None] * len(pdf_paths)
    for i, partition in enumerate(partitions):
        if i not in all_images_h:
            # This partition's worker raised; leave its entries as None
            continue
        for j, info_idx in enumerate(partition):
            with open(pdf_info[info_idx][0], 'rb') as f:
                pdf_bytes = f.read()
            dataset = PymuDocDataset(pdf_bytes, lang=lang)
            dataset.set_images(all_images_h[i][j])
            results[orig_positions[info_idx]] = dataset
    return results