Unverified Commit 1a8706c8 authored by Simo Lin's avatar Simo Lin Committed by GitHub
Browse files

[router] reduce contention, fix double-count race (#8978)

parent 7d3af603
...@@ -152,12 +152,11 @@ impl Tree { ...@@ -152,12 +152,11 @@ impl Tree {
parent: RwLock::new(Some(Arc::clone(&curr))), parent: RwLock::new(Some(Arc::clone(&curr))),
}); });
// Increment char count when creating new node with tenant // Attach tenant to the new node (map is empty here) and increment count once
self.tenant_char_count self.tenant_char_count
.entry(tenant.to_string()) .entry(tenant.to_string())
.and_modify(|count| *count += curr_text_count) .and_modify(|count| *count += curr_text_count)
.or_insert(curr_text_count); .or_insert(curr_text_count);
new_node new_node
.tenant_last_access_time .tenant_last_access_time
.insert(tenant.to_string(), timestamp_ms); .insert(tenant.to_string(), timestamp_ms);
...@@ -213,32 +212,38 @@ impl Tree { ...@@ -213,32 +212,38 @@ impl Tree {
prev = Arc::clone(&new_node); prev = Arc::clone(&new_node);
// Increment char count for the tenant in the new split node // Atomically attach tenant to the new split node and increment count once
if !prev.tenant_last_access_time.contains_key(tenant) { match prev.tenant_last_access_time.entry(tenant.to_string()) {
self.tenant_char_count Entry::Vacant(v) => {
.entry(tenant.to_string()) self.tenant_char_count
.and_modify(|count| *count += matched_text_count) .entry(tenant.to_string())
.or_insert(matched_text_count); .and_modify(|count| *count += matched_text_count)
.or_insert(matched_text_count);
v.insert(timestamp_ms);
}
Entry::Occupied(mut o) => {
o.insert(timestamp_ms);
}
} }
prev.tenant_last_access_time
.insert(tenant.to_string(), timestamp_ms);
curr_idx += shared_count; curr_idx += shared_count;
} else { } else {
// move to next node // move to next node
prev = Arc::clone(&matched_node); prev = Arc::clone(&matched_node);
// Increment char count when adding tenant to existing node // Atomically attach tenant to existing node and increment count once
if !prev.tenant_last_access_time.contains_key(tenant) { match prev.tenant_last_access_time.entry(tenant.to_string()) {
self.tenant_char_count Entry::Vacant(v) => {
.entry(tenant.to_string()) self.tenant_char_count
.and_modify(|count| *count += matched_node_text_count) .entry(tenant.to_string())
.or_insert(matched_node_text_count); .and_modify(|count| *count += matched_node_text_count)
.or_insert(matched_node_text_count);
v.insert(timestamp_ms);
}
Entry::Occupied(mut o) => {
o.insert(timestamp_ms);
}
} }
prev.tenant_last_access_time
.insert(tenant.to_string(), timestamp_ms);
curr_idx += shared_count; curr_idx += shared_count;
} }
} }
...@@ -260,29 +265,26 @@ impl Tree { ...@@ -260,29 +265,26 @@ impl Tree {
curr = prev.clone(); curr = prev.clone();
match curr.children.entry(first_char) { if let Some(entry) = curr.children.get(&first_char) {
Entry::Occupied(entry) => { let matched_node = entry.value().clone();
let matched_node = entry.get().clone(); let matched_text_guard = matched_node.text.read().unwrap();
let shared_count = let shared_count = shared_prefix_count(&matched_text_guard, &curr_text);
shared_prefix_count(&matched_node.text.read().unwrap(), &curr_text); let matched_node_text_count = matched_text_guard.chars().count();
drop(matched_text_guard);
let matched_node_text_count = matched_node.text.read().unwrap().chars().count();
if shared_count == matched_node_text_count {
if shared_count == matched_node_text_count { // Full match with current node's text, continue to next node
// Full match with current node's text, continue to next node curr_idx += shared_count;
curr_idx += shared_count; prev = Arc::clone(&matched_node);
prev = Arc::clone(&matched_node); } else {
} else { // Partial match, stop here
// Partial match, stop here curr_idx += shared_count;
curr_idx += shared_count; prev = Arc::clone(&matched_node);
prev = Arc::clone(&matched_node);
break;
}
}
Entry::Vacant(_) => {
// No match found, stop here
break; break;
} }
} else {
// No match found, stop here
break;
} }
} }
...@@ -330,35 +332,32 @@ impl Tree { ...@@ -330,35 +332,32 @@ impl Tree {
curr = prev.clone(); curr = prev.clone();
match curr.children.entry(first_char) { if let Some(entry) = curr.children.get(&first_char) {
Entry::Occupied(entry) => { let matched_node = entry.value().clone();
let matched_node = entry.get().clone();
// Only continue matching if this node belongs to the specified tenant // Only continue matching if this node belongs to the specified tenant
if !matched_node.tenant_last_access_time.contains_key(tenant) { if !matched_node.tenant_last_access_time.contains_key(tenant) {
break; break;
}
let shared_count =
shared_prefix_count(&matched_node.text.read().unwrap(), &curr_text);
let matched_node_text_count = matched_node.text.read().unwrap().chars().count();
if shared_count == matched_node_text_count {
// Full match with current node's text, continue to next node
curr_idx += shared_count;
prev = Arc::clone(&matched_node);
} else {
// Partial match, stop here
curr_idx += shared_count;
prev = Arc::clone(&matched_node);
break;
}
} }
Entry::Vacant(_) => {
// No match found, stop here let matched_text_guard = matched_node.text.read().unwrap();
let shared_count = shared_prefix_count(&matched_text_guard, &curr_text);
let matched_node_text_count = matched_text_guard.chars().count();
drop(matched_text_guard);
if shared_count == matched_node_text_count {
// Full match with current node's text, continue to next node
curr_idx += shared_count;
prev = Arc::clone(&matched_node);
} else {
// Partial match, stop here
curr_idx += shared_count;
prev = Arc::clone(&matched_node);
break; break;
} }
} else {
// No match found, stop here
break;
} }
} }
...@@ -444,12 +443,11 @@ impl Tree { ...@@ -444,12 +443,11 @@ impl Tree {
// Decrement when removing tenant from node // Decrement when removing tenant from node
if node.tenant_last_access_time.contains_key(&tenant) { if node.tenant_last_access_time.contains_key(&tenant) {
let node_len = node.text.read().unwrap().chars().count();
self.tenant_char_count self.tenant_char_count
.entry(tenant.clone()) .entry(tenant.clone())
.and_modify(|count| { .and_modify(|count| {
if *count > 0 { *count = count.saturating_sub(node_len);
*count -= node.text.read().unwrap().chars().count();
}
}); });
} }
...@@ -458,9 +456,11 @@ impl Tree { ...@@ -458,9 +456,11 @@ impl Tree {
// Remove empty nodes // Remove empty nodes
if node.children.is_empty() && node.tenant_last_access_time.is_empty() { if node.children.is_empty() && node.tenant_last_access_time.is_empty() {
if let Some(parent) = node.parent.write().unwrap().as_ref() { if let Some(parent) = node.parent.read().unwrap().as_ref() {
let first_char = node.text.read().unwrap().chars().next().unwrap(); let text_guard = node.text.read().unwrap();
parent.children.remove(&first_char); if let Some(first_char) = text_guard.chars().next() {
parent.children.remove(&first_char);
}
} }
} }
...@@ -507,8 +507,10 @@ impl Tree { ...@@ -507,8 +507,10 @@ impl Tree {
// remove empty nodes // remove empty nodes
if curr.children.is_empty() && curr.tenant_last_access_time.is_empty() { if curr.children.is_empty() && curr.tenant_last_access_time.is_empty() {
if let Some(parent) = curr.parent.read().unwrap().as_ref() { if let Some(parent) = curr.parent.read().unwrap().as_ref() {
let first_char = curr.text.read().unwrap().chars().next().unwrap(); let text_guard = curr.text.read().unwrap();
parent.children.remove(&first_char); if let Some(first_char) = text_guard.chars().next() {
parent.children.remove(&first_char);
}
} }
} }
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment