From 99b8b5364fbfabd933f46479ef42f64992ae84d5 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Piotr=20Wo=CC=81jcik?= <3044353+pwojcikdev@users.noreply.github.com> Date: Wed, 13 Mar 2024 15:35:13 +0100 Subject: [PATCH] Organize --- nano/node/network.cpp | 21 ++++++++++++--------- nano/node/network.hpp | 19 +++++++++++++------ nano/node/node.cpp | 4 ++-- nano/node/transport/inproc.cpp | 31 ++----------------------------- nano/node/voting.cpp | 5 +++-- nano/node/voting.hpp | 4 +++- 6 files changed, 35 insertions(+), 49 deletions(-) diff --git a/nano/node/network.cpp b/nano/node/network.cpp index 80660cad0..04e5f489f 100644 --- a/nano/node/network.cpp +++ b/nano/node/network.cpp @@ -14,20 +14,16 @@ */ nano::network::network (nano::node & node_a, uint16_t port_a) : + node (node_a), id (nano::network_constants::active_network), syn_cookies (node_a.network_params.network.max_peers_per_ip), - inbound{ [this] (nano::message const & message, std::shared_ptr const & channel) { - debug_assert (message.header.network == node.network_params.network.current_network); - debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min); - process_message (message, channel); - } }, resolver (node_a.io_ctx), tcp_message_manager (node_a.config.tcp_incoming_connections_max), - node (node_a), publish_filter (256 * 1024), - tcp_channels (node_a, inbound), - port (port_a), - disconnect_observer ([] () {}) + tcp_channels (node_a, [this] (nano::message const & message, std::shared_ptr const & channel) { + inbound (message, channel); + }), + port (port_a), disconnect_observer ([] () {}) { for (std::size_t i = 0; i < node.config.network_threads && !node.flags.disable_tcp_realtime; ++i) { @@ -354,6 +350,13 @@ void nano::network::process_message (nano::message const & message, std::shared_ message.visit (visitor); } +void nano::network::inbound (const nano::message & message, const std::shared_ptr & channel) +{ + debug_assert (message.header.network == node.network_params.network.current_network); + debug_assert (message.header.version_using >= node.network_params.network.protocol_version_min); + process_message (message, channel); +} + // Send keepalives to all the peers we've been notified of void nano::network::merge_peers (std::array const & peers_a) { diff --git a/nano/node/network.hpp b/nano/node/network.hpp index cd5b90e04..f49b5f498 100644 --- a/nano/node/network.hpp +++ b/nano/node/network.hpp @@ -74,12 +74,12 @@ private: class network final { public: - network (nano::node &, uint16_t); + network (nano::node &, uint16_t port); ~network (); - nano::networks id; void start (); void stop (); + void flood_message (nano::message &, nano::transport::buffer_drop_policy const = nano::transport::buffer_drop_policy::limiter, float const = 1.0f); void flood_keepalive (float const scale_a = 1.0f); void flood_keepalive_self (float const scale_a = 0.5f); @@ -114,8 +114,6 @@ public: nano::endpoint endpoint () const; void cleanup (std::chrono::steady_clock::time_point const &); void ongoing_cleanup (); - // Node ID cookies cleanup - nano::syn_cookies syn_cookies; void ongoing_syn_cookie_cleanup (); void ongoing_keepalive (); std::size_t size () const; @@ -124,7 +122,9 @@ public: void erase (nano::transport::channel const &); /** Disconnects and adds peer to exclusion list */ void exclude (std::shared_ptr const & channel); + void inbound (nano::message const &, std::shared_ptr const &); +public: // Handshake /** Verifies that handshake response matches our query. @returns true if OK */ bool verify_handshake_response (nano::node_id_handshake::response_payload const & response, nano::endpoint const & remote_endpoint); std::optional prepare_handshake_query (nano::endpoint const & remote_endpoint); @@ -133,20 +133,27 @@ public: private: void process_message (nano::message const &, std::shared_ptr const &); +private: // Dependencies + nano::node & node; + public: - std::function const &)> inbound; + nano::networks const id; + nano::syn_cookies syn_cookies; boost::asio::ip::udp::resolver resolver; std::vector packet_processing_threads; nano::peer_exclusion excluded_peers; nano::tcp_message_manager tcp_message_manager; - nano::node & node; nano::network_filter publish_filter; nano::transport::tcp_channels tcp_channels; std::atomic port{ 0 }; std::function disconnect_observer; // Called when a new channel is observed std::function)> channel_observer; + +private: std::atomic stopped{ false }; + +public: static unsigned const broadcast_interval_ms = 10; static std::size_t const buffer_size = 512; diff --git a/nano/node/node.cpp b/nano/node/node.cpp index 8397fa70d..b0de6bc93 100644 --- a/nano/node/node.cpp +++ b/nano/node/node.cpp @@ -174,8 +174,8 @@ nano::node::node (boost::asio::io_context & io_ctx_a, std::filesystem::path cons vote_uniquer{}, confirmation_height_processor (ledger, write_database_queue, config.conf_height_processor_batch_min_time, logger, node_initialized_latch, flags.confirmation_height_processor_mode), vote_cache{ config.vote_cache, stats }, - generator{ config, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false }, - final_generator{ config, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true }, + generator{ config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* non-final */ false }, + final_generator{ config, *this, ledger, wallets, vote_processor, history, network, stats, logger, /* final */ true }, active{ *this, confirmation_height_processor, block_processor }, scheduler_impl{ std::make_unique (*this) }, scheduler{ *scheduler_impl }, diff --git a/nano/node/transport/inproc.cpp b/nano/node/transport/inproc.cpp index b07bd0d35..2dfb36d12 100644 --- a/nano/node/transport/inproc.cpp +++ b/nano/node/transport/inproc.cpp @@ -25,30 +25,6 @@ bool nano::transport::inproc::channel::operator== (nano::transport::channel cons return endpoint == other_a.get_endpoint (); } -/** - * This function is called for every message received by the inproc channel. - * Note that it is called from inside the context of nano::transport::inproc::channel::send_buffer - */ -class message_visitor_inbound : public nano::message_visitor -{ -public: - message_visitor_inbound (decltype (nano::network::inbound) & inbound, std::shared_ptr channel) : - inbound{ inbound }, - channel{ channel } - { - } - - decltype (nano::network::inbound) & inbound; - - // the channel to reply to, if a reply is generated - std::shared_ptr channel; - - void default_handler (nano::message const & message) override - { - inbound (message, channel); - } -}; - /** * Send the buffer to the peer and call the callback function when done. The call never fails. * Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background. @@ -78,11 +54,8 @@ void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer co // process message { - node.stats.inc (nano::stat::type::message, to_stat_detail (message_a->header.type), nano::stat::dir::in); - - // create an inbound message visitor class to handle incoming messages - message_visitor_inbound visitor{ destination.network.inbound, remote_channel }; - message_a->visit (visitor); + node.stats.inc (nano::stat::type::message, to_stat_detail (message_a->type ()), nano::stat::dir::in); + destination.network.inbound (*message_a, remote_channel); } }); diff --git a/nano/node/voting.cpp b/nano/node/voting.cpp index 9967f7edb..13a66ea9e 100644 --- a/nano/node/voting.cpp +++ b/nano/node/voting.cpp @@ -162,8 +162,9 @@ std::unique_ptr nano::collect_container_info (na return composite; } -nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stats & stats_a, nano::logger & logger_a, bool is_final_a) : +nano::vote_generator::vote_generator (nano::node_config const & config_a, nano::node & node_a, nano::ledger & ledger_a, nano::wallets & wallets_a, nano::vote_processor & vote_processor_a, nano::local_vote_history & history_a, nano::network & network_a, nano::stats & stats_a, nano::logger & logger_a, bool is_final_a) : config (config_a), + node (node_a), ledger (ledger_a), wallets (wallets_a), vote_processor (vote_processor_a), @@ -394,7 +395,7 @@ void nano::vote_generator::broadcast_action (std::shared_ptr const & { network.flood_vote_pr (vote_a); network.flood_vote (vote_a, 2.0f); - vote_processor.vote (vote_a, std::make_shared (network.node, network.node)); + vote_processor.vote (vote_a, std::make_shared (node, node)); // TODO: Avoid creating a temporary channel each time } void nano::vote_generator::run () diff --git a/nano/node/voting.hpp b/nano/node/voting.hpp index 6a2b2c79b..24c5813dd 100644 --- a/nano/node/voting.hpp +++ b/nano/node/voting.hpp @@ -22,6 +22,7 @@ namespace mi = boost::multi_index; namespace nano { +class node; class ledger; class network; class node_config; @@ -122,7 +123,7 @@ private: using queue_entry_t = std::pair; public: - vote_generator (nano::node_config const &, nano::ledger &, nano::wallets &, nano::vote_processor &, nano::local_vote_history &, nano::network &, nano::stats &, nano::logger &, bool is_final); + vote_generator (nano::node_config const &, nano::node &, nano::ledger &, nano::wallets &, nano::vote_processor &, nano::local_vote_history &, nano::network &, nano::stats &, nano::logger &, bool is_final); ~vote_generator (); /** Queue items for vote generation, or broadcast votes already in cache */ @@ -153,6 +154,7 @@ private: private: // Dependencies nano::node_config const & config; + nano::node & node; nano::ledger & ledger; nano::wallets & wallets; nano::vote_processor & vote_processor;