Fabcoin Core  0.16.2
P2P Digital Currency
RLPXSocketIO.cpp
Go to the documentation of this file.
1 /*
2  This file is part of cpp-ethereum.
3 
4  cpp-ethereum is free software: you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation, either version 3 of the License, or
7  (at your option) any later version.
8 
9  cpp-ethereum is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
16  */
22 #include "RLPXSocketIO.h"
23 
24 #include <algorithm>
25 using namespace std;
26 using namespace dev;
27 using namespace dev::p2p;
28 
29 uint32_t const RLPXSocketIO::MinFrameSize = h128::size * 3; // header + block + mac
30 uint32_t const RLPXSocketIO::MaxPacketSize = 1 << 24;
31 uint16_t const RLPXSocketIO::DefaultInitialCapacity = 8 << 8;
32 
33 RLPXSocketIO::RLPXSocketIO(unsigned _protCount, RLPXFrameCoder& _coder, bi::tcp::socket& _socket, bool _flowControl, size_t _initialCapacity):
34  m_flowControl(_flowControl),
35  m_coder(_coder),
36  m_socket(_socket),
37  m_writers(writers(_protCount)),
38  m_egressCapacity(m_flowControl ? _initialCapacity : MaxPacketSize * m_writers.size())
39 {}
40 
41 vector<RLPXFrameWriter> RLPXSocketIO::writers(unsigned _capacity)
42 {
43  vector<RLPXFrameWriter> ret;
44  for (unsigned i = 0; i < _capacity; i++)
45  ret.push_back(RLPXFrameWriter(i));
46  return ret;
47 }
48 
49 void RLPXSocketIO::send(unsigned _protocolType, unsigned _type, RLPStream& _payload)
50 {
51  if (!m_socket.is_open())
52  return; // TCPSocketNotOpen
53  m_writers.at(_protocolType).enque(_type, _payload);
54  bool wasEmtpy = false;
56  wasEmtpy = (++m_queued == 1);
57  if (wasEmtpy)
58  doWrite();
59 }
60 
62 {
63  m_toSend.clear();
64 
65  size_t capacity = 0;
67  capacity = min(m_egressCapacity, MaxPacketSize);
68 
69  size_t active = 0;
70  for (auto const& w: m_writers)
71  if (w.size())
72  active += 1;
73  size_t dequed = 0;
74  size_t protFrameSize = capacity / active;
75  if (protFrameSize >= MinFrameSize)
76  for (auto& w: m_writers)
77  dequed += w.mux(m_coder, protFrameSize, m_toSend);
78 
79  if (dequed)
80  write(dequed);
81  else
82  deferWrite();
83 }
84 
86 {
87  auto self(shared_from_this());
88  m_congestion.reset(new ba::deadline_timer(m_socket.get_io_service()));
89  m_congestion->expires_from_now(boost::posix_time::milliseconds(50));
90  m_congestion->async_wait([=](boost::system::error_code const& _ec) { m_congestion.reset(); if (!_ec) doWrite(); });
91 }
92 
93 void RLPXSocketIO::write(size_t _dequed)
94 {
95  auto self(shared_from_this());
96  if (m_toSend.empty())
97  return;
98 
99  ba::async_write(m_socket, ba::buffer(m_toSend[0]), [this, self, _dequed](boost::system::error_code ec, size_t written)
100  {
101  if (ec)
102  return; // TCPSocketWriteError
103 
104  bool reschedule = false;
106  {
107  if (m_flowControl)
108  m_egressCapacity -= written;
109  m_queued -= _dequed;
110  reschedule = m_queued > 0;
111  }
112  if (reschedule)
113  doWrite();
114  });
115 }
Adapted from code found on http://stackoverflow.com/questions/180947/base64-decode-snippet-in-c Origi...
Definition: Arith256.cpp:15
static std::vector< RLPXFrameWriter > writers(unsigned _capacity)
std::deque< bytes > m_toSend
Reusable byte buffer for pending socket writes.
Definition: RLPXSocketIO.h:58
unsigned m_queued
Track total queued packets to ensure single write loop.
Definition: RLPXSocketIO.h:64
std::hash for asio::address
Definition: Common.h:323
std::unique_ptr< ba::deadline_timer > m_congestion
Scheduled when writes are deferred due to congestion.
Definition: RLPXSocketIO.h:61
ExecStats::duration min
Definition: ExecStats.cpp:35
std::vector< RLPXFrameWriter > m_writers
Write queues for each protocol. TODO: map to bytes (of capability)
Definition: RLPXSocketIO.h:60
void send(unsigned _protocolType, unsigned _type, RLPStream &_payload)
bi::tcp::socket & m_socket
Definition: RLPXSocketIO.h:56
#define DEV_GUARDED(MUTEX)
Simple block guard.
Definition: Guards.h:144
static uint32_t const MinFrameSize
Definition: RLPXSocketIO.h:36
bool const m_flowControl
True if flow control is enabled.
Definition: RLPXSocketIO.h:53
void write(size_t _dequed)
RLPXFrameCoder & m_coder
Encoder/decoder of frame payloads.
Definition: RLPXSocketIO.h:55
uint8_t const size_t const size
Definition: sha3.h:20
Class for writing to an RLP bytestream.
Definition: RLP.h:383
Encoder/decoder transport for RLPx connection established by RLPXHandshake.
static uint32_t const MaxPacketSize
Definition: RLPXSocketIO.h:37
Multiplex packets into encrypted RLPX frames.