download.go 11.3 KB
Newer Older
1
2
3
4
package server

import (
	"context"
Michael Yang's avatar
Michael Yang committed
5
	"encoding/json"
6
7
8
	"errors"
	"fmt"
	"io"
9
	"log/slog"
Jeffrey Morgan's avatar
Jeffrey Morgan committed
10
	"math"
11
	"math/rand/v2"
12
	"net/http"
Michael Yang's avatar
Michael Yang committed
13
	"net/url"
14
	"os"
Michael Yang's avatar
Michael Yang committed
15
	"path/filepath"
16
	"strconv"
Michael Yang's avatar
Michael Yang committed
17
	"strings"
18
19
	"sync"
	"sync/atomic"
20
	"syscall"
21
	"time"
22

Michael Yang's avatar
Michael Yang committed
23
	"golang.org/x/sync/errgroup"
24

25
26
	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/format"
27
28
)

29
30
31
32
33
// maxRetries bounds the number of failed attempts run makes for a single
// part before giving up on it.
const maxRetries = 6

// errMaxRetriesExceeded is wrapped around the last download error once a
// part has used up all of its retries.
var errMaxRetriesExceeded = errors.New("max retries exceeded")

// errPartStalled is returned by the stall watchdog in downloadChunk when a
// part has received no data for too long.
var errPartStalled = errors.New("part stalled")

34
// blobDownloadManager tracks in-flight downloads. Entries are stored under
// the blob digest and hold a *blobDownload, so concurrent requests for the
// same digest share a single download.
var blobDownloadManager sync.Map
35

36
37
38
type blobDownload struct {
	Name   string
	Digest string
39

40
41
	Total     int64
	Completed atomic.Int64
42

43
	Parts []*blobDownloadPart
44

45
	context.CancelFunc
Michael Yang's avatar
Michael Yang committed
46

47
	done       chan struct{}
Michael Yang's avatar
Michael Yang committed
48
	err        error
Michael Yang's avatar
Michael Yang committed
49
	references atomic.Int32
50
}
51

52
type blobDownloadPart struct {
53
54
55
56
57
58
59
	N         int
	Offset    int64
	Size      int64
	Completed atomic.Int64

	lastUpdatedMu sync.Mutex
	lastUpdated   time.Time
Michael Yang's avatar
Michael Yang committed
60
61
62
63

	*blobDownload `json:"-"`
}

64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
// jsonBlobDownloadPart is the on-disk JSON form of a blobDownloadPart. It
// carries the same exported fields, with Completed as a plain integer
// instead of an atomic value.
type jsonBlobDownloadPart struct {
	N                       int
	Offset, Size, Completed int64
}

// MarshalJSON encodes the part through jsonBlobDownloadPart, taking a
// snapshot of the atomic Completed counter.
func (p *blobDownloadPart) MarshalJSON() ([]byte, error) {
	snapshot := jsonBlobDownloadPart{N: p.N, Offset: p.Offset, Size: p.Size}
	snapshot.Completed = p.Completed.Load()
	return json.Marshal(snapshot)
}

// UnmarshalJSON decodes the jsonBlobDownloadPart form and replaces *p with
// it. Every field not present in the JSON form (including the owning
// download) is reset to its zero value.
func (p *blobDownloadPart) UnmarshalJSON(b []byte) error {
	var decoded jsonBlobDownloadPart
	err := json.Unmarshal(b, &decoded)
	if err != nil {
		return err
	}

	*p = blobDownloadPart{N: decoded.N, Offset: decoded.Offset, Size: decoded.Size}
	p.Completed.Store(decoded.Completed)
	return nil
}

94
95
const (
	numDownloadParts          = 64
Michael Yang's avatar
Michael Yang committed
96
97
	minDownloadPartSize int64 = 100 * format.MegaByte
	maxDownloadPartSize int64 = 1000 * format.MegaByte
98
99
)

Michael Yang's avatar
Michael Yang committed
100
101
102
103
// Name returns the path of the part's metadata file:
// "<blob name>-partial-<N>".
func (p *blobDownloadPart) Name() string {
	segments := []string{p.blobDownload.Name, "partial", strconv.Itoa(p.N)}
	return strings.Join(segments, "-")
}
105

106
// StartsAt returns the blob offset of the first byte of this part that has
// not been downloaded yet.
func (p *blobDownloadPart) StartsAt() int64 {
	done := p.Completed.Load()
	return p.Offset + done
}

// StopsAt returns the blob offset one past the last byte of this part.
func (p *blobDownloadPart) StopsAt() int64 {
	end := p.Offset + p.Size
	return end
}

114
115
116
// Write implements io.Writer for progress accounting only: it stores
// nothing, adds len(b) to the owning download's Completed counter and
// records the current time as the part's last activity. It always reports
// the full length as written and never fails.
func (p *blobDownloadPart) Write(b []byte) (n int, err error) {
	received := len(b)
	p.blobDownload.Completed.Add(int64(received))

	p.lastUpdatedMu.Lock()
	p.lastUpdated = time.Now()
	p.lastUpdatedMu.Unlock()

	return received, nil
}

Michael Yang's avatar
Michael Yang committed
123
func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *registryOptions) error {
124
	partFilePaths, err := filepath.Glob(b.Name + "-partial-*")
Michael Yang's avatar
Michael Yang committed
125
	if err != nil {
126
		return err
127
128
	}

129
130
	b.done = make(chan struct{})

Michael Yang's avatar
Michael Yang committed
131
	for _, partFilePath := range partFilePaths {
132
		part, err := b.readPart(partFilePath)
Bruce MacDonald's avatar
Bruce MacDonald committed
133
134
		if err != nil {
			return err
135
136
		}

137
		b.Total += part.Size
138
		b.Completed.Add(part.Completed.Load())
139
		b.Parts = append(b.Parts, part)
Michael Yang's avatar
Michael Yang committed
140
	}
141

142
	if len(b.Parts) == 0 {
Michael Yang's avatar
Michael Yang committed
143
		resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
144
		if err != nil {
Michael Yang's avatar
Michael Yang committed
145
146
147
148
			return err
		}
		defer resp.Body.Close()

149
		b.Total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
Michael Yang's avatar
Michael Yang committed
150

Michael Yang's avatar
Michael Yang committed
151
		size := b.Total / numDownloadParts
152
153
154
155
156
157
		switch {
		case size < minDownloadPartSize:
			size = minDownloadPartSize
		case size > maxDownloadPartSize:
			size = maxDownloadPartSize
		}
Michael Yang's avatar
Michael Yang committed
158

159
		var offset int64
160
161
162
163
164
		for offset < b.Total {
			if offset+size > b.Total {
				size = b.Total - offset
			}

Michael Yang's avatar
Michael Yang committed
165
			if err := b.newPart(offset, size); err != nil {
166
				return err
Michael Yang's avatar
Michael Yang committed
167
168
169
			}

			offset += size
170
171
172
		}
	}

173
	slog.Info(fmt.Sprintf("downloading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size)))
174
175
176
	return nil
}

Michael Yang's avatar
Michael Yang committed
177
// Run executes the download and stores its outcome in b.err. The done
// channel is closed when Run returns, which releases every caller blocked
// in Wait.
func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *registryOptions) {
	defer func() {
		close(b.done)
	}()

	result := b.run(ctx, requestURL, opts)
	b.err = result
}

182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
func newBackoff(maxBackoff time.Duration) func(ctx context.Context) error {
	var n int
	return func(ctx context.Context) error {
		if ctx.Err() != nil {
			return ctx.Err()
		}

		n++

		// n^2 backoff timer is a little smoother than the
		// common choice of 2^n.
		d := min(time.Duration(n*n)*10*time.Millisecond, maxBackoff)
		// Randomize the delay between 0.5-1.5 x msec, in order
		// to prevent accidental "thundering herd" problems.
		d = time.Duration(float64(d) * (rand.Float64() + 0.5))
		t := time.NewTimer(d)
		defer t.Stop()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			return nil
		}
	}
}

208
func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *registryOptions) error {
209
210
211
	defer blobDownloadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

Michael Yang's avatar
Michael Yang committed
212
	file, err := os.OpenFile(b.Name+"-partial", os.O_CREATE|os.O_RDWR, 0o644)
213
	if err != nil {
214
		return err
Michael Yang's avatar
Michael Yang committed
215
	}
216
	defer file.Close()
217

Michael Yang's avatar
Michael Yang committed
218
	_ = file.Truncate(b.Total)
Michael Yang's avatar
Michael Yang committed
219

220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
	directURL, err := func() (*url.URL, error) {
		ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
		defer cancel()

		backoff := newBackoff(10 * time.Second)
		for {
			// shallow clone opts to be used in the closure
			// without affecting the outer opts.
			newOpts := new(registryOptions)
			*newOpts = *opts

			newOpts.CheckRedirect = func(req *http.Request, via []*http.Request) error {
				if len(via) > 10 {
					return errors.New("maxium redirects exceeded (10) for directURL")
				}

				// if the hostname is the same, allow the redirect
				if req.URL.Hostname() == requestURL.Hostname() {
					return nil
				}

				// stop at the first redirect that is not
				// the same hostname as the original
				// request.
				return http.ErrUseLastResponse
			}

			resp, err := makeRequestWithRetry(ctx, http.MethodGet, requestURL, nil, nil, newOpts)
			if err != nil {
				slog.Warn("failed to get direct URL; backing off and retrying", "err", err)
				if err := backoff(ctx); err != nil {
					return nil, err
				}
				continue
			}
			defer resp.Body.Close()
			if resp.StatusCode != http.StatusTemporaryRedirect {
				return nil, fmt.Errorf("unexpected status code %d", resp.StatusCode)
			}
			return resp.Location()
		}
	}()
	if err != nil {
		return err
	}

266
267
	g, inner := errgroup.WithContext(ctx)
	g.SetLimit(numDownloadParts)
268
269
	for i := range b.Parts {
		part := b.Parts[i]
270
		if part.Completed.Load() == part.Size {
Michael Yang's avatar
Michael Yang committed
271
272
			continue
		}
273

274
		g.Go(func() error {
Michael Yang's avatar
Michael Yang committed
275
			var err error
Michael Yang's avatar
Michael Yang committed
276
			for try := 0; try < maxRetries; try++ {
277
				w := io.NewOffsetWriter(file, part.StartsAt())
278
				err = b.downloadChunk(inner, directURL, w, part)
279
				switch {
280
281
				case errors.Is(err, context.Canceled), errors.Is(err, syscall.ENOSPC):
					// return immediately if the context is canceled or the device is out of space
282
					return err
283
284
285
				case errors.Is(err, errPartStalled):
					try--
					continue
286
				case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
287
					sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
288
					slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep))
Michael Yang's avatar
Michael Yang committed
289
					time.Sleep(sleep)
Michael Yang's avatar
Michael Yang committed
290
					continue
291
292
				default:
					return nil
Michael Yang's avatar
Michael Yang committed
293
294
295
				}
			}

Michael Yang's avatar
Michael Yang committed
296
			return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
297
		})
298
299
	}

Michael Yang's avatar
Michael Yang committed
300
	if err := g.Wait(); err != nil {
301
		return err
302
303
	}

304
305
	// explicitly close the file so we can rename it
	if err := file.Close(); err != nil {
306
		return err
Michael Yang's avatar
Michael Yang committed
307
308
	}

309
	for i := range b.Parts {
310
		if err := os.Remove(file.Name() + "-" + strconv.Itoa(i)); err != nil {
311
			return err
Michael Yang's avatar
Michael Yang committed
312
		}
313
314
	}

Michael Yang's avatar
Michael Yang committed
315
	if err := os.Rename(file.Name(), b.Name); err != nil {
316
		return err
Michael Yang's avatar
Michael Yang committed
317
318
	}

319
	return nil
320
321
}

322
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart) error {
323
324
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error {
325
326
327
328
329
330
		req, err := http.NewRequestWithContext(ctx, http.MethodGet, requestURL.String(), nil)
		if err != nil {
			return err
		}
		req.Header.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
		resp, err := http.DefaultClient.Do(req)
331
332
333
334
		if err != nil {
			return err
		}
		defer resp.Body.Close()
335

336
		n, err := io.CopyN(w, io.TeeReader(resp.Body, part), part.Size-part.Completed.Load())
337
338
339
340
341
		if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.ErrUnexpectedEOF) {
			// rollback progress
			b.Completed.Add(-n)
			return err
		}
342

343
		part.Completed.Add(n)
344
345
346
347
348
		if err := b.writePart(part.Name(), part); err != nil {
			return err
		}

		// return nil or context.Canceled or UnexpectedEOF (resumable)
Michael Yang's avatar
Michael Yang committed
349
		return err
350
351
352
353
354
355
356
	})

	g.Go(func() error {
		ticker := time.NewTicker(time.Second)
		for {
			select {
			case <-ticker.C:
357
				if part.Completed.Load() >= part.Size {
358
359
360
					return nil
				}

361
362
363
364
365
				part.lastUpdatedMu.Lock()
				lastUpdated := part.lastUpdated
				part.lastUpdatedMu.Unlock()

				if !lastUpdated.IsZero() && time.Since(lastUpdated) > 5*time.Second {
366
367
					const msg = "%s part %d stalled; retrying. If this persists, press ctrl-c to exit, then 'ollama pull' to find a faster connection."
					slog.Info(fmt.Sprintf(msg, b.Digest[7:19], part.N))
368
					// reset last updated
369
					part.lastUpdatedMu.Lock()
370
					part.lastUpdated = time.Time{}
371
					part.lastUpdatedMu.Unlock()
372
373
374
375
376
377
378
					return errPartStalled
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})
Michael Yang's avatar
Michael Yang committed
379

380
	return g.Wait()
381
382
}

Michael Yang's avatar
Michael Yang committed
383
384
385
386
387
388
389
390
391
392
// newPart creates the next part covering size bytes starting at offset,
// writes its metadata file, and appends it to b.Parts. The part is only
// appended when the write succeeds.
func (b *blobDownload) newPart(offset, size int64) error {
	part := &blobDownloadPart{
		blobDownload: b,
		N:            len(b.Parts),
		Offset:       offset,
		Size:         size,
	}

	err := b.writePart(part.Name(), part)
	if err != nil {
		return err
	}

	b.Parts = append(b.Parts, part)
	return nil
}

393
394
395
396
397
398
399
400
401
402
403
func (b *blobDownload) readPart(partName string) (*blobDownloadPart, error) {
	var part blobDownloadPart
	partFile, err := os.Open(partName)
	if err != nil {
		return nil, err
	}
	defer partFile.Close()

	if err := json.NewDecoder(partFile).Decode(&part); err != nil {
		return nil, err
	}
404

Michael Yang's avatar
Michael Yang committed
405
	part.blobDownload = b
406
	return &part, nil
Michael Yang's avatar
Michael Yang committed
407
408
}

409
func (b *blobDownload) writePart(partName string, part *blobDownloadPart) error {
Michael Yang's avatar
Michael Yang committed
410
	partFile, err := os.OpenFile(partName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o644)
Michael Yang's avatar
Michael Yang committed
411
412
	if err != nil {
		return err
413
	}
Michael Yang's avatar
Michael Yang committed
414
	defer partFile.Close()
415

Michael Yang's avatar
Michael Yang committed
416
	return json.NewEncoder(partFile).Encode(part)
417
}
418

Michael Yang's avatar
Michael Yang committed
419
420
421
422
423
424
425
426
427
428
// acquire registers one more waiter on the download by incrementing the
// reference count. Each call is paired with a later call to release.
func (b *blobDownload) acquire() {
	b.references.Add(1)
}

// release drops one reference taken by acquire. When the count reaches
// zero the download is cancelled through the embedded CancelFunc.
//
// CancelFunc is only assigned once run has started, so it can still be nil
// if the last waiter leaves before then. Calling a nil func would panic, so
// that case is skipped and the download is left running.
func (b *blobDownload) release() {
	if b.references.Add(-1) != 0 {
		return
	}

	if cancel := b.CancelFunc; cancel != nil {
		cancel()
	}
}

429
func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
Michael Yang's avatar
Michael Yang committed
430
431
	b.acquire()
	defer b.release()
432
433
434
435

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
436
437
		case <-b.done:
			return b.err
438
		case <-ticker.C:
439
440
441
442
443
444
			fn(api.ProgressResponse{
				Status:    fmt.Sprintf("pulling %s", b.Digest[7:19]),
				Digest:    b.Digest,
				Total:     b.Total,
				Completed: b.Completed.Load(),
			})
445
446
447
448
449
450
451
452
453
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

type downloadOpts struct {
	mp      ModelPath
	digest  string
Michael Yang's avatar
Michael Yang committed
454
	regOpts *registryOptions
455
456
457
458
	fn      func(api.ProgressResponse)
}

// downloadBlob downloads a blob from the registry and stores it in the blobs directory
459
func downloadBlob(ctx context.Context, opts downloadOpts) (cacheHit bool, _ error) {
460
461
	fp, err := GetBlobsPath(opts.digest)
	if err != nil {
462
		return false, err
463
464
465
466
467
468
	}

	fi, err := os.Stat(fp)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
469
		return false, err
470
471
	default:
		opts.fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
472
			Status:    fmt.Sprintf("pulling %s", opts.digest[7:19]),
473
474
475
476
477
			Digest:    opts.digest,
			Total:     fi.Size(),
			Completed: fi.Size(),
		})

478
		return true, nil
479
480
	}

Michael Yang's avatar
names  
Michael Yang committed
481
482
	data, ok := blobDownloadManager.LoadOrStore(opts.digest, &blobDownload{Name: fp, Digest: opts.digest})
	download := data.(*blobDownload)
483
484
485
	if !ok {
		requestURL := opts.mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", opts.mp.GetNamespaceRepository(), "blobs", opts.digest)
Michael Yang's avatar
names  
Michael Yang committed
486
		if err := download.Prepare(ctx, requestURL, opts.regOpts); err != nil {
487
			blobDownloadManager.Delete(opts.digest)
488
			return false, err
489
490
		}

Michael Yang's avatar
Michael Yang committed
491
		//nolint:contextcheck
Michael Yang's avatar
names  
Michael Yang committed
492
		go download.Run(context.Background(), requestURL, opts.regOpts)
493
494
	}

495
	return false, download.Wait(ctx, opts.fn)
496
}