Skip to content

Commit 6e399de

Browse files
chore(pool): use spawn macro for non-crucial tasks
1 parent 058b315 commit 6e399de

File tree

4 files changed

+283
-240
lines changed

4 files changed

+283
-240
lines changed

core/pool/src/connection.rs

+9-9
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@ use anyhow::Result;
44
use bytes::{Buf, Bytes};
55
use futures::{SinkExt, StreamExt};
66
use lightning_interfaces::types::NodeIndex;
7-
use lightning_interfaces::{RequestHeader, ServiceScope};
7+
use lightning_interfaces::{spawn, RequestHeader, ServiceScope};
88
use tokio::io::{AsyncReadExt, AsyncWriteExt};
99
use tokio::sync::mpsc::{Receiver, Sender};
1010
use tokio::sync::oneshot;
@@ -57,7 +57,7 @@ pub async fn connection_loop<C: ConnectionInterface>(mut ctx: Context<C>) -> Res
5757
};
5858
let connection_event_tx = ctx.connection_event_tx.clone();
5959
let peer = ctx.peer;
60-
tokio::spawn(async move {
60+
spawn!(async move {
6161
if let Err(e) =
6262
handle_incoming_bi_stream::<C>(
6363
peer,
@@ -69,7 +69,7 @@ pub async fn connection_loop<C: ConnectionInterface>(mut ctx: Context<C>) -> Res
6969
"failed to handle incoming bi-stream with peer with index {peer}: {e:?}"
7070
);
7171
}
72-
});
72+
}, "POOL: handle incoming bi stream");
7373
}
7474
accept_result = connection.accept_uni_stream() => {
7575
let stream_rx = match accept_result {
@@ -80,7 +80,7 @@ pub async fn connection_loop<C: ConnectionInterface>(mut ctx: Context<C>) -> Res
8080
};
8181
let connection_event_tx = ctx.connection_event_tx.clone();
8282
let peer = ctx.peer;
83-
tokio::spawn(async move {
83+
spawn!(async move {
8484
if let Err(e) =
8585
handle_incoming_uni_stream::<C>(
8686
peer,
@@ -92,7 +92,7 @@ pub async fn connection_loop<C: ConnectionInterface>(mut ctx: Context<C>) -> Res
9292
"failed to handle incoming uni-stream from peer with index {peer}: {e:?}"
9393
);
9494
}
95-
});
95+
}, "POOL: handle incoming uni stream");
9696
}
9797
request = ctx.service_request_rx.recv() => {
9898
match request {
@@ -101,20 +101,20 @@ pub async fn connection_loop<C: ConnectionInterface>(mut ctx: Context<C>) -> Res
101101
// We need to create a new stream on the connection.
102102
let connection = ctx.connection.clone();
103103
let peer = ctx.peer;
104-
tokio::spawn(async move{
104+
spawn!(async move{
105105
if let Err(e) = send_message(connection, message).await {
106106
tracing::error!(
107107
"failed to send message to peer with index {peer}: {e:?}"
108108
);
109109
}
110-
});
110+
}, "POOL: send message");
111111
},
112112
Some(Request::SendReqResp { service, request, respond }) => {
113113
tracing::trace!("handling new outgoing request");
114114
// We need to create a new stream on the connection for the channel.
115115
let connection = ctx.connection.clone();
116116
let peer = ctx.peer;
117-
tokio::spawn(async move {
117+
spawn!(async move {
118118
if let Err(e) = send_request(
119119
connection,
120120
service,
@@ -125,7 +125,7 @@ pub async fn connection_loop<C: ConnectionInterface>(mut ctx: Context<C>) -> Res
125125
"there was an error when sending request to {peer}: {e:?}"
126126
);
127127
}
128-
});
128+
}, "POOL: send request");
129129
}
130130
Some(Request::Stats { respond }) => {
131131
tracing::debug!("handling new stats request");

0 commit comments

Comments
 (0)