Using unchecked database as a hold for any blocks that aren't in the ledger. This merges gap_cache and the old unchecked database.

This commit is contained in:
clemahieu 2017-06-20 10:07:32 -05:00
commit 9bfb86cb53
9 changed files with 124 additions and 105 deletions

View file

@ -183,14 +183,14 @@ TEST (bootstrap, simple)
rai::send_block block1 (0, 1, 2, rai::keypair ().prv, 4, 5);
rai::transaction transaction (store.environment, nullptr, true);
auto block2 (store.unchecked_get (transaction, block1.previous ()));
ASSERT_EQ (nullptr, block2);
ASSERT_TRUE (block2.empty ());
store.unchecked_put (transaction, block1.previous (), block1);
auto block3 (store.unchecked_get (transaction, block1.previous ()));
ASSERT_NE (nullptr, block3);
ASSERT_EQ (block1, *block3);
store.unchecked_del (transaction, block1.previous ());
ASSERT_FALSE (block3.empty ());
ASSERT_EQ (block1, *block3 [0]);
store.unchecked_del (transaction, block1.previous (), block1);
auto block4 (store.unchecked_get (transaction, block1.previous ()));
ASSERT_EQ (nullptr, block4);
ASSERT_TRUE (block4.empty ());
}
TEST (checksum, simple)

View file

@ -18,7 +18,8 @@ public:
}
std::unique_ptr <rai::block> retrieve (MDB_txn * transaction_a, rai::block_hash const & hash_a) override
{
return store.unchecked_get (transaction_a, hash_a);
//return store.unchecked_get (transaction_a, hash_a);
return nullptr;
}
rai::sync_result target (MDB_txn * transaction_a, rai::block const & block_a) override
{
@ -243,9 +244,9 @@ TEST (pull_synchronization, dependent_fork)
rai::pull_synchronization sync (node0, nullptr);
ASSERT_EQ (rai::sync_result::fork, sync.synchronize (transaction, send3.hash ()));
// Voting will either discard this block or commit it. If it's discarded we don't want to attempt it again
ASSERT_EQ (nullptr, node0.store.unchecked_get (transaction, send1.hash ()));
ASSERT_FALSE (node0.store.unchecked_get (transaction, send1.hash ()).empty ());
// This block will either succeed, if its predecessor is comitted by voting, or will be a gap and will be discarded
ASSERT_NE (nullptr, node0.store.unchecked_get (transaction, send3.hash ()));
ASSERT_FALSE (node0.store.unchecked_get (transaction, send3.hash ()).empty ());
ASSERT_TRUE (node0.active.active (send1));
}
@ -268,7 +269,7 @@ TEST (pull_synchronization, clear_blocks)
{
{
rai::transaction transaction (node1.store.environment, nullptr, false);
done = node1.store.unchecked_get (transaction, send0.hash ()) == nullptr;
done = node1.store.unchecked_get (transaction, send0.hash ()).empty ();
}
++iterations;
ASSERT_GT (200, iterations);

View file

@ -6,8 +6,9 @@ TEST (gap_cache, add_new)
rai::system system (24000, 1);
rai::gap_cache cache (*system.nodes [0]);
rai::send_block block1 (0, 1, 2, rai::keypair ().prv, 4, 5);
cache.add (rai::send_block (block1), block1.previous ());
ASSERT_NE (cache.blocks.end (), cache.blocks.find (block1.previous ()));
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, true);
cache.add (transaction, rai::send_block (block1), block1.previous ());
ASSERT_NE (system.nodes [0]->store.unchecked_end (), system.nodes [0]->store.unchecked_begin (transaction, block1.previous ()));
}
TEST (gap_cache, add_existing)
@ -15,16 +16,16 @@ TEST (gap_cache, add_existing)
rai::system system (24000, 1);
rai::gap_cache cache (*system.nodes [0]);
rai::send_block block1 (0, 1, 2, rai::keypair ().prv, 4, 5);
auto previous (block1.previous ());
cache.add (block1, previous);
auto existing1 (cache.blocks.find (previous));
ASSERT_NE (cache.blocks.end (), existing1);
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, true);
cache.add (transaction, block1, block1.previous ());
auto existing1 (cache.blocks.get <1> ().find (block1.hash ()));
ASSERT_NE (cache.blocks.get <1> ().end (), existing1);
auto arrival (existing1->arrival);
while (arrival == std::chrono::system_clock::now ());
cache.add (block1, previous);
cache.add (transaction, block1, block1.previous ());
ASSERT_EQ (1, cache.blocks.size ());
auto existing2 (cache.blocks.find (previous));
ASSERT_NE (cache.blocks.end (), existing2);
auto existing2 (cache.blocks.get <1> ().find (block1.hash ()));
ASSERT_NE (cache.blocks.get <1> ().end (), existing2);
ASSERT_GT (existing2->arrival, arrival);
}
@ -33,18 +34,17 @@ TEST (gap_cache, comparison)
rai::system system (24000, 1);
rai::gap_cache cache (*system.nodes [0]);
rai::send_block block1 (1, 0, 2, rai::keypair ().prv, 4, 5);
auto previous1 (block1.previous ());
cache.add (rai::send_block (block1), previous1);
auto existing1 (cache.blocks.find (previous1));
ASSERT_NE (cache.blocks.end (), existing1);
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, true);
cache.add (transaction, block1, block1.previous ());
auto existing1 (cache.blocks.get <1> ().find (block1.hash ()));
ASSERT_NE (cache.blocks.get <1> ().end (), existing1);
auto arrival (existing1->arrival);
while (std::chrono::system_clock::now () == arrival);
rai::send_block block3 (0, 42, 1, rai::keypair ().prv, 3, 4);
auto previous2 (block3.previous ());
cache.add (rai::send_block (block3), previous2);
cache.add (transaction, block3, block3.previous ());
ASSERT_EQ (2, cache.blocks.size ());
auto existing2 (cache.blocks.find (previous2));
ASSERT_NE (cache.blocks.end (), existing2);
auto existing2 (cache.blocks.get <1> ().find (block3.hash ()));
ASSERT_NE (cache.blocks.get <1> ().end (), existing2);
ASSERT_GT (existing2->arrival, arrival);
ASSERT_EQ (arrival, cache.blocks.get <1> ().begin ()->arrival);
}

View file

@ -129,15 +129,16 @@ attempt (attempt_a)
std::unique_ptr <rai::block> rai::pull_synchronization::retrieve (MDB_txn * transaction_a, rai::block_hash const & hash_a)
{
return node.store.unchecked_get (transaction_a, hash_a);
//return node.store.unchecked_get (transaction_a, hash_a);
return nullptr;
}
rai::sync_result rai::pull_synchronization::target (MDB_txn * transaction_a, rai::block const & block_a)
{
auto result (rai::sync_result::error);
node.process_receive_many (transaction_a, block_a, [this, transaction_a, &result] (rai::process_return result_a, rai::block const & block_a)
/*node.process_receive_many (block_a, [this, &result] (rai::process_return result_a, rai::block const & block_a)
{
this->node.store.unchecked_del (transaction_a, block_a.hash ());
this->node.store.unchecked_del (transaction_a, block_a.hash (), block_a);
switch (result_a.code)
{
case rai::process_result::progress:
@ -176,7 +177,7 @@ rai::sync_result rai::pull_synchronization::target (MDB_txn * transaction_a, rai
BOOST_LOG (log) << boost::str (boost::format ("Error inserting block in bootstrap: %1%") % block_a.hash ().to_string ());
break;
}
});
});*/
return result;
}
@ -739,12 +740,34 @@ void rai::bootstrap_pull_cache::flush (size_t minimum_a)
{
while (!blocks_l.empty ())
{
rai::transaction transaction (attempt.node->store.environment, nullptr, true);
auto count (0);
while (!blocks_l.empty () && count < rai::blocks_per_transaction)
{
auto & front (blocks_l.front ());
attempt.node->store.unchecked_put (transaction, front->hash(), *front);
attempt.node->process_receive_many (*front, [this] (MDB_txn * transaction_a, rai::process_return result_a, rai::block const & block_a)
{
switch (result_a.code)
{
case rai::process_result::progress:
case rai::process_result::old:
break;
case rai::process_result::fork:
{
auto node_l (attempt.node);
auto block (node_l->ledger.forked_block (transaction_a, block_a));
node_l->active.start (transaction_a, *block, [node_l] (rai::block & block_a)
{
node_l->process_confirmed (block_a);
});
attempt.node->network.broadcast_confirm_req (block_a);
attempt.node->network.broadcast_confirm_req (*block);
BOOST_LOG (attempt.node->log) << boost::str (boost::format ("Fork received in bootstrap between: %1% and %2% root %3%") % block_a.hash ().to_string () % block->hash ().to_string () % block_a.root ().to_string ());
break;
}
default:
break;
}
});
blocks_l.pop_front ();
++count;
}
@ -910,11 +933,11 @@ void rai::bootstrap_attempt::completed_pull (std::shared_ptr <rai::bootstrap_cli
}
client_a->pull_client.pull = rai::pull_info ();
}
cache.flush (cache.block_count);
if (repool)
{
pool_connection (client_a);
}
cache.flush (cache.block_count);
}
void rai::bootstrap_attempt::completed_pulls (std::shared_ptr <rai::bootstrap_client> client_a)

View file

@ -1243,50 +1243,35 @@ node (node_a)
{
}
void rai::gap_cache::add (rai::block const & block_a, rai::block_hash needed_a)
void rai::gap_cache::add (MDB_txn * transaction_a, rai::block const & block_a, rai::block_hash const & hash_a)
{
auto hash (block_a.hash ());
std::lock_guard <std::mutex> lock (mutex);
auto existing (blocks.get <2>().find (hash));
if (existing != blocks.get <2> ().end ())
auto existing (blocks.get <1>().find (hash));
if (existing != blocks.get <1> ().end ())
{
blocks.get <2> ().modify (existing, [&block_a] (rai::gap_information & info)
blocks.get <1> ().modify (existing, [&block_a] (rai::gap_information & info)
{
info.arrival = std::chrono::system_clock::now ();
});
}
else
{
blocks.insert ({std::chrono::system_clock::now (), needed_a, hash, std::unique_ptr <rai::votes> (new rai::votes (block_a)), block_a.clone ()});
node.store.unchecked_put (transaction_a, hash_a, block_a);
blocks.insert ({std::chrono::system_clock::now (), hash, std::unique_ptr <rai::votes> (new rai::votes (block_a))});
if (blocks.size () > max)
{
blocks.get <1> ().erase (blocks.get <1> ().begin ());
blocks.get <0> ().erase (blocks.get <0> ().begin ());
}
}
}
std::vector <std::unique_ptr <rai::block>> rai::gap_cache::get (rai::block_hash const & hash_a)
{
purge_old ();
std::lock_guard <std::mutex> lock (mutex);
std::vector <std::unique_ptr <rai::block>> result;
for (auto i (blocks.find (hash_a)), n (blocks.end ()); i != n && i->required == hash_a; ++i)
{
blocks.modify (i, [&result] (rai::gap_information & info)
{
result.push_back (std::move (info.block));
});
}
blocks.erase (hash_a);
return result;
}
void rai::gap_cache::vote (rai::vote const & vote_a)
{
std::lock_guard <std::mutex> lock (mutex);
auto hash (vote_a.block->hash ());
auto existing (blocks.get <2> ().find (hash));
if (existing != blocks.get <2> ().end ())
auto existing (blocks.get <1> ().find (hash));
if (existing != blocks.get <1> ().end ())
{
existing->votes->vote (vote_a);
rai::transaction transaction (node.store.environment, nullptr, false);
@ -1366,16 +1351,15 @@ void rai::node::process_receive_republish (std::unique_ptr <rai::block> incoming
{
std::vector <std::tuple <rai::process_return, std::unique_ptr <rai::block>>> completed;
{
rai::transaction transaction (store.environment, nullptr, true);
assert (incoming != nullptr);
process_receive_many (transaction, *incoming, [this, &completed, &transaction] (rai::process_return result_a, rai::block const & block_a)
process_receive_many (*incoming, [this, &completed] (MDB_txn * transaction_a, rai::process_return result_a, rai::block const & block_a)
{
switch (result_a.code)
{
case rai::process_result::progress:
{
auto node_l (shared_from_this ());
active.start (transaction, block_a, [node_l] (rai::block & block_a)
active.start (transaction_a, block_a, [node_l] (rai::block & block_a)
{
node_l->process_confirmed (block_a);
});
@ -1395,26 +1379,41 @@ void rai::node::process_receive_republish (std::unique_ptr <rai::block> incoming
}
}
void rai::node::process_receive_many (rai::block const & block_a, std::function <void (rai::process_return, rai::block const &)> completed_a)
{
rai::transaction transaction (store.environment, nullptr, true);
process_receive_many (transaction, block_a, completed_a);
}
void rai::node::process_receive_many (MDB_txn * transaction_a, rai::block const & block_a, std::function <void (rai::process_return, rai::block const &)> completed_a)
void rai::node::process_receive_many (rai::block const & block_a, std::function <void (MDB_txn *, rai::process_return, rai::block const &)> completed_a)
{
std::vector <std::unique_ptr <rai::block>> blocks;
blocks.push_back (block_a.clone ());
while (!blocks.empty ())
{
auto block (std::move (blocks.back ()));
blocks.pop_back ();
auto hash (block->hash ());
auto process_result (process_receive_one (transaction_a, *block));
completed_a (process_result, *block);
auto cached (gap_cache.get (hash));
blocks.resize (blocks.size () + cached.size ());
std::move (cached.begin (), cached.end (), blocks.end () - cached.size ());
rai::transaction transaction (store.environment, nullptr, true);
auto count (0);
while (!blocks.empty () && count < rai::blocks_per_transaction)
{
auto block (std::move (blocks.back ()));
blocks.pop_back ();
auto hash (block->hash ());
auto process_result (process_receive_one (transaction, *block));
completed_a (transaction, process_result, *block);
switch (process_result.code)
{
case rai::process_result::progress:
case rai::process_result::old:
{
auto cached (store.unchecked_get (transaction, hash));
for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i)
{
store.unchecked_del (transaction, hash, **i);
blocks.push_back (std::move (*i));
}
std::lock_guard <std::mutex> lock (gap_cache.mutex);
gap_cache.blocks.get <1> ().erase (hash);
break;
}
default:
break;
}
++count;
}
}
}
@ -1440,8 +1439,7 @@ rai::process_return rai::node::process_receive_one (MDB_txn * transaction_a, rai
{
BOOST_LOG (log) << boost::str (boost::format ("Gap previous for: %1%") % block_a.hash ().to_string ());
}
auto previous (block_a.previous ());
gap_cache.add (block_a, previous);
gap_cache.add (transaction_a, block_a, block_a.previous ());
break;
}
case rai::process_result::gap_source:
@ -1450,8 +1448,7 @@ rai::process_return rai::node::process_receive_one (MDB_txn * transaction_a, rai
{
BOOST_LOG (log) << boost::str (boost::format ("Gap source for: %1%") % block_a.hash ().to_string ());
}
auto source (block_a.source ());
gap_cache.add (block_a, source);
gap_cache.add (transaction_a, block_a, block_a.source ());
break;
}
case rai::process_result::old:
@ -2118,6 +2115,7 @@ public:
void rai::node::process_unchecked (std::shared_ptr <rai::bootstrap_attempt> attempt_a)
{
/*
auto block_count (0);
assert (attempt_a == nullptr || bootstrap_initiator.in_progress ());
static std::atomic_flag unchecked_in_progress = ATOMIC_FLAG_INIT;
@ -2153,6 +2151,7 @@ void rai::node::process_unchecked (std::shared_ptr <rai::bootstrap_attempt> atte
unchecked_in_progress.clear ();
wallets.search_pending_all ();
}
*/
}
void rai::node::process_confirmed (rai::block const & confirmed_a)

View file

@ -123,17 +123,14 @@ class gap_information
{
public:
std::chrono::system_clock::time_point arrival;
rai::block_hash required;
rai::block_hash hash;
std::unique_ptr <rai::votes> votes;
std::unique_ptr <rai::block> block;
};
class gap_cache
{
public:
gap_cache (rai::node &);
void add (rai::block const &, rai::block_hash);
std::vector <std::unique_ptr <rai::block>> get (rai::block_hash const &);
void add (MDB_txn *, rai::block const &, rai::block_hash const &);
void vote (rai::vote const &);
rai::uint128_t bootstrap_threshold (MDB_txn *);
void purge_old ();
@ -142,7 +139,6 @@ public:
rai::gap_information,
boost::multi_index::indexed_by
<
boost::multi_index::hashed_non_unique <boost::multi_index::member <gap_information, rai::block_hash, &gap_information::required>>,
boost::multi_index::ordered_non_unique <boost::multi_index::member <gap_information, std::chrono::system_clock::time_point, &gap_information::arrival>>,
boost::multi_index::hashed_unique <boost::multi_index::member <gap_information, rai::block_hash, &gap_information::hash>>
>
@ -421,8 +417,7 @@ public:
void process_confirmed (rai::block const &);
void process_message (rai::message &, rai::endpoint const &);
void process_receive_republish (std::unique_ptr <rai::block>);
void process_receive_many (rai::block const &, std::function <void (rai::process_return, rai::block const &)> = [] (rai::process_return, rai::block const &) {});
void process_receive_many (MDB_txn *, rai::block const &, std::function <void (rai::process_return, rai::block const &)> = [] (rai::process_return, rai::block const &) {});
void process_receive_many (rai::block const &, std::function <void (MDB_txn *, rai::process_return, rai::block const &)> = [] (MDB_txn *, rai::process_return, rai::block const &) {});
rai::process_return process_receive_one (MDB_txn *, rai::block const &);
rai::process_return process (rai::block const &);
void keepalive_preconfigured (std::vector <std::string> const &);

View file

@ -631,7 +631,7 @@ std::string rai_qt::status::text ()
}
result += ", Block: ";
if (unchecked != 0)
if (unchecked != 0 && wallet.wallet_m->node.bootstrap_initiator.in_progress ())
{
count_string += " (" + std::to_string (unchecked) + ")";
}
@ -1355,7 +1355,7 @@ void rai_qt::advanced_actions::refresh_count ()
auto size (wallet.wallet_m->node.store.block_count (transaction));
auto unchecked (wallet.wallet_m->node.store.unchecked_count (transaction));
auto count_string (std::to_string (size.sum ()));
if (unchecked != 0)
if (unchecked != 0 && wallet.wallet_m->node.bootstrap_initiator.in_progress ())
{
count_string += " (" + std::to_string (unchecked) + ")";
}

View file

@ -1542,7 +1542,7 @@ checksum (0)
error_a |= mdb_dbi_open (transaction, "change", MDB_CREATE, &change_blocks) != 0;
error_a |= mdb_dbi_open (transaction, "pending", MDB_CREATE, &pending) != 0;
error_a |= mdb_dbi_open (transaction, "representation", MDB_CREATE, &representation) != 0;
error_a |= mdb_dbi_open (transaction, "unchecked", MDB_CREATE, &unchecked) != 0;
error_a |= mdb_dbi_open (transaction, "unchecked", MDB_CREATE | MDB_DUPSORT, &unchecked) != 0;
error_a |= mdb_dbi_open (transaction, "unsynced", MDB_CREATE, &unsynced) != 0;
error_a |= mdb_dbi_open (transaction, "checksum", MDB_CREATE, &checksum) != 0;
error_a |= mdb_dbi_open (transaction, "sequence", MDB_CREATE, &sequence) != 0;
@ -2326,24 +2326,25 @@ void rai::block_store::unchecked_put (MDB_txn * transaction_a, rai::block_hash c
assert (status == 0);
}
std::unique_ptr <rai::block> rai::block_store::unchecked_get (MDB_txn * transaction_a, rai::block_hash const & hash_a)
std::vector <std::unique_ptr <rai::block>> rai::block_store::unchecked_get (MDB_txn * transaction_a, rai::block_hash const & hash_a)
{
MDB_val value;
auto status (mdb_get (transaction_a, unchecked, hash_a.val (), &value));
assert (status == 0 || status == MDB_NOTFOUND);
std::unique_ptr <rai::block> result;
if (status == 0)
{
rai::bufferstream stream (reinterpret_cast <uint8_t const *> (value.mv_data), value.mv_size);
result = rai::deserialize_block (stream);
assert (result != nullptr);
}
return result;
std::vector <std::unique_ptr <rai::block>> result;
for (auto i (unchecked_begin (transaction_a, hash_a)), n (unchecked_end ()); i != n && rai::block_hash (i->first) == hash_a; ++i)
{
rai::bufferstream stream (reinterpret_cast <uint8_t const *> (i->second.mv_data), i->second.mv_size);
result.push_back (rai::deserialize_block (stream));
}
return result;
}
void rai::block_store::unchecked_del (MDB_txn * transaction_a, rai::block_hash const & hash_a)
void rai::block_store::unchecked_del (MDB_txn * transaction_a, rai::block_hash const & hash_a, rai::block const & block_a)
{
auto status (mdb_del (transaction_a, unchecked, hash_a.val (), nullptr));
std::vector <uint8_t> vector;
{
rai::vectorstream stream (vector);
rai::serialize_block (stream, block_a);
}
auto status (mdb_del (transaction_a, unchecked, hash_a.val (), rai::mdb_val (vector.size (), vector.data ())));
assert (status == 0 || status == MDB_NOTFOUND);
}

View file

@ -365,8 +365,8 @@ public:
void unchecked_clear (MDB_txn *);
void unchecked_put (MDB_txn *, rai::block_hash const &, rai::block const &);
std::unique_ptr <rai::block> unchecked_get (MDB_txn *, rai::block_hash const &);
void unchecked_del (MDB_txn *, rai::block_hash const &);
std::vector <std::unique_ptr <rai::block>> unchecked_get (MDB_txn *, rai::block_hash const &);
void unchecked_del (MDB_txn *, rai::block_hash const &, rai::block const &);
rai::store_iterator unchecked_begin (MDB_txn *);
rai::store_iterator unchecked_begin (MDB_txn *, rai::block_hash const &);
rai::store_iterator unchecked_end ();