diff --git a/CMakeLists.txt b/CMakeLists.txt index 2ea958b4..857912e7 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -1,8 +1,8 @@ cmake_minimum_required (VERSION 2.8.11) project (rai) -set (CPACK_PACKAGE_VERSION_MAJOR "7") -set (CPACK_PACKAGE_VERSION_MINOR "9") +set (CPACK_PACKAGE_VERSION_MAJOR "8") +set (CPACK_PACKAGE_VERSION_MINOR "0") set (CPACK_PACKAGE_VERSION_PATCH "0") set (RAIBLOCKS_GUI OFF CACHE BOOL "") @@ -107,43 +107,10 @@ set (IPHLPAPI_LIBRARY iphlpapi CACHE STRING "") add_subdirectory (miniupnp/miniupnpc) include_directories (miniupnp/miniupnpc) -add_library (cryptopp - cryptopp/algparam.cpp - cryptopp/asn.cpp - cryptopp/basecode.cpp - cryptopp/cpu.cpp - cryptopp/cryptlib.cpp - cryptopp/default.cpp - cryptopp/des.cpp - cryptopp/dessp.cpp - cryptopp/dll.cpp - cryptopp/ec2n.cpp - cryptopp/ecp.cpp - cryptopp/filters.cpp - cryptopp/fips140.cpp - cryptopp/gcm.cpp - cryptopp/gf2n.cpp - cryptopp/gfpcrypt.cpp - cryptopp/hex.cpp - cryptopp/hmac.cpp - cryptopp/hrtimer.cpp - cryptopp/integer.cpp - cryptopp/iterhash.cpp - cryptopp/misc.cpp - cryptopp/modes.cpp - cryptopp/mqueue.cpp - cryptopp/nbtheory.cpp - cryptopp/oaep.cpp - cryptopp/osrng.cpp - cryptopp/pubkey.cpp - cryptopp/queue.cpp - cryptopp/randpool.cpp - cryptopp/rdtables.cpp - cryptopp/rijndael.cpp - cryptopp/rng.cpp - cryptopp/sha.cpp - cryptopp/simple.cpp - cryptopp/winpipes.cpp) +set (BUILD_SHARED OFF CACHE BOOL "") +set (BUILD_TESTING OFF CACHE BOOL "") +set (USE_INTERMEDIATE_OBJECTS_TARGET OFF CACHE BOOL "") +add_subdirectory (cryptopp) add_library (argon2 Argon2/Source/Argon2/argon2.cpp @@ -245,7 +212,7 @@ if (RAIBLOCKS_TEST) add_executable (slow_test rai/slow_test/node.cpp) - set_target_properties (core_test slow_test PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1 -DRAIBLOCKS_VERSION_PATCH=${CPACK_PACKAGE_VERSION_PATCH}") + set_target_properties (core_test slow_test PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1") set_target_properties (core_test slow_test PROPERTIES LINK_FLAGS "${PLATFORM_LINK_FLAGS}") endif (RAIBLOCKS_TEST) @@ -272,7 +239,7 @@ if (RAIBLOCKS_GUI) set_target_properties (rai_wallet qt_test PROPERTIES LINK_FLAGS "${PLATFORM_LINK_FLAGS}") - set_target_properties (qt_test qt rai_wallet qt_system PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1 -DRAIBLOCKS_VERSION_PATCH=${CPACK_PACKAGE_VERSION_PATCH}") + set_target_properties (qt_test qt rai_wallet qt_system PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1") set_target_properties (qt qt_system PROPERTIES LINK_FLAGS "${PLATFORM_LINK_FLAGS}") endif (RAIBLOCKS_GUI) @@ -281,11 +248,10 @@ add_executable (rai_node rai/rai_node/daemon.hpp rai/rai_node/entry.cpp) -set_target_properties (cryptopp PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS}") set_target_properties (argon2 PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS}") set_target_properties (blake2 PROPERTIES COMPILE_FLAGS "${PLATFORM_C_FLAGS} ${PLATFORM_COMPILE_FLAGS} -D__SSE2__") set_target_properties (ed25519 PROPERTIES COMPILE_FLAGS "${PLATFORM_C_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DED25519_CUSTOMHASH -DED25519_CUSTOMRNG") -set_target_properties (secure node rai_node PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1 -DRAIBLOCKS_VERSION_PATCH=${CPACK_PACKAGE_VERSION_PATCH}") +set_target_properties (secure node rai_node PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1") set_target_properties (secure node rai_node PROPERTIES LINK_FLAGS "${PLATFORM_LINK_FLAGS}") if (WIN32) @@ -301,20 +267,20 @@ else (WIN32) endif (WIN32) if (RAIBLOCKS_TEST) - target_link_libraries (core_test node secure lmdb xxhash ed25519 argon2 blake2 cryptopp gtest_main gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS}) + target_link_libraries (core_test node secure lmdb xxhash ed25519 argon2 blake2 cryptopp-static gtest_main gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS}) - target_link_libraries (slow_test node secure lmdb xxhash ed25519 argon2 blake2 cryptopp gtest_main gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS}) + target_link_libraries (slow_test node secure lmdb xxhash ed25519 argon2 blake2 cryptopp-static gtest_main gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS}) endif (RAIBLOCKS_TEST) if (RAIBLOCKS_GUI) - target_link_libraries (qt_test node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets Qt5::Test ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS}) + target_link_libraries (qt_test node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp-static gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets Qt5::Test ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS}) - target_link_libraries (qt_system node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS}) + target_link_libraries (qt_system node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp-static gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS}) - target_link_libraries (rai_wallet node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS} ${PLATFORM_WALLET_LIBS}) + target_link_libraries (rai_wallet node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp-static libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS} ${PLATFORM_WALLET_LIBS}) endif (RAIBLOCKS_GUI) -target_link_libraries (rai_node node secure lmdb xxhash ed25519 argon2 blake2 cryptopp libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS}) +target_link_libraries (rai_node node secure lmdb xxhash ed25519 argon2 blake2 cryptopp-static libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS}) set (CPACK_RESOURCE_FILE_LICENSE ${CMAKE_SOURCE_DIR}/LICENSE) if (RAIBLOCKS_GUI) diff --git a/rai/core_test/block_store.cpp b/rai/core_test/block_store.cpp index 4f872984..bd7bacd0 100644 --- a/rai/core_test/block_store.cpp +++ b/rai/core_test/block_store.cpp @@ -874,3 +874,19 @@ TEST (block_store, upgrade_v7_v8) ASSERT_EQ (store.unchecked_end (), iterator1); } } + +TEST (block_store, sequence_flush) +{ + auto path (rai::unique_path ()); + bool init (false); + rai::block_store store (init, path); + ASSERT_FALSE (init); + rai::transaction transaction (store.environment, nullptr, true); + rai::account account (0); + auto seq1 (store.sequence_atomic_inc (transaction, account)); + auto seq2 (store.sequence_get (transaction, account)); + ASSERT_NE (seq2, seq1); + store.sequence_flush(transaction); + auto seq3 (store.sequence_get (transaction, account)); + ASSERT_EQ (seq3, seq1); +} diff --git a/rai/core_test/gap_cache.cpp b/rai/core_test/gap_cache.cpp index 287608c8..c7c52f27 100644 --- a/rai/core_test/gap_cache.cpp +++ b/rai/core_test/gap_cache.cpp @@ -56,7 +56,7 @@ TEST (gap_cache, gap_bootstrap) auto send (std::make_shared (latest, key.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (latest))); { rai::transaction transaction (system.nodes [0]->store.environment, nullptr, true); - ASSERT_EQ (rai::process_result::progress, system.nodes [0]->process_receive_one (transaction, send).code); + ASSERT_EQ (rai::process_result::progress, system.nodes [0]->block_processor.process_receive_one (transaction, send).code); } ASSERT_EQ (rai::genesis_amount - 100, system.nodes [0]->balance (rai::genesis_account)); ASSERT_EQ (rai::genesis_amount, system.nodes [1]->balance (rai::genesis_account)); @@ -83,11 +83,11 @@ TEST (gap_cache, two_dependencies) auto send2 (std::make_shared (send1->hash (), key.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (send1->hash ()))); auto open (std::make_shared (send1->hash (), key.pub, key.pub, key.prv, key.pub, system.work.generate (key.pub))); ASSERT_EQ (0, system.nodes [0]->gap_cache.blocks.size ()); - system.nodes [0]->process_receive_many (send2); + system.nodes [0]->block_processor.process_receive_many (send2); ASSERT_EQ (1, system.nodes [0]->gap_cache.blocks.size ()); - system.nodes [0]->process_receive_many (open); + system.nodes [0]->block_processor.process_receive_many (open); ASSERT_EQ (2, system.nodes [0]->gap_cache.blocks.size ()); - system.nodes [0]->process_receive_many (send1); + system.nodes [0]->block_processor.process_receive_many (send1); ASSERT_EQ (0, system.nodes [0]->gap_cache.blocks.size ()); rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false); ASSERT_TRUE (system.nodes [0]->store.block_exists (transaction, send1->hash ())); diff --git a/rai/core_test/message.cpp b/rai/core_test/message.cpp index 1f1cb32c..2ef34aa4 100644 --- a/rai/core_test/message.cpp +++ b/rai/core_test/message.cpp @@ -54,8 +54,8 @@ TEST (message, publish_serialization) ASSERT_EQ (8, bytes.size ()); ASSERT_EQ (0x52, bytes [0]); ASSERT_EQ (0x41, bytes [1]); - ASSERT_EQ (0x03, bytes [2]); - ASSERT_EQ (0x03, bytes [3]); + ASSERT_EQ (0x04, bytes [2]); + ASSERT_EQ (0x04, bytes [3]); ASSERT_EQ (0x01, bytes [4]); ASSERT_EQ (static_cast (rai::message_type::publish), bytes [5]); ASSERT_EQ (0x02, bytes [6]); @@ -68,8 +68,8 @@ TEST (message, publish_serialization) std::bitset <16> extensions; ASSERT_FALSE (rai::message::read_header (stream, version_max, version_using, version_min, type, extensions)); ASSERT_EQ (0x01, version_min); - ASSERT_EQ (0x03, version_using); - ASSERT_EQ (0x03, version_max); + ASSERT_EQ (0x04, version_using); + ASSERT_EQ (0x04, version_max); ASSERT_EQ (rai::message_type::publish, type); } diff --git a/rai/core_test/network.cpp b/rai/core_test/network.cpp index 125a369a..669275dc 100644 --- a/rai/core_test/network.cpp +++ b/rai/core_test/network.cpp @@ -149,9 +149,10 @@ TEST (network, send_discarded_publish) { rai::system system (24000, 2); auto block (std::make_shared (1, 1, 2, rai::keypair ().prv, 4, system.work.generate (1))); - system.nodes [0]->network.republish_block (block); + rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false); + system.nodes [0]->network.republish_block (transaction, block); rai::genesis genesis; - ASSERT_EQ (genesis.hash (), system.nodes [0]->latest (rai::test_genesis_key.pub)); + ASSERT_EQ (genesis.hash (), system.nodes [0]->ledger.latest (transaction, rai::test_genesis_key.pub)); ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub)); auto iterations (0); while (system.nodes [1]->network.incoming.publish == 0) @@ -160,7 +161,7 @@ TEST (network, send_discarded_publish) ++iterations; ASSERT_LT (iterations, 200); } - ASSERT_EQ (genesis.hash (), system.nodes [0]->latest (rai::test_genesis_key.pub)); + ASSERT_EQ (genesis.hash (), system.nodes [0]->ledger.latest (transaction, rai::test_genesis_key.pub)); ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub)); } @@ -168,9 +169,10 @@ TEST (network, send_invalid_publish) { rai::system system (24000, 2); auto block (std::make_shared (1, 1, 20, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (1))); - system.nodes [0]->network.republish_block (block); + rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false); + system.nodes [0]->network.republish_block (transaction, block); rai::genesis genesis; - ASSERT_EQ (genesis.hash (), system.nodes [0]->latest (rai::test_genesis_key.pub)); + ASSERT_EQ (genesis.hash (), system.nodes [0]->ledger.latest (transaction, rai::test_genesis_key.pub)); ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub)); auto iterations (0); while (system.nodes [1]->network.incoming.publish == 0) @@ -179,7 +181,7 @@ TEST (network, send_invalid_publish) ++iterations; ASSERT_LT (iterations, 200); } - ASSERT_EQ (genesis.hash (), system.nodes [0]->latest (rai::test_genesis_key.pub)); + ASSERT_EQ (genesis.hash (), system.nodes [0]->ledger.latest (transaction, rai::test_genesis_key.pub)); ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub)); } @@ -192,7 +194,7 @@ TEST (network, send_valid_confirm_ack) rai::block_hash latest1 (system.nodes [0]->latest (rai::test_genesis_key.pub)); rai::send_block block2 (latest1, key2.pub, 50, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (latest1)); rai::block_hash latest2 (system.nodes [1]->latest (rai::test_genesis_key.pub)); - system.nodes [0]->process_receive_republish (std::unique_ptr (new rai::send_block (block2))); + system.nodes [0]->process_active (std::unique_ptr (new rai::send_block (block2))); auto iterations (0); // Keep polling until latest block changes while (system.nodes [1]->latest (rai::test_genesis_key.pub) == latest2) @@ -215,7 +217,7 @@ TEST (network, send_valid_publish) rai::send_block block2 (latest1, key2.pub, 50, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (latest1)); auto hash2 (block2.hash ()); rai::block_hash latest2 (system.nodes [1]->latest (rai::test_genesis_key.pub)); - system.nodes [1]->process_receive_republish (std::unique_ptr (new rai::send_block (block2))); + system.nodes [1]->process_active (std::unique_ptr (new rai::send_block (block2))); auto iterations (0); while (system.nodes [0]->network.incoming.publish == 0) { @@ -300,8 +302,8 @@ TEST (receivable_processor, send_with_receive) ASSERT_EQ (0, system.nodes [0]->balance (key2.pub)); ASSERT_EQ (amount, system.nodes [1]->balance (rai::test_genesis_key.pub)); ASSERT_EQ (0, system.nodes [1]->balance (key2.pub)); - system.nodes [0]->process_receive_republish (block1); - system.nodes [1]->process_receive_republish (block1); + system.nodes [0]->process_active (block1); + system.nodes [1]->process_active (block1); ASSERT_EQ (amount - system.nodes [0]->config.receive_minimum.number (), system.nodes [0]->balance (rai::test_genesis_key.pub)); ASSERT_EQ (0, system.nodes [0]->balance (key2.pub)); ASSERT_EQ (amount - system.nodes [0]->config.receive_minimum.number (), system.nodes [1]->balance (rai::test_genesis_key.pub)); @@ -513,6 +515,7 @@ TEST (bootstrap_processor, process_one) ++iterations; ASSERT_LT (iterations, 200); } + ASSERT_EQ (0, node1->active.roots.size ()); node1->stop (); } diff --git a/rai/core_test/node.cpp b/rai/core_test/node.cpp index ca26904d..1c618912 100644 --- a/rai/core_test/node.cpp +++ b/rai/core_test/node.cpp @@ -163,8 +163,8 @@ TEST (node, send_out_of_order) rai::genesis genesis; rai::send_block send1 (genesis.hash (), key2.pub, std::numeric_limits ::max () - system.nodes [0]->config.receive_minimum.number (), rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())); rai::send_block send2 (send1.hash (), key2.pub, std::numeric_limits ::max () - system.nodes [0]->config.receive_minimum.number () * 2, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (send1.hash ())); - system.nodes [0]->process_receive_republish (std::unique_ptr (new rai::send_block (send2))); - system.nodes [0]->process_receive_republish (std::unique_ptr (new rai::send_block (send1))); + system.nodes [0]->process_active (std::unique_ptr (new rai::send_block (send2))); + system.nodes [0]->process_active (std::unique_ptr (new rai::send_block (send1))); auto iterations (0); while (std::any_of (system.nodes.begin (), system.nodes.end (), [&] (std::shared_ptr const & node_a) {return node_a->balance (rai::test_genesis_key.pub) != rai::genesis_amount - system.nodes [0]->config.receive_minimum.number () * 2;})) { @@ -181,7 +181,7 @@ TEST (node, quick_confirm) rai::block_hash previous (system.nodes [0]->latest (rai::test_genesis_key.pub)); system.wallet (0)->insert_adhoc (key.prv); auto send (std::make_shared (previous, key.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (previous))); - system.nodes [0]->process_receive_republish (send); + system.nodes [0]->process_active (send); auto iterations (0); while (system.nodes [0]->balance (key.pub).is_zero ()) { @@ -210,7 +210,10 @@ TEST (node, auto_bootstrap) ASSERT_FALSE (init1.error ()); node1->network.send_keepalive (system.nodes [0]->network.endpoint ()); node1->start (); - ASSERT_TRUE (node1->bootstrap_initiator.in_progress ()); + while (!node1->bootstrap_initiator.in_progress ()) + { + system.poll (); + } auto iterations3 (0); while (node1->balance (key2.pub) != system.nodes [0]->config.receive_minimum.number ()) { @@ -633,7 +636,8 @@ TEST (node, confirm_locked) system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv); system.wallet (0)->enter_password ("1"); auto block (std::make_shared (0, 0, 0, rai::keypair ().prv, 0, 0)); - system.nodes [0]->network.republish_block (block); + rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false); + system.nodes [0]->network.republish_block (transaction, block); } TEST (node_config, random_rep) @@ -652,7 +656,7 @@ TEST (node, block_replace) system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv); auto block1 (system.wallet (0)->send_action (rai::test_genesis_key.pub, 0, rai::Gxrb_ratio)); auto block3 (system.wallet (0)->send_action (rai::test_genesis_key.pub, 0, rai::Gxrb_ratio)); - ASSERT_NE (nullptr, block1); + ASSERT_NE (nullptr, block1); auto initial_work (block1->block_work ()); while (system.work.work_value (block1->root (), block1->block_work ()) <= system.work.work_value (block1->root (), initial_work)) { @@ -662,7 +666,11 @@ TEST (node, block_replace) rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false); ASSERT_EQ (block3->hash (), system.nodes [0]->store.block_successor (transaction, block1->hash ())); } - system.nodes [1]->network.republish_block (block1); + for (auto i (0); i < 1; ++i) + { + rai::transaction transaction_a (system.nodes [1]->store.environment, nullptr, false); + system.nodes [1]->network.republish_block (transaction_a, block1); + } auto iterations1 (0); std::unique_ptr block2; while (block2 == nullptr) @@ -699,13 +707,13 @@ TEST (node, fork_publish) auto send1 (std::make_shared (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); rai::keypair key2; auto send2 (std::make_shared (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); - node1.process_receive_republish (send1); + node1.process_active (send1); ASSERT_EQ (1, node1.active.roots.size ()); auto existing (node1.active.roots.find (send1->root ())); ASSERT_NE (node1.active.roots.end (), existing); auto election (existing->election); ASSERT_EQ (2, election->votes.rep_votes.size ()); - node1.process_receive_republish (send2); + node1.process_active (send2); auto existing1 (election->votes.rep_votes.find (rai::test_genesis_key.pub)); ASSERT_NE (election->votes.rep_votes.end (), existing1); ASSERT_EQ (*send1, *existing1->second); @@ -730,12 +738,12 @@ TEST (node, fork_keep) // send1 and send2 fork to different accounts auto send1 (std::make_shared (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ()))); auto send2 (std::make_shared (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ()))); - node1.process_receive_republish (send1); - node2.process_receive_republish (send1); + node1.process_active (send1); + node2.process_active (send1); ASSERT_EQ (1, node1.active.roots.size ()); ASSERT_EQ (1, node2.active.roots.size ()); - node1.process_receive_republish (send2); - node2.process_receive_republish (send2); + node1.process_active (send2); + node2.process_active (send2); auto conflict (node2.active.roots.find (genesis.hash ())); ASSERT_NE (node2.active.roots.end (), conflict); auto votes1 (conflict->election); @@ -889,8 +897,8 @@ TEST (node, fork_bootstrap_flip) rai::keypair key2; auto send2 (std::make_shared (latest, key2.pub, rai::genesis_amount - rai::Gxrb_ratio, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system0.work.generate (latest))); // Insert but don't rebroadcast, simulating settled blocks - node1.process_receive_many (send1); - node2.process_receive_many (send2); + node1.block_processor.process_receive_many (send1); + node2.block_processor.process_receive_many (send2); { rai::transaction transaction (node2.store.environment, nullptr, false); ASSERT_TRUE (node2.store.block_exists (transaction, send2->hash ())); @@ -952,22 +960,22 @@ TEST (node, fork_open_flip) rai::keypair rep1; rai::keypair rep2; auto send1 (std::make_shared (genesis.hash (), key1.pub, rai::genesis_amount - 1, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ()))); - node1.process_receive_republish (send1); - node2.process_receive_republish (send1); + node1.process_active (send1); + node2.process_active (send1); // We should be keeping this block auto open1 (std::make_shared (send1->hash (), rep1.pub, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub))); // This block should be evicted auto open2 (std::make_shared (send1->hash (), rep2.pub, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub))); ASSERT_FALSE (*open1 == *open2); // node1 gets copy that will remain - node1.process_receive_republish (open1); + node1.process_active (open1); // node2 gets copy that will be evicted - node2.process_receive_republish (open2); + node2.process_active (open2); ASSERT_EQ (2, node1.active.roots.size ()); ASSERT_EQ (2, node2.active.roots.size ()); // Notify both nodes that a fork exists - node1.process_receive_republish (open2); - node2.process_receive_republish (open1); + node1.process_active (open2); + node2.process_active (open1); auto conflict (node2.active.roots.find (open1->root ())); ASSERT_NE (node2.active.roots.end (), conflict); auto votes1 (conflict->election); @@ -996,10 +1004,10 @@ TEST (node, coherent_observer) { rai::system system (24000, 1); auto & node1 (*system.nodes [0]); - node1.observers.blocks.add ([&node1] (rai::block const & block_a, rai::account const & account_a, rai::amount const &) + node1.observers.blocks.add ([&node1] (std::shared_ptr block_a, rai::account const & account_a, rai::amount const &) { rai::transaction transaction (node1.store.environment, nullptr, false); - ASSERT_TRUE (node1.store.block_exists (transaction, block_a.hash ())); + ASSERT_TRUE (node1.store.block_exists (transaction, block_a->hash ())); }); system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv); rai::keypair key; @@ -1104,12 +1112,12 @@ TEST (node, broadcast_elected) system.wallet (2)->insert_adhoc (rep_other.prv); auto fork0 (std::make_shared (node2->latest (rai::test_genesis_key.pub), rep_small.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); node0->generate_work (*fork0); - node0->process_receive_republish (fork0); - node1->process_receive_republish (fork0); + node0->process_active (fork0); + node1->process_active (fork0); auto fork1 (std::make_shared (node2->latest (rai::test_genesis_key.pub), rep_big.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0)); node0->generate_work (*fork1); system.wallet (2)->insert_adhoc (rep_small.prv); - node2->process_receive_republish (fork1); + node2->process_active (fork1); //std::cerr << "fork0: " << fork_hash.to_string () << std::endl; //std::cerr << "fork1: " << fork1.hash ().to_string () << std::endl; auto iterations (0); @@ -1173,7 +1181,7 @@ TEST (node, bootstrap_no_publish) node1->bootstrap_initiator.bootstrap (node0->network.endpoint ()); ASSERT_TRUE (node1->active.roots.empty ()); auto iterations1 (0); - while (node1->bootstrap_initiator.in_progress ()) + while (node1->block (send0.hash ()) == nullptr) { // Poll until the TCP connection is torn down and in_progress goes false system0.poll (); diff --git a/rai/core_test/rpc.cpp b/rai/core_test/rpc.cpp index 20111efa..3c5ec60c 100644 --- a/rai/core_test/rpc.cpp +++ b/rai/core_test/rpc.cpp @@ -290,7 +290,7 @@ TEST (rpc, send_fail) request.put ("source", rai::test_genesis_key.pub.to_account ()); request.put ("destination", rai::test_genesis_key.pub.to_account ()); request.put ("amount", "100"); - auto done (false); + std::atomic done (false); std::thread thread2 ([&system, &done] () { auto iterations (0); @@ -1221,6 +1221,7 @@ TEST (rpc, DISABLED_payment_wait) TEST (rpc, peers) { rai::system system (24000, 2); + system.nodes [0]->peers.insert (rai::endpoint (boost::asio::ip::address_v6::from_string ("::ffff:80.80.80.80"), 4000), 1); rai::rpc rpc (system.service, *system.nodes [0], rai::rpc_config (true)); rpc.start (); boost::property_tree::ptree request; @@ -1232,7 +1233,7 @@ TEST (rpc, peers) } ASSERT_EQ (200, response.status); auto & peers_node (response.json.get_child ("peers")); - ASSERT_EQ (1, peers_node.size ()); + ASSERT_EQ (2, peers_node.size ()); } TEST (rpc, pending) @@ -1359,7 +1360,7 @@ TEST (rpc, version) ASSERT_EQ ("1", response1.json.get ("rpc_version")); ASSERT_EQ (200, response1.status); ASSERT_EQ ("8", response1.json.get ("store_version")); - ASSERT_EQ (boost::str (boost::format ("RaiBlocks %1%.%2%.%3%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR % RAIBLOCKS_VERSION_PATCH), response1.json.get ("node_vendor")); + ASSERT_EQ (boost::str (boost::format ("RaiBlocks %1%.%2%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR), response1.json.get ("node_vendor")); auto headers (response1.resp.find ("Access-Control-Allow-Origin")); ASSERT_NE (response1.resp.end (), headers); ASSERT_EQ ("*", headers->value ()); @@ -1432,7 +1433,7 @@ TEST (rpc, work_peer_bad) rpc.start (); node2.config.work_peers.push_back (std::make_pair (boost::asio::ip::address_v6::any (), 0)); rai::block_hash hash1 (1); - uint64_t work (0); + std::atomic work (0); node2.generate_work (hash1, [&work] (uint64_t work_a) { work = work_a; @@ -1958,7 +1959,7 @@ TEST (rpc, bootstrap_any) ASSERT_TRUE (success.empty()); } -TEST (rpc, republish) +TEST (rpc, DISABLED_republish) { rai::system system (24000, 2); rai::keypair key; @@ -2486,7 +2487,7 @@ TEST (rpc, search_pending_all) } } -TEST (rpc, wallet_republish) +TEST (rpc, DISABLED_wallet_republish) { rai::system system (24000, 1); rai::genesis genesis; diff --git a/rai/core_test/work_pool.cpp b/rai/core_test/work_pool.cpp index e99c4a4d..aef1624b 100644 --- a/rai/core_test/work_pool.cpp +++ b/rai/core_test/work_pool.cpp @@ -52,7 +52,7 @@ TEST (work, cancel_many) pool.cancel (key1); } -TEST (work, opencl) +TEST (work, DISABLED_opencl) { rai::logging logging; logging.init (rai::unique_path ()); diff --git a/rai/node/bootstrap.cpp b/rai/node/bootstrap.cpp index ed374e68..55065adb 100755 --- a/rai/node/bootstrap.cpp +++ b/rai/node/bootstrap.cpp @@ -150,21 +150,15 @@ rai::sync_result rai::push_synchronization::target (MDB_txn * transaction_a, rai rai::bootstrap_client::bootstrap_client (std::shared_ptr node_a, std::shared_ptr attempt_a, rai::tcp_endpoint const & endpoint_a) : node (node_a), attempt (attempt_a), -socket (node_a->network.service), -pull_client (*this), +socket (node_a->service), endpoint (endpoint_a), -timeout (node_a->network.service) +timeout (node_a->service) { + ++attempt->connections; } rai::bootstrap_client::~bootstrap_client () { - if (!pull_client.pull.account.is_zero ()) - { - // If this connection is ending and request_account hasn't been cleared it didn't finish, requeue - attempt->requeue_pull (pull_client.pull); - --attempt->pulling; - } --attempt->connections; } @@ -179,6 +173,7 @@ void rai::bootstrap_client::start_timeout () auto this_l (this_w.lock ()); if (this_l != nullptr) { + BOOST_LOG (this_l->node->log) << boost::str (boost::format ("Disconnecting from %1% due to timeout") % this_l->endpoint); this_l->socket.close (); } } @@ -187,7 +182,7 @@ void rai::bootstrap_client::start_timeout () void rai::bootstrap_client::stop_timeout () { - auto killed (timeout.expires_from_now ()); + size_t killed (timeout.cancel ()); (void) killed; } @@ -201,7 +196,7 @@ void rai::bootstrap_client::run () if (!ec) { BOOST_LOG (this_l->node->log) << boost::str (boost::format ("Connection established to %1%") % this_l->endpoint); - this_l->work (); + this_l->attempt->pool_connection (this_l->shared_from_this ()); } else { @@ -222,13 +217,6 @@ void rai::bootstrap_client::run () }); } -void rai::bootstrap_client::frontier_request () -{ - auto this_l (shared_from_this ()); - auto client_l (std::make_shared (this_l)); - client_l->run (); -} - void rai::frontier_req_client::run () { std::unique_ptr request (new rai::frontier_req); @@ -264,7 +252,7 @@ std::shared_ptr rai::bootstrap_client::shared () return shared_from_this (); } -rai::frontier_req_client::frontier_req_client (std::shared_ptr const & connection_a) : +rai::frontier_req_client::frontier_req_client (std::shared_ptr connection_a) : connection (connection_a), current (0), count (0), @@ -276,23 +264,6 @@ next_report (std::chrono::system_clock::now () + std::chrono::seconds (15)) rai::frontier_req_client::~frontier_req_client () { - std::lock_guard lock (connection->attempt->mutex); - if (connection->attempt->state == rai::attempt_state::requesting_frontiers) - { - if (connection->node->config.logging.network_logging ()) - { - BOOST_LOG (connection->node->log) << "frontier_req failed, reattempting"; - } - connection->attempt->state = rai::attempt_state::starting; - connection->attempt->pulls.clear (); - } - else - { - if (connection->node->config.logging.network_logging ()) - { - BOOST_LOG (connection->node->log) << "Exiting frontier_req initiator"; - } - } } void rai::frontier_req_client::receive_frontier () @@ -325,7 +296,7 @@ void rai::frontier_req_client::unsynced (MDB_txn * transaction_a, rai::block_has void rai::frontier_req_client::received_frontier (boost::system::error_code const & ec, size_t size_a) { - if (!ec && connection->attempt->state != rai::attempt_state::complete) + if (!ec) { assert (size_a == sizeof (rai::uint256_union) + sizeof (rai::uint256_union)); rai::account account; @@ -417,7 +388,16 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons next (transaction); } } - connection->completed_frontier_request (); + { + try + { + promise.set_value (false); + } + catch (std::future_error &) + { + } + connection->attempt->pool_connection (connection); + } } } else @@ -443,14 +423,20 @@ void rai::frontier_req_client::next (MDB_txn * transaction_a) } } -rai::bulk_pull_client::bulk_pull_client (rai::bootstrap_client & connection_a) : -connection (connection_a), -account_count (0) +rai::bulk_pull_client::bulk_pull_client (std::shared_ptr connection_a) : +connection (connection_a) { + ++connection->attempt->pulling; } rai::bulk_pull_client::~bulk_pull_client () { + --connection->attempt->pulling; + if (!pull.account.is_zero ()) + { + connection->attempt->requeue_pull (pull); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % connection->endpoint); + } } void rai::bulk_pull_client::request (rai::pull_info const & pull_a) @@ -465,103 +451,106 @@ void rai::bulk_pull_client::request (rai::pull_info const & pull_a) rai::vectorstream stream (*buffer); req.serialize (stream); } - if (connection.node->config.logging.bulk_pull_logging ()) + if (connection->node->config.logging.bulk_pull_logging ()) { - BOOST_LOG (connection.node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection.endpoint); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection->endpoint); } - else if (connection.node->config.logging.network_logging () && account_count % 256 == 0) + else if (connection->node->config.logging.network_logging () && connection->attempt->account_count++ % 256 == 0) { - BOOST_LOG (connection.node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection.endpoint); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection->endpoint); } - ++account_count; - auto connection_l (connection.shared ()); - connection.start_timeout (); - boost::asio::async_write (connection.socket, boost::asio::buffer (buffer->data (), buffer->size ()), [connection_l, buffer] (boost::system::error_code const & ec, size_t size_a) + auto this_l (shared_from_this ()); + connection->start_timeout (); + boost::asio::async_write (connection->socket, boost::asio::buffer (buffer->data (), buffer->size ()), [this_l, buffer] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); + this_l->connection->stop_timeout (); if (!ec) { - connection_l->pull_client.receive_block (); + this_l->receive_block (); } else { - BOOST_LOG (connection_l->node->log) << boost::str (boost::format ("Error sending bulk pull request %1% to %2%") % ec.message () % connection_l->endpoint); + BOOST_LOG (this_l->connection->node->log) << boost::str (boost::format ("Error sending bulk pull request %1% to %2%") % ec.message () % this_l->connection->endpoint); } }); } void rai::bulk_pull_client::receive_block () { - auto connection_l (connection.shared ()); - connection.start_timeout (); - boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data (), 1), [connection_l] (boost::system::error_code const & ec, size_t size_a) + auto this_l (shared_from_this ()); + connection->start_timeout (); + boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data (), 1), [this_l] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); + this_l->connection->stop_timeout (); if (!ec) { - connection_l->pull_client.received_type (); + this_l->received_type (); } else { - BOOST_LOG (connection_l->node->log) << boost::str (boost::format ("Error receiving block type %1%") % ec.message ()); + BOOST_LOG (this_l->connection->node->log) << boost::str (boost::format ("Error receiving block type %1%") % ec.message ()); } }); } void rai::bulk_pull_client::received_type () { - auto connection_l (connection.shared ()); - rai::block_type type (static_cast (connection.receive_buffer [0])); + auto this_l (shared_from_this ()); + rai::block_type type (static_cast (connection->receive_buffer [0])); switch (type) { case rai::block_type::send: { - connection.start_timeout (); - boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::send_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) + connection->start_timeout (); + boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::send_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); - connection_l->pull_client.received_block (ec, size_a); + this_l->connection->stop_timeout (); + this_l->received_block (ec, size_a); }); break; } case rai::block_type::receive: { - connection.start_timeout (); - boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::receive_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) + connection->start_timeout (); + boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::receive_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); - connection_l->pull_client.received_block (ec, size_a); + this_l->connection->stop_timeout (); + this_l->received_block (ec, size_a); }); break; } case rai::block_type::open: { - connection.start_timeout (); - boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::open_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) + connection->start_timeout (); + boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::open_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); - connection_l->pull_client.received_block (ec, size_a); + this_l->connection->stop_timeout (); + this_l->received_block (ec, size_a); }); break; } case rai::block_type::change: { - connection.start_timeout (); - boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::change_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a) + connection->start_timeout (); + boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::change_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a) { - connection_l->stop_timeout (); - connection_l->pull_client.received_block (ec, size_a); + this_l->connection->stop_timeout (); + this_l->received_block (ec, size_a); }); break; } case rai::block_type::not_a_block: { - connection.completed_pull (); + if (expected == pull.end) + { + pull = rai::pull_info (); + connection->attempt->pool_connection (connection); + } break; } default: { - BOOST_LOG (connection.node->log) << boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast (type)); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast (type)); break; } } @@ -571,58 +560,53 @@ void rai::bulk_pull_client::received_block (boost::system::error_code const & ec { if (!ec) { - rai::bufferstream stream (connection.receive_buffer.data (), 1 + size_a); + rai::bufferstream stream (connection->receive_buffer.data (), 1 + size_a); std::shared_ptr block (rai::deserialize_block (stream)); if (block != nullptr) { auto hash (block->hash ()); - if (connection.node->config.logging.bulk_pull_logging ()) + if (connection->node->config.logging.bulk_pull_logging ()) { std::string block_l; block->serialize_json (block_l); - BOOST_LOG (connection.node->log) << boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l); } if (hash == expected) { expected = block->previous (); } - auto attempt (connection.attempt); - // Process the block asynchronously from making the next network requests since this is a potentially long operation - // Hold a reference to the current attempt so we don't start another one while blocks are being processed. - attempt->node->background ([attempt, block] () - { - attempt->node->process_receive_many (block, [attempt] (MDB_txn * transaction_a, rai::process_return result_a, std::shared_ptr block_a) + auto attempt_l (connection->attempt); + attempt_l->node->block_processor.add (block, [attempt_l] (MDB_txn * transaction_a, rai::process_return result_a, std::shared_ptr block_a) + { + switch (result_a.code) { - switch (result_a.code) + case rai::process_result::progress: + case rai::process_result::old: + break; + case rai::process_result::fork: { - case rai::process_result::progress: - case rai::process_result::old: - break; - case rai::process_result::fork: - { - auto node_l (attempt->node); - std::shared_ptr block (node_l->ledger.forked_block (transaction_a, *block_a)); - node_l->active.start (transaction_a, block); - attempt->node->network.broadcast_confirm_req (block_a); - attempt->node->network.broadcast_confirm_req (block); - BOOST_LOG (attempt->node->log) << boost::str (boost::format ("Fork received in bootstrap between: %1% and %2% root %3%") % block_a->hash ().to_string () % block->hash ().to_string () % block_a->root ().to_string ()); - break; - } - default: - break; + auto node_l (attempt_l->node); + std::shared_ptr block (node_l->ledger.forked_block (transaction_a, *block_a)); + node_l->active.start (transaction_a, block); + node_l->network.broadcast_confirm_req (block_a); + node_l->network.broadcast_confirm_req (block); + BOOST_LOG (node_l->log) << boost::str (boost::format ("Fork received in bootstrap between: %1% and %2% root %3%") % block_a->hash ().to_string () % block->hash ().to_string () % block_a->root ().to_string ()); + break; } - }); + default: + break; + } }); receive_block (); } else { - BOOST_LOG (connection.node->log) << "Error deserializing block received from pull request"; + BOOST_LOG (connection->node->log) << "Error deserializing block received from pull request"; } } else { - BOOST_LOG (connection.node->log) << boost::str (boost::format ("Error bulk receiving block: %1%") % ec.message ()); + BOOST_LOG (connection->node->log) << boost::str (boost::format ("Error bulk receiving block: %1%") % ec.message ()); } } @@ -638,22 +622,6 @@ synchronization (*connection->node, [this] (MDB_txn * transaction_a, rai::block rai::bulk_push_client::~bulk_push_client () { - std::lock_guard lock (connection->attempt->mutex); - if (connection->attempt->state == rai::attempt_state::pushing) - { - if (connection->node->config.logging.network_logging ()) - { - BOOST_LOG (connection->node->log) << "Bulk push client failed"; - } - connection->attempt->state = rai::attempt_state::complete; - } - else - { - if (connection->node->config.logging.network_logging ()) - { - BOOST_LOG (connection->node->log) << "Exiting bulk push client"; - } - } } void rai::bulk_push_client::start () @@ -722,7 +690,13 @@ void rai::bulk_push_client::send_finished () auto this_l (shared_from_this ()); async_write (connection->socket, boost::asio::buffer (buffer->data (), 1), [this_l] (boost::system::error_code const & ec, size_t size_a) { - this_l->connection->completed_pushes (); + try + { + this_l->promise.set_value (false); + } + catch (std::future_error &) + { + } }); } @@ -776,52 +750,186 @@ rai::bootstrap_attempt::bootstrap_attempt (std::shared_ptr node_a) : connections (0), pulling (0), node (node_a), -state (rai::attempt_state::starting) +account_count (0), +stopped (false) { BOOST_LOG (node->log) << "Starting bootstrap attempt"; + node->bootstrap_initiator.notify_listeners (true); } rai::bootstrap_attempt::~bootstrap_attempt () { - node->bootstrap_initiator.notify_listeners (); BOOST_LOG (node->log) << "Exiting bootstrap attempt"; + node->bootstrap_initiator.notify_listeners (false); +} + +bool rai::bootstrap_attempt::request_frontier (std::unique_lock & lock_a) +{ + auto result (true); + auto connection_l (connection (lock_a)); + if (connection_l) + { + std::future future; + { + auto client (std::make_shared (connection_l)); + client->run (); + frontiers = client; + future = client->promise.get_future (); + } + lock_a.unlock (); + result = consume_future (future); + lock_a.lock (); + if (result) + { + pulls.clear (); + } + if (node->config.logging.network_logging ()) + { + if (!result) + { + BOOST_LOG (node->log) << boost::str (boost::format ("Completed frontier request, %1% out of sync accounts according to %2%") % pulls.size () % connection_l->endpoint); + } + else + { + BOOST_LOG (node->log) << "frontier_req failed, reattempting"; + } + } + } + return result; +} + +void rai::bootstrap_attempt::request_pull (std::unique_lock & lock_a) +{ + auto connection_l (connection (lock_a)); + if (connection_l) + { + auto pull (pulls.front ()); + pulls.pop_front (); + auto client (std::make_shared (connection_l)); + client->request (pull); + } +} + +bool rai::bootstrap_attempt::request_push (std::unique_lock & lock_a) +{ + auto result (true); + auto connection_l (connection (lock_a)); + if (connection_l) + { + std::future future; + { + auto client (std::make_shared (connection_l)); + client->start (); + push = client; + future = client->promise.get_future (); + } + lock_a.unlock (); + result = consume_future (future); + lock_a.lock (); + if (node->config.logging.network_logging ()) + { + BOOST_LOG (node->log) << "Exiting bulk push client"; + if (result) + { + BOOST_LOG (node->log) << "Bulk push client failed"; + } + } + } + return result; +} + +void rai::bootstrap_attempt::run () +{ + populate_connections (); + std::unique_lock lock (mutex); + auto frontier_failure (true); + while (!stopped && frontier_failure) + { + frontier_failure = request_frontier (lock); + } + while (!stopped && (!pulls.empty () || pulling > 0)) + { + if (!pulls.empty ()) + { + request_pull (lock); + } + else + { + condition.wait (lock); + } + } + if (!stopped) + { + BOOST_LOG (node->log) << "Completed pulls"; + } + auto push_failure (true); + while (!stopped && push_failure) + { + push_failure = request_push (lock); + } + stopped = true; + condition.notify_all (); + idle.clear (); +} + +std::shared_ptr rai::bootstrap_attempt::connection (std::unique_lock & lock_a) +{ + while (!stopped && idle.empty ()) + { + condition.wait (lock_a); + } + std::shared_ptr result; + if (!idle.empty ()) + { + result = idle.back (); + idle.pop_back (); + } + return result; +} + +bool rai::bootstrap_attempt::consume_future (std::future & future_a) +{ + bool result; + try + { + result = future_a.get (); + } + catch (std::future_error &) + { + result = true; + } + return result; } void rai::bootstrap_attempt::populate_connections () { - if (++connections < node->config.bootstrap_connections) + if (connections < node->config.bootstrap_connections) { auto peer (node->peers.bootstrap_peer ()); - if (peer != rai::endpoint ()) + if (peer != rai::endpoint (boost::asio::ip::address_v6::any (), 0)) { auto client (std::make_shared (node, shared_from_this (), rai::tcp_endpoint (peer.address (), peer.port ()))); client->run (); + std::lock_guard lock (mutex); + clients.push_back (client); } else { - --connections; + BOOST_LOG (node->log) << boost::str (boost::format ("Bootstrap stopped because there are no peers")); + stopped = true; + condition.notify_all (); } } - else + if (!stopped) { - --connections; - } - std::weak_ptr this_w (shared_from_this ()); - switch (state) - { - case rai::attempt_state::starting: - case rai::attempt_state::requesting_frontiers: - case rai::attempt_state::requesting_pulls: - node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (5), [this_w] () + std::weak_ptr this_w (shared_from_this ()); + node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (5), [this_w] () + { + if (auto this_l = this_w.lock ()) { - if (auto this_l = this_w.lock ()) - { - this_l->populate_connections (); - } - }); - break; - default: - break; + this_l->populate_connections (); + } + }); } } @@ -831,112 +939,44 @@ void rai::bootstrap_attempt::add_connection (rai::endpoint const & endpoint_a) client->run (); } +void rai::bootstrap_attempt::pool_connection (std::shared_ptr client_a) +{ + std::lock_guard lock (mutex); + idle.push_back (client_a); + condition.notify_all (); +} + void rai::bootstrap_attempt::stop () { std::lock_guard lock (mutex); - state = rai::attempt_state::complete; -} - -void rai::bootstrap_client::completed_frontier_request () -{ + stopped = true; + condition.notify_all (); + for (auto i : clients) { - std::lock_guard lock (attempt->mutex); - if (node->config.logging.network_logging ()) + if (auto client = i.lock ()) { - BOOST_LOG (node->log) << boost::str (boost::format ("Completed frontier request, %1% out of sync accounts according to %2%") % attempt->pulls.size () % endpoint); + client->socket.close (); } - attempt->state = rai::attempt_state::requesting_pulls; } - work (); -} - -void rai::bootstrap_client::completed_pull () -{ - if (pull_client.expected == pull_client.pull.end) + if (auto i = frontiers.lock ()) { - pull_client.pull = rai::pull_info (); - --attempt->pulling; - work (); + try + { + i->promise.set_value (true); + } + catch (std::future_error &) + { + } } - else + if (auto i = push.lock ()) { - attempt->requeue_pull (pull_client.pull); - BOOST_LOG (node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % endpoint); - } -} - -void rai::bootstrap_client::completed_pulls () -{ - BOOST_LOG (node->log) << "Completed pulls"; - assert (node->bootstrap_initiator.in_progress ()); - auto pushes (std::make_shared (shared_from_this ())); - pushes->start (); -} - -void rai::bootstrap_client::completed_pushes () -{ - std::lock_guard lock (attempt->mutex); - attempt->state = rai::attempt_state::complete; -} - -void rai::bootstrap_client::poll () -{ - auto this_l (shared_from_this ()); - attempt->node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (rai::rai_network == rai::rai_networks::rai_test_network ? 0 : 1), [this_l] () - { - this_l->work (); - }); -} - -void rai::bootstrap_client::work () -{ - auto poll_l (false); - std::unique_lock lock (attempt->mutex); - switch (attempt->state) - { - case rai::attempt_state::starting: - attempt->state = rai::attempt_state::requesting_frontiers; - lock.unlock (); - if (this->node->config.logging.network_logging ()) - { - BOOST_LOG (this->node->log) << boost::str (boost::format ("Initiating frontier request to %1%") % endpoint); - } - frontier_request (); - break; - case rai::attempt_state::requesting_frontiers: - poll_l = true; - break; - case rai::attempt_state::requesting_pulls: - if (!attempt->pulls.empty ()) - { - // There are more things to pull - auto pull (attempt->pulls.front ()); - attempt->pulls.pop_front (); - ++attempt->pulling; - lock.unlock (); - pull_client.request (pull); - } - else - { - if (attempt->pulling == 0) - { - attempt->state = rai::attempt_state::pushing; - lock.unlock (); - completed_pulls (); - } - else - { - poll_l = true; - } - } - break; - case rai::attempt_state::pushing: - case rai::attempt_state::complete: - break; - }; - if (poll_l) - { - poll (); + try + { + i->promise.set_value (true); + } + catch (std::future_error &) + { + } } } @@ -947,6 +987,7 @@ void rai::bootstrap_attempt::requeue_pull (rai::pull_info const & pull_a) { std::lock_guard lock (mutex); pulls.push_front (pull); + condition.notify_all (); } else { @@ -960,27 +1001,37 @@ stopped (false) { } +rai::bootstrap_initiator::~bootstrap_initiator () +{ + stop (); +} + void rai::bootstrap_initiator::bootstrap () { - std::lock_guard lock (mutex); - if (attempt.lock () == nullptr && !stopped) + std::unique_lock lock (mutex); + if (!stopped && attempt == nullptr) { - auto attempt_l (std::make_shared (node.shared ())); - attempt = attempt_l; - attempt_l->populate_connections (); + stop_attempt (lock); + attempt = std::make_shared (node.shared ()); + attempt_thread.reset (new std::thread ([this] () + { + attempt->run (); + this->node.block_processor.flush (); + std::lock_guard lock (mutex); + attempt.reset (); + condition.notify_all (); + })); } } void rai::bootstrap_initiator::bootstrap (rai::endpoint const & endpoint_a) { + node.peers.insert (endpoint_a, 0); bootstrap (); std::lock_guard lock (mutex); - if (auto attempt_l = attempt.lock ()) + if (attempt != nullptr) { - if (!stopped) - { - attempt_l->add_connection (endpoint_a); - } + attempt->add_connection (endpoint_a); } } @@ -992,26 +1043,40 @@ void rai::bootstrap_initiator::add_observer (std::function const & bool rai::bootstrap_initiator::in_progress () { - return attempt.lock () != nullptr; + std::lock_guard lock (mutex); + return attempt != nullptr; } void rai::bootstrap_initiator::stop () { - std::lock_guard lock (mutex); + std::unique_lock lock (mutex); stopped = true; - auto attempt_l (attempt.lock ()); - if (attempt_l != nullptr) + stop_attempt (lock); +} + +void rai::bootstrap_initiator::stop_attempt (std::unique_lock & lock_a) +{ + assert (!mutex.try_lock ()); + if (attempt != nullptr) { - attempt_l->stop (); + attempt->stop (); + } + while (attempt != nullptr) + { + condition.wait (lock_a); + } + if (attempt_thread) + { + attempt_thread->join (); + attempt_thread.reset (); } } -void rai::bootstrap_initiator::notify_listeners () +void rai::bootstrap_initiator::notify_listeners (bool in_progress_a) { - auto in_progress_l (in_progress ()); for (auto & i: observers) { - i (in_progress_l); + i (in_progress_a); } } @@ -1483,7 +1548,7 @@ void rai::bulk_push_server::received_block (boost::system::error_code const & ec { if (!connection->node->bootstrap_initiator.in_progress ()) { - connection->node->process_receive_republish (std::move (block)); + connection->node->process_active (std::move (block)); } receive (); } diff --git a/rai/node/bootstrap.hpp b/rai/node/bootstrap.hpp index 891ba3fd..1b9fe7ea 100644 --- a/rai/node/bootstrap.hpp +++ b/rai/node/bootstrap.hpp @@ -4,6 +4,7 @@ #include #include +#include #include #include #include @@ -48,14 +49,6 @@ public: rai::node & node; }; class bootstrap_client; -enum class attempt_state -{ - starting, - requesting_frontiers, - requesting_pulls, - pushing, - complete -}; class pull_info { public: @@ -66,27 +59,41 @@ public: rai::block_hash end; unsigned attempts; }; +class frontier_req_client; +class bulk_push_client; class bootstrap_attempt : public std::enable_shared_from_this { public: bootstrap_attempt (std::shared_ptr node_a); ~bootstrap_attempt (); + void run (); + std::shared_ptr connection (std::unique_lock &); + bool consume_future (std::future &); void populate_connections (); + bool request_frontier (std::unique_lock &); + void request_pull (std::unique_lock &); + bool request_push (std::unique_lock &); void add_connection (rai::endpoint const &); + void pool_connection (std::shared_ptr ); void stop (); void requeue_pull (rai::pull_info const &); + std::deque > clients; + std::weak_ptr frontiers; + std::weak_ptr push; std::deque pulls; + std::vector > idle; std::atomic connections; - std::atomic pulling; + std::atomic pulling; std::shared_ptr node; - rai::attempt_state state; - std::unordered_set attempted; + std::atomic account_count; + bool stopped; std::mutex mutex; + std::condition_variable condition; }; class frontier_req_client : public std::enable_shared_from_this { public: - frontier_req_client (std::shared_ptr const &); + frontier_req_client (std::shared_ptr ); ~frontier_req_client (); void run (); void receive_frontier (); @@ -99,19 +106,19 @@ public: rai::account_info info; unsigned count; std::chrono::system_clock::time_point next_report; + std::promise promise; }; -class bulk_pull_client +class bulk_pull_client : public std::enable_shared_from_this { public: - bulk_pull_client (rai::bootstrap_client &); + bulk_pull_client (std::shared_ptr ); ~bulk_pull_client (); void request (rai::pull_info const &); void receive_block (); void received_type (); void received_block (boost::system::error_code const &, size_t); rai::block_hash first (); - rai::bootstrap_client & connection; - size_t account_count; + std::shared_ptr connection; rai::block_hash expected; rai::pull_info pull; }; @@ -121,14 +128,6 @@ public: bootstrap_client (std::shared_ptr , std::shared_ptr , rai::tcp_endpoint const &); ~bootstrap_client (); void run (); - void frontier_request (); - void work (); - void poll (); - void completed_frontier_request (); - void sent_request (boost::system::error_code const &, size_t); - void completed_pull (); - void completed_pulls (); - void completed_pushes (); std::shared_ptr shared (); void start_timeout (); void stop_timeout (); @@ -136,7 +135,6 @@ public: std::shared_ptr attempt; boost::asio::ip::tcp::socket socket; std::array receive_buffer; - rai::bulk_pull_client pull_client; rai::tcp_endpoint endpoint; boost::asio::deadline_timer timeout; }; @@ -151,22 +149,27 @@ public: void send_finished (); std::shared_ptr connection; rai::push_synchronization synchronization; + std::promise promise; }; class bootstrap_initiator { public: bootstrap_initiator (rai::node &); + ~bootstrap_initiator (); void bootstrap (rai::endpoint const &); void bootstrap (); - void notify_listeners (); + void notify_listeners (bool); void add_observer (std::function const &); bool in_progress (); void stop (); + void stop_attempt (std::unique_lock &); rai::node & node; - std::weak_ptr attempt; + std::shared_ptr attempt; + std::unique_ptr attempt_thread; bool stopped; private: std::mutex mutex; + std::condition_variable condition; std::vector > observers; }; class bootstrap_listener diff --git a/rai/node/common.cpp b/rai/node/common.cpp index 12418477..ee963276 100644 --- a/rai/node/common.cpp +++ b/rai/node/common.cpp @@ -8,8 +8,8 @@ size_t constexpr rai::message::bootstrap_server_position; std::bitset <16> constexpr rai::message::block_type_mask; rai::message::message (rai::message_type type_a) : -version_max (0x03), -version_using (0x03), +version_max (0x04), +version_using (0x04), version_min (0x01), type (type_a) { diff --git a/rai/node/node.cpp b/rai/node/node.cpp index 03b91ef5..f66dd060 100755 --- a/rai/node/node.cpp +++ b/rai/node/node.cpp @@ -39,10 +39,9 @@ confirm_ack (0) { } -rai::network::network (boost::asio::io_service & service_a, uint16_t port, rai::node & node_a) : -socket (service_a, rai::endpoint (boost::asio::ip::address_v6::any (), port)), -service (service_a), -resolver (service_a), +rai::network::network (rai::node & node_a, uint16_t port) : +socket (node_a.service, rai::endpoint (boost::asio::ip::address_v6::any (), port)), +resolver (node_a.service), node (node_a), bad_sender_count (0), on (true), @@ -166,16 +165,15 @@ void rai::network::rebroadcast_reps (std::shared_ptr block_a) } template -bool confirm_block (rai::node & node_a, T & list_a, std::shared_ptr block_a) +bool confirm_block (MDB_txn * transaction_a, rai::node & node_a, T & list_a, std::shared_ptr block_a) { bool result (false); if (node_a.config.enable_voting) { - rai::transaction transaction (node_a.store.environment, nullptr, true); - node_a.wallets.foreach_representative (transaction, [&result, &block_a, &list_a, &node_a, &transaction] (rai::public_key const & pub_a, rai::raw_key const & prv_a) + node_a.wallets.foreach_representative (transaction_a, [&result, &block_a, &list_a, &node_a, &transaction_a] (rai::public_key const & pub_a, rai::raw_key const & prv_a) { result = true; - auto sequence (node_a.store.sequence_atomic_inc (transaction, pub_a)); + auto sequence (node_a.store.sequence_atomic_inc (transaction_a, pub_a)); rai::vote vote (pub_a, prv_a, sequence, block_a); rai::confirm_ack confirm (vote); std::shared_ptr > bytes (new std::vector ); @@ -193,21 +191,21 @@ bool confirm_block (rai::node & node_a, T & list_a, std::shared_ptr } template <> -bool confirm_block (rai::node & node_a, rai::endpoint & peer_a, std::shared_ptr block_a) +bool confirm_block (MDB_txn * transaction_a, rai::node & node_a, rai::endpoint & peer_a, std::shared_ptr block_a) { std::array endpoints; endpoints [0] = peer_a; - auto result (confirm_block (node_a, endpoints, std::move (block_a))); + auto result (confirm_block (transaction_a, node_a, endpoints, std::move (block_a))); return result; } -void rai::network::republish_block (std::shared_ptr block) +void rai::network::republish_block (MDB_txn * transaction, std::shared_ptr block) { rebroadcast_reps (block); auto hash (block->hash ()); auto list (node.peers.list_sqrt ()); // If we're a representative, broadcast a signed confirm, otherwise an unsigned publish - if (!confirm_block (node, list, block)) + if (!confirm_block (transaction, node, list, block)) { rai::publish message (block); std::shared_ptr > bytes (new std::vector ); @@ -360,7 +358,7 @@ public: ++node.network.incoming.publish; node.peers.contacted (sender, message_a.version_using); node.peers.insert (sender, message_a.version_using); - node.process_receive_republish (message_a.block); + node.process_active (message_a.block); } void confirm_req (rai::confirm_req const & message_a) override { @@ -371,10 +369,11 @@ public: ++node.network.incoming.confirm_req; node.peers.contacted (sender, message_a.version_using); node.peers.insert (sender, message_a.version_using); - node.process_receive_republish (message_a.block); - if (node.ledger.block_exists (message_a.block->hash ())) + node.process_active (message_a.block); + rai::transaction transaction_a (node.store.environment, nullptr, false); + if (node.store.block_exists (transaction_a, message_a.block->hash ())) { - confirm_block (node, sender, message_a.block); + confirm_block (transaction_a, node, sender, message_a.block); } } void confirm_ack (rai::confirm_ack const & message_a) override @@ -386,7 +385,7 @@ public: ++node.network.incoming.confirm_ack; node.peers.contacted (sender, message_a.version_using); node.peers.insert (sender, message_a.version_using); - node.process_receive_republish (message_a.vote.block); + node.process_active (message_a.vote.block); node.vote_processor.vote (message_a.vote, sender); } void bulk_pull (rai::bulk_pull const &) override @@ -996,7 +995,7 @@ rai::vote_result rai::vote_processor::vote (rai::vote const & vote_a, rai::endpo { rai::vote_result result; { - rai::transaction transaction (node.store.environment, nullptr, true); + rai::transaction transaction (node.store.environment, nullptr, false); result = vote_a.validate (transaction, node.store); } if (node.config.logging.vote_logging ()) @@ -1045,12 +1044,237 @@ bool rai::rep_crawler::exists (rai::block_hash const & hash_a) return active.count (hash_a) != 0; } +rai::block_processor::block_processor (rai::node & node_a) : +stopped (false), +idle (true), +node (node_a), +thread ([this] () { process_blocks (); }) +{ +} + +rai::block_processor::~block_processor () +{ + stop (); + thread.join (); +} + +void rai::block_processor::stop () +{ + std::lock_guard lock (mutex); + stopped = true; + condition.notify_all (); +} + +void rai::block_processor::flush () +{ + std::unique_lock lock (mutex); + while (!stopped && (!blocks.empty () || !idle)) + { + condition.wait (lock); + } +} + +void rai::block_processor::add (std::shared_ptr block_a, std::function )> action_a) +{ + std::lock_guard lock (mutex); + blocks.push_back (std::make_pair (block_a, action_a)); + condition.notify_all (); +} + +void rai::block_processor::process_blocks () +{ + std::unique_lock lock (mutex); + while (!stopped) + { + if (!blocks.empty ()) + { + { + auto info (blocks.front ()); + blocks.pop_front (); + lock.unlock (); + process_receive_many (info.first, info.second); + } + // Let other threads get an opportunity to transaction lock + std::this_thread::yield (); + lock.lock (); + } + else + { + idle = true; + condition.notify_all (); + condition.wait (lock); + idle = false; + } + } +} + +void rai::block_processor::process_receive_many (std::shared_ptr block_a, std::function )> completed_a) +{ + std::vector > blocks; + blocks.push_back (block_a); + while (!blocks.empty ()) + { + std::deque , rai::process_return>> progress; + { + rai::transaction transaction (node.store.environment, nullptr, true); + auto count (0); + while (!blocks.empty () && count < rai::blocks_per_transaction) + { + auto block (blocks.back ()); + blocks.pop_back (); + auto hash (block->hash ()); + auto process_result (process_receive_one (transaction, block)); + completed_a (transaction, process_result, block); + switch (process_result.code) + { + case rai::process_result::progress: + { + progress.push_back (std::make_pair (block, process_result)); + } + case rai::process_result::old: + { + auto cached (node.store.unchecked_get (transaction, hash)); + for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i) + { + node.store.unchecked_del (transaction, hash, **i); + blocks.push_back (std::move (*i)); + } + std::lock_guard lock (node.gap_cache.mutex); + node.gap_cache.blocks.get <1> ().erase (hash); + break; + } + default: + break; + } + ++count; + } + } + for (auto & i : progress) + { + node.observers.blocks (i.first, i.second.account, i.second.amount); + } + } +} + +rai::process_return rai::block_processor::process_receive_one (MDB_txn * transaction_a, std::shared_ptr block_a) +{ + rai::process_return result; + result = node.ledger.process (transaction_a, *block_a); + switch (result.code) + { + case rai::process_result::progress: + { + if (node.config.logging.ledger_logging ()) + { + std::string block; + block_a->serialize_json (block); + BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1% %2%") % block_a->hash ().to_string () % block); + } + break; + } + case rai::process_result::gap_previous: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % block_a->hash ().to_string ()); + } + node.store.unchecked_put (transaction_a, block_a->previous (), block_a); + node.gap_cache.add (transaction_a, block_a); + break; + } + case rai::process_result::gap_source: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % block_a->hash ().to_string ()); + } + node.store.unchecked_put (transaction_a, block_a->source (), block_a); + node.gap_cache.add (transaction_a, block_a); + break; + } + case rai::process_result::old: + { + { + auto root (block_a->root ()); + auto hash (block_a->hash ()); + auto existing (node.store.block_get (transaction_a, hash)); + if (existing != nullptr) + { + // Replace block with one that has higher work value + if (node.work.work_value (root, block_a->block_work ()) > node.work.work_value (root, existing->block_work ())) + { + node.store.block_put (transaction_a, hash, *block_a, node.store.block_successor (transaction_a, hash)); + } + } + else + { + // Could have been rolled back, maybe + } + } + if (node.config.logging.ledger_duplicate_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Old for: %1%") % block_a->hash ().to_string ()); + } + break; + } + case rai::process_result::bad_signature: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % block_a->hash ().to_string ()); + } + break; + } + case rai::process_result::overspend: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Overspend for: %1%") % block_a->hash ().to_string ()); + } + break; + } + case rai::process_result::unreceivable: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % block_a->hash ().to_string ()); + } + break; + } + case rai::process_result::not_receive_from_send: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Not receive from send for: %1%") % block_a->hash ().to_string ()); + } + break; + } + case rai::process_result::fork: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % block_a->hash ().to_string () % block_a->root ().to_string ()); + } + break; + } + case rai::process_result::account_mismatch: + { + if (node.config.logging.ledger_logging ()) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Account mismatch for: %1%") % block_a->hash ().to_string ()); + } + } + } + return result; +} + rai::node::node (rai::node_init & init_a, boost::asio::io_service & service_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, rai::alarm & alarm_a, rai::logging const & logging_a, rai::work_pool & work_a) : node (init_a, service_a, application_path_a, alarm_a, rai::node_config (peering_port_a, logging_a), work_a) { } rai::node::node (rai::node_init & init_a, boost::asio::io_service & service_a, boost::filesystem::path const & application_path_a, rai::alarm & alarm_a, rai::node_config const & config_a, rai::work_pool & work_a) : +service (service_a), config (config_a), alarm (alarm_a), work (work_a), @@ -1059,14 +1283,15 @@ gap_cache (*this), ledger (store, config_a.inactive_supply.number ()), active (*this), wallets (init_a.block_store_init, *this), -network (service_a, config.peering_port, *this), +network (*this, config.peering_port), bootstrap_initiator (*this), bootstrap (service_a, config.peering_port, *this), peers (network.endpoint ()), application_path (application_path_a), port_mapping (*this), vote_processor (*this), -warmed_up (0) +warmed_up (0), +block_processor (*this) { store.environment.sizing_action = [this] () { @@ -1088,101 +1313,115 @@ warmed_up (0) { observers.disconnect (); }; - observers.blocks.add ([this] (rai::block const & block_a, rai::account const & account_a, rai::amount const & amount_a) + observers.blocks.add ([this] (std::shared_ptr block_a, rai::account const & account_a, rai::amount const & amount_a) { - if (!config.callback_address.empty ()) + if (this->block_arrival.recent (block_a->hash ())) + { + rai::transaction transaction (store.environment, nullptr, true); + active.start (transaction, block_a); + } + }); + observers.blocks.add ([this] (std::shared_ptr block_a, rai::account const & account_a, rai::amount const & amount_a) + { + if (this->block_arrival.recent (block_a->hash ())) { - boost::property_tree::ptree event; - event.add ("account", account_a.to_account ()); - event.add ("hash", block_a.hash ().to_string ()); - std::string block_text; - block_a.serialize_json (block_text); - event.add ("block", block_text); - event.add ("amount", amount_a.to_string_dec ()); - std::stringstream ostream; - boost::property_tree::write_json (ostream, event); - ostream.flush (); - auto body (std::make_shared (ostream.str ())); - auto address (config.callback_address); - auto port (config.callback_port); - auto target (std::make_shared (config.callback_target)); auto node_l (shared_from_this ()); - auto resolver (std::make_shared (network.service)); - resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) + background ([node_l, block_a, account_a, amount_a] () { - if (!ec) + if (!node_l->config.callback_address.empty ()) { - for (auto i (i_a), n (boost::asio::ip::tcp::resolver::iterator {}); i != n; ++i) + boost::property_tree::ptree event; + event.add ("account", account_a.to_account ()); + event.add ("hash", block_a->hash ().to_string ()); + std::string block_text; + block_a->serialize_json (block_text); + event.add ("block", block_text); + event.add ("amount", amount_a.to_string_dec ()); + std::stringstream ostream; + boost::property_tree::write_json (ostream, event); + ostream.flush (); + auto body (std::make_shared (ostream.str ())); + auto address (node_l->config.callback_address); + auto port (node_l->config.callback_port); + auto target (std::make_shared (node_l->config.callback_target)); + auto resolver (std::make_shared (node_l->service)); + resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a) { - auto sock (std::make_shared (node_l->network.service)); - sock->async_connect (i->endpoint(), [node_l, target, body, sock, address, port] (boost::system::error_code const & ec) + if (!ec) { - if (!ec) + for (auto i (i_a), n (boost::asio::ip::tcp::resolver::iterator {}); i != n; ++i) { - auto req (std::make_shared > ()); - req->method (boost::beast::http::verb::post); - req->target (*target); - req->version = 11; - req->insert(boost::beast::http::field::host, address); - req->body = *body; - //req->prepare (*req); - //boost::beast::http::prepare(req); - req->prepare_payload(); - boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req] (boost::system::error_code const & ec) + auto sock (std::make_shared (node_l->service)); + sock->async_connect (i->endpoint(), [node_l, target, body, sock, address, port] (boost::system::error_code const & ec) { if (!ec) { - auto sb (std::make_shared ()); - auto resp (std::make_shared > ()); - boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port] (boost::system::error_code const & ec) + auto req (std::make_shared > ()); + req->method (boost::beast::http::verb::post); + req->target (*target); + req->version = 11; + req->insert(boost::beast::http::field::host, address); + req->body = *body; + //req->prepare (*req); + //boost::beast::http::prepare(req); + req->prepare_payload(); + boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req] (boost::system::error_code const & ec) { if (!ec) { - if (resp->result() == boost::beast::http::status::ok) + auto sb (std::make_shared ()); + auto resp (std::make_shared > ()); + boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port] (boost::system::error_code const & ec) { - } - else - { - if (node_l->config.logging.callback_logging ()) + if (!ec) { - BOOST_LOG (node_l->log) << boost::str (boost::format ("Callback to %1%:%2% failed with status: %3%") % address % port % resp->result()); + if (resp->result() == boost::beast::http::status::ok) + { + } + else + { + if (node_l->config.logging.callback_logging ()) + { + BOOST_LOG (node_l->log) << boost::str (boost::format ("Callback to %1%:%2% failed with status: %3%") % address % port % resp->result()); + } + } } - } + else + { + if (node_l->config.logging.callback_logging ()) + { + BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable complete callback: %1%:%2% %3%") % address % port % ec.message ()); + } + }; + }); } else { if (node_l->config.logging.callback_logging ()) { - BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable complete callback: %1%:%2% %3%") % address % port % ec.message ()); + BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to send callback: %1%:%2% %3%") % address % port % ec.message ()); } - }; + } }); } else { if (node_l->config.logging.callback_logging ()) { - BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to send callback: %1%:%2% %3%") % address % port % ec.message ()); + BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to connect to callback address: %1%:%2%, %3%") % address % port % ec.message ()); } } }); } - else + } + else + { + if (node_l->config.logging.callback_logging ()) { - if (node_l->config.logging.callback_logging ()) - { - BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to connect to callback address: %1%:%2%, %3%") % address % port % ec.message ()); - } + BOOST_LOG (node_l->log) << boost::str (boost::format ("Error resolving callback: %1%:%2%, %3%") % address % port % ec.message ()); } - }); - } - } - else - { - if (node_l->config.logging.callback_logging ()) - { - BOOST_LOG (node_l->log) << boost::str (boost::format ("Error resolving callback: %1%:%2%, %3%") % address % port % ec.message ()); - } + } + }); } }); } @@ -1212,7 +1451,7 @@ warmed_up (0) } } }); - BOOST_LOG (log) << "Node starting, version: " << RAIBLOCKS_VERSION_MAJOR << "." << RAIBLOCKS_VERSION_MINOR << "." << RAIBLOCKS_VERSION_PATCH; + BOOST_LOG (log) << "Node starting, version: " << RAIBLOCKS_VERSION_MAJOR << "." << RAIBLOCKS_VERSION_MINOR; BOOST_LOG (log) << boost::str (boost::format ("Work pool running %1% threads") % work.threads.size ()); if (!init_a.error ()) { @@ -1354,187 +1593,14 @@ void rai::network::confirm_send (rai::confirm_ack const & confirm_a, std::shared }); } -void rai::node::process_receive_republish (std::shared_ptr incoming) +void rai::node::process_active (std::shared_ptr incoming) { - std::vector >> completed; + block_arrival.add (incoming->hash ()); + block_processor.process_receive_many (incoming); + if (rai::rai_network == rai::rai_networks::rai_test_network) { - assert (incoming != nullptr); - process_receive_many (incoming, [this, &completed] (MDB_txn * transaction_a, rai::process_return result_a, std::shared_ptr block_a) - { - switch (result_a.code) - { - case rai::process_result::progress: - { - auto node_l (shared_from_this ()); - active.start (transaction_a, block_a); - completed.push_back (std::make_tuple (result_a, block_a)); - break; - } - default: - { - break; - } - } - }); + block_processor.flush (); } - for (auto & i: completed) - { - observers.blocks (*std::get <1> (i), std::get <0> (i).account, std::get <0>(i).amount); - } -} - -void rai::node::process_receive_many (std::shared_ptr block_a, std::function )> completed_a) -{ - std::vector > blocks; - blocks.push_back (block_a); - while (!blocks.empty ()) - { - { - rai::transaction transaction (store.environment, nullptr, true); - auto count (0); - while (!blocks.empty () && count < rai::blocks_per_transaction) - { - auto block (blocks.back ()); - blocks.pop_back (); - auto hash (block->hash ()); - auto process_result (process_receive_one (transaction, block)); - completed_a (transaction, process_result, block); - switch (process_result.code) - { - case rai::process_result::progress: - case rai::process_result::old: - { - auto cached (store.unchecked_get (transaction, hash)); - for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i) - { - store.unchecked_del (transaction, hash, **i); - blocks.push_back (std::move (*i)); - } - std::lock_guard lock (gap_cache.mutex); - gap_cache.blocks.get <1> ().erase (hash); - break; - } - default: - break; - } - ++count; - } - } - // Let other threads get an opportunity to transaction lock - std::this_thread::yield (); - } -} - -rai::process_return rai::node::process_receive_one (MDB_txn * transaction_a, std::shared_ptr block_a) -{ - rai::process_return result; - result = ledger.process (transaction_a, *block_a); - switch (result.code) - { - case rai::process_result::progress: - { - if (config.logging.ledger_logging ()) - { - std::string block; - block_a->serialize_json (block); - BOOST_LOG (log) << boost::str (boost::format ("Processing block %1% %2%") % block_a->hash ().to_string () % block); - } - break; - } - case rai::process_result::gap_previous: - { - if (config.logging.ledger_logging ()) - { - BOOST_LOG (log) << boost::str (boost::format ("Gap previous for: %1%") % block_a->hash ().to_string ()); - } - store.unchecked_put (transaction_a, block_a->previous (), block_a); - gap_cache.add (transaction_a, block_a); - break; - } - case rai::process_result::gap_source: - { - if (config.logging.ledger_logging ()) - { - BOOST_LOG (log) << boost::str (boost::format ("Gap source for: %1%") % block_a->hash ().to_string ()); - } - store.unchecked_put (transaction_a, block_a->source (), block_a); - gap_cache.add (transaction_a, block_a); - break; - } - case rai::process_result::old: - { - { - auto root (block_a->root ()); - auto hash (block_a->hash ()); - auto existing (store.block_get (transaction_a, hash)); - if (existing != nullptr) - { - // Replace block with one that has higher work value - if (work.work_value (root, block_a->block_work ()) > work.work_value (root, existing->block_work ())) - { - store.block_put (transaction_a, hash, *block_a, store.block_successor (transaction_a, hash)); - } - } - else - { - // Could have been rolled back, maybe - } - } - if (config.logging.ledger_duplicate_logging ()) - { - BOOST_LOG (log) << boost::str (boost::format ("Old for: %1%") % block_a->hash ().to_string ()); - } - break; - } - case rai::process_result::bad_signature: - { - if (config.logging.ledger_logging ()) - { - BOOST_LOG (log) << boost::str (boost::format ("Bad signature for: %1%") % block_a->hash ().to_string ()); - } - break; - } - case rai::process_result::overspend: - { - if (config.logging.ledger_logging ()) - { - BOOST_LOG (log) << boost::str (boost::format ("Overspend for: %1%") % block_a->hash ().to_string ()); - } - break; - } - case rai::process_result::unreceivable: - { - if (config.logging.ledger_logging ()) - { - BOOST_LOG (log) << boost::str (boost::format ("Unreceivable for: %1%") % block_a->hash ().to_string ()); - } - break; - } - case rai::process_result::not_receive_from_send: - { - if (config.logging.ledger_logging ()) - { - BOOST_LOG (log) << boost::str (boost::format ("Not receive from send for: %1%") % block_a->hash ().to_string ()); - } - break; - } - case rai::process_result::fork: - { - if (config.logging.ledger_logging ()) - { - BOOST_LOG (log) << boost::str (boost::format ("Fork for: %1% root: %2%") % block_a->hash ().to_string () % block_a->root ().to_string ()); - } - break; - } - case rai::process_result::account_mismatch: - { - if (config.logging.ledger_logging ()) - { - BOOST_LOG (log) << boost::str (boost::format ("Account mismatch for: %1%") % block_a->hash ().to_string ()); - } - } - } - return result; } rai::process_return rai::node::process (rai::block const & block_a) @@ -1570,6 +1636,17 @@ std::vector rai::peer_container::list () return result; } +std::map rai::peer_container::list_version () +{ + std::map result; + std::lock_guard lock (mutex); + for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i) + { + result.insert (std::pair (i->endpoint, i->network_version)); + } + return result; +} + rai::endpoint rai::peer_container::bootstrap_peer () { rai::endpoint result (boost::asio::ip::address_v6::any (), 0); @@ -1666,6 +1743,7 @@ void rai::node::start () network.receive (); ongoing_keepalive (); ongoing_bootstrap (); + ongoing_vote_flush (); ongoing_rep_crawl (); bootstrap.start (); backup_wallet (); @@ -1678,6 +1756,7 @@ void rai::node::start () void rai::node::stop () { BOOST_LOG (log) << "Node stopping"; + block_processor.stop (); active.stop (); network.stop (); bootstrap_initiator.stop (); @@ -1797,6 +1876,22 @@ void rai::node::ongoing_bootstrap () }); } +void rai::node::ongoing_vote_flush () +{ + { + rai::transaction transaction (store.environment, nullptr, true); + store.sequence_flush (transaction); + } + std::weak_ptr node_w (shared_from_this ()); + alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (5), [node_w] () + { + if (auto node_l = node_w.lock ()) + { + node_l->ongoing_vote_flush (); + } + }); +} + void rai::node::backup_wallet () { rai::transaction transaction (store.environment, nullptr, false); @@ -1871,7 +1966,7 @@ void start () auto service (i.second); node->background ([this_l, host, service] () { - auto connection (std::make_shared (this_l->node->network.service, host, service)); + auto connection (std::make_shared (this_l->node->service, host, service)); connection->socket.async_connect (rai::tcp_endpoint (host, service), [this_l, connection] (boost::system::error_code const & ec) { if (!ec) @@ -1961,9 +2056,8 @@ void stop () request.target ("/"); request.version = 11; request.body = request_string; - //boost::beast::http::prepare (request); request.prepare_payload(); - auto socket (std::make_shared (this_l->node->network.service)); + auto socket (std::make_shared (this_l->node->service)); boost::beast::http::async_write (*socket, request, [socket] (boost::system::error_code const & ec) { }); @@ -2149,6 +2243,24 @@ rai::endpoint rai::network::endpoint () return rai::endpoint (boost::asio::ip::address_v6::loopback (), port); } +void rai::block_arrival::add (rai::block_hash const & hash_a) +{ + std::lock_guard lock (mutex); + auto now (std::chrono::system_clock::now ()); + arrival.insert (rai::block_arrival_info {now, hash_a}); +} + +bool rai::block_arrival::recent (rai::block_hash const & hash_a) +{ + std::lock_guard lock (mutex); + auto now (std::chrono::system_clock::now ()); + while (!arrival.empty () && arrival.begin ()->arrival + std::chrono::seconds (60) < now) + { + arrival.erase (arrival.begin ()); + } + return arrival.get <1> ().find (hash_a) != arrival.get <1> ().end (); +} + std::unordered_set rai::peer_container::random_set (size_t count_a) { std::unordered_set result; @@ -2520,7 +2632,8 @@ void rai::election::broadcast_winner () rai::transaction transaction (node.store.environment, nullptr, true); compute_rep_votes (transaction); } - node.network.republish_block (last_winner); + rai::transaction transaction_a (node.store.environment, nullptr, false); + node.network.republish_block (transaction_a, last_winner); } rai::uint128_t rai::election::quorum_threshold (MDB_txn * transaction_a, rai::ledger & ledger_a) @@ -2542,14 +2655,6 @@ void rai::election::confirm_once (MDB_txn * transaction_a) auto tally_l (node.ledger.tally (transaction_a, votes)); assert (tally_l.size () > 0); auto winner (tally_l.begin ()); - if (tally_l.size () > 1) - { - BOOST_LOG (node.log) << boost::str (boost::format ("Vote tally weight %2% for root %1%") % votes.id.to_string () % winner->first.convert_to ()); - for (auto i (votes.rep_votes.begin ()), n (votes.rep_votes.end ()); i != n; ++i) - { - BOOST_LOG (node.log) << boost::str (boost::format ("%1% %2%") % i->first.to_account () % i->second->hash ().to_string ()); - } - } if (!(*winner->second == *last_winner)) { if (winner->first > minimum_treshold (transaction_a, node.ledger)) @@ -2558,12 +2663,7 @@ void rai::election::confirm_once (MDB_txn * transaction_a) // Replace our block with the winner and roll back any dependent blocks node.ledger.rollback (transaction_a, last_winner->hash ()); node.ledger.process (transaction_a, *winner->second); - auto block_l (winner->second); - auto node_l (node.shared ()); - node.background ([block_l, node_l] () - { - node_l->process_receive_many (block_l); - }); + node.block_processor.add (winner->second); last_winner = std::move (winner->second); } else @@ -2601,6 +2701,14 @@ void rai::election::confirm_if_quarum (MDB_txn * transaction_a) void rai::election::confirm_cutoff (MDB_txn * transaction_a) { + //if (tally_l.size () > 1) + { + BOOST_LOG (node.log) << boost::str (boost::format ("Vote tally weight %2% for root %1%") % votes.id.to_string () % last_winner->root ().to_string ()); + for (auto i (votes.rep_votes.begin ()), n (votes.rep_votes.end ()); i != n; ++i) + { + BOOST_LOG (node.log) << boost::str (boost::format ("%1% %2%") % i->first.to_account () % i->second->hash ().to_string ()); + } + } confirm_once (transaction_a); } @@ -2725,36 +2833,6 @@ int rai::node::store_version () return store.version_get (transaction); } -rai::fan::fan (rai::uint256_union const & key, size_t count_a) -{ - std::unique_ptr first (new rai::uint256_union (key)); - for (auto i (1); i < count_a; ++i) - { - std::unique_ptr entry (new rai::uint256_union); - random_pool.GenerateBlock (entry->bytes.data (), entry->bytes.size ()); - *first ^= *entry; - values.push_back (std::move (entry)); - } - values.push_back (std::move (first)); -} - -void rai::fan::value (rai::raw_key & prv_a) -{ - prv_a.data.clear (); - for (auto & i: values) - { - prv_a.data ^= *i; - } -} - -void rai::fan::value_set (rai::raw_key const & value_a) -{ - rai::raw_key value_l; - value (value_l); - *(values [0]) ^= value_l.data; - *(values [0]) ^= value_a.data; -} - rai::thread_runner::thread_runner (boost::asio::io_service & service_a, unsigned service_threads_a) { for (auto i (0); i < service_threads_a; ++i) diff --git a/rai/node/node.hpp b/rai/node/node.hpp index 6d41e3e7..da1050af 100644 --- a/rai/node/node.hpp +++ b/rai/node/node.hpp @@ -180,6 +180,7 @@ public: std::vector representatives (size_t); // List of all peers std::vector list (); + std::map list_version (); // A list of random peers with size the square root of total peer count std::vector list_sqrt (); // Get the next peer for attempting bootstrap @@ -266,17 +267,41 @@ public: std::atomic confirm_req; std::atomic confirm_ack; }; +class block_arrival_info +{ +public: + std::chrono::system_clock::time_point arrival; + rai::block_hash hash; +}; +// This class tracks blocks that are probably live because they arrived in a UDP packet +// This gives a fairly reliable way to differentiate between blocks being inserted via bootstrap or new, live blocks. +class block_arrival +{ +public: + void add (rai::block_hash const &); + bool recent (rai::block_hash const &); + boost::multi_index_container + < + rai::block_arrival_info, + boost::multi_index::indexed_by + < + boost::multi_index::ordered_non_unique >, + boost::multi_index::hashed_unique > + > + > arrival; + std::mutex mutex; +}; class network { public: - network (boost::asio::io_service &, uint16_t, rai::node &); + network (rai::node &, uint16_t); void receive (); void stop (); void receive_action (boost::system::error_code const &, size_t); void rpc_action (boost::system::error_code const &, size_t); void rebroadcast_reps (std::shared_ptr ); void republish_vote (std::chrono::system_clock::time_point const &, rai::vote const &); - void republish_block (std::shared_ptr ); + void republish_block (MDB_txn *, std::shared_ptr ); void republish (rai::block_hash const &, std::shared_ptr >, rai::endpoint); void publish_broadcast (std::vector &, std::unique_ptr ); void confirm_send (rai::confirm_ack const &, std::shared_ptr >, rai::endpoint const &); @@ -290,7 +315,6 @@ public: std::array buffer; boost::asio::ip::udp::socket socket; std::mutex socket_mutex; - boost::asio::io_service & service; boost::asio::ip::udp::resolver resolver; rai::node & node; uint64_t bad_sender_count; @@ -382,7 +406,7 @@ public: class node_observers { public: - rai::observer_set blocks; + rai::observer_set , rai::account const &, rai::amount const &> blocks; rai::observer_set wallet; rai::observer_set vote; rai::observer_set endpoint; @@ -406,6 +430,28 @@ public: std::mutex mutex; std::unordered_set active; }; +// Processing blocks is a potentially long IO operation +// This class isolates block insertion from other operations like servicing network operations +class block_processor +{ +public: + block_processor (rai::node &); + ~block_processor (); + void stop (); + void flush (); + void add (std::shared_ptr , std::function )> = [] (MDB_txn *, rai::process_return, std::shared_ptr ) {}); + void process_receive_many (std::shared_ptr , std::function )> = [] (MDB_txn *, rai::process_return, std::shared_ptr ) {}); + rai::process_return process_receive_one (MDB_txn *, std::shared_ptr ); +private: + void process_blocks (); + bool stopped; + bool idle; + std::deque , std::function )>>> blocks; + std::mutex mutex; + std::condition_variable condition; + rai::node & node; + std::thread thread; +}; class node : public std::enable_shared_from_this { public: @@ -425,9 +471,7 @@ public: int store_version (); void process_confirmed (std::shared_ptr ); void process_message (rai::message &, rai::endpoint const &); - void process_receive_republish (std::shared_ptr ); - void process_receive_many (std::shared_ptr , std::function )> = [] (MDB_txn *, rai::process_return, std::shared_ptr ) {}); - rai::process_return process_receive_one (MDB_txn *, std::shared_ptr ); + void process_active (std::shared_ptr ); rai::process_return process (rai::block const &); void keepalive_preconfigured (std::vector const &); rai::block_hash latest (rai::account const &); @@ -439,12 +483,14 @@ public: void ongoing_keepalive (); void ongoing_rep_crawl (); void ongoing_bootstrap (); + void ongoing_vote_flush (); void backup_wallet (); int price (rai::uint128_t const &, int); void generate_work (rai::block &); uint64_t generate_work (rai::uint256_union const &); void generate_work (rai::uint256_union const &, std::function ); - void add_initial_peers (); + void add_initial_peers (); + boost::asio::io_service & service; rai::node_config config; rai::alarm & alarm; rai::work_pool & work; @@ -464,6 +510,8 @@ public: rai::vote_processor vote_processor; rai::rep_crawler rep_crawler; unsigned warmed_up; + rai::block_processor block_processor; + rai::block_arrival block_arrival; static double constexpr price_max = 16.0; static double constexpr free_cutoff = 1024.0; static std::chrono::seconds constexpr period = std::chrono::seconds (60); diff --git a/rai/node/rpc.cpp b/rai/node/rpc.cpp index 5a1257ee..e90b659a 100755 --- a/rai/node/rpc.cpp +++ b/rai/node/rpc.cpp @@ -77,7 +77,7 @@ node (node_a) acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true)); acceptor.bind (endpoint); acceptor.listen (); - node_a.observers.blocks.add ([this] (rai::block const & block_a, rai::account const & account_a, rai::amount const &) + node_a.observers.blocks.add ([this] (std::shared_ptr block_a, rai::account const & account_a, rai::amount const &) { observer_action (account_a); }); @@ -1471,14 +1471,12 @@ void rai::rpc_handler::peers () { boost::property_tree::ptree response_l; boost::property_tree::ptree peers_l; - auto peers_list (node.peers.list()); + auto peers_list (node.peers.list_version ()); for (auto i (peers_list.begin ()), n (peers_list.end ()); i != n; ++i) { - boost::property_tree::ptree entry; std::stringstream text; - text << *i; - entry.put ("", text.str ()); - peers_l.push_back (std::make_pair ("", entry)); + text << i->first; + peers_l.push_back (boost::property_tree::ptree::value_type (text.str (), boost::property_tree::ptree (std::to_string (i->second)))); } response_l.add_child ("peers", peers_l); response (response_l); @@ -1562,10 +1560,14 @@ void rai::rpc_handler::pending_exists () auto block (node.store.block_get (transaction, hash)); if (block != nullptr) { - auto block_l (static_cast (block.release ())); - auto account (block_l->hashables.destination); + auto block_l (dynamic_cast (block.get ())); + auto exists (false); + if (block_l != nullptr) + { + auto account (block_l->hashables.destination); + exists = node.store.pending_exists (transaction, rai::pending_key (account, hash)); + } boost::property_tree::ptree response_l; - auto exists (node.store.pending_exists (transaction, rai::pending_key (account, hash))); response_l.put ("exists", exists ? "1" : "0"); response (response_l); } @@ -1786,7 +1788,7 @@ void rai::rpc_handler::process () { if (!node.work.work_validate (*block)) { - node.process_receive_republish (std::move (block)); + node.process_active (std::move (block)); boost::property_tree::ptree response_l; response (response_l); } @@ -1982,33 +1984,35 @@ void rai::rpc_handler::representatives () void rai::rpc_handler::republish () { - uint64_t count (2048U); + uint64_t count (1024U); uint64_t sources (0); - try + uint64_t destinations (0); + boost::optional count_text (request.get_optional ("count")); + if (count_text.is_initialized ()) { - std::string count_text (request.get ("count")); - auto error (decode_unsigned (count_text, count)); + auto error (decode_unsigned (count_text.get (), count)); if (error) { - error_response (response, "Invalid count"); + error_response (response, "Invalid count limit"); } } - catch (std::runtime_error &) + boost::optional sources_text (request.get_optional ("sources")); + if (sources_text.is_initialized ()) { - // If there is no "count" in request - } - try - { - std::string sources_text (request.get ("sources")); - auto error (decode_unsigned (sources_text, sources)); - if (error) + auto sources_error (decode_unsigned (sources_text.get (), sources)); + if (sources_error) { error_response (response, "Invalid sources number"); } } - catch (std::runtime_error &) + boost::optional destinations_text (request.get_optional ("destinations")); + if (destinations_text.is_initialized ()) { - // If there is no "sources" in request + auto destinations_error (decode_unsigned (destinations_text.get (), destinations)); + if (destinations_error) + { + error_response (response, "Invalid destinations number"); + } } std::string hash_text (request.get ("hash")); rai::uint256_union hash; @@ -2026,29 +2030,66 @@ void rai::rpc_handler::republish () block = node.store.block_get (transaction, hash); if (sources != 0) // Republish source chain { - std::unique_ptr block_a; rai::block_hash source (block->source ()); + std::unique_ptr block_a (node.store.block_get (transaction, source)); std::vector hashes; - while (!source.is_zero () && hashes.size () < sources) + while (block_a != nullptr && hashes.size () < sources) { hashes.push_back (source); - block_a = node.store.block_get (transaction, source); source = block_a->previous (); + block_a = node.store.block_get (transaction, source); } - std::reverse (hashes.begin (), hashes.end ()); + std::reverse (hashes.begin (), hashes.end ()); for (auto & hash_l : hashes) { block_a = node.store.block_get (transaction, hash_l); - node.network.republish_block (std::move (block_a)); + node.network.republish_block (transaction, std::move (block_a)); boost::property_tree::ptree entry_l; entry_l.put ("", hash_l.to_string ()); blocks.push_back (std::make_pair ("", entry_l)); } } - node.network.republish_block (std::move (block)); // Republish block + node.network.republish_block (transaction, std::move (block)); // Republish block boost::property_tree::ptree entry; entry.put ("", hash.to_string ()); blocks.push_back (std::make_pair ("", entry)); + if (destinations != 0) // Republish destination chain + { + auto block_b (node.store.block_get (transaction, hash)); + auto block_s (dynamic_cast (block_b.get ())); + if (block_s != nullptr) + { + auto destination (block_s->hashables.destination); + auto exists (node.store.pending_exists (transaction, rai::pending_key (destination, hash))); + if (!exists) + { + rai::block_hash previous (node.ledger.latest (transaction, destination)); + std::unique_ptr block_d (node.store.block_get (transaction, previous)); + rai::block_hash source; + std::vector hashes; + while (block_d != nullptr && hash != source) + { + hashes.push_back (previous); + source = block_d->source (); + previous = block_d->previous (); + block_d = node.store.block_get (transaction, previous); + } + std::reverse (hashes.begin (), hashes.end ()); + if (hashes.size () > destinations) + { + hashes.resize(destinations); + } + for (auto & hash_l : hashes) + { + block_d = node.store.block_get (transaction, hash_l); + node.network.republish_block (transaction, std::move (block_d)); + boost::property_tree::ptree entry_l; + entry_l.put ("", hash_l.to_string ()); + blocks.push_back (std::make_pair ("", entry_l)); + } + } + } + } hash = node.store.block_successor (transaction, hash); } response_l.put ("success", ""); // obsolete @@ -2332,7 +2373,7 @@ void rai::rpc_handler::version () boost::property_tree::ptree response_l; response_l.put ("rpc_version", "1"); response_l.put ("store_version", std::to_string (node.store_version ())); - response_l.put ("node_vendor", boost::str (boost::format ("RaiBlocks %1%.%2%.%3%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR % RAIBLOCKS_VERSION_PATCH)); + response_l.put ("node_vendor", boost::str (boost::format ("RaiBlocks %1%.%2%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR)); response (response_l); } @@ -2884,7 +2925,7 @@ void rai::rpc_handler::wallet_republish () for (auto & hash : hashes) { block = node.store.block_get (transaction, hash); - node.network.republish_block (std::move (block));; + node.network.republish_block (transaction, std::move (block));; boost::property_tree::ptree entry; entry.put ("", hash.to_string ()); blocks.push_back (std::make_pair ("", entry)); @@ -3230,7 +3271,7 @@ void rai::rpc_handler::work_peers_clear () rai::rpc_connection::rpc_connection (rai::node & node_a, rai::rpc & rpc_a) : node (node_a.shared ()), rpc (rpc_a), -socket (node_a.network.service) +socket (node_a.service) { } diff --git a/rai/node/wallet.cpp b/rai/node/wallet.cpp index b5b7b41e..88665ec6 100644 --- a/rai/node/wallet.cpp +++ b/rai/node/wallet.cpp @@ -342,8 +342,7 @@ bool rai::wallet_store::rekey (MDB_txn * transaction_a, std::string const & pass wallet_key (wallet_key_l, transaction_a); rai::raw_key password_l; password.value (password_l); - (*password.values [0]) ^= password_l.data; - (*password.values [0]) ^= password_new.data; + password.value_set (password_new); rai::uint256_union encrypted; encrypted.encrypt (wallet_key_l, password_new, salt (transaction_a).owords [0]); entry_put_raw (transaction_a, rai::wallet_store::wallet_key_special, rai::wallet_value (encrypted)); @@ -361,6 +360,44 @@ void rai::wallet_store::derive_key (rai::raw_key & prv_a, MDB_txn * transaction_ kdf.phs (prv_a, password_a, salt_l); } +rai::fan::fan (rai::uint256_union const & key, size_t count_a) +{ + std::unique_ptr first (new rai::uint256_union (key)); + for (auto i (1); i < count_a; ++i) + { + std::unique_ptr entry (new rai::uint256_union); + random_pool.GenerateBlock (entry->bytes.data (), entry->bytes.size ()); + *first ^= *entry; + values.push_back (std::move (entry)); + } + values.push_back (std::move (first)); +} + +void rai::fan::value (rai::raw_key & prv_a) +{ + std::lock_guard lock (mutex); + value_get (prv_a); +} + +void rai::fan::value_get (rai::raw_key & prv_a) +{ + assert (!mutex.try_lock ()); + prv_a.data.clear (); + for (auto & i: values) + { + prv_a.data ^= *i; + } +} + +void rai::fan::value_set (rai::raw_key const & value_a) +{ + std::lock_guard lock (mutex); + rai::raw_key value_l; + value_get (value_l); + *(values [0]) ^= value_l.data; + *(values [0]) ^= value_a.data; +} + rai::wallet_value::wallet_value (MDB_val const & val_a) { assert (val_a.mv_size == sizeof (*this)); @@ -998,7 +1035,7 @@ std::shared_ptr rai::wallet::receive_action (rai::send_block const if (block != nullptr) { assert (block != nullptr); - node.process_receive_republish (block); + node.process_active (block); auto hash (block->hash ()); auto this_l (shared_from_this ()); auto source (send_a.hashables.destination); @@ -1036,7 +1073,7 @@ std::shared_ptr rai::wallet::change_action (rai::account const & so if (block != nullptr) { assert (block != nullptr); - node.process_receive_republish (block); + node.process_active (block); auto hash (block->hash ()); auto this_l (shared_from_this ()); node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash] @@ -1077,7 +1114,7 @@ std::shared_ptr rai::wallet::send_action (rai::account const & sour if (block != nullptr) { assert (block != nullptr); - node.process_receive_republish (block); + node.process_active (block); auto hash (block->hash ()); auto this_l (shared_from_this ()); node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash] diff --git a/rai/node/wallet.hpp b/rai/node/wallet.hpp index 869a77e9..4a9bf54d 100644 --- a/rai/node/wallet.hpp +++ b/rai/node/wallet.hpp @@ -49,6 +49,9 @@ public: void value (rai::raw_key &); void value_set (rai::raw_key const &); std::vector > values; +private: + std::mutex mutex; + void value_get (rai::raw_key &); }; class wallet_value { diff --git a/rai/qt/qt.cpp b/rai/qt/qt.cpp index b99086a0..cae31214 100755 --- a/rai/qt/qt.cpp +++ b/rai/qt/qt.cpp @@ -68,7 +68,7 @@ balance_layout (new QHBoxLayout), balance_label (new QLabel), wallet (wallet_a) { - version = new QLabel (boost::str (boost::format ("Version %1%.%2%.%3%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR % RAIBLOCKS_VERSION_PATCH).c_str ()); + version = new QLabel (boost::str (boost::format ("Version %1%.%2%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR).c_str ()); self_layout->addWidget (your_account_label); self_layout->addStretch (); self_layout->addWidget (version); @@ -629,7 +629,7 @@ void rai_qt::block_viewer::rebroadcast_action (rai::uint256_union const & hash_a auto block (wallet.node.store.block_get (transaction, hash_a)); if (block != nullptr) { - wallet.node.network.republish_block (std::move (block)); + wallet.node.network.republish_block (transaction, std::move (block)); auto successor (wallet.node.store.block_successor (transaction, hash_a)); if (!successor.is_zero ()) { @@ -790,7 +790,7 @@ std::string rai_qt::status::color () result = "color: blue"; break; case rai_qt::status_types::synchronizing: - result = "color: red"; + result = "color: blue"; break; case rai_qt::status_types::locked: result = "color: orange"; @@ -996,7 +996,7 @@ void rai_qt::wallet::start () this_l->push_main_stack (this_l->send_blocks_window); } }); - node.observers.blocks.add ([this_w] (rai::block const &, rai::account const & account_a, rai::amount const &) + node.observers.blocks.add ([this_w] (std::shared_ptr , rai::account const & account_a, rai::amount const &) { if (auto this_l = this_w.lock ()) { @@ -1445,8 +1445,8 @@ ledger_refresh (new QPushButton ("Refresh")), ledger_back (new QPushButton ("Back")), peers_window (new QWidget), peers_layout (new QVBoxLayout), -peers_model (new QStringListModel), -peers_view (new QListView), +peers_model (new QStandardItemModel), +peers_view (new QTableView), bootstrap_label (new QLabel ("IPV6:port \"::ffff:192.168.0.1:7075\"")), bootstrap_line (new QLineEdit), peers_bootstrap (new QPushButton ("Initiate Bootstrap")), @@ -1478,8 +1478,13 @@ wallet (wallet_a) ledger_layout->setContentsMargins (0, 0, 0, 0); ledger_window->setLayout (ledger_layout); + peers_model->setHorizontalHeaderItem (0, new QStandardItem ("IPv6 address:port")); + peers_model->setHorizontalHeaderItem (1, new QStandardItem ("Net version")); peers_view->setEditTriggers (QAbstractItemView::NoEditTriggers); + peers_view->verticalHeader ()->hide (); peers_view->setModel (peers_model); + peers_view->setColumnWidth(0,220); + peers_view->setSortingEnabled(true); peers_layout->addWidget (peers_view); peers_layout->addWidget (bootstrap_label); peers_layout->addWidget (bootstrap_line); @@ -1598,7 +1603,6 @@ wallet (wallet_a) { this->wallet.push_main_stack (this->wallet.account_viewer.window); }); - refresh_ledger (); bootstrap->setToolTip ("Multi-connection bootstrap to random peers"); search_for_receivables->setToolTip ("Search for pending blocks"); create_block->setToolTip ("Create block in JSON format"); @@ -1607,22 +1611,20 @@ wallet (wallet_a) void rai_qt::advanced_actions::refresh_peers () { - auto list (wallet.node.peers.list ()); - std::sort (list.begin (), list.end (), [] (rai::endpoint const & lhs, rai::endpoint const & rhs) + peers_model->removeRows (0, peers_model->rowCount ()); + auto list (wallet.node.peers.list_version ()); + for (auto i (list.begin ()), n (list.end ()); i != n; ++i) { - return lhs < rhs; - }); - QStringList peers; - for (auto i: list) - { - std::stringstream endpoint; - endpoint << i.address ().to_string (); - endpoint << ':'; - endpoint << i.port (); - QString qendpoint (endpoint.str().c_str ()); - peers << qendpoint; - } - peers_model->setStringList (peers); + std::stringstream endpoint; + endpoint << i->first.address ().to_string (); + endpoint << ':'; + endpoint << i->first.port (); + QString qendpoint (endpoint.str().c_str ()); + QList items; + items.push_back (new QStandardItem (qendpoint)); + items.push_back (new QStandardItem (QString (std::to_string (i->second).c_str ()))); + peers_model->appendRow (items); + } } void rai_qt::advanced_actions::refresh_ledger () @@ -1671,7 +1673,7 @@ wallet (wallet_a) { show_label_ok (*status); this->status->setText (""); - this->wallet.node.process_receive_republish (std::move (block_l)); + this->wallet.node.process_active (std::move (block_l)); } else { diff --git a/rai/qt/qt.hpp b/rai/qt/qt.hpp index be68ffaa..f2f33ff0 100644 --- a/rai/qt/qt.hpp +++ b/rai/qt/qt.hpp @@ -81,8 +81,8 @@ namespace rai_qt { QWidget * peers_window; QVBoxLayout * peers_layout; - QStringListModel * peers_model; - QListView * peers_view; + QStandardItemModel * peers_model; + QTableView * peers_view; QLabel * bootstrap_label; QLineEdit * bootstrap_line; QPushButton * peers_bootstrap; @@ -262,9 +262,9 @@ namespace rai_qt { disconnected, working, locked, - synchronizing, vulnerable, active, + synchronizing, nominal }; class status diff --git a/rai/rai_wallet/entry.cpp b/rai/rai_wallet/entry.cpp index dd00bc2e..55ffac3a 100755 --- a/rai/rai_wallet/entry.cpp +++ b/rai/rai_wallet/entry.cpp @@ -205,78 +205,74 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost auto error (rai::fetch_object (config, config_path, config_file)); config_file.close (); if (!error) - { + { + boost::asio::io_service service; config.node.logging.init (data_path); std::shared_ptr node; std::shared_ptr gui; rai::set_application_icon (application); - std::thread node_thread ([&] () + rai::work_pool work (config.node.work_threads, rai::opencl_work::create (config.opencl_enable, config.opencl, config.node.logging)); + rai::alarm alarm (service); + rai::node_init init; + node = std::make_shared (init, service, data_path, alarm, config.node, work); + if (!init.error ()) { - boost::asio::io_service service; - rai::work_pool work (config.node.work_threads, rai::opencl_work::create (config.opencl_enable, config.opencl, config.node.logging)); - rai::alarm alarm (service); - rai::node_init init; - node = std::make_shared (init, service, data_path, alarm, config.node, work); - if (!init.error ()) + auto wallet (node->wallets.open (config.wallet)); + if (wallet == nullptr) { - auto wallet (node->wallets.open (config.wallet)); - if (wallet == nullptr) + auto existing (node->wallets.items.begin ()); + if (existing != node->wallets.items.end ()) { - auto existing (node->wallets.items.begin ()); - if (existing != node->wallets.items.end ()) - { - wallet = existing->second; - config.wallet = existing->first; - } - else - { - wallet = node->wallets.create (config.wallet); - } + wallet = existing->second; + config.wallet = existing->first; } - if (config.account.is_zero () || !wallet->exists (config.account)) + else { - rai::transaction transaction (wallet->store.environment, nullptr, true); - auto existing (wallet->store.begin (transaction)); - if (existing != wallet->store.end ()) - { - rai::uint256_union account (existing->first); - config.account = account; - } - else - { - config.account = wallet->deterministic_insert (transaction); - } + wallet = node->wallets.create (config.wallet); } - assert (wallet->exists (config.account)); - update_config (config, config_path, config_file); - node->start (); - rai::rpc rpc (service, *node, config.rpc); - if (config.rpc_enable) - { - rpc.start (); - } - rai::thread_runner runner (service, node->config.io_threads); - QObject::connect (&application, &QApplication::aboutToQuit, [&] () - { - rpc.stop (); - node->stop (); - }); - application.postEvent (&processor, new rai_qt::eventloop_event ([&] () - { - gui = std::make_shared (application, processor, *node, wallet, config.account); - splash->close(); - gui->start (); - gui->client_window->show (); - })); - runner.join (); } - else + if (config.account.is_zero () || !wallet->exists (config.account)) { - show_error ("Error initializing node"); + rai::transaction transaction (wallet->store.environment, nullptr, true); + auto existing (wallet->store.begin (transaction)); + if (existing != wallet->store.end ()) + { + rai::uint256_union account (existing->first); + config.account = account; + } + else + { + config.account = wallet->deterministic_insert (transaction); + } } - }); - result = application.exec (); - node_thread.join (); + assert (wallet->exists (config.account)); + update_config (config, config_path, config_file); + node->start (); + rai::rpc rpc (service, *node, config.rpc); + if (config.rpc_enable) + { + rpc.start (); + } + rai::thread_runner runner (service, node->config.io_threads); + QObject::connect (&application, &QApplication::aboutToQuit, [&] () + { + rpc.stop (); + node->stop (); + }); + application.postEvent (&processor, new rai_qt::eventloop_event ([&] () + { + gui = std::make_shared (application, processor, *node, wallet, config.account); + splash->close(); + gui->start (); + gui->client_window->show (); + })); + result = application.exec (); + runner.join (); + } + else + { + show_error ("Error initializing node"); + } update_config (config, config_path, config_file); } else diff --git a/rai/secure.cpp b/rai/secure.cpp old mode 100644 new mode 100755 index 99d1894f..71eac76b --- a/rai/secure.cpp +++ b/rai/secure.cpp @@ -61,9 +61,10 @@ genesis_account (rai::rai_network == rai::rai_networks::rai_test_network ? rai_t genesis_block (rai::rai_network == rai::rai_networks::rai_test_network ? rai_test_genesis : rai::rai_network == rai::rai_networks::rai_beta_network ? rai_beta_genesis : rai_live_genesis), genesis_amount (std::numeric_limits ::max ()) { + CryptoPP::AutoSeededRandomPool random_pool; // Randomly generating these mean no two nodes will ever have the same sentinal values which protects against some insecure algorithms - rai::random_pool.GenerateBlock (not_a_block.bytes.data (), not_a_block.bytes.size ()); - rai::random_pool.GenerateBlock (not_an_account.bytes.data (), not_an_account.bytes.size ()); + random_pool.GenerateBlock (not_a_block.bytes.data (), not_a_block.bytes.size ()); + random_pool.GenerateBlock (not_an_account.bytes.data (), not_an_account.bytes.size ()); } rai::keypair zero_key; rai::keypair test_genesis_key; @@ -76,7 +77,6 @@ std::string rai_live_genesis; rai::account genesis_account; std::string genesis_block; rai::uint128_t genesis_amount; -CryptoPP::AutoSeededRandomPool random_pool; rai::block_hash not_a_block; rai::account not_an_account; }; @@ -100,7 +100,6 @@ std::string const & rai::rai_live_genesis (globals.rai_live_genesis); rai::account const & rai::genesis_account (globals.genesis_account); std::string const & rai::genesis_block (globals.genesis_block); rai::uint128_t const & rai::genesis_amount (globals.genesis_amount); -CryptoPP::AutoSeededRandomPool & rai::random_pool (globals.random_pool); rai::block_hash const & rai::not_a_block (globals.not_a_block); rai::block_hash const & rai::not_an_account (globals.not_an_account); @@ -1508,7 +1507,6 @@ size_t rai::block_counts::sum () } rai::block_store::block_store (bool & error_a, boost::filesystem::path const & path_a) : -sequence_cache_count (0), environment (error_a, path_a), frontiers (0), accounts (0), @@ -2484,17 +2482,36 @@ void rai::block_store::checksum_del (MDB_txn * transaction_a, uint64_t prefix, u void rai::block_store::sequence_flush (MDB_txn * transaction_a) { - for (auto i (sequence_cache.begin ()), n (sequence_cache.end ()); i != n; ++i) + std::unordered_map sequence_cache_l; + { + std::lock_guard lock (sequence_mutex); + sequence_cache_l.swap (sequence_cache); + } + for (auto i (sequence_cache_l.begin ()), n (sequence_cache_l.end ()); i != n; ++i) { auto status1 (mdb_put (transaction_a, sequence, i->first.val (), rai::mdb_val (sizeof (i->second), &i->second), 0)); assert (status1 == 0); } - sequence_cache_count = 0; - sequence_cache.clear (); +} + +uint64_t rai::block_store::sequence_get (MDB_txn * transaction_a, rai::account const & account_a) +{ + uint64_t result (0); + MDB_val value; + auto status (mdb_get (transaction_a, sequence, account_a.val (), &value)); + assert (status == 0 || status == MDB_NOTFOUND); + if (status == 0) + { + rai::bufferstream stream (reinterpret_cast (value.mv_data), value.mv_size); + auto error (rai::read (stream, result)); + assert (!error); + } + return result; } uint64_t rai::block_store::sequence_current (MDB_txn * transaction_a, rai::account const & account_a) { + assert (!sequence_mutex.try_lock ()); uint64_t result (0); auto existing (sequence_cache.find (account_a)); if (existing != sequence_cache.end ()) @@ -2503,44 +2520,28 @@ uint64_t rai::block_store::sequence_current (MDB_txn * transaction_a, rai::accou } else { - MDB_val value; - auto status (mdb_get (transaction_a, sequence, account_a.val (), &value)); - assert (status == 0 || status == MDB_NOTFOUND); - if (status == 0) - { - rai::bufferstream stream (reinterpret_cast (value.mv_data), value.mv_size); - auto error (rai::read (stream, result)); - assert (!error); - } + result = sequence_get (transaction_a, account_a); } return result; } uint64_t rai::block_store::sequence_atomic_inc (MDB_txn * transaction_a, rai::account const & account_a) { + std::lock_guard lock (sequence_mutex); auto result (sequence_current (transaction_a, account_a)); result += 1; sequence_cache [account_a] = result; - ++sequence_cache_count; - if (sequence_cache_count > sequence_cache_max) - { - sequence_flush (transaction_a); - } return result; } uint64_t rai::block_store::sequence_atomic_observe (MDB_txn * transaction_a, rai::account const & account_a, uint64_t sequence_a) { + std::lock_guard lock (sequence_mutex); auto current (sequence_current (transaction_a, account_a)); auto result (std::max (current, sequence_a)); if (sequence_a > current) { sequence_cache [account_a] = sequence_a; - ++sequence_cache_count; - if (sequence_cache_count > sequence_cache_max) - { - sequence_flush (transaction_a); - } } return result; } diff --git a/rai/secure.hpp b/rai/secure.hpp index 3c561d10..5081dc60 100644 --- a/rai/secure.hpp +++ b/rai/secure.hpp @@ -388,19 +388,13 @@ public: bool checksum_get (MDB_txn *, uint64_t, uint8_t, rai::checksum &); void checksum_del (MDB_txn *, uint64_t, uint8_t); + uint64_t sequence_get (MDB_txn *, rai::account const &); uint64_t sequence_atomic_inc (MDB_txn *, rai::account const &); uint64_t sequence_atomic_observe (MDB_txn *, rai::account const &, uint64_t); uint64_t sequence_current (MDB_txn *, rai::account const &); void sequence_flush (MDB_txn *); + std::mutex sequence_mutex; std::unordered_map sequence_cache; - size_t sequence_cache_count; - static size_t const sequence_cache_max = 256; - // IO per sequence_atomic_max profiled with store.vote_load - // 1 - 1,900,000 - // 16 - 235,000 - // 128 - 27,000 - // 256 - 14,000 - // 1024 - 3,200 void version_put (MDB_txn *, int); int version_get (MDB_txn *); diff --git a/rai/slow_test/node.cpp b/rai/slow_test/node.cpp index 0e7bc651..baba206f 100644 --- a/rai/slow_test/node.cpp +++ b/rai/slow_test/node.cpp @@ -193,7 +193,8 @@ TEST (node, fork_storm) system.nodes [i]->generate_work (*open); auto open_result (system.nodes [i]->process (*open)); ASSERT_EQ (rai::process_result::progress, open_result.code); - system.nodes [i]->network.republish_block (open); + rai::transaction transaction (system.nodes [i]->store.environment, nullptr, false); + system.nodes [i]->network.republish_block (transaction, open); } } auto again (true); diff --git a/rai/utility.cpp b/rai/utility.cpp index 25507d35..b860656c 100644 --- a/rai/utility.cpp +++ b/rai/utility.cpp @@ -7,6 +7,8 @@ #include +thread_local CryptoPP::AutoSeededRandomPool rai::random_pool; + boost::filesystem::path rai::unique_path () { auto result (working_path () / boost::filesystem::unique_path ()); diff --git a/rai/utility.hpp b/rai/utility.hpp old mode 100644 new mode 100755 index 29aca1ab..f6a71821 --- a/rai/utility.hpp +++ b/rai/utility.hpp @@ -21,7 +21,9 @@ namespace rai { -extern CryptoPP::AutoSeededRandomPool & random_pool; +// Random pool used by RaiBlocks. +// This must be thread_local as long as the AutoSeededRandomPool implementation requires it +extern thread_local CryptoPP::AutoSeededRandomPool random_pool; // We operate on streams of uint8_t by convention using stream = std::basic_streambuf ; using bufferstream = boost::iostreams::stream_buffer >;