download.go 10.1 KB
Newer Older
1
2
3
4
package server

import (
	"context"
Michael Yang's avatar
Michael Yang committed
5
	"encoding/json"
6
7
8
	"errors"
	"fmt"
	"io"
9
	"log/slog"
Jeffrey Morgan's avatar
Jeffrey Morgan committed
10
	"math"
11
	"net/http"
Michael Yang's avatar
Michael Yang committed
12
	"net/url"
13
	"os"
Michael Yang's avatar
Michael Yang committed
14
	"path/filepath"
15
	"strconv"
Michael Yang's avatar
Michael Yang committed
16
	"strings"
17
18
	"sync"
	"sync/atomic"
19
	"syscall"
20
	"time"
21

Michael Yang's avatar
Michael Yang committed
22
	"golang.org/x/sync/errgroup"
23
	"golang.org/x/sync/semaphore"
24
25

	"github.com/jmorganca/ollama/api"
26
	"github.com/jmorganca/ollama/format"
27
28
)

29
30
31
32
33
// maxRetries bounds how many times a single part is re-attempted after a
// non-stall failure before the whole download gives up.
const maxRetries = 6

var (
	// errMaxRetriesExceeded wraps the last error once a part has used up all
	// of its retries.
	errMaxRetriesExceeded = errors.New("max retries exceeded")

	// errPartStalled is returned by the stall monitor when a part stops
	// receiving data; retrying after a stall does not consume an attempt.
	errPartStalled = errors.New("part stalled")

	// blobDownloadManager tracks in-flight downloads keyed by blob digest so
	// concurrent pulls of the same blob share one *blobDownload.
	blobDownloadManager sync.Map
)
35

36
37
38
type blobDownload struct {
	Name   string
	Digest string
39

40
41
	Total     int64
	Completed atomic.Int64
42

43
	Parts []*blobDownloadPart
44

45
	context.CancelFunc
Michael Yang's avatar
Michael Yang committed
46
47
48

	done       bool
	err        error
Michael Yang's avatar
Michael Yang committed
49
	references atomic.Int32
50
}
51

52
type blobDownloadPart struct {
53
54
55
56
57
	N           int
	Offset      int64
	Size        int64
	Completed   int64
	lastUpdated time.Time
Michael Yang's avatar
Michael Yang committed
58
59
60
61

	*blobDownload `json:"-"`
}

62
63
const (
	numDownloadParts          = 64
Michael Yang's avatar
Michael Yang committed
64
65
	minDownloadPartSize int64 = 100 * format.MegaByte
	maxDownloadPartSize int64 = 1000 * format.MegaByte
66
67
)

Michael Yang's avatar
Michael Yang committed
68
69
70
71
// Name returns the path of this part's JSON metadata file, formed as
// "<blob name>-partial-<N>".
func (p *blobDownloadPart) Name() string {
	segments := []string{p.blobDownload.Name, "partial", strconv.Itoa(p.N)}
	return strings.Join(segments, "-")
}
73

74
75
76
77
78
79
80
81
// StartsAt is the absolute blob offset of the first byte of this part that
// has not been persisted yet.
func (p *blobDownloadPart) StartsAt() int64 {
	return p.Completed + p.Offset
}

// StopsAt is the absolute blob offset one past this part's last byte.
func (p *blobDownloadPart) StopsAt() int64 {
	return p.Size + p.Offset
}

82
83
84
85
86
87
88
// Write implements io.Writer so a part can sit on the tee side of a copy. It
// does not store the bytes: it credits their count to the parent download's
// progress and records the time for stall detection. It never fails.
func (p *blobDownloadPart) Write(b []byte) (n int, err error) {
	p.blobDownload.Completed.Add(int64(len(b)))
	p.lastUpdated = time.Now()
	return len(b), nil
}

Michael Yang's avatar
Michael Yang committed
89
func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *registryOptions) error {
90
	partFilePaths, err := filepath.Glob(b.Name + "-partial-*")
Michael Yang's avatar
Michael Yang committed
91
	if err != nil {
92
		return err
93
94
	}

Michael Yang's avatar
Michael Yang committed
95
	for _, partFilePath := range partFilePaths {
96
		part, err := b.readPart(partFilePath)
Bruce MacDonald's avatar
Bruce MacDonald committed
97
98
		if err != nil {
			return err
99
100
		}

101
102
103
		b.Total += part.Size
		b.Completed.Add(part.Completed)
		b.Parts = append(b.Parts, part)
Michael Yang's avatar
Michael Yang committed
104
	}
105

106
	if len(b.Parts) == 0 {
Michael Yang's avatar
Michael Yang committed
107
		resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
108
		if err != nil {
Michael Yang's avatar
Michael Yang committed
109
110
111
112
			return err
		}
		defer resp.Body.Close()

113
		b.Total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
Michael Yang's avatar
Michael Yang committed
114

Michael Yang's avatar
Michael Yang committed
115
		size := b.Total / numDownloadParts
116
117
118
119
120
121
		switch {
		case size < minDownloadPartSize:
			size = minDownloadPartSize
		case size > maxDownloadPartSize:
			size = maxDownloadPartSize
		}
Michael Yang's avatar
Michael Yang committed
122

123
		var offset int64
124
125
126
127
128
		for offset < b.Total {
			if offset+size > b.Total {
				size = b.Total - offset
			}

Michael Yang's avatar
Michael Yang committed
129
			if err := b.newPart(offset, size); err != nil {
130
				return err
Michael Yang's avatar
Michael Yang committed
131
132
133
			}

			offset += size
134
135
136
		}
	}

137
	slog.Info(fmt.Sprintf("downloading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size)))
138
139
140
	return nil
}

Michael Yang's avatar
Michael Yang committed
141
func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *registryOptions) {
142
143
144
	defer blobDownloadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

Michael Yang's avatar
Michael Yang committed
145
	file, err := os.OpenFile(b.Name+"-partial", os.O_CREATE|os.O_RDWR, 0o644)
146
	if err != nil {
Michael Yang's avatar
Michael Yang committed
147
148
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
149
	}
150
	defer file.Close()
151

Michael Yang's avatar
Michael Yang committed
152
	_ = file.Truncate(b.Total)
Michael Yang's avatar
Michael Yang committed
153

Michael Yang's avatar
Michael Yang committed
154
155
156
	var limit int64 = 2
	g, inner := NewLimitGroup(ctx, numDownloadParts, limit)
	go watchDelta(inner, g, &b.Completed, limit)
157

158
159
	for i := range b.Parts {
		part := b.Parts[i]
Michael Yang's avatar
Michael Yang committed
160
161
162
		if part.Completed == part.Size {
			continue
		}
163

Michael Yang's avatar
lint  
Michael Yang committed
164
		g.Go(inner, func() error {
Michael Yang's avatar
Michael Yang committed
165
			var err error
Michael Yang's avatar
Michael Yang committed
166
			for try := 0; try < maxRetries; try++ {
167
				w := io.NewOffsetWriter(file, part.StartsAt())
Michael Yang's avatar
Michael Yang committed
168
				err = b.downloadChunk(inner, requestURL, w, part, opts)
169
				switch {
170
171
				case errors.Is(err, context.Canceled), errors.Is(err, syscall.ENOSPC):
					// return immediately if the context is canceled or the device is out of space
172
					return err
173
174
175
				case errors.Is(err, errPartStalled):
					try--
					continue
176
				case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
177
					sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
178
					slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep))
Michael Yang's avatar
Michael Yang committed
179
					time.Sleep(sleep)
Michael Yang's avatar
Michael Yang committed
180
					continue
181
182
				default:
					return nil
Michael Yang's avatar
Michael Yang committed
183
184
185
				}
			}

Michael Yang's avatar
Michael Yang committed
186
			return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
187
		})
188
189
	}

Michael Yang's avatar
Michael Yang committed
190
	if err := g.Wait(); err != nil {
Michael Yang's avatar
Michael Yang committed
191
192
		b.err = err
		return
193
194
	}

195
196
	// explicitly close the file so we can rename it
	if err := file.Close(); err != nil {
Michael Yang's avatar
Michael Yang committed
197
198
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
199
200
	}

201
	for i := range b.Parts {
202
		if err := os.Remove(file.Name() + "-" + strconv.Itoa(i)); err != nil {
Michael Yang's avatar
Michael Yang committed
203
204
			b.err = err
			return
Michael Yang's avatar
Michael Yang committed
205
		}
206
207
	}

Michael Yang's avatar
Michael Yang committed
208
	if err := os.Rename(file.Name(), b.Name); err != nil {
Michael Yang's avatar
Michael Yang committed
209
210
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
211
212
213
	}

	b.done = true
214
215
}

Michael Yang's avatar
Michael Yang committed
216
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *registryOptions) error {
217
218
219
220
221
222
223
224
225
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		headers := make(http.Header)
		headers.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
		resp, err := makeRequestWithRetry(ctx, http.MethodGet, requestURL, headers, nil, opts)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
226

227
228
229
230
231
232
		n, err := io.Copy(w, io.TeeReader(resp.Body, part))
		if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.ErrUnexpectedEOF) {
			// rollback progress
			b.Completed.Add(-n)
			return err
		}
233

234
235
236
237
238
239
		part.Completed += n
		if err := b.writePart(part.Name(), part); err != nil {
			return err
		}

		// return nil or context.Canceled or UnexpectedEOF (resumable)
Michael Yang's avatar
Michael Yang committed
240
		return err
241
242
243
244
245
246
247
248
249
250
251
252
	})

	g.Go(func() error {
		ticker := time.NewTicker(time.Second)
		for {
			select {
			case <-ticker.C:
				if part.Completed >= part.Size {
					return nil
				}

				if !part.lastUpdated.IsZero() && time.Since(part.lastUpdated) > 5*time.Second {
Michael Yang's avatar
Michael Yang committed
253
					slog.Info(fmt.Sprintf("%s part %d stalled; retrying", b.Digest[7:19], part.N))
254
255
256
257
258
259
260
261
262
					// reset last updated
					part.lastUpdated = time.Time{}
					return errPartStalled
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})
Michael Yang's avatar
Michael Yang committed
263

264
	return g.Wait()
265
266
}

Michael Yang's avatar
Michael Yang committed
267
268
269
270
271
272
273
274
275
276
// newPart creates the part covering [offset, offset+size), numbers it by its
// position in b.Parts, persists its metadata file, and only then appends it.
func (b *blobDownload) newPart(offset, size int64) error {
	part := &blobDownloadPart{
		blobDownload: b,
		N:            len(b.Parts),
		Offset:       offset,
		Size:         size,
	}

	if err := b.writePart(part.Name(), part); err != nil {
		return err
	}

	b.Parts = append(b.Parts, part)
	return nil
}

277
278
279
280
281
282
283
284
285
286
287
func (b *blobDownload) readPart(partName string) (*blobDownloadPart, error) {
	var part blobDownloadPart
	partFile, err := os.Open(partName)
	if err != nil {
		return nil, err
	}
	defer partFile.Close()

	if err := json.NewDecoder(partFile).Decode(&part); err != nil {
		return nil, err
	}
288

Michael Yang's avatar
Michael Yang committed
289
	part.blobDownload = b
290
	return &part, nil
Michael Yang's avatar
Michael Yang committed
291
292
}

293
func (b *blobDownload) writePart(partName string, part *blobDownloadPart) error {
Michael Yang's avatar
Michael Yang committed
294
	partFile, err := os.OpenFile(partName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o644)
Michael Yang's avatar
Michael Yang committed
295
296
	if err != nil {
		return err
297
	}
Michael Yang's avatar
Michael Yang committed
298
	defer partFile.Close()
299

Michael Yang's avatar
Michael Yang committed
300
	return json.NewEncoder(partFile).Encode(part)
301
}
302

Michael Yang's avatar
Michael Yang committed
303
304
305
306
307
308
309
310
311
312
// acquire registers one more waiter interested in b.
func (b *blobDownload) acquire() {
	b.references.Add(1)
}

// release drops one waiter registration; when the last waiter leaves, the
// download is canceled. CancelFunc is installed by Run, which is started in
// a separate goroutine, so it may still be nil if every waiter leaves before
// Run begins. Calling a nil func would panic, hence the guard.
//
// NOTE(review): the CancelFunc read is not synchronized with Run's write.
func (b *blobDownload) release() {
	if b.references.Add(-1) == 0 && b.CancelFunc != nil {
		b.CancelFunc()
	}
}

313
func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
Michael Yang's avatar
Michael Yang committed
314
315
	b.acquire()
	defer b.release()
316
317
318
319
320

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
321
322
323
324
325
326
327
328
329
330
			fn(api.ProgressResponse{
				Status:    fmt.Sprintf("pulling %s", b.Digest[7:19]),
				Digest:    b.Digest,
				Total:     b.Total,
				Completed: b.Completed.Load(),
			})

			if b.done || b.err != nil {
				return b.err
			}
331
332
333
334
335
336
337
338
339
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

type downloadOpts struct {
	mp      ModelPath
	digest  string
Michael Yang's avatar
Michael Yang committed
340
	regOpts *registryOptions
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
	fn      func(api.ProgressResponse)
}

// downloadBlob downloads a blob from the registry and stores it in the blobs directory
func downloadBlob(ctx context.Context, opts downloadOpts) error {
	fp, err := GetBlobsPath(opts.digest)
	if err != nil {
		return err
	}

	fi, err := os.Stat(fp)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
		return err
	default:
		opts.fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
358
			Status:    fmt.Sprintf("pulling %s", opts.digest[7:19]),
359
360
361
362
363
364
365
366
			Digest:    opts.digest,
			Total:     fi.Size(),
			Completed: fi.Size(),
		})

		return nil
	}

Michael Yang's avatar
names  
Michael Yang committed
367
368
	data, ok := blobDownloadManager.LoadOrStore(opts.digest, &blobDownload{Name: fp, Digest: opts.digest})
	download := data.(*blobDownload)
369
370
371
	if !ok {
		requestURL := opts.mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", opts.mp.GetNamespaceRepository(), "blobs", opts.digest)
Michael Yang's avatar
names  
Michael Yang committed
372
		if err := download.Prepare(ctx, requestURL, opts.regOpts); err != nil {
373
			blobDownloadManager.Delete(opts.digest)
374
375
376
			return err
		}

Michael Yang's avatar
Michael Yang committed
377
		// nolint: contextcheck
Michael Yang's avatar
names  
Michael Yang committed
378
		go download.Run(context.Background(), requestURL, opts.regOpts)
379
380
	}

Michael Yang's avatar
names  
Michael Yang committed
381
	return download.Wait(ctx, opts.fn)
382
}
383
384
385

type LimitGroup struct {
	*errgroup.Group
Michael Yang's avatar
Michael Yang committed
386
387
	*semaphore.Weighted
	size, limit int64
388
389
}

Michael Yang's avatar
Michael Yang committed
390
func NewLimitGroup(ctx context.Context, size, limit int64) (*LimitGroup, context.Context) {
391
392
	g, ctx := errgroup.WithContext(ctx)
	return &LimitGroup{
Michael Yang's avatar
Michael Yang committed
393
394
395
396
		Group:    g,
		Weighted: semaphore.NewWeighted(size),
		size:     size,
		limit:    limit,
397
398
399
	}, ctx
}

Michael Yang's avatar
lint  
Michael Yang committed
400
func (g *LimitGroup) Go(ctx context.Context, fn func() error) {
Michael Yang's avatar
Michael Yang committed
401
402
403
404
405
406
	var weight int64 = 1
	if g.limit > 0 {
		weight = g.size / g.limit
	}

	_ = g.Acquire(ctx, weight)
Michael Yang's avatar
lint  
Michael Yang committed
407
	if ctx.Err() != nil {
408
409
410
411
		return
	}

	g.Group.Go(func() error {
Michael Yang's avatar
Michael Yang committed
412
		defer g.Release(weight)
413
414
415
416
		return fn()
	})
}

Michael Yang's avatar
Michael Yang committed
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
func (g *LimitGroup) SetLimit(limit int64) {
	if limit > g.limit {
		g.limit = limit
	}
}

// watchDelta samples the byte counter c once per second and raises g's
// concurrency limit while throughput keeps improving. It returns when ctx is
// done.
//
// It keeps the last 10 samples and computes the average bytes per second
// over that window. After a 5s ramp-up, it compares that rate with the rate
// recorded at the last accepted evaluation. If the ratio x is at least 1.2,
// the limit grows by int64(x). Each accepted evaluation records the current
// rate and starts a 3s cooldown.
func watchDelta(ctx context.Context, g *LimitGroup, c *atomic.Int64, limit int64) {
	var maxDelta float64
	var buckets []int64

	// 5s ramp up period
	nextUpdate := time.Now().Add(5 * time.Second)

	ticker := time.NewTicker(time.Second)
	// release the ticker when ctx is done
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
			buckets = append(buckets, c.Load())
			if len(buckets) < 2 {
				continue
			} else if len(buckets) > 10 {
				buckets = buckets[1:]
			}

			// n samples taken one second apart span n-1 intervals, so divide
			// by the interval count to get bytes per second.
			delta := float64(buckets[len(buckets)-1]-buckets[0]) / float64(len(buckets)-1)
			slog.Debug("", "limit", limit, "delta", format.HumanBytes(int64(delta)), "max_delta", format.HumanBytes(int64(maxDelta)))

			if time.Now().Before(nextUpdate) {
				// quiet period; do not update ccy if recently updated
				continue
			} else if maxDelta > 0 {
				x := delta / maxDelta
				if x < 1.2 {
					continue
				}

				limit += int64(x)
				slog.Debug("setting", "limit", limit)
				g.SetLimit(limit)
			}

			// 3s cooldown period
			nextUpdate = time.Now().Add(3 * time.Second)
			maxDelta = delta

		case <-ctx.Done():
			return
		}
	}
}