download.go 7.29 KB
Newer Older
1
2
3
4
package server

import (
	"context"
Michael Yang's avatar
Michael Yang committed
5
	"encoding/json"
6
7
8
9
10
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
Michael Yang's avatar
Michael Yang committed
11
	"net/url"
12
	"os"
Michael Yang's avatar
Michael Yang committed
13
	"path/filepath"
14
	"strconv"
Michael Yang's avatar
Michael Yang committed
15
	"strings"
16
17
18
	"sync"
	"sync/atomic"
	"time"
19

Michael Yang's avatar
Michael Yang committed
20
	"golang.org/x/sync/errgroup"
21
22

	"github.com/jmorganca/ollama/api"
23
	"github.com/jmorganca/ollama/format"
24
25
)

26
// blobDownloadManager holds the downloads that are currently registered,
// keyed by blob digest with *blobDownload values. downloadBlob stores entries
// via LoadOrStore so concurrent requests for the same digest share a single
// download; entries are deleted when run returns or when Prepare fails.
var blobDownloadManager sync.Map
27

28
29
30
type blobDownload struct {
	Name   string
	Digest string
31

32
33
	Total     int64
	Completed atomic.Int64
34

35
	Parts []*blobDownloadPart
36

37
	context.CancelFunc
Michael Yang's avatar
Michael Yang committed
38
39
40

	done       bool
	err        error
Michael Yang's avatar
Michael Yang committed
41
	references atomic.Int32
42
}
43

44
type blobDownloadPart struct {
Michael Yang's avatar
Michael Yang committed
45
	N         int
46
47
48
	Offset    int64
	Size      int64
	Completed int64
Michael Yang's avatar
Michael Yang committed
49
50
51
52

	*blobDownload `json:"-"`
}

53
54
55
56
57
58
// Limits that govern how Prepare splits a blob into parts and how many parts
// run downloads at once.
const (
	// numDownloadParts is the divisor used to derive the part size from the
	// blob size, and the concurrency limit for part downloads.
	numDownloadParts          = 64
	// minDownloadPartSize is the lower bound applied to the computed part
	// size, in bytes; the final part of a blob may still be smaller.
	minDownloadPartSize int64 = 32 * 1000 * 1000
	// maxDownloadPartSize is the upper bound applied to the computed part
	// size, in bytes.
	maxDownloadPartSize int64 = 256 * 1000 * 1000
)

Michael Yang's avatar
Michael Yang committed
59
60
61
62
// Name returns the path of the file that stores this part's metadata:
// "<blob path>-partial-<N>".
func (p *blobDownloadPart) Name() string {
	segments := []string{p.blobDownload.Name, "partial", strconv.Itoa(p.N)}
	return strings.Join(segments, "-")
}
64

65
66
67
68
69
70
71
72
// StartsAt returns the absolute offset within the blob of the first byte of
// this part that has not been downloaded yet.
func (p *blobDownloadPart) StartsAt() int64 {
	resumeAt := p.Offset
	resumeAt += p.Completed
	return resumeAt
}

// StopsAt returns the absolute offset within the blob just past the last byte
// of this part (exclusive end).
func (p *blobDownloadPart) StopsAt() int64 {
	end := p.Offset
	end += p.Size
	return end
}

73
74
func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
	partFilePaths, err := filepath.Glob(b.Name + "-partial-*")
Michael Yang's avatar
Michael Yang committed
75
	if err != nil {
76
		return err
77
78
	}

Michael Yang's avatar
Michael Yang committed
79
	for _, partFilePath := range partFilePaths {
80
		part, err := b.readPart(partFilePath)
Bruce MacDonald's avatar
Bruce MacDonald committed
81
82
		if err != nil {
			return err
83
84
		}

85
86
87
		b.Total += part.Size
		b.Completed.Add(part.Completed)
		b.Parts = append(b.Parts, part)
Michael Yang's avatar
Michael Yang committed
88
	}
89

90
91
	if len(b.Parts) == 0 {
		resp, err := makeRequest(ctx, "HEAD", requestURL, nil, nil, opts)
92
		if err != nil {
Michael Yang's avatar
Michael Yang committed
93
94
95
96
			return err
		}
		defer resp.Body.Close()

Michael Yang's avatar
Michael Yang committed
97
98
99
100
101
		if resp.StatusCode >= http.StatusBadRequest {
			body, _ := io.ReadAll(resp.Body)
			return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
		}

102
		b.Total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
Michael Yang's avatar
Michael Yang committed
103

104
105
106
107
108
109
110
		var size = b.Total / numDownloadParts
		switch {
		case size < minDownloadPartSize:
			size = minDownloadPartSize
		case size > maxDownloadPartSize:
			size = maxDownloadPartSize
		}
Michael Yang's avatar
Michael Yang committed
111

112
		var offset int64
113
114
115
116
117
		for offset < b.Total {
			if offset+size > b.Total {
				size = b.Total - offset
			}

Michael Yang's avatar
Michael Yang committed
118
			if err := b.newPart(offset, size); err != nil {
119
				return err
Michael Yang's avatar
Michael Yang committed
120
121
122
			}

			offset += size
123
124
125
		}
	}

126
	log.Printf("downloading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size))
127
128
129
	return nil
}

Michael Yang's avatar
Michael Yang committed
130
131
132
133
134
// Run performs the download and records its outcome in b.err, where Wait
// picks it up. It is started in its own goroutine by downloadBlob.
func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) {
	result := b.run(ctx, requestURL, opts)
	b.err = result
}

func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
135
136
137
138
	defer blobDownloadManager.Delete(b.Digest)

	ctx, b.CancelFunc = context.WithCancel(ctx)

139
	file, err := os.OpenFile(b.Name+"-partial", os.O_CREATE|os.O_RDWR, 0644)
140
141
	if err != nil {
		return err
Michael Yang's avatar
Michael Yang committed
142
	}
143
	defer file.Close()
144

145
	file.Truncate(b.Total)
Michael Yang's avatar
Michael Yang committed
146

Michael Yang's avatar
Michael Yang committed
147
	g, inner := errgroup.WithContext(ctx)
148
	g.SetLimit(numDownloadParts)
149
150
	for i := range b.Parts {
		part := b.Parts[i]
Michael Yang's avatar
Michael Yang committed
151
152
153
		if part.Completed == part.Size {
			continue
		}
154

Michael Yang's avatar
Michael Yang committed
155
156
157
		i := i
		g.Go(func() error {
			for try := 0; try < maxRetries; try++ {
158
				w := io.NewOffsetWriter(file, part.StartsAt())
Michael Yang's avatar
Michael Yang committed
159
				err := b.downloadChunk(inner, requestURL, w, part, opts)
160
161
162
163
164
				switch {
				case errors.Is(err, context.Canceled):
					return err
				case err != nil:
					log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], i, try, err)
Michael Yang's avatar
Michael Yang committed
165
					continue
166
167
				default:
					return nil
Michael Yang's avatar
Michael Yang committed
168
169
170
171
172
				}
			}

			return errors.New("max retries exceeded")
		})
173
174
	}

Michael Yang's avatar
Michael Yang committed
175
176
	if err := g.Wait(); err != nil {
		return err
177
178
	}

179
180
	// explicitly close the file so we can rename it
	if err := file.Close(); err != nil {
Michael Yang's avatar
Michael Yang committed
181
182
183
		return err
	}

184
	for i := range b.Parts {
185
		if err := os.Remove(file.Name() + "-" + strconv.Itoa(i)); err != nil {
Michael Yang's avatar
Michael Yang committed
186
187
			return err
		}
188
189
	}

Michael Yang's avatar
Michael Yang committed
190
191
192
193
194
195
	if err := os.Rename(file.Name(), b.Name); err != nil {
		return err
	}

	b.done = true
	return nil
196
197
}

198
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *RegistryOptions) error {
Michael Yang's avatar
Michael Yang committed
199
	headers := make(http.Header)
200
	headers.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
201
	resp, err := makeRequest(ctx, "GET", requestURL, headers, nil, opts)
Michael Yang's avatar
Michael Yang committed
202
203
204
205
	if err != nil {
		return err
	}
	defer resp.Body.Close()
206

207
	n, err := io.Copy(w, io.TeeReader(resp.Body, b))
Michael Yang's avatar
Michael Yang committed
208
	if err != nil && !errors.Is(err, context.Canceled) {
209
210
		// rollback progress
		b.Completed.Add(-n)
Michael Yang's avatar
Michael Yang committed
211
212
		return err
	}
213

Michael Yang's avatar
Michael Yang committed
214
	part.Completed += n
Michael Yang's avatar
Michael Yang committed
215
	if err := b.writePart(part.Name(), part); err != nil {
Michael Yang's avatar
Michael Yang committed
216
217
218
219
220
		return err
	}

	// return nil or context.Canceled
	return err
221
222
}

Michael Yang's avatar
Michael Yang committed
223
224
225
226
227
228
229
230
231
232
// newPart creates the part covering size bytes starting at offset, writes its
// metadata file and, if that succeeds, appends it to b.Parts. The part's index
// is the number of parts already present.
func (b *blobDownload) newPart(offset, size int64) error {
	part := &blobDownloadPart{
		blobDownload: b,
		N:            len(b.Parts),
		Offset:       offset,
		Size:         size,
	}

	err := b.writePart(part.Name(), part)
	if err == nil {
		b.Parts = append(b.Parts, part)
	}

	return err
}

233
234
235
236
237
238
239
240
241
242
243
func (b *blobDownload) readPart(partName string) (*blobDownloadPart, error) {
	var part blobDownloadPart
	partFile, err := os.Open(partName)
	if err != nil {
		return nil, err
	}
	defer partFile.Close()

	if err := json.NewDecoder(partFile).Decode(&part); err != nil {
		return nil, err
	}
244

Michael Yang's avatar
Michael Yang committed
245
	part.blobDownload = b
246
	return &part, nil
Michael Yang's avatar
Michael Yang committed
247
248
}

249
250
func (b *blobDownload) writePart(partName string, part *blobDownloadPart) error {
	partFile, err := os.OpenFile(partName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
Michael Yang's avatar
Michael Yang committed
251
252
	if err != nil {
		return err
253
	}
Michael Yang's avatar
Michael Yang committed
254
	defer partFile.Close()
255

Michael Yang's avatar
Michael Yang committed
256
	return json.NewEncoder(partFile).Encode(part)
257
}
258
259
260
261
262
263
264

// Write implements io.Writer so a blobDownload can serve as the sink of the
// io.TeeReader in downloadChunk. The data itself is discarded; only its length
// is added to b.Completed. It always reports len(p) bytes written and a nil
// error.
func (b *blobDownload) Write(p []byte) (n int, err error) {
	b.Completed.Add(int64(len(p)))
	return len(p), nil
}

Michael Yang's avatar
Michael Yang committed
265
266
267
268
269
270
271
272
273
274
// acquire registers one more caller interested in this download by
// incrementing the reference count; it is paired with release.
func (b *blobDownload) acquire() {
	b.references.Add(1)
}

// release drops one reference to the download. When the last reference is
// released the download's context is cancelled, which stops the transfer.
func (b *blobDownload) release() {
	if b.references.Add(-1) != 0 {
		return
	}

	// CancelFunc is assigned by run, which executes in its own goroutine. A
	// caller whose context is already cancelled can get here before that
	// assignment has happened, so guard against calling a nil func.
	if b.CancelFunc != nil {
		b.CancelFunc()
	}
}

275
func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
Michael Yang's avatar
Michael Yang committed
276
277
	b.acquire()
	defer b.release()
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
			Status:    fmt.Sprintf("downloading %s", b.Digest),
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
		})

Michael Yang's avatar
Michael Yang committed
294
295
		if b.done || b.err != nil {
			return b.err
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
		}
	}
}

// downloadOpts bundles the arguments of downloadBlob.
type downloadOpts struct {
	// mp supplies the registry base URL and the namespace/repository used to
	// build the blob URL.
	mp      ModelPath
	// digest identifies the blob to download.
	digest  string
	// regOpts is passed through to the registry requests.
	regOpts *RegistryOptions
	// fn receives progress updates.
	fn      func(api.ProgressResponse)
}

// maxRetries is the number of attempts run makes for each part before giving
// up on the download.
const maxRetries = 3

// downloadBlob downloads a blob from the registry and stores it in the blobs directory
func downloadBlob(ctx context.Context, opts downloadOpts) error {
	fp, err := GetBlobsPath(opts.digest)
	if err != nil {
		return err
	}

	fi, err := os.Stat(fp)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
		return err
	default:
		opts.fn(api.ProgressResponse{
			Status:    fmt.Sprintf("downloading %s", opts.digest),
			Digest:    opts.digest,
			Total:     fi.Size(),
			Completed: fi.Size(),
		})

		return nil
	}

Michael Yang's avatar
names  
Michael Yang committed
332
333
	data, ok := blobDownloadManager.LoadOrStore(opts.digest, &blobDownload{Name: fp, Digest: opts.digest})
	download := data.(*blobDownload)
334
335
336
	if !ok {
		requestURL := opts.mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", opts.mp.GetNamespaceRepository(), "blobs", opts.digest)
Michael Yang's avatar
names  
Michael Yang committed
337
		if err := download.Prepare(ctx, requestURL, opts.regOpts); err != nil {
338
			blobDownloadManager.Delete(opts.digest)
339
340
341
			return err
		}

Michael Yang's avatar
names  
Michael Yang committed
342
		go download.Run(context.Background(), requestURL, opts.regOpts)
343
344
	}

Michael Yang's avatar
names  
Michael Yang committed
345
	return download.Wait(ctx, opts.fn)
346
}