Commit 2e1941a0 authored by Dingquan Yu's avatar Dingquan Yu
Browse files

now using multiprocessing style

parent 6f3e0c0c
......@@ -21,7 +21,8 @@ import dataclasses
from multiprocessing import cpu_count
import tempfile
from typing import Mapping, Optional, Sequence, Any, MutableMapping, Union
import asyncio
import asyncio, multiprocessing
import concurrent.futures
import numpy as np
import torch
......@@ -738,36 +739,51 @@ class DataPipeline:
fp.close()
else:
# Now will split the following steps into multiple processes
async def parse_stockholm_file(alignment_dir: str, stockholm_file: str):
def parse_stockholm_file(alignment_dir: str, stockholm_file: str, queue: multiprocessing.Queue):
path = os.path.join(alignment_dir, stockholm_file)
file_name,_ = os.path.splitext(stockholm_file)
with open(path, "r") as infile:
msa = parsers.parse_stockholm(infile.read())
infile.close()
return {file_name: msa}
queue.put({file_name: msa})
async def parse_a3m_file(alignment_dir: str, a3m_file: str):
def parse_a3m_file(alignment_dir: str, a3m_file: str,queue: multiprocessing.Queue):
path = os.path.join(alignment_dir, a3m_file)
file_name,_ = os.path.splitext(a3m_file)
with open(path, "r") as infile:
msa = parsers.parse_a3m(infile.read())
infile.close()
return {file_name: msa}
queue.put({file_name: msa})
def run_parse_all_msa_files_multiprocessing(stockholm_files: list, a3m_files: list, alignment_dir:str):
print(f"#### line 764 start running in multiprocessing way")
msa_results={}
processes = []
queue = multiprocessing.Queue()
for f in stockholm_files:
process = multiprocessing.Process(target = parse_stockholm_file, args=(alignment_dir, f, queue))
processes.append(process)
process.start()
for f in a3m_files:
process = multiprocessing.Process(target = parse_a3m_file, args=(alignment_dir, f, queue))
processes.append(process)
process.start()
for p in processes:
res = queue.get()
msa_results.update(res)
p.join()
return msa_results
async def run_parse_all_msa_files(stockholm_files: list, a3m_files: list, alignment_dir:str):
all_tasks = [asyncio.create_task(parse_stockholm_file(alignment_dir, sto)) for sto in stockholm_files]
all_tasks += [asyncio.create_task(parse_a3m_file(alignment_dir, a3m)) for a3m in a3m_files]
results = await asyncio.gather(*all_tasks)
return results
stockholm_files = [i for i in os.listdir(alignment_dir) if (i.endswith('.sto') and ("hmm_output" not in i))]
a3m_files = [i for i in os.listdir(alignment_dir) if i.endswith('.a3m')]
import time
start = time.time()
msa_results = asyncio.run(run_parse_all_msa_files(stockholm_files, a3m_files, alignment_dir))
# msa_data = asyncio.run(run_parse_all_msa_files(stockholm_files, a3m_files, alignment_dir))
msa_data = run_parse_all_msa_files_multiprocessing(stockholm_files, a3m_files, alignment_dir)
end = time.time()
calculate_elapse(start, end, "asynchronised version")
for i in msa_results:
msa_data.update({k:v for k,v in i.items()})
return msa_data
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment