Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tamper detection/values #224

Open
wants to merge 11 commits into
base: trunk
Choose a base branch
from
3 changes: 1 addition & 2 deletions CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -74,5 +74,4 @@ endif()

add_subdirectory(src)
add_subdirectory(tests)
add_subdirectory(tools/bench)
add_subdirectory(tools/shard-seeder)
add_subdirectory(tools)
8 changes: 5 additions & 3 deletions src/uhs/atomizer/atomizer/atomizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -227,8 +227,8 @@ namespace cbdc::atomizer {
auto err_set = std::unordered_set<hash_t, hashing::null>{};
for(size_t offset = 0; offset <= cache_check_range; offset++) {
for(const auto& inp : tx.m_inputs) {
if(m_spent[offset].find(inp) != m_spent[offset].end()) {
err_set.insert(inp);
if(m_spent[offset].find(inp.m_id) != m_spent[offset].end()) {
err_set.insert(inp.m_id);
}
}
}
Expand All @@ -246,6 +246,8 @@ namespace cbdc::atomizer {
// None of the inputs have previously been spent during block heights
// we used attestations from, so spend all the TX inputs in the current
// block height (offset 0).
m_spent[0].insert(tx.m_inputs.begin(), tx.m_inputs.end());
for(const auto& inp : tx.m_inputs) {
m_spent[0].insert(inp.m_id);
}
}
}
2 changes: 1 addition & 1 deletion src/uhs/atomizer/sentinel/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ namespace cbdc::sentinel {
if(inputs_sent.find(j) != inputs_sent.end()) {
continue;
}
if(!config::hash_in_shard_range(range, ctx.m_inputs[i])) {
if(!config::hash_in_shard_range(range, ctx.m_inputs[i].m_id)) {
continue;
}
inputs_sent.insert(j);
Expand Down
54 changes: 54 additions & 0 deletions src/uhs/atomizer/shard/controller.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ namespace cbdc::shard {
t.join();
}
}

if(m_audit_thread.joinable()) {
m_audit_thread.join();
}
}

auto controller::init() -> bool {
Expand All @@ -49,6 +53,13 @@ namespace cbdc::shard {
return false;
}

m_audit_log.open(m_opts.m_shard_audit_logs[m_shard_id],
std::ios::app | std::ios::out);
if(!m_audit_log.good()) {
m_logger->error("Failed to open audit log");
return false;
}

if(!m_archiver_client.init()) {
m_logger->warn("Failed to connect to archiver");
}
Expand Down Expand Up @@ -133,12 +144,17 @@ namespace cbdc::shard {
break;
}

if(blk.m_height <= m_shard.best_block_height()) {
break;
}

// Attempt to catch up to the latest block
for(uint64_t i = m_shard.best_block_height() + 1; i < blk.m_height;
i++) {
const auto past_blk = m_archiver_client.get_block(i);
if(past_blk) {
m_shard.digest_block(past_blk.value());
audit();
} else {
m_logger->info("Waiting for archiver sync");
const auto wait_time = std::chrono::milliseconds(10);
Expand All @@ -148,6 +164,7 @@ namespace cbdc::shard {
}
}
}
audit();

m_logger->info("Digested block", blk.m_height);
return std::nullopt;
Expand Down Expand Up @@ -206,4 +223,41 @@ namespace cbdc::shard {
std::visit(res_handler, res);
}
}

void controller::audit() {
auto height = m_shard.best_block_height();
if(m_opts.m_shard_audit_interval > 0
&& height % m_opts.m_shard_audit_interval != 0) {
return;
}

auto status = m_audit_fut.wait_for(std::chrono::seconds(0));
if(status != std::future_status::ready && m_audit_thread.joinable()) {
m_logger->warn(
"Previous audit not finished, skipping audit for h:",
height);
return;
}

auto snp = m_shard.get_snapshot();
if(m_audit_thread.joinable()) {
m_audit_thread.join(); // blocking here
}

m_audit_finished = decltype(m_audit_finished)();
m_audit_fut = m_audit_finished.get_future();

m_audit_thread = std::thread([this, s = std::move(snp), height]() {
auto maybe_total = m_shard.audit(s);
if(!maybe_total.has_value()) {
m_logger->fatal("Error running audit at height", height);
}
m_audit_log << height << " " << maybe_total.value() << std::endl;
m_logger->info("Audit completed for",
height,
maybe_total.value(),
"coins total");
m_audit_finished.set_value();
});
}
}
7 changes: 7 additions & 0 deletions src/uhs/atomizer/shard/controller.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
#include "util/common/config.hpp"
#include "util/network/connection_manager.hpp"

#include <future>
#include <memory>
#include <secp256k1.h>

Expand Down Expand Up @@ -60,11 +61,17 @@ namespace cbdc::shard {
blocking_queue<network::message_t> m_request_queue;
std::vector<std::thread> m_handler_threads;

std::ofstream m_audit_log;
std::thread m_audit_thread;
std::promise<void> m_audit_finished;
std::future<void> m_audit_fut{m_audit_finished.get_future()};

auto server_handler(cbdc::network::message_t&& pkt)
-> std::optional<cbdc::buffer>;
auto atomizer_handler(cbdc::network::message_t&& pkt)
-> std::optional<cbdc::buffer>;
void request_consumer();
void audit();
};
}

Expand Down
73 changes: 56 additions & 17 deletions src/uhs/atomizer/shard/shard.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

#include "shard.hpp"

#include "uhs/transaction/messages.hpp"

#include <utility>

namespace cbdc::shard {
Expand Down Expand Up @@ -49,7 +51,8 @@ namespace cbdc::shard {
sizeof(this->m_best_block_height));
}

update_snapshot();
auto snp = get_snapshot();
update_snapshot(std::move(snp));

return std::nullopt;
}
Expand All @@ -65,20 +68,20 @@ namespace cbdc::shard {
for(const auto& tx : blk.m_transactions) {
// Add new outputs
for(const auto& out : tx.m_uhs_outputs) {
if(is_output_on_shard(out)) {
std::array<char, sizeof(out)> out_arr{};
std::memcpy(out_arr.data(), out.data(), out.size());
leveldb::Slice OutPointKey(out_arr.data(), out.size());
if(is_output_on_shard(out.m_id)) {
auto out_buf = cbdc::make_buffer(out);
leveldb::Slice OutPointKey(out_buf.c_str(),
out_buf.size());
batch.Put(OutPointKey, leveldb::Slice());
}
}

// Delete spent inputs
for(const auto& inp : tx.m_inputs) {
if(is_output_on_shard(inp)) {
std::array<char, sizeof(inp)> inp_arr{};
std::memcpy(inp_arr.data(), inp.data(), inp.size());
leveldb::Slice OutPointKey(inp_arr.data(), inp.size());
if(is_output_on_shard(inp.m_id)) {
auto inp_buf = cbdc::make_buffer(inp);
leveldb::Slice OutPointKey(inp_buf.c_str(),
inp_buf.size());
batch.Delete(OutPointKey);
}
}
Expand All @@ -97,7 +100,8 @@ namespace cbdc::shard {
// Commit the changes atomically
this->m_db->Write(this->m_write_options, &batch);

update_snapshot();
auto snp = get_snapshot();
update_snapshot(std::move(snp));

return true;
}
Expand Down Expand Up @@ -135,18 +139,17 @@ namespace cbdc::shard {
for(uint64_t i = 0; i < tx.m_inputs.size(); i++) {
const auto& inp = tx.m_inputs[i];
// Only check for inputs/outputs relevant to this shard
if(!is_output_on_shard(inp)) {
if(!is_output_on_shard(inp.m_id)) {
continue;
}

std::array<char, sizeof(inp)> inp_arr{};
std::memcpy(inp_arr.data(), inp.data(), inp.size());
leveldb::Slice OutPointKey(inp_arr.data(), inp.size());
auto inp_buf = cbdc::make_buffer(inp);
leveldb::Slice OutPointKey(inp_buf.c_str(), inp_buf.size());
std::string op;

const auto& res = m_db->Get(read_options, OutPointKey, &op);
if(res.IsNotFound()) {
dne_inputs.push_back(inp);
dne_inputs.push_back(inp.m_id);
} else {
attestations.insert(i);
}
Expand Down Expand Up @@ -174,13 +177,49 @@ namespace cbdc::shard {
return config::hash_in_shard_range(m_prefix_range, uhs_hash);
}

void shard::update_snapshot() {
void shard::update_snapshot(std::shared_ptr<const leveldb::Snapshot> snp) {
std::unique_lock<std::shared_mutex> l(m_snp_mut);
m_snp_height = m_best_block_height;
m_snp = std::shared_ptr<const leveldb::Snapshot>(
m_snp = std::move(snp);
}

auto shard::audit(const std::shared_ptr<const leveldb::Snapshot>& snp)
-> std::optional<uint64_t> {
auto opts = leveldb::ReadOptions();
opts.snapshot = snp.get();
auto it = std::shared_ptr<leveldb::Iterator>(m_db->NewIterator(opts));
it->SeekToFirst();
// Skip best block height key
it->Next();
uint64_t tot{};
for(; it->Valid(); it->Next()) {
auto key = it->key();
auto buf = cbdc::buffer();
buf.extend(key.size());
std::memcpy(buf.data(), key.data(), key.size());
auto maybe_uhs_element
= cbdc::from_buffer<transaction::uhs_element>(buf);
if(!maybe_uhs_element.has_value()) {
return std::nullopt;
}
auto& uhs_element = maybe_uhs_element.value();
if(transaction::calculate_uhs_id(uhs_element.m_data,
uhs_element.m_value)
!= uhs_element.m_id) {
return std::nullopt;
}
tot += uhs_element.m_value;
}

return tot;
}

auto shard::get_snapshot() -> std::shared_ptr<const leveldb::Snapshot> {
auto snp = std::shared_ptr<const leveldb::Snapshot>(
m_db->GetSnapshot(),
[&](const leveldb::Snapshot* p) {
m_db->ReleaseSnapshot(p);
});
return snp;
}
}
18 changes: 16 additions & 2 deletions src/uhs/atomizer/shard/shard.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,11 +65,25 @@ namespace cbdc::shard {
/// \return the best block height.
[[nodiscard]] auto best_block_height() const -> uint64_t;

/// Returns a LevelDB snapshot of the current state of the shard's
/// database.
/// \return LevelDB snapshot.
auto get_snapshot() -> std::shared_ptr<const leveldb::Snapshot>;

/// Audit the supply of coins in this shard's UHS and check UHS IDs
/// match the nested data and value stored in the UHS.
/// \param snp LevelDB snapshot upon which to calculate the audit.
/// \return total value of all UHS elements in the snapshot or
/// std::nullopt if any of the UHS elements do not match their
/// UHS ID.
auto audit(const std::shared_ptr<const leveldb::Snapshot>& snp)
-> std::optional<uint64_t>;

private:
[[nodiscard]] auto is_output_on_shard(const hash_t& uhs_hash) const
-> bool;

void update_snapshot();
void update_snapshot(std::shared_ptr<const leveldb::Snapshot> snp);

std::unique_ptr<leveldb::DB> m_db;
leveldb::ReadOptions m_read_options;
Expand All @@ -81,7 +95,7 @@ namespace cbdc::shard {
uint64_t m_snp_height{};
std::shared_mutex m_snp_mut;

const std::string m_best_block_height_key = "bestBlockHeight";
const std::string m_best_block_height_key;

std::pair<uint8_t, uint8_t> m_prefix_range;
};
Expand Down
10 changes: 5 additions & 5 deletions src/uhs/atomizer/watchtower/block_cache.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@ namespace cbdc::watchtower {
auto& old_blk = m_blks.front();
for(auto& tx : old_blk.m_transactions) {
for(auto& in : tx.m_inputs) {
m_spent_ids.erase(in);
m_spent_ids.erase(in.m_id);
}
for(auto& out : tx.m_uhs_outputs) {
m_unspent_ids.erase(out);
m_unspent_ids.erase(out.m_id);
}
}
m_blks.pop();
Expand All @@ -32,13 +32,13 @@ namespace cbdc::watchtower {
auto blk_height = m_blks.back().m_height;
for(auto& tx : m_blks.back().m_transactions) {
for(auto& in : tx.m_inputs) {
m_unspent_ids.erase(in);
m_unspent_ids.erase(in.m_id);
m_spent_ids.insert(
{{in, std::make_pair(blk_height, tx.m_id)}});
{{in.m_id, std::make_pair(blk_height, tx.m_id)}});
}
for(auto& out : tx.m_uhs_outputs) {
m_unspent_ids.insert(
{{out, std::make_pair(blk_height, tx.m_id)}});
{{out.m_id, std::make_pair(blk_height, tx.m_id)}});
}
}
m_best_blk_height = std::max(m_best_blk_height, blk_height);
Expand Down
4 changes: 2 additions & 2 deletions src/uhs/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,10 @@ add_library(client atomizer_client.cpp
client.cpp
twophase_client.cpp)

target_link_libraries(client bech32)

add_executable(client-cli client-cli.cpp)
target_link_libraries(client-cli client
atomizer
bech32
atomizer
watchtower
sentinel
Expand Down
14 changes: 7 additions & 7 deletions src/uhs/client/atomizer_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,16 +42,16 @@ namespace cbdc {
auto ctx = transaction::compact_tx(tx);
auto [it, success] = tus.insert({ctx.m_id, {}});
assert(success);
it->second.insert(it->second.end(),
ctx.m_inputs.begin(),
ctx.m_inputs.end());
it->second.insert(it->second.end(),
ctx.m_uhs_outputs.begin(),
ctx.m_uhs_outputs.end());
for(auto& inp : ctx.m_inputs) {
it->second.push_back(inp.m_id);
}
for(auto& out : ctx.m_uhs_outputs) {
it->second.push_back(out.m_id);
}
}

for(const auto& [tx_id, in] : pending_inputs()) {
tus.insert({tx_id, {in.hash()}});
tus.insert({tx_id, {in.to_uhs_element().m_id}});
}

cbdc::watchtower::status_update_request req{tus};
Expand Down
Loading