upload.go 9.6 KB
Newer Older
Michael Yang's avatar
Michael Yang committed
1
2
3
4
package server

import (
	"context"
5
	"crypto/md5"
Michael Yang's avatar
Michael Yang committed
6
7
	"errors"
	"fmt"
8
	"hash"
Michael Yang's avatar
Michael Yang committed
9
	"io"
10
	"log/slog"
Jeffrey Morgan's avatar
Jeffrey Morgan committed
11
	"math"
Michael Yang's avatar
Michael Yang committed
12
13
14
	"net/http"
	"net/url"
	"os"
Michael Yang's avatar
Michael Yang committed
15
	"sync"
Michael Yang's avatar
Michael Yang committed
16
17
	"sync/atomic"
	"time"
Michael Yang's avatar
Michael Yang committed
18
19

	"github.com/jmorganca/ollama/api"
Michael Yang's avatar
Michael Yang committed
20
	"github.com/jmorganca/ollama/format"
Michael Yang's avatar
Michael Yang committed
21
22
)

Michael Yang's avatar
Michael Yang committed
23
24
25
26
27
28
29
30
31
32
33
34
35
36
var blobUploadManager sync.Map

type blobUpload struct {
	*Layer

	Total     int64
	Completed atomic.Int64

	Parts []blobUploadPart

	nextURL chan *url.URL

	context.CancelFunc

37
38
	file *os.File

Michael Yang's avatar
Michael Yang committed
39
40
41
42
43
	done       bool
	err        error
	references atomic.Int32
}

44
const (
Michael Yang's avatar
Michael Yang committed
45
	numUploadParts          = 64
Michael Yang's avatar
Michael Yang committed
46
47
	minUploadPartSize int64 = 100 * format.MegaByte
	maxUploadPartSize int64 = 1000 * format.MegaByte
48
49
)

Michael Yang's avatar
Michael Yang committed
50
func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *registryOptions) error {
Michael Yang's avatar
Michael Yang committed
51
52
53
54
55
56
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		return err
	}

	if b.From != "" {
Michael Yang's avatar
Michael Yang committed
57
		values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
58
		values.Add("mount", b.Digest)
59
		values.Add("from", ParseModelPath(b.From).GetNamespaceRepository())
Michael Yang's avatar
Michael Yang committed
60
61
62
		requestURL.RawQuery = values.Encode()
	}

Michael Yang's avatar
Michael Yang committed
63
	resp, err := makeRequestWithRetry(ctx, http.MethodPost, requestURL, nil, nil, opts)
Michael Yang's avatar
Michael Yang committed
64
	if err != nil {
Michael Yang's avatar
Michael Yang committed
65
		return err
Michael Yang's avatar
Michael Yang committed
66
67
68
	}
	defer resp.Body.Close()

69
	location := resp.Header.Get("Docker-Upload-Location")
Michael Yang's avatar
Michael Yang committed
70
	if location == "" {
71
72
73
		location = resp.Header.Get("Location")
	}

Michael Yang's avatar
Michael Yang committed
74
	fi, err := os.Stat(p)
75
	if err != nil {
Michael Yang's avatar
Michael Yang committed
76
		return err
Michael Yang's avatar
Michael Yang committed
77
78
	}

Michael Yang's avatar
Michael Yang committed
79
80
	b.Total = fi.Size()

Michael Yang's avatar
Michael Yang committed
81
82
83
84
85
86
87
88
	// http.StatusCreated indicates a blob has been mounted
	// ref: https://distribution.github.io/distribution/spec/api/#cross-repository-blob-mount
	if resp.StatusCode == http.StatusCreated {
		b.Completed.Store(b.Total)
		b.done = true
		return nil
	}

Michael Yang's avatar
Michael Yang committed
89
	size := b.Total / numUploadParts
Michael Yang's avatar
Michael Yang committed
90
91
92
93
94
95
96
97
98
99
100
101
102
	switch {
	case size < minUploadPartSize:
		size = minUploadPartSize
	case size > maxUploadPartSize:
		size = maxUploadPartSize
	}

	var offset int64
	for offset < fi.Size() {
		if offset+size > fi.Size() {
			size = fi.Size() - offset
		}

Michael Yang's avatar
Michael Yang committed
103
		// set part.N to the current number of parts
104
		b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size})
Michael Yang's avatar
Michael Yang committed
105
106
		offset += size
	}
Michael Yang's avatar
Michael Yang committed
107

108
	slog.Info(fmt.Sprintf("uploading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size)))
Michael Yang's avatar
Michael Yang committed
109

Michael Yang's avatar
Michael Yang committed
110
	requestURL, err = url.Parse(location)
Michael Yang's avatar
Michael Yang committed
111
112
113
114
	if err != nil {
		return err
	}

Michael Yang's avatar
Michael Yang committed
115
116
117
118
119
	b.nextURL = make(chan *url.URL, 1)
	b.nextURL <- requestURL
	return nil
}

Michael Yang's avatar
Michael Yang committed
120
121
// Run uploads blob parts to the upstream. If the upstream supports redirection, parts will be uploaded
// in parallel as defined by Prepare. Otherwise, parts will be uploaded serially. Run sets b.err on error.
Michael Yang's avatar
Michael Yang committed
122
func (b *blobUpload) Run(ctx context.Context, opts *registryOptions) {
Michael Yang's avatar
Michael Yang committed
123
124
125
	defer blobUploadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

126
127
128
129
130
131
132
133
134
135
136
137
138
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		b.err = err
		return
	}

	b.file, err = os.Open(p)
	if err != nil {
		b.err = err
		return
	}
	defer b.file.Close()

Michael Yang's avatar
Michael Yang committed
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
	g, inner := NewLimitGroup(ctx, numUploadParts)

	go func() {
		ticker := time.NewTicker(time.Second)
		var n int64 = 1
		var maxDelta float64
		var buckets []int64
		for {
			select {
			case <-ticker.C:
				buckets = append(buckets, b.Completed.Load())
				if len(buckets) < 2 {
					continue
				} else if len(buckets) > 10 {
					buckets = buckets[1:]
				}

				delta := float64((buckets[len(buckets)-1] - buckets[0])) / float64(len(buckets))
				slog.Debug(fmt.Sprintf("delta: %s/s max_delta: %s/s", format.HumanBytes(int64(delta)), format.HumanBytes(int64(maxDelta))))
				if delta > maxDelta*1.5 {
					maxDelta = delta
					g.SetLimit(n)
					n++
				}

			case <-ctx.Done():
				return
			}
		}
	}()

Michael Yang's avatar
Michael Yang committed
170
171
	for i := range b.Parts {
		part := &b.Parts[i]
Michael Yang's avatar
Michael Yang committed
172
173
174
175
		select {
		case <-inner.Done():
		case requestURL := <-b.nextURL:
			g.Go(func() error {
Michael Yang's avatar
Michael Yang committed
176
				var err error
Michael Yang's avatar
Michael Yang committed
177
				for try := 0; try < maxRetries; try++ {
178
					err = b.uploadPart(inner, http.MethodPatch, requestURL, part, opts)
Michael Yang's avatar
Michael Yang committed
179
180
181
182
183
184
					switch {
					case errors.Is(err, context.Canceled):
						return err
					case errors.Is(err, errMaxRetriesExceeded):
						return err
					case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
185
						sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
186
						slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep))
Michael Yang's avatar
Michael Yang committed
187
						time.Sleep(sleep)
Michael Yang's avatar
Michael Yang committed
188
189
190
191
						continue
					}

					return nil
Michael Yang's avatar
Michael Yang committed
192
193
				}

Michael Yang's avatar
Michael Yang committed
194
				return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
195
196
			})
		}
Michael Yang's avatar
Michael Yang committed
197
	}
198

Michael Yang's avatar
Michael Yang committed
199
	if err := g.Wait(); err != nil {
Michael Yang's avatar
Michael Yang committed
200
201
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
202
203
	}

Michael Yang's avatar
Michael Yang committed
204
205
	requestURL := <-b.nextURL

206
	// calculate md5 checksum and add it to the commit request
207
	md5sum := md5.New()
208
	for _, part := range b.Parts {
209
		md5sum.Write(part.Sum(nil))
210
211
	}

Michael Yang's avatar
Michael Yang committed
212
	values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
213
	values.Add("digest", b.Digest)
214
	values.Add("etag", fmt.Sprintf("%x-%d", md5sum.Sum(nil), len(b.Parts)))
Michael Yang's avatar
Michael Yang committed
215
216
217
218
219
220
	requestURL.RawQuery = values.Encode()

	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
	headers.Set("Content-Length", "0")

Michael Yang's avatar
Michael Yang committed
221
	for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
222
223
224
225
226
		var resp *http.Response
		resp, err = makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts)
		if errors.Is(err, context.Canceled) {
			break
		} else if err != nil {
Jeffrey Morgan's avatar
Jeffrey Morgan committed
227
			sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
228
			slog.Info(fmt.Sprintf("%s complete upload attempt %d failed: %v, retrying in %s", b.Digest[7:19], try, err, sleep))
Michael Yang's avatar
Michael Yang committed
229
230
231
232
			time.Sleep(sleep)
			continue
		}
		defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
233
		break
Michael Yang's avatar
Michael Yang committed
234
	}
Michael Yang's avatar
Michael Yang committed
235
236
237

	b.err = err
	b.done = true
Michael Yang's avatar
Michael Yang committed
238
}
Michael Yang's avatar
Michael Yang committed
239

Michael Yang's avatar
Michael Yang committed
240
func (b *blobUpload) uploadPart(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *registryOptions) error {
Michael Yang's avatar
Michael Yang committed
241
242
	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
Michael Yang's avatar
Michael Yang committed
243
	headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
244
245

	if method == http.MethodPatch {
246
		headers.Set("X-Redirect-Uploads", "1")
Michael Yang's avatar
Michael Yang committed
247
		headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
248
	}
Michael Yang's avatar
Michael Yang committed
249

250
	sr := io.NewSectionReader(b.file, part.Offset, part.Size)
251
252
253
254

	md5sum := md5.New()
	w := &progressWriter{blobUpload: b}

Michael Yang's avatar
Michael Yang committed
255
	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, io.MultiWriter(w, md5sum)), opts)
Michael Yang's avatar
Michael Yang committed
256
	if err != nil {
257
		w.Rollback()
Michael Yang's avatar
Michael Yang committed
258
259
260
		return err
	}
	defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
261

Michael Yang's avatar
Michael Yang committed
262
263
264
265
266
267
268
	location := resp.Header.Get("Docker-Upload-Location")
	if location == "" {
		location = resp.Header.Get("Location")
	}

	nextURL, err := url.Parse(location)
	if err != nil {
269
		w.Rollback()
Michael Yang's avatar
Michael Yang committed
270
271
272
273
274
		return err
	}

	switch {
	case resp.StatusCode == http.StatusTemporaryRedirect:
275
		w.Rollback()
Michael Yang's avatar
Michael Yang committed
276
277
278
279
280
281
		b.nextURL <- nextURL

		redirectURL, err := resp.Location()
		if err != nil {
			return err
		}
282

Jeffrey Morgan's avatar
Jeffrey Morgan committed
283
		// retry uploading to the redirect URL
Michael Yang's avatar
Michael Yang committed
284
		for try := 0; try < maxRetries; try++ {
285
			err = b.uploadPart(ctx, http.MethodPut, redirectURL, part, nil)
Michael Yang's avatar
Michael Yang committed
286
287
288
289
290
291
			switch {
			case errors.Is(err, context.Canceled):
				return err
			case errors.Is(err, errMaxRetriesExceeded):
				return err
			case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
292
				sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
293
				slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep))
Jeffrey Morgan's avatar
Jeffrey Morgan committed
294
				time.Sleep(sleep)
295
296
297
				continue
			}

Michael Yang's avatar
Michael Yang committed
298
299
300
			return nil
		}

Michael Yang's avatar
Michael Yang committed
301
		return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
302

Michael Yang's avatar
Michael Yang committed
303
	case resp.StatusCode == http.StatusUnauthorized:
304
		w.Rollback()
Michael Yang's avatar
Michael Yang committed
305
306
		challenge := parseRegistryChallenge(resp.Header.Get("www-authenticate"))
		token, err := getAuthorizationToken(ctx, challenge)
Michael Yang's avatar
Michael Yang committed
307
308
309
		if err != nil {
			return err
		}
Michael Yang's avatar
Michael Yang committed
310

Michael Yang's avatar
Michael Yang committed
311
312
313
		opts.Token = token
		fallthrough
	case resp.StatusCode >= http.StatusBadRequest:
314
		w.Rollback()
Michael Yang's avatar
Michael Yang committed
315
316
317
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
Michael Yang's avatar
Michael Yang committed
318
319
		}

320
		return fmt.Errorf("http status %s: %s", resp.Status, body)
Michael Yang's avatar
Michael Yang committed
321
322
	}

Michael Yang's avatar
Michael Yang committed
323
324
325
326
	if method == http.MethodPatch {
		b.nextURL <- nextURL
	}

Michael Yang's avatar
Michael Yang committed
327
	part.Hash = md5sum
Michael Yang's avatar
Michael Yang committed
328
	return nil
Michael Yang's avatar
Michael Yang committed
329
330
}

Michael Yang's avatar
Michael Yang committed
331
332
func (b *blobUpload) acquire() {
	b.references.Add(1)
Michael Yang's avatar
Michael Yang committed
333
334
}

Michael Yang's avatar
Michael Yang committed
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
func (b *blobUpload) release() {
	if b.references.Add(-1) == 0 {
		b.CancelFunc()
	}
}

func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
	b.acquire()
	defer b.release()

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
354
			Status:    fmt.Sprintf("pushing %s", b.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
355
356
357
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
Michael Yang's avatar
Michael Yang committed
358
359
		})

Michael Yang's avatar
Michael Yang committed
360
361
362
		if b.done || b.err != nil {
			return b.err
		}
Michael Yang's avatar
Michael Yang committed
363
	}
Michael Yang's avatar
Michael Yang committed
364
}
Michael Yang's avatar
Michael Yang committed
365

Michael Yang's avatar
Michael Yang committed
366
367
// blobUploadPart describes one contiguous byte range of a blob upload.
type blobUploadPart struct {
	// N is the part number: its zero-based position in blobUpload.Parts.
	N int

	// Offset and Size locate the part inside the blob file, in bytes.
	Offset, Size int64

	// Hash holds the MD5 of the bytes sent for this part once uploadPart has
	// succeeded; it is nil before that.
	hash.Hash
}
373

374
375
type progressWriter struct {
	written int64
Michael Yang's avatar
Michael Yang committed
376
377
378
	*blobUpload
}

379
func (p *progressWriter) Write(b []byte) (n int, err error) {
Michael Yang's avatar
Michael Yang committed
380
381
382
	n = len(b)
	p.written += int64(n)
	p.Completed.Add(int64(n))
Michael Yang's avatar
Michael Yang committed
383
384
	return n, nil
}
Michael Yang's avatar
Michael Yang committed
385

386
387
func (p *progressWriter) Rollback() {
	p.Completed.Add(-p.written)
Michael Yang's avatar
Michael Yang committed
388
389
390
	p.written = 0
}

Michael Yang's avatar
Michael Yang committed
391
func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *registryOptions, fn func(api.ProgressResponse)) error {
Michael Yang's avatar
Michael Yang committed
392
393
394
	requestURL := mp.BaseURL()
	requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)

Michael Yang's avatar
Michael Yang committed
395
396
397
398
	resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
Michael Yang's avatar
Michael Yang committed
399
		return err
Michael Yang's avatar
Michael Yang committed
400
401
	default:
		defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
402
		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
403
			Status:    fmt.Sprintf("pushing %s", layer.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
			Digest:    layer.Digest,
			Total:     layer.Size,
			Completed: layer.Size,
		})

		return nil
	}

	data, ok := blobUploadManager.LoadOrStore(layer.Digest, &blobUpload{Layer: layer})
	upload := data.(*blobUpload)
	if !ok {
		requestURL := mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs/uploads/")
		if err := upload.Prepare(ctx, requestURL, opts); err != nil {
			blobUploadManager.Delete(layer.Digest)
			return err
		}

Michael Yang's avatar
Michael Yang committed
422
		// nolint: contextcheck
Michael Yang's avatar
Michael Yang committed
423
424
425
426
427
		go upload.Run(context.Background(), opts)
	}

	return upload.Wait(ctx, fn)
}