Unverified Commit b9640e5c authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

feat: allow specifying consumer name for NATS queue + manually purge old messages (#2740)

parent 73f60feb
...@@ -443,6 +443,8 @@ pub struct NatsQueue { ...@@ -443,6 +443,8 @@ pub struct NatsQueue {
subject: String, subject: String,
/// The subscriber for pull-based consumption /// The subscriber for pull-based consumption
subscriber: Option<jetstream::consumer::PullConsumer>, subscriber: Option<jetstream::consumer::PullConsumer>,
/// Optional consumer name for broadcast pattern (if None, uses "worker-group")
consumer_name: Option<String>,
} }
impl NatsQueue { impl NatsQueue {
...@@ -460,6 +462,29 @@ impl NatsQueue { ...@@ -460,6 +462,29 @@ impl NatsQueue {
client: None, client: None,
subject, subject,
subscriber: None, subscriber: None,
consumer_name: None,
}
}
/// Create a new NatsQueue with a specific consumer name for broadcast pattern
/// Each consumer with a unique name will receive all messages independently
pub fn new_with_consumer(
stream_name: String,
nats_server: String,
dequeue_timeout: time::Duration,
consumer_name: String,
) -> Self {
let sanitized_stream_name = stream_name.replace(['/', '\\'], "_");
let subject = format!("{}.*", sanitized_stream_name);
Self {
stream_name: sanitized_stream_name,
nats_server,
dequeue_timeout,
client: None,
subject,
subscriber: None,
consumer_name: Some(consumer_name),
} }
} }
...@@ -486,7 +511,11 @@ impl NatsQueue { ...@@ -486,7 +511,11 @@ impl NatsQueue {
// Create persistent subscriber // Create persistent subscriber
let consumer_config = jetstream::consumer::pull::Config { let consumer_config = jetstream::consumer::pull::Config {
durable_name: Some("worker-group".to_string()), durable_name: Some(
self.consumer_name
.clone()
.unwrap_or_else(|| "worker-group".to_string()),
),
..Default::default() ..Default::default()
}; };
...@@ -515,6 +544,45 @@ impl NatsQueue { ...@@ -515,6 +544,45 @@ impl NatsQueue {
Ok(()) Ok(())
} }
/// Shutdown the consumer by deleting it from the stream and closing the connection
/// This permanently removes the consumer from the server
pub async fn shutdown(&mut self) -> Result<()> {
if let (Some(client), Some(consumer_name)) = (&self.client, &self.consumer_name) {
// Get the stream and delete the consumer
let stream = client.jetstream().get_stream(&self.stream_name).await?;
stream.delete_consumer(consumer_name).await.map_err(|e| {
anyhow::anyhow!("Failed to delete consumer {}: {}", consumer_name, e)
})?;
log::debug!(
"Deleted consumer {} from stream {}",
consumer_name,
self.stream_name
);
} else {
log::warn!(
"Cannot shutdown consumer: client or consumer_name is None (client: {:?}, consumer_name: {:?})",
self.client.is_some(),
self.consumer_name.is_some()
);
}
// Then close the connection
self.close().await
}
/// Count the number of consumers for the stream
pub async fn count_consumers(&mut self) -> Result<usize> {
self.ensure_connection().await?;
if let Some(client) = &self.client {
let mut stream = client.jetstream().get_stream(&self.stream_name).await?;
let info = stream.info().await?;
Ok(info.state.consumer_count)
} else {
Err(anyhow::anyhow!("Client not connected"))
}
}
/// Enqueue a task using the provided data /// Enqueue a task using the provided data
pub async fn enqueue_task(&mut self, task_data: Bytes) -> Result<()> { pub async fn enqueue_task(&mut self, task_data: Bytes) -> Result<()> {
self.ensure_connection().await?; self.ensure_connection().await?;
...@@ -564,8 +632,12 @@ impl NatsQueue { ...@@ -564,8 +632,12 @@ impl NatsQueue {
if let Some(client) = &self.client { if let Some(client) = &self.client {
// Get consumer info to get pending messages count // Get consumer info to get pending messages count
let stream = client.jetstream().get_stream(&self.stream_name).await?; let stream = client.jetstream().get_stream(&self.stream_name).await?;
let consumer_name = self
.consumer_name
.clone()
.unwrap_or_else(|| "worker-group".to_string());
let mut consumer: jetstream::consumer::PullConsumer = stream let mut consumer: jetstream::consumer::PullConsumer = stream
.get_consumer("worker-group") .get_consumer(&consumer_name)
.await .await
.map_err(|e| anyhow::anyhow!("Failed to get consumer: {}", e))?; .map_err(|e| anyhow::anyhow!("Failed to get consumer: {}", e))?;
let info = consumer.info().await?; let info = consumer.info().await?;
...@@ -575,6 +647,102 @@ impl NatsQueue { ...@@ -575,6 +647,102 @@ impl NatsQueue {
Err(anyhow::anyhow!("Client not connected")) Err(anyhow::anyhow!("Client not connected"))
} }
} }
/// Purge messages from the stream up to (but not including) the specified sequence number
/// This permanently removes messages and affects all consumers of the stream
pub async fn purge_up_to_sequence(&self, sequence: u64) -> Result<()> {
if let Some(client) = &self.client {
let stream = client.jetstream().get_stream(&self.stream_name).await?;
// NOTE: this purge excludes the sequence itself
// https://docs.rs/nats/latest/nats/jetstream/struct.PurgeRequest.html
stream.purge().sequence(sequence).await.map_err(|e| {
anyhow::anyhow!("Failed to purge stream up to sequence {}: {}", sequence, e)
})?;
log::debug!(
"Purged stream {} up to sequence {}",
self.stream_name,
sequence
);
Ok(())
} else {
Err(anyhow::anyhow!("Client not connected"))
}
}
/// Purge messages from the stream up to the minimum acknowledged sequence across all consumers
/// This finds the lowest acknowledged sequence number across all consumers and purges up to that point
pub async fn purge_acknowledged(&mut self) -> Result<()> {
self.ensure_connection().await?;
let Some(client) = &self.client else {
return Err(anyhow::anyhow!("Client not connected"));
};
let stream = client.jetstream().get_stream(&self.stream_name).await?;
// Get all consumer names for the stream
let consumer_names: Vec<String> = stream
.consumer_names()
.try_collect()
.await
.map_err(|e| anyhow::anyhow!("Failed to list consumers: {}", e))?;
if consumer_names.is_empty() {
log::debug!("No consumers found for stream {}", self.stream_name);
return Ok(());
}
// Find the minimum acknowledged sequence across all consumers
let mut min_ack_sequence = u64::MAX;
for consumer_name in &consumer_names {
let mut consumer: jetstream::consumer::PullConsumer = stream
.get_consumer(consumer_name)
.await
.map_err(|e| anyhow::anyhow!("Failed to get consumer {}: {}", consumer_name, e))?;
let info = consumer.info().await.map_err(|e| {
anyhow::anyhow!("Failed to get consumer info for {}: {}", consumer_name, e)
})?;
// The ack_floor contains the stream sequence of the highest contiguously acknowledged message
// If stream_sequence is 0, it means no messages have been acknowledged yet
if info.ack_floor.stream_sequence > 0 {
min_ack_sequence = min_ack_sequence.min(info.ack_floor.stream_sequence);
log::debug!(
"Consumer {} has ack_floor at sequence {}",
consumer_name,
info.ack_floor.stream_sequence
);
}
}
// Only purge if we found a valid minimum acknowledged sequence
if min_ack_sequence < u64::MAX && min_ack_sequence > 0 {
// Purge up to (but not including) the minimum acknowledged sequence + 1
// We add 1 because we want to include the minimum acknowledged message in the purge
let purge_sequence = min_ack_sequence + 1;
self.purge_up_to_sequence(purge_sequence).await?;
log::info!(
"Purged stream {} up to acknowledged sequence {} (purged up to sequence {})",
self.stream_name,
min_ack_sequence,
purge_sequence
);
} else {
log::debug!(
"No messages to purge for stream {} (min_ack_sequence: {})",
self.stream_name,
min_ack_sequence
);
}
Ok(())
}
} }
/// Prometheus metrics that mirror the NATS client statistics (in primitive types) /// Prometheus metrics that mirror the NATS client statistics (in primitive types)
...@@ -786,4 +954,195 @@ mod tests { ...@@ -786,4 +954,195 @@ mod tests {
.await .await
.expect("Failed to delete bucket"); .expect("Failed to delete bucket");
} }
// Integration test for broadcast pattern with purging
#[tokio::test]
#[ignore]
async fn test_nats_queue_broadcast_with_purge() {
use uuid::Uuid;
// Create unique stream name for this test
let stream_name = format!("test-broadcast-{}", Uuid::new_v4());
let nats_server = "nats://localhost:4222".to_string();
let timeout = time::Duration::from_secs(0);
// Create two consumers with different names for the same stream
let consumer1_name = format!("consumer-{}", Uuid::new_v4());
let consumer2_name = format!("consumer-{}", Uuid::new_v4());
let mut queue1 = NatsQueue::new_with_consumer(
stream_name.clone(),
nats_server.clone(),
timeout,
consumer1_name,
);
let mut queue2 = NatsQueue::new_with_consumer(
stream_name.clone(),
nats_server.clone(),
timeout,
consumer2_name,
);
// Connect both queues (first one creates the stream, second one reuses it)
queue1.connect().await.expect("Failed to connect queue1");
queue2.connect().await.expect("Failed to connect queue2");
// Send 4 messages
let messages = vec![
Bytes::from("message1"),
Bytes::from("message2"),
Bytes::from("message3"),
Bytes::from("message4"),
];
for msg in &messages {
queue1
.enqueue_task(msg.clone())
.await
.expect("Failed to enqueue message");
}
// Give JetStream a moment to persist the messages
tokio::time::sleep(time::Duration::from_millis(100)).await;
// Get stream info to find the sequence numbers
// We need to know the sequence of message 2 to purge up to it
let client_options = Client::builder()
.server(nats_server.clone())
.build()
.expect("Failed to build client options");
let client = client_options
.connect()
.await
.expect("Failed to connect to NATS");
// Purge the first two messages (sequence 1 and 2)
// Note: JetStream sequences start at 1, and purge is exclusive of the sequence number
queue1
.purge_up_to_sequence(3)
.await
.expect("Failed to purge messages");
// Give JetStream a moment to process the purge
tokio::time::sleep(time::Duration::from_millis(100)).await;
// Consumer 1 dequeues one message (message3)
let msg3_consumer1 = queue1
.dequeue_task(Some(time::Duration::from_millis(500)))
.await
.expect("Failed to dequeue from queue1");
assert_eq!(
msg3_consumer1,
Some(messages[2].clone()),
"Consumer 1 should get message3"
);
// Give JetStream a moment to process acknowledgments
tokio::time::sleep(time::Duration::from_millis(100)).await;
// Now run purge_acknowledged
// At this point:
// - Consumer 1 has ack'd message 3 (ack_floor = 3)
// - Consumer 2 hasn't consumed anything yet (ack_floor = 0)
// - Min ack_floor = 0, so nothing will be purged
queue1
.purge_acknowledged()
.await
.expect("Failed to purge acknowledged messages");
// Give JetStream a moment to process the purge
tokio::time::sleep(time::Duration::from_millis(100)).await;
// Now collect remaining messages from both consumers
let mut consumer1_remaining = Vec::new();
let mut consumer2_remaining = Vec::new();
// Collect remaining messages from consumer 1
while let Some(msg) = queue1
.dequeue_task(None)
.await
.expect("Failed to dequeue from queue1")
{
consumer1_remaining.push(msg);
}
// Collect remaining messages from consumer 2
while let Some(msg) = queue2
.dequeue_task(None)
.await
.expect("Failed to dequeue from queue2")
{
consumer2_remaining.push(msg);
}
// Verify consumer 1 gets 1 remaining message (message4)
assert_eq!(
consumer1_remaining.len(),
1,
"Consumer 1 should have 1 remaining message"
);
assert_eq!(
consumer1_remaining[0], messages[3],
"Consumer 1 should get message4"
);
// Verify consumer 2 gets 2 messages (message3 and message4)
assert_eq!(
consumer2_remaining.len(),
2,
"Consumer 2 should have 2 messages"
);
assert_eq!(
consumer2_remaining[0], messages[2],
"Consumer 2 should get message3"
);
assert_eq!(
consumer2_remaining[1], messages[3],
"Consumer 2 should get message4"
);
// Test consumer count and shutdown behavior
// First verify via consumer 1 that there are two consumers
let consumer_count = queue1
.count_consumers()
.await
.expect("Failed to count consumers");
assert_eq!(consumer_count, 2, "Should have 2 consumers initially");
// Close consumer 1 and verify via consumer 2 that there are still two consumers
queue1.close().await.expect("Failed to close queue1");
let consumer_count = queue2
.count_consumers()
.await
.expect("Failed to count consumers");
assert_eq!(
consumer_count, 2,
"Should still have 2 consumers after closing queue1"
);
// Reconnect queue1 to be able to shutdown
queue1.connect().await.expect("Failed to reconnect queue1");
// Shutdown consumer 1 and verify via consumer 2 that there is only one consumer left
queue1.shutdown().await.expect("Failed to shutdown queue1");
let consumer_count = queue2
.count_consumers()
.await
.expect("Failed to count consumers");
assert_eq!(
consumer_count, 1,
"Should have only 1 consumer after shutting down queue1"
);
// Clean up by deleting the stream
client
.jetstream()
.delete_stream(&stream_name)
.await
.expect("Failed to delete test stream");
}
} }
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment