instance.rs 46.5 KB
Newer Older
Ryan Olson's avatar
Ryan Olson committed
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
1001
1002
1003
1004
1005
1006
1007
1008
1009
1010
1011
1012
1013
1014
1015
1016
1017
1018
1019
1020
1021
1022
1023
1024
1025
1026
1027
1028
1029
1030
1031
1032
1033
1034
1035
1036
1037
1038
1039
1040
1041
1042
1043
1044
1045
1046
1047
1048
1049
1050
1051
1052
1053
1054
1055
1056
1057
1058
1059
1060
1061
1062
1063
1064
1065
1066
1067
1068
1069
1070
1071
1072
1073
1074
1075
1076
1077
1078
1079
1080
1081
1082
1083
1084
1085
1086
1087
1088
1089
1090
1091
1092
1093
1094
1095
1096
1097
1098
1099
1100
1101
1102
1103
1104
1105
1106
1107
1108
1109
1110
1111
1112
1113
1114
1115
1116
1117
1118
1119
1120
1121
1122
1123
1124
1125
1126
1127
1128
1129
1130
1131
1132
1133
1134
1135
1136
1137
1138
1139
1140
1141
1142
1143
1144
1145
1146
1147
1148
1149
1150
1151
1152
1153
1154
1155
1156
1157
1158
1159
1160
1161
1162
1163
1164
1165
1166
1167
1168
1169
1170
1171
1172
1173
1174
1175
1176
1177
1178
1179
1180
1181
1182
1183
1184
1185
1186
1187
1188
1189
1190
1191
1192
1193
1194
1195
1196
1197
1198
1199
1200
1201
1202
1203
1204
1205
1206
1207
1208
1209
1210
1211
1212
1213
1214
1215
1216
1217
1218
1219
1220
1221
1222
1223
1224
1225
1226
1227
1228
1229
1230
1231
1232
1233
1234
1235
1236
1237
1238
1239
1240
1241
1242
1243
1244
1245
1246
1247
1248
1249
1250
1251
1252
1253
1254
1255
1256
1257
1258
1259
1260
1261
1262
// SPDX-FileCopyrightText: Copyright (c) 2025-2026 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0

use std::collections::HashMap;

use ::velo::Messenger;
use anyhow::Result;
use dashmap::DashMap;
use tokio::sync::{Mutex, mpsc, watch};
use uuid::Uuid;

use std::sync::Arc;

use crate::{
    BlockId, G2, G3, InstanceId, SequenceHash, object::ObjectBlockOps, worker::RemoteDescriptor,
};
use kvbm_common::LogicalLayoutHandle;
use kvbm_logical::{
    blocks::{BlockRegistry, ImmutableBlock},
    manager::BlockManager,
};
use kvbm_physical::transfer::{TransferCompleteNotification, TransferOptions};

use kvbm_physical::manager::{LayoutHandle, SerializedLayout};

use super::{
    super::worker::Worker,
    super::worker::group::{ParallelWorkers, SpmdParallelWorkers},
    AsyncSessionResult,
    FindMatchesOptions,
    FindMatchesResult,
    Leader,
    OnboardingStatus,
    ReadyResult,
    // Legacy SessionHandle for deferred operations
    SessionHandle as LegacySessionHandle,
    SessionId,
    StagingMode,
    accessor::{BlockAccessor, PolicyContext},
    session::{
        BlockHolder, ControlRole, ControllableSessionOptions, ControllableSessionResult,
        InitiatorSession, MessageTransport, OnboardMessage, OnboardSessionTx, ResponderSession,
        ServerSession, ServerSessionHandle, ServerSessionOptions, SessionHandle, SessionMessage,
        SessionMessageTx, SessionPhase, create_server_session, session_handle_state_channel,
        session_message_channel,
    },
    velo::{ExportMetadataCallback, VeloLeaderService},
};

/// Primary leader implementation for the distributed KVBM system.
///
/// `InstanceLeader` coordinates block onboarding across local and remote
/// instances. It owns a G2 (host memory) `BlockManager` and an optional G3
/// (disk) `BlockManager`, a set of workers for executing physical transfers,
/// and a parallel worker abstraction for multi-rank RDMA operations.
///
/// Key responsibilities:
/// - **Block matching**: finding which requested sequence hashes are already
///   cached locally (via `BlockAccessor` policies).
/// - **Session management**: creating, attaching, and driving onboard sessions
///   between endpoint (source) and controller (destination) roles.
/// - **Remote connectivity**: exchanging serialized layout metadata with peer
///   instances so workers can perform RDMA transfers.
/// - **Velo RPC**: registering handlers via `VeloLeaderService` so remote
///   leaders can initiate sessions and exchange metadata.
#[derive(Clone)]
pub struct InstanceLeader {
    /// Nova instance for distributed communication.
    messenger: Arc<Messenger>,

    /// Block registry for deduplication.
    #[allow(dead_code)]
    pub(crate) registry: BlockRegistry,

    /// G2 (host memory) block manager (wrapped in Arc since BlockManager doesn't implement Clone).
    pub(crate) g2_manager: Arc<BlockManager<G2>>,

    /// Optional G3 (disk) block manager
    pub(crate) g3_manager: Option<Arc<BlockManager<G3>>>,

    /// Workers for executing transfers (at least 1 required).
    /// Multiple workers enable parallel transfers and redundancy.
    workers: Vec<Arc<dyn Worker>>,

    /// Parallel worker abstraction wrapping the workers.
    /// Used for RDMA transfers with proper handle mapping storage.
    parallel_worker: Option<Arc<dyn ParallelWorkers>>,

    /// Map of active sessions (session_id -> message channel).
    sessions: Arc<DashMap<SessionId, OnboardSessionTx>>,

    /// Cached worker metadata (avoids querying workers repeatedly).
    cached_worker_metadata: Option<Vec<SerializedLayout>>,

    /// Map of session states for holding blocks alive (RAII).
    session_states: Arc<DashMap<SessionId, SessionState>>,

    /// List of remote leader instance IDs (mutable for post-construction configuration).
    remote_leaders: Arc<std::sync::RwLock<Vec<InstanceId>>>,

    /// Message transport for session communication.
    transport: Arc<MessageTransport>,

    // ========================================================================
    // Unified Session Protocol
    // ========================================================================
    /// Map of session message receivers.
    /// Used by SessionHandle/SessionEndpoint/ControllableSession.
    session_sessions: Arc<DashMap<SessionId, SessionMessageTx>>,

    // ========================================================================
    // G4/Object Storage
    // ========================================================================
    /// Object storage client for G4 search and load operations.
    /// Leader calls has_blocks on S3 directly, coordinates workers for get_blocks.
    object_client: Option<Arc<dyn ObjectBlockOps>>,
}

/// Builder for InstanceLeader.
#[derive(Default)]
pub struct InstanceLeaderBuilder {
    messenger: Option<Arc<Messenger>>,
    registry: Option<BlockRegistry>,
    g2_manager: Option<Arc<BlockManager<G2>>>,
    g3_manager: Option<Arc<BlockManager<G3>>>,
    workers: Vec<Arc<dyn Worker>>,
    sessions: Option<Arc<DashMap<SessionId, OnboardSessionTx>>>,
    remote_leaders: Option<Vec<InstanceId>>,
    cached_worker_metadata: Option<Vec<SerializedLayout>>,
    object_client: Option<Arc<dyn ObjectBlockOps>>,
}

impl InstanceLeaderBuilder {
    /// Initialize builder with components from KvbmRuntime.
    ///
    /// This extracts Nova from the runtime. Use this when the runtime
    /// has already been constructed and you want the leader to share
    /// the same Nova instance for distributed communication.
    ///
    /// # Example
    /// ```ignore
    /// let runtime = KvbmRuntime::from_env_leader().await?;
    /// let leader = InstanceLeaderBuilder::default()
    ///     .with_runtime(&runtime)
    ///     .g2_manager(g2_manager)
    ///     .build()?;
    /// ```
    pub fn with_runtime(self, runtime: &crate::KvbmRuntime) -> Self {
        self.messenger(runtime.messenger().clone())
    }

    pub fn messenger(mut self, messenger: Arc<Messenger>) -> Self {
        self.messenger = Some(messenger);
        self
    }

    pub fn registry(mut self, registry: BlockRegistry) -> Self {
        self.registry = Some(registry);
        self
    }

    pub fn with_g2_manager(mut self, manager: Option<BlockManager<G2>>) -> Self {
        self.g2_manager = manager.map(Arc::new);
        self
    }

    pub fn with_g3_manager(mut self, manager: Option<BlockManager<G3>>) -> Self {
        self.g3_manager = manager.map(Arc::new);
        self
    }

    pub fn g2_manager(mut self, manager: Arc<BlockManager<G2>>) -> Self {
        self.g2_manager = Some(manager);
        self
    }

    pub fn g3_manager(mut self, manager: Arc<BlockManager<G3>>) -> Self {
        self.g3_manager = Some(manager);
        self
    }

    /// Add a single worker (convenience method).
    pub fn worker(mut self, worker: Arc<dyn Worker>) -> Self {
        self.workers.push(worker);
        self
    }

    /// Set all workers at once.
    pub fn workers(mut self, workers: Vec<Arc<dyn Worker>>) -> Self {
        self.workers = workers;
        self
    }

    pub fn remote_leaders(mut self, leaders: Vec<InstanceId>) -> Self {
        self.remote_leaders = Some(leaders);
        self
    }

    /// Cache worker metadata upfront to avoid querying workers later.
    ///
    /// This is useful when workers have already exported metadata during initialization
    /// (e.g., in the connector pattern where workers return metadata in their init response).
    pub fn with_cached_worker_metadata(mut self, metadata: Vec<SerializedLayout>) -> Self {
        self.cached_worker_metadata = Some(metadata);
        self
    }

    /// Set the object storage client for G4 search and load operations.
    ///
    /// The leader uses this client to:
    /// - Query S3 for block presence via `has_blocks`
    /// - Coordinate workers to load blocks from S3 via `get_blocks`
    pub fn object_client(mut self, client: Arc<dyn ObjectBlockOps>) -> Self {
        self.object_client = Some(client);
        self
    }

    pub fn build(self) -> Result<InstanceLeader> {
        let messenger = self
            .messenger
            .ok_or_else(|| anyhow::anyhow!("Nova instance required"))?;
        let transport = Arc::new(MessageTransport::velo(messenger.clone()));

        // Create event system for notification aggregation
        let events = Arc::new(messenger.event_manager());

        // Get current tokio runtime handle
        let runtime = tokio::runtime::Handle::current();

        // // Validate at least one worker
        // if self.workers.is_empty() {
        //     anyhow::bail!("At least one worker required");
        // }

        // todo: we will need a common builder pattern for creating "general" parallel workers
        // - we could also use an enum and match as the number of types will be limited

        // Create parallel worker if workers are provided
        let parallel_worker: Option<Arc<dyn ParallelWorkers>> = if !self.workers.is_empty() {
            Some(Arc::new(SpmdParallelWorkers::new(
                self.workers.to_vec(),
                events.clone(),
                runtime.clone(),
            )))
        } else {
            None
        };

        Ok(InstanceLeader {
            messenger,
            registry: self
                .registry
                .ok_or_else(|| anyhow::anyhow!("block registry required"))?,
            g2_manager: self
                .g2_manager
                .ok_or_else(|| anyhow::anyhow!("g2_manager required"))?,
            g3_manager: self.g3_manager,
            workers: self.workers,
            parallel_worker,
            cached_worker_metadata: self.cached_worker_metadata,
            sessions: self.sessions.unwrap_or_else(|| Arc::new(DashMap::new())),
            session_states: Arc::new(DashMap::new()),
            remote_leaders: Arc::new(std::sync::RwLock::new(
                self.remote_leaders.unwrap_or_default(),
            )),
            transport,
            session_sessions: Arc::new(DashMap::new()),
            object_client: self.object_client,
        })
    }
}

/// Internal session state for holding matched blocks.
#[allow(dead_code)] // Used for RAII block lifetime management
struct SessionState {
    session_id: SessionId,
    matched_g2_blocks: Vec<ImmutableBlock<G2>>,
    matched_g3_blocks: Vec<ImmutableBlock<G3>>,
    status_tx: watch::Sender<OnboardingStatus>,
}

/// Result of scanning for blocks across tiers.
///
/// Unlike `FindMatchesResult`, this scans all given hashes without stopping on first miss.
/// Returns blocks found in each tier along with their sorted positions.
pub struct ScanBlocksResult {
    /// Blocks found in G2 (host memory).
    pub g2_blocks: HashMap<SequenceHash, ImmutableBlock<G2>>,

    /// Blocks found in G3 (disk).
    pub g3_blocks: HashMap<SequenceHash, ImmutableBlock<G3>>,

    /// All found blocks sorted by position (lowest to highest).
    /// Each entry indicates which tier (G2/G3) the block was found in.
    pub sorted_matches: Vec<(SequenceHash, LogicalLayoutHandle)>,
}

impl InstanceLeader {
    /// Get a reference to the G2 BlockManager.
    pub fn g2_manager(&self) -> &Arc<BlockManager<G2>> {
        &self.g2_manager
    }

    /// Get a reference to the optional G3 BlockManager.
    pub fn g3_manager(&self) -> Option<&Arc<BlockManager<G3>>> {
        self.g3_manager.as_ref()
    }

    /// Get the block registry.
    pub fn registry(&self) -> &BlockRegistry {
        &self.registry
    }

    /// Get a reference to the Nova instance.
    ///
    /// This provides access to the Nova distributed system for features
    /// like event coordination and cross-instance communication.
    pub fn messenger(&self) -> &Arc<Messenger> {
        &self.messenger
    }

    /// Get the tokio runtime handle from Nova.
    ///
    /// This handle should be used for spawning background tasks that need to
    /// run on the KVBM runtime's executor (e.g., offload engine pipelines).
    pub fn runtime(&self) -> tokio::runtime::Handle {
        self.messenger.runtime().clone()
    }

    /// Check if a parallel_worker is configured.
    ///
    /// The parallel_worker is required for local transfer operations
    /// (e.g., offloading blocks between tiers).
    pub fn has_parallel_worker(&self) -> bool {
        self.parallel_worker.is_some()
    }

    /// Get the parallel worker for distributed operations.
    ///
    /// The parallel worker fans out operations to all workers and aggregates results.
    /// It implements `ObjectBlockOps` for coordinated object storage uploads.
    pub fn parallel_worker(&self) -> Option<Arc<dyn ParallelWorkers>> {
        self.parallel_worker.clone()
    }

    /// Get the object storage client for G4 operations.
    ///
    /// Returns `Some` if object storage is configured, `None` otherwise.
    /// The client is used by InitiatorSession for G4 parallel search.
    pub fn object_client(&self) -> Option<Arc<dyn ObjectBlockOps>> {
        self.object_client.clone()
    }

    /// Add a remote leader to the search list.
    ///
    /// Remote leaders are queried during `find_matches_with_options` when
    /// `search_remote == true`. This method allows adding remote leaders
    /// after construction (e.g., when instance IDs are only known after
    /// cluster setup).
    pub fn add_remote_leader(&self, instance_id: InstanceId) {
        let mut remote_leaders = self.remote_leaders.write().unwrap();
        if !remote_leaders.contains(&instance_id) {
            remote_leaders.push(instance_id);
        }
    }

    /// Set all remote leaders at once.
    pub fn set_remote_leaders(&self, instance_ids: Vec<InstanceId>) {
        let mut remote_leaders = self.remote_leaders.write().unwrap();
        *remote_leaders = instance_ids;
    }

    /// Get the list of remote leader instance IDs.
    pub fn remote_leaders(&self) -> Vec<InstanceId> {
        self.remote_leaders.read().unwrap().clone()
    }

    /// Scan for all blocks matching any of the given sequence hashes.
    ///
    /// Unlike `find_matches`, this:
    /// - Does NOT stop on first miss
    /// - Returns blocks from both G2 and G3 tiers separately
    /// - Acquires blocks from pools (caller owns until dropped via RAII)
    /// - Returns `sorted_matches` ordered by `SequenceHash::position()`
    ///
    /// # Arguments
    /// * `sequence_hashes` - Hashes to scan for
    /// * `touch` - Whether to update frequency tracking (for MultiLRU eviction policy)
    ///
    /// # Algorithm
    /// 1. Scan G2 manager for candidates
    /// 2. Scan G3 manager for remaining candidates
    /// 3. Build sorted_matches from both, sorted by position (lowest to highest)
    pub fn scan_blocks(&self, sequence_hashes: &[SequenceHash], touch: bool) -> ScanBlocksResult {
        // Step 1: Scan G2 for all candidates
        let g2_blocks = self.g2_manager.scan_matches(sequence_hashes, touch);

        // Step 2: Find remaining hashes not in G2
        let remaining: Vec<SequenceHash> = sequence_hashes
            .iter()
            .filter(|h| !g2_blocks.contains_key(h))
            .copied()
            .collect();

        // Step 3: Scan G3 for remaining (if G3 exists)
        let g3_blocks = if let Some(ref g3_manager) = self.g3_manager {
            if !remaining.is_empty() {
                g3_manager.scan_matches(&remaining, touch)
            } else {
                HashMap::new()
            }
        } else {
            HashMap::new()
        };

        // Step 4: Build sorted_matches from both tiers
        let mut sorted_matches: Vec<(SequenceHash, LogicalLayoutHandle)> =
            Vec::with_capacity(g2_blocks.len() + g3_blocks.len());

        // Add G2 matches
        for hash in g2_blocks.keys() {
            sorted_matches.push((*hash, LogicalLayoutHandle::G2));
        }

        // Add G3 matches
        for hash in g3_blocks.keys() {
            sorted_matches.push((*hash, LogicalLayoutHandle::G3));
        }

        // Sort by SequenceHash position (lowest to highest)
        sorted_matches.sort_by_key(|(hash, _)| hash.position());

        ScanBlocksResult {
            g2_blocks,
            g3_blocks,
            sorted_matches,
        }
    }

    /// Scan blocks using a custom policy that controls iteration and yields results.
    ///
    /// This provides maximum flexibility for implementing custom scanning strategies.
    /// The policy receives access to a `BlockAccessor` for acquiring blocks and a
    /// `PolicyContext` for yielding results incrementally.
    ///
    /// # Arguments
    /// * `hashes` - Sequence hashes to scan
    /// * `touch` - Whether to update frequency tracking on block access
    /// * `policy` - Function that implements the scanning strategy
    ///
    /// # Design
    ///
    /// The accessor does NOT hold locks between calls. Each `.find()` call is
    /// independent. This enables:
    /// - Custom iteration patterns (sorted, BTree scan, binary search, etc.)
    /// - Yielding results incrementally (e.g., contiguous subsequences)
    /// - Future parallel execution (accessor is Send + Sync)
    ///
    /// # Example: Simple linear scan
    /// ```ignore
    /// let blocks = leader.scan_with_policy(&hashes, true, |hashes, ctx| {
    ///     for hash in hashes {
    ///         if let Some(block) = ctx.accessor().find(*hash) {
    ///             ctx.yield_item(block);
    ///         }
    ///     }
    /// });
    /// ```
    ///
    /// # Example: Find contiguous subsequences
    /// ```ignore
    /// let runs: Vec<Vec<TieredBlock>> = leader.scan_with_policy(&hashes, true, |hashes, ctx| {
    ///     let mut run = Vec::new();
    ///     let mut last_pos: Option<u64> = None;
    ///
    ///     for hash in hashes.iter().sorted_by_key(|h| h.position()) {
    ///         if let Some(block) = ctx.accessor().find(*hash) {
    ///             let pos = block.position();
    ///             if last_pos.map_or(true, |p| pos == p + 1) {
    ///                 run.push(block);
    ///             } else {
    ///                 if !run.is_empty() { ctx.yield_item(std::mem::take(&mut run)); }
    ///                 run.push(block);
    ///             }
    ///             last_pos = Some(pos);
    ///         } else if !run.is_empty() {
    ///             ctx.yield_item(std::mem::take(&mut run));
    ///             last_pos = None;
    ///         }
    ///     }
    ///     if !run.is_empty() { ctx.yield_item(run); }
    /// });
    /// ```
    pub fn scan_with_policy<F, T>(&self, hashes: &[SequenceHash], touch: bool, policy: F) -> Vec<T>
    where
        F: FnOnce(&[SequenceHash], &mut PolicyContext<T>),
    {
        let accessor = BlockAccessor::new(self, touch);
        let mut ctx = PolicyContext {
            accessor,
            results: Vec::new(),
        };
        policy(hashes, &mut ctx);
        ctx.results
    }

    pub fn builder() -> InstanceLeaderBuilder {
        InstanceLeaderBuilder::default()
    }

    /// Register Nova handlers for leader-to-leader communication.
    ///
    /// This must be called after construction to enable distributed onboarding.
    pub fn register_handlers(&self) -> Result<()> {
        let instance_id = self.messenger.instance_id();
        let g2_manager = self.g2_manager.clone();
        let g3_manager = self.g3_manager.clone();
        let parallel_worker = self.parallel_worker.clone();
        let transport = self.transport.clone();
        let sessions = self.sessions.clone();

        let spawn_responder = move |msg: OnboardMessage| -> Result<()> {
            if let OnboardMessage::CreateSession {
                requester,
                session_id,
                sequence_hashes,
            } = msg
            {
                let (tx, rx) = mpsc::channel(100);
                sessions.insert(session_id, tx);

                let session = ResponderSession::new(
                    session_id,
                    instance_id,
                    requester,
                    g2_manager.clone(),
                    g3_manager.clone(),
                    parallel_worker.clone(),
                    transport.clone(),
                );

                tokio::spawn(async move {
                    if let Err(e) = session.run(rx, sequence_hashes).await {
                        tracing::warn!(error = %e, "ResponderSession error");
                    }
                });

                Ok(())
            } else {
                anyhow::bail!("spawn_responder called with non-CreateSession message")
            }
        };

        // Create export_metadata callback if we have workers or cached metadata
        let export_metadata_callback: Option<ExportMetadataCallback> =
            if !self.workers.is_empty() || self.cached_worker_metadata.is_some() {
                let workers = self.workers.clone();
                let cached_metadata = self.cached_worker_metadata.clone();
                Some(Arc::new(move || {
                    let workers = workers.clone();
                    let cached_metadata = cached_metadata.clone();
                    Box::pin(async move {
                        // Return cached metadata if available
                        if let Some(cached) = cached_metadata {
                            return Ok(cached);
                        }
                        // Otherwise, query workers
                        let mut metadata = Vec::with_capacity(workers.len());
                        for worker in &workers {
                            let serialized = worker.export_metadata()?.await?;
                            metadata.push(serialized);
                        }
                        Ok(metadata)
                    })
                }))
            } else {
                None
            };

        let mut service = VeloLeaderService::new(self.messenger.clone(), self.sessions.clone())
            .with_spawn_responder(spawn_responder)
            .with_session_sessions(self.session_sessions.clone());

        if let Some(callback) = export_metadata_callback {
            service = service.with_export_metadata(callback);
        }

        service.register_handlers()?;

        Ok(())
    }

    /// Store session state (held blocks and status channel).
    ///
    /// Blocks are kept alive via RAII until the session is removed from storage.
    fn store_session_state(&self, state: SessionState) {
        self.session_states.insert(state.session_id, state);
    }

    /// Release a completed session, dropping any held blocks.
    ///
    /// This is optional - sessions will naturally be cleaned up when the InstanceLeader
    /// is dropped. Call this explicitly if you need to release blocks earlier.
    pub fn release_session(&self, session_id: SessionId) {
        self.session_states.remove(&session_id);
        self.sessions.remove(&session_id);
        self.session_sessions.remove(&session_id);
    }

    // ========================================================================
    // Inverted Control Pattern (Prefill-Decode) Methods
    // ========================================================================

    /// Create a controllable session for local blocks.
    ///
    /// This is the "Decode side" of the inverted control pattern:
    /// 1. Search local G2 and G3 for matches
    /// 2. Create a ControllableSession that holds the blocks
    /// 3. Return session_id to be sent to Prefill out-of-band
    ///
    /// By default, G3→G2 staging starts immediately (auto_stage=true).
    pub fn create_controllable_session(
        &self,
        sequence_hashes: &[SequenceHash],
    ) -> Result<ControllableSessionResult> {
        self.create_controllable_session_with_options(
            sequence_hashes,
            ControllableSessionOptions::default(),
        )
    }

    /// Create a controllable session with custom options.
    ///
    /// Use this when you need to control auto-staging behavior.
    pub fn create_controllable_session_with_options(
        &self,
        sequence_hashes: &[SequenceHash],
        options: ControllableSessionOptions,
    ) -> Result<ControllableSessionResult> {
        let session_id = SessionId::from(Uuid::new_v4());

        // Local search only
        let matched_g2_blocks = self.g2_manager.match_blocks(sequence_hashes);

        // Find remaining hashes not in G2
        let remaining_hashes: Vec<_> = sequence_hashes
            .iter()
            .filter(|h| !matched_g2_blocks.iter().any(|b| b.sequence_hash() == **h))
            .copied()
            .collect();

        // Search G3 for remaining hashes
        let matched_g3_blocks = if let Some(ref g3_manager) = self.g3_manager {
            g3_manager.match_blocks(&remaining_hashes)
        } else {
            Vec::new()
        };

        let local_g2_count = matched_g2_blocks.len();
        let local_g3_count = matched_g3_blocks.len();

        // Create session channel using unified SessionMessage protocol
        let (tx, rx) = session_message_channel(100);
        self.session_sessions.insert(session_id, tx);

        // Collect G2 layout handles from workers for round-robin block allocation
        let worker_g2_handles: Vec<LayoutHandle> = self
            .parallel_worker
            .as_ref()
            .map(|pw| pw.workers().iter().filter_map(|w| w.g2_handle()).collect())
            .unwrap_or_default();

        let endpoint = super::session::SessionEndpoint::new(
            session_id,
            self.messenger.instance_id(),
            self.transport.clone(),
            rx,
        );

        let (cmd_tx, cmd_rx) = mpsc::channel(16);

        let session = ServerSession::new_with_staging(
            endpoint,
            BlockHolder::new(matched_g2_blocks),
            BlockHolder::new(matched_g3_blocks),
            worker_g2_handles,
            self.g2_manager.clone(),
            self.parallel_worker.clone(),
            cmd_rx,
            ServerSessionOptions {
                auto_stage: options.auto_stage,
            },
        );

        // Keep handle alive to prevent cmd channel from closing
        let _handle = ServerSessionHandle::new(session_id, self.messenger.instance_id(), cmd_tx);

        // Spawn session task
        let session_sessions = self.session_sessions.clone();
        tokio::spawn(async move {
            let _handle = _handle; // move handle into task to keep cmd channel open
            if let Err(e) = session.run().await {
                tracing::warn!(error = %e, "ServerSession error");
            }
            // Clean up when session completes
            session_sessions.remove(&session_id);
        });

        Ok(ControllableSessionResult {
            session_id,
            local_g2_count,
            local_g3_count,
        })
    }

    // ========================================================================
    // Unified Session Protocol
    // ========================================================================

    /// Attach to a remote session.
    /// Returns a `SessionHandle` that uses `SessionMessage` for communication.
    ///
    /// # Arguments
    /// * `remote_instance` - The instance hosting the session
    /// * `session_id` - The session to attach to
    ///
    /// # Example
    /// ```ignore
    /// let handle = leader.attach_session(remote_id, session_id).await?;
    /// let state = handle.wait_for_ready().await?;
    /// handle.trigger_staging().await?;
    /// ```
    pub async fn attach_session(
        &self,
        remote_instance: InstanceId,
        session_id: SessionId,
    ) -> Result<SessionHandle> {
        // Create local channel for receiving state updates
        let (state_tx, state_rx) = session_handle_state_channel();

        // Register handler for this session's messages
        let (msg_tx, msg_rx) = session_message_channel(100);
        self.session_sessions.insert(session_id, msg_tx);

        // Spawn receiver task to update state
        tokio::spawn(Self::run_session_receiver(msg_rx, state_tx));

        // Send attach message using new protocol
        let msg = SessionMessage::Attach {
            peer: self.messenger.instance_id(),
            session_id,
            as_role: ControlRole::Controller,
        };
        self.transport.send_session(remote_instance, msg).await?;

        let mut handle = SessionHandle::new(
            session_id,
            remote_instance,
            self.messenger.instance_id(),
            self.transport.clone(),
            state_rx,
        );

        // Add RDMA support if parallel worker is configured
        if let Some(parallel_worker) = &self.parallel_worker {
            handle = handle.with_rdma_support(parallel_worker.clone());
        }

        Ok(handle)
    }

    // ========================================================================
    // Endpoint Session Creation (Server-Side)
    // ========================================================================

    /// Create an endpoint session that a remote peer can attach to.
    ///
    /// This searches local G2/G3 for blocks matching the given sequence hashes
    /// and creates a session that exposes them for remote RDMA pull.
    ///
    /// Returns `(session_id, handle)` where:
    /// - `session_id` - Send to remote peer for attachment
    /// - `handle` - Use to control the session (send layer notifications, close)
    ///
    /// # Example
    /// ```ignore
    /// // Create session for sequence hashes
    /// let (session_id, handle) = leader.create_endpoint_session(&hashes)?;
    ///
    /// // Send session_id to remote peer out-of-band
    /// // Remote attaches via: remote_leader.attach_session(local_id, session_id)
    ///
    /// // For layerwise transfer, notify when layers are ready
    /// handle.notify_layers_ready(0..1).await?;
    /// ```
    pub fn create_endpoint_session(
        &self,
        sequence_hashes: &[SequenceHash],
    ) -> Result<(SessionId, ServerSessionHandle)> {
        let session_id = SessionId::from(uuid::Uuid::new_v4());

        // Local search
        let matched_g2_blocks = self.g2_manager.match_blocks(sequence_hashes);

        // Collect layout handles from workers
        // Note: For single-worker setups, all blocks use the same handle
        // For multi-worker (SPMD), each block gets the handle from its assigned worker
        let worker_g2_handles: Vec<LayoutHandle> = self
            .parallel_worker
            .as_ref()
            .map(|pw| pw.workers().iter().filter_map(|w| w.g2_handle()).collect())
            .unwrap_or_default();

        // Assign layout handle to each matched block
        // For now, use the first worker's handle for all blocks (single-worker assumption)
        // TODO: For SPMD, map blocks to worker handles based on block assignment
        let layout_handle = worker_g2_handles
            .first()
            .copied()
            .ok_or_else(|| anyhow::anyhow!("No G2 layout handle available from workers"))?;
        let layout_handles: Vec<LayoutHandle> = vec![layout_handle; matched_g2_blocks.len()];

        // Get sequence hashes from matched blocks
        let matched_hashes: Vec<SequenceHash> = matched_g2_blocks
            .iter()
            .map(|b| b.sequence_hash())
            .collect();

        // Create the session channel
        let (msg_tx, msg_rx) = session_message_channel(100);
        self.session_sessions.insert(session_id, msg_tx);

        // Create BlockHolder from matched blocks
        let block_holder = BlockHolder::new(matched_g2_blocks);

        // Create the session and handle
        let (session, handle) = create_server_session(
            session_id,
            self.messenger.instance_id(),
            block_holder,
            layout_handles,
            matched_hashes,
            self.transport.clone(),
            msg_rx,
        );

        // Spawn the session task
        let session_sessions = self.session_sessions.clone();
        tokio::spawn(async move {
            if let Err(e) = session.run().await {
                tracing::warn!(error = %e, "ServerSession error");
            }
            // Clean up when session completes
            session_sessions.remove(&session_id);
        });

        Ok((session_id, handle))
    }

    /// Create an endpoint session for specific pre-allocated blocks.
    ///
    /// Unlike `create_endpoint_session`, this doesn't search - it uses the
    /// provided blocks directly. Useful when the caller already has blocks
    /// to expose (e.g., after prefill computation).
    ///
    /// # Arguments
    /// * `blocks` - Blocks to expose for RDMA pull
    /// * `sequence_hashes` - Sequence hashes for the blocks (must match block count)
    /// * `layout_handles` - Layout handles for the blocks (must match block count)
    ///
    /// # Example
    /// ```ignore
    /// // After prefill computation, expose blocks for Decode to pull
    /// let (session_id, handle) = leader.create_endpoint_session_for_blocks(
    ///     prefill_blocks,
    ///     &hashes,
    ///     &layout_handles,
    /// )?;
    /// ```
    pub fn create_endpoint_session_for_blocks(
        &self,
        blocks: BlockHolder<G2>,
        sequence_hashes: &[SequenceHash],
        layout_handles: &[LayoutHandle],
    ) -> Result<(SessionId, ServerSessionHandle)> {
        let session_id = SessionId::from(uuid::Uuid::new_v4());

        // Create the session channel
        let (msg_tx, msg_rx) = session_message_channel(100);
        self.session_sessions.insert(session_id, msg_tx);

        // Create the session and handle
        let (session, handle) = create_server_session(
            session_id,
            self.messenger.instance_id(),
            blocks,
            layout_handles.to_vec(),
            sequence_hashes.to_vec(),
            self.transport.clone(),
            msg_rx,
        );

        // Spawn the session task
        let session_sessions = self.session_sessions.clone();
        tokio::spawn(async move {
            if let Err(e) = session.run().await {
                tracing::warn!(error = %e, "ServerSession error");
            }
            // Clean up when session completes
            session_sessions.remove(&session_id);
        });

        Ok((session_id, handle))
    }

    /// Internal: Process incoming SessionMessage for a session.
    async fn run_session_receiver(
        mut rx: mpsc::Receiver<SessionMessage>,
        state_tx: super::session::SessionHandleStateTx,
    ) {
        while let Some(msg) = rx.recv().await {
            match msg {
                SessionMessage::StateResponse { state, .. } => {
                    state_tx.update(state);
                }
                SessionMessage::BlocksStaged {
                    staged_blocks,
                    remaining,
                    layer_range,
                    ..
                } => {
                    state_tx.add_staged_blocks(staged_blocks, remaining, layer_range);
                }
                SessionMessage::Error { message, .. } => {
                    tracing::warn!(%message, "Session error");
                    state_tx.set_failed();
                    break;
                }
                SessionMessage::Close { .. } => {
                    state_tx.set_phase(SessionPhase::Complete);
                    break;
                }
                _ => {
                    // Ignore control commands (sent by controller, not received)
                }
            }
        }
    }

    /// Get the session sessions map (for Nova handler registration).
    #[expect(dead_code)]
    pub(crate) fn session_sessions(&self) -> Arc<DashMap<SessionId, SessionMessageTx>> {
        self.session_sessions.clone()
    }

    // ========================================================================
    // RDMA Metadata Management
    // These methods handle layout metadata export/import for remote RDMA transfers.
    // ========================================================================

    /// Check if metadata for a remote instance has been loaded.
    ///
    /// Returns true if `import_remote_metadata` has been successfully called
    /// for the given instance.
    pub fn has_remote_metadata(&self, instance: InstanceId) -> bool {
        self.parallel_worker
            .as_ref()
            .map(|pw| pw.has_remote_metadata(instance))
            .unwrap_or(false)
    }

    /// Get the number of workers attached to this leader.
    pub fn worker_count(&self) -> usize {
        self.workers.len()
    }

    /// Export metadata from all workers.
    ///
    /// Returns a `Vec<SerializedLayout>` where each element corresponds to a worker
    /// in rank order. This metadata can be sent to remote instances to enable
    /// RDMA transfers.
    ///
    /// # Returns
    /// Vector of serialized layouts, one per worker
    pub async fn export_worker_metadata(&self) -> Result<Vec<SerializedLayout>> {
        // Return cached metadata if available
        if let Some(cached) = &self.cached_worker_metadata {
            return Ok(cached.clone());
        }

        // Otherwise, query workers
        let mut metadata = Vec::with_capacity(self.workers.len());

        for worker in &self.workers {
            let serialized = worker.export_metadata()?.await?;
            metadata.push(serialized);
        }

        Ok(metadata)
    }

    /// Import metadata from a remote instance's workers.
    ///
    /// This imports layout metadata from a remote instance, enabling RDMA transfers
    /// to pull data from that instance. Metadata is imported rank-by-rank:
    /// - local worker 0 imports remote worker 0's metadata
    /// - local worker 1 imports remote worker 1's metadata
    /// - etc.
    ///
    /// # Arguments
    /// * `remote_instance` - The instance ID of the remote leader
    /// * `metadata` - Vector of SerializedLayout from remote workers (one per worker)
    ///
    /// # Errors
    /// Returns an error if:
    /// - No parallel worker configured
    /// - Metadata was already imported for this instance
    /// - Worker count mismatch between local and remote
    /// - Individual worker metadata import fails
    pub async fn import_remote_metadata(
        &self,
        remote_instance: InstanceId,
        metadata: Vec<SerializedLayout>,
    ) -> Result<()> {
        let parallel_worker = self
            .parallel_worker
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No parallel worker configured"))?;

        // Check if already loaded
        if parallel_worker.has_remote_metadata(remote_instance) {
            anyhow::bail!("Metadata already imported for instance {}", remote_instance);
        }

        // Connect to remote - this imports metadata and stores handle mappings
        parallel_worker
            .connect_remote(remote_instance, metadata)?
            .await?;

        Ok(())
    }

    // ========================================================================
    // Private Worker Mirror Methods
    // These methods execute operations across all workers and aggregate results.
    // ========================================================================

    /// Execute local transfer across all workers, returning aggregated notification.
    ///
    /// Delegates to the parallel_worker which fans out to all workers and
    /// aggregates their notifications into a single composite notification.
    #[allow(dead_code)]
    pub(crate) fn execute_local_transfer(
        &self,
        src: LogicalLayoutHandle,
        dst: LogicalLayoutHandle,
        src_block_ids: Vec<BlockId>,
        dst_block_ids: Vec<BlockId>,
        options: TransferOptions,
    ) -> Result<TransferCompleteNotification> {
        let parallel_worker = self
            .parallel_worker
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No parallel worker configured"))?;

        parallel_worker.execute_local_transfer(
            src,
            dst,
            Arc::from(src_block_ids),
            Arc::from(dst_block_ids),
            options,
        )
    }

    /// Execute remote onboard across all workers, returning aggregated notification.
    ///
    /// Delegates to the parallel_worker which fans out to all workers and
    /// aggregates their notifications into a single composite notification.
    #[allow(dead_code)]
    pub(crate) fn execute_remote_onboard(
        &self,
        src: RemoteDescriptor,
        dst: LogicalLayoutHandle,
        dst_block_ids: Vec<BlockId>,
        options: TransferOptions,
    ) -> Result<TransferCompleteNotification> {
        let parallel_worker = self
            .parallel_worker
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No parallel worker configured"))?;

        parallel_worker.execute_remote_onboard(src, dst, Arc::from(dst_block_ids), options)
    }

    /// Execute remote offload across all workers, returning aggregated notification.
    ///
    /// Delegates to the parallel_worker which fans out to all workers and
    /// aggregates their notifications into a single composite notification.
    #[allow(dead_code)]
    pub(crate) fn execute_remote_offload(
        &self,
        src: LogicalLayoutHandle,
        dst: RemoteDescriptor,
        src_block_ids: Vec<BlockId>,
        options: TransferOptions,
    ) -> Result<TransferCompleteNotification> {
        let parallel_worker = self
            .parallel_worker
            .as_ref()
            .ok_or_else(|| anyhow::anyhow!("No parallel worker configured"))?;

        parallel_worker.execute_remote_offload(src, Arc::from(src_block_ids), dst, options)
    }
}

impl Leader for InstanceLeader {
    fn find_matches_with_options(
        &self,
        sequence_hashes: &[SequenceHash],
        options: FindMatchesOptions,
    ) -> Result<FindMatchesResult> {
        // Search G2 (host memory) for matches
        // Uses match_blocks which stops at first miss (implements "first hole" policy).
        // This ensures we only find contiguous blocks from the start of the sequence.
        // For distributed search, remote instances use scan_matches for broad coverage,
        // then first-hole filtering is applied in InitiatorSession after aggregation.

        // todo: add explicit timing tracing here
        // let start_time = Instant::now();
        let matched_g2_blocks = self.g2_manager.match_blocks(sequence_hashes);
        //let g2_search_time = Instant::now().duration_since(start_time);

        // Search G3 (disk) for remaining hashes if G3 is available
        let remaining_hashes: Vec<_> = sequence_hashes
            .iter()
            .filter(|h| !matched_g2_blocks.iter().any(|b| b.sequence_hash() == **h))
            .copied()
            .collect();

        let matched_g3_blocks = if let Some(ref g3_manager) = self.g3_manager {
            // Uses match_blocks on remaining hashes (those not found in G2).
            // Since G2 already applied first-hole policy, G3 search continues from where G2 stopped.
            g3_manager.match_blocks(&remaining_hashes)
        } else {
            Vec::new()
        };

        // Determine if we can return immediately (Ready) or need async session
        // Ready if:
        //   - g3 blocks is empty
        //   - AND NOT (search_remote AND has_remote_leaders)
        //   - AND NOT (search_remote AND has_object_client)
        //
        // AsyncSession (is_ready=false) if:
        //   - g3 is not empty, or
        //   - search_remote is true AND (has_remote_leaders OR has_object_client)
        let has_remote_leaders = !self.remote_leaders.read().unwrap().is_empty();
        let has_object_client = self.object_client.is_some();
        let needs_remote_search =
            options.search_remote && (has_remote_leaders || has_object_client);
        let is_ready = matched_g3_blocks.is_empty() && !needs_remote_search;

        if is_ready {
            // No session needed - blocks owned directly by ReadyResult (RAII)
            return Ok(FindMatchesResult::Ready(ReadyResult::new(
                matched_g2_blocks,
            )));
        }

        // AsyncSession path: G3 blocks found or remote search enabled
        let session_id = SessionId::from(Uuid::new_v4());
        let local_g2_count = matched_g2_blocks.len();
        let local_g3_count = matched_g3_blocks.len();

        // AsyncSession: staging locally and/or remote searching
        let (status_tx, status_rx) = watch::channel(OnboardingStatus::Searching);
        let all_g2_blocks = Arc::new(Mutex::new(None));

        // Store session state to keep blocks alive
        let state = SessionState {
            session_id,
            matched_g2_blocks,
            matched_g3_blocks,
            status_tx: status_tx.clone(),
        };
        self.store_session_state(state);

        // If no remote search, handle local-only staging
        if !options.search_remote {
            // Local-only staging (Prepare or Full mode)
            // TODO: Implement local G3→G2 staging
            let total_matched = local_g2_count + local_g3_count;
            status_tx
                .send(OnboardingStatus::Complete {
                    matched_blocks: total_matched,
                })
                .ok();

            return Ok(FindMatchesResult::AsyncSession(AsyncSessionResult::new(
                session_id,
                status_rx,
                all_g2_blocks,
                None, // No session handle for local-only staging (yet)
            )));
        }

        // Remote search path
        let (tx, rx) = mpsc::channel(100);
        self.sessions.insert(session_id, tx);

        // Create control channel for Hold/Prepare modes
        let (session_handle, control_rx) = if matches!(
            options.staging_mode,
            StagingMode::Hold | StagingMode::Prepare
        ) {
            let (control_tx, control_rx) = mpsc::channel(10);
            let handle = LegacySessionHandle::new(session_id, options.staging_mode, control_tx);
            (Some(handle), Some(control_rx))
        } else {
            (None, None)
        };

        let session = InitiatorSession::new(
            session_id,
            self.messenger.instance_id(),
            options.staging_mode,
            self.g2_manager.clone(),
            self.g3_manager.clone(),
            self.parallel_worker.clone(),
            self.transport.clone(),
            status_tx.clone(),
            all_g2_blocks.clone(),
            control_rx.unwrap_or_else(|| {
                let (_, rx) = mpsc::channel(1);
                rx
            }),
            self.object_client.clone(),
        );

        let remote_leaders = self.remote_leaders.read().unwrap().clone();
        let sequence_hashes = sequence_hashes.to_vec();

        let handle = self.messenger.runtime();

        handle.spawn(async move {
            if let Err(e) = session.run(rx, remote_leaders, sequence_hashes).await {
                tracing::warn!(error = %e, "InitiatorSession error");
                // Try to update status to indicate error
                status_tx
                    .send(OnboardingStatus::Complete { matched_blocks: 0 })
                    .ok();
            }
        });

        Ok(FindMatchesResult::AsyncSession(AsyncSessionResult::new(
            session_id,
            status_rx,
            all_g2_blocks,
            session_handle,
        )))
    }
}