upload.go 9.04 KB
Newer Older
Michael Yang's avatar
Michael Yang committed
1
2
3
4
package server

import (
	"context"
5
	"crypto/md5"
Michael Yang's avatar
Michael Yang committed
6
7
	"errors"
	"fmt"
8
	"hash"
Michael Yang's avatar
Michael Yang committed
9
	"io"
10
	"log/slog"
Jeffrey Morgan's avatar
Jeffrey Morgan committed
11
	"math"
Michael Yang's avatar
Michael Yang committed
12
13
14
	"net/http"
	"net/url"
	"os"
Michael Yang's avatar
Michael Yang committed
15
	"sync"
Michael Yang's avatar
Michael Yang committed
16
17
	"sync/atomic"
	"time"
Michael Yang's avatar
Michael Yang committed
18
19

	"github.com/jmorganca/ollama/api"
Michael Yang's avatar
Michael Yang committed
20
	"github.com/jmorganca/ollama/format"
Michael Yang's avatar
Michael Yang committed
21
22
)

Michael Yang's avatar
Michael Yang committed
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// blobUploadManager maps a layer digest to its in-flight *blobUpload so that
// concurrent pushes of the same layer share a single upload. Entries are added
// by uploadBlob and removed when Prepare fails or Run returns.
var blobUploadManager sync.Map

type blobUpload struct {
	*Layer

	Total     int64
	Completed atomic.Int64

	Parts []blobUploadPart

	nextURL chan *url.URL

	context.CancelFunc

37
38
	file *os.File

Michael Yang's avatar
Michael Yang committed
39
40
41
42
43
	done       bool
	err        error
	references atomic.Int32
}

44
const (
Michael Yang's avatar
Michael Yang committed
45
	numUploadParts          = 64
Michael Yang's avatar
Michael Yang committed
46
47
	minUploadPartSize int64 = 100 * format.MegaByte
	maxUploadPartSize int64 = 1000 * format.MegaByte
48
49
)

Michael Yang's avatar
Michael Yang committed
50
func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *registryOptions) error {
Michael Yang's avatar
Michael Yang committed
51
52
53
54
55
56
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		return err
	}

	if b.From != "" {
Michael Yang's avatar
Michael Yang committed
57
		values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
58
		values.Add("mount", b.Digest)
59
		values.Add("from", ParseModelPath(b.From).GetNamespaceRepository())
Michael Yang's avatar
Michael Yang committed
60
61
62
		requestURL.RawQuery = values.Encode()
	}

Michael Yang's avatar
Michael Yang committed
63
	resp, err := makeRequestWithRetry(ctx, http.MethodPost, requestURL, nil, nil, opts)
Michael Yang's avatar
Michael Yang committed
64
	if err != nil {
Michael Yang's avatar
Michael Yang committed
65
		return err
Michael Yang's avatar
Michael Yang committed
66
67
68
	}
	defer resp.Body.Close()

69
	location := resp.Header.Get("Docker-Upload-Location")
Michael Yang's avatar
Michael Yang committed
70
	if location == "" {
71
72
73
		location = resp.Header.Get("Location")
	}

Michael Yang's avatar
Michael Yang committed
74
	fi, err := os.Stat(p)
75
	if err != nil {
Michael Yang's avatar
Michael Yang committed
76
		return err
Michael Yang's avatar
Michael Yang committed
77
78
	}

Michael Yang's avatar
Michael Yang committed
79
80
	b.Total = fi.Size()

Michael Yang's avatar
Michael Yang committed
81
82
83
84
85
86
87
88
	// http.StatusCreated indicates a blob has been mounted
	// ref: https://distribution.github.io/distribution/spec/api/#cross-repository-blob-mount
	if resp.StatusCode == http.StatusCreated {
		b.Completed.Store(b.Total)
		b.done = true
		return nil
	}

Michael Yang's avatar
Michael Yang committed
89
	size := b.Total / numUploadParts
Michael Yang's avatar
Michael Yang committed
90
91
92
93
94
95
96
97
98
99
100
101
102
	switch {
	case size < minUploadPartSize:
		size = minUploadPartSize
	case size > maxUploadPartSize:
		size = maxUploadPartSize
	}

	var offset int64
	for offset < fi.Size() {
		if offset+size > fi.Size() {
			size = fi.Size() - offset
		}

Michael Yang's avatar
Michael Yang committed
103
		// set part.N to the current number of parts
104
		b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size})
Michael Yang's avatar
Michael Yang committed
105
106
		offset += size
	}
Michael Yang's avatar
Michael Yang committed
107

108
	slog.Info(fmt.Sprintf("uploading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size)))
Michael Yang's avatar
Michael Yang committed
109

Michael Yang's avatar
Michael Yang committed
110
	requestURL, err = url.Parse(location)
Michael Yang's avatar
Michael Yang committed
111
112
113
114
	if err != nil {
		return err
	}

Michael Yang's avatar
Michael Yang committed
115
116
117
118
119
	b.nextURL = make(chan *url.URL, 1)
	b.nextURL <- requestURL
	return nil
}

Michael Yang's avatar
Michael Yang committed
120
121
// Run uploads blob parts to the upstream. If the upstream supports redirection, parts will be uploaded
// in parallel as defined by Prepare. Otherwise, parts will be uploaded serially. Run sets b.err on error.
Michael Yang's avatar
Michael Yang committed
122
func (b *blobUpload) Run(ctx context.Context, opts *registryOptions) {
Michael Yang's avatar
Michael Yang committed
123
124
125
	defer blobUploadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

126
127
128
129
130
131
132
133
134
135
136
137
138
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		b.err = err
		return
	}

	b.file, err = os.Open(p)
	if err != nil {
		b.err = err
		return
	}
	defer b.file.Close()

Michael Yang's avatar
Michael Yang committed
139
140
141
	var limit int64 = 2
	g, inner := NewLimitGroup(ctx, numUploadParts, limit)
	go watchDelta(inner, g, &b.Completed, limit)
Michael Yang's avatar
Michael Yang committed
142

Michael Yang's avatar
Michael Yang committed
143
144
	for i := range b.Parts {
		part := &b.Parts[i]
Michael Yang's avatar
Michael Yang committed
145
146
		select {
		case <-inner.Done():
Michael Yang's avatar
Michael Yang committed
147
			break
Michael Yang's avatar
Michael Yang committed
148
		case requestURL := <-b.nextURL:
Michael Yang's avatar
lint  
Michael Yang committed
149
			g.Go(inner, func() error {
Michael Yang's avatar
Michael Yang committed
150
				var err error
Michael Yang's avatar
Michael Yang committed
151
				for try := 0; try < maxRetries; try++ {
152
					err = b.uploadPart(inner, http.MethodPatch, requestURL, part, opts)
Michael Yang's avatar
Michael Yang committed
153
154
155
156
157
158
					switch {
					case errors.Is(err, context.Canceled):
						return err
					case errors.Is(err, errMaxRetriesExceeded):
						return err
					case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
159
						sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
160
						slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep))
Michael Yang's avatar
Michael Yang committed
161
						time.Sleep(sleep)
Michael Yang's avatar
Michael Yang committed
162
163
164
165
						continue
					}

					return nil
Michael Yang's avatar
Michael Yang committed
166
167
				}

Michael Yang's avatar
Michael Yang committed
168
				return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
169
170
			})
		}
Michael Yang's avatar
Michael Yang committed
171
	}
172

Michael Yang's avatar
Michael Yang committed
173
	if err := g.Wait(); err != nil {
Michael Yang's avatar
Michael Yang committed
174
175
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
176
177
	}

Michael Yang's avatar
Michael Yang committed
178
179
	requestURL := <-b.nextURL

180
	// calculate md5 checksum and add it to the commit request
181
	md5sum := md5.New()
182
	for _, part := range b.Parts {
183
		md5sum.Write(part.Sum(nil))
184
185
	}

Michael Yang's avatar
Michael Yang committed
186
	values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
187
	values.Add("digest", b.Digest)
188
	values.Add("etag", fmt.Sprintf("%x-%d", md5sum.Sum(nil), len(b.Parts)))
Michael Yang's avatar
Michael Yang committed
189
190
191
192
193
194
	requestURL.RawQuery = values.Encode()

	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
	headers.Set("Content-Length", "0")

Michael Yang's avatar
Michael Yang committed
195
	for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
196
197
198
199
200
		var resp *http.Response
		resp, err = makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts)
		if errors.Is(err, context.Canceled) {
			break
		} else if err != nil {
Jeffrey Morgan's avatar
Jeffrey Morgan committed
201
			sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
202
			slog.Info(fmt.Sprintf("%s complete upload attempt %d failed: %v, retrying in %s", b.Digest[7:19], try, err, sleep))
Michael Yang's avatar
Michael Yang committed
203
204
205
206
			time.Sleep(sleep)
			continue
		}
		defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
207
		break
Michael Yang's avatar
Michael Yang committed
208
	}
Michael Yang's avatar
Michael Yang committed
209
210
211

	b.err = err
	b.done = true
Michael Yang's avatar
Michael Yang committed
212
}
Michael Yang's avatar
Michael Yang committed
213

Michael Yang's avatar
Michael Yang committed
214
func (b *blobUpload) uploadPart(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *registryOptions) error {
Michael Yang's avatar
Michael Yang committed
215
216
	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
Michael Yang's avatar
Michael Yang committed
217
	headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
218
219

	if method == http.MethodPatch {
220
		headers.Set("X-Redirect-Uploads", "1")
Michael Yang's avatar
Michael Yang committed
221
		headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
222
	}
Michael Yang's avatar
Michael Yang committed
223

224
	sr := io.NewSectionReader(b.file, part.Offset, part.Size)
225
226
227
228

	md5sum := md5.New()
	w := &progressWriter{blobUpload: b}

Michael Yang's avatar
Michael Yang committed
229
	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, io.MultiWriter(w, md5sum)), opts)
Michael Yang's avatar
Michael Yang committed
230
	if err != nil {
231
		w.Rollback()
Michael Yang's avatar
Michael Yang committed
232
233
234
		return err
	}
	defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
235

Michael Yang's avatar
Michael Yang committed
236
237
238
239
240
241
242
	location := resp.Header.Get("Docker-Upload-Location")
	if location == "" {
		location = resp.Header.Get("Location")
	}

	nextURL, err := url.Parse(location)
	if err != nil {
243
		w.Rollback()
Michael Yang's avatar
Michael Yang committed
244
245
246
247
248
		return err
	}

	switch {
	case resp.StatusCode == http.StatusTemporaryRedirect:
249
		w.Rollback()
Michael Yang's avatar
Michael Yang committed
250
251
252
253
254
255
		b.nextURL <- nextURL

		redirectURL, err := resp.Location()
		if err != nil {
			return err
		}
256

Jeffrey Morgan's avatar
Jeffrey Morgan committed
257
		// retry uploading to the redirect URL
Michael Yang's avatar
Michael Yang committed
258
		for try := 0; try < maxRetries; try++ {
259
			err = b.uploadPart(ctx, http.MethodPut, redirectURL, part, nil)
Michael Yang's avatar
Michael Yang committed
260
261
262
263
264
265
			switch {
			case errors.Is(err, context.Canceled):
				return err
			case errors.Is(err, errMaxRetriesExceeded):
				return err
			case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
266
				sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
267
				slog.Info(fmt.Sprintf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep))
Jeffrey Morgan's avatar
Jeffrey Morgan committed
268
				time.Sleep(sleep)
269
270
271
				continue
			}

Michael Yang's avatar
Michael Yang committed
272
273
274
			return nil
		}

Michael Yang's avatar
Michael Yang committed
275
		return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
276

Michael Yang's avatar
Michael Yang committed
277
	case resp.StatusCode == http.StatusUnauthorized:
278
		w.Rollback()
Michael Yang's avatar
Michael Yang committed
279
280
		challenge := parseRegistryChallenge(resp.Header.Get("www-authenticate"))
		token, err := getAuthorizationToken(ctx, challenge)
Michael Yang's avatar
Michael Yang committed
281
282
283
		if err != nil {
			return err
		}
Michael Yang's avatar
Michael Yang committed
284

Michael Yang's avatar
Michael Yang committed
285
286
287
		opts.Token = token
		fallthrough
	case resp.StatusCode >= http.StatusBadRequest:
288
		w.Rollback()
Michael Yang's avatar
Michael Yang committed
289
290
291
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
Michael Yang's avatar
Michael Yang committed
292
293
		}

294
		return fmt.Errorf("http status %s: %s", resp.Status, body)
Michael Yang's avatar
Michael Yang committed
295
296
	}

Michael Yang's avatar
Michael Yang committed
297
298
299
300
	if method == http.MethodPatch {
		b.nextURL <- nextURL
	}

Michael Yang's avatar
Michael Yang committed
301
	part.Hash = md5sum
Michael Yang's avatar
Michael Yang committed
302
	return nil
Michael Yang's avatar
Michael Yang committed
303
304
}

Michael Yang's avatar
Michael Yang committed
305
306
func (b *blobUpload) acquire() {
	b.references.Add(1)
Michael Yang's avatar
Michael Yang committed
307
308
}

Michael Yang's avatar
Michael Yang committed
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
// release drops one reference to the upload. When the last reference is
// released the upload is cancelled through b.CancelFunc.
func (b *blobUpload) release() {
	if b.references.Add(-1) != 0 {
		return
	}

	// CancelFunc is assigned by Run. If the last waiter leaves before Run has
	// started there is nothing to cancel yet, and calling the nil func would
	// panic.
	if b.CancelFunc != nil {
		b.CancelFunc()
	}
}

func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
	b.acquire()
	defer b.release()

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
328
			Status:    fmt.Sprintf("pushing %s", b.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
329
330
331
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
Michael Yang's avatar
Michael Yang committed
332
333
		})

Michael Yang's avatar
Michael Yang committed
334
335
336
		if b.done || b.err != nil {
			return b.err
		}
Michael Yang's avatar
Michael Yang committed
337
	}
Michael Yang's avatar
Michael Yang committed
338
}
Michael Yang's avatar
Michael Yang committed
339

Michael Yang's avatar
Michael Yang committed
340
341
// blobUploadPart describes one contiguous byte range of a blob.
type blobUploadPart struct {
	// N is the part number
	N int

	// Offset and Size locate the part within the blob, in bytes.
	Offset int64
	Size   int64

	// Hash is the MD5 of the part's contents; uploadPart sets it once the
	// part has been uploaded.
	hash.Hash
}
347

348
349
type progressWriter struct {
	written int64
Michael Yang's avatar
Michael Yang committed
350
351
352
	*blobUpload
}

353
func (p *progressWriter) Write(b []byte) (n int, err error) {
Michael Yang's avatar
Michael Yang committed
354
355
356
	n = len(b)
	p.written += int64(n)
	p.Completed.Add(int64(n))
Michael Yang's avatar
Michael Yang committed
357
358
	return n, nil
}
Michael Yang's avatar
Michael Yang committed
359

360
361
func (p *progressWriter) Rollback() {
	p.Completed.Add(-p.written)
Michael Yang's avatar
Michael Yang committed
362
363
364
	p.written = 0
}

Michael Yang's avatar
Michael Yang committed
365
func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *registryOptions, fn func(api.ProgressResponse)) error {
Michael Yang's avatar
Michael Yang committed
366
367
368
	requestURL := mp.BaseURL()
	requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)

Michael Yang's avatar
Michael Yang committed
369
370
371
372
	resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
Michael Yang's avatar
Michael Yang committed
373
		return err
Michael Yang's avatar
Michael Yang committed
374
375
	default:
		defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
376
		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
377
			Status:    fmt.Sprintf("pushing %s", layer.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
			Digest:    layer.Digest,
			Total:     layer.Size,
			Completed: layer.Size,
		})

		return nil
	}

	data, ok := blobUploadManager.LoadOrStore(layer.Digest, &blobUpload{Layer: layer})
	upload := data.(*blobUpload)
	if !ok {
		requestURL := mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs/uploads/")
		if err := upload.Prepare(ctx, requestURL, opts); err != nil {
			blobUploadManager.Delete(layer.Digest)
			return err
		}

Michael Yang's avatar
Michael Yang committed
396
		// nolint: contextcheck
Michael Yang's avatar
Michael Yang committed
397
398
399
400
401
		go upload.Run(context.Background(), opts)
	}

	return upload.Wait(ctx, fn)
}