"create_dataset.py" did not exist on "dc8c4458d46d8ab4111c5d25b5c7f3d9e1c4fdc8"
download.go 8.36 KB
Newer Older
1
2
3
4
package server

import (
	"context"
Michael Yang's avatar
Michael Yang committed
5
	"encoding/json"
6
7
8
	"errors"
	"fmt"
	"io"
9
	"log/slog"
Jeffrey Morgan's avatar
Jeffrey Morgan committed
10
	"math"
11
	"net/http"
Michael Yang's avatar
Michael Yang committed
12
	"net/url"
13
	"os"
Michael Yang's avatar
Michael Yang committed
14
	"path/filepath"
15
	"strconv"
Michael Yang's avatar
Michael Yang committed
16
	"strings"
17
18
	"sync"
	"sync/atomic"
19
	"syscall"
20
	"time"
21

Michael Yang's avatar
Michael Yang committed
22
	"golang.org/x/sync/errgroup"
23
24

	"github.com/jmorganca/ollama/api"
25
	"github.com/jmorganca/ollama/format"
26
27
)

28
29
30
31
32
const maxRetries = 6

var errMaxRetriesExceeded = errors.New("max retries exceeded")
var errPartStalled = errors.New("part stalled")

33
var blobDownloadManager sync.Map
34

35
36
37
type blobDownload struct {
	Name   string
	Digest string
38

39
40
	Total     int64
	Completed atomic.Int64
41

42
	Parts []*blobDownloadPart
43

44
	context.CancelFunc
Michael Yang's avatar
Michael Yang committed
45
46
47

	done       bool
	err        error
Michael Yang's avatar
Michael Yang committed
48
	references atomic.Int32
49
}
50

51
type blobDownloadPart struct {
52
53
54
55
56
	N           int
	Offset      int64
	Size        int64
	Completed   int64
	lastUpdated time.Time
Michael Yang's avatar
Michael Yang committed
57
58
59
60

	*blobDownload `json:"-"`
}

61
62
const (
	numDownloadParts          = 64
Michael Yang's avatar
Michael Yang committed
63
64
	minDownloadPartSize int64 = 100 * format.MegaByte
	maxDownloadPartSize int64 = 1000 * format.MegaByte
65
66
)

Michael Yang's avatar
Michael Yang committed
67
68
69
70
// Name returns the path of this part's JSON metadata file, formed as
// "<blob name>-partial-<N>".
func (p *blobDownloadPart) Name() string {
	var sb strings.Builder
	sb.WriteString(p.blobDownload.Name)
	sb.WriteString("-partial-")
	sb.WriteString(strconv.Itoa(p.N))
	return sb.String()
}
72

73
74
75
76
77
78
79
80
// StartsAt returns the absolute blob offset of the first byte this part still
// needs, which is where a (resumed) ranged request must begin.
func (p *blobDownloadPart) StartsAt() int64 {
	next := p.Completed + p.Offset
	return next
}

// StopsAt returns the absolute blob offset just past this part's last byte.
func (p *blobDownloadPart) StopsAt() int64 {
	end := p.Size + p.Offset
	return end
}

81
82
83
84
85
86
87
// Write lets a part sit on the tee side of io.Copy as a progress sink. It
// discards the bytes, adds their count to the parent download's Completed
// counter, and stamps lastUpdated for the stall watchdog. It never fails.
//
// NOTE(review): lastUpdated is also read and reset by the watchdog goroutine
// in downloadChunk without synchronization — likely a data race; confirm with
// -race.
func (p *blobDownloadPart) Write(b []byte) (int, error) {
	size := len(b)
	p.lastUpdated = time.Now()
	p.blobDownload.Completed.Add(int64(size))
	return size, nil
}

88
89
func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
	partFilePaths, err := filepath.Glob(b.Name + "-partial-*")
Michael Yang's avatar
Michael Yang committed
90
	if err != nil {
91
		return err
92
93
	}

Michael Yang's avatar
Michael Yang committed
94
	for _, partFilePath := range partFilePaths {
95
		part, err := b.readPart(partFilePath)
Bruce MacDonald's avatar
Bruce MacDonald committed
96
97
		if err != nil {
			return err
98
99
		}

100
101
102
		b.Total += part.Size
		b.Completed.Add(part.Completed)
		b.Parts = append(b.Parts, part)
Michael Yang's avatar
Michael Yang committed
103
	}
104

105
	if len(b.Parts) == 0 {
Michael Yang's avatar
Michael Yang committed
106
		resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
107
		if err != nil {
Michael Yang's avatar
Michael Yang committed
108
109
110
111
			return err
		}
		defer resp.Body.Close()

112
		b.Total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
Michael Yang's avatar
Michael Yang committed
113

Michael Yang's avatar
Michael Yang committed
114
		size := b.Total / numDownloadParts
115
116
117
118
119
120
		switch {
		case size < minDownloadPartSize:
			size = minDownloadPartSize
		case size > maxDownloadPartSize:
			size = maxDownloadPartSize
		}
Michael Yang's avatar
Michael Yang committed
121

122
		var offset int64
123
124
125
126
127
		for offset < b.Total {
			if offset+size > b.Total {
				size = b.Total - offset
			}

Michael Yang's avatar
Michael Yang committed
128
			if err := b.newPart(offset, size); err != nil {
129
				return err
Michael Yang's avatar
Michael Yang committed
130
131
132
			}

			offset += size
133
134
135
		}
	}

136
	slog.Info(fmt.Sprintf("downloading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size)))
137
138
139
	return nil
}

Michael Yang's avatar
Michael Yang committed
140
141
142
143
144
// Run performs the download and stores its outcome in b.err, where Wait picks
// it up. downloadBlob launches it on its own goroutine.
func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) {
	result := b.run(ctx, requestURL, opts)
	b.err = result
}

func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
145
146
147
	defer blobDownloadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

Michael Yang's avatar
Michael Yang committed
148
	file, err := os.OpenFile(b.Name+"-partial", os.O_CREATE|os.O_RDWR, 0o644)
149
150
	if err != nil {
		return err
Michael Yang's avatar
Michael Yang committed
151
	}
152
	defer file.Close()
153

Michael Yang's avatar
Michael Yang committed
154
	_ = file.Truncate(b.Total)
Michael Yang's avatar
Michael Yang committed
155

Michael Yang's avatar
Michael Yang committed
156
	g, inner := errgroup.WithContext(ctx)
157
	g.SetLimit(numDownloadParts)
158
159
	for i := range b.Parts {
		part := b.Parts[i]
Michael Yang's avatar
Michael Yang committed
160
161
162
		if part.Completed == part.Size {
			continue
		}
163

Michael Yang's avatar
Michael Yang committed
164
		g.Go(func() error {
Michael Yang's avatar
Michael Yang committed
165
			var err error
Michael Yang's avatar
Michael Yang committed
166
			for try := 0; try < maxRetries; try++ {
167
				w := io.NewOffsetWriter(file, part.StartsAt())
Michael Yang's avatar
Michael Yang committed
168
				err = b.downloadChunk(inner, requestURL, w, part, opts)
169
				switch {
170
171
				case errors.Is(err, context.Canceled), errors.Is(err, syscall.ENOSPC):
					// return immediately if the context is canceled or the device is out of space
172
					return err
173
174
175
				case errors.Is(err, errPartStalled):
					try--
					continue
176
				case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
177
					sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
178
					slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep))
Michael Yang's avatar
Michael Yang committed
179
					time.Sleep(sleep)
Michael Yang's avatar
Michael Yang committed
180
					continue
181
182
				default:
					return nil
Michael Yang's avatar
Michael Yang committed
183
184
185
				}
			}

Michael Yang's avatar
Michael Yang committed
186
			return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
187
		})
188
189
	}

Michael Yang's avatar
Michael Yang committed
190
191
	if err := g.Wait(); err != nil {
		return err
192
193
	}

194
195
	// explicitly close the file so we can rename it
	if err := file.Close(); err != nil {
Michael Yang's avatar
Michael Yang committed
196
197
198
		return err
	}

199
	for i := range b.Parts {
200
		if err := os.Remove(file.Name() + "-" + strconv.Itoa(i)); err != nil {
Michael Yang's avatar
Michael Yang committed
201
202
			return err
		}
203
204
	}

Michael Yang's avatar
Michael Yang committed
205
206
207
208
209
210
	if err := os.Rename(file.Name(), b.Name); err != nil {
		return err
	}

	b.done = true
	return nil
211
212
}

213
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *RegistryOptions) error {
214
215
216
217
218
219
220
221
222
	g, ctx := errgroup.WithContext(ctx)
	g.Go(func() error {
		headers := make(http.Header)
		headers.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
		resp, err := makeRequestWithRetry(ctx, http.MethodGet, requestURL, headers, nil, opts)
		if err != nil {
			return err
		}
		defer resp.Body.Close()
223

224
225
226
227
228
229
		n, err := io.Copy(w, io.TeeReader(resp.Body, part))
		if err != nil && !errors.Is(err, context.Canceled) && !errors.Is(err, io.ErrUnexpectedEOF) {
			// rollback progress
			b.Completed.Add(-n)
			return err
		}
230

231
232
233
234
235
236
		part.Completed += n
		if err := b.writePart(part.Name(), part); err != nil {
			return err
		}

		// return nil or context.Canceled or UnexpectedEOF (resumable)
Michael Yang's avatar
Michael Yang committed
237
		return err
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
	})

	g.Go(func() error {
		ticker := time.NewTicker(time.Second)
		for {
			select {
			case <-ticker.C:
				if part.Completed >= part.Size {
					return nil
				}

				if !part.lastUpdated.IsZero() && time.Since(part.lastUpdated) > 5*time.Second {
					log.Printf("%s part %d stalled; retrying", b.Digest[7:19], part.N)
					// reset last updated
					part.lastUpdated = time.Time{}
					return errPartStalled
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})
Michael Yang's avatar
Michael Yang committed
260

261
	return g.Wait()
262
263
}

Michael Yang's avatar
Michael Yang committed
264
265
266
267
268
269
270
271
272
273
// newPart persists metadata for a part covering [offset, offset+size). It then
// appends the part to b.Parts, with N set to its position in the slice. If the
// metadata cannot be written, the part is not appended.
func (b *blobDownload) newPart(offset, size int64) error {
	part := &blobDownloadPart{
		N:            len(b.Parts),
		Offset:       offset,
		Size:         size,
		blobDownload: b,
	}

	if err := b.writePart(part.Name(), part); err != nil {
		return err
	}

	b.Parts = append(b.Parts, part)
	return nil
}

274
275
276
277
278
279
280
281
282
283
284
func (b *blobDownload) readPart(partName string) (*blobDownloadPart, error) {
	var part blobDownloadPart
	partFile, err := os.Open(partName)
	if err != nil {
		return nil, err
	}
	defer partFile.Close()

	if err := json.NewDecoder(partFile).Decode(&part); err != nil {
		return nil, err
	}
285

Michael Yang's avatar
Michael Yang committed
286
	part.blobDownload = b
287
	return &part, nil
Michael Yang's avatar
Michael Yang committed
288
289
}

290
func (b *blobDownload) writePart(partName string, part *blobDownloadPart) error {
Michael Yang's avatar
Michael Yang committed
291
	partFile, err := os.OpenFile(partName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0o644)
Michael Yang's avatar
Michael Yang committed
292
293
	if err != nil {
		return err
294
	}
Michael Yang's avatar
Michael Yang committed
295
	defer partFile.Close()
296

Michael Yang's avatar
Michael Yang committed
297
	return json.NewEncoder(partFile).Encode(part)
298
}
299

Michael Yang's avatar
Michael Yang committed
300
301
302
303
304
305
306
307
308
309
func (b *blobDownload) acquire() {
	b.references.Add(1)
}

func (b *blobDownload) release() {
	if b.references.Add(-1) == 0 {
		b.CancelFunc()
	}
}

310
func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
Michael Yang's avatar
Michael Yang committed
311
312
	b.acquire()
	defer b.release()
313
314
315
316
317

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
318
319
320
321
322
323
324
325
326
327
			fn(api.ProgressResponse{
				Status:    fmt.Sprintf("pulling %s", b.Digest[7:19]),
				Digest:    b.Digest,
				Total:     b.Total,
				Completed: b.Completed.Load(),
			})

			if b.done || b.err != nil {
				return b.err
			}
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
		case <-ctx.Done():
			return ctx.Err()
		}
	}
}

type downloadOpts struct {
	mp      ModelPath
	digest  string
	regOpts *RegistryOptions
	fn      func(api.ProgressResponse)
}

// downloadBlob downloads a blob from the registry and stores it in the blobs directory
func downloadBlob(ctx context.Context, opts downloadOpts) error {
	fp, err := GetBlobsPath(opts.digest)
	if err != nil {
		return err
	}

	fi, err := os.Stat(fp)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
		return err
	default:
		opts.fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
355
			Status:    fmt.Sprintf("pulling %s", opts.digest[7:19]),
356
357
358
359
360
361
362
363
			Digest:    opts.digest,
			Total:     fi.Size(),
			Completed: fi.Size(),
		})

		return nil
	}

Michael Yang's avatar
names  
Michael Yang committed
364
365
	data, ok := blobDownloadManager.LoadOrStore(opts.digest, &blobDownload{Name: fp, Digest: opts.digest})
	download := data.(*blobDownload)
366
367
368
	if !ok {
		requestURL := opts.mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", opts.mp.GetNamespaceRepository(), "blobs", opts.digest)
Michael Yang's avatar
names  
Michael Yang committed
369
		if err := download.Prepare(ctx, requestURL, opts.regOpts); err != nil {
370
			blobDownloadManager.Delete(opts.digest)
371
372
373
			return err
		}

Michael Yang's avatar
Michael Yang committed
374
		// nolint: contextcheck
Michael Yang's avatar
names  
Michael Yang committed
375
		go download.Run(context.Background(), requestURL, opts.regOpts)
376
377
	}

Michael Yang's avatar
names  
Michael Yang committed
378
	return download.Wait(ctx, opts.fn)
379
}