#!/usr/bin/env python3
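"""
Build an SQLite database that maps PDF file names to the target URIs recorded in
.warc.gz files stored on S3.

Example invocation (bucket, prefix, and database name below are placeholders):

    python warc_parser.py s3://my-bucket/warc-prefix/ pdf_mapping.db
"""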
import argparse
import sqlite3
from concurrent.futures import ThreadPoolExecutor
from functools import partial

import boto3
from tqdm import tqdm
from warcio.archiveiterator import ArchiveIterator


def parse_s3_path(s3_path):
    """
    Parses an S3 path of the form s3://bucket/prefix and returns the bucket and prefix.
    """
    if not s3_path.startswith("s3://"):
        raise ValueError("S3 path must start with s3://")
    without_prefix = s3_path[5:]
    parts = without_prefix.split("/", 1)
    bucket = parts[0]
    prefix = parts[1] if len(parts) > 1 else ""
    return bucket, prefix


def list_s3_warc_objects(s3_path, suffix=".warc.gz"):
    """
    Lists all objects under the given S3 path that end with the provided suffix.
    Uses a paginator to handle large result sets.
    """
    bucket, prefix = parse_s3_path(s3_path)
    s3_client = boto3.client("s3")
    paginator = s3_client.get_paginator("list_objects_v2")
    warc_keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        if "Contents" in page:
            for obj in page["Contents"]:
                key = obj["Key"]
                if key.endswith(suffix):
                    warc_keys.append(key)
    return bucket, warc_keys, s3_client


def extract_target_uri_s3(bucket, key, s3_client, head_bytes=1048576):
    """
    Retrieves the first head_bytes bytes (1 MB by default) from the S3 object using a range request,
    and extracts the first response record's target URI from the HTTP headers.
    """
    target_uri = None
    try:
        response = s3_client.get_object(Bucket=bucket, Key=key, Range=f"bytes=0-{head_bytes-1}")
        stream = response["Body"]
        for record in ArchiveIterator(stream):
            for name, value in record.rec_headers.headers:
                if name == "WARC-Target-URI":
                    target_uri = value
                    break
            if target_uri:
                break  # Use the first record that provides a target URI
    except Exception as e:
        tqdm.write(f"Error processing s3://{bucket}/{key}: {e}")
    return target_uri


def create_db(db_path):
    """
    Creates (or opens) the SQLite database and ensures that the pdf_mapping table exists,
    including an index on pdf_hash.
    """
    conn = sqlite3.connect(db_path)
    cursor = conn.cursor()
    cursor.execute(
        """
        CREATE TABLE IF NOT EXISTS pdf_mapping (
            pdf_hash TEXT PRIMARY KEY,
            uri TEXT
        )
    """
    )
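    # pdf_hash is already indexed implicitly as the PRIMARY KEY; the explicit index below
    # is redundant but harmless.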
    cursor.execute(
        """
        CREATE INDEX IF NOT EXISTS idx_pdf_hash ON pdf_mapping (pdf_hash)
    """
    )
    conn.commit()
    return conn


def process_warc_file(key, bucket, s3_client):
    """
    Processes a single WARC file from S3 and returns a tuple (pdf_hash, uri)
    if successful, otherwise returns None.
    """
    uri = extract_target_uri_s3(bucket, key, s3_client, head_bytes=1048576)
    if uri:
        # Derive pdf_hash as the file's basename with .warc.gz replaced by .pdf.
        pdf_hash = key.split("/")[-1].replace(".warc.gz", ".pdf")
        return (pdf_hash, uri)
    else:
        tqdm.write(f"Warning: No valid response record found in s3://{bucket}/{key}")
        return None


def process_s3_folder(s3_path, db_path):
    """
    Lists all .warc.gz files under the provided S3 path, then processes each file in parallel
    to extract the target URI from the HTTP headers. The resulting mapping (derived from the file's
    basename with .warc.gz replaced by .pdf) is stored in the SQLite database.
    """
    bucket, warc_keys, s3_client = list_s3_warc_objects(s3_path, suffix=".warc.gz")
    conn = create_db(db_path)
    cursor = conn.cursor()

    # Process WARC files concurrently using ThreadPoolExecutor.
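    # boto3 low-level clients are documented as thread-safe, so the shared s3_client
    # can be used from all worker threads.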
    results = []
    func = partial(process_warc_file, bucket=bucket, s3_client=s3_client)
    with ThreadPoolExecutor() as executor:
        for result in tqdm(executor.map(func, warc_keys), total=len(warc_keys), desc="Processing S3 WARC files"):
            if result is not None:
                results.append(result)

    # Bulk insert into the database.
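    # A single explicit transaction avoids a commit per row; INSERT OR REPLACE keeps reruns idempotent.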
    conn.execute("BEGIN")
    for pdf_hash, uri in results:
        cursor.execute("INSERT OR REPLACE INTO pdf_mapping (pdf_hash, uri) VALUES (?, ?)", (pdf_hash, uri))
    conn.commit()
    conn.close()


def main():
    parser = argparse.ArgumentParser(description="Create an SQLite database mapping PDF file names to target URIs from S3 WARC files.")
    parser.add_argument("s3_path", help="S3 path (e.g., s3://bucket/prefix) containing .warc.gz files")
    parser.add_argument("db_file", help="Path for the output SQLite database file")
    args = parser.parse_args()
    process_s3_folder(args.s3_path, args.db_file)


if __name__ == "__main__":
    main()