TCP socket rewrite with strand and queueing support (#1938)

* Strand and queuing support in tcp socket

* Check callback validity in async_write, do sync close in tests to avoid address-reuse issue

* Address review items; tests, checking stats

* Remove unrelated websocket changes

* Reintroduce atomic ticket system, but support concurrent writers. Close from destructor, check strand-execution where possible, and fix a test with too low deadline.

* Don't start io operations if closed

* Don't pass bool to join(), use unsigned instead of size_t and return error instead of success in counted_completion
This commit is contained in:
cryptocode 2019-05-14 12:54:40 +02:00 committed by clemahieu
commit 381fbcd769
30 changed files with 885 additions and 329 deletions

View file

@ -18,6 +18,7 @@ add_executable (core_test
processor_service.cpp
peer_container.cpp
signing.cpp
socket.cpp
timer.cpp
uint256_union.cpp
versioning.cpp

View file

@ -15,6 +15,7 @@ TEST (logging, serialization)
logging1.ledger_logging_value = !logging1.ledger_logging_value;
logging1.ledger_duplicate_logging_value = !logging1.ledger_duplicate_logging_value;
logging1.network_logging_value = !logging1.network_logging_value;
logging1.network_timeout_logging_value = !logging1.network_timeout_logging_value;
logging1.network_message_logging_value = !logging1.network_message_logging_value;
logging1.network_publish_logging_value = !logging1.network_publish_logging_value;
logging1.network_packet_logging_value = !logging1.network_packet_logging_value;
@ -37,6 +38,7 @@ TEST (logging, serialization)
ASSERT_EQ (logging1.ledger_logging_value, logging2.ledger_logging_value);
ASSERT_EQ (logging1.ledger_duplicate_logging_value, logging2.ledger_duplicate_logging_value);
ASSERT_EQ (logging1.network_logging_value, logging2.network_logging_value);
ASSERT_EQ (logging1.network_timeout_logging_value, logging2.network_timeout_logging_value);
ASSERT_EQ (logging1.network_message_logging_value, logging2.network_message_logging_value);
ASSERT_EQ (logging1.network_publish_logging_value, logging2.network_publish_logging_value);
ASSERT_EQ (logging1.network_packet_logging_value, logging2.network_packet_logging_value);
@ -81,11 +83,13 @@ TEST (logging, upgrade_v6_v7)
logging1.serialize_json (tree);
tree.erase ("version");
tree.erase ("min_time_between_output");
tree.erase ("network_timeout_logging_value");
bool upgraded (false);
ASSERT_FALSE (logging2.deserialize_json (upgraded, tree));
ASSERT_TRUE (upgraded);
ASSERT_LE (7, tree.get<int> ("version"));
ASSERT_EQ (5, tree.get<uintmax_t> ("min_time_between_output"));
ASSERT_EQ (false, tree.get<bool> ("network_timeout_logging_value"));
}
namespace

View file

@ -1149,7 +1149,12 @@ TEST (network, endpoint_bad_fd)
system.nodes[0]->stop ();
auto endpoint (system.nodes[0]->network.endpoint ());
ASSERT_TRUE (endpoint.address ().is_loopback ());
ASSERT_EQ (0, endpoint.port ());
// The endpoint is invalidated asynchronously
system.deadline_set (10s);
while (system.nodes[0]->network.endpoint ().port () != 0)
{
ASSERT_NO_ERROR (system.poll ());
}
}
TEST (network, reserved_address)
@ -1384,14 +1389,22 @@ TEST (bootstrap, keepalive)
auto socket (std::make_shared<nano::socket> (system.nodes[0]));
nano::keepalive keepalive;
auto input (keepalive.to_bytes ());
socket->async_connect (system.nodes[0]->bootstrap.endpoint (), [&input, socket](boost::system::error_code const & ec) {
std::atomic<bool> write_done (false);
socket->async_connect (system.nodes[0]->bootstrap.endpoint (), [&input, socket, &write_done](boost::system::error_code const & ec) {
ASSERT_FALSE (ec);
socket->async_write (input, [&input](boost::system::error_code const & ec, size_t size_a) {
socket->async_write (input, [&input, &write_done](boost::system::error_code const & ec, size_t size_a) {
ASSERT_FALSE (ec);
ASSERT_EQ (input->size (), size_a);
write_done = true;
});
});
system.deadline_set (std::chrono::seconds (5));
while (!write_done)
{
ASSERT_NO_ERROR (system.poll ());
}
auto output (keepalive.to_bytes ());
std::atomic<bool> done (false);
socket->async_read (output, output->size (), [&output, &done](boost::system::error_code const & ec, size_t size_a) {
@ -1942,7 +1955,7 @@ TEST (bootstrap, tcp_listener_timeout_empty)
{
nano::system system (24000, 1);
auto node0 (system.nodes[0]);
node0->config.tcp_server_timeout = std::chrono::seconds (1);
node0->config.tcp_idle_timeout = std::chrono::seconds (1);
auto socket (std::make_shared<nano::socket> (node0));
std::atomic<bool> connected (false);
socket->async_connect (node0->bootstrap.endpoint (), [&connected](boost::system::error_code const & ec) {
@ -1955,7 +1968,7 @@ TEST (bootstrap, tcp_listener_timeout_empty)
ASSERT_NO_ERROR (system.poll ());
}
bool disconnected (false);
system.deadline_set (std::chrono::seconds (5));
system.deadline_set (std::chrono::seconds (6));
while (!disconnected)
{
{
@ -1970,7 +1983,7 @@ TEST (bootstrap, tcp_listener_timeout_keepalive)
{
nano::system system (24000, 1);
auto node0 (system.nodes[0]);
node0->config.tcp_server_timeout = std::chrono::seconds (1);
node0->config.tcp_idle_timeout = std::chrono::seconds (1);
auto socket (std::make_shared<nano::socket> (node0));
nano::keepalive keepalive;
auto input (keepalive.to_bytes ());
@ -1991,7 +2004,7 @@ TEST (bootstrap, tcp_listener_timeout_keepalive)
ASSERT_EQ (node0->bootstrap.connections.size (), 1);
}
bool disconnected (false);
system.deadline_set (std::chrono::seconds (5));
system.deadline_set (std::chrono::seconds (10));
while (!disconnected)
{
{

View file

@ -441,9 +441,10 @@ TEST (node, connect_after_junk)
nano::system system (24000, 1);
nano::node_init init1;
auto node1 (std::make_shared<nano::node> (init1, system.io_ctx, 24001, nano::unique_path (), system.alarm, system.logging, system.work));
uint64_t junk (0);
auto junk_buffer (std::make_shared<std::vector<uint8_t>> ());
junk_buffer->push_back (0);
nano::transport::channel_udp channel1 (node1->network.udp_channels, system.nodes[0]->network.endpoint ());
channel1.send_buffer_raw (boost::asio::buffer (&junk, sizeof (junk)), [](boost::system::error_code const &, size_t) {});
channel1.send_buffer (junk_buffer, nano::stat::detail::bulk_pull, [](boost::system::error_code const &, size_t) {});
system.deadline_set (10s);
while (system.nodes[0]->stats.count (nano::stat::type::error) == 0)
{
@ -697,8 +698,8 @@ TEST (node_config, v16_v17_upgrade)
nano::node_config config;
config.logging.init (path);
// These config options should not be present
ASSERT_FALSE (tree.get_optional_child ("tcp_client_timeout"));
ASSERT_FALSE (tree.get_optional_child ("tcp_server_timeout"));
ASSERT_FALSE (tree.get_optional_child ("tcp_io_timeout"));
ASSERT_FALSE (tree.get_optional_child ("tcp_idle_timeout"));
ASSERT_FALSE (tree.get_optional_child ("pow_sleep_interval"));
ASSERT_FALSE (tree.get_optional_child ("external_address"));
ASSERT_FALSE (tree.get_optional_child ("external_port"));
@ -706,8 +707,8 @@ TEST (node_config, v16_v17_upgrade)
config.deserialize_json (upgraded, tree);
// The config options should be added after the upgrade
ASSERT_TRUE (!!tree.get_optional_child ("tcp_client_timeout"));
ASSERT_TRUE (!!tree.get_optional_child ("tcp_server_timeout"));
ASSERT_TRUE (!!tree.get_optional_child ("tcp_io_timeout"));
ASSERT_TRUE (!!tree.get_optional_child ("tcp_idle_timeout"));
ASSERT_TRUE (!!tree.get_optional_child ("pow_sleep_interval"));
ASSERT_TRUE (!!tree.get_optional_child ("external_address"));
ASSERT_TRUE (!!tree.get_optional_child ("external_port"));
@ -732,8 +733,8 @@ TEST (node_config, v17_values)
// Check config is correct
{
tree.put ("tcp_client_timeout", 1);
tree.put ("tcp_server_timeout", 0);
tree.put ("tcp_io_timeout", 1);
tree.put ("tcp_idle_timeout", 0);
tree.put ("pow_sleep_interval", 0);
tree.put ("external_address", "::1");
tree.put ("external_port", 0);
@ -749,8 +750,8 @@ TEST (node_config, v17_values)
config.deserialize_json (upgraded, tree);
ASSERT_FALSE (upgraded);
ASSERT_EQ (config.tcp_client_timeout.count (), 1);
ASSERT_EQ (config.tcp_server_timeout.count (), 0);
ASSERT_EQ (config.tcp_io_timeout.count (), 1);
ASSERT_EQ (config.tcp_idle_timeout.count (), 0);
ASSERT_EQ (config.pow_sleep_interval.count (), 0);
ASSERT_EQ (config.external_address, boost::asio::ip::address_v6::from_string ("::1"));
ASSERT_EQ (config.external_port, 0);
@ -760,8 +761,8 @@ TEST (node_config, v17_values)
ASSERT_TRUE (config.diagnostics_config.txn_tracking.ignore_writes_below_block_processor_max_time);
// Check config is correct with other values
tree.put ("tcp_client_timeout", std::numeric_limits<unsigned long>::max () - 100);
tree.put ("tcp_server_timeout", std::numeric_limits<unsigned>::max ());
tree.put ("tcp_io_timeout", std::numeric_limits<unsigned long>::max () - 100);
tree.put ("tcp_idle_timeout", std::numeric_limits<unsigned>::max ());
tree.put ("pow_sleep_interval", std::numeric_limits<unsigned long>::max () - 100);
tree.put ("external_address", "::ffff:192.168.1.1");
tree.put ("external_port", std::numeric_limits<uint16_t>::max () - 1);
@ -777,8 +778,8 @@ TEST (node_config, v17_values)
upgraded = false;
config.deserialize_json (upgraded, tree);
ASSERT_FALSE (upgraded);
ASSERT_EQ (config.tcp_client_timeout.count (), std::numeric_limits<unsigned long>::max () - 100);
ASSERT_EQ (config.tcp_server_timeout.count (), std::numeric_limits<unsigned>::max ());
ASSERT_EQ (config.tcp_io_timeout.count (), std::numeric_limits<unsigned long>::max () - 100);
ASSERT_EQ (config.tcp_idle_timeout.count (), std::numeric_limits<unsigned>::max ());
ASSERT_EQ (config.pow_sleep_interval.count (), std::numeric_limits<unsigned long>::max () - 100);
ASSERT_EQ (config.external_address, boost::asio::ip::address_v6::from_string ("::ffff:192.168.1.1"));
ASSERT_EQ (config.external_port, std::numeric_limits<uint16_t>::max () - 1);

109
nano/core_test/socket.cpp Normal file
View file

@ -0,0 +1,109 @@
#include <boost/thread.hpp>
#include <gtest/gtest.h>
#include <nano/core_test/testutil.hpp>
#include <nano/node/socket.hpp>
#include <nano/node/testing.hpp>
using namespace std::chrono_literals;
TEST (socket, concurrent_writes)
{
nano::inactive_node inactivenode;
auto node = inactivenode.node;
// This gives more realistic execution than using system#poll, allowing writes to
// queue up and drain concurrently.
nano::thread_runner runner (node->io_ctx, 1);
constexpr size_t max_connections = 4;
constexpr size_t client_count = max_connections;
constexpr size_t message_count = 4;
constexpr size_t total_message_count = client_count * message_count;
// We're expecting client_count*4 messages
nano::util::counted_completion read_count_completion (total_message_count);
std::function<void(std::shared_ptr<nano::socket>)> reader = [&read_count_completion, &total_message_count, &reader](std::shared_ptr<nano::socket> socket_a) {
auto buff (std::make_shared<std::vector<uint8_t>> ());
buff->resize (1);
socket_a->async_read (buff, 1, [&read_count_completion, &reader, &total_message_count, socket_a, buff](boost::system::error_code const & ec, size_t size_a) {
if (!ec)
{
if (read_count_completion.increment () < total_message_count)
{
reader (socket_a);
}
}
else if (ec != boost::asio::error::eof)
{
std::cerr << "async_read: " << ec.message () << std::endl;
}
});
};
boost::asio::ip::tcp::endpoint endpoint (boost::asio::ip::address_v4::any (), 25000);
auto server_socket (std::make_shared<nano::server_socket> (node, endpoint, max_connections, nano::socket::concurrency::multi_writer));
boost::system::error_code ec;
server_socket->start (ec);
ASSERT_FALSE (ec);
std::vector<std::shared_ptr<nano::socket>> connections;
// On every new connection, start reading data
server_socket->on_connection ([&connections, &reader](std::shared_ptr<nano::socket> new_connection, boost::system::error_code const & ec_a) {
if (ec_a)
{
std::cerr << "on_connection: " << ec_a.message () << std::endl;
}
else
{
connections.push_back (new_connection);
reader (new_connection);
}
// Keep accepting connections
return true;
});
nano::util::counted_completion connection_count_completion (client_count);
std::vector<std::shared_ptr<nano::socket>> clients;
for (unsigned i = 0; i < client_count; i++)
{
auto client (std::make_shared<nano::socket> (node, boost::none, nano::socket::concurrency::multi_writer));
clients.push_back (client);
client->async_connect (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v4::loopback (), 25000),
[&connection_count_completion](boost::system::error_code const & ec_a) {
if (ec_a)
{
std::cerr << "async_connect: " << ec_a.message () << std::endl;
}
else
{
connection_count_completion.increment ();
}
});
}
ASSERT_FALSE (connection_count_completion.await_count_for (10s));
// Execute overlapping writes from multiple threads
auto client (clients[0]);
for (int i = 0; i < client_count; i++)
{
std::thread runner ([&client]() {
for (int i = 0; i < message_count; i++)
{
auto buff (std::make_shared<std::vector<uint8_t>> ());
buff->push_back ('A' + i);
client->async_write (buff);
}
});
runner.detach ();
}
ASSERT_FALSE (read_count_completion.await_count_for (10s));
node->stop ();
runner.stop_event_processing ();
runner.join ();
ASSERT_EQ (node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in), client_count);
// We may exhaust max connections and have some tcp accept failures, but no more than the client count
ASSERT_LT (node->stats.count (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in), client_count);
}

View file

@ -1,6 +1,10 @@
#pragma once
#include <atomic>
#include <boost/multiprecision/cpp_int.hpp>
#include <condition_variable>
#include <mutex>
#include <nano/lib/timer.hpp>
#include <string>
#define GTEST_TEST_ERROR_CODE(expression, text, actual, expected, fail) \
@ -36,4 +40,83 @@ extern nano::uint256_union const & nano_test_account;
extern nano::uint256_union const & genesis_account;
extern nano::uint256_union const & burn_account;
extern nano::uint128_t const & genesis_amount;
namespace util
{
/**
* Helper to signal completion of async handlers in tests.
* Subclasses implement specific conditions for completion.
*/
class completion_signal
{
public:
virtual ~completion_signal ()
{
notify ();
}
/** Explicitly notify the completion */
void notify ()
{
cv.notify_all ();
}
protected:
std::condition_variable cv;
std::mutex mutex;
};
/**
* Signals completion when a count is reached.
*/
class counted_completion : public completion_signal
{
public:
/**
* Constructor
* @param required_count_a When increment() reaches this count within the deadline, await_count_for() will return false.
*/
counted_completion (unsigned required_count_a) :
required_count (required_count_a)
{
}
/**
* Wait for increment() to signal completion, or reaching the deadline.
* @param deadline_duration_a Deadline as a std::chrono duration
* @return true if the count is reached within the deadline
*/
template <typename UNIT>
bool await_count_for (UNIT deadline_duration_a)
{
nano::timer<UNIT> timer (nano::timer_state::started);
bool error = true;
while (error && timer.before_deadline (deadline_duration_a))
{
error = count < required_count;
if (error)
{
std::unique_lock<std::mutex> lock (mutex);
cv.wait_for (lock, std::chrono::milliseconds (1));
}
}
return error;
}
/** Increments the current count. If the required count is reached, await_count_for() waiters are notified. */
unsigned increment ()
{
auto val (count.fetch_add (1));
if (val >= required_count)
{
notify ();
}
return val;
}
private:
std::atomic<unsigned> count{ 0 };
unsigned required_count;
};
}
}

View file

@ -142,7 +142,8 @@ void nano::thread_attributes::set (boost::thread::attributes & attrs)
attrs_l->set_stack_size (8000000); //8MB
}
nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned service_threads_a)
nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned service_threads_a) :
io_guard (boost::asio::make_work_guard (io_ctx_a))
{
boost::thread::attributes attrs;
nano::thread_attributes::set (attrs);
@ -154,6 +155,13 @@ nano::thread_runner::thread_runner (boost::asio::io_context & io_ctx_a, unsigned
{
io_ctx_a.run ();
}
catch (std::exception const & ex)
{
std::cerr << ex.what () << std::endl;
#ifndef NDEBUG
throw ex;
#endif
}
catch (...)
{
#ifndef NDEBUG
@ -176,6 +184,7 @@ nano::thread_runner::~thread_runner ()
void nano::thread_runner::join ()
{
io_guard.reset ();
for (auto & i : threads)
{
if (i.joinable ())
@ -185,6 +194,11 @@ void nano::thread_runner::join ()
}
}
void nano::thread_runner::stop_event_processing ()
{
io_guard.get_executor ().context ().stop ();
}
/*
* Backing code for "release_assert", which is itself a macro
*/

View file

@ -123,8 +123,12 @@ class thread_runner final
public:
thread_runner (boost::asio::io_context &, unsigned);
~thread_runner ();
/** Tells the IO context to stop processing events.*/
void stop_event_processing ();
/** Wait for IO threads to complete */
void join ();
std::vector<boost::thread> threads;
boost::asio::executor_work_guard<boost::asio::io_context::executor_type> io_guard;
};
template <typename... T>

View file

@ -244,9 +244,12 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost
nano::set_secure_perm_file (config_path, error_chmod);
if (!error)
{
boost::asio::io_context io_ctx;
config.node.logging.init (data_path);
nano::logger_mt logger{ config.node.logging.min_time_between_log_output };
boost::asio::io_context io_ctx;
nano::thread_runner runner (io_ctx, config.node.io_threads);
std::shared_ptr<nano::node> node;
std::shared_ptr<nano_qt::wallet> gui;
nano::set_application_icon (application);
@ -258,6 +261,7 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost
nano::alarm alarm (io_ctx);
nano::node_init init;
nano::node_flags flags;
node = std::make_shared<nano::node> (init, io_ctx, data_path, alarm, config.node, work, flags);
if (!init.error ())
{
@ -330,8 +334,6 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost
#endif
}
}
nano::thread_runner runner (io_ctx, node->config.io_threads);
QObject::connect (&application, &QApplication::aboutToQuit, [&]() {
ipc.stop ();
node->stop ();
@ -345,6 +347,7 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost
rpc_process->terminate ();
}
#endif
runner.stop_event_processing ();
});
application.postEvent (&processor, new nano_qt::eventloop_event ([&]() {
gui = std::make_shared<nano_qt::wallet> (application, processor, *node, wallet, config.account);

View file

@ -71,6 +71,8 @@ add_library (node
transport/udp.cpp
signatures.hpp
signatures.cpp
socket.hpp
socket.cpp
stats.hpp
stats.cpp
voting.hpp

View file

@ -20,123 +20,6 @@ constexpr unsigned bulk_push_cost_limit = 200;
size_t constexpr nano::frontier_req_client::size_frontier;
nano::socket::socket (std::shared_ptr<nano::node> node_a) :
socket_m (node_a->io_ctx),
last_action_time (0),
async_start_time (std::numeric_limits<uint64_t>::max ()),
node (node_a)
{
}
void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function<void(boost::system::error_code const &)> callback_a)
{
checkup (node->config.tcp_client_timeout.count ());
auto this_l (shared_from_this ());
start ();
socket_m.async_connect (endpoint_a, [this_l, callback_a](boost::system::error_code const & ec) {
this_l->stop ();
callback_a (ec);
});
}
void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(boost::system::error_code const &, size_t)> callback_a)
{
assert (size_a <= buffer_a->size ());
auto this_l (shared_from_this ());
if (socket_m.is_open ())
{
start ();
boost::asio::async_read (socket_m, boost::asio::buffer (buffer_a->data (), size_a), [this_l, callback_a](boost::system::error_code const & ec, size_t size_a) {
this_l->node->stats.add (nano::stat::type::traffic_bootstrap, nano::stat::dir::in, size_a);
this_l->stop ();
callback_a (ec, size_a);
});
}
}
void nano::socket::async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a)
{
auto this_l (shared_from_this ());
if (socket_m.is_open ())
{
start ();
async_write (boost::asio::buffer (buffer_a->data (), buffer_a->size ()), [this_l, callback_a, buffer_a](boost::system::error_code const & ec, size_t size_a) {
this_l->node->stats.add (nano::stat::type::traffic_bootstrap, nano::stat::dir::out, size_a);
this_l->stop ();
callback_a (ec, size_a);
});
}
}
void nano::socket::async_write (boost::asio::const_buffer buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a)
{
boost::asio::async_write (socket_m, buffer_a, callback_a);
}
void nano::socket::start ()
{
auto now (std::chrono::steady_clock::now ().time_since_epoch ().count ());
async_start_time = now;
last_action_time = now;
}
void nano::socket::stop ()
{
async_start_time = std::numeric_limits<uint64_t>::max ();
last_action_time = std::chrono::steady_clock::now ().time_since_epoch ().count ();
}
void nano::socket::close ()
{
boost::system::error_code ec;
socket_m.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
/* Ignore error code for shutdown as it is a best effort anyway. */
socket_m.close (ec);
if (ec)
{
// The underlying file descriptor is closed anyway, so just log the error and increment socket failure stat.
node->logger.try_log ("Failed to close socket gracefully: ", ec.message ());
node->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close);
}
}
void nano::socket::checkup (uint64_t timeout_a)
{
std::weak_ptr<nano::socket> this_w (shared_from_this ());
node->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (node->network_params.network.is_test_network () ? 1 : 10), [this_w, timeout_a]() {
if (auto this_l = this_w.lock ())
{
if (this_l->async_start_time != std::numeric_limits<uint64_t>::max () && this_l->async_start_time + timeout_a < static_cast<uint64_t> (std::chrono::steady_clock::now ().time_since_epoch ().count ()))
{
if (this_l->node->config.logging.bulk_pull_logging ())
{
this_l->node->logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % this_l->remote_endpoint ()));
}
this_l->close ();
}
else
{
this_l->checkup (timeout_a);
}
}
});
}
nano::tcp_endpoint nano::socket::remote_endpoint ()
{
nano::tcp_endpoint endpoint;
if (socket_m.is_open ())
{
boost::system::error_code remote_endpoint_error;
endpoint = socket_m.remote_endpoint (remote_endpoint_error);
}
return endpoint;
}
nano::bootstrap_client::bootstrap_client (std::shared_ptr<nano::node> node_a, std::shared_ptr<nano::bootstrap_attempt> attempt_a, std::shared_ptr<nano::transport::channel_tcp> channel_a) :
node (node_a),
attempt (attempt_a),
@ -1241,7 +1124,8 @@ void nano::bootstrap_attempt::connect_client (nano::tcp_endpoint const & endpoin
++connections;
auto socket (std::make_shared<nano::socket> (node));
auto this_l (shared_from_this ());
socket->async_connect (endpoint_a, [this_l, socket, endpoint_a](boost::system::error_code const & ec) {
socket->async_connect (endpoint_a,
[this_l, socket, endpoint_a](boost::system::error_code const & ec) {
if (!ec)
{
if (this_l->node->config.logging.bulk_pull_logging ())
@ -1924,30 +1808,35 @@ std::unique_ptr<seq_con_info_component> collect_seq_con_info (bootstrap_initiato
}
}
nano::bootstrap_listener::bootstrap_listener (boost::asio::io_context & io_ctx_a, uint16_t port_a, nano::node & node_a) :
acceptor (io_ctx_a),
local (boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port_a)),
io_ctx (io_ctx_a),
nano::bootstrap_listener::bootstrap_listener (uint16_t port_a, nano::node & node_a) :
node (node_a),
defer_acceptor (io_ctx_a)
port (port_a)
{
}
void nano::bootstrap_listener::start ()
{
acceptor.open (local.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
listening_socket = std::make_shared<nano::server_socket> (node.shared (), boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::any (), port), node.config.bootstrap_connections_max);
boost::system::error_code ec;
acceptor.bind (local, ec);
listening_socket->start (ec);
if (ec)
{
node.logger.try_log (boost::str (boost::format ("Error while binding for bootstrap on port %1%: %2%") % local.port () % ec.message ()));
node.logger.try_log (boost::str (boost::format ("Error while binding for bootstrap on port %1%: %2%") % listening_socket->listening_port () % ec.message ()));
throw std::runtime_error (ec.message ());
}
acceptor.listen ();
accept_connection ();
listening_socket->on_connection ([this](std::shared_ptr<nano::socket> new_connection, boost::system::error_code const & ec_a) {
bool keep_accepting = true;
if (ec_a)
{
keep_accepting = false;
this->node.logger.try_log (boost::str (boost::format ("Error while accepting bootstrap connections: %1%") % ec_a.message ()));
}
else
{
accept_action (ec_a, new_connection);
}
return keep_accepting;
});
}
void nano::bootstrap_listener::stop ()
@ -1958,88 +1847,41 @@ void nano::bootstrap_listener::stop ()
on = false;
connections_l.swap (connections);
}
acceptor.close ();
for (auto & i : connections_l)
if (listening_socket)
{
auto connection (i.second.lock ());
if (connection)
{
connection->socket->close ();
}
listening_socket->close ();
listening_socket = nullptr;
}
}
void nano::bootstrap_listener::accept_connection ()
size_t nano::bootstrap_listener::connection_count ()
{
if (acceptor.is_open ())
{
if (connections.size () < node.config.bootstrap_connections_max)
{
auto socket (std::make_shared<nano::socket> (node.shared ()));
socket->checkup (node.config.tcp_server_timeout.count ());
acceptor.async_accept (socket->socket_m, [this, socket](boost::system::error_code const & ec) {
accept_action (ec, socket);
});
}
else
{
node.logger.try_log (boost::str (boost::format ("Unable to accept new TCP network sockets (have %1% concurrent connections, limit of %2%), will try to accept again in 1s") % connections.size () % node.config.bootstrap_connections_max));
defer_acceptor.expires_after (std::chrono::seconds (1));
defer_acceptor.async_wait ([this](const boost::system::error_code & ec) {
/*
* There should be no other call points that can invoke
* accept_connect() after starting the listener, so if we
* get an error from the I/O context, something is probably
* wrong.
*/
if (!ec)
{
accept_connection ();
}
});
}
}
std::lock_guard<std::mutex> lock (mutex);
return connections.size ();
}
void nano::bootstrap_listener::accept_action (boost::system::error_code const & ec, std::shared_ptr<nano::socket> socket_a)
{
if (!ec)
auto connection (std::make_shared<nano::bootstrap_server> (socket_a, node.shared ()));
{
auto connection (std::make_shared<nano::bootstrap_server> (socket_a, node.shared ()));
{
std::lock_guard<std::mutex> lock (mutex);
if (acceptor.is_open ())
{
connections[connection.get ()] = connection;
connection->receive ();
}
}
accept_connection ();
}
else
{
node.logger.try_log (boost::str (boost::format ("Error while accepting bootstrap connections: %1%") % ec.message ()));
std::lock_guard<std::mutex> lock (mutex);
connections[connection.get ()] = connection;
connection->receive ();
}
}
boost::asio::ip::tcp::endpoint nano::bootstrap_listener::endpoint ()
{
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), local.port ());
return boost::asio::ip::tcp::endpoint (boost::asio::ip::address_v6::loopback (), listening_socket->listening_port ());
}
namespace nano
{
std::unique_ptr<seq_con_info_component> collect_seq_con_info (bootstrap_listener & bootstrap_listener, const std::string & name)
{
size_t count = 0;
{
std::lock_guard<std::mutex> guard (bootstrap_listener.mutex);
count = bootstrap_listener.connections.size ();
}
auto sizeof_element = sizeof (decltype (bootstrap_listener.connections)::value_type);
auto composite = std::make_unique<seq_con_info_composite> (name);
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "connections", count, sizeof_element }));
composite->add_component (std::make_unique<seq_con_info_leaf> (seq_con_info{ "connections", bootstrap_listener.connection_count (), sizeof_element }));
return composite;
}
}
@ -2253,7 +2095,7 @@ void nano::bootstrap_server::finish_request ()
else
{
std::weak_ptr<nano::bootstrap_server> this_w (shared_from_this ());
node->alarm.add (std::chrono::steady_clock::now () + node->config.tcp_server_timeout + std::chrono::seconds (1), [this_w]() {
node->alarm.add (std::chrono::steady_clock::now () + (node->config.tcp_io_timeout * 2) + std::chrono::seconds (1), [this_w]() {
if (auto this_l = this_w.lock ())
{
this_l->timeout ();
@ -2266,7 +2108,7 @@ void nano::bootstrap_server::timeout ()
{
if (socket != nullptr)
{
if (socket->last_action_time + node->config.tcp_server_timeout.count () < static_cast<uint64_t> (std::chrono::steady_clock::now ().time_since_epoch ().count ()))
if (socket->has_timed_out ())
{
if (node->config.logging.bulk_pull_logging ())
{
@ -2658,46 +2500,30 @@ void nano::bulk_pull_account_server::send_frontier ()
* so handle the invalid_request case by terminating the
* request without any response
*/
if (invalid_request)
if (!invalid_request)
{
connection->finish_request ();
auto stream_transaction (connection->node->store.tx_begin_read ());
return;
// Get account balance and frontier block hash
auto account_frontier_hash (connection->node->ledger.latest (stream_transaction, request->account));
auto account_frontier_balance_int (connection->node->ledger.account_balance (stream_transaction, request->account));
nano::uint128_union account_frontier_balance (account_frontier_balance_int);
// Write the frontier block hash and balance into a buffer
send_buffer->clear ();
{
nano::vectorstream output_stream (*send_buffer);
write (output_stream, account_frontier_hash.bytes);
write (output_stream, account_frontier_balance.bytes);
}
// Send the buffer to the requestor
auto this_l (shared_from_this ());
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
/*
* Supply the account frontier
*/
/**
** Establish a database transaction
**/
auto stream_transaction (connection->node->store.tx_begin_read ());
/**
** Get account balance and frontier block hash
**/
auto account_frontier_hash (connection->node->ledger.latest (stream_transaction, request->account));
auto account_frontier_balance_int (connection->node->ledger.account_balance (stream_transaction, request->account));
nano::uint128_union account_frontier_balance (account_frontier_balance_int);
/**
** Write the frontier block hash and balance into a buffer
**/
send_buffer->clear ();
{
nano::vectorstream output_stream (*send_buffer);
write (output_stream, account_frontier_hash.bytes);
write (output_stream, account_frontier_balance.bytes);
}
/**
** Send the buffer to the requestor
**/
auto this_l (shared_from_this ());
connection->socket->async_write (send_buffer, [this_l](boost::system::error_code const & ec, size_t size_a) {
this_l->sent_action (ec, size_a);
});
}
void nano::bulk_pull_account_server::send_next_block ()

View file

@ -1,6 +1,7 @@
#pragma once
#include <nano/node/common.hpp>
#include <nano/node/socket.hpp>
#include <nano/secure/blockstore.hpp>
#include <nano/secure/ledger.hpp>
@ -33,26 +34,6 @@ enum class sync_result
error,
fork
};
class socket final : public std::enable_shared_from_this<nano::socket>
{
public:
explicit socket (std::shared_ptr<nano::node>);
void async_connect (nano::tcp_endpoint const &, std::function<void(boost::system::error_code const &)>);
void async_read (std::shared_ptr<std::vector<uint8_t>>, size_t, std::function<void(boost::system::error_code const &, size_t)>);
void async_write (std::shared_ptr<std::vector<uint8_t>>, std::function<void(boost::system::error_code const &, size_t)>);
void async_write (boost::asio::const_buffer, std::function<void(boost::system::error_code const &, size_t)>);
void start ();
void stop ();
void close ();
void checkup (uint64_t);
nano::tcp_endpoint remote_endpoint ();
boost::asio::ip::tcp::socket socket_m;
std::atomic<uint64_t> last_action_time;
private:
std::atomic<uint64_t> async_start_time;
std::shared_ptr<nano::node> node;
};
class bootstrap_client;
class pull_info
@ -285,22 +266,21 @@ class bootstrap_server;
class bootstrap_listener final
{
public:
bootstrap_listener (boost::asio::io_context &, uint16_t, nano::node &);
bootstrap_listener (uint16_t, nano::node &);
void start ();
void stop ();
void accept_connection ();
void accept_action (boost::system::error_code const &, std::shared_ptr<nano::socket>);
size_t connection_count ();
std::mutex mutex;
std::unordered_map<nano::bootstrap_server *, std::weak_ptr<nano::bootstrap_server>> connections;
nano::tcp_endpoint endpoint ();
boost::asio::ip::tcp::acceptor acceptor;
nano::tcp_endpoint local;
boost::asio::io_context & io_ctx;
nano::node & node;
std::shared_ptr<nano::server_socket> listening_socket;
bool on;
private:
boost::asio::steady_timer defer_acceptor;
uint16_t port;
};
std::unique_ptr<seq_con_info_component> collect_seq_con_info (bootstrap_listener & bootstrap_listener, const std::string & name);

View file

@ -256,7 +256,7 @@ public:
server.node.logger.always_log ("IPC: acceptor error: ", ec.message ());
}
if (acceptor->is_open () && ec != boost::asio::error::operation_aborted)
if (ec != boost::asio::error::operation_aborted && acceptor->is_open ())
{
this->accept ();
}

View file

@ -38,6 +38,7 @@ nano::error nano::logging::serialize_json (nano::jsonconfig & json) const
json.put ("ledger_duplicate", ledger_duplicate_logging_value);
json.put ("vote", vote_logging_value);
json.put ("network", network_logging_value);
json.put ("network_timeout", network_timeout_logging_value);
json.put ("network_message", network_message_logging_value);
json.put ("network_publish", network_publish_logging_value);
json.put ("network_packet", network_packet_logging_value);
@ -87,6 +88,7 @@ bool nano::logging::upgrade_json (unsigned version_a, nano::jsonconfig & json)
upgraded_l = true;
case 6:
json.put ("min_time_between_output", min_time_between_log_output.count ());
json.put ("network_timeout", network_timeout_logging_value);
json.erase ("log_rpc");
json.put ("long_database_txns", false);
upgraded_l = true;
@ -126,6 +128,7 @@ nano::error nano::logging::deserialize_json (bool & upgraded_a, nano::jsonconfig
json.get<bool> ("ledger_duplicate", ledger_duplicate_logging_value);
json.get<bool> ("vote", vote_logging_value);
json.get<bool> ("network", network_logging_value);
json.get<bool> ("network_timeout", network_timeout_logging_value);
json.get<bool> ("network_message", network_message_logging_value);
json.get<bool> ("network_publish", network_publish_logging_value);
json.get<bool> ("network_packet", network_packet_logging_value);
@ -168,6 +171,11 @@ bool nano::logging::network_logging () const
return network_logging_value;
}
bool nano::logging::network_timeout_logging () const
{
return network_logging () && network_timeout_logging_value;
}
bool nano::logging::network_message_logging () const
{
return network_logging () && network_message_logging_value;

View file

@ -25,6 +25,7 @@ public:
bool ledger_duplicate_logging () const;
bool vote_logging () const;
bool network_logging () const;
bool network_timeout_logging () const;
bool network_message_logging () const;
bool network_publish_logging () const;
bool network_packet_logging () const;
@ -45,6 +46,7 @@ public:
bool ledger_duplicate_logging_value{ false };
bool vote_logging_value{ false };
bool network_logging_value{ true };
bool network_timeout_logging_value{ false };
bool network_message_logging_value{ false };
bool network_publish_logging_value{ false };
bool network_packet_logging_value{ false };

View file

@ -1035,7 +1035,7 @@ ledger (store, stats, config.epoch_block_link, config.epoch_block_signer),
checker (config.signature_checker_threads),
network (*this, config.peering_port),
bootstrap_initiator (*this),
bootstrap (io_ctx_a, config.peering_port, *this),
bootstrap (config.peering_port, *this),
application_path (application_path_a),
port_mapping (*this),
vote_processor (*this),

View file

@ -114,8 +114,8 @@ nano::error nano::node_config::serialize_json (nano::jsonconfig & json) const
json.put ("allow_local_peers", allow_local_peers);
json.put ("vote_minimum", vote_minimum.to_string_dec ());
json.put ("unchecked_cutoff_time", unchecked_cutoff_time.count ());
json.put ("tcp_client_timeout", tcp_client_timeout.count ());
json.put ("tcp_server_timeout", tcp_server_timeout.count ());
json.put ("tcp_io_timeout", tcp_io_timeout.count ());
json.put ("tcp_idle_timeout", tcp_idle_timeout.count ());
json.put ("pow_sleep_interval", pow_sleep_interval.count ());
json.put ("external_address", external_address.to_string ());
json.put ("external_port", external_port);
@ -255,8 +255,8 @@ bool nano::node_config::upgrade_json (unsigned version_a, nano::jsonconfig & jso
nano::jsonconfig diagnostics_l;
diagnostics_config.serialize_json (diagnostics_l);
json.put_child ("diagnostics", diagnostics_l);
json.put ("tcp_client_timeout", tcp_client_timeout.count ());
json.put ("tcp_server_timeout", tcp_server_timeout.count ());
json.put ("tcp_io_timeout", tcp_io_timeout.count ());
json.put ("tcp_idle_timeout", tcp_idle_timeout.count ());
json.put (pow_sleep_interval_key, pow_sleep_interval.count ());
json.put ("external_address", external_address.to_string ());
json.put ("external_port", external_port);
@ -361,12 +361,13 @@ nano::error nano::node_config::deserialize_json (bool & upgraded_a, nano::jsonco
auto unchecked_cutoff_time_l = static_cast<unsigned long> (unchecked_cutoff_time.count ());
json.get ("unchecked_cutoff_time", unchecked_cutoff_time_l);
unchecked_cutoff_time = std::chrono::seconds (unchecked_cutoff_time_l);
auto tcp_client_timeout_l = static_cast<unsigned long> (tcp_client_timeout.count ());
json.get ("tcp_client_timeout", tcp_client_timeout_l);
tcp_client_timeout = std::chrono::seconds (tcp_client_timeout_l);
auto tcp_server_timeout_l = static_cast<unsigned long> (tcp_server_timeout.count ());
json.get ("tcp_server_timeout", tcp_server_timeout_l);
tcp_server_timeout = std::chrono::seconds (tcp_server_timeout_l);
auto tcp_io_timeout_l = static_cast<unsigned long> (tcp_io_timeout.count ());
json.get ("tcp_io_timeout", tcp_io_timeout_l);
tcp_io_timeout = std::chrono::seconds (tcp_io_timeout_l);
auto tcp_idle_timeout_l = static_cast<unsigned long> (tcp_idle_timeout.count ());
json.get ("tcp_idle_timeout", tcp_idle_timeout_l);
tcp_idle_timeout = std::chrono::seconds (tcp_idle_timeout_l);
auto ipc_config_l (json.get_optional_child ("ipc"));
if (ipc_config_l)

View file

@ -61,8 +61,10 @@ public:
uint16_t external_port{ 0 };
std::chrono::milliseconds block_processor_batch_max_time{ std::chrono::milliseconds (5000) };
std::chrono::seconds unchecked_cutoff_time{ std::chrono::seconds (4 * 60 * 60) }; // 4 hours
std::chrono::seconds tcp_client_timeout{ std::chrono::seconds (5) };
std::chrono::seconds tcp_server_timeout{ std::chrono::seconds (30) };
/** Timeout for initiated async operations */
std::chrono::seconds tcp_io_timeout{ network_params.network.is_test_network () ? std::chrono::seconds (5) : std::chrono::seconds (15) };
/** Default maximum idle time for a socket before it's automatically closed */
std::chrono::seconds tcp_idle_timeout{ std::chrono::minutes (2) };
std::chrono::nanoseconds pow_sleep_interval{ 0 };
static std::chrono::seconds constexpr keepalive_period = std::chrono::seconds (60);
static std::chrono::seconds constexpr keepalive_cutoff = keepalive_period * 5;

347
nano/node/socket.cpp Normal file
View file

@ -0,0 +1,347 @@
#include <limits>
#include <nano/node/node.hpp>
#include <nano/node/socket.hpp>
nano::socket::socket (std::shared_ptr<nano::node> node_a, boost::optional<std::chrono::seconds> max_idle_time_a, nano::socket::concurrency concurrency_a) :
strand (node_a->io_ctx.get_executor ()),
tcp_socket (node_a->io_ctx),
node (node_a),
writer_concurrency (concurrency_a),
next_deadline (std::numeric_limits<uint64_t>::max ()),
last_completion_time (0),
max_idle_time (max_idle_time_a)
{
if (!max_idle_time)
{
max_idle_time = node_a->config.tcp_idle_timeout;
}
}
nano::socket::~socket ()
{
close_internal ();
}
void nano::socket::async_connect (nano::tcp_endpoint const & endpoint_a, std::function<void(boost::system::error_code const &)> callback_a)
{
checkup ();
auto this_l (shared_from_this ());
start_timer ();
this_l->tcp_socket.async_connect (endpoint_a,
boost::asio::bind_executor (this_l->strand,
[this_l, callback_a, endpoint_a](boost::system::error_code const & ec) {
this_l->stop_timer ();
this_l->remote = endpoint_a;
callback_a (ec);
}));
}
void nano::socket::async_read (std::shared_ptr<std::vector<uint8_t>> buffer_a, size_t size_a, std::function<void(boost::system::error_code const &, size_t)> callback_a)
{
assert (size_a <= buffer_a->size ());
auto this_l (shared_from_this ());
if (!closed)
{
start_timer ();
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, size_a, this_l]() {
assert (!this_l->closed);
boost::asio::async_read (this_l->tcp_socket, boost::asio::buffer (buffer_a->data (), size_a),
boost::asio::bind_executor (this_l->strand,
[this_l, buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node = this_l->node.lock ())
{
node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::in, size_a);
this_l->stop_timer ();
callback_a (ec, size_a);
}
}));
}));
}
}
void nano::socket::async_write (std::shared_ptr<std::vector<uint8_t>> buffer_a, std::function<void(boost::system::error_code const &, size_t)> callback_a)
{
auto this_l (shared_from_this ());
if (!closed)
{
if (writer_concurrency == nano::socket::concurrency::multi_writer)
{
boost::asio::post (strand, boost::asio::bind_executor (strand, [buffer_a, callback_a, this_l]() {
bool write_in_progress = !this_l->send_queue.empty ();
this_l->send_queue.emplace_back (nano::socket::queue_item{ buffer_a, callback_a });
if (!write_in_progress)
{
this_l->write_queued_messages ();
}
}));
}
else
{
start_timer ();
boost::asio::async_write (tcp_socket, boost::asio::buffer (buffer_a->data (), buffer_a->size ()),
boost::asio::bind_executor (strand,
[this_l, buffer_a, callback_a](boost::system::error_code const & ec, size_t size_a) {
if (auto node = this_l->node.lock ())
{
node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
this_l->stop_timer ();
if (callback_a)
{
callback_a (ec, size_a);
}
}
}));
}
}
}
void nano::socket::write_queued_messages ()
{
if (!closed)
{
std::weak_ptr<nano::socket> this_w (shared_from_this ());
auto msg (send_queue.front ());
start_timer ();
boost::asio::async_write (tcp_socket, boost::asio::buffer (msg.buffer->data (), msg.buffer->size ()),
boost::asio::bind_executor (strand,
[msg, this_w](boost::system::error_code ec, std::size_t size_a) {
if (auto this_l = this_w.lock ())
{
if (auto node = this_l->node.lock ())
{
node->stats.add (nano::stat::type::traffic_tcp, nano::stat::dir::out, size_a);
this_l->stop_timer ();
if (!this_l->closed)
{
if (msg.callback)
{
msg.callback (ec, size_a);
}
this_l->send_queue.pop_front ();
if (!ec && !this_l->send_queue.empty ())
{
this_l->write_queued_messages ();
}
}
}
}
}));
}
}
void nano::socket::start_timer ()
{
if (auto node_l = node.lock ())
{
start_timer (node_l->config.tcp_io_timeout);
}
}
void nano::socket::start_timer (std::chrono::seconds deadline_a)
{
next_deadline = deadline_a.count ();
}
void nano::socket::stop_timer ()
{
last_completion_time = nano::seconds_since_epoch ();
}
void nano::socket::checkup ()
{
std::weak_ptr<nano::socket> this_w (shared_from_this ());
if (auto node_l = node.lock ())
{
node_l->alarm.add (std::chrono::steady_clock::now () + std::chrono::seconds (node_l->network_params.network.is_test_network () ? 1 : 2), [this_w, node_l]() {
if (auto this_l = this_w.lock ())
{
uint64_t now (nano::seconds_since_epoch ());
if (this_l->next_deadline != std::numeric_limits<uint64_t>::max () && now - this_l->last_completion_time > this_l->next_deadline)
{
if (auto node_l = this_l->node.lock ())
{
this_l->timed_out = true;
this_l->close ();
if (node_l->config.logging.network_timeout_logging ())
{
node_l->logger.try_log (boost::str (boost::format ("Disconnecting from %1% due to timeout") % this_l->remote_endpoint ()));
}
}
}
else if (!this_l->closed)
{
this_l->checkup ();
}
}
});
}
}
bool nano::socket::has_timed_out () const
{
return timed_out;
}
void nano::socket::set_max_idle_timeout (std::chrono::seconds max_idle_time_a)
{
auto this_l (shared_from_this ());
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l, max_idle_time_a]() {
this_l->max_idle_time = max_idle_time_a;
}));
}
void nano::socket::close ()
{
auto this_l (shared_from_this ());
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l] {
this_l->close_internal ();
}));
}
// This must be called from a strand or the destructor
void nano::socket::close_internal ()
{
if (!closed)
{
closed = true;
max_idle_time = boost::none;
boost::system::error_code ec;
// Ignore error code for shutdown as it is best-effort
tcp_socket.shutdown (boost::asio::ip::tcp::socket::shutdown_both, ec);
tcp_socket.close (ec);
send_queue.clear ();
if (ec)
{
if (auto node_l = node.lock ())
{
node_l->logger.try_log ("Failed to close socket gracefully: ", ec.message ());
node_l->stats.inc (nano::stat::type::bootstrap, nano::stat::detail::error_socket_close);
}
}
}
}
nano::tcp_endpoint nano::socket::remote_endpoint () const
{
return remote;
}
void nano::socket::set_writer_concurrency (concurrency writer_concurrency_a)
{
writer_concurrency = writer_concurrency_a;
}
nano::server_socket::server_socket (std::shared_ptr<nano::node> node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, nano::socket::concurrency concurrency_a) :
socket (node_a, std::chrono::seconds::max (), concurrency_a), acceptor (node_a->io_ctx), local (local_a), deferred_accept_timer (node_a->io_ctx), max_inbound_connections (max_connections_a), concurrency_new_connections (concurrency_a)
{
}
void nano::server_socket::start (boost::system::error_code & ec_a)
{
acceptor.open (local.protocol ());
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
acceptor.bind (local, ec_a);
if (!ec_a)
{
acceptor.listen (boost::asio::socket_base::max_listen_connections, ec_a);
}
}
void nano::server_socket::close ()
{
auto this_l (std::static_pointer_cast<nano::server_socket> (shared_from_this ()));
boost::asio::dispatch (strand, boost::asio::bind_executor (strand, [this_l]() {
this_l->close_internal ();
this_l->acceptor.close ();
for (auto & connection_w : this_l->connections)
{
if (auto connection_l = connection_w.lock ())
{
connection_l->close ();
}
}
this_l->connections.clear ();
}));
}
void nano::server_socket::on_connection (std::function<bool(std::shared_ptr<nano::socket>, boost::system::error_code const &)> callback_a)
{
auto this_l (std::static_pointer_cast<nano::server_socket> (shared_from_this ()));
boost::asio::post (strand, boost::asio::bind_executor (strand, [this_l, callback_a]() {
if (auto node_l = this_l->node.lock ())
{
if (this_l->acceptor.is_open ())
{
if (this_l->connections.size () < this_l->max_inbound_connections)
{
// Prepare new connection
auto new_connection (std::make_shared<nano::socket> (node_l->shared (), boost::none, this_l->concurrency_new_connections));
this_l->acceptor.async_accept (new_connection->tcp_socket, new_connection->remote,
boost::asio::bind_executor (this_l->strand,
[this_l, new_connection, callback_a](boost::system::error_code const & ec_a) {
if (auto node_l = this_l->node.lock ())
{
if (!ec_a)
{
// Make sure the new connection doesn't idle. Note that in most cases, the callback is going to start
// an IO operation immediately, which will start a timer.
new_connection->checkup ();
new_connection->start_timer (node_l->network_params.network.is_test_network () ? std::chrono::seconds (2) : node_l->config.tcp_idle_timeout);
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_success, nano::stat::dir::in);
this_l->connections.push_back (new_connection);
this_l->evict_dead_connections ();
}
else
{
node_l->logger.try_log ("Unable to accept connection: ", ec_a.message ());
}
// If the callback returns true, keep accepting new connections
if (callback_a (new_connection, ec_a))
{
this_l->on_connection (callback_a);
}
else
{
node_l->logger.try_log ("Stopping to accept connections");
}
}
}));
}
else
{
this_l->evict_dead_connections ();
node_l->stats.inc (nano::stat::type::tcp, nano::stat::detail::tcp_accept_failure, nano::stat::dir::in);
this_l->deferred_accept_timer.expires_after (std::chrono::seconds (2));
this_l->deferred_accept_timer.async_wait ([this_l, callback_a](const boost::system::error_code & ec_a) {
if (!ec_a)
{
// Try accepting again
std::static_pointer_cast<nano::server_socket> (this_l)->on_connection (callback_a);
}
else
{
if (auto node_l = this_l->node.lock ())
{
node_l->logger.try_log ("Unable to accept connection (deferred): ", ec_a.message ());
}
}
});
}
}
}
}));
}
// This must be called from a strand
void nano::server_socket::evict_dead_connections ()
{
assert (strand.running_in_this_thread ());
connections.erase (std::remove_if (connections.begin (), connections.end (), [](auto & connection) { return connection.expired (); }), connections.end ());
}

123
nano/node/socket.hpp Normal file
View file

@ -0,0 +1,123 @@
#pragma once
#include <boost/asio.hpp>
#include <boost/optional.hpp>
#include <chrono>
#include <deque>
#include <memory>
#include <utility>
#include <vector>
namespace nano
{
class node;
class server_socket;
/** Socket class for tcp clients and newly accepted connections */
class socket : public std::enable_shared_from_this<nano::socket>
{
friend class server_socket;
public:
/**
* If multi_writer is used, overlapping writes are allowed, including from multiple threads.
* For bootstrapping, reading and writing alternates on a socket, thus single_writer
* should be used to avoid queueing overhead. For live messages, multiple threads may want
* to concurrenctly queue messages on the same socket, thus multi_writer should be used.
*/
enum class concurrency
{
single_writer,
multi_writer
};
/**
* Constructor
* @param node Owning node
* @param max_idle_time If no activity occurs within the idle time, the socket is closed. If not set, the tcp_idle_time config option is used.
* @param concurrency write concurrency
*/
explicit socket (std::shared_ptr<nano::node> node, boost::optional<std::chrono::seconds> max_idle_time = boost::none, concurrency = concurrency::single_writer);
virtual ~socket ();
void async_connect (boost::asio::ip::tcp::endpoint const &, std::function<void(boost::system::error_code const &)>);
void async_read (std::shared_ptr<std::vector<uint8_t>>, size_t, std::function<void(boost::system::error_code const &, size_t)>);
void async_write (std::shared_ptr<std::vector<uint8_t>>, std::function<void(boost::system::error_code const &, size_t)> = nullptr);
void close ();
boost::asio::ip::tcp::endpoint remote_endpoint () const;
/** Returns true if the socket has timed out */
bool has_timed_out () const;
/** This can be called to change the maximum idle time, e.g. based on the type of traffic detected. */
void set_max_idle_timeout (std::chrono::seconds max_idle_time_a);
/** Change write concurrent */
void set_writer_concurrency (concurrency writer_concurrency_a);
protected:
/** Holds the buffer and callback for queued writes */
class queue_item
{
public:
std::shared_ptr<std::vector<uint8_t>> buffer;
std::function<void(boost::system::error_code const &, size_t)> callback;
};
boost::asio::strand<boost::asio::io_context::executor_type> strand;
boost::asio::ip::tcp::socket tcp_socket;
std::weak_ptr<nano::node> node;
/** The other end of the connection */
boost::asio::ip::tcp::endpoint remote;
/** Send queue, protected by always being accessed in the strand */
std::deque<queue_item> send_queue;
std::atomic<concurrency> writer_concurrency;
std::atomic<uint64_t> next_deadline;
std::atomic<uint64_t> last_completion_time;
std::atomic<bool> timed_out{ false };
boost::optional<std::chrono::seconds> max_idle_time;
/** Set by close() - completion handlers must check this. This is more reliable than checking
error codes as the OS may have already completed the async operation. */
std::atomic<bool> closed{ false };
void close_internal ();
void write_queued_messages ();
void start_timer (std::chrono::seconds deadline_a);
void start_timer ();
void stop_timer ();
void checkup ();
};
/** Socket class for TCP servers */
class server_socket final : public socket
{
public:
/**
* Constructor
* @param node_a Owning node
* @param local_a Address and port to listen on
* @param max_connections_a Maximum number of concurrent connections
* @param concurrency_a Write concurrency for new connections
*/
explicit server_socket (std::shared_ptr<nano::node> node_a, boost::asio::ip::tcp::endpoint local_a, size_t max_connections_a, concurrency concurrency_a = concurrency::single_writer);
/**Start accepting new connections */
void start (boost::system::error_code &);
/** Stop accepting new connections */
void close ();
/** Register callback for new connections. The callback must return true to keep accepting new connections. */
void on_connection (std::function<bool(std::shared_ptr<nano::socket> new_connection, boost::system::error_code const &)>);
uint16_t listening_port ()
{
return local.port ();
}
private:
std::vector<std::weak_ptr<nano::socket>> connections;
boost::asio::ip::tcp::acceptor acceptor;
boost::asio::ip::tcp::endpoint local;
boost::asio::steady_timer deferred_accept_timer;
size_t max_inbound_connections;
/** Concurrency setting for new connections */
concurrency concurrency_new_connections;
void evict_dead_connections ();
};
}

View file

@ -343,6 +343,9 @@ std::string nano::stat::type_to_string (uint32_t key)
case nano::stat::type::ledger:
res = "ledger";
break;
case nano::stat::type::tcp:
res = "tcp";
break;
case nano::stat::type::udp:
res = "udp";
break;
@ -355,7 +358,7 @@ std::string nano::stat::type_to_string (uint32_t key)
case nano::stat::type::traffic:
res = "traffic";
break;
case nano::stat::type::traffic_bootstrap:
case nano::stat::type::traffic_tcp:
res = "traffic_bootstrap";
break;
case nano::stat::type::vote:
@ -490,6 +493,12 @@ std::string nano::stat::detail_to_string (uint32_t key)
case nano::stat::detail::overflow:
res = "overflow";
break;
case nano::stat::detail::tcp_accept_success:
res = "accept_success";
break;
case nano::stat::detail::tcp_accept_failure:
res = "accept_failure";
break;
case nano::stat::detail::unreachable_host:
res = "unreachable_host";
break;

View file

@ -217,7 +217,7 @@ public:
enum class type : uint8_t
{
traffic,
traffic_bootstrap,
traffic_tcp,
error,
message,
block,
@ -228,6 +228,7 @@ public:
http_callback,
peering,
ipc,
tcp,
udp,
confirmation_height
};
@ -297,6 +298,10 @@ public:
invalid_node_id_handshake_message,
outdated_version,
// tcp
tcp_accept_success,
tcp_accept_failure,
// ipc
invocations,

View file

@ -2,6 +2,7 @@
#include <chrono>
#include <nano/lib/errors.hpp>
#include <nano/lib/utility.hpp>
#include <nano/node/node.hpp>
namespace nano

View file

@ -2,7 +2,7 @@
#include <nano/node/transport/tcp.hpp>
nano::transport::channel_tcp::channel_tcp (nano::node & node_a, std::shared_ptr<nano::socket> socket_a) :
node (node_a),
channel (node_a),
socket (socket_a)
{
}
@ -24,15 +24,15 @@ bool nano::transport::channel_tcp::operator== (nano::transport::channel const &
return result;
}
void nano::transport::channel_tcp::send_buffer_raw (boost::asio::const_buffer buffer_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
void nano::transport::channel_tcp::send_buffer (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
{
socket->async_write (buffer_a, callback_a);
socket->async_write (buffer_a, callback (buffer_a, detail_a, callback_a));
}
std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_tcp::callback (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
{
// clang-format off
return [ buffer_a, node = std::weak_ptr<nano::node> (node.shared ()), detail_a, callback_a ](boost::system::error_code const & ec, size_t size_a)
return [ buffer_a, node = std::weak_ptr<nano::node> (node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a)
{
if (auto node_l = node.lock ())
{
@ -40,14 +40,9 @@ std::function<void(boost::system::error_code const &, size_t)> nano::transport::
{
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out);
}
if (!ec)
if (callback_a)
{
node_l->stats.add (nano::stat::type::traffic, nano::stat::dir::out, size_a);
node_l->stats.inc (nano::stat::type::message, detail_a, nano::stat::dir::out);
if (callback_a)
{
callback_a (ec, size_a);
}
callback_a (ec, size_a);
}
}
};

View file

@ -14,14 +14,13 @@ namespace transport
channel_tcp (nano::node &, std::shared_ptr<nano::socket>);
size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;
void send_buffer_raw (boost::asio::const_buffer, std::function<void(boost::system::error_code const &, size_t)> const &) const override;
void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
std::string to_string () const override;
bool operator== (nano::transport::channel_tcp const & other_a) const
{
return &node == &other_a.node && socket == other_a.socket;
}
nano::node & node;
std::shared_ptr<nano::socket> socket;
};
} // namespace transport

View file

@ -1,4 +1,5 @@
#include <nano/node/common.hpp>
#include <nano/node/node.hpp>
#include <nano/node/transport/transport.hpp>
namespace
@ -56,9 +57,9 @@ nano::endpoint nano::transport::map_endpoint_to_v6 (nano::endpoint const & endpo
return endpoint_l;
}
void nano::transport::channel::send_buffer (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
nano::transport::channel::channel (nano::node & node_a) :
node (node_a)
{
send_buffer_raw (boost::asio::buffer (buffer_a->data (), buffer_a->size ()), callback (buffer_a, detail_a, callback_a));
}
void nano::transport::channel::send (nano::message const & message_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
@ -66,5 +67,7 @@ void nano::transport::channel::send (nano::message const & message_a, std::funct
callback_visitor visitor;
message_a.visit (visitor);
auto buffer (message_a.to_bytes ());
send_buffer (buffer, visitor.result, callback_a);
auto detail (visitor.result);
send_buffer (buffer, detail, callback_a);
node.stats.inc (nano::stat::type::message, detail, nano::stat::dir::out);
}

View file

@ -14,14 +14,17 @@ namespace transport
class channel
{
public:
channel (nano::node &);
virtual ~channel () = default;
virtual size_t hash_code () const = 0;
virtual bool operator== (nano::transport::channel const &) const = 0;
void send (nano::message const &, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const;
void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const;
virtual void send_buffer_raw (boost::asio::const_buffer, std::function<void(boost::system::error_code const &, size_t)> const &) const = 0;
virtual void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const = 0;
virtual std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const = 0;
virtual std::string to_string () const = 0;
protected:
nano::node & node;
};
} // namespace transport
} // namespace nano

View file

@ -5,6 +5,7 @@
std::chrono::seconds constexpr nano::transport::udp_channels::syn_cookie_cutoff;
nano::transport::channel_udp::channel_udp (nano::transport::udp_channels & channels_a, nano::endpoint const & endpoint_a, unsigned network_version_a) :
channel (channels_a.node),
network_version (network_version_a),
endpoint (endpoint_a),
channels (channels_a)
@ -29,15 +30,15 @@ bool nano::transport::channel_udp::operator== (nano::transport::channel const &
return result;
}
void nano::transport::channel_udp::send_buffer_raw (boost::asio::const_buffer buffer_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
void nano::transport::channel_udp::send_buffer (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
{
channels.send (buffer_a, endpoint, callback_a);
channels.send (boost::asio::const_buffer (buffer_a->data (), buffer_a->size ()), endpoint, callback (buffer_a, detail_a, callback_a));
}
std::function<void(boost::system::error_code const &, size_t)> nano::transport::channel_udp::callback (std::shared_ptr<std::vector<uint8_t>> buffer_a, nano::stat::detail detail_a, std::function<void(boost::system::error_code const &, size_t)> const & callback_a) const
{
// clang-format off
return [ buffer_a, node = std::weak_ptr<nano::node> (channels.node.shared ()), detail_a, callback_a ](boost::system::error_code const & ec, size_t size_a)
return [ buffer_a, node = std::weak_ptr<nano::node> (channels.node.shared ()), callback_a ](boost::system::error_code const & ec, size_t size_a)
{
if (auto node_l = node.lock ())
{
@ -45,14 +46,14 @@ std::function<void(boost::system::error_code const &, size_t)> nano::transport::
{
node_l->stats.inc (nano::stat::type::error, nano::stat::detail::unreachable_host, nano::stat::dir::out);
}
if (!ec)
if (size_a > 0)
{
node_l->stats.add (nano::stat::type::traffic, nano::stat::dir::out, size_a);
node_l->stats.inc (nano::stat::type::message, detail_a, nano::stat::dir::out);
if (callback_a)
{
callback_a (ec, size_a);
}
}
if (callback_a)
{
callback_a (ec, size_a);
}
}
};
@ -75,6 +76,7 @@ socket (node_a.io_ctx, nano::endpoint (boost::asio::ip::address_v6::any (), port
{
node.logger.try_log ("Unable to retrieve port: ", ec.message ());
}
local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), port);
}
@ -390,14 +392,29 @@ void nano::transport::udp_channels::stop ()
std::lock_guard<std::mutex> lock (mutex);
local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), 0);
// On test-net, close directly to avoid address-reuse issues. On livenet, close
// through the strand as multiple IO threads may access the socket.
// clang-format off
boost::asio::post (strand, [this] {
boost::system::error_code ignored;
this->socket.close (ignored);
});
if (node.network_params.network.is_test_network ())
{
this->close_socket ();
}
else
{
boost::asio::dispatch (strand, [this] {
this->close_socket ();
});
}
// clang-format on
}
void nano::transport::udp_channels::close_socket ()
{
boost::system::error_code ignored;
this->socket.close (ignored);
this->local_endpoint = nano::endpoint (boost::asio::ip::address_v6::loopback (), 0);
}
nano::endpoint nano::transport::udp_channels::get_local_endpoint () const
{
std::lock_guard<std::mutex> lock (mutex);

View file

@ -19,7 +19,7 @@ namespace transport
channel_udp (nano::transport::udp_channels &, nano::endpoint const &, unsigned = nano::protocol_version);
size_t hash_code () const override;
bool operator== (nano::transport::channel const &) const override;
void send_buffer_raw (boost::asio::const_buffer, std::function<void(boost::system::error_code const &, size_t)> const &) const override;
void send_buffer (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
std::function<void(boost::system::error_code const &, size_t)> callback (std::shared_ptr<std::vector<uint8_t>>, nano::stat::detail, std::function<void(boost::system::error_code const &, size_t)> const & = nullptr) const override;
std::string to_string () const override;
bool operator== (nano::transport::channel_udp const & other_a) const
@ -130,6 +130,7 @@ namespace transport
static std::chrono::seconds constexpr syn_cookie_cutoff = std::chrono::seconds (5);
private:
void close_socket ();
void ongoing_syn_cookie_cleanup ();
class endpoint_tag
{

View file

@ -15,6 +15,7 @@ int main (int argc, char ** argv)
nano_qt::eventloop_processor processor;
static int count (16);
nano::system system (24000, count);
nano::thread_runner runner (system.io_ctx, system.nodes[0]->config.io_threads);
std::unique_ptr<QTabWidget> client_tabs (new QTabWidget);
std::vector<std::unique_ptr<nano_qt::wallet>> guis;
for (auto i (0); i < count; ++i)
@ -28,7 +29,6 @@ int main (int argc, char ** argv)
client_tabs->addTab (guis.back ()->client_window, boost::str (boost::format ("Wallet %1%") % i).c_str ());
}
client_tabs->show ();
nano::thread_runner runner (system.io_ctx, system.nodes[0]->config.io_threads);
QObject::connect (&application, &QApplication::aboutToQuit, [&]() {
system.stop ();
});