Unverified Commit 673822ea authored by Schwinn Saereesitthipitak's avatar Schwinn Saereesitthipitak Committed by GitHub
Browse files

fix(chrek): fix CI errors and replace SIGUSR2 with SIGKILL on checkpoint failure (#6478)

parent ea86df29
......@@ -20,7 +20,7 @@ Environment variables:
Signals handled in checkpoint mode:
- SIGUSR1: Checkpoint completed, exit process
- SIGCONT: Restore completed, wake model and continue
- SIGUSR2: Checkpoint/restore failed
- SIGKILL (from watcher on failure): Process is terminated immediately (unhandleable)
"""
import asyncio
......@@ -47,7 +47,6 @@ class CheckpointConfig:
self.is_checkpoint_job = bool(self.location)
self._checkpoint_done = asyncio.Event()
self._restore_done = asyncio.Event()
self._checkpoint_failed = asyncio.Event()
def checkpoint_exists(self) -> bool:
"""Check if a completed checkpoint already exists (idempotency).
......@@ -79,8 +78,8 @@ class CheckpointConfig:
await engine_client.sleep(level=sleep_level)
# Install signal handlers before writing the ready file so there is no
# window where the DaemonSet can send SIGUSR1/SIGUSR2/SIGCONT while the
# default signal disposition (terminate) is still in effect.
# window where the DaemonSet can send SIGUSR1/SIGCONT while the default
# signal disposition (terminate) is still in effect.
self._install_signal_handlers()
# Signal readiness
......@@ -88,7 +87,7 @@ class CheckpointConfig:
f.write("ready")
logger.info(
"Ready for checkpoint. Waiting for watcher signal "
"(SIGUSR1=checkpoint complete, SIGCONT=restore complete, SIGUSR2=failure)"
"(SIGUSR1=checkpoint complete, SIGCONT=restore complete)"
)
try:
......@@ -99,11 +98,9 @@ class CheckpointConfig:
await engine_client.wake_up()
return True
if event == "checkpoint":
logger.info("Checkpoint completion signal detected (SIGUSR1)")
return False
raise RuntimeError("Checkpoint failed (received SIGUSR2 from watcher)")
# SIGUSR1: checkpoint complete
logger.info("Checkpoint completion signal detected (SIGUSR1)")
return False
finally:
self._remove_signal_handlers()
# Remove the ready file so that a restarting pod does not leave a
......@@ -116,24 +113,22 @@ class CheckpointConfig:
def _install_signal_handlers(self) -> None:
loop = asyncio.get_running_loop()
loop.add_signal_handler(signal.SIGUSR1, self._checkpoint_done.set)
# SIGCONT is used as the restore-complete signal because SIGUSR1 and
# SIGUSR2 are already taken (checkpoint-complete and checkpoint-failed
# respectively). The chrek DaemonSet watcher is the only sender, so
# there is no conflict with POSIX job-control semantics in practice.
# SIGCONT is used as the restore-complete signal. The chrek DaemonSet
# watcher is the only sender, so there is no conflict with POSIX
# job-control semantics in practice.
loop.add_signal_handler(signal.SIGCONT, self._restore_done.set)
loop.add_signal_handler(signal.SIGUSR2, self._checkpoint_failed.set)
# No handler for checkpoint failure: the watcher sends SIGKILL, which
# terminates the process immediately (cannot be caught).
def _remove_signal_handlers(self) -> None:
loop = asyncio.get_running_loop()
loop.remove_signal_handler(signal.SIGUSR1)
loop.remove_signal_handler(signal.SIGCONT)
loop.remove_signal_handler(signal.SIGUSR2)
async def _wait_for_watcher_signal(self) -> str:
waiters = {
asyncio.create_task(self._checkpoint_done.wait()): "checkpoint",
asyncio.create_task(self._restore_done.wait()): "restore",
asyncio.create_task(self._checkpoint_failed.wait()): "failed",
}
try:
done, pending = await asyncio.wait(
......
......@@ -258,7 +258,7 @@ func (w *Watcher) handleRestorePodEvent(ctx context.Context, pod *corev1.Pod) {
// 1. Mark pod as in_progress
// 2. Resolve the container ID and host PID
// 3. Call orchestrate.Checkpoint (inspect → configure → CUDA lock/checkpoint → CRIU dump → rootfs diff)
// 4. SIGUSR1 the process on success (notify workload), SIGUSR2 on failure (wake it up)
// 4. SIGUSR1 the process on success (notify workload), SIGKILL on failure (terminate immediately)
// 5. Mark pod as completed or failed
func (w *Watcher) doCheckpoint(ctx context.Context, pod *corev1.Pod, checkpointHash, podKey string) {
defer w.release(podKey)
......@@ -315,8 +315,8 @@ func (w *Watcher) doCheckpoint(ctx context.Context, pod *corev1.Pod, checkpointH
if err := orchestrate.Checkpoint(ctx, w.containerd, log, req, w.config); err != nil {
log.Error(err, "Checkpoint failed")
emitPodEvent(ctx, w.clientset, log, pod, "chrek", corev1.EventTypeWarning, "CheckpointFailed", err.Error())
// SIGUSR2 on failure: tell the workload to wake up and continue
if signalErr := common.SendSignalToPID(log, containerPID, syscall.SIGUSR2, "checkpoint failed"); signalErr != nil {
// SIGKILL on failure: process is unrecoverable (CUDA locked), terminate immediately
if signalErr := common.SendSignalToPID(log, containerPID, syscall.SIGKILL, "checkpoint failed"); signalErr != nil {
log.Error(signalErr, "Failed to signal checkpoint failure to runtime process")
}
annotatePod(ctx, w.clientset, log, pod, map[string]string{kubeAnnotationCheckpointStatus: "failed"})
......
......@@ -161,9 +161,7 @@ The chart includes built-in validation to prevent all operator conflicts:
| dynamo-operator.webhook.certManager.certificate.rootCA.duration | string | `"87600h"` | Duration for the root CA certificate (e.g., "87600h" for 10 years). The root CA typically has a much longer lifetime than the leaf certificates it signs. |
| dynamo-operator.webhook.certManager.certificate.rootCA.renewBefore | string | `"720h"` | Time before root CA expiration to trigger renewal (e.g., "720h" for 30 days). Renewing a CA can be disruptive as all signed certificates must be reissued. |
| dynamo-operator.checkpoint.enabled | bool | `false` | Whether to enable checkpoint/restore functionality |
| dynamo-operator.checkpoint.initContainerImage | string | `"busybox:latest"` | Image used for init containers in checkpoint jobs (e.g., signal file cleanup) |
| dynamo-operator.checkpoint.readyForCheckpointFilePath | string | `"/tmp/ready-for-checkpoint"` | Path written by worker when model is loaded and ready for checkpointing |
| dynamo-operator.checkpoint.restoreMarkerFilePath | string | `"/tmp/dynamo-restored"` | Path written by restore-entrypoint after successful CRIU restore |
| dynamo-operator.checkpoint.storage.type | string | `"pvc"` | Storage backend type: pvc, s3, or oci |
| dynamo-operator.checkpoint.storage.pvc.pvcName | string | `"chrek-pvc"` | Name of the PVC created by the chrek chart |
| dynamo-operator.checkpoint.storage.pvc.basePath | string | `"/checkpoints"` | Base path within the PVC for storing checkpoints |
......
......@@ -327,9 +327,9 @@ Your application must implement the checkpoint flow. The DaemonSet communicates
- **`SIGUSR1`**: Checkpoint completed — your process should exit gracefully
- **`SIGCONT`**: Restore completed — your process should wake up and continue
- **`SIGUSR2`**: Checkpoint/restore failed
- **`SIGKILL`**: Checkpoint failed — process is terminated immediately (unhandleable)
Here's the pattern used by Dynamo vLLM (see `components/src/dynamo/vllm/chrek.py`):
Here's the pattern used by Dynamo vLLM (see `components/src/dynamo/vllm/checkpoint_restore.py`):
```python
import asyncio
......@@ -348,14 +348,13 @@ async def main():
# 1. Load your model/application
model = await load_model()
# 2. Optional: Put model to sleep for CRIU-friendly GPU state
# 2. Put model to sleep for CRIU-friendly GPU state
await model.sleep()
# 3. Write ready file — triggers DaemonSet checkpoint via readiness probe
with open(ready_file, "w") as f:
f.write("ready")
# 4. Set up signal handlers and wait for DaemonSet
# 3. Install signal handlers BEFORE writing the ready file to avoid a race
# where the DaemonSet sends a signal while default disposition (terminate)
# is still in effect. No handler needed for checkpoint failure — the
# watcher sends SIGKILL which terminates the process immediately.
checkpoint_done = asyncio.Event()
restore_done = asyncio.Event()
......@@ -363,9 +362,14 @@ async def main():
loop.add_signal_handler(signal.SIGUSR1, checkpoint_done.set)
loop.add_signal_handler(signal.SIGCONT, restore_done.set)
# 4. Write ready file — triggers DaemonSet checkpoint via readiness probe
with open(ready_file, "w") as f:
f.write("ready")
print("Ready for checkpoint. Waiting for watcher signal...")
# Wait for whichever signal comes first
# Wait for whichever signal comes first (SIGKILL on failure kills us
# immediately, so only success/restore signals reach this point)
done, pending = await asyncio.wait(
[asyncio.create_task(checkpoint_done.wait()),
asyncio.create_task(restore_done.wait())],
......@@ -390,11 +394,14 @@ async def main():
- Pod has `nvidia.com/chrek-is-checkpoint-source: "true"` label
- Pod status is `Ready` (readiness probe passes = ready file exists)
2. **Signal-based coordination**: The DaemonSet sends `SIGUSR1` after checkpoint completes and `SIGCONT` after restore completes. Your application must handle these signals (not poll for files).
2. **Signal handler ordering**: Install signal handlers **before** writing the ready file. Otherwise there is a race window where the DaemonSet sends a signal while the default disposition (terminate) is still in effect.
3. **Signal-based coordination**: The DaemonSet sends `SIGUSR1` after checkpoint completes, `SIGCONT` after restore completes, and `SIGKILL` if checkpoint fails. Your application must handle `SIGUSR1` and `SIGCONT` (not poll for files). `SIGKILL` cannot be caught — the kernel terminates the process immediately.
3. **Two exit paths**:
4. **Three exit paths**:
- **SIGUSR1 received**: Checkpoint complete, exit gracefully
- **SIGCONT received**: Process was restored, wake model and continue
- **SIGKILL received**: Checkpoint failed, process terminated immediately (no handler needed)
---
......@@ -490,7 +497,7 @@ The DaemonSet communicates checkpoint/restore completion via Unix signals, not f
|--------|-----------|---------|
| `SIGUSR1` | DaemonSet → checkpoint pod | Checkpoint completed, process should exit |
| `SIGCONT` | DaemonSet → restored pod | Restore completed, process should wake up |
| `SIGUSR2` | DaemonSet → checkpoint pod | Checkpoint failed (wake process to continue) |
| `SIGKILL` | DaemonSet → checkpoint pod | Checkpoint failed process terminated immediately |
CRIU tuning options are configured via the ChReK Helm chart's `config.checkpoint.criu` values, not environment variables. See the [Helm Chart Values](https://github.com/ai-dynamo/dynamo/tree/main/deploy/helm/charts/chrek/values.yaml) for available options.
......@@ -660,7 +667,7 @@ CRIU tuning options are configured via the ChReK Helm chart's `config.checkpoint
## Additional Resources
- [ChReK Helm Chart Values](https://github.com/ai-dynamo/dynamo/tree/main/deploy/helm/charts/chrek/values.yaml)
- [Dynamo vLLM ChReK Integration](https://github.com/ai-dynamo/dynamo/tree/main/components/src/dynamo/vllm/chrek.py) - Reference signal handler implementation
- [Dynamo vLLM ChReK Integration](https://github.com/ai-dynamo/dynamo/tree/main/components/src/dynamo/vllm/checkpoint_restore.py) - Reference signal handler implementation
- [ChReK Dockerfile](https://github.com/ai-dynamo/dynamo/tree/main/deploy/chrek/Dockerfile)
- [CRIU Documentation](https://criu.org/Main_Page)
- [CUDA Checkpoint Utility](https://github.com/NVIDIA/cuda-checkpoint)
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment