Skip to content
This repository was archived by the owner on Nov 26, 2024. It is now read-only.

feat: add copy insert support #56

Draft
wants to merge 1 commit into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ pub use traits::*;

/// Postgres trait implementations
mod mevdb;
pub use mevdb::{BatchInserts, MevDB};
pub use mevdb::{copy_insert, BatchInserts, MevDB};

mod prices;
pub use prices::HistoricalPrice;
Expand Down
6 changes: 6 additions & 0 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,7 @@ async fn run<M: Middleware + Clone + 'static>(provider: M, opts: Opts) -> anyhow
let mut inserts = BatchInserts::new(db, rx);
let mut insert_ctn = 0usize;
let mut error_ctn = 0usize;
let mut evals = Vec::new();
while let Some(res) = inserts.next().await {
match res {
Ok(eval) => {
Expand All @@ -231,6 +232,7 @@ async fn run<M: Middleware + Clone + 'static>(provider: M, opts: Opts) -> anyhow
eval.inspection.hash,
eval.inspection.block_number,
);
evals.push(eval);
}
Err(err) => {
error_ctn += 1;
Expand All @@ -242,6 +244,10 @@ async fn run<M: Middleware + Clone + 'static>(provider: M, opts: Opts) -> anyhow
"inserted evaluations: {}, errors: {}, block range [{}..{}) using {} tasks",
insert_ctn, error_ctn, inner.from, inner.to, inner.tasks
);

let db = inserts.get_database().await;
println!("inserting");
mev_inspect::copy_insert(db, &evals).await?;
}
};
} else {
Expand Down
238 changes: 210 additions & 28 deletions src/mevdb.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,15 @@ use std::collections::VecDeque;
use std::pin::Pin;
use std::task::{Context, Poll};
use thiserror::Error;
use tokio_postgres::{config::Config, Client, NoTls};
use tokio_postgres::binary_copy::BinaryCopyInWriter;
use tokio_postgres::types::ToSql;
use tokio_postgres::{config::Config, Client, Error, NoTls, Statement};

/// Wrapper around Postgres for storing results in the database
pub struct MevDB {
/// Connection handle every query and statement in this wrapper is issued through
client: Client,
/// Name of the table this wrapper operates on; interpolated into the SQL it formats
table_name: String,
/// Conflict clause appended to the dynamically formatted INSERT statement
// NOTE(review): this field appears alongside `prepared_insert_stmt`, which bakes the
// conflict clause into the prepared SQL — confirm whether both are still needed.
overwrite: String,
/// INSERT statement prepared once by the constructor and executed by `insert`
prepared_insert_stmt: Statement,
}

impl MevDB {
Expand All @@ -28,17 +30,63 @@ impl MevDB {
}
});

let table_name = table_name.into();

// TODO: Allow overwriting on conflict
let overwrite = "on conflict do nothing";
client
.batch_execute(&format!(
"CREATE TABLE IF NOT EXISTS {} (
hash text PRIMARY KEY,
status text,

block_number NUMERIC,
gas_price NUMERIC,
gas_used NUMERIC,
revenue NUMERIC,

protocols text[],
actions text[],

eoa text,
contract text,
proxy_impl text,

inserted_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
)",
table_name
))
.await?;

// FIXME: this fails if db does not exist
let prepared_insert_stmt = client
.prepare(&format!(
"INSERT INTO {} (
hash,
status,
block_number,
gas_price,
gas_used,
revenue,
protocols,
actions,
eoa,
contract,
proxy_impl
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
ON CONFLICT DO NOTHING",
table_name,
))
.await?;

Ok(Self {
client,
table_name: table_name.into(),
overwrite: overwrite.to_owned(),
table_name,
prepared_insert_stmt,
})
}

/// Creates a new table for the MEV data
pub async fn create(&mut self) -> Result<(), DbError> {
pub async fn create(&self) -> Result<(), DbError> {
self.client
.batch_execute(&format!(
"CREATE TABLE IF NOT EXISTS {} (
Expand Down Expand Up @@ -66,27 +114,10 @@ impl MevDB {
}

/// Inserts data from this evaluation to PostGres
pub async fn insert(&mut self, evaluation: &Evaluation) -> Result<(), DbError> {
pub async fn insert(&self, evaluation: &Evaluation) -> Result<(), DbError> {
self.client
.execute(
format!(
"INSERT INTO {} (
hash,
status,
block_number,
gas_price,
gas_used,
revenue,
protocols,
actions,
eoa,
contract,
proxy_impl
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11)
{}",
self.table_name, self.overwrite,
)
.as_str(),
&self.prepared_insert_stmt,
&[
&format!("{:?}", evaluation.inspection.hash),
&format!("{:?}", evaluation.inspection.status),
Expand All @@ -111,7 +142,7 @@ impl MevDB {
}

/// Checks if the transaction hash is already inspected
pub async fn exists(&mut self, hash: TxHash) -> Result<bool, DbError> {
pub async fn exists(&self, hash: TxHash) -> Result<bool, DbError> {
let rows = self
.client
.query(
Expand All @@ -128,7 +159,7 @@ impl MevDB {
}

/// Checks if the provided block has been inspected
pub async fn block_exists(&mut self, block: u64) -> Result<bool, DbError> {
pub async fn block_exists(&self, block: u64) -> Result<bool, DbError> {
let rows = self
.client
.query(
Expand All @@ -147,7 +178,7 @@ impl MevDB {
}
}

pub async fn clear(&mut self) -> Result<(), DbError> {
pub async fn clear(&self) -> Result<(), DbError> {
self.client
.batch_execute(&format!("DROP TABLE {}", self.table_name))
.await?;
Expand Down Expand Up @@ -307,6 +338,157 @@ async fn insert_evaluation(
}
}

/// Boxed, pinned future that resolves to a [`CopyInsertOutput`].
// NOTE(review): no use of this alias is visible in this part of the file — confirm it is
// still needed.
type CopyInsert<'a> = Pin<Box<dyn Future<Output = CopyInsertOutput<'a>>>>;

/// Value produced by `writer_copy_insert`: the writer handed back to the caller, plus one
/// entry per evaluation — `Ok` carrying the evaluation when its row was written, or `Err`
/// carrying the evaluation together with the error that occurred while writing it.
type CopyInsertOutput<'a> = (
Pin<&'a mut BinaryCopyInWriter>,
Vec<Result<Evaluation, (Evaluation, DbError)>>,
);

/// Writes every evaluation in `evals` as one row to an already opened binary COPY writer.
///
/// Evaluations are processed front to back. A failed write does not stop the loop: the
/// evaluation is reported as `Err` together with the error and the next one is attempted.
/// The writer is returned alongside the per-evaluation outcomes; this function does not
/// call `finish` on it.
///
/// # Panics
/// Panics if `u256_decimal` fails for the gas price, gas used or profit of an evaluation.
pub async fn writer_copy_insert<'a>(
    mut writer: Pin<&'a mut BinaryCopyInWriter>,
    evals: VecDeque<Evaluation>,
) -> CopyInsertOutput<'a> {
    let mut outcomes = Vec::with_capacity(evals.len());

    for evaluation in evals {
        let inspection = &evaluation.inspection;

        // Column order must match the column list of the COPY statement behind `writer`.
        let row: Vec<Box<dyn ToSql + Sync>> = vec![
            Box::new(format!("{:?}", inspection.hash)),
            Box::new(format!("{:?}", inspection.status)),
            Box::new(Decimal::from(inspection.block_number)),
            Box::new(u256_decimal(evaluation.gas_price).unwrap()),
            Box::new(u256_decimal(evaluation.gas_used).unwrap()),
            Box::new(u256_decimal(evaluation.profit).unwrap()),
            Box::new(vec_str(&inspection.protocols)),
            Box::new(vec_str(&evaluation.actions)),
            Box::new(format!("{:?}", inspection.from)),
            Box::new(format!("{:?}", inspection.contract)),
            // A missing proxy implementation is stored as an empty string.
            Box::new(match inspection.proxy_impl {
                Some(x) => format!("{:?}", x),
                None => "".to_owned(),
            }),
        ];

        // The writer wants borrowed trait objects, so unbox each value by reference.
        let params = row.iter().map(|value| &**value).collect::<Vec<_>>();

        let outcome = match writer.as_mut().write(&params).await {
            Ok(_) => Ok(evaluation),
            Err(err) => Err((evaluation, err.into())),
        };
        outcomes.push(outcome);
    }

    (writer, outcomes)
}

/// Converts one evaluation into the boxed column values of a `tmp_mev_inspections` row.
///
/// The values are produced in the column order used by the COPY statement in
/// [`copy_insert`]. A missing proxy implementation is stored as an empty string.
///
/// # Panics
/// Panics if `u256_decimal` fails for the gas price, gas used or profit.
fn copy_insert_row(evaluation: &Evaluation) -> Vec<Box<dyn ToSql + Sync>> {
    vec![
        Box::new(format!("{:?}", evaluation.inspection.hash)) as Box<dyn ToSql + Sync>,
        Box::new(format!("{:?}", evaluation.inspection.status)),
        Box::new(Decimal::from(evaluation.inspection.block_number)),
        Box::new(u256_decimal(evaluation.gas_price).unwrap()),
        Box::new(u256_decimal(evaluation.gas_used).unwrap()),
        Box::new(u256_decimal(evaluation.profit).unwrap()),
        Box::new(vec_str(&evaluation.inspection.protocols)),
        Box::new(vec_str(&evaluation.actions)),
        Box::new(format!("{:?}", evaluation.inspection.from)),
        Box::new(format!("{:?}", evaluation.inspection.contract)),
        Box::new(
            evaluation
                .inspection
                .proxy_impl
                .map(|x| format!("{:?}", x))
                .unwrap_or_else(|| "".to_owned()),
        ),
    ]
}

/// Bulk-loads `evals` into the `tmp_mev_inspections` table through a binary
/// `COPY ... FROM STDIN`.
///
/// The table is created if it does not exist yet, then every evaluation is streamed as one
/// row and the copy is finished. `db` is taken by value and handed back on success; on
/// error it is dropped together with its connection handle.
///
/// # Errors
/// Returns a [`DbError`] if creating the table, preparing or starting the COPY, writing a
/// row, or finishing the copy fails.
///
/// # Panics
/// Panics if `u256_decimal` fails for the gas price, gas used or profit of any evaluation.
/// All rows are converted before the database is touched, so such a panic leaves the
/// database unchanged.
// NOTE(review): rows are only staged in `tmp_mev_inspections`; nothing visible here moves
// them into `db.table_name`. The staging table also persists between calls and has a
// PRIMARY KEY on `hash`, so copying an already staged hash presumably fails with a unique
// violation — confirm the intended lifecycle of this table.
pub async fn copy_insert(db: MevDB, evals: &[Evaluation]) -> Result<MevDB, DbError> {
    use tokio_postgres::types::Type;

    // Postgres types of the copied columns, in the order of the COPY column list below.
    let column_types = [
        Type::TEXT,
        Type::TEXT,
        Type::NUMERIC,
        Type::NUMERIC,
        Type::NUMERIC,
        Type::NUMERIC,
        Type::TEXT_ARRAY,
        Type::TEXT_ARRAY,
        Type::TEXT,
        Type::TEXT,
        Type::TEXT,
    ];

    // Convert everything up front so a conversion failure surfaces before any SQL runs.
    let rows: Vec<Vec<Box<dyn ToSql + Sync>>> = evals.iter().map(copy_insert_row).collect();

    db.client
        .batch_execute(
            "CREATE TABLE IF NOT EXISTS tmp_mev_inspections (
                hash text PRIMARY KEY,
                status text,

                block_number NUMERIC,
                gas_price NUMERIC,
                gas_used NUMERIC,
                revenue NUMERIC,

                protocols text[],
                actions text[],

                eoa text,
                contract text,
                proxy_impl text
            )",
        )
        .await?;

    let stmt = db
        .client
        .prepare(
            "COPY tmp_mev_inspections(
                hash,
                status,
                block_number,
                gas_price,
                gas_used,
                revenue,

                protocols,
                actions,

                eoa,
                contract,
                proxy_impl
            )
            FROM STDIN (FORMAT binary)",
        )
        .await?;

    let sink = db.client.copy_in(&stmt).await?;

    let writer = BinaryCopyInWriter::new(sink, &column_types);
    futures::pin_mut!(writer);

    for row in &rows {
        // The writer wants borrowed trait objects, so unbox each value by reference.
        let params = row.iter().map(|value| &**value).collect::<Vec<_>>();
        writer.as_mut().write(&params).await?;
    }

    // The copy only takes effect once it is explicitly finished.
    writer.finish().await?;
    Ok(db)
}

#[derive(Error, Debug)]
pub enum InsertEvaluationError<M: Middleware + 'static> {
#[error(transparent)]
Expand Down