compress_and_package.py 2.09 KB
Newer Older
researcher2's avatar
researcher2 committed
1
import argparse
2
3
import glob
import logging
researcher2's avatar
researcher2 committed
4
5
import os
import shutil
6
import subprocess
researcher2's avatar
researcher2 committed
7
8
9
10

from tqdm import tqdm
from tqdm_multiprocess import TqdmMultiProcessPool
from tqdm_multiprocess.logger import setup_logger_tqdm
Fabrizio Milo's avatar
Fabrizio Milo committed
11

12

researcher2's avatar
researcher2 committed
13
14
logger = logging.getLogger(__name__)

Fabrizio Milo's avatar
Fabrizio Milo committed
15
16
17
18

def process_task(
    working_directory, output_directory, bucket_file_path, tqdm_func, global_tqdm
):
    """Compress one bucket file with zstd and delete the uncompressed original.

    Args:
        working_directory: Not used by this task; kept so the task signature
            matches the argument tuple built by the caller.
        output_directory: Directory the resulting ``.zst`` file is moved into.
            When falsy, the compressed file is left next to the original.
        bucket_file_path: Path of the bucket file to compress.
        tqdm_func: Not used by this task; presumably injected by the
            TqdmMultiProcessPool worker wrapper -- confirm against the library.
        global_tqdm: Progress handle; ``update()`` is called once after the
            file has been processed.

    Raises:
        subprocess.CalledProcessError: If zstd exits with a non-zero status.
            The original file is left in place in that case.
        FileNotFoundError: If the ``zstd`` executable cannot be found.
    """
    # Pass an argument list (no shell) so paths containing spaces or shell
    # metacharacters are handed to zstd verbatim; "--" keeps a path that
    # starts with "-" from being parsed as an option.
    command = ["zstd", "--", bucket_file_path]
    logger.info(" ".join(command))

    # check=True: never fall through to deleting the original when the
    # compression itself failed.
    subprocess.run(command, check=True)

    compressed_file = bucket_file_path + ".zst"
    if output_directory:
        shutil.move(compressed_file, output_directory)

    # Only reached after a successful compression (and move, if requested).
    os.remove(bucket_file_path)
    global_tqdm.update()

Fabrizio Milo's avatar
Fabrizio Milo committed
30

researcher2's avatar
researcher2 committed
31
32
33
def compress_and_move(working_directory, output_directory, process_count):
    """Compress every sorted bucket file and package it with ``info.json``.

    Bucket files matching ``<working_directory>/output/*.bkt.txt.sorted`` are
    each handed to ``process_task`` through a ``TqdmMultiProcessPool``. Once
    the pool has finished, ``info.json`` is copied from the working directory
    into the output directory.

    Args:
        working_directory: Directory containing ``info.json`` and the
            ``output`` sub-directory with the bucket files.
        output_directory: Destination directory; created if it is missing.
        process_count: Number of worker processes given to the pool.

    Raises:
        FileNotFoundError: If ``info.json`` is missing from
            ``working_directory``.
    """
    original_info_file_path = os.path.join(working_directory, "info.json")
    # Explicit check instead of ``assert`` so the validation survives
    # ``python -O``; done first so a bad working directory has no side effects.
    if not os.path.exists(original_info_file_path):
        raise FileNotFoundError(
            f"Required info file not found: {original_info_file_path}"
        )

    os.makedirs(output_directory, exist_ok=True)

    bucket_file_paths = glob.glob(
        os.path.join(working_directory, "output", "*.bkt.txt.sorted")
    )
    # Each task is (callable, positional-args); the remaining two parameters
    # of process_task are not supplied here.
    tasks = [
        (process_task, (working_directory, output_directory, bucket_file_path))
        for bucket_file_path in bucket_file_paths
    ]

    pool = TqdmMultiProcessPool(process_count)

    def on_done(_):
        return None

    def on_error(error):
        # Surface worker failures instead of dropping them silently.
        # NOTE(review): presumably receives the exception raised by the task;
        # confirm against the tqdm_multiprocess callback contract.
        logger.error("Compression task failed: %s", error)

    global_progress = tqdm(
        total=len(bucket_file_paths), dynamic_ncols=True, unit="file"
    )
    _ = pool.map(global_progress, tasks, on_error, on_done)

    shutil.copy(original_info_file_path, os.path.join(output_directory, "info.json"))

Fabrizio Milo's avatar
Fabrizio Milo committed
59
60

# Command-line interface. The parser is built at import time so the
# ``__main__`` block can use it directly.
parser = argparse.ArgumentParser(
    description="Compress sorted bucket files with zstd and package them "
    "together with info.json"
)
parser.add_argument(
    "-dir",
    "--working_directory",
    required=True,
    help="directory containing info.json and the output/ bucket files",
)
parser.add_argument(
    "-output",
    "--output_directory",
    required=True,
    help="directory that receives the compressed files and info.json",
)
parser.add_argument(
    "-procs",
    "--process_count",
    type=int,
    default=8,
    help="number of worker processes (default: 8)",
)

Fabrizio Milo's avatar
Fabrizio Milo committed
65
if __name__ == "__main__":
    # Announce the script version before any work starts.
    version = 1.00
    print(f"Running version {version}")

    # Logging is configured before argument parsing, as in the original flow.
    # NOTE(review): setup_logger_tqdm presumably sends log records to this
    # file in a tqdm-friendly way -- confirm against tqdm_multiprocess.
    logfile_path = "compress_and_package.log"
    setup_logger_tqdm(logfile_path)

    cli_args = parser.parse_args()
    compress_and_move(
        cli_args.working_directory,
        cli_args.output_directory,
        cli_args.process_count,
    )