upload.go 8.7 KB
Newer Older
Michael Yang's avatar
Michael Yang committed
1
2
3
4
package server

import (
	"context"
5
	"crypto/md5"
Michael Yang's avatar
Michael Yang committed
6
7
	"errors"
	"fmt"
8
	"hash"
Michael Yang's avatar
Michael Yang committed
9
10
	"io"
	"log"
Jeffrey Morgan's avatar
Jeffrey Morgan committed
11
	"math"
Michael Yang's avatar
Michael Yang committed
12
13
14
	"net/http"
	"net/url"
	"os"
15
	"strings"
Michael Yang's avatar
Michael Yang committed
16
	"sync"
Michael Yang's avatar
Michael Yang committed
17
18
	"sync/atomic"
	"time"
Michael Yang's avatar
Michael Yang committed
19
20

	"github.com/jmorganca/ollama/api"
Michael Yang's avatar
Michael Yang committed
21
22
	"github.com/jmorganca/ollama/format"
	"golang.org/x/sync/errgroup"
Michael Yang's avatar
Michael Yang committed
23
24
)

Michael Yang's avatar
Michael Yang committed
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// blobUploadManager maps a layer digest to the *blobUpload pushing it, so that
// callers pushing the same digest concurrently share one upload (see uploadBlob).
var blobUploadManager sync.Map

// blobUpload is the shared state of one blob push: Run drives it, and any
// number of callers observe it through Wait.
type blobUpload struct {
	// The layer being pushed; its Digest and From fields are read by the upload.
	*Layer

	// Total is the size in bytes of the local blob file; Completed counts the
	// bytes read from it for upload so far.
	Total     int64
	Completed atomic.Int64

	// Parts are the chunks Prepare splits the blob into.
	Parts []blobUploadPart

	// nextURL passes the next upload URL handed out by the registry to the
	// part that uploads next.
	nextURL chan *url.URL

	// CancelFunc cancels the context Run uploads under; release calls it when
	// the last waiter leaves.
	context.CancelFunc

	// done and err are the terminal state reported by Wait.
	// NOTE(review): they are written by Run and read by Wait with no
	// synchronization.
	done bool
	err  error
	// references counts active Wait calls (see acquire/release).
	references atomic.Int32
}

44
const (
Michael Yang's avatar
Michael Yang committed
45
	numUploadParts          = 64
Michael Yang's avatar
Michael Yang committed
46
47
	minUploadPartSize int64 = 100 * format.MegaByte
	maxUploadPartSize int64 = 1000 * format.MegaByte
48
49
)

Michael Yang's avatar
Michael Yang committed
50
51
52
53
54
55
56
func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		return err
	}

	if b.From != "" {
Michael Yang's avatar
Michael Yang committed
57
		values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
58
		values.Add("mount", b.Digest)
59
		values.Add("from", ParseModelPath(b.From).GetNamespaceRepository())
Michael Yang's avatar
Michael Yang committed
60
61
62
		requestURL.RawQuery = values.Encode()
	}

Michael Yang's avatar
Michael Yang committed
63
	resp, err := makeRequestWithRetry(ctx, http.MethodPost, requestURL, nil, nil, opts)
Michael Yang's avatar
Michael Yang committed
64
	if err != nil {
Michael Yang's avatar
Michael Yang committed
65
		return err
Michael Yang's avatar
Michael Yang committed
66
67
68
	}
	defer resp.Body.Close()

69
	location := resp.Header.Get("Docker-Upload-Location")
Michael Yang's avatar
Michael Yang committed
70
	if location == "" {
71
72
73
		location = resp.Header.Get("Location")
	}

Michael Yang's avatar
Michael Yang committed
74
	fi, err := os.Stat(p)
75
	if err != nil {
Michael Yang's avatar
Michael Yang committed
76
		return err
Michael Yang's avatar
Michael Yang committed
77
78
	}

Michael Yang's avatar
Michael Yang committed
79
80
	b.Total = fi.Size()

Michael Yang's avatar
Michael Yang committed
81
82
83
84
85
86
87
88
	// http.StatusCreated indicates a blob has been mounted
	// ref: https://distribution.github.io/distribution/spec/api/#cross-repository-blob-mount
	if resp.StatusCode == http.StatusCreated {
		b.Completed.Store(b.Total)
		b.done = true
		return nil
	}

Michael Yang's avatar
Michael Yang committed
89
90
91
92
93
94
95
96
97
98
99
100
101
102
	var size = b.Total / numUploadParts
	switch {
	case size < minUploadPartSize:
		size = minUploadPartSize
	case size > maxUploadPartSize:
		size = maxUploadPartSize
	}

	var offset int64
	for offset < fi.Size() {
		if offset+size > fi.Size() {
			size = fi.Size() - offset
		}

Michael Yang's avatar
Michael Yang committed
103
		// set part.N to the current number of parts
Michael Yang's avatar
Michael Yang committed
104
		b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size})
Michael Yang's avatar
Michael Yang committed
105
106
		offset += size
	}
Michael Yang's avatar
Michael Yang committed
107

Michael Yang's avatar
fix log  
Michael Yang committed
108
	log.Printf("uploading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size))
Michael Yang's avatar
Michael Yang committed
109

Michael Yang's avatar
Michael Yang committed
110
	requestURL, err = url.Parse(location)
Michael Yang's avatar
Michael Yang committed
111
112
113
114
	if err != nil {
		return err
	}

Michael Yang's avatar
Michael Yang committed
115
116
117
118
119
	b.nextURL = make(chan *url.URL, 1)
	b.nextURL <- requestURL
	return nil
}

Michael Yang's avatar
Michael Yang committed
120
121
// Run uploads blob parts to the upstream. If the upstream supports redirection, parts will be uploaded
// in parallel as defined by Prepare. Otherwise, parts will be uploaded serially. Run sets b.err on error.
Michael Yang's avatar
Michael Yang committed
122
123
124
125
126
127
128
129
func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
	defer blobUploadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

	g, inner := errgroup.WithContext(ctx)
	g.SetLimit(numUploadParts)
	for i := range b.Parts {
		part := &b.Parts[i]
Michael Yang's avatar
Michael Yang committed
130
131
132
133
		select {
		case <-inner.Done():
		case requestURL := <-b.nextURL:
			g.Go(func() error {
Michael Yang's avatar
Michael Yang committed
134
				var err error
Michael Yang's avatar
Michael Yang committed
135
				for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
136
					err = b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts)
Michael Yang's avatar
Michael Yang committed
137
138
139
140
141
142
					switch {
					case errors.Is(err, context.Canceled):
						return err
					case errors.Is(err, errMaxRetriesExceeded):
						return err
					case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
143
						sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
Michael Yang's avatar
Michael Yang committed
144
145
						log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep)
						time.Sleep(sleep)
Michael Yang's avatar
Michael Yang committed
146
147
148
149
						continue
					}

					return nil
Michael Yang's avatar
Michael Yang committed
150
151
				}

Michael Yang's avatar
Michael Yang committed
152
				return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
153
154
			})
		}
Michael Yang's avatar
Michael Yang committed
155
	}
156

Michael Yang's avatar
Michael Yang committed
157
	if err := g.Wait(); err != nil {
Michael Yang's avatar
Michael Yang committed
158
159
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
160
161
	}

Michael Yang's avatar
Michael Yang committed
162
163
	requestURL := <-b.nextURL

164
165
166
167
168
169
170
	var sb strings.Builder
	for _, part := range b.Parts {
		sb.Write(part.Sum(nil))
	}

	md5sum := md5.Sum([]byte(sb.String()))

Michael Yang's avatar
Michael Yang committed
171
	values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
172
	values.Add("digest", b.Digest)
173
	values.Add("etag", fmt.Sprintf("%x-%d", md5sum, len(b.Parts)))
Michael Yang's avatar
Michael Yang committed
174
175
176
177
178
179
	requestURL.RawQuery = values.Encode()

	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
	headers.Set("Content-Length", "0")

Michael Yang's avatar
Michael Yang committed
180
181
182
183
	for try := 0; try < maxRetries; try++ {
		resp, err := makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts)
		if err != nil {
			b.err = err
Jeffrey Morgan's avatar
Jeffrey Morgan committed
184
			sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
Michael Yang's avatar
Michael Yang committed
185
186
187
188
189
190
191
192
			log.Printf("%s complete upload attempt %d failed: %v, retrying in %s", b.Digest[7:19], try, err, sleep)
			time.Sleep(sleep)
			continue
		}
		defer resp.Body.Close()

		b.err = nil
		b.done = true
Michael Yang's avatar
Michael Yang committed
193
		return
Michael Yang's avatar
Michael Yang committed
194
195
	}
}
Michael Yang's avatar
Michael Yang committed
196

Michael Yang's avatar
Michael Yang committed
197
func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error {
Michael Yang's avatar
Michael Yang committed
198
199
	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
Michael Yang's avatar
Michael Yang committed
200
	headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
201
202

	if method == http.MethodPatch {
203
		headers.Set("X-Redirect-Uploads", "1")
Michael Yang's avatar
Michael Yang committed
204
		headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
205
	}
Michael Yang's avatar
Michael Yang committed
206

207
208
209
210
211
212
213
214
215
216
217
218
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		return err
	}

	f, err := os.Open(p)
	if err != nil {
		return err
	}
	defer f.Close()

	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(io.NewSectionReader(f, part.Offset, part.Size), part), opts)
Michael Yang's avatar
Michael Yang committed
219
220
221
222
	if err != nil {
		return err
	}
	defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
223

Michael Yang's avatar
Michael Yang committed
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
	location := resp.Header.Get("Docker-Upload-Location")
	if location == "" {
		location = resp.Header.Get("Location")
	}

	nextURL, err := url.Parse(location)
	if err != nil {
		return err
	}

	switch {
	case resp.StatusCode == http.StatusTemporaryRedirect:
		b.nextURL <- nextURL

		redirectURL, err := resp.Location()
		if err != nil {
			return err
		}
242

Jeffrey Morgan's avatar
Jeffrey Morgan committed
243
		// retry uploading to the redirect URL
Michael Yang's avatar
Michael Yang committed
244
		for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
245
			err = b.uploadChunk(ctx, http.MethodPut, redirectURL, part, nil)
Michael Yang's avatar
Michael Yang committed
246
247
248
249
250
251
			switch {
			case errors.Is(err, context.Canceled):
				return err
			case errors.Is(err, errMaxRetriesExceeded):
				return err
			case err != nil:
Jeffrey Morgan's avatar
Jeffrey Morgan committed
252
253
254
				sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
				log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep)
				time.Sleep(sleep)
255
256
257
				continue
			}

Michael Yang's avatar
Michael Yang committed
258
259
260
			return nil
		}

Michael Yang's avatar
Michael Yang committed
261
		return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
262

Michael Yang's avatar
Michael Yang committed
263
264
265
266
267
268
269
	case resp.StatusCode == http.StatusUnauthorized:
		auth := resp.Header.Get("www-authenticate")
		authRedir := ParseAuthRedirectString(auth)
		token, err := getAuthToken(ctx, authRedir)
		if err != nil {
			return err
		}
Michael Yang's avatar
Michael Yang committed
270

Michael Yang's avatar
Michael Yang committed
271
272
273
274
275
276
		opts.Token = token
		fallthrough
	case resp.StatusCode >= http.StatusBadRequest:
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
Michael Yang's avatar
Michael Yang committed
277
278
		}

279
		return fmt.Errorf("http status %s: %s", resp.Status, body)
Michael Yang's avatar
Michael Yang committed
280
281
	}

Michael Yang's avatar
Michael Yang committed
282
283
284
285
286
	if method == http.MethodPatch {
		b.nextURL <- nextURL
	}

	return nil
Michael Yang's avatar
Michael Yang committed
287
288
}

Michael Yang's avatar
Michael Yang committed
289
290
func (b *blobUpload) acquire() {
	b.references.Add(1)
Michael Yang's avatar
Michael Yang committed
291
292
}

Michael Yang's avatar
Michael Yang committed
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
func (b *blobUpload) release() {
	if b.references.Add(-1) == 0 {
		b.CancelFunc()
	}
}

func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
	b.acquire()
	defer b.release()

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
312
			Status:    fmt.Sprintf("pushing %s", b.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
313
314
315
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
Michael Yang's avatar
Michael Yang committed
316
317
		})

Michael Yang's avatar
Michael Yang committed
318
319
320
		if b.done || b.err != nil {
			return b.err
		}
Michael Yang's avatar
Michael Yang committed
321
	}
Michael Yang's avatar
Michael Yang committed
322
}
Michael Yang's avatar
Michael Yang committed
323

Michael Yang's avatar
Michael Yang committed
324
325
326
327
328
329
330
type blobUploadPart struct {
	// N is the part number
	N      int
	Offset int64
	Size   int64
	hash.Hash

Michael Yang's avatar
Michael Yang committed
331
	written int64
Michael Yang's avatar
Michael Yang committed
332

Michael Yang's avatar
Michael Yang committed
333
334
335
	*blobUpload
}

Michael Yang's avatar
Michael Yang committed
336
337
338
339
func (p *blobUploadPart) Write(b []byte) (n int, err error) {
	n = len(b)
	p.written += int64(n)
	p.Completed.Add(int64(n))
340
341
342
343
344
345
346

	if p.Hash == nil {
		p.Hash = md5.New()
	}

	p.Hash.Write(b)

Michael Yang's avatar
Michael Yang committed
347
348
	return n, nil
}
Michael Yang's avatar
Michael Yang committed
349

Michael Yang's avatar
Michael Yang committed
350
351
352
func (p *blobUploadPart) Reset() {
	p.Completed.Add(-int64(p.written))
	p.written = 0
353
	p.Hash.Reset()
Michael Yang's avatar
Michael Yang committed
354
355
}

Michael Yang's avatar
Michael Yang committed
356
357
358
359
func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error {
	requestURL := mp.BaseURL()
	requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)

Michael Yang's avatar
Michael Yang committed
360
361
362
363
	resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
Michael Yang's avatar
Michael Yang committed
364
		return err
Michael Yang's avatar
Michael Yang committed
365
366
	default:
		defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
367
		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
368
			Status:    fmt.Sprintf("pushing %s", layer.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
			Digest:    layer.Digest,
			Total:     layer.Size,
			Completed: layer.Size,
		})

		return nil
	}

	data, ok := blobUploadManager.LoadOrStore(layer.Digest, &blobUpload{Layer: layer})
	upload := data.(*blobUpload)
	if !ok {
		requestURL := mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs/uploads/")
		if err := upload.Prepare(ctx, requestURL, opts); err != nil {
			blobUploadManager.Delete(layer.Digest)
			return err
		}

		go upload.Run(context.Background(), opts)
	}

	return upload.Wait(ctx, fn)
}