Merge pull request #4280 from clemahieu/bucket_class
Cleanup scheduler::buckets class
This commit is contained in:
commit
afa344efb1
7 changed files with 150 additions and 77 deletions
|
@ -194,6 +194,8 @@ add_library(
|
|||
rocksdb/rocksdb_iterator.hpp
|
||||
rocksdb/rocksdb_txn.hpp
|
||||
rocksdb/rocksdb_txn.cpp
|
||||
scheduler/bucket.cpp
|
||||
scheduler/bucket.hpp
|
||||
scheduler/buckets.cpp
|
||||
scheduler/buckets.hpp
|
||||
scheduler/component.hpp
|
||||
|
|
62
nano/node/scheduler/bucket.cpp
Normal file
62
nano/node/scheduler/bucket.cpp
Normal file
|
@ -0,0 +1,62 @@
|
|||
#include <nano/lib/blocks.hpp>
|
||||
#include <nano/node/scheduler/bucket.hpp>
|
||||
|
||||
bool nano::scheduler::bucket::value_type::operator< (value_type const & other_a) const
|
||||
{
|
||||
return time < other_a.time || (time == other_a.time && block->hash () < other_a.block->hash ());
|
||||
}
|
||||
|
||||
bool nano::scheduler::bucket::value_type::operator== (value_type const & other_a) const
|
||||
{
|
||||
return time == other_a.time && block->hash () == other_a.block->hash ();
|
||||
}
|
||||
|
||||
nano::scheduler::bucket::bucket (size_t maximum) :
|
||||
maximum{ maximum }
|
||||
{
|
||||
debug_assert (maximum > 0);
|
||||
}
|
||||
|
||||
nano::scheduler::bucket::~bucket ()
|
||||
{
|
||||
}
|
||||
|
||||
std::shared_ptr<nano::block> nano::scheduler::bucket::top () const
|
||||
{
|
||||
debug_assert (!queue.empty ());
|
||||
return queue.begin ()->block;
|
||||
}
|
||||
|
||||
void nano::scheduler::bucket::pop ()
|
||||
{
|
||||
debug_assert (!queue.empty ());
|
||||
queue.erase (queue.begin ());
|
||||
}
|
||||
|
||||
void nano::scheduler::bucket::push (uint64_t time, std::shared_ptr<nano::block> block)
|
||||
{
|
||||
queue.insert ({ time, block });
|
||||
if (queue.size () > maximum)
|
||||
{
|
||||
debug_assert (!queue.empty ());
|
||||
queue.erase (--queue.end ());
|
||||
}
|
||||
}
|
||||
|
||||
size_t nano::scheduler::bucket::size () const
|
||||
{
|
||||
return queue.size ();
|
||||
}
|
||||
|
||||
bool nano::scheduler::bucket::empty () const
|
||||
{
|
||||
return queue.empty ();
|
||||
}
|
||||
|
||||
void nano::scheduler::bucket::dump () const
|
||||
{
|
||||
for (auto const & item : queue)
|
||||
{
|
||||
std::cerr << item.time << ' ' << item.block->hash ().to_string () << '\n';
|
||||
}
|
||||
}
|
39
nano/node/scheduler/bucket.hpp
Normal file
39
nano/node/scheduler/bucket.hpp
Normal file
|
@ -0,0 +1,39 @@
|
|||
#pragma once
|
||||
|
||||
#include <cstddef>
|
||||
#include <cstdint>
|
||||
#include <memory>
|
||||
#include <set>
|
||||
|
||||
namespace nano
|
||||
{
|
||||
class block;
|
||||
}
|
||||
namespace nano::scheduler
|
||||
{
|
||||
/** A class which holds an ordered set of blocks to be scheduled, ordered by their block arrival time
|
||||
*/
|
||||
class bucket final
|
||||
{
|
||||
class value_type
|
||||
{
|
||||
public:
|
||||
uint64_t time;
|
||||
std::shared_ptr<nano::block> block;
|
||||
bool operator< (value_type const & other_a) const;
|
||||
bool operator== (value_type const & other_a) const;
|
||||
};
|
||||
std::set<value_type> queue;
|
||||
size_t const maximum;
|
||||
|
||||
public:
|
||||
bucket (size_t maximum);
|
||||
~bucket ();
|
||||
std::shared_ptr<nano::block> top () const;
|
||||
void pop ();
|
||||
void push (uint64_t time, std::shared_ptr<nano::block> block);
|
||||
size_t size () const;
|
||||
bool empty () const;
|
||||
void dump () const;
|
||||
};
|
||||
} // namespace nano::scheduler
|
|
@ -1,26 +1,17 @@
|
|||
#include <nano/lib/blocks.hpp>
|
||||
#include <nano/lib/utility.hpp>
|
||||
#include <nano/node/scheduler/bucket.hpp>
|
||||
#include <nano/node/scheduler/buckets.hpp>
|
||||
|
||||
#include <string>
|
||||
|
||||
bool nano::scheduler::buckets::value_type::operator< (value_type const & other_a) const
|
||||
{
|
||||
return time < other_a.time || (time == other_a.time && block->hash () < other_a.block->hash ());
|
||||
}
|
||||
|
||||
bool nano::scheduler::buckets::value_type::operator== (value_type const & other_a) const
|
||||
{
|
||||
return time == other_a.time && block->hash () == other_a.block->hash ();
|
||||
}
|
||||
|
||||
/** Moves the bucket pointer to the next bucket */
|
||||
void nano::scheduler::buckets::next ()
|
||||
{
|
||||
++current;
|
||||
if (current == schedule.end ())
|
||||
if (current == buckets_m.end ())
|
||||
{
|
||||
current = schedule.begin ();
|
||||
current = buckets_m.begin ();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -28,21 +19,12 @@ void nano::scheduler::buckets::next ()
|
|||
void nano::scheduler::buckets::seek ()
|
||||
{
|
||||
next ();
|
||||
for (std::size_t i = 0, n = schedule.size (); buckets_m[*current].empty () && i < n; ++i)
|
||||
for (std::size_t i = 0, n = buckets_m.size (); (*current)->empty () && i < n; ++i)
|
||||
{
|
||||
next ();
|
||||
}
|
||||
}
|
||||
|
||||
/** Initialise the schedule vector */
|
||||
void nano::scheduler::buckets::populate_schedule ()
|
||||
{
|
||||
for (auto i = 0; i < buckets_m.size (); ++i)
|
||||
{
|
||||
schedule.push_back (i);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Prioritization constructor, construct a container containing approximately 'maximum' number of blocks.
|
||||
* @param maximum number of blocks that this container can hold, this is a soft and approximate limit.
|
||||
|
@ -67,9 +49,16 @@ nano::scheduler::buckets::buckets (uint64_t maximum) :
|
|||
build_region (uint128_t{ 1 } << 112, uint128_t{ 1 } << 116, 4);
|
||||
build_region (uint128_t{ 1 } << 116, uint128_t{ 1 } << 120, 2);
|
||||
minimums.push_back (uint128_t{ 1 } << 120);
|
||||
buckets_m.resize (minimums.size ());
|
||||
populate_schedule ();
|
||||
current = schedule.begin ();
|
||||
auto bucket_max = std::max<size_t> (1u, maximum / minimums.size ());
|
||||
for (size_t i = 0u, n = minimums.size (); i < n; ++i)
|
||||
{
|
||||
buckets_m.push_back (std::make_unique<scheduler::bucket> (bucket_max));
|
||||
}
|
||||
current = buckets_m.begin ();
|
||||
}
|
||||
|
||||
nano::scheduler::buckets::~buckets ()
|
||||
{
|
||||
}
|
||||
|
||||
std::size_t nano::scheduler::buckets::index (nano::uint128_t const & balance) const
|
||||
|
@ -86,11 +75,7 @@ void nano::scheduler::buckets::push (uint64_t time, std::shared_ptr<nano::block>
|
|||
{
|
||||
auto was_empty = empty ();
|
||||
auto & bucket = buckets_m[index (priority.number ())];
|
||||
bucket.emplace (value_type{ time, block });
|
||||
if (bucket.size () > std::max (decltype (maximum){ 1 }, maximum / buckets_m.size ()))
|
||||
{
|
||||
bucket.erase (--bucket.end ());
|
||||
}
|
||||
bucket->push (time, block);
|
||||
if (was_empty)
|
||||
{
|
||||
seek ();
|
||||
|
@ -101,8 +86,7 @@ void nano::scheduler::buckets::push (uint64_t time, std::shared_ptr<nano::block>
|
|||
std::shared_ptr<nano::block> nano::scheduler::buckets::top () const
|
||||
{
|
||||
debug_assert (!empty ());
|
||||
debug_assert (!buckets_m[*current].empty ());
|
||||
auto result = buckets_m[*current].begin ()->block;
|
||||
auto result = (*current)->top ();
|
||||
return result;
|
||||
}
|
||||
|
||||
|
@ -110,9 +94,8 @@ std::shared_ptr<nano::block> nano::scheduler::buckets::top () const
|
|||
void nano::scheduler::buckets::pop ()
|
||||
{
|
||||
debug_assert (!empty ());
|
||||
debug_assert (!buckets_m[*current].empty ());
|
||||
auto & bucket = buckets_m[*current];
|
||||
bucket.erase (bucket.begin ());
|
||||
auto & bucket = *current;
|
||||
bucket->pop ();
|
||||
seek ();
|
||||
}
|
||||
|
||||
|
@ -120,9 +103,9 @@ void nano::scheduler::buckets::pop ()
|
|||
std::size_t nano::scheduler::buckets::size () const
|
||||
{
|
||||
std::size_t result{ 0 };
|
||||
for (auto const & queue : buckets_m)
|
||||
for (auto const & bucket : buckets_m)
|
||||
{
|
||||
result += queue.size ();
|
||||
result += bucket->size ();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
@ -136,26 +119,23 @@ std::size_t nano::scheduler::buckets::bucket_count () const
|
|||
/** Returns number of items in bucket with index 'index' */
|
||||
std::size_t nano::scheduler::buckets::bucket_size (std::size_t index) const
|
||||
{
|
||||
return buckets_m[index].size ();
|
||||
return buckets_m[index]->size ();
|
||||
}
|
||||
|
||||
/** Returns true if all buckets are empty */
|
||||
bool nano::scheduler::buckets::empty () const
|
||||
{
|
||||
return std::all_of (buckets_m.begin (), buckets_m.end (), [] (priority const & bucket_a) { return bucket_a.empty (); });
|
||||
return std::all_of (buckets_m.begin (), buckets_m.end (), [] (auto const & bucket) { return bucket->empty (); });
|
||||
}
|
||||
|
||||
/** Print the state of the class in stderr */
|
||||
void nano::scheduler::buckets::dump () const
|
||||
{
|
||||
for (auto const & i : buckets_m)
|
||||
for (auto const & bucket : buckets_m)
|
||||
{
|
||||
for (auto const & j : i)
|
||||
{
|
||||
std::cerr << j.time << ' ' << j.block->hash ().to_string () << '\n';
|
||||
}
|
||||
bucket->dump ();
|
||||
}
|
||||
std::cerr << "current: " << std::to_string (*current) << '\n';
|
||||
std::cerr << "current: " << current - buckets_m.begin () << '\n';
|
||||
}
|
||||
|
||||
std::unique_ptr<nano::container_info_component> nano::scheduler::buckets::collect_container_info (std::string const & name)
|
||||
|
@ -164,7 +144,7 @@ std::unique_ptr<nano::container_info_component> nano::scheduler::buckets::collec
|
|||
for (auto i = 0; i < buckets_m.size (); ++i)
|
||||
{
|
||||
auto const & bucket = buckets_m[i];
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ std::to_string (i), bucket.size (), 0 }));
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ std::to_string (i), bucket->size (), 0 }));
|
||||
}
|
||||
return composite;
|
||||
}
|
||||
|
|
|
@ -3,8 +3,9 @@
|
|||
#include <nano/lib/utility.hpp>
|
||||
|
||||
#include <cstddef>
|
||||
#include <set>
|
||||
#include <vector>
|
||||
#include <cstdint>
|
||||
#include <deque>
|
||||
#include <memory>
|
||||
|
||||
namespace nano
|
||||
{
|
||||
|
@ -12,6 +13,7 @@ class block;
|
|||
}
|
||||
namespace nano::scheduler
|
||||
{
|
||||
class bucket;
|
||||
/** A container for holding blocks and their arrival/creation time.
|
||||
*
|
||||
* The container consists of a number of buckets. Each bucket holds an ordered set of 'value_type' items.
|
||||
|
@ -24,39 +26,25 @@ namespace nano::scheduler
|
|||
*/
|
||||
class buckets final
|
||||
{
|
||||
class value_type
|
||||
{
|
||||
public:
|
||||
uint64_t time;
|
||||
std::shared_ptr<nano::block> block;
|
||||
bool operator< (value_type const & other_a) const;
|
||||
bool operator== (value_type const & other_a) const;
|
||||
};
|
||||
|
||||
using priority = std::set<value_type>;
|
||||
|
||||
/** container for the buckets to be read in round robin fashion */
|
||||
std::vector<priority> buckets_m;
|
||||
std::deque<std::unique_ptr<bucket>> buckets_m;
|
||||
|
||||
/** thresholds that define the bands for each bucket, the minimum balance an account must have to enter a bucket,
|
||||
* the container writes a block to the lowest indexed bucket that has balance larger than the bucket's minimum value */
|
||||
std::vector<nano::uint128_t> minimums;
|
||||
|
||||
/** Contains bucket indicies to iterate over when making the next scheduling decision */
|
||||
std::vector<uint8_t> schedule;
|
||||
std::deque<nano::uint128_t> minimums;
|
||||
|
||||
/** index of bucket to read next */
|
||||
decltype (schedule)::const_iterator current;
|
||||
decltype (buckets_m)::const_iterator current;
|
||||
|
||||
/** maximum number of blocks in whole container, each bucket's maximum is maximum / bucket_number */
|
||||
uint64_t const maximum;
|
||||
|
||||
void next ();
|
||||
void seek ();
|
||||
void populate_schedule ();
|
||||
|
||||
public:
|
||||
buckets (uint64_t maximum = 250000u);
|
||||
~buckets ();
|
||||
void push (uint64_t time, std::shared_ptr<nano::block> block, nano::amount const & priority);
|
||||
std::shared_ptr<nano::block> top () const;
|
||||
void pop ();
|
||||
|
|
|
@ -1,9 +1,11 @@
|
|||
#include <nano/node/node.hpp>
|
||||
#include <nano/node/scheduler/buckets.hpp>
|
||||
#include <nano/node/scheduler/priority.hpp>
|
||||
|
||||
nano::scheduler::priority::priority (nano::node & node_a, nano::stats & stats_a) :
|
||||
node{ node_a },
|
||||
stats{ stats_a }
|
||||
stats{ stats_a },
|
||||
buckets{ std::make_unique<scheduler::buckets> () }
|
||||
{
|
||||
}
|
||||
|
||||
|
@ -60,7 +62,7 @@ bool nano::scheduler::priority::activate (nano::account const & account_a, nano:
|
|||
auto balance = node.ledger.balance (transaction, hash);
|
||||
auto previous_balance = node.ledger.balance (transaction, conf_info.frontier);
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
buckets.push (info->modified, block, std::max (balance, previous_balance));
|
||||
buckets->push (info->modified, block, std::max (balance, previous_balance));
|
||||
notify ();
|
||||
return true; // Activated
|
||||
}
|
||||
|
@ -77,12 +79,12 @@ void nano::scheduler::priority::notify ()
|
|||
std::size_t nano::scheduler::priority::size () const
|
||||
{
|
||||
nano::lock_guard<nano::mutex> lock{ mutex };
|
||||
return buckets.size () + manual_queue.size ();
|
||||
return buckets->size () + manual_queue.size ();
|
||||
}
|
||||
|
||||
bool nano::scheduler::priority::empty_locked () const
|
||||
{
|
||||
return buckets.empty () && manual_queue.empty ();
|
||||
return buckets->empty () && manual_queue.empty ();
|
||||
}
|
||||
|
||||
bool nano::scheduler::priority::empty () const
|
||||
|
@ -93,12 +95,12 @@ bool nano::scheduler::priority::empty () const
|
|||
|
||||
std::size_t nano::scheduler::priority::priority_queue_size () const
|
||||
{
|
||||
return buckets.size ();
|
||||
return buckets->size ();
|
||||
}
|
||||
|
||||
bool nano::scheduler::priority::priority_queue_predicate () const
|
||||
{
|
||||
return node.active.vacancy () > 0 && !buckets.empty ();
|
||||
return node.active.vacancy () > 0 && !buckets->empty ();
|
||||
}
|
||||
|
||||
bool nano::scheduler::priority::manual_queue_predicate () const
|
||||
|
@ -133,8 +135,8 @@ void nano::scheduler::priority::run ()
|
|||
}
|
||||
else if (priority_queue_predicate ())
|
||||
{
|
||||
auto block = buckets.top ();
|
||||
buckets.pop ();
|
||||
auto block = buckets->top ();
|
||||
buckets->pop ();
|
||||
lock.unlock ();
|
||||
stats.inc (nano::stat::type::election_scheduler, nano::stat::detail::insert_priority);
|
||||
auto result = node.active.insert (block);
|
||||
|
@ -163,6 +165,6 @@ std::unique_ptr<nano::container_info_component> nano::scheduler::priority::colle
|
|||
|
||||
auto composite = std::make_unique<container_info_composite> (name);
|
||||
composite->add_component (std::make_unique<container_info_leaf> (container_info{ "manual_queue", manual_queue.size (), sizeof (decltype (manual_queue)::value_type) }));
|
||||
composite->add_component (buckets.collect_container_info ("buckets"));
|
||||
composite->add_component (buckets->collect_container_info ("buckets"));
|
||||
return composite;
|
||||
}
|
||||
|
|
|
@ -2,7 +2,6 @@
|
|||
|
||||
#include <nano/lib/numbers.hpp>
|
||||
#include <nano/node/active_transactions.hpp>
|
||||
#include <nano/node/scheduler/buckets.hpp>
|
||||
|
||||
#include <boost/optional.hpp>
|
||||
|
||||
|
@ -19,6 +18,7 @@ class node;
|
|||
|
||||
namespace nano::scheduler
|
||||
{
|
||||
class buckets;
|
||||
class priority final
|
||||
{
|
||||
public:
|
||||
|
@ -52,7 +52,7 @@ private:
|
|||
bool priority_queue_predicate () const;
|
||||
bool manual_queue_predicate () const;
|
||||
|
||||
nano::scheduler::buckets buckets;
|
||||
std::unique_ptr<nano::scheduler::buckets> buckets;
|
||||
|
||||
std::deque<std::tuple<std::shared_ptr<nano::block>, boost::optional<nano::uint128_t>, nano::election_behavior>> manual_queue;
|
||||
bool stopped{ false };
|
||||
|
|
Loading…
Add table
Add a link
Reference in a new issue