compress_tar.py 2.28 KB
Newer Older
Lengyue's avatar
Lengyue committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
import tarfile
from pathlib import Path
from tqdm import tqdm
import io
import random
from multiprocessing import Process


def chunked_tarring(rank, file_list, base_folder, output_folder, chunk_size=1024**3):
    chunk_count = 1
    total_size = 0
    saved_count = 0

    buffer = io.BytesIO()
    tar = tarfile.open(fileobj=buffer, mode="w")

    for audio_file in file_list:
        txt_file = audio_file.with_suffix(".txt")
        if not txt_file.exists():
            continue

        file_size = audio_file.stat().st_size + txt_file.stat().st_size
        if total_size + file_size > chunk_size:
            tar.close()

            # write the buffer to disk
            buffer.seek(0)
            with open(output_folder / f"chunk-{rank:03d}-{chunk_count:04d}.tar", "wb") as f:
                f.write(buffer.read())

            chunk_count += 1
            total_size = 0
            buffer = io.BytesIO()
            tar = tarfile.open(fileobj=buffer, mode="w")

        tar.add(audio_file, arcname=audio_file.relative_to(base_folder))
        tar.add(txt_file, arcname=txt_file.relative_to(base_folder))

        total_size += file_size

        if saved_count % 1000 == 0:
            print(f"Rank {rank}: {saved_count}/{len(file_list)}")
        
        saved_count += 1

    tar.close()
    buffer.seek(0)
    with open(output_folder / f"chunk-{rank:03d}-{chunk_count:04d}.tar", "wb") as f:
        f.write(buffer.read())
    
    print(f"Rank {rank}: {saved_count}/{len(file_list)}")


if __name__ == "__main__":
    base_folder = Path("/mnt/nvme1/multi-modal-test/WenetSpeech/cleaned")
    output_folder = Path("/mnt/nvme1/multi-modal-test/WenetSpeech/compressed")
    output_folder.mkdir(exist_ok=True, parents=True)
    num_workers = 50

    file_list = list(tqdm(base_folder.rglob("*.flac")))
    random.shuffle(file_list)
    print(f"Total files: {len(file_list)}")

    chunk_size = len(file_list) // num_workers
    processes = []

    for i in range(num_workers):
        start = i * chunk_size
        end = (i + 1) * chunk_size
        if i == num_workers - 1:
            end = len(file_list)

        p = Process(target=chunked_tarring, args=(i, file_list[start:end], base_folder, output_folder))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print("Done")