Skip to content

Commit e064b8d

Browse files
committed
ops: Classify std::io::ErrorKind::BrokenPipe as a retryable transient error
Signed-off-by: Joshua Potts <[email protected]>
1 parent f7ae3e6 commit e064b8d

File tree

2 files changed

+61
-48
lines changed

2 files changed

+61
-48
lines changed

src/execute.rs

+4-12
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,7 @@ use tonic::transport::Channel;
4141
use triomphe::Arc;
4242

4343
use crate::client::NetworkData;
44-
use crate::execute::error::{
45-
is_hyper_canceled,
46-
is_tonic_status_transient,
47-
};
44+
use crate::execute::error::is_tonic_status_transient;
4845
use crate::ping_query::PingQuery;
4946
use crate::{
5047
client,
@@ -334,13 +331,6 @@ fn map_tonic_error(
334331
retry::Error::Transient(status.into())
335332
}
336333

337-
// if the proxy cancels the request (IE it's `Unavailable`/`ResourceExausted`) treat it like a transient error.
338-
tonic::Code::Unknown if is_hyper_canceled(&status) => {
339-
network.mark_node_unhealthy(node_index);
340-
341-
retry::Error::Transient(status.into())
342-
}
343-
344334
// todo: find a way to make this less fragile
345335
// hack:
346336
// if this happens:
@@ -360,7 +350,9 @@ fn map_tonic_error(
360350
}
361351
}
362352

363-
tonic::Code::Internal if is_tonic_status_transient(&status) => {
353+
_ if is_tonic_status_transient(&status) => {
354+
network.mark_node_unhealthy(node_index);
355+
364356
retry::Error::Transient(status.into())
365357
}
366358

src/execute/error.rs

+57-36
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,56 @@ use std::error::Error;
2121

2222
use serde::de::StdError;
2323

24-
/// Punches through all the layers of `tonic::Status` sources to check if this is a `hyper::Error` that is canceled.
25-
pub(super) fn is_hyper_canceled(status: &tonic::Status) -> bool {
24+
fn has_transient_io_error<E: StdError>(error: E) -> bool {
25+
let Some(source) = error.source() else {
26+
return false;
27+
};
28+
29+
if let Some(io_error) = source.downcast_ref::<std::io::Error>() {
30+
is_io_error_transient(io_error)
31+
} else {
32+
false
33+
}
34+
}
35+
36+
// tonic 0.11 (current dependency)
37+
fn is_hyper_0_error_transient(error: &hyper_0::Error) -> bool {
38+
if error.is_canceled() || has_transient_io_error(error) {
39+
true
40+
} else if let Some(source) = error.source() {
41+
if let Some(h2_error) = source.downcast_ref::<h2_03::Error>() {
42+
h2_error.is_go_away()
43+
} else {
44+
false
45+
}
46+
} else {
47+
false
48+
}
49+
}
50+
51+
// tonic 0.12
52+
fn is_hyper_error_transient(error: &hyper::Error) -> bool {
53+
if error.is_canceled() || has_transient_io_error(error) {
54+
true
55+
} else if let Some(source) = error.source() {
56+
if let Some(h2_error) = source.downcast_ref::<h2::Error>() {
57+
h2_error.is_go_away()
58+
} else {
59+
false
60+
}
61+
} else {
62+
false
63+
}
64+
}
65+
66+
fn is_io_error_transient(error: &std::io::Error) -> bool {
67+
match error.kind() {
68+
std::io::ErrorKind::BrokenPipe => true,
69+
_ => false,
70+
}
71+
}
72+
73+
pub(super) fn is_tonic_status_transient(status: &tonic::Status) -> bool {
2674
let source = status
2775
.source()
2876
.and_then(|it| it.downcast_ref::<tonic::transport::Error>())
@@ -33,11 +81,9 @@ pub(super) fn is_hyper_canceled(status: &tonic::Status) -> bool {
3381
};
3482

3583
if let Some(hyper_0) = source.downcast_ref::<hyper_0::Error>() {
36-
// tonic 0.11 (current dependency)
37-
hyper_0.is_canceled()
38-
} else if let Some(hyper_1) = source.downcast_ref::<hyper::Error>() {
39-
// tonic 0.12
40-
hyper_1.is_canceled()
84+
is_hyper_0_error_transient(hyper_0)
85+
} else if let Some(hyper) = source.downcast_ref::<hyper::Error>() {
86+
is_hyper_error_transient(hyper)
4187
} else {
4288
false
4389
}
@@ -48,47 +94,22 @@ pub(super) fn is_hyper_canceled(status: &tonic::Status) -> bool {
4894
/// Because hyper does not expose constructors for its error variants, there is no
4995
/// reasonable way to construct a test for positive detection of a hyper cancellation.
5096
#[cfg(test)]
51-
mod test_is_hyper_canceled {
97+
mod test_is_tonic_status_transient {
5298
use tonic::Code;
5399

54-
use super::is_hyper_canceled;
100+
use super::is_tonic_status_transient;
55101

56102
#[test]
57103
fn ignores_tonic_abort() {
58104
let input = tonic::Status::new(Code::Aborted, "foo");
59105

60-
assert!(!is_hyper_canceled(&input));
106+
assert!(!is_tonic_status_transient(&input));
61107
}
62108

63109
#[test]
64110
fn ignores_tonic_cancel() {
65111
let input = tonic::Status::new(Code::Cancelled, "foo");
66112

67-
assert!(!is_hyper_canceled(&input));
68-
}
69-
}
70-
71-
pub(super) fn is_tonic_status_transient(status: &tonic::Status) -> bool {
72-
let source = status
73-
.source()
74-
.and_then(|it| it.downcast_ref::<tonic::transport::Error>())
75-
.and_then(StdError::source);
76-
77-
let Some(source) = source else {
78-
return false;
79-
};
80-
81-
if let Some(hyper_0) = source.downcast_ref::<hyper_0::Error>() {
82-
// tonic 0.11 (current dependency)
83-
let source: Option<&h2_03::Error> = hyper_0.source().and_then(|s| s.downcast_ref());
84-
85-
source.map(|s| s.is_go_away()).unwrap_or_default()
86-
} else if let Some(hyper_1) = source.downcast_ref::<hyper::Error>() {
87-
// tonic 0.12
88-
let source: Option<&h2::Error> = hyper_1.source().and_then(|s| s.downcast_ref());
89-
90-
source.map(|s| s.is_go_away()).unwrap_or_default()
91-
} else {
92-
false
113+
assert!(!is_tonic_status_transient(&input));
93114
}
94115
}

0 commit comments

Comments
 (0)