Changes between Version 3 and Version 4 of Workshops/JobCheckpointing/Examples


Ignore:
Timestamp:
03/14/2026 01:25:38 AM (2 days ago)
Author:
Carl Baribault
Comment:

Broke out BASH, Python, R examples in separate pages

Legend:

Unmodified
Added
Removed
Modified
  • Workshops/JobCheckpointing/Examples

    v3 v4  
    22= Job Checkpointing Examples =
    33
    4 == Python Example ==
    5 
    6 === Checkpointed, self restarting job ===
    7 
    8 Here is a fully self restarting job and, further below, the accompanying, minimal working example of a checkpointed application in Python.
    9 
    10 Note that we're using the latest available Python '''module anaconda3/2023.07''' in partition '''centos7'''.
     4== Checkpoint Runner ==
     5
     6Here is a job script for a fully self restarting job for use with either of the accompanying, minimal working examples of checkpointed applications in BASH, Python, or R.
     7
     8=== checkpoint_runner.sh ===
    119
    1210{{{
    1311#!/bin/bash
    14 #SBATCH --job-name=checkpoint_example
    15 #SBATCH --partition=centos7
    16 #SBATCH --qos=normal
    17 #SBATCH --nodes=1
     12###############################################################################
     13# CHECKPOINT RUNNER: checkpoint + auto-requeue (Slurm 14.x-safe)
     14###############################################################################
     15#   MODULE_LIST     : modules loaded before running app command
     16#   APP_CMD         : command to run (string)
     17#   LAUNCH_MODE     : run app command directly or via srun
     18#   SRUN_ARGS       : arguments to srun, if needed
     19#   TIME_LIMIT      : wall time (D-HH:MM:SS or HH:MM:SS) sync w/ #SBATCH --time
     20#   MARGIN_SEC      : seconds before wall time to checkpoint (checkpoint_timer)
     21#   CKPT_PATH       : path to checkpoint file
     22#   CHECKPOINT_EVERY: checkpoint after every this many application iterations
     23#   MAX_ITER        : stop application after this many iterations total
     24#   MAX_RESTARTS    : stop requeuing after this many restarts (safety)
     25###############################################################################
     26# User settings with defaults
     27###############################################################################
     28# Edit settings below
     29# Or pass values via...
     30#   sbatch --export=ALL,Var1=...,...,VarN=... checkpoint_runner.sh
     31# Or pass values via...
     32#   Var1=... VarN=... sbatch checkpoint_runner.sh
     33###############################################################################
     34MODULE_LIST="${MODULE_LIST:-anaconda3/2023.07}"
     35                                           # space-delimited module list
     36APP_CMD="${APP_CMD:-python3 checkpoint_signal_iter.py}"
     37LAUNCH_MODE="${LAUNCH_MODE:-direct}"       # direct | srun
     38SRUN_ARGS="${SRUN_ARGS:--n 1}"             # extra srun flags
     39TIME_LIMIT="${TIME_LIMIT:-00:03:00}"       # match #SBATCH --time below
     40MARGIN_SEC="${MARGIN_SEC:-60}"             # checkpoint time before time limit
     41CKPT_PATH="${CKPT_PATH:-state_iter.txt}"   # checkpoint file path
     42CHECKPOINT_EVERY="${CHECKPOINT_EVERY:-20}" # number of iter. before checkpoint
     43MAX_ITER="${MAX_ITER:-500}"                # number of iter. total
     44MAX_RESTARTS="${MAX_RESTARTS:-10}"         # max. number of restarts
     45
     46###############################################################################
     47# Slurm directives (keep TIME_LIMIT in sync with #SBATCH --time)
     48###############################################################################
     49#SBATCH --job-name=ckpt_requeue_demo
     50#SBATCH --output=log_%j.out
     51#SBATCH --error=log_%j.err
     52#SBATCH --open-mode=append
     53#SBATCH --time=00:03:00                 # keep in sync with TIME_LIMIT above
    1854#SBATCH --ntasks=1
    19 #SBATCH --cpus-per-task=4
    20 #SBATCH --time=24:00:00
    21 #SBATCH --mem=16G
    22 
    23 # --- Logging ---
    24 #SBATCH --output=output.%j.out
    25 #SBATCH --error=error.%j.err
    26 #SBATCH --open-mode=append
    27 
    28 # --- Enable automatic requeue ---
     55#SBATCH --cpus-per-task=1
     56#SBATCH --mem=512M
    2957#SBATCH --requeue
    30 
    31 # --- Send SIGTERM 2 minutes before walltime ---
    32 #SBATCH --signal=TERM@120
     58#SBATCH --get-user-env
     59#SBATCH --partition=centos7             # change partition to defq if needed
     60#SBATCH --qos=normal                    # best qos for node availability
    3361
    3462set -euo pipefail
    35 
    36 echo "Job started at $(date)"
    37 echo "SLURM_JOB_ID = ${SLURM_JOB_ID}"
    38 echo "SLURM_RESTART_COUNT = ${SLURM_RESTART_COUNT:-0}"
    39 
    40 # ---------------------------------------------
    41 # Application-specific configuration
    42 # ---------------------------------------------
    43 CHECKPOINT_DIR="$PWD/checkpoints"
    44 CHECKPOINT_FILE="${CHECKPOINT_DIR}/state.chk"
    45 
    46 mkdir -p "${CHECKPOINT_DIR}"
    47 
    48 # ---------------------------------------------
    49 # Launch the application
    50 # ---------------------------------------------
    51 # Your application must:
    52 #  1) Load checkpoint if it exists
    53 #  2) Catch SIGTERM
    54 #  3) Write checkpoint
    55 #  4) exit(99)
    56 
    57 module load anaconda3/2023.07
    58 
    59 srun ./my_simulation.py \
    60     --checkpoint "${CHECKPOINT_FILE}"
    61 
    62 EXIT_CODE=$?
    63 
    64 echo "Application exited with code ${EXIT_CODE}"
    65 
    66 # ---------------------------------------------
    67 # Restart logic
    68 # ---------------------------------------------
    69 if [[ ${EXIT_CODE} -eq 0 ]]; then
    70     echo "INFO: Job completed successfully"
    71     exit 0
    72 
    73 elif [[ ${EXIT_CODE} -eq 99 ]]; then
    74     echo "INFO: Checkpoint written, requeuing job"
    75     scontrol requeue "${SLURM_JOB_ID}"
    76     exit 0
    77 
     63shopt -s expand_aliases
     64alias dtstamp="date +%Y%m%d-%H:%M:%S"
     65info(){ echo "Info[$(dtstamp)]: $*"; }
     66
     67info "Start on $(hostname); JOB_ID=${SLURM_JOB_ID}; RESTARTS=${SLURM_RESTART_COUNT:-0}"
     68info "Settings:"
     69info "MODULE_LIST=${MODULE_LIST}"
     70info "APP_CMD=${APP_CMD}"
     71info "LAUNCH_MODE=${LAUNCH_MODE}"
     72info "SRUN_ARGS=${SRUN_ARGS}"
     73info "TIME_LIMIT=${TIME_LIMIT}"
     74info "MARGIN_SEC=${MARGIN_SEC}"
     75info "CKPT_PATH=${CKPT_PATH}"
     76info "CHECKPOINT_EVERY=${CHECKPOINT_EVERY}"
     77info "MAX_ITER=${MAX_ITER}"
     78info "MAX_RESTARTS=${MAX_RESTARTS}"
     79
     80# Load site modules (available on Cypress workers)
     81module load slurm 2>/dev/null || true   # makes scontrol visible on worker
     82module load "${MODULE_LIST}" || true
     83
     84# Tool paths
     85SCTRL="$(command -v scontrol || true)"
     86
     87# Short diagnostics
     88if [[ -n "${SCTRL}" ]]; then
     89  echo "=== BEGIN JOB SNAPSHOT (scontrol) ==="
     90  "${SCTRL}" show job "${SLURM_JOB_ID}" | \
     91    grep -E "JobId=|Partition=|QOS=|TimeLimit=|StartTime=|EndTime=|RunTime=|State=|Restarts="
     92  echo "=== END JOB SNAPSHOT (scontrol) ==="
    7893else
    79     echo "ERROR: Job failed with unexpected exit code"
    80     exit ${EXIT_CODE}
     94  info "WARNING: scontrol not found on this node; in-place requeue will not be attempted."
     95fi
     96
     97# Helpers
     98get_ckpt_iter() {
     99  # reads the first numeric token from CKPT_PATH; returns 0 on any error
     100  [[ -f "${CKPT_PATH}" ]] || { echo 0; return; }
     101  local v
     102  v=$(tr -cd '0-9' < "${CKPT_PATH}" 2>/dev/null)
     103  echo "${v:-0}"
     104}
     105
     106get_restarts() {
     107  if [[ -n "${SLURM_RESTART_COUNT:-}" ]]; then
     108    echo "${SLURM_RESTART_COUNT}"
     109  elif [[ -n "${SCTRL}" ]]; then
     110    "${SCTRL}" show job "${SLURM_JOB_ID}" | tr ' ' '\n' | awk -F= '/^Restarts=/{print $2; exit}'
     111  else
     112    echo "0"
     113  fi
     114}
     115
     116to_seconds() {
     117  local t="$1"
     118  if [[ "$t" == *-*:*:* ]]; then
     119    local d h m s; IFS='-:' read -r d h m s <<<"$t"
     120    echo $(( d*86400 + h*3600 + m*60 + s ))
     121  else
     122    local h m s; IFS=':' read -r h m s <<<"$t"
     123    h=${h:-0}; m=${m:-0}; s=${s:-0}
     124    echo $(( h*3600 + m*60 + s ))
     125  fi
     126}
     127
     128# Trap (batch shell)
     129signal_handler () {
     130  info "TERM/INT caught in batch shell"
     131  local rc_local=0
     132
     133  if [[ -n "${child_pid:-}" ]]; then
     134    # srun or bash are group leaders; forward to their process group
     135    local child_pgid
     136    child_pgid="$(ps -o pgid= -p "${child_pid}" 2>/dev/null | awk '{print $1}')"
     137    if [[ -n "${child_pgid}" ]]; then
     138      kill -TERM "-${child_pgid}" 2>/dev/null || true
     139    else
     140      kill -TERM "${child_pid}" 2>/dev/null || true
     141    fi
     142    wait "${child_pid}" || rc_local=$?
     143  fi
     144
     145  info "Program exit code (from trap): ${rc_local}"
     146  local restarts; restarts=$(get_restarts)
     147  if [[ ${rc_local} -eq 99 && ${requeued:-0} -eq 0 && ${restarts} -lt ${MAX_RESTARTS} && -n "${SCTRL}" ]]; then
     148    requeued=1
     149    info "Checkpoint written (trap path). Requeueing in-place (same JobID)..."
     150    "${SCTRL}" requeue "${SLURM_JOB_ID}" || true
     151    info "Requeued via scontrol."
     152  fi
     153  exit 0
     154}
     155trap 'signal_handler' TERM INT
     156
     157# Launch via timeout
     158TOTAL_SEC=$(to_seconds "${TIME_LIMIT}")
     159RUN_WINDOW_SEC=$(( TOTAL_SEC - MARGIN_SEC ))
     160if (( RUN_WINDOW_SEC <= 0 )); then
     161  info "WARNING: RUN_WINDOW_SEC <= 0; using 1s."
     162  RUN_WINDOW_SEC=1
     163fi
     164
     165before_iter=$(get_ckpt_iter)
     166
     167set +e
     168if [[ "${LAUNCH_MODE}" == "srun" ]]; then
     169  # timeout -> srun -> bash -lc "APP_CMD"
     170  # On expiry, timeout sends TERM to srun; srun forwards signals to its job step tasks.
     171  timeout "${RUN_WINDOW_SEC}s" srun ${SRUN_ARGS} bash -lc "${APP_CMD}"
     172else
     173  # direct mode: run in the batch shell
     174  timeout "${RUN_WINDOW_SEC}s" bash -lc "${APP_CMD}"
     175fi
     176rc=$?
     177set -e
     178
     179info "Program exit code (from timeout wrapper): ${rc}"
     180
     181# Interpret coreutils 8.4 returns:
     182#   0   -> completed
     183#   99  -> app exited 99 before timeout expiry (valid)
     184#   124 -> timeout expired (TERM sent), treat as checkpoint cycle and requeue
     185#          (optionally confirm CKPT grew)
     186#   else-> unexpected -> propagate
     187requeued=0
     188if [[ ${rc} -eq 0 ]]; then
     189  info "Completed."
     190  exit 0
     191elif [[ ${rc} -eq 99 ]]; then
     192  restarts=$(get_restarts)
     193  if (( restarts < MAX_RESTARTS )) && [[ -n "${SCTRL}" ]]; then
     194    requeued=1
     195    info "Checkpoint written. Requeueing in-place (same JobID)..."
     196    "${SCTRL}" requeue "${SLURM_JOB_ID}" || true
     197    info "Requeued via scontrol."
     198  else
     199    info "WARNING: cannot requeue (scontrol unavailable or MAX_RESTARTS reached)."
     200  fi
     201  exit 0
     202elif [[ ${rc} -eq 124 ]]; then
     203        after_iter=$(get_ckpt_iter)
     204        if (( after_iter > before_iter )); then
     205          restarts=$(get_restarts)
     206          if (( restarts < MAX_RESTARTS )) && [[ -n "${SCTRL}" ]]; then
     207            requeued=1
     208            info "Timeout TERM observed; checkpoint advanced (${before_iter}->${after_iter}). Requeueing..."
     209            "${SCTRL}" requeue "${SLURM_JOB_ID}" || true
     210            info "Requeued via scontrol."
     211            exit 0
     212          else
     213            info "WARNING: cannot requeue (scontrol unavailable or MAX_RESTARTS reached)."
     214            exit 0
     215          fi
     216        else
     217          info "Timeout TERM observed but checkpoint did not advance; marking as failure."
     218          exit 1
     219        fi
     220else
     221  info "Unexpected exit code: ${rc}"
     222  exit "${rc}"
    81223fi
    82224}}}
    83225
    84 === Checkpointed application in Python ===
    85 
    86 Here is an accompanying, minimal working example of a checkpointed application for Python in file '''my_simulation.py'''.
    87 
    88 {{{
    89 #!/usr/bin/env python3
    90 import signal
    91 import sys
    92 import time
    93 import os
    94 import json
    95 
    96 CHECKPOINT_FILE = "checkpoints/state.chk"
    97 
    98 def save_checkpoint(i):
    99     os.makedirs("checkpoints", exist_ok=True)
    100     with open(CHECKPOINT_FILE, "w") as f:
    101         json.dump({"step": i}, f)
    102 
    103 def load_checkpoint():
    104     if os.path.exists(CHECKPOINT_FILE):
    105         with open(CHECKPOINT_FILE, "r") as f:
    106             return json.load(f)["step"]
    107     return 0
    108 
    109 def term_handler(signum, frame):
    110     print("SIGTERM received — saving checkpoint")
    111     save_checkpoint(current_step)
    112     sys.exit(99)  # <- special "requeue me" code
    113 
    114 signal.signal(signal.SIGTERM, term_handler)
    115 
    116 current_step = load_checkpoint()
    117 print(f"Resuming from step {current_step}")
    118 
    119 for i in range(current_step, 1_000_000):
    120     current_step = i
    121     time.sleep(1)  # simulate work
    122 }}}
    123 
    124 == R Example ==
    125 
    126 === Checkpointed, self restarting job ===
    127 
    128 Here is a fully self restarting job and, further below, the accompanying, minimal working example of a checkpointed application in R.
    129 
    130 {{{
    131 #!/bin/bash
    132 #SBATCH --job-name=r_checkpoint_demo
    133 #SBATCH --partition=centos7
    134 #SBATCH --time=24:00:00
    135 #SBATCH --nodes=1
    136 #SBATCH --ntasks=1
    137 #SBATCH --cpus-per-task=2
    138 #SBATCH --mem=4G
    139 #SBATCH --output=output.%j.out
    140 #SBATCH --error=error.%j.err
    141 #SBATCH --open-mode=append
    142 #SBATCH --requeue
    143 #SBATCH --signal=TERM@120   # send SIGTERM 120s before walltime
    144 
    145 set -euo pipefail
    146 
    147 mkdir -p logs checkpoints
    148 
    149 # Trap SIGTERM from SLURM: create a file flag that R checks for
    150 trap 'echo "SIGTERM received, creating TERM.flag"; touch TERM.flag' TERM
    151 
    152 echo "Starting R checkpointing run at $(date)"
    153 
    154 # load the R module
    155 module load R/4.4.1
    156 
    157 # Run the R script under srun
    158 srun Rscript checkpoint.R || rc=$? || rc=0
    159 
    160 # srun exit code
    161 rc=${rc:-0}
    162 echo "R exited with code: $rc"
    163 
    164 if [[ $rc -eq 0 ]]; then
    165   echo "INFO: Finished successfully."
    166   exit 0
    167 elif [[ $rc -eq 99 ]]; then
    168   echo "INFO: Checkpoint written (exit 99). Requeuing job..."
    169   rm -f TERM.flag
    170   scontrol requeue "$SLURM_JOB_ID"
    171   exit 0
    172 else
    173   echo "ERROR: Unexpected failure (code $rc)."
    174   exit $rc
    175 fi
    176 }}}
    177 
    178 === Checkpointed application in Python ===
    179 
    180 Here is an accompanying, minimal working example of a checkpointed application for R in file '''checkpoint.R'''.
    181 
    182 {{{
    183 #!/usr/bin/env Rscript
    184 
    185 # Simple checkpointing/resume pattern for long runs in R.
    186 # - Saves state as checkpoints/state.rds
    187 # - Auto-resumes if that file exists
    188 # - Periodically checkpoints every N iterations
    189 # - If a TERM flag (created by SLURM trap) is detected, saves and exits(99)
    190 
    191 checkpoint_file <- "checkpoints/state.rds"
    192 term_flag       <- "TERM.flag"        # created by the shell trap
    193 dir.create("checkpoints", showWarnings = FALSE, recursive = TRUE)
    194 
    195 # --- Parameters you can tune ---
    196 max_steps            <- 1e6L
    197 checkpoint_every_n   <- 200L     # save every N iterations
    198 sleep_seconds        <- 0.05     # simulate work
    199 verbose              <- TRUE
    200 
    201 # --- Load or initialize state ---
    202 state <- list(step = 0L, results = numeric())
    203 if (file.exists(checkpoint_file)) {
    204   if (verbose) cat("Resuming from checkpoint:", checkpoint_file, "\n")
    205   state <- readRDS(checkpoint_file)
    206 } else {
    207   if (verbose) cat("Starting fresh run\n")
    208 }
    209 
    210 # --- Utility: save checkpoint ---
    211 save_checkpoint <- function(st) {
    212   saveRDS(st, checkpoint_file)
    213   if (verbose) {
    214     cat(sprintf("Checkpoint saved at step %d -> %s\n", st$step, checkpoint_file))
    215   }
    216 }
    217 
    218 # --- Main work loop ---
    219 for (i in seq.int(state$step + 1L, max_steps)) {
    220   state$step <- i
    221 
    222   # Simulate "work" (replace with your compute kernel)
    223   # e.g., update some running statistic
    224   x <- sin(i * 0.001) + rnorm(1, sd = 0.01)
    225   state$results <- c(state$results, x)
    226   if (sleep_seconds > 0) Sys.sleep(sleep_seconds)
    227 
    228   # Periodic checkpoint
    229   if ((i %% checkpoint_every_n) == 0L) {
    230     save_checkpoint(state)
    231   }
    232 
    233   # Respect pre-timeout signal from SLURM via a file flag
    234   if (file.exists(term_flag)) {
    235     cat("TERM flag detected. Saving final checkpoint and exiting with code 99.\n")
    236     save_checkpoint(state)
    237     quit(status = 99, save = "no")
    238   }
    239 }
    240 
    241 # Finished normally
    242 save_checkpoint(state)
    243 cat("Completed all steps. Exiting with code 0.\n")
    244 quit(status = 0, save = "no")
    245 }}}
     226== BASH Checkpointing Example ==
     227
     228See [wiki:Workshops/JobCheckpointing/Examples/BASH BASH Checkpointing Example]
     229
     230== Python Checkpointing Example ==
     231
     232See [wiki:Workshops/JobCheckpointing/Examples/Python Python Checkpointing Example]
     233
     234== R Checkpointing Example ==
     235
     236See [wiki:Workshops/JobCheckpointing/Examples/R R Checkpointing Example]
     237