download.go 11.5 KB
Newer Older
1
2
3
4
package server

import (
	"context"
Michael Yang's avatar
Michael Yang committed
5
	"encoding/json"
6
7
8
	"errors"
	"fmt"
	"io"
9
	"log/slog"
Jeffrey Morgan's avatar
Jeffrey Morgan committed
10
	"math"
11
	"math/rand/v2"
12
	"net/http"
Michael Yang's avatar
Michael Yang committed
13
	"net/url"
14
	"os"
Michael Yang's avatar
Michael Yang committed
15
	"path/filepath"
16
	"strconv"
Michael Yang's avatar
Michael Yang committed
17
	"strings"
18
19
	"sync"
	"sync/atomic"
20
	"syscall"
21
	"time"
22

Michael Yang's avatar
Michael Yang committed
23
	"golang.org/x/sync/errgroup"
24

25
26
	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/format"
27
28
)

29
30
// maxRetries bounds the number of counted attempts made per part in run
// before the part fails with errMaxRetriesExceeded.
const maxRetries = 6

Michael Yang's avatar
lint  
Michael Yang committed
31
// Sentinel errors used by the blob downloader.
var (
	// errMaxRetriesExceeded is wrapped around the last error of a part
	// whose retry budget has been used up.
	errMaxRetriesExceeded = errors.New("max retries exceeded")

	// errPartStalled is returned by the stall watchdog in downloadChunk
	// when a part has not received data for too long.
	errPartStalled = errors.New("part stalled")

	// errMaxRedirectsExceeded is returned by the redirect policy used
	// while resolving the direct download URL.
	errMaxRedirectsExceeded = errors.New("maximum redirects exceeded (10) for directURL")
)
36

37
// blobDownloadManager tracks in-flight downloads. downloadBlob stores a
// *blobDownload under its digest and run deletes the entry when it returns.
var blobDownloadManager sync.Map
38

39
40
41
type blobDownload struct {
	Name   string
	Digest string
42

43
44
	Total     int64
	Completed atomic.Int64
45

46
	Parts []*blobDownloadPart
47

48
	context.CancelFunc
Michael Yang's avatar
Michael Yang committed
49

50
	done       chan struct{}
Michael Yang's avatar
Michael Yang committed
51
	err        error
Michael Yang's avatar
Michael Yang committed
52
	references atomic.Int32
53
}
54

55
type blobDownloadPart struct {
56
57
58
59
60
61
62
	N         int
	Offset    int64
	Size      int64
	Completed atomic.Int64

	lastUpdatedMu sync.Mutex
	lastUpdated   time.Time
Michael Yang's avatar
Michael Yang committed
63
64
65
66

	*blobDownload `json:"-"`
}

67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
// jsonBlobDownloadPart is the plain-value form of blobDownloadPart used
// for JSON encoding and decoding of part metadata.
type jsonBlobDownloadPart struct {
	N                       int
	Offset, Size, Completed int64
}

// MarshalJSON encodes the part as a jsonBlobDownloadPart, taking a
// snapshot of the atomic Completed counter.
func (p *blobDownloadPart) MarshalJSON() ([]byte, error) {
	snapshot := jsonBlobDownloadPart{
		N:      p.N,
		Offset: p.Offset,
		Size:   p.Size,
	}
	snapshot.Completed = p.Completed.Load()
	return json.Marshal(snapshot)
}

// UnmarshalJSON decodes a jsonBlobDownloadPart and replaces *p with it.
// Every field not present in the JSON form (including the embedded
// *blobDownload) is reset to its zero value.
func (p *blobDownloadPart) UnmarshalJSON(b []byte) error {
	var decoded jsonBlobDownloadPart
	err := json.Unmarshal(b, &decoded)
	if err != nil {
		return err
	}

	*p = blobDownloadPart{
		N:      decoded.N,
		Offset: decoded.Offset,
		Size:   decoded.Size,
	}
	// The counter is atomic, so it is set after the struct is replaced.
	p.Completed.Store(decoded.Completed)
	return nil
}

97
const (
98
	numDownloadParts          = 16
Michael Yang's avatar
Michael Yang committed
99
100
	minDownloadPartSize int64 = 100 * format.MegaByte
	maxDownloadPartSize int64 = 1000 * format.MegaByte
101
102
)

Michael Yang's avatar
Michael Yang committed
103
104
105
106
// Name returns the path of the part's metadata file:
// "<blob name>-partial-<N>".
func (p *blobDownloadPart) Name() string {
	return strings.Join([]string{
		p.blobDownload.Name, "partial", strconv.Itoa(p.N),
	}, "-")
}
108

109
// StartsAt returns the absolute offset in the blob of the first byte of
// this part that has not been downloaded yet.
func (p *blobDownloadPart) StartsAt() int64 {
	return p.Offset + p.Completed.Load()
}

// StopsAt returns the absolute offset in the blob just past the last
// byte of this part (exclusive end).
func (p *blobDownloadPart) StopsAt() int64 {
	end := p.Size + p.Offset
	return end
}

117
118
119
// Write implements io.Writer for progress accounting only: it stores no
// data. It adds len(b) to the parent download's Completed counter and
// records the current time as the part's last update, which the stall
// watchdog in downloadChunk reads. It always reports len(b) bytes written
// and a nil error.
func (p *blobDownloadPart) Write(b []byte) (n int, err error) {
	n = len(b)
	p.blobDownload.Completed.Add(int64(n))
	p.lastUpdatedMu.Lock()
	p.lastUpdated = time.Now()
	p.lastUpdatedMu.Unlock()
	return n, nil
}

Michael Yang's avatar
Michael Yang committed
126
func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *registryOptions) error {
127
	partFilePaths, err := filepath.Glob(b.Name + "-partial-*")
Michael Yang's avatar
Michael Yang committed
128
	if err != nil {
129
		return err
130
131
	}

132
133
	b.done = make(chan struct{})

Michael Yang's avatar
Michael Yang committed
134
	for _, partFilePath := range partFilePaths {
135
		part, err := b.readPart(partFilePath)
Bruce MacDonald's avatar
Bruce MacDonald committed
136
137
		if err != nil {
			return err
138
139
		}

140
		b.Total += part.Size
141
		b.Completed.Add(part.Completed.Load())
142
		b.Parts = append(b.Parts, part)
Michael Yang's avatar
Michael Yang committed
143
	}
144

145
	if len(b.Parts) == 0 {
Michael Yang's avatar
Michael Yang committed
146
		resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
147
		if err != nil {
Michael Yang's avatar
Michael Yang committed
148
149
150
151
			return err
		}
		defer resp.Body.Close()

152
		b.Total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
Michael Yang's avatar
Michael Yang committed
153

Michael Yang's avatar
Michael Yang committed
154
		size := b.Total / numDownloadParts
155
156
157
158
159
160
		switch {
		case size < minDownloadPartSize:
			size = minDownloadPartSize
		case size > maxDownloadPartSize:
			size = maxDownloadPartSize
		}
Michael Yang's avatar
Michael Yang committed
161

162
		var offset int64
163
164
165
166
167
		for offset < b.Total {
			if offset+size > b.Total {
				size = b.Total - offset
			}

Michael Yang's avatar
Michael Yang committed
168
			if err := b.newPart(offset, size); err != nil {
169
				return err
Michael Yang's avatar
Michael Yang committed
170
171
172
			}

			offset += size
173
		}
174
175
176
	}

	if len(b.Parts) > 0 {
177
		slog.Info(fmt.Sprintf("downloading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size)))
178
	}
179

180
181
182
	return nil
}

Michael Yang's avatar
Michael Yang committed
183
// Run executes the download and stores the result in b.err. The done
// channel is closed when it returns, which unblocks Wait callers.
func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *registryOptions) {
	defer close(b.done)
	b.err = b.run(ctx, requestURL, opts)
}

188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
func newBackoff(maxBackoff time.Duration) func(ctx context.Context) error {
	var n int
	return func(ctx context.Context) error {
		if ctx.Err() != nil {
			return ctx.Err()
		}

		n++

		// n^2 backoff timer is a little smoother than the
		// common choice of 2^n.
		d := min(time.Duration(n*n)*10*time.Millisecond, maxBackoff)
		// Randomize the delay between 0.5-1.5 x msec, in order
		// to prevent accidental "thundering herd" problems.
		d = time.Duration(float64(d) * (rand.Float64() + 0.5))
		t := time.NewTimer(d)
		defer t.Stop()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			return nil
		}
	}
}

214
func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *registryOptions) error {
215
216
217
	defer blobDownloadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

Michael Yang's avatar
Michael Yang committed
218
	file, err := os.OpenFile(b.Name+"-partial", os.O_CREATE|os.O_RDWR, 0o644)
219
	if err != nil {
220
		return err
Michael Yang's avatar
Michael Yang committed
221
	}
222
	defer file.Close()
223
	setSparse(file)
224

Michael Yang's avatar
Michael Yang committed
225
	_ = file.Truncate(b.Total)
Michael Yang's avatar
Michael Yang committed
226

227
228
229
230
231
232
233
234
235
236
237
238
239
	directURL, err := func() (*url.URL, error) {
		ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
		defer cancel()

		backoff := newBackoff(10 * time.Second)
		for {
			// shallow clone opts to be used in the closure
			// without affecting the outer opts.
			newOpts := new(registryOptions)
			*newOpts = *opts

			newOpts.CheckRedirect = func(req *http.Request, via []*http.Request) error {
				if len(via) > 10 {
CYJiang's avatar
CYJiang committed
240
					return errMaxRedirectsExceeded
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
				}

				// if the hostname is the same, allow the redirect
				if req.URL.Hostname() == requestURL.Hostname() {
					return nil
				}

				// stop at the first redirect that is not
				// the same hostname as the original
				// request.
				return http.ErrUseLastResponse
			}

			resp, err := makeRequestWithRetry(ctx, http.MethodGet, requestURL, nil, nil, newOpts)
			if err != nil {
				slog.Warn("failed to get direct URL; backing off and retrying", "err", err)
				if err := backoff(ctx); err != nil {
					return nil, err
				}
				continue
			}
			defer resp.Body.Close()
263
			if resp.StatusCode != http.StatusTemporaryRedirect && resp.StatusCode != http.StatusOK {
264
265
266
267
268
269
270
271
272
				return nil, fmt.Errorf("unexpected status code %d", resp.StatusCode)
			}
			return resp.Location()
		}
	}()
	if err != nil {
		return err
	}

273
274
	g, inner := errgroup.WithContext(ctx)
	g.SetLimit(numDownloadParts)
275
276
	for i := range b.Parts {
		part := b.Parts[i]
277
		if part.Completed.Load() == part.Size {
Michael Yang's avatar
Michael Yang committed
278
279
			continue
		}
280

281
		g.Go(func() error {
Michael Yang's avatar
Michael Yang committed
282
			var err error
Michael Yang's avatar
Michael Yang committed
283
			for try := 0; try < maxRetries; try++ {
284
				w := io.NewOffsetWriter(file, part.StartsAt())
285
				err = b.downloadChunk(inner, directURL, w, part)
286
				switch {
287
288
				case errors.Is(err, context.Canceled), errors.Is(err, syscall.ENOSPC):
					// return immediately if the context is canceled or the device is out of space
289
					return err
290
291
292
				case errors.Is(err, errPartStalled):
					try--
					continue
293
				case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
294
					sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
295
					slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep))
Michael Yang's avatar
Michael Yang committed
296
					time.Sleep(sleep)
Michael Yang's avatar
Michael Yang committed
297
					continue
298
299
				default:
					return nil
Michael Yang's avatar
Michael Yang committed
300
301
302
				}
			}

Michael Yang's avatar
Michael Yang committed
303
			return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
304
		})
305
306
	}

Michael Yang's avatar
Michael Yang committed
307
	if err := g.Wait(); err != nil {
308
		return err
309
310
	}

311
312
	// explicitly close the file so we can rename it
	if err := file.Close(); err != nil {
313
		return err
Michael Yang's avatar
Michael Yang committed
314
315
	}

316
	for i := range b.Parts {
317
		if err := os.Remove(file.Name() + "-" + strconv.Itoa(i)); err != nil {
318
			return err
Michael Yang's avatar
Michael Yang committed
319
		}
320
321
	}

Michael Yang's avatar
Michael Yang committed
322
	if err := os.Rename(file.Name(), b.Name); err != nil {
323
		return err
Michael Yang's avatar
Michael Yang committed
324
325
	}

326
	return nil
327
328
}

329
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart) error {
330
331
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error {
332
333
334
335
336
337
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL.String(), nil)
		if err != nil {
			return err
		}
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
		resp, err := http.DefaultClient.Do(req)
338
339
340
341
		if err != nil {
			return err
		}
		defer resp.Body.Close()
342

343
		n, err := io.CopyN(w, io.TeeReader(resp.Body, part), part.Size-part.Completed.Load())
344
345
346
347
348
		if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.ErrUnexpectedEOF) {
			// rollback progress
			b.Completed.Add(-n)
			return err
		}
349

350
		part.Completed.Add(n)
351
352
353
354
355
		if err := b.writePart(part.Name(), part); err != nil {
			return err
		}

		// return nil or context.Canceled or UnexpectedEOF (resumable)
Michael Yang's avatar
Michael Yang committed
356
		return err
357
358
359
360
361
362
363
	})

	g.Go(func() error {
		ticker := time.NewTicker(time.Second)
		for {
			select {
			case <-ticker.C:
364
				if part.Completed.Load() >= part.Size {
365
366
367
					return nil
				}

368
369
370
371
				part.lastUpdatedMu.Lock()
				lastUpdated := part.lastUpdated
				part.lastUpdatedMu.Unlock()

372
				if !lastUpdated.IsZero() && time.Since(lastUpdated) > 30*time.Second {
373
374
					const msg = "%s part %d stalled; retrying. If this persists, press ctrl-c to exit, then 'ollama pull' to find a faster connection."
					slog.Info(fmt.Sprintf(msg, b.Digest[7:19], part.N))
375
					// reset last updated
376
					part.lastUpdatedMu.Lock()
377
					part.lastUpdated = time.Time{}
378
					part.lastUpdatedMu.Unlock()
379
380
381
382
383
384
385
					return errPartStalled
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})
Michael Yang's avatar
Michael Yang committed
386

387
	return g.Wait()
388
389
}

Michael Yang's avatar
Michael Yang committed
390
391
392
393
394
395
396
397
398
399
// newPart creates the next part covering size bytes starting at offset,
// persists its metadata file and, only if that succeeds, appends it to
// b.Parts. The part index is the current length of b.Parts.
func (b *blobDownload) newPart(offset, size int64) error {
	p := &blobDownloadPart{
		N:            len(b.Parts),
		Offset:       offset,
		Size:         size,
		blobDownload: b,
	}

	err := b.writePart(p.Name(), p)
	if err == nil {
		b.Parts = append(b.Parts, p)
	}
	return err
}

400
401
402
403
404
405
406
407
408
409
410
func (b *blobDownload) readPart(partName string) (*blobDownloadPart, error) {
	var part blobDownloadPart
	partFile, err := os.Open(partName)
	if err != nil {
		return nil, err
	}
	defer partFile.Close()

	if err := json.NewDecoder(partFile).Decode(&part); err != nil {
		return nil, err
	}
411

Michael Yang's avatar
Michael Yang committed
412
	part.blobDownload = b
413
	return &part, nil
Michael Yang's avatar
Michael Yang committed
414
415
}

416
func (b *blobDownload) writePart(partName string, part *blobDownloadPart) error {
Michael Yang's avatar
Michael Yang committed
417
	partFile, err := os.OpenFile(partName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o644)
Michael Yang's avatar
Michael Yang committed
418
419
	if err != nil {
		return err
420
	}
Michael Yang's avatar
Michael Yang committed
421
	defer partFile.Close()
422

Michael Yang's avatar
Michael Yang committed
423
	return json.NewEncoder(partFile).Encode(part)
424
}
425

Michael Yang's avatar
Michael Yang committed
426
427
428
429
430
431
432
433
434
435
// acquire registers one more waiter on the download.
func (b *blobDownload) acquire() {
	b.references.Add(1)
}

// release drops one waiter and cancels the download when the last waiter
// is gone.
func (b *blobDownload) release() {
	if b.references.Add(-1) != 0 {
		return
	}

	// CancelFunc is assigned inside run, which downloadBlob starts in its
	// own goroutine, so it can still be nil if the last waiter leaves
	// before run has started. Calling a nil func would panic.
	// NOTE(review): the field is read here without synchronisation with
	// the write in run — confirm whether that needs a lock.
	if b.CancelFunc != nil {
		b.CancelFunc()
	}
}

436
func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
Michael Yang's avatar
Michael Yang committed
437
438
	b.acquire()
	defer b.release()
439
440
441
442

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
443
444
		case <-b.done:
			return b.err
445
		case <-ticker.C:
446
447
448
449
450
451
			fn(api.ProgressResponse{
				Status:    fmt.Sprintf("pulling %s", b.Digest[7:19]),
				Digest:    b.Digest,
				Total:     b.Total,
				Completed: b.Completed.Load(),
			})
452
453
454
455
456
457
458
459
460
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

type downloadOpts struct {
	mp      ModelPath
	digest  string
Michael Yang's avatar
Michael Yang committed
461
	regOpts *registryOptions
462
463
464
465
	fn      func(api.ProgressResponse)
}

// downloadBlob downloads a blob from the registry and stores it in the blobs directory
466
func downloadBlob(ctx context.Context, opts downloadOpts) (cacheHit bool, _ error) {
467
468
	fp, err := GetBlobsPath(opts.digest)
	if err != nil {
469
		return false, err
470
471
472
473
474
475
	}

	fi, err := os.Stat(fp)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
476
		return false, err
477
478
	default:
		opts.fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
479
			Status:    fmt.Sprintf("pulling %s", opts.digest[7:19]),
480
481
482
483
484
			Digest:    opts.digest,
			Total:     fi.Size(),
			Completed: fi.Size(),
		})

485
		return true, nil
486
487
	}

Michael Yang's avatar
names  
Michael Yang committed
488
489
	data, ok := blobDownloadManager.LoadOrStore(opts.digest, &blobDownload{Name: fp, Digest: opts.digest})
	download := data.(*blobDownload)
490
491
492
	if !ok {
		requestURL := opts.mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", opts.mp.GetNamespaceRepository(), "blobs", opts.digest)
Michael Yang's avatar
names  
Michael Yang committed
493
		if err := download.Prepare(ctx, requestURL, opts.regOpts); err != nil {
494
			blobDownloadManager.Delete(opts.digest)
495
			return false, err
496
497
		}

Michael Yang's avatar
Michael Yang committed
498
		//nolint:contextcheck
Michael Yang's avatar
names  
Michael Yang committed
499
		go download.Run(context.Background(), requestURL, opts.regOpts)
500
501
	}

502
	return false, download.Wait(ctx, opts.fn)
503
}