// SPDX-FileCopyrightText: Copyright (c) 2024-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

4
5
6
7
//! # KV Cache Block Registration
//!
//! - This module is responsible for maintaining a registry of all blocks currently within a pool.
//!   This consists of two components: a global registry of all blocks, and a per-pool registry of blocks.
//! - The global registry is keyed by sequence hash and storage tier. If two blocks in different pools
//!   have the same sequence hash but live in different tiers, they keep distinct registration handles
//!   so KVBM can emit per-tier events. The global registry is shared across all pools.
//! - The per-pool registry is a mapping of sequence hashes to block handles. This is used to track which blocks are
//!   currently within a specific pool. The block handle is unique across pools, and is used to track the block's lifetime.
//! - When a block is in the registered state, it has a unique block handle and a possibly shared registration handle.
//!
//! ## Workflow
//!
//! 1. When a block is registered into a pool, we create a unique block handle.
//! 2. We then check the global registry to see if a block with the same sequence hash is already
//!    registered in the same storage tier by any other pool.
//! 3. If it is, we reuse the existing registration handle. Otherwise, we create a new one.
//! 4. When the block handle is dropped, it means that the block is no longer in the pool.
//! 5. When the registration handle is dropped, it means that the block is no longer in any pool
//!    of that storage tier.

Ryan Olson's avatar
Ryan Olson committed
23
24
use std::{
    collections::HashMap,
25
    sync::{Arc, Mutex, Weak},
Ryan Olson's avatar
Ryan Olson committed
26
27
28
29
30
};

use super::super::events::{EventManager, EventReleaseManager, PublishHandle};
use super::state::BlockState;

31
use crate::block_manager::kv_consolidator::StorageTier;
Ryan Olson's avatar
Ryan Olson committed
32
33
34
use crate::tokens::{BlockHash, SequenceHash, TokenBlock};

use derive_getters::Getters;
35
36
use tokio::{runtime::Handle, sync::mpsc};

37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
/// Key type of the [`GlobalRegistry`] map: a block's sequence hash paired with a storage tier.
///
/// Equality and hashing are derived over both fields, so the same sequence hash in two
/// different tiers produces two distinct keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub struct RegistrationKey {
    /// Sequence hash of the registered block.
    sequence_hash: SequenceHash,
    /// Storage tier the registration belongs to.
    storage_tier: StorageTier,
}

impl RegistrationKey {
    /// Builds a key from a sequence hash and the storage tier it is registered in.
    fn new(sequence_hash: SequenceHash, storage_tier: StorageTier) -> Self {
        Self {
            sequence_hash,
            storage_tier,
        }
    }
}

/// Shared, mutex-guarded map from [`RegistrationKey`] to a weak reference to the matching
/// [`RegistrationHandle`]. The map holds only `Weak` pointers, so an entry never keeps its
/// handle alive.
pub type GlobalRegistry = Arc<Mutex<HashMap<RegistrationKey, Weak<RegistrationHandle>>>>;
Ryan Olson's avatar
Ryan Olson committed
53
54

#[derive(Debug, thiserror::Error)]
Tianer Zhou's avatar
Tianer Zhou committed
55
pub enum BlockRegistrationError {
Ryan Olson's avatar
Ryan Olson committed
56
57
58
59
60
61
62
    #[error("Block already registered")]
    BlockAlreadyRegistered(SequenceHash),

    #[error("Invalid state: {0}")]
    InvalidState(String),
}

63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
/// Per-pool handle for a block that is registered in a pool.
///
/// Dropping the handle sends the block's sequence hash over `unregister_tx`, which is how the
/// pool is notified that the block has been unregistered. This differs from the registration
/// handle, which is only dropped once the block is no longer in ANY pool.
#[derive(Debug)]
pub struct BlockHandle {
    sequence_hash: SequenceHash,
    unregister_tx: mpsc::UnboundedSender<SequenceHash>,
}

impl BlockHandle {
    /// Creates a handle that reports `sequence_hash` on `unregister_tx` when it is dropped.
    pub fn new(
        sequence_hash: SequenceHash,
        unregister_tx: mpsc::UnboundedSender<SequenceHash>,
    ) -> Self {
        BlockHandle {
            sequence_hash,
            unregister_tx,
        }
    }
}

impl Drop for BlockHandle {
    fn drop(&mut self) {
        // Best-effort notification: the outcome of the send is intentionally discarded.
        self.unregister_tx.send(self.sequence_hash).ok();
    }
}
Ryan Olson's avatar
Ryan Olson committed
89
90

pub struct BlockRegistry {
91
    blocks: Arc<Mutex<HashMap<SequenceHash, Weak<BlockHandle>>>>,
92
    storage_tier: StorageTier,
Ryan Olson's avatar
Ryan Olson committed
93
    event_manager: Arc<dyn EventManager>,
94
95
    global_registry: GlobalRegistry,
    unregister_tx: mpsc::UnboundedSender<SequenceHash>,
Ryan Olson's avatar
Ryan Olson committed
96
97
98
}

impl BlockRegistry {
99
100
101
102
    pub fn new(
        event_manager: Arc<dyn EventManager>,
        global_registry: GlobalRegistry,
        async_runtime: Handle,
103
        storage_tier: StorageTier,
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
    ) -> Self {
        let (unregister_tx, mut unregister_rx) = mpsc::unbounded_channel();

        let blocks: Arc<Mutex<HashMap<SequenceHash, Weak<BlockHandle>>>> =
            Arc::new(Mutex::new(HashMap::new()));

        let blocks_clone = blocks.clone();
        let global_registry_clone = global_registry.clone();
        async_runtime.spawn(async move {
            let blocks = blocks_clone;
            let global_registry = global_registry_clone;
            while let Some(sequence_hash) = unregister_rx.recv().await {
                {
                    let mut blocks = blocks.lock().unwrap();

119
120
121
122
                    if let Some(handle) = blocks.get(&sequence_hash)
                        && handle.upgrade().is_none()
                    {
                        blocks.remove(&sequence_hash);
123
124
125
126
                    }
                }

                let mut global_registry = global_registry.lock().unwrap();
127
                let registration_key = RegistrationKey::new(sequence_hash, storage_tier);
128

129
                if let Some(entry) = global_registry.get(&registration_key)
130
131
                    && entry.upgrade().is_none()
                {
132
                    global_registry.remove(&registration_key);
133
134
135
136
                }
            }
        });

Ryan Olson's avatar
Ryan Olson committed
137
        Self {
138
            blocks,
139
            storage_tier,
Ryan Olson's avatar
Ryan Olson committed
140
            event_manager,
141
142
            global_registry,
            unregister_tx,
Ryan Olson's avatar
Ryan Olson committed
143
144
145
146
        }
    }

    pub fn is_registered(&self, sequence_hash: SequenceHash) -> bool {
147
        let blocks = self.blocks.lock().unwrap();
148
149
150
151
        if let Some(handle) = blocks.get(&sequence_hash)
            && let Some(_handle) = handle.upgrade()
        {
            return true;
Ryan Olson's avatar
Ryan Olson committed
152
153
154
155
156
157
158
        }
        false
    }

    pub fn register_block(
        &mut self,
        block_state: &mut BlockState,
Tianer Zhou's avatar
Tianer Zhou committed
159
    ) -> Result<Option<PublishHandle>, BlockRegistrationError> {
Ryan Olson's avatar
Ryan Olson committed
160
        match block_state {
Tianer Zhou's avatar
Tianer Zhou committed
161
            BlockState::Reset => Err(BlockRegistrationError::InvalidState(
Ryan Olson's avatar
Ryan Olson committed
162
163
                "Block is in Reset state".to_string(),
            )),
Tianer Zhou's avatar
Tianer Zhou committed
164
            BlockState::Partial(_partial) => Err(BlockRegistrationError::InvalidState(
Ryan Olson's avatar
Ryan Olson committed
165
166
167
168
169
                "Block is in Partial state".to_string(),
            )),

            BlockState::Complete(state) => {
                let sequence_hash = state.token_block().sequence_hash();
170
171
172
                let mut blocks = self.blocks.lock().unwrap();

                // If an identical block already exists in this pool, return an error.
173
174
175
176
177
178
                if let Some(handle) = blocks.get(&sequence_hash)
                    && let Some(_handle) = handle.upgrade()
                {
                    return Err(BlockRegistrationError::BlockAlreadyRegistered(
                        sequence_hash,
                    ));
Ryan Olson's avatar
Ryan Olson committed
179
180
                }

181
182
183
184
185
186
187
188
                let mut publish_handle = None;

                let block_handle =
                    Arc::new(BlockHandle::new(sequence_hash, self.unregister_tx.clone()));

                let reg_handle = 'reg_block: {
                    // Now, check the global registry.
                    let mut global_registry = self.global_registry.lock().unwrap();
189
                    let registration_key = RegistrationKey::new(sequence_hash, self.storage_tier);
190
191

                    // If an identical block exists in other pool, use the same registration handle.
192
                    if let Some(handle) = global_registry.get(&registration_key)
193
194
195
                        && let Some(handle) = handle.upgrade()
                    {
                        break 'reg_block handle;
196
                    }
Ryan Olson's avatar
Ryan Olson committed
197

198
199
200
201
                    // Otherwise, create a new registration handle.
                    publish_handle = Some(Self::create_publish_handle(
                        state.token_block(),
                        self.event_manager.clone(),
202
                        self.storage_tier,
203
204
205
206
                    ));
                    let reg_handle = publish_handle.as_ref().unwrap().remove_handle();

                    // Insert the registration handle into the global registry.
207
                    global_registry.insert(registration_key, Arc::downgrade(&reg_handle));
208
209
210
211
212

                    reg_handle
                };

                blocks.insert(sequence_hash, Arc::downgrade(&block_handle));
Ryan Olson's avatar
Ryan Olson committed
213
214

                // Update the [BlockState] to [BlockState::Registered]
215
216
217
218
                let _ = std::mem::replace(
                    block_state,
                    BlockState::Registered(reg_handle, block_handle),
                );
Ryan Olson's avatar
Ryan Olson committed
219
220
221

                Ok(publish_handle)
            }
222
            BlockState::Registered(registered, _) => Err(
Tianer Zhou's avatar
Tianer Zhou committed
223
                BlockRegistrationError::BlockAlreadyRegistered(registered.sequence_hash()),
Ryan Olson's avatar
Ryan Olson committed
224
225
226
227
228
229
230
            ),
        }
    }

    fn create_publish_handle(
        token_block: &TokenBlock,
        event_manager: Arc<dyn EventManager>,
231
        storage_tier: StorageTier,
Ryan Olson's avatar
Ryan Olson committed
232
    ) -> PublishHandle {
233
234
        let reg_handle =
            RegistrationHandle::from_token_block(token_block, event_manager.clone(), storage_tier);
Ryan Olson's avatar
Ryan Olson committed
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250

        PublishHandle::new(reg_handle, event_manager)
    }
}

#[derive(Getters)]
pub struct RegistrationHandle {
    #[getter(copy)]
    block_hash: BlockHash,

    #[getter(copy)]
    sequence_hash: SequenceHash,

    #[getter(copy)]
    parent_sequence_hash: Option<SequenceHash>,

251
252
253
254
255
256
257
258
259
    #[getter(copy)]
    external_sequence_hash: Option<SequenceHash>,

    #[getter(copy)]
    external_parent_sequence_hash: Option<SequenceHash>,

    #[getter(copy)]
    storage_tier: StorageTier,

Ryan Olson's avatar
Ryan Olson committed
260
261
    #[getter(skip)]
    release_manager: Arc<dyn EventReleaseManager>,
262
263

    token_block: TokenBlock,
Ryan Olson's avatar
Ryan Olson committed
264
265
266
}

impl RegistrationHandle {
267
268
269
270
271
272
273
274
275
276
    /// Returns the block size (number of tokens in the block)
    pub fn block_size(&self) -> usize {
        self.token_block.block_size()
    }

    /// Returns a reference to the tokens in this block
    pub fn tokens(&self) -> &crate::tokens::Tokens {
        self.token_block.tokens()
    }

277
278
279
280
281
282
283
284
285
286
287
    /// Returns the router-facing sequence hash for this block.
    pub fn published_sequence_hash(&self) -> SequenceHash {
        self.external_sequence_hash.unwrap_or(self.sequence_hash)
    }

    /// Returns the router-facing parent sequence hash for this block.
    pub fn published_parent_sequence_hash(&self) -> Option<SequenceHash> {
        self.external_parent_sequence_hash
            .or(self.parent_sequence_hash)
    }

Ryan Olson's avatar
Ryan Olson committed
288
289
290
    fn from_token_block(
        token_block: &TokenBlock,
        release_manager: Arc<dyn EventReleaseManager>,
291
        storage_tier: StorageTier,
Ryan Olson's avatar
Ryan Olson committed
292
293
294
295
296
    ) -> Self {
        Self {
            block_hash: token_block.block_hash(),
            sequence_hash: token_block.sequence_hash(),
            parent_sequence_hash: token_block.parent_sequence_hash(),
297
298
299
            external_sequence_hash: token_block.external_sequence_hash(),
            external_parent_sequence_hash: token_block.external_parent_sequence_hash(),
            storage_tier,
Ryan Olson's avatar
Ryan Olson committed
300
            release_manager,
301
            token_block: token_block.clone(),
Ryan Olson's avatar
Ryan Olson committed
302
303
304
305
306
307
308
309
        }
    }
}

impl std::fmt::Debug for RegistrationHandle {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        write!(
            f,
310
311
312
313
314
315
316
            "RegistrationHandle {{ sequence_hash: {}; block_hash: {}; parent_sequence_hash: {:?}; external_sequence_hash: {:?}; external_parent_sequence_hash: {:?}; storage_tier: {:?} }}",
            self.sequence_hash,
            self.block_hash,
            self.parent_sequence_hash,
            self.external_sequence_hash,
            self.external_parent_sequence_hash,
            self.storage_tier
Ryan Olson's avatar
Ryan Olson committed
317
318
319
320
321
322
323
324
325
326
327
328
329
330
        )
    }
}

/// Dropping the handle hands it to the release manager's `block_release`.
impl Drop for RegistrationHandle {
    fn drop(&mut self) {
        // The handle is passed by reference so the release manager can still read its fields.
        self.release_manager.block_release(self);
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    use crate::block_manager::events::NullEventManager;
    use crate::block_manager::events::tests::{EventType, MockEventManager};
    use crate::block_manager::kv_consolidator::StorageTier;
    use crate::tokens::{TokenBlockSequence, Tokens};

    /// Builds a 10-token sequence with block size 4: two complete blocks plus a 2-token
    /// partial block, and checks the expected hashes of the complete blocks.
    fn create_sequence() -> TokenBlockSequence {
        let tokens = Tokens::from(vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 10]);

        // NOTE: 1337 was the original seed, so we are temporarily using that here to prove the logic has not changed
        let sequence = TokenBlockSequence::new(tokens, 4, Some(1337_u64));

        assert_eq!(sequence.blocks().len(), 2);
        assert_eq!(sequence.current_block().len(), 2);

        assert_eq!(sequence.blocks()[0].tokens(), &vec![1, 2, 3, 4]);
        assert_eq!(sequence.blocks()[0].sequence_hash(), 14643705804678351452);

        assert_eq!(sequence.blocks()[1].tokens(), &vec![5, 6, 7, 8]);
        assert_eq!(sequence.blocks()[1].sequence_hash(), 4945711292740353085);

        assert_eq!(sequence.current_block().tokens(), &vec![9, 10]);

        sequence
    }

    #[test]
    fn test_mock_event_manager_with_single_publish_handle() {
        let sequence = create_sequence();

        let (event_manager, mut rx) = MockEventManager::new();

        let publish_handle = BlockRegistry::create_publish_handle(
            &sequence.blocks()[0],
            event_manager.clone(),
            StorageTier::Device,
        );

        // no event should have been triggered
        assert!(rx.try_recv().is_err());

        // we should get two events when this is dropped, since we never took ownership of the RegistrationHandle
        drop(publish_handle);

        // the first event should be a Register event
        let events = rx.try_recv().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0],
            EventType::Register(sequence.blocks()[0].sequence_hash(), StorageTier::Device)
        );

        // the second event should be a Remove event
        let events = rx.try_recv().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0],
            EventType::Remove(sequence.blocks()[0].sequence_hash(), StorageTier::Device)
        );

        // there should be no more events
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn test_mock_event_manager_single_publish_handle_removed() {
        let sequence = create_sequence();
        let block_to_test = &sequence.blocks()[0];
        let expected_sequence_hash = block_to_test.sequence_hash();

        let (event_manager, mut rx) = MockEventManager::new();

        let publish_handle = BlockRegistry::create_publish_handle(
            block_to_test,
            event_manager.clone(),
            StorageTier::Device,
        );

        // Remove the registration handle before dropping the publish handle
        let reg_handle = publish_handle.remove_handle();

        // no event should have been triggered yet
        assert!(rx.try_recv().is_err());

        // Drop the publish handle - it SHOULD trigger a Register event now because remove_handle doesn't disarm
        drop(publish_handle);
        let register_events = rx.try_recv().unwrap();
        assert_eq!(
            register_events.len(),
            1,
            "Register event should be triggered on PublishHandle drop"
        );
        assert_eq!(
            register_events[0],
            EventType::Register(expected_sequence_hash, StorageTier::Device),
            "Expected Register event"
        );

        // Drop the registration handle - this SHOULD trigger the Remove event
        drop(reg_handle);

        let events = rx.try_recv().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(
            events[0],
            EventType::Remove(expected_sequence_hash, StorageTier::Device),
            "Only Remove event should be triggered"
        );

        // there should be no more events
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn test_mock_event_manager_publisher_multiple_handles_removed() {
        let sequence = create_sequence();
        let block1 = &sequence.blocks()[0];
        let block2 = &sequence.blocks()[1];
        let hash1 = block1.sequence_hash();
        let hash2 = block2.sequence_hash();

        let (event_manager, mut rx) = MockEventManager::new();
        let mut publisher = event_manager.publisher();

        let publish_handle1 = BlockRegistry::create_publish_handle(
            block1,
            event_manager.clone(),
            StorageTier::Device,
        );
        let publish_handle2 = BlockRegistry::create_publish_handle(
            block2,
            event_manager.clone(),
            StorageTier::Device,
        );

        // Remove handles before adding to publisher
        let reg_handle1 = publish_handle1.remove_handle();
        let reg_handle2 = publish_handle2.remove_handle();

        // Add the handles to the publisher (remove_handle does not disarm them, so they still
        // produce Register events when published)
        publisher.take_handle(publish_handle1);
        publisher.take_handle(publish_handle2);

        // no events yet
        assert!(rx.try_recv().is_err());

        // Drop the publisher - should trigger a single Publish event with both Register events
        drop(publisher);

        let events = rx.try_recv().unwrap();
        assert_eq!(
            events.len(),
            2,
            "Should receive two Register events in one batch"
        );
        // Order isn't guaranteed, so check for both
        assert!(events.contains(&EventType::Register(hash1, StorageTier::Device)));
        assert!(events.contains(&EventType::Register(hash2, StorageTier::Device)));

        // no more events immediately after publish
        assert!(rx.try_recv().is_err());

        // Drop registration handles individually - should trigger Remove events
        drop(reg_handle1);
        let events1 = rx.try_recv().unwrap();
        assert_eq!(events1.len(), 1);
        assert_eq!(events1[0], EventType::Remove(hash1, StorageTier::Device));

        drop(reg_handle2);
        let events2 = rx.try_recv().unwrap();
        assert_eq!(events2.len(), 1);
        assert_eq!(events2[0], EventType::Remove(hash2, StorageTier::Device));

        // no more events
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn test_publisher_empty_drop() {
        let (event_manager, mut rx) = MockEventManager::new();
        let publisher = event_manager.publisher();

        drop(publisher);
        // No events should be sent
        assert!(rx.try_recv().is_err());
    }

    #[test]
    fn test_publisher_publish_multiple_times() {
        let sequence = create_sequence();
        let block1 = &sequence.blocks()[0];
        let hash1 = block1.sequence_hash();

        let (event_manager, mut rx) = MockEventManager::new();
        let mut publisher = event_manager.publisher();

        let publish_handle1 = BlockRegistry::create_publish_handle(
            block1,
            event_manager.clone(),
            StorageTier::Device,
        );

        publisher.take_handle(publish_handle1);

        // First publish call
        publisher.publish();
        let events = rx.try_recv().unwrap();
        assert_eq!(events.len(), 1);
        assert_eq!(events[0], EventType::Register(hash1, StorageTier::Device));

        // The RegistrationHandle Arc was taken by the publisher and dropped after the publish call
        // So, the Remove event should follow immediately.
        let remove_events = rx.try_recv().unwrap();
        assert_eq!(
            remove_events.len(),
            1,
            "Remove event should be triggered after publish consumes the handle"
        );
        assert_eq!(
            remove_events[0],
            EventType::Remove(hash1, StorageTier::Device),
            "Expected Remove event"
        );

        // Second publish call (should do nothing as handles were taken)
        publisher.publish();
        assert!(rx.try_recv().is_err());

        // Drop publisher (should also do nothing)
        drop(publisher);
        assert!(rx.try_recv().is_err());
    }

    #[tokio::test(flavor = "current_thread")]
    async fn test_same_sequence_in_different_tiers_emits_distinct_events() {
        let sequence = create_sequence();
        let block = sequence.blocks()[0].clone();
        let sequence_hash = block.sequence_hash();

        let (event_manager, mut rx) = MockEventManager::new();
        let global_registry = GlobalRegistry::default();

        let mut host_registry = BlockRegistry::new(
            event_manager.clone(),
            global_registry.clone(),
            Handle::current(),
            StorageTier::HostPinned,
        );
        let mut disk_registry = BlockRegistry::new(
            event_manager.clone(),
            global_registry,
            Handle::current(),
            StorageTier::Disk,
        );

        let mut host_state = BlockState::Reset;
        host_state.apply_token_block(block.clone()).unwrap();
        let host_publish = host_registry
            .register_block(&mut host_state)
            .unwrap()
            .unwrap();
        drop(host_publish);

        assert_eq!(
            rx.recv().await.unwrap(),
            vec![EventType::Register(sequence_hash, StorageTier::HostPinned)]
        );

        let mut disk_state = BlockState::Reset;
        disk_state.apply_token_block(block).unwrap();
        let disk_publish = disk_registry
            .register_block(&mut disk_state)
            .unwrap()
            .unwrap();
        drop(disk_publish);

        assert_eq!(
            rx.recv().await.unwrap(),
            vec![EventType::Register(sequence_hash, StorageTier::Disk)]
        );

        drop(host_state);
        assert_eq!(
            rx.recv().await.unwrap(),
            vec![EventType::Remove(sequence_hash, StorageTier::HostPinned)]
        );

        drop(disk_state);
        assert_eq!(
            rx.recv().await.unwrap(),
            vec![EventType::Remove(sequence_hash, StorageTier::Disk)]
        );
    }

    #[test]
    fn test_registration_handle_prefers_external_hashes_for_publication() {
        let mut sequence = create_sequence();
        sequence.sync_external_sequence_hashes(&[50_001, 50_002]);

        let release_manager = NullEventManager::new();
        let registration_handle = RegistrationHandle::from_token_block(
            &sequence.blocks()[1],
            release_manager,
            StorageTier::HostPinned,
        );

        assert_eq!(registration_handle.external_sequence_hash(), Some(50_002));
        assert_eq!(
            registration_handle.external_parent_sequence_hash(),
            Some(50_001)
        );
        assert_eq!(registration_handle.published_sequence_hash(), 50_002);
        assert_eq!(
            registration_handle.published_parent_sequence_hash(),
            Some(50_001)
        );
    }
}