Commit 03b0101e authored by Ryan Olson's avatar Ryan Olson Committed by GitHub
Browse files

feat: add python bindings + wheel build (#94)


Co-authored-by: Ryan McCormick <rmccormick@nvidia.com>
Co-authored-by: Neelay Shah <neelays@nvidia.com>
parent ffbc06cc
import asyncio
from protocol import Request
from triton_distributed_rs import DistributedRuntime, triton_worker
@triton_worker()
async def worker(runtime: DistributedRuntime):
    """
    Build a client for the `backend` component's `generate` endpoint and
    stream the response to a single request.
    """
    # Locate the endpoint inside the `triton-init` namespace.
    backend = runtime.namespace("triton-init").component("backend")
    endpoint = backend.endpoint("generate")

    # Create a client bound to that endpoint.
    client = await endpoint.client()

    # Show the ids of the currently registered endpoint instances.
    print(client.endpoint_ids())

    # Send one request (serialized to JSON) and print each streamed item.
    stream = await client.generate(Request(data="hello world").model_dump_json())
    async for char in stream:
        print(char)


asyncio.run(worker())
from pydantic import BaseModel
class Request(BaseModel):
    # Payload streamed back by the server one character at a time.
    data: str


class Response(BaseModel):
    # A single character of the generated output.
    char: str
import asyncio
import uvloop
from protocol import Request, Response
from triton_distributed_rs import DistributedRuntime, triton_endpoint, triton_worker
# Install uvloop as the asyncio event-loop policy before any loop is created.
uvloop.install()
class RequestHandler:
    """
    Handler for the `generate` endpoint: streams back each character of the
    request's `data` field.
    """

    @triton_endpoint(Request, Response)
    async def generate(self, request):
        # Yield one character at a time to demonstrate streaming responses.
        for char in request.data:
            yield char
@triton_worker()
async def worker(runtime: DistributedRuntime):
    """
    Register a `backend` component and serve its `generate` endpoint.

    A `Component` can serve multiple endpoints.
    """
    backend = runtime.namespace("triton-init").component("backend")
    await backend.create_service()

    generate_endpoint = backend.endpoint("generate")
    await generate_endpoint.serve_endpoint(RequestHandler().generate)


asyncio.run(worker())
[project]
name = "triton-distributed-rs"
version = "0.1.1"
description = "Distributed LLM Framework"
# readme = "README.md"
authors = [
{ name = "Ryan Olson", email = "rolson@nvidia.com" }
]
requires-python = ">=3.10"
dependencies = [
"pydantic>=2.10.6",
"uvloop>=0.21.0",
]
# [project.scripts]
# triton-distributed = "triton_distributed_rs:main"
[tool.maturin]
module-name = "triton_distributed_rs._core"
python-packages = ["triton_distributed_rs"]
python-source = "python"
[build-system]
requires = ["maturin>=1.0,<2.0", "patchelf"]
build-backend = "maturin"
[tool.codespell]
# note: pre-commit passes explicit lists of files here, which this skip file list doesn't override -
# this is only to allow you to run codespell interactively
# this also overrides the grpc_generated folder, since it is generated
# TODO add skip files for generated code
# skip = "./.git,./.github,./src/grpc_generated"
skip = "./.git,./.github"
# ignore short words, and typename parameters like OffsetT
ignore-regex = "\\b(.{1,4}|[A-Z]\\w*T)\\b"
# ignore allowed words
# ignoring atleast to avoid testing::AtLeast from getting flagged
ignore-words-list = "atleast"
# use the 'clear' dictionary for unambiguous spelling mistakes
builtin = "clear"
# disable warnings about binary files and wrong encoding
quiet-level = 3
[tool.isort]
profile = "black"
use_parentheses = true
multi_line_output = 3
include_trailing_comma = true
force_grid_wrap = 0
ensure_newline_before_comments = true
line_length = 88
balanced_wrapping = true
indent = " "
[tool.ruff]
# Same as Black.
line-length = 88
indent-width = 4
[tool.mypy]
# --disable-error-code: WAR large set of errors due to mypy not being run
# previously. We can slowly enable sets of errors to fix over time.
# disable_error_code = []
# --explicit-package-bases: WAR errors about duplicate module names used
# throughout project such as launch_workers.py
# explicit_package_bases = true
# --ignore-missing-imports: WAR too many errors when developing outside
# of container environment with PYTHONPATH set and packages installed.
# NOTE: Can possibly move mypy from pre-commit to a github action run only in
# a container with the expected environment and PYTHONPATH setup.
ignore_missing_imports = true
check_untyped_defs = true
import asyncio
from functools import wraps
from typing import Any, AsyncGenerator, Callable, Type
from pydantic import BaseModel, ValidationError
from triton_distributed_rs._core import DistributedRuntime
def triton_worker():
    """
    Decorator that bootstraps a `DistributedRuntime` on the currently running
    asyncio event loop and passes it as the first argument to the wrapped
    coroutine.

    The wrapped coroutine keeps its own signature minus the leading
    ``runtime`` parameter; whatever it returns is propagated to the caller.
    """

    def decorator(func):
        @wraps(func)
        async def wrapper(*args, **kwargs):
            # The runtime binds to the loop this coroutine is running on.
            loop = asyncio.get_running_loop()
            runtime = DistributedRuntime(loop)
            # TODO(review): no shutdown/cancellation handling yet — the
            # runtime is not torn down when `func` exits or raises.
            return await func(runtime, *args, **kwargs)

        return wrapper

    return decorator
def triton_endpoint(
    request_model: Type[BaseModel], response_model: Type[BaseModel]
) -> Callable:
    """
    Decorator for endpoint handlers: validates the raw (JSON string) request
    into `request_model` before invoking the wrapped async generator.

    The raw request arrives as the last positional argument (either
    `(request,)` for a free function or `(self, request)` for a method).

    Raises:
        ValueError: if the request does not validate against `request_model`.
    """

    def decorator(
        func: Callable[..., AsyncGenerator[Any, None]]
    ) -> Callable[..., AsyncGenerator[Any, None]]:
        @wraps(func)
        async def wrapper(*args, **kwargs) -> AsyncGenerator[Any, None]:
            # Validate the request. `parse_raw` is deprecated in pydantic v2
            # (this project pins pydantic>=2.10.6); use model_validate_json.
            try:
                if len(args) in (1, 2):
                    args = list(args)
                    args[-1] = request_model.model_validate_json(args[-1])
            except ValidationError as e:
                raise ValueError(f"Invalid request: {e}") from e
            # TODO: validate each yielded item against `response_model`.
            async for item in func(*args, **kwargs):
                yield item

        return wrapper

    return decorator
from typing import AsyncGenerator, AsyncIterator, Callable
class JsonLike:
    """
    Marker type: any Python object that can be serialized to JSON.
    """

    ...


# Signature of an endpoint handler: takes one JSON-like request and
# asynchronously generates JSON-like response items.
RequestHandler = Callable[[JsonLike], AsyncGenerator[JsonLike, None]]
class DistributedRuntime:
    """
    The runtime object for a distributed NOVA application.
    """

    ...

    def namespace(self, name: str) -> Namespace:
        """
        Create a `Namespace` object rooted at `name`.
        """
        ...
class Namespace:
    """
    A namespace is a collection of components.
    """

    ...

    def component(self, name: str) -> Component:
        """
        Create a `Component` object named `name` within this namespace.
        """
        ...
class Component:
    """
    A component is a collection of endpoints.
    """

    ...

    async def create_service(self) -> None:
        """
        Register this component as a discoverable service.

        The bindings return an awaitable, so this must be awaited.
        """
        ...

    def endpoint(self, name: str) -> Endpoint:
        """
        Create an `Endpoint` object named `name` for this component.
        """
        ...
class Endpoint:
    """
    An Endpoint is a single API endpoint of a component.
    """

    ...

    async def serve_endpoint(self, handler: RequestHandler) -> None:
        """
        Serve an endpoint discoverable by all connected clients at
        `{{ namespace }}/components/{{ component_name }}/endpoints/{{ endpoint_name }}`
        """
        ...

    async def client(self) -> Client:
        """
        Create a `Client` capable of calling served instances of this endpoint
        """
        ...
class Client:
    """
    A client capable of calling served instances of an endpoint
    """

    ...

    def endpoint_ids(self) -> list[int]:
        """
        Ids of the endpoint instances currently known to this client
        """
        ...

    async def wait_for_endpoints(self) -> None:
        """
        Wait until endpoint instances are available to this client
        """
        ...

    async def generate(self, request: JsonLike) -> AsyncIterator[JsonLike]:
        """
        Issue the request using the default routing strategy (random instance)
        """
        ...

    async def random(self, request: JsonLike) -> AsyncIterator[JsonLike]:
        """
        Pick a random instance of the endpoint and issue the request
        """
        ...

    async def round_robin(self, request: JsonLike) -> AsyncIterator[JsonLike]:
        """
        Pick the next instance of the endpoint in a round-robin fashion
        """
        ...

    async def direct(self, request: JsonLike, endpoint_id: int) -> AsyncIterator[JsonLike]:
        """
        Send the request to the specific endpoint instance identified by
        `endpoint_id` (as returned by `endpoint_ids`)
        """
        ...
pub use serde::{Deserialize, Serialize};
pub use triton_distributed::{
error,
pipeline::{
async_trait, AsyncEngine, AsyncEngineContextProvider, Data, ManyOut, ResponseStream,
SingleIn,
},
protocols::annotated::Annotated,
Error, Result,
};
use pyo3::prelude::*;
use pyo3_async_runtimes::TaskLocals;
use pythonize::{depythonize, pythonize};
use tokio::sync::mpsc;
use tokio_stream::{wrappers::ReceiverStream, StreamExt};
use tracing as log;
/// Add bindings from this crate to the provided module
pub fn add_to_module(m: &Bound<'_, PyModule>) -> PyResult<()> {
    m.add_class::<PythonAsyncEngine>()?;
    Ok(())
}
// todos:
// - [ ] enable context cancellation
// - this will likely require a change to the function signature python calling arguments
// - [ ] rename `PythonAsyncEngine` to `PythonServerStreamingEngine` to be more descriptive
// - [ ] other `AsyncEngine` implementations will have a similar pattern, i.e. one AsyncEngine
// implementation per struct
/// Rust/Python bridge that maps to the [`AsyncEngine`] trait
///
/// Currently this is only implemented for the [`SingleIn`] and [`ManyOut`] types; however,
/// more [`AsyncEngine`] implementations can be added in the future.
///
/// For the [`SingleIn`] and [`ManyOut`] case, this implementation will take a Python async
/// generator and convert it to a Rust async stream.
///
/// ```python
/// class ComputeEngine:
/// def __init__(self):
/// self.compute_engine = make_compute_engine()
///
/// def generate(self, request):
/// async generator():
/// async for output in self.compute_engine.generate(request):
/// yield output
/// return generator()
///
/// def main():
/// loop = asyncio.create_event_loop()
/// compute_engine = ComputeEngine()
/// engine = PythonAsyncEngine(compute_engine.generate, loop)
/// service = RustService()
/// service.add_engine("model_name", engine)
/// loop.run_until_complete(service.run())
/// ```
#[pyclass]
#[derive(Clone)]
pub struct PythonAsyncEngine {
    // Python callable invoked per request; expected to return an async
    // generator (see `generate` below).
    generator: PyObject,
    // Python event loop on which the generator is driven.
    event_loop: PyObject,
}
#[pymethods]
impl PythonAsyncEngine {
/// Create a new instance of the PythonAsyncEngine
///
/// # Arguments
/// - `generator`: a Python async generator that will be used to generate responses
/// - `event_loop`: the Python event loop that will be used to run the generator
///
/// Note: In Rust land, the request and the response are both concrete; however, in
/// Python land, the request and response not strongly typed, meaning the generator
/// could accept a different type of request or return a different type of response
/// and we would not know until runtime.
#[new]
pub fn new(generator: PyObject, event_loop: PyObject) -> PyResult<Self> {
Ok(PythonAsyncEngine {
generator,
event_loop,
})
}
}
#[async_trait]
impl<Req, Resp> AsyncEngine<SingleIn<Req>, ManyOut<Annotated<Resp>>, Error> for PythonAsyncEngine
where
    Req: Data + Serialize,
    Resp: Data + for<'de> Deserialize<'de>,
{
    /// Bridge one Rust request into the Python async generator and expose
    /// the generator's output as a Rust response stream.
    async fn generate(&self, request: SingleIn<Req>) -> Result<ManyOut<Annotated<Resp>>, Error> {
        // Create a context
        let (request, context) = request.transfer(());
        let id = context.id().to_string();
        log::trace!("processing request: {}", id);

        // Clone the PyObject to move into the thread
        // Create a channel to communicate between the Python thread and the Rust async context
        let (tx, rx) = mpsc::channel::<Annotated<Resp>>(128);
        let tx_error = tx.clone();

        // Under the GIL: serialize the request into a Python object, call
        // the generator with it, and adapt the returned Python async
        // generator into a Rust stream driven on the stored event loop.
        let stream = Python::with_gil(|py| {
            let py_request = pythonize(py, &request)?;
            let gen = self.generator.call1(py, (py_request,))?;
            let locals = TaskLocals::new(self.event_loop.bind(py).clone());
            pyo3_async_runtimes::tokio::into_stream_with_locals_v1(locals, gen.into_bound(py))
        })?;
        let stream = Box::pin(stream);

        // Convert a single Python item back into a typed annotated response.
        let process = |item: Result<Py<PyAny>, PyErr>| -> Result<Annotated<Resp>, Error> {
            let item = item
                .map_err(|err| error!("error processing python async generator stream: {}", err))?;
            let response = Python::with_gil(|py| depythonize::<Resp>(&item.into_bound(py)))?;
            let response = Annotated::from_data(response);
            Ok(response)
        };

        // process the stream
        // any error thrown in the stream will be caught and complete the processing task
        // errors are captured by a task that is watching the processing task
        // the error will be emitted as an annotated error
        let processor = tokio::spawn(async move {
            log::trace!("processing stream from python async generator: {}", id);
            let mut stream = stream;
            while let Some(item) = stream.next().await {
                // let mut done = false;
                // A conversion failure is forwarded as an annotated error
                // item rather than terminating the loop.
                let response = match process(item) {
                    Ok(response) => response,
                    Err(err) => {
                        // done = true;
                        Annotated::from_error(err.to_string())
                    }
                };
                if tx.send(response).await.is_err() {
                    log::error!("generator response channel was dropped: {}", id);
                    return Err(error!("generator response channel was dropped"));
                }
                // if done {
                //     break;
                // }
            }
            Result::<()>::Ok(())
        });

        // Watcher task: surfaces a processor failure (or panic on join) to
        // the consumer as a final annotated error item on the same channel.
        tokio::spawn(async move {
            match processor.await {
                Ok(Ok(_)) => {}
                Ok(Err(err)) => {
                    log::error!("error processing python async generator: {}", err);
                    tx_error
                        .send(Annotated::from_error(err.to_string()))
                        .await
                        .unwrap();
                }
                Err(err) => {
                    log::error!(
                        "error on tokio task for processing python async generator stream: {}",
                        err
                    );
                    tx_error
                        .send(Annotated::from_error(err.to_string()))
                        .await
                        .unwrap();
                }
            }
        });

        // Hand the receiving half back to the caller as the response stream.
        let stream = ReceiverStream::new(rx);
        Ok(ResponseStream::new(Box::pin(stream), context.context()))
    }
}
use futures::StreamExt;
use once_cell::sync::OnceCell;
use pyo3::exceptions::PyStopAsyncIteration;
use pyo3::types::PyString;
use pyo3::IntoPyObjectExt;
use pyo3::{exceptions::PyException, prelude::*};
use rs::pipeline::network::Ingress;
use std::{fmt::Display, sync::Arc};
use tokio::sync::Mutex;
use tracing as log;
use triton_distributed::{
self as rs,
pipeline::{EngineStream, ManyOut, SingleIn},
protocols::annotated::Annotated as RsAnnotated,
};
mod engine;
// Ingress accepting a single JSON request and streaming back annotated
// JSON responses; used by `Endpoint::serve_endpoint`.
type JsonServerStreamingIngress =
    Ingress<SingleIn<serde_json::Value>, ManyOut<RsAnnotated<serde_json::Value>>>;

// Guards the one-time initialization of the pyo3 tokio runtime bridge.
static INIT: OnceCell<()> = OnceCell::new();

// Client request methods wrap responses in `Annotated` by default.
const DEFAULT_ANNOTATED_SETTING: Option<bool> = Some(true);
/// A Python module implemented in Rust. The name of this function must match
/// the `lib.name` setting in the `Cargo.toml`, else Python will not be able to
/// import the module.
#[pymodule]
fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
    // Register every class this extension module exposes to Python.
    m.add_class::<DistributedRuntime>()?;
    m.add_class::<CancellationToken>()?;
    m.add_class::<Namespace>()?;
    m.add_class::<Component>()?;
    m.add_class::<Endpoint>()?;
    m.add_class::<Client>()?;
    m.add_class::<AsyncResponseStream>()?;
    // Engine bindings are defined in the `engine` submodule of this crate.
    engine::add_to_module(m)?;
    // llm::http::add_to_module(m)?;
    Ok(())
}
/// Convert any displayable error into a Python `Exception`.
///
/// Used at every Rust → Python boundary in this module so that fallible
/// runtime calls surface as ordinary Python exceptions.
pub fn to_pyerr<E>(err: E) -> PyErr
where
    E: Display,
{
    // `to_string()` is the idiomatic form of `format!("{}", err)`.
    PyException::new_err(err.to_string())
}
#[pyclass]
#[derive(Clone)]
struct DistributedRuntime {
    // Underlying Rust runtime this Python wrapper delegates to.
    inner: rs::DistributedRuntime,
    // Python asyncio event loop, threaded through to child objects.
    event_loop: PyObject,
}

#[derive(Clone)]
#[pyclass]
struct CancellationToken {
    inner: rs::CancellationToken,
}

#[pyclass]
struct Namespace {
    inner: rs::component::Namespace,
    event_loop: PyObject,
}

#[pyclass]
struct Component {
    inner: rs::component::Component,
    event_loop: PyObject,
}

#[pyclass]
struct Endpoint {
    inner: rs::component::Endpoint,
    event_loop: PyObject,
}

#[pyclass]
struct Client {
    // Requests and responses cross the boundary as JSON values.
    inner: rs::component::Client<serde_json::Value, serde_json::Value>,
}
#[pymethods]
impl DistributedRuntime {
    /// Create the distributed runtime from environment settings and bind it
    /// to the given Python event loop.
    #[new]
    fn new(event_loop: PyObject) -> PyResult<Self> {
        let rt = rs::Worker::from_settings().map_err(to_pyerr)?;
        // One-time, process-wide: hand the worker's primary tokio runtime to
        // pyo3-async-runtimes (the OnceCell prevents re-initialization when
        // multiple DistributedRuntime objects are constructed).
        INIT.get_or_try_init(|| {
            let primary = rt.tokio_runtime()?;
            pyo3_async_runtimes::tokio::init_with_runtime(primary)
                .map_err(|e| rs::error!("failed to initialize pyo3 static runtime: {:?}", e))?;
            rs::OK(())
        })
        .map_err(to_pyerr)?;
        let runtime = rt.runtime().clone();
        // Block on the secondary runtime while the distributed runtime
        // connects (the primary runtime was given to pyo3 above).
        let inner = rt
            .runtime()
            .secondary()
            .block_on(rs::DistributedRuntime::from_settings(runtime))
            .map_err(to_pyerr)?;
        Ok(DistributedRuntime { inner, event_loop })
    }

    /// Create a `Namespace` handle named `name`.
    fn namespace(&self, name: String) -> PyResult<Namespace> {
        Ok(Namespace {
            inner: self.inner.namespace(name).map_err(to_pyerr)?,
            event_loop: self.event_loop.clone(),
        })
    }

    /// Token tied to the runtime's primary lifetime.
    fn primary_token(&self) -> CancellationToken {
        let inner = self.inner.runtime().primary_token();
        CancellationToken { inner }
    }

    /// Child token that can be cancelled independently of the primary.
    fn child_token(&self) -> CancellationToken {
        let inner = self.inner.runtime().child_token();
        CancellationToken { inner }
    }

    /// Shut down the underlying runtime.
    fn shutdown(&self) {
        self.inner.runtime().shutdown();
    }

    /// The Python event loop this runtime was constructed with.
    fn event_loop(&self) -> PyObject {
        self.event_loop.clone()
    }
}
#[pymethods]
impl CancellationToken {
    /// Cancel the token, releasing every task awaiting `cancelled()`.
    fn cancel(&self) {
        self.inner.cancel();
    }

    /// Return a Python awaitable that resolves once the token is cancelled.
    fn cancelled<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
        let inner = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            inner.cancelled().await;
            Ok(())
        })
    }
}
#[pymethods]
impl Component {
    /// Build an `Endpoint` handle named `name` under this component.
    fn endpoint(&self, name: String) -> PyResult<Endpoint> {
        Ok(Endpoint {
            inner: self.inner.endpoint(name),
            event_loop: self.event_loop.clone(),
        })
    }

    /// Register this component as a service. Returns a Python awaitable
    /// that resolves once creation completes.
    fn create_service<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
        let service_builder = self.inner.service_builder();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let _ = service_builder.create().await.map_err(to_pyerr)?;
            Ok(())
        })
    }
}
#[pymethods]
impl Endpoint {
    /// Serve this endpoint, using `generator` (a Python callable producing
    /// an async generator per request) as the handler. Returns a Python
    /// awaitable driving the endpoint.
    fn serve_endpoint<'p>(
        &self,
        py: Python<'p>,
        generator: PyObject,
    ) -> PyResult<Bound<'p, PyAny>> {
        // Bridge the Python generator into a Rust AsyncEngine.
        let engine = Arc::new(engine::PythonAsyncEngine::new(
            generator,
            self.event_loop.clone(),
        )?);
        // Wrap the engine in a JSON-in / annotated-JSON-out ingress.
        let ingress = JsonServerStreamingIngress::for_engine(engine).map_err(to_pyerr)?;
        let builder = self.inner.endpoint_builder().handler(ingress);
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            builder.start().await.map_err(to_pyerr)?;
            Ok(())
        })
    }

    /// Create a `Client` for this endpoint (JSON request/response). Returns
    /// a Python awaitable resolving to the client.
    fn client<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
        let inner = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let client = inner
                .client::<serde_json::Value, serde_json::Value>()
                .await
                .map_err(to_pyerr)?;
            Ok(Client { inner: client })
        })
    }
}
#[pymethods]
impl Namespace {
    /// Build a `Component` handle named `name` under this namespace.
    fn component(&self, name: String) -> PyResult<Component> {
        let component = self.inner.component(name).map_err(to_pyerr)?;
        Ok(Component {
            inner: component,
            event_loop: self.event_loop.clone(),
        })
    }
}
#[pymethods]
impl Client {
    /// Get list of current endpoints
    fn endpoint_ids(&self) -> Vec<i64> {
        self.inner.endpoint_ids().borrow().clone()
    }

    /// Awaitable that waits for endpoint instances to become available.
    fn wait_for_endpoints<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
        let inner = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            inner.wait_for_endpoints().await.map_err(to_pyerr)
        })
    }

    /// Issue a request to the endpoint using the default routing strategy.
    /// (Currently delegates to `random`.)
    #[pyo3(signature = (request, annotated=DEFAULT_ANNOTATED_SETTING))]
    fn generate<'p>(
        &self,
        py: Python<'p>,
        request: PyObject,
        annotated: Option<bool>,
    ) -> PyResult<Bound<'p, PyAny>> {
        self.random(py, request, annotated)
    }

    /// Send a request to the next endpoint in a round-robin fashion.
    #[pyo3(signature = (request, annotated=DEFAULT_ANNOTATED_SETTING))]
    fn round_robin<'p>(
        &self,
        py: Python<'p>,
        request: PyObject,
        annotated: Option<bool>,
    ) -> PyResult<Bound<'p, PyAny>> {
        // Convert the Python request into JSON while still under the GIL.
        let request: serde_json::Value = pythonize::depythonize(&request.into_bound(py))?;
        let annotated = annotated.unwrap_or(false);
        // Bounded channel between the background pump task and the
        // Python-side async iterator.
        let (tx, rx) = tokio::sync::mpsc::channel(32);
        let client = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let stream = client.round_robin(request.into()).await.map_err(to_pyerr)?;
            // Pump responses into the channel in the background.
            tokio::spawn(process_stream(stream, tx));
            Ok(AsyncResponseStream {
                rx: Arc::new(Mutex::new(rx)),
                annotated,
            })
        })
    }

    /// Send a request to a random endpoint.
    #[pyo3(signature = (request, annotated=DEFAULT_ANNOTATED_SETTING))]
    fn random<'p>(
        &self,
        py: Python<'p>,
        request: PyObject,
        annotated: Option<bool>,
    ) -> PyResult<Bound<'p, PyAny>> {
        // Same flow as `round_robin`, but routed to a random instance.
        let request: serde_json::Value = pythonize::depythonize(&request.into_bound(py))?;
        let annotated = annotated.unwrap_or(false);
        let (tx, rx) = tokio::sync::mpsc::channel(32);
        let client = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let stream = client.random(request.into()).await.map_err(to_pyerr)?;
            tokio::spawn(process_stream(stream, tx));
            Ok(AsyncResponseStream {
                rx: Arc::new(Mutex::new(rx)),
                annotated,
            })
        })
    }

    /// Directly send a request to a specific endpoint.
    #[pyo3(signature = (request, endpoint_id, annotated=DEFAULT_ANNOTATED_SETTING))]
    fn direct<'p>(
        &self,
        py: Python<'p>,
        request: PyObject,
        endpoint_id: i64,
        annotated: Option<bool>,
    ) -> PyResult<Bound<'p, PyAny>> {
        // Same flow as `round_robin`, routed to the instance `endpoint_id`.
        let request: serde_json::Value = pythonize::depythonize(&request.into_bound(py))?;
        let annotated = annotated.unwrap_or(false);
        let (tx, rx) = tokio::sync::mpsc::channel(32);
        let client = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let stream = client
                .direct(request.into(), endpoint_id)
                .await
                .map_err(to_pyerr)?;
            tokio::spawn(process_stream(stream, tx));
            Ok(AsyncResponseStream {
                rx: Arc::new(Mutex::new(rx)),
                annotated,
            })
        })
    }
}
/// Pump responses from the engine stream into an mpsc channel, converting
/// each JSON payload into a Python object. Stops after the first error item.
async fn process_stream(
    stream: EngineStream<serde_json::Value>,
    tx: tokio::sync::mpsc::Sender<RsAnnotated<PyObject>>,
) {
    let mut stream = stream;
    while let Some(response) = stream.next().await {
        // Convert the response to a PyObject using Python's GIL
        // NOTE(review): this `unwrap` panics if a response fails to
        // deserialize into `RsAnnotated` — consider forwarding an error
        // annotation instead.
        let annotated: RsAnnotated<serde_json::Value> = serde_json::from_value(response).unwrap();
        // Map the JSON payload (if any) into a Python object; a pythonize
        // failure is recorded as an error string inside the annotation.
        let annotated: RsAnnotated<PyObject> = annotated.map_data(|data| {
            let result = Python::with_gil(|py| match pythonize::pythonize(py, &data) {
                Ok(pyobj) => Ok(pyobj.into()),
                Err(e) => Err(e.to_string()),
            });
            result
        });
        let is_error = annotated.is_error();
        // Send the PyObject through the channel or log an error
        if let Err(e) = tx.send(annotated).await {
            log::error!("Failed to send response: {:?}", e);
        }
        // Stop pumping after the first error annotation.
        if is_error {
            break;
        }
    }
}
#[pyclass]
struct AsyncResponseStream {
    // Shared receiver; the Mutex lets successive `__anext__` futures take
    // turns reading from the channel.
    rx: Arc<Mutex<tokio::sync::mpsc::Receiver<RsAnnotated<PyObject>>>>,
    // When true, items are yielded as `Annotated` wrappers; otherwise only
    // the bare data payloads are yielded.
    annotated: bool,
}
#[pymethods]
impl AsyncResponseStream {
    /// This method is required to implement the `AsyncIterator` protocol.
    #[pyo3(name = "__aiter__")]
    fn aiter(slf: PyRef<Self>, py: Python) -> PyResult<Py<PyAny>> {
        // An async iterator's `__aiter__` returns the object itself.
        slf.into_py_any(py)
    }

    /// This method is required to implement the `AsyncIterator` protocol.
    ///
    /// Returns a future resolving to the next stream item; raises
    /// `StopAsyncIteration` once the channel is closed and drained.
    #[pyo3(name = "__anext__")]
    fn next<'p>(&self, py: Python<'p>) -> PyResult<Bound<'p, PyAny>> {
        let rx = self.rx.clone();
        let annotated = self.annotated;
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            loop {
                // The lock is held only for the duration of one `recv`.
                let value = rx.lock().await.recv().await;
                match value {
                    Some(pyobj) => {
                        // An error annotation becomes a Python ValueError.
                        let pyobj = match pyobj.ok() {
                            Ok(pyobj) => pyobj,
                            Err(e) => {
                                return Err(PyErr::new::<pyo3::exceptions::PyValueError, _>(e));
                            }
                        };
                        if annotated {
                            // Wrap in the Python-visible `Annotated` class.
                            let object = Annotated { inner: pyobj };
                            #[allow(deprecated)]
                            let object = Python::with_gil(|py| object.into_py(py));
                            return Ok(object);
                        } else {
                            // Unwrapped mode: items with no data payload are
                            // skipped and the loop waits for the next item.
                            match pyobj.data {
                                Some(data) => return Ok(data),
                                None => continue,
                            }
                        }
                    }
                    None => return Err(PyStopAsyncIteration::new_err("Stream exhausted")),
                }
            }
        })
    }
}
#[pyclass]
struct Annotated {
    // The wrapped annotation whose data payload is a Python object.
    inner: RsAnnotated<PyObject>,
}
#[pymethods]
impl Annotated {
    /// Wrap a Python object as the data payload of a new annotation.
    #[new]
    fn new(data: PyObject) -> Self {
        Annotated {
            inner: RsAnnotated::from_data(data),
        }
    }

    /// Whether the underlying annotation marks an error.
    fn is_error(&self) -> bool {
        self.inner.is_error()
    }

    /// The data payload, if any.
    fn data(&self) -> Option<PyObject> {
        self.inner.data.clone()
    }

    /// The event name, if any.
    fn event(&self) -> Option<String> {
        self.inner.event.clone()
    }

    /// The comment lines, if any.
    fn comments(&self) -> Option<Vec<String>> {
        self.inner.comment.clone()
    }

    /// The annotation id, if any.
    fn id(&self) -> Option<String> {
        self.inner.id.clone()
    }

    #[pyo3(name = "__repr__")]
    fn _repr(&self, py: Python) -> String {
        // Delegate to the payload's own `__repr__`; fall back to a
        // placeholder when the call or string extraction fails.
        let data = self.inner.data.clone().map(|obj| {
            obj.call_method0(py, "__repr__")
                .and_then(|repr_obj| repr_obj.extract::<Py<PyString>>(py))
                .map(|py_str| py_str.to_string_lossy(py).into_owned())
                .unwrap_or_else(|_| "<failed_repr>".to_string())
        });
        format!(
            "Annotated(data={}, event={}, comment={:?}, id={})",
            data.unwrap_or_else(|| "<no_data>".to_string()),
            self.inner.event.as_deref().unwrap_or("None"),
            self.inner.comment.as_deref().unwrap_or(&[]),
            self.inner.id.as_deref().unwrap_or("None")
        )
    }
}
This diff is collapsed.
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment