compress_tar.py 2.31 KB
Newer Older
Lengyue's avatar
Lengyue committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
import tarfile
from pathlib import Path
from tqdm import tqdm
import io
import random
from multiprocessing import Process


def chunked_tarring(rank, file_list, base_folder, output_folder, chunk_size=1024**3):
    chunk_count = 1
    total_size = 0
    saved_count = 0

    buffer = io.BytesIO()
    tar = tarfile.open(fileobj=buffer, mode="w")

    for audio_file in file_list:
        txt_file = audio_file.with_suffix(".txt")
        if not txt_file.exists():
            continue

        file_size = audio_file.stat().st_size + txt_file.stat().st_size
        if total_size + file_size > chunk_size:
            tar.close()

            # write the buffer to disk
            buffer.seek(0)
            with open(output_folder / f"chunk-{rank:03d}-{chunk_count:04d}.tar", "wb") as f:
                f.write(buffer.read())

            chunk_count += 1
            total_size = 0
Lengyue's avatar
Lengyue committed
33
34

            buffer.close()
Lengyue's avatar
Lengyue committed
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
            buffer = io.BytesIO()
            tar = tarfile.open(fileobj=buffer, mode="w")

        tar.add(audio_file, arcname=audio_file.relative_to(base_folder))
        tar.add(txt_file, arcname=txt_file.relative_to(base_folder))

        total_size += file_size

        if saved_count % 1000 == 0:
            print(f"Rank {rank}: {saved_count}/{len(file_list)}")
        
        saved_count += 1

    tar.close()
    buffer.seek(0)
    with open(output_folder / f"chunk-{rank:03d}-{chunk_count:04d}.tar", "wb") as f:
        f.write(buffer.read())
    
    print(f"Rank {rank}: {saved_count}/{len(file_list)}")


if __name__ == "__main__":
    base_folder = Path("/mnt/nvme1/multi-modal-test/WenetSpeech/cleaned")
    output_folder = Path("/mnt/nvme1/multi-modal-test/WenetSpeech/compressed")
    output_folder.mkdir(exist_ok=True, parents=True)
    num_workers = 50

    file_list = list(tqdm(base_folder.rglob("*.flac")))
    random.shuffle(file_list)
    print(f"Total files: {len(file_list)}")

    chunk_size = len(file_list) // num_workers
    processes = []

    for i in range(num_workers):
        start = i * chunk_size
        end = (i + 1) * chunk_size
        if i == num_workers - 1:
            end = len(file_list)

        p = Process(target=chunked_tarring, args=(i, file_list[start:end], base_folder, output_folder))
        p.start()
        processes.append(p)

    for p in processes:
        p.join()

    print("Done")