From 920d73a337b50976db6d34a4392f145bae2d2331 Mon Sep 17 00:00:00 2001 From: clemahieu Date: Wed, 4 May 2022 21:54:45 +0100 Subject: [PATCH] Renaming nano::transport::channel_loopback to nano::transport::inproc::channel. (#3805) * Renaming nano::transport::channel_loopback to nano::transport::inproc::channel. The name 'inproc' better indicates it's implementation through memory transfer, in contrast to the name 'loopback' might indicate an association with a network stack. * Adding shared_const_buffer::to_bytes which copies all bytes in the buffer to a vector. * Call the callback function and add some comments Co-authored-by: Dimitrios Siganos --- nano/core_test/active_transactions.cpp | 27 +++--- nano/core_test/ledger.cpp | 13 +-- nano/core_test/network.cpp | 9 +- nano/core_test/node.cpp | 23 ++--- nano/core_test/system.cpp | 17 ++++ nano/core_test/vote_processor.cpp | 11 +-- nano/lib/asio.cpp | 11 +++ nano/lib/asio.hpp | 1 + nano/nano_node/entry.cpp | 3 +- nano/node/CMakeLists.txt | 2 + nano/node/transport/inproc.cpp | 120 +++++++++++++++++++++++++ nano/node/transport/inproc.hpp | 50 +++++++++++ nano/node/transport/transport.cpp | 29 +----- nano/node/transport/transport.hpp | 34 ------- nano/node/voting.cpp | 3 +- nano/slow_test/node.cpp | 3 +- 16 files changed, 252 insertions(+), 104 deletions(-) create mode 100644 nano/node/transport/inproc.cpp create mode 100644 nano/node/transport/inproc.hpp diff --git a/nano/core_test/active_transactions.cpp b/nano/core_test/active_transactions.cpp index f56def3c..162fe52f 100644 --- a/nano/core_test/active_transactions.cpp +++ b/nano/core_test/active_transactions.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -219,7 +220,7 @@ TEST (active_transactions, inactive_votes_cache) .work (*system.work.generate (latest)) .build_shared (); auto vote (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector (1, send->hash ()))); - node.vote_processor.vote (vote, std::make_shared (node)); + node.vote_processor.vote (vote, std::make_shared (node, node)); ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 1); node.process_active (send); node.block_processor.flush (); @@ -241,7 +242,7 @@ TEST (active_transactions, inactive_votes_cache_non_final) .work (*system.work.generate (latest)) .build_shared (); auto vote (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector (1, send->hash ()))); // Non-final vote - node.vote_processor.vote (vote, std::make_shared (node)); + node.vote_processor.vote (vote, std::make_shared (node, node)); ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 1); node.process_active (send); node.block_processor.flush (); @@ -278,7 +279,7 @@ TEST (active_transactions, inactive_votes_cache_fork) .build_shared (); auto const vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector (1, send1->hash ())); - node.vote_processor.vote (vote, std::make_shared (node)); + node.vote_processor.vote (vote, std::make_shared (node, node)); ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 1); node.process_active (send2); @@ -326,7 +327,7 @@ TEST (active_transactions, inactive_votes_cache_existing_vote) ASSERT_GT (node.weight (key.pub), node.minimum_principal_weight ()); // Insert vote auto vote1 (std::make_shared (key.pub, key.prv, nano::vote::timestamp_min * 1, 0, std::vector (1, send->hash ()))); - node.vote_processor.vote (vote1, std::make_shared (node)); + node.vote_processor.vote (vote1, std::make_shared (node, node)); ASSERT_TIMELY (5s, election->votes ().size () == 2); ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_new)); auto last_vote1 (election->votes ()[key.pub]); @@ -390,9 +391,9 @@ TEST (active_transactions, DISABLED_inactive_votes_cache_multiple_votes) node.block_processor.flush (); // Process votes auto vote1 (std::make_shared (key1.pub, key1.prv, 0, 0, std::vector (1, send1->hash ()))); - node.vote_processor.vote (vote1, std::make_shared (node)); + node.vote_processor.vote (vote1, std::make_shared (node, node)); auto vote2 (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector (1, send1->hash ()))); - node.vote_processor.vote (vote2, std::make_shared (node)); + node.vote_processor.vote (vote2, std::make_shared (node, node)); ASSERT_TIMELY (5s, node.active.find_inactive_votes_cache (send1->hash ()).voters.size () == 2); ASSERT_EQ (1, node.active.inactive_votes_cache_size ()); node.scheduler.activate (nano::dev::genesis_key.pub, node.store.tx_begin_read ()); @@ -471,18 +472,18 @@ TEST (active_transactions, inactive_votes_cache_election_start) // Inactive votes std::vector hashes{ open1->hash (), open2->hash (), send4->hash () }; auto vote1 (std::make_shared (key1.pub, key1.prv, 0, 0, hashes)); - node.vote_processor.vote (vote1, std::make_shared (node)); + node.vote_processor.vote (vote1, std::make_shared (node, node)); ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 3); ASSERT_TRUE (node.active.empty ()); ASSERT_EQ (1, node.ledger.cache.cemented_count); // 2 votes are required to start election (dev network) auto vote2 (std::make_shared (key2.pub, key2.prv, 0, 0, hashes)); - node.vote_processor.vote (vote2, std::make_shared (node)); + node.vote_processor.vote (vote2, std::make_shared (node, node)); // Only open1 & open2 blocks elections should start (send4 is missing previous block in ledger) ASSERT_TIMELY (5s, 2 == node.active.size ()); // Confirm elections with weight quorum auto vote0 (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, hashes)); // Final vote for confirmation - node.vote_processor.vote (vote0, std::make_shared (node)); + node.vote_processor.vote (vote0, std::make_shared (node, node)); ASSERT_TIMELY (5s, node.active.empty ()); ASSERT_TIMELY (5s, 5 == node.ledger.cache.cemented_count); // A late block arrival also checks the inactive votes cache @@ -713,7 +714,7 @@ TEST (active_transactions, republish_winner) auto election = node1.active.election (fork->qualified_root ()); ASSERT_NE (nullptr, election); auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector{ fork->hash () }); - node1.vote_processor.vote (vote, std::make_shared (node1)); + node1.vote_processor.vote (vote, std::make_shared (node1, node1)); node1.vote_processor.flush (); node1.block_processor.flush (); ASSERT_TIMELY (3s, election->confirmed ()); @@ -835,7 +836,7 @@ TEST (active_transactions, fork_replacement_tally) node1.process_active (open); // Confirmation auto vote (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector{ send->hash (), open->hash () })); - node1.vote_processor.vote (vote, std::make_shared (node1)); + node1.vote_processor.vote (vote, std::make_shared (node1, node1)); } node1.block_processor.flush (); ASSERT_TIMELY (5s, node1.ledger.cache.cemented_count == 1 + 2 * reps_count); @@ -885,7 +886,7 @@ TEST (active_transactions, fork_replacement_tally) .work (*system.work.generate (latest)) .build_shared (); auto vote (std::make_shared (keys[i].pub, keys[i].prv, 0, 0, std::vector{ fork->hash () })); - node1.vote_processor.vote (vote, std::make_shared (node1)); + node1.vote_processor.vote (vote, std::make_shared (node1, node1)); node1.vote_processor.flush (); node1.process_active (fork); } @@ -915,7 +916,7 @@ TEST (active_transactions, fork_replacement_tally) // Process vote for correct block & replace existing lowest tally block auto vote (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector{ send_last->hash () })); - node1.vote_processor.vote (vote, std::make_shared (node1)); + node1.vote_processor.vote (vote, std::make_shared (node1, node1)); node1.vote_processor.flush (); node2.network.flood_block (send_last); ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1); diff --git a/nano/core_test/ledger.cpp b/nano/core_test/ledger.cpp index 0e9bb0f3..ac9ea40f 100644 --- a/nano/core_test/ledger.cpp +++ b/nano/core_test/ledger.cpp @@ -2,6 +2,7 @@ #include #include #include +#include #include #include @@ -647,10 +648,10 @@ TEST (votes, check_signature) ASSERT_EQ (1, election1->votes ().size ()); auto vote1 (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * 1, 0, send1)); vote1->signature.bytes[0] ^= 1; - ASSERT_EQ (nano::vote_code::invalid, node1.vote_processor.vote_blocking (vote1, std::make_shared (node1))); + ASSERT_EQ (nano::vote_code::invalid, node1.vote_processor.vote_blocking (vote1, std::make_shared (node1, node1))); vote1->signature.bytes[0] ^= 1; - ASSERT_EQ (nano::vote_code::vote, node1.vote_processor.vote_blocking (vote1, std::make_shared (node1))); - ASSERT_EQ (nano::vote_code::replay, node1.vote_processor.vote_blocking (vote1, std::make_shared (node1))); + ASSERT_EQ (nano::vote_code::vote, node1.vote_processor.vote_blocking (vote1, std::make_shared (node1, node1))); + ASSERT_EQ (nano::vote_code::replay, node1.vote_processor.vote_blocking (vote1, std::make_shared (node1, node1))); } TEST (votes, add_one) @@ -785,7 +786,7 @@ TEST (votes, add_old) node1.scheduler.flush (); auto election1 = node1.active.election (send1->qualified_root ()); auto vote1 (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * 2, 0, send1)); - auto channel (std::make_shared (node1)); + auto channel (std::make_shared (node1, node1)); node1.vote_processor.vote_blocking (vote1, channel); nano::keypair key2; auto send2 (std::make_shared (nano::dev::genesis->hash (), key2.pub, 0, nano::dev::genesis_key.prv, nano::dev::genesis_key.pub, 0)); @@ -827,7 +828,7 @@ TEST (votes, DISABLED_add_old_different_account) ASSERT_EQ (1, election1->votes ().size ()); ASSERT_EQ (1, election2->votes ().size ()); auto vote1 (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * 2, 0, send1)); - auto channel (std::make_shared (node1)); + auto channel (std::make_shared (node1, node1)); auto vote_result1 (node1.vote_processor.vote_blocking (vote1, channel)); ASSERT_EQ (nano::vote_code::vote, vote_result1); ASSERT_EQ (2, election1->votes ().size ()); @@ -861,7 +862,7 @@ TEST (votes, add_cooldown) node1.scheduler.flush (); auto election1 = node1.active.election (send1->qualified_root ()); auto vote1 (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * 1, 0, send1)); - auto channel (std::make_shared (node1)); + auto channel (std::make_shared (node1, node1)); node1.vote_processor.vote_blocking (vote1, channel); nano::keypair key2; auto send2 (std::make_shared (nano::dev::genesis->hash (), key2.pub, 0, nano::dev::genesis_key.prv, nano::dev::genesis_key.pub, 0)); diff --git a/nano/core_test/network.cpp b/nano/core_test/network.cpp index 8d6dfd7b..536073b1 100644 --- a/nano/core_test/network.cpp +++ b/nano/core_test/network.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -1256,14 +1257,14 @@ TEST (network, loopback_channel) nano::system system (2); auto & node1 = *system.nodes[0]; auto & node2 = *system.nodes[1]; - nano::transport::channel_loopback channel1 (node1); + nano::transport::inproc::channel channel1 (node1, node1); ASSERT_EQ (channel1.get_type (), nano::transport::transport_type::loopback); ASSERT_EQ (channel1.get_endpoint (), node1.network.endpoint ()); ASSERT_EQ (channel1.get_tcp_endpoint (), nano::transport::map_endpoint_to_tcp (node1.network.endpoint ())); ASSERT_EQ (channel1.get_network_version (), node1.network_params.network.protocol_version); ASSERT_EQ (channel1.get_node_id (), node1.node_id.pub); ASSERT_EQ (channel1.get_node_id_optional ().value_or (0), node1.node_id.pub); - nano::transport::channel_loopback channel2 (node2); + nano::transport::inproc::channel channel2 (node2, node2); ASSERT_TRUE (channel1 == channel1); ASSERT_FALSE (channel1 == channel2); ++node1.network.port; @@ -1277,10 +1278,10 @@ TEST (network, filter) auto & node1 = *system.nodes[0]; nano::keepalive keepalive{ nano::dev::network_params.network }; const_cast (keepalive.header.network) = nano::networks::nano_dev_network; - node1.network.inbound (keepalive, std::make_shared (node1)); + node1.network.inbound (keepalive, std::make_shared (node1, node1)); ASSERT_EQ (0, node1.stats.count (nano::stat::type::message, nano::stat::detail::invalid_network)); const_cast (keepalive.header.network) = nano::networks::invalid; - node1.network.inbound (keepalive, std::make_shared (node1)); + node1.network.inbound (keepalive, std::make_shared (node1, node1)); ASSERT_EQ (1, node1.stats.count (nano::stat::type::message, nano::stat::detail::invalid_network)); } diff --git a/nano/core_test/node.cpp b/nano/core_test/node.cpp index 2baecdf6..39d638cd 100644 --- a/nano/core_test/node.cpp +++ b/nano/core_test/node.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -2125,14 +2126,14 @@ TEST (node, online_reps_rep_crawler) auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::milliseconds_since_epoch (), 0, std::vector{ nano::dev::genesis->hash () }); ASSERT_EQ (0, node1.online_reps.online ()); // Without rep crawler - node1.vote_processor.vote_blocking (vote, std::make_shared (node1)); + node1.vote_processor.vote_blocking (vote, std::make_shared (node1, node1)); ASSERT_EQ (0, node1.online_reps.online ()); // After inserting to rep crawler { nano::lock_guard guard (node1.rep_crawler.probable_reps_mutex); node1.rep_crawler.active.insert (nano::dev::genesis->hash ()); } - node1.vote_processor.vote_blocking (vote, std::make_shared (node1)); + node1.vote_processor.vote_blocking (vote, std::make_shared (node1, node1)); ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.online ()); } } @@ -2162,7 +2163,7 @@ TEST (node, online_reps_election) // Process vote for ongoing election auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::milliseconds_since_epoch (), 0, std::vector{ send1->hash () }); ASSERT_EQ (0, node1.online_reps.online ()); - node1.vote_processor.vote_blocking (vote, std::make_shared (node1)); + node1.vote_processor.vote_blocking (vote, std::make_shared (node1, node1)); ASSERT_EQ (nano::dev::constants.genesis_amount - nano::Gxrb_ratio, node1.online_reps.online ()); } @@ -2517,7 +2518,7 @@ TEST (node, local_votes_cache_fork) ASSERT_EQ (nano::process_result::progress, node1.process (*send1).code); // Cache vote auto vote (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector (1, send1->hash ()))); - node1.vote_processor.vote (vote, std::make_shared (node1)); + node1.vote_processor.vote (vote, std::make_shared (node1, node1)); node1.history.add (send1->root (), send1->hash (), vote); auto votes2 (node1.history.votes (send1->root (), send1->hash ())); ASSERT_EQ (1, votes2.size ()); @@ -2559,7 +2560,7 @@ TEST (node, vote_republish) auto vote (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, send2)); ASSERT_TRUE (node1.active.active (*send1)); ASSERT_TIMELY (10s, node2.active.active (*send1)); - node1.vote_processor.vote (vote, std::make_shared (node1)); + node1.vote_processor.vote (vote, std::make_shared (node1, node1)); ASSERT_TIMELY (10s, node1.block (send2->hash ())); ASSERT_TIMELY (10s, node2.block (send2->hash ())); ASSERT_FALSE (node1.block (send1->hash ())); @@ -2654,7 +2655,7 @@ TEST (node, vote_by_hash_republish) auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, vote_blocks); // Final vote for confirmation ASSERT_TRUE (node1.active.active (*send1)); ASSERT_TRUE (node2.active.active (*send1)); - node1.vote_processor.vote (vote, std::make_shared (node1)); + node1.vote_processor.vote (vote, std::make_shared (node1, node1)); ASSERT_TIMELY (10s, node1.block (send2->hash ())); ASSERT_TIMELY (10s, node2.block (send2->hash ())); ASSERT_FALSE (node1.block (send1->hash ())); @@ -2697,7 +2698,7 @@ TEST (node, DISABLED_vote_by_hash_epoch_block_republish) auto vote (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, vote_blocks)); ASSERT_TRUE (node1.active.active (*send1)); ASSERT_TRUE (node2.active.active (*send1)); - node1.vote_processor.vote (vote, std::make_shared (node1)); + node1.vote_processor.vote (vote, std::make_shared (node1, node1)); ASSERT_TIMELY (10s, node1.block (epoch1->hash ())); ASSERT_TIMELY (10s, node2.block (epoch1->hash ())); ASSERT_FALSE (node1.block (send1->hash ())); @@ -3176,7 +3177,7 @@ TEST (node, confirm_back) std::vector vote_blocks; vote_blocks.push_back (send2->hash ()); auto vote (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, vote_blocks)); - node.vote_processor.vote_blocking (vote, std::make_shared (node)); + node.vote_processor.vote_blocking (vote, std::make_shared (node, node)); ASSERT_TIMELY (10s, node.active.empty ()); } @@ -3711,7 +3712,7 @@ TEST (node, rollback_gap_source) ASSERT_EQ (1, election->votes ().size ()); // Confirm open auto vote1 (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector (1, open->hash ()))); - node.vote_processor.vote (vote1, std::make_shared (node)); + node.vote_processor.vote (vote1, std::make_shared (node, node)); ASSERT_TIMELY (5s, election->votes ().size () == 2); ASSERT_TIMELY (3s, election->confirmed ()); } @@ -3738,7 +3739,7 @@ TEST (node, rollback_gap_source) ASSERT_EQ (2, election->blocks ().size ()); // Confirm open (again) auto vote1 (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector (1, open->hash ()))); - node.vote_processor.vote (vote1, std::make_shared (node)); + node.vote_processor.vote (vote1, std::make_shared (node, node)); ASSERT_TIMELY (5s, election->votes ().size () == 2); } // Wait for new rollback @@ -4280,7 +4281,7 @@ TEST (rep_crawler, local) nano::node_flags flags; flags.disable_rep_crawler = true; auto & node = *system.add_node (flags); - auto loopback = std::make_shared (node); + auto loopback = std::make_shared (node, node); auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector{ nano::dev::genesis->hash () }); { nano::lock_guard guard (node.rep_crawler.probable_reps_mutex); diff --git a/nano/core_test/system.cpp b/nano/core_test/system.cpp index 4854fbd7..6fc89a9f 100644 --- a/nano/core_test/system.cpp +++ b/nano/core_test/system.cpp @@ -1,3 +1,4 @@ +#include #include #include #include @@ -183,3 +184,19 @@ TEST (system, rep_initialize_many) ASSERT_EQ ((nano::dev::constants.genesis_amount - nano::Gxrb_ratio) / 2, node1->balance (key0.pub)); ASSERT_EQ ((nano::dev::constants.genesis_amount - nano::Gxrb_ratio) / 2, node1->balance (key1.pub)); } + +TEST (system, transport_basic) +{ + nano::system system{ 1 }; + auto & node0 = *system.nodes[0]; + // Start nodes in separate systems so they don't automatically connect with each other. + nano::system system1{ 1 }; + auto & node1 = *system1.nodes[0]; + ASSERT_EQ (0, node1.stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in)); + nano::transport::inproc::channel channel{ node0, node1 }; + // Send a keepalive message since they are easy to construct + nano::keepalive junk{ nano::dev::network_params.network }; + channel.send (junk); + // Ensure the keepalive has been reecived on the target. + ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) > 0); +} diff --git a/nano/core_test/vote_processor.cpp b/nano/core_test/vote_processor.cpp index 080a98a5..7fe9a47f 100644 --- a/nano/core_test/vote_processor.cpp +++ b/nano/core_test/vote_processor.cpp @@ -1,4 +1,5 @@ #include +#include #include #include #include @@ -15,7 +16,7 @@ TEST (vote_processor, codes) auto vote (std::make_shared (key.pub, key.prv, nano::vote::timestamp_min * 1, 0, std::vector{ nano::dev::genesis->hash () })); auto vote_invalid = std::make_shared (*vote); vote_invalid->signature.bytes[0] ^= 1; - auto channel (std::make_shared (node)); + auto channel (std::make_shared (node, node)); // Invalid signature ASSERT_EQ (nano::vote_code::invalid, node.vote_processor.vote_blocking (vote_invalid, channel, false)); @@ -46,7 +47,7 @@ TEST (vote_processor, flush) { nano::system system (1); auto & node (*system.nodes[0]); - auto channel (std::make_shared (node)); + auto channel (std::make_shared (node, node)); for (unsigned i = 0; i < 2000; ++i) { auto vote = std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * (1 + i), 0, std::vector{ nano::dev::genesis->hash () }); @@ -64,7 +65,7 @@ TEST (vote_processor, invalid_signature) auto vote = std::make_shared (key.pub, key.prv, nano::vote::timestamp_min * 1, 0, std::vector{ nano::dev::genesis->hash () }); auto vote_invalid = std::make_shared (*vote); vote_invalid->signature.bytes[0] ^= 1; - auto channel = std::make_shared (node); + auto channel = std::make_shared (node, node); node.block_confirm (nano::dev::genesis); auto election = node.active.election (nano::dev::genesis->qualified_root ()); @@ -86,7 +87,7 @@ TEST (vote_processor, no_capacity) auto & node (*system.add_node (node_flags)); nano::keypair key; auto vote (std::make_shared (key.pub, key.prv, nano::vote::timestamp_min * 1, 0, std::vector{ nano::dev::genesis->hash () })); - auto channel (std::make_shared (node)); + auto channel (std::make_shared (node, node)); ASSERT_TRUE (node.vote_processor.vote (vote, channel)); } @@ -98,7 +99,7 @@ TEST (vote_processor, overflow) auto & node (*system.add_node (node_flags)); nano::keypair key; auto vote (std::make_shared (key.pub, key.prv, nano::vote::timestamp_min * 1, 0, std::vector{ nano::dev::genesis->hash () })); - auto channel (std::make_shared (node)); + auto channel (std::make_shared (node, node)); // No way to lock the processor, but queueing votes in quick succession must result in overflow size_t not_processed{ 0 }; diff --git a/nano/lib/asio.cpp b/nano/lib/asio.cpp index b73e1b86..9dbc97c2 100644 --- a/nano/lib/asio.cpp +++ b/nano/lib/asio.cpp @@ -43,3 +43,14 @@ std::size_t nano::shared_const_buffer::size () const { return m_buffer.size (); } + +std::vector nano::shared_const_buffer::to_bytes () const +{ + std::vector bytes; + for (auto const & buffer : *this) + { + bytes.resize (bytes.size () + buffer.size ()); + std::copy ((uint8_t const *)buffer.data (), (uint8_t const *)buffer.data () + buffer.size (), bytes.data () + bytes.size () - buffer.size ()); + } + return bytes; +} diff --git a/nano/lib/asio.hpp b/nano/lib/asio.hpp index 161a2e00..dcbce2a5 100644 --- a/nano/lib/asio.hpp +++ b/nano/lib/asio.hpp @@ -20,6 +20,7 @@ public: boost::asio::const_buffer const * end () const; std::size_t size () const; + std::vector to_bytes () const; private: std::shared_ptr> m_data; diff --git a/nano/nano_node/entry.cpp b/nano/nano_node/entry.cpp index 02414350..f1112c52 100644 --- a/nano/nano_node/entry.cpp +++ b/nano/nano_node/entry.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -1108,7 +1109,7 @@ int main (int argc, char * const * argv) while (!votes.empty ()) { auto vote (votes.front ()); - auto channel (std::make_shared (*node)); + auto channel (std::make_shared (*node, *node)); node->vote_processor.vote (vote, channel); votes.pop_front (); } diff --git a/nano/node/CMakeLists.txt b/nano/node/CMakeLists.txt index ca7645df..ba02c0f1 100644 --- a/nano/node/CMakeLists.txt +++ b/nano/node/CMakeLists.txt @@ -178,6 +178,8 @@ add_library( state_block_signature_verification.cpp telemetry.hpp telemetry.cpp + transport/inproc.hpp + transport/inproc.cpp transport/tcp.hpp transport/tcp.cpp transport/transport.hpp diff --git a/nano/node/transport/inproc.cpp b/nano/node/transport/inproc.cpp new file mode 100644 index 00000000..eec7a5e6 --- /dev/null +++ b/nano/node/transport/inproc.cpp @@ -0,0 +1,120 @@ +#include +#include + +#include + +nano::transport::inproc::channel::channel (nano::node & node, nano::node & destination) : + transport::channel{ node }, + destination{ destination }, + endpoint{ node.network.endpoint () } +{ + set_node_id (node.node_id.pub); + set_network_version (node.network_params.network.protocol_version); +} + +std::size_t nano::transport::inproc::channel::hash_code () const +{ + std::hash<::nano::endpoint> hash; + return hash (endpoint); +} + +bool nano::transport::inproc::channel::operator== (nano::transport::channel const & other_a) const +{ + return endpoint == other_a.get_endpoint (); +} + +/** + * This function is called for every message received by the inproc channel. + * Note that it is called from inside the context of nano::transport::inproc::channel::send_buffer + */ +class message_visitor_inbound : public nano::message_visitor +{ +public: + message_visitor_inbound (decltype (nano::network::inbound) & inbound, std::shared_ptr channel) : + inbound{ inbound }, + channel{ channel } + { + } + + decltype (nano::network::inbound) & inbound; + + // the channel to reply to, if a reply is generated + std::shared_ptr channel; + + void keepalive (nano::keepalive const & message) + { + inbound (message, channel); + } + void publish (nano::publish const & message) + { + inbound (message, channel); + } + void confirm_req (nano::confirm_req const & message) + { + inbound (message, channel); + } + void confirm_ack (nano::confirm_ack const & message) + { + inbound (message, channel); + } + void bulk_pull (nano::bulk_pull const & message) + { + inbound (message, channel); + } + void bulk_pull_account (nano::bulk_pull_account const & message) + { + inbound (message, channel); + } + void bulk_push (nano::bulk_push const & message) + { + inbound (message, channel); + } + void frontier_req (nano::frontier_req const & message) + { + inbound (message, channel); + } + void node_id_handshake (nano::node_id_handshake const & message) + { + inbound (message, channel); + } + void telemetry_req (nano::telemetry_req const & message) + { + inbound (message, channel); + } + void telemetry_ack (nano::telemetry_ack const & message) + { + inbound (message, channel); + } +}; + +/** + * Send the buffer to the peer and call the callback function when done. The call never fails. + * Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background. + */ +void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::buffer_drop_policy drop_policy_a) +{ + // we create a temporary channel for the reply path, in case the receiver of the message wants to reply + auto remote_channel = std::make_shared (destination, node); + + // create an inbound message visitor class to handle incoming messages because that's what the message parser expects + message_visitor_inbound visitor{ destination.network.inbound, remote_channel }; + + nano::message_parser parser{ destination.network.publish_filter, destination.block_uniquer, destination.vote_uniquer, visitor, destination.work, destination.network_params.network }; + + // parse the message and action any work that needs to be done on that object via the visitor object + auto bytes = buffer_a.to_bytes (); + auto size = bytes.size (); + parser.deserialize_buffer (bytes.data (), size); + + if (callback_a) + { + node.background ([callback_a, size] () { + callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size); + }); + } +} + +std::string nano::transport::inproc::channel::to_string () const +{ + return boost::str (boost::format ("%1%") % endpoint); +} diff --git a/nano/node/transport/inproc.hpp b/nano/node/transport/inproc.hpp new file mode 100644 index 00000000..707f89e2 --- /dev/null +++ b/nano/node/transport/inproc.hpp @@ -0,0 +1,50 @@ +#pragma once + +#include + +namespace nano +{ +namespace transport +{ + /** + * In-process transport channel. Mostly useful for unit tests + **/ + namespace inproc + { + class channel final : public nano::transport::channel + { + public: + explicit channel (nano::node & node, nano::node & destination); + std::size_t hash_code () const override; + bool operator== (nano::transport::channel const &) const override; + // TODO: investigate clang-tidy warning about default parameters on virtual/override functions + // + void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override; + std::string to_string () const override; + bool operator== (nano::transport::inproc::channel const & other_a) const + { + return endpoint == other_a.get_endpoint (); + } + + nano::endpoint get_endpoint () const override + { + return endpoint; + } + + nano::tcp_endpoint get_tcp_endpoint () const override + { + return nano::transport::map_endpoint_to_tcp (endpoint); + } + + nano::transport::transport_type get_type () const override + { + return nano::transport::transport_type::loopback; + } + + private: + nano::node & destination; + nano::endpoint const endpoint; + }; + } // namespace inproc +} // namespace transport +} // namespace nano diff --git a/nano/node/transport/transport.cpp b/nano/node/transport/transport.cpp index 1ca425db..beda88c8 100644 --- a/nano/node/transport/transport.cpp +++ b/nano/node/transport/transport.cpp @@ -1,5 +1,6 @@ #include #include +#include #include #include @@ -133,34 +134,6 @@ void nano::transport::channel::send (nano::message & message_a, std::function hash; - return hash (endpoint); -} - -bool nano::transport::channel_loopback::operator== (nano::transport::channel const & other_a) const -{ - return endpoint == other_a.get_endpoint (); -} - -void nano::transport::channel_loopback::send_buffer (nano::shared_const_buffer const & buffer_a, std::function const & callback_a, nano::buffer_drop_policy drop_policy_a) -{ - release_assert (false && "sending to a loopback channel is not supported"); -} - -std::string nano::transport::channel_loopback::to_string () const -{ - return boost::str (boost::format ("%1%") % endpoint); -} - boost::asio::ip::address_v6 nano::transport::mapped_from_v4_bytes (unsigned long address_a) { return boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (address_a)); diff --git a/nano/node/transport/transport.hpp b/nano/node/transport/transport.hpp index 95c56d2b..0f35c803 100644 --- a/nano/node/transport/transport.hpp +++ b/nano/node/transport/transport.hpp @@ -146,40 +146,6 @@ namespace transport protected: nano::node & node; }; - - class channel_loopback final : public nano::transport::channel - { - public: - explicit channel_loopback (nano::node &); - std::size_t hash_code () const override; - bool operator== (nano::transport::channel const &) const override; - // TODO: investigate clang-tidy warning about default parameters on virtual/override functions - // - void send_buffer (nano::shared_const_buffer const &, std::function const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override; - std::string to_string () const override; - bool operator== (nano::transport::channel_loopback const & other_a) const - { - return endpoint == other_a.get_endpoint (); - } - - nano::endpoint get_endpoint () const override - { - return endpoint; - } - - nano::tcp_endpoint get_tcp_endpoint () const override - { - return nano::transport::map_endpoint_to_tcp (endpoint); - } - - nano::transport::transport_type get_type () const override - { - return nano::transport::transport_type::loopback; - } - - private: - nano::endpoint const endpoint; - }; } // namespace transport } // namespace nano diff --git a/nano/node/voting.cpp b/nano/node/voting.cpp index 124879d5..b9ad78cf 100644 --- a/nano/node/voting.cpp +++ b/nano/node/voting.cpp @@ -3,6 +3,7 @@ #include #include #include +#include #include #include #include @@ -380,7 +381,7 @@ void nano::vote_generator::broadcast_action (std::shared_ptr const & { network.flood_vote_pr (vote_a); network.flood_vote (vote_a, 2.0f); - vote_processor.vote (vote_a, std::make_shared (network.node)); + vote_processor.vote (vote_a, std::make_shared (network.node, network.node)); } void nano::vote_generator::run () diff --git a/nano/slow_test/node.cpp b/nano/slow_test/node.cpp index 71019c1a..6e53968b 100644 --- a/nano/slow_test/node.cpp +++ b/nano/slow_test/node.cpp @@ -1,6 +1,7 @@ #include #include #include +#include #include #include #include @@ -467,7 +468,7 @@ TEST (store, vote_load) for (auto i (0); i < 1000000; ++i) { auto vote (std::make_shared (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, i, 0, block)); - node.vote_processor.vote (vote, std::make_shared (node)); + node.vote_processor.vote (vote, std::make_shared (node, node)); } }