Unverified commit 5ebcde15, authored by Matt Williams, committed by GitHub

Merge branch 'main' into install-instructions-archlinux

parents e1cd3152 45206cb7
package progressbar
var spinners = map[int][]string{
0: {"←", "↖", "↑", "↗", "→", "↘", "↓", "↙"},
1: {"▁", "▃", "▄", "▅", "▆", "▇", "█", "▇", "▆", "▅", "▄", "▃", "▁"},
2: {"▖", "▘", "▝", "▗"},
3: {"┤", "┘", "┴", "└", "├", "┌", "┬", "┐"},
4: {"◢", "◣", "◤", "◥"},
5: {"◰", "◳", "◲", "◱"},
6: {"◴", "◷", "◶", "◵"},
7: {"◐", "◓", "◑", "◒"},
8: {".", "o", "O", "@", "*"},
9: {"|", "/", "-", "\\"},
10: {"◡◡", "⊙⊙", "◠◠"},
11: {"⣾", "⣽", "⣻", "⢿", "⡿", "⣟", "⣯", "⣷"},
12: {">))'>", " >))'>", " >))'>", " >))'>", " >))'>", " <'((<", " <'((<", " <'((<"},
13: {"⠁", "⠂", "⠄", "⡀", "⢀", "⠠", "⠐", "⠈"},
14: {"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"},
15: {"a", "b", "c", "d", "e", "f", "g", "h", "i", "j", "k", "l", "m", "n", "o", "p", "q", "r", "s", "t", "u", "v", "w", "x", "y", "z"},
16: {"▉", "▊", "▋", "▌", "▍", "▎", "▏", "▎", "▍", "▌", "▋", "▊", "▉"},
17: {"■", "□", "▪", "▫"},
18: {"←", "↑", "→", "↓"},
19: {"╫", "╪"},
20: {"⇐", "⇖", "⇑", "⇗", "⇒", "⇘", "⇓", "⇙"},
21: {"⠁", "⠁", "⠉", "⠙", "⠚", "⠒", "⠂", "⠂", "⠒", "⠲", "⠴", "⠤", "⠄", "⠄", "⠤", "⠠", "⠠", "⠤", "⠦", "⠖", "⠒", "⠐", "⠐", "⠒", "⠓", "⠋", "⠉", "⠈", "⠈"},
22: {"⠈", "⠉", "⠋", "⠓", "⠒", "⠐", "⠐", "⠒", "⠖", "⠦", "⠤", "⠠", "⠠", "⠤", "⠦", "⠖", "⠒", "⠐", "⠐", "⠒", "⠓", "⠋", "⠉", "⠈"},
23: {"⠁", "⠉", "⠙", "⠚", "⠒", "⠂", "⠂", "⠒", "⠲", "⠴", "⠤", "⠄", "⠄", "⠤", "⠴", "⠲", "⠒", "⠂", "⠂", "⠒", "⠚", "⠙", "⠉", "⠁"},
24: {"⠋", "⠙", "⠚", "⠒", "⠂", "⠂", "⠒", "⠲", "⠴", "⠦", "⠖", "⠒", "⠐", "⠐", "⠒", "⠓", "⠋"},
25: {"ヲ", "ァ", "ィ", "ゥ", "ェ", "ォ", "ャ", "ュ", "ョ", "ッ", "ア", "イ", "ウ", "エ", "オ", "カ", "キ", "ク", "ケ", "コ", "サ", "シ", "ス", "セ", "ソ", "タ", "チ", "ツ", "テ", "ト", "ナ", "ニ", "ヌ", "ネ", "ノ", "ハ", "ヒ", "フ", "ヘ", "ホ", "マ", "ミ", "ム", "メ", "モ", "ヤ", "ユ", "ヨ", "ラ", "リ", "ル", "レ", "ロ", "ワ", "ン"},
26: {".", "..", "..."},
27: {"▁", "▂", "▃", "▄", "▅", "▆", "▇", "█", "▉", "▊", "▋", "▌", "▍", "▎", "▏", "▏", "▎", "▍", "▌", "▋", "▊", "▉", "█", "▇", "▆", "▅", "▄", "▃", "▂", "▁"},
28: {".", "o", "O", "°", "O", "o", "."},
29: {"+", "x"},
30: {"v", "<", "^", ">"},
31: {">>--->", " >>--->", " >>--->", " >>--->", " >>--->", " <---<<", " <---<<", " <---<<", " <---<<", "<---<<"},
32: {"|", "||", "|||", "||||", "|||||", "|||||||", "||||||||", "|||||||", "||||||", "|||||", "||||", "|||", "||", "|"},
33: {"[ ]", "[= ]", "[== ]", "[=== ]", "[==== ]", "[===== ]", "[====== ]", "[======= ]", "[======== ]", "[========= ]", "[==========]"},
34: {"(*---------)", "(-*--------)", "(--*-------)", "(---*------)", "(----*-----)", "(-----*----)", "(------*---)", "(-------*--)", "(--------*-)", "(---------*)"},
35: {"█▒▒▒▒▒▒▒▒▒", "███▒▒▒▒▒▒▒", "█████▒▒▒▒▒", "███████▒▒▒", "██████████"},
36: {"[ ]", "[=> ]", "[===> ]", "[=====> ]", "[======> ]", "[========> ]", "[==========> ]", "[============> ]", "[==============> ]", "[================> ]", "[==================> ]", "[===================>]"},
37: {"ဝ", "၀"},
38: {"▌", "▀", "▐▄"},
39: {"🌍", "🌎", "🌏"},
40: {"◜", "◝", "◞", "◟"},
41: {"⬒", "⬔", "⬓", "⬕"},
42: {"⬖", "⬘", "⬗", "⬙"},
43: {"[>>> >]", "[]>>>> []", "[] >>>> []", "[] >>>> []", "[] >>>> []", "[] >>>>[]", "[>> >>]"},
44: {"♠", "♣", "♥", "♦"},
45: {"➞", "➟", "➠", "➡", "➠", "➟"},
46: {" | ", ` \ `, "_ ", ` \ `, " | ", " / ", " _", " / "},
47: {" . . . .", ". . . .", ". . . .", ". . . .", ". . . . ", ". . . . ."},
48: {" | ", " / ", " _ ", ` \ `, " | ", ` \ `, " _ ", " / "},
49: {"⎺", "⎻", "⎼", "⎽", "⎼", "⎻"},
50: {"▹▹▹▹▹", "▸▹▹▹▹", "▹▸▹▹▹", "▹▹▸▹▹", "▹▹▹▸▹", "▹▹▹▹▸"},
51: {"[ ]", "[ =]", "[ ==]", "[ ===]", "[====]", "[=== ]", "[== ]", "[= ]"},
52: {"( ● )", "( ● )", "( ● )", "( ● )", "( ●)", "( ● )", "( ● )", "( ● )", "( ● )"},
53: {"✶", "✸", "✹", "✺", "✹", "✷"},
54: {"▐|\\____________▌", "▐_|\\___________▌", "▐__|\\__________▌", "▐___|\\_________▌", "▐____|\\________▌", "▐_____|\\_______▌", "▐______|\\______▌", "▐_______|\\_____▌", "▐________|\\____▌", "▐_________|\\___▌", "▐__________|\\__▌", "▐___________|\\_▌", "▐____________|\\▌", "▐____________/|▌", "▐___________/|_▌", "▐__________/|__▌", "▐_________/|___▌", "▐________/|____▌", "▐_______/|_____▌", "▐______/|______▌", "▐_____/|_______▌", "▐____/|________▌", "▐___/|_________▌", "▐__/|__________▌", "▐_/|___________▌", "▐/|____________▌"},
55: {"▐⠂ ▌", "▐⠈ ▌", "▐ ⠂ ▌", "▐ ⠠ ▌", "▐ ⡀ ▌", "▐ ⠠ ▌", "▐ ⠂ ▌", "▐ ⠈ ▌", "▐ ⠂ ▌", "▐ ⠠ ▌", "▐ ⡀ ▌", "▐ ⠠ ▌", "▐ ⠂ ▌", "▐ ⠈ ▌", "▐ ⠂▌", "▐ ⠠▌", "▐ ⡀▌", "▐ ⠠ ▌", "▐ ⠂ ▌", "▐ ⠈ ▌", "▐ ⠂ ▌", "▐ ⠠ ▌", "▐ ⡀ ▌", "▐ ⠠ ▌", "▐ ⠂ ▌", "▐ ⠈ ▌", "▐ ⠂ ▌", "▐ ⠠ ▌", "▐ ⡀ ▌", "▐⠠ ▌"},
56: {"¿", "?"},
57: {"⢹", "⢺", "⢼", "⣸", "⣇", "⡧", "⡗", "⡏"},
58: {"⢄", "⢂", "⢁", "⡁", "⡈", "⡐", "⡠"},
59: {". ", ".. ", "...", " ..", " .", " "},
60: {".", "o", "O", "°", "O", "o", "."},
61: {"▓", "▒", "░"},
62: {"▌", "▀", "▐", "▄"},
63: {"⊶", "⊷"},
64: {"▪", "▫"},
65: {"□", "■"},
66: {"▮", "▯"},
67: {"-", "=", "≡"},
68: {"d", "q", "p", "b"},
69: {"∙∙∙", "●∙∙", "∙●∙", "∙∙●", "∙∙∙"},
70: {"🌑 ", "🌒 ", "🌓 ", "🌔 ", "🌕 ", "🌖 ", "🌗 ", "🌘 "},
71: {"☗", "☖"},
72: {"⧇", "⧆"},
73: {"◉", "◎"},
74: {"㊂", "㊀", "㊁"},
75: {"⦾", "⦿"},
}
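The sets above are only frame sequences; nothing in this file animates them. As a hedged sketch (the renderSpinner helper below is hypothetical and not part of this package), one way a caller could cycle a set:

package progressbar

import (
	"fmt"
	"io"
	"time"
)

// renderSpinner redraws one frame of the given set in place on each tick,
// using a carriage return to overwrite the previous frame. Hypothetical
// helper, shown only to illustrate how the frame sets are meant to be used.
func renderSpinner(w io.Writer, frames []string, interval time.Duration, done <-chan struct{}) {
	ticker := time.NewTicker(interval)
	defer ticker.Stop()

	for i := 0; ; i++ {
		select {
		case <-done:
			fmt.Fprint(w, "\r")
			return
		case <-ticker.C:
			fmt.Fprintf(w, "\r%s", frames[i%len(frames)])
		}
	}
}

A caller could then run, for example, go renderSpinner(os.Stderr, spinners[9], 100*time.Millisecond, done) to spin the classic |/-\ set.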
@@ -10,6 +10,8 @@ docker buildx build \
 	--platform=linux/arm64,linux/amd64 \
 	--build-arg=VERSION \
 	--build-arg=GOFLAGS \
+	--cache-from type=local,src=.cache \
+	--cache-to type=local,dest=.cache \
 	-f Dockerfile \
 	-t ollama \
 	.
@@ -10,6 +10,7 @@ docker buildx build \
 	--platform=linux/arm64,linux/amd64 \
 	--build-arg=VERSION \
 	--build-arg=GOFLAGS \
+	--cache-from type=local,src=.cache \
 	-f Dockerfile \
 	-t ollama/ollama -t ollama/ollama:$VERSION \
 	.
@@ -7,6 +7,7 @@ import (
 	"fmt"
 	"io"
 	"log"
+	"math"
 	"net/http"
 	"net/url"
 	"os"
@@ -53,8 +54,8 @@ type blobDownloadPart struct {
 const (
 	numDownloadParts          = 64
-	minDownloadPartSize int64 = 32 * 1000 * 1000
-	maxDownloadPartSize int64 = 256 * 1000 * 1000
+	minDownloadPartSize int64 = 100 * format.MegaByte
+	maxDownloadPartSize int64 = 1000 * format.MegaByte
 )

 func (p *blobDownloadPart) Name() string {
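These constants only bound how large a single download part may be; how a blob is actually split is decided elsewhere. As a hedged illustration (the helper name and the divide-then-clamp logic below are assumptions, not code from this diff), a part size could be derived like this:

package main

import "fmt"

const (
	numDownloadParts          = 64
	minDownloadPartSize int64 = 100 * 1000 * 1000  // stand-in for 100 * format.MegaByte
	maxDownloadPartSize int64 = 1000 * 1000 * 1000 // stand-in for 1000 * format.MegaByte
)

// partSize is a hypothetical helper: spread the blob across numDownloadParts
// parts, then clamp the result to the bounds above.
func partSize(totalSize int64) int64 {
	size := totalSize / numDownloadParts
	switch {
	case size < minDownloadPartSize:
		return minDownloadPartSize
	case size > maxDownloadPartSize:
		return maxDownloadPartSize
	default:
		return size
	}
}

func main() {
	fmt.Println(partSize(7_000_000_000)) // ~109 MB parts for a 7 GB blob
}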
@@ -147,7 +148,6 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *Regis
 			continue
 		}

-		i := i
 		g.Go(func() error {
 			var err error
 			for try := 0; try < maxRetries; try++ {
@@ -158,12 +158,11 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *Regis
 					// return immediately if the context is canceled or the device is out of space
 					return err
 				case err != nil:
-					log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], i, try, err)
+					sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
+					log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep)
+					time.Sleep(sleep)
 					continue
 				default:
-					if try > 0 {
-						log.Printf("%s part %d completed after %d retries", b.Digest[7:19], i, try)
-					}
 					return nil
 				}
 			}
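The new retry path sleeps 2^try seconds between attempts; with maxRetries raised to 6 later in this diff, the waits grow 1s, 2s, 4s, 8s, 16s, 32s. A standalone sketch of the same backoff formula, runnable on its own:

package main

import (
	"fmt"
	"math"
	"time"
)

// backoff mirrors the expression introduced in this diff: wait 2^try seconds
// before the next attempt.
func backoff(try int) time.Duration {
	return time.Second * time.Duration(math.Pow(2, float64(try)))
}

func main() {
	for try := 0; try < 6; try++ {
		fmt.Println(try, backoff(try)) // 1s, 2s, 4s, 8s, 16s, 32s
	}
}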
@@ -285,7 +284,7 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse))
 		}

 		fn(api.ProgressResponse{
-			Status:    fmt.Sprintf("downloading %s", b.Digest),
+			Status:    fmt.Sprintf("pulling %s", b.Digest[7:19]),
 			Digest:    b.Digest,
 			Total:     b.Total,
 			Completed: b.Completed.Load(),
@@ -304,7 +303,7 @@ type downloadOpts struct {
 	fn      func(api.ProgressResponse)
 }

-const maxRetries = 3
+const maxRetries = 6

 var errMaxRetriesExceeded = errors.New("max retries exceeded")
@@ -322,7 +321,7 @@ func downloadBlob(ctx context.Context, opts downloadOpts) error {
 		return err
 	default:
 		opts.fn(api.ProgressResponse{
-			Status:    fmt.Sprintf("downloading %s", opts.digest),
+			Status:    fmt.Sprintf("pulling %s", opts.digest[7:19]),
 			Digest:    opts.digest,
 			Total:     fi.Size(),
 			Completed: fi.Size(),
@@ -228,26 +228,6 @@ func GetModel(name string) (*Model, error) {
 	return model, nil
 }

-func filenameWithPath(path, f string) (string, error) {
-	// if filePath starts with ~/, replace it with the user's home directory.
-	if strings.HasPrefix(f, fmt.Sprintf("~%s", string(os.PathSeparator))) {
-		parts := strings.Split(f, string(os.PathSeparator))
-		home, err := os.UserHomeDir()
-		if err != nil {
-			return "", fmt.Errorf("failed to open file: %v", err)
-		}
-
-		f = filepath.Join(home, filepath.Join(parts[1:]...))
-	}
-
-	// if filePath is not an absolute path, make it relative to the modelfile path
-	if !filepath.IsAbs(f) {
-		f = filepath.Join(filepath.Dir(path), f)
-	}
-
-	return f, nil
-}
-
 func realpath(p string) string {
 	abspath, err := filepath.Abs(p)
 	if err != nil {
@@ -1146,43 +1126,49 @@ func GetSHA256Digest(r io.Reader) (string, int64) {
 var errUnauthorized = fmt.Errorf("unauthorized")

 func makeRequestWithRetry(ctx context.Context, method string, requestURL *url.URL, headers http.Header, body io.ReadSeeker, regOpts *RegistryOptions) (*http.Response, error) {
-	lastErr := errMaxRetriesExceeded
-	for try := 0; try < maxRetries; try++ {
-		resp, err := makeRequest(ctx, method, requestURL, headers, body, regOpts)
-		if err != nil {
-			log.Printf("couldn't start upload: %v", err)
-			return nil, err
+	resp, err := makeRequest(ctx, method, requestURL, headers, body, regOpts)
+	if err != nil {
+		if !errors.Is(err, context.Canceled) {
+			log.Printf("request failed: %v", err)
 		}

-		switch {
-		case resp.StatusCode == http.StatusUnauthorized:
-			auth := resp.Header.Get("www-authenticate")
-			authRedir := ParseAuthRedirectString(auth)
-			token, err := getAuthToken(ctx, authRedir)
-			if err != nil {
-				return nil, err
-			}
+		return nil, err
+	}

-			regOpts.Token = token
-			if body != nil {
-				body.Seek(0, io.SeekStart)
-			}
+	switch {
+	case resp.StatusCode == http.StatusUnauthorized:
+		// Handle authentication error with one retry
+		auth := resp.Header.Get("www-authenticate")
+		authRedir := ParseAuthRedirectString(auth)
+		token, err := getAuthToken(ctx, authRedir)
+		if err != nil {
+			return nil, err
+		}

-			lastErr = errUnauthorized
-		case resp.StatusCode == http.StatusNotFound:
-			return nil, os.ErrNotExist
-		case resp.StatusCode >= http.StatusBadRequest:
-			body, err := io.ReadAll(resp.Body)
-			if err != nil {
-				return nil, fmt.Errorf("%d: %s", resp.StatusCode, err)
-			}
+		regOpts.Token = token
+		if body != nil {
+			_, err = body.Seek(0, io.SeekStart)
+			if err != nil {
+				return nil, err
+			}
+		}

-			return nil, fmt.Errorf("%d: %s", resp.StatusCode, body)
-		default:
-			return resp, nil
+		resp, err := makeRequest(ctx, method, requestURL, headers, body, regOpts)
+		if resp.StatusCode == http.StatusUnauthorized {
+			return nil, errUnauthorized
 		}
+
+		return resp, err
+	case resp.StatusCode == http.StatusNotFound:
+		return nil, os.ErrNotExist
+	case resp.StatusCode >= http.StatusBadRequest:
+		responseBody, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return nil, fmt.Errorf("%d: %s", resp.StatusCode, err)
+		}
+
+		return nil, fmt.Errorf("%d: %s", resp.StatusCode, responseBody)
 	}

-	return nil, lastErr
+	return resp, nil
 }

 func makeRequest(ctx context.Context, method string, requestURL *url.URL, headers http.Header, body io.Reader, regOpts *RegistryOptions) (*http.Response, error) {
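With the retry loop removed, callers get exactly one authentication retry: a 401 triggers a token refresh and a single repeat request, and a second 401 comes back as errUnauthorized. A hedged caller-side sketch, assumed to sit in the same package as makeRequestWithRetry (fetchManifest is a made-up name, not part of this change), showing how the sentinel errors are meant to be handled:

// fetchManifest is a hypothetical caller illustrating the new error contract
// of makeRequestWithRetry.
func fetchManifest(ctx context.Context, requestURL *url.URL, regOpts *RegistryOptions) (*http.Response, error) {
	resp, err := makeRequestWithRetry(ctx, http.MethodGet, requestURL, nil, nil, regOpts)
	switch {
	case errors.Is(err, errUnauthorized):
		return nil, fmt.Errorf("authentication failed after one token refresh: %w", err)
	case errors.Is(err, os.ErrNotExist):
		return nil, fmt.Errorf("not found in registry: %w", err)
	case err != nil:
		return nil, err
	}

	return resp, nil
}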
@@ -666,8 +666,14 @@ func HeadBlobHandler(c *gin.Context) {
 }

 func CreateBlobHandler(c *gin.Context) {
+	targetPath, err := GetBlobsPath(c.Param("digest"))
+	if err != nil {
+		c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
+		return
+	}
+
 	hash := sha256.New()
-	temp, err := os.CreateTemp("", c.Param("digest"))
+	temp, err := os.CreateTemp(filepath.Dir(targetPath), c.Param("digest")+"-")
 	if err != nil {
 		c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
 		return
@@ -690,12 +696,6 @@ func CreateBlobHandler(c *gin.Context) {
 		return
 	}

-	targetPath, err := GetBlobsPath(c.Param("digest"))
-	if err != nil {
-		c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
-		return
-	}
-
 	if err := os.Rename(temp.Name(), targetPath); err != nil {
 		c.AbortWithStatusJSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
 		return
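Moving os.CreateTemp from the system temp directory into the blobs directory keeps the temporary file on the same filesystem as targetPath, so the final os.Rename is an atomic move rather than a cross-device failure. The same pattern in isolation (the paths below are invented for illustration):

package main

import (
	"log"
	"os"
	"path/filepath"
)

func main() {
	// Hypothetical target path; stands in for GetBlobsPath(digest).
	targetPath := filepath.Join(os.TempDir(), "blobs", "sha256-example")
	if err := os.MkdirAll(filepath.Dir(targetPath), 0o755); err != nil {
		log.Fatal(err)
	}

	// Create the temp file next to the target so Rename stays on one filesystem.
	temp, err := os.CreateTemp(filepath.Dir(targetPath), "sha256-example-")
	if err != nil {
		log.Fatal(err)
	}
	defer os.Remove(temp.Name())

	// ... write and verify the blob into temp here ...
	temp.Close()

	// Atomic publish: the blob appears at targetPath all at once or not at all.
	if err := os.Rename(temp.Name(), targetPath); err != nil {
		log.Fatal(err)
	}
}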
@@ -794,7 +794,7 @@ func Serve(ln net.Listener, allowOrigins []string) error {
 	if runtime.GOOS == "linux" {
 		// check compatibility to log warnings
 		if _, err := llm.CheckVRAM(); err != nil {
-			log.Printf("Warning: GPU support may not be enabled, check you have installed GPU drivers: %v", err)
+			log.Printf(err.Error())
 		}
 	}
@@ -5,9 +5,9 @@ import (
 	"crypto/md5"
 	"errors"
 	"fmt"
-	"hash"
 	"io"
 	"log"
+	"math"
 	"net/http"
 	"net/url"
 	"os"
@@ -35,6 +35,8 @@ type blobUpload struct {
 	context.CancelFunc

+	file *os.File
+
 	done bool
 	err  error
 	references atomic.Int32
@@ -42,8 +44,8 @@ type blobUpload struct {
 const (
 	numUploadParts          = 64
-	minUploadPartSize int64 = 95 * 1000 * 1000
-	maxUploadPartSize int64 = 1000 * 1000 * 1000
+	minUploadPartSize int64 = 100 * format.MegaByte
+	maxUploadPartSize int64 = 1000 * format.MegaByte
 )

 func (b *blobUpload) Prepare(ctx context.Context, requestURL *url.URL, opts *RegistryOptions) error {
@@ -128,12 +130,12 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
 		return
 	}

-	f, err := os.Open(p)
+	b.file, err = os.Open(p)
 	if err != nil {
 		b.err = err
 		return
 	}
-	defer f.Close()
+	defer b.file.Close()

 	g, inner := errgroup.WithContext(ctx)
 	g.SetLimit(numUploadParts)
@@ -145,7 +147,6 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
 		g.Go(func() error {
 			var err error
 			for try := 0; try < maxRetries; try++ {
-				part.ReadSeeker = io.NewSectionReader(f, part.Offset, part.Size)
 				err = b.uploadChunk(inner, http.MethodPatch, requestURL, part, opts)
 				switch {
 				case errors.Is(err, context.Canceled):
@@ -153,7 +154,10 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
 				case errors.Is(err, errMaxRetriesExceeded):
 					return err
 				case err != nil:
-					log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], part.N, try, err)
+					part.Reset()
+					sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
+					log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep)
+					time.Sleep(sleep)
 					continue
 				}
@@ -173,8 +177,16 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
 	requestURL := <-b.nextURL

 	var sb strings.Builder
+
+	// calculate md5 checksum and add it to the commit request
 	for _, part := range b.Parts {
-		sb.Write(part.Sum(nil))
+		hash := md5.New()
+		if _, err := io.Copy(hash, io.NewSectionReader(b.file, part.Offset, part.Size)); err != nil {
+			b.err = err
+			return
+		}
+
+		sb.Write(hash.Sum(nil))
 	}

 	md5sum := md5.Sum([]byte(sb.String()))
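The commit checksum is the MD5 of the concatenated per-part MD5 digests, now recomputed from b.file instead of accumulated while uploading; this is similar in spirit to multipart ETags. A standalone sketch of the same composite over in-memory parts instead of file sections:

package main

import (
	"crypto/md5"
	"fmt"
	"strings"
)

func main() {
	// Stand-ins for the uploaded parts; the real code reads each part back
	// from b.file with io.NewSectionReader.
	parts := [][]byte{[]byte("part one"), []byte("part two")}

	var sb strings.Builder
	for _, p := range parts {
		sum := md5.Sum(p) // per-part digest
		sb.Write(sum[:])  // concatenate the raw digests
	}

	// Composite checksum: MD5 of the concatenated per-part digests.
	md5sum := md5.Sum([]byte(sb.String()))
	fmt.Printf("%x\n", md5sum)
}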
@@ -188,29 +200,39 @@ func (b *blobUpload) Run(ctx context.Context, opts *RegistryOptions) {
 	headers.Set("Content-Type", "application/octet-stream")
 	headers.Set("Content-Length", "0")

-	resp, err := makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts)
-	if err != nil {
-		b.err = err
+	for try := 0; try < maxRetries; try++ {
+		resp, err := makeRequestWithRetry(ctx, http.MethodPut, requestURL, headers, nil, opts)
+		if err != nil {
+			b.err = err
+			if errors.Is(err, context.Canceled) {
+				return
+			}
+
+			sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
+			log.Printf("%s complete upload attempt %d failed: %v, retrying in %s", b.Digest[7:19], try, err, sleep)
+			time.Sleep(sleep)
+			continue
+		}
+		defer resp.Body.Close()
+
+		b.err = nil
+		b.done = true
 		return
 	}
-	defer resp.Body.Close()
-
-	b.done = true
 }

 func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL *url.URL, part *blobUploadPart, opts *RegistryOptions) error {
+	part.Reset()
+
 	headers := make(http.Header)
 	headers.Set("Content-Type", "application/octet-stream")
 	headers.Set("Content-Length", fmt.Sprintf("%d", part.Size))
-	headers.Set("X-Redirect-Uploads", "1")

 	if method == http.MethodPatch {
+		headers.Set("X-Redirect-Uploads", "1")
 		headers.Set("Content-Range", fmt.Sprintf("%d-%d", part.Offset, part.Offset+part.Size-1))
 	}

-	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(part.ReadSeeker, io.MultiWriter(part, part.Hash)), opts)
+	sr := io.NewSectionReader(b.file, part.Offset, part.Size)
+	resp, err := makeRequest(ctx, method, requestURL, headers, io.TeeReader(sr, part), opts)
 	if err != nil {
 		return err
 	}
@@ -235,6 +257,7 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL
 			return err
 		}

+		// retry uploading to the redirect URL
 		for try := 0; try < maxRetries; try++ {
 			err = b.uploadChunk(ctx, http.MethodPut, redirectURL, part, nil)
 			switch {
@@ -243,7 +266,10 @@ func (b *blobUpload) uploadChunk(ctx context.Context, method string, requestURL
 			case errors.Is(err, errMaxRetriesExceeded):
 				return err
 			case err != nil:
-				log.Printf("%s part %d attempt %d failed: %v, retrying", b.Digest[7:19], part.N, try, err)
+				part.Reset()
+				sleep := time.Second * time.Duration(math.Pow(2, float64(try)))
+				log.Printf("%s part %d attempt %d failed: %v, retrying in %s", b.Digest[7:19], part.N, try, err, sleep)
+				time.Sleep(sleep)
 				continue
 			}
@@ -301,7 +327,7 @@ func (b *blobUpload) Wait(ctx context.Context, fn func(api.ProgressResponse)) er
 	}

 	fn(api.ProgressResponse{
-		Status:    fmt.Sprintf("uploading %s", b.Digest),
+		Status:    fmt.Sprintf("pushing %s", b.Digest[7:19]),
 		Digest:    b.Digest,
 		Total:     b.Total,
 		Completed: b.Completed.Load(),
type blobUploadPart struct { type blobUploadPart struct {
// N is the part number // N is the part number
N int N int
Offset int64 Offset int64
Size int64 Size int64
hash.Hash
written int64 written int64
io.ReadSeeker
*blobUpload *blobUpload
} }
@@ -334,10 +356,8 @@ func (p *blobUploadPart) Write(b []byte) (n int, err error) {
 }

 func (p *blobUploadPart) Reset() {
-	p.Seek(0, io.SeekStart)
 	p.Completed.Add(-int64(p.written))
 	p.written = 0
-	p.Hash = md5.New()
 }

 func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryOptions, fn func(api.ProgressResponse)) error {
@@ -352,7 +372,7 @@ func uploadBlob(ctx context.Context, mp ModelPath, layer *Layer, opts *RegistryO
 	default:
 		defer resp.Body.Close()
 		fn(api.ProgressResponse{
-			Status:    fmt.Sprintf("uploading %s", layer.Digest),
+			Status:    fmt.Sprintf("pushing %s", layer.Digest[7:19]),
 			Digest:    layer.Digest,
 			Total:     layer.Size,
 			Completed: layer.Size,