"examples/langchain-python-rag-privategpt/README.md" did not exist on "cdf5e106ae1d724627fae04078c311565ba70d6f"
upload.go 8.05 KB
Newer Older
Michael Yang's avatar
Michael Yang committed
1
2
3
4
package server

import (
	"context"
5
	"crypto/md5"
Michael Yang's avatar
Michael Yang committed
6
7
	"errors"
	"fmt"
8
	"hash"
Michael Yang's avatar
Michael Yang committed
9
10
11
12
13
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
14
	"strings"
Michael Yang's avatar
Michael Yang committed
15
	"sync"
Michael Yang's avatar
Michael Yang committed
16
17
	"sync/atomic"
	"time"
Michael Yang's avatar
Michael Yang committed
18
19

	"github.com/jmorganca/ollama/api"
Michael Yang's avatar
Michael Yang committed
20
21
	"github.com/jmorganca/ollama/format"
	"golang.org/x/sync/errgroup"
Michael Yang's avatar
Michael Yang committed
22
23
)

Michael Yang's avatar
Michael Yang committed
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// blobUploadManager maps a layer digest to its in-progress *blobUpload so
// that concurrent uploadBlob calls for the same digest share one upload.
var blobUploadManager sync.Map

// blobUpload holds the state of one blob upload, shared between the goroutine
// running it (Run) and the callers waiting on it (Wait).
type blobUpload struct {
	*Layer

	// Total is the blob file size in bytes (set by Prepare). Completed is the
	// number of bytes sent so far; part uploads add to it concurrently and
	// subtract again when an attempt has to be re-sent.
	Total     int64
	Completed atomic.Int64

	// Parts are the byte ranges the blob is sent in, built by Prepare.
	Parts []blobUploadPart

	// nextURL passes the registry's next upload-session URL from the request
	// that received it to the request that will use it. Prepare creates it
	// with a buffer of one.
	nextURL chan *url.URL

	// CancelFunc cancels the context Run uploads under. Run sets it; release
	// calls it once no waiters remain.
	context.CancelFunc

	// done and err are set by Run to report success or failure; Wait polls
	// them. references counts active waiters (acquire/release).
	// NOTE(review): done and err are written by the Run goroutine and read by
	// Wait without synchronization; consider atomics or a mutex.
	done       bool
	err        error
	references atomic.Int32
}

type blobUploadPart struct {
	// N is the part number: the index of this part within blobUpload.Parts.
	N int

	// Offset and Size describe the byte range [Offset, Offset+Size) of the
	// blob file that this part carries.
	Offset, Size int64

	// Hash accumulates the MD5 of the bytes sent for this part; Run combines
	// the per-part sums into the etag of the finished upload.
	hash.Hash
}

51
const (
	// numUploadParts is the number of parts Prepare aims to split a blob
	// into; Run also uses it as the limit on concurrently running part
	// uploads.
	numUploadParts = 64

	// Bounds on the size of a single part. Prepare clamps the per-part size
	// into [minUploadPartSize, maxUploadPartSize]; only the final part may be
	// smaller than the minimum.
	minUploadPartSize int64 = 95_000_000    // 95 MB
	maxUploadPartSize int64 = 1_000_000_000 // 1 GB
)

Michael Yang's avatar
Michael Yang committed
57
58
59
60
61
62
63
func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
	p, err := GetBlobsPath(b.Digest)
	if err != nil {
		return err
	}

	if b.From != "" {
Michael Yang's avatar
Michael Yang committed
64
		values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
65
66
		values.Add("mount", b.Digest)
		values.Add("from", b.From)
Michael Yang's avatar
Michael Yang committed
67
68
69
		requestURL.RawQuery = values.Encode()
	}

Michael Yang's avatar
Michael Yang committed
70
	resp, err := makeRequestWithRetry(ctx, "POST", requestURL, nil, nil, opts)
Michael Yang's avatar
Michael Yang committed
71
	if err != nil {
Michael Yang's avatar
Michael Yang committed
72
		return err
Michael Yang's avatar
Michael Yang committed
73
74
75
	}
	defer resp.Body.Close()

76
	location := resp.Header.Get("Docker-Upload-Location")
Michael Yang's avatar
Michael Yang committed
77
	if location == "" {
78
79
80
		location = resp.Header.Get("Location")
	}

Michael Yang's avatar
Michael Yang committed
81
	fi, err := os.Stat(p)
82
	if err != nil {
Michael Yang's avatar
Michael Yang committed
83
		return err
Michael Yang's avatar
Michael Yang committed
84
85
	}

Michael Yang's avatar
Michael Yang committed
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
	b.Total = fi.Size()

	var size = b.Total / numUploadParts
	switch {
	case size < minUploadPartSize:
		size = minUploadPartSize
	case size > maxUploadPartSize:
		size = maxUploadPartSize
	}

	var offset int64
	for offset < fi.Size() {
		if offset+size > fi.Size() {
			size = fi.Size() - offset
		}

Michael Yang's avatar
Michael Yang committed
102
		// set part.N to the current number of parts
103
		b.Parts = append(b.Parts, blobUploadPart{N: len(b.Parts), Offset: offset, Size: size, Hash: md5.New()})
Michael Yang's avatar
Michael Yang committed
104
105
		offset += size
	}
Michael Yang's avatar
Michael Yang committed
106

Michael Yang's avatar
fix log  
Michael Yang committed
107
	log.Printf("uploading %s in %d %s part(s)", b.Digest[7:19], len(b.Parts), format.HumanBytes(b.Parts[0].Size))
Michael Yang's avatar
Michael Yang committed
108

Michael Yang's avatar
Michael Yang committed
109
	requestURL, err = url.Parse(location)
Michael Yang's avatar
Michael Yang committed
110
111
112
113
	if err != nil {
		return err
	}

Michael Yang's avatar
Michael Yang committed
114
115
116
117
118
	b.nextURL = make(chan *url.URL, 1)
	b.nextURL <- requestURL
	return nil
}

Michael Yang's avatar
Michael Yang committed
119
120
// Run uploads blob parts to the upstream. If the upstream supports redirection, parts will be uploaded
// in parallel as defined by Prepare. Otherwise, parts will be uploaded serially. Run sets b.err on error.
Michael Yang's avatar
Michael Yang committed
121
122
123
124
125
func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
	defer blobUploadManager.Delete(b.Digest)
	ctx, b.CancelFunc = context.WithCancel(ctx)

	p, err := GetBlobsPath(b.Digest)
Michael Yang's avatar
Michael Yang committed
126
	if err != nil {
Michael Yang's avatar
Michael Yang committed
127
128
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
129
130
	}

Michael Yang's avatar
Michael Yang committed
131
132
	f, err := os.Open(p)
	if err != nil {
Michael Yang's avatar
Michael Yang committed
133
134
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
135
	}
Michael Yang's avatar
Michael Yang committed
136
	defer f.Close()
Michael Yang's avatar
Michael Yang committed
137

Michael Yang's avatar
Michael Yang committed
138
139
140
141
	g, inner := errgroup.WithContext(ctx)
	g.SetLimit(numUploadParts)
	for i := range b.Parts {
		part := &b.Parts[i]
Michael Yang's avatar
Michael Yang committed
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
		select {
		case <-inner.Done():
		case requestURL := <-b.nextURL:
			g.Go(func() error {
				for try := 0; try < maxRetries; try++ {
					r := io.NewSectionReader(f, part.Offset, part.Size)
					err := b.uploadChunk(inner, http.MethodPatch, requestURL, r, part, opts)
					switch {
					case errors.Is(err, context.Canceled):
						return err
					case errors.Is(err, errMaxRetriesExceeded):
						return err
					case err != nil:
						log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], part.N, try, err)
						continue
					}

					return nil
Michael Yang's avatar
Michael Yang committed
160
161
				}

Michael Yang's avatar
Michael Yang committed
162
163
164
				return errMaxRetriesExceeded
			})
		}
Michael Yang's avatar
Michael Yang committed
165
	}
166

Michael Yang's avatar
Michael Yang committed
167
	if err := g.Wait(); err != nil {
Michael Yang's avatar
Michael Yang committed
168
169
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
170
171
	}

Michael Yang's avatar
Michael Yang committed
172
173
	requestURL := <-b.nextURL

174
175
176
177
178
179
180
	var sb strings.Builder
	for _, part := range b.Parts {
		sb.Write(part.Sum(nil))
	}

	md5sum := md5.Sum([]byte(sb.String()))

Michael Yang's avatar
Michael Yang committed
181
	values := requestURL.Query()
Michael Yang's avatar
Michael Yang committed
182
	values.Add("digest", b.Digest)
183
	values.Add("etag", fmt.Sprintf("%x-%d", md5sum, len(b.Parts)))
Michael Yang's avatar
Michael Yang committed
184
185
186
187
188
189
	requestURL.RawQuery = values.Encode()

	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
	headers.Set("Content-Length", "0")

Michael Yang's avatar
Michael Yang committed
190
	resp, err := makeRequestWithRetry(ctx, "PUT", requestURL, headers, nil, opts)
Michael Yang's avatar
Michael Yang committed
191
	if err != nil {
Michael Yang's avatar
Michael Yang committed
192
193
		b.err = err
		return
Michael Yang's avatar
Michael Yang committed
194
195
196
	}
	defer resp.Body.Close()

Michael Yang's avatar
Michael Yang committed
197
	b.done = true
Michael Yang's avatar
Michael Yang committed
198
}
Michael Yang's avatar
Michael Yang committed
199

Michael Yang's avatar
Michael Yang committed
200
func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, rs io.ReadSeeker, part *blobUploadPart, opts *RegistryOptions) error {
Michael Yang's avatar
Michael Yang committed
201
202
	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
Michael Yang's avatar
Michael Yang committed
203
	headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
204
205
206
	headers.Set("X-Redirect-Uploads", "1")

	if method == http.MethodPatch {
Michael Yang's avatar
Michael Yang committed
207
		headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
208
	}
Michael Yang's avatar
Michael Yang committed
209

Michael Yang's avatar
Michael Yang committed
210
	buw := blobUploadWriter{blobUpload: b}
211
	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(rs, io.MultiWriter(&buw, part.Hash)), opts)
Michael Yang's avatar
Michael Yang committed
212
213
214
215
	if err != nil {
		return err
	}
	defer resp.Body.Close()
Michael Yang's avatar
Michael Yang committed
216

Michael Yang's avatar
Michael Yang committed
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
	location := resp.Header.Get("Docker-Upload-Location")
	if location == "" {
		location = resp.Header.Get("Location")
	}

	nextURL, err := url.Parse(location)
	if err != nil {
		return err
	}

	switch {
	case resp.StatusCode == http.StatusTemporaryRedirect:
		b.nextURL <- nextURL

		redirectURL, err := resp.Location()
		if err != nil {
			return err
		}
235

Michael Yang's avatar
Michael Yang committed
236
237
238
		for try := 0; try < maxRetries; try++ {
			rs.Seek(0, io.SeekStart)
			b.Completed.Add(-buw.written)
Michael Yang's avatar
Michael Yang committed
239
			buw.written = 0
240
			part.Hash = md5.New()
Michael Yang's avatar
Michael Yang committed
241
242
243
244
245
246
247
248
			err := b.uploadChunk(ctx, http.MethodPut, redirectURL, rs, part, nil)
			switch {
			case errors.Is(err, context.Canceled):
				return err
			case errors.Is(err, errMaxRetriesExceeded):
				return err
			case err != nil:
				log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], part.N, try, err)
249
250
251
				continue
			}

Michael Yang's avatar
Michael Yang committed
252
253
254
255
			return nil
		}

		return errMaxRetriesExceeded
Michael Yang's avatar
Michael Yang committed
256

Michael Yang's avatar
Michael Yang committed
257
258
259
260
261
262
263
	case resp.StatusCode == http.StatusUnauthorized:
		auth := resp.Header.Get("www-authenticate")
		authRedir := ParseAuthRedirectString(auth)
		token, err := getAuthToken(ctx, authRedir)
		if err != nil {
			return err
		}
Michael Yang's avatar
Michael Yang committed
264

Michael Yang's avatar
Michael Yang committed
265
266
267
268
269
270
		opts.Token = token
		fallthrough
	case resp.StatusCode >= http.StatusBadRequest:
		body, err := io.ReadAll(resp.Body)
		if err != nil {
			return err
Michael Yang's avatar
Michael Yang committed
271
272
		}

Michael Yang's avatar
Michael Yang committed
273
274
		rs.Seek(0, io.SeekStart)
		b.Completed.Add(-buw.written)
Michael Yang's avatar
Michael Yang committed
275
		buw.written = 0
Michael Yang's avatar
Michael Yang committed
276
		return fmt.Errorf("http status %d %s: %s", resp.StatusCode, resp.Status, body)
Michael Yang's avatar
Michael Yang committed
277
278
	}

Michael Yang's avatar
Michael Yang committed
279
280
281
282
283
	if method == http.MethodPatch {
		b.nextURL <- nextURL
	}

	return nil
Michael Yang's avatar
Michael Yang committed
284
285
}

Michael Yang's avatar
Michael Yang committed
286
287
func (b *blobUpload) acquire() {
	b.references.Add(1)
Michael Yang's avatar
Michael Yang committed
288
289
}

Michael Yang's avatar
Michael Yang committed
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
func (b *blobUpload) release() {
	if b.references.Add(-1) == 0 {
		b.CancelFunc()
	}
}

func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) error {
	b.acquire()
	defer b.release()

	ticker := time.NewTicker(60 * time.Millisecond)
	for {
		select {
		case <-ticker.C:
		case <-ctx.Done():
			return ctx.Err()
		}

		fn(api.ProgressResponse{
			Status:    fmt.Sprintf("uploading %s", b.Digest),
			Digest:    b.Digest,
			Total:     b.Total,
			Completed: b.Completed.Load(),
Michael Yang's avatar
Michael Yang committed
313
314
		})

Michael Yang's avatar
Michael Yang committed
315
316
317
		if b.done || b.err != nil {
			return b.err
		}
Michael Yang's avatar
Michael Yang committed
318
	}
Michael Yang's avatar
Michael Yang committed
319
}
Michael Yang's avatar
Michael Yang committed
320

Michael Yang's avatar
Michael Yang committed
321
322
323
324
325
326
327
328
329
type blobUploadWriter struct {
	written int64
	*blobUpload
}

func (b *blobUploadWriter) Write(p []byte) (n int, err error) {
	n = len(p)
	b.written += int64(n)
	b.Completed.Add(int64(n))
Michael Yang's avatar
Michael Yang committed
330
331
	return n, nil
}
Michael Yang's avatar
Michael Yang committed
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372

// uploadBlob makes sure layer exists in the registry repository described by
// mp, reporting progress through fn.
//
// A HEAD request first checks whether the blob is already present. If it is,
// fn is called once with the layer reported as fully completed. Otherwise the
// upload is registered in blobUploadManager, so concurrent callers for the
// same digest share a single upload: the first caller prepares it and starts
// Run in the background, and every caller then waits on it.
func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error {
	requestURL := mp.BaseURL()
	requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs", layer.Digest)

	resp, err := makeRequest(ctx, "HEAD", requestURL, nil, nil, opts)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	switch resp.StatusCode {
	case http.StatusNotFound:
		// Not in the registry yet: upload it below.
	case http.StatusOK:
		fn(api.ProgressResponse{
			Status:    fmt.Sprintf("uploading %s", layer.Digest),
			Digest:    layer.Digest,
			Total:     layer.Size,
			Completed: layer.Size,
		})

		return nil
	default:
		return fmt.Errorf("unexpected status code %d", resp.StatusCode)
	}

	data, loaded := blobUploadManager.LoadOrStore(layer.Digest, &blobUpload{Layer: layer})
	upload := data.(*blobUpload)
	if !loaded {
		requestURL := mp.BaseURL()
		requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs/uploads/")
		if err := upload.Prepare(ctx, requestURL, opts); err != nil {
			// Other callers may already have loaded this upload from the
			// manager and be polling it in Wait. Record the failure so they
			// return instead of waiting on an upload that will never run.
			upload.err = err
			blobUploadManager.Delete(layer.Digest)
			return err
		}

		// Run gets a background context; cancellation is driven by release
		// once no waiters remain, not by this caller's ctx.
		go upload.Run(context.Background(), opts)
	}

	return upload.Wait(ctx, fn)
}