upload.go 8.95 KB
Newer Older
Michael Yang's avatar
Michael Yang committed
1
2
3
4
package server

import (
	"context"
5
	"crypto/md5"
Michael Yang's avatar
Michael Yang committed
6
7
8
9
	"errors"
	"fmt"
	"io"
	"log"
Jeffrey Morgan's avatar
Jeffrey Morgan committed
10
	"math"
Michael Yang's avatar
Michael Yang committed
11
12
13
	"net/http"
	"net/url"
	"os"
14
	"strings"
Michael Yang's avatar
Michael Yang committed
15
	"sync"
Michael Yang's avatar
Michael Yang committed
16
17
	"sync/atomic"
	"time"
Michael Yang's avatar
Michael Yang committed
18
19

	"github.com/jmorganca/ollama/api"
Michael Yang's avatar
Michael Yang committed
20
21
	"github.com/jmorganca/ollama/format"
	"golang.org/x/sync/errgroup"
Michael Yang's avatar
Michael Yang committed
22
23
)

Michael Yang's avatar
Michael Yang committed
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// blobUploadManager holds the in-flight uploads keyed by layer digest.
// uploadBlob registers an entry with LoadOrStore so that concurrent pushes of
// the same digest share one *blobUpload, and Run removes the entry on exit.
var blobUploadManager sync.Map

type blobUpload struct {
	*Layer

	Total     int64
	Completed atomic.Int64

	Parts []blobUploadPart

	nextURL chan *url.URL

	context.CancelFunc

38
39
	file *os.File

Michael Yang's avatar
Michael Yang committed
40
41
42
43
44
	done       bool
	err        error
	references atomic.Int32
}

45
const (
Michael Yang's avatar
Michael Yang committed
46
	numUploadParts          = 64
Michael Yang's avatar
Michael Yang committed
47
48
	minUploadPartSize int64 = 100 * format.MegaByte
	maxUploadPartSize int64 = 1000 * format.MegaByte
49
50
)

Michael Yang's avatar
Michael Yang committed
51
52
53
54
55
56
57
func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		return err
	}

	if b.From != "" {
Michael Yang's avatar
Michael Yang committed
58
		values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
59
		values.Add("mount", b.Digest)
60
		values.Add("from", ParseModelPath(b.From).GetNamespaceRepository())
Michael Yang's avatar
Michael Yang committed
61
62
63
		requestURL.RawQuery = values.Encode()
	}

Michael Yang's avatar
Michael Yang committed
64
	resp, err := makeRequestWithRetry(ctx, http.MethodPost, requestURL, nil, nil, opts)
Michael Yang's avatar
Michael Yang committed
65
	if err != nil {
Michael Yang's avatar
Michael Yang committed
66
		return err
Michael Yang's avatar
Michael Yang committed
67
68
69
	}
	defer resp.Body.Close()

70
	location := resp.Header.Get("Docker-Upload-Location")
Michael Yang's avatar
Michael Yang committed
71
	if location == "" {
72
73
74
		location = resp.Header.Get("Location")
	}

Michael Yang's avatar
Michael Yang committed
75
	fi, err := os.Stat(p)
76
	if err != nil {
Michael Yang's avatar
Michael Yang committed
77
		return err
Michael Yang's avatar
Michael Yang committed
78
79
	}

Michael Yang's avatar
Michael Yang committed
80
81
	b.Total = fi.Size()

Michael Yang's avatar
Michael Yang committed
82
83
84
85
86
87
88
89
	// http.StatusCreated indicates a blob has been mounted
	// ref: https://distribution.github.io/distribution/spec/api/#cross-repository-blob-mount
	if resp.StatusCode == http.StatusCreated {
		b.Completed.Store(b.Total)
		b.done = true
		return nil
	}

Michael Yang's avatar
Michael Yang committed
90
91
92
93
94
95
96
97
98
99
100
101
102
103
	var size = b.Total / numUploadParts
	switch {
	case size < minUploadPartSize:
		size = minUploadPartSize
	case size > maxUploadPartSize:
		size = maxUploadPartSize
	}

	var offset int64
	for offset < fi.Size() {
		if offset+size > fi.Size() {
			size = fi.Size() - offset
		}

Michael Yang's avatar
Michael Yang committed
104
		// set part.N to the current number of parts
105
		b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size})
Michael Yang's avatar
Michael Yang committed
106
107
		offset += size
	}
Michael Yang's avatar
Michael Yang committed
108

Michael Yang's avatar
fix log  
Michael Yang committed
109
	log.Printf("uploading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size))
Michael Yang's avatar
Michael Yang committed
110

Michael Yang's avatar
Michael Yang committed
111
	requestURL, err = url.Parse(location)
Michael Yang's avatar
Michael Yang committed
112
113
114
115
	if err != nil {
		return err
	}

Michael Yang's avatar
Michael Yang committed
116
117
118
119
120
	b.nextURL = make(chan *url.URL, 1)
	b.nextURL <- requestURL
	return nil
}

Michael Yang's avatar
Michael Yang committed
121
122
// Run uploads blob parts to the upstream. If the upstream supports redirection, parts will be uploaded
// in parallel as defined by Prepare. Otherwise, parts will be uploaded serially. Run sets b.err on error.
Michael Yang's avatar
Michael Yang committed
123
124
125
126
func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
	defer blobUploadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

127
128
129
130
131
132
133
134
135
136
137
138
139
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		b.err = err
		return
	}

	b.file, err = os.Open(p)
	if err != nil {
		b.err = err
		return
	}
	defer b.file.Close()

Michael Yang's avatar
Michael Yang committed
140
141
142
143
	g, inner := errgroup.WithContext(ctx)
	g.SetLimit(numUploadParts)
	for i := range b.Parts {
		part := &b.Parts[i]
Michael Yang's avatar
Michael Yang committed
144
145
146
147
		select {
		case <-inner.Done():
		case requestURL := <-b.nextURL:
			g.Go(func() error {
Michael Yang's avatar
Michael Yang committed
148
				var err error
Michael Yang's avatar
Michael Yang committed
149
				for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
150
					err = b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts)
Michael Yang's avatar
Michael Yang committed
151
152
153
154
155
156
					switch {
					case errors.Is(err, context.Canceled):
						return err
					case errors.Is(err, errMaxRetriesExceeded):
						return err
					case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
157
						part.Reset()
Jeffrey Morgan's avatar
Jeffrey Morgan committed
158
						sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
Michael Yang's avatar
Michael Yang committed
159
160
						log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep)
						time.Sleep(sleep)
Michael Yang's avatar
Michael Yang committed
161
162
163
164
						continue
					}

					return nil
Michael Yang's avatar
Michael Yang committed
165
166
				}

Michael Yang's avatar
Michael Yang committed
167
				return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
168
169
			})
		}
Michael Yang's avatar
Michael Yang committed
170
	}
171

Michael Yang's avatar
Michael Yang committed
172
	if err := g.Wait(); err != nil {
Michael Yang's avatar
Michael Yang committed
173
174
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
175
176
	}

Michael Yang's avatar
Michael Yang committed
177
178
	requestURL := <-b.nextURL

179
	var sb strings.Builder
180
181

	// calculate md5 checksum and add it to the commit request
182
	for _, part := range b.Parts {
183
184
185
186
187
188
189
		hash := md5.New()
		if _, err := io.Copy(hash, io.NewSectionReader(b.file, part.Offset, part.Size)); err != nil {
			b.err = err
			return
		}

		sb.Write(hash.Sum(nil))
190
191
192
193
	}

	md5sum := md5.Sum([]byte(sb.String()))

Michael Yang's avatar
Michael Yang committed
194
	values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
195
	values.Add("digest", b.Digest)
196
	values.Add("etag", fmt.Sprintf("%x-%d", md5sum, len(b.Parts)))
Michael Yang's avatar
Michael Yang committed
197
198
199
200
201
202
	requestURL.RawQuery = values.Encode()

	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
	headers.Set("Content-Length", "0")

Michael Yang's avatar
Michael Yang committed
203
204
205
206
	for try := 0; try < maxRetries; try++ {
		resp, err := makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts)
		if err != nil {
			b.err = err
207
208
209
210
			if errors.Is(err, context.Canceled) {
				return
			}

Jeffrey Morgan's avatar
Jeffrey Morgan committed
211
			sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
Michael Yang's avatar
Michael Yang committed
212
213
214
215
216
217
218
219
			log.Printf("%s complete upload attempt %d failed: %v, retrying in %s", b.Digest[7:19], try, err, sleep)
			time.Sleep(sleep)
			continue
		}
		defer resp.Body.Close()

		b.err = nil
		b.done = true
Michael Yang's avatar
Michael Yang committed
220
		return
Michael Yang's avatar
Michael Yang committed
221
222
	}
}
Michael Yang's avatar
Michael Yang committed
223

Michael Yang's avatar
Michael Yang committed
224
func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error {
Michael Yang's avatar
Michael Yang committed
225
226
	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
Michael Yang's avatar
Michael Yang committed
227
	headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
228
229

	if method == http.MethodPatch {
230
		headers.Set("X-Redirect-Uploads", "1")
Michael Yang's avatar
Michael Yang committed
231
		headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
232
	}
Michael Yang's avatar
Michael Yang committed
233

234
235
	sr := io.NewSectionReader(b.file, part.Offset, part.Size)
	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, part), opts)
Michael Yang's avatar
Michael Yang committed
236
237
238
239
	if err != nil {
		return err
	}
	defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
240

Michael Yang's avatar
Michael Yang committed
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
	location := resp.Header.Get("Docker-Upload-Location")
	if location == "" {
		location = resp.Header.Get("Location")
	}

	nextURL, err := url.Parse(location)
	if err != nil {
		return err
	}

	switch {
	case resp.StatusCode == http.StatusTemporaryRedirect:
		b.nextURL <- nextURL

		redirectURL, err := resp.Location()
		if err != nil {
			return err
		}
259

Jeffrey Morgan's avatar
Jeffrey Morgan committed
260
		// retry uploading to the redirect URL
Michael Yang's avatar
Michael Yang committed
261
		for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
262
			err = b.uploadChunk(ctx, http.MethodPut, redirectURL, part, nil)
Michael Yang's avatar
Michael Yang committed
263
264
265
266
267
268
			switch {
			case errors.Is(err, context.Canceled):
				return err
			case errors.Is(err, errMaxRetriesExceeded):
				return err
			case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
269
				part.Reset()
Jeffrey Morgan's avatar
Jeffrey Morgan committed
270
271
272
				sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
				log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep)
				time.Sleep(sleep)
273
274
275
				continue
			}

Michael Yang's avatar
Michael Yang committed
276
277
278
			return nil
		}

Michael Yang's avatar
Michael Yang committed
279
		return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
280

Michael Yang's avatar
Michael Yang committed
281
282
283
284
285
286
287
	case resp.StatusCode == http.StatusUnauthorized:
		auth := resp.Header.Get("www-authenticate")
		authRedir := ParseAuthRedirectString(auth)
		token, err := getAuthToken(ctx, authRedir)
		if err != nil {
			return err
		}
Michael Yang's avatar
Michael Yang committed
288

Michael Yang's avatar
Michael Yang committed
289
290
291
292
293
294
		opts.Token = token
		fallthrough
	case resp.StatusCode >= http.StatusBadRequest:
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
Michael Yang's avatar
Michael Yang committed
295
296
		}

297
		return fmt.Errorf("http status %s: %s", resp.Status, body)
Michael Yang's avatar
Michael Yang committed
298
299
	}

Michael Yang's avatar
Michael Yang committed
300
301
302
303
304
	if method == http.MethodPatch {
		b.nextURL <- nextURL
	}

	return nil
Michael Yang's avatar
Michael Yang committed
305
306
}

Michael Yang's avatar
Michael Yang committed
307
308
func (b *blobUpload) acquire() {
	b.references.Add(1)
Michael Yang's avatar
Michael Yang committed
309
310
}

Michael Yang's avatar
Michael Yang committed
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
// release drops one reference to the upload. When the last reference is gone
// the upload's context is cancelled through b.CancelFunc.
//
// b.CancelFunc is only assigned once Run has started, so it can still be nil
// if the last holder leaves before then; in that case there is nothing to
// cancel and release does nothing further.
func (b *blobUpload) release() {
	if b.references.Add(-1) != 0 {
		return
	}

	if cancel := b.CancelFunc; cancel != nil {
		cancel()
	}
}

func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
	b.acquire()
	defer b.release()

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
330
			Status:    fmt.Sprintf("pushing %s", b.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
331
332
333
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
Michael Yang's avatar
Michael Yang committed
334
335
		})

Michael Yang's avatar
Michael Yang committed
336
337
338
		if b.done || b.err != nil {
			return b.err
		}
Michael Yang's avatar
Michael Yang committed
339
	}
Michael Yang's avatar
Michael Yang committed
340
}
Michael Yang's avatar
Michael Yang committed
341

Michael Yang's avatar
Michael Yang committed
342
343
type blobUploadPart struct {
	// N is the part number
344
345
346
	N       int
	Offset  int64
	Size    int64
Michael Yang's avatar
Michael Yang committed
347
348
349
350
	written int64
	*blobUpload
}

Michael Yang's avatar
Michael Yang committed
351
352
353
354
func (p *blobUploadPart) Write(b []byte) (n int, err error) {
	n = len(b)
	p.written += int64(n)
	p.Completed.Add(int64(n))
Michael Yang's avatar
Michael Yang committed
355
356
	return n, nil
}
Michael Yang's avatar
Michael Yang committed
357

Michael Yang's avatar
Michael Yang committed
358
359
360
361
362
// Reset discards the progress recorded for this part: the bytes counted so far
// are subtracted from the upload-wide Completed counter and the part's own
// tally goes back to zero.
func (p *blobUploadPart) Reset() {
	counted := p.written
	p.Completed.Add(-counted)
	p.written = 0
}

Michael Yang's avatar
Michael Yang committed
363
364
365
366
func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error {
	requestURL := mp.BaseURL()
	requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)

Michael Yang's avatar
Michael Yang committed
367
368
369
370
	resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
Michael Yang's avatar
Michael Yang committed
371
		return err
Michael Yang's avatar
Michael Yang committed
372
373
	default:
		defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
374
		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
375
			Status:    fmt.Sprintf("pushing %s", layer.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
			Digest:    layer.Digest,
			Total:     layer.Size,
			Completed: layer.Size,
		})

		return nil
	}

	data, ok := blobUploadManager.LoadOrStore(layer.Digest, &blobUpload{Layer: layer})
	upload := data.(*blobUpload)
	if !ok {
		requestURL := mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs/uploads/")
		if err := upload.Prepare(ctx, requestURL, opts); err != nil {
			blobUploadManager.Delete(layer.Digest)
			return err
		}

		go upload.Run(context.Background(), opts)
	}

	return upload.Wait(ctx, fn)
}