Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: fixes for futures #3363

Merged
merged 2 commits into from
Nov 17, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions cli/compilers/ts.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,8 +272,8 @@ impl TsCompiler {

let worker = TsCompiler::setup_worker(global_state.clone());
let worker_ = worker.clone();
worker.post_message(req_msg).unwrap();
let first_msg_fut = async move {
worker.post_message(req_msg).await.unwrap();
let result = worker.await;
if let Err(err) = result {
// TODO(ry) Need to forward the error instead of exiting.
Expand Down Expand Up @@ -382,8 +382,8 @@ impl TsCompiler {
.add("Compile", &module_url.to_string());
let global_state_ = global_state.clone();

worker.post_message(req_msg).unwrap();
let first_msg_fut = async move {
worker.post_message(req_msg).await.unwrap();
let result = worker.await;
if let Err(err) = result {
// TODO(ry) Need to forward the error instead of exiting.
Expand Down
13 changes: 6 additions & 7 deletions cli/compilers/wasm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,14 +86,13 @@ impl WasmCompiler {
let worker_ = worker.clone();
let url = source_file.url.clone();

let _res = worker.post_message(
serde_json::to_string(&base64_data)
.unwrap()
.into_boxed_str()
.into_boxed_bytes(),
);
let fut = worker
.post_message(
serde_json::to_string(&base64_data)
.unwrap()
.into_boxed_str()
.into_boxed_bytes(),
)
.then(move |_| worker)
.then(move |result| {
if let Err(err) = result {
// TODO(ry) Need to forward the error instead of exiting.
Expand Down
3 changes: 2 additions & 1 deletion cli/ops/workers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,8 @@ fn op_host_post_message(
let mut table = state.workers.lock().unwrap();
// TODO: don't return bad resource anymore
let worker = table.get_mut(&id).ok_or_else(bad_resource)?;
tokio_util::block_on(worker.post_message(msg).boxed())
worker
.post_message(msg)
.map_err(|e| DenoError::new(ErrorKind::Other, e.to_string()))?;
Ok(JsonOp::Sync(json!({})))
}
Expand Down
19 changes: 7 additions & 12 deletions cli/worker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,17 +144,12 @@ impl Worker {
/// Post message to worker as a host.
///
/// This method blocks current thread.
pub fn post_message(
self: &Self,
buf: Buf,
) -> impl Future<Output = Result<(), ErrBox>> {
pub fn post_message(self: &Self, buf: Buf) -> Result<(), ErrBox> {
let channels = self.external_channels.lock().unwrap();
let mut sender = channels.sender.clone();
async move {
let result = sender.send(buf).map_err(ErrBox::from).await;
drop(sender);
result
}
futures::executor::block_on(sender.send(buf))
.map(|_| ())
.map_err(ErrBox::from)
}

/// Get message from worker as a host.
Expand Down Expand Up @@ -396,7 +391,7 @@ mod tests {

let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();

let r = futures::executor::block_on(worker_.post_message(msg).boxed());
let r = worker_.post_message(msg);
assert!(r.is_ok());

let maybe_msg =
Expand All @@ -409,7 +404,7 @@ mod tests {
.to_string()
.into_boxed_str()
.into_boxed_bytes();
let r = futures::executor::block_on(worker_.post_message(msg).boxed());
let r = worker_.post_message(msg);
assert!(r.is_ok());
})
}
Expand Down Expand Up @@ -439,7 +434,7 @@ mod tests {
);

let msg = json!("hi").to_string().into_boxed_str().into_boxed_bytes();
let r = futures::executor::block_on(worker_.post_message(msg));
let r = worker_.post_message(msg);
assert!(r.is_ok());

futures::executor::block_on(worker_future).unwrap();
Expand Down
22 changes: 15 additions & 7 deletions core/examples/http_bench.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ extern crate lazy_static;
use deno::*;
use futures::future::FutureExt;
use futures::future::TryFutureExt;
use futures::stream::StreamExt;
use std::env;
use std::future::Future;
use std::io::Error;
Expand All @@ -24,6 +25,7 @@ use std::pin::Pin;
use std::sync::Mutex;
use std::sync::MutexGuard;
use std::task::Poll;
use tokio::net::tcp::Incoming;
use tokio::prelude::Async;
use tokio::prelude::AsyncRead;
use tokio::prelude::AsyncWrite;
Expand Down Expand Up @@ -190,7 +192,7 @@ pub fn bad_resource() -> Error {
Error::new(ErrorKind::NotFound, "bad resource id")
}

struct TcpListener(tokio::net::TcpListener);
struct TcpListener(Incoming);

impl Resource for TcpListener {}

Expand All @@ -213,14 +215,19 @@ fn op_accept(
) -> Pin<Box<HttpOp>> {
let rid = record.arg as u32;
debug!("accept {}", rid);
let fut = futures::future::poll_fn(move |_cx| {
let fut = futures::future::poll_fn(move |cx| {
let mut table = lock_resource_table();
let listener =
table.get_mut::<TcpListener>(rid).ok_or_else(bad_resource)?;
match listener.0.poll_accept() {
Err(e) => Poll::Ready(Err(e)),
Ok(Async::Ready(v)) => Poll::Ready(Ok(v)),
Ok(Async::NotReady) => Poll::Pending,
let mut listener = futures::compat::Compat01As03::new(&mut listener.0);
match listener.poll_next_unpin(cx) {
Poll::Ready(Some(Err(e))) => Poll::Ready(Err(e)),
Poll::Ready(Some(Ok(stream))) => {
let addr = stream.peer_addr().unwrap();
Poll::Ready(Ok((stream, addr)))
}
Poll::Pending => Poll::Pending,
_ => unreachable!(),
}
})
.and_then(move |(stream, addr)| {
Expand All @@ -240,7 +247,8 @@ fn op_listen(
let addr = "127.0.0.1:4544".parse::<SocketAddr>().unwrap();
let listener = tokio::net::TcpListener::bind(&addr).unwrap();
let mut table = lock_resource_table();
let rid = table.add("tcpListener", Box::new(TcpListener(listener)));
let rid =
table.add("tcpListener", Box::new(TcpListener(listener.incoming())));
futures::future::ok(rid as i32).boxed()
}

Expand Down