Commit 2f46839b by Christian Margreitter

Bug fix in backend parallelization that could lead to issues when entire sublists failed to generate a structure.

Bug fix in backend parallelization that could lead to issues when entire sublists failed to generate a structure.
parent 7bdfd4a6
...@@ -178,12 +178,12 @@ class AutodockVina(Docker, BaseModel): ...@@ -178,12 +178,12 @@ class AutodockVina(Docker, BaseModel):
if not os.path.exists(self.parameters.receptor_pdbqt_path[0]): if not os.path.exists(self.parameters.receptor_pdbqt_path[0]):
raise DockingRunFailed("Specified PDBQT path to target (receptor) does not exist - abort.") raise DockingRunFailed("Specified PDBQT path to target (receptor) does not exist - abort.")
jobs_submitted = 0 sublists_submitted = 0
slices_per_iteration = min(number_cores, number_sublists) slices_per_iteration = min(number_cores, number_sublists)
while jobs_submitted < len(sublists): while sublists_submitted < len(sublists):
upper_bound_slice = min((jobs_submitted + slices_per_iteration), len(sublists)) upper_bound_slice = min((sublists_submitted + slices_per_iteration), len(sublists))
cur_slice_start_indices = start_indices[jobs_submitted:upper_bound_slice] cur_slice_start_indices = start_indices[sublists_submitted:upper_bound_slice]
cur_slice_sublists = sublists[jobs_submitted:upper_bound_slice] cur_slice_sublists = sublists[sublists_submitted:upper_bound_slice]
# generate paths and initialize molecules (so that if they fail, this can be covered) # generate paths and initialize molecules (so that if they fail, this can be covered)
tmp_output_dirs, tmp_input_paths, tmp_output_paths, \ tmp_output_dirs, tmp_input_paths, tmp_output_paths, \
...@@ -197,10 +197,13 @@ class AutodockVina(Docker, BaseModel): ...@@ -197,10 +197,13 @@ class AutodockVina(Docker, BaseModel):
tmp_output_paths[chunk_index])) tmp_output_paths[chunk_index]))
processes.append(p) processes.append(p)
p.start() p.start()
jobs_submitted += 1
for p in processes: for p in processes:
p.join() p.join()
# add the number of input sublists rather than the output temporary folders to account for cases where
# entire sublists failed to produce an input structure
sublists_submitted += len(cur_slice_sublists)
# parse the resulting sdf files # parse the resulting sdf files
for path_sdf_results, cur_identifier in zip(tmp_output_paths, ligand_identifiers): for path_sdf_results, cur_identifier in zip(tmp_output_paths, ligand_identifiers):
# add conformations # add conformations
...@@ -226,7 +229,7 @@ class AutodockVina(Docker, BaseModel): ...@@ -226,7 +229,7 @@ class AutodockVina(Docker, BaseModel):
# clean-up # clean-up
for path in tmp_output_dirs: for path in tmp_output_dirs:
shutil.rmtree(path) shutil.rmtree(path)
self._log_docking_progress(number_done=jobs_submitted, number_total=number_sublists) self._log_docking_progress(number_done=sublists_submitted, number_total=number_sublists)
# the conformers are already sorted, but some tags are missing # the conformers are already sorted, but some tags are missing
# -> <ligand_number>:<enumeration>:<conformer_number> # -> <ligand_number>:<enumeration>:<conformer_number>
......
...@@ -202,13 +202,13 @@ class Gold(Docker): ...@@ -202,13 +202,13 @@ class Gold(Docker):
start_indices, sublists = self.get_sublists_for_docking(number_cores=number_cores) start_indices, sublists = self.get_sublists_for_docking(number_cores=number_cores)
number_sublists = len(sublists) number_sublists = len(sublists)
self._logger.log(f"Split ligands into {number_sublists} sublists for docking.", _LE.DEBUG) self._logger.log(f"Split ligands into {number_sublists} sublists for docking.", _LE.DEBUG)
jobs_submitted = 0 sublists_submitted = 0
slices_per_iteration = min(number_cores, number_sublists) slices_per_iteration = min(number_cores, number_sublists)
while jobs_submitted < len(sublists): while sublists_submitted < len(sublists):
upper_bound_slice = min((jobs_submitted + slices_per_iteration), len(sublists)) upper_bound_slice = min((sublists_submitted + slices_per_iteration), len(sublists))
cur_slice_start_indices = start_indices[jobs_submitted:upper_bound_slice] cur_slice_start_indices = start_indices[sublists_submitted:upper_bound_slice]
cur_slice_sublists = sublists[jobs_submitted:upper_bound_slice] cur_slice_sublists = sublists[sublists_submitted:upper_bound_slice]
# generate paths and initialize molecules (so that if they fail, this can be covered) # generate paths and initialize molecules (so that if they fail, this can be covered)
tmp_output_dirs, tmp_input_sdf_paths, \ tmp_output_dirs, tmp_input_sdf_paths, \
...@@ -223,10 +223,13 @@ class Gold(Docker): ...@@ -223,10 +223,13 @@ class Gold(Docker):
tmp_output_dirs[chunk_index])) tmp_output_dirs[chunk_index]))
processes.append(p) processes.append(p)
p.start() p.start()
jobs_submitted += 1
for p in processes: for p in processes:
p.join() p.join()
# add the number of input sublists rather than the output temporary folders to account for cases where
# entire sublists failed to produce an input structure
sublists_submitted += len(cur_slice_sublists)
# load the chunks and recombine the result; add conformations # load the chunks and recombine the result; add conformations
for chunk_index in range(len(tmp_output_dirs)): for chunk_index in range(len(tmp_output_dirs)):
# this is a protection against the case where empty (file size == 0 bytes) files are generated due to # this is a protection against the case where empty (file size == 0 bytes) files are generated due to
...@@ -253,7 +256,7 @@ class Gold(Docker): ...@@ -253,7 +256,7 @@ class Gold(Docker):
# clean-up # clean-up
for path in tmp_output_dirs: for path in tmp_output_dirs:
shutil.rmtree(path) shutil.rmtree(path)
self._log_docking_progress(number_done=jobs_submitted, number_total=number_sublists) self._log_docking_progress(number_done=sublists_submitted, number_total=number_sublists)
# update conformer names to contain the conformer id # update conformer names to contain the conformer id
# -> <ligand_number>:<enumeration>:<conformer_number> # -> <ligand_number>:<enumeration>:<conformer_number>
......
...@@ -141,20 +141,20 @@ class OpenEye(Docker): ...@@ -141,20 +141,20 @@ class OpenEye(Docker):
number_sublists = len(sublists) number_sublists = len(sublists)
self._logger.log(f"Split ligands into {len(sublists)} sublists for docking.", _LE.DEBUG) self._logger.log(f"Split ligands into {len(sublists)} sublists for docking.", _LE.DEBUG)
jobs_submitted = 0 sublists_submitted = 0
while jobs_submitted < len(sublists): while sublists_submitted < len(sublists):
processes = [] processes = []
return_queues = [] return_queues = []
for _ in range(number_cores): for _ in range(number_cores):
if jobs_submitted >= len(sublists): if sublists_submitted >= len(sublists):
continue continue
cur_queue = multiprocessing.Queue() cur_queue = multiprocessing.Queue()
p = multiprocessing.Process(target=self._dock_subjob, args=(sublists[jobs_submitted], p = multiprocessing.Process(target=self._dock_subjob, args=(sublists[sublists_submitted],
cur_queue)) cur_queue))
processes.append(p) processes.append(p)
p.start() p.start()
return_queues.append(cur_queue) return_queues.append(cur_queue)
jobs_submitted += 1 sublists_submitted += 1
for p in processes: for p in processes:
p.join() p.join()
...@@ -164,7 +164,7 @@ class OpenEye(Docker): ...@@ -164,7 +164,7 @@ class OpenEye(Docker):
if cur_ligand_name == ligand.get_identifier(): if cur_ligand_name == ligand.get_identifier():
ligand.set_conformers(cur_slice[cur_ligand_name]) ligand.set_conformers(cur_slice[cur_ligand_name])
break break
self._log_docking_progress(number_done=jobs_submitted, number_total=number_sublists) self._log_docking_progress(number_done=sublists_submitted, number_total=number_sublists)
# update conformer names to contain the conformer id -> <ligand_number>:<enumeration>:<conformer_number> # update conformer names to contain the conformer id -> <ligand_number>:<enumeration>:<conformer_number>
for ligand in self.ligands: for ligand in self.ligands:
......
...@@ -185,13 +185,13 @@ class OmegaLigandPreparator(LigandPreparator, BaseModel): ...@@ -185,13 +185,13 @@ class OmegaLigandPreparator(LigandPreparator, BaseModel):
number_sublists = len(sublists) number_sublists = len(sublists)
self._logger.log(f"Split ligands into {number_sublists} sublists for embedding.", _LE.DEBUG) self._logger.log(f"Split ligands into {number_sublists} sublists for embedding.", _LE.DEBUG)
jobs_submitted = 0 sublists_submitted = 0
slices_per_iteration = min(number_cores, number_sublists) slices_per_iteration = min(number_cores, number_sublists)
if isinstance(self.ligands[0].get_molecule(), Chem.Mol): if isinstance(self.ligands[0].get_molecule(), Chem.Mol):
while jobs_submitted < len(sublists): while sublists_submitted < len(sublists):
upper_bound_slice = min((jobs_submitted + slices_per_iteration), len(sublists)) upper_bound_slice = min((sublists_submitted + slices_per_iteration), len(sublists))
cur_slice_start_indices = start_indices[jobs_submitted:upper_bound_slice] cur_slice_start_indices = start_indices[sublists_submitted:upper_bound_slice]
cur_slice_sublists = sublists[jobs_submitted:upper_bound_slice] cur_slice_sublists = sublists[sublists_submitted:upper_bound_slice]
# generate paths and initialize molecules (so that if they fail, this can be covered) # generate paths and initialize molecules (so that if they fail, this can be covered)
tmp_output_dirs, tmp_input_smi_paths, \ tmp_output_dirs, tmp_input_smi_paths, \
...@@ -206,10 +206,13 @@ class OmegaLigandPreparator(LigandPreparator, BaseModel): ...@@ -206,10 +206,13 @@ class OmegaLigandPreparator(LigandPreparator, BaseModel):
tmp_output_dirs[chunk_index])) tmp_output_dirs[chunk_index]))
processes.append(p) processes.append(p)
p.start() p.start()
jobs_submitted += 1
for p in processes: for p in processes:
p.join() p.join()
# add the number of input sublists rather than the output temporary folders to account for cases where
# entire sublists failed to produce an input structure
sublists_submitted += len(cur_slice_sublists)
# load and store the conformers; name it sequentially # load and store the conformers; name it sequentially
# note, that some backends require the H-coordinates (such as Glide) - so keep them! # note, that some backends require the H-coordinates (such as Glide) - so keep them!
ligands_embedded = [] ligands_embedded = []
...@@ -262,7 +265,7 @@ class OmegaLigandPreparator(LigandPreparator, BaseModel): ...@@ -262,7 +265,7 @@ class OmegaLigandPreparator(LigandPreparator, BaseModel):
# remove temporary files # remove temporary files
for path in tmp_output_dirs: for path in tmp_output_dirs:
shutil.rmtree(path) shutil.rmtree(path)
self._log_docking_progress(number_done=jobs_submitted, number_total=number_sublists) self._log_docking_progress(number_done=sublists_submitted, number_total=number_sublists)
# check success and failure with embedding # check success and failure with embedding
failed = 0 failed = 0
......
...@@ -133,13 +133,13 @@ class OpenEyeHybrid(Docker): ...@@ -133,13 +133,13 @@ class OpenEyeHybrid(Docker):
start_indices, sublists = self.get_sublists_for_docking(number_cores=number_cores) start_indices, sublists = self.get_sublists_for_docking(number_cores=number_cores)
number_sublists = len(sublists) number_sublists = len(sublists)
self._logger.log(f"Split ligands into {number_sublists} sublists for docking.", _LE.DEBUG) self._logger.log(f"Split ligands into {number_sublists} sublists for docking.", _LE.DEBUG)
jobs_submitted = 0 sublists_submitted = 0
slices_per_iteration = min(number_cores, number_sublists) slices_per_iteration = min(number_cores, number_sublists)
while jobs_submitted < len(sublists): while sublists_submitted < len(sublists):
upper_bound_slice = min((jobs_submitted + slices_per_iteration), len(sublists)) upper_bound_slice = min((sublists_submitted + slices_per_iteration), len(sublists))
cur_slice_start_indices = start_indices[jobs_submitted:upper_bound_slice] cur_slice_start_indices = start_indices[sublists_submitted:upper_bound_slice]
cur_slice_sublists = sublists[jobs_submitted:upper_bound_slice] cur_slice_sublists = sublists[sublists_submitted:upper_bound_slice]
# generate paths and initialize molecules (so that if they fail, this can be covered) # generate paths and initialize molecules (so that if they fail, this can be covered)
tmp_output_dirs, tmp_input_sdf_paths, \ tmp_output_dirs, tmp_input_sdf_paths, \
...@@ -154,10 +154,13 @@ class OpenEyeHybrid(Docker): ...@@ -154,10 +154,13 @@ class OpenEyeHybrid(Docker):
tmp_output_dirs[chunk_index])) tmp_output_dirs[chunk_index]))
processes.append(p) processes.append(p)
p.start() p.start()
jobs_submitted += 1
for p in processes: for p in processes:
p.join() p.join()
# add the number of input sublists rather than the output temporary folders to account for cases where
# entire sublists failed to produce an input structure
sublists_submitted += len(cur_slice_sublists)
# load the chunks and recombine the result; add conformations # load the chunks and recombine the result; add conformations
for chunk_index in range(len(tmp_output_dirs)): for chunk_index in range(len(tmp_output_dirs)):
# this is a protection against the case where empty (file size == 0 bytes) files are generated due to # this is a protection against the case where empty (file size == 0 bytes) files are generated due to
...@@ -180,7 +183,7 @@ class OpenEyeHybrid(Docker): ...@@ -180,7 +183,7 @@ class OpenEyeHybrid(Docker):
# clean-up # clean-up
for path in tmp_output_dirs: for path in tmp_output_dirs:
shutil.rmtree(path) shutil.rmtree(path)
self._log_docking_progress(number_done=jobs_submitted, number_total=number_sublists) self._log_docking_progress(number_done=sublists_submitted, number_total=number_sublists)
# sort the conformers (best to worst), update their names to contain the conformer id and add tags # sort the conformers (best to worst), update their names to contain the conformer id and add tags
# -> <ligand_number>:<enumeration>:<conformer_number> # -> <ligand_number>:<enumeration>:<conformer_number>
......
...@@ -398,13 +398,13 @@ class Glide(Docker, BaseModel): ...@@ -398,13 +398,13 @@ class Glide(Docker, BaseModel):
number_sublists = len(sublists) number_sublists = len(sublists)
number_ligands_per_sublist = len(sublists[0]) number_ligands_per_sublist = len(sublists[0])
self._logger.log(f"Split ligands into {number_sublists} sublists for docking.", _LE.DEBUG) self._logger.log(f"Split ligands into {number_sublists} sublists for docking.", _LE.DEBUG)
jobs_submitted = 0 sublists_submitted = 0
slices_per_iteration = min(number_cores, number_sublists) slices_per_iteration = min(number_cores, number_sublists)
while jobs_submitted < len(sublists): while sublists_submitted < len(sublists):
upper_bound_slice = min((jobs_submitted + slices_per_iteration), len(sublists)) upper_bound_slice = min((sublists_submitted + slices_per_iteration), len(sublists))
cur_slice_start_indices = start_indices[jobs_submitted:upper_bound_slice] cur_slice_start_indices = start_indices[sublists_submitted:upper_bound_slice]
cur_slice_sublists = sublists[jobs_submitted:upper_bound_slice] cur_slice_sublists = sublists[sublists_submitted:upper_bound_slice]
# generate paths and initialize molecules (so that if they fail, this can be covered) # generate paths and initialize molecules (so that if they fail, this can be covered)
tmp_output_dirs, tmp_input_mae_paths, \ tmp_output_dirs, tmp_input_mae_paths, \
...@@ -424,10 +424,13 @@ class Glide(Docker, BaseModel): ...@@ -424,10 +424,13 @@ class Glide(Docker, BaseModel):
number_ligands_per_sublist)) number_ligands_per_sublist))
processes.append(p) processes.append(p)
p.start() p.start()
jobs_submitted += 1
for p in processes: for p in processes:
p.join() p.join()
# add the number of input sublists rather than the output temporary folders to account for cases where
# entire sublists failed to produce an input structure
sublists_submitted += len(cur_slice_sublists)
# parse the resulting sdf files # parse the resulting sdf files
for path_sdf_results in tmp_output_sdf_paths: for path_sdf_results in tmp_output_sdf_paths:
# this is a protection against the case where empty (file size == 0 bytes) files are generated due to # this is a protection against the case where empty (file size == 0 bytes) files are generated due to
...@@ -449,7 +452,7 @@ class Glide(Docker, BaseModel): ...@@ -449,7 +452,7 @@ class Glide(Docker, BaseModel):
# clean-up # clean-up
for path in tmp_output_dirs: for path in tmp_output_dirs:
shutil.rmtree(path) shutil.rmtree(path)
self._log_docking_progress(number_done=jobs_submitted, number_total=number_sublists) self._log_docking_progress(number_done=sublists_submitted, number_total=number_sublists)
# sort the conformers (best to worst) and update their names to contain the conformer id # sort the conformers (best to worst) and update their names to contain the conformer id
# -> <ligand_number>:<enumeration>:<conformer_number> # -> <ligand_number>:<enumeration>:<conformer_number>
......
...@@ -254,12 +254,12 @@ class LigprepLigandPreparator(LigandPreparator, BaseModel): ...@@ -254,12 +254,12 @@ class LigprepLigandPreparator(LigandPreparator, BaseModel):
self._logger.log(f"Split ligands into {number_sublists} sublists for embedding.", self._logger.log(f"Split ligands into {number_sublists} sublists for embedding.",
_LE.DEBUG) _LE.DEBUG)
jobs_submitted = 0 sublists_submitted = 0
slices_per_iteration = min(number_cores, number_sublists) slices_per_iteration = min(number_cores, number_sublists)
while jobs_submitted < len(sublists): while sublists_submitted < len(sublists):
upper_bound_slice = min((jobs_submitted + slices_per_iteration), len(sublists)) upper_bound_slice = min((sublists_submitted + slices_per_iteration), len(sublists))
cur_slice_start_indices = start_indices[jobs_submitted:upper_bound_slice] cur_slice_start_indices = start_indices[sublists_submitted:upper_bound_slice]
cur_slice_sublists = sublists[jobs_submitted:upper_bound_slice] cur_slice_sublists = sublists[sublists_submitted:upper_bound_slice]
# generate paths and initialize molecules (so that if they fail, this can be covered) # generate paths and initialize molecules (so that if they fail, this can be covered)
tmp_output_dirs, tmp_input_smi_paths, \ tmp_output_dirs, tmp_input_smi_paths, \
...@@ -279,10 +279,13 @@ class LigprepLigandPreparator(LigandPreparator, BaseModel): ...@@ -279,10 +279,13 @@ class LigprepLigandPreparator(LigandPreparator, BaseModel):
tmp_input_filter_paths[chunk_index])) tmp_input_filter_paths[chunk_index]))
processes.append(p) processes.append(p)
p.start() p.start()
jobs_submitted += 1
for p in processes: for p in processes:
p.join() p.join()
# add the number of input sublists rather than the output temporary folders to account for cases where
# entire sublists failed to produce an input structure
sublists_submitted += len(cur_slice_sublists)
# load and store the conformers; name it sequentially # load and store the conformers; name it sequentially
# note, that some backends require the H-coordinates (such as Glide) - so keep them! # note, that some backends require the H-coordinates (such as Glide) - so keep them!
ligands_embedded = [] ligands_embedded = []
...@@ -307,7 +310,7 @@ class LigprepLigandPreparator(LigandPreparator, BaseModel): ...@@ -307,7 +310,7 @@ class LigprepLigandPreparator(LigandPreparator, BaseModel):
# remove temporary files # remove temporary files
for path in tmp_output_dirs: for path in tmp_output_dirs:
shutil.rmtree(path) shutil.rmtree(path)
self._log_docking_progress(number_done=jobs_submitted, number_total=number_sublists) self._log_docking_progress(number_done=sublists_submitted, number_total=number_sublists)
# check success and failure with embedding # check success and failure with embedding
failed = 0 failed = 0
......
...@@ -120,12 +120,12 @@ class rDock(Docker): ...@@ -120,12 +120,12 @@ class rDock(Docker):
number_sublists = len(sublists) number_sublists = len(sublists)
self._logger.log(f"Split ligands into {number_sublists} sublists for docking.", _LE.DEBUG) self._logger.log(f"Split ligands into {number_sublists} sublists for docking.", _LE.DEBUG)
jobs_submitted = 0 sublists_submitted = 0
slices_per_iteration = min(number_cores, number_sublists) slices_per_iteration = min(number_cores, number_sublists)
while jobs_submitted < len(sublists): while sublists_submitted < len(sublists):
upper_bound_slice = min((jobs_submitted + slices_per_iteration), len(sublists)) upper_bound_slice = min((sublists_submitted + slices_per_iteration), len(sublists))
cur_slice_start_indices = start_indices[jobs_submitted:upper_bound_slice] cur_slice_start_indices = start_indices[sublists_submitted:upper_bound_slice]
cur_slice_sublists = sublists[jobs_submitted:upper_bound_slice] cur_slice_sublists = sublists[sublists_submitted:upper_bound_slice]
# generate paths and initialize molecules (so that if they fail, this can be covered) # generate paths and initialize molecules (so that if they fail, this can be covered)
tmp_output_dirs, tmp_input_sdf_paths, \ tmp_output_dirs, tmp_input_sdf_paths, \
...@@ -140,10 +140,13 @@ class rDock(Docker): ...@@ -140,10 +140,13 @@ class rDock(Docker):
tmp_output_sdf_paths[chunk_index])) tmp_output_sdf_paths[chunk_index]))
processes.append(p) processes.append(p)
p.start() p.start()
jobs_submitted += 1
for p in processes: for p in processes:
p.join() p.join()
# add the number of input sublists rather than the output temporary folders to account for cases where
# entire sublists failed to produce an input structure
sublists_submitted += len(cur_slice_sublists)
# load the chunks and recombine the result; add conformations # load the chunks and recombine the result; add conformations
for chunk_index in range(len(tmp_output_dirs)): for chunk_index in range(len(tmp_output_dirs)):
if not os.path.isfile(tmp_output_sdf_paths[chunk_index]) or os.path.getsize(tmp_output_sdf_paths[chunk_index]) == 0: if not os.path.isfile(tmp_output_sdf_paths[chunk_index]) or os.path.getsize(tmp_output_sdf_paths[chunk_index]) == 0:
...@@ -165,7 +168,7 @@ class rDock(Docker): ...@@ -165,7 +168,7 @@ class rDock(Docker):
# clean-up # clean-up
for path in tmp_output_dirs: for path in tmp_output_dirs:
shutil.rmtree(path) shutil.rmtree(path)
self._log_docking_progress(number_done=jobs_submitted, number_total=number_sublists) self._log_docking_progress(number_done=sublists_submitted, number_total=number_sublists)
# sort the conformers (best to worst), update their names to contain the conformer id and add tags # sort the conformers (best to worst), update their names to contain the conformer id and add tags
# -> <ligand_number>:<enumeration>:<conformer_number> # -> <ligand_number>:<enumeration>:<conformer_number>
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment