// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use super::*;

/// Python-visible handle to a KV router.
///
/// Holds the Rust-side `llm_rs::kv_router::KvRouter` behind an `Arc` so the
/// handle can be cloned into futures handed back to Python.
#[pyclass]
pub(crate) struct KvRouter {
    inner: Arc<llm_rs::kv_router::KvRouter>,
}

#[pymethods]
impl KvRouter {
    #[new]
GuanLuo's avatar
GuanLuo committed
26
    // [FXIME] 'drt' can be obtained from 'component'
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
    fn new(drt: DistributedRuntime, component: Component) -> PyResult<Self> {
        let runtime = pyo3_async_runtimes::tokio::get_runtime();
        runtime.block_on(async {
            let inner = llm_rs::kv_router::KvRouter::from_runtime(
                drt.inner.clone(),
                component.inner.clone(),
            )
            .await
            .map_err(to_pyerr)?;
            Ok(Self { inner })
        })
    }

    fn schedule<'p>(
        &self,
        py: Python<'p>,
        token_ids: Vec<u32>,
        lora_id: u64,
    ) -> PyResult<Bound<'p, PyAny>> {
        let router = self.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
GuanLuo's avatar
GuanLuo committed
48
            let worker_id = router
49
50
51
                .schedule(&token_ids, lora_id)
                .await
                .map_err(to_pyerr)?;
GuanLuo's avatar
GuanLuo committed
52
            Ok(worker_id)
53
54
55
        })
    }
}
/// Python-visible handle to a KV metrics publisher.
///
/// Holds the Rust-side `llm_rs::kv_router::publisher::KvMetricsPublisher`
/// behind an `Arc` so the handle can be cloned into futures handed back to
/// Python.
#[pyclass]
pub(crate) struct KvMetricsPublisher {
    inner: Arc<llm_rs::kv_router::publisher::KvMetricsPublisher>,
}

#[pymethods]
impl KvMetricsPublisher {
    #[new]
    fn new() -> PyResult<Self> {
        let inner = llm_rs::kv_router::publisher::KvMetricsPublisher::new().map_err(to_pyerr)?;
        Ok(Self {
            inner: inner.into(),
        })
    }

Alec's avatar
Alec committed
72
    fn create_endpoint<'p>(
GuanLuo's avatar
GuanLuo committed
73
74
75
76
77
78
79
        &self,
        py: Python<'p>,
        component: Component,
    ) -> PyResult<Bound<'p, PyAny>> {
        let rs_publisher = self.inner.clone();
        let rs_component = component.inner.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
80
            rs_publisher
81
                .create_endpoint(rs_component)
GuanLuo's avatar
GuanLuo committed
82
83
84
85
86
87
                .await
                .map_err(to_pyerr)?;
            Ok(())
        })
    }

88
    fn publish(
GuanLuo's avatar
GuanLuo committed
89
        &self,
90
        _py: Python,
GuanLuo's avatar
GuanLuo committed
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
        request_active_slots: u64,
        request_total_slots: u64,
        kv_active_blocks: u64,
        kv_total_blocks: u64,
    ) -> PyResult<()> {
        self.inner
            .publish(
                llm_rs::kv_router::protocols::ForwardPassMetrics {
                    request_active_slots,
                    request_total_slots,
                    kv_active_blocks,
                    kv_total_blocks,
                }
                .into(),
            )
            .map_err(to_pyerr)
    }
}