Skip to content

Commit dc26565

Browse files
committed
boxed the receiver
1 parent ffd5758 commit dc26565

File tree

6 files changed

+27
-23
lines changed

6 files changed

+27
-23
lines changed

crates/fluvio-cli/src/client/consume/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -394,7 +394,7 @@ mod cmd {
394394
let mut stream = fluvio
395395
.consumer_with_config(consume_config)
396396
.await?
397-
.take_until(stop_signal.recv());
397+
.take_until(stop_signal.recv().boxed());
398398
self.consume_records_stream(&mut stream, tableformat)
399399
.await?;
400400

crates/fluvio-sc/src/controllers/mirroring/controller.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,7 @@ impl<C: MetadataItem> RemoteMirrorController<C> {
7171
if let Some((home, _)) = self.get_mirror_home_cluster().await {
7272
info!(home = %home.id, "connected to home cluster");
7373
let home_config = self.build_home_client(&home).await?;
74-
let mut stream = self.request_stream(&home, home_config).await?.boxed_local();
74+
let mut stream = self.request_stream(&home, home_config).await?.boxed();
7575

7676
info!("created request stream");
7777

crates/fluvio/src/admin.rs

+16-14
Original file line numberDiff line numberDiff line change
@@ -349,21 +349,23 @@ impl FluvioAdmin {
349349
debug!(api_version = req_msg.header.api_version(), obj = %S::LABEL, "create watch stream");
350350
let inner_socket = self.socket.new_socket();
351351
let stream = inner_socket.create_stream(req_msg, 10).await?;
352-
Ok(stream.map(|respons_result| match respons_result {
353-
Ok(response) => {
354-
let watch_response = response.downcast().map_err(|err| {
355-
IoError::new(ErrorKind::Other, format!("downcast error: {:#?}", err))
356-
})?;
357-
watch_response.ok_or(IoError::new(
352+
Ok(stream
353+
.map(|respons_result| match respons_result {
354+
Ok(response) => {
355+
let watch_response = response.downcast().map_err(|err| {
356+
IoError::new(ErrorKind::Other, format!("downcast error: {:#?}", err))
357+
})?;
358+
watch_response.ok_or(IoError::new(
359+
ErrorKind::Other,
360+
format!("cannot decoded as {s}", s = S::LABEL),
361+
))
362+
}
363+
Err(err) => Err(IoError::new(
358364
ErrorKind::Other,
359-
format!("cannot decoded as {s}", s = S::LABEL),
360-
))
361-
}
362-
Err(err) => Err(IoError::new(
363-
ErrorKind::Other,
364-
format!("socket error {err}"),
365-
)),
366-
}))
365+
format!("socket error {err}"),
366+
)),
367+
})
368+
.boxed())
367369
}
368370
}
369371

crates/fluvio/src/consumer/mod.rs

+5-4
Original file line numberDiff line numberDiff line change
@@ -384,10 +384,11 @@ where
384384
warn!("SPU does not support Offset Management API");
385385
}
386386

387-
let mut stream = StreamExt::boxed(self
388-
.pool
389-
.create_stream_with_version(&replica, stream_request, stream_fetch_version)
390-
.await?);
387+
let mut stream = StreamExt::boxed(
388+
self.pool
389+
.create_stream_with_version(&replica, stream_request, stream_fetch_version)
390+
.await?,
391+
);
391392

392393
let (server_sender, server_recv) =
393394
async_channel::bounded::<StreamToServer>(STREAM_TO_SERVER_CHANNEL_SIZE);

crates/fluvio/src/consumer/stream.rs

+2-1
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,8 @@ impl<T: Stream<Item = Result<Record, ErrorCode>> + Unpin> Stream
110110
}
111111
}
112112

113-
impl<T> ConsumerStream for futures_util::stream::TakeUntil<T, async_channel::Recv<'_, ()>>
113+
impl<T> ConsumerStream
114+
for futures_util::stream::TakeUntil<T, BoxFuture<'_, async_channel::Recv<'_, ()>>>
114115
where
115116
T: ConsumerStream,
116117
{

crates/fluvio/src/sync/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -168,7 +168,7 @@ mod context {
168168
mod unstable {
169169
use super::*;
170170
use crate::metadata::store::MetadataChanges;
171-
use futures_util::Stream;
171+
use futures_util::{Stream, StreamExt};
172172

173173
impl<S> StoreContext<S>
174174
where
@@ -193,7 +193,7 @@ mod context {
193193
}
194194
});
195195

196-
receiver
196+
receiver.boxed()
197197
}
198198
}
199199
}

0 commit comments

Comments
 (0)