Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in / Register
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
2fd6592f
Commit
2fd6592f
authored
Feb 13, 2025
by
Ryan Olson
Committed by
GitHub
Feb 13, 2025
Browse files
fix: tcp updates + initial zmq (#176)
parent
f38aa469
Changes
19
Show whitespace changes
Inline
Side-by-side
Showing
19 changed files
with
1889 additions
and
335 deletions
+1889
-335
examples/rust/Cargo.lock
examples/rust/Cargo.lock
+259
-84
examples/rust/Cargo.toml
examples/rust/Cargo.toml
+0
-3
examples/rust/hello_world/Cargo.toml
examples/rust/hello_world/Cargo.toml
+0
-1
examples/rust/hello_world/src/bin/client.rs
examples/rust/hello_world/src/bin/client.rs
+3
-2
examples/rust/hello_world/src/bin/server.rs
examples/rust/hello_world/src/bin/server.rs
+2
-1
llm/rust/Cargo.lock
llm/rust/Cargo.lock
+129
-6
runtime/rust/Cargo.lock
runtime/rust/Cargo.lock
+259
-84
runtime/rust/Cargo.toml
runtime/rust/Cargo.toml
+3
-1
runtime/rust/python-wheel/Cargo.lock
runtime/rust/python-wheel/Cargo.lock
+270
-3
runtime/rust/src/config.rs
runtime/rust/src/config.rs
+28
-0
runtime/rust/src/lib.rs
runtime/rust/src/lib.rs
+1
-0
runtime/rust/src/logging.rs
runtime/rust/src/logging.rs
+292
-0
runtime/rust/src/pipeline/network.rs
runtime/rust/src/pipeline/network.rs
+18
-5
runtime/rust/src/pipeline/network/tcp.rs
runtime/rust/src/pipeline/network/tcp.rs
+1
-7
runtime/rust/src/pipeline/network/tcp/client.rs
runtime/rust/src/pipeline/network/tcp/client.rs
+80
-35
runtime/rust/src/pipeline/network/tcp/server.rs
runtime/rust/src/pipeline/network/tcp/server.rs
+121
-100
runtime/rust/src/transports.rs
runtime/rust/src/transports.rs
+1
-0
runtime/rust/src/transports/zmq.rs
runtime/rust/src/transports/zmq.rs
+418
-0
runtime/rust/tests/soak.rs
runtime/rust/tests/soak.rs
+4
-3
No files found.
examples/rust/Cargo.lock
View file @
2fd6592f
...
...
@@ -27,53 +27,18 @@ dependencies = [
]
[[package]]
name = "anstream"
version = "0.6.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
[[package]]
name = "anstyle-parse"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.2"
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
dependencies = [
"windows-sys 0.59.0",
]
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "an
style-wincon
"
version = "
3.0.7
"
name = "an
droid_system_properties
"
version = "
0.1.5
"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "
ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e
"
checksum = "
819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311
"
dependencies = [
"anstyle",
"once_cell",
"windows-sys 0.59.0",
"libc",
]
[[package]]
...
...
@@ -312,6 +277,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
[[package]]
name = "bytemuck"
version = "1.21.0"
...
...
@@ -373,10 +344,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "c
olorchoice
"
version = "
1.0
.3"
name = "c
hrono
"
version = "
0.4
.3
9
"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets",
]
[[package]]
name = "const-oid"
...
...
@@ -716,29 +695,6 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "env_filter"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
dependencies = [
"log",
"regex",
]
[[package]]
name = "env_logger"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcaee3d8e3cfc3fd92428d477bc97fc29ec8716d180c0d74c643bb26166660e0"
dependencies = [
"anstream",
"anstyle",
"env_filter",
"humantime",
"log",
]
[[package]]
name = "equivalent"
version = "1.0.1"
...
...
@@ -1006,7 +962,6 @@ checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
name = "hello_world"
version = "0.2.0"
dependencies = [
"env_logger",
"triton-distributed",
]
...
...
@@ -1115,6 +1070,29 @@ dependencies = [
"tracing",
]
[[package]]
name = "iana-time-zone"
version = "0.1.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "1.5.0"
...
...
@@ -1295,12 +1273,6 @@ dependencies = [
"libc",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "itertools"
version = "0.13.0"
...
...
@@ -1325,6 +1297,16 @@ dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f"
dependencies = [
"once_cell",
"wasm-bindgen",
]
[[package]]
name = "jwalk"
version = "0.8.1"
...
...
@@ -1397,6 +1379,15 @@ version = "0.4.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.3"
...
...
@@ -1546,6 +1537,16 @@ dependencies = [
"signatory",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi 0.3.9",
]
[[package]]
name = "nuid"
version = "0.5.0"
...
...
@@ -1561,6 +1562,24 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "num_threads"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9"
dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.36.7"
...
...
@@ -1582,6 +1601,12 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.3"
...
...
@@ -1925,8 +1950,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
...
...
@@ -1937,9 +1971,15 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"regex-syntax
0.8.5
",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.5"
...
...
@@ -2204,6 +2244,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
...
...
@@ -2392,6 +2441,16 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "thread_local"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
]
[[package]]
name = "time"
version = "0.3.37"
...
...
@@ -2400,7 +2459,9 @@ checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21"
dependencies = [
"deranged",
"itoa",
"libc",
"num-conv",
"num_threads",
"powerfmt",
"serde",
"time-core",
...
...
@@ -2670,6 +2731,50 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"time",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
...
...
@@ -2684,6 +2789,7 @@ dependencies = [
"async_zmq",
"blake3",
"bytes",
"chrono",
"derive-getters",
"derive_builder",
"educe",
...
...
@@ -2693,6 +2799,7 @@ dependencies = [
"futures",
"humantime",
"local-ip-address",
"log",
"nid",
"nix",
"nuid",
...
...
@@ -2708,6 +2815,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
"validator",
"xxhash-rust",
...
...
@@ -2780,12 +2888,6 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.13.1"
...
...
@@ -2826,6 +2928,12 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "version-compare"
version = "0.2.0"
...
...
@@ -2872,6 +2980,64 @@ dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6"
dependencies = [
"bumpalo",
"log",
"proc-macro2",
"quote",
"syn 2.0.98",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.98",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
dependencies = [
"unicode-ident",
]
[[package]]
name = "winapi"
version = "0.2.8"
...
...
@@ -2915,6 +3081,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
...
...
examples/rust/Cargo.toml
View file @
2fd6592f
...
...
@@ -49,6 +49,3 @@ tokio-util = { version = "0.7", features = ["codec", "net"] }
tracing
=
{
version
=
"0.1"
}
validator
=
{
version
=
"0.20.0"
,
features
=
["derive"]
}
uuid
=
{
version
=
"1"
,
features
=
[
"v4"
,
"serde"
]
}
#
env_logger
=
"0.11.6"
examples/rust/hello_world/Cargo.toml
View file @
2fd6592f
...
...
@@ -25,4 +25,3 @@ homepage.workspace = true
triton-distributed
=
{
workspace
=
true
}
# third-party
env_logger
=
{
workspace
=
true
}
examples/rust/hello_world/src/bin/client.rs
View file @
2fd6592f
...
...
@@ -15,11 +15,12 @@
use
hello_world
::
DEFAULT_NAMESPACE
;
use
triton_distributed
::{
protocols
::
annotated
::
Annotated
,
stream
::
StreamExt
,
DistributedRuntime
,
Result
,
Runtime
,
Worker
,
logging
,
protocols
::
annotated
::
Annotated
,
stream
::
StreamExt
,
DistributedRuntime
,
Result
,
Runtime
,
Worker
,
};
fn
main
()
->
Result
<
()
>
{
env_
logg
er
::
init
();
logg
ing
::
init
();
let
worker
=
Worker
::
from_settings
()
?
;
worker
.execute
(
app
)
}
...
...
examples/rust/hello_world/src/bin/server.rs
View file @
2fd6592f
...
...
@@ -16,6 +16,7 @@
use
hello_world
::
DEFAULT_NAMESPACE
;
use
std
::
sync
::
Arc
;
use
triton_distributed
::{
logging
,
pipeline
::{
async_trait
,
network
::
Ingress
,
AsyncEngine
,
AsyncEngineContextProvider
,
Error
,
ManyOut
,
ResponseStream
,
SingleIn
,
...
...
@@ -25,7 +26,7 @@ use triton_distributed::{
};
fn
main
()
->
Result
<
()
>
{
env_
logg
er
::
init
();
logg
ing
::
init
();
let
worker
=
Worker
::
from_settings
()
?
;
worker
.execute
(
app
)
}
...
...
llm/rust/Cargo.lock
View file @
2fd6592f
...
...
@@ -1029,8 +1029,8 @@ dependencies = [
"aho-corasick",
"bstr",
"log",
"regex-automata",
"regex-syntax",
"regex-automata
0.4.9
",
"regex-syntax
0.8.5
",
]
[[package]]
...
...
@@ -1532,6 +1532,15 @@ version = "0.4.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.3"
...
...
@@ -1687,6 +1696,16 @@ dependencies = [
"signatory",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi 0.3.9",
]
[[package]]
name = "nuid"
version = "0.5.0"
...
...
@@ -1711,6 +1730,15 @@ dependencies = [
"autocfg",
]
[[package]]
name = "num_threads"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9"
dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.36.7"
...
...
@@ -1732,6 +1760,12 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.3"
...
...
@@ -2000,7 +2034,7 @@ dependencies = [
"rand",
"rand_chacha",
"rand_xorshift",
"regex-syntax",
"regex-syntax
0.8.5
",
"rusty-fork",
"tempfile",
"unarray",
...
...
@@ -2207,8 +2241,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
...
...
@@ -2219,9 +2262,15 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"regex-syntax
0.8.5
",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.5"
...
...
@@ -2574,6 +2623,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
...
...
@@ -2791,6 +2849,16 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "thread_local"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
]
[[package]]
name = "time"
version = "0.3.37"
...
...
@@ -2799,7 +2867,9 @@ checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21"
dependencies = [
"deranged",
"itoa",
"libc",
"num-conv",
"num_threads",
"powerfmt",
"serde",
"time-core",
...
...
@@ -3087,6 +3157,50 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"time",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
...
...
@@ -3101,6 +3215,7 @@ dependencies = [
"async_zmq",
"blake3",
"bytes",
"chrono",
"derive-getters",
"derive_builder",
"educe",
...
...
@@ -3110,6 +3225,7 @@ dependencies = [
"futures",
"humantime",
"local-ip-address",
"log",
"nid",
"nix",
"nuid",
...
...
@@ -3125,6 +3241,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
"validator",
"xxhash-rust",
...
...
@@ -3285,6 +3402,12 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "version-compare"
version = "0.2.0"
...
...
runtime/rust/Cargo.lock
View file @
2fd6592f
...
...
@@ -27,53 +27,18 @@ dependencies = [
]
[[package]]
name = "anstream"
version = "0.6.18"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8acc5369981196006228e28809f761875c0327210a891e941f4c683b3a99529b"
dependencies = [
"anstyle",
"anstyle-parse",
"anstyle-query",
"anstyle-wincon",
"colorchoice",
"is_terminal_polyfill",
"utf8parse",
]
[[package]]
name = "anstyle"
version = "1.0.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "55cc3b69f167a1ef2e161439aa98aed94e6028e5f9a59be9a6ffb47aef1651f9"
[[package]]
name = "anstyle-parse"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3b2d16507662817a6a20a9ea92df6652ee4f94f914589377d69f3b21bc5798a9"
dependencies = [
"utf8parse",
]
[[package]]
name = "anstyle-query"
version = "1.1.2"
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "79947af37f4177cfead1110013d678905c37501914fba0efea834c3fe9a8d60c"
dependencies = [
"windows-sys 0.59.0",
]
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "an
style-wincon
"
version = "
3.0.7
"
name = "an
droid_system_properties
"
version = "
0.1.5
"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "
ca3534e77181a9cc07539ad51f2141fe32f6c3ffd4df76db8ad92346b003ae4e
"
checksum = "
819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311
"
dependencies = [
"anstyle",
"once_cell",
"windows-sys 0.59.0",
"libc",
]
[[package]]
...
...
@@ -318,6 +283,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
[[package]]
name = "bytemuck"
version = "1.21.0"
...
...
@@ -379,10 +350,18 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "c
olorchoice
"
version = "
1.0
.3"
name = "c
hrono
"
version = "
0.4
.3
9
"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5b63caa9aa9397e2d9480a9b13673856c78d8ac123288526c37d7839f2a86990"
checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets",
]
[[package]]
name = "const-oid"
...
...
@@ -722,29 +701,6 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "env_filter"
version = "0.1.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "186e05a59d4c50738528153b83b0b0194d3a29507dfec16eccd4b342903397d0"
dependencies = [
"log",
"regex",
]
[[package]]
name = "env_logger"
version = "0.11.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dcaee3d8e3cfc3fd92428d477bc97fc29ec8716d180c0d74c643bb26166660e0"
dependencies = [
"anstream",
"anstyle",
"env_filter",
"humantime",
"log",
]
[[package]]
name = "equivalent"
version = "1.0.1"
...
...
@@ -1113,6 +1069,29 @@ dependencies = [
"tracing",
]
[[package]]
name = "iana-time-zone"
version = "0.1.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "1.5.0"
...
...
@@ -1293,12 +1272,6 @@ dependencies = [
"libc",
]
[[package]]
name = "is_terminal_polyfill"
version = "1.70.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7943c866cc5cd64cbc25b2e01621d07fa8eb2a1a23160ee81ce38704e97b8ecf"
[[package]]
name = "itertools"
version = "0.13.0"
...
...
@@ -1323,6 +1296,16 @@ dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f"
dependencies = [
"once_cell",
"wasm-bindgen",
]
[[package]]
name = "jwalk"
version = "0.8.1"
...
...
@@ -1395,6 +1378,15 @@ version = "0.4.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.3"
...
...
@@ -1544,6 +1536,16 @@ dependencies = [
"signatory",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi 0.3.9",
]
[[package]]
name = "nuid"
version = "0.5.0"
...
...
@@ -1559,6 +1561,24 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "num_threads"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9"
dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.36.7"
...
...
@@ -1580,6 +1600,12 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking_lot"
version = "0.12.3"
...
...
@@ -1923,8 +1949,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
...
...
@@ -1935,9 +1970,15 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"regex-syntax
0.8.5
",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.5"
...
...
@@ -2202,6 +2243,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
...
...
@@ -2390,6 +2440,16 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "thread_local"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
]
[[package]]
name = "time"
version = "0.3.37"
...
...
@@ -2398,7 +2458,9 @@ checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21"
dependencies = [
"deranged",
"itoa",
"libc",
"num-conv",
"num_threads",
"powerfmt",
"serde",
"time-core",
...
...
@@ -2668,6 +2730,50 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"time",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
...
...
@@ -2683,16 +2789,17 @@ dependencies = [
"async_zmq",
"blake3",
"bytes",
"chrono",
"derive-getters",
"derive_builder",
"educe",
"either",
"env_logger",
"etcd-client",
"figment",
"futures",
"humantime",
"local-ip-address",
"log",
"nid",
"nix",
"nuid",
...
...
@@ -2708,6 +2815,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
"validator",
"xxhash-rust",
...
...
@@ -2780,12 +2888,6 @@ version = "1.0.4"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"
[[package]]
name = "utf8parse"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]]
name = "uuid"
version = "1.12.1"
...
...
@@ -2826,6 +2928,12 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "version-compare"
version = "0.2.0"
...
...
@@ -2872,6 +2980,64 @@ dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6"
dependencies = [
"bumpalo",
"log",
"proc-macro2",
"quote",
"syn 2.0.98",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.98",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
dependencies = [
"unicode-ident",
]
[[package]]
name = "winapi"
version = "0.2.8"
...
...
@@ -2915,6 +3081,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
...
...
runtime/rust/Cargo.toml
View file @
2fd6592f
...
...
@@ -38,12 +38,14 @@ async-trait = { version = "0.1" }
async_zmq
=
"0.4.0"
blake3
=
"1"
bytes
=
"1"
chrono
=
"0.4"
derive_builder
=
"0.20"
derive-getters
=
"0.5"
either
=
{
version
=
"1.13"
,
features
=
["serde"]
}
figment
=
{
version
=
"0.10.19"
,
features
=
[
"env"
,
"json"
,
"toml"
,
"test"
]
}
futures
=
{
version
=
"0.3"
}
humantime
=
{
version
=
"2.1.0"
}
log
=
{
version
=
"0.4"
}
once_cell
=
"1"
prometheus
=
{
version
=
"0.13"
}
regex
=
{
version
=
"1"
}
...
...
@@ -55,6 +57,7 @@ tokio = { version = "1", features = ["full"] }
tokio-stream
=
{
version
=
"0.1"
}
tokio-util
=
{
version
=
"0.7"
,
features
=
[
"codec"
,
"net"
]
}
tracing
=
{
version
=
"0.1"
}
tracing-subscriber
=
{
version
=
"0.3"
,
features
=
[
"env-filter"
,
"local-time"
,
"json"
]
}
uuid
=
{
version
=
"1"
,
features
=
[
"v4"
,
"serde"
]
}
validator
=
{
version
=
"0.20"
,
features
=
["derive"]
}
xxhash-rust
=
{
version
=
"0.8"
,
features
=
[
"xxh3"
,
"const_xxh3"
]
}
...
...
@@ -71,4 +74,3 @@ rand = { version = "0.8"}
[dev-dependencies]
assert_matches
=
"1.5.0"
env_logger
=
"0.11"
runtime/rust/python-wheel/Cargo.lock
View file @
2fd6592f
...
...
@@ -26,6 +26,21 @@ dependencies = [
"memchr",
]
[[package]]
name = "android-tzdata"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e999941b234f3131b00bc13c22d06e8c5ff726d1b6318ac7eb276997bbb4fef0"
[[package]]
name = "android_system_properties"
version = "0.1.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "819e7219dbd41043ac279b19830f2efc897156490d7fd6ea916720117ee66311"
dependencies = [
"libc",
]
[[package]]
name = "anstream"
version = "0.6.18"
...
...
@@ -324,6 +339,12 @@ dependencies = [
"generic-array",
]
[[package]]
name = "bumpalo"
version = "3.17.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1628fb46dfa0b37568d12e5edd512553eccf6a22a78e8bde00bb4aed84d5bdbf"
[[package]]
name = "bytemuck"
version = "1.21.0"
...
...
@@ -384,6 +405,20 @@ version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]]
name = "chrono"
version = "0.4.39"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e36cc9d416881d2e24f9a963be5fb1cd90966419ac844274161d10488b3e825"
dependencies = [
"android-tzdata",
"iana-time-zone",
"js-sys",
"num-traits",
"wasm-bindgen",
"windows-targets",
]
[[package]]
name = "clap"
version = "4.5.28"
...
...
@@ -1153,6 +1188,29 @@ dependencies = [
"tracing",
]
[[package]]
name = "iana-time-zone"
version = "0.1.61"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "235e081f3925a06703c2d0117ea8b91f042756fd6e7a6e5d901e8ca1a996b220"
dependencies = [
"android_system_properties",
"core-foundation-sys",
"iana-time-zone-haiku",
"js-sys",
"wasm-bindgen",
"windows-core",
]
[[package]]
name = "iana-time-zone-haiku"
version = "0.1.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f31827a206f56af32e590ba56d5d2d085f558508192593743f16b2306495269f"
dependencies = [
"cc",
]
[[package]]
name = "icu_collections"
version = "1.5.0"
...
...
@@ -1378,6 +1436,16 @@ dependencies = [
"libc",
]
[[package]]
name = "js-sys"
version = "0.3.77"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1cfaf33c695fc6e08064efbc1f72ec937429614f25eef83af942d0e227c3a28f"
dependencies = [
"once_cell",
"wasm-bindgen",
]
[[package]]
name = "jwalk"
version = "0.8.1"
...
...
@@ -1450,6 +1518,15 @@ version = "0.4.25"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "04cbf5b083de1c7e0222a7a51dbfdba1cbe1c6ab0b15e29fff3f6c077fd9cd9f"
[[package]]
name = "matchers"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8263075bb86c5a1b1427b5ae862e8889656f126e9f77c484496e8b47cf5c5558"
dependencies = [
"regex-automata 0.1.10",
]
[[package]]
name = "matchit"
version = "0.7.3"
...
...
@@ -1608,6 +1685,16 @@ dependencies = [
"signatory",
]
[[package]]
name = "nu-ansi-term"
version = "0.46.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77a8165726e8236064dbb45459242600304b42a5ea24ee2948e18e023bf7ba84"
dependencies = [
"overload",
"winapi 0.3.9",
]
[[package]]
name = "nuid"
version = "0.5.0"
...
...
@@ -1623,6 +1710,24 @@ version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "51d515d32fb182ee37cda2ccdcb92950d6a3c2893aa280e540671c2cd0f3b1d9"
[[package]]
name = "num-traits"
version = "0.2.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841"
dependencies = [
"autocfg",
]
[[package]]
name = "num_threads"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5c7398b9c8b70908f6371f47ed36737907c87c52af34c268fed0bf0ceb92ead9"
dependencies = [
"libc",
]
[[package]]
name = "object"
version = "0.36.7"
...
...
@@ -1644,6 +1749,12 @@ version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d05e27ee213611ffe7d6348b942e8f942b37114c00cc03cec254295a4a17852e"
[[package]]
name = "overload"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b15813163c1d831bf4a13c3610c05c0d03b39feb07f7e09fa234dac9b15aaf39"
[[package]]
name = "parking"
version = "2.2.1"
...
...
@@ -2094,8 +2205,17 @@ checksum = "b544ef1b4eac5dc2db33ea63606ae9ffcfac26c1416a2806ae0bf5f56b201191"
dependencies = [
"aho-corasick",
"memchr",
"regex-automata",
"regex-syntax",
"regex-automata 0.4.9",
"regex-syntax 0.8.5",
]
[[package]]
name = "regex-automata"
version = "0.1.10"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6c230d73fb8d8c1b9c0b3135c5142a8acee3a0558fb8db5cf1cb65f8d7862132"
dependencies = [
"regex-syntax 0.6.29",
]
[[package]]
...
...
@@ -2106,9 +2226,15 @@ checksum = "809e8dc61f6de73b46c85f4c96486310fe304c434cfa43669d7b40f711150908"
dependencies = [
"aho-corasick",
"memchr",
"regex-syntax",
"regex-syntax
0.8.5
",
]
[[package]]
name = "regex-syntax"
version = "0.6.29"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f162c6dd7b008981e4d40210aca20b4bd0f9b60ca9271061b07f78537722f2e1"
[[package]]
name = "regex-syntax"
version = "0.8.5"
...
...
@@ -2373,6 +2499,15 @@ dependencies = [
"digest",
]
[[package]]
name = "sharded-slab"
version = "0.1.7"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f40ca3c46823713e0d4209592e8d6e826aa57e928f09752619fc696c499637f6"
dependencies = [
"lazy_static",
]
[[package]]
name = "shlex"
version = "1.3.0"
...
...
@@ -2581,6 +2716,16 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "thread_local"
version = "1.1.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b9ef9bad013ada3808854ceac7b46812a6465ba368859a37e2100283d2d719c"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
]
[[package]]
name = "time"
version = "0.3.37"
...
...
@@ -2589,7 +2734,9 @@ checksum = "35e7868883861bd0e56d9ac6efcaaca0d6d5d82a2a7ec8209ff492c07cf37b21"
dependencies = [
"deranged",
"itoa",
"libc",
"num-conv",
"num_threads",
"powerfmt",
"serde",
"time-core",
...
...
@@ -2859,6 +3006,50 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e672c95779cf947c5311f83787af4fa8fffd12fb27e4993211a84bdfd9610f9c"
dependencies = [
"once_cell",
"valuable",
]
[[package]]
name = "tracing-log"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ee855f1f400bd0e5c02d150ae5de3840039a3f54b025156404e34c23c03f47c3"
dependencies = [
"log",
"once_cell",
"tracing-core",
]
[[package]]
name = "tracing-serde"
version = "0.2.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "704b1aeb7be0d0a84fc9828cae51dab5970fee5088f83d1dd7ee6f6246fc6ff1"
dependencies = [
"serde",
"tracing-core",
]
[[package]]
name = "tracing-subscriber"
version = "0.3.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e8189decb5ac0fa7bc8b96b7cb9b2701d60d48805aca84a238004d665fcc4008"
dependencies = [
"matchers",
"nu-ansi-term",
"once_cell",
"regex",
"serde",
"serde_json",
"sharded-slab",
"smallvec",
"thread_local",
"time",
"tracing",
"tracing-core",
"tracing-log",
"tracing-serde",
]
[[package]]
...
...
@@ -2873,6 +3064,7 @@ dependencies = [
"async_zmq",
"blake3",
"bytes",
"chrono",
"derive-getters",
"derive_builder",
"educe",
...
...
@@ -2882,6 +3074,7 @@ dependencies = [
"futures",
"humantime",
"local-ip-address",
"log",
"nid",
"nix",
"nuid",
...
...
@@ -2897,6 +3090,7 @@ dependencies = [
"tokio-stream",
"tokio-util",
"tracing",
"tracing-subscriber",
"uuid",
"validator",
"xxhash-rust",
...
...
@@ -3039,6 +3233,12 @@ dependencies = [
"syn 2.0.98",
]
[[package]]
name = "valuable"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ba73ea9cf16a25df0c8caa16c51acb937d5712a8429db78a3ee29d5dcacd3a65"
[[package]]
name = "version-compare"
version = "0.2.0"
...
...
@@ -3085,6 +3285,64 @@ dependencies = [
"wit-bindgen-rt",
]
[[package]]
name = "wasm-bindgen"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1edc8929d7499fc4e8f0be2262a241556cfc54a0bea223790e71446f2aab1ef5"
dependencies = [
"cfg-if 1.0.0",
"once_cell",
"rustversion",
"wasm-bindgen-macro",
]
[[package]]
name = "wasm-bindgen-backend"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2f0a0651a5c2bc21487bde11ee802ccaf4c51935d0d3d42a6101f98161700bc6"
dependencies = [
"bumpalo",
"log",
"proc-macro2",
"quote",
"syn 2.0.98",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-macro"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7fe63fc6d09ed3792bd0897b314f53de8e16568c2b3f7982f468c0bf9bd0b407"
dependencies = [
"quote",
"wasm-bindgen-macro-support",
]
[[package]]
name = "wasm-bindgen-macro-support"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8ae87ea40c9f689fc23f209965b6fb8a99ad69aeeb0231408be24920604395de"
dependencies = [
"proc-macro2",
"quote",
"syn 2.0.98",
"wasm-bindgen-backend",
"wasm-bindgen-shared",
]
[[package]]
name = "wasm-bindgen-shared"
version = "0.2.100"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1a05d73b933a847d6cccdda8f838a22ff101ad9bf93e33684f39c1f5f0eece3d"
dependencies = [
"unicode-ident",
]
[[package]]
name = "winapi"
version = "0.2.8"
...
...
@@ -3128,6 +3386,15 @@ version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "windows-core"
version = "0.52.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "33ab640c8d7e35bf8ba19b884ba838ceb4fba93a4e8c65a9059d08afcfc683d9"
dependencies = [
"windows-targets",
]
[[package]]
name = "windows-sys"
version = "0.52.0"
...
...
runtime/rust/src/config.rs
View file @
2fd6592f
...
...
@@ -133,3 +133,31 @@ impl RuntimeConfigBuilder {
Ok
(
config
)
}
}
/// Check if an environment variable is truthy
pub
fn
env_is_truthy
(
env
:
&
str
)
->
bool
{
match
std
::
env
::
var
(
env
)
{
Ok
(
val
)
=>
is_truthy
(
val
.as_str
()),
Err
(
_
)
=>
false
,
}
}
/// Check if a string is truthy
/// This will be used to evaluate environment variables or any other subjective
/// configuration parameters that can be set by the user that should be evaluated
/// as a boolean value.
pub
fn
is_truthy
(
val
:
&
str
)
->
bool
{
matches!
(
val
.to_lowercase
()
.as_str
(),
"1"
|
"true"
|
"on"
|
"yes"
)
}
/// Check whether JSONL logging enabled
/// Set the `TRD_LOGGING_JSONL` environment variable a [`is_truthy`] value
pub
fn
jsonl_logging_enabled
()
->
bool
{
env_is_truthy
(
"TRD_LOGGING_JSONL"
)
}
/// Check whether logging with ANSI terminal escape codes and colors is disabled.
/// Set the `TRD_SDK_DISABLE_ANSI_LOGGING` environment variable a [`is_truthy`] value
pub
fn
disable_ansi_logging
()
->
bool
{
env_is_truthy
(
"TRD_SDK_DISABLE_ANSI_LOGGING"
)
}
runtime/rust/src/lib.rs
View file @
2fd6592f
...
...
@@ -30,6 +30,7 @@ pub use config::RuntimeConfig;
pub
mod
component
;
pub
mod
discovery
;
pub
mod
engine
;
pub
mod
logging
;
pub
mod
pipeline
;
pub
mod
protocols
;
pub
mod
runtime
;
...
...
runtime/rust/src/logging.rs
0 → 100644
View file @
2fd6592f
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! Triton Distributed Logging Module.
//!
//! - Configuration loaded from:
//! 1. Environment variables (highest priority).
//! 2. Optional TOML file pointed to by the `TRD_LOGGING_CONFIG_PATH` environment variable.
//! 3. `/opt/triton/etc/logging.toml`.
//!
//! Logging can take two forms: `READABLE` or `JSONL`. The default is `READABLE`. `JSONL`
//! can be enabled by setting the `TRD_LOGGING_JSONL` environment variable to `1`.
//!
//! Filters can be configured using the `TRD_LOG` environment variable or by setting the `filters`
//! key in the TOML configuration file. Filters are comma-separated key-value pairs where the key
//! is the crate or module name and the value is the log level. The default log level is `error`.
//!
//! Example:
//! ```toml
//! log_level = "error"
//!
//! [log_filters]
//! "test_logging" = "info"
//! "test_logging::api" = "trace"
//! ```
use
std
::
collections
::{
BTreeMap
,
HashMap
};
use
std
::
sync
::
Once
;
use
figment
::{
providers
::{
Format
,
Serialized
,
Toml
},
Figment
,
};
use
serde
::{
Deserialize
,
Serialize
};
use
tracing
::{
Event
,
Subscriber
};
use
tracing_subscriber
::
fmt
::{
format
::
Writer
,
FormattedFields
};
use
tracing_subscriber
::
fmt
::{
FmtContext
,
FormatFields
};
use
tracing_subscriber
::
prelude
::
*
;
use
tracing_subscriber
::
registry
::
LookupSpan
;
use
tracing_subscriber
::
EnvFilter
;
use
tracing_subscriber
::{
filter
::
Directive
,
fmt
};
/// ENV used to set the log level
const
FILTER_ENV
:
&
str
=
"TRD_LOG"
;
/// Default log level
const
DEFAULT_FILTER_LEVEL
:
&
str
=
"info"
;
/// ENV used to set the path to the logging configuration file
const
CONFIG_PATH_ENV
:
&
str
=
"TRD_LOGGING_CONFIG_PATH"
;
/// Once instance to ensure the logger is only initialized once
static
INIT
:
Once
=
Once
::
new
();
#[derive(Serialize,
Deserialize,
Debug)]
struct
LoggingConfig
{
log_level
:
String
,
log_filters
:
HashMap
<
String
,
String
>
,
}
impl
Default
for
LoggingConfig
{
fn
default
()
->
Self
{
LoggingConfig
{
log_level
:
DEFAULT_FILTER_LEVEL
.to_string
(),
log_filters
:
HashMap
::
new
(),
}
}
}
/// Initialize the logger
pub
fn
init
()
{
INIT
.call_once
(||
{
let
config
=
load_config
();
// Examples to remove noise
// .add_directive("rustls=warn".parse()?)
// .add_directive("tokio_util::codec=warn".parse()?)
let
mut
filter_layer
=
EnvFilter
::
builder
()
.with_default_directive
(
config
.log_level
.parse
()
.unwrap
())
.with_env_var
(
FILTER_ENV
)
.from_env_lossy
();
// apply the log_filters from the config files
for
(
module
,
level
)
in
config
.log_filters
{
match
format!
(
"{module}={level}"
)
.parse
::
<
Directive
>
()
{
Ok
(
d
)
=>
{
filter_layer
=
filter_layer
.add_directive
(
d
);
}
Err
(
e
)
=>
{
eprintln!
(
"Failed parsing filter '{level}' for module '{module}': {e}"
);
}
}
}
if
crate
::
config
::
jsonl_logging_enabled
()
{
let
l
=
fmt
::
layer
()
.with_ansi
(
false
)
// ansi terminal escapes and colors always disabled
.event_format
(
CustomJsonFormatter
)
.with_writer
(
std
::
io
::
stderr
)
.with_filter
(
filter_layer
);
//let l = fmt::layer().json().with_filter(filter_layer);
tracing_subscriber
::
registry
()
.with
(
l
)
.init
();
}
else
{
let
l
=
fmt
::
layer
()
.with_ansi
(
!
crate
::
config
::
disable_ansi_logging
())
.event_format
(
fmt
::
format
()
.compact
())
.with_writer
(
std
::
io
::
stderr
)
.with_filter
(
filter_layer
);
tracing_subscriber
::
registry
()
.with
(
l
)
.init
();
};
});
}
/// Log a message with file and line info
/// Used by Python wrapper
pub
fn
log_message
(
level
:
&
str
,
message
:
&
str
,
module
:
&
str
,
file
:
&
str
,
line
:
u32
)
{
let
level
=
match
level
{
"debug"
=>
log
::
Level
::
Debug
,
"info"
=>
log
::
Level
::
Info
,
"warn"
=>
log
::
Level
::
Warn
,
"error"
=>
log
::
Level
::
Error
,
"warning"
=>
log
::
Level
::
Warn
,
_
=>
log
::
Level
::
Info
,
};
log
::
logger
()
.log
(
&
log
::
Record
::
builder
()
.args
(
format_args!
(
"{}"
,
message
))
.level
(
level
)
.target
(
module
)
.file
(
Some
(
file
))
.line
(
Some
(
line
))
.build
(),
);
}
// TODO: This should be merged into the global config (rust/common/src/config.rs) once we have it
fn
load_config
()
->
LoggingConfig
{
let
config_path
=
std
::
env
::
var
(
CONFIG_PATH_ENV
)
.unwrap_or_else
(|
_
|
""
.to_string
());
let
figment
=
Figment
::
new
()
.merge
(
Serialized
::
defaults
(
LoggingConfig
::
default
()))
.merge
(
Toml
::
file
(
"/opt/triton/etc/logging.toml"
))
.merge
(
Toml
::
file
(
config_path
));
figment
.extract
()
.unwrap
()
}
#[derive(Serialize)]
struct
JsonLog
<
'a
>
{
time
:
String
,
level
:
String
,
#[serde(skip_serializing_if
=
"Option::is_none"
)]
file_path
:
Option
<&
'a
str
>
,
#[serde(skip_serializing_if
=
"Option::is_none"
)]
line_number
:
Option
<
u32
>
,
message
:
serde_json
::
Value
,
#[serde(flatten)]
fields
:
BTreeMap
<
String
,
serde_json
::
Value
>
,
}
/// Some teams (NVCF) require specific JSON style
struct
CustomJsonFormatter
;
impl
<
S
,
N
>
tracing_subscriber
::
fmt
::
FormatEvent
<
S
,
N
>
for
CustomJsonFormatter
where
S
:
Subscriber
+
for
<
'a
>
LookupSpan
<
'a
>
,
N
:
for
<
'a
>
FormatFields
<
'a
>
+
'static
,
{
fn
format_event
(
&
self
,
ctx
:
&
FmtContext
<
'_
,
S
,
N
>
,
mut
writer
:
Writer
<
'_
>
,
event
:
&
Event
<
'_
>
,
)
->
std
::
fmt
::
Result
{
let
mut
visitor
=
JsonVisitor
::
default
();
event
.record
(
&
mut
visitor
);
let
message
=
visitor
.fields
.remove
(
"message"
)
.unwrap_or
(
serde_json
::
Value
::
String
(
""
.to_string
()));
let
current_span
=
event
.parent
()
.and_then
(|
id
|
ctx
.span
(
id
))
.or_else
(||
ctx
.lookup_current
());
if
let
Some
(
span
)
=
current_span
{
let
ext
=
span
.extensions
();
// This won't work is there's a space in the string, and loses the types making every
// span attribute a string.
// I think the correct way is to make a Layer.
// tracing_subscriber makes everything far more complicated than necessary.
let
data
=
ext
.get
::
<
FormattedFields
<
N
>>
()
.unwrap
();
let
span_fields
:
Vec
<
(
&
str
,
&
str
)
>
=
data
.fields
.split
(
' '
)
.filter_map
(|
entry
|
entry
.split_once
(
'='
))
.collect
();
for
(
name
,
value
)
in
span_fields
{
visitor
.fields
.insert
(
name
.to_string
(),
serde_json
::
Value
::
String
(
value
.trim_matches
(
'"'
)
.to_string
()),
);
}
visitor
.fields
.insert
(
"span_name"
.to_string
(),
serde_json
::
Value
::
String
(
span
.name
()
.to_string
()),
);
}
let
metadata
=
event
.metadata
();
let
log
=
JsonLog
{
level
:
metadata
.level
()
.to_string
(),
time
:
format!
(
"{}"
,
chrono
::
Local
::
now
()
.format
(
"%m-%d %H:%M:%S.%3f"
)),
file_path
:
if
cfg!
(
debug_assertions
)
{
metadata
.file
()
}
else
{
None
},
line_number
:
if
cfg!
(
debug_assertions
)
{
metadata
.line
()
}
else
{
None
},
message
,
fields
:
visitor
.fields
,
};
let
json
=
serde_json
::
to_string
(
&
log
)
.unwrap
();
writeln!
(
writer
,
"{json}"
)
}
}
// Visitor to collect fields
#[derive(Default)]
struct
JsonVisitor
{
// BTreeMap so that it's sorted, and always prints in the same order
fields
:
BTreeMap
<
String
,
serde_json
::
Value
>
,
}
impl
tracing
::
field
::
Visit
for
JsonVisitor
{
fn
record_debug
(
&
mut
self
,
field
:
&
tracing
::
field
::
Field
,
value
:
&
dyn
std
::
fmt
::
Debug
)
{
self
.fields
.insert
(
field
.name
()
.to_string
(),
serde_json
::
Value
::
String
(
format!
(
"{value:?}"
)),
);
}
fn
record_str
(
&
mut
self
,
field
:
&
tracing
::
field
::
Field
,
value
:
&
str
)
{
self
.fields
.insert
(
field
.name
()
.to_string
(),
serde_json
::
Value
::
String
(
value
.to_string
()),
);
}
fn
record_bool
(
&
mut
self
,
field
:
&
tracing
::
field
::
Field
,
value
:
bool
)
{
self
.fields
.insert
(
field
.name
()
.to_string
(),
serde_json
::
Value
::
Bool
(
value
));
}
fn
record_i64
(
&
mut
self
,
field
:
&
tracing
::
field
::
Field
,
value
:
i64
)
{
self
.fields
.insert
(
field
.name
()
.to_string
(),
serde_json
::
Value
::
Number
(
value
.into
()),
);
}
fn
record_u64
(
&
mut
self
,
field
:
&
tracing
::
field
::
Field
,
value
:
u64
)
{
self
.fields
.insert
(
field
.name
()
.to_string
(),
serde_json
::
Value
::
Number
(
value
.into
()),
);
}
fn
record_f64
(
&
mut
self
,
field
:
&
tracing
::
field
::
Field
,
value
:
f64
)
{
use
serde_json
::
value
::
Number
;
self
.fields
.insert
(
field
.name
()
.to_string
(),
// Infinite or NaN values are not JSON numbers, replace them with 0.
// It's unlikely that we would log an inf or nan value.
serde_json
::
Value
::
Number
(
Number
::
from_f64
(
value
)
.unwrap_or
(
0
.into
())),
);
}
}
runtime/rust/src/pipeline/network.rs
View file @
2fd6592f
...
...
@@ -53,6 +53,14 @@ pub enum StreamType {
Response
,
}
#[derive(Debug,
Clone,
Serialize,
Deserialize,
PartialEq,
Eq)]
#[serde(rename_all
=
"snake_case"
)]
pub
enum
ControlMessage
{
Stop
,
Kill
,
Sentinel
,
}
/// This is the first message in a `ResponseStream`. This is not a message that gets process
/// by the general pipeline, but is a control message that is awaited before the
/// [`AsyncEngine::generate`] method is allowed to return.
...
...
@@ -141,11 +149,16 @@ pub struct StreamSender {
}
impl
StreamSender
{
pub
async
fn
send
(
&
self
,
data
:
Bytes
)
->
Result
<
(),
String
>
{
self
.tx
.send
(
TwoPartMessage
::
from_data
(
data
))
.await
.map_err
(|
e
|
e
.to_string
())
pub
async
fn
send
(
&
self
,
data
:
Bytes
)
->
Result
<
()
>
{
Ok
(
self
.tx
.send
(
TwoPartMessage
::
from_data
(
data
))
.await
?
)
}
pub
async
fn
send_control
(
&
self
,
control
:
ControlMessage
)
->
Result
<
()
>
{
let
bytes
=
serde_json
::
to_vec
(
&
control
)
?
;
Ok
(
self
.tx
.send
(
TwoPartMessage
::
from_header
(
bytes
.into
()))
.await
?
)
}
#[allow(clippy::needless_update)]
...
...
runtime/rust/src/pipeline/network/tcp.rs
View file @
2fd6592f
...
...
@@ -32,6 +32,7 @@
pub
mod
client
;
pub
mod
server
;
use
super
::
ControlMessage
;
use
serde
::{
Deserialize
,
Serialize
};
#[allow(unused_imports)]
...
...
@@ -91,13 +92,6 @@ struct CallHomeHandshake {
stream_type
:
StreamType
,
}
#[derive(Debug,
Clone,
Serialize,
Deserialize)]
#[serde(rename_all
=
"snake_case"
)]
enum
ControlMessage
{
Stop
,
Kill
,
}
#[cfg(test)]
mod
tests
{
use
crate
::
engine
::
AsyncEngineContextProvider
;
...
...
runtime/rust/src/pipeline/network/tcp/client.rs
View file @
2fd6592f
...
...
@@ -16,8 +16,12 @@
use
std
::
sync
::
Arc
;
use
futures
::{
SinkExt
,
StreamExt
};
use
tokio
::
io
::{
ReadHalf
,
WriteHalf
};
use
tokio
::{
io
::
AsyncWriteExt
,
net
::
TcpStream
};
use
tokio
::
io
::{
AsyncReadExt
,
ReadHalf
,
WriteHalf
};
use
tokio
::{
io
::
AsyncWriteExt
,
net
::
TcpStream
,
time
::{
self
,
Duration
,
Instant
},
};
use
tokio_util
::
codec
::{
FramedRead
,
FramedWrite
};
use
super
::{
CallHomeHandshake
,
ControlMessage
,
TcpStreamConnectionInfo
};
...
...
@@ -48,7 +52,7 @@ impl TcpClient {
}
async
fn
connect
(
address
:
&
str
)
->
std
::
io
::
Result
<
TcpStream
>
{
// try to connect to the address; retry with
exponential
backoff if AddrNotAvailable
// try to connect to the address; retry with
linear
backoff if AddrNotAvailable
let
backoff
=
std
::
time
::
Duration
::
from_millis
(
200
);
loop
{
match
TcpStream
::
connect
(
address
)
.await
{
...
...
@@ -59,11 +63,6 @@ impl TcpClient {
Err
(
e
)
=>
{
if
e
.kind
()
==
std
::
io
::
ErrorKind
::
AddrNotAvailable
{
tracing
::
warn!
(
"retry warning: failed to connect: {:?}"
,
e
);
// TODO(#173) - remove with resolution of issue
#[cfg(debug_assertions)]
eprintln!
(
"retry warning: failed to connect: {:?}"
,
e
);
tokio
::
time
::
sleep
(
backoff
)
.await
;
}
else
{
return
Err
(
e
);
...
...
@@ -109,7 +108,7 @@ impl TcpClient {
// captured by the monitor task
let
(
alive_tx
,
alive_rx
)
=
tokio
::
sync
::
oneshot
::
channel
::
<
()
>
();
let
reader_task
=
tokio
::
spawn
(
handle_reader
(
framed_reader
,
context
,
alive_tx
));
let
reader_task
=
tokio
::
spawn
(
handle_reader
(
framed_reader
,
context
.clone
()
,
alive_tx
));
// transport specific handshake message
let
handshake
=
CallHomeHandshake
{
...
...
@@ -134,11 +133,11 @@ impl TcpClient {
.map_err
(|
e
|
error!
(
"failed to send handshake: {:?}"
,
e
))
?
;
// set up the channel to send bytes to the transport layer
let
(
bytes_tx
,
bytes_rx
)
=
tokio
::
sync
::
mpsc
::
channel
(
1
6
);
let
(
bytes_tx
,
bytes_rx
)
=
tokio
::
sync
::
mpsc
::
channel
(
6
4
);
// forwards the bytes send from this stream to the transport layer; hold the alive_rx half of the oneshot channel
let
writer_task
=
tokio
::
spawn
(
handle_writer
(
framed_writer
,
bytes_rx
,
alive_rx
));
let
writer_task
=
tokio
::
spawn
(
handle_writer
(
framed_writer
,
bytes_rx
,
alive_rx
,
context
));
tokio
::
spawn
(
async
move
{
// await both tasks
...
...
@@ -147,12 +146,40 @@ impl TcpClient {
match
(
reader
,
writer
)
{
(
Ok
(
reader
),
Ok
(
writer
))
=>
{
let
reader
=
reader
.into_inner
();
let
writer
=
writer
.into_inner
();
let
writer
=
match
writer
{
Ok
(
writer
)
=>
writer
.into_inner
(),
Err
(
e
)
=>
{
tracing
::
error!
(
"failed to join writer task: {:?}"
,
e
);
return
Err
(
e
);
}
};
tracing
::
debug!
(
"joining reader and writer"
);
let
mut
stream
=
reader
.unsplit
(
writer
);
// close the stream
Ok
(
stream
.shutdown
()
.await
?
)
// await the tcp server to shutdown the socket connection
// set a timeout for the server shutdown
tracing
::
debug!
(
"awaiting server shutdown"
);
let
mut
buf
=
vec!
[
0u8
;
1024
];
let
deadline
=
Instant
::
now
()
+
Duration
::
from_secs
(
10
);
loop
{
let
n
=
time
::
timeout_at
(
deadline
,
stream
.read
(
&
mut
buf
))
.await
.inspect_err
(|
_
|
{
tracing
::
debug!
(
"server did not close socket within the deadline"
);
})
?
.inspect_err
(|
e
|
{
tracing
::
debug!
(
"failed to read from stream: {:?}"
,
e
);
})
?
;
if
n
==
0
{
// Server has closed (FIN)
log
::
debug!
(
"server closed the connection"
);
break
;
}
}
Ok
(())
}
_
=>
{
tracing
::
error!
(
"failed to join reader and writer tasks"
);
...
...
@@ -193,46 +220,41 @@ async fn handle_reader(
Ok
(
msg
)
=>
msg
,
Err
(
_
)
=>
{
// TODO(#171) - address fatal errors
tracing
::
error!
(
"fatal error - invalid control message detected"
);
break
;
panic!
(
"fatal error - invalid control message detected"
);
}
};
match
msg
{
ControlMessage
::
Stop
=>
{
context
.stop
();
break
;
}
ControlMessage
::
Kill
=>
{
context
.kill
();
break
;
}
ControlMessage
::
Sentinel
=>
{
// TODO(#171) - address fatal errors
panic!
(
"received a sentinel message; this should never happen"
);
}
}
}
_
=>
{
// not a control message, so we just continue
continue
;
panic!
(
"received a non-control message; this should never happen"
);
}
}
}
Some
(
Err
(
_
))
=>
{
// TODO(#171) - address fatal errors
// in this case the binary representation of the message is invalid
tracing
::
error!
(
"fatal error - failed to decode message from stream"
);
break
;
panic!
(
"fatal error - failed to decode message from stream; invalid line protocol"
);
}
None
=>
{
// let mut writer = framed_reader.into_inner();
// if let Err(e) = writer.shutdown().await {
// tracing::trace!("failed to shutdown reader: {:?}", e);
// }
tracing
::
debug!
(
"tcp stream closed by server"
);
break
;
}
}
}
_
=
alive_tx
.closed
()
=>
{
// the channel was closed, we should stop the stream
tracing
::
debug!
(
"writer stream closed; shutting down"
);
break
;
}
}
...
...
@@ -244,19 +266,42 @@ async fn handle_writer(
mut
framed_writer
:
FramedWrite
<
tokio
::
io
::
WriteHalf
<
tokio
::
net
::
TcpStream
>
,
TwoPartCodec
>
,
mut
bytes_rx
:
tokio
::
sync
::
mpsc
::
Receiver
<
TwoPartMessage
>
,
alive_rx
:
tokio
::
sync
::
oneshot
::
Receiver
<
()
>
,
)
->
FramedWrite
<
tokio
::
io
::
WriteHalf
<
tokio
::
net
::
TcpStream
>
,
TwoPartCodec
>
{
while
let
Some
(
msg
)
=
bytes_rx
.recv
()
.await
{
context
:
Arc
<
dyn
AsyncEngineContext
>
,
)
->
Result
<
FramedWrite
<
tokio
::
io
::
WriteHalf
<
tokio
::
net
::
TcpStream
>
,
TwoPartCodec
>>
{
loop
{
let
msg
=
tokio
::
select!
{
biased
;
_
=
context
.killed
()
=>
{
tracing
::
trace!
(
"context kill signal received; shutting down"
);
break
;
}
msg
=
bytes_rx
.recv
()
=>
{
match
msg
{
Some
(
msg
)
=>
msg
,
None
=>
{
tracing
::
trace!
(
"response channel closed; shutting down"
);
break
;
}
}
}
};
if
let
Err
(
e
)
=
framed_writer
.send
(
msg
)
.await
{
tracing
::
trace!
(
"failed to send message to
stream
; possible disconnect: {:?}"
,
"failed to send message to
network
; possible disconnect: {:?}"
,
e
);
// TODO - possibly propagate the error upstream
break
;
}
}
drop
(
alive_rx
);
framed_writer
// send sentinel message
let
message
=
serde_json
::
to_vec
(
&
ControlMessage
::
Sentinel
)
?
;
let
msg
=
TwoPartMessage
::
from_header
(
message
.into
());
framed_writer
.send
(
msg
)
.await
?
;
drop
(
alive_rx
);
Ok
(
framed_writer
)
}
runtime/rust/src/pipeline/network/tcp/server.rs
View file @
2fd6592f
...
...
@@ -25,7 +25,7 @@ use tokio::sync::Mutex;
use
bytes
::
Bytes
;
use
derive_builder
::
Builder
;
use
futures
::
StreamExt
;
use
futures
::
{
SinkExt
,
StreamExt
}
;
use
local_ip_address
::{
list_afinet_netifas
,
local_ip
};
use
serde
::{
Deserialize
,
Serialize
};
use
tokio
::{
...
...
@@ -36,8 +36,8 @@ use tokio::{
use
tokio_util
::
codec
::{
FramedRead
,
FramedWrite
};
use
super
::{
CallHomeHandshake
,
PendingConnections
,
RegisteredStream
,
StreamOptions
,
StreamReceiver
,
StreamSender
,
TcpStreamConnectionInfo
,
TwoPartCodec
,
CallHomeHandshake
,
ControlMessage
,
PendingConnections
,
RegisteredStream
,
StreamOptions
,
StreamReceiver
,
StreamSender
,
TcpStreamConnectionInfo
,
TwoPartCodec
,
};
use
crate
::
engine
::
AsyncEngineContext
;
use
crate
::
pipeline
::{
...
...
@@ -304,30 +304,6 @@ async fn tcp_listener(
}
};
// TODO(#173) - alternative / not fully functional exploration for #173; removed when resolved.
// let socket = Socket::new(Domain::IPV4, Type::STREAM, None)?;
// // Set the socket options
// socket.set_reuse_address(true)?;
// socket.set_nodelay(true)?;
// let addr: SocketAddr = addr.parse()?;
// //let addr: SocketAddr = "[::1]:0".parse()?;
// socket.bind(&addr.into())?;
// socket.listen(128)?;
// let listener: TcpListener = socket.into();
// let listener = tokio::net::TcpListener::from_std(listener)?;
// let addr = listener
// .local_addr()
// .map_err(|e| anyhow::anyhow!("Failed get SocketAddr: {:?}", e))?;
// read_tx
// .send(Ok(addr.port()))
// .expect("Failed to send ready signal");
loop
{
// todo - add instrumentation
// todo - add counter for all accepted connections
...
...
@@ -351,6 +327,13 @@ async fn tcp_listener(
}
}
match
stream
.set_linger
(
Some
(
std
::
time
::
Duration
::
from_secs
(
0
)))
{
Ok
(
_
)
=>
(),
Err
(
e
)
=>
{
tracing
::
warn!
(
"failed to set tcp stream to linger: {}"
,
e
);
}
}
tokio
::
spawn
(
handle_connection
(
stream
,
state
.clone
()));
}
...
...
@@ -461,133 +444,171 @@ async fn tcp_listener(
}
// we need to know the buffer size from the registration options; add this to the RequestRecvConnection object
let
(
tx
,
rx
)
=
mpsc
::
channel
(
1
6
);
let
(
response_tx
,
response_
rx
)
=
mpsc
::
channel
(
6
4
);
if
connection
.send
(
Ok
(
crate
::
pipeline
::
network
::
StreamReceiver
{
rx
}))
.send
(
Ok
(
crate
::
pipeline
::
network
::
StreamReceiver
{
rx
:
response_rx
,
}))
.is_err
()
{
return
Err
(
error!
(
"The requester of the stream has been dropped before the connection was established"
));
}
let
(
alive_tx
,
alive_rx
)
=
mpsc
::
channel
::
<
()
>
(
1
);
let
(
control_tx
,
_
control_rx
)
=
mpsc
::
channel
::
<
Bytes
>
(
8
);
let
(
control_tx
,
control_rx
)
=
mpsc
::
channel
::
<
ControlMessage
>
(
1
);
// monitor task
// if the context is cancelled, we need to forward the message across the transport layer
// we only determine the forwarding task on a kill signal, on a stop signal, we issue the stop signal, then await for the producer
// to naturally close the stream
let
monitor_task
=
tokio
::
spawn
(
monitor
(
writer
,
context
.clone
(),
alive_tx
));
// sender task
// issues control messages to the sender and when finished shuts down the socket
// this should be the last task to finish and must
let
send_task
=
tokio
::
spawn
(
network_send_handler
(
writer
,
control_rx
));
// forward task
let
forward
_task
=
tokio
::
spawn
(
handle_response_stream
(
let
recv
_task
=
tokio
::
spawn
(
network_receive_handler
(
reader
,
tx
,
response_
tx
,
control_tx
,
context
.clone
(),
alive_rx
,
));
// check the results of each of the tasks
let
(
monitor_result
,
forward_result
)
=
tokio
::
join!
(
monitor_task
,
forward
_task
);
let
(
monitor_result
,
forward_result
)
=
tokio
::
join!
(
send_task
,
recv
_task
);
monitor_result
?
?
;
forward_result
?
?
;
monitor_result
?
;
forward_result
?
;
Ok
(())
}
async
fn
handle_response_stream
(
async
fn
network_receive_handler
(
mut
framed_reader
:
FramedRead
<
tokio
::
io
::
ReadHalf
<
tokio
::
net
::
TcpStream
>
,
TwoPartCodec
>
,
response_tx
:
mpsc
::
Sender
<
Bytes
>
,
control_tx
:
mpsc
::
Sender
<
Bytes
>
,
control_tx
:
mpsc
::
Sender
<
ControlMessage
>
,
context
:
Arc
<
dyn
AsyncEngineContext
>
,
alive_rx
:
mpsc
::
Receiver
<
()
>
,
)
->
Result
<
()
>
{
)
{
// loop over reading the tcp stream and checking if the writer is closed
let
mut
can_stop
=
true
;
loop
{
tokio
::
select!
{
biased
;
_
=
response_tx
.closed
()
=>
{
tracing
::
trace!
(
"response channel closed before the client finished writing data"
);
control_tx
.send
(
ControlMessage
::
Kill
)
.await
.expect
(
"the control channel should not be closed"
);
break
;
}
_
=
context
.killed
()
=>
{
tracing
::
trace!
(
"context kill signal received; shutting down"
);
control_tx
.send
(
ControlMessage
::
Kill
)
.await
.expect
(
"the control channel should not be closed"
);
break
;
}
_
=
context
.stopped
(),
if
can_stop
=>
{
can_stop
=
false
;
control_tx
.send
(
ControlMessage
::
Stop
)
.await
.expect
(
"the control channel should not be closed"
);
}
msg
=
framed_reader
.next
()
=>
{
match
msg
{
Some
(
Ok
(
msg
))
=>
{
let
(
header
,
data
)
=
msg
.into_parts
();
if
!
header
.is_empty
()
&&
(
control_tx
.send
(
header
)
.await
)
.is_err
()
{
tracing
::
trace!
(
"Control channel closed"
)
// received a control message
if
!
header
.is_empty
()
{
match
process_control_message
(
header
)
{
Ok
(
ControlAction
::
Continue
)
=>
{}
Ok
(
ControlAction
::
Shutdown
)
=>
{
assert!
(
data
.is_empty
(),
"received sentinel message with data; this should never happen"
);
tracing
::
trace!
(
"received sentinel message; shutting down"
);
break
;
}
Err
(
e
)
=>
{
// TODO(#171) - address fatal errors
panic!
(
"{:?}"
,
e
);
}
}
}
if
!
data
.is_empty
()
{
if
let
Err
(
err
)
=
response_tx
.send
(
data
)
.await
{
return
Err
(
error!
(
"handle_response_stream: Failed sending to response_tx: {err}"
));
tracing
::
debug!
(
"forwarding body/data message to response channel failed: {}"
,
err
);
control_tx
.send
(
ControlMessage
::
Kill
)
.await
.expect
(
"the control channel should not be closed"
);
break
;
};
}
}
Some
(
Err
(
e
))
=>
{
return
Err
(
error!
(
"Failed to read TwoPartCodec message from TcpStream: {}"
,
e
));
Some
(
Err
(
_
))
=>
{
// TODO(#171) - address fatal errors
panic!
(
"invalid message issued over socket; this should never happen"
);
}
None
=>
{
tracing
::
trace!
(
"TcpStream closed naturally"
);
// this is allowed but we try to avoid it
// the logic is that the client will tell us when its is done and the server
// will close the connection naturally when the sentinel message is received
// the client closing early represents a transport error outside the control of the
// transport library
tracing
::
trace!
(
"tcp stream was closed by client"
);
break
;
}
}
}
_
=
response_tx
.closed
()
=>
{
break
;
}
_
=
context
.killed
()
=>
{
break
;
}
}
}
drop
(
alive_rx
);
Ok
(())
}
#[allow(dead_code)]
async
fn
handle_control_message
(
mut
control_rx
:
mpsc
::
Receiver
<
Bytes
>
,
context
:
Arc
<
dyn
AsyncEngineContext
>
,
alive_tx
:
mpsc
::
Sender
<
()
>
,
)
->
Result
<
(),
String
>
{
loop
{
tokio
::
select!
{
msg
=
control_rx
.recv
()
=>
{
match
msg
{
Some
(
_
msg
)
=>
{
// handle control message
}
None
=>
{
tracing
::
trace!
(
"Control channel closed"
);
break
;
async
fn
network_send_handler
(
socket_tx
:
FramedWrite
<
tokio
::
io
::
WriteHalf
<
tokio
::
net
::
TcpStream
>
,
TwoPartCodec
>
,
control_rx
:
mpsc
::
Receiver
<
ControlMessage
>
,
)
{
let
mut
socket_tx
=
socket_tx
;
let
mut
control_rx
=
control_rx
;
while
let
Some
(
control_msg
)
=
control_rx
.recv
()
.await
{
assert_ne!
(
control_msg
,
ControlMessage
::
Sentinel
,
"received sentinel message; this should never happen"
);
let
bytes
=
serde_json
::
to_vec
(
&
control_msg
)
.expect
(
"failed to serialize control message"
);
let
message
=
TwoPartMessage
::
from_header
(
bytes
.into
());
match
socket_tx
.send
(
message
)
.await
{
Ok
(
_
)
=>
tracing
::
debug!
(
"issued control message {control_msg:?} to sender"
),
Err
(
_
)
=>
{
tracing
::
debug!
(
"failed to send control message {control_msg:?} to sender"
)
}
}
}
_
=
context
.killed
()
=>
{
break
;
}
let
mut
inner
=
socket_tx
.into_inner
();
if
let
Err
(
e
)
=
inner
.flush
()
.await
{
tracing
::
debug!
(
"failed to flush socket: {}"
,
e
);
}
if
let
Err
(
e
)
=
inner
.shutdown
()
.await
{
tracing
::
debug!
(
"failed to shutdown socket: {}"
,
e
);
}
drop
(
alive_tx
);
Ok
(())
}
}
async
fn
monitor
(
_
socket_tx
:
FramedWrite
<
tokio
::
io
::
WriteHalf
<
tokio
::
net
::
TcpStream
>
,
TwoPartCodec
>
,
ctx
:
Arc
<
dyn
AsyncEngineContext
>
,
alive_tx
:
mpsc
::
Sender
<
()
>
,
)
->
Result
<
()
>
{
let
alive_tx
=
alive_tx
;
tokio
::
select!
{
_
=
ctx
.stopped
()
=>
{
// send cancellation message
panic!
(
"impl cancellation signal"
);
}
_
=
alive_tx
.closed
()
=>
{
tracing
::
trace!
(
"response stream closed naturally"
)
enum
ControlAction
{
Continue
,
Shutdown
,
}
fn
process_control_message
(
message
:
Bytes
)
->
Result
<
ControlAction
>
{
match
serde_json
::
from_slice
::
<
ControlMessage
>
(
&
message
)
?
{
ControlMessage
::
Sentinel
=>
{
// the client issued a sentinel message
// it has finished writing data and is now awaiting the server to close the connection
tracing
::
trace!
(
"sentinel received; shutting down"
);
Ok
(
ControlAction
::
Shutdown
)
}
ControlMessage
::
Kill
|
ControlMessage
::
Stop
=>
{
// TODO(#171) - address fatal errors
anyhow
::
bail!
(
"fatal error - unexpected control message received - this should never happen"
);
}
let
framed_writer
=
_
socket_tx
;
let
mut
inner
=
framed_writer
.into_inner
();
inner
.flush
()
.await
?
;
inner
.shutdown
()
.await
?
;
Ok
(())
}
}
runtime/rust/src/transports.rs
View file @
2fd6592f
...
...
@@ -21,3 +21,4 @@
pub
mod
etcd
;
pub
mod
nats
;
pub
mod
tcp
;
pub
mod
zmq
;
runtime/rust/src/transports/zmq.rs
0 → 100644
View file @
2fd6592f
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
//! ZMQ Transport
//!
//! This module provides a ZMQ transport for the [crate::DistributedRuntime].
//!
//! Currently, the [Server] consists of a [async_zmq::Router] and the [Client] leverages
//! a [async_zmq::Dealer].
//!
//! The distributed service pattern we will use is based on the Harmony pattern described in
//! [Chapter 8: A Framework for Distributed Computing](https://zguide.zeromq.org/docs/chapter8/#True-Peer-Connectivity-Harmony-Pattern).
//!
//! This is similar to the TCP implementation; however, the TCP implementation used a direct
//! connection between the client and server per stream. The ZMQ transport will enable the
//! equivalent of a connection pool per upstream service at the cost of needing an extra internal
//! routing step per service endpoint.
use
anyhow
::{
anyhow
,
Result
};
use
async_zmq
::{
Context
,
Dealer
,
Router
,
Sink
,
SinkExt
,
StreamExt
};
use
bytes
::
Bytes
;
use
derive_getters
::
Dissolve
;
use
futures
::
TryStreamExt
;
use
serde
::{
Deserialize
,
Serialize
};
use
std
::{
collections
::
HashMap
,
os
::
fd
::
FromRawFd
,
sync
::
Arc
,
time
::
Duration
,
vec
::
IntoIter
};
use
tokio
::{
sync
::{
mpsc
,
Mutex
},
task
::{
JoinError
,
JoinHandle
},
};
use
tokio_util
::
sync
::
CancellationToken
;
use
tracing
as
log
;
// Core message types
#[derive(Debug,
Clone,
Serialize,
Deserialize)]
enum
ControlMessage
{
Cancel
{
request_id
:
String
},
CancelAck
{
request_id
:
String
},
Error
{
request_id
:
String
,
error
:
String
},
Complete
{
request_id
:
String
},
}
#[derive(Debug,
Clone,
Serialize,
Deserialize)]
enum
MessageType
{
Data
(
Vec
<
u8
>
),
Control
(
ControlMessage
),
}
enum
StreamAction
{
SendEager
(
usize
),
SendDelayed
(
usize
),
Close
,
}
// Router state management
struct
RouterState
{
active_streams
:
HashMap
<
String
,
mpsc
::
Sender
<
Bytes
>>
,
control_channels
:
HashMap
<
String
,
mpsc
::
Sender
<
ControlMessage
>>
,
}
impl
RouterState
{
fn
new
()
->
Self
{
Self
{
active_streams
:
HashMap
::
new
(),
control_channels
:
HashMap
::
new
(),
}
}
fn
register_stream
(
&
mut
self
,
request_id
:
String
,
data_tx
:
mpsc
::
Sender
<
Bytes
>
,
control_tx
:
mpsc
::
Sender
<
ControlMessage
>
,
)
{
self
.active_streams
.insert
(
request_id
.clone
(),
data_tx
);
self
.control_channels
.insert
(
request_id
,
control_tx
);
}
fn
remove_stream
(
&
mut
self
,
request_id
:
&
str
)
{
self
.active_streams
.remove
(
request_id
);
self
.control_channels
.remove
(
request_id
);
}
}
// Server implementation
#[derive(Clone,
Dissolve)]
pub
struct
Server
{
state
:
Arc
<
Mutex
<
RouterState
>>
,
cancel_token
:
CancellationToken
,
fd
:
i32
,
}
impl
Server
{
/// Create a new [Server] which is a [async_zmq::Router] with the given [async_zmq::Context] and address to bind
/// the ZMQ [async_zmq::Router] socket.
///
/// If the event loop processing the router fails with an error, the signal is propagated through the [CancellationToken]
/// by issuing a [CancellationToken::cancel].
///
/// The [Server] is how you interact with the running instance.
///
/// The [ServerExecutionHandle] is the handle for background task executing the [Server].
pub
async
fn
new
(
context
:
&
Context
,
address
:
&
str
,
cancel_token
:
CancellationToken
,
)
->
Result
<
(
Self
,
ServerExecutionHandle
)
>
{
let
router
=
async_zmq
::
router
(
address
)
?
.with_context
(
context
)
.bind
()
?
;
let
fd
=
router
.as_raw_socket
()
.get_fd
()
?
;
let
state
=
Arc
::
new
(
Mutex
::
new
(
RouterState
::
new
()));
// can cancel the router's event loop
let
child
=
cancel_token
.child_token
();
let
primary_task
=
tokio
::
spawn
(
Self
::
run
(
router
,
state
.clone
(),
child
.child_token
()));
// this task captures the primary cancellation token, so if an error occurs, we can cancel the router's event loop
// but we also propagate the error to the caller's cancellation token
let
watch_task
=
tokio
::
spawn
(
async
move
{
let
result
=
primary_task
.await
.inspect_err
(|
e
|
{
log
::
error!
(
"zmq server/router task failed: {}"
,
e
);
cancel_token
.cancel
();
})
?
;
result
.inspect_err
(|
e
|
{
log
::
error!
(
"zmq server/router task failed: {}"
,
e
);
cancel_token
.cancel
();
})
});
let
handle
=
ServerExecutionHandle
{
task
:
watch_task
,
cancel_token
:
child
.clone
(),
};
Ok
((
Self
{
state
,
cancel_token
:
child
,
fd
,
},
handle
,
))
}
// pub async fn register_stream(&)
async
fn
run
(
router
:
Router
<
IntoIter
<
Vec
<
u8
>>
,
Vec
<
u8
>>
,
state
:
Arc
<
Mutex
<
RouterState
>>
,
token
:
CancellationToken
,
)
->
Result
<
()
>
{
let
mut
router
=
router
;
// todo - move this into the Server impl to discover the os port being used
// let fd = router.as_raw_socket().get_fd()?;
// let sock = unsafe { socket2::Socket::from_raw_fd(fd) };
// let addr = sock.local_addr()?;
// let port = addr.as_socket().map(|s| s.port());
// if let Some(port) = port {
// log::info!("Server listening on port {}", port);
// }
loop
{
let
frames
=
tokio
::
select!
{
biased
;
frames
=
router
.next
()
=>
{
match
frames
{
Some
(
Ok
(
frames
))
=>
{
frames
},
Some
(
Err
(
e
))
=>
{
log
::
warn!
(
"Error receiving message: {}"
,
e
);
continue
;
}
None
=>
break
,
}
}
_
=
token
.cancelled
()
=>
{
log
::
info!
(
"Server shutting down"
);
break
;
}
};
// we should have at least 3 frames
// 0: identity
// 1: request_id
// 2: message type
// if the contract is broken, we should exit
if
frames
.len
()
!=
3
{
anyhow
::
bail!
(
"Fatal Error -- Broken contract -- Expected 3 frames, got {}"
,
frames
.len
()
);
}
let
request_id
=
String
::
from_utf8_lossy
(
&
frames
[
1
])
.to_string
();
let
message
=
frames
[
2
]
.to_vec
();
let
message_size
=
message
.len
();
if
let
Some
(
tx
)
=
state
.lock
()
.await
.active_streams
.get
(
&
request_id
)
{
// first we try to send the data eagerly without blocking
let
action
=
match
tx
.try_send
(
message
.into
())
{
Ok
(
_
)
=>
{
log
::
trace!
(
request_id
,
"response data sent eagerly to stream: {} bytes"
,
message_size
);
StreamAction
::
SendEager
(
message_size
)
}
Err
(
e
)
=>
match
e
{
mpsc
::
error
::
TrySendError
::
Closed
(
_
)
=>
{
log
::
info!
(
request_id
,
"response stream was closed"
);
StreamAction
::
Close
}
mpsc
::
error
::
TrySendError
::
Full
(
data
)
=>
{
log
::
warn!
(
request_id
,
"response stream is full; backpressue alert"
);
// todo - add timeout - we are blocking all other streams
if
(
tx
.send
(
data
)
.await
)
.is_err
()
{
StreamAction
::
Close
}
else
{
StreamAction
::
SendDelayed
(
message_size
)
}
}
},
};
match
action
{
StreamAction
::
SendEager
(
_
size
)
=>
{
// increment bytes_received
// increment messages_received
// increment eager_messages_received
}
StreamAction
::
SendDelayed
(
_
size
)
=>
{
// increment bytes_received
// increment messages_received
// increment delayed_messages_received
}
StreamAction
::
Close
=>
{
state
.lock
()
.await
.active_streams
.remove
(
&
request_id
);
}
}
}
else
{
// increment bytes_dropped
// increment messages_dropped
log
::
trace!
(
request_id
,
"no active stream for request_id"
);
}
}
Ok
(())
}
}
/// The [ServerExecutionHandle] is the handle for background task executing the [Server].
///
/// You can use this to check if the server is finished or cancelled.
///
/// You can also join on the task to wait for it to finish.
pub
struct
ServerExecutionHandle
{
task
:
JoinHandle
<
Result
<
()
>>
,
cancel_token
:
CancellationToken
,
}
impl
ServerExecutionHandle
{
/// Check if the task awaiting on the [Server]s background event loop has finished.
pub
fn
is_finished
(
&
self
)
->
bool
{
self
.task
.is_finished
()
}
/// Check if the server's event loop has been cancelled.
pub
fn
is_cancelled
(
&
self
)
->
bool
{
self
.cancel_token
.is_cancelled
()
}
/// Cancel the server's event loop.
///
/// This will signal the server to stop processing requests and exit.
///
/// This will not wait for the server to finish, it will exit immediately.
///
/// This will not propagate to the [CancellationToken] used to start the [Server]
/// unless an error happens during the shutdown process.
pub
fn
cancel
(
&
self
)
{
self
.cancel_token
.cancel
();
}
/// Join on the task awaiting on the [Server]s background event loop.
///
/// This will return the result of the [Server]s background event loop.
pub
async
fn
join
(
self
)
->
Result
<
()
>
{
self
.task
.await
?
}
}
// Client implementation
struct
Client
{
dealer
:
Dealer
<
IntoIter
<
Vec
<
u8
>>
,
Vec
<
u8
>>
,
}
impl
Client
{
fn
new
(
context
:
&
Context
,
address
:
&
str
)
->
Result
<
Self
>
{
let
dealer
=
async_zmq
::
dealer
(
address
)
?
.with_context
(
context
)
.connect
()
?
;
Ok
(
Self
{
dealer
})
}
fn
dealer
(
&
mut
self
)
->
&
mut
Dealer
<
IntoIter
<
Vec
<
u8
>>
,
Vec
<
u8
>>
{
&
mut
self
.dealer
}
// async fn send_data(&self, data: Vec<u8>) -> Result<()> {
// let msg_type = MessageType::Data(data);
// let type_bytes = serde_json::to_vec(&msg_type)?;
// self.dealer
// .send_multipart(&[type_bytes, self.request_id.as_bytes().to_vec()])
// .await
// .map_err(|e| anyhow!("Failed to send data: {}", e))
// }
// async fn send_control(&self, msg: ControlMessage) -> Result<()> {
// let msg_type = MessageType::Control(msg);
// let type_bytes = serde_json::to_vec(&msg_type)?;
// self.dealer
// .send_multipart(&[type_bytes])
// .await
// .map_err(|e| anyhow!("Failed to send control message: {}", e))
// }
// async fn receive(&self) -> Result<MessageType> {
// let frames = self
// .dealer
// .recv_multipart()
// .await
// .map_err(|e| anyhow!("Failed to receive message: {}", e))?;
// if frames.is_empty() {
// return Err(anyhow!("Received empty message"));
// }
// serde_json::from_slice(&frames[0])
// .map_err(|e| anyhow!("Failed to deserialize message: {}", e))
}
#[cfg(test)]
mod
tests
{
use
super
::
*
;
use
tokio
::
time
::
timeout
;
#[tokio::test]
async
fn
test_basic_communication
()
->
Result
<
()
>
{
let
context
=
Context
::
new
();
let
address
=
"tcp://127.0.0.1:1337"
;
let
token
=
CancellationToken
::
new
();
// Start server
let
(
server
,
handle
)
=
Server
::
new
(
&
context
,
address
,
token
.clone
())
.await
?
;
let
state
=
server
.state
.clone
();
let
id
=
"test-request"
.to_string
();
let
(
tx
,
mut
rx
)
=
tokio
::
sync
::
mpsc
::
channel
(
512
);
state
.lock
()
.await
.active_streams
.insert
(
id
.clone
(),
tx
);
// Create client
let
mut
client
=
Client
::
new
(
&
context
,
address
)
?
;
client
.dealer
()
.send
(
vec!
[
id
.as_bytes
()
.to_vec
(),
id
.as_bytes
()
.to_vec
()]
.into
())
.await
?
;
let
receive_result
=
rx
.recv
()
.await
;
let
received
=
receive_result
.unwrap
();
// convert to string
let
received_str
=
String
::
from_utf8_lossy
(
&
received
)
.to_string
();
assert_eq!
(
received_str
,
"test-request"
);
client
.dealer
()
.close
()
.await
?
;
handle
.cancel
();
handle
.join
()
.await
?
;
println!
(
"done"
);
Ok
(())
}
// #[tokio::test]
// async fn test_multiple_streams() -> Result<()> {
// // Similar to above but with multiple clients/streams
// Ok(())
// }
// #[tokio::test]
// async fn test_error_handling() -> Result<()> {
// // Test various error conditions
// Ok(())
// }
}
runtime/rust/tests/soak.rs
View file @
2fd6592f
...
...
@@ -22,6 +22,7 @@ mod integration {
use
std
::{
sync
::
Arc
,
time
::
Duration
};
use
tokio
::
time
::
Instant
;
use
triton_distributed
::{
logging
,
pipeline
::{
async_trait
,
network
::
Ingress
,
AsyncEngine
,
AsyncEngineContextProvider
,
Error
,
ManyOut
,
ResponseStream
,
SingleIn
,
...
...
@@ -32,7 +33,7 @@ mod integration {
#[test]
fn
main
()
->
Result
<
()
>
{
env_
logg
er
::
init
();
logg
ing
::
init
();
let
worker
=
Worker
::
from_settings
()
?
;
worker
.execute
(
app
)
}
...
...
@@ -103,8 +104,8 @@ mod integration {
let
run_duration
=
humantime
::
parse_duration
(
&
run_duration
)
.unwrap_or
(
Duration
::
from_secs
(
60
));
let
batch_load
=
std
::
env
::
var
(
"TRD_SOAK_BATCH_LOAD"
)
.unwrap_or
(
"1000"
.to_string
());
let
batch_load
:
usize
=
batch_load
.parse
()
.unwrap_or
(
1000
);
let
batch_load
=
std
::
env
::
var
(
"TRD_SOAK_BATCH_LOAD"
)
.unwrap_or
(
"1000
0
"
.to_string
());
let
batch_load
:
usize
=
batch_load
.parse
()
.unwrap_or
(
1000
0
);
let
client
=
runtime
.namespace
(
DEFAULT_NAMESPACE
)
?
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment