Skip to content
GitLab
Menu
Projects
Groups
Snippets
Loading...
Help
Help
Support
Community forum
Keyboard shortcuts
?
Submit feedback
Contribute to GitLab
Sign in
Toggle navigation
Menu
Open sidebar
OpenDAS
dynamo
Commits
0c66b2d2
Unverified
Commit
0c66b2d2
authored
Nov 07, 2025
by
Yan Ru Pei
Committed by
GitHub
Nov 07, 2025
Browse files
chore: better error logging for "failed to join reader and writer tasks" #3910 (#3913)
Signed-off-by:
PeaBrane
<
yanrpei@gmail.com
>
parent
f5094935
Changes
1
Hide whitespace changes
Inline
Side-by-side
Showing
1 changed file
with
28 additions
and
6 deletions
+28
-6
lib/runtime/src/pipeline/network/tcp/client.rs
lib/runtime/src/pipeline/network/tcp/client.rs
+28
-6
No files found.
lib/runtime/src/pipeline/network/tcp/client.rs
View file @
0c66b2d2
...
@@ -84,6 +84,7 @@ impl TcpClient {
...
@@ -84,6 +84,7 @@ impl TcpClient {
}
}
let
stream
=
TcpClient
::
connect
(
&
info
.address
)
.await
?
;
let
stream
=
TcpClient
::
connect
(
&
info
.address
)
.await
?
;
let
peer_port
=
stream
.peer_addr
()
.ok
()
.map
(|
addr
|
addr
.port
());
let
(
read_half
,
write_half
)
=
tokio
::
io
::
split
(
stream
);
let
(
read_half
,
write_half
)
=
tokio
::
io
::
split
(
stream
);
let
framed_reader
=
FramedRead
::
new
(
read_half
,
TwoPartCodec
::
default
());
let
framed_reader
=
FramedRead
::
new
(
read_half
,
TwoPartCodec
::
default
());
...
@@ -100,7 +101,7 @@ impl TcpClient {
...
@@ -100,7 +101,7 @@ impl TcpClient {
// transport specific handshake message
// transport specific handshake message
let
handshake
=
CallHomeHandshake
{
let
handshake
=
CallHomeHandshake
{
subject
:
info
.subject
,
subject
:
info
.subject
.clone
()
,
stream_type
:
StreamType
::
Response
,
stream_type
:
StreamType
::
Response
,
};
};
...
@@ -127,6 +128,7 @@ impl TcpClient {
...
@@ -127,6 +128,7 @@ impl TcpClient {
let
writer_task
=
tokio
::
spawn
(
handle_writer
(
framed_writer
,
bytes_rx
,
alive_rx
,
context
));
let
writer_task
=
tokio
::
spawn
(
handle_writer
(
framed_writer
,
bytes_rx
,
alive_rx
,
context
));
let
subject
=
info
.subject
.clone
();
tokio
::
spawn
(
async
move
{
tokio
::
spawn
(
async
move
{
// await both tasks
// await both tasks
let
(
reader
,
writer
)
=
tokio
::
join!
(
reader_task
,
writer_task
);
let
(
reader
,
writer
)
=
tokio
::
join!
(
reader_task
,
writer_task
);
...
@@ -166,9 +168,29 @@ impl TcpClient {
...
@@ -166,9 +168,29 @@ impl TcpClient {
Ok
(())
Ok
(())
}
}
_
=>
{
(
Err
(
reader_err
),
Ok
(
_
))
=>
{
tracing
::
error!
(
"failed to join reader and writer tasks"
);
tracing
::
error!
(
anyhow
::
bail!
(
"failed to join reader and writer tasks"
);
"reader task failed to join (peer_port: {peer_port:?}, subject: {subject}): {reader_err:?}"
);
anyhow
::
bail!
(
"reader task failed to join (peer_port: {peer_port:?}, subject: {subject}): {reader_err:?}"
);
}
(
Ok
(
_
),
Err
(
writer_err
))
=>
{
tracing
::
error!
(
"writer task failed to join (peer_port: {peer_port:?}, subject: {subject}): {writer_err:?}"
);
anyhow
::
bail!
(
"writer task failed to join (peer_port: {peer_port:?}, subject: {subject}): {writer_err:?}"
);
}
(
Err
(
reader_err
),
Err
(
writer_err
))
=>
{
tracing
::
error!
(
"both reader and writer tasks failed to join (peer_port: {peer_port:?}, subject: {subject}) - reader: {reader_err:?}, writer: {writer_err:?}"
);
anyhow
::
bail!
(
"both reader and writer tasks failed to join (peer_port: {peer_port:?}, subject: {subject}) - reader: {reader_err:?}, writer: {writer_err:?}"
);
}
}
}
}
});
});
...
@@ -227,10 +249,10 @@ async fn handle_reader(
...
@@ -227,10 +249,10 @@ async fn handle_reader(
}
}
}
}
}
}
Some
(
Err
(
_
))
=>
{
Some
(
Err
(
e
))
=>
{
// TODO(#171) - address fatal errors
// TODO(#171) - address fatal errors
// in this case the binary representation of the message is invalid
// in this case the binary representation of the message is invalid
panic!
(
"fatal error - failed to decode message from stream; invalid line protocol"
);
panic!
(
"fatal error - failed to decode message from stream; invalid line protocol
: {e:?}
"
);
}
}
None
=>
{
None
=>
{
tracing
::
debug!
(
"tcp stream closed by server"
);
tracing
::
debug!
(
"tcp stream closed by server"
);
...
...
Write
Preview
Markdown
is supported
0%
Try again
or
attach a new file
.
Attach a file
Cancel
You are about to add
0
people
to the discussion. Proceed with caution.
Finish editing this message first!
Cancel
Please
register
or
sign in
to comment