Merge branch 'dev_tcp' into develop

Colin LeMahieu 2024-03-15 16:42:52 +00:00
commit ef985e3cf1
No known key found for this signature in database
GPG key ID: 43708520C8DFB938
15 changed files with 291 additions and 217 deletions

View file

@@ -32,12 +32,58 @@ using logger_id = std::pair<nano::log::type, nano::log::detail>;
 std::string to_string (logger_id);
 logger_id parse_logger_id (std::string const &);
+}
+
+// Time helpers
+namespace nano::log
+{
 template <class Clock>
 auto microseconds (std::chrono::time_point<Clock> time)
 {
 	return std::chrono::duration_cast<std::chrono::microseconds> (time.time_since_epoch ()).count ();
 }
+
+template <class Duration>
+auto microseconds (Duration duration)
+{
+	return std::chrono::duration_cast<std::chrono::microseconds> (duration).count ();
+}
+
+template <class Clock>
+auto milliseconds (std::chrono::time_point<Clock> time)
+{
+	return std::chrono::duration_cast<std::chrono::milliseconds> (time.time_since_epoch ()).count ();
+}
+
+template <class Duration>
+auto milliseconds (Duration duration)
+{
+	return std::chrono::duration_cast<std::chrono::milliseconds> (duration).count ();
+}
+
+template <class Clock>
+auto seconds (std::chrono::time_point<Clock> time)
+{
+	return std::chrono::duration_cast<std::chrono::seconds> (time.time_since_epoch ()).count ();
+}
+
+template <class Duration>
+auto seconds (Duration duration)
+{
+	return std::chrono::duration_cast<std::chrono::seconds> (duration).count ();
+}
+
+template <class Clock>
+auto milliseconds_delta (std::chrono::time_point<Clock> time, std::chrono::time_point<Clock> now = Clock::now ())
+{
+	return std::chrono::duration_cast<std::chrono::milliseconds> (now - time).count ();
+}
+
+template <class Clock>
+auto seconds_delta (std::chrono::time_point<Clock> time, std::chrono::time_point<Clock> now = Clock::now ())
+{
+	return std::chrono::duration_cast<std::chrono::seconds> (now - time).count ();
+}
 }
 
 namespace nano
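
Note: the helpers above come in two shapes, one taking a std::chrono::time_point (converted via time_since_epoch ()) and one taking a plain duration, so call sites can pass whichever they have; seconds_delta / milliseconds_delta additionally default the "now" argument to Clock::now (). A minimal standalone sketch of how the two overload shapes resolve, using plain std::chrono outside the nano codebase (the helper copies below are illustrative, not the library's own definitions):

#include <chrono>
#include <cstdio>

namespace log_helpers
{
// Illustrative copies of the two overload shapes added above
template <class Clock>
auto seconds (std::chrono::time_point<Clock> time)
{
	return std::chrono::duration_cast<std::chrono::seconds> (time.time_since_epoch ()).count ();
}

template <class Duration>
auto seconds (Duration duration)
{
	return std::chrono::duration_cast<std::chrono::seconds> (duration).count ();
}
}

int main ()
{
	auto now = std::chrono::steady_clock::now ();
	// time_point overload: counts seconds since the clock's epoch
	std::printf ("since epoch: %lld s\n", static_cast<long long> (log_helpers::seconds (now)));
	// duration overload: converts an arbitrary duration to whole seconds
	std::printf ("fixed span:  %lld s\n", static_cast<long long> (log_helpers::seconds (std::chrono::minutes (5))));
}

Partial ordering prefers the time_point overload for time points, so the duration overload only handles plain durations.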

View file

@@ -72,6 +72,7 @@ enum class type
 	election_scheduler,
 	vote_generator,
 	rep_tiers,
+	syn_cookies,
 
 	// bootstrap
 	bulk_pull_client,

View file

@@ -17,6 +17,7 @@ enum class type : uint8_t
 	ledger,
 	rollback,
 	bootstrap,
+	network,
 	tcp_server,
 	vote,
 	election,
@@ -52,6 +53,7 @@ enum class type : uint8_t
 	rep_crawler,
 	local_block_broadcaster,
 	rep_tiers,
+	syn_cookies,
 
 	bootstrap_ascending,
 	bootstrap_ascending_accounts,
@@ -67,6 +69,8 @@ enum class detail : uint8_t
 	// common
 	ok,
 	loop,
+	loop_cleanup,
+	loop_keepalive,
 	total,
 	process,
 	processed,

View file

@@ -109,6 +109,12 @@ std::string nano::thread_role::get_string (nano::thread_role::name role)
 		case nano::thread_role::name::rep_tiers:
 			thread_role_name_string = "Rep tiers";
 			break;
+		case nano::thread_role::name::network_cleanup:
+			thread_role_name_string = "Net cleanup";
+			break;
+		case nano::thread_role::name::network_keepalive:
+			thread_role_name_string = "Net keepalive";
+			break;
 		default:
 			debug_assert (false && "nano::thread_role::get_string unhandled thread role");
 	}

View file

@@ -45,6 +45,8 @@ enum class name
 	rep_crawler,
 	local_block_broadcasting,
 	rep_tiers,
+	network_cleanup,
+	network_keepalive,
 };
 
 /*

View file

@@ -9,89 +9,164 @@
 #include <boost/format.hpp>
 
+using namespace std::chrono_literals;
+
 /*
  * network
  */
 
-nano::network::network (nano::node & node_a, uint16_t port_a) :
-	id (nano::network_constants::active_network),
-	syn_cookies (node_a.network_params.network.max_peers_per_ip),
-	inbound{ [this] (nano::message const & message, std::shared_ptr<nano::transport::channel> const & channel) {
-		debug_assert (message.header.network == node.network_params.network.current_network);
-		debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min);
-		process_message (message, channel);
-	} },
-	resolver (node_a.io_ctx),
-	tcp_message_manager (node_a.config.tcp_incoming_connections_max),
-	node (node_a),
-	publish_filter (256 * 1024),
-	tcp_channels (node_a, inbound),
-	port (port_a),
-	disconnect_observer ([] () {})
+nano::network::network (nano::node & node, uint16_t port) :
+	node{ node },
+	id{ nano::network_constants::active_network },
+	syn_cookies{ node.network_params.network.max_peers_per_ip, node.logger },
+	resolver{ node.io_ctx },
+	publish_filter{ 256 * 1024 },
+	tcp_channels{ node, [this] (nano::message const & message, std::shared_ptr<nano::transport::channel> const & channel) {
+		inbound (message, channel);
+	} },
+	port{ port }
 {
-	for (std::size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i)
-	{
-		packet_processing_threads.emplace_back (nano::thread_attributes::get_default (), [this, i] () {
-			nano::thread_role::set (nano::thread_role::name::packet_processing);
-			try
-			{
-				tcp_channels.process_messages ();
-			}
-			catch (boost::system::error_code & ec)
-			{
-				node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
-				release_assert (false);
-			}
-			catch (std::error_code & ec)
-			{
-				node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
-				release_assert (false);
-			}
-			catch (std::runtime_error & err)
-			{
-				node.logger.critical (nano::log::type::network, "Error: {}", err.what ());
-				release_assert (false);
-			}
-			catch (...)
-			{
-				node.logger.critical (nano::log::type::network, "Unknown error");
-				release_assert (false);
-			}
-		});
-	}
 }
 
 nano::network::~network ()
 {
-	stop ();
+	// All threads must be stopped before this destructor
+	debug_assert (processing_threads.empty ());
+	debug_assert (!cleanup_thread.joinable ());
+	debug_assert (!keepalive_thread.joinable ());
 }
 
 void nano::network::start ()
 {
-	if (!node.flags.disable_connection_cleanup)
-	{
-		ongoing_cleanup ();
-	}
-	ongoing_syn_cookie_cleanup ();
+	cleanup_thread = std::thread ([this] () {
+		nano::thread_role::set (nano::thread_role::name::network_cleanup);
+		run_cleanup ();
+	});
+
+	keepalive_thread = std::thread ([this] () {
+		nano::thread_role::set (nano::thread_role::name::network_keepalive);
+		run_keepalive ();
+	});
+
 	if (!node.flags.disable_tcp_realtime)
 	{
 		tcp_channels.start ();
+
+		for (std::size_t i = 0; i < node.config.network_threads; ++i)
+		{
+			processing_threads.emplace_back (nano::thread_attributes::get_default (), [this] () {
+				nano::thread_role::set (nano::thread_role::name::packet_processing);
+				run_processing ();
+			});
+		}
 	}
-	ongoing_keepalive ();
 }
 
 void nano::network::stop ()
 {
-	if (!stopped.exchange (true))
-	{
-		tcp_channels.stop ();
-		resolver.cancel ();
-		tcp_message_manager.stop ();
-		port = 0;
-		for (auto & thread : packet_processing_threads)
-		{
-			thread.join ();
-		}
+	{
+		nano::lock_guard<nano::mutex> lock{ mutex };
+		stopped = true;
+	}
+	condition.notify_all ();
+
+	tcp_channels.stop ();
+	resolver.cancel ();
+
+	for (auto & thread : processing_threads)
+	{
+		thread.join ();
+	}
+	processing_threads.clear ();
+
+	if (keepalive_thread.joinable ())
+	{
+		keepalive_thread.join ();
+	}
+	if (cleanup_thread.joinable ())
+	{
+		cleanup_thread.join ();
+	}
+
+	port = 0;
+}
+
+void nano::network::run_processing ()
+{
+	try
+	{
+		// TODO: Move responsibility of packet queuing and processing to the message_processor class
+		tcp_channels.process_messages ();
+	}
+	catch (boost::system::error_code & ec)
+	{
+		node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
+		release_assert (false);
+	}
+	catch (std::error_code & ec)
+	{
+		node.logger.critical (nano::log::type::network, "Error: {}", ec.message ());
+		release_assert (false);
+	}
+	catch (std::runtime_error & err)
+	{
+		node.logger.critical (nano::log::type::network, "Error: {}", err.what ());
+		release_assert (false);
+	}
+	catch (...)
+	{
+		node.logger.critical (nano::log::type::network, "Unknown error");
+		release_assert (false);
+	}
+}
+
+void nano::network::run_cleanup ()
+{
+	nano::unique_lock<nano::mutex> lock{ mutex };
+	while (!stopped)
+	{
+		condition.wait_for (lock, node.network_params.network.is_dev_network () ? 1s : 5s);
+		lock.unlock ();
+
+		if (stopped)
+		{
+			return;
+		}
+
+		node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_cleanup);
+
+		if (!node.flags.disable_connection_cleanup)
+		{
+			auto const cutoff = std::chrono::steady_clock::now () - node.network_params.network.cleanup_cutoff ();
+			cleanup (cutoff);
+		}
+
+		auto const syn_cookie_cutoff = std::chrono::steady_clock::now () - node.network_params.network.syn_cookie_cutoff;
+		syn_cookies.purge (syn_cookie_cutoff);
+
+		lock.lock ();
+	}
+}
+
+void nano::network::run_keepalive ()
+{
+	nano::unique_lock<nano::mutex> lock{ mutex };
+	while (!stopped)
+	{
+		condition.wait_for (lock, node.network_params.network.keepalive_period);
+		lock.unlock ();
+
+		if (stopped)
+		{
+			return;
+		}
+
+		node.stats.inc (nano::stat::type::network, nano::stat::detail::loop_keepalive);
+
+		flood_keepalive (0.75f);
+		flood_keepalive_self (0.25f);
+
+		lock.lock ();
 	}
 }
@@ -109,34 +184,6 @@ void nano::network::send_keepalive_self (std::shared_ptr<nano::transport::channe
 	channel_a->send (message);
 }
 
-void nano::network::send_node_id_handshake (std::shared_ptr<nano::transport::channel> const & channel_a, std::optional<nano::uint256_union> const & cookie, std::optional<nano::uint256_union> const & respond_to)
-{
-	std::optional<nano::node_id_handshake::response_payload> response;
-	if (respond_to)
-	{
-		nano::node_id_handshake::response_payload pld{ node.node_id.pub, nano::sign_message (node.node_id.prv, node.node_id.pub, *respond_to) };
-		debug_assert (!nano::validate_message (pld.node_id, *respond_to, pld.signature));
-		response = pld;
-	}
-	std::optional<nano::node_id_handshake::query_payload> query;
-	if (cookie)
-	{
-		nano::node_id_handshake::query_payload pld{ *cookie };
-		query = pld;
-	}
-	nano::node_id_handshake message{ node.network_params.network, query, response };
-
-	node.logger.debug (nano::log::type::network, "Node ID handshake sent to: {} (query: {}, respond to: {}, signature: {})",
-	nano::util::to_str (channel_a->get_endpoint ()),
-	(query ? query->cookie.to_string () : "<none>"),
-	(respond_to ? respond_to->to_string () : "<none>"),
-	(response ? response->signature.to_string () : "<none>"));
-
-	channel_a->send (message);
-}
-
 void nano::network::flood_message (nano::message & message_a, nano::transport::buffer_drop_policy const drop_policy_a, float const scale_a)
 {
 	for (auto & i : list (fanout (scale_a)))
@@ -220,14 +267,6 @@ void nano::network::flood_block_many (std::deque<std::shared_ptr<nano::block>> b
 	}
 }
 
-void nano::network::send_confirm_req (std::shared_ptr<nano::transport::channel> const & channel_a, std::pair<nano::block_hash, nano::root> const & hash_root_a)
-{
-	auto & [hash, root] = hash_root_a;
-	// Confirmation request with hash + root
-	nano::confirm_req req (node.network_params.network, hash, root);
-	channel_a->send (req);
-}
-
 namespace
 {
 class network_message_visitor : public nano::message_visitor
@@ -354,6 +393,13 @@ void nano::network::process_message (nano::message const & message, std::shared_
 	message.visit (visitor);
 }
 
+void nano::network::inbound (const nano::message & message, const std::shared_ptr<nano::transport::channel> & channel)
+{
+	debug_assert (message.header.network == node.network_params.network.current_network);
+	debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min);
+	process_message (message, channel);
+}
+
 // Send keepalives to all the peers we've been notified of
 void nano::network::merge_peers (std::array<nano::endpoint, 8> const & peers_a)
 {
@@ -506,52 +552,18 @@ nano::endpoint nano::network::endpoint () const
 	return nano::endpoint (boost::asio::ip::address_v6::loopback (), port);
 }
 
-void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff_a)
+void nano::network::cleanup (std::chrono::steady_clock::time_point const & cutoff)
 {
-	tcp_channels.purge (cutoff_a);
+	node.logger.debug (nano::log::type::network, "Performing cleanup, cutoff: {}s", nano::log::seconds_delta (cutoff));
+
+	tcp_channels.purge (cutoff);
+
 	if (node.network.empty ())
 	{
 		disconnect_observer ();
 	}
 }
 
-void nano::network::ongoing_cleanup ()
-{
-	cleanup (std::chrono::steady_clock::now () - node.network_params.network.cleanup_cutoff ());
-	std::weak_ptr<nano::node> node_w (node.shared ());
-	node.workers.add_timed_task (std::chrono::steady_clock::now () + std::chrono::seconds (node.network_params.network.is_dev_network () ? 1 : 5), [node_w] () {
-		if (auto node_l = node_w.lock ())
-		{
-			node_l->network.ongoing_cleanup ();
-		}
-	});
-}
-
-void nano::network::ongoing_syn_cookie_cleanup ()
-{
-	syn_cookies.purge (std::chrono::steady_clock::now () - nano::transport::syn_cookie_cutoff);
-	std::weak_ptr<nano::node> node_w (node.shared ());
-	node.workers.add_timed_task (std::chrono::steady_clock::now () + (nano::transport::syn_cookie_cutoff * 2), [node_w] () {
-		if (auto node_l = node_w.lock ())
-		{
-			node_l->network.ongoing_syn_cookie_cleanup ();
-		}
-	});
-}
-
-void nano::network::ongoing_keepalive ()
-{
-	flood_keepalive (0.75f);
-	flood_keepalive_self (0.25f);
-	std::weak_ptr<nano::node> node_w (node.shared ());
-	node.workers.add_timed_task (std::chrono::steady_clock::now () + node.network_params.network.keepalive_period, [node_w] () {
-		if (auto node_l = node_w.lock ())
-		{
-			node_l->network.ongoing_keepalive ();
-		}
-	});
-}
-
 std::size_t nano::network::size () const
 {
 	return tcp_channels.size ();
@@ -702,18 +714,19 @@ void nano::tcp_message_manager::stop ()
  * syn_cookies
  */
 
-nano::syn_cookies::syn_cookies (std::size_t max_cookies_per_ip_a) :
-	max_cookies_per_ip (max_cookies_per_ip_a)
+nano::syn_cookies::syn_cookies (std::size_t max_cookies_per_ip_a, nano::logger & logger_a) :
+	max_cookies_per_ip (max_cookies_per_ip_a),
+	logger (logger_a)
 {
 }
 
-boost::optional<nano::uint256_union> nano::syn_cookies::assign (nano::endpoint const & endpoint_a)
+std::optional<nano::uint256_union> nano::syn_cookies::assign (nano::endpoint const & endpoint_a)
 {
 	auto ip_addr (endpoint_a.address ());
 	debug_assert (ip_addr.is_v6 ());
 	nano::lock_guard<nano::mutex> lock{ syn_cookie_mutex };
 	unsigned & ip_cookies = cookies_per_ip[ip_addr];
-	boost::optional<nano::uint256_union> result;
+	std::optional<nano::uint256_union> result;
 	if (ip_cookies < max_cookies_per_ip)
 	{
 		if (cookies.find (endpoint_a) == cookies.end ())
@@ -755,6 +768,8 @@ bool nano::syn_cookies::validate (nano::endpoint const & endpoint_a, nano::accou
 void nano::syn_cookies::purge (std::chrono::steady_clock::time_point const & cutoff_a)
 {
+	logger.debug (nano::log::type::syn_cookies, "Purging syn cookies, cutoff: {}s", nano::log::seconds_delta (cutoff_a));
+
 	nano::lock_guard<nano::mutex> lock{ syn_cookie_mutex };
 	auto it (cookies.begin ());
 	while (it != cookies.end ())
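
Note: the removed ongoing_* functions rescheduled themselves on the node's worker timer, while run_cleanup () and run_keepalive () above run on dedicated threads that sleep on a condition variable, so stop () can wake them immediately instead of letting a pending timer fire. A minimal, self-contained sketch of that thread-plus-condition-variable shape (the class and names below are illustrative, not part of the nano API):

#include <chrono>
#include <condition_variable>
#include <cstdio>
#include <mutex>
#include <thread>

// Periodic worker whose sleep can be interrupted by stop ()
class periodic_worker
{
public:
	void start ()
	{
		thread = std::thread ([this] {
			run ();
		});
	}

	void stop ()
	{
		{
			std::lock_guard<std::mutex> lock{ mutex };
			stopped = true;
		}
		condition.notify_all ();
		if (thread.joinable ())
		{
			thread.join ();
		}
	}

private:
	void run ()
	{
		std::unique_lock<std::mutex> lock{ mutex };
		while (!stopped)
		{
			// Returns early if stop () notifies, instead of sleeping out the full period
			condition.wait_for (lock, std::chrono::seconds (5));
			if (stopped)
			{
				return;
			}
			lock.unlock ();
			std::puts ("periodic work"); // e.g. cleanup (cutoff) or flood_keepalive (...)
			lock.lock ();
		}
	}

	std::mutex mutex;
	std::condition_variable condition;
	std::thread thread;
	bool stopped{ false };
};

int main ()
{
	periodic_worker worker;
	worker.start ();
	std::this_thread::sleep_for (std::chrono::milliseconds (100));
	worker.stop (); // wakes the worker immediately rather than after the full 5s period
}

The real loops additionally bump the loop_cleanup / loop_keepalive counters and take their periods from network_params, as shown in the diff.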

View file

@@ -16,39 +16,18 @@ namespace nano
 {
 class node;
 
-class tcp_message_manager final
-{
-public:
-	tcp_message_manager (unsigned incoming_connections_max_a);
-	void put_message (nano::tcp_message_item const & item_a);
-	nano::tcp_message_item get_message ();
-	// Stop container and notify waiting threads
-	void stop ();
-
-private:
-	nano::mutex mutex;
-	nano::condition_variable producer_condition;
-	nano::condition_variable consumer_condition;
-	std::deque<nano::tcp_message_item> entries;
-	unsigned max_entries;
-	static unsigned const max_entries_per_connection = 16;
-	bool stopped{ false };
-
-	friend class network_tcp_message_manager_Test;
-};
-
 /**
  * Node ID cookies for node ID handshakes
  */
 class syn_cookies final
 {
 public:
-	explicit syn_cookies (std::size_t);
+	syn_cookies (std::size_t max_peers_per_ip, nano::logger &);
 	void purge (std::chrono::steady_clock::time_point const &);
 	// Returns boost::none if the IP is rate capped on syn cookie requests,
 	// or if the endpoint already has a syn cookie query
-	boost::optional<nano::uint256_union> assign (nano::endpoint const &);
+	std::optional<nano::uint256_union> assign (nano::endpoint const &);
 	// Returns false if valid, true if invalid (true on error convention)
 	// Also removes the syn cookie from the store if valid
 	bool validate (nano::endpoint const &, nano::account const &, nano::signature const &);
@@ -58,6 +37,9 @@ public:
 	std::unique_ptr<container_info_component> collect_container_info (std::string const &);
 	std::size_t cookies_size ();
 
+private: // Dependencies
+	nano::logger & logger;
+
 private:
 	class syn_cookie_info final
 	{
@@ -74,12 +56,12 @@ private:
 class network final
 {
 public:
-	network (nano::node &, uint16_t);
+	network (nano::node &, uint16_t port);
 	~network ();
 
-	nano::networks id;
-
 	void start ();
 	void stop ();
 
 	void flood_message (nano::message &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter, float const = 1.0f);
 	void flood_keepalive (float const scale_a = 1.0f);
 	void flood_keepalive_self (float const scale_a = 0.5f);
@@ -94,8 +76,6 @@ public:
 	void merge_peer (nano::endpoint const &);
 	void send_keepalive (std::shared_ptr<nano::transport::channel> const &);
 	void send_keepalive_self (std::shared_ptr<nano::transport::channel> const &);
-	void send_node_id_handshake (std::shared_ptr<nano::transport::channel> const &, std::optional<nano::uint256_union> const & cookie, std::optional<nano::uint256_union> const & respond_to);
-	void send_confirm_req (std::shared_ptr<nano::transport::channel> const & channel_a, std::pair<nano::block_hash, nano::root> const & hash_root_a);
 	std::shared_ptr<nano::transport::channel> find_node_id (nano::account const &);
 	std::shared_ptr<nano::transport::channel> find_channel (nano::endpoint const &);
 	bool not_a_peer (nano::endpoint const &, bool);
@@ -112,41 +92,53 @@ public:
 	// Get the next peer for attempting a tcp bootstrap connection
 	nano::tcp_endpoint bootstrap_peer ();
 	nano::endpoint endpoint () const;
-	void cleanup (std::chrono::steady_clock::time_point const &);
-	void ongoing_cleanup ();
-	// Node ID cookies cleanup
-	nano::syn_cookies syn_cookies;
-	void ongoing_syn_cookie_cleanup ();
-	void ongoing_keepalive ();
+	void cleanup (std::chrono::steady_clock::time_point const & cutoff);
 	std::size_t size () const;
 	float size_sqrt () const;
 	bool empty () const;
 	void erase (nano::transport::channel const &);
 	/** Disconnects and adds peer to exclusion list */
 	void exclude (std::shared_ptr<nano::transport::channel> const & channel);
+	void inbound (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
 
+public: // Handshake
 	/** Verifies that handshake response matches our query. @returns true if OK */
 	bool verify_handshake_response (nano::node_id_handshake::response_payload const & response, nano::endpoint const & remote_endpoint);
 	std::optional<nano::node_id_handshake::query_payload> prepare_handshake_query (nano::endpoint const & remote_endpoint);
 	nano::node_id_handshake::response_payload prepare_handshake_response (nano::node_id_handshake::query_payload const & query, bool v2) const;
 
 private:
+	void run_processing ();
+	void run_cleanup ();
+	void run_keepalive ();
+
 	void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
 
-public:
-	std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> inbound;
-	boost::asio::ip::udp::resolver resolver;
-	std::vector<boost::thread> packet_processing_threads;
-	nano::peer_exclusion excluded_peers;
-	nano::tcp_message_manager tcp_message_manager;
+private: // Dependencies
 	nano::node & node;
+
+public:
+	nano::networks const id;
+	nano::syn_cookies syn_cookies;
+	boost::asio::ip::udp::resolver resolver;
+	nano::peer_exclusion excluded_peers;
 	nano::network_filter publish_filter;
 	nano::transport::tcp_channels tcp_channels;
 	std::atomic<uint16_t> port{ 0 };
-	std::function<void ()> disconnect_observer;
+
+public: // Callbacks
+	std::function<void ()> disconnect_observer{ [] () {} };
 	// Called when a new channel is observed
-	std::function<void (std::shared_ptr<nano::transport::channel>)> channel_observer;
+	std::function<void (std::shared_ptr<nano::transport::channel>)> channel_observer{ [] (auto) {} };
 
+private:
 	std::atomic<bool> stopped{ false };
+	mutable nano::mutex mutex;
+	nano::condition_variable condition;
+
+	std::vector<boost::thread> processing_threads; // Using boost::thread to enable increased stack size
+	std::thread cleanup_thread;
+	std::thread keepalive_thread;
 
+public:
 	static unsigned const broadcast_interval_ms = 10;
 	static std::size_t const buffer_size = 512;

View file

@@ -174,8 +174,8 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons
 	vote_uniquer{},
 	confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, logger, node_initialized_latch, flags.confirmation_height_processor_mode),
 	vote_cache{ config.vote_cache, stats },
-	generator{ config, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false },
-	final_generator{ config, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true },
+	generator{ config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false },
+	final_generator{ config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true },
 	active{ *this, confirmation_height_processor, block_processor },
 	scheduler_impl{ std::make_unique<nano::scheduler::component> (*this) },
 	scheduler{ *scheduler_impl },

View file

@@ -25,30 +25,6 @@ bool nano::transport::inproc::channel::operator== (nano::transport::channel cons
 	return endpoint == other_a.get_endpoint ();
 }
 
-/**
- * This function is called for every message received by the inproc channel.
- * Note that it is called from inside the context of nano::transport::inproc::channel::send_buffer
- */
-class message_visitor_inbound : public nano::message_visitor
-{
-public:
-	message_visitor_inbound (decltype (nano::network::inbound) & inbound, std::shared_ptr<nano::transport::inproc::channel> channel) :
-		inbound{ inbound },
-		channel{ channel }
-	{
-	}
-
-	decltype (nano::network::inbound) & inbound;
-	// the channel to reply to, if a reply is generated
-	std::shared_ptr<nano::transport::inproc::channel> channel;
-
-	void default_handler (nano::message const & message) override
-	{
-		inbound (message, channel);
-	}
-};
-
 /**
  * Send the buffer to the peer and call the callback function when done. The call never fails.
  * Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
@@ -78,11 +54,8 @@ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co
 		// process message
 		{
-			node.stats.inc (nano::stat::type::message, to_stat_detail (message_a->header.type), nano::stat::dir::in);
-
-			// create an inbound message visitor class to handle incoming messages
-			message_visitor_inbound visitor{ destination.network.inbound, remote_channel };
-			message_a->visit (visitor);
+			node.stats.inc (nano::stat::type::message, to_stat_detail (message_a->type ()), nano::stat::dir::in);
+
+			destination.network.inbound (*message_a, remote_channel);
 		}
 	});

View file

@@ -126,6 +126,7 @@ void nano::transport::channel_tcp::operator() (nano::object_stream & obs) const
 
 nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink) :
 	node{ node },
+	message_manager{ node.config.tcp_incoming_connections_max },
 	sink{ std::move (sink) }
 {
 }
@@ -295,7 +296,7 @@ void nano::transport::tcp_channels::process_messages ()
 {
 	while (!stopped)
 	{
-		auto item (node.network.tcp_message_manager.get_message ());
+		auto item = message_manager.get_message ();
 		if (item.message != nullptr)
 		{
 			process_message (*item.message, item.endpoint, item.node_id, item.socket);
@@ -364,6 +365,9 @@ void nano::transport::tcp_channels::stop ()
 {
 	stopped = true;
 	nano::unique_lock<nano::mutex> lock{ mutex };
+
+	message_manager.stop ();
+
 	// Close all TCP sockets
 	for (auto const & channel : channels)
 	{

View file

@@ -25,6 +25,28 @@ public:
 	nano::account node_id;
 	std::shared_ptr<nano::transport::socket> socket;
 };
+
+class tcp_message_manager final
+{
+public:
+	tcp_message_manager (unsigned incoming_connections_max_a);
+	void put_message (nano::tcp_message_item const & item_a);
+	nano::tcp_message_item get_message ();
+	// Stop container and notify waiting threads
+	void stop ();
+
+private:
+	nano::mutex mutex;
+	nano::condition_variable producer_condition;
+	nano::condition_variable consumer_condition;
+	std::deque<nano::tcp_message_item> entries;
+	unsigned max_entries;
+	static unsigned const max_entries_per_connection = 16;
+	bool stopped{ false };
+
+	friend class network_tcp_message_manager_Test;
+};
+
 namespace transport
 {
 	class tcp_server;
@@ -136,10 +158,14 @@ namespace transport
 		// Connection start
 		void start_tcp (nano::endpoint const &);
 		void start_tcp_receive_node_id (std::shared_ptr<nano::transport::channel_tcp> const &, nano::endpoint const &, std::shared_ptr<std::vector<uint8_t>> const &);
 
+	private: // Dependencies
 		nano::node & node;
 
+	public:
+		nano::tcp_message_manager message_manager;
+
 	private:
-		std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink;
-
 		class endpoint_tag
 		{
 		};
@@ -255,6 +281,9 @@ namespace transport
 		mi::member<tcp_endpoint_attempt, std::chrono::steady_clock::time_point, &tcp_endpoint_attempt::last_attempt>>>>
 		attempts;
 		// clang-format on
+
+	private:
+		std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink;
 
 		std::atomic<bool> stopped{ false };
 
 		friend class network_peer_max_tcp_attempts_subnetwork_Test;
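
Note: tcp_message_manager now lives inside tcp_channels instead of on nano::network; only its declaration is part of this diff. Its put_message / get_message / stop surface is a bounded producer-consumer queue with separate producer and consumer condition variables. A hedged sketch of that shape (illustrative only; the real implementation and its exact blocking and capacity rules are not shown in this commit):

#include <condition_variable>
#include <cstddef>
#include <deque>
#include <mutex>
#include <string>
#include <utility>

// Illustrative bounded queue with a put/get/stop surface similar to the declaration above
template <class T>
class bounded_queue
{
public:
	explicit bounded_queue (std::size_t max_entries_a) :
		max_entries (max_entries_a)
	{
	}

	void put (T item)
	{
		std::unique_lock<std::mutex> lock{ mutex };
		// Block producers while the queue is full, unless stopping
		producer_condition.wait (lock, [this] { return stopped || entries.size () < max_entries; });
		if (!stopped)
		{
			entries.push_back (std::move (item));
			consumer_condition.notify_one ();
		}
	}

	// Returns a default-constructed T once stop () has been called and the queue is drained
	T get ()
	{
		std::unique_lock<std::mutex> lock{ mutex };
		consumer_condition.wait (lock, [this] { return stopped || !entries.empty (); });
		if (entries.empty ())
		{
			return T{};
		}
		T result = std::move (entries.front ());
		entries.pop_front ();
		producer_condition.notify_one ();
		return result;
	}

	void stop ()
	{
		{
			std::lock_guard<std::mutex> lock{ mutex };
			stopped = true;
		}
		producer_condition.notify_all ();
		consumer_condition.notify_all ();
	}

private:
	std::mutex mutex;
	std::condition_variable producer_condition;
	std::condition_variable consumer_condition;
	std::deque<T> entries;
	std::size_t const max_entries;
	bool stopped{ false };
};

int main ()
{
	bounded_queue<std::string> queue{ 16 };
	queue.put ("hello");
	auto first = queue.get (); // "hello"
	queue.stop ();
	auto after_stop = queue.get (); // empty string: default-constructed after stop
	return 0;
}

Returning a default-constructed item after stop () matches how process_messages () in the tcp.cpp hunk treats an item with message == nullptr.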

View file

@@ -496,7 +496,7 @@ void nano::transport::tcp_server::queue_realtime (std::unique_ptr<nano::message>
 	{
 		return;
 	}
-	node->network.tcp_message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket });
+	node->network.tcp_channels.message_manager.put_message (nano::tcp_message_item{ std::move (message), remote_endpoint, remote_node_id, socket });
 }
 
 /*
/* /*

View file

@@ -22,5 +22,4 @@ bool is_ipv4_or_v4_mapped_address (boost::asio::ip::address const &);
 
 // Unassigned, reserved, self
 bool reserved_address (nano::endpoint const &, bool = false);
-static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5);
 }

View file

@@ -162,8 +162,9 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (na
 	return composite;
 }
 
-nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stats & stats_a, nano::logger & logger_a, bool is_final_a) :
+nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stats & stats_a, nano::logger & logger_a, bool is_final_a) :
 	config (config_a),
+	node (node_a),
 	ledger (ledger_a),
 	wallets (wallets_a),
 	vote_processor (vote_processor_a),
@@ -394,7 +395,7 @@ void nano::vote_generator::broadcast_action (std::shared_ptr<nano::vote> const &
 {
 	network.flood_vote_pr (vote_a);
 	network.flood_vote (vote_a, 2.0f);
-	vote_processor.vote (vote_a, std::make_shared<nano::transport::inproc::channel> (network.node, network.node));
+	vote_processor.vote (vote_a, std::make_shared<nano::transport::inproc::channel> (node, node)); // TODO: Avoid creating a temporary channel each time
 }
 
 void nano::vote_generator::run ()

View file

@@ -22,6 +22,7 @@ namespace mi = boost::multi_index;
 
 namespace nano
 {
+class node;
 class ledger;
 class network;
 class node_config;
@@ -122,7 +123,7 @@ private:
 	using queue_entry_t = std::pair<nano::root, nano::block_hash>;
 
 public:
-	vote_generator (nano::node_config const &, nano::ledger &, nano::wallets &, nano::vote_processor &, nano::local_vote_history &, nano::network &, nano::stats &, nano::logger &, bool is_final);
+	vote_generator (nano::node_config const &, nano::node &, nano::ledger &, nano::wallets &, nano::vote_processor &, nano::local_vote_history &, nano::network &, nano::stats &, nano::logger &, bool is_final);
 	~vote_generator ();
 
 	/** Queue items for vote generation, or broadcast votes already in cache */
@@ -153,6 +154,7 @@ private:
 
 private: // Dependencies
 	nano::node_config const & config;
+	nano::node & node;
 	nano::ledger & ledger;
 	nano::wallets & wallets;
 	nano::vote_processor & vote_processor;