diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 02ed68ce..f1be9fb8 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -354,7 +354,7 @@ TEST (receivable_processor, confirm_insufficient_pos) nano::keypair key1; auto vote (std::make_shared (key1.pub, key1.prv, 0, block1)); nano::confirm_ack con1 (vote); - node1.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ())); } TEST (receivable_processor, confirm_sufficient_pos) @@ -369,7 +369,7 @@ TEST (receivable_processor, confirm_sufficient_pos) node1.active.start (block1); auto vote (std::make_shared (nano::test_genesis_key.pub, nano::test_genesis_key.prv, 0, block1)); nano::confirm_ack con1 (vote); - node1.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ())); } TEST (receivable_processor, send_with_receive) @@ -2047,17 +2047,17 @@ TEST (confirmation_height, conflict_rollback_cemented) auto send2 (std::make_shared (genesis.hash (), key2.pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (genesis.hash ()))); nano::publish publish2 (send2); auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ())); - node1.process_message (publish1, channel1); + node1.network.process_message (publish1, channel1); node1.block_processor.flush (); auto channel2 (node2.network.udp_channels.create (node1.network.endpoint ())); - node2.process_message (publish2, channel2); + node2.network.process_message (publish2, channel2); node2.block_processor.flush (); ASSERT_EQ (1, node1.active.size ()); ASSERT_EQ (1, node2.active.size ()); system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); - node1.process_message (publish2, channel1); + node1.network.process_message (publish2, channel1); node1.block_processor.flush (); - node2.process_message (publish1, channel2); + node2.network.process_message (publish1, channel2); node2.block_processor.flush (); std::unique_lock lock (node2.active.mutex); auto conflict (node2.active.roots.find (nano::qualified_root (genesis.hash (), genesis.hash ()))); diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index c4a9665d..9fe9ceed 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -289,7 +289,7 @@ TEST (node, receive_gap) auto block (std::make_shared (5, 1, 2, nano::keypair ().prv, 4, 0)); node1.work_generate_blocking (*block); nano::publish message (block); - node1.process_message (message, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.process_message (message, node1.network.udp_channels.create (node1.network.endpoint ())); node1.block_processor.flush (); ASSERT_EQ (1, node1.gap_cache.size ()); } @@ -1067,17 +1067,17 @@ TEST (node, fork_flip) auto send2 (std::make_shared (genesis.hash (), key2.pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (genesis.hash ()))); nano::publish publish2 (send2); auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ())); - node1.process_message (publish1, channel1); + node1.network.process_message (publish1, channel1); node1.block_processor.flush (); auto channel2 (node2.network.udp_channels.create (node1.network.endpoint ())); - node2.process_message (publish2, channel2); + node2.network.process_message (publish2, channel2); node2.block_processor.flush (); ASSERT_EQ (1, node1.active.size ()); ASSERT_EQ (1, node2.active.size ()); system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); - node1.process_message (publish2, channel1); + node1.network.process_message (publish2, channel1); node1.block_processor.flush (); - node2.process_message (publish1, channel2); + node2.network.process_message (publish1, channel2); node2.block_processor.flush (); std::unique_lock lock (node2.active.mutex); auto conflict (node2.active.roots.find (nano::qualified_root (genesis.hash (), genesis.hash ()))); @@ -1127,18 +1127,18 @@ TEST (node, fork_multi_flip) nano::publish publish2 (send2); auto send3 (std::make_shared (publish2.block->hash (), key2.pub, nano::genesis_amount - 100, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (publish2.block->hash ()))); nano::publish publish3 (send3); - node1.process_message (publish1, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.process_message (publish1, node1.network.udp_channels.create (node1.network.endpoint ())); node1.block_processor.flush (); - node2.process_message (publish2, node2.network.udp_channels.create (node2.network.endpoint ())); - node2.process_message (publish3, node2.network.udp_channels.create (node2.network.endpoint ())); + node2.network.process_message (publish2, node2.network.udp_channels.create (node2.network.endpoint ())); + node2.network.process_message (publish3, node2.network.udp_channels.create (node2.network.endpoint ())); node2.block_processor.flush (); ASSERT_EQ (1, node1.active.size ()); ASSERT_EQ (2, node2.active.size ()); system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); - node1.process_message (publish2, node1.network.udp_channels.create (node1.network.endpoint ())); - node1.process_message (publish3, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.process_message (publish2, node1.network.udp_channels.create (node1.network.endpoint ())); + node1.network.process_message (publish3, node1.network.udp_channels.create (node1.network.endpoint ())); node1.block_processor.flush (); - node2.process_message (publish1, node2.network.udp_channels.create (node2.network.endpoint ())); + node2.network.process_message (publish1, node2.network.udp_channels.create (node2.network.endpoint ())); node2.block_processor.flush (); std::unique_lock lock (node2.active.mutex); auto conflict (node2.active.roots.find (nano::qualified_root (genesis.hash (), genesis.hash ()))); @@ -1227,17 +1227,17 @@ TEST (node, fork_open) auto send1 (std::make_shared (genesis.hash (), key1.pub, 0, nano::test_genesis_key.prv, nano::test_genesis_key.pub, system.work.generate (genesis.hash ()))); nano::publish publish1 (send1); auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ())); - node1.process_message (publish1, channel1); + node1.network.process_message (publish1, channel1); node1.block_processor.flush (); auto open1 (std::make_shared (publish1.block->hash (), 1, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub))); nano::publish publish2 (open1); - node1.process_message (publish2, channel1); + node1.network.process_message (publish2, channel1); node1.block_processor.flush (); auto open2 (std::make_shared (publish1.block->hash (), 2, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub))); nano::publish publish3 (open2); ASSERT_EQ (2, node1.active.size ()); system.wallet (0)->insert_adhoc (nano::test_genesis_key.prv); - node1.process_message (publish3, channel1); + node1.network.process_message (publish3, channel1); node1.block_processor.flush (); } @@ -2118,8 +2118,8 @@ TEST (node, local_votes_cache) auto channel (node.network.udp_channels.create (node.network.endpoint ())); for (auto i (0); i < 100; ++i) { - node.process_message (message1, channel); - node.process_message (message2, channel); + node.network.process_message (message1, channel); + node.network.process_message (message2, channel); } { std::lock_guard lock (boost::polymorphic_downcast (node.store_impl.get ())->cache_mutex); @@ -2135,7 +2135,7 @@ TEST (node, local_votes_cache) nano::confirm_req message3 (send3); for (auto i (0); i < 100; ++i) { - node.process_message (message3, channel); + node.network.process_message (message3, channel); } { std::lock_guard lock (boost::polymorphic_downcast (node.store_impl.get ())->cache_mutex); diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index 96c2d21e..14a5e299 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -44,6 +44,8 @@ add_library (node lmdb_txn_tracker.cpp logging.hpp logging.cpp + network.hpp + network.cpp nodeconfig.hpp nodeconfig.cpp node_observers.hpp diff --git a/nano/node/network.cpp b/nano/node/network.cpp new file mode 100644 index 00000000..6263b265 --- /dev/null +++ b/nano/node/network.cpp @@ -0,0 +1,757 @@ +#include + +#include + +#include +#include + +nano::network::network (nano::node & node_a, uint16_t port_a) : +buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer +resolver (node_a.io_ctx), +node (node_a), +udp_channels (node_a, port_a), +tcp_channels (node_a), +disconnect_observer ([]() {}) +{ + boost::thread::attributes attrs; + nano::thread_attributes::set (attrs); + for (size_t i = 0; i < node.config.network_threads; ++i) + { + packet_processing_threads.push_back (boost::thread (attrs, [this]() { + nano::thread_role::set (nano::thread_role::name::packet_processing); + try + { + udp_channels.process_packets (); + } + catch (boost::system::error_code & ec) + { + this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); + release_assert (false); + } + catch (std::error_code & ec) + { + this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); + release_assert (false); + } + catch (std::runtime_error & err) + { + this->node.logger.try_log (FATAL_LOG_PREFIX, err.what ()); + release_assert (false); + } + catch (...) + { + this->node.logger.try_log (FATAL_LOG_PREFIX, "Unknown exception"); + release_assert (false); + } + if (this->node.config.logging.network_packet_logging ()) + { + this->node.logger.try_log ("Exiting packet processing thread"); + } + })); + } +} + +nano::network::~network () +{ + for (auto & thread : packet_processing_threads) + { + thread.join (); + } +} + +void nano::network::start () +{ + ongoing_cleanup (); + udp_channels.start (); + tcp_channels.start (); +} + +void nano::network::stop () +{ + udp_channels.stop (); + tcp_channels.stop (); + resolver.cancel (); + buffer_container.stop (); +} + +void nano::network::send_keepalive (std::shared_ptr channel_a) +{ + nano::keepalive message; + random_fill (message.peers); + channel_a->send (message); +} + +void nano::network::send_keepalive_self (std::shared_ptr channel_a) +{ + nano::keepalive message; + if (node.config.external_address != boost::asio::ip::address_v6{} && node.config.external_port != 0) + { + message.peers[0] = nano::endpoint (node.config.external_address, node.config.external_port); + } + else + { + auto external_address (node.port_mapping.external_address ()); + if (external_address.address () != boost::asio::ip::address_v4::any ()) + { + message.peers[0] = nano::endpoint (boost::asio::ip::address_v6{}, endpoint ().port ()); + message.peers[1] = external_address; + } + else + { + message.peers[0] = nano::endpoint (boost::asio::ip::address_v6{}, endpoint ().port ()); + } + } + channel_a->send (message); +} + +void nano::network::send_node_id_handshake (std::shared_ptr channel_a, boost::optional const & query, boost::optional const & respond_to) +{ + boost::optional> response (boost::none); + if (respond_to) + { + response = std::make_pair (node.node_id.pub, nano::sign_message (node.node_id.prv, node.node_id.pub, *respond_to)); + assert (!nano::validate_message (response->first, *respond_to, response->second)); + } + nano::node_id_handshake message (query, response); + if (node.config.logging.network_node_id_handshake_logging ()) + { + node.logger.try_log (boost::str (boost::format ("Node ID handshake sent with node ID %1% to %2%: query %3%, respond_to %4% (signature %5%)") % node.node_id.pub.to_account () % channel_a->get_endpoint () % (query ? query->to_string () : std::string ("[none]")) % (respond_to ? respond_to->to_string () : std::string ("[none]")) % (response ? response->second.to_string () : std::string ("[none]")))); + } + channel_a->send (message); +} + +template +bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a, T & list_a, std::shared_ptr block_a, bool also_publish) +{ + bool result (false); + if (node_a.config.enable_voting) + { + auto hash (block_a->hash ()); + // Search in cache + auto votes (node_a.votes_cache.find (hash)); + if (votes.empty ()) + { + // Generate new vote + node_a.wallets.foreach_representative (transaction_a, [&result, &list_a, &node_a, &transaction_a, &hash](nano::public_key const & pub_a, nano::raw_key const & prv_a) { + result = true; + auto vote (node_a.store.vote_generate (transaction_a, pub_a, prv_a, std::vector (1, hash))); + nano::confirm_ack confirm (vote); + auto vote_bytes = confirm.to_bytes (); + for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j) + { + j->get ()->send_buffer (vote_bytes, nano::stat::detail::confirm_ack); + } + node_a.votes_cache.add (vote); + }); + } + else + { + // Send from cache + for (auto & vote : votes) + { + nano::confirm_ack confirm (vote); + auto vote_bytes = confirm.to_bytes (); + for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j) + { + j->get ()->send_buffer (vote_bytes, nano::stat::detail::confirm_ack); + } + } + } + // Republish if required + if (also_publish) + { + nano::publish publish (block_a); + auto publish_bytes (publish.to_bytes ()); + for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j) + { + j->get ()->send_buffer (publish_bytes, nano::stat::detail::publish); + } + } + } + return result; +} + +bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a, std::shared_ptr channel_a, std::shared_ptr block_a, bool also_publish) +{ + std::array, 1> endpoints = { channel_a }; + auto result (confirm_block (transaction_a, node_a, endpoints, std::move (block_a), also_publish)); + return result; +} + +void nano::network::confirm_hashes (nano::transaction const & transaction_a, std::shared_ptr channel_a, std::vector blocks_bundle_a) +{ + if (node.config.enable_voting) + { + node.wallets.foreach_representative (transaction_a, [this, &blocks_bundle_a, &channel_a, &transaction_a](nano::public_key const & pub_a, nano::raw_key const & prv_a) { + auto vote (this->node.store.vote_generate (transaction_a, pub_a, prv_a, blocks_bundle_a)); + nano::confirm_ack confirm (vote); + std::shared_ptr> bytes (new std::vector); + { + nano::vectorstream stream (*bytes); + confirm.serialize (stream); + } + channel_a->send_buffer (bytes, nano::stat::detail::confirm_ack); + this->node.votes_cache.add (vote); + }); + } +} + +bool nano::network::send_votes_cache (std::shared_ptr channel_a, nano::block_hash const & hash_a) +{ + // Search in cache + auto votes (node.votes_cache.find (hash_a)); + // Send from cache + for (auto & vote : votes) + { + nano::confirm_ack confirm (vote); + auto vote_bytes = confirm.to_bytes (); + channel_a->send_buffer (vote_bytes, nano::stat::detail::confirm_ack); + } + // Returns true if votes were sent + bool result (!votes.empty ()); + return result; +} + +void nano::network::flood_message (nano::message const & message_a) +{ + auto list (list_fanout ()); + for (auto i (list.begin ()), n (list.end ()); i != n; ++i) + { + (*i)->send (message_a); + } +} + +void nano::network::flood_block_batch (std::deque> blocks_a, unsigned delay_a) +{ + auto block (blocks_a.front ()); + blocks_a.pop_front (); + flood_block (block); + if (!blocks_a.empty ()) + { + std::weak_ptr node_w (node.shared ()); + node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks_a, delay_a]() { + if (auto node_l = node_w.lock ()) + { + node_l->network.flood_block_batch (blocks_a, delay_a); + } + }); + } +} + +void nano::network::broadcast_confirm_req (std::shared_ptr block_a) +{ + auto list (std::make_shared>> (node.rep_crawler.representative_endpoints (std::numeric_limits::max ()))); + if (list->empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ()) + { + // broadcast request to all peers (with max limit 2 * sqrt (peers count)) + auto peers (node.network.list (std::min (static_cast (100), 2 * node.network.size_sqrt ()))); + list->clear (); + for (auto & peer : peers) + { + list->push_back (peer); + } + } + + /* + * In either case (broadcasting to all representatives, or broadcasting to + * all peers because there are not enough connected representatives), + * limit each instance to a single random up-to-32 selection. The invoker + * of "broadcast_confirm_req" will be responsible for calling it again + * if the votes for a block have not arrived in time. + */ + const size_t max_endpoints = 32; + random_pool::shuffle (list->begin (), list->end ()); + if (list->size () > max_endpoints) + { + list->erase (list->begin () + max_endpoints, list->end ()); + } + + broadcast_confirm_req_base (block_a, list, 0); +} + +void nano::network::broadcast_confirm_req_base (std::shared_ptr block_a, std::shared_ptr>> endpoints_a, unsigned delay_a, bool resumption) +{ + const size_t max_reps = 10; + if (!resumption && node.config.logging.network_logging ()) + { + node.logger.try_log (boost::str (boost::format ("Broadcasting confirm req for block %1% to %2% representatives") % block_a->hash ().to_string () % endpoints_a->size ())); + } + auto count (0); + while (!endpoints_a->empty () && count < max_reps) + { + nano::confirm_req req (block_a); + auto channel (endpoints_a->back ()); + channel->send (req); + endpoints_a->pop_back (); + count++; + } + if (!endpoints_a->empty ()) + { + delay_a += std::rand () % broadcast_interval_ms; + + std::weak_ptr node_w (node.shared ()); + node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, block_a, endpoints_a, delay_a]() { + if (auto node_l = node_w.lock ()) + { + node_l->network.broadcast_confirm_req_base (block_a, endpoints_a, delay_a, true); + } + }); + } +} + +void nano::network::broadcast_confirm_req_batch (std::unordered_map, std::vector>> request_bundle_a, unsigned delay_a, bool resumption) +{ + const size_t max_reps = 10; + if (!resumption && node.config.logging.network_logging ()) + { + node.logger.try_log (boost::str (boost::format ("Broadcasting batch confirm req to %1% representatives") % request_bundle_a.size ())); + } + auto count (0); + while (!request_bundle_a.empty () && count < max_reps) + { + auto j (request_bundle_a.begin ()); + count++; + std::vector> roots_hashes; + // Limit max request size hash + root to 6 pairs + while (roots_hashes.size () <= confirm_req_hashes_max && !j->second.empty ()) + { + roots_hashes.push_back (j->second.back ()); + j->second.pop_back (); + } + nano::confirm_req req (roots_hashes); + j->first->send (req); + if (j->second.empty ()) + { + request_bundle_a.erase (j); + } + } + if (!request_bundle_a.empty ()) + { + std::weak_ptr node_w (node.shared ()); + node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, request_bundle_a, delay_a]() { + if (auto node_l = node_w.lock ()) + { + node_l->network.broadcast_confirm_req_batch (request_bundle_a, delay_a + 50, true); + } + }); + } +} + +void nano::network::broadcast_confirm_req_batch (std::deque, std::shared_ptr>>>> deque_a, unsigned delay_a) +{ + auto pair (deque_a.front ()); + deque_a.pop_front (); + auto block (pair.first); + // confirm_req to representatives + auto endpoints (pair.second); + if (!endpoints->empty ()) + { + broadcast_confirm_req_base (block, endpoints, delay_a); + } + /* Continue while blocks remain + Broadcast with random delay between delay_a & 2*delay_a */ + if (!deque_a.empty ()) + { + std::weak_ptr node_w (node.shared ()); + node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, deque_a, delay_a]() { + if (auto node_l = node_w.lock ()) + { + node_l->network.broadcast_confirm_req_batch (deque_a, delay_a); + } + }); + } +} + +namespace +{ +class network_message_visitor : public nano::message_visitor +{ +public: + network_message_visitor (nano::node & node_a, std::shared_ptr channel_a) : + node (node_a), + channel (channel_a) + { + } + void keepalive (nano::keepalive const & message_a) override + { + if (node.config.logging.network_keepalive_logging ()) + { + node.logger.try_log (boost::str (boost::format ("Received keepalive message from %1%") % channel->to_string ())); + } + node.stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in); + node.network.merge_peers (message_a.peers); + } + void publish (nano::publish const & message_a) override + { + if (node.config.logging.network_message_logging ()) + { + node.logger.try_log (boost::str (boost::format ("Publish message from %1% for %2%") % channel->to_string () % message_a.block->hash ().to_string ())); + } + node.stats.inc (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in); + if (!node.block_processor.full ()) + { + node.process_active (message_a.block); + } + node.active.publish (message_a.block); + } + void confirm_req (nano::confirm_req const & message_a) override + { + if (node.config.logging.network_message_logging ()) + { + if (!message_a.roots_hashes.empty ()) + { + node.logger.try_log (boost::str (boost::format ("Confirm_req message from %1% for hashes:roots %2%") % channel->to_string () % message_a.roots_string ())); + } + else + { + node.logger.try_log (boost::str (boost::format ("Confirm_req message from %1% for %2%") % channel->to_string () % message_a.block->hash ().to_string ())); + } + } + node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_req, nano::stat::dir::in); + // Don't load nodes with disabled voting + if (node.config.enable_voting && node.wallets.reps_count) + { + if (message_a.block != nullptr) + { + auto hash (message_a.block->hash ()); + if (!node.network.send_votes_cache (channel, hash)) + { + auto transaction (node.store.tx_begin_read ()); + auto successor (node.ledger.successor (transaction, message_a.block->qualified_root ())); + if (successor != nullptr) + { + auto same_block (successor->hash () == hash); + confirm_block (transaction, node, channel, std::move (successor), !same_block); + } + } + } + else if (!message_a.roots_hashes.empty ()) + { + auto transaction (node.store.tx_begin_read ()); + std::vector blocks_bundle; + for (auto & root_hash : message_a.roots_hashes) + { + if (!node.network.send_votes_cache (channel, root_hash.first) && node.store.block_exists (transaction, root_hash.first)) + { + blocks_bundle.push_back (root_hash.first); + } + else + { + nano::block_hash successor (0); + // Search for block root + successor = node.store.block_successor (transaction, root_hash.second); + // Search for account root + if (successor.is_zero () && node.store.account_exists (transaction, root_hash.second)) + { + nano::account_info info; + auto error (node.store.account_get (transaction, root_hash.second, info)); + assert (!error); + successor = info.open_block; + } + if (!successor.is_zero ()) + { + if (!node.network.send_votes_cache (channel, successor)) + { + blocks_bundle.push_back (successor); + } + auto successor_block (node.store.block_get (transaction, successor)); + assert (successor_block != nullptr); + nano::publish publish (successor_block); + channel->send (publish); + } + } + } + if (!blocks_bundle.empty ()) + { + node.network.confirm_hashes (transaction, channel, blocks_bundle); + } + } + } + } + void confirm_ack (nano::confirm_ack const & message_a) override + { + if (node.config.logging.network_message_logging ()) + { + node.logger.try_log (boost::str (boost::format ("Received confirm_ack message from %1% for %2%sequence %3%") % channel->to_string () % message_a.vote->hashes_string () % std::to_string (message_a.vote->sequence))); + } + node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in); + for (auto & vote_block : message_a.vote->blocks) + { + if (!vote_block.which ()) + { + auto block (boost::get> (vote_block)); + if (!node.block_processor.full ()) + { + node.process_active (block); + } + node.active.publish (block); + } + } + node.vote_processor.vote (message_a.vote, channel); + } + void bulk_pull (nano::bulk_pull const &) override + { + assert (false); + } + void bulk_pull_account (nano::bulk_pull_account const &) override + { + assert (false); + } + void bulk_push (nano::bulk_push const &) override + { + assert (false); + } + void frontier_req (nano::frontier_req const &) override + { + assert (false); + } + void node_id_handshake (nano::node_id_handshake const & message_a) override + { + node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); + } + nano::node & node; + std::shared_ptr channel; +}; +} + +void nano::network::process_message (nano::message const & message_a, std::shared_ptr channel_a) +{ + network_message_visitor visitor (node, channel_a); + message_a.visit (visitor); +} + +// Send keepalives to all the peers we've been notified of +void nano::network::merge_peers (std::array const & peers_a) +{ + for (auto i (peers_a.begin ()), j (peers_a.end ()); i != j; ++i) + { + merge_peer (*i); + } +} + +void nano::network::merge_peer (nano::endpoint const & peer_a) +{ + if (!reachout (peer_a, node.config.allow_local_peers)) + { + std::weak_ptr node_w (node.shared ()); + node.network.tcp_channels.start_tcp (peer_a, [node_w](std::shared_ptr channel_a) { + if (auto node_l = node_w.lock ()) + { + node_l->network.send_keepalive (channel_a); + } + }); + } +} + +bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_local_peers) +{ + bool result (false); + if (endpoint_a.address ().to_v6 ().is_unspecified ()) + { + result = true; + } + else if (nano::transport::reserved_address (endpoint_a, allow_local_peers)) + { + result = true; + } + else if (endpoint_a == endpoint ()) + { + result = true; + } + return result; +} + +bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_local_peers) +{ + // Don't contact invalid IPs + bool error = not_a_peer (endpoint_a, allow_local_peers); + if (!error) + { + error |= udp_channels.reachout (endpoint_a); + error |= tcp_channels.reachout (endpoint_a); + } + return error; +} + +std::deque> nano::network::list (size_t count_a) +{ + std::deque> result; + tcp_channels.list (result); + udp_channels.list (result); + random_pool::shuffle (result.begin (), result.end ()); + if (result.size () > count_a) + { + result.resize (count_a, nullptr); + } + return result; +} + +// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability +std::deque> nano::network::list_fanout () +{ + auto result (list (size_sqrt ())); + return result; +} + +std::unordered_set> nano::network::random_set (size_t count_a) const +{ + std::unordered_set> result (tcp_channels.random_set (count_a)); + std::unordered_set> udp_random (udp_channels.random_set (count_a)); + for (auto i (udp_random.begin ()), n (udp_random.end ()); i != n && result.size () < count_a * 1.5; ++i) + { + result.insert (*i); + } + while (result.size () > count_a) + { + result.erase (result.begin ()); + } + return result; +} + +void nano::network::random_fill (std::array & target_a) const +{ + auto peers (random_set (target_a.size ())); + assert (peers.size () <= target_a.size ()); + auto endpoint (nano::endpoint (boost::asio::ip::address_v6{}, 0)); + assert (endpoint.address ().is_v6 ()); + std::fill (target_a.begin (), target_a.end (), endpoint); + auto j (target_a.begin ()); + for (auto i (peers.begin ()), n (peers.end ()); i != n; ++i, ++j) + { + assert ((*i)->get_endpoint ().address ().is_v6 ()); + assert (j < target_a.end ()); + *j = (*i)->get_endpoint (); + } +} + +nano::tcp_endpoint nano::network::bootstrap_peer () +{ + auto result (udp_channels.bootstrap_peer ()); + if (result == nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0)) + { + result = tcp_channels.bootstrap_peer (); + } + return result; +} + +std::shared_ptr nano::network::find_channel (nano::endpoint const & endpoint_a) +{ + std::shared_ptr result (tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (endpoint_a))); + if (!result) + { + result = udp_channels.channel (endpoint_a); + } + return result; +} + +std::shared_ptr nano::network::find_node_id (nano::account const & node_id_a) +{ + std::shared_ptr result (tcp_channels.find_node_id (node_id_a)); + if (!result) + { + result = udp_channels.find_node_id (node_id_a); + } + return result; +} + +void nano::network::add_response_channels (nano::tcp_endpoint const & endpoint_a, std::vector insert_channels) +{ + std::lock_guard lock (response_channels_mutex); + response_channels.emplace (endpoint_a, insert_channels); +} + +std::shared_ptr nano::network::search_response_channel (nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a) +{ + // Search by node ID + std::shared_ptr result (find_node_id (node_id_a)); + if (!result) + { + // Search in response channels + std::unique_lock lock (response_channels_mutex); + auto existing (response_channels.find (endpoint_a)); + if (existing != response_channels.end ()) + { + auto channels_list (existing->second); + lock.unlock (); + // TCP + for (auto & i : channels_list) + { + auto search_channel (tcp_channels.find_channel (i)); + if (search_channel != nullptr) + { + result = search_channel; + break; + } + } + // UDP + if (!result) + { + for (auto & i : channels_list) + { + auto udp_endpoint (nano::transport::map_tcp_to_endpoint (i)); + auto search_channel (udp_channels.channel (udp_endpoint)); + if (search_channel != nullptr) + { + result = search_channel; + break; + } + } + } + } + } + return result; +} + +void nano::network::remove_response_channel (nano::tcp_endpoint const & endpoint_a) +{ + std::lock_guard lock (response_channels_mutex); + response_channels.erase (endpoint_a); +} + +size_t nano::network::response_channels_size () +{ + std::lock_guard lock (response_channels_mutex); + return response_channels.size (); +} + +nano::endpoint nano::network::endpoint () +{ + return udp_channels.get_local_endpoint (); +} + +void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff_a) +{ + tcp_channels.purge (cutoff_a); + udp_channels.purge (cutoff_a); + if (node.network.empty ()) + { + disconnect_observer (); + } +} + +void nano::network::ongoing_cleanup () +{ + cleanup (std::chrono::steady_clock::now () - node.network_params.node.cutoff); + std::weak_ptr node_w (node.shared ()); + node.alarm.add (std::chrono::steady_clock::now () + node.network_params.node.period, [node_w]() { + if (auto node_l = node_w.lock ()) + { + node_l->network.ongoing_cleanup (); + } + }); +} + +size_t nano::network::size () const +{ + return tcp_channels.size () + udp_channels.size (); +} + +size_t nano::network::size_sqrt () const +{ + return (static_cast (std::ceil (std::sqrt (size ())))); +} + +bool nano::network::empty () const +{ + return size () == 0; +} \ No newline at end of file diff --git a/nano/node/network.hpp b/nano/node/network.hpp new file mode 100644 index 00000000..b99c528e --- /dev/null +++ b/nano/node/network.hpp @@ -0,0 +1,139 @@ +#pragma once + +#include +#include +#include + +#include +#include + +#include +#include + +namespace nano +{ +class channel; +class node; +class stats; +class transaction; +class message_buffer final +{ +public: + uint8_t * buffer{ nullptr }; + size_t size{ 0 }; + nano::endpoint endpoint; +}; +/** + * A circular buffer for servicing nano realtime messages. + * This container follows a producer/consumer model where the operating system is producing data in to + * buffers which are serviced by internal threads. + * If buffers are not serviced fast enough they're internally dropped. + * This container has a maximum space to hold N buffers of M size and will allocate them in round-robin order. + * All public methods are thread-safe +*/ +class message_buffer_manager final +{ +public: + // Stats - Statistics + // Size - Size of each individual buffer + // Count - Number of buffers to allocate + message_buffer_manager (nano::stat & stats, size_t, size_t); + // Return a buffer where message data can be put + // Method will attempt to return the first free buffer + // If there are no free buffers, an unserviced buffer will be dequeued and returned + // Function will block if there are no free or unserviced buffers + // Return nullptr if the container has stopped + nano::message_buffer * allocate (); + // Queue a buffer that has been filled with message data and notify servicing threads + void enqueue (nano::message_buffer *); + // Return a buffer that has been filled with message data + // Function will block until a buffer has been added + // Return nullptr if the container has stopped + nano::message_buffer * dequeue (); + // Return a buffer to the freelist after is has been serviced + void release (nano::message_buffer *); + // Stop container and notify waiting threads + void stop (); + +private: + nano::stat & stats; + std::mutex mutex; + std::condition_variable condition; + boost::circular_buffer free; + boost::circular_buffer full; + std::vector slab; + std::vector entries; + bool stopped; +}; +class network final +{ +public: + network (nano::node &, uint16_t); + ~network (); + void start (); + void stop (); + void flood_message (nano::message const &); + void flood_vote (std::shared_ptr vote_a) + { + nano::confirm_ack message (vote_a); + flood_message (message); + } + void flood_block (std::shared_ptr block_a) + { + nano::publish publish (block_a); + flood_message (publish); + } + void flood_block_batch (std::deque>, unsigned = broadcast_interval_ms); + void merge_peers (std::array const &); + void merge_peer (nano::endpoint const &); + void send_keepalive (std::shared_ptr); + void send_keepalive_self (std::shared_ptr); + void send_node_id_handshake (std::shared_ptr, boost::optional const & query, boost::optional const & respond_to); + void broadcast_confirm_req (std::shared_ptr); + void broadcast_confirm_req_base (std::shared_ptr, std::shared_ptr>>, unsigned, bool = false); + void broadcast_confirm_req_batch (std::unordered_map, std::vector>>, unsigned = broadcast_interval_ms, bool = false); + void broadcast_confirm_req_batch (std::deque, std::shared_ptr>>>>, unsigned = broadcast_interval_ms); + void confirm_hashes (nano::transaction const &, std::shared_ptr, std::vector); + bool send_votes_cache (std::shared_ptr, nano::block_hash const &); + std::shared_ptr find_node_id (nano::account const &); + std::shared_ptr find_channel (nano::endpoint const &); + void process_message (nano::message const &, std::shared_ptr); + bool not_a_peer (nano::endpoint const &, bool); + // Should we reach out to this endpoint with a keepalive message + bool reachout (nano::endpoint const &, bool = false); + std::deque> list (size_t); + // A list of random peers sized for the configured rebroadcast fanout + std::deque> list_fanout (); + void random_fill (std::array &) const; + std::unordered_set> random_set (size_t) const; + // Get the next peer for attempting a tcp bootstrap connection + nano::tcp_endpoint bootstrap_peer (); + // Response channels + void add_response_channels (nano::tcp_endpoint const &, std::vector); + std::shared_ptr search_response_channel (nano::tcp_endpoint const &, nano::account const &); + void remove_response_channel (nano::tcp_endpoint const &); + size_t response_channels_size (); + nano::endpoint endpoint (); + void cleanup (std::chrono::steady_clock::time_point const &); + void ongoing_cleanup (); + size_t size () const; + size_t size_sqrt () const; + bool empty () const; + nano::message_buffer_manager buffer_container; + boost::asio::ip::udp::resolver resolver; + std::vector packet_processing_threads; + nano::node & node; + nano::transport::udp_channels udp_channels; + nano::transport::tcp_channels tcp_channels; + std::function disconnect_observer; + // Called when a new channel is observed + std::function)> channel_observer; + static unsigned const broadcast_interval_ms = 10; + static size_t const buffer_size = 512; + static size_t const confirm_req_hashes_max = 6; + +private: + std::mutex response_channels_mutex; + std::unordered_map> response_channels; +}; +} diff --git a/nano/node/node.cpp b/nano/node/node.cpp index b71364f9..43d015e5 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -28,105 +28,6 @@ extern unsigned char nano_bootstrap_weights_beta[]; extern size_t nano_bootstrap_weights_beta_size; } -nano::network::network (nano::node & node_a, uint16_t port_a) : -buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer -resolver (node_a.io_ctx), -node (node_a), -udp_channels (node_a, port_a), -tcp_channels (node_a), -disconnect_observer ([]() {}) -{ - boost::thread::attributes attrs; - nano::thread_attributes::set (attrs); - for (size_t i = 0; i < node.config.network_threads; ++i) - { - packet_processing_threads.push_back (boost::thread (attrs, [this]() { - nano::thread_role::set (nano::thread_role::name::packet_processing); - try - { - udp_channels.process_packets (); - } - catch (boost::system::error_code & ec) - { - this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); - release_assert (false); - } - catch (std::error_code & ec) - { - this->node.logger.try_log (FATAL_LOG_PREFIX, ec.message ()); - release_assert (false); - } - catch (std::runtime_error & err) - { - this->node.logger.try_log (FATAL_LOG_PREFIX, err.what ()); - release_assert (false); - } - catch (...) - { - this->node.logger.try_log (FATAL_LOG_PREFIX, "Unknown exception"); - release_assert (false); - } - if (this->node.config.logging.network_packet_logging ()) - { - this->node.logger.try_log ("Exiting packet processing thread"); - } - })); - } -} - -nano::network::~network () -{ - for (auto & thread : packet_processing_threads) - { - thread.join (); - } -} - -void nano::network::start () -{ - ongoing_cleanup (); - udp_channels.start (); - tcp_channels.start (); -} - -void nano::network::stop () -{ - udp_channels.stop (); - tcp_channels.stop (); - resolver.cancel (); - buffer_container.stop (); -} - -void nano::network::send_keepalive (std::shared_ptr channel_a) -{ - nano::keepalive message; - random_fill (message.peers); - channel_a->send (message); -} - -void nano::network::send_keepalive_self (std::shared_ptr channel_a) -{ - nano::keepalive message; - if (node.config.external_address != boost::asio::ip::address_v6{} && node.config.external_port != 0) - { - message.peers[0] = nano::endpoint (node.config.external_address, node.config.external_port); - } - else - { - auto external_address (node.port_mapping.external_address ()); - if (external_address.address () != boost::asio::ip::address_v4::any ()) - { - message.peers[0] = nano::endpoint (boost::asio::ip::address_v6{}, endpoint ().port ()); - message.peers[1] = external_address; - } - else - { - message.peers[0] = nano::endpoint (boost::asio::ip::address_v6{}, endpoint ().port ()); - } - } - channel_a->send (message); -} - void nano::node::keepalive (std::string const & address_a, uint16_t port_a) { auto node_l (shared_from_this ()); @@ -160,610 +61,6 @@ void nano::node::keepalive (std::string const & address_a, uint16_t port_a) }); } -void nano::network::send_node_id_handshake (std::shared_ptr channel_a, boost::optional const & query, boost::optional const & respond_to) -{ - boost::optional> response (boost::none); - if (respond_to) - { - response = std::make_pair (node.node_id.pub, nano::sign_message (node.node_id.prv, node.node_id.pub, *respond_to)); - assert (!nano::validate_message (response->first, *respond_to, response->second)); - } - nano::node_id_handshake message (query, response); - if (node.config.logging.network_node_id_handshake_logging ()) - { - node.logger.try_log (boost::str (boost::format ("Node ID handshake sent with node ID %1% to %2%: query %3%, respond_to %4% (signature %5%)") % node.node_id.pub.to_account () % channel_a->get_endpoint () % (query ? query->to_string () : std::string ("[none]")) % (respond_to ? respond_to->to_string () : std::string ("[none]")) % (response ? response->second.to_string () : std::string ("[none]")))); - } - channel_a->send (message); -} - -template -bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a, T & list_a, std::shared_ptr block_a, bool also_publish) -{ - bool result (false); - if (node_a.config.enable_voting) - { - auto hash (block_a->hash ()); - // Search in cache - auto votes (node_a.votes_cache.find (hash)); - if (votes.empty ()) - { - // Generate new vote - node_a.wallets.foreach_representative (transaction_a, [&result, &list_a, &node_a, &transaction_a, &hash](nano::public_key const & pub_a, nano::raw_key const & prv_a) { - result = true; - auto vote (node_a.store.vote_generate (transaction_a, pub_a, prv_a, std::vector (1, hash))); - nano::confirm_ack confirm (vote); - auto vote_bytes = confirm.to_bytes (); - for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j) - { - j->get ()->send_buffer (vote_bytes, nano::stat::detail::confirm_ack); - } - node_a.votes_cache.add (vote); - }); - } - else - { - // Send from cache - for (auto & vote : votes) - { - nano::confirm_ack confirm (vote); - auto vote_bytes = confirm.to_bytes (); - for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j) - { - j->get ()->send_buffer (vote_bytes, nano::stat::detail::confirm_ack); - } - } - } - // Republish if required - if (also_publish) - { - nano::publish publish (block_a); - auto publish_bytes (publish.to_bytes ()); - for (auto j (list_a.begin ()), m (list_a.end ()); j != m; ++j) - { - j->get ()->send_buffer (publish_bytes, nano::stat::detail::publish); - } - } - } - return result; -} - -bool confirm_block (nano::transaction const & transaction_a, nano::node & node_a, std::shared_ptr channel_a, std::shared_ptr block_a, bool also_publish) -{ - std::array, 1> endpoints = { channel_a }; - auto result (confirm_block (transaction_a, node_a, endpoints, std::move (block_a), also_publish)); - return result; -} - -void nano::network::confirm_hashes (nano::transaction const & transaction_a, std::shared_ptr channel_a, std::vector blocks_bundle_a) -{ - if (node.config.enable_voting) - { - node.wallets.foreach_representative (transaction_a, [this, &blocks_bundle_a, &channel_a, &transaction_a](nano::public_key const & pub_a, nano::raw_key const & prv_a) { - auto vote (this->node.store.vote_generate (transaction_a, pub_a, prv_a, blocks_bundle_a)); - nano::confirm_ack confirm (vote); - std::shared_ptr> bytes (new std::vector); - { - nano::vectorstream stream (*bytes); - confirm.serialize (stream); - } - channel_a->send_buffer (bytes, nano::stat::detail::confirm_ack); - this->node.votes_cache.add (vote); - }); - } -} - -bool nano::network::send_votes_cache (std::shared_ptr channel_a, nano::block_hash const & hash_a) -{ - // Search in cache - auto votes (node.votes_cache.find (hash_a)); - // Send from cache - for (auto & vote : votes) - { - nano::confirm_ack confirm (vote); - auto vote_bytes = confirm.to_bytes (); - channel_a->send_buffer (vote_bytes, nano::stat::detail::confirm_ack); - } - // Returns true if votes were sent - bool result (!votes.empty ()); - return result; -} - -void nano::network::flood_message (nano::message const & message_a) -{ - auto list (list_fanout ()); - for (auto i (list.begin ()), n (list.end ()); i != n; ++i) - { - (*i)->send (message_a); - } -} - -void nano::network::flood_block_batch (std::deque> blocks_a, unsigned delay_a) -{ - auto block (blocks_a.front ()); - blocks_a.pop_front (); - flood_block (block); - if (!blocks_a.empty ()) - { - std::weak_ptr node_w (node.shared ()); - node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, blocks_a, delay_a]() { - if (auto node_l = node_w.lock ()) - { - node_l->network.flood_block_batch (blocks_a, delay_a); - } - }); - } -} - -void nano::network::broadcast_confirm_req (std::shared_ptr block_a) -{ - auto list (std::make_shared>> (node.rep_crawler.representative_endpoints (std::numeric_limits::max ()))); - if (list->empty () || node.rep_crawler.total_weight () < node.config.online_weight_minimum.number ()) - { - // broadcast request to all peers (with max limit 2 * sqrt (peers count)) - auto peers (node.network.list (std::min (static_cast (100), 2 * node.network.size_sqrt ()))); - list->clear (); - for (auto & peer : peers) - { - list->push_back (peer); - } - } - - /* - * In either case (broadcasting to all representatives, or broadcasting to - * all peers because there are not enough connected representatives), - * limit each instance to a single random up-to-32 selection. The invoker - * of "broadcast_confirm_req" will be responsible for calling it again - * if the votes for a block have not arrived in time. - */ - const size_t max_endpoints = 32; - random_pool::shuffle (list->begin (), list->end ()); - if (list->size () > max_endpoints) - { - list->erase (list->begin () + max_endpoints, list->end ()); - } - - broadcast_confirm_req_base (block_a, list, 0); -} - -void nano::network::broadcast_confirm_req_base (std::shared_ptr block_a, std::shared_ptr>> endpoints_a, unsigned delay_a, bool resumption) -{ - const size_t max_reps = 10; - if (!resumption && node.config.logging.network_logging ()) - { - node.logger.try_log (boost::str (boost::format ("Broadcasting confirm req for block %1% to %2% representatives") % block_a->hash ().to_string () % endpoints_a->size ())); - } - auto count (0); - while (!endpoints_a->empty () && count < max_reps) - { - nano::confirm_req req (block_a); - auto channel (endpoints_a->back ()); - channel->send (req); - endpoints_a->pop_back (); - count++; - } - if (!endpoints_a->empty ()) - { - delay_a += std::rand () % broadcast_interval_ms; - - std::weak_ptr node_w (node.shared ()); - node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, block_a, endpoints_a, delay_a]() { - if (auto node_l = node_w.lock ()) - { - node_l->network.broadcast_confirm_req_base (block_a, endpoints_a, delay_a, true); - } - }); - } -} - -void nano::network::broadcast_confirm_req_batch (std::unordered_map, std::vector>> request_bundle_a, unsigned delay_a, bool resumption) -{ - const size_t max_reps = 10; - if (!resumption && node.config.logging.network_logging ()) - { - node.logger.try_log (boost::str (boost::format ("Broadcasting batch confirm req to %1% representatives") % request_bundle_a.size ())); - } - auto count (0); - while (!request_bundle_a.empty () && count < max_reps) - { - auto j (request_bundle_a.begin ()); - count++; - std::vector> roots_hashes; - // Limit max request size hash + root to 6 pairs - while (roots_hashes.size () <= confirm_req_hashes_max && !j->second.empty ()) - { - roots_hashes.push_back (j->second.back ()); - j->second.pop_back (); - } - nano::confirm_req req (roots_hashes); - j->first->send (req); - if (j->second.empty ()) - { - request_bundle_a.erase (j); - } - } - if (!request_bundle_a.empty ()) - { - std::weak_ptr node_w (node.shared ()); - node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a), [node_w, request_bundle_a, delay_a]() { - if (auto node_l = node_w.lock ()) - { - node_l->network.broadcast_confirm_req_batch (request_bundle_a, delay_a + 50, true); - } - }); - } -} - -void nano::network::broadcast_confirm_req_batch (std::deque, std::shared_ptr>>>> deque_a, unsigned delay_a) -{ - auto pair (deque_a.front ()); - deque_a.pop_front (); - auto block (pair.first); - // confirm_req to representatives - auto endpoints (pair.second); - if (!endpoints->empty ()) - { - broadcast_confirm_req_base (block, endpoints, delay_a); - } - /* Continue while blocks remain - Broadcast with random delay between delay_a & 2*delay_a */ - if (!deque_a.empty ()) - { - std::weak_ptr node_w (node.shared ()); - node.alarm.add (std::chrono::steady_clock::now () + std::chrono::milliseconds (delay_a + std::rand () % delay_a), [node_w, deque_a, delay_a]() { - if (auto node_l = node_w.lock ()) - { - node_l->network.broadcast_confirm_req_batch (deque_a, delay_a); - } - }); - } -} - -namespace -{ -class network_message_visitor : public nano::message_visitor -{ -public: - network_message_visitor (nano::node & node_a, std::shared_ptr channel_a) : - node (node_a), - channel (channel_a) - { - } - void keepalive (nano::keepalive const & message_a) override - { - if (node.config.logging.network_keepalive_logging ()) - { - node.logger.try_log (boost::str (boost::format ("Received keepalive message from %1%") % channel->to_string ())); - } - node.stats.inc (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in); - node.network.merge_peers (message_a.peers); - } - void publish (nano::publish const & message_a) override - { - if (node.config.logging.network_message_logging ()) - { - node.logger.try_log (boost::str (boost::format ("Publish message from %1% for %2%") % channel->to_string () % message_a.block->hash ().to_string ())); - } - node.stats.inc (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in); - if (!node.block_processor.full ()) - { - node.process_active (message_a.block); - } - node.active.publish (message_a.block); - } - void confirm_req (nano::confirm_req const & message_a) override - { - if (node.config.logging.network_message_logging ()) - { - if (!message_a.roots_hashes.empty ()) - { - node.logger.try_log (boost::str (boost::format ("Confirm_req message from %1% for hashes:roots %2%") % channel->to_string () % message_a.roots_string ())); - } - else - { - node.logger.try_log (boost::str (boost::format ("Confirm_req message from %1% for %2%") % channel->to_string () % message_a.block->hash ().to_string ())); - } - } - node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_req, nano::stat::dir::in); - // Don't load nodes with disabled voting - if (node.config.enable_voting && node.wallets.reps_count) - { - if (message_a.block != nullptr) - { - auto hash (message_a.block->hash ()); - if (!node.network.send_votes_cache (channel, hash)) - { - auto transaction (node.store.tx_begin_read ()); - auto successor (node.ledger.successor (transaction, message_a.block->qualified_root ())); - if (successor != nullptr) - { - auto same_block (successor->hash () == hash); - confirm_block (transaction, node, channel, std::move (successor), !same_block); - } - } - } - else if (!message_a.roots_hashes.empty ()) - { - auto transaction (node.store.tx_begin_read ()); - std::vector blocks_bundle; - for (auto & root_hash : message_a.roots_hashes) - { - if (!node.network.send_votes_cache (channel, root_hash.first) && node.store.block_exists (transaction, root_hash.first)) - { - blocks_bundle.push_back (root_hash.first); - } - else - { - nano::block_hash successor (0); - // Search for block root - successor = node.store.block_successor (transaction, root_hash.second); - // Search for account root - if (successor.is_zero () && node.store.account_exists (transaction, root_hash.second)) - { - nano::account_info info; - auto error (node.store.account_get (transaction, root_hash.second, info)); - assert (!error); - successor = info.open_block; - } - if (!successor.is_zero ()) - { - if (!node.network.send_votes_cache (channel, successor)) - { - blocks_bundle.push_back (successor); - } - auto successor_block (node.store.block_get (transaction, successor)); - assert (successor_block != nullptr); - nano::publish publish (successor_block); - channel->send (publish); - } - } - } - if (!blocks_bundle.empty ()) - { - node.network.confirm_hashes (transaction, channel, blocks_bundle); - } - } - } - } - void confirm_ack (nano::confirm_ack const & message_a) override - { - if (node.config.logging.network_message_logging ()) - { - node.logger.try_log (boost::str (boost::format ("Received confirm_ack message from %1% for %2%sequence %3%") % channel->to_string () % message_a.vote->hashes_string () % std::to_string (message_a.vote->sequence))); - } - node.stats.inc (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::in); - for (auto & vote_block : message_a.vote->blocks) - { - if (!vote_block.which ()) - { - auto block (boost::get> (vote_block)); - if (!node.block_processor.full ()) - { - node.process_active (block); - } - node.active.publish (block); - } - } - node.vote_processor.vote (message_a.vote, channel); - } - void bulk_pull (nano::bulk_pull const &) override - { - assert (false); - } - void bulk_pull_account (nano::bulk_pull_account const &) override - { - assert (false); - } - void bulk_push (nano::bulk_push const &) override - { - assert (false); - } - void frontier_req (nano::frontier_req const &) override - { - assert (false); - } - void node_id_handshake (nano::node_id_handshake const & message_a) override - { - node.stats.inc (nano::stat::type::message, nano::stat::detail::node_id_handshake, nano::stat::dir::in); - } - nano::node & node; - std::shared_ptr channel; -}; -} - -// Send keepalives to all the peers we've been notified of -void nano::network::merge_peers (std::array const & peers_a) -{ - for (auto i (peers_a.begin ()), j (peers_a.end ()); i != j; ++i) - { - merge_peer (*i); - } -} - -void nano::network::merge_peer (nano::endpoint const & peer_a) -{ - if (!reachout (peer_a, node.config.allow_local_peers)) - { - std::weak_ptr node_w (node.shared ()); - node.network.tcp_channels.start_tcp (peer_a, [node_w](std::shared_ptr channel_a) { - if (auto node_l = node_w.lock ()) - { - node_l->network.send_keepalive (channel_a); - } - }); - } -} - -bool nano::network::not_a_peer (nano::endpoint const & endpoint_a, bool allow_local_peers) -{ - bool result (false); - if (endpoint_a.address ().to_v6 ().is_unspecified ()) - { - result = true; - } - else if (nano::transport::reserved_address (endpoint_a, allow_local_peers)) - { - result = true; - } - else if (endpoint_a == endpoint ()) - { - result = true; - } - return result; -} - -bool nano::network::reachout (nano::endpoint const & endpoint_a, bool allow_local_peers) -{ - // Don't contact invalid IPs - bool error = not_a_peer (endpoint_a, allow_local_peers); - if (!error) - { - error |= udp_channels.reachout (endpoint_a); - error |= tcp_channels.reachout (endpoint_a); - } - return error; -} - -std::deque> nano::network::list (size_t count_a) -{ - std::deque> result; - tcp_channels.list (result); - udp_channels.list (result); - random_pool::shuffle (result.begin (), result.end ()); - if (result.size () > count_a) - { - result.resize (count_a, nullptr); - } - return result; -} - -// Simulating with sqrt_broadcast_simulate shows we only need to broadcast to sqrt(total_peers) random peers in order to successfully publish to everyone with high probability -std::deque> nano::network::list_fanout () -{ - auto result (list (size_sqrt ())); - return result; -} - -std::unordered_set> nano::network::random_set (size_t count_a) const -{ - std::unordered_set> result (tcp_channels.random_set (count_a)); - std::unordered_set> udp_random (udp_channels.random_set (count_a)); - for (auto i (udp_random.begin ()), n (udp_random.end ()); i != n && result.size () < count_a * 1.5; ++i) - { - result.insert (*i); - } - while (result.size () > count_a) - { - result.erase (result.begin ()); - } - return result; -} - -void nano::network::random_fill (std::array & target_a) const -{ - auto peers (random_set (target_a.size ())); - assert (peers.size () <= target_a.size ()); - auto endpoint (nano::endpoint (boost::asio::ip::address_v6{}, 0)); - assert (endpoint.address ().is_v6 ()); - std::fill (target_a.begin (), target_a.end (), endpoint); - auto j (target_a.begin ()); - for (auto i (peers.begin ()), n (peers.end ()); i != n; ++i, ++j) - { - assert ((*i)->get_endpoint ().address ().is_v6 ()); - assert (j < target_a.end ()); - *j = (*i)->get_endpoint (); - } -} - -nano::tcp_endpoint nano::network::bootstrap_peer () -{ - auto result (udp_channels.bootstrap_peer ()); - if (result == nano::tcp_endpoint (boost::asio::ip::address_v6::any (), 0)) - { - result = tcp_channels.bootstrap_peer (); - } - return result; -} - -std::shared_ptr nano::network::find_channel (nano::endpoint const & endpoint_a) -{ - std::shared_ptr result (tcp_channels.find_channel (nano::transport::map_endpoint_to_tcp (endpoint_a))); - if (!result) - { - result = udp_channels.channel (endpoint_a); - } - return result; -} - -std::shared_ptr nano::network::find_node_id (nano::account const & node_id_a) -{ - std::shared_ptr result (tcp_channels.find_node_id (node_id_a)); - if (!result) - { - result = udp_channels.find_node_id (node_id_a); - } - return result; -} - -void nano::network::add_response_channels (nano::tcp_endpoint const & endpoint_a, std::vector insert_channels) -{ - std::lock_guard lock (response_channels_mutex); - response_channels.emplace (endpoint_a, insert_channels); -} - -std::shared_ptr nano::network::search_response_channel (nano::tcp_endpoint const & endpoint_a, nano::account const & node_id_a) -{ - // Search by node ID - std::shared_ptr result (find_node_id (node_id_a)); - if (!result) - { - // Search in response channels - std::unique_lock lock (response_channels_mutex); - auto existing (response_channels.find (endpoint_a)); - if (existing != response_channels.end ()) - { - auto channels_list (existing->second); - lock.unlock (); - // TCP - for (auto & i : channels_list) - { - auto search_channel (tcp_channels.find_channel (i)); - if (search_channel != nullptr) - { - result = search_channel; - break; - } - } - // UDP - if (!result) - { - for (auto & i : channels_list) - { - auto udp_endpoint (nano::transport::map_tcp_to_endpoint (i)); - auto search_channel (udp_channels.channel (udp_endpoint)); - if (search_channel != nullptr) - { - result = search_channel; - break; - } - } - } - } - } - return result; -} - -void nano::network::remove_response_channel (nano::tcp_endpoint const & endpoint_a) -{ - std::lock_guard lock (response_channels_mutex); - response_channels.erase (endpoint_a); -} - -size_t nano::network::response_channels_size () -{ - std::lock_guard lock (response_channels_mutex); - return response_channels.size (); -} - bool nano::operation::operator> (nano::operation const & other_a) const { return wakeup > other_a.wakeup; @@ -2572,54 +1869,6 @@ void nano::node::process_confirmed (std::shared_ptr block_a, uint8_ } } -void nano::node::process_message (nano::message const & message_a, std::shared_ptr channel_a) -{ - network_message_visitor visitor (*this, channel_a); - message_a.visit (visitor); -} - -nano::endpoint nano::network::endpoint () -{ - return udp_channels.get_local_endpoint (); -} - -void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff_a) -{ - tcp_channels.purge (cutoff_a); - udp_channels.purge (cutoff_a); - if (node.network.empty ()) - { - disconnect_observer (); - } -} - -void nano::network::ongoing_cleanup () -{ - cleanup (std::chrono::steady_clock::now () - node.network_params.node.cutoff); - std::weak_ptr node_w (node.shared ()); - node.alarm.add (std::chrono::steady_clock::now () + node.network_params.node.period, [node_w]() { - if (auto node_l = node_w.lock ()) - { - node_l->network.ongoing_cleanup (); - } - }); -} - -size_t nano::network::size () const -{ - return tcp_channels.size () + udp_channels.size (); -} - -size_t nano::network::size_sqrt () const -{ - return (static_cast (std::ceil (std::sqrt (size ())))); -} - -bool nano::network::empty () const -{ - return size () == 0; -} - bool nano::block_arrival::add (nano::block_hash const & hash_a) { std::lock_guard lock (mutex); diff --git a/nano/node/node.hpp b/nano/node/node.hpp index b256d2cb..7a5c4dec 100644 --- a/nano/node/node.hpp +++ b/nano/node/node.hpp @@ -6,6 +6,7 @@ #include #include #include +#include #include #include #include @@ -13,8 +14,6 @@ #include #include #include -#include -#include #include #include #include @@ -184,126 +183,6 @@ private: std::unique_ptr collect_seq_con_info (online_reps & online_reps, const std::string & name); -class message_buffer final -{ -public: - uint8_t * buffer{ nullptr }; - size_t size{ 0 }; - nano::endpoint endpoint; -}; -/** - * A circular buffer for servicing nano realtime messages. - * This container follows a producer/consumer model where the operating system is producing data in to - * buffers which are serviced by internal threads. - * If buffers are not serviced fast enough they're internally dropped. - * This container has a maximum space to hold N buffers of M size and will allocate them in round-robin order. - * All public methods are thread-safe -*/ -class message_buffer_manager final -{ -public: - // Stats - Statistics - // Size - Size of each individual buffer - // Count - Number of buffers to allocate - message_buffer_manager (nano::stat & stats, size_t, size_t); - // Return a buffer where message data can be put - // Method will attempt to return the first free buffer - // If there are no free buffers, an unserviced buffer will be dequeued and returned - // Function will block if there are no free or unserviced buffers - // Return nullptr if the container has stopped - nano::message_buffer * allocate (); - // Queue a buffer that has been filled with message data and notify servicing threads - void enqueue (nano::message_buffer *); - // Return a buffer that has been filled with message data - // Function will block until a buffer has been added - // Return nullptr if the container has stopped - nano::message_buffer * dequeue (); - // Return a buffer to the freelist after is has been serviced - void release (nano::message_buffer *); - // Stop container and notify waiting threads - void stop (); - -private: - nano::stat & stats; - std::mutex mutex; - std::condition_variable condition; - boost::circular_buffer free; - boost::circular_buffer full; - std::vector slab; - std::vector entries; - bool stopped; -}; -class network final -{ -public: - network (nano::node &, uint16_t); - ~network (); - void start (); - void stop (); - void flood_message (nano::message const &); - void flood_vote (std::shared_ptr vote_a) - { - nano::confirm_ack message (vote_a); - flood_message (message); - } - void flood_block (std::shared_ptr block_a) - { - nano::publish publish (block_a); - flood_message (publish); - } - void flood_block_batch (std::deque>, unsigned = broadcast_interval_ms); - void merge_peers (std::array const &); - void merge_peer (nano::endpoint const &); - void send_keepalive (std::shared_ptr); - void send_keepalive_self (std::shared_ptr); - void send_node_id_handshake (std::shared_ptr, boost::optional const & query, boost::optional const & respond_to); - void broadcast_confirm_req (std::shared_ptr); - void broadcast_confirm_req_base (std::shared_ptr, std::shared_ptr>>, unsigned, bool = false); - void broadcast_confirm_req_batch (std::unordered_map, std::vector>>, unsigned = broadcast_interval_ms, bool = false); - void broadcast_confirm_req_batch (std::deque, std::shared_ptr>>>>, unsigned = broadcast_interval_ms); - void confirm_hashes (nano::transaction const &, std::shared_ptr, std::vector); - bool send_votes_cache (std::shared_ptr, nano::block_hash const &); - std::shared_ptr find_node_id (nano::account const &); - std::shared_ptr find_channel (nano::endpoint const &); - bool not_a_peer (nano::endpoint const &, bool); - // Should we reach out to this endpoint with a keepalive message - bool reachout (nano::endpoint const &, bool = false); - std::deque> list (size_t); - // A list of random peers sized for the configured rebroadcast fanout - std::deque> list_fanout (); - void random_fill (std::array &) const; - std::unordered_set> random_set (size_t) const; - // Get the next peer for attempting a tcp bootstrap connection - nano::tcp_endpoint bootstrap_peer (); - // Response channels - void add_response_channels (nano::tcp_endpoint const &, std::vector); - std::shared_ptr search_response_channel (nano::tcp_endpoint const &, nano::account const &); - void remove_response_channel (nano::tcp_endpoint const &); - size_t response_channels_size (); - nano::endpoint endpoint (); - void cleanup (std::chrono::steady_clock::time_point const &); - void ongoing_cleanup (); - size_t size () const; - size_t size_sqrt () const; - bool empty () const; - nano::message_buffer_manager buffer_container; - boost::asio::ip::udp::resolver resolver; - std::vector packet_processing_threads; - nano::node & node; - nano::transport::udp_channels udp_channels; - nano::transport::tcp_channels tcp_channels; - std::function disconnect_observer; - // Called when a new channel is observed - std::function)> channel_observer; - static unsigned const broadcast_interval_ms = 10; - static size_t const buffer_size = 512; - static size_t const confirm_req_hashes_max = 6; - -private: - std::mutex response_channels_mutex; - std::unordered_map> response_channels; -}; - class node_init final { public: @@ -365,7 +244,6 @@ public: int store_version (); void receive_confirmed (nano::transaction const &, std::shared_ptr, nano::block_hash const &); void process_confirmed (std::shared_ptr, uint8_t = 0); - void process_message (nano::message const &, std::shared_ptr); void process_active (std::shared_ptr); nano::process_return process (nano::block const &); void keepalive_preconfigured (std::vector const &); diff --git a/nano/node/transport/tcp.cpp b/nano/node/transport/tcp.cpp index d590cff1..bbb56b22 100644 --- a/nano/node/transport/tcp.cpp +++ b/nano/node/transport/tcp.cpp @@ -1,4 +1,5 @@ #include +#include #include nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::shared_ptr socket_a) : @@ -232,19 +233,19 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa auto channel (node.network.find_channel (nano::transport::map_tcp_to_endpoint (endpoint_a))); if (channel) { - node.process_message (message_a, channel); + node.network.process_message (message_a, channel); } else { channel = node.network.search_response_channel (endpoint_a, node_id_a); if (channel) { - node.process_message (message_a, channel); + node.network.process_message (message_a, channel); } else { auto udp_channel (std::make_shared (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a))); - node.process_message (message_a, udp_channel); + node.network.process_message (message_a, udp_channel); } } } @@ -277,7 +278,7 @@ void nano::transport::tcp_channels::process_keepalive (nano::keepalive const & m node.network.add_response_channels (endpoint_a, insert_response_channels); } auto udp_channel (std::make_shared (node.network.udp_channels, nano::transport::map_tcp_to_endpoint (endpoint_a))); - node.process_message (message_a, udp_channel); + node.network.process_message (message_a, udp_channel); } } diff --git a/nano/node/transport/tcp.hpp b/nano/node/transport/tcp.hpp index 24076409..3601cbf5 100644 --- a/nano/node/transport/tcp.hpp +++ b/nano/node/transport/tcp.hpp @@ -1,9 +1,16 @@ #pragma once #include -#include #include +#include +#include +#include +#include +#include +#include +#include + namespace nano { namespace transport diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 056fdf5c..321c7055 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -1,9 +1,10 @@ #pragma once #include +#include #include -#include +#include namespace nano { diff --git a/nano/node/transport/udp.cpp b/nano/node/transport/udp.cpp index 69b32c96..757358c8 100644 --- a/nano/node/transport/udp.cpp +++ b/nano/node/transport/udp.cpp @@ -1,5 +1,6 @@ #include #include +#include #include nano::transport::channel_udp::channel_udp (nano::transport::udp_channels & channels_a, nano::endpoint const & endpoint_a, unsigned network_version_a) : @@ -463,7 +464,7 @@ public: node.network.udp_channels.modify (find_channel, [](std::shared_ptr channel_a) { channel_a->set_last_packet_received (std::chrono::steady_clock::now ()); }); - node.process_message (message_a, find_channel); + node.network.process_message (message_a, find_channel); } } nano::node & node; diff --git a/nano/node/transport/udp.hpp b/nano/node/transport/udp.hpp index 1028f519..71827cf3 100644 --- a/nano/node/transport/udp.hpp +++ b/nano/node/transport/udp.hpp @@ -1,11 +1,18 @@ #pragma once #include -#include #include #include +#include +#include +#include +#include +#include +#include +#include + namespace nano { class message_buffer;