Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
ba9a8a9f
Unverified
Commit
ba9a8a9f
authored
Mar 23, 2026
by
Schwinn Saereesitthipitak
Committed by
GitHub
Mar 23, 2026
Browse files
fix(snapshot): persist CUDA namespace PIDs in manifest (#7539)
parent
ed4d8068
Changes
9
Show whitespace changes
Inline
Side-by-side
Showing
9 changed files
with
399 additions
and
64 deletions
+399
-64
components/src/dynamo/common/utils/snapshot.py
components/src/dynamo/common/utils/snapshot.py
+9
-0
deploy/snapshot/Dockerfile
deploy/snapshot/Dockerfile
+6
-3
deploy/snapshot/pkg/common/process.go
deploy/snapshot/pkg/common/process.go
+182
-1
deploy/snapshot/pkg/common/process_test.go
deploy/snapshot/pkg/common/process_test.go
+119
-0
deploy/snapshot/pkg/cuda/cuda.go
deploy/snapshot/pkg/cuda/cuda.go
+5
-46
deploy/snapshot/pkg/cuda/shim.go
deploy/snapshot/pkg/cuda/shim.go
+23
-0
deploy/snapshot/pkg/executor/checkpoint.go
deploy/snapshot/pkg/executor/checkpoint.go
+22
-7
deploy/snapshot/pkg/executor/nsrestore.go
deploy/snapshot/pkg/executor/nsrestore.go
+31
-6
deploy/snapshot/pkg/types/inspect.go
deploy/snapshot/pkg/types/inspect.go
+2
-1
No files found.
components/src/dynamo/common/utils/snapshot.py
View file @
ba9a8a9f
...
...
@@ -167,6 +167,15 @@ def configure_checkpoint_transport_env() -> None:
)
os
.
environ
[
"NCCL_IB_DISABLE"
]
=
"1"
nccl_ras_enable
=
os
.
environ
.
get
(
"NCCL_RAS_ENABLE"
)
if
nccl_ras_enable
and
nccl_ras_enable
!=
"0"
:
logger
.
warning
(
"Overriding NCCL_RAS_ENABLE=%r with '0' for checkpoint mode "
"because NCCL RAS background state is not part of the checkpoint contract"
,
nccl_ras_enable
,
)
os
.
environ
[
"NCCL_RAS_ENABLE"
]
=
"0"
torch_nccl_monitoring
=
os
.
environ
.
get
(
"TORCH_NCCL_ENABLE_MONITORING"
)
if
torch_nccl_monitoring
and
torch_nccl_monitoring
!=
"0"
:
logger
.
warning
(
...
...
deploy/snapshot/Dockerfile
View file @
ba9a8a9f
...
...
@@ -17,7 +17,7 @@
ARG
DOCKER_PROXY
ARG
GO_VERSION=1.25
ARG
CRIU_REPO=https://github.com/dfeigin-nv/criu.git
ARG
CRIU_
VERSION=add-aio-and-parallel-memfd
ARG
CRIU_
COMMIT=777baaf27f6a76f743c9bf24b64886297dc0129b
ARG
AGENT_BASE_IMAGE=nvcr.io/nvidia/cuda-dl-base:25.11-cuda13.0-devel-ubuntu24.04
# For placeholder target only - this default allows agent builds to succeed,
...
...
@@ -76,7 +76,7 @@ RUN CGO_ENABLED=0 GOOS=${TARGETOS} GOARCH=${TARGETARCH} go build -ldflags="-w -s
FROM
ubuntu:24.04 AS criu-builder
ARG
CRIU_REPO
ARG
CRIU_
VERSION
ARG
CRIU_
COMMIT
RUN
apt-get update
&&
apt-get
install
-y
--no-install-recommends
\
git
\
...
...
@@ -99,8 +99,11 @@ RUN apt-get update && apt-get install -y --no-install-recommends \
uuid-dev
\
&&
rm
-rf
/var/lib/apt/lists/
*
RUN
git
clone
--depth
1
--branch
${
CRIU_VERSION
}
${
CRIU_REPO
}
/tmp/criu
\
RUN
git
init
/tmp/criu
\
&&
cd
/tmp/criu
\
&&
git remote add origin
${
CRIU_REPO
}
\
&&
git fetch
--depth
1 origin
${
CRIU_COMMIT
}
\
&&
git checkout FETCH_HEAD
\
&&
make
-j
$(
nproc
)
\
&&
make
DESTDIR
=
/criu-install install-criu install-lib install-cuda_plugin
...
...
deploy/snapshot/pkg/common/process.go
View file @
ba9a8a9f
...
...
@@ -3,6 +3,8 @@ package common
import
(
"fmt"
"os"
"path/filepath"
"sort"
"strconv"
"strings"
"syscall"
...
...
@@ -14,8 +16,187 @@ import (
// HostProcPath is the mount point for the host's /proc in DaemonSet pods.
const
HostProcPath
=
"/host/proc"
// ProcessDetails captures the parent link plus the observed, outermost, and innermost
// PID views for one proc entry. ObservedPID is relative to the proc root being read.
type
ProcessDetails
struct
{
ObservedPID
int
ParentPID
int
OutermostPID
int
InnermostPID
int
NamespacePIDs
[]
int
Cmdline
string
}
// ReadProcessDetails reads one proc entry from a proc root.
func
ReadProcessDetails
(
procRoot
string
,
pid
int
)
(
ProcessDetails
,
error
)
{
if
pid
<=
0
{
return
ProcessDetails
{},
fmt
.
Errorf
(
"invalid PID %d"
,
pid
)
}
statusPath
:=
filepath
.
Join
(
procRoot
,
strconv
.
Itoa
(
pid
),
"status"
)
statusBytes
,
err
:=
os
.
ReadFile
(
statusPath
)
if
err
!=
nil
{
return
ProcessDetails
{},
fmt
.
Errorf
(
"failed to read %s: %w"
,
statusPath
,
err
)
}
status
:=
string
(
statusBytes
)
parentPID
:=
0
parentPIDFound
:=
false
for
_
,
line
:=
range
strings
.
Split
(
status
,
"
\n
"
)
{
if
strings
.
HasPrefix
(
line
,
"PPid:"
)
{
value
:=
strings
.
TrimSpace
(
strings
.
TrimPrefix
(
line
,
"PPid:"
))
parsed
,
err
:=
strconv
.
Atoi
(
value
)
if
err
!=
nil
{
return
ProcessDetails
{},
fmt
.
Errorf
(
"failed to parse PPid value %q: %w"
,
value
,
err
)
}
parentPID
=
parsed
parentPIDFound
=
true
break
}
}
if
!
parentPIDFound
{
return
ProcessDetails
{},
fmt
.
Errorf
(
"missing PPid in process status"
)
}
var
nspids
[]
int
for
_
,
line
:=
range
strings
.
Split
(
status
,
"
\n
"
)
{
if
!
strings
.
HasPrefix
(
line
,
"NSpid:"
)
{
continue
}
fields
:=
strings
.
Fields
(
strings
.
TrimPrefix
(
line
,
"NSpid:"
))
if
len
(
fields
)
==
0
{
break
}
nspids
=
make
([]
int
,
0
,
len
(
fields
))
for
_
,
field
:=
range
fields
{
value
,
err
:=
strconv
.
Atoi
(
field
)
if
err
!=
nil
{
return
ProcessDetails
{},
fmt
.
Errorf
(
"failed to parse NSpid %q: %w"
,
field
,
err
)
}
nspids
=
append
(
nspids
,
value
)
}
break
}
if
len
(
nspids
)
==
0
{
return
ProcessDetails
{},
fmt
.
Errorf
(
"missing NSpid in process status"
)
}
cmdline
:=
""
if
data
,
err
:=
os
.
ReadFile
(
filepath
.
Join
(
procRoot
,
strconv
.
Itoa
(
pid
),
"cmdline"
));
err
==
nil
{
cmdline
=
strings
.
TrimSpace
(
strings
.
ReplaceAll
(
string
(
data
),
"
\x00
"
,
" "
))
}
if
cmdline
==
""
{
comm
,
err
:=
os
.
ReadFile
(
filepath
.
Join
(
procRoot
,
strconv
.
Itoa
(
pid
),
"comm"
))
if
err
==
nil
{
cmdline
=
strings
.
TrimSpace
(
string
(
comm
))
}
}
return
ProcessDetails
{
ObservedPID
:
pid
,
ParentPID
:
parentPID
,
OutermostPID
:
nspids
[
0
],
InnermostPID
:
nspids
[
len
(
nspids
)
-
1
],
NamespacePIDs
:
nspids
,
Cmdline
:
cmdline
,
},
nil
}
// ReadProcessTable snapshots every numeric proc entry under procRoot.
// Used by restore-side PID remap and diagnostics after CRIU restore.
func
ReadProcessTable
(
procRoot
string
)
([]
ProcessDetails
,
error
)
{
entries
,
err
:=
os
.
ReadDir
(
procRoot
)
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to read proc root %s: %w"
,
procRoot
,
err
)
}
processes
:=
make
([]
ProcessDetails
,
0
,
len
(
entries
))
for
_
,
entry
:=
range
entries
{
if
!
entry
.
IsDir
()
{
continue
}
pid
,
err
:=
strconv
.
Atoi
(
entry
.
Name
())
if
err
!=
nil
{
continue
}
process
,
err
:=
ReadProcessDetails
(
procRoot
,
pid
)
if
err
!=
nil
{
continue
}
processes
=
append
(
processes
,
process
)
}
// Keep restore diagnostics deterministic by ordering on the manifest-facing PID view first.
sort
.
Slice
(
processes
,
func
(
i
,
j
int
)
bool
{
leftPID
:=
processes
[
i
]
.
ObservedPID
if
processes
[
i
]
.
InnermostPID
>
0
{
leftPID
=
processes
[
i
]
.
InnermostPID
}
rightPID
:=
processes
[
j
]
.
ObservedPID
if
processes
[
j
]
.
InnermostPID
>
0
{
rightPID
=
processes
[
j
]
.
InnermostPID
}
if
leftPID
==
rightPID
{
return
processes
[
i
]
.
ObservedPID
<
processes
[
j
]
.
ObservedPID
}
return
leftPID
<
rightPID
})
return
processes
,
nil
}
// ResolveManifestPIDsToObservedPIDs is the restore-side remap from checkpoint-time
// innermost namespace PIDs onto the current observed PIDs in the restored subtree rooted at restoredPID.
func
ResolveManifestPIDsToObservedPIDs
(
processes
[]
ProcessDetails
,
restoredPID
int
,
manifestPIDs
[]
int
)
([]
int
,
error
)
{
processByObservedPID
:=
make
(
map
[
int
]
ProcessDetails
,
len
(
processes
))
childrenByParentPID
:=
make
(
map
[
int
][]
int
,
len
(
processes
))
for
_
,
process
:=
range
processes
{
processByObservedPID
[
process
.
ObservedPID
]
=
process
childrenByParentPID
[
process
.
ParentPID
]
=
append
(
childrenByParentPID
[
process
.
ParentPID
],
process
.
ObservedPID
)
}
if
_
,
ok
:=
processByObservedPID
[
restoredPID
];
!
ok
{
return
nil
,
fmt
.
Errorf
(
"restored root pid %d not found in process table"
,
restoredPID
)
}
innermostToObservedPID
:=
map
[
int
]
int
{}
queue
:=
[]
int
{
restoredPID
}
for
len
(
queue
)
>
0
{
pid
:=
queue
[
0
]
queue
=
queue
[
1
:
]
process
,
ok
:=
processByObservedPID
[
pid
]
if
!
ok
{
continue
}
if
len
(
process
.
NamespacePIDs
)
!=
2
{
return
nil
,
fmt
.
Errorf
(
"restored process %d has namespace depth %d, want 2"
,
pid
,
len
(
process
.
NamespacePIDs
))
}
if
existingPID
,
ok
:=
innermostToObservedPID
[
process
.
InnermostPID
];
ok
{
return
nil
,
fmt
.
Errorf
(
"multiple restored processes map to innermost pid %d: %d and %d"
,
process
.
InnermostPID
,
existingPID
,
process
.
ObservedPID
)
}
innermostToObservedPID
[
process
.
InnermostPID
]
=
process
.
ObservedPID
queue
=
append
(
queue
,
childrenByParentPID
[
pid
]
...
)
}
restorePIDs
:=
make
([]
int
,
0
,
len
(
manifestPIDs
))
for
_
,
manifestPID
:=
range
manifestPIDs
{
observedPID
,
ok
:=
innermostToObservedPID
[
manifestPID
]
if
!
ok
{
return
nil
,
fmt
.
Errorf
(
"manifest cuda pid %d not found under restored subtree rooted at %d"
,
manifestPID
,
restoredPID
)
}
restorePIDs
=
append
(
restorePIDs
,
observedPID
)
}
return
restorePIDs
,
nil
}
// ProcessTreePIDs walks the process tree rooted at rootPID and returns all PIDs.
// Used
by nsrestore for in-namespace CUDA PID discovery
.
// Used
during checkpoint to enumerate the source process tree before CUDA filtering
.
func
ProcessTreePIDs
(
rootPID
int
)
[]
int
{
if
rootPID
<=
0
{
return
nil
...
...
deploy/snapshot/pkg/common/process_test.go
View file @
ba9a8a9f
package
common
import
(
"os"
"path/filepath"
"strconv"
"testing"
)
...
...
@@ -65,3 +68,119 @@ func TestParseProcExitCode(t *testing.T) {
})
}
}
func
TestReadProcessDetails
(
t
*
testing
.
T
)
{
procRoot
:=
t
.
TempDir
()
pid
:=
1018
procDir
:=
filepath
.
Join
(
procRoot
,
"1018"
)
if
err
:=
os
.
MkdirAll
(
procDir
,
0755
);
err
!=
nil
{
t
.
Fatalf
(
"MkdirAll(%q): %v"
,
procDir
,
err
)
}
if
err
:=
os
.
WriteFile
(
filepath
.
Join
(
procDir
,
"status"
),
[]
byte
(
"Name:
\t
python3
\n
PPid:
\t
0
\n
NSpid:
\t
2402711 1018
\n
"
),
0644
);
err
!=
nil
{
t
.
Fatalf
(
"WriteFile(status): %v"
,
err
)
}
if
err
:=
os
.
WriteFile
(
filepath
.
Join
(
procDir
,
"cmdline"
),
[]
byte
(
"python3
\x00
-m
\x00
dynamo.vllm
\x00
"
),
0644
);
err
!=
nil
{
t
.
Fatalf
(
"WriteFile(cmdline): %v"
,
err
)
}
details
,
err
:=
ReadProcessDetails
(
procRoot
,
pid
)
if
err
!=
nil
{
t
.
Fatalf
(
"ReadProcessDetails(%q, %d): %v"
,
procRoot
,
pid
,
err
)
}
if
details
.
ObservedPID
!=
1018
{
t
.
Fatalf
(
"ObservedPID = %d, want 1018"
,
details
.
ObservedPID
)
}
if
details
.
ParentPID
!=
0
{
t
.
Fatalf
(
"ParentPID = %d, want 0"
,
details
.
ParentPID
)
}
if
details
.
OutermostPID
!=
2402711
{
t
.
Fatalf
(
"OutermostPID = %d, want 2402711"
,
details
.
OutermostPID
)
}
if
details
.
InnermostPID
!=
1018
{
t
.
Fatalf
(
"InnermostPID = %d, want 1018"
,
details
.
InnermostPID
)
}
if
len
(
details
.
NamespacePIDs
)
!=
2
||
details
.
NamespacePIDs
[
0
]
!=
2402711
||
details
.
NamespacePIDs
[
1
]
!=
1018
{
t
.
Fatalf
(
"NamespacePIDs = %v, want [2402711 1018]"
,
details
.
NamespacePIDs
)
}
if
details
.
Cmdline
!=
"python3 -m dynamo.vllm"
{
t
.
Fatalf
(
"Cmdline = %q, want %q"
,
details
.
Cmdline
,
"python3 -m dynamo.vllm"
)
}
}
func
TestReadProcessTable
(
t
*
testing
.
T
)
{
procRoot
:=
t
.
TempDir
()
writeEntry
:=
func
(
pid
int
,
status
string
,
cmdline
string
)
{
t
.
Helper
()
procDir
:=
filepath
.
Join
(
procRoot
,
strconv
.
Itoa
(
pid
))
if
err
:=
os
.
MkdirAll
(
procDir
,
0755
);
err
!=
nil
{
t
.
Fatalf
(
"MkdirAll(%q): %v"
,
procDir
,
err
)
}
if
err
:=
os
.
WriteFile
(
filepath
.
Join
(
procDir
,
"status"
),
[]
byte
(
status
),
0644
);
err
!=
nil
{
t
.
Fatalf
(
"WriteFile(status): %v"
,
err
)
}
if
err
:=
os
.
WriteFile
(
filepath
.
Join
(
procDir
,
"cmdline"
),
[]
byte
(
cmdline
),
0644
);
err
!=
nil
{
t
.
Fatalf
(
"WriteFile(cmdline): %v"
,
err
)
}
}
writeEntry
(
768
,
"Name:
\t
worker
\n
PPid:
\t
1
\n
NSpid:
\t
2444000 768
\n
"
,
"VLLM::Worker_TP0
\x00
"
)
writeEntry
(
1
,
"Name:
\t
python3
\n
PPid:
\t
0
\n
NSpid:
\t
2443990 1
\n
"
,
"python3
\x00
-m
\x00
dynamo.vllm
\x00
"
)
processes
,
err
:=
ReadProcessTable
(
procRoot
)
if
err
!=
nil
{
t
.
Fatalf
(
"ReadProcessTable(%q): %v"
,
procRoot
,
err
)
}
if
len
(
processes
)
!=
2
{
t
.
Fatalf
(
"len(ReadProcessTable(%q)) = %d, want 2"
,
procRoot
,
len
(
processes
))
}
if
processes
[
0
]
.
InnermostPID
!=
1
||
processes
[
1
]
.
InnermostPID
!=
768
{
t
.
Fatalf
(
"process order innermost PIDs = [%d %d], want [1 768]"
,
processes
[
0
]
.
InnermostPID
,
processes
[
1
]
.
InnermostPID
)
}
}
func
TestResolveManifestPIDsToObservedPIDs
(
t
*
testing
.
T
)
{
processes
:=
[]
ProcessDetails
{
{
ObservedPID
:
1
,
ParentPID
:
0
,
OutermostPID
:
1
,
InnermostPID
:
1
,
NamespacePIDs
:
[]
int
{
1
},
Cmdline
:
"sleep infinity"
},
{
ObservedPID
:
50
,
ParentPID
:
0
,
OutermostPID
:
50
,
InnermostPID
:
50
,
NamespacePIDs
:
[]
int
{
50
},
Cmdline
:
"nsrestore"
},
{
ObservedPID
:
74
,
ParentPID
:
50
,
OutermostPID
:
74
,
InnermostPID
:
1
,
NamespacePIDs
:
[]
int
{
74
,
1
},
Cmdline
:
"python3 -m dynamo.vllm"
},
{
ObservedPID
:
80
,
ParentPID
:
74
,
OutermostPID
:
80
,
InnermostPID
:
750
,
NamespacePIDs
:
[]
int
{
80
,
750
},
Cmdline
:
"VLLM::EngineCore"
},
{
ObservedPID
:
81
,
ParentPID
:
74
,
OutermostPID
:
81
,
InnermostPID
:
749
,
NamespacePIDs
:
[]
int
{
81
,
749
},
Cmdline
:
"resource_tracker"
},
}
resolved
,
err
:=
ResolveManifestPIDsToObservedPIDs
(
processes
,
74
,
[]
int
{
1
,
750
})
if
err
!=
nil
{
t
.
Fatalf
(
"ResolveManifestPIDsToObservedPIDs(...) returned error: %v"
,
err
)
}
if
len
(
resolved
)
!=
2
{
t
.
Fatalf
(
"len(resolved) = %d, want 2"
,
len
(
resolved
))
}
if
resolved
[
0
]
!=
74
||
resolved
[
1
]
!=
80
{
t
.
Fatalf
(
"resolved PIDs = %v, want [74 80]"
,
resolved
)
}
}
func
TestResolveManifestPIDsToObservedPIDsFailsWhenManifestPIDMissingFromRestoredSubtree
(
t
*
testing
.
T
)
{
processes
:=
[]
ProcessDetails
{
{
ObservedPID
:
1
,
ParentPID
:
0
,
OutermostPID
:
1
,
InnermostPID
:
1
,
NamespacePIDs
:
[]
int
{
1
},
Cmdline
:
"sleep infinity"
},
{
ObservedPID
:
50
,
ParentPID
:
0
,
OutermostPID
:
50
,
InnermostPID
:
50
,
NamespacePIDs
:
[]
int
{
50
},
Cmdline
:
"nsrestore"
},
{
ObservedPID
:
74
,
ParentPID
:
50
,
OutermostPID
:
74
,
InnermostPID
:
1
,
NamespacePIDs
:
[]
int
{
74
,
1
},
Cmdline
:
"python3 -m dynamo.vllm"
},
}
_
,
err
:=
ResolveManifestPIDsToObservedPIDs
(
processes
,
74
,
[]
int
{
1
,
750
})
if
err
==
nil
{
t
.
Fatal
(
"ResolveManifestPIDsToObservedPIDs(...) unexpectedly succeeded"
)
}
}
func
TestResolveManifestPIDsToObservedPIDsFailsWhenNamespaceDepthIsNotTwo
(
t
*
testing
.
T
)
{
processes
:=
[]
ProcessDetails
{
{
ObservedPID
:
50
,
ParentPID
:
0
,
OutermostPID
:
50
,
InnermostPID
:
50
,
NamespacePIDs
:
[]
int
{
50
},
Cmdline
:
"nsrestore"
},
{
ObservedPID
:
74
,
ParentPID
:
50
,
OutermostPID
:
74
,
InnermostPID
:
1
,
NamespacePIDs
:
[]
int
{
900
,
74
,
1
},
Cmdline
:
"python3 -m dynamo.vllm"
},
{
ObservedPID
:
80
,
ParentPID
:
74
,
OutermostPID
:
80
,
InnermostPID
:
750
,
NamespacePIDs
:
[]
int
{
900
,
80
,
750
},
Cmdline
:
"VLLM::EngineCore"
},
}
_
,
err
:=
ResolveManifestPIDsToObservedPIDs
(
processes
,
74
,
[]
int
{
1
,
750
})
if
err
==
nil
{
t
.
Fatal
(
"ResolveManifestPIDsToObservedPIDs(...) unexpectedly succeeded"
)
}
}
deploy/snapshot/pkg/cuda/cuda.go
View file @
ba9a8a9f
...
...
@@ -139,24 +139,18 @@ func BuildDeviceMap(sourceUUIDs, targetUUIDs []string) (string, error) {
}
// LockAndCheckpointProcessTree locks and checkpoints CUDA state for all given PIDs.
// On
partial
failure,
already-checkpointed PIDs are restored+unlocke
d.
// On failure,
the caller is expected to fail the operation and terminate the workloa
d.
func
LockAndCheckpointProcessTree
(
ctx
context
.
Context
,
cudaPIDs
[]
int
,
log
logr
.
Logger
)
error
{
locked
:=
make
([]
int
,
0
,
len
(
cudaPIDs
))
for
_
,
pid
:=
range
cudaPIDs
{
if
err
:=
lock
(
ctx
,
pid
,
log
);
err
!=
nil
{
bulkUnlock
(
context
.
Background
(),
locked
,
log
)
return
fmt
.
Errorf
(
"cuda lock failed for PID %d: %w"
,
pid
,
err
)
return
err
}
locked
=
append
(
locked
,
pid
)
}
checkpointed
:=
make
([]
int
,
0
,
len
(
cudaPIDs
))
for
_
,
pid
:=
range
cudaPIDs
{
if
err
:=
checkpoint
(
ctx
,
pid
,
log
);
err
!=
nil
{
recoverCheckpointed
(
context
.
Background
(),
checkpointed
,
locked
,
log
)
return
fmt
.
Errorf
(
"cuda checkpoint failed for PID %d: %w"
,
pid
,
err
)
return
err
}
checkpointed
=
append
(
checkpointed
,
pid
)
}
return
nil
...
...
@@ -166,7 +160,7 @@ func LockAndCheckpointProcessTree(ctx context.Context, cudaPIDs []int, log logr.
func
RestoreAndUnlockProcessTree
(
ctx
context
.
Context
,
cudaPIDs
[]
int
,
deviceMap
string
,
log
logr
.
Logger
)
error
{
for
_
,
pid
:=
range
cudaPIDs
{
if
err
:=
restoreProcess
(
ctx
,
pid
,
deviceMap
,
log
);
err
!=
nil
{
return
fmt
.
Errorf
(
"cuda restore failed for PID %d: %w"
,
pid
,
err
)
return
err
}
}
for
_
,
pid
:=
range
cudaPIDs
{
...
...
@@ -176,43 +170,8 @@ func RestoreAndUnlockProcessTree(ctx context.Context, cudaPIDs []int, deviceMap
log
.
Info
(
"cuda-checkpoint unlock returned error but process is already running"
,
"pid"
,
pid
)
continue
}
return
fmt
.
Errorf
(
"failed to unlock CUDA process %d: %w"
,
pid
,
err
)
return
err
}
}
return
nil
}
// bulkUnlock unlocks a list of CUDA PIDs (best-effort).
func
bulkUnlock
(
ctx
context
.
Context
,
pids
[]
int
,
log
logr
.
Logger
)
{
for
_
,
pid
:=
range
pids
{
if
err
:=
unlock
(
ctx
,
pid
,
log
);
err
!=
nil
{
log
.
Error
(
err
,
"Failed to unlock CUDA process"
,
"pid"
,
pid
)
}
}
}
// recoverCheckpointed is best-effort cleanup when checkpoint fails partway.
// Checkpointed PIDs need restore+unlock; locked-only PIDs just need unlock.
func
recoverCheckpointed
(
ctx
context
.
Context
,
checkpointed
,
locked
[]
int
,
log
logr
.
Logger
)
{
checkpointedSet
:=
make
(
map
[
int
]
struct
{},
len
(
checkpointed
))
for
_
,
pid
:=
range
checkpointed
{
checkpointedSet
[
pid
]
=
struct
{}{}
}
for
_
,
pid
:=
range
checkpointed
{
if
err
:=
restoreProcess
(
ctx
,
pid
,
""
,
log
);
err
!=
nil
{
log
.
Error
(
err
,
"Failed to restore CUDA process during cleanup"
,
"pid"
,
pid
)
continue
}
if
err
:=
unlock
(
ctx
,
pid
,
log
);
err
!=
nil
{
log
.
Error
(
err
,
"Failed to unlock CUDA process after restore during cleanup"
,
"pid"
,
pid
)
}
}
for
_
,
pid
:=
range
locked
{
if
_
,
ok
:=
checkpointedSet
[
pid
];
ok
{
continue
}
if
err
:=
unlock
(
ctx
,
pid
,
log
);
err
!=
nil
{
log
.
Error
(
err
,
"Failed to unlock CUDA process during cleanup"
,
"pid"
,
pid
)
}
}
}
deploy/snapshot/pkg/cuda/shim.go
View file @
ba9a8a9f
...
...
@@ -9,6 +9,8 @@ import (
"time"
"github.com/go-logr/logr"
"github.com/ai-dynamo/dynamo/deploy/snapshot/pkg/common"
)
const
(
...
...
@@ -55,15 +57,36 @@ func runAction(ctx context.Context, pid int, action, deviceMap string, log logr.
args
=
append
(
args
,
"--device-map"
,
deviceMap
)
}
cmd
:=
exec
.
CommandContext
(
ctx
,
cudaCheckpointBinary
,
args
...
)
details
:=
common
.
ProcessDetails
{
ObservedPID
:
pid
,
OutermostPID
:
pid
,
InnermostPID
:
pid
,
NamespacePIDs
:
[]
int
{
pid
},
}
if
process
,
err
:=
common
.
ReadProcessDetails
(
"/proc"
,
pid
);
err
==
nil
{
details
=
process
}
start
:=
time
.
Now
()
output
,
err
:=
cmd
.
CombinedOutput
()
duration
:=
time
.
Since
(
start
)
out
:=
strings
.
TrimSpace
(
string
(
output
))
if
err
!=
nil
{
log
.
Error
(
err
,
"cuda-checkpoint command failed"
,
"pid"
,
pid
,
"outermost_pid"
,
details
.
OutermostPID
,
"innermost_pid"
,
details
.
InnermostPID
,
"cmdline"
,
details
.
Cmdline
,
"action"
,
action
,
"duration"
,
duration
,
"output"
,
out
,
)
return
fmt
.
Errorf
(
"cuda-checkpoint %v failed for pid %d after %s: %w (output: %s)"
,
args
,
pid
,
duration
,
err
,
out
)
}
log
.
Info
(
"cuda-checkpoint command succeeded"
,
"pid"
,
pid
,
"outermost_pid"
,
details
.
OutermostPID
,
"innermost_pid"
,
details
.
InnermostPID
,
"cmdline"
,
details
.
Cmdline
,
"action"
,
action
,
"duration"
,
duration
,
"output"
,
out
,
...
...
deploy/snapshot/pkg/executor/checkpoint.go
View file @
ba9a8a9f
...
...
@@ -141,9 +141,23 @@ func inspectContainer(ctx context.Context, ctrd *containerd.Client, log logr.Log
// Discover CUDA processes and GPU UUIDs
allPIDs
:=
common
.
ProcessTreePIDs
(
pid
)
cudaPIDs
:=
cuda
.
FilterProcesses
(
ctx
,
allPIDs
,
log
)
cudaHostPIDs
:=
cuda
.
FilterProcesses
(
ctx
,
allPIDs
,
log
)
cudaNamespacePIDs
:=
make
([]
int
,
0
,
len
(
cudaHostPIDs
))
for
_
,
cudaHostPID
:=
range
cudaHostPIDs
{
process
,
err
:=
common
.
ReadProcessDetails
(
common
.
HostProcPath
,
cudaHostPID
)
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to read process details for CUDA process %d: %w"
,
cudaHostPID
,
err
)
}
if
len
(
process
.
NamespacePIDs
)
!=
2
{
return
nil
,
fmt
.
Errorf
(
"CUDA process %d has namespace depth %d, want 2"
,
cudaHostPID
,
len
(
process
.
NamespacePIDs
))
}
cudaNamespacePIDs
=
append
(
cudaNamespacePIDs
,
process
.
InnermostPID
)
}
if
len
(
cudaHostPIDs
)
>
0
{
log
.
Info
(
"Resolved checkpoint CUDA PID mapping"
,
"host_pids"
,
cudaHostPIDs
,
"namespace_pids"
,
cudaNamespacePIDs
)
}
var
gpuUUIDs
[]
string
if
len
(
cudaPIDs
)
>
0
{
if
len
(
cuda
Host
PIDs
)
>
0
{
gpuUUIDs
,
err
=
cuda
.
GetPodGPUUUIDs
(
ctx
,
req
.
PodName
,
req
.
PodNamespace
,
req
.
ContainerName
)
if
err
!=
nil
{
return
nil
,
fmt
.
Errorf
(
"failed to discover source GPU UUIDs: %w"
,
err
)
...
...
@@ -159,7 +173,8 @@ func inspectContainer(ctx context.Context, ctrd *containerd.Client, log logr.Log
NetNSInode
:
netNSInode
,
StdioFDs
:
stdioFDs
,
HostCgroupPath
:
hostCgroupPath
,
CUDAPIDs
:
cudaPIDs
,
CUDAHostPIDs
:
cudaHostPIDs
,
CUDANSPIDs
:
cudaNamespacePIDs
,
GPUUUIDs
:
gpuUUIDs
,
},
nil
}
...
...
@@ -182,8 +197,8 @@ func configureCheckpoint(
types
.
NewSourcePodManifest
(
req
.
ContainerID
,
state
.
PID
,
req
.
NodeName
,
req
.
PodName
,
req
.
PodNamespace
,
state
.
StdioFDs
),
types
.
NewOverlayManifest
(
cfg
.
Overlay
,
state
.
UpperDir
,
state
.
OCISpec
),
)
if
len
(
state
.
CUDAPIDs
)
>
0
{
m
.
CUDA
=
types
.
NewCUDAManifest
(
state
.
CUDAPIDs
,
state
.
GPUUUIDs
)
if
len
(
state
.
CUDA
NS
PIDs
)
>
0
{
m
.
CUDA
=
types
.
NewCUDAManifest
(
state
.
CUDA
NS
PIDs
,
state
.
GPUUUIDs
)
}
if
err
:=
types
.
WriteManifest
(
checkpointDir
,
m
);
err
!=
nil
{
...
...
@@ -195,8 +210,8 @@ func configureCheckpoint(
func
captureCheckpoint
(
ctx
context
.
Context
,
criuOpts
*
criurpc
.
CriuOpts
,
criuSettings
*
types
.
CRIUSettings
,
data
*
types
.
CheckpointManifest
,
state
*
types
.
CheckpointContainerSnapshot
,
checkpointDir
string
,
log
logr
.
Logger
)
(
time
.
Duration
,
error
)
{
// CUDA lock+checkpoint must happen before CRIU dump
if
len
(
state
.
CUDAPIDs
)
>
0
{
if
err
:=
cuda
.
LockAndCheckpointProcessTree
(
ctx
,
state
.
CUDAPIDs
,
log
);
err
!=
nil
{
if
len
(
state
.
CUDA
Host
PIDs
)
>
0
{
if
err
:=
cuda
.
LockAndCheckpointProcessTree
(
ctx
,
state
.
CUDA
Host
PIDs
,
log
);
err
!=
nil
{
return
0
,
fmt
.
Errorf
(
"CUDA checkpoint failed: %w"
,
err
)
}
}
...
...
deploy/snapshot/pkg/executor/nsrestore.go
View file @
ba9a8a9f
...
...
@@ -85,15 +85,40 @@ func executeRestore(ctx context.Context, criuOpts *criurpc.CriuOpts, m *types.Ch
if
err
!=
nil
{
return
0
,
err
}
processes
,
err
:=
common
.
ReadProcessTable
(
"/proc"
)
if
err
!=
nil
{
return
0
,
fmt
.
Errorf
(
"failed to read restored process table: %w"
,
err
)
}
log
.
Info
(
"Restored process table snapshot"
,
"proc_root"
,
"/proc"
,
"criu_callback_pid"
,
restoredPID
,
"process_count"
,
len
(
processes
),
"manifest_cuda_pids"
,
m
.
CUDA
.
PIDs
,
)
for
_
,
process
:=
range
processes
{
log
.
Info
(
"Restored process entry"
,
"observed_pid"
,
process
.
ObservedPID
,
"parent_pid"
,
process
.
ParentPID
,
"outermost_pid"
,
process
.
OutermostPID
,
"innermost_pid"
,
process
.
InnermostPID
,
"namespace_pids"
,
process
.
NamespacePIDs
,
"cmdline"
,
process
.
Cmdline
,
)
}
// CUDA restore — discover PIDs in the restored process tree, then restore+unlock
// CUDA restore — remap checkpoint-time innermost namespace PIDs onto the
// current visible restored PIDs before invoking cuda-checkpoint.
if
!
m
.
CUDA
.
IsEmpty
()
{
candidates
:=
common
.
ProcessTreePIDs
(
int
(
restoredPID
))
cudaPIDs
:=
cuda
.
FilterProcesses
(
ctx
,
candidates
,
log
)
if
len
(
cudaPIDs
)
==
0
{
return
0
,
fmt
.
Errorf
(
"checkpoint has %d CUDA PIDs but none found in restored process tree"
,
len
(
m
.
CUDA
.
PIDs
))
restorePIDs
,
err
:=
common
.
ResolveManifestPIDsToObservedPIDs
(
processes
,
int
(
restoredPID
),
m
.
CUDA
.
PIDs
)
if
err
!=
nil
{
return
0
,
fmt
.
Errorf
(
"failed to resolve restored CUDA PIDs: %w"
,
err
)
}
if
err
:=
cuda
.
RestoreAndUnlockProcessTree
(
ctx
,
cudaPIDs
,
opts
.
CUDADeviceMap
,
log
);
err
!=
nil
{
log
.
Info
(
"Resolved manifest CUDA PIDs to current restore PIDs"
,
"manifest_cuda_pids"
,
m
.
CUDA
.
PIDs
,
"restored_cuda_pids"
,
restorePIDs
,
"criu_callback_pid"
,
restoredPID
,
)
if
err
:=
cuda
.
RestoreAndUnlockProcessTree
(
ctx
,
restorePIDs
,
opts
.
CUDADeviceMap
,
log
);
err
!=
nil
{
return
0
,
fmt
.
Errorf
(
"CUDA restore failed: %w"
,
err
)
}
}
...
...
deploy/snapshot/pkg/types/inspect.go
View file @
ba9a8a9f
...
...
@@ -25,7 +25,8 @@ type CheckpointContainerSnapshot struct {
NetNSInode
uint64
StdioFDs
[]
string
// readlink targets for FDs 0, 1, 2 (e.g. "pipe:[12345]")
HostCgroupPath
string
// host filesystem path for CRIU's --freeze-cgroup
CUDAPIDs
[]
int
// PIDs with CUDA state in the container
CUDAHostPIDs
[]
int
// host-visible PIDs used for checkpoint-side CUDA actions
CUDANSPIDs
[]
int
// namespace-relative PIDs stored in the checkpoint manifest
GPUUUIDs
[]
string
// source GPU UUIDs from kubelet PodResources API
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment