download.go 6.85 KB
Newer Older
1
2
3
4
package server

import (
	"context"
Michael Yang's avatar
Michael Yang committed
5
	"encoding/json"
6
7
8
9
10
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
Michael Yang's avatar
Michael Yang committed
11
	"net/url"
12
	"os"
Michael Yang's avatar
Michael Yang committed
13
	"path/filepath"
14
	"strconv"
Michael Yang's avatar
Michael Yang committed
15
	"strings"
16
17
18
	"sync"
	"sync/atomic"
	"time"
19

Michael Yang's avatar
Michael Yang committed
20
	"golang.org/x/sync/errgroup"
21
22

	"github.com/jmorganca/ollama/api"
23
24
)

25
var (
	// blobDownloadManager maps a blob digest to the *blobDownload currently fetching it, so that
	// concurrent requests for the same blob share one download.
	blobDownloadManager sync.Map
)
26

27
28
29
type blobDownload struct {
	Name   string
	Digest string
30

31
32
	Total     int64
	Completed atomic.Int64
33

34
	Parts []*blobDownloadPart
35

36
	context.CancelFunc
Michael Yang's avatar
Michael Yang committed
37
	references atomic.Int32
38
}
39

40
type blobDownloadPart struct {
Michael Yang's avatar
Michael Yang committed
41
	N         int
42
43
44
	Offset    int64
	Size      int64
	Completed int64
Michael Yang's avatar
Michael Yang committed
45
46
47
48
49
50
51
52

	*blobDownload `json:"-"`
}

// Name returns the path of this part's metadata file: the blob path, then "-partial-", then the
// part number.
func (p *blobDownloadPart) Name() string {
	var sb strings.Builder
	sb.WriteString(p.blobDownload.Name)
	sb.WriteString("-partial-")
	sb.WriteString(strconv.Itoa(p.N))
	return sb.String()
}
54

55
56
57
58
59
60
61
62
// StartsAt reports the absolute blob offset of the first byte of this part that is not yet downloaded.
func (p *blobDownloadPart) StartsAt() int64 {
	start := p.Offset
	start += p.Completed
	return start
}

// StopsAt reports the absolute blob offset just past the last byte of this part (an exclusive end).
func (p *blobDownloadPart) StopsAt() int64 {
	end := p.Offset + p.Size
	return end
}

63
64
func (b *blobDownload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
	partFilePaths, err := filepath.Glob(b.Name + "-partial-*")
Michael Yang's avatar
Michael Yang committed
65
	if err != nil {
66
		return err
67
68
	}

Michael Yang's avatar
Michael Yang committed
69
	for _, partFilePath := range partFilePaths {
70
		part, err := b.readPart(partFilePath)
Bruce MacDonald's avatar
Bruce MacDonald committed
71
72
		if err != nil {
			return err
73
74
		}

75
76
77
		b.Total += part.Size
		b.Completed.Add(part.Completed)
		b.Parts = append(b.Parts, part)
Michael Yang's avatar
Michael Yang committed
78
	}
79

80
81
	if len(b.Parts) == 0 {
		resp, err := makeRequest(ctx, "HEAD", requestURL, nil, nil, opts)
82
		if err != nil {
Michael Yang's avatar
Michael Yang committed
83
84
85
86
			return err
		}
		defer resp.Body.Close()

Michael Yang's avatar
Michael Yang committed
87
88
89
90
91
		if resp.StatusCode >= http.StatusBadRequest {
			body, _ := io.ReadAll(resp.Body)
			return fmt.Errorf("registry responded with code %d: %v", resp.StatusCode, string(body))
		}

92
		b.Total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)
Michael Yang's avatar
Michael Yang committed
93
94
95
96

		var offset int64
		var size int64 = 64 * 1024 * 1024

97
98
99
100
101
		for offset < b.Total {
			if offset+size > b.Total {
				size = b.Total - offset
			}

Michael Yang's avatar
Michael Yang committed
102
			if err := b.newPart(offset, size); err != nil {
103
				return err
Michael Yang's avatar
Michael Yang committed
104
105
106
			}

			offset += size
107
108
109
		}
	}

110
111
112
113
114
115
116
117
118
	log.Printf("downloading %s in %d part(s)", b.Digest[7:19], len(b.Parts))
	return nil
}

func (b *blobDownload) Run(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) (err error) {
	defer blobDownloadManager.Delete(b.Digest)

	ctx, b.CancelFunc = context.WithCancel(ctx)

119
	file, err := os.OpenFile(b.Name+"-partial", os.O_CREATE|os.O_RDWR, 0644)
120
121
	if err != nil {
		return err
Michael Yang's avatar
Michael Yang committed
122
	}
123
	defer file.Close()
124

125
	file.Truncate(b.Total)
Michael Yang's avatar
Michael Yang committed
126

Michael Yang's avatar
Michael Yang committed
127
	g, ctx := errgroup.WithContext(ctx)
128
	// TODO(mxyng): download concurrency should be configurable
Michael Yang's avatar
Michael Yang committed
129
	g.SetLimit(64)
130
131
	for i := range b.Parts {
		part := b.Parts[i]
Michael Yang's avatar
Michael Yang committed
132
133
134
		if part.Completed == part.Size {
			continue
		}
135

Michael Yang's avatar
Michael Yang committed
136
137
138
		i := i
		g.Go(func() error {
			for try := 0; try < maxRetries; try++ {
139
140
				w := io.NewOffsetWriter(file, part.StartsAt())
				err := b.downloadChunk(ctx, requestURL, w, part, opts)
141
142
143
144
145
				switch {
				case errors.Is(err, context.Canceled):
					return err
				case err != nil:
					log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], i, try, err)
Michael Yang's avatar
Michael Yang committed
146
					continue
147
148
				default:
					return nil
Michael Yang's avatar
Michael Yang committed
149
150
151
152
153
				}
			}

			return errors.New("max retries exceeded")
		})
154
155
	}

Michael Yang's avatar
Michael Yang committed
156
157
	if err := g.Wait(); err != nil {
		return err
158
159
	}

160
161
	// explicitly close the file so we can rename it
	if err := file.Close(); err != nil {
Michael Yang's avatar
Michael Yang committed
162
163
164
		return err
	}

165
	for i := range b.Parts {
166
		if err := os.Remove(file.Name() + "-" + strconv.Itoa(i)); err != nil {
Michael Yang's avatar
Michael Yang committed
167
168
			return err
		}
169
170
	}

171
	return os.Rename(file.Name(), b.Name)
172
173
}

174
func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w io.Writer, part *blobDownloadPart, opts *RegistryOptions) error {
Michael Yang's avatar
Michael Yang committed
175
	headers := make(http.Header)
176
	headers.Set("Range", fmt.Sprintf("bytes=%d-%d", part.StartsAt(), part.StopsAt()-1))
177
	resp, err := makeRequest(ctx, "GET", requestURL, headers, nil, opts)
Michael Yang's avatar
Michael Yang committed
178
179
180
181
	if err != nil {
		return err
	}
	defer resp.Body.Close()
182

183
	n, err := io.Copy(w, io.TeeReader(resp.Body, b))
Michael Yang's avatar
Michael Yang committed
184
	if err != nil && !errors.Is(err, context.Canceled) {
185
186
		// rollback progress
		b.Completed.Add(-n)
Michael Yang's avatar
Michael Yang committed
187
188
		return err
	}
189

Michael Yang's avatar
Michael Yang committed
190
	part.Completed += n
Michael Yang's avatar
Michael Yang committed
191
	if err := b.writePart(part.Name(), part); err != nil {
Michael Yang's avatar
Michael Yang committed
192
193
194
195
196
		return err
	}

	// return nil or context.Canceled
	return err
197
198
}

Michael Yang's avatar
Michael Yang committed
199
200
201
202
203
204
205
206
207
208
// newPart creates the part covering [offset, offset+size), numbered by its position in b.Parts,
// persists its metadata, and only then appends it to b.Parts.
func (b *blobDownload) newPart(offset, size int64) error {
	part := &blobDownloadPart{
		N:            len(b.Parts),
		Offset:       offset,
		Size:         size,
		blobDownload: b,
	}

	if err := b.writePart(part.Name(), part); err != nil {
		return err
	}

	b.Parts = append(b.Parts, part)
	return nil
}

209
210
211
212
213
214
215
216
217
218
219
func (b *blobDownload) readPart(partName string) (*blobDownloadPart, error) {
	var part blobDownloadPart
	partFile, err := os.Open(partName)
	if err != nil {
		return nil, err
	}
	defer partFile.Close()

	if err := json.NewDecoder(partFile).Decode(&part); err != nil {
		return nil, err
	}
220

Michael Yang's avatar
Michael Yang committed
221
	part.blobDownload = b
222
	return &part, nil
Michael Yang's avatar
Michael Yang committed
223
224
}

225
226
func (b *blobDownload) writePart(partName string, part *blobDownloadPart) error {
	partFile, err := os.OpenFile(partName, os.O_CREATE|os.O_RDWR|os.O_TRUNC, 0644)
Michael Yang's avatar
Michael Yang committed
227
228
	if err != nil {
		return err
229
	}
Michael Yang's avatar
Michael Yang committed
230
	defer partFile.Close()
231

Michael Yang's avatar
Michael Yang committed
232
	return json.NewEncoder(partFile).Encode(part)
233
}
234
235
236
237
238
239
240

// Write implements io.Writer so that a blobDownload can be the sink of an io.TeeReader. It
// discards p, adds len(p) to the Completed counter, and never fails.
func (b *blobDownload) Write(p []byte) (int, error) {
	size := len(p)
	b.Completed.Add(int64(size))
	return size, nil
}

Michael Yang's avatar
Michael Yang committed
241
242
243
244
245
246
247
248
249
250
// acquire registers one more waiter on b; every call must be balanced by a call to release.
func (b *blobDownload) acquire() {
	b.references.Add(+1)
}

// release drops the reference taken by acquire. When the last reference goes away, the download is
// cancelled through the CancelFunc that Run installs.
//
// CancelFunc is nil until Run's goroutine has assigned it, so a waiter that leaves before then
// (for example, because its context was already done) must not call it.
// NOTE(review): CancelFunc is written by Run and read here without synchronization; consider
// creating the cancellable context before Run is started.
func (b *blobDownload) release() {
	if b.references.Add(-1) == 0 && b.CancelFunc != nil {
		b.CancelFunc()
	}
}

251
func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
Michael Yang's avatar
Michael Yang committed
252
253
	b.acquire()
	defer b.release()
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
			Status:    fmt.Sprintf("downloading %s", b.Digest),
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
		})

		if b.Completed.Load() >= b.Total {
271
272
273
274
			// wait for the file to get renamed
			if _, err := os.Stat(b.Name); err == nil {
				return nil
			}
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
		}
	}
}

// downloadOpts bundles the inputs of downloadBlob.
type downloadOpts struct {
	// mp is used to build the registry URL of the blob.
	mp ModelPath
	// digest selects the blob and, via GetBlobsPath, its local file.
	digest string
	// regOpts is passed through to the registry requests.
	regOpts *RegistryOptions
	// fn receives progress updates.
	fn func(api.ProgressResponse)
}

const (
	// maxRetries is how many times blobDownload.Run attempts each part before giving up.
	maxRetries = 3
)

// downloadBlob downloads a blob from the registry and stores it in the blobs directory
func downloadBlob(ctx context.Context, opts downloadOpts) error {
	fp, err := GetBlobsPath(opts.digest)
	if err != nil {
		return err
	}

	fi, err := os.Stat(fp)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
		return err
	default:
		opts.fn(api.ProgressResponse{
			Status:    fmt.Sprintf("downloading %s", opts.digest),
			Digest:    opts.digest,
			Total:     fi.Size(),
			Completed: fi.Size(),
		})

		return nil
	}

Michael Yang's avatar
names  
Michael Yang committed
311
312
	data, ok := blobDownloadManager.LoadOrStore(opts.digest, &blobDownload{Name: fp, Digest: opts.digest})
	download := data.(*blobDownload)
313
314
315
	if !ok {
		requestURL := opts.mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", opts.mp.GetNamespaceRepository(), "blobs", opts.digest)
Michael Yang's avatar
names  
Michael Yang committed
316
		if err := download.Prepare(ctx, requestURL, opts.regOpts); err != nil {
317
			blobDownloadManager.Delete(opts.digest)
318
319
320
			return err
		}

Michael Yang's avatar
names  
Michael Yang committed
321
		go download.Run(context.Background(), requestURL, opts.regOpts)
322
323
	}

Michael Yang's avatar
names  
Michael Yang committed
324
	return download.Wait(ctx, opts.fn)
325
}