upload.go 7.9 KB
Newer Older
Michael Yang's avatar
Michael Yang committed
1
2
3
4
package server

import (
	"context"
5
	"crypto/md5"
Michael Yang's avatar
Michael Yang committed
6
7
	"errors"
	"fmt"
8
	"hash"
Michael Yang's avatar
Michael Yang committed
9
10
11
12
13
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
14
	"strings"
Michael Yang's avatar
Michael Yang committed
15
	"sync"
Michael Yang's avatar
Michael Yang committed
16
17
	"sync/atomic"
	"time"
Michael Yang's avatar
Michael Yang committed
18
19

	"github.com/jmorganca/ollama/api"
Michael Yang's avatar
Michael Yang committed
20
21
	"github.com/jmorganca/ollama/format"
	"golang.org/x/sync/errgroup"
Michael Yang's avatar
Michael Yang committed
22
23
)

Michael Yang's avatar
Michael Yang committed
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// blobUploadManager holds the uploads that are currently in progress. uploadBlob stores a
// *blobUpload under the layer digest; the entry is deleted when Run returns or when
// Prepare fails.
var blobUploadManager sync.Map

// blobUpload is the state of one blob upload. A single instance is shared by every caller
// of uploadBlob that asks for the same digest while the upload is in progress.
type blobUpload struct {
	// Layer is the layer being uploaded; its Digest and From fields are read in this file.
	*Layer

	// Total is the size in bytes of the blob file on disk, set by Prepare.
	Total     int64
	// Completed counts the bytes sent so far. blobUploadPart.Write adds to it and
	// blobUploadPart.Reset subtracts what a restarted part had already counted.
	Completed atomic.Int64

	// Parts are the chunks of the blob, computed by Prepare.
	Parts []blobUploadPart

	// nextURL carries the URL for the next PATCH request. Prepare creates it with a
	// buffer of one and seeds it; Run takes a URL for each part and uploadChunk puts
	// back the location returned by the upstream.
	nextURL chan *url.URL

	// CancelFunc cancels the context used by Run. It is assigned inside Run and called
	// by release once the reference count reaches zero.
	context.CancelFunc

	// done is set by Run after the final request succeeds.
	done       bool
	// err is set by Run on failure and returned by Wait.
	err        error
	// references counts the Wait calls currently observing this upload.
	references atomic.Int32
}

43
// Limits used when a blob is split into upload parts.
const (
	// numUploadParts is the divisor Prepare applies to the blob size to get the initial
	// part size. Run also uses it as the limit on concurrently running part uploads.
	numUploadParts = 64
	// minUploadPartSize is the lower clamp, in bytes, for the part size chosen by Prepare.
	minUploadPartSize int64 = 95 * 1000 * 1000
	// maxUploadPartSize is the upper clamp, in bytes, for the part size chosen by Prepare.
	maxUploadPartSize int64 = 1000 * 1000 * 1000
)

Michael Yang's avatar
Michael Yang committed
49
50
51
52
53
54
55
func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		return err
	}

	if b.From != "" {
Michael Yang's avatar
Michael Yang committed
56
		values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
57
58
		values.Add("mount", b.Digest)
		values.Add("from", b.From)
Michael Yang's avatar
Michael Yang committed
59
60
61
		requestURL.RawQuery = values.Encode()
	}

Michael Yang's avatar
Michael Yang committed
62
	resp, err := makeRequestWithRetry(ctx, http.MethodPost, requestURL, nil, nil, opts)
Michael Yang's avatar
Michael Yang committed
63
	if err != nil {
Michael Yang's avatar
Michael Yang committed
64
		return err
Michael Yang's avatar
Michael Yang committed
65
66
67
	}
	defer resp.Body.Close()

68
	location := resp.Header.Get("Docker-Upload-Location")
Michael Yang's avatar
Michael Yang committed
69
	if location == "" {
70
71
72
		location = resp.Header.Get("Location")
	}

Michael Yang's avatar
Michael Yang committed
73
	fi, err := os.Stat(p)
74
	if err != nil {
Michael Yang's avatar
Michael Yang committed
75
		return err
Michael Yang's avatar
Michael Yang committed
76
77
	}

Michael Yang's avatar
Michael Yang committed
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
	b.Total = fi.Size()

	var size = b.Total / numUploadParts
	switch {
	case size < minUploadPartSize:
		size = minUploadPartSize
	case size > maxUploadPartSize:
		size = maxUploadPartSize
	}

	var offset int64
	for offset < fi.Size() {
		if offset+size > fi.Size() {
			size = fi.Size() - offset
		}

Michael Yang's avatar
Michael Yang committed
94
		// set part.N to the current number of parts
Michael Yang's avatar
Michael Yang committed
95
		b.Parts = append(b.Parts, blobUploadPart{blobUpload: b, N: len(b.Parts), Offset: offset, Size: size})
Michael Yang's avatar
Michael Yang committed
96
97
		offset += size
	}
Michael Yang's avatar
Michael Yang committed
98

Michael Yang's avatar
fix log  
Michael Yang committed
99
	log.Printf("uploading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size))
Michael Yang's avatar
Michael Yang committed
100

Michael Yang's avatar
Michael Yang committed
101
	requestURL, err = url.Parse(location)
Michael Yang's avatar
Michael Yang committed
102
103
104
105
	if err != nil {
		return err
	}

Michael Yang's avatar
Michael Yang committed
106
107
108
109
110
	b.nextURL = make(chan *url.URL, 1)
	b.nextURL <- requestURL
	return nil
}

Michael Yang's avatar
Michael Yang committed
111
112
// Run uploads blob parts to the upstream. If the upstream supports redirection, parts will be uploaded
// in parallel as defined by Prepare. Otherwise, parts will be uploaded serially. Run sets b.err on error.
Michael Yang's avatar
Michael Yang committed
113
114
115
116
117
func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
	defer blobUploadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

	p, err := GetBlobsPath(b.Digest)
Michael Yang's avatar
Michael Yang committed
118
	if err != nil {
Michael Yang's avatar
Michael Yang committed
119
120
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
121
122
	}

Michael Yang's avatar
Michael Yang committed
123
124
	f, err := os.Open(p)
	if err != nil {
Michael Yang's avatar
Michael Yang committed
125
126
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
127
	}
Michael Yang's avatar
Michael Yang committed
128
	defer f.Close()
Michael Yang's avatar
Michael Yang committed
129

Michael Yang's avatar
Michael Yang committed
130
131
132
133
	g, inner := errgroup.WithContext(ctx)
	g.SetLimit(numUploadParts)
	for i := range b.Parts {
		part := &b.Parts[i]
Michael Yang's avatar
Michael Yang committed
134
135
136
137
138
		select {
		case <-inner.Done():
		case requestURL := <-b.nextURL:
			g.Go(func() error {
				for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
139
140
					part.ReadSeeker = io.NewSectionReader(f, part.Offset, part.Size)
					err := b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts)
Michael Yang's avatar
Michael Yang committed
141
142
143
144
145
146
147
148
149
150
151
					switch {
					case errors.Is(err, context.Canceled):
						return err
					case errors.Is(err, errMaxRetriesExceeded):
						return err
					case err != nil:
						log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], part.N, try, err)
						continue
					}

					return nil
Michael Yang's avatar
Michael Yang committed
152
153
				}

Michael Yang's avatar
Michael Yang committed
154
155
156
				return errMaxRetriesExceeded
			})
		}
Michael Yang's avatar
Michael Yang committed
157
	}
158

Michael Yang's avatar
Michael Yang committed
159
	if err := g.Wait(); err != nil {
Michael Yang's avatar
Michael Yang committed
160
161
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
162
163
	}

Michael Yang's avatar
Michael Yang committed
164
165
	requestURL := <-b.nextURL

166
167
168
169
170
171
172
	var sb strings.Builder
	for _, part := range b.Parts {
		sb.Write(part.Sum(nil))
	}

	md5sum := md5.Sum([]byte(sb.String()))

Michael Yang's avatar
Michael Yang committed
173
	values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
174
	values.Add("digest", b.Digest)
175
	values.Add("etag", fmt.Sprintf("%x-%d", md5sum, len(b.Parts)))
Michael Yang's avatar
Michael Yang committed
176
177
178
179
180
181
	requestURL.RawQuery = values.Encode()

	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
	headers.Set("Content-Length", "0")

Michael Yang's avatar
Michael Yang committed
182
	resp, err := makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts)
Michael Yang's avatar
Michael Yang committed
183
	if err != nil {
Michael Yang's avatar
Michael Yang committed
184
185
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
186
187
188
	}
	defer resp.Body.Close()

Michael Yang's avatar
Michael Yang committed
189
	b.done = true
Michael Yang's avatar
Michael Yang committed
190
}
Michael Yang's avatar
Michael Yang committed
191

Michael Yang's avatar
Michael Yang committed
192
193
194
func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error {
	part.Reset()

Michael Yang's avatar
Michael Yang committed
195
196
	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
Michael Yang's avatar
Michael Yang committed
197
	headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
198
199
200
	headers.Set("X-Redirect-Uploads", "1")

	if method == http.MethodPatch {
Michael Yang's avatar
Michael Yang committed
201
		headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
202
	}
Michael Yang's avatar
Michael Yang committed
203

Michael Yang's avatar
Michael Yang committed
204
	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(part.ReadSeeker, io.MultiWriter(part, part.Hash)), opts)
Michael Yang's avatar
Michael Yang committed
205
206
207
208
	if err != nil {
		return err
	}
	defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
209

Michael Yang's avatar
Michael Yang committed
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
	location := resp.Header.Get("Docker-Upload-Location")
	if location == "" {
		location = resp.Header.Get("Location")
	}

	nextURL, err := url.Parse(location)
	if err != nil {
		return err
	}

	switch {
	case resp.StatusCode == http.StatusTemporaryRedirect:
		b.nextURL <- nextURL

		redirectURL, err := resp.Location()
		if err != nil {
			return err
		}
228

Michael Yang's avatar
Michael Yang committed
229
		for try := 0; try < maxRetries; try++ {
Michael Yang's avatar
Michael Yang committed
230
			err := b.uploadChunk(ctx, http.MethodPut, redirectURL, part, nil)
Michael Yang's avatar
Michael Yang committed
231
232
233
234
235
236
237
			switch {
			case errors.Is(err, context.Canceled):
				return err
			case errors.Is(err, errMaxRetriesExceeded):
				return err
			case err != nil:
				log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], part.N, try, err)
238
239
240
				continue
			}

Michael Yang's avatar
Michael Yang committed
241
242
243
244
			return nil
		}

		return errMaxRetriesExceeded
Michael Yang's avatar
Michael Yang committed
245

Michael Yang's avatar
Michael Yang committed
246
247
248
249
250
251
252
	case resp.StatusCode == http.StatusUnauthorized:
		auth := resp.Header.Get("www-authenticate")
		authRedir := ParseAuthRedirectString(auth)
		token, err := getAuthToken(ctx, authRedir)
		if err != nil {
			return err
		}
Michael Yang's avatar
Michael Yang committed
253

Michael Yang's avatar
Michael Yang committed
254
255
256
257
258
259
		opts.Token = token
		fallthrough
	case resp.StatusCode >= http.StatusBadRequest:
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
Michael Yang's avatar
Michael Yang committed
260
261
		}

Michael Yang's avatar
Michael Yang committed
262
		return fmt.Errorf("http status %d %s: %s", resp.StatusCode, resp.Status, body)
Michael Yang's avatar
Michael Yang committed
263
264
	}

Michael Yang's avatar
Michael Yang committed
265
266
267
268
269
	if method == http.MethodPatch {
		b.nextURL <- nextURL
	}

	return nil
Michael Yang's avatar
Michael Yang committed
270
271
}

Michael Yang's avatar
Michael Yang committed
272
273
func (b *blobUpload) acquire() {
	b.references.Add(1)
Michael Yang's avatar
Michael Yang committed
274
275
}

Michael Yang's avatar
Michael Yang committed
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
// release drops one reference to the upload. When the last reference is dropped the
// upload's context is cancelled through b.CancelFunc.
//
// CancelFunc is assigned inside Run, which is started on its own goroutine, so it can
// still be nil if the last holder leaves before Run has got that far. Calling a nil
// func would panic, so that case is skipped.
// NOTE(review): in that case the upload is not cancelled; assigning CancelFunc before
// the goroutine is started would close the gap entirely.
func (b *blobUpload) release() {
	if b.references.Add(-1) != 0 {
		return
	}

	if b.CancelFunc != nil {
		b.CancelFunc()
	}
}

func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
	b.acquire()
	defer b.release()

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
			Status:    fmt.Sprintf("uploading %s", b.Digest),
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
Michael Yang's avatar
Michael Yang committed
299
300
		})

Michael Yang's avatar
Michael Yang committed
301
302
303
		if b.done || b.err != nil {
			return b.err
		}
Michael Yang's avatar
Michael Yang committed
304
	}
Michael Yang's avatar
Michael Yang committed
305
}
Michael Yang's avatar
Michael Yang committed
306

Michael Yang's avatar
Michael Yang committed
307
308
309
310
311
312
313
type blobUploadPart struct {
	// N is the part number
	N      int
	Offset int64
	Size   int64
	hash.Hash

Michael Yang's avatar
Michael Yang committed
314
	written int64
Michael Yang's avatar
Michael Yang committed
315
316

	io.ReadSeeker
Michael Yang's avatar
Michael Yang committed
317
318
319
	*blobUpload
}

Michael Yang's avatar
Michael Yang committed
320
321
322
323
func (p *blobUploadPart) Write(b []byte) (n int, err error) {
	n = len(b)
	p.written += int64(n)
	p.Completed.Add(int64(n))
Michael Yang's avatar
Michael Yang committed
324
325
	return n, nil
}
Michael Yang's avatar
Michael Yang committed
326

Michael Yang's avatar
Michael Yang committed
327
328
329
330
331
332
333
// Reset prepares the part for a new upload attempt: the reader is rewound to the start
// of the part, the bytes counted so far are taken back out of the upload's Completed
// total, and a fresh MD5 hash replaces the old one.
func (p *blobUploadPart) Reset() {
	// Reset has no error return, so the result of Seek is discarded.
	_, _ = p.Seek(0, io.SeekStart)

	counted := p.written
	p.written = 0
	p.Completed.Add(-counted)

	p.Hash = md5.New()
}

Michael Yang's avatar
Michael Yang committed
334
335
336
337
func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error {
	requestURL := mp.BaseURL()
	requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)

Michael Yang's avatar
Michael Yang committed
338
339
340
341
	resp, err := makeRequestWithRetry(ctx, http.MethodHead, requestURL, nil, nil, opts)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
Michael Yang's avatar
Michael Yang committed
342
		return err
Michael Yang's avatar
Michael Yang committed
343
344
	default:
		defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
		fn(api.ProgressResponse{
			Status:    fmt.Sprintf("uploading %s", layer.Digest),
			Digest:    layer.Digest,
			Total:     layer.Size,
			Completed: layer.Size,
		})

		return nil
	}

	data, ok := blobUploadManager.LoadOrStore(layer.Digest, &blobUpload{Layer: layer})
	upload := data.(*blobUpload)
	if !ok {
		requestURL := mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs/uploads/")
		if err := upload.Prepare(ctx, requestURL, opts); err != nil {
			blobUploadManager.Delete(layer.Digest)
			return err
		}

		go upload.Run(context.Background(), opts)
	}

	return upload.Wait(ctx, fn)
}