download.go 10.4 KB
Newer Older
1
2
3
4
package server

import (
	"context"
Michael Yang's avatar
Michael Yang committed
5
	"encoding/json"
6
7
8
	"errors"
	"fmt"
	"io"
9
	"log/slog"
Jeffrey Morgan's avatar
Jeffrey Morgan committed
10
	"math"
11
	"math/rand/v2"
12
	"net/http"
Michael Yang's avatar
Michael Yang committed
13
	"net/url"
14
	"os"
Michael Yang's avatar
Michael Yang committed
15
	"path/filepath"
16
	"strconv"
Michael Yang's avatar
Michael Yang committed
17
	"strings"
18
19
	"sync"
	"sync/atomic"
20
	"syscall"
21
	"time"
22

Michael Yang's avatar
Michael Yang committed
23
	"golang.org/x/sync/errgroup"
24

25
26
	"github.com/ollama/ollama/api"
	"github.com/ollama/ollama/format"
27
28
)

29
30
31
32
33
// maxRetries bounds how many times a single part is re-requested after a
// non-stall failure before the whole download gives up.
const maxRetries = 6

var (
	// errMaxRetriesExceeded wraps the last error once a part has used up
	// its retry budget.
	errMaxRetriesExceeded = errors.New("max retries exceeded")
	// errPartStalled is returned by the watchdog in downloadChunk when a
	// part stops receiving data.
	errPartStalled = errors.New("part stalled")
)

// blobDownloadManager maps a blob digest to its in-flight *blobDownload so
// concurrent pulls of the same blob share one download.
var blobDownloadManager sync.Map
35

36
37
38
type blobDownload struct {
	Name   string
	Digest string
39

40
41
	Total     int64
	Completed atomic.Int64
42

43
	Parts []*blobDownloadPart
44

45
	context.CancelFunc
Michael Yang's avatar
Michael Yang committed
46
47
48

	done       bool
	err        error
Michael Yang's avatar
Michael Yang committed
49
	references atomic.Int32
50
}
51

52
type blobDownloadPart struct {
53
54
55
56
57
	N           int
	Offset      int64
	Size        int64
	Completed   int64
	lastUpdated time.Time
Michael Yang's avatar
Michael Yang committed
58
59
60
61

	*blobDownload `json:"-"`
}

62
63
const (
	numDownloadParts          = 64
Michael Yang's avatar
Michael Yang committed
64
65
	minDownloadPartSize int64 = 100 * format.MegaByte
	maxDownloadPartSize int64 = 1000 * format.MegaByte
66
67
)

Michael Yang's avatar
Michael Yang committed
68
69
70
71
// Name returns the path of this part's JSON state file,
// "<blob path>-partial-<N>".
func (p *blobDownloadPart) Name() string {
	var sb strings.Builder
	sb.WriteString(p.blobDownload.Name)
	sb.WriteString("-partial-")
	sb.WriteString(strconv.Itoa(p.N))
	return sb.String()
}
73

74
75
76
77
78
79
80
81
// StartsAt returns the absolute blob offset of the first byte this part still
// needs, i.e. where a resumed request should begin.
func (p *blobDownloadPart) StartsAt() int64 {
	next := p.Offset + p.Completed
	return next
}

// StopsAt returns the absolute blob offset one past this part's last byte,
// the exclusive end of its range.
func (p *blobDownloadPart) StopsAt() int64 {
	end := p.Offset + p.Size
	return end
}

82
83
84
85
86
87
88
// Write implements io.Writer as a progress sink. It discards b, adds len(b) to
// the owning download's Completed counter and stamps lastUpdated for the stall
// watchdog. It never fails.
//
// It does not advance p.Completed; downloadChunk does that once the copy ends.
// NOTE(review): lastUpdated is read by the watchdog goroutine without
// synchronization.
func (p *blobDownloadPart) Write(b []byte) (n int, err error) {
	written := int64(len(b))
	p.blobDownload.Completed.Add(written)
	p.lastUpdated = time.Now()
	return len(b), nil
}

Michael Yang's avatar
Michael Yang committed
89
func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *registryOptions) error {
90
	partFilePaths, err := filepath.Glob(b.Name + "-partial-*")
Michael Yang's avatar
Michael Yang committed
91
	if err != nil {
92
		return err
93
94
	}

Michael Yang's avatar
Michael Yang committed
95
	for _, partFilePath := range partFilePaths {
96
		part, err := b.readPart(partFilePath)
Bruce MacDonald's avatar
Bruce MacDonald committed
97
98
		if err != nil {
			return err
99
100
		}

101
102
103
		b.Total += part.Size
		b.Completed.Add(part.Completed)
		b.Parts = append(b.Parts, part)
Michael Yang's avatar
Michael Yang committed
104
	}
105

106
	if len(b.Parts) == 0 {
Michael Yang's avatar
Michael Yang committed
107
		resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
108
		if err != nil {
Michael Yang's avatar
Michael Yang committed
109
110
111
112
			return err
		}
		defer resp.Body.Close()

113
		b.Total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
Michael Yang's avatar
Michael Yang committed
114

Michael Yang's avatar
Michael Yang committed
115
		size := b.Total / numDownloadParts
116
117
118
119
120
121
		switch {
		case size < minDownloadPartSize:
			size = minDownloadPartSize
		case size > maxDownloadPartSize:
			size = maxDownloadPartSize
		}
Michael Yang's avatar
Michael Yang committed
122

123
		var offset int64
124
125
126
127
128
		for offset < b.Total {
			if offset+size > b.Total {
				size = b.Total - offset
			}

Michael Yang's avatar
Michael Yang committed
129
			if err := b.newPart(offset, size); err != nil {
130
				return err
Michael Yang's avatar
Michael Yang committed
131
132
133
			}

			offset += size
134
135
136
		}
	}

137
	slog.Info(fmt.Sprintf("downloading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size)))
138
139
140
	return nil
}

Michael Yang's avatar
Michael Yang committed
141
// Run executes the download and stores its final error in b.err, where Wait
// picks it up. downloadBlob launches it in its own goroutine.
func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *registryOptions) {
	err := b.run(ctx, requestURL, opts)
	b.err = err
}

145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
func newBackoff(maxBackoff time.Duration) func(ctx context.Context) error {
	var n int
	return func(ctx context.Context) error {
		if ctx.Err() != nil {
			return ctx.Err()
		}

		n++

		// n^2 backoff timer is a little smoother than the
		// common choice of 2^n.
		d := min(time.Duration(n*n)*10*time.Millisecond, maxBackoff)
		// Randomize the delay between 0.5-1.5 x msec, in order
		// to prevent accidental "thundering herd" problems.
		d = time.Duration(float64(d) * (rand.Float64() + 0.5))
		t := time.NewTimer(d)
		defer t.Stop()
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			return nil
		}
	}
}

171
func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *registryOptions) error {
172
173
174
	defer blobDownloadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

Michael Yang's avatar
Michael Yang committed
175
	file, err := os.OpenFile(b.Name+"-partial", os.O_CREATE|os.O_RDWR, 0o644)
176
	if err != nil {
177
		return err
Michael Yang's avatar
Michael Yang committed
178
	}
179
	defer file.Close()
180

Michael Yang's avatar
Michael Yang committed
181
	_ = file.Truncate(b.Total)
Michael Yang's avatar
Michael Yang committed
182

183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
	directURL, err := func() (*url.URL, error) {
		ctx, cancel := context.WithTimeout(ctx, 30*time.Second)
		defer cancel()

		backoff := newBackoff(10 * time.Second)
		for {
			// shallow clone opts to be used in the closure
			// without affecting the outer opts.
			newOpts := new(registryOptions)
			*newOpts = *opts

			newOpts.CheckRedirect = func(req *http.Request, via []*http.Request) error {
				if len(via) > 10 {
					return errors.New("maxium redirects exceeded (10) for directURL")
				}

				// if the hostname is the same, allow the redirect
				if req.URL.Hostname() == requestURL.Hostname() {
					return nil
				}

				// stop at the first redirect that is not
				// the same hostname as the original
				// request.
				return http.ErrUseLastResponse
			}

			resp, err := makeRequestWithRetry(ctx, http.MethodGet, requestURL, nil, nil, newOpts)
			if err != nil {
				slog.Warn("failed to get direct URL; backing off and retrying", "err", err)
				if err := backoff(ctx); err != nil {
					return nil, err
				}
				continue
			}
			defer resp.Body.Close()
			if resp.StatusCode != http.StatusTemporaryRedirect {
				return nil, fmt.Errorf("unexpected status code %d", resp.StatusCode)
			}
			return resp.Location()
		}
	}()
	if err != nil {
		return err
	}

229
230
	g, inner := errgroup.WithContext(ctx)
	g.SetLimit(numDownloadParts)
231
232
	for i := range b.Parts {
		part := b.Parts[i]
Michael Yang's avatar
Michael Yang committed
233
234
235
		if part.Completed == part.Size {
			continue
		}
236

237
		g.Go(func() error {
Michael Yang's avatar
Michael Yang committed
238
			var err error
Michael Yang's avatar
Michael Yang committed
239
			for try := 0; try < maxRetries; try++ {
240
				w := io.NewOffsetWriter(file, part.StartsAt())
241
				err = b.downloadChunk(inner, directURL, w, part, opts)
242
				switch {
243
244
				case errors.Is(err, context.Canceled), errors.Is(err, syscall.ENOSPC):
					// return immediately if the context is canceled or the device is out of space
245
					return err
246
247
248
				case errors.Is(err, errPartStalled):
					try--
					continue
249
				case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
250
					sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
251
					slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep))
Michael Yang's avatar
Michael Yang committed
252
					time.Sleep(sleep)
Michael Yang's avatar
Michael Yang committed
253
					continue
254
255
				default:
					return nil
Michael Yang's avatar
Michael Yang committed
256
257
258
				}
			}

Michael Yang's avatar
Michael Yang committed
259
			return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
260
		})
261
262
	}

Michael Yang's avatar
Michael Yang committed
263
	if err := g.Wait(); err != nil {
264
		return err
265
266
	}

267
268
	// explicitly close the file so we can rename it
	if err := file.Close(); err != nil {
269
		return err
Michael Yang's avatar
Michael Yang committed
270
271
	}

272
	for i := range b.Parts {
273
		if err := os.Remove(file.Name() + "-" + strconv.Itoa(i)); err != nil {
274
			return err
Michael Yang's avatar
Michael Yang committed
275
		}
276
277
	}

Michael Yang's avatar
Michael Yang committed
278
	if err := os.Rename(file.Name(), b.Name); err != nil {
279
		return err
Michael Yang's avatar
Michael Yang committed
280
281
282
	}

	b.done = true
283
	return nil
284
285
}

Michael Yang's avatar
Michael Yang committed
286
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *registryOptions) error {
287
288
289
290
291
292
293
294
295
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		headers := make(http.Header)
		headers.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
		resp, err := makeRequestWithRetry(ctx, http.MethodGet, requestURL, headers, nil, opts)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
296

Tim Scheuermann's avatar
Tim Scheuermann committed
297
		n, err := io.CopyN(w, io.TeeReader(resp.Body, part), part.Size-part.Completed)
298
299
300
301
302
		if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.ErrUnexpectedEOF) {
			// rollback progress
			b.Completed.Add(-n)
			return err
		}
303

304
305
306
307
308
309
		part.Completed += n
		if err := b.writePart(part.Name(), part); err != nil {
			return err
		}

		// return nil or context.Canceled or UnexpectedEOF (resumable)
Michael Yang's avatar
Michael Yang committed
310
		return err
311
312
313
314
315
316
317
318
319
320
321
322
	})

	g.Go(func() error {
		ticker := time.NewTicker(time.Second)
		for {
			select {
			case <-ticker.C:
				if part.Completed >= part.Size {
					return nil
				}

				if !part.lastUpdated.IsZero() && time.Since(part.lastUpdated) > 5*time.Second {
323
324
					const msg = "%s part %d stalled; retrying. If this persists, press ctrl-c to exit, then 'ollama pull' to find a faster connection."
					slog.Info(fmt.Sprintf(msg, b.Digest[7:19], part.N))
325
326
327
328
329
330
331
332
333
					// reset last updated
					part.lastUpdated = time.Time{}
					return errPartStalled
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})
Michael Yang's avatar
Michael Yang committed
334

335
	return g.Wait()
336
337
}

Michael Yang's avatar
Michael Yang committed
338
339
340
341
342
343
344
345
346
347
// newPart creates a part covering [offset, offset+size), numbered by its
// position in b.Parts. It persists the part's state file first and appends the
// part only if that write succeeds.
func (b *blobDownload) newPart(offset, size int64) error {
	part := &blobDownloadPart{
		N:            len(b.Parts),
		Offset:       offset,
		Size:         size,
		blobDownload: b,
	}

	err := b.writePart(part.Name(), part)
	if err != nil {
		return err
	}

	b.Parts = append(b.Parts, part)
	return nil
}

348
349
350
351
352
353
354
355
356
357
358
func (b *blobDownload) readPart(partName string) (*blobDownloadPart, error) {
	var part blobDownloadPart
	partFile, err := os.Open(partName)
	if err != nil {
		return nil, err
	}
	defer partFile.Close()

	if err := json.NewDecoder(partFile).Decode(&part); err != nil {
		return nil, err
	}
359

Michael Yang's avatar
Michael Yang committed
360
	part.blobDownload = b
361
	return &part, nil
Michael Yang's avatar
Michael Yang committed
362
363
}

364
func (b *blobDownload) writePart(partName string, part *blobDownloadPart) error {
Michael Yang's avatar
Michael Yang committed
365
	partFile, err := os.OpenFile(partName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o644)
Michael Yang's avatar
Michael Yang committed
366
367
	if err != nil {
		return err
368
	}
Michael Yang's avatar
Michael Yang committed
369
	defer partFile.Close()
370

Michael Yang's avatar
Michael Yang committed
371
	return json.NewEncoder(partFile).Encode(part)
372
}
373

Michael Yang's avatar
Michael Yang committed
374
375
376
377
378
379
380
381
382
383
// acquire registers one more waiter on b; every call is paired with release.
func (b *blobDownload) acquire() {
	_ = b.references.Add(1) // the new count is not needed here
}

// release drops a reference taken by acquire. When the last reference goes
// away, the download is canceled via b.CancelFunc.
//
// CancelFunc is installed by run in the download goroutine, which may not have
// executed yet (or never will, if Prepare failed). A nil func is therefore
// skipped instead of panicking.
// NOTE(review): the CancelFunc field is still read here while run may be
// writing it.
func (b *blobDownload) release() {
	if b.references.Add(-1) == 0 && b.CancelFunc != nil {
		b.CancelFunc()
	}
}

384
func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
Michael Yang's avatar
Michael Yang committed
385
386
	b.acquire()
	defer b.release()
387
388
389
390
391

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
392
393
394
395
396
397
398
399
400
401
			fn(api.ProgressResponse{
				Status:    fmt.Sprintf("pulling %s", b.Digest[7:19]),
				Digest:    b.Digest,
				Total:     b.Total,
				Completed: b.Completed.Load(),
			})

			if b.done || b.err != nil {
				return b.err
			}
402
403
404
405
406
407
408
409
410
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

type downloadOpts struct {
	mp      ModelPath
	digest  string
Michael Yang's avatar
Michael Yang committed
411
	regOpts *registryOptions
412
413
414
415
	fn      func(api.ProgressResponse)
}

// downloadBlob downloads a blob from the registry and stores it in the blobs directory
416
func downloadBlob(ctx context.Context, opts downloadOpts) (cacheHit bool, _ error) {
417
418
	fp, err := GetBlobsPath(opts.digest)
	if err != nil {
419
		return false, err
420
421
422
423
424
425
	}

	fi, err := os.Stat(fp)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
426
		return false, err
427
428
	default:
		opts.fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
429
			Status:    fmt.Sprintf("pulling %s", opts.digest[7:19]),
430
431
432
433
434
			Digest:    opts.digest,
			Total:     fi.Size(),
			Completed: fi.Size(),
		})

435
		return true, nil
436
437
	}

Michael Yang's avatar
names  
Michael Yang committed
438
439
	data, ok := blobDownloadManager.LoadOrStore(opts.digest, &blobDownload{Name: fp, Digest: opts.digest})
	download := data.(*blobDownload)
440
441
442
	if !ok {
		requestURL := opts.mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", opts.mp.GetNamespaceRepository(), "blobs", opts.digest)
Michael Yang's avatar
names  
Michael Yang committed
443
		if err := download.Prepare(ctx, requestURL, opts.regOpts); err != nil {
444
			blobDownloadManager.Delete(opts.digest)
445
			return false, err
446
447
		}

Michael Yang's avatar
Michael Yang committed
448
		//nolint:contextcheck
Michael Yang's avatar
names  
Michael Yang committed
449
		go download.Run(context.Background(), requestURL, opts.regOpts)
450
451
	}

452
	return false, download.Wait(ctx, opts.fn)
453
}