Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
OpenDAS
ollama
Commits
27331ae3
Commit
27331ae3
authored
Jan 08, 2024
by
Michael Yang
Browse files
download: add inactivity monitor
if a download part is inactive for some time, restart it
parent
b6c0ef1e
Changes
1
Show whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
74 additions
and
43 deletions
+74
-43
server/download.go
server/download.go
+74
-43
No files found.
server/download.go
View file @
27331ae3
...
@@ -25,6 +25,11 @@ import (
...
@@ -25,6 +25,11 @@ import (
"github.com/jmorganca/ollama/format"
"github.com/jmorganca/ollama/format"
)
)
const
maxRetries
=
6
var
errMaxRetriesExceeded
=
errors
.
New
(
"max retries exceeded"
)
var
errPartStalled
=
errors
.
New
(
"part stalled"
)
var
blobDownloadManager
sync
.
Map
var
blobDownloadManager
sync
.
Map
type
blobDownload
struct
{
type
blobDownload
struct
{
...
@@ -48,6 +53,7 @@ type blobDownloadPart struct {
...
@@ -48,6 +53,7 @@ type blobDownloadPart struct {
Offset
int64
Offset
int64
Size
int64
Size
int64
Completed
int64
Completed
int64
lastUpdated
time
.
Time
*
blobDownload
`json:"-"`
*
blobDownload
`json:"-"`
}
}
...
@@ -72,6 +78,13 @@ func (p *blobDownloadPart) StopsAt() int64 {
...
@@ -72,6 +78,13 @@ func (p *blobDownloadPart) StopsAt() int64 {
return
p
.
Offset
+
p
.
Size
return
p
.
Offset
+
p
.
Size
}
}
func
(
p
*
blobDownloadPart
)
Write
(
b
[]
byte
)
(
n
int
,
err
error
)
{
n
=
len
(
b
)
p
.
blobDownload
.
Completed
.
Add
(
int64
(
n
))
p
.
lastUpdated
=
time
.
Now
()
return
n
,
nil
}
func
(
b
*
blobDownload
)
Prepare
(
ctx
context
.
Context
,
requestURL
*
url
.
URL
,
opts
*
RegistryOptions
)
error
{
func
(
b
*
blobDownload
)
Prepare
(
ctx
context
.
Context
,
requestURL
*
url
.
URL
,
opts
*
RegistryOptions
)
error
{
partFilePaths
,
err
:=
filepath
.
Glob
(
b
.
Name
+
"-partial-*"
)
partFilePaths
,
err
:=
filepath
.
Glob
(
b
.
Name
+
"-partial-*"
)
if
err
!=
nil
{
if
err
!=
nil
{
...
@@ -157,6 +170,9 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *Regis
...
@@ -157,6 +170,9 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *Regis
case
errors
.
Is
(
err
,
context
.
Canceled
),
errors
.
Is
(
err
,
syscall
.
ENOSPC
)
:
case
errors
.
Is
(
err
,
context
.
Canceled
),
errors
.
Is
(
err
,
syscall
.
ENOSPC
)
:
// return immediately if the context is canceled or the device is out of space
// return immediately if the context is canceled or the device is out of space
return
err
return
err
case
errors
.
Is
(
err
,
errPartStalled
)
:
try
--
continue
case
err
!=
nil
:
case
err
!=
nil
:
sleep
:=
time
.
Second
*
time
.
Duration
(
math
.
Pow
(
2
,
float64
(
try
)))
sleep
:=
time
.
Second
*
time
.
Duration
(
math
.
Pow
(
2
,
float64
(
try
)))
log
.
Printf
(
"%s part %d attempt %d failed: %v, retrying in %s"
,
b
.
Digest
[
7
:
19
],
part
.
N
,
try
,
err
,
sleep
)
log
.
Printf
(
"%s part %d attempt %d failed: %v, retrying in %s"
,
b
.
Digest
[
7
:
19
],
part
.
N
,
try
,
err
,
sleep
)
...
@@ -195,6 +211,8 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *Regis
...
@@ -195,6 +211,8 @@ func (b *blobDownload) run(ctx context.Context, requestURL *url.URL, opts *Regis
}
}
func
(
b
*
blobDownload
)
downloadChunk
(
ctx
context
.
Context
,
requestURL
*
url
.
URL
,
w
io
.
Writer
,
part
*
blobDownloadPart
,
opts
*
RegistryOptions
)
error
{
func
(
b
*
blobDownload
)
downloadChunk
(
ctx
context
.
Context
,
requestURL
*
url
.
URL
,
w
io
.
Writer
,
part
*
blobDownloadPart
,
opts
*
RegistryOptions
)
error
{
g
,
ctx
:=
errgroup
.
WithContext
(
ctx
)
g
.
Go
(
func
()
error
{
headers
:=
make
(
http
.
Header
)
headers
:=
make
(
http
.
Header
)
headers
.
Set
(
"Range"
,
fmt
.
Sprintf
(
"bytes=%d-%d"
,
part
.
StartsAt
(),
part
.
StopsAt
()
-
1
))
headers
.
Set
(
"Range"
,
fmt
.
Sprintf
(
"bytes=%d-%d"
,
part
.
StartsAt
(),
part
.
StopsAt
()
-
1
))
resp
,
err
:=
makeRequestWithRetry
(
ctx
,
http
.
MethodGet
,
requestURL
,
headers
,
nil
,
opts
)
resp
,
err
:=
makeRequestWithRetry
(
ctx
,
http
.
MethodGet
,
requestURL
,
headers
,
nil
,
opts
)
...
@@ -203,7 +221,7 @@ func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w
...
@@ -203,7 +221,7 @@ func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w
}
}
defer
resp
.
Body
.
Close
()
defer
resp
.
Body
.
Close
()
n
,
err
:=
io
.
Copy
(
w
,
io
.
TeeReader
(
resp
.
Body
,
b
))
n
,
err
:=
io
.
Copy
(
w
,
io
.
TeeReader
(
resp
.
Body
,
part
))
if
err
!=
nil
&&
!
errors
.
Is
(
err
,
context
.
Canceled
)
&&
!
errors
.
Is
(
err
,
io
.
ErrUnexpectedEOF
)
{
if
err
!=
nil
&&
!
errors
.
Is
(
err
,
context
.
Canceled
)
&&
!
errors
.
Is
(
err
,
io
.
ErrUnexpectedEOF
)
{
// rollback progress
// rollback progress
b
.
Completed
.
Add
(
-
n
)
b
.
Completed
.
Add
(
-
n
)
...
@@ -217,6 +235,30 @@ func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w
...
@@ -217,6 +235,30 @@ func (b *blobDownload) downloadChunk(ctx context.Context, requestURL *url.URL, w
// return nil or context.Canceled or UnexpectedEOF (resumable)
// return nil or context.Canceled or UnexpectedEOF (resumable)
return
err
return
err
})
g
.
Go
(
func
()
error
{
ticker
:=
time
.
NewTicker
(
time
.
Second
)
for
{
select
{
case
<-
ticker
.
C
:
if
part
.
Completed
>=
part
.
Size
{
return
nil
}
if
!
part
.
lastUpdated
.
IsZero
()
&&
time
.
Since
(
part
.
lastUpdated
)
>
5
*
time
.
Second
{
log
.
Printf
(
"%s part %d stalled; retrying"
,
b
.
Digest
[
7
:
19
],
part
.
N
)
// reset last updated
part
.
lastUpdated
=
time
.
Time
{}
return
errPartStalled
}
case
<-
ctx
.
Done
()
:
return
ctx
.
Err
()
}
}
})
return
g
.
Wait
()
}
}
func
(
b
*
blobDownload
)
newPart
(
offset
,
size
int64
)
error
{
func
(
b
*
blobDownload
)
newPart
(
offset
,
size
int64
)
error
{
...
@@ -255,12 +297,6 @@ func (b *blobDownload) writePart(partName string, part *blobDownloadPart) error
...
@@ -255,12 +297,6 @@ func (b *blobDownload) writePart(partName string, part *blobDownloadPart) error
return
json
.
NewEncoder
(
partFile
)
.
Encode
(
part
)
return
json
.
NewEncoder
(
partFile
)
.
Encode
(
part
)
}
}
func
(
b
*
blobDownload
)
Write
(
p
[]
byte
)
(
n
int
,
err
error
)
{
n
=
len
(
p
)
b
.
Completed
.
Add
(
int64
(
n
))
return
n
,
nil
}
func
(
b
*
blobDownload
)
acquire
()
{
func
(
b
*
blobDownload
)
acquire
()
{
b
.
references
.
Add
(
1
)
b
.
references
.
Add
(
1
)
}
}
...
@@ -279,10 +315,6 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse))
...
@@ -279,10 +315,6 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse))
for
{
for
{
select
{
select
{
case
<-
ticker
.
C
:
case
<-
ticker
.
C
:
case
<-
ctx
.
Done
()
:
return
ctx
.
Err
()
}
fn
(
api
.
ProgressResponse
{
fn
(
api
.
ProgressResponse
{
Status
:
fmt
.
Sprintf
(
"pulling %s"
,
b
.
Digest
[
7
:
19
]),
Status
:
fmt
.
Sprintf
(
"pulling %s"
,
b
.
Digest
[
7
:
19
]),
Digest
:
b
.
Digest
,
Digest
:
b
.
Digest
,
...
@@ -293,6 +325,9 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse))
...
@@ -293,6 +325,9 @@ func (b *blobDownload) Wait(ctx context.Context, fn func(api.ProgressResponse))
if
b
.
done
||
b
.
err
!=
nil
{
if
b
.
done
||
b
.
err
!=
nil
{
return
b
.
err
return
b
.
err
}
}
case
<-
ctx
.
Done
()
:
return
ctx
.
Err
()
}
}
}
}
}
...
@@ -303,10 +338,6 @@ type downloadOpts struct {
...
@@ -303,10 +338,6 @@ type downloadOpts struct {
fn
func
(
api
.
ProgressResponse
)
fn
func
(
api
.
ProgressResponse
)
}
}
const
maxRetries
=
6
var
errMaxRetriesExceeded
=
errors
.
New
(
"max retries exceeded"
)
// downloadBlob downloads a blob from the registry and stores it in the blobs directory
// downloadBlob downloads a blob from the registry and stores it in the blobs directory
func
downloadBlob
(
ctx
context
.
Context
,
opts
downloadOpts
)
error
{
func
downloadBlob
(
ctx
context
.
Context
,
opts
downloadOpts
)
error
{
fp
,
err
:=
GetBlobsPath
(
opts
.
digest
)
fp
,
err
:=
GetBlobsPath
(
opts
.
digest
)
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment