This commit is contained in:
Piotr Wójcik 2024-03-13 15:35:13 +01:00 committed by Piotr Wójcik
commit 99b8b5364f
6 changed files with 35 additions and 49 deletions

View file

@ -14,20 +14,16 @@
*/
nano::network::network (nano::node & node_a, uint16_t port_a) :
node (node_a),
id (nano::network_constants::active_network),
syn_cookies (node_a.network_params.network.max_peers_per_ip),
inbound{ [this] (nano::message const & message, std::shared_ptr<nano::transport::channel> const & channel) {
debug_assert (message.header.network == node.network_params.network.current_network);
debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min);
process_message (message, channel);
} },
resolver (node_a.io_ctx),
tcp_message_manager (node_a.config.tcp_incoming_connections_max),
node (node_a),
publish_filter (256 * 1024),
tcp_channels (node_a, inbound),
port (port_a),
disconnect_observer ([] () {})
tcp_channels (node_a, [this] (nano::message const & message, std::shared_ptr<nano::transport::channel> const & channel) {
inbound (message, channel);
}),
port (port_a), disconnect_observer ([] () {})
{
for (std::size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i)
{
@ -354,6 +350,13 @@ void nano::network::process_message (nano::message const & message, std::shared_
message.visit (visitor);
}
void nano::network::inbound (const nano::message & message, const std::shared_ptr<nano::transport::channel> & channel)
{
debug_assert (message.header.network == node.network_params.network.current_network);
debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min);
process_message (message, channel);
}
// Send keepalives to all the peers we've been notified of
void nano::network::merge_peers (std::array<nano::endpoint, 8> const & peers_a)
{

View file

@ -74,12 +74,12 @@ private:
class network final
{
public:
network (nano::node &, uint16_t);
network (nano::node &, uint16_t port);
~network ();
nano::networks id;
void start ();
void stop ();
void flood_message (nano::message &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter, float const = 1.0f);
void flood_keepalive (float const scale_a = 1.0f);
void flood_keepalive_self (float const scale_a = 0.5f);
@ -114,8 +114,6 @@ public:
nano::endpoint endpoint () const;
void cleanup (std::chrono::steady_clock::time_point const &);
void ongoing_cleanup ();
// Node ID cookies cleanup
nano::syn_cookies syn_cookies;
void ongoing_syn_cookie_cleanup ();
void ongoing_keepalive ();
std::size_t size () const;
@ -124,7 +122,9 @@ public:
void erase (nano::transport::channel const &);
/** Disconnects and adds peer to exclusion list */
void exclude (std::shared_ptr<nano::transport::channel> const & channel);
void inbound (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
public: // Handshake
/** Verifies that handshake response matches our query. @returns true if OK */
bool verify_handshake_response (nano::node_id_handshake::response_payload const & response, nano::endpoint const & remote_endpoint);
std::optional<nano::node_id_handshake::query_payload> prepare_handshake_query (nano::endpoint const & remote_endpoint);
@ -133,20 +133,27 @@ public:
private:
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
private: // Dependencies
nano::node & node;
public:
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> inbound;
nano::networks const id;
nano::syn_cookies syn_cookies;
boost::asio::ip::udp::resolver resolver;
std::vector<boost::thread> packet_processing_threads;
nano::peer_exclusion excluded_peers;
nano::tcp_message_manager tcp_message_manager;
nano::node & node;
nano::network_filter publish_filter;
nano::transport::tcp_channels tcp_channels;
std::atomic<uint16_t> port{ 0 };
std::function<void ()> disconnect_observer;
// Called when a new channel is observed
std::function<void (std::shared_ptr<nano::transport::channel>)> channel_observer;
private:
std::atomic<bool> stopped{ false };
public:
static unsigned const broadcast_interval_ms = 10;
static std::size_t const buffer_size = 512;

View file

@ -174,8 +174,8 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons
vote_uniquer{},
confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, logger, node_initialized_latch, flags.confirmation_height_processor_mode),
vote_cache{ config.vote_cache, stats },
generator{ config, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false },
final_generator{ config, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true },
generator{ config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false },
final_generator{ config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true },
active{ *this, confirmation_height_processor, block_processor },
scheduler_impl{ std::make_unique<nano::scheduler::component> (*this) },
scheduler{ *scheduler_impl },

View file

@ -25,30 +25,6 @@ bool nano::transport::inproc::channel::operator== (nano::transport::channel cons
return endpoint == other_a.get_endpoint ();
}
/**
* This function is called for every message received by the inproc channel.
* Note that it is called from inside the context of nano::transport::inproc::channel::send_buffer
*/
class message_visitor_inbound : public nano::message_visitor
{
public:
message_visitor_inbound (decltype (nano::network::inbound) & inbound, std::shared_ptr<nano::transport::inproc::channel> channel) :
inbound{ inbound },
channel{ channel }
{
}
decltype (nano::network::inbound) & inbound;
// the channel to reply to, if a reply is generated
std::shared_ptr<nano::transport::inproc::channel> channel;
void default_handler (nano::message const & message) override
{
inbound (message, channel);
}
};
/**
* Send the buffer to the peer and call the callback function when done. The call never fails.
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
@ -78,11 +54,8 @@ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co
// process message
{
node.stats.inc (nano::stat::type::message, to_stat_detail (message_a->header.type), nano::stat::dir::in);
// create an inbound message visitor class to handle incoming messages
message_visitor_inbound visitor{ destination.network.inbound, remote_channel };
message_a->visit (visitor);
node.stats.inc (nano::stat::type::message, to_stat_detail (message_a->type ()), nano::stat::dir::in);
destination.network.inbound (*message_a, remote_channel);
}
});

View file

@ -162,8 +162,9 @@ std::unique_ptr<nano::container_info_component> nano::collect_container_info (na
return composite;
}
nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stats & stats_a, nano::logger & logger_a, bool is_final_a) :
nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stats & stats_a, nano::logger & logger_a, bool is_final_a) :
config (config_a),
node (node_a),
ledger (ledger_a),
wallets (wallets_a),
vote_processor (vote_processor_a),
@ -394,7 +395,7 @@ void nano::vote_generator::broadcast_action (std::shared_ptr<nano::vote> const &
{
network.flood_vote_pr (vote_a);
network.flood_vote (vote_a, 2.0f);
vote_processor.vote (vote_a, std::make_shared<nano::transport::inproc::channel> (network.node, network.node));
vote_processor.vote (vote_a, std::make_shared<nano::transport::inproc::channel> (node, node)); // TODO: Avoid creating a temporary channel each time
}
void nano::vote_generator::run ()

View file

@ -22,6 +22,7 @@ namespace mi = boost::multi_index;
namespace nano
{
class node;
class ledger;
class network;
class node_config;
@ -122,7 +123,7 @@ private:
using queue_entry_t = std::pair<nano::root, nano::block_hash>;
public:
vote_generator (nano::node_config const &, nano::ledger &, nano::wallets &, nano::vote_processor &, nano::local_vote_history &, nano::network &, nano::stats &, nano::logger &, bool is_final);
vote_generator (nano::node_config const &, nano::node &, nano::ledger &, nano::wallets &, nano::vote_processor &, nano::local_vote_history &, nano::network &, nano::stats &, nano::logger &, bool is_final);
~vote_generator ();
/** Queue items for vote generation, or broadcast votes already in cache */
@ -153,6 +154,7 @@ private:
private: // Dependencies
nano::node_config const & config;
nano::node & node;
nano::ledger & ledger;
nano::wallets & wallets;
nano::vote_processor & vote_processor;