Skip to content

Commit 4da63d3

Browse files
committed
refactor(app): state module
1 parent 49241a8 commit 4da63d3

File tree

14 files changed

+214
-156
lines changed

14 files changed

+214
-156
lines changed

core/application/src/app.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -10,8 +10,8 @@ use lightning_interfaces::types::{ChainId, NodeInfo};
1010
use tracing::{error, info};
1111

1212
use crate::config::{Config, StorageConfig};
13-
use crate::env::{Env, UpdateWorker};
14-
use crate::query_runner::QueryRunner;
13+
use crate::env::{ApplicationEnv, Env, UpdateWorker};
14+
use crate::state::QueryRunner;
1515
pub struct Application<C: Collection> {
1616
update_socket: Mutex<Option<ExecutionEngineSocket>>,
1717
query_runner: QueryRunner,
@@ -102,7 +102,7 @@ impl<C: Collection> ApplicationInterface<C> for Application<C> {
102102
let mut counter = 0;
103103

104104
loop {
105-
match Env::new(config, Some((checkpoint_hash, &checkpoint))) {
105+
match ApplicationEnv::new(config, Some((checkpoint_hash, &checkpoint))) {
106106
Ok(mut env) => {
107107
info!(
108108
"Successfully built database from checkpoint with hash {checkpoint_hash:?}"

core/application/src/env.rs

+20-81
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
1-
use std::collections::BTreeSet;
21
use std::path::Path;
32
use std::time::Duration;
43

54
use affair::AsyncWorker as WorkerTrait;
65
use anyhow::{Context, Result};
7-
use atomo::{Atomo, AtomoBuilder, DefaultSerdeBackend, QueryPerm, StorageBackend, UpdatePerm};
6+
use atomo::{AtomoBuilder, DefaultSerdeBackend, SerdeBackend, StorageBackend};
87
use atomo_rocks::{Cache as RocksCache, Env as RocksEnv, Options};
98
use fleek_crypto::{ClientPublicKey, ConsensusPublicKey, EthAddress, NodePublicKey};
109
use hp_fixed::unsigned::HpUfixed;
1110
use lightning_interfaces::prelude::*;
1211
use lightning_interfaces::types::{
1312
AccountInfo,
14-
Blake3Hash,
1513
Block,
1614
BlockExecutionResponse,
1715
Committee,
@@ -24,31 +22,28 @@ use lightning_interfaces::types::{
2422
NodeInfo,
2523
NodeServed,
2624
ProtocolParams,
27-
ReportedReputationMeasurements,
2825
Service,
2926
ServiceId,
30-
ServiceRevenue,
3127
TotalServed,
3228
TransactionReceipt,
3329
TransactionResponse,
34-
TxHash,
3530
Value,
3631
};
3732
use lightning_metrics::increment_counter;
3833
use tracing::warn;
3934

4035
use crate::config::{Config, StorageConfig};
4136
use crate::genesis::GenesisPrices;
42-
use crate::query_runner::QueryRunner;
43-
use crate::state::State;
37+
use crate::state::{ApplicationState, QueryRunner};
4438
use crate::storage::{AtomoStorage, AtomoStorageBuilder};
45-
use crate::table::StateTables;
4639

47-
pub struct Env<P, B: StorageBackend> {
48-
pub inner: Atomo<P, B>,
40+
pub struct Env<B: StorageBackend, S: SerdeBackend> {
41+
pub inner: ApplicationState<B, S>,
4942
}
5043

51-
impl Env<UpdatePerm, AtomoStorage> {
44+
pub type ApplicationEnv = Env<AtomoStorage, DefaultSerdeBackend>;
45+
46+
impl ApplicationEnv {
5247
pub fn new(config: &Config, checkpoint: Option<([u8; 32], &[u8])>) -> Result<Self> {
5348
let storage = match config.storage {
5449
StorageConfig::RocksDb => {
@@ -83,60 +78,17 @@ impl Env<UpdatePerm, AtomoStorage> {
8378
StorageConfig::InMemory => AtomoStorageBuilder::new::<&Path>(None),
8479
};
8580

86-
let mut atomo = AtomoBuilder::<AtomoStorageBuilder, DefaultSerdeBackend>::new(storage);
87-
atomo = atomo
88-
.with_table::<Metadata, Value>("metadata")
89-
.with_table::<EthAddress, AccountInfo>("account")
90-
.with_table::<ClientPublicKey, EthAddress>("client_keys")
91-
.with_table::<NodeIndex, NodeInfo>("node")
92-
.with_table::<ConsensusPublicKey, NodeIndex>("consensus_key_to_index")
93-
.with_table::<NodePublicKey, NodeIndex>("pub_key_to_index")
94-
.with_table::<(NodeIndex, NodeIndex), Duration>("latencies")
95-
.with_table::<Epoch, Committee>("committee")
96-
.with_table::<ServiceId, Service>("service")
97-
.with_table::<ProtocolParams, u128>("parameter")
98-
.with_table::<NodeIndex, Vec<ReportedReputationMeasurements>>("rep_measurements")
99-
.with_table::<NodeIndex, u8>("rep_scores")
100-
.with_table::<NodeIndex, u8>("submitted_rep_measurements")
101-
.with_table::<NodeIndex, NodeServed>("current_epoch_served")
102-
.with_table::<NodeIndex, NodeServed>("last_epoch_served")
103-
.with_table::<Epoch, TotalServed>("total_served")
104-
.with_table::<CommodityTypes, HpUfixed<6>>("commodity_prices")
105-
.with_table::<ServiceId, ServiceRevenue>("service_revenue")
106-
.with_table::<TxHash, ()>("executed_digests")
107-
.with_table::<NodeIndex, u8>("uptime")
108-
.with_table::<Blake3Hash, BTreeSet<NodeIndex>>("uri_to_node")
109-
.with_table::<NodeIndex, BTreeSet<Blake3Hash>>("node_to_uri")
110-
.enable_iter("current_epoch_served")
111-
.enable_iter("rep_measurements")
112-
.enable_iter("submitted_rep_measurements")
113-
.enable_iter("rep_scores")
114-
.enable_iter("latencies")
115-
.enable_iter("node")
116-
.enable_iter("executed_digests")
117-
.enable_iter("uptime")
118-
.enable_iter("service_revenue")
119-
.enable_iter("uri_to_node")
120-
.enable_iter("node_to_uri");
121-
122-
#[cfg(debug_assertions)]
123-
{
124-
atomo = atomo
125-
.enable_iter("consensus_key_to_index")
126-
.enable_iter("pub_key_to_index");
127-
}
81+
let atomo = AtomoBuilder::<AtomoStorageBuilder, DefaultSerdeBackend>::new(storage);
12882

12983
Ok(Self {
130-
inner: atomo.build()?,
84+
inner: ApplicationState::build(atomo)?,
13185
})
13286
}
13387

13488
pub fn query_runner(&self) -> QueryRunner {
135-
QueryRunner::new(self.inner.query())
89+
self.inner.query()
13690
}
137-
}
13891

139-
impl<B: StorageBackend> Env<UpdatePerm, B> {
14092
#[autometrics::autometrics]
14193
async fn run<F, P>(&mut self, mut block: Block, get_putter: F) -> BlockExecutionResponse
14294
where
@@ -145,10 +97,7 @@ impl<B: StorageBackend> Env<UpdatePerm, B> {
14597
{
14698
let response = self.inner.run(move |ctx| {
14799
// Create the app/execution environment
148-
let backend = StateTables {
149-
table_selector: ctx,
150-
};
151-
let app = State::new(backend);
100+
let app = ApplicationState::executor(ctx);
152101
let last_block_hash = app.get_block_hash();
153102

154103
let block_number = app.get_block_number() + 1;
@@ -199,14 +148,14 @@ impl<B: StorageBackend> Env<UpdatePerm, B> {
199148
response.txn_receipts.push(receipt);
200149
}
201150

151+
// Set the last executed block hash and sub dag index
202152
// if epoch changed a new committee starts and subdag starts back at 0
203153
let (new_sub_dag_index, new_sub_dag_round) = if response.change_epoch {
204154
(0, 0)
205155
} else {
206156
(block.sub_dag_index, block.sub_dag_round)
207157
};
208-
// Set the last executed block hash and sub dag index
209-
app.set_last_block(block.digest, new_sub_dag_index, new_sub_dag_round);
158+
app.set_last_block(response.block_hash, new_sub_dag_index, new_sub_dag_round);
210159

211160
// Return the response
212161
response
@@ -243,13 +192,6 @@ impl<B: StorageBackend> Env<UpdatePerm, B> {
243192
response
244193
}
245194

246-
/// Returns an identical environment but with query permissions
247-
pub fn query_socket(&self) -> Env<QueryPerm, B> {
248-
Env {
249-
inner: self.inner.query(),
250-
}
251-
}
252-
253195
/// Tries to seeds the application state with the genesis block
254196
/// This function will panic if the genesis file cannot be decoded into the correct types
255197
/// Will return true if database was empty and genesis needed to be loaded or false if there was
@@ -461,37 +403,34 @@ impl<B: StorageBackend> Env<UpdatePerm, B> {
461403
}
462404
}
463405

464-
metadata_table.insert(Metadata::Epoch, Value::Epoch(0));
465-
Ok(true)
406+
metadata_table.insert(Metadata::Epoch, Value::Epoch(0));
407+
Ok(true)
466408
})
467409
}
468410

469411
// Should only be called after saving or loading from an epoch checkpoint
470412
pub fn update_last_epoch_hash(&mut self, state_hash: [u8; 32]) {
471413
self.inner.run(move |ctx| {
472-
let backend = StateTables {
473-
table_selector: ctx,
474-
};
475-
let app = State::new(backend);
414+
let app = ApplicationState::executor(ctx);
476415
app.set_last_epoch_hash(state_hash);
477416
})
478417
}
479418
}
480419

481-
impl Default for Env<UpdatePerm, AtomoStorage> {
420+
impl Default for ApplicationEnv {
482421
fn default() -> Self {
483422
Self::new(&Config::default(), None).unwrap()
484423
}
485424
}
486425

487426
/// The socket that receives all update transactions
488427
pub struct UpdateWorker<C: Collection> {
489-
env: Env<UpdatePerm, AtomoStorage>,
428+
env: ApplicationEnv,
490429
blockstore: C::BlockstoreInterface,
491430
}
492431

493432
impl<C: Collection> UpdateWorker<C> {
494-
pub fn new(env: Env<UpdatePerm, AtomoStorage>, blockstore: C::BlockstoreInterface) -> Self {
433+
pub fn new(env: ApplicationEnv, blockstore: C::BlockstoreInterface) -> Self {
495434
Self { env, blockstore }
496435
}
497436
}
@@ -518,7 +457,7 @@ mod env_tests {
518457
.write_to_dir(temp_dir.path().to_path_buf().try_into().unwrap())
519458
.unwrap();
520459
let config = Config::test(genesis_path);
521-
let mut env = Env::<_, AtomoStorage>::new(&config, None).unwrap();
460+
let mut env = ApplicationEnv::new(&config, None).unwrap();
522461

523462
assert!(env.apply_genesis_block(&config).unwrap());
524463

core/application/src/lib.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,7 @@ pub mod config;
33
pub mod env;
44
pub mod genesis;
55
pub mod network;
6-
pub mod query_runner;
76
pub mod state;
8-
pub(crate) mod storage;
9-
pub mod table;
7+
pub mod storage;
108
#[cfg(test)]
119
mod tests;

core/application/src/table.rs core/application/src/state/context.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,11 @@ pub trait TableRef<K, V> {
2626
fn remove(&self, key: &K);
2727
}
2828

29-
pub struct StateTables<'selector, B: StorageBackend, S: SerdeBackend> {
29+
pub struct StateContext<'selector, B: StorageBackend, S: SerdeBackend> {
3030
pub table_selector: &'selector TableSelector<B, S>,
3131
}
3232

33-
impl<'selector, B: StorageBackend, S: SerdeBackend> Backend for StateTables<'selector, B, S> {
33+
impl<'selector, B: StorageBackend, S: SerdeBackend> Backend for StateContext<'selector, B, S> {
3434
type Ref<
3535
K: Eq + Hash + Send + Serialize + DeserializeOwned + 'static,
3636
V: Clone + Send + Serialize + DeserializeOwned + 'static,

core/application/src/state.rs core/application/src/state/executor.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,7 @@ use rand::rngs::StdRng;
6868
use rand::seq::SliceRandom;
6969
use rand::SeedableRng;
7070

71-
use crate::table::{Backend, TableRef};
71+
use super::context::{Backend, TableRef};
7272

7373
/// Reported measurements are weighted by the reputation score of the reporting node.
7474
/// If there is no reputation score for the reporting node, we use a quantile from the array
@@ -96,11 +96,11 @@ lazy_static! {
9696
static ref BIG_HUNDRED: HpUfixed<18> = HpUfixed::<18>::from(100_u64);
9797
}
9898

99-
/// The state of the Application
99+
/// The state executor encapsuates the logic for executing transactions.
100100
///
101101
/// The functions implemented on this struct are the "Smart Contracts" of the application layer
102102
/// All state changes come from Transactions and start at execute_transaction
103-
pub struct State<B: Backend> {
103+
pub struct StateExecutor<B: Backend> {
104104
pub metadata: B::Ref<Metadata, Value>,
105105
pub account_info: B::Ref<EthAddress, AccountInfo>,
106106
pub client_keys: B::Ref<ClientPublicKey, EthAddress>,
@@ -126,7 +126,7 @@ pub struct State<B: Backend> {
126126
pub backend: B,
127127
}
128128

129-
impl<B: Backend> State<B> {
129+
impl<B: Backend> StateExecutor<B> {
130130
pub fn new(backend: B) -> Self {
131131
Self {
132132
metadata: backend.get_table_reference("metadata"),
@@ -155,6 +155,7 @@ impl<B: Backend> State<B> {
155155
}
156156
}
157157

158+
/// Executes a generic transaction.
158159
pub fn execute_transaction(&self, txn: TransactionRequest) -> TransactionResponse {
159160
let hash = txn.hash();
160161
let (sender, response) = match txn {
@@ -173,9 +174,8 @@ impl<B: Backend> State<B> {
173174
response
174175
}
175176

176-
/// This function is the entry point of a transaction
177+
/// Executes a fleek transaction.
177178
fn execute_fleek_transaction(&self, txn: UpdateRequest) -> TransactionResponse {
178-
// Execute transaction
179179
let response = match txn.payload.method {
180180
UpdateMethod::SubmitDeliveryAcknowledgmentAggregation {
181181
commodity,
@@ -274,6 +274,7 @@ impl<B: Backend> State<B> {
274274
response
275275
}
276276

277+
/// Executes an ethereum transaction.
277278
fn execute_ethereum_transaction(&self, txn: EthersTransaction) -> TransactionResponse {
278279
let to_address = match txn.to {
279280
Some(address) => address,

core/application/src/state/mod.rs

+7
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,7 @@
1+
mod context;
2+
mod executor;
3+
mod query;
4+
mod writer;
5+
6+
pub use query::QueryRunner;
7+
pub use writer::ApplicationState;

0 commit comments

Comments
 (0)