Merge branch 'pulls/841'
This commit is contained in:
commit
bb5b20f78e
7 changed files with 115 additions and 319 deletions
|
@ -251,7 +251,6 @@ pending (0),
|
|||
blocks_info (0),
|
||||
representation (0),
|
||||
unchecked (0),
|
||||
unsynced (0),
|
||||
checksum (0)
|
||||
{
|
||||
if (!error_a)
|
||||
|
@ -268,7 +267,6 @@ checksum (0)
|
|||
error_a |= mdb_dbi_open (transaction, "blocks_info", MDB_CREATE, &blocks_info) != 0;
|
||||
error_a |= mdb_dbi_open (transaction, "representation", MDB_CREATE, &representation) != 0;
|
||||
error_a |= mdb_dbi_open (transaction, "unchecked", MDB_CREATE | MDB_DUPSORT, &unchecked) != 0;
|
||||
error_a |= mdb_dbi_open (transaction, "unsynced", MDB_CREATE, &unsynced) != 0;
|
||||
error_a |= mdb_dbi_open (transaction, "checksum", MDB_CREATE, &checksum) != 0;
|
||||
error_a |= mdb_dbi_open (transaction, "vote", MDB_CREATE, &vote) != 0;
|
||||
error_a |= mdb_dbi_open (transaction, "meta", MDB_CREATE, &meta) != 0;
|
||||
|
@ -330,6 +328,8 @@ void rai::block_store::do_upgrades (MDB_txn * transaction_a)
|
|||
case 9:
|
||||
upgrade_v9_to_v10 (transaction_a);
|
||||
case 10:
|
||||
upgrade_v10_to_v11 (transaction_a);
|
||||
case 11:
|
||||
break;
|
||||
default:
|
||||
assert (false);
|
||||
|
@ -524,6 +524,13 @@ void rai::block_store::upgrade_v9_to_v10 (MDB_txn * transaction_a)
|
|||
//std::cerr << boost::str (boost::format ("Database upgrade is completed\n"));
|
||||
}
|
||||
|
||||
void rai::block_store::upgrade_v10_to_v11 (MDB_txn * transaction_a)
|
||||
{
|
||||
MDB_dbi unsynced;
|
||||
mdb_dbi_open (transaction_a, "unsynced", MDB_CREATE | MDB_DUPSORT, &unsynced);
|
||||
mdb_drop (transaction_a, unsynced, 1);
|
||||
}
|
||||
|
||||
void rai::block_store::clear (MDB_dbi db_a)
|
||||
{
|
||||
rai::transaction transaction (environment, nullptr, true);
|
||||
|
@ -1117,39 +1124,6 @@ size_t rai::block_store::unchecked_count (MDB_txn * transaction_a)
|
|||
return result;
|
||||
}
|
||||
|
||||
void rai::block_store::unsynced_put (MDB_txn * transaction_a, rai::block_hash const & hash_a)
|
||||
{
|
||||
auto status (mdb_put (transaction_a, unsynced, rai::mdb_val (hash_a), rai::mdb_val (0, nullptr), 0));
|
||||
assert (status == 0);
|
||||
}
|
||||
|
||||
void rai::block_store::unsynced_del (MDB_txn * transaction_a, rai::block_hash const & hash_a)
|
||||
{
|
||||
auto status (mdb_del (transaction_a, unsynced, rai::mdb_val (hash_a), nullptr));
|
||||
assert (status == 0);
|
||||
}
|
||||
|
||||
bool rai::block_store::unsynced_exists (MDB_txn * transaction_a, rai::block_hash const & hash_a)
|
||||
{
|
||||
auto iterator (unsynced_begin (transaction_a, hash_a));
|
||||
return iterator != rai::store_iterator (nullptr) && rai::block_hash (iterator->first.uint256 ()) == hash_a;
|
||||
}
|
||||
|
||||
rai::store_iterator rai::block_store::unsynced_begin (MDB_txn * transaction_a)
|
||||
{
|
||||
return rai::store_iterator (transaction_a, unsynced);
|
||||
}
|
||||
|
||||
rai::store_iterator rai::block_store::unsynced_begin (MDB_txn * transaction_a, rai::uint256_union const & val_a)
|
||||
{
|
||||
return rai::store_iterator (transaction_a, unsynced, rai::mdb_val (val_a));
|
||||
}
|
||||
|
||||
rai::store_iterator rai::block_store::unsynced_end ()
|
||||
{
|
||||
return rai::store_iterator (nullptr);
|
||||
}
|
||||
|
||||
void rai::block_store::checksum_put (MDB_txn * transaction_a, uint64_t prefix, uint8_t mask, rai::uint256_union const & hash_a)
|
||||
{
|
||||
assert ((prefix & 0xff) == 0);
|
||||
|
|
|
@ -108,13 +108,6 @@ public:
|
|||
size_t unchecked_count (MDB_txn *);
|
||||
std::unordered_multimap<rai::block_hash, std::shared_ptr<rai::block>> unchecked_cache;
|
||||
|
||||
void unsynced_put (MDB_txn *, rai::block_hash const &);
|
||||
void unsynced_del (MDB_txn *, rai::block_hash const &);
|
||||
bool unsynced_exists (MDB_txn *, rai::block_hash const &);
|
||||
rai::store_iterator unsynced_begin (MDB_txn *, rai::block_hash const &);
|
||||
rai::store_iterator unsynced_begin (MDB_txn *);
|
||||
rai::store_iterator unsynced_end ();
|
||||
|
||||
void checksum_put (MDB_txn *, uint64_t, uint8_t, rai::checksum const &);
|
||||
bool checksum_get (MDB_txn *, uint64_t, uint8_t, rai::checksum &);
|
||||
void checksum_del (MDB_txn *, uint64_t, uint8_t);
|
||||
|
@ -145,6 +138,7 @@ public:
|
|||
void upgrade_v7_to_v8 (MDB_txn *);
|
||||
void upgrade_v8_to_v9 (MDB_txn *);
|
||||
void upgrade_v9_to_v10 (MDB_txn *);
|
||||
void upgrade_v10_to_v11 (MDB_txn *);
|
||||
|
||||
void clear (MDB_dbi);
|
||||
|
||||
|
@ -216,12 +210,6 @@ public:
|
|||
*/
|
||||
MDB_dbi unchecked;
|
||||
|
||||
/**
|
||||
* Blocks that haven't been broadcast.
|
||||
* rai::block_hash -> (no value)
|
||||
*/
|
||||
MDB_dbi unsynced;
|
||||
|
||||
/**
|
||||
* Mapping of region to checksum.
|
||||
* (uint56_t, uint8_t) -> rai::block_hash
|
||||
|
|
|
@ -480,45 +480,6 @@ TEST (block_store, latest_exists)
|
|||
ASSERT_FALSE (store.account_exists (transaction, one));
|
||||
}
|
||||
|
||||
TEST (block_store, unsynced)
|
||||
{
|
||||
bool init (false);
|
||||
rai::block_store store (init, rai::unique_path ());
|
||||
ASSERT_TRUE (!init);
|
||||
rai::transaction transaction (store.environment, nullptr, true);
|
||||
ASSERT_EQ (store.unsynced_end (), store.unsynced_begin (transaction));
|
||||
rai::block_hash hash1 (0);
|
||||
ASSERT_FALSE (store.unsynced_exists (transaction, hash1));
|
||||
store.unsynced_put (transaction, hash1);
|
||||
ASSERT_TRUE (store.unsynced_exists (transaction, hash1));
|
||||
ASSERT_NE (store.unsynced_end (), store.unsynced_begin (transaction));
|
||||
ASSERT_EQ (hash1, rai::uint256_union (store.unsynced_begin (transaction)->first.uint256 ()));
|
||||
store.unsynced_del (transaction, hash1);
|
||||
ASSERT_FALSE (store.unsynced_exists (transaction, hash1));
|
||||
ASSERT_EQ (store.unsynced_end (), store.unsynced_begin (transaction));
|
||||
}
|
||||
|
||||
TEST (block_store, unsynced_iteration)
|
||||
{
|
||||
bool init (false);
|
||||
rai::block_store store (init, rai::unique_path ());
|
||||
ASSERT_TRUE (!init);
|
||||
rai::transaction transaction (store.environment, nullptr, true);
|
||||
ASSERT_EQ (store.unsynced_end (), store.unsynced_begin (transaction));
|
||||
rai::block_hash hash1 (1);
|
||||
store.unsynced_put (transaction, hash1);
|
||||
rai::block_hash hash2 (2);
|
||||
store.unsynced_put (transaction, hash2);
|
||||
std::unordered_set<rai::block_hash> hashes;
|
||||
for (auto i (store.unsynced_begin (transaction)), n (store.unsynced_end ()); i != n; ++i)
|
||||
{
|
||||
hashes.insert (rai::uint256_union (i->first.uint256 ()));
|
||||
}
|
||||
ASSERT_EQ (2, hashes.size ());
|
||||
ASSERT_TRUE (hashes.find (hash1) != hashes.end ());
|
||||
ASSERT_TRUE (hashes.find (hash2) != hashes.end ());
|
||||
}
|
||||
|
||||
TEST (block_store, large_iteration)
|
||||
{
|
||||
bool init (false);
|
||||
|
|
|
@ -1228,6 +1228,37 @@ TEST (node, DISABLED_bootstrap_no_publish)
|
|||
}
|
||||
}
|
||||
|
||||
// Check that an outgoing bootstrap request can push blocks
|
||||
TEST (node, bootstrap_bulk_push)
|
||||
{
|
||||
rai::system system0 (24000, 1);
|
||||
rai::system system1 (24001, 1);
|
||||
auto node0 (system0.nodes[0]);
|
||||
auto node1 (system1.nodes[0]);
|
||||
rai::keypair key0;
|
||||
// node0 knows about send0 but node1 doesn't.
|
||||
rai::send_block send0 (system0.nodes[0]->latest (rai::test_genesis_key.pub), key0.pub, 500, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0);
|
||||
node0->generate_work (send0);
|
||||
{
|
||||
rai::transaction transaction (node0->store.environment, nullptr, true);
|
||||
ASSERT_EQ (rai::process_result::progress, system0.nodes[0]->ledger.process (transaction, send0).code);
|
||||
}
|
||||
ASSERT_FALSE (node0->bootstrap_initiator.in_progress ());
|
||||
ASSERT_FALSE (node1->bootstrap_initiator.in_progress ());
|
||||
ASSERT_TRUE (node1->active.roots.empty ());
|
||||
node0->bootstrap_initiator.bootstrap (node1->network.endpoint (), false);
|
||||
auto iterations1 (0);
|
||||
while (node1->block (send0.hash ()) == nullptr)
|
||||
{
|
||||
system0.poll ();
|
||||
system1.poll ();
|
||||
++iterations1;
|
||||
ASSERT_GT (200, iterations1);
|
||||
}
|
||||
// since this uses bulk_push, the new block should be republished
|
||||
ASSERT_FALSE (node1->active.roots.empty ());
|
||||
}
|
||||
|
||||
// Bootstrapping a forked open block should succeed.
|
||||
TEST (node, bootstrap_fork_open)
|
||||
{
|
||||
|
|
|
@ -76,7 +76,7 @@ TEST (wallets, wallet_create_max)
|
|||
rai::system system (24000, 1);
|
||||
bool error (false);
|
||||
rai::wallets wallets (error, *system.nodes[0]);
|
||||
const int nonWalletDbs = 17;
|
||||
const int nonWalletDbs = 16;
|
||||
for (int i = 0; i < system.nodes[0]->config.lmdb_max_dbs - nonWalletDbs; i++)
|
||||
{
|
||||
rai::keypair key;
|
||||
|
|
|
@ -12,6 +12,7 @@ constexpr double bootstrap_minimum_frontier_blocks_per_sec = 1000.0;
|
|||
constexpr unsigned bootstrap_frontier_retry_limit = 16;
|
||||
constexpr double bootstrap_minimum_termination_time_sec = 30.0;
|
||||
constexpr unsigned bootstrap_max_new_connections = 10;
|
||||
constexpr unsigned bulk_push_cost_limit = 200;
|
||||
|
||||
rai::socket_timeout::socket_timeout (rai::bootstrap_client & client_a) :
|
||||
ticket (0),
|
||||
|
@ -43,159 +44,6 @@ void rai::socket_timeout::stop ()
|
|||
++ticket;
|
||||
}
|
||||
|
||||
rai::block_synchronization::block_synchronization (boost::log::sources::logger_mt & log_a) :
|
||||
log (log_a)
|
||||
{
|
||||
}
|
||||
|
||||
namespace
|
||||
{
|
||||
class add_dependency_visitor : public rai::block_visitor
|
||||
{
|
||||
public:
|
||||
add_dependency_visitor (MDB_txn * transaction_a, rai::block_synchronization & sync_a) :
|
||||
transaction (transaction_a),
|
||||
sync (sync_a),
|
||||
complete (true)
|
||||
{
|
||||
}
|
||||
virtual ~add_dependency_visitor ()
|
||||
{
|
||||
}
|
||||
void send_block (rai::send_block const & block_a) override
|
||||
{
|
||||
add_dependency (block_a.hashables.previous);
|
||||
}
|
||||
void receive_block (rai::receive_block const & block_a) override
|
||||
{
|
||||
add_dependency (block_a.hashables.previous);
|
||||
if (complete)
|
||||
{
|
||||
add_dependency (block_a.hashables.source);
|
||||
}
|
||||
}
|
||||
void open_block (rai::open_block const & block_a) override
|
||||
{
|
||||
add_dependency (block_a.hashables.source);
|
||||
}
|
||||
void change_block (rai::change_block const & block_a) override
|
||||
{
|
||||
add_dependency (block_a.hashables.previous);
|
||||
}
|
||||
void state_block (rai::state_block const & block_a) override
|
||||
{
|
||||
if (!block_a.hashables.previous.is_zero ())
|
||||
{
|
||||
add_dependency (block_a.hashables.previous);
|
||||
}
|
||||
if (complete)
|
||||
{
|
||||
// Might not be a dependency block (if this is a send) but that's okay
|
||||
add_dependency (block_a.hashables.link);
|
||||
}
|
||||
}
|
||||
void add_dependency (rai::block_hash const & hash_a)
|
||||
{
|
||||
if (!sync.synchronized (transaction, hash_a) && sync.retrieve (transaction, hash_a) != nullptr)
|
||||
{
|
||||
complete = false;
|
||||
sync.blocks.push_back (hash_a);
|
||||
}
|
||||
else
|
||||
{
|
||||
// Block is already synchronized, normal
|
||||
}
|
||||
}
|
||||
MDB_txn * transaction;
|
||||
rai::block_synchronization & sync;
|
||||
bool complete;
|
||||
};
|
||||
}
|
||||
|
||||
bool rai::block_synchronization::add_dependency (MDB_txn * transaction_a, rai::block const & block_a)
|
||||
{
|
||||
add_dependency_visitor visitor (transaction_a, *this);
|
||||
block_a.visit (visitor);
|
||||
return visitor.complete;
|
||||
}
|
||||
|
||||
void rai::block_synchronization::fill_dependencies (MDB_txn * transaction_a)
|
||||
{
|
||||
auto done (false);
|
||||
while (!done)
|
||||
{
|
||||
auto hash (blocks.back ());
|
||||
auto block (retrieve (transaction_a, hash));
|
||||
if (block != nullptr)
|
||||
{
|
||||
done = add_dependency (transaction_a, *block);
|
||||
}
|
||||
else
|
||||
{
|
||||
done = true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
rai::sync_result rai::block_synchronization::synchronize_one (MDB_txn * transaction_a)
|
||||
{
|
||||
// Blocks that depend on multiple paths e.g. receive_blocks, need to have their dependencies recalculated each time
|
||||
fill_dependencies (transaction_a);
|
||||
rai::sync_result result (rai::sync_result::success);
|
||||
auto hash (blocks.back ());
|
||||
blocks.pop_back ();
|
||||
auto block (retrieve (transaction_a, hash));
|
||||
if (block != nullptr)
|
||||
{
|
||||
result = target (transaction_a, *block);
|
||||
}
|
||||
else
|
||||
{
|
||||
// A block that can be the dependency of more than one other block, e.g. send blocks, can be added to the dependency list more than once. Subsequent retrievals won't find the block but this isn't an error
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
rai::sync_result rai::block_synchronization::synchronize (MDB_txn * transaction_a, rai::block_hash const & hash_a)
|
||||
{
|
||||
auto result (rai::sync_result::success);
|
||||
blocks.clear ();
|
||||
blocks.push_back (hash_a);
|
||||
auto cutoff (std::chrono::steady_clock::now () + rai::transaction_timeout);
|
||||
while (std::chrono::steady_clock::now () < cutoff && result != rai::sync_result::fork && !blocks.empty ())
|
||||
{
|
||||
result = synchronize_one (transaction_a);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
rai::push_synchronization::push_synchronization (rai::node & node_a, std::function<rai::sync_result (MDB_txn *, rai::block const &)> const & target_a) :
|
||||
block_synchronization (node_a.log),
|
||||
target_m (target_a),
|
||||
node (node_a)
|
||||
{
|
||||
}
|
||||
|
||||
bool rai::push_synchronization::synchronized (MDB_txn * transaction_a, rai::block_hash const & hash_a)
|
||||
{
|
||||
auto result (!node.store.unsynced_exists (transaction_a, hash_a));
|
||||
if (!result)
|
||||
{
|
||||
node.store.unsynced_del (transaction_a, hash_a);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
std::unique_ptr<rai::block> rai::push_synchronization::retrieve (MDB_txn * transaction_a, rai::block_hash const & hash_a)
|
||||
{
|
||||
return node.store.block_get (transaction_a, hash_a);
|
||||
}
|
||||
|
||||
rai::sync_result rai::push_synchronization::target (MDB_txn * transaction_a, rai::block const & block_a)
|
||||
{
|
||||
return target_m (transaction_a, block_a);
|
||||
}
|
||||
|
||||
rai::bootstrap_client::bootstrap_client (std::shared_ptr<rai::node> node_a, std::shared_ptr<rai::bootstrap_attempt> attempt_a, rai::tcp_endpoint const & endpoint_a) :
|
||||
node (node_a),
|
||||
attempt (attempt_a),
|
||||
|
@ -317,7 +165,8 @@ std::shared_ptr<rai::bootstrap_client> rai::bootstrap_client::shared ()
|
|||
rai::frontier_req_client::frontier_req_client (std::shared_ptr<rai::bootstrap_client> connection_a) :
|
||||
connection (connection_a),
|
||||
current (0),
|
||||
count (0)
|
||||
count (0),
|
||||
bulk_push_cost (0)
|
||||
{
|
||||
rai::transaction transaction (connection->node->store.environment, nullptr, false);
|
||||
next (transaction);
|
||||
|
@ -351,14 +200,19 @@ void rai::frontier_req_client::receive_frontier ()
|
|||
});
|
||||
}
|
||||
|
||||
void rai::frontier_req_client::unsynced (MDB_txn * transaction_a, rai::block_hash const & ours_a, rai::block_hash const & theirs_a)
|
||||
void rai::frontier_req_client::unsynced (MDB_txn * transaction_a, rai::block_hash const & head, rai::block_hash const & end)
|
||||
{
|
||||
auto current (ours_a);
|
||||
while (!current.is_zero () && current != theirs_a)
|
||||
if (bulk_push_cost < bulk_push_cost_limit)
|
||||
{
|
||||
connection->node->store.unsynced_put (transaction_a, current);
|
||||
auto block (connection->node->store.block_get (transaction_a, current));
|
||||
current = block->previous ();
|
||||
connection->attempt->add_bulk_push_target (head, end);
|
||||
if (end.is_zero ())
|
||||
{
|
||||
bulk_push_cost += 2;
|
||||
}
|
||||
else
|
||||
{
|
||||
bulk_push_cost += 1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -399,10 +253,7 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons
|
|||
{
|
||||
// We know about an account they don't.
|
||||
rai::transaction transaction (connection->node->store.environment, nullptr, true);
|
||||
if (connection->node->wallets.exists (transaction, current))
|
||||
{
|
||||
unsynced (transaction, info.head, 0);
|
||||
}
|
||||
unsynced (transaction, info.head, 0);
|
||||
next (transaction);
|
||||
}
|
||||
if (!current.is_zero ())
|
||||
|
@ -419,14 +270,14 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons
|
|||
if (connection->node->store.block_exists (transaction, latest))
|
||||
{
|
||||
// We know about a block they don't.
|
||||
if (connection->node->wallets.exists (transaction, current))
|
||||
{
|
||||
unsynced (transaction, info.head, latest);
|
||||
}
|
||||
unsynced (transaction, info.head, latest);
|
||||
}
|
||||
else
|
||||
{
|
||||
connection->attempt->add_pull (rai::pull_info (account, latest, info.head));
|
||||
// Either we're behind or there's a fork we differ on
|
||||
// Either way, bulk pushing will probably not be effective
|
||||
bulk_push_cost += 5;
|
||||
}
|
||||
}
|
||||
next (transaction);
|
||||
|
@ -450,13 +301,14 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons
|
|||
while (!current.is_zero ())
|
||||
{
|
||||
// We know about an account they don't.
|
||||
if (connection->node->wallets.exists (transaction, current))
|
||||
{
|
||||
unsynced (transaction, info.head, 0);
|
||||
}
|
||||
unsynced (transaction, info.head, 0);
|
||||
next (transaction);
|
||||
}
|
||||
}
|
||||
if (connection->node->config.logging.bulk_pull_logging ())
|
||||
{
|
||||
BOOST_LOG (connection->node->log) << "Bulk push cost: " << bulk_push_cost;
|
||||
}
|
||||
{
|
||||
try
|
||||
{
|
||||
|
@ -696,11 +548,7 @@ void rai::bulk_pull_client::received_block (boost::system::error_code const & ec
|
|||
}
|
||||
|
||||
rai::bulk_push_client::bulk_push_client (std::shared_ptr<rai::bootstrap_client> const & connection_a) :
|
||||
connection (connection_a),
|
||||
synchronization (*connection->node, [this](MDB_txn * transaction_a, rai::block const & block_a) {
|
||||
push_block (block_a);
|
||||
return rai::sync_result::success;
|
||||
})
|
||||
connection (connection_a)
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -720,7 +568,7 @@ void rai::bulk_push_client::start ()
|
|||
connection->start_timeout ();
|
||||
boost::asio::async_write (connection->socket, boost::asio::buffer (buffer->data (), buffer->size ()), [this_l, buffer](boost::system::error_code const & ec, size_t size_a) {
|
||||
this_l->connection->stop_timeout ();
|
||||
rai::transaction transaction (this_l->connection->node->store.environment, nullptr, true);
|
||||
rai::transaction transaction (this_l->connection->node->store.environment, nullptr, false);
|
||||
if (!ec)
|
||||
{
|
||||
this_l->push (transaction);
|
||||
|
@ -737,32 +585,48 @@ void rai::bulk_push_client::start ()
|
|||
|
||||
void rai::bulk_push_client::push (MDB_txn * transaction_a)
|
||||
{
|
||||
auto finished (false);
|
||||
std::unique_ptr<rai::block> block;
|
||||
bool finished (false);
|
||||
while (block == nullptr && !finished)
|
||||
{
|
||||
auto first (connection->node->store.unsynced_begin (transaction_a));
|
||||
if (first != rai::store_iterator (nullptr))
|
||||
if (current_target.first.is_zero () || current_target.first == current_target.second)
|
||||
{
|
||||
rai::block_hash hash (first->first.uint256 ());
|
||||
if (!hash.is_zero ())
|
||||
std::lock_guard<std::mutex> guard (connection->attempt->mutex);
|
||||
if (!connection->attempt->bulk_push_targets.empty ())
|
||||
{
|
||||
connection->node->store.unsynced_del (transaction_a, hash);
|
||||
synchronization.blocks.push_back (hash);
|
||||
synchronization.synchronize_one (transaction_a);
|
||||
current_target = connection->attempt->bulk_push_targets.back ();
|
||||
connection->attempt->bulk_push_targets.pop_back ();
|
||||
}
|
||||
else
|
||||
{
|
||||
finished = true;
|
||||
}
|
||||
}
|
||||
else
|
||||
if (!finished)
|
||||
{
|
||||
finished = true;
|
||||
block = connection->node->store.block_get (transaction_a, current_target.first);
|
||||
if (block == nullptr)
|
||||
{
|
||||
current_target.first = rai::block_hash (0);
|
||||
}
|
||||
else
|
||||
{
|
||||
if (connection->node->config.logging.bulk_pull_logging ())
|
||||
{
|
||||
BOOST_LOG (connection->node->log) << "Bulk pushing range " << current_target.first.to_string () << " down to " << current_target.second.to_string ();
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
if (finished)
|
||||
{
|
||||
send_finished ();
|
||||
}
|
||||
else
|
||||
{
|
||||
current_target.first = block->previous ();
|
||||
push_block (*block);
|
||||
}
|
||||
}
|
||||
|
||||
void rai::bulk_push_client::send_finished ()
|
||||
|
@ -798,15 +662,8 @@ void rai::bulk_push_client::push_block (rai::block const & block_a)
|
|||
this_l->connection->stop_timeout ();
|
||||
if (!ec)
|
||||
{
|
||||
rai::transaction transaction (this_l->connection->node->store.environment, nullptr, true);
|
||||
if (!this_l->synchronization.blocks.empty ())
|
||||
{
|
||||
this_l->synchronization.synchronize_one (transaction);
|
||||
}
|
||||
else
|
||||
{
|
||||
this_l->push (transaction);
|
||||
}
|
||||
rai::transaction transaction (this_l->connection->node->store.environment, nullptr, false);
|
||||
this_l->push (transaction);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
@ -1171,7 +1028,7 @@ void rai::bootstrap_attempt::populate_connections ()
|
|||
std::lock_guard<std::mutex> lock (mutex);
|
||||
clients.push_back (client);
|
||||
}
|
||||
else
|
||||
else if (connections == 0)
|
||||
{
|
||||
BOOST_LOG (node->log) << boost::str (boost::format ("Bootstrap stopped because there are no peers"));
|
||||
stopped = true;
|
||||
|
@ -1280,6 +1137,12 @@ void rai::bootstrap_attempt::requeue_pull (rai::pull_info const & pull_a)
|
|||
}
|
||||
}
|
||||
|
||||
void rai::bootstrap_attempt::add_bulk_push_target (rai::block_hash const & head, rai::block_hash const & end)
|
||||
{
|
||||
std::lock_guard<std::mutex> lock (mutex);
|
||||
bulk_push_targets.push_back (std::make_pair (head, end));
|
||||
}
|
||||
|
||||
rai::bootstrap_initiator::bootstrap_initiator (rai::node & node_a) :
|
||||
node (node_a),
|
||||
stopped (false),
|
||||
|
@ -1304,9 +1167,12 @@ void rai::bootstrap_initiator::bootstrap ()
|
|||
}
|
||||
}
|
||||
|
||||
void rai::bootstrap_initiator::bootstrap (rai::endpoint const & endpoint_a)
|
||||
void rai::bootstrap_initiator::bootstrap (rai::endpoint const & endpoint_a, bool add_to_peers)
|
||||
{
|
||||
node.peers.insert (endpoint_a, rai::protocol_version);
|
||||
if (add_to_peers)
|
||||
{
|
||||
node.peers.insert (endpoint_a, rai::protocol_version);
|
||||
}
|
||||
std::unique_lock<std::mutex> lock (mutex);
|
||||
if (!stopped)
|
||||
{
|
||||
|
|
|
@ -41,34 +41,6 @@ private:
|
|||
*/
|
||||
static const int bootstrap_message_header_size = sizeof (rai::message::magic_number) + sizeof (uint8_t) + sizeof (uint8_t) + sizeof (uint8_t) + sizeof (rai::message_type) + 2;
|
||||
|
||||
class block_synchronization
|
||||
{
|
||||
public:
|
||||
block_synchronization (boost::log::sources::logger_mt &);
|
||||
virtual ~block_synchronization () = default;
|
||||
// Return true if target already has block
|
||||
virtual bool synchronized (MDB_txn *, rai::block_hash const &) = 0;
|
||||
virtual std::unique_ptr<rai::block> retrieve (MDB_txn *, rai::block_hash const &) = 0;
|
||||
virtual rai::sync_result target (MDB_txn *, rai::block const &) = 0;
|
||||
// return true if all dependencies are synchronized
|
||||
bool add_dependency (MDB_txn *, rai::block const &);
|
||||
void fill_dependencies (MDB_txn *);
|
||||
rai::sync_result synchronize_one (MDB_txn *);
|
||||
rai::sync_result synchronize (MDB_txn *, rai::block_hash const &);
|
||||
boost::log::sources::logger_mt & log;
|
||||
std::deque<rai::block_hash> blocks;
|
||||
};
|
||||
class push_synchronization : public rai::block_synchronization
|
||||
{
|
||||
public:
|
||||
push_synchronization (rai::node &, std::function<rai::sync_result (MDB_txn *, rai::block const &)> const &);
|
||||
virtual ~push_synchronization () = default;
|
||||
bool synchronized (MDB_txn *, rai::block_hash const &) override;
|
||||
std::unique_ptr<rai::block> retrieve (MDB_txn *, rai::block_hash const &) override;
|
||||
rai::sync_result target (MDB_txn *, rai::block const &) override;
|
||||
std::function<rai::sync_result (MDB_txn *, rai::block const &)> target_m;
|
||||
rai::node & node;
|
||||
};
|
||||
class bootstrap_client;
|
||||
class pull_info
|
||||
{
|
||||
|
@ -103,6 +75,7 @@ public:
|
|||
void process_fork (MDB_txn *, std::shared_ptr<rai::block>);
|
||||
unsigned target_connections (size_t pulls_remaining);
|
||||
bool should_log ();
|
||||
void add_bulk_push_target (rai::block_hash const &, rai::block_hash const &);
|
||||
std::chrono::steady_clock::time_point next_log;
|
||||
std::deque<std::weak_ptr<rai::bootstrap_client>> clients;
|
||||
std::weak_ptr<rai::bootstrap_client> connection_frontier_request;
|
||||
|
@ -115,6 +88,7 @@ public:
|
|||
std::shared_ptr<rai::node> node;
|
||||
std::atomic<unsigned> account_count;
|
||||
std::atomic<uint64_t> total_blocks;
|
||||
std::vector<std::pair<rai::block_hash, rai::block_hash>> bulk_push_targets;
|
||||
bool stopped;
|
||||
std::mutex mutex;
|
||||
std::condition_variable condition;
|
||||
|
@ -128,7 +102,7 @@ public:
|
|||
void receive_frontier ();
|
||||
void received_frontier (boost::system::error_code const &, size_t);
|
||||
void request_account (rai::account const &, rai::block_hash const &);
|
||||
void unsynced (MDB_txn *, rai::account const &, rai::block_hash const &);
|
||||
void unsynced (MDB_txn *, rai::block_hash const &, rai::block_hash const &);
|
||||
void next (MDB_txn *);
|
||||
void insert_pull (rai::pull_info const &);
|
||||
std::shared_ptr<rai::bootstrap_client> connection;
|
||||
|
@ -139,6 +113,8 @@ public:
|
|||
rai::account faucet;
|
||||
std::chrono::steady_clock::time_point start_time;
|
||||
std::promise<bool> promise;
|
||||
/** A very rough estimate of the cost of `bulk_push`ing missing blocks */
|
||||
uint64_t bulk_push_cost;
|
||||
};
|
||||
class bulk_pull_client : public std::enable_shared_from_this<rai::bulk_pull_client>
|
||||
{
|
||||
|
@ -187,15 +163,15 @@ public:
|
|||
void push_block (rai::block const &);
|
||||
void send_finished ();
|
||||
std::shared_ptr<rai::bootstrap_client> connection;
|
||||
rai::push_synchronization synchronization;
|
||||
std::promise<bool> promise;
|
||||
std::pair<rai::block_hash, rai::block_hash> current_target;
|
||||
};
|
||||
class bootstrap_initiator
|
||||
{
|
||||
public:
|
||||
bootstrap_initiator (rai::node &);
|
||||
~bootstrap_initiator ();
|
||||
void bootstrap (rai::endpoint const &);
|
||||
void bootstrap (rai::endpoint const &, bool add_to_peers = true);
|
||||
void bootstrap ();
|
||||
void run_bootstrap ();
|
||||
void notify_listeners (bool);
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue