upload.go 5.26 KB
Newer Older
Michael Yang's avatar
Michael Yang committed
1
2
3
4
5
6
7
8
9
10
11
12
package server

import (
	"context"
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"os"
	"strconv"
Michael Yang's avatar
Michael Yang committed
13
	"sync"
Michael Yang's avatar
Michael Yang committed
14
15
16
17

	"github.com/jmorganca/ollama/api"
)

18
const (
Michael Yang's avatar
Michael Yang committed
19
20
	redirectChunkSize int64 = 1024 * 1024 * 1024
	regularChunkSize  int64 = 95 * 1024 * 1024
21
22
23
)

func startUpload(ctx context.Context, mp ModelPath, layer *Layer, regOpts *RegistryOptions) (*url.URL, int64, error) {
Michael Yang's avatar
Michael Yang committed
24
25
26
27
28
29
30
31
32
33
34
35
	requestURL := mp.BaseURL()
	requestURL = requestURL.JoinPath("v2", mp.GetNamespaceRepository(), "blobs/uploads/")
	if layer.From != "" {
		values := requestURL.Query()
		values.Add("mount", layer.Digest)
		values.Add("from", layer.From)
		requestURL.RawQuery = values.Encode()
	}

	resp, err := makeRequestWithRetry(ctx, "POST", requestURL, nil, nil, regOpts)
	if err != nil {
		log.Printf("couldn't start upload: %v", err)
36
		return nil, 0, err
Michael Yang's avatar
Michael Yang committed
37
38
39
	}
	defer resp.Body.Close()

40
41
	location := resp.Header.Get("Docker-Upload-Location")
	chunkSize := redirectChunkSize
Michael Yang's avatar
Michael Yang committed
42
	if location == "" {
43
44
45
46
47
48
49
		location = resp.Header.Get("Location")
		chunkSize = regularChunkSize
	}

	locationURL, err := url.Parse(location)
	if err != nil {
		return nil, 0, err
Michael Yang's avatar
Michael Yang committed
50
51
	}

Michael Yang's avatar
Michael Yang committed
52
	return locationURL, chunkSize, nil
Michael Yang's avatar
Michael Yang committed
53
54
}

55
func uploadBlob(ctx context.Context, requestURL *url.URL, layer *Layer, chunkSize int64, regOpts *RegistryOptions, fn func(api.ProgressResponse)) error {
Michael Yang's avatar
Michael Yang committed
56
57
58
59
60
61
62
63
64
65
66
67
68
69
	// TODO allow resumability
	// TODO allow canceling uploads via DELETE

	fp, err := GetBlobsPath(layer.Digest)
	if err != nil {
		return err
	}

	f, err := os.Open(fp)
	if err != nil {
		return err
	}
	defer f.Close()

Michael Yang's avatar
Michael Yang committed
70
71
72
73
74
75
	pw := ProgressWriter{
		status: fmt.Sprintf("uploading %s", layer.Digest),
		digest: layer.Digest,
		total:  layer.Size,
		fn:     fn,
	}
Michael Yang's avatar
Michael Yang committed
76

Michael Yang's avatar
Michael Yang committed
77
78
79
80
	for offset := int64(0); offset < layer.Size; {
		chunk := layer.Size - offset
		if chunk > chunkSize {
			chunk = chunkSize
Michael Yang's avatar
Michael Yang committed
81
82
		}

83
		resp, err := uploadBlobChunk(ctx, http.MethodPatch, requestURL, f, offset, chunk, regOpts, &pw)
Michael Yang's avatar
Michael Yang committed
84
85
		if err != nil {
			fn(api.ProgressResponse{
86
				Status:    fmt.Sprintf("error uploading chunk: %v", err),
Michael Yang's avatar
Michael Yang committed
87
88
				Digest:    layer.Digest,
				Total:     layer.Size,
Michael Yang's avatar
Michael Yang committed
89
				Completed: offset,
Michael Yang's avatar
Michael Yang committed
90
			})
Michael Yang's avatar
Michael Yang committed
91
92

			return err
Michael Yang's avatar
Michael Yang committed
93
		}
Michael Yang's avatar
Michael Yang committed
94

Michael Yang's avatar
Michael Yang committed
95
		offset += chunk
96
97
98
99
100
101
		location := resp.Header.Get("Docker-Upload-Location")
		if location == "" {
			location = resp.Header.Get("Location")
		}

		requestURL, err = url.Parse(location)
Michael Yang's avatar
Michael Yang committed
102
103
		if err != nil {
			return err
Michael Yang's avatar
Michael Yang committed
104
		}
Michael Yang's avatar
Michael Yang committed
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
	}

	values := requestURL.Query()
	values.Add("digest", layer.Digest)
	requestURL.RawQuery = values.Encode()

	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
	headers.Set("Content-Length", "0")

	// finish the upload
	resp, err := makeRequest(ctx, "PUT", requestURL, headers, nil, regOpts)
	if err != nil {
		log.Printf("couldn't finish upload: %v", err)
		return err
	}
	defer resp.Body.Close()

Michael Yang's avatar
Michael Yang committed
123
	if resp.StatusCode >= http.StatusBadRequest {
Michael Yang's avatar
Michael Yang committed
124
125
126
127
128
		body, _ := io.ReadAll(resp.Body)
		return fmt.Errorf("on finish upload registry responded with code %d: %v", resp.StatusCode, string(body))
	}
	return nil
}
Michael Yang's avatar
Michael Yang committed
129

130
func uploadBlobChunk(ctx context.Context, method string, requestURL *url.URL, r io.ReaderAt, offset, limit int64, opts *RegistryOptions, pw *ProgressWriter) (*http.Response, error) {
Michael Yang's avatar
Michael Yang committed
131
	sectionReader := io.NewSectionReader(r, offset, limit)
Michael Yang's avatar
Michael Yang committed
132
133
134
135

	headers := make(http.Header)
	headers.Set("Content-Type", "application/octet-stream")
	headers.Set("Content-Length", strconv.Itoa(int(limit)))
136
137
138
139
140
	headers.Set("X-Redirect-Uploads", "1")

	if method == http.MethodPatch {
		headers.Set("Content-Range", fmt.Sprintf("%d-%d", offset, offset+sectionReader.Size()-1))
	}
Michael Yang's avatar
Michael Yang committed
141

Michael Yang's avatar
Michael Yang committed
142
	for try := 0; try < maxRetries; try++ {
143
		resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sectionReader, pw), opts)
Michael Yang's avatar
Michael Yang committed
144
145
146
147
148
149
		if err != nil && !errors.Is(err, io.EOF) {
			return nil, err
		}
		defer resp.Body.Close()

		switch {
150
151
152
153
154
155
		case resp.StatusCode == http.StatusTemporaryRedirect:
			location, err := resp.Location()
			if err != nil {
				return nil, err
			}

Michael Yang's avatar
Michael Yang committed
156
			pw.completed = offset
157
158
159
160
161
162
163
			if _, err := uploadBlobChunk(ctx, http.MethodPut, location, r, offset, limit, nil, pw); err != nil {
				// retry
				log.Printf("retrying redirected upload: %v", err)
				continue
			}

			return resp, nil
Michael Yang's avatar
Michael Yang committed
164
165
166
167
168
169
170
171
172
173
		case resp.StatusCode == http.StatusUnauthorized:
			auth := resp.Header.Get("www-authenticate")
			authRedir := ParseAuthRedirectString(auth)
			token, err := getAuthToken(ctx, authRedir)
			if err != nil {
				return nil, err
			}

			opts.Token = token

Michael Yang's avatar
Michael Yang committed
174
			pw.completed = offset
Michael Yang's avatar
Michael Yang committed
175
176
177
178
179
180
181
182
183
184
185
186
187
			sectionReader = io.NewSectionReader(r, offset, limit)
			continue
		case resp.StatusCode >= http.StatusBadRequest:
			body, _ := io.ReadAll(resp.Body)
			return nil, fmt.Errorf("on upload registry responded with code %d: %s", resp.StatusCode, body)
		}

		return resp, nil
	}

	return nil, fmt.Errorf("max retries exceeded")
}

Michael Yang's avatar
Michael Yang committed
188
189
190
type ProgressWriter struct {
	status    string
	digest    string
Michael Yang's avatar
Michael Yang committed
191
192
193
	bucket    int64
	completed int64
	total     int64
Michael Yang's avatar
Michael Yang committed
194
	fn        func(api.ProgressResponse)
Michael Yang's avatar
Michael Yang committed
195
	mu        sync.Mutex
Michael Yang's avatar
Michael Yang committed
196
197
198
}

func (pw *ProgressWriter) Write(b []byte) (int, error) {
Michael Yang's avatar
Michael Yang committed
199
200
201
	pw.mu.Lock()
	defer pw.mu.Unlock()

Michael Yang's avatar
Michael Yang committed
202
	n := len(b)
Michael Yang's avatar
Michael Yang committed
203
	pw.bucket += int64(n)
Michael Yang's avatar
Michael Yang committed
204
205

	// throttle status updates to not spam the client
Michael Yang's avatar
Michael Yang committed
206
207
	if pw.bucket >= 1024*1024 || pw.completed+pw.bucket >= pw.total {
		pw.completed += pw.bucket
Michael Yang's avatar
Michael Yang committed
208
209
210
211
212
213
214
215
216
217
218
219
		pw.fn(api.ProgressResponse{
			Status:    pw.status,
			Digest:    pw.digest,
			Total:     pw.total,
			Completed: pw.completed,
		})

		pw.bucket = 0
	}

	return n, nil
}