Skip to content

Commit

Permalink
Merge pull request #142 from dnaka91/tokio-1.0
Browse files Browse the repository at this point in the history
Upgrade to tokio 1.0
  • Loading branch information
daniel-abramov authored Jan 9, 2021
2 parents 71a5b72 + c0e4979 commit d3ae6c6
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 30 deletions.
8 changes: 4 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,10 @@ stream = []
log = "0.4"
futures-util = { version = "0.3", default-features = false, features = ["async-await", "sink", "std"] }
pin-project = "1.0"
tokio = { version = "0.3", default-features = false, features = ["io-util"] }
tokio = { version = "1.0.0", default-features = false, features = ["io-util"] }

[dependencies.tungstenite]
version = "0.11.1"
version = "0.12.0"
default-features = false

[dependencies.native-tls]
Expand All @@ -33,10 +33,10 @@ version = "0.2.0"

[dependencies.tokio-native-tls]
optional = true
version = "0.2"
version = "0.3.0"

[dev-dependencies]
futures-channel = "0.3"
tokio = { version = "0.3", default-features = false, features = ["io-std", "macros", "rt-multi-thread", "stream", "time"] }
tokio = { version = "1.0.0", default-features = false, features = ["io-std", "macros", "rt-multi-thread", "time"] }
url = "2.0.0"
env_logger = "0.7"
23 changes: 7 additions & 16 deletions examples/interval-server.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
use futures_util::{
future::{select, Either},
SinkExt, StreamExt,
};
use futures_util::{SinkExt, StreamExt};
use log::*;
use std::{net::SocketAddr, time::Duration};
use tokio::net::{TcpListener, TcpStream};
Expand All @@ -25,29 +22,23 @@ async fn handle_connection(peer: SocketAddr, stream: TcpStream) -> Result<()> {

// Echo incoming WebSocket messages and send a message periodically every second.

let mut msg_fut = ws_receiver.next();
let mut tick_fut = interval.next();
loop {
match select(msg_fut, tick_fut).await {
Either::Left((msg, tick_fut_continue)) => {
tokio::select! {
msg = ws_receiver.next() => {
match msg {
Some(msg) => {
let msg = msg?;
if msg.is_text() || msg.is_binary() {
if msg.is_text() ||msg.is_binary() {
ws_sender.send(msg).await?;
} else if msg.is_close() {
break;
}
tick_fut = tick_fut_continue; // Continue waiting for tick.
msg_fut = ws_receiver.next(); // Receive next WebSocket message.
}
None => break, // WebSocket stream terminated.
};
None => break,
}
}
Either::Right((_, msg_fut_continue)) => {
_ = interval.tick() => {
ws_sender.send(Message::Text("tick".to_owned())).await?;
msg_fut = msg_fut_continue; // Continue receiving the WebSocket message.
tick_fut = interval.next(); // Wait for next tick.
}
}
}
Expand Down
20 changes: 10 additions & 10 deletions src/handshake.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,11 +92,11 @@ pub(crate) async fn client_handshake<F, S>(
) -> Result<(WebSocketStream<S>, Response), Error<ClientHandshake<AllowStd<S>>>>
where
F: FnOnce(
AllowStd<S>,
) -> Result<
<ClientHandshake<AllowStd<S>> as HandshakeRole>::FinalResult,
Error<ClientHandshake<AllowStd<S>>>,
> + Unpin,
AllowStd<S>,
) -> Result<
<ClientHandshake<AllowStd<S>> as HandshakeRole>::FinalResult,
Error<ClientHandshake<AllowStd<S>>>,
> + Unpin,
S: AsyncRead + AsyncWrite + Unpin,
{
let result = handshake(stream, f).await?;
Expand All @@ -111,11 +111,11 @@ pub(crate) async fn server_handshake<C, F, S>(
where
C: Callback + Unpin,
F: FnOnce(
AllowStd<S>,
) -> Result<
<ServerHandshake<AllowStd<S>, C> as HandshakeRole>::FinalResult,
Error<ServerHandshake<AllowStd<S>, C>>,
> + Unpin,
AllowStd<S>,
) -> Result<
<ServerHandshake<AllowStd<S>, C> as HandshakeRole>::FinalResult,
Error<ServerHandshake<AllowStd<S>, C>>,
> + Unpin,
S: AsyncRead + AsyncWrite + Unpin,
{
let s: WebSocket<AllowStd<S>> = handshake(stream, f).await?;
Expand Down

0 comments on commit d3ae6c6

Please sign in to comment.