Commit 4b6cfc1b authored by Yan Ru Pei, committed by GitHub

feat: KV recorder for dumping router events into a jsonl (#505)

parent 4c7dceca
@@ -72,6 +72,7 @@ fn _core(m: &Bound<'_, PyModule>) -> PyResult<()> {
m.add_class::<llm::kv::AggregatedMetrics>()?;
m.add_class::<llm::kv::KvMetricsAggregator>()?;
m.add_class::<llm::kv::KvEventPublisher>()?;
m.add_class::<llm::kv::KvRecorder>()?;
m.add_class::<http::HttpService>()?;
m.add_class::<http::HttpError>()?;
m.add_class::<http::HttpAsyncEngine>()?;
......
@@ -392,3 +392,122 @@ impl KvMetricsAggregator {
})
}
}
#[pyclass]
pub(crate) struct KvRecorder {
    inner: Arc<llm_rs::kv_router::recorder::KvRecorder>,
    // Keep the resolved output path so replay_events can read the file back
    output_path: String,
}
#[pymethods]
impl KvRecorder {
#[new]
#[pyo3(signature = (component, output_path=None, max_lines_per_file=None, max_count=None, max_time=None))]
fn new(
component: Component,
output_path: Option<String>,
max_lines_per_file: Option<usize>,
max_count: Option<usize>,
max_time: Option<f64>,
) -> PyResult<Self> {
let runtime = pyo3_async_runtimes::tokio::get_runtime();
runtime.block_on(async {
let token = component.inner.drt().runtime().child_token();
// Create a temp path if none provided
let path = match output_path {
Some(p) => p,
None => {
let temp_dir = std::env::temp_dir();
temp_dir
.join("kv_events.jsonl")
.to_string_lossy()
.to_string()
}
};
            let inner = llm_rs::kv_router::recorder::KvRecorder::new(
                token.clone(),
                &path,
max_lines_per_file,
max_count,
max_time,
)
.await
.map_err(to_pyerr)?;
// Subscribe to KV events
let mut kv_events_rx = component
.inner
.subscribe(llm_rs::kv_router::KV_EVENT_SUBJECT)
.await
.map_err(to_pyerr)?;
let event_tx = inner.event_sender();
// Spawn a task to forward events to the recorder
            tokio::spawn(async move {
                while let Some(event) = kv_events_rx.next().await {
                    // Skip malformed payloads instead of panicking the task
                    let event: llm_rs::kv_router::indexer::RouterEvent =
                        match serde_json::from_slice(&event.payload) {
                            Ok(event) => event,
                            Err(e) => {
                                tracing::warn!("KvRecorder failed to deserialize kv event: {:?}", e);
                                continue;
                            }
                        };
                    tracing::debug!("KvRecorder received kv event: {:?}", event);
                    if let Err(e) = event_tx.send(event).await {
                        tracing::trace!(
                            "KvRecorder failed to send kv event; shutting down: {:?}",
                            e
                        );
                        break;
                    }
                }
            });
            Ok(Self {
                inner: Arc::new(inner),
                output_path: path,
            })
})
}
fn event_count<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let recorder = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
let count = recorder.event_count().await;
Ok(count)
})
}
fn elapsed_time<'py>(&self, py: Python<'py>) -> PyResult<Bound<'py, PyAny>> {
let recorder = self.inner.clone();
pyo3_async_runtimes::tokio::future_into_py(py, async move {
match recorder.elapsed_time().await {
Ok(elapsed) => Ok(elapsed.as_secs_f64()),
Err(_) => Ok(0.0), // Return 0.0 when no events have been received yet
}
})
}
#[pyo3(signature = (indexer, timed=false, max_count=None, max_time=None))]
fn replay_events<'py>(
&self,
py: Python<'py>,
indexer: &KvIndexer,
timed: bool,
max_count: Option<usize>,
max_time: Option<f64>,
) -> PyResult<Bound<'py, PyAny>> {
        let event_tx = indexer.inner.event_sender();
        let path = self.output_path.clone();
        pyo3_async_runtimes::tokio::future_into_py(py, async move {
            let count = llm_rs::kv_router::recorder::KvRecorder::send_events(
                &path, // Read events back from the file this recorder wrote
                &event_tx,
timed,
max_count,
max_time,
)
.await
.map_err(to_pyerr)?;
Ok(count)
})
}
fn shutdown(&self) -> PyResult<()> {
self.inner.shutdown();
Ok(())
}
}
@@ -108,7 +108,6 @@ class Component:
"""
...
class Endpoint:
"""
An Endpoint is a single API endpoint
@@ -330,6 +329,79 @@ class KvIndexer:
"""
...
class KvRecorder:
"""
A recorder for KV Router events.
"""
...
def __init__(
self,
component: Component,
output_path: Optional[str] = None,
max_lines_per_file: Optional[int] = None,
max_count: Optional[int] = None,
max_time: Optional[float] = None,
) -> None:
"""
Create a new KvRecorder instance.
Args:
component: The component to associate with this recorder
            output_path: Path to the JSONL file to write events to. If None,
                defaults to kv_events.jsonl in the system temp directory
max_lines_per_file: Maximum number of lines per file before rotating to a new file
max_count: Maximum number of events to record before shutting down
max_time: Maximum duration in seconds to record before shutting down
"""
...
def event_count(self) -> int:
"""
Get the count of recorded events.
Returns:
The number of events recorded
"""
...
    def elapsed_time(self) -> float:
        """
        Get the elapsed time since the first event was received.
        Returns:
            The elapsed time in seconds as a float, or 0.0 if no events
            have been received yet
        """
...
def replay_events(
self,
indexer: KvIndexer,
timed: bool = False,
max_count: Optional[int] = None,
max_time: Optional[float] = None,
) -> int:
"""
Populate an indexer with the recorded events.
Args:
indexer: The KvIndexer to populate with events
timed: If true, events will be sent according to their recorded timestamps.
If false, events will be sent without any delay in between.
max_count: Maximum number of events to send before stopping
max_time: Maximum duration in seconds to send events before stopping
Returns:
The number of events sent to the indexer
"""
...
def shutdown(self) -> None:
"""
Shutdown the recorder.
"""
...
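# For orientation, a hypothetical end-to-end sketch of the API above;
# `component` and `indexer` are assumed to come from an existing Dynamo
# runtime setup, and the path and limits are arbitrary:
#
#     import asyncio
#
#     async def record_then_replay(component, indexer):
#         # Stream router events to a JSONL file, rotating every 10,000
#         # lines and stopping automatically after 60 seconds at the latest
#         recorder = KvRecorder(
#             component,
#             output_path="/tmp/kv_events.jsonl",
#             max_lines_per_file=10_000,
#             max_time=60.0,
#         )
#         await asyncio.sleep(60.0)
#         count = await recorder.event_count()
#         elapsed = await recorder.elapsed_time()
#         print(f"recorded {count} events over {elapsed:.1f}s")
#         # Rebuild an indexer's view from the recorded events, preserving
#         # the original inter-event timing
#         sent = await recorder.replay_events(indexer, timed=True)
#         print(f"replayed {sent} events into the indexer")
#         recorder.shutdown()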
class AggregatedMetrics:
"""
A collection of metrics of the endpoints
@@ -362,12 +434,23 @@ class KvEventPublisher:
...
def __init__(
self, component: Component, worker_id: int, kv_block_size: int
) -> None:
"""
Create a `KvEventPublisher` object
"""
def publish_stored(
self,
        event_id: int,
token_ids: List[int],
num_block_tokens: List[int],
block_hashes: List[int],
lora_id: int,
parent_hash: Optional[int] = None,
) -> None:
"""
Publish a KV stored event.
"""
......
@@ -22,5 +22,6 @@ from dynamo._core import KvEventPublisher as KvEventPublisher
from dynamo._core import KvIndexer as KvIndexer
from dynamo._core import KvMetricsAggregator as KvMetricsAggregator
from dynamo._core import KvMetricsPublisher as KvMetricsPublisher
from dynamo._core import KvRecorder as KvRecorder
from dynamo._core import KvRouter as KvRouter
from dynamo._core import OverlapScores as OverlapScores
@@ -30,6 +30,7 @@ pub mod indexer;
pub mod metrics_aggregator;
pub mod protocols;
pub mod publisher;
pub mod recorder;
pub mod scheduler;
pub mod scoring;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use crate::kv_router::indexer::RouterEvent;
use crate::recorder::Recorder;
// Type alias for backward compatibility
pub type KvRecorder = Recorder<RouterEvent>;
#[cfg(test)]
mod tests {
use super::*;
use crate::kv_router::indexer::KvIndexer;
use crate::kv_router::indexer::WorkerId;
use crate::kv_router::protocols::*;
use std::time::Duration;
use tempfile::tempdir;
use tokio::fs;
use tokio_util::sync::CancellationToken;
fn make_blocks(hashes: Vec<u64>) -> Vec<KvCacheStoredBlockData> {
hashes
.iter()
.map(|i| KvCacheStoredBlockData {
tokens_hash: LocalBlockHash(*i),
block_hash: ExternalSequenceBlockHash(*i * 100),
})
.collect()
}
fn add_blocks(
hashes: Vec<u64>,
parent_hash: Option<ExternalSequenceBlockHash>,
) -> KvCacheEventData {
KvCacheEventData::Stored(KvCacheStoreData {
parent_hash,
blocks: make_blocks(hashes),
})
}
fn create_store_event(
worker_id: WorkerId,
event_id: u64,
hashes: Vec<u64>,
parent: Option<ExternalSequenceBlockHash>,
) -> RouterEvent {
RouterEvent::new(
worker_id,
KvCacheEvent {
event_id,
data: add_blocks(hashes, parent),
},
)
}
fn create_remove_event(worker_id: WorkerId, event_id: u64, hashes: Vec<u64>) -> RouterEvent {
RouterEvent::new(
worker_id,
KvCacheEvent {
event_id,
data: KvCacheEventData::Removed(KvCacheRemoveData {
block_hashes: hashes
.iter()
.map(|i| ExternalSequenceBlockHash(*i * 100))
.collect(),
}),
},
)
}
#[tokio::test]
async fn test_recorder_streams_events_to_file() {
// Create a temporary directory for output files
let dir = tempdir().unwrap();
let file_path = dir.path().join("kv_events.jsonl");
// Part 1: Record events to a file
let token = CancellationToken::new();
let recorder = KvRecorder::new(token.clone(), &file_path, None, None, None)
.await
.unwrap();
let event_tx = recorder.event_sender();
        // Create a store event followed by a remove event, both from worker 1
        let event1 = create_store_event(1, 42, vec![1, 2, 3], None);
        let event2 = create_remove_event(1, 43, vec![2, 3]);
// Send both events one after another
event_tx.send(event1).await.unwrap();
event_tx.send(event2).await.unwrap();
// Allow some time for processing
tokio::time::sleep(Duration::from_millis(10)).await;
// Check that both events were recorded
assert_eq!(recorder.event_count().await, 2);
// Force shutdown to flush file
recorder.shutdown();
tokio::time::sleep(Duration::from_millis(10)).await;
// Read the file and verify content
let content = fs::read_to_string(&file_path).await.unwrap();
let lines: Vec<&str> = content.lines().collect();
// Print the content of the JSONL file
println!("JSONL file content:");
for (i, line) in lines.iter().enumerate() {
println!("Line {}: {}", i + 1, line);
}
assert_eq!(lines.len(), 2, "Expected 2 lines in the file");
// Part 2: Now create a KvIndexer and load the events from the file
let indexer_token = CancellationToken::new();
let kv_block_size = 32; // Default block size for testing
let indexer = KvIndexer::new(indexer_token.clone(), kv_block_size);
let indexer_event_tx = indexer.event_sender();
// Use the send_events method to load events from file to indexer
let count = KvRecorder::send_events(&file_path, &indexer_event_tx, false, None, None)
.await
.unwrap();
assert_eq!(count, 2, "Expected to send 2 events from file to indexer");
}
}
@@ -29,6 +29,7 @@ pub mod model_card;
pub mod model_type;
pub mod preprocessor;
pub mod protocols;
pub mod recorder;
pub mod tokenizers;
pub mod tokens;
pub mod types;
......
// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
// SPDX-License-Identifier: Apache-2.0
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
use serde::{Deserialize, Serialize};
use std::io;
use std::path::{Path, PathBuf};
use std::sync::Arc;
use std::time::{Duration, Instant};
use tokio::fs::{self, File, OpenOptions};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader, BufWriter};
use tokio::sync::{mpsc, Mutex};
use tokio_util::sync::CancellationToken;
use tracing as log;
/// Record entry that will be serialized to JSONL
#[derive(Serialize, Deserialize)]
struct RecordEntry<T>
where
T: Clone,
{
timestamp: u64,
event: T,
}
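// For illustration, each line of the output file is one serialized
// `RecordEntry<T>`: `timestamp` is milliseconds since the recorder started,
// and `event` is whatever serde_json produces for `T`, e.g.
//
//     {"timestamp":152,"event":{...}}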
/// A generic recorder for events that streams directly to a JSONL file
pub struct Recorder<T> {
/// A sender for events that can be cloned and shared with producers
event_tx: mpsc::Sender<T>,
/// A cancellation token for managing shutdown
cancel: CancellationToken,
/// Counter for the number of events written
event_count: Arc<Mutex<usize>>,
/// Time when the first event was received
first_event_time: Arc<Mutex<Option<Instant>>>,
}
impl<T> Recorder<T>
where
T: Serialize + for<'de> Deserialize<'de> + Clone + Send + Sync + 'static,
{
/// Create a new Recorder that streams events directly to a JSONL file
///
/// ### Arguments
///
/// * `token` - A cancellation token for managing shutdown
/// * `output_path` - Path to the JSONL file to write events to
/// * `max_lines_per_file` - Maximum number of lines per file before rotating to a new file.
/// If None, no rotation will occur.
/// * `max_count` - Maximum number of events to record before shutting down.
/// If None, no limit will be applied.
/// * `max_time` - Maximum duration in seconds to record before shutting down.
/// If None, no time limit will be applied.
///
/// ### Returns
///
/// A Result with a new Recorder that streams events to the specified file
pub async fn new<P: AsRef<Path>>(
token: CancellationToken,
output_path: P,
max_lines_per_file: Option<usize>,
max_count: Option<usize>,
max_time: Option<f64>,
) -> io::Result<Self> {
let (event_tx, mut event_rx) = mpsc::channel::<T>(2048);
let event_count = Arc::new(Mutex::new(0));
let event_count_clone = event_count.clone();
let cancel_clone = token.clone();
let start_time = Instant::now();
let first_event_time = Arc::new(Mutex::new(None));
let first_event_time_clone = first_event_time.clone();
// Ensure the directory exists
if let Some(parent) = output_path.as_ref().parent() {
if !parent.exists() {
fs::create_dir_all(parent).await?;
}
}
// Create the file for writing
let file = OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&output_path)
.await?;
let file_path = output_path.as_ref().to_path_buf();
// Spawn a task to receive events and write them to the file
tokio::spawn(async move {
let mut writer = BufWriter::with_capacity(32768, file);
let mut line_count = 0;
let mut file_index = 0;
let base_path = file_path.clone();
// Set up max time deadline if specified
let max_time_deadline = max_time.map(|secs| {
let duration = Duration::from_secs_f64(secs);
start_time + duration
});
loop {
// Check time limit if set
if let Some(deadline) = max_time_deadline {
if Instant::now() >= deadline {
log::info!("Recorder reached max time limit, shutting down");
// Flush and cancel
if let Err(e) = writer.flush().await {
log::error!("Failed to flush on time limit shutdown: {}", e);
}
cancel_clone.cancel();
return;
}
}
tokio::select! {
biased;
_ = cancel_clone.cancelled() => {
// Flush any pending writes before shutting down
if let Err(e) = writer.flush().await {
log::error!("Failed to flush on shutdown: {}", e);
}
log::debug!("Recorder task shutting down");
return;
}
Some(event) = event_rx.recv() => {
// Update first_event_time if this is the first event
{
let mut first_time = first_event_time_clone.lock().await;
if first_time.is_none() {
*first_time = Some(Instant::now());
}
}
// Calculate elapsed time in milliseconds
let elapsed_ms = start_time.elapsed().as_millis() as u64;
// Create the record entry
let entry = RecordEntry {
timestamp: elapsed_ms,
event,
};
// Serialize to JSON string
let json = match serde_json::to_string(&entry) {
Ok(json) => json,
Err(e) => {
log::error!("Failed to serialize event: {}", e);
continue;
}
};
// Write JSON line
if let Err(e) = writer.write_all(json.as_bytes()).await {
log::error!("Failed to write event: {}", e);
continue;
}
// Add a newline
if let Err(e) = writer.write_all(b"\n").await {
log::error!("Failed to write newline: {}", e);
continue;
}
// Increment line count
line_count += 1;
// Check if we need to rotate to a new file
if let Some(max_lines) = max_lines_per_file {
if line_count >= max_lines {
// Flush the current file
if let Err(e) = writer.flush().await {
log::error!("Failed to flush file before rotation: {}", e);
}
// Create new filename with suffix
file_index += 1;
let new_path = create_rotated_path(&base_path, file_index);
// Open new file
match OpenOptions::new()
.create(true)
.write(true)
.truncate(true)
.open(&new_path)
.await
{
Ok(new_file) => {
writer = BufWriter::with_capacity(32768, new_file);
line_count = 0;
log::info!("Rotated to new file: {}", new_path.display());
},
Err(e) => {
log::error!("Failed to open rotated file {}: {}", new_path.display(), e);
// Continue with the existing file if rotation fails
}
}
}
}
// Update event count
let mut count = event_count_clone.lock().await;
*count += 1;
// Check if we've reached the maximum count
if let Some(max) = max_count {
if *count >= max {
log::info!("Recorder reached max event count ({}), shutting down", max);
// Flush buffer before shutting down
if let Err(e) = writer.flush().await {
log::error!("Failed to flush on count limit shutdown: {}", e);
}
// Drop the lock before cancelling
drop(count);
cancel_clone.cancel();
return;
}
}
}
}
}
});
Ok(Self {
event_tx,
cancel: token,
event_count,
first_event_time,
})
}
/// Get a sender that can be used to send events to the recorder
pub fn event_sender(&self) -> mpsc::Sender<T> {
self.event_tx.clone()
}
/// Get the count of recorded events
pub async fn event_count(&self) -> usize {
*self.event_count.lock().await
}
/// Get the elapsed time since the first event was received
///
/// Returns a Result with the elapsed time or an error if no events have been received yet
pub async fn elapsed_time(&self) -> io::Result<Duration> {
let first_time = self.first_event_time.lock().await;
match *first_time {
Some(time) => Ok(time.elapsed()),
None => Err(io::Error::new(
io::ErrorKind::Other,
"No events received yet",
)),
}
}
/// Shutdown the recorder
pub fn shutdown(&self) {
self.cancel.cancel();
}
/// Send events from a JSONL file to the provided event sender
///
/// ### Arguments
///
/// * `filename` - Path to the JSONL file to read events from
/// * `event_tx` - A sender for events
/// * `timed` - If true, events will be sent according to their recorded timestamps.
/// If false, events will be sent as fast as possible without delay.
/// * `max_count` - Maximum number of events to send before stopping. If None, all events will be sent.
/// * `max_time` - Maximum duration in seconds to send events before stopping. If None, no time limit.
///
/// ### Returns
///
/// A Result indicating success or failure with the number of events sent
pub async fn send_events<P: AsRef<Path>>(
filename: P,
event_tx: &mpsc::Sender<T>,
timed: bool,
max_count: Option<usize>,
max_time: Option<f64>,
) -> io::Result<usize> {
// Store the display name before using filename
let display_name = filename.as_ref().display().to_string();
// Check if file exists
if !filename.as_ref().exists() {
return Err(io::Error::new(
io::ErrorKind::NotFound,
format!("File not found: {}", display_name),
));
}
// Set up start time and deadline if max_time is specified
let start_time = Instant::now();
let deadline = max_time.map(|secs| start_time + Duration::from_secs_f64(secs));
// Open the file for reading using tokio's async file I/O
let file = File::open(&filename).await?;
let reader = BufReader::with_capacity(32768, file);
let mut lines = reader.lines();
let mut count = 0;
let mut line_number = 0;
let mut prev_timestamp: Option<u64> = None;
// Read and send events line by line
while let Some(line) = lines.next_line().await? {
// Check if we've reached the maximum count
if let Some(max) = max_count {
if count >= max {
log::info!("Reached maximum event count ({}), stopping", max);
break;
}
}
// Check if we've exceeded the time limit
if let Some(end_time) = deadline {
if Instant::now() >= end_time {
log::info!("Reached maximum time limit, stopping");
break;
}
}
line_number += 1;
// Skip empty lines
if line.trim().is_empty() {
continue;
}
// Try to parse the JSON
let record: RecordEntry<T> = match serde_json::from_str(&line) {
Ok(record) => record,
Err(e) => {
log::warn!(
"Failed to parse JSON on line {}: {}. Skipping.",
line_number,
e
);
continue;
}
};
let timestamp = record.timestamp;
let event = record.event;
            // If replaying in timed mode, sleep to reproduce the recorded
            // inter-event spacing (timestamps are in milliseconds)
            if timed {
                if let Some(prev) = prev_timestamp {
                    if timestamp > prev {
                        tokio::time::sleep(Duration::from_millis(timestamp - prev)).await;
                    }
                }
            }
// Send the event
event_tx.send(event).await.map_err(|e| {
io::Error::new(io::ErrorKind::Other, format!("Failed to send event: {}", e))
})?;
// Update previous timestamp and count
prev_timestamp = Some(timestamp);
count += 1;
}
if count == 0 {
log::warn!("No events to send from file: {}", display_name);
} else {
log::info!("Sent {} events from {}", count, display_name);
}
Ok(count)
}
}
/// Helper function to create a rotated file path with an index suffix
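///
/// For example, `events.jsonl` rotates to `events1.jsonl`, `events2.jsonl`,
/// and so on; a base path without an extension has the index appended
/// directly (`events` -> `events1`)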
fn create_rotated_path(base_path: &Path, index: usize) -> PathBuf {
let path_str = base_path.to_string_lossy();
if let Some(ext_pos) = path_str.rfind('.') {
// If there's an extension, insert the index before it
let (file_path, extension) = path_str.split_at(ext_pos);
PathBuf::from(format!("{}{}{}", file_path, index, extension))
} else {
// If there's no extension, just append the index
PathBuf::from(format!("{}{}", path_str, index))
}
}
#[cfg(test)]
mod tests {
use super::*;
use std::time::Duration;
use tempfile::tempdir;
// Type alias for the TestEvent recorder
type TestEventRecorder = Recorder<TestEvent>;
// More complex event type
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
struct TestEvent {
id: u64,
name: String,
values: Vec<i32>,
}
impl TestEvent {
// Helper method to generate a random test event
fn new(id: u64) -> Self {
// Generate a random number of values between 1 and 100
let num_values = rand::random_range(1..=100);
// Generate random values (integers between -100 and 100)
let values = (0..num_values)
.map(|_| rand::random_range(-100..=100))
.collect();
// Create a name based on the ID
let name = format!("event_{}", id);
TestEvent { id, name, values }
}
// Helper method to generate a vector of random events
fn generate_events(count: usize) -> Vec<Self> {
(0..count).map(|i| Self::new(i as u64)).collect()
}
}
#[tokio::test]
async fn test_recorder_streams_events_to_file() {
// Create a temporary directory for output files
let dir = tempdir().unwrap();
let file_path = dir.path().join("events.jsonl");
let token = CancellationToken::new();
let recorder = TestEventRecorder::new(token.clone(), &file_path, None, None, None)
.await
.unwrap();
let event_tx = recorder.event_sender();
// Create test events using generate_events
let events = TestEvent::generate_events(2);
let event1 = events[0].clone();
let event2 = events[1].clone();
// Wait some time before the first event
tokio::time::sleep(Duration::from_millis(10)).await;
// Send the events
for event in &events {
event_tx.send(event.clone()).await.unwrap();
}
// Allow some time for processing
tokio::time::sleep(Duration::from_millis(10)).await;
// Check that both events were recorded
assert_eq!(recorder.event_count().await, 2);
        // Check that the elapsed time since the first event is roughly 10 ms
        let elapsed_ms = recorder.elapsed_time().await.unwrap().as_millis();
        assert!(
            (9..=11).contains(&elapsed_ms),
            "Expected elapsed time in 9..=11 ms, got {} ms",
            elapsed_ms
        );
// Force shutdown to flush file
recorder.shutdown();
tokio::time::sleep(Duration::from_millis(10)).await;
// Read the file and verify content
let content = fs::read_to_string(&file_path).await.unwrap();
let lines: Vec<&str> = content.lines().collect();
// Print the content of the JSONL file
println!("JSONL file content:");
for (i, line) in lines.iter().enumerate() {
println!("Line {}: {}", i + 1, line);
}
assert_eq!(lines.len(), 2, "Expected 2 lines in the file");
// Parse the lines to verify events
let entry1: RecordEntry<TestEvent> = serde_json::from_str(lines[0]).unwrap();
let entry2: RecordEntry<TestEvent> = serde_json::from_str(lines[1]).unwrap();
assert_eq!(entry1.event, event1);
assert_eq!(entry2.event, event2);
assert!(entry2.timestamp >= entry1.timestamp);
}
#[ignore]
#[tokio::test]
async fn load_test_100k_events() {
// Create a temporary directory for output files
let dir = tempdir().unwrap();
let file_path = dir.path().join("events.jsonl");
// Create a cancellation token for the recorder
let token = CancellationToken::new();
// Set max lines per file to 10,000 (should create 10 files total)
const MAX_LINES_PER_FILE: usize = 10_000;
let recorder = TestEventRecorder::new(
token.clone(),
&file_path,
Some(MAX_LINES_PER_FILE),
None,
None,
)
.await
.unwrap();
let event_tx = recorder.event_sender();
// Define number of events to generate
const NUM_EVENTS: usize = 100_000;
println!("Generating {} events...", NUM_EVENTS);
// Generate events using the helper method
let events = TestEvent::generate_events(NUM_EVENTS);
// Send events with progress reporting
for (i, event) in events.iter().enumerate() {
event_tx.send(event.clone()).await.unwrap();
// Print progress every 10,000 events
if i > 0 && i % 10_000 == 0 {
println!("Sent {} events...", i);
}
}
// Allow time for the recorder to process all events
println!("Waiting for events to be processed...");
tokio::time::sleep(Duration::from_millis(1000)).await;
// Verify that all events were recorded
let count = recorder.event_count().await;
println!("Recorded event count: {}", count);
assert_eq!(count, NUM_EVENTS);
// Force a clean shutdown to flush all pending writes
recorder.shutdown();
tokio::time::sleep(Duration::from_millis(100)).await;
// Check for the existence of all expected files
let base_file = file_path.clone();
let mut found_files = Vec::new();
// Check base file
if base_file.exists() {
found_files.push(base_file.clone());
}
// Check rotated files (1-9)
for i in 1..=9 {
let rotated_path = create_rotated_path(&base_file, i);
if rotated_path.exists() {
found_files.push(rotated_path);
}
}
// Check that we have exactly 10 files
assert_eq!(
found_files.len(),
10,
"Expected 10 files due to rotation with 10k events each"
);
// Add more stringent check for each file size
for (i, file_path) in found_files.iter().enumerate() {
let content = fs::read_to_string(file_path).await.unwrap();
let line_count = content.lines().count();
if i < found_files.len() - 1 {
// All files except the last one should have exactly MAX_LINES_PER_FILE lines
assert_eq!(
line_count,
MAX_LINES_PER_FILE,
"File {} should contain exactly {} lines",
file_path.display(),
MAX_LINES_PER_FILE
);
} else {
// The last file might have fewer lines
assert!(
line_count <= MAX_LINES_PER_FILE,
"Last file should contain at most {} lines",
MAX_LINES_PER_FILE
);
}
}
// Count total lines across all files
let mut total_lines = 0;
// Check that timestamps are weakly sorted within each file
for (i, file_path) in found_files.iter().enumerate() {
println!("Checking file {}: {}", i, file_path.display());
// Count lines in the file
let content = fs::read_to_string(file_path).await.unwrap();
let line_count = content.lines().count();
// Should have MAX_LINES_PER_FILE lines in each file (except maybe the last one)
if i < found_files.len() - 1 {
assert_eq!(line_count, MAX_LINES_PER_FILE, "Each file except possibly the last should have exactly MAX_LINES_PER_FILE lines");
}
total_lines += line_count;
// Check that timestamps are weakly sorted within each file
let file = File::open(file_path).await.unwrap();
let reader = BufReader::new(file);
let mut lines = reader.lines();
let mut prev_timestamp: Option<u64> = None;
let mut line_number = 0;
let mut unsorted_count = 0;
// Check timestamps in the file without loading everything into memory
while let Some(line) = lines.next_line().await.unwrap() {
line_number += 1;
let entry: RecordEntry<TestEvent> = serde_json::from_str(&line).unwrap();
if let Some(prev) = prev_timestamp {
if entry.timestamp < prev {
unsorted_count += 1;
if unsorted_count <= 5 {
// Only log first 5 violations to avoid spam
println!(
"Timestamp order violation in file {} at line {}: {} < {}",
file_path.display(),
line_number,
entry.timestamp,
prev
);
}
}
}
prev_timestamp = Some(entry.timestamp);
}
assert_eq!(
unsorted_count, 0,
"Timestamps should be weakly sorted within each file"
);
}
assert_eq!(
total_lines, NUM_EVENTS,
"Total lines across all files should match NUM_EVENTS"
);
println!("Load test with file rotation completed successfully");
}
}