Skip to content

Commit

Permalink
Upgrade to tokio 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
thomaseizinger committed Jan 22, 2021
1 parent 85094e6 commit 1b3b069
Show file tree
Hide file tree
Showing 8 changed files with 297 additions and 210 deletions.
395 changes: 272 additions & 123 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions bobtimus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ env_logger = "0.8"
futures = { version = "0.3", default-features = false }
hex = "0.4"
hmac = "0.10"
http-api-problem = { version = "0.15", features = [ "with_warp" ] }
http-api-problem = { version = "0.19", features = [ "with_warp" ] }
log = "0.4"
mime_guess = "2.0.3"
reqwest = "0.10"
reqwest = "0.11"
rust-embed = "5.7.0"
rust_decimal = "1.8"
serde = { version = "1", features = [ "derive" ] }
serde_json = "1"
sha2 = "0.9"
structopt = "0.3"
swap = { path = "../swap" }
tokio = { version = "0.2", features = [ "macros" ] }
tokio-tungstenite = { version = "0.11", features = [ "tls" ] }
warp = "0.2.5"
tokio = { version = "1", features = [ "macros", "rt-multi-thread" ] }
tokio-tungstenite = { version = "0.13", features = [ "tls" ] }
warp = { version = "0.3", default-features = false }

[dev-dependencies]
testcontainers = "0.11"
16 changes: 6 additions & 10 deletions bobtimus/src/fixed_rate.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use crate::{LatestRate, LiquidUsdt, Rate};
use anyhow::Result;
use async_trait::async_trait;
use futures::Stream;
use std::{convert::TryFrom, time::Duration};
use tokio::{
sync::watch::{self, Receiver},
time::delay_for,
time::sleep,
};

#[derive(Clone)]
Expand All @@ -18,16 +15,16 @@ impl Service {

tokio::spawn(async move {
loop {
let _ = tx.broadcast(data);
let _ = tx.send(data);

delay_for(Duration::from_secs(5)).await;
sleep(Duration::from_secs(5)).await;
}
});

Self(rx)
}

pub fn subscribe(&self) -> impl Stream<Item = Rate> + Clone {
pub fn subscribe(&self) -> Receiver<Rate> {
self.0.clone()
}
}
Expand All @@ -38,10 +35,9 @@ impl Default for Service {
}
}

#[async_trait]
impl LatestRate for Service {
async fn latest_rate(&mut self) -> Result<Rate> {
Ok(fixed_rate())
fn latest_rate(&mut self) -> Rate {
fixed_rate()
}
}

Expand Down
66 changes: 7 additions & 59 deletions bobtimus/src/kraken.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{LatestRate, LiquidUsdt, Rate};
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use futures::{Future, SinkExt, Stream, StreamExt};
use futures::{SinkExt, StreamExt};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand All @@ -22,38 +21,17 @@ const SUBSCRIBE_XBT_USD_TICKER_PAYLOAD: &str = r#"
#[derive(Clone)]
pub struct RateService {
receiver: Receiver<Rate>,
latest_rate: Rate,
}

impl Future for RateService {
type Output = Rate;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
match self.receiver.poll_next_unpin(cx) {
std::task::Poll::Ready(Some(rate)) => {
self.latest_rate = rate;
self.poll(cx)
}
std::task::Poll::Ready(None) | std::task::Poll::Pending => {
std::task::Poll::from(self.latest_rate)
}
}
}
}

#[async_trait]
impl LatestRate for RateService {
async fn latest_rate(&mut self) -> anyhow::Result<Rate> {
Ok(self.await)
fn latest_rate(&mut self) -> Rate {
*self.receiver.borrow()
}
}

impl RateService {
pub async fn new() -> Result<Self> {
let (tx, mut rx) = watch::channel(Rate::ZERO);
let (tx, rx) = watch::channel(Rate::ZERO);

let (ws, _response) =
tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?;
Expand Down Expand Up @@ -84,24 +62,16 @@ impl RateService {
}
};

let _ = tx.broadcast(rate);
let _ = tx.send(rate);
}
});

write.send(SUBSCRIBE_XBT_USD_TICKER_PAYLOAD.into()).await?;

let latest_rate = rx
.next()
.await
.ok_or_else(|| anyhow!("latest rate stream has ended"))?;

Ok(Self {
receiver: rx,
latest_rate,
})
Ok(Self { receiver: rx })
}

pub fn subscribe(&self) -> impl Stream<Item = Rate> + Clone {
pub fn subscribe(&self) -> Receiver<Rate> {
self.receiver.clone()
}
}
Expand Down Expand Up @@ -172,26 +142,4 @@ mod tests {

let _ = serde_json::from_str::<TickerUpdate>(sample_response).unwrap();
}

#[tokio::test]
async fn latest_rate_does_not_wait_for_next_value() {
let (write, read) = watch::channel(Rate::ZERO);

let latest_rate = Rate {
ask: LiquidUsdt::from_str_in_dollar("20000.0").unwrap(),
bid: LiquidUsdt::from_str_in_dollar("19000.0").unwrap(),
};
let _ = write.broadcast(latest_rate).unwrap();

let mut service = RateService {
receiver: read,
latest_rate: Rate::ZERO,
};

let rate = service.latest_rate().await.unwrap();
assert_eq!(rate, latest_rate);

let rate = service.latest_rate().await.unwrap();
assert_eq!(rate, latest_rate);
}
}
10 changes: 2 additions & 8 deletions bobtimus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use elements_fun::{
bitcoin::{
secp256k1::{All, Secp256k1},
Expand Down Expand Up @@ -55,11 +54,7 @@ impl<R, RS> Bobtimus<R, RS> {
R: RngCore + CryptoRng,
RS: LatestRate,
{
let latest_rate = self
.rate_service
.latest_rate()
.await
.context("failed to get latest rate")?;
let latest_rate = self.rate_service.latest_rate();
let usdt_amount = latest_rate.buy_quote(payload.btc_amount)?;

let bob_inputs = self
Expand Down Expand Up @@ -197,9 +192,8 @@ impl<R, RS> Bobtimus<R, RS> {
}
}

#[async_trait]
pub trait LatestRate {
async fn latest_rate(&mut self) -> Result<Rate>;
fn latest_rate(&mut self) -> Rate;
}

#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions elements-harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ elements-fun = { path = "../elements-fun", features = [ "serde" ] }
futures = "0.3.5"
hex = "0.4.2"
hmac = "0.10"
jsonrpc_client = { git = "https://github.com/thomaseizinger/rust-jsonrpc-client", branch = "master", features = [ "reqwest" ] }
jsonrpc_client = { version = "0.5", features = [ "reqwest" ] }
log = "0.4"
rand = "0.7"
reqwest = { version = "0.10" }
reqwest = "0.11"
serde = "1.0"
serde_json = "1.0"
sha2 = "0.9"
testcontainers = "0.11"
thiserror = "1.0"
tokio = { version = "0.2", default-features = false, features = [ "blocking", "macros", "rt-core", "time" ] }
tokio = { version = "1", default-features = false, features = [ "macros", "rt", "time" ] }
tracing = "0.1"
url = "2"

Expand Down
2 changes: 1 addition & 1 deletion swap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ thiserror = "1.0.23"
[dev-dependencies]
elements-harness = { path = "../elements-harness" }
testcontainers = "0.11"
tokio = { version = "0.2", default-features = false, features = [ "blocking", "macros", "rt-core", "time" ] }
tokio = { version = "1", default-features = false, features = [ "macros", "rt", "time" ] }
2 changes: 1 addition & 1 deletion waves/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ js-sys = "0.3"
log = "0.4"
rand = { version = "0.6", features = [ "wasm-bindgen" ] }
rand_core = { version = "0.5", features = [ "std" ] }
reqwest = { version = "0.10", default-features = false, features = [ "rustls", "json" ] }
reqwest = { version = "0.11", default-features = false, features = [ "rustls", "json" ] }
rust_decimal = "1"
scrypt = { version = "0.5" }
serde = { version = "1", features = [ "derive" ] }
Expand Down

0 comments on commit 1b3b069

Please sign in to comment.