Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
e95f8758
Unverified
Commit
e95f8758
authored
Aug 05, 2025
by
Ryan McCormick
Committed by
GitHub
Aug 05, 2025
Browse files
ci: Improve caching on pre-merge-rust (#2253)
parent
36c4ef5e
Changes
5
Hide whitespace changes
Inline
Side-by-side
Showing
5 changed files
with
50 additions
and
24 deletions
+50
-24
.github/workflows/pre-merge-rust.yml
.github/workflows/pre-merge-rust.yml
+38
-12
lib/runtime/src/pipeline/error.rs
lib/runtime/src/pipeline/error.rs
+2
-2
lib/runtime/src/pipeline/network/egress/addressed_router.rs
lib/runtime/src/pipeline/network/egress/addressed_router.rs
+1
-1
lib/runtime/src/pipeline/nodes/sources.rs
lib/runtime/src/pipeline/nodes/sources.rs
+6
-6
lib/runtime/src/pipeline/nodes/sources/base.rs
lib/runtime/src/pipeline/nodes/sources/base.rs
+3
-3
No files found.
.github/workflows/pre-merge-rust.yml
View file @
e95f8758
...
...
@@ -41,22 +41,44 @@ jobs:
matrix
:
{
dir
:
[
'
.'
,
'
lib/bindings/python'
,
'
lib/runtime/examples'
]
}
permissions
:
contents
:
read
env
:
RUSTC_WRAPPER
:
"
sccache"
steps
:
-
uses
:
actions/checkout@v4
-
name
:
Set up system dependencies
run
:
|
sudo apt-get update -y
sudo apt-get install -y protobuf-compiler
-
name
:
Cache Cargo Registry
uses
:
actions/cache@v3
-
name
:
Cache sccache artifacts
uses
:
actions/cache@v4
with
:
path
:
~/.cargo/registry
key
:
${{ runner.os }}-cargo-registry-${{ github.head_ref || github.ref_name }}-${{ hashFiles('**/Cargo.lock') }}
-
name
:
Cache Cargo Tools
uses
:
actions/cache@v3
path
:
~/.cache/sccache/
key
:
${{ runner.os }}-sccache-${{ matrix.dir }}-${{ hashFiles('**/Cargo.lock') }}
restore-keys
:
${{ runner.os }}-sccache-${{ matrix.dir }}-
# TODO: Consider single target/ directory for all subdirectories/crates instead.
-
name
:
Cache cargo artifacts
uses
:
actions/cache@v4
with
:
path
:
~/.cargo/bin
key
:
${{ runner.os }}-cargo-tools-${{ github.head_ref || github.ref_name }}-${{ hashFiles('**/Cargo.lock') }}
path
:
|
~/.cargo/bin/
~/.cargo/registry/index/
~/.cargo/registry/cache/
~/.cargo/git/db/
${{ matrix.dir }}/target/
key
:
${{ runner.os }}-cargo-${{ matrix.dir }}-${{ hashFiles('**/Cargo.lock') }}
restore-keys
:
${{ runner.os }}-cargo-${{ matrix.dir }}-
-
name
:
Install sccache
run
:
|
SCCACHE_VERSION="0.10.0"
SCCACHE_FILE="sccache-v${SCCACHE_VERSION}-$(uname -m)-unknown-linux-musl"
SCCACHE_URL="https://github.com/mozilla/sccache/releases/download/v${SCCACHE_VERSION}/${SCCACHE_FILE}.tar.gz"
curl -L "${SCCACHE_URL}" | tar xz
mv ${SCCACHE_FILE}/sccache /usr/local/bin/sccache
chmod +x /usr/local/bin/sccache
-
name
:
Install Rust in dev environment
# Install Rust only to run GitHub Local Actions in (dev environment) using the `ACT` environment variable.
# See act usage: https://nektosact.com/introduction.html
...
...
@@ -68,10 +90,6 @@ jobs:
echo "$HOME/.cargo/bin" >> $GITHUB_PATH
-
name
:
Set up Rust Toolchain Components
run
:
rustup component add rustfmt clippy
-
name
:
Run Cargo Check
working-directory
:
${{ matrix.dir }}
run
:
cargo check --locked
timeout-minutes
:
5
-
name
:
Verify Code Formatting
working-directory
:
${{ matrix.dir }}
run
:
cargo fmt -- --check
...
...
@@ -83,6 +101,10 @@ jobs:
run
:
|
cargo-deny --version || cargo install cargo-deny@0.16.4
cargo-deny --no-default-features check --hide-inclusion-graph licenses bans --config ${{ github.workspace }}/deny.toml
# Have an explicit step to build tests first to separate time spent on build vs execution.
-
name
:
Compile Tests
working-directory
:
${{ matrix.dir }}
run
:
cargo test --locked --no-run
-
name
:
Run Doc Tests
working-directory
:
${{ matrix.dir }}
run
:
cargo doc --no-deps && cargo test --locked --doc
...
...
@@ -90,3 +112,7 @@ jobs:
working-directory
:
${{ matrix.dir }}
# NOTE: --all-targets doesn't run doc tests
run
:
cargo test --locked --all-targets
-
name
:
Run sccache stat for check
shell
:
bash
run
:
sccache --show-stats
lib/runtime/src/pipeline/error.rs
View file @
e95f8758
...
...
@@ -64,13 +64,13 @@ pub enum PipelineError {
/// terminating sink either cannot find the `oneshot` channel sender or the corresponding
/// receiver was dropped
#[error(
"Unlinked request; initiating request task was dropped or cancelled"
)]
Deta
t
chedStreamReceiver
,
DetachedStreamReceiver
,
// In the interim between when a response was made and when the stream was received, the
// Sender for the stream was dropped. This maybe a logic error in the pipeline; and become a
// panic/fatal error in the future.
#[error(
"Unlinked response; response task was dropped or cancelled"
)]
Deta
t
chedStreamSender
,
DetachedStreamSender
,
#[error(
"Serialzation Error: {0}"
)]
SerializationError
(
String
),
...
...
lib/runtime/src/pipeline/network/egress/addressed_router.rs
View file @
e95f8758
...
...
@@ -158,7 +158,7 @@ where
log
::
trace!
(
request_id
,
"awaiting transport handshake"
);
let
response_stream
=
response_stream_provider
.await
.map_err
(|
_
|
PipelineError
::
Deta
t
chedStreamReceiver
)
?
.map_err
(|
_
|
PipelineError
::
DetachedStreamReceiver
)
?
.map_err
(
PipelineError
::
ConnectionFailed
)
?
;
// TODO: Detect end-of-stream using Server-Sent Events (SSE)
...
...
lib/runtime/src/pipeline/nodes/sources.rs
View file @
e95f8758
...
...
@@ -85,7 +85,7 @@ pub struct SegmentSource<In: PipelineIO, Out: PipelineIO> {
// let mut sinks = self.sinks.lock().unwrap();
// let tx = sinks
// .remove(context.id())
// .ok_or(PipelineError::Deta
t
chedStreamReceiver)
// .ok_or(PipelineError::DetachedStreamReceiver)
// .map_err(|e| {
// data.context().stop_generating();
// e
...
...
@@ -94,7 +94,7 @@ pub struct SegmentSource<In: PipelineIO, Out: PipelineIO> {
// let ctx = data.context();
// tx.send(data)
// .map_err(|_| PipelineError::Deta
t
chedStreamReceiver)
// .map_err(|_| PipelineError::DetachedStreamReceiver)
// .map_err(|e| {
// ctx.stop_generating();
// e
...
...
@@ -121,7 +121,7 @@ pub struct SegmentSource<In: PipelineIO, Out: PipelineIO> {
// sinks.insert(request.id().to_string(), tx);
// }
// self.on_next(request, private::Token {}).await?;
// rx.await.map_err(|_| PipelineError::Deta
t
chedStreamSender)
// rx.await.map_err(|_| PipelineError::DetachedStreamSender)
// }
// }
...
...
@@ -161,7 +161,7 @@ pub struct SegmentSource<In: PipelineIO, Out: PipelineIO> {
// let mut sinks = self.sinks.lock().unwrap();
// let tx = sinks
// .remove(context.id())
// .ok_or(PipelineError::Deta
t
chedStreamReceiver)
// .ok_or(PipelineError::DetachedStreamReceiver)
// .map_err(|e| {
// data.context().stop_generating();
// e
...
...
@@ -170,7 +170,7 @@ pub struct SegmentSource<In: PipelineIO, Out: PipelineIO> {
// let ctx = data.context();
// tx.send(data)
// .map_err(|_| PipelineError::Deta
t
chedStreamReceiver)
// .map_err(|_| PipelineError::DetachedStreamReceiver)
// .map_err(|e| {
// ctx.stop_generating();
// e
...
...
@@ -197,7 +197,7 @@ pub struct SegmentSource<In: PipelineIO, Out: PipelineIO> {
// sinks.insert(request.id().to_string(), tx);
// }
// self.on_next(request, private::Token {}).await?;
// rx.await.map_err(|_| PipelineError::Deta
t
chedStreamSender)
// rx.await.map_err(|_| PipelineError::DetachedStreamSender)
// }
// }
...
...
lib/runtime/src/pipeline/nodes/sources/base.rs
View file @
e95f8758
...
...
@@ -52,7 +52,7 @@ impl<In: PipelineIO, Out: PipelineIO + AsyncEngineContextProvider> Sink<Out> for
let
mut
sinks
=
self
.sinks
.lock
()
.unwrap
();
let
tx
=
sinks
.remove
(
ctx
.id
())
.ok_or
(
PipelineError
::
Deta
t
chedStreamReceiver
)
.ok_or
(
PipelineError
::
DetachedStreamReceiver
)
.inspect_err
(|
_
|
{
ctx
.stop_generating
();
})
?
;
...
...
@@ -60,7 +60,7 @@ impl<In: PipelineIO, Out: PipelineIO + AsyncEngineContextProvider> Sink<Out> for
Ok
(
tx
.send
(
data
)
.map_err
(|
_
|
PipelineError
::
Deta
t
chedStreamReceiver
)
.map_err
(|
_
|
PipelineError
::
DetachedStreamReceiver
)
.inspect_err
(|
_
|
{
ctx
.stop_generating
();
})
?
)
...
...
@@ -76,7 +76,7 @@ impl<In: PipelineIO + Sync, Out: PipelineIO> AsyncEngine<In, Out, Error> for Fro
sinks
.insert
(
request
.id
()
.to_string
(),
tx
);
}
self
.on_next
(
request
,
private
::
Token
{})
.await
?
;
Ok
(
rx
.await
.map_err
(|
_
|
PipelineError
::
Deta
t
chedStreamSender
)
?
)
Ok
(
rx
.await
.map_err
(|
_
|
PipelineError
::
DetachedStreamSender
)
?
)
}
}
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment