Changing pending table to be keyed by account/hash instead of just hash in order to quickly query for pending blocks for an account.

This commit is contained in:
clemahieu 2017-02-06 23:51:36 -06:00
commit 69d69a4b10
9 changed files with 203 additions and 110 deletions

View file

@ -107,16 +107,16 @@ TEST (block_store, add_pending)
rai::block_store store (init, rai::unique_path ());
ASSERT_TRUE (!init);
rai::keypair key1;
rai::block_hash hash1 (0);
rai::pending_key key2 (0, 0);
rai::pending_info pending1;
rai::transaction transaction (store.environment, nullptr, true);
ASSERT_TRUE (store.pending_get (transaction, hash1, pending1));
store.pending_put (transaction, hash1, pending1);
ASSERT_TRUE (store.pending_get (transaction, key2, pending1));
store.pending_put (transaction, key2, pending1);
rai::pending_info pending2;
ASSERT_FALSE (store.pending_get (transaction, hash1, pending2));
ASSERT_FALSE (store.pending_get (transaction, key2, pending2));
ASSERT_EQ (pending1, pending2);
store.pending_del (transaction, hash1);
ASSERT_TRUE (store.pending_get (transaction, hash1, pending2));
store.pending_del (transaction, key2);
ASSERT_TRUE (store.pending_get (transaction, key2, pending2));
}
TEST (block_store, pending_iterator)
@ -126,10 +126,12 @@ TEST (block_store, pending_iterator)
ASSERT_TRUE (!init);
rai::transaction transaction (store.environment, nullptr, true);
ASSERT_EQ (store.pending_end (), store.pending_begin (transaction));
store.pending_put (transaction, 1, {2, 3, 4});
store.pending_put (transaction, rai::pending_key (1, 2), {2, 3, 4});
auto current (store.pending_begin (transaction));
ASSERT_NE (store.pending_end (), current);
ASSERT_EQ (rai::account (1), current->first);
rai::pending_key key1 (current->first);
ASSERT_EQ (rai::account (1), key1.account);
ASSERT_EQ (rai::block_hash (2), key1.hash);
rai::pending_info pending (current->second);
ASSERT_EQ (rai::account (2), pending.source);
ASSERT_EQ (rai::amount (3), pending.amount);
@ -410,11 +412,11 @@ TEST (block_store, pending_exists)
bool init (false);
rai::block_store store (init, rai::unique_path ());
ASSERT_TRUE (!init);
rai::block_hash two (2);
rai::pending_key two (2, 0);
rai::pending_info pending;
rai::transaction transaction (store.environment, nullptr, true);
store.pending_put (transaction, two, pending);
rai::block_hash one (1);
rai::pending_key one (1, 0);
ASSERT_FALSE (store.pending_exists (transaction, one));
}

View file

@ -148,7 +148,7 @@ TEST (ledger, process_send)
rai::account_info info5;
ASSERT_TRUE (ledger.store.account_get (transaction, key2.pub, info5));
rai::pending_info pending1;
ASSERT_FALSE (ledger.store.pending_get (transaction, hash1, pending1));
ASSERT_FALSE (ledger.store.pending_get (transaction, rai::pending_key (key2.pub, hash1), pending1));
ASSERT_EQ (rai::test_genesis_key.pub, pending1.source);
ASSERT_EQ (key2.pub, pending1.destination);
ASSERT_EQ (rai::genesis_amount - 50, pending1.amount.number ());
@ -167,7 +167,7 @@ TEST (ledger, process_send)
ASSERT_FALSE (ledger.store.account_get (transaction, rai::test_genesis_key.pub, info7));
ASSERT_EQ (info1.head, info7.head);
rai::pending_info pending2;
ASSERT_TRUE (ledger.store.pending_get (transaction, hash1, pending2));
ASSERT_TRUE (ledger.store.pending_get (transaction, rai::pending_key (key2.pub, hash1), pending2));
ASSERT_EQ (rai::genesis_amount, ledger.account_balance (transaction, rai::test_genesis_key.pub));
}
@ -220,7 +220,7 @@ TEST (ledger, process_receive)
ASSERT_EQ (rai::genesis_amount - 50, ledger.weight (transaction, key3.pub));
ASSERT_EQ (hash2, ledger.latest (transaction, key2.pub));
rai::pending_info pending1;
ASSERT_FALSE (ledger.store.pending_get (transaction, hash3, pending1));
ASSERT_FALSE (ledger.store.pending_get (transaction, rai::pending_key (key2.pub, hash3), pending1));
ASSERT_EQ (rai::test_genesis_key.pub, pending1.source);
ASSERT_EQ (25, pending1.amount.number ());
}
@ -259,7 +259,7 @@ TEST (ledger, rollback_receiver)
rai::account_info info2;
ASSERT_TRUE (ledger.store.account_get (transaction, key2.pub, info2));
rai::pending_info pending1;
ASSERT_TRUE (ledger.store.pending_get (transaction, info2.head, pending1));
ASSERT_TRUE (ledger.store.pending_get (transaction, rai::pending_key (key2.pub, info2.head), pending1));
}
TEST (ledger, rollback_representation)
@ -1117,7 +1117,7 @@ TEST (ledger, fail_open_account_mismatch)
ASSERT_EQ (rai::process_result::progress, ledger.process (transaction, block1).code);
rai::keypair badkey;
rai::open_block block2 (block1.hash (), 1, badkey.pub, badkey.prv, badkey.pub, 0);
ASSERT_EQ (rai::process_result::account_mismatch, ledger.process (transaction, block2).code);
ASSERT_NE (rai::process_result::progress, ledger.process (transaction, block2).code);
}
TEST (ledger, fail_receive_old)

View file

@ -711,7 +711,7 @@ TEST (node, fork_keep)
auto & node1 (*system.nodes [0]);
auto & node2 (*system.nodes [1]);
ASSERT_EQ (1, node1.peers.size ());
system.wallet (0)->insert_adhoc ( rai::test_genesis_key.prv);
system.wallet (0)->insert_adhoc (rai::test_genesis_key.prv);
rai::keypair key1;
rai::genesis genesis;
std::unique_ptr <rai::send_block> send1 (new rai::send_block (genesis.hash (), key1.pub, rai::genesis_amount - 100, rai::test_genesis_key.prv, rai::test_genesis_key.pub, system.work.generate (genesis.hash ())));
@ -742,7 +742,7 @@ TEST (node, fork_keep)
{
system.poll ();
++iterations;
ASSERT_LT (iterations, 200);
ASSERT_LT (iterations, 2000);
}
rai::transaction transaction (system.nodes [0]->store.environment, nullptr, false);
auto winner (node1.ledger.winner (transaction, votes1->votes));

View file

@ -1640,7 +1640,7 @@ public:
rai::pending_info pending;
rai::transaction transaction (node.store.environment, nullptr, false);
representative = wallet->store.representative (transaction);
auto error (node.store.pending_get (transaction, block_a.hash (), pending));
auto error (node.store.pending_get (transaction, rai::pending_key (block_a.hashables.destination, block_a.hash ()), pending));
if (!error)
{
auto block_l (std::shared_ptr <rai::send_block> (static_cast <rai::send_block *> (block_a.clone ().release ())));

View file

@ -164,7 +164,7 @@ void rai::system::generate_receive (rai::node & node_a)
rai::transaction transaction (node_a.store.environment, nullptr, false);
rai::uint256_union random_block;
random_pool.GenerateBlock (random_block.bytes.data (), sizeof (random_block.bytes));
auto i (node_a.store.pending_begin (transaction, random_block));
auto i (node_a.store.pending_begin (transaction, rai::pending_key (random_block, 0)));
if (i != node_a.store.pending_end ())
{
rai::block_hash send_hash (i->first);

View file

@ -934,7 +934,7 @@ std::unique_ptr <rai::block> rai::wallet::receive_action (rai::send_block const
if (node.config.receive_minimum.number () <= amount_a.number ())
{
rai::transaction transaction (node.ledger.store.environment, nullptr, false);
if (node.ledger.store.pending_exists (transaction, hash))
if (node.ledger.store.pending_exists (transaction, rai::pending_key (send_a.hashables.destination, hash)))
{
rai::raw_key prv;
if (!store.fetch (transaction, send_a.hashables.destination, prv))
@ -1238,7 +1238,8 @@ public:
{
if (wallet->store.valid_password (transaction))
{
auto block_l (wallet->node.store.block_get (transaction, i->first));
rai::pending_key key (i->first);
auto block_l (wallet->node.store.block_get (transaction, key.hash));
assert (dynamic_cast <rai::send_block *> (block_l.get ()) != nullptr);
std::shared_ptr <rai::send_block> block (static_cast <rai::send_block *> (block_l.release ()));
auto wallet_l (wallet);

View file

@ -1579,41 +1579,59 @@ void rai_qt::block_creation::create_receive ()
if (!error)
{
rai::transaction transaction (wallet.node.store.environment, nullptr, false);
rai::pending_info pending;
if (!wallet.node.store.pending_get (transaction, source_l, pending))
{
rai::account_info info;
auto error (wallet.node.store.account_get (transaction, pending.destination, info));
if (!error)
{
rai::raw_key key;
auto error (wallet.wallet_m->store.fetch (transaction, pending.destination, key));
if (!error)
{
rai::receive_block receive (info.head, source_l, key, pending.destination, wallet.wallet_m->work_fetch (transaction, pending.destination, info.head));
std::string block_l;
receive.serialize_json (block_l);
block->setPlainText (QString (block_l.c_str ()));
show_label_ok (*status);
status->setText ("Created block");
}
else
{
auto block_l (wallet.node.store.block_get (transaction, source_l));
if (block_l != nullptr)
{
auto send_block (dynamic_cast <rai::send_block *> (block_l.get ()));
if (send_block != nullptr)
{
rai::pending_info pending;
if (!wallet.node.store.pending_get (transaction, rai::pending_key (send_block->hashables.destination, source_l), pending))
{
rai::account_info info;
auto error (wallet.node.store.account_get (transaction, pending.destination, info));
if (!error)
{
rai::raw_key key;
auto error (wallet.wallet_m->store.fetch (transaction, pending.destination, key));
if (!error)
{
rai::receive_block receive (info.head, source_l, key, pending.destination, wallet.wallet_m->work_fetch (transaction, pending.destination, info.head));
std::string block_l;
receive.serialize_json (block_l);
block->setPlainText (QString (block_l.c_str ()));
show_label_ok (*status);
status->setText ("Created block");
}
else
{
show_label_error (*status);
status->setText ("Account is not in wallet");
}
}
else
{
show_label_error (*status);
status->setText ("Account not yet open");
}
}
else
{
show_label_error (*status);
status->setText ("Account is not in wallet");
}
}
else
{
status->setText ("Source block is not pending to receive");
}
}
else
{
show_label_error (*status);
status->setText ("Account not yet open");
}
}
else
{
status->setText("Source is not a send block");
}
}
else
{
show_label_error (*status);
status->setText ("Source block is not pending to receive");
}
status->setText("Source block not found");
}
}
else
{
@ -1684,41 +1702,59 @@ void rai_qt::block_creation::create_open ()
if (!error)
{
rai::transaction transaction (wallet.node.store.environment, nullptr, false);
rai::pending_info pending;
if (!wallet.node.store.pending_get (transaction, source_l, pending))
{
rai::account_info info;
auto error (wallet.node.store.account_get (transaction, pending.destination, info));
if (error)
{
rai::raw_key key;
auto error (wallet.wallet_m->store.fetch (transaction, pending.destination, key));
if (!error)
{
rai::open_block open (source_l, representative_l, pending.destination, key, pending.destination, wallet.wallet_m->work_fetch (transaction, pending.destination, pending.destination));
std::string block_l;
open.serialize_json (block_l);
block->setPlainText (QString (block_l.c_str ()));
show_label_ok (*status);
status->setText ("Created block");
}
else
{
auto block_l (wallet.node.store.block_get (transaction, source_l));
if (block_l != nullptr)
{
auto send_block (dynamic_cast <rai::send_block *> (block_l.get ()));
if (send_block != nullptr)
{
rai::pending_info pending;
if (!wallet.node.store.pending_get (transaction, rai::pending_key (send_block->hashables.destination, source_l), pending))
{
rai::account_info info;
auto error (wallet.node.store.account_get (transaction, pending.destination, info));
if (error)
{
rai::raw_key key;
auto error (wallet.wallet_m->store.fetch (transaction, pending.destination, key));
if (!error)
{
rai::open_block open (source_l, representative_l, pending.destination, key, pending.destination, wallet.wallet_m->work_fetch (transaction, pending.destination, pending.destination));
std::string block_l;
open.serialize_json (block_l);
block->setPlainText (QString (block_l.c_str ()));
show_label_ok (*status);
status->setText ("Created block");
}
else
{
show_label_error (*status);
status->setText ("Account is not in wallet");
}
}
else
{
show_label_error (*status);
status->setText ("Account already open");
}
}
else
{
show_label_error (*status);
status->setText ("Account is not in wallet");
}
}
else
{
status->setText ("Source block is not pending to receive");
}
}
else
{
show_label_error (*status);
status->setText ("Account already open");
}
}
else
{
status->setText("Source is not a send block");
}
}
else
{
show_label_error (*status);
status->setText ("Source block is not pending to receive");
}
status->setText("Source block not found");
}
}
else
{

View file

@ -1981,7 +1981,7 @@ void rai::block_store::account_put (MDB_txn * transaction_a, rai::account const
assert (status == 0);
}
void rai::block_store::pending_put (MDB_txn * transaction_a, rai::block_hash const & hash_a, rai::pending_info const & pending_a)
void rai::block_store::pending_put (MDB_txn * transaction_a, rai::pending_key const & key_a, rai::pending_info const & pending_a)
{
std::vector <uint8_t> vector;
{
@ -1990,26 +1990,26 @@ void rai::block_store::pending_put (MDB_txn * transaction_a, rai::block_hash con
rai::write (stream, pending_a.amount);
rai::write (stream, pending_a.destination);
}
auto status (mdb_put (transaction_a, pending, hash_a.val (), pending_a.val (), 0));
auto status (mdb_put (transaction_a, pending, key_a.val (), pending_a.val (), 0));
assert (status == 0);
}
void rai::block_store::pending_del (MDB_txn * transaction_a, rai::block_hash const & hash_a)
void rai::block_store::pending_del (MDB_txn * transaction_a, rai::pending_key const & key_a)
{
auto status (mdb_del (transaction_a, pending, hash_a.val (), nullptr));
auto status (mdb_del (transaction_a, pending, key_a.val (), nullptr));
assert (status == 0);
}
bool rai::block_store::pending_exists (MDB_txn * transaction_a, rai::block_hash const & hash_a)
bool rai::block_store::pending_exists (MDB_txn * transaction_a, rai::pending_key const & key_a)
{
auto iterator (pending_begin (transaction_a, hash_a));
return iterator != rai::store_iterator (nullptr) && rai::block_hash (iterator->first) == hash_a;
auto iterator (pending_begin (transaction_a, key_a));
return iterator != rai::store_iterator (nullptr) && rai::pending_key (iterator->first) == key_a;
}
bool rai::block_store::pending_get (MDB_txn * transaction_a, rai::block_hash const & hash_a, rai::pending_info & pending_a)
bool rai::block_store::pending_get (MDB_txn * transaction_a, rai::pending_key const & key_a, rai::pending_info & pending_a)
{
MDB_val value;
auto status (mdb_get (transaction_a, pending, hash_a.val (), &value));
auto status (mdb_get (transaction_a, pending, key_a.val (), &value));
assert (status == 0 || status == MDB_NOTFOUND);
bool result;
if (status == MDB_NOTFOUND)
@ -2031,9 +2031,9 @@ bool rai::block_store::pending_get (MDB_txn * transaction_a, rai::block_hash con
return result;
}
rai::store_iterator rai::block_store::pending_begin (MDB_txn * transaction_a, rai::block_hash const & hash_a)
rai::store_iterator rai::block_store::pending_begin (MDB_txn * transaction_a, rai::pending_key const & key_a)
{
rai::store_iterator result (transaction_a, pending, hash_a.val ());
rai::store_iterator result (transaction_a, pending, key_a.val ());
return result;
}
@ -2101,6 +2101,45 @@ rai::mdb_val rai::pending_info::val () const
return rai::mdb_val (sizeof (*this), const_cast <rai::pending_info *> (this));
}
rai::pending_key::pending_key (rai::account const & account_a, rai::block_hash const & hash_a) :
account (account_a),
hash (hash_a)
{
}
rai::pending_key::pending_key (MDB_val const & val_a)
{
assert(val_a.mv_size == sizeof (*this));
static_assert (sizeof (account) + sizeof (hash) == sizeof (*this), "Packed class");
std::copy (reinterpret_cast <uint8_t const *> (val_a.mv_data), reinterpret_cast <uint8_t const *> (val_a.mv_data) + sizeof (*this), reinterpret_cast <uint8_t *> (this));
}
void rai::pending_key::serialize (rai::stream & stream_a) const
{
rai::write (stream_a, account.bytes);
rai::write (stream_a, hash.bytes);
}
bool rai::pending_key::deserialize (rai::stream & stream_a)
{
auto result (rai::read (stream_a, account.bytes));
if (!result)
{
result = rai::read (stream_a, hash.bytes);
}
return result;
}
bool rai::pending_key::operator == (rai::pending_key const & other_a) const
{
return account == other_a.account && hash == other_a.hash;
}
rai::mdb_val rai::pending_key::val () const
{
return rai::mdb_val (sizeof (*this), const_cast <rai::pending_key *> (this));
}
rai::uint128_t rai::block_store::representation_get (MDB_txn * transaction_a, rai::account const & account_a)
{
MDB_val value;
@ -2529,7 +2568,8 @@ public:
{
auto hash (block_a.hash ());
rai::pending_info pending;
while (ledger.store.pending_get (transaction, hash, pending) && !error)
rai::pending_key key (block_a.hashables.destination, hash);
while (ledger.store.pending_get (transaction, key, pending) && !error)
{
error = ledger.rollback (transaction, ledger.latest (transaction, block_a.hashables.destination));
}
@ -2538,7 +2578,7 @@ public:
rai::account_info info;
auto error (ledger.store.account_get (transaction, pending.source, info));
assert (!error);
ledger.store.pending_del (transaction, hash);
ledger.store.pending_del (transaction, key);
ledger.store.representation_add (transaction, ledger.representative (transaction, hash), pending.amount.number ());
ledger.change_latest (transaction, pending.source, block_a.hashables.previous, info.rep_block, ledger.balance (transaction, block_a.hashables.previous));
ledger.store.block_del (transaction, hash);
@ -2563,7 +2603,7 @@ public:
ledger.store.representation_add (transaction, ledger.representative (transaction, hash), 0 - amount);
ledger.change_latest (transaction, destination_account, block_a.hashables.previous, representative, ledger.balance (transaction, block_a.hashables.previous));
ledger.store.block_del (transaction, hash);
ledger.store.pending_put (transaction, block_a.hashables.source, {ledger.account (transaction, block_a.hashables.source), amount, destination_account});
ledger.store.pending_put (transaction, rai::pending_key (destination_account, block_a.hashables.source), {ledger.account (transaction, block_a.hashables.source), amount, destination_account});
ledger.store.frontier_del (transaction, hash);
ledger.store.frontier_put (transaction, block_a.hashables.previous, destination_account);
ledger.store.block_successor_clear (transaction, block_a.hashables.previous);
@ -2584,7 +2624,7 @@ public:
ledger.store.representation_add (transaction, ledger.representative (transaction, hash), 0 - amount);
ledger.change_latest (transaction, destination_account, 0, representative, 0);
ledger.store.block_del (transaction, hash);
ledger.store.pending_put (transaction, block_a.hashables.source, {ledger.account (transaction, block_a.hashables.source), amount, destination_account});
ledger.store.pending_put (transaction, rai::pending_key (destination_account, block_a.hashables.source), {ledger.account (transaction, block_a.hashables.source), amount, destination_account});
ledger.store.frontier_del (transaction, hash);
}
else
@ -2977,7 +3017,7 @@ void ledger_processor::send_block (rai::send_block const & block_a)
ledger.store.representation_add (transaction, info.rep_block, 0 - amount);
ledger.store.block_put (transaction, hash, block_a);
ledger.change_latest (transaction, account, hash, info.rep_block, block_a.hashables.balance);
ledger.store.pending_put (transaction, hash, {account, amount, block_a.hashables.destination});
ledger.store.pending_put (transaction, rai::pending_key (block_a.hashables.destination, hash), {account, amount, block_a.hashables.destination});
ledger.store.frontier_del (transaction, block_a.hashables.previous);
ledger.store.frontier_put (transaction, hash, account);
result.account = account;
@ -3011,8 +3051,9 @@ void ledger_processor::receive_block (rai::receive_block const & block_a)
result.code = info.head == block_a.hashables.previous ? rai::process_result::progress : rai::process_result::gap_previous; // Block doesn't immediately follow latest block (Harmless)
if (result.code == rai::process_result::progress)
{
rai::pending_key key (source->hashables.destination, block_a.hashables.source);
rai::pending_info pending;
result.code = ledger.store.pending_get (transaction, block_a.hashables.source, pending) ? rai::process_result::unreceivable : rai::process_result::progress; // Has this source already been received (Malformed)
result.code = ledger.store.pending_get (transaction, key, pending) ? rai::process_result::unreceivable : rai::process_result::progress; // Has this source already been received (Malformed)
if (result.code == rai::process_result::progress)
{
assert (ledger.store.frontier_get (transaction, block_a.hashables.previous) == pending.destination);
@ -3020,7 +3061,7 @@ void ledger_processor::receive_block (rai::receive_block const & block_a)
rai::account_info source_info;
auto error (ledger.store.account_get (transaction, pending.source, source_info));
assert (!error);
ledger.store.pending_del (transaction, block_a.hashables.source);
ledger.store.pending_del (transaction, key);
ledger.store.block_put (transaction, hash, block_a);
ledger.change_latest (transaction, pending.destination, hash, info.rep_block, new_balance);
ledger.store.representation_add (transaction, info.rep_block, pending.amount.number ());
@ -3058,8 +3099,9 @@ void ledger_processor::open_block (rai::open_block const & block_a)
result.code = ledger.store.account_get (transaction, block_a.hashables.account, info) ? rai::process_result::progress : rai::process_result::fork; // Has this account already been opened? (Malicious)
if (result.code == rai::process_result::progress)
{
rai::pending_key key (block_a.hashables.account, block_a.hashables.source);
rai::pending_info pending;
result.code = ledger.store.pending_get (transaction, block_a.hashables.source, pending) ? rai::process_result::unreceivable : rai::process_result::progress; // Has this source already been received (Malformed)
result.code = ledger.store.pending_get (transaction, key, pending) ? rai::process_result::unreceivable : rai::process_result::progress; // Has this source already been received (Malformed)
if (result.code == rai::process_result::progress)
{
result.code = pending.destination == block_a.hashables.account ? rai::process_result::progress : rai::process_result::account_mismatch; // Does the account listed in the open block match the one named in the send block? (Malformed)
@ -3068,7 +3110,7 @@ void ledger_processor::open_block (rai::open_block const & block_a)
rai::account_info source_info;
auto error (ledger.store.account_get (transaction, pending.source, source_info));
assert (!error);
ledger.store.pending_del (transaction, block_a.hashables.source);
ledger.store.pending_del (transaction, key);
ledger.store.block_put (transaction, hash, block_a);
ledger.change_latest (transaction, pending.destination, hash, hash, pending.amount.number ());
ledger.store.representation_add (transaction, hash, pending.amount.number ());

View file

@ -295,6 +295,18 @@ public:
rai::amount amount;
rai::account destination;
};
class pending_key
{
public:
pending_key (rai::account const &, rai::block_hash const &);
pending_key (MDB_val const &);
void serialize (rai::stream &) const;
bool deserialize (rai::stream &);
bool operator == (rai::pending_key const &) const;
rai::mdb_val val () const;
rai::account account;
rai::block_hash hash;
};
class block_store
{
public:
@ -325,11 +337,11 @@ public:
rai::store_iterator latest_begin (MDB_txn *);
rai::store_iterator latest_end ();
void pending_put (MDB_txn *, rai::block_hash const &, rai::pending_info const &);
void pending_del (MDB_txn *, rai::block_hash const &);
bool pending_get (MDB_txn *, rai::block_hash const &, rai::pending_info &);
bool pending_exists (MDB_txn *, rai::block_hash const &);
rai::store_iterator pending_begin (MDB_txn *, rai::block_hash const &);
void pending_put (MDB_txn *, rai::pending_key const &, rai::pending_info const &);
void pending_del (MDB_txn *, rai::pending_key const &);
bool pending_get (MDB_txn *, rai::pending_key const &, rai::pending_info &);
bool pending_exists (MDB_txn *, rai::pending_key const &);
rai::store_iterator pending_begin (MDB_txn *, rai::pending_key const &);
rai::store_iterator pending_begin (MDB_txn *);
rai::store_iterator pending_end ();