Source code for spikewrap.utils._slurm
from __future__ import annotations
import datetime
import subprocess
from pathlib import Path
from typing import Callable
import submitit
from spikewrap.configs.hpc import (
default_slurm_options,
)
from spikewrap.utils import _utils
from spikewrap.utils._checks import _system_call_success
[docs]
def run_in_slurm(
    slurm_opts: None | dict,
    func_to_run: Callable,
    log_base_path: Path,
    suffix_name: str | None = None,
):
    """
    Submit a function for execution as a SLURM job.

    This is the public entry point. It forwards to
    `_run_in_slurm_core` with an empty keyword-argument dictionary,
    so `func_to_run` is invoked without arguments inside the job.

    Parameters
    ----------
    slurm_opts
        A dictionary of options that control execution of the slurm
        job. See spikewrap.default_slurm_options() for details.
        When this is not a dict (e.g. `None`), the defaults are
        used unchanged.
    func_to_run
        The callable to execute inside the slurm job.
    log_base_path
        The folder in which the 'slurm_logs' output folder is created.
    suffix_name
        If given (default `None`), a suffix appended to the name of
        the job's log folder e.g. _sort
    """
    _run_in_slurm_core(
        slurm_opts=slurm_opts,
        func_to_run=func_to_run,
        func_opts={},
        log_base_path=log_base_path,
        suffix_name=suffix_name,
    )
# Private Functions
# -----------------------------------------------------------------------------
def _run_in_slurm_core(
    slurm_opts: None | bool | dict,
    func_to_run: Callable,
    func_opts: dict,
    log_base_path: Path,
    suffix_name: str | None = None,
):
    """
    Run a function in SLURM using submitit.

    This is very similar to `run_in_slurm` but additionally accepts
    `func_opts`, the keyword arguments handed to `func_to_run` at
    runtime. This is not exposed publicly to avoid confusion.

    Parameters
    ----------
    slurm_opts
        If a dict, its entries override the defaults returned by
        `default_slurm_options()`. Any other value (e.g. `True` or
        `None`) means the defaults are used unchanged.
    func_to_run
        The function to run in a SLURM job.
    func_opts
        A dictionary of kwargs to run in `func_to_run`.
    log_base_path
        The folder in which the 'slurm_logs' output folder is created.
    suffix_name
        If given (default `None`), a suffix for the output folder e.g. _sort

    Returns
    -------
    job
        The job object returned by the submitit executor's `submit`.

    Raises
    ------
    RuntimeError
        If SLURM is not detected on this system.
    """
    if not _is_slurm_installed():
        raise RuntimeError("Cannot run with slurm, slurm is not found on this system.")

    # Work on a copy: the `pop` calls below would otherwise strip keys
    # from whatever dict `default_slurm_options()` handed back.
    used_slurm_opts = dict(default_slurm_options())
    if isinstance(slurm_opts, dict):
        used_slurm_opts.update(slurm_opts)

    # These two entries are consumed here; the remaining entries are
    # forwarded to the executor.
    should_wait = used_slurm_opts.pop("wait")
    env_name = used_slurm_opts.pop("env_name")

    log_path = _make_job_log_output_path(log_base_path, suffix_name)
    executor = _get_executor(log_path, used_slurm_opts)

    job = executor.submit(
        _wrap_function_with_env_setup, func_to_run, env_name, func_opts
    )

    # Callables such as `functools.partial` objects have no `__name__`;
    # fall back to their repr so that an already-submitted job does not
    # end in an AttributeError on the submitting side.
    func_name = getattr(func_to_run, "__name__", repr(func_to_run))

    # Report the job id and log location before (optionally) blocking, so
    # the information is visible while waiting rather than only afterwards.
    _send_user_start_message(func_name, log_path, job, func_opts)

    if should_wait:
        job.wait()

    return job
# Utils --------------------------------------------------------------------------------
def _get_executor(log_path: Path, slurm_opts: dict) -> submitit.AutoExecutor:
    """
    Build the submitit executor used to submit the SLURM job.

    The log location is announced on stdout, an `AutoExecutor` is
    created with `log_path` as its folder, and every entry of
    `slurm_opts` is passed to its `update_parameters` method.

    Parameters
    ----------
    log_path
        Path to log the SLURM output to.
    slurm_opts
        The slurm options, forwarded as keyword arguments to
        `update_parameters`.

    Returns
    -------
    executor
        The configured submitit executor object.
    """
    print(f"\nThe SLURM batch output logs will be saved to {log_path}\n")

    slurm_executor = submitit.AutoExecutor(folder=log_path)
    slurm_executor.update_parameters(**slurm_opts)

    return slurm_executor
def _wrap_function_with_env_setup(
    function: Callable, env_name: str, func_opts: dict
) -> None:
    """
    Run the environment setup commands from within the SLURM job,
    then call the processing function.

    The setup loads the miniconda, matlab and cuda modules and
    activates the conda environment `env_name` in a bash shell.

    NOTE(review): the setup commands run in a child bash process
    started by `subprocess.run`. A child process cannot normally
    change the environment of its parent, so it should be confirmed
    that this setup has the intended effect on `function`.

    Parameters
    ----------
    function
        A function to run in the SLURM job. Any callable is accepted,
        including ones without a `__name__` (e.g. `functools.partial`).
    env_name
        The name of the conda environment to run the job in
    func_opts
        Keyword arguments that `function` is called with.
    """
    # `functools.partial` objects and callable instances have no
    # `__name__`; use their repr so the job does not fail before
    # the function has even been started.
    function_name = getattr(function, "__name__", repr(function))
    print(f"\nrunning {function_name} with SLURM....\n")

    # TODO: expose this, these are SWC defaults. matlab not always necessary.
    setup_command = (
        f"module load miniconda; module load matlab; "
        f"source activate {env_name}; module load cuda"
    )
    # The exit status is deliberately not checked: a failing setup
    # command does not stop `function` from being called.
    subprocess.run(
        setup_command,
        executable="/bin/bash",
        shell=True,
    )

    function(**func_opts)
def _make_job_log_output_path(log_base_path: Path, suffix_name: str | None) -> Path:
    """
    Create and return the log folder for the current SLURM job.

    The folder is placed inside a 'slurm_logs' folder under
    `log_base_path` and is named after the current machine datetime
    (formatted as YYYY-MM-DD_HH-MM-SS), followed by `suffix_name`
    when one is given. Missing parent folders are created, and an
    already-existing folder is reused.

    Parameters
    ----------
    log_base_path
        Path to the folder that will contain the folder 'slurm_logs'
    suffix_name
        If not `None`, a string appended directly to the datetime
        folder name.

    Returns
    -------
    log_path
        The path to the SLURM log output folder for the current job.
    """
    timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")
    job_folder = timestamp if suffix_name is None else timestamp + suffix_name

    relative_dir = Path("slurm_logs", job_folder)
    output_dir = log_base_path / relative_dir
    output_dir.mkdir(parents=True, exist_ok=True)

    return output_dir
def _send_user_start_message(
    processing_function: str, log_path: Path, job: submitit.Job, func_opts: dict
) -> None:
    """
    Convenience function to print important information
    regarding the SLURM job.

    The message is passed to `_utils.message_user` and contains the
    function name, the job id, the log folder and the keyword
    arguments the function is called with.

    Parameters
    ----------
    processing_function
        The name of the function being run (i.e. run_full_pipeline, run_sorting)
    log_path
        The path to the SLURM log output folder for the current job.
    job
        submitit.job object holding the SLURM job_id
    func_opts
        Keyword arguments passed to the function to run in SLURM.
    """
    _utils.message_user(
        "---------------------- SLURM job submitted ----------------------\n"
        f"The function {processing_function} submitted to SLURM with job id {job.job_id}\n"
        f"Output will be logged to: {log_path}\n"
        # A separator is needed here, otherwise the dict is glued
        # directly onto the word "arguments".
        f"Function called with arguments: {func_opts}"
    )
def _is_slurm_installed():
    """
    Check for SLURM by issuing the system call `sinfo -v`.

    Returns
    -------
    The value returned by `_system_call_success` for that call.
    """
    return _system_call_success("sinfo -v")
# TODO: reinstate slurm delete thing!