download.go 4.14 KB
Newer Older
1
2
3
4
package server

import (
	"context"
Michael Yang's avatar
Michael Yang committed
5
	"encoding/json"
6
7
8
9
10
	"errors"
	"fmt"
	"io"
	"log"
	"net/http"
Michael Yang's avatar
Michael Yang committed
11
	"net/url"
12
	"os"
Michael Yang's avatar
Michael Yang committed
13
	"path/filepath"
14
15
16
	"strconv"

	"github.com/jmorganca/ollama/api"
Michael Yang's avatar
Michael Yang committed
17
	"golang.org/x/sync/errgroup"
18
19
)

Michael Yang's avatar
Michael Yang committed
20
21
22
// BlobDownloadPart describes one contiguous byte range of a blob and how much
// of it has been downloaded so far.
//
// The struct is JSON-encoded into a per-part state file by flushPart and decoded
// again when a download is resumed. It carries no struct tags, so the field
// names are the JSON keys; renaming a field changes the on-disk format.
type BlobDownloadPart struct {
	// Offset is the position of the part's first byte within the blob.
	Offset int64
	// Size is the length of the part in bytes.
	Size int64
	// Completed is the number of bytes of the part, counted from Offset,
	// that have already been written to the partial file.
	Completed int64
}

26
27
28
29
30
31
32
// downloadOpts bundles the arguments of a single blob download.
type downloadOpts struct {
	// mp supplies the registry base URL and the namespace/repository used to
	// build the blob URL.
	mp      ModelPath
	// digest identifies the blob; it selects the local blob path and is the
	// last element of the request URL.
	digest  string
	// regOpts is passed unchanged to makeRequest for every registry request.
	regOpts *RegistryOptions
	// fn receives progress updates; it is called directly when the blob is
	// already present and is otherwise handed to the ProgressWriter.
	fn      func(api.ProgressResponse)
}

Michael Yang's avatar
Michael Yang committed
33
// maxRetries is the number of attempts made to download a single part before
// the whole download fails (it bounds total attempts, not retries after the first).
const maxRetries = 3
34

35
// downloadBlob downloads a blob from the registry and stores it in the blobs directory
36
37
func downloadBlob(ctx context.Context, opts downloadOpts) error {
	fp, err := GetBlobsPath(opts.digest)
38
39
40
41
	if err != nil {
		return err
	}

Michael Yang's avatar
Michael Yang committed
42
43
44
45
46
47
	fi, err := os.Stat(fp)
	switch {
	case errors.Is(err, os.ErrNotExist):
	case err != nil:
		return err
	default:
48
		opts.fn(api.ProgressResponse{
Michael Yang's avatar
Michael Yang committed
49
			Status:    fmt.Sprintf("downloading %s", opts.digest),
50
			Digest:    opts.digest,
Michael Yang's avatar
Michael Yang committed
51
52
			Total:     fi.Size(),
			Completed: fi.Size(),
53
54
55
56
57
		})

		return nil
	}

Michael Yang's avatar
Michael Yang committed
58
59
60
	f, err := os.OpenFile(fp+"-partial", os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		return err
61
	}
Michael Yang's avatar
Michael Yang committed
62
	defer f.Close()
63

Michael Yang's avatar
Michael Yang committed
64
65
	partFilePaths, err := filepath.Glob(fp + "-partial-*")
	if err != nil {
66
		return err
67
68
	}

Michael Yang's avatar
Michael Yang committed
69
70
71
72
73
	var total, completed int64
	var parts []BlobDownloadPart
	for _, partFilePath := range partFilePaths {
		var part BlobDownloadPart
		partFile, err := os.Open(partFilePath)
Bruce MacDonald's avatar
Bruce MacDonald committed
74
75
		if err != nil {
			return err
76
		}
Michael Yang's avatar
Michael Yang committed
77
78
79
80
		defer partFile.Close()

		if err := json.NewDecoder(partFile).Decode(&part); err != nil {
			return err
81
82
		}

Michael Yang's avatar
Michael Yang committed
83
84
		total += part.Size
		completed += part.Completed
85

Michael Yang's avatar
Michael Yang committed
86
87
		parts = append(parts, part)
	}
88

Michael Yang's avatar
Michael Yang committed
89
90
	requestURL := opts.mp.BaseURL()
	requestURL = requestURL.JoinPath("v2", opts.mp.GetNamespaceRepository(), "blobs", opts.digest)
91

Michael Yang's avatar
Michael Yang committed
92
93
	if len(parts) == 0 {
		resp, err := makeRequest(ctx, "HEAD", requestURL, nil, nil, opts.regOpts)
94
		if err != nil {
Michael Yang's avatar
Michael Yang committed
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
			return err
		}
		defer resp.Body.Close()

		total, _ = strconv.ParseInt(resp.Header.Get("Content-Length"), 10, 64)

		// reserve the file
		f.Truncate(total)

		var offset int64
		var size int64 = 64 * 1024 * 1024

		for offset < total {
			if offset+size > total {
				size = total - offset
			}

			parts = append(parts, BlobDownloadPart{
				Offset: offset,
				Size:   size,
			})

			offset += size
118
119
120
		}
	}

Michael Yang's avatar
Michael Yang committed
121
122
123
124
125
126
127
	pw := &ProgressWriter{
		status:    fmt.Sprintf("downloading %s", opts.digest),
		digest:    opts.digest,
		total:     total,
		completed: completed,
		fn:        opts.fn,
	}
Michael Yang's avatar
Michael Yang committed
128

Michael Yang's avatar
Michael Yang committed
129
130
131
132
133
134
135
	g, ctx := errgroup.WithContext(ctx)
	g.SetLimit(64)
	for i := range parts {
		part := parts[i]
		if part.Completed == part.Size {
			continue
		}
136

Michael Yang's avatar
Michael Yang committed
137
138
139
140
141
142
143
144
145
146
147
148
149
		i := i
		g.Go(func() error {
			for try := 0; try < maxRetries; try++ {
				if err := downloadBlobChunk(ctx, f, requestURL, parts, i, pw, opts); err != nil {
					log.Printf("%s part %d attempt %d failed: %v, retrying", opts.digest[7:19], i, try, err)
					continue
				}

				return nil
			}

			return errors.New("max retries exceeded")
		})
150
151
	}

Michael Yang's avatar
Michael Yang committed
152
153
	if err := g.Wait(); err != nil {
		return err
154
155
	}

Michael Yang's avatar
Michael Yang committed
156
157
158
159
160
161
162
163
	if err := f.Close(); err != nil {
		return err
	}

	for i := range parts {
		if err := os.Remove(f.Name() + "-" + strconv.Itoa(i)); err != nil {
			return err
		}
164
165
	}

Michael Yang's avatar
Michael Yang committed
166
167
	return os.Rename(f.Name(), fp)
}
168

Michael Yang's avatar
Michael Yang committed
169
170
func downloadBlobChunk(ctx context.Context, f *os.File, requestURL *url.URL, parts []BlobDownloadPart, i int, pw *ProgressWriter, opts downloadOpts) error {
	part := &parts[i]
171

Michael Yang's avatar
Michael Yang committed
172
173
174
	partName := f.Name() + "-" + strconv.Itoa(i)
	if err := flushPart(partName, part); err != nil {
		return err
175
176
	}

Michael Yang's avatar
Michael Yang committed
177
178
	offset := part.Offset + part.Completed
	w := io.NewOffsetWriter(f, offset)
179

Michael Yang's avatar
Michael Yang committed
180
181
182
183
184
185
186
	headers := make(http.Header)
	headers.Set("Range", fmt.Sprintf("bytes=%d-%d", offset, part.Offset+part.Size-1))
	resp, err := makeRequest(ctx, "GET", requestURL, headers, nil, opts.regOpts)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
187

Michael Yang's avatar
Michael Yang committed
188
189
190
191
192
193
	n, err := io.Copy(w, io.TeeReader(resp.Body, pw))
	if err != nil && !errors.Is(err, io.EOF) {
		// rollback progress bar
		pw.completed -= n
		return err
	}
194

Michael Yang's avatar
Michael Yang committed
195
	part.Completed += n
196

Michael Yang's avatar
Michael Yang committed
197
198
199
200
201
202
203
	return flushPart(partName, part)
}

func flushPart(name string, part *BlobDownloadPart) error {
	partFile, err := os.OpenFile(name, os.O_CREATE|os.O_RDWR, 0644)
	if err != nil {
		return err
204
	}
Michael Yang's avatar
Michael Yang committed
205
	defer partFile.Close()
206

Michael Yang's avatar
Michael Yang committed
207
	return json.NewEncoder(partFile).Encode(part)
208
}