package llm

import (
	"bufio"
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log"
	"log/slog"
	"math/rand"
	"net"
	"net/http"
	"os"
	"os/exec"
	"path/filepath"
	"runtime"
	"slices"
	"strconv"
	"strings"
	"sync"
	"time"

	"golang.org/x/sync/semaphore"

	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/discover"
	"github.com/ollama/ollama/envconfig"
	"github.com/ollama/ollama/format"
	"github.com/ollama/ollama/fs/ggml"
	"github.com/ollama/ollama/llama"
	"github.com/ollama/ollama/logutil"
	"github.com/ollama/ollama/model"
)

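// filteredEnv limits subprocess environment logging to the variables that
// influence runner behavior (OLLAMA_*, CUDA_*, ROCm/HIP/HSA, GGML_, and the
// library search paths) so unrelated or sensitive values stay out of the logs.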
type filteredEnv []string

func (e filteredEnv) LogValue() slog.Value {
	var attrs []slog.Attr
	for _, env := range e {
		if key, value, ok := strings.Cut(env, "="); ok {
			switch {
			case strings.HasPrefix(key, "OLLAMA_"),
				strings.HasPrefix(key, "CUDA_"),
				strings.HasPrefix(key, "ROCR_"),
				strings.HasPrefix(key, "ROCM_"),
				strings.HasPrefix(key, "HIP_"),
				strings.HasPrefix(key, "GPU_"),
				strings.HasPrefix(key, "HSA_"),
				strings.HasPrefix(key, "GGML_"),
				slices.Contains([]string{
					"PATH",
					"LD_LIBRARY_PATH",
					"DYLD_LIBRARY_PATH",
				}, key):
				attrs = append(attrs, slog.String(key, value))
			}
		}
	}
	return slog.GroupValue(attrs...)
}

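// LlamaServer is the interface the rest of Ollama uses to drive a model
// runner subprocess: start it, wait for it to become ready, issue
// completion/embedding/tokenization calls, and query its memory estimates.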
type LlamaServer interface {
	Ping(ctx context.Context) error
	WaitUntilRunning(ctx context.Context) error
	Completion(ctx context.Context, req CompletionRequest, fn func(CompletionResponse)) error
	Embedding(ctx context.Context, input string) ([]float32, error)
	Tokenize(ctx context.Context, content string) ([]int, error)
	Detokenize(ctx context.Context, tokens []int) (string, error)
	Close() error
	EstimatedVRAM() uint64 // Total VRAM across all GPUs
	EstimatedTotal() uint64
	EstimatedVRAMByGPU(gpuID string) uint64
	Pid() int
}

// llmServer is an instance of the llama.cpp server
type llmServer struct {
	port        int
	cmd         *exec.Cmd
	done        chan error // Channel to signal when the process exits
	status      *StatusWriter
	options     api.Options
	numParallel int
	modelPath   string

	// llamaModel is an instance of the cgo llama.cpp model definition
	// nil if this server is running the new engine
	llamaModel     *llama.Model
	llamaModelLock sync.Mutex

	// textProcessor handles text encoding/decoding for the model in the Ollama engine
	// nil if this server is running the llama.cpp based engine
	textProcessor model.TextProcessor

	estimate    MemoryEstimate
	totalLayers uint64
	// gpuCount     int
	gpus         discover.GpuInfoList // Recorded just before the model loaded, free space will be incorrect
	loadDuration time.Duration        // Record how long it took the model to load
	loadProgress float32

	sem *semaphore.Weighted
}

// LoadModel will load a model from disk. The model must be in the GGML format.
//
// It collects array values for arrays with a size less than or equal to
// maxArraySize. If maxArraySize is 0, the default value of 1024 is used. If
// the maxArraySize is negative, all arrays are collected.
func LoadModel(model string, maxArraySize int) (*ggml.GGML, error) {
	if _, err := os.Stat(model); err != nil {
		return nil, err
	}

	f, err := os.Open(model)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	ggml, err := ggml.Decode(f, maxArraySize)
	return ggml, err
}

// NewLlamaServer will run a server for the given GPUs
// The gpu list must be a single family.
func NewLlamaServer(gpus discover.GpuInfoList, modelPath string, f *ggml.GGML, adapters, projectors []string, opts api.Options, numParallel int) (LlamaServer, error) {
	systemInfo := discover.GetSystemInfo()
	systemTotalMemory := systemInfo.System.TotalMemory
	systemFreeMemory := systemInfo.System.FreeMemory
	systemSwapFreeMemory := systemInfo.System.FreeSwap
	slog.Info("system memory", "total", format.HumanBytes2(systemTotalMemory), "free", format.HumanBytes2(systemFreeMemory), "free_swap", format.HumanBytes2(systemSwapFreeMemory))

	// If the user wants zero GPU layers, reset the gpu list to be CPU/system ram info
	if opts.NumGPU == 0 {
		gpus = discover.GetCPUInfo()
	}

	// Verify the requested context size is <= the model training size
	trainCtx := f.KV().ContextLength()
	if opts.NumCtx/numParallel > int(trainCtx) && trainCtx > 0 {
		slog.Warn("requested context size too large for model", "num_ctx", opts.NumCtx, "num_parallel", numParallel, "n_ctx_train", trainCtx)
		opts.NumCtx = int(trainCtx) * numParallel
	}

	estimate := EstimateGPULayers(gpus, f, projectors, opts, numParallel)
	if len(gpus) > 1 || gpus[0].Library != "cpu" {
		switch {
		case gpus[0].Library == "metal" && estimate.VRAMSize > systemTotalMemory:
			// disable partial offloading when model is greater than total system memory as this
			// can lead to locking up the system
			opts.NumGPU = 0
		case gpus[0].Library != "metal" && estimate.Layers == 0:
			// Don't bother loading into the GPU if no layers can fit
			gpus = discover.GetCPUInfo()
		case opts.NumGPU < 0 && estimate.Layers > 0 && gpus[0].Library != "cpu":
			opts.NumGPU = estimate.Layers
		}
	}

	// On linux and windows, over-allocating CPU memory will almost always result in an error
	// Darwin has fully dynamic swap so has no direct concept of free swap space
	if runtime.GOOS != "darwin" {
		systemMemoryRequired := estimate.TotalSize - estimate.VRAMSize
		available := systemFreeMemory + systemSwapFreeMemory
		if systemMemoryRequired > available {
			slog.Warn("model request too large for system", "requested", format.HumanBytes2(systemMemoryRequired), "available", format.HumanBytes2(available), "total", format.HumanBytes2(systemTotalMemory), "free", format.HumanBytes2(systemFreeMemory), "swap", format.HumanBytes2(systemSwapFreeMemory))
			return nil, fmt.Errorf("model requires more system memory (%s) than is available (%s)", format.HumanBytes2(systemMemoryRequired), format.HumanBytes2(available))
		}
	}

	slog.Info("offload", "", estimate)

	params := []string{
		"--model", modelPath,
		"--ctx-size", strconv.Itoa(opts.NumCtx),
		"--batch-size", strconv.Itoa(opts.NumBatch),
	}

	if opts.NumGPU >= 0 {
		params = append(params, "--n-gpu-layers", strconv.Itoa(opts.NumGPU))
	}

	if opts.MainGPU > 0 {
		params = append(params, "--main-gpu", strconv.Itoa(opts.MainGPU))
	}

	if len(adapters) > 0 {
		for _, adapter := range adapters {
			params = append(params, "--lora", adapter)
		}
	}

	defaultThreads := systemInfo.GetOptimalThreadCount()
	if opts.NumThread > 0 {
		params = append(params, "--threads", strconv.Itoa(opts.NumThread))
	} else if defaultThreads > 0 {
		params = append(params, "--threads", strconv.Itoa(defaultThreads))
	}

	fa := envconfig.FlashAttention()
	if fa && !gpus.FlashAttentionSupported() {
		slog.Warn("flash attention enabled but not supported by gpu")
		fa = false
	}

	if fa && !f.SupportsFlashAttention() {
		slog.Warn("flash attention enabled but not supported by model")
		fa = false
	}

	kvct := strings.ToLower(envconfig.KvCacheType())

	if fa {
		slog.Info("enabling flash attention")
		params = append(params, "--flash-attn")

		// Flash Attention also supports kv cache quantization
		// Enable it if requested and the kv cache type is supported by the model
		if kvct != "" && f.SupportsKVCacheType(kvct) {
			params = append(params, "--kv-cache-type", kvct)
		} else if kvct != "" {
			slog.Warn("kv cache type not supported by model", "type", kvct)
		}
	} else if kvct != "" && kvct != "f16" {
		slog.Warn("quantized kv cache requested but flash attention disabled", "type", kvct)
	}

	// mmap has issues with partial offloading on metal
	for _, g := range gpus {
		if g.Library == "metal" &&
			uint64(opts.NumGPU) > 0 &&
			uint64(opts.NumGPU) < f.KV().BlockCount()+1 {
			opts.UseMMap = new(bool)
			*opts.UseMMap = false
		}
	}

	// Windows CUDA should not use mmap for best performance
	// Linux with a model larger than free space, mmap leads to thrashing
	// For CPU loads we want the memory to be allocated, not FS cache
	if (runtime.GOOS == "windows" && gpus[0].Library == "cuda" && opts.UseMMap == nil) ||
		(runtime.GOOS == "linux" && systemFreeMemory < estimate.TotalSize && opts.UseMMap == nil) ||
		(gpus[0].Library == "cpu" && opts.UseMMap == nil) ||
		(opts.UseMMap != nil && !*opts.UseMMap) {
		params = append(params, "--no-mmap")
	}

	// TODO - NUMA support currently doesn't work properly

	params = append(params, "--parallel", strconv.Itoa(numParallel))

	if estimate.TensorSplit != "" {
		params = append(params, "--tensor-split", estimate.TensorSplit)
	}

	if envconfig.MultiUserCache() {
		params = append(params, "--multiuser-cache")
	}

	libs := make(map[string]string)
	if entries, err := os.ReadDir(discover.LibOllamaPath); err == nil {
		for _, entry := range entries {
			libs[entry.Name()] = filepath.Join(discover.LibOllamaPath, entry.Name())
		}
	}

	lib := gpus[0].RunnerName()
	requested := envconfig.LLMLibrary()
	if libs[requested] != "" {
		slog.Info("using requested gpu library", "requested", requested)
		lib = requested
	}

	var compatible []string
	for k := range libs {
		// exact match first
		if k == lib {
			compatible = append([]string{k}, compatible...)
			continue
		}

		// then match the family (e.g. 'cuda')
		if strings.Split(k, "_")[0] == strings.Split(lib, "_")[0] {
			compatible = append(compatible, k)
		}
	}
	slog.Debug("compatible gpu libraries", "compatible", compatible)
	exe, err := os.Executable()
	if err != nil {
		return nil, fmt.Errorf("unable to lookup executable path: %w", err)
	}

	if eval, err := filepath.EvalSymlinks(exe); err == nil {
		exe = eval
	}

	var llamaModel *llama.Model
	var textProcessor model.TextProcessor
	if envconfig.NewEngine() || f.KV().OllamaEngineRequired() {
		textProcessor, err = model.NewTextProcessor(modelPath)
		if err != nil {
			// To prepare for opt-out mode, instead of treating this as an error, we fall back to the old runner
			slog.Debug("model not yet supported by Ollama engine, switching to compatibility mode", "model", modelPath, "error", err)
		}
	}
	if textProcessor == nil {
		llamaModel, err = llama.LoadModelFromFile(modelPath, llama.ModelParams{VocabOnly: true})
		if err != nil {
			return nil, err
		}
	}

	if len(projectors) > 0 && llamaModel != nil {
		params = append(params, "--mmproj", projectors[0])
	}

	// iterate through compatible GPU libraries such as 'cuda_v12', 'rocm', etc.
	// adding each library's respective path to the LD_LIBRARY_PATH, until finally running
	// without any LD_LIBRARY_PATH flags
	for {
		port := 0
		if a, err := net.ResolveTCPAddr("tcp", "localhost:0"); err == nil {
			var l *net.TCPListener
			if l, err = net.ListenTCP("tcp", a); err == nil {
				port = l.Addr().(*net.TCPAddr).Port
				l.Close()
			}
		}
		if port == 0 {
			slog.Debug("ResolveTCPAddr failed, using random port")
			port = rand.Intn(65535-49152) + 49152 // get a random port in the ephemeral range
		}
		finalParams := []string{"runner"}
		if textProcessor != nil {
			// New engine
			// TODO - if we have failure to load scenarios, add logic to retry with the old runner
			finalParams = append(finalParams, "--ollama-engine")
		}
		finalParams = append(finalParams, params...)
		finalParams = append(finalParams, "--port", strconv.Itoa(port))

		var pathEnv string
		switch runtime.GOOS {
		case "windows":
			pathEnv = "PATH"
		case "darwin":
			pathEnv = "DYLD_LIBRARY_PATH"
		default:
			pathEnv = "LD_LIBRARY_PATH"
		}

		// Note: we always put our dependency paths first
		// since these are the exact version we compiled/linked against
		libraryPaths := []string{discover.LibOllamaPath}
		if libraryPath, ok := os.LookupEnv(pathEnv); ok {
			libraryPaths = append(libraryPaths, filepath.SplitList(libraryPath)...)
		}

		ggmlPaths := []string{discover.LibOllamaPath}
		if len(compatible) > 0 {
			c := compatible[0]
			if libpath, ok := libs[c]; ok {
				slog.Debug("adding gpu library", "path", libpath)
				libraryPaths = append([]string{libpath}, libraryPaths...)
				ggmlPaths = append(ggmlPaths, libpath)
			}
		}

		if gpus[0].DependencyPath != nil {
			slog.Debug("adding gpu dependency paths", "paths", gpus[0].DependencyPath)
			// assume gpus from the same library have the same dependency path
			libraryPaths = append(gpus[0].DependencyPath, libraryPaths...)
		}

		// finally, add the root library path
		libraryPaths = append(libraryPaths, discover.LibOllamaPath)

		s := &llmServer{
			port:          port,
			cmd:           exec.Command(exe, finalParams...),
			status:        NewStatusWriter(os.Stderr),
			options:       opts,
			modelPath:     modelPath,
			llamaModel:    llamaModel,
			textProcessor: textProcessor,
			estimate:      estimate,
			numParallel:   numParallel,
			sem:           semaphore.NewWeighted(int64(numParallel)),
			totalLayers:   f.KV().BlockCount() + 1,
			gpus:          gpus,
			done:          make(chan error, 1),
		}

		s.cmd.Env = os.Environ()
		s.cmd.Stdout = os.Stdout
		s.cmd.Stderr = s.status
		s.cmd.SysProcAttr = LlamaServerSysProcAttr

		s.cmd.Env = append(s.cmd.Env, "OLLAMA_LIBRARY_PATH="+strings.Join(ggmlPaths, string(filepath.ListSeparator)))

		envWorkarounds := [][2]string{}
		for _, gpu := range gpus {
			envWorkarounds = append(envWorkarounds, gpu.EnvWorkarounds...)
		}
		visibleDevicesEnv, visibleDevicesEnvVal := gpus.GetVisibleDevicesEnv()
		pathEnvVal := strings.Join(libraryPaths, string(filepath.ListSeparator))

		// Update or add the path and visible devices variable with our adjusted version
		pathNeeded := true
		devicesNeeded := visibleDevicesEnv != ""
		for i := range s.cmd.Env {
			cmp := strings.SplitN(s.cmd.Env[i], "=", 2)
			if strings.EqualFold(cmp[0], pathEnv) {
				s.cmd.Env[i] = pathEnv + "=" + pathEnvVal
				pathNeeded = false
			} else if devicesNeeded && strings.EqualFold(cmp[0], visibleDevicesEnv) {
				s.cmd.Env[i] = visibleDevicesEnv + "=" + visibleDevicesEnvVal
				devicesNeeded = false
			} else if len(envWorkarounds) != 0 {
				for _, kv := range envWorkarounds {
					if strings.EqualFold(cmp[0], kv[0]) {
						s.cmd.Env[i] = kv[0] + "=" + kv[1]
					}
				}
			}
		}
		if pathNeeded {
			s.cmd.Env = append(s.cmd.Env, pathEnv+"="+pathEnvVal)
		}
		if devicesNeeded {
			s.cmd.Env = append(s.cmd.Env, visibleDevicesEnv+"="+visibleDevicesEnvVal)
		}

		slog.Info("starting llama server", "cmd", s.cmd)
		slog.Debug("subprocess", "", filteredEnv(s.cmd.Env))

		if err = s.cmd.Start(); err != nil {
			var msg string
			if s.status != nil && s.status.LastErrMsg != "" {
				msg = s.status.LastErrMsg
			}
			err := fmt.Errorf("error starting runner: %v %s", err, msg)
			if len(compatible) == 0 {
				if llamaModel != nil {
					llama.FreeModel(llamaModel)
				}
				return nil, err
			}

			slog.Warn("unable to start runner with compatible gpu", "error", err, "compatible", compatible)
			compatible = compatible[1:]
			continue
		}

		// reap subprocess when it exits
		go func() {
			err := s.cmd.Wait()
			// Favor a more detailed message over the process exit status
			if err != nil && s.status != nil && s.status.LastErrMsg != "" {
				slog.Error("llama runner terminated", "error", err)
				if strings.Contains(s.status.LastErrMsg, "unknown model") {
					s.status.LastErrMsg = "this model is not supported by your version of Ollama. You may need to upgrade"
				}
				s.done <- errors.New(s.status.LastErrMsg)
			} else {
				s.done <- err
			}
		}()

		return s, nil
	}
}

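// ServerStatus describes the runner's health as reported by its /health endpoint.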
type ServerStatus int

const ( // iota is reset to 0
	ServerStatusReady ServerStatus = iota
	ServerStatusNoSlotsAvailable
	ServerStatusLoadingModel
	ServerStatusNotResponding
	ServerStatusError
)

func (s ServerStatus) String() string {
	switch s {
	case ServerStatusReady:
		return "llm server ready"
	case ServerStatusNoSlotsAvailable:
		return "llm busy - no slots available"
	case ServerStatusLoadingModel:
		return "llm server loading model"
	case ServerStatusNotResponding:
		return "llm server not responding"
	default:
		return "llm server error"
	}
}

type ServerStatusResponse struct {
	Status   ServerStatus `json:"status"`
	Progress float32      `json:"progress"`
}

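// getServerStatus queries the runner's /health endpoint, failing fast if the
// subprocess has already exited.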
func (s *llmServer) getServerStatus(ctx context.Context) (ServerStatus, error) {
	// Fail fast if it's exited
	if s.cmd.ProcessState != nil {
		msg := ""
		if s.status != nil && s.status.LastErrMsg != "" {
			msg = s.status.LastErrMsg
		}
		if s.cmd.ProcessState.ExitCode() == -1 {
			// Most likely a signal killed it, log some more details to try to help troubleshoot
			slog.Warn("llama runner process no longer running", "sys", s.cmd.ProcessState.Sys(), "string", s.cmd.ProcessState)
		}
		return ServerStatusError, fmt.Errorf("llama runner process no longer running: %d %s", s.cmd.ProcessState.ExitCode(), msg)
	}

	req, err := http.NewRequestWithContext(ctx, http.MethodGet, fmt.Sprintf("http://127.0.0.1:%d/health", s.port), nil)
	if err != nil {
		return ServerStatusError, fmt.Errorf("error creating GET request: %v", err)
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		if errors.Is(err, context.DeadlineExceeded) {
			return ServerStatusNotResponding, errors.New("server not responding")
		}
		if strings.Contains(err.Error(), "connection refused") {
			return ServerStatusNotResponding, errors.New("connection refused")
		}
		return ServerStatusError, fmt.Errorf("health resp: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return ServerStatusError, fmt.Errorf("read health request: %w", err)
	}

	var ssr ServerStatusResponse
	if err := json.Unmarshal(body, &ssr); err != nil {
		return ServerStatusError, fmt.Errorf("unmarshal health response: %w", err)
	}

	switch ssr.Status {
	case ServerStatusLoadingModel:
		s.loadProgress = ssr.Progress
		return ssr.Status, nil
	case ServerStatusReady, ServerStatusNoSlotsAvailable:
		return ssr.Status, nil
	default:
		return ssr.Status, fmt.Errorf("server error: %+v", ssr)
	}
}

// getServerStatusRetry will retry if ServerStatusNoSlotsAvailable is received
func (s *llmServer) getServerStatusRetry(ctx context.Context) (ServerStatus, error) {
	var retries int
	for {
		status, err := s.getServerStatus(ctx)
		if err != nil {
			return status, err
		}

		if status == ServerStatusNoSlotsAvailable {
			if retries >= 10 {
				return status, fmt.Errorf("no slots available after %d retries", retries)
			}

			time.Sleep(5 * time.Millisecond)
			retries++
			continue
		}

		return status, nil
	}
}

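// Ping reports whether the runner process is healthy and responding.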
func (s *llmServer) Ping(ctx context.Context) error {
	_, err := s.getServerStatus(ctx)
	if err != nil {
		slog.Debug("server unhealthy", "error", err)
		return err
	}
	return nil
}

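// WaitUntilRunning blocks until the runner reports ready, extending its stall
// timer whenever load progress advances, and returns an error if the process
// dies, the context is cancelled, or the load stalls past the timeout.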
func (s *llmServer) WaitUntilRunning(ctx context.Context) error {
	start := time.Now()
	stallDuration := envconfig.LoadTimeout()    // If no progress happens
	stallTimer := time.Now().Add(stallDuration) // give up if we stall

	slog.Info("waiting for llama runner to start responding")
	var lastStatus ServerStatus = -1
	fullyLoaded := false

	for {
		select {
		case <-ctx.Done():
			slog.Warn("client connection closed before server finished loading, aborting load")
			return fmt.Errorf("timed out waiting for llama runner to start: %w", ctx.Err())
		case err := <-s.done:
			return fmt.Errorf("llama runner process has terminated: %w", err)
		default:
		}
		if time.Now().After(stallTimer) {
			// timeout
			msg := ""
			if s.status != nil && s.status.LastErrMsg != "" {
				msg = s.status.LastErrMsg
			}
			return fmt.Errorf("timed out waiting for llama runner to start - progress %0.2f - %s", s.loadProgress, msg)
		}
		if s.cmd.ProcessState != nil {
			msg := ""
			if s.status != nil && s.status.LastErrMsg != "" {
				msg = s.status.LastErrMsg
			}
			return fmt.Errorf("llama runner process no longer running: %d %s", s.cmd.ProcessState.ExitCode(), msg)
		}
		ctx, cancel := context.WithTimeout(ctx, 200*time.Millisecond)
		defer cancel()
		priorProgress := s.loadProgress
		status, _ := s.getServerStatus(ctx)
		if lastStatus != status && status != ServerStatusReady {
			// Only log on status changes
			slog.Info("waiting for server to become available", "status", status)
		}
		switch status {
		case ServerStatusReady:
			s.loadDuration = time.Since(start)
			slog.Info(fmt.Sprintf("llama runner started in %0.2f seconds", s.loadDuration.Seconds()))
			return nil
		default:
			lastStatus = status
			// Reset the timer as long as we're making forward progress on the load
			if priorProgress != s.loadProgress {
				slog.Debug(fmt.Sprintf("model load progress %0.2f", s.loadProgress))
				stallTimer = time.Now().Add(stallDuration)
			} else if !fullyLoaded && int(s.loadProgress*100.0) >= 100 {
				slog.Debug("model load completed, waiting for server to become available", "status", status)
				stallTimer = time.Now().Add(stallDuration)
				fullyLoaded = true
			}
			time.Sleep(time.Millisecond * 250)
			continue
		}
	}
}

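// Pid returns the runner subprocess ID, or -1 if no process is running.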
func (s *llmServer) Pid() int {
	if s.cmd != nil && s.cmd.Process != nil {
		return s.cmd.Process.Pid
	}
	return -1
}

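// grammarJSON is a GBNF grammar that constrains sampling to valid JSON; it is
// used when a request sets format to "json".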
var grammarJSON = `
root   ::= object
value  ::= object | array | string | number | ("true" | "false" | "null") ws
object ::=
  "{" ws (
         string ":" ws value
    ("," ws string ":" ws value)*
  )? ws "}"
array  ::=
  "[" ws (
            value
    ("," ws value)*
  )? ws "]"
string ::=
  "\"" (
    [^"\\\x7F\x00-\x1F] |
    "\\" (["\\/bfnrt] | "u" [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F] [0-9a-fA-F]) # escapes
  )* "\""
number ::= ("-"? ([0-9] | [1-9] [0-9]*)) ("." [0-9]+)? ([eE] [-+]? [0-9]+)?
# Optional space: by convention, applied in this grammar after literal chars when allowed
ws ::= ([ \t\n] ws)?
`

const maxBufferSize = 512 * format.KiloByte

type ImageData struct {
	Data []byte `json:"data"`
	ID   int    `json:"id"`
}

type CompletionRequest struct {
	Prompt  string
	Format  json.RawMessage
	Images  []ImageData
	Options *api.Options

	Grammar string // set before sending the request to the subprocess
}

// DoneReason represents the reason why a completion response is done
type DoneReason int

const (
	// DoneReasonStop indicates the completion stopped naturally
	DoneReasonStop DoneReason = iota
	// DoneReasonLength indicates the completion stopped due to length limits
	DoneReasonLength
	// DoneReasonConnectionClosed indicates the completion stopped due to the connection being closed
	DoneReasonConnectionClosed
)

func (d DoneReason) String() string {
	switch d {
	case DoneReasonLength:
		return "length"
	case DoneReasonStop:
		return "stop"
	default:
		return "" // closed
	}
}

type CompletionResponse struct {
	Content            string        `json:"content"`
	DoneReason         DoneReason    `json:"done_reason"`
	Done               bool          `json:"done"`
	PromptEvalCount    int           `json:"prompt_eval_count"`
	PromptEvalDuration time.Duration `json:"prompt_eval_duration"`
	EvalCount          int           `json:"eval_count"`
	EvalDuration       time.Duration `json:"eval_duration"`
}

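// Completion streams prediction responses from the runner to fn, translating
// the requested format into a grammar and aborting if the model keeps
// repeating the same token.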
func (s *llmServer) Completion(ctx context.Context, req CompletionRequest, fn func(CompletionResponse)) error {
	slog.Debug("completion request", "images", len(req.Images), "prompt", len(req.Prompt), "format", string(req.Format))
	slog.Log(ctx, logutil.LevelTrace, "completion request", "prompt", req.Prompt)

	if len(req.Format) > 0 {
		switch string(req.Format) {
		case `null`, `""`:
			// Field was set, but "missing" a value. We accept
			// these as "not set".
			break
		case `"json"`:
			req.Grammar = grammarJSON
		default:
			if req.Format[0] != '{' {
				return fmt.Errorf("invalid format: %q; expected \"json\" or a valid JSON Schema object", req.Format)
			}

			// User provided a JSON schema
			g := llama.SchemaToGrammar(req.Format)
			if g == nil {
				return fmt.Errorf("invalid JSON schema in format")
			}
			req.Grammar = string(g)
		}
	}

	if req.Options == nil {
		opts := api.DefaultOptions()
		req.Options = &opts
	}

	if err := s.sem.Acquire(ctx, 1); err != nil {
		if errors.Is(err, context.Canceled) {
			slog.Info("aborting completion request due to client closing the connection")
		} else {
			slog.Error("Failed to acquire semaphore", "error", err)
		}
		return err
	}
	defer s.sem.Release(1)

	// put an upper limit on num_predict to avoid the model running on forever
	if req.Options.NumPredict < 0 || req.Options.NumPredict > 10*s.options.NumCtx {
		req.Options.NumPredict = 10 * s.options.NumCtx
	}

	// Make sure the server is ready
	status, err := s.getServerStatusRetry(ctx)
	if err != nil {
		return err
	} else if status != ServerStatusReady {
		return fmt.Errorf("unexpected server status: %s", status)
	}

	// Handling JSON marshaling with special characters unescaped.
	buffer := &bytes.Buffer{}
	enc := json.NewEncoder(buffer)
	enc.SetEscapeHTML(false)

	if err := enc.Encode(req); err != nil {
		return fmt.Errorf("failed to marshal data: %v", err)
	}

	endpoint := fmt.Sprintf("http://127.0.0.1:%d/completion", s.port)
	serverReq, err := http.NewRequestWithContext(ctx, http.MethodPost, endpoint, buffer)
	if err != nil {
		return fmt.Errorf("error creating POST request: %v", err)
	}
	serverReq.Header.Set("Content-Type", "application/json")

	res, err := http.DefaultClient.Do(serverReq)
	if err != nil {
		slog.Error("post predict", "error", err)
		return errors.New("model runner has unexpectedly stopped, this may be due to resource limitations or an internal error, check ollama server logs for details")
	}
	defer res.Body.Close()

	if res.StatusCode >= 400 {
		bodyBytes, err := io.ReadAll(res.Body)
		if err != nil {
			return fmt.Errorf("failed reading llm error response: %w", err)
		}
		log.Printf("llm predict error: %s", bodyBytes)
		return fmt.Errorf("%s", bodyBytes)
	}

	scanner := bufio.NewScanner(res.Body)
	buf := make([]byte, 0, maxBufferSize)
	scanner.Buffer(buf, maxBufferSize)

	// keep track of the last token generated, this is used to abort if the model starts looping
	var lastToken string
	var tokenRepeat int

	for scanner.Scan() {
		select {
		case <-ctx.Done():
			// This handles the request cancellation
			return ctx.Err()
		default:
			line := scanner.Bytes()
			if len(line) == 0 {
				continue
			}

			evt, ok := bytes.CutPrefix(line, []byte("data: "))
			if !ok {
				evt = line
			}

			var c CompletionResponse
			if err := json.Unmarshal(evt, &c); err != nil {
				return fmt.Errorf("error unmarshalling llm prediction response: %v", err)
			}
			switch {
			case strings.TrimSpace(c.Content) == lastToken:
				tokenRepeat++
			default:
				lastToken = strings.TrimSpace(c.Content)
				tokenRepeat = 0
			}

			// 30 picked as an arbitrary max token repeat limit, modify as needed
			if tokenRepeat > 30 {
				slog.Debug("prediction aborted, token repeat limit reached")
				return ctx.Err()
			}

			if c.Content != "" {
				fn(CompletionResponse{
					Content: c.Content,
				})
			}

			if c.Done {
				fn(c)
				return nil
			}
		}
	}

	if err := scanner.Err(); err != nil {
		if strings.Contains(err.Error(), "unexpected EOF") || strings.Contains(err.Error(), "forcibly closed") {
			s.Close()
			var msg string
			if s.status != nil && s.status.LastErrMsg != "" {
				msg = s.status.LastErrMsg
			} else {
				msg = err.Error()
			}
			return fmt.Errorf("an error was encountered while running the model: %s", msg)
		}

		return fmt.Errorf("error reading llm response: %v", err)
	}

	return nil
}

type EmbeddingRequest struct {
	Content string `json:"content"`
}

type EmbeddingResponse struct {
	Embedding []float32 `json:"embedding"`
}

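// Embedding requests an embedding vector for input from the runner.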
func (s *llmServer) Embedding(ctx context.Context, input string) ([]float32, error) {
	slog.Log(ctx, logutil.LevelTrace, "embedding request", "input", input)

	if err := s.sem.Acquire(ctx, 1); err != nil {
		if errors.Is(err, context.Canceled) {
			slog.Info("aborting embedding request due to client closing the connection")
		} else {
			slog.Error("Failed to acquire semaphore", "error", err)
		}
		return nil, err
	}
	defer s.sem.Release(1)

	// Make sure the server is ready
	status, err := s.getServerStatusRetry(ctx)
	if err != nil {
		return nil, err
	} else if status != ServerStatusReady {
		return nil, fmt.Errorf("unexpected server status: %s", status)
	}

	data, err := json.Marshal(EmbeddingRequest{Content: input})
	if err != nil {
		return nil, fmt.Errorf("error marshaling embed data: %w", err)
	}

	r, err := http.NewRequestWithContext(ctx, http.MethodPost, fmt.Sprintf("http://127.0.0.1:%d/embedding", s.port), bytes.NewBuffer(data))
	if err != nil {
		return nil, fmt.Errorf("error creating embed request: %w", err)
	}
	r.Header.Set("Content-Type", "application/json")

	resp, err := http.DefaultClient.Do(r)
	if err != nil {
		return nil, fmt.Errorf("do embedding request: %w", err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("error reading embed response: %w", err)
	}

	if resp.StatusCode >= 400 {
		log.Printf("llm embedding error: %s", body)
		return nil, fmt.Errorf("%s", body)
	}

	var e EmbeddingResponse
	if err := json.Unmarshal(body, &e); err != nil {
		return nil, fmt.Errorf("unmarshal embedding response: %w", err)
	}

	return e.Embedding, nil
}

type TokenizeRequest struct {
	Content string `json:"content"`
}

type TokenizeResponse struct {
	Tokens []int `json:"tokens"`
}

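// Tokenize converts content into token IDs using whichever tokenizer this
// server was loaded with (cgo llama.cpp model or Ollama engine text processor).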
func (s *llmServer) Tokenize(ctx context.Context, content string) ([]int, error) {
	s.llamaModelLock.Lock()
	defer s.llamaModelLock.Unlock()

	if s.llamaModel != nil {
		return s.llamaModel.Tokenize(content, false, true)
	}
	if s.textProcessor != nil {
		tokens, err := s.textProcessor.Encode(content, false)
		if err != nil {
			return nil, err
		}
		toks := make([]int, len(tokens))
		for i, t := range tokens {
			toks[i] = int(t)
		}
		return toks, nil
	}
	// not reached
	return nil, fmt.Errorf("no tokenizer configured")
}

type DetokenizeRequest struct {
	Tokens []int `json:"tokens"`
}

type DetokenizeResponse struct {
	Content string `json:"content"`
}

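// Detokenize converts token IDs back into text using the same tokenizer
// selection as Tokenize.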
func (s *llmServer) Detokenize(ctx context.Context, tokens []int) (string, error) {
	s.llamaModelLock.Lock()
	defer s.llamaModelLock.Unlock()

	if s.llamaModel != nil {
		var resp string
		for _, token := range tokens {
			resp += s.llamaModel.TokenToPiece(token)
		}
		return resp, nil
	}
	if s.textProcessor != nil {
		toks := make([]int32, len(tokens))
		for i, t := range tokens {
			toks[i] = int32(t)
		}
		content, err := s.textProcessor.Decode(toks)
		if err != nil {
			return "", err
		}
		return content, nil
	}
	// not reached
	return "", fmt.Errorf("no tokenizer configured")
}

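// Close frees the cgo model (if any) and kills the runner subprocess, waiting
// for it to exit.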
func (s *llmServer) Close() error {
	s.llamaModelLock.Lock()
	if s.llamaModel != nil {
		llama.FreeModel(s.llamaModel)
		s.llamaModel = nil
	}
	s.llamaModelLock.Unlock()

	if s.cmd != nil {
		slog.Debug("stopping llama server", "pid", s.Pid())
		if err := s.cmd.Process.Kill(); err != nil {
			return err
		}
		// if ProcessState is already populated, Wait already completed, no need to wait again
		if s.cmd.ProcessState == nil {
			slog.Debug("waiting for llama server to exit", "pid", s.Pid())
			<-s.done
		}

		slog.Debug("llama server stopped", "pid", s.Pid())
	}

	return nil
}

func (s *llmServer) EstimatedVRAM() uint64 {
	return s.estimate.VRAMSize
}

func (s *llmServer) EstimatedTotal() uint64 {
	return s.estimate.TotalSize
}

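// EstimatedVRAMByGPU returns the estimated VRAM attributed to the GPU with
// the given ID, or 0 if it is not in the recorded list.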
func (s *llmServer) EstimatedVRAMByGPU(gpuID string) uint64 {
	for i, gpu := range s.gpus {
		if gpu.ID == gpuID {
			if i < len(s.estimate.GPUSizes) {
				return s.estimate.GPUSizes[i]
			}
		}
	}
	return 0
}