concurrency_test.go 5.69 KB
Newer Older
Daniel Hiltgen's avatar
Daniel Hiltgen committed
1
2
3
4
5
6
//go:build integration

package integration

import (
	"context"
7
	"fmt"
Daniel Hiltgen's avatar
Daniel Hiltgen committed
8
	"log/slog"
9
	"math"
10
	"math/rand"
Michael Yang's avatar
Michael Yang committed
11
	"os"
Daniel Hiltgen's avatar
Daniel Hiltgen committed
12
13
14
15
16
17
	"strconv"
	"sync"
	"testing"
	"time"

	"github.com/stretchr/testify/require"
Michael Yang's avatar
uint64  
Michael Yang committed
18
19

	"github.com/ollama/ollama/api"
20
	"github.com/ollama/ollama/envconfig"
Michael Yang's avatar
uint64  
Michael Yang committed
21
	"github.com/ollama/ollama/format"
Daniel Hiltgen's avatar
Daniel Hiltgen committed
22
23
)

24
25
26
// Send multiple requests in parallel (concurrently) to a single model and ensure responses are expected
func TestConcurrentGenerate(t *testing.T) {
	// Assumes all requests have the same model
Daniel Hiltgen's avatar
Daniel Hiltgen committed
27
	req, resp := GenerateRequests()
28
29
	numParallel := int(envconfig.NumParallel() + 1)
	iterLimit := 3
Daniel Hiltgen's avatar
Daniel Hiltgen committed
30

31
32
	softTimeout, hardTimeout := getTimeouts(t)
	ctx, cancel := context.WithTimeout(context.Background(), hardTimeout)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
33
34
35
36
37
	defer cancel()
	client, _, cleanup := InitServerConnection(ctx, t)
	defer cleanup()

	// Get the server running (if applicable) warm the model up with a single initial request
38
39
40
41
42
43
44
45
	slog.Info("loading", "model", req[0].Model)
	err := client.Generate(ctx,
		&api.GenerateRequest{Model: req[0].Model, KeepAlive: &api.Duration{Duration: 10 * time.Second}},
		func(response api.GenerateResponse) error { return nil },
	)
	if err != nil {
		t.Fatalf("failed to load model %s: %s", req[0].Model, err)
	}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
46
47

	var wg sync.WaitGroup
48
49
50
	r := rand.New(rand.NewSource(0))
	wg.Add(numParallel)
	for i := range numParallel {
Daniel Hiltgen's avatar
Daniel Hiltgen committed
51
52
		go func(i int) {
			defer wg.Done()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
53
			for j := 0; j < iterLimit; j++ {
54
55
56
57
58
59
				if time.Now().Sub(started) > softTimeout {
					slog.Info("exceeded soft timeout, winding down test")
					return
				}
				k := r.Int() % len(req)
				slog.Info("Starting", "thread", i, "iter", j)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
60
				// On slower GPUs it can take a while to process the concurrent requests
Daniel Hiltgen's avatar
Daniel Hiltgen committed
61
				// so we allow a much longer initial timeout
62
				DoGenerate(ctx, t, client, req[k], resp[k], 120*time.Second, 20*time.Second)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
63
64
65
66
67
68
			}
		}(i)
	}
	wg.Wait()
}

69
70
// Stress the scheduler and attempt to load more models than will fit to cause thrashing
// This test will always load at least 2 models even on CPU based systems
Daniel Hiltgen's avatar
Daniel Hiltgen committed
71
func TestMultiModelStress(t *testing.T) {
72
	s := os.Getenv("OLLAMA_MAX_VRAM")
Michael Yang's avatar
uint64  
Michael Yang committed
73
	if s == "" {
74
		s = "0"
Daniel Hiltgen's avatar
Daniel Hiltgen committed
75
	}
Michael Yang's avatar
uint64  
Michael Yang committed
76
77
78
79
80

	maxVram, err := strconv.ParseUint(s, 10, 64)
	if err != nil {
		t.Fatal(err)
	}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
81

82
83
84
85
86
87
	smallModels := []string{
		"llama3.2:1b",
		"qwen3:0.6b",
		"gemma:2b",
		"deepseek-r1:1.5b",
		"starcoder2:3b",
Daniel Hiltgen's avatar
Daniel Hiltgen committed
88
	}
89
90
91
92
93
94
95
96
	mediumModels := []string{
		"qwen3:8b",
		"llama2",
		"deepseek-r1:7b",
		"mistral",
		"dolphin-mistral",
		"gemma:7b",
		"codellama:7b",
Daniel Hiltgen's avatar
Daniel Hiltgen committed
97
98
	}

99
	var chosenModels []string
Daniel Hiltgen's avatar
Daniel Hiltgen committed
100
	switch {
Michael Yang's avatar
uint64  
Michael Yang committed
101
	case maxVram < 10000*format.MebiByte:
Daniel Hiltgen's avatar
Daniel Hiltgen committed
102
103
104
105
106
107
108
		slog.Info("selecting small models")
		chosenModels = smallModels
	default:
		slog.Info("selecting medium models")
		chosenModels = mediumModels
	}

109
110
	softTimeout, hardTimeout := getTimeouts(t)
	ctx, cancel := context.WithTimeout(context.Background(), hardTimeout)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
111
112
113
114
115
	defer cancel()
	client, _, cleanup := InitServerConnection(ctx, t)
	defer cleanup()

	// Make sure all the models are pulled before we get started
116
117
	for _, model := range chosenModels {
		require.NoError(t, PullIfMissing(ctx, client, model))
Daniel Hiltgen's avatar
Daniel Hiltgen committed
118
119
	}

120
121
122
123
124
125
126
127
128
129
	// Determine how many models we can load in parallel before we exceed VRAM
	// The intent is to go 1 over what can fit so we force the scheduler to thrash
	targetLoadCount := 0
	slog.Info("Loading models to find how many can fit in VRAM before overflowing")
	for i, model := range chosenModels {
		req := &api.GenerateRequest{Model: model}
		slog.Info("loading", "model", model)
		err = client.Generate(ctx, req, func(response api.GenerateResponse) error { return nil })
		if err != nil {
			t.Fatalf("failed to load model %s: %s", model, err)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
130
		}
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
		targetLoadCount++
		if i > 0 {
			models, err := client.ListRunning(ctx)
			if err != nil {
				t.Fatalf("failed to list running models: %s", err)
			}
			if len(models.Models) < targetLoadCount {
				loaded := []string{}
				for _, m := range models.Models {
					loaded = append(loaded, m.Name)
				}
				slog.Info("found model load capacity", "target", targetLoadCount, "current", loaded, "chosen", chosenModels[:targetLoadCount])
				break
			}
		}
	}
	if targetLoadCount == len(chosenModels) {
		// TODO consider retrying the medium models
		slog.Warn("all models being used without exceeding VRAM, set OLLAMA_MAX_VRAM so test can pick larger models")
	}
Daniel Hiltgen's avatar
Daniel Hiltgen committed
151

152
153
154
	r := rand.New(rand.NewSource(0))
	var wg sync.WaitGroup
	for i := range targetLoadCount {
Daniel Hiltgen's avatar
Daniel Hiltgen committed
155
156
157
		wg.Add(1)
		go func(i int) {
			defer wg.Done()
158
			reqs, resps := GenerateRequests()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
159
			for j := 0; j < 3; j++ {
160
161
162
163
164
165
166
167
168
169
170
				if time.Now().Sub(started) > softTimeout {
					slog.Info("exceeded soft timeout, winding down test")
					return
				}
				k := r.Int() % len(reqs)
				reqs[k].Model = chosenModels[i]
				slog.Info("Starting", "model", reqs[k].Model, "iteration", j, "request", reqs[k].Prompt)
				DoGenerate(ctx, t, client, reqs[k], resps[k],
					120*time.Second, // Be extra patient for the model to load initially
					10*time.Second,  // Once results start streaming, fail if they stall
				)
Daniel Hiltgen's avatar
Daniel Hiltgen committed
171
172
173
			}
		}(i)
	}
174
175
	go func() {
		for {
176
			time.Sleep(10 * time.Second)
177
178
179
180
181
182
183
184
185
186
			select {
			case <-ctx.Done():
				return
			default:
				models, err := client.ListRunning(ctx)
				if err != nil {
					slog.Warn("failed to list running models", "error", err)
					continue
				}
				for _, m := range models.Models {
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
					var procStr string
					switch {
					case m.SizeVRAM == 0:
						procStr = "100% CPU"
					case m.SizeVRAM == m.Size:
						procStr = "100% GPU"
					case m.SizeVRAM > m.Size || m.Size == 0:
						procStr = "Unknown"
					default:
						sizeCPU := m.Size - m.SizeVRAM
						cpuPercent := math.Round(float64(sizeCPU) / float64(m.Size) * 100)
						procStr = fmt.Sprintf("%d%%/%d%%", int(cpuPercent), int(100-cpuPercent))
					}

					slog.Info("loaded model snapshot", "model", m.Name, "CPU/GPU", procStr, "expires", format.HumanTime(m.ExpiresAt, "Never"))
202
203
204
205
				}
			}
		}
	}()
Daniel Hiltgen's avatar
Daniel Hiltgen committed
206
207
	wg.Wait()
}