Split bootstrap source files (#2274)

* Split bootstrap source files
* Separate nano::bootstrap_attempt::process_block_lazy () function
This commit is contained in:
Sergey Kroshnin 2019-09-03 20:44:03 +03:00 committed by GitHub
commit c710267333
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
14 changed files with 3865 additions and 3770 deletions

View file

@ -27,8 +27,16 @@ add_library (node
blockprocessor.cpp
blockprocessor.hpp
blockprocessor.cpp
bootstrap.hpp
bootstrap.cpp
bootstrap/bootstrap_bulk_pull.hpp
bootstrap/bootstrap_bulk_pull.cpp
bootstrap/bootstrap_bulk_push.hpp
bootstrap/bootstrap_bulk_push.cpp
bootstrap/bootstrap_frontier.hpp
bootstrap/bootstrap_frontier.cpp
bootstrap/bootstrap_server.hpp
bootstrap/bootstrap_server.cpp
bootstrap/bootstrap.hpp
bootstrap/bootstrap.cpp
cli.hpp
cli.cpp
common.hpp

File diff suppressed because it is too large Load diff

View file

@ -1,400 +0,0 @@
#pragma once
#include <nano/node/common.hpp>
#include <nano/node/socket.hpp>
#include <nano/secure/blockstore.hpp>
#include <nano/secure/ledger.hpp>
#include <boost/log/sources/logger.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/thread/thread.hpp>
#include <atomic>
#include <future>
#include <queue>
#include <stack>
#include <unordered_set>
namespace nano
{
class bootstrap_attempt;
class bootstrap_client;
class node;
namespace transport
{
class channel_tcp;
}
enum class sync_result
{
success,
error,
fork
};
class bootstrap_client;
class pull_info
{
public:
using count_t = nano::bulk_pull::count_t;
pull_info () = default;
pull_info (nano::account const &, nano::block_hash const &, nano::block_hash const &, count_t = 0);
nano::account account{ 0 };
nano::block_hash head{ 0 };
nano::block_hash head_original{ 0 };
nano::block_hash end{ 0 };
count_t count{ 0 };
unsigned attempts{ 0 };
uint64_t processed{ 0 };
};
enum class bootstrap_mode
{
legacy,
lazy,
wallet_lazy
};
class frontier_req_client;
class bulk_push_client;
class bulk_pull_account_client;
class bootstrap_attempt final : public std::enable_shared_from_this<bootstrap_attempt>
{
public:
explicit bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a = nano::bootstrap_mode::legacy);
~bootstrap_attempt ();
void run ();
std::shared_ptr<nano::bootstrap_client> connection (nano::unique_lock<std::mutex> &);
bool consume_future (std::future<bool> &);
void populate_connections ();
bool request_frontier (nano::unique_lock<std::mutex> &);
void request_pull (nano::unique_lock<std::mutex> &);
void request_push (nano::unique_lock<std::mutex> &);
void add_connection (nano::endpoint const &);
void connect_client (nano::tcp_endpoint const &);
void pool_connection (std::shared_ptr<nano::bootstrap_client>);
void stop ();
void requeue_pull (nano::pull_info const &);
void add_pull (nano::pull_info const &);
bool still_pulling ();
unsigned target_connections (size_t pulls_remaining);
bool should_log ();
void add_bulk_push_target (nano::block_hash const &, nano::block_hash const &);
bool process_block (std::shared_ptr<nano::block>, nano::account const &, uint64_t, bool);
void lazy_run ();
void lazy_start (nano::block_hash const &);
void lazy_add (nano::block_hash const &);
bool lazy_finished ();
void lazy_pull_flush ();
void lazy_clear ();
void request_pending (nano::unique_lock<std::mutex> &);
void requeue_pending (nano::account const &);
void wallet_run ();
void wallet_start (std::deque<nano::account> &);
bool wallet_finished ();
std::chrono::steady_clock::time_point next_log;
std::deque<std::weak_ptr<nano::bootstrap_client>> clients;
std::weak_ptr<nano::bootstrap_client> connection_frontier_request;
std::weak_ptr<nano::frontier_req_client> frontiers;
std::weak_ptr<nano::bulk_push_client> push;
std::deque<nano::pull_info> pulls;
std::deque<std::shared_ptr<nano::bootstrap_client>> idle;
std::atomic<unsigned> connections;
std::atomic<unsigned> pulling;
std::shared_ptr<nano::node> node;
std::atomic<unsigned> account_count;
std::atomic<uint64_t> total_blocks;
std::atomic<unsigned> runs_count;
std::vector<std::pair<nano::block_hash, nano::block_hash>> bulk_push_targets;
std::atomic<bool> stopped;
nano::bootstrap_mode mode;
std::mutex mutex;
nano::condition_variable condition;
// Lazy bootstrap
std::unordered_set<nano::block_hash> lazy_blocks;
std::unordered_map<nano::block_hash, std::pair<nano::block_hash, nano::uint128_t>> lazy_state_unknown;
std::unordered_map<nano::block_hash, nano::uint128_t> lazy_balances;
std::unordered_set<nano::block_hash> lazy_keys;
std::deque<nano::block_hash> lazy_pulls;
std::atomic<uint64_t> lazy_stopped;
uint64_t lazy_max_stopped = 256;
std::mutex lazy_mutex;
// Wallet lazy bootstrap
std::deque<nano::account> wallet_accounts;
};
class frontier_req_client final : public std::enable_shared_from_this<nano::frontier_req_client>
{
public:
explicit frontier_req_client (std::shared_ptr<nano::bootstrap_client>);
~frontier_req_client ();
void run ();
void receive_frontier ();
void received_frontier (boost::system::error_code const &, size_t);
void unsynced (nano::block_hash const &, nano::block_hash const &);
void next (nano::transaction const &);
std::shared_ptr<nano::bootstrap_client> connection;
nano::account current;
nano::block_hash frontier;
unsigned count;
nano::account landing;
nano::account faucet;
std::chrono::steady_clock::time_point start_time;
std::promise<bool> promise;
/** A very rough estimate of the cost of `bulk_push`ing missing blocks */
uint64_t bulk_push_cost;
std::deque<std::pair<nano::account, nano::block_hash>> accounts;
static size_t constexpr size_frontier = sizeof (nano::account) + sizeof (nano::block_hash);
};
class bulk_pull_client final : public std::enable_shared_from_this<nano::bulk_pull_client>
{
public:
bulk_pull_client (std::shared_ptr<nano::bootstrap_client>, nano::pull_info const &);
~bulk_pull_client ();
void request ();
void receive_block ();
void throttled_receive_block ();
void received_type ();
void received_block (boost::system::error_code const &, size_t, nano::block_type);
nano::block_hash first ();
std::shared_ptr<nano::bootstrap_client> connection;
nano::block_hash expected;
nano::account known_account;
nano::pull_info pull;
uint64_t pull_blocks;
uint64_t unexpected_count;
};
class bootstrap_client final : public std::enable_shared_from_this<bootstrap_client>
{
public:
bootstrap_client (std::shared_ptr<nano::node>, std::shared_ptr<nano::bootstrap_attempt>, std::shared_ptr<nano::transport::channel_tcp>);
~bootstrap_client ();
std::shared_ptr<nano::bootstrap_client> shared ();
void stop (bool force);
double block_rate () const;
double elapsed_seconds () const;
std::shared_ptr<nano::node> node;
std::shared_ptr<nano::bootstrap_attempt> attempt;
std::shared_ptr<nano::transport::channel_tcp> channel;
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
std::chrono::steady_clock::time_point start_time;
std::atomic<uint64_t> block_count;
std::atomic<bool> pending_stop;
std::atomic<bool> hard_stop;
};
class bulk_push_client final : public std::enable_shared_from_this<nano::bulk_push_client>
{
public:
explicit bulk_push_client (std::shared_ptr<nano::bootstrap_client> const &);
~bulk_push_client ();
void start ();
void push (nano::transaction const &);
void push_block (nano::block const &);
void send_finished ();
std::shared_ptr<nano::bootstrap_client> connection;
std::promise<bool> promise;
std::pair<nano::block_hash, nano::block_hash> current_target;
};
class bulk_pull_account_client final : public std::enable_shared_from_this<nano::bulk_pull_account_client>
{
public:
bulk_pull_account_client (std::shared_ptr<nano::bootstrap_client>, nano::account const &);
~bulk_pull_account_client ();
void request ();
void receive_pending ();
std::shared_ptr<nano::bootstrap_client> connection;
nano::account account;
uint64_t pull_blocks;
};
class cached_pulls final
{
public:
std::chrono::steady_clock::time_point time;
nano::uint512_union account_head;
nano::block_hash new_head;
};
class pulls_cache final
{
public:
void add (nano::pull_info const &);
void update_pull (nano::pull_info &);
void remove (nano::pull_info const &);
std::mutex pulls_cache_mutex;
class account_head_tag
{
};
boost::multi_index_container<
nano::cached_pulls,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<boost::multi_index::member<nano::cached_pulls, std::chrono::steady_clock::time_point, &nano::cached_pulls::time>>,
boost::multi_index::hashed_unique<boost::multi_index::tag<account_head_tag>, boost::multi_index::member<nano::cached_pulls, nano::uint512_union, &nano::cached_pulls::account_head>>>>
cache;
constexpr static size_t cache_size_max = 10000;
};
class bootstrap_initiator final
{
public:
explicit bootstrap_initiator (nano::node &);
~bootstrap_initiator ();
void bootstrap (nano::endpoint const &, bool add_to_peers = true);
void bootstrap ();
void bootstrap_lazy (nano::block_hash const &, bool = false);
void bootstrap_wallet (std::deque<nano::account> &);
void run_bootstrap ();
void notify_listeners (bool);
void add_observer (std::function<void(bool)> const &);
bool in_progress ();
std::shared_ptr<nano::bootstrap_attempt> current_attempt ();
nano::pulls_cache cache;
void stop ();
private:
nano::node & node;
std::shared_ptr<nano::bootstrap_attempt> attempt;
std::atomic<bool> stopped;
std::mutex mutex;
nano::condition_variable condition;
std::mutex observers_mutex;
std::vector<std::function<void(bool)>> observers;
boost::thread thread;
friend std::unique_ptr<seq_con_info_component> collect_seq_con_info (bootstrap_initiator & bootstrap_initiator, const std::string & name);
};
std::unique_ptr<seq_con_info_component> collect_seq_con_info (bootstrap_initiator & bootstrap_initiator, const std::string & name);
class bootstrap_server;
class bootstrap_listener final
{
public:
bootstrap_listener (uint16_t, nano::node &);
void start ();
void stop ();
void accept_action (boost::system::error_code const &, std::shared_ptr<nano::socket>);
size_t connection_count ();
std::mutex mutex;
std::unordered_map<nano::bootstrap_server *, std::weak_ptr<nano::bootstrap_server>> connections;
nano::tcp_endpoint endpoint ();
nano::node & node;
std::shared_ptr<nano::server_socket> listening_socket;
bool on;
std::atomic<size_t> bootstrap_count{ 0 };
std::atomic<size_t> realtime_count{ 0 };
private:
uint16_t port;
};
std::unique_ptr<seq_con_info_component> collect_seq_con_info (bootstrap_listener & bootstrap_listener, const std::string & name);
class message;
enum class bootstrap_server_type
{
undefined,
bootstrap,
realtime,
realtime_response_server // special type for tcp channel response server
};
class bootstrap_server final : public std::enable_shared_from_this<nano::bootstrap_server>
{
public:
bootstrap_server (std::shared_ptr<nano::socket>, std::shared_ptr<nano::node>);
~bootstrap_server ();
void stop ();
void receive ();
void receive_header_action (boost::system::error_code const &, size_t);
void receive_bulk_pull_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_bulk_pull_account_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_frontier_req_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_keepalive_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_publish_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_confirm_req_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_confirm_ack_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_node_id_handshake_action (boost::system::error_code const &, size_t, nano::message_header const &);
void add_request (std::unique_ptr<nano::message>);
void finish_request ();
void finish_request_async ();
void run_next ();
void timeout ();
bool is_bootstrap_connection ();
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
std::shared_ptr<nano::socket> socket;
std::shared_ptr<nano::node> node;
std::mutex mutex;
std::queue<std::unique_ptr<nano::message>> requests;
std::atomic<bool> stopped{ false };
std::atomic<nano::bootstrap_server_type> type{ nano::bootstrap_server_type::undefined };
std::atomic<bool> keepalive_first{ true };
// Remote enpoint used to remove response channel even after socket closing
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
nano::account remote_node_id{ 0 };
};
class bulk_pull;
class bulk_pull_server final : public std::enable_shared_from_this<nano::bulk_pull_server>
{
public:
bulk_pull_server (std::shared_ptr<nano::bootstrap_server> const &, std::unique_ptr<nano::bulk_pull>);
void set_current_end ();
std::shared_ptr<nano::block> get_next ();
void send_next ();
void sent_action (boost::system::error_code const &, size_t);
void send_finished ();
void no_block_sent (boost::system::error_code const &, size_t);
std::shared_ptr<nano::bootstrap_server> connection;
std::unique_ptr<nano::bulk_pull> request;
nano::block_hash current;
bool include_start;
nano::bulk_pull::count_t max_count;
nano::bulk_pull::count_t sent_count;
};
class bulk_pull_account;
class bulk_pull_account_server final : public std::enable_shared_from_this<nano::bulk_pull_account_server>
{
public:
bulk_pull_account_server (std::shared_ptr<nano::bootstrap_server> const &, std::unique_ptr<nano::bulk_pull_account>);
void set_params ();
std::pair<std::unique_ptr<nano::pending_key>, std::unique_ptr<nano::pending_info>> get_next ();
void send_frontier ();
void send_next_block ();
void sent_action (boost::system::error_code const &, size_t);
void send_finished ();
void complete (boost::system::error_code const &, size_t);
std::shared_ptr<nano::bootstrap_server> connection;
std::unique_ptr<nano::bulk_pull_account> request;
std::unordered_set<nano::uint256_union> deduplication;
nano::pending_key current_key;
bool pending_address_only;
bool pending_include_address;
bool invalid_request;
};
class bulk_push_server final : public std::enable_shared_from_this<nano::bulk_push_server>
{
public:
explicit bulk_push_server (std::shared_ptr<nano::bootstrap_server> const &);
void throttled_receive ();
void receive ();
void received_type ();
void received_block (boost::system::error_code const &, size_t, nano::block_type);
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
std::shared_ptr<nano::bootstrap_server> connection;
};
class frontier_req;
class frontier_req_server final : public std::enable_shared_from_this<nano::frontier_req_server>
{
public:
frontier_req_server (std::shared_ptr<nano::bootstrap_server> const &, std::unique_ptr<nano::frontier_req>);
void send_next ();
void sent_action (boost::system::error_code const &, size_t);
void send_finished ();
void no_block_sent (boost::system::error_code const &, size_t);
void next ();
std::shared_ptr<nano::bootstrap_server> connection;
nano::account current;
nano::block_hash frontier;
std::unique_ptr<nano::frontier_req> request;
size_t count;
std::deque<std::pair<nano::account, nano::block_hash>> accounts;
};
}

File diff suppressed because it is too large Load diff

View file

@ -0,0 +1,201 @@
#pragma once
#include <nano/node/bootstrap/bootstrap_bulk_pull.hpp>
#include <nano/node/common.hpp>
#include <nano/node/socket.hpp>
#include <nano/secure/blockstore.hpp>
#include <nano/secure/ledger.hpp>
#include <boost/log/sources/logger.hpp>
#include <boost/multi_index/hashed_index.hpp>
#include <boost/multi_index/member.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/random_access_index.hpp>
#include <boost/multi_index_container.hpp>
#include <boost/thread/thread.hpp>
#include <atomic>
#include <future>
#include <queue>
#include <stack>
#include <unordered_set>
namespace nano
{
class bootstrap_attempt;
class bootstrap_client;
class node;
namespace transport
{
class channel_tcp;
}
enum class sync_result
{
success,
error,
fork
};
class bootstrap_client;
enum class bootstrap_mode
{
legacy,
lazy,
wallet_lazy
};
class frontier_req_client;
class bulk_push_client;
class bootstrap_attempt final : public std::enable_shared_from_this<bootstrap_attempt>
{
public:
explicit bootstrap_attempt (std::shared_ptr<nano::node> node_a, nano::bootstrap_mode mode_a = nano::bootstrap_mode::legacy);
~bootstrap_attempt ();
void run ();
std::shared_ptr<nano::bootstrap_client> connection (nano::unique_lock<std::mutex> &);
bool consume_future (std::future<bool> &);
void populate_connections ();
bool request_frontier (nano::unique_lock<std::mutex> &);
void request_pull (nano::unique_lock<std::mutex> &);
void request_push (nano::unique_lock<std::mutex> &);
void add_connection (nano::endpoint const &);
void connect_client (nano::tcp_endpoint const &);
void pool_connection (std::shared_ptr<nano::bootstrap_client>);
void stop ();
void requeue_pull (nano::pull_info const &);
void add_pull (nano::pull_info const &);
bool still_pulling ();
unsigned target_connections (size_t pulls_remaining);
bool should_log ();
void add_bulk_push_target (nano::block_hash const &, nano::block_hash const &);
bool process_block (std::shared_ptr<nano::block>, nano::account const &, uint64_t, bool);
bool process_block_lazy (std::shared_ptr<nano::block>, nano::account const &, uint64_t);
void lazy_run ();
void lazy_start (nano::block_hash const &);
void lazy_add (nano::block_hash const &);
bool lazy_finished ();
void lazy_pull_flush ();
void lazy_clear ();
void request_pending (nano::unique_lock<std::mutex> &);
void requeue_pending (nano::account const &);
void wallet_run ();
void wallet_start (std::deque<nano::account> &);
bool wallet_finished ();
std::chrono::steady_clock::time_point next_log;
std::deque<std::weak_ptr<nano::bootstrap_client>> clients;
std::weak_ptr<nano::bootstrap_client> connection_frontier_request;
std::weak_ptr<nano::frontier_req_client> frontiers;
std::weak_ptr<nano::bulk_push_client> push;
std::deque<nano::pull_info> pulls;
std::deque<std::shared_ptr<nano::bootstrap_client>> idle;
std::atomic<unsigned> connections;
std::atomic<unsigned> pulling;
std::shared_ptr<nano::node> node;
std::atomic<unsigned> account_count;
std::atomic<uint64_t> total_blocks;
std::atomic<unsigned> runs_count;
std::vector<std::pair<nano::block_hash, nano::block_hash>> bulk_push_targets;
std::atomic<bool> stopped;
nano::bootstrap_mode mode;
std::mutex mutex;
std::condition_variable condition;
// Lazy bootstrap
std::unordered_set<nano::block_hash> lazy_blocks;
std::unordered_map<nano::block_hash, std::pair<nano::block_hash, nano::uint128_t>> lazy_state_unknown;
std::unordered_map<nano::block_hash, nano::uint128_t> lazy_balances;
std::unordered_set<nano::block_hash> lazy_keys;
std::deque<nano::block_hash> lazy_pulls;
std::atomic<uint64_t> lazy_stopped;
uint64_t lazy_max_stopped = 256;
std::mutex lazy_mutex;
// Wallet lazy bootstrap
std::deque<nano::account> wallet_accounts;
};
class bootstrap_client final : public std::enable_shared_from_this<bootstrap_client>
{
public:
bootstrap_client (std::shared_ptr<nano::node>, std::shared_ptr<nano::bootstrap_attempt>, std::shared_ptr<nano::transport::channel_tcp>);
~bootstrap_client ();
std::shared_ptr<nano::bootstrap_client> shared ();
void stop (bool force);
double block_rate () const;
double elapsed_seconds () const;
std::shared_ptr<nano::node> node;
std::shared_ptr<nano::bootstrap_attempt> attempt;
std::shared_ptr<nano::transport::channel_tcp> channel;
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
std::chrono::steady_clock::time_point start_time;
std::atomic<uint64_t> block_count;
std::atomic<bool> pending_stop;
std::atomic<bool> hard_stop;
};
class cached_pulls final
{
public:
std::chrono::steady_clock::time_point time;
nano::uint512_union account_head;
nano::block_hash new_head;
};
class pulls_cache final
{
public:
void add (nano::pull_info const &);
void update_pull (nano::pull_info &);
void remove (nano::pull_info const &);
std::mutex pulls_cache_mutex;
class account_head_tag
{
};
boost::multi_index_container<
nano::cached_pulls,
boost::multi_index::indexed_by<
boost::multi_index::ordered_non_unique<boost::multi_index::member<nano::cached_pulls, std::chrono::steady_clock::time_point, &nano::cached_pulls::time>>,
boost::multi_index::hashed_unique<boost::multi_index::tag<account_head_tag>, boost::multi_index::member<nano::cached_pulls, nano::uint512_union, &nano::cached_pulls::account_head>>>>
cache;
constexpr static size_t cache_size_max = 10000;
};
class bootstrap_initiator final
{
public:
explicit bootstrap_initiator (nano::node &);
~bootstrap_initiator ();
void bootstrap (nano::endpoint const &, bool add_to_peers = true);
void bootstrap ();
void bootstrap_lazy (nano::block_hash const &, bool = false);
void bootstrap_wallet (std::deque<nano::account> &);
void run_bootstrap ();
void notify_listeners (bool);
void add_observer (std::function<void(bool)> const &);
bool in_progress ();
std::shared_ptr<nano::bootstrap_attempt> current_attempt ();
nano::pulls_cache cache;
void stop ();
private:
nano::node & node;
std::shared_ptr<nano::bootstrap_attempt> attempt;
std::atomic<bool> stopped;
std::mutex mutex;
std::condition_variable condition;
std::mutex observers_mutex;
std::vector<std::function<void(bool)>> observers;
boost::thread thread;
friend std::unique_ptr<seq_con_info_component> collect_seq_con_info (bootstrap_initiator & bootstrap_initiator, const std::string & name);
};
std::unique_ptr<seq_con_info_component> collect_seq_con_info (bootstrap_initiator & bootstrap_initiator, const std::string & name);
class bootstrap_limits final
{
public:
static constexpr double bootstrap_connection_scale_target_blocks = 50000.0;
static constexpr double bootstrap_connection_warmup_time_sec = 5.0;
static constexpr double bootstrap_minimum_blocks_per_sec = 10.0;
static constexpr double bootstrap_minimum_elapsed_seconds_blockrate = 0.02;
static constexpr double bootstrap_minimum_frontier_blocks_per_sec = 1000.0;
static constexpr unsigned bootstrap_frontier_retry_limit = 16;
static constexpr double bootstrap_minimum_termination_time_sec = 30.0;
static constexpr unsigned bootstrap_max_new_connections = 10;
static constexpr unsigned bulk_push_cost_limit = 200;
};
}

View file

@ -0,0 +1,937 @@
#include <nano/node/bootstrap/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap_bulk_pull.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp.hpp>
nano::pull_info::pull_info (nano::account const & account_a, nano::block_hash const & head_a, nano::block_hash const & end_a, count_t count_a) :
account (account_a),
head (head_a),
head_original (head_a),
end (end_a),
count (count_a)
{
}
nano::bulk_pull_client::bulk_pull_client (std::shared_ptr<nano::bootstrap_client> connection_a, nano::pull_info const & pull_a) :
connection (connection_a),
known_account (0),
pull (pull_a),
pull_blocks (0),
unexpected_count (0)
{
nano::lock_guard<std::mutex> mutex (connection->attempt->mutex);
connection->attempt->condition.notify_all ();
}
nano::bulk_pull_client::~bulk_pull_client ()
{
// If received end block is not expected end block
if (expected != pull.end)
{
pull.head = expected;
if (connection->attempt->mode != nano::bootstrap_mode::legacy)
{
pull.account = expected;
}
pull.processed += pull_blocks - unexpected_count;
connection->attempt->requeue_pull (pull);
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Bulk pull end block is not expected %1% for account %2%") % pull.end.to_string () % pull.account.to_account ()));
}
}
else
{
connection->node->bootstrap_initiator.cache.remove (pull);
}
{
nano::lock_guard<std::mutex> mutex (connection->attempt->mutex);
--connection->attempt->pulling;
}
connection->attempt->condition.notify_all ();
}
void nano::bulk_pull_client::request ()
{
expected = pull.head;
nano::bulk_pull req;
req.start = (pull.head == pull.head_original) ? pull.account : pull.head; // Account for new pulls, head for cached pulls
req.end = pull.end;
req.count = pull.count;
req.set_count_present (pull.count != 0);
if (connection->node->config.logging.bulk_pull_logging ())
{
nano::unique_lock<std::mutex> lock (connection->attempt->mutex);
connection->node->logger.try_log (boost::str (boost::format ("Requesting account %1% from %2%. %3% accounts in queue") % pull.account.to_account () % connection->channel->to_string () % connection->attempt->pulls.size ()));
}
else if (connection->node->config.logging.network_logging () && connection->attempt->should_log ())
{
nano::unique_lock<std::mutex> lock (connection->attempt->mutex);
connection->node->logger.always_log (boost::str (boost::format ("%1% accounts in pull queue") % connection->attempt->pulls.size ()));
}
auto this_l (shared_from_this ());
connection->channel->send (
req, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
this_l->throttled_receive_block ();
}
else
{
if (this_l->connection->node->config.logging.bulk_pull_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error sending bulk pull request to %1%: to %2%") % ec.message () % this_l->connection->channel->to_string ()));
}
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_request_failure, nano::stat::dir::in);
}
},
false); // is bootstrap traffic is_droppable false
}
void nano::bulk_pull_client::throttled_receive_block ()
{
if (!connection->node->block_processor.half_full ())
{
receive_block ();
}
else
{
auto this_l (shared_from_this ());
connection->node->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l]() {
if (!this_l->connection->pending_stop && !this_l->connection->attempt->stopped)
{
this_l->throttled_receive_block ();
}
});
}
}
void nano::bulk_pull_client::receive_block ()
{
auto this_l (shared_from_this ());
connection->channel->socket->async_read (connection->receive_buffer, 1, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
this_l->received_type ();
}
else
{
if (this_l->connection->node->config.logging.bulk_pull_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error receiving block type: %1%") % ec.message ()));
}
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_receive_block_failure, nano::stat::dir::in);
}
});
}
void nano::bulk_pull_client::received_type ()
{
auto this_l (shared_from_this ());
nano::block_type type (static_cast<nano::block_type> (connection->receive_buffer->data ()[0]));
switch (type)
{
case nano::block_type::send:
{
connection->channel->socket->async_read (connection->receive_buffer, nano::send_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::receive:
{
connection->channel->socket->async_read (connection->receive_buffer, nano::receive_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::open:
{
connection->channel->socket->async_read (connection->receive_buffer, nano::open_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::change:
{
connection->channel->socket->async_read (connection->receive_buffer, nano::change_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::state:
{
connection->channel->socket->async_read (connection->receive_buffer, nano::state_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::not_a_block:
{
// Avoid re-using slow peers, or peers that sent the wrong blocks.
if (!connection->pending_stop && expected == pull.end)
{
connection->attempt->pool_connection (connection);
}
break;
}
default:
{
if (connection->node->config.logging.network_packet_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast<int> (type)));
}
break;
}
}
}
void nano::bulk_pull_client::received_block (boost::system::error_code const & ec, size_t size_a, nano::block_type type_a)
{
if (!ec)
{
nano::bufferstream stream (connection->receive_buffer->data (), size_a);
std::shared_ptr<nano::block> block (nano::deserialize_block (stream, type_a));
if (block != nullptr && !nano::work_validate (*block))
{
auto hash (block->hash ());
if (connection->node->config.logging.bulk_pull_logging ())
{
std::string block_l;
block->serialize_json (block_l, connection->node->config.logging.single_line_record ());
connection->node->logger.try_log (boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l));
}
// Is block expected?
bool block_expected (false);
if (hash == expected)
{
expected = block->previous ();
block_expected = true;
}
else
{
unexpected_count++;
}
if (pull_blocks == 0 && block_expected)
{
known_account = block->account ();
}
if (connection->block_count++ == 0)
{
connection->start_time = std::chrono::steady_clock::now ();
}
connection->attempt->total_blocks++;
bool stop_pull (connection->attempt->process_block (block, known_account, pull_blocks, block_expected));
pull_blocks++;
if (!stop_pull && !connection->hard_stop.load ())
{
/* Process block in lazy pull if not stopped
Stop usual pull request with unexpected block & more than 16k blocks processed
to prevent spam */
if (connection->attempt->mode != nano::bootstrap_mode::legacy || unexpected_count < 16384)
{
throttled_receive_block ();
}
}
else if (stop_pull && block_expected)
{
expected = pull.end;
connection->attempt->pool_connection (connection);
}
if (stop_pull)
{
connection->attempt->lazy_stopped++;
}
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Error deserializing block received from pull request");
}
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_deserialize_receive_block, nano::stat::dir::in);
}
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Error bulk receiving block: %1%") % ec.message ()));
}
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_receive_block_failure, nano::stat::dir::in);
}
}
nano::bulk_pull_account_client::bulk_pull_account_client (std::shared_ptr<nano::bootstrap_client> connection_a, nano::account const & account_a) :
connection (connection_a),
account (account_a),
pull_blocks (0)
{
connection->attempt->condition.notify_all ();
}
nano::bulk_pull_account_client::~bulk_pull_account_client ()
{
{
nano::lock_guard<std::mutex> mutex (connection->attempt->mutex);
--connection->attempt->pulling;
}
connection->attempt->condition.notify_all ();
}
void nano::bulk_pull_account_client::request ()
{
nano::bulk_pull_account req;
req.account = account;
req.minimum_amount = connection->node->config.receive_minimum;
req.flags = nano::bulk_pull_account_flags::pending_hash_and_amount;
if (connection->node->config.logging.bulk_pull_logging ())
{
nano::unique_lock<std::mutex> lock (connection->attempt->mutex);
connection->node->logger.try_log (boost::str (boost::format ("Requesting pending for account %1% from %2%. %3% accounts in queue") % req.account.to_account () % connection->channel->to_string () % connection->attempt->wallet_accounts.size ()));
}
else if (connection->node->config.logging.network_logging () && connection->attempt->should_log ())
{
nano::unique_lock<std::mutex> lock (connection->attempt->mutex);
connection->node->logger.always_log (boost::str (boost::format ("%1% accounts in pull queue") % connection->attempt->wallet_accounts.size ()));
}
auto this_l (shared_from_this ());
connection->channel->send (
req, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
this_l->receive_pending ();
}
else
{
this_l->connection->attempt->requeue_pending (this_l->account);
if (this_l->connection->node->config.logging.bulk_pull_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error starting bulk pull request to %1%: to %2%") % ec.message () % this_l->connection->channel->to_string ()));
}
this_l->connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_error_starting_request, nano::stat::dir::in);
}
},
false); // is bootstrap traffic is_droppable false
}
void nano::bulk_pull_account_client::receive_pending ()
{
auto this_l (shared_from_this ());
size_t size_l (sizeof (nano::uint256_union) + sizeof (nano::uint128_union));
connection->channel->socket->async_read (connection->receive_buffer, size_l, [this_l, size_l](boost::system::error_code const & ec, size_t size_a) {
// An issue with asio is that sometimes, instead of reporting a bad file descriptor during disconnect,
// we simply get a size of 0.
if (size_a == size_l)
{
if (!ec)
{
nano::block_hash pending;
nano::bufferstream frontier_stream (this_l->connection->receive_buffer->data (), sizeof (nano::uint256_union));
auto error1 (nano::try_read (frontier_stream, pending));
(void)error1;
assert (!error1);
nano::amount balance;
nano::bufferstream balance_stream (this_l->connection->receive_buffer->data () + sizeof (nano::uint256_union), sizeof (nano::uint128_union));
auto error2 (nano::try_read (balance_stream, balance));
(void)error2;
assert (!error2);
if (this_l->pull_blocks == 0 || !pending.is_zero ())
{
if (this_l->pull_blocks == 0 || balance.number () >= this_l->connection->node->config.receive_minimum.number ())
{
this_l->pull_blocks++;
{
if (!pending.is_zero ())
{
auto transaction (this_l->connection->node->store.tx_begin_read ());
if (!this_l->connection->node->store.block_exists (transaction, pending))
{
this_l->connection->attempt->lazy_start (pending);
}
}
}
this_l->receive_pending ();
}
else
{
this_l->connection->attempt->requeue_pending (this_l->account);
}
}
else
{
this_l->connection->attempt->pool_connection (this_l->connection);
}
}
else
{
this_l->connection->attempt->requeue_pending (this_l->account);
if (this_l->connection->node->config.logging.network_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error while receiving bulk pull account frontier %1%") % ec.message ()));
}
}
}
else
{
this_l->connection->attempt->requeue_pending (this_l->account);
if (this_l->connection->node->config.logging.network_message_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Invalid size: expected %1%, got %2%") % size_l % size_a));
}
}
});
}
/**
* Handle a request for the pull of all blocks associated with an account
* The account is supplied as the "start" member, and the final block to
* send is the "end" member. The "start" member may also be a block
* hash, in which case the that hash is used as the start of a chain
* to send. To determine if "start" is interpretted as an account or
* hash, the ledger is checked to see if the block specified exists,
* if not then it is interpretted as an account.
*
* Additionally, if "start" is specified as a block hash the range
* is inclusive of that block hash, that is the range will be:
* [start, end); In the case that a block hash is not specified the
* range will be exclusive of the frontier for that account with
* a range of (frontier, end)
*/
void nano::bulk_pull_server::set_current_end ()
{
include_start = false;
assert (request != nullptr);
auto transaction (connection->node->store.tx_begin_read ());
if (!connection->node->store.block_exists (transaction, request->end))
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Bulk pull end block doesn't exist: %1%, sending everything") % request->end.to_string ()));
}
request->end.clear ();
}
if (connection->node->store.block_exists (transaction, request->start))
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Bulk pull request for block hash: %1%") % request->start.to_string ()));
}
current = request->start;
include_start = true;
}
else
{
nano::account_info info;
auto no_address (connection->node->store.account_get (transaction, request->start, info));
if (no_address)
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Request for unknown account: %1%") % request->start.to_account ()));
}
current = request->end;
}
else
{
current = info.head;
if (!request->end.is_zero ())
{
auto account (connection->node->ledger.account (transaction, request->end));
if (account != request->start)
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Request for block that is not on account chain: %1% not on %2%") % request->end.to_string () % request->start.to_account ()));
}
current = request->end;
}
}
}
}
sent_count = 0;
if (request->is_count_present ())
{
max_count = request->count;
}
else
{
max_count = 0;
}
}
void nano::bulk_pull_server::send_next ()
{
auto block (get_next ());
if (block != nullptr)
{
std::vector<uint8_t> send_buffer;
{
nano::vectorstream stream (send_buffer);
nano::serialize_block (stream, *block);
}
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Sending block: %1%") % block->hash ().to_string ()));
}
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
else
{
send_finished ();
}
}
std::shared_ptr<nano::block> nano::bulk_pull_server::get_next ()
{
std::shared_ptr<nano::block> result;
bool send_current = false, set_current_to_end = false;
/*
* Determine if we should reply with a block
*
* If our cursor is on the final block, we should signal that we
* are done by returning a null result.
*
* Unless we are including the "start" member and this is the
* start member, then include it anyway.
*/
if (current != request->end)
{
send_current = true;
}
else if (current == request->end && include_start == true)
{
send_current = true;
/*
* We also need to ensure that the next time
* are invoked that we return a null result
*/
set_current_to_end = true;
}
/*
* Account for how many blocks we have provided. If this
* exceeds the requested maximum, return an empty object
* to signal the end of results
*/
if (max_count != 0 && sent_count >= max_count)
{
send_current = false;
}
if (send_current)
{
auto transaction (connection->node->store.tx_begin_read ());
result = connection->node->store.block_get (transaction, current);
if (result != nullptr && set_current_to_end == false)
{
auto previous (result->previous ());
if (!previous.is_zero ())
{
current = previous;
}
else
{
current = request->end;
}
}
else
{
current = request->end;
}
sent_count++;
}
/*
* Once we have processed "get_next()" once our cursor is no longer on
* the "start" member, so this flag is not relevant is always false.
*/
include_start = false;
return result;
}
void nano::bulk_pull_server::sent_action (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
send_next ();
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Unable to bulk send block: %1%") % ec.message ()));
}
}
}
void nano::bulk_pull_server::send_finished ()
{
nano::shared_const_buffer send_buffer (static_cast<uint8_t> (nano::block_type::not_a_block));
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Bulk sending finished");
}
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->no_block_sent (ec, size_a);
});
}
void nano::bulk_pull_server::no_block_sent (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
assert (size_a == 1);
connection->finish_request ();
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Unable to send not-a-block");
}
}
}
nano::bulk_pull_server::bulk_pull_server (std::shared_ptr<nano::bootstrap_server> const & connection_a, std::unique_ptr<nano::bulk_pull> request_a) :
connection (connection_a),
request (std::move (request_a))
{
set_current_end ();
}
/**
* Bulk pull blocks related to an account
*/
void nano::bulk_pull_account_server::set_params ()
{
assert (request != nullptr);
/*
* Parse the flags
*/
invalid_request = false;
pending_include_address = false;
pending_address_only = false;
if (request->flags == nano::bulk_pull_account_flags::pending_address_only)
{
pending_address_only = true;
}
else if (request->flags == nano::bulk_pull_account_flags::pending_hash_amount_and_address)
{
/**
** This is the same as "pending_hash_and_amount" but with the
** sending address appended, for UI purposes mainly.
**/
pending_include_address = true;
}
else if (request->flags == nano::bulk_pull_account_flags::pending_hash_and_amount)
{
/** The defaults are set above **/
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Invalid bulk_pull_account flags supplied %1%") % static_cast<uint8_t> (request->flags)));
}
invalid_request = true;
return;
}
/*
* Initialize the current item from the requested account
*/
current_key.account = request->account;
current_key.hash = 0;
}
void nano::bulk_pull_account_server::send_frontier ()
{
/*
* This function is really the entry point into this class,
* so handle the invalid_request case by terminating the
* request without any response
*/
if (!invalid_request)
{
auto stream_transaction (connection->node->store.tx_begin_read ());
// Get account balance and frontier block hash
auto account_frontier_hash (connection->node->ledger.latest (stream_transaction, request->account));
auto account_frontier_balance_int (connection->node->ledger.account_balance (stream_transaction, request->account));
nano::uint128_union account_frontier_balance (account_frontier_balance_int);
// Write the frontier block hash and balance into a buffer
std::vector<uint8_t> send_buffer;
{
nano::vectorstream output_stream (send_buffer);
write (output_stream, account_frontier_hash.bytes);
write (output_stream, account_frontier_balance.bytes);
}
// Send the buffer to the requestor
auto this_l (shared_from_this ());
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
}
void nano::bulk_pull_account_server::send_next_block ()
{
/*
* Get the next item from the queue, it is a tuple with the key (which
* contains the account and hash) and data (which contains the amount)
*/
auto block_data (get_next ());
auto block_info_key (block_data.first.get ());
auto block_info (block_data.second.get ());
if (block_info_key != nullptr)
{
/*
* If we have a new item, emit it to the socket
*/
std::vector<uint8_t> send_buffer;
if (pending_address_only)
{
nano::vectorstream output_stream (send_buffer);
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Sending address: %1%") % block_info->source.to_string ()));
}
write (output_stream, block_info->source.bytes);
}
else
{
nano::vectorstream output_stream (send_buffer);
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Sending block: %1%") % block_info_key->hash.to_string ()));
}
write (output_stream, block_info_key->hash.bytes);
write (output_stream, block_info->amount.bytes);
if (pending_include_address)
{
/**
** Write the source address as well, if requested
**/
write (output_stream, block_info->source.bytes);
}
}
auto this_l (shared_from_this ());
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
else
{
/*
* Otherwise, finalize the connection
*/
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Done sending blocks")));
}
send_finished ();
}
}
std::pair<std::unique_ptr<nano::pending_key>, std::unique_ptr<nano::pending_info>> nano::bulk_pull_account_server::get_next ()
{
std::pair<std::unique_ptr<nano::pending_key>, std::unique_ptr<nano::pending_info>> result;
while (true)
{
/*
* For each iteration of this loop, establish and then
* destroy a database transaction, to avoid locking the
* database for a prolonged period.
*/
auto stream_transaction (connection->node->store.tx_begin_read ());
auto stream (connection->node->store.pending_begin (stream_transaction, current_key));
if (stream == nano::store_iterator<nano::pending_key, nano::pending_info> (nullptr))
{
break;
}
nano::pending_key key (stream->first);
nano::pending_info info (stream->second);
/*
* Get the key for the next value, to use in the next call or iteration
*/
current_key.account = key.account;
current_key.hash = key.hash.number () + 1;
/*
* Finish up if the response is for a different account
*/
if (key.account != request->account)
{
break;
}
/*
* Skip entries where the amount is less than the requested
* minimum
*/
if (info.amount < request->minimum_amount)
{
continue;
}
/*
* If the pending_address_only flag is set, de-duplicate the
* responses. The responses are the address of the sender,
* so they are are part of the pending table's information
* and not key, so we have to de-duplicate them manually.
*/
if (pending_address_only)
{
if (!deduplication.insert (info.source).second)
{
/*
* If the deduplication map gets too
* large, clear it out. This may
* result in some duplicates getting
* sent to the client, but we do not
* want to commit too much memory
*/
if (deduplication.size () > 4096)
{
deduplication.clear ();
}
continue;
}
}
result.first = std::unique_ptr<nano::pending_key> (new nano::pending_key (key));
result.second = std::unique_ptr<nano::pending_info> (new nano::pending_info (info));
break;
}
return result;
}
void nano::bulk_pull_account_server::sent_action (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
send_next_block ();
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Unable to bulk send block: %1%") % ec.message ()));
}
}
}
void nano::bulk_pull_account_server::send_finished ()
{
/*
* The "bulk_pull_account" final sequence is a final block of all
* zeros. If we are sending only account public keys (with the
* "pending_address_only" flag) then it will be 256-bits of zeros,
* otherwise it will be either 384-bits of zeros (if the
* "pending_include_address" flag is not set) or 640-bits of zeros
* (if that flag is set).
*/
std::vector<uint8_t> send_buffer;
{
nano::vectorstream output_stream (send_buffer);
nano::uint256_union account_zero (0);
nano::uint128_union balance_zero (0);
write (output_stream, account_zero.bytes);
if (!pending_address_only)
{
write (output_stream, balance_zero.bytes);
if (pending_include_address)
{
write (output_stream, account_zero.bytes);
}
}
}
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Bulk sending for an account finished");
}
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->complete (ec, size_a);
});
}
void nano::bulk_pull_account_server::complete (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
if (pending_address_only)
{
assert (size_a == 32);
}
else
{
if (pending_include_address)
{
assert (size_a == 80);
}
else
{
assert (size_a == 48);
}
}
connection->finish_request ();
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Unable to pending-as-zero");
}
}
}
nano::bulk_pull_account_server::bulk_pull_account_server (std::shared_ptr<nano::bootstrap_server> const & connection_a, std::unique_ptr<nano::bulk_pull_account> request_a) :
connection (connection_a),
request (std::move (request_a)),
current_key (0, 0)
{
/*
* Setup the streaming response for the first call to "send_frontier" and "send_next_block"
*/
set_params ();
}

View file

@ -0,0 +1,93 @@
#pragma once
#include <nano/node/common.hpp>
#include <nano/node/socket.hpp>
#include <unordered_set>
namespace nano
{
class pull_info
{
public:
using count_t = nano::bulk_pull::count_t;
pull_info () = default;
pull_info (nano::account const &, nano::block_hash const &, nano::block_hash const &, count_t = 0);
nano::account account{ 0 };
nano::block_hash head{ 0 };
nano::block_hash head_original{ 0 };
nano::block_hash end{ 0 };
count_t count{ 0 };
unsigned attempts{ 0 };
uint64_t processed{ 0 };
};
class bootstrap_client;
class bulk_pull_client final : public std::enable_shared_from_this<nano::bulk_pull_client>
{
public:
bulk_pull_client (std::shared_ptr<nano::bootstrap_client>, nano::pull_info const &);
~bulk_pull_client ();
void request ();
void receive_block ();
void throttled_receive_block ();
void received_type ();
void received_block (boost::system::error_code const &, size_t, nano::block_type);
nano::block_hash first ();
std::shared_ptr<nano::bootstrap_client> connection;
nano::block_hash expected;
nano::account known_account;
nano::pull_info pull;
uint64_t pull_blocks;
uint64_t unexpected_count;
};
class bulk_pull_account_client final : public std::enable_shared_from_this<nano::bulk_pull_account_client>
{
public:
bulk_pull_account_client (std::shared_ptr<nano::bootstrap_client>, nano::account const &);
~bulk_pull_account_client ();
void request ();
void receive_pending ();
std::shared_ptr<nano::bootstrap_client> connection;
nano::account account;
uint64_t pull_blocks;
};
class bootstrap_server;
class bulk_pull;
class bulk_pull_server final : public std::enable_shared_from_this<nano::bulk_pull_server>
{
public:
bulk_pull_server (std::shared_ptr<nano::bootstrap_server> const &, std::unique_ptr<nano::bulk_pull>);
void set_current_end ();
std::shared_ptr<nano::block> get_next ();
void send_next ();
void sent_action (boost::system::error_code const &, size_t);
void send_finished ();
void no_block_sent (boost::system::error_code const &, size_t);
std::shared_ptr<nano::bootstrap_server> connection;
std::unique_ptr<nano::bulk_pull> request;
nano::block_hash current;
bool include_start;
nano::bulk_pull::count_t max_count;
nano::bulk_pull::count_t sent_count;
};
class bulk_pull_account;
class bulk_pull_account_server final : public std::enable_shared_from_this<nano::bulk_pull_account_server>
{
public:
bulk_pull_account_server (std::shared_ptr<nano::bootstrap_server> const &, std::unique_ptr<nano::bulk_pull_account>);
void set_params ();
std::pair<std::unique_ptr<nano::pending_key>, std::unique_ptr<nano::pending_info>> get_next ();
void send_frontier ();
void send_next_block ();
void sent_action (boost::system::error_code const &, size_t);
void send_finished ();
void complete (boost::system::error_code const &, size_t);
std::shared_ptr<nano::bootstrap_server> connection;
std::unique_ptr<nano::bulk_pull_account> request;
std::unordered_set<nano::uint256_union> deduplication;
nano::pending_key current_key;
bool pending_address_only;
bool pending_include_address;
bool invalid_request;
};
}

View file

@ -0,0 +1,256 @@
#include <nano/node/bootstrap/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap_bulk_push.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp.hpp>
nano::bulk_push_client::bulk_push_client (std::shared_ptr<nano::bootstrap_client> const & connection_a) :
connection (connection_a)
{
}
nano::bulk_push_client::~bulk_push_client ()
{
}
void nano::bulk_push_client::start ()
{
nano::bulk_push message;
auto this_l (shared_from_this ());
connection->channel->send (
message, [this_l](boost::system::error_code const & ec, size_t size_a) {
auto transaction (this_l->connection->node->store.tx_begin_read ());
if (!ec)
{
this_l->push (transaction);
}
else
{
if (this_l->connection->node->config.logging.bulk_pull_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Unable to send bulk_push request: %1%") % ec.message ()));
}
}
},
false); // is bootstrap traffic is_droppable false
}
void nano::bulk_push_client::push (nano::transaction const & transaction_a)
{
std::shared_ptr<nano::block> block;
bool finished (false);
while (block == nullptr && !finished)
{
if (current_target.first.is_zero () || current_target.first == current_target.second)
{
nano::lock_guard<std::mutex> guard (connection->attempt->mutex);
if (!connection->attempt->bulk_push_targets.empty ())
{
current_target = connection->attempt->bulk_push_targets.back ();
connection->attempt->bulk_push_targets.pop_back ();
}
else
{
finished = true;
}
}
if (!finished)
{
block = connection->node->store.block_get (transaction_a, current_target.first);
if (block == nullptr)
{
current_target.first = nano::block_hash (0);
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Bulk pushing range ", current_target.first.to_string (), " down to ", current_target.second.to_string ());
}
}
}
}
if (finished)
{
send_finished ();
}
else
{
current_target.first = block->previous ();
push_block (*block);
}
}
void nano::bulk_push_client::send_finished ()
{
nano::shared_const_buffer buffer (static_cast<uint8_t> (nano::block_type::not_a_block));
auto this_l (shared_from_this ());
connection->channel->send_buffer (buffer, nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) {
try
{
this_l->promise.set_value (false);
}
catch (std::future_error &)
{
}
});
}
void nano::bulk_push_client::push_block (nano::block const & block_a)
{
std::vector<uint8_t> buffer;
{
nano::vectorstream stream (buffer);
nano::serialize_block (stream, block_a);
}
auto this_l (shared_from_this ());
connection->channel->send_buffer (nano::shared_const_buffer (std::move (buffer)), nano::stat::detail::all, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
auto transaction (this_l->connection->node->store.tx_begin_read ());
this_l->push (transaction);
}
else
{
if (this_l->connection->node->config.logging.bulk_pull_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error sending block during bulk push: %1%") % ec.message ()));
}
}
});
}
nano::bulk_push_server::bulk_push_server (std::shared_ptr<nano::bootstrap_server> const & connection_a) :
receive_buffer (std::make_shared<std::vector<uint8_t>> ()),
connection (connection_a)
{
receive_buffer->resize (256);
}
void nano::bulk_push_server::throttled_receive ()
{
if (!connection->node->block_processor.half_full ())
{
receive ();
}
else
{
auto this_l (shared_from_this ());
connection->node->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (1), [this_l]() {
if (!this_l->connection->stopped)
{
this_l->throttled_receive ();
}
});
}
}
void nano::bulk_push_server::receive ()
{
if (connection->node->bootstrap_initiator.in_progress ())
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Aborting bulk_push because a bootstrap attempt is in progress");
}
}
else
{
auto this_l (shared_from_this ());
connection->socket->async_read (receive_buffer, 1, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
this_l->received_type ();
}
else
{
if (this_l->connection->node->config.logging.bulk_pull_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error receiving block type: %1%") % ec.message ()));
}
}
});
}
}
void nano::bulk_push_server::received_type ()
{
auto this_l (shared_from_this ());
nano::block_type type (static_cast<nano::block_type> (receive_buffer->data ()[0]));
switch (type)
{
case nano::block_type::send:
{
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::send, nano::stat::dir::in);
connection->socket->async_read (receive_buffer, nano::send_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::receive:
{
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::receive, nano::stat::dir::in);
connection->socket->async_read (receive_buffer, nano::receive_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::open:
{
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::open, nano::stat::dir::in);
connection->socket->async_read (receive_buffer, nano::open_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::change:
{
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::change, nano::stat::dir::in);
connection->socket->async_read (receive_buffer, nano::change_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::state:
{
connection->node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::state_block, nano::stat::dir::in);
connection->socket->async_read (receive_buffer, nano::state_block::size, [this_l, type](boost::system::error_code const & ec, size_t size_a) {
this_l->received_block (ec, size_a, type);
});
break;
}
case nano::block_type::not_a_block:
{
connection->finish_request ();
break;
}
default:
{
if (connection->node->config.logging.network_packet_logging ())
{
connection->node->logger.try_log ("Unknown type received as block type");
}
break;
}
}
}
void nano::bulk_push_server::received_block (boost::system::error_code const & ec, size_t size_a, nano::block_type type_a)
{
if (!ec)
{
nano::bufferstream stream (receive_buffer->data (), size_a);
auto block (nano::deserialize_block (stream, type_a));
if (block != nullptr && !nano::work_validate (*block))
{
connection->node->process_active (std::move (block));
throttled_receive ();
}
else
{
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Error deserializing block received from pull request");
}
}
}
}

View file

@ -0,0 +1,35 @@
#pragma once
#include <nano/node/common.hpp>
#include <nano/node/socket.hpp>
namespace nano
{
class transaction;
class bootstrap_client;
class bulk_push_client final : public std::enable_shared_from_this<nano::bulk_push_client>
{
public:
explicit bulk_push_client (std::shared_ptr<nano::bootstrap_client> const &);
~bulk_push_client ();
void start ();
void push (nano::transaction const &);
void push_block (nano::block const &);
void send_finished ();
std::shared_ptr<nano::bootstrap_client> connection;
std::promise<bool> promise;
std::pair<nano::block_hash, nano::block_hash> current_target;
};
class bootstrap_server;
class bulk_push_server final : public std::enable_shared_from_this<nano::bulk_push_server>
{
public:
explicit bulk_push_server (std::shared_ptr<nano::bootstrap_server> const &);
void throttled_receive ();
void receive ();
void received_type ();
void received_block (boost::system::error_code const &, size_t, nano::block_type);
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
std::shared_ptr<nano::bootstrap_server> connection;
};
}

View file

@ -0,0 +1,342 @@
#include <nano/node/bootstrap/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap_frontier.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp.hpp>
constexpr double nano::bootstrap_limits::bootstrap_connection_warmup_time_sec;
constexpr double nano::bootstrap_limits::bootstrap_minimum_elapsed_seconds_blockrate;
constexpr double nano::bootstrap_limits::bootstrap_minimum_frontier_blocks_per_sec;
constexpr unsigned nano::bootstrap_limits::bulk_push_cost_limit;
constexpr size_t nano::frontier_req_client::size_frontier;
void nano::frontier_req_client::run ()
{
nano::frontier_req request;
request.start.clear ();
request.age = std::numeric_limits<decltype (request.age)>::max ();
request.count = std::numeric_limits<decltype (request.count)>::max ();
auto this_l (shared_from_this ());
connection->channel->send (
request, [this_l](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
this_l->receive_frontier ();
}
else
{
if (this_l->connection->node->config.logging.network_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Error while sending bootstrap request %1%") % ec.message ()));
}
}
},
false); // is bootstrap traffic is_droppable false
}
nano::frontier_req_client::frontier_req_client (std::shared_ptr<nano::bootstrap_client> connection_a) :
connection (connection_a),
current (0),
count (0),
bulk_push_cost (0)
{
auto transaction (connection->node->store.tx_begin_read ());
next (transaction);
}
nano::frontier_req_client::~frontier_req_client ()
{
}
void nano::frontier_req_client::receive_frontier ()
{
auto this_l (shared_from_this ());
connection->channel->socket->async_read (connection->receive_buffer, nano::frontier_req_client::size_frontier, [this_l](boost::system::error_code const & ec, size_t size_a) {
// An issue with asio is that sometimes, instead of reporting a bad file descriptor during disconnect,
// we simply get a size of 0.
if (size_a == nano::frontier_req_client::size_frontier)
{
this_l->received_frontier (ec, size_a);
}
else
{
if (this_l->connection->node->config.logging.network_message_logging ())
{
this_l->connection->node->logger.try_log (boost::str (boost::format ("Invalid size: expected %1%, got %2%") % nano::frontier_req_client::size_frontier % size_a));
}
}
});
}
void nano::frontier_req_client::unsynced (nano::block_hash const & head, nano::block_hash const & end)
{
if (bulk_push_cost < nano::bootstrap_limits::bulk_push_cost_limit)
{
connection->attempt->add_bulk_push_target (head, end);
if (end.is_zero ())
{
bulk_push_cost += 2;
}
else
{
bulk_push_cost += 1;
}
}
}
void nano::frontier_req_client::received_frontier (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
assert (size_a == nano::frontier_req_client::size_frontier);
nano::account account;
nano::bufferstream account_stream (connection->receive_buffer->data (), sizeof (account));
auto error1 (nano::try_read (account_stream, account));
(void)error1;
assert (!error1);
nano::block_hash latest;
nano::bufferstream latest_stream (connection->receive_buffer->data () + sizeof (account), sizeof (latest));
auto error2 (nano::try_read (latest_stream, latest));
(void)error2;
assert (!error2);
if (count == 0)
{
start_time = std::chrono::steady_clock::now ();
}
++count;
std::chrono::duration<double> time_span = std::chrono::duration_cast<std::chrono::duration<double>> (std::chrono::steady_clock::now () - start_time);
double elapsed_sec = std::max (time_span.count (), nano::bootstrap_limits::bootstrap_minimum_elapsed_seconds_blockrate);
double blocks_per_sec = static_cast<double> (count) / elapsed_sec;
if (elapsed_sec > nano::bootstrap_limits::bootstrap_connection_warmup_time_sec && blocks_per_sec < nano::bootstrap_limits::bootstrap_minimum_frontier_blocks_per_sec)
{
connection->node->logger.try_log (boost::str (boost::format ("Aborting frontier req because it was too slow")));
promise.set_value (true);
return;
}
if (connection->attempt->should_log ())
{
connection->node->logger.always_log (boost::str (boost::format ("Received %1% frontiers from %2%") % std::to_string (count) % connection->channel->to_string ()));
}
auto transaction (connection->node->store.tx_begin_read ());
if (!account.is_zero ())
{
while (!current.is_zero () && current < account)
{
// We know about an account they don't.
unsynced (frontier, 0);
next (transaction);
}
if (!current.is_zero ())
{
if (account == current)
{
if (latest == frontier)
{
// In sync
}
else
{
if (connection->node->store.block_exists (transaction, latest))
{
// We know about a block they don't.
unsynced (frontier, latest);
}
else
{
connection->attempt->add_pull (nano::pull_info (account, latest, frontier));
// Either we're behind or there's a fork we differ on
// Either way, bulk pushing will probably not be effective
bulk_push_cost += 5;
}
}
next (transaction);
}
else
{
assert (account < current);
connection->attempt->add_pull (nano::pull_info (account, latest, nano::block_hash (0)));
}
}
else
{
connection->attempt->add_pull (nano::pull_info (account, latest, nano::block_hash (0)));
}
receive_frontier ();
}
else
{
while (!current.is_zero ())
{
// We know about an account they don't.
unsynced (frontier, 0);
next (transaction);
}
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log ("Bulk push cost: ", bulk_push_cost);
}
{
try
{
promise.set_value (false);
}
catch (std::future_error &)
{
}
connection->attempt->pool_connection (connection);
}
}
}
else
{
if (connection->node->config.logging.network_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Error while receiving frontier %1%") % ec.message ()));
}
}
}
void nano::frontier_req_client::next (nano::transaction const & transaction_a)
{
// Filling accounts deque to prevent often read transactions
if (accounts.empty ())
{
size_t max_size (128);
for (auto i (connection->node->store.latest_begin (transaction_a, current.number () + 1)), n (connection->node->store.latest_end ()); i != n && accounts.size () != max_size; ++i)
{
nano::account_info const & info (i->second);
nano::account const & account (i->first);
accounts.emplace_back (account, info.head);
}
/* If loop breaks before max_size, then latest_end () is reached
Add empty record to finish frontier_req_server */
if (accounts.size () != max_size)
{
accounts.emplace_back (nano::account (0), nano::block_hash (0));
}
}
// Retrieving accounts from deque
auto const & account_pair (accounts.front ());
current = account_pair.first;
frontier = account_pair.second;
accounts.pop_front ();
}
nano::frontier_req_server::frontier_req_server (std::shared_ptr<nano::bootstrap_server> const & connection_a, std::unique_ptr<nano::frontier_req> request_a) :
connection (connection_a),
current (request_a->start.number () - 1),
frontier (0),
request (std::move (request_a)),
count (0)
{
next ();
}
void nano::frontier_req_server::send_next ()
{
if (!current.is_zero () && count < request->count)
{
std::vector<uint8_t> send_buffer;
{
nano::vectorstream stream (send_buffer);
write (stream, current.bytes);
write (stream, frontier.bytes);
}
auto this_l (shared_from_this ());
if (connection->node->config.logging.bulk_pull_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Sending frontier for %1% %2%") % current.to_account () % frontier.to_string ()));
}
next ();
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
else
{
send_finished ();
}
}
void nano::frontier_req_server::send_finished ()
{
std::vector<uint8_t> send_buffer;
{
nano::vectorstream stream (send_buffer);
nano::uint256_union zero (0);
write (stream, zero.bytes);
write (stream, zero.bytes);
}
auto this_l (shared_from_this ());
if (connection->node->config.logging.network_logging ())
{
connection->node->logger.try_log ("Frontier sending finished");
}
connection->socket->async_write (nano::shared_const_buffer (std::move (send_buffer)), [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->no_block_sent (ec, size_a);
});
}
void nano::frontier_req_server::no_block_sent (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
connection->finish_request ();
}
else
{
if (connection->node->config.logging.network_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Error sending frontier finish: %1%") % ec.message ()));
}
}
}
void nano::frontier_req_server::sent_action (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
count++;
send_next ();
}
else
{
if (connection->node->config.logging.network_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Error sending frontier pair: %1%") % ec.message ()));
}
}
}
void nano::frontier_req_server::next ()
{
// Filling accounts deque to prevent often read transactions
if (accounts.empty ())
{
auto now (nano::seconds_since_epoch ());
bool skip_old (request->age != std::numeric_limits<decltype (request->age)>::max ());
size_t max_size (128);
auto transaction (connection->node->store.tx_begin_read ());
for (auto i (connection->node->store.latest_begin (transaction, current.number () + 1)), n (connection->node->store.latest_end ()); i != n && accounts.size () != max_size; ++i)
{
nano::account_info const & info (i->second);
if (!skip_old || (now - info.modified) <= request->age)
{
nano::account const & account (i->first);
accounts.emplace_back (account, info.head);
}
}
/* If loop breaks before max_size, then latest_end () is reached
Add empty record to finish frontier_req_server */
if (accounts.size () != max_size)
{
accounts.emplace_back (nano::account (0), nano::block_hash (0));
}
}
// Retrieving accounts from deque
auto const & account_pair (accounts.front ());
current = account_pair.first;
frontier = account_pair.second;
accounts.pop_front ();
}

View file

@ -0,0 +1,51 @@
#pragma once
#include <nano/node/common.hpp>
#include <nano/node/socket.hpp>
namespace nano
{
class transaction;
class bootstrap_client;
class frontier_req_client final : public std::enable_shared_from_this<nano::frontier_req_client>
{
public:
explicit frontier_req_client (std::shared_ptr<nano::bootstrap_client>);
~frontier_req_client ();
void run ();
void receive_frontier ();
void received_frontier (boost::system::error_code const &, size_t);
void unsynced (nano::block_hash const &, nano::block_hash const &);
void next (nano::transaction const &);
std::shared_ptr<nano::bootstrap_client> connection;
nano::account current;
nano::block_hash frontier;
unsigned count;
nano::account landing;
nano::account faucet;
std::chrono::steady_clock::time_point start_time;
std::promise<bool> promise;
/** A very rough estimate of the cost of `bulk_push`ing missing blocks */
uint64_t bulk_push_cost;
std::deque<std::pair<nano::account, nano::block_hash>> accounts;
static size_t constexpr size_frontier = sizeof (nano::account) + sizeof (nano::block_hash);
};
class bootstrap_server;
class frontier_req;
class frontier_req_server final : public std::enable_shared_from_this<nano::frontier_req_server>
{
public:
frontier_req_server (std::shared_ptr<nano::bootstrap_server> const &, std::unique_ptr<nano::frontier_req>);
void send_next ();
void sent_action (boost::system::error_code const &, size_t);
void send_finished ();
void no_block_sent (boost::system::error_code const &, size_t);
void next ();
std::shared_ptr<nano::bootstrap_server> connection;
nano::account current;
nano::block_hash frontier;
std::unique_ptr<nano::frontier_req> request;
size_t count;
std::deque<std::pair<nano::account, nano::block_hash>> accounts;
};
}

View file

@ -0,0 +1,658 @@
#include <nano/node/bootstrap/bootstrap_server.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/tcp.hpp>
nano::bootstrap_listener::bootstrap_listener (uint16_t port_a, nano::node & node_a) :
node (node_a),
port (port_a)
{
}
void nano::bootstrap_listener::start ()
{
listening_socket = std::make_shared<nano::server_socket> (node.shared (), boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port), node.config.tcp_incoming_connections_max);
boost::system::error_code ec;
listening_socket->start (ec);
if (ec)
{
node.logger.try_log (boost::str (boost::format ("Error while binding for incoming TCP/bootstrap on port %1%: %2%") % listening_socket->listening_port () % ec.message ()));
throw std::runtime_error (ec.message ());
}
listening_socket->on_connection ([this](std::shared_ptr<nano::socket> new_connection, boost::system::error_code const & ec_a) {
bool keep_accepting = true;
if (ec_a)
{
keep_accepting = false;
this->node.logger.try_log (boost::str (boost::format ("Error while accepting incoming TCP/bootstrap connections: %1%") % ec_a.message ()));
}
else
{
accept_action (ec_a, new_connection);
}
return keep_accepting;
});
}
void nano::bootstrap_listener::stop ()
{
decltype (connections) connections_l;
{
nano::lock_guard<std::mutex> lock (mutex);
on = false;
connections_l.swap (connections);
}
if (listening_socket)
{
listening_socket->close ();
listening_socket = nullptr;
}
}
size_t nano::bootstrap_listener::connection_count ()
{
nano::lock_guard<std::mutex> lock (mutex);
return connections.size ();
}
void nano::bootstrap_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr<nano::socket> socket_a)
{
auto connection (std::make_shared<nano::bootstrap_server> (socket_a, node.shared ()));
{
nano::lock_guard<std::mutex> lock (mutex);
connections[connection.get ()] = connection;
connection->receive ();
}
}
boost::asio::ip::tcp::endpoint nano::bootstrap_listener::endpoint ()
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listening_socket->listening_port ());
}
namespace nano
{
std::unique_ptr<seq_con_info_component> collect_seq_con_info (bootstrap_listener & bootstrap_listener, const std::string & name)
{
auto sizeof_element = sizeof (decltype (bootstrap_listener.connections)::value_type);
auto composite = std::make_unique<seq_con_info_composite> (name);
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "connections", bootstrap_listener.connection_count (), sizeof_element }));
return composite;
}
}
nano::bootstrap_server::~bootstrap_server ()
{
if (node->config.logging.bulk_pull_logging ())
{
node->logger.try_log ("Exiting incoming TCP/bootstrap server");
}
if (type == nano::bootstrap_server_type::bootstrap)
{
--node->bootstrap.bootstrap_count;
}
else if (type == nano::bootstrap_server_type::realtime)
{
--node->bootstrap.realtime_count;
node->network.response_channels.remove (remote_endpoint);
// Clear temporary channel
auto exisiting_response_channel (node->network.tcp_channels.find_channel (remote_endpoint));
if (exisiting_response_channel != nullptr)
{
exisiting_response_channel->server = false;
node->network.tcp_channels.erase (remote_endpoint);
}
}
stop ();
nano::lock_guard<std::mutex> lock (node->bootstrap.mutex);
node->bootstrap.connections.erase (this);
}
void nano::bootstrap_server::stop ()
{
if (!stopped.exchange (true))
{
if (socket != nullptr)
{
socket->close ();
}
}
}
nano::bootstrap_server::bootstrap_server (std::shared_ptr<nano::socket> socket_a, std::shared_ptr<nano::node> node_a) :
receive_buffer (std::make_shared<std::vector<uint8_t>> ()),
socket (socket_a),
node (node_a)
{
receive_buffer->resize (512);
}
void nano::bootstrap_server::receive ()
{
// Increase timeout to receive TCP header (idle server socket)
socket->set_timeout (node->network_params.node.idle_timeout);
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, 8, [this_l](boost::system::error_code const & ec, size_t size_a) {
// Set remote_endpoint
if (this_l->remote_endpoint.port () == 0)
{
this_l->remote_endpoint = this_l->socket->remote_endpoint ();
}
// Decrease timeout to default
this_l->socket->set_timeout (this_l->node->config.tcp_io_timeout);
// Receive header
this_l->receive_header_action (ec, size_a);
});
}
void nano::bootstrap_server::receive_header_action (boost::system::error_code const & ec, size_t size_a)
{
if (!ec)
{
assert (size_a == 8);
nano::bufferstream type_stream (receive_buffer->data (), size_a);
auto error (false);
nano::message_header header (error, type_stream);
if (!error)
{
switch (header.type)
{
case nano::message_type::bulk_pull:
{
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull, nano::stat::dir::in);
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_bulk_pull_action (ec, size_a, header);
});
break;
}
case nano::message_type::bulk_pull_account:
{
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_pull_account, nano::stat::dir::in);
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_bulk_pull_account_action (ec, size_a, header);
});
break;
}
case nano::message_type::frontier_req:
{
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::frontier_req, nano::stat::dir::in);
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_frontier_req_action (ec, size_a, header);
});
break;
}
case nano::message_type::bulk_push:
{
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::bulk_push, nano::stat::dir::in);
if (is_bootstrap_connection ())
{
add_request (std::unique_ptr<nano::message> (new nano::bulk_push (header)));
}
break;
}
case nano::message_type::keepalive:
{
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_keepalive_action (ec, size_a, header);
});
break;
}
case nano::message_type::publish:
{
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_publish_action (ec, size_a, header);
});
break;
}
case nano::message_type::confirm_ack:
{
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_confirm_ack_action (ec, size_a, header);
});
break;
}
case nano::message_type::confirm_req:
{
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_confirm_req_action (ec, size_a, header);
});
break;
}
case nano::message_type::node_id_handshake:
{
auto this_l (shared_from_this ());
socket->async_read (receive_buffer, header.payload_length_bytes (), [this_l, header](boost::system::error_code const & ec, size_t size_a) {
this_l->receive_node_id_handshake_action (ec, size_a, header);
});
break;
}
default:
{
if (node->config.logging.network_logging ())
{
node->logger.try_log (boost::str (boost::format ("Received invalid type from bootstrap connection %1%") % static_cast<uint8_t> (header.type)));
}
break;
}
}
}
}
else
{
if (node->config.logging.bulk_pull_logging ())
{
node->logger.try_log (boost::str (boost::format ("Error while receiving type: %1%") % ec.message ()));
}
}
}
void nano::bootstrap_server::receive_bulk_pull_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
{
if (!ec)
{
auto error (false);
nano::bufferstream stream (receive_buffer->data (), size_a);
std::unique_ptr<nano::bulk_pull> request (new nano::bulk_pull (error, stream, header_a));
if (!error)
{
if (node->config.logging.bulk_pull_logging ())
{
node->logger.try_log (boost::str (boost::format ("Received bulk pull for %1% down to %2%, maximum of %3%") % request->start.to_string () % request->end.to_string () % (request->count ? request->count : std::numeric_limits<double>::infinity ())));
}
if (is_bootstrap_connection ())
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
receive ();
}
}
}
void nano::bootstrap_server::receive_bulk_pull_account_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
{
if (!ec)
{
auto error (false);
assert (size_a == header_a.payload_length_bytes ());
nano::bufferstream stream (receive_buffer->data (), size_a);
std::unique_ptr<nano::bulk_pull_account> request (new nano::bulk_pull_account (error, stream, header_a));
if (!error)
{
if (node->config.logging.bulk_pull_logging ())
{
node->logger.try_log (boost::str (boost::format ("Received bulk pull account for %1% with a minimum amount of %2%") % request->account.to_account () % nano::amount (request->minimum_amount).format_balance (nano::Mxrb_ratio, 10, true)));
}
if (is_bootstrap_connection ())
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
receive ();
}
}
}
void nano::bootstrap_server::receive_frontier_req_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
{
if (!ec)
{
auto error (false);
nano::bufferstream stream (receive_buffer->data (), size_a);
std::unique_ptr<nano::frontier_req> request (new nano::frontier_req (error, stream, header_a));
if (!error)
{
if (node->config.logging.bulk_pull_logging ())
{
node->logger.try_log (boost::str (boost::format ("Received frontier request for %1% with age %2%") % request->start.to_string () % request->age));
}
if (is_bootstrap_connection ())
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
receive ();
}
}
else
{
if (node->config.logging.network_logging ())
{
node->logger.try_log (boost::str (boost::format ("Error sending receiving frontier request: %1%") % ec.message ()));
}
}
}
void nano::bootstrap_server::receive_keepalive_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
{
if (!ec)
{
auto error (false);
nano::bufferstream stream (receive_buffer->data (), size_a);
std::unique_ptr<nano::keepalive> request (new nano::keepalive (error, stream, header_a));
if (!error)
{
if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
receive ();
}
}
else
{
if (node->config.logging.network_keepalive_logging ())
{
node->logger.try_log (boost::str (boost::format ("Error receiving keepalive: %1%") % ec.message ()));
}
}
}
void nano::bootstrap_server::receive_publish_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
{
if (!ec)
{
auto error (false);
nano::bufferstream stream (receive_buffer->data (), size_a);
std::unique_ptr<nano::publish> request (new nano::publish (error, stream, header_a));
if (!error)
{
if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
receive ();
}
}
else
{
if (node->config.logging.network_message_logging ())
{
node->logger.try_log (boost::str (boost::format ("Error receiving publish: %1%") % ec.message ()));
}
}
}
void nano::bootstrap_server::receive_confirm_req_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
{
if (!ec)
{
auto error (false);
nano::bufferstream stream (receive_buffer->data (), size_a);
std::unique_ptr<nano::confirm_req> request (new nano::confirm_req (error, stream, header_a));
if (!error)
{
if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
receive ();
}
}
else if (node->config.logging.network_message_logging ())
{
node->logger.try_log (boost::str (boost::format ("Error receiving confirm_req: %1%") % ec.message ()));
}
}
void nano::bootstrap_server::receive_confirm_ack_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
{
if (!ec)
{
auto error (false);
nano::bufferstream stream (receive_buffer->data (), size_a);
std::unique_ptr<nano::confirm_ack> request (new nano::confirm_ack (error, stream, header_a));
if (!error)
{
if (type == nano::bootstrap_server_type::realtime || type == nano::bootstrap_server_type::realtime_response_server)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
receive ();
}
}
else if (node->config.logging.network_message_logging ())
{
node->logger.try_log (boost::str (boost::format ("Error receiving confirm_ack: %1%") % ec.message ()));
}
}
void nano::bootstrap_server::receive_node_id_handshake_action (boost::system::error_code const & ec, size_t size_a, nano::message_header const & header_a)
{
if (!ec)
{
auto error (false);
nano::bufferstream stream (receive_buffer->data (), size_a);
std::unique_ptr<nano::node_id_handshake> request (new nano::node_id_handshake (error, stream, header_a));
if (!error)
{
if (type == nano::bootstrap_server_type::undefined && !node->flags.disable_tcp_realtime)
{
add_request (std::unique_ptr<nano::message> (request.release ()));
}
receive ();
}
}
else if (node->config.logging.network_node_id_handshake_logging ())
{
node->logger.try_log (boost::str (boost::format ("Error receiving node_id_handshake: %1%") % ec.message ()));
}
}
void nano::bootstrap_server::add_request (std::unique_ptr<nano::message> message_a)
{
assert (message_a != nullptr);
nano::lock_guard<std::mutex> lock (mutex);
auto start (requests.empty ());
requests.push (std::move (message_a));
if (start)
{
run_next ();
}
}
void nano::bootstrap_server::finish_request ()
{
nano::lock_guard<std::mutex> lock (mutex);
requests.pop ();
if (!requests.empty ())
{
run_next ();
}
else
{
std::weak_ptr<nano::bootstrap_server> this_w (shared_from_this ());
node->alarm.add (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->timeout ();
}
});
}
}
void nano::bootstrap_server::finish_request_async ()
{
std::weak_ptr<nano::bootstrap_server> this_w (shared_from_this ());
node->background ([this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->finish_request ();
}
});
}
void nano::bootstrap_server::timeout ()
{
if (socket != nullptr)
{
if (socket->has_timed_out ())
{
if (node->config.logging.bulk_pull_logging ())
{
node->logger.try_log ("Closing incoming tcp / bootstrap server by timeout");
}
{
nano::lock_guard<std::mutex> lock (node->bootstrap.mutex);
node->bootstrap.connections.erase (this);
}
socket->close ();
}
}
else
{
nano::lock_guard<std::mutex> lock (node->bootstrap.mutex);
node->bootstrap.connections.erase (this);
}
}
namespace
{
class request_response_visitor : public nano::message_visitor
{
public:
explicit request_response_visitor (std::shared_ptr<nano::bootstrap_server> const & connection_a) :
connection (connection_a)
{
}
virtual ~request_response_visitor () = default;
void keepalive (nano::keepalive const & message_a) override
{
bool first_keepalive (connection->keepalive_first);
if (first_keepalive)
{
connection->keepalive_first = false;
}
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a, first_keepalive]() {
connection_l->node->network.tcp_channels.process_keepalive (message_a, connection_l->remote_endpoint, first_keepalive);
});
}
void publish (nano::publish const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
}
void confirm_req (nano::confirm_req const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
}
void confirm_ack (nano::confirm_ack const & message_a) override
{
connection->finish_request_async ();
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, connection_l->remote_node_id, connection_l->socket, connection_l->type);
});
}
void bulk_pull (nano::bulk_pull const &) override
{
auto response (std::make_shared<nano::bulk_pull_server> (connection, std::unique_ptr<nano::bulk_pull> (static_cast<nano::bulk_pull *> (connection->requests.front ().release ()))));
response->send_next ();
}
void bulk_pull_account (nano::bulk_pull_account const &) override
{
auto response (std::make_shared<nano::bulk_pull_account_server> (connection, std::unique_ptr<nano::bulk_pull_account> (static_cast<nano::bulk_pull_account *> (connection->requests.front ().release ()))));
response->send_frontier ();
}
void bulk_push (nano::bulk_push const &) override
{
auto response (std::make_shared<nano::bulk_push_server> (connection));
response->throttled_receive ();
}
void frontier_req (nano::frontier_req const &) override
{
auto response (std::make_shared<nano::frontier_req_server> (connection, std::unique_ptr<nano::frontier_req> (static_cast<nano::frontier_req *> (connection->requests.front ().release ()))));
response->send_next ();
}
void node_id_handshake (nano::node_id_handshake const & message_a) override
{
if (connection->node->config.logging.network_node_id_handshake_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Received node_id_handshake message from %1%") % connection->remote_endpoint));
}
if (message_a.query)
{
boost::optional<std::pair<nano::account, nano::signature>> response (std::make_pair (connection->node->node_id.pub, nano::sign_message (connection->node->node_id.prv, connection->node->node_id.pub, *message_a.query)));
assert (!nano::validate_message (response->first, *message_a.query, response->second));
auto cookie (connection->node->network.syn_cookies.assign (nano::transport::map_tcp_to_endpoint (connection->remote_endpoint)));
nano::node_id_handshake response_message (cookie, response);
auto shared_const_buffer = response_message.to_shared_const_buffer ();
// clang-format off
connection->socket->async_write (shared_const_buffer, [connection = connection ](boost::system::error_code const & ec, size_t size_a) {
if (ec)
{
if (connection->node->config.logging.network_node_id_handshake_logging ())
{
connection->node->logger.try_log (boost::str (boost::format ("Error sending node_id_handshake to %1%: %2%") % connection->remote_endpoint % ec.message ()));
}
// Stop invalid handshake
connection->stop ();
}
else
{
connection->node->stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::out);
connection->finish_request ();
}
});
// clang-format on
}
else if (message_a.response)
{
nano::account const & node_id (message_a.response->first);
if (!connection->node->network.syn_cookies.validate (nano::transport::map_tcp_to_endpoint (connection->remote_endpoint), node_id, message_a.response->second) && node_id != connection->node->node_id.pub)
{
connection->remote_node_id = node_id;
connection->type = nano::bootstrap_server_type::realtime;
++connection->node->bootstrap.realtime_count;
connection->finish_request_async ();
}
else
{
// Stop invalid handshake
connection->stop ();
}
}
else
{
connection->finish_request_async ();
}
nano::account node_id (connection->remote_node_id);
nano::bootstrap_server_type type (connection->type);
assert (node_id.is_zero () || type == nano::bootstrap_server_type::realtime);
auto connection_l (connection->shared_from_this ());
connection->node->background ([connection_l, message_a, node_id, type]() {
connection_l->node->network.tcp_channels.process_message (message_a, connection_l->remote_endpoint, node_id, connection_l->socket, type);
});
}
std::shared_ptr<nano::bootstrap_server> connection;
};
}
void nano::bootstrap_server::run_next ()
{
assert (!requests.empty ());
request_response_visitor visitor (shared_from_this ());
requests.front ()->visit (visitor);
}
bool nano::bootstrap_server::is_bootstrap_connection ()
{
if (type == nano::bootstrap_server_type::undefined && !node->flags.disable_bootstrap_listener && node->bootstrap.bootstrap_count < node->config.bootstrap_connections_max)
{
++node->bootstrap.bootstrap_count;
type = nano::bootstrap_server_type::bootstrap;
}
return type == nano::bootstrap_server_type::bootstrap;
}

View file

@ -0,0 +1,78 @@
#pragma once
#include <nano/node/common.hpp>
#include <nano/node/socket.hpp>
#include <atomic>
#include <queue>
namespace nano
{
class bootstrap_server;
class bootstrap_listener final
{
public:
bootstrap_listener (uint16_t, nano::node &);
void start ();
void stop ();
void accept_action (boost::system::error_code const &, std::shared_ptr<nano::socket>);
size_t connection_count ();
std::mutex mutex;
std::unordered_map<nano::bootstrap_server *, std::weak_ptr<nano::bootstrap_server>> connections;
nano::tcp_endpoint endpoint ();
nano::node & node;
std::shared_ptr<nano::server_socket> listening_socket;
bool on;
std::atomic<size_t> bootstrap_count{ 0 };
std::atomic<size_t> realtime_count{ 0 };
private:
uint16_t port;
};
std::unique_ptr<seq_con_info_component> collect_seq_con_info (bootstrap_listener & bootstrap_listener, const std::string & name);
class message;
enum class bootstrap_server_type
{
undefined,
bootstrap,
realtime,
realtime_response_server // special type for tcp channel response server
};
class bootstrap_server final : public std::enable_shared_from_this<nano::bootstrap_server>
{
public:
bootstrap_server (std::shared_ptr<nano::socket>, std::shared_ptr<nano::node>);
~bootstrap_server ();
void stop ();
void receive ();
void receive_header_action (boost::system::error_code const &, size_t);
void receive_bulk_pull_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_bulk_pull_account_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_frontier_req_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_keepalive_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_publish_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_confirm_req_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_confirm_ack_action (boost::system::error_code const &, size_t, nano::message_header const &);
void receive_node_id_handshake_action (boost::system::error_code const &, size_t, nano::message_header const &);
void add_request (std::unique_ptr<nano::message>);
void finish_request ();
void finish_request_async ();
void run_next ();
void timeout ();
bool is_bootstrap_connection ();
std::shared_ptr<std::vector<uint8_t>> receive_buffer;
std::shared_ptr<nano::socket> socket;
std::shared_ptr<nano::node> node;
std::mutex mutex;
std::queue<std::unique_ptr<nano::message>> requests;
std::atomic<bool> stopped{ false };
std::atomic<nano::bootstrap_server_type> type{ nano::bootstrap_server_type::undefined };
std::atomic<bool> keepalive_first{ true };
// Remote enpoint used to remove response channel even after socket closing
nano::tcp_endpoint remote_endpoint{ boost::asio::ip::address_v6::any (), 0 };
nano::account remote_node_id{ 0 };
};
}

View file

@ -7,7 +7,11 @@
#include <nano/lib/work.hpp>
#include <nano/node/active_transactions.hpp>
#include <nano/node/blockprocessor.hpp>
#include <nano/node/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap.hpp>
#include <nano/node/bootstrap/bootstrap_bulk_pull.hpp>
#include <nano/node/bootstrap/bootstrap_bulk_push.hpp>
#include <nano/node/bootstrap/bootstrap_frontier.hpp>
#include <nano/node/bootstrap/bootstrap_server.hpp>
#include <nano/node/confirmation_height_processor.hpp>
#include <nano/node/distributed_work.hpp>
#include <nano/node/election.hpp>