Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
OpenDAS
ollama
Commits
982c5354
Unverified
Commit
982c5354
authored
Aug 30, 2023
by
Michael Yang
Committed by
GitHub
Aug 30, 2023
Browse files
Merge pull request #428 from jmorganca/mxyng/upload-chunks
update upload chunks
parents
7df342a6
16b06699
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
91 additions
and
51 deletions
+91
-51
api/client.go
api/client.go
+2
-2
server/auth.go
server/auth.go
+1
-1
server/download.go
server/download.go
+1
-1
server/images.go
server/images.go
+9
-9
server/upload.go
server/upload.go
+78
-38
No files found.
api/client.go
View file @
982c5354
...
...
@@ -29,7 +29,7 @@ type Client struct {
}
func
checkError
(
resp
*
http
.
Response
,
body
[]
byte
)
error
{
if
resp
.
StatusCode
>=
200
&&
resp
.
StatusCode
<
400
{
if
resp
.
StatusCode
<
http
.
StatusBadRequest
{
return
nil
}
...
...
@@ -165,7 +165,7 @@ func (c *Client) stream(ctx context.Context, method, path string, data any, fn f
return
fmt
.
Errorf
(
errorResponse
.
Error
)
}
if
response
.
StatusCode
>=
400
{
if
response
.
StatusCode
>=
http
.
StatusBadRequest
{
return
StatusError
{
StatusCode
:
response
.
StatusCode
,
Status
:
response
.
Status
,
...
...
server/auth.go
View file @
982c5354
...
...
@@ -109,7 +109,7 @@ func getAuthToken(ctx context.Context, redirData AuthRedirect, regOpts *Registry
}
defer
resp
.
Body
.
Close
()
if
resp
.
StatusCode
!
=
http
.
Status
OK
{
if
resp
.
StatusCode
>
=
http
.
Status
BadRequest
{
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
""
,
fmt
.
Errorf
(
"on pull registry responded with code %d: %s"
,
resp
.
StatusCode
,
body
)
}
...
...
server/download.go
View file @
982c5354
...
...
@@ -168,7 +168,7 @@ func doDownload(ctx context.Context, opts downloadOpts, f *FileDownload) error {
}
defer
resp
.
Body
.
Close
()
if
resp
.
StatusCode
!
=
http
.
Status
OK
&&
resp
.
StatusCode
!=
http
.
StatusPartialConten
t
{
if
resp
.
StatusCode
>
=
http
.
Status
BadReques
t
{
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
fmt
.
Errorf
(
"%w: on download registry responded with code %d: %v"
,
errDownload
,
resp
.
StatusCode
,
string
(
body
))
}
...
...
server/images.go
View file @
982c5354
...
...
@@ -980,7 +980,7 @@ func PushModel(ctx context.Context, name string, regOpts *RegistryOptions, fn fu
continue
}
if
err
:=
uploadBlobChunked
(
ctx
,
mp
,
location
,
layer
,
regOpts
,
fn
);
err
!=
nil
{
if
err
:=
uploadBlobChunked
(
ctx
,
location
,
layer
,
regOpts
,
fn
);
err
!=
nil
{
log
.
Printf
(
"error uploading blob: %v"
,
err
)
return
err
}
...
...
@@ -1092,11 +1092,11 @@ func pullModelManifest(ctx context.Context, mp ModelPath, regOpts *RegistryOptio
}
defer
resp
.
Body
.
Close
()
// Check for success: For a successful upload, the Docker registry will respond with a 201 Created
if
resp
.
StatusCode
!=
http
.
StatusOK
{
if
resp
.
StatusCode
>=
http
.
StatusBadRequest
{
if
resp
.
StatusCode
==
http
.
StatusNotFound
{
return
nil
,
fmt
.
Errorf
(
"model not found"
)
}
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
nil
,
fmt
.
Errorf
(
"on pull registry responded with code %d: %s"
,
resp
.
StatusCode
,
body
)
}
...
...
@@ -1157,7 +1157,7 @@ func checkBlobExistence(ctx context.Context, mp ModelPath, digest string, regOpt
defer
resp
.
Body
.
Close
()
// Check for success: If the blob exists, the Docker registry will respond with a 200 OK
return
resp
.
StatusCode
==
http
.
Status
OK
,
nil
return
resp
.
StatusCode
<
http
.
Status
BadRequest
,
nil
}
func
makeRequestWithRetry
(
ctx
context
.
Context
,
method
string
,
requestURL
*
url
.
URL
,
headers
http
.
Header
,
body
io
.
ReadSeeker
,
regOpts
*
RegistryOptions
)
(
*
http
.
Response
,
error
)
{
...
...
@@ -1171,10 +1171,8 @@ func makeRequestWithRetry(ctx context.Context, method string, requestURL *url.UR
status
=
resp
.
Status
switch
resp
.
StatusCode
{
case
http
.
StatusAccepted
,
http
.
StatusCreated
:
return
resp
,
nil
case
http
.
StatusUnauthorized
:
switch
{
case
resp
.
StatusCode
==
http
.
StatusUnauthorized
:
auth
:=
resp
.
Header
.
Get
(
"www-authenticate"
)
authRedir
:=
ParseAuthRedirectString
(
auth
)
token
,
err
:=
getAuthToken
(
ctx
,
authRedir
,
regOpts
)
...
...
@@ -1190,9 +1188,11 @@ func makeRequestWithRetry(ctx context.Context, method string, requestURL *url.UR
}
continue
defaul
t
:
case
resp
.
StatusCode
>=
http
.
StatusBadReques
t
:
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
nil
,
fmt
.
Errorf
(
"on upload registry responded with code %d: %s"
,
resp
.
StatusCode
,
body
)
default
:
return
resp
,
nil
}
}
...
...
server/upload.go
View file @
982c5354
...
...
@@ -40,7 +40,7 @@ func startUpload(ctx context.Context, mp ModelPath, layer *Layer, regOpts *Regis
return
url
.
Parse
(
location
)
}
func
uploadBlobChunked
(
ctx
context
.
Context
,
mp
ModelPath
,
requestURL
*
url
.
URL
,
layer
*
Layer
,
regOpts
*
RegistryOptions
,
fn
func
(
api
.
ProgressResponse
))
error
{
func
uploadBlobChunked
(
ctx
context
.
Context
,
requestURL
*
url
.
URL
,
layer
*
Layer
,
regOpts
*
RegistryOptions
,
fn
func
(
api
.
ProgressResponse
))
error
{
// TODO allow resumability
// TODO allow canceling uploads via DELETE
...
...
@@ -55,48 +55,88 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, requestURL *url.URL, l
}
defer
f
.
Close
()
var
completed
int64
chunkSize
:=
10
*
1024
*
1024
// 95MB chunk size
chunkSize
:=
95
*
1024
*
1024
for
{
chunk
:=
int64
(
layer
.
Size
)
-
completed
for
offset
:=
int64
(
0
);
offset
<
int64
(
layer
.
Size
);
{
chunk
:=
int64
(
layer
.
Size
)
-
offset
if
chunk
>
int64
(
chunkSize
)
{
chunk
=
int64
(
chunkSize
)
}
sectionReader
:=
io
.
NewSectionReader
(
f
,
int64
(
completed
),
chunk
)
headers
:=
make
(
http
.
Header
)
headers
.
Set
(
"Content-Type"
,
"application/octet-stream"
)
headers
.
Set
(
"Content-Length"
,
strconv
.
Itoa
(
int
(
chunk
)))
headers
.
Set
(
"Content-Range"
,
fmt
.
Sprintf
(
"%d-%d"
,
completed
,
completed
+
sectionReader
.
Size
()
-
1
))
resp
,
err
:=
makeRequestWithRetry
(
ctx
,
"PATCH"
,
requestURL
,
headers
,
sectionReader
,
regOpts
)
if
err
!=
nil
&&
!
errors
.
Is
(
err
,
io
.
EOF
)
{
fn
(
api
.
ProgressResponse
{
Status
:
fmt
.
Sprintf
(
"error uploading chunk: %v"
,
err
),
Digest
:
layer
.
Digest
,
Total
:
layer
.
Size
,
Completed
:
int
(
completed
),
})
return
err
}
defer
resp
.
Body
.
Close
()
completed
+=
sectionReader
.
Size
()
fn
(
api
.
ProgressResponse
{
Status
:
fmt
.
Sprintf
(
"uploading %s"
,
layer
.
Digest
),
Digest
:
layer
.
Digest
,
Total
:
layer
.
Size
,
Completed
:
int
(
completed
),
})
requestURL
,
err
=
url
.
Parse
(
resp
.
Header
.
Get
(
"Location"
))
if
err
!=
nil
{
return
err
}
sectionReader
:=
io
.
NewSectionReader
(
f
,
int64
(
offset
),
chunk
)
for
try
:=
0
;
try
<
MaxRetries
;
try
++
{
r
,
w
:=
io
.
Pipe
()
defer
r
.
Close
()
go
func
()
{
defer
w
.
Close
()
for
chunked
:=
int64
(
0
);
chunked
<
chunk
;
{
n
,
err
:=
io
.
CopyN
(
w
,
sectionReader
,
1024
*
1024
)
if
err
!=
nil
&&
!
errors
.
Is
(
err
,
io
.
EOF
)
{
fn
(
api
.
ProgressResponse
{
Status
:
fmt
.
Sprintf
(
"error reading chunk: %v"
,
err
),
Digest
:
layer
.
Digest
,
Total
:
layer
.
Size
,
Completed
:
int
(
offset
),
})
return
}
chunked
+=
n
fn
(
api
.
ProgressResponse
{
Status
:
fmt
.
Sprintf
(
"uploading %s"
,
layer
.
Digest
),
Digest
:
layer
.
Digest
,
Total
:
layer
.
Size
,
Completed
:
int
(
offset
)
+
int
(
chunked
),
})
}
}()
headers
:=
make
(
http
.
Header
)
headers
.
Set
(
"Content-Type"
,
"application/octet-stream"
)
headers
.
Set
(
"Content-Length"
,
strconv
.
Itoa
(
int
(
chunk
)))
headers
.
Set
(
"Content-Range"
,
fmt
.
Sprintf
(
"%d-%d"
,
offset
,
offset
+
sectionReader
.
Size
()
-
1
))
resp
,
err
:=
makeRequest
(
ctx
,
"PATCH"
,
requestURL
,
headers
,
r
,
regOpts
)
if
err
!=
nil
&&
!
errors
.
Is
(
err
,
io
.
EOF
)
{
fn
(
api
.
ProgressResponse
{
Status
:
fmt
.
Sprintf
(
"error uploading chunk: %v"
,
err
),
Digest
:
layer
.
Digest
,
Total
:
layer
.
Size
,
Completed
:
int
(
offset
),
})
return
err
}
defer
resp
.
Body
.
Close
()
switch
{
case
resp
.
StatusCode
==
http
.
StatusUnauthorized
:
auth
:=
resp
.
Header
.
Get
(
"www-authenticate"
)
authRedir
:=
ParseAuthRedirectString
(
auth
)
token
,
err
:=
getAuthToken
(
ctx
,
authRedir
,
regOpts
)
if
err
!=
nil
{
return
err
}
regOpts
.
Token
=
token
if
_
,
err
:=
sectionReader
.
Seek
(
0
,
io
.
SeekStart
);
err
!=
nil
{
return
err
}
continue
case
resp
.
StatusCode
>=
http
.
StatusBadRequest
:
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
fmt
.
Errorf
(
"on upload registry responded with code %d: %s"
,
resp
.
StatusCode
,
body
)
}
offset
+=
sectionReader
.
Size
()
requestURL
,
err
=
url
.
Parse
(
resp
.
Header
.
Get
(
"Location"
))
if
err
!=
nil
{
return
err
}
if
completed
>=
int64
(
layer
.
Size
)
{
break
}
}
...
...
@@ -117,7 +157,7 @@ func uploadBlobChunked(ctx context.Context, mp ModelPath, requestURL *url.URL, l
}
defer
resp
.
Body
.
Close
()
if
resp
.
StatusCode
!
=
http
.
Status
Created
{
if
resp
.
StatusCode
>
=
http
.
Status
BadRequest
{
body
,
_
:=
io
.
ReadAll
(
resp
.
Body
)
return
fmt
.
Errorf
(
"on finish upload registry responded with code %d: %v"
,
resp
.
StatusCode
,
string
(
body
))
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment