upload.go 8.63 KB
Newer Older
Michael Yang's avatar
Michael Yang committed
1
2
3
4
package server

import (
	"context"
5
	"crypto/md5"
Michael Yang's avatar
Michael Yang committed
6
7
	"errors"
	"fmt"
8
	"hash"
Michael Yang's avatar
Michael Yang committed
9
10
11
12
13
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
14
	"strings"
Michael Yang's avatar
Michael Yang committed
15
	"sync"
Michael Yang's avatar
Michael Yang committed
16
17
	"sync/atomic"
	"time"
Michael Yang's avatar
Michael Yang committed
18
19

	"github.com/jmorganca/ollama/api"
Michael Yang's avatar
Michael Yang committed
20
21
	"github.com/jmorganca/ollama/format"
	"golang.org/x/sync/errgroup"
Michael Yang's avatar
Michael Yang committed
22
23
)

Michael Yang's avatar
Michael Yang committed
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
var blobUploadManager sync.Map

// blobUpload holds the state of one layer upload. Instances are shared
// through blobUploadManager, keyed by the layer digest.
type blobUpload struct {
	// Layer is the layer being uploaded; its Digest and From fields are
	// read by Prepare.
	*Layer

	// Total is the blob's size in bytes, taken from the file size in Prepare.
	// Completed is the number of bytes accounted for so far; parts add to it
	// as their data is read and subtract from it when an attempt is reset.
	Total     int64
	Completed atomic.Int64

	// Parts are the byte ranges the blob is split into by Prepare.
	Parts []blobUploadPart

	// nextURL carries the URL to use for the next request. Prepare creates
	// it with a buffer of one and seeds it with the upload location;
	// uploadChunk sends the location returned by each response.
	nextURL chan *url.URL

	// CancelFunc cancels the context created in Run. It is nil until Run
	// assigns it and is invoked by release when the reference count hits zero.
	context.CancelFunc

	// done and err record the outcome and are polled by Wait.
	// references counts the callers currently inside Wait.
	done       bool
	err        error
	references atomic.Int32
}

43
const (
Michael Yang's avatar
Michael Yang committed
44
	numUploadParts          = 64
Michael Yang's avatar
Michael Yang committed
45
46
	minUploadPartSize int64 = 100 * format.MegaByte
	maxUploadPartSize int64 = 1000 * format.MegaByte
47
48
)

Michael Yang's avatar
Michael Yang committed
49
50
51
52
53
54
55
func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		return err
	}

	if b.From != "" {
Michael Yang's avatar
Michael Yang committed
56
		values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
57
		values.Add("mount", b.Digest)
58
		values.Add("from", ParseModelPath(b.From).GetNamespaceRepository())
Michael Yang's avatar
Michael Yang committed
59
60
61
		requestURL.RawQuery = values.Encode()
	}

Michael Yang's avatar
Michael Yang committed
62
	resp, err := makeRequestWithRetry(ctx, http.MethodPost, requestURL, nil, nil, opts)
Michael Yang's avatar
Michael Yang committed
63
	if err != nil {
Michael Yang's avatar
Michael Yang committed
64
		return err
Michael Yang's avatar
Michael Yang committed
65
66
67
	}
	defer resp.Body.Close()

68
	location := resp.Header.Get("Docker-Upload-Location")
Michael Yang's avatar
Michael Yang committed
69
	if location == "" {
70
71
72
		location = resp.Header.Get("Location")
	}

Michael Yang's avatar
Michael Yang committed
73
	fi, err := os.Stat(p)
74
	if err != nil {
Michael Yang's avatar
Michael Yang committed
75
		return err
Michael Yang's avatar
Michael Yang committed
76
77
	}

Michael Yang's avatar
Michael Yang committed
78
79
	b.Total = fi.Size()

Michael Yang's avatar
Michael Yang committed
80
81
82
83
84
85
86
87
	// http.StatusCreated indicates a blob has been mounted
	// ref: https://distribution.github.io/distribution/spec/api/#cross-repository-blob-mount
	if resp.StatusCode == http.StatusCreated {
		b.Completed.Store(b.Total)
		b.done = true
		return nil
	}

Michael Yang's avatar
Michael Yang committed
88
89
90
91
92
93
94
95
96
97
98
99
100
101
	var size = b.Total / numUploadParts
	switch {
	case size < minUploadPartSize:
		size = minUploadPartSize
	case size > maxUploadPartSize:
		size = maxUploadPartSize
	}

	var offset int64
	for offset < fi.Size() {
		if offset+size > fi.Size() {
			size = fi.Size() - offset
		}

Michael Yang's avatar
Michael Yang committed
102
		// set part.N to the current number of parts
Michael Yang's avatar
Michael Yang committed
103
		b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size})
Michael Yang's avatar
Michael Yang committed
104
105
		offset += size
	}
Michael Yang's avatar
Michael Yang committed
106

Michael Yang's avatar
fix log  
Michael Yang committed
107
	log.Printf("uploading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size))
Michael Yang's avatar
Michael Yang committed
108

Michael Yang's avatar
Michael Yang committed
109
	requestURL, err = url.Parse(location)
Michael Yang's avatar
Michael Yang committed
110
111
112
113
	if err != nil {
		return err
	}

Michael Yang's avatar
Michael Yang committed
114
115
116
117
118
	b.nextURL = make(chan *url.URL, 1)
	b.nextURL <- requestURL
	return nil
}

Michael Yang's avatar
Michael Yang committed
119
120
// Run uploads blob parts to the upstream. If the upstream supports redirection, parts will be uploaded
// in parallel as defined by Prepare. Otherwise, parts will be uploaded serially. Run sets b.err on error.
Michael Yang's avatar
Michael Yang committed
121
122
123
124
125
func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
	defer blobUploadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

	p, err := GetBlobsPath(b.Digest)
Michael Yang's avatar
Michael Yang committed
126
	if err != nil {
Michael Yang's avatar
Michael Yang committed
127
128
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
129
130
	}

Michael Yang's avatar
Michael Yang committed
131
132
	f, err := os.Open(p)
	if err != nil {
Michael Yang's avatar
Michael Yang committed
133
134
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
135
	}
Michael Yang's avatar
Michael Yang committed
136
	defer f.Close()
Michael Yang's avatar
Michael Yang committed
137

Michael Yang's avatar
Michael Yang committed
138
139
140
141
	g, inner := errgroup.WithContext(ctx)
	g.SetLimit(numUploadParts)
	for i := range b.Parts {
		part := &b.Parts[i]
Michael Yang's avatar
Michael Yang committed
142
143
144
145
		select {
		case <-inner.Done():
		case requestURL := <-b.nextURL:
			g.Go(func() error {
Michael Yang's avatar
Michael Yang committed
146
				var err error
Michael Yang's avatar
Michael Yang committed
147
				for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
148
					part.ReadSeeker = io.NewSectionReader(f, part.Offset, part.Size)
Michael Yang's avatar
Michael Yang committed
149
					err = b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts)
Michael Yang's avatar
Michael Yang committed
150
151
152
153
154
155
					switch {
					case errors.Is(err, context.Canceled):
						return err
					case errors.Is(err, errMaxRetriesExceeded):
						return err
					case err != nil:
Michael Yang's avatar
Michael Yang committed
156
157
158
						sleep := 200*time.Millisecond + time.Duration(try)*time.Second/4
						log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep)
						time.Sleep(sleep)
Michael Yang's avatar
Michael Yang committed
159
160
161
162
						continue
					}

					return nil
Michael Yang's avatar
Michael Yang committed
163
164
				}

Michael Yang's avatar
Michael Yang committed
165
				return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
166
167
			})
		}
Michael Yang's avatar
Michael Yang committed
168
	}
169

Michael Yang's avatar
Michael Yang committed
170
	if err := g.Wait(); err != nil {
Michael Yang's avatar
Michael Yang committed
171
172
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
173
174
	}

Michael Yang's avatar
Michael Yang committed
175
176
	requestURL := <-b.nextURL

177
178
179
180
181
182
183
	var sb strings.Builder
	for _, part := range b.Parts {
		sb.Write(part.Sum(nil))
	}

	md5sum := md5.Sum([]byte(sb.String()))

Michael Yang's avatar
Michael Yang committed
184
	values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
185
	values.Add("digest", b.Digest)
186
	values.Add("etag", fmt.Sprintf("%x-%d", md5sum, len(b.Parts)))
Michael Yang's avatar
Michael Yang committed
187
188
189
190
191
192
	requestURL.RawQuery = values.Encode()

	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
	headers.Set("Content-Length", "0")

Michael Yang's avatar
Michael Yang committed
193
194
195
196
197
198
199
200
201
202
203
204
205
	for try := 0; try < maxRetries; try++ {
		resp, err := makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts)
		if err != nil {
			b.err = err
			sleep := 200*time.Millisecond + time.Duration(try)*time.Second/4
			log.Printf("%s complete upload attempt %d failed: %v, retrying in %s", b.Digest[7:19], try, err, sleep)
			time.Sleep(sleep)
			continue
		}
		defer resp.Body.Close()

		b.err = nil
		b.done = true
Michael Yang's avatar
Michael Yang committed
206
		return
Michael Yang's avatar
Michael Yang committed
207
208
	}
}
Michael Yang's avatar
Michael Yang committed
209

Michael Yang's avatar
Michael Yang committed
210
211
212
func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error {
	part.Reset()

Michael Yang's avatar
Michael Yang committed
213
214
	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
Michael Yang's avatar
Michael Yang committed
215
	headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
216
217
218
	headers.Set("X-Redirect-Uploads", "1")

	if method == http.MethodPatch {
Michael Yang's avatar
Michael Yang committed
219
		headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
220
	}
Michael Yang's avatar
Michael Yang committed
221

Michael Yang's avatar
Michael Yang committed
222
	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(part.ReadSeeker, io.MultiWriter(part, part.Hash)), opts)
Michael Yang's avatar
Michael Yang committed
223
224
225
226
	if err != nil {
		return err
	}
	defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
227

Michael Yang's avatar
Michael Yang committed
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
	location := resp.Header.Get("Docker-Upload-Location")
	if location == "" {
		location = resp.Header.Get("Location")
	}

	nextURL, err := url.Parse(location)
	if err != nil {
		return err
	}

	switch {
	case resp.StatusCode == http.StatusTemporaryRedirect:
		b.nextURL <- nextURL

		redirectURL, err := resp.Location()
		if err != nil {
			return err
		}
246

Michael Yang's avatar
Michael Yang committed
247
		for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
248
			err = b.uploadChunk(ctx, http.MethodPut, redirectURL, part, nil)
Michael Yang's avatar
Michael Yang committed
249
250
251
252
253
254
255
			switch {
			case errors.Is(err, context.Canceled):
				return err
			case errors.Is(err, errMaxRetriesExceeded):
				return err
			case err != nil:
				log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], part.N, try, err)
256
257
258
				continue
			}

Michael Yang's avatar
Michael Yang committed
259
260
261
			return nil
		}

Michael Yang's avatar
Michael Yang committed
262
		return fmt.Errorf("%w: %w", errMaxRetriesExceeded, err)
Michael Yang's avatar
Michael Yang committed
263

Michael Yang's avatar
Michael Yang committed
264
265
266
267
268
269
270
	case resp.StatusCode == http.StatusUnauthorized:
		auth := resp.Header.Get("www-authenticate")
		authRedir := ParseAuthRedirectString(auth)
		token, err := getAuthToken(ctx, authRedir)
		if err != nil {
			return err
		}
Michael Yang's avatar
Michael Yang committed
271

Michael Yang's avatar
Michael Yang committed
272
273
274
275
276
277
		opts.Token = token
		fallthrough
	case resp.StatusCode >= http.StatusBadRequest:
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
Michael Yang's avatar
Michael Yang committed
278
279
		}

280
		return fmt.Errorf("http status %s: %s", resp.Status, body)
Michael Yang's avatar
Michael Yang committed
281
282
	}

Michael Yang's avatar
Michael Yang committed
283
284
285
286
287
	if method == http.MethodPatch {
		b.nextURL <- nextURL
	}

	return nil
Michael Yang's avatar
Michael Yang committed
288
289
}

Michael Yang's avatar
Michael Yang committed
290
291
func (b *blobUpload) acquire() {
	b.references.Add(1)
Michael Yang's avatar
Michael Yang committed
292
293
}

Michael Yang's avatar
Michael Yang committed
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
// release drops one waiter from the upload. When the last waiter leaves, the
// upload's context is cancelled.
//
// CancelFunc is only assigned once Run starts, so it may still be nil if a
// waiter leaves before then; calling a nil func would panic, so that case is
// skipped.
func (b *blobUpload) release() {
	if b.references.Add(-1) != 0 {
		return
	}

	if cancel := b.CancelFunc; cancel != nil {
		cancel()
	}
}

func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
	b.acquire()
	defer b.release()

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
313
			Status:    fmt.Sprintf("pushing %s", b.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
314
315
316
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
Michael Yang's avatar
Michael Yang committed
317
318
		})

Michael Yang's avatar
Michael Yang committed
319
320
321
		if b.done || b.err != nil {
			return b.err
		}
Michael Yang's avatar
Michael Yang committed
322
	}
Michael Yang's avatar
Michael Yang committed
323
}
Michael Yang's avatar
Michael Yang committed
324

Michael Yang's avatar
Michael Yang committed
325
326
327
328
329
330
331
type blobUploadPart struct {
	// N is the part number
	N      int
	Offset int64
	Size   int64
	hash.Hash

Michael Yang's avatar
Michael Yang committed
332
	written int64
Michael Yang's avatar
Michael Yang committed
333
334

	io.ReadSeeker
Michael Yang's avatar
Michael Yang committed
335
336
337
	*blobUpload
}

Michael Yang's avatar
Michael Yang committed
338
339
340
341
func (p *blobUploadPart) Write(b []byte) (n int, err error) {
	n = len(b)
	p.written += int64(n)
	p.Completed.Add(int64(n))
Michael Yang's avatar
Michael Yang committed
342
343
	return n, nil
}
Michael Yang's avatar
Michael Yang committed
344

Michael Yang's avatar
Michael Yang committed
345
346
347
348
349
350
351
// Reset prepares the part for a new attempt: the reader is rewound to the
// start, the bytes counted so far are removed from the upload's total, and a
// fresh MD5 hash is installed.
func (p *blobUploadPart) Reset() {
	p.Seek(0, io.SeekStart)

	// undo the progress recorded by the previous attempt
	counted := p.written
	p.Completed.Add(-counted)
	p.written = 0

	p.Hash = md5.New()
}

Michael Yang's avatar
Michael Yang committed
352
353
354
355
func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error {
	requestURL := mp.BaseURL()
	requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)

Michael Yang's avatar
Michael Yang committed
356
357
358
359
	resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
Michael Yang's avatar
Michael Yang committed
360
		return err
Michael Yang's avatar
Michael Yang committed
361
362
	default:
		defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
363
		fn(api.ProgressResponse{
Jeffrey Morgan's avatar
Jeffrey Morgan committed
364
			Status:    fmt.Sprintf("pushing %s", layer.Digest[7:19]),
Michael Yang's avatar
Michael Yang committed
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
			Digest:    layer.Digest,
			Total:     layer.Size,
			Completed: layer.Size,
		})

		return nil
	}

	data, ok := blobUploadManager.LoadOrStore(layer.Digest, &blobUpload{Layer: layer})
	upload := data.(*blobUpload)
	if !ok {
		requestURL := mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs/uploads/")
		if err := upload.Prepare(ctx, requestURL, opts); err != nil {
			blobUploadManager.Delete(layer.Digest)
			return err
		}

		go upload.Run(context.Background(), opts)
	}

	return upload.Wait(ctx, fn)
}