"deploy/git@developer.sourcefind.cn:OpenDAS/dynamo.git" did not exist on "602352ce190bcb02013c62c2337e8b8678015699"
Commit c0e008b4 authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

fix: async access to gil updates; notes on perf (#159)

parent c2a9636c
...@@ -318,18 +318,18 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" ...@@ -318,18 +318,18 @@ checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
[[package]] [[package]]
name = "bytes" name = "bytes"
version = "1.9.0" version = "1.10.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "325918d6fe32f23b19878fe4b34794ae41fc19ddbe53b10571a4874d44ffd39b" checksum = "f61dac84819c6588b558454b194026eb1f09c293b9036ae9b159e74e73ab6cf9"
dependencies = [ dependencies = [
"serde", "serde",
] ]
[[package]] [[package]]
name = "cc" name = "cc"
version = "1.2.11" version = "1.2.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e4730490333d58093109dc02c23174c3f4d490998c3fed3cc8e82d57afedb9cf" checksum = "c7777341816418c02e033934a09f20dc0ccaf65a5201ef8a450ae0105a573fda"
dependencies = [ dependencies = [
"shlex", "shlex",
] ]
...@@ -348,9 +348,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724" ...@@ -348,9 +348,9 @@ checksum = "613afe47fcd5fac7ccf1db93babcb082c5994d996f20b8b159f2ad1658eb5724"
[[package]] [[package]]
name = "clap" name = "clap"
version = "4.5.27" version = "4.5.28"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "769b0145982b4b48713e01ec42d61614425f27b7058bda7180a3a41f30104796" checksum = "3e77c3243bd94243c03672cb5154667347c457ca271254724f9f393aee1c05ff"
dependencies = [ dependencies = [
"clap_builder", "clap_builder",
] ]
...@@ -514,9 +514,9 @@ dependencies = [ ...@@ -514,9 +514,9 @@ dependencies = [
[[package]] [[package]]
name = "data-encoding" name = "data-encoding"
version = "2.7.0" version = "2.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e60eed09d8c01d3cee5b7d30acb059b76614c918fa0f992e0dd6eeb10daad6f" checksum = "575f75dfd25738df5b91b8e43e14d44bda14637a58fae779fd2b064f8bf3e010"
[[package]] [[package]]
name = "der" name = "der"
...@@ -1522,18 +1522,18 @@ dependencies = [ ...@@ -1522,18 +1522,18 @@ dependencies = [
[[package]] [[package]]
name = "pin-project" name = "pin-project"
version = "1.1.8" version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1e2ec53ad785f4d35dac0adea7f7dc6f1bb277ad84a680c7afefeae05d1f5916" checksum = "dfe2e71e1471fe07709406bf725f710b02927c9c54b2b5b2ec0e8087d97c327d"
dependencies = [ dependencies = [
"pin-project-internal", "pin-project-internal",
] ]
[[package]] [[package]]
name = "pin-project-internal" name = "pin-project-internal"
version = "1.1.8" version = "1.1.9"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d56a66c0c55993aa927429d0f8a0abfd74f084e4d9c192cffed01e418d83eefb" checksum = "f6e859e6e5bd50440ab63c47e3ebabc90f26251f7c73c3d3e837b74a1cc3fa67"
dependencies = [ dependencies = [
"proc-macro2", "proc-macro2",
"quote", "quote",
...@@ -2459,9 +2459,9 @@ dependencies = [ ...@@ -2459,9 +2459,9 @@ dependencies = [
[[package]] [[package]]
name = "toml" name = "toml"
version = "0.8.19" version = "0.8.20"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a1ed1f98e3fdc28d6d910e6737ae6ab1a93bf1985935a1193e68f93eeb68d24e" checksum = "cd87a5cdd6ffab733b2f74bc4fd7ee5fff6634124999ac278c35fc78c6120148"
dependencies = [ dependencies = [
"serde", "serde",
"serde_spanned", "serde_spanned",
...@@ -2480,9 +2480,9 @@ dependencies = [ ...@@ -2480,9 +2480,9 @@ dependencies = [
[[package]] [[package]]
name = "toml_edit" name = "toml_edit"
version = "0.22.23" version = "0.22.24"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02a8b472d1a3d7c18e2d61a489aee3453fd9031c33e4f55bd533f4a7adca1bee" checksum = "17b4795ff5edd201c7cd6dca065ae59972ce77d1b80fa0a84d94950ece7d1474"
dependencies = [ dependencies = [
"indexmap 2.7.1", "indexmap 2.7.1",
"serde", "serde",
...@@ -2749,11 +2749,11 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" ...@@ -2749,11 +2749,11 @@ checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
[[package]] [[package]]
name = "uuid" name = "uuid"
version = "1.12.1" version = "1.13.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b3758f5e68192bb96cc8f9b7e2c2cfdabb435499a28499a42f8f984092adad4b" checksum = "ced87ca4be083373936a67f8de945faa23b6b42384bd5b64434850802c6dccd0"
dependencies = [ dependencies = [
"getrandom 0.2.15", "getrandom 0.3.1",
"serde", "serde",
] ]
...@@ -2901,9 +2901,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" ...@@ -2901,9 +2901,9 @@ checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec"
[[package]] [[package]]
name = "winnow" name = "winnow"
version = "0.7.0" version = "0.7.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7e49d2d35d3fad69b39b94139037ecfb4f359f08958b9c11e7315ce770462419" checksum = "59690dea168f2198d1a3b0cac23b8063efcd11012f10ae4698f284808c8ef603"
dependencies = [ dependencies = [
"memchr", "memchr",
] ]
......
...@@ -80,3 +80,21 @@ one server instance above, you should see the requests from the client being ...@@ -80,3 +80,21 @@ one server instance above, you should see the requests from the client being
distributed across the server instances in each server's output. If only one distributed across the server instances in each server's output. If only one
server instance is started, you should see the requests go to that server server instance is started, you should see the requests go to that server
each time. each time.
# Performance
The performance impact of synchronizing the Python and Rust async runtimes
is a critical consideration when optimizing the performance of a highly
concurrent and parallel distributed system.
The Python GIL is a global critical section and is ultimately the death of
parallelism. To compound that, when Rust async futures become ready,
accessing the GIL on the async event loop threads needs to be considered carefully.
Under high load, accessing the GIL or performing CPU-intensive tasks
on the event loop threads can starve other async tasks of CPU resources.
However, offloading the work with `tokio::task::spawn_blocking` is not without
overhead either.
If you are bouncing many small messages back and forth between the Python and Rust
event loops where Rust requires GIL access, this is a pattern where moving the
code from Python to Rust will give you significant gains.
...@@ -13,6 +13,8 @@ ...@@ -13,6 +13,8 @@
// See the License for the specific language governing permissions and // See the License for the specific language governing permissions and
// limitations under the License. // limitations under the License.
use std::sync::Arc;
pub use serde::{Deserialize, Serialize}; pub use serde::{Deserialize, Serialize};
pub use triton_distributed::{ pub use triton_distributed::{
error, error,
...@@ -86,8 +88,8 @@ enum ResponseProcessingError { ...@@ -86,8 +88,8 @@ enum ResponseProcessingError {
#[pyclass] #[pyclass]
#[derive(Clone)] #[derive(Clone)]
pub struct PythonAsyncEngine { pub struct PythonAsyncEngine {
generator: PyObject, generator: Arc<PyObject>,
event_loop: PyObject, event_loop: Arc<PyObject>,
} }
#[pymethods] #[pymethods]
...@@ -105,8 +107,8 @@ impl PythonAsyncEngine { ...@@ -105,8 +107,8 @@ impl PythonAsyncEngine {
#[new] #[new]
pub fn new(generator: PyObject, event_loop: PyObject) -> PyResult<Self> { pub fn new(generator: PyObject, event_loop: PyObject) -> PyResult<Self> {
Ok(PythonAsyncEngine { Ok(PythonAsyncEngine {
generator, generator: Arc::new(generator),
event_loop, event_loop: Arc::new(event_loop),
}) })
} }
} }
...@@ -130,12 +132,18 @@ where ...@@ -130,12 +132,18 @@ where
// Create a channel to communicate between the Python thread and the Rust async context // Create a channel to communicate between the Python thread and the Rust async context
let (tx, rx) = mpsc::channel::<Annotated<Resp>>(128); let (tx, rx) = mpsc::channel::<Annotated<Resp>>(128);
let stream = Python::with_gil(|py| { let generator = self.generator.clone();
let py_request = pythonize(py, &request)?; let event_loop = self.event_loop.clone();
let gen = self.generator.call1(py, (py_request,))?;
let locals = TaskLocals::new(self.event_loop.bind(py).clone()); let stream = tokio::task::spawn_blocking(move || {
pyo3_async_runtimes::tokio::into_stream_with_locals_v1(locals, gen.into_bound(py)) Python::with_gil(|py| {
})?; let py_request = pythonize(py, &request)?;
let gen = generator.call1(py, (py_request,))?;
let locals = TaskLocals::new(event_loop.bind(py).clone());
pyo3_async_runtimes::tokio::into_stream_with_locals_v1(locals, gen.into_bound(py))
})
})
.await??;
let stream = Box::pin(stream); let stream = Box::pin(stream);
......
...@@ -55,7 +55,7 @@ async def client_init(runtime: DistributedRuntime, ns: str): ...@@ -55,7 +55,7 @@ async def client_init(runtime: DistributedRuntime, ns: str):
# Issue many concurrent requests to put load on the server, # Issue many concurrent requests to put load on the server,
# the task should issue the request and process the response # the task should issue the request and process the response
tasks = [] tasks = []
for i in range(10000): for i in range(20000):
tasks.append(asyncio.create_task(do_one(client))) tasks.append(asyncio.create_task(do_one(client)))
await asyncio.gather(*tasks) await asyncio.gather(*tasks)
......
...@@ -15,6 +15,7 @@ ...@@ -15,6 +15,7 @@
use anyhow::Result; use anyhow::Result;
use async_nats::client::Client; use async_nats::client::Client;
use tracing as log;
use super::*; use super::*;
...@@ -85,7 +86,7 @@ where ...@@ -85,7 +86,7 @@ where
U: Data + for<'de> Deserialize<'de>, U: Data + for<'de> Deserialize<'de>,
{ {
async fn generate(&self, request: SingleIn<AddressedRequest<T>>) -> Result<ManyOut<U>, Error> { async fn generate(&self, request: SingleIn<AddressedRequest<T>>) -> Result<ManyOut<U>, Error> {
let id = request.context().id().to_string(); let request_id = request.context().id().to_string();
let (addressed_request, context) = request.transfer(()); let (addressed_request, context) = request.transfer(());
let (request, address) = addressed_request.into_parts(); let (request, address) = addressed_request.into_parts();
let engine_ctx = context.context(); let engine_ctx = context.context();
...@@ -140,9 +141,9 @@ where ...@@ -140,9 +141,9 @@ where
} }
}; };
tracing::trace!( log::trace!(
"[req: {}] packaging two-part message; ctrl: {} bytes, data: {} bytes", request_id,
id, "packaging two-part message; ctrl: {} bytes, data: {} bytes",
ctrl.len(), ctrl.len(),
data.len() data.len()
); );
...@@ -157,7 +158,7 @@ where ...@@ -157,7 +158,7 @@ where
// TRANSPORT ABSTRACT REQUIRED - END HERE // TRANSPORT ABSTRACT REQUIRED - END HERE
tracing::trace!("[req: {}] enqueueing two-part message to nats", id); log::trace!(request_id, "enqueueing two-part message to nats");
// we might need to add a timeout on this if there is no subscriber to the subject; however, I think nats // we might need to add a timeout on this if there is no subscriber to the subject; however, I think nats
// will handle this for us // will handle this for us
...@@ -166,7 +167,7 @@ where ...@@ -166,7 +167,7 @@ where
.request(address.to_string(), buffer) .request(address.to_string(), buffer)
.await?; .await?;
tracing::trace!("[req: {}] awaiting transport handshake", id); log::trace!(request_id, "awaiting transport handshake");
let response_stream = response_stream_provider let response_stream = response_stream_provider
.await .await
.map_err(|_| PipelineError::DetatchedStreamReceiver)? .map_err(|_| PipelineError::DetatchedStreamReceiver)?
...@@ -179,7 +180,7 @@ where ...@@ -179,7 +180,7 @@ where
Ok(r) => Some(r), Ok(r) => Some(r),
Err(err) => { Err(err) => {
let json_str = String::from_utf8_lossy(&msg); let json_str = String::from_utf8_lossy(&msg);
tracing::error!(%err, %json_str, "Failed deserializing JSON to response"); log::warn!(%err, %json_str, "Failed deserializing JSON to response");
None None
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment