Unverified Commit 76d96bb0 authored by Yan Ru Pei's avatar Yan Ru Pei Committed by GitHub
Browse files

fix(nats): NATs queue should use streaming API otherwise KV events drop (#3900)


Signed-off-by: default avatarPeaBrane <yanrpei@gmail.com>
parent 5a0d710b
......@@ -439,6 +439,8 @@ pub struct NatsQueue {
subscriber: Option<jetstream::consumer::PullConsumer>,
/// Optional consumer name for broadcast pattern (if None, uses "worker-group")
consumer_name: Option<String>,
/// Message stream for efficient message consumption
message_stream: Option<jetstream::consumer::pull::Stream>,
}
impl NatsQueue {
......@@ -457,6 +459,7 @@ impl NatsQueue {
subject,
subscriber: None,
consumer_name: Some("worker-group".to_string()),
message_stream: None,
}
}
......@@ -477,6 +480,7 @@ impl NatsQueue {
subject,
subscriber: None,
consumer_name: None,
message_stream: None,
}
}
......@@ -499,6 +503,7 @@ impl NatsQueue {
subject,
subscriber: None,
consumer_name: Some(consumer_name),
message_stream: None,
}
}
......@@ -562,7 +567,12 @@ impl NatsQueue {
};
let subscriber = stream.create_consumer(consumer_config).await?;
// Create the message stream for efficient consumption
let message_stream = subscriber.messages().await?;
self.subscriber = Some(subscriber);
self.message_stream = Some(message_stream);
}
self.client = Some(client);
......@@ -581,6 +591,7 @@ impl NatsQueue {
/// Close the connection when done
pub async fn close(&mut self) -> Result<()> {
self.message_stream = None;
self.subscriber = None;
self.client = None;
Ok(())
......@@ -677,28 +688,29 @@ impl NatsQueue {
pub async fn dequeue_task(&mut self, timeout: Option<time::Duration>) -> Result<Option<Bytes>> {
self.ensure_connection().await?;
if let Some(subscriber) = &self.subscriber {
let timeout_duration = timeout.unwrap_or(self.dequeue_timeout);
let mut batch = subscriber
.fetch()
.expires(timeout_duration)
.max_messages(1)
.messages()
.await?;
let Some(ref mut stream) = self.message_stream else {
return Err(anyhow::anyhow!("Message stream not initialized"));
};
let timeout_duration = timeout.unwrap_or(self.dequeue_timeout);
if let Some(message) = batch.next().await {
let message =
message.map_err(|e| anyhow::anyhow!("Failed to get message: {}", e))?;
message
.ack()
// Try to get next message from the stream with timeout
let message = tokio::time::timeout(timeout_duration, stream.next()).await;
match message {
Ok(Some(Ok(msg))) => {
msg.ack()
.await
.map_err(|e| anyhow::anyhow!("Failed to ack message: {}", e))?;
Ok(Some(message.payload.clone()))
} else {
Ok(None)
Ok(Some(msg.payload.clone()))
}
} else {
Err(anyhow::anyhow!("Subscriber not initialized"))
Ok(Some(Err(e))) => Err(anyhow::anyhow!("Failed to get message from stream: {}", e)),
Ok(None) => Err(anyhow::anyhow!("Message stream ended unexpectedly")),
// Timeout - no messages available
Err(_) => Ok(None),
}
}
......
Markdown is supported
0% or .
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment