Message sink interface (#3364)
This change is an application of dependency injection to remove coupling between the tcp_channels and udp_channels classes and the network class. An interface called message_sink has been created and an implementation that forwards network message matching existing behavior has been put it its place.
This commit is contained in:
parent
def99da543
commit
f594d0ba68
12 changed files with 72 additions and 60 deletions
|
@ -256,11 +256,11 @@ TEST (active_transactions, inactive_votes_cache_fork)
|
|||
node.vote_processor.vote (vote, std::make_shared<nano::transport::channel_loopback> (node));
|
||||
auto channel1 (node.network.udp_channels.create (node.network.endpoint ()));
|
||||
ASSERT_TIMELY (5s, node.active.inactive_votes_cache_size () == 1);
|
||||
node.network.process_message (nano::publish (send2), channel1);
|
||||
node.network.inbound (nano::publish (send2), channel1);
|
||||
node.block_processor.flush ();
|
||||
ASSERT_NE (nullptr, node.block (send2->hash ()));
|
||||
node.scheduler.flush (); // Start election, otherwise conflicting block won't be inserted into election
|
||||
node.network.process_message (nano::publish (send1), channel1);
|
||||
node.network.inbound (nano::publish (send1), channel1);
|
||||
node.block_processor.flush ();
|
||||
bool confirmed (false);
|
||||
system.deadline_set (5s);
|
||||
|
|
|
@ -662,19 +662,19 @@ TEST (confirmation_height, conflict_rollback_cemented)
|
|||
auto send2 (std::make_shared<nano::send_block> (genesis.hash (), key2.pub, nano::genesis_amount - 100, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (genesis.hash ())));
|
||||
nano::publish publish2 (send2);
|
||||
auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ()));
|
||||
node1->network.process_message (publish1, channel1);
|
||||
node1->network.inbound (publish1, channel1);
|
||||
node1->block_processor.flush ();
|
||||
node1->scheduler.flush ();
|
||||
auto channel2 (node2->network.udp_channels.create (node1->network.endpoint ()));
|
||||
node2->network.process_message (publish2, channel2);
|
||||
node2->network.inbound (publish2, channel2);
|
||||
node2->block_processor.flush ();
|
||||
node2->scheduler.flush ();
|
||||
ASSERT_EQ (1, node1->active.size ());
|
||||
ASSERT_EQ (1, node2->active.size ());
|
||||
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
|
||||
node1->network.process_message (publish2, channel1);
|
||||
node1->network.inbound (publish2, channel1);
|
||||
node1->block_processor.flush ();
|
||||
node2->network.process_message (publish1, channel2);
|
||||
node2->network.inbound (publish1, channel2);
|
||||
node2->block_processor.flush ();
|
||||
auto election (node2->active.election (nano::qualified_root (genesis.hash (), genesis.hash ())));
|
||||
ASSERT_NE (nullptr, election);
|
||||
|
|
|
@ -361,7 +361,7 @@ TEST (receivable_processor, confirm_insufficient_pos)
|
|||
nano::keypair key1;
|
||||
auto vote (std::make_shared<nano::vote> (key1.pub, key1.prv, 0, block1));
|
||||
nano::confirm_ack con1 (vote);
|
||||
node1.network.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node1.network.inbound (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
}
|
||||
|
||||
TEST (receivable_processor, confirm_sufficient_pos)
|
||||
|
@ -375,7 +375,7 @@ TEST (receivable_processor, confirm_sufficient_pos)
|
|||
node1.scheduler.activate (nano::dev_genesis_key.pub, node1.store.tx_begin_read ());
|
||||
auto vote (std::make_shared<nano::vote> (nano::dev_genesis_key.pub, nano::dev_genesis_key.prv, 0, block1));
|
||||
nano::confirm_ack con1 (vote);
|
||||
node1.network.process_message (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node1.network.inbound (con1, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
}
|
||||
|
||||
TEST (receivable_processor, send_with_receive)
|
||||
|
@ -989,10 +989,10 @@ TEST (network, duplicate_revert_publish)
|
|||
auto channel = nano::establish_tcp (system, *other_node, node.network.endpoint ());
|
||||
ASSERT_NE (nullptr, channel);
|
||||
ASSERT_EQ (0, publish.digest);
|
||||
node.network.process_message (publish, channel);
|
||||
node.network.inbound (publish, channel);
|
||||
ASSERT_TRUE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
|
||||
publish.digest = digest;
|
||||
node.network.process_message (publish, channel);
|
||||
node.network.inbound (publish, channel);
|
||||
ASSERT_FALSE (node.network.publish_filter.apply (bytes.data (), bytes.size ()));
|
||||
}
|
||||
|
||||
|
|
|
@ -363,7 +363,7 @@ TEST (node, receive_gap)
|
|||
.build_shared ();
|
||||
node1.work_generate_blocking (*block);
|
||||
nano::publish message (block);
|
||||
node1.network.process_message (message, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node1.network.inbound (message, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node1.block_processor.flush ();
|
||||
ASSERT_EQ (1, node1.gap_cache.size ());
|
||||
}
|
||||
|
@ -1210,19 +1210,19 @@ TEST (node, fork_flip)
|
|||
.build_shared ();
|
||||
nano::publish publish2 (send2);
|
||||
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node1.network.process_message (publish1, channel1);
|
||||
node1.network.inbound (publish1, channel1);
|
||||
node1.block_processor.flush ();
|
||||
node1.scheduler.flush ();
|
||||
auto channel2 (node2.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node2.network.process_message (publish2, channel2);
|
||||
node2.network.inbound (publish2, channel2);
|
||||
node2.block_processor.flush ();
|
||||
node2.scheduler.flush ();
|
||||
ASSERT_EQ (1, node1.active.size ());
|
||||
ASSERT_EQ (1, node2.active.size ());
|
||||
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
|
||||
node1.network.process_message (publish2, channel1);
|
||||
node1.network.inbound (publish2, channel1);
|
||||
node1.block_processor.flush ();
|
||||
node2.network.process_message (publish1, channel2);
|
||||
node2.network.inbound (publish1, channel2);
|
||||
node2.block_processor.flush ();
|
||||
auto election1 (node2.active.election (nano::qualified_root (genesis.hash (), genesis.hash ())));
|
||||
ASSERT_NE (nullptr, election1);
|
||||
|
@ -1285,9 +1285,9 @@ TEST (node, fork_multi_flip)
|
|||
.work (*system.work.generate (publish2.block->hash ()))
|
||||
.build_shared ();
|
||||
nano::publish publish3 (send3);
|
||||
node1.network.process_message (publish1, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node2.network.process_message (publish2, node2.network.udp_channels.create (node2.network.endpoint ()));
|
||||
node2.network.process_message (publish3, node2.network.udp_channels.create (node2.network.endpoint ()));
|
||||
node1.network.inbound (publish1, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node2.network.inbound (publish2, node2.network.udp_channels.create (node2.network.endpoint ()));
|
||||
node2.network.inbound (publish3, node2.network.udp_channels.create (node2.network.endpoint ()));
|
||||
node1.block_processor.flush ();
|
||||
node1.scheduler.flush ();
|
||||
node2.block_processor.flush ();
|
||||
|
@ -1295,10 +1295,10 @@ TEST (node, fork_multi_flip)
|
|||
ASSERT_EQ (1, node1.active.size ());
|
||||
ASSERT_EQ (1, node2.active.size ());
|
||||
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
|
||||
node1.network.process_message (publish2, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node1.network.process_message (publish3, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node1.network.inbound (publish2, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node1.network.inbound (publish3, node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node1.block_processor.flush ();
|
||||
node2.network.process_message (publish1, node2.network.udp_channels.create (node2.network.endpoint ()));
|
||||
node2.network.inbound (publish1, node2.network.udp_channels.create (node2.network.endpoint ()));
|
||||
node2.block_processor.flush ();
|
||||
auto election1 (node2.active.election (nano::qualified_root (genesis.hash (), genesis.hash ())));
|
||||
ASSERT_NE (nullptr, election1);
|
||||
|
@ -1380,7 +1380,7 @@ TEST (node, fork_open)
|
|||
.build_shared ();
|
||||
nano::publish publish1 (send1);
|
||||
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node1.network.process_message (publish1, channel1);
|
||||
node1.network.inbound (publish1, channel1);
|
||||
node1.block_processor.flush ();
|
||||
node1.scheduler.flush ();
|
||||
auto election = node1.active.election (publish1.block->qualified_root ());
|
||||
|
@ -1396,7 +1396,7 @@ TEST (node, fork_open)
|
|||
.work (*system.work.generate (key1.pub))
|
||||
.build_shared ();
|
||||
nano::publish publish2 (open1);
|
||||
node1.network.process_message (publish2, channel1);
|
||||
node1.network.inbound (publish2, channel1);
|
||||
node1.block_processor.flush ();
|
||||
node1.scheduler.flush ();
|
||||
ASSERT_EQ (1, node1.active.size ());
|
||||
|
@ -1409,7 +1409,7 @@ TEST (node, fork_open)
|
|||
.build_shared ();
|
||||
nano::publish publish3 (open2);
|
||||
system.wallet (0)->insert_adhoc (nano::dev_genesis_key.prv);
|
||||
node1.network.process_message (publish3, channel1);
|
||||
node1.network.inbound (publish3, channel1);
|
||||
node1.block_processor.flush ();
|
||||
node1.scheduler.flush ();
|
||||
election = node1.active.election (publish3.block->qualified_root ());
|
||||
|
@ -2714,14 +2714,14 @@ TEST (node, local_votes_cache)
|
|||
nano::confirm_req message1 (send1);
|
||||
nano::confirm_req message2 (send2);
|
||||
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
|
||||
node.network.process_message (message1, channel);
|
||||
node.network.inbound (message1, channel);
|
||||
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 1);
|
||||
node.network.process_message (message2, channel);
|
||||
node.network.inbound (message2, channel);
|
||||
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::requests, nano::stat::detail::requests_generated_votes) == 2);
|
||||
for (auto i (0); i < 100; ++i)
|
||||
{
|
||||
node.network.process_message (message1, channel);
|
||||
node.network.process_message (message2, channel);
|
||||
node.network.inbound (message1, channel);
|
||||
node.network.inbound (message2, channel);
|
||||
}
|
||||
for (int i = 0; i < 4; ++i)
|
||||
{
|
||||
|
@ -2737,7 +2737,7 @@ TEST (node, local_votes_cache)
|
|||
nano::confirm_req message3 (send3);
|
||||
for (auto i (0); i < 100; ++i)
|
||||
{
|
||||
node.network.process_message (message3, channel);
|
||||
node.network.inbound (message3, channel);
|
||||
}
|
||||
for (int i = 0; i < 4; ++i)
|
||||
{
|
||||
|
@ -2795,26 +2795,26 @@ TEST (node, local_votes_cache_batch)
|
|||
nano::confirm_req message (batch);
|
||||
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
|
||||
// Generates and sends one vote for both hashes which is then cached
|
||||
node.network.process_message (message, channel);
|
||||
node.network.inbound (message, channel);
|
||||
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 1);
|
||||
ASSERT_EQ (1, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
|
||||
ASSERT_FALSE (node.history.votes (send2->root (), send2->hash ()).empty ());
|
||||
ASSERT_FALSE (node.history.votes (receive1->root (), receive1->hash ()).empty ());
|
||||
// Only one confirm_ack should be sent if all hashes are part of the same vote
|
||||
node.network.process_message (message, channel);
|
||||
node.network.inbound (message, channel);
|
||||
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 2);
|
||||
ASSERT_EQ (2, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
|
||||
// Test when votes are different
|
||||
node.history.erase (send2->root ());
|
||||
node.history.erase (receive1->root ());
|
||||
node.network.process_message (nano::confirm_req (send2->hash (), send2->root ()), channel);
|
||||
node.network.inbound (nano::confirm_req (send2->hash (), send2->root ()), channel);
|
||||
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 3);
|
||||
ASSERT_EQ (3, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
|
||||
node.network.process_message (nano::confirm_req (receive1->hash (), receive1->root ()), channel);
|
||||
node.network.inbound (nano::confirm_req (receive1->hash (), receive1->root ()), channel);
|
||||
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 4);
|
||||
ASSERT_EQ (4, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
|
||||
// There are two different votes, so both should be sent in response
|
||||
node.network.process_message (message, channel);
|
||||
node.network.inbound (message, channel);
|
||||
ASSERT_TIMELY (3s, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out) == 6);
|
||||
ASSERT_EQ (6, node.stats.count (nano::stat::type::message, nano::stat::detail::confirm_ack, nano::stat::dir::out));
|
||||
}
|
||||
|
@ -2830,7 +2830,7 @@ TEST (node, local_votes_cache_generate_new_vote)
|
|||
// Repsond with cached vote
|
||||
nano::confirm_req message1 (genesis.open);
|
||||
auto channel (node.network.udp_channels.create (node.network.endpoint ()));
|
||||
node.network.process_message (message1, channel);
|
||||
node.network.inbound (message1, channel);
|
||||
ASSERT_TIMELY (3s, !node.history.votes (genesis.open->root (), genesis.open->hash ()).empty ());
|
||||
auto votes1 (node.history.votes (genesis.open->root (), genesis.open->hash ()));
|
||||
ASSERT_EQ (1, votes1.size ());
|
||||
|
@ -2850,7 +2850,7 @@ TEST (node, local_votes_cache_generate_new_vote)
|
|||
// One of the hashes is cached
|
||||
std::vector<std::pair<nano::block_hash, nano::root>> roots_hashes{ std::make_pair (genesis.open->hash (), genesis.open->root ()), std::make_pair (send1->hash (), send1->root ()) };
|
||||
nano::confirm_req message2 (roots_hashes);
|
||||
node.network.process_message (message2, channel);
|
||||
node.network.inbound (message2, channel);
|
||||
ASSERT_TIMELY (3s, !node.history.votes (send1->root (), send1->hash ()).empty ());
|
||||
auto votes2 (node.history.votes (send1->root (), send1->hash ()));
|
||||
ASSERT_EQ (1, votes2.size ());
|
||||
|
@ -3259,13 +3259,13 @@ TEST (node, fork_election_invalid_block_signature)
|
|||
.sign (nano::dev_genesis_key.prv, 0) // Invalid signature
|
||||
.build_shared ();
|
||||
auto channel1 (node1.network.udp_channels.create (node1.network.endpoint ()));
|
||||
node1.network.process_message (nano::publish (send1), channel1);
|
||||
node1.network.inbound (nano::publish (send1), channel1);
|
||||
ASSERT_TIMELY (5s, node1.active.active (send1->qualified_root ()));
|
||||
auto election (node1.active.election (send1->qualified_root ()));
|
||||
ASSERT_NE (nullptr, election);
|
||||
ASSERT_EQ (1, election->blocks ().size ());
|
||||
node1.network.process_message (nano::publish (send3), channel1);
|
||||
node1.network.process_message (nano::publish (send2), channel1);
|
||||
node1.network.inbound (nano::publish (send3), channel1);
|
||||
node1.network.inbound (nano::publish (send2), channel1);
|
||||
ASSERT_TIMELY (3s, election->blocks ().size () > 1);
|
||||
ASSERT_EQ (election->blocks ()[send2->hash ()]->block_signature (), send2->block_signature ());
|
||||
}
|
||||
|
|
|
@ -285,7 +285,7 @@ TEST (telemetry, receive_from_non_listening_channel)
|
|||
nano::system system;
|
||||
auto node = system.add_node ();
|
||||
nano::telemetry_ack message (nano::telemetry_data{});
|
||||
node->network.process_message (message, node->network.udp_channels.create (node->network.endpoint ()));
|
||||
node->network.inbound (message, node->network.udp_channels.create (node->network.endpoint ()));
|
||||
// We have not sent a telemetry_req message to this endpoint, so shouldn't count telemetry_ack received from it.
|
||||
ASSERT_EQ (node->telemetry->telemetry_data_size (), 0);
|
||||
}
|
||||
|
@ -632,7 +632,7 @@ TEST (telemetry, remove_peer_invalid_signature)
|
|||
// Change anything so that the signed message is incorrect
|
||||
telemetry_data.block_count = 0;
|
||||
auto telemetry_ack = nano::telemetry_ack (telemetry_data);
|
||||
node->network.process_message (telemetry_ack, channel);
|
||||
node->network.inbound (telemetry_ack, channel);
|
||||
|
||||
ASSERT_TIMELY (10s, node->stats.count (nano::stat::type::telemetry, nano::stat::detail::invalid_signature) > 0);
|
||||
ASSERT_NO_ERROR (system.poll_until_true (3s, [&node, address = channel->get_endpoint ().address ()] () -> bool {
|
||||
|
|
|
@ -144,7 +144,7 @@ TEST (websocket, stopped_election)
|
|||
auto send1 (std::make_shared<nano::send_block> (genesis.hash (), key1.pub, 0, nano::dev_genesis_key.prv, nano::dev_genesis_key.pub, *system.work.generate (genesis.hash ())));
|
||||
nano::publish publish1 (send1);
|
||||
auto channel1 (node1->network.udp_channels.create (node1->network.endpoint ()));
|
||||
node1->network.process_message (publish1, channel1);
|
||||
node1->network.inbound (publish1, channel1);
|
||||
node1->block_processor.flush ();
|
||||
ASSERT_TIMELY (1s, node1->active.election (send1->qualified_root ()));
|
||||
node1->active.erase (*send1);
|
||||
|
|
|
@ -13,14 +13,15 @@
|
|||
|
||||
nano::network::network (nano::node & node_a, uint16_t port_a) :
|
||||
syn_cookies (node_a.network_params.node.max_peers_per_ip),
|
||||
inbound{ [this] (nano::message const & message, std::shared_ptr<nano::transport::channel> const & channel) { process_message (message, channel); } },
|
||||
buffer_container (node_a.stats, nano::network::buffer_size, 4096), // 2Mb receive buffer
|
||||
resolver (node_a.io_ctx),
|
||||
limiter (node_a.config.bandwidth_limit_burst_ratio, node_a.config.bandwidth_limit),
|
||||
tcp_message_manager (node_a.config.tcp_incoming_connections_max),
|
||||
node (node_a),
|
||||
publish_filter (256 * 1024),
|
||||
udp_channels (node_a, port_a),
|
||||
tcp_channels (node_a),
|
||||
udp_channels (node_a, port_a, inbound),
|
||||
tcp_channels (node_a, inbound),
|
||||
port (port_a),
|
||||
disconnect_observer ([] () {})
|
||||
{
|
||||
|
|
|
@ -154,7 +154,6 @@ public:
|
|||
void broadcast_confirm_req_many (std::deque<std::pair<std::shared_ptr<nano::block>, std::shared_ptr<std::vector<std::shared_ptr<nano::transport::channel>>>>>, std::function<void ()> = nullptr, unsigned = broadcast_interval_ms);
|
||||
std::shared_ptr<nano::transport::channel> find_node_id (nano::account const &);
|
||||
std::shared_ptr<nano::transport::channel> find_channel (nano::endpoint const &);
|
||||
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
|
||||
bool not_a_peer (nano::endpoint const &, bool);
|
||||
// Should we reach out to this endpoint with a keepalive message
|
||||
bool reachout (nano::endpoint const &, bool = false);
|
||||
|
@ -180,6 +179,12 @@ public:
|
|||
bool empty () const;
|
||||
void erase (nano::transport::channel const &);
|
||||
void set_bandwidth_params (double, size_t);
|
||||
|
||||
private:
|
||||
void process_message (nano::message const &, std::shared_ptr<nano::transport::channel> const &);
|
||||
|
||||
public:
|
||||
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> inbound;
|
||||
nano::message_buffer_manager buffer_container;
|
||||
boost::asio::ip::udp::resolver resolver;
|
||||
std::vector<boost::thread> packet_processing_threads;
|
||||
|
|
|
@ -110,8 +110,9 @@ void nano::transport::channel_tcp::set_endpoint ()
|
|||
}
|
||||
}
|
||||
|
||||
nano::transport::tcp_channels::tcp_channels (nano::node & node_a) :
|
||||
node (node_a)
|
||||
nano::transport::tcp_channels::tcp_channels (nano::node & node, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink) :
|
||||
node{ node },
|
||||
sink{ sink }
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -296,14 +297,14 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
|
|||
auto channel (node.network.find_channel (nano::transport::map_tcp_to_endpoint (endpoint_a)));
|
||||
if (channel)
|
||||
{
|
||||
node.network.process_message (message_a, channel);
|
||||
sink (message_a, channel);
|
||||
}
|
||||
else
|
||||
{
|
||||
channel = node.network.find_node_id (node_id_a);
|
||||
if (channel)
|
||||
{
|
||||
node.network.process_message (message_a, channel);
|
||||
sink (message_a, channel);
|
||||
}
|
||||
else if (!node.network.excluded_peers.check (endpoint_a))
|
||||
{
|
||||
|
@ -322,7 +323,7 @@ void nano::transport::tcp_channels::process_message (nano::message const & messa
|
|||
{
|
||||
insert (temporary_channel, socket_a, nullptr);
|
||||
}
|
||||
node.network.process_message (message_a, temporary_channel);
|
||||
sink (message_a, temporary_channel);
|
||||
}
|
||||
else
|
||||
{
|
||||
|
|
|
@ -76,7 +76,7 @@ namespace transport
|
|||
friend class telemetry_simultaneous_requests_Test;
|
||||
|
||||
public:
|
||||
tcp_channels (nano::node &);
|
||||
tcp_channels (nano::node &, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> = nullptr);
|
||||
bool insert (std::shared_ptr<nano::transport::channel_tcp> const &, std::shared_ptr<nano::socket> const &, std::shared_ptr<nano::bootstrap_server> const &);
|
||||
void erase (nano::tcp_endpoint const &);
|
||||
size_t size () const;
|
||||
|
@ -112,6 +112,7 @@ namespace transport
|
|||
nano::node & node;
|
||||
|
||||
private:
|
||||
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink;
|
||||
class endpoint_tag
|
||||
{
|
||||
};
|
||||
|
|
|
@ -61,9 +61,10 @@ std::string nano::transport::channel_udp::to_string () const
|
|||
return boost::str (boost::format ("%1%") % endpoint);
|
||||
}
|
||||
|
||||
nano::transport::udp_channels::udp_channels (nano::node & node_a, uint16_t port_a) :
|
||||
node (node_a),
|
||||
strand (node_a.io_ctx.get_executor ())
|
||||
nano::transport::udp_channels::udp_channels (nano::node & node_a, uint16_t port_a, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink) :
|
||||
node{ node_a },
|
||||
strand{ node_a.io_ctx.get_executor () },
|
||||
sink{ sink }
|
||||
{
|
||||
if (!node.flags.disable_udp)
|
||||
{
|
||||
|
@ -364,9 +365,10 @@ namespace
|
|||
class udp_message_visitor : public nano::message_visitor
|
||||
{
|
||||
public:
|
||||
udp_message_visitor (nano::node & node_a, nano::endpoint const & endpoint_a) :
|
||||
node (node_a),
|
||||
endpoint (endpoint_a)
|
||||
udp_message_visitor (nano::node & node_a, nano::endpoint const & endpoint_a, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink) :
|
||||
node{ node_a },
|
||||
endpoint{ endpoint_a },
|
||||
sink{ sink }
|
||||
{
|
||||
}
|
||||
void keepalive (nano::keepalive const & message_a) override
|
||||
|
@ -512,11 +514,12 @@ public:
|
|||
node.network.udp_channels.modify (find_channel, [] (std::shared_ptr<nano::transport::channel_udp> const & channel_a) {
|
||||
channel_a->set_last_packet_received (std::chrono::steady_clock::now ());
|
||||
});
|
||||
node.network.process_message (message_a, find_channel);
|
||||
sink (message_a, find_channel);
|
||||
}
|
||||
}
|
||||
nano::node & node;
|
||||
nano::endpoint endpoint;
|
||||
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink;
|
||||
};
|
||||
}
|
||||
|
||||
|
@ -537,7 +540,7 @@ void nano::transport::udp_channels::receive_action (nano::message_buffer * data_
|
|||
}
|
||||
if (allowed_sender)
|
||||
{
|
||||
udp_message_visitor visitor (node, data_a->endpoint);
|
||||
udp_message_visitor visitor (node, data_a->endpoint, sink);
|
||||
nano::message_parser parser (node.network.publish_filter, node.block_uniquer, node.vote_uniquer, visitor, node.work);
|
||||
parser.deserialize_buffer (data_a->buffer, data_a->size);
|
||||
if (parser.status == nano::message_parser::parse_status::success)
|
||||
|
|
|
@ -75,7 +75,7 @@ namespace transport
|
|||
friend class nano::transport::channel_udp;
|
||||
|
||||
public:
|
||||
udp_channels (nano::node &, uint16_t);
|
||||
udp_channels (nano::node &, uint16_t, std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink);
|
||||
std::shared_ptr<nano::transport::channel_udp> insert (nano::endpoint const &, unsigned);
|
||||
void erase (nano::endpoint const &);
|
||||
size_t size () const;
|
||||
|
@ -106,6 +106,7 @@ namespace transport
|
|||
void list (std::deque<std::shared_ptr<nano::transport::channel>> &, uint8_t = 0);
|
||||
void modify (std::shared_ptr<nano::transport::channel_udp> const &, std::function<void (std::shared_ptr<nano::transport::channel_udp> const &)>);
|
||||
nano::node & node;
|
||||
std::function<void (nano::message const &, std::shared_ptr<nano::transport::channel> const &)> sink;
|
||||
|
||||
private:
|
||||
void close_socket ();
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue