upload.go 7.46 KB
Newer Older
Michael Yang's avatar
Michael Yang committed
1
2
3
4
5
6
7
8
9
10
11
package server

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
Michael Yang's avatar
Michael Yang committed
12
	"sync"
Michael Yang's avatar
Michael Yang committed
13
14
	"sync/atomic"
	"time"
Michael Yang's avatar
Michael Yang committed
15
16

	"github.com/jmorganca/ollama/api"
Michael Yang's avatar
Michael Yang committed
17
18
	"github.com/jmorganca/ollama/format"
	"golang.org/x/sync/errgroup"
Michael Yang's avatar
Michael Yang committed
19
20
)

Michael Yang's avatar
Michael Yang committed
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
// blobUploadManager holds the uploads that are currently in flight, keyed by
// layer digest with *blobUpload values. uploadBlob stores an entry before
// starting an upload (so later callers for the same digest share it) and the
// entry is deleted when run returns or when Prepare fails.
var blobUploadManager sync.Map

// blobUpload tracks the state of uploading one layer blob to a registry.
//
// Fields:
//   - Layer: the layer being uploaded; its Digest and From values are used.
//   - Total: size in bytes of the blob file, set by Prepare.
//   - Completed: running count of bytes read for upload; blobUploadWriter adds
//     to it and uploadChunk subtracts from it when an attempt is retried.
//   - Parts: the byte ranges the blob is split into, built by Prepare.
//   - nextURL: channel of capacity 1 carrying the URL for the next request;
//     Prepare seeds it and uploadChunk refills it from each response.
//   - CancelFunc: cancels the upload's context; assigned when run starts and
//     called by release once no waiters remain.
//   - done, err: outcome of the upload, written by run/Run and polled by Wait.
//   - references: number of Wait callers currently attached.
type blobUpload struct {
	*Layer

	Total     int64
	Completed atomic.Int64

	Parts []blobUploadPart

	nextURL chan *url.URL

	context.CancelFunc

	done       bool
	err        error
	references atomic.Int32
}

// blobUploadPart describes one contiguous byte range of the blob file that is
// sent to the registry as a single request.
type blobUploadPart struct {
	// N is the part number (Prepare assigns it as the part's zero-based index
	// in blobUpload.Parts).
	N      int
	// Offset is the byte offset of the part within the blob file.
	Offset int64
	// Size is the length of the part in bytes.
	Size   int64
}

47
// Limits used when splitting a blob into upload parts.
const (
	// numUploadParts is the divisor Prepare applies to the blob size to get a
	// target part size; run also uses it as the limit on concurrent part
	// uploads.
	numUploadParts = 64

	// minUploadPartSize is the lower clamp, in bytes, on the target part size.
	minUploadPartSize int64 = 95 * 1000 * 1000

	// maxUploadPartSize is the upper clamp, in bytes, on the target part size.
	maxUploadPartSize int64 = 1000 * 1000 * 1000
)

Michael Yang's avatar
Michael Yang committed
53
54
55
56
57
58
59
func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		return err
	}

	if b.From != "" {
Michael Yang's avatar
Michael Yang committed
60
		values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
61
62
		values.Add("mount", b.Digest)
		values.Add("from", b.From)
Michael Yang's avatar
Michael Yang committed
63
64
65
		requestURL.RawQuery = values.Encode()
	}

Michael Yang's avatar
Michael Yang committed
66
	resp, err := makeRequestWithRetry(ctx, "POST", requestURL, nil, nil, opts)
Michael Yang's avatar
Michael Yang committed
67
	if err != nil {
Michael Yang's avatar
Michael Yang committed
68
		return err
Michael Yang's avatar
Michael Yang committed
69
70
71
	}
	defer resp.Body.Close()

72
	location := resp.Header.Get("Docker-Upload-Location")
Michael Yang's avatar
Michael Yang committed
73
	if location == "" {
74
75
76
		location = resp.Header.Get("Location")
	}

Michael Yang's avatar
Michael Yang committed
77
	fi, err := os.Stat(p)
78
	if err != nil {
Michael Yang's avatar
Michael Yang committed
79
		return err
Michael Yang's avatar
Michael Yang committed
80
81
	}

Michael Yang's avatar
Michael Yang committed
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
	b.Total = fi.Size()

	var size = b.Total / numUploadParts
	switch {
	case size < minUploadPartSize:
		size = minUploadPartSize
	case size > maxUploadPartSize:
		size = maxUploadPartSize
	}

	var offset int64
	for offset < fi.Size() {
		if offset+size > fi.Size() {
			size = fi.Size() - offset
		}

		b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size})
		offset += size
	}
Michael Yang's avatar
Michael Yang committed
101

Michael Yang's avatar
Michael Yang committed
102
	log.Printf("uploading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(size))
Michael Yang's avatar
Michael Yang committed
103

Michael Yang's avatar
Michael Yang committed
104
	requestURL, err = url.Parse(location)
Michael Yang's avatar
Michael Yang committed
105
106
107
108
	if err != nil {
		return err
	}

Michael Yang's avatar
Michael Yang committed
109
110
111
112
113
114
115
116
117
118
119
120
121
122
	b.nextURL = make(chan *url.URL, 1)
	b.nextURL <- requestURL
	return nil
}

// Run performs the upload by calling run and stores the resulting error (nil
// on success) in b.err, where Wait reads it.
func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
	b.err = b.run(ctx, opts)
}

func (b *blobUpload) run(ctx context.Context, opts *RegistryOptions) error {
	defer blobUploadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

	p, err := GetBlobsPath(b.Digest)
Michael Yang's avatar
Michael Yang committed
123
124
125
126
	if err != nil {
		return err
	}

Michael Yang's avatar
Michael Yang committed
127
128
129
	f, err := os.Open(p)
	if err != nil {
		return err
Michael Yang's avatar
Michael Yang committed
130
	}
Michael Yang's avatar
Michael Yang committed
131
	defer f.Close()
Michael Yang's avatar
Michael Yang committed
132

Michael Yang's avatar
Michael Yang committed
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
	g, inner := errgroup.WithContext(ctx)
	g.SetLimit(numUploadParts)
	for i := range b.Parts {
		part := &b.Parts[i]
		requestURL := <-b.nextURL
		g.Go(func() error {
			for try := 0; try < maxRetries; try++ {
				r := io.NewSectionReader(f, part.Offset, part.Size)
				err := b.uploadChunk(inner, http.MethodPatch, requestURL, r, part, opts)
				switch {
				case errors.Is(err, context.Canceled):
					return err
				case errors.Is(err, errMaxRetriesExceeded):
					return err
				case err != nil:
					log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], part.N, try, err)
					continue
				}

				return nil
			}
Michael Yang's avatar
Michael Yang committed
154

Michael Yang's avatar
Michael Yang committed
155
156
157
			return errMaxRetriesExceeded
		})
	}
158

Michael Yang's avatar
Michael Yang committed
159
160
	if err := g.Wait(); err != nil {
		return err
Michael Yang's avatar
Michael Yang committed
161
162
	}

Michael Yang's avatar
Michael Yang committed
163
164
	requestURL := <-b.nextURL

Michael Yang's avatar
Michael Yang committed
165
	values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
166
	values.Add("digest", b.Digest)
Michael Yang's avatar
Michael Yang committed
167
168
169
170
171
172
	requestURL.RawQuery = values.Encode()

	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
	headers.Set("Content-Length", "0")

Michael Yang's avatar
Michael Yang committed
173
	resp, err := makeRequest(ctx, "PUT", requestURL, headers, nil, opts)
Michael Yang's avatar
Michael Yang committed
174
175
176
177
178
	if err != nil {
		return err
	}
	defer resp.Body.Close()

Michael Yang's avatar
Michael Yang committed
179
	b.done = true
Michael Yang's avatar
Michael Yang committed
180
181
	return nil
}
Michael Yang's avatar
Michael Yang committed
182

Michael Yang's avatar
Michael Yang committed
183
func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, rs io.ReadSeeker, part *blobUploadPart, opts *RegistryOptions) error {
Michael Yang's avatar
Michael Yang committed
184
185
	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
Michael Yang's avatar
Michael Yang committed
186
	headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
187
188
189
	headers.Set("X-Redirect-Uploads", "1")

	if method == http.MethodPatch {
Michael Yang's avatar
Michael Yang committed
190
		headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
191
	}
Michael Yang's avatar
Michael Yang committed
192

Michael Yang's avatar
Michael Yang committed
193
194
195
196
197
198
	buw := blobUploadWriter{blobUpload: b}
	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(rs, &buw), opts)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
199

Michael Yang's avatar
Michael Yang committed
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
	location := resp.Header.Get("Docker-Upload-Location")
	if location == "" {
		location = resp.Header.Get("Location")
	}

	nextURL, err := url.Parse(location)
	if err != nil {
		return err
	}

	switch {
	case resp.StatusCode == http.StatusTemporaryRedirect:
		b.nextURL <- nextURL

		redirectURL, err := resp.Location()
		if err != nil {
			return err
		}
218

Michael Yang's avatar
Michael Yang committed
219
220
221
222
223
224
225
226
227
228
229
		for try := 0; try < maxRetries; try++ {
			rs.Seek(0, io.SeekStart)
			b.Completed.Add(-buw.written)
			err := b.uploadChunk(ctx, http.MethodPut, redirectURL, rs, part, nil)
			switch {
			case errors.Is(err, context.Canceled):
				return err
			case errors.Is(err, errMaxRetriesExceeded):
				return err
			case err != nil:
				log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], part.N, try, err)
230
231
232
				continue
			}

Michael Yang's avatar
Michael Yang committed
233
234
235
236
			return nil
		}

		return errMaxRetriesExceeded
Michael Yang's avatar
Michael Yang committed
237

Michael Yang's avatar
Michael Yang committed
238
239
240
241
242
243
244
	case resp.StatusCode == http.StatusUnauthorized:
		auth := resp.Header.Get("www-authenticate")
		authRedir := ParseAuthRedirectString(auth)
		token, err := getAuthToken(ctx, authRedir)
		if err != nil {
			return err
		}
Michael Yang's avatar
Michael Yang committed
245

Michael Yang's avatar
Michael Yang committed
246
247
248
249
250
251
		opts.Token = token
		fallthrough
	case resp.StatusCode >= http.StatusBadRequest:
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
Michael Yang's avatar
Michael Yang committed
252
253
		}

Michael Yang's avatar
Michael Yang committed
254
255
256
		rs.Seek(0, io.SeekStart)
		b.Completed.Add(-buw.written)
		return fmt.Errorf("http status %d %s: %s", resp.StatusCode, resp.Status, body)
Michael Yang's avatar
Michael Yang committed
257
258
	}

Michael Yang's avatar
Michael Yang committed
259
260
261
262
263
	if method == http.MethodPatch {
		b.nextURL <- nextURL
	}

	return nil
Michael Yang's avatar
Michael Yang committed
264
265
}

Michael Yang's avatar
Michael Yang committed
266
267
func (b *blobUpload) acquire() {
	b.references.Add(1)
Michael Yang's avatar
Michael Yang committed
268
269
}

Michael Yang's avatar
Michael Yang committed
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
// release drops one reference taken by acquire. When the count reaches zero
// no caller is waiting on the upload any more, so its context is cancelled.
func (b *blobUpload) release() {
	if b.references.Add(-1) != 0 {
		return
	}

	// CancelFunc is only assigned once run has started. A waiter whose own
	// context ends before that point would otherwise call a nil function and
	// panic.
	if cancel := b.CancelFunc; cancel != nil {
		cancel()
	}
}

func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
	b.acquire()
	defer b.release()

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
			Status:    fmt.Sprintf("uploading %s", b.Digest),
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
Michael Yang's avatar
Michael Yang committed
293
294
		})

Michael Yang's avatar
Michael Yang committed
295
296
297
		if b.done || b.err != nil {
			return b.err
		}
Michael Yang's avatar
Michael Yang committed
298
	}
Michael Yang's avatar
Michael Yang committed
299
}
Michael Yang's avatar
Michael Yang committed
300

Michael Yang's avatar
Michael Yang committed
301
302
303
304
305
306
307
308
309
// blobUploadWriter is an io.Writer that discards the data written to it and
// only counts it. The count is added to the embedded upload's Completed total
// and also kept in written, so the bytes counted through this particular
// writer can be subtracted again if the request is retried.
type blobUploadWriter struct {
	written int64
	*blobUpload
}

func (b *blobUploadWriter) Write(p []byte) (n int, err error) {
	n = len(p)
	b.written += int64(n)
	b.Completed.Add(int64(n))
Michael Yang's avatar
Michael Yang committed
310
311
	return n, nil
}
Michael Yang's avatar
Michael Yang committed
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352

// uploadBlob makes sure the given layer exists in the registry, uploading it
// if necessary, and reports progress through fn.
//
// A HEAD request checks for the blob first: on 200 the blob is already
// present, so a single "complete" progress update is sent and nil is
// returned; any status other than 200 or 404 is an error. On 404 the upload
// is looked up in (or added to) blobUploadManager so that concurrent callers
// for the same digest share one upload. The caller that added the entry
// prepares the upload and starts it in the background with its own context;
// every caller then waits on it with ctx.
func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error {
	blobURL := mp.BaseURL().JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)

	resp, err := makeRequest(ctx, "HEAD", blobURL, nil, nil, opts)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Already in the registry: report it as fully uploaded and stop.
	if resp.StatusCode == http.StatusOK {
		fn(api.ProgressResponse{
			Status:    fmt.Sprintf("uploading %s", layer.Digest),
			Digest:    layer.Digest,
			Total:     layer.Size,
			Completed: layer.Size,
		})

		return nil
	}

	// Anything other than "not found" at this point is unexpected.
	if resp.StatusCode != http.StatusNotFound {
		return fmt.Errorf("unexpected status code %d", resp.StatusCode)
	}

	entry, loaded := blobUploadManager.LoadOrStore(layer.Digest, &blobUpload{Layer: layer})
	upload := entry.(*blobUpload)

	// Another caller already owns this upload; just follow its progress.
	if loaded {
		return upload.Wait(ctx, fn)
	}

	sessionURL := mp.BaseURL().JoinPath("v2", mp.GetNamespaceRepository(), "blobs/uploads/")
	if err := upload.Prepare(ctx, sessionURL, opts); err != nil {
		blobUploadManager.Delete(layer.Digest)
		return err
	}

	// The upload runs on a background context so it is not tied to this
	// caller's ctx.
	go upload.Run(context.Background(), opts)

	return upload.Wait(ctx, fn)
}