Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Change the trait bound for Queue elements from Copy to Unpin #33

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 18 additions & 19 deletions src/patterns/processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,12 @@ pub trait ReplyableMessage {
fn reply_to_client_id(&self) -> Option<usize>;
}

#[derive(Copy, Clone)]
pub struct InputMessage<I> where I: Copy {
pub struct InputMessage<I> where I: Unpin {
val: I,
reply_to_client_id: Option<usize>
}

impl<I> InputMessage<I> where I: Copy {
impl<I> InputMessage<I> where I: Unpin {
pub fn request(val: I) -> Self {
InputMessage { val: val, reply_to_client_id: None }
}
Expand All @@ -27,23 +26,23 @@ impl<I> InputMessage<I> where I: Copy {
InputMessage { val: val, reply_to_client_id: Some(client_id) }
}

pub fn get_val(&self) -> I {
self.val
pub fn get_val(&self) -> &I {
&self.val
}
}

impl<I> ReplyableMessage for InputMessage<I> where I: Copy {
impl<I> ReplyableMessage for InputMessage<I> where I: Unpin {
fn reply_to_client_id(&self) -> Option<usize> {
self.reply_to_client_id
}
}

pub struct Processor<I, O> where I: ReplyableMessage + Copy, O: Copy {
pub struct Processor<I, O> where I: ReplyableMessage + Unpin, O: Unpin {
queue: Arc<Queue<I>>,
inner: Arc<Mutex<ProcessorInner<O>>>,
}

impl<I, O> Processor<I, O> where I: ReplyableMessage + Copy, O: Copy {
impl<I, O> Processor<I, O> where I: ReplyableMessage + Unpin, O: Unpin {
pub fn new(queue_size: usize) -> Result<Self, FreeRtosError> {
let p = ProcessorInner {
clients: Vec::new(),
Expand Down Expand Up @@ -114,31 +113,31 @@ impl<I, O> Processor<I, O> where I: ReplyableMessage + Copy, O: Copy {
}
}

impl<I, O> Processor<InputMessage<I>, O> where I: Copy, O: Copy {
impl<I, O> Processor<InputMessage<I>, O> where I: Unpin, O: Unpin {
pub fn reply_val<D: DurationTicks>(&self, received_message: InputMessage<I>, reply: O, max_wait: D) -> Result<bool, FreeRtosError> {
self.reply(received_message, reply, max_wait)
}
}

struct ProcessorInner<O> where O: Copy {
struct ProcessorInner<O> where O: Unpin {
clients: Vec<(usize, Weak<ClientWithReplyQueue<O>>)>,
next_client_id: usize
}

impl<O> ProcessorInner<O> where O: Copy {
impl<O> ProcessorInner<O> where O: Unpin {
fn remove_client_reply(&mut self, client: &ClientWithReplyQueue<O>) {
self.clients.retain(|ref x| x.0 != client.id)
}
}



pub struct ProcessorClient<I, C> where I: ReplyableMessage + Copy {
pub struct ProcessorClient<I, C> where I: ReplyableMessage + Unpin {
processor_queue: Weak<Queue<I>>,
client_reply: C
}

impl<I, O> ProcessorClient<I, O> where I: ReplyableMessage + Copy {
impl<I, O> ProcessorClient<I, O> where I: ReplyableMessage + Unpin {
pub fn send<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<(), FreeRtosError> {
let processor_queue = self.processor_queue.upgrade().ok_or(FreeRtosError::ProcessorHasShutDown)?;
processor_queue.send(message, max_wait)?;
Expand All @@ -151,7 +150,7 @@ impl<I, O> ProcessorClient<I, O> where I: ReplyableMessage + Copy {
}
}

impl<I> ProcessorClient<InputMessage<I>, ()> where I: Copy {
impl<I> ProcessorClient<InputMessage<I>, ()> where I: Unpin {
pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
self.send(InputMessage::request(val), max_wait)
}
Expand All @@ -161,7 +160,7 @@ impl<I> ProcessorClient<InputMessage<I>, ()> where I: Copy {
}
}

impl<I, O> ProcessorClient<I, SharedClientWithReplyQueue<O>> where I: ReplyableMessage + Copy, O: Copy {
impl<I, O> ProcessorClient<I, SharedClientWithReplyQueue<O>> where I: ReplyableMessage + Unpin, O: Unpin {
pub fn call<D: DurationTicks>(&self, message: I, max_wait: D) -> Result<O, FreeRtosError> {
self.send(message, max_wait)?;
self.client_reply.receive_queue.receive(max_wait)
Expand All @@ -172,7 +171,7 @@ impl<I, O> ProcessorClient<I, SharedClientWithReplyQueue<O>> where I: ReplyableM
}
}

impl<I, O> ProcessorClient<InputMessage<I>, SharedClientWithReplyQueue<O>> where I: Copy, O: Copy {
impl<I, O> ProcessorClient<InputMessage<I>, SharedClientWithReplyQueue<O>> where I: Unpin, O: Unpin {
pub fn send_val<D: DurationTicks>(&self, val: I, max_wait: D) -> Result<(), FreeRtosError> {
self.send(InputMessage::request(val), max_wait)
}
Expand All @@ -183,7 +182,7 @@ impl<I, O> ProcessorClient<InputMessage<I>, SharedClientWithReplyQueue<O>> where
}
}

impl<I, C> Clone for ProcessorClient<I, C> where I: ReplyableMessage + Copy, C: Clone {
impl<I, C> Clone for ProcessorClient<I, C> where I: ReplyableMessage + Unpin, C: Clone {
fn clone(&self) -> Self {
ProcessorClient {
processor_queue: self.processor_queue.clone(),
Expand All @@ -194,13 +193,13 @@ impl<I, C> Clone for ProcessorClient<I, C> where I: ReplyableMessage + Copy, C:



pub struct ClientWithReplyQueue<O> where O: Copy {
pub struct ClientWithReplyQueue<O> where O: Unpin {
id: usize,
processor_inner: Arc<Mutex<ProcessorInner<O>>>,
receive_queue: Queue<O>
}

impl<O> Drop for ClientWithReplyQueue<O> where O: Copy {
impl<O> Drop for ClientWithReplyQueue<O> where O: Unpin {
fn drop(&mut self) {
if let Ok(mut p) = self.processor_inner.lock(Duration::ms(1000)) {
p.remove_client_reply(&self);
Expand Down
20 changes: 10 additions & 10 deletions src/patterns/pub_sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@ use crate::units::*;


/// A pub-sub queue. An item sent to the publisher is sent to every subscriber.
pub struct QueuePublisher<T: Sized + Copy> {
pub struct QueuePublisher<T: Sized + Unpin + Clone> {
inner: Arc<Mutex<PublisherInner<T>>>,
}

/// A subscribtion to the publisher.
pub struct QueueSubscriber<T: Sized + Copy> {
pub struct QueueSubscriber<T: Sized + Unpin + Clone> {
inner: Arc<SubscriberInner<T>>,
}

impl<T: Sized + Copy> QueuePublisher<T> {
impl<T: Sized + Unpin + Clone> QueuePublisher<T> {
/// Create a new publisher
pub fn new() -> Result<QueuePublisher<T>, FreeRtosError> {
let inner = PublisherInner {
Expand All @@ -33,7 +33,7 @@ impl<T: Sized + Copy> QueuePublisher<T> {

if let Ok(m) = self.inner.lock(max_wait) {
for subscriber in &m.subscribers {
if let Ok(_) = subscriber.queue.send(item, max_wait) {
if let Ok(_) = subscriber.queue.send(item.clone(), max_wait) {
sent_to += 1;
}
}
Expand Down Expand Up @@ -67,13 +67,13 @@ impl<T: Sized + Copy> QueuePublisher<T> {
}
}

impl<T: Sized + Copy> Clone for QueuePublisher<T> {
impl<T: Sized + Unpin + Clone> Clone for QueuePublisher<T> {
fn clone(&self) -> Self {
QueuePublisher { inner: self.inner.clone() }
}
}

impl<T: Sized + Copy> Drop for QueueSubscriber<T> {
impl<T: Sized + Unpin + Clone> Drop for QueueSubscriber<T> {
fn drop(&mut self) {
if let Ok(mut l) = self.inner.publisher.lock(Duration::infinite()) {
l.unsubscribe(&self.inner);
Expand All @@ -82,25 +82,25 @@ impl<T: Sized + Copy> Drop for QueueSubscriber<T> {
}


impl<T: Sized + Copy> QueueSubscriber<T> {
impl<T: Sized + Unpin + Clone> QueueSubscriber<T> {
/// Wait for an item to be posted from the publisher.
pub fn receive<D: DurationTicks>(&self, max_wait: D) -> Result<T, FreeRtosError> {
self.inner.queue.receive(max_wait)
}
}

struct PublisherInner<T: Sized + Copy> {
struct PublisherInner<T: Sized + Unpin + Clone> {
subscribers: Vec<Arc<SubscriberInner<T>>>,
queue_next_id: usize,
}

impl<T: Sized + Copy> PublisherInner<T> {
impl<T: Sized + Unpin + Clone> PublisherInner<T> {
fn unsubscribe(&mut self, subscriber: &SubscriberInner<T>) {
self.subscribers.retain(|ref x| x.id != subscriber.id);
}
}

struct SubscriberInner<T: Sized + Copy> {
struct SubscriberInner<T: Sized + Unpin + Clone> {
id: usize,
queue: Queue<T>,
publisher: Arc<Mutex<PublisherInner<T>>>,
Expand Down
11 changes: 6 additions & 5 deletions src/queue.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,18 +4,18 @@ use crate::shim::*;
use crate::units::*;
use crate::isr::*;

unsafe impl<T: Sized + Copy> Send for Queue<T> {}
unsafe impl<T: Sized + Copy> Sync for Queue<T> {}
unsafe impl<T: Sized + Unpin> Send for Queue<T> {}
unsafe impl<T: Sized + Unpin> Sync for Queue<T> {}

/// A queue with a finite size. The items are owned by the queue and are
/// copied.
#[derive(Debug)]
pub struct Queue<T: Sized + Copy> {
pub struct Queue<T: Sized + Unpin> {
queue: FreeRtosQueueHandle,
item_type: PhantomData<T>,
}

impl<T: Sized + Copy> Queue<T> {
impl<T: Sized + Unpin> Queue<T> {
pub fn new(max_size: usize) -> Result<Queue<T>, FreeRtosError> {

let item_size = mem::size_of::<T>();
Expand All @@ -40,6 +40,7 @@ impl<T: Sized + Copy> Queue<T> {
max_wait.to_ticks()) != 0 {
Err(FreeRtosError::QueueSendTimeout)
} else {
core::mem::forget(item);
Ok(())
}
}
Expand Down Expand Up @@ -77,7 +78,7 @@ impl<T: Sized + Copy> Queue<T> {
}
}

impl<T: Sized + Copy> Drop for Queue<T> {
impl<T: Sized + Unpin> Drop for Queue<T> {
fn drop(&mut self) {
unsafe {
freertos_rs_queue_delete(self.queue);
Expand Down