Skip to content
This repository was archived by the owner on Aug 2, 2022. It is now read-only.

Commit

Permalink
include pending generated transactions in scheduling
Browse files Browse the repository at this point in the history
NB: we do not have the facilities to create/process these yet but they can be scheduled
scheduling is generated/signed agnostic though the block format still segregates them
chain_controllers _db now has indices for generated transactions.
tests using all signed transactions for now.

closes #139
  • Loading branch information
Bart Wyatt committed Aug 8, 2017
1 parent 133442b commit d0828b2
Show file tree
Hide file tree
Showing 9 changed files with 200 additions and 69 deletions.
42 changes: 26 additions & 16 deletions libraries/chain/block_schedule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,15 +45,15 @@ typedef std::hash<decltype(AccountName::value)> account_hash;
static account_hash account_hasher;

struct schedule_entry {
schedule_entry(uint _cycle, uint _thread, SignedTransaction const * _transaction)
schedule_entry(uint _cycle, uint _thread, pending_transaction const * _transaction)
: cycle(_cycle)
, thread(_thread)
, transaction(_transaction)
{}

uint cycle;
uint thread;
SignedTransaction const *transaction;
pending_transaction const *transaction;

friend bool operator<( schedule_entry const &l, schedule_entry const &r ) {
if (l.cycle < r.cycle) {
Expand Down Expand Up @@ -92,15 +92,15 @@ static block_schedule from_entries(vector<schedule_entry> &entries) {
// allocations, we cannot emplace_back as that would reverse
// the transactions in a thread
auto &thread = cycle.at(entry.thread);
thread.user_input.emplace(thread.user_input.begin(), entry.transaction);
thread.transactions.emplace(thread.transactions.begin(), entry.transaction);
}

return result;
}

template<typename CONTAINER>
auto initialize_pointer_vector(CONTAINER const &c) {
vector<SignedTransaction const *> result;
vector<pending_transaction const *> result;
result.reserve(c.size());
for (auto const &t : c) {
result.emplace_back(&t);
Expand All @@ -109,12 +109,20 @@ auto initialize_pointer_vector(CONTAINER const &c) {
return result;
}

struct transaction_size_visitor : public fc::visitor<size_t>
{
template <typename T>
size_t operator()(const T &trx_p) const {
return fc::raw::pack_size(*trx_p);
}
};

struct block_size_skipper {
size_t current_size;
size_t const max_size;

bool should_skip(SignedTransaction const *t) const {
size_t transaction_size = fc::raw::pack_size(*t);
bool should_skip(pending_transaction const *t) const {
size_t transaction_size = t->visit(transaction_size_visitor());
// postpone transaction if it would make block too big
if( transaction_size + current_size > max_size ) {
return true;
Expand All @@ -123,8 +131,8 @@ struct block_size_skipper {
}
}

void apply(SignedTransaction const *t) {
size_t transaction_size = fc::raw::pack_size(*t);
void apply(pending_transaction const *t) {
size_t transaction_size = t->visit(transaction_size_visitor());
current_size += transaction_size;
}
};
Expand All @@ -136,7 +144,7 @@ auto make_skipper(const global_property_object &properties) {
}

block_schedule block_schedule::by_threading_conflicts(
deque<SignedTransaction> const &transactions,
vector<pending_transaction> const &transactions,
const global_property_object &properties
)
{
Expand All @@ -150,7 +158,7 @@ block_schedule block_schedule::by_threading_conflicts(
schedule.reserve(transactions.size());

auto current = initialize_pointer_vector(transactions);
vector<SignedTransaction const *> postponed;
vector<pending_transaction const *> postponed;
postponed.reserve(transactions.size());

vector<uint> txs_per_thread;
Expand All @@ -170,8 +178,9 @@ block_schedule block_schedule::by_threading_conflicts(

auto assigned_to = optional<uint>();
bool postpone = false;
auto scopes = t->visit(scope_extracting_visitor());

for (const auto &a : t->scope) {
for (const auto &a : scopes) {
uint hash_index = account_hasher(a) % HASH_SIZE;
if (assigned_to && assigned_threads[hash_index] && assigned_to != assigned_threads[hash_index]) {
postpone = true;
Expand All @@ -192,7 +201,7 @@ block_schedule block_schedule::by_threading_conflicts(
}

if (txs_per_thread[*assigned_to] < MAX_TXS_PER_THREAD) {
for (const auto &a : t->scope)
for (const auto &a : scopes)
{
uint hash_index = account_hasher(a) % HASH_SIZE;
assigned_threads[hash_index] = assigned_to;
Expand Down Expand Up @@ -220,7 +229,7 @@ block_schedule block_schedule::by_threading_conflicts(
}

block_schedule block_schedule::by_cycling_conflicts(
deque<SignedTransaction> const &transactions,
vector<pending_transaction> const &transactions,
const global_property_object &properties
)
{
Expand All @@ -231,7 +240,7 @@ block_schedule block_schedule::by_cycling_conflicts(
schedule.reserve(transactions.size());

auto current = initialize_pointer_vector(transactions);
vector<SignedTransaction const *> postponed;
vector<pending_transaction const *> postponed;
postponed.reserve(transactions.size());

int cycle = 0;
Expand All @@ -246,8 +255,9 @@ block_schedule block_schedule::by_cycling_conflicts(
continue;
}

auto scopes = t->visit(scope_extracting_visitor());
bool u = false;
for (const auto &a : t->scope) {
for (const auto &a : scopes) {
uint hash_index = account_hasher(a) % HASH_SIZE;
if (used[hash_index]) {
u = true;
Expand All @@ -257,7 +267,7 @@ block_schedule block_schedule::by_cycling_conflicts(
}

if (!u) {
for (const auto &a : t->scope) {
for (const auto &a : scopes) {
uint hash_index = account_hasher(a) % HASH_SIZE;
used[hash_index] = true;
}
Expand Down
93 changes: 57 additions & 36 deletions libraries/chain/chain_controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include <eos/chain/global_property_object.hpp>
#include <eos/chain/key_value_object.hpp>
#include <eos/chain/action_objects.hpp>
#include <eos/chain/generated_transaction_object.hpp>
#include <eos/chain/transaction_object.hpp>
#include <eos/chain/producer_object.hpp>

Expand Down Expand Up @@ -258,7 +259,6 @@ ProcessedTransaction chain_controller::_push_transaction(const SignedTransaction
return pt;
}


signed_block chain_controller::generate_block(
fc::time_point_sec when,
const AccountName& producer,
Expand Down Expand Up @@ -293,7 +293,20 @@ signed_block chain_controller::_generate_block(
if( !(skip & skip_producer_signature) )
FC_ASSERT( producer_obj.signing_key == block_signing_private_key.get_public_key() );

auto schedule = block_schedule::by_threading_conflicts(_pending_transactions, get_global_properties());

auto& generated = _db.get_index<generated_transaction_multi_index, generated_transaction_object::by_trx_id>();

vector<pending_transaction> pending;
pending.reserve(generated.size() + _pending_transactions.size());
for (auto const &gt: generated) {
pending.emplace_back(pending_transaction {&gt.trx});
}

for(auto const &st: _pending_transactions) {
pending.emplace_back(pending_transaction {&st});
}

auto schedule = block_schedule::by_threading_conflicts(pending, get_global_properties());

//
// The following code throws away existing pending_tx_session and
Expand All @@ -309,25 +322,6 @@ signed_block chain_controller::_generate_block(
_pending_tx_session.reset();
_pending_tx_session = _db.start_undo_session(true);

auto process_one = [this](SignedTransaction const *tx, database &db) -> optional<ProcessedTransaction> {
try
{
auto temp_session = db.start_undo_session(true);
auto ptx = _apply_transaction(*tx);
temp_session.squash();

return optional<ProcessedTransaction>(ptx);
}
catch ( const fc::exception& e )
{
// Do nothing, transaction will not be re-applied
wlog( "Transaction was not processed while generating block due to ${e}", ("e", e) );
wlog( "The transaction was ${t}", ("t", *tx) );
}

return optional<ProcessedTransaction>();
};

signed_block pending_block;
pending_block.cycles.reserve(schedule.cycles.size());

Expand All @@ -340,23 +334,42 @@ signed_block chain_controller::_generate_block(

for (const auto &t : c) {
thread block_thread;
block_thread.generated_input.reserve(t.generated_input.size());
for (const auto &trx : t.generated_input) {
#warning TODO: Process generated transaction
}

block_thread.user_input.reserve(t.user_input.size());
for (const auto &trx : t.user_input) {
auto processed = process_one(trx, _db);
if (processed) {
block_thread.user_input.emplace_back(*processed);
valid_transaction_count++;
} else {
invalid_transaction_count++;
}
block_thread.user_input.reserve(t.transactions.size());
block_thread.generated_input.reserve(t.transactions.size());
for (const auto &trx : t.transactions) {
try
{
auto temp_session = _db.start_undo_session(true);
if (trx->contains<SignedTransaction const *>()) {
auto processed = _apply_transaction(*trx->get<SignedTransaction const *>());
block_thread.user_input.emplace_back(processed);
} else if (trx->contains<GeneratedTransaction const *>()) {
#warning TODO: Process generated transaction
// auto processed = _apply_transaction(*trx->get<GeneratedTransaction const *>());
// block_thread.generated_input.emplace_back(processed);
} else {
FC_THROW_EXCEPTION(tx_scheduling_exception, "Unknown transaction type in block_schedule");
}

temp_session.squash();
valid_transaction_count++;
}
catch ( const fc::exception& e )
{
// Do nothing, transaction will not be re-applied
wlog( "Transaction was not processed while generating block due to ${e}", ("e", e) );
if (trx->contains<SignedTransaction const *>()) {
wlog( "The transaction was ${t}", ("t", *trx->get<SignedTransaction const *>()) );
} else if (trx->contains<GeneratedTransaction const *>()) {
wlog( "The transaction was ${t}", ("t", *trx->get<GeneratedTransaction const *>()) );
}
invalid_transaction_count++;
}
}

if (!(block_thread.generated_input.empty() && block_thread.user_input.empty())) {
block_thread.generated_input.shrink_to_fit();
block_thread.user_input.shrink_to_fit();
block_cycle.emplace_back(std::move(block_thread));
}
}
Expand Down Expand Up @@ -473,7 +486,7 @@ void chain_controller::_apply_block(const signed_block& next_block)
#warning TODO: Process generated transaction
}
for(const auto& trx : thread.user_input ) {
apply_transaction(trx);
_apply_transaction(trx);
}
}
}
Expand Down Expand Up @@ -827,6 +840,7 @@ void chain_controller::initialize_indexes() {
_db.add_index<dynamic_global_property_multi_index>();
_db.add_index<block_summary_multi_index>();
_db.add_index<transaction_multi_index>();
_db.add_index<generated_transaction_multi_index>();
_db.add_index<producer_multi_index>();
}

Expand Down Expand Up @@ -1064,6 +1078,13 @@ void chain_controller::clear_expired_transactions()
const auto& dedupe_index = transaction_idx.indices().get<by_expiration>();
while( (!dedupe_index.empty()) && (head_block_time() > dedupe_index.rbegin()->trx.expiration) )
transaction_idx.remove(*dedupe_index.rbegin());

//Look for expired transactions in the pending generated list, and remove them.
//Transactions must have expired by at least two forking windows in order to be removed.
auto& generated_transaction_idx = _db.get_mutable_index<generated_transaction_multi_index>();
const auto& generated_index = generated_transaction_idx.indices().get<generated_transaction_object::by_expiration>();
while( (!generated_index.empty()) && (head_block_time() > generated_index.rbegin()->trx.expiration) )
generated_transaction_idx.remove(*generated_index.rbegin());
} FC_CAPTURE_AND_RETHROW() }

using boost::container::flat_set;
Expand Down
2 changes: 1 addition & 1 deletion libraries/chain/include/eos/chain/block.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ namespace eos { namespace chain {
};

struct thread {
vector<generated_transaction_id_type> generated_input;
vector<ProcessedGeneratedTransaction> generated_input;
vector<ProcessedTransaction> user_input;

digest_type merkle_digest() const;
Expand Down
17 changes: 12 additions & 5 deletions libraries/chain/include/eos/chain/block_schedule.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,10 +26,10 @@
#include <eos/chain/transaction.hpp>

namespace eos { namespace chain {
using pending_transaction = static_variant<SignedTransaction const *, GeneratedTransaction const *>;

struct thread_schedule {
vector<generated_transaction_id_type> generated_input;
vector<SignedTransaction const *> user_input;
vector<pending_transaction const *> transactions;
};

using cycle_schedule = vector<thread_schedule>;
Expand All @@ -49,16 +49,23 @@ namespace eos { namespace chain {
* falling back on cycles
* @return the block scheduler
*/
static block_schedule by_threading_conflicts(deque<SignedTransaction> const &transactions, const global_property_object& properties);
static block_schedule by_threading_conflicts(vector<pending_transaction> const &transactions, const global_property_object& properties);

/**
* A greedy scheduler that attempts uses future cycles to resolve scope contention
* @return the block scheduler
*/
static block_schedule by_cycling_conflicts(deque<SignedTransaction> const &transactions, const global_property_object& properties);
static block_schedule by_cycling_conflicts(vector<pending_transaction> const &transactions, const global_property_object& properties);
};

struct scope_extracting_visitor : public fc::visitor<vector<AccountName> const &> {
template <typename T>
vector<AccountName> const & operator()(const T &trx_p) const {
return trx_p->scope;
}
};

} } // eos::chain

FC_REFLECT(eos::chain::thread_schedule, (generated_input)(user_input) )
FC_REFLECT(eos::chain::thread_schedule, (transactions))
FC_REFLECT(eos::chain::block_schedule, (cycles))
1 change: 1 addition & 0 deletions libraries/chain/include/eos/chain/exceptions.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ namespace eos { namespace chain {
FC_DECLARE_DERIVED_EXCEPTION( tx_missing_scope, eos::chain::transaction_exception, 3030008, "missing required scope" )
FC_DECLARE_DERIVED_EXCEPTION( tx_missing_recipient, eos::chain::transaction_exception, 3030009, "missing required recipient" )
FC_DECLARE_DERIVED_EXCEPTION( checktime_exceeded, eos::chain::transaction_exception, 3030010, "allotted processing time was exceeded" )
FC_DECLARE_DERIVED_EXCEPTION( tx_scheduling_exception, eos::chain::transaction_exception, 3030011, "transaction failed during sheduling" )

FC_DECLARE_DERIVED_EXCEPTION( invalid_pts_address, eos::chain::utility_exception, 3060001, "invalid pts address" )
FC_DECLARE_DERIVED_EXCEPTION( insufficient_feeds, eos::chain::chain_exception, 37006, "insufficient feeds" )
Expand Down
Loading

0 comments on commit d0828b2

Please sign in to comment.