Change signature of virtual channel::send_buffer (...)
This commit is contained in:
parent
c7401195ab
commit
7b9aad9c96
8 changed files with 19 additions and 10 deletions
|
|
@ -16,8 +16,7 @@ nano::transport::channel::channel (nano::node & node_a) :
|
||||||
|
|
||||||
bool nano::transport::channel::send (nano::message const & message, nano::transport::traffic_type traffic_type, callback_t callback)
|
bool nano::transport::channel::send (nano::message const & message, nano::transport::traffic_type traffic_type, callback_t callback)
|
||||||
{
|
{
|
||||||
auto buffer = message.to_shared_const_buffer ();
|
bool sent = send_impl (message, traffic_type, std::move (callback));
|
||||||
bool sent = send_buffer (buffer, traffic_type, std::move (callback));
|
|
||||||
node.stats.inc (sent ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message.type ()), nano::stat::dir::out, /* aggregate all */ true);
|
node.stats.inc (sent ? nano::stat::type::message : nano::stat::type::drop, to_stat_detail (message.type ()), nano::stat::dir::out, /* aggregate all */ true);
|
||||||
return sent;
|
return sent;
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -120,7 +120,7 @@ public:
|
||||||
std::shared_ptr<nano::node> owner () const;
|
std::shared_ptr<nano::node> owner () const;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
virtual bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, callback_t) = 0;
|
virtual bool send_impl (nano::message const &, nano::transport::traffic_type, callback_t) = 0;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
nano::node & node;
|
nano::node & node;
|
||||||
|
|
|
||||||
|
|
@ -14,8 +14,9 @@ nano::transport::fake::channel::channel (nano::node & node) :
|
||||||
/**
|
/**
|
||||||
* The send function behaves like a null device, it throws the data away and returns success.
|
* The send function behaves like a null device, it throws the data away and returns success.
|
||||||
*/
|
*/
|
||||||
bool nano::transport::fake::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback)
|
bool nano::transport::fake::channel::send_impl (nano::message const & message, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback)
|
||||||
{
|
{
|
||||||
|
auto buffer = message.to_shared_const_buffer ();
|
||||||
auto size = buffer.size ();
|
auto size = buffer.size ();
|
||||||
if (callback)
|
if (callback)
|
||||||
{
|
{
|
||||||
|
|
|
||||||
|
|
@ -50,7 +50,7 @@ namespace transport
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override;
|
bool send_impl (nano::message const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
nano::endpoint endpoint;
|
nano::endpoint endpoint;
|
||||||
|
|
|
||||||
|
|
@ -18,8 +18,10 @@ nano::transport::inproc::channel::channel (nano::node & node, nano::node & desti
|
||||||
* Send the buffer to the peer and call the callback function when done. The call never fails.
|
* Send the buffer to the peer and call the callback function when done. The call never fails.
|
||||||
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
|
* Note that the inbound message visitor will be called before the callback because it is called directly whereas the callback is spawned in the background.
|
||||||
*/
|
*/
|
||||||
bool nano::transport::inproc::channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback)
|
bool nano::transport::inproc::channel::send_impl (nano::message const & message, nano::transport::traffic_type traffic_type, nano::transport::channel::callback_t callback)
|
||||||
{
|
{
|
||||||
|
auto buffer = message.to_shared_const_buffer ();
|
||||||
|
|
||||||
std::size_t offset{ 0 };
|
std::size_t offset{ 0 };
|
||||||
auto const buffer_read_fn = [&offset, buffer_v = buffer.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
|
auto const buffer_read_fn = [&offset, buffer_v = buffer.to_bytes ()] (std::shared_ptr<std::vector<uint8_t>> const & data_a, std::size_t size_a, std::function<void (boost::system::error_code const &, std::size_t)> callback_a) {
|
||||||
debug_assert (buffer_v.size () >= (offset + size_a));
|
debug_assert (buffer_v.size () >= (offset + size_a));
|
||||||
|
|
|
||||||
|
|
@ -40,7 +40,7 @@ namespace transport
|
||||||
}
|
}
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override;
|
bool send_impl (nano::message const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
nano::node & destination;
|
nano::node & destination;
|
||||||
|
|
|
||||||
|
|
@ -76,20 +76,27 @@ bool nano::transport::tcp_channel::max (nano::transport::traffic_type traffic_ty
|
||||||
return queue.max (traffic_type);
|
return queue.max (traffic_type);
|
||||||
}
|
}
|
||||||
|
|
||||||
bool nano::transport::tcp_channel::send_buffer (nano::shared_const_buffer const & buffer, nano::transport::traffic_type type, nano::transport::channel::callback_t callback)
|
bool nano::transport::tcp_channel::send_impl (nano::message const & message, nano::transport::traffic_type type, nano::transport::channel::callback_t callback)
|
||||||
{
|
{
|
||||||
|
auto buffer = message.to_shared_const_buffer ();
|
||||||
|
|
||||||
nano::unique_lock<nano::mutex> lock{ mutex };
|
nano::unique_lock<nano::mutex> lock{ mutex };
|
||||||
if (!queue.full (type))
|
if (!queue.full (type))
|
||||||
{
|
{
|
||||||
queue.push (type, { buffer, callback });
|
queue.push (type, { buffer, callback });
|
||||||
lock.unlock ();
|
lock.unlock ();
|
||||||
|
|
||||||
node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::queued, nano::stat::dir::out);
|
node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::queued, nano::stat::dir::out);
|
||||||
node.stats.inc (nano::stat::type::tcp_channel_queued, to_stat_detail (type), nano::stat::dir::out);
|
node.stats.inc (nano::stat::type::tcp_channel_queued, to_stat_detail (type), nano::stat::dir::out);
|
||||||
sending_task.notify ();
|
|
||||||
|
sending_task.notify (); // Wake up the sending task
|
||||||
|
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
else
|
else
|
||||||
{
|
{
|
||||||
|
lock.unlock ();
|
||||||
|
|
||||||
node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::drop, nano::stat::dir::out);
|
node.stats.inc (nano::stat::type::tcp_channel, nano::stat::detail::drop, nano::stat::dir::out);
|
||||||
node.stats.inc (nano::stat::type::tcp_channel_drop, to_stat_detail (type), nano::stat::dir::out);
|
node.stats.inc (nano::stat::type::tcp_channel_drop, to_stat_detail (type), nano::stat::dir::out);
|
||||||
}
|
}
|
||||||
|
|
|
||||||
|
|
@ -66,7 +66,7 @@ public:
|
||||||
std::string to_string () const override;
|
std::string to_string () const override;
|
||||||
|
|
||||||
protected:
|
protected:
|
||||||
bool send_buffer (nano::shared_const_buffer const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override;
|
bool send_impl (nano::message const &, nano::transport::traffic_type, nano::transport::channel::callback_t) override;
|
||||||
|
|
||||||
private:
|
private:
|
||||||
void start ();
|
void start ();
|
||||||
|
|
|
||||||
Loading…
Add table
Add a link
Reference in a new issue