Commit d6e3b645 authored by Daniel Hiltgen

Fix concurrency for CPU mode

Prior refactoring passes accidentally removed the logic to bypass VRAM
checks for CPU loads.  This adds that back, along with test coverage.

This also puts the unit tests' loaded map access behind the mutex; the
unguarded access was likely the cause of various flakes in the tests.
parent 114c932a
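The restored check (first hunk below) treats a load as CPU-only when the sole detected device reports the "cpu" library, or when the request pins NumGPU to 0, and in that case skips the VRAM fit logic entirely. A minimal, self-contained Go sketch of that predicate follows; GpuInfo here is a simplified stand-in for ollama's gpu.GpuInfo, not the real type:

package main

import "fmt"

// GpuInfo is a simplified stand-in for ollama's gpu.GpuInfo; only the
// Library field matters for this check.
type GpuInfo struct {
	Library string
}

// isCPULoad mirrors the condition restored in processPending: a lone
// "cpu" entry means no usable GPU was detected, and NumGPU == 0 means
// the request explicitly asked to run on the CPU. Either way the VRAM
// fit checks can be skipped.
func isCPULoad(gpus []GpuInfo, numGPU int) bool {
	return (len(gpus) == 1 && gpus[0].Library == "cpu") || numGPU == 0
}

func main() {
	fmt.Println(isCPULoad([]GpuInfo{{Library: "cpu"}}, -1))  // true: CPU-only host
	fmt.Println(isCPULoad([]GpuInfo{{Library: "cuda"}}, 0))  // true: NumGPU pinned to 0
	fmt.Println(isCPULoad([]GpuInfo{{Library: "cuda"}}, -1)) // false: normal GPU load
}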
@@ -149,6 +149,14 @@ func (s *Scheduler) processPending(ctx context.Context) {
 				break
 			}
+			// If we're CPU only mode, just limit by loadedMax above
+			// TODO handle system memory exhaustion
+			if (len(gpus) == 1 && gpus[0].Library == "cpu") || pending.opts.NumGPU == 0 {
+				slog.Debug("cpu mode with existing models, loading")
+				s.loadFn(pending, ggml, gpus)
+				break
+			}
 			// No models loaded. Load the model but prefer the best fit.
 			if loadedCount == 0 {
 				slog.Debug("loading first model", "model", pending.model.ModelPath)
...
@@ -28,19 +28,33 @@ func TestInitScheduler(t *testing.T) {
 	ctx, done := context.WithCancel(context.Background())
 	defer done()
 	initialMax := loadedMax
+	initialParallel := numParallel
 	s := InitScheduler(ctx)
 	require.Equal(t, initialMax, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()

 	os.Setenv("OLLAMA_MAX_LOADED_MODELS", "blue")
 	s = InitScheduler(ctx)
 	require.Equal(t, initialMax, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()

 	os.Setenv("OLLAMA_MAX_LOADED_MODELS", "0")
 	s = InitScheduler(ctx)
 	require.Equal(t, 0, loadedMax)
+	s.loadedMu.Lock()
 	require.NotNil(t, s.loaded)
+	s.loadedMu.Unlock()
+
+	os.Setenv("OLLAMA_NUM_PARALLEL", "blue")
+	_ = InitScheduler(ctx)
+	require.Equal(t, initialParallel, numParallel)
+	os.Setenv("OLLAMA_NUM_PARALLEL", "10")
+	_ = InitScheduler(ctx)
+	require.Equal(t, 10, numParallel)
 }

 func TestLoad(t *testing.T) {
@@ -51,6 +65,7 @@ func TestLoad(t *testing.T) {
 	req := &LlmRequest{
 		ctx:             ctx,
 		model:           &Model{ModelPath: "foo"},
+		opts:            api.DefaultOptions(),
 		successCh:       make(chan *runnerRef, 1),
 		errCh:           make(chan error, 1),
 		sessionDuration: 2,
@@ -63,7 +78,9 @@ func TestLoad(t *testing.T) {
 	s.load(req, ggml, gpus)
 	require.Len(t, req.successCh, 0)
 	require.Len(t, req.errCh, 1)
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 0)
+	s.loadedMu.Unlock()
 	err := <-req.errCh
 	require.Contains(t, err.Error(), "this model may be incompatible")
@@ -78,7 +95,9 @@ func TestLoad(t *testing.T) {
 	case resp := <-req.successCh:
 		require.Equal(t, uint64(10), resp.estimatedVRAM)
 		require.Equal(t, uint(1), resp.refCount)
+		s.loadedMu.Lock()
 		require.Len(t, s.loaded, 1)
+		s.loadedMu.Unlock()
 	}
 	req.model.ModelPath = "dummy_model_path"
@@ -90,7 +109,9 @@ func TestLoad(t *testing.T) {
 	case resp := <-req.successCh:
 		t.Errorf("unexpected success %v", resp)
 	}
+	s.loadedMu.Lock()
 	runner := s.loaded["dummy_model_path"]
+	s.loadedMu.Unlock()
 	require.NotNil(t, runner)
 	require.Equal(t, uint(0), runner.refCount)
 	time.Sleep(1 * time.Millisecond)
@@ -143,6 +164,7 @@ func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedV
 	scenario.req = &LlmRequest{
 		ctx:             scenario.ctx,
 		model:           model,
+		opts:            api.DefaultOptions(),
 		sessionDuration: 5 * time.Millisecond,
 		successCh:       make(chan *runnerRef, 1),
 		errCh:           make(chan error, 1),
@@ -171,7 +193,9 @@ func TestRequests(t *testing.T) {
 	// Multiple loaded models
 	scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte)
 	scenario3b := newScenario(t, ctx, "ollama-model-3b", 24*format.GigaByte)
-	scenario3c := newScenario(t, ctx, "ollama-model-3c", 30) // Needs prior unloaded
+	scenario3c := newScenario(t, ctx, "ollama-model-4a", 30)
+	scenario3c.req.opts.NumGPU = 0 // CPU load, will be allowed
+	scenario3d := newScenario(t, ctx, "ollama-model-3c", 30) // Needs prior unloaded

 	s := InitScheduler(ctx)
 	s.getGpuFn = func() gpu.GpuInfoList {
@@ -240,7 +264,9 @@ func TestRequests(t *testing.T) {
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
+	s.loadedMu.Unlock()

 	loadedMax = 0
 	s.newServerFn = scenario3b.newServer
@@ -254,28 +280,52 @@ func TestRequests(t *testing.T) {
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()

-	// Try to load a model that wont fit
+	// This is a CPU load with NumGPU = 0 so it should load
 	s.newServerFn = scenario3c.newServer
 	slog.Info("scenario3c")
-	require.Len(t, s.loaded, 2)
+	s.pendingReqCh <- scenario3c.req
+	select {
+	case resp := <-scenario3c.req.successCh:
+		require.Equal(t, resp.llama, scenario3c.srv)
+		require.Len(t, s.pendingReqCh, 0)
+		require.Len(t, scenario3c.req.errCh, 0)
+	case <-ctx.Done():
+		t.Errorf("timeout")
+	}
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 3)
+	s.loadedMu.Unlock()
+
+	// Try to load a model that wont fit
+	s.newServerFn = scenario3d.newServer
+	slog.Info("scenario3d")
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 3)
+	s.loadedMu.Unlock()
 	scenario3a.ctxDone() // Won't help since this one isn't big enough to make room
 	time.Sleep(2 * time.Millisecond)
-	s.pendingReqCh <- scenario3c.req
+	s.pendingReqCh <- scenario3d.req
 	// finish prior request, so new model can load
 	time.Sleep(6 * time.Millisecond)
-	require.Len(t, s.loaded, 1)
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()
 	scenario3b.ctxDone()
 	select {
-	case resp := <-scenario3c.req.successCh:
-		require.Equal(t, resp.llama, scenario3c.srv)
+	case resp := <-scenario3d.req.successCh:
+		require.Equal(t, resp.llama, scenario3d.srv)
 		require.Len(t, s.pendingReqCh, 0)
-		require.Len(t, scenario3c.req.errCh, 0)
+		require.Len(t, scenario3d.req.errCh, 0)
 	case <-ctx.Done():
 		t.Errorf("timeout")
 	}
-	require.Len(t, s.loaded, 1)
+	s.loadedMu.Lock()
+	require.Len(t, s.loaded, 2)
+	s.loadedMu.Unlock()
 }

 func TestGetRunner(t *testing.T) {
@@ -318,7 +368,9 @@ func TestGetRunner(t *testing.T) {
 		t.Errorf("timeout")
 	}
 	scenario1a.ctxDone()
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 1)
+	s.loadedMu.Unlock()

 	scenario1c.req.model.ModelPath = "bad path"
 	slog.Info("scenario1c")
@@ -328,7 +380,9 @@ func TestGetRunner(t *testing.T) {
 	require.Len(t, errCh1c, 0)
 	time.Sleep(5 * time.Millisecond)
+	s.loadedMu.Lock()
 	require.Len(t, s.loaded, 0)
+	s.loadedMu.Unlock()
 	require.Len(t, errCh1c, 1)
 	err = <-errCh1c
 	require.Contains(t, err.Error(), "bad path")
@@ -358,7 +412,9 @@ func TestPrematureExpired(t *testing.T) {
 		require.Equal(t, resp.llama, scenario1a.srv)
 		require.Len(t, s.pendingReqCh, 0)
 		require.Len(t, errCh1a, 0)
+		s.loadedMu.Lock()
 		require.Len(t, s.loaded, 1)
+		s.loadedMu.Unlock()
 		slog.Info("sending premature expired event now")
 		s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe
 	case <-ctx.Done():
@@ -383,6 +439,7 @@ func TestUseLoadedRunner(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 5*time.Millisecond)
 	req := &LlmRequest{
 		ctx:             ctx,
+		opts:            api.DefaultOptions(),
 		successCh:       make(chan *runnerRef, 1),
 		sessionDuration: 2,
 	}
@@ -426,8 +483,10 @@ func TestUpdateFreeSpace(t *testing.T) {
 	r2 := &runnerRef{llama: llm2, gpus: gpus}

 	s := InitScheduler(ctx)
+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()
 	s.updateFreeSpace(gpus)
 	require.Equal(t, uint64(850), gpus[0].FreeMemory)
@@ -437,13 +496,18 @@
 func TestFindRunnerToUnload(t *testing.T) {
 	ctx, done := context.WithTimeout(context.Background(), 5*time.Millisecond)
 	defer done()
-	req := &LlmRequest{ctx: ctx}
+	req := &LlmRequest{
+		ctx:  ctx,
+		opts: api.DefaultOptions(),
+	}
 	r1 := &runnerRef{refCount: 1, sessionDuration: 1}
 	r2 := &runnerRef{sessionDuration: 2}

 	s := InitScheduler(ctx)
+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()
 	resp := s.findRunnerToUnload(req)
 	require.Equal(t, r2, resp)
@@ -458,10 +522,11 @@ func TestNeedsReload(t *testing.T) {
 	defer done()

 	llm := &mockLlm{}
+	do := api.DefaultOptions()
 	runner := &runnerRef{
 		adapters:   []string{"adapter1"},
 		projectors: []string{"projector1"},
-		Options:    &api.Options{},
+		Options:    &do,
 		llama:      llm,
 	}
 	req := &LlmRequest{
@@ -469,7 +534,7 @@ func TestNeedsReload(t *testing.T) {
 			AdapterPaths:   []string{"adapter2"},
 			ProjectorPaths: []string{"projector2"},
 		},
-		opts: api.Options{},
+		opts: api.DefaultOptions(),
 	}
 	resp := runner.needsReload(ctx, req)
 	require.True(t, resp)
@@ -508,8 +573,10 @@ func TestUnloadAllRunners(t *testing.T) {
 	r1 := &runnerRef{llama: llm1}
 	r2 := &runnerRef{llama: llm2}
+	s.loadedMu.Lock()
 	s.loaded["a"] = r1
 	s.loaded["b"] = r2
+	s.loadedMu.Unlock()
 	s.unloadAllRunners()

 	require.True(t, llm1.closeCalled)
...
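The flake fix follows one pattern throughout the test hunks above: every goroutine that touches s.loaded, whether the scheduler loop or the test body, must hold s.loadedMu. A minimal, self-contained illustration of that pattern; the sched type and its fields here are simplified stand-ins, not the actual scheduler:

package main

import (
	"fmt"
	"sync"
)

// sched is an illustrative stand-in for the scheduler: a mutex
// guarding a map of loaded runners (here just a map to int).
type sched struct {
	loadedMu sync.Mutex
	loaded   map[string]int
}

func main() {
	s := &sched{loaded: map[string]int{}}
	var wg sync.WaitGroup
	wg.Add(1)
	// Writer, standing in for the scheduler goroutine that inserts runners.
	go func() {
		defer wg.Done()
		s.loadedMu.Lock()
		s.loaded["model-a"] = 1
		s.loadedMu.Unlock()
	}()
	// Reader, standing in for a test assertion like require.Len(t, s.loaded, 1).
	// Without the lock this is a data race: `go test -race` flags it, and the
	// observed length flips between runs, which is exactly a flaky test.
	s.loadedMu.Lock()
	n := len(s.loaded)
	s.loadedMu.Unlock()
	wg.Wait()
	fmt.Println("observed", n, "loaded models") // 0 or 1, but race-free either way
}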