Moving block callback post-confirmation.

This commit is contained in:
clemahieu 2018-04-26 01:49:54 +02:00
commit 8131a608f9
5 changed files with 64 additions and 43 deletions

View file

@ -1012,7 +1012,7 @@ TEST (node, coherent_observer)
{
rai::system system (24000, 1);
auto & node1 (*system.nodes[0]);
node1.observers.blocks.add ([&node1](std::shared_ptr<rai::block> block_a, rai::process_return const &) {
node1.observers.blocks.add ([&node1](std::shared_ptr<rai::block> block_a, rai::account const &, rai::uint128_t const &, bool) {
rai::transaction transaction (node1.store.environment, nullptr, false);
ASSERT_TRUE (node1.store.block_exists (transaction, block_a->hash ()));
});

View file

@ -1273,7 +1273,10 @@ void rai::block_processor::process_receive_many (std::unique_lock<std::mutex> &
{
case rai::process_result::progress:
{
progress.push_back (std::make_pair (block, process_result));
if (node.block_arrival.recent (hash))
{
node.active.start (transaction, block);
}
}
case rai::process_result::old:
{
@ -1294,18 +1297,6 @@ void rai::block_processor::process_receive_many (std::unique_lock<std::mutex> &
}
}
lock_a.unlock ();
for (auto & i : progress)
{
node.observers.blocks (i.first, i.second);
if (i.second.amount > 0)
{
node.observers.account_balance (i.second.account, false);
if (!i.second.pending_account.is_zero ())
{
node.observers.account_balance (i.second.pending_account, true);
}
}
}
}
rai::process_return rai::block_processor::process_receive_one (MDB_txn * transaction_a, std::shared_ptr<rai::block> block_a)
@ -1476,30 +1467,23 @@ online_reps (*this)
peers.disconnect_observer = [this]() {
observers.disconnect ();
};
observers.blocks.add ([this](std::shared_ptr<rai::block> block_a, rai::process_return const & result_a) {
if (this->block_arrival.recent (block_a->hash ()))
{
rai::transaction transaction (store.environment, nullptr, false);
active.start (transaction, block_a);
}
});
observers.blocks.add ([this](std::shared_ptr<rai::block> block_a, rai::process_return const & result_a) {
observers.blocks.add ([this](std::shared_ptr<rai::block> block_a, rai::account const & account_a, rai::amount const & amount_a, bool is_state_send_a) {
if (this->block_arrival.recent (block_a->hash ()))
{
auto node_l (shared_from_this ());
background ([node_l, block_a, result_a]() {
background ([node_l, block_a, account_a, amount_a, is_state_send_a]() {
if (!node_l->config.callback_address.empty ())
{
boost::property_tree::ptree event;
event.add ("account", result_a.account.to_account ());
event.add ("account", account_a.to_account ());
event.add ("hash", block_a->hash ().to_string ());
std::string block_text;
block_a->serialize_json (block_text);
event.add ("block", block_text);
event.add ("amount", result_a.amount.to_string_dec ());
if (result_a.state_is_send)
event.add ("amount", amount_a.to_string_dec ());
if (is_state_send_a)
{
event.add ("is_send", *result_a.state_is_send);
event.add ("is_send", is_state_send_a);
}
std::stringstream ostream;
boost::property_tree::write_json (ostream, event);
@ -2406,10 +2390,11 @@ namespace
class confirmed_visitor : public rai::block_visitor
{
public:
confirmed_visitor (MDB_txn * transaction_a, rai::node & node_a, std::shared_ptr<rai::block> block_a) :
confirmed_visitor (MDB_txn * transaction_a, rai::node & node_a, std::shared_ptr<rai::block> block_a, rai::block_hash const & hash_a) :
transaction (transaction_a),
node (node_a),
block (block_a)
block (block_a),
hash (hash_a)
{
}
virtual ~confirmed_visitor () = default;
@ -2423,7 +2408,7 @@ public:
rai::account representative;
rai::pending_info pending;
representative = wallet->store.representative (transaction);
auto error (node.store.pending_get (transaction, rai::pending_key (account_a, block->hash ()), pending));
auto error (node.store.pending_get (transaction, rai::pending_key (account_a, hash), pending));
if (!error)
{
auto node_l (node.shared ());
@ -2432,9 +2417,9 @@ public:
}
else
{
if (!node.store.block_exists (transaction, block->hash ()))
if (!node.store.block_exists (transaction, hash))
{
BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% has already been received") % block->hash ().to_string ());
BOOST_LOG (node.log) << boost::str (boost::format ("Block %1% has already been received") % hash.to_string ());
}
else
{
@ -2464,14 +2449,42 @@ public:
MDB_txn * transaction;
rai::node & node;
std::shared_ptr<rai::block> block;
rai::block_hash const &hash;
};
}
void rai::node::process_confirmed (std::shared_ptr<rai::block> block_a)
{
rai::transaction transaction (store.environment, nullptr, false);
confirmed_visitor visitor (transaction, *this, block_a);
block_a->visit (visitor);
auto hash (block_a->hash ());
if (store.block_exists (transaction, hash))
{
confirmed_visitor visitor (transaction, *this, block_a, hash);
block_a->visit (visitor);
auto account (ledger.account (transaction, hash));
auto amount (ledger.amount (transaction, hash));
bool is_state_send (false);
rai::account pending_account (0);
if (auto state = dynamic_cast <rai::state_block *> (block_a.get ()))
{
rai::transaction transaction (store.environment, nullptr, false);
is_state_send = ledger.is_send (transaction, *state);
pending_account = state->hashables.link;
}
if (auto send = dynamic_cast <rai::send_block *> (block_a.get ()))
{
pending_account = send->hashables.destination;
}
observers.blocks (block_a, account, amount, is_state_send);
if (amount > 0)
{
observers.account_balance (account, false);
if (!pending_account.is_zero ())
{
observers.account_balance (pending_account, true);
}
}
}
}
void rai::node::process_message (rai::message & message_a, rai::endpoint const & sender_a)

View file

@ -458,7 +458,7 @@ public:
class node_observers
{
public:
rai::observer_set<std::shared_ptr<rai::block>, rai::process_return const &> blocks;
rai::observer_set<std::shared_ptr<rai::block>, rai::account const &, rai::uint128_t const &, bool> blocks;
rai::observer_set<bool> wallet;
rai::observer_set<std::shared_ptr<rai::vote>, rai::endpoint const &> vote;
rai::observer_set<rai::account const &, bool> account_balance;

View file

@ -141,8 +141,8 @@ void rai::rpc::start ()
}
acceptor.listen ();
node.observers.blocks.add ([this](std::shared_ptr<rai::block> block_a, rai::process_return const & result_a) {
observer_action (result_a.account);
node.observers.blocks.add ([this](std::shared_ptr<rai::block> block_a, rai::account const & account_a, rai::uint128_t const &, bool) {
observer_action (account_a);
});
accept ();
@ -2584,7 +2584,16 @@ void rai::rpc_handler::process ()
{
case rai::process_result::progress:
{
node.observers.blocks (block_a, result);
rai::transaction transaction (node.store.environment, nullptr, false);
auto account (node.ledger.account (transaction, hash));
auto amount (node.ledger.amount (transaction, hash));
bool is_state_send (false);
if (auto state = dynamic_cast <rai::state_block *> (block_a.get ()))
{
rai::transaction transaction (node.store.environment, nullptr, false);
is_state_send = node.ledger.is_send (transaction, *state);
}
node.observers.blocks (block_a, account, amount, is_state_send);
boost::property_tree::ptree response_l;
response_l.put ("hash", hash.to_string ());
response (response_l);

View file

@ -1099,18 +1099,17 @@ void rai_qt::wallet::start ()
this_l->push_main_stack (this_l->send_blocks_window);
}
});
node.observers.blocks.add ([this_w](std::shared_ptr<rai::block> block_a, rai::process_return const & result_a) {
node.observers.blocks.add ([this_w](std::shared_ptr<rai::block> block_a, rai::account const & account_a, rai::uint128_t const & amount_a, bool) {
if (auto this_l = this_w.lock ())
{
auto account (result_a.account);
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w, block_a, account]() {
this_l->application.postEvent (&this_l->processor, new eventloop_event ([this_w, block_a, account_a]() {
if (auto this_l = this_w.lock ())
{
if (this_l->wallet_m->exists (account))
if (this_l->wallet_m->exists (account_a))
{
this_l->accounts.refresh ();
}
if (account == this_l->account)
if (account_a == this_l->account)
{
this_l->history.refresh ();
}