package server

import (
	"bytes"
	"context"
	"encoding/binary"
	"fmt"
	"log/slog"
	"os"
	"testing"
	"time"

	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/app/lifecycle"
	"github.com/ollama/ollama/envconfig"
	"github.com/ollama/ollama/format"
	"github.com/ollama/ollama/gpu"
	"github.com/ollama/ollama/llm"
	"github.com/stretchr/testify/require"
)

// init sets OLLAMA_DEBUG=1 and then initializes logging, presumably so that
// every test in this package logs at debug level.
func init() {
	// Setenv only fails for a malformed key or value; with these constant
	// arguments a failure means the test environment itself is broken.
	if err := os.Setenv("OLLAMA_DEBUG", "1"); err != nil {
		panic(err)
	}
	lifecycle.InitLogging()
}

func TestInitScheduler(t *testing.T) {
	ctx, done := context.WithCancel(context.Background())
	defer done()
	s := InitScheduler(ctx)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
31
	s.loadedMu.Lock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
32
	require.NotNil(t, s.loaded)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
33
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
34
35
36
}

func TestLoad(t *testing.T) {
37
	ctx, done := context.WithTimeout(context.Background(), 20*time.Millisecond)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
38
39
	defer done()
	s := InitScheduler(ctx)
40
	var ggml *llm.GGML // value not used in tests
Daniel Hiltgen's avatar
Daniel Hiltgen committed
41
42
43
	req := &LlmRequest{
		ctx:             ctx,
		model:           &Model{ModelPath: "foo"},
Daniel Hiltgen's avatar
Daniel Hiltgen committed
44
		opts:            api.DefaultOptions(),
Daniel Hiltgen's avatar
Daniel Hiltgen committed
45
46
		successCh:       make(chan *runnerRef, 1),
		errCh:           make(chan error, 1),
47
		sessionDuration: &api.Duration{Duration: 2 * time.Second},
Daniel Hiltgen's avatar
Daniel Hiltgen committed
48
49
	}
	// Fail to load model first
Daniel Hiltgen's avatar
Daniel Hiltgen committed
50
	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
Daniel Hiltgen's avatar
Daniel Hiltgen committed
51
52
53
		return nil, fmt.Errorf("something failed to load model blah")
	}
	gpus := gpu.GpuInfoList{}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
54
	s.load(req, ggml, gpus, 0)
Michael Yang's avatar
lint  
Michael Yang committed
55
	require.Empty(t, req.successCh)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
56
	require.Len(t, req.errCh, 1)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
57
	s.loadedMu.Lock()
Michael Yang's avatar
lint  
Michael Yang committed
58
	require.Empty(t, s.loaded)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
59
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
60
61
62
	err := <-req.errCh
	require.Contains(t, err.Error(), "this model may be incompatible")

63
	server := &mockLlm{estimatedVRAM: 10, estimatedVRAMByGPU: map[string]uint64{}}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
64
	s.newServerFn = func(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
Daniel Hiltgen's avatar
Daniel Hiltgen committed
65
66
		return server, nil
	}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
67
	s.load(req, ggml, gpus, 0)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
68
69
70
71
72
73
	select {
	case err := <-req.errCh:
		require.NoError(t, err)
	case resp := <-req.successCh:
		require.Equal(t, uint64(10), resp.estimatedVRAM)
		require.Equal(t, uint(1), resp.refCount)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
74
		s.loadedMu.Lock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
75
		require.Len(t, s.loaded, 1)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
76
		s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
77
78
79
80
	}

	req.model.ModelPath = "dummy_model_path"
	server.waitResp = fmt.Errorf("wait failure")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
81
	s.load(req, ggml, gpus, 0)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
82
83
84
85
	select {
	case err := <-req.errCh:
		require.Contains(t, err.Error(), "wait failure")
	case resp := <-req.successCh:
Daniel Hiltgen's avatar
Daniel Hiltgen committed
86
		t.Fatalf("unexpected success %v", resp)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
87
	}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
88
	s.loadedMu.Lock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
89
	runner := s.loaded["dummy_model_path"]
Daniel Hiltgen's avatar
Daniel Hiltgen committed
90
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
91
92
	require.NotNil(t, runner)
	require.Equal(t, uint(0), runner.refCount)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
93
	time.Sleep(1 * time.Millisecond)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
94
95
96
97
98
99
100
101
	require.Len(t, s.expiredCh, 1)
}

type bundle struct {
	ctx     context.Context //nolint:containedctx
	ctxDone func()
	srv     *mockLlm
	req     *LlmRequest
102
	ggml    *llm.GGML
Daniel Hiltgen's avatar
Daniel Hiltgen committed
103
104
}

Daniel Hiltgen's avatar
Daniel Hiltgen committed
105
func (scenario *bundle) newServer(gpus gpu.GpuInfoList, model string, ggml *llm.GGML, adapters []string, projectors []string, opts api.Options, numParallel int) (llm.LlamaServer, error) {
Daniel Hiltgen's avatar
Daniel Hiltgen committed
106
107
108
109
110
111
112
113
114
	return scenario.srv, nil
}

func newScenario(t *testing.T, ctx context.Context, modelName string, estimatedVRAM uint64) *bundle {
	scenario := &bundle{}
	scenario.ctx, scenario.ctxDone = context.WithCancel(ctx)
	t.Helper()

	f, err := os.CreateTemp(t.TempDir(), modelName)
Michael Yang's avatar
lint  
Michael Yang committed
115
	require.NoError(t, err)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
	defer f.Close()

	gguf := llm.NewGGUFV3(binary.LittleEndian)
	err = gguf.Encode(f, llm.KV{
		"general.architecture":          "llama",
		"general.name":                  "name",
		"llama.context_length":          uint32(32),
		"llama.embedding_length":        uint32(4096),
		"llama.block_count":             uint32(1),
		"llama.attention.head_count":    uint32(32),
		"llama.attention.head_count_kv": uint32(32),
		"tokenizer.ggml.tokens":         []string{" "},
		"tokenizer.ggml.scores":         []float32{0},
		"tokenizer.ggml.token_type":     []int32{0},
	}, []llm.Tensor{
131
132
		{Name: "blk.0.attn.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))},
		{Name: "output.weight", Kind: uint32(0), Offset: uint64(0), Shape: []uint64{1, 1, 1, 1}, WriterTo: bytes.NewReader(make([]byte, 32))},
Daniel Hiltgen's avatar
Daniel Hiltgen committed
133
	})
Michael Yang's avatar
lint  
Michael Yang committed
134
	require.NoError(t, err)
135

Daniel Hiltgen's avatar
Daniel Hiltgen committed
136
137
	fname := f.Name()
	model := &Model{Name: modelName, ModelPath: fname}
138
	scenario.ggml, err = llm.LoadModel(model.ModelPath, 0)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
139
	require.NoError(t, err)
140

Daniel Hiltgen's avatar
Daniel Hiltgen committed
141
142
143
	scenario.req = &LlmRequest{
		ctx:             scenario.ctx,
		model:           model,
Daniel Hiltgen's avatar
Daniel Hiltgen committed
144
		opts:            api.DefaultOptions(),
145
		sessionDuration: &api.Duration{Duration: 5 * time.Millisecond},
Daniel Hiltgen's avatar
Daniel Hiltgen committed
146
147
148
		successCh:       make(chan *runnerRef, 1),
		errCh:           make(chan error, 1),
	}
149
	scenario.srv = &mockLlm{estimatedVRAM: estimatedVRAM, estimatedVRAMByGPU: map[string]uint64{"": estimatedVRAM}}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
150
151
152
153
	return scenario
}

func TestRequests(t *testing.T) {
154
	ctx, done := context.WithTimeout(context.Background(), 10*time.Second)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
155
156
157
158
	defer done()

	// Same model, same request
	scenario1a := newScenario(t, ctx, "ollama-model-1", 10)
159
	scenario1a.req.sessionDuration = &api.Duration{Duration: 5 * time.Millisecond}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
160
161
	scenario1b := newScenario(t, ctx, "ollama-model-1", 11)
	scenario1b.req.model = scenario1a.req.model
162
	scenario1b.ggml = scenario1a.ggml
163
	scenario1b.req.sessionDuration = &api.Duration{Duration: 0}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
164
165
166

	// simple reload of same model
	scenario2a := newScenario(t, ctx, "ollama-model-1", 20)
167
168
	tmpModel := *scenario1a.req.model
	scenario2a.req.model = &tmpModel
169
	scenario2a.ggml = scenario1a.ggml
170
	scenario2a.req.sessionDuration = &api.Duration{Duration: 5 * time.Millisecond}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
171
172
173
174

	// Multiple loaded models
	scenario3a := newScenario(t, ctx, "ollama-model-3a", 1*format.GigaByte)
	scenario3b := newScenario(t, ctx, "ollama-model-3b", 24*format.GigaByte)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
175
176
177
	scenario3c := newScenario(t, ctx, "ollama-model-4a", 30)
	scenario3c.req.opts.NumGPU = 0                           // CPU load, will be allowed
	scenario3d := newScenario(t, ctx, "ollama-model-3c", 30) // Needs prior unloaded
Daniel Hiltgen's avatar
Daniel Hiltgen committed
178
179
180
181
182
183
184
185

	s := InitScheduler(ctx)
	s.getGpuFn = func() gpu.GpuInfoList {
		g := gpu.GpuInfo{Library: "metal"}
		g.TotalMemory = 24 * format.GigaByte
		g.FreeMemory = 12 * format.GigaByte
		return []gpu.GpuInfo{g}
	}
186
187
188
189
190
191
	s.getCpuFn = func() gpu.GpuInfoList {
		g := gpu.GpuInfo{Library: "cpu"}
		g.TotalMemory = 32 * format.GigaByte
		g.FreeMemory = 26 * format.GigaByte
		return []gpu.GpuInfo{g}
	}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
192
193
194
195
196
197
198
199
	s.newServerFn = scenario1a.newServer
	slog.Info("scenario1a")
	s.pendingReqCh <- scenario1a.req
	require.Len(t, s.pendingReqCh, 1)
	s.Run(ctx)
	select {
	case resp := <-scenario1a.req.successCh:
		require.Equal(t, resp.llama, scenario1a.srv)
Michael Yang's avatar
lint  
Michael Yang committed
200
201
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, scenario1a.req.errCh)
202
203
	case err := <-scenario1a.req.errCh:
		t.Fatal(err.Error())
Daniel Hiltgen's avatar
Daniel Hiltgen committed
204
	case <-ctx.Done():
Daniel Hiltgen's avatar
Daniel Hiltgen committed
205
		t.Fatal("timeout")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
206
207
208
209
210
211
212
213
214
	}

	// Same runner as first request due to not needing a reload
	s.newServerFn = scenario1b.newServer
	slog.Info("scenario1b")
	s.pendingReqCh <- scenario1b.req
	select {
	case resp := <-scenario1b.req.successCh:
		require.Equal(t, resp.llama, scenario1a.srv)
Michael Yang's avatar
lint  
Michael Yang committed
215
216
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, scenario1b.req.errCh)
217
218
	case err := <-scenario1b.req.errCh:
		t.Fatal(err.Error())
Daniel Hiltgen's avatar
Daniel Hiltgen committed
219
	case <-ctx.Done():
Daniel Hiltgen's avatar
Daniel Hiltgen committed
220
		t.Fatal("timeout")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
221
222
223
224
225
226
227
228
229
230
231
232
233
234
	}

	// Trigger a reload
	s.newServerFn = scenario2a.newServer
	scenario2a.req.model.AdapterPaths = []string{"new"}
	slog.Info("scenario2a")
	s.pendingReqCh <- scenario2a.req
	// finish first two requests, so model can reload
	time.Sleep(1 * time.Millisecond)
	scenario1a.ctxDone()
	scenario1b.ctxDone()
	select {
	case resp := <-scenario2a.req.successCh:
		require.Equal(t, resp.llama, scenario2a.srv)
Michael Yang's avatar
lint  
Michael Yang committed
235
236
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, scenario2a.req.errCh)
237
238
	case err := <-scenario2a.req.errCh:
		t.Fatal(err.Error())
Daniel Hiltgen's avatar
Daniel Hiltgen committed
239
	case <-ctx.Done():
Daniel Hiltgen's avatar
Daniel Hiltgen committed
240
		t.Fatal("timeout")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
241
242
	}

243
	envconfig.MaxRunners = 1
Daniel Hiltgen's avatar
Daniel Hiltgen committed
244
245
246
247
248
249
250
251
252
	s.newServerFn = scenario3a.newServer
	slog.Info("scenario3a")
	s.pendingReqCh <- scenario3a.req
	// finish prior request, so new model can load
	time.Sleep(1 * time.Millisecond)
	scenario2a.ctxDone()
	select {
	case resp := <-scenario3a.req.successCh:
		require.Equal(t, resp.llama, scenario3a.srv)
Michael Yang's avatar
lint  
Michael Yang committed
253
254
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, scenario3a.req.errCh)
255
256
	case err := <-scenario3a.req.errCh:
		t.Fatal(err.Error())
Daniel Hiltgen's avatar
Daniel Hiltgen committed
257
	case <-ctx.Done():
Daniel Hiltgen's avatar
Daniel Hiltgen committed
258
		t.Fatal("timeout")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
259
	}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
260
	s.loadedMu.Lock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
261
	require.Len(t, s.loaded, 1)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
262
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
263

264
	envconfig.MaxRunners = 0
Daniel Hiltgen's avatar
Daniel Hiltgen committed
265
266
267
268
269
270
	s.newServerFn = scenario3b.newServer
	slog.Info("scenario3b")
	s.pendingReqCh <- scenario3b.req
	select {
	case resp := <-scenario3b.req.successCh:
		require.Equal(t, resp.llama, scenario3b.srv)
Michael Yang's avatar
lint  
Michael Yang committed
271
272
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, scenario3b.req.errCh)
273
274
	case err := <-scenario3b.req.errCh:
		t.Fatal(err.Error())
Daniel Hiltgen's avatar
Daniel Hiltgen committed
275
	case <-ctx.Done():
Daniel Hiltgen's avatar
Daniel Hiltgen committed
276
		t.Fatal("timeout")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
277
	}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
278
	s.loadedMu.Lock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
279
	require.Len(t, s.loaded, 2)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
280
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
281

Daniel Hiltgen's avatar
Daniel Hiltgen committed
282
	// This is a CPU load with NumGPU = 0 so it should load
Daniel Hiltgen's avatar
Daniel Hiltgen committed
283
284
	s.newServerFn = scenario3c.newServer
	slog.Info("scenario3c")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
285
286
287
288
	s.pendingReqCh <- scenario3c.req
	select {
	case resp := <-scenario3c.req.successCh:
		require.Equal(t, resp.llama, scenario3c.srv)
Michael Yang's avatar
lint  
Michael Yang committed
289
290
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, scenario3c.req.errCh)
291
292
	case err := <-scenario3c.req.errCh:
		t.Fatal(err.Error())
Daniel Hiltgen's avatar
Daniel Hiltgen committed
293
	case <-ctx.Done():
Daniel Hiltgen's avatar
Daniel Hiltgen committed
294
		t.Fatal("timeout")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
295
296
297
298
299
300
301
302
303
304
305
	}
	s.loadedMu.Lock()
	require.Len(t, s.loaded, 3)
	s.loadedMu.Unlock()

	// Try to load a model that wont fit
	s.newServerFn = scenario3d.newServer
	slog.Info("scenario3d")
	s.loadedMu.Lock()
	require.Len(t, s.loaded, 3)
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
306
307
	scenario3a.ctxDone() // Won't help since this one isn't big enough to make room
	time.Sleep(2 * time.Millisecond)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
308
	s.pendingReqCh <- scenario3d.req
Daniel Hiltgen's avatar
Daniel Hiltgen committed
309
310
	// finish prior request, so new model can load
	time.Sleep(6 * time.Millisecond)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
311
312
313
	s.loadedMu.Lock()
	require.Len(t, s.loaded, 2)
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
314
315
	scenario3b.ctxDone()
	select {
Daniel Hiltgen's avatar
Daniel Hiltgen committed
316
317
	case resp := <-scenario3d.req.successCh:
		require.Equal(t, resp.llama, scenario3d.srv)
Michael Yang's avatar
lint  
Michael Yang committed
318
319
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, scenario3d.req.errCh)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
320
	case <-ctx.Done():
Daniel Hiltgen's avatar
Daniel Hiltgen committed
321
		t.Fatal("timeout")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
322
	}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
323
324
325
	s.loadedMu.Lock()
	require.Len(t, s.loaded, 2)
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
326
327
328
}

func TestGetRunner(t *testing.T) {
329
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
330
331
332
	defer done()

	scenario1a := newScenario(t, ctx, "ollama-model-1a", 10)
333
	scenario1a.req.sessionDuration = &api.Duration{Duration: 0}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
334
	scenario1b := newScenario(t, ctx, "ollama-model-1b", 10)
335
	scenario1b.req.sessionDuration = &api.Duration{Duration: 0}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
336
	scenario1c := newScenario(t, ctx, "ollama-model-1c", 10)
337
	scenario1c.req.sessionDuration = &api.Duration{Duration: 0}
338
	envconfig.MaxQueuedRequests = 1
Daniel Hiltgen's avatar
Daniel Hiltgen committed
339
340
341
342
343
344
345
346
347
348
349
350
351
352
	s := InitScheduler(ctx)
	s.getGpuFn = func() gpu.GpuInfoList {
		g := gpu.GpuInfo{Library: "metal"}
		g.TotalMemory = 24 * format.GigaByte
		g.FreeMemory = 12 * format.GigaByte
		return []gpu.GpuInfo{g}
	}
	s.newServerFn = scenario1a.newServer
	slog.Info("scenario1a")
	successCh1a, errCh1a := s.GetRunner(scenario1a.ctx, scenario1a.req.model, scenario1a.req.opts, scenario1a.req.sessionDuration)
	require.Len(t, s.pendingReqCh, 1)
	slog.Info("scenario1b")
	successCh1b, errCh1b := s.GetRunner(scenario1b.ctx, scenario1b.req.model, scenario1b.req.opts, scenario1b.req.sessionDuration)
	require.Len(t, s.pendingReqCh, 1)
Michael Yang's avatar
lint  
Michael Yang committed
353
	require.Empty(t, successCh1b)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
354
355
356
357
358
359
360
	require.Len(t, errCh1b, 1)
	err := <-errCh1b
	require.Contains(t, err.Error(), "server busy")
	s.Run(ctx)
	select {
	case resp := <-successCh1a:
		require.Equal(t, resp.llama, scenario1a.srv)
Michael Yang's avatar
lint  
Michael Yang committed
361
362
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, errCh1a)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
363
	case <-ctx.Done():
Daniel Hiltgen's avatar
Daniel Hiltgen committed
364
		t.Fatal("timeout")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
365
366
	}
	scenario1a.ctxDone()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
367
	s.loadedMu.Lock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
368
	require.Len(t, s.loaded, 1)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
369
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
370
371
372
373

	scenario1c.req.model.ModelPath = "bad path"
	slog.Info("scenario1c")
	successCh1c, errCh1c := s.GetRunner(scenario1c.ctx, scenario1c.req.model, scenario1c.req.opts, scenario1c.req.sessionDuration)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
374
	// Starts in pending channel, then should be quickly processsed to return an error
375
	time.Sleep(5 * time.Millisecond)
Michael Yang's avatar
lint  
Michael Yang committed
376
	require.Empty(t, successCh1c)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
377
	s.loadedMu.Lock()
Michael Yang's avatar
lint  
Michael Yang committed
378
	require.Empty(t, s.loaded)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
379
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
380
381
382
383
384
385
386
387
	require.Len(t, errCh1c, 1)
	err = <-errCh1c
	require.Contains(t, err.Error(), "bad path")
	scenario1b.ctxDone()
}

// TODO - add one scenario that triggers the bogus finished event with positive ref count
func TestPrematureExpired(t *testing.T) {
388
	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
	defer done()

	// Same model, same request
	scenario1a := newScenario(t, ctx, "ollama-model-1a", 10)
	s := InitScheduler(ctx)
	s.getGpuFn = func() gpu.GpuInfoList {
		g := gpu.GpuInfo{Library: "metal"}
		g.TotalMemory = 24 * format.GigaByte
		g.FreeMemory = 12 * format.GigaByte
		return []gpu.GpuInfo{g}
	}
	s.newServerFn = scenario1a.newServer
	successCh1a, errCh1a := s.GetRunner(scenario1a.ctx, scenario1a.req.model, scenario1a.req.opts, scenario1a.req.sessionDuration)
	require.Len(t, s.pendingReqCh, 1)
	s.Run(ctx)
	select {
	case resp := <-successCh1a:
		require.Equal(t, resp.llama, scenario1a.srv)
Michael Yang's avatar
lint  
Michael Yang committed
407
408
		require.Empty(t, s.pendingReqCh)
		require.Empty(t, errCh1a)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
409
		s.loadedMu.Lock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
410
		require.Len(t, s.loaded, 1)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
411
		s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
412
413
414
		slog.Info("sending premature expired event now")
		s.expiredCh <- resp // Shouldn't happen in real life, but make sure its safe
	case <-ctx.Done():
Daniel Hiltgen's avatar
Daniel Hiltgen committed
415
		t.Fatal("timeout")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
416
	}
417
	time.Sleep(scenario1a.req.sessionDuration.Duration)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
418
419
420
421
	scenario1a.ctxDone()
	time.Sleep(20 * time.Millisecond)
	require.LessOrEqual(t, len(s.finishedReqCh), 1)
	time.Sleep(10 * time.Millisecond)
Michael Yang's avatar
lint  
Michael Yang committed
422
	require.Empty(t, s.finishedReqCh)
423
	s.loadedMu.Lock()
Michael Yang's avatar
lint  
Michael Yang committed
424
	require.Empty(t, s.loaded)
425
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
426
427
428
429
430
431
432

	// also shouldn't happen in real life
	s.finishedReqCh <- scenario1a.req
	time.Sleep(5 * time.Millisecond)
}

func TestUseLoadedRunner(t *testing.T) {
433
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
434
435
	req := &LlmRequest{
		ctx:             ctx,
Daniel Hiltgen's avatar
Daniel Hiltgen committed
436
		opts:            api.DefaultOptions(),
Daniel Hiltgen's avatar
Daniel Hiltgen committed
437
		successCh:       make(chan *runnerRef, 1),
438
		sessionDuration: &api.Duration{Duration: 2},
Daniel Hiltgen's avatar
Daniel Hiltgen committed
439
440
	}
	finished := make(chan *LlmRequest)
441
	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
442
	r1 := &runnerRef{llama: llm1, sessionDuration: 1, numParallel: 1}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
443
444
445
446
447
448
449
	req.useLoadedRunner(r1, finished)
	require.Equal(t, uint(1), r1.refCount)
	require.Equal(t, time.Duration(2), r1.sessionDuration)
	select {
	case success := <-req.successCh:
		require.Equal(t, r1, success)
	case <-ctx.Done():
Daniel Hiltgen's avatar
Daniel Hiltgen committed
450
		t.Fatal("timeout")
Daniel Hiltgen's avatar
Daniel Hiltgen committed
451
452
453
454
455
456
457
	}
	done()
	fin := <-finished
	require.Equal(t, req, fin)
}

func TestUpdateFreeSpace(t *testing.T) {
458
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
	defer done()
	gpus := gpu.GpuInfoList{
		{
			Library: "a",
			ID:      "1",
		},
		{
			Library: "a",
			ID:      "2",
		},
	}
	gpus[0].TotalMemory = 1000
	gpus[0].FreeMemory = 900
	gpus[1].TotalMemory = 2000
	gpus[1].FreeMemory = 1900
474
475
	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 50, "2": 50}}
	llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{"1": 125, "2": 75}}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
476
477
	r1 := &runnerRef{llama: llm1, gpus: gpus, numParallel: 1}
	r2 := &runnerRef{llama: llm2, gpus: gpus, numParallel: 1}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
478
479

	s := InitScheduler(ctx)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
480
	s.loadedMu.Lock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
481
482
	s.loaded["a"] = r1
	s.loaded["b"] = r2
Daniel Hiltgen's avatar
Daniel Hiltgen committed
483
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
484
485

	s.updateFreeSpace(gpus)
486
487
	require.Equal(t, uint64(1000-50-125), gpus[0].FreeMemory)
	require.Equal(t, uint64(2000-50-75), gpus[1].FreeMemory)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
488
489
}

Daniel Hiltgen's avatar
Daniel Hiltgen committed
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
func TestFilterGPUsWithoutLoadingModels(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
	defer done()
	gpus := gpu.GpuInfoList{
		{
			Library: "cuda",
			ID:      "0",
		},
		{
			Library: "cuda",
			ID:      "1",
		},
	}
	r1 := &runnerRef{gpus: gpu.GpuInfoList{gpus[0]}, loading: true}

	s := InitScheduler(ctx)
	s.loadedMu.Lock()
	s.loaded["a"] = r1
	s.loadedMu.Unlock()

	tmp := s.filterGPUsWithoutLoadingModels(gpus)
	require.Len(t, tmp, 1)
	require.Equal(t, "1", tmp[0].ID)

	r1.gpus = gpu.GpuInfoList{gpus[1]}
	tmp = s.filterGPUsWithoutLoadingModels(gpus)
	require.Len(t, tmp, 1)
	require.Equal(t, "0", tmp[0].ID)

	r1.gpus = gpu.GpuInfoList{}
	tmp = s.filterGPUsWithoutLoadingModels(gpus)
	require.Len(t, tmp, 2)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
522
523
524
}

func TestFindRunnerToUnload(t *testing.T) {
525
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
526
	defer done()
527

Daniel Hiltgen's avatar
Daniel Hiltgen committed
528
529
	r1 := &runnerRef{refCount: 1, sessionDuration: 1, numParallel: 1}
	r2 := &runnerRef{sessionDuration: 2, numParallel: 1}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
530
531

	s := InitScheduler(ctx)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
532
	s.loadedMu.Lock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
533
534
	s.loaded["a"] = r1
	s.loaded["b"] = r2
Daniel Hiltgen's avatar
Daniel Hiltgen committed
535
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
536

537
	resp := s.findRunnerToUnload()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
538
539
	require.Equal(t, r2, resp)
	r2.refCount = 1
540
	resp = s.findRunnerToUnload()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
541
542
543
544
	require.Equal(t, r1, resp)
}

func TestNeedsReload(t *testing.T) {
545
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
546
547
	defer done()

548
	llm := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
549
	do := api.DefaultOptions()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
550
	runner := &runnerRef{
Daniel Hiltgen's avatar
Daniel Hiltgen committed
551
552
553
554
555
556
557
		model: &Model{
			AdapterPaths:   []string{"adapter1"},
			ProjectorPaths: []string{"projector1"},
		},
		Options:     &do,
		llama:       llm,
		numParallel: 1,
Daniel Hiltgen's avatar
Daniel Hiltgen committed
558
559
560
561
562
563
	}
	req := &LlmRequest{
		model: &Model{
			AdapterPaths:   []string{"adapter2"},
			ProjectorPaths: []string{"projector2"},
		},
Daniel Hiltgen's avatar
Daniel Hiltgen committed
564
		opts: api.DefaultOptions(),
Daniel Hiltgen's avatar
Daniel Hiltgen committed
565
566
567
	}
	resp := runner.needsReload(ctx, req)
	require.True(t, resp)
568
	req.model.AdapterPaths = runner.model.AdapterPaths
Daniel Hiltgen's avatar
Daniel Hiltgen committed
569
570
	resp = runner.needsReload(ctx, req)
	require.True(t, resp)
571
	req.model.ProjectorPaths = runner.model.ProjectorPaths
Daniel Hiltgen's avatar
Daniel Hiltgen committed
572
573
574
575
576
577
578
579
580
581
582
583
584
	runner.loading = true
	req.opts.NumBatch = 1234
	resp = runner.needsReload(ctx, req)
	require.True(t, resp)
	req.opts.NumBatch = runner.Options.NumBatch
	llm.pingResp = fmt.Errorf("foo")
	resp = runner.needsReload(ctx, req)
	require.True(t, resp)
	llm.pingResp = nil
	resp = runner.needsReload(ctx, req)
	require.False(t, resp)
	req.opts.NumGPU = 99
	resp = runner.needsReload(ctx, req)
585
586
587
	require.True(t, resp)
	req.opts.NumGPU = -1
	resp = runner.needsReload(ctx, req)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
588
589
590
591
	require.False(t, resp)
}

func TestUnloadAllRunners(t *testing.T) {
592
	ctx, done := context.WithTimeout(context.Background(), 100*time.Millisecond)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
593
594
	defer done()

595
596
	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
	llm2 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
597
598
599
	s := InitScheduler(ctx)
	s.unloadAllRunners()

Daniel Hiltgen's avatar
Daniel Hiltgen committed
600
601
	r1 := &runnerRef{llama: llm1, numParallel: 1}
	r2 := &runnerRef{llama: llm2, numParallel: 1}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
602

Daniel Hiltgen's avatar
Daniel Hiltgen committed
603
	s.loadedMu.Lock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
604
605
	s.loaded["a"] = r1
	s.loaded["b"] = r2
Daniel Hiltgen's avatar
Daniel Hiltgen committed
606
	s.loadedMu.Unlock()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
607
608
609
610
611
612
613
	s.unloadAllRunners()

	require.True(t, llm1.closeCalled)
	require.True(t, llm2.closeCalled)
}

func TestUnload(t *testing.T) {
614
	llm1 := &mockLlm{estimatedVRAMByGPU: map[string]uint64{}}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
615
616
	r1 := &runnerRef{llama: llm1, numParallel: 1}
	r2 := &runnerRef{model: &Model{AdapterPaths: []string{"A"}}, numParallel: 1}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
617
618
619
	r1.unload()
	require.True(t, llm1.closeCalled)
	r2.unload()
620
	require.Nil(t, r2.model)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
621
622
}

Daniel Hiltgen's avatar
Daniel Hiltgen committed
623
624
625
626
627
628
func TestAlreadyCanceled(t *testing.T) {
	ctx, done := context.WithTimeout(context.Background(), 500*time.Millisecond)
	defer done()
	dctx, done2 := context.WithCancel(ctx)
	done2()
	scenario1a := newScenario(t, dctx, "ollama-model-1", 10)
629
	scenario1a.req.sessionDuration = &api.Duration{Duration: 0}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
630
631
632
633
634
635
636
637
638
639
640
	s := InitScheduler(ctx)
	slog.Info("scenario1a")
	s.pendingReqCh <- scenario1a.req
	require.Len(t, s.pendingReqCh, 1)
	s.Run(ctx)
	time.Sleep(5 * time.Millisecond)
	require.Empty(t, s.pendingReqCh)
	require.Empty(t, scenario1a.req.errCh)
	require.Empty(t, scenario1a.req.successCh)
}

Daniel Hiltgen's avatar
Daniel Hiltgen committed
641
type mockLlm struct {
642
643
644
645
646
647
648
649
650
651
652
653
654
655
	pingResp           error
	waitResp           error
	completionResp     error
	embeddingResp      []float64
	embeddingRespErr   error
	tokenizeResp       []int
	tokenizeRespErr    error
	detokenizeResp     string
	detonekizeRespErr  error
	closeResp          error
	closeCalled        bool
	estimatedVRAM      uint64
	estimatedTotal     uint64
	estimatedVRAMByGPU map[string]uint64
Daniel Hiltgen's avatar
Daniel Hiltgen committed
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
}

func (s *mockLlm) Ping(ctx context.Context) error             { return s.pingResp }
func (s *mockLlm) WaitUntilRunning(ctx context.Context) error { return s.waitResp }
func (s *mockLlm) Completion(ctx context.Context, req llm.CompletionRequest, fn func(llm.CompletionResponse)) error {
	return s.completionResp
}
func (s *mockLlm) Embedding(ctx context.Context, prompt string) ([]float64, error) {
	return s.embeddingResp, s.embeddingRespErr
}
func (s *mockLlm) Tokenize(ctx context.Context, content string) ([]int, error) {
	return s.tokenizeResp, s.tokenizeRespErr
}
func (s *mockLlm) Detokenize(ctx context.Context, tokens []int) (string, error) {
	return s.detokenizeResp, s.detonekizeRespErr
}
func (s *mockLlm) Close() error {
	s.closeCalled = true
	return s.closeResp
}
676
677
func (s *mockLlm) EstimatedVRAM() uint64                  { return s.estimatedVRAM }
func (s *mockLlm) EstimatedTotal() uint64                 { return s.estimatedTotal }
Daniel Hiltgen's avatar
Daniel Hiltgen committed
678
func (s *mockLlm) EstimatedVRAMByGPU(gpuid string) uint64 { return s.estimatedVRAMByGPU[gpuid] }