Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to tokio 1.0 #82

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
454 changes: 190 additions & 264 deletions Cargo.lock

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions bobtimus/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,20 @@ env_logger = "0.8"
futures = { version = "0.3", default-features = false }
hex = "0.4"
hmac = "0.10"
http-api-problem = { version = "0.15", features = [ "with_warp" ] }
http-api-problem = { version = "0.21", features = [ "warp" ] }
log = "0.4"
mime_guess = "2.0.3"
reqwest = "0.10"
reqwest = "0.11"
rust-embed = "5.7.0"
rust_decimal = "1.8"
serde = { version = "1", features = [ "derive" ] }
serde_json = "1"
sha2 = "0.9"
structopt = "0.3"
swap = { path = "../swap" }
tokio = { version = "0.2", features = [ "macros" ] }
tokio-tungstenite = { version = "0.11", features = [ "tls" ] }
warp = "0.2.5"
tokio = { version = "1", features = [ "macros", "rt-multi-thread" ] }
tokio-tungstenite = { version = "0.13", features = [ "tls" ] }
warp = { version = "0.3", default-features = false }

[dev-dependencies]
testcontainers = "0.11"
20 changes: 8 additions & 12 deletions bobtimus/src/fixed_rate.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,8 @@
use crate::{LatestRate, LiquidUsdt, Rate};
use anyhow::Result;
use async_trait::async_trait;
use futures::Stream;
use crate::{LatestRate, LiquidUsdt, Rate, RateSubscription};
use std::{convert::TryFrom, time::Duration};
use tokio::{
sync::watch::{self, Receiver},
time::delay_for,
time::sleep,
};

#[derive(Clone)]
Expand All @@ -18,17 +15,17 @@ impl Service {

tokio::spawn(async move {
loop {
let _ = tx.broadcast(data);
let _ = tx.send(data);

delay_for(Duration::from_secs(5)).await;
sleep(Duration::from_secs(5)).await;
}
});

Self(rx)
}

pub fn subscribe(&self) -> impl Stream<Item = Rate> + Clone {
self.0.clone()
pub fn subscribe(&self) -> RateSubscription {
RateSubscription::from(self.0.clone())
}
}

Expand All @@ -38,10 +35,9 @@ impl Default for Service {
}
}

#[async_trait]
impl LatestRate for Service {
async fn latest_rate(&mut self) -> Result<Rate> {
Ok(fixed_rate())
fn latest_rate(&mut self) -> Rate {
fixed_rate()
}
}

Expand Down
56 changes: 45 additions & 11 deletions bobtimus/src/http.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
use crate::{problem, Bobtimus, CreateSwapPayload, LatestRate, Rate};
use crate::{problem, Bobtimus, CreateSwapPayload, LatestRate, RateSubscription};
use anyhow::Context;
use elements_fun::{
encode::serialize_hex,
secp256k1::rand::{CryptoRng, RngCore},
};
use futures::{Stream, StreamExt};
use futures::{StreamExt, TryStreamExt};
use rust_embed::RustEmbed;
use std::{convert::Infallible, sync::Arc};
use std::{error::Error, fmt, sync::Arc};
use tokio::sync::Mutex;
use warp::{
filters::BoxedFilter, http::header::HeaderValue, path::Tail, reply::Response, Filter,
Expand All @@ -18,7 +19,7 @@ struct Waves;

pub fn routes<R, RS>(
bobtimus: Arc<Mutex<Bobtimus<R, RS>>>,
latest_rate_subscription: impl Stream<Item = Rate> + Clone + Send + Sync + 'static,
latest_rate_subscription: RateSubscription,
) -> BoxedFilter<(impl Reply,)>
where
R: RngCore + CryptoRng + Clone + Send + Sync + 'static,
Expand Down Expand Up @@ -73,13 +74,46 @@ where
.map_err(warp::reject::custom)
}

fn latest_rate<S>(stream: S) -> impl Reply
where
S: Stream<Item = Rate> + Clone + Send + 'static,
{
warp::sse::reply(warp::sse::keep_alive().stream(stream.map(|data| {
Result::<_, Infallible>::Ok((warp::sse::event("rate"), warp::sse::json(data)))
})))
fn latest_rate(subscription: RateSubscription) -> impl Reply {
let stream = subscription
.into_stream()
.map_ok(|data| {
let event = warp::sse::Event::default()
.id("rate")
.json_data(data)
.context("failed to attach json data to sse event")?;

Ok(event)
})
.map(|result| match result {
Ok(Ok(ok)) => Ok(ok),
Ok(Err(e)) => Err(e),
Err(e) => Err(e),
})
.err_into::<RateStreamError>();

warp::sse::reply(warp::sse::keep_alive().stream(stream))
}

#[derive(Debug)]
struct RateStreamError(anyhow::Error);

impl fmt::Display for RateStreamError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
write!(f, "{:#}", self.0)
}
}

impl std::error::Error for RateStreamError {
fn source(&self) -> Option<&(dyn Error + 'static)> {
self.0.source()
}
}

impl From<anyhow::Error> for RateStreamError {
fn from(e: anyhow::Error) -> Self {
RateStreamError(e)
}
}

async fn serve_index() -> Result<impl Reply, Rejection> {
Expand Down
70 changes: 9 additions & 61 deletions bobtimus/src/kraken.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
use crate::{LatestRate, LiquidUsdt, Rate};
use crate::{LatestRate, LiquidUsdt, Rate, RateSubscription};
use anyhow::{anyhow, bail, Result};
use async_trait::async_trait;
use futures::{Future, SinkExt, Stream, StreamExt};
use futures::{SinkExt, StreamExt};
use reqwest::Url;
use serde::{Deserialize, Serialize};
use serde_json::Value;
Expand All @@ -22,38 +21,17 @@ const SUBSCRIBE_XBT_USD_TICKER_PAYLOAD: &str = r#"
#[derive(Clone)]
pub struct RateService {
receiver: Receiver<Rate>,
latest_rate: Rate,
}

impl Future for RateService {
type Output = Rate;

fn poll(
mut self: std::pin::Pin<&mut Self>,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Self::Output> {
match self.receiver.poll_next_unpin(cx) {
std::task::Poll::Ready(Some(rate)) => {
self.latest_rate = rate;
self.poll(cx)
}
std::task::Poll::Ready(None) | std::task::Poll::Pending => {
std::task::Poll::from(self.latest_rate)
}
}
}
}

#[async_trait]
impl LatestRate for RateService {
async fn latest_rate(&mut self) -> anyhow::Result<Rate> {
Ok(self.await)
fn latest_rate(&mut self) -> Rate {
*self.receiver.borrow()
}
}

impl RateService {
pub async fn new() -> Result<Self> {
let (tx, mut rx) = watch::channel(Rate::ZERO);
let (tx, rx) = watch::channel(Rate::ZERO);

let (ws, _response) =
tokio_tungstenite::connect_async(Url::parse(KRAKEN_WS_URL).expect("valid url")).await?;
Expand Down Expand Up @@ -84,25 +62,17 @@ impl RateService {
}
};

let _ = tx.broadcast(rate);
let _ = tx.send(rate);
}
});

write.send(SUBSCRIBE_XBT_USD_TICKER_PAYLOAD.into()).await?;

let latest_rate = rx
.next()
.await
.ok_or_else(|| anyhow!("latest rate stream has ended"))?;

Ok(Self {
receiver: rx,
latest_rate,
})
Ok(Self { receiver: rx })
}

pub fn subscribe(&self) -> impl Stream<Item = Rate> + Clone {
self.receiver.clone()
pub fn subscribe(&self) -> RateSubscription {
RateSubscription::from(self.receiver.clone())
}
}

Expand Down Expand Up @@ -172,26 +142,4 @@ mod tests {

let _ = serde_json::from_str::<TickerUpdate>(sample_response).unwrap();
}

#[tokio::test]
async fn latest_rate_does_not_wait_for_next_value() {
let (write, read) = watch::channel(Rate::ZERO);

let latest_rate = Rate {
ask: LiquidUsdt::from_str_in_dollar("20000.0").unwrap(),
bid: LiquidUsdt::from_str_in_dollar("19000.0").unwrap(),
};
let _ = write.broadcast(latest_rate).unwrap();

let mut service = RateService {
receiver: read,
latest_rate: Rate::ZERO,
};

let rate = service.latest_rate().await.unwrap();
assert_eq!(rate, latest_rate);

let rate = service.latest_rate().await.unwrap();
assert_eq!(rate, latest_rate);
}
}
39 changes: 30 additions & 9 deletions bobtimus/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
use anyhow::{Context, Result};
use async_trait::async_trait;
use elements_fun::{
bitcoin::{
secp256k1::{All, Secp256k1},
Expand All @@ -12,8 +11,9 @@ use elements_fun::{
Address, AssetId, OutPoint, Transaction, TxIn,
};
use elements_harness::{elementd_rpc::ElementsRpc, Client as ElementsdClient};
use futures::{stream::FuturesUnordered, TryStreamExt};
use futures::{stream, stream::FuturesUnordered, Stream, TryStreamExt};
use serde::{Deserialize, Serialize};
use tokio::sync::watch::Receiver;

mod amounts;

Expand Down Expand Up @@ -55,11 +55,7 @@ impl<R, RS> Bobtimus<R, RS> {
R: RngCore + CryptoRng,
RS: LatestRate,
{
let latest_rate = self
.rate_service
.latest_rate()
.await
.context("failed to get latest rate")?;
let latest_rate = self.rate_service.latest_rate();
let usdt_amount = latest_rate.buy_quote(payload.btc_amount)?;

let bob_inputs = self
Expand Down Expand Up @@ -197,9 +193,34 @@ impl<R, RS> Bobtimus<R, RS> {
}
}

#[async_trait]
pub trait LatestRate {
async fn latest_rate(&mut self) -> Result<Rate>;
fn latest_rate(&mut self) -> Rate;
}

#[derive(Clone)]
pub struct RateSubscription {
receiver: Receiver<Rate>,
}

impl From<Receiver<Rate>> for RateSubscription {
fn from(receiver: Receiver<Rate>) -> Self {
Self { receiver }
}
}

impl RateSubscription {
pub fn into_stream(self) -> impl Stream<Item = Result<Rate>> {
stream::try_unfold(self.receiver, |mut receiver| async move {
receiver
.changed()
.await
.context("failed to receive latest rate update")?;

let latest_rate = *receiver.borrow();

Ok(Some((latest_rate, receiver)))
})
}
}

#[cfg(test)]
Expand Down
6 changes: 3 additions & 3 deletions elements-harness/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,16 +11,16 @@ elements-fun = { path = "../elements-fun", features = [ "serde" ] }
futures = "0.3.5"
hex = "0.4.2"
hmac = "0.10"
jsonrpc_client = { git = "https://github.com/thomaseizinger/rust-jsonrpc-client", branch = "master", features = [ "reqwest" ] }
jsonrpc_client = { version = "0.5", features = [ "reqwest" ] }
log = "0.4"
rand = "0.7"
reqwest = { version = "0.10" }
reqwest = "0.11"
serde = "1.0"
serde_json = "1.0"
sha2 = "0.9"
testcontainers = "0.11"
thiserror = "1.0"
tokio = { version = "0.2", default-features = false, features = [ "blocking", "macros", "rt-core", "time" ] }
tokio = { version = "1", default-features = false, features = [ "macros", "rt", "time" ] }
tracing = "0.1"
url = "2"

Expand Down
2 changes: 1 addition & 1 deletion swap/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,4 @@ thiserror = "1.0.23"
[dev-dependencies]
elements-harness = { path = "../elements-harness" }
testcontainers = "0.11"
tokio = { version = "0.2", default-features = false, features = [ "blocking", "macros", "rt-core", "time" ] }
tokio = { version = "1", default-features = false, features = [ "macros", "rt", "time" ] }
2 changes: 1 addition & 1 deletion waves/wallet/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ js-sys = "0.3"
log = "0.4"
rand = { version = "0.6", features = [ "wasm-bindgen" ] }
rand_core = { version = "0.5", features = [ "std" ] }
reqwest = { version = "0.10", default-features = false, features = [ "rustls", "json" ] }
reqwest = { version = "0.11", default-features = false, features = [ "rustls", "json" ] }
rust_decimal = "1"
scrypt = { version = "0.5" }
serde = { version = "1", features = [ "derive" ] }
Expand Down