Unverified Commit 45cacbaf authored by Daniel Hiltgen, committed by GitHub

Merge pull request #4517 from dhiltgen/gpu_incremental

Enhanced GPU discovery and multi-gpu support with concurrency
parents 6b800aa7 17df6520
@@ -11,7 +11,8 @@ import (
 )

 func TestContextExhaustion(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Minute) // TODO maybe shorter?
+	// Longer needed for small footprint GPUs
+	ctx, cancel := context.WithTimeout(context.Background(), 6*time.Minute)
 	defer cancel()
 	// Set up the test data
 	req := api.GenerateRequest{
......
@@ -32,7 +32,11 @@ func TestIntegrationMultimodal(t *testing.T) {
 	resp := "the ollam"

 	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
 	defer cancel()
-	GenerateTestHelper(ctx, t, req, []string{resp})
+	client, _, cleanup := InitServerConnection(ctx, t)
+	defer cleanup()
+	require.NoError(t, PullIfMissing(ctx, client, req.Model))
+	// llava models on CPU can be quite slow to start,
+	DoGenerate(ctx, t, client, req, []string{resp}, 120*time.Second, 30*time.Second)
 }
const imageEncoding = `iVBORw0KGgoAAAANSUhEUgAAANIAAAB4CAYAAACHHqzKAAAAAXNSR0IArs4c6QAAAIRlWElmTU0AKgAAAAgABQESAAMAAAABAAEAAAEaAAUAAAABAAAASgEb const imageEncoding = `iVBORw0KGgoAAAANSUhEUgAAANIAAAB4CAYAAACHHqzKAAAAAXNSR0IArs4c6QAAAIRlWElmTU0AKgAAAAgABQESAAMAAAABAAEAAAEaAAUAAAABAAAASgEb
......
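The multimodal test now drives the shared integration helpers directly instead of going through GenerateTestHelper, so pulling the model and generating a response each get an explicit time budget. The sketch below shows that pattern, assuming only the helper calls visible in this diff; the test name, model, prompt, and expected keyword are illustrative, and the two trailing durations are read as the overall-response and streaming-stall limits based on how DoGenerate is invoked above.

```go
func TestGenerateWithExplicitTimeouts(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 3*time.Minute)
	defer cancel()

	// Connect to the test server and make sure the model is available locally.
	client, _, cleanup := InitServerConnection(ctx, t)
	defer cleanup()
	require.NoError(t, PullIfMissing(ctx, client, "orca-mini"))

	stream := false
	req := api.GenerateRequest{
		Model:  "orca-mini",
		Prompt: "why is the sky blue?",
		Stream: &stream,
	}
	// Allow up to 120s for the full response and 30s between streamed updates.
	DoGenerate(ctx, t, client, req, []string{"scattering"}, 120*time.Second, 30*time.Second)
}
```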
@@ -140,7 +140,7 @@ func PullIfMissing(ctx context.Context, client *api.Client, modelName string) er
 	showCtx, cancel := context.WithDeadlineCause(
 		ctx,
-		time.Now().Add(5*time.Second),
+		time.Now().Add(10*time.Second),
 		fmt.Errorf("show for existing model %s took too long", modelName),
 	)
 	defer cancel()
@@ -290,6 +290,7 @@ func GenerateRequests() ([]api.GenerateRequest, [][]string) {
 			Model:  "orca-mini",
 			Prompt: "why is the ocean blue?",
 			Stream: &stream,
+			KeepAlive: &api.Duration{Duration: 10 * time.Second},
 			Options: map[string]interface{}{
 				"seed":        42,
 				"temperature": 0.0,
@@ -298,6 +299,7 @@ func GenerateRequests() ([]api.GenerateRequest, [][]string) {
 			Model:  "orca-mini",
 			Prompt: "why is the color of dirt brown?",
 			Stream: &stream,
+			KeepAlive: &api.Duration{Duration: 10 * time.Second},
 			Options: map[string]interface{}{
 				"seed":        42,
 				"temperature": 0.0,
@@ -306,6 +308,7 @@ func GenerateRequests() ([]api.GenerateRequest, [][]string) {
 			Model:  "orca-mini",
 			Prompt: "what is the origin of the us thanksgiving holiday?",
 			Stream: &stream,
+			KeepAlive: &api.Duration{Duration: 10 * time.Second},
 			Options: map[string]interface{}{
 				"seed":        42,
 				"temperature": 0.0,
@@ -314,6 +317,7 @@ func GenerateRequests() ([]api.GenerateRequest, [][]string) {
 			Model:  "orca-mini",
 			Prompt: "what is the origin of independence day?",
 			Stream: &stream,
+			KeepAlive: &api.Duration{Duration: 10 * time.Second},
 			Options: map[string]interface{}{
 				"seed":        42,
 				"temperature": 0.0,
@@ -322,6 +326,7 @@ func GenerateRequests() ([]api.GenerateRequest, [][]string) {
 			Model:  "orca-mini",
 			Prompt: "what is the composition of air?",
 			Stream: &stream,
+			KeepAlive: &api.Duration{Duration: 10 * time.Second},
 			Options: map[string]interface{}{
 				"seed":        42,
 				"temperature": 0.0,
@@ -331,7 +336,7 @@ func GenerateRequests() ([]api.GenerateRequest, [][]string) {
 		[][]string{
 			[]string{"sunlight"},
 			[]string{"soil", "organic", "earth", "black", "tan"},
-			[]string{"england", "english", "massachusetts", "pilgrims"},
+			[]string{"england", "english", "massachusetts", "pilgrims", "british"},
 			[]string{"fourth", "july", "declaration", "independence"},
 			[]string{"nitrogen", "oxygen", "carbon", "dioxide"},
 		}
......
@@ -2335,9 +2335,9 @@ static void server_params_parse(int argc, char **argv, server_params &sparams, g
                invalid_param = true;
                break;
            }
-#ifndef GGML_USE_CUBLAS
-            fprintf(stderr, "warning: llama.cpp was compiled without cuBLAS. Setting the split mode has no effect.\n");
-#endif // GGML_USE_CUBLAS
+#ifndef GGML_USE_CUDA
+            fprintf(stderr, "warning: llama.cpp was compiled without CUDA. Setting the split mode has no effect.\n");
+#endif // GGML_USE_CUDA
        }
        else if (arg == "--tensor-split" || arg == "-ts")
        {
@@ -2346,7 +2346,7 @@ static void server_params_parse(int argc, char **argv, server_params &sparams, g
                invalid_param = true;
                break;
            }
-#if defined(GGML_USE_CUBLAS) || defined(GGML_USE_SYCL)
+#if defined(GGML_USE_CUDA) || defined(GGML_USE_SYCL)
            std::string arg_next = argv[i];
            // split string by , and /
@@ -2367,8 +2367,8 @@ static void server_params_parse(int argc, char **argv, server_params &sparams, g
                }
            }
#else
-            LOG_WARNING("llama.cpp was compiled without cuBLAS. It is not possible to set a tensor split.\n", {});
-#endif // GGML_USE_CUBLAS
+            LOG_WARNING("llama.cpp was compiled without CUDA. It is not possible to set a tensor split.\n", {});
+#endif // GGML_USE_CUDA
        }
        else if (arg == "--main-gpu" || arg == "-mg")
        {
@@ -2377,7 +2377,7 @@ static void server_params_parse(int argc, char **argv, server_params &sparams, g
                invalid_param = true;
                break;
            }
-#if defined(GGML_USE_CUBLAS) || defined(GGML_USE_SYCL)
+#if defined(GGML_USE_CUDA) || defined(GGML_USE_SYCL)
            params.main_gpu = std::stoi(argv[i]);
#else
            LOG_WARNING("llama.cpp was compiled without cuBLAS. It is not possible to set a main GPU.", {});
......
@@ -307,6 +307,7 @@ func (llm GGML) GraphSize(context, batch uint64) (partialOffload, fullOffload ui
 		partialOffload = 4 * batch * embedding
 		partialOffload += max(
+			// 4*batch*(4+6*embedding+context*(2*heads)+llm.KV().GQA()),
 			4*batch*(1+embedding+max(context, embedding))+embedding*embedding*9/16+4*context*(batch*heads+embedding/heads*headsKV),
 			4*batch*(embedding+vocab)+embedding*vocab*105/128,
 		)
......
package llm package llm
import ( import (
"fmt"
"log/slog" "log/slog"
"strconv"
"strings"
"github.com/ollama/ollama/api" "github.com/ollama/ollama/api"
"github.com/ollama/ollama/envconfig"
"github.com/ollama/ollama/format" "github.com/ollama/ollama/format"
"github.com/ollama/ollama/gpu" "github.com/ollama/ollama/gpu"
) )
...@@ -16,7 +16,8 @@ func PredictServerFit(allGpus gpu.GpuInfoList, ggml *GGML, adapters, projectors ...@@ -16,7 +16,8 @@ func PredictServerFit(allGpus gpu.GpuInfoList, ggml *GGML, adapters, projectors
var estimatedVRAM uint64 var estimatedVRAM uint64
for _, gpus := range allGpus.ByLibrary() { for _, gpus := range allGpus.ByLibrary() {
var layerCount int var layerCount int
layerCount, estimatedVRAM, _ = EstimateGPULayers(gpus, ggml, projectors, opts) estimate := EstimateGPULayers(gpus, ggml, projectors, opts)
layerCount, estimatedVRAM = estimate.Layers, estimate.VRAMSize
if opts.NumGPU < 0 { if opts.NumGPU < 0 {
if layerCount > 0 && layerCount >= int(ggml.KV().BlockCount()+1) { if layerCount > 0 && layerCount >= int(ggml.KV().BlockCount()+1) {
return true, estimatedVRAM return true, estimatedVRAM
...@@ -30,24 +31,64 @@ func PredictServerFit(allGpus gpu.GpuInfoList, ggml *GGML, adapters, projectors ...@@ -30,24 +31,64 @@ func PredictServerFit(allGpus gpu.GpuInfoList, ggml *GGML, adapters, projectors
return false, estimatedVRAM return false, estimatedVRAM
} }
type MemoryEstimate struct {
// How many layers we predict we can load
Layers int
// The size of the graph which occupies the main GPU
Graph uint64
// How much VRAM will be allocated given the number of layers we predict
VRAMSize uint64
// The total size of the model if loaded into VRAM. If all layers are loaded, VRAMSize == TotalSize
TotalSize uint64
// For multi-GPU scenarios, this provides the tensor split parameter
TensorSplit string
// For multi-GPU scenarios, this is the size in bytes per GPU
GPUSizes []uint64
}
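EstimateGPULayers now returns this struct in place of the old (layers, VRAM, total) triple, and callers read whichever fields they need; the PredictServerFit change above is the in-tree example. The helper below is a small illustrative sketch, not part of this change, showing how a call site migrates and what the multi-GPU fields mean.

```go
// logEstimate is an illustrative helper showing how the new MemoryEstimate fields
// replace the old (layerCount, estimatedVRAM, estimatedTotal) return values.
func logEstimate(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts api.Options) {
	estimate := EstimateGPULayers(gpus, ggml, projectors, opts)

	slog.Info("memory estimate",
		"layers", estimate.Layers, // layers predicted to fit on the GPU(s)
		"vram", format.HumanBytes2(estimate.VRAMSize), // VRAM that would actually be allocated
		"total", format.HumanBytes2(estimate.TotalSize), // size of the model if fully loaded into VRAM
	)

	if estimate.TensorSplit != "" {
		// Per-GPU layer counts joined with commas (e.g. "17,16"), forwarded to the
		// runner as --tensor-split; GPUSizes holds the matching per-GPU byte totals.
		slog.Info("multi-gpu layout", "split", estimate.TensorSplit, "per_gpu_bytes", estimate.GPUSizes)
	}
}
```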
// Given a model and one or more GPU targets, predict how many layers and bytes we can load, and the total size // Given a model and one or more GPU targets, predict how many layers and bytes we can load, and the total size
// The GPUs provided must all be the same Library // The GPUs provided must all be the same Library
func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts api.Options) (int, uint64, uint64) { func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts api.Options) MemoryEstimate {
var memoryAvailable uint64 // Graph size for a partial offload, applies to all GPUs
for _, info := range gpus { var graphPartialOffload uint64
memoryAvailable += info.FreeMemory
} // Graph size when all layers are offloaded, applies to all GPUs
if envconfig.MaxVRAM > 0 { var graphFullOffload uint64
memoryAvailable = envconfig.MaxVRAM
} // Final graph offload once we know full or partial
var graphOffload uint64
// Projectors loaded into GPU0 only
var projectorSize uint64
// Conditional output size on GPU 0
var memoryLayerOutput uint64
// The sizes of a layer
var layerSize uint64
slog.Debug("evaluating", "library", gpus[0].Library, "gpu_count", len(gpus), "available", format.HumanBytes2(memoryAvailable)) // The sum of all the layer sizes (just for logging)
var memoryWeights uint64
// TODO - this is probably wrong, first GPU vs secondaries will have different overheads // True if all the layers are loaded
memoryMinimum := gpus[0].MinimumMemory var fullyLoaded bool
// Overflow that didn't fit into the GPU
var overflow uint64
availableList := make([]string, len(gpus))
for i, gpu := range gpus {
availableList[i] = format.HumanBytes2(gpu.FreeMemory)
}
slog.Debug("evaluating", "library", gpus[0].Library, "gpu_count", len(gpus), "available", availableList)
for _, projector := range projectors { for _, projector := range projectors {
memoryMinimum += projectorMemoryRequirements(projector) projectorSize += projectorMemoryRequirements(projector)
// multimodal models require at least 2048 context // multimodal models require at least 2048 context
opts.NumCtx = max(opts.NumCtx, 2048) opts.NumCtx = max(opts.NumCtx, 2048)
...@@ -56,79 +97,160 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts ...@@ -56,79 +97,160 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts
layers := ggml.Tensors().Layers() layers := ggml.Tensors().Layers()
// add one layer worth of memory as a buffer // add one layer worth of memory as a buffer
if blk0, ok := layers["blk.0"]; ok { if blk0, ok := layers["blk.0"]; ok {
memoryMinimum += blk0.size() layerSize = blk0.size()
} else {
slog.Warn("model missing blk.0 layer size")
} }
// fp16 k,v = (1 (k) + 1 (v)) * sizeof(float16) * n_ctx * n_layer * n_embd / n_head * n_head_kv // fp16 k,v = (1 (k) + 1 (v)) * sizeof(float16) * n_ctx * n_layer * n_embd / n_head * n_head_kv
var kv uint64 = 2 * 2 * uint64(opts.NumCtx) * ggml.KV().BlockCount() * ggml.KV().EmbeddingLength() / ggml.KV().HeadCount() * ggml.KV().HeadCountKV() var kv uint64 = 2 * 2 * uint64(opts.NumCtx) * ggml.KV().BlockCount() * ggml.KV().EmbeddingLength() / ggml.KV().HeadCount() * ggml.KV().HeadCountKV()
graphPartialOffload, graphFullOffload := ggml.GraphSize(uint64(opts.NumCtx), uint64(min(opts.NumCtx, opts.NumBatch))) // KV is proportional to the number of layers
layerSize += kv / ggml.KV().BlockCount()
graphPartialOffload, graphFullOffload = ggml.GraphSize(uint64(opts.NumCtx), uint64(min(opts.NumCtx, opts.NumBatch)))
if graphPartialOffload == 0 { if graphPartialOffload == 0 {
graphPartialOffload = ggml.KV().GQA() * kv / 6 graphPartialOffload = ggml.KV().GQA() * kv / 6
} }
if graphFullOffload == 0 { if graphFullOffload == 0 {
graphFullOffload = graphPartialOffload graphFullOffload = graphPartialOffload
} }
graphFullOffload *= uint64(len(gpus))
graphPartialOffload *= uint64(len(gpus))
// on metal there's no partial offload overhead // on metal there's no partial offload overhead
if gpus[0].Library == "metal" { if gpus[0].Library == "metal" {
graphPartialOffload = graphFullOffload graphPartialOffload = graphFullOffload
} else if len(gpus) > 1 {
// multigpu should always use the partial graph size
graphFullOffload = graphPartialOffload
} }
// memoryRequiredTotal represents the memory required for full GPU offloading (all layers)
memoryRequiredTotal := memoryMinimum + graphFullOffload
// memoryRequiredPartial represents the memory required for partial GPU offloading (n > 0, n < layers)
memoryRequiredPartial := memoryMinimum + graphPartialOffload
var memoryLayerOutput uint64
if layer, ok := layers["output_norm"]; ok { if layer, ok := layers["output_norm"]; ok {
memoryLayerOutput += layer.size() memoryLayerOutput += layer.size()
} }
if layer, ok := layers["output"]; ok { if layer, ok := layers["output"]; ok {
memoryLayerOutput += layer.size() memoryLayerOutput += layer.size()
} else if layer, ok := layers["token_embd"]; ok { } else if layer, ok := layers["token_embd"]; ok {
memoryLayerOutput += layer.size() memoryLayerOutput += layer.size()
} }
if gpus[0].Library == "metal" && opts.UseMMap { // Output layer handled at the end if we have space
// memory is preallocated for output tensors gpuZeroOverhead := projectorSize
memoryRequiredTotal += memoryLayerOutput
memoryRequiredPartial += memoryLayerOutput
}
// Reduce set of GPUs to only those that have sufficient space to fit overhead and at least one layer
var layerCount int var layerCount int
layerCounts := make([]int, len(gpus))
gpuAllocations := make([]uint64, len(gpus))
type gs struct {
i int
g *gpu.GpuInfo
}
gpusWithSpace := []gs{}
for i := range gpus {
var gzo uint64
if len(gpusWithSpace) == 0 {
gzo = gpuZeroOverhead
}
// Only include GPUs that can fit the graph, gpu minimum, the layer buffer and at least more layer
if gpus[i].FreeMemory < gzo+max(graphPartialOffload, graphFullOffload)+gpus[i].MinimumMemory+2*layerSize {
slog.Debug("gpu has too little memory to allocate any layers", "gpu", gpus[i])
continue
}
gpusWithSpace = append(gpusWithSpace, gs{i, &gpus[i]})
gpuAllocations[i] += gpus[i].MinimumMemory + layerSize // We hold off on graph until we know partial vs. full
}
var gpuZeroID int
if len(gpusWithSpace) > 0 {
gpuZeroID = gpusWithSpace[0].i
gpuAllocations[gpuZeroID] += gpuZeroOverhead
}
// For all the layers, find where they can fit on the GPU(s)
for i := range int(ggml.KV().BlockCount()) { for i := range int(ggml.KV().BlockCount()) {
if blk, ok := layers[fmt.Sprintf("blk.%d", i)]; ok { memoryWeights += layerSize
memoryLayer := blk.size()
// KV is proportional to the number of layers if opts.NumGPU >= 0 && layerCount >= opts.NumGPU {
memoryLayer += kv / ggml.KV().BlockCount() // Stop allocating on GPU(s) once we hit the users target NumGPU
continue
}
// distribute the layers across the GPU(s) that have space
for j := len(gpusWithSpace); j > 0; j-- {
g := gpusWithSpace[i%j]
used := gpuAllocations[g.i] + max(graphPartialOffload, graphFullOffload)
if g.g.FreeMemory > used+layerSize {
gpuAllocations[g.i] += layerSize
layerCounts[g.i]++
layerCount++
break
} else {
gpusWithSpace = append(gpusWithSpace[:i%j], gpusWithSpace[i%j+1:]...)
}
}
}
if layerCount >= int(ggml.KV().BlockCount()) {
fullyLoaded = true
} else {
for i := layerCount; i < int(ggml.KV().BlockCount()); i++ {
overflow += layerSize
}
}
memoryRequiredTotal += memoryLayer // Determine if we need to consider output then find where it fits
if (opts.NumGPU >= 0 && layerCount+1 <= opts.NumGPU) || (opts.NumGPU < 0 && memoryAvailable > memoryRequiredPartial+memoryLayer) { if memoryLayerOutput > 0 && (opts.NumGPU < 0 || layerCount < opts.NumGPU) {
memoryRequiredPartial += memoryLayer for j := len(gpusWithSpace); j > 0; j-- {
g := gpusWithSpace[layerCount%j]
used := gpuAllocations[g.i] + max(graphPartialOffload, graphFullOffload)
if g.g.FreeMemory > used+memoryLayerOutput {
gpuAllocations[g.i] += memoryLayerOutput
layerCounts[g.i]++
layerCount++ layerCount++
break
} }
} }
if layerCount < int(ggml.KV().BlockCount())+1 {
fullyLoaded = false
overflow += memoryLayerOutput
}
} }
if gpus[0].Library != "metal" || !opts.UseMMap { // Add the applicable (full or partial) graph allocations
// memory was not preallocated for output tensors for i := range gpus {
memoryRequiredTotal += memoryLayerOutput if layerCounts[i] <= 0 {
continue
}
if fullyLoaded {
gpuAllocations[i] += graphFullOffload
} else {
gpuAllocations[i] += graphPartialOffload
}
}
if fullyLoaded {
graphOffload = graphFullOffload
} else {
graphOffload = graphPartialOffload
} }
if (opts.NumGPU >= 0 && layerCount+1 <= opts.NumGPU) || (opts.NumGPU < 0 && memoryAvailable > memoryRequiredTotal) { // Summaries for the log
layerCount = int(ggml.KV().BlockCount()) + 1 var memoryRequiredPartial, memoryRequiredTotal uint64
memoryRequiredPartial = memoryRequiredTotal for i := range gpuAllocations {
memoryRequiredPartial += gpuAllocations[i]
} }
memoryRequiredTotal = memoryRequiredPartial + overflow
memoryWeights := memoryRequiredTotal - memoryMinimum - graphFullOffload - kv tensorSplit := ""
if len(gpus) > 1 {
splits := make([]string, len(gpus))
for i, count := range layerCounts {
splits[i] = strconv.Itoa(count)
}
tensorSplit = strings.Join(splits, ",")
}
allocationsList := []string{}
for _, a := range gpuAllocations {
allocationsList = append(allocationsList, format.HumanBytes2(a))
}
slog.Info( slog.Info(
"offload to gpu", "offload to gpu",
...@@ -136,13 +258,17 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts ...@@ -136,13 +258,17 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts
"layers", "layers",
// requested number of layers to offload // requested number of layers to offload
"requested", opts.NumGPU, "requested", opts.NumGPU,
// The number of layers the model has (including output)
"model", int(ggml.KV().BlockCount())+1,
// estimated number of layers that can be offloaded // estimated number of layers that can be offloaded
"real", layerCount, "offload", layerCount,
// multi-gpu split for tensors
"split", tensorSplit,
), ),
slog.Group( slog.Group(
"memory", "memory",
// memory available for offloading // memory available by GPU for offloading
"available", format.HumanBytes2(memoryAvailable), "available", availableList,
slog.Group( slog.Group(
"required", "required",
// memory required for full offloading // memory required for full offloading
...@@ -151,6 +277,8 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts ...@@ -151,6 +277,8 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts
"partial", format.HumanBytes2(memoryRequiredPartial), "partial", format.HumanBytes2(memoryRequiredPartial),
// memory of KV cache // memory of KV cache
"kv", format.HumanBytes2(kv), "kv", format.HumanBytes2(kv),
// Allocations across the GPUs
"allocations", allocationsList,
), ),
slog.Group( slog.Group(
"weights", "weights",
...@@ -171,12 +299,31 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts ...@@ -171,12 +299,31 @@ func EstimateGPULayers(gpus []gpu.GpuInfo, ggml *GGML, projectors []string, opts
), ),
) )
if gpus[0].Library == "cpu" { if gpus[0].Library == "cpu" {
return 0, 0, memoryRequiredTotal return MemoryEstimate{
Layers: 0,
Graph: 0,
VRAMSize: 0,
TotalSize: memoryRequiredTotal,
GPUSizes: []uint64{},
} }
if memoryRequiredPartial > memoryAvailable { }
if layerCount == 0 {
slog.Debug("insufficient VRAM to load any model layers") slog.Debug("insufficient VRAM to load any model layers")
return 0, 0, memoryRequiredTotal return MemoryEstimate{
Layers: 0,
Graph: 0,
VRAMSize: 0,
TotalSize: memoryRequiredTotal,
GPUSizes: []uint64{},
}
} }
return layerCount, memoryRequiredPartial, memoryRequiredTotal return MemoryEstimate{
Layers: layerCount,
Graph: graphOffload,
VRAMSize: memoryRequiredPartial,
TotalSize: memoryRequiredTotal,
TensorSplit: tensorSplit,
GPUSizes: gpuAllocations,
}
} }
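For a sense of the quantities the estimator is juggling, the fp16 KV-cache formula used above works out as follows for a hypothetical 7B-class model; the dimensions are illustrative and not taken from this commit.

```go
package main

import "fmt"

func main() {
	// Hypothetical llama-7B-like metadata (illustrative only).
	var (
		numCtx     uint64 = 2048
		blockCount uint64 = 32   // n_layer
		embedding  uint64 = 4096 // n_embd
		heads      uint64 = 32   // n_head
		headsKV    uint64 = 32   // n_head_kv
	)

	// fp16 k,v = (1 (k) + 1 (v)) * sizeof(float16) * n_ctx * n_layer * n_embd / n_head * n_head_kv
	kv := 2 * 2 * numCtx * blockCount * embedding / heads * headsKV

	fmt.Printf("total KV cache: %d bytes (%.0f MiB)\n", kv, float64(kv)/(1<<20))
	// The estimator charges kv/blockCount to every layer, so each layer's
	// footprint grows by this amount on whichever GPU it lands on.
	fmt.Printf("KV per layer:   %d bytes (%.0f MiB)\n", kv/blockCount, float64(kv/blockCount)/(1<<20))
	// Prints: total KV cache: 1073741824 bytes (1024 MiB)
	//         KV per layer:   33554432 bytes (32 MiB)
}
```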
package llm
import (
"bytes"
"encoding/binary"
"fmt"
"os"
"testing"
"github.com/ollama/ollama/api"
"github.com/ollama/ollama/envconfig"
"github.com/ollama/ollama/gpu"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func TestEstimateGPULayers(t *testing.T) {
envconfig.Debug = true
modelName := "dummy"
f, err := os.CreateTemp(t.TempDir(), modelName)
require.NoError(t, err)
defer f.Close()
gguf := NewGGUFV3(binary.LittleEndian)
inputLayerCount := 5
tensors := []Tensor{
{Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
{Name: "blk.1.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
{Name: "blk.2.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
{Name: "blk.3.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
{Name: "blk.4.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
{Name: "output.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
}
assert.Len(t, tensors, inputLayerCount+1)
err = gguf.Encode(f, KV{
"general.architecture": "llama",
"general.name": "name",
"llama.context_length": uint32(32),
"llama.embedding_length": uint32(4096),
"llama.block_count": uint32(inputLayerCount),
"llama.attention.head_count": uint32(32),
"llama.attention.head_count_kv": uint32(32),
"tokenizer.ggml.tokens": []string{" "},
"tokenizer.ggml.scores": []float32{0},
"tokenizer.ggml.token_type": []int32{0},
}, tensors)
require.NoError(t, err)
ggml, err := LoadModel(f.Name())
require.NoError(t, err)
// Simple CPU scenario
gpus := []gpu.GpuInfo{
{
Library: "cpu",
},
}
projectors := []string{}
opts := api.DefaultOptions()
t.Run("cpu", func(t *testing.T) {
estimate := EstimateGPULayers(gpus, ggml, projectors, opts)
assert.Equal(t, 0, estimate.Layers)
assert.Equal(t, uint64(0), estimate.Graph)
})
// derived from the dummy ggml file above
graphPartialOffload := uint64(202377216)
graphFullOffload := uint64(171968512)
layerSize := uint64(33554436)
projectorSize := uint64(0)
memoryLayerOutput := uint64(4)
// Dual CUDA scenario with asymmetry
gpuMinimumMemory := uint64(2048)
gpus = []gpu.GpuInfo{
{
Library: "cuda",
MinimumMemory: gpuMinimumMemory,
},
{
Library: "cuda",
MinimumMemory: gpuMinimumMemory,
},
}
// Nested array: GPU0 layer space, GPU1 layer space, expected gpu0, expected gpu1
for i, s := range []struct {
layer0, layer1 uint64
expect0, expect1 uint64
}{
{1, 1, 1, 1},
{2, 1, 2, 1},
{2, 2, 2, 2},
{1, 2, 1, 2},
{3, 3, 3, 3},
{4, 4, 3, 3},
{6, 6, 3, 3},
{0, 3, 0, 3},
} {
t.Run(fmt.Sprintf("%v", s), func(t *testing.T) {
gpus[0].FreeMemory = 0
gpus[1].FreeMemory = 0
gpus[0].FreeMemory += projectorSize
if s.layer0 > 0 {
gpus[0].FreeMemory += memoryLayerOutput
} else {
gpus[1].FreeMemory += memoryLayerOutput
}
gpus[0].FreeMemory += gpuMinimumMemory + layerSize + s.layer0*layerSize + 1
gpus[1].FreeMemory += gpuMinimumMemory + layerSize + s.layer1*layerSize + 1
gpus[0].FreeMemory += max(graphFullOffload, graphPartialOffload)
gpus[1].FreeMemory += max(graphFullOffload, graphPartialOffload)
estimate := EstimateGPULayers(gpus, ggml, projectors, opts)
assert.Equal(t, int(s.expect0+s.expect1), estimate.Layers, "scenario %d: %v", i, s)
assert.Equal(t, fmt.Sprintf("%d,%d", s.expect0, s.expect1), estimate.TensorSplit, "scenario %d: %v", i, s)
var layerSums uint64
for _, b := range estimate.GPUSizes {
layerSums += b
}
if estimate.Layers < inputLayerCount+1 {
assert.Less(t, estimate.VRAMSize, estimate.TotalSize, "scenario %d: %v %+v", i, s, estimate)
assert.Equal(t, estimate.VRAMSize, layerSums, "scenario %d: %v %+v", i, s, estimate)
} else {
assert.Equal(t, estimate.VRAMSize, estimate.TotalSize, "scenario %d: %v %+v", i, s, estimate)
assert.Equal(t, estimate.TotalSize, layerSums, "scenario %d: %v %+v", i, s, estimate)
}
})
}
}
@@ -82,8 +82,8 @@ func serversForGpu(info gpu.GpuInfo) []string {
 	// glob workDir for files that start with ollama_
 	availableServers := availableServers()
 	requested := info.Library
-	if info.Variant != "" {
-		requested += "_" + info.Variant
+	if info.Variant != gpu.CPUCapabilityNone {
+		requested += "_" + info.Variant.String()
 	}

 	servers := []string{}
@@ -117,14 +117,14 @@ func serversForGpu(info gpu.GpuInfo) []string {
 	// Load up the best CPU variant if not primary requested
 	if info.Library != "cpu" {
-		variant := gpu.GetCPUVariant()
+		variant := gpu.GetCPUCapability()
 		// If no variant, then we fall back to default
 		// If we have a variant, try that if we find an exact match
 		// Attempting to run the wrong CPU instructions will panic the
 		// process
-		if variant != "" {
+		if variant != gpu.CPUCapabilityNone {
 			for cmp := range availableServers {
-				if cmp == "cpu_"+variant {
+				if cmp == "cpu_"+variant.String() {
 					servers = append(servers, cmp)
 					break
 				}
@@ -146,11 +146,11 @@ func serverForCpu() string {
 	if runtime.GOOS == "darwin" && runtime.GOARCH == "arm64" {
 		return "metal"
 	}
-	variant := gpu.GetCPUVariant()
+	variant := gpu.GetCPUCapability()
 	availableServers := availableServers()
-	if variant != "" {
+	if variant != gpu.CPUCapabilityNone {
 		for cmp := range availableServers {
-			if cmp == "cpu_"+variant {
+			if cmp == "cpu_"+variant.String() {
 				return cmp
 			}
 		}
......
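The string-typed CPU variant is replaced above by a gpu.CPUCapability value with a String() method; the type itself is defined in the gpu package and not shown in this excerpt. The sketch below uses a hypothetical stand-in type purely to illustrate the comparison pattern the new code relies on.

```go
// cpuCapability is a hypothetical stand-in for gpu.CPUCapability, which is defined
// outside this diff; the constant names and String values here are assumptions.
type cpuCapability int

const (
	capabilityNone cpuCapability = iota
	capabilityAVX
	capabilityAVX2
)

func (c cpuCapability) String() string {
	switch c {
	case capabilityAVX:
		return "avx"
	case capabilityAVX2:
		return "avx2"
	default:
		return "none"
	}
}

// pickCPUServer mirrors the selection pattern above: prefer an exact
// "cpu_<variant>" payload when one was built, otherwise fall back to plain "cpu".
func pickCPUServer(variant cpuCapability, availableServers map[string]string) string {
	if variant != capabilityNone {
		if _, ok := availableServers["cpu_"+variant.String()]; ok {
			return "cpu_" + variant.String()
		}
	}
	return "cpu"
}
```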
...@@ -37,8 +37,9 @@ type LlamaServer interface { ...@@ -37,8 +37,9 @@ type LlamaServer interface {
Tokenize(ctx context.Context, content string) ([]int, error) Tokenize(ctx context.Context, content string) ([]int, error)
Detokenize(ctx context.Context, tokens []int) (string, error) Detokenize(ctx context.Context, tokens []int) (string, error)
Close() error Close() error
EstimatedVRAM() uint64 EstimatedVRAM() uint64 // Total VRAM across all GPUs
EstimatedTotal() uint64 EstimatedTotal() uint64
EstimatedVRAMByGPU(gpuID string) uint64
} }
// llmServer is an instance of the llama.cpp server // llmServer is an instance of the llama.cpp server
...@@ -49,11 +50,10 @@ type llmServer struct { ...@@ -49,11 +50,10 @@ type llmServer struct {
status *StatusWriter status *StatusWriter
options api.Options options api.Options
// TODO - this should be broken down by GPU estimate MemoryEstimate
estimatedVRAM uint64 // Estimated usage of VRAM by the loaded model
estimatedTotal uint64 // Total size of model
totalLayers uint64 totalLayers uint64
gpuCount int // gpuCount int
gpus gpu.GpuInfoList // Recorded just before the model loaded, free space will be incorrect
loadDuration time.Duration // Record how long it took the model to load loadDuration time.Duration // Record how long it took the model to load
loadProgress float32 loadProgress float32
...@@ -80,16 +80,16 @@ func LoadModel(model string) (*GGML, error) { ...@@ -80,16 +80,16 @@ func LoadModel(model string) (*GGML, error) {
func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, projectors []string, opts api.Options) (LlamaServer, error) { func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, projectors []string, opts api.Options) (LlamaServer, error) {
var err error var err error
var cpuRunner string var cpuRunner string
var estimatedVRAM uint64 var estimate MemoryEstimate
var estimatedTotal uint64
var systemMemory uint64 var systemMemory uint64
gpuCount := len(gpus)
if (len(gpus) == 1 && gpus[0].Library == "cpu") || opts.NumGPU == 0 {
// TODO evaluate system memory to see if we should block the load, or force an unload of another CPU runner
// If the user wants zero GPU layers, reset the gpu list to be CPU/system ram info
if opts.NumGPU == 0 {
gpus = gpu.GetCPUInfo()
}
if len(gpus) == 1 && gpus[0].Library == "cpu" {
cpuRunner = serverForCpu() cpuRunner = serverForCpu()
gpuCount = 0 estimate = EstimateGPULayers(gpus, ggml, projectors, opts)
_, _, estimatedTotal = EstimateGPULayers(gpus, ggml, projectors, opts)
} else { } else {
if gpus[0].Library == "metal" { if gpus[0].Library == "metal" {
memInfo, err := gpu.GetCPUMem() memInfo, err := gpu.GetCPUMem()
...@@ -100,20 +100,19 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr ...@@ -100,20 +100,19 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
slog.Debug("system memory", "total", format.HumanBytes2(systemMemory)) slog.Debug("system memory", "total", format.HumanBytes2(systemMemory))
} }
} }
var layers int estimate = EstimateGPULayers(gpus, ggml, projectors, opts)
layers, estimatedVRAM, estimatedTotal = EstimateGPULayers(gpus, ggml, projectors, opts)
switch { switch {
case gpus[0].Library == "metal" && estimatedVRAM > systemMemory: case gpus[0].Library == "metal" && estimate.VRAMSize > systemMemory:
// disable partial offloading when model is greater than total system memory as this // disable partial offloading when model is greater than total system memory as this
// can lead to locking up the system // can lead to locking up the system
opts.NumGPU = 0 opts.NumGPU = 0
case gpus[0].Library != "metal" && layers == 0: case gpus[0].Library != "metal" && estimate.Layers == 0:
// Don't bother loading into the GPU if no layers can fit // Don't bother loading into the GPU if no layers can fit
cpuRunner = serverForCpu() cpuRunner = serverForCpu()
gpuCount = 0 gpus = gpu.GetCPUInfo()
case opts.NumGPU < 0 && layers > 0 && gpus[0].Library != "cpu": case opts.NumGPU < 0 && estimate.Layers > 0 && gpus[0].Library != "cpu":
opts.NumGPU = layers opts.NumGPU = estimate.Layers
} }
} }
...@@ -232,6 +231,14 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr ...@@ -232,6 +231,14 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
params = append(params, "--parallel", fmt.Sprintf("%d", numParallel)) params = append(params, "--parallel", fmt.Sprintf("%d", numParallel))
if estimate.TensorSplit != "" {
params = append(params, "--tensor-split", estimate.TensorSplit)
}
if estimate.TensorSplit != "" {
params = append(params, "--tensor-split", estimate.TensorSplit)
}
for i := range len(servers) { for i := range len(servers) {
dir := availableServers[servers[i]] dir := availableServers[servers[i]]
if dir == "" { if dir == "" {
...@@ -242,8 +249,7 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr ...@@ -242,8 +249,7 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
} }
if strings.HasPrefix(servers[i], "cpu") { if strings.HasPrefix(servers[i], "cpu") {
// TODO if we tried a gpu runner first, and it failed, record the error and bubble that back up gpus = gpu.GetCPUInfo()
gpuCount = 0
} }
// Find an availableServers port, retry on each iteration in case the failure was a port conflict race // Find an availableServers port, retry on each iteration in case the failure was a port conflict race
...@@ -303,11 +309,10 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr ...@@ -303,11 +309,10 @@ func NewLlamaServer(gpus gpu.GpuInfoList, model string, ggml *GGML, adapters, pr
cmd: exec.Command(server, finalParams...), cmd: exec.Command(server, finalParams...),
status: NewStatusWriter(os.Stderr), status: NewStatusWriter(os.Stderr),
options: opts, options: opts,
estimatedVRAM: estimatedVRAM, estimate: estimate,
estimatedTotal: estimatedTotal,
sem: semaphore.NewWeighted(int64(numParallel)), sem: semaphore.NewWeighted(int64(numParallel)),
totalLayers: ggml.KV().BlockCount() + 1, totalLayers: ggml.KV().BlockCount() + 1,
gpuCount: gpuCount, gpus: gpus,
done: make(chan error, 1), done: make(chan error, 1),
} }
...@@ -1004,11 +1009,20 @@ func (s *llmServer) Close() error { ...@@ -1004,11 +1009,20 @@ func (s *llmServer) Close() error {
} }
func (s *llmServer) EstimatedVRAM() uint64 { func (s *llmServer) EstimatedVRAM() uint64 {
return s.estimatedVRAM return s.estimate.VRAMSize
} }
func (s *llmServer) EstimatedTotal() uint64 { func (s *llmServer) EstimatedTotal() uint64 {
return s.estimatedTotal return s.estimate.TotalSize
}
func (s *llmServer) EstimatedVRAMByGPU(gpuID string) uint64 {
for i, gpu := range s.gpus {
if gpu.ID == gpuID {
return s.estimate.GPUSizes[i]
}
}
return 0
} }
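EstimatedVRAMByGPU lets the scheduler charge a runner's memory to each GPU individually instead of assuming a uniform spread (see the updateFreeSpace change further down). The helper below is an illustrative sketch of that accounting, not code from this change.

```go
// freeAfterLoads estimates how much VRAM each discovered GPU still has free once
// the per-GPU usage of every loaded runner has been subtracted.
func freeAfterLoads(gpus gpu.GpuInfoList, runners []LlamaServer) map[string]uint64 {
	free := make(map[string]uint64, len(gpus))
	for _, g := range gpus {
		var used uint64
		for _, r := range runners {
			// Per-GPU estimate recorded when the runner was started.
			used += r.EstimatedVRAMByGPU(g.ID)
		}
		if used > g.TotalMemory {
			used = g.TotalMemory
		}
		free[g.ID] = g.TotalMemory - used
	}
	return free
}
```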
func parseDurationMs(ms float64) time.Duration { func parseDurationMs(ms float64) time.Duration {
......
...@@ -7,7 +7,6 @@ import ( ...@@ -7,7 +7,6 @@ import (
"log/slog" "log/slog"
"reflect" "reflect"
"runtime" "runtime"
"slices"
"sort" "sort"
"strings" "strings"
"sync" "sync"
...@@ -27,6 +26,7 @@ type LlmRequest struct { ...@@ -27,6 +26,7 @@ type LlmRequest struct {
sessionDuration time.Duration sessionDuration time.Duration
successCh chan *runnerRef successCh chan *runnerRef
errCh chan error errCh chan error
schedAttempts uint
} }
type Scheduler struct { type Scheduler struct {
...@@ -41,6 +41,8 @@ type Scheduler struct { ...@@ -41,6 +41,8 @@ type Scheduler struct {
loadFn func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) loadFn func(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList)
newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) newServerFn func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error)
getGpuFn func() gpu.GpuInfoList getGpuFn func() gpu.GpuInfoList
getCpuFn func() gpu.GpuInfoList
reschedDelay time.Duration
} }
var ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded") var ErrMaxQueue = fmt.Errorf("server busy, please try again. maximum pending requests exceeded")
...@@ -54,6 +56,8 @@ func InitScheduler(ctx context.Context) *Scheduler { ...@@ -54,6 +56,8 @@ func InitScheduler(ctx context.Context) *Scheduler {
loaded: make(map[string]*runnerRef), loaded: make(map[string]*runnerRef),
newServerFn: llm.NewLlamaServer, newServerFn: llm.NewLlamaServer,
getGpuFn: gpu.GetGPUInfo, getGpuFn: gpu.GetGPUInfo,
getCpuFn: gpu.GetCPUInfo,
reschedDelay: 250 * time.Millisecond,
} }
sched.loadFn = sched.load sched.loadFn = sched.load
return sched return sched
...@@ -105,6 +109,7 @@ func (s *Scheduler) processPending(ctx context.Context) { ...@@ -105,6 +109,7 @@ func (s *Scheduler) processPending(ctx context.Context) {
return return
case pending := <-s.pendingReqCh: case pending := <-s.pendingReqCh:
// Block other requests until we get this pending request running // Block other requests until we get this pending request running
pending.schedAttempts++
if pending.ctx.Err() != nil { if pending.ctx.Err() != nil {
slog.Debug("pending request cancelled or timed out, skipping scheduling") slog.Debug("pending request cancelled or timed out, skipping scheduling")
...@@ -131,7 +136,12 @@ func (s *Scheduler) processPending(ctx context.Context) { ...@@ -131,7 +136,12 @@ func (s *Scheduler) processPending(ctx context.Context) {
} else { } else {
// Either no models are loaded or below envconfig.MaxRunners // Either no models are loaded or below envconfig.MaxRunners
// Get a refreshed GPU list // Get a refreshed GPU list
gpus := s.getGpuFn() var gpus gpu.GpuInfoList
if pending.opts.NumGPU == 0 {
gpus = s.getCpuFn()
} else {
gpus = s.getGpuFn()
}
// Load model for fitting // Load model for fitting
ggml, err := llm.LoadModel(pending.model.ModelPath) ggml, err := llm.LoadModel(pending.model.ModelPath)
...@@ -140,16 +150,22 @@ func (s *Scheduler) processPending(ctx context.Context) { ...@@ -140,16 +150,22 @@ func (s *Scheduler) processPending(ctx context.Context) {
break break
} }
// If we're CPU only mode, just limit by envconfig.MaxRunners above // Evaluate if the model will fit in the available system memory, or if we should unload a model first
// TODO handle system memory exhaustion if len(gpus) == 1 && gpus[0].Library == "cpu" {
if (len(gpus) == 1 && gpus[0].Library == "cpu") || pending.opts.NumGPU == 0 { if loadedCount == 0 {
slog.Debug("cpu mode with existing models, loading") slog.Debug("cpu mode with first model, loading")
s.loadFn(pending, ggml, gpus) s.loadFn(pending, ggml, gpus)
break break
} }
runnerToExpire = s.maybeFindCPURunnerToUnload(pending, ggml, gpus)
if runnerToExpire == nil {
slog.Debug("cpu mode with available system memory or first model, loading")
s.loadFn(pending, ggml, gpus)
break
}
// else we need to expire a runner
} else if loadedCount == 0 {
// No models loaded. Load the model but prefer the best fit. // No models loaded. Load the model but prefer the best fit.
if loadedCount == 0 {
slog.Debug("loading first model", "model", pending.model.ModelPath) slog.Debug("loading first model", "model", pending.model.ModelPath)
g := pickBestFitGPUs(pending, ggml, gpus) g := pickBestFitGPUs(pending, ggml, gpus)
if g != nil { if g != nil {
...@@ -159,17 +175,45 @@ func (s *Scheduler) processPending(ctx context.Context) { ...@@ -159,17 +175,45 @@ func (s *Scheduler) processPending(ctx context.Context) {
break break
} }
// More than one loaded model, so we have to see if the new one fits if runnerToExpire == nil {
// More than one loaded model, so we have to see if the
// new one fits
//
// We want to avoid loading on any GPUs that have other
// models still loading on them to avoid potential races
// with VRAM consumption ramping up during load
availGpus := s.filterGPUsWithoutLoadingModels(gpus)
// Update free memory from currently loaded models // Update free memory from currently loaded models
s.updateFreeSpace(gpus) s.updateFreeSpace(availGpus)
gpus = pickBestFitGPUs(pending, ggml, gpus) fitGpus := pickBestFitGPUs(pending, ggml, availGpus)
if gpus != nil { if fitGpus != nil {
slog.Debug("new model fits with existing models, loading") slog.Debug("new model fits with existing models, loading")
s.loadFn(pending, ggml, gpus) s.loadFn(pending, ggml, fitGpus)
break
}
// We couldn't find a set of GPUs to fully load the new
// model. If no other models are loading (both GPU lists
// are the same) then we need to unload another model to
// make room
if len(availGpus) < len(gpus) {
// There are other requests pending, and this one
// needs more time, so put it on the back of the
// queue so that we might satisfy other pending
// requests that aren't blocked
go func() {
// Process in a go routine to avoid deadlocking
// the scheduler if our queue is full
slog.Debug("delaying scheduling while other models finish loading", "attempts", pending.schedAttempts, "model", pending.model.ModelPath)
time.Sleep(s.reschedDelay)
s.pendingReqCh <- pending
}()
break break
} }
runnerToExpire = s.findRunnerToUnload() runnerToExpire = s.findRunnerToUnload()
} }
}
if runnerToExpire == nil { if runnerToExpire == nil {
// Shouldn't happen
...@@ -368,17 +412,9 @@ func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) { ...@@ -368,17 +412,9 @@ func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
s.loadedMu.Lock() s.loadedMu.Lock()
for _, r := range s.loaded { for _, r := range s.loaded {
r.refMu.Lock() r.refMu.Lock()
gpuIDs := make([]string, 0, len(r.gpus))
if r.llama != nil { if r.llama != nil {
// TODO this should be broken down by GPU instead of assuming uniform spread
estimatedVRAMPerGPU := r.llama.EstimatedVRAM() / uint64(len(r.gpus))
for _, gpu := range r.gpus {
gpuIDs = append(gpuIDs, gpu.ID)
}
for _, gpu := range allGpus { for _, gpu := range allGpus {
if slices.Contains(gpuIDs, gpu.ID) { predMap[predKey{gpu.Library, gpu.ID}] += r.llama.EstimatedVRAMByGPU(gpu.ID)
predMap[predKey{gpu.Library, gpu.ID}] += estimatedVRAMPerGPU
}
} }
} else { } else {
slog.Warn("unexpected nil runner reference, memory prediction may be incorrect") slog.Warn("unexpected nil runner reference, memory prediction may be incorrect")
...@@ -401,11 +437,36 @@ func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) { ...@@ -401,11 +437,36 @@ func (s *Scheduler) updateFreeSpace(allGpus gpu.GpuInfoList) {
// after we start our first runner, then we'll never account for that, so picking the smallest free value seems prudent.
allGpus[i].FreeMemory = allGpus[i].TotalMemory - p allGpus[i].FreeMemory = allGpus[i].TotalMemory - p
} }
slog.Info("updated VRAM", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory)) slog.Info("updated VRAM based on existing loaded models", "gpu", allGpus[i].ID, "library", allGpus[i].Library, "total", format.HumanBytes2(allGpus[i].TotalMemory), "available", format.HumanBytes2(allGpus[i].FreeMemory))
} }
} }
} }
// While models are loading the VRAM consumption numbers will be indeterminate, so we have
// to avoid scheduling another model on the same GPU(s) that haven't stabilized.
// This routine returns the set of GPUs that do not have an active loading model.
// If all GPUs have loading models, an empty list will be returned (not a single CPU entry)
func (s *Scheduler) filterGPUsWithoutLoadingModels(allGpus gpu.GpuInfoList) gpu.GpuInfoList {
ret := append(gpu.GpuInfoList{}, allGpus...)
s.loadedMu.Lock()
defer s.loadedMu.Unlock()
for _, runner := range s.loaded {
if runner.loading {
slog.Debug("overlapping loads detected", "gpus", runner.gpus, "model", runner.modelPath)
for _, busyGPU := range runner.gpus {
for i := range ret {
if ret[i].ID == busyGPU.ID {
ret = append(ret[:i], ret[i+1:]...)
break
}
}
}
}
}
return ret
}
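Combined with updateFreeSpace and pickBestFitGPUs, this filter gives the scheduler a conservative view of VRAM while other models are still loading. The function below is an illustrative condensation of the processPending flow above, not a function added by this change.

```go
// tryPlaceModel sketches the new placement logic: skip GPUs with in-flight loads,
// subtract what loaded runners are already using, then look for a fit.
func (s *Scheduler) tryPlaceModel(pending *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) bool {
	// Ignore GPUs that still have a model loading on them; their free VRAM is unstable.
	availGpus := s.filterGPUsWithoutLoadingModels(gpus)

	// Subtract the per-GPU estimates of every loaded runner from the reported free memory.
	s.updateFreeSpace(availGpus)

	// Try to fit the new model on what remains (single GPU first, then spread across GPUs).
	if fitGpus := pickBestFitGPUs(pending, ggml, availGpus); fitGpus != nil {
		s.loadFn(pending, ggml, fitGpus)
		return true
	}
	return false
}
```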
// TODO consolidate sched_types.go
type runnerRef struct { type runnerRef struct {
refMu sync.Mutex refMu sync.Mutex
// refCond sync.Cond // Signaled on transition from 1 -> 0 refCount // refCond sync.Cond // Signaled on transition from 1 -> 0 refCount
...@@ -487,8 +548,11 @@ func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool ...@@ -487,8 +548,11 @@ func (runner *runnerRef) needsReload(ctx context.Context, req *LlmRequest) bool
func (runner *runnerRef) waitForVRAMRecovery() chan interface{} { func (runner *runnerRef) waitForVRAMRecovery() chan interface{} {
finished := make(chan interface{}, 1) finished := make(chan interface{}, 1)
// CPU or Metal don't need checking, so no waiting required, windows can page VRAM, and the APIs we query tend to be optimistic on free space // CPU or Metal don't need checking, so no waiting required
if (len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal")) || runtime.GOOS == "windows" { // windows can page VRAM, only cuda currently can report accurate used vram usage
if len(runner.gpus) == 0 ||
(len(runner.gpus) == 1 && (runner.gpus[0].Library == "cpu" || runner.gpus[0].Library == "metal")) ||
(runtime.GOOS == "windows" && runner.gpus[0].Library != "cuda") {
finished <- struct{}{} finished <- struct{}{}
return finished return finished
} }
...@@ -508,7 +572,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan interface{} { ...@@ -508,7 +572,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan interface{} {
for { for {
<-ticker.C <-ticker.C
if time.Now().After(expiresAt) { if time.Now().After(expiresAt) {
slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds()) slog.Warn("gpu VRAM usage didn't recover within timeout", "seconds", time.Since(start).Seconds(), "model", runner.modelPath)
finished <- struct{}{} finished <- struct{}{}
} }
...@@ -521,7 +585,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan interface{} { ...@@ -521,7 +585,7 @@ func (runner *runnerRef) waitForVRAMRecovery() chan interface{} {
} }
// If we're within ~80% of the estimated memory usage recovered, bail out // If we're within ~80% of the estimated memory usage recovered, bail out
if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.estimatedVRAM)*0.8 { if float32(freeMemoryNow-freeMemoryBefore) > float32(runner.estimatedVRAM)*0.8 {
slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds())) slog.Debug(fmt.Sprintf("gpu VRAM free memory converged after %0.2f seconds", time.Since(start).Seconds()), "model", runner.modelPath)
finished <- struct{}{} finished <- struct{}{}
return return
} }
...@@ -558,12 +622,14 @@ func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu. ...@@ -558,12 +622,14 @@ func pickBestFitGPUs(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) gpu.
sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl))) sort.Sort(sort.Reverse(gpu.ByFreeMemory(sgl)))
// First attempt to fit the model into a single GPU // First attempt to fit the model into a single GPU
if !envconfig.SchedSpread {
for _, g := range sgl { for _, g := range sgl {
if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok { if ok, estimatedVRAM = llm.PredictServerFit([]gpu.GpuInfo{g}, ggml, req.model.AdapterPaths, req.model.ProjectorPaths, req.opts); ok {
slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM)) slog.Debug("new model will fit in available VRAM in single GPU, loading", "model", req.model.ModelPath, "gpu", g.ID, "available", g.FreeMemory, "required", format.HumanBytes2(estimatedVRAM))
return []gpu.GpuInfo{g} return []gpu.GpuInfo{g}
} }
} }
}
// TODO future refinements // TODO future refinements
// - if multiple Libraries, see if any single GPU in any Library will fit // - if multiple Libraries, see if any single GPU in any Library will fit
...@@ -586,6 +652,10 @@ func (s *Scheduler) findRunnerToUnload() *runnerRef { ...@@ -586,6 +652,10 @@ func (s *Scheduler) findRunnerToUnload() *runnerRef {
runnerList = append(runnerList, r) runnerList = append(runnerList, r)
} }
s.loadedMu.Unlock() s.loadedMu.Unlock()
if len(runnerList) == 0 {
slog.Debug("no loaded runner to unload")
return nil
}
// In the future we can enhance the algorithm to be smarter about picking the optimal runner to unload // In the future we can enhance the algorithm to be smarter about picking the optimal runner to unload
// e.g., if we have multiple options, will one make room for the request? // e.g., if we have multiple options, will one make room for the request?
...@@ -616,3 +686,18 @@ func (s *Scheduler) unloadAllRunners() { ...@@ -616,3 +686,18 @@ func (s *Scheduler) unloadAllRunners() {
} }
} }
} }
// If other runners are loaded, make sure the pending request will fit in system memory
// If not, pick a runner to unload, else return nil and the request can be loaded
func (s *Scheduler) maybeFindCPURunnerToUnload(req *LlmRequest, ggml *llm.GGML, gpus gpu.GpuInfoList) *runnerRef {
slog.Debug("evaluating if CPU model load will fit in available system memory")
estimate := llm.EstimateGPULayers(gpus, ggml, req.model.ProjectorPaths, req.opts)
if estimate.TotalSize <= gpus[0].FreeMemory {
slog.Debug("cpu inference mode, model fits in available system memory", "model", format.HumanBytes2(estimate.TotalSize), "available", format.HumanBytes2(gpus[0].FreeMemory))
return nil
}
// TODO - optimization: try to find CPU only runners first, or partial offloads with enough in system memory to make room
return s.findRunnerToUnload()
}
...@@ -60,7 +60,7 @@ func TestLoad(t *testing.T) { ...@@ -60,7 +60,7 @@ func TestLoad(t *testing.T) {
err := <-req.errCh err := <-req.errCh
require.Contains(t, err.Error(), "this model may be incompatible") require.Contains(t, err.Error(), "this model may be incompatible")
server := &mockLlm{estimatedVRAM: 10} server := &mockLlm{estimatedVRAM: 10, estimatedVRAMByGPU: map[string]uint64{}}
s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) { s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options) (llm.LlamaServer, error) {
return server, nil return server, nil
} }
...@@ -129,6 +129,7 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV ...@@ -129,6 +129,7 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV
"tokenizer.ggml.token_type": []int32{0}, "tokenizer.ggml.token_type": []int32{0},
}, []llm.Tensor{ }, []llm.Tensor{
{Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}}, {Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
{Name: "output.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: &bytes.Reader{}},
}) })
require.NoError(t, err) require.NoError(t, err)
...@@ -145,7 +146,7 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV ...@@ -145,7 +146,7 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV
successCh: make(chan *runnerRef, 1), successCh: make(chan *runnerRef, 1),
errCh: make(chan error, 1), errCh: make(chan error, 1),
} }
scenario.srv = &mockLlm{estimatedVRAM: estimatedVRAM} scenario.srv = &mockLlm{estimatedVRAM: estimatedVRAM, estimatedVRAMByGPU: map[string]uint64{"": estimatedVRAM}}
return scenario return scenario
} }
...@@ -155,7 +156,7 @@ func TestRequests(t *testing.T) { ...@@ -155,7 +156,7 @@ func TestRequests(t *testing.T) {
// Same model, same request // Same model, same request
scenario1a := newScenario(t, ctx, "ollama-model-1", 10) scenario1a := newScenario(t, ctx, "ollama-model-1", 10)
scenario1a.req.sessionDuration = 0 scenario1a.req.sessionDuration = 5 * time.Millisecond
scenario1b := newScenario(t, ctx, "ollama-model-1", 11) scenario1b := newScenario(t, ctx, "ollama-model-1", 11)
scenario1b.req.model = scenario1a.req.model scenario1b.req.model = scenario1a.req.model
scenario1b.ggml = scenario1a.ggml scenario1b.ggml = scenario1a.ggml
...@@ -166,6 +167,7 @@ func TestRequests(t *testing.T) { ...@@ -166,6 +167,7 @@ func TestRequests(t *testing.T) {
tmpModel := *scenario1a.req.model tmpModel := *scenario1a.req.model
scenario2a.req.model = &tmpModel scenario2a.req.model = &tmpModel
scenario2a.ggml = scenario1a.ggml scenario2a.ggml = scenario1a.ggml
scenario2a.req.sessionDuration = 5 * time.Millisecond
// Multiple loaded models // Multiple loaded models
scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte) scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte)
...@@ -181,6 +183,12 @@ func TestRequests(t *testing.T) { ...@@ -181,6 +183,12 @@ func TestRequests(t *testing.T) {
g.FreeMemory = 12 * format.GigaByte g.FreeMemory = 12 * format.GigaByte
return []gpu.GpuInfo{g} return []gpu.GpuInfo{g}
} }
s.getCpuFn = func() gpu.GpuInfoList {
g := gpu.GpuInfo{Library: "cpu"}
g.TotalMemory = 32 * format.GigaByte
g.FreeMemory = 26 * format.GigaByte
return []gpu.GpuInfo{g}
}
s.newServerFn = scenario1a.newServer s.newServerFn = scenario1a.newServer
slog.Info("scenario1a") slog.Info("scenario1a")
s.pendingReqCh <- scenario1a.req s.pendingReqCh <- scenario1a.req
...@@ -309,7 +317,6 @@ func TestGetRunner(t *testing.T) { ...@@ -309,7 +317,6 @@ func TestGetRunner(t *testing.T) {
ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond) ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer done() defer done()
// Same model, same request
scenario1a := newScenario(t, ctx, "ollama-model-1a", 10) scenario1a := newScenario(t, ctx, "ollama-model-1a", 10)
scenario1a.req.sessionDuration = 0 scenario1a.req.sessionDuration = 0
scenario1b := newScenario(t, ctx, "ollama-model-1b", 10) scenario1b := newScenario(t, ctx, "ollama-model-1b", 10)
...@@ -419,7 +426,7 @@ func TestUseLoadedRunner(t *testing.T) { ...@@ -419,7 +426,7 @@ func TestUseLoadedRunner(t *testing.T) {
sessionDuration: 2, sessionDuration: 2,
} }
finished := make(chan *LlmRequest) finished := make(chan *LlmRequest)
llm1 := &mockLlm{} llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
r1 := &runnerRef{llama: llm1, sessionDuration: 1} r1 := &runnerRef{llama: llm1, sessionDuration: 1}
req.useLoadedRunner(r1, finished) req.useLoadedRunner(r1, finished)
require.Equal(t, uint(1), r1.refCount) require.Equal(t, uint(1), r1.refCount)
...@@ -452,8 +459,8 @@ func TestUpdateFreeSpace(t *testing.T) { ...@@ -452,8 +459,8 @@ func TestUpdateFreeSpace(t *testing.T) {
gpus[0].FreeMemory = 900 gpus[0].FreeMemory = 900
gpus[1].TotalMemory = 2000 gpus[1].TotalMemory = 2000
gpus[1].FreeMemory = 1900 gpus[1].FreeMemory = 1900
llm1 := &mockLlm{estimatedVRAM: 100} llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 50, "2": 50}}
llm2 := &mockLlm{estimatedVRAM: 200} llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 125, "2": 75}}
r1 := &runnerRef{llama: llm1, gpus: gpus} r1 := &runnerRef{llama: llm1, gpus: gpus}
r2 := &runnerRef{llama: llm2, gpus: gpus} r2 := &runnerRef{llama: llm2, gpus: gpus}
...@@ -464,8 +471,42 @@ func TestUpdateFreeSpace(t *testing.T) { ...@@ -464,8 +471,42 @@ func TestUpdateFreeSpace(t *testing.T) {
s.loadedMu.Unlock() s.loadedMu.Unlock()
s.updateFreeSpace(gpus) s.updateFreeSpace(gpus)
require.Equal(t, uint64(850), gpus[0].FreeMemory) require.Equal(t, uint64(1000-50-125), gpus[0].FreeMemory)
require.Equal(t, uint64(1850), gpus[1].FreeMemory) require.Equal(t, uint64(2000-50-75), gpus[1].FreeMemory)
}
func TestFilterGPUsWithoutLoadingModels(t *testing.T) {
ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer done()
gpus := gpu.GpuInfoList{
{
Library: "cuda",
ID: "0",
},
{
Library: "cuda",
ID: "1",
},
}
r1 := &runnerRef{gpus: gpu.GpuInfoList{gpus[0]}, loading: true}
s := InitScheduler(ctx)
s.loadedMu.Lock()
s.loaded["a"] = r1
s.loadedMu.Unlock()
tmp := s.filterGPUsWithoutLoadingModels(gpus)
require.Len(t, tmp, 1)
require.Equal(t, "1", tmp[0].ID)
r1.gpus = gpu.GpuInfoList{gpus[1]}
tmp = s.filterGPUsWithoutLoadingModels(gpus)
require.Len(t, tmp, 1)
require.Equal(t, "0", tmp[0].ID)
r1.gpus = gpu.GpuInfoList{}
tmp = s.filterGPUsWithoutLoadingModels(gpus)
require.Len(t, tmp, 2)
} }
func TestFindRunnerToUnload(t *testing.T) { func TestFindRunnerToUnload(t *testing.T) {
...@@ -492,7 +533,7 @@ func TestNeedsReload(t *testing.T) { ...@@ -492,7 +533,7 @@ func TestNeedsReload(t *testing.T) {
ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond) ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer done() defer done()
llm := &mockLlm{} llm := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
do := api.DefaultOptions() do := api.DefaultOptions()
runner := &runnerRef{ runner := &runnerRef{
model: &Model{AdapterPaths: []string{"adapter1"}, ProjectorPaths: []string{"projector1"}}, model: &Model{AdapterPaths: []string{"adapter1"}, ProjectorPaths: []string{"projector1"}},
...@@ -535,8 +576,8 @@ func TestUnloadAllRunners(t *testing.T) { ...@@ -535,8 +576,8 @@ func TestUnloadAllRunners(t *testing.T) {
ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond) ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer done() defer done()
llm1 := &mockLlm{} llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
llm2 := &mockLlm{} llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
s := InitScheduler(ctx) s := InitScheduler(ctx)
s.unloadAllRunners() s.unloadAllRunners()
...@@ -554,7 +595,7 @@ func TestUnloadAllRunners(t *testing.T) { ...@@ -554,7 +595,7 @@ func TestUnloadAllRunners(t *testing.T) {
} }
func TestUnload(t *testing.T) { func TestUnload(t *testing.T) {
llm1 := &mockLlm{} llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
r1 := &runnerRef{llama: llm1} r1 := &runnerRef{llama: llm1}
r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}} r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}}
r1.unload() r1.unload()
...@@ -577,6 +618,7 @@ type mockLlm struct { ...@@ -577,6 +618,7 @@ type mockLlm struct {
closeCalled bool closeCalled bool
estimatedVRAM uint64 estimatedVRAM uint64
estimatedTotal uint64 estimatedTotal uint64
estimatedVRAMByGPU map[string]uint64
} }
func (s *mockLlm) Ping(ctx context.Context) error { return s.pingResp } func (s *mockLlm) Ping(ctx context.Context) error { return s.pingResp }
...@@ -599,3 +641,4 @@ func (s *mockLlm) Close() error { ...@@ -599,3 +641,4 @@ func (s *mockLlm) Close() error {
} }
func (s *mockLlm) EstimatedVRAM() uint64 { return s.estimatedVRAM } func (s *mockLlm) EstimatedVRAM() uint64 { return s.estimatedVRAM }
func (s *mockLlm) EstimatedTotal() uint64 { return s.estimatedTotal } func (s *mockLlm) EstimatedTotal() uint64 { return s.estimatedTotal }
func (s *mockLlm) EstimatedVRAMByGPU(gpuid string) uint64 { return s.estimatedVRAMByGPU[gpuid] }