upload.go 8.82 KB
Newer Older
Michael Yang's avatar
Michael Yang committed
1
2
3
4
package server

import (
	"context"
5
	"crypto/md5"
Michael Yang's avatar
Michael Yang committed
6
7
	"errors"
	"fmt"
8
	"hash"
Michael Yang's avatar
Michael Yang committed
9
10
	"io"
	"log"
Jeffrey Morgan's avatar
Jeffrey Morgan committed
11
	"math"
Michael Yang's avatar
Michael Yang committed
12
13
14
	"net/http"
	"net/url"
	"os"
15
	"strings"
Michael Yang's avatar
Michael Yang committed
16
	"sync"
Michael Yang's avatar
Michael Yang committed
17
18
	"sync/atomic"
	"time"
Michael Yang's avatar
Michael Yang committed
19
20

	"github.com/jmorganca/ollama/api"
Michael Yang's avatar
Michael Yang committed
21
22
	"github.com/jmorganca/ollama/format"
	"golang.org/x/sync/errgroup"
Michael Yang's avatar
Michael Yang committed
23
24
)

Michael Yang's avatar
Michael Yang committed
25
26
27
28
29
30
31
32
33
34
35
36
37
38
var blobUploadManager sync.Map

type blobUpload struct {
	*Layer

	Total     int64
	Completed atomic.Int64

	Parts []blobUploadPart

	nextURL chan *url.URL

	context.CancelFunc

39
40
	file *os.File

Michael Yang's avatar
Michael Yang committed
41
42
43
44
45
	done       bool
	err        error
	references atomic.Int32
}

46
const (
Michael Yang's avatar
Michael Yang committed
47
	numUploadParts          = 64
Michael Yang's avatar
Michael Yang committed
48
49
	minUploadPartSize int64 = 100 * format.MegaByte
	maxUploadPartSize int64 = 1000 * format.MegaByte
50
51
)

Michael Yang's avatar
Michael Yang committed
52
53
54
55
56
57
58
func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		return err
	}

	if b.From != "" {
Michael Yang's avatar
Michael Yang committed
59
		values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
60
		values.Add("mount", b.Digest)
61
		values.Add("from", ParseModelPath(b.From).GetNamespaceRepository())
Michael Yang's avatar
Michael Yang committed
62
63
64
		requestURL.RawQuery = values.Encode()
	}

Michael Yang's avatar
Michael Yang committed
65
	resp, err := makeRequestWithRetry(ctx, http.MethodPost, requestURL, nil, nil, opts)
Michael Yang's avatar
Michael Yang committed
66
	if err != nil {
Michael Yang's avatar
Michael Yang committed
67
		return err
Michael Yang's avatar
Michael Yang committed
68
69
70
	}
	defer resp.Body.Close()

71
	location := resp.Header.Get("Docker-Upload-Location")
Michael Yang's avatar
Michael Yang committed
72
	if location == "" {
73
74
75
		location = resp.Header.Get("Location")
	}

Michael Yang's avatar
Michael Yang committed
76
	fi, err := os.Stat(p)
77
	if err != nil {
Michael Yang's avatar
Michael Yang committed
78
		return err
Michael Yang's avatar
Michael Yang committed
79
80
	}

Michael Yang's avatar
Michael Yang committed
81
82
	b.Total = fi.Size()

Michael Yang's avatar
Michael Yang committed
83
84
85
86
87
88
89
90
	// http.StatusCreated indicates a blob has been mounted
	// ref: https://distribution.github.io/distribution/spec/api/#cross-repository-blob-mount
	if resp.StatusCode == http.StatusCreated {
		b.Completed.Store(b.Total)
		b.done = true
		return nil
	}

Michael Yang's avatar
Michael Yang committed
91
92
93
94
95
96
97
98
99
100
101
102
103
104
	var size = b.Total / numUploadParts
	switch {
	case size < minUploadPartSize:
		size = minUploadPartSize
	case size > maxUploadPartSize:
		size = maxUploadPartSize
	}

	var offset int64
	for offset < fi.Size() {
		if offset+size > fi.Size() {
			size = fi.Size() - offset
		}

Michael Yang's avatar
Michael Yang committed
105
		// set part.N to the current number of parts
106
		b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Hash: md5.New(), Offset: offset, Size: size})
Michael Yang's avatar
Michael Yang committed
107
108
		offset += size
	}
Michael Yang's avatar
Michael Yang committed
109

Michael Yang's avatar
fix log  
Michael Yang committed
110
	log.Printf("uploading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size))
Michael Yang's avatar
Michael Yang committed
111

Michael Yang's avatar
Michael Yang committed
112
	requestURL, err = url.Parse(location)
Michael Yang's avatar
Michael Yang committed
113
114
115
116
	if err != nil {
		return err
	}

Michael Yang's avatar
Michael Yang committed
117
118
119
120
121
	b.nextURL = make(chan *url.URL, 1)
	b.nextURL <- requestURL
	return nil
}

Michael Yang's avatar
Michael Yang committed
122
123
// Run uploads blob parts to the upstream. If the upstream supports redirection, parts will be uploaded
// in parallel as defined by Prepare. Otherwise, parts will be uploaded serially. Run sets b.err on error.
Michael Yang's avatar
Michael Yang committed
124
125
126
127
func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
	defer blobUploadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

128
129
130
131
132
133
134
135
136
137
138
139
140
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		b.err = err
		return
	}

	b.file, err = os.Open(p)
	if err != nil {
		b.err = err
		return
	}
	defer b.file.Close()

Michael Yang's avatar
Michael Yang committed
141
142
143
144
	g, inner := errgroup.WithContext(ctx)
	g.SetLimit(numUploadParts)
	for i := range b.Parts {
		part := &b.Parts[i]
Michael Yang's avatar
Michael Yang committed
145
146
147
148
		select {
		case <-inner.Done():
		case requestURL := <-b.nextURL:
			g.Go(func() error {
Michael Yang's avatar
Michael Yang committed
149
				var err error
Michael Yang's avatar
Michael Yang committed
150
				for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
151
					err = b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts)
Michael Yang's avatar
Michael Yang committed
152
153
154
155
156
157
					switch {
					case errors.Is(err, context.Canceled):
						return err
					case errors.Is(err, errMaxRetriesExceeded):
						return err
					case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
158
						part.Reset()
Jeffrey Morgan's avatar
Jeffrey Morgan committed
159
						sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
Michael Yang's avatar
Michael Yang committed
160
161
						log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep)
						time.Sleep(sleep)
Michael Yang's avatar
Michael Yang committed
162
163
164
165
						continue
					}

					return nil
Michael Yang's avatar
Michael Yang committed
166
167
				}

Michael Yang's avatar
Michael Yang committed
168
				return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
169
170
			})
		}
Michael Yang's avatar
Michael Yang committed
171
	}
172

Michael Yang's avatar
Michael Yang committed
173
	if err := g.Wait(); err != nil {
Michael Yang's avatar
Michael Yang committed
174
175
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
176
177
	}

Michael Yang's avatar
Michael Yang committed
178
179
	requestURL := <-b.nextURL

180
181
182
183
184
185
186
	var sb strings.Builder
	for _, part := range b.Parts {
		sb.Write(part.Sum(nil))
	}

	md5sum := md5.Sum([]byte(sb.String()))

Michael Yang's avatar
Michael Yang committed
187
	values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
188
	values.Add("digest", b.Digest)
189
	values.Add("etag", fmt.Sprintf("%x-%d", md5sum, len(b.Parts)))
Michael Yang's avatar
Michael Yang committed
190
191
192
193
194
195
	requestURL.RawQuery = values.Encode()

	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
	headers.Set("Content-Length", "0")

Michael Yang's avatar
Michael Yang committed
196
197
198
199
	for try := 0; try < maxRetries; try++ {
		resp, err := makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts)
		if err != nil {
			b.err = err
200
201
202
203
			if errors.Is(err, context.Canceled) {
				return
			}

Jeffrey Morgan's avatar
Jeffrey Morgan committed
204
			sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
Michael Yang's avatar
Michael Yang committed
205
206
207
208
209
210
211
212
			log.Printf("%s complete upload attempt %d failed: %v, retrying in %s", b.Digest[7:19], try, err, sleep)
			time.Sleep(sleep)
			continue
		}
		defer resp.Body.Close()

		b.err = nil
		b.done = true
Michael Yang's avatar
Michael Yang committed
213
		return
Michael Yang's avatar
Michael Yang committed
214
215
	}
}
Michael Yang's avatar
Michael Yang committed
216

Michael Yang's avatar
Michael Yang committed
217
func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error {
Michael Yang's avatar
Michael Yang committed
218
219
	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
Michael Yang's avatar
Michael Yang committed
220
	headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
221
222

	if method == http.MethodPatch {
223
		headers.Set("X-Redirect-Uploads", "1")
Michael Yang's avatar
Michael Yang committed
224
		headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
225
	}
Michael Yang's avatar
Michael Yang committed
226

227
228
	sr := io.NewSectionReader(b.file, part.Offset, part.Size)
	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, part), opts)
Michael Yang's avatar
Michael Yang committed
229
230
231
232
	if err != nil {
		return err
	}
	defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
233

Michael Yang's avatar
Michael Yang committed
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
	location := resp.Header.Get("Docker-Upload-Location")
	if location == "" {
		location = resp.Header.Get("Location")
	}

	nextURL, err := url.Parse(location)
	if err != nil {
		return err
	}

	switch {
	case resp.StatusCode == http.StatusTemporaryRedirect:
		b.nextURL <- nextURL

		redirectURL, err := resp.Location()
		if err != nil {
			return err
		}
252

Jeffrey Morgan's avatar
Jeffrey Morgan committed
253
		// retry uploading to the redirect URL
Michael Yang's avatar
Michael Yang committed
254
		for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
255
			err = b.uploadChunk(ctx, http.MethodPut, redirectURL, part, nil)
Michael Yang's avatar
Michael Yang committed
256
257
258
259
260
261
			switch {
			case errors.Is(err, context.Canceled):
				return err
			case errors.Is(err, errMaxRetriesExceeded):
				return err
			case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
262
				part.Reset()
Jeffrey Morgan's avatar
Jeffrey Morgan committed
263
264
265
				sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
				log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep)
				time.Sleep(sleep)
266
267
268
				continue
			}

Michael Yang's avatar
Michael Yang committed
269
270
271
			return nil
		}

Michael Yang's avatar
Michael Yang committed
272
		return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
273

Michael Yang's avatar
Michael Yang committed
274
275
276
277
278
279
280
	case resp.StatusCode == http.StatusUnauthorized:
		auth := resp.Header.Get("www-authenticate")
		authRedir := ParseAuthRedirectString(auth)
		token, err := getAuthToken(ctx, authRedir)
		if err != nil {
			return err
		}
Michael Yang's avatar
Michael Yang committed
281

Michael Yang's avatar
Michael Yang committed
282
283
284
285
286
287
		opts.Token = token
		fallthrough
	case resp.StatusCode >= http.StatusBadRequest:
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
Michael Yang's avatar
Michael Yang committed
288
289
		}

290
		return fmt.Errorf("http status %s: %s", resp.Status, body)
Michael Yang's avatar
Michael Yang committed
291
292
	}

Michael Yang's avatar
Michael Yang committed
293
294
295
296
297
	if method == http.MethodPatch {
		b.nextURL <- nextURL
	}

	return nil
Michael Yang's avatar
Michael Yang committed
298
299
}

Michael Yang's avatar
Michael Yang committed
300
301
func (b *blobUpload) acquire() {
	b.references.Add(1)
Michael Yang's avatar
Michael Yang committed
302
303
}

Michael Yang's avatar
Michael Yang committed
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
// release drops a waiter registered by acquire. When the last waiter leaves,
// the upload's context is canceled so Run stops its in-flight requests.
func (b *blobUpload) release() {
	if b.references.Add(-1) != 0 {
		return
	}

	// CancelFunc is installed by Run; it is still nil if the last waiter
	// leaves before Run has started (e.g. while Prepare is running).
	if cancel := b.CancelFunc; cancel != nil {
		cancel()
	}
}

func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
	b.acquire()
	defer b.release()

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
323
			Status:    fmt.Sprintf("pushing %s", b.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
324
325
326
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
Michael Yang's avatar
Michael Yang committed
327
328
		})

Michael Yang's avatar
Michael Yang committed
329
330
331
		if b.done || b.err != nil {
			return b.err
		}
Michael Yang's avatar
Michael Yang committed
332
	}
Michael Yang's avatar
Michael Yang committed
333
}
Michael Yang's avatar
Michael Yang committed
334

Michael Yang's avatar
Michael Yang committed
335
336
337
338
339
340
341
type blobUploadPart struct {
	// N is the part number
	N      int
	Offset int64
	Size   int64
	hash.Hash

Michael Yang's avatar
Michael Yang committed
342
	written int64
Michael Yang's avatar
Michael Yang committed
343

Michael Yang's avatar
Michael Yang committed
344
345
346
	*blobUpload
}

Michael Yang's avatar
Michael Yang committed
347
348
349
350
func (p *blobUploadPart) Write(b []byte) (n int, err error) {
	n = len(b)
	p.written += int64(n)
	p.Completed.Add(int64(n))
351
	p.Hash.Write(b)
Michael Yang's avatar
Michael Yang committed
352
353
	return n, nil
}
Michael Yang's avatar
Michael Yang committed
354

Michael Yang's avatar
Michael Yang committed
355
356
357
func (p *blobUploadPart) Reset() {
	p.Completed.Add(-int64(p.written))
	p.written = 0
358
	p.Hash.Reset()
Michael Yang's avatar
Michael Yang committed
359
360
}

Michael Yang's avatar
Michael Yang committed
361
362
363
364
func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error {
	requestURL := mp.BaseURL()
	requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)

Michael Yang's avatar
Michael Yang committed
365
366
367
368
	resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
Michael Yang's avatar
Michael Yang committed
369
		return err
Michael Yang's avatar
Michael Yang committed
370
371
	default:
		defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
372
		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
373
			Status:    fmt.Sprintf("pushing %s", layer.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
			Digest:    layer.Digest,
			Total:     layer.Size,
			Completed: layer.Size,
		})

		return nil
	}

	data, ok := blobUploadManager.LoadOrStore(layer.Digest, &blobUpload{Layer: layer})
	upload := data.(*blobUpload)
	if !ok {
		requestURL := mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs/uploads/")
		if err := upload.Prepare(ctx, requestURL, opts); err != nil {
			blobUploadManager.Delete(layer.Digest)
			return err
		}

		go upload.Run(context.Background(), opts)
	}

	return upload.Wait(ctx, fn)
}