import json
import os
import sys
import re
import hashlib
import aixport
from aixport.basecmdtool import BaseCommandLineTool
from aixport.exceptions import AIxPORTError
import aixport.constants
import cellmaps_utils.constants
from typing import Iterator
import time
[docs]
class DRETrainRunner(object):
    """
    Defines runner for invoking various Drug Recommender engines in
    Training mode.

    Subclasses implement :py:meth:`run`.
    """

    def __init__(self, outdir=None, input_rocrates=None, algorithms=None,
                 algorithm_configs=None,
                 algorithm_rocrate_configs=None):
        """
        Constructor

        :param outdir: Directory where job files and materialized
                       algorithm configs are written
        :type outdir: str
        :param input_rocrates: Paths to the training input RO-Crates
        :type input_rocrates: list
        :param algorithms: Algorithm commands/paths to run
        :type algorithms: list
        :param algorithm_configs: Default config per algorithm, keyed by
                                  algorithm. A value may be a path/string
                                  (passed through as is) or a ``dict``
                                  (written out as a JSON file)
        :type algorithm_configs: dict
        :param algorithm_rocrate_configs: Per RO-Crate config overrides as
                                          ``{algorithm: {rocrate: config}}``
                                          where ``rocrate`` may be the
                                          absolute path, real path or
                                          basename of the RO-Crate
        :type algorithm_rocrate_configs: dict
        """
        self._outdir = outdir
        self._algorithms = algorithms
        self._input_rocrates = input_rocrates
        if algorithm_configs is None:
            algorithm_configs = {}
        if algorithm_rocrate_configs is None:
            algorithm_rocrate_configs = {}
        self._algorithm_configs = algorithm_configs
        self._algorithm_rocrate_configs = algorithm_rocrate_configs
        # (algo, config_key, canonical JSON) -> path of written config file
        self._config_path_cache = {}

    def run(self):
        """
        Abstract method to run the pipeline. This method should be
        implemented by subclasses.

        :raises NotImplementedError: If the subclass does not implement
                                     this method.
        """
        raise NotImplementedError('subclasses need to implement')

    def _write_input_ro_crates(self, out=None):
        """
        Writes the input RO-Crate paths, one per line, to ``out``.

        :param out: Writable text file handle. If ``None`` the RO-Crates
                    are still validated but nothing is written
        :raises AIxPORTError: If there are no input RO-Crates
        """
        if self._input_rocrates is None or len(self._input_rocrates) == 0:
            raise AIxPORTError('No input RO-Crates')
        if out is None:
            return
        for ro_crate in self._input_rocrates:
            out.write(str(ro_crate) + '\n')

    def _write_algorithm_configs(self, out=None):
        """
        Writes the default (not RO-Crate specific) config for each
        algorithm to ``out``, one line per algorithm in algorithm order.
        An algorithm without a config produces an empty line.

        :param out: Writable text file handle; if ``None`` nothing is done
        """
        if out is None:
            return
        for algo in self._algorithms:
            config_path = self._resolve_algorithm_config(algo=algo,
                                                         rocrate_path='')
            out.write(str(config_path) + '\n')

    def _materialize_algorithm_config(self, algo=None, config_value=None,
                                      config_key='default'):
        """
        Returns config path/string ready to pass to an algorithm command.

        ``None`` and ``''`` yield ``''`` (no config). Non-``dict`` values
        are returned unchanged as strings. A ``dict`` is written as JSON to
        ``<outdir>/algorithm_configs/<algo basename>_<digest>.json`` and
        that path is returned. Identical requests reuse the same file.

        :param algo: Algorithm the config belongs to
        :param config_value: Config as a path/string or ``dict``
        :param config_key: Label for the config, e.g. ``'default'`` or
                           the RO-Crate basename
        :return: Value to pass via ``--config``, or ``''``
        :rtype: str
        """
        if config_value is None or config_value == '':
            return ''
        if not isinstance(config_value, dict):
            return str(config_value)
        content = json.dumps(config_value, indent=2, sort_keys=True)
        cache_key = (str(algo), str(config_key), content)
        cached = self._config_path_cache.get(cache_key)
        if cached is not None:
            return cached
        config_dir = os.path.join(self._outdir, 'algorithm_configs')
        os.makedirs(config_dir, exist_ok=True)
        safe_algo = re.sub(r'[^A-Za-z0-9._-]+', '_',
                           os.path.basename(str(algo)))
        # Hash the full algorithm string, the key AND the content. Hashing
        # only config_key made algorithms sharing a basename, or RO-Crates
        # sharing a basename with different overrides, overwrite each
        # other's config file.
        digest = hashlib.sha256(
            '\0'.join(cache_key).encode('utf-8')).hexdigest()[:12]
        config_path = os.path.join(config_dir, f'{safe_algo}_{digest}.json')
        with open(config_path, 'w') as cfg:
            cfg.write(content)
        self._config_path_cache[cache_key] = config_path
        return config_path

    def _resolve_algorithm_config(self, algo=None, rocrate_path=None):
        """
        Resolve algorithm config for a specific RO-Crate with fallback to
        the algorithm's default config.

        An override in ``algorithm_rocrate_configs[algo]`` is looked up by
        the RO-Crate's absolute path, then its real path, then its
        basename; the first key present wins. A ``None`` override falls
        back to the default.

        :param algo: Algorithm to resolve the config for
        :param rocrate_path: Path to the RO-Crate; a falsy value selects
                             the default config
        :return: Result of :py:meth:`_materialize_algorithm_config`
        :rtype: str
        """
        default_config = self._algorithm_configs.get(algo, '')
        per_rocrate = self._algorithm_rocrate_configs.get(algo, {})
        override_value = None
        if isinstance(per_rocrate, dict) and rocrate_path:
            abs_path = os.path.abspath(rocrate_path)
            real_path = os.path.realpath(abs_path)
            base_name = os.path.basename(abs_path)
            for key in (abs_path, real_path, base_name):
                if key in per_rocrate:
                    override_value = per_rocrate.get(key)
                    break
        if override_value is None:
            config_value = default_config
            config_key = 'default'
        else:
            config_value = override_value
            config_key = os.path.basename(rocrate_path) if rocrate_path else 'override'
        return self._materialize_algorithm_config(algo=algo,
                                                  config_value=config_value,
                                                  config_key=config_key)
[docs]
class BashTrainRunner(DRETrainRunner):
    """
    Runs DREs via Bash script
    """

    def __init__(self, outdir=None, input_rocrates=None, algorithms=None,
                 algorithm_configs=None,
                 algorithm_rocrate_configs=None):
        """
        Constructor

        See :py:class:`DRETrainRunner` for parameter descriptions.
        """
        super().__init__(outdir=outdir, input_rocrates=input_rocrates,
                         algorithms=algorithms,
                         algorithm_configs=algorithm_configs,
                         algorithm_rocrate_configs=algorithm_rocrate_configs)

    def _write_algorithms(self, out=None):
        """
        Writes each algorithm, one per line, to ``out``.

        :param out: Writable text file handle
        """
        for algo in self._algorithms:
            out.write(str(algo) + '\n')

    def run(self):
        """
        Writes the Bash training job into the output directory.

        Files created in ``outdir``:

        * ``input_rocrates.txt`` - input RO-Crate paths, one per line
        * ``training_jobs.tsv`` - one ``algorithm<TAB>config<TAB>rocrate``
          line per (algorithm, RO-Crate) pair; ``config`` may be empty
        * ``bash_train_job.sh`` - executable script that runs every job
          sequentially, reports failures and shows a progress bar

        :raises AIxPORTError: If there are no input RO-Crates
        :return: 0
        :rtype: int
        """
        bashjobfile = os.path.join(self._outdir, 'bash_train_job.sh')
        input_rocratefile = os.path.join(self._outdir, 'input_rocrates.txt')
        jobsfile = os.path.join(self._outdir, 'training_jobs.tsv')
        with open(input_rocratefile, 'w') as f:
            self._write_input_ro_crates(f)
        with open(jobsfile, 'w') as f:
            self._write_training_jobs(f)
        with open(bashjobfile, 'w') as f:
            self._write_bash_script(f)
        os.chmod(bashjobfile, 0o755)
        return 0

    def _write_training_jobs(self, out):
        """
        Writes one ``algorithm<TAB>config<TAB>rocrate`` line per job to
        ``out``. ``config`` is empty when the algorithm has no config.

        :param out: Writable text file handle
        """
        for algo in self._algorithms:
            for rocrate in self._input_rocrates:
                config_path = self._resolve_algorithm_config(algo=algo,
                                                             rocrate_path=rocrate)
                out.write(str(algo) + '\t' + str(config_path) + '\t' +
                          str(rocrate) + '\n')

    def _write_bash_script(self, out):
        """
        Writes the Bash script that runs every job listed in
        ``training_jobs.tsv`` (resolved relative to the script's directory).

        :param out: Writable text file handle
        """
        total_jobs = len(self._algorithms) * len(self._input_rocrates)
        # NOTE(review): this runner passes --input_crate while
        # SLURMTrainRunner passes --input_rocrate; confirm which flag the
        # algorithms actually accept.
        lines = [
            '#! /bin/bash',
            '',
            'progress_bar() {',
            '  local current=$1',
            '  local total=$2',
            '  local label="$3"',
            '  local width=30',
            '  local filled=$((current * width / total))',
            '  local empty=$((width - filled))',
            '  local bar=""',
            '  local space=""',
            '  for ((i=0; i<filled; i++)); do bar+="#"; done',
            '  for ((i=0; i<empty; i++)); do space+="-"; done',
            '  printf "\\r%s [%s%s] %d/%d" "$label" "$bar" "$space" "$current" "$total"',
            '}',
            '',
            # quoted so a script directory containing spaces still works
            'BASEDIR="$(dirname "$0")"',
            'pushd "$BASEDIR" || exit 1',
            'OUTDIR="' + str(aixport.constants.TRAINED_MODELS_DIRECTORY) + '"',
            '',
            'echo "Training ' + str(total_jobs) + ' jobs (' +
            str(len(self._algorithms)) + ' algorithms x ' +
            str(len(self._input_rocrates)) + ' datasets)"',
            'COUNT=0',
            "TAB=$'\\t'",
            # Fields are split with parameter expansion because
            # "IFS=$'\t' read" treats tab as IFS whitespace and merges
            # consecutive tabs: an empty CONFIG column would shift the
            # RO-Crate path into CONFIG. Jobs are read on fd 3 so an
            # algorithm reading stdin cannot consume the remaining jobs.
            'while IFS= read -r LINE <&3 ; do',
            '  ALGO=${LINE%%${TAB}*}',
            '  REST=${LINE#*${TAB}}',
            '  CONFIG=${REST%%${TAB}*}',
            '  TRAIN_ROCRATE=${REST#*${TAB}}',
            '  [ -z "$ALGO" ] && continue',
            '  TRAIN_ROCRATE_NAME=$(basename "$TRAIN_ROCRATE")',
            '  ALGO_BASE=$(basename "$ALGO")',
            '  ALGO_NOSUFFIX="${ALGO_BASE%.*}"',
            '  MODEL_DIR="${OUTDIR}/${TRAIN_ROCRATE_NAME}_${ALGO_NOSUFFIX}"',
            '  if [ -n "$CONFIG" ]; then',
            '    "$ALGO" "$MODEL_DIR" --input_crate "$TRAIN_ROCRATE" --mode train --config "$CONFIG"',
            '  else',
            '    "$ALGO" "$MODEL_DIR" --input_crate "$TRAIN_ROCRATE" --mode train',
            '  fi',
            '  STATUS=$?',
            '  if [ $STATUS -ne 0 ]; then',
            '    echo "FAILED ($STATUS): $ALGO $TRAIN_ROCRATE"',
            '  fi',
            '  COUNT=$((COUNT + 1))',
            '  progress_bar "$COUNT" "' + str(total_jobs) + '" "train jobs"',
            'done 3< training_jobs.tsv',
            'echo ""',
            'popd',
        ]
        out.write('\n'.join(lines) + '\n')
[docs]
class SLURMTrainRunner(DRETrainRunner):
    """
    Runs DREs via SLURM

    :py:meth:`run` writes one SLURM array job script per algorithm (one
    array task per input RO-Crate) plus ``slurm_train_job.sh``, which
    submits all of them with ``sbatch``.
    """

    def __init__(self, outdir=None, input_rocrates=None, algorithms=None,
                 algorithm_configs=None,
                 algorithm_rocrate_configs=None,
                 slurm_partition=None,
                 slurm_account=None):
        """
        Constructor

        See :py:class:`DRETrainRunner` for the common parameters.

        :param slurm_partition: Value for ``#SBATCH --partition``; omitted
                                when ``None``
        :type slurm_partition: str
        :param slurm_account: Value for ``#SBATCH --account``; omitted
                              when ``None``
        :type slurm_account: str
        """
        super().__init__(outdir=outdir, input_rocrates=input_rocrates,
                         algorithms=algorithms,
                         algorithm_configs=algorithm_configs,
                         algorithm_rocrate_configs=algorithm_rocrate_configs)
        self._slurm_partition = slurm_partition
        self._slurm_account = slurm_account

    @staticmethod
    def _algorithm_name(algorithm):
        """
        Returns the algorithm's basename without its suffix.

        This matches ``ALGO_NOSUFFIX`` in the Bash runner, so both runners
        name model directories the same way. It also keeps full algorithm
        paths from escaping the output directory in ``os.path.join``.
        """
        return os.path.splitext(os.path.basename(str(algorithm)))[0]

    def _write_slurm_directives(self, out=None, allocated_time='4:00:00',
                                mem='32G', cpus_per_task='4',
                                job_name='Unknown',
                                input_rocratefile=None,
                                input_configfile=None):
        """
        Writes SLURM job directives and the array task preamble to a bash
        script file.

        The preamble sets ``INPUT_ROCRATE`` to line
        ``$SLURM_ARRAY_TASK_ID`` of ``input_rocratefile``,
        ``OUTPUT_ROCRATENAME`` to its basename, and ``CONFIG`` to the same
        line of ``input_configfile`` (or ``""`` without a config file).

        :param out: File handle to write the SLURM directives.
        :param allocated_time: String specifying the maximum time allowed for the job.
        :param mem: String specifying the memory allocated for the job.
        :param cpus_per_task: String specifying the number of CPUs per task.
        :param job_name: String specifying the name of the SLURM job.
        :param input_rocratefile: Path to file listing one RO-Crate per line.
        :param input_configfile: Path to file listing one config per line,
                                 aligned with ``input_rocratefile``, or ``None``.
        """
        # The job runs with --chdir=outdir, so every path written into the
        # script is made absolute (sbatch resolves a relative --chdir
        # against the submission directory).
        rocrate_list = os.path.abspath(input_rocratefile)
        out.write('#!/bin/bash\n\n')
        out.write('#SBATCH --job-name=' + str(job_name) + '\n')
        out.write('#SBATCH --chdir=' + os.path.abspath(self._outdir) + '\n')
        out.write('#SBATCH --output=%x.%j.out\n')
        if self._slurm_partition is not None:
            out.write('#SBATCH --partition=' + str(self._slurm_partition) + '\n')
        if self._slurm_account is not None:
            out.write('#SBATCH --account=' + str(self._slurm_account) + '\n')
        out.write('#SBATCH --ntasks=1\n')
        out.write('#SBATCH --cpus-per-task=' + str(cpus_per_task) + '\n')
        out.write('#SBATCH --mem=' + str(mem) + '\n')
        out.write('#SBATCH --time=' + str(allocated_time) + '\n')
        # Exactly one task per RO-Crate: task i reads line i (1-based). The
        # former 1-(N+1) range added a task that re-trained the last crate.
        out.write('#SBATCH --array=1-' + str(len(self._input_rocrates)) + '\n\n')
        out.write('echo "Job.Array: ${SLURM_JOB_ID}.${SLURM_ARRAY_TASK_ID}"\n')
        out.write('echo $HOSTNAME\n\n')
        out.write('INPUT_ROCRATE=$(head -n "$SLURM_ARRAY_TASK_ID" "' +
                  rocrate_list + '" | tail -n 1)\n')
        out.write('OUTPUT_ROCRATENAME=$(basename "$INPUT_ROCRATE")\n')
        if input_configfile is not None:
            out.write('CONFIG=$(head -n "$SLURM_ARRAY_TASK_ID" "' +
                      os.path.abspath(input_configfile) + '" | tail -n 1)\n')
        else:
            out.write('CONFIG=""\n')

    def _generate_algorithm_command(self, algorithm=None,
                                    input_rocratefile=None,
                                    input_configfile=None):
        """
        Writes the SLURM array job script for ``algorithm`` into outdir.

        :param algorithm: Algorithm command/path to run
        :param input_rocratefile: Path to file listing one RO-Crate per line
        :param input_configfile: Path to per-RO-Crate config file or ``None``
        :return: Basename of the written job script
        :rtype: str
        """
        algo_name = self._algorithm_name(algorithm)
        job_script = os.path.join(self._outdir, algo_name + '.sh')
        model_dir = (str(aixport.constants.TRAINED_MODELS_DIRECTORY) +
                     '/${OUTPUT_ROCRATENAME}_' + algo_name)
        # NOTE(review): BashTrainRunner passes --input_crate here; confirm
        # which flag the algorithms actually accept.
        train_cmd = ('"' + str(algorithm) + '" "' + model_dir +
                     '" --input_rocrate "$INPUT_ROCRATE" --mode train')
        with open(job_script, 'w') as f:
            self._write_slurm_directives(out=f,
                                         job_name=algo_name + '_train',
                                         input_rocratefile=input_rocratefile,
                                         input_configfile=input_configfile)
            if input_configfile is not None:
                f.write('if [ -n "$CONFIG" ]; then\n')
                f.write('  ' + train_cmd + ' --config "$CONFIG"\n')
                f.write('else\n')
                f.write('  ' + train_cmd + '\n')
                f.write('fi\n')
            else:
                f.write(train_cmd + '\n')
            f.write('exit $?\n')
        os.chmod(job_script, 0o755)
        return os.path.basename(job_script)

    def _generate_final_command(self):
        """
        Placeholder for a final clean up job; currently returns ``''``.
        """
        return ''

    def run(self):
        """
        Writes the SLURM training jobs into the output directory.

        Files created in ``outdir``:

        * ``input_rocrates.txt`` - input RO-Crate paths, one per line
        * ``config_files.txt`` - default config per algorithm (not read by
          the generated job scripts)
        * ``config_files_<index>.txt`` - per algorithm, one resolved config
          per RO-Crate, aligned with ``input_rocrates.txt``
        * ``<algorithm>.sh`` - SLURM array job script per algorithm
        * ``slurm_train_job.sh`` - submits every algorithm job via sbatch

        :raises AIxPORTError: If there are no input RO-Crates
        :return: 0
        :rtype: int
        """
        slurmjobfile = os.path.join(self._outdir, 'slurm_train_job.sh')
        input_rocratefile = os.path.join(self._outdir, 'input_rocrates.txt')
        with open(input_rocratefile, 'w') as f:
            self._write_input_ro_crates(f)
        algoconfigfile = os.path.join(self._outdir, 'config_files.txt')
        with open(algoconfigfile, 'w') as f:
            self._write_algorithm_configs(out=f)
        with open(slurmjobfile, 'w') as f:
            f.write('#! /bin/bash\n\n')
            # job scripts live next to this script and are submitted by
            # basename, so run from this script's directory
            f.write('cd "$(dirname "$0")" || exit 1\n\n')
            for algo_index, algo in enumerate(self._algorithms):
                f.write('# ' + str(algo) + ' no dependencies\n')
                config_file = os.path.join(self._outdir,
                                           f'config_files_{algo_index}.txt')
                with open(config_file, 'w') as cfg_out:
                    for rocrate in self._input_rocrates:
                        cfg_out.write(self._resolve_algorithm_config(
                            algo=algo, rocrate_path=rocrate) + '\n')
                job_script = self._generate_algorithm_command(
                    algorithm=algo,
                    input_rocratefile=input_rocratefile,
                    input_configfile=config_file)
                f.write('job' + str(algo_index) + '=$(sbatch "' + job_script +
                        '" | awk \'{print $4}\')\n\n')
            # TODO: submit a final clean up job (see
            # _generate_final_command) depending on the jobN ids above.
        os.chmod(slurmjobfile, 0o755)
        return 0