Skip to content

Commit

Permalink
Merge pull request #135 from snapview/clean-ups
Browse files Browse the repository at this point in the history
Regular maintenance + wanted to check how the new GitHub actions would work :P
  • Loading branch information
daniel-abramov authored Nov 26, 2020
2 parents 1e9871c + 518ad04 commit 71a5b72
Show file tree
Hide file tree
Showing 15 changed files with 111 additions and 157 deletions.
15 changes: 15 additions & 0 deletions .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,21 @@ name: CI
on: [push, pull_request]

jobs:
fmt:
name: Format
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v2
- uses: actions-rs/toolchain@v1
with:
toolchain: nightly
components: rustfmt
override: true
- uses: actions-rs/cargo@v1
with:
command: fmt
args: --all -- --check

test:
name: Test
runs-on: ubuntu-latest
Expand Down
25 changes: 10 additions & 15 deletions examples/autobahn-client.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
use futures_util::{SinkExt, StreamExt};
use log::*;
use tokio_tungstenite::{connect_async, tungstenite::Error, tungstenite::Result};
use tokio_tungstenite::{
connect_async,
tungstenite::{Error, Result},
};
use url::Url;

const AGENT: &str = "Tungstenite";
Expand All @@ -12,19 +15,13 @@ async fn get_case_count() -> Result<u32> {
.await?;
let msg = socket.next().await.expect("Can't fetch case count")?;
socket.close(None).await?;
Ok(msg
.into_text()?
.parse::<u32>()
.expect("Can't parse case count"))
Ok(msg.into_text()?.parse::<u32>().expect("Can't parse case count"))
}

async fn update_reports() -> Result<()> {
let (mut socket, _) = connect_async(
Url::parse(&format!(
"ws://localhost:9001/updateReports?agent={}",
AGENT
))
.expect("Can't update reports"),
Url::parse(&format!("ws://localhost:9001/updateReports?agent={}", AGENT))
.expect("Can't update reports"),
)
.await?;
socket.close(None).await?;
Expand All @@ -33,11 +30,9 @@ async fn update_reports() -> Result<()> {

async fn run_test(case: u32) -> Result<()> {
info!("Running test case {}", case);
let case_url = Url::parse(&format!(
"ws://localhost:9001/runCase?case={}&agent={}",
case, AGENT
))
.expect("Bad testcase URL");
let case_url =
Url::parse(&format!("ws://localhost:9001/runCase?case={}&agent={}", case, AGENT))
.expect("Bad testcase URL");

let (mut ws_stream, _) = connect_async(case_url).await?;
while let Some(msg) = ws_stream.next().await {
Expand Down
4 changes: 1 addition & 3 deletions examples/autobahn-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,9 +38,7 @@ async fn main() {
info!("Listening on: {}", addr);

while let Ok((stream, _)) = listener.accept().await {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
let peer = stream.peer_addr().expect("connected streams should have a peer address");
info!("Peer address: {}", peer);

tokio::spawn(accept_connection(peer, stream));
Expand Down
6 changes: 2 additions & 4 deletions examples/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,12 +16,10 @@ use futures_util::{future, pin_mut, StreamExt};
use tokio::io::{AsyncReadExt, AsyncWriteExt};
use tokio_tungstenite::{connect_async, tungstenite::protocol::Message};


#[tokio::main]
async fn main() {
let connect_addr = env::args()
.nth(1)
.unwrap_or_else(|| panic!("this program requires at least one argument"));
let connect_addr =
env::args().nth(1).unwrap_or_else(|| panic!("this program requires at least one argument"));

let url = url::Url::parse(&connect_addr).unwrap();

Expand Down
12 changes: 3 additions & 9 deletions examples/echo-server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,7 @@ use tokio::net::{TcpListener, TcpStream};
#[tokio::main]
async fn main() -> Result<(), Error> {
let _ = env_logger::try_init();
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());

// Create the event loop and TCP listener we'll accept connections on.
let try_socket = TcpListener::bind(&addr).await;
Expand All @@ -34,9 +32,7 @@ async fn main() -> Result<(), Error> {
}

async fn accept_connection(stream: TcpStream) {
let addr = stream
.peer_addr()
.expect("connected streams should have a peer address");
let addr = stream.peer_addr().expect("connected streams should have a peer address");
info!("Peer address: {}", addr);

let ws_stream = tokio_tungstenite::accept_async(stream)
Expand All @@ -46,7 +42,5 @@ async fn accept_connection(stream: TcpStream) {
info!("New WebSocket connection: {}", addr);

let (write, read) = ws_stream.split();
read.forward(write)
.await
.expect("Failed to forward message")
read.forward(write).await.expect("Failed to forward message")
}
13 changes: 6 additions & 7 deletions examples/interval-server.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use futures_util::future::{select, Either};
use futures_util::{SinkExt, StreamExt};
use futures_util::{
future::{select, Either},
SinkExt, StreamExt,
};
use log::*;
use std::net::SocketAddr;
use std::time::Duration;
use std::{net::SocketAddr, time::Duration};
use tokio::net::{TcpListener, TcpStream};
use tokio_tungstenite::{accept_async, tungstenite::Error};
use tungstenite::{Message, Result};
Expand Down Expand Up @@ -63,9 +64,7 @@ async fn main() {
info!("Listening on: {}", addr);

while let Ok((stream, _)) = listener.accept().await {
let peer = stream
.peer_addr()
.expect("connected streams should have a peer address");
let peer = stream.peer_addr().expect("connected streams should have a peer address");
info!("Peer address: {}", peer);

tokio::spawn(accept_connection(peer, stream));
Expand Down
22 changes: 5 additions & 17 deletions examples/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,12 +25,8 @@ use std::{
sync::{Arc, Mutex},
};

use futures_util::{
future, pin_mut,
stream::TryStreamExt,
StreamExt,
};
use futures_channel::mpsc::{unbounded, UnboundedSender};
use futures_util::{future, pin_mut, stream::TryStreamExt, StreamExt};

use tokio::net::{TcpListener, TcpStream};
use tungstenite::protocol::Message;
Expand All @@ -53,18 +49,12 @@ async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, addr: Socke
let (outgoing, incoming) = ws_stream.split();

let broadcast_incoming = incoming.try_for_each(|msg| {
println!(
"Received a message from {}: {}",
addr,
msg.to_text().unwrap()
);
println!("Received a message from {}: {}", addr, msg.to_text().unwrap());
let peers = peer_map.lock().unwrap();

// We want to broadcast the message to everyone except ourselves.
let broadcast_recipients = peers
.iter()
.filter(|(peer_addr, _)| peer_addr != &&addr)
.map(|(_, ws_sink)| ws_sink);
let broadcast_recipients =
peers.iter().filter(|(peer_addr, _)| peer_addr != &&addr).map(|(_, ws_sink)| ws_sink);

for recp in broadcast_recipients {
recp.unbounded_send(msg.clone()).unwrap();
Expand All @@ -84,9 +74,7 @@ async fn handle_connection(peer_map: PeerMap, raw_stream: TcpStream, addr: Socke

#[tokio::main]
async fn main() -> Result<(), IoError> {
let addr = env::args()
.nth(1)
.unwrap_or_else(|| "127.0.0.1:8080".to_string());
let addr = env::args().nth(1).unwrap_or_else(|| "127.0.0.1:8080".to_string());

let state = PeerMap::new(Mutex::new(HashMap::new()));

Expand Down
7 changes: 7 additions & 0 deletions rustfmt.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
# This project uses rustfmt to format source code. Run `cargo +nightly fmt [-- --check].
# https://github.com/rust-lang/rustfmt/blob/master/Configurations.md

# Break complex but short statements a bit less.
use_small_heuristics = "Max"

merge_imports = true
26 changes: 8 additions & 18 deletions src/compat.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use log::*;
use std::io::{Read, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{
io::{Read, Write},
pin::Pin,
task::{Context, Poll},
};

use futures_util::task;
use std::sync::Arc;
Expand Down Expand Up @@ -147,11 +149,7 @@ where
trace!("{}:{} Read.read", file!(), line!());
let mut buf = ReadBuf::new(buf);
match self.with_context(ContextWaker::Read, |ctx, stream| {
trace!(
"{}:{} Read.with_context read -> poll_read",
file!(),
line!()
);
trace!("{}:{} Read.with_context read -> poll_read", file!(), line!());
stream.poll_read(ctx, &mut buf)
}) {
Poll::Ready(Ok(_)) => Ok(buf.filled().len()),
Expand All @@ -168,11 +166,7 @@ where
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
trace!("{}:{} Write.write", file!(), line!());
match self.with_context(ContextWaker::Write, |ctx, stream| {
trace!(
"{}:{} Write.with_context write -> poll_write",
file!(),
line!()
);
trace!("{}:{} Write.with_context write -> poll_write", file!(), line!());
stream.poll_write(ctx, buf)
}) {
Poll::Ready(r) => r,
Expand All @@ -183,11 +177,7 @@ where
fn flush(&mut self) -> std::io::Result<()> {
trace!("{}:{} Write.flush", file!(), line!());
match self.with_context(ContextWaker::Write, |ctx, stream| {
trace!(
"{}:{} Write.with_context flush -> poll_flush",
file!(),
line!()
);
trace!("{}:{} Write.with_context flush -> poll_flush", file!(), line!());
stream.poll_flush(ctx)
}) {
Poll::Ready(r) => r,
Expand Down
19 changes: 9 additions & 10 deletions src/connect.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
//! Connection helper.
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio::{
io::{AsyncRead, AsyncWrite},
net::TcpStream,
};

use tungstenite::client::uri_mode;
use tungstenite::handshake::client::Response;
use tungstenite::protocol::WebSocketConfig;
use tungstenite::Error;
use tungstenite::{
client::uri_mode, handshake::client::Response, protocol::WebSocketConfig, Error,
};

use super::{client_async_with_config, IntoClientRequest, Request, WebSocketStream};

Expand All @@ -16,8 +17,7 @@ pub(crate) mod encryption {

use tokio::io::{AsyncRead, AsyncWrite};

use tungstenite::stream::Mode;
use tungstenite::Error;
use tungstenite::{stream::Mode, Error};

use crate::stream::Stream as StreamSwitcher;

Expand Down Expand Up @@ -63,8 +63,7 @@ pub use self::encryption::TlsConnector;
pub(crate) mod encryption {
use tokio::io::{AsyncRead, AsyncWrite};

use tungstenite::stream::Mode;
use tungstenite::Error;
use tungstenite::{stream::Mode, Error};

pub type AutoStream<S> = S;

Expand Down
33 changes: 18 additions & 15 deletions src/handshake.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
use crate::compat::{AllowStd, SetWaker};
use crate::WebSocketStream;
use crate::{
compat::{AllowStd, SetWaker},
WebSocketStream,
};
use log::*;
use pin_project::pin_project;
use std::future::Future;
use std::io::{Read, Write};
use std::pin::Pin;
use std::task::{Context, Poll};
use std::{
future::Future,
io::{Read, Write},
pin::Pin,
task::{Context, Poll},
};
use tokio::io::{AsyncRead, AsyncWrite};
use tungstenite::handshake::client::Response;
use tungstenite::handshake::server::Callback;
use tungstenite::handshake::{HandshakeError as Error, HandshakeRole, MidHandshake as WsHandshake};
use tungstenite::{ClientHandshake, ServerHandshake, WebSocket};
use tungstenite::{
handshake::{
client::Response, server::Callback, HandshakeError as Error, HandshakeRole,
MidHandshake as WsHandshake,
},
ClientHandshake, ServerHandshake, WebSocket,
};

pub(crate) async fn without_handshake<F, S>(stream: S, f: F) -> WebSocketStream<S>
where
Expand Down Expand Up @@ -39,11 +46,7 @@ where
type Output = WebSocket<AllowStd<S>>;

fn poll(self: Pin<&mut Self>, ctx: &mut Context<'_>) -> Poll<Self::Output> {
let inner = self
.get_mut()
.0
.take()
.expect("future polled after completion");
let inner = self.get_mut().0.take().expect("future polled after completion");
trace!("Setting context when skipping handshake");
let stream = AllowStd::new(inner.stream, ctx.waker());

Expand Down
Loading

0 comments on commit 71a5b72

Please sign in to comment.