request.rs 4.23 KB
Newer Older
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::cmp::Ordering;
use std::sync::Weak;
Ryan Olson's avatar
Ryan Olson committed
18
use tokio::sync::oneshot;
19

Ryan Olson's avatar
Ryan Olson committed
20
use crate::block_manager::block::{
21
    BlockMetadata, ImmutableBlock, MutableBlock, locality::LocalityProvider,
Ryan Olson's avatar
Ryan Olson committed
22
};
23
24
25
use crate::block_manager::pool::BlockPoolError;
use crate::block_manager::storage::Storage;

26
27
28
/// Higher priority offloads are done first.
/// If two offloads have the same priority, the one that was requested first is done first.
#[derive(PartialEq, Eq)]
29
30
31
32
33
pub struct OffloadRequestKey {
    pub priority: u64,
    pub timestamp: u64,
}

34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
impl PartialOrd for OffloadRequestKey {
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

impl Ord for OffloadRequestKey {
    fn cmp(&self, other: &Self) -> Ordering {
        other
            .priority
            .cmp(&self.priority)
            .then(self.timestamp.cmp(&other.timestamp))
    }
}

49
50
51
/// Data needed to offload a block.
/// While the block is in the offload queue, we hold a weak reference to it.
/// This way, we don't prevent the block from being reused if needed.
Ryan Olson's avatar
Ryan Olson committed
52
pub struct OffloadRequest<S: Storage, L: LocalityProvider, M: BlockMetadata> {
53
    pub key: OffloadRequestKey,
Ryan Olson's avatar
Ryan Olson committed
54
    pub block: Weak<MutableBlock<S, L, M>>,
55
56
57
    pub sequence_hash: u64,
}

Ryan Olson's avatar
Ryan Olson committed
58
impl<S: Storage, L: LocalityProvider, M: BlockMetadata> PartialOrd for OffloadRequest<S, L, M> {
59
60
61
62
63
64
    fn partial_cmp(&self, other: &Self) -> Option<Ordering> {
        Some(self.cmp(other))
    }
}

/// Order offload requests by priority, high to low.
Ryan Olson's avatar
Ryan Olson committed
65
impl<S: Storage, L: LocalityProvider, M: BlockMetadata> Ord for OffloadRequest<S, L, M> {
66
67
68
69
70
71
    fn cmp(&self, other: &Self) -> Ordering {
        self.key.cmp(&other.key)
    }
}

/// Equality is based on sequence hash, priority, and location.
Ryan Olson's avatar
Ryan Olson committed
72
impl<S: Storage, L: LocalityProvider, M: BlockMetadata> PartialEq for OffloadRequest<S, L, M> {
73
74
75
76
77
    fn eq(&self, other: &Self) -> bool {
        self.key == other.key
    }
}

Ryan Olson's avatar
Ryan Olson committed
78
impl<S: Storage, L: LocalityProvider, M: BlockMetadata> Eq for OffloadRequest<S, L, M> {}
79

Ryan Olson's avatar
Ryan Olson committed
80
81
82
83
84
pub type BlockResult<Target, Locality, Metadata> =
    Result<Vec<ImmutableBlock<Target, Locality, Metadata>>, BlockPoolError>;

pub type ResponseSender<Target, Locality, Metadata> =
    oneshot::Sender<Result<Vec<ImmutableBlock<Target, Locality, Metadata>>, BlockPoolError>>;
85
86
87

/// Data needed for onboarding.
/// Unlike offloading, we need a means to return the resulting blocks to the caller.
Ryan Olson's avatar
Ryan Olson committed
88
89
90
91
92
93
94
95
96
pub struct OnboardRequest<
    Source: Storage,
    Target: Storage,
    Locality: LocalityProvider,
    M: BlockMetadata,
> {
    pub blocks: Vec<ImmutableBlock<Source, Locality, M>>,
    pub response_tx: ResponseSender<Target, Locality, M>,
    pub targets: Option<Vec<MutableBlock<Target, Locality, M>>>,
97
98
}

Ryan Olson's avatar
Ryan Olson committed
99
100
101
impl<Source: Storage, Target: Storage, Locality: LocalityProvider, M: BlockMetadata>
    OnboardRequest<Source, Target, Locality, M>
{
102
    pub fn new(
Ryan Olson's avatar
Ryan Olson committed
103
104
105
        blocks: Vec<ImmutableBlock<Source, Locality, M>>,
        response_tx: ResponseSender<Target, Locality, M>,
        targets: Option<Vec<MutableBlock<Target, Locality, M>>>,
106
107
108
109
    ) -> Self {
        Self {
            blocks,
            response_tx,
Ryan Olson's avatar
Ryan Olson committed
110
            targets,
111
112
113
        }
    }
}
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_offload_request_key_ordering() {
        let key1 = OffloadRequestKey {
            priority: 1,
            timestamp: 1,
        };

        let key2 = OffloadRequestKey {
            priority: 2,
            timestamp: 2,
        };

        assert!(key2 < key1);

        let key3 = OffloadRequestKey {
            priority: 2,
            timestamp: 3,
        };

        assert!(key2 < key3);
    }
}