Fabcoin Core  0.16.2
P2P Digital Currency
Session.cpp
Go to the documentation of this file.
1 /*
2  This file is part of cpp-ethereum.
3 
4  cpp-ethereum is free software: you can redistribute it and/or modify
5  it under the terms of the GNU General Public License as published by
6  the Free Software Foundation, either version 3 of the License, or
7  (at your option) any later version.
8 
9  cpp-ethereum is distributed in the hope that it will be useful,
10  but WITHOUT ANY WARRANTY; without even the implied warranty of
11  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
12  GNU General Public License for more details.
13 
14  You should have received a copy of the GNU General Public License
15  along with cpp-ethereum. If not, see <http://www.gnu.org/licenses/>.
16 */
23 #include "Session.h"
24 
25 #include <chrono>
26 #include <libdevcore/Common.h>
27 #include <libdevcore/CommonIO.h>
28 #include <libdevcore/Exceptions.h>
29 #include "Host.h"
30 #include "Capability.h"
31 using namespace std;
32 using namespace dev;
33 using namespace dev::p2p;
34 
/// Construct a session for an established connection.
/// Stores the owning host (_h), takes ownership of the frame coder (_io), and keeps
/// shared references to the socket (_s) and the peer record (_n) plus a copy of the
/// session info (_info). m_ping starts at steady_clock::time_point::max().
35 Session::Session(Host* _h, unique_ptr<RLPXFrameCoder>&& _io, std::shared_ptr<RLPXSocket> const& _s, std::shared_ptr<Peer> const& _n, PeerSessionInfo _info):
36  m_server(_h),
37  m_io(move(_io)),
38  m_socket(_s),
39  m_peer(_n),
40  m_info(_info),
41  m_ping(chrono::steady_clock::time_point::max())
42 {
   // Framing for protocol id 0 is always registered up front.
43  registerFraming(0);
   // A fresh session clears whatever disconnect reason the peer record carried.
44  m_peer->m_lastDisconnect = NoDisconnect;
   // Connection time and last-received time both start at "now".
45  m_lastReceived = m_connect = chrono::steady_clock::now();
   // NOTE(review): the listing's own numbering skips 46 here, so one source line
   // appears to be missing from this extract - confirm against the original file.
   // Record the OS-level socket handle in the session info.
47  m_info.socketId = m_socket->ref().native_handle();
48 }
49 
51 {
52  ThreadContext tc(info().id.abridged());
53  ThreadContext tc2(info().clientVersion);
54  clog(NetMessageSummary) << "Closing peer session :-(";
55  m_peer->m_lastConnected = m_peer->m_lastAttempted - chrono::seconds(1);
56 
57  // Read-chain finished for one reason or another.
58  for (auto& i: m_capabilities)
59  i.second.reset();
60 
61  try
62  {
63  bi::tcp::socket& socket = m_socket->ref();
64  if (socket.is_open())
65  {
66  boost::system::error_code ec;
67  socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
68  socket.close();
69  }
70  }
71  catch (...){}
72 }
73 
75 {
76  return m_server->repMan();
77 }
78 
80 {
81  return m_peer ? m_peer->id : NodeID();
82 }
83 
84 void Session::addRating(int _r)
85 {
86  if (m_peer)
87  {
88  m_peer->m_rating += _r;
89  m_peer->m_score += _r;
90  if (_r >= 0)
91  m_peer->noteSessionGood();
92  }
93 }
94 
95 int Session::rating() const
96 {
97  return m_peer->m_rating;
98 }
99 
/// Pick a random subset of @a _t.
///
/// @param _t source elements.
/// @param _n maximum number of elements to return.
/// @returns @a _t unchanged when it holds at most @a _n elements; otherwise exactly
/// @a _n elements of @a _t, in their original relative order.
///
/// Uses single-pass selection sampling: each element is taken with probability
/// (still needed) / (still remaining). This avoids erasing from the middle of a
/// vector once per discarded element, which cost O(size * discarded) element moves.
template <class T> std::vector<T> randomSelection(std::vector<T> const& _t, unsigned _n)
{
    if (_t.size() <= _n)
        return _t;

    std::vector<T> ret;
    ret.reserve(_n);

    std::size_t needed = _n;
    std::size_t remaining = _t.size();
    for (auto const& item: _t)
    {
        if (needed == 0)
            break;
        // When remaining == needed the test below always succeeds, so the result
        // is guaranteed to end up with exactly _n elements.
        if (static_cast<std::size_t>(rand()) % remaining < needed)
        {
            ret.push_back(item);
            --needed;
        }
        --remaining;
    }
    return ret;
}
113 
/// Deliver an RLPX packet to the Session itself or to a Capability for interpretation.
/// @param _capId protocol/capability id the packet arrived under.
/// @param _t packet type.
/// @param _r packet payload.
/// @returns the handler's result; true when the matching capability is disabled or
/// when a std::exception was caught while interpreting; false when nothing matched.
114 bool Session::readPacket(uint16_t _capId, PacketType _t, RLP const& _r)
115 {
    // Any incoming packet refreshes the last-received timestamp.
116  m_lastReceived = chrono::steady_clock::now();
117  clog(NetRight) << _t << _r;
118  try // Generic try-catch block designed to capture RLP format errors - TODO: give decent diagnostics, make a bit more specific over what is caught.
119  {
120  // v4 frame headers are useless, offset packet type used
121  // v5 protocol type is in header, packet type not offset
    // Packets on capability id 0 with a type below UserPacket are handled by the session itself.
122  if (_capId == 0 && _t < UserPacket)
123  return interpret(_t, _r);
124 
125  if (isFramingEnabled())
126  {
    // With framing, the capability is selected by its protocol id.
127  for (auto const& i: m_capabilities)
128  if (i.second->c_protocolID == _capId)
129  return i.second->m_enabled ? i.second->interpret(_t, _r) : true;
130  }
131  else
132  {
    // Without framing, the capability is selected by the packet-type window
    // [m_idOffset, m_idOffset + messageCount()) and the type is rebased to that offset.
133  for (auto const& i: m_capabilities)
134  if (_t >= (int)i.second->m_idOffset && _t - i.second->m_idOffset < i.second->hostCapability()->messageCount())
135  return i.second->m_enabled ? i.second->interpret(_t - i.second->m_idOffset, _r) : true;
136  }
137 
    // No handler matched.
138  return false;
139  }
140  catch (std::exception const& _e)
141  {
142  clog(NetWarn) << "Exception caught in p2p::Session::interpret(): " << _e.what() << ". PacketType: " << _t << ". RLP: " << _r;
    // NOTE(review): the listing's own numbering skips 143 here, so one source line
    // appears to be missing from this extract - confirm against the original file.
144  return true;
145  }
    // Not reachable: every path through the try block and the handler above returns.
146  return true;
147 }
148 
/// Interpret an incoming Session-level packet.
/// @param _t packet type.
/// @param _r packet payload.
/// @returns false for a packet type this switch does not list, true otherwise.
149 bool Session::interpret(PacketType _t, RLP const& _r)
150 {
151  switch (_t)
152  {
153  case DisconnectPacket:
154  {
155  string reason = "Unspecified";
    // NOTE(review): the first item is converted with toInt before isInt() is
    // checked on the next line - confirm that ordering is intended.
156  auto r = (DisconnectReason)_r[0].toInt<int>();
157  if (!_r[0].isInt())
158  drop(BadProtocol);
159  else
160  {
161  reason = reasonOf(r);
162  clog(NetMessageSummary) << "Disconnect (reason: " << reason << ")";
    // NOTE(review): the listing's own numbering skips 163 here, so one source line
    // appears to be missing from this extract - confirm against the original file.
164  }
165  break;
166  }
167  case PingPacket:
168  {
169  clog(NetTriviaSummary) << "Ping" << m_info.id;
    // Answer with an argument-less Pong on protocol id 0.
170  RLPStream s;
171  sealAndSend(prep(s, PongPacket), 0);
172  break;
173  }
174  case PongPacket:
    // NOTE(review): the listing's own numbering skips 175 here, so one source line
    // appears to be missing from this extract - confirm against the original file.
176  {
    // Round-trip latency: time elapsed since m_ping was last stamped.
177  m_info.lastPing = std::chrono::steady_clock::now() - m_ping;
178  clog(NetTriviaSummary) << "Latency: " << chrono::duration_cast<chrono::milliseconds>(m_info.lastPing).count() << " ms";
179  }
180  break;
    // Peer-exchange packets are accepted here but not acted upon.
181  case GetPeersPacket:
182  case PeersPacket:
183  break;
184  default:
185  return false;
186  }
187  return true;
188 }
189 
191 {
192  RLPStream s;
193  sealAndSend(prep(s, PingPacket), 0);
194  m_ping = std::chrono::steady_clock::now();
195 }
196 
197 RLPStream& Session::prep(RLPStream& _s, PacketType _id, unsigned _args)
198 {
199  return _s.append((unsigned)_id).appendList(_args);
200 }
201 
202 void Session::sealAndSend(RLPStream& _s, uint16_t _protocolID)
203 {
204  bytes b;
205  _s.swapOut(b);
206  send(move(b), _protocolID);
207 }
208 
210 {
211  if (_msg[0] > 0x7f || _msg.size() < 2)
212  return false;
213  if (RLP(_msg.cropped(1)).actualSize() + 1 != _msg.size())
214  return false;
215  return true;
216 }
217 
/// Queue a serialized packet for transmission and start a write if none is pending.
/// @param _msg packet bytes; moved into the write queue on the non-framing path.
/// @param _protocolID protocol id the packet is sent under (used on the framing path).
218 void Session::send(bytes&& _msg, uint16_t _protocolID)
219 {
220  bytesConstRef msg(&_msg);
    // Log the payload, i.e. the message without its first byte.
221  clog(NetLeft) << RLP(msg.cropped(1));
    // A malformed packet is only reported; it is still sent below.
222  if (!checkPacket(msg))
223  clog(NetWarn) << "INVALID PACKET CONSTRUCTED!";
224 
    // Nothing is queued once the socket is closed.
225  if (!m_socket->ref().is_open())
226  return;
227 
228  bool doWrite = false;
229  if (isFramingEnabled())
230  {
    // NOTE(review): the listing's own numbering skips 231 before this scope block, so
    // a source line (possibly a lock guard) appears to be missing - confirm against the original file.
232  {
    // Only kick off a write when no encoded frames were already waiting.
233  doWrite = m_encFrames.empty();
234  auto f = getFraming(_protocolID);
    // No framing registered for this protocol id: the packet is discarded.
235  if (!f)
236  return;
237 
238  f->writer.enque(RLPXPacket(_protocolID, msg));
239  multiplexAll();
240  }
241 
242  if (doWrite)
243  writeFrames();
244  }
245  else
246  {
    // NOTE(review): the listing's own numbering skips 247 before this scope block, so
    // a source line (possibly a lock guard) appears to be missing - confirm against the original file.
248  {
249  m_writeQueue.push_back(std::move(_msg));
    // Only kick off a write when this packet is the sole entry in the queue.
250  doWrite = (m_writeQueue.size() == 1);
251  }
252 
253  if (doWrite)
254  write();
255  }
256 }
257 
259 {
260  bytes const* out = nullptr;
262  {
263  m_io->writeSingleFramePacket(&m_writeQueue[0], m_writeQueue[0]);
264  out = &m_writeQueue[0];
265  }
266  auto self(shared_from_this());
267  ba::async_write(m_socket->ref(), ba::buffer(*out), [this, self](boost::system::error_code ec, std::size_t /*length*/)
268  {
269  ThreadContext tc(info().id.abridged());
270  ThreadContext tc2(info().clientVersion);
271  // must check queue, as write callback can occur following dropped()
272  if (ec)
273  {
274  clog(NetWarn) << "Error sending: " << ec.message();
275  drop(TCPError);
276  return;
277  }
278 
280  {
281  m_writeQueue.pop_front();
282  if (m_writeQueue.empty())
283  return;
284  }
285  write();
286  });
287 }
288 
290 {
291  bytes const* out = nullptr;
293  {
294  if (m_encFrames.empty())
295  return;
296  else
297  out = &m_encFrames[0];
298  }
299 
300  auto self(shared_from_this());
301  ba::async_write(m_socket->ref(), ba::buffer(*out), [this, self](boost::system::error_code ec, std::size_t /*length*/)
302  {
303  ThreadContext tc(info().id.abridged());
304  ThreadContext tc2(info().clientVersion);
305  // must check queue, as write callback can occur following dropped()
306  if (ec)
307  {
308  clog(NetWarn) << "Error sending: " << ec.message();
309  drop(TCPError);
310  return;
311  }
312 
314  {
315  if (!m_encFrames.empty())
316  m_encFrames.pop_front();
317 
318  multiplexAll();
319  if (m_encFrames.empty())
320  return;
321  }
322 
323  writeFrames();
324  });
325 }
326 
328 {
329  if (m_dropped)
330  return;
331  bi::tcp::socket& socket = m_socket->ref();
332  if (socket.is_open())
333  try
334  {
335  boost::system::error_code ec;
336  clog(NetConnect) << "Closing " << socket.remote_endpoint(ec) << "(" << reasonOf(_reason) << ")";
337  socket.shutdown(boost::asio::ip::tcp::socket::shutdown_both, ec);
338  socket.close();
339  }
340  catch (...) {}
341 
342  m_peer->m_lastDisconnect = _reason;
343  if (_reason == BadProtocol)
344  {
345  m_peer->m_rating /= 2;
346  m_peer->m_score /= 2;
347  }
348  m_dropped = true;
349 }
350 
352 {
353  clog(NetConnect) << "Disconnecting (our reason:" << reasonOf(_reason) << ")";
354 
355  if (m_socket->ref().is_open())
356  {
357  RLPStream s;
358  prep(s, DisconnectPacket, 1) << (int)_reason;
359  sealAndSend(s, 0);
360  }
361  drop(_reason);
362 }
363 
365 {
366  ping();
367 
368  if (isFramingEnabled())
369  doReadFrames();
370  else
371  doRead();
372 }
373 
375 {
376  // ignore packets received while waiting to disconnect.
377  if (m_dropped)
378  return;
379 
380  auto self(shared_from_this());
381  m_data.resize(h256::size);
382  ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, h256::size), [this,self](boost::system::error_code ec, std::size_t length)
383  {
384  ThreadContext tc(info().id.abridged());
385  ThreadContext tc2(info().clientVersion);
386  if (!checkRead(h256::size, ec, length))
387  return;
388  else if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length)))
389  {
390  clog(NetWarn) << "header decrypt failed";
391  drop(BadProtocol); // todo: better error
392  return;
393  }
394 
395  uint16_t hProtocolId;
396  uint32_t hLength;
397  uint8_t hPadding;
398  try
399  {
400  RLPXFrameInfo header(bytesConstRef(m_data.data(), length));
401  hProtocolId = header.protocolId;
402  hLength = header.length;
403  hPadding = header.padding;
404  }
405  catch (std::exception const& _e)
406  {
407  clog(NetWarn) << "Exception decoding frame header RLP:" << _e.what() << bytesConstRef(m_data.data(), h128::size).cropped(3);
408  drop(BadProtocol);
409  return;
410  }
411 
413  auto tlen = hLength + hPadding + h128::size;
414  m_data.resize(tlen);
415  ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, hLength, hProtocolId, tlen](boost::system::error_code ec, std::size_t length)
416  {
417  ThreadContext tc(info().id.abridged());
418  ThreadContext tc2(info().clientVersion);
419  if (!checkRead(tlen, ec, length))
420  return;
421  else if (!m_io->authAndDecryptFrame(bytesRef(m_data.data(), tlen)))
422  {
423  clog(NetWarn) << "frame decrypt failed";
424  drop(BadProtocol); // todo: better error
425  return;
426  }
427 
428  bytesConstRef frame(m_data.data(), hLength);
429  if (!checkPacket(frame))
430  {
431  cerr << "Received " << frame.size() << ": " << toHex(frame) << endl;
432  clog(NetWarn) << "INVALID MESSAGE RECEIVED";
434  return;
435  }
436  else
437  {
438  auto packetType = (PacketType)RLP(frame.cropped(0, 1)).toInt<unsigned>();
439  RLP r(frame.cropped(1));
440  bool ok = readPacket(hProtocolId, packetType, r);
441  (void)ok;
442 #if ETH_DEBUG
443  if (!ok)
444  clog(NetWarn) << "Couldn't interpret packet." << RLP(r);
445 #endif
446  }
447  doRead();
448  });
449  });
450 }
451 
/// Check the outcome of a socket read and drop the peer on error.
/// @param _expected number of bytes the read was supposed to deliver.
/// @param _ec error code reported by the read.
/// @param _length number of bytes actually read.
/// @returns true only when the read delivered exactly @a _expected bytes and none of
/// the error branches below fired.
452 bool Session::checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length)
453 {
    // An error whose category is not asio's misc category and whose value is not eof.
454  if (_ec && _ec.category() != boost::asio::error::get_misc_category() && _ec.value() != boost::asio::error::eof)
455  {
456  clog(NetConnect) << "Error reading: " << _ec.message();
457  drop(TCPError);
458  return false;
459  }
    // Any remaining error combined with a short read: reported to the reputation
    // manager as rude behaviour before dropping.
460  else if (_ec && _length < _expected)
461  {
462  clog(NetWarn) << "Error reading - Abrupt peer disconnect: " << _ec.message();
463  repMan().noteRude(*this);
464  drop(TCPError);
465  return false;
466  }
467  else if (_length != _expected)
468  {
469  // with static m_data-sized buffer this shouldn't happen unless there's a regression
470  // sec recommends checking anyways (instead of assert)
471  clog(NetWarn) << "Error reading - TCP read buffer length differs from expected frame size.";
    // NOTE(review): the listing's own numbering skips 472 here, so one source line
    // appears to be missing from this extract - confirm against the original file.
473  return false;
474  }
475 
476  return true;
477 }
478 
480 {
481  if (m_dropped)
482  return; // ignore packets received while waiting to disconnect
483 
484  auto self(shared_from_this());
485  m_data.resize(h256::size);
486  ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, h256::size), [this, self](boost::system::error_code ec, std::size_t length)
487  {
488  ThreadContext tc(info().id.abridged());
489  ThreadContext tc2(info().clientVersion);
490  if (!checkRead(h256::size, ec, length))
491  return;
492 
494  {
495  if (!m_io->authAndDecryptHeader(bytesRef(m_data.data(), length)))
496  {
497  clog(NetWarn) << "header decrypt failed";
498  drop(BadProtocol); // todo: better error
499  return;
500  }
501  }
502 
503  bytesConstRef rawHeader(m_data.data(), length);
504  try
505  {
506  RLPXFrameInfo tmpHeader(rawHeader);
507  }
508  catch (std::exception const& _e)
509  {
510  clog(NetWarn) << "Exception decoding frame header RLP:" << _e.what() << bytesConstRef(m_data.data(), h128::size).cropped(3);
511  drop(BadProtocol);
512  return;
513  }
514 
515  RLPXFrameInfo header(rawHeader);
516  auto tlen = header.length + header.padding + h128::size; // padded frame and mac
517  m_data.resize(tlen);
518  ba::async_read(m_socket->ref(), boost::asio::buffer(m_data, tlen), [this, self, tlen, header](boost::system::error_code ec, std::size_t length)
519  {
520  ThreadContext tc(info().id.abridged());
521  ThreadContext tc2(info().clientVersion);
522  if (!checkRead(tlen, ec, length))
523  return;
524 
525  bytesRef frame(m_data.data(), tlen);
526  vector<RLPXPacket> px;
528  {
529  auto f = getFraming(header.protocolId);
530  if (!f)
531  {
532  clog(NetWarn) << "Unknown subprotocol " << header.protocolId;
533  drop(BadProtocol);
534  return;
535  }
536 
537  auto v = f->reader.demux(*m_io, header, frame);
538  px.swap(v);
539  }
540 
541  for (RLPXPacket& p: px)
542  {
543  PacketType packetType = (PacketType)RLP(p.type()).toInt<unsigned>(RLP::AllowNonCanon);
544  bool ok = readPacket(header.protocolId, packetType, RLP(p.data()));
545 #if ETH_DEBUG
546  if (!ok)
547  clog(NetWarn) << "Couldn't interpret packet." << RLP(p.data());
548 #endif
549  ok = true;
550  (void)ok;
551  }
552  doReadFrames();
553  });
554  });
555 }
556 
557 std::shared_ptr<Session::Framing> Session::getFraming(uint16_t _protocolID)
558 {
559  if (m_framing.find(_protocolID) == m_framing.end())
560  return nullptr;
561  else
562  return m_framing[_protocolID];
563 }
564 
/// Associate capability @a _p with descriptor @a _desc in this session's capability map.
/// An existing entry for the same descriptor is overwritten.
565 void Session::registerCapability(CapDesc const& _desc, std::shared_ptr<Capability> _p)
566 {
    // NOTE(review): the listing's own numbering skips 567 before this scope block, so
    // a source line (possibly a lock guard) appears to be missing - confirm against the original file.
568  {
569  m_capabilities[_desc] = _p;
570  }
571 }
572 
/// Create framing state for protocol id @a _id unless some is already registered.
573 void Session::registerFraming(uint16_t _id)
574 {
    // NOTE(review): the listing's own numbering skips 575 before this scope block, so
    // a source line (possibly a lock guard) appears to be missing - confirm against the original file.
576  {
    // An existing entry is left untouched.
577  if (m_framing.find(_id) == m_framing.end())
578  {
579  std::shared_ptr<Session::Framing> f(new Session::Framing(_id));
580  m_framing[_id] = f;
581  }
582  }
583 }
584 
586 {
587  for (auto& f: m_framing)
588  f.second->writer.mux(*m_io, maxFrameSize(), m_encFrames);
589 }
void doReadFrames()
Definition: Session.cpp:479
bool readPacket(uint16_t _capId, PacketType _t, RLP const &_r)
Deliver RLPX packet to Session or Capability for interpretation.
Definition: Session.cpp:114
std::unique_ptr< RLPXFrameCoder > m_io
Transport over which packets are sent.
Definition: Session.h:158
h512 NodeID
Definition: Common.h:62
Adapted from code found on http://stackoverflow.com/questions/180947/base64-decode-snippet-in-c Origi...
Definition: Arith256.cpp:15
unsigned maxFrameSize() const
Definition: Session.h:190
bool isFramingEnabled() const
Definition: Session.h:189
std::map< uint16_t, std::shared_ptr< Framing > > m_framing
Definition: Session.h:186
std::string toHex(T const &_data, int _w=2, HexPrefix _prefix=HexPrefix::DontAdd)
Definition: CommonData.h:54
Mutex x_framing
Mutex for the write queue.
Definition: Session.h:160
std::pair< std::string, u256 > CapDesc
Definition: Common.h:142
RLPStream & append(unsigned _s)
Append given datum to the byte stream.
Definition: RLP.h:395
void registerCapability(CapDesc const &_desc, std::shared_ptr< Capability > _p) override
Definition: Session.cpp:565
void noteRude(SessionFace const &_s, std::string const &_sub=std::string())
Definition: Host.cpp:61
static RLPStream & prep(RLPStream &_s, PacketType _t, unsigned _args=0)
Definition: Session.cpp:197
PacketType
Definition: Common.h:95
void write()
Perform a single round of the write operation. This could end up calling itself asynchronously.
Definition: Session.cpp:258
bool interpret(PacketType _t, RLP const &_r)
Interpret an incoming Session packet.
Definition: Session.cpp:149
size_t count
Definition: ExecStats.cpp:37
The Host class Capabilities should be registered prior to startNetwork, since m_capabilities is not t...
Definition: Host.h:129
ReputationManager & repMan() override
Definition: Session.cpp:74
void multiplexAll()
Definition: Session.cpp:585
std::shared_ptr< RLPXSocket > m_socket
Socket of peer's connection.
Definition: Session.h:159
void writeFrames()
Definition: Session.cpp:289
void ping() override
Definition: Session.cpp:190
std::hash for asio::address
Definition: Common.h:323
void drop(DisconnectReason _r)
Drop the connection for the reason _r.
Definition: Session.cpp:327
void registerFraming(uint16_t _id) override
Definition: Session.cpp:573
vector_ref< _T > cropped(size_t _begin, size_t _count) const
Definition: vector_ref.h:62
void doRead()
Perform a read on the socket.
Definition: Session.cpp:374
vector< T > randomSelection(vector< T > const &_t, unsigned _n)
Definition: Session.cpp:100
std::chrono::steady_clock::time_point m_lastReceived
Time point of last message.
Definition: Session.h:173
std::chrono::steady_clock::duration lastPing
Definition: Common.h:157
std::deque< bytes > m_encFrames
Definition: Session.h:187
DisconnectReason
Definition: Common.h:106
std::shared_ptr< Framing > getFraming(uint16_t _protocolID)
Definition: Session.cpp:557
int rating() const override
Definition: Session.cpp:95
#define DEV_GUARDED(MUTEX)
Simple block guard.
Definition: Guards.h:144
std::string reasonOf(DisconnectReason _r)
Definition: Common.cpp:134
Encapsulation of Frame.
static bool checkPacket(bytesConstRef _msg)
Definition: Session.cpp:209
std::vector< byte > bytes
Definition: Common.h:75
vector_ref< byte const > bytesConstRef
Definition: Common.h:77
RLPStream & appendList(size_t _items)
Appends a list.
Definition: RLP.cpp:276
bool m_dropped
If true, we've already divested ourselves of this peer. We're just waiting for the reads & writes to ...
Definition: Session.h:166
bool checkRead(std::size_t _expected, boost::system::error_code _ec, std::size_t _length)
Check error code after reading and drop peer if error code.
Definition: Session.cpp:452
void start() override
Definition: Session.cpp:364
#define b(i, j)
size_t size() const
Definition: vector_ref.h:55
#define f(x)
Definition: gost.cpp:57
PeerSessionInfo info() const override
Definition: Session.h:114
void addRating(int _r) override
Definition: Session.cpp:84
std::deque< bytes > m_writeQueue
The write queue.
Definition: Session.h:161
std::map< CapDesc, std::shared_ptr< Capability > > m_capabilities
The peer's capability set.
Definition: Session.h:175
RLPX Packet.
Definition: RLPXPacket.h:39
std::vector< byte > m_data
Buffer for ingress packet data.
Definition: Session.h:162
NodeID id() const override
Definition: Session.cpp:79
std::chrono::steady_clock::time_point m_ping
Time point of last ping.
Definition: Session.h:172
clock::time_point time_point
Definition: bench.h:49
virtual ~Session()
Definition: Session.cpp:50
std::chrono::steady_clock::time_point m_connect
Time point of connection.
Definition: Session.h:171
#define clog(X)
Definition: Log.h:295
void disconnect(DisconnectReason _reason) override
Definition: Session.cpp:351
void sealAndSend(RLPStream &_s, uint16_t _protocolID) override
Definition: Session.cpp:202
PeerSessionInfo m_info
Dynamic information about this peer.
Definition: Session.h:169
void swapOut(bytes &_dest)
Swap the contents of the output stream out for some other byte array.
Definition: RLP.h:439
dev::WithExisting max(dev::WithExisting _a, dev::WithExisting _b)
Definition: Common.h:326
Host * m_server
The host that owns us. Never null.
Definition: Session.h:156
Class for writing to an RLP bytestream.
Definition: RLP.h:383
std::shared_ptr< Peer > m_peer
The Peer object.
Definition: Session.h:165
uint8_t const padding
Length of padding which follows .
uint32_t const length
Size of frame (excludes padding). Max: 2**24.
Class for interpreting Recursive Linear-Prefix Data.
Definition: RLP.h:64
void send(bytes &&_msg, uint16_t _protocolID)
Definition: Session.cpp:218
uint16_t const protocolId
Protocol ID as negotiated by handshake.
ReputationManager & repMan()
Definition: Host.h:221