Renaming nano::transport::channel_loopback to nano::transport::inproc::channel. (#3805)

* Renaming nano::transport::channel_loopback to nano::transport::inproc::channel.

The name 'inproc' better indicates it's implementation through memory transfer, in contrast to the name 'loopback' might indicate an association with a network stack.

* Adding shared_const_buffer::to_bytes which copies all bytes in the buffer to a vector.

* Call the callback function and add some comments

Co-authored-by: Dimitrios Siganos <dimitris@siganos.org>
This commit is contained in:
clemahieu 2022-05-04 21:54:45 +01:00 committed by GitHub
commit 920d73a337
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
16 changed files with 252 additions and 104 deletions

View file

@ -1,5 +1,6 @@
#include <nano/lib/jsonconfig.hpp>
#include <nano/node/election.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
@ -219,7 +220,7 @@ TEST (active_transactions, inactive_votes_cache)
.work (*system.work.generate (latest))
.build_shared ();
auto vote (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector<nano::block_hash> (1, send->hash ())));
node.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 1);
node.process_active (send);
node.block_processor.flush ();
@ -241,7 +242,7 @@ TEST (active_transactions, inactive_votes_cache_non_final)
.work (*system.work.generate (latest))
.build_shared ();
auto vote (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector<nano::block_hash> (1, send->hash ()))); // Non-final vote
node.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 1);
node.process_active (send);
node.block_processor.flush ();
@ -278,7 +279,7 @@ TEST (active_transactions, inactive_votes_cache_fork)
.build_shared ();
auto const vote = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector<nano::block_hash> (1, send1->hash ()));
node.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 1);
node.process_active (send2);
@ -326,7 +327,7 @@ TEST (active_transactions, inactive_votes_cache_existing_vote)
ASSERT_GT (node.weight (key.pub), node.minimum_principal_weight ());
// Insert vote
auto vote1 (std::make_shared<nano::vote> (key.pub, key.prv, nano::vote::timestamp_min * 1, 0, std::vector<nano::block_hash> (1, send->hash ())));
node.vote_processor.vote (vote1, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote1, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (5s, election->votes ().size () == 2);
ASSERT_EQ (1, node.stats.count (nano::stat::type::election, nano::stat::detail::vote_new));
auto last_vote1 (election->votes ()[key.pub]);
@ -390,9 +391,9 @@ TEST (active_transactions, DISABLED_inactive_votes_cache_multiple_votes)
node.block_processor.flush ();
// Process votes
auto vote1 (std::make_shared<nano::vote> (key1.pub, key1.prv, 0, 0, std::vector<nano::block_hash> (1, send1->hash ())));
node.vote_processor.vote (vote1, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote1, std::make_shared<nano::transport::inproc::channel> (node, node));
auto vote2 (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector<nano::block_hash> (1, send1->hash ())));
node.vote_processor.vote (vote2, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote2, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (5s, node.active.find_inactive_votes_cache (send1->hash ()).voters.size () == 2);
ASSERT_EQ (1, node.active.inactive_votes_cache_size ());
node.scheduler.activate (nano::dev::genesis_key.pub, node.store.tx_begin_read ());
@ -471,18 +472,18 @@ TEST (active_transactions, inactive_votes_cache_election_start)
// Inactive votes
std::vector<nano::block_hash> hashes{ open1->hash (), open2->hash (), send4->hash () };
auto vote1 (std::make_shared<nano::vote> (key1.pub, key1.prv, 0, 0, hashes));
node.vote_processor.vote (vote1, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote1, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 3);
ASSERT_TRUE (node.active.empty ());
ASSERT_EQ (1, node.ledger.cache.cemented_count);
// 2 votes are required to start election (dev network)
auto vote2 (std::make_shared<nano::vote> (key2.pub, key2.prv, 0, 0, hashes));
node.vote_processor.vote (vote2, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote2, std::make_shared<nano::transport::inproc::channel> (node, node));
// Only open1 & open2 blocks elections should start (send4 is missing previous block in ledger)
ASSERT_TIMELY (5s, 2 == node.active.size ());
// Confirm elections with weight quorum
auto vote0 (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, hashes)); // Final vote for confirmation
node.vote_processor.vote (vote0, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote0, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (5s, node.active.empty ());
ASSERT_TIMELY (5s, 5 == node.ledger.cache.cemented_count);
// A late block arrival also checks the inactive votes cache
@ -713,7 +714,7 @@ TEST (active_transactions, republish_winner)
auto election = node1.active.election (fork->qualified_root ());
ASSERT_NE (nullptr, election);
auto vote = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector<nano::block_hash>{ fork->hash () });
node1.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node1));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
node1.vote_processor.flush ();
node1.block_processor.flush ();
ASSERT_TIMELY (3s, election->confirmed ());
@ -835,7 +836,7 @@ TEST (active_transactions, fork_replacement_tally)
node1.process_active (open);
// Confirmation
auto vote (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector<nano::block_hash>{ send->hash (), open->hash () }));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node1));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
}
node1.block_processor.flush ();
ASSERT_TIMELY (5s, node1.ledger.cache.cemented_count == 1 + 2 * reps_count);
@ -885,7 +886,7 @@ TEST (active_transactions, fork_replacement_tally)
.work (*system.work.generate (latest))
.build_shared ();
auto vote (std::make_shared<nano::vote> (keys[i].pub, keys[i].prv, 0, 0, std::vector<nano::block_hash>{ fork->hash () }));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node1));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
node1.vote_processor.flush ();
node1.process_active (fork);
}
@ -915,7 +916,7 @@ TEST (active_transactions, fork_replacement_tally)
// Process vote for correct block & replace existing lowest tally block
auto vote (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector<nano::block_hash>{ send_last->hash () }));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node1));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
node1.vote_processor.flush ();
node2.network.flood_block (send_last);
ASSERT_TIMELY (3s, node1.stats.count (nano::stat::type::message, nano::stat::detail::publish, nano::stat::dir::in) > 1);

View file

@ -2,6 +2,7 @@
#include <nano/lib/threading.hpp>
#include <nano/node/election.hpp>
#include <nano/node/rocksdb/rocksdb.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
@ -647,10 +648,10 @@ TEST (votes, check_signature)
ASSERT_EQ (1, election1->votes ().size ());
auto vote1 (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * 1, 0, send1));
vote1->signature.bytes[0] ^= 1;
ASSERT_EQ (nano::vote_code::invalid, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::channel_loopback> (node1)));
ASSERT_EQ (nano::vote_code::invalid, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::inproc::channel> (node1, node1)));
vote1->signature.bytes[0] ^= 1;
ASSERT_EQ (nano::vote_code::vote, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::channel_loopback> (node1)));
ASSERT_EQ (nano::vote_code::replay, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::channel_loopback> (node1)));
ASSERT_EQ (nano::vote_code::vote, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::inproc::channel> (node1, node1)));
ASSERT_EQ (nano::vote_code::replay, node1.vote_processor.vote_blocking (vote1, std::make_shared<nano::transport::inproc::channel> (node1, node1)));
}
TEST (votes, add_one)
@ -785,7 +786,7 @@ TEST (votes, add_old)
node1.scheduler.flush ();
auto election1 = node1.active.election (send1->qualified_root ());
auto vote1 (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * 2, 0, send1));
auto channel (std::make_shared<nano::transport::channel_loopback> (node1));
auto channel (std::make_shared<nano::transport::inproc::channel> (node1, node1));
node1.vote_processor.vote_blocking (vote1, channel);
nano::keypair key2;
auto send2 (std::make_shared<nano::send_block> (nano::dev::genesis->hash (), key2.pub, 0, nano::dev::genesis_key.prv, nano::dev::genesis_key.pub, 0));
@ -827,7 +828,7 @@ TEST (votes, DISABLED_add_old_different_account)
ASSERT_EQ (1, election1->votes ().size ());
ASSERT_EQ (1, election2->votes ().size ());
auto vote1 (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * 2, 0, send1));
auto channel (std::make_shared<nano::transport::channel_loopback> (node1));
auto channel (std::make_shared<nano::transport::inproc::channel> (node1, node1));
auto vote_result1 (node1.vote_processor.vote_blocking (vote1, channel));
ASSERT_EQ (nano::vote_code::vote, vote_result1);
ASSERT_EQ (2, election1->votes ().size ());
@ -861,7 +862,7 @@ TEST (votes, add_cooldown)
node1.scheduler.flush ();
auto election1 = node1.active.election (send1->qualified_root ());
auto vote1 (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * 1, 0, send1));
auto channel (std::make_shared<nano::transport::channel_loopback> (node1));
auto channel (std::make_shared<nano::transport::inproc::channel> (node1, node1));
node1.vote_processor.vote_blocking (vote1, channel);
nano::keypair key2;
auto send2 (std::make_shared<nano::send_block> (nano::dev::genesis->hash (), key2.pub, 0, nano::dev::genesis_key.prv, nano::dev::genesis_key.pub, 0));

View file

@ -1,4 +1,5 @@
#include <nano/node/nodeconfig.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/test_common/network.hpp>
#include <nano/test_common/system.hpp>
@ -1256,14 +1257,14 @@ TEST (network, loopback_channel)
nano::system system (2);
auto & node1 = *system.nodes[0];
auto & node2 = *system.nodes[1];
nano::transport::channel_loopback channel1 (node1);
nano::transport::inproc::channel channel1 (node1, node1);
ASSERT_EQ (channel1.get_type (), nano::transport::transport_type::loopback);
ASSERT_EQ (channel1.get_endpoint (), node1.network.endpoint ());
ASSERT_EQ (channel1.get_tcp_endpoint (), nano::transport::map_endpoint_to_tcp (node1.network.endpoint ()));
ASSERT_EQ (channel1.get_network_version (), node1.network_params.network.protocol_version);
ASSERT_EQ (channel1.get_node_id (), node1.node_id.pub);
ASSERT_EQ (channel1.get_node_id_optional ().value_or (0), node1.node_id.pub);
nano::transport::channel_loopback channel2 (node2);
nano::transport::inproc::channel channel2 (node2, node2);
ASSERT_TRUE (channel1 == channel1);
ASSERT_FALSE (channel1 == channel2);
++node1.network.port;
@ -1277,10 +1278,10 @@ TEST (network, filter)
auto & node1 = *system.nodes[0];
nano::keepalive keepalive{ nano::dev::network_params.network };
const_cast<nano::networks &> (keepalive.header.network) = nano::networks::nano_dev_network;
node1.network.inbound (keepalive, std::make_shared<nano::transport::channel_loopback> (node1));
node1.network.inbound (keepalive, std::make_shared<nano::transport::inproc::channel> (node1, node1));
ASSERT_EQ (0, node1.stats.count (nano::stat::type::message, nano::stat::detail::invalid_network));
const_cast<nano::networks &> (keepalive.header.network) = nano::networks::invalid;
node1.network.inbound (keepalive, std::make_shared<nano::transport::channel_loopback> (node1));
node1.network.inbound (keepalive, std::make_shared<nano::transport::inproc::channel> (node1, node1));
ASSERT_EQ (1, node1.stats.count (nano::stat::type::message, nano::stat::detail::invalid_network));
}

View file

@ -1,4 +1,5 @@
#include <nano/node/election.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/test_common/network.hpp>
#include <nano/test_common/system.hpp>
@ -2125,14 +2126,14 @@ TEST (node, online_reps_rep_crawler)
auto vote = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::milliseconds_since_epoch (), 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () });
ASSERT_EQ (0, node1.online_reps.online ());
// Without rep crawler
node1.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::channel_loopback> (node1));
node1.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
ASSERT_EQ (0, node1.online_reps.online ());
// After inserting to rep crawler
{
nano::lock_guard<nano::mutex> guard (node1.rep_crawler.probable_reps_mutex);
node1.rep_crawler.active.insert (nano::dev::genesis->hash ());
}
node1.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::channel_loopback> (node1));
node1.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
ASSERT_EQ (nano::dev::constants.genesis_amount, node1.online_reps.online ());
}
}
@ -2162,7 +2163,7 @@ TEST (node, online_reps_election)
// Process vote for ongoing election
auto vote = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::milliseconds_since_epoch (), 0, std::vector<nano::block_hash>{ send1->hash () });
ASSERT_EQ (0, node1.online_reps.online ());
node1.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::channel_loopback> (node1));
node1.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
ASSERT_EQ (nano::dev::constants.genesis_amount - nano::Gxrb_ratio, node1.online_reps.online ());
}
@ -2517,7 +2518,7 @@ TEST (node, local_votes_cache_fork)
ASSERT_EQ (nano::process_result::progress, node1.process (*send1).code);
// Cache vote
auto vote (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector<nano::block_hash> (1, send1->hash ())));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node1));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
node1.history.add (send1->root (), send1->hash (), vote);
auto votes2 (node1.history.votes (send1->root (), send1->hash ()));
ASSERT_EQ (1, votes2.size ());
@ -2559,7 +2560,7 @@ TEST (node, vote_republish)
auto vote (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, send2));
ASSERT_TRUE (node1.active.active (*send1));
ASSERT_TIMELY (10s, node2.active.active (*send1));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node1));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
ASSERT_TIMELY (10s, node1.block (send2->hash ()));
ASSERT_TIMELY (10s, node2.block (send2->hash ()));
ASSERT_FALSE (node1.block (send1->hash ()));
@ -2654,7 +2655,7 @@ TEST (node, vote_by_hash_republish)
auto vote = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, vote_blocks); // Final vote for confirmation
ASSERT_TRUE (node1.active.active (*send1));
ASSERT_TRUE (node2.active.active (*send1));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node1));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
ASSERT_TIMELY (10s, node1.block (send2->hash ()));
ASSERT_TIMELY (10s, node2.block (send2->hash ()));
ASSERT_FALSE (node1.block (send1->hash ()));
@ -2697,7 +2698,7 @@ TEST (node, DISABLED_vote_by_hash_epoch_block_republish)
auto vote (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, vote_blocks));
ASSERT_TRUE (node1.active.active (*send1));
ASSERT_TRUE (node2.active.active (*send1));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node1));
node1.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node1, node1));
ASSERT_TIMELY (10s, node1.block (epoch1->hash ()));
ASSERT_TIMELY (10s, node2.block (epoch1->hash ()));
ASSERT_FALSE (node1.block (send1->hash ()));
@ -3176,7 +3177,7 @@ TEST (node, confirm_back)
std::vector<nano::block_hash> vote_blocks;
vote_blocks.push_back (send2->hash ());
auto vote (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, vote_blocks));
node.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote_blocking (vote, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (10s, node.active.empty ());
}
@ -3711,7 +3712,7 @@ TEST (node, rollback_gap_source)
ASSERT_EQ (1, election->votes ().size ());
// Confirm open
auto vote1 (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector<nano::block_hash> (1, open->hash ())));
node.vote_processor.vote (vote1, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote1, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (5s, election->votes ().size () == 2);
ASSERT_TIMELY (3s, election->confirmed ());
}
@ -3738,7 +3739,7 @@ TEST (node, rollback_gap_source)
ASSERT_EQ (2, election->blocks ().size ());
// Confirm open (again)
auto vote1 (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_max, nano::vote::duration_max, std::vector<nano::block_hash> (1, open->hash ())));
node.vote_processor.vote (vote1, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote1, std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TIMELY (5s, election->votes ().size () == 2);
}
// Wait for new rollback
@ -4280,7 +4281,7 @@ TEST (rep_crawler, local)
nano::node_flags flags;
flags.disable_rep_crawler = true;
auto & node = *system.add_node (flags);
auto loopback = std::make_shared<nano::transport::channel_loopback> (node);
auto loopback = std::make_shared<nano::transport::inproc::channel> (node, node);
auto vote = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, 0, 0, std::vector{ nano::dev::genesis->hash () });
{
nano::lock_guard<nano::mutex> guard (node.rep_crawler.probable_reps_mutex);

View file

@ -1,3 +1,4 @@
#include <nano/node/transport/inproc.hpp>
#include <nano/test_common/network.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
@ -183,3 +184,19 @@ TEST (system, rep_initialize_many)
ASSERT_EQ ((nano::dev::constants.genesis_amount - nano::Gxrb_ratio) / 2, node1->balance (key0.pub));
ASSERT_EQ ((nano::dev::constants.genesis_amount - nano::Gxrb_ratio) / 2, node1->balance (key1.pub));
}
TEST (system, transport_basic)
{
nano::system system{ 1 };
auto & node0 = *system.nodes[0];
// Start nodes in separate systems so they don't automatically connect with each other.
nano::system system1{ 1 };
auto & node1 = *system1.nodes[0];
ASSERT_EQ (0, node1.stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in));
nano::transport::inproc::channel channel{ node0, node1 };
// Send a keepalive message since they are easy to construct
nano::keepalive junk{ nano::dev::network_params.network };
channel.send (junk);
// Ensure the keepalive has been reecived on the target.
ASSERT_TIMELY (5s, node1.stats.count (nano::stat::type::message, nano::stat::detail::keepalive, nano::stat::dir::in) > 0);
}

View file

@ -1,4 +1,5 @@
#include <nano/lib/jsonconfig.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/vote_processor.hpp>
#include <nano/test_common/system.hpp>
#include <nano/test_common/testutil.hpp>
@ -15,7 +16,7 @@ TEST (vote_processor, codes)
auto vote (std::make_shared<nano::vote> (key.pub, key.prv, nano::vote::timestamp_min * 1, 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () }));
auto vote_invalid = std::make_shared<nano::vote> (*vote);
vote_invalid->signature.bytes[0] ^= 1;
auto channel (std::make_shared<nano::transport::channel_loopback> (node));
auto channel (std::make_shared<nano::transport::inproc::channel> (node, node));
// Invalid signature
ASSERT_EQ (nano::vote_code::invalid, node.vote_processor.vote_blocking (vote_invalid, channel, false));
@ -46,7 +47,7 @@ TEST (vote_processor, flush)
{
nano::system system (1);
auto & node (*system.nodes[0]);
auto channel (std::make_shared<nano::transport::channel_loopback> (node));
auto channel (std::make_shared<nano::transport::inproc::channel> (node, node));
for (unsigned i = 0; i < 2000; ++i)
{
auto vote = std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, nano::vote::timestamp_min * (1 + i), 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () });
@ -64,7 +65,7 @@ TEST (vote_processor, invalid_signature)
auto vote = std::make_shared<nano::vote> (key.pub, key.prv, nano::vote::timestamp_min * 1, 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () });
auto vote_invalid = std::make_shared<nano::vote> (*vote);
vote_invalid->signature.bytes[0] ^= 1;
auto channel = std::make_shared<nano::transport::channel_loopback> (node);
auto channel = std::make_shared<nano::transport::inproc::channel> (node, node);
node.block_confirm (nano::dev::genesis);
auto election = node.active.election (nano::dev::genesis->qualified_root ());
@ -86,7 +87,7 @@ TEST (vote_processor, no_capacity)
auto & node (*system.add_node (node_flags));
nano::keypair key;
auto vote (std::make_shared<nano::vote> (key.pub, key.prv, nano::vote::timestamp_min * 1, 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () }));
auto channel (std::make_shared<nano::transport::channel_loopback> (node));
auto channel (std::make_shared<nano::transport::inproc::channel> (node, node));
ASSERT_TRUE (node.vote_processor.vote (vote, channel));
}
@ -98,7 +99,7 @@ TEST (vote_processor, overflow)
auto & node (*system.add_node (node_flags));
nano::keypair key;
auto vote (std::make_shared<nano::vote> (key.pub, key.prv, nano::vote::timestamp_min * 1, 0, std::vector<nano::block_hash>{ nano::dev::genesis->hash () }));
auto channel (std::make_shared<nano::transport::channel_loopback> (node));
auto channel (std::make_shared<nano::transport::inproc::channel> (node, node));
// No way to lock the processor, but queueing votes in quick succession must result in overflow
size_t not_processed{ 0 };

View file

@ -43,3 +43,14 @@ std::size_t nano::shared_const_buffer::size () const
{
return m_buffer.size ();
}
std::vector<uint8_t> nano::shared_const_buffer::to_bytes () const
{
std::vector<uint8_t> bytes;
for (auto const & buffer : *this)
{
bytes.resize (bytes.size () + buffer.size ());
std::copy ((uint8_t const *)buffer.data (), (uint8_t const *)buffer.data () + buffer.size (), bytes.data () + bytes.size () - buffer.size ());
}
return bytes;
}

View file

@ -20,6 +20,7 @@ public:
boost::asio::const_buffer const * end () const;
std::size_t size () const;
std::vector<uint8_t> to_bytes () const;
private:
std::shared_ptr<std::vector<uint8_t>> m_data;

View file

@ -7,6 +7,7 @@
#include <nano/node/ipc/ipc_server.hpp>
#include <nano/node/json_handler.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/inproc.hpp>
#include <boost/dll/runtime_symbol_info.hpp>
#include <boost/filesystem/operations.hpp>
@ -1108,7 +1109,7 @@ int main (int argc, char * const * argv)
while (!votes.empty ())
{
auto vote (votes.front ());
auto channel (std::make_shared<nano::transport::channel_loopback> (*node));
auto channel (std::make_shared<nano::transport::inproc::channel> (*node, *node));
node->vote_processor.vote (vote, channel);
votes.pop_front ();
}

View file

@ -178,6 +178,8 @@ add_library(
state_block_signature_verification.cpp
telemetry.hpp
telemetry.cpp
transport/inproc.hpp
transport/inproc.cpp
transport/tcp.hpp
transport/tcp.cpp
transport/transport.hpp

View file

@ -0,0 +1,120 @@
#include <nano/node/node.hpp>
#include <nano/node/transport/inproc.hpp>
#include <boost/format.hpp>
nano::transport::inproc::channel::channel (nano::node & node, nano::node & destination) :
transport::channel{ node },
destination{ destination },
endpoint{ node.network.endpoint () }
{
set_node_id (node.node_id.pub);
set_network_version (node.network_params.network.protocol_version);
}
std::size_t nano::transport::inproc::channel::hash_code () const
{
std::hash<::nano::endpoint> hash;
return hash (endpoint);
}
bool nano::transport::inproc::channel::operator== (nano::transport::channel const & other_a) const
{
return endpoint == other_a.get_endpoint ();
}
/**
* This function is called for every message received by the inproc channel.
* Note that it is called from inside the context of nano::transport::inproc::channel::send_buffer
*/
class message_visitor_inbound : public nano::message_visitor
{
public:
message_visitor_inbound (decltype (nano::network::inbound) & inbound, std::shared_ptr<nano::transport::inproc::channel> channel) :
inbound{ inbound },
channel{ channel }
{
}
decltype (nano::network::inbound) & inbound;
// the channel to reply to, if a reply is generated
std::shared_ptr<nano::transport::inproc::channel> channel;
void keepalive (nano::keepalive const & message)
{
inbound (message, channel);
}
void publish (nano::publish const & message)
{
inbound (message, channel);
}
void confirm_req (nano::confirm_req const & message)
{
inbound (message, channel);
}
void confirm_ack (nano::confirm_ack const & message)
{
inbound (message, channel);
}
void bulk_pull (nano::bulk_pull const & message)
{
inbound (message, channel);
}
void bulk_pull_account (nano::bulk_pull_account const & message)
{
inbound (message, channel);
}
void bulk_push (nano::bulk_push const & message)
{
inbound (message, channel);
}
void frontier_req (nano::frontier_req const & message)
{
inbound (message, channel);
}
void node_id_handshake (nano::node_id_handshake const & message)
{
inbound (message, channel);
}
void telemetry_req (nano::telemetry_req const & message)
{
inbound (message, channel);
}
void telemetry_ack (nano::telemetry_ack const & message)
{
inbound (message, channel);
}
};
/**
* Send the buffer to the peer and call the callback function when done. The call never fails.
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
*/
void nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a)
{
// we create a temporary channel for the reply path, in case the receiver of the message wants to reply
auto remote_channel = std::make_shared<nano::transport::inproc::channel> (destination, node);
// create an inbound message visitor class to handle incoming messages because that's what the message parser expects
message_visitor_inbound visitor{ destination.network.inbound, remote_channel };
nano::message_parser parser{ destination.network.publish_filter, destination.block_uniquer, destination.vote_uniquer, visitor, destination.work, destination.network_params.network };
// parse the message and action any work that needs to be done on that object via the visitor object
auto bytes = buffer_a.to_bytes ();
auto size = bytes.size ();
parser.deserialize_buffer (bytes.data (), size);
if (callback_a)
{
node.background ([callback_a, size] () {
callback_a (boost::system::errc::make_error_code (boost::system::errc::success), size);
});
}
}
std::string nano::transport::inproc::channel::to_string () const
{
return boost::str (boost::format ("%1%") % endpoint);
}

View file

@ -0,0 +1,50 @@
#pragma once
#include <nano/node/transport/transport.hpp>
namespace nano
{
namespace transport
{
/**
* In-process transport channel. Mostly useful for unit tests
**/
namespace inproc
{
class channel final : public nano::transport::channel
{
public:
explicit channel (nano::node & node, nano::node & destination);
std::size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
//
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override;
std::string to_string () const override;
bool operator== (nano::transport::inproc::channel const & other_a) const
{
return endpoint == other_a.get_endpoint ();
}
nano::endpoint get_endpoint () const override
{
return endpoint;
}
nano::tcp_endpoint get_tcp_endpoint () const override
{
return nano::transport::map_endpoint_to_tcp (endpoint);
}
nano::transport::transport_type get_type () const override
{
return nano::transport::transport_type::loopback;
}
private:
nano::node & destination;
nano::endpoint const endpoint;
};
} // namespace inproc
} // namespace transport
} // namespace nano

View file

@ -1,5 +1,6 @@
#include <nano/node/common.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/transport.hpp>
#include <boost/asio/ip/address.hpp>
@ -133,34 +134,6 @@ void nano::transport::channel::send (nano::message & message_a, std::function<vo
}
}
nano::transport::channel_loopback::channel_loopback (nano::node & node_a) :
channel (node_a), endpoint (node_a.network.endpoint ())
{
set_node_id (node_a.node_id.pub);
set_network_version (node_a.network_params.network.protocol_version);
}
std::size_t nano::transport::channel_loopback::hash_code () const
{
std::hash<::nano::endpoint> hash;
return hash (endpoint);
}
bool nano::transport::channel_loopback::operator== (nano::transport::channel const & other_a) const
{
return endpoint == other_a.get_endpoint ();
}
void nano::transport::channel_loopback::send_buffer (nano::shared_const_buffer const & buffer_a, std::function<void (boost::system::error_code const &, std::size_t)> const & callback_a, nano::buffer_drop_policy drop_policy_a)
{
release_assert (false && "sending to a loopback channel is not supported");
}
std::string nano::transport::channel_loopback::to_string () const
{
return boost::str (boost::format ("%1%") % endpoint);
}
boost::asio::ip::address_v6 nano::transport::mapped_from_v4_bytes (unsigned long address_a)
{
return boost::asio::ip::address_v6::v4_mapped (boost::asio::ip::address_v4 (address_a));

View file

@ -146,40 +146,6 @@ namespace transport
protected:
nano::node & node;
};
class channel_loopback final : public nano::transport::channel
{
public:
explicit channel_loopback (nano::node &);
std::size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;
// TODO: investigate clang-tidy warning about default parameters on virtual/override functions
//
void send_buffer (nano::shared_const_buffer const &, std::function<void (boost::system::error_code const &, std::size_t)> const & = nullptr, nano::buffer_drop_policy = nano::buffer_drop_policy::limiter) override;
std::string to_string () const override;
bool operator== (nano::transport::channel_loopback const & other_a) const
{
return endpoint == other_a.get_endpoint ();
}
nano::endpoint get_endpoint () const override
{
return endpoint;
}
nano::tcp_endpoint get_tcp_endpoint () const override
{
return nano::transport::map_endpoint_to_tcp (endpoint);
}
nano::transport::transport_type get_type () const override
{
return nano::transport::transport_type::loopback;
}
private:
nano::endpoint const endpoint;
};
} // namespace transport
} // namespace nano

View file

@ -3,6 +3,7 @@
#include <nano/lib/utility.hpp>
#include <nano/node/network.hpp>
#include <nano/node/nodeconfig.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/vote_processor.hpp>
#include <nano/node/voting.hpp>
#include <nano/node/wallet.hpp>
@ -380,7 +381,7 @@ void nano::vote_generator::broadcast_action (std::shared_ptr<nano::vote> const &
{
network.flood_vote_pr (vote_a);
network.flood_vote (vote_a, 2.0f);
vote_processor.vote (vote_a, std::make_shared<nano::transport::channel_loopback> (network.node));
vote_processor.vote (vote_a, std::make_shared<nano::transport::inproc::channel> (network.node, network.node));
}
void nano::vote_generator::run ()

View file

@ -1,6 +1,7 @@
#include <nano/crypto_lib/random_pool.hpp>
#include <nano/lib/threading.hpp>
#include <nano/node/election.hpp>
#include <nano/node/transport/inproc.hpp>
#include <nano/node/transport/udp.hpp>
#include <nano/node/unchecked_map.hpp>
#include <nano/test_common/network.hpp>
@ -467,7 +468,7 @@ TEST (store, vote_load)
for (auto i (0); i < 1000000; ++i)
{
auto vote (std::make_shared<nano::vote> (nano::dev::genesis_key.pub, nano::dev::genesis_key.prv, i, 0, block));
node.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node));
node.vote_processor.vote (vote, std::make_shared<nano::transport::inproc::channel> (node, node));
}
}