Commit c3c627e7 authored by Dingquan Yu's avatar Dingquan Yu
Browse files

now run in a subprocess

parent 2e1941a0
......@@ -21,15 +21,14 @@ import dataclasses
from multiprocessing import cpu_count
import tempfile
from typing import Mapping, Optional, Sequence, Any, MutableMapping, Union
import asyncio, multiprocessing
import concurrent.futures
import subprocess
import numpy as np
import torch
from openfold.data import templates, parsers, mmcif_parsing, msa_identifiers, msa_pairing, feature_processing_multimer
from openfold.data.templates import get_custom_template_features, empty_template_feats
from openfold.data.tools import jackhmmer, hhblits, hhsearch, hmmsearch
from openfold.data.tools.utils import to_date
from openfold.data.tools.utils import to_date, NonDaemonicProcess, NonDaemonicProcessPool
from openfold.np import residue_constants, protein
......@@ -739,51 +738,21 @@ class DataPipeline:
fp.close()
else:
# Now will split the following steps into multiple processes
def parse_stockholm_file(alignment_dir: str, stockholm_file: str, queue: multiprocessing.Queue):
path = os.path.join(alignment_dir, stockholm_file)
file_name,_ = os.path.splitext(stockholm_file)
with open(path, "r") as infile:
msa = parsers.parse_stockholm(infile.read())
infile.close()
queue.put({file_name: msa})
def parse_a3m_file(alignment_dir: str, a3m_file: str,queue: multiprocessing.Queue):
path = os.path.join(alignment_dir, a3m_file)
file_name,_ = os.path.splitext(a3m_file)
with open(path, "r") as infile:
msa = parsers.parse_a3m(infile.read())
infile.close()
queue.put({file_name: msa})
def run_parse_all_msa_files_multiprocessing(stockholm_files: list, a3m_files: list, alignment_dir:str):
print(f"#### line 764 start running in multiprocessing way")
msa_results={}
processes = []
queue = multiprocessing.Queue()
for f in stockholm_files:
process = multiprocessing.Process(target = parse_stockholm_file, args=(alignment_dir, f, queue))
processes.append(process)
process.start()
for f in a3m_files:
process = multiprocessing.Process(target = parse_a3m_file, args=(alignment_dir, f, queue))
processes.append(process)
process.start()
for p in processes:
res = queue.get()
msa_results.update(res)
p.join()
return msa_results
stockholm_files = [i for i in os.listdir(alignment_dir) if (i.endswith('.sto') and ("hmm_output" not in i))]
a3m_files = [i for i in os.listdir(alignment_dir) if i.endswith('.a3m')]
import time
# a3m_tasks = [(alignment_dir, a3m) for a3m in a3m_files]
# sto_tasks = [(alignment_dir, sto) for sto in stockholm_files]
# with NonDaemonicProcessPool(len(a3m_tasks) + len(sto_tasks)) as pool:
# a3m_results = pool.starmap(parse_a3m_file, a3m_tasks)
# sto_results = pool.starmap(parse_stockholm_file, sto_tasks)
# msa_results = {**a3m_results, **sto_results}
import time, json
current_directory = os.path.dirname(os.path.abspath(__file__))
start = time.time()
# msa_data = asyncio.run(run_parse_all_msa_files(stockholm_files, a3m_files, alignment_dir))
msa_data = run_parse_all_msa_files_multiprocessing(stockholm_files, a3m_files, alignment_dir)
cmd = f"{current_directory}/parse_msa_files.py {alignment_dir}"
msa_data = subprocess.check_output(['python', cmd], capture_output=True, text= True)
msa_data = json.load(msa_data)
end = time.time()
calculate_elapse(start, end, "asynchronised version")
calculate_elapse(start, end, "multiprocessing version")
return msa_data
......
import os, multiprocessing, argparse, json
from openfold.data import parsers
def parse_stockholm_file(alignment_dir: str, stockholm_file: str, queue: multiprocessing.Queue):
path = os.path.join(alignment_dir, stockholm_file)
file_name,_ = os.path.splitext(stockholm_file)
with open(path, "r") as infile:
msa = parsers.parse_stockholm(infile.read())
infile.close()
queue.put({file_name: msa})
def parse_a3m_file(alignment_dir: str, a3m_file: str,queue: multiprocessing.Queue):
path = os.path.join(alignment_dir, a3m_file)
file_name,_ = os.path.splitext(a3m_file)
with open(path, "r") as infile:
msa = parsers.parse_a3m(infile.read())
infile.close()
queue.put({file_name: msa})
def run_parse_all_msa_files_multiprocessing(stockholm_files: list, a3m_files: list, alignment_dir:str):
print(f"#### line 764 start running in multiprocessing way")
msa_results={}
processes = []
queue = multiprocessing.Queue()
for f in stockholm_files:
process = multiprocessing.Process(target = parse_stockholm_file, args=(alignment_dir, f, queue))
process.deamon = False
processes.append(process)
process.start()
for f in a3m_files:
process = multiprocessing.Process(target = parse_a3m_file, args=(alignment_dir, f, queue))
process.daemon = False
processes.append(process)
process.start()
for p in processes:
res = queue.get()
msa_results.update(res)
p.join()
def main(alignment_dir):
parser = argparse.ArgumentParser(description='Process msa files in parallel')
parser.add_argument('alignment_dir', metavar='N', type=int, nargs='+',
help='an integer for the accumulator')
args = parser.parse_args()
stockholm_files = [i for i in os.listdir(alignment_dir) if (i.endswith('.sto') and ("hmm_output" not in i))]
a3m_files = [i for i in os.listdir(alignment_dir) if i.endswith('.a3m')]
msa_data = run_parse_all_msa_files_multiprocessing(stockholm_files, a3m_files, alignment_dir)
return json.dumps(msa_data)
if __name__ == "__main__":
main()
\ No newline at end of file
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment