Skip to content

Commit

Permalink
Upgrade to tokio 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Jan 29, 2021
1 parent 85094e6 commit 367064e
Show file tree
Hide file tree
Showing 9 changed files with 292 additions and 367 deletions.
454 changes: 190 additions & 264 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions bobtimus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ env_logger = "0.8"
futures = { version = "0.3", default-features = false }
hex = "0.4"
hmac = "0.10"
http-api-problem = { version = "0.15", features = [ "with_warp" ] }
http-api-problem = { version = "0.21", features = [ "warp" ] }
log = "0.4"
mime_guess = "2.0.3"
reqwest = "0.10"
reqwest = "0.11"
rust-embed = "5.7.0"
rust_decimal = "1.8"
serde = { version = "1", features = [ "derive" ] }
serde_json = "1"
sha2 = "0.9"
structopt = "0.3"
swap = { path = "../swap" }
tokio = { version = "0.2", features = [ "macros" ] }
tokio-tungstenite = { version = "0.11", features = [ "tls" ] }
warp = "0.2.5"
tokio = { version = "1", features = [ "macros", "rt-multi-thread" ] }
tokio-tungstenite = { version = "0.13", features = [ "tls" ] }
warp = { version = "0.3", default-features = false }

[dev-dependencies]
testcontainers = "0.11"
20 changes: 8 additions & 12 deletions bobtimus/src/fixed_rate.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use crate::{LatestRate, LiquidUsdt, Rate};
use anyhow::Result;
use async_trait::async_trait;
use futures::Stream;
use crate::{LatestRate, LiquidUsdt, Rate, RateSubscription};
use std::{convert::TryFrom, time::Duration};
use tokio::{
sync::watch::{self, Receiver},
time::delay_for,
time::sleep,
};

#[derive(Clone)]
Expand All @@ -18,17 +15,17 @@ impl Service {

tokio::spawn(async move {
loop {
let _ = tx.broadcast(data);
let _ = tx.send(data);

delay_for(Duration::from_secs(5)).await;
sleep(Duration::from_secs(5)).await;
}
});

Self(rx)
}

pub fn subscribe(&self) -> impl Stream<Item = Rate> + Clone {
self.0.clone()
pub fn subscribe(&self) -> RateSubscription {
RateSubscription::from(self.0.clone())
}
}

Expand All @@ -38,10 +35,9 @@ impl Default for Service {
}
}

#[async_trait]
impl LatestRate for Service {
async fn latest_rate(&mut self) -> Result<Rate> {
Ok(fixed_rate())
fn latest_rate(&mut self) -> Rate {
fixed_rate()
}
}

Expand Down
56 changes: 45 additions & 11 deletions bobtimus/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{problem, Bobtimus, CreateSwapPayload, LatestRate, Rate};
use crate::{problem, Bobtimus, CreateSwapPayload, LatestRate, RateSubscription};
use anyhow::Context;
use elements_fun::{
encode::serialize_hex,
secp256k1::rand::{CryptoRng, RngCore},
};
use futures::{Stream, StreamExt};
use futures::{StreamExt, TryStreamExt};
use rust_embed::RustEmbed;
use std::{convert::Infallible, sync::Arc};
use std::{error::Error, fmt, sync::Arc};
use tokio::sync::Mutex;
use warp::{
filters::BoxedFilter, http::header::HeaderValue, path::Tail, reply::Response, Filter,
Expand All @@ -18,7 +19,7 @@ struct Waves;

pub fn routes<R, RS>(
bobtimus: Arc<Mutex<Bobtimus<R, RS>>>,
latest_rate_subscription: impl Stream<Item = Rate> + Clone + Send + Sync + 'static,
latest_rate_subscription: RateSubscription,
) -> BoxedFilter<(impl Reply,)>
where
R: RngCore + CryptoRng + Clone + Send + Sync + 'static,
Expand Down Expand Up @@ -73,13 +74,46 @@ where
.map_err(warp::reject::custom)
}

fn latest_rate<S>(stream: S) -> impl Reply
where
S: Stream<Item = Rate> + Clone + Send + 'static,
{
warp::sse::reply(warp::sse::keep_alive().stream(stream.map(|data| {
Result::<_, Infallible>::Ok((warp::sse::event("rate"), warp::sse::json(data)))
})))
fn latest_rate(subscription: RateSubscription) -> impl Reply {
let stream = subscription
.into_stream()
.map_ok(|data| {
let event = warp::sse::Event::default()
.id("rate")
.json_data(data)
.context("failed to attach json data to sse event")?;

Ok(event)
})
.map(|result| match result {
Ok(Ok(ok)) => Ok(ok),
Ok(Err(e)) => Err(e),
Err(e) => Err(e),
})
.err_into::<RateStreamError>();

warp::sse::reply(warp::sse::keep_alive().stream(stream))
}

#[derive(Debug)]
struct RateStreamError(anyhow::Error);

impl fmt::Display for RateStreamError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:#}", self.0)
}
}

impl std::error::Error for RateStreamError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.0.source()
}
}

impl From<anyhow::Error> for RateStreamError {
fn from(e: anyhow::Error) -> Self {
RateStreamError(e)
}
}

async fn serve_index() -> Result<impl Reply, Rejection> {
Expand Down
70 changes: 9 additions & 61 deletions bobtimus/src/kraken.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{LatestRate, LiquidUsdt, Rate};
use crate::{LatestRate, LiquidUsdt, Rate, RateSubscription};
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use futures::{Future, SinkExt, Stream, StreamExt};
use futures::{SinkExt, StreamExt};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand All @@ -22,38 +21,17 @@ const SUBSCRIBE_XBT_USD_TICKER_PAYLOAD: &str = r#"
#[derive(Clone)]
pub struct RateService {
receiver: Receiver<Rate>,
latest_rate: Rate,
}

impl Future for RateService {
type Output = Rate;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
match self.receiver.poll_next_unpin(cx) {
std::task::Poll::Ready(Some(rate)) => {
self.latest_rate = rate;
self.poll(cx)
}
std::task::Poll::Ready(None) | std::task::Poll::Pending => {
std::task::Poll::from(self.latest_rate)
}
}
}
}

#[async_trait]
impl LatestRate for RateService {
async fn latest_rate(&mut self) -> anyhow::Result<Rate> {
Ok(self.await)
fn latest_rate(&mut self) -> Rate {
*self.receiver.borrow()
}
}

impl RateService {
pub async fn new() -> Result<Self> {
let (tx, mut rx) = watch::channel(Rate::ZERO);
let (tx, rx) = watch::channel(Rate::ZERO);

let (ws, _response) =
tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?;
Expand Down Expand Up @@ -84,25 +62,17 @@ impl RateService {
}
};

let _ = tx.broadcast(rate);
let _ = tx.send(rate);
}
});

write.send(SUBSCRIBE_XBT_USD_TICKER_PAYLOAD.into()).await?;

let latest_rate = rx
.next()
.await
.ok_or_else(|| anyhow!("latest rate stream has ended"))?;

Ok(Self {
receiver: rx,
latest_rate,
})
Ok(Self { receiver: rx })
}

pub fn subscribe(&self) -> impl Stream<Item = Rate> + Clone {
self.receiver.clone()
pub fn subscribe(&self) -> RateSubscription {
RateSubscription::from(self.receiver.clone())
}
}

Expand Down Expand Up @@ -172,26 +142,4 @@ mod tests {

let _ = serde_json::from_str::<TickerUpdate>(sample_response).unwrap();
}

#[tokio::test]
async fn latest_rate_does_not_wait_for_next_value() {
let (write, read) = watch::channel(Rate::ZERO);

let latest_rate = Rate {
ask: LiquidUsdt::from_str_in_dollar("20000.0").unwrap(),
bid: LiquidUsdt::from_str_in_dollar("19000.0").unwrap(),
};
let _ = write.broadcast(latest_rate).unwrap();

let mut service = RateService {
receiver: read,
latest_rate: Rate::ZERO,
};

let rate = service.latest_rate().await.unwrap();
assert_eq!(rate, latest_rate);

let rate = service.latest_rate().await.unwrap();
assert_eq!(rate, latest_rate);
}
}
39 changes: 30 additions & 9 deletions bobtimus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use elements_fun::{
bitcoin::{
secp256k1::{All, Secp256k1},
Expand All @@ -12,7 +11,7 @@ use elements_fun::{
Address, AssetId, OutPoint, Transaction, TxIn,
};
use elements_harness::{elementd_rpc::ElementsRpc, Client as ElementsdClient};
use futures::{stream::FuturesUnordered, TryStreamExt};
use futures::{stream, stream::FuturesUnordered, Stream, TryStreamExt};
use serde::{Deserialize, Serialize};

mod amounts;
Expand All @@ -24,6 +23,7 @@ pub mod kraken;
pub mod problem;

pub use amounts::*;
use tokio::sync::watch::Receiver;

pub const USDT_ASSET_ID: &str = "ce091c998b83c78bb71a632313ba3760f1763d9cfcffae02258ffa9865a37bd2";

Expand Down Expand Up @@ -55,11 +55,7 @@ impl<R, RS> Bobtimus<R, RS> {
R: RngCore + CryptoRng,
RS: LatestRate,
{
let latest_rate = self
.rate_service
.latest_rate()
.await
.context("failed to get latest rate")?;
let latest_rate = self.rate_service.latest_rate();
let usdt_amount = latest_rate.buy_quote(payload.btc_amount)?;

let bob_inputs = self
Expand Down Expand Up @@ -197,9 +193,34 @@ impl<R, RS> Bobtimus<R, RS> {
}
}

#[async_trait]
pub trait LatestRate {
async fn latest_rate(&mut self) -> Result<Rate>;
fn latest_rate(&mut self) -> Rate;
}

#[derive(Clone)]
pub struct RateSubscription {
receiver: Receiver<Rate>,
}

impl From<Receiver<Rate>> for RateSubscription {
fn from(receiver: Receiver<Rate>) -> Self {
Self { receiver }
}
}

impl RateSubscription {
pub fn into_stream(self) -> impl Stream<Item = Result<Rate>> {
stream::try_unfold(self.receiver, |mut receiver| async move {
receiver
.changed()
.await
.context("failed to receive latest rate update")?;

let latest_rate = *receiver.borrow();

Ok(Some((latest_rate, receiver)))
})
}
}

#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions elements-harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ elements-fun = { path = "../elements-fun", features = [ "serde" ] }
futures = "0.3.5"
hex = "0.4.2"
hmac = "0.10"
jsonrpc_client = { git = "https://github.com/thomaseizinger/rust-jsonrpc-client", branch = "master", features = [ "reqwest" ] }
jsonrpc_client = { version = "0.5", features = [ "reqwest" ] }
log = "0.4"
rand = "0.7"
reqwest = { version = "0.10" }
reqwest = "0.11"
serde = "1.0"
serde_json = "1.0"
sha2 = "0.9"
testcontainers = "0.11"
thiserror = "1.0"
tokio = { version = "0.2", default-features = false, features = [ "blocking", "macros", "rt-core", "time" ] }
tokio = { version = "1", default-features = false, features = [ "macros", "rt", "time" ] }
tracing = "0.1"
url = "2"

Expand Down
2 changes: 1 addition & 1 deletion swap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ thiserror = "1.0.23"
[dev-dependencies]
elements-harness = { path = "../elements-harness" }
testcontainers = "0.11"
tokio = { version = "0.2", default-features = false, features = [ "blocking", "macros", "rt-core", "time" ] }
tokio = { version = "1", default-features = false, features = [ "macros", "rt", "time" ] }
2 changes: 1 addition & 1 deletion waves/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ js-sys = "0.3"
log = "0.4"
rand = { version = "0.6", features = [ "wasm-bindgen" ] }
rand_core = { version = "0.5", features = [ "std" ] }
reqwest = { version = "0.10", default-features = false, features = [ "rustls", "json" ] }
reqwest = { version = "0.11", default-features = false, features = [ "rustls", "json" ] }
rust_decimal = "1"
scrypt = { version = "0.5" }
serde = { version = "1", features = [ "derive" ] }
Expand Down

0 comments on commit 367064e

Please sign in to comment.