Changes between Version 1 and Version 2 of Workshops/JobCheckpointing/Examples


Ignore:
Timestamp:
01/22/26 11:34:55 (2 days ago)
Author:
Carl Baribault
Comment:

Filled in Python & R examples

Legend:

Unmodified
Added
Removed
Modified
  • Workshops/JobCheckpointing/Examples

    v1 v2  
    33(content subject to change prior to the workshop)
    44
     5== Python Example ==
     6
     7=== Checkpointed, self restarting job ===
     8
     9Here is a fully self-restarting job script; the accompanying minimal working example of a checkpointed Python application follows further below.
     10
     11Note that we're using the latest available Python module, '''anaconda3/2023.07''', in partition '''centos7'''.
     12
     13{{{
     14#!/bin/bash
     15#SBATCH --job-name=checkpoint_example
     16#SBATCH --partition=centos7
     17#SBATCH --qos=normal
     18#SBATCH --nodes=1
     19#SBATCH --ntasks=1
     20#SBATCH --cpus-per-task=4
     21#SBATCH --time=24:00:00
     22#SBATCH --mem=16G
     23
     24# --- Logging ---
     25#SBATCH --output=output.%j.out
     26#SBATCH --error=error.%j.err
     27#SBATCH --open-mode=append
     28
     29# --- Enable automatic requeue ---
     30#SBATCH --requeue
     31
     32# --- Send SIGTERM 2 minutes before walltime ---
     33#SBATCH --signal=TERM@120
     34
     35set -euo pipefail
     36
     37echo "Job started at $(date)"
     38echo "SLURM_JOB_ID = ${SLURM_JOB_ID}"
     39echo "SLURM_RESTART_COUNT = ${SLURM_RESTART_COUNT:-0}"
     40
     41# ---------------------------------------------
     42# Application-specific configuration
     43# ---------------------------------------------
     44CHECKPOINT_DIR="$PWD/checkpoints"
     45CHECKPOINT_FILE="${CHECKPOINT_DIR}/state.chk"
     46
     47mkdir -p "${CHECKPOINT_DIR}"
     48
     49# ---------------------------------------------
     50# Launch the application
     51# ---------------------------------------------
     52# Your application must:
     53#  1) Load checkpoint if it exists
     54#  2) Catch SIGTERM
     55#  3) Write checkpoint
     56#  4) exit(99)
     57
     58module load anaconda3/2023.07
     59
     60srun ./my_simulation.py \
     61    --checkpoint "${CHECKPOINT_FILE}"
     62
     63EXIT_CODE=$?
     64
     65echo "Application exited with code ${EXIT_CODE}"
     66
     67# ---------------------------------------------
     68# Restart logic
     69# ---------------------------------------------
     70if [[ ${EXIT_CODE} -eq 0 ]]; then
     71    echo "INFO: Job completed successfully"
     72    exit 0
     73
     74elif [[ ${EXIT_CODE} -eq 99 ]]; then
     75    echo "INFO: Checkpoint written, requeuing job"
     76    scontrol requeue "${SLURM_JOB_ID}"
     77    exit 0
     78
     79else
     80    echo "ERROR: Job failed with unexpected exit code"
     81    exit ${EXIT_CODE}
     82fi
     83}}}
     84
     85=== Checkpointed application in Python ===
     86
     87Here is an accompanying, minimal working example of a checkpointed application for Python in file '''my_simulation.py'''.
     88
     89{{{
     90#!/usr/bin/env python3
     91import signal
     92import sys
     93import time
     94import os
     95import json
     96
     97CHECKPOINT_FILE = "checkpoints/state.chk"
     98
     99def save_checkpoint(i):
     100    os.makedirs("checkpoints", exist_ok=True)
     101    with open(CHECKPOINT_FILE, "w") as f:
     102        json.dump({"step": i}, f)
     103
     104def load_checkpoint():
     105    if os.path.exists(CHECKPOINT_FILE):
     106        with open(CHECKPOINT_FILE, "r") as f:
     107            return json.load(f)["step"]
     108    return 0
     109
     110def term_handler(signum, frame):
     111    print("SIGTERM received — saving checkpoint")
     112    save_checkpoint(current_step)
     113    sys.exit(99)  # <- special "requeue me" code
     114
     115signal.signal(signal.SIGTERM, term_handler)
     116
     117current_step = load_checkpoint()
     118print(f"Resuming from step {current_step}")
     119
     120for i in range(current_step, 1_000_000):
     121    current_step = i
     122    time.sleep(1)  # simulate work
     123}}}
     124
     125== R Example ==
     126
     127=== Checkpointed, self restarting job ===
     128
     129Here is a fully self-restarting job script; the accompanying minimal working example of a checkpointed R application follows further below.
     130
     131{{{
     132#!/bin/bash
     133#SBATCH --job-name=r_checkpoint_demo
     134#SBATCH --partition=centos7
     135#SBATCH --time=24:00:00
     136#SBATCH --nodes=1
     137#SBATCH --ntasks=1
     138#SBATCH --cpus-per-task=2
     139#SBATCH --mem=4G
     140#SBATCH --output=output.%j.out
     141#SBATCH --error=error.%j.err
     142#SBATCH --open-mode=append
     143#SBATCH --requeue
     144#SBATCH --signal=TERM@120   # send SIGTERM 120s before walltime
     145
     146set -euo pipefail
     147
     148mkdir -p logs checkpoints
     149
     150# Trap SIGTERM from SLURM: create a file flag that R checks for
     151trap 'echo "SIGTERM received, creating TERM.flag"; touch TERM.flag' TERM
     152
     153echo "Starting R checkpointing run at $(date)"
     154
     155# load the R module
     156module load R/4.4.1
     157
     158# Run the R script under srun
     159srun Rscript checkpoint.R || rc=$? || rc=0
     160
     161# srun exit code
     162rc=${rc:-0}
     163echo "R exited with code: $rc"
     164
     165if [[ $rc -eq 0 ]]; then
     166  echo "INFO: Finished successfully."
     167  exit 0
     168elif [[ $rc -eq 99 ]]; then
     169  echo "INFO: Checkpoint written (exit 99). Requeuing job..."
     170  rm -f TERM.flag
     171  scontrol requeue "$SLURM_JOB_ID"
     172  exit 0
     173else
     174  echo "ERROR: Unexpected failure (code $rc)."
     175  exit $rc
     176fi
     177}}}
     178
     179=== Checkpointed application in R ===
     180
     181Here is an accompanying, minimal working example of a checkpointed application for R in file '''checkpoint.R'''.
     182
     183{{{
     184#!/usr/bin/env Rscript
     185
     186# Simple checkpointing/resume pattern for long runs in R.
     187# - Saves state as checkpoints/state.rds
     188# - Auto-resumes if that file exists
     189# - Periodically checkpoints every N iterations
     190# - If a TERM flag (created by SLURM trap) is detected, saves and exits(99)
     191
     192checkpoint_file <- "checkpoints/state.rds"
     193term_flag       <- "TERM.flag"        # created by the shell trap
     194dir.create("checkpoints", showWarnings = FALSE, recursive = TRUE)
     195
     196# --- Parameters you can tune ---
     197max_steps            <- 1e6L
     198checkpoint_every_n   <- 200L     # save every N iterations
     199sleep_seconds        <- 0.05     # simulate work
     200verbose              <- TRUE
     201
     202# --- Load or initialize state ---
     203state <- list(step = 0L, results = numeric())
     204if (file.exists(checkpoint_file)) {
     205  if (verbose) cat("Resuming from checkpoint:", checkpoint_file, "\n")
     206  state <- readRDS(checkpoint_file)
     207} else {
     208  if (verbose) cat("Starting fresh run\n")
     209}
     210
     211# --- Utility: save checkpoint ---
     212save_checkpoint <- function(st) {
     213  saveRDS(st, checkpoint_file)
     214  if (verbose) {
     215    cat(sprintf("Checkpoint saved at step %d -> %s\n", st$step, checkpoint_file))
     216  }
     217}
     218
     219# --- Main work loop ---
     220for (i in seq.int(state$step + 1L, max_steps)) {
     221  state$step <- i
     222
     223  # Simulate "work" (replace with your compute kernel)
     224  # e.g., update some running statistic
     225  x <- sin(i * 0.001) + rnorm(1, sd = 0.01)
     226  state$results <- c(state$results, x)
     227  if (sleep_seconds > 0) Sys.sleep(sleep_seconds)
     228
     229  # Periodic checkpoint
     230  if ((i %% checkpoint_every_n) == 0L) {
     231    save_checkpoint(state)
     232  }
     233
     234  # Respect pre-timeout signal from SLURM via a file flag
     235  if (file.exists(term_flag)) {
     236    cat("TERM flag detected. Saving final checkpoint and exiting with code 99.\n")
     237    save_checkpoint(state)
     238    quit(status = 99, save = "no")
     239  }
     240}
     241
     242# Finished normally
     243save_checkpoint(state)
     244cat("Completed all steps. Exiting with code 0.\n")
     245quit(status = 0, save = "no")
     246}}}