From 26661a23a260c0cd3015b7b12288e0a019382368 Mon Sep 17 00:00:00 2001 From: Zhixing Zhang Date: Wed, 11 Nov 2020 14:08:33 -0500 Subject: [PATCH 1/2] Use Unpin for Queue --- src/queue.rs | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git a/src/queue.rs b/src/queue.rs index 20f2878..7d565b9 100644 --- a/src/queue.rs +++ b/src/queue.rs @@ -4,18 +4,18 @@ use crate::shim::*; use crate::units::*; use crate::isr::*; -unsafe impl Send for Queue {} -unsafe impl Sync for Queue {} +unsafe impl Send for Queue {} +unsafe impl Sync for Queue {} /// A queue with a finite size. The items are owned by the queue and are /// copied. #[derive(Debug)] -pub struct Queue { +pub struct Queue { queue: FreeRtosQueueHandle, item_type: PhantomData, } -impl Queue { +impl Queue { pub fn new(max_size: usize) -> Result, FreeRtosError> { let item_size = mem::size_of::(); @@ -40,6 +40,7 @@ impl Queue { max_wait.to_ticks()) != 0 { Err(FreeRtosError::QueueSendTimeout) } else { + core::mem::forget(item); Ok(()) } } @@ -77,7 +78,7 @@ impl Queue { } } -impl Drop for Queue { +impl Drop for Queue { fn drop(&mut self) { unsafe { freertos_rs_queue_delete(self.queue); From 650794cdc02988c06d9072e3ceed8ff8d9d874bd Mon Sep 17 00:00:00 2001 From: Zhixing Zhang Date: Wed, 11 Nov 2020 14:25:05 -0500 Subject: [PATCH 2/2] Modify trait bounds for processor and pub_sub --- src/patterns/processor.rs | 37 ++++++++++++++++++------------------- src/patterns/pub_sub.rs | 20 ++++++++++---------- 2 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/patterns/processor.rs b/src/patterns/processor.rs index fb85c9a..45bb287 100644 --- a/src/patterns/processor.rs +++ b/src/patterns/processor.rs @@ -12,13 +12,12 @@ pub trait ReplyableMessage { fn reply_to_client_id(&self) -> Option; } -#[derive(Copy, Clone)] -pub struct InputMessage where I: Copy { +pub struct InputMessage where I: Unpin { val: I, reply_to_client_id: Option } -impl InputMessage where I: Copy { +impl InputMessage where I: Unpin { pub fn request(val: I) -> Self { InputMessage { val: val, reply_to_client_id: None } } @@ -27,23 +26,23 @@ impl InputMessage where I: Copy { InputMessage { val: val, reply_to_client_id: Some(client_id) } } - pub fn get_val(&self) -> I { - self.val + pub fn get_val(&self) -> &I { + &self.val } } -impl ReplyableMessage for InputMessage where I: Copy { +impl ReplyableMessage for InputMessage where I: Unpin { fn reply_to_client_id(&self) -> Option { self.reply_to_client_id } } -pub struct Processor where I: ReplyableMessage + Copy, O: Copy { +pub struct Processor where I: ReplyableMessage + Unpin, O: Unpin { queue: Arc>, inner: Arc>>, } -impl Processor where I: ReplyableMessage + Copy, O: Copy { +impl Processor where I: ReplyableMessage + Unpin, O: Unpin { pub fn new(queue_size: usize) -> Result { let p = ProcessorInner { clients: Vec::new(), @@ -114,18 +113,18 @@ impl Processor where I: ReplyableMessage + Copy, O: Copy { } } -impl Processor, O> where I: Copy, O: Copy { +impl Processor, O> where I: Unpin, O: Unpin { pub fn reply_val(&self, received_message: InputMessage, reply: O, max_wait: D) -> Result { self.reply(received_message, reply, max_wait) } } -struct ProcessorInner where O: Copy { +struct ProcessorInner where O: Unpin { clients: Vec<(usize, Weak>)>, next_client_id: usize } -impl ProcessorInner where O: Copy { +impl ProcessorInner where O: Unpin { fn remove_client_reply(&mut self, client: &ClientWithReplyQueue) { self.clients.retain(|ref x| x.0 != client.id) } @@ -133,12 +132,12 @@ impl ProcessorInner where O: Copy { -pub struct ProcessorClient where I: ReplyableMessage + Copy { +pub struct ProcessorClient where I: ReplyableMessage + Unpin { processor_queue: Weak>, client_reply: C } -impl ProcessorClient where I: ReplyableMessage + Copy { +impl ProcessorClient where I: ReplyableMessage + Unpin { pub fn send(&self, message: I, max_wait: D) -> Result<(), FreeRtosError> { let processor_queue = self.processor_queue.upgrade().ok_or(FreeRtosError::ProcessorHasShutDown)?; processor_queue.send(message, max_wait)?; @@ -151,7 +150,7 @@ impl ProcessorClient where I: ReplyableMessage + Copy { } } -impl ProcessorClient, ()> where I: Copy { +impl ProcessorClient, ()> where I: Unpin { pub fn send_val(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> { self.send(InputMessage::request(val), max_wait) } @@ -161,7 +160,7 @@ impl ProcessorClient, ()> where I: Copy { } } -impl ProcessorClient> where I: ReplyableMessage + Copy, O: Copy { +impl ProcessorClient> where I: ReplyableMessage + Unpin, O: Unpin { pub fn call(&self, message: I, max_wait: D) -> Result { self.send(message, max_wait)?; self.client_reply.receive_queue.receive(max_wait) @@ -172,7 +171,7 @@ impl ProcessorClient> where I: ReplyableM } } -impl ProcessorClient, SharedClientWithReplyQueue> where I: Copy, O: Copy { +impl ProcessorClient, SharedClientWithReplyQueue> where I: Unpin, O: Unpin { pub fn send_val(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> { self.send(InputMessage::request(val), max_wait) } @@ -183,7 +182,7 @@ impl ProcessorClient, SharedClientWithReplyQueue> where } } -impl Clone for ProcessorClient where I: ReplyableMessage + Copy, C: Clone { +impl Clone for ProcessorClient where I: ReplyableMessage + Unpin, C: Clone { fn clone(&self) -> Self { ProcessorClient { processor_queue: self.processor_queue.clone(), @@ -194,13 +193,13 @@ impl Clone for ProcessorClient where I: ReplyableMessage + Copy, C: -pub struct ClientWithReplyQueue where O: Copy { +pub struct ClientWithReplyQueue where O: Unpin { id: usize, processor_inner: Arc>>, receive_queue: Queue } -impl Drop for ClientWithReplyQueue where O: Copy { +impl Drop for ClientWithReplyQueue where O: Unpin { fn drop(&mut self) { if let Ok(mut p) = self.processor_inner.lock(Duration::ms(1000)) { p.remove_client_reply(&self); diff --git a/src/patterns/pub_sub.rs b/src/patterns/pub_sub.rs index bd81ef9..c134eef 100644 --- a/src/patterns/pub_sub.rs +++ b/src/patterns/pub_sub.rs @@ -6,16 +6,16 @@ use crate::units::*; /// A pub-sub queue. An item sent to the publisher is sent to every subscriber. -pub struct QueuePublisher { +pub struct QueuePublisher { inner: Arc>>, } /// A subscribtion to the publisher. -pub struct QueueSubscriber { +pub struct QueueSubscriber { inner: Arc>, } -impl QueuePublisher { +impl QueuePublisher { /// Create a new publisher pub fn new() -> Result, FreeRtosError> { let inner = PublisherInner { @@ -33,7 +33,7 @@ impl QueuePublisher { if let Ok(m) = self.inner.lock(max_wait) { for subscriber in &m.subscribers { - if let Ok(_) = subscriber.queue.send(item, max_wait) { + if let Ok(_) = subscriber.queue.send(item.clone(), max_wait) { sent_to += 1; } } @@ -67,13 +67,13 @@ impl QueuePublisher { } } -impl Clone for QueuePublisher { +impl Clone for QueuePublisher { fn clone(&self) -> Self { QueuePublisher { inner: self.inner.clone() } } } -impl Drop for QueueSubscriber { +impl Drop for QueueSubscriber { fn drop(&mut self) { if let Ok(mut l) = self.inner.publisher.lock(Duration::infinite()) { l.unsubscribe(&self.inner); @@ -82,25 +82,25 @@ impl Drop for QueueSubscriber { } -impl QueueSubscriber { +impl QueueSubscriber { /// Wait for an item to be posted from the publisher. pub fn receive(&self, max_wait: D) -> Result { self.inner.queue.receive(max_wait) } } -struct PublisherInner { +struct PublisherInner { subscribers: Vec>>, queue_next_id: usize, } -impl PublisherInner { +impl PublisherInner { fn unsubscribe(&mut self, subscriber: &SubscriberInner) { self.subscribers.retain(|ref x| x.id != subscriber.id); } } -struct SubscriberInner { +struct SubscriberInner { id: usize, queue: Queue, publisher: Arc>>,