# para_split_v3.py
import copy

3
4
from loguru import logger

5
from magic_pdf.libs.Constants import LINES_DELETED, CROSS_PAGE
6
from magic_pdf.libs.ocr_content_type import BlockType, ContentType
7

8
LINE_STOP_FLAG = ('.', '!', '?', '。', '!', '?', ')', ')', '"', '”', ':', ':', ';', ';')
9
10
11
12
13
14
LIST_END_FLAG = ('.', '。', ';', ';')


class ListLineTag:
    """Dictionary keys written onto line dicts to mark list-item boundaries."""
    # Set to True on a line that starts a list item.
    IS_LIST_START_LINE = "is_list_start_line"
    # Set to True on a line that ends a list item.
    IS_LIST_END_LINE = "is_list_end_line"
15
16
17


def __process_blocks(blocks):
    """Pre-process blocks: group the text blocks and reset their bounds.

    1. Text blocks are collected into groups; a group is closed whenever the
       *next* block is a ``'title'`` or an ``'interline_equation'``.
    2. Every text block receives a ``bbox_fs`` entry: the tight bounding box
       of its lines, or a copy of the block's own ``bbox`` when it has no
       lines.

    Args:
        blocks: sequence of block dicts, each with at least a ``'type'`` key.
            Text blocks are modified in place (``bbox_fs`` is added).

    Returns:
        A list of groups, each a list of text-block dicts in input order.
        A group may be empty when a separator block follows a position at
        which no text block has been collected; callers must tolerate that.
    """
    result = []
    current_group = []
    block_count = len(blocks)

    for index, current_block in enumerate(blocks):
        # Only text blocks are collected; all other types act as filler or
        # as group separators.
        if current_block['type'] == 'text':
            lines = current_block.get('lines')
            if lines:
                # Tight box around the lines actually present in the block.
                current_block['bbox_fs'] = [
                    min(line['bbox'][0] for line in lines),
                    min(line['bbox'][1] for line in lines),
                    max(line['bbox'][2] for line in lines),
                    max(line['bbox'][3] for line in lines),
                ]
            else:
                current_block['bbox_fs'] = copy.deepcopy(current_block['bbox'])
            current_group.append(current_block)

        # Look ahead: a title or interline equation ends the current group.
        if index + 1 < block_count:
            if blocks[index + 1]['type'] in ('title', 'interline_equation'):
                result.append(current_group)
                current_group = []

    # Flush the trailing group.
    if current_group:
        result.append(current_group)

    return result


53
def __is_list_or_index_block(block):
    """Classify a block as plain text, a list, or an index, and tag its lines.

    Heuristics (a block needs at least three lines to be anything but text):

    * List block: several lines are flush with the left edge, and in
      addition either several lines stop well short of the right edge
      (ragged right), or most lines end with a ``LIST_END_FLAG`` character,
      or several lines are indented from the left edge.
    * Index block (a special kind of list): at least 80% of the lines are
      flush with the left edge or with the right edge, and at least 80% of
      the lines start with a digit or end with a digit.

    Side effect: when the block is classified as a list or an index, the
    line dicts are tagged in place with ``ListLineTag.IS_LIST_START_LINE``
    and/or ``ListLineTag.IS_LIST_END_LINE``.

    Args:
        block: block dict with ``bbox_fs`` and (optionally) ``lines``; every
            line carries ``bbox`` and ``spans``.

    Returns:
        ``BlockType.Index``, ``BlockType.List`` or ``BlockType.Text``.
    """
    lines = block.get('lines') or []
    line_count = len(lines)
    # Too few lines to judge; also covers blocks without any lines.
    if line_count < 3:
        return BlockType.Text

    first_line = lines[0]
    last_line = lines[-1]
    # The first line's height is the yardstick for every distance threshold.
    line_height = first_line['bbox'][3] - first_line['bbox'][1]
    block_left = block['bbox_fs'][0]
    block_right = block['bbox_fs'][2]
    block_width = block_right - block_left

    # First line indented while the last line is flush-left and stops short
    # of the right edge: the block looks like ordinary paragraph text, so it
    # must not be treated as a list. (The first line may or may not reach
    # the right edge.)
    multiple_para_flag = (
        first_line['bbox'][0] - block_left > line_height / 2
        and abs(last_line['bbox'][0] - block_left) < line_height / 2
        and block_right - last_line['bbox'][2] > line_height
    )

    left_close_num = 0
    left_not_close_num = 0
    right_close_num = 0
    right_not_close_num = 0
    lines_text_list = []

    for line in lines:
        # Only text spans contribute to the line's text; a line made purely
        # of other span types yields an empty string.
        lines_text_list.append(''.join(
            span['content'].strip()
            for span in line['spans']
            if span['type'] == ContentType.Text
        ))

        # Left edge: flush if within half a line height, indented if more
        # than a full line height away.
        if abs(block_left - line['bbox'][0]) < line_height / 2:
            left_close_num += 1
        elif line['bbox'][0] - block_left > line_height:
            left_not_close_num += 1

        # Right edge: flush if within one line height; "clearly short" if
        # the gap exceeds 0.3 of the block width (a rule-of-thumb threshold).
        if abs(block_right - line['bbox'][2]) < line_height:
            right_close_num += 1
        elif block_right - line['bbox'][2] > 0.3 * block_width:
            right_not_close_num += 1

    # str.endswith / slicing are safe on empty strings, unlike text[-1].
    flag_end_count = sum(1 for text in lines_text_list if text.endswith(LIST_END_FLAG))
    num_start_count = sum(1 for text in lines_text_list if text[:1].isdigit())
    num_end_count = sum(1 for text in lines_text_list if text[-1:].isdigit())

    # At least 80% of the lines end with a list end flag.
    line_end_flag = flag_end_count / line_count >= 0.8
    # At least 80% of the lines start with a digit, or at least 80% end with one.
    line_num_flag = (num_start_count / line_count >= 0.8
                     or num_end_count / line_count >= 0.8)

    # Some tables of contents are not flush on the right, so one flush side
    # plus the digit rule is enough to call the block an index.
    if ((left_close_num / line_count >= 0.8 or right_close_num / line_count >= 0.8)
            and line_num_flag):
        for line in lines:
            line[ListLineTag.IS_LIST_START_LINE] = True
        return BlockType.Index

    is_list = (
        left_close_num >= 2
        and (right_not_close_num >= 2 or line_end_flag or left_not_close_num >= 2)
        and not multiple_para_flag
    )
    if not is_list:
        return BlockType.Text

    if left_close_num / line_count > 0.9:
        # List without indentation: (almost) every line is flush-left.
        if flag_end_count == 0 and right_close_num / line_count < 0.5:
            # Short one-line items: every flush-left line starts an item.
            for line in lines:
                if abs(block_left - line['bbox'][0]) < line_height / 2:
                    line[ListLineTag.IS_LIST_START_LINE] = True
        elif line_end_flag:
            # Most items end with an end flag: split items on those flags.
            for i, (line, text) in enumerate(zip(lines, lines_text_list)):
                if text.endswith(LIST_END_FLAG):
                    line[ListLineTag.IS_LIST_END_LINE] = True
                    if i + 1 < line_count:
                        lines[i + 1][ListLineTag.IS_LIST_START_LINE] = True
        else:
            # Hardly any end flags and no indentation: a gap at the right
            # edge marks an item end, and the line after it starts an item.
            line_start_flag = False
            for line in lines:
                if line_start_flag:
                    line[ListLineTag.IS_LIST_START_LINE] = True
                    line_start_flag = False
                elif abs(block_right - line['bbox'][2]) > line_height:
                    line[ListLineTag.IS_LIST_END_LINE] = True
                    line_start_flag = True
    elif num_start_count == flag_end_count:
        # Indented ordered list: as many digit-led lines as flag-terminated
        # lines. Kept simple: left indentation is not checked here.
        for line, text in zip(lines, lines_text_list):
            if text[:1].isdigit():
                line[ListLineTag.IS_LIST_START_LINE] = True
            if text.endswith(LIST_END_FLAG):
                line[ListLineTag.IS_LIST_END_LINE] = True
    else:
        # Regular indented list: flush-left lines start an item, lines that
        # stop short of the right edge end one.
        for line in lines:
            if abs(block_left - line['bbox'][0]) < line_height / 2:
                line[ListLineTag.IS_LIST_START_LINE] = True
            if abs(block_right - line['bbox'][2]) > line_height:
                line[ListLineTag.IS_LIST_END_LINE] = True

    return BlockType.List
188
189
190


def __merge_2_text_blocks(block1, block2):
    """Append ``block1``'s lines to ``block2`` when they form one paragraph.

    ``block1`` is the later block and ``block2`` the block preceding it. The
    merge happens only when all of the following hold:

    * both blocks have lines, and ``block2``'s last line has spans;
    * ``block1``'s first line is flush with the block's left edge (no
      first-line indent);
    * ``block2``'s last line reaches the block's right edge;
    * ``block2``'s last span does not end with a ``LINE_STOP_FLAG`` character;
    * the widths of the two blocks differ by less than a factor of two.

    On merge, ``block1``'s lines are moved to the end of ``block2``'s lines,
    ``block1['lines']`` becomes empty and ``block1[LINES_DELETED]`` is set.
    If the blocks are on different pages, every moved span is tagged with
    ``CROSS_PAGE``.

    Returns:
        The tuple ``(block1, block2)``; both dicts are modified in place.
    """
    block1_lines = block1.get('lines')
    block2_lines = block2.get('lines')
    # Nothing to move, or no preceding line to continue from.
    if not block1_lines or not block2_lines:
        return block1, block2

    first_line = block1_lines[0]
    first_line_height = first_line['bbox'][3] - first_line['bbox'][1]
    starts_flush_left = (
        abs(block1['bbox_fs'][0] - first_line['bbox'][0]) < first_line_height / 2
    )
    if not starts_flush_left:
        return block1, block2

    last_line = block2_lines[-1]
    if not last_line['spans']:
        return block1, block2
    last_span = last_line['spans'][-1]
    last_line_height = last_line['bbox'][3] - last_line['bbox'][1]

    block1_width = block1['bbox'][2] - block1['bbox'][0]
    block2_width = block2['bbox'][2] - block2['bbox'][0]
    min_block_width = min(block1_width, block2_width)

    should_merge = (
        # The previous block's last line runs up to the right edge ...
        abs(block2['bbox_fs'][2] - last_line['bbox'][2]) < last_line_height
        # ... and does not end on terminating punctuation ...
        and not last_span['content'].endswith(LINE_STOP_FLAG)
        # ... and the wider block is less than twice as wide as the narrower.
        and abs(block1_width - block2_width) < min_block_width
    )
    if not should_merge:
        return block1, block2

    if block1['page_num'] != block2['page_num']:
        for line in block1_lines:
            for span in line['spans']:
                span[CROSS_PAGE] = True
    block2_lines.extend(block1_lines)
    block1['lines'] = []
    block1[LINES_DELETED] = True

    return block1, block2


218
219
220
221
222
223
224
225
226
227
228
229
def __merge_2_list_blocks(block1, block2):
    """Unconditionally move every line of ``block1`` to the end of ``block2``.

    Spans of the moved lines are tagged with ``CROSS_PAGE`` when the two
    blocks carry different ``page_num`` values. Afterwards ``block1`` has an
    empty ``lines`` list and is flagged with ``LINES_DELETED``.

    Returns:
        The tuple ``(block1, block2)``; both dicts are modified in place.
    """
    spans_cross_page = block1['page_num'] != block2['page_num']
    moved_lines = block1['lines']

    if spans_cross_page:
        for moved_span in (span for moved in moved_lines for span in moved['spans']):
            moved_span[CROSS_PAGE] = True

    block2['lines'].extend(moved_lines)
    block1.update({'lines': [], LINES_DELETED: True})

    return block1, block2


230
231
232
def __para_merge_page(blocks):
    """Classify the text blocks and merge blocks that continue one another.

    The blocks are grouped by ``__process_blocks``. Within each group every
    block's ``type`` is replaced by the result of
    ``__is_list_or_index_block``; then adjacent blocks of the same kind are
    merged (text with text, list with list, index with index). All changes
    are made in place; nothing is returned.
    """
    for text_blocks_group in __process_blocks(blocks):
        # Every block must be classified before any merging takes place.
        for block in text_blocks_group:
            block['type'] = __is_list_or_index_block(block)

        # Walk backwards so each block is folded into its predecessor, which
        # in turn may be folded into the block before it.
        for i in range(len(text_blocks_group) - 1, 0, -1):
            current_block = text_blocks_group[i]
            prev_block = text_blocks_group[i - 1]

            # NOTE(review): text blocks are matched against the literal
            # 'text' while list/index use BlockType members - presumably
            # BlockType.Text == 'text'; confirm.
            if current_block['type'] == 'text' and prev_block['type'] == 'text':
                __merge_2_text_blocks(current_block, prev_block)
            elif (
                    (current_block['type'] == BlockType.List and prev_block['type'] == BlockType.List) or
                    (current_block['type'] == BlockType.Index and prev_block['type'] == BlockType.Index)
            ):
                __merge_2_list_blocks(current_block, prev_block)


def para_split(pdf_info_dict, debug_mode=False):
    """Build ``para_blocks`` for every page of ``pdf_info_dict``.

    Each page's ``preproc_blocks`` are deep-copied and stamped with their
    ``page_num``. The blocks of all pages are then merged as one sequence,
    so paragraphs can be joined across page boundaries, and finally handed
    back to their pages under the ``para_blocks`` key.

    Args:
        pdf_info_dict: mapping of page number to page dict; each page must
            contain ``preproc_blocks``. Pages are modified in place.
        debug_mode: accepted for interface compatibility; not used here.
    """
    all_blocks = []
    for page_num, page in pdf_info_dict.items():
        blocks = copy.deepcopy(page['preproc_blocks'])
        for block in blocks:
            block['page_num'] = page_num
        all_blocks.extend(blocks)

    __para_merge_page(all_blocks)

    # Regroup in a single pass instead of rescanning every block per page.
    blocks_by_page = {}
    for block in all_blocks:
        blocks_by_page.setdefault(block['page_num'], []).append(block)

    for page_num, page in pdf_info_dict.items():
        page['para_blocks'] = blocks_by_page.get(page_num, [])


if __name__ == '__main__':
    # Manual smoke check: group an (empty) block list and print each group.
    input_blocks = []
    groups = __process_blocks(input_blocks)
    for group_index, group in enumerate(groups):
        print(f"Group {group_index}: {group}")