Unverified Commit 43e9bd0f authored by Fazzie-Maqianli, committed by GitHub

update fastfold_data_workflow (#67)

Move construction of the JackHmmer, HHSearch, and HHBlits workflow factories from run() into __init__, so each factory is built once (and only when its binary and database paths are provided) and reused across runs. Add an optional storage_dir argument to run() that, when set, is created on demand and passed to ray.init(storage=storage_dir) as the Ray workflow storage location.

parent 9d46d329
@@ -23,7 +23,7 @@ class FastFoldDataWorkFlow:
         uniref_max_hits: int = 10000,
         mgnify_max_hits: int = 5000,
     ):
-        self.db_map = {
+        db_map = {
             "jackhmmer": {
                 "binary": jackhmmer_binary_path,
                 "dbs": [
@@ -46,14 +46,14 @@ class FastFoldDataWorkFlow:
             },
         }

-        for name, dic in self.db_map.items():
+        for name, dic in db_map.items():
             binary, dbs = dic["binary"], dic["dbs"]
             if(binary is None and not all([x is None for x in dbs])):
                 raise ValueError(
                     f"{name} DBs provided but {name} binary is None"
                 )

-        if(not all([x is None for x in self.db_map["hhsearch"]["dbs"]])
+        if(not all([x is None for x in db_map["hhsearch"]["dbs"]])
            and uniref90_database_path is None):
             raise ValueError(
                 """uniref90_database_path must be specified in order to perform
@@ -69,7 +69,61 @@ class FastFoldDataWorkFlow:
         else:
             self.no_cpus = no_cpus

-    def run(self, fasta_path: str, output_dir: str, alignment_dir: str=None) -> None:
+        # create JackHmmer workflow generator for UNIREF90
+        self.jackhmmer_uniref90_factory = None
+        if jackhmmer_binary_path is not None and uniref90_database_path is not None:
+            jh_config = {
+                "binary_path": db_map["jackhmmer"]["binary"],
+                "database_path": uniref90_database_path,
+                "n_cpu": no_cpus,
+                "uniref_max_hits": uniref_max_hits,
+            }
+            self.jackhmmer_uniref90_factory = JackHmmerFactory(config=jh_config)
+
+        # create HHSearch workflow generator
+        self.hhsearch_pdb_factory = None
+        if pdb70_database_path is not None:
+            hhs_config = {
+                "binary_path": db_map["hhsearch"]["binary"],
+                "databases": db_map["hhsearch"]["dbs"],
+                "n_cpu": self.no_cpus,
+            }
+            self.hhsearch_pdb_factory = HHSearchFactory(config=hhs_config)
+
+        # create JackHmmer workflow generator for MGNIFY
+        self.jackhmmer_mgnify_factory = None
+        if jackhmmer_binary_path is not None and mgnify_database_path is not None:
+            jh_config = {
+                "binary_path": db_map["jackhmmer"]["binary"],
+                "database_path": mgnify_database_path,
+                "n_cpu": no_cpus,
+                "uniref_max_hits": mgnify_max_hits,
+            }
+            self.jackhmmer_mgnify_factory = JackHmmerFactory(config=jh_config)
+
+        # create HHBlits (full BFD) or JackHmmer (small BFD) workflow generator
+        if bfd_database_path is not None:
+            if not use_small_bfd:
+                hhb_config = {
+                    "binary_path": db_map["hhblits"]["binary"],
+                    "databases": db_map["hhblits"]["dbs"],
+                    "n_cpu": self.no_cpus,
+                }
+                self.hhblits_bfd_factory = HHBlitsFactory(config=hhb_config)
+            else:
+                jh_config = {
+                    "binary_path": db_map["jackhmmer"]["binary"],
+                    "database_path": bfd_database_path,
+                    "n_cpu": no_cpus,
+                }
+                self.jackhmmer_small_bfd_factory = JackHmmerFactory(config=jh_config)
+
+    def run(self, fasta_path: str, output_dir: str, alignment_dir: str=None, storage_dir: str=None) -> None:
+        # storage_dir = "file:///tmp/ray/lcmql/workflow_data"
+        if storage_dir is not None:
+            if not os.path.exists(storage_dir):
+                os.makedirs(storage_dir)
+            ray.init(storage=storage_dir)
+
         localtime = time.asctime(time.localtime(time.time()))
         workflow_id = 'fastfold_data_workflow ' + str(localtime)
@@ -89,66 +143,33 @@ class FastFoldDataWorkFlow:
             os.makedirs(alignment_dir)

         # Run JackHmmer on UNIREF90
-        # create JackHmmer workflow generator
-        jh_config = {
-            "binary_path": self.db_map["jackhmmer"]["binary"],
-            "database_path": self.db_map["jackhmmer"]["dbs"][0],
-            "n_cpu": self.no_cpus,
-            "uniref_max_hits": self.uniref_max_hits,
-        }
-        jh_fac = JackHmmerFactory(config = jh_config)
-        # set jackhmmer output path
         uniref90_out_path = os.path.join(alignment_dir, "uniref90_hits.a3m")
         # generate the workflow with i/o path
-        jh_node_1 = jh_fac.gen_node(fasta_path, uniref90_out_path)
+        uniref90_node = self.jackhmmer_uniref90_factory.gen_node(fasta_path, uniref90_out_path)

         # Run HHSearch on STEP1's result with PDB70
-        # create HHSearch workflow generator
-        hhs_config = {
-            "binary_path": self.db_map["hhsearch"]["binary"],
-            "databases": self.db_map["hhsearch"]["dbs"],
-            "n_cpu": self.no_cpus,
-        }
-        hhs_fac = HHSearchFactory(config=hhs_config)
-        # set HHSearch output path
         pdb70_out_path = os.path.join(alignment_dir, "pdb70_hits.hhr")
         # generate the workflow (STEP2 depends on STEP1)
-        hhs_node = hhs_fac.gen_node(uniref90_out_path, pdb70_out_path, after=[jh_node_1])
+        hhs_node = self.hhsearch_pdb_factory.gen_node(uniref90_out_path, pdb70_out_path, after=[uniref90_node])

         # Run JackHmmer on MGNIFY
-        # reconfigure jackhmmer factory to use MGNIFY DB instead
-        jh_fac.configure('database_path', self.db_map["jackhmmer"]["dbs"][1])
-        # set jackhmmer output path
         mgnify_out_path = os.path.join(alignment_dir, "mgnify_hits.a3m")
         # generate workflow for STEP3
-        jh_node_2 = jh_fac.gen_node(fasta_path, mgnify_out_path)
+        mgnify_node = self.jackhmmer_mgnify_factory.gen_node(fasta_path, mgnify_out_path)

         if not self.use_small_bfd:
             # Run HHBlits on BFD
-            # create HHBlits workflow generator
-            hhb_config = {
-                "binary_path": self.db_map["hhblits"]["binary"],
-                "databases": self.db_map["hhblits"]["dbs"],
-                "n_cpu": self.no_cpus,
-            }
-            hhb_fac = HHBlitsFactory(config=hhb_config)
-            # set HHBlits output path
             bfd_out_path = os.path.join(alignment_dir, "bfd_uniclust_hits.a3m")
             # generate workflow for STEP4
-            hhb_node = hhb_fac.gen_node(fasta_path, bfd_out_path)
-            # run workflow
-            batch_run(workflow_id=workflow_id, dags=[hhs_node, jh_node_2, hhb_node])
+            bfd_node = self.hhblits_bfd_factory.gen_node(fasta_path, bfd_out_path)
         else:
             # Run Jackhmmer on small_bfd
-            # reconfigure jackhmmer factory to use small_bfd DB instead
-            jh_fac.configure('database_path', self.db_map["jackhmmer"]["dbs"][2])
-            # set jackhmmer output path
             bfd_out_path = os.path.join(alignment_dir, "bfd_uniclust_hits.a3m")
             # generate workflow for STEP4_2
-            jh_node_3 = jh_fac.gen_node(fasta_path, bfd_out_path)
+            bfd_node = self.jackhmmer_small_bfd_factory.gen_node(fasta_path, bfd_out_path)

-            # run workflow
-            batch_run(workflow_id=workflow_id, dags=[hhs_node, jh_node_2, jh_node_3])
+        # run workflow
+        batch_run(workflow_id=workflow_id, dags=[hhs_node, mgnify_node, bfd_node])
         return
\ No newline at end of file
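
For reference, a minimal usage sketch of the updated class follows. The import path, binary locations, and filesystem paths are illustrative assumptions, not values taken from this commit; only the parameter names and the new storage_dir argument of run() come from the diff above.

    # Hypothetical usage of the updated FastFoldDataWorkFlow.
    # The import path and every path below are placeholder assumptions.
    from fastfold.workflow import FastFoldDataWorkFlow

    wf = FastFoldDataWorkFlow(
        jackhmmer_binary_path="/usr/bin/jackhmmer",              # placeholder
        uniref90_database_path="/data/uniref90/uniref90.fasta",  # placeholder
        mgnify_database_path="/data/mgnify/mgy_clusters.fa",     # placeholder
        pdb70_database_path="/data/pdb70/pdb70",                 # placeholder
        no_cpus=16,
    )

    # run() now accepts storage_dir: it creates the directory if missing and
    # calls ray.init(storage=storage_dir) before submitting the alignment DAG.
    wf.run(
        fasta_path="target.fasta",
        output_dir="out/",
        alignment_dir="out/alignments/",
        storage_dir="/tmp/ray/workflow_data",
    )

One consequence of caching the factories on self: if a tool's binary or database paths were not supplied at construction time, the corresponding factory attribute stays None (or unset, in the BFD case), so a run that needs it would fail at the gen_node call rather than partway through a search.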