Merge remote-tracking branch 'refs/remotes/clemahieu/master' into avx2

This commit is contained in:
SergiySW 2017-09-30 19:46:52 +03:00
commit cc5646f087
24 changed files with 1134 additions and 867 deletions

View file

@ -1,8 +1,8 @@
cmake_minimum_required (VERSION 2.8.11)
project (rai)
set (CPACK_PACKAGE_VERSION_MAJOR "7")
set (CPACK_PACKAGE_VERSION_MINOR "9")
set (CPACK_PACKAGE_VERSION_MAJOR "8")
set (CPACK_PACKAGE_VERSION_MINOR "0")
set (CPACK_PACKAGE_VERSION_PATCH "0")
set (RAIBLOCKS_GUI OFF CACHE BOOL "")
@ -107,43 +107,10 @@ set (IPHLPAPI_LIBRARY iphlpapi CACHE STRING "")
add_subdirectory (miniupnp/miniupnpc)
include_directories (miniupnp/miniupnpc)
add_library (cryptopp
cryptopp/algparam.cpp
cryptopp/asn.cpp
cryptopp/basecode.cpp
cryptopp/cpu.cpp
cryptopp/cryptlib.cpp
cryptopp/default.cpp
cryptopp/des.cpp
cryptopp/dessp.cpp
cryptopp/dll.cpp
cryptopp/ec2n.cpp
cryptopp/ecp.cpp
cryptopp/filters.cpp
cryptopp/fips140.cpp
cryptopp/gcm.cpp
cryptopp/gf2n.cpp
cryptopp/gfpcrypt.cpp
cryptopp/hex.cpp
cryptopp/hmac.cpp
cryptopp/hrtimer.cpp
cryptopp/integer.cpp
cryptopp/iterhash.cpp
cryptopp/misc.cpp
cryptopp/modes.cpp
cryptopp/mqueue.cpp
cryptopp/nbtheory.cpp
cryptopp/oaep.cpp
cryptopp/osrng.cpp
cryptopp/pubkey.cpp
cryptopp/queue.cpp
cryptopp/randpool.cpp
cryptopp/rdtables.cpp
cryptopp/rijndael.cpp
cryptopp/rng.cpp
cryptopp/sha.cpp
cryptopp/simple.cpp
cryptopp/winpipes.cpp)
set (BUILD_SHARED OFF CACHE BOOL "")
set (BUILD_TESTING OFF CACHE BOOL "")
set (USE_INTERMEDIATE_OBJECTS_TARGET OFF CACHE BOOL "")
add_subdirectory (cryptopp)
add_library (argon2
Argon2/Source/Argon2/argon2.cpp
@ -245,7 +212,7 @@ if (RAIBLOCKS_TEST)
add_executable (slow_test
rai/slow_test/node.cpp)
set_target_properties (core_test slow_test PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1 -DRAIBLOCKS_VERSION_PATCH=${CPACK_PACKAGE_VERSION_PATCH}")
set_target_properties (core_test slow_test PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1")
set_target_properties (core_test slow_test PROPERTIES LINK_FLAGS "${PLATFORM_LINK_FLAGS}")
endif (RAIBLOCKS_TEST)
@ -272,7 +239,7 @@ if (RAIBLOCKS_GUI)
set_target_properties (rai_wallet qt_test PROPERTIES LINK_FLAGS "${PLATFORM_LINK_FLAGS}")
set_target_properties (qt_test qt rai_wallet qt_system PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1 -DRAIBLOCKS_VERSION_PATCH=${CPACK_PACKAGE_VERSION_PATCH}")
set_target_properties (qt_test qt rai_wallet qt_system PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1")
set_target_properties (qt qt_system PROPERTIES LINK_FLAGS "${PLATFORM_LINK_FLAGS}")
endif (RAIBLOCKS_GUI)
@ -281,11 +248,10 @@ add_executable (rai_node
rai/rai_node/daemon.hpp
rai/rai_node/entry.cpp)
set_target_properties (cryptopp PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS}")
set_target_properties (argon2 PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS}")
set_target_properties (blake2 PROPERTIES COMPILE_FLAGS "${PLATFORM_C_FLAGS} ${PLATFORM_COMPILE_FLAGS} -D__SSE2__")
set_target_properties (ed25519 PROPERTIES COMPILE_FLAGS "${PLATFORM_C_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DED25519_CUSTOMHASH -DED25519_CUSTOMRNG")
set_target_properties (secure node rai_node PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1 -DRAIBLOCKS_VERSION_PATCH=${CPACK_PACKAGE_VERSION_PATCH}")
set_target_properties (secure node rai_node PROPERTIES COMPILE_FLAGS "${PLATFORM_CXX_FLAGS} ${PLATFORM_COMPILE_FLAGS} -DQT_NO_KEYWORDS -DACTIVE_NETWORK=${ACTIVE_NETWORK} -DRAIBLOCKS_VERSION_MAJOR=${CPACK_PACKAGE_VERSION_MAJOR} -DRAIBLOCKS_VERSION_MINOR=${CPACK_PACKAGE_VERSION_MINOR} -DBOOST_ASIO_HAS_STD_ARRAY=1")
set_target_properties (secure node rai_node PROPERTIES LINK_FLAGS "${PLATFORM_LINK_FLAGS}")
if (WIN32)
@ -301,20 +267,20 @@ else (WIN32)
endif (WIN32)
if (RAIBLOCKS_TEST)
target_link_libraries (core_test node secure lmdb xxhash ed25519 argon2 blake2 cryptopp gtest_main gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS})
target_link_libraries (core_test node secure lmdb xxhash ed25519 argon2 blake2 cryptopp-static gtest_main gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS})
target_link_libraries (slow_test node secure lmdb xxhash ed25519 argon2 blake2 cryptopp gtest_main gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS})
target_link_libraries (slow_test node secure lmdb xxhash ed25519 argon2 blake2 cryptopp-static gtest_main gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS})
endif (RAIBLOCKS_TEST)
if (RAIBLOCKS_GUI)
target_link_libraries (qt_test node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets Qt5::Test ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS})
target_link_libraries (qt_test node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp-static gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets Qt5::Test ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS})
target_link_libraries (qt_system node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS})
target_link_libraries (qt_system node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp-static gtest libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS})
target_link_libraries (rai_wallet node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS} ${PLATFORM_WALLET_LIBS})
target_link_libraries (rai_wallet node secure lmdb xxhash ed25519 qt argon2 blake2 cryptopp-static libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} Qt5::Core Qt5::Gui Qt5::Widgets ${QT_QTGUI_LIBRARY} ${PLATFORM_LIBS} ${PLATFORM_WALLET_LIBS})
endif (RAIBLOCKS_GUI)
target_link_libraries (rai_node node secure lmdb xxhash ed25519 argon2 blake2 cryptopp libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS})
target_link_libraries (rai_node node secure lmdb xxhash ed25519 argon2 blake2 cryptopp-static libminiupnpc-static ${Boost_ATOMIC_LIBRARY} ${Boost_CHRONO_LIBRARY} ${Boost_REGEX_LIBRARY} ${Boost_DATE_TIME_LIBRARY} ${Boost_FILESYSTEM_LIBRARY} ${Boost_SYSTEM_LIBRARY} ${Boost_LOG_LIBRARY} ${Boost_PROGRAM_OPTIONS_LIBRARY} ${Boost_LOG_SETUP_LIBRARY} ${Boost_THREAD_LIBRARY} ${PLATFORM_LIBS})
set (CPACK_RESOURCE_FILE_LICENSE ${CMAKE_SOURCE_DIR}/LICENSE)
if (RAIBLOCKS_GUI)

View file

@ -874,3 +874,19 @@ TEST (block_store, upgrade_v7_v8)
ASSERT_EQ (store.unchecked_end (), iterator1);
}
}
TEST (block_store, sequence_flush)
{
auto path (rai::unique_path ());
bool init (false);
rai::block_store store (init, path);
ASSERT_FALSE (init);
rai::transaction transaction (store.environment, nullptr, true);
rai::account account (0);
auto seq1 (store.sequence_atomic_inc (transaction, account));
auto seq2 (store.sequence_get (transaction, account));
ASSERT_NE (seq2, seq1);
store.sequence_flush(transaction);
auto seq3 (store.sequence_get (transaction, account));
ASSERT_EQ (seq3, seq1);
}

View file

@ -56,7 +56,7 @@ TEST (gap_cache, gap_bootstrap)
auto send (std::make_shared <rai::send_block> (latest, key.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (latest)));
{
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, true);
ASSERT_EQ (rai::process_result::progress, system.nodes [0]->process_receive_one (transaction, send).code);
ASSERT_EQ (rai::process_result::progress, system.nodes [0]->block_processor.process_receive_one (transaction, send).code);
}
ASSERT_EQ (rai::genesis_amount - 100, system.nodes [0]->balance (rai::genesis_account));
ASSERT_EQ (rai::genesis_amount, system.nodes [1]->balance (rai::genesis_account));
@ -83,11 +83,11 @@ TEST (gap_cache, two_dependencies)
auto send2 (std::make_shared <rai::send_block> (send1->hash (), key.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (send1->hash ())));
auto open (std::make_shared <rai::open_block> (send1->hash (), key.pub, key.pub, key.prv, key.pub, system.work.generate (key.pub)));
ASSERT_EQ (0, system.nodes [0]->gap_cache.blocks.size ());
system.nodes [0]->process_receive_many (send2);
system.nodes [0]->block_processor.process_receive_many (send2);
ASSERT_EQ (1, system.nodes [0]->gap_cache.blocks.size ());
system.nodes [0]->process_receive_many (open);
system.nodes [0]->block_processor.process_receive_many (open);
ASSERT_EQ (2, system.nodes [0]->gap_cache.blocks.size ());
system.nodes [0]->process_receive_many (send1);
system.nodes [0]->block_processor.process_receive_many (send1);
ASSERT_EQ (0, system.nodes [0]->gap_cache.blocks.size ());
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false);
ASSERT_TRUE (system.nodes [0]->store.block_exists (transaction, send1->hash ()));

View file

@ -54,8 +54,8 @@ TEST (message, publish_serialization)
ASSERT_EQ (8, bytes.size ());
ASSERT_EQ (0x52, bytes [0]);
ASSERT_EQ (0x41, bytes [1]);
ASSERT_EQ (0x03, bytes [2]);
ASSERT_EQ (0x03, bytes [3]);
ASSERT_EQ (0x04, bytes [2]);
ASSERT_EQ (0x04, bytes [3]);
ASSERT_EQ (0x01, bytes [4]);
ASSERT_EQ (static_cast <uint8_t> (rai::message_type::publish), bytes [5]);
ASSERT_EQ (0x02, bytes [6]);
@ -68,8 +68,8 @@ TEST (message, publish_serialization)
std::bitset <16> extensions;
ASSERT_FALSE (rai::message::read_header (stream, version_max, version_using, version_min, type, extensions));
ASSERT_EQ (0x01, version_min);
ASSERT_EQ (0x03, version_using);
ASSERT_EQ (0x03, version_max);
ASSERT_EQ (0x04, version_using);
ASSERT_EQ (0x04, version_max);
ASSERT_EQ (rai::message_type::publish, type);
}

View file

@ -149,9 +149,10 @@ TEST (network, send_discarded_publish)
{
rai::system system (24000, 2);
auto block (std::make_shared <rai::send_block> (1, 1, 2, rai::keypair ().prv, 4, system.work.generate (1)));
system.nodes [0]->network.republish_block (block);
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false);
system.nodes [0]->network.republish_block (transaction, block);
rai::genesis genesis;
ASSERT_EQ (genesis.hash (), system.nodes [0]->latest (rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [0]->ledger.latest (transaction, rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub));
auto iterations (0);
while (system.nodes [1]->network.incoming.publish == 0)
@ -160,7 +161,7 @@ TEST (network, send_discarded_publish)
++iterations;
ASSERT_LT (iterations, 200);
}
ASSERT_EQ (genesis.hash (), system.nodes [0]->latest (rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [0]->ledger.latest (transaction, rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub));
}
@ -168,9 +169,10 @@ TEST (network, send_invalid_publish)
{
rai::system system (24000, 2);
auto block (std::make_shared <rai::send_block> (1, 1, 20, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (1)));
system.nodes [0]->network.republish_block (block);
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false);
system.nodes [0]->network.republish_block (transaction, block);
rai::genesis genesis;
ASSERT_EQ (genesis.hash (), system.nodes [0]->latest (rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [0]->ledger.latest (transaction, rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub));
auto iterations (0);
while (system.nodes [1]->network.incoming.publish == 0)
@ -179,7 +181,7 @@ TEST (network, send_invalid_publish)
++iterations;
ASSERT_LT (iterations, 200);
}
ASSERT_EQ (genesis.hash (), system.nodes [0]->latest (rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [0]->ledger.latest (transaction, rai::test_genesis_key.pub));
ASSERT_EQ (genesis.hash (), system.nodes [1]->latest (rai::test_genesis_key.pub));
}
@ -192,7 +194,7 @@ TEST (network, send_valid_confirm_ack)
rai::block_hash latest1 (system.nodes [0]->latest (rai::test_genesis_key.pub));
rai::send_block block2 (latest1, key2.pub, 50, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (latest1));
rai::block_hash latest2 (system.nodes [1]->latest (rai::test_genesis_key.pub));
system.nodes [0]->process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (block2)));
system.nodes [0]->process_active (std::unique_ptr <rai::block> (new rai::send_block (block2)));
auto iterations (0);
// Keep polling until latest block changes
while (system.nodes [1]->latest (rai::test_genesis_key.pub) == latest2)
@ -215,7 +217,7 @@ TEST (network, send_valid_publish)
rai::send_block block2 (latest1, key2.pub, 50, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (latest1));
auto hash2 (block2.hash ());
rai::block_hash latest2 (system.nodes [1]->latest (rai::test_genesis_key.pub));
system.nodes [1]->process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (block2)));
system.nodes [1]->process_active (std::unique_ptr <rai::block> (new rai::send_block (block2)));
auto iterations (0);
while (system.nodes [0]->network.incoming.publish == 0)
{
@ -300,8 +302,8 @@ TEST (receivable_processor, send_with_receive)
ASSERT_EQ (0, system.nodes [0]->balance (key2.pub));
ASSERT_EQ (amount, system.nodes [1]->balance (rai::test_genesis_key.pub));
ASSERT_EQ (0, system.nodes [1]->balance (key2.pub));
system.nodes [0]->process_receive_republish (block1);
system.nodes [1]->process_receive_republish (block1);
system.nodes [0]->process_active (block1);
system.nodes [1]->process_active (block1);
ASSERT_EQ (amount - system.nodes [0]->config.receive_minimum.number (), system.nodes [0]->balance (rai::test_genesis_key.pub));
ASSERT_EQ (0, system.nodes [0]->balance (key2.pub));
ASSERT_EQ (amount - system.nodes [0]->config.receive_minimum.number (), system.nodes [1]->balance (rai::test_genesis_key.pub));
@ -513,6 +515,7 @@ TEST (bootstrap_processor, process_one)
++iterations;
ASSERT_LT (iterations, 200);
}
ASSERT_EQ (0, node1->active.roots.size ());
node1->stop ();
}

View file

@ -163,8 +163,8 @@ TEST (node, send_out_of_order)
rai::genesis genesis;
rai::send_block send1 (genesis.hash (), key2.pub, std::numeric_limits <rai::uint128_t>::max () - system.nodes [0]->config.receive_minimum.number (), rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ()));
rai::send_block send2 (send1.hash (), key2.pub, std::numeric_limits <rai::uint128_t>::max () - system.nodes [0]->config.receive_minimum.number () * 2, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (send1.hash ()));
system.nodes [0]->process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (send2)));
system.nodes [0]->process_receive_republish (std::unique_ptr <rai::block> (new rai::send_block (send1)));
system.nodes [0]->process_active (std::unique_ptr <rai::block> (new rai::send_block (send2)));
system.nodes [0]->process_active (std::unique_ptr <rai::block> (new rai::send_block (send1)));
auto iterations (0);
while (std::any_of (system.nodes.begin (), system.nodes.end (), [&] (std::shared_ptr <rai::node> const & node_a) {return node_a->balance (rai::test_genesis_key.pub) != rai::genesis_amount - system.nodes [0]->config.receive_minimum.number () * 2;}))
{
@ -181,7 +181,7 @@ TEST (node, quick_confirm)
rai::block_hash previous (system.nodes [0]->latest (rai::test_genesis_key.pub));
system.wallet (0)->insert_adhoc (key.prv);
auto send (std::make_shared <rai::send_block> (previous, key.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (previous)));
system.nodes [0]->process_receive_republish (send);
system.nodes [0]->process_active (send);
auto iterations (0);
while (system.nodes [0]->balance (key.pub).is_zero ())
{
@ -210,7 +210,10 @@ TEST (node, auto_bootstrap)
ASSERT_FALSE (init1.error ());
node1->network.send_keepalive (system.nodes [0]->network.endpoint ());
node1->start ();
ASSERT_TRUE (node1->bootstrap_initiator.in_progress ());
while (!node1->bootstrap_initiator.in_progress ())
{
system.poll ();
}
auto iterations3 (0);
while (node1->balance (key2.pub) != system.nodes [0]->config.receive_minimum.number ())
{
@ -633,7 +636,8 @@ TEST (node, confirm_locked)
system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv);
system.wallet (0)->enter_password ("1");
auto block (std::make_shared <rai::send_block> (0, 0, 0, rai::keypair ().prv, 0, 0));
system.nodes [0]->network.republish_block (block);
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false);
system.nodes [0]->network.republish_block (transaction, block);
}
TEST (node_config, random_rep)
@ -652,7 +656,7 @@ TEST (node, block_replace)
system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv);
auto block1 (system.wallet (0)->send_action (rai::test_genesis_key.pub, 0, rai::Gxrb_ratio));
auto block3 (system.wallet (0)->send_action (rai::test_genesis_key.pub, 0, rai::Gxrb_ratio));
ASSERT_NE (nullptr, block1);
ASSERT_NE (nullptr, block1);
auto initial_work (block1->block_work ());
while (system.work.work_value (block1->root (), block1->block_work ()) <= system.work.work_value (block1->root (), initial_work))
{
@ -662,7 +666,11 @@ TEST (node, block_replace)
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false);
ASSERT_EQ (block3->hash (), system.nodes [0]->store.block_successor (transaction, block1->hash ()));
}
system.nodes [1]->network.republish_block (block1);
for (auto i (0); i < 1; ++i)
{
rai::transaction transaction_a (system.nodes [1]->store.environment, nullptr, false);
system.nodes [1]->network.republish_block (transaction_a, block1);
}
auto iterations1 (0);
std::unique_ptr <rai::block> block2;
while (block2 == nullptr)
@ -699,13 +707,13 @@ TEST (node, fork_publish)
auto send1 (std::make_shared <rai::send_block> (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
rai::keypair key2;
auto send2 (std::make_shared <rai::send_block> (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
node1.process_receive_republish (send1);
node1.process_active (send1);
ASSERT_EQ (1, node1.active.roots.size ());
auto existing (node1.active.roots.find (send1->root ()));
ASSERT_NE (node1.active.roots.end (), existing);
auto election (existing->election);
ASSERT_EQ (2, election->votes.rep_votes.size ());
node1.process_receive_republish (send2);
node1.process_active (send2);
auto existing1 (election->votes.rep_votes.find (rai::test_genesis_key.pub));
ASSERT_NE (election->votes.rep_votes.end (), existing1);
ASSERT_EQ (*send1, *existing1->second);
@ -730,12 +738,12 @@ TEST (node, fork_keep)
// send1 and send2 fork to different accounts
auto send1 (std::make_shared <rai::send_block> (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
auto send2 (std::make_shared <rai::send_block> (genesis.hash (), key2.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
node1.process_receive_republish (send1);
node2.process_receive_republish (send1);
node1.process_active (send1);
node2.process_active (send1);
ASSERT_EQ (1, node1.active.roots.size ());
ASSERT_EQ (1, node2.active.roots.size ());
node1.process_receive_republish (send2);
node2.process_receive_republish (send2);
node1.process_active (send2);
node2.process_active (send2);
auto conflict (node2.active.roots.find (genesis.hash ()));
ASSERT_NE (node2.active.roots.end (), conflict);
auto votes1 (conflict->election);
@ -889,8 +897,8 @@ TEST (node, fork_bootstrap_flip)
rai::keypair key2;
auto send2 (std::make_shared <rai::send_block> (latest, key2.pub, rai::genesis_amount - rai::Gxrb_ratio, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system0.work.generate (latest)));
// Insert but don't rebroadcast, simulating settled blocks
node1.process_receive_many (send1);
node2.process_receive_many (send2);
node1.block_processor.process_receive_many (send1);
node2.block_processor.process_receive_many (send2);
{
rai::transaction transaction (node2.store.environment, nullptr, false);
ASSERT_TRUE (node2.store.block_exists (transaction, send2->hash ()));
@ -952,22 +960,22 @@ TEST (node, fork_open_flip)
rai::keypair rep1;
rai::keypair rep2;
auto send1 (std::make_shared <rai::send_block> (genesis.hash (), key1.pub, rai::genesis_amount - 1, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
node1.process_receive_republish (send1);
node2.process_receive_republish (send1);
node1.process_active (send1);
node2.process_active (send1);
// We should be keeping this block
auto open1 (std::make_shared <rai::open_block> (send1->hash (), rep1.pub, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)));
// This block should be evicted
auto open2 (std::make_shared <rai::open_block> (send1->hash (), rep2.pub, key1.pub, key1.prv, key1.pub, system.work.generate (key1.pub)));
ASSERT_FALSE (*open1 == *open2);
// node1 gets copy that will remain
node1.process_receive_republish (open1);
node1.process_active (open1);
// node2 gets copy that will be evicted
node2.process_receive_republish (open2);
node2.process_active (open2);
ASSERT_EQ (2, node1.active.roots.size ());
ASSERT_EQ (2, node2.active.roots.size ());
// Notify both nodes that a fork exists
node1.process_receive_republish (open2);
node2.process_receive_republish (open1);
node1.process_active (open2);
node2.process_active (open1);
auto conflict (node2.active.roots.find (open1->root ()));
ASSERT_NE (node2.active.roots.end (), conflict);
auto votes1 (conflict->election);
@ -996,10 +1004,10 @@ TEST (node, coherent_observer)
{
rai::system system (24000, 1);
auto & node1 (*system.nodes [0]);
node1.observers.blocks.add ([&node1] (rai::block const & block_a, rai::account const & account_a, rai::amount const &)
node1.observers.blocks.add ([&node1] (std::shared_ptr <rai::block> block_a, rai::account const & account_a, rai::amount const &)
{
rai::transaction transaction (node1.store.environment, nullptr, false);
ASSERT_TRUE (node1.store.block_exists (transaction, block_a.hash ()));
ASSERT_TRUE (node1.store.block_exists (transaction, block_a->hash ()));
});
system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv);
rai::keypair key;
@ -1104,12 +1112,12 @@ TEST (node, broadcast_elected)
system.wallet (2)->insert_adhoc (rep_other.prv);
auto fork0 (std::make_shared <rai::send_block> (node2->latest (rai::test_genesis_key.pub), rep_small.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
node0->generate_work (*fork0);
node0->process_receive_republish (fork0);
node1->process_receive_republish (fork0);
node0->process_active (fork0);
node1->process_active (fork0);
auto fork1 (std::make_shared <rai::send_block> (node2->latest (rai::test_genesis_key.pub), rep_big.pub, 0, rai::test_genesis_key.prv, rai::test_genesis_key.pub, 0));
node0->generate_work (*fork1);
system.wallet (2)->insert_adhoc (rep_small.prv);
node2->process_receive_republish (fork1);
node2->process_active (fork1);
//std::cerr << "fork0: " << fork_hash.to_string () << std::endl;
//std::cerr << "fork1: " << fork1.hash ().to_string () << std::endl;
auto iterations (0);
@ -1173,7 +1181,7 @@ TEST (node, bootstrap_no_publish)
node1->bootstrap_initiator.bootstrap (node0->network.endpoint ());
ASSERT_TRUE (node1->active.roots.empty ());
auto iterations1 (0);
while (node1->bootstrap_initiator.in_progress ())
while (node1->block (send0.hash ()) == nullptr)
{
// Poll until the TCP connection is torn down and in_progress goes false
system0.poll ();

View file

@ -290,7 +290,7 @@ TEST (rpc, send_fail)
request.put ("source", rai::test_genesis_key.pub.to_account ());
request.put ("destination", rai::test_genesis_key.pub.to_account ());
request.put ("amount", "100");
auto done (false);
std::atomic <bool> done (false);
std::thread thread2 ([&system, &done] ()
{
auto iterations (0);
@ -1221,6 +1221,7 @@ TEST (rpc, DISABLED_payment_wait)
TEST (rpc, peers)
{
rai::system system (24000, 2);
system.nodes [0]->peers.insert (rai::endpoint (boost::asio::ip::address_v6::from_string ("::ffff:80.80.80.80"), 4000), 1);
rai::rpc rpc (system.service, *system.nodes [0], rai::rpc_config (true));
rpc.start ();
boost::property_tree::ptree request;
@ -1232,7 +1233,7 @@ TEST (rpc, peers)
}
ASSERT_EQ (200, response.status);
auto & peers_node (response.json.get_child ("peers"));
ASSERT_EQ (1, peers_node.size ());
ASSERT_EQ (2, peers_node.size ());
}
TEST (rpc, pending)
@ -1359,7 +1360,7 @@ TEST (rpc, version)
ASSERT_EQ ("1", response1.json.get <std::string> ("rpc_version"));
ASSERT_EQ (200, response1.status);
ASSERT_EQ ("8", response1.json.get <std::string> ("store_version"));
ASSERT_EQ (boost::str (boost::format ("RaiBlocks %1%.%2%.%3%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR % RAIBLOCKS_VERSION_PATCH), response1.json.get <std::string> ("node_vendor"));
ASSERT_EQ (boost::str (boost::format ("RaiBlocks %1%.%2%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR), response1.json.get <std::string> ("node_vendor"));
auto headers (response1.resp.find ("Access-Control-Allow-Origin"));
ASSERT_NE (response1.resp.end (), headers);
ASSERT_EQ ("*", headers->value ());
@ -1432,7 +1433,7 @@ TEST (rpc, work_peer_bad)
rpc.start ();
node2.config.work_peers.push_back (std::make_pair (boost::asio::ip::address_v6::any (), 0));
rai::block_hash hash1 (1);
uint64_t work (0);
std::atomic <uint64_t> work (0);
node2.generate_work (hash1, [&work] (uint64_t work_a)
{
work = work_a;
@ -1958,7 +1959,7 @@ TEST (rpc, bootstrap_any)
ASSERT_TRUE (success.empty());
}
TEST (rpc, republish)
TEST (rpc, DISABLED_republish)
{
rai::system system (24000, 2);
rai::keypair key;
@ -2486,7 +2487,7 @@ TEST (rpc, search_pending_all)
}
}
TEST (rpc, wallet_republish)
TEST (rpc, DISABLED_wallet_republish)
{
rai::system system (24000, 1);
rai::genesis genesis;

View file

@ -52,7 +52,7 @@ TEST (work, cancel_many)
pool.cancel (key1);
}
TEST (work, opencl)
TEST (work, DISABLED_opencl)
{
rai::logging logging;
logging.init (rai::unique_path ());

View file

@ -150,21 +150,15 @@ rai::sync_result rai::push_synchronization::target (MDB_txn * transaction_a, rai
rai::bootstrap_client::bootstrap_client (std::shared_ptr <rai::node> node_a, std::shared_ptr <rai::bootstrap_attempt> attempt_a, rai::tcp_endpoint const & endpoint_a) :
node (node_a),
attempt (attempt_a),
socket (node_a->network.service),
pull_client (*this),
socket (node_a->service),
endpoint (endpoint_a),
timeout (node_a->network.service)
timeout (node_a->service)
{
++attempt->connections;
}
rai::bootstrap_client::~bootstrap_client ()
{
if (!pull_client.pull.account.is_zero ())
{
// If this connection is ending and request_account hasn't been cleared it didn't finish, requeue
attempt->requeue_pull (pull_client.pull);
--attempt->pulling;
}
--attempt->connections;
}
@ -179,6 +173,7 @@ void rai::bootstrap_client::start_timeout ()
auto this_l (this_w.lock ());
if (this_l != nullptr)
{
BOOST_LOG (this_l->node->log) << boost::str (boost::format ("Disconnecting from %1% due to timeout") % this_l->endpoint);
this_l->socket.close ();
}
}
@ -187,7 +182,7 @@ void rai::bootstrap_client::start_timeout ()
void rai::bootstrap_client::stop_timeout ()
{
auto killed (timeout.expires_from_now ());
size_t killed (timeout.cancel ());
(void) killed;
}
@ -201,7 +196,7 @@ void rai::bootstrap_client::run ()
if (!ec)
{
BOOST_LOG (this_l->node->log) << boost::str (boost::format ("Connection established to %1%") % this_l->endpoint);
this_l->work ();
this_l->attempt->pool_connection (this_l->shared_from_this ());
}
else
{
@ -222,13 +217,6 @@ void rai::bootstrap_client::run ()
});
}
void rai::bootstrap_client::frontier_request ()
{
auto this_l (shared_from_this ());
auto client_l (std::make_shared <rai::frontier_req_client> (this_l));
client_l->run ();
}
void rai::frontier_req_client::run ()
{
std::unique_ptr <rai::frontier_req> request (new rai::frontier_req);
@ -264,7 +252,7 @@ std::shared_ptr <rai::bootstrap_client> rai::bootstrap_client::shared ()
return shared_from_this ();
}
rai::frontier_req_client::frontier_req_client (std::shared_ptr <rai::bootstrap_client> const & connection_a) :
rai::frontier_req_client::frontier_req_client (std::shared_ptr <rai::bootstrap_client> connection_a) :
connection (connection_a),
current (0),
count (0),
@ -276,23 +264,6 @@ next_report (std::chrono::system_clock::now () + std::chrono::seconds (15))
rai::frontier_req_client::~frontier_req_client ()
{
std::lock_guard <std::mutex> lock (connection->attempt->mutex);
if (connection->attempt->state == rai::attempt_state::requesting_frontiers)
{
if (connection->node->config.logging.network_logging ())
{
BOOST_LOG (connection->node->log) << "frontier_req failed, reattempting";
}
connection->attempt->state = rai::attempt_state::starting;
connection->attempt->pulls.clear ();
}
else
{
if (connection->node->config.logging.network_logging ())
{
BOOST_LOG (connection->node->log) << "Exiting frontier_req initiator";
}
}
}
void rai::frontier_req_client::receive_frontier ()
@ -325,7 +296,7 @@ void rai::frontier_req_client::unsynced (MDB_txn * transaction_a, rai::block_has
void rai::frontier_req_client::received_frontier (boost::system::error_code const & ec, size_t size_a)
{
if (!ec && connection->attempt->state != rai::attempt_state::complete)
if (!ec)
{
assert (size_a == sizeof (rai::uint256_union) + sizeof (rai::uint256_union));
rai::account account;
@ -417,7 +388,16 @@ void rai::frontier_req_client::received_frontier (boost::system::error_code cons
next (transaction);
}
}
connection->completed_frontier_request ();
{
try
{
promise.set_value (false);
}
catch (std::future_error &)
{
}
connection->attempt->pool_connection (connection);
}
}
}
else
@ -443,14 +423,20 @@ void rai::frontier_req_client::next (MDB_txn * transaction_a)
}
}
rai::bulk_pull_client::bulk_pull_client (rai::bootstrap_client & connection_a) :
connection (connection_a),
account_count (0)
rai::bulk_pull_client::bulk_pull_client (std::shared_ptr <rai::bootstrap_client> connection_a) :
connection (connection_a)
{
++connection->attempt->pulling;
}
rai::bulk_pull_client::~bulk_pull_client ()
{
--connection->attempt->pulling;
if (!pull.account.is_zero ())
{
connection->attempt->requeue_pull (pull);
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % connection->endpoint);
}
}
void rai::bulk_pull_client::request (rai::pull_info const & pull_a)
@ -465,103 +451,106 @@ void rai::bulk_pull_client::request (rai::pull_info const & pull_a)
rai::vectorstream stream (*buffer);
req.serialize (stream);
}
if (connection.node->config.logging.bulk_pull_logging ())
if (connection->node->config.logging.bulk_pull_logging ())
{
BOOST_LOG (connection.node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection.endpoint);
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection->endpoint);
}
else if (connection.node->config.logging.network_logging () && account_count % 256 == 0)
else if (connection->node->config.logging.network_logging () && connection->attempt->account_count++ % 256 == 0)
{
BOOST_LOG (connection.node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection.endpoint);
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Requesting account %1% down to %2% from %3%") % req.start.to_account () % req.end.to_string () % connection->endpoint);
}
++account_count;
auto connection_l (connection.shared ());
connection.start_timeout ();
boost::asio::async_write (connection.socket, boost::asio::buffer (buffer->data (), buffer->size ()), [connection_l, buffer] (boost::system::error_code const & ec, size_t size_a)
auto this_l (shared_from_this ());
connection->start_timeout ();
boost::asio::async_write (connection->socket, boost::asio::buffer (buffer->data (), buffer->size ()), [this_l, buffer] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
this_l->connection->stop_timeout ();
if (!ec)
{
connection_l->pull_client.receive_block ();
this_l->receive_block ();
}
else
{
BOOST_LOG (connection_l->node->log) << boost::str (boost::format ("Error sending bulk pull request %1% to %2%") % ec.message () % connection_l->endpoint);
BOOST_LOG (this_l->connection->node->log) << boost::str (boost::format ("Error sending bulk pull request %1% to %2%") % ec.message () % this_l->connection->endpoint);
}
});
}
void rai::bulk_pull_client::receive_block ()
{
auto connection_l (connection.shared ());
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data (), 1), [connection_l] (boost::system::error_code const & ec, size_t size_a)
auto this_l (shared_from_this ());
connection->start_timeout ();
boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data (), 1), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
this_l->connection->stop_timeout ();
if (!ec)
{
connection_l->pull_client.received_type ();
this_l->received_type ();
}
else
{
BOOST_LOG (connection_l->node->log) << boost::str (boost::format ("Error receiving block type %1%") % ec.message ());
BOOST_LOG (this_l->connection->node->log) << boost::str (boost::format ("Error receiving block type %1%") % ec.message ());
}
});
}
void rai::bulk_pull_client::received_type ()
{
auto connection_l (connection.shared ());
rai::block_type type (static_cast <rai::block_type> (connection.receive_buffer [0]));
auto this_l (shared_from_this ());
rai::block_type type (static_cast <rai::block_type> (connection->receive_buffer [0]));
switch (type)
{
case rai::block_type::send:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::send_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
connection->start_timeout ();
boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::send_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
this_l->connection->stop_timeout ();
this_l->received_block (ec, size_a);
});
break;
}
case rai::block_type::receive:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::receive_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
connection->start_timeout ();
boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::receive_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
this_l->connection->stop_timeout ();
this_l->received_block (ec, size_a);
});
break;
}
case rai::block_type::open:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::open_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
connection->start_timeout ();
boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::open_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
this_l->connection->stop_timeout ();
this_l->received_block (ec, size_a);
});
break;
}
case rai::block_type::change:
{
connection.start_timeout ();
boost::asio::async_read (connection.socket, boost::asio::buffer (connection.receive_buffer.data () + 1, rai::change_block::size), [connection_l] (boost::system::error_code const & ec, size_t size_a)
connection->start_timeout ();
boost::asio::async_read (connection->socket, boost::asio::buffer (connection->receive_buffer.data () + 1, rai::change_block::size), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
connection_l->stop_timeout ();
connection_l->pull_client.received_block (ec, size_a);
this_l->connection->stop_timeout ();
this_l->received_block (ec, size_a);
});
break;
}
case rai::block_type::not_a_block:
{
connection.completed_pull ();
if (expected == pull.end)
{
pull = rai::pull_info ();
connection->attempt->pool_connection (connection);
}
break;
}
default:
{
BOOST_LOG (connection.node->log) << boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast <int> (type));
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Unknown type received as block type: %1%") % static_cast <int> (type));
break;
}
}
@ -571,58 +560,53 @@ void rai::bulk_pull_client::received_block (boost::system::error_code const & ec
{
if (!ec)
{
rai::bufferstream stream (connection.receive_buffer.data (), 1 + size_a);
rai::bufferstream stream (connection->receive_buffer.data (), 1 + size_a);
std::shared_ptr <rai::block> block (rai::deserialize_block (stream));
if (block != nullptr)
{
auto hash (block->hash ());
if (connection.node->config.logging.bulk_pull_logging ())
if (connection->node->config.logging.bulk_pull_logging ())
{
std::string block_l;
block->serialize_json (block_l);
BOOST_LOG (connection.node->log) << boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l);
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Pulled block %1% %2%") % hash.to_string () % block_l);
}
if (hash == expected)
{
expected = block->previous ();
}
auto attempt (connection.attempt);
// Process the block asynchronously from making the next network requests since this is a potentially long operation
// Hold a reference to the current attempt so we don't start another one while blocks are being processed.
attempt->node->background ([attempt, block] ()
{
attempt->node->process_receive_many (block, [attempt] (MDB_txn * transaction_a, rai::process_return result_a, std::shared_ptr <rai::block> block_a)
auto attempt_l (connection->attempt);
attempt_l->node->block_processor.add (block, [attempt_l] (MDB_txn * transaction_a, rai::process_return result_a, std::shared_ptr <rai::block> block_a)
{
switch (result_a.code)
{
switch (result_a.code)
case rai::process_result::progress:
case rai::process_result::old:
break;
case rai::process_result::fork:
{
case rai::process_result::progress:
case rai::process_result::old:
break;
case rai::process_result::fork:
{
auto node_l (attempt->node);
std::shared_ptr <rai::block> block (node_l->ledger.forked_block (transaction_a, *block_a));
node_l->active.start (transaction_a, block);
attempt->node->network.broadcast_confirm_req (block_a);
attempt->node->network.broadcast_confirm_req (block);
BOOST_LOG (attempt->node->log) << boost::str (boost::format ("Fork received in bootstrap between: %1% and %2% root %3%") % block_a->hash ().to_string () % block->hash ().to_string () % block_a->root ().to_string ());
break;
}
default:
break;
auto node_l (attempt_l->node);
std::shared_ptr <rai::block> block (node_l->ledger.forked_block (transaction_a, *block_a));
node_l->active.start (transaction_a, block);
node_l->network.broadcast_confirm_req (block_a);
node_l->network.broadcast_confirm_req (block);
BOOST_LOG (node_l->log) << boost::str (boost::format ("Fork received in bootstrap between: %1% and %2% root %3%") % block_a->hash ().to_string () % block->hash ().to_string () % block_a->root ().to_string ());
break;
}
});
default:
break;
}
});
receive_block ();
}
else
{
BOOST_LOG (connection.node->log) << "Error deserializing block received from pull request";
BOOST_LOG (connection->node->log) << "Error deserializing block received from pull request";
}
}
else
{
BOOST_LOG (connection.node->log) << boost::str (boost::format ("Error bulk receiving block: %1%") % ec.message ());
BOOST_LOG (connection->node->log) << boost::str (boost::format ("Error bulk receiving block: %1%") % ec.message ());
}
}
@ -638,22 +622,6 @@ synchronization (*connection->node, [this] (MDB_txn * transaction_a, rai::block
rai::bulk_push_client::~bulk_push_client ()
{
std::lock_guard <std::mutex> lock (connection->attempt->mutex);
if (connection->attempt->state == rai::attempt_state::pushing)
{
if (connection->node->config.logging.network_logging ())
{
BOOST_LOG (connection->node->log) << "Bulk push client failed";
}
connection->attempt->state = rai::attempt_state::complete;
}
else
{
if (connection->node->config.logging.network_logging ())
{
BOOST_LOG (connection->node->log) << "Exiting bulk push client";
}
}
}
void rai::bulk_push_client::start ()
@ -722,7 +690,13 @@ void rai::bulk_push_client::send_finished ()
auto this_l (shared_from_this ());
async_write (connection->socket, boost::asio::buffer (buffer->data (), 1), [this_l] (boost::system::error_code const & ec, size_t size_a)
{
this_l->connection->completed_pushes ();
try
{
this_l->promise.set_value (false);
}
catch (std::future_error &)
{
}
});
}
@ -776,52 +750,186 @@ rai::bootstrap_attempt::bootstrap_attempt (std::shared_ptr <rai::node> node_a) :
connections (0),
pulling (0),
node (node_a),
state (rai::attempt_state::starting)
account_count (0),
stopped (false)
{
BOOST_LOG (node->log) << "Starting bootstrap attempt";
node->bootstrap_initiator.notify_listeners (true);
}
rai::bootstrap_attempt::~bootstrap_attempt ()
{
node->bootstrap_initiator.notify_listeners ();
BOOST_LOG (node->log) << "Exiting bootstrap attempt";
node->bootstrap_initiator.notify_listeners (false);
}
bool rai::bootstrap_attempt::request_frontier (std::unique_lock <std::mutex> & lock_a)
{
auto result (true);
auto connection_l (connection (lock_a));
if (connection_l)
{
std::future <bool> future;
{
auto client (std::make_shared <rai::frontier_req_client> (connection_l));
client->run ();
frontiers = client;
future = client->promise.get_future ();
}
lock_a.unlock ();
result = consume_future (future);
lock_a.lock ();
if (result)
{
pulls.clear ();
}
if (node->config.logging.network_logging ())
{
if (!result)
{
BOOST_LOG (node->log) << boost::str (boost::format ("Completed frontier request, %1% out of sync accounts according to %2%") % pulls.size () % connection_l->endpoint);
}
else
{
BOOST_LOG (node->log) << "frontier_req failed, reattempting";
}
}
}
return result;
}
void rai::bootstrap_attempt::request_pull (std::unique_lock <std::mutex> & lock_a)
{
auto connection_l (connection (lock_a));
if (connection_l)
{
auto pull (pulls.front ());
pulls.pop_front ();
auto client (std::make_shared <rai::bulk_pull_client> (connection_l));
client->request (pull);
}
}
bool rai::bootstrap_attempt::request_push (std::unique_lock <std::mutex> & lock_a)
{
auto result (true);
auto connection_l (connection (lock_a));
if (connection_l)
{
std::future <bool> future;
{
auto client (std::make_shared <rai::bulk_push_client> (connection_l));
client->start ();
push = client;
future = client->promise.get_future ();
}
lock_a.unlock ();
result = consume_future (future);
lock_a.lock ();
if (node->config.logging.network_logging ())
{
BOOST_LOG (node->log) << "Exiting bulk push client";
if (result)
{
BOOST_LOG (node->log) << "Bulk push client failed";
}
}
}
return result;
}
void rai::bootstrap_attempt::run ()
{
populate_connections ();
std::unique_lock <std::mutex> lock (mutex);
auto frontier_failure (true);
while (!stopped && frontier_failure)
{
frontier_failure = request_frontier (lock);
}
while (!stopped && (!pulls.empty () || pulling > 0))
{
if (!pulls.empty ())
{
request_pull (lock);
}
else
{
condition.wait (lock);
}
}
if (!stopped)
{
BOOST_LOG (node->log) << "Completed pulls";
}
auto push_failure (true);
while (!stopped && push_failure)
{
push_failure = request_push (lock);
}
stopped = true;
condition.notify_all ();
idle.clear ();
}
std::shared_ptr <rai::bootstrap_client> rai::bootstrap_attempt::connection (std::unique_lock <std::mutex> & lock_a)
{
while (!stopped && idle.empty ())
{
condition.wait (lock_a);
}
std::shared_ptr <rai::bootstrap_client> result;
if (!idle.empty ())
{
result = idle.back ();
idle.pop_back ();
}
return result;
}
bool rai::bootstrap_attempt::consume_future (std::future <bool> & future_a)
{
bool result;
try
{
result = future_a.get ();
}
catch (std::future_error &)
{
result = true;
}
return result;
}
void rai::bootstrap_attempt::populate_connections ()
{
if (++connections < node->config.bootstrap_connections)
if (connections < node->config.bootstrap_connections)
{
auto peer (node->peers.bootstrap_peer ());
if (peer != rai::endpoint ())
if (peer != rai::endpoint (boost::asio::ip::address_v6::any (), 0))
{
auto client (std::make_shared <rai::bootstrap_client> (node, shared_from_this (), rai::tcp_endpoint (peer.address (), peer.port ())));
client->run ();
std::lock_guard <std::mutex> lock (mutex);
clients.push_back (client);
}
else
{
--connections;
BOOST_LOG (node->log) << boost::str (boost::format ("Bootstrap stopped because there are no peers"));
stopped = true;
condition.notify_all ();
}
}
else
if (!stopped)
{
--connections;
}
std::weak_ptr <rai::bootstrap_attempt> this_w (shared_from_this ());
switch (state)
{
case rai::attempt_state::starting:
case rai::attempt_state::requesting_frontiers:
case rai::attempt_state::requesting_pulls:
node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (5), [this_w] ()
std::weak_ptr <rai::bootstrap_attempt> this_w (shared_from_this ());
node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (5), [this_w] ()
{
if (auto this_l = this_w.lock ())
{
if (auto this_l = this_w.lock ())
{
this_l->populate_connections ();
}
});
break;
default:
break;
this_l->populate_connections ();
}
});
}
}
@ -831,112 +939,44 @@ void rai::bootstrap_attempt::add_connection (rai::endpoint const & endpoint_a)
client->run ();
}
void rai::bootstrap_attempt::pool_connection (std::shared_ptr <rai::bootstrap_client> client_a)
{
std::lock_guard <std::mutex> lock (mutex);
idle.push_back (client_a);
condition.notify_all ();
}
void rai::bootstrap_attempt::stop ()
{
std::lock_guard <std::mutex> lock (mutex);
state = rai::attempt_state::complete;
}
void rai::bootstrap_client::completed_frontier_request ()
{
stopped = true;
condition.notify_all ();
for (auto i : clients)
{
std::lock_guard <std::mutex> lock (attempt->mutex);
if (node->config.logging.network_logging ())
if (auto client = i.lock ())
{
BOOST_LOG (node->log) << boost::str (boost::format ("Completed frontier request, %1% out of sync accounts according to %2%") % attempt->pulls.size () % endpoint);
client->socket.close ();
}
attempt->state = rai::attempt_state::requesting_pulls;
}
work ();
}
void rai::bootstrap_client::completed_pull ()
{
if (pull_client.expected == pull_client.pull.end)
if (auto i = frontiers.lock ())
{
pull_client.pull = rai::pull_info ();
--attempt->pulling;
work ();
try
{
i->promise.set_value (true);
}
catch (std::future_error &)
{
}
}
else
if (auto i = push.lock ())
{
attempt->requeue_pull (pull_client.pull);
BOOST_LOG (node->log) << boost::str (boost::format ("Disconnecting from %1% because it didn't give us what we requested") % endpoint);
}
}
void rai::bootstrap_client::completed_pulls ()
{
BOOST_LOG (node->log) << "Completed pulls";
assert (node->bootstrap_initiator.in_progress ());
auto pushes (std::make_shared <rai::bulk_push_client> (shared_from_this ()));
pushes->start ();
}
void rai::bootstrap_client::completed_pushes ()
{
std::lock_guard <std::mutex> lock (attempt->mutex);
attempt->state = rai::attempt_state::complete;
}
void rai::bootstrap_client::poll ()
{
auto this_l (shared_from_this ());
attempt->node->alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (rai::rai_network == rai::rai_networks::rai_test_network ? 0 : 1), [this_l] ()
{
this_l->work ();
});
}
void rai::bootstrap_client::work ()
{
auto poll_l (false);
std::unique_lock <std::mutex> lock (attempt->mutex);
switch (attempt->state)
{
case rai::attempt_state::starting:
attempt->state = rai::attempt_state::requesting_frontiers;
lock.unlock ();
if (this->node->config.logging.network_logging ())
{
BOOST_LOG (this->node->log) << boost::str (boost::format ("Initiating frontier request to %1%") % endpoint);
}
frontier_request ();
break;
case rai::attempt_state::requesting_frontiers:
poll_l = true;
break;
case rai::attempt_state::requesting_pulls:
if (!attempt->pulls.empty ())
{
// There are more things to pull
auto pull (attempt->pulls.front ());
attempt->pulls.pop_front ();
++attempt->pulling;
lock.unlock ();
pull_client.request (pull);
}
else
{
if (attempt->pulling == 0)
{
attempt->state = rai::attempt_state::pushing;
lock.unlock ();
completed_pulls ();
}
else
{
poll_l = true;
}
}
break;
case rai::attempt_state::pushing:
case rai::attempt_state::complete:
break;
};
if (poll_l)
{
poll ();
try
{
i->promise.set_value (true);
}
catch (std::future_error &)
{
}
}
}
@ -947,6 +987,7 @@ void rai::bootstrap_attempt::requeue_pull (rai::pull_info const & pull_a)
{
std::lock_guard <std::mutex> lock (mutex);
pulls.push_front (pull);
condition.notify_all ();
}
else
{
@ -960,27 +1001,37 @@ stopped (false)
{
}
rai::bootstrap_initiator::~bootstrap_initiator ()
{
stop ();
}
void rai::bootstrap_initiator::bootstrap ()
{
std::lock_guard <std::mutex> lock (mutex);
if (attempt.lock () == nullptr && !stopped)
std::unique_lock <std::mutex> lock (mutex);
if (!stopped && attempt == nullptr)
{
auto attempt_l (std::make_shared <rai::bootstrap_attempt> (node.shared ()));
attempt = attempt_l;
attempt_l->populate_connections ();
stop_attempt (lock);
attempt = std::make_shared <rai::bootstrap_attempt> (node.shared ());
attempt_thread.reset (new std::thread ([this] ()
{
attempt->run ();
this->node.block_processor.flush ();
std::lock_guard <std::mutex> lock (mutex);
attempt.reset ();
condition.notify_all ();
}));
}
}
void rai::bootstrap_initiator::bootstrap (rai::endpoint const & endpoint_a)
{
node.peers.insert (endpoint_a, 0);
bootstrap ();
std::lock_guard <std::mutex> lock (mutex);
if (auto attempt_l = attempt.lock ())
if (attempt != nullptr)
{
if (!stopped)
{
attempt_l->add_connection (endpoint_a);
}
attempt->add_connection (endpoint_a);
}
}
@ -992,26 +1043,40 @@ void rai::bootstrap_initiator::add_observer (std::function <void (bool)> const &
bool rai::bootstrap_initiator::in_progress ()
{
return attempt.lock () != nullptr;
std::lock_guard <std::mutex> lock (mutex);
return attempt != nullptr;
}
void rai::bootstrap_initiator::stop ()
{
std::lock_guard <std::mutex> lock (mutex);
std::unique_lock <std::mutex> lock (mutex);
stopped = true;
auto attempt_l (attempt.lock ());
if (attempt_l != nullptr)
stop_attempt (lock);
}
void rai::bootstrap_initiator::stop_attempt (std::unique_lock <std::mutex> & lock_a)
{
assert (!mutex.try_lock ());
if (attempt != nullptr)
{
attempt_l->stop ();
attempt->stop ();
}
while (attempt != nullptr)
{
condition.wait (lock_a);
}
if (attempt_thread)
{
attempt_thread->join ();
attempt_thread.reset ();
}
}
void rai::bootstrap_initiator::notify_listeners ()
void rai::bootstrap_initiator::notify_listeners (bool in_progress_a)
{
auto in_progress_l (in_progress ());
for (auto & i: observers)
{
i (in_progress_l);
i (in_progress_a);
}
}
@ -1483,7 +1548,7 @@ void rai::bulk_push_server::received_block (boost::system::error_code const & ec
{
if (!connection->node->bootstrap_initiator.in_progress ())
{
connection->node->process_receive_republish (std::move (block));
connection->node->process_active (std::move (block));
}
receive ();
}

View file

@ -4,6 +4,7 @@
#include <rai/node/common.hpp>
#include <atomic>
#include <future>
#include <queue>
#include <unordered_set>
#include <stack>
@ -48,14 +49,6 @@ public:
rai::node & node;
};
class bootstrap_client;
enum class attempt_state
{
starting,
requesting_frontiers,
requesting_pulls,
pushing,
complete
};
class pull_info
{
public:
@ -66,27 +59,41 @@ public:
rai::block_hash end;
unsigned attempts;
};
class frontier_req_client;
class bulk_push_client;
class bootstrap_attempt : public std::enable_shared_from_this <bootstrap_attempt>
{
public:
bootstrap_attempt (std::shared_ptr <rai::node> node_a);
~bootstrap_attempt ();
void run ();
std::shared_ptr <rai::bootstrap_client> connection (std::unique_lock <std::mutex> &);
bool consume_future (std::future <bool> &);
void populate_connections ();
bool request_frontier (std::unique_lock <std::mutex> &);
void request_pull (std::unique_lock <std::mutex> &);
bool request_push (std::unique_lock <std::mutex> &);
void add_connection (rai::endpoint const &);
void pool_connection (std::shared_ptr <rai::bootstrap_client>);
void stop ();
void requeue_pull (rai::pull_info const &);
std::deque <std::weak_ptr <rai::bootstrap_client>> clients;
std::weak_ptr <rai::frontier_req_client> frontiers;
std::weak_ptr <rai::bulk_push_client> push;
std::deque <rai::pull_info> pulls;
std::vector <std::shared_ptr <rai::bootstrap_client>> idle;
std::atomic <unsigned> connections;
std::atomic <unsigned> pulling;
std::atomic <unsigned> pulling;
std::shared_ptr <rai::node> node;
rai::attempt_state state;
std::unordered_set <rai::endpoint> attempted;
std::atomic <unsigned> account_count;
bool stopped;
std::mutex mutex;
std::condition_variable condition;
};
class frontier_req_client : public std::enable_shared_from_this <rai::frontier_req_client>
{
public:
frontier_req_client (std::shared_ptr <rai::bootstrap_client> const &);
frontier_req_client (std::shared_ptr <rai::bootstrap_client>);
~frontier_req_client ();
void run ();
void receive_frontier ();
@ -99,19 +106,19 @@ public:
rai::account_info info;
unsigned count;
std::chrono::system_clock::time_point next_report;
std::promise <bool> promise;
};
class bulk_pull_client
class bulk_pull_client : public std::enable_shared_from_this <rai::bulk_pull_client>
{
public:
bulk_pull_client (rai::bootstrap_client &);
bulk_pull_client (std::shared_ptr <rai::bootstrap_client>);
~bulk_pull_client ();
void request (rai::pull_info const &);
void receive_block ();
void received_type ();
void received_block (boost::system::error_code const &, size_t);
rai::block_hash first ();
rai::bootstrap_client & connection;
size_t account_count;
std::shared_ptr <rai::bootstrap_client> connection;
rai::block_hash expected;
rai::pull_info pull;
};
@ -121,14 +128,6 @@ public:
bootstrap_client (std::shared_ptr <rai::node>, std::shared_ptr <rai::bootstrap_attempt>, rai::tcp_endpoint const &);
~bootstrap_client ();
void run ();
void frontier_request ();
void work ();
void poll ();
void completed_frontier_request ();
void sent_request (boost::system::error_code const &, size_t);
void completed_pull ();
void completed_pulls ();
void completed_pushes ();
std::shared_ptr <rai::bootstrap_client> shared ();
void start_timeout ();
void stop_timeout ();
@ -136,7 +135,6 @@ public:
std::shared_ptr <rai::bootstrap_attempt> attempt;
boost::asio::ip::tcp::socket socket;
std::array <uint8_t, 200> receive_buffer;
rai::bulk_pull_client pull_client;
rai::tcp_endpoint endpoint;
boost::asio::deadline_timer timeout;
};
@ -151,22 +149,27 @@ public:
void send_finished ();
std::shared_ptr <rai::bootstrap_client> connection;
rai::push_synchronization synchronization;
std::promise <bool> promise;
};
class bootstrap_initiator
{
public:
bootstrap_initiator (rai::node &);
~bootstrap_initiator ();
void bootstrap (rai::endpoint const &);
void bootstrap ();
void notify_listeners ();
void notify_listeners (bool);
void add_observer (std::function <void (bool)> const &);
bool in_progress ();
void stop ();
void stop_attempt (std::unique_lock <std::mutex> &);
rai::node & node;
std::weak_ptr <rai::bootstrap_attempt> attempt;
std::shared_ptr <rai::bootstrap_attempt> attempt;
std::unique_ptr <std::thread> attempt_thread;
bool stopped;
private:
std::mutex mutex;
std::condition_variable condition;
std::vector <std::function <void (bool)>> observers;
};
class bootstrap_listener

View file

@ -8,8 +8,8 @@ size_t constexpr rai::message::bootstrap_server_position;
std::bitset <16> constexpr rai::message::block_type_mask;
rai::message::message (rai::message_type type_a) :
version_max (0x03),
version_using (0x03),
version_max (0x04),
version_using (0x04),
version_min (0x01),
type (type_a)
{

View file

@ -39,10 +39,9 @@ confirm_ack (0)
{
}
rai::network::network (boost::asio::io_service & service_a, uint16_t port, rai::node & node_a) :
socket (service_a, rai::endpoint (boost::asio::ip::address_v6::any (), port)),
service (service_a),
resolver (service_a),
rai::network::network (rai::node & node_a, uint16_t port) :
socket (node_a.service, rai::endpoint (boost::asio::ip::address_v6::any (), port)),
resolver (node_a.service),
node (node_a),
bad_sender_count (0),
on (true),
@ -166,16 +165,15 @@ void rai::network::rebroadcast_reps (std::shared_ptr <rai::block> block_a)
}
template <typename T>
bool confirm_block (rai::node & node_a, T & list_a, std::shared_ptr <rai::block> block_a)
bool confirm_block (MDB_txn * transaction_a, rai::node & node_a, T & list_a, std::shared_ptr <rai::block> block_a)
{
bool result (false);
if (node_a.config.enable_voting)
{
rai::transaction transaction (node_a.store.environment, nullptr, true);
node_a.wallets.foreach_representative (transaction, [&result, &block_a, &list_a, &node_a, &transaction] (rai::public_key const & pub_a, rai::raw_key const & prv_a)
node_a.wallets.foreach_representative (transaction_a, [&result, &block_a, &list_a, &node_a, &transaction_a] (rai::public_key const & pub_a, rai::raw_key const & prv_a)
{
result = true;
auto sequence (node_a.store.sequence_atomic_inc (transaction, pub_a));
auto sequence (node_a.store.sequence_atomic_inc (transaction_a, pub_a));
rai::vote vote (pub_a, prv_a, sequence, block_a);
rai::confirm_ack confirm (vote);
std::shared_ptr <std::vector <uint8_t>> bytes (new std::vector <uint8_t>);
@ -193,21 +191,21 @@ bool confirm_block (rai::node & node_a, T & list_a, std::shared_ptr <rai::block>
}
template <>
bool confirm_block (rai::node & node_a, rai::endpoint & peer_a, std::shared_ptr <rai::block> block_a)
bool confirm_block (MDB_txn * transaction_a, rai::node & node_a, rai::endpoint & peer_a, std::shared_ptr <rai::block> block_a)
{
std::array <rai::endpoint, 1> endpoints;
endpoints [0] = peer_a;
auto result (confirm_block (node_a, endpoints, std::move (block_a)));
auto result (confirm_block (transaction_a, node_a, endpoints, std::move (block_a)));
return result;
}
void rai::network::republish_block (std::shared_ptr <rai::block> block)
void rai::network::republish_block (MDB_txn * transaction, std::shared_ptr <rai::block> block)
{
rebroadcast_reps (block);
auto hash (block->hash ());
auto list (node.peers.list_sqrt ());
// If we're a representative, broadcast a signed confirm, otherwise an unsigned publish
if (!confirm_block (node, list, block))
if (!confirm_block (transaction, node, list, block))
{
rai::publish message (block);
std::shared_ptr <std::vector <uint8_t>> bytes (new std::vector <uint8_t>);
@ -360,7 +358,7 @@ public:
++node.network.incoming.publish;
node.peers.contacted (sender, message_a.version_using);
node.peers.insert (sender, message_a.version_using);
node.process_receive_republish (message_a.block);
node.process_active (message_a.block);
}
void confirm_req (rai::confirm_req const & message_a) override
{
@ -371,10 +369,11 @@ public:
++node.network.incoming.confirm_req;
node.peers.contacted (sender, message_a.version_using);
node.peers.insert (sender, message_a.version_using);
node.process_receive_republish (message_a.block);
if (node.ledger.block_exists (message_a.block->hash ()))
node.process_active (message_a.block);
rai::transaction transaction_a (node.store.environment, nullptr, false);
if (node.store.block_exists (transaction_a, message_a.block->hash ()))
{
confirm_block (node, sender, message_a.block);
confirm_block (transaction_a, node, sender, message_a.block);
}
}
void confirm_ack (rai::confirm_ack const & message_a) override
@ -386,7 +385,7 @@ public:
++node.network.incoming.confirm_ack;
node.peers.contacted (sender, message_a.version_using);
node.peers.insert (sender, message_a.version_using);
node.process_receive_republish (message_a.vote.block);
node.process_active (message_a.vote.block);
node.vote_processor.vote (message_a.vote, sender);
}
void bulk_pull (rai::bulk_pull const &) override
@ -996,7 +995,7 @@ rai::vote_result rai::vote_processor::vote (rai::vote const & vote_a, rai::endpo
{
rai::vote_result result;
{
rai::transaction transaction (node.store.environment, nullptr, true);
rai::transaction transaction (node.store.environment, nullptr, false);
result = vote_a.validate (transaction, node.store);
}
if (node.config.logging.vote_logging ())
@ -1045,12 +1044,237 @@ bool rai::rep_crawler::exists (rai::block_hash const & hash_a)
return active.count (hash_a) != 0;
}
rai::block_processor::block_processor (rai::node & node_a) :
stopped (false),
idle (true),
node (node_a),
thread ([this] () { process_blocks (); })
{
}
rai::block_processor::~block_processor ()
{
stop ();
thread.join ();
}
void rai::block_processor::stop ()
{
std::lock_guard <std::mutex> lock (mutex);
stopped = true;
condition.notify_all ();
}
void rai::block_processor::flush ()
{
std::unique_lock <std::mutex> lock (mutex);
while (!stopped && (!blocks.empty () || !idle))
{
condition.wait (lock);
}
}
void rai::block_processor::add (std::shared_ptr <rai::block> block_a, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> action_a)
{
std::lock_guard <std::mutex> lock (mutex);
blocks.push_back (std::make_pair (block_a, action_a));
condition.notify_all ();
}
void rai::block_processor::process_blocks ()
{
std::unique_lock <std::mutex> lock (mutex);
while (!stopped)
{
if (!blocks.empty ())
{
{
auto info (blocks.front ());
blocks.pop_front ();
lock.unlock ();
process_receive_many (info.first, info.second);
}
// Let other threads get an opportunity to transaction lock
std::this_thread::yield ();
lock.lock ();
}
else
{
idle = true;
condition.notify_all ();
condition.wait (lock);
idle = false;
}
}
}
void rai::block_processor::process_receive_many (std::shared_ptr <rai::block> block_a, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> completed_a)
{
std::vector <std::shared_ptr <rai::block>> blocks;
blocks.push_back (block_a);
while (!blocks.empty ())
{
std::deque <std::pair <std::shared_ptr <rai::block>, rai::process_return>> progress;
{
rai::transaction transaction (node.store.environment, nullptr, true);
auto count (0);
while (!blocks.empty () && count < rai::blocks_per_transaction)
{
auto block (blocks.back ());
blocks.pop_back ();
auto hash (block->hash ());
auto process_result (process_receive_one (transaction, block));
completed_a (transaction, process_result, block);
switch (process_result.code)
{
case rai::process_result::progress:
{
progress.push_back (std::make_pair (block, process_result));
}
case rai::process_result::old:
{
auto cached (node.store.unchecked_get (transaction, hash));
for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i)
{
node.store.unchecked_del (transaction, hash, **i);
blocks.push_back (std::move (*i));
}
std::lock_guard <std::mutex> lock (node.gap_cache.mutex);
node.gap_cache.blocks.get <1> ().erase (hash);
break;
}
default:
break;
}
++count;
}
}
for (auto & i : progress)
{
node.observers.blocks (i.first, i.second.account, i.second.amount);
}
}
}
rai::process_return rai::block_processor::process_receive_one (MDB_txn * transaction_a, std::shared_ptr <rai::block> block_a)
{
rai::process_return result;
result = node.ledger.process (transaction_a, *block_a);
switch (result.code)
{
case rai::process_result::progress:
{
if (node.config.logging.ledger_logging ())
{
std::string block;
block_a->serialize_json (block);
BOOST_LOG (node.log) << boost::str (boost::format ("Processing block %1% %2%") % block_a->hash ().to_string () % block);
}
break;
}
case rai::process_result::gap_previous:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Gap previous for: %1%") % block_a->hash ().to_string ());
}
node.store.unchecked_put (transaction_a, block_a->previous (), block_a);
node.gap_cache.add (transaction_a, block_a);
break;
}
case rai::process_result::gap_source:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Gap source for: %1%") % block_a->hash ().to_string ());
}
node.store.unchecked_put (transaction_a, block_a->source (), block_a);
node.gap_cache.add (transaction_a, block_a);
break;
}
case rai::process_result::old:
{
{
auto root (block_a->root ());
auto hash (block_a->hash ());
auto existing (node.store.block_get (transaction_a, hash));
if (existing != nullptr)
{
// Replace block with one that has higher work value
if (node.work.work_value (root, block_a->block_work ()) > node.work.work_value (root, existing->block_work ()))
{
node.store.block_put (transaction_a, hash, *block_a, node.store.block_successor (transaction_a, hash));
}
}
else
{
// Could have been rolled back, maybe
}
}
if (node.config.logging.ledger_duplicate_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Old for: %1%") % block_a->hash ().to_string ());
}
break;
}
case rai::process_result::bad_signature:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Bad signature for: %1%") % block_a->hash ().to_string ());
}
break;
}
case rai::process_result::overspend:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Overspend for: %1%") % block_a->hash ().to_string ());
}
break;
}
case rai::process_result::unreceivable:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Unreceivable for: %1%") % block_a->hash ().to_string ());
}
break;
}
case rai::process_result::not_receive_from_send:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Not receive from send for: %1%") % block_a->hash ().to_string ());
}
break;
}
case rai::process_result::fork:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Fork for: %1% root: %2%") % block_a->hash ().to_string () % block_a->root ().to_string ());
}
break;
}
case rai::process_result::account_mismatch:
{
if (node.config.logging.ledger_logging ())
{
BOOST_LOG (node.log) << boost::str (boost::format ("Account mismatch for: %1%") % block_a->hash ().to_string ());
}
}
}
return result;
}
rai::node::node (rai::node_init & init_a, boost::asio::io_service & service_a, uint16_t peering_port_a, boost::filesystem::path const & application_path_a, rai::alarm & alarm_a, rai::logging const & logging_a, rai::work_pool & work_a) :
node (init_a, service_a, application_path_a, alarm_a, rai::node_config (peering_port_a, logging_a), work_a)
{
}
rai::node::node (rai::node_init & init_a, boost::asio::io_service & service_a, boost::filesystem::path const & application_path_a, rai::alarm & alarm_a, rai::node_config const & config_a, rai::work_pool & work_a) :
service (service_a),
config (config_a),
alarm (alarm_a),
work (work_a),
@ -1059,14 +1283,15 @@ gap_cache (*this),
ledger (store, config_a.inactive_supply.number ()),
active (*this),
wallets (init_a.block_store_init, *this),
network (service_a, config.peering_port, *this),
network (*this, config.peering_port),
bootstrap_initiator (*this),
bootstrap (service_a, config.peering_port, *this),
peers (network.endpoint ()),
application_path (application_path_a),
port_mapping (*this),
vote_processor (*this),
warmed_up (0)
warmed_up (0),
block_processor (*this)
{
store.environment.sizing_action = [this] ()
{
@ -1088,101 +1313,115 @@ warmed_up (0)
{
observers.disconnect ();
};
observers.blocks.add ([this] (rai::block const & block_a, rai::account const & account_a, rai::amount const & amount_a)
observers.blocks.add ([this] (std::shared_ptr <rai::block> block_a, rai::account const & account_a, rai::amount const & amount_a)
{
if (!config.callback_address.empty ())
if (this->block_arrival.recent (block_a->hash ()))
{
rai::transaction transaction (store.environment, nullptr, true);
active.start (transaction, block_a);
}
});
observers.blocks.add ([this] (std::shared_ptr <rai::block> block_a, rai::account const & account_a, rai::amount const & amount_a)
{
if (this->block_arrival.recent (block_a->hash ()))
{
boost::property_tree::ptree event;
event.add ("account", account_a.to_account ());
event.add ("hash", block_a.hash ().to_string ());
std::string block_text;
block_a.serialize_json (block_text);
event.add ("block", block_text);
event.add ("amount", amount_a.to_string_dec ());
std::stringstream ostream;
boost::property_tree::write_json (ostream, event);
ostream.flush ();
auto body (std::make_shared <std::string> (ostream.str ()));
auto address (config.callback_address);
auto port (config.callback_port);
auto target (std::make_shared <std::string> (config.callback_target));
auto node_l (shared_from_this ());
auto resolver (std::make_shared <boost::asio::ip::tcp::resolver> (network.service));
resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a)
background ([node_l, block_a, account_a, amount_a] ()
{
if (!ec)
if (!node_l->config.callback_address.empty ())
{
for (auto i (i_a), n (boost::asio::ip::tcp::resolver::iterator {}); i != n; ++i)
boost::property_tree::ptree event;
event.add ("account", account_a.to_account ());
event.add ("hash", block_a->hash ().to_string ());
std::string block_text;
block_a->serialize_json (block_text);
event.add ("block", block_text);
event.add ("amount", amount_a.to_string_dec ());
std::stringstream ostream;
boost::property_tree::write_json (ostream, event);
ostream.flush ();
auto body (std::make_shared <std::string> (ostream.str ()));
auto address (node_l->config.callback_address);
auto port (node_l->config.callback_port);
auto target (std::make_shared <std::string> (node_l->config.callback_target));
auto resolver (std::make_shared <boost::asio::ip::tcp::resolver> (node_l->service));
resolver->async_resolve (boost::asio::ip::tcp::resolver::query (address, std::to_string (port)), [node_l, address, port, target, body, resolver] (boost::system::error_code const & ec, boost::asio::ip::tcp::resolver::iterator i_a)
{
auto sock (std::make_shared <boost::asio::ip::tcp::socket> (node_l->network.service));
sock->async_connect (i->endpoint(), [node_l, target, body, sock, address, port] (boost::system::error_code const & ec)
if (!ec)
{
if (!ec)
for (auto i (i_a), n (boost::asio::ip::tcp::resolver::iterator {}); i != n; ++i)
{
auto req (std::make_shared <boost::beast::http::request<boost::beast::http::string_body>> ());
req->method (boost::beast::http::verb::post);
req->target (*target);
req->version = 11;
req->insert(boost::beast::http::field::host, address);
req->body = *body;
//req->prepare (*req);
//boost::beast::http::prepare(req);
req->prepare_payload();
boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req] (boost::system::error_code const & ec)
auto sock (std::make_shared <boost::asio::ip::tcp::socket> (node_l->service));
sock->async_connect (i->endpoint(), [node_l, target, body, sock, address, port] (boost::system::error_code const & ec)
{
if (!ec)
{
auto sb (std::make_shared <boost::beast::flat_buffer> ());
auto resp (std::make_shared <boost::beast::http::response <boost::beast::http::string_body>> ());
boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port] (boost::system::error_code const & ec)
auto req (std::make_shared <boost::beast::http::request<boost::beast::http::string_body>> ());
req->method (boost::beast::http::verb::post);
req->target (*target);
req->version = 11;
req->insert(boost::beast::http::field::host, address);
req->body = *body;
//req->prepare (*req);
//boost::beast::http::prepare(req);
req->prepare_payload();
boost::beast::http::async_write (*sock, *req, [node_l, sock, address, port, req] (boost::system::error_code const & ec)
{
if (!ec)
{
if (resp->result() == boost::beast::http::status::ok)
auto sb (std::make_shared <boost::beast::flat_buffer> ());
auto resp (std::make_shared <boost::beast::http::response <boost::beast::http::string_body>> ());
boost::beast::http::async_read (*sock, *sb, *resp, [node_l, sb, resp, sock, address, port] (boost::system::error_code const & ec)
{
}
else
{
if (node_l->config.logging.callback_logging ())
if (!ec)
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Callback to %1%:%2% failed with status: %3%") % address % port % resp->result());
if (resp->result() == boost::beast::http::status::ok)
{
}
else
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Callback to %1%:%2% failed with status: %3%") % address % port % resp->result());
}
}
}
}
else
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable complete callback: %1%:%2% %3%") % address % port % ec.message ());
}
};
});
}
else
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable complete callback: %1%:%2% %3%") % address % port % ec.message ());
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to send callback: %1%:%2% %3%") % address % port % ec.message ());
}
};
}
});
}
else
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to send callback: %1%:%2% %3%") % address % port % ec.message ());
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to connect to callback address: %1%:%2%, %3%") % address % port % ec.message ());
}
}
});
}
else
}
else
{
if (node_l->config.logging.callback_logging ())
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Unable to connect to callback address: %1%:%2%, %3%") % address % port % ec.message ());
}
BOOST_LOG (node_l->log) << boost::str (boost::format ("Error resolving callback: %1%:%2%, %3%") % address % port % ec.message ());
}
});
}
}
else
{
if (node_l->config.logging.callback_logging ())
{
BOOST_LOG (node_l->log) << boost::str (boost::format ("Error resolving callback: %1%:%2%, %3%") % address % port % ec.message ());
}
}
});
}
});
}
@ -1212,7 +1451,7 @@ warmed_up (0)
}
}
});
BOOST_LOG (log) << "Node starting, version: " << RAIBLOCKS_VERSION_MAJOR << "." << RAIBLOCKS_VERSION_MINOR << "." << RAIBLOCKS_VERSION_PATCH;
BOOST_LOG (log) << "Node starting, version: " << RAIBLOCKS_VERSION_MAJOR << "." << RAIBLOCKS_VERSION_MINOR;
BOOST_LOG (log) << boost::str (boost::format ("Work pool running %1% threads") % work.threads.size ());
if (!init_a.error ())
{
@ -1354,187 +1593,14 @@ void rai::network::confirm_send (rai::confirm_ack const & confirm_a, std::shared
});
}
void rai::node::process_receive_republish (std::shared_ptr <rai::block> incoming)
void rai::node::process_active (std::shared_ptr <rai::block> incoming)
{
std::vector <std::tuple <rai::process_return, std::shared_ptr <rai::block>>> completed;
block_arrival.add (incoming->hash ());
block_processor.process_receive_many (incoming);
if (rai::rai_network == rai::rai_networks::rai_test_network)
{
assert (incoming != nullptr);
process_receive_many (incoming, [this, &completed] (MDB_txn * transaction_a, rai::process_return result_a, std::shared_ptr <rai::block> block_a)
{
switch (result_a.code)
{
case rai::process_result::progress:
{
auto node_l (shared_from_this ());
active.start (transaction_a, block_a);
completed.push_back (std::make_tuple (result_a, block_a));
break;
}
default:
{
break;
}
}
});
block_processor.flush ();
}
for (auto & i: completed)
{
observers.blocks (*std::get <1> (i), std::get <0> (i).account, std::get <0>(i).amount);
}
}
void rai::node::process_receive_many (std::shared_ptr <rai::block> block_a, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> completed_a)
{
std::vector <std::shared_ptr <rai::block>> blocks;
blocks.push_back (block_a);
while (!blocks.empty ())
{
{
rai::transaction transaction (store.environment, nullptr, true);
auto count (0);
while (!blocks.empty () && count < rai::blocks_per_transaction)
{
auto block (blocks.back ());
blocks.pop_back ();
auto hash (block->hash ());
auto process_result (process_receive_one (transaction, block));
completed_a (transaction, process_result, block);
switch (process_result.code)
{
case rai::process_result::progress:
case rai::process_result::old:
{
auto cached (store.unchecked_get (transaction, hash));
for (auto i (cached.begin ()), n (cached.end ()); i != n; ++i)
{
store.unchecked_del (transaction, hash, **i);
blocks.push_back (std::move (*i));
}
std::lock_guard <std::mutex> lock (gap_cache.mutex);
gap_cache.blocks.get <1> ().erase (hash);
break;
}
default:
break;
}
++count;
}
}
// Let other threads get an opportunity to transaction lock
std::this_thread::yield ();
}
}
rai::process_return rai::node::process_receive_one (MDB_txn * transaction_a, std::shared_ptr <rai::block> block_a)
{
rai::process_return result;
result = ledger.process (transaction_a, *block_a);
switch (result.code)
{
case rai::process_result::progress:
{
if (config.logging.ledger_logging ())
{
std::string block;
block_a->serialize_json (block);
BOOST_LOG (log) << boost::str (boost::format ("Processing block %1% %2%") % block_a->hash ().to_string () % block);
}
break;
}
case rai::process_result::gap_previous:
{
if (config.logging.ledger_logging ())
{
BOOST_LOG (log) << boost::str (boost::format ("Gap previous for: %1%") % block_a->hash ().to_string ());
}
store.unchecked_put (transaction_a, block_a->previous (), block_a);
gap_cache.add (transaction_a, block_a);
break;
}
case rai::process_result::gap_source:
{
if (config.logging.ledger_logging ())
{
BOOST_LOG (log) << boost::str (boost::format ("Gap source for: %1%") % block_a->hash ().to_string ());
}
store.unchecked_put (transaction_a, block_a->source (), block_a);
gap_cache.add (transaction_a, block_a);
break;
}
case rai::process_result::old:
{
{
auto root (block_a->root ());
auto hash (block_a->hash ());
auto existing (store.block_get (transaction_a, hash));
if (existing != nullptr)
{
// Replace block with one that has higher work value
if (work.work_value (root, block_a->block_work ()) > work.work_value (root, existing->block_work ()))
{
store.block_put (transaction_a, hash, *block_a, store.block_successor (transaction_a, hash));
}
}
else
{
// Could have been rolled back, maybe
}
}
if (config.logging.ledger_duplicate_logging ())
{
BOOST_LOG (log) << boost::str (boost::format ("Old for: %1%") % block_a->hash ().to_string ());
}
break;
}
case rai::process_result::bad_signature:
{
if (config.logging.ledger_logging ())
{
BOOST_LOG (log) << boost::str (boost::format ("Bad signature for: %1%") % block_a->hash ().to_string ());
}
break;
}
case rai::process_result::overspend:
{
if (config.logging.ledger_logging ())
{
BOOST_LOG (log) << boost::str (boost::format ("Overspend for: %1%") % block_a->hash ().to_string ());
}
break;
}
case rai::process_result::unreceivable:
{
if (config.logging.ledger_logging ())
{
BOOST_LOG (log) << boost::str (boost::format ("Unreceivable for: %1%") % block_a->hash ().to_string ());
}
break;
}
case rai::process_result::not_receive_from_send:
{
if (config.logging.ledger_logging ())
{
BOOST_LOG (log) << boost::str (boost::format ("Not receive from send for: %1%") % block_a->hash ().to_string ());
}
break;
}
case rai::process_result::fork:
{
if (config.logging.ledger_logging ())
{
BOOST_LOG (log) << boost::str (boost::format ("Fork for: %1% root: %2%") % block_a->hash ().to_string () % block_a->root ().to_string ());
}
break;
}
case rai::process_result::account_mismatch:
{
if (config.logging.ledger_logging ())
{
BOOST_LOG (log) << boost::str (boost::format ("Account mismatch for: %1%") % block_a->hash ().to_string ());
}
}
}
return result;
}
rai::process_return rai::node::process (rai::block const & block_a)
@ -1570,6 +1636,17 @@ std::vector <rai::endpoint> rai::peer_container::list ()
return result;
}
std::map <rai::endpoint, unsigned> rai::peer_container::list_version ()
{
std::map <rai::endpoint, unsigned> result;
std::lock_guard <std::mutex> lock (mutex);
for (auto i (peers.begin ()), j (peers.end ()); i != j; ++i)
{
result.insert (std::pair <rai::endpoint, unsigned> (i->endpoint, i->network_version));
}
return result;
}
rai::endpoint rai::peer_container::bootstrap_peer ()
{
rai::endpoint result (boost::asio::ip::address_v6::any (), 0);
@ -1666,6 +1743,7 @@ void rai::node::start ()
network.receive ();
ongoing_keepalive ();
ongoing_bootstrap ();
ongoing_vote_flush ();
ongoing_rep_crawl ();
bootstrap.start ();
backup_wallet ();
@ -1678,6 +1756,7 @@ void rai::node::start ()
void rai::node::stop ()
{
BOOST_LOG (log) << "Node stopping";
block_processor.stop ();
active.stop ();
network.stop ();
bootstrap_initiator.stop ();
@ -1797,6 +1876,22 @@ void rai::node::ongoing_bootstrap ()
});
}
void rai::node::ongoing_vote_flush ()
{
{
rai::transaction transaction (store.environment, nullptr, true);
store.sequence_flush (transaction);
}
std::weak_ptr <rai::node> node_w (shared_from_this ());
alarm.add (std::chrono::system_clock::now () + std::chrono::seconds (5), [node_w] ()
{
if (auto node_l = node_w.lock ())
{
node_l->ongoing_vote_flush ();
}
});
}
void rai::node::backup_wallet ()
{
rai::transaction transaction (store.environment, nullptr, false);
@ -1871,7 +1966,7 @@ void start ()
auto service (i.second);
node->background ([this_l, host, service] ()
{
auto connection (std::make_shared <work_request> (this_l->node->network.service, host, service));
auto connection (std::make_shared <work_request> (this_l->node->service, host, service));
connection->socket.async_connect (rai::tcp_endpoint (host, service), [this_l, connection] (boost::system::error_code const & ec)
{
if (!ec)
@ -1961,9 +2056,8 @@ void stop ()
request.target ("/");
request.version = 11;
request.body = request_string;
//boost::beast::http::prepare (request);
request.prepare_payload();
auto socket (std::make_shared <boost::asio::ip::tcp::socket> (this_l->node->network.service));
auto socket (std::make_shared <boost::asio::ip::tcp::socket> (this_l->node->service));
boost::beast::http::async_write (*socket, request, [socket] (boost::system::error_code const & ec)
{
});
@ -2149,6 +2243,24 @@ rai::endpoint rai::network::endpoint ()
return rai::endpoint (boost::asio::ip::address_v6::loopback (), port);
}
void rai::block_arrival::add (rai::block_hash const & hash_a)
{
std::lock_guard <std::mutex> lock (mutex);
auto now (std::chrono::system_clock::now ());
arrival.insert (rai::block_arrival_info {now, hash_a});
}
bool rai::block_arrival::recent (rai::block_hash const & hash_a)
{
std::lock_guard <std::mutex> lock (mutex);
auto now (std::chrono::system_clock::now ());
while (!arrival.empty () && arrival.begin ()->arrival + std::chrono::seconds (60) < now)
{
arrival.erase (arrival.begin ());
}
return arrival.get <1> ().find (hash_a) != arrival.get <1> ().end ();
}
std::unordered_set <rai::endpoint> rai::peer_container::random_set (size_t count_a)
{
std::unordered_set <rai::endpoint> result;
@ -2520,7 +2632,8 @@ void rai::election::broadcast_winner ()
rai::transaction transaction (node.store.environment, nullptr, true);
compute_rep_votes (transaction);
}
node.network.republish_block (last_winner);
rai::transaction transaction_a (node.store.environment, nullptr, false);
node.network.republish_block (transaction_a, last_winner);
}
rai::uint128_t rai::election::quorum_threshold (MDB_txn * transaction_a, rai::ledger & ledger_a)
@ -2542,14 +2655,6 @@ void rai::election::confirm_once (MDB_txn * transaction_a)
auto tally_l (node.ledger.tally (transaction_a, votes));
assert (tally_l.size () > 0);
auto winner (tally_l.begin ());
if (tally_l.size () > 1)
{
BOOST_LOG (node.log) << boost::str (boost::format ("Vote tally weight %2% for root %1%") % votes.id.to_string () % winner->first.convert_to <std::string> ());
for (auto i (votes.rep_votes.begin ()), n (votes.rep_votes.end ()); i != n; ++i)
{
BOOST_LOG (node.log) << boost::str (boost::format ("%1% %2%") % i->first.to_account () % i->second->hash ().to_string ());
}
}
if (!(*winner->second == *last_winner))
{
if (winner->first > minimum_treshold (transaction_a, node.ledger))
@ -2558,12 +2663,7 @@ void rai::election::confirm_once (MDB_txn * transaction_a)
// Replace our block with the winner and roll back any dependent blocks
node.ledger.rollback (transaction_a, last_winner->hash ());
node.ledger.process (transaction_a, *winner->second);
auto block_l (winner->second);
auto node_l (node.shared ());
node.background ([block_l, node_l] ()
{
node_l->process_receive_many (block_l);
});
node.block_processor.add (winner->second);
last_winner = std::move (winner->second);
}
else
@ -2601,6 +2701,14 @@ void rai::election::confirm_if_quarum (MDB_txn * transaction_a)
void rai::election::confirm_cutoff (MDB_txn * transaction_a)
{
//if (tally_l.size () > 1)
{
BOOST_LOG (node.log) << boost::str (boost::format ("Vote tally weight %2% for root %1%") % votes.id.to_string () % last_winner->root ().to_string ());
for (auto i (votes.rep_votes.begin ()), n (votes.rep_votes.end ()); i != n; ++i)
{
BOOST_LOG (node.log) << boost::str (boost::format ("%1% %2%") % i->first.to_account () % i->second->hash ().to_string ());
}
}
confirm_once (transaction_a);
}
@ -2725,36 +2833,6 @@ int rai::node::store_version ()
return store.version_get (transaction);
}
rai::fan::fan (rai::uint256_union const & key, size_t count_a)
{
std::unique_ptr <rai::uint256_union> first (new rai::uint256_union (key));
for (auto i (1); i < count_a; ++i)
{
std::unique_ptr <rai::uint256_union> entry (new rai::uint256_union);
random_pool.GenerateBlock (entry->bytes.data (), entry->bytes.size ());
*first ^= *entry;
values.push_back (std::move (entry));
}
values.push_back (std::move (first));
}
void rai::fan::value (rai::raw_key & prv_a)
{
prv_a.data.clear ();
for (auto & i: values)
{
prv_a.data ^= *i;
}
}
void rai::fan::value_set (rai::raw_key const & value_a)
{
rai::raw_key value_l;
value (value_l);
*(values [0]) ^= value_l.data;
*(values [0]) ^= value_a.data;
}
rai::thread_runner::thread_runner (boost::asio::io_service & service_a, unsigned service_threads_a)
{
for (auto i (0); i < service_threads_a; ++i)

View file

@ -180,6 +180,7 @@ public:
std::vector <peer_information> representatives (size_t);
// List of all peers
std::vector <rai::endpoint> list ();
std::map <rai::endpoint, unsigned> list_version ();
// A list of random peers with size the square root of total peer count
std::vector <rai::endpoint> list_sqrt ();
// Get the next peer for attempting bootstrap
@ -266,17 +267,41 @@ public:
std::atomic <uint64_t> confirm_req;
std::atomic <uint64_t> confirm_ack;
};
class block_arrival_info
{
public:
std::chrono::system_clock::time_point arrival;
rai::block_hash hash;
};
// This class tracks blocks that are probably live because they arrived in a UDP packet
// This gives a fairly reliable way to differentiate between blocks being inserted via bootstrap or new, live blocks.
class block_arrival
{
public:
void add (rai::block_hash const &);
bool recent (rai::block_hash const &);
boost::multi_index_container
<
rai::block_arrival_info,
boost::multi_index::indexed_by
<
boost::multi_index::ordered_non_unique <boost::multi_index::member <rai::block_arrival_info, std::chrono::system_clock::time_point, &rai::block_arrival_info::arrival>>,
boost::multi_index::hashed_unique <boost::multi_index::member <rai::block_arrival_info, rai::block_hash, &rai::block_arrival_info::hash>>
>
> arrival;
std::mutex mutex;
};
class network
{
public:
network (boost::asio::io_service &, uint16_t, rai::node &);
network (rai::node &, uint16_t);
void receive ();
void stop ();
void receive_action (boost::system::error_code const &, size_t);
void rpc_action (boost::system::error_code const &, size_t);
void rebroadcast_reps (std::shared_ptr <rai::block>);
void republish_vote (std::chrono::system_clock::time_point const &, rai::vote const &);
void republish_block (std::shared_ptr <rai::block>);
void republish_block (MDB_txn *, std::shared_ptr <rai::block>);
void republish (rai::block_hash const &, std::shared_ptr <std::vector <uint8_t>>, rai::endpoint);
void publish_broadcast (std::vector <rai::peer_information> &, std::unique_ptr <rai::block>);
void confirm_send (rai::confirm_ack const &, std::shared_ptr <std::vector <uint8_t>>, rai::endpoint const &);
@ -290,7 +315,6 @@ public:
std::array <uint8_t, 512> buffer;
boost::asio::ip::udp::socket socket;
std::mutex socket_mutex;
boost::asio::io_service & service;
boost::asio::ip::udp::resolver resolver;
rai::node & node;
uint64_t bad_sender_count;
@ -382,7 +406,7 @@ public:
class node_observers
{
public:
rai::observer_set <rai::block const &, rai::account const &, rai::amount const &> blocks;
rai::observer_set <std::shared_ptr <rai::block>, rai::account const &, rai::amount const &> blocks;
rai::observer_set <rai::account const &, bool> wallet;
rai::observer_set <rai::vote const &, rai::endpoint const &> vote;
rai::observer_set <rai::endpoint const &> endpoint;
@ -406,6 +430,28 @@ public:
std::mutex mutex;
std::unordered_set <rai::block_hash> active;
};
// Processing blocks is a potentially long IO operation
// This class isolates block insertion from other operations like servicing network operations
class block_processor
{
public:
block_processor (rai::node &);
~block_processor ();
void stop ();
void flush ();
void add (std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
void process_receive_many (std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
rai::process_return process_receive_one (MDB_txn *, std::shared_ptr <rai::block>);
private:
void process_blocks ();
bool stopped;
bool idle;
std::deque <std::pair <std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)>>> blocks;
std::mutex mutex;
std::condition_variable condition;
rai::node & node;
std::thread thread;
};
class node : public std::enable_shared_from_this <rai::node>
{
public:
@ -425,9 +471,7 @@ public:
int store_version ();
void process_confirmed (std::shared_ptr <rai::block>);
void process_message (rai::message &, rai::endpoint const &);
void process_receive_republish (std::shared_ptr <rai::block>);
void process_receive_many (std::shared_ptr <rai::block>, std::function <void (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>)> = [] (MDB_txn *, rai::process_return, std::shared_ptr <rai::block>) {});
rai::process_return process_receive_one (MDB_txn *, std::shared_ptr <rai::block>);
void process_active (std::shared_ptr <rai::block>);
rai::process_return process (rai::block const &);
void keepalive_preconfigured (std::vector <std::string> const &);
rai::block_hash latest (rai::account const &);
@ -439,12 +483,14 @@ public:
void ongoing_keepalive ();
void ongoing_rep_crawl ();
void ongoing_bootstrap ();
void ongoing_vote_flush ();
void backup_wallet ();
int price (rai::uint128_t const &, int);
void generate_work (rai::block &);
uint64_t generate_work (rai::uint256_union const &);
void generate_work (rai::uint256_union const &, std::function <void (uint64_t)>);
void add_initial_peers ();
void add_initial_peers ();
boost::asio::io_service & service;
rai::node_config config;
rai::alarm & alarm;
rai::work_pool & work;
@ -464,6 +510,8 @@ public:
rai::vote_processor vote_processor;
rai::rep_crawler rep_crawler;
unsigned warmed_up;
rai::block_processor block_processor;
rai::block_arrival block_arrival;
static double constexpr price_max = 16.0;
static double constexpr free_cutoff = 1024.0;
static std::chrono::seconds constexpr period = std::chrono::seconds (60);

View file

@ -77,7 +77,7 @@ node (node_a)
acceptor.set_option (boost::asio::ip::tcp::acceptor::reuse_address (true));
acceptor.bind (endpoint);
acceptor.listen ();
node_a.observers.blocks.add ([this] (rai::block const & block_a, rai::account const & account_a, rai::amount const &)
node_a.observers.blocks.add ([this] (std::shared_ptr <rai::block> block_a, rai::account const & account_a, rai::amount const &)
{
observer_action (account_a);
});
@ -1471,14 +1471,12 @@ void rai::rpc_handler::peers ()
{
boost::property_tree::ptree response_l;
boost::property_tree::ptree peers_l;
auto peers_list (node.peers.list());
auto peers_list (node.peers.list_version ());
for (auto i (peers_list.begin ()), n (peers_list.end ()); i != n; ++i)
{
boost::property_tree::ptree entry;
std::stringstream text;
text << *i;
entry.put ("", text.str ());
peers_l.push_back (std::make_pair ("", entry));
text << i->first;
peers_l.push_back (boost::property_tree::ptree::value_type (text.str (), boost::property_tree::ptree (std::to_string (i->second))));
}
response_l.add_child ("peers", peers_l);
response (response_l);
@ -1562,10 +1560,14 @@ void rai::rpc_handler::pending_exists ()
auto block (node.store.block_get (transaction, hash));
if (block != nullptr)
{
auto block_l (static_cast <rai::send_block *> (block.release ()));
auto account (block_l->hashables.destination);
auto block_l (dynamic_cast <rai::send_block *> (block.get ()));
auto exists (false);
if (block_l != nullptr)
{
auto account (block_l->hashables.destination);
exists = node.store.pending_exists (transaction, rai::pending_key (account, hash));
}
boost::property_tree::ptree response_l;
auto exists (node.store.pending_exists (transaction, rai::pending_key (account, hash)));
response_l.put ("exists", exists ? "1" : "0");
response (response_l);
}
@ -1786,7 +1788,7 @@ void rai::rpc_handler::process ()
{
if (!node.work.work_validate (*block))
{
node.process_receive_republish (std::move (block));
node.process_active (std::move (block));
boost::property_tree::ptree response_l;
response (response_l);
}
@ -1982,33 +1984,35 @@ void rai::rpc_handler::representatives ()
void rai::rpc_handler::republish ()
{
uint64_t count (2048U);
uint64_t count (1024U);
uint64_t sources (0);
try
uint64_t destinations (0);
boost::optional <std::string> count_text (request.get_optional <std::string> ("count"));
if (count_text.is_initialized ())
{
std::string count_text (request.get <std::string> ("count"));
auto error (decode_unsigned (count_text, count));
auto error (decode_unsigned (count_text.get (), count));
if (error)
{
error_response (response, "Invalid count");
error_response (response, "Invalid count limit");
}
}
catch (std::runtime_error &)
boost::optional <std::string> sources_text (request.get_optional <std::string> ("sources"));
if (sources_text.is_initialized ())
{
// If there is no "count" in request
}
try
{
std::string sources_text (request.get <std::string> ("sources"));
auto error (decode_unsigned (sources_text, sources));
if (error)
auto sources_error (decode_unsigned (sources_text.get (), sources));
if (sources_error)
{
error_response (response, "Invalid sources number");
}
}
catch (std::runtime_error &)
boost::optional <std::string> destinations_text (request.get_optional <std::string> ("destinations"));
if (destinations_text.is_initialized ())
{
// If there is no "sources" in request
auto destinations_error (decode_unsigned (destinations_text.get (), destinations));
if (destinations_error)
{
error_response (response, "Invalid destinations number");
}
}
std::string hash_text (request.get <std::string> ("hash"));
rai::uint256_union hash;
@ -2026,29 +2030,66 @@ void rai::rpc_handler::republish ()
block = node.store.block_get (transaction, hash);
if (sources != 0) // Republish source chain
{
std::unique_ptr <rai::block> block_a;
rai::block_hash source (block->source ());
std::unique_ptr <rai::block> block_a (node.store.block_get (transaction, source));
std::vector <rai::block_hash> hashes;
while (!source.is_zero () && hashes.size () < sources)
while (block_a != nullptr && hashes.size () < sources)
{
hashes.push_back (source);
block_a = node.store.block_get (transaction, source);
source = block_a->previous ();
block_a = node.store.block_get (transaction, source);
}
std::reverse (hashes.begin (), hashes.end ());
std::reverse (hashes.begin (), hashes.end ());
for (auto & hash_l : hashes)
{
block_a = node.store.block_get (transaction, hash_l);
node.network.republish_block (std::move (block_a));
node.network.republish_block (transaction, std::move (block_a));
boost::property_tree::ptree entry_l;
entry_l.put ("", hash_l.to_string ());
blocks.push_back (std::make_pair ("", entry_l));
}
}
node.network.republish_block (std::move (block)); // Republish block
node.network.republish_block (transaction, std::move (block)); // Republish block
boost::property_tree::ptree entry;
entry.put ("", hash.to_string ());
blocks.push_back (std::make_pair ("", entry));
if (destinations != 0) // Republish destination chain
{
auto block_b (node.store.block_get (transaction, hash));
auto block_s (dynamic_cast <rai::send_block *> (block_b.get ()));
if (block_s != nullptr)
{
auto destination (block_s->hashables.destination);
auto exists (node.store.pending_exists (transaction, rai::pending_key (destination, hash)));
if (!exists)
{
rai::block_hash previous (node.ledger.latest (transaction, destination));
std::unique_ptr <rai::block> block_d (node.store.block_get (transaction, previous));
rai::block_hash source;
std::vector <rai::block_hash> hashes;
while (block_d != nullptr && hash != source)
{
hashes.push_back (previous);
source = block_d->source ();
previous = block_d->previous ();
block_d = node.store.block_get (transaction, previous);
}
std::reverse (hashes.begin (), hashes.end ());
if (hashes.size () > destinations)
{
hashes.resize(destinations);
}
for (auto & hash_l : hashes)
{
block_d = node.store.block_get (transaction, hash_l);
node.network.republish_block (transaction, std::move (block_d));
boost::property_tree::ptree entry_l;
entry_l.put ("", hash_l.to_string ());
blocks.push_back (std::make_pair ("", entry_l));
}
}
}
}
hash = node.store.block_successor (transaction, hash);
}
response_l.put ("success", ""); // obsolete
@ -2332,7 +2373,7 @@ void rai::rpc_handler::version ()
boost::property_tree::ptree response_l;
response_l.put ("rpc_version", "1");
response_l.put ("store_version", std::to_string (node.store_version ()));
response_l.put ("node_vendor", boost::str (boost::format ("RaiBlocks %1%.%2%.%3%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR % RAIBLOCKS_VERSION_PATCH));
response_l.put ("node_vendor", boost::str (boost::format ("RaiBlocks %1%.%2%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR));
response (response_l);
}
@ -2884,7 +2925,7 @@ void rai::rpc_handler::wallet_republish ()
for (auto & hash : hashes)
{
block = node.store.block_get (transaction, hash);
node.network.republish_block (std::move (block));;
node.network.republish_block (transaction, std::move (block));;
boost::property_tree::ptree entry;
entry.put ("", hash.to_string ());
blocks.push_back (std::make_pair ("", entry));
@ -3230,7 +3271,7 @@ void rai::rpc_handler::work_peers_clear ()
rai::rpc_connection::rpc_connection (rai::node & node_a, rai::rpc & rpc_a) :
node (node_a.shared ()),
rpc (rpc_a),
socket (node_a.network.service)
socket (node_a.service)
{
}

View file

@ -342,8 +342,7 @@ bool rai::wallet_store::rekey (MDB_txn * transaction_a, std::string const & pass
wallet_key (wallet_key_l, transaction_a);
rai::raw_key password_l;
password.value (password_l);
(*password.values [0]) ^= password_l.data;
(*password.values [0]) ^= password_new.data;
password.value_set (password_new);
rai::uint256_union encrypted;
encrypted.encrypt (wallet_key_l, password_new, salt (transaction_a).owords [0]);
entry_put_raw (transaction_a, rai::wallet_store::wallet_key_special, rai::wallet_value (encrypted));
@ -361,6 +360,44 @@ void rai::wallet_store::derive_key (rai::raw_key & prv_a, MDB_txn * transaction_
kdf.phs (prv_a, password_a, salt_l);
}
rai::fan::fan (rai::uint256_union const & key, size_t count_a)
{
std::unique_ptr <rai::uint256_union> first (new rai::uint256_union (key));
for (auto i (1); i < count_a; ++i)
{
std::unique_ptr <rai::uint256_union> entry (new rai::uint256_union);
random_pool.GenerateBlock (entry->bytes.data (), entry->bytes.size ());
*first ^= *entry;
values.push_back (std::move (entry));
}
values.push_back (std::move (first));
}
void rai::fan::value (rai::raw_key & prv_a)
{
std::lock_guard <std::mutex> lock (mutex);
value_get (prv_a);
}
void rai::fan::value_get (rai::raw_key & prv_a)
{
assert (!mutex.try_lock ());
prv_a.data.clear ();
for (auto & i: values)
{
prv_a.data ^= *i;
}
}
void rai::fan::value_set (rai::raw_key const & value_a)
{
std::lock_guard <std::mutex> lock (mutex);
rai::raw_key value_l;
value_get (value_l);
*(values [0]) ^= value_l.data;
*(values [0]) ^= value_a.data;
}
rai::wallet_value::wallet_value (MDB_val const & val_a)
{
assert (val_a.mv_size == sizeof (*this));
@ -998,7 +1035,7 @@ std::shared_ptr <rai::block> rai::wallet::receive_action (rai::send_block const
if (block != nullptr)
{
assert (block != nullptr);
node.process_receive_republish (block);
node.process_active (block);
auto hash (block->hash ());
auto this_l (shared_from_this ());
auto source (send_a.hashables.destination);
@ -1036,7 +1073,7 @@ std::shared_ptr <rai::block> rai::wallet::change_action (rai::account const & so
if (block != nullptr)
{
assert (block != nullptr);
node.process_receive_republish (block);
node.process_active (block);
auto hash (block->hash ());
auto this_l (shared_from_this ());
node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash]
@ -1077,7 +1114,7 @@ std::shared_ptr <rai::block> rai::wallet::send_action (rai::account const & sour
if (block != nullptr)
{
assert (block != nullptr);
node.process_receive_republish (block);
node.process_active (block);
auto hash (block->hash ());
auto this_l (shared_from_this ());
node.wallets.queue_wallet_action (source_a, rai::wallets::generate_priority, [this_l, source_a, hash]

View file

@ -49,6 +49,9 @@ public:
void value (rai::raw_key &);
void value_set (rai::raw_key const &);
std::vector <std::unique_ptr <rai::uint256_union>> values;
private:
std::mutex mutex;
void value_get (rai::raw_key &);
};
class wallet_value
{

View file

@ -68,7 +68,7 @@ balance_layout (new QHBoxLayout),
balance_label (new QLabel),
wallet (wallet_a)
{
version = new QLabel (boost::str (boost::format ("Version %1%.%2%.%3%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR % RAIBLOCKS_VERSION_PATCH).c_str ());
version = new QLabel (boost::str (boost::format ("Version %1%.%2%") % RAIBLOCKS_VERSION_MAJOR % RAIBLOCKS_VERSION_MINOR).c_str ());
self_layout->addWidget (your_account_label);
self_layout->addStretch ();
self_layout->addWidget (version);
@ -629,7 +629,7 @@ void rai_qt::block_viewer::rebroadcast_action (rai::uint256_union const & hash_a
auto block (wallet.node.store.block_get (transaction, hash_a));
if (block != nullptr)
{
wallet.node.network.republish_block (std::move (block));
wallet.node.network.republish_block (transaction, std::move (block));
auto successor (wallet.node.store.block_successor (transaction, hash_a));
if (!successor.is_zero ())
{
@ -790,7 +790,7 @@ std::string rai_qt::status::color ()
result = "color: blue";
break;
case rai_qt::status_types::synchronizing:
result = "color: red";
result = "color: blue";
break;
case rai_qt::status_types::locked:
result = "color: orange";
@ -996,7 +996,7 @@ void rai_qt::wallet::start ()
this_l->push_main_stack (this_l->send_blocks_window);
}
});
node.observers.blocks.add ([this_w] (rai::block const &, rai::account const & account_a, rai::amount const &)
node.observers.blocks.add ([this_w] (std::shared_ptr <rai::block>, rai::account const & account_a, rai::amount const &)
{
if (auto this_l = this_w.lock ())
{
@ -1445,8 +1445,8 @@ ledger_refresh (new QPushButton ("Refresh")),
ledger_back (new QPushButton ("Back")),
peers_window (new QWidget),
peers_layout (new QVBoxLayout),
peers_model (new QStringListModel),
peers_view (new QListView),
peers_model (new QStandardItemModel),
peers_view (new QTableView),
bootstrap_label (new QLabel ("IPV6:port \"::ffff:192.168.0.1:7075\"")),
bootstrap_line (new QLineEdit),
peers_bootstrap (new QPushButton ("Initiate Bootstrap")),
@ -1478,8 +1478,13 @@ wallet (wallet_a)
ledger_layout->setContentsMargins (0, 0, 0, 0);
ledger_window->setLayout (ledger_layout);
peers_model->setHorizontalHeaderItem (0, new QStandardItem ("IPv6 address:port"));
peers_model->setHorizontalHeaderItem (1, new QStandardItem ("Net version"));
peers_view->setEditTriggers (QAbstractItemView::NoEditTriggers);
peers_view->verticalHeader ()->hide ();
peers_view->setModel (peers_model);
peers_view->setColumnWidth(0,220);
peers_view->setSortingEnabled(true);
peers_layout->addWidget (peers_view);
peers_layout->addWidget (bootstrap_label);
peers_layout->addWidget (bootstrap_line);
@ -1598,7 +1603,6 @@ wallet (wallet_a)
{
this->wallet.push_main_stack (this->wallet.account_viewer.window);
});
refresh_ledger ();
bootstrap->setToolTip ("Multi-connection bootstrap to random peers");
search_for_receivables->setToolTip ("Search for pending blocks");
create_block->setToolTip ("Create block in JSON format");
@ -1607,22 +1611,20 @@ wallet (wallet_a)
void rai_qt::advanced_actions::refresh_peers ()
{
auto list (wallet.node.peers.list ());
std::sort (list.begin (), list.end (), [] (rai::endpoint const & lhs, rai::endpoint const & rhs)
peers_model->removeRows (0, peers_model->rowCount ());
auto list (wallet.node.peers.list_version ());
for (auto i (list.begin ()), n (list.end ()); i != n; ++i)
{
return lhs < rhs;
});
QStringList peers;
for (auto i: list)
{
std::stringstream endpoint;
endpoint << i.address ().to_string ();
endpoint << ':';
endpoint << i.port ();
QString qendpoint (endpoint.str().c_str ());
peers << qendpoint;
}
peers_model->setStringList (peers);
std::stringstream endpoint;
endpoint << i->first.address ().to_string ();
endpoint << ':';
endpoint << i->first.port ();
QString qendpoint (endpoint.str().c_str ());
QList <QStandardItem *> items;
items.push_back (new QStandardItem (qendpoint));
items.push_back (new QStandardItem (QString (std::to_string (i->second).c_str ())));
peers_model->appendRow (items);
}
}
void rai_qt::advanced_actions::refresh_ledger ()
@ -1671,7 +1673,7 @@ wallet (wallet_a)
{
show_label_ok (*status);
this->status->setText ("");
this->wallet.node.process_receive_republish (std::move (block_l));
this->wallet.node.process_active (std::move (block_l));
}
else
{

View file

@ -81,8 +81,8 @@ namespace rai_qt {
QWidget * peers_window;
QVBoxLayout * peers_layout;
QStringListModel * peers_model;
QListView * peers_view;
QStandardItemModel * peers_model;
QTableView * peers_view;
QLabel * bootstrap_label;
QLineEdit * bootstrap_line;
QPushButton * peers_bootstrap;
@ -262,9 +262,9 @@ namespace rai_qt {
disconnected,
working,
locked,
synchronizing,
vulnerable,
active,
synchronizing,
nominal
};
class status

View file

@ -205,78 +205,74 @@ int run_wallet (QApplication & application, int argc, char * const * argv, boost
auto error (rai::fetch_object (config, config_path, config_file));
config_file.close ();
if (!error)
{
{
boost::asio::io_service service;
config.node.logging.init (data_path);
std::shared_ptr <rai::node> node;
std::shared_ptr <rai_qt::wallet> gui;
rai::set_application_icon (application);
std::thread node_thread ([&] ()
rai::work_pool work (config.node.work_threads, rai::opencl_work::create (config.opencl_enable, config.opencl, config.node.logging));
rai::alarm alarm (service);
rai::node_init init;
node = std::make_shared <rai::node> (init, service, data_path, alarm, config.node, work);
if (!init.error ())
{
boost::asio::io_service service;
rai::work_pool work (config.node.work_threads, rai::opencl_work::create (config.opencl_enable, config.opencl, config.node.logging));
rai::alarm alarm (service);
rai::node_init init;
node = std::make_shared <rai::node> (init, service, data_path, alarm, config.node, work);
if (!init.error ())
auto wallet (node->wallets.open (config.wallet));
if (wallet == nullptr)
{
auto wallet (node->wallets.open (config.wallet));
if (wallet == nullptr)
auto existing (node->wallets.items.begin ());
if (existing != node->wallets.items.end ())
{
auto existing (node->wallets.items.begin ());
if (existing != node->wallets.items.end ())
{
wallet = existing->second;
config.wallet = existing->first;
}
else
{
wallet = node->wallets.create (config.wallet);
}
wallet = existing->second;
config.wallet = existing->first;
}
if (config.account.is_zero () || !wallet->exists (config.account))
else
{
rai::transaction transaction (wallet->store.environment, nullptr, true);
auto existing (wallet->store.begin (transaction));
if (existing != wallet->store.end ())
{
rai::uint256_union account (existing->first);
config.account = account;
}
else
{
config.account = wallet->deterministic_insert (transaction);
}
wallet = node->wallets.create (config.wallet);
}
assert (wallet->exists (config.account));
update_config (config, config_path, config_file);
node->start ();
rai::rpc rpc (service, *node, config.rpc);
if (config.rpc_enable)
{
rpc.start ();
}
rai::thread_runner runner (service, node->config.io_threads);
QObject::connect (&application, &QApplication::aboutToQuit, [&] ()
{
rpc.stop ();
node->stop ();
});
application.postEvent (&processor, new rai_qt::eventloop_event ([&] ()
{
gui = std::make_shared <rai_qt::wallet> (application, processor, *node, wallet, config.account);
splash->close();
gui->start ();
gui->client_window->show ();
}));
runner.join ();
}
else
if (config.account.is_zero () || !wallet->exists (config.account))
{
show_error ("Error initializing node");
rai::transaction transaction (wallet->store.environment, nullptr, true);
auto existing (wallet->store.begin (transaction));
if (existing != wallet->store.end ())
{
rai::uint256_union account (existing->first);
config.account = account;
}
else
{
config.account = wallet->deterministic_insert (transaction);
}
}
});
result = application.exec ();
node_thread.join ();
assert (wallet->exists (config.account));
update_config (config, config_path, config_file);
node->start ();
rai::rpc rpc (service, *node, config.rpc);
if (config.rpc_enable)
{
rpc.start ();
}
rai::thread_runner runner (service, node->config.io_threads);
QObject::connect (&application, &QApplication::aboutToQuit, [&] ()
{
rpc.stop ();
node->stop ();
});
application.postEvent (&processor, new rai_qt::eventloop_event ([&] ()
{
gui = std::make_shared <rai_qt::wallet> (application, processor, *node, wallet, config.account);
splash->close();
gui->start ();
gui->client_window->show ();
}));
result = application.exec ();
runner.join ();
}
else
{
show_error ("Error initializing node");
}
update_config (config, config_path, config_file);
}
else

55
rai/secure.cpp Normal file → Executable file
View file

@ -61,9 +61,10 @@ genesis_account (rai::rai_network == rai::rai_networks::rai_test_network ? rai_t
genesis_block (rai::rai_network == rai::rai_networks::rai_test_network ? rai_test_genesis : rai::rai_network == rai::rai_networks::rai_beta_network ? rai_beta_genesis : rai_live_genesis),
genesis_amount (std::numeric_limits <rai::uint128_t>::max ())
{
CryptoPP::AutoSeededRandomPool random_pool;
// Randomly generating these mean no two nodes will ever have the same sentinal values which protects against some insecure algorithms
rai::random_pool.GenerateBlock (not_a_block.bytes.data (), not_a_block.bytes.size ());
rai::random_pool.GenerateBlock (not_an_account.bytes.data (), not_an_account.bytes.size ());
random_pool.GenerateBlock (not_a_block.bytes.data (), not_a_block.bytes.size ());
random_pool.GenerateBlock (not_an_account.bytes.data (), not_an_account.bytes.size ());
}
rai::keypair zero_key;
rai::keypair test_genesis_key;
@ -76,7 +77,6 @@ std::string rai_live_genesis;
rai::account genesis_account;
std::string genesis_block;
rai::uint128_t genesis_amount;
CryptoPP::AutoSeededRandomPool random_pool;
rai::block_hash not_a_block;
rai::account not_an_account;
};
@ -100,7 +100,6 @@ std::string const & rai::rai_live_genesis (globals.rai_live_genesis);
rai::account const & rai::genesis_account (globals.genesis_account);
std::string const & rai::genesis_block (globals.genesis_block);
rai::uint128_t const & rai::genesis_amount (globals.genesis_amount);
CryptoPP::AutoSeededRandomPool & rai::random_pool (globals.random_pool);
rai::block_hash const & rai::not_a_block (globals.not_a_block);
rai::block_hash const & rai::not_an_account (globals.not_an_account);
@ -1508,7 +1507,6 @@ size_t rai::block_counts::sum ()
}
rai::block_store::block_store (bool & error_a, boost::filesystem::path const & path_a) :
sequence_cache_count (0),
environment (error_a, path_a),
frontiers (0),
accounts (0),
@ -2484,17 +2482,36 @@ void rai::block_store::checksum_del (MDB_txn * transaction_a, uint64_t prefix, u
void rai::block_store::sequence_flush (MDB_txn * transaction_a)
{
for (auto i (sequence_cache.begin ()), n (sequence_cache.end ()); i != n; ++i)
std::unordered_map <rai::account, uint64_t> sequence_cache_l;
{
std::lock_guard <std::mutex> lock (sequence_mutex);
sequence_cache_l.swap (sequence_cache);
}
for (auto i (sequence_cache_l.begin ()), n (sequence_cache_l.end ()); i != n; ++i)
{
auto status1 (mdb_put (transaction_a, sequence, i->first.val (), rai::mdb_val (sizeof (i->second), &i->second), 0));
assert (status1 == 0);
}
sequence_cache_count = 0;
sequence_cache.clear ();
}
uint64_t rai::block_store::sequence_get (MDB_txn * transaction_a, rai::account const & account_a)
{
uint64_t result (0);
MDB_val value;
auto status (mdb_get (transaction_a, sequence, account_a.val (), &value));
assert (status == 0 || status == MDB_NOTFOUND);
if (status == 0)
{
rai::bufferstream stream (reinterpret_cast <uint8_t const *> (value.mv_data), value.mv_size);
auto error (rai::read (stream, result));
assert (!error);
}
return result;
}
uint64_t rai::block_store::sequence_current (MDB_txn * transaction_a, rai::account const & account_a)
{
assert (!sequence_mutex.try_lock ());
uint64_t result (0);
auto existing (sequence_cache.find (account_a));
if (existing != sequence_cache.end ())
@ -2503,44 +2520,28 @@ uint64_t rai::block_store::sequence_current (MDB_txn * transaction_a, rai::accou
}
else
{
MDB_val value;
auto status (mdb_get (transaction_a, sequence, account_a.val (), &value));
assert (status == 0 || status == MDB_NOTFOUND);
if (status == 0)
{
rai::bufferstream stream (reinterpret_cast <uint8_t const *> (value.mv_data), value.mv_size);
auto error (rai::read (stream, result));
assert (!error);
}
result = sequence_get (transaction_a, account_a);
}
return result;
}
uint64_t rai::block_store::sequence_atomic_inc (MDB_txn * transaction_a, rai::account const & account_a)
{
std::lock_guard <std::mutex> lock (sequence_mutex);
auto result (sequence_current (transaction_a, account_a));
result += 1;
sequence_cache [account_a] = result;
++sequence_cache_count;
if (sequence_cache_count > sequence_cache_max)
{
sequence_flush (transaction_a);
}
return result;
}
uint64_t rai::block_store::sequence_atomic_observe (MDB_txn * transaction_a, rai::account const & account_a, uint64_t sequence_a)
{
std::lock_guard <std::mutex> lock (sequence_mutex);
auto current (sequence_current (transaction_a, account_a));
auto result (std::max (current, sequence_a));
if (sequence_a > current)
{
sequence_cache [account_a] = sequence_a;
++sequence_cache_count;
if (sequence_cache_count > sequence_cache_max)
{
sequence_flush (transaction_a);
}
}
return result;
}

View file

@ -388,19 +388,13 @@ public:
bool checksum_get (MDB_txn *, uint64_t, uint8_t, rai::checksum &);
void checksum_del (MDB_txn *, uint64_t, uint8_t);
uint64_t sequence_get (MDB_txn *, rai::account const &);
uint64_t sequence_atomic_inc (MDB_txn *, rai::account const &);
uint64_t sequence_atomic_observe (MDB_txn *, rai::account const &, uint64_t);
uint64_t sequence_current (MDB_txn *, rai::account const &);
void sequence_flush (MDB_txn *);
std::mutex sequence_mutex;
std::unordered_map <rai::account, uint64_t> sequence_cache;
size_t sequence_cache_count;
static size_t const sequence_cache_max = 256;
// IO per sequence_atomic_max profiled with store.vote_load
// 1 - 1,900,000
// 16 - 235,000
// 128 - 27,000
// 256 - 14,000
// 1024 - 3,200
void version_put (MDB_txn *, int);
int version_get (MDB_txn *);

View file

@ -193,7 +193,8 @@ TEST (node, fork_storm)
system.nodes [i]->generate_work (*open);
auto open_result (system.nodes [i]->process (*open));
ASSERT_EQ (rai::process_result::progress, open_result.code);
system.nodes [i]->network.republish_block (open);
rai::transaction transaction (system.nodes [i]->store.environment, nullptr, false);
system.nodes [i]->network.republish_block (transaction, open);
}
}
auto again (true);

View file

@ -7,6 +7,8 @@
#include <lmdb/libraries/liblmdb/lmdb.h>
thread_local CryptoPP::AutoSeededRandomPool rai::random_pool;
boost::filesystem::path rai::unique_path ()
{
auto result (working_path () / boost::filesystem::unique_path ());

4
rai/utility.hpp Normal file → Executable file
View file

@ -21,7 +21,9 @@
namespace rai
{
extern CryptoPP::AutoSeededRandomPool & random_pool;
// Random pool used by RaiBlocks.
// This must be thread_local as long as the AutoSeededRandomPool implementation requires it
extern thread_local CryptoPP::AutoSeededRandomPool random_pool;
// We operate on streams of uint8_t by convention
using stream = std::basic_streambuf <uint8_t>;
using bufferstream = boost::iostreams::stream_buffer <boost::iostreams::basic_array_source <uint8_t>>;