Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
2a2ad756
Unverified
Commit
2a2ad756
authored
Jan 25, 2026
by
Qi Wang
Committed by
GitHub
Jan 26, 2026
Browse files
test: unit test TCP client handle_reader [2/n] (#5056)
parent
4447c247
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
245 additions
and
0 deletions
+245
-0
lib/runtime/src/pipeline/network/tcp/client.rs
lib/runtime/src/pipeline/network/tcp/client.rs
+245
-0
No files found.
lib/runtime/src/pipeline/network/tcp/client.rs
View file @
2a2ad756
...
@@ -719,4 +719,249 @@ mod tests {
...
@@ -719,4 +719,249 @@ mod tests {
let
sentinel
=
recv_msg
(
&
mut
reader
)
.await
;
let
sentinel
=
recv_msg
(
&
mut
reader
)
.await
;
assert_sentinel_message
(
sentinel
);
assert_sentinel_message
(
sentinel
);
}
}
// ==================== handle_reader tests ====================
struct
ReaderHarness
{
framed_server
:
FramedWrite
<
tokio
::
io
::
WriteHalf
<
tokio
::
net
::
TcpStream
>
,
TwoPartCodec
>
,
framed_reader
:
FramedRead
<
tokio
::
io
::
ReadHalf
<
tokio
::
net
::
TcpStream
>
,
TwoPartCodec
>
,
alive_tx
:
oneshot
::
Sender
<
()
>
,
alive_rx
:
oneshot
::
Receiver
<
()
>
,
controller
:
Arc
<
Controller
>
,
}
/// Creates a reusable reader harness with paired TCP streams and test channels.
async
fn
reader_harness
()
->
ReaderHarness
{
let
(
client
,
server
)
=
create_tcp_pair
()
.await
;
let
(
read_half
,
_
write_half
)
=
tokio
::
io
::
split
(
client
);
let
(
_
server_read
,
server_write
)
=
tokio
::
io
::
split
(
server
);
let
framed_reader
=
FramedRead
::
new
(
read_half
,
TwoPartCodec
::
default
());
let
framed_server
=
FramedWrite
::
new
(
server_write
,
TwoPartCodec
::
default
());
let
(
alive_tx
,
alive_rx
)
=
oneshot
::
channel
::
<
()
>
();
let
controller
=
Arc
::
new
(
Controller
::
default
());
ReaderHarness
{
framed_server
,
framed_reader
,
alive_tx
,
alive_rx
,
controller
,
}
}
fn
control_message
(
msg
:
&
ControlMessage
)
->
TwoPartMessage
{
let
msg_bytes
=
serde_json
::
to_vec
(
msg
)
.unwrap
();
TwoPartMessage
::
from_header
(
Bytes
::
from
(
msg_bytes
))
}
/// Test that handle_reader handles Stop control message by calling context.stop()
#[tokio::test]
async
fn
test_handle_reader_stop_control_message
()
{
let
ReaderHarness
{
mut
framed_server
,
framed_reader
,
alive_tx
,
alive_rx
:
_
alive_rx
,
controller
,
}
=
reader_harness
()
.await
;
// Spawn the reader task
let
controller_clone
=
controller
.clone
();
let
reader_handle
=
tokio
::
spawn
(
async
move
{
handle_reader
(
framed_reader
,
controller_clone
,
alive_tx
)
.await
},
);
// Send Stop control message from server
framed_server
.send
(
control_message
(
&
ControlMessage
::
Stop
))
.await
.unwrap
();
// Close the framed server to signal EOF to the client
framed_server
.close
()
.await
.unwrap
();
// Wait for reader to finish
let
_
=
reader_handle
.await
.unwrap
();
// Verify that stop was called on the controller
assert
!
(
controller
.is_stopped
(),
"Controller should be stopped after receiving Stop message"
);
}
/// Test that handle_reader handles Kill control message by calling context.kill()
#[tokio::test]
async
fn
test_handle_reader_kill_control_message
()
{
let
ReaderHarness
{
mut
framed_server
,
framed_reader
,
alive_tx
,
alive_rx
:
_
alive_rx
,
controller
,
}
=
reader_harness
()
.await
;
// Spawn the reader task
let
controller_clone
=
controller
.clone
();
let
reader_handle
=
tokio
::
spawn
(
async
move
{
handle_reader
(
framed_reader
,
controller_clone
,
alive_tx
)
.await
},
);
// Send Kill control message from server
framed_server
.send
(
control_message
(
&
ControlMessage
::
Kill
))
.await
.unwrap
();
// Close the framed server to signal EOF to the client
framed_server
.close
()
.await
.unwrap
();
// Wait for reader to finish
let
_
=
reader_handle
.await
.unwrap
();
// Verify that kill was called on the controller
assert
!
(
controller
.is_killed
(),
"Controller should be killed after receiving Kill message"
);
}
/// Test that handle_reader exits when alive channel is closed
#[tokio::test]
async
fn
test_handle_reader_exits_on_alive_channel_closed
()
{
let
ReaderHarness
{
framed_reader
,
alive_tx
,
alive_rx
,
controller
,
..
}
=
reader_harness
()
.await
;
// Spawn the reader task
let
reader_handle
=
tokio
::
spawn
(
async
move
{
handle_reader
(
framed_reader
,
controller
,
alive_tx
)
.await
});
// Drop the alive_rx to close the channel (simulating writer finishing)
drop
(
alive_rx
);
// Reader should exit due to alive channel closure
let
result
=
reader_handle
.await
;
assert
!
(
result
.is_ok
(),
"handle_reader should exit when alive channel is closed"
);
}
/// Test that handle_reader exits when TCP stream is closed
#[tokio::test]
async
fn
test_handle_reader_exits_on_stream_closed
()
{
let
ReaderHarness
{
mut
framed_server
,
framed_reader
,
alive_tx
,
alive_rx
:
_
alive_rx
,
controller
,
}
=
reader_harness
()
.await
;
// Spawn the reader task
let
reader_handle
=
tokio
::
spawn
(
async
move
{
handle_reader
(
framed_reader
,
controller
,
alive_tx
)
.await
});
// Close the framed server to signal EOF to the client
framed_server
.close
()
.await
.unwrap
();
// Reader should exit due to stream closure
let
result
=
tokio
::
time
::
timeout
(
std
::
time
::
Duration
::
from_secs
(
1
),
reader_handle
)
.await
;
assert
!
(
result
.is_ok
(),
"handle_reader should exit when stream is closed"
);
}
/// Test that handle_reader handles multiple control messages in sequence
#[tokio::test]
async
fn
test_handle_reader_multiple_control_messages
()
{
let
ReaderHarness
{
mut
framed_server
,
framed_reader
,
alive_tx
,
alive_rx
:
_
alive_rx
,
controller
,
}
=
reader_harness
()
.await
;
// Spawn the reader task
let
controller_clone
=
controller
.clone
();
let
reader_handle
=
tokio
::
spawn
(
async
move
{
handle_reader
(
framed_reader
,
controller_clone
,
alive_tx
)
.await
},
);
// Send multiple Stop messages (first one will stop, subsequent ones are no-ops)
framed_server
.send
(
control_message
(
&
ControlMessage
::
Stop
))
.await
.unwrap
();
framed_server
.send
(
control_message
(
&
ControlMessage
::
Stop
))
.await
.unwrap
();
// Close the framed server to signal EOF to the client
framed_server
.close
()
.await
.unwrap
();
// Wait for reader to finish
let
_
=
reader_handle
.await
.unwrap
();
// Verify that stop was called
assert
!
(
controller
.is_stopped
(),
"Controller should be stopped after receiving Stop messages"
);
}
/// Test handle_reader with Stop followed by Kill
#[tokio::test]
async
fn
test_handle_reader_stop_then_kill
()
{
let
ReaderHarness
{
mut
framed_server
,
framed_reader
,
alive_tx
,
alive_rx
:
_
alive_rx
,
controller
,
}
=
reader_harness
()
.await
;
// Spawn the reader task
let
controller_clone
=
controller
.clone
();
let
reader_handle
=
tokio
::
spawn
(
async
move
{
handle_reader
(
framed_reader
,
controller_clone
,
alive_tx
)
.await
},
);
// Send Stop first, then Kill
framed_server
.send
(
control_message
(
&
ControlMessage
::
Stop
))
.await
.unwrap
();
framed_server
.send
(
control_message
(
&
ControlMessage
::
Kill
))
.await
.unwrap
();
// Close the framed server to signal EOF to the client
framed_server
.close
()
.await
.unwrap
();
// Wait for reader to finish
let
_
=
reader_handle
.await
.unwrap
();
// Verify that kill was called (which sets killed state)
assert
!
(
controller
.is_killed
(),
"Controller should be killed after receiving Kill message"
);
}
}
}
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment