common.go 10 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
package runners

import (
	"compress/gzip"
	"errors"
	"fmt"
	"io"
	"io/fs"
	"log/slog"
	"os"
	"path/filepath"
	"runtime"
	"slices"
	"strconv"
	"strings"
	"sync"
	"syscall"

	"golang.org/x/sync/errgroup"

	"github.com/ollama/ollama/envconfig"
	"github.com/ollama/ollama/gpu"
)

// binGlob matches payload entries laid out as $OS/$GOARCH/$RUNNER/$FILE.
const binGlob = "*/*/*/*"

var (
	// lock is held by Refresh and Cleanup while they read or modify runnersDir.
	lock sync.Mutex
	// runnersDir caches the resolved runners directory; it stays empty until
	// Refresh assigns it.
	runnersDir string
)

// Refresh returns the directory that holds the runner binaries, preparing it
// first when needed.
//
// If payloadFS carries runner payloads (see hasPayloads), the first call
// extracts them into a new temporary directory and every later call
// re-extracts any files that have disappeared, e.g. due to tmp cleaners.
// Without payloads, the first call searches for an installed runners
// directory via locateRunners and later calls return the cached result.
//
// The resolved path is cached in runnersDir while holding the package lock.
// On error the returned path may still be non-empty (a failed extraction
// keeps the path of the directory it tried to populate). The call that first
// sets runnersDir also logs the runners found there.
func Refresh(payloadFS fs.FS) (string, error) {
	lock.Lock()
	defer lock.Unlock()
	var err error

	// Wire up extra logging on our first load
	if runnersDir == "" {
		defer func() {
			// Resolution failed: there is nothing to list, and
			// GetAvailableServers("") would only log a spurious error.
			if runnersDir == "" {
				return
			}
			var runners []string
			for v := range GetAvailableServers(runnersDir) {
				runners = append(runners, v)
			}
			// Map iteration order is random; sort so the log line is stable.
			slices.Sort(runners)
			slog.Info("Dynamic LLM libraries", "runners", runners)
			slog.Debug("Override detection logic by setting OLLAMA_LLM_LIBRARY")
		}()
	}

	if hasPayloads(payloadFS) {
		if runnersDir == "" {
			runnersDir, err = extractRunners(payloadFS)
		} else {
			err = refreshRunners(payloadFS, runnersDir)
		}
	} else if runnersDir == "" {
		runnersDir, err = locateRunners()
	}

	return runnersDir, err
}

// Cleanup removes the temporary directory that runner payloads were extracted
// into, i.e. the parent of runnersDir. Refresh creates that directory when
// payloadFS carries payloads.
//
// Nothing is removed when payloadFS has no payloads (runnersDir then comes
// from locateRunners rather than a temporary extraction) or when no runners
// directory has been resolved yet. Removal failures are logged, not returned.
//
// After a successful removal runnersDir is reset, so a later Refresh extracts
// into a fresh temporary directory instead of refreshing the deleted path.
func Cleanup(payloadFS fs.FS) {
	lock.Lock()
	defer lock.Unlock()
	if !hasPayloads(payloadFS) || runnersDir == "" {
		return
	}

	// We want to fully clean up the tmpdir parent of the payloads dir
	tmpDir := filepath.Clean(filepath.Join(runnersDir, ".."))
	slog.Debug("cleaning up", "dir", tmpDir)
	if err := os.RemoveAll(tmpDir); err != nil {
		slog.Warn("failed to clean up", "dir", tmpDir, "err", err)
		return
	}
	// The cached path no longer exists; forget it.
	runnersDir = ""
}

// locateRunners finds an existing lib/ollama/runners directory when no
// payloads are embedded.
//
// The search roots are, in order:
//   - the executable's directory;
//   - that directory joined with envconfig.LibRelativeToExe();
//   - the current working directory.
//
// Under each root it tries the root itself, root/$GOOS-$GOARCH and
// root/dist/$GOOS-$GOARCH, returning the first lib/ollama/runners beneath
// them that os.Stat accepts.
func locateRunners() (string, error) {
	exe, err := os.Executable()
	if err != nil {
		return "", err
	}

	cwd, err := os.Getwd()
	if err != nil {
		return "", err
	}

	exeDir := filepath.Dir(exe)
	platform := runtime.GOOS + "-" + runtime.GOARCH
	roots := []string{exeDir, filepath.Join(exeDir, envconfig.LibRelativeToExe()), cwd}

	paths := make([]string, 0, 3*len(roots))
	for _, root := range roots {
		paths = append(paths, root, filepath.Join(root, platform), filepath.Join(root, "dist", platform))
	}

	// Several layouts are probed so running from a source checkout works too.
	for _, dir := range paths {
		candidate := filepath.Join(dir, "lib", "ollama", "runners")
		if _, statErr := os.Stat(candidate); statErr == nil {
			return candidate, nil
		}
	}
	return "", fmt.Errorf("unable to locate runners in any search path %v", paths)
}

// hasPayloads reports whether payloadFS carries nested runner payloads: at
// least one entry must match binGlob, and a lone match whose path contains
// "placeholder" does not count.
func hasPayloads(payloadFS fs.FS) bool {
	matches, err := fs.Glob(payloadFS, binGlob)
	switch {
	case err != nil, len(matches) == 0:
		return false
	case len(matches) == 1:
		return !strings.Contains(matches[0], "placeholder")
	default:
		return true
	}
}

// extractRunners extracts the embedded runner payloads into a new temporary
// directory and returns the path of its "runners" subdirectory.
//
// Before creating the directory it calls cleanupTmpDirs to remove stale
// directories from earlier processes. The new directory receives an
// ollama.pid file holding this process's pid so a later cleanupTmpDirs can
// tell whether it is still in use; failing to write that file is only logged.
// The runners path is returned even when extraction fails.
func extractRunners(payloadFS fs.FS) (string, error) {
	cleanupTmpDirs()

	tmpDir, err := os.MkdirTemp(envconfig.TmpDir(), "ollama")
	if err != nil {
		return "", fmt.Errorf("failed to generate tmp dir: %w", err)
	}

	// Record our pid so orphaned tmpdirs can be recognized and cleaned up later
	pidFile := filepath.Join(tmpDir, "ollama.pid")
	pidBytes := []byte(strconv.Itoa(os.Getpid()))
	if writeErr := os.WriteFile(pidFile, pidBytes, 0o644); writeErr != nil {
		slog.Warn("failed to write pid file", "file", pidFile, "error", writeErr)
	}

	// Payloads get their own subdirectory inside the tmpdir,
	// typically something like /tmp/ollama3208993108/runners on linux
	payloadDir := filepath.Join(tmpDir, "runners")
	slog.Info("extracting embedded files", "dir", payloadDir)

	if err := refreshRunners(payloadFS, payloadDir); err != nil {
		return payloadDir, err
	}
	return payloadDir, nil
}

// refreshRunners extracts the runner payloads matching binGlob from payloadFS
// into rDir via extractFiles. extractFiles keeps files that already exist in
// rDir, so this both performs the initial extraction and restores files that
// have since been removed.
func refreshRunners(payloadFS fs.FS, rDir string) error {
	if err := extractFiles(payloadFS, rDir, binGlob); err != nil {
		// %w keeps the underlying error inspectable with errors.Is / errors.As.
		return fmt.Errorf("extract binaries: %w", err)
	}
	return nil
}

// extractFiles extracts every payloadFS entry matching glob into targetDir.
//
// Entries are laid out as $OS/$GOARCH/$RUNNER/$FILE; each one is written to
// targetDir/$RUNNER/$FILE, created with permission bits 0o755 (subject to
// umask). Entries whose name ends in ".gz" are gunzipped and written without
// the suffix. A destination file that already exists is left untouched, so
// calling this again only restores files that have gone missing.
//
// Each entry is extracted in its own goroutine. If any extraction fails, all
// of targetDir is removed (a partially extracted directory is unusable) and
// the first extraction error is returned.
func extractFiles(payloadFS fs.FS, targetDir string, glob string) error {
	files, err := fs.Glob(payloadFS, glob)
	if err != nil {
		return fmt.Errorf("extractFiles glob %q: %w", glob, err)
	}
	if len(files) == 0 {
		// Should not happen
		return fmt.Errorf("extractFiles called without payload present")
	}

	if err := os.MkdirAll(targetDir, 0o755); err != nil {
		return fmt.Errorf("extractFiles could not mkdir %s: %w", targetDir, err)
	}

	g := new(errgroup.Group)

	// $OS/$GOARCH/$RUNNER/$FILE
	for _, file := range files {
		filename := file // per-iteration copy for the closure (required before Go 1.22)
		runner := filepath.Base(filepath.Dir(filename))

		slog.Debug("extracting", "runner", runner, "payload", filename)

		g.Go(func() error {
			return extractPayloadFile(payloadFS, filename, filepath.Join(targetDir, runner))
		})
	}

	if err := g.Wait(); err != nil {
		slog.Error("failed to extract files", "error", err)
		// If we fail to extract, the payload dir is most likely unusable, so
		// cleanup whatever we extracted. The cleanup result goes in its own
		// variable so the extraction error, not the RemoveAll result, is returned.
		if rmErr := os.RemoveAll(targetDir); rmErr != nil {
			slog.Warn("failed to cleanup incomplete payload dir", "dir", targetDir, "error", rmErr)
		}
		return err
	}
	return nil
}

// extractPayloadFile copies the payload entry filename from payloadFS into
// runnerDir (created if needed), gunzipping it and dropping the ".gz" suffix
// when the name ends in ".gz". An existing destination file is left as-is.
func extractPayloadFile(payloadFS fs.FS, filename, runnerDir string) error {
	srcf, err := payloadFS.Open(filename)
	if err != nil {
		return err
	}
	defer srcf.Close()

	var src io.Reader = srcf
	if strings.HasSuffix(filename, ".gz") {
		zr, err := gzip.NewReader(srcf)
		if err != nil {
			return fmt.Errorf("decompress payload %s: %w", filename, err)
		}
		defer zr.Close()
		src = zr
		filename = strings.TrimSuffix(filename, ".gz")
	}

	if err := os.MkdirAll(runnerDir, 0o755); err != nil {
		return fmt.Errorf("extractFiles could not mkdir %s: %w", runnerDir, err)
	}

	destFilename := filepath.Join(runnerDir, filepath.Base(filename))

	_, err = os.Stat(destFilename)
	switch {
	case err == nil:
		// Already extracted (e.g. by an earlier refresh); keep it.
		return nil
	case !errors.Is(err, os.ErrNotExist):
		return fmt.Errorf("stat payload %s: %w", filename, err)
	}

	destFile, err := os.OpenFile(destFilename, os.O_WRONLY|os.O_CREATE|os.O_TRUNC, 0o755)
	if err != nil {
		return fmt.Errorf("write payload %s: %w", filename, err)
	}
	if _, err := io.Copy(destFile, src); err != nil {
		_ = destFile.Close() // the copy error is the one worth reporting
		return fmt.Errorf("copy payload %s: %w", filename, err)
	}
	// Close explicitly: a failed close can mean the data never reached disk.
	if err := destFile.Close(); err != nil {
		return fmt.Errorf("write payload %s: %w", filename, err)
	}
	return nil
}

// cleanupTmpDirs makes a best-effort attempt to remove temporary directories
// left behind by earlier processes that are no longer running.
//
// It scans <tmp>/ollama*/ollama.pid, where <tmp> is envconfig.TmpDir() or,
// if that is empty, os.TempDir(). For each pid file whose process has exited,
// it removes, in order:
//   - the pid file;
//   - the sibling "runners" directory;
//   - the containing directory itself.
//
// Directories are skipped when their pid file cannot be read or parsed, or
// when the process may still be running. All failures are logged; nothing
// is returned.
func cleanupTmpDirs() {
	tmpDir := envconfig.TmpDir()
	if tmpDir == "" {
		tmpDir = os.TempDir()
	}
	matches, err := filepath.Glob(filepath.Join(tmpDir, "ollama*", "ollama.pid"))
	if err != nil {
		return
	}

	for _, match := range matches {
		raw, err := os.ReadFile(match)
		if errors.Is(err, os.ErrNotExist) {
			slog.Debug("not a ollama runtime directory, skipping", "path", match)
			continue
		} else if err != nil {
			slog.Warn("could not read ollama.pid, skipping", "path", match, "error", err)
			continue
		}

		// Tolerate surrounding whitespace such as a trailing newline.
		pid, err := strconv.Atoi(strings.TrimSpace(string(raw)))
		if err != nil {
			slog.Warn("invalid pid, skipping", "path", match, "error", err)
			continue
		}

		// If FindProcess fails, the owner is treated as gone and the directory
		// is cleaned up.
		if p, err := os.FindProcess(pid); err == nil {
			// On Unix, signal 0 only checks that the process exists. Anything
			// other than ErrProcessDone (including permission errors) counts
			// as still running.
			alive := !errors.Is(p.Signal(syscall.Signal(0)), os.ErrProcessDone)
			// Release any OS handle FindProcess may hold now, instead of
			// keeping it until the Process is garbage collected.
			_ = p.Release()
			if alive {
				slog.Warn("process still running, skipping", "pid", pid, "path", match)
				continue
			}
		}

		if err := os.Remove(match); err != nil {
			slog.Warn("could not cleanup stale pidfile", "path", match, "error", err)
		}

		runners := filepath.Join(filepath.Dir(match), "runners")
		if err := os.RemoveAll(runners); err != nil {
			slog.Warn("could not cleanup stale runners", "path", runners, "error", err)
		}

		if err := os.Remove(filepath.Dir(match)); err != nil {
			slog.Warn("could not cleanup stale tmpdir", "path", filepath.Dir(match), "error", err)
		}
	}
}

// GetAvailableServers lists the runners installed under payloadsDir. Every
// entry directly under payloadsDir that contains at least one file named
// "ollama_*" is a runner. The result maps that entry's name (the runner name)
// to its path.
//
// Runner names may carry an optional variant after a '_' separator, for
// example "cuda_v11" and "cuda_v12", or "cpu" and "cpu_avx2". A name without
// a variant is the lowest common denominator for that library.
//
// It returns nil when payloadsDir is empty or the glob pattern is invalid,
// and an empty map when nothing matches.
func GetAvailableServers(payloadsDir string) map[string]string {
	if payloadsDir == "" {
		slog.Error("empty runner dir")
		return nil
	}

	// Runner executables are the files whose names begin with "ollama_".
	pattern := filepath.Join(payloadsDir, "*", "ollama_*")
	matches, err := filepath.Glob(pattern)
	if err != nil {
		slog.Debug("could not glob", "pattern", pattern, "error", err)
		return nil
	}

	found := make(map[string]string)
	for _, match := range matches {
		slog.Debug("availableServers : found", "file", match)
		runnerDir := filepath.Dir(match)
		found[filepath.Base(runnerDir)] = runnerDir
	}
	return found
}

// ServersForGpu returns the names of the runners to try for the given GPU,
// most preferred first. It reads the package-level runnersDir (without taking
// lock), so Refresh should have resolved it first; with an empty runnersDir
// no installed runners are found.
//
// Ordering:
//  1. The installed runner named info.Library, or info.Library+"_"+info.Variant
//     when the variant differs from gpu.CPUCapabilityNone. If that runner is
//     "metal" it is returned on its own.
//  2. For non-"cpu" libraries, every other installed runner of the same
//     library, sorted by name so the load order is consistent.
//  3. Except on darwin/arm64, a CPU fallback for non-"cpu" libraries: the
//     installed "cpu_<capability>" runner for the host CPU capability, or
//     plain "cpu" when no capability is detected.
//
// Except on darwin/arm64 the result is never empty; it falls back to ["cpu"].
// TODO - switch to metadata based mapping
func ServersForGpu(info gpu.GpuInfo) []string {
	availableServers := GetAvailableServers(runnersDir)
	requested := info.Library
	if info.Variant != gpu.CPUCapabilityNone.String() {
		requested += "_" + info.Variant
	}

	servers := []string{}

	// exact match first
	if _, ok := availableServers[requested]; ok {
		servers = []string{requested}
		if requested == "metal" {
			return servers
		}
	}

	// Then for GPUs load alternates and sort the list for consistent load ordering
	if info.Library != "cpu" {
		alt := []string{}
		for a := range availableServers {
			if lib, _, _ := strings.Cut(a, "_"); lib == info.Library && a != requested {
				alt = append(alt, a)
			}
		}
		slices.Sort(alt)
		servers = append(servers, alt...)
	}

	if !(runtime.GOOS == "darwin" && runtime.GOARCH == "arm64") {
		// Load up the best CPU variant if not primary requested
		if info.Library != "cpu" {
			variant := gpu.GetCPUCapability()
			// If no variant, then we fall back to default.
			// If we have a variant, only use it when an exact match is
			// installed: running the wrong CPU instructions will panic the
			// process.
			// NOTE(review): when a variant is detected but no cpu_<variant>
			// runner is installed, no CPU runner follows the GPU runners
			// (ServerForCpu falls back to "cpu" in that case) — confirm intended.
			if variant != gpu.CPUCapabilityNone {
				cpuVariant := "cpu_" + variant.String()
				if _, ok := availableServers[cpuVariant]; ok {
					servers = append(servers, cpuVariant)
				}
			} else {
				servers = append(servers, "cpu")
			}
		}

		if len(servers) == 0 {
			servers = []string{"cpu"}
		}
	}

	return servers
}

// ServerForCpu returns the name of the best runner for this CPU architecture.
// On darwin/arm64 this is always "metal". Elsewhere it is "cpu_<capability>"
// when gpu.GetCPUCapability detects a capability and a runner of that name is
// installed in runnersDir, and plain "cpu" otherwise.
func ServerForCpu() string {
	if runtime.GOOS == "darwin" && runtime.GOARCH == "arm64" {
		return "metal"
	}

	capability := gpu.GetCPUCapability()
	installed := GetAvailableServers(runnersDir)
	if capability == gpu.CPUCapabilityNone {
		return "cpu"
	}

	best := "cpu_" + capability.String()
	if _, ok := installed[best]; !ok {
		return "cpu"
	}
	return best
}