Do not trigger transport error in case of SHM buffer invalidation #1245

Merged
7 changes: 6 additions & 1 deletion io/zenoh-transport/src/multicast/rx.rs
@@ -13,6 +13,8 @@
//
use std::sync::MutexGuard;

#[cfg(feature = "shared-memory")]
use tracing::error;
use zenoh_core::{zlock, zread};
use zenoh_protocol::{
core::{Locator, Priority, Reliability},
@@ -44,7 +46,10 @@ impl TransportMulticastInner {
#[cfg(feature = "shared-memory")]
{
if self.manager.config.multicast.is_shm {
crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr)?;
if let Err(e) = crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr) {
error!("Error receiving SHM buffer: {e}");
return Ok(());
}
}
}

3 changes: 1 addition & 2 deletions io/zenoh-transport/src/shm.rs
@@ -334,8 +334,7 @@ pub fn map_zslice_to_shmbuf(zslice: &mut ZSlice, shmr: &ShmReader) -> ZResult<()
let smb = shmr.read_shmbuf(&shmbinfo)?;
Member

I'm not sure about this solution. If we do that, the only result is that the shm_buf is remapped and propagated through the system anyway; the user will eventually receive an empty message.
A better approach would be to let the caller of this function properly handle the failure case.
In practical terms, on failure we would not call the routing callback and would just drop the message without closing the link. Opinions?
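(A minimal, self-contained sketch of what this suggestion amounts to; the `remap` and `callback` closures below are stand-ins for `map_zmsg_to_shmbuf` and the routing callback, not the actual zenoh-transport API:)

```rust
// Sketch of the suggested behaviour: if remapping the SHM buffer fails,
// log the error and drop the message instead of returning the error,
// which would be treated as a transport failure and close the link.
fn deliver<M>(
    remap: impl FnOnce(&mut M) -> Result<(), String>, // stands in for map_zmsg_to_shmbuf
    callback: impl FnOnce(M),                          // stands in for the routing callback
    mut msg: M,
) -> Result<(), String> {
    if let Err(e) = remap(&mut msg) {
        eprintln!("Error receiving SHM buffer: {e}"); // log and drop the message
        return Ok(()); // do NOT bubble the error up: the link stays open
    }
    callback(msg); // the routing callback only sees successfully remapped messages
    Ok(())
}

fn main() {
    // A remap failure: the message is silently dropped and no error is returned.
    let res = deliver(
        |_m: &mut Vec<u8>| Err("SHM segment invalidated".to_string()),
        |_m| println!("delivered"),
        vec![1u8, 2, 3],
    );
    assert!(res.is_ok());
}
```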

Contributor Author

I thought about dropping the whole message, but the problem is that a ZBuf might contain many ZSlices and not necessarily all of them will be invalidated. What should we do in this case? Drop the whole ZBuf and potentially lose a lot of data? I don't think that is a good option.
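(A toy model of this dilemma, using hypothetical types rather than zenoh's ZBuf/ZSlice API: a buffer is a sequence of slices and any single slice can fail to remap on its own, so either the buffer is "punctured" or the still-valid slices are thrown away with it:)

```rust
// Remap each slice of a buffer independently; a failed slice leaves a hole.
fn remap_buffer(
    slices: Vec<Vec<u8>>,
    remap: impl Fn(Vec<u8>) -> Option<Vec<u8>>, // None = the SHM segment was invalidated
) -> (Vec<Vec<u8>>, usize) {
    let mut out = Vec::new();
    let mut failed = 0;
    for s in slices {
        match remap(s) {
            Some(mapped) => out.push(mapped),
            None => {
                failed += 1;
                out.push(Vec::new()); // the "puncture": the slice is replaced by nothing
            }
        }
    }
    (out, failed)
}

fn main() {
    // Two of three slices remap fine: dropping the whole buffer loses them,
    // while keeping it hands the user a punctured buffer they cannot trust.
    let slices = vec![vec![1u8], vec![2], vec![3]];
    let (out, failed) = remap_buffer(slices, |s| if s == [2] { None } else { Some(s) });
    assert_eq!(failed, 1);
    assert_eq!(out, vec![vec![1u8], Vec::new(), vec![3]]);
}
```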

Contributor Author

Such "punctured" ZBuf can also be a bad thing because it can be completely broken from user's perspective, but it is way better to give user decide by exposing this buffer out instead of implicitly dropping it

Member

My point is that we may be a router and propagate an already invalid message across different hops until it eventually reaches the user application. Having a punctured ZBuf is not a good option either, because we don't explicitly signal it to the user, and we are not safe from the case where a punctured ZBuf is still a valid ZBuf, meaning we would be replacing the user's content with something that happens to be valid by chance.

Because of the above, I believe it is still safer and clearer from a behavioural point of view (principle of least surprise) to drop the message and potentially log it.

Contributor Author

Okay, I will replace this behavior!

Contributor Author

Done!


// Replace the content of the slice
let zs: ZSlice = smb.into();
*zslice = zs;
*zslice = smb.into();

Ok(())
}
7 changes: 6 additions & 1 deletion io/zenoh-transport/src/unicast/lowlatency/rx.rs
@@ -11,6 +11,8 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
#[cfg(feature = "shared-memory")]
use tracing::error;
use zenoh_buffers::{
reader::{HasReader, Reader},
ZSlice,
@@ -37,7 +39,10 @@ impl TransportUnicastLowlatency {
#[cfg(feature = "shared-memory")]
{
if self.config.shm.is_some() {
crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr)?;
if let Err(e) = crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr) {
error!("Error receiving SHM buffer: {e}");
return Ok(());
}
}
}
callback.handle_message(msg)
7 changes: 6 additions & 1 deletion io/zenoh-transport/src/unicast/universal/rx.rs
@@ -13,6 +13,8 @@
//
use std::sync::MutexGuard;

#[cfg(feature = "shared-memory")]
use tracing::error;
use zenoh_core::{zlock, zread};
use zenoh_link::Link;
use zenoh_protocol::{
@@ -45,7 +47,10 @@ impl TransportUnicastUniversal {
#[cfg(feature = "shared-memory")]
{
if self.config.shm.is_some() {
crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr)?;
if let Err(e) = crate::shm::map_zmsg_to_shmbuf(&mut msg, &self.manager.shmr) {
error!("Error receiving SHM buffer: {e}");
return Ok(());
}
}
}
callback.handle_message(msg)