download.go 6.82 KB
Newer Older
1
2
3
4
package server

import (
	"context"
Michael Yang's avatar
Michael Yang committed
5
	"encoding/json"
6
7
8
9
10
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
Michael Yang's avatar
Michael Yang committed
11
	"net/url"
12
	"os"
Michael Yang's avatar
Michael Yang committed
13
	"path/filepath"
14
	"strconv"
Michael Yang's avatar
Michael Yang committed
15
	"strings"
16
17
18
	"sync"
	"sync/atomic"
	"time"
19

Michael Yang's avatar
Michael Yang committed
20
	"golang.org/x/sync/errgroup"
21
22

	"github.com/jmorganca/ollama/api"
23
24
)

25
// blobDownloadManager holds the downloads that are currently registered, keyed
// by blob digest with *blobDownload values. downloadBlob registers an entry
// with LoadOrStore so concurrent requests for one digest share a download, and
// Run deletes the entry when it returns.
var blobDownloadManager sync.Map
26

27
28
29
type blobDownload struct {
	Name   string
	Digest string
30

31
32
	Total     int64
	Completed atomic.Int64
Michael Yang's avatar
Michael Yang committed
33
	done      bool
34

35
	Parts []*blobDownloadPart
36

37
	context.CancelFunc
Michael Yang's avatar
Michael Yang committed
38
	references atomic.Int32
39
}
40

41
type blobDownloadPart struct {
Michael Yang's avatar
Michael Yang committed
42
	N         int
43
44
45
	Offset    int64
	Size      int64
	Completed int64
Michael Yang's avatar
Michael Yang committed
46
47
48
49
50
51
52
53

	*blobDownload `json:"-"`
}

// Name returns the path of the file that stores this part's metadata: the
// owning download's Name followed by "-partial-" and the part index N.
func (p *blobDownloadPart) Name() string {
	return p.blobDownload.Name + "-partial-" + strconv.Itoa(p.N)
}
55

56
57
58
59
60
61
62
63
// StartsAt returns the absolute offset within the blob of the first byte this
// part has not yet written: its Offset advanced by the bytes already Completed.
func (p *blobDownloadPart) StartsAt() int64 {
	next := p.Completed
	next += p.Offset
	return next
}

// StopsAt returns the absolute offset within the blob one past this part's
// last byte, i.e. Offset plus Size.
func (p *blobDownloadPart) StopsAt() int64 {
	end := p.Offset
	end += p.Size
	return end
}

64
65
func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
	partFilePaths, err := filepath.Glob(b.Name + "-partial-*")
Michael Yang's avatar
Michael Yang committed
66
	if err != nil {
67
		return err
68
69
	}

Michael Yang's avatar
Michael Yang committed
70
	for _, partFilePath := range partFilePaths {
71
		part, err := b.readPart(partFilePath)
Bruce MacDonald's avatar
Bruce MacDonald committed
72
73
		if err != nil {
			return err
74
75
		}

76
77
78
		b.Total += part.Size
		b.Completed.Add(part.Completed)
		b.Parts = append(b.Parts, part)
Michael Yang's avatar
Michael Yang committed
79
	}
80

81
82
	if len(b.Parts) == 0 {
		resp, err := makeRequest(ctx, "HEAD", requestURL, nil, nil, opts)
83
		if err != nil {
Michael Yang's avatar
Michael Yang committed
84
85
86
87
			return err
		}
		defer resp.Body.Close()

Michael Yang's avatar
Michael Yang committed
88
89
90
91
92
		if resp.StatusCode >= http.StatusBadRequest {
			body, _ := io.ReadAll(resp.Body)
			return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
		}

93
		b.Total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
Michael Yang's avatar
Michael Yang committed
94
95
96
97

		var offset int64
		var size int64 = 64 * 1024 * 1024

98
99
100
101
102
		for offset < b.Total {
			if offset+size > b.Total {
				size = b.Total - offset
			}

Michael Yang's avatar
Michael Yang committed
103
			if err := b.newPart(offset, size); err != nil {
104
				return err
Michael Yang's avatar
Michael Yang committed
105
106
107
			}

			offset += size
108
109
110
		}
	}

111
112
113
114
115
116
117
118
119
	log.Printf("downloading %s in %d part(s)", b.Digest[7:19], len(b.Parts))
	return nil
}

func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) (err error) {
	defer blobDownloadManager.Delete(b.Digest)

	ctx, b.CancelFunc = context.WithCancel(ctx)

120
	file, err := os.OpenFile(b.Name+"-partial", os.O_CREATE|os.O_RDWR, 0644)
121
122
	if err != nil {
		return err
Michael Yang's avatar
Michael Yang committed
123
	}
124
	defer file.Close()
125

126
	file.Truncate(b.Total)
Michael Yang's avatar
Michael Yang committed
127

Michael Yang's avatar
Michael Yang committed
128
	g, _ := errgroup.WithContext(ctx)
129
	// TODO(mxyng): download concurrency should be configurable
Michael Yang's avatar
Michael Yang committed
130
	g.SetLimit(64)
131
132
	for i := range b.Parts {
		part := b.Parts[i]
Michael Yang's avatar
Michael Yang committed
133
134
135
		if part.Completed == part.Size {
			continue
		}
136

Michael Yang's avatar
Michael Yang committed
137
138
139
		i := i
		g.Go(func() error {
			for try := 0; try < maxRetries; try++ {
140
141
				w := io.NewOffsetWriter(file, part.StartsAt())
				err := b.downloadChunk(ctx, requestURL, w, part, opts)
142
143
144
145
146
				switch {
				case errors.Is(err, context.Canceled):
					return err
				case err != nil:
					log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], i, try, err)
Michael Yang's avatar
Michael Yang committed
147
					continue
148
149
				default:
					return nil
Michael Yang's avatar
Michael Yang committed
150
151
152
153
154
				}
			}

			return errors.New("max retries exceeded")
		})
155
156
	}

Michael Yang's avatar
Michael Yang committed
157
158
	if err := g.Wait(); err != nil {
		return err
159
160
	}

161
162
	// explicitly close the file so we can rename it
	if err := file.Close(); err != nil {
Michael Yang's avatar
Michael Yang committed
163
164
165
		return err
	}

166
	for i := range b.Parts {
167
		if err := os.Remove(file.Name() + "-" + strconv.Itoa(i)); err != nil {
Michael Yang's avatar
Michael Yang committed
168
169
			return err
		}
170
171
	}

Michael Yang's avatar
Michael Yang committed
172
173
174
175
176
177
	if err := os.Rename(file.Name(), b.Name); err != nil {
		return err
	}

	b.done = true
	return nil
178
179
}

180
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *RegistryOptions) error {
Michael Yang's avatar
Michael Yang committed
181
	headers := make(http.Header)
182
	headers.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
183
	resp, err := makeRequest(ctx, "GET", requestURL, headers, nil, opts)
Michael Yang's avatar
Michael Yang committed
184
185
186
187
	if err != nil {
		return err
	}
	defer resp.Body.Close()
188

189
	n, err := io.Copy(w, io.TeeReader(resp.Body, b))
Michael Yang's avatar
Michael Yang committed
190
	if err != nil && !errors.Is(err, context.Canceled) {
191
192
		// rollback progress
		b.Completed.Add(-n)
Michael Yang's avatar
Michael Yang committed
193
194
		return err
	}
195

Michael Yang's avatar
Michael Yang committed
196
	part.Completed += n
Michael Yang's avatar
Michael Yang committed
197
	if err := b.writePart(part.Name(), part); err != nil {
Michael Yang's avatar
Michael Yang committed
198
199
200
201
202
		return err
	}

	// return nil or context.Canceled
	return err
203
204
}

Michael Yang's avatar
Michael Yang committed
205
206
207
208
209
210
211
212
213
214
// newPart creates the next part of the download, covering size bytes starting
// at offset, and numbers it with the current length of b.Parts. The part's
// metadata is written to disk first; the part is only appended to b.Parts when
// that write succeeds, otherwise the write error is returned.
func (b *blobDownload) newPart(offset, size int64) error {
	next := &blobDownloadPart{
		N:            len(b.Parts),
		Offset:       offset,
		Size:         size,
		blobDownload: b,
	}

	err := b.writePart(next.Name(), next)
	if err == nil {
		b.Parts = append(b.Parts, next)
	}

	return err
}

215
216
217
218
219
220
221
222
223
224
225
func (b *blobDownload) readPart(partName string) (*blobDownloadPart, error) {
	var part blobDownloadPart
	partFile, err := os.Open(partName)
	if err != nil {
		return nil, err
	}
	defer partFile.Close()

	if err := json.NewDecoder(partFile).Decode(&part); err != nil {
		return nil, err
	}
226

Michael Yang's avatar
Michael Yang committed
227
	part.blobDownload = b
228
	return &part, nil
Michael Yang's avatar
Michael Yang committed
229
230
}

231
232
func (b *blobDownload) writePart(partName string, part *blobDownloadPart) error {
	partFile, err := os.OpenFile(partName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
Michael Yang's avatar
Michael Yang committed
233
234
	if err != nil {
		return err
235
	}
Michael Yang's avatar
Michael Yang committed
236
	defer partFile.Close()
237

Michael Yang's avatar
Michael Yang committed
238
	return json.NewEncoder(partFile).Encode(part)
239
}
240
241
242
243
244
245
246

// Write implements io.Writer as a progress counter: it adds len(p) to
// b.Completed, stores nothing, and always reports the whole slice as written
// with a nil error.
func (b *blobDownload) Write(p []byte) (n int, err error) {
	b.Completed.Add(int64(len(p)))
	return len(p), nil
}

Michael Yang's avatar
Michael Yang committed
247
248
249
250
251
252
253
254
255
256
// acquire registers one more caller interested in the download by
// incrementing the reference count. Each call is paired with a release.
func (b *blobDownload) acquire() {
	const one int32 = 1
	_ = b.references.Add(one)
}

// release drops one reference to the download. When the count reaches zero
// the download is cancelled through b.CancelFunc.
//
// CancelFunc is only assigned once Run has started, and Run is launched in its
// own goroutine, so it can still be nil here; in that case there is nothing to
// cancel yet and release does nothing more.
func (b *blobDownload) release() {
	if b.references.Add(-1) != 0 {
		return
	}

	if cancel := b.CancelFunc; cancel != nil {
		cancel()
	}
}

257
func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
Michael Yang's avatar
Michael Yang committed
258
259
	b.acquire()
	defer b.release()
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
			Status:    fmt.Sprintf("downloading %s", b.Digest),
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
		})

Michael Yang's avatar
Michael Yang committed
276
277
		if b.done {
			return nil
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
		}
	}
}

// downloadOpts bundles the arguments of downloadBlob.
type downloadOpts struct {
	// mp supplies the registry base URL and the namespace/repository used to
	// build the blob URL.
	mp      ModelPath
	// digest identifies the blob to download.
	digest  string
	// regOpts is passed through to the registry requests.
	regOpts *RegistryOptions
	// fn receives progress updates.
	fn      func(api.ProgressResponse)
}

// maxRetries is the number of attempts Run makes to download a single part
// before giving up with "max retries exceeded".
const maxRetries = 3

// downloadBlob downloads a blob from the registry and stores it in the blobs directory
func downloadBlob(ctx context.Context, opts downloadOpts) error {
	fp, err := GetBlobsPath(opts.digest)
	if err != nil {
		return err
	}

	fi, err := os.Stat(fp)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
		return err
	default:
		opts.fn(api.ProgressResponse{
			Status:    fmt.Sprintf("downloading %s", opts.digest),
			Digest:    opts.digest,
			Total:     fi.Size(),
			Completed: fi.Size(),
		})

		return nil
	}

Michael Yang's avatar
names  
Michael Yang committed
314
315
	data, ok := blobDownloadManager.LoadOrStore(opts.digest, &blobDownload{Name: fp, Digest: opts.digest})
	download := data.(*blobDownload)
316
317
318
	if !ok {
		requestURL := opts.mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", opts.mp.GetNamespaceRepository(), "blobs", opts.digest)
Michael Yang's avatar
names  
Michael Yang committed
319
		if err := download.Prepare(ctx, requestURL, opts.regOpts); err != nil {
320
			blobDownloadManager.Delete(opts.digest)
321
322
323
			return err
		}

Michael Yang's avatar
names  
Michael Yang committed
324
		go download.Run(context.Background(), requestURL, opts.regOpts)
325
326
	}

Michael Yang's avatar
names  
Michael Yang committed
327
	return download.Wait(ctx, opts.fn)
328
}