Unverified Commit bce74588 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

chore: Rust to 1.89 and edition 2024 (#2659)

parent 268d017e
......@@ -7,8 +7,8 @@ pub mod distributed_leader_worker;
pub mod null;
use crate::block_manager::block::{
transfer::{TransferContext, TransferError, WriteToStrategy},
BlockDataProvider, ReadableBlock, WritableBlock,
transfer::{TransferContext, TransferError, WriteToStrategy},
};
use crate::block_manager::locality::Logical;
use crate::block_manager::storage::{self, nixl::NixlDescriptor};
......
......@@ -16,7 +16,7 @@
use super::*;
use crate::block_manager::block::transfer::{
handle_local_transfer, TransferContext, TransferError, WriteToStrategy,
TransferContext, TransferError, WriteToStrategy, handle_local_transfer,
};
use crate::block_manager::storage::{self, nixl::NixlDescriptor};
......
......@@ -109,21 +109,21 @@ impl BlockRegistry {
{
let mut blocks = blocks.lock().unwrap();
if let Some(handle) = blocks.get(&sequence_hash) {
if handle.upgrade().is_none() {
if let Some(handle) = blocks.get(&sequence_hash)
&& handle.upgrade().is_none()
{
blocks.remove(&sequence_hash);
}
}
}
let mut global_registry = global_registry.lock().unwrap();
if let Some(entry) = global_registry.get(&sequence_hash) {
if entry.upgrade().is_none() {
if let Some(entry) = global_registry.get(&sequence_hash)
&& entry.upgrade().is_none()
{
global_registry.remove(&sequence_hash);
}
}
}
});
Self {
......@@ -136,11 +136,11 @@ impl BlockRegistry {
pub fn is_registered(&self, sequence_hash: SequenceHash) -> bool {
let blocks = self.blocks.lock().unwrap();
if let Some(handle) = blocks.get(&sequence_hash) {
if let Some(_handle) = handle.upgrade() {
if let Some(handle) = blocks.get(&sequence_hash)
&& let Some(_handle) = handle.upgrade()
{
return true;
}
}
false
}
......@@ -161,13 +161,13 @@ impl BlockRegistry {
let mut blocks = self.blocks.lock().unwrap();
// If an identical block already exists in this pool, return an error.
if let Some(handle) = blocks.get(&sequence_hash) {
if let Some(_handle) = handle.upgrade() {
if let Some(handle) = blocks.get(&sequence_hash)
&& let Some(_handle) = handle.upgrade()
{
return Err(BlockRegistrationError::BlockAlreadyRegistered(
sequence_hash,
));
}
}
let mut publish_handle = None;
......@@ -179,11 +179,11 @@ impl BlockRegistry {
let mut global_registry = self.global_registry.lock().unwrap();
// If an identical block exists in other pool, use the same registration handle.
if let Some(handle) = global_registry.get(&sequence_hash) {
if let Some(handle) = handle.upgrade() {
if let Some(handle) = global_registry.get(&sequence_hash)
&& let Some(handle) = handle.upgrade()
{
break 'reg_block handle;
}
}
// Otherwise, create a new registration handle.
publish_handle = Some(Self::create_publish_handle(
......
......@@ -17,8 +17,8 @@ use std::sync::Arc;
use derive_getters::Getters;
use super::registry::{BlockHandle, RegistrationHandle};
use super::Result;
use super::registry::{BlockHandle, RegistrationHandle};
use crate::tokens::{PartialTokenBlock, SaltHash, Token, TokenBlock, Tokens};
#[derive(Debug, thiserror::Error)]
......
......@@ -22,8 +22,8 @@ mod strategy;
use super::*;
use crate::block_manager::storage::{
nixl::{NixlRegisterableStorage, NixlStorage},
DeviceStorage, DiskStorage, PinnedStorage, SystemStorage,
nixl::{NixlRegisterableStorage, NixlStorage},
};
use cudarc::driver::CudaStream;
......
......@@ -15,7 +15,7 @@
use super::*;
use cudarc::driver::{sys::CUevent_flags, CudaEvent, CudaStream};
use cudarc::driver::{CudaEvent, CudaStream, sys::CUevent_flags};
use nixl_sys::Agent as NixlAgent;
use std::sync::Arc;
......@@ -107,10 +107,10 @@ impl TransferContext {
impl Drop for TransferContext {
fn drop(&mut self) {
self.cancel_token.cancel();
if let Some(handle) = self.cuda_event_worker.take() {
if let Err(e) = handle.join() {
if let Some(handle) = self.cuda_event_worker.take()
&& let Err(e) = handle.join()
{
tracing::error!("Error joining CUDA event worker: {:?}", e);
}
}
}
}
......@@ -177,9 +177,11 @@ unsafe fn cuda_memcpy_h2d(
"Source and destination device memory regions must not overlap for D2D copy"
);
unsafe {
let src_slice = std::slice::from_raw_parts(src_ptr, size);
cuda_result::memcpy_htod_async(dst_ptr as u64, src_slice, stream.cu_stream())
.map_err(|e| TransferError::ExecutionError(format!("CUDA H2D memcpy failed: {}", e)))?;
.map_err(|e| TransferError::ExecutionError(format!("CUDA H2D memcpy failed: {}", e)))?
};
Ok(())
}
......@@ -199,9 +201,11 @@ unsafe fn cuda_memcpy_d2h(
"Source and destination device memory regions must not overlap for D2D copy"
);
unsafe {
let dst_slice = std::slice::from_raw_parts_mut(dst_ptr, size);
cuda_result::memcpy_dtoh_async(dst_slice, src_ptr as u64, stream.cu_stream())
.map_err(|e| TransferError::ExecutionError(format!("CUDA D2H memcpy failed: {}", e)))?;
}
Ok(())
}
......@@ -221,8 +225,10 @@ unsafe fn cuda_memcpy_d2d(
"Source and destination device memory regions must not overlap for D2D copy"
);
unsafe {
cuda_result::memcpy_dtod_async(dst_ptr as u64, src_ptr as u64, size, stream.cu_stream())
.map_err(|e| TransferError::ExecutionError(format!("CUDA D2D memcpy failed: {}", e)))?;
.map_err(|e| TransferError::ExecutionError(format!("CUDA D2D memcpy failed: {}", e)))?
};
Ok(())
}
......
......@@ -78,5 +78,5 @@ unsafe fn memcpy(src_ptr: *const u8, dst_ptr: *mut u8, size: usize) {
"Source and destination memory regions must not overlap for copy_nonoverlapping"
);
std::ptr::copy_nonoverlapping(src_ptr, dst_ptr, size);
unsafe { std::ptr::copy_nonoverlapping(src_ptr, dst_ptr, size) };
}
......@@ -53,7 +53,7 @@
//!
//! [`SchedulerOutput`] is transform
use super::scheduler::{SchedulingDecision, DISCONNECTED_WARNING};
use super::scheduler::{DISCONNECTED_WARNING, SchedulingDecision};
use super::*;
use tokio::sync::oneshot;
......@@ -194,14 +194,14 @@ impl TransferCompletionHandle for ScheduledTransferCompletionHandle {
}
async fn mark_complete(&self, result: anyhow::Result<()>) {
if let Some(completion_tx) = self.completion_tx.lock().unwrap().take() {
if completion_tx.send(result).is_err() {
if let Some(completion_tx) = self.completion_tx.lock().unwrap().take()
&& completion_tx.send(result).is_err()
{
tracing::error!(
"failed to send completion status; this could lead to silent data corruption"
);
}
}
}
}
impl Drop for ScheduledTransferCompletionHandle {
......@@ -256,8 +256,8 @@ impl TransferCompletionHandle for ImmediateTransferCompletionHandle {
let mut guard = self.completion_tx.lock().unwrap();
guard.take()
};
if let Some(completion_tx) = completion_tx {
if completion_tx
if let Some(completion_tx) = completion_tx
&& completion_tx
.send(TransferToSchedulerMessage::ImmediateResult(
ImmediateTransferResult {
request_id: self.request_id.clone(),
......@@ -271,7 +271,6 @@ impl TransferCompletionHandle for ImmediateTransferCompletionHandle {
tracing::error!(DISCONNECTED_WARNING);
}
}
}
}
impl Drop for ImmediateTransferCompletionHandle {
......
......@@ -12,8 +12,8 @@ use serde::{Deserialize, Serialize};
use dynamo_runtime::{
pipeline::{
async_trait, network::Ingress, AsyncEngine, AsyncEngineContextProvider, Error, ManyOut,
ResponseStream, SingleIn,
AsyncEngine, AsyncEngineContextProvider, Error, ManyOut, ResponseStream, SingleIn,
async_trait, network::Ingress,
},
protocols::annotated::Annotated,
traits::DistributedRuntimeProvider,
......
......@@ -43,22 +43,22 @@ pub struct SchedulerRequest<T> {
mod tests {
use super::*;
use crate::block_manager::block::data::logical::distributed_leader_worker::DistributedLeaderWorkerResources;
use crate::block_manager::KvBlockManager;
use crate::block_manager::block::BasicMetadata;
use crate::block_manager::block::data::logical::distributed_leader_worker::DistributedLeaderWorkerResources;
use crate::block_manager::config::*;
use crate::block_manager::locality::Logical;
use crate::block_manager::storage::{
torch::{TorchDevice, TorchTensor},
DeviceAllocator, Storage, StorageAllocator,
torch::{TorchDevice, TorchTensor},
};
use crate::block_manager::KvBlockManager;
use anyhow::Result;
use rstest::*;
use std::sync::{
atomic::{AtomicUsize, Ordering},
Arc,
atomic::{AtomicUsize, Ordering},
};
use tokio_util::sync::CancellationToken;
......
......@@ -10,15 +10,15 @@ use zmq::*;
use BlockTransferPool::*;
use crate::block_manager::{
BasicMetadata, Storage,
block::{
Block, BlockDataProvider, BlockDataProviderMut, ReadableBlock, WritableBlock,
data::local::LocalBlockData,
locality,
transfer::{TransferContext, WriteTo, WriteToStrategy},
Block, BlockDataProvider, BlockDataProviderMut, ReadableBlock, WritableBlock,
},
connector::scheduler::{SchedulingDecision, TransferSchedulerClient},
storage::{DeviceStorage, DiskStorage, Local, PinnedStorage},
BasicMetadata, Storage,
};
use anyhow::Result;
......@@ -113,15 +113,13 @@ impl BlockTransferHandler {
.collect();
// Perform the transfer, and return the notifying channel.
let channel = match sources.write_to(&mut targets, self.context.clone()) {
match sources.write_to(&mut targets, self.context.clone()) {
Ok(channel) => Ok(channel),
Err(e) => {
tracing::error!("Failed to write to blocks: {:?}", e);
Err(e.into())
}
};
channel
}
}
pub async fn execute_transfer(&self, request: BlockTransferRequest) -> Result<()> {
......
......@@ -10,11 +10,11 @@ use utils::*;
use zmq::*;
use crate::block_manager::{
block::{layout_to_blocks, locality, transfer::TransferContext, Block},
BasicMetadata, BlockMetadata, LayoutConfigBuilder, NixlLayout, Storage,
block::{Block, layout_to_blocks, locality, transfer::TransferContext},
connector::scheduler::TransferSchedulerClient,
layout::LayoutType,
storage::{torch::TorchTensor, DeviceAllocator, DeviceStorage, DiskAllocator, PinnedAllocator},
BasicMetadata, BlockMetadata, LayoutConfigBuilder, NixlLayout, Storage,
storage::{DeviceAllocator, DeviceStorage, DiskAllocator, PinnedAllocator, torch::TorchTensor},
};
use derive_builder::Builder;
......@@ -28,8 +28,8 @@ use tokio::sync::oneshot;
use tokio_util::sync::CancellationToken;
use dynamo_runtime::{
utils::{leader_worker_barrier::WorkerBarrier, task::CriticalTaskExecutionHandle},
DistributedRuntime,
utils::{leader_worker_barrier::WorkerBarrier, task::CriticalTaskExecutionHandle},
};
#[derive(Debug, Clone, Serialize, Deserialize)]
......
......@@ -13,13 +13,13 @@ use std::collections::{HashMap, VecDeque};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tmq::{
publish::{publish, Publish},
pull::{pull, Pull},
push::{push, Push},
subscribe::{subscribe, Subscribe},
Context, Message, Multipart,
publish::{Publish, publish},
pull::{Pull, pull},
push::{Push, push},
subscribe::{Subscribe, subscribe},
};
use tokio::sync::{oneshot, Mutex};
use tokio::sync::{Mutex, oneshot};
use tokio_util::sync::CancellationToken;
use futures_util::{SinkExt, StreamExt};
......
......@@ -110,8 +110,8 @@ use super::{
};
use super::super::storage::{
nixl::{NixlAgent, NixlRegisterableStorage, NixlStorage, OptArgs},
Storage, StorageAllocator,
nixl::{NixlAgent, NixlRegisterableStorage, NixlStorage, OptArgs},
};
use super::{FullyContiguous, FullyContiguousConfig, LayerSeparate, LayerSeparateConfig};
use serde::{Deserialize, Serialize};
......
......@@ -15,9 +15,9 @@
use anyhow::Result;
use prometheus::{
IntCounterVec, IntGaugeVec, Opts, Registry,
core::{AtomicI64, AtomicU64, GenericCounter, GenericGauge},
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry, IntCounterVec,
IntGaugeVec, Opts, Registry,
register_int_counter_vec_with_registry, register_int_gauge_vec_with_registry,
};
use std::sync::Arc;
pub struct BlockManagerMetrics {
......
......@@ -45,8 +45,8 @@
//! of the [`OffloadManager::offload_worker`] and [`OffloadManager::onboard_worker`] methods.
use super::block::{
locality::LocalityProvider, transfer::TransferContext, BlockError, BlockMetadata, BlockState,
ImmutableBlock, MutableBlock,
BlockError, BlockMetadata, BlockState, ImmutableBlock, MutableBlock,
locality::LocalityProvider, transfer::TransferContext,
};
use super::metrics::{BlockManagerMetrics, PoolMetrics};
use super::pool::{BlockPool, BlockPoolError};
......@@ -56,8 +56,9 @@ use nixl_sys::Agent as NixlAgent;
use std::sync::Arc;
use tokio::runtime::Handle;
use tokio::sync::{
Mutex,
mpsc::{self, error::TryRecvError},
oneshot, Mutex,
oneshot,
};
use tokio_util::sync::CancellationToken;
......@@ -320,20 +321,21 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
if let Ok(blocks) = target_pool
.match_sequence_hashes(vec![request.sequence_hash].as_slice())
.await
&& !blocks.is_empty()
{
if !blocks.is_empty() {
continue;
}
}
let target_block = 'target_block: {
if let Ok(blocks) = target_pool.allocate_blocks(1).await {
if let Some(block) = blocks.into_iter().next() {
if let Ok(blocks) = target_pool.allocate_blocks(1).await
&& let Some(block) = blocks.into_iter().next()
{
break 'target_block Some(block);
}
}
tracing::warn!("Target pool full. Skipping offload. This should only ever happen with very small pool sizes.");
tracing::warn!(
"Target pool full. Skipping offload. This should only ever happen with very small pool sizes."
);
None
};
......@@ -504,15 +506,15 @@ impl<Locality: LocalityProvider + 'static, Metadata: BlockMetadata>
}
}
if let Some(targets) = targets.as_ref() {
if targets.len() != blocks.len() {
if let Some(targets) = targets.as_ref()
&& targets.len() != blocks.len()
{
tx.send(Err(BlockPoolError::BlockError(BlockError::Other(
anyhow::anyhow!("Number of targets does not match number of blocks."),
))))
.unwrap();
return rx;
}
}
if blocks.is_empty() {
tx.send(Ok(vec![])).unwrap();
......@@ -582,16 +584,16 @@ mod tests {
use super::*;
use crate::block_manager::{
LayoutConfig, NixlRegisterableStorage,
block::{
locality::Local, BasicMetadata, BlockDataExt, BlockDataProvider, Blocks, MutableBlock,
BasicMetadata, BlockDataExt, BlockDataProvider, Blocks, MutableBlock, locality::Local,
},
layout::{nixl::NixlLayout, FullyContiguous, LayerSeparate, LayoutType},
layout::{FullyContiguous, LayerSeparate, LayoutType, nixl::NixlLayout},
pool::{BlockRegistrationDuplicationSetting, ManagedBlockPool},
storage::{
DeviceAllocator, DeviceStorage, DiskAllocator, DiskStorage, PinnedAllocator,
PinnedStorage, StorageAllocator, StorageType,
},
LayoutConfig, NixlRegisterableStorage,
};
use crate::tokens::{TokenBlockSequence, Tokens};
use nixl_sys::{MemoryRegion, NixlDescriptor};
......
......@@ -48,10 +48,10 @@ use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;
use crate::block_manager::block::{
locality::LocalityProvider,
transfer::{TransferContext, WriteTo, WriteToStrategy},
BlockDataProvider, BlockDataProviderMut, BlockError, BlockMetadata, BlockState, ImmutableBlock,
MutableBlock, ReadableBlock, WritableBlock,
locality::LocalityProvider,
transfer::{TransferContext, WriteTo, WriteToStrategy},
};
use crate::block_manager::metrics::PoolMetrics;
use crate::block_manager::pool::{BlockPool, BlockPoolError};
......@@ -59,7 +59,7 @@ use crate::block_manager::storage::{Local, Storage};
use anyhow::Result;
use async_trait::async_trait;
use futures::{stream::FuturesUnordered, StreamExt};
use futures::{StreamExt, stream::FuturesUnordered};
use super::BlockResult;
......
......@@ -18,7 +18,7 @@ use std::sync::Weak;
use tokio::sync::oneshot;
use crate::block_manager::block::{
locality::LocalityProvider, BlockMetadata, ImmutableBlock, MutableBlock,
BlockMetadata, ImmutableBlock, MutableBlock, locality::LocalityProvider,
};
use crate::block_manager::pool::BlockPoolError;
use crate::block_manager::storage::Storage;
......
......@@ -23,15 +23,15 @@ use serde::{Deserialize, Serialize};
pub use super::block::{ImmutableBlock, MutableBlock};
use super::block::{
nixl::short_type_name, private, registry::BlockRegistry, Block, BlockError, BlockMetadata,
GlobalRegistry, MaybeReturnableBlock,
Block, BlockError, BlockMetadata, GlobalRegistry, MaybeReturnableBlock, nixl::short_type_name,
private, registry::BlockRegistry,
};
use super::events::{EventManager, NullEventManager};
use super::metrics::{BlockManagerMetrics, PoolMetrics};
use super::storage::Storage;
use crate::block_manager::block::locality::LocalityProvider;
use crate::block_manager::CacheLevel;
use crate::block_manager::block::locality::LocalityProvider;
use crate::tokens::{SequenceHash, TokenBlock};
use async_trait::async_trait;
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment