Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

remove new_timestamp fn , time module, reworked plugin storage … #1188

Merged
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions plugins/zenoh-plugin-storage-manager/src/replica/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -112,11 +112,10 @@ impl Replica {
};

// Zid of session for generating timestamps
let zid = session.zid();

let replica = Replica {
name: name.to_string(),
session,
session: session.clone(),
key_expr: storage_config.key_expr.clone(),
replica_config: storage_config.replica_config.clone().unwrap(),
digests_published: RwLock::new(HashSet::new()),
Expand All @@ -131,7 +130,8 @@ impl Replica {

let config = replica.replica_config.clone();
// snapshotter
let snapshotter = Arc::new(Snapshotter::new(zid, rx_log, &startup_entries, &config).await);
let snapshotter =
Arc::new(Snapshotter::new(session, rx_log, &startup_entries, &config).await);
// digest sub
let digest_sub = replica.start_digest_sub(tx_digest).fuse();
// queryable for alignment
Expand Down
22 changes: 12 additions & 10 deletions plugins/zenoh-plugin-storage-manager/src/replica/snapshotter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ use async_std::{
};
use flume::Receiver;
use futures::join;
use zenoh::{key_expr::OwnedKeyExpr, session::ZenohId, time::Timestamp};
use zenoh::{key_expr::OwnedKeyExpr, time::Timestamp, Session};
use zenoh_backend_traits::config::ReplicaConfig;

use super::{Digest, DigestConfig, LogEntry};

pub struct Snapshotter {
// session id for timestamp generation
id: ZenohId,
// session ref for timestamp generation
session: Arc<Session>,
// channel to get updates from the storage
storage_update: Receiver<(OwnedKeyExpr, Timestamp)>,
// configuration parameters of the replica
Expand All @@ -57,7 +57,7 @@ pub struct ReplicationInfo {
impl Snapshotter {
// Initialize the snapshot parameters, logs and digest
pub async fn new(
id: ZenohId,
session: Arc<Session>,
rx_sample: Receiver<(OwnedKeyExpr, Timestamp)>,
initial_entries: &Vec<(OwnedKeyExpr, Timestamp)>,
replica_config: &ReplicaConfig,
Expand All @@ -66,12 +66,12 @@ impl Snapshotter {
// from initial entries, populate the log - stable and volatile
// compute digest
let (last_snapshot_time, last_interval) = Snapshotter::compute_snapshot_params(
id,
session.clone(),
replica_config.propagation_delay,
replica_config.delta,
);
let snapshotter = Snapshotter {
id,
session,
storage_update: rx_sample,
replica_config: replica_config.clone(),
content: ReplicationInfo {
Expand Down Expand Up @@ -131,7 +131,7 @@ impl Snapshotter {
let mut last_snapshot_time = self.content.last_snapshot_time.write().await;
let mut last_interval = self.content.last_interval.write().await;
let (time, interval) = Snapshotter::compute_snapshot_params(
self.id,
self.session.clone(),
self.replica_config.propagation_delay,
self.replica_config.delta,
);
Expand All @@ -143,13 +143,15 @@ impl Snapshotter {
}
}

// TODO
// Compute latest snapshot time and latest interval with respect to the current time
pub fn compute_snapshot_params(
id: ZenohId,
session: Arc<Session>,
propagation_delay: Duration,
delta: Duration,
) -> (Timestamp, u64) {
let now = zenoh::time::new_timestamp(id);
let now = session.new_timestamp();

let latest_interval = (now
.get_time()
.to_system_time()
Expand Down Expand Up @@ -206,7 +208,7 @@ impl Snapshotter {

// Create digest from the stable log at startup
async fn initialize_digest(&self) {
let now = zenoh::time::new_timestamp(self.id);
let now = self.session.new_timestamp();
let replica_data = &self.content;
let log_locked = replica_data.stable_log.read().await;
let latest_interval = replica_data.last_interval.read().await;
Expand Down
6 changes: 3 additions & 3 deletions plugins/zenoh-plugin-storage-manager/src/replica/storage.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ use zenoh::{
query::{ConsolidationMode, QueryTarget},
sample::{Sample, SampleBuilder, SampleKind, TimestampBuilderTrait},
session::{Session, SessionDeclarations},
time::{new_timestamp, Timestamp, NTP64},
time::{Timestamp, NTP64},
Result as ZResult,
};
use zenoh_backend_traits::{
Expand Down Expand Up @@ -149,7 +149,7 @@ impl StorageService {
t.add_async(gc).await;

// get session id for timestamp generation
let zid = self.session.info().zid().await;
let session_timestamp = self.session.new_timestamp();

// subscribe on key_expr
let storage_sub = match self.session.declare_subscriber(&self.key_expr).await {
Expand Down Expand Up @@ -240,7 +240,7 @@ impl StorageService {
continue;
}
};
let timestamp = sample.timestamp().cloned().unwrap_or(new_timestamp(zid));
let timestamp = sample.timestamp().cloned().unwrap_or(session_timestamp);
let sample = SampleBuilder::from(sample).timestamp(timestamp).into();
self.process_sample(sample).await;
},
Expand Down
9 changes: 5 additions & 4 deletions zenoh-ext/src/querying_subscriber.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ use zenoh::{
query::{QueryConsolidation, QueryTarget, ReplyKeyExpr, Selector},
sample::{Locality, Sample, SampleBuilder, TimestampBuilderTrait},
session::{SessionDeclarations, SessionRef},
time::{new_timestamp, Timestamp},
time::Timestamp,
Error, Resolvable, Resolve, Result as ZResult,
};

Expand Down Expand Up @@ -621,7 +621,7 @@ where
/// # }
/// ```
pub struct FetchingSubscriber<'a, Handler> {
subscriber: Subscriber<'a, ()>,
pub(crate) subscriber: Subscriber<'a, ()>,
callback: Arc<dyn Fn(Sample) + Send + Sync + 'static>,
state: Arc<Mutex<InnerState>>,
handler: Handler,
Expand Down Expand Up @@ -654,7 +654,7 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
InputHandler: IntoHandler<'static, Sample, Handler = Handler> + Send,
TryIntoSample: ExtractSample + Send + Sync,
{
let zid = conf.session.zid();
let timestamp = conf.session.new_timestamp();
let state = Arc::new(Mutex::new(InnerState {
pending_fetches: 0,
merge_queue: MergeQueue::new(),
Expand All @@ -672,9 +672,10 @@ impl<'a, Handler> FetchingSubscriber<'a, Handler> {
tracing::trace!(
"Sample received while fetch in progress: push it to merge_queue"
);

// ensure the sample has a timestamp, thus it will always be sorted into the MergeQueue
// after any timestamped Sample possibly coming from a fetch reply.
let timestamp = s.timestamp().cloned().unwrap_or(new_timestamp(zid));
let timestamp = s.timestamp().cloned().unwrap_or(timestamp);
state
.merge_queue
.push(SampleBuilder::from(s).timestamp(timestamp).into());
Expand Down
1 change: 0 additions & 1 deletion zenoh/src/api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,5 +35,4 @@ pub(crate) mod scouting;
pub(crate) mod selector;
pub(crate) mod session;
pub(crate) mod subscriber;
pub(crate) mod time;
pub(crate) mod value;
24 changes: 0 additions & 24 deletions zenoh/src/api/time.rs

This file was deleted.

2 changes: 0 additions & 2 deletions zenoh/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -353,8 +353,6 @@ pub mod liveliness {
/// Timestamp support
pub mod time {
pub use zenoh_protocol::core::{Timestamp, TimestampId, NTP64};

pub use crate::api::time::new_timestamp;
}

/// Configuration to pass to [`open`] and [`scout`] functions and associated constants
Expand Down
Loading