"ssh:/git@developer.sourcefind.cn:2222/OpenDAS/dynamo.git" did not exist on "98a6d3b91076f8239b9db0e131175de2de990b4a"
Unverified Commit 3bde1e45 authored by Graham King's avatar Graham King Committed by GitHub
Browse files

fix: Fix race condition in kv_router unit test (#1174)

Removed the hard coded sleeps, explained what we're testing.

Closes https://github.com/ai-dynamo/dynamo/issues/1132

The race condition is that `apply_event` sends a message on a channel, it does not directly apply the event. At some later point the tokio runtime schedules the task running the channel receiver, which applies the event. If that had not happened yet the test would fail.
parent 5d5080ba
...@@ -55,6 +55,7 @@ api_server_models/ ...@@ -55,6 +55,7 @@ api_server_models/
server/ server/
# Replay/Snapshot test artifacts # Replay/Snapshot test artifacts
*.new *.new
lib/llm/tests/data/sample-models/models--meta-llama--Llama-3.1-70B-Instruct/
**/*backups* **/*backups*
...@@ -90,4 +91,4 @@ TensorRT-LLM ...@@ -90,4 +91,4 @@ TensorRT-LLM
# Local build artifacts for devcontainer # Local build artifacts for devcontainer
.build/ .build/
# Copied binaries to ignore # Copied binaries to ignore
deploy/sdk/src/dynamo/sdk/cli/bin deploy/sdk/src/dynamo/sdk/cli/bin
\ No newline at end of file
...@@ -752,7 +752,7 @@ impl KvIndexerSharded { ...@@ -752,7 +752,7 @@ impl KvIndexerSharded {
} }
_ = cancel.cancelled() => { _ = cancel.cancelled() => {
tracing::debug!("KvCacheIndexer progress loop shutting down"); tracing::trace!("KvCacheIndexer progress loop shutting down");
return; return;
} }
...@@ -896,6 +896,10 @@ mod tests { ...@@ -896,6 +896,10 @@ mod tests {
use tokio::time; use tokio::time;
use tokio_util::sync::CancellationToken; use tokio_util::sync::CancellationToken;
fn setup() {
dynamo_runtime::logging::init();
}
fn make_blocks(hashes: Vec<u64>) -> Vec<KvCacheStoredBlockData> { fn make_blocks(hashes: Vec<u64>) -> Vec<KvCacheStoredBlockData> {
hashes hashes
.iter() .iter()
...@@ -948,6 +952,8 @@ mod tests { ...@@ -948,6 +952,8 @@ mod tests {
#[test] #[test]
fn test_radix_tree() { fn test_radix_tree() {
setup();
let mut trie = RadixTree::new(); let mut trie = RadixTree::new();
let worker_1 = 0; let worker_1 = 0;
...@@ -1151,6 +1157,7 @@ mod tests { ...@@ -1151,6 +1157,7 @@ mod tests {
#[test] #[test]
fn test_remove_worker() { fn test_remove_worker() {
setup();
let mut trie = RadixTree::new(); let mut trie = RadixTree::new();
let worker_0 = 0; let worker_0 = 0;
...@@ -1175,6 +1182,7 @@ mod tests { ...@@ -1175,6 +1182,7 @@ mod tests {
#[test] #[test]
fn test_early_stopping() { fn test_early_stopping() {
setup();
let mut trie = RadixTree::new(); let mut trie = RadixTree::new();
let worker_0 = 0; let worker_0 = 0;
...@@ -1203,6 +1211,7 @@ mod tests { ...@@ -1203,6 +1211,7 @@ mod tests {
#[case(32)] #[case(32)]
#[case(64)] #[case(64)]
fn test_compute_block_hash_for_seq(#[case] kv_block_size: usize) { fn test_compute_block_hash_for_seq(#[case] kv_block_size: usize) {
setup();
// create a sequence of 64 elements // create a sequence of 64 elements
let sequence = (0..kv_block_size).map(|i| i as u32).collect::<Vec<u32>>(); let sequence = (0..kv_block_size).map(|i| i as u32).collect::<Vec<u32>>();
let hashes = compute_block_hash_for_seq(&sequence, kv_block_size); let hashes = compute_block_hash_for_seq(&sequence, kv_block_size);
...@@ -1250,6 +1259,7 @@ mod tests { ...@@ -1250,6 +1259,7 @@ mod tests {
#[tokio::test] #[tokio::test]
#[apply(indexer_template)] #[apply(indexer_template)]
async fn test_kv_indexer_new(num_shards: usize, kv_block_size: usize) { async fn test_kv_indexer_new(num_shards: usize, kv_block_size: usize) {
setup();
let token: CancellationToken = CancellationToken::new(); let token: CancellationToken = CancellationToken::new();
let _ = make_indexer(&token, num_shards, kv_block_size); let _ = make_indexer(&token, num_shards, kv_block_size);
} }
...@@ -1257,6 +1267,7 @@ mod tests { ...@@ -1257,6 +1267,7 @@ mod tests {
#[tokio::test] #[tokio::test]
#[apply(indexer_template)] #[apply(indexer_template)]
async fn test_find_matches(num_shards: usize, kv_block_size: usize) { async fn test_find_matches(num_shards: usize, kv_block_size: usize) {
setup();
let token = CancellationToken::new(); let token = CancellationToken::new();
let kv_indexer = make_indexer(&token, num_shards, kv_block_size); let kv_indexer = make_indexer(&token, num_shards, kv_block_size);
...@@ -1269,6 +1280,7 @@ mod tests { ...@@ -1269,6 +1280,7 @@ mod tests {
#[tokio::test] #[tokio::test]
#[apply(indexer_template)] #[apply(indexer_template)]
async fn test_find_matches_for_request(num_shards: usize, kv_block_size: usize) { async fn test_find_matches_for_request(num_shards: usize, kv_block_size: usize) {
setup();
let token = CancellationToken::new(); let token = CancellationToken::new();
let kv_indexer = make_indexer(&token, num_shards, kv_block_size); let kv_indexer = make_indexer(&token, num_shards, kv_block_size);
...@@ -1281,6 +1293,7 @@ mod tests { ...@@ -1281,6 +1293,7 @@ mod tests {
#[tokio::test] #[tokio::test]
#[apply(indexer_template)] #[apply(indexer_template)]
async fn test_apply_event(num_shards: usize, kv_block_size: usize) { async fn test_apply_event(num_shards: usize, kv_block_size: usize) {
setup();
let worker_id = 0; let worker_id = 0;
let token = CancellationToken::new(); let token = CancellationToken::new();
...@@ -1295,6 +1308,7 @@ mod tests { ...@@ -1295,6 +1308,7 @@ mod tests {
#[tokio::test] #[tokio::test]
#[apply(indexer_template)] #[apply(indexer_template)]
async fn test_shutdown(num_shards: usize, kv_block_size: usize) { async fn test_shutdown(num_shards: usize, kv_block_size: usize) {
setup();
let token = CancellationToken::new(); let token = CancellationToken::new();
let mut kv_indexer = make_indexer(&token, num_shards, kv_block_size); let mut kv_indexer = make_indexer(&token, num_shards, kv_block_size);
...@@ -1304,65 +1318,107 @@ mod tests { ...@@ -1304,65 +1318,107 @@ mod tests {
#[tokio::test] #[tokio::test]
#[apply(indexer_template)] #[apply(indexer_template)]
async fn test_frequency(num_shards: usize, kv_block_size: usize) { async fn test_frequency(num_shards: usize, kv_block_size: usize) {
const ONE_MILLIS: Duration = Duration::from_millis(1);
setup();
let mut kv_indexer: Box<dyn KvIndexerInterface>; let mut kv_indexer: Box<dyn KvIndexerInterface>;
let token = CancellationToken::new(); let token = CancellationToken::new();
let duration = Some(Duration::from_millis(50)); let expiration = Duration::from_millis(50);
if num_shards == 1 { if num_shards == 1 {
kv_indexer = Box::new(KvIndexer::new_with_frequency( kv_indexer = Box::new(KvIndexer::new_with_frequency(
token, token,
duration, Some(expiration),
kv_block_size, kv_block_size,
)); ));
} else { } else {
kv_indexer = Box::new(KvIndexerSharded::new_with_frequency( kv_indexer = Box::new(KvIndexerSharded::new_with_frequency(
token, token,
num_shards, num_shards,
duration, Some(expiration),
kv_block_size, kv_block_size,
)); ));
} }
let worker_id = 0; // The blocks
let event = create_store_event(worker_id, 0, vec![1, 2, 3, 4], None);
kv_indexer.apply_event(event).await;
time::sleep(Duration::from_millis(5)).await;
let block_hashes = vec![ let block_hashes = vec![
LocalBlockHash(1), LocalBlockHash(1),
LocalBlockHash(2), LocalBlockHash(2),
LocalBlockHash(3), LocalBlockHash(3),
LocalBlockHash(4), LocalBlockHash(4),
]; ];
let scores = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
assert_eq!(scores.frequencies.len(), 0); let overlap = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
assert_eq!(
overlap.frequencies.len(),
0,
"Should be no cached blocks yet"
);
// Blocks go in cache
let worker_id = 0;
let event = create_store_event(worker_id, 0, vec![1, 2, 3, 4], None);
kv_indexer.apply_event(event).await;
let scores = kv_indexer.find_matches(block_hashes.clone()).await.unwrap(); // First access
assert_eq!(scores.frequencies, vec![1, 1, 1, 1]); // The store event is applied async so poll briefly
let mut overlap = OverlapScores::default();
let timeout = Duration::from_millis(10);
let start = Instant::now();
while overlap.scores.is_empty() && Instant::now().duration_since(start) < timeout {
time::sleep(ONE_MILLIS).await;
overlap = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
}
assert_eq!(
overlap.scores.len(),
1,
"One worker has these blocks cached"
);
assert_eq!(
overlap.frequencies.len(),
0,
"Blocks have not previously been accessed"
);
// Second access
let overlap = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
assert_eq!(overlap.scores.len(), 1, "Still one worker matches");
assert_eq!(
overlap.frequencies,
vec![1, 1, 1, 1],
"We should see the first access now"
);
time::sleep(Duration::from_millis(100)).await; // Let those two accesses expire
time::sleep(expiration + Duration::from_millis(10)).await;
let scores = kv_indexer.find_matches(block_hashes.clone()).await.unwrap(); // New first access
assert_eq!(scores.frequencies.len(), 0); let overlap = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
assert_eq!(
overlap.frequencies.len(),
0,
"Blocks were accessed too long ago"
);
let scores = kv_indexer.find_matches(block_hashes.clone()).await.unwrap(); // New second access
assert_eq!(scores.frequencies, vec![1, 1, 1, 1]); let _ = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
let scores = kv_indexer // Access only the first three blocks
let overlap = kv_indexer
.find_matches(block_hashes[0..3].to_vec()) .find_matches(block_hashes[0..3].to_vec())
.await .await
.unwrap(); .unwrap();
assert_eq!(scores.frequencies, vec![2, 2, 2]); // We see the previous two new accesses
assert_eq!(overlap.frequencies, vec![2, 2, 2]);
let scores = kv_indexer.find_matches(block_hashes.clone()).await.unwrap(); // The third access did not touch the last block
assert_eq!(scores.frequencies, vec![3, 3, 3, 2]); let overlap = kv_indexer.find_matches(block_hashes.clone()).await.unwrap();
assert_eq!(overlap.frequencies, vec![3, 3, 3, 2]);
} }
#[test] #[test]
fn test_router_event_new() { fn test_router_event_new() {
setup();
let worker_id = 0; let worker_id = 0;
let kv_cache_event = KvCacheEvent { let kv_cache_event = KvCacheEvent {
event_id: 1, event_id: 1,
...@@ -1392,6 +1448,7 @@ mod tests { ...@@ -1392,6 +1448,7 @@ mod tests {
#[test] #[test]
fn test_radix_tree_default() { fn test_radix_tree_default() {
setup();
let radix_tree: RadixTree = Default::default(); let radix_tree: RadixTree = Default::default();
assert!(radix_tree.root.borrow().children.is_empty()); assert!(radix_tree.root.borrow().children.is_empty());
assert!(radix_tree.root.borrow().workers.is_empty()); assert!(radix_tree.root.borrow().workers.is_empty());
...@@ -1400,6 +1457,7 @@ mod tests { ...@@ -1400,6 +1457,7 @@ mod tests {
#[test] #[test]
fn test_overlap_scores_default() { fn test_overlap_scores_default() {
setup();
let overlap_scores: OverlapScores = Default::default(); let overlap_scores: OverlapScores = Default::default();
assert!(overlap_scores.scores.is_empty()); assert!(overlap_scores.scores.is_empty());
} }
......
...@@ -236,16 +236,19 @@ impl Request { ...@@ -236,16 +236,19 @@ impl Request {
serde_json::from_str(messages).unwrap(); serde_json::from_str(messages).unwrap();
let tools: Option<Vec<async_openai::types::ChatCompletionTool>> = let tools: Option<Vec<async_openai::types::ChatCompletionTool>> =
tools.map(|x| serde_json::from_str(x).unwrap()); tools.map(|x| serde_json::from_str(x).unwrap());
let tools = tools.unwrap(); //let tools = tools.unwrap();
let tool_choice = tool_choice.unwrap(); //let tool_choice = tool_choice.unwrap();
let inner = async_openai::types::CreateChatCompletionRequestArgs::default() let mut inner = async_openai::types::CreateChatCompletionRequestArgs::default();
.model(model) inner.model(model);
.messages(messages) inner.messages(messages);
.tools(tools) if let Some(tools) = tools {
.tool_choice(tool_choice) inner.tools(tools);
.build() }
.unwrap(); if let Some(tool_choice) = tool_choice {
inner.tool_choice(tool_choice);
}
let inner = inner.build().unwrap();
NvCreateChatCompletionRequest { inner, nvext: None } NvCreateChatCompletionRequest { inner, nvext: None }
} }
......
---
source: lib/llm/tests/preprocessor.rs
expression: formatted_prompt
info:
messages:
- role: system
content: You are a very helpful assistant!
- role: user
content: How do I reverse a string in Python?
- role: assistant
content: "You can reverse a string in Python using slicing:\n\n```python\nreversed_string = your_string[::-1]\n```\n\nAlternatively, you can use `reversed()` with `join()`:\n\n```python\nreversed_string = ''.join(reversed(your_string))\n```\n"
- role: user
content: What if I want to reverse each word in a sentence but keep their order?
model: meta-llama_llama-3_1-70b-instruct--1605565
---
<|begin_of_text|><|start_header_id|>system<|end_header_id|>
Cutting Knowledge Date: December 2023
Today Date: <redacted>
You are a very helpful assistant!<|eot_id|><|start_header_id|>user<|end_header_id|>
How do I reverse a string in Python?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
You can reverse a string in Python using slicing:
```python
reversed_string = your_string[::-1]
```
Alternatively, you can use `reversed()` with `join()`:
```python
reversed_string = ''.join(reversed(your_string))
```<|eot_id|><|start_header_id|>user<|end_header_id|>
What if I want to reverse each word in a sentence but keep their order?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
---
source: lib/llm/tests/preprocessor.rs
expression: formatted_prompt
info:
messages:
- role: user
content: How do I reverse a string in Python?
- role: assistant
content: "You can reverse a string in Python using slicing:\n\n```python\nreversed_string = your_string[::-1]\n```\n\nAlternatively, you can use `reversed()` with `join()`:\n\n```python\nreversed_string = ''.join(reversed(your_string))\n```\n"
- role: user
content: What if I want to reverse each word in a sentence but keep their order?
model: meta-llama_llama-3_1-70b-instruct--1605565
---
<|begin_of_text|><|start_header_id|>system<|end_header_id|>
Cutting Knowledge Date: December 2023
Today Date: <redacted>
<|eot_id|><|start_header_id|>user<|end_header_id|>
How do I reverse a string in Python?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
You can reverse a string in Python using slicing:
```python
reversed_string = your_string[::-1]
```
Alternatively, you can use `reversed()` with `join()`:
```python
reversed_string = ''.join(reversed(your_string))
```<|eot_id|><|start_header_id|>user<|end_header_id|>
What if I want to reverse each word in a sentence but keep their order?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
---
source: lib/llm/tests/preprocessor.rs
expression: formatted_prompt
info:
messages:
- role: system
content: You are a very helpful assistant!
- role: user
content: How do I reverse a string in Python?
- role: assistant
content: "You can reverse a "
model: meta-llama_llama-3_1-70b-instruct--1605565
---
<|begin_of_text|><|start_header_id|>system<|end_header_id|>
Cutting Knowledge Date: December 2023
Today Date: <redacted>
You are a very helpful assistant!<|eot_id|><|start_header_id|>user<|end_header_id|>
How do I reverse a string in Python?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
You can reverse a<|eot_id|>
---
source: lib/llm/tests/preprocessor.rs
expression: formatted_prompt
info:
messages:
- role: system
content: You are a very helpful assistant!
- role: user
content: How do I reverse a string in Python?
- role: assistant
content: "You can reverse a string in Python using slicing:\n\n```python\nreversed_string = your_string[::-1]\n```\n\nAlternatively, you can use `reversed()` with `join()`:\n\n```python\nreversed_string = ''.join(reversed(your_string))\n```\n"
- role: user
content: What if I want to reverse each word in a sentence but keep their order?
model: meta-llama_llama-3_1-70b-instruct--1605565
tools:
- type: function
function:
name: get_current_temperature
description: Get the current temperature for a specific location
parameters:
type: object
properties:
location:
type: string
description: "The city and state, e.g., San Francisco, CA"
unit:
type: string
enum:
- Celsius
- Fahrenheit
description: "The temperature unit to use. Infer this from the user's location."
required:
- location
- unit
- type: function
function:
name: get_rain_probability
description: Get the probability of rain for a specific location
parameters:
type: object
properties:
location:
type: string
description: "The city and state, e.g., San Francisco, CA"
required:
- location
tool_choice: auto
---
<|begin_of_text|><|start_header_id|>system<|end_header_id|>
Environment: ipython
Cutting Knowledge Date: December 2023
Today Date: <redacted>
You are a very helpful assistant!<|eot_id|><|start_header_id|>user<|end_header_id|>
Given the following functions, please respond with a JSON for a function call with its proper arguments that best answers the given prompt.
Respond in the format {"name": function name, "parameters": dictionary of argument name and its value}.Do not use variables.
{
"function": {
"description": "Get the current temperature for a specific location",
"name": "get_current_temperature",
"parameters": {
"properties": {
"location": {
"description": "The city and state, e.g., San Francisco, CA",
"type": "string"
},
"unit": {
"description": "The temperature unit to use. Infer this from the user\u0027s location.",
"enum": [
"Celsius",
"Fahrenheit"
],
"type": "string"
}
},
"required": [
"location",
"unit"
],
"type": "object"
}
},
"type": "function"
}
{
"function": {
"description": "Get the probability of rain for a specific location",
"name": "get_rain_probability",
"parameters": {
"properties": {
"location": {
"description": "The city and state, e.g., San Francisco, CA",
"type": "string"
}
},
"required": [
"location"
],
"type": "object"
}
},
"type": "function"
}
How do I reverse a string in Python?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
You can reverse a string in Python using slicing:
```python
reversed_string = your_string[::-1]
```
Alternatively, you can use `reversed()` with `join()`:
```python
reversed_string = ''.join(reversed(your_string))
```<|eot_id|><|start_header_id|>user<|end_header_id|>
What if I want to reverse each word in a sentence but keep their order?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
---
source: lib/llm/tests/preprocessor.rs
expression: formatted_prompt
info:
messages:
- role: user
content: What is deep learning?
model: meta-llama_llama-3_1-70b-instruct--1605565
---
<|begin_of_text|><|start_header_id|>system<|end_header_id|>
Cutting Knowledge Date: December 2023
Today Date: <redacted>
<|eot_id|><|start_header_id|>user<|end_header_id|>
What is deep learning?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
---
source: lib/llm/tests/preprocessor.rs
expression: formatted_prompt
info:
messages:
- role: user
content: What is deep learning?
model: meta-llama_llama-3_1-70b-instruct--1605565
tools:
- type: function
function:
name: get_current_temperature
description: Get the current temperature for a specific location
parameters:
type: object
properties:
location:
type: string
description: "The city and state, e.g., San Francisco, CA"
unit:
type: string
enum:
- Celsius
- Fahrenheit
description: "The temperature unit to use. Infer this from the user's location."
required:
- location
- unit
- type: function
function:
name: get_rain_probability
description: Get the probability of rain for a specific location
parameters:
type: object
properties:
location:
type: string
description: "The city and state, e.g., San Francisco, CA"
required:
- location
tool_choice: auto
---
<|begin_of_text|><|start_header_id|>system<|end_header_id|>
Environment: ipython
Cutting Knowledge Date: December 2023
Today Date: <redacted>
<|eot_id|><|start_header_id|>user<|end_header_id|>
Given the following functions, please respond with a JSON for a function call with its proper arguments that best answers the given prompt.
Respond in the format {"name": function name, "parameters": dictionary of argument name and its value}.Do not use variables.
{
"function": {
"description": "Get the current temperature for a specific location",
"name": "get_current_temperature",
"parameters": {
"properties": {
"location": {
"description": "The city and state, e.g., San Francisco, CA",
"type": "string"
},
"unit": {
"description": "The temperature unit to use. Infer this from the user\u0027s location.",
"enum": [
"Celsius",
"Fahrenheit"
],
"type": "string"
}
},
"required": [
"location",
"unit"
],
"type": "object"
}
},
"type": "function"
}
{
"function": {
"description": "Get the probability of rain for a specific location",
"name": "get_rain_probability",
"parameters": {
"properties": {
"location": {
"description": "The city and state, e.g., San Francisco, CA",
"type": "string"
}
},
"required": [
"location"
],
"type": "object"
}
},
"type": "function"
}
What is deep learning?<|eot_id|><|start_header_id|>assistant<|end_header_id|>
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment